From 71390a7b51465a1df10df45d372ddbdfedefa5f0 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Sat, 8 Feb 2025 17:29:31 +0800 Subject: [PATCH] =?UTF-8?q?[=E9=9B=86=E7=BE=A4]=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E8=BF=9C=E7=A8=8B=E9=87=8D=E5=90=AF/=E5=BD=95=E5=83=8F?= =?UTF-8?q?=E6=8E=A7=E5=88=B6/=E5=B8=83=E9=98=B2=EF=BC=88=E6=88=96?= =?UTF-8?q?=E6=92=A4=E9=98=B2=EF=BC=89/=E9=87=8D=E7=BD=AE=E6=8A=A5?= =?UTF-8?q?=E8=AD=A6=EF=BC=8C=20=E5=A2=9E=E5=8A=A0=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E6=8E=A8=E6=B5=81=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/gb28181/controller/DeviceControl.java | 99 +++-------------- .../vmp/gb28181/service/IDeviceService.java | 8 ++ .../service/impl/DeviceServiceImpl.java | 82 ++++++++++++++ .../service/redisMsg/IRedisRpcService.java | 6 + .../control/RedisRpcDeviceController.java | 105 ++++++++++++++++++ .../redisMsg/service/RedisRpcServiceImpl.java | 34 ++++++ .../controller/StreamPushController.java | 8 ++ .../service/IStreamPushPlayService.java | 3 +- .../impl/StreamPushPlayServiceImpl.java | 11 +- 9 files changed, 271 insertions(+), 85 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceControl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceControl.java index 55017deb7..a9daa5436 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceControl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceControl.java @@ -24,6 +24,7 @@ import io.swagger.v3.oas.annotations.tags.Tag; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; @@ -61,12 +62,8 @@ public class DeviceControl { log.debug("设备远程启动API调用"); } Device device = deviceService.getDeviceByDeviceId(deviceId); - try { - cmder.teleBootCmd(device); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 远程启动: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } + Assert.notNull(device, "设备不存在"); + deviceService.teleboot(device); } /** @@ -81,41 +78,18 @@ public class DeviceControl { @Parameter(name = "channelId", description = "通道国标编号", required = true) @Parameter(name = "recordCmdStr", description = "命令, 可选值:Record(手动录像),StopRecord(停止手动录像)", required = true) @GetMapping("/record/{deviceId}/{recordCmdStr}") - public DeferredResult>> recordApi(@PathVariable String deviceId, + public DeferredResult recordApi(@PathVariable String deviceId, @PathVariable String recordCmdStr, String channelId) { if (log.isDebugEnabled()) { log.debug("开始/停止录像API调用"); } Device device = deviceService.getDeviceByDeviceId(deviceId); - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + deviceId + channelId; - DeferredResult>> result = new DeferredResult<>(3 * 1000L); + Assert.notNull(device, "设备不存在"); + DeferredResult result = deviceService.record(device, channelId, recordCmdStr); result.onTimeout(() -> { - log.warn(String.format("开始/停止录像操作超时, 设备未返回应答指令")); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setId(uuid); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); - resultHolder.invokeAllResult(msg); + log.warn("[开始/停止录像] 操作超时, 设备未返回应答指令, {}", deviceId); + result.setResult("操作超时, 设备未应答"); }); - if (resultHolder.exist(key, null)){ - return result; - } - resultHolder.put(key, uuid, result); - try { - cmder.recordCmd(device, channelId, recordCmdStr, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("开始/停止录像操作失败,错误码: %s, %s", event.statusCode, event.msg))); - resultHolder.invokeAllResult(msg); - },null); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 开始/停止录像: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } - return result; } @@ -134,32 +108,12 @@ public class DeviceControl { log.debug("布防/撤防API调用"); } Device device = deviceService.getDeviceByDeviceId(deviceId); - String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + deviceId + deviceId; - String uuid =UUID.randomUUID().toString(); - try { - cmder.guardCmd(device, guardCmdStr, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("布防/撤防操作失败,错误码: %s, %s", event.statusCode, event.msg))); - resultHolder.invokeResult(msg); - },null); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 布防/撤防操作: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); - } - DeferredResult> result = new DeferredResult<>(3 * 1000L); - resultHolder.put(key, uuid, result); + Assert.notNull(device, "设备不存在"); + DeferredResult> result = deviceService.guard(device, guardCmdStr); result.onTimeout(() -> { - log.warn(String.format("布防/撤防操作超时, 设备未返回应答指令")); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setId(uuid); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); - resultHolder.invokeResult(msg); + log.warn("[布防/撤防] 操作超时, 设备未返回应答指令, {}", deviceId); + result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); }); - return result; } @@ -176,38 +130,19 @@ public class DeviceControl { @Parameter(name = "alarmMethod", description = "报警方式") @Parameter(name = "alarmType", description = "报警类型") @GetMapping("/reset_alarm/{deviceId}") - public DeferredResult>> resetAlarmApi(@PathVariable String deviceId, String channelId, + public DeferredResult> resetAlarmApi(@PathVariable String deviceId, String channelId, @RequestParam(required = false) String alarmMethod, @RequestParam(required = false) String alarmType) { if (log.isDebugEnabled()) { log.debug("报警复位API调用"); } Device device = deviceService.getDeviceByDeviceId(deviceId); - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + deviceId + channelId; - try { - cmder.alarmCmd(device, alarmMethod, alarmType, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("操作失败,错误码: %s, %s", event.statusCode, event.msg))); - resultHolder.invokeResult(msg); - },null); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 报警复位: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } - DeferredResult>> result = new DeferredResult<>(3 * 1000L); + Assert.notNull(device, "设备不存在"); + DeferredResult> result = deviceService.resetAlarm(device, channelId, alarmMethod, alarmType); result.onTimeout(() -> { - log.warn(String.format("报警复位操作超时, 设备未返回应答指令")); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); - resultHolder.invokeResult(msg); + log.warn("[布防/撤防] 操作超时, 设备未返回应答指令, {}", deviceId); + result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); }); - resultHolder.put(key, uuid, result); return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java index e9cc49456..43e262e9b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java @@ -173,4 +173,12 @@ public interface IDeviceService { DeferredResult deviceBasicConfig(Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount); DeferredResult deviceConfigQuery(Device device, String channelId, String configType); + + void teleboot(Device device); + + DeferredResult record(Device device, String channelId, String recordCmdStr); + + DeferredResult> guard(Device device, String guardCmdStr); + + DeferredResult> resetAlarm(Device device, String channelId, String alarmMethod, String alarmType); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index a55174e64..5d9c0b9ca 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -36,6 +36,7 @@ import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; @@ -683,4 +684,85 @@ public class DeviceServiceImpl implements IDeviceService { } return result; } + + @Override + public void teleboot(Device device) { + + if (!userSetting.getServerId().equals(device.getServerId())) { + redisRpcService.teleboot(device.getServerId(), device); + } + try { + sipCommander.teleBootCmd(device); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 远程启动: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + } + + @Override + public DeferredResult record(Device device, String channelId, String recordCmdStr) { + + if (!userSetting.getServerId().equals(device.getServerId())) { + String result = redisRpcService.recordControl(device.getServerId(), device, channelId, recordCmdStr); + DeferredResult deferredResult = new DeferredResult(3 * 1000L); + deferredResult.setResult(result); + return deferredResult; + } + + DeferredResult result = new DeferredResult<>(3 * 1000L); + try { + sipCommander.recordCmd(device, channelId, recordCmdStr, event -> { + result.setResult(String.format("开始/停止录像操作失败,错误码: %s, %s", event.statusCode, event.msg)); + },null); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 开始/停止录像: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + + return result; + } + + @Override + public DeferredResult> guard(Device device, String guardCmdStr) { + if (!userSetting.getServerId().equals(device.getServerId())) { + WVPResult result = redisRpcService.guard(device.getServerId(), device, guardCmdStr); + DeferredResult> deferredResult = new DeferredResult<>(3 * 1000L); + deferredResult.setResult(result); + return deferredResult; + } + + DeferredResult> result = new DeferredResult<>(3 * 1000L); + try { + sipCommander.guardCmd(device, guardCmdStr, event -> { + result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("布防/撤防操作失败,错误码: %s, %s", event.statusCode, event.msg))); + },null); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 布防/撤防操作: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); + } + + return result; + } + + @Override + public DeferredResult> resetAlarm(Device device, String channelId, String alarmMethod, String alarmType) { + if (!userSetting.getServerId().equals(device.getServerId())) { + WVPResult result = redisRpcService.resetAlarm(device.getServerId(), device, channelId, alarmMethod, alarmType); + DeferredResult> deferredResult = new DeferredResult<>(3 * 1000L); + deferredResult.setResult(result); + return deferredResult; + } + + DeferredResult> result = new DeferredResult<>(3 * 1000L); + try { + sipCommander.alarmCmd(device, alarmMethod, alarmType, event -> { + result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("操作失败,错误码: %s, %s", event.statusCode, event.msg))); + },null); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 布防/撤防操作: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); + } + + return result; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 7a362db7f..eb44b204e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -43,4 +43,10 @@ public interface IRedisRpcService { String deviceBasicConfig(String serverId, Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount); String deviceConfigQuery(String serverId, Device device, String channelId, String configType); + + void teleboot(String serverId, Device device); + + String recordControl(String serverId, Device device, String channelId, String recordCmdStr); + + WVPResult guard(String serverId, Device device, String guardCmdStr); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java index 8180cd9c4..10067fc1f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service.redisMsg.control; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; @@ -165,6 +166,110 @@ public class RedisRpcDeviceController extends RpcController { return response; } + @RedisRpcMapping("teleboot") + public RedisRpcResponse teleboot(RedisRpcRequest request) { + String deviceId = request.getParam().toString(); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + try { + deviceService.teleboot(device); + }catch (ControllerException e) { + response.setStatusCode(e.getCode()); + response.setBody(e.getMsg()); + return response; + } + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(ErrorCode.SUCCESS.getMsg()); + return response; + } + + @RedisRpcMapping("record") + public RedisRpcResponse record(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + String deviceId = paramJson.getString("deviceId"); + String channelId = paramJson.getString("channelId"); + String recordCmdStr = paramJson.getString("recordCmdStr"); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + try { + DeferredResult deferredResult = deviceService.record(device, channelId, recordCmdStr); + deferredResult.onCompletion(() ->{ + String resultStr = (String)deferredResult.getResult(); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(resultStr); + // 手动发送结果 + sendResponse(response); + }); + deferredResult.onTimeout(() -> { + log.warn("设备录像控制操作超时, 设备未返回应答指令"); + JSONObject json = new JSONObject(); + json.put("DeviceID", device.getDeviceId()); + json.put("Status", "Timeout"); + json.put("Description", "设备录像控制操作超时, 设备未返回应答指令"); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(json); + // 手动发送结果 + sendResponse(response); + }); + }catch (ControllerException e) { + response.setStatusCode(e.getCode()); + response.setBody(e.getMsg()); + sendResponse(response); + } + return null; + } + + @RedisRpcMapping("guard") + public RedisRpcResponse guard(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + String deviceId = paramJson.getString("deviceId"); + String guardCmdStr = paramJson.getString("guardCmdStr"); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + try { + DeferredResult> deferredResult = deviceService.guard(device, guardCmdStr); + deferredResult.onCompletion(() ->{ + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(deferredResult.getResult()); + // 手动发送结果 + sendResponse(response); + }); + deferredResult.onTimeout(() -> { + log.warn("设备录像控制操作超时, 设备未返回应答指令"); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); + // 手动发送结果 + sendResponse(response); + }); + }catch (ControllerException e) { + response.setStatusCode(e.getCode()); + response.setBody(e.getMsg()); + sendResponse(response); + } + return null; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index ca8eda9e0..169d4c04a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; @@ -292,4 +293,37 @@ public class RedisRpcServiceImpl implements IRedisRpcService { RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); return response.getBody().toString(); } + + @Override + public void teleboot(String serverId, Device device) { + RedisRpcRequest request = buildRequest("device/teleboot", device.getDeviceId()); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); + if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) { + throw new ControllerException(response.getStatusCode(), response.getBody().toString()); + } + } + + @Override + public String recordControl(String serverId, Device device, String channelId, String recordCmdStr) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("device", device.getDeviceId()); + jsonObject.put("channelId", channelId); + jsonObject.put("recordCmdStr", recordCmdStr); + RedisRpcRequest request = buildRequest("device/record", jsonObject); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); + return response.getBody().toString(); + } + + @Override + public WVPResult guard(String serverId, Device device, String guardCmdStr) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("device", device.getDeviceId()); + jsonObject.put("guardCmdStr", guardCmdStr); + RedisRpcRequest request = buildRequest("device/guard", jsonObject); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java index d6c24cb53..67fb1cfa1 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java @@ -260,4 +260,12 @@ public class StreamPushController { }, null, null); return result; } + + @GetMapping(value = "/forceClose") + @ResponseBody + @Operation(summary = "强制停止推流", security = @SecurityRequirement(name = JwtUtils.HEADER)) + public void stop(String app, String stream){ + + streamPushPlayService.stop(app, stream); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java index 9b75ffc7a..0b42a2e38 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java @@ -6,5 +6,6 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback; public interface IStreamPushPlayService { void start(Integer id, ErrorCallback callback, String platformDeviceId, String platformName ); - void stop(Integer streamPushId); + void stop(String app, String stream); + } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java index 16fdc4cfc..68e54343d 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java @@ -117,7 +117,14 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { } @Override - public void stop(Integer streamPushId) { - // 推流无需主动停止 + public void stop(String app, String stream) { + StreamPush streamPush = streamPushMapper.selectByAppAndStream(app, stream); + if (streamPush == null || !streamPush.isPushing()) { + return; + } + String mediaServerId = streamPush.getMediaServerId(); + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); + Assert.notNull(mediaServer, "未找到使用的节点"); + mediaServerService.closeStreams(mediaServer, app, stream); } }