commit baa895a6f193f7b4f0042b95c02be70dba142eb9 Author: 16337 <1633794139@qq.com> Date: Mon Feb 2 09:40:02 2026 +0800 feat: 初始化告警平台后端项目 - 创建 FastAPI 项目结构 - 实现告警数据模型(SQLAlchemy) - 实现 multipart/form-data 告警接收接口 - 实现阿里云 OSS 图片上传模块 - 实现告警查询和处理 API - 实现异步大模型分析模块 diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..b76d447 --- /dev/null +++ b/.env.example @@ -0,0 +1,18 @@ +# 数据库配置 +DATABASE_URL=sqlite:///./data/alert_platform.db + +# 阿里云 OSS 配置 +OSS_ACCESS_KEY_ID=your_access_key_id +OSS_ACCESS_KEY_SECRET=your_access_key_secret +OSS_ENDPOINT=oss-cn-hangzhou.aliyuncs.com +OSS_BUCKET_NAME=your_bucket_name +OSS_URL_PREFIX=https://your-bucket-name.oss-cn-hangzhou.aliyuncs.com + +# 应用配置 +APP_HOST=0.0.0.0 +APP_PORT=8000 +DEBUG=true + +# 大模型配置(可选) +AI_MODEL_ENDPOINT=http://localhost:8001 +AI_MODEL_API_KEY=your_api_key diff --git a/.trae/documents/plan_20260130_093809.md b/.trae/documents/plan_20260130_093809.md new file mode 100644 index 0000000..17c817d --- /dev/null +++ b/.trae/documents/plan_20260130_093809.md @@ -0,0 +1,54 @@ +## 安保设备管理 & 事件告警平台实现计划 + +按照你的架构设计,我将分 8 个阶段实现: + +### 阶段 1:项目骨架(1-2 小时) +- 创建目录结构 `security_platform/` +- 配置 `settings.py`(应用名、数据库、Redis) +- 配置 `requirements.txt`(FastAPI、SQLAlchemy 等依赖) + +### 阶段 2:数据库层(2-3 小时) +- 创建 `infra/db.py`(SQLAlchemy engine 和 session 管理) +- 创建 `core/device/models.py`(Device 设备模型) +- 创建 `core/event/models.py`(Event 事件模型) +- 创建 Repository 基类和设备/事件仓储实现 + +### 阶段 3:设备管理模块(2-3 小时) +- 创建 `schemas/device.py`(Pydantic 请求/响应模型) +- 实现 `core/device/service.py`(设备业务逻辑) +- 创建 `api/device.py`(设备 CRUD + 心跳接口) +- 支持对接 IoT 平台的设备注册 + +### 阶段 4:事件告警模块(2-3 小时) +- 创建 `schemas/event.py`(事件 Pydantic 模型) +- 实现 `core/event/service.py`(事件处理逻辑) +- 实现 `core/event/rules.py`(告警规则引擎) +- 创建 `api/event.py`(事件上报、查询接口) + +### 阶段 5:WebSocket 实时推送(1-2 小时) +- 实现 `api/realtime.py`(WebSocket 端点) +- 告警实时推送机制 +- 连接管理和消息广播 + +### 阶段 6:辅助模块(1 小时) +- 创建 `api/health.py`(健康检查) +- 创建 `utils/logger.py`(日志工具) +- 创建 `utils/time.py`(时间工具) + +### 阶段 
7:部署配置(1 小时) +- 创建 `deploy/Dockerfile` +- 创建 `deploy/docker-compose.yml` +- 创建 `deploy/nginx.conf` + +### 阶段 8:文档和测试(1 小时) +- 生成 `README.md` +- 编写基础 API 文档 + +**预计总工时:11-15 小时** + +--- +**核心技术栈确认:** +- FastAPI + Uvicorn + Gunicorn +- SQLite(本地)或 MySQL(云端) +- Redis(可选,用于 WebSocket 订阅) +- SQLAlchemy 2.0 \ No newline at end of file diff --git a/.trae/documents/plan_20260202_013208.md b/.trae/documents/plan_20260202_013208.md new file mode 100644 index 0000000..147c14a --- /dev/null +++ b/.trae/documents/plan_20260202_013208.md @@ -0,0 +1,151 @@ +## AI 告警平台开发计划 + +### 项目定位 +**职责**:仅接收边缘端告警、保存告警证据、提供可视化与人工处理能力 +**不负责**:设备管理、推理逻辑、用户体系(复用芋道前端用户体系) + +--- + +### 一、边缘端 → 告警平台 数据协议 + +**传输方式**:multipart/form-data(JSON + 图片文件) + +**JSON 字段**: +```json +{ + "camera_id": "cam-001", + "roi_id": "roi-01", + "alert_type": "leave_post", + "algorithm": "LeavePostAlgorithm", + "confidence": 0.85, + "duration_minutes": 5, + "trigger_time": "2024-01-20T10:30:00Z", + "message": "离岗告警" +} +``` + +**图片字段**:`snapshot`(二进制文件) + +--- + +### 二、后端架构(FastAPI) + +``` +alert_platform/ +├── app/ +│ ├── main.py # FastAPI 入口 +│ ├── config.py # 配置(SQLite + 图片存储路径) +│ ├── models.py # SQLAlchemy 告警模型 +│ ├── schemas.py # Pydantic 请求/响应模型 +│ ├── api/ +│ │ └── alert.py # 告警接收 & 查询接口 +│ ├── service/ +│ │ ├── alert_service.py # 告警业务逻辑 +│ │ └── analyzer.py # 异步大模型分析 +│ └── storage.py # 图片存储 +├── data/ # SQLite + 图片目录 +├── requirements.txt +└── Dockerfile +``` + +**核心接口**: +| 方法 | 路径 | 描述 | +|------|------|------| +| POST | `/api/v1/alerts` | 接收边缘端告警(multipart/form-data) | +| GET | `/api/v1/alerts` | 查询告警列表(分页 + 筛选) | +| GET | `/api/v1/alerts/{id}` | 告警详情 | +| PUT | `/api/v1/alerts/{id}/handle` | 处理告警(确认/忽略/备注) | +| GET | `/api/v1/alerts/{id}/image` | 获取告警图片 | +| GET | `/api/v1/alerts/statistics` | 告警统计 | + +--- + +### 三、前端架构(芋道 UI) + +**在 yudao-ui-admin-vben 中新增模块**: + +``` +apps/web-antd/src/ +├── api/alert/ +│ └── alert.ts # 告警 API +├── views/alert/ +│ ├── list/ +│ │ ├── data.ts # 列表表单/列配置 +│ │ └── 
index.vue # 告警列表页 +│ └── detail/ +│ ├── data.ts +│ └── index.vue # 告警详情页 +└── router/routes/modules/ + └── alert.ts # 告警路由 +``` + +**前端功能**: +- 告警列表:时间/类型/状态筛选 + 分页 +- 告警详情:图片展示 + 算法信息 + 离岗时长 +- 人工处理:确认、忽略、备注操作 +- 大模型分析结果展示(可选) + +--- + +### 四、大模型分析(异步) + +**流程**: +1. 告警写入数据库后,触发异步任务 +2. 任务队列发送图片 + 上下文到大模型服务 +3. 大模型返回:风险等级、告警描述、误报建议 +4. 结果写入告警记录的 `ai_analysis` 扩展字段 + +**实现方式**:Python `asyncio` 或 Celery(预留) + +--- + +### 五、技术栈 + +| 层级 | 技术 | +|------|------| +| 后端框架 | FastAPI + Uvicorn | +| 数据库 | SQLite(轻量)或 MySQL | +| 图片存储 | 本地文件系统 / 对象存储 | +| 前端框架 | Vue 3 + Vben Admin(芋道) | +| 异步任务 | asyncio(预留 Celery) | + +--- + +### 六、开发优先级 + +**Phase 1(核心)**: +1. 后端告警接收接口(multipart/form-data) +2. 图片存储与访问 +3. SQLite 持久化 +4. 告警查询接口 + +**Phase 2(前端)**: +1. 告警列表页 +2. 告警详情页 +3. 人工处理功能 + +**Phase 3(增强)**: +1. 异步大模型分析 +2. 告警统计图表 +3. Docker 部署 + +--- + +### 七、与 ai_edge 对接示例 + +边缘端(ai_edge)调用: +```python +import requests + +files = { + 'snapshot': ('alert.jpg', image_bytes, 'image/jpeg'), + 'data': (None, json.dumps(alert_data), 'application/json') +} + +response = requests.post( + "http://alert-platform/api/v1/alerts", + files=files +) +``` + +--- \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..1ce250f --- /dev/null +++ b/README.md @@ -0,0 +1,50 @@ +# AI 告警平台 + +轻量级告警接收与处理平台,接收边缘端告警数据,存储到数据库并提供可视化查询能力。 + +## 功能特性 + +- 告警接收:支持边缘端通过 multipart/form-data 上报告警 +- 图片存储:阿里云 OSS 对象存储 +- 告警管理:查询、筛选、处理告警 +- 大模型分析:异步告警智能分析 + +## 快速开始 + +```bash +# 安装依赖 +pip install -r requirements.txt + +# 配置环境变量 +cp .env.example .env +# 编辑 .env 文件,配置数据库和 OSS + +# 启动服务 +python -m app.main +``` + +## 项目结构 + +``` +alert_platform/ +├── app/ +│ ├── main.py # FastAPI 入口 +│ ├── config.py # 配置 +│ ├── models.py # 数据模型 +│ ├── schemas.py # Pydantic 模型 +│ ├── api/ +│ │ └── alert.py # 告警 API +│ ├── services/ +│ │ ├── alert_service.py # 告警业务 +│ │ └── oss_storage.py # OSS 存储 +│ └── utils/ +│ └── logger.py # 日志工具 +├── data/ # 数据库目录 +├── uploads/ # 本地临时存储 +├── requirements.txt 
"""Application configuration, loaded from environment variables (.env supported).

Plain dataclasses replace the original's mix of pydantic BaseModel with
dataclass fields (which bought nothing); attribute access
(settings.app.host, settings.oss.bucket_name, ...) is unchanged for callers.
"""
import os
from dataclasses import dataclass, field


@dataclass
class DatabaseConfig:
    # SQLAlchemy database URL; defaults to a local SQLite file.
    url: str = "sqlite:///./data/alert_platform.db"


@dataclass
class OSSConfig:
    # Aliyun OSS credentials/addressing. Empty key or bucket disables OSS
    # and the storage layer falls back to local disk.
    access_key_id: str = ""
    access_key_secret: str = ""
    endpoint: str = "oss-cn-hangzhou.aliyuncs.com"
    bucket_name: str = ""
    url_prefix: str = ""


@dataclass
class AppConfig:
    host: str = "0.0.0.0"
    port: int = 8000
    debug: bool = True


@dataclass
class AIModelConfig:
    # Optional LLM analysis service; an empty endpoint disables analysis.
    endpoint: str = ""
    api_key: str = ""


@dataclass
class Settings:
    """Aggregated application settings tree."""
    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    oss: OSSConfig = field(default_factory=OSSConfig)
    app: AppConfig = field(default_factory=AppConfig)
    ai_model: AIModelConfig = field(default_factory=AIModelConfig)


def load_settings() -> Settings:
    """Build Settings from the process environment.

    A .env file is read first when python-dotenv is installed; every value
    falls back to a sensible development default.
    """
    try:
        # Optional dependency: absence of python-dotenv must not be fatal.
        from dotenv import load_dotenv
        load_dotenv()
    except ImportError:
        pass

    return Settings(
        database=DatabaseConfig(
            url=os.getenv("DATABASE_URL", "sqlite:///./data/alert_platform.db"),
        ),
        oss=OSSConfig(
            access_key_id=os.getenv("OSS_ACCESS_KEY_ID", ""),
            access_key_secret=os.getenv("OSS_ACCESS_KEY_SECRET", ""),
            endpoint=os.getenv("OSS_ENDPOINT", "oss-cn-hangzhou.aliyuncs.com"),
            bucket_name=os.getenv("OSS_BUCKET_NAME", ""),
            url_prefix=os.getenv("OSS_URL_PREFIX", ""),
        ),
        app=AppConfig(
            host=os.getenv("APP_HOST", "0.0.0.0"),
            port=int(os.getenv("APP_PORT", "8000")),
            debug=os.getenv("DEBUG", "true").lower() == "true",
        ),
        ai_model=AIModelConfig(
            endpoint=os.getenv("AI_MODEL_ENDPOINT", ""),
            api_key=os.getenv("AI_MODEL_API_KEY", ""),
        ),
    )


# Module-level singleton read by the rest of the app.
settings = load_settings()
"""FastAPI entry point of the AI alert platform.

Routes: alert ingestion (multipart/form-data), querying, handling,
statistics, snapshot retrieval, and a health probe.
"""
import json
from datetime import datetime
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import ValidationError

from app.config import settings
from app.models import init_db, get_engine
from app.schemas import (
    AlertCreate,
    AlertResponse,
    AlertListResponse,
    AlertHandleRequest,
    AlertStatisticsResponse,
    HealthResponse,
)
from app.services.alert_service import get_alert_service
from app.services.ai_analyzer import trigger_async_analysis
from app.utils.logger import logger


app = FastAPI(
    title="AI 告警平台",
    description="接收边缘端告警,提供告警查询与处理能力",
    version="1.0.0",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# NOTE: the original defined a local `def get_alert_service(): return
# get_alert_service()` that shadowed the imported provider and recursed
# infinitely on the first request; the imported provider is used directly.


@app.on_event("startup")
async def startup():
    """Create database tables on boot."""
    init_db()
    logger.info("AI 告警平台启动")


@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Liveness probe: API status plus a database round-trip."""
    db_status = "healthy"
    try:
        # SQLAlchemy 2.0 requires textual SQL to be wrapped in text();
        # a bare string raises ObjectNotExecutableError.
        from sqlalchemy import text
        engine = get_engine()
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except Exception as e:
        db_status = f"unhealthy: {e}"

    return HealthResponse(
        status="healthy" if db_status == "healthy" else "degraded",
        database=db_status,
    )


@app.post("/api/v1/alerts", response_model=AlertResponse)
async def create_alert(
    data: str = Form(..., description="JSON数据"),
    snapshot: Optional[UploadFile] = File(None),
    service=Depends(get_alert_service),
):
    """Ingest one alert: a JSON form field plus an optional snapshot image."""
    try:
        alert_create = AlertCreate(**json.loads(data))
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail=f"JSON解析失败: {e}")
    except ValidationError as e:
        # Manual model construction bypasses FastAPI's automatic request
        # validation, so a bad payload would otherwise surface as a 500.
        raise HTTPException(status_code=422, detail=f"数据校验失败: {e}")

    snapshot_data = await snapshot.read() if snapshot else None
    alert = service.create_alert(alert_create, snapshot_data)

    if alert.snapshot_url and settings.ai_model.endpoint:
        # trigger_async_analysis is a coroutine; it must be awaited so the
        # background task actually gets scheduled (the original dropped the
        # coroutine without awaiting it — analysis never ran).
        await trigger_async_analysis(
            alert_id=alert.id,
            snapshot_url=alert.snapshot_url,
            alert_info={
                "camera_id": alert.camera_id,
                "roi_id": alert.roi_id,
                "alert_type": alert.alert_type,
                "confidence": alert.confidence,
                "duration_minutes": alert.duration_minutes,
            },
        )

    return AlertResponse(**alert.to_dict())


# Registered BEFORE /api/v1/alerts/{alert_id}: FastAPI matches routes in
# declaration order, so with the original ordering "/statistics" was
# swallowed by the int-typed {alert_id} route and always returned 422.
@app.get("/api/v1/alerts/statistics", response_model=AlertStatisticsResponse)
async def get_statistics(
    service=Depends(get_alert_service),
):
    """Aggregate alert counts by status and type."""
    stats = service.get_statistics()
    return AlertStatisticsResponse(**stats)


@app.get("/api/v1/alerts", response_model=AlertListResponse)
async def list_alerts(
    camera_id: Optional[str] = Query(None, description="摄像头ID"),
    alert_type: Optional[str] = Query(None, description="告警类型"),
    status: Optional[str] = Query(None, description="处理状态"),
    start_time: Optional[datetime] = Query(None, description="开始时间"),
    end_time: Optional[datetime] = Query(None, description="结束时间"),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    service=Depends(get_alert_service),
):
    """Paged alert listing with optional camera/type/status/time filters."""
    alerts, total = service.get_alerts(
        camera_id=camera_id,
        alert_type=alert_type,
        status=status,
        start_time=start_time,
        end_time=end_time,
        page=page,
        page_size=page_size,
    )

    return AlertListResponse(
        alerts=[AlertResponse(**a.to_dict()) for a in alerts],
        total=total,
        page=page,
        page_size=page_size,
    )


@app.get("/api/v1/alerts/{alert_id}", response_model=AlertResponse)
async def get_alert(
    alert_id: int,
    service=Depends(get_alert_service),
):
    """Single-alert detail by primary key."""
    alert = service.get_alert(alert_id)
    if not alert:
        raise HTTPException(status_code=404, detail="告警不存在")
    return AlertResponse(**alert.to_dict())


@app.put("/api/v1/alerts/{alert_id}/handle", response_model=AlertResponse)
async def handle_alert(
    alert_id: int,
    handle_data: AlertHandleRequest,
    handled_by: Optional[str] = Query(None, description="处理人"),
    service=Depends(get_alert_service),
):
    """Confirm / ignore / resolve an alert with an optional remark."""
    alert = service.handle_alert(alert_id, handle_data, handled_by)
    if not alert:
        raise HTTPException(status_code=404, detail="告警不存在")
    return AlertResponse(**alert.to_dict())


@app.get("/api/v1/alerts/{alert_id}/image")
async def get_alert_image(
    alert_id: int,
    service=Depends(get_alert_service),
):
    """Serve the alert snapshot from local disk."""
    alert = service.get_alert(alert_id)
    if not alert:
        raise HTTPException(status_code=404, detail="告警不存在")

    path = alert.snapshot_path
    if not path and alert.snapshot_url and not alert.snapshot_url.startswith("http"):
        # Local fallback storage records a site-relative "/uploads/..." URL;
        # map it back to the filesystem path it was written to.
        path = alert.snapshot_url.lstrip("/")

    if not path or not Path(path).exists():
        raise HTTPException(status_code=404, detail="图片不存在")

    return FileResponse(path)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "app.main:app",
        host=settings.app.host,
        port=settings.app.port,
        reload=settings.app.debug,
    )
"""SQLAlchemy models plus lazy engine/session management for the platform."""
import enum
import os
from datetime import datetime, timezone

from sqlalchemy import Column, String, Integer, DateTime, Text, Enum, JSON, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.pool import StaticPool

from app.config import settings

Base = declarative_base()


class AlertStatus(str, enum.Enum):
    """Lifecycle states of an alert."""
    PENDING = "pending"
    CONFIRMED = "confirmed"
    IGNORED = "ignored"
    RESOLVED = "resolved"


class AlertLevel(str, enum.Enum):
    """Severity levels.

    NOTE(review): reserved — Alert has no level column yet; presumably
    intended for the AI-analysis result. Confirm before relying on it.
    """
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class Alert(Base):
    """One alert reported by an edge device, plus its handling metadata."""
    __tablename__ = "alerts"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Human-readable business key (ALT + timestamp + random suffix).
    alert_no = Column(String(32), unique=True, nullable=False, index=True)

    camera_id = Column(String(64), nullable=False, index=True)
    roi_id = Column(String(64), nullable=True)
    alert_type = Column(String(64), nullable=False)
    algorithm = Column(String(128), nullable=True)
    # Integer percentage 0-100 (see schemas.AlertCreate bounds).
    confidence = Column(Integer, nullable=True)
    duration_minutes = Column(Integer, nullable=True)
    trigger_time = Column(DateTime, nullable=False)
    message = Column(Text, nullable=True)

    # Public URL (OSS or site-relative) and optional local filesystem path.
    snapshot_url = Column(String(512), nullable=True)
    snapshot_path = Column(String(512), nullable=True)

    status = Column(Enum(AlertStatus), default=AlertStatus.PENDING)
    handle_remark = Column(Text, nullable=True)
    handled_by = Column(String(64), nullable=True)
    handled_at = Column(DateTime, nullable=True)

    # Free-form result written by the async analyzer.
    ai_analysis = Column(JSON, nullable=True)

    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    def to_dict(self) -> dict:
        """JSON-safe representation (datetimes as ISO strings, enum as value)."""
        return {
            "id": self.id,
            "alert_no": self.alert_no,
            "camera_id": self.camera_id,
            "roi_id": self.roi_id,
            "alert_type": self.alert_type,
            "algorithm": self.algorithm,
            "confidence": self.confidence,
            "duration_minutes": self.duration_minutes,
            "trigger_time": self.trigger_time.isoformat() if self.trigger_time else None,
            "message": self.message,
            "snapshot_url": self.snapshot_url,
            "status": self.status.value if self.status else None,
            "handle_remark": self.handle_remark,
            "handled_by": self.handled_by,
            "handled_at": self.handled_at.isoformat() if self.handled_at else None,
            "ai_analysis": self.ai_analysis,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }


_engine = None
_SessionLocal = None


def get_engine():
    """Lazily create the module-wide engine.

    SQLite gets StaticPool + check_same_thread=False so a single connection
    can be shared across FastAPI's worker threads. The original used
    os.makedirs without importing os (NameError on first call) — fixed by
    the module-level import above.
    """
    global _engine
    if _engine is None:
        db_url = settings.database.url
        if "sqlite" in db_url:
            db_path = db_url.replace("sqlite:///", "")
            os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
            _engine = create_engine(
                db_url,
                echo=settings.app.debug,
                poolclass=StaticPool,
                connect_args={"check_same_thread": False},
            )
        else:
            _engine = create_engine(db_url, echo=settings.app.debug)
    return _engine


def get_session():
    """Return a new Session from the lazily-built module factory."""
    global _SessionLocal
    if _SessionLocal is None:
        _SessionLocal = sessionmaker(bind=get_engine(), autocommit=False, autoflush=False)
    return _SessionLocal()


def init_db():
    """Create all tables (idempotent)."""
    engine = get_engine()
    Base.metadata.create_all(bind=engine)


def close_db():
    """Dispose the engine and reset the lazy singletons (used in tests/shutdown)."""
    global _engine, _SessionLocal
    if _engine:
        _engine.dispose()
    _engine = None
    _SessionLocal = None
"""Asynchronous LLM analysis of alert snapshots (optional feature)."""
import asyncio
from typing import Optional

import httpx

from app.config import settings
from app.utils.logger import logger


class AIAnalyzer:
    """Thin async HTTP client for the external analysis service."""

    def __init__(self):
        self.endpoint = settings.ai_model.endpoint
        self.api_key = settings.ai_model.api_key

    async def analyze_alert(
        self,
        alert_id: int,
        snapshot_url: str,
        alert_info: dict,
    ) -> Optional[dict]:
        """POST the snapshot URL + context to the model; return its JSON.

        Returns None (never raises) when the endpoint is unconfigured or the
        call fails — analysis is best-effort. On success the result is also
        persisted onto the alert row; the original computed the result and
        dropped it (AlertService.update_ai_analysis existed but was never
        called).
        """
        if not self.endpoint:
            logger.debug("AI模型端点未配置,跳过分析")
            return None

        try:
            headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    f"{self.endpoint}/analyze",
                    json={
                        "alert_id": alert_id,
                        "snapshot_url": snapshot_url,
                        "alert_info": alert_info,
                    },
                    headers=headers,
                )
                response.raise_for_status()
                result = response.json()
            logger.info(f"AI分析完成: alert_id={alert_id}")
            self._persist(alert_id, result)
            return result
        except Exception as e:
            logger.error(f"AI分析失败: alert_id={alert_id}, error={e}")
            return None

    @staticmethod
    def _persist(alert_id: int, result: dict) -> None:
        """Write the analysis result into the alert's ai_analysis column."""
        # Deferred import keeps module load order flexible.
        from app.services.alert_service import get_alert_service
        get_alert_service().update_ai_analysis(alert_id, result)


ai_analyzer = AIAnalyzer()

# asyncio holds only weak references to tasks, so an otherwise-unreferenced
# fire-and-forget task can be garbage-collected mid-flight; keep strong refs
# until each task completes.
_pending_tasks: set = set()


async def trigger_async_analysis(alert_id: int, snapshot_url: str, alert_info: dict):
    """Schedule analysis as a background task.

    This is a coroutine: callers MUST await it (inside a running event loop)
    for the task to be created at all.
    """
    task = asyncio.create_task(
        ai_analyzer.analyze_alert(alert_id, snapshot_url, alert_info)
    )
    _pending_tasks.add(task)
    task.add_done_callback(_pending_tasks.discard)


def get_ai_analyzer() -> AIAnalyzer:
    """Module-level singleton accessor."""
    return ai_analyzer
"""Business logic: alert persistence, querying, handling and statistics."""
import uuid
from datetime import datetime, timezone
from typing import List, Optional

from sqlalchemy import func

from app.models import Alert, AlertStatus, get_session
from app.schemas import AlertCreate, AlertHandleRequest
from app.services.oss_storage import get_oss_storage
from app.utils.logger import logger


class AlertService:
    """Stateless service; every method opens and closes its own session."""

    def __init__(self):
        self.oss = get_oss_storage()

    def generate_alert_no(self) -> str:
        """Business key: ALT + UTC timestamp + 8 random hex chars."""
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
        unique_id = uuid.uuid4().hex[:8].upper()
        return f"ALT{timestamp}{unique_id}"

    def create_alert(
        self,
        alert_data: AlertCreate,
        snapshot_data: Optional[bytes] = None,
    ) -> Alert:
        """Persist a new alert; upload the snapshot (if any) first."""
        db = get_session()
        try:
            alert = Alert(
                alert_no=self.generate_alert_no(),
                camera_id=alert_data.camera_id,
                roi_id=alert_data.roi_id,
                alert_type=alert_data.alert_type,
                algorithm=alert_data.algorithm,
                confidence=alert_data.confidence,
                duration_minutes=alert_data.duration_minutes,
                trigger_time=alert_data.trigger_time,
                message=alert_data.message,
                status=AlertStatus.PENDING,
            )

            if snapshot_data:
                snapshot_url = self.oss.upload_image(snapshot_data)
                alert.snapshot_url = snapshot_url
                if not snapshot_url.startswith("http"):
                    # Local fallback storage returns a site-relative
                    # "/uploads/..." URL; record the filesystem path so the
                    # image endpoint can serve the file (the original never
                    # populated snapshot_path at all).
                    alert.snapshot_path = snapshot_url.lstrip("/")

            db.add(alert)
            db.commit()
            db.refresh(alert)

            logger.info(f"告警创建成功: {alert.alert_no}")
            return alert
        finally:
            db.close()

    def get_alert(self, alert_id: int) -> Optional[Alert]:
        """Fetch one alert by primary key, or None."""
        db = get_session()
        try:
            return db.query(Alert).filter(Alert.id == alert_id).first()
        finally:
            db.close()

    def get_alerts(
        self,
        camera_id: Optional[str] = None,
        alert_type: Optional[str] = None,
        status: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        page: int = 1,
        page_size: int = 20,
    ) -> tuple[List[Alert], int]:
        """Filtered, paged listing; returns (alerts, total_matching)."""
        db = get_session()
        try:
            query = db.query(Alert)

            if camera_id:
                query = query.filter(Alert.camera_id == camera_id)
            if alert_type:
                query = query.filter(Alert.alert_type == alert_type)
            if status:
                query = query.filter(Alert.status == status)
            if start_time:
                query = query.filter(Alert.trigger_time >= start_time)
            if end_time:
                query = query.filter(Alert.trigger_time <= end_time)

            total = query.count()
            alerts = (
                query.order_by(Alert.created_at.desc())
                .offset((page - 1) * page_size)
                .limit(page_size)
                .all()
            )
            return alerts, total
        finally:
            db.close()

    def handle_alert(
        self,
        alert_id: int,
        handle_data: AlertHandleRequest,
        handled_by: Optional[str] = None,
    ) -> Optional[Alert]:
        """Apply a manual disposition (confirm/ignore/resolve) to an alert."""
        db = get_session()
        try:
            alert = db.query(Alert).filter(Alert.id == alert_id).first()
            if not alert:
                return None

            alert.status = AlertStatus(handle_data.status)
            alert.handle_remark = handle_data.remark
            alert.handled_by = handled_by
            alert.handled_at = datetime.now(timezone.utc)
            alert.updated_at = datetime.now(timezone.utc)

            db.commit()
            db.refresh(alert)

            logger.info(f"告警处理完成: {alert.alert_no}, 状态: {handle_data.status}")
            return alert
        finally:
            db.close()

    def get_statistics(self) -> dict:
        """Aggregate counts for the /statistics endpoint.

        Uses two grouped queries instead of the original's one COUNT query
        per status plus one per distinct alert_type (N+1).
        """
        db = get_session()
        try:
            status_counts = dict(
                db.query(Alert.status, func.count(Alert.id))
                .group_by(Alert.status)
                .all()
            )
            by_type = dict(
                db.query(Alert.alert_type, func.count(Alert.id))
                .group_by(Alert.alert_type)
                .all()
            )
            return {
                "total": sum(status_counts.values()),
                "pending": status_counts.get(AlertStatus.PENDING, 0),
                "confirmed": status_counts.get(AlertStatus.CONFIRMED, 0),
                "ignored": status_counts.get(AlertStatus.IGNORED, 0),
                "resolved": status_counts.get(AlertStatus.RESOLVED, 0),
                "by_type": by_type,
                # AlertStatisticsResponse requires by_level; the original
                # omitted it, so every /statistics response failed response
                # validation. Alert has no severity column yet, hence empty.
                "by_level": {},
            }
        finally:
            db.close()

    def update_ai_analysis(self, alert_id: int, analysis: dict) -> Optional[Alert]:
        """Attach an async-analysis result to an existing alert."""
        db = get_session()
        try:
            alert = db.query(Alert).filter(Alert.id == alert_id).first()
            if alert:
                alert.ai_analysis = analysis
                db.commit()
                db.refresh(alert)
            return alert
        finally:
            db.close()


alert_service = AlertService()


def get_alert_service() -> AlertService:
    """Module-level singleton accessor (used as a FastAPI dependency)."""
    return alert_service
"""Snapshot storage: Aliyun OSS with a transparent local-disk fallback."""
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

try:
    import oss2
except ImportError:
    # oss2 is only needed when OSS is actually configured; without it the
    # local fallback below still works (the original hard-imported oss2 and
    # crashed at import time, defeating its own fallback design).
    oss2 = None

from app.config import settings
from app.utils.logger import logger


class OSSStorage:
    """Uploads images to OSS when configured, else writes under ./uploads."""

    def __init__(self):
        self.bucket = None
        self._init_bucket()

    def _init_bucket(self):
        """Connect to OSS when fully configured; otherwise stay local-only."""
        if oss2 is None or not settings.oss.access_key_id or not settings.oss.bucket_name:
            logger.warning("OSS配置不完整,将使用本地存储")
            return

        try:
            auth = oss2.Auth(settings.oss.access_key_id, settings.oss.access_key_secret)
            self.bucket = oss2.Bucket(auth, settings.oss.endpoint, settings.oss.bucket_name)
            logger.info(f"OSS连接成功: {settings.oss.bucket_name}")
        except Exception as e:
            logger.error(f"OSS连接失败: {e}")
            self.bucket = None

    @staticmethod
    def _default_filename() -> str:
        """alerts/<UTC timestamp>_<8 hex chars>.jpg"""
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
        return f"alerts/{timestamp}_{uuid.uuid4().hex[:8]}.jpg"

    def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str:
        """Upload to OSS and return the public URL; fall back to local disk on failure."""
        if not self.bucket:
            return self._upload_local(image_data, filename)

        if filename is None:
            filename = self._default_filename()

        try:
            self.bucket.put_object(filename, image_data)
            # The original interpolated a garbled placeholder instead of the
            # object key, producing unusable URLs.
            url = f"{settings.oss.url_prefix}/{filename}"
            logger.info(f"图片上传OSS成功: {url}")
            return url
        except Exception as e:
            logger.error(f"图片上传OSS失败: {e}")
            return self._upload_local(image_data, filename)

    def _upload_local(self, image_data: bytes, filename: Optional[str] = None) -> str:
        """Write under ./uploads and return a site-relative "/uploads/..." URL.

        (Path was used without being imported in the original — NameError on
        the first fallback; imported at module top now.)
        """
        if filename is None:
            filename = self._default_filename()

        file_path = Path("uploads") / filename
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_bytes(image_data)

        local_url = f"/uploads/{filename}"
        logger.info(f"图片保存本地: {local_url}")
        return local_url

    def get_url(self, path: str) -> str:
        """Normalize a stored path/URL into something the frontend can fetch."""
        if path.startswith("http"):
            return path
        if self.bucket:
            return f"{settings.oss.url_prefix}/{path}"
        return path if path.startswith("/") else f"/uploads/{path}"


oss_storage = OSSStorage()


def get_oss_storage() -> OSSStorage:
    """Module-level singleton accessor."""
    return oss_storage
"""Console (plus optional debug-file) logger shared across the app."""
import logging
import sys
from datetime import datetime
from pathlib import Path

from app.config import settings


def setup_logger() -> logging.Logger:
    """Configure and return the shared 'alert_platform' logger.

    Idempotent: repeated calls (e.g. module re-import under uvicorn reload)
    do not stack duplicate handlers. The original referenced `settings` and
    `Path` without importing them and raised NameError at import time.
    """
    logger = logging.getLogger("alert_platform")
    logger.setLevel(logging.DEBUG if settings.app.debug else logging.INFO)

    if logger.handlers:
        # Already configured — adding more handlers would duplicate output.
        return logger

    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if settings.app.debug:
        # Debug builds also keep a per-day file under ./logs.
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        file_handler = logging.FileHandler(
            log_dir / f"alert_platform_{datetime.now().strftime('%Y%m%d')}.log",
            encoding="utf-8",
        )
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger


logger = setup_logger()
b/requirements.txt @@ -0,0 +1,10 @@ +fastapi==0.109.0 +uvicorn[standard]==0.27.0 +sqlalchemy==2.0.25 +pydantic==2.5.3 +pydantic-settings==2.1.0 +oss2==2.18.0 +python-multipart==0.0.6 +aiofiles==23.2.1 +python-dateutil==2.8.2 +httpx==0.26.0