# -*- coding: utf-8 -*-
"""
Product image crawler for yakkok.com.

- Searches yakkok.com by product name and extracts an image URL.
- Downloads the image, converts it to base64 and stores it in SQLite.
"""

import os
import sys
import sqlite3
import base64
import logging
import hashlib
import re
from contextlib import closing
from datetime import datetime
from urllib.parse import quote

import requests
from PIL import Image
from io import BytesIO

# Playwright synchronous API
from playwright.sync_api import sync_playwright

# Logging setup
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

# SQLite database path
DB_PATH = os.path.join(os.path.dirname(__file__), '..', 'db', 'product_images.db')

# yakkok.com settings
YAKKOK_BASE_URL = "https://yakkok.com"
YAKKOK_SEARCH_URL = "https://yakkok.com/search?q={query}"

# Headers sent when downloading an image file.
_IMAGE_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}

# `alt` texts of images that search_yakkok() never treats as product images.
_NAVIGATION_ALT_TEXTS = ('검색', '홈', '마이', '재고콕', '약콕인증', '뒤로가기')

# Patterns used by clean_product_name(), compiled once at import time.
_PARENTHESIZED_RE = re.compile(r'\([^)]*\)')
_QUANTITY_RE = re.compile(
    r'\d+(?:\.\d+)?\s*(?:ml|mg|g|kg|정|캡슐|T|t|개|EA|ea)',
    flags=re.IGNORECASE,
)
_SEPARATOR_RE = re.compile(r'[_\-/\\]')
_WHITESPACE_RE = re.compile(r'\s+')


def init_db():
    """Open the SQLite database and apply the schema script if it exists.

    The schema is read from ``../db/product_images_schema.sql`` relative to
    this file. When the script is missing, a warning is logged and the
    database is left as it is.
    """
    schema_path = os.path.join(os.path.dirname(__file__), '..', 'db', 'product_images_schema.sql')

    # closing() guarantees the connection is released even if the script fails.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        if os.path.exists(schema_path):
            with open(schema_path, 'r', encoding='utf-8') as f:
                conn.cursor().executescript(f.read())
        else:
            logger.warning(f"[DB] 스키마 파일 없음: {schema_path}")
        conn.commit()

    logger.info(f"[DB] 초기화 완료: {DB_PATH}")


def get_existing_barcodes():
    """Return the set of barcodes whose stored status is 'success' or 'manual'."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT barcode FROM product_images WHERE status IN ('success', 'manual')")
        return {row[0] for row in cursor.fetchall()}


def save_product_image(barcode, drug_code, product_name, search_name,
                       image_base64, image_url, thumbnail_base64=None,
                       status='success', error_message=None):
    """Insert or replace one row of the ``product_images`` table.

    Args:
        barcode: Product barcode.
        drug_code: Drug code of the product.
        product_name: Original product name.
        search_name: Name that was actually used for the search.
        image_base64: Base64-encoded JPEG, or None.
        image_url: Source URL of the image, or None.
        thumbnail_base64: Base64-encoded thumbnail JPEG, or None.
        status: Status string stored with the row.
        error_message: Optional error description stored with the row.
    """
    row = (
        barcode, drug_code, product_name, search_name,
        image_base64, image_url, thumbnail_base64,
        status, error_message, datetime.now().isoformat(),
    )

    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.cursor().execute("""
            INSERT OR REPLACE INTO product_images
            (barcode, drug_code, product_name, search_name, image_base64, image_url,
             thumbnail_base64, status, error_message, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, row)
        conn.commit()

    logger.info(f"[DB] 저장 완료: {product_name} ({barcode}) - {status}")


def _fetch_image_bytes(url):
    """Download ``url`` and return the raw response body; raises on failure."""
    response = requests.get(url, headers=_IMAGE_REQUEST_HEADERS, timeout=10)
    response.raise_for_status()
    return response.content


def _encode_image_as_base64(image_bytes, max_size):
    """Decode image bytes, fit them into ``max_size`` px and return base64 JPEG text."""
    img = Image.open(BytesIO(image_bytes))

    # JPEG has no alpha channel: composite anything transparent onto white so
    # transparent areas do not come out black.
    has_alpha = img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info)
    if has_alpha:
        rgba = img.convert('RGBA')
        background = Image.new('RGB', rgba.size, (255, 255, 255))
        background.paste(rgba, mask=rgba.split()[3])
        img = background
    elif img.mode != 'RGB':
        img = img.convert('RGB')

    # Downscale only, keeping the aspect ratio.
    longest_side = max(img.size)
    if longest_side > max_size:
        ratio = max_size / longest_side
        # Clamp to 1 px so extreme aspect ratios never yield a zero-sized side.
        new_size = tuple(max(1, int(dim * ratio)) for dim in img.size)
        img = img.resize(new_size, Image.LANCZOS)

    buffer = BytesIO()
    img.save(buffer, format='JPEG', quality=85)
    return base64.b64encode(buffer.getvalue()).decode('utf-8')


def download_image_as_base64(url, max_size=500):
    """Download an image and return it as a base64-encoded JPEG string.

    Args:
        url: Image URL.
        max_size: Maximum length in pixels of the longer side; larger images
            are scaled down with the aspect ratio preserved.

    Returns:
        The base64 string, or None when the download or conversion fails
        (the error is logged).
    """
    try:
        return _encode_image_as_base64(_fetch_image_bytes(url), max_size)
    except Exception as e:
        logger.error(f"[ERROR] 이미지 다운로드 실패: {url} - {e}")
        return None


def _download_image_and_thumbnail(url, image_size=500, thumbnail_size=100):
    """Download ``url`` once and return ``(image_base64, thumbnail_base64)``.

    Either element is None when producing it failed; a thumbnail failure does
    not discard the full-size image.
    """
    try:
        image_bytes = _fetch_image_bytes(url)
        image_base64 = _encode_image_as_base64(image_bytes, image_size)
    except Exception as e:
        logger.error(f"[ERROR] 이미지 다운로드 실패: {url} - {e}")
        return None, None

    try:
        thumbnail_base64 = _encode_image_as_base64(image_bytes, thumbnail_size)
    except Exception as e:
        logger.error(f"[ERROR] 이미지 다운로드 실패: {url} - {e}")
        thumbnail_base64 = None

    return image_base64, thumbnail_base64


def clean_product_name(name):
    """Normalize a product name into a search query.

    Removes parenthesized text, removes number+unit quantities (e.g. 100ml,
    2.5mg), replaces ``_ - / \\`` with spaces and collapses whitespace.
    Returns an empty string for empty or None input.
    """
    if not name:
        return ''

    name = _PARENTHESIZED_RE.sub('', name)
    name = _QUANTITY_RE.sub('', name)
    name = _SEPARATOR_RE.sub(' ', name)
    return _WHITESPACE_RE.sub(' ', name).strip()


def search_yakkok(page, product_name):
    """Search yakkok.com for a product and return the first candidate image.

    Args:
        page: Playwright page used for navigation.
        product_name: Original product name.

    Returns:
        A ``(image_url, search_name)`` tuple. ``image_url`` is None when no
        suitable image is found or the search fails (the error is logged).
    """
    # Bound before the try block so the except path can always return it.
    search_name = product_name
    try:
        # Fall back to the raw name when cleaning leaves nothing.
        search_name = clean_product_name(product_name) or product_name

        search_url = YAKKOK_SEARCH_URL.format(query=quote(search_name))
        page.goto(search_url, wait_until='networkidle', timeout=15000)

        # Short extra wait after navigation.
        page.wait_for_timeout(1000)

        for img in page.query_selector_all('img[alt]'):
            src = img.get_attribute('src')
            alt = img.get_attribute('alt') or ''

            # Skip logos and icons.
            if not src:
                continue
            lowered_src = src.lower()
            if 'logo' in lowered_src or 'icon' in lowered_src:
                continue

            # Skip images identified by a known navigation alt text.
            if alt in _NAVIGATION_ALT_TEXTS:
                continue

            # Accept only absolute or protocol-relative URLs.
            if src.startswith('//'):
                return 'https:' + src, search_name
            if src.startswith('http'):
                return src, search_name

        return None, search_name

    except Exception as e:
        logger.error(f"[ERROR] 검색 실패: {product_name} - {e}")
        return None, search_name


def _crawl_single_product(page, barcode, drug_code, product_name):
    """Search, download and store one product; return True on success."""
    try:
        logger.info(f"[CRAWL] {product_name} ({barcode})")

        image_url, search_name = search_yakkok(page, product_name)

        if not image_url:
            save_product_image(
                barcode=barcode,
                drug_code=drug_code,
                product_name=product_name,
                search_name=search_name,
                image_base64=None,
                image_url=None,
                status='no_result',
                error_message='검색 결과 없음'
            )
            return False

        # One download serves both the full-size image and the thumbnail.
        image_base64, thumbnail_base64 = _download_image_and_thumbnail(image_url)

        if not image_base64:
            save_product_image(
                barcode=barcode,
                drug_code=drug_code,
                product_name=product_name,
                search_name=search_name,
                image_base64=None,
                image_url=image_url,
                status='failed',
                error_message='이미지 다운로드 실패'
            )
            return False

        save_product_image(
            barcode=barcode,
            drug_code=drug_code,
            product_name=product_name,
            search_name=search_name,
            image_base64=image_base64,
            image_url=image_url,
            thumbnail_base64=thumbnail_base64,
            status='success'
        )
        return True

    except Exception as e:
        logger.error(f"[ERROR] {product_name}: {e}")
        try:
            save_product_image(
                barcode=barcode,
                drug_code=drug_code,
                product_name=product_name,
                search_name=product_name,
                image_base64=None,
                image_url=None,
                status='failed',
                error_message=str(e)
            )
        except Exception as save_error:
            # Recording the failure must not abort the remaining products.
            logger.error(f"[ERROR] {product_name}: {save_error}")
        return False


def crawl_products(products, headless=True):
    """Crawl images for a list of products.

    Args:
        products: Sequence of ``(barcode, drug_code, product_name)`` tuples.
        headless: Whether the browser is launched headless.

    Returns:
        Dict with the counters ``total``, ``success``, ``failed`` and
        ``skipped``.
    """
    init_db()
    existing = get_existing_barcodes()

    # Only crawl products that are not stored yet.
    to_crawl = [(b, d, n) for b, d, n in products if b not in existing]
    skipped = len(products) - len(to_crawl)

    if not to_crawl:
        logger.info("[INFO] 크롤링할 새 제품이 없습니다.")
        return {'total': 0, 'success': 0, 'failed': 0, 'skipped': len(products)}

    logger.info(f"[INFO] 크롤링 시작: {len(to_crawl)}개 (스킵: {skipped}개)")

    results = {'total': len(to_crawl), 'success': 0, 'failed': 0, 'skipped': skipped}

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context = browser.new_context(
                viewport={'width': 390, 'height': 844},  # mobile viewport
                user_agent='Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15'
            )
            page = context.new_page()

            for barcode, drug_code, product_name in to_crawl:
                if _crawl_single_product(page, barcode, drug_code, product_name):
                    results['success'] += 1
                else:
                    results['failed'] += 1

                # Delay between requests. Kept outside the per-product error
                # handling so a failing delay cannot overwrite a saved result.
                try:
                    page.wait_for_timeout(500)
                except Exception as e:
                    logger.warning(f"[WARN] 요청 간 대기 실패: {e}")
        finally:
            browser.close()

    logger.info(f"[DONE] 완료 - 성공: {results['success']}, 실패: {results['failed']}, 스킵: {results['skipped']}")
    return results


def get_sales_products(date_str=None):
    """Return the products sold on a given date (queried from MSSQL).

    Args:
        date_str: Date string (YYYYMMDD or YYYY-MM-DD); None means today.

    Returns:
        List of ``(barcode, drug_code, product_name)`` tuples for rows that
        have a barcode. An empty list is returned when the query fails (the
        error is logged).
    """
    try:
        # Make the parent folder importable so the `db` package can be loaded;
        # insert it only once instead of on every call.
        parent_dir = os.path.join(os.path.dirname(__file__), '..')
        if parent_dir not in sys.path:
            sys.path.insert(0, parent_dir)
        from db.dbsetup import db_manager
        from sqlalchemy import text

        # NOTE(review): the session is never closed here; whether it should be
        # depends on how db_manager manages sessions - confirm.
        session = db_manager.get_session('PM_PRES')

        if date_str:
            # YYYY-MM-DD -> YYYYMMDD
            target_date = date_str.replace('-', '')
        else:
            target_date = datetime.now().strftime('%Y%m%d')

        # Distinct items sold on the target date.
        query = text("""
            SELECT DISTINCT
                COALESCE(NULLIF(G.Barcode, ''),
                    (SELECT TOP 1 CD_CD_BARCODE FROM PM_DRUG.dbo.CD_ITEM_UNIT_MEMBER WHERE DrugCode = S.DrugCode)
                ) AS barcode,
                S.DrugCode AS drug_code,
                ISNULL(G.GoodsName, '알수없음') AS product_name
            FROM SALE_SUB S
            LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
            WHERE S.SL_NO_order LIKE :date_pattern
              AND S.DrugCode IS NOT NULL
        """)

        rows = session.execute(query, {'date_pattern': f'{target_date}%'}).fetchall()

        # Keep only rows that have a barcode.
        products = [(row[0], row[1], row[2]) for row in rows if row[0]]

        logger.info(f"[MSSQL] {target_date} 판매 품목: {len(products)}개")
        return products

    except Exception as e:
        logger.error(f"[ERROR] MSSQL 조회 실패: {e}")
        return []


def get_today_sales_products():
    """Return the products sold today (kept for backward compatibility)."""
    return get_sales_products(None)


def crawl_sales_by_date(date_str=None, headless=True):
    """Crawl images for the products sold on a given date.

    Args:
        date_str: Date string (YYYYMMDD or YYYY-MM-DD); None means today.
        headless: Whether the browser is launched headless.

    Returns:
        The result dict of crawl_products(), or a zeroed dict with a
        ``message`` entry when no products were returned for that date.
    """
    products = get_sales_products(date_str)

    if not products:
        return {'total': 0, 'success': 0, 'failed': 0, 'skipped': 0, 'message': '해당일 판매 내역 없음'}

    return crawl_products(products, headless=headless)


def crawl_today_sales(headless=True):
    """Crawl images for the products sold today (kept for backward compatibility)."""
    return crawl_sales_by_date(None, headless=headless)


def main():
    """Command-line entry point."""
    import argparse

    parser = argparse.ArgumentParser(description='yakkok.com 제품 이미지 크롤러')
    parser.add_argument('--today', action='store_true', help='오늘 판매 제품 크롤링')
    parser.add_argument('--product', type=str, help='특정 제품명으로 테스트')
    parser.add_argument('--visible', action='store_true', help='브라우저 표시')

    args = parser.parse_args()

    if args.today:
        result = crawl_today_sales(headless=not args.visible)
        print(f"\n결과: {result}")
    elif args.product:
        # Single-product crawl for testing.
        test_products = [('TEST001', 'TEST', args.product)]
        result = crawl_products(test_products, headless=not args.visible)
        print(f"\n결과: {result}")
    else:
        print("사용법:")
        print("  python yakkok_crawler.py --today           # 오늘 판매 제품 크롤링")
        print("  python yakkok_crawler.py --product 타이레놀  # 특정 제품 테스트")
        print("  python yakkok_crawler.py --visible         # 브라우저 표시")


# CLI execution
if __name__ == '__main__':
    main()