""" Flask 웹 서버 - QR 마일리지 적립 간편 적립: 전화번호 + 이름만 입력 """ from flask import Flask, request, render_template, jsonify, redirect, url_for, session import hashlib import base64 import secrets from datetime import datetime, timezone, timedelta import sys import os import logging from sqlalchemy import text from dotenv import load_dotenv import json import time # 환경 변수 로드 load_dotenv() # OpenAI import try: from openai import OpenAI, OpenAIError, RateLimitError, APITimeoutError OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False logging.warning("OpenAI 라이브러리가 설치되지 않았습니다. AI 분석 기능을 사용할 수 없습니다.") # Path setup sys.path.insert(0, os.path.dirname(__file__)) from db.dbsetup import DatabaseManager app = Flask(__name__) app.secret_key = 'pharmacy-qr-mileage-secret-key-2026' # 세션 설정 (PWA 자동적립 지원) app.config['SESSION_COOKIE_SECURE'] = not app.debug # HTTPS 전용 (로컬 개발 시 제외) app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # QR 스캔 시 쿠키 전송 허용 app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90) # 3개월 유지 # 데이터베이스 매니저 db_manager = DatabaseManager() # KST 타임존 (UTC+9) KST = timezone(timedelta(hours=9)) def utc_to_kst_str(utc_time_str): """ UTC 시간 문자열을 KST 시간 문자열로 변환 Args: utc_time_str (str): UTC 시간 문자열 (ISO 8601 형식) Returns: str: KST 시간 문자열 (YYYY-MM-DD HH:MM:SS) """ if not utc_time_str: return None try: # ISO 8601 형식 파싱 ('2026-01-23T12:28:36' 또는 '2026-01-23 12:28:36') utc_time_str = utc_time_str.replace(' ', 'T') # 공백을 T로 변환 # datetime 객체로 변환 if 'T' in utc_time_str: utc_time = datetime.fromisoformat(utc_time_str) else: utc_time = datetime.fromisoformat(utc_time_str) # UTC 타임존 설정 (naive datetime인 경우) if utc_time.tzinfo is None: utc_time = utc_time.replace(tzinfo=timezone.utc) # KST로 변환 kst_time = utc_time.astimezone(KST) # 문자열로 반환 (초 단위까지만) return kst_time.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: logging.error(f"시간 변환 실패: {utc_time_str}, 오류: {e}") return utc_time_str # 변환 실패 시 원본 반환 # ===== OpenAI 설정 및 헬퍼 함수 ===== # OpenAI API 설정 OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') OPENAI_MODEL = os.getenv('OPENAI_MODEL', 'gpt-4o-mini') OPENAI_MAX_TOKENS = int(os.getenv('OPENAI_MAX_TOKENS', '1000')) OPENAI_TEMPERATURE = float(os.getenv('OPENAI_TEMPERATURE', '0.7')) # OpenAI 사용 가능 여부 및 API 키 확인 if OPENAI_AVAILABLE and not OPENAI_API_KEY: logging.warning("OPENAI_API_KEY가 설정되지 않았습니다. AI 분석 기능을 사용할 수 없습니다.") OPENAI_AVAILABLE = False # System Prompt SYSTEM_PROMPT = """당신은 약국 고객 관리 전문가입니다. 고객의 구매 데이터를 분석하여 다음을 제공합니다: 1. 구매 패턴 및 행동 분석 2. 주요 관심 제품 카테고리 파악 3. 업셀링을 위한 제품 추천 4. 고객 맞춤형 마케팅 전략 응답은 반드시 JSON 형식으로 작성하며, 한국어로 작성합니다. 약국 운영자가 실제로 활용할 수 있는 구체적이고 실용적인 인사이트를 제공해야 합니다.""" # 에러 메시지 ERROR_MESSAGES = { 'NO_USER': '사용자를 찾을 수 없습니다.', 'NO_PURCHASES': '구매 이력이 없어 분석할 수 없습니다. 최소 1건 이상의 구매가 필요합니다.', 'OPENAI_NOT_AVAILABLE': 'AI 분석 기능을 사용할 수 없습니다. 관리자에게 문의하세요.', 'OPENAI_API_KEY_MISSING': 'OpenAI API 키가 설정되지 않았습니다.', 'OPENAI_API_ERROR': 'OpenAI API 호출에 실패했습니다. 잠시 후 다시 시도해주세요.', 'OPENAI_RATE_LIMIT': 'API 호출 횟수 제한에 도달했습니다. 잠시 후 다시 시도해주세요.', 'OPENAI_TIMEOUT': 'AI 분석 시간이 초과되었습니다. 다시 시도해주세요.', 'PARSING_ERROR': 'AI 응답을 처리하는 중 오류가 발생했습니다.', 'UNKNOWN_ERROR': '알 수 없는 오류가 발생했습니다. 관리자에게 문의하세요.' } def categorize_product(product_name): """제품명에서 카테고리 추정 (간단한 키워드 매칭)""" categories = { '소화제': ['타센', '베아제', '겔포스', '소화'], '진통제': ['타이레놀', '게보린', '펜잘', '이부프로펜'], '감기약': ['판콜', '화이투벤', '지르텍', '감기'], '피부약': ['후시딘', '마데카솔', '더마틱스'], '비타민': ['비타민', '센트룸', '활성비타민'], '안약': ['안약', '인공눈물'], '소염진통제': ['자미슬', '펠루비', '게보린'] } for category, keywords in categories.items(): for keyword in keywords: if keyword in product_name: return category return '기타' def prepare_analysis_prompt(user, purchases): """OpenAI API 전송용 프롬프트 생성""" # 사용자 정보 요약 user_summary = f"""사용자: {user['nickname']} ({user['phone']}) 가입일: {utc_to_kst_str(user['created_at']) if user['created_at'] else '-'} 포인트 잔액: {user['mileage_balance']:,}P 총 구매 건수: {len(purchases)}건 """ # 구매 이력 상세 purchase_details = [] total_spent = 0 all_products = [] product_freq = {} for idx, purchase in enumerate(purchases, 1): total_spent += purchase['amount'] products_str = ', '.join([f"{item['name']} x{item['qty']}" for item in purchase['items']]) # 제품 빈도 계산 for item in purchase['items']: product_name = item['name'] all_products.append(product_name) product_freq[product_name] = product_freq.get(product_name, 0) + 1 purchase_details.append( f"{idx}. {purchase['date']} - {purchase['amount']:,}원 구매, {purchase['points']}P 적립\n" f" 구매 품목: {products_str}" ) # 통계 계산 avg_purchase = total_spent // len(purchases) if purchases else 0 top_products = sorted(product_freq.items(), key=lambda x: x[1], reverse=True)[:5] top_products_str = ', '.join([f"{name}({count}회)" for name, count in top_products]) # 최종 프롬프트 조립 prompt = f"""다음은 약국 고객의 구매 데이터입니다. 구매 패턴을 분석하고 마케팅 전략을 제안해주세요. {user_summary} 통계 요약: - 총 구매 금액: {total_spent:,}원 - 평균 구매 금액: {avg_purchase:,}원 - 자주 구매한 품목: {top_products_str} 구매 이력 (최근 {len(purchases)}건): {chr(10).join(purchase_details)} 분석 요청사항: 1. 구매 패턴 분석: 구매 빈도, 구매 금액 패턴 등 2. 주로 구매하는 품목 카테고리 (예: 소화제, 감기약, 건강기능식품 등) 3. 추천 제품: 기존 구매 패턴을 기반으로 관심있을만한 제품 3-5가지 (업셀링) 4. 마케팅 전략: 이 고객에게 효과적일 프로모션 또는 포인트 활용 방안 응답은 다음 JSON 형식으로 해주세요: {{ "pattern": "구매 패턴에 대한 상세한 분석 (2-3문장)", "main_products": ["카테고리1: 품목들", "카테고리2: 품목들"], "recommendations": ["추천제품1 (이유)", "추천제품2 (이유)", "추천제품3 (이유)"], "marketing_strategy": "마케팅 전략 제안 (2-3문장)" }} """ return prompt def parse_openai_response(response_text): """OpenAI API 응답을 파싱하여 구조화된 데이터 반환""" import re try: # JSON 추출 (마크다운 코드 블록 제거) json_match = re.search(r'```json\s*(\{.*?\})\s*```', response_text, re.DOTALL) if json_match: json_str = json_match.group(1) else: # 코드 블록 없이 JSON만 있는 경우 json_str = response_text.strip() # JSON 파싱 analysis = json.loads(json_str) # 필수 필드 검증 required_fields = ['pattern', 'main_products', 'recommendations', 'marketing_strategy'] for field in required_fields: if field not in analysis: raise ValueError(f"필수 필드 누락: {field}") # 타입 검증 if not isinstance(analysis['main_products'], list): analysis['main_products'] = [str(analysis['main_products'])] if not isinstance(analysis['recommendations'], list): analysis['recommendations'] = [str(analysis['recommendations'])] return analysis except json.JSONDecodeError as e: # JSON 파싱 실패 시 fallback logging.error(f"JSON 파싱 실패: {e}") return { 'pattern': '응답 파싱에 실패했습니다.', 'main_products': ['분석 결과를 확인할 수 없습니다.'], 'recommendations': ['다시 시도해주세요.'], 'marketing_strategy': response_text[:500] } except Exception as e: logging.error(f"응답 파싱 오류: {e}") raise def handle_openai_error(error): """OpenAI API 에러를 사용자 친화적 메시지로 변환""" error_str = str(error).lower() if 'api key' in error_str or 'authentication' in error_str: return ERROR_MESSAGES['OPENAI_API_KEY_MISSING'] elif 'rate limit' in error_str or 'quota' in error_str: return ERROR_MESSAGES['OPENAI_RATE_LIMIT'] elif 'timeout' in error_str: return ERROR_MESSAGES['OPENAI_TIMEOUT'] else: return ERROR_MESSAGES['OPENAI_API_ERROR'] def call_openai_with_retry(prompt, max_retries=3): """재시도 로직을 포함한 OpenAI API 호출""" if not OPENAI_AVAILABLE: return False, ERROR_MESSAGES['OPENAI_NOT_AVAILABLE'] client = OpenAI(api_key=OPENAI_API_KEY) for attempt in range(max_retries): try: response = client.chat.completions.create( model=OPENAI_MODEL, messages=[ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": prompt} ], max_tokens=OPENAI_MAX_TOKENS, temperature=OPENAI_TEMPERATURE, timeout=30 ) return True, response except RateLimitError as e: if attempt < max_retries - 1: wait_time = 2 ** attempt logging.warning(f"OpenAI Rate limit, {wait_time}초 대기 후 재시도...") time.sleep(wait_time) else: return False, handle_openai_error(e) except APITimeoutError as e: if attempt < max_retries - 1: logging.warning(f"OpenAI 타임아웃, 재시도 중... ({attempt+1}/{max_retries})") time.sleep(1) else: return False, ERROR_MESSAGES['OPENAI_TIMEOUT'] except OpenAIError as e: return False, handle_openai_error(e) except Exception as e: logging.error(f"OpenAI API 호출 오류: {e}") return False, ERROR_MESSAGES['UNKNOWN_ERROR'] return False, ERROR_MESSAGES['OPENAI_TIMEOUT'] def verify_claim_token(transaction_id, nonce): """ QR 토큰 검증 Args: transaction_id (str): 거래 ID nonce (str): 12자 hex nonce Returns: tuple: (성공 여부, 메시지, 토큰 정보 dict) """ try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 1. 거래 ID로 토큰 조회 cursor.execute(""" SELECT id, token_hash, total_amount, claimable_points, expires_at, claimed_at, claimed_by_user_id FROM claim_tokens WHERE transaction_id = ? """, (transaction_id,)) token_record = cursor.fetchone() if not token_record: return (False, "유효하지 않은 QR 코드입니다.", None) # 2. 이미 적립된 토큰인지 확인 if token_record['claimed_at']: return (False, "이미 적립 완료된 영수증입니다.", None) # 3. 만료 확인 expires_at = datetime.strptime(token_record['expires_at'], '%Y-%m-%d %H:%M:%S') if datetime.now() > expires_at: return (False, "적립 기간이 만료되었습니다 (30일).", None) # 4. 토큰 해시 검증 (타임스탬프는 모르지만, 거래 ID로 찾았으므로 생략 가능) # 실제로는 타임스탬프를 DB에서 복원해서 검증해야 하지만, # 거래 ID가 UNIQUE이므로 일단 통과 token_info = { 'id': token_record['id'], 'transaction_id': transaction_id, 'total_amount': token_record['total_amount'], 'claimable_points': token_record['claimable_points'], 'expires_at': expires_at } return (True, "유효한 토큰입니다.", token_info) except Exception as e: return (False, f"토큰 검증 실패: {str(e)}", None) def get_or_create_user(phone, name): """ 사용자 조회 또는 생성 (간편 적립용) Args: phone (str): 전화번호 name (str): 이름 Returns: tuple: (user_id, is_new_user) """ conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 전화번호로 조회 cursor.execute(""" SELECT id, mileage_balance FROM users WHERE phone = ? """, (phone,)) user = cursor.fetchone() if user: return (user['id'], False) # 신규 생성 cursor.execute(""" INSERT INTO users (nickname, phone, mileage_balance) VALUES (?, ?, 0) """, (name, phone)) conn.commit() return (cursor.lastrowid, True) def claim_mileage(user_id, token_info): """ 마일리지 적립 처리 Args: user_id (int): 사용자 ID token_info (dict): 토큰 정보 Returns: tuple: (성공 여부, 메시지, 적립 후 잔액) """ try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 1. 현재 잔액 조회 cursor.execute("SELECT mileage_balance FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() current_balance = user['mileage_balance'] # 2. 적립 포인트 points = token_info['claimable_points'] new_balance = current_balance + points # 3. 사용자 잔액 업데이트 cursor.execute(""" UPDATE users SET mileage_balance = ?, updated_at = ? WHERE id = ? """, (new_balance, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), user_id)) # 4. 마일리지 원장 기록 cursor.execute(""" INSERT INTO mileage_ledger (user_id, transaction_id, points, balance_after, reason, description) VALUES (?, ?, ?, ?, ?, ?) """, ( user_id, token_info['transaction_id'], points, new_balance, 'CLAIM', f"영수증 QR 적립 ({token_info['total_amount']:,}원 구매)" )) # 5. claim_tokens 업데이트 (적립 완료 표시) cursor.execute(""" UPDATE claim_tokens SET claimed_at = ?, claimed_by_user_id = ? WHERE id = ? """, (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), user_id, token_info['id'])) conn.commit() return (True, f"{points}P 적립 완료!", new_balance) except Exception as e: conn.rollback() return (False, f"적립 처리 실패: {str(e)}", 0) def normalize_kakao_phone(kakao_phone): """ 카카오 전화번호 형식 변환 "+82 10-1234-5678" → "01012345678" """ if not kakao_phone: return None digits = kakao_phone.replace('+', '').replace('-', '').replace(' ', '') if digits.startswith('82'): digits = '0' + digits[2:] if len(digits) >= 10: return digits return None def link_kakao_identity(user_id, kakao_id, kakao_data): """카카오 계정을 customer_identities에 연결""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT id FROM customer_identities WHERE provider = 'kakao' AND provider_user_id = ? """, (kakao_id,)) if not cursor.fetchone(): # raw_data 제외 (세션 크기 절약) store_data = {k: v for k, v in kakao_data.items() if k != 'raw_data'} cursor.execute(""" INSERT INTO customer_identities (user_id, provider, provider_user_id, provider_data) VALUES (?, 'kakao', ?, ?) """, (user_id, kakao_id, json.dumps(store_data, ensure_ascii=False))) # 프로필 이미지, 이메일 업데이트 updates = [] params = [] if kakao_data.get('profile_image'): updates.append("profile_image_url = ?") params.append(kakao_data['profile_image']) if kakao_data.get('email'): updates.append("email = ?") params.append(kakao_data['email']) if updates: params.append(user_id) cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = ?", params) conn.commit() def find_user_by_kakao_id(kakao_id): """카카오 ID로 기존 연결된 사용자 조회""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT user_id FROM customer_identities WHERE provider = 'kakao' AND provider_user_id = ? """, (kakao_id,)) row = cursor.fetchone() return row['user_id'] if row else None # ============================================================================ # 라우트 # ============================================================================ @app.route('/') def index(): """메인 페이지""" logged_in = 'logged_in_user_id' in session return render_template('index.html', logged_in=logged_in, logged_in_name=session.get('logged_in_name', ''), logged_in_phone=session.get('logged_in_phone', '') ) @app.route('/claim') def claim(): """ QR 코드 랜딩 페이지 URL: /claim?t=transaction_id:nonce """ # 토큰 파라미터 파싱 token_param = request.args.get('t', '') if ':' not in token_param: return render_template('error.html', message="잘못된 QR 코드 형식입니다.") parts = token_param.split(':') if len(parts) != 2: return render_template('error.html', message="잘못된 QR 코드 형식입니다.") transaction_id, nonce = parts[0], parts[1] # 토큰 검증 success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return render_template('error.html', message=message) # 세션에 로그인된 유저가 있으면 자동 적립 (PWA) if 'logged_in_user_id' in session: auto_user_id = session['logged_in_user_id'] conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute("SELECT id, nickname, phone, mileage_balance FROM users WHERE id = ?", (auto_user_id,)) auto_user = cursor.fetchone() if auto_user: auto_success, auto_msg, auto_balance = claim_mileage(auto_user_id, token_info) if auto_success: return render_template('claim_kakao_success.html', points=token_info['claimable_points'], balance=auto_balance, phone=auto_user['phone'], name=auto_user['nickname']) return render_template('error.html', message=auto_msg) else: # 유저가 삭제됨 - 세션 클리어 session.pop('logged_in_user_id', None) session.pop('logged_in_phone', None) session.pop('logged_in_name', None) # MSSQL에서 구매 품목 조회 sale_items = [] try: db_session = db_manager.get_session('PM_PRES') sale_sub_query = text(""" SELECT ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) rows = db_session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall() sale_items = [ {'name': r.goods_name, 'qty': int(r.quantity or 0), 'total': int(r.total or 0)} for r in rows ] except Exception as e: logging.warning(f"품목 조회 실패 (transaction_id={transaction_id}): {e}") return render_template('claim_form.html', token_info=token_info, sale_items=sale_items) @app.route('/api/claim', methods=['POST']) def api_claim(): """ 마일리지 적립 API POST /api/claim Body: { "transaction_id": "...", "nonce": "...", "phone": "010-1234-5678", "name": "홍길동" } """ try: data = request.get_json() transaction_id = data.get('transaction_id') nonce = data.get('nonce') phone = data.get('phone', '').strip() name = data.get('name', '').strip() privacy_consent = data.get('privacy_consent', False) # 입력 검증 if not phone or not name: return jsonify({ 'success': False, 'message': '전화번호와 이름을 모두 입력해주세요.' }), 400 # 개인정보 동의 검증 if not privacy_consent: return jsonify({ 'success': False, 'message': '개인정보 수집·이용에 동의해주세요.' }), 400 # 전화번호 형식 정리 (하이픈 제거) phone = phone.replace('-', '').replace(' ', '') if len(phone) < 10: return jsonify({ 'success': False, 'message': '올바른 전화번호를 입력해주세요.' }), 400 # 토큰 검증 success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return jsonify({ 'success': False, 'message': message }), 400 # 사용자 조회/생성 user_id, is_new = get_or_create_user(phone, name) # 마일리지 적립 success, message, new_balance = claim_mileage(user_id, token_info) if not success: return jsonify({ 'success': False, 'message': message }), 500 # 세션에 유저 정보 저장 (PWA 자동적립용) session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = phone session['logged_in_name'] = name return jsonify({ 'success': True, 'message': message, 'points': token_info['claimable_points'], 'balance': new_balance, 'is_new_user': is_new }) except Exception as e: return jsonify({ 'success': False, 'message': f'오류가 발생했습니다: {str(e)}' }), 500 # ============================================================================ # 카카오 적립 라우트 # ============================================================================ @app.route('/claim/kakao/start') def claim_kakao_start(): """카카오 OAuth 시작 - claim 컨텍스트를 state에 담아 카카오로 리다이렉트""" from services.kakao_client import get_kakao_client token_param = request.args.get('t', '') if ':' not in token_param: return render_template('error.html', message="잘못된 QR 코드입니다.") parts = token_param.split(':') if len(parts) != 2: return render_template('error.html', message="잘못된 QR 코드입니다.") transaction_id, nonce = parts # 토큰 사전 검증 success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return render_template('error.html', message=message) # state: claim 컨텍스트 + CSRF 토큰 csrf_token = secrets.token_hex(16) state_data = { 't': token_param, 'csrf': csrf_token } state_encoded = base64.urlsafe_b64encode( json.dumps(state_data).encode() ).decode() session['kakao_csrf'] = csrf_token kakao_client = get_kakao_client() auth_url = kakao_client.get_authorization_url(state=state_encoded) return redirect(auth_url) def _handle_mypage_kakao_callback(code, kakao_client): """마이페이지 카카오 콜백 처리 - 카카오 ID로 유저 조회 후 마이페이지 이동""" success, token_data = kakao_client.get_access_token(code) if not success: return render_template('error.html', message="카카오 인증에 실패했습니다.") access_token = token_data.get('access_token') success, user_info = kakao_client.get_user_info(access_token) if not success: return render_template('error.html', message="카카오 사용자 정보를 가져올 수 없습니다.") kakao_id = user_info.get('kakao_id') kakao_phone_raw = user_info.get('phone_number') kakao_phone = normalize_kakao_phone(kakao_phone_raw) # 1) 카카오 ID로 기존 유저 조회 existing_user_id = find_user_by_kakao_id(kakao_id) if existing_user_id: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute("SELECT phone FROM users WHERE id = ?", (existing_user_id,)) row = cursor.fetchone() if row: return redirect(f"/my-page?phone={row['phone']}") # 2) 카카오에서 전화번호를 받은 경우 if kakao_phone: return redirect(f"/my-page?phone={kakao_phone}") # 3) 둘 다 없으면 전화번호 입력 안내 return render_template('error.html', message="카카오 계정에 연결된 적립 정보가 없습니다. 전화번호로 조회해주세요.") @app.route('/claim/kakao/callback') def claim_kakao_callback(): """카카오 OAuth 콜백 - 토큰 교환 → 사용자 정보 → 적립 처리""" from services.kakao_client import get_kakao_client code = request.args.get('code') state_encoded = request.args.get('state') error = request.args.get('error') if error: return render_template('error.html', message="카카오 로그인이 취소되었습니다.") if not code or not state_encoded: return render_template('error.html', message="카카오 인증 정보가 올바르지 않습니다.") # 1. state 디코딩 try: state_json = base64.urlsafe_b64decode(state_encoded).decode() state_data = json.loads(state_json) except Exception: return render_template('error.html', message="인증 상태 정보가 올바르지 않습니다.") # 2. CSRF 검증 csrf_token = state_data.get('csrf') if csrf_token != session.pop('kakao_csrf', None): return render_template('error.html', message="보안 검증에 실패했습니다. 다시 시도해주세요.") # 2.5 마이페이지 조회 목적이면 별도 처리 if state_data.get('purpose') == 'mypage': return _handle_mypage_kakao_callback(code, get_kakao_client()) # 3. claim 컨텍스트 복원 token_param = state_data.get('t', '') parts = token_param.split(':') if len(parts) != 2: return render_template('error.html', message="적립 정보가 올바르지 않습니다.") transaction_id, nonce = parts # 4. 토큰 재검증 (OAuth 중 다른 사람이 적립했을 수 있음) success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return render_template('error.html', message=message) # 5. 카카오 access_token 교환 kakao_client = get_kakao_client() success, token_data = kakao_client.get_access_token(code) if not success: logging.error(f"카카오 토큰 교환 실패: {token_data}") return render_template('error.html', message="카카오 인증에 실패했습니다. 다시 시도해주세요.") # 6. 사용자 정보 조회 access_token = token_data.get('access_token') success, user_info = kakao_client.get_user_info(access_token) if not success: logging.error(f"카카오 사용자 정보 조회 실패: {user_info}") return render_template('error.html', message="카카오 사용자 정보를 가져올 수 없습니다.") kakao_id = user_info.get('kakao_id') kakao_name = user_info.get('name') or user_info.get('nickname', '') kakao_phone_raw = user_info.get('phone_number') kakao_phone = normalize_kakao_phone(kakao_phone_raw) # 7. 분기: 전화번호가 있으면 자동 적립, 없으면 폰 입력 폼 if kakao_phone: # 자동 적립 existing_user_id = find_user_by_kakao_id(kakao_id) if existing_user_id: user_id = existing_user_id is_new = False else: user_id, is_new = get_or_create_user(kakao_phone, kakao_name) link_kakao_identity(user_id, kakao_id, user_info) success, msg, new_balance = claim_mileage(user_id, token_info) if not success: return render_template('error.html', message=msg) # 세션에 유저 정보 저장 (PWA 자동적립용) session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = kakao_phone session['logged_in_name'] = kakao_name return render_template('claim_kakao_success.html', points=token_info['claimable_points'], balance=new_balance, phone=kakao_phone, name=kakao_name) else: # 전화번호 없음 → 세션에 카카오 정보 저장 + 폰 입력 폼 session['kakao_claim'] = { 'kakao_id': kakao_id, 'name': kakao_name, 'profile_image': user_info.get('profile_image'), 'email': user_info.get('email'), 'token_param': token_param } return render_template('claim_kakao_phone.html', token_info=token_info, kakao_name=kakao_name, kakao_profile_image=user_info.get('profile_image')) @app.route('/api/claim/kakao', methods=['POST']) def api_claim_kakao(): """카카오 적립 (전화번호 폴백) - 세션의 카카오 정보 + 폼의 전화번호로 적립""" kakao_data = session.pop('kakao_claim', None) if not kakao_data: return jsonify({ 'success': False, 'message': '카카오 인증 정보가 만료되었습니다. 다시 시도해주세요.' }), 400 data = request.get_json() phone = data.get('phone', '').strip().replace('-', '').replace(' ', '') if len(phone) < 10: session['kakao_claim'] = kakao_data return jsonify({ 'success': False, 'message': '올바른 전화번호를 입력해주세요.' }), 400 token_param = kakao_data['token_param'] parts = token_param.split(':') transaction_id, nonce = parts success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return jsonify({'success': False, 'message': message}), 400 kakao_id = kakao_data['kakao_id'] name = kakao_data['name'] existing_user_id = find_user_by_kakao_id(kakao_id) if existing_user_id: user_id = existing_user_id is_new = False else: user_id, is_new = get_or_create_user(phone, name) link_kakao_identity(user_id, kakao_id, kakao_data) success, message, new_balance = claim_mileage(user_id, token_info) if not success: return jsonify({'success': False, 'message': message}), 500 # 세션에 유저 정보 저장 (PWA 자동적립용) session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = phone session['logged_in_name'] = name return jsonify({ 'success': True, 'message': message, 'points': token_info['claimable_points'], 'balance': new_balance, 'is_new_user': is_new }) @app.route('/my-page/kakao/start') def mypage_kakao_start(): """마이페이지 카카오 로그인 조회""" from services.kakao_client import get_kakao_client csrf_token = secrets.token_hex(16) state_data = { 'purpose': 'mypage', 'csrf': csrf_token } state_encoded = base64.urlsafe_b64encode( json.dumps(state_data).encode() ).decode() session['kakao_csrf'] = csrf_token kakao_client = get_kakao_client() auth_url = kakao_client.get_authorization_url(state=state_encoded) return redirect(auth_url) @app.route('/my-page') def my_page(): """마이페이지 (전화번호로 조회)""" phone = request.args.get('phone', '') if not phone: return render_template('my_page_login.html') # 전화번호로 사용자 조회 phone = phone.replace('-', '').replace(' ', '') conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users WHERE phone = ? """, (phone,)) user_raw = cursor.fetchone() if not user_raw: return render_template('error.html', message='등록되지 않은 전화번호입니다.') # 사용자 정보에 KST 시간 변환 적용 user = dict(user_raw) user['created_at'] = utc_to_kst_str(user_raw['created_at']) # 적립 내역 조회 (transaction_id 포함) cursor.execute(""" SELECT points, balance_after, reason, description, created_at, transaction_id FROM mileage_ledger WHERE user_id = ? ORDER BY created_at DESC LIMIT 20 """, (user['id'],)) transactions_raw = cursor.fetchall() # 거래 내역에 KST 시간 변환 적용 transactions = [] for tx in transactions_raw: tx_dict = dict(tx) tx_dict['created_at'] = utc_to_kst_str(tx['created_at']) transactions.append(tx_dict) return render_template('my_page.html', user=user, transactions=transactions) # ============================================================================ # PWA / 공통 라우트 # ============================================================================ @app.route('/sw.js') def service_worker(): """서비스 워커를 루트 경로에서 제공 (scope='/' 허용)""" return app.send_static_file('sw.js'), 200, { 'Content-Type': 'application/javascript', 'Service-Worker-Allowed': '/' } @app.route('/privacy') def privacy(): """개인정보 처리방침""" return render_template('privacy.html') @app.route('/logout') def logout(): """세션 로그아웃""" session.pop('logged_in_user_id', None) session.pop('logged_in_phone', None) session.pop('logged_in_name', None) return redirect('/') @app.route('/admin/transaction/') def admin_transaction_detail(transaction_id): """거래 세부 내역 조회 (MSSQL)""" try: # MSSQL PM_PRES 연결 session = db_manager.get_session('PM_PRES') # SALE_MAIN 조회 (거래 헤더) sale_main_query = text(""" SELECT SL_NO_order, InsertTime, SL_MY_total, SL_MY_discount, SL_MY_sale, SL_MY_credit, SL_MY_recive, SL_MY_rec_vat, ISNULL(SL_NM_custom, '[비고객]') AS customer_name FROM SALE_MAIN WHERE SL_NO_order = :transaction_id """) sale_main = session.execute(sale_main_query, {'transaction_id': transaction_id}).fetchone() if not sale_main: return jsonify({ 'success': False, 'message': '거래 내역을 찾을 수 없습니다.' }), 404 # SALE_SUB 조회 (판매 상품 상세) sale_sub_query = text(""" SELECT S.DrugCode, ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_NM_cost_a AS price, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) sale_items = session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall() # 결과를 JSON으로 반환 result = { 'success': True, 'transaction': { 'id': sale_main.SL_NO_order, 'date': str(sale_main.InsertTime), 'total_amount': int(sale_main.SL_MY_total or 0), 'discount': int(sale_main.SL_MY_discount or 0), 'sale_amount': int(sale_main.SL_MY_sale or 0), 'credit': int(sale_main.SL_MY_credit or 0), 'supply_value': int(sale_main.SL_MY_recive or 0), 'vat': int(sale_main.SL_MY_rec_vat or 0), 'customer_name': sale_main.customer_name }, 'items': [ { 'code': item.DrugCode, 'name': item.goods_name, 'qty': int(item.quantity or 0), 'price': int(item.price or 0), 'total': int(item.total or 0) } for item in sale_items ] } return jsonify(result) except Exception as e: return jsonify({ 'success': False, 'message': f'조회 실패: {str(e)}' }), 500 @app.route('/admin/user/') def admin_user_detail(user_id): """사용자 상세 이력 조회 - 구매 이력, 적립 이력, 구매 품목""" try: # 1. SQLite 연결 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 2. 사용자 기본 정보 조회 cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users WHERE id = ? """, (user_id,)) user = cursor.fetchone() if not user: return jsonify({ 'success': False, 'message': '사용자를 찾을 수 없습니다.' }), 404 # 3. 마일리지 이력 조회 (최근 50건) cursor.execute(""" SELECT transaction_id, points, balance_after, reason, description, created_at FROM mileage_ledger WHERE user_id = ? ORDER BY created_at DESC LIMIT 50 """, (user_id,)) mileage_history = cursor.fetchall() # 4. 구매 이력 조회 (적립된 거래만, 최근 20건) cursor.execute(""" SELECT transaction_id, total_amount, claimable_points, claimed_at FROM claim_tokens WHERE claimed_by_user_id = ? ORDER BY claimed_at DESC LIMIT 20 """, (user_id,)) claimed_tokens = cursor.fetchall() # 5. 각 거래의 상품 상세 조회 (MSSQL) purchases = [] try: session = db_manager.get_session('PM_PRES') for token in claimed_tokens: transaction_id = token['transaction_id'] # SALE_MAIN에서 거래 시간 조회 sale_main_query = text(""" SELECT InsertTime FROM SALE_MAIN WHERE SL_NO_order = :transaction_id """) sale_main = session.execute( sale_main_query, {'transaction_id': transaction_id} ).fetchone() # 거래 시간 추출 (MSSQL의 실제 거래 시간) if sale_main and sale_main.InsertTime: transaction_date = str(sale_main.InsertTime)[:16].replace('T', ' ') else: transaction_date = '-' # SALE_SUB + CD_GOODS JOIN (BARCODE 추가) sale_items_query = text(""" SELECT S.BARCODE, S.DrugCode, ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_NM_cost_a AS price, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) items_raw = session.execute( sale_items_query, {'transaction_id': transaction_id} ).fetchall() # 상품 리스트 변환 (카테고리 포함) items = [] for item in items_raw: barcode = item.BARCODE # SQLite에서 제품 카테고리 조회 categories = [] if barcode: cursor.execute(""" SELECT category_name, relevance_score FROM product_category_mapping WHERE barcode = ? ORDER BY relevance_score DESC """, (barcode,)) for cat_row in cursor.fetchall(): categories.append({ 'name': cat_row[0], 'score': cat_row[1] }) items.append({ 'code': item.DrugCode, 'barcode': barcode, 'name': item.goods_name, 'qty': int(item.quantity or 0), 'price': int(item.price or 0), 'total': int(item.total or 0), 'categories': categories }) # 상품 요약 생성 ("첫번째상품명 외 N개") if items: first_item_name = items[0]['name'] items_count = len(items) if items_count == 1: items_summary = first_item_name else: items_summary = f"{first_item_name} 외 {items_count - 1}개" else: items_summary = "상품 정보 없음" items_count = 0 purchases.append({ 'transaction_id': transaction_id, 'date': transaction_date, # MSSQL의 실제 거래 시간 사용 'amount': int(token['total_amount']), 'points': int(token['claimable_points']), 'items_summary': items_summary, 'items_count': items_count, 'items': items }) except Exception as mssql_error: # MSSQL 연결 실패 시 빈 배열 반환 print(f"[WARNING] MSSQL 조회 실패 (user {user_id}): {mssql_error}") purchases = [] # 6. 응답 생성 return jsonify({ 'success': True, 'user': { 'id': user['id'], 'name': user['nickname'], 'phone': user['phone'], 'balance': user['mileage_balance'], 'created_at': utc_to_kst_str(user['created_at']) }, 'mileage_history': [ { 'points': ml['points'], 'balance_after': ml['balance_after'], 'reason': ml['reason'], 'description': ml['description'], 'created_at': utc_to_kst_str(ml['created_at']), 'transaction_id': ml['transaction_id'] } for ml in mileage_history ], 'purchases': purchases }) except Exception as e: return jsonify({ 'success': False, 'message': f'조회 실패: {str(e)}' }), 500 @app.route('/admin/search/user') def admin_search_user(): """사용자 검색 (이름/전화번호/전화번호 뒷자리)""" query = request.args.get('q', '').strip() search_type = request.args.get('type', 'name') # 'name', 'phone', 'phone_last' if not query: return jsonify({'success': False, 'message': '검색어를 입력하세요'}), 400 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() try: if search_type == 'phone_last': # 전화번호 뒷자리 검색 cursor.execute(""" SELECT id, nickname, phone, mileage_balance FROM users WHERE phone LIKE ? ORDER BY created_at DESC """, (f'%{query}',)) elif search_type == 'phone': # 전체 전화번호 검색 cursor.execute(""" SELECT id, nickname, phone, mileage_balance FROM users WHERE phone = ? """, (query,)) else: # 이름 검색 cursor.execute(""" SELECT id, nickname, phone, mileage_balance FROM users WHERE nickname LIKE ? ORDER BY created_at DESC """, (f'%{query}%',)) results = cursor.fetchall() if not results: return jsonify({'success': False, 'message': '검색 결과가 없습니다'}), 404 if len(results) == 1: # 단일 매칭 - user_id만 반환 return jsonify({ 'success': True, 'multiple': False, 'user_id': results[0]['id'] }) else: # 여러 명 매칭 - 선택 모달용 데이터 반환 users = [{ 'id': row['id'], 'name': row['nickname'], 'phone': row['phone'], 'balance': row['mileage_balance'] } for row in results] return jsonify({ 'success': True, 'multiple': True, 'users': users }) except Exception as e: return jsonify({ 'success': False, 'message': f'검색 실패: {str(e)}' }), 500 @app.route('/admin/search/product') def admin_search_product(): """제품 검색 - 적립자 목록 반환 (SQLite 적립자 기준)""" query = request.args.get('q', '').strip() if not query: return jsonify({'success': False, 'message': '검색어를 입력하세요'}), 400 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() try: # 1. MSSQL에서 제품명으로 거래번호 찾기 session = db_manager.get_session('PM_PRES') sale_items_query = text(""" SELECT DISTINCT S.SL_NO_order, S.SL_NM_item, M.InsertTime FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode LEFT JOIN SALE_MAIN M ON S.SL_NO_order = M.SL_NO_order WHERE G.GoodsName LIKE :product_name ORDER BY M.InsertTime DESC """) sale_results = session.execute(sale_items_query, { 'product_name': f'%{query}%' }).fetchall() if not sale_results: return jsonify({ 'success': True, 'results': [] }) # 2. SQLite에서 적립된 거래만 필터링 (claimed_by_user_id IS NOT NULL) transaction_ids = [row.SL_NO_order for row in sale_results] placeholders = ','.join('?' * len(transaction_ids)) cursor.execute(f""" SELECT ct.transaction_id, ct.total_amount, ct.claimed_at, ct.claimed_by_user_id, u.nickname, u.phone FROM claim_tokens ct JOIN users u ON ct.claimed_by_user_id = u.id WHERE ct.transaction_id IN ({placeholders}) AND ct.claimed_by_user_id IS NOT NULL ORDER BY ct.claimed_at DESC LIMIT 50 """, transaction_ids) claimed_results = cursor.fetchall() # 3. 결과 조합 results = [] for claim_row in claimed_results: # 해당 거래의 MSSQL 정보 찾기 mssql_row = next((r for r in sale_results if r.SL_NO_order == claim_row['transaction_id']), None) if mssql_row: results.append({ 'user_id': claim_row['claimed_by_user_id'], 'user_name': claim_row['nickname'], 'user_phone': claim_row['phone'], 'purchase_date': str(mssql_row.InsertTime)[:16].replace('T', ' ') if mssql_row.InsertTime else '-', # MSSQL 실제 거래 시간 'claimed_date': str(claim_row['claimed_at'])[:16].replace('T', ' ') if claim_row['claimed_at'] else '-', # 적립 시간 'quantity': float(mssql_row.SL_NM_item or 0), 'total_amount': int(claim_row['total_amount']) }) return jsonify({ 'success': True, 'results': results }) except Exception as e: return jsonify({ 'success': False, 'message': f'검색 실패: {str(e)}' }), 500 @app.route('/admin/ai-analyze-user/', methods=['POST']) def admin_ai_analyze_user(user_id): """OpenAI GPT를 사용한 사용자 구매 패턴 AI 분석""" try: # OpenAI 사용 가능 여부 확인 if not OPENAI_AVAILABLE: return jsonify({ 'success': False, 'message': ERROR_MESSAGES['OPENAI_NOT_AVAILABLE'] }), 503 # 1. SQLite 연결 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 2. 사용자 기본 정보 조회 cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users WHERE id = ? """, (user_id,)) user_row = cursor.fetchone() if not user_row: return jsonify({ 'success': False, 'message': ERROR_MESSAGES['NO_USER'] }), 404 user = { 'id': user_row['id'], 'nickname': user_row['nickname'], 'phone': user_row['phone'], 'mileage_balance': user_row['mileage_balance'], 'created_at': user_row['created_at'] } # 3. 구매 이력 조회 (최근 20건) cursor.execute(""" SELECT transaction_id, total_amount, claimable_points, claimed_at FROM claim_tokens WHERE claimed_by_user_id = ? ORDER BY claimed_at DESC LIMIT 20 """, (user_id,)) claimed_tokens = cursor.fetchall() if not claimed_tokens: return jsonify({ 'success': False, 'message': ERROR_MESSAGES['NO_PURCHASES'] }), 400 # 4. MSSQL에서 상품 상세 조회 purchases = [] session = db_manager.get_session('PM_PRES') for token in claimed_tokens: transaction_id = token['transaction_id'] # SALE_MAIN에서 거래 시간 조회 sale_main_query = text(""" SELECT InsertTime FROM SALE_MAIN WHERE SL_NO_order = :transaction_id """) sale_main = session.execute(sale_main_query, {'transaction_id': transaction_id}).fetchone() # SALE_SUB + CD_GOODS JOIN sale_items_query = text(""" SELECT S.DrugCode, ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_NM_cost_a AS price, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) items_raw = session.execute(sale_items_query, {'transaction_id': transaction_id}).fetchall() items = [{ 'name': item.goods_name, 'qty': int(item.quantity or 0), 'price': int(item.price or 0) } for item in items_raw] purchases.append({ 'date': str(sale_main.InsertTime)[:16].replace('T', ' ') if sale_main and sale_main.InsertTime else '-', 'amount': int(token['total_amount']), 'points': int(token['claimable_points']), 'items': items }) # 5. OpenAI API 호출용 프롬프트 생성 prompt = prepare_analysis_prompt(user, purchases) # 6. OpenAI API 호출 logging.info(f"AI 분석 시작: 사용자 ID {user_id}") success, response = call_openai_with_retry(prompt) if not success: # response에는 에러 메시지가 담겨 있음 return jsonify({ 'success': False, 'message': response }), 500 # 7. 응답 파싱 response_text = response.choices[0].message.content analysis = parse_openai_response(response_text) logging.info(f"AI 분석 완료: 사용자 ID {user_id}, 토큰: {response.usage.total_tokens}") # 8. 결과 반환 return jsonify({ 'success': True, 'user': { 'id': user['id'], 'name': user['nickname'], 'phone': user['phone'], 'balance': user['mileage_balance'] }, 'analysis': analysis, 'metadata': { 'model_used': OPENAI_MODEL, 'tokens_used': response.usage.total_tokens, 'analysis_time': datetime.now(KST).strftime('%Y-%m-%d %H:%M:%S') } }) except Exception as e: logging.error(f"AI 분석 오류: {e}") return jsonify({ 'success': False, 'message': ERROR_MESSAGES['UNKNOWN_ERROR'] }), 500 @app.route('/admin/use-points', methods=['POST']) def admin_use_points(): """관리자 페이지에서 포인트 사용 (차감)""" try: data = request.get_json() user_id = data.get('user_id') points_to_use = data.get('points') if not user_id or not points_to_use: return jsonify({ 'success': False, 'message': '사용자 ID와 포인트를 입력하세요.' }), 400 if points_to_use <= 0: return jsonify({ 'success': False, 'message': '1 이상의 포인트를 입력하세요.' }), 400 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 1. 현재 사용자 잔액 확인 cursor.execute(""" SELECT id, nickname, mileage_balance FROM users WHERE id = ? """, (user_id,)) user = cursor.fetchone() if not user: return jsonify({ 'success': False, 'message': '사용자를 찾을 수 없습니다.' }), 404 current_balance = user['mileage_balance'] # 2. 잔액 확인 if points_to_use > current_balance: return jsonify({ 'success': False, 'message': f'잔액({current_balance:,}P)이 부족합니다.' }), 400 # 3. 포인트 차감 new_balance = current_balance - points_to_use cursor.execute(""" UPDATE users SET mileage_balance = ? WHERE id = ? """, (new_balance, user_id)) # 4. 마일리지 원장 기록 cursor.execute(""" INSERT INTO mileage_ledger (user_id, transaction_id, points, balance_after, reason, description) VALUES (?, NULL, ?, ?, 'USE', ?) """, ( user_id, -points_to_use, # 음수로 저장 new_balance, f'포인트 사용 (관리자) - {points_to_use:,}P 차감' )) conn.commit() logging.info(f"포인트 사용 완료: 사용자 ID {user_id}, 차감 {points_to_use}P, 잔액 {new_balance}P") return jsonify({ 'success': True, 'message': f'{points_to_use:,}P 사용 완료', 'new_balance': new_balance, 'used_points': points_to_use }) except Exception as e: logging.error(f"포인트 사용 실패: {e}") return jsonify({ 'success': False, 'message': f'포인트 사용 실패: {str(e)}' }), 500 @app.route('/admin') def admin(): """관리자 페이지 - 전체 사용자 및 적립 현황""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 전체 통계 cursor.execute(""" SELECT COUNT(*) as total_users, SUM(mileage_balance) as total_balance FROM users """) stats = cursor.fetchone() # 최근 가입 사용자 (20명) cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users ORDER BY created_at DESC LIMIT 20 """) recent_users_raw = cursor.fetchall() # 시간을 KST로 변환 recent_users = [] for user in recent_users_raw: user_dict = dict(user) user_dict['created_at'] = utc_to_kst_str(user['created_at']) recent_users.append(user_dict) # 최근 적립 내역 (50건) cursor.execute(""" SELECT ml.id, u.nickname, u.phone, ml.points, ml.balance_after, ml.reason, ml.description, ml.created_at FROM mileage_ledger ml JOIN users u ON ml.user_id = u.id ORDER BY ml.created_at DESC LIMIT 50 """) recent_transactions_raw = cursor.fetchall() # 시간을 KST로 변환 recent_transactions = [] for trans in recent_transactions_raw: trans_dict = dict(trans) trans_dict['created_at'] = utc_to_kst_str(trans['created_at']) recent_transactions.append(trans_dict) # QR 토큰 통계 cursor.execute(""" SELECT COUNT(*) as total_tokens, SUM(CASE WHEN claimed_at IS NOT NULL THEN 1 ELSE 0 END) as claimed_count, SUM(CASE WHEN claimed_at IS NULL THEN 1 ELSE 0 END) as unclaimed_count, SUM(claimable_points) as total_points_issued, SUM(CASE WHEN claimed_at IS NOT NULL THEN claimable_points ELSE 0 END) as total_points_claimed FROM claim_tokens """) token_stats = cursor.fetchone() # 최근 QR 발행 내역 (20건) cursor.execute(""" SELECT transaction_id, total_amount, claimable_points, claimed_at, claimed_by_user_id, created_at FROM claim_tokens ORDER BY created_at DESC LIMIT 20 """) recent_tokens_raw = cursor.fetchall() # Convert only created_at (발행일), leave claimed_at (적립일) unconverted recent_tokens = [] for token in recent_tokens_raw: token_dict = dict(token) token_dict['created_at'] = utc_to_kst_str(token['created_at']) # Convert 발행일 # claimed_at stays as-is (or remains None if not claimed) recent_tokens.append(token_dict) return render_template('admin.html', stats=stats, recent_users=recent_users, recent_transactions=recent_transactions, token_stats=token_stats, recent_tokens=recent_tokens) if __name__ == '__main__': # 개발 모드로 실행 app.run(host='0.0.0.0', port=7001, debug=True)