V2 PMPLUS20에서 MPRE_TYPE='4'는 특수보험 그룹: - MPRE_TYPE_GUBUN='E' → 보훈 (V1: 'E') - MPRE_TYPE_GUBUN='F' → 차상위2 (V1: 'F') - MPRE_TYPE_GUBUN='C' → 차상위1 (V1: '7') 기존 코드는 MPRE_TYPE_GUBUN='F'만 체크해서 차상위2만 인식하고 보훈과 차상위1이 누락되는 문제가 있었음
236 lines
7.8 KiB
Python
236 lines
7.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
PMPLUS20 통계 쿼리 (v2)
|
|
|
|
테이블 매핑 (PharmIT3000 → PMPLUS20):
|
|
- PM_PRES.PS_MAIN → PM_MAIN.TBSID040_03 (처방 헤더)
|
|
- PM_PRES.CD_SUNAB → PM_MAIN.TBSIR000_01 (수납/결제, DRUG_SEQ로 JOIN)
|
|
|
|
컬럼 매핑:
|
|
- PreSerial → DRUG_SEQ
|
|
- INDATE → SUNAB_DT
|
|
- PRICE_T → TOT_PRICE, PRICE_C → INS_PRICE, PRICE_P → EXE_PRICE, PRICE_N → REAL_PRICE
|
|
- S_Prep → INS_PREP_PRICE
|
|
- Drug_T4 → NON_DRUG_PRICE + EXP_EXE_PRICE
|
|
- PreGubun → MPRE_TYPE + MPRE_TYPE_GUBUN 조합 (아래 매핑 참고)
|
|
- Holiday → HD_ADD + PRES_TIME_GUBUN 조합
|
|
- Appr_Gubun → APPR_GUBUN (값 동일)
|
|
- nAPPROVAL_NUM → CARD_ADM_NO
|
|
|
|
보험구분 매핑 (V1 PreGubun ↔ V2):
|
|
- '0' ← MPRE_TYPE='0' (건강보험)
|
|
- '1' ← MPRE_TYPE='1' (의료급여1종)
|
|
- '2' ← MPRE_TYPE='2' (산재)
|
|
- '3' ← MPRE_TYPE='3' (의료급여2종)
|
|
- '7' ← MPRE_TYPE='4' + MPRE_TYPE_GUBUN='C' (차상위1)
|
|
- 'E' ← MPRE_TYPE='4' + MPRE_TYPE_GUBUN='E' (보훈)
|
|
- 'F' ← MPRE_TYPE='4' + MPRE_TYPE_GUBUN='F' (차상위2)
|
|
- '9' ← MPRE_TYPE='9' (비급여)
|
|
|
|
주의:
|
|
- PRES_GUBUN='E' (재고보정) 레코드 제외 필요
|
|
- MPRE_TYPE='4'는 특수보험 그룹, MPRE_TYPE_GUBUN으로 세부 구분
|
|
"""
|
|
import pyodbc
|
|
from datetime import date
|
|
from config import PMPLUS20_CONFIG as CFG
|
|
|
|
|
|
def get_connection():
    """Open a new pyodbc connection to the PMPLUS20 SQL Server database.

    Server, database and credentials are read from ``CFG``; the driver name,
    certificate trust flag and connection timeout are fixed here.

    Returns:
        The connection object returned by ``pyodbc.connect``. The caller is
        responsible for closing it.
    """
    # Ordered key/value pairs; each is rendered as "KEY=value;" and the
    # pieces are concatenated into a single ODBC connection string.
    settings = (
        ("DRIVER", "{ODBC Driver 17 for SQL Server}"),
        ("SERVER", CFG['server']),
        ("DATABASE", CFG['database']),
        ("UID", CFG['username']),
        ("PWD", CFG['password']),
        ("TrustServerCertificate", "yes"),
        ("Connection Timeout", "30"),
    )
    conn_str = "".join(f"{key}={value};" for key, value in settings)
    return pyodbc.connect(conn_str)
|
|
|
|
|
|
def query(sql, params=None):
    """Execute *sql* on a fresh connection and return the rows as dicts.

    Args:
        sql: SQL text, using ``?`` placeholders for parameters.
        params: Optional sequence of values bound to the placeholders.

    Returns:
        A list with one dict per row, keyed by the column names reported by
        the cursor. An empty list is returned when the statement produces
        no result set.

    Raises:
        Whatever ``get_connection`` or the cursor raises; the connection is
        closed before the exception propagates.

    Note:
        This function never calls ``commit()``; it is meant for read queries.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(sql, params or ())
        # description is None when the statement returned no result set;
        # iterating it would raise TypeError.
        if cursor.description is None:
            return []
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        # Always release the connection, including when execute/fetch fails.
        conn.close()
|
|
|
|
|
|
def get_sales_stats(date_from: str, date_to: str) -> dict:
    """
    Aggregate sales statistics for a date range (PMPLUS20 version).

    Args:
        date_from: first day of the range (YYYYMMDD)
        date_to: last day of the range (YYYYMMDD); also used as the
            reference date when deriving patient age

    Returns:
        dict: {total, by_gubun, by_age, by_time, by_pay, by_hosp} on success.
        If the query raises, a dict with 'error' and 'note' keys is returned
        instead.

    Note:
        TBSID040_03 (prescription header) + TBSIR000_01 (payment) mapping
        finalized (2026-04-01)
    """
    sql = """
        SELECT
            m.DRUG_SEQ AS PreSerial,
            CASE
                WHEN m.MPRE_TYPE = '4' AND m.MPRE_TYPE_GUBUN = 'E' THEN 'E' -- 보훈
                WHEN m.MPRE_TYPE = '4' AND m.MPRE_TYPE_GUBUN = 'F' THEN 'F' -- 차상위2
                WHEN m.MPRE_TYPE = '4' AND m.MPRE_TYPE_GUBUN = 'C' THEN '7' -- 차상위1
                ELSE ISNULL(m.MPRE_TYPE, '0')
            END AS PreGubun,
            m.PAT_JUMIN_NO AS PaNum,
            m.YOYANG_NM AS OrderName,
            CASE
                WHEN m.HD_ADD = '2' AND m.PRES_TIME_GUBUN = '1' THEN '4'
                WHEN m.HD_ADD = '2' THEN '2'
                WHEN m.PRES_TIME_GUBUN = '1' THEN '3'
                ELSE '1'
            END AS Holiday,
            m.TOT_PRICE AS PRICE_T,
            (ISNULL(m.NON_DRUG_PRICE, 0) + ISNULL(m.EXP_EXE_PRICE, 0)) AS Drug_T4,
            m.INS_PREP_PRICE AS S_Prep,
            m.INS_PRICE AS PRICE_C,
            m.EXE_PRICE AS PRICE_P,
            m.REAL_PRICE AS PRICE_N,
            ISNULL(n.APPR_GUBUN, '') AS Appr_Gubun,
            ISNULL(n.CARD_ADM_NO, '') AS nAPPROVAL_NUM
        FROM PM_MAIN..TBSID040_03 m
        LEFT JOIN PM_MAIN..TBSIR000_01 n ON n.DRUG_SEQ = m.DRUG_SEQ
        WHERE m.SUNAB_DT BETWEEN ? AND ?
          AND m.PRES_GUBUN != 'E'
    """

    try:
        rows = query(sql, (date_from, date_to))
    except Exception as exc:
        # Best-effort: report the failure to the caller instead of raising.
        return {
            'error': str(exc),
            'note': 'PMPLUS20 쿼리 오류. TBSID040_03/TBSIR000_01 테이블 확인 필요.'
        }

    # Ages are computed as of the last day of the requested range.
    year, month, day = (int(date_to[a:b]) for a, b in ((0, 4), (4, 6), (6, 8)))
    ref_date = date(year, month, day)

    # Aggregation buckets
    result = {
        'total': _empty_row(),
        'by_gubun': {},
        'by_age': {key: _empty_row() for key in ('old', 'infant', 'mid')},
        'by_time': {key: _empty_row() for key in ('overtime', 'saturday', 'normal')},
        'by_pay': {key: _empty_row() for key in ('card', 'cash', 'paper')},
        'by_hosp': {},
    }

    # Holiday codes '2' and '4' take the 'saturday' bucket (checked first in
    # the original chain), '3' is 'overtime', anything else is 'normal'.
    time_bucket_of = {'2': 'saturday', '4': 'saturday', '3': 'overtime'}
    # Appr_Gubun '1' -> card, '2' -> cash, anything else (incl. '') -> paper.
    pay_bucket_of = {'1': 'card', '2': 'cash'}

    for rec in rows:
        gubun = str(rec.get('PreGubun') or '0')
        holiday = str(rec.get('Holiday') or '1')
        price_t = int(rec.get('PRICE_T') or 0)
        drug_t4 = int(rec.get('Drug_T4') or 0)
        s_prep = int(rec.get('S_Prep') or 0)
        price_c = int(rec.get('PRICE_C') or 0)
        price_p = int(rec.get('PRICE_P') or 0)
        price_n = int(rec.get('PRICE_N') or 0)
        appr_gubun = str(rec.get('Appr_Gubun') or '')
        order_name = str(rec.get('OrderName') or '기타')
        panum = str(rec.get('PaNum') or '')

        sales_amt = price_t + drug_t4

        # The non-insured portion is computed the same way for every
        # insurance type.
        nonins_drug = drug_t4
        if drug_t4 > 0:
            nonins_prep = max(0, price_n - drug_t4 - price_p)
        else:
            nonins_prep = 0

        # PreGubun '9' carries no insured portion at all.
        if gubun == '9':
            ins_prep = ins_drug = 0
        else:
            ins_prep = s_prep
            ins_drug = price_t - s_prep

        amounts = (sales_amt, ins_prep, ins_drug, nonins_prep, nonins_drug,
                   0, price_c, price_p, price_n)

        # Unknown age falls into 'mid', as do ages 6..64.
        age = _calc_age(panum, ref_date)
        if age is not None and age >= 65:
            age_key = 'old'
        elif age is not None and age < 6:
            age_key = 'infant'
        else:
            age_key = 'mid'

        targets = (
            result['total'],
            result['by_gubun'].setdefault(gubun, _empty_row()),
            result['by_age'][age_key],
            result['by_time'][time_bucket_of.get(holiday, 'normal')],
            result['by_pay'][pay_bucket_of.get(appr_gubun, 'paper')],
            result['by_hosp'].setdefault(order_name, _empty_row()),
        )
        for bucket in targets:
            _add(bucket, *amounts)

    return result
|
|
|
|
|
|
def _empty_row():
|
|
return dict(cnt=0, sales_amt=0, ins_prep=0, ins_drug=0,
|
|
nonins_prep=0, nonins_drug=0, nonins_margin=0,
|
|
claim_amt=0, copay=0, receipt=0)
|
|
|
|
|
|
def _add(d, sales_amt, ins_prep, ins_drug, nonins_prep, nonins_drug,
|
|
nonins_margin, claim_amt, copay, receipt):
|
|
d['cnt'] += 1
|
|
d['sales_amt'] += sales_amt
|
|
d['ins_prep'] += ins_prep
|
|
d['ins_drug'] += ins_drug
|
|
d['nonins_prep'] += nonins_prep
|
|
d['nonins_drug'] += nonins_drug
|
|
d['nonins_margin'] += nonins_margin
|
|
d['claim_amt'] += claim_amt
|
|
d['copay'] += copay
|
|
d['receipt'] += receipt
|
|
|
|
|
|
def _calc_age(panum: str, ref_date: date) -> int | None:
|
|
"""주민번호 → 나이"""
|
|
if not panum or len(panum) < 7:
|
|
return None
|
|
birth6 = panum[:6]
|
|
gender = panum[6]
|
|
if not birth6.isdigit():
|
|
return None
|
|
yy, mm, dd = int(birth6[:2]), int(birth6[2:4]), int(birth6[4:6])
|
|
if gender in ('1', '2'):
|
|
year = 1900 + yy
|
|
elif gender in ('3', '4'):
|
|
year = 2000 + yy
|
|
else:
|
|
year = 1900 + yy
|
|
try:
|
|
bday = date(year, mm, dd)
|
|
return ref_date.year - bday.year - (
|
|
(ref_date.month, ref_date.day) < (bday.month, bday.day))
|
|
except ValueError:
|
|
return None
|