한퓨어 엑셀: - ExcelProcessor에 hanpure 형식 자동 감지 및 처리 추가 - 옵션항목에서 중량 파싱 (600g*5개 → 3000g 등) - 주문번호에서 입고일 추출, ingredient_code 직접 활용 조제 용도 구분: - compounds.usage_type 컬럼 추가 (SALE/SELF_USE/SAMPLE/DISPOSAL) - 조제 실행 시 용도 선택 드롭다운 - 조제 목록에서 용도 뱃지 클릭으로 사후 변경 가능 - 비판매 용도 시 sell_price_total=0, 매출 통계 제외 - PUT /api/compounds/:id/usage-type API 추가 - 용도 구분 설계 문서 (docs/조제_용도구분_usage_type.md) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
403 lines
14 KiB
Python
403 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Excel 파일 처리 모듈
|
||
한의사랑, 한의정보 형식 자동 감지 및 처리
|
||
"""
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
from datetime import datetime
|
||
import re
|
||
|
||
class ExcelProcessor:
|
||
"""Excel 파일 형식별 처리 클래스"""
|
||
|
||
# 한의사랑 형식 컬럼 매핑
|
||
HANISARANG_MAPPING = {
|
||
'품목명': 'herb_name',
|
||
'제품코드': 'insurance_code',
|
||
'일그램당단가': 'unit_price',
|
||
'원산지': 'origin_country',
|
||
'적용일': 'receipt_date',
|
||
'총구입량': 'quantity',
|
||
'총구입단가': 'total_amount'
|
||
}
|
||
|
||
# 한의정보 형식 컬럼 매핑
|
||
HANINFO_MAPPING = {
|
||
'제품코드': 'insurance_code',
|
||
'업체명': 'supplier_name',
|
||
'약재명': 'herb_name',
|
||
'구입일자': 'receipt_date',
|
||
'구입량': 'quantity',
|
||
'구입액': 'total_amount',
|
||
'원산지': 'origin_country',
|
||
'비고': 'notes'
|
||
}
|
||
|
||
# 한퓨어 형식 컬럼 매핑
|
||
HANPURE_MAPPING = {
|
||
'상품명': 'herb_name',
|
||
'제조사코드': 'insurance_code',
|
||
'주성분코드': 'ingredient_code',
|
||
'제조사명': 'supplier_name',
|
||
'원산지': 'origin_country',
|
||
'소계': 'total_amount',
|
||
'옵션항목': 'option_detail',
|
||
'주문번호': 'order_number',
|
||
}
|
||
|
||
def __init__(self):
|
||
self.format_type = None
|
||
self.df_original = None
|
||
self.df_processed = None
|
||
|
||
def detect_format(self, df):
|
||
"""Excel 형식 자동 감지"""
|
||
columns = df.columns.tolist()
|
||
|
||
# 한퓨어 형식 체크 (주문번호, 주성분코드, 제조사코드, 옵션항목이 특징)
|
||
hanpure_cols = ['주문번호', '주성분코드', '제조사코드', '상품명', '옵션항목']
|
||
if all(col in columns for col in hanpure_cols):
|
||
return 'hanpure'
|
||
|
||
# 한의사랑 형식 체크
|
||
hanisarang_cols = ['품목명', '제품코드', '일그램당단가', '총구입량', '총구입단가']
|
||
if all(col in columns for col in hanisarang_cols):
|
||
return 'hanisarang'
|
||
|
||
# 한의정보 형식 체크
|
||
haninfo_cols = ['제품코드', '업체명', '약재명', '구입일자', '구입량', '구입액']
|
||
if all(col in columns for col in haninfo_cols):
|
||
return 'haninfo'
|
||
|
||
# 기본 형식 (제품코드가 있는 경우 한의정보로 간주)
|
||
if '제품코드' in columns and '약재명' in columns:
|
||
return 'haninfo'
|
||
|
||
return 'unknown'
|
||
|
||
def read_excel(self, file_path):
|
||
"""Excel 파일 읽기"""
|
||
try:
|
||
# 코드 컬럼을 문자열로 읽기 위한 dtype 설정
|
||
self.df_original = pd.read_excel(file_path, dtype={
|
||
'제품코드': str, '제조사코드': str, '주성분코드': str, '대표코드': str
|
||
})
|
||
self.format_type = self.detect_format(self.df_original)
|
||
return True
|
||
except Exception as e:
|
||
print(f"Excel 파일 읽기 실패: {str(e)}")
|
||
return False
|
||
|
||
def process_hanisarang(self):
|
||
"""한의사랑 형식 처리"""
|
||
df = self.df_original.copy()
|
||
|
||
# 컬럼 매핑
|
||
df_mapped = pd.DataFrame()
|
||
|
||
for old_col, new_col in self.HANISARANG_MAPPING.items():
|
||
if old_col in df.columns:
|
||
df_mapped[new_col] = df[old_col]
|
||
|
||
# 보험코드 9자리 패딩 처리
|
||
if 'insurance_code' in df_mapped.columns:
|
||
df_mapped['insurance_code'] = df_mapped['insurance_code'].apply(
|
||
lambda x: str(x).zfill(9) if pd.notna(x) and str(x).isdigit() else str(x) if pd.notna(x) else None
|
||
)
|
||
|
||
# 업체명 추가 (기본값)
|
||
df_mapped['supplier_name'] = '한의사랑'
|
||
|
||
# 날짜 처리
|
||
if 'receipt_date' in df_mapped.columns:
|
||
df_mapped['receipt_date'] = pd.to_datetime(
|
||
df_mapped['receipt_date'],
|
||
format='%Y-%m-%d',
|
||
errors='coerce'
|
||
).dt.strftime('%Y%m%d')
|
||
|
||
# 단가 계산 (이미 있지만 검증)
|
||
if 'unit_price' not in df_mapped.columns or df_mapped['unit_price'].isnull().all():
|
||
if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns:
|
||
df_mapped['unit_price'] = df_mapped['total_amount'] / df_mapped['quantity']
|
||
|
||
self.df_processed = df_mapped
|
||
return df_mapped
|
||
|
||
def process_haninfo(self):
|
||
"""한의정보 형식 처리"""
|
||
df = self.df_original.copy()
|
||
|
||
# 컬럼 매핑
|
||
df_mapped = pd.DataFrame()
|
||
|
||
for old_col, new_col in self.HANINFO_MAPPING.items():
|
||
if old_col in df.columns:
|
||
df_mapped[new_col] = df[old_col]
|
||
|
||
# 보험코드 9자리 패딩 처리
|
||
if 'insurance_code' in df_mapped.columns:
|
||
df_mapped['insurance_code'] = df_mapped['insurance_code'].apply(
|
||
lambda x: str(x).zfill(9) if pd.notna(x) and str(x).isdigit() else str(x) if pd.notna(x) else None
|
||
)
|
||
|
||
# 날짜 처리 (YYYYMMDD 형식)
|
||
if 'receipt_date' in df_mapped.columns:
|
||
df_mapped['receipt_date'] = df_mapped['receipt_date'].astype(str)
|
||
|
||
# 단가 계산
|
||
if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns:
|
||
df_mapped['unit_price'] = df_mapped['total_amount'] / df_mapped['quantity']
|
||
df_mapped['unit_price'] = df_mapped['unit_price'].round(2)
|
||
|
||
self.df_processed = df_mapped
|
||
return df_mapped
|
||
|
||
@staticmethod
|
||
def parse_option_quantity_g(option_text):
|
||
"""옵션항목에서 총 중량(g) 파싱
|
||
예: '인삼 특A (4~5년근) 600g*5개' → 3000
|
||
'감초 1kg' → 1000
|
||
'복령 500g' → 500
|
||
'백출 300g*3개' → 900
|
||
"""
|
||
if not option_text or pd.isna(option_text):
|
||
return None
|
||
|
||
text = str(option_text)
|
||
|
||
# 패턴1: NNNg*N개 또는 NNNg×N개
|
||
m = re.search(r'(\d+(?:\.\d+)?)\s*g\s*[*×x]\s*(\d+)', text, re.IGNORECASE)
|
||
if m:
|
||
return float(m.group(1)) * int(m.group(2))
|
||
|
||
# 패턴2: N kg*N개
|
||
m = re.search(r'(\d+(?:\.\d+)?)\s*kg\s*[*×x]\s*(\d+)', text, re.IGNORECASE)
|
||
if m:
|
||
return float(m.group(1)) * 1000 * int(m.group(2))
|
||
|
||
# 패턴3: NNNg (단독)
|
||
m = re.search(r'(\d+(?:\.\d+)?)\s*g(?!\w)', text, re.IGNORECASE)
|
||
if m:
|
||
return float(m.group(1))
|
||
|
||
# 패턴4: Nkg (단독)
|
||
m = re.search(r'(\d+(?:\.\d+)?)\s*kg(?!\w)', text, re.IGNORECASE)
|
||
if m:
|
||
return float(m.group(1)) * 1000
|
||
|
||
return None
|
||
|
||
def process_hanpure(self):
|
||
"""한퓨어 형식 처리"""
|
||
df = self.df_original.copy()
|
||
|
||
df_mapped = pd.DataFrame()
|
||
|
||
for old_col, new_col in self.HANPURE_MAPPING.items():
|
||
if old_col in df.columns:
|
||
df_mapped[new_col] = df[old_col]
|
||
|
||
# 보험코드 9자리 패딩 처리
|
||
if 'insurance_code' in df_mapped.columns:
|
||
df_mapped['insurance_code'] = df_mapped['insurance_code'].apply(
|
||
lambda x: str(x).zfill(9) if pd.notna(x) and str(x).strip().isdigit() else str(x).strip() if pd.notna(x) else None
|
||
)
|
||
|
||
# 주문번호에서 날짜 추출 (20260211-22511888 → 20260211)
|
||
if 'order_number' in df_mapped.columns:
|
||
df_mapped['receipt_date'] = df_mapped['order_number'].apply(
|
||
lambda x: str(x).split('-')[0] if pd.notna(x) else None
|
||
)
|
||
|
||
# 옵션항목에서 중량(g) 파싱
|
||
if 'option_detail' in df_mapped.columns:
|
||
df_mapped['quantity'] = df_mapped['option_detail'].apply(self.parse_option_quantity_g)
|
||
|
||
# 업체명 기본값
|
||
if 'supplier_name' not in df_mapped.columns or df_mapped['supplier_name'].isnull().all():
|
||
df_mapped['supplier_name'] = '한퓨어'
|
||
|
||
# 단가 계산 (소계 / 중량g)
|
||
if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns:
|
||
df_mapped['unit_price'] = df_mapped.apply(
|
||
lambda row: round(row['total_amount'] / row['quantity'], 2)
|
||
if pd.notna(row.get('quantity')) and row.get('quantity', 0) > 0
|
||
else None, axis=1
|
||
)
|
||
|
||
# 비고에 옵션항목 원문 저장
|
||
df_mapped['notes'] = df_mapped.get('option_detail', '')
|
||
|
||
# 임시 컬럼 제거
|
||
df_mapped.drop(columns=['order_number', 'option_detail'], errors='ignore', inplace=True)
|
||
|
||
self.df_processed = df_mapped
|
||
return df_mapped
|
||
|
||
def process(self):
|
||
"""형식에 따라 자동 처리"""
|
||
if self.format_type == 'hanisarang':
|
||
return self.process_hanisarang()
|
||
elif self.format_type == 'haninfo':
|
||
return self.process_haninfo()
|
||
elif self.format_type == 'hanpure':
|
||
return self.process_hanpure()
|
||
else:
|
||
raise ValueError(f"지원하지 않는 형식: {self.format_type}")
|
||
|
||
def validate_data(self):
|
||
"""처리된 데이터 검증"""
|
||
if self.df_processed is None:
|
||
return False, "처리된 데이터가 없습니다"
|
||
|
||
df = self.df_processed
|
||
|
||
# 필수 컬럼 확인
|
||
required_columns = ['herb_name', 'quantity', 'total_amount']
|
||
missing_cols = [col for col in required_columns if col not in df.columns]
|
||
|
||
if missing_cols:
|
||
return False, f"필수 컬럼 누락: {', '.join(missing_cols)}"
|
||
|
||
# 데이터 타입 검증
|
||
numeric_cols = ['quantity', 'total_amount', 'unit_price']
|
||
for col in numeric_cols:
|
||
if col in df.columns:
|
||
try:
|
||
df[col] = pd.to_numeric(df[col], errors='coerce')
|
||
except:
|
||
return False, f"{col} 컬럼이 숫자 형식이 아닙니다"
|
||
|
||
# NULL 값 확인
|
||
null_check = df[required_columns].isnull().sum()
|
||
if null_check.sum() > 0:
|
||
null_cols = null_check[null_check > 0].index.tolist()
|
||
return False, f"NULL 값 포함 컬럼: {', '.join(null_cols)}"
|
||
|
||
# 음수 값 확인
|
||
for col in ['quantity', 'total_amount']:
|
||
if col in df.columns:
|
||
if (df[col] < 0).any():
|
||
return False, f"{col} 컬럼에 음수 값이 있습니다"
|
||
|
||
return True, "검증 통과"
|
||
|
||
def get_summary(self):
|
||
"""처리 결과 요약"""
|
||
if self.df_processed is None:
|
||
return None
|
||
|
||
df = self.df_processed
|
||
|
||
summary = {
|
||
'format_type': self.format_type,
|
||
'total_rows': len(df),
|
||
'total_items': df['herb_name'].nunique() if 'herb_name' in df.columns else 0,
|
||
'total_quantity': df['quantity'].sum() if 'quantity' in df.columns else 0,
|
||
'total_amount': df['total_amount'].sum() if 'total_amount' in df.columns else 0,
|
||
'suppliers': df['supplier_name'].unique().tolist() if 'supplier_name' in df.columns else [],
|
||
'date_range': None
|
||
}
|
||
|
||
# 날짜 범위
|
||
if 'receipt_date' in df.columns:
|
||
dates = pd.to_datetime(df['receipt_date'], format='%Y%m%d', errors='coerce')
|
||
dates = dates.dropna()
|
||
if not dates.empty:
|
||
summary['date_range'] = {
|
||
'start': dates.min().strftime('%Y-%m-%d'),
|
||
'end': dates.max().strftime('%Y-%m-%d')
|
||
}
|
||
|
||
return summary
|
||
|
||
def export_to_standard(self):
|
||
"""표준 형식으로 변환"""
|
||
if self.df_processed is None:
|
||
return None
|
||
|
||
# 표준 컬럼 순서
|
||
standard_columns = [
|
||
'insurance_code', 'supplier_name', 'herb_name',
|
||
'receipt_date', 'quantity', 'total_amount',
|
||
'unit_price', 'origin_country', 'notes',
|
||
'ingredient_code'
|
||
]
|
||
|
||
# 있는 컬럼만 선택
|
||
available_cols = [col for col in standard_columns if col in self.df_processed.columns]
|
||
df_standard = self.df_processed[available_cols].copy()
|
||
|
||
# 누락된 컬럼 추가 (기본값)
|
||
for col in standard_columns:
|
||
if col not in df_standard.columns:
|
||
if col == 'notes':
|
||
df_standard[col] = ''
|
||
elif col == 'supplier_name':
|
||
df_standard[col] = '미지정'
|
||
else:
|
||
df_standard[col] = None
|
||
|
||
return df_standard[standard_columns]
|
||
|
||
|
||
# 테스트 함수
|
||
def test_processor():
|
||
"""프로세서 테스트"""
|
||
processor = ExcelProcessor()
|
||
|
||
# 한의사랑 테스트
|
||
print("="*60)
|
||
print("한의사랑 형식 테스트")
|
||
print("="*60)
|
||
|
||
if processor.read_excel('/root/kdrug/sample/한의사랑.xlsx'):
|
||
print(f"형식 감지: {processor.format_type}")
|
||
df = processor.process()
|
||
print(f"처리된 행 수: {len(df)}")
|
||
|
||
valid, msg = processor.validate_data()
|
||
print(f"검증 결과: {msg}")
|
||
|
||
summary = processor.get_summary()
|
||
print(f"요약:")
|
||
print(f" - 총 약재: {summary['total_items']}종")
|
||
print(f" - 총 수량: {summary['total_quantity']:,.0f}g")
|
||
print(f" - 총 금액: {summary['total_amount']:,.0f}원")
|
||
|
||
# 샘플 출력
|
||
print("\n처리된 데이터 샘플:")
|
||
print(df.head(3).to_string())
|
||
|
||
# 한의정보 테스트
|
||
print("\n" + "="*60)
|
||
print("한의정보 형식 테스트")
|
||
print("="*60)
|
||
|
||
processor2 = ExcelProcessor()
|
||
if processor2.read_excel('/root/kdrug/sample/한의정보.xlsx'):
|
||
print(f"형식 감지: {processor2.format_type}")
|
||
df = processor2.process()
|
||
print(f"처리된 행 수: {len(df)}")
|
||
|
||
valid, msg = processor2.validate_data()
|
||
print(f"검증 결과: {msg}")
|
||
|
||
summary = processor2.get_summary()
|
||
print(f"요약:")
|
||
print(f" - 총 약재: {summary['total_items']}종")
|
||
print(f" - 총 수량: {summary['total_quantity']:,.0f}g")
|
||
print(f" - 총 금액: {summary['total_amount']:,.0f}원")
|
||
print(f" - 공급업체: {', '.join(summary['suppliers'])}")
|
||
|
||
# 샘플 출력
|
||
print("\n처리된 데이터 샘플:")
|
||
print(df.head(3).to_string())
|
||
|
||
|
||
if __name__ == "__main__":
|
||
test_processor() |