#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Excel 파일 처리 모듈 한의사랑, 한의정보 형식 자동 감지 및 처리 """ import pandas as pd import numpy as np from datetime import datetime import re class ExcelProcessor: """Excel 파일 형식별 처리 클래스""" # 한의사랑 형식 컬럼 매핑 HANISARANG_MAPPING = { '품목명': 'herb_name', '제품코드': 'insurance_code', '일그램당단가': 'unit_price', '원산지': 'origin_country', '적용일': 'receipt_date', '총구입량': 'quantity', '총구입단가': 'total_amount' } # 한의정보 형식 컬럼 매핑 HANINFO_MAPPING = { '제품코드': 'insurance_code', '업체명': 'supplier_name', '약재명': 'herb_name', '구입일자': 'receipt_date', '구입량': 'quantity', '구입액': 'total_amount', '원산지': 'origin_country', '비고': 'notes' } def __init__(self): self.format_type = None self.df_original = None self.df_processed = None def detect_format(self, df): """Excel 형식 자동 감지""" columns = df.columns.tolist() # 한의사랑 형식 체크 hanisarang_cols = ['품목명', '제품코드', '일그램당단가', '총구입량', '총구입단가'] if all(col in columns for col in hanisarang_cols): return 'hanisarang' # 한의정보 형식 체크 haninfo_cols = ['제품코드', '업체명', '약재명', '구입일자', '구입량', '구입액'] if all(col in columns for col in haninfo_cols): return 'haninfo' # 기본 형식 (제품코드가 있는 경우 한의정보로 간주) if '제품코드' in columns and '약재명' in columns: return 'haninfo' return 'unknown' def read_excel(self, file_path): """Excel 파일 읽기""" try: self.df_original = pd.read_excel(file_path) self.format_type = self.detect_format(self.df_original) return True except Exception as e: print(f"Excel 파일 읽기 실패: {str(e)}") return False def process_hanisarang(self): """한의사랑 형식 처리""" df = self.df_original.copy() # 컬럼 매핑 df_mapped = pd.DataFrame() for old_col, new_col in self.HANISARANG_MAPPING.items(): if old_col in df.columns: df_mapped[new_col] = df[old_col] # 업체명 추가 (기본값) df_mapped['supplier_name'] = '한의사랑' # 날짜 처리 if 'receipt_date' in df_mapped.columns: df_mapped['receipt_date'] = pd.to_datetime( df_mapped['receipt_date'], format='%Y-%m-%d', errors='coerce' ).dt.strftime('%Y%m%d') # 단가 계산 (이미 있지만 검증) if 'unit_price' not in df_mapped.columns or df_mapped['unit_price'].isnull().all(): if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns: df_mapped['unit_price'] = df_mapped['total_amount'] / df_mapped['quantity'] self.df_processed = df_mapped return df_mapped def process_haninfo(self): """한의정보 형식 처리""" df = self.df_original.copy() # 컬럼 매핑 df_mapped = pd.DataFrame() for old_col, new_col in self.HANINFO_MAPPING.items(): if old_col in df.columns: df_mapped[new_col] = df[old_col] # 날짜 처리 (YYYYMMDD 형식) if 'receipt_date' in df_mapped.columns: df_mapped['receipt_date'] = df_mapped['receipt_date'].astype(str) # 단가 계산 if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns: df_mapped['unit_price'] = df_mapped['total_amount'] / df_mapped['quantity'] df_mapped['unit_price'] = df_mapped['unit_price'].round(2) self.df_processed = df_mapped return df_mapped def process(self): """형식에 따라 자동 처리""" if self.format_type == 'hanisarang': return self.process_hanisarang() elif self.format_type == 'haninfo': return self.process_haninfo() else: raise ValueError(f"지원하지 않는 형식: {self.format_type}") def validate_data(self): """처리된 데이터 검증""" if self.df_processed is None: return False, "처리된 데이터가 없습니다" df = self.df_processed # 필수 컬럼 확인 required_columns = ['herb_name', 'quantity', 'total_amount'] missing_cols = [col for col in required_columns if col not in df.columns] if missing_cols: return False, f"필수 컬럼 누락: {', '.join(missing_cols)}" # 데이터 타입 검증 numeric_cols = ['quantity', 'total_amount', 'unit_price'] for col in numeric_cols: if col in df.columns: try: df[col] = pd.to_numeric(df[col], errors='coerce') except: return False, f"{col} 컬럼이 숫자 형식이 아닙니다" # NULL 값 확인 null_check = df[required_columns].isnull().sum() if null_check.sum() > 0: null_cols = null_check[null_check > 0].index.tolist() return False, f"NULL 값 포함 컬럼: {', '.join(null_cols)}" # 음수 값 확인 for col in ['quantity', 'total_amount']: if col in df.columns: if (df[col] < 0).any(): return False, f"{col} 컬럼에 음수 값이 있습니다" return True, "검증 통과" def get_summary(self): """처리 결과 요약""" if self.df_processed is None: return None df = self.df_processed summary = { 'format_type': self.format_type, 'total_rows': len(df), 'total_items': df['herb_name'].nunique() if 'herb_name' in df.columns else 0, 'total_quantity': df['quantity'].sum() if 'quantity' in df.columns else 0, 'total_amount': df['total_amount'].sum() if 'total_amount' in df.columns else 0, 'suppliers': df['supplier_name'].unique().tolist() if 'supplier_name' in df.columns else [], 'date_range': None } # 날짜 범위 if 'receipt_date' in df.columns: dates = pd.to_datetime(df['receipt_date'], format='%Y%m%d', errors='coerce') dates = dates.dropna() if not dates.empty: summary['date_range'] = { 'start': dates.min().strftime('%Y-%m-%d'), 'end': dates.max().strftime('%Y-%m-%d') } return summary def export_to_standard(self): """표준 형식으로 변환""" if self.df_processed is None: return None # 표준 컬럼 순서 standard_columns = [ 'insurance_code', 'supplier_name', 'herb_name', 'receipt_date', 'quantity', 'total_amount', 'unit_price', 'origin_country', 'notes' ] # 있는 컬럼만 선택 available_cols = [col for col in standard_columns if col in self.df_processed.columns] df_standard = self.df_processed[available_cols].copy() # 누락된 컬럼 추가 (기본값) for col in standard_columns: if col not in df_standard.columns: if col == 'notes': df_standard[col] = '' elif col == 'supplier_name': df_standard[col] = '미지정' else: df_standard[col] = None return df_standard[standard_columns] # 테스트 함수 def test_processor(): """프로세서 테스트""" processor = ExcelProcessor() # 한의사랑 테스트 print("="*60) print("한의사랑 형식 테스트") print("="*60) if processor.read_excel('/root/kdrug/sample/한의사랑.xlsx'): print(f"형식 감지: {processor.format_type}") df = processor.process() print(f"처리된 행 수: {len(df)}") valid, msg = processor.validate_data() print(f"검증 결과: {msg}") summary = processor.get_summary() print(f"요약:") print(f" - 총 약재: {summary['total_items']}종") print(f" - 총 수량: {summary['total_quantity']:,.0f}g") print(f" - 총 금액: {summary['total_amount']:,.0f}원") # 샘플 출력 print("\n처리된 데이터 샘플:") print(df.head(3).to_string()) # 한의정보 테스트 print("\n" + "="*60) print("한의정보 형식 테스트") print("="*60) processor2 = ExcelProcessor() if processor2.read_excel('/root/kdrug/sample/한의정보.xlsx'): print(f"형식 감지: {processor2.format_type}") df = processor2.process() print(f"처리된 행 수: {len(df)}") valid, msg = processor2.validate_data() print(f"검증 결과: {msg}") summary = processor2.get_summary() print(f"요약:") print(f" - 총 약재: {summary['total_items']}종") print(f" - 총 수량: {summary['total_quantity']:,.0f}g") print(f" - 총 금액: {summary['total_amount']:,.0f}원") print(f" - 공급업체: {', '.join(summary['suppliers'])}") # 샘플 출력 print("\n처리된 데이터 샘플:") print(df.head(3).to_string()) if __name__ == "__main__": test_processor()