kdrug-inventory-system/excel_processor.py
시골약사 1198c22083 fix: Excel 입고 시 보험코드 9자리 패딩 처리
- Excel 읽기 시 제품코드를 문자열로 처리하여 앞자리 0 보존
- 한의사랑/한의정보 형식 모두 보험코드 9자리 패딩 적용
- 예: 60600420 → 060600420 자동 변환

이제 Excel 파일의 보험코드가 숫자로 읽혀도 정상 처리됨

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-16 14:36:52 +00:00

299 lines
10 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Excel 파일 처리 모듈
한의사랑, 한의정보 형식 자동 감지 및 처리
"""
import pandas as pd
import numpy as np
from datetime import datetime
import re
class ExcelProcessor:
"""Excel 파일 형식별 처리 클래스"""
# 한의사랑 형식 컬럼 매핑
HANISARANG_MAPPING = {
'품목명': 'herb_name',
'제품코드': 'insurance_code',
'일그램당단가': 'unit_price',
'원산지': 'origin_country',
'적용일': 'receipt_date',
'총구입량': 'quantity',
'총구입단가': 'total_amount'
}
# 한의정보 형식 컬럼 매핑
HANINFO_MAPPING = {
'제품코드': 'insurance_code',
'업체명': 'supplier_name',
'약재명': 'herb_name',
'구입일자': 'receipt_date',
'구입량': 'quantity',
'구입액': 'total_amount',
'원산지': 'origin_country',
'비고': 'notes'
}
def __init__(self):
self.format_type = None
self.df_original = None
self.df_processed = None
def detect_format(self, df):
"""Excel 형식 자동 감지"""
columns = df.columns.tolist()
# 한의사랑 형식 체크
hanisarang_cols = ['품목명', '제품코드', '일그램당단가', '총구입량', '총구입단가']
if all(col in columns for col in hanisarang_cols):
return 'hanisarang'
# 한의정보 형식 체크
haninfo_cols = ['제품코드', '업체명', '약재명', '구입일자', '구입량', '구입액']
if all(col in columns for col in haninfo_cols):
return 'haninfo'
# 기본 형식 (제품코드가 있는 경우 한의정보로 간주)
if '제품코드' in columns and '약재명' in columns:
return 'haninfo'
return 'unknown'
def read_excel(self, file_path):
"""Excel 파일 읽기"""
try:
# 제품코드를 문자열로 읽기 위한 dtype 설정
self.df_original = pd.read_excel(file_path, dtype={'제품코드': str})
self.format_type = self.detect_format(self.df_original)
return True
except Exception as e:
print(f"Excel 파일 읽기 실패: {str(e)}")
return False
def process_hanisarang(self):
"""한의사랑 형식 처리"""
df = self.df_original.copy()
# 컬럼 매핑
df_mapped = pd.DataFrame()
for old_col, new_col in self.HANISARANG_MAPPING.items():
if old_col in df.columns:
df_mapped[new_col] = df[old_col]
# 보험코드 9자리 패딩 처리
if 'insurance_code' in df_mapped.columns:
df_mapped['insurance_code'] = df_mapped['insurance_code'].apply(
lambda x: str(x).zfill(9) if pd.notna(x) and str(x).isdigit() else str(x) if pd.notna(x) else None
)
# 업체명 추가 (기본값)
df_mapped['supplier_name'] = '한의사랑'
# 날짜 처리
if 'receipt_date' in df_mapped.columns:
df_mapped['receipt_date'] = pd.to_datetime(
df_mapped['receipt_date'],
format='%Y-%m-%d',
errors='coerce'
).dt.strftime('%Y%m%d')
# 단가 계산 (이미 있지만 검증)
if 'unit_price' not in df_mapped.columns or df_mapped['unit_price'].isnull().all():
if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns:
df_mapped['unit_price'] = df_mapped['total_amount'] / df_mapped['quantity']
self.df_processed = df_mapped
return df_mapped
def process_haninfo(self):
"""한의정보 형식 처리"""
df = self.df_original.copy()
# 컬럼 매핑
df_mapped = pd.DataFrame()
for old_col, new_col in self.HANINFO_MAPPING.items():
if old_col in df.columns:
df_mapped[new_col] = df[old_col]
# 보험코드 9자리 패딩 처리
if 'insurance_code' in df_mapped.columns:
df_mapped['insurance_code'] = df_mapped['insurance_code'].apply(
lambda x: str(x).zfill(9) if pd.notna(x) and str(x).isdigit() else str(x) if pd.notna(x) else None
)
# 날짜 처리 (YYYYMMDD 형식)
if 'receipt_date' in df_mapped.columns:
df_mapped['receipt_date'] = df_mapped['receipt_date'].astype(str)
# 단가 계산
if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns:
df_mapped['unit_price'] = df_mapped['total_amount'] / df_mapped['quantity']
df_mapped['unit_price'] = df_mapped['unit_price'].round(2)
self.df_processed = df_mapped
return df_mapped
def process(self):
"""형식에 따라 자동 처리"""
if self.format_type == 'hanisarang':
return self.process_hanisarang()
elif self.format_type == 'haninfo':
return self.process_haninfo()
else:
raise ValueError(f"지원하지 않는 형식: {self.format_type}")
def validate_data(self):
"""처리된 데이터 검증"""
if self.df_processed is None:
return False, "처리된 데이터가 없습니다"
df = self.df_processed
# 필수 컬럼 확인
required_columns = ['herb_name', 'quantity', 'total_amount']
missing_cols = [col for col in required_columns if col not in df.columns]
if missing_cols:
return False, f"필수 컬럼 누락: {', '.join(missing_cols)}"
# 데이터 타입 검증
numeric_cols = ['quantity', 'total_amount', 'unit_price']
for col in numeric_cols:
if col in df.columns:
try:
df[col] = pd.to_numeric(df[col], errors='coerce')
except:
return False, f"{col} 컬럼이 숫자 형식이 아닙니다"
# NULL 값 확인
null_check = df[required_columns].isnull().sum()
if null_check.sum() > 0:
null_cols = null_check[null_check > 0].index.tolist()
return False, f"NULL 값 포함 컬럼: {', '.join(null_cols)}"
# 음수 값 확인
for col in ['quantity', 'total_amount']:
if col in df.columns:
if (df[col] < 0).any():
return False, f"{col} 컬럼에 음수 값이 있습니다"
return True, "검증 통과"
def get_summary(self):
"""처리 결과 요약"""
if self.df_processed is None:
return None
df = self.df_processed
summary = {
'format_type': self.format_type,
'total_rows': len(df),
'total_items': df['herb_name'].nunique() if 'herb_name' in df.columns else 0,
'total_quantity': df['quantity'].sum() if 'quantity' in df.columns else 0,
'total_amount': df['total_amount'].sum() if 'total_amount' in df.columns else 0,
'suppliers': df['supplier_name'].unique().tolist() if 'supplier_name' in df.columns else [],
'date_range': None
}
# 날짜 범위
if 'receipt_date' in df.columns:
dates = pd.to_datetime(df['receipt_date'], format='%Y%m%d', errors='coerce')
dates = dates.dropna()
if not dates.empty:
summary['date_range'] = {
'start': dates.min().strftime('%Y-%m-%d'),
'end': dates.max().strftime('%Y-%m-%d')
}
return summary
def export_to_standard(self):
"""표준 형식으로 변환"""
if self.df_processed is None:
return None
# 표준 컬럼 순서
standard_columns = [
'insurance_code', 'supplier_name', 'herb_name',
'receipt_date', 'quantity', 'total_amount',
'unit_price', 'origin_country', 'notes'
]
# 있는 컬럼만 선택
available_cols = [col for col in standard_columns if col in self.df_processed.columns]
df_standard = self.df_processed[available_cols].copy()
# 누락된 컬럼 추가 (기본값)
for col in standard_columns:
if col not in df_standard.columns:
if col == 'notes':
df_standard[col] = ''
elif col == 'supplier_name':
df_standard[col] = '미지정'
else:
df_standard[col] = None
return df_standard[standard_columns]
# 테스트 함수
def test_processor():
"""프로세서 테스트"""
processor = ExcelProcessor()
# 한의사랑 테스트
print("="*60)
print("한의사랑 형식 테스트")
print("="*60)
if processor.read_excel('/root/kdrug/sample/한의사랑.xlsx'):
print(f"형식 감지: {processor.format_type}")
df = processor.process()
print(f"처리된 행 수: {len(df)}")
valid, msg = processor.validate_data()
print(f"검증 결과: {msg}")
summary = processor.get_summary()
print(f"요약:")
print(f" - 총 약재: {summary['total_items']}")
print(f" - 총 수량: {summary['total_quantity']:,.0f}g")
print(f" - 총 금액: {summary['total_amount']:,.0f}")
# 샘플 출력
print("\n처리된 데이터 샘플:")
print(df.head(3).to_string())
# 한의정보 테스트
print("\n" + "="*60)
print("한의정보 형식 테스트")
print("="*60)
processor2 = ExcelProcessor()
if processor2.read_excel('/root/kdrug/sample/한의정보.xlsx'):
print(f"형식 감지: {processor2.format_type}")
df = processor2.process()
print(f"처리된 행 수: {len(df)}")
valid, msg = processor2.validate_data()
print(f"검증 결과: {msg}")
summary = processor2.get_summary()
print(f"요약:")
print(f" - 총 약재: {summary['total_items']}")
print(f" - 총 수량: {summary['total_quantity']:,.0f}g")
print(f" - 총 금액: {summary['total_amount']:,.0f}")
print(f" - 공급업체: {', '.join(summary['suppliers'])}")
# 샘플 출력
print("\n처리된 데이터 샘플:")
print(df.head(3).to_string())
if __name__ == "__main__":
test_processor()