""" 데이터베이스 연결 및 유틸리티 함수 """ from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from models import Base, User, Node, PharmacyInfo, MachineSpecs, MonitoringData from datetime import datetime, timedelta import humanize from typing import List, Optional # 글로벌 세션 관리 db_session = scoped_session(sessionmaker()) def init_database(database_url: str): """데이터베이스 초기화""" engine = create_engine(database_url, echo=False) db_session.configure(bind=engine) Base.metadata.bind = engine # 확장 테이블 생성 (기존 테이블은 건드리지 않음) try: Base.metadata.create_all(engine) print("✅ Database initialized successfully") except Exception as e: print(f"❌ Database initialization failed: {e}") return engine def get_session(): """데이터베이스 세션 반환""" return db_session def close_session(): """데이터베이스 세션 종료""" db_session.remove() # 약국 관련 유틸리티 함수 def get_pharmacy_count() -> int: """총 약국 수 반환""" session = get_session() return session.query(PharmacyInfo).count() def get_pharmacy_with_stats(pharmacy_id: int) -> Optional[dict]: """약국 정보와 통계 반환""" session = get_session() pharmacy = session.query(PharmacyInfo).filter_by(id=pharmacy_id).first() if not pharmacy: return None # 연결된 머신 수 machine_count = session.query(Node).join(User).filter(User.name == pharmacy.user_id).count() # 온라인 머신 수 online_count = session.query(Node).join(User).filter( User.name == pharmacy.user_id, Node.last_seen > datetime.now() - timedelta(minutes=5) ).count() return { 'pharmacy': pharmacy, 'machine_count': machine_count, 'online_count': online_count, 'offline_count': machine_count - online_count } def get_all_pharmacies_with_stats() -> List[dict]: """모든 약국 정보와 통계 반환""" session = get_session() pharmacies = session.query(PharmacyInfo).all() result = [] for pharmacy in pharmacies: stats = get_pharmacy_with_stats(pharmacy.id) if stats: result.append(stats) return result # 머신 관련 유틸리티 함수 def get_online_machines_count() -> int: """온라인 머신 수 반환""" session = get_session() cutoff_time = datetime.now() - timedelta(minutes=5) return session.query(Node).filter(Node.last_seen > cutoff_time).count() def get_offline_machines_count() -> int: """오프라인 머신 수 반환""" session = get_session() total_machines = session.query(Node).count() online_machines = get_online_machines_count() return total_machines - online_machines def get_machine_with_details(machine_id: int) -> Optional[dict]: """머신 상세 정보 반환 (하드웨어 사양, 모니터링 데이터 포함)""" session = get_session() try: machine = session.query(Node).filter_by(id=machine_id).first() if not machine: return None # 하드웨어 사양 specs = session.query(MachineSpecs).filter_by(machine_id=machine_id).first() # 최신 모니터링 데이터 latest_monitoring = session.query(MonitoringData).filter_by( machine_id=machine_id ).order_by(MonitoringData.collected_at.desc()).first() # 약국 정보 (specs가 있고 pharmacy_id가 있는 경우) pharmacy = None if specs and hasattr(specs, 'pharmacy_id') and specs.pharmacy_id: pharmacy = session.query(PharmacyInfo).filter_by(id=specs.pharmacy_id).first() # is_online 상태 확인 try: is_online = machine.is_online() if hasattr(machine, 'is_online') else False except: # last_seen이 최근 5분 이내인지 확인 if machine.last_seen: from datetime import datetime, timedelta is_online = machine.last_seen > (datetime.now() - timedelta(minutes=5)) else: is_online = False result = { 'machine': machine, 'specs': specs, 'latest_monitoring': latest_monitoring, 'pharmacy': pharmacy, 'is_online': is_online, 'last_seen_humanized': humanize_datetime(machine.last_seen) } return result except Exception as e: print(f"❌ Error in get_machine_with_details: {e}") import traceback traceback.print_exc() return None # 모니터링 관련 유틸리티 함수 def get_average_cpu_temperature() -> float: """평균 CPU 온도 반환""" session = get_session() # 최근 5분 내 데이터만 사용 cutoff_time = datetime.now() - timedelta(minutes=5) result = session.query(MonitoringData).filter( MonitoringData.collected_at > cutoff_time, MonitoringData.cpu_temperature.isnot(None) ).all() if not result: return 0.0 temperatures = [r.cpu_temperature for r in result if r.cpu_temperature] return sum(temperatures) / len(temperatures) if temperatures else 0.0 def get_active_alerts() -> List[dict]: """활성 알림 목록 반환""" session = get_session() alerts = [] # CPU 온도 경고 (80도 이상) high_temp_machines = session.query(MonitoringData, Node).join(Node).filter( MonitoringData.cpu_temperature > 80, MonitoringData.collected_at > datetime.now() - timedelta(minutes=5) ).all() for monitoring, machine in high_temp_machines: alerts.append({ 'type': 'warning', 'level': 'high_temperature', 'machine': machine, 'message': f'{machine.hostname}: CPU 온도 {monitoring.cpu_temperature}°C', 'value': monitoring.cpu_temperature }) # 디스크 사용률 경고 (90% 이상) high_disk_machines = session.query(MonitoringData, Node).join(Node).filter( MonitoringData.disk_usage > 90, MonitoringData.collected_at > datetime.now() - timedelta(minutes=5) ).all() for monitoring, machine in high_disk_machines: alerts.append({ 'type': 'danger', 'level': 'high_disk', 'machine': machine, 'message': f'{machine.hostname}: 디스크 사용률 {monitoring.disk_usage}%', 'value': float(monitoring.disk_usage) }) return alerts # 유틸리티 헬퍼 함수 def humanize_datetime(dt) -> str: """datetime을 사람이 읽기 쉬운 형태로 변환""" if not dt: return '알 수 없음' try: # 한국어 설정 humanize.i18n.activate('ko_KR') return humanize.naturaltime(dt) except: # 한국어 로케일이 없으면 영어로 fallback return humanize.naturaltime(dt) def get_performance_summary() -> dict: """전체 성능 요약 반환""" session = get_session() cutoff_time = datetime.now() - timedelta(minutes=5) recent_data = session.query(MonitoringData).filter( MonitoringData.collected_at > cutoff_time ).all() if not recent_data: return { 'avg_cpu': 0, 'avg_memory': 0, 'avg_disk': 0, 'avg_temperature': 0 } cpu_values = [float(d.cpu_usage) for d in recent_data if d.cpu_usage] memory_values = [float(d.memory_usage) for d in recent_data if d.memory_usage] disk_values = [float(d.disk_usage) for d in recent_data if d.disk_usage] temp_values = [d.cpu_temperature for d in recent_data if d.cpu_temperature] return { 'avg_cpu': sum(cpu_values) / len(cpu_values) if cpu_values else 0, 'avg_memory': sum(memory_values) / len(memory_values) if memory_values else 0, 'avg_disk': sum(disk_values) / len(disk_values) if disk_values else 0, 'avg_temperature': sum(temp_values) / len(temp_values) if temp_values else 0 }