Files
pharmacy-stats-api/queries/v1_pharmit3000.py
청춘약국 ef23830a77 fix(v1): QT-POS sales_stats_dialog.py 와 1:1 동기화
pharmon-web/sales_stats_dialog.py 의 _query / _build_margin_cache
로직을 그대로 이식하여 QT-POS 와 stats-api 의 통계 수치 불일치
문제를 해결.

주요 변경:
- CD_SUNAB LEFT JOIN → OUTER APPLY TOP 1
  (1:N 곱증으로 행 중복되어 매출 이중계산되던 버그 제거)
- PS_SUB_BOJO JOIN 추가 (보훈 GITA_GUBUN 세분화 지원)
- SELECT 확장: S_FASTMON, S_TEMP3, SE_BOHUN_C, DRUG_T1~T3,
  S_S_PHOL_0~3, SUGA_ZE_PRICE, GITA_GUBUN
- GPPOS2 공식으로 재작성
  sales_amt = PRICE_C + PRICE_P + S_FASTMON + S_TEMP3
            + SE_PRICE_C + SE_PRICE_P + SE_BOHUN_C
- 보훈(PreGubun='4') + GITA_GUBUN ∈ {1,2,3,4} → '4_1'~'4_4'
- 자동차보험 할증금액, 선별급여 청구액 (SE_PRICE_P*0.25) 반영
- 비급여 마진 캐시 (PS_SUB_PHARM × WH_sub/CD_GOODS)
- config.py: DB 서버 192.168.0.201 → 192.168.0.69 (테스트 서버)

2026-04-08 1/3 스크린샷 기준 QT-POS 와 1원 단위까지 수치 일치 확인.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 20:53:46 +09:00

368 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
PharmIT3000 통계 쿼리 (v1)
**QT-POS sales_stats_dialog.py 와 완전 동기화된 버전** (2026-04-08 rewrite).
기준 파일: pharmon-web/sales_stats_dialog.py (_query / _build_margin_cache / _calc_age / _empty_row / _add)
변경 포인트 (구버전 대비):
1. CD_SUNAB LEFT JOIN → OUTER APPLY TOP 1 (1:N 곱증으로 행 중복 → 금액 이중계산되던 버그 제거)
2. PS_SUB_BOJO JOIN 추가 (보훈 세분화 GITA_GUBUN)
3. SELECT 컬럼 확장: S_FASTMON, S_TEMP3, SE_BOHUN_C, DRUG_T1~T3,
S_S_PHOL_0~3, SUGA_ZE_PRICE, GITA_GUBUN
4. 매출공식 GPPOS2 기준으로 재작성
- sales_amt = PRICE_C + PRICE_P + S_FASTMON + S_TEMP3 + SE_PRICE_C + SE_PRICE_P + SE_BOHUN_C
- ins_prep = S_Prep + S_S_PHOL_0~3 + SUGA_ZE_PRICE
- ins_drug = ((DRUG_T1+T2+T3 + SE_PRICE_C + SE_BOHUN_C) // 10) * 10 (10원 절사)
- nonins_prep= S_FASTMON - Drug_T4 (S_FASTMON>0 일 때)
- nonins_drug= Drug_T4
- claim_amt = PRICE_C + int(SE_PRICE_P * 0.25) (선별급여: SE_PRICE_P 는 본인부담 80% 이므로 보험부담 20% = SE_PRICE_P × 20/80, 소수점 이하 버림)
5. 보훈(PreGubun='4') + GITA_GUBUN ∈ {1,2,3,4} → '4_1'~'4_4' 세분화 키
6. 자동차보험(PreGubun='3') 할증금액 car_surcharge = PRICE_T - (ins_drug + ins_prep) (음수이면 0)
7. 비급여 마진 = Drug_T4 - 입고총액 (PS_SUB_PHARM × WH_sub/CD_GOODS 캐시)
8. 결제수단 분류
- card : Appr_Gubun IN ('1','5','9')
- paper: nAPPROVAL_NUM 보유 (현금영수증 번호)
- cash : 나머지
9. 시간가산 분류
- hd_add (Holiday IN '2','4') → saturday(공휴가산)
- overtime (Holiday IN '3','4') → overtime (hd_add 없고 overtime인 경우)
- normal (그 외)
"""
import pyodbc
from datetime import date
from config import PHARMIT3000_CONFIG as CFG
def get_connection():
    """Open a new pyodbc connection using the settings in ``CFG``.

    The connection string takes ``server``, ``database``, ``username`` and
    ``password`` from ``CFG``; the ODBC driver name,
    ``TrustServerCertificate=yes`` and the 30-second connection timeout are
    fixed here.

    Returns:
        The connection object returned by ``pyodbc.connect``. The caller is
        responsible for closing it.
    """
    fragments = (
        f"DRIVER={{ODBC Driver 17 for SQL Server}};",
        f"SERVER={CFG['server']};",
        f"DATABASE={CFG['database']};",
        f"UID={CFG['username']};",
        f"PWD={CFG['password']};",
        f"TrustServerCertificate=yes;",
        f"Connection Timeout=30;",
    )
    return pyodbc.connect("".join(fragments))
def query(sql, params=None):
    """Run one SQL statement on a fresh connection and return rows as dicts.

    A new connection is opened for every call and always closed afterwards,
    even when execution fails. No commit is issued here.

    Args:
        sql: SQL text; parameters are written as ``?`` placeholders.
        params: Sequence of values bound to the placeholders. ``None`` (or
            any falsy value) means "no parameters".

    Returns:
        A list with one dict per fetched row, keyed by result column name.
        An empty list when the statement produces no result set.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(sql, params or ())
        # DB-API: ``description`` is None when the statement returned no
        # result set; iterating it would raise TypeError.
        if cursor.description is None:
            return []
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        conn.close()
# ─── 비급여 마진 캐시 ──────────────────────────────────────────
def _build_margin_cache(rows) -> dict:
    """Compute the total purchase cost of the non-covered items per prescription.

    The caller derives the margin as ``Drug_T4 - cost_cache[serial]``.
    Ported from QT-POS ``sales_stats_dialog.py::_build_margin_cache`` (per the
    original note), with one deliberate difference: serials are de-duplicated
    before chunking (see below).

    Steps:
      1. Collect ``PreSerial`` of every row whose ``Drug_T4`` is > 0.
      2. Load the matching ``PS_SUB_PHARM`` detail lines with ``UnitCode != 1``
         (the original note describes these as the non-covered drugs).
      3. For every drug code found, load ``CD_GOODS.Price`` and the most
         recent ``WH_sub.WH_MY_unit_a`` (ordered by ``WH_DT_appl DESC``).
      4. Unit cost per line: the latest warehouse cost when it is > 0, else
         ``CD_GOODS.Price``. A latest cost of more than 5x the line's
         ``USERPRICE`` is treated as bad data and replaced by
         ``CD_GOODS.Price`` when that price is > 0.
      5. Line cost = unit cost * QUAN * QUAN_TIME * Days, where a missing or
         zero ``QUAN_TIME`` / ``Days`` counts as 1.

    Both lookups are issued in chunks of 500 values to stay below SQL
    Server's limit on parameters per statement.

    Args:
        rows: Iterable of dicts providing ``PreSerial`` and ``Drug_T4``.

    Returns:
        Dict mapping ``PreSerial`` to the summed cost (float). Empty when no
        row has a positive ``Drug_T4`` or no detail line has a drug code.
    """
    # De-duplicate while keeping first-seen order: if the same serial landed
    # in two different chunks its detail lines would be fetched twice and the
    # cost double-counted.
    noncov_serials = list(dict.fromkeys(
        r['PreSerial'] for r in rows if (r.get('Drug_T4') or 0) > 0
    ))
    if not noncov_serials:
        return {}

    CHUNK_SIZE = 500

    # Detail lines of the selected prescriptions.
    sub_rows = []
    for i in range(0, len(noncov_serials), CHUNK_SIZE):
        chunk = noncov_serials[i:i + CHUNK_SIZE]
        placeholders = ','.join(['?'] * len(chunk))
        sub_sql = f"""
SELECT s.PreSerial, s.DrugCode, s.USERPRICE, s.QUAN, s.QUAN_TIME, s.Days
FROM PM_PRES..PS_SUB_PHARM s
WHERE s.PreSerial IN ({placeholders})
AND s.UnitCode != 1
"""
        sub_rows.extend(query(sub_sql, tuple(chunk)))

    drug_codes = list({r['DrugCode'] for r in sub_rows if r.get('DrugCode')})
    if not drug_codes:
        return {}

    # Catalogue price and latest warehouse cost for each drug code.
    cost_rows = []
    for i in range(0, len(drug_codes), CHUNK_SIZE):
        chunk = drug_codes[i:i + CHUNK_SIZE]
        placeholders = ','.join(['?'] * len(chunk))
        cost_sql = f"""
SELECT g.DrugCode, g.Price AS GoodsPrice,
(SELECT TOP 1 WH_MY_unit_a FROM PM_DRUG..WH_sub w
WHERE w.DrugCode = g.DrugCode
ORDER BY w.WH_DT_appl DESC) AS LastCost
FROM PM_DRUG..CD_GOODS g
WHERE g.DrugCode IN ({placeholders})
"""
        cost_rows.extend(query(cost_sql, tuple(chunk)))

    cost_map = {
        c['DrugCode']: {
            'last_cost': float(c.get('LastCost') or 0),
            'goods_price': float(c.get('GoodsPrice') or 0),
        }
        for c in cost_rows
    }

    cost_cache: dict = {}
    for s in sub_rows:
        serial = s['PreSerial']
        user_price = float(s.get('USERPRICE') or 0)
        qty = float(s.get('QUAN') or 0)
        qty_time = float(s.get('QUAN_TIME') or 1)
        days = float(s.get('Days') or 1)

        info = cost_map.get(s['DrugCode'], {})
        last_cost = info.get('last_cost', 0)
        goods_price = info.get('goods_price', 0)

        if last_cost > user_price * 5 and goods_price > 0:
            # Implausibly high warehouse cost: fall back to the catalogue price.
            cost_price = goods_price
        elif last_cost > 0:
            cost_price = last_cost
        else:
            cost_price = goods_price

        cost_cache[serial] = (
            cost_cache.get(serial, 0) + cost_price * qty * qty_time * days
        )
    return cost_cache
# ─── 메인 통계 ────────────────────────────────────────────────
def get_sales_stats(date_from: str, date_to: str) -> dict:
    """Aggregate prescription sales in a date range into statistic buckets.

    Port of QT-POS ``sales_stats_dialog.py::_query()`` (per the original
    note). One row per ``PS_MAIN`` record with ``INDATE BETWEEN date_from AND
    date_to`` is read, joined with ``PS_Main_Sub``, ``PS_SUB_BOJO`` and the
    single preferred ``CD_SUNAB`` row (non-empty ``APPR_DATE`` first, then
    latest ``APPR_DATE``).

    Per-row amounts:
      * sales_amt     = PRICE_C + PRICE_P + S_FASTMON + S_TEMP3
                        + SE_PRICE_C + SE_PRICE_P + SE_BOHUN_C
      * ins_prep      = S_Prep + S_S_PHOL_0..3 + SUGA_ZE_PRICE
      * ins_drug      = (DRUG_T1 + DRUG_T2 + DRUG_T3 + SE_PRICE_C + SE_BOHUN_C)
                        rounded down to a multiple of 10
      * nonins_prep   = S_FASTMON - Drug_T4 when S_FASTMON > 0, else 0
      * nonins_drug   = Drug_T4
      * nonins_margin = int(Drug_T4 - cached cost) when Drug_T4 > 0, else 0
      * claim_amt     = PRICE_C + int(SE_PRICE_P * 0.25)
      * copay         = PRICE_P
      * receipt       = PRICE_N column of the query (PRICE_N + SE_PRICE_P)
      * car_surcharge = PRICE_T - (ins_drug + ins_prep), floored at 0,
                        only for PreGubun '3'; otherwise 0

    Args:
        date_from: Lower bound bound to ``INDATE``.
        date_to: Upper bound bound to ``INDATE``. Its first eight characters
            are parsed as ``YYYYMMDD`` to obtain the reference date for ages.

    Returns:
        Dict with keys ``total``, ``by_gubun``, ``by_age``, ``by_time``,
        ``by_pay`` and ``by_hosp``; every leaf is an accumulator row as
        produced by ``_empty_row``.
    """
    sql = """
SELECT
m.PreSerial,
m.PreGubun,
m.PaNum,
m.OrderName,
m.Holiday,
m.PRICE_T,
m.Drug_T4,
m.S_Prep,
m.PRICE_C,
m.PRICE_P,
-- 수납금액 = PRICE_N + 선별급여/상한제초과
(m.PRICE_N + ISNULL(sub.SE_PRICE_P, 0)) AS PRICE_N,
-- GPPOS 매출계산용
ISNULL(m.S_FASTMON, 0) AS S_FASTMON,
ISNULL(sub.S_TEMP3, 0) AS S_TEMP3,
ISNULL(sub.SE_PRICE_C, 0) AS SE_PRICE_C,
ISNULL(sub.SE_PRICE_P, 0) AS SE_PRICE_P,
ISNULL(sub.SE_BOHUN_C, 0) AS SE_BOHUN_C,
-- GPPOS 급여약가 계산용 (DRUG_T1 + T2 + T3)
ISNULL(sub.DRUG_T1, 0) AS DRUG_T1,
ISNULL(sub.DRUG_T2, 0) AS DRUG_T2,
ISNULL(sub.DRUG_T3, 0) AS DRUG_T3,
-- 급여조제료 가산 (약국관리료)
ISNULL(sub.S_S_PHOL_0, 0) AS S_S_PHOL_0,
ISNULL(sub.S_S_PHOL_1, 0) AS S_S_PHOL_1,
ISNULL(sub.S_S_PHOL_2, 0) AS S_S_PHOL_2,
ISNULL(sub.S_S_PHOL_3, 0) AS S_S_PHOL_3,
-- 명절/휴일 특수가산
ISNULL(sub.SUGA_ZE_PRICE, 0) AS SUGA_ZE_PRICE,
-- 보훈 세분화용 (1=100%, 2=60%, 3=50%, 4=30%)
ISNULL(bojo.GITA_GUBUN, '') AS GITA_GUBUN,
ISNULL(n.Appr_Gubun, '') AS Appr_Gubun,
ISNULL(n.nAPPROVAL_NUM, '') AS nAPPROVAL_NUM
FROM PM_PRES..PS_MAIN m
LEFT JOIN PM_PRES..PS_Main_Sub sub ON sub.PreSerial = m.PreSerial
LEFT JOIN PM_PRES..PS_SUB_BOJO bojo ON bojo.PreSerial = m.PreSerial
OUTER APPLY (
SELECT TOP 1 *
FROM PM_PRES..CD_SUNAB WITH (NOLOCK)
WHERE PRESERIAL = m.PreSerial
ORDER BY CASE WHEN ISNULL(APPR_DATE,'') = '' THEN 1 ELSE 0 END,
APPR_DATE DESC
) n
WHERE m.INDATE BETWEEN ? AND ?
"""
    rows = query(sql, (date_from, date_to))
    cost_cache = _build_margin_cache(rows)

    # Reference date for age calculation: date_to read as YYYYMMDD.
    year, month, day = (
        int(date_to[start:stop]) for start, stop in ((0, 4), (4, 6), (6, 8))
    )
    ref_date = date(year, month, day)

    total = _empty_row()
    by_gubun = {}
    by_age = {name: _empty_row() for name in ('old', 'infant', 'mid')}
    by_time = {name: _empty_row() for name in ('overtime', 'saturday', 'normal')}
    by_pay = {name: _empty_row() for name in ('card', 'cash', 'paper')}
    by_hosp = {}

    # Holiday code -> time bucket. The holiday-surcharge codes ('2', '4')
    # win over the overtime codes ('3', '4'); every other code is 'normal'.
    time_bucket_of = {'2': 'saturday', '4': 'saturday', '3': 'overtime'}

    def as_int(row, column):
        """Read a numeric column as int, treating NULL/missing/0 as 0."""
        return int(row.get(column) or 0)

    for r in rows:
        gubun = str(r.get('PreGubun') or '0')
        gita_gubun = str(r.get('GITA_GUBUN') or '')
        # Veterans split: PreGubun '4' with GITA_GUBUN 1..4 becomes '4_1'..'4_4'.
        if gubun == '4' and gita_gubun in ('1', '2', '3', '4'):
            gubun = f'4_{gita_gubun}'

        holiday = str(r.get('Holiday') or '1')
        appr_gubun = str(r.get('Appr_Gubun') or '')
        nappr_num = str(r.get('nAPPROVAL_NUM') or '').strip()

        price_t = as_int(r, 'PRICE_T')
        drug_t4 = as_int(r, 'Drug_T4')
        s_prep = as_int(r, 'S_Prep')
        price_c = as_int(r, 'PRICE_C')
        price_p = as_int(r, 'PRICE_P')
        price_n = as_int(r, 'PRICE_N')
        s_fastmon = as_int(r, 'S_FASTMON')
        s_temp3 = as_int(r, 'S_TEMP3')
        se_price_c = as_int(r, 'SE_PRICE_C')
        se_price_p = as_int(r, 'SE_PRICE_P')
        se_bohun_c = as_int(r, 'SE_BOHUN_C')
        covered_drug_sum = (
            as_int(r, 'DRUG_T1') + as_int(r, 'DRUG_T2') + as_int(r, 'DRUG_T3')
        )
        s_s_phol = sum(
            as_int(r, column)
            for column in ('S_S_PHOL_0', 'S_S_PHOL_1', 'S_S_PHOL_2', 'S_S_PHOL_3')
        )
        suga_ze_price = as_int(r, 'SUGA_ZE_PRICE')

        # --- GPPOS formulas ---
        sales_amt = (price_c + price_p + s_fastmon + s_temp3
                     + se_price_c + se_price_p + se_bohun_c)
        ins_prep = s_prep + s_s_phol + suga_ze_price
        # Rounded down to a multiple of 10.
        ins_drug = ((covered_drug_sum + se_price_c + se_bohun_c) // 10) * 10

        if s_fastmon > 0:
            nonins_prep = s_fastmon - drug_t4
        else:
            nonins_prep = 0
        nonins_drug = drug_t4

        if drug_t4 > 0:
            cost_total = cost_cache.get(r.get('PreSerial'), 0)
            nonins_margin = int(drug_t4 - cost_total)
        else:
            nonins_margin = 0

        # Claim = PRICE_C + SE_PRICE_P * 0.25 (original note: SE_PRICE_P is
        # the patient's 80% share, so the insurer's 20% share is a quarter).
        claim_amt = price_c + int(se_price_p * 0.25)

        # Car-insurance surcharge: amount of PRICE_T above the covered total.
        car_surcharge = 0
        if gubun == '3':
            car_surcharge = max(price_t - (ins_drug + ins_prep), 0)

        args = (sales_amt, ins_prep, ins_drug,
                nonins_prep, nonins_drug, se_price_p, nonins_margin,
                claim_amt, price_p, price_n, car_surcharge)

        # Insurance-type bucket, created on first use.
        gubun_row = by_gubun.get(gubun)
        if gubun_row is None:
            gubun_row = by_gubun[gubun] = _empty_row()

        # Age bucket: unknown age counts as 'mid'.
        age = _calc_age(str(r.get('PaNum') or ''), ref_date)
        if age is not None and age >= 65:
            age_key = 'old'
        elif age is not None and age < 6:
            age_key = 'infant'
        else:
            age_key = 'mid'

        # Time-surcharge bucket (original note: follows Unit_Statistics.pas).
        time_key = time_bucket_of.get(holiday, 'normal')

        # Payment bucket: card codes first, then an approval number, else cash.
        if appr_gubun in ('1', '5', '9'):
            pay_key = 'card'
        else:
            pay_key = 'paper' if nappr_num else 'cash'

        # Hospital bucket, created on first use.
        hosp = str(r.get('OrderName') or '').strip() or '(미상)'
        hosp_row = by_hosp.get(hosp)
        if hosp_row is None:
            hosp_row = by_hosp[hosp] = _empty_row()

        for bucket in (total, gubun_row, by_age[age_key],
                       by_time[time_key], by_pay[pay_key], hosp_row):
            _add(bucket, *args)

    return {
        'total': total,
        'by_gubun': by_gubun,
        'by_age': by_age,
        'by_time': by_time,
        'by_pay': by_pay,
        'by_hosp': by_hosp,
    }
# ─── 유틸 ─────────────────────────────────────────────────────
def _empty_row() -> dict:
    """Return a fresh accumulator row with every field set to 0.

    The fields are the record counter ``cnt`` plus the eleven amounts that
    ``_add`` accumulates.
    """
    fields = (
        'cnt', 'sales_amt', 'ins_prep', 'ins_drug',
        'nonins_prep', 'nonins_drug', 'se_price_p', 'nonins_margin',
        'claim_amt', 'copay', 'receipt', 'car_surcharge',
    )
    return dict.fromkeys(fields, 0)
def _add(d: dict, sales_amt, ins_prep, ins_drug,
         nonins_prep, nonins_drug, se_price_p, nonins_margin,
         claim_amt, copay, receipt, car_surcharge=0):
    """Accumulate one record into the accumulator row ``d`` in place.

    ``d['cnt']`` is incremented by one and every amount is added to the
    field of the same name. ``d`` must already contain all fields.
    Returns ``None``.
    """
    d['cnt'] += 1
    deltas = (
        ('sales_amt', sales_amt),
        ('ins_prep', ins_prep),
        ('ins_drug', ins_drug),
        ('nonins_prep', nonins_prep),
        ('nonins_drug', nonins_drug),
        ('se_price_p', se_price_p),
        ('nonins_margin', nonins_margin),
        ('claim_amt', claim_amt),
        ('copay', copay),
        ('receipt', receipt),
        ('car_surcharge', car_surcharge),
    )
    for field, amount in deltas:
        d[field] += amount
def _calc_age(panum: str, ref_date: date):
    """Derive the age in full years from a resident registration number.

    The first six characters are read as ``YYMMDD``. The seventh character
    (or the eighth, when the seventh is a ``-`` separator) selects the century:

      * '3', '4', '7', '8' -> 2000s
      * '9', '0'           -> 1800s
      * anything else ('1', '2', '5', '6', unknown or masked) -> 1900s

    Args:
        panum: Registration number, with or without a hyphen after the
            birth date.
        ref_date: Date on which the age is evaluated.

    Returns:
        Age in completed years on ``ref_date``, or ``None`` when ``panum`` is
        empty, shorter than seven characters, or does not hold a valid date.
    """
    if not panum or len(panum) < 7:
        return None

    birth6 = panum[:6]
    gender = panum[6]
    if gender == '-':
        # Hyphenated form 'YYMMDD-G......': the century code follows the dash.
        gender = panum[7] if len(panum) > 7 else ''

    if not birth6.isdigit():
        return None

    if gender in ('3', '4', '7', '8'):
        century = 2000
    elif gender in ('9', '0'):
        century = 1800
    else:
        century = 1900

    try:
        # int() stays inside the try: some characters pass isdigit() but are
        # rejected by int(), and date() rejects impossible month/day values.
        yy, mm, dd = int(birth6[:2]), int(birth6[2:4]), int(birth6[4:6])
        bday = date(century + yy, mm, dd)
    except ValueError:
        return None

    # Subtract one when this year's birthday has not been reached yet.
    before_birthday = (ref_date.month, ref_date.day) < (bday.month, bday.day)
    return ref_date.year - bday.year - before_birthday