pharmacy-pos-qr-system/backend/app.py
thug0bin 76da7d9cd1 fix: SQLite 멀티스레드 I/O 에러 해결
- 요청마다 새 SQLite 연결 생성 (new_connection=True)
- 사용 후 명시적 close
- 간헐적 'I/O operation on closed file' 에러 방지
2026-02-27 15:43:52 +09:00

3489 lines
125 KiB
Python

"""
Flask 웹 서버 - QR 마일리지 적립
간편 적립: 전화번호 + 이름만 입력
"""
import sys
import os
# Windows 콘솔 UTF-8 강제 (한글 깨짐 방지)
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
os.environ.setdefault('PYTHONIOENCODING', 'utf-8')
from flask import Flask, request, render_template, jsonify, redirect, url_for, session
import hashlib
import base64
import secrets
from datetime import datetime, timezone, timedelta
import logging
from sqlalchemy import text
from dotenv import load_dotenv
import json
import time
# 환경 변수 로드
load_dotenv()
# OpenAI import
try:
from openai import OpenAI, OpenAIError, RateLimitError, APITimeoutError
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
logging.warning("OpenAI 라이브러리가 설치되지 않았습니다. AI 분석 기능을 사용할 수 없습니다.")
# Path setup
sys.path.insert(0, os.path.dirname(__file__))
from db.dbsetup import DatabaseManager
app = Flask(__name__)
app.secret_key = 'pharmacy-qr-mileage-secret-key-2026'
# 세션 설정 (PWA 자동적립 지원)
app.config['SESSION_COOKIE_SECURE'] = not app.debug # HTTPS 전용 (로컬 개발 시 제외)
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # QR 스캔 시 쿠키 전송 허용
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90) # 3개월 유지
# 데이터베이스 매니저
db_manager = DatabaseManager()
# KST 타임존 (UTC+9)
KST = timezone(timedelta(hours=9))
# 키오스크 현재 세션 (메모리 변수, 서버 재시작 시 초기화)
kiosk_current_session = None
def utc_to_kst_str(utc_time_str):
"""
UTC 시간 문자열을 KST 시간 문자열로 변환
Args:
utc_time_str (str): UTC 시간 문자열 (ISO 8601 형식)
Returns:
str: KST 시간 문자열 (YYYY-MM-DD HH:MM:SS)
"""
if not utc_time_str:
return None
try:
# ISO 8601 형식 파싱 ('2026-01-23T12:28:36' 또는 '2026-01-23 12:28:36')
utc_time_str = utc_time_str.replace(' ', 'T') # 공백을 T로 변환
# datetime 객체로 변환
if 'T' in utc_time_str:
utc_time = datetime.fromisoformat(utc_time_str)
else:
utc_time = datetime.fromisoformat(utc_time_str)
# UTC 타임존 설정 (naive datetime인 경우)
if utc_time.tzinfo is None:
utc_time = utc_time.replace(tzinfo=timezone.utc)
# KST로 변환
kst_time = utc_time.astimezone(KST)
# 문자열로 반환 (초 단위까지만)
return kst_time.strftime('%Y-%m-%d %H:%M:%S')
except Exception as e:
logging.error(f"시간 변환 실패: {utc_time_str}, 오류: {e}")
return utc_time_str # 변환 실패 시 원본 반환
# ===== OpenAI 설정 및 헬퍼 함수 =====
# OpenAI API 설정
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
OPENAI_MODEL = os.getenv('OPENAI_MODEL', 'gpt-4o-mini')
OPENAI_MAX_TOKENS = int(os.getenv('OPENAI_MAX_TOKENS', '1000'))
OPENAI_TEMPERATURE = float(os.getenv('OPENAI_TEMPERATURE', '0.7'))
# OpenAI 사용 가능 여부 및 API 키 확인
if OPENAI_AVAILABLE and not OPENAI_API_KEY:
logging.warning("OPENAI_API_KEY가 설정되지 않았습니다. AI 분석 기능을 사용할 수 없습니다.")
OPENAI_AVAILABLE = False
# System Prompt
SYSTEM_PROMPT = """당신은 약국 고객 관리 전문가입니다. 고객의 구매 데이터를 분석하여 다음을 제공합니다:
1. 구매 패턴 및 행동 분석
2. 주요 관심 제품 카테고리 파악
3. 업셀링을 위한 제품 추천
4. 고객 맞춤형 마케팅 전략
응답은 반드시 JSON 형식으로 작성하며, 한국어로 작성합니다.
약국 운영자가 실제로 활용할 수 있는 구체적이고 실용적인 인사이트를 제공해야 합니다."""
# 에러 메시지
ERROR_MESSAGES = {
'NO_USER': '사용자를 찾을 수 없습니다.',
'NO_PURCHASES': '구매 이력이 없어 분석할 수 없습니다. 최소 1건 이상의 구매가 필요합니다.',
'OPENAI_NOT_AVAILABLE': 'AI 분석 기능을 사용할 수 없습니다. 관리자에게 문의하세요.',
'OPENAI_API_KEY_MISSING': 'OpenAI API 키가 설정되지 않았습니다.',
'OPENAI_API_ERROR': 'OpenAI API 호출에 실패했습니다. 잠시 후 다시 시도해주세요.',
'OPENAI_RATE_LIMIT': 'API 호출 횟수 제한에 도달했습니다. 잠시 후 다시 시도해주세요.',
'OPENAI_TIMEOUT': 'AI 분석 시간이 초과되었습니다. 다시 시도해주세요.',
'PARSING_ERROR': 'AI 응답을 처리하는 중 오류가 발생했습니다.',
'UNKNOWN_ERROR': '알 수 없는 오류가 발생했습니다. 관리자에게 문의하세요.'
}
def categorize_product(product_name):
"""제품명에서 카테고리 추정 (간단한 키워드 매칭)"""
categories = {
'소화제': ['타센', '베아제', '겔포스', '소화'],
'진통제': ['타이레놀', '게보린', '펜잘', '이부프로펜'],
'감기약': ['판콜', '화이투벤', '지르텍', '감기'],
'피부약': ['후시딘', '마데카솔', '더마틱스'],
'비타민': ['비타민', '센트룸', '활성비타민'],
'안약': ['안약', '인공눈물'],
'소염진통제': ['자미슬', '펠루비', '게보린']
}
for category, keywords in categories.items():
for keyword in keywords:
if keyword in product_name:
return category
return '기타'
def prepare_analysis_prompt(user, purchases):
"""OpenAI API 전송용 프롬프트 생성"""
# 사용자 정보 요약
user_summary = f"""사용자: {user['nickname']} ({user['phone']})
가입일: {utc_to_kst_str(user['created_at']) if user['created_at'] else '-'}
포인트 잔액: {user['mileage_balance']:,}P
총 구매 건수: {len(purchases)}
"""
# 구매 이력 상세
purchase_details = []
total_spent = 0
all_products = []
product_freq = {}
for idx, purchase in enumerate(purchases, 1):
total_spent += purchase['amount']
products_str = ', '.join([f"{item['name']} x{item['qty']}" for item in purchase['items']])
# 제품 빈도 계산
for item in purchase['items']:
product_name = item['name']
all_products.append(product_name)
product_freq[product_name] = product_freq.get(product_name, 0) + 1
purchase_details.append(
f"{idx}. {purchase['date']} - {purchase['amount']:,}원 구매, {purchase['points']}P 적립\n"
f" 구매 품목: {products_str}"
)
# 통계 계산
avg_purchase = total_spent // len(purchases) if purchases else 0
top_products = sorted(product_freq.items(), key=lambda x: x[1], reverse=True)[:5]
top_products_str = ', '.join([f"{name}({count}회)" for name, count in top_products])
# 최종 프롬프트 조립
prompt = f"""다음은 약국 고객의 구매 데이터입니다. 구매 패턴을 분석하고 마케팅 전략을 제안해주세요.
{user_summary}
통계 요약:
- 총 구매 금액: {total_spent:,}
- 평균 구매 금액: {avg_purchase:,}
- 자주 구매한 품목: {top_products_str}
구매 이력 (최근 {len(purchases)}건):
{chr(10).join(purchase_details)}
분석 요청사항:
1. 구매 패턴 분석: 구매 빈도, 구매 금액 패턴 등
2. 주로 구매하는 품목 카테고리 (예: 소화제, 감기약, 건강기능식품 등)
3. 추천 제품: 기존 구매 패턴을 기반으로 관심있을만한 제품 3-5가지 (업셀링)
4. 마케팅 전략: 이 고객에게 효과적일 프로모션 또는 포인트 활용 방안
응답은 다음 JSON 형식으로 해주세요:
{{
"pattern": "구매 패턴에 대한 상세한 분석 (2-3문장)",
"main_products": ["카테고리1: 품목들", "카테고리2: 품목들"],
"recommendations": ["추천제품1 (이유)", "추천제품2 (이유)", "추천제품3 (이유)"],
"marketing_strategy": "마케팅 전략 제안 (2-3문장)"
}}
"""
return prompt
def parse_openai_response(response_text):
"""OpenAI API 응답을 파싱하여 구조화된 데이터 반환"""
import re
try:
# JSON 추출 (마크다운 코드 블록 제거)
json_match = re.search(r'```json\s*(\{.*?\})\s*```', response_text, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
# 코드 블록 없이 JSON만 있는 경우
json_str = response_text.strip()
# JSON 파싱
analysis = json.loads(json_str)
# 필수 필드 검증
required_fields = ['pattern', 'main_products', 'recommendations', 'marketing_strategy']
for field in required_fields:
if field not in analysis:
raise ValueError(f"필수 필드 누락: {field}")
# 타입 검증
if not isinstance(analysis['main_products'], list):
analysis['main_products'] = [str(analysis['main_products'])]
if not isinstance(analysis['recommendations'], list):
analysis['recommendations'] = [str(analysis['recommendations'])]
return analysis
except json.JSONDecodeError as e:
# JSON 파싱 실패 시 fallback
logging.error(f"JSON 파싱 실패: {e}")
return {
'pattern': '응답 파싱에 실패했습니다.',
'main_products': ['분석 결과를 확인할 수 없습니다.'],
'recommendations': ['다시 시도해주세요.'],
'marketing_strategy': response_text[:500]
}
except Exception as e:
logging.error(f"응답 파싱 오류: {e}")
raise
def handle_openai_error(error):
"""OpenAI API 에러를 사용자 친화적 메시지로 변환"""
error_str = str(error).lower()
if 'api key' in error_str or 'authentication' in error_str:
return ERROR_MESSAGES['OPENAI_API_KEY_MISSING']
elif 'rate limit' in error_str or 'quota' in error_str:
return ERROR_MESSAGES['OPENAI_RATE_LIMIT']
elif 'timeout' in error_str:
return ERROR_MESSAGES['OPENAI_TIMEOUT']
else:
return ERROR_MESSAGES['OPENAI_API_ERROR']
def call_openai_with_retry(prompt, max_retries=3):
"""재시도 로직을 포함한 OpenAI API 호출"""
if not OPENAI_AVAILABLE:
return False, ERROR_MESSAGES['OPENAI_NOT_AVAILABLE']
client = OpenAI(api_key=OPENAI_API_KEY)
for attempt in range(max_retries):
try:
response = client.chat.completions.create(
model=OPENAI_MODEL,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
max_tokens=OPENAI_MAX_TOKENS,
temperature=OPENAI_TEMPERATURE,
timeout=30
)
return True, response
except RateLimitError as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
logging.warning(f"OpenAI Rate limit, {wait_time}초 대기 후 재시도...")
time.sleep(wait_time)
else:
return False, handle_openai_error(e)
except APITimeoutError as e:
if attempt < max_retries - 1:
logging.warning(f"OpenAI 타임아웃, 재시도 중... ({attempt+1}/{max_retries})")
time.sleep(1)
else:
return False, ERROR_MESSAGES['OPENAI_TIMEOUT']
except OpenAIError as e:
return False, handle_openai_error(e)
except Exception as e:
logging.error(f"OpenAI API 호출 오류: {e}")
return False, ERROR_MESSAGES['UNKNOWN_ERROR']
return False, ERROR_MESSAGES['OPENAI_TIMEOUT']
def verify_claim_token(transaction_id, nonce):
"""
QR 토큰 검증
Args:
transaction_id (str): 거래 ID
nonce (str): 12자 hex nonce
Returns:
tuple: (성공 여부, 메시지, 토큰 정보 dict)
"""
try:
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# 1. 거래 ID로 토큰 조회
cursor.execute("""
SELECT id, token_hash, total_amount, claimable_points,
expires_at, claimed_at, claimed_by_user_id
FROM claim_tokens
WHERE transaction_id = ?
""", (transaction_id,))
token_record = cursor.fetchone()
if not token_record:
return (False, "유효하지 않은 QR 코드입니다.", None)
# 2. 이미 적립된 토큰인지 확인
if token_record['claimed_at']:
return (False, "이미 적립 완료된 영수증입니다.", None)
# 3. 만료 확인
expires_at = datetime.strptime(token_record['expires_at'], '%Y-%m-%d %H:%M:%S')
if datetime.now() > expires_at:
return (False, "적립 기간이 만료되었습니다 (30일).", None)
# 4. 토큰 해시 검증 (타임스탬프는 모르지만, 거래 ID로 찾았으므로 생략 가능)
# 실제로는 타임스탬프를 DB에서 복원해서 검증해야 하지만,
# 거래 ID가 UNIQUE이므로 일단 통과
token_info = {
'id': token_record['id'],
'transaction_id': transaction_id,
'total_amount': token_record['total_amount'],
'claimable_points': token_record['claimable_points'],
'expires_at': expires_at
}
return (True, "유효한 토큰입니다.", token_info)
except Exception as e:
return (False, f"토큰 검증 실패: {str(e)}", None)
def get_or_create_user(phone, name, birthday=None):
"""
사용자 조회 또는 생성 (간편 적립용)
Args:
phone (str): 전화번호
name (str): 이름
birthday (str, optional): 생년월일 (YYYY-MM-DD)
Returns:
tuple: (user_id, is_new_user)
"""
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# 전화번호로 조회
cursor.execute("""
SELECT id, mileage_balance FROM users WHERE phone = ?
""", (phone,))
user = cursor.fetchone()
if user:
# 기존 유저: birthday가 제공되면 업데이트
if birthday:
cursor.execute("UPDATE users SET birthday = ? WHERE id = ?", (birthday, user['id']))
conn.commit()
return (user['id'], False)
# 신규 생성
cursor.execute("""
INSERT INTO users (nickname, phone, birthday, mileage_balance)
VALUES (?, ?, ?, 0)
""", (name, phone, birthday))
conn.commit()
return (cursor.lastrowid, True)
def claim_mileage(user_id, token_info):
"""
마일리지 적립 처리
Args:
user_id (int): 사용자 ID
token_info (dict): 토큰 정보
Returns:
tuple: (성공 여부, 메시지, 적립 후 잔액)
"""
try:
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# 1. 현재 잔액 조회
cursor.execute("SELECT mileage_balance FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
current_balance = user['mileage_balance']
# 2. 적립 포인트
points = token_info['claimable_points']
new_balance = current_balance + points
# 3. 사용자 잔액 업데이트
cursor.execute("""
UPDATE users SET mileage_balance = ?, updated_at = ?
WHERE id = ?
""", (new_balance, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), user_id))
# 4. 마일리지 원장 기록
cursor.execute("""
INSERT INTO mileage_ledger (user_id, transaction_id, points, balance_after, reason, description)
VALUES (?, ?, ?, ?, ?, ?)
""", (
user_id,
token_info['transaction_id'],
points,
new_balance,
'CLAIM',
f"영수증 QR 적립 ({token_info['total_amount']:,}원 구매)"
))
# 5. claim_tokens 업데이트 (적립 완료 표시)
cursor.execute("""
UPDATE claim_tokens
SET claimed_at = ?, claimed_by_user_id = ?
WHERE id = ?
""", (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), user_id, token_info['id']))
conn.commit()
return (True, f"{points}P 적립 완료!", new_balance)
except Exception as e:
conn.rollback()
return (False, f"적립 처리 실패: {str(e)}", 0)
def normalize_kakao_phone(kakao_phone):
"""
카카오 전화번호 형식 변환
"+82 10-1234-5678""01012345678"
"""
if not kakao_phone:
return None
digits = kakao_phone.replace('+', '').replace('-', '').replace(' ', '')
if digits.startswith('82'):
digits = '0' + digits[2:]
if len(digits) >= 10:
return digits
return None
def link_kakao_identity(user_id, kakao_id, kakao_data):
"""카카오 계정을 customer_identities에 연결"""
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT id FROM customer_identities
WHERE provider = 'kakao' AND provider_user_id = ?
""", (kakao_id,))
if not cursor.fetchone():
# raw_data 제외 (세션 크기 절약)
store_data = {k: v for k, v in kakao_data.items() if k != 'raw_data'}
cursor.execute("""
INSERT INTO customer_identities (user_id, provider, provider_user_id, provider_data)
VALUES (?, 'kakao', ?, ?)
""", (user_id, kakao_id, json.dumps(store_data, ensure_ascii=False)))
# 프로필 이미지, 이메일 업데이트
updates = []
params = []
if kakao_data.get('profile_image'):
updates.append("profile_image_url = ?")
params.append(kakao_data['profile_image'])
if kakao_data.get('email'):
updates.append("email = ?")
params.append(kakao_data['email'])
if updates:
params.append(user_id)
cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = ?", params)
conn.commit()
def find_user_by_kakao_id(kakao_id):
"""카카오 ID로 기존 연결된 사용자 조회"""
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT user_id FROM customer_identities
WHERE provider = 'kakao' AND provider_user_id = ?
""", (kakao_id,))
row = cursor.fetchone()
return row['user_id'] if row else None
# ============================================================================
# 라우트
# ============================================================================
@app.route('/')
def index():
"""메인 페이지"""
logged_in = 'logged_in_user_id' in session
return render_template('index.html',
logged_in=logged_in,
logged_in_name=session.get('logged_in_name', ''),
logged_in_phone=session.get('logged_in_phone', '')
)
@app.route('/signup')
def signup():
"""회원가입 페이지"""
if 'logged_in_user_id' in session:
return redirect(f"/my-page?phone={session.get('logged_in_phone', '')}")
return render_template('signup.html')
@app.route('/api/signup', methods=['POST'])
def api_signup():
"""회원가입 API"""
try:
data = request.get_json()
name = data.get('name', '').strip()
phone = data.get('phone', '').strip().replace('-', '').replace(' ', '')
birthday = data.get('birthday', '').strip() or None # 선택 항목
if not name or not phone:
return jsonify({'success': False, 'message': '이름과 전화번호를 모두 입력해주세요.'}), 400
if len(phone) < 10:
return jsonify({'success': False, 'message': '올바른 전화번호를 입력해주세요.'}), 400
user_id, is_new = get_or_create_user(phone, name, birthday=birthday)
# 세션에 유저 정보 저장
session.permanent = True
session['logged_in_user_id'] = user_id
session['logged_in_phone'] = phone
session['logged_in_name'] = name
return jsonify({
'success': True,
'message': '가입이 완료되었습니다.' if is_new else '이미 가입된 회원입니다. 로그인되었습니다.',
'is_new': is_new
})
except Exception as e:
return jsonify({'success': False, 'message': f'오류가 발생했습니다: {str(e)}'}), 500
@app.route('/claim')
def claim():
"""
QR 코드 랜딩 페이지
URL: /claim?t=transaction_id:nonce
"""
# 토큰 파라미터 파싱
token_param = request.args.get('t', '')
if ':' not in token_param:
return render_template('error.html', message="잘못된 QR 코드 형식입니다.")
parts = token_param.split(':')
if len(parts) != 2:
return render_template('error.html', message="잘못된 QR 코드 형식입니다.")
transaction_id, nonce = parts[0], parts[1]
# 토큰 검증
success, message, token_info = verify_claim_token(transaction_id, nonce)
if not success:
return render_template('error.html', message=message)
# 세션에 로그인된 유저가 있으면 자동 적립 (PWA)
if 'logged_in_user_id' in session:
auto_user_id = session['logged_in_user_id']
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
cursor.execute("SELECT id, nickname, phone, mileage_balance FROM users WHERE id = ?", (auto_user_id,))
auto_user = cursor.fetchone()
if auto_user:
auto_success, auto_msg, auto_balance = claim_mileage(auto_user_id, token_info)
if auto_success:
return render_template('claim_kakao_success.html',
points=token_info['claimable_points'],
balance=auto_balance,
phone=auto_user['phone'],
name=auto_user['nickname'])
return render_template('error.html', message=auto_msg)
else:
# 유저가 삭제됨 - 세션 클리어
session.pop('logged_in_user_id', None)
session.pop('logged_in_phone', None)
session.pop('logged_in_name', None)
# MSSQL에서 구매 품목 조회
sale_items = []
try:
db_session = db_manager.get_session('PM_PRES')
sale_sub_query = text("""
SELECT
ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name,
S.SL_NM_item AS quantity,
S.SL_TOTAL_PRICE AS total
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.SL_NO_order = :transaction_id
ORDER BY S.DrugCode
""")
rows = db_session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall()
sale_items = [
{'name': r.goods_name, 'qty': int(r.quantity or 0), 'total': int(r.total or 0)}
for r in rows
]
except Exception as e:
logging.warning(f"품목 조회 실패 (transaction_id={transaction_id}): {e}")
# JS SDK용 카카오 state 생성 (CSRF 보호)
csrf_token = secrets.token_hex(16)
state_data = {'t': token_param, 'csrf': csrf_token}
kakao_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
session['kakao_csrf'] = csrf_token
return render_template('claim_form.html', token_info=token_info, sale_items=sale_items, kakao_state=kakao_state)
@app.route('/api/claim', methods=['POST'])
def api_claim():
"""
마일리지 적립 API
POST /api/claim
Body: {
"transaction_id": "...",
"nonce": "...",
"phone": "010-1234-5678",
"name": "홍길동"
}
"""
try:
data = request.get_json()
transaction_id = data.get('transaction_id')
nonce = data.get('nonce')
phone = data.get('phone', '').strip()
name = data.get('name', '').strip()
privacy_consent = data.get('privacy_consent', False)
# 입력 검증
if not phone or not name:
return jsonify({
'success': False,
'message': '전화번호와 이름을 모두 입력해주세요.'
}), 400
# 개인정보 동의 검증
if not privacy_consent:
return jsonify({
'success': False,
'message': '개인정보 수집·이용에 동의해주세요.'
}), 400
# 전화번호 형식 정리 (하이픈 제거)
phone = phone.replace('-', '').replace(' ', '')
if len(phone) < 10:
return jsonify({
'success': False,
'message': '올바른 전화번호를 입력해주세요.'
}), 400
# 토큰 검증
success, message, token_info = verify_claim_token(transaction_id, nonce)
if not success:
return jsonify({
'success': False,
'message': message
}), 400
# 사용자 조회/생성
user_id, is_new = get_or_create_user(phone, name)
# 마일리지 적립
success, message, new_balance = claim_mileage(user_id, token_info)
if not success:
return jsonify({
'success': False,
'message': message
}), 500
# 세션에 유저 정보 저장 (PWA 자동적립용)
session.permanent = True
session['logged_in_user_id'] = user_id
session['logged_in_phone'] = phone
session['logged_in_name'] = name
return jsonify({
'success': True,
'message': message,
'points': token_info['claimable_points'],
'balance': new_balance,
'is_new_user': is_new
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류가 발생했습니다: {str(e)}'
}), 500
# ============================================================================
# 카카오 적립 라우트
# ============================================================================
@app.route('/claim/kakao/start')
def claim_kakao_start():
"""카카오 OAuth 시작 - claim 컨텍스트를 state에 담아 카카오로 리다이렉트"""
from services.kakao_client import get_kakao_client
token_param = request.args.get('t', '')
if ':' not in token_param:
return render_template('error.html', message="잘못된 QR 코드입니다.")
parts = token_param.split(':')
if len(parts) != 2:
return render_template('error.html', message="잘못된 QR 코드입니다.")
transaction_id, nonce = parts
# 토큰 사전 검증
success, message, token_info = verify_claim_token(transaction_id, nonce)
if not success:
return render_template('error.html', message=message)
# state: claim 컨텍스트 + CSRF 토큰
csrf_token = secrets.token_hex(16)
state_data = {
't': token_param,
'csrf': csrf_token
}
state_encoded = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
session['kakao_csrf'] = csrf_token
kakao_client = get_kakao_client()
auth_url = kakao_client.get_authorization_url(state=state_encoded)
return redirect(auth_url)
def _handle_mypage_kakao_callback(code, kakao_client):
"""
마이페이지 카카오 콜백 처리 - 카카오 연동(머지) + 마이페이지 이동
케이스별 동작:
A) 카카오 ID가 이미 연결된 유저 → 그 유저의 마이페이지로 이동
B) 미연결 + 카카오 전화번호로 기존 유저 발견 → 카카오 연동 후 이동
C) 미연결 + 기존 유저 없음 → 신규 생성 + 카카오 연동
D) 전화번호 없음 → 에러 안내
"""
success, token_data = kakao_client.get_access_token(code)
if not success:
return render_template('error.html', message="카카오 인증에 실패했습니다.")
access_token = token_data.get('access_token')
success, user_info = kakao_client.get_user_info(access_token)
if not success:
return render_template('error.html', message="카카오 사용자 정보를 가져올 수 없습니다.")
kakao_id = user_info.get('kakao_id')
kakao_phone_raw = user_info.get('phone_number')
kakao_phone = normalize_kakao_phone(kakao_phone_raw)
kakao_name = user_info.get('name') or user_info.get('nickname', '고객')
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# Case A: 카카오 ID로 이미 연결된 유저 있음
existing_user_id = find_user_by_kakao_id(kakao_id)
if existing_user_id:
# "고객" 이름이면 카카오 실명으로 업데이트
if kakao_name and kakao_name != '고객':
cursor.execute(
"UPDATE users SET nickname = ? WHERE id = ? AND nickname = '고객'",
(kakao_name, existing_user_id))
conn.commit()
cursor.execute("SELECT phone FROM users WHERE id = ?", (existing_user_id,))
row = cursor.fetchone()
if row and row['phone']:
return redirect(f"/my-page?phone={row['phone']}")
# 전화번호 없으면 연동 불가
if not kakao_phone:
return render_template('error.html',
message="카카오 계정에 전화번호 정보가 없습니다. 카카오 설정에서 전화번호를 등록해주세요.")
# Case B/C: 전화번호로 기존 유저 조회
cursor.execute("SELECT id, nickname FROM users WHERE phone = ?", (kakao_phone,))
phone_user = cursor.fetchone()
if phone_user:
# Case B: 기존 전화번호 유저 → 카카오 연동 (머지)
user_id = phone_user['id']
link_kakao_identity(user_id, kakao_id, user_info)
# "고객" 이름이면 카카오 실명으로 업데이트
if phone_user['nickname'] == '고객' and kakao_name and kakao_name != '고객':
cursor.execute("UPDATE users SET nickname = ? WHERE id = ?",
(kakao_name, user_id))
conn.commit()
logging.info(f"마이페이지 카카오 머지: user_id={user_id}, kakao_id={kakao_id}")
else:
# Case C: 신규 유저 생성 + 카카오 연동
user_id, _ = get_or_create_user(kakao_phone, kakao_name)
link_kakao_identity(user_id, kakao_id, user_info)
logging.info(f"마이페이지 카카오 신규: user_id={user_id}, kakao_id={kakao_id}")
return redirect(f"/my-page?phone={kakao_phone}")
@app.route('/claim/kakao/callback')
def claim_kakao_callback():
"""카카오 OAuth 콜백 - 토큰 교환 → 사용자 정보 → 적립 처리"""
from services.kakao_client import get_kakao_client
code = request.args.get('code')
state_encoded = request.args.get('state')
error = request.args.get('error')
if error:
return render_template('error.html', message="카카오 로그인이 취소되었습니다.")
if not code or not state_encoded:
return render_template('error.html', message="카카오 인증 정보가 올바르지 않습니다.")
# 1. state 디코딩
try:
state_json = base64.urlsafe_b64decode(state_encoded).decode()
state_data = json.loads(state_json)
except Exception:
return render_template('error.html', message="인증 상태 정보가 올바르지 않습니다.")
# 2. CSRF 검증
csrf_token = state_data.get('csrf')
if csrf_token != session.pop('kakao_csrf', None):
return render_template('error.html', message="보안 검증에 실패했습니다. 다시 시도해주세요.")
# 2.5 마이페이지 조회 목적이면 별도 처리
if state_data.get('purpose') == 'mypage':
return _handle_mypage_kakao_callback(code, get_kakao_client())
# 3. claim 컨텍스트 복원
token_param = state_data.get('t', '')
parts = token_param.split(':')
if len(parts) != 2:
return render_template('error.html', message="적립 정보가 올바르지 않습니다.")
transaction_id, nonce = parts
# 4. 토큰 재검증 (OAuth 중 다른 사람이 적립했을 수 있음)
success, message, token_info = verify_claim_token(transaction_id, nonce)
if not success:
return render_template('error.html', message=message)
# 5. 카카오 access_token 교환
kakao_client = get_kakao_client()
success, token_data = kakao_client.get_access_token(code)
if not success:
logging.error(f"카카오 토큰 교환 실패: {token_data}")
return render_template('error.html', message="카카오 인증에 실패했습니다. 다시 시도해주세요.")
# 6. 사용자 정보 조회
access_token = token_data.get('access_token')
success, user_info = kakao_client.get_user_info(access_token)
if not success:
logging.error(f"카카오 사용자 정보 조회 실패: {user_info}")
return render_template('error.html', message="카카오 사용자 정보를 가져올 수 없습니다.")
kakao_id = user_info.get('kakao_id')
kakao_name = user_info.get('name') or user_info.get('nickname', '')
kakao_phone_raw = user_info.get('phone_number')
kakao_phone = normalize_kakao_phone(kakao_phone_raw)
# 카카오에서 받은 생년월일 조합 (YYYY-MMDD)
kakao_birthday = None
if user_info.get('birthyear') and user_info.get('birthday'):
kakao_birthday = f"{user_info['birthyear']}-{user_info['birthday'][:2]}-{user_info['birthday'][2:]}"
# 7. 분기: 전화번호가 있으면 자동 적립, 없으면 폰 입력 폼
if kakao_phone:
# 자동 적립
existing_user_id = find_user_by_kakao_id(kakao_id)
if existing_user_id:
user_id = existing_user_id
is_new = False
# 생년월일이 있으면 업데이트
if kakao_birthday:
conn = db_manager.get_sqlite_connection()
conn.cursor().execute("UPDATE users SET birthday = ? WHERE id = ? AND birthday IS NULL", (kakao_birthday, user_id))
conn.commit()
else:
user_id, is_new = get_or_create_user(kakao_phone, kakao_name, birthday=kakao_birthday)
link_kakao_identity(user_id, kakao_id, user_info)
success, msg, new_balance = claim_mileage(user_id, token_info)
if not success:
return render_template('error.html', message=msg)
# 세션에 유저 정보 저장 (PWA 자동적립용)
session.permanent = True
session['logged_in_user_id'] = user_id
session['logged_in_phone'] = kakao_phone
session['logged_in_name'] = kakao_name
return render_template('claim_kakao_success.html',
points=token_info['claimable_points'],
balance=new_balance,
phone=kakao_phone,
name=kakao_name)
else:
# 전화번호 없음 → 세션에 카카오 정보 저장 + 폰 입력 폼
session['kakao_claim'] = {
'kakao_id': kakao_id,
'name': kakao_name,
'profile_image': user_info.get('profile_image'),
'email': user_info.get('email'),
'token_param': token_param
}
return render_template('claim_kakao_phone.html',
token_info=token_info,
kakao_name=kakao_name,
kakao_profile_image=user_info.get('profile_image'))
@app.route('/api/claim/kakao', methods=['POST'])
def api_claim_kakao():
"""카카오 적립 (전화번호 폴백) - 세션의 카카오 정보 + 폼의 전화번호로 적립"""
kakao_data = session.pop('kakao_claim', None)
if not kakao_data:
return jsonify({
'success': False,
'message': '카카오 인증 정보가 만료되었습니다. 다시 시도해주세요.'
}), 400
data = request.get_json()
phone = data.get('phone', '').strip().replace('-', '').replace(' ', '')
if len(phone) < 10:
session['kakao_claim'] = kakao_data
return jsonify({
'success': False,
'message': '올바른 전화번호를 입력해주세요.'
}), 400
token_param = kakao_data['token_param']
parts = token_param.split(':')
transaction_id, nonce = parts
success, message, token_info = verify_claim_token(transaction_id, nonce)
if not success:
return jsonify({'success': False, 'message': message}), 400
kakao_id = kakao_data['kakao_id']
name = kakao_data['name']
existing_user_id = find_user_by_kakao_id(kakao_id)
if existing_user_id:
user_id = existing_user_id
is_new = False
else:
user_id, is_new = get_or_create_user(phone, name)
link_kakao_identity(user_id, kakao_id, kakao_data)
success, message, new_balance = claim_mileage(user_id, token_info)
if not success:
return jsonify({'success': False, 'message': message}), 500
# 세션에 유저 정보 저장 (PWA 자동적립용)
session.permanent = True
session['logged_in_user_id'] = user_id
session['logged_in_phone'] = phone
session['logged_in_name'] = name
return jsonify({
'success': True,
'message': message,
'points': token_info['claimable_points'],
'balance': new_balance,
'is_new_user': is_new
})
@app.route('/my-page/kakao/start')
def mypage_kakao_start():
"""마이페이지 카카오 로그인 조회"""
from services.kakao_client import get_kakao_client
csrf_token = secrets.token_hex(16)
state_data = {
'purpose': 'mypage',
'csrf': csrf_token
}
state_encoded = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
session['kakao_csrf'] = csrf_token
kakao_client = get_kakao_client()
auth_url = kakao_client.get_authorization_url(state=state_encoded)
return redirect(auth_url)
@app.route('/my-page')
def my_page():
"""마이페이지 (전화번호로 조회)"""
phone = request.args.get('phone', '')
if not phone:
# JS SDK용 카카오 state 생성
csrf_token = secrets.token_hex(16)
state_data = {'purpose': 'mypage', 'csrf': csrf_token}
kakao_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
session['kakao_csrf'] = csrf_token
return render_template('my_page_login.html', kakao_state=kakao_state)
# 전화번호로 사용자 조회
phone = phone.replace('-', '').replace(' ', '')
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT id, nickname, phone, mileage_balance, created_at
FROM users WHERE phone = ?
""", (phone,))
user_raw = cursor.fetchone()
if not user_raw:
return render_template('error.html', message='등록되지 않은 전화번호입니다.')
# 사용자 정보에 KST 시간 변환 적용
user = dict(user_raw)
user['created_at'] = utc_to_kst_str(user_raw['created_at'])
# 적립 내역 조회 (transaction_id 포함)
cursor.execute("""
SELECT points, balance_after, reason, description, created_at, transaction_id
FROM mileage_ledger
WHERE user_id = ?
ORDER BY created_at DESC
LIMIT 20
""", (user['id'],))
transactions_raw = cursor.fetchall()
# 거래 내역에 KST 시간 변환 적용
transactions = []
for tx in transactions_raw:
tx_dict = dict(tx)
tx_dict['created_at'] = utc_to_kst_str(tx['created_at'])
transactions.append(tx_dict)
return render_template('my_page.html', user=user, transactions=transactions, user_id=user['id'])
# ============================================================================
# PWA / 공통 라우트
# ============================================================================
@app.route('/sw.js')
def service_worker():
"""서비스 워커를 루트 경로에서 제공 (scope='/' 허용)"""
return app.send_static_file('sw.js'), 200, {
'Content-Type': 'application/javascript',
'Service-Worker-Allowed': '/'
}
@app.route('/privacy')
def privacy():
"""개인정보 처리방침"""
return render_template('privacy.html')
@app.route('/logout')
def logout():
"""세션 로그아웃"""
session.pop('logged_in_user_id', None)
session.pop('logged_in_phone', None)
session.pop('logged_in_name', None)
return redirect('/')
@app.route('/admin/transaction/<transaction_id>')
def admin_transaction_detail(transaction_id):
"""거래 세부 내역 조회 (MSSQL)"""
try:
# MSSQL PM_PRES 연결
session = db_manager.get_session('PM_PRES')
# SALE_MAIN 조회 (거래 헤더)
sale_main_query = text("""
SELECT
SL_NO_order,
InsertTime,
SL_MY_total,
SL_MY_discount,
SL_MY_sale,
SL_MY_credit,
SL_MY_recive,
SL_MY_rec_vat,
ISNULL(SL_NM_custom, '[비고객]') AS customer_name
FROM SALE_MAIN
WHERE SL_NO_order = :transaction_id
""")
sale_main = session.execute(sale_main_query, {'transaction_id': transaction_id}).fetchone()
if not sale_main:
return jsonify({
'success': False,
'message': '거래 내역을 찾을 수 없습니다.'
}), 404
# SALE_SUB 조회 (판매 상품 상세)
sale_sub_query = text("""
SELECT
S.DrugCode,
ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name,
S.SL_NM_item AS quantity,
S.SL_NM_cost_a AS price,
S.SL_TOTAL_PRICE AS total
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.SL_NO_order = :transaction_id
ORDER BY S.DrugCode
""")
sale_items = session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall()
# 결과를 JSON으로 반환
result = {
'success': True,
'transaction': {
'id': sale_main.SL_NO_order,
'date': str(sale_main.InsertTime),
'total_amount': int(sale_main.SL_MY_total or 0),
'discount': int(sale_main.SL_MY_discount or 0),
'sale_amount': int(sale_main.SL_MY_sale or 0),
'credit': int(sale_main.SL_MY_credit or 0),
'supply_value': int(sale_main.SL_MY_recive or 0),
'vat': int(sale_main.SL_MY_rec_vat or 0),
'customer_name': sale_main.customer_name
},
'items': [
{
'code': item.DrugCode,
'name': item.goods_name,
'qty': int(item.quantity or 0),
'price': int(item.price or 0),
'total': int(item.total or 0)
}
for item in sale_items
]
}
return jsonify(result)
except Exception as e:
return jsonify({
'success': False,
'message': f'조회 실패: {str(e)}'
}), 500
@app.route('/admin/user/<int:user_id>')
def admin_user_detail(user_id):
"""사용자 상세 이력 조회 - 구매 이력, 적립 이력, 구매 품목"""
try:
# 1. SQLite 연결
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# 2. 사용자 기본 정보 조회
cursor.execute("""
SELECT id, nickname, phone, mileage_balance, created_at
FROM users WHERE id = ?
""", (user_id,))
user = cursor.fetchone()
if not user:
return jsonify({
'success': False,
'message': '사용자를 찾을 수 없습니다.'
}), 404
# 3. 마일리지 이력 조회 (최근 50건)
cursor.execute("""
SELECT transaction_id, points, balance_after, reason, description, created_at
FROM mileage_ledger
WHERE user_id = ?
ORDER BY created_at DESC
LIMIT 50
""", (user_id,))
mileage_history = cursor.fetchall()
# 4. 구매 이력 조회 (적립된 거래만, 최근 20건)
cursor.execute("""
SELECT transaction_id, total_amount, claimable_points, claimed_at
FROM claim_tokens
WHERE claimed_by_user_id = ?
ORDER BY claimed_at DESC
LIMIT 20
""", (user_id,))
claimed_tokens = cursor.fetchall()
# 5. 각 거래의 상품 상세 조회 (MSSQL)
purchases = []
try:
session = db_manager.get_session('PM_PRES')
for token in claimed_tokens:
transaction_id = token['transaction_id']
# SALE_MAIN에서 거래 시간 조회
sale_main_query = text("""
SELECT InsertTime
FROM SALE_MAIN
WHERE SL_NO_order = :transaction_id
""")
sale_main = session.execute(
sale_main_query,
{'transaction_id': transaction_id}
).fetchone()
# 거래 시간 추출 (MSSQL의 실제 거래 시간)
if sale_main and sale_main.InsertTime:
transaction_date = str(sale_main.InsertTime)[:16].replace('T', ' ')
else:
transaction_date = '-'
# SALE_SUB + CD_GOODS JOIN (BARCODE 추가)
sale_items_query = text("""
SELECT
S.BARCODE,
S.DrugCode,
ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name,
S.SL_NM_item AS quantity,
S.SL_NM_cost_a AS price,
S.SL_TOTAL_PRICE AS total
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.SL_NO_order = :transaction_id
ORDER BY S.DrugCode
""")
items_raw = session.execute(
sale_items_query,
{'transaction_id': transaction_id}
).fetchall()
# 상품 리스트 변환 (카테고리 포함)
items = []
for item in items_raw:
barcode = item.BARCODE
# SQLite에서 제품 카테고리 조회
categories = []
if barcode:
cursor.execute("""
SELECT category_name, relevance_score
FROM product_category_mapping
WHERE barcode = ?
ORDER BY relevance_score DESC
""", (barcode,))
for cat_row in cursor.fetchall():
categories.append({
'name': cat_row[0],
'score': cat_row[1]
})
items.append({
'code': item.DrugCode,
'barcode': barcode,
'name': item.goods_name,
'qty': int(item.quantity or 0),
'price': int(item.price or 0),
'total': int(item.total or 0),
'categories': categories
})
# 상품 요약 생성 ("첫번째상품명 외 N개")
if items:
first_item_name = items[0]['name']
items_count = len(items)
if items_count == 1:
items_summary = first_item_name
else:
items_summary = f"{first_item_name}{items_count - 1}"
else:
items_summary = "상품 정보 없음"
items_count = 0
purchases.append({
'transaction_id': transaction_id,
'date': transaction_date, # MSSQL의 실제 거래 시간 사용
'amount': int(token['total_amount']),
'points': int(token['claimable_points']),
'items_summary': items_summary,
'items_count': items_count,
'items': items
})
except Exception as mssql_error:
# MSSQL 연결 실패 시 빈 배열 반환
print(f"[WARNING] MSSQL 조회 실패 (user {user_id}): {mssql_error}")
purchases = []
# 6. 응답 생성
return jsonify({
'success': True,
'user': {
'id': user['id'],
'name': user['nickname'],
'phone': user['phone'],
'balance': user['mileage_balance'],
'created_at': utc_to_kst_str(user['created_at'])
},
'mileage_history': [
{
'points': ml['points'],
'balance_after': ml['balance_after'],
'reason': ml['reason'],
'description': ml['description'],
'created_at': utc_to_kst_str(ml['created_at']),
'transaction_id': ml['transaction_id']
}
for ml in mileage_history
],
'purchases': purchases
})
except Exception as e:
return jsonify({
'success': False,
'message': f'조회 실패: {str(e)}'
}), 500
@app.route('/admin/search/user')
def admin_search_user():
"""사용자 검색 (이름/전화번호/전화번호 뒷자리)"""
query = request.args.get('q', '').strip()
search_type = request.args.get('type', 'name') # 'name', 'phone', 'phone_last'
if not query:
return jsonify({'success': False, 'message': '검색어를 입력하세요'}), 400
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
try:
if search_type == 'phone_last':
# 전화번호 뒷자리 검색
cursor.execute("""
SELECT id, nickname, phone, mileage_balance
FROM users
WHERE phone LIKE ?
ORDER BY created_at DESC
""", (f'%{query}',))
elif search_type == 'phone':
# 전체 전화번호 검색
cursor.execute("""
SELECT id, nickname, phone, mileage_balance
FROM users
WHERE phone = ?
""", (query,))
else:
# 이름 검색
cursor.execute("""
SELECT id, nickname, phone, mileage_balance
FROM users
WHERE nickname LIKE ?
ORDER BY created_at DESC
""", (f'%{query}%',))
results = cursor.fetchall()
if not results:
return jsonify({'success': False, 'message': '검색 결과가 없습니다'}), 404
if len(results) == 1:
# 단일 매칭 - user_id만 반환
return jsonify({
'success': True,
'multiple': False,
'user_id': results[0]['id']
})
else:
# 여러 명 매칭 - 선택 모달용 데이터 반환
users = [{
'id': row['id'],
'name': row['nickname'],
'phone': row['phone'],
'balance': row['mileage_balance']
} for row in results]
return jsonify({
'success': True,
'multiple': True,
'users': users
})
except Exception as e:
return jsonify({
'success': False,
'message': f'검색 실패: {str(e)}'
}), 500
@app.route('/admin/search/product')
def admin_search_product():
"""제품 검색 - 적립자 목록 반환 (SQLite 적립자 기준)"""
query = request.args.get('q', '').strip()
if not query:
return jsonify({'success': False, 'message': '검색어를 입력하세요'}), 400
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
try:
# 1. MSSQL에서 제품명으로 거래번호 찾기
session = db_manager.get_session('PM_PRES')
sale_items_query = text("""
SELECT DISTINCT
S.SL_NO_order,
S.SL_NM_item,
M.InsertTime
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
LEFT JOIN SALE_MAIN M ON S.SL_NO_order = M.SL_NO_order
WHERE G.GoodsName LIKE :product_name
ORDER BY M.InsertTime DESC
""")
sale_results = session.execute(sale_items_query, {
'product_name': f'%{query}%'
}).fetchall()
if not sale_results:
return jsonify({
'success': True,
'results': []
})
# 2. SQLite에서 적립된 거래만 필터링 (claimed_by_user_id IS NOT NULL)
transaction_ids = [row.SL_NO_order for row in sale_results]
placeholders = ','.join('?' * len(transaction_ids))
cursor.execute(f"""
SELECT
ct.transaction_id,
ct.total_amount,
ct.claimed_at,
ct.claimed_by_user_id,
u.nickname,
u.phone
FROM claim_tokens ct
JOIN users u ON ct.claimed_by_user_id = u.id
WHERE ct.transaction_id IN ({placeholders})
AND ct.claimed_by_user_id IS NOT NULL
ORDER BY ct.claimed_at DESC
LIMIT 50
""", transaction_ids)
claimed_results = cursor.fetchall()
# 3. 결과 조합
results = []
for claim_row in claimed_results:
# 해당 거래의 MSSQL 정보 찾기
mssql_row = next((r for r in sale_results if r.SL_NO_order == claim_row['transaction_id']), None)
if mssql_row:
results.append({
'user_id': claim_row['claimed_by_user_id'],
'user_name': claim_row['nickname'],
'user_phone': claim_row['phone'],
'purchase_date': str(mssql_row.InsertTime)[:16].replace('T', ' ') if mssql_row.InsertTime else '-', # MSSQL 실제 거래 시간
'claimed_date': str(claim_row['claimed_at'])[:16].replace('T', ' ') if claim_row['claimed_at'] else '-', # 적립 시간
'quantity': float(mssql_row.SL_NM_item or 0),
'total_amount': int(claim_row['total_amount'])
})
return jsonify({
'success': True,
'results': results
})
except Exception as e:
return jsonify({
'success': False,
'message': f'검색 실패: {str(e)}'
}), 500
@app.route('/admin/ai-analyze-user/<int:user_id>', methods=['POST'])
def admin_ai_analyze_user(user_id):
"""OpenAI GPT를 사용한 사용자 구매 패턴 AI 분석"""
try:
# OpenAI 사용 가능 여부 확인
if not OPENAI_AVAILABLE:
return jsonify({
'success': False,
'message': ERROR_MESSAGES['OPENAI_NOT_AVAILABLE']
}), 503
# 1. SQLite 연결
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# 2. 사용자 기본 정보 조회
cursor.execute("""
SELECT id, nickname, phone, mileage_balance, created_at
FROM users WHERE id = ?
""", (user_id,))
user_row = cursor.fetchone()
if not user_row:
return jsonify({
'success': False,
'message': ERROR_MESSAGES['NO_USER']
}), 404
user = {
'id': user_row['id'],
'nickname': user_row['nickname'],
'phone': user_row['phone'],
'mileage_balance': user_row['mileage_balance'],
'created_at': user_row['created_at']
}
# 3. 구매 이력 조회 (최근 20건)
cursor.execute("""
SELECT transaction_id, total_amount, claimable_points, claimed_at
FROM claim_tokens
WHERE claimed_by_user_id = ?
ORDER BY claimed_at DESC
LIMIT 20
""", (user_id,))
claimed_tokens = cursor.fetchall()
if not claimed_tokens:
return jsonify({
'success': False,
'message': ERROR_MESSAGES['NO_PURCHASES']
}), 400
# 4. MSSQL에서 상품 상세 조회
purchases = []
session = db_manager.get_session('PM_PRES')
for token in claimed_tokens:
transaction_id = token['transaction_id']
# SALE_MAIN에서 거래 시간 조회
sale_main_query = text("""
SELECT InsertTime
FROM SALE_MAIN
WHERE SL_NO_order = :transaction_id
""")
sale_main = session.execute(sale_main_query, {'transaction_id': transaction_id}).fetchone()
# SALE_SUB + CD_GOODS JOIN
sale_items_query = text("""
SELECT
S.DrugCode,
ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name,
S.SL_NM_item AS quantity,
S.SL_NM_cost_a AS price,
S.SL_TOTAL_PRICE AS total
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.SL_NO_order = :transaction_id
ORDER BY S.DrugCode
""")
items_raw = session.execute(sale_items_query, {'transaction_id': transaction_id}).fetchall()
items = [{
'name': item.goods_name,
'qty': int(item.quantity or 0),
'price': int(item.price or 0)
} for item in items_raw]
purchases.append({
'date': str(sale_main.InsertTime)[:16].replace('T', ' ') if sale_main and sale_main.InsertTime else '-',
'amount': int(token['total_amount']),
'points': int(token['claimable_points']),
'items': items
})
# 5. OpenAI API 호출용 프롬프트 생성
prompt = prepare_analysis_prompt(user, purchases)
# 6. OpenAI API 호출
logging.info(f"AI 분석 시작: 사용자 ID {user_id}")
success, response = call_openai_with_retry(prompt)
if not success:
# response에는 에러 메시지가 담겨 있음
return jsonify({
'success': False,
'message': response
}), 500
# 7. 응답 파싱
response_text = response.choices[0].message.content
analysis = parse_openai_response(response_text)
logging.info(f"AI 분석 완료: 사용자 ID {user_id}, 토큰: {response.usage.total_tokens}")
# 8. 결과 반환
return jsonify({
'success': True,
'user': {
'id': user['id'],
'name': user['nickname'],
'phone': user['phone'],
'balance': user['mileage_balance']
},
'analysis': analysis,
'metadata': {
'model_used': OPENAI_MODEL,
'tokens_used': response.usage.total_tokens,
'analysis_time': datetime.now(KST).strftime('%Y-%m-%d %H:%M:%S')
}
})
except Exception as e:
logging.error(f"AI 분석 오류: {e}")
return jsonify({
'success': False,
'message': ERROR_MESSAGES['UNKNOWN_ERROR']
}), 500
@app.route('/admin/use-points', methods=['POST'])
def admin_use_points():
"""관리자 페이지에서 포인트 사용 (차감)"""
try:
data = request.get_json()
user_id = data.get('user_id')
points_to_use = data.get('points')
if not user_id or not points_to_use:
return jsonify({
'success': False,
'message': '사용자 ID와 포인트를 입력하세요.'
}), 400
if points_to_use <= 0:
return jsonify({
'success': False,
'message': '1 이상의 포인트를 입력하세요.'
}), 400
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# 1. 현재 사용자 잔액 확인
cursor.execute("""
SELECT id, nickname, mileage_balance
FROM users
WHERE id = ?
""", (user_id,))
user = cursor.fetchone()
if not user:
return jsonify({
'success': False,
'message': '사용자를 찾을 수 없습니다.'
}), 404
current_balance = user['mileage_balance']
# 2. 잔액 확인
if points_to_use > current_balance:
return jsonify({
'success': False,
'message': f'잔액({current_balance:,}P)이 부족합니다.'
}), 400
# 3. 포인트 차감
new_balance = current_balance - points_to_use
cursor.execute("""
UPDATE users
SET mileage_balance = ?
WHERE id = ?
""", (new_balance, user_id))
# 4. 마일리지 원장 기록
cursor.execute("""
INSERT INTO mileage_ledger
(user_id, transaction_id, points, balance_after, reason, description)
VALUES (?, NULL, ?, ?, 'USE', ?)
""", (
user_id,
-points_to_use, # 음수로 저장
new_balance,
f'포인트 사용 (관리자) - {points_to_use:,}P 차감'
))
conn.commit()
logging.info(f"포인트 사용 완료: 사용자 ID {user_id}, 차감 {points_to_use}P, 잔액 {new_balance}P")
return jsonify({
'success': True,
'message': f'{points_to_use:,}P 사용 완료',
'new_balance': new_balance,
'used_points': points_to_use
})
except Exception as e:
logging.error(f"포인트 사용 실패: {e}")
return jsonify({
'success': False,
'message': f'포인트 사용 실패: {str(e)}'
}), 500
@app.route('/admin')
def admin():
"""관리자 페이지 - 전체 사용자 및 적립 현황"""
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# 전체 통계
cursor.execute("""
SELECT
COUNT(*) as total_users,
SUM(mileage_balance) as total_balance
FROM users
""")
stats = cursor.fetchone()
# 최근 가입 사용자 (20명)
cursor.execute("""
SELECT id, nickname, phone, mileage_balance, created_at
FROM users
ORDER BY created_at DESC
LIMIT 20
""")
recent_users_raw = cursor.fetchall()
# 시간을 KST로 변환
recent_users = []
for user in recent_users_raw:
user_dict = dict(user)
user_dict['created_at'] = utc_to_kst_str(user['created_at'])
recent_users.append(user_dict)
# 최근 적립 내역 (50건)
cursor.execute("""
SELECT
ml.id,
u.nickname,
u.phone,
ml.points,
ml.balance_after,
ml.reason,
ml.description,
ml.created_at,
ml.transaction_id
FROM mileage_ledger ml
JOIN users u ON ml.user_id = u.id
ORDER BY ml.created_at DESC
LIMIT 50
""")
recent_transactions_raw = cursor.fetchall()
# 시간을 KST로 변환
recent_transactions = []
for trans in recent_transactions_raw:
trans_dict = dict(trans)
trans_dict['created_at'] = utc_to_kst_str(trans['created_at'])
recent_transactions.append(trans_dict)
# QR 토큰 통계
cursor.execute("""
SELECT
COUNT(*) as total_tokens,
SUM(CASE WHEN claimed_at IS NOT NULL THEN 1 ELSE 0 END) as claimed_count,
SUM(CASE WHEN claimed_at IS NULL THEN 1 ELSE 0 END) as unclaimed_count,
SUM(claimable_points) as total_points_issued,
SUM(CASE WHEN claimed_at IS NOT NULL THEN claimable_points ELSE 0 END) as total_points_claimed
FROM claim_tokens
""")
token_stats = cursor.fetchone()
# 최근 QR 발행 내역 (20건)
cursor.execute("""
SELECT
transaction_id,
total_amount,
claimable_points,
claimed_at,
claimed_by_user_id,
created_at
FROM claim_tokens
ORDER BY created_at DESC
LIMIT 20
""")
recent_tokens_raw = cursor.fetchall()
# Convert only created_at (발행일), leave claimed_at (적립일) unconverted
recent_tokens = []
for token in recent_tokens_raw:
token_dict = dict(token)
token_dict['created_at'] = utc_to_kst_str(token['created_at']) # Convert 발행일
# claimed_at stays as-is (or remains None if not claimed)
recent_tokens.append(token_dict)
return render_template('admin.html',
stats=stats,
recent_users=recent_users,
recent_transactions=recent_transactions,
token_stats=token_stats,
recent_tokens=recent_tokens)
# ============================================================================
# AI 업셀링 추천
# ============================================================================
def _get_available_products():
"""약국 보유 제품 목록 (최근 30일 판매 실적 기준 TOP 40)"""
try:
mssql_session = db_manager.get_session('PM_PRES')
rows = mssql_session.execute(text("""
SELECT TOP 40
ISNULL(G.GoodsName, '') AS name,
COUNT(*) as sales,
MAX(G.Saleprice) as price
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.SL_DT_appl >= CONVERT(VARCHAR(8), DATEADD(DAY, -30, GETDATE()), 112)
AND G.GoodsName IS NOT NULL
AND G.GoodsName NOT LIKE N'%(판매중지)%'
GROUP BY G.GoodsName
ORDER BY COUNT(*) DESC
""")).fetchall()
return [{'name': r.name, 'price': float(r.price or 0), 'sales': r.sales} for r in rows]
except Exception as e:
logging.warning(f"[AI추천] 보유 제품 목록 조회 실패: {e}")
return []
def _generate_upsell_recommendation(user_id, transaction_id, sale_items, user_name):
"""키오스크 적립 후 AI 업셀링 추천 생성 (fire-and-forget)"""
from services.clawdbot_client import generate_upsell, generate_upsell_real
if not sale_items:
return
# 현재 구매 품목
current_items = ', '.join(item['name'] for item in sale_items if item.get('name'))
if not current_items:
return
# 최근 구매 이력 수집
recent_products = current_items # 기본값
try:
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT ct.transaction_id
FROM claim_tokens ct
WHERE ct.claimed_by_user_id = ? AND ct.transaction_id != ?
ORDER BY ct.claimed_at DESC LIMIT 5
""", (user_id, transaction_id))
recent_tokens = cursor.fetchall()
if recent_tokens:
all_products = []
mssql_session = db_manager.get_session('PM_PRES')
for token in recent_tokens:
rows = mssql_session.execute(text("""
SELECT ISNULL(G.GoodsName, '') AS goods_name
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.SL_NO_order = :tid
"""), {'tid': token['transaction_id']}).fetchall()
for r in rows:
if r.goods_name:
all_products.append(r.goods_name)
if all_products:
recent_products = ', '.join(set(all_products))
except Exception as e:
logging.warning(f"[AI추천] 구매 이력 수집 실패 (현재 품목만 사용): {e}")
# 실데이터 기반 추천 (보유 제품 목록 제공)
available = _get_available_products()
rec = None
if available:
logging.info(f"[AI추천] 실데이터 생성 시작: user={user_name}, items={current_items}, 보유제품={len(available)}")
rec = generate_upsell_real(user_name, current_items, recent_products, available)
# 실데이터 실패 시 기존 방식 fallback
if not rec:
logging.info(f"[AI추천] 자유 생성 fallback: user={user_name}")
rec = generate_upsell(user_name, current_items, recent_products)
if not rec:
logging.warning("[AI추천] 생성 실패 (AI 응답 없음)")
return
# SQLite에 저장
try:
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
expires_at = (datetime.now() + timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
INSERT INTO ai_recommendations
(user_id, transaction_id, recommended_product, recommendation_message,
recommendation_reason, trigger_products, ai_raw_response, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
user_id, transaction_id,
rec['product'], rec['message'], rec['reason'],
json.dumps([item['name'] for item in sale_items], ensure_ascii=False),
json.dumps(rec, ensure_ascii=False),
expires_at
))
conn.commit()
logging.info(f"[AI추천] 저장 완료: user_id={user_id}, product={rec['product']}")
except Exception as e:
logging.warning(f"[AI추천] DB 저장 실패: {e}")
@app.route('/api/recommendation/<int:user_id>')
def api_get_recommendation(user_id):
"""마이페이지용 AI 추천 조회"""
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
SELECT id, recommended_product, recommendation_message, created_at
FROM ai_recommendations
WHERE user_id = ? AND status = 'active'
AND (expires_at IS NULL OR expires_at > ?)
ORDER BY created_at DESC LIMIT 1
""", (user_id, now))
rec = cursor.fetchone()
if not rec:
return jsonify({'success': True, 'has_recommendation': False})
# 표시 횟수 업데이트
cursor.execute("""
UPDATE ai_recommendations
SET displayed_count = displayed_count + 1,
displayed_at = COALESCE(displayed_at, ?)
WHERE id = ?
""", (now, rec['id']))
conn.commit()
return jsonify({
'success': True,
'has_recommendation': True,
'recommendation': {
'id': rec['id'],
'product': rec['recommended_product'],
'message': rec['recommendation_message']
}
})
@app.route('/api/recommendation/<int:rec_id>/dismiss', methods=['POST'])
def api_dismiss_recommendation(rec_id):
"""추천 닫기 / 관심 표시"""
data = request.get_json(silent=True) or {}
action = data.get('action', 'dismissed')
if action not in ('dismissed', 'interested'):
action = 'dismissed'
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
UPDATE ai_recommendations SET status = ?, dismissed_at = ?
WHERE id = ?
""", (action, now, rec_id))
conn.commit()
logging.info(f"[AI추천] id={rec_id}{action}")
return jsonify({'success': True})
# ============================================================================
# 알림톡 로그
# ============================================================================
@app.route('/admin/alimtalk')
def admin_alimtalk():
"""알림톡 발송 로그 + NHN 발송 내역"""
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# 로컬 발송 로그 (최근 50건)
cursor.execute("""
SELECT a.*, u.nickname, u.phone as user_phone
FROM alimtalk_logs a
LEFT JOIN users u ON a.user_id = u.id
ORDER BY a.created_at DESC
LIMIT 50
""")
local_logs = [dict(row) for row in cursor.fetchall()]
# 통계
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) as fail_count
FROM alimtalk_logs
""")
stats = dict(cursor.fetchone())
# 오늘 통계
cursor.execute("""
SELECT
COUNT(*) as today_total,
SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as today_success
FROM alimtalk_logs
WHERE date(created_at) = date('now')
""")
today = dict(cursor.fetchone())
stats.update(today)
return render_template('admin_alimtalk.html', local_logs=local_logs, stats=stats)
@app.route('/admin/ai-crm')
def admin_ai_crm():
"""AI 업셀링 CRM 대시보드"""
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# 추천 목록 (최근 50건) + 사용자 정보 JOIN
cursor.execute("""
SELECT r.*, u.nickname, u.phone as user_phone
FROM ai_recommendations r
LEFT JOIN users u ON r.user_id = u.id
ORDER BY r.created_at DESC
LIMIT 50
""")
recs = [dict(row) for row in cursor.fetchall()]
# trigger_products JSON 파싱
for rec in recs:
tp = rec.get('trigger_products')
if tp:
try:
rec['trigger_list'] = json.loads(tp)
except Exception:
rec['trigger_list'] = [tp]
else:
rec['trigger_list'] = []
# 통계
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'active' AND (expires_at IS NULL OR expires_at > datetime('now')) THEN 1 ELSE 0 END) as active_count,
SUM(CASE WHEN status = 'interested' THEN 1 ELSE 0 END) as interested_count,
SUM(CASE WHEN status = 'dismissed' THEN 1 ELSE 0 END) as dismissed_count,
SUM(CASE WHEN displayed_count > 0 THEN 1 ELSE 0 END) as displayed_count
FROM ai_recommendations
""")
stats = dict(cursor.fetchone())
# 오늘 생성 건수
cursor.execute("""
SELECT COUNT(*) as today_count
FROM ai_recommendations
WHERE date(created_at) = date('now')
""")
stats['today_count'] = cursor.fetchone()['today_count']
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return render_template('admin_ai_crm.html', recs=recs, stats=stats, now=now)
@app.route('/api/admin/alimtalk/nhn-history')
def api_admin_alimtalk_nhn_history():
"""NHN Cloud 실제 발송 내역 API"""
from services.nhn_alimtalk import get_nhn_send_history
date_str = request.args.get('date', datetime.now().strftime('%Y-%m-%d'))
start = f"{date_str} 00:00"
end = f"{date_str} 23:59"
messages = get_nhn_send_history(start, end)
result = []
for m in messages:
result.append({
'requestDate': m.get('requestDate', ''),
'recipientNo': m.get('recipientNo', ''),
'templateCode': m.get('templateCode', ''),
'messageStatus': m.get('messageStatus', ''),
'resultCode': m.get('resultCode', ''),
'resultMessage': m.get('resultMessage', ''),
'content': m.get('content', ''),
})
return jsonify({'success': True, 'messages': result})
@app.route('/api/admin/alimtalk/test-send', methods=['POST'])
def api_admin_alimtalk_test_send():
"""관리자 수동 알림톡 발송 테스트"""
from services.nhn_alimtalk import send_mileage_claim_alimtalk
data = request.get_json()
phone = data.get('phone', '').strip().replace('-', '')
name = data.get('name', '테스트')
if len(phone) < 10:
return jsonify({'success': False, 'message': '올바른 전화번호를 입력해주세요.'}), 400
success, msg = send_mileage_claim_alimtalk(
phone, name, 100, 500,
items=[{'name': '테스트 발송', 'qty': 1, 'total': 1000}],
trigger_source='admin_test'
)
return jsonify({'success': success, 'message': msg})
# ============================================================================
# 키오스크 적립
# ============================================================================
@app.route('/kiosk')
def kiosk():
"""키오스크 메인 페이지 (전체 화면 웹 UI)"""
return render_template('kiosk.html')
@app.route('/api/kiosk/trigger', methods=['POST'])
def api_kiosk_trigger():
"""
POS → 키오스크 세션 생성
POST /api/kiosk/trigger
Body: {"transaction_id": "...", "amount": 50000}
"""
global kiosk_current_session
data = request.get_json()
transaction_id = data.get('transaction_id')
amount = data.get('amount', 0)
if not transaction_id:
return jsonify({'success': False, 'message': 'transaction_id가 필요합니다.'}), 400
try:
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# QR 토큰 존재 확인
cursor.execute("SELECT id, token_hash, claimable_points, claimed_at FROM claim_tokens WHERE transaction_id = ?",
(transaction_id,))
token_row = cursor.fetchone()
if token_row and token_row['claimed_at']:
return jsonify({'success': False, 'message': '이미 적립된 거래입니다.'}), 400
if token_row:
# 기존 토큰 사용 — QR URL은 새 nonce로 생성
# (verify_claim_token은 transaction_id로만 조회하므로 nonce 불일치 무관)
claimable_points = token_row['claimable_points']
nonce = secrets.token_hex(6)
from utils.qr_token_generator import QR_BASE_URL
qr_url = f"{QR_BASE_URL}?t={transaction_id}:{nonce}"
else:
# 새 토큰 생성
from utils.qr_token_generator import generate_claim_token, save_token_to_db
token_info = generate_claim_token(transaction_id, float(amount))
success, error = save_token_to_db(
transaction_id,
token_info['token_hash'],
float(amount),
token_info['claimable_points'],
token_info['expires_at'],
token_info['pharmacy_id']
)
if not success:
return jsonify({'success': False, 'message': error}), 500
claimable_points = token_info['claimable_points']
qr_url = token_info['qr_url']
# MSSQL에서 구매 품목 조회
sale_items = []
try:
mssql_session = db_manager.get_session('PM_PRES')
sale_sub_query = text("""
SELECT
ISNULL(G.GoodsName, '(약품명 없음)') AS goods_name,
S.SL_NM_item AS quantity,
S.SL_TOTAL_PRICE AS total
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.SL_NO_order = :transaction_id
ORDER BY S.DrugCode
""")
rows = mssql_session.execute(sale_sub_query, {'transaction_id': transaction_id}).fetchall()
sale_items = [
{'name': r.goods_name, 'qty': int(r.quantity or 0), 'total': int(r.total or 0)}
for r in rows
]
except Exception as e:
logging.warning(f"키오스크 품목 조회 실패 (transaction_id={transaction_id}): {e}")
# 키오스크 세션 저장
kiosk_current_session = {
'transaction_id': transaction_id,
'amount': int(amount),
'points': claimable_points,
'qr_url': qr_url,
'items': sale_items,
'created_at': datetime.now(KST).isoformat()
}
return jsonify({
'success': True,
'message': f'키오스크 적립 대기 ({claimable_points}P)',
'points': claimable_points
})
except Exception as e:
logging.error(f"키오스크 트리거 오류: {e}")
return jsonify({'success': False, 'message': f'오류: {str(e)}'}), 500
@app.route('/api/kiosk/current')
def api_kiosk_current():
"""
키오스크 폴링 - 현재 세션 조회
GET /api/kiosk/current
"""
global kiosk_current_session
if kiosk_current_session is None:
return jsonify({'active': False})
# 5분 경과 시 자동 만료
created = datetime.fromisoformat(kiosk_current_session['created_at'])
if datetime.now(KST) - created > timedelta(minutes=5):
kiosk_current_session = None
return jsonify({'active': False})
return jsonify({
'active': True,
'transaction_id': kiosk_current_session['transaction_id'],
'amount': kiosk_current_session['amount'],
'points': kiosk_current_session['points'],
'qr_url': kiosk_current_session.get('qr_url'),
'items': kiosk_current_session.get('items', [])
})
@app.route('/api/kiosk/claim', methods=['POST'])
def api_kiosk_claim():
"""
키오스크 전화번호 적립
POST /api/kiosk/claim
Body: {"phone": "01012345678"}
"""
global kiosk_current_session
if kiosk_current_session is None:
return jsonify({'success': False, 'message': '적립 대기 중인 거래가 없습니다.'}), 400
data = request.get_json()
phone = data.get('phone', '').strip().replace('-', '').replace(' ', '')
if len(phone) < 10:
return jsonify({'success': False, 'message': '올바른 전화번호를 입력해주세요.'}), 400
transaction_id = kiosk_current_session['transaction_id']
try:
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
# claim_tokens에서 nonce 조회를 위해 token_hash로 검증
cursor.execute("""
SELECT id, transaction_id, token_hash, total_amount, claimable_points,
pharmacy_id, expires_at, claimed_at, claimed_by_user_id
FROM claim_tokens WHERE transaction_id = ?
""", (transaction_id,))
token_row = cursor.fetchone()
if not token_row:
return jsonify({'success': False, 'message': '토큰을 찾을 수 없습니다.'}), 400
if token_row['claimed_at']:
kiosk_current_session = None
return jsonify({'success': False, 'message': '이미 적립된 거래입니다.'}), 400
# 만료 확인
expires_at = datetime.strptime(token_row['expires_at'], '%Y-%m-%d %H:%M:%S')
if datetime.now() > expires_at:
kiosk_current_session = None
return jsonify({'success': False, 'message': '만료된 거래입니다.'}), 400
# token_info 딕셔너리 구성 (claim_mileage 호환)
token_info = {
'id': token_row['id'],
'transaction_id': token_row['transaction_id'],
'total_amount': token_row['total_amount'],
'claimable_points': token_row['claimable_points']
}
# 사용자 조회/생성
user_id, is_new = get_or_create_user(phone, '고객')
# 마일리지 적립
claim_success, claim_msg, new_balance = claim_mileage(user_id, token_info)
if not claim_success:
return jsonify({'success': False, 'message': claim_msg}), 500
# 키오스크 세션에서 품목 정보 캡처 후 클리어
claimed_points = token_info['claimable_points']
sale_items = kiosk_current_session.get('items', [])
kiosk_current_session = None
# 알림톡 발송 (fire-and-forget)
try:
from services.nhn_alimtalk import send_mileage_claim_alimtalk
# 유저 이름 조회 (기존 유저는 실명이 있을 수 있음)
cursor.execute("SELECT nickname FROM users WHERE id = ?", (user_id,))
user_row = cursor.fetchone()
user_name = user_row['nickname'] if user_row else '고객'
logging.warning(f"[알림톡] 발송 시도: phone={phone}, name={user_name}, points={claimed_points}, balance={new_balance}, items={sale_items}")
success, msg = send_mileage_claim_alimtalk(
phone, user_name, claimed_points, new_balance,
items=sale_items, user_id=user_id,
trigger_source='kiosk', transaction_id=transaction_id
)
logging.warning(f"[알림톡] 발송 결과: success={success}, msg={msg}")
except Exception as alimtalk_err:
logging.warning(f"[알림톡] 발송 예외 (적립은 완료): {alimtalk_err}")
# AI 업셀링 추천 생성 (별도 스레드 — 적립 응답 블로킹 방지)
import threading
def _bg_upsell():
try:
_generate_upsell_recommendation(user_id, transaction_id, sale_items, user_name)
except Exception as rec_err:
logging.warning(f"[AI추천] 생성 예외 (적립은 완료): {rec_err}")
threading.Thread(target=_bg_upsell, daemon=True).start()
return jsonify({
'success': True,
'message': f'{claimed_points}P 적립 완료!',
'points': claimed_points,
'balance': new_balance,
'is_new_user': is_new
})
except Exception as e:
logging.error(f"키오스크 적립 오류: {e}")
return jsonify({'success': False, 'message': f'오류: {str(e)}'}), 500
# ===== AI Gateway 모니터 페이지 =====
@app.route('/admin/ai-gw')
def admin_ai_gw():
"""AI Gateway 모니터 페이지"""
return render_template('admin_ai_gw.html')
# ===== 판매 상세 조회 페이지 =====
@app.route('/admin/sales-detail')
def admin_sales_detail():
"""판매 상세 조회 페이지 (상품코드/바코드/표준코드 매핑)"""
return render_template('admin_sales_detail.html')
# ===== 제품 검색 페이지 =====
@app.route('/admin/products')
def admin_products():
"""제품 검색 페이지 (전체 재고에서 검색, QR 인쇄)"""
return render_template('admin_products.html')
@app.route('/admin/members')
def admin_members():
"""회원 검색 페이지 (팜IT3000 CD_PERSON, 알림톡/SMS 발송)"""
return render_template('admin_members.html')
@app.route('/api/products')
def api_products():
"""
제품 검색 API
- 상품명, 상품코드, 바코드로 검색
- 세트상품 바코드도 CD_ITEM_UNIT_MEMBER에서 조회
"""
search = request.args.get('search', '').strip()
limit = int(request.args.get('limit', 100))
if not search or len(search) < 2:
return jsonify({'success': False, 'error': '검색어는 2글자 이상 입력하세요'})
try:
drug_session = db_manager.get_session('PM_DRUG')
# 제품 검색 쿼리
# CD_GOODS.BARCODE가 없으면 CD_ITEM_UNIT_MEMBER.CD_CD_BARCODE 사용 (세트상품)
# 세트상품 여부 확인: CD_item_set에 SetCode로 존재하면 세트상품
products_query = text(f"""
SELECT TOP {limit}
G.DrugCode as drug_code,
G.GoodsName as product_name,
COALESCE(NULLIF(G.BARCODE, ''), U.CD_CD_BARCODE, '') as barcode,
G.Saleprice as sale_price,
G.Price as cost_price,
CASE
WHEN G.SplName IS NOT NULL AND G.SplName != '' THEN G.SplName
WHEN SET_CHK.is_set = 1 THEN N'세트상품'
ELSE ''
END as supplier,
CASE WHEN SET_CHK.is_set = 1 THEN 1 ELSE 0 END as is_set
FROM CD_GOODS G
OUTER APPLY (
SELECT TOP 1 CD_CD_BARCODE
FROM CD_ITEM_UNIT_MEMBER
WHERE DRUGCODE = G.DrugCode AND CD_CD_BARCODE IS NOT NULL AND CD_CD_BARCODE != ''
) U
OUTER APPLY (
SELECT TOP 1 1 as is_set
FROM CD_item_set
WHERE SetCode = G.DrugCode AND DrugCode = 'SET0000'
) SET_CHK
WHERE
G.GoodsName LIKE :search_like
OR G.DrugCode LIKE :search_like
OR G.BARCODE LIKE :search_like
OR U.CD_CD_BARCODE LIKE :search_like
ORDER BY G.GoodsName
""")
search_like = f'%{search}%'
rows = drug_session.execute(products_query, {
'search_like': search_like
}).fetchall()
items = []
for row in rows:
items.append({
'drug_code': row.drug_code or '',
'product_name': row.product_name or '',
'barcode': row.barcode or '',
'sale_price': float(row.sale_price) if row.sale_price else 0,
'cost_price': float(row.cost_price) if row.cost_price else 0,
'supplier': row.supplier or '',
'is_set': bool(row.is_set)
})
return jsonify({
'success': True,
'items': items,
'count': len(items)
})
except Exception as e:
logging.error(f"제품 검색 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/admin/sales')
def admin_sales_pos():
"""판매 내역 페이지 (POS 스타일, 거래별 그룹핑)"""
return render_template('admin_sales_pos.html')
@app.route('/api/sales-detail')
def api_sales_detail():
"""
판매 상세 조회 API (바코드 포함)
GET /api/sales-detail?days=7&search=타이레놀&barcode=has&customer=홍길동
"""
try:
days = int(request.args.get('days', 7))
search = request.args.get('search', '').strip()
barcode_filter = request.args.get('barcode', 'all') # all, has, none
mssql_session = db_manager.get_session('PM_PRES')
drug_session = db_manager.get_session('PM_DRUG')
# 판매 내역 조회 (최근 N일)
# CD_GOODS.BARCODE가 없으면 CD_ITEM_UNIT_MEMBER.CD_CD_BARCODE 사용 (세트상품/자체등록 바코드)
# 세트상품 여부 확인: CD_item_set에 SetCode로 존재하면 세트상품
sales_query = text("""
SELECT
S.SL_DT_appl as sale_date,
S.SL_NO_order as item_order,
S.DrugCode as drug_code,
ISNULL(G.GoodsName, '알 수 없음') as product_name,
COALESCE(NULLIF(G.BARCODE, ''), U.CD_CD_BARCODE, '') as barcode,
CASE
WHEN G.SplName IS NOT NULL AND G.SplName != '' THEN G.SplName
WHEN SET_CHK.is_set = 1 THEN '세트상품'
ELSE ''
END as supplier,
ISNULL(S.QUAN, 1) as quantity,
ISNULL(S.SL_TOTAL_PRICE, 0) as total_price_db,
ISNULL(G.Saleprice, 0) as unit_price
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
OUTER APPLY (
SELECT TOP 1 CD_CD_BARCODE
FROM PM_DRUG.dbo.CD_ITEM_UNIT_MEMBER
WHERE DRUGCODE = S.DrugCode AND CD_CD_BARCODE IS NOT NULL AND CD_CD_BARCODE != ''
) U
OUTER APPLY (
SELECT TOP 1 1 as is_set
FROM PM_DRUG.dbo.CD_item_set
WHERE SetCode = S.DrugCode AND DrugCode = 'SET0000'
) SET_CHK
WHERE S.SL_DT_appl >= CONVERT(VARCHAR(8), DATEADD(DAY, -:days, GETDATE()), 112)
ORDER BY S.SL_DT_appl DESC, S.SL_NO_order DESC
""")
rows = mssql_session.execute(sales_query, {'days': days}).fetchall()
items = []
total_amount = 0
barcode_count = 0
unique_products = set()
for row in rows:
drug_code = row.drug_code or ''
product_name = row.product_name or ''
barcode = row.barcode or ''
# 검색 필터 (상품명, 코드, 바코드)
if search:
search_lower = search.lower()
if (search_lower not in product_name.lower() and
search_lower not in drug_code.lower() and
search_lower not in barcode.lower()):
continue
# 바코드 필터
if barcode_filter == 'has' and not barcode:
continue
if barcode_filter == 'none' and barcode:
continue
# 표준코드 조회 (CD_BARCODE 테이블)
standard_code = ''
if barcode:
try:
std_result = drug_session.execute(text("""
SELECT BASECODE FROM CD_BARCODE WHERE BARCODE = :barcode
"""), {'barcode': barcode}).fetchone()
if std_result and std_result[0]:
standard_code = std_result[0]
except:
pass
quantity = int(row.quantity or 1)
unit_price = float(row.unit_price or 0)
total_price_from_db = float(row.total_price_db or 0)
# DB에 합계가 있으면 사용, 없으면 계산
total_price = total_price_from_db if total_price_from_db > 0 else (quantity * unit_price)
# 날짜 포맷팅
sale_date_str = str(row.sale_date or '')
if len(sale_date_str) == 8:
sale_date_str = f"{sale_date_str[:4]}-{sale_date_str[4:6]}-{sale_date_str[6:]}"
items.append({
'sale_date': sale_date_str,
'drug_code': drug_code,
'product_name': product_name,
'barcode': barcode,
'standard_code': standard_code,
'supplier': row.supplier or '',
'quantity': quantity,
'unit_price': int(unit_price),
'total_price': int(total_price)
})
total_amount += total_price
if barcode:
barcode_count += 1
unique_products.add(drug_code)
# 바코드 매핑률 계산
barcode_rate = round(barcode_count / len(items) * 100, 1) if items else 0
return jsonify({
'success': True,
'items': items[:500], # 최대 500건
'stats': {
'total_count': len(items),
'total_amount': int(total_amount),
'barcode_rate': barcode_rate,
'unique_products': len(unique_products)
}
})
except Exception as e:
logging.error(f"판매 상세 조회 오류: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
# ===== Claude 상태 API =====
@app.route('/api/claude-status')
def api_claude_status():
"""
Claude 사용량 상태 조회 (토큰 차감 없음)
GET /api/claude-status
GET /api/claude-status?detail=true — 전체 세션 상세 포함
Returns:
{
"ok": true,
"connected": true,
"model": "claude-opus-4-5",
"mainSession": { ... },
"summary": { ... },
"sessions": [ ... ], // detail=true 일 때만
"timestamp": "2026-02-27T09:45:00+09:00"
}
"""
try:
from services.clawdbot_client import get_claude_status
# 상세 모드 여부
detail_mode = request.args.get('detail', 'false').lower() == 'true'
status = get_claude_status()
if not status or not status.get('connected'):
return jsonify({
'ok': False,
'connected': False,
'error': status.get('error', 'Gateway 연결 실패'),
'timestamp': datetime.now(KST).isoformat()
}), 503
sessions_data = status.get('sessions', {})
sessions_list = sessions_data.get('sessions', [])
defaults = sessions_data.get('defaults', {})
# 메인 세션 찾기
main_session = None
for s in sessions_list:
if s.get('key') == 'agent:main:main':
main_session = s
break
# 전체 토큰 합계
total_tokens = sum(s.get('totalTokens', 0) for s in sessions_list)
# 메인 세션 컨텍스트 사용률
context_used = 0
context_max = defaults.get('contextTokens', 200000)
context_percent = 0
if main_session:
context_used = main_session.get('totalTokens', 0)
context_max = main_session.get('contextTokens', context_max)
if context_max > 0:
context_percent = round(context_used / context_max * 100, 1)
# 기본 응답
response = {
'ok': True,
'connected': True,
'model': f"{defaults.get('modelProvider', 'unknown')}/{defaults.get('model', 'unknown')}",
'context': {
'used': context_used,
'max': context_max,
'percent': context_percent,
'display': f"{context_used//1000}k/{context_max//1000}k ({context_percent}%)"
},
'mainSession': {
'key': main_session.get('key') if main_session else None,
'inputTokens': main_session.get('inputTokens', 0) if main_session else 0,
'outputTokens': main_session.get('outputTokens', 0) if main_session else 0,
'totalTokens': main_session.get('totalTokens', 0) if main_session else 0,
'lastChannel': main_session.get('lastChannel') if main_session else None
} if main_session else None,
'summary': {
'totalSessions': len(sessions_list),
'totalTokens': total_tokens,
'activeModel': defaults.get('model')
},
'timestamp': datetime.now(KST).isoformat()
}
# 상세 모드: 전체 세션 목록 추가
if detail_mode:
detailed_sessions = []
for s in sessions_list:
session_tokens = s.get('totalTokens', 0)
session_context_max = s.get('contextTokens', 200000)
session_percent = round(session_tokens / session_context_max * 100, 1) if session_context_max > 0 else 0
# 세션 키에서 이름 추출 (agent:main:xxx → xxx)
session_key = s.get('key', '')
session_name = session_key.split(':')[-1] if ':' in session_key else session_key
# 마지막 활동 시간
updated_at = s.get('updatedAt')
updated_str = None
if updated_at:
try:
dt = datetime.fromtimestamp(updated_at / 1000, tz=KST)
updated_str = dt.strftime('%Y-%m-%d %H:%M')
except:
pass
detailed_sessions.append({
'key': session_key,
'name': session_name,
'displayName': s.get('displayName', session_name),
'model': f"{s.get('modelProvider', 'unknown')}/{s.get('model', 'unknown')}",
'tokens': {
'input': s.get('inputTokens', 0),
'output': s.get('outputTokens', 0),
'total': session_tokens,
'contextMax': session_context_max,
'contextPercent': session_percent,
'display': f"{session_tokens//1000}k/{session_context_max//1000}k ({session_percent}%)"
},
'channel': s.get('lastChannel') or s.get('origin', {}).get('provider'),
'kind': s.get('kind'),
'updatedAt': updated_str
})
# 토큰 사용량 순으로 정렬
detailed_sessions.sort(key=lambda x: x['tokens']['total'], reverse=True)
response['sessions'] = detailed_sessions
# 모델별 통계
model_stats = {}
for s in sessions_list:
model_key = f"{s.get('modelProvider', 'unknown')}/{s.get('model', 'unknown')}"
if model_key not in model_stats:
model_stats[model_key] = {'sessions': 0, 'tokens': 0}
model_stats[model_key]['sessions'] += 1
model_stats[model_key]['tokens'] += s.get('totalTokens', 0)
response['modelStats'] = model_stats
return jsonify(response)
except Exception as e:
logging.error(f"Claude 상태 조회 오류: {e}")
return jsonify({
'ok': False,
'connected': False,
'error': str(e),
'timestamp': datetime.now(KST).isoformat()
}), 500
# =============================================================================
# 회원 검색 API (팜IT3000 CD_PERSON)
# =============================================================================
@app.route('/api/members/search')
def api_members_search():
"""
회원 검색 API
- 이름 또는 전화번호로 검색
- PM_BASE.dbo.CD_PERSON 테이블 조회
"""
search = request.args.get('q', '').strip()
search_type = request.args.get('type', 'auto') # auto, name, phone
limit = min(int(request.args.get('limit', 50)), 200)
if not search or len(search) < 2:
return jsonify({'success': False, 'error': '검색어는 2글자 이상 입력하세요'})
try:
# PM_BASE 연결
base_session = db_manager.get_session('PM_BASE')
# 검색 타입 자동 감지
if search_type == 'auto':
# 숫자만 있으면 전화번호, 아니면 이름
if search.replace('-', '').replace(' ', '').isdigit():
search_type = 'phone'
else:
search_type = 'name'
# 전화번호 정규화
phone_search = search.replace('-', '').replace(' ', '')
if search_type == 'phone':
# 전화번호 검색 (3개 컬럼 모두)
query = text(f"""
SELECT TOP {limit}
CUSCODE, PANAME, PANUM,
TEL_NO, PHONE, PHONE2,
CUSETC, EMAIL, SMS_STOP
FROM CD_PERSON
WHERE
REPLACE(REPLACE(TEL_NO, '-', ''), ' ', '') LIKE :phone
OR REPLACE(REPLACE(PHONE, '-', ''), ' ', '') LIKE :phone
OR REPLACE(REPLACE(PHONE2, '-', ''), ' ', '') LIKE :phone
ORDER BY PANAME
""")
rows = base_session.execute(query, {'phone': f'%{phone_search}%'}).fetchall()
else:
# 이름 검색
query = text(f"""
SELECT TOP {limit}
CUSCODE, PANAME, PANUM,
TEL_NO, PHONE, PHONE2,
CUSETC, EMAIL, SMS_STOP
FROM CD_PERSON
WHERE PANAME LIKE :name
ORDER BY PANAME
""")
rows = base_session.execute(query, {'name': f'%{search}%'}).fetchall()
members = []
for row in rows:
# 유효한 전화번호 찾기 (PHONE 우선, 없으면 TEL_NO, PHONE2)
phone = row.PHONE or row.TEL_NO or row.PHONE2 or ''
phone = phone.strip() if phone else ''
members.append({
'cuscode': row.CUSCODE,
'name': row.PANAME or '',
'panum': row.PANUM or '',
'phone': phone,
'tel_no': row.TEL_NO or '',
'phone1': row.PHONE or '',
'phone2': row.PHONE2 or '',
'memo': (row.CUSETC or '')[:100], # 메모 100자 제한
'email': row.EMAIL or '',
'sms_stop': row.SMS_STOP == 'Y' # SMS 수신거부 여부
})
return jsonify({
'success': True,
'items': members,
'count': len(members),
'search_type': search_type
})
except Exception as e:
logging.error(f"회원 검색 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/members/<cuscode>')
def api_member_detail(cuscode):
"""회원 상세 정보 + 메모 조회"""
try:
base_session = db_manager.get_session('PM_BASE')
# 회원 정보
query = text("""
SELECT
CUSCODE, PANAME, PANUM,
TEL_NO, PHONE, PHONE2,
CUSETC, EMAIL, ADDRESS, SMS_STOP
FROM CD_PERSON
WHERE CUSCODE = :cuscode
""")
row = base_session.execute(query, {'cuscode': cuscode}).fetchone()
if not row:
return jsonify({'success': False, 'error': '회원을 찾을 수 없습니다'}), 404
member = {
'cuscode': row.CUSCODE,
'name': row.PANAME or '',
'panum': row.PANUM or '',
'phone': row.PHONE or row.TEL_NO or row.PHONE2 or '',
'tel_no': row.TEL_NO or '',
'phone1': row.PHONE or '',
'phone2': row.PHONE2 or '',
'memo': row.CUSETC or '',
'email': row.EMAIL or '',
'address': row.ADDRESS or '',
'sms_stop': row.SMS_STOP == 'Y'
}
# 상세 메모 조회
memo_query = text("""
SELECT MEMO_CODE, PHARMA_ID, MEMO_DATE, MEMO_TITLE, MEMO_Item
FROM CD_PERSON_MEMO
WHERE CUSCODE = :cuscode
ORDER BY MEMO_DATE DESC
""")
memo_rows = base_session.execute(memo_query, {'cuscode': cuscode}).fetchall()
memos = []
for m in memo_rows:
memos.append({
'memo_code': m.MEMO_CODE,
'author': m.PHARMA_ID or '',
'date': m.MEMO_DATE or '',
'title': m.MEMO_TITLE or '',
'content': m.MEMO_Item or ''
})
return jsonify({
'success': True,
'member': member,
'memos': memos
})
except Exception as e:
logging.error(f"회원 상세 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/members/history/<phone>')
def api_member_history(phone):
"""
회원 구매 이력 통합 조회 API
- 마일리지 적립/사용 내역 (SQLite)
- transaction_id로 POS 품목 조회 (MSSQL)
- 전체 구매 이력 (SALE_MAIN)
- 조제 이력 (PS_main)
"""
try:
# 전화번호 정규화
phone = phone.replace('-', '').replace(' ', '')
result = {
'success': True,
'phone': phone,
'mileage': None,
'purchases': [], # 전체 구매 이력
'prescriptions': [], # 조제 이력
'interests': [], # 관심 상품 (AI 업셀링)
'pos_customer': None
}
transaction_ids = [] # 적립된 거래번호 수집
# 1. 마일리지 내역 조회 (SQLite) - 새 연결 사용 (멀티스레드 안전)
sqlite_conn = None
try:
sqlite_conn = db_manager.get_sqlite_connection(new_connection=True)
cursor = sqlite_conn.cursor()
# 사용자 정보 조회
cursor.execute("""
SELECT id, nickname, phone, mileage_balance, created_at
FROM users WHERE phone = ?
""", (phone,))
user = cursor.fetchone()
if user:
user_id = user['id']
# 적립/사용 내역 조회 (claim_tokens 조인으로 금액 포함)
cursor.execute("""
SELECT
ml.points, ml.balance_after, ml.reason,
ml.description, ml.transaction_id, ml.created_at,
ct.total_amount
FROM mileage_ledger ml
LEFT JOIN claim_tokens ct ON ml.transaction_id = ct.transaction_id
WHERE ml.user_id = ?
ORDER BY ml.created_at DESC
LIMIT 50
""", (user_id,))
transactions = cursor.fetchall()
tx_list = []
for t in transactions:
tx_data = {
'points': t['points'],
'balance_after': t['balance_after'],
'reason': t['reason'],
'description': t['description'],
'transaction_id': t['transaction_id'],
'created_at': t['created_at'],
'total_amount': t['total_amount'],
'items': [] # 나중에 POS에서 채움
}
tx_list.append(tx_data)
# 적립 거래번호 수집
if t['transaction_id'] and t['reason'] == 'CLAIM':
transaction_ids.append(t['transaction_id'])
result['mileage'] = {
'user_id': user_id,
'name': user['nickname'] or '',
'phone': user['phone'],
'balance': user['mileage_balance'] or 0,
'member_since': user['created_at'],
'transactions': tx_list
}
# 관심 상품 조회 (AI 업셀링에서 '관심있어요' 표시한 것)
cursor.execute("""
SELECT
id, recommended_product, recommendation_message,
recommendation_reason, trigger_products, created_at
FROM ai_recommendations
WHERE user_id = ? AND status = 'interested'
ORDER BY created_at DESC
LIMIT 20
""", (user_id,))
interests = cursor.fetchall()
result['interests'] = [{
'id': i['id'],
'product': i['recommended_product'],
'message': i['recommendation_message'],
'reason': i['recommendation_reason'],
'trigger_products': i['trigger_products'],
'created_at': i['created_at']
} for i in interests]
except Exception as e:
logging.warning(f"마일리지 조회 실패: {e}")
finally:
# SQLite 연결 닫기
if sqlite_conn:
try:
sqlite_conn.close()
except:
pass
# 2. 전화번호로 POS 고객코드 조회 (MSSQL)
cuscode = None
try:
base_session = db_manager.get_session('PM_BASE')
cuscode_query = text("""
SELECT TOP 1 CUSCODE, PANAME
FROM CD_PERSON
WHERE REPLACE(REPLACE(PHONE, '-', ''), ' ', '') = :phone
OR REPLACE(REPLACE(TEL_NO, '-', ''), ' ', '') = :phone
OR REPLACE(REPLACE(PHONE2, '-', ''), ' ', '') = :phone
""")
cus_row = base_session.execute(cuscode_query, {'phone': phone}).fetchone()
if cus_row:
cuscode = cus_row.CUSCODE
result['pos_customer'] = {
'cuscode': cuscode,
'name': cus_row.PANAME
}
except Exception as e:
logging.warning(f"POS 고객 조회 실패: {e}")
# 3. transaction_id로 POS 품목 조회 (마일리지 적립 내역)
if transaction_ids and result['mileage']:
try:
pres_session = db_manager.get_session('PM_PRES')
# 각 거래번호별 품목 조회
items_by_tx = {}
for tx_id in transaction_ids[:30]: # 최대 30건
items_query = text("""
SELECT
S.DrugCode,
G.GoodsName,
S.QUAN as quantity,
S.SL_TOTAL_PRICE as price
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.SL_NO_order = :order_no
""")
items = pres_session.execute(items_query, {'order_no': tx_id}).fetchall()
items_by_tx[tx_id] = [{
'drug_code': item.DrugCode,
'name': item.GoodsName or '알 수 없음',
'quantity': float(item.quantity) if item.quantity else 1,
'price': float(item.price) if item.price else 0
} for item in items]
# 마일리지 내역에 품목 추가
for tx in result['mileage']['transactions']:
tx_id = tx.get('transaction_id')
if tx_id and tx_id in items_by_tx:
tx['items'] = items_by_tx[tx_id]
except Exception as e:
logging.warning(f"POS 품목 조회 실패: {e}")
# 4. 전체 구매 이력 조회 (고객코드 기준)
if cuscode:
try:
pres_session = db_manager.get_session('PM_PRES')
# 최근 60일 구매 이력
purchase_query = text("""
SELECT TOP 30
M.SL_NO_order as order_no,
M.SL_DT_appl as order_date,
M.SL_MY_total as total_amount
FROM SALE_MAIN M
WHERE M.SL_CD_custom = :cuscode
ORDER BY M.SL_DT_appl DESC, M.SL_NO_order DESC
""")
orders = pres_session.execute(purchase_query, {'cuscode': cuscode}).fetchall()
purchases = []
for order in orders:
# 품목 조회
items_query = text("""
SELECT
S.DrugCode,
G.GoodsName,
S.QUAN as quantity,
S.SL_TOTAL_PRICE as price
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.SL_NO_order = :order_no
""")
items = pres_session.execute(items_query, {'order_no': order.order_no}).fetchall()
purchases.append({
'order_no': order.order_no,
'date': order.order_date,
'total': float(order.total_amount) if order.total_amount else 0,
'items': [{
'drug_code': item.DrugCode,
'name': item.GoodsName or '알 수 없음',
'quantity': float(item.quantity) if item.quantity else 1,
'price': float(item.price) if item.price else 0
} for item in items]
})
result['purchases'] = purchases
except Exception as e:
logging.warning(f"전체 구매 이력 조회 실패: {e}")
# 5. 조제 이력 조회 (고객코드 기준)
if cuscode:
try:
pres_session = db_manager.get_session('PM_PRES')
# 최근 조제 이력 (최대 30건)
rx_query = text("""
SELECT TOP 30
P.PreSerial,
P.Indate,
P.Paname,
P.Drname,
P.OrderName,
P.TDAYS
FROM PS_main P
WHERE P.CusCode = :cuscode
ORDER BY P.Indate DESC, P.PreSerial DESC
""")
rxs = pres_session.execute(rx_query, {'cuscode': cuscode}).fetchall()
prescriptions = []
for rx in rxs:
# 처방 품목 조회
items_query = text("""
SELECT
S.DrugCode,
G.GoodsName,
S.Days,
S.QUAN,
S.QUAN_TIME
FROM PS_sub_pharm S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.PreSerial = :pre_serial
""")
items = pres_session.execute(items_query, {'pre_serial': rx.PreSerial}).fetchall()
prescriptions.append({
'pre_serial': rx.PreSerial,
'date': rx.Indate,
'patient_name': rx.Paname,
'doctor': rx.Drname,
'hospital': rx.OrderName,
'total_days': rx.TDAYS,
'items': [{
'drug_code': item.DrugCode,
'name': item.GoodsName or '알 수 없음',
'days': float(item.Days) if item.Days else 0,
'quantity': float(item.QUAN) if item.QUAN else 0,
'times_per_day': float(item.QUAN_TIME) if item.QUAN_TIME else 0
} for item in items]
})
result['prescriptions'] = prescriptions
except Exception as e:
logging.warning(f"조제 이력 조회 실패: {e}")
return jsonify(result)
except Exception as e:
logging.error(f"회원 이력 조회 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# =============================================================================
# 알림톡/SMS 발송 API
# =============================================================================
@app.route('/api/message/send', methods=['POST'])
def api_message_send():
"""
알림톡/SMS 발송 API
Body:
{
"recipients": [{"cuscode": "", "name": "", "phone": ""}],
"message": "메시지 내용",
"type": "alimtalk" | "sms"
}
"""
try:
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': '데이터가 없습니다'}), 400
recipients = data.get('recipients', [])
message = data.get('message', '')
msg_type = data.get('type', 'sms') # alimtalk 또는 sms
if not recipients:
return jsonify({'success': False, 'error': '수신자가 없습니다'}), 400
if not message:
return jsonify({'success': False, 'error': '메시지 내용이 없습니다'}), 400
# 전화번호 정규화
valid_recipients = []
for r in recipients:
phone = (r.get('phone') or '').replace('-', '').replace(' ', '')
if phone and len(phone) >= 10:
valid_recipients.append({
'cuscode': r.get('cuscode', ''),
'name': r.get('name', ''),
'phone': phone
})
if not valid_recipients:
return jsonify({'success': False, 'error': '유효한 전화번호가 없습니다'}), 400
# TODO: 실제 발송 로직 (NHN Cloud 알림톡/SMS)
# 현재는 테스트 모드로 성공 응답
results = []
for r in valid_recipients:
results.append({
'phone': r['phone'],
'name': r['name'],
'status': 'success', # 실제 발송 시 결과로 변경
'message': f'{msg_type} 발송 예약됨'
})
return jsonify({
'success': True,
'type': msg_type,
'sent_count': len(results),
'results': results,
'message': f'{len(results)}명에게 {msg_type} 발송 완료 (테스트 모드)'
})
except Exception as e:
logging.error(f"메시지 발송 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# =============================================================================
# QR 라벨 인쇄 API
# =============================================================================
@app.route('/api/qr-print', methods=['POST'])
def api_qr_print():
"""QR 라벨 인쇄 API"""
try:
from qr_printer import print_drug_qr_label
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': '데이터가 없습니다'}), 400
drug_name = data.get('drug_name', '')
barcode = data.get('barcode', '')
drug_code = data.get('drug_code', '')
sale_price = data.get('sale_price', 0)
if not drug_name:
return jsonify({'success': False, 'error': '상품명이 필요합니다'}), 400
# 바코드가 없으면 drug_code 사용
qr_data = barcode if barcode else drug_code
result = print_drug_qr_label(
drug_name=drug_name,
barcode=qr_data,
sale_price=sale_price,
drug_code=drug_code,
pharmacy_name='청춘약국'
)
return jsonify(result)
except ImportError as e:
logging.error(f"QR 프린터 모듈 로드 실패: {e}")
return jsonify({'success': False, 'error': 'QR 프린터 모듈이 없습니다'}), 500
except Exception as e:
logging.error(f"QR 인쇄 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/qr-preview', methods=['POST'])
def api_qr_preview():
"""QR 라벨 미리보기 API (base64 이미지 반환)"""
try:
from qr_printer import preview_qr_label
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': '데이터가 없습니다'}), 400
drug_name = data.get('drug_name', '')
barcode = data.get('barcode', '')
drug_code = data.get('drug_code', '')
sale_price = data.get('sale_price', 0)
if not drug_name:
return jsonify({'success': False, 'error': '상품명이 필요합니다'}), 400
qr_data = barcode if barcode else drug_code
image_data = preview_qr_label(
drug_name=drug_name,
barcode=qr_data,
sale_price=sale_price,
drug_code=drug_code,
pharmacy_name='청춘약국'
)
return jsonify({'success': True, 'image': image_data})
except Exception as e:
logging.error(f"QR 미리보기 오류: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
def check_port_available(port: int) -> bool:
"""포트가 사용 가능한지 확인"""
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(('127.0.0.1', port))
sock.close()
return result != 0 # 0이면 이미 사용 중, 0이 아니면 사용 가능
def kill_process_on_port(port: int) -> bool:
"""특정 포트를 사용하는 프로세스 종료 (Windows)"""
import subprocess
try:
# netstat으로 PID 찾기
result = subprocess.run(
f'netstat -ano | findstr ":{port}"',
shell=True, capture_output=True, text=True
)
for line in result.stdout.strip().split('\n'):
if 'LISTENING' in line:
parts = line.split()
pid = parts[-1]
if pid.isdigit():
subprocess.run(f'taskkill /F /PID {pid}', shell=True)
logging.info(f"포트 {port} 사용 중인 프로세스 종료: PID {pid}")
return True
return False
except Exception as e:
logging.error(f"프로세스 종료 실패: {e}")
return False
if __name__ == '__main__':
import os
PORT = 7001
# Flask reloader 자식 프로세스가 아닌 경우에만 체크
if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
if not check_port_available(PORT):
logging.warning(f"포트 {PORT}이 이미 사용 중입니다. 기존 프로세스를 종료합니다...")
if kill_process_on_port(PORT):
import time
time.sleep(2) # 프로세스 종료 대기
else:
logging.error(f"포트 {PORT} 해제 실패. 수동으로 확인하세요.")
# 개발 모드로 실행
app.run(host='0.0.0.0', port=PORT, debug=True)