pharmacy-pos-qr-system/backend/pmr_api.py
thug0bin 1b33f82fd4 feat: PAAI (Pharmacist Assistant AI) 기능 구현
- PAAI 로그 테이블 스키마 (paai_logs_schema.sql)
- PAAI 로거 모듈 (db/paai_logger.py)
- /pmr/api/paai/analyze API 엔드포인트
- KIMS API 연동 (KD코드 기반 상호작용 조회)
- Clawdbot AI 연동 (HTTP API)
- PMR 화면 PAAI 버튼 및 모달
- Admin 페이지 (/admin/paai)
- 피드백 수집 기능
2026-03-05 00:36:51 +09:00

1196 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# pmr_api.py - 조제관리(PMR) Blueprint API
# PharmaIT3000 MSSQL 연동 (192.168.0.4)
from flask import Blueprint, jsonify, request, render_template, send_file
import pyodbc
import sqlite3
from pathlib import Path
from datetime import datetime, date
import logging
from PIL import Image, ImageDraw, ImageFont
import io
import base64
import os
# Flask blueprint that groups every route declared in this module under the /pmr URL prefix.
pmr_bp = Blueprint('pmr', __name__, url_prefix='/pmr')
# ─────────────────────────────────────────────────────────────
# MSSQL 연결 설정 (PharmaIT3000 - 192.168.0.4)
# ─────────────────────────────────────────────────────────────
# Connection settings for the PharmaIT3000 SQL Server instance.
# Every value can be overridden with an environment variable so deployments
# do not have to rely on the credentials committed here; the literals stay as
# defaults so existing installations keep working unchanged.
# NOTE(review): the fallback password is still stored in source control —
# consider rotating it and removing the default once the env vars are in place.
MSSQL_CONFIG = {
    'server': os.environ.get('PMR_MSSQL_SERVER', '192.168.0.4\\PM2014'),
    'username': os.environ.get('PMR_MSSQL_USERNAME', 'sa'),
    'password': os.environ.get('PMR_MSSQL_PASSWORD', 'tmddls214!%('),
    'driver': os.environ.get('PMR_MSSQL_DRIVER', 'ODBC Driver 17 for SQL Server'),
}
def get_mssql_connection(database='PM_PRES'):
    """Open a pyodbc connection to ``database`` using the MSSQL_CONFIG settings.

    Args:
        database: Name of the database to select (defaults to 'PM_PRES').

    Returns:
        The connection object returned by ``pyodbc.connect``. This function
        does not close it; callers in this module close it themselves.
    """
    # Assemble the ODBC connection string from its key=value segments.
    segments = [
        f"DRIVER={{{MSSQL_CONFIG['driver']}}}",
        f"SERVER={MSSQL_CONFIG['server']}",
        f"DATABASE={database}",
        f"UID={MSSQL_CONFIG['username']}",
        f"PWD={MSSQL_CONFIG['password']}",
        "TrustServerCertificate=yes",
        "Connection Timeout=10",
    ]
    connection_string = ";".join(segments)
    return pyodbc.connect(connection_string, timeout=10)
# ─────────────────────────────────────────────────────────────
# 조제관리 페이지
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/')
def pmr_index():
    """Serve the PMR (dispensing management) main page from the 'pmr.html' template."""
    rendered_page = render_template('pmr.html')
    return rendered_page
# ─────────────────────────────────────────────────────────────
# API: 날짜별 처방전 목록
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/prescriptions', methods=['GET'])
def get_prescriptions_by_date():
    """
    List the prescriptions stored in PS_MAIN for a single day.

    Query Params:
        - date: YYYY-MM-DD (defaults to today)

    Returns:
        A JSON response with ``success``, ``date``, ``count`` and
        ``prescriptions``. Any exception is logged and reported as a JSON
        error body with HTTP 500. The database connection is always closed,
        including on the error path.
    """
    conn = None
    try:
        date_str = request.args.get('date', date.today().strftime('%Y-%m-%d'))
        # PassDay is matched against the date with the dashes removed (YYYYMMDD).
        date_yyyymmdd = date_str.replace('-', '')
        conn = get_mssql_connection('PM_PRES')
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
                PreSerial,
                Day_Serial,
                PassDay,
                Paname,
                PaNum,
                CusCode,
                InsName,
                Drname,
                PresTime,
                PreGubun,
                PRICE_T,
                PRICE_P,
                PRICE_C
            FROM PS_MAIN
            WHERE PassDay = ?
            ORDER BY Day_Serial ASC
        """, (date_yyyymmdd,))
        prescriptions = []
        for row in cursor.fetchall():
            # Derive age and gender from the resident registration number.
            panum = row.PaNum or ''
            age = None
            gender = None
            if len(panum) >= 7:
                try:
                    birth_year = int(panum[:2])
                except ValueError:
                    # Non-numeric prefix: leave age and gender unset.
                    birth_year = None
                if birth_year is not None:
                    gender_code = panum[6]
                    # Codes 3/4/7/8 are treated as births in the 2000s; every
                    # other code (including unknown ones) as the 1900s.
                    if gender_code in ('3', '4', '7', '8'):
                        birth_year += 2000
                    else:
                        birth_year += 1900
                    # NOTE(review): both branches yield '' in this copy of the
                    # file — the original gender labels look lost; confirm and
                    # restore them.
                    gender = '' if gender_code in ['1', '3', '5', '7'] else ''
                    age = datetime.now().year - birth_year
            prescriptions.append({
                'prescription_id': row.PreSerial,
                'order_number': row.Day_Serial,
                'date': row.PassDay,
                'patient_name': row.Paname,
                # Only the first six characters of the number are exposed.
                'patient_id': row.PaNum[:6] + '******' if row.PaNum and len(row.PaNum) > 6 else row.PaNum,
                'patient_code': row.CusCode,
                'hospital': row.InsName,
                'doctor': row.Drname,
                'time': row.PresTime,
                'type': '급여' if row.PreGubun == '0' else '비급여' if row.PreGubun == '9' else row.PreGubun,
                'age': age,
                'gender': gender,
                'price_total': row.PRICE_T,
                'price_patient': row.PRICE_P,
                'price_claim': row.PRICE_C
            })
        return jsonify({
            'success': True,
            'date': date_str,
            'count': len(prescriptions),
            'prescriptions': prescriptions
        })
    except Exception as e:
        logging.error(f"PMR 처방전 목록 조회 오류: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Release the connection on both the success and the error path.
        if conn is not None:
            conn.close()
# ─────────────────────────────────────────────────────────────
# API: 처방전 상세 (약품 목록)
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/prescription/<prescription_id>', methods=['GET'])
def get_prescription_detail(prescription_id):
    """
    Return one prescription with its patient, disease and medication details.

    Args:
        prescription_id: Value matched against PS_MAIN.PreSerial.

    Returns:
        A JSON response with ``prescription``, ``patient``, ``disease_info``,
        ``medications`` and ``medication_count``; HTTP 404 when no row matches;
        HTTP 500 with the error text on any exception. The database
        connection is always closed, including on the error path.
    """
    conn = None
    try:
        conn = get_mssql_connection('PM_PRES')
        cursor = conn.cursor()
        # Prescription header plus the names of the two disease codes.
        cursor.execute("""
            SELECT
                M.PreSerial,
                M.Day_Serial,
                M.PassDay,
                M.Paname,
                M.PaNum,
                M.CusCode,
                M.InsName,
                M.Drname,
                M.PresTime,
                M.PreGubun,
                M.PRICE_T,
                M.PRICE_P,
                M.PRICE_C,
                M.St1,
                M.St2,
                S1.AU_NAME,
                S2.AU_NAME
            FROM PS_MAIN M
            LEFT JOIN PM_BASE.dbo.CD_SANG S1 ON M.St1 = S1.AU_CODE
            LEFT JOIN PM_BASE.dbo.CD_SANG S2 ON M.St2 = S2.AU_CODE
            WHERE M.PreSerial = ?
        """, (prescription_id,))
        rx_row = cursor.fetchone()
        if not rx_row:
            return jsonify({'success': False, 'error': '처방전을 찾을 수 없습니다'}), 404
        # Prescribed drugs (PS_sub_pharm joined with CD_GOODS and CD_MC).
        medications = []
        cursor.execute("""
            SELECT
                s.DrugCode,
                s.Days,
                s.QUAN,
                s.QUAN_TIME,
                s.PS_Type,
                s.INV_QUAN,
                g.GoodsName,
                g.SUNG_CODE,
                m.PRINT_TYPE,
                m.SIM_EFFECT
            FROM PS_sub_pharm s
            LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode
            LEFT JOIN PM_DRUG.dbo.CD_MC m ON s.DrugCode = m.DRUGCODE
            WHERE s.PreSerial = ?
            ORDER BY s.SUB_SERIAL
        """, (prescription_id,))
        for row in cursor.fetchall():
            # Efficacy text: prefer PRINT_TYPE, then SIM_EFFECT, else empty.
            add_info = row.PRINT_TYPE or row.SIM_EFFECT or ''
            medications.append({
                'medication_code': row.DrugCode or '',
                'med_name': row.GoodsName or row.DrugCode or '',
                'add_info': add_info,
                'dosage': float(row.QUAN) if row.QUAN else 0,
                'frequency': row.QUAN_TIME or 0,
                'duration': row.Days or 0,
                'total_qty': float(row.INV_QUAN) if row.INV_QUAN else 0,
                'type': '급여' if row.PS_Type in ['0', '4'] else '비급여' if row.PS_Type == '1' else row.PS_Type,
                'sung_code': row.SUNG_CODE or ''
            })
        # Derive age and gender from the resident registration number.
        panum = rx_row.PaNum or ''
        age = None
        gender = None
        if len(panum) >= 7:
            try:
                birth_year = int(panum[:2])
            except ValueError:
                # Non-numeric prefix: leave age and gender unset.
                birth_year = None
            if birth_year is not None:
                gender_code = panum[6]
                # Codes 3/4/7/8 are treated as births in the 2000s. Every
                # other code falls back to the 1900s, matching the list
                # endpoint; without the fallback an unknown code left a
                # two-digit year and produced an age of roughly 1900+.
                if gender_code in ('3', '4', '7', '8'):
                    birth_year += 2000
                else:
                    birth_year += 1900
                # NOTE(review): both branches yield '' in this copy of the
                # file — the original gender labels look lost; confirm and
                # restore them.
                gender = '' if gender_code in ['1', '3', '5', '7'] else ''
                age = datetime.now().year - birth_year
        # Disease info. The two AU_NAME columns share a name, so these
        # columns are read by position rather than by attribute.
        disease_info = None
        st1 = rx_row[13] or ''  # St1
        st2 = rx_row[14] or ''  # St2
        disease_name_1 = rx_row[15] or ''  # S1.AU_NAME
        disease_name_2 = rx_row[16] or ''  # S2.AU_NAME
        if st1 or st2:
            disease_info = {
                'code_1': st1,
                'code_2': st2,
                'name_1': disease_name_1,
                'name_2': disease_name_2
            }
        return jsonify({
            'success': True,
            'prescription': {
                'prescription_id': rx_row.PreSerial,
                'order_number': rx_row.Day_Serial,
                'date': rx_row.PassDay,
                'time': rx_row.PresTime,
                'hospital': rx_row.InsName,
                'doctor': rx_row.Drname,
                'type': '급여' if rx_row.PreGubun == '0' else '비급여',
                'price_total': rx_row.PRICE_T,
                'price_patient': rx_row.PRICE_P
            },
            'patient': {
                'name': rx_row.Paname,
                'code': rx_row.CusCode,
                'age': age,
                'gender': gender
            },
            'disease_info': disease_info,
            'medications': medications,
            'medication_count': len(medications)
        })
    except Exception as e:
        logging.error(f"PMR 처방전 상세 조회 오류: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Release the connection on every path (success, 404 and error).
        if conn is not None:
            conn.close()
# ─────────────────────────────────────────────────────────────
# API: 통계 (당일 요약)
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/stats', methods=['GET'])
def get_daily_stats():
    """
    Return the prescription count and price totals for one day.

    Query Params:
        - date: YYYY-MM-DD (defaults to today)

    Returns:
        A JSON response with ``success``, ``date`` and ``stats``
        (``total_prescriptions``, ``total_amount``, ``patient_amount``,
        ``claim_amount``); HTTP 500 with the error text on any exception.
        The database connection is always closed, including on the error path.
    """
    conn = None
    try:
        date_str = request.args.get('date', date.today().strftime('%Y-%m-%d'))
        date_yyyymmdd = date_str.replace('-', '')
        conn = get_mssql_connection('PM_PRES')
        cursor = conn.cursor()
        # Number of prescriptions for the day.
        cursor.execute("""
            SELECT COUNT(*) as cnt
            FROM PS_MAIN
            WHERE PassDay = ?
        """, (date_yyyymmdd,))
        total_prescriptions = cursor.fetchone()[0]
        # Price totals; ISNULL turns an empty day into zeros.
        cursor.execute("""
            SELECT
                ISNULL(SUM(PRICE_T), 0) as total,
                ISNULL(SUM(PRICE_P), 0) as patient,
                ISNULL(SUM(PRICE_C), 0) as claim
            FROM PS_MAIN
            WHERE PassDay = ?
        """, (date_yyyymmdd,))
        price_row = cursor.fetchone()
        return jsonify({
            'success': True,
            'date': date_str,
            'stats': {
                'total_prescriptions': total_prescriptions,
                'total_amount': price_row[0] if price_row else 0,
                'patient_amount': price_row[1] if price_row else 0,
                'claim_amount': price_row[2] if price_row else 0
            }
        })
    except Exception as e:
        logging.error(f"PMR 통계 조회 오류: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Release the connection on both the success and the error path.
        if conn is not None:
            conn.close()
# ─────────────────────────────────────────────────────────────
# API: DB 연결 테스트
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/test', methods=['GET'])
def test_connection():
    """
    Check database connectivity by running ``SELECT @@VERSION``.

    Returns:
        A JSON response with ``success``, the configured ``server`` and the
        first 100 characters of the version string; HTTP 500 with the error
        text on any exception. The connection is always closed.
    """
    conn = None
    try:
        conn = get_mssql_connection('PM_PRES')
        cursor = conn.cursor()
        cursor.execute("SELECT @@VERSION")
        version = cursor.fetchone()[0]
        return jsonify({
            'success': True,
            'server': MSSQL_CONFIG['server'],
            'version': version[:100] + '...'
        })
    except Exception as e:
        # Log the failure like the other endpoints do, so a failed check is
        # visible server-side as well as in the response.
        logging.error(f"PMR DB 연결 테스트 오류: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        if conn is not None:
            conn.close()
# ─────────────────────────────────────────────────────────────
# API: 라벨 미리보기
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/label/preview', methods=['POST'])
def preview_label():
    """
    Render a label preview with PIL and return it as a Base64 PNG data URI.

    Request Body (JSON object):
        - patient_name: patient name
        - med_name: medication name
        - add_info: optional text drawn in parentheses under the name
        - dosage: amount per dose
        - frequency: doses per day
        - duration: number of days
        - unit: unit label (tablet, capsule, mL, ...)

    Returns:
        ``{'success': True, 'image': 'data:image/png;base64,...'}``;
        HTTP 400 when the body is missing or not a JSON object;
        HTTP 500 with the error text on any other exception.
    """
    try:
        # silent=True returns None instead of raising on a missing or
        # malformed body, so that case can be answered with a 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': '요청 데이터가 없습니다.'}), 400
        # `or` maps JSON null to the same defaults used for missing keys.
        patient_name = data.get('patient_name') or ''
        med_name = data.get('med_name') or ''
        add_info = data.get('add_info') or ''
        dosage = float(data.get('dosage') or 0)
        frequency = int(data.get('frequency') or 0)
        duration = int(data.get('duration') or 0)
        unit = data.get('unit') or ''
        # Build the label image.
        image = create_label_image(
            patient_name=patient_name,
            med_name=med_name,
            add_info=add_info,
            dosage=dosage,
            frequency=frequency,
            duration=duration,
            unit=unit
        )
        # Encode the PNG bytes as Base64.
        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
        return jsonify({
            'success': True,
            'image': f'data:image/png;base64,{img_base64}'
        })
    except Exception as e:
        logging.error(f"라벨 미리보기 오류: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
def create_label_image(patient_name, med_name, add_info='', dosage=0, frequency=0, duration=0, unit=''):
    """
    Draw a medication label (29mm label stock) and return it as a PIL image.

    Args:
        patient_name: Patient name; drawn with a space between characters.
        med_name: Medication name; only the part before the first '(' is drawn.
        add_info: Optional text drawn in gray parentheses under the name.
        dosage: Amount per dose.
        frequency: Doses per day.
        duration: Number of days.
        unit: Unit label appended to the amounts.

    Returns:
        A 306x380 RGB ``PIL.Image.Image``.
    """
    # None is treated like an empty string so the text helpers never see None.
    med_name = med_name or ''
    add_info = add_info or ''
    unit = unit or ''
    # Label size (29mm stock, 300dpi basis).
    label_width = 306
    label_height = 380
    image = Image.new("RGB", (label_width, label_height), "white")
    draw = ImageDraw.Draw(image)
    # Fonts are looked up at Windows paths: bold Malgun Gothic, else regular.
    font_path = "C:/Windows/Fonts/malgunbd.ttf"
    if not os.path.exists(font_path):
        font_path = "C:/Windows/Fonts/malgun.ttf"
    try:
        name_font = ImageFont.truetype(font_path, 36)
        drug_font = ImageFont.truetype(font_path, 24)
        info_font = ImageFont.truetype(font_path, 22)
        small_font = ImageFont.truetype(font_path, 18)
    except (OSError, ImportError):
        # Font file unreadable or FreeType support unavailable: fall back to
        # PIL's built-in font instead of failing the whole label.
        fallback_font = ImageFont.load_default()
        name_font = drug_font = info_font = small_font = fallback_font

    def draw_centered(text, y, font, fill="black"):
        """Draw ``text`` horizontally centered at ``y``; return the next y."""
        bbox = draw.textbbox((0, 0), text, font=font)
        w = bbox[2] - bbox[0]
        x = (label_width - w) // 2
        draw.text((x, y), text, font=font, fill=fill)
        return y + bbox[3] - bbox[1] + 5

    def wrap_text(text, font, max_width):
        """Greedily pack whitespace-separated words into lines <= max_width."""
        lines = []
        current_line = ""
        for word in text.split():
            test_line = f"{current_line} {word}".strip()
            bbox = draw.textbbox((0, 0), test_line, font=font)
            if bbox[2] - bbox[0] <= max_width:
                current_line = test_line
            else:
                if current_line:
                    lines.append(current_line)
                current_line = word
        if current_line:
            lines.append(current_line)
        return lines if lines else [text]

    y = 15
    # Patient name, spaced out character by character.
    spaced_name = " ".join(patient_name) if patient_name else ""
    y = draw_centered(spaced_name, y, name_font)
    y += 5
    # Medication name: keep only the part before the first parenthesis.
    if '(' in med_name:
        main_name = med_name.split('(')[0].strip()
    else:
        main_name = med_name
    name_lines = wrap_text(main_name, drug_font, label_width - 30)
    for line in name_lines:
        y = draw_centered(line, y, drug_font)
    # Efficacy text (add_info).
    if add_info:
        y = draw_centered(f"({add_info})", y, small_font, fill="gray")
    y += 5
    # Total quantity, shown only when all three factors are positive.
    if dosage > 0 and frequency > 0 and duration > 0:
        total = dosage * frequency * duration
        total_str = str(int(total)) if total == int(total) else f"{total:.1f}"
        total_text = f"{total_str}{unit} / {duration}일분"
        y = draw_centered(total_text, y, info_font)
    y += 5
    # Usage box.
    box_margin = 20
    box_top = y
    box_bottom = y + 70
    draw.rectangle(
        [(box_margin, box_top), (label_width - box_margin, box_bottom)],
        outline="black",
        width=2
    )
    # Box content: whole doses print without decimals, fractional doses with
    # up to two decimals and trailing zeros stripped.
    dosage_str = str(int(dosage)) if dosage == int(dosage) else f"{dosage:.2f}".rstrip('0').rstrip('.')
    dosage_text = f"{dosage_str}{unit}"
    # Dosing times for 1-3 doses per day; otherwise a generic per-day text.
    if frequency == 1:
        time_text = "아침"
    elif frequency == 2:
        time_text = "아침, 저녁"
    elif frequency == 3:
        time_text = "아침, 점심, 저녁"
    else:
        time_text = f"1일 {frequency}"
    box_center_y = (box_top + box_bottom) // 2
    draw_centered(dosage_text, box_center_y - 20, info_font)
    draw_centered(time_text, box_center_y + 5, info_font)
    y = box_bottom + 10
    # Dispensing date (today).
    today = datetime.now().strftime('%Y-%m-%d')
    y = draw_centered(f"조제일: {today}", y, small_font)
    # Pharmacy name in a thin box at the bottom.
    pharmacy_y = label_height - 40
    draw.rectangle(
        [(50, pharmacy_y - 5), (label_width - 50, pharmacy_y + 25)],
        outline="black",
        width=1
    )
    draw_centered("청 춘 약 국", pharmacy_y, info_font)
    return image
# ─────────────────────────────────────────────────────────────
# API: 환자 이전 처방 이력
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/patient/<cus_code>/history', methods=['GET'])
def get_patient_history(cus_code):
    """
    Return a patient's earlier prescriptions, newest first, with their drugs.

    Args:
        cus_code: Patient code matched against PS_MAIN.CusCode.

    Query Params:
        - limit: maximum number of prescriptions (default 10, capped at 50;
          a non-numeric value falls back to the default, a negative value
          is treated as 0)
        - exclude: prescription serial to leave out (the current one)

    Returns:
        A JSON response with ``success``, ``cus_code``, ``count`` and
        ``history``; HTTP 500 with the error text on any exception. The
        database connection is always closed, including on the error path.
    """
    conn = None
    try:
        # type=int makes an unparsable value fall back to the default instead
        # of raising; the clamp keeps TOP (?) within 0..50.
        limit = request.args.get('limit', 10, type=int)
        limit = max(0, min(limit, 50))
        exclude_serial = request.args.get('exclude', '')
        conn = get_mssql_connection('PM_PRES')
        cursor = conn.cursor()
        # Earlier prescriptions for this patient.
        query = """
            SELECT TOP (?)
                m.PreSerial,
                m.PassDay,
                m.PresTime,
                m.Paname,
                m.InsName,
                m.Drname,
                m.PRICE_P,
                (SELECT COUNT(*) FROM PS_sub_pharm WHERE PreSerial = m.PreSerial) as drug_count
            FROM PS_MAIN m
            WHERE m.CusCode = ?
        """
        params = [limit, cus_code]
        if exclude_serial:
            query += " AND m.PreSerial != ?"
            params.append(exclude_serial)
        query += " ORDER BY m.PassDay DESC, m.PresTime DESC"
        cursor.execute(query, params)
        history = []
        # fetchall() materializes the rows first, so reusing the same cursor
        # for the per-prescription drug query inside the loop is safe.
        for row in cursor.fetchall():
            pre_serial = row.PreSerial
            # Drugs of this prescription.
            cursor.execute("""
                SELECT
                    s.DrugCode,
                    s.Days,
                    s.QUAN,
                    s.QUAN_TIME,
                    g.GoodsName,
                    m.PRINT_TYPE
                FROM PS_sub_pharm s
                LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode
                LEFT JOIN PM_DRUG.dbo.CD_MC m ON s.DrugCode = m.DRUGCODE
                WHERE s.PreSerial = ?
                ORDER BY s.SUB_SERIAL
            """, (pre_serial,))
            medications = [
                {
                    'medication_code': med_row.DrugCode or '',
                    'med_name': med_row.GoodsName or med_row.DrugCode or '',
                    'add_info': med_row.PRINT_TYPE or '',
                    'dosage': float(med_row.QUAN) if med_row.QUAN else 0,
                    'frequency': med_row.QUAN_TIME or 0,
                    'duration': med_row.Days or 0
                }
                for med_row in cursor.fetchall()
            ]
            # Format an 8-character YYYYMMDD value as YYYY-MM-DD.
            pass_day = row.PassDay
            if pass_day and len(pass_day) == 8:
                date_formatted = f"{pass_day[:4]}-{pass_day[4:6]}-{pass_day[6:8]}"
            else:
                date_formatted = pass_day or ''
            history.append({
                'prescription_id': pre_serial,
                'date': date_formatted,
                'time': row.PresTime or '',
                'patient_name': row.Paname or '',
                'hospital': row.InsName or '',
                'doctor': row.Drname or '',
                'copayment': int(row.PRICE_P or 0),
                'medication_count': row.drug_count or 0,
                'medications': medications
            })
        return jsonify({
            'success': True,
            'cus_code': cus_code,
            'count': len(history),
            'history': history
        })
    except Exception as e:
        logging.error(f"환자 이전 처방 조회 오류: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Release the connection on both the success and the error path.
        if conn is not None:
            conn.close()
# ─────────────────────────────────────────────────────────────
# API: 환자 OTC 구매 이력
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/patient/<cus_code>/otc', methods=['GET'])
def get_patient_otc_history(cus_code):
    """
    Return a patient's OTC purchase history with a summary.

    Args:
        cus_code: Patient code matched against SALE_MAIN.SL_CD_custom.

    Query Params:
        - limit: maximum number of orders (default 20, capped at 100;
          a non-numeric value falls back to the default, a negative value
          is treated as 0)

    Returns:
        A JSON response with ``success``, ``cus_code``, ``count``,
        ``purchases`` and, when at least one order exists, ``summary``
        (``total_visits``, ``total_amount``, ``frequent_items``);
        HTTP 500 with the error text on any exception.
    """
    try:
        limit = request.args.get('limit', 20, type=int)
        limit = max(0, min(limit, 100))
        purchases = []
        all_drug_codes = []
        conn = get_mssql_connection('PM_PRES')
        try:
            cursor = conn.cursor()
            # OTC orders (PRESERIAL = 'V'). TOP (?) limits the rows in SQL
            # instead of fetching every order and slicing afterwards.
            cursor.execute("""
                SELECT TOP (?)
                    m.SL_NO_order,
                    m.SL_DT_appl,
                    m.InsertTime,
                    m.SL_MY_sale,
                    m.SL_NM_custom
                FROM SALE_MAIN m
                WHERE m.SL_CD_custom = ?
                  AND m.PRESERIAL = 'V'
                ORDER BY m.InsertTime DESC
            """, (limit, cus_code))
            orders = [
                {
                    'order_no': row.SL_NO_order,
                    'date': row.SL_DT_appl,
                    'datetime': row.InsertTime.strftime('%Y-%m-%d %H:%M') if row.InsertTime else '',
                    'amount': int(row.SL_MY_sale or 0),
                    'customer_name': row.SL_NM_custom or ''
                }
                for row in cursor.fetchall()
            ]
            # Line items of each order.
            for order in orders:
                cursor.execute("""
                    SELECT
                        s.DrugCode,
                        g.GoodsName,
                        s.SL_NM_item,
                        s.SL_TOTAL_PRICE,
                        mc.PRINT_TYPE
                    FROM SALE_SUB s
                    LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode
                    LEFT JOIN PM_DRUG.dbo.CD_MC mc ON s.DrugCode = mc.DRUGCODE
                    WHERE s.SL_NO_order = ?
                    ORDER BY s.DrugCode
                """, (order['order_no'],))
                items = []
                for item_row in cursor.fetchall():
                    drug_code = item_row.DrugCode or ''
                    all_drug_codes.append(drug_code)
                    items.append({
                        'drug_code': drug_code,
                        'name': item_row.GoodsName or drug_code,
                        'quantity': int(item_row.SL_NM_item or 0),
                        'price': int(item_row.SL_TOTAL_PRICE or 0),
                        'category': item_row.PRINT_TYPE or '',
                        'image': None
                    })
                purchases.append({
                    **order,
                    'items': items,
                    'item_count': len(items)
                })
        finally:
            # Release the MSSQL connection before the SQLite work below.
            conn.close()
        if not purchases:
            # No orders: respond without the summary section.
            return jsonify({
                'success': True,
                'cus_code': cus_code,
                'count': 0,
                'purchases': []
            })
        # Product thumbnails (product_images.db); best effort — a failure is
        # only logged and the items keep 'image': None.
        image_map = {}
        try:
            img_db_path = Path(__file__).parent / 'db' / 'product_images.db'
            # De-duplicate so the IN (...) list stays small; chunking keeps
            # each statement under SQLite's bound-variable limit.
            unique_codes = list(set(all_drug_codes))
            if img_db_path.exists() and unique_codes:
                img_conn = sqlite3.connect(str(img_db_path))
                try:
                    img_cursor = img_conn.cursor()
                    chunk_size = 500
                    for start in range(0, len(unique_codes), chunk_size):
                        chunk = unique_codes[start:start + chunk_size]
                        placeholders = ','.join('?' for _ in chunk)
                        img_cursor.execute(f'''
                            SELECT drug_code, thumbnail_base64
                            FROM product_images
                            WHERE drug_code IN ({placeholders}) AND thumbnail_base64 IS NOT NULL
                        ''', chunk)
                        for code, thumbnail in img_cursor.fetchall():
                            image_map[code] = thumbnail
                finally:
                    img_conn.close()
        except Exception as img_err:
            logging.warning(f"제품 이미지 조회 오류: {img_err}")
        # Attach thumbnails to the items.
        for purchase in purchases:
            for item in purchase['items']:
                if item['drug_code'] in image_map:
                    item['image'] = image_map[item['drug_code']]
        # Summary figures.
        total_amount = sum(p['amount'] for p in purchases)
        total_visits = len(purchases)
        # Per-drug purchase frequency and total quantity.
        item_freq = {}
        for p in purchases:
            for item in p['items']:
                entry = item_freq.setdefault(item['drug_code'], {
                    'name': item['name'],
                    'category': item['category'],
                    'count': 0,
                    'total_qty': 0
                })
                entry['count'] += 1
                entry['total_qty'] += item['quantity']
        # Ten most frequently bought items.
        frequent_items = sorted(item_freq.values(), key=lambda x: x['count'], reverse=True)[:10]
        return jsonify({
            'success': True,
            'cus_code': cus_code,
            'count': len(purchases),
            'summary': {
                'total_visits': total_visits,
                'total_amount': total_amount,
                'frequent_items': frequent_items
            },
            'purchases': purchases
        })
    except Exception as e:
        logging.error(f"환자 OTC 구매 이력 조회 오류: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# PAAI (Pharmacist Assistant AI) API
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/paai/analyze', methods=['POST'])
def paai_analyze():
    """
    PAAI analysis endpoint: log the request, query KIMS for drug
    interactions, ask the AI service for an analysis and return the result.

    Request (JSON object):
        {
            "pre_serial": "P20260304001",
            "cus_code": "00001234",
            "patient_name": "홍길동",
            "disease_info": {
                "code_1": "M170", "name_1": "무릎골관절염",
                "code_2": "K299", "name_2": "상세불명의 위십이지장염"
            },
            "current_medications": [
                {"code": "055101150", "name": "록소프로펜정", "dosage": 1, "frequency": 3, "days": 7}
            ],
            "previous_serial": "P20260225001",
            "previous_medications": [...],
            "otc_history": {...}
        }

    Returns:
        A JSON response with ``success``, ``log_id``, ``analysis``,
        ``kims_summary`` and ``timing``; HTTP 400 when the body is missing,
        empty or not valid JSON; HTTP 500 with the error text on any other
        exception (also recorded on the log row once one exists).
    """
    import json
    import requests as http_requests
    import time as time_module
    from db.paai_logger import create_log, update_kims_result, update_ai_result, update_error
    start_time = time_module.time()
    log_id = None
    try:
        # silent=True returns None instead of raising on a missing or
        # malformed body, so that case is answered with the 400 below.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'success': False, 'error': '요청 데이터가 없습니다.'}), 400
        pre_serial = data.get('pre_serial')
        cus_code = data.get('cus_code')
        patient_name = data.get('patient_name')
        # `or` also covers explicit JSON nulls (for example a patient with no
        # previous prescription), which the plain defaults would let through.
        disease_info = data.get('disease_info') or {}
        current_medications = data.get('current_medications') or []
        previous_serial = data.get('previous_serial')
        previous_medications = data.get('previous_medications') or []
        otc_history = data.get('otc_history') or {}
        # Compare the current prescription with the previous one.
        prescription_changes = analyze_prescription_changes(
            current_medications, previous_medications
        )
        # 1. Create the log row.
        log_id = create_log(
            pre_serial=pre_serial,
            patient_code=cus_code,
            patient_name=patient_name,
            disease_code_1=disease_info.get('code_1'),
            disease_name_1=disease_info.get('name_1'),
            disease_code_2=disease_info.get('code_2'),
            disease_name_2=disease_info.get('name_2'),
            current_medications=current_medications,
            previous_serial=previous_serial,
            previous_medications=previous_medications,
            prescription_changes=prescription_changes,
            otc_history=otc_history
        )
        # 2. Collect KD codes: only 9-character codes are sent to KIMS.
        kd_codes = []
        drug_names = []
        for med in current_medications:
            code = med.get('code', '')
            if code and len(str(code)) == 9:
                kd_codes.append(str(code))
                drug_names.append(med.get('name', ''))
        # 3. KIMS lookup, only when at least two drugs qualify.
        kims_interactions = []
        kims_start = time_module.time()
        if len(kd_codes) >= 2:
            try:
                kims_url = "https://api2.kims.co.kr/api/interaction/info"
                # NOTE(review): the credential below is hard-coded; consider
                # moving it to configuration.
                kims_headers = {
                    'Authorization': 'Basic VFNQTUtSOg==',
                    'Content-Type': 'application/json',
                    'Accept': 'application/json; charset=utf-8'
                }
                kims_payload = {'KDCodes': kd_codes}
                # NOTE(review): verify=False disables TLS certificate
                # verification for this call. Left unchanged because enabling
                # it may break the integration — confirm the server's
                # certificate chain and remove it if possible.
                kims_response = http_requests.get(
                    kims_url,
                    headers=kims_headers,
                    data=json.dumps(kims_payload),
                    timeout=10,
                    verify=False
                )
                if kims_response.status_code == 200:
                    kims_data = kims_response.json()
                    if kims_data.get('Message') == 'SUCCESS':
                        kims_interactions = kims_data.get('InteractionList', [])
            except Exception as kims_err:
                # KIMS is best effort: log and continue without interactions.
                logging.warning(f"KIMS API 오류 (무시하고 계속): {kims_err}")
        kims_time = int((time_module.time() - kims_start) * 1000)
        # Record the KIMS outcome on the log row.
        update_kims_result(
            log_id=log_id,
            kims_drug_codes=kd_codes,
            kims_interactions=kims_interactions,
            kims_response_time_ms=kims_time
        )
        # 4. Build the AI prompt.
        ai_prompt = build_paai_prompt(
            disease_info=disease_info,
            current_medications=current_medications,
            prescription_changes=prescription_changes,
            kims_interactions=kims_interactions,
            otc_history=otc_history
        )
        # 5. Call the Clawdbot AI service (HTTP API).
        ai_start = time_module.time()
        ai_response = call_clawdbot_ai(ai_prompt)
        ai_time = int((time_module.time() - ai_start) * 1000)
        # Record the AI outcome on the log row.
        update_ai_result(
            log_id=log_id,
            ai_prompt=ai_prompt,
            ai_model='claude-sonnet-4',
            ai_response=ai_response,
            ai_response_time_ms=ai_time
        )
        total_time = int((time_module.time() - start_time) * 1000)
        return jsonify({
            'success': True,
            'log_id': log_id,
            'analysis': ai_response,
            'kims_summary': {
                'drug_count': len(kd_codes),
                'interaction_count': len(kims_interactions),
                # Severity 1 or 2 counts as severe.
                'has_severe': any(str(i.get('Severity', '5')) in ['1', '2'] for i in kims_interactions)
            },
            'timing': {
                'kims_ms': kims_time,
                'ai_ms': ai_time,
                'total_ms': total_time
            }
        })
    except Exception as e:
        logging.error(f"PAAI 분석 오류: {e}")
        if log_id:
            update_error(log_id, str(e))
        return jsonify({'success': False, 'error': str(e)}), 500
def analyze_prescription_changes(current: list, previous: list) -> dict:
    """Classify medications as added, removed, changed or unchanged.

    Medications are matched by their ``'code'`` value. A medication present
    in both lists counts as changed when ``dosage``, ``frequency`` or
    ``days`` differs.

    Args:
        current: Medication dicts of the current prescription; ``None`` is
            treated as an empty list.
        previous: Medication dicts of the previous prescription; ``None`` is
            treated as an empty list (for example when there is no earlier
            prescription).

    Returns:
        A dict with the lists ``added``, ``removed``, ``changed`` (each entry
        is ``{'medication': ..., 'changes': [{'field', 'from', 'to'}, ...]}``)
        and ``same``.
    """
    # Index both prescriptions by drug code; tolerate None inputs.
    current_codes = {m.get('code'): m for m in (current or [])}
    previous_codes = {m.get('code'): m for m in (previous or [])}
    added = []
    changed = []
    same = []
    for code, med in current_codes.items():
        if code not in previous_codes:
            added.append(med)
            continue
        # Present in both: record every field whose value differs.
        prev_med = previous_codes[code]
        changes = [
            {'field': field, 'from': prev_med.get(field), 'to': med.get(field)}
            for field in ('dosage', 'frequency', 'days')
            if med.get(field) != prev_med.get(field)
        ]
        if changes:
            changed.append({'medication': med, 'changes': changes})
        else:
            same.append(med)
    # Discontinued: present before, absent now.
    removed = [med for code, med in previous_codes.items() if code not in current_codes]
    return {
        'added': added,
        'removed': removed,
        'changed': changed,
        'same': same
    }
def build_paai_prompt(
    disease_info: dict,
    current_medications: list,
    prescription_changes: dict,
    kims_interactions: list,
    otc_history: dict
) -> str:
    """Build the text prompt sent to the AI service.

    Args:
        disease_info: Dict with optional ``code_1``/``name_1`` and
            ``code_2``/``name_2``.
        current_medications: Medication dicts (``name``, ``dosage``,
            ``frequency``, ``days``).
        prescription_changes: Dict with ``added``, ``removed`` and
            ``changed`` lists, as produced by ``analyze_prescription_changes``.
        kims_interactions: Interaction dicts (``Severity``, ``Drug1Name``,
            ``Drug2Name``, ``InteractionDesc``).
        otc_history: Dict whose ``frequent_items`` list is summarized
            (first five entries).

        Any argument may be ``None``; it is then treated as empty.

    Returns:
        The prompt string, with one section per input and a placeholder line
        for each empty section.
    """
    disease_info = disease_info or {}
    current_medications = current_medications or []
    prescription_changes = prescription_changes or {}
    kims_interactions = kims_interactions or []
    otc_history = otc_history or {}
    # Diseases.
    diseases = []
    if disease_info.get('code_1'):
        diseases.append(f"[{disease_info['code_1']}] {disease_info.get('name_1', '')}")
    if disease_info.get('code_2'):
        diseases.append(f"[{disease_info['code_2']}] {disease_info.get('name_2', '')}")
    # Current prescription.
    med_lines = [
        f"- {med.get('name', '?')}: {med.get('dosage', 0)}× {med.get('frequency', 0)}× {med.get('days', 0)}"
        for med in current_medications
    ]
    # Changes versus the previous prescription.
    change_lines = []
    if prescription_changes.get('added'):
        names = [m.get('name', '?') for m in prescription_changes['added']]
        change_lines.append(f"- 추가: {', '.join(names)}")
    if prescription_changes.get('removed'):
        names = [m.get('name', '?') for m in prescription_changes['removed']]
        change_lines.append(f"- 중단: {', '.join(names)}")
    if prescription_changes.get('changed'):
        for item in prescription_changes['changed']:
            med = item['medication']
            changes = item['changes']
            # The arrow separates old and new value; without it the two
            # values ran together (2 and 3 rendered as "23").
            change_desc = ', '.join([f"{c['field']}: {c['from']} → {c['to']}" for c in changes])
            change_lines.append(f"- 변경: {med.get('name', '?')} ({change_desc})")
    # KIMS interactions.
    severity_labels = {1: '🔴 금기', 2: '🟠 경고', 3: '🟡 주의', 4: '참고', 5: '정보'}
    kims_lines = []
    for inter in kims_interactions:
        try:
            severity_key = int(inter.get('Severity', 5))
        except (TypeError, ValueError):
            # Missing or non-numeric severity: use the lowest level.
            severity_key = 5
        severity_text = severity_labels.get(severity_key, '정보')
        drug1 = inter.get('Drug1Name', '?')
        drug2 = inter.get('Drug2Name', '?')
        # `or ''` guards against an explicit null description.
        desc = (inter.get('InteractionDesc') or '')[:100]
        kims_lines.append(f"- [{severity_text}] {drug1} + {drug2}: {desc}")
    # OTC history: up to five frequently bought items.
    otc_lines = []
    if otc_history.get('frequent_items'):
        for item in otc_history['frequent_items'][:5]:
            otc_lines.append(f"- {item.get('name', '?')} ({item.get('count', 0)}회 구매)")
    prompt = f"""당신은 약사를 보조하는 AI입니다. 환자 정보와 KIMS 상호작용 데이터를 바탕으로 분석해주세요.
## 환자 질병
{chr(10).join(diseases) if diseases else '- 정보 없음'}
## 현재 처방
{chr(10).join(med_lines) if med_lines else '- 정보 없음'}
## 처방 변화 (vs 이전)
{chr(10).join(change_lines) if change_lines else '- 변화 없음'}
## KIMS 약물 상호작용
{chr(10).join(kims_lines) if kims_lines else '- 상호작용 없음'}
## OTC 구매 이력
{chr(10).join(otc_lines) if otc_lines else '- 이력 없음'}
## 요청
다음 형식의 JSON으로 간결하게 답변해주세요:
{{
"prescription_insight": "처방 변화에 대한 분석 (1-2문장)",
"kims_analysis": "KIMS 상호작용 해석 및 임상적 의미 (1-2문장)",
"cautions": ["복용 주의사항 1", "복용 주의사항 2"],
"otc_recommendations": [
{{"product": "추천 OTC명", "reason": "추천 이유 (현재 처방과 상호작용 없음 확인)"}}
],
"counseling_points": ["상담 포인트 1", "상담 포인트 2"]
}}
"""
    return prompt
def call_clawdbot_ai(prompt: str) -> dict:
    """Send ``prompt`` to the Clawdbot HTTP API and return the parsed analysis.

    The reply's ``response`` text is searched for a JSON object. If one is
    found and parses, it is returned. Otherwise a dict with the first 200
    characters of the text in ``prescription_insight`` and empty remaining
    fields is returned.

    Args:
        prompt: Prompt text to send.

    Returns:
        The analysis dict, or the result of ``generate_fallback_response``
        when the service cannot be reached, answers with a non-200 status or
        any other error occurs. This function does not raise for those cases.
    """
    import json
    import re
    import requests as http_requests
    try:
        # Call the Clawdbot gateway; the HTTP timeout is slightly longer than
        # the timeout value passed in the request body.
        response = http_requests.post(
            'http://localhost:8765/api/chat',
            json={
                'message': prompt,
                'session': 'paai-analysis',
                'timeout': 60
            },
            timeout=65
        )
        if response.status_code != 200:
            raise Exception(f"Clawdbot API 오류: {response.status_code}")
        result = response.json()
        ai_text = result.get('response', '')
        # Try to pull a JSON object out of the reply text.
        try:
            json_match = re.search(r'\{[\s\S]*\}', ai_text)
            if json_match:
                return json.loads(json_match.group())
        except (TypeError, ValueError):
            # TypeError: reply text is not a string. ValueError covers
            # json.JSONDecodeError. Either way fall through to the plain
            # text result below.
            pass
        # No parsable JSON: return the raw text (truncated) instead.
        return {
            'prescription_insight': ai_text[:200] if ai_text else '분석 결과 없음',
            'kims_analysis': '',
            'cautions': [],
            'otc_recommendations': [],
            'counseling_points': []
        }
    except http_requests.exceptions.ConnectionError:
        # Service unreachable: answer with the canned fallback.
        return generate_fallback_response(prompt)
    except Exception as e:
        logging.error(f"Clawdbot AI 호출 오류: {e}")
        return generate_fallback_response(prompt)
def generate_fallback_response(prompt: str) -> dict:
    """Return the canned analysis used when the AI service is unavailable.

    Args:
        prompt: Accepted for interface compatibility; not used.

    Returns:
        A dict with the usual analysis keys plus ``'_fallback': True``.
    """
    fallback = dict(
        prescription_insight='AI 분석 서비스에 연결할 수 없습니다. KIMS 상호작용 정보를 직접 확인해주세요.',
        kims_analysis='',
        cautions=['AI 분석 불가 - 직접 검토 필요'],
        otc_recommendations=[],
        counseling_points=[],
    )
    # Marker that distinguishes this canned answer from a real analysis.
    fallback['_fallback'] = True
    return fallback
@pmr_bp.route('/api/paai/feedback', methods=['POST'])
def paai_feedback():
    """
    Store user feedback for a PAAI analysis.

    Request Body (JSON object):
        - log_id: id of the analysis log row (required)
        - useful: feedback flag, passed through to ``update_feedback``
        - comment: free-text comment, passed through to ``update_feedback``

    Returns:
        ``{'success': True}``; HTTP 400 when the body is missing, not a JSON
        object, or has no ``log_id``; HTTP 500 with the error text on any
        other exception.
    """
    from db.paai_logger import update_feedback
    try:
        # silent=True returns None instead of raising on a missing or
        # malformed body, so that case is a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': '요청 데이터가 없습니다.'}), 400
        log_id = data.get('log_id')
        useful = data.get('useful')
        comment = data.get('comment')
        if not log_id:
            return jsonify({'success': False, 'error': 'log_id 필요'}), 400
        update_feedback(log_id, useful, comment)
        return jsonify({'success': True})
    except Exception as e:
        logging.error(f"PAAI 피드백 저장 오류: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500