Files
iot-device-management-service/app/models.py

120 lines
3.8 KiB
Python
Raw Normal View History

import enum
import os
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import Column, String, Integer, DateTime, Text, Enum, JSON, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.pool import StaticPool

from app.config import settings
# Declarative base class that the ORM models in this module inherit from;
# init_db() creates the tables registered on Base.metadata.
Base = declarative_base()
@enum.unique
class AlertStatus(str, enum.Enum):
    """Allowed states for ``Alert.status``.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values.
    """

    PENDING = "pending"      # default assigned to a new Alert row
    CONFIRMED = "confirmed"
    IGNORED = "ignored"
    RESOLVED = "resolved"
@enum.unique
class AlertLevel(str, enum.Enum):
    """Alert level constants, from ``low`` up to ``critical``.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values. NOTE(review): nothing else in this module
    references this enum; presumably it is consumed by other modules.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class Alert(Base):
    """Declarative ORM model mapped to the ``alerts`` table."""

    __tablename__ = "alerts"

    # Surrogate primary key plus a unique, indexed alert number.
    id = Column(Integer, primary_key=True, autoincrement=True)
    alert_no = Column(String(32), unique=True, nullable=False, index=True)

    # Required: camera_id (indexed), alert_type and trigger_time.
    # Every other column below is nullable.
    camera_id = Column(String(64), nullable=False, index=True)
    roi_id = Column(String(64), nullable=True)
    alert_type = Column(String(64), nullable=False)
    algorithm = Column(String(128), nullable=True)
    confidence = Column(Integer, nullable=True)
    duration_minutes = Column(Integer, nullable=True)
    trigger_time = Column(DateTime, nullable=False)
    message = Column(Text, nullable=True)
    snapshot_url = Column(String(512), nullable=True)
    snapshot_path = Column(String(512), nullable=True)

    # Handling state; new rows start as PENDING.
    status = Column(Enum(AlertStatus), default=AlertStatus.PENDING)
    handle_remark = Column(Text, nullable=True)
    handled_by = Column(String(64), nullable=True)
    handled_at = Column(DateTime, nullable=True)

    ai_analysis = Column(JSON, nullable=True)

    # Both timestamps default to the current UTC time at insert;
    # updated_at is additionally refreshed on every UPDATE.
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    def to_dict(self) -> dict:
        """Return the alert as a plain dictionary.

        Datetime columns are rendered with ``isoformat()`` and ``status``
        as the enum's value; unset values become ``None``. The
        ``snapshot_path`` column is not part of the output.
        """

        def _iso(moment):
            # ISO-8601 text for a set datetime, None otherwise.
            return moment.isoformat() if moment else None

        current_status = self.status
        status_value = current_status.value if current_status else None

        return {
            "id": self.id,
            "alert_no": self.alert_no,
            "camera_id": self.camera_id,
            "roi_id": self.roi_id,
            "alert_type": self.alert_type,
            "algorithm": self.algorithm,
            "confidence": self.confidence,
            "duration_minutes": self.duration_minutes,
            "trigger_time": _iso(self.trigger_time),
            "message": self.message,
            "snapshot_url": self.snapshot_url,
            "status": status_value,
            "handle_remark": self.handle_remark,
            "handled_by": self.handled_by,
            "handled_at": _iso(self.handled_at),
            "ai_analysis": self.ai_analysis,
            "created_at": _iso(self.created_at),
            "updated_at": _iso(self.updated_at),
        }
# Lazily created module-level singletons: get_engine() fills _engine,
# get_session() fills _SessionLocal, and close_db() resets both to None.
_engine = None
_SessionLocal = None
def get_engine():
    """Return the shared SQLAlchemy engine, creating it on first use.

    The engine is built from ``settings.database.url`` with
    ``echo=settings.app.debug`` and cached in the module-level ``_engine``.

    For SQLite URLs the parent directory of a file-backed database is
    created if it does not exist, and the engine is configured with
    ``StaticPool`` and ``check_same_thread=False``. Any other URL gets an
    engine with SQLAlchemy's default pooling.

    Returns:
        The cached engine instance.
    """
    global _engine
    if _engine is not None:
        return _engine

    db_url = settings.database.url

    # A SQLAlchemy URL begins with its dialect name, so test the prefix
    # rather than searching the whole string (a non-SQLite URL may contain
    # "sqlite" in a database or host name).
    if db_url.startswith("sqlite"):
        # The file path is whatever follows ":///"; this also handles
        # "sqlite+driver:///..." URLs. Drop any "?query" suffix.
        _, separator, db_path = db_url.partition(":///")
        db_path = db_path.partition("?")[0]

        # In-memory databases ("sqlite://", ":memory:") have no directory
        # to prepare, and a bare filename lives in the current directory.
        if separator and db_path and db_path != ":memory:":
            parent_dir = os.path.dirname(db_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

        _engine = create_engine(
            db_url,
            echo=settings.app.debug,
            poolclass=StaticPool,
            connect_args={"check_same_thread": False},
        )
    else:
        _engine = create_engine(db_url, echo=settings.app.debug)

    return _engine
def get_session():
    """Return a new session from the shared session factory.

    The factory is created on the first call, bound to ``get_engine()``
    with ``autocommit=False`` and ``autoflush=False``, and cached in the
    module-level ``_SessionLocal``.
    """
    global _SessionLocal
    factory = _SessionLocal
    if factory is None:
        factory = sessionmaker(bind=get_engine(), autocommit=False, autoflush=False)
        _SessionLocal = factory
    return factory()
def init_db():
    """Create every table registered on ``Base.metadata`` using the shared engine."""
    Base.metadata.create_all(bind=get_engine())
def close_db():
    """Dispose of the shared engine and clear the cached singletons.

    Does nothing when no engine has been created. Afterwards the next
    ``get_engine()`` / ``get_session()`` call builds fresh objects.
    """
    global _engine, _SessionLocal
    if not _engine:
        return
    _engine.dispose()
    _engine = None
    _SessionLocal = None