#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Excel 파일 처리 모듈 한의사랑, 한의정보 형식 자동 감지 및 처리 """ import pandas as pd import numpy as np from datetime import datetime import re class ExcelProcessor: """Excel 파일 형식별 처리 클래스""" # 한의사랑 형식 컬럼 매핑 HANISARANG_MAPPING = { '품목명': 'herb_name', '제품코드': 'insurance_code', '일그램당단가': 'unit_price', '원산지': 'origin_country', '적용일': 'receipt_date', '총구입량': 'quantity', '총구입단가': 'total_amount' } # 한의정보 형식 컬럼 매핑 HANINFO_MAPPING = { '제품코드': 'insurance_code', '업체명': 'supplier_name', '약재명': 'herb_name', '구입일자': 'receipt_date', '구입량': 'quantity', '구입액': 'total_amount', '원산지': 'origin_country', '비고': 'notes' } # 한퓨어 형식 컬럼 매핑 HANPURE_MAPPING = { '상품명': 'herb_name', '제조사코드': 'insurance_code', '주성분코드': 'ingredient_code', '제조사명': 'supplier_name', '원산지': 'origin_country', '소계': 'total_amount', '옵션항목': 'option_detail', '주문번호': 'order_number', } def __init__(self): self.format_type = None self.df_original = None self.df_processed = None def detect_format(self, df): """Excel 형식 자동 감지""" columns = df.columns.tolist() # 한퓨어 형식 체크 (주문번호, 주성분코드, 제조사코드, 옵션항목이 특징) hanpure_cols = ['주문번호', '주성분코드', '제조사코드', '상품명', '옵션항목'] if all(col in columns for col in hanpure_cols): return 'hanpure' # 한의사랑 형식 체크 hanisarang_cols = ['품목명', '제품코드', '일그램당단가', '총구입량', '총구입단가'] if all(col in columns for col in hanisarang_cols): return 'hanisarang' # 한의정보 형식 체크 haninfo_cols = ['제품코드', '업체명', '약재명', '구입일자', '구입량', '구입액'] if all(col in columns for col in haninfo_cols): return 'haninfo' # 기본 형식 (제품코드가 있는 경우 한의정보로 간주) if '제품코드' in columns and '약재명' in columns: return 'haninfo' return 'unknown' def read_excel(self, file_path): """Excel 파일 읽기""" try: # 코드 컬럼을 문자열로 읽기 위한 dtype 설정 self.df_original = pd.read_excel(file_path, dtype={ '제품코드': str, '제조사코드': str, '주성분코드': str, '대표코드': str }) self.format_type = self.detect_format(self.df_original) return True except Exception as e: print(f"Excel 파일 읽기 실패: {str(e)}") return False def process_hanisarang(self): """한의사랑 형식 처리""" df = self.df_original.copy() # 컬럼 매핑 df_mapped = pd.DataFrame() for old_col, new_col in self.HANISARANG_MAPPING.items(): if old_col in df.columns: df_mapped[new_col] = df[old_col] # 보험코드 9자리 패딩 처리 if 'insurance_code' in df_mapped.columns: df_mapped['insurance_code'] = df_mapped['insurance_code'].apply( lambda x: str(x).zfill(9) if pd.notna(x) and str(x).isdigit() else str(x) if pd.notna(x) else None ) # 업체명 추가 (기본값) df_mapped['supplier_name'] = '한의사랑' # 날짜 처리 if 'receipt_date' in df_mapped.columns: df_mapped['receipt_date'] = pd.to_datetime( df_mapped['receipt_date'], format='%Y-%m-%d', errors='coerce' ).dt.strftime('%Y%m%d') # 단가 계산 (이미 있지만 검증) if 'unit_price' not in df_mapped.columns or df_mapped['unit_price'].isnull().all(): if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns: df_mapped['unit_price'] = df_mapped['total_amount'] / df_mapped['quantity'] self.df_processed = df_mapped return df_mapped def process_haninfo(self): """한의정보 형식 처리""" df = self.df_original.copy() # 컬럼 매핑 df_mapped = pd.DataFrame() for old_col, new_col in self.HANINFO_MAPPING.items(): if old_col in df.columns: df_mapped[new_col] = df[old_col] # 보험코드 9자리 패딩 처리 if 'insurance_code' in df_mapped.columns: df_mapped['insurance_code'] = df_mapped['insurance_code'].apply( lambda x: str(x).zfill(9) if pd.notna(x) and str(x).isdigit() else str(x) if pd.notna(x) else None ) # 날짜 처리 (YYYYMMDD 형식) if 'receipt_date' in df_mapped.columns: df_mapped['receipt_date'] = df_mapped['receipt_date'].astype(str) # 단가 계산 if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns: df_mapped['unit_price'] = df_mapped['total_amount'] / df_mapped['quantity'] df_mapped['unit_price'] = df_mapped['unit_price'].round(2) self.df_processed = df_mapped return df_mapped @staticmethod def parse_option_quantity_g(option_text): """옵션항목에서 총 중량(g) 파싱 예: '인삼 특A (4~5년근) 600g*5개' → 3000 '감초 1kg' → 1000 '복령 500g' → 500 '백출 300g*3개' → 900 """ if not option_text or pd.isna(option_text): return None text = str(option_text) # 패턴1: NNNg*N개 또는 NNNg×N개 m = re.search(r'(\d+(?:\.\d+)?)\s*g\s*[*×x]\s*(\d+)', text, re.IGNORECASE) if m: return float(m.group(1)) * int(m.group(2)) # 패턴2: N kg*N개 m = re.search(r'(\d+(?:\.\d+)?)\s*kg\s*[*×x]\s*(\d+)', text, re.IGNORECASE) if m: return float(m.group(1)) * 1000 * int(m.group(2)) # 패턴3: NNNg (단독) m = re.search(r'(\d+(?:\.\d+)?)\s*g(?!\w)', text, re.IGNORECASE) if m: return float(m.group(1)) # 패턴4: Nkg (단독) m = re.search(r'(\d+(?:\.\d+)?)\s*kg(?!\w)', text, re.IGNORECASE) if m: return float(m.group(1)) * 1000 return None def process_hanpure(self): """한퓨어 형식 처리""" df = self.df_original.copy() df_mapped = pd.DataFrame() for old_col, new_col in self.HANPURE_MAPPING.items(): if old_col in df.columns: df_mapped[new_col] = df[old_col] # 보험코드 9자리 패딩 처리 if 'insurance_code' in df_mapped.columns: df_mapped['insurance_code'] = df_mapped['insurance_code'].apply( lambda x: str(x).zfill(9) if pd.notna(x) and str(x).strip().isdigit() else str(x).strip() if pd.notna(x) else None ) # 주문번호에서 날짜 추출 (20260211-22511888 → 20260211) if 'order_number' in df_mapped.columns: df_mapped['receipt_date'] = df_mapped['order_number'].apply( lambda x: str(x).split('-')[0] if pd.notna(x) else None ) # 옵션항목에서 중량(g) 파싱 if 'option_detail' in df_mapped.columns: df_mapped['quantity'] = df_mapped['option_detail'].apply(self.parse_option_quantity_g) # 업체명 기본값 if 'supplier_name' not in df_mapped.columns or df_mapped['supplier_name'].isnull().all(): df_mapped['supplier_name'] = '한퓨어' # 단가 계산 (소계 / 중량g) if 'total_amount' in df_mapped.columns and 'quantity' in df_mapped.columns: df_mapped['unit_price'] = df_mapped.apply( lambda row: round(row['total_amount'] / row['quantity'], 2) if pd.notna(row.get('quantity')) and row.get('quantity', 0) > 0 else None, axis=1 ) # 비고에 옵션항목 원문 저장 df_mapped['notes'] = df_mapped.get('option_detail', '') # 임시 컬럼 제거 df_mapped.drop(columns=['order_number', 'option_detail'], errors='ignore', inplace=True) self.df_processed = df_mapped return df_mapped def process(self): """형식에 따라 자동 처리""" if self.format_type == 'hanisarang': return self.process_hanisarang() elif self.format_type == 'haninfo': return self.process_haninfo() elif self.format_type == 'hanpure': return self.process_hanpure() else: raise ValueError(f"지원하지 않는 형식: {self.format_type}") def validate_data(self): """처리된 데이터 검증""" if self.df_processed is None: return False, "처리된 데이터가 없습니다" df = self.df_processed # 필수 컬럼 확인 required_columns = ['herb_name', 'quantity', 'total_amount'] missing_cols = [col for col in required_columns if col not in df.columns] if missing_cols: return False, f"필수 컬럼 누락: {', '.join(missing_cols)}" # 데이터 타입 검증 numeric_cols = ['quantity', 'total_amount', 'unit_price'] for col in numeric_cols: if col in df.columns: try: df[col] = pd.to_numeric(df[col], errors='coerce') except: return False, f"{col} 컬럼이 숫자 형식이 아닙니다" # NULL 값 확인 null_check = df[required_columns].isnull().sum() if null_check.sum() > 0: null_cols = null_check[null_check > 0].index.tolist() return False, f"NULL 값 포함 컬럼: {', '.join(null_cols)}" # 음수 값 확인 for col in ['quantity', 'total_amount']: if col in df.columns: if (df[col] < 0).any(): return False, f"{col} 컬럼에 음수 값이 있습니다" return True, "검증 통과" def get_summary(self): """처리 결과 요약""" if self.df_processed is None: return None df = self.df_processed summary = { 'format_type': self.format_type, 'total_rows': len(df), 'total_items': df['herb_name'].nunique() if 'herb_name' in df.columns else 0, 'total_quantity': df['quantity'].sum() if 'quantity' in df.columns else 0, 'total_amount': df['total_amount'].sum() if 'total_amount' in df.columns else 0, 'suppliers': df['supplier_name'].unique().tolist() if 'supplier_name' in df.columns else [], 'date_range': None } # 날짜 범위 if 'receipt_date' in df.columns: dates = pd.to_datetime(df['receipt_date'], format='%Y%m%d', errors='coerce') dates = dates.dropna() if not dates.empty: summary['date_range'] = { 'start': dates.min().strftime('%Y-%m-%d'), 'end': dates.max().strftime('%Y-%m-%d') } return summary def export_to_standard(self): """표준 형식으로 변환""" if self.df_processed is None: return None # 표준 컬럼 순서 standard_columns = [ 'insurance_code', 'supplier_name', 'herb_name', 'receipt_date', 'quantity', 'total_amount', 'unit_price', 'origin_country', 'notes', 'ingredient_code' ] # 있는 컬럼만 선택 available_cols = [col for col in standard_columns if col in self.df_processed.columns] df_standard = self.df_processed[available_cols].copy() # 누락된 컬럼 추가 (기본값) for col in standard_columns: if col not in df_standard.columns: if col == 'notes': df_standard[col] = '' elif col == 'supplier_name': df_standard[col] = '미지정' else: df_standard[col] = None return df_standard[standard_columns] # 테스트 함수 def test_processor(): """프로세서 테스트""" processor = ExcelProcessor() # 한의사랑 테스트 print("="*60) print("한의사랑 형식 테스트") print("="*60) if processor.read_excel('/root/kdrug/sample/한의사랑.xlsx'): print(f"형식 감지: {processor.format_type}") df = processor.process() print(f"처리된 행 수: {len(df)}") valid, msg = processor.validate_data() print(f"검증 결과: {msg}") summary = processor.get_summary() print(f"요약:") print(f" - 총 약재: {summary['total_items']}종") print(f" - 총 수량: {summary['total_quantity']:,.0f}g") print(f" - 총 금액: {summary['total_amount']:,.0f}원") # 샘플 출력 print("\n처리된 데이터 샘플:") print(df.head(3).to_string()) # 한의정보 테스트 print("\n" + "="*60) print("한의정보 형식 테스트") print("="*60) processor2 = ExcelProcessor() if processor2.read_excel('/root/kdrug/sample/한의정보.xlsx'): print(f"형식 감지: {processor2.format_type}") df = processor2.process() print(f"처리된 행 수: {len(df)}") valid, msg = processor2.validate_data() print(f"검증 결과: {msg}") summary = processor2.get_summary() print(f"요약:") print(f" - 총 약재: {summary['total_items']}종") print(f" - 총 수량: {summary['total_quantity']:,.0f}g") print(f" - 총 금액: {summary['total_amount']:,.0f}원") print(f" - 공급업체: {', '.join(summary['suppliers'])}") # 샘플 출력 print("\n처리된 데이터 샘플:") print(df.head(3).to_string()) if __name__ == "__main__": test_processor()