- Excel 읽기 시 제품코드를 문자열로 처리하여 앞자리 0 보존 - 한의사랑/한의정보 형식 모두 보험코드 9자리 패딩 적용 - 예: 60600420 → 060600420 자동 변환 이제 Excel 파일의 보험코드가 숫자로 읽혀도 정상 처리됨 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
299 lines
10 KiB
Python
299 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Excel 파일 처리 모듈
|
|
한의사랑, 한의정보 형식 자동 감지 및 처리
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from datetime import datetime
|
|
import re
|
|
|
|
class ExcelProcessor:
|
|
"""Excel 파일 형식별 처리 클래스"""
|
|
|
|
# 한의사랑 형식 컬럼 매핑
|
|
HANISARANG_MAPPING = {
|
|
'품목명': 'herb_name',
|
|
'제품코드': 'insurance_code',
|
|
'일그램당단가': 'unit_price',
|
|
'원산지': 'origin_country',
|
|
'적용일': 'receipt_date',
|
|
'총구입량': 'quantity',
|
|
'총구입단가': 'total_amount'
|
|
}
|
|
|
|
# 한의정보 형식 컬럼 매핑
|
|
HANINFO_MAPPING = {
|
|
'제품코드': 'insurance_code',
|
|
'업체명': 'supplier_name',
|
|
'약재명': 'herb_name',
|
|
'구입일자': 'receipt_date',
|
|
'구입량': 'quantity',
|
|
'구입액': 'total_amount',
|
|
'원산지': 'origin_country',
|
|
'비고': 'notes'
|
|
}
|
|
|
|
def __init__(self):
|
|
self.format_type = None
|
|
self.df_original = None
|
|
self.df_processed = None
|
|
|
|
def detect_format(self, df):
|
|
"""Excel 형식 자동 감지"""
|
|
columns = df.columns.tolist()
|
|
|
|
# 한의사랑 형식 체크
|
|
hanisarang_cols = ['품목명', '제품코드', '일그램당단가', '총구입량', '총구입단가']
|
|
if all(col in columns for col in hanisarang_cols):
|
|
return 'hanisarang'
|
|
|
|
# 한의정보 형식 체크
|
|
haninfo_cols = ['제품코드', '업체명', '약재명', '구입일자', '구입량', '구입액']
|
|
if all(col in columns for col in haninfo_cols):
|
|
return 'haninfo'
|
|
|
|
# 기본 형식 (제품코드가 있는 경우 한의정보로 간주)
|
|
if '제품코드' in columns and '약재명' in columns:
|
|
return 'haninfo'
|
|
|
|
return 'unknown'
|
|
|
|
def read_excel(self, file_path):
|
|
"""Excel 파일 읽기"""
|
|
try:
|
|
# 제품코드를 문자열로 읽기 위한 dtype 설정
|
|
self.df_original = pd.read_excel(file_path, dtype={'제품코드': str})
|
|
self.format_type = self.detect_format(self.df_original)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Excel 파일 읽기 실패: {str(e)}")
|
|
return False
|
|
|
|
def process_hanisarang(self):
|
|
"""한의사랑 형식 처리"""
|
|
df = self.df_original.copy()
|
|
|
|
# 컬럼 매핑
|
|
df_mapped = pd.DataFrame()
|
|
|
|
for old_col, new_col in self.HANISARANG_MAPPING.items():
|
|
if old_col in df.columns:
|
|
df_mapped[new_col] = df[old_col]
|
|
|
|
# 보험코드 9자리 패딩 처리
|
|
if 'insurance_code' in df_mapped.columns:
|
|
df_mapped['insurance_code'] = df_mapped['insurance_code'].apply(
|
|
lambda x: str(x).zfill(9) if pd.notna(x) and str(x).isdigit() else str(x) if pd.notna(x) else None
|
|
)
|
|
|
|
# 업체명 추가 (기본값)
|
|
df_mapped['supplier_name'] = '한의사랑'
|
|
|
|
# 날짜 처리
|
|
if 'receipt_date' in df_mapped.columns:
|
|
df_mapped['receipt_date'] = pd.to_datetime(
|
|
df_mapped['receipt_date'],
|
|
format='%Y-%m-%d',
|
|
errors='coerce'
|
|
).dt.strftime('%Y%m%d')
|
|
|
|
# 단가 계산 (이미 있지만 검증)
|
|
if 'unit_price' not in df_mapped.columns or df_mapped['unit_price'].isnull().all():
|
|
if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns:
|
|
df_mapped['unit_price'] = df_mapped['total_amount'] / df_mapped['quantity']
|
|
|
|
self.df_processed = df_mapped
|
|
return df_mapped
|
|
|
|
def process_haninfo(self):
|
|
"""한의정보 형식 처리"""
|
|
df = self.df_original.copy()
|
|
|
|
# 컬럼 매핑
|
|
df_mapped = pd.DataFrame()
|
|
|
|
for old_col, new_col in self.HANINFO_MAPPING.items():
|
|
if old_col in df.columns:
|
|
df_mapped[new_col] = df[old_col]
|
|
|
|
# 보험코드 9자리 패딩 처리
|
|
if 'insurance_code' in df_mapped.columns:
|
|
df_mapped['insurance_code'] = df_mapped['insurance_code'].apply(
|
|
lambda x: str(x).zfill(9) if pd.notna(x) and str(x).isdigit() else str(x) if pd.notna(x) else None
|
|
)
|
|
|
|
# 날짜 처리 (YYYYMMDD 형식)
|
|
if 'receipt_date' in df_mapped.columns:
|
|
df_mapped['receipt_date'] = df_mapped['receipt_date'].astype(str)
|
|
|
|
# 단가 계산
|
|
if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns:
|
|
df_mapped['unit_price'] = df_mapped['total_amount'] / df_mapped['quantity']
|
|
df_mapped['unit_price'] = df_mapped['unit_price'].round(2)
|
|
|
|
self.df_processed = df_mapped
|
|
return df_mapped
|
|
|
|
def process(self):
|
|
"""형식에 따라 자동 처리"""
|
|
if self.format_type == 'hanisarang':
|
|
return self.process_hanisarang()
|
|
elif self.format_type == 'haninfo':
|
|
return self.process_haninfo()
|
|
else:
|
|
raise ValueError(f"지원하지 않는 형식: {self.format_type}")
|
|
|
|
def validate_data(self):
|
|
"""처리된 데이터 검증"""
|
|
if self.df_processed is None:
|
|
return False, "처리된 데이터가 없습니다"
|
|
|
|
df = self.df_processed
|
|
|
|
# 필수 컬럼 확인
|
|
required_columns = ['herb_name', 'quantity', 'total_amount']
|
|
missing_cols = [col for col in required_columns if col not in df.columns]
|
|
|
|
if missing_cols:
|
|
return False, f"필수 컬럼 누락: {', '.join(missing_cols)}"
|
|
|
|
# 데이터 타입 검증
|
|
numeric_cols = ['quantity', 'total_amount', 'unit_price']
|
|
for col in numeric_cols:
|
|
if col in df.columns:
|
|
try:
|
|
df[col] = pd.to_numeric(df[col], errors='coerce')
|
|
except:
|
|
return False, f"{col} 컬럼이 숫자 형식이 아닙니다"
|
|
|
|
# NULL 값 확인
|
|
null_check = df[required_columns].isnull().sum()
|
|
if null_check.sum() > 0:
|
|
null_cols = null_check[null_check > 0].index.tolist()
|
|
return False, f"NULL 값 포함 컬럼: {', '.join(null_cols)}"
|
|
|
|
# 음수 값 확인
|
|
for col in ['quantity', 'total_amount']:
|
|
if col in df.columns:
|
|
if (df[col] < 0).any():
|
|
return False, f"{col} 컬럼에 음수 값이 있습니다"
|
|
|
|
return True, "검증 통과"
|
|
|
|
def get_summary(self):
|
|
"""처리 결과 요약"""
|
|
if self.df_processed is None:
|
|
return None
|
|
|
|
df = self.df_processed
|
|
|
|
summary = {
|
|
'format_type': self.format_type,
|
|
'total_rows': len(df),
|
|
'total_items': df['herb_name'].nunique() if 'herb_name' in df.columns else 0,
|
|
'total_quantity': df['quantity'].sum() if 'quantity' in df.columns else 0,
|
|
'total_amount': df['total_amount'].sum() if 'total_amount' in df.columns else 0,
|
|
'suppliers': df['supplier_name'].unique().tolist() if 'supplier_name' in df.columns else [],
|
|
'date_range': None
|
|
}
|
|
|
|
# 날짜 범위
|
|
if 'receipt_date' in df.columns:
|
|
dates = pd.to_datetime(df['receipt_date'], format='%Y%m%d', errors='coerce')
|
|
dates = dates.dropna()
|
|
if not dates.empty:
|
|
summary['date_range'] = {
|
|
'start': dates.min().strftime('%Y-%m-%d'),
|
|
'end': dates.max().strftime('%Y-%m-%d')
|
|
}
|
|
|
|
return summary
|
|
|
|
def export_to_standard(self):
|
|
"""표준 형식으로 변환"""
|
|
if self.df_processed is None:
|
|
return None
|
|
|
|
# 표준 컬럼 순서
|
|
standard_columns = [
|
|
'insurance_code', 'supplier_name', 'herb_name',
|
|
'receipt_date', 'quantity', 'total_amount',
|
|
'unit_price', 'origin_country', 'notes'
|
|
]
|
|
|
|
# 있는 컬럼만 선택
|
|
available_cols = [col for col in standard_columns if col in self.df_processed.columns]
|
|
df_standard = self.df_processed[available_cols].copy()
|
|
|
|
# 누락된 컬럼 추가 (기본값)
|
|
for col in standard_columns:
|
|
if col not in df_standard.columns:
|
|
if col == 'notes':
|
|
df_standard[col] = ''
|
|
elif col == 'supplier_name':
|
|
df_standard[col] = '미지정'
|
|
else:
|
|
df_standard[col] = None
|
|
|
|
return df_standard[standard_columns]
|
|
|
|
|
|
# 테스트 함수
|
|
def test_processor():
|
|
"""프로세서 테스트"""
|
|
processor = ExcelProcessor()
|
|
|
|
# 한의사랑 테스트
|
|
print("="*60)
|
|
print("한의사랑 형식 테스트")
|
|
print("="*60)
|
|
|
|
if processor.read_excel('/root/kdrug/sample/한의사랑.xlsx'):
|
|
print(f"형식 감지: {processor.format_type}")
|
|
df = processor.process()
|
|
print(f"처리된 행 수: {len(df)}")
|
|
|
|
valid, msg = processor.validate_data()
|
|
print(f"검증 결과: {msg}")
|
|
|
|
summary = processor.get_summary()
|
|
print(f"요약:")
|
|
print(f" - 총 약재: {summary['total_items']}종")
|
|
print(f" - 총 수량: {summary['total_quantity']:,.0f}g")
|
|
print(f" - 총 금액: {summary['total_amount']:,.0f}원")
|
|
|
|
# 샘플 출력
|
|
print("\n처리된 데이터 샘플:")
|
|
print(df.head(3).to_string())
|
|
|
|
# 한의정보 테스트
|
|
print("\n" + "="*60)
|
|
print("한의정보 형식 테스트")
|
|
print("="*60)
|
|
|
|
processor2 = ExcelProcessor()
|
|
if processor2.read_excel('/root/kdrug/sample/한의정보.xlsx'):
|
|
print(f"형식 감지: {processor2.format_type}")
|
|
df = processor2.process()
|
|
print(f"처리된 행 수: {len(df)}")
|
|
|
|
valid, msg = processor2.validate_data()
|
|
print(f"검증 결과: {msg}")
|
|
|
|
summary = processor2.get_summary()
|
|
print(f"요약:")
|
|
print(f" - 총 약재: {summary['total_items']}종")
|
|
print(f" - 총 수량: {summary['total_quantity']:,.0f}g")
|
|
print(f" - 총 금액: {summary['total_amount']:,.0f}원")
|
|
print(f" - 공급업체: {', '.join(summary['suppliers'])}")
|
|
|
|
# 샘플 출력
|
|
print("\n처리된 데이터 샘플:")
|
|
print(df.head(3).to_string())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
test_processor() |