优化:Redis 连接增强 — TCP keepalive 适配 + 截图处理器独立重连
- 新增 _build_keepalive_options() 适配 Linux/Windows TCP keepalive - health_check_interval 30→15秒,更快发现断连 - 截图处理器新增 _reconnect_cloud_redis() 独立重连能力 - 截图监听捕获 ConnectionError 主动重连,不再退避到60秒
This commit is contained in:
@@ -14,6 +14,8 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
@@ -30,6 +32,23 @@ from utils.version_control import get_version_control
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _build_keepalive_options():
|
||||
"""构建 TCP keepalive 选项,适配 Linux/Windows"""
|
||||
opts = {}
|
||||
if platform.system() == "Linux":
|
||||
# TCP_KEEPIDLE: 连接空闲 15s 后开始发送 keepalive 探测
|
||||
# TCP_KEEPINTVL: 每次探测间隔 5s
|
||||
# TCP_KEEPCNT: 连续 3 次探测失败则判定断连
|
||||
opts = {
|
||||
socket.TCP_KEEPIDLE: 15,
|
||||
socket.TCP_KEEPINTVL: 5,
|
||||
socket.TCP_KEEPCNT: 3,
|
||||
}
|
||||
# Windows 不支持 TCP_KEEPIDLE/KEEPINTVL/KEEPCNT,
|
||||
# 但 socket_keepalive=True 仍会启用默认 keepalive
|
||||
return opts
|
||||
|
||||
|
||||
# ==================== Redis Key 常量 ====================
|
||||
|
||||
# 云端 Redis Keys
|
||||
@@ -207,7 +226,8 @@ class ConfigSyncManager:
|
||||
socket_timeout=10,
|
||||
retry_on_timeout=True,
|
||||
socket_keepalive=True,
|
||||
health_check_interval=30,
|
||||
socket_keepalive_options=_build_keepalive_options(),
|
||||
health_check_interval=15,
|
||||
)
|
||||
self._cloud_redis.ping()
|
||||
logger.info(f"云端 Redis 连接成功: {cfg.host}:{cfg.port}/{cfg.db}")
|
||||
|
||||
@@ -16,6 +16,7 @@ from typing import Optional
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import redis
|
||||
import requests
|
||||
|
||||
from config.settings import get_settings, COSConfig
|
||||
@@ -121,6 +122,32 @@ class ScreenshotHandler:
|
||||
else:
|
||||
logger.error("[截图] 创建 consumer group 失败: %s", e)
|
||||
|
||||
# ==================== 重连 ====================
|
||||
|
||||
def _reconnect_cloud_redis(self):
|
||||
"""重建云端 Redis 连接"""
|
||||
try:
|
||||
from core.config_sync import _build_keepalive_options
|
||||
cfg = self._settings.cloud_redis
|
||||
self._cloud_redis = redis.Redis(
|
||||
host=cfg.host,
|
||||
port=cfg.port,
|
||||
db=cfg.db,
|
||||
password=cfg.password,
|
||||
decode_responses=cfg.decode_responses,
|
||||
socket_connect_timeout=5,
|
||||
socket_timeout=10,
|
||||
retry_on_timeout=True,
|
||||
socket_keepalive=True,
|
||||
socket_keepalive_options=_build_keepalive_options(),
|
||||
health_check_interval=15,
|
||||
)
|
||||
self._cloud_redis.ping()
|
||||
logger.info("[截图] 云端 Redis 重连成功")
|
||||
except Exception as e:
|
||||
logger.warning("[截图] 云端 Redis 重连失败: %s", e)
|
||||
self._cloud_redis = None
|
||||
|
||||
# ==================== 主循环 ====================
|
||||
|
||||
def _listen_loop(self):
|
||||
@@ -157,6 +184,14 @@ class ScreenshotHandler:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
except redis.ConnectionError as e:
|
||||
if self._stop_event.is_set():
|
||||
return
|
||||
logger.warning("[截图] 云端 Redis 连接断开: %s, %ds 后重连...", e, backoff)
|
||||
self._reconnect_cloud_redis()
|
||||
self._stop_event.wait(backoff)
|
||||
backoff = min(backoff * 2, max_backoff)
|
||||
|
||||
except Exception as e:
|
||||
if self._stop_event.is_set():
|
||||
return
|
||||
|
||||
4
main.py
4
main.py
@@ -191,6 +191,7 @@ class EdgeInferenceService:
|
||||
try:
|
||||
import redis
|
||||
cfg = self._settings.cloud_redis
|
||||
from core.config_sync import _build_keepalive_options
|
||||
cloud_redis = redis.Redis(
|
||||
host=cfg.host,
|
||||
port=cfg.port,
|
||||
@@ -201,7 +202,8 @@ class EdgeInferenceService:
|
||||
socket_timeout=10,
|
||||
retry_on_timeout=True,
|
||||
socket_keepalive=True,
|
||||
health_check_interval=30,
|
||||
socket_keepalive_options=_build_keepalive_options(),
|
||||
health_check_interval=15,
|
||||
)
|
||||
cloud_redis.ping()
|
||||
self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}")
|
||||
|
||||
Reference in New Issue
Block a user