feat: AI 업셀링 CRM - Clawdbot Gateway 기반 맞춤 추천 시스템

키오스크 적립 시 Clawdbot Gateway(Claude Max)를 통해 구매 이력 기반
맞춤 제품 추천을 생성하고, 마이페이지 방문 시 바텀시트 팝업으로 표시.

- ai_recommendations SQLite 테이블 추가 (스키마 + 마이그레이션)
- clawdbot_client.py: Gateway WebSocket 프로토콜 v3 Python 클라이언트
- app.py: 추천 생성 + GET/POST API 엔드포인트
- my_page.html: 바텀시트 UI (슬라이드업 애니메이션, 1.5초 후 자동 표시)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
thug0bin 2026-02-26 19:57:03 +09:00
parent a3ff69b67f
commit b5a99f7b3b
5 changed files with 506 additions and 1 deletions

View File

@ -1109,7 +1109,7 @@ def my_page():
tx_dict['created_at'] = utc_to_kst_str(tx['created_at'])
transactions.append(tx_dict)
return render_template('my_page.html', user=user, transactions=transactions)
return render_template('my_page.html', user=user, transactions=transactions, user_id=user['id'])
# ============================================================================
@ -1885,6 +1885,137 @@ def admin():
recent_tokens=recent_tokens)
# ============================================================================
# AI 업셀링 추천
# ============================================================================
def _generate_upsell_recommendation(user_id, transaction_id, sale_items, user_name):
"""키오스크 적립 후 AI 업셀링 추천 생성 (fire-and-forget)"""
from services.clawdbot_client import generate_upsell
if not sale_items:
return
# 현재 구매 품목
current_items = ', '.join(item['name'] for item in sale_items if item.get('name'))
if not current_items:
return
# 최근 구매 이력 수집
recent_products = current_items # 기본값
try:
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT ct.transaction_id
FROM claim_tokens ct
WHERE ct.claimed_by_user_id = ? AND ct.transaction_id != ?
ORDER BY ct.claimed_at DESC LIMIT 5
""", (user_id, transaction_id))
recent_tokens = cursor.fetchall()
if recent_tokens:
all_products = []
mssql_session = db_manager.get_session('PM_PRES')
for token in recent_tokens:
rows = mssql_session.execute(text("""
SELECT ISNULL(G.GoodsName, '') AS goods_name
FROM SALE_SUB S
LEFT JOIN PM_DRUG.dbo.CD_GOODS G ON S.DrugCode = G.DrugCode
WHERE S.SL_NO_order = :tid
"""), {'tid': token['transaction_id']}).fetchall()
for r in rows:
if r.goods_name:
all_products.append(r.goods_name)
if all_products:
recent_products = ', '.join(set(all_products))
except Exception as e:
logging.warning(f"[AI추천] 구매 이력 수집 실패 (현재 품목만 사용): {e}")
# Claude로 추천 생성
logging.info(f"[AI추천] 생성 시작: user={user_name}, items={current_items}")
rec = generate_upsell(user_name, current_items, recent_products)
if not rec:
logging.warning("[AI추천] 생성 실패 (AI 응답 없음)")
return
# SQLite에 저장
try:
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
expires_at = (datetime.now() + timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
INSERT INTO ai_recommendations
(user_id, transaction_id, recommended_product, recommendation_message,
recommendation_reason, trigger_products, ai_raw_response, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
user_id, transaction_id,
rec['product'], rec['message'], rec['reason'],
json.dumps([item['name'] for item in sale_items], ensure_ascii=False),
json.dumps(rec, ensure_ascii=False),
expires_at
))
conn.commit()
logging.info(f"[AI추천] 저장 완료: user_id={user_id}, product={rec['product']}")
except Exception as e:
logging.warning(f"[AI추천] DB 저장 실패: {e}")
@app.route('/api/recommendation/<int:user_id>')
def api_get_recommendation(user_id):
"""마이페이지용 AI 추천 조회"""
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
SELECT id, recommended_product, recommendation_message, created_at
FROM ai_recommendations
WHERE user_id = ? AND status = 'active'
AND (expires_at IS NULL OR expires_at > ?)
ORDER BY created_at DESC LIMIT 1
""", (user_id, now))
rec = cursor.fetchone()
if not rec:
return jsonify({'success': True, 'has_recommendation': False})
# 표시 횟수 업데이트
cursor.execute("""
UPDATE ai_recommendations
SET displayed_count = displayed_count + 1,
displayed_at = COALESCE(displayed_at, ?)
WHERE id = ?
""", (now, rec['id']))
conn.commit()
return jsonify({
'success': True,
'has_recommendation': True,
'recommendation': {
'id': rec['id'],
'product': rec['recommended_product'],
'message': rec['recommendation_message']
}
})
@app.route('/api/recommendation/<int:rec_id>/dismiss', methods=['POST'])
def api_dismiss_recommendation(rec_id):
"""추천 닫기"""
conn = db_manager.get_sqlite_connection()
cursor = conn.cursor()
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
UPDATE ai_recommendations SET status = 'dismissed', dismissed_at = ?
WHERE id = ?
""", (now, rec_id))
conn.commit()
return jsonify({'success': True})
# ============================================================================
# 알림톡 로그
# ============================================================================
@ -2195,6 +2326,12 @@ def api_kiosk_claim():
except Exception as alimtalk_err:
logging.warning(f"[알림톡] 발송 예외 (적립은 완료): {alimtalk_err}")
# AI 업셀링 추천 생성 (fire-and-forget)
try:
_generate_upsell_recommendation(user_id, transaction_id, sale_items, user_name)
except Exception as rec_err:
logging.warning(f"[AI추천] 생성 예외 (적립은 완료): {rec_err}")
return jsonify({
'success': True,
'message': f'{claimed_points}P 적립 완료!',

View File

@ -269,6 +269,33 @@ class DatabaseManager:
self.sqlite_conn.commit()
print("[DB Manager] SQLite 마이그레이션: alimtalk_logs 테이블 생성")
# ai_recommendations 테이블 생성
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='ai_recommendations'")
if not cursor.fetchone():
cursor.executescript("""
CREATE TABLE IF NOT EXISTS ai_recommendations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
transaction_id VARCHAR(20),
recommended_product TEXT NOT NULL,
recommendation_message TEXT NOT NULL,
recommendation_reason TEXT,
trigger_products TEXT,
ai_raw_response TEXT,
status VARCHAR(20) DEFAULT 'active',
displayed_count INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
expires_at DATETIME,
displayed_at DATETIME,
dismissed_at DATETIME,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE INDEX IF NOT EXISTS idx_rec_user_status ON ai_recommendations(user_id, status);
CREATE INDEX IF NOT EXISTS idx_rec_expires ON ai_recommendations(expires_at);
""")
self.sqlite_conn.commit()
print("[DB Manager] SQLite 마이그레이션: ai_recommendations 테이블 생성")
def test_connection(self, database='PM_BASE'):
"""연결 테스트"""
try:

View File

@ -98,3 +98,25 @@ CREATE TABLE IF NOT EXISTS alimtalk_logs (
CREATE INDEX IF NOT EXISTS idx_alimtalk_created ON alimtalk_logs(created_at);
CREATE INDEX IF NOT EXISTS idx_alimtalk_recipient ON alimtalk_logs(recipient_no);
-- 7. AI 추천 테이블
CREATE TABLE IF NOT EXISTS ai_recommendations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
transaction_id VARCHAR(20),
recommended_product TEXT NOT NULL,
recommendation_message TEXT NOT NULL,
recommendation_reason TEXT,
trigger_products TEXT,
ai_raw_response TEXT,
status VARCHAR(20) DEFAULT 'active',
displayed_count INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
expires_at DATETIME,
displayed_at DATETIME,
dismissed_at DATETIME,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE INDEX IF NOT EXISTS idx_rec_user_status ON ai_recommendations(user_id, status);
CREATE INDEX IF NOT EXISTS idx_rec_expires ON ai_recommendations(expires_at);

View File

@ -0,0 +1,268 @@
"""
Clawdbot Gateway Python 클라이언트
카카오톡 봇과 동일한 Gateway WebSocket API를 통해 Claude와 통신
추가 API 비용 없음 (Claude Max 구독 재활용)
"""
import json
import uuid
import asyncio
import logging
from pathlib import Path
import websockets
logger = logging.getLogger(__name__)
# Gateway 설정 (clawdbot.json에서 읽기)
CLAWDBOT_CONFIG_PATH = Path.home() / '.clawdbot' / 'clawdbot.json'
def _load_gateway_config():
"""clawdbot.json에서 Gateway 설정 로드"""
try:
with open(CLAWDBOT_CONFIG_PATH, 'r', encoding='utf-8') as f:
config = json.load(f)
gw = config.get('gateway', {})
return {
'port': gw.get('port', 18789),
'token': gw.get('auth', {}).get('token', ''),
}
except Exception as e:
logger.warning(f"[Clawdbot] 설정 파일 로드 실패: {e}")
return {'port': 18789, 'token': ''}
async def _ask_gateway(message, session_id='pharmacy-upsell',
system_prompt=None, timeout=60):
"""
Clawdbot Gateway WebSocket API 호출
프로토콜:
1. WS 연결
2. 서버 connect.challenge (nonce)
3. 클라이언트 connect 요청 (token)
4. 서버 connect 응답 (ok)
5. 클라이언트 agent 요청
6. 서버 accepted (ack) 최종 응답
Returns:
str: AI 응답 텍스트 (실패 None)
"""
config = _load_gateway_config()
url = f"ws://127.0.0.1:{config['port']}"
token = config['token']
try:
async with websockets.connect(url, max_size=25 * 1024 * 1024,
close_timeout=5) as ws:
# 1. connect.challenge 대기
nonce = None
challenge_msg = await asyncio.wait_for(ws.recv(), timeout=10)
challenge = json.loads(challenge_msg)
if challenge.get('event') == 'connect.challenge':
nonce = challenge.get('payload', {}).get('nonce')
# 2. connect 요청
connect_id = str(uuid.uuid4())
connect_frame = {
'type': 'req',
'id': connect_id,
'method': 'connect',
'params': {
'minProtocol': 3,
'maxProtocol': 3,
'client': {
'id': 'gateway-client',
'displayName': 'Pharmacy Upsell',
'version': '1.0.0',
'platform': 'win32',
'mode': 'backend',
'instanceId': str(uuid.uuid4()),
},
'caps': [],
'auth': {
'token': token,
},
'role': 'operator',
'scopes': ['operator.admin'],
}
}
await ws.send(json.dumps(connect_frame))
# 3. connect 응답 대기
while True:
msg = await asyncio.wait_for(ws.recv(), timeout=10)
data = json.loads(msg)
if data.get('id') == connect_id:
if not data.get('ok'):
error = data.get('error', {}).get('message', 'unknown')
logger.warning(f"[Clawdbot] connect 실패: {error}")
return None
break # 연결 성공
# 4. agent 요청
agent_id = str(uuid.uuid4())
agent_params = {
'message': message,
'sessionId': session_id,
'sessionKey': session_id,
'timeout': timeout,
'idempotencyKey': str(uuid.uuid4()),
}
if system_prompt:
agent_params['extraSystemPrompt'] = system_prompt
agent_frame = {
'type': 'req',
'id': agent_id,
'method': 'agent',
'params': agent_params,
}
await ws.send(json.dumps(agent_frame))
# 5. agent 응답 대기 (accepted → final)
while True:
msg = await asyncio.wait_for(ws.recv(), timeout=timeout + 30)
data = json.loads(msg)
# 이벤트 무시 (tick 등)
if data.get('event'):
continue
# 우리 요청에 대한 응답인지 확인
if data.get('id') != agent_id:
continue
payload = data.get('payload', {})
status = payload.get('status')
# accepted는 대기
if status == 'accepted':
continue
# 최종 응답
if data.get('ok'):
payloads = payload.get('result', {}).get('payloads', [])
text = '\n'.join(p.get('text', '') for p in payloads if p.get('text'))
return text or None
else:
error = data.get('error', {}).get('message', 'unknown')
logger.warning(f"[Clawdbot] agent 실패: {error}")
return None
except asyncio.TimeoutError:
logger.warning("[Clawdbot] Gateway 타임아웃")
return None
except (ConnectionRefusedError, OSError) as e:
logger.warning(f"[Clawdbot] Gateway 연결 실패 (꺼져있음?): {e}")
return None
except Exception as e:
logger.warning(f"[Clawdbot] Gateway 오류: {e}")
return None
def ask_clawdbot(message, session_id='pharmacy-upsell',
system_prompt=None, timeout=60):
"""
동기 래퍼: Flask에서 직접 호출 가능
Args:
message: 사용자 메시지
session_id: 세션 ID (대화 구분용)
system_prompt: 추가 시스템 프롬프트
timeout: 타임아웃 ()
Returns:
str: AI 응답 텍스트 (실패 None)
"""
try:
loop = asyncio.new_event_loop()
result = loop.run_until_complete(
_ask_gateway(message, session_id, system_prompt, timeout)
)
loop.close()
return result
except Exception as e:
logger.warning(f"[Clawdbot] 호출 실패: {e}")
return None
# 업셀링 전용 ──────────────────────────────────────
UPSELL_SYSTEM_PROMPT = """당신은 동네 약국(청춘약국)의 친절한 약사입니다.
고객의 구매 이력을 보고, 자연스럽고 따뜻한 톤으로 하나를 추천합니다.
강압적이거나 광고 같은 느낌이 아닌, 진심으로 건강을 걱정하는 약사의 말투로 작성해주세요.
반드시 아래 JSON 형식으로만 응답하세요. 다른 텍스트 없이 JSON만 출력하세요."""
def generate_upsell(user_name, current_items, recent_products):
"""
업셀링 추천 생성
Args:
user_name: 고객명
current_items: 오늘 구매 품목 문자열 (: "타이레놀, 챔프 시럽")
recent_products: 최근 구매 이력 문자열
Returns:
dict: {'product': '...', 'reason': '...', 'message': '...'} 또는 None
"""
prompt = f"""고객 이름: {user_name}
오늘 구매한 : {current_items}
최근 구매 이력: {recent_products}
정보를 바탕으로 고객에게 추천할 약품 하나를 제안해주세요.
규칙:
1. 오늘 구매한 약과 함께 먹으면 좋거나, 구매 패턴상 필요해보이는 1가지만 추천
2. 실제 약국에서 판매하는 일반의약품/건강기능식품만 추천 (처방약 제외)
3. 메시지는 2문장 이내, 따뜻하고 자연스러운
4. 구체적인 제품명 사용 (: "비타민C 1000", "오메가3" )
응답은 반드시 아래 JSON 형식으로만:
{{"product": "추천 제품명", "reason": "추천 이유 (내부용, 1문장)", "message": "{user_name}님, [오늘 구매 품목]과 함께 [추천약]도 추천드려요. [간단한 이유]."}}"""
response_text = ask_clawdbot(
prompt,
session_id=f'upsell-{user_name}',
system_prompt=UPSELL_SYSTEM_PROMPT,
timeout=30
)
if not response_text:
return None
return _parse_upsell_response(response_text)
def _parse_upsell_response(text):
"""AI 응답에서 JSON 추출"""
import re
try:
# ```json ... ``` 블록 추출 시도
json_match = re.search(r'```json\s*(\{.*?\})\s*```', text, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
# 직접 JSON 파싱 시도
start = text.find('{')
end = text.rfind('}')
if start >= 0 and end > start:
json_str = text[start:end + 1]
else:
return None
data = json.loads(json_str)
if 'product' not in data or 'message' not in data:
return None
return {
'product': data['product'],
'reason': data.get('reason', ''),
'message': data['message'],
}
except (json.JSONDecodeError, Exception) as e:
logger.warning(f"[Clawdbot] 업셀 응답 파싱 실패: {e}")
return None

View File

@ -392,6 +392,57 @@
}
}
</script>
<!-- AI 추천 바텀시트 -->
<div id="rec-sheet" style="display:none;">
<div id="rec-backdrop" style="position:fixed;top:0;left:0;right:0;bottom:0;background:rgba(0,0,0,0.3);z-index:999;animation:recFadeIn .3s ease;"></div>
<div id="rec-content" style="position:fixed;bottom:0;left:50%;transform:translateX(-50%);width:100%;max-width:420px;background:#fff;border-radius:24px 24px 0 0;padding:12px 24px 32px;box-shadow:0 -8px 32px rgba(0,0,0,0.12);z-index:1000;animation:recSlideUp .4s cubic-bezier(.16,1,.3,1);">
<div style="width:40px;height:4px;background:#dee2e6;border-radius:2px;margin:0 auto 20px;"></div>
<div style="text-align:center;padding:8px 0 20px;">
<div style="font-size:48px;margin-bottom:16px;">💊</div>
<div id="rec-message" style="color:#343a40;font-size:16px;font-weight:500;line-height:1.6;letter-spacing:-0.3px;margin-bottom:16px;"></div>
<div id="rec-product" style="display:inline-block;background:linear-gradient(135deg,#6366f1,#8b5cf6);color:#fff;font-size:14px;font-weight:600;padding:8px 20px;border-radius:20px;letter-spacing:-0.2px;"></div>
</div>
<div style="display:flex;gap:12px;padding-bottom:env(safe-area-inset-bottom,0);">
<button onclick="dismissRec()" style="flex:1;padding:14px;border:1px solid #dee2e6;border-radius:14px;background:#fff;color:#868e96;font-size:15px;font-weight:600;cursor:pointer;font-family:inherit;">다음에요</button>
<button onclick="dismissRec()" style="flex:2;padding:14px;border:none;border-radius:14px;background:linear-gradient(135deg,#6366f1,#8b5cf6);color:#fff;font-size:15px;font-weight:600;cursor:pointer;font-family:inherit;">관심있어요!</button>
</div>
</div>
</div>
<style>
@keyframes recFadeIn { from{opacity:0} to{opacity:1} }
@keyframes recSlideUp { from{transform:translate(-50%,100%)} to{transform:translate(-50%,0)} }
@keyframes recSlideDown { from{transform:translate(-50%,0)} to{transform:translate(-50%,100%)} }
</style>
<script>
let _recId = null;
window.addEventListener('load', function() {
{% if user_id %}
setTimeout(async function() {
try {
const res = await fetch('/api/recommendation/{{ user_id }}');
const data = await res.json();
if (data.success && data.has_recommendation) {
_recId = data.recommendation.id;
document.getElementById('rec-message').textContent = data.recommendation.message;
document.getElementById('rec-product').textContent = data.recommendation.product;
document.getElementById('rec-sheet').style.display = 'block';
document.getElementById('rec-backdrop').onclick = dismissRec;
}
} catch(e) {}
}, 1500);
{% endif %}
});
function dismissRec() {
const c = document.getElementById('rec-content');
const b = document.getElementById('rec-backdrop');
c.style.animation = 'recSlideDown .3s ease forwards';
b.style.opacity = '0';
b.style.transition = 'opacity .3s';
setTimeout(function(){ document.getElementById('rec-sheet').style.display='none'; }, 300);
if (_recId) fetch('/api/recommendation/' + _recId + '/dismiss', {method:'POST'}).catch(function(){});
}
</script>
<script>if('serviceWorker' in navigator){navigator.serviceWorker.register('/sw.js').catch(()=>{});}</script>
</body>
</html>