# sms_client.py - NHN Cloud SMS API 클라이언트 import requests import json import logging from typing import Dict, List # NHN Cloud SMS 설정 (SMS 전용 앱키) SMS_CONFIG = { "BASE_URL": "https://api-sms.cloud.toast.com", "APP_KEY": "YWWBZkuJ0ck03cje", "SECRET_KEY": "jxXbBPnQN2tUL8QnEp4O3YfraGd8ZuNh", "SENDER_NO": "0334817390", # 발신번호 (033-481-7390) } logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SMSClient: """NHN Cloud SMS 발송 클라이언트""" def __init__(self): self.base_url = SMS_CONFIG["BASE_URL"] self.app_key = SMS_CONFIG["APP_KEY"] self.secret_key = SMS_CONFIG["SECRET_KEY"] self.sender_no = SMS_CONFIG["SENDER_NO"] def _get_headers(self) -> Dict[str, str]: return { "Content-Type": "application/json;charset=UTF-8", "X-Secret-Key": self.secret_key } def send_sms(self, recipients: List[Dict], message: str) -> Dict: """ SMS 발송 Args: recipients: [{"phone": "01012345678", "name": "홍길동"}] message: 메시지 내용 (90바이트 이하 SMS, 초과시 LMS) Returns: 발송 결과 """ # 메시지 길이에 따라 SMS/LMS 결정 msg_bytes = len(message.encode('utf-8')) is_lms = msg_bytes > 90 url = f"{self.base_url}/sms/v3.0/appKeys/{self.app_key}/sender/{'mms' if is_lms else 'sms'}" # 수신자 리스트 생성 recipient_list = [] for r in recipients: phone = (r.get('phone') or '').replace('-', '').replace(' ', '') if phone and len(phone) >= 10: recipient_list.append({ "recipientNo": phone, "countryCode": "82" }) if not recipient_list: return { "success": False, "error": "유효한 수신자가 없습니다" } # 요청 데이터 data = { "body": message, "sendNo": self.sender_no, "recipientList": recipient_list } # LMS인 경우 제목 추가 if is_lms: data["title"] = "청춘약국" try: logger.info(f"SMS 발송 요청: {len(recipient_list)}명, {msg_bytes}bytes ({'LMS' if is_lms else 'SMS'})") response = requests.post( url, headers=self._get_headers(), data=json.dumps(data), timeout=30 ) result = response.json() logger.info(f"SMS 응답: {result}") header = result.get("header", {}) if header.get("isSuccessful"): body = result.get("body", {}) return { "success": True, "message": f"SMS 발송 성공 ({len(recipient_list)}명)", "type": "LMS" if is_lms else "SMS", "request_id": body.get("data", {}).get("requestId"), "sent_count": len(recipient_list) } else: return { "success": False, "error": header.get("resultMessage", "발송 실패"), "code": header.get("resultCode") } except requests.exceptions.Timeout: return {"success": False, "error": "요청 시간 초과"} except requests.exceptions.RequestException as e: logger.error(f"SMS 발송 오류: {e}") return {"success": False, "error": str(e)} except Exception as e: logger.error(f"SMS 발송 예외: {e}") return {"success": False, "error": str(e)} def check_balance(self) -> Dict: """잔여 발송량 확인""" url = f"{self.base_url}/sms/v3.0/appKeys/{self.app_key}/stats" try: response = requests.get(url, headers=self._get_headers(), timeout=10) return response.json() except Exception as e: return {"success": False, "error": str(e)} # 싱글톤 인스턴스 sms_client = SMSClient() def send_test_sms(phone: str, message: str = None) -> Dict: """테스트 SMS 발송""" if not message: message = "[청춘약국] 테스트 문자입니다. 정상 수신되었다면 회신 부탁드립니다." return sms_client.send_sms( recipients=[{"phone": phone, "name": "테스트"}], message=message ) if __name__ == "__main__": # 테스트 발송 result = send_test_sms("01027027390") print(json.dumps(result, ensure_ascii=False, indent=2))