# -*- coding: utf-8 -*-
"""
PharmIT3000 statistics queries (v1).

**Version synchronized with QT-POS sales_stats_dialog.py** (2026-04-08 rewrite).
Reference file: pharmon-web/sales_stats_dialog.py
    (_query / _build_margin_cache / _calc_age / _empty_row / _add)

Changes compared with the previous version:
  1. CD_SUNAB LEFT JOIN -> OUTER APPLY TOP 1
     (removes the bug where the 1:N join duplicated rows and amounts were
     counted twice)
  2. Added the PS_SUB_BOJO JOIN (veterans sub-classification, GITA_GUBUN)
  3. Extended SELECT columns: S_FASTMON, S_TEMP3, SE_BOHUN_C, DRUG_T1~T3,
     S_S_PHOL_0~3, SUGA_ZE_PRICE, GITA_GUBUN
  4. Sales formulas rewritten to follow GPPOS2
     - sales_amt   = PRICE_C + PRICE_P + S_FASTMON + S_TEMP3
                     + SE_PRICE_C + SE_PRICE_P + SE_BOHUN_C
     - ins_prep    = S_Prep + S_S_PHOL_0~3 + SUGA_ZE_PRICE
     - ins_drug    = ((DRUG_T1+T2+T3 + SE_PRICE_C + SE_BOHUN_C) // 10) * 10
                     (truncated to a multiple of 10 won)
     - nonins_prep = S_FASTMON - Drug_T4 (only when S_FASTMON > 0)
     - nonins_drug = Drug_T4
     - claim_amt   = PRICE_C + SE_PRICE_P * 0.25
                     (insurer's 20% share of selective-benefit items)
  5. Veterans (PreGubun='4') + GITA_GUBUN in {1,2,3,4} -> keys '4_1'~'4_4'
  6. Car insurance (PreGubun='3') surcharge:
     car_surcharge = PRICE_T - (ins_drug + ins_prep)
  7. Non-covered margin = Drug_T4 - total purchase cost
     (PS_SUB_PHARM x WH_sub/CD_GOODS cache)
  8. Payment method classification
     - card : Appr_Gubun IN ('1','5','9')
     - paper: nAPPROVAL_NUM present (cash-receipt number)
     - cash : everything else
  9. Time-surcharge classification
     - hd_add   (Holiday IN '2','4') -> saturday (holiday surcharge)
     - overtime (Holiday IN '3','4') -> overtime (only when hd_add is not set)
     - normal   (everything else)

Local fixes that go beyond the reference implementation:
  - query() returns [] for statements that produce no result set.
  - _build_margin_cache() de-duplicates prescription serials so a serial
    split across two chunks is not costed twice.
  - _calc_age() ignores hyphens in the resident number and maps gender
    digits 7/8 to the 2000s.
"""

import pyodbc
from datetime import date
from config import PHARMIT3000_CONFIG as CFG


# SQL Server allows at most 2100 parameters per statement; IN (...) lists are
# therefore sent in chunks of this size.
_CHUNK_SIZE = 500

# One row per prescription in PS_MAIN for the requested INDATE range.
# The text inside this literal (including its SQL comments) is sent to the
# server as-is.
_SALES_STATS_SQL = """
    SELECT
        m.PreSerial, m.PreGubun, m.PaNum, m.OrderName, m.Holiday,
        m.PRICE_T, m.Drug_T4, m.S_Prep, m.PRICE_C, m.PRICE_P,
        -- 수납금액 = PRICE_N + 선별급여/상한제초과
        (m.PRICE_N + ISNULL(sub.SE_PRICE_P, 0)) AS PRICE_N,
        -- GPPOS 매출계산용
        ISNULL(m.S_FASTMON, 0) AS S_FASTMON,
        ISNULL(sub.S_TEMP3, 0) AS S_TEMP3,
        ISNULL(sub.SE_PRICE_C, 0) AS SE_PRICE_C,
        ISNULL(sub.SE_PRICE_P, 0) AS SE_PRICE_P,
        ISNULL(sub.SE_BOHUN_C, 0) AS SE_BOHUN_C,
        -- GPPOS 급여약가 계산용 (DRUG_T1 + T2 + T3)
        ISNULL(sub.DRUG_T1, 0) AS DRUG_T1,
        ISNULL(sub.DRUG_T2, 0) AS DRUG_T2,
        ISNULL(sub.DRUG_T3, 0) AS DRUG_T3,
        -- 급여조제료 가산 (약국관리료)
        ISNULL(sub.S_S_PHOL_0, 0) AS S_S_PHOL_0,
        ISNULL(sub.S_S_PHOL_1, 0) AS S_S_PHOL_1,
        ISNULL(sub.S_S_PHOL_2, 0) AS S_S_PHOL_2,
        ISNULL(sub.S_S_PHOL_3, 0) AS S_S_PHOL_3,
        -- 명절/휴일 특수가산
        ISNULL(sub.SUGA_ZE_PRICE, 0) AS SUGA_ZE_PRICE,
        -- 보훈 세분화용 (1=100%, 2=60%, 3=50%, 4=30%)
        ISNULL(bojo.GITA_GUBUN, '') AS GITA_GUBUN,
        ISNULL(n.Appr_Gubun, '') AS Appr_Gubun,
        ISNULL(n.nAPPROVAL_NUM, '') AS nAPPROVAL_NUM
    FROM PM_PRES..PS_MAIN m
    LEFT JOIN PM_PRES..PS_Main_Sub sub ON sub.PreSerial = m.PreSerial
    LEFT JOIN PM_PRES..PS_SUB_BOJO bojo ON bojo.PreSerial = m.PreSerial
    OUTER APPLY (
        SELECT TOP 1 * FROM PM_PRES..CD_SUNAB WITH (NOLOCK)
        WHERE PRESERIAL = m.PreSerial
        ORDER BY CASE WHEN ISNULL(APPR_DATE,'') = '' THEN 1 ELSE 0 END,
                 APPR_DATE DESC
    ) n
    WHERE m.INDATE BETWEEN ? AND ?
"""


def get_connection():
    """Open a new pyodbc connection to the PharmIT3000 SQL Server database.

    Server, database and credentials are read from ``PHARMIT3000_CONFIG``
    (keys ``server``, ``database``, ``username``, ``password``).
    The caller owns the returned connection and must close it.
    """
    conn_str = (
        f"DRIVER={{ODBC Driver 17 for SQL Server}};"
        f"SERVER={CFG['server']};"
        f"DATABASE={CFG['database']};"
        f"UID={CFG['username']};"
        f"PWD={CFG['password']};"
        f"TrustServerCertificate=yes;"
        f"Connection Timeout=30;"
    )
    return pyodbc.connect(conn_str)


def query(sql, params=None):
    """Run *sql* on a fresh connection and return the rows as dicts.

    Args:
        sql: SQL text using ``?`` placeholders.
        params: Sequence of parameter values, or None for no parameters.

    Returns:
        A list with one ``{column_name: value}`` dict per fetched row.
        An empty list when the statement produces no result set.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(sql, params or ())
        # description is None when the statement returned no result set;
        # iterating it would raise TypeError.
        if cursor.description is None:
            return []
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        conn.close()


# ─── Non-covered margin cache ─────────────────────────────────


def _chunks(items, size):
    """Yield consecutive slices of *items* holding at most *size* elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _build_margin_cache(rows) -> dict:
    """Compute the total purchase cost of each non-covered prescription.

    The caller derives the margin as ``Drug_T4 - cost_cache[serial]``.
    Ported from QT-POS sales_stats_dialog.py::_build_margin_cache.

    - Only PS_SUB_PHARM lines with ``UnitCode != 1`` are costed
      (non-covered drugs, per the original notes).
    - Unit cost priority: latest WH_sub purchase price, then CD_GOODS.Price.
      A latest purchase price more than 5x the line's USERPRICE is treated
      as a data error and CD_GOODS.Price is used instead (when it is > 0).
    - IN (...) lists are chunked by ``_CHUNK_SIZE`` to stay below the
      SQL Server parameter limit.

    Args:
        rows: Row dicts carrying at least ``PreSerial`` and ``Drug_T4``.

    Returns:
        ``{PreSerial: total_cost}`` for the prescriptions that have
        qualifying PS_SUB_PHARM lines; ``{}`` when there is nothing to cost.
    """
    # De-duplicate while keeping order: a serial that occurred twice and was
    # split across two chunks would otherwise have its lines fetched (and
    # costed) twice.
    noncov_serials = list(dict.fromkeys(
        r['PreSerial'] for r in rows if (r.get('Drug_T4') or 0) > 0
    ))
    if not noncov_serials:
        return {}

    sub_rows = []
    for chunk in _chunks(noncov_serials, _CHUNK_SIZE):
        placeholders = ','.join('?' * len(chunk))
        sub_sql = f"""
            SELECT s.PreSerial, s.DrugCode, s.USERPRICE, s.QUAN, s.QUAN_TIME, s.Days
            FROM PM_PRES..PS_SUB_PHARM s
            WHERE s.PreSerial IN ({placeholders})
              AND s.UnitCode != 1
        """
        sub_rows.extend(query(sub_sql, tuple(chunk)))

    drug_codes = list({r['DrugCode'] for r in sub_rows if r.get('DrugCode')})
    if not drug_codes:
        return {}

    cost_rows = []
    for chunk in _chunks(drug_codes, _CHUNK_SIZE):
        placeholders = ','.join('?' * len(chunk))
        cost_sql = f"""
            SELECT g.DrugCode, g.Price AS GoodsPrice,
                   (SELECT TOP 1 WH_MY_unit_a
                    FROM PM_DRUG..WH_sub w
                    WHERE w.DrugCode = g.DrugCode
                    ORDER BY w.WH_DT_appl DESC) AS LastCost
            FROM PM_DRUG..CD_GOODS g
            WHERE g.DrugCode IN ({placeholders})
        """
        cost_rows.extend(query(cost_sql, tuple(chunk)))

    cost_map = {
        c['DrugCode']: {
            'last_cost': float(c.get('LastCost') or 0),
            'goods_price': float(c.get('GoodsPrice') or 0),
        }
        for c in cost_rows
    }

    cost_cache: dict = {}
    for s in sub_rows:
        serial = s['PreSerial']
        user_price = float(s.get('USERPRICE') or 0)
        qty = float(s.get('QUAN') or 0)
        # Missing/zero multipliers count as 1 so they do not zero the line.
        qty_time = float(s.get('QUAN_TIME') or 1)
        days = float(s.get('Days') or 1)

        info = cost_map.get(s['DrugCode'], {})
        last_cost = info.get('last_cost', 0)
        goods_price = info.get('goods_price', 0)

        if last_cost > user_price * 5 and goods_price > 0:
            # Implausibly high purchase price: fall back to the master price.
            cost_price = goods_price
        elif last_cost > 0:
            cost_price = last_cost
        else:
            cost_price = goods_price

        line_cost = cost_price * qty * qty_time * days
        cost_cache[serial] = cost_cache.get(serial, 0) + line_cost

    return cost_cache


# ─── Main statistics ──────────────────────────────────────────


def _to_int(value) -> int:
    """Convert a DB value to int, treating None/NULL and other falsy values as 0."""
    return int(value or 0)


def _insurance_key(r) -> str:
    """Return the by_gubun key of a row ('4' is refined to '4_1'..'4_4')."""
    gubun = str(r.get('PreGubun') or '0')
    gita_gubun = str(r.get('GITA_GUBUN') or '')
    # Veterans sub-classification: PreGubun='4' + GITA_GUBUN in {1,2,3,4}
    if gubun == '4' and gita_gubun in ('1', '2', '3', '4'):
        return f'4_{gita_gubun}'
    return gubun


def _row_amounts(r, gubun: str, cost_cache: dict) -> tuple:
    """Compute one row's amounts, in the positional order expected by _add()."""
    price_t = _to_int(r.get('PRICE_T'))
    drug_t4 = _to_int(r.get('Drug_T4'))
    s_prep = _to_int(r.get('S_Prep'))
    price_c = _to_int(r.get('PRICE_C'))
    price_p = _to_int(r.get('PRICE_P'))
    price_n = _to_int(r.get('PRICE_N'))
    s_fastmon = _to_int(r.get('S_FASTMON'))
    s_temp3 = _to_int(r.get('S_TEMP3'))
    se_price_c = _to_int(r.get('SE_PRICE_C'))
    se_price_p = _to_int(r.get('SE_PRICE_P'))
    se_bohun_c = _to_int(r.get('SE_BOHUN_C'))
    drug_t1 = _to_int(r.get('DRUG_T1'))
    drug_t2 = _to_int(r.get('DRUG_T2'))
    drug_t3 = _to_int(r.get('DRUG_T3'))
    s_s_phol = (_to_int(r.get('S_S_PHOL_0')) + _to_int(r.get('S_S_PHOL_1'))
                + _to_int(r.get('S_S_PHOL_2')) + _to_int(r.get('S_S_PHOL_3')))
    suga_ze_price = _to_int(r.get('SUGA_ZE_PRICE'))

    # ─── GPPOS formulas ───────────────────────────────────────
    sales_amt = (price_c + price_p + s_fastmon + s_temp3
                 + se_price_c + se_price_p + se_bohun_c)
    nonins_prep = s_fastmon - drug_t4 if s_fastmon > 0 else 0
    nonins_drug = drug_t4

    cost_total = cost_cache.get(r.get('PreSerial'), 0)
    nonins_margin = int(drug_t4 - cost_total) if drug_t4 > 0 else 0

    ins_prep = s_prep + s_s_phol + suga_ze_price
    # Truncate to a multiple of 10 won.
    ins_drug = ((drug_t1 + drug_t2 + drug_t3 + se_price_c + se_bohun_c) // 10) * 10

    # Claim = PRICE_C + SE_PRICE_P * 0.25: SE_PRICE_P is the patient's 80%
    # share of selective-benefit items, so the insurer's 20% is a quarter of it.
    claim_amt = price_c + int(se_price_p * 0.25)

    # Car-insurance surcharge: whatever PRICE_T exceeds drug + dispensing by.
    car_surcharge = 0
    if gubun == '3':
        base_total = ins_drug + ins_prep
        if price_t > base_total:
            car_surcharge = price_t - base_total

    return (sales_amt, ins_prep, ins_drug, nonins_prep, nonins_drug,
            se_price_p, nonins_margin, claim_amt, price_p, price_n,
            car_surcharge)


def _age_bucket(age) -> str:
    """Map an age to 'old' (>= 65), 'infant' (< 6) or 'mid' (others/unknown)."""
    if age is None or 6 <= age < 65:
        return 'mid'
    return 'old' if age >= 65 else 'infant'


def _time_bucket(holiday: str) -> str:
    """Map a Holiday code to a by_time key (Unit_Statistics.pas rules)."""
    if holiday in ('2', '4'):      # hd_add takes precedence over overtime
        return 'saturday'
    if holiday in ('3', '4'):
        return 'overtime'
    return 'normal'


def _pay_bucket(appr_gubun: str, nappr_num: str) -> str:
    """Map approval data to a by_pay key: 'card', 'paper' or 'cash'."""
    if appr_gubun in ('1', '5', '9'):
        return 'card'
    if nappr_num:                  # a cash-receipt number is present
        return 'paper'
    return 'cash'


def get_sales_stats(date_from: str, date_to: str) -> dict:
    """Query and aggregate the sales statistics for an INDATE range.

    Port of QT-POS sales_stats_dialog.py::_query().

    Args:
        date_from: Inclusive lower bound, passed to ``INDATE BETWEEN ? AND ?``.
        date_to: Inclusive upper bound. Must start with 'YYYYMMDD'; it is also
            the reference date for computing patient ages.

    Returns:
        A dict with these keys, where every leaf is an ``_empty_row()``-shaped
        accumulator:
            total    - all rows
            by_gubun - keyed by insurance type ('4_1'..'4_4' for veterans)
            by_age   - 'old' / 'infant' / 'mid'
            by_time  - 'overtime' / 'saturday' / 'normal'
            by_pay   - 'card' / 'cash' / 'paper'
            by_hosp  - keyed by OrderName ('(미상)' when blank)

    Raises:
        ValueError: If *date_to* cannot be parsed as 'YYYYMMDD'.
    """
    rows = query(_SALES_STATS_SQL, (date_from, date_to))
    cost_cache = _build_margin_cache(rows)

    ref_date = date(int(date_to[:4]), int(date_to[4:6]), int(date_to[6:8]))

    result = {
        'total': _empty_row(),
        'by_gubun': {},
        'by_age': {'old': _empty_row(), 'infant': _empty_row(), 'mid': _empty_row()},
        'by_time': {'overtime': _empty_row(), 'saturday': _empty_row(),
                    'normal': _empty_row()},
        'by_pay': {'card': _empty_row(), 'cash': _empty_row(), 'paper': _empty_row()},
        'by_hosp': {},
    }

    for r in rows:
        gubun = _insurance_key(r)
        args = _row_amounts(r, gubun, cost_cache)

        # Overall
        _add(result['total'], *args)

        # By insurance type
        if gubun not in result['by_gubun']:
            result['by_gubun'][gubun] = _empty_row()
        _add(result['by_gubun'][gubun], *args)

        # By age surcharge group
        age = _calc_age(str(r.get('PaNum') or ''), ref_date)
        _add(result['by_age'][_age_bucket(age)], *args)

        # By time surcharge
        holiday = str(r.get('Holiday') or '1')
        _add(result['by_time'][_time_bucket(holiday)], *args)

        # By payment method
        appr_gubun = str(r.get('Appr_Gubun') or '')
        nappr_num = str(r.get('nAPPROVAL_NUM') or '').strip()
        _add(result['by_pay'][_pay_bucket(appr_gubun, nappr_num)], *args)

        # By hospital
        hosp = str(r.get('OrderName') or '').strip() or '(미상)'
        if hosp not in result['by_hosp']:
            result['by_hosp'][hosp] = _empty_row()
        _add(result['by_hosp'][hosp], *args)

    return result


# ─── Utilities ────────────────────────────────────────────────


def _empty_row() -> dict:
    """Return a fresh accumulator with the row count and every amount at 0."""
    return dict(cnt=0, sales_amt=0, ins_prep=0, ins_drug=0,
                nonins_prep=0, nonins_drug=0, se_price_p=0,
                nonins_margin=0, claim_amt=0, copay=0, receipt=0,
                car_surcharge=0)


def _add(d: dict, sales_amt, ins_prep, ins_drug, nonins_prep, nonins_drug,
         se_price_p, nonins_margin, claim_amt, copay, receipt, car_surcharge=0):
    """Add one row's amounts to accumulator *d* in place and bump its count."""
    d['cnt'] += 1
    d['sales_amt'] += sales_amt
    d['ins_prep'] += ins_prep
    d['ins_drug'] += ins_drug
    d['nonins_prep'] += nonins_prep
    d['nonins_drug'] += nonins_drug
    d['se_price_p'] += se_price_p
    d['nonins_margin'] += nonins_margin
    d['claim_amt'] += claim_amt
    d['copay'] += copay
    d['receipt'] += receipt
    d['car_surcharge'] += car_surcharge


def _calc_age(panum: str, ref_date: date):
    """Derive the age on *ref_date* from a resident registration number.

    Args:
        panum: Resident number: 6 birth digits (YYMMDD) followed by the
            gender/century digit, with or without a hyphen between them.
        ref_date: Date on which the age is evaluated.

    Returns:
        The age in full years as an int, or None when *panum* is too short,
        its birth part is not numeric, or it encodes an invalid date.
    """
    if not panum or len(panum) < 7:
        return None

    # 'YYMMDD-GNNNNNN' would otherwise expose '-' as the gender digit and
    # push everyone born in the 2000s into the 1900s.
    digits = panum.replace('-', '')
    birth6 = digits[:6]
    gender = digits[6] if len(digits) > 6 else ''

    # isdecimal() (not isdigit()) so that every accepted character is also
    # accepted by int() below.
    if len(birth6) < 6 or not birth6.isdecimal():
        return None

    yy, mm, dd = int(birth6[:2]), int(birth6[2:4]), int(birth6[4:6])

    # 3/4 = citizens, 7/8 = foreign residents born in the 2000s.
    # Everything else (1/2/5/6, and unknown or masked digits) is treated as
    # the 1900s, as before.
    if gender in ('3', '4', '7', '8'):
        year = 2000 + yy
    else:
        year = 1900 + yy

    try:
        bday = date(year, mm, dd)
    except ValueError:
        return None

    # Subtract one year when the birthday has not yet occurred in ref_date's year.
    had_no_birthday_yet = (ref_date.month, ref_date.day) < (bday.month, bday.day)
    return ref_date.year - bday.year - had_no_birthday_yet