diff --git a/.env.example b/.env.example index b76d447..5cf4508 100644 --- a/.env.example +++ b/.env.example @@ -1,17 +1,28 @@ # 数据库配置 DATABASE_URL=sqlite:///./data/alert_platform.db -# 阿里云 OSS 配置 -OSS_ACCESS_KEY_ID=your_access_key_id -OSS_ACCESS_KEY_SECRET=your_access_key_secret -OSS_ENDPOINT=oss-cn-hangzhou.aliyuncs.com -OSS_BUCKET_NAME=your_bucket_name -OSS_URL_PREFIX=https://your-bucket-name.oss-cn-hangzhou.aliyuncs.com +# 腾讯云 COS 存储配置 +COS_ENABLED=false +COS_SECRET_ID=your_secret_id +COS_SECRET_KEY=your_secret_key +COS_REGION=ap-beijing +COS_BUCKET=your-bucket-1250000000 +COS_UPLOAD_PREFIX=alerts +COS_PRESIGN_EXPIRE=1800 +COS_STS_EXPIRE=1800 # 应用配置 APP_HOST=0.0.0.0 APP_PORT=8000 DEBUG=true +DEV_MODE=true + +# MQTT 配置 +MQTT_ENABLED=true +MQTT_BROKER_HOST=localhost +MQTT_BROKER_PORT=1883 +MQTT_USERNAME= +MQTT_PASSWORD= # 大模型配置(可选) AI_MODEL_ENDPOINT=http://localhost:8001 diff --git a/app/config.py b/app/config.py index 8108e35..f89813b 100644 --- a/app/config.py +++ b/app/config.py @@ -15,13 +15,16 @@ class DatabaseConfig: @dataclass -class OSSConfig: - """OSS 存储配置""" - access_key_id: str = "" - access_key_secret: str = "" - endpoint: str = "oss-cn-hangzhou.aliyuncs.com" - bucket_name: str = "" - url_prefix: str = "" +class COSConfig: + """腾讯云 COS 存储配置""" + secret_id: str = "" + secret_key: str = "" + region: str = "ap-beijing" + bucket: str = "" # 格式: bucketname-appid + upload_prefix: str = "alerts" # 对象 Key 前缀 + presign_expire: int = 1800 # 预签名URL有效期(秒),默认30分钟 + sts_expire: int = 1800 # STS 临时凭证有效期(秒) + enabled: bool = False # 是否启用 COS(False 时使用本地存储) @dataclass @@ -57,7 +60,7 @@ class MQTTConfig: class Settings(BaseModel): """全局配置""" database: DatabaseConfig = DatabaseConfig() - oss: OSSConfig = OSSConfig() + cos: COSConfig = COSConfig() app: AppConfig = AppConfig() ai_model: AIModelConfig = AIModelConfig() mqtt: MQTTConfig = MQTTConfig() @@ -72,12 +75,15 @@ def load_settings() -> Settings: database=DatabaseConfig( url=os.getenv("DATABASE_URL", "sqlite:///./data/alert_platform.db"), ), - oss=OSSConfig( - access_key_id=os.getenv("OSS_ACCESS_KEY_ID", ""), - access_key_secret=os.getenv("OSS_ACCESS_KEY_SECRET", ""), - endpoint=os.getenv("OSS_ENDPOINT", "oss-cn-hangzhou.aliyuncs.com"), - bucket_name=os.getenv("OSS_BUCKET_NAME", ""), - url_prefix=os.getenv("OSS_URL_PREFIX", ""), + cos=COSConfig( + secret_id=os.getenv("COS_SECRET_ID", ""), + secret_key=os.getenv("COS_SECRET_KEY", ""), + region=os.getenv("COS_REGION", "ap-beijing"), + bucket=os.getenv("COS_BUCKET", ""), + upload_prefix=os.getenv("COS_UPLOAD_PREFIX", "alerts"), + presign_expire=int(os.getenv("COS_PRESIGN_EXPIRE", "1800")), + sts_expire=int(os.getenv("COS_STS_EXPIRE", "1800")), + enabled=os.getenv("COS_ENABLED", "false").lower() == "true", ), app=AppConfig( host=os.getenv("APP_HOST", "0.0.0.0"), diff --git a/app/main.py b/app/main.py index 146c4e0..d08d61c 100644 --- a/app/main.py +++ b/app/main.py @@ -21,12 +21,13 @@ from app.schemas import ( DeviceStatisticsResponse, ) from app.services.alert_service import alert_service, get_alert_service +from app.services.alarm_event_service import alarm_event_service, get_alarm_event_service from app.services.ai_analyzer import trigger_async_analysis from app.services.mqtt_service import get_mqtt_service from app.services.notification_service import get_notification_service from app.services.device_service import get_device_service from app.utils.logger import logger -from app.routers import yudao_alert_router, yudao_auth_router, yudao_aiot_alarm_router, yudao_aiot_edge_router +from app.routers import yudao_alert_router, yudao_auth_router, yudao_aiot_alarm_router, yudao_aiot_edge_router, yudao_aiot_storage_router from app.yudao_compat import yudao_exception_handler import json @@ -38,13 +39,23 @@ device_service = get_device_service() def handle_mqtt_alert(payload: dict): - """处理 MQTT 告警消息""" + """处理 MQTT 告警消息(双写:旧表 + 新表)""" try: + # 1. 写旧表(保持兼容) alert = alert_service.create_alert_from_mqtt(payload) if alert: - # 通过 WebSocket 推送新告警 + logger.info(f"MQTT 告警写入旧表: {alert.alert_no}") + + # 2. 写新表 + alarm = alarm_event_service.create_from_mqtt(payload) + + # 3. WebSocket 通知(优先使用新表数据) + if alarm: + notification_service.notify_sync("new_alert", alarm.to_dict()) + logger.info(f"MQTT 告警已处理并推送: {alarm.alarm_id}") + elif alert: notification_service.notify_sync("new_alert", alert.to_dict()) - logger.info(f"MQTT 告警已处理并推送: {alert.alert_no}") + logger.info(f"MQTT 告警已处理并推送(旧表): {alert.alert_no}") except Exception as e: logger.error(f"处理 MQTT 告警失败: {e}") @@ -100,6 +111,7 @@ app.add_middleware( allow_credentials=True, allow_methods=["*"], allow_headers=["*"], + ) # ==================== 芋道兼容路由 ==================== @@ -111,6 +123,7 @@ app.include_router(yudao_alert_router) # aiot 命名空间下的新路由,与旧路由并存 app.include_router(yudao_aiot_alarm_router) app.include_router(yudao_aiot_edge_router) +app.include_router(yudao_aiot_storage_router) # 注册芋道格式异常处理器 app.add_exception_handler(HTTPException, yudao_exception_handler) diff --git a/app/migrations/__init__.py b/app/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/migrations/migrate_to_alarm_event.py b/app/migrations/migrate_to_alarm_event.py new file mode 100644 index 0000000..57c7b35 --- /dev/null +++ b/app/migrations/migrate_to_alarm_event.py @@ -0,0 +1,202 @@ +""" +数据迁移脚本:从旧 alerts 表迁移到新三表结构 + +将 alerts 表数据迁移到: +- alarm_event (主表) +- alarm_event_ext (扩展表) +- alarm_llm_analysis (大模型分析表) + +运行方式: python -m app.migrations.migrate_to_alarm_event +""" +import json +import sys +import os + +# 确保项目根目录在 sys.path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + +from datetime import datetime, timezone +from app.models import ( + Alert, AlertStatus, AlertLevel, + AlarmEvent, AlarmEventExt, AlarmLlmAnalysis, + init_db, get_session +) +from app.utils.logger import logger + + +# 旧 level enum → 新 alarm_level 整数 +LEVEL_MAP = { + "low": 1, + "medium": 2, + "high": 3, + "critical": 4, +} + +# 旧 status enum → 新 alarm_status +STATUS_MAP = { + "pending": "NEW", + "confirmed": "CONFIRMED", + "ignored": "FALSE", + "resolved": "CLOSED", + "dispatched": "CLOSED", +} + +# 旧 status → 新 handle_status +HANDLE_STATUS_MAP = { + "pending": "UNHANDLED", + "confirmed": "DONE", + "ignored": "DONE", + "resolved": "DONE", + "dispatched": "DONE", +} + + +def migrate(): + """执行迁移""" + init_db() + + db = get_session() + try: + # 检查是否已有数据 + existing = db.query(AlarmEvent).count() + if existing > 0: + print(f"alarm_event 表已有 {existing} 条数据,跳过迁移。") + print("如需重新迁移,请先清空 alarm_event / alarm_event_ext / alarm_llm_analysis 表。") + return + + total = db.query(Alert).count() + if total == 0: + print("旧 alerts 表为空,无需迁移。") + return + + print(f"开始迁移 {total} 条告警记录...") + + migrated = 0 + ext_count = 0 + llm_count = 0 + errors = 0 + + # 分批处理 + batch_size = 100 + offset = 0 + + while offset < total: + alerts = ( + db.query(Alert) + .order_by(Alert.id) + .offset(offset) + .limit(batch_size) + .all() + ) + + for alert in alerts: + try: + _migrate_one(db, alert) + migrated += 1 + + # 统计扩展记录 + if any([alert.bind_id, alert.bbox, alert.message]): + ext_count += 1 + if alert.ai_analysis: + llm_count += 1 + + except Exception as e: + errors += 1 + logger.error(f"迁移告警 {alert.alert_no} 失败: {e}") + + db.commit() + offset += batch_size + print(f" 已迁移 {min(offset, total)}/{total}...") + + print(f"\n迁移完成:") + print(f" alarm_event: {migrated} 条") + print(f" alarm_event_ext: {ext_count} 条") + print(f" alarm_llm_analysis: {llm_count} 条") + print(f" 失败: {errors} 条") + + finally: + db.close() + + +def _migrate_one(db, alert: Alert): + """迁移单条告警""" + # 1. alarm_event + old_status = alert.status.value if alert.status else "pending" + old_level = alert.level.value if alert.level else "medium" + + # confidence: 旧表是 0-100 整数,新表是 0-1 float + confidence_score = None + if alert.confidence is not None: + confidence_score = float(alert.confidence) / 100.0 + + # duration_minutes → duration_ms + duration_ms = None + if alert.duration_minutes is not None: + duration_ms = int(alert.duration_minutes) * 60 * 1000 + + alarm = AlarmEvent( + alarm_id=alert.alert_no, + alarm_type=alert.alert_type, + algorithm_code=alert.algorithm, + device_id=alert.camera_id, + scene_id=alert.roi_id, + event_time=alert.trigger_time, + duration_ms=duration_ms, + alarm_level=LEVEL_MAP.get(old_level, 2), + confidence_score=confidence_score, + alarm_status=STATUS_MAP.get(old_status, "NEW"), + handle_status=HANDLE_STATUS_MAP.get(old_status, "UNHANDLED"), + snapshot_url=alert.snapshot_url, + edge_node_id=alert.device_id, + handler=alert.handled_by, + handle_remark=alert.handle_remark, + handled_at=alert.handled_at, + created_at=alert.created_at, + updated_at=alert.updated_at, + ) + db.add(alarm) + + # 2. alarm_event_ext + ext_data = {} + if alert.bind_id: + ext_data["bind_id"] = alert.bind_id + if alert.bbox: + ext_data["bbox"] = alert.bbox + if alert.message: + ext_data["message"] = alert.message + + if ext_data: + ext = AlarmEventExt( + alarm_id=alert.alert_no, + ext_type="EDGE", + ext_data=ext_data, + created_at=alert.created_at, + ) + db.add(ext) + + # 3. alarm_llm_analysis + if alert.ai_analysis: + ai = alert.ai_analysis + if isinstance(ai, str): + try: + ai = json.loads(ai) + except (json.JSONDecodeError, TypeError): + ai = {"summary": ai} + + if isinstance(ai, dict): + analysis = AlarmLlmAnalysis( + alarm_id=alert.alert_no, + llm_model=ai.get("model", "unknown"), + analysis_type="REVIEW", + summary=ai.get("summary") or ai.get("analysis"), + is_false_alarm=ai.get("is_false_alarm"), + risk_score=ai.get("risk_score"), + confidence_score=ai.get("confidence"), + suggestion=ai.get("suggestion") or ai.get("recommendation"), + created_at=alert.updated_at or alert.created_at, + ) + db.add(analysis) + + +if __name__ == "__main__": + migrate() diff --git a/app/models.py b/app/models.py index 191eea7..90d4f6e 100644 --- a/app/models.py +++ b/app/models.py @@ -6,7 +6,7 @@ import os from datetime import datetime, timezone from typing import Optional from sqlalchemy import ( - Column, String, Integer, BigInteger, DateTime, Text, Enum, JSON, + Column, String, Integer, SmallInteger, BigInteger, Boolean, Float, DateTime, Text, Enum, JSON, ForeignKey, create_engine, Index ) from sqlalchemy.orm import declarative_base, sessionmaker, relationship @@ -254,6 +254,118 @@ class EdgeDevice(Base): } +# ==================== 新告警三表结构 ==================== + +class AlarmEvent(Base): + """告警事件主表""" + __tablename__ = "alarm_event" + + alarm_id = Column(String(64), primary_key=True, comment="分布式告警ID") + alarm_type = Column(String(32), nullable=False, comment="告警类型") + algorithm_code = Column(String(64), comment="算法编码") + device_id = Column(String(64), nullable=False, comment="摄像头/设备ID") + scene_id = Column(String(64), comment="场景/ROI ID") + event_time = Column(DateTime, nullable=False, comment="事件发生时间") + first_frame_time = Column(DateTime, comment="首帧时间") + last_frame_time = Column(DateTime, comment="末帧时间") + duration_ms = Column(Integer, comment="持续时长(毫秒)") + alarm_level = Column(SmallInteger, comment="告警级别: 1提醒 2一般 3严重 4紧急") + confidence_score = Column(Float, comment="置信度 0-1") + alarm_status = Column(String(20), default="NEW", comment="告警状态: NEW/CONFIRMED/FALSE/CLOSED") + handle_status = Column(String(20), default="UNHANDLED", comment="处理状态: UNHANDLED/HANDLING/DONE") + snapshot_url = Column(String(512), comment="截图URL") + video_url = Column(String(512), comment="视频URL") + edge_node_id = Column(String(64), comment="边缘节点ID") + handler = Column(String(64), comment="处理人") + handle_remark = Column(Text, comment="处理备注") + handled_at = Column(DateTime, comment="处理时间") + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), + onupdate=lambda: datetime.now(timezone.utc)) + + __table_args__ = ( + Index('idx_alarm_event_time', 'event_time'), + Index('idx_alarm_device_type', 'device_id', 'alarm_type'), + ) + + def to_dict(self) -> dict: + return { + "alarm_id": self.alarm_id, + "alarm_type": self.alarm_type, + "algorithm_code": self.algorithm_code, + "device_id": self.device_id, + "scene_id": self.scene_id, + "event_time": self.event_time.isoformat() if self.event_time else None, + "first_frame_time": self.first_frame_time.isoformat() if self.first_frame_time else None, + "last_frame_time": self.last_frame_time.isoformat() if self.last_frame_time else None, + "duration_ms": self.duration_ms, + "alarm_level": self.alarm_level, + "confidence_score": self.confidence_score, + "alarm_status": self.alarm_status, + "handle_status": self.handle_status, + "snapshot_url": self.snapshot_url, + "video_url": self.video_url, + "edge_node_id": self.edge_node_id, + "handler": self.handler, + "handle_remark": self.handle_remark, + "handled_at": self.handled_at.isoformat() if self.handled_at else None, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None, + } + + +class AlarmEventExt(Base): + """告警事件扩展表(算法结果详情)""" + __tablename__ = "alarm_event_ext" + + id = Column(Integer, primary_key=True, autoincrement=True) + alarm_id = Column(String(64), nullable=False, index=True, comment="关联告警ID") + ext_type = Column(String(32), comment="扩展类型: EDGE/POST/MANUAL") + ext_data = Column(JSON, comment="扩展数据") + roi_config = Column(JSON, comment="ROI配置快照") + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + + def to_dict(self) -> dict: + return { + "id": self.id, + "alarm_id": self.alarm_id, + "ext_type": self.ext_type, + "ext_data": self.ext_data, + "roi_config": self.roi_config, + "created_at": self.created_at.isoformat() if self.created_at else None, + } + + +class AlarmLlmAnalysis(Base): + """告警大模型分析表""" + __tablename__ = "alarm_llm_analysis" + + id = Column(Integer, primary_key=True, autoincrement=True) + alarm_id = Column(String(64), nullable=False, index=True, comment="关联告警ID") + llm_model = Column(String(32), comment="模型名称") + analysis_type = Column(String(20), comment="分析类型: REVIEW/EXPLAIN/RISK") + summary = Column(Text, comment="分析摘要") + is_false_alarm = Column(Boolean, comment="是否误报") + risk_score = Column(Integer, comment="风险评分 0-100") + confidence_score = Column(Float, comment="分析置信度") + suggestion = Column(Text, comment="处置建议") + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + + def to_dict(self) -> dict: + return { + "id": self.id, + "alarm_id": self.alarm_id, + "llm_model": self.llm_model, + "analysis_type": self.analysis_type, + "summary": self.summary, + "is_false_alarm": self.is_false_alarm, + "risk_score": self.risk_score, + "confidence_score": self.confidence_score, + "suggestion": self.suggestion, + "created_at": self.created_at.isoformat() if self.created_at else None, + } + + # ==================== 数据库管理 ==================== _engine = None diff --git a/app/routers/__init__.py b/app/routers/__init__.py index 2155527..57ffb00 100644 --- a/app/routers/__init__.py +++ b/app/routers/__init__.py @@ -8,10 +8,12 @@ from app.routers.yudao_auth import router as yudao_auth_router from app.routers.yudao_alert import router as yudao_alert_router from app.routers.yudao_aiot_alarm import router as yudao_aiot_alarm_router from app.routers.yudao_aiot_edge import router as yudao_aiot_edge_router +from app.routers.yudao_aiot_storage import router as yudao_aiot_storage_router __all__ = [ "yudao_auth_router", "yudao_alert_router", "yudao_aiot_alarm_router", "yudao_aiot_edge_router", + "yudao_aiot_storage_router", ] diff --git a/app/routers/yudao_aiot_alarm.py b/app/routers/yudao_aiot_alarm.py index 4cf5769..5ac99c1 100644 --- a/app/routers/yudao_aiot_alarm.py +++ b/app/routers/yudao_aiot_alarm.py @@ -1,5 +1,5 @@ """ -AIoT 告警路由 - 芋道规范 +AIoT 告警路由 - 芋道规范(新三表结构) 统一到 /admin-api/aiot/alarm 命名空间,与 aiot 平台架构对齐。 @@ -9,7 +9,7 @@ API 路径规范: - /admin-api/aiot/alarm/alert/handle - 处理告警 - /admin-api/aiot/alarm/alert/delete - 删除告警 - /admin-api/aiot/alarm/alert/statistics - 获取统计 - - /admin-api/aiot/alarm/camera-summary/page - 摄像头汇总 + - /admin-api/aiot/alarm/device-summary/page - 设备告警汇总 """ from fastapi import APIRouter, Query, Depends, HTTPException @@ -17,75 +17,83 @@ from typing import Optional from datetime import datetime from app.yudao_compat import YudaoResponse, get_current_user -from app.services.alert_service import get_alert_service, AlertService -from app.schemas import AlertHandleRequest +from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService +from app.services.oss_storage import get_oss_storage router = APIRouter(prefix="/admin-api/aiot/alarm", tags=["AIoT-告警"]) +def _alarm_to_camel(alarm_dict: dict) -> dict: + """将 alarm_event 字典转换为前端 camelCase 格式""" + # snapshot_url: 如果是 COS object_key,转为预签名 URL + storage = get_oss_storage() + snapshot_url = alarm_dict.get("snapshot_url") + if snapshot_url: + snapshot_url = storage.get_url(snapshot_url) + + return { + "alarmId": alarm_dict.get("alarm_id"), + "alarmType": alarm_dict.get("alarm_type"), + "alarmTypeName": _get_alarm_type_name(alarm_dict.get("alarm_type")), + "algorithmCode": alarm_dict.get("algorithm_code"), + "deviceId": alarm_dict.get("device_id"), + "deviceName": alarm_dict.get("device_id"), + "sceneId": alarm_dict.get("scene_id"), + "eventTime": alarm_dict.get("event_time"), + "firstFrameTime": alarm_dict.get("first_frame_time"), + "lastFrameTime": alarm_dict.get("last_frame_time"), + "durationMs": alarm_dict.get("duration_ms"), + "alarmLevel": alarm_dict.get("alarm_level"), + "confidenceScore": alarm_dict.get("confidence_score"), + "alarmStatus": alarm_dict.get("alarm_status"), + "handleStatus": alarm_dict.get("handle_status"), + "snapshotUrl": snapshot_url, + "videoUrl": alarm_dict.get("video_url"), + "edgeNodeId": alarm_dict.get("edge_node_id"), + "handler": alarm_dict.get("handler"), + "handleRemark": alarm_dict.get("handle_remark"), + "handledAt": alarm_dict.get("handled_at"), + "createdAt": alarm_dict.get("created_at"), + "updatedAt": alarm_dict.get("updated_at"), + # 扩展数据(详情时可能包含) + "ext": alarm_dict.get("ext"), + "llmAnalyses": alarm_dict.get("llm_analyses"), + } + + # ==================== 告警管理 ==================== @router.get("/alert/page") async def get_alert_page( pageNo: int = Query(1, ge=1, description="页码"), pageSize: int = Query(20, ge=1, le=100, description="每页大小"), - cameraId: Optional[str] = Query(None, description="摄像头ID"), - deviceId: Optional[str] = Query(None, description="设备ID"), - alertType: Optional[str] = Query(None, description="告警类型"), - status: Optional[str] = Query(None, description="状态: pending/confirmed/ignored/resolved/dispatched"), - level: Optional[str] = Query(None, description="级别: low/medium/high/critical"), + deviceId: Optional[str] = Query(None, description="摄像头/设备ID"), + edgeNodeId: Optional[str] = Query(None, description="边缘节点ID"), + alarmType: Optional[str] = Query(None, description="告警类型"), + alarmStatus: Optional[str] = Query(None, description="告警状态: NEW/CONFIRMED/FALSE/CLOSED"), + alarmLevel: Optional[int] = Query(None, description="告警级别: 1提醒/2一般/3严重/4紧急"), startTime: Optional[datetime] = Query(None, description="开始时间"), endTime: Optional[datetime] = Query(None, description="结束时间"), - service: AlertService = Depends(get_alert_service), + service: AlarmEventService = Depends(get_alarm_event_service), current_user: dict = Depends(get_current_user) ): """分页查询告警列表""" - alerts, total = service.get_alerts( - camera_id=cameraId, + alarms, total = service.get_alarms( device_id=deviceId, - alert_type=alertType, - status=status, - level=level, + alarm_type=alarmType, + alarm_status=alarmStatus, + alarm_level=alarmLevel, + edge_node_id=edgeNodeId, start_time=startTime, end_time=endTime, page=pageNo, page_size=pageSize, ) - alert_list = [] - for alert in alerts: - alert_dict = alert.to_dict() - alert_list.append({ - "id": alert_dict.get("id"), - "alertNo": alert_dict.get("alert_no"), - "cameraId": alert_dict.get("camera_id"), - "cameraName": alert_dict.get("camera_id"), - "roiId": alert_dict.get("roi_id"), - "bindId": alert_dict.get("bind_id"), - "deviceId": alert_dict.get("device_id"), - "alertType": alert_dict.get("alert_type"), - "alertTypeName": _get_alert_type_name(alert_dict.get("alert_type")), - "algorithm": alert_dict.get("algorithm"), - "confidence": alert_dict.get("confidence"), - "durationMinutes": alert_dict.get("duration_minutes"), - "triggerTime": alert_dict.get("trigger_time"), - "message": alert_dict.get("message"), - "bbox": alert_dict.get("bbox"), - "snapshotUrl": alert_dict.get("snapshot_url"), - "ossUrl": alert_dict.get("oss_url"), - "status": alert_dict.get("status"), - "level": alert_dict.get("level"), - "handleRemark": alert_dict.get("handle_remark"), - "handledBy": alert_dict.get("handled_by"), - "handledAt": alert_dict.get("handled_at"), - "workOrderId": alert_dict.get("work_order_id"), - "aiAnalysis": alert_dict.get("ai_analysis"), - "createdAt": alert_dict.get("created_at"), - "updatedAt": alert_dict.get("updated_at"), - }) + alarm_list = [_alarm_to_camel(a.to_dict()) for a in alarms] return YudaoResponse.page( - list_data=alert_list, + list_data=alarm_list, total=total, page_no=pageNo, page_size=pageSize @@ -94,60 +102,38 @@ async def get_alert_page( @router.get("/alert/get") async def get_alert( - id: int = Query(..., description="告警ID"), - service: AlertService = Depends(get_alert_service), + alarmId: str = Query(..., description="告警ID"), + service: AlarmEventService = Depends(get_alarm_event_service), current_user: dict = Depends(get_current_user) ): """获取告警详情""" - alert = service.get_alert(id) - if not alert: + alarm_dict = service.get_alarm(alarmId) + if not alarm_dict: raise HTTPException(status_code=404, detail="告警不存在") - alert_dict = alert.to_dict() - return YudaoResponse.success({ - "id": alert_dict.get("id"), - "alertNo": alert_dict.get("alert_no"), - "cameraId": alert_dict.get("camera_id"), - "cameraName": alert_dict.get("camera_id"), - "roiId": alert_dict.get("roi_id"), - "bindId": alert_dict.get("bind_id"), - "deviceId": alert_dict.get("device_id"), - "alertType": alert_dict.get("alert_type"), - "alertTypeName": _get_alert_type_name(alert_dict.get("alert_type")), - "algorithm": alert_dict.get("algorithm"), - "confidence": alert_dict.get("confidence"), - "durationMinutes": alert_dict.get("duration_minutes"), - "triggerTime": alert_dict.get("trigger_time"), - "message": alert_dict.get("message"), - "bbox": alert_dict.get("bbox"), - "snapshotUrl": alert_dict.get("snapshot_url"), - "ossUrl": alert_dict.get("oss_url"), - "status": alert_dict.get("status"), - "level": alert_dict.get("level"), - "handleRemark": alert_dict.get("handle_remark"), - "handledBy": alert_dict.get("handled_by"), - "handledAt": alert_dict.get("handled_at"), - "workOrderId": alert_dict.get("work_order_id"), - "aiAnalysis": alert_dict.get("ai_analysis"), - "createdAt": alert_dict.get("created_at"), - "updatedAt": alert_dict.get("updated_at"), - }) + return YudaoResponse.success(_alarm_to_camel(alarm_dict)) @router.put("/alert/handle") async def handle_alert( - id: int = Query(..., description="告警ID"), - status: str = Query(..., description="处理状态: confirmed/ignored/resolved"), + alarmId: str = Query(..., description="告警ID"), + alarmStatus: Optional[str] = Query(None, description="告警状态: CONFIRMED/FALSE/CLOSED"), + handleStatus: Optional[str] = Query(None, description="处理状态: HANDLING/DONE"), remark: Optional[str] = Query(None, description="处理备注"), - service: AlertService = Depends(get_alert_service), + service: AlarmEventService = Depends(get_alarm_event_service), current_user: dict = Depends(get_current_user) ): """处理告警""" - handle_data = AlertHandleRequest(status=status, remark=remark) - handled_by = current_user.get("username", "admin") + handler = current_user.get("username", "admin") - alert = service.handle_alert(id, handle_data, handled_by) - if not alert: + alarm = service.handle_alarm( + alarm_id=alarmId, + alarm_status=alarmStatus, + handle_status=handleStatus, + remark=remark, + handler=handler, + ) + if not alarm: raise HTTPException(status_code=404, detail="告警不存在") return YudaoResponse.success(True) @@ -155,12 +141,12 @@ async def handle_alert( @router.delete("/alert/delete") async def delete_alert( - id: int = Query(..., description="告警ID"), - service: AlertService = Depends(get_alert_service), + alarmId: str = Query(..., description="告警ID"), + service: AlarmEventService = Depends(get_alarm_event_service), current_user: dict = Depends(get_current_user) ): """删除告警""" - success = service.delete_alert(id) + success = service.delete_alarm(alarmId) if not success: raise HTTPException(status_code=404, detail="告警不存在") @@ -169,35 +155,26 @@ async def delete_alert( @router.get("/alert/statistics") async def get_statistics( - service: AlertService = Depends(get_alert_service), + service: AlarmEventService = Depends(get_alarm_event_service), current_user: dict = Depends(get_current_user) ): """获取告警统计""" stats = service.get_statistics() - return YudaoResponse.success({ - "total": stats.get("total", 0), - "pending": stats.get("pending", 0), - "confirmed": stats.get("confirmed", 0), - "ignored": stats.get("ignored", 0), - "resolved": stats.get("resolved", 0), - "dispatched": stats.get("dispatched", 0), - "byType": stats.get("by_type", {}), - "byLevel": stats.get("by_level", {}), - }) + return YudaoResponse.success(stats) -# ==================== 摄像头告警汇总 ==================== +# ==================== 设备告警汇总 ==================== -@router.get("/camera-summary/page") -async def get_camera_summary_page( +@router.get("/device-summary/page") +async def get_device_summary_page( pageNo: int = Query(1, ge=1, description="页码"), pageSize: int = Query(20, ge=1, le=100, description="每页大小"), - service: AlertService = Depends(get_alert_service), + service: AlarmEventService = Depends(get_alarm_event_service), current_user: dict = Depends(get_current_user) ): - """获取摄像头告警汇总(分页)""" - result = service.get_camera_alert_summary(page=pageNo, page_size=pageSize) + """获取设备告警汇总(分页)""" + result = service.get_device_summary(page=pageNo, page_size=pageSize) return YudaoResponse.page( list_data=result.get("list", []), @@ -209,7 +186,7 @@ async def get_camera_summary_page( # ==================== 辅助函数 ==================== -def _get_alert_type_name(alert_type: Optional[str]) -> str: +def _get_alarm_type_name(alarm_type: Optional[str]) -> str: """获取告警类型名称""" type_names = { "leave_post": "离岗检测", @@ -221,4 +198,4 @@ def _get_alert_type_name(alert_type: Optional[str]) -> str: "helmet": "安全帽检测", "unknown": "未知类型", } - return type_names.get(alert_type, alert_type or "未知类型") + return type_names.get(alarm_type, alarm_type or "未知类型") diff --git a/app/routers/yudao_aiot_storage.py b/app/routers/yudao_aiot_storage.py new file mode 100644 index 0000000..09a60d5 --- /dev/null +++ b/app/routers/yudao_aiot_storage.py @@ -0,0 +1,137 @@ +""" +AIoT 文件存储路由 - COS 对象存储接口 + +API 路径规范: + - POST /admin-api/aiot/storage/upload - 后端中转上传 + - GET /admin-api/aiot/storage/presign - 获取预签名下载 URL + - GET /admin-api/aiot/storage/sts - 获取 STS 临时凭证(前端直传) + - GET /admin-api/aiot/storage/upload-url - 获取预签名上传 URL(前端直传) +""" +import os +from fastapi import APIRouter, Query, Depends, HTTPException, UploadFile, File + +from app.yudao_compat import YudaoResponse, get_current_user +from app.services.oss_storage import get_oss_storage, COSStorage, _generate_object_key + +router = APIRouter(prefix="/admin-api/aiot/storage", tags=["AIoT-文件存储"]) + + +@router.post("/upload") +async def upload_file( + file: UploadFile = File(..., description="上传文件"), + prefix: str = Query("alerts", description="存储路径前缀"), + current_user: dict = Depends(get_current_user), +): + """ + 后端中转上传 + + 文件经过后端写入 COS / 本地,返回 object_key。 + 适用于服务端需要对文件做校验或处理的场景。 + """ + storage = get_oss_storage() + + # 文件大小检查(限制 20MB) + content = await file.read() + if len(content) > 20 * 1024 * 1024: + raise HTTPException(status_code=400, detail="文件大小超过 20MB 限制") + + # 文件类型检查 + allowed_types = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".mp4", ".mov", ".pdf"} + _, ext = os.path.splitext(file.filename or "unknown.jpg") + ext = ext.lower() + if ext not in allowed_types: + raise HTTPException(status_code=400, detail=f"不支持的文件类型: {ext}") + + # 确定 content_type + content_type = file.content_type or "application/octet-stream" + + # 生成 object_key + object_key = _generate_object_key(prefix=prefix, ext=ext) + + # 上传 + result_key = storage.upload_file(content, object_key, content_type) + + return YudaoResponse.success({ + "objectKey": result_key, + "filename": file.filename, + "size": len(content), + "contentType": content_type, + }) + + +@router.get("/presign") +async def get_presigned_download_url( + objectKey: str = Query(..., description="对象 Key"), + expire: int = Query(1800, description="有效期(秒)"), + current_user: dict = Depends(get_current_user), +): + """ + 获取预签名下载 URL + + 前端查看告警截图时调用此接口,拿到临时 URL 后直接访问 COS。 + URL 过期后失效,防止泄露。 + """ + storage = get_oss_storage() + url = storage.get_presigned_url(objectKey, expire) + + return YudaoResponse.success({ + "url": url, + "expire": expire, + }) + + +@router.get("/upload-url") +async def get_presigned_upload_url( + prefix: str = Query("alerts", description="存储路径前缀"), + ext: str = Query(".jpg", description="文件扩展名"), + expire: int = Query(1800, description="有效期(秒)"), + current_user: dict = Depends(get_current_user), +): + """ + 获取预签名上传 URL + + 前端拿到此 URL 后,直接 PUT 文件到 COS,无需经过后端中转。 + 适用于大文件或高频上传场景。 + """ + storage = get_oss_storage() + + if not storage.is_cos_mode: + raise HTTPException(status_code=400, detail="COS 未启用,请使用 /upload 接口") + + object_key = _generate_object_key(prefix=prefix, ext=ext) + url = storage.get_presigned_upload_url(object_key, expire) + + if not url: + raise HTTPException(status_code=500, detail="生成上传 URL 失败") + + return YudaoResponse.success({ + "uploadUrl": url, + "objectKey": object_key, + "expire": expire, + }) + + +@router.get("/sts") +async def get_sts_credential( + prefix: str = Query("alerts", description="允许上传的路径前缀"), + current_user: dict = Depends(get_current_user), +): + """ + 获取 STS 临时凭证 + + 前端使用 COS JS SDK 直传时,先调用此接口获取临时密钥。 + 返回的 tmpSecretId / tmpSecretKey / sessionToken 用于初始化 SDK。 + 凭证仅允许向指定前缀上传,不可读取或删除其他路径。 + """ + storage = get_oss_storage() + + if not storage.is_cos_mode: + raise HTTPException(status_code=400, detail="COS 未启用,无法签发 STS 凭证") + + allow_prefix = f"{prefix}/*" + credential = storage.get_sts_credential(allow_prefix) + + if not credential: + raise HTTPException(status_code=500, detail="STS 凭证签发失败,请检查 COS 配置或安装 qcloud-python-sts") + + return YudaoResponse.success(credential) diff --git a/app/services/alarm_event_service.py b/app/services/alarm_event_service.py new file mode 100644 index 0000000..73b1879 --- /dev/null +++ b/app/services/alarm_event_service.py @@ -0,0 +1,475 @@ +""" +告警事件服务(新三表结构) +处理 alarm_event / alarm_event_ext / alarm_llm_analysis 的 CRUD +""" +import uuid +from datetime import datetime, timezone +from typing import Optional, List, Dict, Any, Tuple + +from sqlalchemy import func + +from app.models import AlarmEvent, AlarmEventExt, AlarmLlmAnalysis, get_session +from app.services.oss_storage import get_oss_storage +from app.utils.logger import logger + + +def generate_alarm_id() -> str: + """生成告警ID: ALM + YYYYMMDDHHmmss + 8位uuid""" + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + unique_id = uuid.uuid4().hex[:8].upper() + return f"ALM{timestamp}{unique_id}" + + +def _determine_alarm_level(alarm_type: str, confidence: float, duration_ms: Optional[int] = None) -> int: + """ + 根据告警类型、置信度和持续时长确定告警级别 + 返回: 1提醒 2一般 3严重 4紧急 + """ + if alarm_type == "intrusion": + return 3 # 严重 + elif alarm_type == "leave_post": + if duration_ms and duration_ms > 30 * 60 * 1000: + return 3 # 严重 + elif duration_ms and duration_ms > 10 * 60 * 1000: + return 2 # 一般 + return 1 # 提醒 + elif confidence and confidence > 0.9: + return 3 # 严重 + elif confidence and confidence > 0.7: + return 2 # 一般 + + return 2 # 默认一般 + + +class AlarmEventService: + """告警事件服务""" + + def __init__(self): + self.oss = get_oss_storage() + + def create_from_mqtt(self, mqtt_data: Dict[str, Any]) -> Optional[AlarmEvent]: + """从 MQTT 消息创建告警事件""" + db = get_session() + try: + alarm_id = generate_alarm_id() + + # 解析时间 + timestamp_str = mqtt_data.get("timestamp") + if timestamp_str: + try: + event_time = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) + except ValueError: + event_time = datetime.now(timezone.utc) + else: + event_time = datetime.now(timezone.utc) + + # 置信度保持 float 0-1 + confidence = mqtt_data.get("confidence") + if confidence is not None: + confidence = float(confidence) + # 如果传入的是 0-100 范围的值,转为 0-1 + if confidence > 1: + confidence = confidence / 100.0 + + # duration_minutes → duration_ms + duration_minutes = mqtt_data.get("duration_minutes") + duration_ms = None + if duration_minutes is not None: + duration_ms = int(float(duration_minutes) * 60 * 1000) + + alarm_type = mqtt_data.get("alert_type", "unknown") + alarm_level = _determine_alarm_level(alarm_type, confidence or 0, duration_ms) + + alarm = AlarmEvent( + alarm_id=alarm_id, + alarm_type=alarm_type, + algorithm_code=mqtt_data.get("algorithm", "YOLO"), + device_id=mqtt_data.get("camera_id", "unknown"), + scene_id=mqtt_data.get("roi_id"), + event_time=event_time, + duration_ms=duration_ms, + alarm_level=alarm_level, + confidence_score=confidence, + alarm_status="NEW", + handle_status="UNHANDLED", + edge_node_id=mqtt_data.get("device_id"), + ) + + db.add(alarm) + + # 写入扩展表 + ext_data = {} + for key in ("bind_id", "target_class", "bbox", "message", "alert_id"): + val = mqtt_data.get(key) + if val is not None: + ext_key = "edge_alert_id" if key == "alert_id" else key + ext_data[ext_key] = val + + if ext_data: + ext = AlarmEventExt( + alarm_id=alarm_id, + ext_type="EDGE", + ext_data=ext_data, + ) + db.add(ext) + + db.commit() + db.refresh(alarm) + + logger.info(f"新告警事件创建: {alarm_id}, type={alarm_type}") + return alarm + + except Exception as e: + db.rollback() + logger.error(f"创建告警事件失败: {e}") + return None + finally: + db.close() + + def create_from_http(self, data: Dict[str, Any], snapshot_data: Optional[bytes] = None) -> Optional[AlarmEvent]: + """从 HTTP 请求创建告警事件""" + db = get_session() + try: + alarm_id = generate_alarm_id() + + # 解析时间 + trigger_time = data.get("trigger_time") or data.get("timestamp") + if isinstance(trigger_time, str): + try: + event_time = datetime.fromisoformat(trigger_time.replace("Z", "+00:00")) + except ValueError: + event_time = datetime.now(timezone.utc) + elif isinstance(trigger_time, datetime): + event_time = trigger_time + else: + event_time = datetime.now(timezone.utc) + + confidence = data.get("confidence") + if confidence is not None: + confidence = float(confidence) + if confidence > 1: + confidence = confidence / 100.0 + + duration_minutes = data.get("duration_minutes") + duration_ms = None + if duration_minutes is not None: + duration_ms = int(float(duration_minutes) * 60 * 1000) + + alarm_type = data.get("alert_type", "unknown") + alarm_level = _determine_alarm_level(alarm_type, confidence or 0, duration_ms) + + snapshot_url = None + if snapshot_data: + snapshot_url = self.oss.upload_image(snapshot_data) + + alarm = AlarmEvent( + alarm_id=alarm_id, + alarm_type=alarm_type, + algorithm_code=data.get("algorithm"), + device_id=data.get("camera_id", "unknown"), + scene_id=data.get("roi_id"), + event_time=event_time, + duration_ms=duration_ms, + alarm_level=alarm_level, + confidence_score=confidence, + alarm_status="NEW", + handle_status="UNHANDLED", + snapshot_url=snapshot_url, + edge_node_id=data.get("device_id"), + ) + + db.add(alarm) + + # 写入扩展表 + ext_data = {} + for key in ("bind_id", "target_class", "bbox", "message"): + val = data.get(key) + if val is not None: + ext_data[key] = val + + if ext_data: + ext = AlarmEventExt( + alarm_id=alarm_id, + ext_type="POST", + ext_data=ext_data, + ) + db.add(ext) + + db.commit() + db.refresh(alarm) + + logger.info(f"HTTP告警事件创建: {alarm_id}") + return alarm + + except Exception as e: + db.rollback() + logger.error(f"HTTP创建告警事件失败: {e}") + return None + finally: + db.close() + + def get_alarm(self, alarm_id: str) -> Optional[Dict]: + """获取告警详情(含扩展信息)""" + db = get_session() + try: + alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() + if not alarm: + return None + + result = alarm.to_dict() + + # 关联扩展 + ext = db.query(AlarmEventExt).filter(AlarmEventExt.alarm_id == alarm_id).first() + if ext: + result["ext"] = ext.to_dict() + + # 关联 LLM 分析 + analyses = db.query(AlarmLlmAnalysis).filter( + AlarmLlmAnalysis.alarm_id == alarm_id + ).order_by(AlarmLlmAnalysis.created_at.desc()).all() + if analyses: + result["llm_analyses"] = [a.to_dict() for a in analyses] + + return result + finally: + db.close() + + def get_alarms( + self, + device_id: Optional[str] = None, + alarm_type: Optional[str] = None, + alarm_status: Optional[str] = None, + alarm_level: Optional[int] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + edge_node_id: Optional[str] = None, + page: int = 1, + page_size: int = 20, + ) -> Tuple[List[AlarmEvent], int]: + """分页查询告警列表""" + db = get_session() + try: + query = db.query(AlarmEvent) + + if device_id: + query = query.filter(AlarmEvent.device_id == device_id) + if alarm_type: + query = query.filter(AlarmEvent.alarm_type == alarm_type) + if alarm_status: + query = query.filter(AlarmEvent.alarm_status == alarm_status) + if alarm_level is not None: + query = query.filter(AlarmEvent.alarm_level == alarm_level) + if edge_node_id: + query = query.filter(AlarmEvent.edge_node_id == edge_node_id) + if start_time: + query = query.filter(AlarmEvent.event_time >= start_time) + if end_time: + query = query.filter(AlarmEvent.event_time <= end_time) + + total = query.count() + alarms = ( + query.order_by(AlarmEvent.event_time.desc()) + .offset((page - 1) * page_size) + .limit(page_size) + .all() + ) + + return alarms, total + finally: + db.close() + + def handle_alarm( + self, + alarm_id: str, + alarm_status: Optional[str] = None, + handle_status: Optional[str] = None, + remark: Optional[str] = None, + handler: Optional[str] = None, + ) -> Optional[AlarmEvent]: + """处理告警""" + db = get_session() + try: + alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() + if not alarm: + return None + + if alarm_status: + alarm.alarm_status = alarm_status + if handle_status: + alarm.handle_status = handle_status + if remark is not None: + alarm.handle_remark = remark + if handler: + alarm.handler = handler + alarm.handled_at = datetime.now(timezone.utc) + alarm.updated_at = datetime.now(timezone.utc) + + db.commit() + db.refresh(alarm) + + logger.info(f"告警已处理: {alarm_id}, status={alarm_status}") + return alarm + finally: + db.close() + + def delete_alarm(self, alarm_id: str) -> bool: + """删除告警(含扩展和分析)""" + db = get_session() + try: + alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() + if not alarm: + return False + + # 删除关联数据 + db.query(AlarmEventExt).filter(AlarmEventExt.alarm_id == alarm_id).delete() + db.query(AlarmLlmAnalysis).filter(AlarmLlmAnalysis.alarm_id == alarm_id).delete() + db.delete(alarm) + db.commit() + + logger.info(f"告警已删除: {alarm_id}") + return True + except Exception as e: + db.rollback() + logger.error(f"删除告警失败: {e}") + return False + finally: + db.close() + + def get_statistics(self) -> Dict: + """获取告警统计""" + db = get_session() + try: + total = db.query(AlarmEvent).count() + + # 按 alarm_status 计数 + by_status = {} + for row in db.query( + AlarmEvent.alarm_status, func.count(AlarmEvent.alarm_id) + ).group_by(AlarmEvent.alarm_status).all(): + by_status[row[0]] = row[1] + + # 按 alarm_type 计数 + by_type = {} + for row in db.query( + AlarmEvent.alarm_type, func.count(AlarmEvent.alarm_id) + ).group_by(AlarmEvent.alarm_type).all(): + by_type[row[0]] = row[1] + + # 按 alarm_level 计数 + by_level = {} + for row in db.query( + AlarmEvent.alarm_level, func.count(AlarmEvent.alarm_id) + ).group_by(AlarmEvent.alarm_level).all(): + by_level[row[0]] = row[1] + + return { + "total": total, + "byStatus": by_status, + "byType": by_type, + "byLevel": by_level, + } + finally: + db.close() + + def get_device_summary( + self, + page: int = 1, + page_size: int = 10, + ) -> Dict: + """按设备(摄像头)分组统计告警汇总""" + db = get_session() + try: + query = db.query( + AlarmEvent.device_id, + func.count(AlarmEvent.alarm_id).label("total_count"), + func.max(AlarmEvent.event_time).label("last_event_time"), + ).group_by(AlarmEvent.device_id) + + total = query.count() + + results = ( + query.order_by(func.count(AlarmEvent.alarm_id).desc()) + .offset((page - 1) * page_size) + .limit(page_size) + .all() + ) + + summary_list = [] + for row in results: + # 该设备待处理数量 + unhandled_count = ( + db.query(AlarmEvent) + .filter(AlarmEvent.device_id == row.device_id) + .filter(AlarmEvent.handle_status == "UNHANDLED") + .count() + ) + + # 最新一条告警 + latest = ( + db.query(AlarmEvent) + .filter(AlarmEvent.device_id == row.device_id) + .order_by(AlarmEvent.event_time.desc()) + .first() + ) + + summary_list.append({ + "deviceId": row.device_id, + "deviceName": row.device_id, + "totalCount": row.total_count, + "unhandledCount": unhandled_count, + "lastEventTime": row.last_event_time.isoformat() if row.last_event_time else None, + "lastAlarmType": latest.alarm_type if latest else None, + "lastAlarmTypeName": latest.alarm_type if latest else None, + }) + + return { + "list": summary_list, + "total": total, + } + finally: + db.close() + + def save_llm_analysis( + self, + alarm_id: str, + llm_model: str, + analysis_type: str, + summary: Optional[str] = None, + is_false_alarm: Optional[bool] = None, + risk_score: Optional[int] = None, + confidence_score: Optional[float] = None, + suggestion: Optional[str] = None, + ) -> Optional[AlarmLlmAnalysis]: + """保存大模型分析结果""" + db = get_session() + try: + analysis = AlarmLlmAnalysis( + alarm_id=alarm_id, + llm_model=llm_model, + analysis_type=analysis_type, + summary=summary, + is_false_alarm=is_false_alarm, + risk_score=risk_score, + confidence_score=confidence_score, + suggestion=suggestion, + ) + db.add(analysis) + db.commit() + db.refresh(analysis) + logger.info(f"LLM分析结果已保存: alarm={alarm_id}, model={llm_model}") + return analysis + except Exception as e: + db.rollback() + logger.error(f"保存LLM分析失败: {e}") + return None + finally: + db.close() + + +# 全局单例 +alarm_event_service = AlarmEventService() + + +def get_alarm_event_service() -> AlarmEventService: + """获取告警事件服务单例""" + return alarm_event_service diff --git a/app/services/oss_storage.py b/app/services/oss_storage.py index f6ac98c..c1287fc 100644 --- a/app/services/oss_storage.py +++ b/app/services/oss_storage.py @@ -1,110 +1,346 @@ +""" +对象存储服务(腾讯云 COS + 本地回退) + +功能: +- COS_ENABLED=true 时使用腾讯云 COS 存储 +- COS_ENABLED=false 时回退到本地 uploads/ 目录 +- 后端上传:服务端直接将图片/文件写入 COS +- 预签名下载:生成带鉴权的临时下载 URL +- STS 临时凭证:签发前端直传用的临时 AK/SK/Token +""" import uuid from datetime import datetime, timezone -from typing import Optional +from typing import Optional, Dict from pathlib import Path -# import oss2 # TODO: 阿里云 OSS 待配置完成后启用 from app.config import settings from app.utils.logger import logger -class OSSStorage: +# 按需导入 COS SDK,未安装时不影响本地模式 +_cos_client = None +_cos_available = False + +try: + from qcloud_cos import CosConfig, CosS3Client + _cos_available = True +except ImportError: + _cos_available = False + + +def _get_cos_client(): + """懒加载 COS 客户端单例""" + global _cos_client + if _cos_client is not None: + return _cos_client + + if not _cos_available: + logger.warning("qcloud_cos 未安装,使用本地存储模式") + return None + + cfg = settings.cos + if not cfg.enabled or not cfg.secret_id or not cfg.bucket: + return None + + try: + cos_config = CosConfig( + Region=cfg.region, + SecretId=cfg.secret_id, + SecretKey=cfg.secret_key, + Scheme="https", + ) + _cos_client = CosS3Client(cos_config) + logger.info(f"COS 客户端初始化成功: bucket={cfg.bucket}, region={cfg.region}") + return _cos_client + except Exception as e: + logger.error(f"COS 客户端初始化失败: {e}") + return None + + +def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str: """ - 图片存储类 - TODO: 阿里云 OSS 配置完成后,注释掉 use_local_only=True 即可启用 OSS + 生成对象存储 Key + 格式: {prefix}/{YYYY}/{MM}/{DD}/{YYYYMMDDHHmmss}_{uuid8}{ext} + 示例: alerts/2026/02/09/20260209153000_A1B2C3D4.jpg + """ + now = datetime.now(timezone.utc) + date_path = now.strftime("%Y/%m/%d") + timestamp = now.strftime("%Y%m%d%H%M%S") + unique_id = uuid.uuid4().hex[:8].upper() + prefix = prefix.strip("/") if prefix else settings.cos.upload_prefix + return f"{prefix}/{date_path}/{timestamp}_{unique_id}{ext}" + + +class COSStorage: + """ + 对象存储统一接口 + + - COS 模式:调用腾讯云 COS SDK + - 本地模式:写入 uploads/ 目录,返回相对路径 """ - def __init__(self, use_local_only: bool = True): - self.use_local_only = use_local_only - self.bucket = None + def __init__(self): + self._client = None + self._use_local = True + self._init() - if not self.use_local_only: - self._init_bucket() + def _init(self): + """初始化存储后端""" + if settings.cos.enabled: + client = _get_cos_client() + if client: + self._client = client + self._use_local = False + return - def _init_bucket(self): - """ - 初始化 OSS Bucket(待配置完成后启用) - """ - # TODO: 阿里云 OSS 配置完成后启用以下代码 - # if not settings.oss.access_key_id or not settings.oss.bucket_name: - # logger.warning("OSS配置不完整,将使用本地存储") - # self.use_local_only = True - # return - # - # try: - # auth = oss2.Auth(settings.oss.access_key_id, settings.oss.access_key_secret) - # self.bucket = oss2.Bucket(auth, settings.oss.endpoint, settings.oss.bucket_name) - # logger.info(f"OSS连接成功: {settings.oss.bucket_name}") - # except Exception as e: - # logger.error(f"OSS连接失败: {e}") - # self.bucket = None - logger.info("OSS存储已注释,启用本地存储模式") + logger.info("使用本地文件存储模式") + self._use_local = True + + @property + def is_cos_mode(self) -> bool: + return not self._use_local and self._client is not None + + # ======================== 上传 ======================== def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str: """ - 上传图片(当前使用本地存储,OSS 配置完成后可启用) + 上传图片,返回 object_key(COS 模式)或本地路径(本地模式) + + 数据库中存储此返回值,下载时通过 get_presigned_url() 获取临时访问地址。 """ - if self.use_local_only or not self.bucket: + if self._use_local: + return self._upload_local(image_data, filename) + return self._upload_cos(image_data, filename) + + def _upload_cos(self, image_data: bytes, filename: Optional[str] = None) -> str: + """上传到 COS""" + object_key = filename or _generate_object_key(ext=".jpg") + try: + self._client.put_object( + Bucket=settings.cos.bucket, + Body=image_data, + Key=object_key, + ContentType="image/jpeg", + ) + logger.info(f"COS 上传成功: {object_key}") + return object_key + except Exception as e: + logger.error(f"COS 上传失败,回退本地: {e}") return self._upload_local(image_data, filename) - # TODO: 阿里云 OSS 配置完成后启用以下代码 - # if not self.bucket: - # return self._upload_local(image_data, filename) - # - # if filename is None: - # timestamp = datetime.now().strftime("%Y%m%d%H%M%S") - # unique_id = uuid.uuid4().hex[:8] - # ext = ".jpg" - # filename = f"alerts/{timestamp}_{unique_id}{ext}" - # - # try: - # self.bucket.put_object(filename, image_data) - # url = f"{settings.oss.url_prefix}/{filename}" - # logger.info(f"图片上传OSS成功: {url}") - # return url - # except Exception as e: - # logger.error(f"图片上传OSS失败: {e}") - # return self._upload_local(image_data, filename) + def upload_file(self, file_data: bytes, object_key: str, content_type: str = "application/octet-stream") -> str: + """上传任意文件到 COS""" + if self._use_local: + return self._upload_local(file_data, object_key) + try: + self._client.put_object( + Bucket=settings.cos.bucket, + Body=file_data, + Key=object_key, + ContentType=content_type, + ) + logger.info(f"COS 文件上传成功: {object_key}") + return object_key + except Exception as e: + logger.error(f"COS 文件上传失败: {e}") + return self._upload_local(file_data, object_key) - return self._upload_local(image_data, filename) - - def _upload_local(self, image_data: bytes, filename: Optional[str] = None) -> str: - """ - 本地存储(当前使用) - """ + def _upload_local(self, data: bytes, filename: Optional[str] = None) -> str: + """本地存储回退""" upload_dir = Path("uploads") - upload_dir.mkdir(exist_ok=True) - if filename is None: - timestamp = datetime.now().strftime("%Y%m%d%H%M%S") - unique_id = uuid.uuid4().hex[:8] - filename = f"alerts/{timestamp}_{unique_id}.jpg" + filename = _generate_object_key(ext=".jpg") file_path = upload_dir / filename file_path.parent.mkdir(parents=True, exist_ok=True) with open(file_path, "wb") as f: - f.write(image_data) + f.write(data) local_url = f"/uploads/{filename}" - logger.info(f"图片保存本地: {local_url}") + logger.info(f"本地保存: {local_url}") return local_url + # ======================== 下载(预签名 URL) ======================== + + def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str: + """ + 获取预签名下载 URL + + - COS 模式:生成带签名的临时 URL,过期后失效 + - 本地模式:直接返回本地路径 + """ + if self._use_local or object_key.startswith("/uploads/") or object_key.startswith("http"): + return object_key + + expire = expire or settings.cos.presign_expire + try: + url = self._client.get_presigned_download_url( + Bucket=settings.cos.bucket, + Key=object_key, + Expired=expire, + ) + return url + except Exception as e: + logger.error(f"生成预签名 URL 失败: {e}") + return object_key + + def get_presigned_upload_url(self, object_key: str, expire: Optional[int] = None) -> str: + """ + 获取预签名上传 URL(供前端直传) + + 前端拿到此 URL 后,直接 PUT 文件即可,无需经过后端中转。 + """ + if self._use_local: + return "" + + expire = expire or settings.cos.presign_expire + try: + url = self._client.get_presigned_url( + Method="PUT", + Bucket=settings.cos.bucket, + Key=object_key, + Expired=expire, + ) + return url + except Exception as e: + logger.error(f"生成预签名上传 URL 失败: {e}") + return "" + + # ======================== STS 临时凭证 ======================== + + def get_sts_credential(self, allow_prefix: Optional[str] = None) -> Optional[Dict]: + """ + 获取 STS 临时凭证(供前端 SDK 直传) + + 需安装: pip install qcloud-python-sts + + 返回: + { + "credentials": {"tmpSecretId": ..., "tmpSecretKey": ..., "sessionToken": ...}, + "expiredTime": ..., + "startTime": ..., + "bucket": ..., + "region": ..., + "allowPrefix": ... + } + """ + if self._use_local: + return None + + try: + from sts.sts import Sts + + cfg = settings.cos + prefix = allow_prefix or f"{cfg.upload_prefix}/*" + + # 提取 appid from bucket (格式: name-appid) + parts = cfg.bucket.rsplit("-", 1) + appid = parts[1] if len(parts) == 2 else "" + + sts_config = { + "duration_seconds": cfg.sts_expire, + "secret_id": cfg.secret_id, + "secret_key": cfg.secret_key, + "bucket": cfg.bucket, + "region": cfg.region, + "allow_prefix": prefix, + "allow_actions": [ + "name/cos:PutObject", + "name/cos:PostObject", + "name/cos:InitiateMultipartUpload", + "name/cos:ListMultipartUploads", + "name/cos:ListParts", + "name/cos:UploadPart", + "name/cos:CompleteMultipartUpload", + ], + "policy": { + "version": "2.0", + "statement": [ + { + "action": [ + "name/cos:PutObject", + "name/cos:PostObject", + "name/cos:InitiateMultipartUpload", + "name/cos:ListMultipartUploads", + "name/cos:ListParts", + "name/cos:UploadPart", + "name/cos:CompleteMultipartUpload", + ], + "effect": "allow", + "resource": [ + f"qcs::cos:{cfg.region}:uid/{appid}:{cfg.bucket}/{prefix}", + ], + } + ], + }, + } + + sts = Sts(sts_config) + response = sts.get_credential() + + result = { + "credentials": response["credentials"], + "expiredTime": response["expiredTime"], + "startTime": response.get("startTime"), + "bucket": cfg.bucket, + "region": cfg.region, + "allowPrefix": prefix, + } + + logger.info(f"STS 凭证签发成功, prefix={prefix}") + return result + + except ImportError: + logger.error("qcloud-python-sts 未安装,无法签发 STS 凭证。请运行: pip install qcloud-python-sts") + return None + except Exception as e: + logger.error(f"STS 凭证签发失败: {e}") + return None + + # ======================== 删除 ======================== + + def delete_object(self, object_key: str) -> bool: + """删除对象""" + if self._use_local: + local_path = Path(object_key.lstrip("/")) + if local_path.exists(): + local_path.unlink() + return True + return False + + try: + self._client.delete_object( + Bucket=settings.cos.bucket, + Key=object_key, + ) + logger.info(f"COS 对象已删除: {object_key}") + return True + except Exception as e: + logger.error(f"COS 删除失败: {e}") + return False + + # ======================== 兼容旧接口 ======================== + def get_url(self, path: str) -> str: - """ - 获取图片访问URL - """ + """获取访问 URL(兼容旧代码调用)""" + if not path: + return "" if path.startswith("http"): return path - # TODO: 阿里云 OSS 配置完成后启用以下代码 - # if self.bucket: - # return f"{settings.oss.url_prefix}/{path}" - return f"/uploads/{path}" if not path.startswith("/") else path + if path.startswith("/uploads/"): + return path + # COS object_key → 预签名 URL + return self.get_presigned_url(path) -# 使用本地存储模式(OSS 配置完成后可修改为 OSSStorage(use_local_only=False)) -oss_storage = OSSStorage(use_local_only=True) +# 全局单例 +_cos_storage: Optional[COSStorage] = None -def get_oss_storage() -> OSSStorage: - return oss_storage +def get_oss_storage() -> COSStorage: + """获取存储服务单例(保持旧函数名兼容)""" + global _cos_storage + if _cos_storage is None: + _cos_storage = COSStorage() + return _cos_storage diff --git a/main.py b/main.py deleted file mode 100644 index eb389a0..0000000 --- a/main.py +++ /dev/null @@ -1,16 +0,0 @@ -# 这是一个示例 Python 脚本。 - -# 按 Shift+F10 执行或将其替换为您的代码。 -# 按 双击 Shift 在所有地方搜索类、文件、工具窗口、操作和设置。 - - -def print_hi(name): - # 在下面的代码行中使用断点来调试脚本。 - print(f'Hi, {name}') # 按 Ctrl+F8 切换断点。 - - -# 按装订区域中的绿色按钮以运行脚本。 -if __name__ == '__main__': - print_hi('PyCharm') - -# 访问 https://www.jetbrains.com/help/pycharm/ 获取 PyCharm 帮助