diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/SseController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/SseController.java index b9610647e..16e5a4d74 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/SseController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/SseController.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.controller; -import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEventListener; import io.swagger.v3.oas.annotations.tags.Tag; import org.springframework.web.bind.annotation.GetMapping; @@ -29,9 +28,6 @@ public class SseController { @Resource private AlarmEventListener alarmEventListener; - @Resource - private DynamicTask dynamicTask; - /** * SSE 推送. * @@ -45,17 +41,7 @@ public class SseController { public void emit(HttpServletResponse response, @RequestParam String browserId) throws IOException, InterruptedException { response.setContentType("text/event-stream"); response.setCharacterEncoding("utf-8"); - PrintWriter writer = response.getWriter(); alarmEventListener.addSseEmitter(browserId, writer); - - dynamicTask.startCron("sse-key", new Runnable() { - @Override - public void run() { - writer.write(":keep alive\n\n"); - writer.flush(); - } - }, 1000); - alarmEventListener.removeSseEmitter(browserId, writer); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java index 3610c2a3d..e20e93534 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java @@ -22,16 +22,27 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class AlarmEventListener implements ApplicationListener { - private static final Map SSE_CACHE = new ConcurrentHashMap<>(); + private static final Map sseChannelMap = new ConcurrentHashMap<>(); + + public void addSseEmitter(String browserId, PrintWriter writer) throws InterruptedException { + sseChannelMap.put(browserId, writer); + log.info("[SSE推送] 连接已建立, 浏览器 ID: {}, 当前在线数: {}", browserId, sseChannelMap.size()); + while (!writer.checkError()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + writer.write(":keep alive\n\n"); + writer.flush(); + } + removeSseEmitter(browserId, writer); - public void addSseEmitter(String browserId, PrintWriter writer) { - SSE_CACHE.put(browserId, writer); - log.info("[SSE推送] 连接已建立, 浏览器 ID: {}, 当前在线数: {}", browserId, SSE_CACHE.size()); } public void removeSseEmitter(String browserId, PrintWriter writer) { - SSE_CACHE.remove(browserId, writer); - log.info("[SSE推送] 连接已断开, 浏览器 ID: {}, 当前在线数: {}", browserId, SSE_CACHE.size()); + sseChannelMap.remove(browserId, writer); + log.info("[SSE推送] 连接已断开, 浏览器 ID: {}, 当前在线数: {}", browserId, sseChannelMap.size()); } @Override @@ -42,13 +53,7 @@ public class AlarmEventListener implements ApplicationListener { log.info("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription()); - - String msg = "设备: " + event.getAlarmInfo().getDeviceId() + "" - + "
通道编号: " + event.getAlarmInfo().getChannelId() + "" - + "
报警描述: " + event.getAlarmInfo().getAlarmDescription() + "" - + "
报警时间: " + event.getAlarmInfo().getAlarmTime() + ""; - - for (Iterator> it = SSE_CACHE.entrySet().iterator(); it.hasNext(); ) { + for (Iterator> it = sseChannelMap.entrySet().iterator(); it.hasNext(); ) { Map.Entry response = it.next(); try { @@ -59,12 +64,6 @@ public class AlarmEventListener implements ApplicationListener { continue; } - String sseMsg = "event:message\n" + - "data:" + msg + "\n" + - "\n"; - System.out.println( - SSEMessage.getInstance("message", event.getAlarmInfo()).ecode() - ); writer.write(SSEMessage.getInstance("message", event.getAlarmInfo()).ecode()); writer.flush(); } catch (Exception e) {