"""
Headscale SQLite Database Models for SQLAlchemy

Based on actual schema analysis of Headscale v0.23.0
Generated from: /var/lib/headscale/db.sqlite
Schema Analysis Date: 2025-09-09
"""

from datetime import datetime, timedelta, timezone
from typing import Optional, List
import json

from sqlalchemy import (
    Column, Integer, String, DateTime, Boolean, Text,
    ForeignKey, LargeBinary, Index, UniqueConstraint, text
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, Session
from sqlalchemy.types import TypeDecorator, TEXT

Base = declarative_base()


# ==========================================
# Internal time helpers
# ==========================================

def _now_like(reference: datetime) -> datetime:
    """Return the current time with the same timezone-awareness as *reference*.

    Python refuses to compare or subtract a naive and an aware datetime, so
    the "now" value has to match whatever the database layer handed back.

    * Aware reference: a real UTC-aware "now" is returned.  (Stamping the
      local wall-clock time with ``tzinfo=UTC`` would be off by the host's
      UTC offset.)
    * Naive reference: the naive local wall-clock time is returned.
      NOTE(review): this assumes naive values in the database are expressed
      in the host's local time - confirm against how the rows are written.
    """
    if reference.tzinfo is not None:
        return datetime.now(timezone.utc)
    return datetime.now()


def _has_passed(moment: Optional[datetime]) -> bool:
    """Return True when *moment* is set and lies in the past.

    A missing (``None``) moment is treated as "never", so False is returned.
    """
    if moment is None:
        return False
    return _now_like(moment) > moment


class JSONType(TypeDecorator):
    """Custom JSON type for SQLAlchemy that handles JSON serialization.

    Values are stored as TEXT: they are passed through ``json.dumps`` on the
    way into the database and ``json.loads`` on the way out.  ``None`` is
    passed through untouched in both directions.
    """

    impl = TEXT
    # The type carries no per-instance state, so it is safe to take part in
    # SQLAlchemy's compiled-statement cache (avoids the 1.4+ cache warning).
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Serialize a Python value to a JSON string for storage."""
        if value is None:
            return value
        return json.dumps(value)

    def process_result_value(self, value, dialect):
        """Deserialize a stored JSON string back to a Python value.

        Raises:
            json.JSONDecodeError: if the stored text is not valid JSON.
        """
        if value is None:
            return value
        return json.loads(value)


class Migration(Base):
    """Migration tracking table"""

    __tablename__ = 'migrations'

    id = Column(String, primary_key=True)

    def __repr__(self):
        return f"Migration(id={self.id!r})"


class User(Base):
    """Headscale Users table

    Represents individual users/namespaces in the Headscale network.
    Each user can have multiple nodes (machines) associated with them.
    """

    __tablename__ = 'users'
    __table_args__ = (
        Index('idx_users_deleted_at', 'deleted_at'),
        # Partial indexes: the WHERE clause is declared for SQLite (the
        # database this module targets) as well as for PostgreSQL.  Each
        # dialect only reads its own ``<dialect>_where`` option.
        Index(
            'idx_provider_identifier', 'provider_identifier',
            postgresql_where=text("provider_identifier IS NOT NULL"),
            sqlite_where=text("provider_identifier IS NOT NULL"),
        ),
        Index('idx_name_provider_identifier', 'name', 'provider_identifier'),
        Index(
            'idx_name_no_provider_identifier', 'name',
            postgresql_where=text("provider_identifier IS NULL"),
            sqlite_where=text("provider_identifier IS NULL"),
        ),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    created_at = Column(DateTime)
    updated_at = Column(DateTime)
    deleted_at = Column(DateTime)  # Soft delete

    name = Column(String)  # User identifier (e.g., "myuser")
    display_name = Column(String)  # Human-readable display name
    email = Column(String)  # User email address
    provider_identifier = Column(String)  # External auth provider ID
    provider = Column(String)  # Auth provider name (OIDC, etc.)
    profile_pic_url = Column(String)  # Profile picture URL

    # Relationships
    nodes = relationship("Node", back_populates="user", cascade="all, delete-orphan")
    pre_auth_keys = relationship("PreAuthKey", back_populates="user")

    def __repr__(self):
        return (
            f"User(id={self.id!r}, name={self.name!r}, "
            f"display_name={self.display_name!r})"
        )

    def is_deleted(self) -> bool:
        """Check if user is soft-deleted"""
        return self.deleted_at is not None


class Node(Base):
    """Headscale Nodes (Machines) table

    Represents individual devices/machines connected to the Tailnet.
    Each node belongs to a user and has various networking attributes.
    """

    __tablename__ = 'nodes'

    id = Column(Integer, primary_key=True, autoincrement=True)
    machine_key = Column(String)  # Machine's public key
    node_key = Column(String)  # Node's network key
    disco_key = Column(String)  # Discovery key for peer-to-peer connections

    endpoints = Column(JSONType)  # List of network endpoints (JSON array)
    host_info = Column(JSONType)  # Detailed host information (JSON object)

    ipv4 = Column(String)  # Assigned IPv4 address (e.g., "100.64.0.1")
    ipv6 = Column(String)  # Assigned IPv6 address

    hostname = Column(String)  # Machine hostname
    given_name = Column(String)  # User-assigned machine name

    user_id = Column(Integer, ForeignKey('users.id', ondelete='CASCADE'))
    register_method = Column(String)  # Registration method (e.g., "authkey")

    forced_tags = Column(JSONType)  # Tags forced on this node (JSON array)
    auth_key_id = Column(Integer, ForeignKey('pre_auth_keys.id'))

    expiry = Column(DateTime)  # Node expiration date
    last_seen = Column(DateTime)  # Last activity timestamp

    approved_routes = Column(JSONType)  # Approved subnet routes (JSON array)

    created_at = Column(DateTime)
    updated_at = Column(DateTime)
    deleted_at = Column(DateTime)  # Soft delete

    # Relationships
    user = relationship("User", back_populates="nodes")
    auth_key = relationship("PreAuthKey")

    def __repr__(self):
        return (
            f"Node(id={self.id!r}, hostname={self.hostname!r}, "
            f"ipv4={self.ipv4!r}, user_id={self.user_id!r})"
        )

    def is_online(self, timeout_minutes: int = 5) -> bool:
        """Check if node is considered online based on last_seen.

        Args:
            timeout_minutes: How recently (in minutes) the node must have
                been seen to count as online.

        Returns:
            True when ``last_seen`` is set and less than ``timeout_minutes``
            old; False when ``last_seen`` is missing or older than that.
        """
        last_seen = self.last_seen
        if last_seen is None:
            return False
        # The helper matches the awareness of last_seen, so the subtraction
        # below cannot raise the naive/aware TypeError.
        elapsed = _now_like(last_seen) - last_seen
        return elapsed.total_seconds() < timeout_minutes * 60

    def get_host_info(self) -> dict:
        """Get parsed host information (empty dict when unset)"""
        return self.host_info or {}

    def get_endpoints(self) -> List[str]:
        """Get list of network endpoints (empty list when unset)"""
        return self.endpoints or []

    def get_forced_tags(self) -> List[str]:
        """Get list of forced tags (empty list when unset)"""
        return self.forced_tags or []

    def get_approved_routes(self) -> List[str]:
        """Get list of approved routes (empty list when unset)"""
        return self.approved_routes or []


class PreAuthKey(Base):
    """Pre-authentication keys table

    Keys used for automatic node registration without manual approval.
    """

    __tablename__ = 'pre_auth_keys'

    id = Column(Integer, primary_key=True, autoincrement=True)
    key = Column(String)  # The actual pre-auth key string
    user_id = Column(Integer, ForeignKey('users.id', ondelete='SET NULL'))

    reusable = Column(Boolean)  # Can be used multiple times
    ephemeral = Column(Boolean, default=False)  # Temporary key
    used = Column(Boolean, default=False)  # Has been used

    tags = Column(JSONType)  # Tags to apply to nodes using this key

    created_at = Column(DateTime)
    expiration = Column(DateTime)  # When the key expires

    # Relationships
    user = relationship("User", back_populates="pre_auth_keys")

    def __repr__(self):
        # The key string itself is a credential, so it is left out on purpose.
        return (
            f"PreAuthKey(id={self.id!r}, user_id={self.user_id!r}, "
            f"reusable={self.reusable!r}, used={self.used!r})"
        )

    def is_expired(self) -> bool:
        """Check if the pre-auth key is expired.

        A key without an expiration never expires.
        """
        return _has_passed(self.expiration)

    def is_valid(self) -> bool:
        """Check if the key is still valid for use.

        A key is invalid once it has expired, or once it has been used and
        is not marked reusable.
        """
        if self.is_expired():
            return False
        if self.used and not self.reusable:
            return False
        return True

    def get_tags(self) -> List[str]:
        """Get list of tags for this key (empty list when unset)"""
        return self.tags or []


class ApiKey(Base):
    """API Keys table

    Keys used for API authentication to the Headscale server.
    """

    __tablename__ = 'api_keys'
    __table_args__ = (
        Index('idx_api_keys_prefix', 'prefix', unique=True),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    prefix = Column(String)  # Key prefix for identification (e.g., "8qRr1IB")
    hash = Column(LargeBinary)  # Hashed key value

    created_at = Column(DateTime)
    expiration = Column(DateTime)  # When the key expires
    last_seen = Column(DateTime)  # Last time key was used

    def __repr__(self):
        return (
            f"ApiKey(id={self.id!r}, prefix={self.prefix!r}, "
            f"expiration={self.expiration!r})"
        )

    def is_expired(self) -> bool:
        """Check if the API key is expired.

        A key without an expiration never expires.
        """
        return _has_passed(self.expiration)


class Policy(Base):
    """ACL Policies table

    Stores Access Control List policies in JSON format.
    """

    __tablename__ = 'policies'
    __table_args__ = (
        Index('idx_policies_deleted_at', 'deleted_at'),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    created_at = Column(DateTime)
    updated_at = Column(DateTime)
    deleted_at = Column(DateTime)  # Soft delete

    data = Column(Text)  # JSON policy data

    def __repr__(self):
        return f"Policy(id={self.id!r}, updated_at={self.updated_at!r})"

    def get_policy_data(self) -> dict:
        """Parse and return policy data as dictionary.

        Returns an empty dict when no data is stored or when the stored text
        is not valid JSON.
        """
        if not self.data:
            return {}
        try:
            return json.loads(self.data)
        except json.JSONDecodeError:
            return {}


# ==========================================
# Extended Models for FARMQ Customization
# ==========================================

class PharmacyInfo(Base):
    """Extended table for pharmacy information

    This extends the base Headscale functionality to store
    pharmacy-specific information for FARMQ management.
    """

    __tablename__ = 'pharmacy_info'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Link to users.name (note: a string key, not users.id)
    user_id = Column(String, ForeignKey('users.name'), unique=True)

    pharmacy_name = Column(String, nullable=False)  # Pharmacy name
    business_number = Column(String(20))  # Business registration number
    address = Column(Text)  # Address
    phone = Column(String(20))  # Phone number
    manager_name = Column(String(100))  # Name of the person in charge

    proxmox_host = Column(String(255))  # Proxmox host IP
    # Proxmox API token.  NOTE(review): stored as plain text - confirm that
    # this is acceptable for the deployment.
    proxmox_api_token = Column(Text)

    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def __repr__(self):
        # The Proxmox API token is a credential, so it is left out on purpose.
        return (
            f"PharmacyInfo(id={self.id!r}, "
            f"pharmacy_name={self.pharmacy_name!r}, user_id={self.user_id!r})"
        )


class MachineSpecs(Base):
    """Extended table for machine specifications

    Stores detailed hardware specifications for each machine/node.
    """

    __tablename__ = 'machine_specs'

    id = Column(Integer, primary_key=True, autoincrement=True)
    machine_id = Column(Integer, ForeignKey('nodes.id'), nullable=False)
    pharmacy_id = Column(Integer, ForeignKey('pharmacy_info.id'))

    cpu_model = Column(String(255))  # CPU model name
    cpu_cores = Column(Integer)  # Number of CPU cores
    ram_gb = Column(Integer)  # RAM capacity (GB)
    storage_gb = Column(Integer)  # Storage capacity (GB)
    gpu_model = Column(String(255))  # GPU model name

    last_updated = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Relationships
    machine = relationship("Node")
    pharmacy = relationship("PharmacyInfo")

    def __repr__(self):
        return (
            f"MachineSpecs(id={self.id!r}, machine_id={self.machine_id!r}, "
            f"cpu_model={self.cpu_model!r}, ram_gb={self.ram_gb!r})"
        )


class MonitoringData(Base):
    """Real-time monitoring data table

    Stores time-series monitoring data collected from Proxmox hosts.
    """

    __tablename__ = 'monitoring_data'

    id = Column(Integer, primary_key=True, autoincrement=True)
    machine_id = Column(Integer, ForeignKey('nodes.id'), nullable=False)

    # Usage figures are stored as strings, not numbers.
    # NOTE(review): String(5) fits "75.50" but not "100.00" on databases
    # that enforce the declared length - confirm the intended format.
    cpu_usage = Column(String(5))  # CPU usage (e.g., "75.50")
    memory_usage = Column(String(5))  # Memory usage
    disk_usage = Column(String(5))  # Disk usage
    cpu_temperature = Column(Integer)  # CPU temperature (Celsius)

    network_rx_bytes = Column(Integer)  # Network bytes received
    network_tx_bytes = Column(Integer)  # Network bytes sent

    vm_count = Column(Integer)  # Total number of VMs
    vm_running = Column(Integer)  # Number of running VMs

    collected_at = Column(DateTime, default=datetime.now)

    # Relationships
    machine = relationship("Node")

    def __repr__(self):
        return (
            f"MonitoringData(id={self.id!r}, machine_id={self.machine_id!r}, "
            f"cpu_usage={self.cpu_usage!r}, collected_at={self.collected_at!r})"
        )

    def get_cpu_usage_float(self) -> float:
        """Return the CPU usage as a float.

        Returns 0.0 when the value is missing or is not a parseable number.
        """
        if not self.cpu_usage:
            return 0.0
        try:
            return float(self.cpu_usage)
        except ValueError:
            return 0.0


# ==========================================
# Database Helper Functions
# ==========================================

def create_all_tables(engine):
    """Create all tables declared on ``Base`` in the database bound to *engine*"""
    Base.metadata.create_all(engine)


def get_active_nodes(session: Session) -> List[Node]:
    """Get all non-deleted nodes (rows whose ``deleted_at`` is NULL)"""
    return session.query(Node).filter(Node.deleted_at.is_(None)).all()


def get_online_nodes(session: Session, timeout_minutes: int = 5) -> List[Node]:
    """Get nodes that are currently online.

    A node counts as online when it is not soft-deleted and its
    ``last_seen`` is newer than ``timeout_minutes`` ago.

    NOTE(review): the cutoff is computed from the naive local wall-clock
    time; confirm that this matches how ``last_seen`` is stored.
    """
    cutoff_time = datetime.now() - timedelta(minutes=timeout_minutes)
    return session.query(Node).filter(
        Node.deleted_at.is_(None),
        Node.last_seen > cutoff_time
    ).all()


def get_user_with_pharmacy_info(session: Session, user_name: str):
    """Get the user named *user_name* if it has associated pharmacy information.

    The query inner-joins ``PharmacyInfo``, so a user without a pharmacy row
    yields ``None``.  Only the ``User`` entity is returned; the pharmacy row
    itself is not part of the result.
    """
    return session.query(User).join(PharmacyInfo).filter(User.name == user_name).first()


def get_node_with_specs_and_monitoring(session: Session, node_id: int):
    """Get the node with id *node_id*, outer-joined to its specs and monitoring rows.

    Only the ``Node`` entity is returned (or ``None`` when no node matches);
    the joined ``MachineSpecs`` / ``MonitoringData`` rows are not part of
    the result.
    """
    return session.query(Node)\
        .outerjoin(MachineSpecs)\
        .outerjoin(MonitoringData)\
        .filter(Node.id == node_id)\
        .first()


# ==========================================
# Usage Example
# ==========================================

if __name__ == "__main__":
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    # SQLite connection to Headscale database
    DATABASE_URL = "sqlite:///data/db.sqlite"

    engine = create_engine(DATABASE_URL)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

    # Create extended tables (if needed)
    create_all_tables(engine)

    # Example usage
    session = SessionLocal()

    try:
        # Get all users
        users = session.query(User).all()
        print("=== Users ===")
        for user in users:
            print(f" {user}")

        # Get all nodes
        nodes = session.query(Node).all()
        print("\n=== Nodes ===")
        for node in nodes:
            print(f" {node}")
            print(f" Online: {node.is_online()}")
            print(f" Host Info: {node.get_host_info().get('Hostname', 'Unknown')}")

        # Get all API keys
        api_keys = session.query(ApiKey).all()
        print("\n=== API Keys ===")
        for key in api_keys:
            print(f" {key}")
            print(f" Expired: {key.is_expired()}")

    finally:
        session.close()