""" Apache AGE 그래프 생성: Food + Biomarker 노드 및 관계 목적: PostgreSQL 테이블 데이터를 Apache AGE 그래프로 변환 작성일: 2026-02-04 """ import sys import os # UTF-8 인코딩 강제 if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') import psycopg2 from psycopg2.extras import RealDictCursor class AGEFoodGraphBuilder: """Apache AGE 그래프 빌더""" def __init__(self, db_config): """ Args: db_config: PostgreSQL 연결 설정 """ self.db_config = db_config self.conn = None self.cursor = None self.graph_name = 'pharmacy_graph' def connect(self): """PostgreSQL 연결""" try: self.conn = psycopg2.connect(**self.db_config) self.cursor = self.conn.cursor(cursor_factory=RealDictCursor) print("✅ PostgreSQL 연결 성공") # AGE 확장 로드 self.cursor.execute("LOAD 'age';") self.cursor.execute("SET search_path = ag_catalog, '$user', public;") # 그래프 생성 (이미 있으면 무시) try: self.cursor.execute(f"SELECT create_graph('{self.graph_name}');") self.conn.commit() print(f"✅ 그래프 '{self.graph_name}' 생성 완료") except psycopg2.Error as e: if 'already exists' in str(e): print(f"ℹ️ 그래프 '{self.graph_name}' 이미 존재") self.conn.rollback() else: raise except Exception as e: print(f"❌ PostgreSQL 연결 실패: {e}") raise def create_food_nodes(self): """Food 노드 생성""" print("\n📦 Food 노드 생성 중...") try: # SQL 테이블에서 식품 데이터 조회 self.cursor.execute(""" SELECT food_id, food_name, food_name_en, category, subcategory, description FROM foods """) foods = self.cursor.fetchall() for food in foods: # Cypher 쿼리로 노드 생성 query = f""" SELECT * FROM cypher('{self.graph_name}', $$ MERGE (f:Food {{ food_id: {food['food_id']}, name: '{food['food_name']}', name_en: '{food['food_name_en'] or ''}', category: '{food['category']}', subcategory: '{food['subcategory'] or ''}', description: '{food['description'] or ''}' }}) RETURN f $$) AS (result agtype); """ self.cursor.execute(query) self.conn.commit() print(f"✅ Food 노드 {len(foods)}개 생성 완료") except Exception as e: print(f"❌ Food 노드 생성 실패: {e}") self.conn.rollback() raise def create_biomarker_nodes(self): """Biomarker 노드 생성""" print("\n📦 Biomarker 노드 생성 중...") try: # SQL 테이블에서 바이오마커 데이터 조회 self.cursor.execute(""" SELECT biomarker_id, biomarker_name, biomarker_type, normal_range_min, normal_range_max, unit, description FROM biomarkers """) biomarkers = self.cursor.fetchall() for bm in biomarkers: query = f""" SELECT * FROM cypher('{self.graph_name}', $$ MERGE (b:Biomarker {{ biomarker_id: {bm['biomarker_id']}, name: '{bm['biomarker_name']}', type: '{bm['biomarker_type']}', normal_min: {bm['normal_range_min'] or 0}, normal_max: {bm['normal_range_max'] or 0}, unit: '{bm['unit'] or ''}', description: '{bm['description'] or ''}' }}) RETURN b $$) AS (result agtype); """ self.cursor.execute(query) self.conn.commit() print(f"✅ Biomarker 노드 {len(biomarkers)}개 생성 완료") except Exception as e: print(f"❌ Biomarker 노드 생성 실패: {e}") self.conn.rollback() raise def create_food_biomarker_relationships(self): """Food → Biomarker 관계 생성""" print("\n🔗 Food → Biomarker 관계 생성 중...") try: # SQL 테이블에서 관계 데이터 조회 self.cursor.execute(""" SELECT f.food_id, f.food_name, b.biomarker_id, b.biomarker_name, fbe.effect_type, fbe.magnitude, fbe.percent_change, fbe.mechanism, fbe.evidence_pmid, fbe.study_type, fbe.reliability FROM food_biomarker_effects fbe JOIN foods f ON fbe.food_id = f.food_id JOIN biomarkers b ON fbe.biomarker_id = b.biomarker_id """) effects = self.cursor.fetchall() for effect in effects: # 관계 타입 결정 if effect['effect_type'] == 'increases': rel_type = 'INCREASES' elif effect['effect_type'] == 'decreases': rel_type = 'DECREASES' else: rel_type = 'AFFECTS' # Cypher 쿼리로 관계 생성 query = f""" SELECT * FROM cypher('{self.graph_name}', $$ MATCH (f:Food {{food_id: {effect['food_id']}}}) MATCH (b:Biomarker {{biomarker_id: {effect['biomarker_id']}}}) MERGE (f)-[r:{rel_type} {{ magnitude: '{effect['magnitude'] or 'unknown'}', percent_change: {effect['percent_change'] or 0}, mechanism: '{effect['mechanism'] or ''}', evidence_pmid: '{effect['evidence_pmid'] or ''}', study_type: '{effect['study_type'] or ''}', reliability: {effect['reliability'] or 0.5} }}]->(b) RETURN r $$) AS (result agtype); """ self.cursor.execute(query) self.conn.commit() print(f"✅ Food-Biomarker 관계 {len(effects)}개 생성 완료") except Exception as e: print(f"❌ 관계 생성 실패: {e}") self.conn.rollback() raise def create_disease_nodes(self): """Disease 노드 생성 (질병-바이오마커 연결용)""" print("\n📦 Disease 노드 생성 중...") try: # SQL 테이블에서 질병 데이터 조회 self.cursor.execute(""" SELECT DISTINCT disease_icd_code, disease_name FROM disease_biomarker_association """) diseases = self.cursor.fetchall() for disease in diseases: query = f""" SELECT * FROM cypher('{self.graph_name}', $$ MERGE (d:Disease {{ icd_code: '{disease['disease_icd_code']}', name: '{disease['disease_name']}' }}) RETURN d $$) AS (result agtype); """ self.cursor.execute(query) self.conn.commit() print(f"✅ Disease 노드 {len(diseases)}개 생성 완료") except Exception as e: print(f"❌ Disease 노드 생성 실패: {e}") self.conn.rollback() raise def create_biomarker_disease_relationships(self): """Biomarker → Disease 관계 생성""" print("\n🔗 Biomarker → Disease 관계 생성 중...") try: self.cursor.execute(""" SELECT b.biomarker_id, b.biomarker_name, dba.disease_icd_code, dba.disease_name, dba.association_strength, dba.threshold_value, dba.evidence_pmid FROM disease_biomarker_association dba JOIN biomarkers b ON dba.biomarker_id = b.biomarker_id """) associations = self.cursor.fetchall() for assoc in associations: query = f""" SELECT * FROM cypher('{self.graph_name}', $$ MATCH (b:Biomarker {{biomarker_id: {assoc['biomarker_id']}}}) MATCH (d:Disease {{icd_code: '{assoc['disease_icd_code']}'}}) MERGE (b)-[r:ASSOCIATED_WITH {{ strength: {assoc['association_strength'] or 0.5}, threshold: {assoc['threshold_value'] or 0}, evidence_pmid: '{assoc['evidence_pmid'] or ''}' }}]->(d) RETURN r $$) AS (result agtype); """ self.cursor.execute(query) self.conn.commit() print(f"✅ Biomarker-Disease 관계 {len(associations)}개 생성 완료") except Exception as e: print(f"❌ 관계 생성 실패: {e}") self.conn.rollback() raise def verify_graph(self): """그래프 검증""" print("\n🔍 그래프 검증 중...") try: # 노드 개수 확인 queries = { 'Food': f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH (f:Food) RETURN COUNT(f) $$) AS (count agtype);", 'Biomarker': f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH (b:Biomarker) RETURN COUNT(b) $$) AS (count agtype);", 'Disease': f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH (d:Disease) RETURN COUNT(d) $$) AS (count agtype);" } for node_type, query in queries.items(): self.cursor.execute(query) result = self.cursor.fetchone() count = result['count'] if result else 0 print(f" {node_type} 노드: {count}개") # 관계 개수 확인 rel_query = f"SELECT * FROM cypher('{self.graph_name}', $$ MATCH ()-[r]->() RETURN COUNT(r) $$) AS (count agtype);" self.cursor.execute(rel_query) rel_result = self.cursor.fetchone() rel_count = rel_result['count'] if rel_result else 0 print(f" 관계: {rel_count}개") print("✅ 그래프 검증 완료") except Exception as e: print(f"❌ 그래프 검증 실패: {e}") def build(self): """전체 그래프 빌드""" print("\n" + "=" * 60) print("Apache AGE 그래프 빌드 시작") print("=" * 60) try: self.connect() self.create_food_nodes() self.create_biomarker_nodes() self.create_disease_nodes() self.create_food_biomarker_relationships() self.create_biomarker_disease_relationships() self.verify_graph() print("\n" + "=" * 60) print("✅ 그래프 빌드 완료!") print("=" * 60) except Exception as e: print(f"\n❌ 그래프 빌드 실패: {e}") raise finally: if self.conn: self.conn.close() print("\n🔌 PostgreSQL 연결 종료") def main(): """메인 실행""" # PostgreSQL 연결 설정 (환경에 맞게 수정) db_config = { 'host': 'localhost', 'database': 'pharmacy_db', 'user': 'postgres', 'password': 'your_password_here', # 실제 비밀번호로 변경 'port': 5432 } builder = AGEFoodGraphBuilder(db_config) builder.build() if __name__ == '__main__': main()