import enum from datetime import datetime, timezone from typing import Optional from sqlalchemy import Column, String, Integer, DateTime, Text, Enum, JSON, create_engine from sqlalchemy.orm import declarative_base, sessionmaker from sqlalchemy.pool import StaticPool from app.config import settings Base = declarative_base() class AlertStatus(str, enum.Enum): PENDING = "pending" CONFIRMED = "confirmed" IGNORED = "ignored" RESOLVED = "resolved" class AlertLevel(str, enum.Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" class Alert(Base): __tablename__ = "alerts" id = Column(Integer, primary_key=True, autoincrement=True) alert_no = Column(String(32), unique=True, nullable=False, index=True) camera_id = Column(String(64), nullable=False, index=True) roi_id = Column(String(64), nullable=True) alert_type = Column(String(64), nullable=False) algorithm = Column(String(128), nullable=True) confidence = Column(Integer, nullable=True) duration_minutes = Column(Integer, nullable=True) trigger_time = Column(DateTime, nullable=False) message = Column(Text, nullable=True) snapshot_url = Column(String(512), nullable=True) snapshot_path = Column(String(512), nullable=True) status = Column(Enum(AlertStatus), default=AlertStatus.PENDING) handle_remark = Column(Text, nullable=True) handled_by = Column(String(64), nullable=True) handled_at = Column(DateTime, nullable=True) ai_analysis = Column(JSON, nullable=True) created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) def to_dict(self) -> dict: return { "id": self.id, "alert_no": self.alert_no, "camera_id": self.camera_id, "roi_id": self.roi_id, "alert_type": self.alert_type, "algorithm": self.algorithm, "confidence": self.confidence, "duration_minutes": self.duration_minutes, "trigger_time": self.trigger_time.isoformat() if self.trigger_time else None, "message": self.message, "snapshot_url": self.snapshot_url, "status": self.status.value if self.status else None, "handle_remark": self.handle_remark, "handled_by": self.handled_by, "handled_at": self.handled_at.isoformat() if self.handled_at else None, "ai_analysis": self.ai_analysis, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, } _engine = None _SessionLocal = None def get_engine(): global _engine if _engine is None: db_url = settings.database.url connect_args = {} if "sqlite" in db_url: db_path = db_url.replace("sqlite:///", "") os.makedirs(os.path.dirname(db_path) if os.path.dirname(db_path) else ".", exist_ok=True) connect_args = {"check_same_thread": False} _engine = create_engine( db_url, echo=settings.app.debug, poolclass=StaticPool if "sqlite" in db_url else None, connect_args=connect_args, ) else: _engine = create_engine(db_url, echo=settings.app.debug) return _engine def get_session(): global _SessionLocal if _SessionLocal is None: _SessionLocal = sessionmaker(bind=get_engine(), autocommit=False, autoflush=False) return _SessionLocal() def init_db(): engine = get_engine() Base.metadata.create_all(bind=engine) def close_db(): global _engine, _SessionLocal if _engine: _engine.dispose() _engine = None _SessionLocal = None