pharmacy-pos-qr-system/backend/db/age_food_graph.py
시골약사 37821fefdb feat: IL-1β 식품 GraphRAG 스키마 확장 및 데이터 파이프라인
PostgreSQL + Apache AGE에 식품-바이오마커 관계 추가:

1. schema_food_biomarker.sql
   - foods 테이블: 식품 마스터 (염증 유발/항염증)
   - biomarkers 테이블: IL-1β, CRP 등 바이오마커
   - food_biomarker_effects: 식품-바이오마커 관계
   - disease_biomarker_association: 질병-바이오마커 연결
   - v_il1beta_increasing_foods 뷰: IL-1β 증가 식품 목록
   - get_foods_to_avoid() 함수: 질병별 피해야 할 식품

2. age_food_graph.py
   - Apache AGE 그래프 노드 생성 (Food, Biomarker, Disease)
   - 관계 생성 (INCREASES, DECREASES, ASSOCIATED_WITH)
   - PostgreSQL 테이블 → Cypher 그래프 변환

3. import_il1beta_foods.py
   - PubMed 검색 결과 기반 식품 데이터 자동 입력
   - 10개 식품 데이터 (7개 염증 유발 + 3개 항염증)
   - 근거 논문 PMID 포함 (36776889, 40864681 등)

4. il1beta_proinflammatory_foods_research.py
   - PubMed 검색: 고지방, 고당, 가공육, 적색육, 알코올
   - 24개 논문 분석
   - 카테고리별 분류 및 메커니즘 분석

활용:
- NAFLD 환자 식이 지도 (고지방식 금지)
- 관절염 환자 항염증 식단 (오메가-3 권장)
- 근거 기반 영양 상담 (PubMed PMID 제시)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 17:20:53 +09:00

336 lines
12 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Apache AGE 그래프 생성: Food + Biomarker 노드 및 관계
목적: PostgreSQL 테이블 데이터를 Apache AGE 그래프로 변환
작성일: 2026-02-04
"""
import sys
import os
# UTF-8 인코딩 강제
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
import psycopg2
from psycopg2.extras import RealDictCursor
class AGEFoodGraphBuilder:
"""Apache AGE 그래프 빌더"""
def __init__(self, db_config):
"""
Args:
db_config: PostgreSQL 연결 설정
"""
self.db_config = db_config
self.conn = None
self.cursor = None
self.graph_name = 'pharmacy_graph'
def connect(self):
"""PostgreSQL 연결"""
try:
self.conn = psycopg2.connect(**self.db_config)
self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)
print("✅ PostgreSQL 연결 성공")
# AGE 확장 로드
self.cursor.execute("LOAD 'age';")
self.cursor.execute("SET search_path = ag_catalog, '$user', public;")
# 그래프 생성 (이미 있으면 무시)
try:
self.cursor.execute(f"SELECT create_graph('{self.graph_name}');")
self.conn.commit()
print(f"✅ 그래프 '{self.graph_name}' 생성 완료")
except psycopg2.Error as e:
if 'already exists' in str(e):
print(f" 그래프 '{self.graph_name}' 이미 존재")
self.conn.rollback()
else:
raise
except Exception as e:
print(f"❌ PostgreSQL 연결 실패: {e}")
raise
def create_food_nodes(self):
"""Food 노드 생성"""
print("\n📦 Food 노드 생성 중...")
try:
# SQL 테이블에서 식품 데이터 조회
self.cursor.execute("""
SELECT food_id, food_name, food_name_en, category, subcategory, description
FROM foods
""")
foods = self.cursor.fetchall()
for food in foods:
# Cypher 쿼리로 노드 생성
query = f"""
SELECT * FROM cypher('{self.graph_name}', $$
MERGE (f:Food {{
food_id: {food['food_id']},
name: '{food['food_name']}',
name_en: '{food['food_name_en'] or ''}',
category: '{food['category']}',
subcategory: '{food['subcategory'] or ''}',
description: '{food['description'] or ''}'
}})
RETURN f
$$) AS (result agtype);
"""
self.cursor.execute(query)
self.conn.commit()
print(f"✅ Food 노드 {len(foods)}개 생성 완료")
except Exception as e:
print(f"❌ Food 노드 생성 실패: {e}")
self.conn.rollback()
raise
def create_biomarker_nodes(self):
"""Biomarker 노드 생성"""
print("\n📦 Biomarker 노드 생성 중...")
try:
# SQL 테이블에서 바이오마커 데이터 조회
self.cursor.execute("""
SELECT biomarker_id, biomarker_name, biomarker_type,
normal_range_min, normal_range_max, unit, description
FROM biomarkers
""")
biomarkers = self.cursor.fetchall()
for bm in biomarkers:
query = f"""
SELECT * FROM cypher('{self.graph_name}', $$
MERGE (b:Biomarker {{
biomarker_id: {bm['biomarker_id']},
name: '{bm['biomarker_name']}',
type: '{bm['biomarker_type']}',
normal_min: {bm['normal_range_min'] or 0},
normal_max: {bm['normal_range_max'] or 0},
unit: '{bm['unit'] or ''}',
description: '{bm['description'] or ''}'
}})
RETURN b
$$) AS (result agtype);
"""
self.cursor.execute(query)
self.conn.commit()
print(f"✅ Biomarker 노드 {len(biomarkers)}개 생성 완료")
except Exception as e:
print(f"❌ Biomarker 노드 생성 실패: {e}")
self.conn.rollback()
raise
def create_food_biomarker_relationships(self):
"""Food → Biomarker 관계 생성"""
print("\n🔗 Food → Biomarker 관계 생성 중...")
try:
# SQL 테이블에서 관계 데이터 조회
self.cursor.execute("""
SELECT
f.food_id, f.food_name,
b.biomarker_id, b.biomarker_name,
fbe.effect_type, fbe.magnitude, fbe.percent_change,
fbe.mechanism, fbe.evidence_pmid, fbe.study_type, fbe.reliability
FROM food_biomarker_effects fbe
JOIN foods f ON fbe.food_id = f.food_id
JOIN biomarkers b ON fbe.biomarker_id = b.biomarker_id
""")
effects = self.cursor.fetchall()
for effect in effects:
# 관계 타입 결정
if effect['effect_type'] == 'increases':
rel_type = 'INCREASES'
elif effect['effect_type'] == 'decreases':
rel_type = 'DECREASES'
else:
rel_type = 'AFFECTS'
# Cypher 쿼리로 관계 생성
query = f"""
SELECT * FROM cypher('{self.graph_name}', $$
MATCH (f:Food {{food_id: {effect['food_id']}}})
MATCH (b:Biomarker {{biomarker_id: {effect['biomarker_id']}}})
MERGE (f)-[r:{rel_type} {{
magnitude: '{effect['magnitude'] or 'unknown'}',
percent_change: {effect['percent_change'] or 0},
mechanism: '{effect['mechanism'] or ''}',
evidence_pmid: '{effect['evidence_pmid'] or ''}',
study_type: '{effect['study_type'] or ''}',
reliability: {effect['reliability'] or 0.5}
}}]->(b)
RETURN r
$$) AS (result agtype);
"""
self.cursor.execute(query)
self.conn.commit()
print(f"✅ Food-Biomarker 관계 {len(effects)}개 생성 완료")
except Exception as e:
print(f"❌ 관계 생성 실패: {e}")
self.conn.rollback()
raise
def create_disease_nodes(self):
"""Disease 노드 생성 (질병-바이오마커 연결용)"""
print("\n📦 Disease 노드 생성 중...")
try:
# SQL 테이블에서 질병 데이터 조회
self.cursor.execute("""
SELECT DISTINCT disease_icd_code, disease_name
FROM disease_biomarker_association
""")
diseases = self.cursor.fetchall()
for disease in diseases:
query = f"""
SELECT * FROM cypher('{self.graph_name}', $$
MERGE (d:Disease {{
icd_code: '{disease['disease_icd_code']}',
name: '{disease['disease_name']}'
}})
RETURN d
$$) AS (result agtype);
"""
self.cursor.execute(query)
self.conn.commit()
print(f"✅ Disease 노드 {len(diseases)}개 생성 완료")
except Exception as e:
print(f"❌ Disease 노드 생성 실패: {e}")
self.conn.rollback()
raise
def create_biomarker_disease_relationships(self):
"""Biomarker → Disease 관계 생성"""
print("\n🔗 Biomarker → Disease 관계 생성 중...")
try:
self.cursor.execute("""
SELECT
b.biomarker_id, b.biomarker_name,
dba.disease_icd_code, dba.disease_name,
dba.association_strength, dba.threshold_value,
dba.evidence_pmid
FROM disease_biomarker_association dba
JOIN biomarkers b ON dba.biomarker_id = b.biomarker_id
""")
associations = self.cursor.fetchall()
for assoc in associations:
query = f"""
SELECT * FROM cypher('{self.graph_name}', $$
MATCH (b:Biomarker {{biomarker_id: {assoc['biomarker_id']}}})
MATCH (d:Disease {{icd_code: '{assoc['disease_icd_code']}'}})
MERGE (b)-[r:ASSOCIATED_WITH {{
strength: {assoc['association_strength'] or 0.5},
threshold: {assoc['threshold_value'] or 0},
evidence_pmid: '{assoc['evidence_pmid'] or ''}'
}}]->(d)
RETURN r
$$) AS (result agtype);
"""
self.cursor.execute(query)
self.conn.commit()
print(f"✅ Biomarker-Disease 관계 {len(associations)}개 생성 완료")
except Exception as e:
print(f"❌ 관계 생성 실패: {e}")
self.conn.rollback()
raise
def verify_graph(self):
"""그래프 검증"""
print("\n🔍 그래프 검증 중...")
try:
# 노드 개수 확인
queries = {
'Food': f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH (f:Food) RETURN COUNT(f) $$) AS (count agtype);",
'Biomarker': f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH (b:Biomarker) RETURN COUNT(b) $$) AS (count agtype);",
'Disease': f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH (d:Disease) RETURN COUNT(d) $$) AS (count agtype);"
}
for node_type, query in queries.items():
self.cursor.execute(query)
result = self.cursor.fetchone()
count = result['count'] if result else 0
print(f" {node_type} 노드: {count}")
# 관계 개수 확인
rel_query = f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH ()-[r]->() RETURN COUNT(r) $$) AS (count agtype);"
self.cursor.execute(rel_query)
rel_result = self.cursor.fetchone()
rel_count = rel_result['count'] if rel_result else 0
print(f" 관계: {rel_count}")
print("✅ 그래프 검증 완료")
except Exception as e:
print(f"❌ 그래프 검증 실패: {e}")
def build(self):
"""전체 그래프 빌드"""
print("\n" + "=" * 60)
print("Apache AGE 그래프 빌드 시작")
print("=" * 60)
try:
self.connect()
self.create_food_nodes()
self.create_biomarker_nodes()
self.create_disease_nodes()
self.create_food_biomarker_relationships()
self.create_biomarker_disease_relationships()
self.verify_graph()
print("\n" + "=" * 60)
print("✅ 그래프 빌드 완료!")
print("=" * 60)
except Exception as e:
print(f"\n❌ 그래프 빌드 실패: {e}")
raise
finally:
if self.conn:
self.conn.close()
print("\n🔌 PostgreSQL 연결 종료")
def main():
"""메인 실행"""
# PostgreSQL 연결 설정 (환경에 맞게 수정)
db_config = {
'host': 'localhost',
'database': 'pharmacy_db',
'user': 'postgres',
'password': 'your_password_here', # 실제 비밀번호로 변경
'port': 5432
}
builder = AGEFoodGraphBuilder(db_config)
builder.build()
if __name__ == '__main__':
main()