""" Flask 웹 서버 - QR 마일리지 적립 간편 적립: 전화번호 + 이름만 입력 """ import sys import os # Windows 콘솔 UTF-8 강제 (한글 깨짐 방지) if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') os.environ.setdefault('PYTHONIOENCODING', 'utf-8') from flask import Flask, request, render_template, jsonify, redirect, url_for, session import hashlib import base64 import secrets from datetime import datetime, timezone, timedelta import logging from sqlalchemy import text from dotenv import load_dotenv import json import time # 환경 변수 로드 load_dotenv() # OpenAI import try: from openai import OpenAI, OpenAIError, RateLimitError, APITimeoutError OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False logging.warning("OpenAI 라이브러리가 설치되지 않았습니다. AI 분석 기능을 사용할 수 없습니다.") # Path setup sys.path.insert(0, os.path.dirname(__file__)) from db.dbsetup import DatabaseManager app = Flask(__name__) app.secret_key = 'pharmacy-qr-mileage-secret-key-2026' # 세션 설정 (PWA 자동적립 지원) app.config['SESSION_COOKIE_SECURE'] = not app.debug # HTTPS 전용 (로컬 개발 시 제외) app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # QR 스캔 시 쿠키 전송 허용 app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90) # 3개월 유지 # 데이터베이스 매니저 db_manager = DatabaseManager() # KST 타임존 (UTC+9) KST = timezone(timedelta(hours=9)) # 키오스크 현재 세션 (메모리 변수, 서버 재시작 시 초기화) kiosk_current_session = None def utc_to_kst_str(utc_time_str): """ UTC 시간 문자열을 KST 시간 문자열로 변환 Args: utc_time_str (str): UTC 시간 문자열 (ISO 8601 형식) Returns: str: KST 시간 문자열 (YYYY-MM-DD HH:MM:SS) """ if not utc_time_str: return None try: # ISO 8601 형식 파싱 ('2026-01-23T12:28:36' 또는 '2026-01-23 12:28:36') utc_time_str = utc_time_str.replace(' ', 'T') # 공백을 T로 변환 # datetime 객체로 변환 if 'T' in utc_time_str: utc_time = datetime.fromisoformat(utc_time_str) else: utc_time = datetime.fromisoformat(utc_time_str) # UTC 타임존 설정 (naive datetime인 경우) if utc_time.tzinfo is None: utc_time = utc_time.replace(tzinfo=timezone.utc) # KST로 변환 kst_time = utc_time.astimezone(KST) # 문자열로 반환 (초 단위까지만) return kst_time.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: logging.error(f"시간 변환 실패: {utc_time_str}, 오류: {e}") return utc_time_str # 변환 실패 시 원본 반환 # ===== OpenAI 설정 및 헬퍼 함수 ===== # OpenAI API 설정 OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') OPENAI_MODEL = os.getenv('OPENAI_MODEL', 'gpt-4o-mini') OPENAI_MAX_TOKENS = int(os.getenv('OPENAI_MAX_TOKENS', '1000')) OPENAI_TEMPERATURE = float(os.getenv('OPENAI_TEMPERATURE', '0.7')) # OpenAI 사용 가능 여부 및 API 키 확인 if OPENAI_AVAILABLE and not OPENAI_API_KEY: logging.warning("OPENAI_API_KEY가 설정되지 않았습니다. AI 분석 기능을 사용할 수 없습니다.") OPENAI_AVAILABLE = False # System Prompt SYSTEM_PROMPT = """당신은 약국 고객 관리 전문가입니다. 고객의 구매 데이터를 분석하여 다음을 제공합니다: 1. 구매 패턴 및 행동 분석 2. 주요 관심 제품 카테고리 파악 3. 업셀링을 위한 제품 추천 4. 고객 맞춤형 마케팅 전략 응답은 반드시 JSON 형식으로 작성하며, 한국어로 작성합니다. 약국 운영자가 실제로 활용할 수 있는 구체적이고 실용적인 인사이트를 제공해야 합니다.""" # 에러 메시지 ERROR_MESSAGES = { 'NO_USER': '사용자를 찾을 수 없습니다.', 'NO_PURCHASES': '구매 이력이 없어 분석할 수 없습니다. 최소 1건 이상의 구매가 필요합니다.', 'OPENAI_NOT_AVAILABLE': 'AI 분석 기능을 사용할 수 없습니다. 관리자에게 문의하세요.', 'OPENAI_API_KEY_MISSING': 'OpenAI API 키가 설정되지 않았습니다.', 'OPENAI_API_ERROR': 'OpenAI API 호출에 실패했습니다. 잠시 후 다시 시도해주세요.', 'OPENAI_RATE_LIMIT': 'API 호출 횟수 제한에 도달했습니다. 잠시 후 다시 시도해주세요.', 'OPENAI_TIMEOUT': 'AI 분석 시간이 초과되었습니다. 다시 시도해주세요.', 'PARSING_ERROR': 'AI 응답을 처리하는 중 오류가 발생했습니다.', 'UNKNOWN_ERROR': '알 수 없는 오류가 발생했습니다. 관리자에게 문의하세요.' } def categorize_product(product_name): """제품명에서 카테고리 추정 (간단한 키워드 매칭)""" categories = { '소화제': ['타센', '베아제', '겔포스', '소화'], '진통제': ['타이레놀', '게보린', '펜잘', '이부프로펜'], '감기약': ['판콜', '화이투벤', '지르텍', '감기'], '피부약': ['후시딘', '마데카솔', '더마틱스'], '비타민': ['비타민', '센트룸', '활성비타민'], '안약': ['안약', '인공눈물'], '소염진통제': ['자미슬', '펠루비', '게보린'] } for category, keywords in categories.items(): for keyword in keywords: if keyword in product_name: return category return '기타' def prepare_analysis_prompt(user, purchases): """OpenAI API 전송용 프롬프트 생성""" # 사용자 정보 요약 user_summary = f"""사용자: {user['nickname']} ({user['phone']}) 가입일: {utc_to_kst_str(user['created_at']) if user['created_at'] else '-'} 포인트 잔액: {user['mileage_balance']:,}P 총 구매 건수: {len(purchases)}건 """ # 구매 이력 상세 purchase_details = [] total_spent = 0 all_products = [] product_freq = {} for idx, purchase in enumerate(purchases, 1): total_spent += purchase['amount'] products_str = ', '.join([f"{item['name']} x{item['qty']}" for item in purchase['items']]) # 제품 빈도 계산 for item in purchase['items']: product_name = item['name'] all_products.append(product_name) product_freq[product_name] = product_freq.get(product_name, 0) + 1 purchase_details.append( f"{idx}. {purchase['date']} - {purchase['amount']:,}원 구매, {purchase['points']}P 적립\n" f" 구매 품목: {products_str}" ) # 통계 계산 avg_purchase = total_spent // len(purchases) if purchases else 0 top_products = sorted(product_freq.items(), key=lambda x: x[1], reverse=True)[:5] top_products_str = ', '.join([f"{name}({count}회)" for name, count in top_products]) # 최종 프롬프트 조립 prompt = f"""다음은 약국 고객의 구매 데이터입니다. 구매 패턴을 분석하고 마케팅 전략을 제안해주세요. {user_summary} 통계 요약: - 총 구매 금액: {total_spent:,}원 - 평균 구매 금액: {avg_purchase:,}원 - 자주 구매한 품목: {top_products_str} 구매 이력 (최근 {len(purchases)}건): {chr(10).join(purchase_details)} 분석 요청사항: 1. 구매 패턴 분석: 구매 빈도, 구매 금액 패턴 등 2. 주로 구매하는 품목 카테고리 (예: 소화제, 감기약, 건강기능식품 등) 3. 추천 제품: 기존 구매 패턴을 기반으로 관심있을만한 제품 3-5가지 (업셀링) 4. 마케팅 전략: 이 고객에게 효과적일 프로모션 또는 포인트 활용 방안 응답은 다음 JSON 형식으로 해주세요: {{ "pattern": "구매 패턴에 대한 상세한 분석 (2-3문장)", "main_products": ["카테고리1: 품목들", "카테고리2: 품목들"], "recommendations": ["추천제품1 (이유)", "추천제품2 (이유)", "추천제품3 (이유)"], "marketing_strategy": "마케팅 전략 제안 (2-3문장)" }} """ return prompt def parse_openai_response(response_text): """OpenAI API 응답을 파싱하여 구조화된 데이터 반환""" import re try: # JSON 추출 (마크다운 코드 블록 제거) json_match = re.search(r'```json\s*(\{.*?\})\s*```', response_text, re.DOTALL) if json_match: json_str = json_match.group(1) else: # 코드 블록 없이 JSON만 있는 경우 json_str = response_text.strip() # JSON 파싱 analysis = json.loads(json_str) # 필수 필드 검증 required_fields = ['pattern', 'main_products', 'recommendations', 'marketing_strategy'] for field in required_fields: if field not in analysis: raise ValueError(f"필수 필드 누락: {field}") # 타입 검증 if not isinstance(analysis['main_products'], list): analysis['main_products'] = [str(analysis['main_products'])] if not isinstance(analysis['recommendations'], list): analysis['recommendations'] = [str(analysis['recommendations'])] return analysis except json.JSONDecodeError as e: # JSON 파싱 실패 시 fallback logging.error(f"JSON 파싱 실패: {e}") return { 'pattern': '응답 파싱에 실패했습니다.', 'main_products': ['분석 결과를 확인할 수 없습니다.'], 'recommendations': ['다시 시도해주세요.'], 'marketing_strategy': response_text[:500] } except Exception as e: logging.error(f"응답 파싱 오류: {e}") raise def handle_openai_error(error): """OpenAI API 에러를 사용자 친화적 메시지로 변환""" error_str = str(error).lower() if 'api key' in error_str or 'authentication' in error_str: return ERROR_MESSAGES['OPENAI_API_KEY_MISSING'] elif 'rate limit' in error_str or 'quota' in error_str: return ERROR_MESSAGES['OPENAI_RATE_LIMIT'] elif 'timeout' in error_str: return ERROR_MESSAGES['OPENAI_TIMEOUT'] else: return ERROR_MESSAGES['OPENAI_API_ERROR'] def call_openai_with_retry(prompt, max_retries=3): """재시도 로직을 포함한 OpenAI API 호출""" if not OPENAI_AVAILABLE: return False, ERROR_MESSAGES['OPENAI_NOT_AVAILABLE'] client = OpenAI(api_key=OPENAI_API_KEY) for attempt in range(max_retries): try: response = client.chat.completions.create( model=OPENAI_MODEL, messages=[ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": prompt} ], max_tokens=OPENAI_MAX_TOKENS, temperature=OPENAI_TEMPERATURE, timeout=30 ) return True, response except RateLimitError as e: if attempt < max_retries - 1: wait_time = 2 ** attempt logging.warning(f"OpenAI Rate limit, {wait_time}초 대기 후 재시도...") time.sleep(wait_time) else: return False, handle_openai_error(e) except APITimeoutError as e: if attempt < max_retries - 1: logging.warning(f"OpenAI 타임아웃, 재시도 중... ({attempt+1}/{max_retries})") time.sleep(1) else: return False, ERROR_MESSAGES['OPENAI_TIMEOUT'] except OpenAIError as e: return False, handle_openai_error(e) except Exception as e: logging.error(f"OpenAI API 호출 오류: {e}") return False, ERROR_MESSAGES['UNKNOWN_ERROR'] return False, ERROR_MESSAGES['OPENAI_TIMEOUT'] def verify_claim_token(transaction_id, nonce): """ QR 토큰 검증 Args: transaction_id (str): 거래 ID nonce (str): 12자 hex nonce Returns: tuple: (성공 여부, 메시지, 토큰 정보 dict) """ try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 1. 거래 ID로 토큰 조회 cursor.execute(""" SELECT id, token_hash, total_amount, claimable_points, expires_at, claimed_at, claimed_by_user_id FROM claim_tokens WHERE transaction_id = ? """, (transaction_id,)) token_record = cursor.fetchone() if not token_record: return (False, "유효하지 않은 QR 코드입니다.", None) # 2. 이미 적립된 토큰인지 확인 if token_record['claimed_at']: return (False, "이미 적립 완료된 영수증입니다.", None) # 3. 만료 확인 expires_at = datetime.strptime(token_record['expires_at'], '%Y-%m-%d %H:%M:%S') if datetime.now() > expires_at: return (False, "적립 기간이 만료되었습니다 (30일).", None) # 4. 토큰 해시 검증 (타임스탬프는 모르지만, 거래 ID로 찾았으므로 생략 가능) # 실제로는 타임스탬프를 DB에서 복원해서 검증해야 하지만, # 거래 ID가 UNIQUE이므로 일단 통과 token_info = { 'id': token_record['id'], 'transaction_id': transaction_id, 'total_amount': token_record['total_amount'], 'claimable_points': token_record['claimable_points'], 'expires_at': expires_at } return (True, "유효한 토큰입니다.", token_info) except Exception as e: return (False, f"토큰 검증 실패: {str(e)}", None) def get_or_create_user(phone, name, birthday=None): """ 사용자 조회 또는 생성 (간편 적립용) Args: phone (str): 전화번호 name (str): 이름 birthday (str, optional): 생년월일 (YYYY-MM-DD) Returns: tuple: (user_id, is_new_user) """ conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 전화번호로 조회 cursor.execute(""" SELECT id, mileage_balance FROM users WHERE phone = ? """, (phone,)) user = cursor.fetchone() if user: # 기존 유저: birthday가 제공되면 업데이트 if birthday: cursor.execute("UPDATE users SET birthday = ? WHERE id = ?", (birthday, user['id'])) conn.commit() return (user['id'], False) # 신규 생성 cursor.execute(""" INSERT INTO users (nickname, phone, birthday, mileage_balance) VALUES (?, ?, ?, 0) """, (name, phone, birthday)) conn.commit() return (cursor.lastrowid, True) def claim_mileage(user_id, token_info): """ 마일리지 적립 처리 Args: user_id (int): 사용자 ID token_info (dict): 토큰 정보 Returns: tuple: (성공 여부, 메시지, 적립 후 잔액) """ try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 1. 현재 잔액 조회 cursor.execute("SELECT mileage_balance FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() current_balance = user['mileage_balance'] # 2. 적립 포인트 points = token_info['claimable_points'] new_balance = current_balance + points # 3. 사용자 잔액 업데이트 cursor.execute(""" UPDATE users SET mileage_balance = ?, updated_at = ? WHERE id = ? """, (new_balance, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), user_id)) # 4. 마일리지 원장 기록 cursor.execute(""" INSERT INTO mileage_ledger (user_id, transaction_id, points, balance_after, reason, description) VALUES (?, ?, ?, ?, ?, ?) """, ( user_id, token_info['transaction_id'], points, new_balance, 'CLAIM', f"영수증 QR 적립 ({token_info['total_amount']:,}원 구매)" )) # 5. claim_tokens 업데이트 (적립 완료 표시) cursor.execute(""" UPDATE claim_tokens SET claimed_at = ?, claimed_by_user_id = ? WHERE id = ? """, (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), user_id, token_info['id'])) conn.commit() return (True, f"{points}P 적립 완료!", new_balance) except Exception as e: conn.rollback() return (False, f"적립 처리 실패: {str(e)}", 0) def normalize_kakao_phone(kakao_phone): """ 카카오 전화번호 형식 변환 "+82 10-1234-5678" → "01012345678" """ if not kakao_phone: return None digits = kakao_phone.replace('+', '').replace('-', '').replace(' ', '') if digits.startswith('82'): digits = '0' + digits[2:] if len(digits) >= 10: return digits return None def link_kakao_identity(user_id, kakao_id, kakao_data): """카카오 계정을 customer_identities에 연결""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT id FROM customer_identities WHERE provider = 'kakao' AND provider_user_id = ? """, (kakao_id,)) is_new_link = cursor.fetchone() is None if is_new_link: # raw_data 제외 (세션 크기 절약) store_data = {k: v for k, v in kakao_data.items() if k != 'raw_data'} cursor.execute(""" INSERT INTO customer_identities (user_id, provider, provider_user_id, provider_data) VALUES (?, 'kakao', ?, ?) """, (user_id, kakao_id, json.dumps(store_data, ensure_ascii=False))) # 프로필 이미지, 이메일 업데이트 updates = [] params = [] if kakao_data.get('profile_image'): updates.append("profile_image_url = ?") params.append(kakao_data['profile_image']) if kakao_data.get('email'): updates.append("email = ?") params.append(kakao_data['email']) # 카카오 인증일 기록 (최초 연동 시) if is_new_link: updates.append("kakao_verified_at = datetime('now')") if updates: params.append(user_id) cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = ?", params) conn.commit() def find_user_by_kakao_id(kakao_id): """카카오 ID로 기존 연결된 사용자 조회""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT user_id FROM customer_identities WHERE provider = 'kakao' AND provider_user_id = ? """, (kakao_id,)) row = cursor.fetchone() return row['user_id'] if row else None # ============================================================================ # 라우트 # ============================================================================ @app.route('/') def index(): """메인 페이지""" logged_in = 'logged_in_user_id' in session return render_template('index.html', logged_in=logged_in, logged_in_name=session.get('logged_in_name', ''), logged_in_phone=session.get('logged_in_phone', '') ) @app.route('/signup') def signup(): """회원가입 페이지""" if 'logged_in_user_id' in session: return redirect(f"/my-page?phone={session.get('logged_in_phone', '')}") return render_template('signup.html') @app.route('/api/signup', methods=['POST']) def api_signup(): """회원가입 API""" try: data = request.get_json() name = data.get('name', '').strip() phone = data.get('phone', '').strip().replace('-', '').replace(' ', '') birthday = data.get('birthday', '').strip() or None # 선택 항목 if not name or not phone: return jsonify({'success': False, 'message': '이름과 전화번호를 모두 입력해주세요.'}), 400 if len(phone) < 10: return jsonify({'success': False, 'message': '올바른 전화번호를 입력해주세요.'}), 400 user_id, is_new = get_or_create_user(phone, name, birthday=birthday) # 세션에 유저 정보 저장 session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = phone session['logged_in_name'] = name return jsonify({ 'success': True, 'message': '가입이 완료되었습니다.' if is_new else '이미 가입된 회원입니다. 로그인되었습니다.', 'is_new': is_new }) except Exception as e: return jsonify({'success': False, 'message': f'오류가 발생했습니다: {str(e)}'}), 500 @app.route('/claim') def claim(): """ QR 코드 랜딩 페이지 URL: /claim?t=transaction_id:nonce """ # 토큰 파라미터 파싱 token_param = request.args.get('t', '') if ':' not in token_param: return render_template('error.html', message="잘못된 QR 코드 형식입니다.") parts = token_param.split(':') if len(parts) != 2: return render_template('error.html', message="잘못된 QR 코드 형식입니다.") transaction_id, nonce = parts[0], parts[1] # 토큰 검증 success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return render_template('error.html', message=message) # 세션에 로그인된 유저가 있으면 자동 적립 (PWA) if 'logged_in_user_id' in session: auto_user_id = session['logged_in_user_id'] conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute("SELECT id, nickname, phone, mileage_balance FROM users WHERE id = ?", (auto_user_id,)) auto_user = cursor.fetchone() if auto_user: auto_success, auto_msg, auto_balance = claim_mileage(auto_user_id, token_info) if auto_success: return render_template('claim_kakao_success.html', points=token_info['claimable_points'], balance=auto_balance, phone=auto_user['phone'], name=auto_user['nickname']) return render_template('error.html', message=auto_msg) else: # 유저가 삭제됨 - 세션 클리어 session.pop('logged_in_user_id', None) session.pop('logged_in_phone', None) session.pop('logged_in_name', None) # MSSQL에서 구매 품목 조회 sale_items = [] try: db_session = db_manager.get_session('PM_PRES') sale_sub_query = text(""" SELECT ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) rows = db_session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall() sale_items = [ {'name': r.goods_name, 'qty': int(r.quantity or 0), 'total': int(r.total or 0)} for r in rows ] except Exception as e: logging.warning(f"품목 조회 실패 (transaction_id={transaction_id}): {e}") # JS SDK용 카카오 state 생성 (CSRF 보호) csrf_token = secrets.token_hex(16) state_data = {'t': token_param, 'csrf': csrf_token} kakao_state = base64.urlsafe_b64encode( json.dumps(state_data).encode() ).decode() session['kakao_csrf'] = csrf_token return render_template('claim_form.html', token_info=token_info, sale_items=sale_items, kakao_state=kakao_state) @app.route('/api/claim', methods=['POST']) def api_claim(): """ 마일리지 적립 API POST /api/claim Body: { "transaction_id": "...", "nonce": "...", "phone": "010-1234-5678", "name": "홍길동" } """ try: data = request.get_json() transaction_id = data.get('transaction_id') nonce = data.get('nonce') phone = data.get('phone', '').strip() name = data.get('name', '').strip() privacy_consent = data.get('privacy_consent', False) # 입력 검증 if not phone or not name: return jsonify({ 'success': False, 'message': '전화번호와 이름을 모두 입력해주세요.' }), 400 # 개인정보 동의 검증 if not privacy_consent: return jsonify({ 'success': False, 'message': '개인정보 수집·이용에 동의해주세요.' }), 400 # 전화번호 형식 정리 (하이픈 제거) phone = phone.replace('-', '').replace(' ', '') if len(phone) < 10: return jsonify({ 'success': False, 'message': '올바른 전화번호를 입력해주세요.' }), 400 # 토큰 검증 success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return jsonify({ 'success': False, 'message': message }), 400 # 사용자 조회/생성 user_id, is_new = get_or_create_user(phone, name) # 마일리지 적립 success, message, new_balance = claim_mileage(user_id, token_info) if not success: return jsonify({ 'success': False, 'message': message }), 500 # 세션에 유저 정보 저장 (PWA 자동적립용) session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = phone session['logged_in_name'] = name return jsonify({ 'success': True, 'message': message, 'points': token_info['claimable_points'], 'balance': new_balance, 'is_new_user': is_new }) except Exception as e: return jsonify({ 'success': False, 'message': f'오류가 발생했습니다: {str(e)}' }), 500 # ============================================================================ # 카카오 적립 라우트 # ============================================================================ @app.route('/claim/kakao/start') def claim_kakao_start(): """카카오 OAuth 시작 - claim 컨텍스트를 state에 담아 카카오로 리다이렉트""" from services.kakao_client import get_kakao_client token_param = request.args.get('t', '') if ':' not in token_param: return render_template('error.html', message="잘못된 QR 코드입니다.") parts = token_param.split(':') if len(parts) != 2: return render_template('error.html', message="잘못된 QR 코드입니다.") transaction_id, nonce = parts # 토큰 사전 검증 success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return render_template('error.html', message=message) # state: claim 컨텍스트 + CSRF 토큰 csrf_token = secrets.token_hex(16) state_data = { 't': token_param, 'csrf': csrf_token } state_encoded = base64.urlsafe_b64encode( json.dumps(state_data).encode() ).decode() session['kakao_csrf'] = csrf_token kakao_client = get_kakao_client() auth_url = kakao_client.get_authorization_url(state=state_encoded) return redirect(auth_url) def _handle_mypage_kakao_callback(code, kakao_client): """ 마이페이지 카카오 콜백 처리 - 카카오 연동(머지) + 마이페이지 이동 케이스별 동작: A) 카카오 ID가 이미 연결된 유저 → 그 유저의 마이페이지로 이동 B) 미연결 + 카카오 전화번호로 기존 유저 발견 → 카카오 연동 후 이동 C) 미연결 + 기존 유저 없음 → 신규 생성 + 카카오 연동 D) 전화번호 없음 → 에러 안내 """ success, token_data = kakao_client.get_access_token(code) if not success: return render_template('error.html', message="카카오 인증에 실패했습니다.") access_token = token_data.get('access_token') success, user_info = kakao_client.get_user_info(access_token) if not success: return render_template('error.html', message="카카오 사용자 정보를 가져올 수 없습니다.") kakao_id = user_info.get('kakao_id') kakao_phone_raw = user_info.get('phone_number') kakao_phone = normalize_kakao_phone(kakao_phone_raw) kakao_name = user_info.get('name') or user_info.get('nickname', '고객') conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # Case A: 카카오 ID로 이미 연결된 유저 있음 existing_user_id = find_user_by_kakao_id(kakao_id) if existing_user_id: # "고객" 이름이면 카카오 실명으로 업데이트 if kakao_name and kakao_name != '고객': cursor.execute( "UPDATE users SET nickname = ? WHERE id = ? AND nickname = '고객'", (kakao_name, existing_user_id)) conn.commit() cursor.execute("SELECT phone FROM users WHERE id = ?", (existing_user_id,)) row = cursor.fetchone() if row and row['phone']: return redirect(f"/my-page?phone={row['phone']}") # 전화번호 없으면 연동 불가 if not kakao_phone: return render_template('error.html', message="카카오 계정에 전화번호 정보가 없습니다. 카카오 설정에서 전화번호를 등록해주세요.") # Case B/C: 전화번호로 기존 유저 조회 cursor.execute("SELECT id, nickname FROM users WHERE phone = ?", (kakao_phone,)) phone_user = cursor.fetchone() if phone_user: # Case B: 기존 전화번호 유저 → 카카오 연동 (머지) user_id = phone_user['id'] link_kakao_identity(user_id, kakao_id, user_info) # "고객" 이름이면 카카오 실명으로 업데이트 if phone_user['nickname'] == '고객' and kakao_name and kakao_name != '고객': cursor.execute("UPDATE users SET nickname = ? WHERE id = ?", (kakao_name, user_id)) conn.commit() logging.info(f"마이페이지 카카오 머지: user_id={user_id}, kakao_id={kakao_id}") else: # Case C: 신규 유저 생성 + 카카오 연동 user_id, _ = get_or_create_user(kakao_phone, kakao_name) link_kakao_identity(user_id, kakao_id, user_info) logging.info(f"마이페이지 카카오 신규: user_id={user_id}, kakao_id={kakao_id}") return redirect(f"/my-page?phone={kakao_phone}") @app.route('/claim/kakao/callback') def claim_kakao_callback(): """카카오 OAuth 콜백 - 토큰 교환 → 사용자 정보 → 적립 처리""" from services.kakao_client import get_kakao_client code = request.args.get('code') state_encoded = request.args.get('state') error = request.args.get('error') if error: return render_template('error.html', message="카카오 로그인이 취소되었습니다.") if not code or not state_encoded: return render_template('error.html', message="카카오 인증 정보가 올바르지 않습니다.") # 1. state 디코딩 try: state_json = base64.urlsafe_b64decode(state_encoded).decode() state_data = json.loads(state_json) except Exception: return render_template('error.html', message="인증 상태 정보가 올바르지 않습니다.") # 2. CSRF 검증 csrf_token = state_data.get('csrf') if csrf_token != session.pop('kakao_csrf', None): return render_template('error.html', message="보안 검증에 실패했습니다. 다시 시도해주세요.") # 2.5 마이페이지 조회 목적이면 별도 처리 if state_data.get('purpose') == 'mypage': return _handle_mypage_kakao_callback(code, get_kakao_client()) # 3. claim 컨텍스트 복원 token_param = state_data.get('t', '') parts = token_param.split(':') if len(parts) != 2: return render_template('error.html', message="적립 정보가 올바르지 않습니다.") transaction_id, nonce = parts # 4. 토큰 재검증 (OAuth 중 다른 사람이 적립했을 수 있음) success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return render_template('error.html', message=message) # 5. 카카오 access_token 교환 kakao_client = get_kakao_client() success, token_data = kakao_client.get_access_token(code) if not success: logging.error(f"카카오 토큰 교환 실패: {token_data}") return render_template('error.html', message="카카오 인증에 실패했습니다. 다시 시도해주세요.") # 6. 사용자 정보 조회 access_token = token_data.get('access_token') success, user_info = kakao_client.get_user_info(access_token) if not success: logging.error(f"카카오 사용자 정보 조회 실패: {user_info}") return render_template('error.html', message="카카오 사용자 정보를 가져올 수 없습니다.") kakao_id = user_info.get('kakao_id') kakao_name = user_info.get('name') or user_info.get('nickname', '') kakao_phone_raw = user_info.get('phone_number') kakao_phone = normalize_kakao_phone(kakao_phone_raw) # 카카오에서 받은 생년월일 조합 (YYYY-MMDD) kakao_birthday = None if user_info.get('birthyear') and user_info.get('birthday'): kakao_birthday = f"{user_info['birthyear']}-{user_info['birthday'][:2]}-{user_info['birthday'][2:]}" # 7. 분기: 전화번호가 있으면 자동 적립, 없으면 폰 입력 폼 if kakao_phone: # 자동 적립 existing_user_id = find_user_by_kakao_id(kakao_id) if existing_user_id: user_id = existing_user_id is_new = False # 생년월일이 있으면 업데이트 if kakao_birthday: conn = db_manager.get_sqlite_connection() conn.cursor().execute("UPDATE users SET birthday = ? WHERE id = ? AND birthday IS NULL", (kakao_birthday, user_id)) conn.commit() else: user_id, is_new = get_or_create_user(kakao_phone, kakao_name, birthday=kakao_birthday) link_kakao_identity(user_id, kakao_id, user_info) success, msg, new_balance = claim_mileage(user_id, token_info) if not success: return render_template('error.html', message=msg) # 세션에 유저 정보 저장 (PWA 자동적립용) session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = kakao_phone session['logged_in_name'] = kakao_name return render_template('claim_kakao_success.html', points=token_info['claimable_points'], balance=new_balance, phone=kakao_phone, name=kakao_name) else: # 전화번호 없음 → 세션에 카카오 정보 저장 + 폰 입력 폼 session['kakao_claim'] = { 'kakao_id': kakao_id, 'name': kakao_name, 'profile_image': user_info.get('profile_image'), 'email': user_info.get('email'), 'token_param': token_param } return render_template('claim_kakao_phone.html', token_info=token_info, kakao_name=kakao_name, kakao_profile_image=user_info.get('profile_image')) @app.route('/api/claim/kakao', methods=['POST']) def api_claim_kakao(): """카카오 적립 (전화번호 폴백) - 세션의 카카오 정보 + 폼의 전화번호로 적립""" kakao_data = session.pop('kakao_claim', None) if not kakao_data: return jsonify({ 'success': False, 'message': '카카오 인증 정보가 만료되었습니다. 다시 시도해주세요.' }), 400 data = request.get_json() phone = data.get('phone', '').strip().replace('-', '').replace(' ', '') if len(phone) < 10: session['kakao_claim'] = kakao_data return jsonify({ 'success': False, 'message': '올바른 전화번호를 입력해주세요.' }), 400 token_param = kakao_data['token_param'] parts = token_param.split(':') transaction_id, nonce = parts success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return jsonify({'success': False, 'message': message}), 400 kakao_id = kakao_data['kakao_id'] name = kakao_data['name'] existing_user_id = find_user_by_kakao_id(kakao_id) if existing_user_id: user_id = existing_user_id is_new = False else: user_id, is_new = get_or_create_user(phone, name) link_kakao_identity(user_id, kakao_id, kakao_data) success, message, new_balance = claim_mileage(user_id, token_info) if not success: return jsonify({'success': False, 'message': message}), 500 # 세션에 유저 정보 저장 (PWA 자동적립용) session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = phone session['logged_in_name'] = name return jsonify({ 'success': True, 'message': message, 'points': token_info['claimable_points'], 'balance': new_balance, 'is_new_user': is_new }) @app.route('/my-page/kakao/start') def mypage_kakao_start(): """마이페이지 카카오 로그인 조회""" from services.kakao_client import get_kakao_client csrf_token = secrets.token_hex(16) state_data = { 'purpose': 'mypage', 'csrf': csrf_token } state_encoded = base64.urlsafe_b64encode( json.dumps(state_data).encode() ).decode() session['kakao_csrf'] = csrf_token kakao_client = get_kakao_client() auth_url = kakao_client.get_authorization_url(state=state_encoded) return redirect(auth_url) @app.route('/my-page') def my_page(): """마이페이지 (전화번호로 조회)""" phone = request.args.get('phone', '') if not phone: # JS SDK용 카카오 state 생성 csrf_token = secrets.token_hex(16) state_data = {'purpose': 'mypage', 'csrf': csrf_token} kakao_state = base64.urlsafe_b64encode( json.dumps(state_data).encode() ).decode() session['kakao_csrf'] = csrf_token return render_template('my_page_login.html', kakao_state=kakao_state) # 전화번호로 사용자 조회 phone = phone.replace('-', '').replace(' ', '') conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users WHERE phone = ? """, (phone,)) user_raw = cursor.fetchone() if not user_raw: return render_template('error.html', message='등록되지 않은 전화번호입니다.') # 사용자 정보에 KST 시간 변환 적용 user = dict(user_raw) user['created_at'] = utc_to_kst_str(user_raw['created_at']) # 적립 내역 조회 (transaction_id 포함) cursor.execute(""" SELECT points, balance_after, reason, description, created_at, transaction_id FROM mileage_ledger WHERE user_id = ? ORDER BY created_at DESC LIMIT 20 """, (user['id'],)) transactions_raw = cursor.fetchall() # 거래 내역에 KST 시간 변환 적용 transactions = [] for tx in transactions_raw: tx_dict = dict(tx) tx_dict['created_at'] = utc_to_kst_str(tx['created_at']) transactions.append(tx_dict) return render_template('my_page.html', user=user, transactions=transactions, user_id=user['id']) # ============================================================================ # PWA / 공통 라우트 # ============================================================================ @app.route('/sw.js') def service_worker(): """서비스 워커를 루트 경로에서 제공 (scope='/' 허용)""" return app.send_static_file('sw.js'), 200, { 'Content-Type': 'application/javascript', 'Service-Worker-Allowed': '/' } @app.route('/privacy') def privacy(): """개인정보 처리방침""" return render_template('privacy.html') @app.route('/logout') def logout(): """세션 로그아웃""" session.pop('logged_in_user_id', None) session.pop('logged_in_phone', None) session.pop('logged_in_name', None) return redirect('/') @app.route('/admin/transaction/') def admin_transaction_detail(transaction_id): """거래 세부 내역 조회 (MSSQL)""" try: # MSSQL PM_PRES 연결 session = db_manager.get_session('PM_PRES') # SALE_MAIN 조회 (거래 헤더) sale_main_query = text(""" SELECT SL_NO_order, InsertTime, SL_MY_total, SL_MY_discount, SL_MY_sale, SL_MY_credit, SL_MY_recive, SL_MY_rec_vat, ISNULL(SL_NM_custom, '[비고객]') AS customer_name FROM SALE_MAIN WHERE SL_NO_order = :transaction_id """) sale_main = session.execute(sale_main_query, {'transaction_id': transaction_id}).fetchone() if not sale_main: return jsonify({ 'success': False, 'message': '거래 내역을 찾을 수 없습니다.' }), 404 # SALE_SUB 조회 (판매 상품 상세) sale_sub_query = text(""" SELECT S.DrugCode, ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_NM_cost_a AS price, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) sale_items = session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall() # 결과를 JSON으로 반환 result = { 'success': True, 'transaction': { 'id': sale_main.SL_NO_order, 'date': str(sale_main.InsertTime), 'total_amount': int(sale_main.SL_MY_total or 0), 'discount': int(sale_main.SL_MY_discount or 0), 'sale_amount': int(sale_main.SL_MY_sale or 0), 'credit': int(sale_main.SL_MY_credit or 0), 'supply_value': int(sale_main.SL_MY_recive or 0), 'vat': int(sale_main.SL_MY_rec_vat or 0), 'customer_name': sale_main.customer_name }, 'items': [ { 'code': item.DrugCode, 'name': item.goods_name, 'qty': int(item.quantity or 0), 'price': int(item.price or 0), 'total': int(item.total or 0) } for item in sale_items ] } return jsonify(result) except Exception as e: return jsonify({ 'success': False, 'message': f'조회 실패: {str(e)}' }), 500 @app.route('/admin/user/') def admin_user_detail(user_id): """사용자 상세 이력 조회 - 구매 이력, 적립 이력, 구매 품목""" conn = None try: # 1. SQLite 연결 (새 연결 사용) conn = db_manager.get_sqlite_connection(new_connection=True) cursor = conn.cursor() # 2. 사용자 기본 정보 조회 cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users WHERE id = ? """, (user_id,)) user = cursor.fetchone() if not user: return jsonify({ 'success': False, 'message': '사용자를 찾을 수 없습니다.' }), 404 # 3. 마일리지 이력 조회 (최근 50건) cursor.execute(""" SELECT transaction_id, points, balance_after, reason, description, created_at FROM mileage_ledger WHERE user_id = ? ORDER BY created_at DESC LIMIT 50 """, (user_id,)) mileage_history = cursor.fetchall() # 4. 구매 이력 조회 (적립된 거래만, 최근 20건) cursor.execute(""" SELECT transaction_id, total_amount, claimable_points, claimed_at FROM claim_tokens WHERE claimed_by_user_id = ? ORDER BY claimed_at DESC LIMIT 20 """, (user_id,)) claimed_tokens = cursor.fetchall() # 5. 각 거래의 상품 상세 조회 (MSSQL) purchases = [] try: session = db_manager.get_session('PM_PRES') for token in claimed_tokens: transaction_id = token['transaction_id'] # SALE_MAIN에서 거래 시간 조회 sale_main_query = text(""" SELECT InsertTime FROM SALE_MAIN WHERE SL_NO_order = :transaction_id """) sale_main = session.execute( sale_main_query, {'transaction_id': transaction_id} ).fetchone() # 거래 시간 추출 (MSSQL의 실제 거래 시간) if sale_main and sale_main.InsertTime: transaction_date = str(sale_main.InsertTime)[:16].replace('T', ' ') else: transaction_date = '-' # SALE_SUB + CD_GOODS JOIN (BARCODE 추가) sale_items_query = text(""" SELECT S.BARCODE, S.DrugCode, ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_NM_cost_a AS price, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) items_raw = session.execute( sale_items_query, {'transaction_id': transaction_id} ).fetchall() # 상품 리스트 변환 (카테고리 포함) items = [] for item in items_raw: barcode = item.BARCODE # SQLite에서 제품 카테고리 조회 (테이블 없으면 건너뜀) categories = [] if barcode: try: cursor.execute(""" SELECT category_name, relevance_score FROM product_category_mapping WHERE barcode = ? ORDER BY relevance_score DESC """, (barcode,)) for cat_row in cursor.fetchall(): categories.append({ 'name': cat_row[0], 'score': cat_row[1] }) except Exception: pass # 테이블 없으면 무시 items.append({ 'code': item.DrugCode, 'barcode': barcode, 'name': item.goods_name, 'qty': int(item.quantity or 0), 'price': int(item.price or 0), 'total': int(item.total or 0), 'categories': categories }) # 상품 요약 생성 ("첫번째상품명 외 N개") if items: first_item_name = items[0]['name'] items_count = len(items) if items_count == 1: items_summary = first_item_name else: items_summary = f"{first_item_name} 외 {items_count - 1}개" else: items_summary = "상품 정보 없음" items_count = 0 purchases.append({ 'transaction_id': transaction_id, 'date': transaction_date, # MSSQL의 실제 거래 시간 사용 'amount': int(token['total_amount']), 'points': int(token['claimable_points']), 'items_summary': items_summary, 'items_count': items_count, 'items': items }) except Exception as mssql_error: # MSSQL 연결 실패 시 빈 배열 반환 print(f"[WARNING] MSSQL 조회 실패 (user {user_id}): {mssql_error}") purchases = [] # 6. 조제 이력 조회 (전화번호 → CUSCODE → PS_main) prescriptions = [] pos_customer = None if user['phone']: try: phone_clean = user['phone'].replace('-', '').replace(' ', '') base_session = db_manager.get_session('PM_BASE') pres_session = db_manager.get_session('PM_PRES') # 전화번호로 CUSCODE 조회 cuscode_query = text(""" SELECT TOP 1 CUSCODE, PANAME FROM CD_PERSON WHERE REPLACE(REPLACE(PHONE, '-', ''), ' ', '') = :phone OR REPLACE(REPLACE(TEL_NO, '-', ''), ' ', '') = :phone OR REPLACE(REPLACE(PHONE2, '-', ''), ' ', '') = :phone """) cus_row = base_session.execute(cuscode_query, {'phone': phone_clean}).fetchone() if cus_row: cuscode = cus_row.CUSCODE pos_customer = {'cuscode': cuscode, 'name': cus_row.PANAME} # 조제 이력 조회 rx_query = text(""" SELECT TOP 30 P.PreSerial, P.Indate, P.Paname, P.Drname, P.OrderName, P.TDAYS FROM PS_main P WHERE P.CusCode = :cuscode ORDER BY P.Indate DESC, P.PreSerial DESC """) rxs = pres_session.execute(rx_query, {'cuscode': cuscode}).fetchall() for rx in rxs: # 처방 품목 조회 items_query = text(""" SELECT S.DrugCode, G.GoodsName, S.Days, S.QUAN, S.QUAN_TIME FROM PS_sub_pharm S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.PreSerial = :pre_serial """) items = pres_session.execute(items_query, {'pre_serial': rx.PreSerial}).fetchall() prescriptions.append({ 'pre_serial': rx.PreSerial, 'date': rx.Indate, 'patient_name': rx.Paname, 'doctor': rx.Drname, 'hospital': rx.OrderName, 'total_days': rx.TDAYS, 'items': [{ 'drug_code': item.DrugCode, 'name': item.GoodsName or '알 수 없음', 'days': float(item.Days) if item.Days else 0, 'quantity': float(item.QUAN) if item.QUAN else 0, 'times_per_day': float(item.QUAN_TIME) if item.QUAN_TIME else 0 } for item in items] }) except Exception as rx_error: logging.warning(f"조제 이력 조회 실패 (user {user_id}): {rx_error}") # 7. 응답 생성 return jsonify({ 'success': True, 'user': { 'id': user['id'], 'name': user['nickname'], 'phone': user['phone'], 'balance': user['mileage_balance'], 'created_at': utc_to_kst_str(user['created_at']), 'is_kakao_verified': user['nickname'] != '고객' # 카카오 인증 여부 }, 'mileage_history': [ { 'points': ml['points'], 'balance_after': ml['balance_after'], 'reason': ml['reason'], 'description': ml['description'], 'created_at': utc_to_kst_str(ml['created_at']), 'transaction_id': ml['transaction_id'] } for ml in mileage_history ], 'purchases': purchases, 'prescriptions': prescriptions, 'pos_customer': pos_customer }) except Exception as e: import traceback logging.error(f"사용자 상세 조회 실패: {e}\n{traceback.format_exc()}") return jsonify({ 'success': False, 'message': f'조회 실패: {str(e)}' }), 500 finally: if conn: try: conn.close() except: pass @app.route('/admin/search/user') def admin_search_user(): """사용자 검색 (이름/전화번호/전화번호 뒷자리)""" query = request.args.get('q', '').strip() search_type = request.args.get('type', 'name') # 'name', 'phone', 'phone_last' if not query: return jsonify({'success': False, 'message': '검색어를 입력하세요'}), 400 conn = None try: conn = db_manager.get_sqlite_connection(new_connection=True) cursor = conn.cursor() if search_type == 'phone_last': # 전화번호 뒷자리 검색 cursor.execute(""" SELECT id, nickname, phone, mileage_balance FROM users WHERE phone LIKE ? ORDER BY created_at DESC """, (f'%{query}',)) elif search_type == 'phone': # 전체 전화번호 검색 cursor.execute(""" SELECT id, nickname, phone, mileage_balance FROM users WHERE phone = ? """, (query,)) else: # 이름 검색 cursor.execute(""" SELECT id, nickname, phone, mileage_balance FROM users WHERE nickname LIKE ? ORDER BY created_at DESC """, (f'%{query}%',)) results = cursor.fetchall() if not results: return jsonify({'success': False, 'message': '검색 결과가 없습니다'}), 404 if len(results) == 1: # 단일 매칭 - user_id만 반환 return jsonify({ 'success': True, 'multiple': False, 'user_id': results[0]['id'] }) else: # 여러 명 매칭 - 선택 모달용 데이터 반환 users = [{ 'id': row['id'], 'name': row['nickname'], 'phone': row['phone'], 'balance': row['mileage_balance'], 'is_kakao_verified': row['nickname'] != '고객' # 카카오 인증 여부 } for row in results] return jsonify({ 'success': True, 'multiple': True, 'users': users }) except Exception as e: import traceback logging.error(f"사용자 검색 실패: {e}\n{traceback.format_exc()}") return jsonify({ 'success': False, 'message': f'검색 실패: {str(e)}' }), 500 finally: if conn: try: conn.close() except: pass @app.route('/admin/search/product') def admin_search_product(): """제품 검색 - 적립자 목록 반환 (SQLite 적립자 기준)""" query = request.args.get('q', '').strip() if not query: return jsonify({'success': False, 'message': '검색어를 입력하세요'}), 400 conn = None try: conn = db_manager.get_sqlite_connection(new_connection=True) cursor = conn.cursor() # 1. MSSQL에서 제품명으로 거래번호 찾기 session = db_manager.get_session('PM_PRES') sale_items_query = text(""" SELECT DISTINCT S.SL_NO_order, S.SL_NM_item, M.InsertTime FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode LEFT JOIN SALE_MAIN M ON S.SL_NO_order = M.SL_NO_order WHERE G.GoodsName LIKE :product_name ORDER BY M.InsertTime DESC """) sale_results = session.execute(sale_items_query, { 'product_name': f'%{query}%' }).fetchall() if not sale_results: return jsonify({ 'success': True, 'results': [] }) # 2. SQLite에서 적립된 거래만 필터링 (claimed_by_user_id IS NOT NULL) transaction_ids = [row.SL_NO_order for row in sale_results] placeholders = ','.join('?' * len(transaction_ids)) cursor.execute(f""" SELECT ct.transaction_id, ct.total_amount, ct.claimed_at, ct.claimed_by_user_id, u.nickname, u.phone FROM claim_tokens ct JOIN users u ON ct.claimed_by_user_id = u.id WHERE ct.transaction_id IN ({placeholders}) AND ct.claimed_by_user_id IS NOT NULL ORDER BY ct.claimed_at DESC LIMIT 50 """, transaction_ids) claimed_results = cursor.fetchall() # 3. 결과 조합 results = [] for claim_row in claimed_results: # 해당 거래의 MSSQL 정보 찾기 mssql_row = next((r for r in sale_results if r.SL_NO_order == claim_row['transaction_id']), None) if mssql_row: results.append({ 'user_id': claim_row['claimed_by_user_id'], 'user_name': claim_row['nickname'], 'user_phone': claim_row['phone'], 'purchase_date': str(mssql_row.InsertTime)[:16].replace('T', ' ') if mssql_row.InsertTime else '-', # MSSQL 실제 거래 시간 'claimed_date': str(claim_row['claimed_at'])[:16].replace('T', ' ') if claim_row['claimed_at'] else '-', # 적립 시간 'quantity': float(mssql_row.SL_NM_item or 0), 'total_amount': int(claim_row['total_amount']) }) return jsonify({ 'success': True, 'results': results }) except Exception as e: import traceback logging.error(f"제품 검색 실패: {e}\n{traceback.format_exc()}") return jsonify({ 'success': False, 'message': f'검색 실패: {str(e)}' }), 500 finally: if conn: try: conn.close() except: pass @app.route('/admin/ai-analyze-user/', methods=['POST']) def admin_ai_analyze_user(user_id): """OpenAI GPT를 사용한 사용자 구매 패턴 AI 분석""" try: # OpenAI 사용 가능 여부 확인 if not OPENAI_AVAILABLE: return jsonify({ 'success': False, 'message': ERROR_MESSAGES['OPENAI_NOT_AVAILABLE'] }), 503 # 1. SQLite 연결 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 2. 사용자 기본 정보 조회 cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users WHERE id = ? """, (user_id,)) user_row = cursor.fetchone() if not user_row: return jsonify({ 'success': False, 'message': ERROR_MESSAGES['NO_USER'] }), 404 user = { 'id': user_row['id'], 'nickname': user_row['nickname'], 'phone': user_row['phone'], 'mileage_balance': user_row['mileage_balance'], 'created_at': user_row['created_at'] } # 3. 구매 이력 조회 (최근 20건) cursor.execute(""" SELECT transaction_id, total_amount, claimable_points, claimed_at FROM claim_tokens WHERE claimed_by_user_id = ? ORDER BY claimed_at DESC LIMIT 20 """, (user_id,)) claimed_tokens = cursor.fetchall() if not claimed_tokens: return jsonify({ 'success': False, 'message': ERROR_MESSAGES['NO_PURCHASES'] }), 400 # 4. MSSQL에서 상품 상세 조회 purchases = [] session = db_manager.get_session('PM_PRES') for token in claimed_tokens: transaction_id = token['transaction_id'] # SALE_MAIN에서 거래 시간 조회 sale_main_query = text(""" SELECT InsertTime FROM SALE_MAIN WHERE SL_NO_order = :transaction_id """) sale_main = session.execute(sale_main_query, {'transaction_id': transaction_id}).fetchone() # SALE_SUB + CD_GOODS JOIN sale_items_query = text(""" SELECT S.DrugCode, ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_NM_cost_a AS price, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) items_raw = session.execute(sale_items_query, {'transaction_id': transaction_id}).fetchall() items = [{ 'name': item.goods_name, 'qty': int(item.quantity or 0), 'price': int(item.price or 0) } for item in items_raw] purchases.append({ 'date': str(sale_main.InsertTime)[:16].replace('T', ' ') if sale_main and sale_main.InsertTime else '-', 'amount': int(token['total_amount']), 'points': int(token['claimable_points']), 'items': items }) # 5. OpenAI API 호출용 프롬프트 생성 prompt = prepare_analysis_prompt(user, purchases) # 6. OpenAI API 호출 logging.info(f"AI 분석 시작: 사용자 ID {user_id}") success, response = call_openai_with_retry(prompt) if not success: # response에는 에러 메시지가 담겨 있음 return jsonify({ 'success': False, 'message': response }), 500 # 7. 응답 파싱 response_text = response.choices[0].message.content analysis = parse_openai_response(response_text) logging.info(f"AI 분석 완료: 사용자 ID {user_id}, 토큰: {response.usage.total_tokens}") # 8. 결과 반환 return jsonify({ 'success': True, 'user': { 'id': user['id'], 'name': user['nickname'], 'phone': user['phone'], 'balance': user['mileage_balance'] }, 'analysis': analysis, 'metadata': { 'model_used': OPENAI_MODEL, 'tokens_used': response.usage.total_tokens, 'analysis_time': datetime.now(KST).strftime('%Y-%m-%d %H:%M:%S') } }) except Exception as e: logging.error(f"AI 분석 오류: {e}") return jsonify({ 'success': False, 'message': ERROR_MESSAGES['UNKNOWN_ERROR'] }), 500 @app.route('/admin/use-points', methods=['POST']) def admin_use_points(): """관리자 페이지에서 포인트 사용 (차감)""" try: data = request.get_json() user_id = data.get('user_id') points_to_use = data.get('points') if not user_id or not points_to_use: return jsonify({ 'success': False, 'message': '사용자 ID와 포인트를 입력하세요.' }), 400 if points_to_use <= 0: return jsonify({ 'success': False, 'message': '1 이상의 포인트를 입력하세요.' }), 400 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 1. 현재 사용자 잔액 확인 cursor.execute(""" SELECT id, nickname, mileage_balance FROM users WHERE id = ? """, (user_id,)) user = cursor.fetchone() if not user: return jsonify({ 'success': False, 'message': '사용자를 찾을 수 없습니다.' }), 404 current_balance = user['mileage_balance'] # 2. 잔액 확인 if points_to_use > current_balance: return jsonify({ 'success': False, 'message': f'잔액({current_balance:,}P)이 부족합니다.' }), 400 # 3. 포인트 차감 new_balance = current_balance - points_to_use cursor.execute(""" UPDATE users SET mileage_balance = ? WHERE id = ? """, (new_balance, user_id)) # 4. 마일리지 원장 기록 cursor.execute(""" INSERT INTO mileage_ledger (user_id, transaction_id, points, balance_after, reason, description) VALUES (?, NULL, ?, ?, 'USE', ?) """, ( user_id, -points_to_use, # 음수로 저장 new_balance, f'포인트 사용 (관리자) - {points_to_use:,}P 차감' )) conn.commit() logging.info(f"포인트 사용 완료: 사용자 ID {user_id}, 차감 {points_to_use}P, 잔액 {new_balance}P") return jsonify({ 'success': True, 'message': f'{points_to_use:,}P 사용 완료', 'new_balance': new_balance, 'used_points': points_to_use }) except Exception as e: logging.error(f"포인트 사용 실패: {e}") return jsonify({ 'success': False, 'message': f'포인트 사용 실패: {str(e)}' }), 500 @app.route('/admin') def admin(): """관리자 페이지 - 전체 사용자 및 적립 현황""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 전체 통계 cursor.execute(""" SELECT COUNT(*) as total_users, SUM(mileage_balance) as total_balance FROM users """) stats = cursor.fetchone() # 최근 가입 사용자 (20명) cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at, kakao_verified_at FROM users ORDER BY created_at DESC LIMIT 20 """) recent_users_raw = cursor.fetchall() # 시간을 KST로 변환 + 조제 이력 확인 recent_users = [] # MSSQL 세션 (조제 이력 확인용) base_session = None pres_session = None try: base_session = db_manager.get_session('PM_BASE') pres_session = db_manager.get_session('PM_PRES') except: pass for user in recent_users_raw: user_dict = dict(user) user_dict['created_at'] = utc_to_kst_str(user['created_at']) user_dict['kakao_verified_at'] = utc_to_kst_str(user['kakao_verified_at']) if user['kakao_verified_at'] else None # 조제 이력 확인 (MSSQL) user_dict['has_prescription'] = False if base_session and pres_session and user['phone']: try: phone_clean = user['phone'].replace('-', '').replace(' ', '') # 1단계: PM_BASE에서 전화번호로 CUSCODE 조회 cuscode_query = text(""" SELECT TOP 1 CUSCODE FROM CD_PERSON WHERE REPLACE(REPLACE(PHONE, '-', ''), ' ', '') = :phone OR REPLACE(REPLACE(TEL_NO, '-', ''), ' ', '') = :phone OR REPLACE(REPLACE(PHONE2, '-', ''), ' ', '') = :phone """) cus_row = base_session.execute(cuscode_query, {'phone': phone_clean}).fetchone() if cus_row: # 2단계: PM_PRES에서 CUSCODE로 조제 기록 확인 pres_check = pres_session.execute(text(""" SELECT TOP 1 PreSerial FROM PS_main WHERE CusCode = :cuscode """), {'cuscode': cus_row.CUSCODE}).fetchone() if pres_check: user_dict['has_prescription'] = True except Exception as e: logging.warning(f"조제 이력 확인 실패 (user {user['id']}): {e}") recent_users.append(user_dict) # 최근 적립 내역 (50건) cursor.execute(""" SELECT ml.id, u.nickname, u.phone, ml.points, ml.balance_after, ml.reason, ml.description, ml.created_at, ml.transaction_id FROM mileage_ledger ml JOIN users u ON ml.user_id = u.id ORDER BY ml.created_at DESC LIMIT 50 """) recent_transactions_raw = cursor.fetchall() # 시간을 KST로 변환 recent_transactions = [] for trans in recent_transactions_raw: trans_dict = dict(trans) trans_dict['created_at'] = utc_to_kst_str(trans['created_at']) recent_transactions.append(trans_dict) # QR 토큰 통계 cursor.execute(""" SELECT COUNT(*) as total_tokens, SUM(CASE WHEN claimed_at IS NOT NULL THEN 1 ELSE 0 END) as claimed_count, SUM(CASE WHEN claimed_at IS NULL THEN 1 ELSE 0 END) as unclaimed_count, SUM(claimable_points) as total_points_issued, SUM(CASE WHEN claimed_at IS NOT NULL THEN claimable_points ELSE 0 END) as total_points_claimed FROM claim_tokens """) token_stats = cursor.fetchone() # 최근 QR 발행 내역 (20건) cursor.execute(""" SELECT transaction_id, total_amount, claimable_points, claimed_at, claimed_by_user_id, created_at FROM claim_tokens ORDER BY created_at DESC LIMIT 20 """) recent_tokens_raw = cursor.fetchall() # Convert only created_at (발행일), leave claimed_at (적립일) unconverted recent_tokens = [] for token in recent_tokens_raw: token_dict = dict(token) token_dict['created_at'] = utc_to_kst_str(token['created_at']) # Convert 발행일 # claimed_at stays as-is (or remains None if not claimed) recent_tokens.append(token_dict) return render_template('admin.html', stats=stats, recent_users=recent_users, recent_transactions=recent_transactions, token_stats=token_stats, recent_tokens=recent_tokens) # ============================================================================ # AI 업셀링 추천 # ============================================================================ def _get_available_products(): """약국 보유 제품 목록 (최근 30일 판매 실적 기준 TOP 40)""" try: mssql_session = db_manager.get_session('PM_PRES') rows = mssql_session.execute(text(""" SELECT TOP 40 ISNULL(G.GoodsName, '') AS name, COUNT(*) as sales, MAX(G.Saleprice) as price FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_DT_appl >= CONVERT(VARCHAR(8), DATEADD(DAY, -30, GETDATE()), 112) AND G.GoodsName IS NOT NULL AND G.GoodsName NOT LIKE N'%(판매중지)%' GROUP BY G.GoodsName ORDER BY COUNT(*) DESC """)).fetchall() return [{'name': r.name, 'price': float(r.price or 0), 'sales': r.sales} for r in rows] except Exception as e: logging.warning(f"[AI추천] 보유 제품 목록 조회 실패: {e}") return [] def _generate_upsell_recommendation(user_id, transaction_id, sale_items, user_name): """키오스크 적립 후 AI 업셀링 추천 생성 (fire-and-forget)""" from services.clawdbot_client import generate_upsell, generate_upsell_real if not sale_items: return # 현재 구매 품목 current_items = ', '.join(item['name'] for item in sale_items if item.get('name')) if not current_items: return # 최근 구매 이력 수집 recent_products = current_items # 기본값 try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT ct.transaction_id FROM claim_tokens ct WHERE ct.claimed_by_user_id = ? AND ct.transaction_id != ? ORDER BY ct.claimed_at DESC LIMIT 5 """, (user_id, transaction_id)) recent_tokens = cursor.fetchall() if recent_tokens: all_products = [] mssql_session = db_manager.get_session('PM_PRES') for token in recent_tokens: rows = mssql_session.execute(text(""" SELECT ISNULL(G.GoodsName, '') AS goods_name FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :tid """), {'tid': token['transaction_id']}).fetchall() for r in rows: if r.goods_name: all_products.append(r.goods_name) if all_products: recent_products = ', '.join(set(all_products)) except Exception as e: logging.warning(f"[AI추천] 구매 이력 수집 실패 (현재 품목만 사용): {e}") # 실데이터 기반 추천 (보유 제품 목록 제공) available = _get_available_products() rec = None if available: logging.info(f"[AI추천] 실데이터 생성 시작: user={user_name}, items={current_items}, 보유제품={len(available)}개") rec = generate_upsell_real(user_name, current_items, recent_products, available) # 실데이터 실패 시 기존 방식 fallback if not rec: logging.info(f"[AI추천] 자유 생성 fallback: user={user_name}") rec = generate_upsell(user_name, current_items, recent_products) if not rec: logging.warning("[AI추천] 생성 실패 (AI 응답 없음)") return # SQLite에 저장 try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() expires_at = (datetime.now() + timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S') cursor.execute(""" INSERT INTO ai_recommendations (user_id, transaction_id, recommended_product, recommendation_message, recommendation_reason, trigger_products, ai_raw_response, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( user_id, transaction_id, rec['product'], rec['message'], rec['reason'], json.dumps([item['name'] for item in sale_items], ensure_ascii=False), json.dumps(rec, ensure_ascii=False), expires_at )) conn.commit() logging.info(f"[AI추천] 저장 완료: user_id={user_id}, product={rec['product']}") except Exception as e: logging.warning(f"[AI추천] DB 저장 실패: {e}") @app.route('/api/recommendation/') def api_get_recommendation(user_id): """마이페이지용 AI 추천 조회""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') cursor.execute(""" SELECT id, recommended_product, recommendation_message, created_at FROM ai_recommendations WHERE user_id = ? AND status = 'active' AND (expires_at IS NULL OR expires_at > ?) ORDER BY created_at DESC LIMIT 1 """, (user_id, now)) rec = cursor.fetchone() if not rec: return jsonify({'success': True, 'has_recommendation': False}) # 표시 횟수 업데이트 cursor.execute(""" UPDATE ai_recommendations SET displayed_count = displayed_count + 1, displayed_at = COALESCE(displayed_at, ?) WHERE id = ? """, (now, rec['id'])) conn.commit() return jsonify({ 'success': True, 'has_recommendation': True, 'recommendation': { 'id': rec['id'], 'product': rec['recommended_product'], 'message': rec['recommendation_message'] } }) @app.route('/api/recommendation//dismiss', methods=['POST']) def api_dismiss_recommendation(rec_id): """추천 닫기 / 관심 표시""" data = request.get_json(silent=True) or {} action = data.get('action', 'dismissed') if action not in ('dismissed', 'interested'): action = 'dismissed' conn = db_manager.get_sqlite_connection() cursor = conn.cursor() now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') cursor.execute(""" UPDATE ai_recommendations SET status = ?, dismissed_at = ? WHERE id = ? """, (action, now, rec_id)) conn.commit() logging.info(f"[AI추천] id={rec_id} → {action}") return jsonify({'success': True}) # ============================================================================ # 알림톡 로그 # ============================================================================ @app.route('/admin/alimtalk') def admin_alimtalk(): """알림톡 발송 로그 + NHN 발송 내역""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 로컬 발송 로그 (최근 50건) cursor.execute(""" SELECT a.*, u.nickname, u.phone as user_phone FROM alimtalk_logs a LEFT JOIN users u ON a.user_id = u.id ORDER BY a.created_at DESC LIMIT 50 """) local_logs = [dict(row) for row in cursor.fetchall()] # 통계 cursor.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as success_count, SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) as fail_count FROM alimtalk_logs """) stats = dict(cursor.fetchone()) # 오늘 통계 cursor.execute(""" SELECT COUNT(*) as today_total, SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as today_success FROM alimtalk_logs WHERE date(created_at) = date('now') """) today = dict(cursor.fetchone()) stats.update(today) return render_template('admin_alimtalk.html', local_logs=local_logs, stats=stats) @app.route('/admin/ai-crm') def admin_ai_crm(): """AI 업셀링 CRM 대시보드""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 추천 목록 (최근 50건) + 사용자 정보 JOIN cursor.execute(""" SELECT r.*, u.nickname, u.phone as user_phone FROM ai_recommendations r LEFT JOIN users u ON r.user_id = u.id ORDER BY r.created_at DESC LIMIT 50 """) recs = [dict(row) for row in cursor.fetchall()] # trigger_products JSON 파싱 for rec in recs: tp = rec.get('trigger_products') if tp: try: rec['trigger_list'] = json.loads(tp) except Exception: rec['trigger_list'] = [tp] else: rec['trigger_list'] = [] # 통계 cursor.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN status = 'active' AND (expires_at IS NULL OR expires_at > datetime('now')) THEN 1 ELSE 0 END) as active_count, SUM(CASE WHEN status = 'interested' THEN 1 ELSE 0 END) as interested_count, SUM(CASE WHEN status = 'dismissed' THEN 1 ELSE 0 END) as dismissed_count, SUM(CASE WHEN displayed_count > 0 THEN 1 ELSE 0 END) as displayed_count FROM ai_recommendations """) stats = dict(cursor.fetchone()) # 오늘 생성 건수 cursor.execute(""" SELECT COUNT(*) as today_count FROM ai_recommendations WHERE date(created_at) = date('now') """) stats['today_count'] = cursor.fetchone()['today_count'] now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') return render_template('admin_ai_crm.html', recs=recs, stats=stats, now=now) @app.route('/api/admin/alimtalk/nhn-history') def api_admin_alimtalk_nhn_history(): """NHN Cloud 실제 발송 내역 API""" from services.nhn_alimtalk import get_nhn_send_history date_str = request.args.get('date', datetime.now().strftime('%Y-%m-%d')) start = f"{date_str} 00:00" end = f"{date_str} 23:59" messages = get_nhn_send_history(start, end) result = [] for m in messages: result.append({ 'requestDate': m.get('requestDate', ''), 'recipientNo': m.get('recipientNo', ''), 'templateCode': m.get('templateCode', ''), 'messageStatus': m.get('messageStatus', ''), 'resultCode': m.get('resultCode', ''), 'resultMessage': m.get('resultMessage', ''), 'content': m.get('content', ''), }) return jsonify({'success': True, 'messages': result}) @app.route('/api/admin/alimtalk/test-send', methods=['POST']) def api_admin_alimtalk_test_send(): """관리자 수동 알림톡 발송 테스트""" from services.nhn_alimtalk import send_mileage_claim_alimtalk data = request.get_json() phone = data.get('phone', '').strip().replace('-', '') name = data.get('name', '테스트') if len(phone) < 10: return jsonify({'success': False, 'message': '올바른 전화번호를 입력해주세요.'}), 400 success, msg = send_mileage_claim_alimtalk( phone, name, 100, 500, items=[{'name': '테스트 발송', 'qty': 1, 'total': 1000}], trigger_source='admin_test' ) return jsonify({'success': success, 'message': msg}) # ============================================================================ # 키오스크 적립 # ============================================================================ @app.route('/kiosk') def kiosk(): """키오스크 메인 페이지 (전체 화면 웹 UI)""" return render_template('kiosk.html') @app.route('/api/kiosk/trigger', methods=['POST']) def api_kiosk_trigger(): """ POS → 키오스크 세션 생성 POST /api/kiosk/trigger Body: {"transaction_id": "...", "amount": 50000} """ global kiosk_current_session data = request.get_json() transaction_id = data.get('transaction_id') amount = data.get('amount', 0) if not transaction_id: return jsonify({'success': False, 'message': 'transaction_id가 필요합니다.'}), 400 try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # QR 토큰 존재 확인 cursor.execute("SELECT id, token_hash, claimable_points, claimed_at FROM claim_tokens WHERE transaction_id = ?", (transaction_id,)) token_row = cursor.fetchone() if token_row and token_row['claimed_at']: return jsonify({'success': False, 'message': '이미 적립된 거래입니다.'}), 400 if token_row: # 기존 토큰 사용 — QR URL은 새 nonce로 생성 # (verify_claim_token은 transaction_id로만 조회하므로 nonce 불일치 무관) claimable_points = token_row['claimable_points'] nonce = secrets.token_hex(6) from utils.qr_token_generator import QR_BASE_URL qr_url = f"{QR_BASE_URL}?t={transaction_id}:{nonce}" else: # 새 토큰 생성 from utils.qr_token_generator import generate_claim_token, save_token_to_db token_info = generate_claim_token(transaction_id, float(amount)) success, error = save_token_to_db( transaction_id, token_info['token_hash'], float(amount), token_info['claimable_points'], token_info['expires_at'], token_info['pharmacy_id'] ) if not success: return jsonify({'success': False, 'message': error}), 500 claimable_points = token_info['claimable_points'] qr_url = token_info['qr_url'] # MSSQL에서 구매 품목 조회 sale_items = [] try: mssql_session = db_manager.get_session('PM_PRES') sale_sub_query = text(""" SELECT ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) rows = mssql_session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall() sale_items = [ {'name': r.goods_name, 'qty': int(r.quantity or 0), 'total': int(r.total or 0)} for r in rows ] except Exception as e: logging.warning(f"키오스크 품목 조회 실패 (transaction_id={transaction_id}): {e}") # 키오스크 세션 저장 kiosk_current_session = { 'transaction_id': transaction_id, 'amount': int(amount), 'points': claimable_points, 'qr_url': qr_url, 'items': sale_items, 'created_at': datetime.now(KST).isoformat() } return jsonify({ 'success': True, 'message': f'키오스크 적립 대기 ({claimable_points}P)', 'points': claimable_points }) except Exception as e: logging.error(f"키오스크 트리거 오류: {e}") return jsonify({'success': False, 'message': f'오류: {str(e)}'}), 500 @app.route('/api/kiosk/current') def api_kiosk_current(): """ 키오스크 폴링 - 현재 세션 조회 GET /api/kiosk/current """ global kiosk_current_session if kiosk_current_session is None: return jsonify({'active': False}) # 5분 경과 시 자동 만료 created = datetime.fromisoformat(kiosk_current_session['created_at']) if datetime.now(KST) - created > timedelta(minutes=5): kiosk_current_session = None return jsonify({'active': False}) return jsonify({ 'active': True, 'transaction_id': kiosk_current_session['transaction_id'], 'amount': kiosk_current_session['amount'], 'points': kiosk_current_session['points'], 'qr_url': kiosk_current_session.get('qr_url'), 'items': kiosk_current_session.get('items', []) }) @app.route('/api/kiosk/claim', methods=['POST']) def api_kiosk_claim(): """ 키오스크 전화번호 적립 POST /api/kiosk/claim Body: {"phone": "01012345678"} """ global kiosk_current_session if kiosk_current_session is None: return jsonify({'success': False, 'message': '적립 대기 중인 거래가 없습니다.'}), 400 data = request.get_json() phone = data.get('phone', '').strip().replace('-', '').replace(' ', '') if len(phone) < 10: return jsonify({'success': False, 'message': '올바른 전화번호를 입력해주세요.'}), 400 transaction_id = kiosk_current_session['transaction_id'] try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # claim_tokens에서 nonce 조회를 위해 token_hash로 검증 cursor.execute(""" SELECT id, transaction_id, token_hash, total_amount, claimable_points, pharmacy_id, expires_at, claimed_at, claimed_by_user_id FROM claim_tokens WHERE transaction_id = ? """, (transaction_id,)) token_row = cursor.fetchone() if not token_row: return jsonify({'success': False, 'message': '토큰을 찾을 수 없습니다.'}), 400 if token_row['claimed_at']: kiosk_current_session = None return jsonify({'success': False, 'message': '이미 적립된 거래입니다.'}), 400 # 만료 확인 expires_at = datetime.strptime(token_row['expires_at'], '%Y-%m-%d %H:%M:%S') if datetime.now() > expires_at: kiosk_current_session = None return jsonify({'success': False, 'message': '만료된 거래입니다.'}), 400 # token_info 딕셔너리 구성 (claim_mileage 호환) token_info = { 'id': token_row['id'], 'transaction_id': token_row['transaction_id'], 'total_amount': token_row['total_amount'], 'claimable_points': token_row['claimable_points'] } # 사용자 조회/생성 user_id, is_new = get_or_create_user(phone, '고객') # 마일리지 적립 claim_success, claim_msg, new_balance = claim_mileage(user_id, token_info) if not claim_success: return jsonify({'success': False, 'message': claim_msg}), 500 # 키오스크 세션에서 품목 정보 캡처 후 클리어 claimed_points = token_info['claimable_points'] sale_items = kiosk_current_session.get('items', []) kiosk_current_session = None # 알림톡 발송 (fire-and-forget) try: from services.nhn_alimtalk import send_mileage_claim_alimtalk # 유저 이름 조회 (기존 유저는 실명이 있을 수 있음) cursor.execute("SELECT nickname FROM users WHERE id = ?", (user_id,)) user_row = cursor.fetchone() user_name = user_row['nickname'] if user_row else '고객' logging.warning(f"[알림톡] 발송 시도: phone={phone}, name={user_name}, points={claimed_points}, balance={new_balance}, items={sale_items}") success, msg = send_mileage_claim_alimtalk( phone, user_name, claimed_points, new_balance, items=sale_items, user_id=user_id, trigger_source='kiosk', transaction_id=transaction_id ) logging.warning(f"[알림톡] 발송 결과: success={success}, msg={msg}") except Exception as alimtalk_err: logging.warning(f"[알림톡] 발송 예외 (적립은 완료): {alimtalk_err}") # AI 업셀링 추천 생성 (별도 스레드 — 적립 응답 블로킹 방지) import threading def _bg_upsell(): try: _generate_upsell_recommendation(user_id, transaction_id, sale_items, user_name) except Exception as rec_err: logging.warning(f"[AI추천] 생성 예외 (적립은 완료): {rec_err}") threading.Thread(target=_bg_upsell, daemon=True).start() return jsonify({ 'success': True, 'message': f'{claimed_points}P 적립 완료!', 'points': claimed_points, 'balance': new_balance, 'is_new_user': is_new }) except Exception as e: logging.error(f"키오스크 적립 오류: {e}") return jsonify({'success': False, 'message': f'오류: {str(e)}'}), 500 # ===== AI Gateway 모니터 페이지 ===== @app.route('/admin/ai-gw') def admin_ai_gw(): """AI Gateway 모니터 페이지""" return render_template('admin_ai_gw.html') # ===== 판매 상세 조회 페이지 ===== @app.route('/admin/sales-detail') def admin_sales_detail(): """판매 상세 조회 페이지 (상품코드/바코드/표준코드 매핑)""" return render_template('admin_sales_detail.html') # ===== 제품 검색 페이지 ===== @app.route('/admin/products') def admin_products(): """제품 검색 페이지 (전체 재고에서 검색, QR 인쇄)""" return render_template('admin_products.html') @app.route('/admin/members') def admin_members(): """회원 검색 페이지 (팜IT3000 CD_PERSON, 알림톡/SMS 발송)""" return render_template('admin_members.html') @app.route('/api/products') def api_products(): """ 제품 검색 API - 상품명, 상품코드, 바코드로 검색 - 세트상품 바코드도 CD_ITEM_UNIT_MEMBER에서 조회 """ search = request.args.get('search', '').strip() limit = int(request.args.get('limit', 100)) if not search or len(search) < 2: return jsonify({'success': False, 'error': '검색어는 2글자 이상 입력하세요'}) try: drug_session = db_manager.get_session('PM_DRUG') # 제품 검색 쿼리 # CD_GOODS.BARCODE가 없으면 CD_ITEM_UNIT_MEMBER.CD_CD_BARCODE 사용 (세트상품) # 세트상품 여부 확인: CD_item_set에 SetCode로 존재하면 세트상품 products_query = text(f""" SELECT TOP {limit} G.DrugCode as drug_code, G.GoodsName as product_name, COALESCE(NULLIF(G.BARCODE, ''), U.CD_CD_BARCODE, '') as barcode, G.Saleprice as sale_price, G.Price as cost_price, CASE WHEN G.SplName IS NOT NULL AND G.SplName != '' THEN G.SplName WHEN SET_CHK.is_set = 1 THEN N'세트상품' ELSE '' END as supplier, CASE WHEN SET_CHK.is_set = 1 THEN 1 ELSE 0 END as is_set FROM CD_GOODS G OUTER APPLY ( SELECT TOP 1 CD_CD_BARCODE FROM CD_ITEM_UNIT_MEMBER WHERE DRUGCODE = G.DrugCode AND CD_CD_BARCODE IS NOT NULL AND CD_CD_BARCODE != '' ) U OUTER APPLY ( SELECT TOP 1 1 as is_set FROM CD_item_set WHERE SetCode = G.DrugCode AND DrugCode = 'SET0000' ) SET_CHK WHERE G.GoodsName LIKE :search_like OR G.DrugCode LIKE :search_like OR G.BARCODE LIKE :search_like OR U.CD_CD_BARCODE LIKE :search_like ORDER BY G.GoodsName """) search_like = f'%{search}%' rows = drug_session.execute(products_query, { 'search_like': search_like }).fetchall() items = [] for row in rows: items.append({ 'drug_code': row.drug_code or '', 'product_name': row.product_name or '', 'barcode': row.barcode or '', 'sale_price': float(row.sale_price) if row.sale_price else 0, 'cost_price': float(row.cost_price) if row.cost_price else 0, 'supplier': row.supplier or '', 'is_set': bool(row.is_set) }) return jsonify({ 'success': True, 'items': items, 'count': len(items) }) except Exception as e: logging.error(f"제품 검색 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/admin/sales') def admin_sales_pos(): """판매 내역 페이지 (POS 스타일, 거래별 그룹핑)""" return render_template('admin_sales_pos.html') @app.route('/api/sales-detail') def api_sales_detail(): """ 판매 상세 조회 API (바코드 포함) GET /api/sales-detail?days=7&search=타이레놀&barcode=has&customer=홍길동 """ try: days = int(request.args.get('days', 7)) search = request.args.get('search', '').strip() barcode_filter = request.args.get('barcode', 'all') # all, has, none mssql_session = db_manager.get_session('PM_PRES') drug_session = db_manager.get_session('PM_DRUG') # 판매 내역 조회 (최근 N일) # CD_GOODS.BARCODE가 없으면 CD_ITEM_UNIT_MEMBER.CD_CD_BARCODE 사용 (세트상품/자체등록 바코드) # 세트상품 여부 확인: CD_item_set에 SetCode로 존재하면 세트상품 sales_query = text(""" SELECT S.SL_DT_appl as sale_date, S.SL_NO_order as item_order, S.DrugCode as drug_code, ISNULL(G.GoodsName, '알 수 없음') as product_name, COALESCE(NULLIF(G.BARCODE, ''), U.CD_CD_BARCODE, '') as barcode, CASE WHEN G.SplName IS NOT NULL AND G.SplName != '' THEN G.SplName WHEN SET_CHK.is_set = 1 THEN '세트상품' ELSE '' END as supplier, ISNULL(S.QUAN, 1) as quantity, ISNULL(S.SL_TOTAL_PRICE, 0) as total_price_db, ISNULL(G.Saleprice, 0) as unit_price FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode OUTER APPLY ( SELECT TOP 1 CD_CD_BARCODE FROM PM_DRUG.dbo.CD_ITEM_UNIT_MEMBER WHERE DRUGCODE = S.DrugCode AND CD_CD_BARCODE IS NOT NULL AND CD_CD_BARCODE != '' ) U OUTER APPLY ( SELECT TOP 1 1 as is_set FROM PM_DRUG.dbo.CD_item_set WHERE SetCode = S.DrugCode AND DrugCode = 'SET0000' ) SET_CHK WHERE S.SL_DT_appl >= CONVERT(VARCHAR(8), DATEADD(DAY, -:days, GETDATE()), 112) ORDER BY S.SL_DT_appl DESC, S.SL_NO_order DESC """) rows = mssql_session.execute(sales_query, {'days': days}).fetchall() items = [] total_amount = 0 barcode_count = 0 unique_products = set() for row in rows: drug_code = row.drug_code or '' product_name = row.product_name or '' barcode = row.barcode or '' # 검색 필터 (상품명, 코드, 바코드) if search: search_lower = search.lower() if (search_lower not in product_name.lower() and search_lower not in drug_code.lower() and search_lower not in barcode.lower()): continue # 바코드 필터 if barcode_filter == 'has' and not barcode: continue if barcode_filter == 'none' and barcode: continue # 표준코드 조회 (CD_BARCODE 테이블) standard_code = '' if barcode: try: std_result = drug_session.execute(text(""" SELECT BASECODE FROM CD_BARCODE WHERE BARCODE = :barcode """), {'barcode': barcode}).fetchone() if std_result and std_result[0]: standard_code = std_result[0] except: pass quantity = int(row.quantity or 1) unit_price = float(row.unit_price or 0) total_price_from_db = float(row.total_price_db or 0) # DB에 합계가 있으면 사용, 없으면 계산 total_price = total_price_from_db if total_price_from_db > 0 else (quantity * unit_price) # 날짜 포맷팅 sale_date_str = str(row.sale_date or '') if len(sale_date_str) == 8: sale_date_str = f"{sale_date_str[:4]}-{sale_date_str[4:6]}-{sale_date_str[6:]}" items.append({ 'sale_date': sale_date_str, 'drug_code': drug_code, 'product_name': product_name, 'barcode': barcode, 'standard_code': standard_code, 'supplier': row.supplier or '', 'quantity': quantity, 'unit_price': int(unit_price), 'total_price': int(total_price) }) total_amount += total_price if barcode: barcode_count += 1 unique_products.add(drug_code) # 바코드 매핑률 계산 barcode_rate = round(barcode_count / len(items) * 100, 1) if items else 0 return jsonify({ 'success': True, 'items': items[:500], # 최대 500건 'stats': { 'total_count': len(items), 'total_amount': int(total_amount), 'barcode_rate': barcode_rate, 'unique_products': len(unique_products) } }) except Exception as e: logging.error(f"판매 상세 조회 오류: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 # ===== Claude 상태 API ===== @app.route('/api/claude-status') def api_claude_status(): """ Claude 사용량 상태 조회 (토큰 차감 없음) GET /api/claude-status GET /api/claude-status?detail=true — 전체 세션 상세 포함 Returns: { "ok": true, "connected": true, "model": "claude-opus-4-5", "mainSession": { ... }, "summary": { ... }, "sessions": [ ... ], // detail=true 일 때만 "timestamp": "2026-02-27T09:45:00+09:00" } """ try: from services.clawdbot_client import get_claude_status # 상세 모드 여부 detail_mode = request.args.get('detail', 'false').lower() == 'true' status = get_claude_status() if not status or not status.get('connected'): return jsonify({ 'ok': False, 'connected': False, 'error': status.get('error', 'Gateway 연결 실패'), 'timestamp': datetime.now(KST).isoformat() }), 503 sessions_data = status.get('sessions', {}) sessions_list = sessions_data.get('sessions', []) defaults = sessions_data.get('defaults', {}) # 메인 세션 찾기 main_session = None for s in sessions_list: if s.get('key') == 'agent:main:main': main_session = s break # 전체 토큰 합계 total_tokens = sum(s.get('totalTokens', 0) for s in sessions_list) # 메인 세션 컨텍스트 사용률 context_used = 0 context_max = defaults.get('contextTokens', 200000) context_percent = 0 if main_session: context_used = main_session.get('totalTokens', 0) context_max = main_session.get('contextTokens', context_max) if context_max > 0: context_percent = round(context_used / context_max * 100, 1) # 기본 응답 response = { 'ok': True, 'connected': True, 'model': f"{defaults.get('modelProvider', 'unknown')}/{defaults.get('model', 'unknown')}", 'context': { 'used': context_used, 'max': context_max, 'percent': context_percent, 'display': f"{context_used//1000}k/{context_max//1000}k ({context_percent}%)" }, 'mainSession': { 'key': main_session.get('key') if main_session else None, 'inputTokens': main_session.get('inputTokens', 0) if main_session else 0, 'outputTokens': main_session.get('outputTokens', 0) if main_session else 0, 'totalTokens': main_session.get('totalTokens', 0) if main_session else 0, 'lastChannel': main_session.get('lastChannel') if main_session else None } if main_session else None, 'summary': { 'totalSessions': len(sessions_list), 'totalTokens': total_tokens, 'activeModel': defaults.get('model') }, 'timestamp': datetime.now(KST).isoformat() } # 상세 모드: 전체 세션 목록 추가 if detail_mode: detailed_sessions = [] for s in sessions_list: session_tokens = s.get('totalTokens', 0) session_context_max = s.get('contextTokens', 200000) session_percent = round(session_tokens / session_context_max * 100, 1) if session_context_max > 0 else 0 # 세션 키에서 이름 추출 (agent:main:xxx → xxx) session_key = s.get('key', '') session_name = session_key.split(':')[-1] if ':' in session_key else session_key # 마지막 활동 시간 updated_at = s.get('updatedAt') updated_str = None if updated_at: try: dt = datetime.fromtimestamp(updated_at / 1000, tz=KST) updated_str = dt.strftime('%Y-%m-%d %H:%M') except: pass detailed_sessions.append({ 'key': session_key, 'name': session_name, 'displayName': s.get('displayName', session_name), 'model': f"{s.get('modelProvider', 'unknown')}/{s.get('model', 'unknown')}", 'tokens': { 'input': s.get('inputTokens', 0), 'output': s.get('outputTokens', 0), 'total': session_tokens, 'contextMax': session_context_max, 'contextPercent': session_percent, 'display': f"{session_tokens//1000}k/{session_context_max//1000}k ({session_percent}%)" }, 'channel': s.get('lastChannel') or s.get('origin', {}).get('provider'), 'kind': s.get('kind'), 'updatedAt': updated_str }) # 토큰 사용량 순으로 정렬 detailed_sessions.sort(key=lambda x: x['tokens']['total'], reverse=True) response['sessions'] = detailed_sessions # 모델별 통계 model_stats = {} for s in sessions_list: model_key = f"{s.get('modelProvider', 'unknown')}/{s.get('model', 'unknown')}" if model_key not in model_stats: model_stats[model_key] = {'sessions': 0, 'tokens': 0} model_stats[model_key]['sessions'] += 1 model_stats[model_key]['tokens'] += s.get('totalTokens', 0) response['modelStats'] = model_stats return jsonify(response) except Exception as e: logging.error(f"Claude 상태 조회 오류: {e}") return jsonify({ 'ok': False, 'connected': False, 'error': str(e), 'timestamp': datetime.now(KST).isoformat() }), 500 # ============================================================================= # 회원 검색 API (팜IT3000 CD_PERSON) # ============================================================================= @app.route('/api/members/search') def api_members_search(): """ 회원 검색 API - 이름 또는 전화번호로 검색 - PM_BASE.dbo.CD_PERSON 테이블 조회 """ search = request.args.get('q', '').strip() search_type = request.args.get('type', 'auto') # auto, name, phone limit = min(int(request.args.get('limit', 50)), 200) if not search or len(search) < 2: return jsonify({'success': False, 'error': '검색어는 2글자 이상 입력하세요'}) try: # PM_BASE 연결 base_session = db_manager.get_session('PM_BASE') # 검색 타입 자동 감지 if search_type == 'auto': # 숫자만 있으면 전화번호, 아니면 이름 if search.replace('-', '').replace(' ', '').isdigit(): search_type = 'phone' else: search_type = 'name' # 전화번호 정규화 phone_search = search.replace('-', '').replace(' ', '') if search_type == 'phone': # 전화번호 검색 (3개 컬럼 모두) query = text(f""" SELECT TOP {limit} CUSCODE, PANAME, PANUM, TEL_NO, PHONE, PHONE2, CUSETC, EMAIL, SMS_STOP FROM CD_PERSON WHERE REPLACE(REPLACE(TEL_NO, '-', ''), ' ', '') LIKE :phone OR REPLACE(REPLACE(PHONE, '-', ''), ' ', '') LIKE :phone OR REPLACE(REPLACE(PHONE2, '-', ''), ' ', '') LIKE :phone ORDER BY PANAME """) rows = base_session.execute(query, {'phone': f'%{phone_search}%'}).fetchall() else: # 이름 검색 query = text(f""" SELECT TOP {limit} CUSCODE, PANAME, PANUM, TEL_NO, PHONE, PHONE2, CUSETC, EMAIL, SMS_STOP FROM CD_PERSON WHERE PANAME LIKE :name ORDER BY PANAME """) rows = base_session.execute(query, {'name': f'%{search}%'}).fetchall() members = [] for row in rows: # 유효한 전화번호 찾기 (PHONE 우선, 없으면 TEL_NO, PHONE2) phone = row.PHONE or row.TEL_NO or row.PHONE2 or '' phone = phone.strip() if phone else '' members.append({ 'cuscode': row.CUSCODE, 'name': row.PANAME or '', 'panum': row.PANUM or '', 'phone': phone, 'tel_no': row.TEL_NO or '', 'phone1': row.PHONE or '', 'phone2': row.PHONE2 or '', 'memo': (row.CUSETC or '')[:100], # 메모 100자 제한 'email': row.EMAIL or '', 'sms_stop': row.SMS_STOP == 'Y' # SMS 수신거부 여부 }) return jsonify({ 'success': True, 'items': members, 'count': len(members), 'search_type': search_type }) except Exception as e: logging.error(f"회원 검색 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/members/') def api_member_detail(cuscode): """회원 상세 정보 + 메모 조회""" try: base_session = db_manager.get_session('PM_BASE') # 회원 정보 query = text(""" SELECT CUSCODE, PANAME, PANUM, TEL_NO, PHONE, PHONE2, CUSETC, EMAIL, ADDRESS, SMS_STOP FROM CD_PERSON WHERE CUSCODE = :cuscode """) row = base_session.execute(query, {'cuscode': cuscode}).fetchone() if not row: return jsonify({'success': False, 'error': '회원을 찾을 수 없습니다'}), 404 member = { 'cuscode': row.CUSCODE, 'name': row.PANAME or '', 'panum': row.PANUM or '', 'phone': row.PHONE or row.TEL_NO or row.PHONE2 or '', 'tel_no': row.TEL_NO or '', 'phone1': row.PHONE or '', 'phone2': row.PHONE2 or '', 'memo': row.CUSETC or '', 'email': row.EMAIL or '', 'address': row.ADDRESS or '', 'sms_stop': row.SMS_STOP == 'Y' } # 상세 메모 조회 memo_query = text(""" SELECT MEMO_CODE, PHARMA_ID, MEMO_DATE, MEMO_TITLE, MEMO_Item FROM CD_PERSON_MEMO WHERE CUSCODE = :cuscode ORDER BY MEMO_DATE DESC """) memo_rows = base_session.execute(memo_query, {'cuscode': cuscode}).fetchall() memos = [] for m in memo_rows: memos.append({ 'memo_code': m.MEMO_CODE, 'author': m.PHARMA_ID or '', 'date': m.MEMO_DATE or '', 'title': m.MEMO_TITLE or '', 'content': m.MEMO_Item or '' }) return jsonify({ 'success': True, 'member': member, 'memos': memos }) except Exception as e: logging.error(f"회원 상세 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/members/history/') def api_member_history(phone): """ 회원 구매 이력 통합 조회 API - 마일리지 적립/사용 내역 (SQLite) - transaction_id로 POS 품목 조회 (MSSQL) - 전체 구매 이력 (SALE_MAIN) - 조제 이력 (PS_main) """ try: # 전화번호 정규화 phone = phone.replace('-', '').replace(' ', '') result = { 'success': True, 'phone': phone, 'mileage': None, 'purchases': [], # 전체 구매 이력 'prescriptions': [], # 조제 이력 'interests': [], # 관심 상품 (AI 업셀링) 'pos_customer': None } transaction_ids = [] # 적립된 거래번호 수집 # 1. 마일리지 내역 조회 (SQLite) - 새 연결 사용 (멀티스레드 안전) sqlite_conn = None try: sqlite_conn = db_manager.get_sqlite_connection(new_connection=True) cursor = sqlite_conn.cursor() # 사용자 정보 조회 cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users WHERE phone = ? """, (phone,)) user = cursor.fetchone() if user: user_id = user['id'] # 적립/사용 내역 조회 (claim_tokens 조인으로 금액 포함) cursor.execute(""" SELECT ml.points, ml.balance_after, ml.reason, ml.description, ml.transaction_id, ml.created_at, ct.total_amount FROM mileage_ledger ml LEFT JOIN claim_tokens ct ON ml.transaction_id = ct.transaction_id WHERE ml.user_id = ? ORDER BY ml.created_at DESC LIMIT 50 """, (user_id,)) transactions = cursor.fetchall() tx_list = [] for t in transactions: tx_data = { 'points': t['points'], 'balance_after': t['balance_after'], 'reason': t['reason'], 'description': t['description'], 'transaction_id': t['transaction_id'], 'created_at': t['created_at'], 'total_amount': t['total_amount'], 'items': [] # 나중에 POS에서 채움 } tx_list.append(tx_data) # 적립 거래번호 수집 if t['transaction_id'] and t['reason'] == 'CLAIM': transaction_ids.append(t['transaction_id']) result['mileage'] = { 'user_id': user_id, 'name': user['nickname'] or '', 'phone': user['phone'], 'balance': user['mileage_balance'] or 0, 'member_since': user['created_at'], 'transactions': tx_list } # 관심 상품 조회 (AI 업셀링에서 '관심있어요' 표시한 것) cursor.execute(""" SELECT id, recommended_product, recommendation_message, recommendation_reason, trigger_products, created_at FROM ai_recommendations WHERE user_id = ? AND status = 'interested' ORDER BY created_at DESC LIMIT 20 """, (user_id,)) interests = cursor.fetchall() result['interests'] = [{ 'id': i['id'], 'product': i['recommended_product'], 'message': i['recommendation_message'], 'reason': i['recommendation_reason'], 'trigger_products': i['trigger_products'], 'created_at': i['created_at'] } for i in interests] except Exception as e: import traceback logging.error(f"마일리지 조회 실패: {e}\n{traceback.format_exc()}") finally: # SQLite 연결 닫기 if sqlite_conn: try: sqlite_conn.close() except: pass # 2. 전화번호로 POS 고객코드 조회 (MSSQL) cuscode = None try: base_session = db_manager.get_session('PM_BASE') cuscode_query = text(""" SELECT TOP 1 CUSCODE, PANAME FROM CD_PERSON WHERE REPLACE(REPLACE(PHONE, '-', ''), ' ', '') = :phone OR REPLACE(REPLACE(TEL_NO, '-', ''), ' ', '') = :phone OR REPLACE(REPLACE(PHONE2, '-', ''), ' ', '') = :phone """) cus_row = base_session.execute(cuscode_query, {'phone': phone}).fetchone() if cus_row: cuscode = cus_row.CUSCODE result['pos_customer'] = { 'cuscode': cuscode, 'name': cus_row.PANAME } except Exception as e: logging.warning(f"POS 고객 조회 실패: {e}") # 3. transaction_id로 POS 품목 조회 (마일리지 적립 내역) if transaction_ids and result['mileage']: try: pres_session = db_manager.get_session('PM_PRES') # 각 거래번호별 품목 조회 items_by_tx = {} for tx_id in transaction_ids[:30]: # 최대 30건 items_query = text(""" SELECT S.DrugCode, G.GoodsName, S.QUAN as quantity, S.SL_TOTAL_PRICE as price FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :order_no """) items = pres_session.execute(items_query, {'order_no': tx_id}).fetchall() items_by_tx[tx_id] = [{ 'drug_code': item.DrugCode, 'name': item.GoodsName or '알 수 없음', 'quantity': float(item.quantity) if item.quantity else 1, 'price': float(item.price) if item.price else 0 } for item in items] # 마일리지 내역에 품목 추가 for tx in result['mileage']['transactions']: tx_id = tx.get('transaction_id') if tx_id and tx_id in items_by_tx: tx['items'] = items_by_tx[tx_id] except Exception as e: logging.warning(f"POS 품목 조회 실패: {e}") # 4. 전체 구매 이력 조회 (고객코드 기준) if cuscode: try: pres_session = db_manager.get_session('PM_PRES') # 최근 60일 구매 이력 purchase_query = text(""" SELECT TOP 30 M.SL_NO_order as order_no, M.SL_DT_appl as order_date, M.SL_MY_total as total_amount FROM SALE_MAIN M WHERE M.SL_CD_custom = :cuscode ORDER BY M.SL_DT_appl DESC, M.SL_NO_order DESC """) orders = pres_session.execute(purchase_query, {'cuscode': cuscode}).fetchall() purchases = [] for order in orders: # 품목 조회 items_query = text(""" SELECT S.DrugCode, G.GoodsName, S.QUAN as quantity, S.SL_TOTAL_PRICE as price FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :order_no """) items = pres_session.execute(items_query, {'order_no': order.order_no}).fetchall() purchases.append({ 'order_no': order.order_no, 'date': order.order_date, 'total': float(order.total_amount) if order.total_amount else 0, 'items': [{ 'drug_code': item.DrugCode, 'name': item.GoodsName or '알 수 없음', 'quantity': float(item.quantity) if item.quantity else 1, 'price': float(item.price) if item.price else 0 } for item in items] }) result['purchases'] = purchases except Exception as e: logging.warning(f"전체 구매 이력 조회 실패: {e}") # 5. 조제 이력 조회 (고객코드 기준) if cuscode: try: pres_session = db_manager.get_session('PM_PRES') # 최근 조제 이력 (최대 30건) rx_query = text(""" SELECT TOP 30 P.PreSerial, P.Indate, P.Paname, P.Drname, P.OrderName, P.TDAYS FROM PS_main P WHERE P.CusCode = :cuscode ORDER BY P.Indate DESC, P.PreSerial DESC """) rxs = pres_session.execute(rx_query, {'cuscode': cuscode}).fetchall() prescriptions = [] for rx in rxs: # 처방 품목 조회 items_query = text(""" SELECT S.DrugCode, G.GoodsName, S.Days, S.QUAN, S.QUAN_TIME FROM PS_sub_pharm S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.PreSerial = :pre_serial """) items = pres_session.execute(items_query, {'pre_serial': rx.PreSerial}).fetchall() prescriptions.append({ 'pre_serial': rx.PreSerial, 'date': rx.Indate, 'patient_name': rx.Paname, 'doctor': rx.Drname, 'hospital': rx.OrderName, 'total_days': rx.TDAYS, 'items': [{ 'drug_code': item.DrugCode, 'name': item.GoodsName or '알 수 없음', 'days': float(item.Days) if item.Days else 0, 'quantity': float(item.QUAN) if item.QUAN else 0, 'times_per_day': float(item.QUAN_TIME) if item.QUAN_TIME else 0 } for item in items] }) result['prescriptions'] = prescriptions except Exception as e: logging.warning(f"조제 이력 조회 실패: {e}") return jsonify(result) except Exception as e: logging.error(f"회원 이력 조회 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ============================================================================= # 알림톡/SMS 발송 API # ============================================================================= @app.route('/api/message/send', methods=['POST']) def api_message_send(): """ 알림톡/SMS 발송 API Body: { "recipients": [{"cuscode": "", "name": "", "phone": ""}], "message": "메시지 내용", "type": "alimtalk" | "sms" } """ try: data = request.get_json() if not data: return jsonify({'success': False, 'error': '데이터가 없습니다'}), 400 recipients = data.get('recipients', []) message = data.get('message', '') msg_type = data.get('type', 'sms') # alimtalk 또는 sms if not recipients: return jsonify({'success': False, 'error': '수신자가 없습니다'}), 400 if not message: return jsonify({'success': False, 'error': '메시지 내용이 없습니다'}), 400 # 전화번호 정규화 valid_recipients = [] for r in recipients: phone = (r.get('phone') or '').replace('-', '').replace(' ', '') if phone and len(phone) >= 10: valid_recipients.append({ 'cuscode': r.get('cuscode', ''), 'name': r.get('name', ''), 'phone': phone }) if not valid_recipients: return jsonify({'success': False, 'error': '유효한 전화번호가 없습니다'}), 400 # TODO: 실제 발송 로직 (NHN Cloud 알림톡/SMS) # 현재는 테스트 모드로 성공 응답 results = [] for r in valid_recipients: results.append({ 'phone': r['phone'], 'name': r['name'], 'status': 'success', # 실제 발송 시 결과로 변경 'message': f'{msg_type} 발송 예약됨' }) return jsonify({ 'success': True, 'type': msg_type, 'sent_count': len(results), 'results': results, 'message': f'{len(results)}명에게 {msg_type} 발송 완료 (테스트 모드)' }) except Exception as e: logging.error(f"메시지 발송 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ============================================================================= # QR 라벨 인쇄 API # ============================================================================= @app.route('/api/qr-print', methods=['POST']) def api_qr_print(): """QR 라벨 인쇄 API""" try: from qr_printer import print_drug_qr_label data = request.get_json() if not data: return jsonify({'success': False, 'error': '데이터가 없습니다'}), 400 drug_name = data.get('drug_name', '') barcode = data.get('barcode', '') drug_code = data.get('drug_code', '') sale_price = data.get('sale_price', 0) if not drug_name: return jsonify({'success': False, 'error': '상품명이 필요합니다'}), 400 # 바코드가 없으면 drug_code 사용 qr_data = barcode if barcode else drug_code result = print_drug_qr_label( drug_name=drug_name, barcode=qr_data, sale_price=sale_price, drug_code=drug_code, pharmacy_name='청춘약국' ) return jsonify(result) except ImportError as e: logging.error(f"QR 프린터 모듈 로드 실패: {e}") return jsonify({'success': False, 'error': 'QR 프린터 모듈이 없습니다'}), 500 except Exception as e: logging.error(f"QR 인쇄 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/qr-preview', methods=['POST']) def api_qr_preview(): """QR 라벨 미리보기 API (base64 이미지 반환)""" try: from qr_printer import preview_qr_label data = request.get_json() if not data: return jsonify({'success': False, 'error': '데이터가 없습니다'}), 400 drug_name = data.get('drug_name', '') barcode = data.get('barcode', '') drug_code = data.get('drug_code', '') sale_price = data.get('sale_price', 0) if not drug_name: return jsonify({'success': False, 'error': '상품명이 필요합니다'}), 400 qr_data = barcode if barcode else drug_code image_data = preview_qr_label( drug_name=drug_name, barcode=qr_data, sale_price=sale_price, drug_code=drug_code, pharmacy_name='청춘약국' ) return jsonify({'success': True, 'image': image_data}) except Exception as e: logging.error(f"QR 미리보기 오류: {e}") return jsonify({'success': False, 'error': str(e)}), 500 def check_port_available(port: int) -> bool: """포트가 사용 가능한지 확인""" import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) result = sock.connect_ex(('127.0.0.1', port)) sock.close() return result != 0 # 0이면 이미 사용 중, 0이 아니면 사용 가능 def kill_process_on_port(port: int) -> bool: """특정 포트를 사용하는 프로세스 종료 (Windows)""" import subprocess try: # netstat으로 PID 찾기 result = subprocess.run( f'netstat -ano | findstr ":{port}"', shell=True, capture_output=True, text=True ) for line in result.stdout.strip().split('\n'): if 'LISTENING' in line: parts = line.split() pid = parts[-1] if pid.isdigit(): subprocess.run(f'taskkill /F /PID {pid}', shell=True) logging.info(f"포트 {port} 사용 중인 프로세스 종료: PID {pid}") return True return False except Exception as e: logging.error(f"프로세스 종료 실패: {e}") return False if __name__ == '__main__': import os PORT = 7001 # Flask reloader 자식 프로세스가 아닌 경우에만 체크 if os.environ.get('WERKZEUG_RUN_MAIN') != 'true': if not check_port_available(PORT): logging.warning(f"포트 {PORT}이 이미 사용 중입니다. 기존 프로세스를 종료합니다...") if kill_process_on_port(PORT): import time time.sleep(2) # 프로세스 종료 대기 else: logging.error(f"포트 {PORT} 해제 실패. 수동으로 확인하세요.") # 개발 모드로 실행 app.run(host='0.0.0.0', port=PORT, debug=True)