""" Headscale SQLite Database Models for SQLAlchemy Based on actual schema analysis of Headscale v0.23.0 Generated from: /var/lib/headscale/db.sqlite Schema Analysis Date: 2025-09-09 """ from datetime import datetime, timedelta from typing import Optional, List import json from sqlalchemy import ( Column, Integer, String, DateTime, Boolean, Text, ForeignKey, LargeBinary, Index, UniqueConstraint ) from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship, Session from sqlalchemy.types import TypeDecorator, TEXT Base = declarative_base() class JSONType(TypeDecorator): """Custom JSON type for SQLAlchemy that handles JSON serialization""" impl = TEXT def process_bind_param(self, value, dialect): if value is not None: return json.dumps(value) return value def process_result_value(self, value, dialect): if value is not None: return json.loads(value) return value class Migration(Base): """Migration tracking table""" __tablename__ = 'migrations' id = Column(String, primary_key=True) def __repr__(self): return f"" class User(Base): """Headscale Users table Represents individual users/namespaces in the Headscale network. Each user can have multiple nodes (machines) associated with them. """ __tablename__ = 'users' __table_args__ = ( Index('idx_users_deleted_at', 'deleted_at'), Index('idx_provider_identifier', 'provider_identifier', postgresql_where="provider_identifier IS NOT NULL"), Index('idx_name_provider_identifier', 'name', 'provider_identifier'), Index('idx_name_no_provider_identifier', 'name', postgresql_where="provider_identifier IS NULL"), ) id = Column(Integer, primary_key=True, autoincrement=True) created_at = Column(DateTime) updated_at = Column(DateTime) deleted_at = Column(DateTime) # Soft delete name = Column(String) # User identifier (e.g., "myuser") display_name = Column(String) # Human-readable display name email = Column(String) # User email address provider_identifier = Column(String) # External auth provider ID provider = Column(String) # Auth provider name (OIDC, etc.) profile_pic_url = Column(String) # Profile picture URL # Relationships nodes = relationship("Node", back_populates="user", cascade="all, delete-orphan") pre_auth_keys = relationship("PreAuthKey", back_populates="user") def __repr__(self): return f"" def is_deleted(self) -> bool: """Check if user is soft-deleted""" return self.deleted_at is not None class Node(Base): """Headscale Nodes (Machines) table Represents individual devices/machines connected to the Tailnet. Each node belongs to a user and has various networking attributes. """ __tablename__ = 'nodes' id = Column(Integer, primary_key=True, autoincrement=True) machine_key = Column(String) # Machine's public key node_key = Column(String) # Node's network key disco_key = Column(String) # Discovery key for peer-to-peer connections endpoints = Column(JSONType) # List of network endpoints (JSON array) host_info = Column(JSONType) # Detailed host information (JSON object) ipv4 = Column(String) # Assigned IPv4 address (e.g., "100.64.0.1") ipv6 = Column(String) # Assigned IPv6 address hostname = Column(String) # Machine hostname given_name = Column(String) # User-assigned machine name user_id = Column(Integer, ForeignKey('users.id', ondelete='CASCADE')) register_method = Column(String) # Registration method (e.g., "authkey") forced_tags = Column(JSONType) # Tags forced on this node (JSON array) auth_key_id = Column(Integer, ForeignKey('pre_auth_keys.id')) expiry = Column(DateTime) # Node expiration date last_seen = Column(DateTime) # Last activity timestamp approved_routes = Column(JSONType) # Approved subnet routes (JSON array) created_at = Column(DateTime) updated_at = Column(DateTime) deleted_at = Column(DateTime) # Soft delete # Relationships user = relationship("User", back_populates="nodes") auth_key = relationship("PreAuthKey") def __repr__(self): return f"" def is_online(self, timeout_minutes: int = 1440) -> bool: # 24 hours timeout like Tailscale """Check if node is considered online based on last_seen""" if not self.last_seen: return False # Handle timezone-aware datetime properly now = datetime.now() last_seen = self.last_seen # Convert both to naive datetime to avoid timezone issues if last_seen.tzinfo is not None: last_seen = last_seen.replace(tzinfo=None) if now.tzinfo is not None: now = now.replace(tzinfo=None) try: time_diff_seconds = (now - last_seen).total_seconds() # Consider online if last seen within timeout_minutes is_recent = time_diff_seconds < (timeout_minutes * 60) return is_recent except TypeError as e: # Fallback: just check if we have a recent timestamp return True def get_host_info(self) -> dict: """Get parsed host information""" return self.host_info or {} def get_endpoints(self) -> List[str]: """Get list of network endpoints""" return self.endpoints or [] def get_forced_tags(self) -> List[str]: """Get list of forced tags""" return self.forced_tags or [] def get_approved_routes(self) -> List[str]: """Get list of approved routes""" return self.approved_routes or [] class PreAuthKey(Base): """Pre-authentication keys table Keys used for automatic node registration without manual approval. """ __tablename__ = 'pre_auth_keys' id = Column(Integer, primary_key=True, autoincrement=True) key = Column(String) # The actual pre-auth key string user_id = Column(Integer, ForeignKey('users.id', ondelete='SET NULL')) reusable = Column(Boolean) # Can be used multiple times ephemeral = Column(Boolean, default=False) # Temporary key used = Column(Boolean, default=False) # Has been used tags = Column(JSONType) # Tags to apply to nodes using this key created_at = Column(DateTime) expiration = Column(DateTime) # When the key expires # Relationships user = relationship("User", back_populates="pre_auth_keys") def __repr__(self): return f"" def is_expired(self) -> bool: """Check if the pre-auth key is expired""" if not self.expiration: return False now = datetime.now() expiration = self.expiration # Handle timezone-aware datetime if expiration.tzinfo is not None and now.tzinfo is None: from datetime import timezone now = now.replace(tzinfo=timezone.utc) elif expiration.tzinfo is not None: expiration = expiration.replace(tzinfo=None) try: return now > expiration except TypeError: return False def is_valid(self) -> bool: """Check if the key is still valid for use""" if self.is_expired(): return False if self.used and not self.reusable: return False return True def get_tags(self) -> List[str]: """Get list of tags for this key""" return self.tags or [] class ApiKey(Base): """API Keys table Keys used for API authentication to the Headscale server. """ __tablename__ = 'api_keys' __table_args__ = ( Index('idx_api_keys_prefix', 'prefix', unique=True), ) id = Column(Integer, primary_key=True, autoincrement=True) prefix = Column(String) # Key prefix for identification (e.g., "8qRr1IB") hash = Column(LargeBinary) # Hashed key value created_at = Column(DateTime) expiration = Column(DateTime) # When the key expires last_seen = Column(DateTime) # Last time key was used def __repr__(self): return f"" def is_expired(self) -> bool: """Check if the API key is expired""" if not self.expiration: return False now = datetime.now() expiration = self.expiration # Handle timezone-aware datetime if expiration.tzinfo is not None and now.tzinfo is None: from datetime import timezone now = now.replace(tzinfo=timezone.utc) elif expiration.tzinfo is not None: expiration = expiration.replace(tzinfo=None) try: return now > expiration except TypeError: return False class Policy(Base): """ACL Policies table Stores Access Control List policies in JSON format. """ __tablename__ = 'policies' __table_args__ = ( Index('idx_policies_deleted_at', 'deleted_at'), ) id = Column(Integer, primary_key=True, autoincrement=True) created_at = Column(DateTime) updated_at = Column(DateTime) deleted_at = Column(DateTime) # Soft delete data = Column(Text) # JSON policy data def __repr__(self): return f"" def get_policy_data(self) -> dict: """Parse and return policy data as dictionary""" try: return json.loads(self.data) if self.data else {} except json.JSONDecodeError: return {} # ========================================== # Extended Models for FARMQ Customization # ========================================== # ========================================== # Extended Models for FARMQ Customization - DEPRECATED # ========================================== # # 주의: 이 모델들은 Headscale과 외래키 충돌을 일으키므로 더 이상 사용하지 않습니다. # 대신 farmq_models.py의 독립적인 모델들을 사용하세요. # # 마이그레이션 가이드: # - PharmacyInfo -> farmq_models.PharmacyInfo # - MachineSpecs -> farmq_models.MachineProfile # - MonitoringData -> farmq_models.MonitoringMetrics # # 이 클래스들은 하위 호환성을 위해 유지되지만 실제 테이블은 생성되지 않습니다. # ========================================== # Database Helper Functions # ========================================== def create_all_tables(engine): """Create all tables in the database""" Base.metadata.create_all(engine) def get_active_nodes(session: Session) -> List[Node]: """Get all non-deleted nodes""" return session.query(Node).filter(Node.deleted_at.is_(None)).all() def get_online_nodes(session: Session, timeout_minutes: int = 5) -> List[Node]: """Get nodes that are currently online""" cutoff_time = datetime.now() - timedelta(minutes=timeout_minutes) return session.query(Node).filter( Node.deleted_at.is_(None), Node.last_seen > cutoff_time ).all() def get_user_with_pharmacy_info(session: Session, user_name: str): """Get user with associated pharmacy information""" return session.query(User).join(PharmacyInfo).filter(User.name == user_name).first() def get_node_with_specs_and_monitoring(session: Session, node_id: int): """Get node with hardware specs and latest monitoring data""" return session.query(Node)\ .outerjoin(MachineSpecs)\ .outerjoin(MonitoringData)\ .filter(Node.id == node_id)\ .first() # ========================================== # Usage Example # ========================================== if __name__ == "__main__": from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker # SQLite connection to Headscale database DATABASE_URL = "sqlite:///data/db.sqlite" engine = create_engine(DATABASE_URL) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # Create extended tables (if needed) create_all_tables(engine) # Example usage session = SessionLocal() try: # Get all users users = session.query(User).all() print("=== Users ===") for user in users: print(f" {user}") # Get all nodes nodes = session.query(Node).all() print("\n=== Nodes ===") for node in nodes: print(f" {node}") print(f" Online: {node.is_online()}") print(f" Host Info: {node.get_host_info().get('Hostname', 'Unknown')}") # Get all API keys api_keys = session.query(ApiKey).all() print("\n=== API Keys ===") for key in api_keys: print(f" {key}") print(f" Expired: {key.is_expired()}") finally: session.close()