""" 数据迁移脚本:从旧 alerts 表迁移到新三表结构 将 alerts 表数据迁移到: - alarm_event (主表) - alarm_event_ext (扩展表) - alarm_llm_analysis (大模型分析表) 运行方式: python -m app.migrations.migrate_to_alarm_event """ import json import sys import os # 确保项目根目录在 sys.path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from datetime import datetime, timezone from app.models import ( Alert, AlertStatus, AlertLevel, AlarmEvent, AlarmEventExt, AlarmLlmAnalysis, init_db, get_session ) from app.utils.logger import logger # 旧 level enum → 新 alarm_level 整数 LEVEL_MAP = { "low": 1, "medium": 2, "high": 3, "critical": 4, } # 旧 status enum → 新 alarm_status STATUS_MAP = { "pending": "NEW", "confirmed": "CONFIRMED", "ignored": "FALSE", "resolved": "CLOSED", "dispatched": "CLOSED", } # 旧 status → 新 handle_status HANDLE_STATUS_MAP = { "pending": "UNHANDLED", "confirmed": "DONE", "ignored": "DONE", "resolved": "DONE", "dispatched": "DONE", } def migrate(): """执行迁移""" init_db() db = get_session() try: # 检查是否已有数据 existing = db.query(AlarmEvent).count() if existing > 0: print(f"alarm_event 表已有 {existing} 条数据,跳过迁移。") print("如需重新迁移,请先清空 alarm_event / alarm_event_ext / alarm_llm_analysis 表。") return total = db.query(Alert).count() if total == 0: print("旧 alerts 表为空,无需迁移。") return print(f"开始迁移 {total} 条告警记录...") migrated = 0 ext_count = 0 llm_count = 0 errors = 0 # 分批处理 batch_size = 100 offset = 0 while offset < total: alerts = ( db.query(Alert) .order_by(Alert.id) .offset(offset) .limit(batch_size) .all() ) for alert in alerts: try: _migrate_one(db, alert) migrated += 1 # 统计扩展记录 if any([alert.bind_id, alert.bbox, alert.message]): ext_count += 1 if alert.ai_analysis: llm_count += 1 except Exception as e: errors += 1 logger.error(f"迁移告警 {alert.alert_no} 失败: {e}") db.commit() offset += batch_size print(f" 已迁移 {min(offset, total)}/{total}...") print(f"\n迁移完成:") print(f" alarm_event: {migrated} 条") print(f" alarm_event_ext: {ext_count} 条") print(f" alarm_llm_analysis: {llm_count} 条") print(f" 失败: {errors} 条") finally: db.close() def _migrate_one(db, alert: Alert): """迁移单条告警""" # 1. alarm_event old_status = alert.status.value if alert.status else "pending" old_level = alert.level.value if alert.level else "medium" # confidence: 旧表是 0-100 整数,新表是 0-1 float confidence_score = None if alert.confidence is not None: confidence_score = float(alert.confidence) / 100.0 # duration_minutes → duration_ms duration_ms = None if alert.duration_minutes is not None: duration_ms = int(alert.duration_minutes) * 60 * 1000 alarm = AlarmEvent( alarm_id=alert.alert_no, alarm_type=alert.alert_type, algorithm_code=alert.algorithm, device_id=alert.camera_id, scene_id=alert.roi_id, event_time=alert.trigger_time, duration_ms=duration_ms, alarm_level=LEVEL_MAP.get(old_level, 2), confidence_score=confidence_score, alarm_status=STATUS_MAP.get(old_status, "NEW"), handle_status=HANDLE_STATUS_MAP.get(old_status, "UNHANDLED"), snapshot_url=alert.snapshot_url, edge_node_id=alert.device_id, handler=alert.handled_by, handle_remark=alert.handle_remark, handled_at=alert.handled_at, created_at=alert.created_at, updated_at=alert.updated_at, ) db.add(alarm) # 2. alarm_event_ext ext_data = {} if alert.bind_id: ext_data["bind_id"] = alert.bind_id if alert.bbox: ext_data["bbox"] = alert.bbox if alert.message: ext_data["message"] = alert.message if ext_data: ext = AlarmEventExt( alarm_id=alert.alert_no, ext_type="EDGE", ext_data=ext_data, created_at=alert.created_at, ) db.add(ext) # 3. alarm_llm_analysis if alert.ai_analysis: ai = alert.ai_analysis if isinstance(ai, str): try: ai = json.loads(ai) except (json.JSONDecodeError, TypeError): ai = {"summary": ai} if isinstance(ai, dict): analysis = AlarmLlmAnalysis( alarm_id=alert.alert_no, llm_model=ai.get("model", "unknown"), analysis_type="REVIEW", summary=ai.get("summary") or ai.get("analysis"), is_false_alarm=ai.get("is_false_alarm"), risk_score=ai.get("risk_score"), confidence_score=ai.get("confidence"), suggestion=ai.get("suggestion") or ai.get("recommendation"), created_at=alert.updated_at or alert.created_at, ) db.add(analysis) if __name__ == "__main__": migrate()