pharmacy-pos-qr-system/backend/utils/animal_rag.py
thug0bin 83ecf88bd4 feat(animal-chat): APC 코드 2024년 체계 지원 및 피부약 2단계 추천
## APC 코드 체계 확장
- 기존: 023%만 검색 (~2023년 제품만)
- 변경: 02% OR 92% + 13자리 검증
  - 02%: 2023년 이전 item_seq (9자리) 기반 APC
  - 92%: 2024년 이후 item_seq (10자리) 기반 APC
- 999% 등 청구프로그램 임의코드는 제외

## 동물약 챗봇 피부약 추천 개선
- 피부약 2단계 추천 구조 추가
  - 1차(치료): 의약품 (개시딘겔, 테르비덤 등)
  - 2차(보조케어): 의약외품 (스킨카솔 - 회복기 피부보호)
- 스킨카솔은 의약외품임을 명시하여 치료제로 오인 방지

## 기타
- RAG 테스트 스크립트 추가
- 수인약품 API 문서화
2026-03-11 14:20:44 +09:00

403 lines
14 KiB
Python

# -*- coding: utf-8 -*-
"""
동물약 벡터 DB RAG 모듈
- LanceDB + OpenAI text-embedding-3-small
- MD 파일 청킹 및 임베딩
- 유사도 검색
"""
import os
import re
import logging
from pathlib import Path
from typing import List, Dict, Optional
# .env 로드
from dotenv import load_dotenv
env_path = Path(__file__).parent.parent / ".env"
load_dotenv(env_path)
# LanceDB
import lancedb
from openai import OpenAI
logger = logging.getLogger(__name__)
# 설정
LANCE_DB_PATH = Path(__file__).parent.parent / "db" / "lance_animal_drugs"
MD_DOCS_PATH = Path("C:/Users/청춘약국/source/new_anipharm")
TABLE_NAME = "animal_drugs"
CHUNK_SIZE = 1500 # 약 500 토큰
CHUNK_OVERLAP = 300 # 약 100 토큰
EMBEDDING_DIM = 1536 # text-embedding-3-small
class AnimalDrugRAG:
"""동물약 RAG 클래스 (LanceDB 버전)"""
def __init__(self, openai_api_key: str = None):
"""
Args:
openai_api_key: OpenAI API 키 (없으면 환경변수에서 가져옴)
"""
self.api_key = openai_api_key or os.getenv('OPENAI_API_KEY')
self.db = None
self.table = None
self.openai_client = None
self._initialized = False
def _init_db(self):
"""DB 초기화 (lazy loading)"""
if self._initialized:
return
try:
# LanceDB 연결
LANCE_DB_PATH.mkdir(parents=True, exist_ok=True)
self.db = lancedb.connect(str(LANCE_DB_PATH))
# OpenAI 클라이언트
if self.api_key:
self.openai_client = OpenAI(api_key=self.api_key)
else:
logger.warning("OpenAI API 키 없음")
# 기존 테이블 열기
if TABLE_NAME in self.db.table_names():
self.table = self.db.open_table(TABLE_NAME)
logger.info(f"기존 테이블 열림 (행 수: {len(self.table)})")
else:
logger.info("테이블 없음 - index_md_files() 호출 필요")
self._initialized = True
except Exception as e:
logger.error(f"AnimalDrugRAG 초기화 실패: {e}")
raise
def _get_embedding(self, text: str) -> List[float]:
"""OpenAI 임베딩 생성"""
if not self.openai_client:
raise ValueError("OpenAI 클라이언트 없음")
response = self.openai_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def _get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
"""배치 임베딩 생성"""
if not self.openai_client:
raise ValueError("OpenAI 클라이언트 없음")
# OpenAI는 한 번에 최대 2048개 텍스트 처리
embeddings = []
batch_size = 100
for i in range(0, len(texts), batch_size):
batch = texts[i:i+batch_size]
response = self.openai_client.embeddings.create(
model="text-embedding-3-small",
input=batch
)
embeddings.extend([d.embedding for d in response.data])
logger.info(f"임베딩 생성: {i+len(batch)}/{len(texts)}")
return embeddings
def _extract_product_info(self, content: str) -> Dict[str, str]:
"""
MD 파일 상단에서 제품 정보 추출
- 제품명 (한글/영문)
- 성분
- 대상 동물
"""
info = {"product_name": "", "ingredients": "", "target_animal": ""}
# # 제목에서 제품명 추출 (예: "# 복합 개시딘 겔 - 표면성...")
title_match = re.search(r'^# (.+?)(?:\s*[-–—]|$)', content, re.MULTILINE)
if title_match:
info["product_name"] = title_match.group(1).strip()
# > 성분: 라인에서 추출
ingredient_match = re.search(r'>\s*성분[:\s]+(.+?)(?:\n|$)', content)
if ingredient_match:
info["ingredients"] = ingredient_match.group(1).strip()[:100] # 100자 제한
# 대상 동물 추출 (테이블에서)
animal_match = re.search(r'\*\*대상\s*동물\*\*[^\|]*\|\s*([^\|]+)', content)
if animal_match:
info["target_animal"] = animal_match.group(1).strip()
return info
def _make_chunk_prefix(self, product_info: Dict[str, str]) -> str:
"""청크 prefix 생성"""
parts = []
if product_info["product_name"]:
parts.append(f"제품명: {product_info['product_name']}")
if product_info["target_animal"]:
parts.append(f"대상: {product_info['target_animal']}")
if product_info["ingredients"]:
parts.append(f"성분: {product_info['ingredients']}")
if parts:
return "[" + " | ".join(parts) + "]\n\n"
return ""
def chunk_markdown(self, content: str, source_file: str) -> List[Dict]:
"""
마크다운 청킹 (섹션 기반 + 제품명 prefix)
"""
chunks = []
# 제품 정보 추출 & prefix 생성
product_info = self._extract_product_info(content)
prefix = self._make_chunk_prefix(product_info)
# ## 헤더 기준 분리
sections = re.split(r'\n(?=## )', content)
for i, section in enumerate(sections):
if not section.strip():
continue
# 섹션 제목 추출
title_match = re.match(r'^## (.+?)(?:\n|$)', section)
section_title = title_match.group(1).strip() if title_match else f"섹션{i+1}"
# prefix + section 결합
prefixed_section = prefix + section
# 큰 섹션은 추가 분할
if len(prefixed_section) > CHUNK_SIZE:
sub_chunks = self._split_by_size(prefixed_section, CHUNK_SIZE, CHUNK_OVERLAP)
for j, sub_chunk in enumerate(sub_chunks):
# 분할된 청크에도 prefix 보장 (overlap으로 잘렸을 경우)
if j > 0 and not sub_chunk.startswith("["):
sub_chunk = prefix + sub_chunk
chunk_id = f"{source_file}#{section_title}#{j}"
chunks.append({
"id": chunk_id,
"text": sub_chunk,
"source": source_file,
"section": section_title,
"chunk_index": j,
"product_name": product_info["product_name"]
})
else:
chunk_id = f"{source_file}#{section_title}"
chunks.append({
"id": chunk_id,
"text": prefixed_section,
"source": source_file,
"section": section_title,
"chunk_index": 0,
"product_name": product_info["product_name"]
})
return chunks
def _split_by_size(self, text: str, size: int, overlap: int) -> List[str]:
"""텍스트를 크기 기준으로 분할"""
chunks = []
start = 0
while start < len(text):
end = start + size
# 문장 경계에서 자르기
if end < len(text):
last_break = text.rfind('\n', start, end)
if last_break == -1:
last_break = text.rfind('. ', start, end)
if last_break > start + size // 2:
end = last_break + 1
chunks.append(text[start:end])
start = end - overlap
return chunks
def index_md_files(self, md_path: Path = None) -> int:
"""
MD 파일들을 인덱싱
"""
self._init_db()
md_path = md_path or MD_DOCS_PATH
if not md_path.exists():
logger.error(f"MD 파일 경로 없음: {md_path}")
return 0
# 기존 테이블 삭제
if TABLE_NAME in self.db.table_names():
self.db.drop_table(TABLE_NAME)
logger.info("기존 테이블 삭제")
# 모든 청크 수집
all_chunks = []
md_files = list(md_path.glob("*.md"))
for md_file in md_files:
try:
content = md_file.read_text(encoding='utf-8')
chunks = self.chunk_markdown(content, md_file.name)
all_chunks.extend(chunks)
logger.info(f"청킹: {md_file.name} ({len(chunks)}개)")
except Exception as e:
logger.error(f"청킹 실패 ({md_file.name}): {e}")
if not all_chunks:
logger.warning("청크 없음")
return 0
# 임베딩 생성
texts = [c["text"] for c in all_chunks]
logger.info(f"{len(texts)}개 청크 임베딩 시작...")
embeddings = self._get_embeddings_batch(texts)
# 데이터 준비
data = []
for chunk, emb in zip(all_chunks, embeddings):
data.append({
"id": chunk["id"],
"text": chunk["text"],
"source": chunk["source"],
"section": chunk["section"],
"chunk_index": chunk["chunk_index"],
"product_name": chunk.get("product_name", ""),
"vector": emb
})
# 테이블 생성
self.table = self.db.create_table(TABLE_NAME, data)
logger.info(f"인덱싱 완료: {len(data)}개 청크")
return len(data)
def search(self, query: str, n_results: int = 5) -> List[Dict]:
"""
유사도 검색
"""
self._init_db()
if self.table is None:
logger.warning("테이블 없음 - index_md_files() 필요")
return []
try:
# 쿼리 임베딩
query_emb = self._get_embedding(query)
# 검색
results = self.table.search(query_emb).limit(n_results).to_list()
output = []
for r in results:
# L2 거리 (0~∞) → 유사도 (1~0)
# 거리가 작을수록 유사도 높음
distance = r.get("_distance", 10)
score = 1 / (1 + distance) # 0~1 범위로 변환
# 임계값: 유사도 0.2 미만은 제외 (관련 없는 문서)
# L2 거리 4.0 이상이면 제외
if score < 0.2:
continue
output.append({
"text": r["text"],
"source": r["source"],
"section": r["section"],
"score": score
})
return output
except Exception as e:
logger.error(f"검색 실패: {e}")
return []
def get_context_for_chat(self, query: str, n_results: int = 3) -> str:
"""
챗봇용 컨텍스트 생성
"""
results = self.search(query, n_results)
if not results:
return ""
context_parts = ["## 📚 관련 문서 (RAG 검색 결과)"]
for i, r in enumerate(results, 1):
source = r["source"].replace(".md", "")
section = r["section"]
score = r["score"]
text = r["text"][:1500]
context_parts.append(f"\n### [{i}] {source} - {section} (관련도: {score:.0%})")
context_parts.append(text)
return "\n".join(context_parts)
def get_stats(self) -> Dict:
"""통계 정보 반환"""
self._init_db()
count = len(self.table) if self.table else 0
return {
"table_name": TABLE_NAME,
"document_count": count,
"db_path": str(LANCE_DB_PATH)
}
# 싱글톤 인스턴스
_rag_instance: Optional[AnimalDrugRAG] = None
def get_animal_rag(api_key: str = None) -> AnimalDrugRAG:
"""싱글톤 RAG 인스턴스 반환"""
global _rag_instance
if _rag_instance is None:
_rag_instance = AnimalDrugRAG(api_key)
return _rag_instance
# CLI 테스트
if __name__ == "__main__":
import sys
logging.basicConfig(level=logging.INFO)
rag = AnimalDrugRAG()
if len(sys.argv) > 1:
cmd = sys.argv[1]
if cmd == "index":
count = rag.index_md_files()
print(f"\n{count}개 청크 인덱싱 완료")
elif cmd == "search" and len(sys.argv) > 2:
query = " ".join(sys.argv[2:])
results = rag.search(query)
print(f"\n🔍 검색: {query}")
for r in results:
print(f"\n[{r['score']:.0%}] {r['source']} - {r['section']}")
print(r['text'][:300] + "...")
elif cmd == "stats":
stats = rag.get_stats()
print(f"\n📊 통계:")
print(f" - 테이블: {stats['table_name']}")
print(f" - 문서 수: {stats['document_count']}")
print(f" - DB 경로: {stats['db_path']}")
else:
print("사용법:")
print(" python animal_rag.py index # MD 파일 인덱싱")
print(" python animal_rag.py search 질문 # 검색")
print(" python animal_rag.py stats # 통계")