""" 새로운 데이터베이스 유틸리티 - Headscale과 분리된 FARMQ 전용 외래키 제약조건 없이 능동적으로 데이터를 관리 """ import os import json import subprocess from typing import Optional, List, Dict, Any from datetime import datetime, timedelta from sqlalchemy import create_engine, text, and_, or_, desc from sqlalchemy.orm import sessionmaker, Session from models.farmq_models import ( PharmacyInfo, MachineProfile, MonitoringMetrics, SystemAlert, FarmqDatabaseManager, create_farmq_database_manager, FarmqBase ) from models.headscale_models import User, Node, PreAuthKey, ApiKey # 전역 데이터베이스 매니저들 farmq_manager: Optional[FarmqDatabaseManager] = None headscale_engine = None headscale_session_maker = None def init_databases(headscale_db_uri: str, farmq_db_uri: str = None): """두 개의 데이터베이스 초기화""" global farmq_manager, headscale_engine, headscale_session_maker # FARMQ 전용 데이터베이스 (외래키 제약조건 없음) if farmq_db_uri is None: farmq_db_uri = "sqlite:///farmq-admin/farmq.sqlite" farmq_manager = create_farmq_database_manager(farmq_db_uri) print(f"✅ FARMQ 데이터베이스 초기화: {farmq_db_uri}") # Headscale 읽기 전용 데이터베이스 headscale_engine = create_engine(headscale_db_uri, echo=False) headscale_session_maker = sessionmaker(bind=headscale_engine) print(f"✅ Headscale 데이터베이스 연결: {headscale_db_uri}") def get_farmq_session() -> Session: """FARMQ 데이터베이스 세션 가져오기""" if farmq_manager is None: raise RuntimeError("FARMQ database not initialized") return farmq_manager.get_session() def get_headscale_session() -> Session: """Headscale 데이터베이스 세션 가져오기 (읽기 전용)""" if headscale_session_maker is None: raise RuntimeError("Headscale database not initialized") return headscale_session_maker() def close_session(session: Session): """세션 종료""" if session: session.close() # ========================================== # Headscale CLI Integration # ========================================== def get_headscale_online_status() -> Dict[str, bool]: """Headscale CLI를 통해 실시간 온라인 상태 조회""" try: # Docker를 통해 Headscale CLI 실행 result = subprocess.run( ['docker', 'exec', 'headscale', 'headscale', 'nodes', 'list', '-o', 'json'], capture_output=True, text=True, check=True ) nodes_data = json.loads(result.stdout) online_status = {} for node in nodes_data: # given_name 또는 name 사용 node_name = node.get('given_name') or node.get('name', '') # online 필드가 True인 경우만 온라인 # 필드가 없거나 False면 오프라인 is_online = node.get('online', False) == True online_status[node_name.lower()] = is_online return online_status except Exception as e: print(f"❌ Headscale CLI 호출 실패: {e}") return {} # ========================================== # Dashboard Statistics # ========================================== def get_dashboard_stats() -> Dict[str, Any]: """대시보드 통계 조회 - 실시간 DB 쿼리""" farmq_session = get_farmq_session() headscale_session = get_headscale_session() try: # 약국 수 total_pharmacies = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.status == 'active' ).count() # 머신 상태 - Headscale 노드에서 직접 실시간 조회 from models.headscale_models import Node # 활성 노드만 조회 (deleted_at이 null인 것) active_nodes = headscale_session.query(Node).filter( Node.deleted_at.is_(None) ).all() total_machines = len(active_nodes) # Headscale CLI를 통해 실시간 온라인 상태 가져오기 online_status = get_headscale_online_status() # 온라인 머신 수 계산 online_machines = 0 print(f"🔍 Headscale CLI 온라인 상태:") for node in active_nodes: node_name = (node.given_name or node.hostname or '').lower() is_online = online_status.get(node_name, False) if is_online: online_machines += 1 print(f" {node.given_name:15s} | {'🟢 ONLINE' if is_online else '🔴 OFFLINE'}") offline_machines = total_machines - online_machines print(f"📊 최종 카운트: {online_machines}/{total_machines} 온라인 (Headplane과 동일)") # 최근 알림 수 recent_alerts = farmq_session.query(SystemAlert).filter( SystemAlert.status == 'active', SystemAlert.created_at > (datetime.now() - timedelta(hours=24)) ).count() # 평균 CPU 온도 (최근 1시간) cutoff_time = datetime.now() - timedelta(hours=1) avg_temp_result = farmq_session.query( MonitoringMetrics.cpu_temperature ).filter( MonitoringMetrics.collected_at > cutoff_time, MonitoringMetrics.cpu_temperature.isnot(None) ).all() avg_cpu_temp = 0 if avg_temp_result: temps = [temp[0] for temp in avg_temp_result if temp[0] is not None] avg_cpu_temp = sum(temps) / len(temps) if temps else 0 return { 'total_pharmacies': total_pharmacies, 'total_machines': total_machines, 'online_machines': online_machines, 'offline_machines': offline_machines, 'recent_alerts': recent_alerts, 'avg_cpu_temp': round(avg_cpu_temp, 1) } finally: close_session(farmq_session) close_session(headscale_session) # ========================================== # Pharmacy Management # ========================================== def get_all_pharmacies_with_stats() -> List[Dict[str, Any]]: """모든 약국과 통계 정보 조회 - Headscale Node 데이터 사용""" farmq_session = get_farmq_session() headscale_session = get_headscale_session() try: pharmacies = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.status == 'active' ).all() # Headscale CLI를 통해 실시간 온라인 상태 가져오기 online_status = get_headscale_online_status() result = [] for pharmacy in pharmacies: # Headscale에서 해당 사용자의 머신 수 조회 user_machines = headscale_session.query(Node).join(User).filter( User.name == pharmacy.headscale_user_name, Node.deleted_at.is_(None) ).all() machine_count = len(user_machines) # 온라인 머신 수 계산 - Headscale CLI 기반 online_count = 0 for machine in user_machines: node_name = (machine.given_name or machine.hostname or '').lower() if online_status.get(node_name, False): online_count += 1 # 활성 알림 수 (현재는 0으로 설정, 나중에 구현) alert_count = 0 pharmacy_data = pharmacy.to_dict() pharmacy_data.update({ 'machine_count': machine_count, 'online_count': online_count, 'offline_count': machine_count - online_count, 'alert_count': alert_count }) result.append(pharmacy_data) return result finally: close_session(farmq_session) close_session(headscale_session) def get_pharmacy_detail(pharmacy_id: int) -> Optional[Dict[str, Any]]: """Headscale CLI 기반 약국 상세 정보 조회""" import subprocess import json from datetime import datetime farmq_session = get_farmq_session() try: # 약국 기본 정보 조회 pharmacy = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.id == pharmacy_id ).first() if not pharmacy: return None # Headscale CLI에서 노드 목록 가져오기 result = subprocess.run( ['docker', 'exec', 'headscale', 'headscale', 'nodes', 'list', '-o', 'json'], capture_output=True, text=True, check=True ) nodes_data = json.loads(result.stdout) online_status = get_headscale_online_status() # 약국과 연결된 사용자의 머신들 찾기 machine_list = [] if pharmacy.headscale_user_name: for node in nodes_data: # 이 노드가 약국의 사용자 것인지 확인 node_user_name = node.get('user', {}).get('name', '') if node_user_name == pharmacy.headscale_user_name: # 시간 변환 함수 def convert_timestamp(ts_obj): if isinstance(ts_obj, dict) and 'seconds' in ts_obj: return datetime.fromtimestamp(ts_obj['seconds']) elif isinstance(ts_obj, str): try: return datetime.fromisoformat(ts_obj.replace('Z', '+00:00')) except: return None return None node_name = node.get('given_name') or node.get('name', '') is_online = online_status.get(node_name.lower(), False) machine_data = { 'id': node.get('id'), 'given_name': node.get('given_name'), 'hostname': node.get('name'), 'ipv4': node.get('ip_addresses', [])[0] if node.get('ip_addresses') else None, 'ipv6': node.get('ip_addresses', [])[1] if len(node.get('ip_addresses', [])) > 1 else None, 'machine_key': node.get('machine_key'), 'node_key': node.get('node_key'), 'disco_key': node.get('disco_key'), 'user': node.get('user'), 'last_seen': convert_timestamp(node.get('last_seen')), 'created_at': convert_timestamp(node.get('created_at')), 'register_method': 'CLI' if node.get('register_method') == 1 else 'Pre-auth Key', 'online': is_online, 'endpoints': node.get('endpoints', []), } machine_list.append(machine_data) # 약국 정보에 통계 추가 pharmacy_data = { 'id': pharmacy.id, 'pharmacy_name': pharmacy.pharmacy_name, 'business_number': pharmacy.business_number, 'manager_name': pharmacy.manager_name, 'phone': pharmacy.phone, 'address': pharmacy.address, 'proxmox_host': pharmacy.proxmox_host, 'headscale_user_name': pharmacy.headscale_user_name, 'headscale_user_id': pharmacy.headscale_user_id, 'created_at': pharmacy.created_at, 'updated_at': pharmacy.updated_at, # 통계 정보 'total_machines': len(machine_list), 'online_machines': len([m for m in machine_list if m['online']]), 'offline_machines': len([m for m in machine_list if not m['online']]), } return { 'pharmacy': pharmacy_data, 'machines': machine_list } except subprocess.CalledProcessError as e: print(f"❌ Headscale CLI 오류: {e}") return None except Exception as e: print(f"❌ 약국 상세 정보 조회 오류: {e}") return None finally: farmq_session.close() # ========================================== # Machine Management # ========================================== def get_all_machines_with_details() -> List[Dict[str, Any]]: """모든 머신 상세 정보 조회 - Headscale Node 데이터 사용""" headscale_session = get_headscale_session() farmq_session = get_farmq_session() try: # Headscale에서 모든 노드 조회 nodes = headscale_session.query(Node).filter( Node.deleted_at.is_(None) ).all() print(f"🔍 Found {len(nodes)} nodes in Headscale database") # Headscale CLI를 통해 실시간 온라인 상태 가져오기 online_status = get_headscale_online_status() result = [] for node in nodes: # 기본 머신 정보 machine_data = { 'id': node.id, 'hostname': node.hostname, 'machine_name': node.hostname, # 표시용 이름 'tailscale_ip': node.ipv4, 'ipv6': node.ipv6, 'headscale_user_name': node.user.name if node.user else '미지정', 'user_id': node.user_id, 'last_seen': node.last_seen, 'created_at': node.created_at, 'updated_at': node.updated_at } # Headscale CLI 기반 온라인 상태 확인 node_name = (node.given_name or node.hostname or '').lower() machine_data['is_online'] = online_status.get(node_name, False) # 사용자 이름으로 약국 정보 찾기 machine_data['pharmacy'] = None if node.user: pharmacy = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.headscale_user_name == node.user.name ).first() if pharmacy: machine_data['pharmacy'] = { 'id': pharmacy.id, 'pharmacy_name': pharmacy.pharmacy_name, 'manager_name': pharmacy.manager_name, 'business_number': pharmacy.business_number } # 마지막 접속 시간을 사람이 읽기 쉬운 형태로 machine_data['last_seen_humanized'] = humanize_datetime(node.last_seen) # 하드웨어 사양 및 모니터링 데이터는 나중에 추가 예정 machine_data['specs'] = None machine_data['latest_monitoring'] = None result.append(machine_data) return result finally: close_session(farmq_session) close_session(headscale_session) def get_machine_detail(machine_id: int) -> Optional[Dict[str, Any]]: """Headscale CLI를 통한 머신 상세 정보 조회""" import subprocess import json from datetime import datetime try: # Headscale CLI에서 노드 목록 가져오기 result = subprocess.run( ['docker', 'exec', 'headscale', 'headscale', 'nodes', 'list', '-o', 'json'], capture_output=True, text=True, check=True ) nodes_data = json.loads(result.stdout) # machine_id로 노드 찾기 (id 기준) machine = None for node in nodes_data: if node.get('id') == machine_id: machine = node break if not machine: return None # 온라인 상태 체크 online_status = get_headscale_online_status() node_name = machine.get('given_name') or machine.get('name', '') is_online = online_status.get(node_name.lower(), False) # 시간 변환 함수 def convert_timestamp(ts_obj): if isinstance(ts_obj, dict) and 'seconds' in ts_obj: return datetime.fromtimestamp(ts_obj['seconds']) elif isinstance(ts_obj, str): try: return datetime.fromisoformat(ts_obj.replace('Z', '+00:00')) except: return None return None # 엔드포인트 추출 endpoints = [] if 'endpoints' in machine: endpoints = machine['endpoints'] # 약국 정보 매핑 (사용자명을 통해) pharmacy_info = None user_name = machine.get('user', {}).get('name', '') if user_name: farmq_session = get_farmq_session() try: pharmacy = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.headscale_user_name == user_name ).first() if pharmacy: pharmacy_info = { 'id': pharmacy.id, 'name': pharmacy.pharmacy_name, 'manager': pharmacy.manager_name, 'address': pharmacy.address } finally: farmq_session.close() # 반환 데이터 구성 machine_data = { 'id': machine.get('id'), 'given_name': machine.get('given_name'), 'hostname': machine.get('name'), 'ipv4': machine.get('ip_addresses', [])[0] if machine.get('ip_addresses') else None, 'ipv6': machine.get('ip_addresses', [])[1] if len(machine.get('ip_addresses', [])) > 1 else None, 'machine_key': machine.get('machine_key'), 'node_key': machine.get('node_key'), 'disco_key': machine.get('disco_key'), 'user': machine.get('user'), 'last_seen': convert_timestamp(machine.get('last_seen')), 'created_at': convert_timestamp(machine.get('created_at')), 'register_method': 'CLI' if machine.get('register_method') == 1 else 'Pre-auth Key', 'online': is_online, 'endpoints': endpoints, 'pharmacy': pharmacy_info, # 헬퍼 메서드 'get_endpoints': lambda: endpoints, } return machine_data except subprocess.CalledProcessError as e: print(f"❌ Headscale CLI 오류: {e}") return None except Exception as e: print(f"❌ 머신 상세 정보 조회 오류: {e}") return None # ========================================== # Headscale Synchronization # ========================================== def sync_machines_from_headscale() -> Dict[str, int]: """Headscale에서 머신 정보 동기화""" farmq_session = get_farmq_session() headscale_session = get_headscale_session() try: # Headscale에서 모든 노드 조회 nodes = headscale_session.query(Node).filter( Node.deleted_at.is_(None) ).all() synced = 0 created = 0 for node in nodes: # FARMQ 데이터베이스에서 해당 머신 찾기 machine = farmq_session.query(MachineProfile).filter( MachineProfile.headscale_node_id == node.id ).first() if machine: # 기존 머신 업데이트 is_online = node.is_online() status = 'online' if is_online else 'offline' machine.hostname = node.hostname machine.tailscale_ip = node.ipv4 machine.tailscale_status = status machine.last_seen = node.last_seen or datetime.now() machine.updated_at = datetime.now() synced += 1 else: # 새 머신 생성 machine = MachineProfile( headscale_node_id=node.id, headscale_machine_key=node.machine_key, hostname=node.hostname or 'unknown', machine_name=node.given_name or node.hostname or 'unknown', tailscale_ip=node.ipv4, tailscale_status='online' if node.is_online() else 'offline', os_type='unknown', status='active', last_seen=node.last_seen or datetime.now() ) farmq_session.add(machine) created += 1 farmq_session.commit() return { 'total_nodes': len(nodes), 'synced': synced, 'created': created } finally: close_session(farmq_session) close_session(headscale_session) def sync_users_from_headscale() -> Dict[str, int]: """Headscale에서 사용자 정보 동기화""" farmq_session = get_farmq_session() headscale_session = get_headscale_session() try: # Headscale에서 모든 사용자 조회 users = headscale_session.query(User).filter( User.deleted_at.is_(None) ).all() synced = 0 created = 0 for user in users: # FARMQ 데이터베이스에서 해당 약국 찾기 pharmacy = farmq_session.query(PharmacyInfo).filter( PharmacyInfo.headscale_user_name == user.name ).first() if pharmacy: # 기존 약국 업데이트 pharmacy.headscale_user_id = user.id # 약국명이 사용자명과 같으면 더 나은 이름으로 업데이트 if pharmacy.pharmacy_name == user.name: if user.display_name and user.display_name != user.name: pharmacy.pharmacy_name = user.display_name else: pharmacy.pharmacy_name = f"{user.name} 약국" # 더 나은 기본 이름 # 기본값들이 None인 경우 업데이트 if not pharmacy.business_number or pharmacy.business_number == "None": pharmacy.business_number = "000-00-00000" if not pharmacy.manager_name or pharmacy.manager_name == "None": pharmacy.manager_name = "관리자" pharmacy.last_sync = datetime.now() pharmacy.updated_at = datetime.now() synced += 1 else: # 새 약국 생성 (기본 정보로) pharmacy_name = user.display_name if user.display_name else f"{user.name} 약국" pharmacy = PharmacyInfo( headscale_user_name=user.name, headscale_user_id=user.id, pharmacy_name=pharmacy_name, business_number="000-00-00000", # 기본 사업자번호 manager_name="관리자", status='active', last_sync=datetime.now() ) farmq_session.add(pharmacy) created += 1 farmq_session.commit() return { 'total_users': len(users), 'synced': synced, 'created': created } finally: close_session(farmq_session) close_session(headscale_session) # ========================================== # Alert Management # ========================================== def get_active_alerts(limit: int = 50) -> List[Dict[str, Any]]: """활성 알림 조회""" farmq_session = get_farmq_session() try: alerts = farmq_session.query(SystemAlert).filter( SystemAlert.status == 'active' ).order_by(desc(SystemAlert.created_at)).limit(limit).all() return [alert.to_dict() for alert in alerts] finally: close_session(farmq_session) def create_alert(machine_profile_id: int, alert_data: Dict[str, Any]) -> SystemAlert: """새로운 알림 생성""" farmq_session = get_farmq_session() try: # 중복 알림 확인 fingerprint = f"{machine_profile_id}_{alert_data.get('alert_type')}_{alert_data.get('current_value')}" existing_alert = farmq_session.query(SystemAlert).filter( SystemAlert.fingerprint == fingerprint, SystemAlert.status == 'active' ).first() if existing_alert: # 기존 알림 업데이트 existing_alert.occurrence_count += 1 existing_alert.last_occurred = datetime.now() existing_alert.updated_at = datetime.now() farmq_session.commit() return existing_alert else: # 새 알림 생성 alert = SystemAlert( machine_profile_id=machine_profile_id, fingerprint=fingerprint, **alert_data ) farmq_session.add(alert) farmq_session.commit() farmq_session.refresh(alert) return alert finally: close_session(farmq_session) # ========================================== # Backward Compatibility Functions # ========================================== def get_pharmacy_count() -> int: """약국 수 조회 (하위 호환성)""" stats = get_dashboard_stats() return stats['total_pharmacies'] def get_online_machines_count() -> int: """온라인 머신 수 조회 (하위 호환성)""" stats = get_dashboard_stats() return stats['online_machines'] def get_offline_machines_count() -> int: """오프라인 머신 수 조회 (하위 호환성)""" stats = get_dashboard_stats() return stats['offline_machines'] def get_average_cpu_temperature() -> float: """평균 CPU 온도 조회 (하위 호환성)""" stats = get_dashboard_stats() return stats['avg_cpu_temp'] def humanize_datetime(dt) -> str: """datetime을 사람이 읽기 쉬운 형태로 변환""" if not dt: return '알 수 없음' try: import humanize # 한국어 설정 시도 try: humanize.i18n.activate('ko_KR') except: pass return humanize.naturaltime(dt) except ImportError: # humanize가 없으면 기본 형식 사용 if isinstance(dt, str): return dt return dt.strftime('%Y-%m-%d %H:%M:%S') def get_machine_with_details(machine_id: int) -> Optional[Dict[str, Any]]: """머신 상세 정보 조회 (하위 호환성)""" return get_machine_detail(machine_id) def get_performance_summary() -> Dict[str, Any]: """성능 요약 조회""" return { 'status': 'good', 'summary': '모든 시스템이 정상 작동 중입니다.' } # ========================================== # 초기화 함수 # ========================================== def init_database(headscale_db_uri: str): """데이터베이스 초기화 (하위 호환성)""" # FARMQ 데이터베이스는 자동으로 생성 farmq_db_uri = "sqlite:///farmq-admin/farmq.sqlite" # 디렉토리 생성 os.makedirs("farmq-admin", exist_ok=True) init_databases(headscale_db_uri, farmq_db_uri) # 초기 동기화 실행 try: print("🔄 Headscale에서 데이터 동기화 중...") user_sync = sync_users_from_headscale() machine_sync = sync_machines_from_headscale() print(f"✅ 사용자 동기화: {user_sync}") print(f"✅ 머신 동기화: {machine_sync}") except Exception as e: print(f"⚠️ 동기화 중 오류 발생: {e}") def get_session(): """FARMQ 세션 가져오기 (하위 호환성)""" return get_farmq_session() def close_session(session=None): """세션 종료 (하위 호환성)""" if session: session.close() # 새로운 구조에서는 각 함수가 자체적으로 세션을 관리하므로 여기서는 아무것도 하지 않음