@@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.aiot.service.IAiScreenshotService ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.data.redis.connection.stream.MapRecord ;
import org.springframework.data.redis.connection.stream.RecordId ;
import org.springframework.data.redis.core.StringRedisTemplate ;
@@ -15,24 +16,32 @@ import java.time.format.DateTimeFormatter;
import java.util.HashMap ;
import java.util.Map ;
import java.util.UUID ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.* ;
@Slf4j
@Service
public class AiScreenshotServiceImpl implements IAiScreenshotService {
private static final String SNAP_REQUEST_STREAM = " edge_snap_request " ;
private static final String SNAP_RESULT_KEY_PREFIX = " snap:result: " ;
private static final String SNAP_CACHE_KEY_PREFIX = " snap:cache: " ;
private static final String SNAP_RESULT_KEY_PREFIX = " snap:result: " ;
/** 轮询间隔 ms */
private static final long POLL_INTERVAL_MS = 5 00;
/** 最大等待时间 ms */
private static final long MAX_WAIT_M S = 15_000 ;
/** 缓存 TTL( 秒) */
private static final long SNAP_CACHE_TTL = 3 00;
/** 最大等待时间(秒) */
private static final long MAX_WAIT_SECOND S = 15 ;
/** 降级 Redis 结果 TTL( 秒) */
private static final long SNAP_RESULT_TTL = 60 ;
/** 等待 Edge 回调的 pending 请求表 */
private final ConcurrentHashMap < String , CompletableFuture < Map < String , Object > > > pendingRequests = new ConcurrentHashMap < > ( ) ;
@Autowired
private StringRedisTemplate stringRedisTemplate ;
@Value ( " ${ai.screenshot.callback-url:} " )
private String callbackUrl ;
@Override
public Map < String , Object > requestScreenshot ( String cameraCode , boolean force ) {
Map < String , Object > result = new HashMap < > ( ) ;
@@ -58,11 +67,18 @@ public class AiScreenshotServiceImpl implements IAiScreenshotService {
String requestId = UUID . randomUUID ( ) . toString ( ) . replace ( " - " , " " ) . substring ( 0 , 12 ) ;
String cosPath = buildCosPath ( cameraCode , requestId ) ;
// 3. XADD 到 Stream
// 3. 创建 CompletableFuture 并注册到 pending 表
CompletableFuture < Map < String , Object > > future = new CompletableFuture < > ( ) ;
pendingRequests . put ( requestId , future ) ;
// 4. XADD 到 Stream( 含 callback_url)
Map < String , String > fields = new HashMap < > ( ) ;
fields . put ( " request_id " , requestId ) ;
fields . put ( " camera_code " , cameraCode ) ;
fields . put ( " cos_path " , cosPath ) ;
if ( callbackUrl ! = null & & ! callbackUrl . isEmpty ( ) ) {
fields . put ( " callback_url " , callbackUrl ) ;
}
try {
MapRecord < String , String , String > record = MapRecord . create ( SNAP_REQUEST_STREAM , fields ) ;
@@ -70,59 +86,107 @@ public class AiScreenshotServiceImpl implements IAiScreenshotService {
log . info ( " [AI截图] 发送截图请求: requestId={}, cameraCode={}, streamId={} " , requestId , cameraCode , recordId ) ;
} catch ( Exception e ) {
log . error ( " [AI截图] 发送截图请求失败: {} " , e . getMessage ( ) ) ;
pendingRequests . remove ( requestId ) ;
result . put ( " status " , " error " ) ;
result . put ( " message " , " 发送截图请求失败 " ) ;
return result ;
}
// 4 . 轮询 结果
String resultKey = SNAP_RESULT_KEY_PREFIX + requestId ;
long deadline = System . currentTimeMillis ( ) + MAX_WAIT_MS ;
while ( System . currentTimeMillis ( ) < deadline ) {
try {
Thread . sleep ( POLL_INTERVAL_MS ) ;
} catch ( InterruptedException e ) {
Thread . currentThread ( ) . interrupt ( ) ;
break ;
// 5 . 等待回调 结果
try {
Map < String , Object > callbackData = future . get ( MAX_WAIT_SECONDS , TimeUnit . SECONDS ) ;
String status = ( String ) callbackData . get ( " status " ) ;
result . put ( " status " , status ) ;
if ( " ok " . equals ( status ) ) {
result . put ( " url " , callbackData . get ( " url " ) ) ;
} else {
result . put ( " message " , callbackData . get ( " message " ) ) ;
}
String resultJson = stringRedisTemplate . opsForValue ( ) . get ( resultKey ) ;
return result ;
} catch ( TimeoutException e ) {
// 超时 → 降级检查 Redis 结果( Edge 回调失败时可能回退写 Redis)
log . warn ( " [AI截图] 回调超时,检查 Redis 降级: requestId={} " , requestId ) ;
String resultJson = stringRedisTemplate . opsForValue ( ) . get ( SNAP_RESULT_KEY_PREFIX + requestId ) ;
if ( resultJson ! = null ) {
try {
JSONObject res = JSON . parseObject ( resultJson ) ;
result . put ( " status " , res . getString ( " status " ) ) ;
if ( " ok " . equals ( res . getString ( " status " ) ) ) {
result . put ( " url " , res . getString ( " url " ) ) ;
// 降级成功,写入缓存
writeCache ( cameraCode , res . getString ( " url " ) ) ;
} else {
result . put ( " message " , res . getString ( " message " ) ) ;
}
// 清理结果 key
stringRedisTemplate . delete ( resultKey ) ;
stringRedisTemplate . delete ( SNAP_RESULT_KEY_PREFIX + requestId ) ;
return result ;
} catch ( Exception e ) {
log . warn ( " [AI截图] 结果解析失败: {} " , e . getMessage ( ) ) ;
} catch ( Exception ex ) {
log . warn ( " [AI截图] 降级 结果解析失败: {} " , ex . getMessage ( ) ) ;
}
}
}
// 5. 超时 → 尝试返回过期缓存
log . warn ( " [AI截图] 截图超时: cameraCode={}, requestId={} " , cameraCode , requestId ) ;
String staleCache = stringRedisTemplate . opsForValue ( ) . get ( SNAP_CACHE_KEY_PREFIX + cameraCode ) ;
if ( staleCache ! = null ) {
try {
JSONObject cached = JSON . parseObject ( staleCache ) ;
result . put ( " status " , " ok " ) ;
result . put ( " url " , cached . getString ( " url " ) ) ;
result . put ( " stale " , true ) ;
return result ;
} catch ( Exception ignored ) {
// Redis 降级也没有 → 尝试返回过期缓存
log . warn ( " [AI截图] 截图超时: cameraCode={}, requestId={} " , cameraCode , requestId ) ;
String staleCache = stringRedisTemplate . opsForValue ( ) . get ( SNAP_CACHE_KEY_PREFIX + cameraCode ) ;
if ( staleCache ! = null ) {
try {
JSONObject cached = JSON . parseObject ( staleCache ) ;
result . put ( " status " , " ok " ) ;
result . put ( " url " , cached . getString ( " url " ) ) ;
result . put ( " stale " , true ) ;
return result ;
} catch ( Exception ignored ) {
}
}
result . put ( " status " , " timeout " ) ;
result . put ( " message " , " 边缘设备响应超时,请确认设备在线 " ) ;
return result ;
} catch ( Exception e ) {
log . error ( " [AI截图] 等待回调异常: {} " , e . getMessage ( ) ) ;
result . put ( " status " , " error " ) ;
result . put ( " message " , " 截图请求异常 " ) ;
return result ;
} finally {
pendingRequests . remove ( requestId ) ;
}
}
@Override
public void handleCallback ( String requestId , Map < String , Object > data ) {
if ( requestId = = null | | requestId . isEmpty ( ) ) {
log . warn ( " [AI截图] 回调缺少 request_id " ) ;
return ;
}
result . put ( " status " , " timeout " ) ;
result . put ( " message " , " 边缘设备响应超时,请确认设备在线 " ) ;
return result ;
CompletableFuture < Map < String , Object > > future = pendingRequests . get ( requestId ) ;
if ( future ! = null ) {
future . complete ( data ) ;
log . info ( " [AI截图] 回调完成: requestId={} " , requestId ) ;
} else {
log . warn ( " [AI截图] 回调未找到对应请求(可能已超时): requestId={} " , requestId ) ;
}
// 写入 Redis 缓存(无论 future 是否存在,缓存都应更新)
String cameraCode = ( String ) data . get ( " camera_code " ) ;
String status = ( String ) data . get ( " status " ) ;
if ( " ok " . equals ( status ) & & cameraCode ! = null ) {
String url = ( String ) data . get ( " url " ) ;
writeCache ( cameraCode , url ) ;
}
}
/**
* 写入截图缓存
*/
private void writeCache ( String cameraCode , String url ) {
String cacheKey = SNAP_CACHE_KEY_PREFIX + cameraCode ;
String cacheData = JSON . toJSONString ( Map . of ( " url " , url , " timestamp " , System . currentTimeMillis ( ) / 1000 ) ) ;
try {
stringRedisTemplate . opsForValue ( ) . set ( cacheKey , cacheData , SNAP_CACHE_TTL , TimeUnit . SECONDS ) ;
} catch ( Exception e ) {
log . warn ( " [AI截图] 写入缓存失败: {} " , e . getMessage ( ) ) ;
}
}
/**