- generate_upsell_real(): MSSQL 최근 30일 판매 TOP 40 제품 목록을 AI에 제공 - AI가 실제 약국 보유 제품 중에서만 선택하여 추천 - 실데이터 실패 시 기존 자유 생성(generate_upsell) fallback - 기존 generate_upsell은 그대로 보존 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
327 lines
12 KiB
Python
327 lines
12 KiB
Python
"""
|
|
Clawdbot Gateway Python 클라이언트
|
|
카카오톡 봇과 동일한 Gateway WebSocket API를 통해 Claude와 통신
|
|
추가 API 비용 없음 (Claude Max 구독 재활용)
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
|
|
# Windows 콘솔 UTF-8 강제 (한글 깨짐 방지)
|
|
if sys.platform == 'win32':
|
|
import io
|
|
if hasattr(sys.stdout, 'buffer'):
|
|
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
|
|
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
|
|
os.environ.setdefault('PYTHONIOENCODING', 'utf-8')
|
|
|
|
import json
|
|
import uuid
|
|
import asyncio
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import websockets
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Gateway 설정 (clawdbot.json에서 읽기)
|
|
CLAWDBOT_CONFIG_PATH = Path.home() / '.clawdbot' / 'clawdbot.json'
|
|
|
|
|
|
def _load_gateway_config():
|
|
"""clawdbot.json에서 Gateway 설정 로드"""
|
|
try:
|
|
with open(CLAWDBOT_CONFIG_PATH, 'r', encoding='utf-8') as f:
|
|
config = json.load(f)
|
|
gw = config.get('gateway', {})
|
|
return {
|
|
'port': gw.get('port', 18789),
|
|
'token': gw.get('auth', {}).get('token', ''),
|
|
}
|
|
except Exception as e:
|
|
logger.warning(f"[Clawdbot] 설정 파일 로드 실패: {e}")
|
|
return {'port': 18789, 'token': ''}
|
|
|
|
|
|
async def _ask_gateway(message, session_id='pharmacy-upsell',
|
|
system_prompt=None, timeout=60):
|
|
"""
|
|
Clawdbot Gateway WebSocket API 호출
|
|
|
|
프로토콜:
|
|
1. WS 연결
|
|
2. 서버 → connect.challenge (nonce)
|
|
3. 클라이언트 → connect 요청 (token)
|
|
4. 서버 → connect 응답 (ok)
|
|
5. 클라이언트 → agent 요청
|
|
6. 서버 → accepted (ack) → 최종 응답
|
|
|
|
Returns:
|
|
str: AI 응답 텍스트 (실패 시 None)
|
|
"""
|
|
config = _load_gateway_config()
|
|
url = f"ws://127.0.0.1:{config['port']}"
|
|
token = config['token']
|
|
|
|
try:
|
|
async with websockets.connect(url, max_size=25 * 1024 * 1024,
|
|
close_timeout=5) as ws:
|
|
# 1. connect.challenge 대기
|
|
nonce = None
|
|
challenge_msg = await asyncio.wait_for(ws.recv(), timeout=10)
|
|
challenge = json.loads(challenge_msg)
|
|
if challenge.get('event') == 'connect.challenge':
|
|
nonce = challenge.get('payload', {}).get('nonce')
|
|
|
|
# 2. connect 요청
|
|
connect_id = str(uuid.uuid4())
|
|
connect_frame = {
|
|
'type': 'req',
|
|
'id': connect_id,
|
|
'method': 'connect',
|
|
'params': {
|
|
'minProtocol': 3,
|
|
'maxProtocol': 3,
|
|
'client': {
|
|
'id': 'gateway-client',
|
|
'displayName': 'Pharmacy Upsell',
|
|
'version': '1.0.0',
|
|
'platform': 'win32',
|
|
'mode': 'backend',
|
|
'instanceId': str(uuid.uuid4()),
|
|
},
|
|
'caps': [],
|
|
'auth': {
|
|
'token': token,
|
|
},
|
|
'role': 'operator',
|
|
'scopes': ['operator.admin'],
|
|
}
|
|
}
|
|
await ws.send(json.dumps(connect_frame))
|
|
|
|
# 3. connect 응답 대기
|
|
while True:
|
|
msg = await asyncio.wait_for(ws.recv(), timeout=10)
|
|
data = json.loads(msg)
|
|
if data.get('id') == connect_id:
|
|
if not data.get('ok'):
|
|
error = data.get('error', {}).get('message', 'unknown')
|
|
logger.warning(f"[Clawdbot] connect 실패: {error}")
|
|
return None
|
|
break # 연결 성공
|
|
|
|
# 4. agent 요청
|
|
agent_id = str(uuid.uuid4())
|
|
agent_params = {
|
|
'message': message,
|
|
'sessionId': session_id,
|
|
'sessionKey': session_id,
|
|
'timeout': timeout,
|
|
'idempotencyKey': str(uuid.uuid4()),
|
|
}
|
|
if system_prompt:
|
|
agent_params['extraSystemPrompt'] = system_prompt
|
|
|
|
agent_frame = {
|
|
'type': 'req',
|
|
'id': agent_id,
|
|
'method': 'agent',
|
|
'params': agent_params,
|
|
}
|
|
await ws.send(json.dumps(agent_frame))
|
|
|
|
# 5. agent 응답 대기 (accepted → final)
|
|
while True:
|
|
msg = await asyncio.wait_for(ws.recv(), timeout=timeout + 30)
|
|
data = json.loads(msg)
|
|
|
|
# 이벤트 무시 (tick 등)
|
|
if data.get('event'):
|
|
continue
|
|
|
|
# 우리 요청에 대한 응답인지 확인
|
|
if data.get('id') != agent_id:
|
|
continue
|
|
|
|
payload = data.get('payload', {})
|
|
status = payload.get('status')
|
|
|
|
# accepted는 대기
|
|
if status == 'accepted':
|
|
continue
|
|
|
|
# 최종 응답
|
|
if data.get('ok'):
|
|
payloads = payload.get('result', {}).get('payloads', [])
|
|
text = '\n'.join(p.get('text', '') for p in payloads if p.get('text'))
|
|
return text or None
|
|
else:
|
|
error = data.get('error', {}).get('message', 'unknown')
|
|
logger.warning(f"[Clawdbot] agent 실패: {error}")
|
|
return None
|
|
|
|
except asyncio.TimeoutError:
|
|
logger.warning("[Clawdbot] Gateway 타임아웃")
|
|
return None
|
|
except (ConnectionRefusedError, OSError) as e:
|
|
logger.warning(f"[Clawdbot] Gateway 연결 실패 (꺼져있음?): {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.warning(f"[Clawdbot] Gateway 오류: {e}")
|
|
return None
|
|
|
|
|
|
def ask_clawdbot(message, session_id='pharmacy-upsell',
|
|
system_prompt=None, timeout=60):
|
|
"""
|
|
동기 래퍼: Flask에서 직접 호출 가능
|
|
|
|
Args:
|
|
message: 사용자 메시지
|
|
session_id: 세션 ID (대화 구분용)
|
|
system_prompt: 추가 시스템 프롬프트
|
|
timeout: 타임아웃 (초)
|
|
|
|
Returns:
|
|
str: AI 응답 텍스트 (실패 시 None)
|
|
"""
|
|
try:
|
|
loop = asyncio.new_event_loop()
|
|
result = loop.run_until_complete(
|
|
_ask_gateway(message, session_id, system_prompt, timeout)
|
|
)
|
|
loop.close()
|
|
return result
|
|
except Exception as e:
|
|
logger.warning(f"[Clawdbot] 호출 실패: {e}")
|
|
return None
|
|
|
|
|
|
# 업셀링 전용 ──────────────────────────────────────
|
|
|
|
UPSELL_SYSTEM_PROMPT = """당신은 동네 약국(청춘약국)의 친절한 약사입니다.
|
|
고객의 구매 이력을 보고, 자연스럽고 따뜻한 톤으로 약 하나를 추천합니다.
|
|
강압적이거나 광고 같은 느낌이 아닌, 진심으로 건강을 걱정하는 약사의 말투로 작성해주세요.
|
|
반드시 아래 JSON 형식으로만 응답하세요. 다른 텍스트 없이 JSON만 출력하세요."""
|
|
|
|
|
|
def generate_upsell(user_name, current_items, recent_products):
|
|
"""
|
|
업셀링 추천 생성
|
|
|
|
Args:
|
|
user_name: 고객명
|
|
current_items: 오늘 구매 품목 문자열 (예: "타이레놀, 챔프 시럽")
|
|
recent_products: 최근 구매 이력 문자열
|
|
|
|
Returns:
|
|
dict: {'product': '...', 'reason': '...', 'message': '...'} 또는 None
|
|
"""
|
|
prompt = f"""고객 이름: {user_name}
|
|
오늘 구매한 약: {current_items}
|
|
최근 구매 이력: {recent_products}
|
|
|
|
위 정보를 바탕으로 이 고객에게 추천할 약품 하나를 제안해주세요.
|
|
|
|
규칙:
|
|
1. 오늘 구매한 약과 함께 먹으면 좋거나, 구매 패턴상 필요해보이는 약 1가지만 추천
|
|
2. 실제 약국에서 판매하는 일반의약품/건강기능식품만 추천 (처방약 제외)
|
|
3. 메시지는 2문장 이내, 따뜻하고 자연스러운 톤
|
|
4. 구체적인 제품명 사용 (예: "비타민C 1000", "오메가3" 등)
|
|
|
|
응답은 반드시 아래 JSON 형식으로만:
|
|
{{"product": "추천 제품명", "reason": "추천 이유 (내부용, 1문장)", "message": "{user_name}님, [오늘 구매 품목]과 함께 [추천약]도 추천드려요. [간단한 이유]."}}"""
|
|
|
|
response_text = ask_clawdbot(
|
|
prompt,
|
|
session_id=f'upsell-{user_name}',
|
|
system_prompt=UPSELL_SYSTEM_PROMPT,
|
|
timeout=30
|
|
)
|
|
|
|
if not response_text:
|
|
return None
|
|
|
|
return _parse_upsell_response(response_text)
|
|
|
|
|
|
UPSELL_REAL_SYSTEM_PROMPT = """당신은 동네 약국(청춘약국)의 친절한 약사입니다.
|
|
고객의 구매 이력을 보고, 약국에 실제로 있는 제품 중에서 하나를 추천합니다.
|
|
반드시 [약국 보유 제품 목록]에 있는 제품명을 그대로 사용하세요.
|
|
목록에 없는 제품은 절대 추천하지 마세요.
|
|
강압적이거나 광고 같은 느낌이 아닌, 진심으로 건강을 걱정하는 약사의 말투로 작성해주세요.
|
|
반드시 아래 JSON 형식으로만 응답하세요. 다른 텍스트 없이 JSON만 출력하세요."""
|
|
|
|
|
|
def generate_upsell_real(user_name, current_items, recent_products, available_products):
|
|
"""
|
|
실데이터 기반 업셀링 추천 생성
|
|
available_products: 약국 보유 제품 리스트 [{'name': ..., 'price': ..., 'sales': ...}, ...]
|
|
"""
|
|
product_list = '\n'.join(
|
|
f"- {p['name']} ({int(p['price'])}원, 최근 {p['sales']}건 판매)"
|
|
for p in available_products if p.get('name')
|
|
)
|
|
|
|
prompt = f"""고객 이름: {user_name}
|
|
오늘 구매한 약: {current_items}
|
|
최근 구매 이력: {recent_products}
|
|
|
|
[약국 보유 제품 목록 — 이 중에서만 추천하세요]
|
|
{product_list}
|
|
|
|
규칙:
|
|
1. 위 목록에 있는 제품 중 오늘 구매한 약과 함께 먹으면 좋거나, 구매 패턴상 필요해보이는 약 1가지만 추천
|
|
2. 오늘 이미 구매한 제품은 추천하지 마세요
|
|
3. 메시지는 2문장 이내, 따뜻하고 자연스러운 톤
|
|
4. product 필드에는 목록에 있는 제품명을 정확히 그대로 적어주세요
|
|
|
|
응답은 반드시 아래 JSON 형식으로만:
|
|
{{"product": "목록에 있는 정확한 제품명", "reason": "추천 이유 (내부용, 1문장)", "message": "{user_name}님, [추천 메시지 2문장 이내]"}}"""
|
|
|
|
response_text = ask_clawdbot(
|
|
prompt,
|
|
session_id=f'upsell-real-{user_name}',
|
|
system_prompt=UPSELL_REAL_SYSTEM_PROMPT,
|
|
timeout=30
|
|
)
|
|
|
|
if not response_text:
|
|
return None
|
|
|
|
return _parse_upsell_response(response_text)
|
|
|
|
|
|
def _parse_upsell_response(text):
|
|
"""AI 응답에서 JSON 추출"""
|
|
import re
|
|
try:
|
|
# ```json ... ``` 블록 추출 시도
|
|
json_match = re.search(r'```json\s*(\{.*?\})\s*```', text, re.DOTALL)
|
|
if json_match:
|
|
json_str = json_match.group(1)
|
|
else:
|
|
# 직접 JSON 파싱 시도
|
|
start = text.find('{')
|
|
end = text.rfind('}')
|
|
if start >= 0 and end > start:
|
|
json_str = text[start:end + 1]
|
|
else:
|
|
return None
|
|
|
|
data = json.loads(json_str)
|
|
|
|
if 'product' not in data or 'message' not in data:
|
|
return None
|
|
|
|
return {
|
|
'product': data['product'],
|
|
'reason': data.get('reason', ''),
|
|
'message': data['message'],
|
|
}
|
|
except (json.JSONDecodeError, Exception) as e:
|
|
logger.warning(f"[Clawdbot] 업셀 응답 파싱 실패: {e}")
|
|
return None
|