功能:area_id 全链路支持 + 截图处理器独立 Redis 连接

- CameraInfo 模型添加 area_id 字段
- SQLite 表增加 area_id 列及迁移
- config_sync 同步 area_id 到本地
- 告警 ext_data 携带 area_id
- 截图处理器使用独立 Redis 连接,避免与配置同步阻塞冲突
- get_all_camera_configs 使用 cursor.description 动态获取列名

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-18 16:05:04 +08:00
parent 0d88ed7fbb
commit 3d91aa1a67
4 changed files with 48 additions and 33 deletions

View File

@@ -49,7 +49,8 @@ class CameraInfo:
enabled: bool = True
location: Optional[str] = None
extra_params: Optional[Dict[str, Any]] = None
area_id: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
@@ -60,8 +61,9 @@ class CameraInfo:
"enabled": self.enabled,
"location": self.location,
"extra_params": self.extra_params,
"area_id": self.area_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'CameraInfo':
"""从字典创建实例"""
@@ -73,6 +75,7 @@ class CameraInfo:
enabled=data.get("enabled", True),
location=data.get("location"),
extra_params=data.get("extra_params"),
area_id=data.get("area_id"),
)

View File

@@ -233,6 +233,7 @@ class SQLiteManager:
location TEXT,
roi_group_id TEXT,
extra_params TEXT,
area_id INTEGER,
updated_at TEXT
)
""")
@@ -250,7 +251,14 @@ class SQLiteManager:
""")
self._conn.commit()
# 迁移:为已有数据库添加 area_id 列
try:
cursor.execute("ALTER TABLE camera_configs ADD COLUMN area_id INTEGER")
self._conn.commit()
except Exception:
pass # 列已存在,忽略
self._init_default_algorithms()
def _init_default_algorithms(self):
@@ -555,8 +563,8 @@ class SQLiteManager:
cursor.execute("""
INSERT OR REPLACE INTO camera_configs (
camera_id, rtsp_url, camera_name, status, enabled,
location, roi_group_id, extra_params, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
location, roi_group_id, extra_params, area_id, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
camera_id, rtsp_url,
kwargs.get('camera_name'),
@@ -565,6 +573,7 @@ class SQLiteManager:
kwargs.get('location'),
kwargs.get('roi_group_id'),
str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None,
kwargs.get('area_id'),
now
))
self._conn.commit()
@@ -580,8 +589,7 @@ class SQLiteManager:
cursor.execute("SELECT * FROM camera_configs WHERE camera_id = ?", (camera_id,))
row = cursor.fetchone()
if row:
columns = ['camera_id', 'rtsp_url', 'camera_name', 'status',
'enabled', 'location', 'roi_group_id', 'extra_params', 'updated_at']
columns = [desc[0] for desc in cursor.description]
return dict(zip(columns, row))
return None
except Exception as e:
@@ -593,8 +601,7 @@ class SQLiteManager:
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM camera_configs ORDER BY camera_id")
columns = ['camera_id', 'rtsp_url', 'camera_name', 'status',
'enabled', 'location', 'roi_group_id', 'extra_params', 'updated_at']
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"获取所有摄像头配置失败: {e}")

View File

@@ -540,6 +540,7 @@ class ConfigSyncManager:
camera_name=cam.get("camera_name", ""),
enabled=cam.get("enabled", True),
location=cam.get("location", ""),
area_id=cam.get("area_id"),
)
count += 1
except Exception as e:

52
main.py
View File

@@ -185,29 +185,27 @@ class EdgeInferenceService:
def _init_screenshot_handler(self):
"""初始化截图处理器"""
try:
# 优先从 config_manager 获取已有的云端 Redis 连接
cloud_redis = getattr(self._config_manager, '_cloud_redis', None)
# LOCAL 模式下 config_manager 不初始化云端 Redis需要独立创建
if cloud_redis is None:
try:
import redis
cfg = self._settings.cloud_redis
cloud_redis = redis.Redis(
host=cfg.host,
port=cfg.port,
db=cfg.db,
password=cfg.password,
decode_responses=cfg.decode_responses,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
)
cloud_redis.ping()
self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}")
except Exception as e:
self._logger.warning(f"截图处理器无法连接云端 Redis: {e}")
cloud_redis = None
# 截图处理器必须使用独立的 Redis 连接(不能与 config_sync 共用,
# 因为两者都做阻塞 XREAD/XREADGROUP共用连接会互相干扰
cloud_redis = None
try:
import redis
cfg = self._settings.cloud_redis
cloud_redis = redis.Redis(
host=cfg.host,
port=cfg.port,
db=cfg.db,
password=cfg.password,
decode_responses=cfg.decode_responses,
socket_connect_timeout=5,
socket_timeout=10,
retry_on_timeout=True,
)
cloud_redis.ping()
self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}")
except Exception as e:
self._logger.warning(f"截图处理器无法连接云端 Redis: {e}")
cloud_redis = None
if cloud_redis and self._stream_manager:
self._screenshot_handler = ScreenshotHandler(
@@ -363,7 +361,12 @@ class EdgeInferenceService:
)
self._logger.info("所有组件初始化完成")
def _get_camera_area_id(self, camera_id: str) -> Optional[int]:
"""获取摄像头的 area_id"""
cam = self._get_camera_config_by_id(camera_id)
return cam.area_id if cam else None
def _get_camera_ids_with_roi(self) -> set:
"""获取有ROI配置的摄像头ID集合
@@ -844,6 +847,7 @@ class EdgeInferenceService:
"edge_node_id": self._settings.mqtt.device_id,
"first_frame_time": first_frame_time,
"vehicle_count": alert.get("vehicle_count"),
"area_id": self._get_camera_area_id(camera_id),
},
)
self._reporter.report_alarm(alarm_info, screenshot=frame.image)