headscale-tailscale-replace.../farmq-admin/utils/database_new.py
시골약사 11f6ff16d0 Implement real-time online status synchronization with Headplane
- Add Headscale CLI integration to get real-time online status
- Replace timeout-based logic with exact same logic as Headplane
- Use 'online' field from Headscale CLI JSON output
- Update dashboard statistics to show 3 online nodes matching Headplane
- Update pharmacy and machine management views with real-time status

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-11 10:38:14 +09:00

644 lines
23 KiB
Python

"""
새로운 데이터베이스 유틸리티 - Headscale과 분리된 FARMQ 전용
외래키 제약조건 없이 능동적으로 데이터를 관리
"""
import os
import json
import subprocess
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text, and_, or_, desc
from sqlalchemy.orm import sessionmaker, Session
from models.farmq_models import (
PharmacyInfo, MachineProfile, MonitoringMetrics, SystemAlert,
FarmqDatabaseManager, create_farmq_database_manager,
FarmqBase
)
from models.headscale_models import User, Node, PreAuthKey, ApiKey
# 전역 데이터베이스 매니저들
farmq_manager: Optional[FarmqDatabaseManager] = None
headscale_engine = None
headscale_session_maker = None
def init_databases(headscale_db_uri: str, farmq_db_uri: str = None):
"""두 개의 데이터베이스 초기화"""
global farmq_manager, headscale_engine, headscale_session_maker
# FARMQ 전용 데이터베이스 (외래키 제약조건 없음)
if farmq_db_uri is None:
farmq_db_uri = "sqlite:///farmq-admin/farmq.sqlite"
farmq_manager = create_farmq_database_manager(farmq_db_uri)
print(f"✅ FARMQ 데이터베이스 초기화: {farmq_db_uri}")
# Headscale 읽기 전용 데이터베이스
headscale_engine = create_engine(headscale_db_uri, echo=False)
headscale_session_maker = sessionmaker(bind=headscale_engine)
print(f"✅ Headscale 데이터베이스 연결: {headscale_db_uri}")
def get_farmq_session() -> Session:
"""FARMQ 데이터베이스 세션 가져오기"""
if farmq_manager is None:
raise RuntimeError("FARMQ database not initialized")
return farmq_manager.get_session()
def get_headscale_session() -> Session:
"""Headscale 데이터베이스 세션 가져오기 (읽기 전용)"""
if headscale_session_maker is None:
raise RuntimeError("Headscale database not initialized")
return headscale_session_maker()
def close_session(session: Session):
"""세션 종료"""
if session:
session.close()
# ==========================================
# Headscale CLI Integration
# ==========================================
def get_headscale_online_status() -> Dict[str, bool]:
"""Headscale CLI를 통해 실시간 온라인 상태 조회"""
try:
# Docker를 통해 Headscale CLI 실행
result = subprocess.run(
['docker', 'exec', 'headscale', 'headscale', 'nodes', 'list', '-o', 'json'],
capture_output=True,
text=True,
check=True
)
nodes_data = json.loads(result.stdout)
online_status = {}
for node in nodes_data:
# given_name 또는 name 사용
node_name = node.get('given_name') or node.get('name', '')
# online 필드가 True인 경우만 온라인
# 필드가 없거나 False면 오프라인
is_online = node.get('online', False) == True
online_status[node_name.lower()] = is_online
return online_status
except Exception as e:
print(f"❌ Headscale CLI 호출 실패: {e}")
return {}
# ==========================================
# Dashboard Statistics
# ==========================================
def get_dashboard_stats() -> Dict[str, Any]:
"""대시보드 통계 조회 - 실시간 DB 쿼리"""
farmq_session = get_farmq_session()
headscale_session = get_headscale_session()
try:
# 약국 수
total_pharmacies = farmq_session.query(PharmacyInfo).filter(
PharmacyInfo.status == 'active'
).count()
# 머신 상태 - Headscale 노드에서 직접 실시간 조회
from models.headscale_models import Node
# 활성 노드만 조회 (deleted_at이 null인 것)
active_nodes = headscale_session.query(Node).filter(
Node.deleted_at.is_(None)
).all()
total_machines = len(active_nodes)
# Headscale CLI를 통해 실시간 온라인 상태 가져오기
online_status = get_headscale_online_status()
# 온라인 머신 수 계산
online_machines = 0
print(f"🔍 Headscale CLI 온라인 상태:")
for node in active_nodes:
node_name = (node.given_name or node.hostname or '').lower()
is_online = online_status.get(node_name, False)
if is_online:
online_machines += 1
print(f" {node.given_name:15s} | {'🟢 ONLINE' if is_online else '🔴 OFFLINE'}")
offline_machines = total_machines - online_machines
print(f"📊 최종 카운트: {online_machines}/{total_machines} 온라인 (Headplane과 동일)")
# 최근 알림 수
recent_alerts = farmq_session.query(SystemAlert).filter(
SystemAlert.status == 'active',
SystemAlert.created_at > (datetime.now() - timedelta(hours=24))
).count()
# 평균 CPU 온도 (최근 1시간)
cutoff_time = datetime.now() - timedelta(hours=1)
avg_temp_result = farmq_session.query(
MonitoringMetrics.cpu_temperature
).filter(
MonitoringMetrics.collected_at > cutoff_time,
MonitoringMetrics.cpu_temperature.isnot(None)
).all()
avg_cpu_temp = 0
if avg_temp_result:
temps = [temp[0] for temp in avg_temp_result if temp[0] is not None]
avg_cpu_temp = sum(temps) / len(temps) if temps else 0
return {
'total_pharmacies': total_pharmacies,
'total_machines': total_machines,
'online_machines': online_machines,
'offline_machines': offline_machines,
'recent_alerts': recent_alerts,
'avg_cpu_temp': round(avg_cpu_temp, 1)
}
finally:
close_session(farmq_session)
close_session(headscale_session)
# ==========================================
# Pharmacy Management
# ==========================================
def get_all_pharmacies_with_stats() -> List[Dict[str, Any]]:
"""모든 약국과 통계 정보 조회 - Headscale Node 데이터 사용"""
farmq_session = get_farmq_session()
headscale_session = get_headscale_session()
try:
pharmacies = farmq_session.query(PharmacyInfo).filter(
PharmacyInfo.status == 'active'
).all()
# Headscale CLI를 통해 실시간 온라인 상태 가져오기
online_status = get_headscale_online_status()
result = []
for pharmacy in pharmacies:
# Headscale에서 해당 사용자의 머신 수 조회
user_machines = headscale_session.query(Node).join(User).filter(
User.name == pharmacy.headscale_user_name,
Node.deleted_at.is_(None)
).all()
machine_count = len(user_machines)
# 온라인 머신 수 계산 - Headscale CLI 기반
online_count = 0
for machine in user_machines:
node_name = (machine.given_name or machine.hostname or '').lower()
if online_status.get(node_name, False):
online_count += 1
# 활성 알림 수 (현재는 0으로 설정, 나중에 구현)
alert_count = 0
pharmacy_data = pharmacy.to_dict()
pharmacy_data.update({
'machine_count': machine_count,
'online_count': online_count,
'offline_count': machine_count - online_count,
'alert_count': alert_count
})
result.append(pharmacy_data)
return result
finally:
close_session(farmq_session)
close_session(headscale_session)
def get_pharmacy_detail(pharmacy_id: int) -> Optional[Dict[str, Any]]:
"""약국 상세 정보 조회"""
farmq_session = get_farmq_session()
try:
pharmacy = farmq_session.query(PharmacyInfo).filter(
PharmacyInfo.id == pharmacy_id
).first()
if not pharmacy:
return None
# 약국의 머신들 조회
machines = farmq_session.query(MachineProfile).filter(
MachineProfile.pharmacy_id == pharmacy_id,
MachineProfile.status == 'active'
).all()
machine_list = []
for machine in machines:
machine_data = machine.to_dict()
# 최근 모니터링 데이터
latest_metrics = farmq_session.query(MonitoringMetrics).filter(
MonitoringMetrics.machine_profile_id == machine.id
).order_by(desc(MonitoringMetrics.collected_at)).first()
if latest_metrics:
machine_data['latest_metrics'] = latest_metrics.to_dict()
machine_list.append(machine_data)
return {
'pharmacy': pharmacy.to_dict(),
'machines': machine_list
}
finally:
close_session(farmq_session)
# ==========================================
# Machine Management
# ==========================================
def get_all_machines_with_details() -> List[Dict[str, Any]]:
"""모든 머신 상세 정보 조회 - Headscale Node 데이터 사용"""
headscale_session = get_headscale_session()
farmq_session = get_farmq_session()
try:
# Headscale에서 모든 노드 조회
nodes = headscale_session.query(Node).filter(
Node.deleted_at.is_(None)
).all()
print(f"🔍 Found {len(nodes)} nodes in Headscale database")
# Headscale CLI를 통해 실시간 온라인 상태 가져오기
online_status = get_headscale_online_status()
result = []
for node in nodes:
# 기본 머신 정보
machine_data = {
'id': node.id,
'hostname': node.hostname,
'machine_name': node.hostname, # 표시용 이름
'tailscale_ip': node.ipv4,
'ipv6': node.ipv6,
'headscale_user_name': node.user.name if node.user else '미지정',
'user_id': node.user_id,
'last_seen': node.last_seen,
'created_at': node.created_at,
'updated_at': node.updated_at
}
# Headscale CLI 기반 온라인 상태 확인
node_name = (node.given_name or node.hostname or '').lower()
machine_data['is_online'] = online_status.get(node_name, False)
# 사용자 이름으로 약국 정보 찾기
machine_data['pharmacy'] = None
if node.user:
pharmacy = farmq_session.query(PharmacyInfo).filter(
PharmacyInfo.headscale_user_name == node.user.name
).first()
if pharmacy:
machine_data['pharmacy'] = {
'id': pharmacy.id,
'pharmacy_name': pharmacy.pharmacy_name,
'manager_name': pharmacy.manager_name,
'business_number': pharmacy.business_number
}
# 마지막 접속 시간을 사람이 읽기 쉬운 형태로
machine_data['last_seen_humanized'] = humanize_datetime(node.last_seen)
# 하드웨어 사양 및 모니터링 데이터는 나중에 추가 예정
machine_data['specs'] = None
machine_data['latest_monitoring'] = None
result.append(machine_data)
return result
finally:
close_session(farmq_session)
close_session(headscale_session)
def get_machine_detail(machine_id: int) -> Optional[Dict[str, Any]]:
"""머신 상세 정보 조회"""
farmq_session = get_farmq_session()
try:
machine = farmq_session.query(MachineProfile).filter(
MachineProfile.id == machine_id
).first()
if not machine:
return None
machine_data = machine.to_dict()
# 약국 정보
if machine.pharmacy_id:
pharmacy = farmq_session.query(PharmacyInfo).filter(
PharmacyInfo.id == machine.pharmacy_id
).first()
if pharmacy:
machine_data['pharmacy'] = pharmacy.to_dict()
# 최근 모니터링 데이터 (24시간)
cutoff_time = datetime.now() - timedelta(hours=24)
metrics = farmq_session.query(MonitoringMetrics).filter(
MonitoringMetrics.machine_profile_id == machine_id,
MonitoringMetrics.collected_at > cutoff_time
).order_by(desc(MonitoringMetrics.collected_at)).limit(100).all()
machine_data['metrics_history'] = [metric.to_dict() for metric in metrics]
# 최신 메트릭스
if metrics:
latest = metrics[0]
machine_data['latest_metrics'] = latest.to_dict()
machine_data['alerts'] = latest.get_alert_status()
# 활성 알림들
active_alerts = farmq_session.query(SystemAlert).filter(
SystemAlert.machine_profile_id == machine_id,
SystemAlert.status == 'active'
).order_by(desc(SystemAlert.created_at)).limit(10).all()
machine_data['active_alerts'] = [alert.to_dict() for alert in active_alerts]
return machine_data
finally:
close_session(farmq_session)
# ==========================================
# Headscale Synchronization
# ==========================================
def sync_machines_from_headscale() -> Dict[str, int]:
"""Headscale에서 머신 정보 동기화"""
farmq_session = get_farmq_session()
headscale_session = get_headscale_session()
try:
# Headscale에서 모든 노드 조회
nodes = headscale_session.query(Node).filter(
Node.deleted_at.is_(None)
).all()
synced = 0
created = 0
for node in nodes:
# FARMQ 데이터베이스에서 해당 머신 찾기
machine = farmq_session.query(MachineProfile).filter(
MachineProfile.headscale_node_id == node.id
).first()
if machine:
# 기존 머신 업데이트
is_online = node.is_online()
status = 'online' if is_online else 'offline'
machine.hostname = node.hostname
machine.tailscale_ip = node.ipv4
machine.tailscale_status = status
machine.last_seen = node.last_seen or datetime.now()
machine.updated_at = datetime.now()
synced += 1
else:
# 새 머신 생성
machine = MachineProfile(
headscale_node_id=node.id,
headscale_machine_key=node.machine_key,
hostname=node.hostname or 'unknown',
machine_name=node.given_name or node.hostname or 'unknown',
tailscale_ip=node.ipv4,
tailscale_status='online' if node.is_online() else 'offline',
os_type='unknown',
status='active',
last_seen=node.last_seen or datetime.now()
)
farmq_session.add(machine)
created += 1
farmq_session.commit()
return {
'total_nodes': len(nodes),
'synced': synced,
'created': created
}
finally:
close_session(farmq_session)
close_session(headscale_session)
def sync_users_from_headscale() -> Dict[str, int]:
"""Headscale에서 사용자 정보 동기화"""
farmq_session = get_farmq_session()
headscale_session = get_headscale_session()
try:
# Headscale에서 모든 사용자 조회
users = headscale_session.query(User).filter(
User.deleted_at.is_(None)
).all()
synced = 0
created = 0
for user in users:
# FARMQ 데이터베이스에서 해당 약국 찾기
pharmacy = farmq_session.query(PharmacyInfo).filter(
PharmacyInfo.headscale_user_name == user.name
).first()
if pharmacy:
# 기존 약국 업데이트
pharmacy.headscale_user_id = user.id
# 약국명이 사용자명과 같으면 더 나은 이름으로 업데이트
if pharmacy.pharmacy_name == user.name:
if user.display_name and user.display_name != user.name:
pharmacy.pharmacy_name = user.display_name
else:
pharmacy.pharmacy_name = f"{user.name} 약국" # 더 나은 기본 이름
# 기본값들이 None인 경우 업데이트
if not pharmacy.business_number or pharmacy.business_number == "None":
pharmacy.business_number = "000-00-00000"
if not pharmacy.manager_name or pharmacy.manager_name == "None":
pharmacy.manager_name = "관리자"
pharmacy.last_sync = datetime.now()
pharmacy.updated_at = datetime.now()
synced += 1
else:
# 새 약국 생성 (기본 정보로)
pharmacy_name = user.display_name if user.display_name else f"{user.name} 약국"
pharmacy = PharmacyInfo(
headscale_user_name=user.name,
headscale_user_id=user.id,
pharmacy_name=pharmacy_name,
business_number="000-00-00000", # 기본 사업자번호
manager_name="관리자",
status='active',
last_sync=datetime.now()
)
farmq_session.add(pharmacy)
created += 1
farmq_session.commit()
return {
'total_users': len(users),
'synced': synced,
'created': created
}
finally:
close_session(farmq_session)
close_session(headscale_session)
# ==========================================
# Alert Management
# ==========================================
def get_active_alerts(limit: int = 50) -> List[Dict[str, Any]]:
"""활성 알림 조회"""
farmq_session = get_farmq_session()
try:
alerts = farmq_session.query(SystemAlert).filter(
SystemAlert.status == 'active'
).order_by(desc(SystemAlert.created_at)).limit(limit).all()
return [alert.to_dict() for alert in alerts]
finally:
close_session(farmq_session)
def create_alert(machine_profile_id: int, alert_data: Dict[str, Any]) -> SystemAlert:
"""새로운 알림 생성"""
farmq_session = get_farmq_session()
try:
# 중복 알림 확인
fingerprint = f"{machine_profile_id}_{alert_data.get('alert_type')}_{alert_data.get('current_value')}"
existing_alert = farmq_session.query(SystemAlert).filter(
SystemAlert.fingerprint == fingerprint,
SystemAlert.status == 'active'
).first()
if existing_alert:
# 기존 알림 업데이트
existing_alert.occurrence_count += 1
existing_alert.last_occurred = datetime.now()
existing_alert.updated_at = datetime.now()
farmq_session.commit()
return existing_alert
else:
# 새 알림 생성
alert = SystemAlert(
machine_profile_id=machine_profile_id,
fingerprint=fingerprint,
**alert_data
)
farmq_session.add(alert)
farmq_session.commit()
farmq_session.refresh(alert)
return alert
finally:
close_session(farmq_session)
# ==========================================
# Backward Compatibility Functions
# ==========================================
def get_pharmacy_count() -> int:
"""약국 수 조회 (하위 호환성)"""
stats = get_dashboard_stats()
return stats['total_pharmacies']
def get_online_machines_count() -> int:
"""온라인 머신 수 조회 (하위 호환성)"""
stats = get_dashboard_stats()
return stats['online_machines']
def get_offline_machines_count() -> int:
"""오프라인 머신 수 조회 (하위 호환성)"""
stats = get_dashboard_stats()
return stats['offline_machines']
def get_average_cpu_temperature() -> float:
"""평균 CPU 온도 조회 (하위 호환성)"""
stats = get_dashboard_stats()
return stats['avg_cpu_temp']
def humanize_datetime(dt) -> str:
"""datetime을 사람이 읽기 쉬운 형태로 변환"""
if not dt:
return '알 수 없음'
try:
import humanize
# 한국어 설정 시도
try:
humanize.i18n.activate('ko_KR')
except:
pass
return humanize.naturaltime(dt)
except ImportError:
# humanize가 없으면 기본 형식 사용
if isinstance(dt, str):
return dt
return dt.strftime('%Y-%m-%d %H:%M:%S')
def get_machine_with_details(machine_id: int) -> Optional[Dict[str, Any]]:
"""머신 상세 정보 조회 (하위 호환성)"""
return get_machine_detail(machine_id)
def get_performance_summary() -> Dict[str, Any]:
"""성능 요약 조회"""
return {
'status': 'good',
'summary': '모든 시스템이 정상 작동 중입니다.'
}
# ==========================================
# 초기화 함수
# ==========================================
def init_database(headscale_db_uri: str):
"""데이터베이스 초기화 (하위 호환성)"""
# FARMQ 데이터베이스는 자동으로 생성
farmq_db_uri = "sqlite:///farmq-admin/farmq.sqlite"
# 디렉토리 생성
os.makedirs("farmq-admin", exist_ok=True)
init_databases(headscale_db_uri, farmq_db_uri)
# 초기 동기화 실행
try:
print("🔄 Headscale에서 데이터 동기화 중...")
user_sync = sync_users_from_headscale()
machine_sync = sync_machines_from_headscale()
print(f"✅ 사용자 동기화: {user_sync}")
print(f"✅ 머신 동기화: {machine_sync}")
except Exception as e:
print(f"⚠️ 동기화 중 오류 발생: {e}")
def get_session():
"""FARMQ 세션 가져오기 (하위 호환성)"""
return get_farmq_session()
def close_session(session=None):
"""세션 종료 (하위 호환성)"""
if session:
session.close()
# 새로운 구조에서는 각 함수가 자체적으로 세션을 관리하므로 여기서는 아무것도 하지 않음