# pmr_api.py - 조제관리(PMR) Blueprint API # PharmaIT3000 MSSQL 연동 (192.168.0.4) # # ═══════════════════════════════════════════════════════════════ # 📋 주요 함수 가이드 # ═══════════════════════════════════════════════════════════════ # # 🏷️ 라벨 관련: # - normalize_medication_name(med_name) # 약품명 정규화: 밀리그램→mg, 언더스코어 제거 등 # 예: "케이발린캡슐75밀리그램_" → "케이발린캡슐75mg" # # - get_drug_unit(goods_name, sung_code) [utils/drug_unit.py] # SUNG_CODE 기반 단위 판별 # 예: SUNG_CODE "123456TB" → "정" (TB=정제) # FormCode: TB=정, CA/CH/CS=캡슐, SY=mL, GA/GB=포 등 # # - create_label_image(patient_name, med_name, ...) # PIL로 29mm 라벨 이미지 생성 # 지그재그 테두리, 동적 폰트, 복용량 박스 등 # # 📊 SUNG_CODE FormCode 참조 (마지막 2자리): # 정제류: TA, TB, TC, TD, TE, TH, TJ, TR → "정" # 캡슐류: CA, CB, CH, CI, CJ, CS → "캡슐" # 액제류: SS, SY, LQ → "mL" (시럽) # 산제류: GA, GB, GC, GN, PD → "포" # 점안제: EY, OS → "병" 또는 "개" # 외용제: XT, XO, XL → "g" 또는 "개" # # ═══════════════════════════════════════════════════════════════ from flask import Blueprint, jsonify, request, render_template, send_file import pyodbc import sqlite3 from pathlib import Path from datetime import datetime, date import logging from PIL import Image, ImageDraw, ImageFont # Pillow 10+ 호환성 패치 (brother_ql용) if not hasattr(Image, 'ANTIALIAS'): Image.ANTIALIAS = Image.Resampling.LANCZOS import io import base64 import os from utils.drug_unit import get_drug_unit pmr_bp = Blueprint('pmr', __name__, url_prefix='/pmr') # ───────────────────────────────────────────────────────────── # MSSQL 연결 설정 (PharmaIT3000 - 192.168.0.4) # ───────────────────────────────────────────────────────────── MSSQL_CONFIG = { 'server': '192.168.0.4\\PM2014', 'username': 'sa', 'password': 'tmddls214!%(', 'driver': 'ODBC Driver 17 for SQL Server' } def get_mssql_connection(database='PM_PRES'): """MSSQL 연결 획득""" conn_str = ( f"DRIVER={{{MSSQL_CONFIG['driver']}}};" f"SERVER={MSSQL_CONFIG['server']};" f"DATABASE={database};" f"UID={MSSQL_CONFIG['username']};" f"PWD={MSSQL_CONFIG['password']};" "TrustServerCertificate=yes;" "Connection Timeout=10" ) return pyodbc.connect(conn_str, timeout=10) def warmup_db_connection(): """앱 시작 시 DB 연결 미리 생성 (첫 요청 속도 개선)""" try: conn = get_mssql_connection('PM_PRES') conn.cursor().execute("SELECT 1") conn.close() logging.info("[PMR] DB 연결 warmup 완료") except Exception as e: logging.warning(f"[PMR] DB warmup 실패: {e}") # 앱 로드 시 warmup 실행 warmup_db_connection() def enrich_medications(medications: list) -> list: """ 약품 목록에 성분/분류/상호작용/금기 정보 추가 (PAAI용) CD_SUNG: 성분 정보 CD_MC: 분류(PRINT_TYPE), 상호작용(INTERACTION), 금기(CONTRA) """ if not medications: return medications try: conn = get_mssql_connection('PM_DRUG') cursor = conn.cursor() # DrugCode 목록 추출 drug_codes = [m.get('code') or m.get('medication_code') for m in medications if m.get('code') or m.get('medication_code')] if not drug_codes: conn.close() return medications # 1. CD_MC에서 분류/상호작용/금기 조회 placeholders = ','.join(['?' for _ in drug_codes]) cursor.execute(f""" SELECT DRUGCODE, PRINT_TYPE, INTERACTION, CONTRA FROM CD_MC WHERE DRUGCODE IN ({placeholders}) """, drug_codes) mc_info = {} for row in cursor.fetchall(): mc_info[row.DRUGCODE] = { 'print_type': row.PRINT_TYPE or '', 'interaction': (row.INTERACTION or '')[:500], # 너무 길면 자르기 'contra': (row.CONTRA or '')[:300] } # 2. CD_GOODS에서 SUNG_CODE 조회 cursor.execute(f""" SELECT DrugCode, SUNG_CODE FROM CD_GOODS WHERE DrugCode IN ({placeholders}) AND SUNG_CODE IS NOT NULL """, drug_codes) sung_codes = {} for row in cursor.fetchall(): if row.SUNG_CODE: sung_codes[row.DrugCode] = row.SUNG_CODE # 3. CD_SUNG에서 성분 정보 조회 components_by_drug = {} if sung_codes: unique_sung_codes = list(set(sung_codes.values())) placeholders2 = ','.join(['?' for _ in unique_sung_codes]) cursor.execute(f""" SELECT SUNG_CODE, SUNG_HNM FROM CD_SUNG WHERE SUNG_CODE IN ({placeholders2}) """, unique_sung_codes) # SUNG_CODE별 성분 목록 sung_components = {} for row in cursor.fetchall(): if row.SUNG_CODE not in sung_components: sung_components[row.SUNG_CODE] = [] sung_components[row.SUNG_CODE].append(row.SUNG_HNM) # DrugCode별로 매핑 for drug_code, sung_code in sung_codes.items(): components_by_drug[drug_code] = sung_components.get(sung_code, []) conn.close() # 4. medications에 정보 추가 for med in medications: code = med.get('code') or med.get('medication_code') if code: # MC 정보 if code in mc_info: med['print_type'] = mc_info[code]['print_type'] med['interaction_info'] = mc_info[code]['interaction'] med['contra_info'] = mc_info[code]['contra'] # 성분 정보 if code in components_by_drug: med['components'] = components_by_drug[code] return medications except Exception as e: logging.error(f"[PAAI] Medication enrichment 오류: {e}") return medications # ───────────────────────────────────────────────────────────── # 조제관리 페이지 # ───────────────────────────────────────────────────────────── @pmr_bp.route('/') def pmr_index(): """조제관리 메인 페이지""" return render_template('pmr.html') # ───────────────────────────────────────────────────────────── # API: 날짜별 처방전 목록 # ───────────────────────────────────────────────────────────── @pmr_bp.route('/api/prescriptions', methods=['GET']) def get_prescriptions_by_date(): """ 날짜별 처방전 목록 조회 Query Params: - date: YYYY-MM-DD (기본값: 오늘) """ try: date_str = request.args.get('date', date.today().strftime('%Y-%m-%d')) # YYYYMMDD 형식으로 변환 date_yyyymmdd = date_str.replace('-', '') conn = get_mssql_connection('PM_PRES') cursor = conn.cursor() cursor.execute(""" SELECT PreSerial, Day_Serial, PassDay, Indate, Paname, PaNum, CusCode, InsName, Drname, PresTime, PreGubun, PRICE_T, PRICE_P, PRICE_C FROM PS_MAIN WHERE Indate = ? ORDER BY Day_Serial ASC """, (date_yyyymmdd,)) prescriptions = [] for row in cursor.fetchall(): # 주민번호에서 나이/성별 추출 panum = row.PaNum or '' age = None gender = None if len(panum) >= 7: try: birth_year = int(panum[:2]) gender_code = panum[6] if len(panum) > 6 else '' # 성별 및 세기 판단 if gender_code in ['1', '2', '5', '6']: birth_year += 1900 elif gender_code in ['3', '4', '7', '8']: birth_year += 2000 else: birth_year += 1900 gender = '남' if gender_code in ['1', '3', '5', '7'] else '여' age = datetime.now().year - birth_year except: pass prescriptions.append({ 'prescription_id': row.PreSerial, 'order_number': row.Day_Serial, 'date': row.Indate, # 조제일 기준 'issue_date': row.PassDay, # 처방전 발행일 'dispense_date': row.Indate, # 조제일 'patient_name': row.Paname, 'patient_id': row.PaNum[:6] + '******' if row.PaNum and len(row.PaNum) > 6 else row.PaNum, 'patient_code': row.CusCode, 'hospital': row.InsName, 'doctor': row.Drname, 'time': row.PresTime, 'type': '급여' if row.PreGubun == '0' else '비급여' if row.PreGubun == '9' else row.PreGubun, 'age': age, 'gender': gender, 'price_total': row.PRICE_T, 'price_patient': row.PRICE_P, 'price_claim': row.PRICE_C }) conn.close() return jsonify({ 'success': True, 'date': date_str, 'count': len(prescriptions), 'prescriptions': prescriptions }) except Exception as e: logging.error(f"PMR 처방전 목록 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ───────────────────────────────────────────────────────────── # API: 처방전 상세 (약품 목록) # ───────────────────────────────────────────────────────────── @pmr_bp.route('/api/prescription/', methods=['GET']) def get_prescription_detail(prescription_id): """ 처방전 상세 정보 (약품 목록 포함) """ try: conn = get_mssql_connection('PM_PRES') cursor = conn.cursor() # 처방전 기본 정보 + 질병정보 cursor.execute(""" SELECT M.PreSerial, M.Day_Serial, M.PassDay, M.Paname, M.PaNum, M.CusCode, M.InsName, M.Drname, M.PresTime, M.PreGubun, M.PRICE_T, M.PRICE_P, M.PRICE_C, M.St1, M.St2, S1.AU_NAME, S2.AU_NAME FROM PS_MAIN M LEFT JOIN PM_BASE.dbo.CD_SANG S1 ON M.St1 = S1.AU_CODE LEFT JOIN PM_BASE.dbo.CD_SANG S2 ON M.St2 = S2.AU_CODE WHERE M.PreSerial = ? """, (prescription_id,)) rx_row = cursor.fetchone() if not rx_row: conn.close() return jsonify({'success': False, 'error': '처방전을 찾을 수 없습니다'}), 404 # 처방 약품 목록 (PS_sub_pharm + CD_GOODS + CD_MC JOIN) # PS_Type: 0,1=일반, 4=대체조제(실제), 9=대체조제(원본) medications = [] original_prescriptions = {} # PS_Type=9인 원본 처방 저장 cursor.execute(""" SELECT s.DrugCode, s.Days, s.QUAN, s.QUAN_TIME, s.PS_Type, s.INV_QUAN, s.SUB_SERIAL, s.UnitCode, g.GoodsName, g.SUNG_CODE, m.PRINT_TYPE, m.SIM_EFFECT FROM PS_sub_pharm s LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode LEFT JOIN PM_DRUG.dbo.CD_MC m ON s.DrugCode = m.DRUGCODE WHERE s.PreSerial = ? ORDER BY s.SUB_SERIAL """, (prescription_id,)) all_rows = cursor.fetchall() # 1차: PS_Type=9 (원본 처방) 수집 - 인덱스로 저장 for i, row in enumerate(all_rows): if row.PS_Type == '9': original_prescriptions[i] = { 'drug_code': row.DrugCode or '', 'drug_name': row.GoodsName or row.DrugCode or '', 'add_info': row.PRINT_TYPE or row.SIM_EFFECT or '' } # 2차: 실제 조제약만 추가 (PS_Type != 9) for i, row in enumerate(all_rows): if row.PS_Type == '9': continue # 원본 처방은 스킵 # 효능: PRINT_TYPE > SIM_EFFECT > 없음 add_info = row.PRINT_TYPE or row.SIM_EFFECT or '' # 대체조제 여부 확인: PS_Type=4이고 바로 다음이 PS_Type=9 # 순서: 4(대체) → 9(원본) is_substituted = row.PS_Type == '4' and (i + 1) in original_prescriptions original_drug = original_prescriptions.get(i + 1) if is_substituted else None # UnitCode: 1=보험, 2=비보험, 3=100/100, 4~7=급여(본인부담률) unit_code = int(row.UnitCode) if row.UnitCode else 1 medications.append({ 'medication_code': row.DrugCode or '', 'med_name': row.GoodsName or row.DrugCode or '', 'add_info': add_info, 'dosage': float(row.QUAN) if row.QUAN else 0, 'frequency': row.QUAN_TIME or 0, 'duration': row.Days or 0, 'total_qty': float(row.INV_QUAN) if row.INV_QUAN else 0, 'type': '급여' if row.PS_Type in ['0', '4'] else '비급여' if row.PS_Type == '1' else row.PS_Type, 'sung_code': row.SUNG_CODE or '', 'unit': get_drug_unit(row.GoodsName or '', row.SUNG_CODE or ''), 'ps_type': row.PS_Type or '0', 'unit_code': unit_code, 'is_substituted': is_substituted, 'original_drug': original_drug }) # 나이/성별 계산 panum = rx_row.PaNum or '' age = None gender = None if len(panum) >= 7: try: birth_year = int(panum[:2]) gender_code = panum[6] if gender_code in ['1', '2', '5', '6']: birth_year += 1900 elif gender_code in ['3', '4', '7', '8']: birth_year += 2000 gender = '남' if gender_code in ['1', '3', '5', '7'] else '여' age = datetime.now().year - birth_year except: pass # 질병 정보 (St1, St2 → AU_NAME) disease_info = None st1 = rx_row[13] or '' # St1 st2 = rx_row[14] or '' # St2 disease_name_1 = rx_row[15] or '' # S1.AU_NAME disease_name_2 = rx_row[16] or '' # S2.AU_NAME if st1 or st2: disease_info = { 'code_1': st1, 'code_2': st2, 'name_1': disease_name_1, 'name_2': disease_name_2 } # 환자 특이사항(CUSETC) + 전화번호 조회 - CD_PERSON 테이블 cusetc = '' phone = '' cus_code = rx_row.CusCode if cus_code: try: # PM_BASE.dbo.CD_PERSON에서 조회 cursor.execute(""" SELECT CUSETC, PHONE, TEL_NO, PHONE2 FROM PM_BASE.dbo.CD_PERSON WHERE CUSCODE = ? """, (cus_code,)) person_row = cursor.fetchone() if person_row: if person_row.CUSETC: cusetc = person_row.CUSETC # 전화번호 (PHONE, TEL_NO, PHONE2 중 하나) phone = person_row.PHONE or person_row.TEL_NO or person_row.PHONE2 or '' except Exception as e: logging.warning(f"환자정보 조회 실패: {e}") conn.close() return jsonify({ 'success': True, 'prescription': { 'prescription_id': rx_row.PreSerial, 'order_number': rx_row.Day_Serial, 'date': rx_row.PassDay, 'time': rx_row.PresTime, 'hospital': rx_row.InsName, 'doctor': rx_row.Drname, 'type': '급여' if rx_row.PreGubun == '0' else '비급여', 'price_total': rx_row.PRICE_T, 'price_patient': rx_row.PRICE_P }, 'patient': { 'name': rx_row.Paname, 'code': rx_row.CusCode, 'cus_code': rx_row.CusCode, # 호환성 'age': age, 'gender': gender, 'cusetc': cusetc, # 특이사항 'phone': phone # 전화번호 }, 'disease_info': disease_info, 'medications': medications, 'medication_count': len(medications) }) except Exception as e: logging.error(f"PMR 처방전 상세 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ───────────────────────────────────────────────────────────── # API: 통계 (당일 요약) # ───────────────────────────────────────────────────────────── @pmr_bp.route('/api/stats', methods=['GET']) def get_daily_stats(): """당일 조제 통계""" try: date_str = request.args.get('date', date.today().strftime('%Y-%m-%d')) date_yyyymmdd = date_str.replace('-', '') conn = get_mssql_connection('PM_PRES') cursor = conn.cursor() # 처방전 수 cursor.execute(""" SELECT COUNT(*) as cnt FROM PS_MAIN WHERE PassDay = ? """, (date_yyyymmdd,)) total_prescriptions = cursor.fetchone()[0] # 총 금액 cursor.execute(""" SELECT ISNULL(SUM(PRICE_T), 0) as total, ISNULL(SUM(PRICE_P), 0) as patient, ISNULL(SUM(PRICE_C), 0) as claim FROM PS_MAIN WHERE PassDay = ? """, (date_yyyymmdd,)) price_row = cursor.fetchone() conn.close() return jsonify({ 'success': True, 'date': date_str, 'stats': { 'total_prescriptions': total_prescriptions, 'total_amount': price_row[0] if price_row else 0, 'patient_amount': price_row[1] if price_row else 0, 'claim_amount': price_row[2] if price_row else 0 } }) except Exception as e: logging.error(f"PMR 통계 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ───────────────────────────────────────────────────────────── # API: DB 연결 테스트 # ───────────────────────────────────────────────────────────── @pmr_bp.route('/api/test', methods=['GET']) def test_connection(): """DB 연결 테스트""" try: conn = get_mssql_connection('PM_PRES') cursor = conn.cursor() cursor.execute("SELECT @@VERSION") version = cursor.fetchone()[0] conn.close() return jsonify({ 'success': True, 'server': MSSQL_CONFIG['server'], 'version': version[:100] + '...' }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 # ───────────────────────────────────────────────────────────── # API: 라벨 미리보기 # ───────────────────────────────────────────────────────────── @pmr_bp.route('/api/label/preview', methods=['POST']) def preview_label(): """ 라벨 미리보기 (PIL 렌더링 → Base64 이미지) Request Body: - patient_name: 환자명 - med_name: 약품명 - dosage: 1회 복용량 - frequency: 복용 횟수 - duration: 복용 일수 - unit: 단위 (정, 캡슐, mL 등) - sung_code: 성분코드 (환산계수 조회용, 선택) """ try: data = request.get_json() patient_name = data.get('patient_name', '') med_name = data.get('med_name', '') add_info = data.get('add_info', '') dosage = float(data.get('dosage', 0)) frequency = int(data.get('frequency', 0)) duration = int(data.get('duration', 0)) unit = data.get('unit', '정') sung_code = data.get('sung_code', '') # 환산계수 및 보관조건 조회 (sung_code가 있는 경우) conversion_factor = None storage_conditions = '실온보관' if sung_code: try: from db.dbsetup import db_manager cf_result = db_manager.get_conversion_factor(sung_code) conversion_factor = cf_result.get('conversion_factor') storage_conditions = cf_result.get('storage_conditions', '실온보관') except Exception as cf_err: logging.warning(f"환산계수 조회 실패 (무시): {cf_err}") # 라벨 이미지 생성 image = create_label_image( patient_name=patient_name, med_name=med_name, add_info=add_info, dosage=dosage, frequency=frequency, duration=duration, unit=unit, conversion_factor=conversion_factor, storage_conditions=storage_conditions ) # Base64 인코딩 buffer = io.BytesIO() image.save(buffer, format='PNG') buffer.seek(0) img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') return jsonify({ 'success': True, 'image': f'data:image/png;base64,{img_base64}', 'conversion_factor': conversion_factor, 'storage_conditions': storage_conditions }) except Exception as e: logging.error(f"라벨 미리보기 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # API: 라벨 인쇄 (Brother QL 프린터) # ───────────────────────────────────────────────────────────── @pmr_bp.route('/api/label/print', methods=['POST']) def print_label(): """ 라벨 인쇄 (PIL 렌더링 → Brother QL 프린터 전송) Request Body: - patient_name, med_name, dosage, frequency, duration, unit, sung_code - printer: 프린터 선택 (선택, 기본값 '168') - '121': QL-710W (192.168.0.121) - '168': QL-810W (192.168.0.168) - orientation: 출력 방향 (선택, 기본값 'portrait') - 'portrait': 세로 모드 (QR 라벨과 동일, 회전 없음) - 'landscape': 가로 모드 (90도 회전) """ try: from brother_ql.raster import BrotherQLRaster from brother_ql.conversion import convert from brother_ql.backends.helpers import send data = request.get_json() patient_name = data.get('patient_name', '') med_name = data.get('med_name', '') add_info = data.get('add_info', '') dosage = float(data.get('dosage', 0)) frequency = int(data.get('frequency', 0)) duration = int(data.get('duration', 0)) unit = data.get('unit', '정') sung_code = data.get('sung_code', '') printer = data.get('printer', '168') # 기본값: QL-810W orientation = data.get('orientation', 'portrait') # 기본값: 세로 모드 # 프린터 설정 if printer == '121': printer_ip = '192.168.0.121' printer_model = 'QL-710W' else: printer_ip = '192.168.0.168' printer_model = 'QL-810W' # 환산계수 및 보관조건 조회 conversion_factor = None storage_conditions = '실온보관' if sung_code: try: from db.dbsetup import db_manager cf_result = db_manager.get_conversion_factor(sung_code) conversion_factor = cf_result.get('conversion_factor') storage_conditions = cf_result.get('storage_conditions', '실온보관') except Exception as cf_err: logging.warning(f"환산계수 조회 실패 (무시): {cf_err}") # 1. 라벨 이미지 생성 label_image = create_label_image( patient_name=patient_name, med_name=med_name, add_info=add_info, dosage=dosage, frequency=frequency, duration=duration, unit=unit, conversion_factor=conversion_factor, storage_conditions=storage_conditions ) # 2. 방향 설정 (portrait: 회전 없음, landscape: 90도 회전) if orientation == 'landscape': # 가로 모드: 90도 회전 (기존 방식) label_final = label_image.rotate(90, expand=True) else: # 세로 모드: 회전 없음 (QR 라벨과 동일) label_final = label_image # 3. Brother QL 프린터로 전송 qlr = BrotherQLRaster(printer_model) instructions = convert( qlr=qlr, images=[label_final], label='29', rotate='0', threshold=70.0, dither=False, compress=False, red=False, dpi_600=False, hq=True, cut=True ) send(instructions, printer_identifier=f"tcp://{printer_ip}:9100") logging.info(f"[SUCCESS] PMR 라벨 인쇄 성공: {med_name} → {printer_model} ({orientation})") return jsonify({ 'success': True, 'message': f'{med_name} 라벨 인쇄 완료 ({printer_model})', 'printer': printer_model, 'orientation': orientation }) except ImportError as e: logging.error(f"brother_ql 라이브러리 없음: {e}") return jsonify({'success': False, 'error': 'brother_ql 라이브러리가 설치되지 않았습니다'}), 500 except Exception as e: logging.error(f"라벨 인쇄 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 def normalize_medication_name(med_name): """ 약품명 정제 - 밀리그램 등을 mg로 변환, 불필요한 부분 제거 """ import re if not med_name: return med_name # 언더스코어 뒤 내용 제거 (예: 휴니즈레바미피드정_ → 휴니즈레바미피드정) med_name = re.sub(r'_.*$', '', med_name) # 대괄호 및 내용 제거 med_name = re.sub(r'\[.*?\]', '', med_name) med_name = re.sub(r'\[.*$', '', med_name) # 밀리그램 변환 med_name = re.sub(r'밀리그램|밀리그람|미리그램|미리그람', 'mg', med_name) # 마이크로그램 변환 med_name = re.sub(r'마이크로그램|마이크로그람', 'μg', med_name) # 그램 변환 (mg/μg 제외) med_name = re.sub(r'(?= min_font_size: try: test_font = ImageFont.truetype(font_path, current_size) except: return ImageFont.load_default() bbox = draw.textbbox((0, 0), text, font=test_font) text_width = bbox[2] - bbox[0] if text_width <= max_width: return test_font current_size -= 2 try: return ImageFont.truetype(font_path, min_font_size) except: return ImageFont.load_default() # 중앙 정렬 텍스트 함수 def draw_centered(text, y, font, fill="black"): bbox = draw.textbbox((0, 0), text, font=font) w = bbox[2] - bbox[0] x = (label_width - w) // 2 draw.text((x, y), text, font=font, fill=fill) return y + bbox[3] - bbox[1] + 5 # 약품명 줄바꿈 처리 def wrap_text_korean(text, font, max_width, draw): """한글/영문 혼합 텍스트 줄바꿈 (글자 단위)""" if not text: return [text] lines = [] current_line = "" for char in text: test_line = current_line + char bbox = draw.textbbox((0, 0), test_line, font=font) if bbox[2] - bbox[0] <= max_width: current_line = test_line else: if current_line: lines.append(current_line) current_line = char if current_line: lines.append(current_line) return lines if lines else [text] def wrap_text(text, font, max_width): lines = [] words = text.split() current_line = "" for word in words: test_line = f"{current_line} {word}".strip() bbox = draw.textbbox((0, 0), test_line, font=font) if bbox[2] - bbox[0] <= max_width: current_line = test_line else: if current_line: lines.append(current_line) current_line = word if current_line: lines.append(current_line) return lines if lines else [text] y = 8 # 상단 지그재그 ↔ 이름 간격 (공간 확보를 위해 축소) # 환자명 (띄어쓰기) spaced_name = " ".join(patient_name) if patient_name else "" y = draw_centered(spaced_name, y, name_font) y += 5 # 약품명 (줄바꿈) # 앞에 있는 (숫자mg) 패턴 제거 후, 뒤의 괄호 앞에서 분리 import re # (2.5mg)노바스크정 → 노바스크정 main_name = re.sub(r'^\([^)]+\)', '', med_name).strip() # 노바스크정(고혈압) → 노바스크정 if '(' in main_name: main_name = main_name.split('(')[0].strip() # 빈 문자열이면 원본 사용 if not main_name: main_name = med_name # 약품명 - 동적 폰트 크기 적용 (긴 이름 자동 축소) adaptive_drug_font = get_adaptive_font(main_name, label_width - 30, 32, 18) name_lines = wrap_text_korean(main_name, adaptive_drug_font, label_width - 30, draw) for line in name_lines: y = draw_centered(line, y, adaptive_drug_font) # 효능효과 (add_info) - 동적 폰트 크기 적용 if add_info: efficacy_text = f"({add_info})" adaptive_efficacy_font = get_adaptive_font(efficacy_text, label_width - 40, 30, 20) y = draw_centered(efficacy_text, y, adaptive_efficacy_font, fill="black") y += 5 # 총량 계산 및 표시 (환산계수 반영) if dosage > 0 and frequency > 0 and duration > 0: total = dosage * frequency * duration total_str = str(int(total)) if total == int(total) else f"{total:.2f}".rstrip('0').rstrip('.') # 환산계수가 있으면 변환된 총량도 표시 (예: "총120mL (13.2g)/5일분") if conversion_factor is not None and conversion_factor > 0: converted_total = total * conversion_factor if converted_total == int(converted_total): converted_str = str(int(converted_total)) else: converted_str = f"{converted_total:.2f}".rstrip('0').rstrip('.') total_text = f"총{total_str}{unit} ({converted_str}g)/{duration}일분" else: total_text = f"총{total_str}{unit}/{duration}일분" y = draw_centered(total_text, y, additional_font) y += 5 # 용법 박스 (테두리 있는 박스) box_margin = 20 box_height = 75 box_top = y box_bottom = y + box_height box_width = label_width - 2 * box_margin draw.rectangle( [(box_margin, box_top), (label_width - box_margin, box_bottom)], outline="black", width=2 ) # 박스 내용 - 1회 복용량 dosage_str = str(int(dosage)) if dosage == int(dosage) else f"{dosage:.4f}".rstrip('0').rstrip('.') dosage_text = f"{dosage_str}{unit}" # 복용 시간 if frequency == 1: time_text = "아침" elif frequency == 2: time_text = "아침, 저녁" elif frequency == 3: time_text = "아침, 점심, 저녁" else: time_text = f"1일 {frequency}회" # 박스 내 텍스트 중앙 배치 (수직 중앙 정렬) line_spacing = 6 # 1회복용량 ↔ 복용횟수 간격 bbox1 = draw.textbbox((0, 0), dosage_text, font=info_font) text1_height = bbox1[3] - bbox1[1] bbox2 = draw.textbbox((0, 0), time_text, font=info_font) text2_height = bbox2[3] - bbox2[1] # 방법 2: 폰트 최대 높이 기준 고정 (글자 내용과 무관하게 일정한 레이아웃) fixed_line_height = 32 # 폰트 크기 30 기반 고정 라인 높이 fixed_total_height = fixed_line_height * 2 + line_spacing center_y = (box_top + box_bottom) // 2 start_y = center_y - (fixed_total_height // 2) - 5 # 박스 내 텍스트 전체 위로 조정 draw_centered(dosage_text, start_y, info_font) draw_centered(time_text, start_y + fixed_line_height + line_spacing, info_font) # 조제일 (시그니처 위쪽에 배치) - 먼저 위치 계산 today = datetime.now().strftime('%Y-%m-%d') print_date_text = f"조제일 : {today}" bbox = draw.textbbox((0, 0), print_date_text, font=small_font) date_w, date_h = bbox[2] - bbox[0], bbox[3] - bbox[1] print_date_y = label_height - date_h - 70 # 시그니처 위쪽 # 보관조건 표시 (조제일 바로 위에 고정 배치) if storage_conditions: storage_text = f"* {storage_conditions}" try: storage_font = ImageFont.truetype(font_path, 28) except: storage_font = ImageFont.load_default() bbox_storage = draw.textbbox((0, 0), storage_text, font=storage_font) storage_w = bbox_storage[2] - bbox_storage[0] storage_h = bbox_storage[3] - bbox_storage[1] storage_y = print_date_y - storage_h - 8 # 조제일 위 8px 간격 draw.text(((label_width - storage_w) / 2, storage_y), storage_text, font=storage_font, fill="black") # 조제일 그리기 draw.text(((label_width - date_w) / 2, print_date_y), print_date_text, font=small_font, fill="black") # 시그니처 박스 (하단 - 약국명) signature_text = "청 춘 약 국" bbox = draw.textbbox((0, 0), signature_text, font=signature_font) w_sig, h_sig = bbox[2] - bbox[0], bbox[3] - bbox[1] # 시그니처 박스 패딩 및 위치 계산 padding_top = int(h_sig * 0.1) padding_bottom = int(h_sig * 0.5) padding_sides = int(h_sig * 0.2) box_x = (label_width - w_sig) / 2 - padding_sides box_y = label_height - h_sig - padding_top - padding_bottom - 10 box_x2 = box_x + w_sig + 2 * padding_sides box_y2 = box_y + h_sig + padding_top + padding_bottom # 시그니처 테두리 및 텍스트 draw.rectangle([(box_x, box_y), (box_x2, box_y2)], outline="black", width=1) draw.text(((label_width - w_sig) / 2, box_y + padding_top), signature_text, font=signature_font, fill="black") # 지그재그 테두리 (가위로 자른 느낌) draw_scissor_border(draw, label_width, label_height, edge_size=10, steps=20) return image # ───────────────────────────────────────────────────────────── # API: 환자 이전 처방 이력 # ───────────────────────────────────────────────────────────── @pmr_bp.route('/api/patient//history', methods=['GET']) def get_patient_history(cus_code): """ 환자 이전 처방 이력 조회 Args: cus_code: 환자 고유코드 (CusCode) Query Params: - limit: 최대 조회 건수 (기본 10, 최대 50) - exclude: 제외할 처방번호 (현재 처방) """ try: limit = min(int(request.args.get('limit', 10)), 50) exclude_serial = request.args.get('exclude', '') conn = get_mssql_connection('PM_PRES') cursor = conn.cursor() # 이전 처방 목록 조회 query = """ SELECT TOP (?) m.PreSerial, m.PassDay, m.PresTime, m.Paname, m.InsName, m.Drname, m.PRICE_P, (SELECT COUNT(*) FROM PS_sub_pharm WHERE PreSerial = m.PreSerial) as drug_count FROM PS_MAIN m WHERE m.CusCode = ? """ params = [limit, cus_code] if exclude_serial: query += " AND m.PreSerial != ?" params.append(exclude_serial) query += " ORDER BY m.PassDay DESC, m.PresTime DESC" cursor.execute(query, params) history = [] for row in cursor.fetchall(): pre_serial = row.PreSerial # 해당 처방의 약품 목록 조회 # PS_Type=9 (대체조제 원처방)는 제외 cursor.execute(""" SELECT s.DrugCode, s.Days, s.QUAN, s.QUAN_TIME, s.PS_Type, g.GoodsName, m.PRINT_TYPE FROM PS_sub_pharm s LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode LEFT JOIN PM_DRUG.dbo.CD_MC m ON s.DrugCode = m.DRUGCODE WHERE s.PreSerial = ? AND (s.PS_Type IS NULL OR s.PS_Type != '9') ORDER BY s.SUB_SERIAL """, (pre_serial,)) medications = [] for med_row in cursor.fetchall(): medications.append({ 'medication_code': med_row.DrugCode or '', 'med_name': med_row.GoodsName or med_row.DrugCode or '', 'add_info': med_row.PRINT_TYPE or '', 'dosage': float(med_row.QUAN) if med_row.QUAN else 0, 'frequency': med_row.QUAN_TIME or 0, 'duration': med_row.Days or 0, 'ps_type': med_row.PS_Type or '0' }) # 날짜 포맷 pass_day = row.PassDay if pass_day and len(pass_day) == 8: date_formatted = f"{pass_day[:4]}-{pass_day[4:6]}-{pass_day[6:8]}" else: date_formatted = pass_day or '' history.append({ 'prescription_id': pre_serial, 'date': date_formatted, 'time': row.PresTime or '', 'patient_name': row.Paname or '', 'hospital': row.InsName or '', 'doctor': row.Drname or '', 'copayment': int(row.PRICE_P or 0), 'medication_count': row.drug_count or 0, 'medications': medications }) conn.close() return jsonify({ 'success': True, 'cus_code': cus_code, 'count': len(history), 'history': history }) except Exception as e: logging.error(f"환자 이전 처방 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ───────────────────────────────────────────────────────────── # API: 환자 OTC 구매 이력 # ───────────────────────────────────────────────────────────── @pmr_bp.route('/api/patient//otc', methods=['GET']) def get_patient_otc_history(cus_code): """ 환자 OTC (일반의약품) 구매 이력 조회 Args: cus_code: 환자 고유코드 (CusCode = SL_CD_custom) Query Params: - limit: 최대 조회 건수 (기본 20, 최대 100) """ try: limit = min(int(request.args.get('limit', 20)), 100) conn = get_mssql_connection('PM_PRES') cursor = conn.cursor() # ✅ 최적화: 한번의 쿼리로 거래 + 품목 모두 조회 (JOIN) cursor.execute(""" SELECT m.SL_NO_order, m.SL_DT_appl, m.InsertTime, m.SL_MY_sale, m.SL_NM_custom, s.DrugCode, g.GoodsName, s.SL_NM_item, s.SL_TOTAL_PRICE, mc.PRINT_TYPE FROM ( SELECT TOP (?) * FROM SALE_MAIN WHERE SL_CD_custom = ? AND PRESERIAL = 'V' ORDER BY InsertTime DESC ) m LEFT JOIN SALE_SUB s ON m.SL_NO_order = s.SL_NO_order LEFT JOIN PM_DRUG.dbo.CD_GOODS g ON s.DrugCode = g.DrugCode LEFT JOIN PM_DRUG.dbo.CD_MC mc ON s.DrugCode = mc.DRUGCODE ORDER BY m.InsertTime DESC, s.DrugCode """, (limit, cus_code)) # 결과를 order_no별로 그룹핑 orders_dict = {} all_drug_codes = [] for row in cursor.fetchall(): order_no = row.SL_NO_order if order_no not in orders_dict: orders_dict[order_no] = { 'order_no': order_no, 'date': row.SL_DT_appl, 'datetime': row.InsertTime.strftime('%Y-%m-%d %H:%M') if row.InsertTime else '', 'amount': int(row.SL_MY_sale or 0), 'customer_name': row.SL_NM_custom or '', 'items': [] } # 품목 추가 (DrugCode가 있는 경우만) if row.DrugCode: drug_code = row.DrugCode all_drug_codes.append(drug_code) orders_dict[order_no]['items'].append({ 'drug_code': drug_code, 'name': row.GoodsName or drug_code, 'quantity': int(row.SL_NM_item or 0), 'price': int(row.SL_TOTAL_PRICE or 0), 'category': row.PRINT_TYPE or '', 'image': None }) conn.close() # dict → list 변환 purchases = list(orders_dict.values()) for p in purchases: p['item_count'] = len(p['items']) if not purchases: return jsonify({ 'success': True, 'cus_code': cus_code, 'count': 0, 'purchases': [] }) # 제품 이미지 조회 (product_images.db) image_map = {} try: img_db_path = Path(__file__).parent / 'db' / 'product_images.db' if img_db_path.exists() and all_drug_codes: img_conn = sqlite3.connect(str(img_db_path)) img_cursor = img_conn.cursor() placeholders = ','.join(['?' for _ in all_drug_codes]) img_cursor.execute(f''' SELECT drug_code, thumbnail_base64 FROM product_images WHERE drug_code IN ({placeholders}) AND thumbnail_base64 IS NOT NULL ''', all_drug_codes) for row in img_cursor.fetchall(): image_map[row[0]] = row[1] img_conn.close() except Exception as img_err: logging.warning(f"제품 이미지 조회 오류: {img_err}") # 이미지 매핑 for purchase in purchases: for item in purchase['items']: if item['drug_code'] in image_map: item['image'] = image_map[item['drug_code']] # 통계 계산 total_amount = sum(p['amount'] for p in purchases) total_visits = len(purchases) # 자주 구매하는 품목 집계 item_freq = {} for p in purchases: for item in p['items']: key = item['drug_code'] if key not in item_freq: item_freq[key] = { 'name': item['name'], 'category': item['category'], 'count': 0, 'total_qty': 0 } item_freq[key]['count'] += 1 item_freq[key]['total_qty'] += item['quantity'] # 빈도순 정렬 frequent_items = sorted(item_freq.values(), key=lambda x: x['count'], reverse=True)[:10] return jsonify({ 'success': True, 'cus_code': cus_code, 'count': len(purchases), 'summary': { 'total_visits': total_visits, 'total_amount': total_amount, 'frequent_items': frequent_items }, 'purchases': purchases }) except Exception as e: logging.error(f"환자 OTC 구매 이력 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ───────────────────────────────────────────────────────────── # PAAI (Pharmacist Assistant AI) API # ───────────────────────────────────────────────────────────── @pmr_bp.route('/api/paai/result/', methods=['GET']) def paai_get_result(pre_serial): """ PAAI 분석 결과 조회 (캐시) Response: - 결과 있음: {exists: true, status: "success", result: {...}, log_id: 123} - 생성 중: {exists: true, status: "generating", log_id: 123} - 없음: {exists: false} """ import sqlite3 import json from pathlib import Path try: db_path = Path(__file__).parent / 'db' / 'paai_logs.db' if not db_path.exists(): return jsonify({'exists': False}) conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row cursor = conn.cursor() # 해당 처방의 최신 성공 로그 조회 cursor.execute(''' SELECT id, status, created_at, patient_name, disease_name_1, disease_name_2, current_med_count, kims_interaction_count, kims_has_severe, kims_response_time_ms, ai_response_time_ms, ai_response FROM paai_logs WHERE pre_serial = ? ORDER BY id DESC LIMIT 1 ''', (pre_serial,)) row = cursor.fetchone() conn.close() if not row: return jsonify({'exists': False}) status = row['status'] # 생성 중 상태 if status in ('pending', 'kims_done', 'generating'): return jsonify({ 'exists': True, 'status': 'generating', 'log_id': row['id'] }) # 에러 상태 if status == 'error': return jsonify({ 'exists': True, 'status': 'error', 'log_id': row['id'] }) # 성공 상태 - 전체 결과 반환 ai_response = {} if row['ai_response']: try: ai_response = json.loads(row['ai_response']) except: pass return jsonify({ 'exists': True, 'status': 'success', 'log_id': row['id'], 'result': { 'created_at': row['created_at'], 'patient_name': row['patient_name'], 'disease_name_1': row['disease_name_1'], 'disease_name_2': row['disease_name_2'], 'medication_count': row['current_med_count'], 'kims_summary': { 'interaction_count': row['kims_interaction_count'], 'has_severe': bool(row['kims_has_severe']) }, 'timing': { 'kims_ms': row['kims_response_time_ms'], 'ai_ms': row['ai_response_time_ms'], 'total_ms': (row['kims_response_time_ms'] or 0) + (row['ai_response_time_ms'] or 0) }, 'analysis': ai_response } }) except Exception as e: logging.error(f"PAAI 결과 조회 오류: {e}") return jsonify({'exists': False, 'error': str(e)}) @pmr_bp.route('/api/paai/analyze', methods=['POST']) def paai_analyze(): """ PAAI 분석 API Request: { "pre_serial": "P20260304001", "cus_code": "00001234", "patient_name": "홍길동", "disease_info": { "code_1": "M170", "name_1": "무릎골관절염", "code_2": "K299", "name_2": "상세불명의 위십이지장염" }, "current_medications": [ {"code": "055101150", "name": "록소프로펜정", "dosage": 1, "frequency": 3, "days": 7} ], "previous_serial": "P20260225001", "previous_medications": [...], "otc_history": {...} } """ import requests as http_requests import time as time_module from db.paai_logger import create_log, update_kims_result, update_ai_result, update_error start_time = time_module.time() log_id = None try: data = request.get_json() if not data: return jsonify({'success': False, 'error': '요청 데이터가 없습니다.'}), 400 pre_serial = data.get('pre_serial') cus_code = data.get('cus_code') patient_name = data.get('patient_name') patient_note = data.get('patient_note', '') # 환자 특이사항 (알러지, 기저질환 등) disease_info = data.get('disease_info', {}) current_medications = data.get('current_medications', []) previous_serial = data.get('previous_serial') previous_medications = data.get('previous_medications', []) otc_history = data.get('otc_history', {}) # ✅ 약품 정보 Enrichment (성분/분류/상호작용/금기) current_medications = enrich_medications(current_medications) # 처방 변화 분석 prescription_changes = analyze_prescription_changes( current_medications, previous_medications ) # 1. 로그 생성 log_id = create_log( pre_serial=pre_serial, patient_code=cus_code, patient_name=patient_name, disease_code_1=disease_info.get('code_1'), disease_name_1=disease_info.get('name_1'), disease_code_2=disease_info.get('code_2'), disease_name_2=disease_info.get('name_2'), current_medications=current_medications, previous_serial=previous_serial, previous_medications=previous_medications, prescription_changes=prescription_changes, otc_history=otc_history ) # 2. KD코드 추출 (9자리 KIMS 코드) kd_codes = [] drug_names = [] for med in current_medications: code = med.get('code', '') if code and len(str(code)) == 9: kd_codes.append(str(code)) drug_names.append(med.get('name', '')) # 3. KIMS API 호출 (약품 2개 이상인 경우) kims_interactions = [] kims_start = time_module.time() if len(kd_codes) >= 2: try: kims_url = "https://api2.kims.co.kr/api/interaction/info" kims_headers = { 'Authorization': 'Basic VFNQTUtSOg==', 'Content-Type': 'application/json', 'Accept': 'application/json; charset=utf-8' } kims_payload = {'KDCodes': kd_codes} kims_response = http_requests.get( kims_url, headers=kims_headers, data=__import__('json').dumps(kims_payload), timeout=10, verify=False ) if kims_response.status_code == 200: kims_data = kims_response.json() if kims_data.get('Message') == 'SUCCESS': kims_interactions = kims_data.get('InteractionList', []) except Exception as kims_err: logging.warning(f"KIMS API 오류 (무시하고 계속): {kims_err}") kims_time = int((time_module.time() - kims_start) * 1000) # KIMS 결과 로그 업데이트 update_kims_result( log_id=log_id, kims_drug_codes=kd_codes, kims_interactions=kims_interactions, kims_response_time_ms=kims_time ) # 4. AI 프롬프트 생성 ai_prompt = build_paai_prompt( disease_info=disease_info, current_medications=current_medications, prescription_changes=prescription_changes, kims_interactions=kims_interactions, otc_history=otc_history, patient_note=patient_note ) # 5. Clawdbot AI 호출 (WebSocket) ai_start = time_module.time() ai_response = call_clawdbot_ai(ai_prompt) ai_time = int((time_module.time() - ai_start) * 1000) # AI 결과 로그 업데이트 update_ai_result( log_id=log_id, ai_prompt=ai_prompt, ai_model='claude-sonnet-4', ai_response=ai_response, ai_response_time_ms=ai_time ) total_time = int((time_module.time() - start_time) * 1000) return jsonify({ 'success': True, 'log_id': log_id, 'analysis': ai_response, 'kims_summary': { 'drug_count': len(kd_codes), 'interaction_count': len(kims_interactions), 'has_severe': any(str(i.get('Severity', '5')) in ['1', '2'] for i in kims_interactions) }, 'timing': { 'kims_ms': kims_time, 'ai_ms': ai_time, 'total_ms': total_time } }) except Exception as e: logging.error(f"PAAI 분석 오류: {e}") if log_id: update_error(log_id, str(e)) return jsonify({'success': False, 'error': str(e)}), 500 def analyze_prescription_changes(current: list, previous: list) -> dict: """처방 변화 분석""" current_codes = {m.get('code'): m for m in current} previous_codes = {m.get('code'): m for m in previous} added = [] removed = [] changed = [] same = [] # 추가된 약품 for code, med in current_codes.items(): if code not in previous_codes: added.append(med) else: # 변경 여부 확인 prev_med = previous_codes[code] changes = [] for field in ['dosage', 'frequency', 'days']: if med.get(field) != prev_med.get(field): changes.append({ 'field': field, 'from': prev_med.get(field), 'to': med.get(field) }) if changes: changed.append({'medication': med, 'changes': changes}) else: same.append(med) # 중단된 약품 for code, med in previous_codes.items(): if code not in current_codes: removed.append(med) return { 'added': added, 'removed': removed, 'changed': changed, 'same': same } def build_paai_prompt( disease_info: dict, current_medications: list, prescription_changes: dict, kims_interactions: list, otc_history: dict, patient_note: str = '' ) -> str: """AI 프롬프트 생성""" # 질병 정보 diseases = [] if disease_info.get('code_1'): diseases.append(f"[{disease_info['code_1']}] {disease_info.get('name_1', '')}") if disease_info.get('code_2'): diseases.append(f"[{disease_info['code_2']}] {disease_info.get('name_2', '')}") # 현재 처방 (성분 정보 포함) med_lines = [] for med in current_medications: name = med.get('name', '?') dosage = med.get('dosage', 0) freq = med.get('frequency', 0) days = med.get('days', 0) line = f"- {name}: {dosage}정 × {freq}회 × {days}일" # 분류 정보 if med.get('print_type'): line += f"\n └ 분류: {med['print_type']}" # 성분 정보 if med.get('components'): components_str = ', '.join(med['components'][:3]) # 최대 3개 if len(med['components']) > 3: components_str += f" 외 {len(med['components'])-3}개" line += f"\n └ 성분: {components_str}" med_lines.append(line) # 처방 변화 change_lines = [] if prescription_changes.get('added'): names = [m.get('name', '?') for m in prescription_changes['added']] change_lines.append(f"- 추가: {', '.join(names)}") if prescription_changes.get('removed'): names = [m.get('name', '?') for m in prescription_changes['removed']] change_lines.append(f"- 중단: {', '.join(names)}") if prescription_changes.get('changed'): for item in prescription_changes['changed']: med = item['medication'] changes = item['changes'] change_desc = ', '.join([f"{c['field']}: {c['from']}→{c['to']}" for c in changes]) change_lines.append(f"- 변경: {med.get('name', '?')} ({change_desc})") # KIMS 상호작용 kims_lines = [] for inter in kims_interactions: severity = inter.get('Severity', 5) severity_text = {1: '🔴 금기', 2: '🟠 경고', 3: '🟡 주의', 4: '참고', 5: '정보'}.get(int(severity), '정보') drug1 = inter.get('Drug1Name', '?') drug2 = inter.get('Drug2Name', '?') desc = inter.get('InteractionDesc', '')[:100] kims_lines.append(f"- [{severity_text}] {drug1} + {drug2}: {desc}") # OTC 이력 otc_lines = [] if otc_history.get('frequent_items'): for item in otc_history['frequent_items'][:5]: otc_lines.append(f"- {item.get('name', '?')} ({item.get('count', 0)}회 구매)") # 환자 특이사항 (알러지, 기저질환 등) note_text = patient_note.strip() if patient_note else '' prompt = f"""당신은 약사를 보조하는 AI입니다. 환자 정보와 KIMS 상호작용 데이터를 바탕으로 분석해주세요. ## 환자 질병 {chr(10).join(diseases) if diseases else '- 정보 없음'} ## 환자 특이사항 (알러지/기저질환/주의사항) {note_text if note_text else '- 없음'} ## 현재 처방 {chr(10).join(med_lines) if med_lines else '- 정보 없음'} ## 처방 변화 (vs 이전) {chr(10).join(change_lines) if change_lines else '- 변화 없음'} ## KIMS 약물 상호작용 {chr(10).join(kims_lines) if kims_lines else '- 상호작용 없음'} ## OTC 구매 이력 {chr(10).join(otc_lines) if otc_lines else '- 이력 없음'} ## 요청 다음 형식의 JSON으로 간결하게 답변해주세요: {{ "prescription_insight": "처방 변화에 대한 분석 (1-2문장)", "kims_analysis": "KIMS 상호작용 해석 및 임상적 의미 (1-2문장)", "cautions": ["복용 주의사항 1", "복용 주의사항 2"], "otc_recommendations": [ {{"product": "추천 OTC명", "reason": "추천 이유 (현재 처방과 상호작용 없음 확인)"}} ], "counseling_points": ["상담 포인트 1", "상담 포인트 2"] }} """ return prompt def call_clawdbot_ai(prompt: str) -> dict: """Clawdbot AI 호출 (WebSocket Gateway)""" import json import re from services.clawdbot_client import ask_clawdbot PAAI_SYSTEM_PROMPT = """당신은 경험 많은 약사입니다. 처방 데이터를 분석하여 약사에게 유용한 정보를 제공합니다. 반드시 요청된 JSON 형식으로만 응답하세요.""" try: # Clawdbot Gateway WebSocket API 호출 ai_text = ask_clawdbot( message=prompt, session_id='paai-analysis', system_prompt=PAAI_SYSTEM_PROMPT, timeout=60, model='anthropic/claude-sonnet-4-5' # 빠른 Sonnet 사용 ) if not ai_text: logging.warning("[PAAI] Clawdbot 응답 없음") return generate_fallback_response(prompt) # JSON 블록 추출 try: json_match = re.search(r'\{[\s\S]*\}', ai_text) if json_match: return json.loads(json_match.group()) except Exception as parse_err: logging.warning(f"[PAAI] JSON 파싱 실패: {parse_err}") # JSON 파싱 실패 시 텍스트 그대로 반환 return { 'prescription_insight': ai_text[:500] if ai_text else '분석 결과 없음', 'kims_analysis': '', 'cautions': [], 'otc_recommendations': [], 'counseling_points': [] } except Exception as e: logging.error(f"[PAAI] Clawdbot AI 호출 오류: {e}") return generate_fallback_response(prompt) def generate_fallback_response(prompt: str) -> dict: """Clawdbot 연결 실패 시 기본 응답""" return { 'prescription_insight': 'AI 분석 서비스에 연결할 수 없습니다. KIMS 상호작용 정보를 직접 확인해주세요.', 'kims_analysis': '', 'cautions': ['AI 분석 불가 - 직접 검토 필요'], 'otc_recommendations': [], 'counseling_points': [], '_fallback': True } @pmr_bp.route('/api/paai/feedback', methods=['POST']) def paai_feedback(): """PAAI 피드백 저장""" from db.paai_logger import update_feedback try: data = request.get_json() log_id = data.get('log_id') useful = data.get('useful') comment = data.get('comment') if not log_id: return jsonify({'success': False, 'error': 'log_id 필요'}), 400 update_feedback(log_id, useful, comment) return jsonify({'success': True}) except Exception as e: logging.error(f"PAAI 피드백 저장 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ──────────────────────────────────────────────────────────────────────────────── # PAAI 어드민 페이지 # ──────────────────────────────────────────────────────────────────────────────── @pmr_bp.route('/admin') def paai_admin_page(): """PAAI 어드민 대시보드 페이지""" return render_template('pmr_admin.html') @pmr_bp.route('/api/admin/stats') def paai_admin_stats(): """PAAI 통계 API""" from db.paai_logger import get_stats try: stats = get_stats() return jsonify({'success': True, 'stats': stats}) except Exception as e: logging.error(f"PAAI 통계 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @pmr_bp.route('/api/admin/logs') def paai_admin_logs(): """PAAI 로그 목록 API""" from db.paai_logger import get_recent_logs try: limit = request.args.get('limit', 50, type=int) status = request.args.get('status') has_severe = request.args.get('has_severe') date = request.args.get('date') patient_name = request.args.get('patient_name') # has_severe 파싱 if has_severe == 'true': has_severe = True elif has_severe == 'false': has_severe = False else: has_severe = None logs = get_recent_logs( limit=limit, status=status, has_severe=has_severe, date=date ) # 환자명 필터링 (클라이언트 사이드에서 하기엔 데이터가 많을 수 있음) if patient_name: logs = [log for log in logs if patient_name.lower() in (log.get('patient_name') or '').lower()] return jsonify({'success': True, 'logs': logs, 'count': len(logs)}) except Exception as e: logging.error(f"PAAI 로그 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @pmr_bp.route('/api/admin/log/') def paai_admin_log_detail(log_id): """PAAI 로그 상세 API""" from db.paai_logger import get_log_detail try: log = get_log_detail(log_id) if not log: return jsonify({'success': False, 'error': '로그를 찾을 수 없습니다'}), 404 return jsonify({'success': True, 'log': log}) except Exception as e: logging.error(f"PAAI 로그 상세 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @pmr_bp.route('/api/admin/feedback-stats') def paai_admin_feedback_stats(): """피드백 통계 API (일별)""" from db.paai_logger import DB_PATH import sqlite3 from datetime import datetime, timedelta try: if not DB_PATH.exists(): return jsonify({'success': True, 'stats': []}) conn = sqlite3.connect(str(DB_PATH)) cursor = conn.cursor() # 최근 30일 일별 통계 cursor.execute(""" SELECT DATE(created_at) as date, COUNT(*) as total, SUM(CASE WHEN feedback_useful = 1 THEN 1 ELSE 0 END) as useful, SUM(CASE WHEN feedback_useful = 0 THEN 1 ELSE 0 END) as not_useful, SUM(CASE WHEN feedback_useful IS NULL THEN 1 ELSE 0 END) as no_feedback, SUM(CASE WHEN kims_has_severe = 1 THEN 1 ELSE 0 END) as severe, AVG(ai_response_time_ms) as avg_ai_time FROM paai_logs WHERE created_at >= date('now', '-30 days') GROUP BY DATE(created_at) ORDER BY date DESC """) rows = cursor.fetchall() stats = [] for row in rows: stats.append({ 'date': row[0], 'total': row[1], 'useful': row[2] or 0, 'not_useful': row[3] or 0, 'no_feedback': row[4] or 0, 'severe': row[5] or 0, 'avg_ai_time': int(row[6]) if row[6] else 0 }) conn.close() return jsonify({'success': True, 'stats': stats}) except Exception as e: logging.error(f"피드백 통계 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ───────────────────────────────────────────────────────────── # ESC/POS 자동인쇄 API (EUC-KR 텍스트 방식) # ───────────────────────────────────────────────────────────── import socket # 프린터 설정 ESCPOS_PRINTER_IP = "192.168.0.174" ESCPOS_PRINTER_PORT = 9100 # ESC/POS 명령어 _ESC = b'\x1b' _INIT = _ESC + b'@' # 프린터 초기화 _CUT = _ESC + b'd\x03' # 피드 + 커트 def _log_print_history(pre_serial, patient_name, success, error=None): """인쇄 이력을 파일에 기록""" try: log_dir = Path(__file__).parent / 'logs' log_dir.mkdir(exist_ok=True) log_file = log_dir / 'print_history.log' timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') status = '✅ 성공' if success else f'❌ 실패: {error}' line = f"[{timestamp}] {pre_serial} | {patient_name} | {status}\n" with open(log_file, 'a', encoding='utf-8') as f: f.write(line) except Exception as e: logging.warning(f"인쇄 로그 기록 실패: {e}") @pmr_bp.route('/api/paai/print', methods=['POST']) def paai_print(): """PAAI 분석 결과 ESC/POS 인쇄""" try: data = request.get_json() pre_serial = data.get('pre_serial', '') patient_name = data.get('patient_name', '') result = data.get('result', {}) analysis = result.get('analysis', {}) kims_summary = result.get('kims_summary', {}) logging.info(f"[PRINT] 요청 수신: {pre_serial} ({patient_name})") # 영수증 텍스트 생성 message = _format_paai_receipt(pre_serial, patient_name, analysis, kims_summary) # 인쇄 success = _print_escpos_text(message) if success: logging.info(f"[PRINT] ✅ 완료: {pre_serial} ({patient_name})") _log_print_history(pre_serial, patient_name, True) return jsonify({'success': True, 'message': '인쇄 완료'}) else: logging.error(f"[PRINT] ❌ 프린터 연결 실패: {pre_serial}") _log_print_history(pre_serial, patient_name, False, '프린터 연결 실패') return jsonify({'success': False, 'error': '프린터 연결 실패'}), 500 except Exception as e: logging.error(f"[PRINT] ❌ 오류: {pre_serial} - {e}") _log_print_history(pre_serial, patient_name, False, str(e)) return jsonify({'success': False, 'error': str(e)}), 500 def _print_escpos_text(message: str) -> bool: """ESC/POS 프린터로 텍스트 전송 (EUC-KR)""" try: # EUC-KR 인코딩 text_bytes = message.encode('euc-kr', errors='replace') # 명령어 조합 command = _INIT + text_bytes + b'\n\n\n' + _CUT # 소켓 전송 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(10) sock.connect((ESCPOS_PRINTER_IP, ESCPOS_PRINTER_PORT)) sock.sendall(command) sock.close() return True except Exception as e: logging.error(f"ESC/POS 전송 실패: {e}") return False def _format_paai_receipt(pre_serial: str, patient_name: str, analysis: dict, kims_summary: dict) -> str: """PAAI 복약안내 영수증 텍스트 생성 (48자 기준)""" LINE = "=" * 48 THIN = "-" * 48 now = datetime.now().strftime("%Y-%m-%d %H:%M") # 헤더 msg = f"\n{LINE}\n" msg += _center_text("[ PAAI 복약안내 ]") + "\n" msg += f"{LINE}\n" # 환자 정보 msg += f"환자: {patient_name}\n" msg += f"처방번호: {pre_serial}\n" msg += f"출력: {now}\n" msg += f"{THIN}\n" # 상호작용 요약 interaction_count = kims_summary.get('interaction_count', 0) has_severe = kims_summary.get('has_severe', False) if has_severe: msg += "[!!] 중증 상호작용 있음!\n" elif interaction_count > 0: msg += f"[!] 약물 상호작용: {interaction_count}건\n" else: msg += "[V] 상호작용 없음\n" msg += "\n" # 처방 해석 insight = analysis.get('prescription_insight', '') if insight: msg += f"{THIN}\n" msg += ">> 처방 해석\n" for line in _wrap_text(insight, 44): msg += f" {line}\n" msg += "\n" # 복용 주의사항 cautions = analysis.get('cautions', []) if cautions: msg += f"{THIN}\n" msg += ">> 복용 주의사항\n" for i, caution in enumerate(cautions[:4], 1): first_line = True for line in _wrap_text(f"{i}. {caution}", 44): if first_line: msg += f" {line}\n" first_line = False else: msg += f" {line}\n" msg += "\n" # 상담 포인트 counseling = analysis.get('counseling_points', []) if counseling: msg += f"{THIN}\n" msg += ">> 상담 포인트\n" for i, point in enumerate(counseling[:3], 1): first_line = True for line in _wrap_text(f"{i}. {point}", 44): if first_line: msg += f" {line}\n" first_line = False else: msg += f" {line}\n" msg += "\n" # OTC 추천 otc_recs = analysis.get('otc_recommendations', []) if otc_recs: msg += f"{THIN}\n" msg += ">> OTC 추천\n" for rec in otc_recs[:2]: product = rec.get('product', '') reason = rec.get('reason', '') msg += f" - {product}\n" for line in _wrap_text(reason, 42): msg += f" {line}\n" msg += "\n" # 푸터 msg += f"{LINE}\n" msg += _center_text("양구청춘약국 PAAI") + "\n" msg += _center_text("Tel: 033-481-5222") + "\n" msg += "\n" return msg def _center_text(text: str, width: int = 48) -> str: """중앙 정렬""" text_len = len(text) if text_len >= width: return text spaces = (width - text_len) // 2 return " " * spaces + text def _wrap_text(text: str, width: int = 44) -> list: """텍스트 줄바꿈""" if not text: return [] lines = [] words = text.split() current = "" for word in words: if len(current) + len(word) + 1 <= width: current = current + " " + word if current else word else: if current: lines.append(current) current = word if current: lines.append(current) return lines if lines else [text[:width]]