pharmacy-pos-qr-system/backend/pmr_api.py
thug0bin cb90d4a7a6 fix: 처방목록 조회 기준을 발행일에서 조제일로 변경
문제:
- PMR 처방 목록이 PassDay(처방전 발행일) 기준으로 조회되어
  발행일과 조제일이 다른 처방(예: 3일 전 발행, 오늘 조제)이
  오늘 목록에 표시되지 않는 버그

해결:
- PS_MAIN 테이블 조회 시 PassDay 대신 Indate(조제일) 기준으로 변경
- issue_date(발행일), dispense_date(조제일) 필드 추가로 구분 명확화

추가 변경:
- WebSocket 연결/해제 시 토스트 알림 추가
- WebSocket 프록시 트러블슈팅 문서 추가 (NPM 설정 가이드)
2026-03-05 10:56:24 +09:00

1437 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# pmr_api.py - 조제관리(PMR) Blueprint API
# PharmaIT3000 MSSQL 연동 (192.168.0.4)
from flask import Blueprint, jsonify, request, render_template, send_file
import pyodbc
import sqlite3
from pathlib import Path
from datetime import datetime, date
import logging
from PIL import Image, ImageDraw, ImageFont
import io
import base64
import os
pmr_bp = Blueprint('pmr', __name__, url_prefix='/pmr')
# ─────────────────────────────────────────────────────────────
# MSSQL 연결 설정 (PharmaIT3000 - 192.168.0.4)
# ─────────────────────────────────────────────────────────────
MSSQL_CONFIG = {
'server': '192.168.0.4\\PM2014',
'username': 'sa',
'password': 'tmddls214!%(',
'driver': 'ODBC Driver 17 for SQL Server'
}
def get_mssql_connection(database='PM_PRES'):
"""MSSQL 연결 획득"""
conn_str = (
f"DRIVER={{{MSSQL_CONFIG['driver']}}};"
f"SERVER={MSSQL_CONFIG['server']};"
f"DATABASE={database};"
f"UID={MSSQL_CONFIG['username']};"
f"PWD={MSSQL_CONFIG['password']};"
"TrustServerCertificate=yes;"
"Connection Timeout=10"
)
return pyodbc.connect(conn_str, timeout=10)
# ─────────────────────────────────────────────────────────────
# 조제관리 페이지
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/')
def pmr_index():
"""조제관리 메인 페이지"""
return render_template('pmr.html')
# ─────────────────────────────────────────────────────────────
# API: 날짜별 처방전 목록
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/prescriptions', methods=['GET'])
def get_prescriptions_by_date():
"""
날짜별 처방전 목록 조회
Query Params:
- date: YYYY-MM-DD (기본값: 오늘)
"""
try:
date_str = request.args.get('date', date.today().strftime('%Y-%m-%d'))
# YYYYMMDD 형식으로 변환
date_yyyymmdd = date_str.replace('-', '')
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
cursor.execute("""
SELECT
PreSerial,
Day_Serial,
PassDay,
Indate,
Paname,
PaNum,
CusCode,
InsName,
Drname,
PresTime,
PreGubun,
PRICE_T,
PRICE_P,
PRICE_C
FROM PS_MAIN
WHERE Indate = ?
ORDER BY Day_Serial ASC
""", (date_yyyymmdd,))
prescriptions = []
for row in cursor.fetchall():
# 주민번호에서 나이/성별 추출
panum = row.PaNum or ''
age = None
gender = None
if len(panum) >= 7:
try:
birth_year = int(panum[:2])
gender_code = panum[6] if len(panum) > 6 else ''
# 성별 및 세기 판단
if gender_code in ['1', '2', '5', '6']:
birth_year += 1900
elif gender_code in ['3', '4', '7', '8']:
birth_year += 2000
else:
birth_year += 1900
gender = '' if gender_code in ['1', '3', '5', '7'] else ''
age = datetime.now().year - birth_year
except:
pass
prescriptions.append({
'prescription_id': row.PreSerial,
'order_number': row.Day_Serial,
'date': row.Indate, # 조제일 기준
'issue_date': row.PassDay, # 처방전 발행일
'dispense_date': row.Indate, # 조제일
'patient_name': row.Paname,
'patient_id': row.PaNum[:6] + '******' if row.PaNum and len(row.PaNum) > 6 else row.PaNum,
'patient_code': row.CusCode,
'hospital': row.InsName,
'doctor': row.Drname,
'time': row.PresTime,
'type': '급여' if row.PreGubun == '0' else '비급여' if row.PreGubun == '9' else row.PreGubun,
'age': age,
'gender': gender,
'price_total': row.PRICE_T,
'price_patient': row.PRICE_P,
'price_claim': row.PRICE_C
})
conn.close()
return jsonify({
'success': True,
'date': date_str,
'count': len(prescriptions),
'prescriptions': prescriptions
})
except Exception as e:
logging.error(f"PMR 처방전 목록 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# API: 처방전 상세 (약품 목록)
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/prescription/<prescription_id>', methods=['GET'])
def get_prescription_detail(prescription_id):
"""
처방전 상세 정보 (약품 목록 포함)
"""
try:
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
# 처방전 기본 정보 + 질병정보
cursor.execute("""
SELECT
M.PreSerial,
M.Day_Serial,
M.PassDay,
M.Paname,
M.PaNum,
M.CusCode,
M.InsName,
M.Drname,
M.PresTime,
M.PreGubun,
M.PRICE_T,
M.PRICE_P,
M.PRICE_C,
M.St1,
M.St2,
S1.AU_NAME,
S2.AU_NAME
FROM PS_MAIN M
LEFT JOIN PM_BASE.dbo.CD_SANG S1 ON M.St1 = S1.AU_CODE
LEFT JOIN PM_BASE.dbo.CD_SANG S2 ON M.St2 = S2.AU_CODE
WHERE M.PreSerial = ?
""", (prescription_id,))
rx_row = cursor.fetchone()
if not rx_row:
conn.close()
return jsonify({'success': False, 'error': '처방전을 찾을 수 없습니다'}), 404
# 처방 약품 목록 (PS_sub_pharm + CD_GOODS + CD_MC JOIN)
medications = []
cursor.execute("""
SELECT
s.DrugCode,
s.Days,
s.QUAN,
s.QUAN_TIME,
s.PS_Type,
s.INV_QUAN,
g.GoodsName,
g.SUNG_CODE,
m.PRINT_TYPE,
m.SIM_EFFECT
FROM PS_sub_pharm s
LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode
LEFT JOIN PM_DRUG.dbo.CD_MC m ON s.DrugCode = m.DRUGCODE
WHERE s.PreSerial = ?
ORDER BY s.SUB_SERIAL
""", (prescription_id,))
for row in cursor.fetchall():
# 효능: PRINT_TYPE > SIM_EFFECT > 없음
add_info = row.PRINT_TYPE or row.SIM_EFFECT or ''
medications.append({
'medication_code': row.DrugCode or '',
'med_name': row.GoodsName or row.DrugCode or '',
'add_info': add_info,
'dosage': float(row.QUAN) if row.QUAN else 0,
'frequency': row.QUAN_TIME or 0,
'duration': row.Days or 0,
'total_qty': float(row.INV_QUAN) if row.INV_QUAN else 0,
'type': '급여' if row.PS_Type in ['0', '4'] else '비급여' if row.PS_Type == '1' else row.PS_Type,
'sung_code': row.SUNG_CODE or ''
})
# 나이/성별 계산
panum = rx_row.PaNum or ''
age = None
gender = None
if len(panum) >= 7:
try:
birth_year = int(panum[:2])
gender_code = panum[6]
if gender_code in ['1', '2', '5', '6']:
birth_year += 1900
elif gender_code in ['3', '4', '7', '8']:
birth_year += 2000
gender = '' if gender_code in ['1', '3', '5', '7'] else ''
age = datetime.now().year - birth_year
except:
pass
# 질병 정보 (St1, St2 → AU_NAME)
disease_info = None
st1 = rx_row[13] or '' # St1
st2 = rx_row[14] or '' # St2
disease_name_1 = rx_row[15] or '' # S1.AU_NAME
disease_name_2 = rx_row[16] or '' # S2.AU_NAME
if st1 or st2:
disease_info = {
'code_1': st1,
'code_2': st2,
'name_1': disease_name_1,
'name_2': disease_name_2
}
# 환자 특이사항(CUSETC) 조회 - CD_PERSON 테이블
cusetc = ''
cus_code = rx_row.CusCode
if cus_code:
try:
# PM_BASE.dbo.CD_PERSON에서 조회
cursor.execute("""
SELECT CUSETC FROM PM_BASE.dbo.CD_PERSON WHERE CUSCODE = ?
""", (cus_code,))
person_row = cursor.fetchone()
if person_row and person_row.CUSETC:
cusetc = person_row.CUSETC
except Exception as e:
logging.warning(f"특이사항 조회 실패: {e}")
conn.close()
return jsonify({
'success': True,
'prescription': {
'prescription_id': rx_row.PreSerial,
'order_number': rx_row.Day_Serial,
'date': rx_row.PassDay,
'time': rx_row.PresTime,
'hospital': rx_row.InsName,
'doctor': rx_row.Drname,
'type': '급여' if rx_row.PreGubun == '0' else '비급여',
'price_total': rx_row.PRICE_T,
'price_patient': rx_row.PRICE_P
},
'patient': {
'name': rx_row.Paname,
'code': rx_row.CusCode,
'cus_code': rx_row.CusCode, # 호환성
'age': age,
'gender': gender,
'cusetc': cusetc # 특이사항
},
'disease_info': disease_info,
'medications': medications,
'medication_count': len(medications)
})
except Exception as e:
logging.error(f"PMR 처방전 상세 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# API: 통계 (당일 요약)
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/stats', methods=['GET'])
def get_daily_stats():
"""당일 조제 통계"""
try:
date_str = request.args.get('date', date.today().strftime('%Y-%m-%d'))
date_yyyymmdd = date_str.replace('-', '')
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
# 처방전 수
cursor.execute("""
SELECT COUNT(*) as cnt
FROM PS_MAIN
WHERE PassDay = ?
""", (date_yyyymmdd,))
total_prescriptions = cursor.fetchone()[0]
# 총 금액
cursor.execute("""
SELECT
ISNULL(SUM(PRICE_T), 0) as total,
ISNULL(SUM(PRICE_P), 0) as patient,
ISNULL(SUM(PRICE_C), 0) as claim
FROM PS_MAIN
WHERE PassDay = ?
""", (date_yyyymmdd,))
price_row = cursor.fetchone()
conn.close()
return jsonify({
'success': True,
'date': date_str,
'stats': {
'total_prescriptions': total_prescriptions,
'total_amount': price_row[0] if price_row else 0,
'patient_amount': price_row[1] if price_row else 0,
'claim_amount': price_row[2] if price_row else 0
}
})
except Exception as e:
logging.error(f"PMR 통계 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# API: DB 연결 테스트
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/test', methods=['GET'])
def test_connection():
"""DB 연결 테스트"""
try:
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
cursor.execute("SELECT @@VERSION")
version = cursor.fetchone()[0]
conn.close()
return jsonify({
'success': True,
'server': MSSQL_CONFIG['server'],
'version': version[:100] + '...'
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# API: 라벨 미리보기
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/label/preview', methods=['POST'])
def preview_label():
"""
라벨 미리보기 (PIL 렌더링 → Base64 이미지)
Request Body:
- patient_name: 환자명
- med_name: 약품명
- dosage: 1회 복용량
- frequency: 복용 횟수
- duration: 복용 일수
- unit: 단위 (정, 캡슐, mL 등)
"""
try:
data = request.get_json()
patient_name = data.get('patient_name', '')
med_name = data.get('med_name', '')
add_info = data.get('add_info', '')
dosage = float(data.get('dosage', 0))
frequency = int(data.get('frequency', 0))
duration = int(data.get('duration', 0))
unit = data.get('unit', '')
# 라벨 이미지 생성
image = create_label_image(
patient_name=patient_name,
med_name=med_name,
add_info=add_info,
dosage=dosage,
frequency=frequency,
duration=duration,
unit=unit
)
# Base64 인코딩
buffer = io.BytesIO()
image.save(buffer, format='PNG')
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
return jsonify({
'success': True,
'image': f'data:image/png;base64,{img_base64}'
})
except Exception as e:
logging.error(f"라벨 미리보기 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
def create_label_image(patient_name, med_name, add_info='', dosage=0, frequency=0, duration=0, unit=''):
"""
라벨 이미지 생성 (29mm 용지 기준)
"""
# 라벨 크기 (29mm 용지, 300dpi 기준)
label_width = 306
label_height = 380
image = Image.new("RGB", (label_width, label_height), "white")
draw = ImageDraw.Draw(image)
# 폰트 설정 (Windows 경로)
font_path = "C:/Windows/Fonts/malgunbd.ttf"
if not os.path.exists(font_path):
font_path = "C:/Windows/Fonts/malgun.ttf"
try:
name_font = ImageFont.truetype(font_path, 36)
drug_font = ImageFont.truetype(font_path, 24)
info_font = ImageFont.truetype(font_path, 22)
small_font = ImageFont.truetype(font_path, 18)
except:
name_font = ImageFont.load_default()
drug_font = ImageFont.load_default()
info_font = ImageFont.load_default()
small_font = ImageFont.load_default()
# 중앙 정렬 텍스트 함수
def draw_centered(text, y, font, fill="black"):
bbox = draw.textbbox((0, 0), text, font=font)
w = bbox[2] - bbox[0]
x = (label_width - w) // 2
draw.text((x, y), text, font=font, fill=fill)
return y + bbox[3] - bbox[1] + 5
# 약품명 줄바꿈 처리
def wrap_text(text, font, max_width):
lines = []
words = text.split()
current_line = ""
for word in words:
test_line = f"{current_line} {word}".strip()
bbox = draw.textbbox((0, 0), test_line, font=font)
if bbox[2] - bbox[0] <= max_width:
current_line = test_line
else:
if current_line:
lines.append(current_line)
current_line = word
if current_line:
lines.append(current_line)
return lines if lines else [text]
y = 15
# 환자명 (띄어쓰기)
spaced_name = " ".join(patient_name) if patient_name else ""
y = draw_centered(spaced_name, y, name_font)
y += 5
# 약품명 (줄바꿈)
# 괄호 앞에서 분리
if '(' in med_name:
main_name = med_name.split('(')[0].strip()
else:
main_name = med_name
# 약품명 줄바꿈
name_lines = wrap_text(main_name, drug_font, label_width - 30)
for line in name_lines:
y = draw_centered(line, y, drug_font)
# 효능효과 (add_info)
if add_info:
y = draw_centered(f"({add_info})", y, small_font, fill="gray")
y += 5
# 총량 계산
if dosage > 0 and frequency > 0 and duration > 0:
total = dosage * frequency * duration
total_str = str(int(total)) if total == int(total) else f"{total:.1f}"
total_text = f"{total_str}{unit} / {duration}일분"
y = draw_centered(total_text, y, info_font)
y += 5
# 용법 박스
box_margin = 20
box_top = y
box_bottom = y + 70
draw.rectangle(
[(box_margin, box_top), (label_width - box_margin, box_bottom)],
outline="black",
width=2
)
# 박스 내용
dosage_str = str(int(dosage)) if dosage == int(dosage) else f"{dosage:.2f}".rstrip('0').rstrip('.')
dosage_text = f"{dosage_str}{unit}"
# 복용 시간
if frequency == 1:
time_text = "아침"
elif frequency == 2:
time_text = "아침, 저녁"
elif frequency == 3:
time_text = "아침, 점심, 저녁"
else:
time_text = f"1일 {frequency}"
box_center_y = (box_top + box_bottom) // 2
draw_centered(dosage_text, box_center_y - 20, info_font)
draw_centered(time_text, box_center_y + 5, info_font)
y = box_bottom + 10
# 조제일
today = datetime.now().strftime('%Y-%m-%d')
y = draw_centered(f"조제일: {today}", y, small_font)
# 약국명 (하단)
pharmacy_y = label_height - 40
draw.rectangle(
[(50, pharmacy_y - 5), (label_width - 50, pharmacy_y + 25)],
outline="black",
width=1
)
draw_centered("청 춘 약 국", pharmacy_y, info_font)
return image
# ─────────────────────────────────────────────────────────────
# API: 환자 이전 처방 이력
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/patient/<cus_code>/history', methods=['GET'])
def get_patient_history(cus_code):
"""
환자 이전 처방 이력 조회
Args:
cus_code: 환자 고유코드 (CusCode)
Query Params:
- limit: 최대 조회 건수 (기본 10, 최대 50)
- exclude: 제외할 처방번호 (현재 처방)
"""
try:
limit = min(int(request.args.get('limit', 10)), 50)
exclude_serial = request.args.get('exclude', '')
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
# 이전 처방 목록 조회
query = """
SELECT TOP (?)
m.PreSerial,
m.PassDay,
m.PresTime,
m.Paname,
m.InsName,
m.Drname,
m.PRICE_P,
(SELECT COUNT(*) FROM PS_sub_pharm WHERE PreSerial = m.PreSerial) as drug_count
FROM PS_MAIN m
WHERE m.CusCode = ?
"""
params = [limit, cus_code]
if exclude_serial:
query += " AND m.PreSerial != ?"
params.append(exclude_serial)
query += " ORDER BY m.PassDay DESC, m.PresTime DESC"
cursor.execute(query, params)
history = []
for row in cursor.fetchall():
pre_serial = row.PreSerial
# 해당 처방의 약품 목록 조회
cursor.execute("""
SELECT
s.DrugCode,
s.Days,
s.QUAN,
s.QUAN_TIME,
g.GoodsName,
m.PRINT_TYPE
FROM PS_sub_pharm s
LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode
LEFT JOIN PM_DRUG.dbo.CD_MC m ON s.DrugCode = m.DRUGCODE
WHERE s.PreSerial = ?
ORDER BY s.SUB_SERIAL
""", (pre_serial,))
medications = []
for med_row in cursor.fetchall():
medications.append({
'medication_code': med_row.DrugCode or '',
'med_name': med_row.GoodsName or med_row.DrugCode or '',
'add_info': med_row.PRINT_TYPE or '',
'dosage': float(med_row.QUAN) if med_row.QUAN else 0,
'frequency': med_row.QUAN_TIME or 0,
'duration': med_row.Days or 0
})
# 날짜 포맷
pass_day = row.PassDay
if pass_day and len(pass_day) == 8:
date_formatted = f"{pass_day[:4]}-{pass_day[4:6]}-{pass_day[6:8]}"
else:
date_formatted = pass_day or ''
history.append({
'prescription_id': pre_serial,
'date': date_formatted,
'time': row.PresTime or '',
'patient_name': row.Paname or '',
'hospital': row.InsName or '',
'doctor': row.Drname or '',
'copayment': int(row.PRICE_P or 0),
'medication_count': row.drug_count or 0,
'medications': medications
})
conn.close()
return jsonify({
'success': True,
'cus_code': cus_code,
'count': len(history),
'history': history
})
except Exception as e:
logging.error(f"환자 이전 처방 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# API: 환자 OTC 구매 이력
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/patient/<cus_code>/otc', methods=['GET'])
def get_patient_otc_history(cus_code):
"""
환자 OTC (일반의약품) 구매 이력 조회
Args:
cus_code: 환자 고유코드 (CusCode = SL_CD_custom)
Query Params:
- limit: 최대 조회 건수 (기본 20, 최대 100)
"""
try:
limit = min(int(request.args.get('limit', 20)), 100)
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
# OTC 거래 목록 조회 (PRESERIAL = 'V' = OTC 판매)
cursor.execute("""
SELECT
m.SL_NO_order,
m.SL_DT_appl,
m.InsertTime,
m.SL_MY_sale,
m.SL_NM_custom
FROM SALE_MAIN m
WHERE m.SL_CD_custom = ?
AND m.PRESERIAL = 'V'
ORDER BY m.InsertTime DESC
""", (cus_code,))
# 먼저 거래 목록 수집
orders = []
for row in cursor.fetchall():
orders.append({
'order_no': row.SL_NO_order,
'date': row.SL_DT_appl,
'datetime': row.InsertTime.strftime('%Y-%m-%d %H:%M') if row.InsertTime else '',
'amount': int(row.SL_MY_sale or 0),
'customer_name': row.SL_NM_custom or ''
})
# 최근 limit개만
orders = orders[:limit]
if not orders:
conn.close()
return jsonify({
'success': True,
'cus_code': cus_code,
'count': 0,
'purchases': []
})
# 각 거래의 품목 조회
purchases = []
all_drug_codes = []
for order in orders:
cursor.execute("""
SELECT
s.DrugCode,
g.GoodsName,
s.SL_NM_item,
s.SL_TOTAL_PRICE,
mc.PRINT_TYPE
FROM SALE_SUB s
LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode
LEFT JOIN PM_DRUG.dbo.CD_MC mc ON s.DrugCode = mc.DRUGCODE
WHERE s.SL_NO_order = ?
ORDER BY s.DrugCode
""", (order['order_no'],))
items = []
for item_row in cursor.fetchall():
drug_code = item_row.DrugCode or ''
all_drug_codes.append(drug_code)
items.append({
'drug_code': drug_code,
'name': item_row.GoodsName or drug_code,
'quantity': int(item_row.SL_NM_item or 0),
'price': int(item_row.SL_TOTAL_PRICE or 0),
'category': item_row.PRINT_TYPE or '',
'image': None
})
purchases.append({
**order,
'items': items,
'item_count': len(items)
})
conn.close()
# 제품 이미지 조회 (product_images.db)
image_map = {}
try:
img_db_path = Path(__file__).parent / 'db' / 'product_images.db'
if img_db_path.exists() and all_drug_codes:
img_conn = sqlite3.connect(str(img_db_path))
img_cursor = img_conn.cursor()
placeholders = ','.join(['?' for _ in all_drug_codes])
img_cursor.execute(f'''
SELECT drug_code, thumbnail_base64
FROM product_images
WHERE drug_code IN ({placeholders}) AND thumbnail_base64 IS NOT NULL
''', all_drug_codes)
for row in img_cursor.fetchall():
image_map[row[0]] = row[1]
img_conn.close()
except Exception as img_err:
logging.warning(f"제품 이미지 조회 오류: {img_err}")
# 이미지 매핑
for purchase in purchases:
for item in purchase['items']:
if item['drug_code'] in image_map:
item['image'] = image_map[item['drug_code']]
# 통계 계산
total_amount = sum(p['amount'] for p in purchases)
total_visits = len(purchases)
# 자주 구매하는 품목 집계
item_freq = {}
for p in purchases:
for item in p['items']:
key = item['drug_code']
if key not in item_freq:
item_freq[key] = {
'name': item['name'],
'category': item['category'],
'count': 0,
'total_qty': 0
}
item_freq[key]['count'] += 1
item_freq[key]['total_qty'] += item['quantity']
# 빈도순 정렬
frequent_items = sorted(item_freq.values(), key=lambda x: x['count'], reverse=True)[:10]
return jsonify({
'success': True,
'cus_code': cus_code,
'count': len(purchases),
'summary': {
'total_visits': total_visits,
'total_amount': total_amount,
'frequent_items': frequent_items
},
'purchases': purchases
})
except Exception as e:
logging.error(f"환자 OTC 구매 이력 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# PAAI (Pharmacist Assistant AI) API
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/paai/result/<pre_serial>', methods=['GET'])
def paai_get_result(pre_serial):
"""
PAAI 분석 결과 조회 (캐시)
Response:
- 결과 있음: {exists: true, status: "success", result: {...}, log_id: 123}
- 생성 중: {exists: true, status: "generating", log_id: 123}
- 없음: {exists: false}
"""
import sqlite3
import json
from pathlib import Path
try:
db_path = Path(__file__).parent / 'db' / 'paai_logs.db'
if not db_path.exists():
return jsonify({'exists': False})
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 해당 처방의 최신 성공 로그 조회
cursor.execute('''
SELECT id, status, created_at, patient_name,
disease_name_1, disease_name_2,
current_med_count, kims_interaction_count, kims_has_severe,
kims_response_time_ms, ai_response_time_ms,
ai_response
FROM paai_logs
WHERE pre_serial = ?
ORDER BY id DESC
LIMIT 1
''', (pre_serial,))
row = cursor.fetchone()
conn.close()
if not row:
return jsonify({'exists': False})
status = row['status']
# 생성 중 상태
if status in ('pending', 'kims_done', 'generating'):
return jsonify({
'exists': True,
'status': 'generating',
'log_id': row['id']
})
# 에러 상태
if status == 'error':
return jsonify({
'exists': True,
'status': 'error',
'log_id': row['id']
})
# 성공 상태 - 전체 결과 반환
ai_response = {}
if row['ai_response']:
try:
ai_response = json.loads(row['ai_response'])
except:
pass
return jsonify({
'exists': True,
'status': 'success',
'log_id': row['id'],
'result': {
'created_at': row['created_at'],
'patient_name': row['patient_name'],
'disease_name_1': row['disease_name_1'],
'disease_name_2': row['disease_name_2'],
'medication_count': row['current_med_count'],
'kims_summary': {
'interaction_count': row['kims_interaction_count'],
'has_severe': bool(row['kims_has_severe'])
},
'timing': {
'kims_ms': row['kims_response_time_ms'],
'ai_ms': row['ai_response_time_ms'],
'total_ms': (row['kims_response_time_ms'] or 0) + (row['ai_response_time_ms'] or 0)
},
'analysis': ai_response
}
})
except Exception as e:
logging.error(f"PAAI 결과 조회 오류: {e}")
return jsonify({'exists': False, 'error': str(e)})
@pmr_bp.route('/api/paai/analyze', methods=['POST'])
def paai_analyze():
"""
PAAI 분석 API
Request:
{
"pre_serial": "P20260304001",
"cus_code": "00001234",
"patient_name": "홍길동",
"disease_info": {
"code_1": "M170", "name_1": "무릎골관절염",
"code_2": "K299", "name_2": "상세불명의 위십이지장염"
},
"current_medications": [
{"code": "055101150", "name": "록소프로펜정", "dosage": 1, "frequency": 3, "days": 7}
],
"previous_serial": "P20260225001",
"previous_medications": [...],
"otc_history": {...}
}
"""
import requests as http_requests
import time as time_module
from db.paai_logger import create_log, update_kims_result, update_ai_result, update_error
start_time = time_module.time()
log_id = None
try:
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': '요청 데이터가 없습니다.'}), 400
pre_serial = data.get('pre_serial')
cus_code = data.get('cus_code')
patient_name = data.get('patient_name')
disease_info = data.get('disease_info', {})
current_medications = data.get('current_medications', [])
previous_serial = data.get('previous_serial')
previous_medications = data.get('previous_medications', [])
otc_history = data.get('otc_history', {})
# 처방 변화 분석
prescription_changes = analyze_prescription_changes(
current_medications, previous_medications
)
# 1. 로그 생성
log_id = create_log(
pre_serial=pre_serial,
patient_code=cus_code,
patient_name=patient_name,
disease_code_1=disease_info.get('code_1'),
disease_name_1=disease_info.get('name_1'),
disease_code_2=disease_info.get('code_2'),
disease_name_2=disease_info.get('name_2'),
current_medications=current_medications,
previous_serial=previous_serial,
previous_medications=previous_medications,
prescription_changes=prescription_changes,
otc_history=otc_history
)
# 2. KD코드 추출 (9자리 KIMS 코드)
kd_codes = []
drug_names = []
for med in current_medications:
code = med.get('code', '')
if code and len(str(code)) == 9:
kd_codes.append(str(code))
drug_names.append(med.get('name', ''))
# 3. KIMS API 호출 (약품 2개 이상인 경우)
kims_interactions = []
kims_start = time_module.time()
if len(kd_codes) >= 2:
try:
kims_url = "https://api2.kims.co.kr/api/interaction/info"
kims_headers = {
'Authorization': 'Basic VFNQTUtSOg==',
'Content-Type': 'application/json',
'Accept': 'application/json; charset=utf-8'
}
kims_payload = {'KDCodes': kd_codes}
kims_response = http_requests.get(
kims_url,
headers=kims_headers,
data=__import__('json').dumps(kims_payload),
timeout=10,
verify=False
)
if kims_response.status_code == 200:
kims_data = kims_response.json()
if kims_data.get('Message') == 'SUCCESS':
kims_interactions = kims_data.get('InteractionList', [])
except Exception as kims_err:
logging.warning(f"KIMS API 오류 (무시하고 계속): {kims_err}")
kims_time = int((time_module.time() - kims_start) * 1000)
# KIMS 결과 로그 업데이트
update_kims_result(
log_id=log_id,
kims_drug_codes=kd_codes,
kims_interactions=kims_interactions,
kims_response_time_ms=kims_time
)
# 4. AI 프롬프트 생성
ai_prompt = build_paai_prompt(
disease_info=disease_info,
current_medications=current_medications,
prescription_changes=prescription_changes,
kims_interactions=kims_interactions,
otc_history=otc_history
)
# 5. Clawdbot AI 호출 (WebSocket)
ai_start = time_module.time()
ai_response = call_clawdbot_ai(ai_prompt)
ai_time = int((time_module.time() - ai_start) * 1000)
# AI 결과 로그 업데이트
update_ai_result(
log_id=log_id,
ai_prompt=ai_prompt,
ai_model='claude-sonnet-4',
ai_response=ai_response,
ai_response_time_ms=ai_time
)
total_time = int((time_module.time() - start_time) * 1000)
return jsonify({
'success': True,
'log_id': log_id,
'analysis': ai_response,
'kims_summary': {
'drug_count': len(kd_codes),
'interaction_count': len(kims_interactions),
'has_severe': any(str(i.get('Severity', '5')) in ['1', '2'] for i in kims_interactions)
},
'timing': {
'kims_ms': kims_time,
'ai_ms': ai_time,
'total_ms': total_time
}
})
except Exception as e:
logging.error(f"PAAI 분석 오류: {e}")
if log_id:
update_error(log_id, str(e))
return jsonify({'success': False, 'error': str(e)}), 500
def analyze_prescription_changes(current: list, previous: list) -> dict:
"""처방 변화 분석"""
current_codes = {m.get('code'): m for m in current}
previous_codes = {m.get('code'): m for m in previous}
added = []
removed = []
changed = []
same = []
# 추가된 약품
for code, med in current_codes.items():
if code not in previous_codes:
added.append(med)
else:
# 변경 여부 확인
prev_med = previous_codes[code]
changes = []
for field in ['dosage', 'frequency', 'days']:
if med.get(field) != prev_med.get(field):
changes.append({
'field': field,
'from': prev_med.get(field),
'to': med.get(field)
})
if changes:
changed.append({'medication': med, 'changes': changes})
else:
same.append(med)
# 중단된 약품
for code, med in previous_codes.items():
if code not in current_codes:
removed.append(med)
return {
'added': added,
'removed': removed,
'changed': changed,
'same': same
}
def build_paai_prompt(
disease_info: dict,
current_medications: list,
prescription_changes: dict,
kims_interactions: list,
otc_history: dict
) -> str:
"""AI 프롬프트 생성"""
# 질병 정보
diseases = []
if disease_info.get('code_1'):
diseases.append(f"[{disease_info['code_1']}] {disease_info.get('name_1', '')}")
if disease_info.get('code_2'):
diseases.append(f"[{disease_info['code_2']}] {disease_info.get('name_2', '')}")
# 현재 처방
med_lines = []
for med in current_medications:
line = f"- {med.get('name', '?')}: {med.get('dosage', 0)}× {med.get('frequency', 0)}× {med.get('days', 0)}"
med_lines.append(line)
# 처방 변화
change_lines = []
if prescription_changes.get('added'):
names = [m.get('name', '?') for m in prescription_changes['added']]
change_lines.append(f"- 추가: {', '.join(names)}")
if prescription_changes.get('removed'):
names = [m.get('name', '?') for m in prescription_changes['removed']]
change_lines.append(f"- 중단: {', '.join(names)}")
if prescription_changes.get('changed'):
for item in prescription_changes['changed']:
med = item['medication']
changes = item['changes']
change_desc = ', '.join([f"{c['field']}: {c['from']}{c['to']}" for c in changes])
change_lines.append(f"- 변경: {med.get('name', '?')} ({change_desc})")
# KIMS 상호작용
kims_lines = []
for inter in kims_interactions:
severity = inter.get('Severity', 5)
severity_text = {1: '🔴 금기', 2: '🟠 경고', 3: '🟡 주의', 4: '참고', 5: '정보'}.get(int(severity), '정보')
drug1 = inter.get('Drug1Name', '?')
drug2 = inter.get('Drug2Name', '?')
desc = inter.get('InteractionDesc', '')[:100]
kims_lines.append(f"- [{severity_text}] {drug1} + {drug2}: {desc}")
# OTC 이력
otc_lines = []
if otc_history.get('frequent_items'):
for item in otc_history['frequent_items'][:5]:
otc_lines.append(f"- {item.get('name', '?')} ({item.get('count', 0)}회 구매)")
prompt = f"""당신은 약사를 보조하는 AI입니다. 환자 정보와 KIMS 상호작용 데이터를 바탕으로 분석해주세요.
## 환자 질병
{chr(10).join(diseases) if diseases else '- 정보 없음'}
## 현재 처방
{chr(10).join(med_lines) if med_lines else '- 정보 없음'}
## 처방 변화 (vs 이전)
{chr(10).join(change_lines) if change_lines else '- 변화 없음'}
## KIMS 약물 상호작용
{chr(10).join(kims_lines) if kims_lines else '- 상호작용 없음'}
## OTC 구매 이력
{chr(10).join(otc_lines) if otc_lines else '- 이력 없음'}
## 요청
다음 형식의 JSON으로 간결하게 답변해주세요:
{{
"prescription_insight": "처방 변화에 대한 분석 (1-2문장)",
"kims_analysis": "KIMS 상호작용 해석 및 임상적 의미 (1-2문장)",
"cautions": ["복용 주의사항 1", "복용 주의사항 2"],
"otc_recommendations": [
{{"product": "추천 OTC명", "reason": "추천 이유 (현재 처방과 상호작용 없음 확인)"}}
],
"counseling_points": ["상담 포인트 1", "상담 포인트 2"]
}}
"""
return prompt
def call_clawdbot_ai(prompt: str) -> dict:
"""Clawdbot AI 호출 (WebSocket Gateway)"""
import json
import re
from services.clawdbot_client import ask_clawdbot
PAAI_SYSTEM_PROMPT = """당신은 경험 많은 약사입니다.
처방 데이터를 분석하여 약사에게 유용한 정보를 제공합니다.
반드시 요청된 JSON 형식으로만 응답하세요."""
try:
# Clawdbot Gateway WebSocket API 호출
ai_text = ask_clawdbot(
message=prompt,
session_id='paai-analysis',
system_prompt=PAAI_SYSTEM_PROMPT,
timeout=60,
model='anthropic/claude-sonnet-4-5' # 빠른 Sonnet 사용
)
if not ai_text:
logging.warning("[PAAI] Clawdbot 응답 없음")
return generate_fallback_response(prompt)
# JSON 블록 추출
try:
json_match = re.search(r'\{[\s\S]*\}', ai_text)
if json_match:
return json.loads(json_match.group())
except Exception as parse_err:
logging.warning(f"[PAAI] JSON 파싱 실패: {parse_err}")
# JSON 파싱 실패 시 텍스트 그대로 반환
return {
'prescription_insight': ai_text[:500] if ai_text else '분석 결과 없음',
'kims_analysis': '',
'cautions': [],
'otc_recommendations': [],
'counseling_points': []
}
except Exception as e:
logging.error(f"[PAAI] Clawdbot AI 호출 오류: {e}")
return generate_fallback_response(prompt)
def generate_fallback_response(prompt: str) -> dict:
"""Clawdbot 연결 실패 시 기본 응답"""
return {
'prescription_insight': 'AI 분석 서비스에 연결할 수 없습니다. KIMS 상호작용 정보를 직접 확인해주세요.',
'kims_analysis': '',
'cautions': ['AI 분석 불가 - 직접 검토 필요'],
'otc_recommendations': [],
'counseling_points': [],
'_fallback': True
}
@pmr_bp.route('/api/paai/feedback', methods=['POST'])
def paai_feedback():
"""PAAI 피드백 저장"""
from db.paai_logger import update_feedback
try:
data = request.get_json()
log_id = data.get('log_id')
useful = data.get('useful')
comment = data.get('comment')
if not log_id:
return jsonify({'success': False, 'error': 'log_id 필요'}), 400
update_feedback(log_id, useful, comment)
return jsonify({'success': True})
except Exception as e:
logging.error(f"PAAI 피드백 저장 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ────────────────────────────────────────────────────────────────────────────────
# PAAI 어드민 페이지
# ────────────────────────────────────────────────────────────────────────────────
@pmr_bp.route('/admin')
def paai_admin_page():
"""PAAI 어드민 대시보드 페이지"""
return render_template('pmr_admin.html')
@pmr_bp.route('/api/admin/stats')
def paai_admin_stats():
"""PAAI 통계 API"""
from db.paai_logger import get_stats
try:
stats = get_stats()
return jsonify({'success': True, 'stats': stats})
except Exception as e:
logging.error(f"PAAI 통계 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@pmr_bp.route('/api/admin/logs')
def paai_admin_logs():
"""PAAI 로그 목록 API"""
from db.paai_logger import get_recent_logs
try:
limit = request.args.get('limit', 50, type=int)
status = request.args.get('status')
has_severe = request.args.get('has_severe')
date = request.args.get('date')
patient_name = request.args.get('patient_name')
# has_severe 파싱
if has_severe == 'true':
has_severe = True
elif has_severe == 'false':
has_severe = False
else:
has_severe = None
logs = get_recent_logs(
limit=limit,
status=status,
has_severe=has_severe,
date=date
)
# 환자명 필터링 (클라이언트 사이드에서 하기엔 데이터가 많을 수 있음)
if patient_name:
logs = [log for log in logs if patient_name.lower() in (log.get('patient_name') or '').lower()]
return jsonify({'success': True, 'logs': logs, 'count': len(logs)})
except Exception as e:
logging.error(f"PAAI 로그 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@pmr_bp.route('/api/admin/log/<int:log_id>')
def paai_admin_log_detail(log_id):
"""PAAI 로그 상세 API"""
from db.paai_logger import get_log_detail
try:
log = get_log_detail(log_id)
if not log:
return jsonify({'success': False, 'error': '로그를 찾을 수 없습니다'}), 404
return jsonify({'success': True, 'log': log})
except Exception as e:
logging.error(f"PAAI 로그 상세 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@pmr_bp.route('/api/admin/feedback-stats')
def paai_admin_feedback_stats():
"""피드백 통계 API (일별)"""
from db.paai_logger import DB_PATH
import sqlite3
from datetime import datetime, timedelta
try:
if not DB_PATH.exists():
return jsonify({'success': True, 'stats': []})
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
# 최근 30일 일별 통계
cursor.execute("""
SELECT
DATE(created_at) as date,
COUNT(*) as total,
SUM(CASE WHEN feedback_useful = 1 THEN 1 ELSE 0 END) as useful,
SUM(CASE WHEN feedback_useful = 0 THEN 1 ELSE 0 END) as not_useful,
SUM(CASE WHEN feedback_useful IS NULL THEN 1 ELSE 0 END) as no_feedback,
SUM(CASE WHEN kims_has_severe = 1 THEN 1 ELSE 0 END) as severe,
AVG(ai_response_time_ms) as avg_ai_time
FROM paai_logs
WHERE created_at >= date('now', '-30 days')
GROUP BY DATE(created_at)
ORDER BY date DESC
""")
rows = cursor.fetchall()
stats = []
for row in rows:
stats.append({
'date': row[0],
'total': row[1],
'useful': row[2] or 0,
'not_useful': row[3] or 0,
'no_feedback': row[4] or 0,
'severe': row[5] or 0,
'avg_ai_time': int(row[6]) if row[6] else 0
})
conn.close()
return jsonify({'success': True, 'stats': stats})
except Exception as e:
logging.error(f"피드백 통계 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500