feat: IL-1β 식품 GraphRAG 스키마 확장 및 데이터 파이프라인

PostgreSQL + Apache AGE에 식품-바이오마커 관계 추가:

1. schema_food_biomarker.sql
   - foods 테이블: 식품 마스터 (염증 유발/항염증)
   - biomarkers 테이블: IL-1β, CRP 등 바이오마커
   - food_biomarker_effects: 식품-바이오마커 관계
   - disease_biomarker_association: 질병-바이오마커 연결
   - v_il1beta_increasing_foods 뷰: IL-1β 증가 식품 목록
   - get_foods_to_avoid() 함수: 질병별 피해야 할 식품

2. age_food_graph.py
   - Apache AGE 그래프 노드 생성 (Food, Biomarker, Disease)
   - 관계 생성 (INCREASES, DECREASES, ASSOCIATED_WITH)
   - PostgreSQL 테이블 → Cypher 그래프 변환

3. import_il1beta_foods.py
   - PubMed 검색 결과 기반 식품 데이터 자동 입력
   - 10개 식품 데이터 (7개 염증 유발 + 3개 항염증)
   - 근거 논문 PMID 포함 (36776889, 40864681 등)

4. il1beta_proinflammatory_foods_research.py
   - PubMed 검색: 고지방, 고당, 가공육, 적색육, 알코올
   - 24개 논문 분석
   - 카테고리별 분류 및 메커니즘 분석

활용:
- NAFLD 환자 식이 지도 (고지방식 금지)
- 관절염 환자 항염증 식단 (오메가-3 권장)
- 근거 기반 영양 상담 (PubMed PMID 제시)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-04 17:20:53 +09:00
parent 1e904000c7
commit 37821fefdb
4 changed files with 1288 additions and 0 deletions

View File

@@ -0,0 +1,335 @@
"""
Apache AGE 그래프 생성: Food + Biomarker 노드 및 관계
목적: PostgreSQL 테이블 데이터를 Apache AGE 그래프로 변환
작성일: 2026-02-04
"""
import sys
import os
# UTF-8 인코딩 강제
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
import psycopg2
from psycopg2.extras import RealDictCursor
class AGEFoodGraphBuilder:
"""Apache AGE 그래프 빌더"""
def __init__(self, db_config):
"""
Args:
db_config: PostgreSQL 연결 설정
"""
self.db_config = db_config
self.conn = None
self.cursor = None
self.graph_name = 'pharmacy_graph'
def connect(self):
"""PostgreSQL 연결"""
try:
self.conn = psycopg2.connect(**self.db_config)
self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)
print("✅ PostgreSQL 연결 성공")
# AGE 확장 로드
self.cursor.execute("LOAD 'age';")
self.cursor.execute("SET search_path = ag_catalog, '$user', public;")
# 그래프 생성 (이미 있으면 무시)
try:
self.cursor.execute(f"SELECT create_graph('{self.graph_name}');")
self.conn.commit()
print(f"✅ 그래프 '{self.graph_name}' 생성 완료")
except psycopg2.Error as e:
if 'already exists' in str(e):
print(f" 그래프 '{self.graph_name}' 이미 존재")
self.conn.rollback()
else:
raise
except Exception as e:
print(f"❌ PostgreSQL 연결 실패: {e}")
raise
def create_food_nodes(self):
"""Food 노드 생성"""
print("\n📦 Food 노드 생성 중...")
try:
# SQL 테이블에서 식품 데이터 조회
self.cursor.execute("""
SELECT food_id, food_name, food_name_en, category, subcategory, description
FROM foods
""")
foods = self.cursor.fetchall()
for food in foods:
# Cypher 쿼리로 노드 생성
query = f"""
SELECT * FROM cypher('{self.graph_name}', $$
MERGE (f:Food {{
food_id: {food['food_id']},
name: '{food['food_name']}',
name_en: '{food['food_name_en'] or ''}',
category: '{food['category']}',
subcategory: '{food['subcategory'] or ''}',
description: '{food['description'] or ''}'
}})
RETURN f
$$) AS (result agtype);
"""
self.cursor.execute(query)
self.conn.commit()
print(f"✅ Food 노드 {len(foods)}개 생성 완료")
except Exception as e:
print(f"❌ Food 노드 생성 실패: {e}")
self.conn.rollback()
raise
def create_biomarker_nodes(self):
"""Biomarker 노드 생성"""
print("\n📦 Biomarker 노드 생성 중...")
try:
# SQL 테이블에서 바이오마커 데이터 조회
self.cursor.execute("""
SELECT biomarker_id, biomarker_name, biomarker_type,
normal_range_min, normal_range_max, unit, description
FROM biomarkers
""")
biomarkers = self.cursor.fetchall()
for bm in biomarkers:
query = f"""
SELECT * FROM cypher('{self.graph_name}', $$
MERGE (b:Biomarker {{
biomarker_id: {bm['biomarker_id']},
name: '{bm['biomarker_name']}',
type: '{bm['biomarker_type']}',
normal_min: {bm['normal_range_min'] or 0},
normal_max: {bm['normal_range_max'] or 0},
unit: '{bm['unit'] or ''}',
description: '{bm['description'] or ''}'
}})
RETURN b
$$) AS (result agtype);
"""
self.cursor.execute(query)
self.conn.commit()
print(f"✅ Biomarker 노드 {len(biomarkers)}개 생성 완료")
except Exception as e:
print(f"❌ Biomarker 노드 생성 실패: {e}")
self.conn.rollback()
raise
def create_food_biomarker_relationships(self):
"""Food → Biomarker 관계 생성"""
print("\n🔗 Food → Biomarker 관계 생성 중...")
try:
# SQL 테이블에서 관계 데이터 조회
self.cursor.execute("""
SELECT
f.food_id, f.food_name,
b.biomarker_id, b.biomarker_name,
fbe.effect_type, fbe.magnitude, fbe.percent_change,
fbe.mechanism, fbe.evidence_pmid, fbe.study_type, fbe.reliability
FROM food_biomarker_effects fbe
JOIN foods f ON fbe.food_id = f.food_id
JOIN biomarkers b ON fbe.biomarker_id = b.biomarker_id
""")
effects = self.cursor.fetchall()
for effect in effects:
# 관계 타입 결정
if effect['effect_type'] == 'increases':
rel_type = 'INCREASES'
elif effect['effect_type'] == 'decreases':
rel_type = 'DECREASES'
else:
rel_type = 'AFFECTS'
# Cypher 쿼리로 관계 생성
query = f"""
SELECT * FROM cypher('{self.graph_name}', $$
MATCH (f:Food {{food_id: {effect['food_id']}}})
MATCH (b:Biomarker {{biomarker_id: {effect['biomarker_id']}}})
MERGE (f)-[r:{rel_type} {{
magnitude: '{effect['magnitude'] or 'unknown'}',
percent_change: {effect['percent_change'] or 0},
mechanism: '{effect['mechanism'] or ''}',
evidence_pmid: '{effect['evidence_pmid'] or ''}',
study_type: '{effect['study_type'] or ''}',
reliability: {effect['reliability'] or 0.5}
}}]->(b)
RETURN r
$$) AS (result agtype);
"""
self.cursor.execute(query)
self.conn.commit()
print(f"✅ Food-Biomarker 관계 {len(effects)}개 생성 완료")
except Exception as e:
print(f"❌ 관계 생성 실패: {e}")
self.conn.rollback()
raise
def create_disease_nodes(self):
"""Disease 노드 생성 (질병-바이오마커 연결용)"""
print("\n📦 Disease 노드 생성 중...")
try:
# SQL 테이블에서 질병 데이터 조회
self.cursor.execute("""
SELECT DISTINCT disease_icd_code, disease_name
FROM disease_biomarker_association
""")
diseases = self.cursor.fetchall()
for disease in diseases:
query = f"""
SELECT * FROM cypher('{self.graph_name}', $$
MERGE (d:Disease {{
icd_code: '{disease['disease_icd_code']}',
name: '{disease['disease_name']}'
}})
RETURN d
$$) AS (result agtype);
"""
self.cursor.execute(query)
self.conn.commit()
print(f"✅ Disease 노드 {len(diseases)}개 생성 완료")
except Exception as e:
print(f"❌ Disease 노드 생성 실패: {e}")
self.conn.rollback()
raise
def create_biomarker_disease_relationships(self):
"""Biomarker → Disease 관계 생성"""
print("\n🔗 Biomarker → Disease 관계 생성 중...")
try:
self.cursor.execute("""
SELECT
b.biomarker_id, b.biomarker_name,
dba.disease_icd_code, dba.disease_name,
dba.association_strength, dba.threshold_value,
dba.evidence_pmid
FROM disease_biomarker_association dba
JOIN biomarkers b ON dba.biomarker_id = b.biomarker_id
""")
associations = self.cursor.fetchall()
for assoc in associations:
query = f"""
SELECT * FROM cypher('{self.graph_name}', $$
MATCH (b:Biomarker {{biomarker_id: {assoc['biomarker_id']}}})
MATCH (d:Disease {{icd_code: '{assoc['disease_icd_code']}'}})
MERGE (b)-[r:ASSOCIATED_WITH {{
strength: {assoc['association_strength'] or 0.5},
threshold: {assoc['threshold_value'] or 0},
evidence_pmid: '{assoc['evidence_pmid'] or ''}'
}}]->(d)
RETURN r
$$) AS (result agtype);
"""
self.cursor.execute(query)
self.conn.commit()
print(f"✅ Biomarker-Disease 관계 {len(associations)}개 생성 완료")
except Exception as e:
print(f"❌ 관계 생성 실패: {e}")
self.conn.rollback()
raise
def verify_graph(self):
"""그래프 검증"""
print("\n🔍 그래프 검증 중...")
try:
# 노드 개수 확인
queries = {
'Food': f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH (f:Food) RETURN COUNT(f) $$) AS (count agtype);",
'Biomarker': f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH (b:Biomarker) RETURN COUNT(b) $$) AS (count agtype);",
'Disease': f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH (d:Disease) RETURN COUNT(d) $$) AS (count agtype);"
}
for node_type, query in queries.items():
self.cursor.execute(query)
result = self.cursor.fetchone()
count = result['count'] if result else 0
print(f" {node_type} 노드: {count}")
# 관계 개수 확인
rel_query = f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH ()-[r]->() RETURN COUNT(r) $$) AS (count agtype);"
self.cursor.execute(rel_query)
rel_result = self.cursor.fetchone()
rel_count = rel_result['count'] if rel_result else 0
print(f" 관계: {rel_count}")
print("✅ 그래프 검증 완료")
except Exception as e:
print(f"❌ 그래프 검증 실패: {e}")
def build(self):
"""전체 그래프 빌드"""
print("\n" + "=" * 60)
print("Apache AGE 그래프 빌드 시작")
print("=" * 60)
try:
self.connect()
self.create_food_nodes()
self.create_biomarker_nodes()
self.create_disease_nodes()
self.create_food_biomarker_relationships()
self.create_biomarker_disease_relationships()
self.verify_graph()
print("\n" + "=" * 60)
print("✅ 그래프 빌드 완료!")
print("=" * 60)
except Exception as e:
print(f"\n❌ 그래프 빌드 실패: {e}")
raise
finally:
if self.conn:
self.conn.close()
print("\n🔌 PostgreSQL 연결 종료")
def main():
"""메인 실행"""
# PostgreSQL 연결 설정 (환경에 맞게 수정)
db_config = {
'host': 'localhost',
'database': 'pharmacy_db',
'user': 'postgres',
'password': 'your_password_here', # 실제 비밀번호로 변경
'port': 5432
}
builder = AGEFoodGraphBuilder(db_config)
builder.build()
if __name__ == '__main__':
main()