""" Flask 웹 서버 - QR 마일리지 적립 간편 적립: 전화번호 + 이름만 입력 """ import sys import os # Windows 콘솔 UTF-8 강제 (한글 깨짐 방지) if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') os.environ.setdefault('PYTHONIOENCODING', 'utf-8') from flask import Flask, request, render_template, jsonify, redirect, url_for, session import hashlib import base64 import secrets from datetime import datetime, timezone, timedelta import logging from sqlalchemy import text from dotenv import load_dotenv import json import time # 환경 변수 로드 load_dotenv() # OpenAI import try: from openai import OpenAI, OpenAIError, RateLimitError, APITimeoutError OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False logging.warning("OpenAI 라이브러리가 설치되지 않았습니다. AI 분석 기능을 사용할 수 없습니다.") # Path setup sys.path.insert(0, os.path.dirname(__file__)) from db.dbsetup import DatabaseManager app = Flask(__name__) app.secret_key = 'pharmacy-qr-mileage-secret-key-2026' # 세션 설정 (PWA 자동적립 지원) app.config['SESSION_COOKIE_SECURE'] = not app.debug # HTTPS 전용 (로컬 개발 시 제외) app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # QR 스캔 시 쿠키 전송 허용 app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90) # 3개월 유지 # 데이터베이스 매니저 db_manager = DatabaseManager() # KST 타임존 (UTC+9) KST = timezone(timedelta(hours=9)) # 키오스크 현재 세션 (메모리 변수, 서버 재시작 시 초기화) kiosk_current_session = None def utc_to_kst_str(utc_time_str): """ UTC 시간 문자열을 KST 시간 문자열로 변환 Args: utc_time_str (str): UTC 시간 문자열 (ISO 8601 형식) Returns: str: KST 시간 문자열 (YYYY-MM-DD HH:MM:SS) """ if not utc_time_str: return None try: # ISO 8601 형식 파싱 ('2026-01-23T12:28:36' 또는 '2026-01-23 12:28:36') utc_time_str = utc_time_str.replace(' ', 'T') # 공백을 T로 변환 # datetime 객체로 변환 if 'T' in utc_time_str: utc_time = datetime.fromisoformat(utc_time_str) else: utc_time = datetime.fromisoformat(utc_time_str) # UTC 타임존 설정 (naive datetime인 경우) if utc_time.tzinfo is None: utc_time = utc_time.replace(tzinfo=timezone.utc) # KST로 변환 kst_time = utc_time.astimezone(KST) # 문자열로 반환 (초 단위까지만) return kst_time.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: logging.error(f"시간 변환 실패: {utc_time_str}, 오류: {e}") return utc_time_str # 변환 실패 시 원본 반환 # ===== OpenAI 설정 및 헬퍼 함수 ===== # OpenAI API 설정 OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') OPENAI_MODEL = os.getenv('OPENAI_MODEL', 'gpt-4o-mini') OPENAI_MAX_TOKENS = int(os.getenv('OPENAI_MAX_TOKENS', '1000')) OPENAI_TEMPERATURE = float(os.getenv('OPENAI_TEMPERATURE', '0.7')) # OpenAI 사용 가능 여부 및 API 키 확인 if OPENAI_AVAILABLE and not OPENAI_API_KEY: logging.warning("OPENAI_API_KEY가 설정되지 않았습니다. AI 분석 기능을 사용할 수 없습니다.") OPENAI_AVAILABLE = False # System Prompt SYSTEM_PROMPT = """당신은 약국 고객 관리 전문가입니다. 고객의 구매 데이터를 분석하여 다음을 제공합니다: 1. 구매 패턴 및 행동 분석 2. 주요 관심 제품 카테고리 파악 3. 업셀링을 위한 제품 추천 4. 고객 맞춤형 마케팅 전략 응답은 반드시 JSON 형식으로 작성하며, 한국어로 작성합니다. 약국 운영자가 실제로 활용할 수 있는 구체적이고 실용적인 인사이트를 제공해야 합니다.""" # 에러 메시지 ERROR_MESSAGES = { 'NO_USER': '사용자를 찾을 수 없습니다.', 'NO_PURCHASES': '구매 이력이 없어 분석할 수 없습니다. 최소 1건 이상의 구매가 필요합니다.', 'OPENAI_NOT_AVAILABLE': 'AI 분석 기능을 사용할 수 없습니다. 관리자에게 문의하세요.', 'OPENAI_API_KEY_MISSING': 'OpenAI API 키가 설정되지 않았습니다.', 'OPENAI_API_ERROR': 'OpenAI API 호출에 실패했습니다. 잠시 후 다시 시도해주세요.', 'OPENAI_RATE_LIMIT': 'API 호출 횟수 제한에 도달했습니다. 잠시 후 다시 시도해주세요.', 'OPENAI_TIMEOUT': 'AI 분석 시간이 초과되었습니다. 다시 시도해주세요.', 'PARSING_ERROR': 'AI 응답을 처리하는 중 오류가 발생했습니다.', 'UNKNOWN_ERROR': '알 수 없는 오류가 발생했습니다. 관리자에게 문의하세요.' } def categorize_product(product_name): """제품명에서 카테고리 추정 (간단한 키워드 매칭)""" categories = { '소화제': ['타센', '베아제', '겔포스', '소화'], '진통제': ['타이레놀', '게보린', '펜잘', '이부프로펜'], '감기약': ['판콜', '화이투벤', '지르텍', '감기'], '피부약': ['후시딘', '마데카솔', '더마틱스'], '비타민': ['비타민', '센트룸', '활성비타민'], '안약': ['안약', '인공눈물'], '소염진통제': ['자미슬', '펠루비', '게보린'] } for category, keywords in categories.items(): for keyword in keywords: if keyword in product_name: return category return '기타' def prepare_analysis_prompt(user, purchases): """OpenAI API 전송용 프롬프트 생성""" # 사용자 정보 요약 user_summary = f"""사용자: {user['nickname']} ({user['phone']}) 가입일: {utc_to_kst_str(user['created_at']) if user['created_at'] else '-'} 포인트 잔액: {user['mileage_balance']:,}P 총 구매 건수: {len(purchases)}건 """ # 구매 이력 상세 purchase_details = [] total_spent = 0 all_products = [] product_freq = {} for idx, purchase in enumerate(purchases, 1): total_spent += purchase['amount'] products_str = ', '.join([f"{item['name']} x{item['qty']}" for item in purchase['items']]) # 제품 빈도 계산 for item in purchase['items']: product_name = item['name'] all_products.append(product_name) product_freq[product_name] = product_freq.get(product_name, 0) + 1 purchase_details.append( f"{idx}. {purchase['date']} - {purchase['amount']:,}원 구매, {purchase['points']}P 적립\n" f" 구매 품목: {products_str}" ) # 통계 계산 avg_purchase = total_spent // len(purchases) if purchases else 0 top_products = sorted(product_freq.items(), key=lambda x: x[1], reverse=True)[:5] top_products_str = ', '.join([f"{name}({count}회)" for name, count in top_products]) # 최종 프롬프트 조립 prompt = f"""다음은 약국 고객의 구매 데이터입니다. 구매 패턴을 분석하고 마케팅 전략을 제안해주세요. {user_summary} 통계 요약: - 총 구매 금액: {total_spent:,}원 - 평균 구매 금액: {avg_purchase:,}원 - 자주 구매한 품목: {top_products_str} 구매 이력 (최근 {len(purchases)}건): {chr(10).join(purchase_details)} 분석 요청사항: 1. 구매 패턴 분석: 구매 빈도, 구매 금액 패턴 등 2. 주로 구매하는 품목 카테고리 (예: 소화제, 감기약, 건강기능식품 등) 3. 추천 제품: 기존 구매 패턴을 기반으로 관심있을만한 제품 3-5가지 (업셀링) 4. 마케팅 전략: 이 고객에게 효과적일 프로모션 또는 포인트 활용 방안 응답은 다음 JSON 형식으로 해주세요: {{ "pattern": "구매 패턴에 대한 상세한 분석 (2-3문장)", "main_products": ["카테고리1: 품목들", "카테고리2: 품목들"], "recommendations": ["추천제품1 (이유)", "추천제품2 (이유)", "추천제품3 (이유)"], "marketing_strategy": "마케팅 전략 제안 (2-3문장)" }} """ return prompt def parse_openai_response(response_text): """OpenAI API 응답을 파싱하여 구조화된 데이터 반환""" import re try: # JSON 추출 (마크다운 코드 블록 제거) json_match = re.search(r'```json\s*(\{.*?\})\s*```', response_text, re.DOTALL) if json_match: json_str = json_match.group(1) else: # 코드 블록 없이 JSON만 있는 경우 json_str = response_text.strip() # JSON 파싱 analysis = json.loads(json_str) # 필수 필드 검증 required_fields = ['pattern', 'main_products', 'recommendations', 'marketing_strategy'] for field in required_fields: if field not in analysis: raise ValueError(f"필수 필드 누락: {field}") # 타입 검증 if not isinstance(analysis['main_products'], list): analysis['main_products'] = [str(analysis['main_products'])] if not isinstance(analysis['recommendations'], list): analysis['recommendations'] = [str(analysis['recommendations'])] return analysis except json.JSONDecodeError as e: # JSON 파싱 실패 시 fallback logging.error(f"JSON 파싱 실패: {e}") return { 'pattern': '응답 파싱에 실패했습니다.', 'main_products': ['분석 결과를 확인할 수 없습니다.'], 'recommendations': ['다시 시도해주세요.'], 'marketing_strategy': response_text[:500] } except Exception as e: logging.error(f"응답 파싱 오류: {e}") raise def handle_openai_error(error): """OpenAI API 에러를 사용자 친화적 메시지로 변환""" error_str = str(error).lower() if 'api key' in error_str or 'authentication' in error_str: return ERROR_MESSAGES['OPENAI_API_KEY_MISSING'] elif 'rate limit' in error_str or 'quota' in error_str: return ERROR_MESSAGES['OPENAI_RATE_LIMIT'] elif 'timeout' in error_str: return ERROR_MESSAGES['OPENAI_TIMEOUT'] else: return ERROR_MESSAGES['OPENAI_API_ERROR'] def call_openai_with_retry(prompt, max_retries=3): """재시도 로직을 포함한 OpenAI API 호출""" if not OPENAI_AVAILABLE: return False, ERROR_MESSAGES['OPENAI_NOT_AVAILABLE'] client = OpenAI(api_key=OPENAI_API_KEY) for attempt in range(max_retries): try: response = client.chat.completions.create( model=OPENAI_MODEL, messages=[ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": prompt} ], max_tokens=OPENAI_MAX_TOKENS, temperature=OPENAI_TEMPERATURE, timeout=30 ) return True, response except RateLimitError as e: if attempt < max_retries - 1: wait_time = 2 ** attempt logging.warning(f"OpenAI Rate limit, {wait_time}초 대기 후 재시도...") time.sleep(wait_time) else: return False, handle_openai_error(e) except APITimeoutError as e: if attempt < max_retries - 1: logging.warning(f"OpenAI 타임아웃, 재시도 중... ({attempt+1}/{max_retries})") time.sleep(1) else: return False, ERROR_MESSAGES['OPENAI_TIMEOUT'] except OpenAIError as e: return False, handle_openai_error(e) except Exception as e: logging.error(f"OpenAI API 호출 오류: {e}") return False, ERROR_MESSAGES['UNKNOWN_ERROR'] return False, ERROR_MESSAGES['OPENAI_TIMEOUT'] def verify_claim_token(transaction_id, nonce): """ QR 토큰 검증 Args: transaction_id (str): 거래 ID nonce (str): 12자 hex nonce Returns: tuple: (성공 여부, 메시지, 토큰 정보 dict) """ try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 1. 거래 ID로 토큰 조회 cursor.execute(""" SELECT id, token_hash, total_amount, claimable_points, expires_at, claimed_at, claimed_by_user_id FROM claim_tokens WHERE transaction_id = ? """, (transaction_id,)) token_record = cursor.fetchone() if not token_record: return (False, "유효하지 않은 QR 코드입니다.", None) # 2. 이미 적립된 토큰인지 확인 if token_record['claimed_at']: return (False, "이미 적립 완료된 영수증입니다.", None) # 3. 만료 확인 expires_at = datetime.strptime(token_record['expires_at'], '%Y-%m-%d %H:%M:%S') if datetime.now() > expires_at: return (False, "적립 기간이 만료되었습니다 (30일).", None) # 4. 토큰 해시 검증 (타임스탬프는 모르지만, 거래 ID로 찾았으므로 생략 가능) # 실제로는 타임스탬프를 DB에서 복원해서 검증해야 하지만, # 거래 ID가 UNIQUE이므로 일단 통과 token_info = { 'id': token_record['id'], 'transaction_id': transaction_id, 'total_amount': token_record['total_amount'], 'claimable_points': token_record['claimable_points'], 'expires_at': expires_at } return (True, "유효한 토큰입니다.", token_info) except Exception as e: return (False, f"토큰 검증 실패: {str(e)}", None) def get_or_create_user(phone, name, birthday=None): """ 사용자 조회 또는 생성 (간편 적립용) Args: phone (str): 전화번호 name (str): 이름 birthday (str, optional): 생년월일 (YYYY-MM-DD) Returns: tuple: (user_id, is_new_user) """ conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 전화번호로 조회 cursor.execute(""" SELECT id, mileage_balance FROM users WHERE phone = ? """, (phone,)) user = cursor.fetchone() if user: # 기존 유저: birthday가 제공되면 업데이트 if birthday: cursor.execute("UPDATE users SET birthday = ? WHERE id = ?", (birthday, user['id'])) conn.commit() return (user['id'], False) # 신규 생성 cursor.execute(""" INSERT INTO users (nickname, phone, birthday, mileage_balance) VALUES (?, ?, ?, 0) """, (name, phone, birthday)) conn.commit() return (cursor.lastrowid, True) def claim_mileage(user_id, token_info): """ 마일리지 적립 처리 Args: user_id (int): 사용자 ID token_info (dict): 토큰 정보 Returns: tuple: (성공 여부, 메시지, 적립 후 잔액) """ try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 1. 현재 잔액 조회 cursor.execute("SELECT mileage_balance FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() current_balance = user['mileage_balance'] # 2. 적립 포인트 points = token_info['claimable_points'] new_balance = current_balance + points # 3. 사용자 잔액 업데이트 cursor.execute(""" UPDATE users SET mileage_balance = ?, updated_at = ? WHERE id = ? """, (new_balance, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), user_id)) # 4. 마일리지 원장 기록 cursor.execute(""" INSERT INTO mileage_ledger (user_id, transaction_id, points, balance_after, reason, description) VALUES (?, ?, ?, ?, ?, ?) """, ( user_id, token_info['transaction_id'], points, new_balance, 'CLAIM', f"영수증 QR 적립 ({token_info['total_amount']:,}원 구매)" )) # 5. claim_tokens 업데이트 (적립 완료 표시) cursor.execute(""" UPDATE claim_tokens SET claimed_at = ?, claimed_by_user_id = ? WHERE id = ? """, (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), user_id, token_info['id'])) conn.commit() return (True, f"{points}P 적립 완료!", new_balance) except Exception as e: conn.rollback() return (False, f"적립 처리 실패: {str(e)}", 0) def normalize_kakao_phone(kakao_phone): """ 카카오 전화번호 형식 변환 "+82 10-1234-5678" → "01012345678" """ if not kakao_phone: return None digits = kakao_phone.replace('+', '').replace('-', '').replace(' ', '') if digits.startswith('82'): digits = '0' + digits[2:] if len(digits) >= 10: return digits return None def link_kakao_identity(user_id, kakao_id, kakao_data): """카카오 계정을 customer_identities에 연결""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT id FROM customer_identities WHERE provider = 'kakao' AND provider_user_id = ? """, (kakao_id,)) if not cursor.fetchone(): # raw_data 제외 (세션 크기 절약) store_data = {k: v for k, v in kakao_data.items() if k != 'raw_data'} cursor.execute(""" INSERT INTO customer_identities (user_id, provider, provider_user_id, provider_data) VALUES (?, 'kakao', ?, ?) """, (user_id, kakao_id, json.dumps(store_data, ensure_ascii=False))) # 프로필 이미지, 이메일 업데이트 updates = [] params = [] if kakao_data.get('profile_image'): updates.append("profile_image_url = ?") params.append(kakao_data['profile_image']) if kakao_data.get('email'): updates.append("email = ?") params.append(kakao_data['email']) if updates: params.append(user_id) cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = ?", params) conn.commit() def find_user_by_kakao_id(kakao_id): """카카오 ID로 기존 연결된 사용자 조회""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT user_id FROM customer_identities WHERE provider = 'kakao' AND provider_user_id = ? """, (kakao_id,)) row = cursor.fetchone() return row['user_id'] if row else None # ============================================================================ # 라우트 # ============================================================================ @app.route('/') def index(): """메인 페이지""" logged_in = 'logged_in_user_id' in session return render_template('index.html', logged_in=logged_in, logged_in_name=session.get('logged_in_name', ''), logged_in_phone=session.get('logged_in_phone', '') ) @app.route('/signup') def signup(): """회원가입 페이지""" if 'logged_in_user_id' in session: return redirect(f"/my-page?phone={session.get('logged_in_phone', '')}") return render_template('signup.html') @app.route('/api/signup', methods=['POST']) def api_signup(): """회원가입 API""" try: data = request.get_json() name = data.get('name', '').strip() phone = data.get('phone', '').strip().replace('-', '').replace(' ', '') birthday = data.get('birthday', '').strip() or None # 선택 항목 if not name or not phone: return jsonify({'success': False, 'message': '이름과 전화번호를 모두 입력해주세요.'}), 400 if len(phone) < 10: return jsonify({'success': False, 'message': '올바른 전화번호를 입력해주세요.'}), 400 user_id, is_new = get_or_create_user(phone, name, birthday=birthday) # 세션에 유저 정보 저장 session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = phone session['logged_in_name'] = name return jsonify({ 'success': True, 'message': '가입이 완료되었습니다.' if is_new else '이미 가입된 회원입니다. 로그인되었습니다.', 'is_new': is_new }) except Exception as e: return jsonify({'success': False, 'message': f'오류가 발생했습니다: {str(e)}'}), 500 @app.route('/claim') def claim(): """ QR 코드 랜딩 페이지 URL: /claim?t=transaction_id:nonce """ # 토큰 파라미터 파싱 token_param = request.args.get('t', '') if ':' not in token_param: return render_template('error.html', message="잘못된 QR 코드 형식입니다.") parts = token_param.split(':') if len(parts) != 2: return render_template('error.html', message="잘못된 QR 코드 형식입니다.") transaction_id, nonce = parts[0], parts[1] # 토큰 검증 success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return render_template('error.html', message=message) # 세션에 로그인된 유저가 있으면 자동 적립 (PWA) if 'logged_in_user_id' in session: auto_user_id = session['logged_in_user_id'] conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute("SELECT id, nickname, phone, mileage_balance FROM users WHERE id = ?", (auto_user_id,)) auto_user = cursor.fetchone() if auto_user: auto_success, auto_msg, auto_balance = claim_mileage(auto_user_id, token_info) if auto_success: return render_template('claim_kakao_success.html', points=token_info['claimable_points'], balance=auto_balance, phone=auto_user['phone'], name=auto_user['nickname']) return render_template('error.html', message=auto_msg) else: # 유저가 삭제됨 - 세션 클리어 session.pop('logged_in_user_id', None) session.pop('logged_in_phone', None) session.pop('logged_in_name', None) # MSSQL에서 구매 품목 조회 sale_items = [] try: db_session = db_manager.get_session('PM_PRES') sale_sub_query = text(""" SELECT ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) rows = db_session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall() sale_items = [ {'name': r.goods_name, 'qty': int(r.quantity or 0), 'total': int(r.total or 0)} for r in rows ] except Exception as e: logging.warning(f"품목 조회 실패 (transaction_id={transaction_id}): {e}") # JS SDK용 카카오 state 생성 (CSRF 보호) csrf_token = secrets.token_hex(16) state_data = {'t': token_param, 'csrf': csrf_token} kakao_state = base64.urlsafe_b64encode( json.dumps(state_data).encode() ).decode() session['kakao_csrf'] = csrf_token return render_template('claim_form.html', token_info=token_info, sale_items=sale_items, kakao_state=kakao_state) @app.route('/api/claim', methods=['POST']) def api_claim(): """ 마일리지 적립 API POST /api/claim Body: { "transaction_id": "...", "nonce": "...", "phone": "010-1234-5678", "name": "홍길동" } """ try: data = request.get_json() transaction_id = data.get('transaction_id') nonce = data.get('nonce') phone = data.get('phone', '').strip() name = data.get('name', '').strip() privacy_consent = data.get('privacy_consent', False) # 입력 검증 if not phone or not name: return jsonify({ 'success': False, 'message': '전화번호와 이름을 모두 입력해주세요.' }), 400 # 개인정보 동의 검증 if not privacy_consent: return jsonify({ 'success': False, 'message': '개인정보 수집·이용에 동의해주세요.' }), 400 # 전화번호 형식 정리 (하이픈 제거) phone = phone.replace('-', '').replace(' ', '') if len(phone) < 10: return jsonify({ 'success': False, 'message': '올바른 전화번호를 입력해주세요.' }), 400 # 토큰 검증 success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return jsonify({ 'success': False, 'message': message }), 400 # 사용자 조회/생성 user_id, is_new = get_or_create_user(phone, name) # 마일리지 적립 success, message, new_balance = claim_mileage(user_id, token_info) if not success: return jsonify({ 'success': False, 'message': message }), 500 # 세션에 유저 정보 저장 (PWA 자동적립용) session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = phone session['logged_in_name'] = name return jsonify({ 'success': True, 'message': message, 'points': token_info['claimable_points'], 'balance': new_balance, 'is_new_user': is_new }) except Exception as e: return jsonify({ 'success': False, 'message': f'오류가 발생했습니다: {str(e)}' }), 500 # ============================================================================ # 카카오 적립 라우트 # ============================================================================ @app.route('/claim/kakao/start') def claim_kakao_start(): """카카오 OAuth 시작 - claim 컨텍스트를 state에 담아 카카오로 리다이렉트""" from services.kakao_client import get_kakao_client token_param = request.args.get('t', '') if ':' not in token_param: return render_template('error.html', message="잘못된 QR 코드입니다.") parts = token_param.split(':') if len(parts) != 2: return render_template('error.html', message="잘못된 QR 코드입니다.") transaction_id, nonce = parts # 토큰 사전 검증 success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return render_template('error.html', message=message) # state: claim 컨텍스트 + CSRF 토큰 csrf_token = secrets.token_hex(16) state_data = { 't': token_param, 'csrf': csrf_token } state_encoded = base64.urlsafe_b64encode( json.dumps(state_data).encode() ).decode() session['kakao_csrf'] = csrf_token kakao_client = get_kakao_client() auth_url = kakao_client.get_authorization_url(state=state_encoded) return redirect(auth_url) def _handle_mypage_kakao_callback(code, kakao_client): """ 마이페이지 카카오 콜백 처리 - 카카오 연동(머지) + 마이페이지 이동 케이스별 동작: A) 카카오 ID가 이미 연결된 유저 → 그 유저의 마이페이지로 이동 B) 미연결 + 카카오 전화번호로 기존 유저 발견 → 카카오 연동 후 이동 C) 미연결 + 기존 유저 없음 → 신규 생성 + 카카오 연동 D) 전화번호 없음 → 에러 안내 """ success, token_data = kakao_client.get_access_token(code) if not success: return render_template('error.html', message="카카오 인증에 실패했습니다.") access_token = token_data.get('access_token') success, user_info = kakao_client.get_user_info(access_token) if not success: return render_template('error.html', message="카카오 사용자 정보를 가져올 수 없습니다.") kakao_id = user_info.get('kakao_id') kakao_phone_raw = user_info.get('phone_number') kakao_phone = normalize_kakao_phone(kakao_phone_raw) kakao_name = user_info.get('name') or user_info.get('nickname', '고객') conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # Case A: 카카오 ID로 이미 연결된 유저 있음 existing_user_id = find_user_by_kakao_id(kakao_id) if existing_user_id: # "고객" 이름이면 카카오 실명으로 업데이트 if kakao_name and kakao_name != '고객': cursor.execute( "UPDATE users SET nickname = ? WHERE id = ? AND nickname = '고객'", (kakao_name, existing_user_id)) conn.commit() cursor.execute("SELECT phone FROM users WHERE id = ?", (existing_user_id,)) row = cursor.fetchone() if row and row['phone']: return redirect(f"/my-page?phone={row['phone']}") # 전화번호 없으면 연동 불가 if not kakao_phone: return render_template('error.html', message="카카오 계정에 전화번호 정보가 없습니다. 카카오 설정에서 전화번호를 등록해주세요.") # Case B/C: 전화번호로 기존 유저 조회 cursor.execute("SELECT id, nickname FROM users WHERE phone = ?", (kakao_phone,)) phone_user = cursor.fetchone() if phone_user: # Case B: 기존 전화번호 유저 → 카카오 연동 (머지) user_id = phone_user['id'] link_kakao_identity(user_id, kakao_id, user_info) # "고객" 이름이면 카카오 실명으로 업데이트 if phone_user['nickname'] == '고객' and kakao_name and kakao_name != '고객': cursor.execute("UPDATE users SET nickname = ? WHERE id = ?", (kakao_name, user_id)) conn.commit() logging.info(f"마이페이지 카카오 머지: user_id={user_id}, kakao_id={kakao_id}") else: # Case C: 신규 유저 생성 + 카카오 연동 user_id, _ = get_or_create_user(kakao_phone, kakao_name) link_kakao_identity(user_id, kakao_id, user_info) logging.info(f"마이페이지 카카오 신규: user_id={user_id}, kakao_id={kakao_id}") return redirect(f"/my-page?phone={kakao_phone}") @app.route('/claim/kakao/callback') def claim_kakao_callback(): """카카오 OAuth 콜백 - 토큰 교환 → 사용자 정보 → 적립 처리""" from services.kakao_client import get_kakao_client code = request.args.get('code') state_encoded = request.args.get('state') error = request.args.get('error') if error: return render_template('error.html', message="카카오 로그인이 취소되었습니다.") if not code or not state_encoded: return render_template('error.html', message="카카오 인증 정보가 올바르지 않습니다.") # 1. state 디코딩 try: state_json = base64.urlsafe_b64decode(state_encoded).decode() state_data = json.loads(state_json) except Exception: return render_template('error.html', message="인증 상태 정보가 올바르지 않습니다.") # 2. CSRF 검증 csrf_token = state_data.get('csrf') if csrf_token != session.pop('kakao_csrf', None): return render_template('error.html', message="보안 검증에 실패했습니다. 다시 시도해주세요.") # 2.5 마이페이지 조회 목적이면 별도 처리 if state_data.get('purpose') == 'mypage': return _handle_mypage_kakao_callback(code, get_kakao_client()) # 3. claim 컨텍스트 복원 token_param = state_data.get('t', '') parts = token_param.split(':') if len(parts) != 2: return render_template('error.html', message="적립 정보가 올바르지 않습니다.") transaction_id, nonce = parts # 4. 토큰 재검증 (OAuth 중 다른 사람이 적립했을 수 있음) success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return render_template('error.html', message=message) # 5. 카카오 access_token 교환 kakao_client = get_kakao_client() success, token_data = kakao_client.get_access_token(code) if not success: logging.error(f"카카오 토큰 교환 실패: {token_data}") return render_template('error.html', message="카카오 인증에 실패했습니다. 다시 시도해주세요.") # 6. 사용자 정보 조회 access_token = token_data.get('access_token') success, user_info = kakao_client.get_user_info(access_token) if not success: logging.error(f"카카오 사용자 정보 조회 실패: {user_info}") return render_template('error.html', message="카카오 사용자 정보를 가져올 수 없습니다.") kakao_id = user_info.get('kakao_id') kakao_name = user_info.get('name') or user_info.get('nickname', '') kakao_phone_raw = user_info.get('phone_number') kakao_phone = normalize_kakao_phone(kakao_phone_raw) # 카카오에서 받은 생년월일 조합 (YYYY-MMDD) kakao_birthday = None if user_info.get('birthyear') and user_info.get('birthday'): kakao_birthday = f"{user_info['birthyear']}-{user_info['birthday'][:2]}-{user_info['birthday'][2:]}" # 7. 분기: 전화번호가 있으면 자동 적립, 없으면 폰 입력 폼 if kakao_phone: # 자동 적립 existing_user_id = find_user_by_kakao_id(kakao_id) if existing_user_id: user_id = existing_user_id is_new = False # 생년월일이 있으면 업데이트 if kakao_birthday: conn = db_manager.get_sqlite_connection() conn.cursor().execute("UPDATE users SET birthday = ? WHERE id = ? AND birthday IS NULL", (kakao_birthday, user_id)) conn.commit() else: user_id, is_new = get_or_create_user(kakao_phone, kakao_name, birthday=kakao_birthday) link_kakao_identity(user_id, kakao_id, user_info) success, msg, new_balance = claim_mileage(user_id, token_info) if not success: return render_template('error.html', message=msg) # 세션에 유저 정보 저장 (PWA 자동적립용) session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = kakao_phone session['logged_in_name'] = kakao_name return render_template('claim_kakao_success.html', points=token_info['claimable_points'], balance=new_balance, phone=kakao_phone, name=kakao_name) else: # 전화번호 없음 → 세션에 카카오 정보 저장 + 폰 입력 폼 session['kakao_claim'] = { 'kakao_id': kakao_id, 'name': kakao_name, 'profile_image': user_info.get('profile_image'), 'email': user_info.get('email'), 'token_param': token_param } return render_template('claim_kakao_phone.html', token_info=token_info, kakao_name=kakao_name, kakao_profile_image=user_info.get('profile_image')) @app.route('/api/claim/kakao', methods=['POST']) def api_claim_kakao(): """카카오 적립 (전화번호 폴백) - 세션의 카카오 정보 + 폼의 전화번호로 적립""" kakao_data = session.pop('kakao_claim', None) if not kakao_data: return jsonify({ 'success': False, 'message': '카카오 인증 정보가 만료되었습니다. 다시 시도해주세요.' }), 400 data = request.get_json() phone = data.get('phone', '').strip().replace('-', '').replace(' ', '') if len(phone) < 10: session['kakao_claim'] = kakao_data return jsonify({ 'success': False, 'message': '올바른 전화번호를 입력해주세요.' }), 400 token_param = kakao_data['token_param'] parts = token_param.split(':') transaction_id, nonce = parts success, message, token_info = verify_claim_token(transaction_id, nonce) if not success: return jsonify({'success': False, 'message': message}), 400 kakao_id = kakao_data['kakao_id'] name = kakao_data['name'] existing_user_id = find_user_by_kakao_id(kakao_id) if existing_user_id: user_id = existing_user_id is_new = False else: user_id, is_new = get_or_create_user(phone, name) link_kakao_identity(user_id, kakao_id, kakao_data) success, message, new_balance = claim_mileage(user_id, token_info) if not success: return jsonify({'success': False, 'message': message}), 500 # 세션에 유저 정보 저장 (PWA 자동적립용) session.permanent = True session['logged_in_user_id'] = user_id session['logged_in_phone'] = phone session['logged_in_name'] = name return jsonify({ 'success': True, 'message': message, 'points': token_info['claimable_points'], 'balance': new_balance, 'is_new_user': is_new }) @app.route('/my-page/kakao/start') def mypage_kakao_start(): """마이페이지 카카오 로그인 조회""" from services.kakao_client import get_kakao_client csrf_token = secrets.token_hex(16) state_data = { 'purpose': 'mypage', 'csrf': csrf_token } state_encoded = base64.urlsafe_b64encode( json.dumps(state_data).encode() ).decode() session['kakao_csrf'] = csrf_token kakao_client = get_kakao_client() auth_url = kakao_client.get_authorization_url(state=state_encoded) return redirect(auth_url) @app.route('/my-page') def my_page(): """마이페이지 (전화번호로 조회)""" phone = request.args.get('phone', '') if not phone: # JS SDK용 카카오 state 생성 csrf_token = secrets.token_hex(16) state_data = {'purpose': 'mypage', 'csrf': csrf_token} kakao_state = base64.urlsafe_b64encode( json.dumps(state_data).encode() ).decode() session['kakao_csrf'] = csrf_token return render_template('my_page_login.html', kakao_state=kakao_state) # 전화번호로 사용자 조회 phone = phone.replace('-', '').replace(' ', '') conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users WHERE phone = ? """, (phone,)) user_raw = cursor.fetchone() if not user_raw: return render_template('error.html', message='등록되지 않은 전화번호입니다.') # 사용자 정보에 KST 시간 변환 적용 user = dict(user_raw) user['created_at'] = utc_to_kst_str(user_raw['created_at']) # 적립 내역 조회 (transaction_id 포함) cursor.execute(""" SELECT points, balance_after, reason, description, created_at, transaction_id FROM mileage_ledger WHERE user_id = ? ORDER BY created_at DESC LIMIT 20 """, (user['id'],)) transactions_raw = cursor.fetchall() # 거래 내역에 KST 시간 변환 적용 transactions = [] for tx in transactions_raw: tx_dict = dict(tx) tx_dict['created_at'] = utc_to_kst_str(tx['created_at']) transactions.append(tx_dict) return render_template('my_page.html', user=user, transactions=transactions, user_id=user['id']) # ============================================================================ # PWA / 공통 라우트 # ============================================================================ @app.route('/sw.js') def service_worker(): """서비스 워커를 루트 경로에서 제공 (scope='/' 허용)""" return app.send_static_file('sw.js'), 200, { 'Content-Type': 'application/javascript', 'Service-Worker-Allowed': '/' } @app.route('/privacy') def privacy(): """개인정보 처리방침""" return render_template('privacy.html') @app.route('/logout') def logout(): """세션 로그아웃""" session.pop('logged_in_user_id', None) session.pop('logged_in_phone', None) session.pop('logged_in_name', None) return redirect('/') @app.route('/admin/transaction/') def admin_transaction_detail(transaction_id): """거래 세부 내역 조회 (MSSQL)""" try: # MSSQL PM_PRES 연결 session = db_manager.get_session('PM_PRES') # SALE_MAIN 조회 (거래 헤더) sale_main_query = text(""" SELECT SL_NO_order, InsertTime, SL_MY_total, SL_MY_discount, SL_MY_sale, SL_MY_credit, SL_MY_recive, SL_MY_rec_vat, ISNULL(SL_NM_custom, '[비고객]') AS customer_name FROM SALE_MAIN WHERE SL_NO_order = :transaction_id """) sale_main = session.execute(sale_main_query, {'transaction_id': transaction_id}).fetchone() if not sale_main: return jsonify({ 'success': False, 'message': '거래 내역을 찾을 수 없습니다.' }), 404 # SALE_SUB 조회 (판매 상품 상세) sale_sub_query = text(""" SELECT S.DrugCode, ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_NM_cost_a AS price, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) sale_items = session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall() # 결과를 JSON으로 반환 result = { 'success': True, 'transaction': { 'id': sale_main.SL_NO_order, 'date': str(sale_main.InsertTime), 'total_amount': int(sale_main.SL_MY_total or 0), 'discount': int(sale_main.SL_MY_discount or 0), 'sale_amount': int(sale_main.SL_MY_sale or 0), 'credit': int(sale_main.SL_MY_credit or 0), 'supply_value': int(sale_main.SL_MY_recive or 0), 'vat': int(sale_main.SL_MY_rec_vat or 0), 'customer_name': sale_main.customer_name }, 'items': [ { 'code': item.DrugCode, 'name': item.goods_name, 'qty': int(item.quantity or 0), 'price': int(item.price or 0), 'total': int(item.total or 0) } for item in sale_items ] } return jsonify(result) except Exception as e: return jsonify({ 'success': False, 'message': f'조회 실패: {str(e)}' }), 500 @app.route('/admin/user/') def admin_user_detail(user_id): """사용자 상세 이력 조회 - 구매 이력, 적립 이력, 구매 품목""" try: # 1. SQLite 연결 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 2. 사용자 기본 정보 조회 cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users WHERE id = ? """, (user_id,)) user = cursor.fetchone() if not user: return jsonify({ 'success': False, 'message': '사용자를 찾을 수 없습니다.' }), 404 # 3. 마일리지 이력 조회 (최근 50건) cursor.execute(""" SELECT transaction_id, points, balance_after, reason, description, created_at FROM mileage_ledger WHERE user_id = ? ORDER BY created_at DESC LIMIT 50 """, (user_id,)) mileage_history = cursor.fetchall() # 4. 구매 이력 조회 (적립된 거래만, 최근 20건) cursor.execute(""" SELECT transaction_id, total_amount, claimable_points, claimed_at FROM claim_tokens WHERE claimed_by_user_id = ? ORDER BY claimed_at DESC LIMIT 20 """, (user_id,)) claimed_tokens = cursor.fetchall() # 5. 각 거래의 상품 상세 조회 (MSSQL) purchases = [] try: session = db_manager.get_session('PM_PRES') for token in claimed_tokens: transaction_id = token['transaction_id'] # SALE_MAIN에서 거래 시간 조회 sale_main_query = text(""" SELECT InsertTime FROM SALE_MAIN WHERE SL_NO_order = :transaction_id """) sale_main = session.execute( sale_main_query, {'transaction_id': transaction_id} ).fetchone() # 거래 시간 추출 (MSSQL의 실제 거래 시간) if sale_main and sale_main.InsertTime: transaction_date = str(sale_main.InsertTime)[:16].replace('T', ' ') else: transaction_date = '-' # SALE_SUB + CD_GOODS JOIN (BARCODE 추가) sale_items_query = text(""" SELECT S.BARCODE, S.DrugCode, ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_NM_cost_a AS price, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) items_raw = session.execute( sale_items_query, {'transaction_id': transaction_id} ).fetchall() # 상품 리스트 변환 (카테고리 포함) items = [] for item in items_raw: barcode = item.BARCODE # SQLite에서 제품 카테고리 조회 categories = [] if barcode: cursor.execute(""" SELECT category_name, relevance_score FROM product_category_mapping WHERE barcode = ? ORDER BY relevance_score DESC """, (barcode,)) for cat_row in cursor.fetchall(): categories.append({ 'name': cat_row[0], 'score': cat_row[1] }) items.append({ 'code': item.DrugCode, 'barcode': barcode, 'name': item.goods_name, 'qty': int(item.quantity or 0), 'price': int(item.price or 0), 'total': int(item.total or 0), 'categories': categories }) # 상품 요약 생성 ("첫번째상품명 외 N개") if items: first_item_name = items[0]['name'] items_count = len(items) if items_count == 1: items_summary = first_item_name else: items_summary = f"{first_item_name} 외 {items_count - 1}개" else: items_summary = "상품 정보 없음" items_count = 0 purchases.append({ 'transaction_id': transaction_id, 'date': transaction_date, # MSSQL의 실제 거래 시간 사용 'amount': int(token['total_amount']), 'points': int(token['claimable_points']), 'items_summary': items_summary, 'items_count': items_count, 'items': items }) except Exception as mssql_error: # MSSQL 연결 실패 시 빈 배열 반환 print(f"[WARNING] MSSQL 조회 실패 (user {user_id}): {mssql_error}") purchases = [] # 6. 응답 생성 return jsonify({ 'success': True, 'user': { 'id': user['id'], 'name': user['nickname'], 'phone': user['phone'], 'balance': user['mileage_balance'], 'created_at': utc_to_kst_str(user['created_at']) }, 'mileage_history': [ { 'points': ml['points'], 'balance_after': ml['balance_after'], 'reason': ml['reason'], 'description': ml['description'], 'created_at': utc_to_kst_str(ml['created_at']), 'transaction_id': ml['transaction_id'] } for ml in mileage_history ], 'purchases': purchases }) except Exception as e: return jsonify({ 'success': False, 'message': f'조회 실패: {str(e)}' }), 500 @app.route('/admin/search/user') def admin_search_user(): """사용자 검색 (이름/전화번호/전화번호 뒷자리)""" query = request.args.get('q', '').strip() search_type = request.args.get('type', 'name') # 'name', 'phone', 'phone_last' if not query: return jsonify({'success': False, 'message': '검색어를 입력하세요'}), 400 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() try: if search_type == 'phone_last': # 전화번호 뒷자리 검색 cursor.execute(""" SELECT id, nickname, phone, mileage_balance FROM users WHERE phone LIKE ? ORDER BY created_at DESC """, (f'%{query}',)) elif search_type == 'phone': # 전체 전화번호 검색 cursor.execute(""" SELECT id, nickname, phone, mileage_balance FROM users WHERE phone = ? """, (query,)) else: # 이름 검색 cursor.execute(""" SELECT id, nickname, phone, mileage_balance FROM users WHERE nickname LIKE ? ORDER BY created_at DESC """, (f'%{query}%',)) results = cursor.fetchall() if not results: return jsonify({'success': False, 'message': '검색 결과가 없습니다'}), 404 if len(results) == 1: # 단일 매칭 - user_id만 반환 return jsonify({ 'success': True, 'multiple': False, 'user_id': results[0]['id'] }) else: # 여러 명 매칭 - 선택 모달용 데이터 반환 users = [{ 'id': row['id'], 'name': row['nickname'], 'phone': row['phone'], 'balance': row['mileage_balance'] } for row in results] return jsonify({ 'success': True, 'multiple': True, 'users': users }) except Exception as e: return jsonify({ 'success': False, 'message': f'검색 실패: {str(e)}' }), 500 @app.route('/admin/search/product') def admin_search_product(): """제품 검색 - 적립자 목록 반환 (SQLite 적립자 기준)""" query = request.args.get('q', '').strip() if not query: return jsonify({'success': False, 'message': '검색어를 입력하세요'}), 400 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() try: # 1. MSSQL에서 제품명으로 거래번호 찾기 session = db_manager.get_session('PM_PRES') sale_items_query = text(""" SELECT DISTINCT S.SL_NO_order, S.SL_NM_item, M.InsertTime FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode LEFT JOIN SALE_MAIN M ON S.SL_NO_order = M.SL_NO_order WHERE G.GoodsName LIKE :product_name ORDER BY M.InsertTime DESC """) sale_results = session.execute(sale_items_query, { 'product_name': f'%{query}%' }).fetchall() if not sale_results: return jsonify({ 'success': True, 'results': [] }) # 2. SQLite에서 적립된 거래만 필터링 (claimed_by_user_id IS NOT NULL) transaction_ids = [row.SL_NO_order for row in sale_results] placeholders = ','.join('?' * len(transaction_ids)) cursor.execute(f""" SELECT ct.transaction_id, ct.total_amount, ct.claimed_at, ct.claimed_by_user_id, u.nickname, u.phone FROM claim_tokens ct JOIN users u ON ct.claimed_by_user_id = u.id WHERE ct.transaction_id IN ({placeholders}) AND ct.claimed_by_user_id IS NOT NULL ORDER BY ct.claimed_at DESC LIMIT 50 """, transaction_ids) claimed_results = cursor.fetchall() # 3. 결과 조합 results = [] for claim_row in claimed_results: # 해당 거래의 MSSQL 정보 찾기 mssql_row = next((r for r in sale_results if r.SL_NO_order == claim_row['transaction_id']), None) if mssql_row: results.append({ 'user_id': claim_row['claimed_by_user_id'], 'user_name': claim_row['nickname'], 'user_phone': claim_row['phone'], 'purchase_date': str(mssql_row.InsertTime)[:16].replace('T', ' ') if mssql_row.InsertTime else '-', # MSSQL 실제 거래 시간 'claimed_date': str(claim_row['claimed_at'])[:16].replace('T', ' ') if claim_row['claimed_at'] else '-', # 적립 시간 'quantity': float(mssql_row.SL_NM_item or 0), 'total_amount': int(claim_row['total_amount']) }) return jsonify({ 'success': True, 'results': results }) except Exception as e: return jsonify({ 'success': False, 'message': f'검색 실패: {str(e)}' }), 500 @app.route('/admin/ai-analyze-user/', methods=['POST']) def admin_ai_analyze_user(user_id): """OpenAI GPT를 사용한 사용자 구매 패턴 AI 분석""" try: # OpenAI 사용 가능 여부 확인 if not OPENAI_AVAILABLE: return jsonify({ 'success': False, 'message': ERROR_MESSAGES['OPENAI_NOT_AVAILABLE'] }), 503 # 1. SQLite 연결 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 2. 사용자 기본 정보 조회 cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users WHERE id = ? """, (user_id,)) user_row = cursor.fetchone() if not user_row: return jsonify({ 'success': False, 'message': ERROR_MESSAGES['NO_USER'] }), 404 user = { 'id': user_row['id'], 'nickname': user_row['nickname'], 'phone': user_row['phone'], 'mileage_balance': user_row['mileage_balance'], 'created_at': user_row['created_at'] } # 3. 구매 이력 조회 (최근 20건) cursor.execute(""" SELECT transaction_id, total_amount, claimable_points, claimed_at FROM claim_tokens WHERE claimed_by_user_id = ? ORDER BY claimed_at DESC LIMIT 20 """, (user_id,)) claimed_tokens = cursor.fetchall() if not claimed_tokens: return jsonify({ 'success': False, 'message': ERROR_MESSAGES['NO_PURCHASES'] }), 400 # 4. MSSQL에서 상품 상세 조회 purchases = [] session = db_manager.get_session('PM_PRES') for token in claimed_tokens: transaction_id = token['transaction_id'] # SALE_MAIN에서 거래 시간 조회 sale_main_query = text(""" SELECT InsertTime FROM SALE_MAIN WHERE SL_NO_order = :transaction_id """) sale_main = session.execute(sale_main_query, {'transaction_id': transaction_id}).fetchone() # SALE_SUB + CD_GOODS JOIN sale_items_query = text(""" SELECT S.DrugCode, ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_NM_cost_a AS price, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) items_raw = session.execute(sale_items_query, {'transaction_id': transaction_id}).fetchall() items = [{ 'name': item.goods_name, 'qty': int(item.quantity or 0), 'price': int(item.price or 0) } for item in items_raw] purchases.append({ 'date': str(sale_main.InsertTime)[:16].replace('T', ' ') if sale_main and sale_main.InsertTime else '-', 'amount': int(token['total_amount']), 'points': int(token['claimable_points']), 'items': items }) # 5. OpenAI API 호출용 프롬프트 생성 prompt = prepare_analysis_prompt(user, purchases) # 6. OpenAI API 호출 logging.info(f"AI 분석 시작: 사용자 ID {user_id}") success, response = call_openai_with_retry(prompt) if not success: # response에는 에러 메시지가 담겨 있음 return jsonify({ 'success': False, 'message': response }), 500 # 7. 응답 파싱 response_text = response.choices[0].message.content analysis = parse_openai_response(response_text) logging.info(f"AI 분석 완료: 사용자 ID {user_id}, 토큰: {response.usage.total_tokens}") # 8. 결과 반환 return jsonify({ 'success': True, 'user': { 'id': user['id'], 'name': user['nickname'], 'phone': user['phone'], 'balance': user['mileage_balance'] }, 'analysis': analysis, 'metadata': { 'model_used': OPENAI_MODEL, 'tokens_used': response.usage.total_tokens, 'analysis_time': datetime.now(KST).strftime('%Y-%m-%d %H:%M:%S') } }) except Exception as e: logging.error(f"AI 분석 오류: {e}") return jsonify({ 'success': False, 'message': ERROR_MESSAGES['UNKNOWN_ERROR'] }), 500 @app.route('/admin/use-points', methods=['POST']) def admin_use_points(): """관리자 페이지에서 포인트 사용 (차감)""" try: data = request.get_json() user_id = data.get('user_id') points_to_use = data.get('points') if not user_id or not points_to_use: return jsonify({ 'success': False, 'message': '사용자 ID와 포인트를 입력하세요.' }), 400 if points_to_use <= 0: return jsonify({ 'success': False, 'message': '1 이상의 포인트를 입력하세요.' }), 400 conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 1. 현재 사용자 잔액 확인 cursor.execute(""" SELECT id, nickname, mileage_balance FROM users WHERE id = ? """, (user_id,)) user = cursor.fetchone() if not user: return jsonify({ 'success': False, 'message': '사용자를 찾을 수 없습니다.' }), 404 current_balance = user['mileage_balance'] # 2. 잔액 확인 if points_to_use > current_balance: return jsonify({ 'success': False, 'message': f'잔액({current_balance:,}P)이 부족합니다.' }), 400 # 3. 포인트 차감 new_balance = current_balance - points_to_use cursor.execute(""" UPDATE users SET mileage_balance = ? WHERE id = ? """, (new_balance, user_id)) # 4. 마일리지 원장 기록 cursor.execute(""" INSERT INTO mileage_ledger (user_id, transaction_id, points, balance_after, reason, description) VALUES (?, NULL, ?, ?, 'USE', ?) """, ( user_id, -points_to_use, # 음수로 저장 new_balance, f'포인트 사용 (관리자) - {points_to_use:,}P 차감' )) conn.commit() logging.info(f"포인트 사용 완료: 사용자 ID {user_id}, 차감 {points_to_use}P, 잔액 {new_balance}P") return jsonify({ 'success': True, 'message': f'{points_to_use:,}P 사용 완료', 'new_balance': new_balance, 'used_points': points_to_use }) except Exception as e: logging.error(f"포인트 사용 실패: {e}") return jsonify({ 'success': False, 'message': f'포인트 사용 실패: {str(e)}' }), 500 @app.route('/admin') def admin(): """관리자 페이지 - 전체 사용자 및 적립 현황""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 전체 통계 cursor.execute(""" SELECT COUNT(*) as total_users, SUM(mileage_balance) as total_balance FROM users """) stats = cursor.fetchone() # 최근 가입 사용자 (20명) cursor.execute(""" SELECT id, nickname, phone, mileage_balance, created_at FROM users ORDER BY created_at DESC LIMIT 20 """) recent_users_raw = cursor.fetchall() # 시간을 KST로 변환 recent_users = [] for user in recent_users_raw: user_dict = dict(user) user_dict['created_at'] = utc_to_kst_str(user['created_at']) recent_users.append(user_dict) # 최근 적립 내역 (50건) cursor.execute(""" SELECT ml.id, u.nickname, u.phone, ml.points, ml.balance_after, ml.reason, ml.description, ml.created_at, ml.transaction_id FROM mileage_ledger ml JOIN users u ON ml.user_id = u.id ORDER BY ml.created_at DESC LIMIT 50 """) recent_transactions_raw = cursor.fetchall() # 시간을 KST로 변환 recent_transactions = [] for trans in recent_transactions_raw: trans_dict = dict(trans) trans_dict['created_at'] = utc_to_kst_str(trans['created_at']) recent_transactions.append(trans_dict) # QR 토큰 통계 cursor.execute(""" SELECT COUNT(*) as total_tokens, SUM(CASE WHEN claimed_at IS NOT NULL THEN 1 ELSE 0 END) as claimed_count, SUM(CASE WHEN claimed_at IS NULL THEN 1 ELSE 0 END) as unclaimed_count, SUM(claimable_points) as total_points_issued, SUM(CASE WHEN claimed_at IS NOT NULL THEN claimable_points ELSE 0 END) as total_points_claimed FROM claim_tokens """) token_stats = cursor.fetchone() # 최근 QR 발행 내역 (20건) cursor.execute(""" SELECT transaction_id, total_amount, claimable_points, claimed_at, claimed_by_user_id, created_at FROM claim_tokens ORDER BY created_at DESC LIMIT 20 """) recent_tokens_raw = cursor.fetchall() # Convert only created_at (발행일), leave claimed_at (적립일) unconverted recent_tokens = [] for token in recent_tokens_raw: token_dict = dict(token) token_dict['created_at'] = utc_to_kst_str(token['created_at']) # Convert 발행일 # claimed_at stays as-is (or remains None if not claimed) recent_tokens.append(token_dict) return render_template('admin.html', stats=stats, recent_users=recent_users, recent_transactions=recent_transactions, token_stats=token_stats, recent_tokens=recent_tokens) # ============================================================================ # AI 업셀링 추천 # ============================================================================ def _get_available_products(): """약국 보유 제품 목록 (최근 30일 판매 실적 기준 TOP 40)""" try: mssql_session = db_manager.get_session('PM_PRES') rows = mssql_session.execute(text(""" SELECT TOP 40 ISNULL(G.GoodsName, '') AS name, COUNT(*) as sales, MAX(G.Saleprice) as price FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_DT_appl >= CONVERT(VARCHAR(8), DATEADD(DAY, -30, GETDATE()), 112) AND G.GoodsName IS NOT NULL AND G.GoodsName NOT LIKE N'%(판매중지)%' GROUP BY G.GoodsName ORDER BY COUNT(*) DESC """)).fetchall() return [{'name': r.name, 'price': float(r.price or 0), 'sales': r.sales} for r in rows] except Exception as e: logging.warning(f"[AI추천] 보유 제품 목록 조회 실패: {e}") return [] def _generate_upsell_recommendation(user_id, transaction_id, sale_items, user_name): """키오스크 적립 후 AI 업셀링 추천 생성 (fire-and-forget)""" from services.clawdbot_client import generate_upsell, generate_upsell_real if not sale_items: return # 현재 구매 품목 current_items = ', '.join(item['name'] for item in sale_items if item.get('name')) if not current_items: return # 최근 구매 이력 수집 recent_products = current_items # 기본값 try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() cursor.execute(""" SELECT ct.transaction_id FROM claim_tokens ct WHERE ct.claimed_by_user_id = ? AND ct.transaction_id != ? ORDER BY ct.claimed_at DESC LIMIT 5 """, (user_id, transaction_id)) recent_tokens = cursor.fetchall() if recent_tokens: all_products = [] mssql_session = db_manager.get_session('PM_PRES') for token in recent_tokens: rows = mssql_session.execute(text(""" SELECT ISNULL(G.GoodsName, '') AS goods_name FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :tid """), {'tid': token['transaction_id']}).fetchall() for r in rows: if r.goods_name: all_products.append(r.goods_name) if all_products: recent_products = ', '.join(set(all_products)) except Exception as e: logging.warning(f"[AI추천] 구매 이력 수집 실패 (현재 품목만 사용): {e}") # 실데이터 기반 추천 (보유 제품 목록 제공) available = _get_available_products() rec = None if available: logging.info(f"[AI추천] 실데이터 생성 시작: user={user_name}, items={current_items}, 보유제품={len(available)}개") rec = generate_upsell_real(user_name, current_items, recent_products, available) # 실데이터 실패 시 기존 방식 fallback if not rec: logging.info(f"[AI추천] 자유 생성 fallback: user={user_name}") rec = generate_upsell(user_name, current_items, recent_products) if not rec: logging.warning("[AI추천] 생성 실패 (AI 응답 없음)") return # SQLite에 저장 try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() expires_at = (datetime.now() + timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S') cursor.execute(""" INSERT INTO ai_recommendations (user_id, transaction_id, recommended_product, recommendation_message, recommendation_reason, trigger_products, ai_raw_response, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( user_id, transaction_id, rec['product'], rec['message'], rec['reason'], json.dumps([item['name'] for item in sale_items], ensure_ascii=False), json.dumps(rec, ensure_ascii=False), expires_at )) conn.commit() logging.info(f"[AI추천] 저장 완료: user_id={user_id}, product={rec['product']}") except Exception as e: logging.warning(f"[AI추천] DB 저장 실패: {e}") @app.route('/api/recommendation/') def api_get_recommendation(user_id): """마이페이지용 AI 추천 조회""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') cursor.execute(""" SELECT id, recommended_product, recommendation_message, created_at FROM ai_recommendations WHERE user_id = ? AND status = 'active' AND (expires_at IS NULL OR expires_at > ?) ORDER BY created_at DESC LIMIT 1 """, (user_id, now)) rec = cursor.fetchone() if not rec: return jsonify({'success': True, 'has_recommendation': False}) # 표시 횟수 업데이트 cursor.execute(""" UPDATE ai_recommendations SET displayed_count = displayed_count + 1, displayed_at = COALESCE(displayed_at, ?) WHERE id = ? """, (now, rec['id'])) conn.commit() return jsonify({ 'success': True, 'has_recommendation': True, 'recommendation': { 'id': rec['id'], 'product': rec['recommended_product'], 'message': rec['recommendation_message'] } }) @app.route('/api/recommendation//dismiss', methods=['POST']) def api_dismiss_recommendation(rec_id): """추천 닫기 / 관심 표시""" data = request.get_json(silent=True) or {} action = data.get('action', 'dismissed') if action not in ('dismissed', 'interested'): action = 'dismissed' conn = db_manager.get_sqlite_connection() cursor = conn.cursor() now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') cursor.execute(""" UPDATE ai_recommendations SET status = ?, dismissed_at = ? WHERE id = ? """, (action, now, rec_id)) conn.commit() logging.info(f"[AI추천] id={rec_id} → {action}") return jsonify({'success': True}) # ============================================================================ # 알림톡 로그 # ============================================================================ @app.route('/admin/alimtalk') def admin_alimtalk(): """알림톡 발송 로그 + NHN 발송 내역""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 로컬 발송 로그 (최근 50건) cursor.execute(""" SELECT a.*, u.nickname, u.phone as user_phone FROM alimtalk_logs a LEFT JOIN users u ON a.user_id = u.id ORDER BY a.created_at DESC LIMIT 50 """) local_logs = [dict(row) for row in cursor.fetchall()] # 통계 cursor.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as success_count, SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) as fail_count FROM alimtalk_logs """) stats = dict(cursor.fetchone()) # 오늘 통계 cursor.execute(""" SELECT COUNT(*) as today_total, SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as today_success FROM alimtalk_logs WHERE date(created_at) = date('now') """) today = dict(cursor.fetchone()) stats.update(today) return render_template('admin_alimtalk.html', local_logs=local_logs, stats=stats) @app.route('/admin/ai-crm') def admin_ai_crm(): """AI 업셀링 CRM 대시보드""" conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # 추천 목록 (최근 50건) + 사용자 정보 JOIN cursor.execute(""" SELECT r.*, u.nickname, u.phone as user_phone FROM ai_recommendations r LEFT JOIN users u ON r.user_id = u.id ORDER BY r.created_at DESC LIMIT 50 """) recs = [dict(row) for row in cursor.fetchall()] # trigger_products JSON 파싱 for rec in recs: tp = rec.get('trigger_products') if tp: try: rec['trigger_list'] = json.loads(tp) except Exception: rec['trigger_list'] = [tp] else: rec['trigger_list'] = [] # 통계 cursor.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN status = 'active' AND (expires_at IS NULL OR expires_at > datetime('now')) THEN 1 ELSE 0 END) as active_count, SUM(CASE WHEN status = 'interested' THEN 1 ELSE 0 END) as interested_count, SUM(CASE WHEN status = 'dismissed' THEN 1 ELSE 0 END) as dismissed_count, SUM(CASE WHEN displayed_count > 0 THEN 1 ELSE 0 END) as displayed_count FROM ai_recommendations """) stats = dict(cursor.fetchone()) # 오늘 생성 건수 cursor.execute(""" SELECT COUNT(*) as today_count FROM ai_recommendations WHERE date(created_at) = date('now') """) stats['today_count'] = cursor.fetchone()['today_count'] now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') return render_template('admin_ai_crm.html', recs=recs, stats=stats, now=now) @app.route('/api/admin/alimtalk/nhn-history') def api_admin_alimtalk_nhn_history(): """NHN Cloud 실제 발송 내역 API""" from services.nhn_alimtalk import get_nhn_send_history date_str = request.args.get('date', datetime.now().strftime('%Y-%m-%d')) start = f"{date_str} 00:00" end = f"{date_str} 23:59" messages = get_nhn_send_history(start, end) result = [] for m in messages: result.append({ 'requestDate': m.get('requestDate', ''), 'recipientNo': m.get('recipientNo', ''), 'templateCode': m.get('templateCode', ''), 'messageStatus': m.get('messageStatus', ''), 'resultCode': m.get('resultCode', ''), 'resultMessage': m.get('resultMessage', ''), 'content': m.get('content', ''), }) return jsonify({'success': True, 'messages': result}) @app.route('/api/admin/alimtalk/test-send', methods=['POST']) def api_admin_alimtalk_test_send(): """관리자 수동 알림톡 발송 테스트""" from services.nhn_alimtalk import send_mileage_claim_alimtalk data = request.get_json() phone = data.get('phone', '').strip().replace('-', '') name = data.get('name', '테스트') if len(phone) < 10: return jsonify({'success': False, 'message': '올바른 전화번호를 입력해주세요.'}), 400 success, msg = send_mileage_claim_alimtalk( phone, name, 100, 500, items=[{'name': '테스트 발송', 'qty': 1, 'total': 1000}], trigger_source='admin_test' ) return jsonify({'success': success, 'message': msg}) # ============================================================================ # 키오스크 적립 # ============================================================================ @app.route('/kiosk') def kiosk(): """키오스크 메인 페이지 (전체 화면 웹 UI)""" return render_template('kiosk.html') @app.route('/api/kiosk/trigger', methods=['POST']) def api_kiosk_trigger(): """ POS → 키오스크 세션 생성 POST /api/kiosk/trigger Body: {"transaction_id": "...", "amount": 50000} """ global kiosk_current_session data = request.get_json() transaction_id = data.get('transaction_id') amount = data.get('amount', 0) if not transaction_id: return jsonify({'success': False, 'message': 'transaction_id가 필요합니다.'}), 400 try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # QR 토큰 존재 확인 cursor.execute("SELECT id, token_hash, claimable_points, claimed_at FROM claim_tokens WHERE transaction_id = ?", (transaction_id,)) token_row = cursor.fetchone() if token_row and token_row['claimed_at']: return jsonify({'success': False, 'message': '이미 적립된 거래입니다.'}), 400 if token_row: # 기존 토큰 사용 — QR URL은 새 nonce로 생성 # (verify_claim_token은 transaction_id로만 조회하므로 nonce 불일치 무관) claimable_points = token_row['claimable_points'] nonce = secrets.token_hex(6) from utils.qr_token_generator import QR_BASE_URL qr_url = f"{QR_BASE_URL}?t={transaction_id}:{nonce}" else: # 새 토큰 생성 from utils.qr_token_generator import generate_claim_token, save_token_to_db token_info = generate_claim_token(transaction_id, float(amount)) success, error = save_token_to_db( transaction_id, token_info['token_hash'], float(amount), token_info['claimable_points'], token_info['expires_at'], token_info['pharmacy_id'] ) if not success: return jsonify({'success': False, 'message': error}), 500 claimable_points = token_info['claimable_points'] qr_url = token_info['qr_url'] # MSSQL에서 구매 품목 조회 sale_items = [] try: mssql_session = db_manager.get_session('PM_PRES') sale_sub_query = text(""" SELECT ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name, S.SL_NM_item AS quantity, S.SL_TOTAL_PRICE AS total FROM SALE_SUB S LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode WHERE S.SL_NO_order = :transaction_id ORDER BY S.DrugCode """) rows = mssql_session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall() sale_items = [ {'name': r.goods_name, 'qty': int(r.quantity or 0), 'total': int(r.total or 0)} for r in rows ] except Exception as e: logging.warning(f"키오스크 품목 조회 실패 (transaction_id={transaction_id}): {e}") # 키오스크 세션 저장 kiosk_current_session = { 'transaction_id': transaction_id, 'amount': int(amount), 'points': claimable_points, 'qr_url': qr_url, 'items': sale_items, 'created_at': datetime.now(KST).isoformat() } return jsonify({ 'success': True, 'message': f'키오스크 적립 대기 ({claimable_points}P)', 'points': claimable_points }) except Exception as e: logging.error(f"키오스크 트리거 오류: {e}") return jsonify({'success': False, 'message': f'오류: {str(e)}'}), 500 @app.route('/api/kiosk/current') def api_kiosk_current(): """ 키오스크 폴링 - 현재 세션 조회 GET /api/kiosk/current """ global kiosk_current_session if kiosk_current_session is None: return jsonify({'active': False}) # 5분 경과 시 자동 만료 created = datetime.fromisoformat(kiosk_current_session['created_at']) if datetime.now(KST) - created > timedelta(minutes=5): kiosk_current_session = None return jsonify({'active': False}) return jsonify({ 'active': True, 'transaction_id': kiosk_current_session['transaction_id'], 'amount': kiosk_current_session['amount'], 'points': kiosk_current_session['points'], 'qr_url': kiosk_current_session.get('qr_url'), 'items': kiosk_current_session.get('items', []) }) @app.route('/api/kiosk/claim', methods=['POST']) def api_kiosk_claim(): """ 키오스크 전화번호 적립 POST /api/kiosk/claim Body: {"phone": "01012345678"} """ global kiosk_current_session if kiosk_current_session is None: return jsonify({'success': False, 'message': '적립 대기 중인 거래가 없습니다.'}), 400 data = request.get_json() phone = data.get('phone', '').strip().replace('-', '').replace(' ', '') if len(phone) < 10: return jsonify({'success': False, 'message': '올바른 전화번호를 입력해주세요.'}), 400 transaction_id = kiosk_current_session['transaction_id'] try: conn = db_manager.get_sqlite_connection() cursor = conn.cursor() # claim_tokens에서 nonce 조회를 위해 token_hash로 검증 cursor.execute(""" SELECT id, transaction_id, token_hash, total_amount, claimable_points, pharmacy_id, expires_at, claimed_at, claimed_by_user_id FROM claim_tokens WHERE transaction_id = ? """, (transaction_id,)) token_row = cursor.fetchone() if not token_row: return jsonify({'success': False, 'message': '토큰을 찾을 수 없습니다.'}), 400 if token_row['claimed_at']: kiosk_current_session = None return jsonify({'success': False, 'message': '이미 적립된 거래입니다.'}), 400 # 만료 확인 expires_at = datetime.strptime(token_row['expires_at'], '%Y-%m-%d %H:%M:%S') if datetime.now() > expires_at: kiosk_current_session = None return jsonify({'success': False, 'message': '만료된 거래입니다.'}), 400 # token_info 딕셔너리 구성 (claim_mileage 호환) token_info = { 'id': token_row['id'], 'transaction_id': token_row['transaction_id'], 'total_amount': token_row['total_amount'], 'claimable_points': token_row['claimable_points'] } # 사용자 조회/생성 user_id, is_new = get_or_create_user(phone, '고객') # 마일리지 적립 claim_success, claim_msg, new_balance = claim_mileage(user_id, token_info) if not claim_success: return jsonify({'success': False, 'message': claim_msg}), 500 # 키오스크 세션에서 품목 정보 캡처 후 클리어 claimed_points = token_info['claimable_points'] sale_items = kiosk_current_session.get('items', []) kiosk_current_session = None # 알림톡 발송 (fire-and-forget) try: from services.nhn_alimtalk import send_mileage_claim_alimtalk # 유저 이름 조회 (기존 유저는 실명이 있을 수 있음) cursor.execute("SELECT nickname FROM users WHERE id = ?", (user_id,)) user_row = cursor.fetchone() user_name = user_row['nickname'] if user_row else '고객' logging.warning(f"[알림톡] 발송 시도: phone={phone}, name={user_name}, points={claimed_points}, balance={new_balance}, items={sale_items}") success, msg = send_mileage_claim_alimtalk( phone, user_name, claimed_points, new_balance, items=sale_items, user_id=user_id, trigger_source='kiosk', transaction_id=transaction_id ) logging.warning(f"[알림톡] 발송 결과: success={success}, msg={msg}") except Exception as alimtalk_err: logging.warning(f"[알림톡] 발송 예외 (적립은 완료): {alimtalk_err}") # AI 업셀링 추천 생성 (별도 스레드 — 적립 응답 블로킹 방지) import threading def _bg_upsell(): try: _generate_upsell_recommendation(user_id, transaction_id, sale_items, user_name) except Exception as rec_err: logging.warning(f"[AI추천] 생성 예외 (적립은 완료): {rec_err}") threading.Thread(target=_bg_upsell, daemon=True).start() return jsonify({ 'success': True, 'message': f'{claimed_points}P 적립 완료!', 'points': claimed_points, 'balance': new_balance, 'is_new_user': is_new }) except Exception as e: logging.error(f"키오스크 적립 오류: {e}") return jsonify({'success': False, 'message': f'오류: {str(e)}'}), 500 if __name__ == '__main__': # 개발 모드로 실행 app.run(host='0.0.0.0', port=7001, debug=True)