pharmacy-pos-qr-system/backend/pmr_api.py
thug0bin 0d9f4c9a23 fix: 이전 처방에서도 대체조제 원처방(PS_Type=9) 제외
- 현재 처방과 동일하게 PS_Type=9는 목록에서 제외
- 중복 처방처럼 보이는 문제 해결
2026-03-05 20:44:04 +09:00

1821 lines
64 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# pmr_api.py - 조제관리(PMR) Blueprint API
# PharmaIT3000 MSSQL 연동 (192.168.0.4)
from flask import Blueprint, jsonify, request, render_template, send_file
import pyodbc
import sqlite3
from pathlib import Path
from datetime import datetime, date
import logging
from PIL import Image, ImageDraw, ImageFont
import io
import base64
import os
pmr_bp = Blueprint('pmr', __name__, url_prefix='/pmr')
# ─────────────────────────────────────────────────────────────
# MSSQL 연결 설정 (PharmaIT3000 - 192.168.0.4)
# ─────────────────────────────────────────────────────────────
MSSQL_CONFIG = {
'server': '192.168.0.4\\PM2014',
'username': 'sa',
'password': 'tmddls214!%(',
'driver': 'ODBC Driver 17 for SQL Server'
}
def get_mssql_connection(database='PM_PRES'):
"""MSSQL 연결 획득"""
conn_str = (
f"DRIVER={{{MSSQL_CONFIG['driver']}}};"
f"SERVER={MSSQL_CONFIG['server']};"
f"DATABASE={database};"
f"UID={MSSQL_CONFIG['username']};"
f"PWD={MSSQL_CONFIG['password']};"
"TrustServerCertificate=yes;"
"Connection Timeout=10"
)
return pyodbc.connect(conn_str, timeout=10)
def warmup_db_connection():
"""앱 시작 시 DB 연결 미리 생성 (첫 요청 속도 개선)"""
try:
conn = get_mssql_connection('PM_PRES')
conn.cursor().execute("SELECT 1")
conn.close()
logging.info("[PMR] DB 연결 warmup 완료")
except Exception as e:
logging.warning(f"[PMR] DB warmup 실패: {e}")
# 앱 로드 시 warmup 실행
warmup_db_connection()
def enrich_medications(medications: list) -> list:
"""
약품 목록에 성분/분류/상호작용/금기 정보 추가 (PAAI용)
CD_SUNG: 성분 정보
CD_MC: 분류(PRINT_TYPE), 상호작용(INTERACTION), 금기(CONTRA)
"""
if not medications:
return medications
try:
conn = get_mssql_connection('PM_DRUG')
cursor = conn.cursor()
# DrugCode 목록 추출
drug_codes = [m.get('code') or m.get('medication_code') for m in medications if m.get('code') or m.get('medication_code')]
if not drug_codes:
conn.close()
return medications
# 1. CD_MC에서 분류/상호작용/금기 조회
placeholders = ','.join(['?' for _ in drug_codes])
cursor.execute(f"""
SELECT
DRUGCODE,
PRINT_TYPE,
INTERACTION,
CONTRA
FROM CD_MC
WHERE DRUGCODE IN ({placeholders})
""", drug_codes)
mc_info = {}
for row in cursor.fetchall():
mc_info[row.DRUGCODE] = {
'print_type': row.PRINT_TYPE or '',
'interaction': (row.INTERACTION or '')[:500], # 너무 길면 자르기
'contra': (row.CONTRA or '')[:300]
}
# 2. CD_GOODS에서 SUNG_CODE 조회
cursor.execute(f"""
SELECT DrugCode, SUNG_CODE
FROM CD_GOODS
WHERE DrugCode IN ({placeholders}) AND SUNG_CODE IS NOT NULL
""", drug_codes)
sung_codes = {}
for row in cursor.fetchall():
if row.SUNG_CODE:
sung_codes[row.DrugCode] = row.SUNG_CODE
# 3. CD_SUNG에서 성분 정보 조회
components_by_drug = {}
if sung_codes:
unique_sung_codes = list(set(sung_codes.values()))
placeholders2 = ','.join(['?' for _ in unique_sung_codes])
cursor.execute(f"""
SELECT SUNG_CODE, SUNG_HNM
FROM CD_SUNG
WHERE SUNG_CODE IN ({placeholders2})
""", unique_sung_codes)
# SUNG_CODE별 성분 목록
sung_components = {}
for row in cursor.fetchall():
if row.SUNG_CODE not in sung_components:
sung_components[row.SUNG_CODE] = []
sung_components[row.SUNG_CODE].append(row.SUNG_HNM)
# DrugCode별로 매핑
for drug_code, sung_code in sung_codes.items():
components_by_drug[drug_code] = sung_components.get(sung_code, [])
conn.close()
# 4. medications에 정보 추가
for med in medications:
code = med.get('code') or med.get('medication_code')
if code:
# MC 정보
if code in mc_info:
med['print_type'] = mc_info[code]['print_type']
med['interaction_info'] = mc_info[code]['interaction']
med['contra_info'] = mc_info[code]['contra']
# 성분 정보
if code in components_by_drug:
med['components'] = components_by_drug[code]
return medications
except Exception as e:
logging.error(f"[PAAI] Medication enrichment 오류: {e}")
return medications
# ─────────────────────────────────────────────────────────────
# 조제관리 페이지
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/')
def pmr_index():
"""조제관리 메인 페이지"""
return render_template('pmr.html')
# ─────────────────────────────────────────────────────────────
# API: 날짜별 처방전 목록
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/prescriptions', methods=['GET'])
def get_prescriptions_by_date():
"""
날짜별 처방전 목록 조회
Query Params:
- date: YYYY-MM-DD (기본값: 오늘)
"""
try:
date_str = request.args.get('date', date.today().strftime('%Y-%m-%d'))
# YYYYMMDD 형식으로 변환
date_yyyymmdd = date_str.replace('-', '')
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
cursor.execute("""
SELECT
PreSerial,
Day_Serial,
PassDay,
Indate,
Paname,
PaNum,
CusCode,
InsName,
Drname,
PresTime,
PreGubun,
PRICE_T,
PRICE_P,
PRICE_C
FROM PS_MAIN
WHERE Indate = ?
ORDER BY Day_Serial ASC
""", (date_yyyymmdd,))
prescriptions = []
for row in cursor.fetchall():
# 주민번호에서 나이/성별 추출
panum = row.PaNum or ''
age = None
gender = None
if len(panum) >= 7:
try:
birth_year = int(panum[:2])
gender_code = panum[6] if len(panum) > 6 else ''
# 성별 및 세기 판단
if gender_code in ['1', '2', '5', '6']:
birth_year += 1900
elif gender_code in ['3', '4', '7', '8']:
birth_year += 2000
else:
birth_year += 1900
gender = '' if gender_code in ['1', '3', '5', '7'] else ''
age = datetime.now().year - birth_year
except:
pass
prescriptions.append({
'prescription_id': row.PreSerial,
'order_number': row.Day_Serial,
'date': row.Indate, # 조제일 기준
'issue_date': row.PassDay, # 처방전 발행일
'dispense_date': row.Indate, # 조제일
'patient_name': row.Paname,
'patient_id': row.PaNum[:6] + '******' if row.PaNum and len(row.PaNum) > 6 else row.PaNum,
'patient_code': row.CusCode,
'hospital': row.InsName,
'doctor': row.Drname,
'time': row.PresTime,
'type': '급여' if row.PreGubun == '0' else '비급여' if row.PreGubun == '9' else row.PreGubun,
'age': age,
'gender': gender,
'price_total': row.PRICE_T,
'price_patient': row.PRICE_P,
'price_claim': row.PRICE_C
})
conn.close()
return jsonify({
'success': True,
'date': date_str,
'count': len(prescriptions),
'prescriptions': prescriptions
})
except Exception as e:
logging.error(f"PMR 처방전 목록 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# API: 처방전 상세 (약품 목록)
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/prescription/<prescription_id>', methods=['GET'])
def get_prescription_detail(prescription_id):
"""
처방전 상세 정보 (약품 목록 포함)
"""
try:
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
# 처방전 기본 정보 + 질병정보
cursor.execute("""
SELECT
M.PreSerial,
M.Day_Serial,
M.PassDay,
M.Paname,
M.PaNum,
M.CusCode,
M.InsName,
M.Drname,
M.PresTime,
M.PreGubun,
M.PRICE_T,
M.PRICE_P,
M.PRICE_C,
M.St1,
M.St2,
S1.AU_NAME,
S2.AU_NAME
FROM PS_MAIN M
LEFT JOIN PM_BASE.dbo.CD_SANG S1 ON M.St1 = S1.AU_CODE
LEFT JOIN PM_BASE.dbo.CD_SANG S2 ON M.St2 = S2.AU_CODE
WHERE M.PreSerial = ?
""", (prescription_id,))
rx_row = cursor.fetchone()
if not rx_row:
conn.close()
return jsonify({'success': False, 'error': '처방전을 찾을 수 없습니다'}), 404
# 처방 약품 목록 (PS_sub_pharm + CD_GOODS + CD_MC JOIN)
# PS_Type: 0,1=일반, 4=대체조제(실제), 9=대체조제(원본)
medications = []
original_prescriptions = {} # PS_Type=9인 원본 처방 저장
cursor.execute("""
SELECT
s.DrugCode,
s.Days,
s.QUAN,
s.QUAN_TIME,
s.PS_Type,
s.INV_QUAN,
s.SUB_SERIAL,
s.UnitCode,
g.GoodsName,
g.SUNG_CODE,
m.PRINT_TYPE,
m.SIM_EFFECT
FROM PS_sub_pharm s
LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode
LEFT JOIN PM_DRUG.dbo.CD_MC m ON s.DrugCode = m.DRUGCODE
WHERE s.PreSerial = ?
ORDER BY s.SUB_SERIAL
""", (prescription_id,))
all_rows = cursor.fetchall()
# 1차: PS_Type=9 (원본 처방) 수집 - 인덱스로 저장
for i, row in enumerate(all_rows):
if row.PS_Type == '9':
original_prescriptions[i] = {
'drug_code': row.DrugCode or '',
'drug_name': row.GoodsName or row.DrugCode or '',
'add_info': row.PRINT_TYPE or row.SIM_EFFECT or ''
}
# 2차: 실제 조제약만 추가 (PS_Type != 9)
for i, row in enumerate(all_rows):
if row.PS_Type == '9':
continue # 원본 처방은 스킵
# 효능: PRINT_TYPE > SIM_EFFECT > 없음
add_info = row.PRINT_TYPE or row.SIM_EFFECT or ''
# 대체조제 여부 확인: PS_Type=4이고 바로 다음이 PS_Type=9
# 순서: 4(대체) → 9(원본)
is_substituted = row.PS_Type == '4' and (i + 1) in original_prescriptions
original_drug = original_prescriptions.get(i + 1) if is_substituted else None
# UnitCode: 1=보험, 2=비보험, 3=100/100, 4~7=급여(본인부담률)
unit_code = int(row.UnitCode) if row.UnitCode else 1
medications.append({
'medication_code': row.DrugCode or '',
'med_name': row.GoodsName or row.DrugCode or '',
'add_info': add_info,
'dosage': float(row.QUAN) if row.QUAN else 0,
'frequency': row.QUAN_TIME or 0,
'duration': row.Days or 0,
'total_qty': float(row.INV_QUAN) if row.INV_QUAN else 0,
'type': '급여' if row.PS_Type in ['0', '4'] else '비급여' if row.PS_Type == '1' else row.PS_Type,
'sung_code': row.SUNG_CODE or '',
'ps_type': row.PS_Type or '0',
'unit_code': unit_code,
'is_substituted': is_substituted,
'original_drug': original_drug
})
# 나이/성별 계산
panum = rx_row.PaNum or ''
age = None
gender = None
if len(panum) >= 7:
try:
birth_year = int(panum[:2])
gender_code = panum[6]
if gender_code in ['1', '2', '5', '6']:
birth_year += 1900
elif gender_code in ['3', '4', '7', '8']:
birth_year += 2000
gender = '' if gender_code in ['1', '3', '5', '7'] else ''
age = datetime.now().year - birth_year
except:
pass
# 질병 정보 (St1, St2 → AU_NAME)
disease_info = None
st1 = rx_row[13] or '' # St1
st2 = rx_row[14] or '' # St2
disease_name_1 = rx_row[15] or '' # S1.AU_NAME
disease_name_2 = rx_row[16] or '' # S2.AU_NAME
if st1 or st2:
disease_info = {
'code_1': st1,
'code_2': st2,
'name_1': disease_name_1,
'name_2': disease_name_2
}
# 환자 특이사항(CUSETC) 조회 - CD_PERSON 테이블
cusetc = ''
cus_code = rx_row.CusCode
if cus_code:
try:
# PM_BASE.dbo.CD_PERSON에서 조회
cursor.execute("""
SELECT CUSETC FROM PM_BASE.dbo.CD_PERSON WHERE CUSCODE = ?
""", (cus_code,))
person_row = cursor.fetchone()
if person_row and person_row.CUSETC:
cusetc = person_row.CUSETC
except Exception as e:
logging.warning(f"특이사항 조회 실패: {e}")
conn.close()
return jsonify({
'success': True,
'prescription': {
'prescription_id': rx_row.PreSerial,
'order_number': rx_row.Day_Serial,
'date': rx_row.PassDay,
'time': rx_row.PresTime,
'hospital': rx_row.InsName,
'doctor': rx_row.Drname,
'type': '급여' if rx_row.PreGubun == '0' else '비급여',
'price_total': rx_row.PRICE_T,
'price_patient': rx_row.PRICE_P
},
'patient': {
'name': rx_row.Paname,
'code': rx_row.CusCode,
'cus_code': rx_row.CusCode, # 호환성
'age': age,
'gender': gender,
'cusetc': cusetc # 특이사항
},
'disease_info': disease_info,
'medications': medications,
'medication_count': len(medications)
})
except Exception as e:
logging.error(f"PMR 처방전 상세 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# API: 통계 (당일 요약)
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/stats', methods=['GET'])
def get_daily_stats():
"""당일 조제 통계"""
try:
date_str = request.args.get('date', date.today().strftime('%Y-%m-%d'))
date_yyyymmdd = date_str.replace('-', '')
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
# 처방전 수
cursor.execute("""
SELECT COUNT(*) as cnt
FROM PS_MAIN
WHERE PassDay = ?
""", (date_yyyymmdd,))
total_prescriptions = cursor.fetchone()[0]
# 총 금액
cursor.execute("""
SELECT
ISNULL(SUM(PRICE_T), 0) as total,
ISNULL(SUM(PRICE_P), 0) as patient,
ISNULL(SUM(PRICE_C), 0) as claim
FROM PS_MAIN
WHERE PassDay = ?
""", (date_yyyymmdd,))
price_row = cursor.fetchone()
conn.close()
return jsonify({
'success': True,
'date': date_str,
'stats': {
'total_prescriptions': total_prescriptions,
'total_amount': price_row[0] if price_row else 0,
'patient_amount': price_row[1] if price_row else 0,
'claim_amount': price_row[2] if price_row else 0
}
})
except Exception as e:
logging.error(f"PMR 통계 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# API: DB 연결 테스트
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/test', methods=['GET'])
def test_connection():
"""DB 연결 테스트"""
try:
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
cursor.execute("SELECT @@VERSION")
version = cursor.fetchone()[0]
conn.close()
return jsonify({
'success': True,
'server': MSSQL_CONFIG['server'],
'version': version[:100] + '...'
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# API: 라벨 미리보기
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/label/preview', methods=['POST'])
def preview_label():
"""
라벨 미리보기 (PIL 렌더링 → Base64 이미지)
Request Body:
- patient_name: 환자명
- med_name: 약품명
- dosage: 1회 복용량
- frequency: 복용 횟수
- duration: 복용 일수
- unit: 단위 (정, 캡슐, mL 등)
"""
try:
data = request.get_json()
patient_name = data.get('patient_name', '')
med_name = data.get('med_name', '')
add_info = data.get('add_info', '')
dosage = float(data.get('dosage', 0))
frequency = int(data.get('frequency', 0))
duration = int(data.get('duration', 0))
unit = data.get('unit', '')
# 라벨 이미지 생성
image = create_label_image(
patient_name=patient_name,
med_name=med_name,
add_info=add_info,
dosage=dosage,
frequency=frequency,
duration=duration,
unit=unit
)
# Base64 인코딩
buffer = io.BytesIO()
image.save(buffer, format='PNG')
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
return jsonify({
'success': True,
'image': f'data:image/png;base64,{img_base64}'
})
except Exception as e:
logging.error(f"라벨 미리보기 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
def create_label_image(patient_name, med_name, add_info='', dosage=0, frequency=0, duration=0, unit=''):
"""
라벨 이미지 생성 (29mm 용지 기준)
"""
# 라벨 크기 (29mm 용지, 300dpi 기준)
label_width = 306
label_height = 380
image = Image.new("RGB", (label_width, label_height), "white")
draw = ImageDraw.Draw(image)
# 폰트 설정 (Windows 경로)
font_path = "C:/Windows/Fonts/malgunbd.ttf"
if not os.path.exists(font_path):
font_path = "C:/Windows/Fonts/malgun.ttf"
try:
name_font = ImageFont.truetype(font_path, 36)
drug_font = ImageFont.truetype(font_path, 24)
info_font = ImageFont.truetype(font_path, 22)
small_font = ImageFont.truetype(font_path, 18)
except:
name_font = ImageFont.load_default()
drug_font = ImageFont.load_default()
info_font = ImageFont.load_default()
small_font = ImageFont.load_default()
# 중앙 정렬 텍스트 함수
def draw_centered(text, y, font, fill="black"):
bbox = draw.textbbox((0, 0), text, font=font)
w = bbox[2] - bbox[0]
x = (label_width - w) // 2
draw.text((x, y), text, font=font, fill=fill)
return y + bbox[3] - bbox[1] + 5
# 약품명 줄바꿈 처리
def wrap_text(text, font, max_width):
lines = []
words = text.split()
current_line = ""
for word in words:
test_line = f"{current_line} {word}".strip()
bbox = draw.textbbox((0, 0), test_line, font=font)
if bbox[2] - bbox[0] <= max_width:
current_line = test_line
else:
if current_line:
lines.append(current_line)
current_line = word
if current_line:
lines.append(current_line)
return lines if lines else [text]
y = 15
# 환자명 (띄어쓰기)
spaced_name = " ".join(patient_name) if patient_name else ""
y = draw_centered(spaced_name, y, name_font)
y += 5
# 약품명 (줄바꿈)
# 괄호 앞에서 분리
if '(' in med_name:
main_name = med_name.split('(')[0].strip()
else:
main_name = med_name
# 약품명 줄바꿈
name_lines = wrap_text(main_name, drug_font, label_width - 30)
for line in name_lines:
y = draw_centered(line, y, drug_font)
# 효능효과 (add_info)
if add_info:
y = draw_centered(f"({add_info})", y, small_font, fill="gray")
y += 5
# 총량 계산
if dosage > 0 and frequency > 0 and duration > 0:
total = dosage * frequency * duration
total_str = str(int(total)) if total == int(total) else f"{total:.1f}"
total_text = f"{total_str}{unit} / {duration}일분"
y = draw_centered(total_text, y, info_font)
y += 5
# 용법 박스
box_margin = 20
box_top = y
box_bottom = y + 70
draw.rectangle(
[(box_margin, box_top), (label_width - box_margin, box_bottom)],
outline="black",
width=2
)
# 박스 내용
dosage_str = str(int(dosage)) if dosage == int(dosage) else f"{dosage:.2f}".rstrip('0').rstrip('.')
dosage_text = f"{dosage_str}{unit}"
# 복용 시간
if frequency == 1:
time_text = "아침"
elif frequency == 2:
time_text = "아침, 저녁"
elif frequency == 3:
time_text = "아침, 점심, 저녁"
else:
time_text = f"1일 {frequency}"
box_center_y = (box_top + box_bottom) // 2
draw_centered(dosage_text, box_center_y - 20, info_font)
draw_centered(time_text, box_center_y + 5, info_font)
y = box_bottom + 10
# 조제일
today = datetime.now().strftime('%Y-%m-%d')
y = draw_centered(f"조제일: {today}", y, small_font)
# 약국명 (하단)
pharmacy_y = label_height - 40
draw.rectangle(
[(50, pharmacy_y - 5), (label_width - 50, pharmacy_y + 25)],
outline="black",
width=1
)
draw_centered("청 춘 약 국", pharmacy_y, info_font)
return image
# ─────────────────────────────────────────────────────────────
# API: 환자 이전 처방 이력
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/patient/<cus_code>/history', methods=['GET'])
def get_patient_history(cus_code):
"""
환자 이전 처방 이력 조회
Args:
cus_code: 환자 고유코드 (CusCode)
Query Params:
- limit: 최대 조회 건수 (기본 10, 최대 50)
- exclude: 제외할 처방번호 (현재 처방)
"""
try:
limit = min(int(request.args.get('limit', 10)), 50)
exclude_serial = request.args.get('exclude', '')
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
# 이전 처방 목록 조회
query = """
SELECT TOP (?)
m.PreSerial,
m.PassDay,
m.PresTime,
m.Paname,
m.InsName,
m.Drname,
m.PRICE_P,
(SELECT COUNT(*) FROM PS_sub_pharm WHERE PreSerial = m.PreSerial) as drug_count
FROM PS_MAIN m
WHERE m.CusCode = ?
"""
params = [limit, cus_code]
if exclude_serial:
query += " AND m.PreSerial != ?"
params.append(exclude_serial)
query += " ORDER BY m.PassDay DESC, m.PresTime DESC"
cursor.execute(query, params)
history = []
for row in cursor.fetchall():
pre_serial = row.PreSerial
# 해당 처방의 약품 목록 조회
# PS_Type=9 (대체조제 원처방)는 제외
cursor.execute("""
SELECT
s.DrugCode,
s.Days,
s.QUAN,
s.QUAN_TIME,
s.PS_Type,
g.GoodsName,
m.PRINT_TYPE
FROM PS_sub_pharm s
LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode
LEFT JOIN PM_DRUG.dbo.CD_MC m ON s.DrugCode = m.DRUGCODE
WHERE s.PreSerial = ?
AND (s.PS_Type IS NULL OR s.PS_Type != '9')
ORDER BY s.SUB_SERIAL
""", (pre_serial,))
medications = []
for med_row in cursor.fetchall():
medications.append({
'medication_code': med_row.DrugCode or '',
'med_name': med_row.GoodsName or med_row.DrugCode or '',
'add_info': med_row.PRINT_TYPE or '',
'dosage': float(med_row.QUAN) if med_row.QUAN else 0,
'frequency': med_row.QUAN_TIME or 0,
'duration': med_row.Days or 0,
'ps_type': med_row.PS_Type or '0'
})
# 날짜 포맷
pass_day = row.PassDay
if pass_day and len(pass_day) == 8:
date_formatted = f"{pass_day[:4]}-{pass_day[4:6]}-{pass_day[6:8]}"
else:
date_formatted = pass_day or ''
history.append({
'prescription_id': pre_serial,
'date': date_formatted,
'time': row.PresTime or '',
'patient_name': row.Paname or '',
'hospital': row.InsName or '',
'doctor': row.Drname or '',
'copayment': int(row.PRICE_P or 0),
'medication_count': row.drug_count or 0,
'medications': medications
})
conn.close()
return jsonify({
'success': True,
'cus_code': cus_code,
'count': len(history),
'history': history
})
except Exception as e:
logging.error(f"환자 이전 처방 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# API: 환자 OTC 구매 이력
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/patient/<cus_code>/otc', methods=['GET'])
def get_patient_otc_history(cus_code):
"""
환자 OTC (일반의약품) 구매 이력 조회
Args:
cus_code: 환자 고유코드 (CusCode = SL_CD_custom)
Query Params:
- limit: 최대 조회 건수 (기본 20, 최대 100)
"""
try:
limit = min(int(request.args.get('limit', 20)), 100)
conn = get_mssql_connection('PM_PRES')
cursor = conn.cursor()
# ✅ 최적화: 한번의 쿼리로 거래 + 품목 모두 조회 (JOIN)
cursor.execute("""
SELECT
m.SL_NO_order,
m.SL_DT_appl,
m.InsertTime,
m.SL_MY_sale,
m.SL_NM_custom,
s.DrugCode,
g.GoodsName,
s.SL_NM_item,
s.SL_TOTAL_PRICE,
mc.PRINT_TYPE
FROM (
SELECT TOP (?) *
FROM SALE_MAIN
WHERE SL_CD_custom = ? AND PRESERIAL = 'V'
ORDER BY InsertTime DESC
) m
LEFT JOIN SALE_SUB s ON m.SL_NO_order = s.SL_NO_order
LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode
LEFT JOIN PM_DRUG.dbo.CD_MC mc ON s.DrugCode = mc.DRUGCODE
ORDER BY m.InsertTime DESC, s.DrugCode
""", (limit, cus_code))
# 결과를 order_no별로 그룹핑
orders_dict = {}
all_drug_codes = []
for row in cursor.fetchall():
order_no = row.SL_NO_order
if order_no not in orders_dict:
orders_dict[order_no] = {
'order_no': order_no,
'date': row.SL_DT_appl,
'datetime': row.InsertTime.strftime('%Y-%m-%d %H:%M') if row.InsertTime else '',
'amount': int(row.SL_MY_sale or 0),
'customer_name': row.SL_NM_custom or '',
'items': []
}
# 품목 추가 (DrugCode가 있는 경우만)
if row.DrugCode:
drug_code = row.DrugCode
all_drug_codes.append(drug_code)
orders_dict[order_no]['items'].append({
'drug_code': drug_code,
'name': row.GoodsName or drug_code,
'quantity': int(row.SL_NM_item or 0),
'price': int(row.SL_TOTAL_PRICE or 0),
'category': row.PRINT_TYPE or '',
'image': None
})
conn.close()
# dict → list 변환
purchases = list(orders_dict.values())
for p in purchases:
p['item_count'] = len(p['items'])
if not purchases:
return jsonify({
'success': True,
'cus_code': cus_code,
'count': 0,
'purchases': []
})
# 제품 이미지 조회 (product_images.db)
image_map = {}
try:
img_db_path = Path(__file__).parent / 'db' / 'product_images.db'
if img_db_path.exists() and all_drug_codes:
img_conn = sqlite3.connect(str(img_db_path))
img_cursor = img_conn.cursor()
placeholders = ','.join(['?' for _ in all_drug_codes])
img_cursor.execute(f'''
SELECT drug_code, thumbnail_base64
FROM product_images
WHERE drug_code IN ({placeholders}) AND thumbnail_base64 IS NOT NULL
''', all_drug_codes)
for row in img_cursor.fetchall():
image_map[row[0]] = row[1]
img_conn.close()
except Exception as img_err:
logging.warning(f"제품 이미지 조회 오류: {img_err}")
# 이미지 매핑
for purchase in purchases:
for item in purchase['items']:
if item['drug_code'] in image_map:
item['image'] = image_map[item['drug_code']]
# 통계 계산
total_amount = sum(p['amount'] for p in purchases)
total_visits = len(purchases)
# 자주 구매하는 품목 집계
item_freq = {}
for p in purchases:
for item in p['items']:
key = item['drug_code']
if key not in item_freq:
item_freq[key] = {
'name': item['name'],
'category': item['category'],
'count': 0,
'total_qty': 0
}
item_freq[key]['count'] += 1
item_freq[key]['total_qty'] += item['quantity']
# 빈도순 정렬
frequent_items = sorted(item_freq.values(), key=lambda x: x['count'], reverse=True)[:10]
return jsonify({
'success': True,
'cus_code': cus_code,
'count': len(purchases),
'summary': {
'total_visits': total_visits,
'total_amount': total_amount,
'frequent_items': frequent_items
},
'purchases': purchases
})
except Exception as e:
logging.error(f"환자 OTC 구매 이력 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# PAAI (Pharmacist Assistant AI) API
# ─────────────────────────────────────────────────────────────
@pmr_bp.route('/api/paai/result/<pre_serial>', methods=['GET'])
def paai_get_result(pre_serial):
"""
PAAI 분석 결과 조회 (캐시)
Response:
- 결과 있음: {exists: true, status: "success", result: {...}, log_id: 123}
- 생성 중: {exists: true, status: "generating", log_id: 123}
- 없음: {exists: false}
"""
import sqlite3
import json
from pathlib import Path
try:
db_path = Path(__file__).parent / 'db' / 'paai_logs.db'
if not db_path.exists():
return jsonify({'exists': False})
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 해당 처방의 최신 성공 로그 조회
cursor.execute('''
SELECT id, status, created_at, patient_name,
disease_name_1, disease_name_2,
current_med_count, kims_interaction_count, kims_has_severe,
kims_response_time_ms, ai_response_time_ms,
ai_response
FROM paai_logs
WHERE pre_serial = ?
ORDER BY id DESC
LIMIT 1
''', (pre_serial,))
row = cursor.fetchone()
conn.close()
if not row:
return jsonify({'exists': False})
status = row['status']
# 생성 중 상태
if status in ('pending', 'kims_done', 'generating'):
return jsonify({
'exists': True,
'status': 'generating',
'log_id': row['id']
})
# 에러 상태
if status == 'error':
return jsonify({
'exists': True,
'status': 'error',
'log_id': row['id']
})
# 성공 상태 - 전체 결과 반환
ai_response = {}
if row['ai_response']:
try:
ai_response = json.loads(row['ai_response'])
except:
pass
return jsonify({
'exists': True,
'status': 'success',
'log_id': row['id'],
'result': {
'created_at': row['created_at'],
'patient_name': row['patient_name'],
'disease_name_1': row['disease_name_1'],
'disease_name_2': row['disease_name_2'],
'medication_count': row['current_med_count'],
'kims_summary': {
'interaction_count': row['kims_interaction_count'],
'has_severe': bool(row['kims_has_severe'])
},
'timing': {
'kims_ms': row['kims_response_time_ms'],
'ai_ms': row['ai_response_time_ms'],
'total_ms': (row['kims_response_time_ms'] or 0) + (row['ai_response_time_ms'] or 0)
},
'analysis': ai_response
}
})
except Exception as e:
logging.error(f"PAAI 결과 조회 오류: {e}")
return jsonify({'exists': False, 'error': str(e)})
@pmr_bp.route('/api/paai/analyze', methods=['POST'])
def paai_analyze():
"""
PAAI 분석 API
Request:
{
"pre_serial": "P20260304001",
"cus_code": "00001234",
"patient_name": "홍길동",
"disease_info": {
"code_1": "M170", "name_1": "무릎골관절염",
"code_2": "K299", "name_2": "상세불명의 위십이지장염"
},
"current_medications": [
{"code": "055101150", "name": "록소프로펜정", "dosage": 1, "frequency": 3, "days": 7}
],
"previous_serial": "P20260225001",
"previous_medications": [...],
"otc_history": {...}
}
"""
import requests as http_requests
import time as time_module
from db.paai_logger import create_log, update_kims_result, update_ai_result, update_error
start_time = time_module.time()
log_id = None
try:
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': '요청 데이터가 없습니다.'}), 400
pre_serial = data.get('pre_serial')
cus_code = data.get('cus_code')
patient_name = data.get('patient_name')
patient_note = data.get('patient_note', '') # 환자 특이사항 (알러지, 기저질환 등)
disease_info = data.get('disease_info', {})
current_medications = data.get('current_medications', [])
previous_serial = data.get('previous_serial')
previous_medications = data.get('previous_medications', [])
otc_history = data.get('otc_history', {})
# ✅ 약품 정보 Enrichment (성분/분류/상호작용/금기)
current_medications = enrich_medications(current_medications)
# 처방 변화 분석
prescription_changes = analyze_prescription_changes(
current_medications, previous_medications
)
# 1. 로그 생성
log_id = create_log(
pre_serial=pre_serial,
patient_code=cus_code,
patient_name=patient_name,
disease_code_1=disease_info.get('code_1'),
disease_name_1=disease_info.get('name_1'),
disease_code_2=disease_info.get('code_2'),
disease_name_2=disease_info.get('name_2'),
current_medications=current_medications,
previous_serial=previous_serial,
previous_medications=previous_medications,
prescription_changes=prescription_changes,
otc_history=otc_history
)
# 2. KD코드 추출 (9자리 KIMS 코드)
kd_codes = []
drug_names = []
for med in current_medications:
code = med.get('code', '')
if code and len(str(code)) == 9:
kd_codes.append(str(code))
drug_names.append(med.get('name', ''))
# 3. KIMS API 호출 (약품 2개 이상인 경우)
kims_interactions = []
kims_start = time_module.time()
if len(kd_codes) >= 2:
try:
kims_url = "https://api2.kims.co.kr/api/interaction/info"
kims_headers = {
'Authorization': 'Basic VFNQTUtSOg==',
'Content-Type': 'application/json',
'Accept': 'application/json; charset=utf-8'
}
kims_payload = {'KDCodes': kd_codes}
kims_response = http_requests.get(
kims_url,
headers=kims_headers,
data=__import__('json').dumps(kims_payload),
timeout=10,
verify=False
)
if kims_response.status_code == 200:
kims_data = kims_response.json()
if kims_data.get('Message') == 'SUCCESS':
kims_interactions = kims_data.get('InteractionList', [])
except Exception as kims_err:
logging.warning(f"KIMS API 오류 (무시하고 계속): {kims_err}")
kims_time = int((time_module.time() - kims_start) * 1000)
# KIMS 결과 로그 업데이트
update_kims_result(
log_id=log_id,
kims_drug_codes=kd_codes,
kims_interactions=kims_interactions,
kims_response_time_ms=kims_time
)
# 4. AI 프롬프트 생성
ai_prompt = build_paai_prompt(
disease_info=disease_info,
current_medications=current_medications,
prescription_changes=prescription_changes,
kims_interactions=kims_interactions,
otc_history=otc_history,
patient_note=patient_note
)
# 5. Clawdbot AI 호출 (WebSocket)
ai_start = time_module.time()
ai_response = call_clawdbot_ai(ai_prompt)
ai_time = int((time_module.time() - ai_start) * 1000)
# AI 결과 로그 업데이트
update_ai_result(
log_id=log_id,
ai_prompt=ai_prompt,
ai_model='claude-sonnet-4',
ai_response=ai_response,
ai_response_time_ms=ai_time
)
total_time = int((time_module.time() - start_time) * 1000)
return jsonify({
'success': True,
'log_id': log_id,
'analysis': ai_response,
'kims_summary': {
'drug_count': len(kd_codes),
'interaction_count': len(kims_interactions),
'has_severe': any(str(i.get('Severity', '5')) in ['1', '2'] for i in kims_interactions)
},
'timing': {
'kims_ms': kims_time,
'ai_ms': ai_time,
'total_ms': total_time
}
})
except Exception as e:
logging.error(f"PAAI 분석 오류: {e}")
if log_id:
update_error(log_id, str(e))
return jsonify({'success': False, 'error': str(e)}), 500
def analyze_prescription_changes(current: list, previous: list) -> dict:
"""처방 변화 분석"""
current_codes = {m.get('code'): m for m in current}
previous_codes = {m.get('code'): m for m in previous}
added = []
removed = []
changed = []
same = []
# 추가된 약품
for code, med in current_codes.items():
if code not in previous_codes:
added.append(med)
else:
# 변경 여부 확인
prev_med = previous_codes[code]
changes = []
for field in ['dosage', 'frequency', 'days']:
if med.get(field) != prev_med.get(field):
changes.append({
'field': field,
'from': prev_med.get(field),
'to': med.get(field)
})
if changes:
changed.append({'medication': med, 'changes': changes})
else:
same.append(med)
# 중단된 약품
for code, med in previous_codes.items():
if code not in current_codes:
removed.append(med)
return {
'added': added,
'removed': removed,
'changed': changed,
'same': same
}
def build_paai_prompt(
disease_info: dict,
current_medications: list,
prescription_changes: dict,
kims_interactions: list,
otc_history: dict,
patient_note: str = ''
) -> str:
"""AI 프롬프트 생성"""
# 질병 정보
diseases = []
if disease_info.get('code_1'):
diseases.append(f"[{disease_info['code_1']}] {disease_info.get('name_1', '')}")
if disease_info.get('code_2'):
diseases.append(f"[{disease_info['code_2']}] {disease_info.get('name_2', '')}")
# 현재 처방 (성분 정보 포함)
med_lines = []
for med in current_medications:
name = med.get('name', '?')
dosage = med.get('dosage', 0)
freq = med.get('frequency', 0)
days = med.get('days', 0)
line = f"- {name}: {dosage}× {freq}× {days}"
# 분류 정보
if med.get('print_type'):
line += f"\n └ 분류: {med['print_type']}"
# 성분 정보
if med.get('components'):
components_str = ', '.join(med['components'][:3]) # 최대 3개
if len(med['components']) > 3:
components_str += f"{len(med['components'])-3}"
line += f"\n └ 성분: {components_str}"
med_lines.append(line)
# 처방 변화
change_lines = []
if prescription_changes.get('added'):
names = [m.get('name', '?') for m in prescription_changes['added']]
change_lines.append(f"- 추가: {', '.join(names)}")
if prescription_changes.get('removed'):
names = [m.get('name', '?') for m in prescription_changes['removed']]
change_lines.append(f"- 중단: {', '.join(names)}")
if prescription_changes.get('changed'):
for item in prescription_changes['changed']:
med = item['medication']
changes = item['changes']
change_desc = ', '.join([f"{c['field']}: {c['from']}{c['to']}" for c in changes])
change_lines.append(f"- 변경: {med.get('name', '?')} ({change_desc})")
# KIMS 상호작용
kims_lines = []
for inter in kims_interactions:
severity = inter.get('Severity', 5)
severity_text = {1: '🔴 금기', 2: '🟠 경고', 3: '🟡 주의', 4: '참고', 5: '정보'}.get(int(severity), '정보')
drug1 = inter.get('Drug1Name', '?')
drug2 = inter.get('Drug2Name', '?')
desc = inter.get('InteractionDesc', '')[:100]
kims_lines.append(f"- [{severity_text}] {drug1} + {drug2}: {desc}")
# OTC 이력
otc_lines = []
if otc_history.get('frequent_items'):
for item in otc_history['frequent_items'][:5]:
otc_lines.append(f"- {item.get('name', '?')} ({item.get('count', 0)}회 구매)")
# 환자 특이사항 (알러지, 기저질환 등)
note_text = patient_note.strip() if patient_note else ''
prompt = f"""당신은 약사를 보조하는 AI입니다. 환자 정보와 KIMS 상호작용 데이터를 바탕으로 분석해주세요.
## 환자 질병
{chr(10).join(diseases) if diseases else '- 정보 없음'}
## 환자 특이사항 (알러지/기저질환/주의사항)
{note_text if note_text else '- 없음'}
## 현재 처방
{chr(10).join(med_lines) if med_lines else '- 정보 없음'}
## 처방 변화 (vs 이전)
{chr(10).join(change_lines) if change_lines else '- 변화 없음'}
## KIMS 약물 상호작용
{chr(10).join(kims_lines) if kims_lines else '- 상호작용 없음'}
## OTC 구매 이력
{chr(10).join(otc_lines) if otc_lines else '- 이력 없음'}
## 요청
다음 형식의 JSON으로 간결하게 답변해주세요:
{{
"prescription_insight": "처방 변화에 대한 분석 (1-2문장)",
"kims_analysis": "KIMS 상호작용 해석 및 임상적 의미 (1-2문장)",
"cautions": ["복용 주의사항 1", "복용 주의사항 2"],
"otc_recommendations": [
{{"product": "추천 OTC명", "reason": "추천 이유 (현재 처방과 상호작용 없음 확인)"}}
],
"counseling_points": ["상담 포인트 1", "상담 포인트 2"]
}}
"""
return prompt
def call_clawdbot_ai(prompt: str) -> dict:
"""Clawdbot AI 호출 (WebSocket Gateway)"""
import json
import re
from services.clawdbot_client import ask_clawdbot
PAAI_SYSTEM_PROMPT = """당신은 경험 많은 약사입니다.
처방 데이터를 분석하여 약사에게 유용한 정보를 제공합니다.
반드시 요청된 JSON 형식으로만 응답하세요."""
try:
# Clawdbot Gateway WebSocket API 호출
ai_text = ask_clawdbot(
message=prompt,
session_id='paai-analysis',
system_prompt=PAAI_SYSTEM_PROMPT,
timeout=60,
model='anthropic/claude-sonnet-4-5' # 빠른 Sonnet 사용
)
if not ai_text:
logging.warning("[PAAI] Clawdbot 응답 없음")
return generate_fallback_response(prompt)
# JSON 블록 추출
try:
json_match = re.search(r'\{[\s\S]*\}', ai_text)
if json_match:
return json.loads(json_match.group())
except Exception as parse_err:
logging.warning(f"[PAAI] JSON 파싱 실패: {parse_err}")
# JSON 파싱 실패 시 텍스트 그대로 반환
return {
'prescription_insight': ai_text[:500] if ai_text else '분석 결과 없음',
'kims_analysis': '',
'cautions': [],
'otc_recommendations': [],
'counseling_points': []
}
except Exception as e:
logging.error(f"[PAAI] Clawdbot AI 호출 오류: {e}")
return generate_fallback_response(prompt)
def generate_fallback_response(prompt: str) -> dict:
"""Clawdbot 연결 실패 시 기본 응답"""
return {
'prescription_insight': 'AI 분석 서비스에 연결할 수 없습니다. KIMS 상호작용 정보를 직접 확인해주세요.',
'kims_analysis': '',
'cautions': ['AI 분석 불가 - 직접 검토 필요'],
'otc_recommendations': [],
'counseling_points': [],
'_fallback': True
}
@pmr_bp.route('/api/paai/feedback', methods=['POST'])
def paai_feedback():
"""PAAI 피드백 저장"""
from db.paai_logger import update_feedback
try:
data = request.get_json()
log_id = data.get('log_id')
useful = data.get('useful')
comment = data.get('comment')
if not log_id:
return jsonify({'success': False, 'error': 'log_id 필요'}), 400
update_feedback(log_id, useful, comment)
return jsonify({'success': True})
except Exception as e:
logging.error(f"PAAI 피드백 저장 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ────────────────────────────────────────────────────────────────────────────────
# PAAI 어드민 페이지
# ────────────────────────────────────────────────────────────────────────────────
@pmr_bp.route('/admin')
def paai_admin_page():
"""PAAI 어드민 대시보드 페이지"""
return render_template('pmr_admin.html')
@pmr_bp.route('/api/admin/stats')
def paai_admin_stats():
"""PAAI 통계 API"""
from db.paai_logger import get_stats
try:
stats = get_stats()
return jsonify({'success': True, 'stats': stats})
except Exception as e:
logging.error(f"PAAI 통계 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@pmr_bp.route('/api/admin/logs')
def paai_admin_logs():
"""PAAI 로그 목록 API"""
from db.paai_logger import get_recent_logs
try:
limit = request.args.get('limit', 50, type=int)
status = request.args.get('status')
has_severe = request.args.get('has_severe')
date = request.args.get('date')
patient_name = request.args.get('patient_name')
# has_severe 파싱
if has_severe == 'true':
has_severe = True
elif has_severe == 'false':
has_severe = False
else:
has_severe = None
logs = get_recent_logs(
limit=limit,
status=status,
has_severe=has_severe,
date=date
)
# 환자명 필터링 (클라이언트 사이드에서 하기엔 데이터가 많을 수 있음)
if patient_name:
logs = [log for log in logs if patient_name.lower() in (log.get('patient_name') or '').lower()]
return jsonify({'success': True, 'logs': logs, 'count': len(logs)})
except Exception as e:
logging.error(f"PAAI 로그 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@pmr_bp.route('/api/admin/log/<int:log_id>')
def paai_admin_log_detail(log_id):
"""PAAI 로그 상세 API"""
from db.paai_logger import get_log_detail
try:
log = get_log_detail(log_id)
if not log:
return jsonify({'success': False, 'error': '로그를 찾을 수 없습니다'}), 404
return jsonify({'success': True, 'log': log})
except Exception as e:
logging.error(f"PAAI 로그 상세 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@pmr_bp.route('/api/admin/feedback-stats')
def paai_admin_feedback_stats():
"""피드백 통계 API (일별)"""
from db.paai_logger import DB_PATH
import sqlite3
from datetime import datetime, timedelta
try:
if not DB_PATH.exists():
return jsonify({'success': True, 'stats': []})
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
# 최근 30일 일별 통계
cursor.execute("""
SELECT
DATE(created_at) as date,
COUNT(*) as total,
SUM(CASE WHEN feedback_useful = 1 THEN 1 ELSE 0 END) as useful,
SUM(CASE WHEN feedback_useful = 0 THEN 1 ELSE 0 END) as not_useful,
SUM(CASE WHEN feedback_useful IS NULL THEN 1 ELSE 0 END) as no_feedback,
SUM(CASE WHEN kims_has_severe = 1 THEN 1 ELSE 0 END) as severe,
AVG(ai_response_time_ms) as avg_ai_time
FROM paai_logs
WHERE created_at >= date('now', '-30 days')
GROUP BY DATE(created_at)
ORDER BY date DESC
""")
rows = cursor.fetchall()
stats = []
for row in rows:
stats.append({
'date': row[0],
'total': row[1],
'useful': row[2] or 0,
'not_useful': row[3] or 0,
'no_feedback': row[4] or 0,
'severe': row[5] or 0,
'avg_ai_time': int(row[6]) if row[6] else 0
})
conn.close()
return jsonify({'success': True, 'stats': stats})
except Exception as e:
logging.error(f"피드백 통계 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# ─────────────────────────────────────────────────────────────
# ESC/POS 자동인쇄 API (EUC-KR 텍스트 방식)
# ─────────────────────────────────────────────────────────────
import socket
# 프린터 설정
ESCPOS_PRINTER_IP = "192.168.0.174"
ESCPOS_PRINTER_PORT = 9100
# ESC/POS 명령어
_ESC = b'\x1b'
_INIT = _ESC + b'@' # 프린터 초기화
_CUT = _ESC + b'd\x03' # 피드 + 커트
def _log_print_history(pre_serial, patient_name, success, error=None):
"""인쇄 이력을 파일에 기록"""
try:
log_dir = Path(__file__).parent / 'logs'
log_dir.mkdir(exist_ok=True)
log_file = log_dir / 'print_history.log'
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
status = '✅ 성공' if success else f'❌ 실패: {error}'
line = f"[{timestamp}] {pre_serial} | {patient_name} | {status}\n"
with open(log_file, 'a', encoding='utf-8') as f:
f.write(line)
except Exception as e:
logging.warning(f"인쇄 로그 기록 실패: {e}")
@pmr_bp.route('/api/paai/print', methods=['POST'])
def paai_print():
"""PAAI 분석 결과 ESC/POS 인쇄"""
try:
data = request.get_json()
pre_serial = data.get('pre_serial', '')
patient_name = data.get('patient_name', '')
result = data.get('result', {})
analysis = result.get('analysis', {})
kims_summary = result.get('kims_summary', {})
logging.info(f"[PRINT] 요청 수신: {pre_serial} ({patient_name})")
# 영수증 텍스트 생성
message = _format_paai_receipt(pre_serial, patient_name, analysis, kims_summary)
# 인쇄
success = _print_escpos_text(message)
if success:
logging.info(f"[PRINT] ✅ 완료: {pre_serial} ({patient_name})")
_log_print_history(pre_serial, patient_name, True)
return jsonify({'success': True, 'message': '인쇄 완료'})
else:
logging.error(f"[PRINT] ❌ 프린터 연결 실패: {pre_serial}")
_log_print_history(pre_serial, patient_name, False, '프린터 연결 실패')
return jsonify({'success': False, 'error': '프린터 연결 실패'}), 500
except Exception as e:
logging.error(f"[PRINT] ❌ 오류: {pre_serial} - {e}")
_log_print_history(pre_serial, patient_name, False, str(e))
return jsonify({'success': False, 'error': str(e)}), 500
def _print_escpos_text(message: str) -> bool:
"""ESC/POS 프린터로 텍스트 전송 (EUC-KR)"""
try:
# EUC-KR 인코딩
text_bytes = message.encode('euc-kr', errors='replace')
# 명령어 조합
command = _INIT + text_bytes + b'\n\n\n' + _CUT
# 소켓 전송
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
sock.connect((ESCPOS_PRINTER_IP, ESCPOS_PRINTER_PORT))
sock.sendall(command)
sock.close()
return True
except Exception as e:
logging.error(f"ESC/POS 전송 실패: {e}")
return False
def _format_paai_receipt(pre_serial: str, patient_name: str,
analysis: dict, kims_summary: dict) -> str:
"""PAAI 복약안내 영수증 텍스트 생성 (48자 기준)"""
LINE = "=" * 48
THIN = "-" * 48
now = datetime.now().strftime("%Y-%m-%d %H:%M")
# 헤더
msg = f"\n{LINE}\n"
msg += _center_text("[ PAAI 복약안내 ]") + "\n"
msg += f"{LINE}\n"
# 환자 정보
msg += f"환자: {patient_name}\n"
msg += f"처방번호: {pre_serial}\n"
msg += f"출력: {now}\n"
msg += f"{THIN}\n"
# 상호작용 요약
interaction_count = kims_summary.get('interaction_count', 0)
has_severe = kims_summary.get('has_severe', False)
if has_severe:
msg += "[!!] 중증 상호작용 있음!\n"
elif interaction_count > 0:
msg += f"[!] 약물 상호작용: {interaction_count}\n"
else:
msg += "[V] 상호작용 없음\n"
msg += "\n"
# 처방 해석
insight = analysis.get('prescription_insight', '')
if insight:
msg += f"{THIN}\n"
msg += ">> 처방 해석\n"
for line in _wrap_text(insight, 44):
msg += f" {line}\n"
msg += "\n"
# 복용 주의사항
cautions = analysis.get('cautions', [])
if cautions:
msg += f"{THIN}\n"
msg += ">> 복용 주의사항\n"
for i, caution in enumerate(cautions[:4], 1):
first_line = True
for line in _wrap_text(f"{i}. {caution}", 44):
if first_line:
msg += f" {line}\n"
first_line = False
else:
msg += f" {line}\n"
msg += "\n"
# 상담 포인트
counseling = analysis.get('counseling_points', [])
if counseling:
msg += f"{THIN}\n"
msg += ">> 상담 포인트\n"
for i, point in enumerate(counseling[:3], 1):
first_line = True
for line in _wrap_text(f"{i}. {point}", 44):
if first_line:
msg += f" {line}\n"
first_line = False
else:
msg += f" {line}\n"
msg += "\n"
# OTC 추천
otc_recs = analysis.get('otc_recommendations', [])
if otc_recs:
msg += f"{THIN}\n"
msg += ">> OTC 추천\n"
for rec in otc_recs[:2]:
product = rec.get('product', '')
reason = rec.get('reason', '')
msg += f" - {product}\n"
for line in _wrap_text(reason, 42):
msg += f" {line}\n"
msg += "\n"
# 푸터
msg += f"{LINE}\n"
msg += _center_text("양구청춘약국 PAAI") + "\n"
msg += _center_text("Tel: 033-481-5222") + "\n"
msg += "\n"
return msg
def _center_text(text: str, width: int = 48) -> str:
"""중앙 정렬"""
text_len = len(text)
if text_len >= width:
return text
spaces = (width - text_len) // 2
return " " * spaces + text
def _wrap_text(text: str, width: int = 44) -> list:
"""텍스트 줄바꿈"""
if not text:
return []
lines = []
words = text.split()
current = ""
for word in words:
if len(current) + len(word) + 1 <= width:
current = current + " " + word if current else word
else:
if current:
lines.append(current)
current = word
if current:
lines.append(current)
return lines if lines else [text[:width]]