Files
pharmacy-stats-api/queries/v2_pmplus20.py
root 9836d2bdab fix: V2 비급여 매출금액 계산 수정 - EXE_PRICE 사용
비급여(MPRE_TYPE='9') 처방의 매출금액:
- V1: PRICE_T에 비급여 총액이 들어감
- V2: TOT_PRICE는 조제료만, EXE_PRICE에 비급여 총액

수정:
- PRICE_T = CASE WHEN MPRE_TYPE='9' THEN EXE_PRICE ELSE TOT_PRICE END

검증 (2026년 3월):
- V1 비급여 매출: 3,804,250
- V2 비급여 매출: 3,804,250
- 차이: 0 
2026-04-01 15:34:56 +00:00

240 lines
8.0 KiB
Python

# -*- coding: utf-8 -*-
"""
PMPLUS20 통계 쿼리 (v2)
테이블 매핑 (PharmIT3000 → PMPLUS20):
- PM_PRES.PS_MAIN → PM_MAIN.TBSID040_03 (처방 헤더)
- PM_PRES.CD_SUNAB → PM_MAIN.TBSIR000_01 (수납/결제, DRUG_SEQ로 JOIN)
컬럼 매핑:
- PreSerial → DRUG_SEQ
- INDATE → SUNAB_DT
- PRICE_T → TOT_PRICE (EXE_PRICE when MPRE_TYPE='9', non-covered), PRICE_C → INS_PRICE, PRICE_P → EXE_PRICE, PRICE_N → REAL_PRICE
- S_Prep → INS_PREP_PRICE
- Drug_T4 → NON_DRUG_PRICE + EXP_EXE_PRICE
- PreGubun → MPRE_TYPE + MPRE_TYPE_GUBUN 조합 (아래 매핑 참고)
- Holiday → HD_ADD + PRES_TIME_GUBUN 조합
- Appr_Gubun → APPR_GUBUN (값 동일)
- nAPPROVAL_NUM → CARD_ADM_NO
보험구분 매핑 (V1 PreGubun ↔ V2):
- '0' ← MPRE_TYPE='0' (건강보험)
- '1' ← MPRE_TYPE='1' (의료급여1종)
- '2' ← MPRE_TYPE='2' (산재)
- '3' ← MPRE_TYPE='3' (의료급여2종)
- '7' ← MPRE_TYPE='4' + MPRE_TYPE_GUBUN='C' (차상위1)
- 'E' ← MPRE_TYPE='4' + MPRE_TYPE_GUBUN='E' (보훈)
- 'F' ← MPRE_TYPE='4' + MPRE_TYPE_GUBUN='F' (차상위2)
- '9' ← MPRE_TYPE='9' (비급여)
주의:
- PRES_GUBUN='E' (재고보정) 레코드 제외 필요
- MPRE_TYPE='4'는 특수보험 그룹, MPRE_TYPE_GUBUN으로 세부 구분
"""
import pyodbc
from datetime import date
from config import PMPLUS20_CONFIG as CFG
def get_connection():
    """Open a new pyodbc connection using the PMPLUS20 settings in ``CFG``.

    The connection string is assembled from the ``server``, ``database``,
    ``username`` and ``password`` entries of ``CFG`` plus fixed driver
    options, then handed to ``pyodbc.connect``.

    Returns:
        The connection object returned by ``pyodbc.connect``. The caller is
        responsible for closing it.

    Raises:
        KeyError: if one of the required ``CFG`` entries is missing.
    """
    # NOTE(review): values are interpolated without escaping; a ';' inside
    # one of them (e.g. the password) would presumably split the keyword
    # list -- confirm against the real configuration.
    settings = (
        ("DRIVER", "{ODBC Driver 17 for SQL Server}"),
        ("SERVER", CFG['server']),
        ("DATABASE", CFG['database']),
        ("UID", CFG['username']),
        ("PWD", CFG['password']),
        ("TrustServerCertificate", "yes"),
        ("Connection Timeout", "30"),
    )
    # Every keyword, including the last one, is terminated with ';'.
    conn_str = "".join(f"{keyword}={value};" for keyword, value in settings)
    return pyodbc.connect(conn_str)
def query(sql, params=None):
    """Run a query on a fresh connection and return the rows as dicts.

    Args:
        sql: SQL text, using ``?`` placeholders for parameters.
        params: Optional sequence of parameter values; an empty tuple is
            passed to the driver when omitted or falsy.

    Returns:
        list[dict]: one dict per fetched row, keyed by the column names
        reported in ``cursor.description``.

    Raises:
        Whatever ``get_connection`` or the cursor raises. The connection is
        closed before the exception propagates.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(sql, params or ())
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        # Close on the error path too; previously a failing execute/fetch
        # skipped close() and leaked the connection.
        conn.close()
def get_sales_stats(date_from: str, date_to: str) -> dict:
    """Aggregate sales statistics from PMPLUS20 for a date range.

    Prescription headers (TBSID040_03) are left-joined to payment rows
    (TBSIR000_01) on DRUG_SEQ, and every returned row is accumulated into
    several breakdowns.

    Args:
        date_from: Start date, ``YYYYMMDD``.
        date_to: End date, ``YYYYMMDD``. Also used as the reference date
            for age calculation.

    Returns:
        dict: ``{total, by_gubun, by_age, by_time, by_pay, by_hosp}`` where
        each leaf is a row as produced by ``_empty_row``. If the query
        raises, a dict with ``error`` and ``note`` keys is returned instead.

    Raises:
        ValueError: if ``date_to`` cannot be parsed as ``YYYYMMDD`` (only
            reached after the query has succeeded).
    """
    # NOTE(review): if TBSIR000_01 can hold more than one row per DRUG_SEQ,
    # the LEFT JOIN would count that prescription more than once -- confirm.
    # NOTE(review): `PRES_GUBUN != 'E'` also drops rows where PRES_GUBUN is
    # NULL under SQL three-valued logic -- confirm that is intended.
    sql = """
SELECT
m.DRUG_SEQ AS PreSerial,
CASE
WHEN m.MPRE_TYPE = '4' AND m.MPRE_TYPE_GUBUN = 'E' THEN 'E' -- 보훈
WHEN m.MPRE_TYPE = '4' AND m.MPRE_TYPE_GUBUN = 'F' THEN 'F' -- 차상위2
WHEN m.MPRE_TYPE = '4' AND m.MPRE_TYPE_GUBUN = 'C' THEN '7' -- 차상위1
ELSE ISNULL(m.MPRE_TYPE, '0')
END AS PreGubun,
m.PAT_JUMIN_NO AS PaNum,
m.YOYANG_NM AS OrderName,
CASE
WHEN m.HD_ADD = '2' AND m.PRES_TIME_GUBUN = '1' THEN '4'
WHEN m.HD_ADD = '2' THEN '2'
WHEN m.PRES_TIME_GUBUN = '1' THEN '3'
ELSE '1'
END AS Holiday,
-- 비급여(MPRE_TYPE='9')는 TOT_PRICE가 아닌 EXE_PRICE가 매출금액
CASE
WHEN m.MPRE_TYPE = '9' THEN m.EXE_PRICE
ELSE m.TOT_PRICE
END AS PRICE_T,
(ISNULL(m.NON_DRUG_PRICE, 0) + ISNULL(m.EXP_EXE_PRICE, 0)) AS Drug_T4,
m.INS_PREP_PRICE AS S_Prep,
m.INS_PRICE AS PRICE_C,
m.EXE_PRICE AS PRICE_P,
m.REAL_PRICE AS PRICE_N,
ISNULL(n.APPR_GUBUN, '') AS Appr_Gubun,
ISNULL(n.CARD_ADM_NO, '') AS nAPPROVAL_NUM
FROM PM_MAIN..TBSID040_03 m
LEFT JOIN PM_MAIN..TBSIR000_01 n ON n.DRUG_SEQ = m.DRUG_SEQ
WHERE m.SUNAB_DT BETWEEN ? AND ?
AND m.PRES_GUBUN != 'E'
"""
    try:
        records = query(sql, (date_from, date_to))
    except Exception as exc:
        # Any failure while querying is reported as a payload, not raised.
        return {
            'error': str(exc),
            'note': 'PMPLUS20 쿼리 오류. TBSID040_03/TBSIR000_01 테이블 확인 필요.'
        }

    # Ages are measured as of the last day of the requested range.
    year, month, day = (
        int(date_to[start:stop]) for start, stop in ((0, 4), (4, 6), (6, 8))
    )
    ref_date = date(year, month, day)

    # Accumulators
    result = {
        'total': _empty_row(),
        'by_gubun': {},
        'by_age': {key: _empty_row() for key in ('old', 'infant', 'mid')},
        'by_time': {key: _empty_row() for key in ('overtime', 'saturday', 'normal')},
        'by_pay': {key: _empty_row() for key in ('card', 'cash', 'paper')},
        'by_hosp': {},
    }

    # Holiday codes '2' and '4' go to 'saturday' (code '4' also carries the
    # overtime flag but the holiday bucket wins); '3' is overtime only.
    time_bucket_by_code = {'2': 'saturday', '4': 'saturday', '3': 'overtime'}
    pay_bucket_by_code = {'1': 'card', '2': 'cash'}
    money_columns = ('PRICE_T', 'Drug_T4', 'S_Prep', 'PRICE_C', 'PRICE_P', 'PRICE_N')

    for rec in records:
        insurance_code = str(rec.get('PreGubun') or '0')
        holiday_code = str(rec.get('Holiday') or '1')
        total_price, nonins_drug, prep_fee, claim_amt, copay, receipt = (
            int(rec.get(column) or 0) for column in money_columns
        )
        pay_code = str(rec.get('Appr_Gubun') or '')
        hospital = str(rec.get('OrderName') or '기타')
        resident_no = str(rec.get('PaNum') or '')

        # Rows with insurance code '9' contribute nothing to the insured
        # columns; all other codes split PRICE_T into prep fee and the rest.
        if insurance_code == '9':
            ins_prep = ins_drug = 0
        else:
            ins_prep = prep_fee
            ins_drug = total_price - prep_fee

        # The non-insured prep amount is only derived when Drug_T4 is positive.
        if nonins_drug > 0:
            nonins_prep = max(0, receipt - nonins_drug - copay)
        else:
            nonins_prep = 0

        amounts = (
            total_price + nonins_drug,  # sales_amt
            ins_prep,
            ins_drug,
            nonins_prep,
            nonins_drug,
            0,                          # nonins_margin is always 0 here
            claim_amt,
            copay,
            receipt,
        )

        # Unknown ages are counted in the 'mid' bucket.
        age = _calc_age(resident_no, ref_date)
        if age is not None and age >= 65:
            age_key = 'old'
        elif age is not None and age < 6:
            age_key = 'infant'
        else:
            age_key = 'mid'

        buckets = (
            result['total'],
            result['by_gubun'].setdefault(insurance_code, _empty_row()),
            result['by_age'][age_key],
            result['by_time'][time_bucket_by_code.get(holiday_code, 'normal')],
            result['by_pay'][pay_bucket_by_code.get(pay_code, 'paper')],
            result['by_hosp'].setdefault(hospital, _empty_row()),
        )
        for bucket in buckets:
            _add(bucket, *amounts)

    return result
def _empty_row():
return dict(cnt=0, sales_amt=0, ins_prep=0, ins_drug=0,
nonins_prep=0, nonins_drug=0, nonins_margin=0,
claim_amt=0, copay=0, receipt=0)
def _add(d, sales_amt, ins_prep, ins_drug, nonins_prep, nonins_drug,
nonins_margin, claim_amt, copay, receipt):
d['cnt'] += 1
d['sales_amt'] += sales_amt
d['ins_prep'] += ins_prep
d['ins_drug'] += ins_drug
d['nonins_prep'] += nonins_prep
d['nonins_drug'] += nonins_drug
d['nonins_margin'] += nonins_margin
d['claim_amt'] += claim_amt
d['copay'] += copay
d['receipt'] += receipt
def _calc_age(panum: str, ref_date: date) -> int | None:
"""주민번호 → 나이"""
if not panum or len(panum) < 7:
return None
birth6 = panum[:6]
gender = panum[6]
if not birth6.isdigit():
return None
yy, mm, dd = int(birth6[:2]), int(birth6[2:4]), int(birth6[4:6])
if gender in ('1', '2'):
year = 1900 + yy
elif gender in ('3', '4'):
year = 2000 + yy
else:
year = 1900 + yy
try:
bday = date(year, mm, dd)
return ref_date.year - bday.year - (
(ref_date.month, ref_date.day) < (bday.month, bday.day))
except ValueError:
return None