@@ -6,16 +6,29 @@ import com.genersoft.iot.vmp.aiot.bean.AiAlgorithm;
import com.genersoft.iot.vmp.aiot.bean.AiConfigSnapshot ;
import com.genersoft.iot.vmp.aiot.bean.AiRoi ;
import com.genersoft.iot.vmp.aiot.bean.AiRoiAlgoBind ;
import com.genersoft.iot.vmp.aiot.config.AiServiceConfig ;
import com.genersoft.iot.vmp.aiot.dao.AiAlgorithmMapper ;
import com.genersoft.iot.vmp.aiot.dao.AiRoiAlgoBindMapper ;
import com.genersoft.iot.vmp.aiot.dao.AiRoiMapper ;
import com.genersoft.iot.vmp.aiot.service.IAiConfigService ;
import com.genersoft.iot.vmp.aiot.service.IAiConfigSnapshotService ;
import com.genersoft.iot.vmp.aiot.service.IAiRedisConfigService ;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy ;
import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.context.annotation.Lazy ;
import org.springframework.stereotype.Service ;
import org.springframework.web.client.RestTemplate ;
import java.nio.charset.StandardCharsets ;
import java.nio.file.Files ;
import java.nio.file.Path ;
import java.util.concurrent.TimeUnit ;
import java.io.BufferedReader ;
import java.io.InputStreamReader ;
import org.springframework.http.MediaType ;
import org.springframework.http.HttpHeaders ;
import org.springframework.http.HttpEntity ;
import java.util.* ;
@@ -35,6 +48,12 @@ public class AiConfigServiceImpl implements IAiConfigService {
@Autowired
private IAiRedisConfigService redisConfigService ;
@Autowired
private AiServiceConfig aiServiceConfig ;
@Autowired
private StreamProxyMapper streamProxyMapper ;
@Lazy
@Autowired
private IAiConfigSnapshotService snapshotService ;
@@ -119,14 +138,285 @@ public class AiConfigServiceImpl implements IAiConfigService {
log . warn ( " [AiConfig] 摄像头 {} 未关联边缘设备,跳过聚合配置推送 " , cameraId ) ;
}
// 6. 返回推送结果
// 6. 本地调试:同步到 Edge HTTP 接口(保留原 Redis 流程)
boolean httpSyncOk = pushConfigToLocalEdge ( cameraId , config ) ;
// 7. 返回推送结果
Map < String , Object > result = new LinkedHashMap < > ( ) ;
result . put ( " camera_id " , cameraId ) ;
result . put ( " version " , snapshot . getVersion ( ) ) ;
result . put ( " push_status " , " success " ) ;
result . put ( " message " , " 配置已推送到Redis并通知边缘端 " ) ;
result . put ( " http_sync " , httpSyncOk ) ;
log . info ( " [AiConfig] 配置推送完成: cameraId={}, version={} " , cameraId , snapshot . getVersion ( ) ) ;
return result ;
}
@Override
public Map < String , Object > pushAllConfig ( ) {
List < AiRoi > rois = roiMapper . queryAll ( ) ;
List < AiRoiAlgoBind > binds = bindMapper . queryAll ( ) ;
Map < String , Object > payload = buildPayloadFromFlat ( rois , binds ) ;
payload . put ( " sync_mode " , " full " ) ;
boolean httpSyncOk = pushPayloadToLocalEdge ( payload ) ;
Map < String , Object > result = new LinkedHashMap < > ( ) ;
result . put ( " rois " , rois . size ( ) ) ;
result . put ( " binds " , binds . size ( ) ) ;
result . put ( " http_sync " , httpSyncOk ) ;
result . put ( " message " , " 已推送全部ROI和算法绑定到Edge " ) ;
log . info ( " [AiConfig] 全量推送完成 rois={}, binds={}, httpSync={} " , rois . size ( ) , binds . size ( ) , httpSyncOk ) ;
return result ;
}
private Map < String , Object > buildPayloadFromFlat ( List < AiRoi > rois , List < AiRoiAlgoBind > binds ) {
Map < String , Object > payload = new LinkedHashMap < > ( ) ;
List < Map < String , Object > > roiList = new ArrayList < > ( ) ;
List < Map < String , Object > > bindList = new ArrayList < > ( ) ;
Set < String > cameraIds = new LinkedHashSet < > ( ) ;
for ( AiRoi roi : rois ) {
cameraIds . add ( roi . getCameraId ( ) ) ;
Map < String , Object > roiOut = new LinkedHashMap < > ( ) ;
roiOut . put ( " roi_id " , roi . getRoiId ( ) ) ;
roiOut . put ( " camera_id " , roi . getCameraId ( ) ) ;
roiOut . put ( " roi_type " , roi . getRoiType ( ) ) ;
try {
roiOut . put ( " coordinates " , objectMapper . readValue ( roi . getCoordinates ( ) , Object . class ) ) ;
} catch ( Exception e ) {
roiOut . put ( " coordinates " , roi . getCoordinates ( ) ) ;
}
roiOut . put ( " enabled " , roi . getEnabled ( ) = = 1 ) ;
roiOut . put ( " priority " , roi . getPriority ( ) ) ;
roiList . add ( roiOut ) ;
}
for ( AiRoiAlgoBind bind : binds ) {
Map < String , Object > bindOut = new LinkedHashMap < > ( ) ;
bindOut . put ( " bind_id " , bind . getBindId ( ) ) ;
bindOut . put ( " roi_id " , bind . getRoiId ( ) ) ;
bindOut . put ( " algo_code " , bind . getAlgoCode ( ) ) ;
bindOut . put ( " enabled " , bind . getEnabled ( ) = = 1 ) ;
bindOut . put ( " priority " , bind . getPriority ( ) ) ;
try {
bindOut . put ( " params " , objectMapper . readValue ( bind . getParams ( ) , Object . class ) ) ;
} catch ( Exception e ) {
bindOut . put ( " params " , bind . getParams ( ) ) ;
}
AiAlgorithm algo = algorithmMapper . queryByCode ( bind . getAlgoCode ( ) ) ;
if ( algo ! = null ) {
bindOut . put ( " algo_name " , algo . getAlgoName ( ) ) ;
bindOut . put ( " target_class " , algo . getTargetClass ( ) ) ;
}
bindList . add ( bindOut ) ;
}
payload . put ( " camera_ids " , new ArrayList < > ( cameraIds ) ) ;
payload . put ( " rois " , roiList ) ;
payload . put ( " binds " , bindList ) ;
// 构建 cameras 列表(含 rtsp_url) , Edge 需要这些信息启动视频流
// 只保留在 wvp_stream_proxy 中存在且有有效 srcUrl 的摄像头
List < Map < String , Object > > cameraList = new ArrayList < > ( ) ;
Set < String > validCameraIds = new LinkedHashSet < > ( ) ;
for ( String cameraId : cameraIds ) {
String [ ] parts = cameraId . split ( " / " , 2 ) ;
if ( parts . length = = 2 ) {
StreamProxy proxy = streamProxyMapper . selectOneByAppAndStream ( parts [ 0 ] , parts [ 1 ] ) ;
if ( proxy ! = null & & proxy . getSrcUrl ( ) ! = null & & ! proxy . getSrcUrl ( ) . isEmpty ( ) ) {
Map < String , Object > camOut = new LinkedHashMap < > ( ) ;
camOut . put ( " camera_id " , cameraId ) ;
camOut . put ( " enabled " , true ) ;
camOut . put ( " rtsp_url " , proxy . getSrcUrl ( ) ) ;
camOut . put ( " camera_name " , parts [ 0 ] + " / " + parts [ 1 ] ) ;
cameraList . add ( camOut ) ;
validCameraIds . add ( cameraId ) ;
} else {
log . warn ( " [AiConfig] 跳过无效摄像头(无stream_proxy记录): {} " , cameraId ) ;
}
} else {
log . warn ( " [AiConfig] 跳过无效摄像头(camera_id格式错误): {} " , cameraId ) ;
}
}
payload . put ( " cameras " , cameraList ) ;
// 过滤掉不存在的摄像头对应的 ROI 和 Bind
if ( validCameraIds . size ( ) < cameraIds . size ( ) ) {
Set < String > staleCameraIds = new LinkedHashSet < > ( cameraIds ) ;
staleCameraIds . removeAll ( validCameraIds ) ;
log . info ( " [AiConfig] 过滤掉已失效的摄像头: {} " , staleCameraIds ) ;
// 收集有效ROI的roi_id, 用于过滤bindList
Set < Object > validRoiIds = new HashSet < > ( ) ;
roiList . removeIf ( roi - > {
String camId = ( String ) roi . get ( " camera_id " ) ;
boolean valid = validCameraIds . contains ( camId ) ;
if ( valid ) {
validRoiIds . add ( roi . get ( " roi_id " ) ) ;
}
return ! valid ;
} ) ;
bindList . removeIf ( bind - > ! validRoiIds . contains ( bind . get ( " roi_id " ) ) ) ;
payload . put ( " camera_ids " , new ArrayList < > ( validCameraIds ) ) ;
payload . put ( " rois " , roiList ) ;
payload . put ( " binds " , bindList ) ;
}
return payload ;
}
private boolean pushPayloadToLocalEdge ( Map < String , Object > payload ) {
String url = aiServiceConfig . getUrl ( ) + " /debug/sync " ;
String jsonBody = JSON . toJSONString ( payload ) ;
log . info ( " [AiConfig] 推送到Edge: url={}, body_len={} " , url , jsonBody . length ( ) ) ;
if ( jsonBody . length ( ) < = 1024 ) {
log . info ( " [AiConfig] 推送body: {} " , jsonBody ) ;
}
if ( aiServiceConfig . isLocalCallEnabled ( ) ) {
boolean ok = pushPayloadByLocalScript ( url , jsonBody ) ;
log . info ( " [AiConfig] 本地脚本推送结果: ok={} " , ok ) ;
return ok ;
}
RestTemplate restTemplate = new RestTemplate ( ) ;
HttpHeaders headers = new HttpHeaders ( ) ;
headers . setContentType ( MediaType . APPLICATION_JSON ) ;
HttpEntity < String > entity = new HttpEntity < > ( jsonBody , headers ) ;
try {
Map < String , Object > resp = restTemplate . postForObject ( url , entity , Map . class ) ;
boolean ok = resp ! = null & & Boolean . TRUE . equals ( resp . get ( " ok " ) ) ;
log . info ( " [AiConfig] HTTP推送结果: ok={}, url={}, body_len={} " , ok , url , jsonBody . length ( ) ) ;
return ok ;
} catch ( Exception e ) {
log . warn ( " [AiConfig] HTTP推送失败: {}, 尝试本地脚本 " , e . getMessage ( ) ) ;
boolean ok2 = pushPayloadByLocalScript ( url , jsonBody ) ;
log . info ( " [AiConfig] 本地脚本推送结果: ok={} " , ok2 ) ;
return ok2 ;
}
}
private boolean pushPayloadByLocalScript ( String url , String jsonBody ) {
try {
Path tmp = Files . createTempFile ( " edge-sync- " , " .json " ) ;
Files . writeString ( tmp , jsonBody , StandardCharsets . UTF_8 ) ;
String python = aiServiceConfig . getLocalPython ( ) ;
String script = aiServiceConfig . getLocalScriptPath ( ) ;
ProcessBuilder pb = new ProcessBuilder ( python , script , url , tmp . toString ( ) ) ;
pb . redirectErrorStream ( true ) ;
Process p = pb . start ( ) ;
StringBuilder output = new StringBuilder ( ) ;
try ( BufferedReader reader = new BufferedReader ( new InputStreamReader ( p . getInputStream ( ) , StandardCharsets . UTF_8 ) ) ) {
String line ;
while ( ( line = reader . readLine ( ) ) ! = null ) {
output . append ( line ) . append ( " \ n " ) ;
}
}
boolean finished = p . waitFor ( aiServiceConfig . getPushTimeout ( ) , TimeUnit . MILLISECONDS ) ;
if ( ! finished ) {
p . destroyForcibly ( ) ;
log . warn ( " [AiConfig] ??????: {} ms " , aiServiceConfig . getPushTimeout ( ) ) ;
return false ;
}
int code = p . exitValue ( ) ;
log . info ( " [AiConfig] ???????: {}, ??: {} " , code , output . toString ( ) . trim ( ) ) ;
return code = = 0 ;
} catch ( Exception e ) {
log . warn ( " [AiConfig] ????????: {} " , e . getMessage ( ) ) ;
return false ;
}
}
@SuppressWarnings ( " unchecked " )
private boolean pushConfigToLocalEdge ( String cameraId , Map < String , Object > config ) {
if ( ! aiServiceConfig . isEnabled ( ) ) {
return false ;
}
try {
Map < String , Object > payload = new LinkedHashMap < > ( ) ;
payload . put ( " camera_ids " , java . util . Collections . singletonList ( cameraId ) ) ;
List < Map < String , Object > > rois = new ArrayList < > ( ) ;
List < Map < String , Object > > binds = new ArrayList < > ( ) ;
Object roisObj = config . get ( " rois " ) ;
if ( roisObj instanceof List ) {
for ( Object roiItem : ( List < Object > ) roisObj ) {
if ( ! ( roiItem instanceof Map ) ) {
continue ;
}
Map < String , Object > roiMap = ( Map < String , Object > ) roiItem ;
Map < String , Object > roiOut = new LinkedHashMap < > ( ) ;
roiOut . put ( " roi_id " , roiMap . get ( " roi_id " ) ) ;
roiOut . put ( " camera_id " , cameraId ) ;
roiOut . put ( " roi_type " , roiMap . get ( " roi_type " ) ) ;
roiOut . put ( " coordinates " , roiMap . get ( " coordinates " ) ) ;
roiOut . put ( " enabled " , roiMap . getOrDefault ( " enabled " , true ) ) ;
roiOut . put ( " priority " , roiMap . getOrDefault ( " priority " , 0 ) ) ;
rois . add ( roiOut ) ;
Object algosObj = roiMap . get ( " algorithms " ) ;
if ( algosObj instanceof List ) {
for ( Object algoItem : ( List < Object > ) algosObj ) {
if ( ! ( algoItem instanceof Map ) ) {
continue ;
}
Map < String , Object > algoMap = ( Map < String , Object > ) algoItem ;
Map < String , Object > bindOut = new LinkedHashMap < > ( ) ;
bindOut . put ( " bind_id " , algoMap . get ( " bind_id " ) ) ;
bindOut . put ( " roi_id " , roiMap . get ( " roi_id " ) ) ;
bindOut . put ( " algo_code " , algoMap . get ( " algo_code " ) ) ;
bindOut . put ( " enabled " , algoMap . getOrDefault ( " enabled " , true ) ) ;
bindOut . put ( " priority " , algoMap . getOrDefault ( " priority " , 0 ) ) ;
bindOut . put ( " params " , algoMap . get ( " params " ) ) ;
bindOut . put ( " target_class " , algoMap . get ( " target_class " ) ) ;
binds . add ( bindOut ) ;
}
}
}
}
payload . put ( " rois " , rois ) ;
payload . put ( " binds " , binds ) ;
// 构建 cameras 列表(含 rtsp_url)
String [ ] parts = cameraId . split ( " / " , 2 ) ;
if ( parts . length = = 2 ) {
StreamProxy proxy = streamProxyMapper . selectOneByAppAndStream ( parts [ 0 ] , parts [ 1 ] ) ;
if ( proxy ! = null ) {
Map < String , Object > camOut = new LinkedHashMap < > ( ) ;
camOut . put ( " camera_id " , cameraId ) ;
camOut . put ( " rtsp_url " , proxy . getSrcUrl ( ) ) ;
camOut . put ( " camera_name " , cameraId ) ;
camOut . put ( " enabled " , true ) ;
payload . put ( " cameras " , java . util . Collections . singletonList ( camOut ) ) ;
}
}
log . info ( " [AiConfig] 构建Payload: cameraId={}, rois={}, binds={} " , cameraId , rois . size ( ) , binds . size ( ) ) ;
String url = aiServiceConfig . getUrl ( ) + " /debug/sync " ;
String jsonBody = JSON . toJSONString ( payload ) ;
log . info ( " [AiConfig] 发送JSON: url={}, body_len={} " , url , jsonBody . length ( ) ) ;
RestTemplate restTemplate = new RestTemplate ( ) ;
HttpHeaders headers = new HttpHeaders ( ) ;
headers . setContentType ( MediaType . APPLICATION_JSON ) ;
HttpEntity < String > entity = new HttpEntity < > ( jsonBody , headers ) ;
Map < String , Object > resp = restTemplate . postForObject ( url , entity , Map . class ) ;
boolean ok = resp ! = null & & Boolean . TRUE . equals ( resp . get ( " ok " ) ) ;
log . info ( " [AiConfig] 本地 HTTP 同步结果: ok={}, url={} " , ok , url ) ;
return ok ;
} catch ( Exception e ) {
log . warn ( " [AiConfig] 本地 HTTP 同步失败: {} " , e . getMessage ( ) ) ;
return false ;
}
}
}