""" 새로운 데이터베이스 유틸리티 - Headscale과 분리된 FARMQ 전용 외래키 제약조건 없이 능동적으로 데이터를 관리 """ import os from typing import Optional, List, Dict, Any from datetime import datetime, timedelta from sqlalchemy import create_engine, text, and_, or_, desc from sqlalchemy.orm import sessionmaker, Session from models.farmq_models import ( PharmacyInfo, MachineProfile, MonitoringMetrics, SystemAlert, FarmqDatabaseManager, create_farmq_database_manager, FarmqBase ) from models.headscale_models import User, Node, PreAuthKey, ApiKey # 전역 데이터베이스 매니저들 farmq_manager: Optional[FarmqDatabaseManager] = None headscale_engine = None headscale_session_maker = None def init_databases(headscale_db_uri: str, farmq_db_uri: str = None): """두 개의 데이터베이스 초기화""" global farmq_manager, headscale_engine, headscale_session_maker # FARMQ 전용 데이터베이스 (외래키 제약조건 없음) if farmq_db_uri is None: farmq_db_uri = "sqlite:///farmq-admin/farmq.sqlite" farmq_manager = create_farmq_database_manager(farmq_db_uri) print(f"✅ FARMQ 데이터베이스 초기화: {farmq_db_uri}") # Headscale 읽기 전용 데이터베이스 headscale_engine = create_engine(headscale_db_uri, echo=False) headscale_session_maker = sessionmaker(bind=headscale_engine) print(f"✅ Headscale 데이터베이스 연결: {headscale_db_uri}") def get_farmq_session() -> Session: """FARMQ 데이터베이스 세션 가져오기""" if farmq_manager is None: raise RuntimeError("FARMQ database not initialized") return farmq_manager.get_session() def get_headscale_session() -> Session: """Headscale 데이터베이스 세션 가져오기 (읽기 전용)""" if headscale_session_maker is None: raise RuntimeError("Headscale database not initialized") return headscale_session_maker() def close_session(session: Session): """세션 종료""" if session: session.close() # ========================================== # Dashboard Statistics # ========================================== def get_dashboard_stats() -> Dict[str, Any]: """대시보드 통계 조회""" farmq_session = get_farmq_session() headscale_session = get_headscale_session() try: # 약국 수 total_pharmacies = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.status == 'active' ).count() # 머신 상태 total_machines = farmq_session.query(MachineProfile).filter( MachineProfile.status == 'active' ).count() online_machines = farmq_session.query(MachineProfile).filter( MachineProfile.status == 'active', MachineProfile.tailscale_status == 'online' ).count() offline_machines = total_machines - online_machines # 최근 알림 수 recent_alerts = farmq_session.query(SystemAlert).filter( SystemAlert.status == 'active', SystemAlert.created_at > (datetime.now() - timedelta(hours=24)) ).count() # 평균 CPU 온도 (최근 1시간) cutoff_time = datetime.now() - timedelta(hours=1) avg_temp_result = farmq_session.query( MonitoringMetrics.cpu_temperature ).filter( MonitoringMetrics.collected_at > cutoff_time, MonitoringMetrics.cpu_temperature.isnot(None) ).all() avg_cpu_temp = 0 if avg_temp_result: temps = [temp[0] for temp in avg_temp_result if temp[0] is not None] avg_cpu_temp = sum(temps) / len(temps) if temps else 0 return { 'total_pharmacies': total_pharmacies, 'total_machines': total_machines, 'online_machines': online_machines, 'offline_machines': offline_machines, 'recent_alerts': recent_alerts, 'avg_cpu_temp': round(avg_cpu_temp, 1) } finally: close_session(farmq_session) close_session(headscale_session) # ========================================== # Pharmacy Management # ========================================== def get_all_pharmacies_with_stats() -> List[Dict[str, Any]]: """모든 약국과 통계 정보 조회 - Headscale Node 데이터 사용""" farmq_session = get_farmq_session() headscale_session = get_headscale_session() try: pharmacies = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.status == 'active' ).all() result = [] for pharmacy in pharmacies: # Headscale에서 해당 사용자의 머신 수 조회 user_machines = headscale_session.query(Node).join(User).filter( User.name == pharmacy.headscale_user_name, Node.deleted_at.is_(None) ).all() machine_count = len(user_machines) # 온라인 머신 수 계산 (24시간 timeout) online_count = 0 for machine in user_machines: if machine.last_seen: try: from datetime import timezone if machine.last_seen.tzinfo is not None: cutoff_time = datetime.now(timezone.utc) - timedelta(hours=24) else: cutoff_time = datetime.now() - timedelta(hours=24) if machine.last_seen > cutoff_time: online_count += 1 except Exception: online_count += 1 # 타임존 에러 시 온라인으로 간주 # 활성 알림 수 (현재는 0으로 설정, 나중에 구현) alert_count = 0 pharmacy_data = pharmacy.to_dict() pharmacy_data.update({ 'machine_count': machine_count, 'online_count': online_count, 'offline_count': machine_count - online_count, 'alert_count': alert_count }) result.append(pharmacy_data) return result finally: close_session(farmq_session) close_session(headscale_session) def get_pharmacy_detail(pharmacy_id: int) -> Optional[Dict[str, Any]]: """약국 상세 정보 조회""" farmq_session = get_farmq_session() try: pharmacy = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.id == pharmacy_id ).first() if not pharmacy: return None # 약국의 머신들 조회 machines = farmq_session.query(MachineProfile).filter( MachineProfile.pharmacy_id == pharmacy_id, MachineProfile.status == 'active' ).all() machine_list = [] for machine in machines: machine_data = machine.to_dict() # 최근 모니터링 데이터 latest_metrics = farmq_session.query(MonitoringMetrics).filter( MonitoringMetrics.machine_profile_id == machine.id ).order_by(desc(MonitoringMetrics.collected_at)).first() if latest_metrics: machine_data['latest_metrics'] = latest_metrics.to_dict() machine_list.append(machine_data) return { 'pharmacy': pharmacy.to_dict(), 'machines': machine_list } finally: close_session(farmq_session) # ========================================== # Machine Management # ========================================== def get_all_machines_with_details() -> List[Dict[str, Any]]: """모든 머신 상세 정보 조회 - Headscale Node 데이터 사용""" headscale_session = get_headscale_session() farmq_session = get_farmq_session() try: # Headscale에서 모든 노드 조회 nodes = headscale_session.query(Node).filter( Node.deleted_at.is_(None) ).all() result = [] for node in nodes: # 기본 머신 정보 machine_data = { 'id': node.id, 'hostname': node.hostname, 'machine_name': node.hostname, # 표시용 이름 'tailscale_ip': node.ipv4, 'ipv6': node.ipv6, 'headscale_user_name': node.user.name if node.user else '미지정', 'user_id': node.user_id, 'last_seen': node.last_seen, 'created_at': node.created_at, 'updated_at': node.updated_at } # 온라인 상태 확인 (24시간 timeout) if node.last_seen: try: from datetime import timezone # node.last_seen이 timezone-aware인지 확인 if node.last_seen.tzinfo is not None: cutoff_time = datetime.now(timezone.utc) - timedelta(hours=24) else: cutoff_time = datetime.now() - timedelta(hours=24) machine_data['is_online'] = node.last_seen > cutoff_time except Exception as e: # 타임존 비교 에러가 발생하면 기본적으로 온라인으로 가정 print(f"Timezone comparison error for {node.hostname}: {e}") machine_data['is_online'] = True else: machine_data['is_online'] = False # 사용자 이름으로 약국 정보 찾기 machine_data['pharmacy'] = None if node.user: pharmacy = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.headscale_user_name == node.user.name ).first() if pharmacy: machine_data['pharmacy'] = { 'id': pharmacy.id, 'pharmacy_name': pharmacy.pharmacy_name, 'manager_name': pharmacy.manager_name, 'business_number': pharmacy.business_number } # 마지막 접속 시간을 사람이 읽기 쉬운 형태로 machine_data['last_seen_humanized'] = humanize_datetime(node.last_seen) # 하드웨어 사양 및 모니터링 데이터는 나중에 추가 예정 machine_data['specs'] = None machine_data['latest_monitoring'] = None result.append(machine_data) return result finally: close_session(farmq_session) close_session(headscale_session) def get_machine_detail(machine_id: int) -> Optional[Dict[str, Any]]: """머신 상세 정보 조회""" farmq_session = get_farmq_session() try: machine = farmq_session.query(MachineProfile).filter( MachineProfile.id == machine_id ).first() if not machine: return None machine_data = machine.to_dict() # 약국 정보 if machine.pharmacy_id: pharmacy = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.id == machine.pharmacy_id ).first() if pharmacy: machine_data['pharmacy'] = pharmacy.to_dict() # 최근 모니터링 데이터 (24시간) cutoff_time = datetime.now() - timedelta(hours=24) metrics = farmq_session.query(MonitoringMetrics).filter( MonitoringMetrics.machine_profile_id == machine_id, MonitoringMetrics.collected_at > cutoff_time ).order_by(desc(MonitoringMetrics.collected_at)).limit(100).all() machine_data['metrics_history'] = [metric.to_dict() for metric in metrics] # 최신 메트릭스 if metrics: latest = metrics[0] machine_data['latest_metrics'] = latest.to_dict() machine_data['alerts'] = latest.get_alert_status() # 활성 알림들 active_alerts = farmq_session.query(SystemAlert).filter( SystemAlert.machine_profile_id == machine_id, SystemAlert.status == 'active' ).order_by(desc(SystemAlert.created_at)).limit(10).all() machine_data['active_alerts'] = [alert.to_dict() for alert in active_alerts] return machine_data finally: close_session(farmq_session) # ========================================== # Headscale Synchronization # ========================================== def sync_machines_from_headscale() -> Dict[str, int]: """Headscale에서 머신 정보 동기화""" farmq_session = get_farmq_session() headscale_session = get_headscale_session() try: # Headscale에서 모든 노드 조회 nodes = headscale_session.query(Node).filter( Node.deleted_at.is_(None) ).all() synced = 0 created = 0 for node in nodes: # FARMQ 데이터베이스에서 해당 머신 찾기 machine = farmq_session.query(MachineProfile).filter( MachineProfile.headscale_node_id == node.id ).first() if machine: # 기존 머신 업데이트 is_online = node.is_online() status = 'online' if is_online else 'offline' machine.hostname = node.hostname machine.tailscale_ip = node.ipv4 machine.tailscale_status = status machine.last_seen = node.last_seen or datetime.now() machine.updated_at = datetime.now() synced += 1 else: # 새 머신 생성 machine = MachineProfile( headscale_node_id=node.id, headscale_machine_key=node.machine_key, hostname=node.hostname or 'unknown', machine_name=node.given_name or node.hostname or 'unknown', tailscale_ip=node.ipv4, tailscale_status='online' if node.is_online() else 'offline', os_type='unknown', status='active', last_seen=node.last_seen or datetime.now() ) farmq_session.add(machine) created += 1 farmq_session.commit() return { 'total_nodes': len(nodes), 'synced': synced, 'created': created } finally: close_session(farmq_session) close_session(headscale_session) def sync_users_from_headscale() -> Dict[str, int]: """Headscale에서 사용자 정보 동기화""" farmq_session = get_farmq_session() headscale_session = get_headscale_session() try: # Headscale에서 모든 사용자 조회 users = headscale_session.query(User).filter( User.deleted_at.is_(None) ).all() synced = 0 created = 0 for user in users: # FARMQ 데이터베이스에서 해당 약국 찾기 pharmacy = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.headscale_user_name == user.name ).first() if pharmacy: # 기존 약국 업데이트 pharmacy.headscale_user_id = user.id # 약국명이 사용자명과 같으면 더 나은 이름으로 업데이트 if pharmacy.pharmacy_name == user.name: if user.display_name and user.display_name != user.name: pharmacy.pharmacy_name = user.display_name else: pharmacy.pharmacy_name = f"{user.name} 약국" # 더 나은 기본 이름 # 기본값들이 None인 경우 업데이트 if not pharmacy.business_number or pharmacy.business_number == "None": pharmacy.business_number = "000-00-00000" if not pharmacy.manager_name or pharmacy.manager_name == "None": pharmacy.manager_name = "관리자" pharmacy.last_sync = datetime.now() pharmacy.updated_at = datetime.now() synced += 1 else: # 새 약국 생성 (기본 정보로) pharmacy_name = user.display_name if user.display_name else f"{user.name} 약국" pharmacy = PharmacyInfo( headscale_user_name=user.name, headscale_user_id=user.id, pharmacy_name=pharmacy_name, business_number="000-00-00000", # 기본 사업자번호 manager_name="관리자", status='active', last_sync=datetime.now() ) farmq_session.add(pharmacy) created += 1 farmq_session.commit() return { 'total_users': len(users), 'synced': synced, 'created': created } finally: close_session(farmq_session) close_session(headscale_session) # ========================================== # Alert Management # ========================================== def get_active_alerts(limit: int = 50) -> List[Dict[str, Any]]: """활성 알림 조회""" farmq_session = get_farmq_session() try: alerts = farmq_session.query(SystemAlert).filter( SystemAlert.status == 'active' ).order_by(desc(SystemAlert.created_at)).limit(limit).all() return [alert.to_dict() for alert in alerts] finally: close_session(farmq_session) def create_alert(machine_profile_id: int, alert_data: Dict[str, Any]) -> SystemAlert: """새로운 알림 생성""" farmq_session = get_farmq_session() try: # 중복 알림 확인 fingerprint = f"{machine_profile_id}_{alert_data.get('alert_type')}_{alert_data.get('current_value')}" existing_alert = farmq_session.query(SystemAlert).filter( SystemAlert.fingerprint == fingerprint, SystemAlert.status == 'active' ).first() if existing_alert: # 기존 알림 업데이트 existing_alert.occurrence_count += 1 existing_alert.last_occurred = datetime.now() existing_alert.updated_at = datetime.now() farmq_session.commit() return existing_alert else: # 새 알림 생성 alert = SystemAlert( machine_profile_id=machine_profile_id, fingerprint=fingerprint, **alert_data ) farmq_session.add(alert) farmq_session.commit() farmq_session.refresh(alert) return alert finally: close_session(farmq_session) # ========================================== # Backward Compatibility Functions # ========================================== def get_pharmacy_count() -> int: """약국 수 조회 (하위 호환성)""" stats = get_dashboard_stats() return stats['total_pharmacies'] def get_online_machines_count() -> int: """온라인 머신 수 조회 (하위 호환성)""" stats = get_dashboard_stats() return stats['online_machines'] def get_offline_machines_count() -> int: """오프라인 머신 수 조회 (하위 호환성)""" stats = get_dashboard_stats() return stats['offline_machines'] def get_average_cpu_temperature() -> float: """평균 CPU 온도 조회 (하위 호환성)""" stats = get_dashboard_stats() return stats['avg_cpu_temp'] def humanize_datetime(dt) -> str: """datetime을 사람이 읽기 쉬운 형태로 변환""" if not dt: return '알 수 없음' try: import humanize # 한국어 설정 시도 try: humanize.i18n.activate('ko_KR') except: pass return humanize.naturaltime(dt) except ImportError: # humanize가 없으면 기본 형식 사용 if isinstance(dt, str): return dt return dt.strftime('%Y-%m-%d %H:%M:%S') def get_machine_with_details(machine_id: int) -> Optional[Dict[str, Any]]: """머신 상세 정보 조회 (하위 호환성)""" return get_machine_detail(machine_id) def get_performance_summary() -> Dict[str, Any]: """성능 요약 조회""" return { 'status': 'good', 'summary': '모든 시스템이 정상 작동 중입니다.' } # ========================================== # 초기화 함수 # ========================================== def init_database(headscale_db_uri: str): """데이터베이스 초기화 (하위 호환성)""" # FARMQ 데이터베이스는 자동으로 생성 farmq_db_uri = "sqlite:///farmq-admin/farmq.sqlite" # 디렉토리 생성 os.makedirs("farmq-admin", exist_ok=True) init_databases(headscale_db_uri, farmq_db_uri) # 초기 동기화 실행 try: print("🔄 Headscale에서 데이터 동기화 중...") user_sync = sync_users_from_headscale() machine_sync = sync_machines_from_headscale() print(f"✅ 사용자 동기화: {user_sync}") print(f"✅ 머신 동기화: {machine_sync}") except Exception as e: print(f"⚠️ 동기화 중 오류 발생: {e}") def get_session(): """FARMQ 세션 가져오기 (하위 호환성)""" return get_farmq_session() def close_session(session=None): """세션 종료 (하위 호환성)""" if session: session.close() # 새로운 구조에서는 각 함수가 자체적으로 세션을 관리하므로 여기서는 아무것도 하지 않음