462 lines
15 KiB
Python
462 lines
15 KiB
Python
|
|
"""
|
|||
|
|
云端同步服务
|
|||
|
|
|
|||
|
|
实现"云端为主、本地为辅"的双层数据存储架构:
|
|||
|
|
- 配置双向同步
|
|||
|
|
- 报警单向上报
|
|||
|
|
- 设备状态上报
|
|||
|
|
- 断网容错机制
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
import time
|
|||
|
|
import threading
|
|||
|
|
import logging
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import Optional, List, Dict, Any
|
|||
|
|
from queue import Queue, Empty
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
from enum import Enum
|
|||
|
|
|
|||
|
|
import requests
|
|||
|
|
from sqlalchemy.orm import Session
|
|||
|
|
|
|||
|
|
# Put the project root (the parent of this file's directory) at the front of
# the import path; presumably this is what lets the top-level `config`
# package below resolve -- confirm against the repository layout.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)

from config import get_config

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SyncStatus(Enum):
    """Lifecycle state of a sync task."""

    # Default state of a freshly created task.
    PENDING = "pending"
    # The task is currently being pushed to the cloud.
    SYNCING = "syncing"
    # The cloud accepted the change.
    SUCCESS = "success"
    # The task failed and will not be attempted again.
    FAILED = "failed"
    # A request error occurred and another attempt is scheduled.
    RETRY = "retry"
|
|||
|
|
|
|||
|
|
|
|||
|
|
class EntityType(Enum):
    """Kind of entity a sync task refers to."""

    # Camera configuration record.
    CAMERA = "camera"
    # Region-of-interest configuration record.
    ROI = "roi"
    # Alarm record.
    ALARM = "alarm"
    # Device status report.
    STATUS = "status"
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
class SyncTask:
    """One queued unit of configuration-sync work.

    ``created_at`` may be omitted (or passed as ``None``); it is then filled
    in with the current naive UTC time when the instance is created.
    """

    entity_type: EntityType                 # which kind of entity is affected
    entity_id: int                          # identifier used in the API path
    operation: str                          # create, update, delete
    data: Optional[Dict[str, Any]] = None   # JSON payload sent for updates
    status: SyncStatus = SyncStatus.PENDING
    retry_count: int = 0                    # failed attempts so far
    error_message: Optional[str] = None     # description of the last failure
    # Annotated Optional because ``None`` is the declared default; the real
    # timestamp is assigned in __post_init__.
    created_at: Optional[datetime] = None

    def __post_init__(self):
        """Stamp the task with the creation time when none was supplied."""
        if self.created_at is None:
            self.created_at = datetime.utcnow()
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CloudAPIClient:
    """Thin HTTP client for the cloud REST API.

    Wraps a ``requests.Session`` that sends the bearer token and the
    ``X-Device-ID`` header with every request. Every call raises
    ``requests.exceptions.HTTPError`` on a 4xx/5xx response.
    """

    # Seconds to wait on the server when the caller passes no ``timeout``.
    # requests waits forever by default, which would let one stalled
    # connection block a worker thread indefinitely.
    DEFAULT_TIMEOUT = 10.0
    timeout = DEFAULT_TIMEOUT

    def __init__(self, base_url: str, api_key: str, device_id: str,
                 timeout: float = DEFAULT_TIMEOUT):
        """Create the client.

        Args:
            base_url: API root; a trailing ``/`` is stripped.
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            device_id: Value of the ``X-Device-ID`` header.
            timeout: Default per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.device_id = device_id
        self.timeout = timeout
        self.session = requests.Session()
        # Deliberately no session-wide Content-Type: requests sets
        # ``application/json`` itself for ``json=`` bodies, and a preset
        # header would stop it from emitting the multipart boundary header
        # that ``files=`` uploads need.
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'X-Device-ID': device_id,
        })

    def request(self, method: str, path: str, **kwargs) -> "requests.Response":
        """Send a request to ``base_url + path`` and return the response.

        Extra keyword arguments go straight to ``Session.request``; the
        client's default timeout is applied unless ``timeout`` is given.

        Raises:
            requests.exceptions.RequestException: on connection problems,
                timeouts, or a non-2xx status.
        """
        kwargs.setdefault('timeout', self.timeout)
        response = self.session.request(method, f"{self.base_url}{path}", **kwargs)
        response.raise_for_status()
        return response

    def get(self, path: str, **kwargs):
        """Issue a GET request; see :meth:`request`."""
        return self.request('GET', path, **kwargs)

    def post(self, path: str, **kwargs):
        """Issue a POST request; see :meth:`request`."""
        return self.request('POST', path, **kwargs)

    def put(self, path: str, **kwargs):
        """Issue a PUT request; see :meth:`request`."""
        return self.request('PUT', path, **kwargs)

    def delete(self, path: str, **kwargs):
        """Issue a DELETE request; see :meth:`request`."""
        return self.request('DELETE', path, **kwargs)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SyncService:
    """Cloud synchronisation service.

    ``start()`` launches three daemon threads:

    * a sync worker that drains ``sync_queue`` (camera / ROI changes) and
      probes the cloud ``/health`` endpoint whenever the queue is idle;
    * an alarm worker that drains ``alarm_queue`` and uploads alarm records;
    * a status worker that reports host metrics every
      ``status_report_interval`` seconds while the network is "connected".

    When cloud sync is disabled in the configuration no API client exists and
    queued work is skipped instead of being sent.
    """

    def __init__(self):
        """Read the cloud settings from ``get_config()`` and build the queues."""
        config = get_config()
        self.config = config

        # Cloud connection settings
        self.cloud_enabled = config.cloud.enabled
        self.cloud_url = config.cloud.api_url
        self.api_key = config.cloud.api_key
        self.device_id = config.cloud.device_id

        # Timing / retry settings
        self.sync_interval = config.cloud.sync_interval
        self.alarm_retry_interval = config.cloud.alarm_retry_interval
        self.status_report_interval = config.cloud.status_report_interval
        self.max_retries = config.cloud.max_retries

        # API client; stays None in local-only mode
        self.client: Optional[CloudAPIClient] = None
        if self.cloud_enabled:
            self.client = CloudAPIClient(
                self.cloud_url,
                self.api_key,
                self.device_id,
            )

        # Work queues
        self.sync_queue: Queue = Queue()
        self.alarm_queue: Queue = Queue()

        # Runtime state
        self.running = False
        self.threads: List[threading.Thread] = []
        self.network_status = "disconnected"
        # Set by stop() so that timed waits in the workers end immediately.
        self._stop_event = threading.Event()

        # Back-off schedule for failed sync tasks: 1 min, 5 min, 15 min, 1 h
        self.retry_delays = [60, 300, 900, 3600]

        # Items waiting out a retry delay, as
        # (due time on the time.monotonic() clock, target queue, item).
        # Kept here instead of sleeping in a worker so a failing item never
        # blocks the items queued behind it.
        self._delayed: List[tuple] = []
        self._delayed_lock = threading.Lock()

    def start(self):
        """Start the worker threads; a second call while running is a no-op."""
        if self.running:
            logger.warning("同步服务已在运行")
            return

        self.running = True
        self._stop_event.clear()

        if self.cloud_enabled:
            logger.info(f"启动云端同步服务,设备ID: {self.device_id}")
        else:
            logger.info("云端同步已禁用,使用本地模式")

        # Build a fresh list on every start: a Thread object can only be
        # started once, so re-using the threads of an earlier run would raise
        # RuntimeError.
        self.threads = [
            threading.Thread(target=worker, daemon=True)
            for worker in (self._sync_worker, self._alarm_worker, self._status_worker)
        ]
        for thread in self.threads:
            thread.start()

        logger.info("同步服务已启动")

    def stop(self):
        """Ask the workers to exit and wait up to 5 seconds for each of them."""
        self.running = False
        self._stop_event.set()

        for thread in self.threads:
            if thread.is_alive():
                thread.join(timeout=5)

        logger.info("同步服务已停止")

    def _requeue_later(self, target: Queue, item: Any, delay: float):
        """Schedule ``item`` to be put back on ``target`` after ``delay`` seconds."""
        with self._delayed_lock:
            self._delayed.append((time.monotonic() + delay, target, item))

    def _release_due_retries(self):
        """Move every delayed item whose delay has elapsed back to its queue."""
        now = time.monotonic()
        with self._delayed_lock:
            due = [entry for entry in self._delayed if entry[0] <= now]
            if not due:
                return
            self._delayed = [entry for entry in self._delayed if entry[0] > now]
        for _, target, item in due:
            target.put(item)

    def _delayed_count(self, target: Queue) -> int:
        """Return how many delayed items are destined for ``target``."""
        with self._delayed_lock:
            return sum(1 for _, queue_, _ in self._delayed if queue_ is target)

    def _sync_worker(self):
        """Worker loop: run queued sync tasks, probe the network when idle."""
        while self.running:
            try:
                self._release_due_retries()
                task = self.sync_queue.get(timeout=1)
                self._execute_sync(task)
            except Empty:
                # NOTE(review): this probes /health once per idle second;
                # consider throttling if that is too chatty for the backend.
                self._check_network_status()
            except Exception as e:
                logger.error(f"同步工作线程异常: {e}")

    def _alarm_worker(self):
        """Worker loop: upload queued alarms."""
        while self.running:
            try:
                self._release_due_retries()
                alarm_id = self.alarm_queue.get(timeout=1)
                self._upload_alarm(alarm_id)
            except Empty:
                continue
            except Exception as e:
                logger.error(f"报警上报工作线程异常: {e}")

    def _status_worker(self):
        """Worker loop: report device status at a fixed interval."""
        while self.running:
            try:
                if self.network_status == "connected":
                    self._report_status()
            except Exception as e:
                logger.error(f"状态上报失败: {e}")
            # Event.wait() returns True as soon as stop() sets the event, so
            # shutdown does not have to wait out the whole interval.
            if self._stop_event.wait(self.status_report_interval):
                break

    def _check_network_status(self):
        """Update ``network_status`` by probing the cloud ``/health`` endpoint."""
        if not self.cloud_enabled:
            self.network_status = "disabled"
            return

        try:
            self.client.get('/health')
        except Exception:
            # Any failure counts as "not reachable". Exception (not a bare
            # except) keeps SystemExit / KeyboardInterrupt propagating.
            self.network_status = "disconnected"
        else:
            self.network_status = "connected"

    def _execute_sync(self, task: SyncTask):
        """Push one sync task to the cloud and record the outcome on the task.

        Request errors are handed to ``_handle_sync_error`` (retry with
        back-off); any other error marks the task FAILED. Nothing is raised.
        """
        if self.client is None:
            # Local-only mode: there is no cloud to push to.
            logger.debug(f"云端同步已禁用,跳过同步任务: {task.entity_type.value}/{task.entity_id}")
            return

        logger.info(f"执行同步任务: {task.entity_type.value}/{task.entity_id} ({task.operation})")

        task.status = SyncStatus.SYNCING

        try:
            if task.entity_type == EntityType.CAMERA:
                self._sync_camera(task)
            elif task.entity_type == EntityType.ROI:
                self._sync_roi(task)
            else:
                # Report a failure rather than a success for work that was
                # never sent anywhere.
                raise ValueError(f"不支持的实体类型: {task.entity_type.value}")

            task.status = SyncStatus.SUCCESS
            logger.info(f"同步成功: {task.entity_type.value}/{task.entity_id}")

        except requests.exceptions.RequestException as e:
            self._handle_sync_error(task, str(e))
        except Exception as e:
            task.status = SyncStatus.FAILED
            task.error_message = str(e)
            logger.error(f"同步失败: {task.entity_type.value}/{task.entity_id}: {e}")

    def _handle_sync_error(self, task: SyncTask, error: str):
        """Schedule a retry for a failed task, or give up after ``max_retries``."""
        task.retry_count += 1

        if task.retry_count >= self.max_retries:
            task.status = SyncStatus.FAILED
            task.error_message = f"已超过最大重试次数: {error}"
            logger.error(f"同步失败,已达最大重试次数: {task.entity_type.value}/{task.entity_id}")
            return

        task.status = SyncStatus.RETRY
        # Clamp the index: max_retries comes from configuration and may exceed
        # the length of the back-off table; later retries reuse the last delay.
        delay = self.retry_delays[min(task.retry_count, len(self.retry_delays)) - 1]
        task.error_message = f"第{task.retry_count}次失败: {error}"
        logger.warning(f"同步重试 ({task.retry_count}/{self.max_retries}): {task.entity_type.value}/{task.entity_id}")
        # Re-queue after the delay without blocking the worker thread.
        self._requeue_later(self.sync_queue, task, delay)

    def _push_entity(self, resource: str, task: SyncTask):
        """Send ``task`` to ``/api/v1/<resource>/<entity_id>``.

        Raises:
            ValueError: if the operation is neither ``update`` nor ``delete``.
        """
        path = f"/api/v1/{resource}/{task.entity_id}"
        if task.operation == 'update':
            self.client.put(path, json=task.data)
        elif task.operation == 'delete':
            self.client.delete(path)
        else:
            raise ValueError(f"不支持的同步操作: {task.operation}")

    def _sync_camera(self, task: SyncTask):
        """Push a camera update or deletion to the cloud."""
        self._push_entity("cameras", task)

    def _sync_roi(self, task: SyncTask):
        """Push an ROI update or deletion to the cloud."""
        self._push_entity("rois", task)

    def _upload_alarm(self, alarm_id: int):
        """Upload one alarm record (and its snapshot, if present) to the cloud.

        On a request error the alarm is marked ``retry`` locally and tried
        again after ``alarm_retry_interval`` seconds; on any other error it is
        marked ``failed``.
        """
        if self.client is None:
            # Local-only mode: leave the local record untouched.
            logger.debug(f"云端同步已禁用,跳过报警上报: {alarm_id}")
            return

        from db.crud import get_alarm_by_id, update_alarm_status
        from db.models import get_session_factory

        SessionLocal = get_session_factory()
        db = SessionLocal()

        try:
            alarm = get_alarm_by_id(db, alarm_id)
            if not alarm:
                logger.warning(f"报警记录不存在: {alarm_id}")
                return

            # Build the report payload
            alarm_data = {
                'device_id': self.device_id,
                'camera_id': alarm.camera_id,
                'alarm_type': alarm.event_type,
                'confidence': alarm.confidence,
                'timestamp': alarm.created_at.isoformat() if alarm.created_at else None,
                'region': alarm.region_data,
            }

            # Upload the snapshot first so the report can reference its URL
            if alarm.snapshot_path and os.path.exists(alarm.snapshot_path):
                with open(alarm.snapshot_path, 'rb') as f:
                    response = self.client.post('/api/v1/alarms/images', files={'file': f})
                alarm_data['image_url'] = response.json().get('data', {}).get('url')

            # Report the alarm itself
            response = self.client.post('/api/v1/alarms/report', json=alarm_data)
            cloud_id = response.json().get('data', {}).get('alarm_id')

            # Record the result locally
            update_alarm_status(db, alarm_id, status='uploaded', cloud_id=cloud_id)
            logger.info(f"报警上报成功: {alarm_id} -> 云端ID: {cloud_id}")

        except requests.exceptions.RequestException as e:
            update_alarm_status(db, alarm_id, status='retry', error_message=str(e))
            logger.warning(f"报警上报失败,{self.alarm_retry_interval} 秒后重试: {alarm_id}: {e}")
            # Retry after the configured interval. Putting the id straight
            # back on the queue would retry in a tight loop for as long as
            # the network is down.
            self._requeue_later(self.alarm_queue, alarm_id, self.alarm_retry_interval)
        except Exception as e:
            update_alarm_status(db, alarm_id, status='failed', error_message=str(e))
            logger.error(f"报警处理失败: {alarm_id}: {e}")
        finally:
            db.close()

    def _report_status(self):
        """Post CPU, memory and disk usage of this host to the cloud."""
        import psutil

        try:
            metrics = {
                'device_id': self.device_id,
                'cpu_percent': psutil.cpu_percent(),
                'memory_percent': psutil.virtual_memory().percent,
                'disk_usage': psutil.disk_usage('/').percent,
                'timestamp': datetime.utcnow().isoformat(),
            }

            self.client.post('/api/v1/devices/status', json=metrics)
            logger.debug(f"设备状态上报成功: CPU={metrics['cpu_percent']}%")
        except requests.exceptions.RequestException as e:
            logger.warning(f"设备状态上报失败: {e}")

    # Public interface

    def queue_camera_sync(self, camera_id: int, operation: str = 'update',
                          data: Optional[Dict[str, Any]] = None):
        """Queue a camera change (``update`` or ``delete``) for upload."""
        self.sync_queue.put(SyncTask(
            entity_type=EntityType.CAMERA,
            entity_id=camera_id,
            operation=operation,
            data=data,
        ))

    def queue_roi_sync(self, roi_id: int, operation: str = 'update',
                       data: Optional[Dict[str, Any]] = None):
        """Queue an ROI change (``update`` or ``delete``) for upload."""
        self.sync_queue.put(SyncTask(
            entity_type=EntityType.ROI,
            entity_id=roi_id,
            operation=operation,
            data=data,
        ))

    def queue_alarm_upload(self, alarm_id: int):
        """Queue an alarm record for upload."""
        self.alarm_queue.put(alarm_id)

    def sync_config_from_cloud(self, db: Session) -> Dict[str, int]:
        """Pull this device's configuration from the cloud into the local DB.

        Returns:
            Counters ``{'cameras': n, 'rois': m}``. Only cameras are processed
            here, so ``rois`` is always 0. Errors are logged, not raised.
        """
        result = {'cameras': 0, 'rois': 0}

        if not self.cloud_enabled:
            logger.info("云端同步已禁用,跳过配置拉取")
            return result

        try:
            logger.info("从云端拉取配置...")

            # Fetch the device configuration
            response = self.client.get(f"/api/v1/devices/{self.device_id}/config")
            config = response.json().get('data') or {}

            # Merge cameras; records without a usable id are not counted
            for cloud_cam in config.get('cameras') or []:
                if self._merge_camera(db, cloud_cam):
                    result['cameras'] += 1

            logger.info(f"从云端拉取配置完成: {result['cameras']} 个摄像头")

        except requests.exceptions.RequestException as e:
            logger.error(f"从云端拉取配置失败: {e}")
        except Exception as e:
            logger.error(f"处理云端配置时出错: {e}")
            # Leave the caller's session usable after a failed write.
            db.rollback()

        return result

    def _merge_camera(self, db: Session, cloud_data: Dict[str, Any]) -> bool:
        """Create or update the local camera matching one cloud record.

        Returns:
            False if the record carries no ``id`` and was skipped, else True.
        """
        from db.crud import get_camera_by_cloud_id, create_camera, update_camera

        cloud_id = cloud_data.get('id')
        if cloud_id is None:
            # Without an id the lookup below cannot identify a camera.
            logger.warning("云端摄像头配置缺少 id,已跳过")
            return False

        fields = {
            'name': cloud_data.get('name'),
            'rtsp_url': cloud_data.get('rtsp_url'),
            'enabled': cloud_data.get('enabled', True),
            'fps_limit': cloud_data.get('fps_limit', 30),
            'process_every_n_frames': cloud_data.get('process_every_n_frames', 3),
        }

        existing = get_camera_by_cloud_id(db, cloud_id)
        if existing:
            # A camera flagged pending_sync is left as it is.
            if not existing.pending_sync:
                update_camera(db, existing.id, fields)
            return True

        # New camera: create it, then link it to its cloud id
        camera = create_camera(db, fields)
        camera.cloud_id = cloud_id
        db.commit()
        return True

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the service state.

        The pending counters include items waiting out a retry delay.
        """
        return {
            'running': self.running,
            'cloud_enabled': self.cloud_enabled,
            'network_status': self.network_status,
            'device_id': self.device_id,
            'pending_sync': self.sync_queue.qsize() + self._delayed_count(self.sync_queue),
            'pending_alarms': self.alarm_queue.qsize() + self._delayed_count(self.alarm_queue),
        }
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Module-level singleton instance; stays None until get_sync_service()
# creates it, and is reset to None by stop_sync_service().
_sync_service: Optional[SyncService] = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_sync_service() -> SyncService:
    """Return the shared SyncService, creating it on first use."""
    global _sync_service
    service = _sync_service
    if service is None:
        service = _sync_service = SyncService()
    return service
|
|||
|
|
|
|||
|
|
|
|||
|
|
def start_sync_service():
    """Start the shared sync service, creating it first if necessary."""
    get_sync_service().start()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def stop_sync_service():
    """Stop the shared sync service, if one exists, and discard the instance."""
    global _sync_service
    if not _sync_service:
        return
    _sync_service.stop()
    _sync_service = None
|