Merge branch 'wvp-28181-2.0' into main-dev

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
#	src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java
This commit is contained in:
648540858
2024-02-27 10:22:13 +08:00
32 changed files with 1104 additions and 253 deletions

View File

@@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.common;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import io.swagger.v3.oas.annotations.media.Schema;
/**
* 记录每次发送invite消息的状态
@@ -125,20 +124,4 @@ public class InviteInfo {
this.streamMode = streamMode;
}
/*=========================设备主子码流逻辑START====================*/
@Schema(description = "是否为子码流(true-是false-主码流)")
private boolean subStream;
public boolean isSubStream() {
return subStream;
}
public void setSubStream(boolean subStream) {
this.subStream = subStream;
}
}

View File

@@ -452,27 +452,11 @@ public class Device {
public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) {
this.sipTransactionInfo = sipTransactionInfo;
}
public boolean isBroadcastPushAfterAck() {
return broadcastPushAfterAck;
}
/*======================设备主子码流逻辑START=========================*/
@Schema(description = "开启主子码流切换的开关false-不开启true-开启)")
private boolean switchPrimarySubStream;
public boolean isSwitchPrimarySubStream() {
return switchPrimarySubStream;
}
public void setSwitchPrimarySubStream(boolean switchPrimarySubStream) {
this.switchPrimarySubStream = switchPrimarySubStream;
}
/*======================设备主子码流逻辑END=========================*/
public boolean isBroadcastPushAfterAck() {
return broadcastPushAfterAck;
}
public void setBroadcastPushAfterAck(boolean broadcastPushAfterAck) {
this.broadcastPushAfterAck = broadcastPushAfterAck;
}
public void setBroadcastPushAfterAck(boolean broadcastPushAfterAck) {
this.broadcastPushAfterAck = broadcastPushAfterAck;
}
}

View File

@@ -246,6 +246,10 @@ public class DeviceChannel {
@Schema(description = "GPS的更新时间")
private String gpsTime;
@Schema(description = "码流标识,优先级高于设备中码流标识," +
"用于选择码流时组成码流标识。默认为null不设置。可选值: stream/streamnumber/streamprofile/streamMode")
private String streamIdentification;
public int getId() {
return id;
}
@@ -574,4 +578,12 @@ public class DeviceChannel {
public void setGpsTime(String gpsTime) {
this.gpsTime = gpsTime;
}
public String getStreamIdentification() {
return streamIdentification;
}
public void setStreamIdentification(String streamIdentification) {
this.streamIdentification = streamIdentification;
}
}

View File

@@ -0,0 +1,44 @@
package com.genersoft.iot.vmp.gb28181.bean;
/**
* 码流索引标识
*/
public enum GbSteamIdentification {
/**
* 主码流 stream:0
* 子码流 stream:1s
*/
streamMain("stream", new String[]{"0","1"}),
/**
* 国标28181-2022定义的方式
* 主码流 streamnumber:0
* 子码流 streamnumber:1
*/
streamnumber("streamnumber", new String[]{"0","1"}),
/**
* 主码流 streamprofile:0
* 子码流 streamprofile:1
*/
streamprofile("streamprofile", new String[]{"0","1"}),
/**
* 适用的品牌: TP-LINK
*/
streamMode("streamMode", new String[]{"main","sub"}),
;
GbSteamIdentification(String value, String[] indexArray) {
this.value = value;
this.indexArray = indexArray;
}
private String value;
private String[] indexArray;
public String getValue() {
return value;
}
public String[] getIndexArray() {
return indexArray;
}
}

View File

@@ -52,7 +52,7 @@ public class SubscribeHolder {
Runnable runnable = dynamicTask.get(taskOverdueKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop();
subscribeTask.stop(null);
}
// 添加任务处理订阅过期
dynamicTask.stop(taskOverdueKey);
@@ -87,7 +87,7 @@ public class SubscribeHolder {
Runnable runnable = dynamicTask.get(taskOverdueKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop();
subscribeTask.stop(null);
}
// 添加任务处理订阅过期
dynamicTask.stop(taskOverdueKey);

View File

@@ -2,6 +2,8 @@ package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
@@ -11,8 +13,7 @@ import javax.sip.DialogTerminatedEvent;
import javax.sip.ResponseEvent;
import javax.sip.TimeoutEvent;
import javax.sip.TransactionTerminatedEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import javax.sip.header.WarningHeader;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -97,14 +98,27 @@ public class SipSubscribe {
this.event = event;
if (event instanceof ResponseEvent) {
ResponseEvent responseEvent = (ResponseEvent)event;
Response response = responseEvent.getResponse();
SIPResponse response = (SIPResponse)responseEvent.getResponse();
this.type = EventResultType.response;
if (response != null) {
this.msg = response.getReasonPhrase();
WarningHeader warningHeader = (WarningHeader)response.getHeader(WarningHeader.NAME);
if (warningHeader != null && !ObjectUtils.isEmpty(warningHeader.getText())) {
this.msg = "";
if (warningHeader.getCode() > 0) {
this.msg += warningHeader.getCode() + ":";
}
if (warningHeader.getAgent() != null) {
this.msg += warningHeader.getCode() + ":";
}
if (warningHeader.getText() != null) {
this.msg += warningHeader.getText();
}
}else {
this.msg = response.getReasonPhrase();
}
this.statusCode = response.getStatusCode();
this.callId = response.getCallIdHeader().getCallId();
}
this.callId = ((CallIdHeader)response.getHeader(CallIdHeader.NAME)).getCallId();
}else if (event instanceof TimeoutEvent) {
TimeoutEvent timeoutEvent = (TimeoutEvent)event;
this.type = EventResultType.timeout;

View File

@@ -0,0 +1,86 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.common.CommonCallback;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 通用回调管理
*/
@Component
public class CommonSessionManager {
public static Map<String, CommonSession> callbackMap = new ConcurrentHashMap<>();
/**
* 存储回调相关的信息
*/
class CommonSession{
public String session;
public long createTime;
public int timeout;
public CommonCallback<Object> callback;
public CommonCallback<String> timeoutCallback;
}
/**
* 添加回调
* @param sessionId 唯一标识
* @param callback 回调
* @param timeout 超时时间, 单位分钟
*/
public void add(String sessionId, CommonCallback<Object> callback, CommonCallback<String> timeoutCallback,
Integer timeout) {
CommonSession commonSession = new CommonSession();
commonSession.session = sessionId;
commonSession.callback = callback;
commonSession.createTime = System.currentTimeMillis();
if (timeoutCallback != null) {
commonSession.timeoutCallback = timeoutCallback;
}
if (timeout != null) {
commonSession.timeout = timeout;
}
callbackMap.put(sessionId, commonSession);
}
public void add(String sessionId, CommonCallback<Object> callback) {
add(sessionId, callback, null, 1);
}
public CommonCallback<Object> get(String sessionId, boolean destroy) {
CommonSession commonSession = callbackMap.get(sessionId);
if (destroy) {
callbackMap.remove(sessionId);
}
return commonSession.callback;
}
public CommonCallback<Object> get(String sessionId) {
return get(sessionId, false);
}
public void delete(String sessionID) {
callbackMap.remove(sessionID);
}
@Scheduled(fixedRate= 60) //每分钟执行一次
public void execute(){
Calendar cal = Calendar.getInstance();
cal.add(Calendar.MINUTE, -1);
for (String session : callbackMap.keySet()) {
if (callbackMap.get(session).createTime < cal.getTimeInMillis()) {
// 超时
if (callbackMap.get(session).timeoutCallback != null) {
callbackMap.get(session).timeoutCallback.run("timeout");
}
callbackMap.remove(session);
}
}
}
}

View File

@@ -1,10 +1,10 @@
package com.genersoft.iot.vmp.gb28181.task;
import javax.sip.DialogState;
import com.genersoft.iot.vmp.common.CommonCallback;
/**
* @author lin
*/
public interface ISubscribeTask extends Runnable{
void stop();
void stop(CommonCallback<Boolean> callback);
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
@@ -7,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sip.*;
import javax.sip.DialogState;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.Timer;
import java.util.TimerTask;
/**
* 目录订阅任务
@@ -71,7 +71,7 @@ public class CatalogSubscribeTask implements ISubscribeTask {
}
@Override
public void stop() {
public void stop(CommonCallback<Boolean> callback) {
/**
* dialog 的各个状态
* EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息
@@ -94,6 +94,9 @@ public class CatalogSubscribeTask implements ISubscribeTask {
// 成功
logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
}
if (callback != null) {
callback.run(event.getResponse().getRawContent() != null);
}
},eventResult -> {
// 失败
logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);

View File

@@ -1,20 +1,9 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import javax.sip.DialogState;
import java.util.List;
/**
* 向已经订阅(移动位置)的上级发送MobilePosition消息
@@ -38,7 +27,7 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
}
@Override
public void stop() {
public void stop(CommonCallback<Boolean> callback) {
}
}

View File

@@ -1,21 +1,19 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import javax.sip.*;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.Timer;
import java.util.TimerTask;
/**
* 移动位置订阅的定时更新
@@ -70,7 +68,7 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
}
@Override
public void stop() {
public void stop(CommonCallback<Boolean> callback) {
/**
* dialog 的各个状态
* EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息
@@ -92,6 +90,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
// 成功
logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
}
if (callback != null) {
callback.run(event.getResponse().getRawContent() != null);
}
},eventResult -> {
// 失败
logger.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);

View File

@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@@ -97,9 +98,9 @@ public interface ISIPCommander {
/**
* 请求预览视频流
* @param device 视频设备
* @param channelId 预览通道
* @param channel 预览通道
*/
void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/**
* 请求回放视频流

View File

@@ -7,6 +7,10 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
@@ -266,12 +270,12 @@ public class SIPCommander implements ISIPCommander {
* 请求预览视频流
*
* @param device 视频设备
* @param channelId 预览通道
* @param channel 预览通道
* @param event hook订阅
* @param errorEvent sip错误订阅
*/
@Override
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
String stream = ssrcInfo.getStream();
@@ -295,7 +299,7 @@ public class SIPCommander implements ISIPCommander {
}
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("o=" + channel.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 " + sdpIp + "\r\n");
content.append("t=0 0\r\n");
@@ -346,20 +350,8 @@ public class SIPCommander implements ISIPCommander {
}
}
if( device.isSwitchPrimarySubStream() ){
if("TP-LINK".equals(device.getManufacturer())){
if (device.isSwitchPrimarySubStream()){
content.append("a=streamMode:sub\r\n");
}else {
content.append("a=streamMode:main\r\n");
}
}else {
if (device.isSwitchPrimarySubStream()){
content.append("a=streamprofile:1\r\n");
}else {
content.append("a=streamprofile:0\r\n");
}
}
if (!ObjectUtils.isEmpty(channel.getStreamIdentification())) {
content.append("a=" + channel.getStreamIdentification() + "\r\n");
}
content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
@@ -368,16 +360,16 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
Request request = headerProvider.createInviteRequest(device, channel.getChannelId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
String callId = response.getCallIdHeader().getCallId();
streamSession.put(device.getDeviceId(), channelId, callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
streamSession.put(device.getDeviceId(), channel.getChannelId(), callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
InviteSessionType.PLAY);
okEvent.response(e);
});

View File

@@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -38,7 +37,6 @@ import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -222,7 +220,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
mobilePosition.getLongitude(), mobilePosition.getLatitude());
mobilePosition.setReportSource("Mobile Position");
// 更新device channel 的经纬度
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setDeviceId(device.getDeviceId());
@@ -242,6 +239,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
}
storager.updateChannelPosition(deviceChannel);
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
// 发送redis消息。 通知位置信息的变化
JSONObject jsonObject = new JSONObject();

View File

@@ -82,8 +82,9 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
device.setIp(remoteAddressInfo.getIp());
// 设备地址变化会引起目录订阅任务失效,需要重新添加
if (device.getSubscribeCycleForCatalog() > 0) {
deviceService.removeCatalogSubscribe(device);
deviceService.addCatalogSubscribe(device);
deviceService.removeCatalogSubscribe(device, result->{
deviceService.addCatalogSubscribe(device);
});
}
}
if (device.getKeepaliveTime() == null) {

View File

@@ -87,4 +87,9 @@ public interface IDeviceChannelService {
* 直接批量添加
*/
void batchAddChannel(List<DeviceChannel> deviceChannels);
/**
* 修改通道的码流类型
*/
void updateChannelStreamIdentification(DeviceChannel channel);
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
@@ -39,7 +40,7 @@ public interface IDeviceService {
* @param device 设备信息
* @return 布尔
*/
boolean removeCatalogSubscribe(Device device);
boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback);
/**
* 添加移动位置订阅
@@ -53,7 +54,7 @@ public interface IDeviceService {
* @param device 设备信息
* @return 布尔
*/
boolean removeMobilePositionSubscribe(Device device);
boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback);
/**
* 移除移动位置订阅

View File

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.conf.exception.ServiceException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
@@ -25,7 +26,7 @@ import java.util.Map;
*/
public interface IPlayService {
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channelId,
ErrorCallback<Object> callback);
SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback);

View File

@@ -18,6 +18,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -267,5 +268,16 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
}
@Override
public void updateChannelStreamIdentification(DeviceChannel channel) {
assert !ObjectUtils.isEmpty(channel.getDeviceId());
assert !ObjectUtils.isEmpty(channel.getStreamIdentification());
if (ObjectUtils.isEmpty(channel.getStreamIdentification())) {
logger.info("[重置通道码流类型] 设备: {}, 码流: {}", channel.getDeviceId(), channel.getStreamIdentification());
}else {
logger.info("[更新通道码流类型] 设备: {}, 通道:{} 码流: {}", channel.getDeviceId(), channel.getChannelId(),
channel.getStreamIdentification());
}
channelMapper.updateChannelStreamIdentification(channel);
}
}

View File

@@ -1,10 +1,10 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
@@ -145,10 +145,6 @@ public class DeviceServiceImpl implements IDeviceService {
}
sync(device);
}else {
if (deviceInDb != null) {
device.setSwitchPrimarySubStream(deviceInDb.isSwitchPrimarySubStream());
}
if(!device.isOnLine()){
device.setOnLine(true);
device.setCreateTime(now);
@@ -238,12 +234,8 @@ public class DeviceServiceImpl implements IDeviceService {
}
}
// 移除订阅
removeCatalogSubscribe(device);
removeMobilePositionSubscribe(device);
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
}
removeCatalogSubscribe(device, null);
removeMobilePositionSubscribe(device, null);
List<AudioBroadcastCatch> audioBroadcastCatches = audioBroadcastManager.get(deviceId);
if (audioBroadcastCatches.size() > 0) {
@@ -281,7 +273,7 @@ public class DeviceServiceImpl implements IDeviceService {
}
@Override
public boolean removeCatalogSubscribe(Device device) {
public boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
@@ -291,7 +283,7 @@ public class DeviceServiceImpl implements IDeviceService {
Runnable runnable = dynamicTask.get(taskKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop();
subscribeTask.stop(callback);
}
}
dynamicTask.stop(taskKey);
@@ -314,7 +306,7 @@ public class DeviceServiceImpl implements IDeviceService {
}
@Override
public boolean removeMobilePositionSubscribe(Device device) {
public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
@@ -324,7 +316,7 @@ public class DeviceServiceImpl implements IDeviceService {
Runnable runnable = dynamicTask.get(taskKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop();
subscribeTask.stop(callback);
}
}
dynamicTask.stop(taskKey);
@@ -515,21 +507,6 @@ public class DeviceServiceImpl implements IDeviceService {
logger.warn("更新设备时未找到设备信息");
return;
}
if(deviceInStore.isSwitchPrimarySubStream() != device.isSwitchPrimarySubStream()){
//当修改设备的主子码流开关时,需要校验是否存在流,如果存在流则直接关闭
List<SsrcTransaction> ssrcTransactionForAll = streamSession.getSsrcTransactionForAll(device.getDeviceId(), null, null, null);
if(ssrcTransactionForAll != null){
for (SsrcTransaction ssrcTransaction: ssrcTransactionForAll) {
try {
cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), ssrcTransaction.getStream(), null, null);
} catch (InvalidArgumentException | SsrcTransactionNotFoundException | ParseException | SipException e) {
throw new RuntimeException(e);
}
}
}
deviceChannelMapper.clearPlay(device.getDeviceId());
inviteStreamService.clearInviteInfo(device.getDeviceId());
}
if (!ObjectUtils.isEmpty(device.getName())) {
deviceInStore.setName(device.getName());
@@ -552,39 +529,54 @@ public class DeviceServiceImpl implements IDeviceService {
if (!ObjectUtils.isEmpty(device.getStreamMode())) {
deviceInStore.setStreamMode(device.getStreamMode());
}
// 目录订阅相关的信息
if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
if (device.getSubscribeCycleForCatalog() > 0) {
// 若已开启订阅,但订阅周期不同,则先取消
if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
removeCatalogSubscribe(deviceInStore);
removeCatalogSubscribe(deviceInStore, result->{
// 开启订阅
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
addCatalogSubscribe(deviceInStore);
// 因为是异步执行,需要在这里更新下数据
deviceMapper.updateCustom(deviceInStore);
redisCatchStorage.updateDevice(deviceInStore);
});
}else {
// 开启订阅
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
addCatalogSubscribe(deviceInStore);
}
// 开启订阅
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
addCatalogSubscribe(deviceInStore);
}else if (device.getSubscribeCycleForCatalog() == 0) {
// 取消订阅
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
removeCatalogSubscribe(deviceInStore);
deviceInStore.setSubscribeCycleForCatalog(0);
removeCatalogSubscribe(deviceInStore, null);
}
}
// 移动位置订阅相关的信息
if (device.getSubscribeCycleForMobilePosition() > 0) {
if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
// 开启订阅
addMobilePositionSubscribe(deviceInStore);
}
}else if (device.getSubscribeCycleForMobilePosition() == 0) {
if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
if (device.getSubscribeCycleForMobilePosition() > 0) {
// 若已开启订阅,但订阅周期不同,则先取消
if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
removeMobilePositionSubscribe(deviceInStore, result->{
// 开启订阅
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
addMobilePositionSubscribe(deviceInStore);
// 因为是异步执行,需要在这里更新下数据
deviceMapper.updateCustom(deviceInStore);
redisCatchStorage.updateDevice(deviceInStore);
});
}else {
// 开启订阅
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
addMobilePositionSubscribe(deviceInStore);
}
}else if (device.getSubscribeCycleForMobilePosition() == 0) {
// 取消订阅
removeMobilePositionSubscribe(deviceInStore);
deviceInStore.setSubscribeCycleForCatalog(0);
removeCatalogSubscribe(deviceInStore, null);
}
}
if (deviceInStore.getGeoCoordSys() != null) {
@@ -603,10 +595,9 @@ public class DeviceServiceImpl implements IDeviceService {
deviceInStore.setSsrcCheck(device.isSsrcCheck());
//作为消息通道
deviceInStore.setAsMessageChannel(device.isAsMessageChannel());
// 更新redis
deviceMapper.updateCustom(deviceInStore);
redisCatchStorage.removeDevice(deviceInStore.getDeviceId());
redisCatchStorage.updateDevice(deviceInStore);
}
@Override

View File

@@ -20,6 +20,8 @@ import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
@@ -29,6 +31,10 @@ import com.genersoft.iot.vmp.media.zlm.*;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -39,9 +45,12 @@ import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
@@ -80,7 +89,7 @@ public class PlayServiceImpl implements IPlayService {
private IVideoManagerStorage storager;
@Autowired
private SIPCommander cmder;
private ISIPCommander cmder;
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@@ -121,6 +130,9 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private UserSetting userSetting;
@Autowired
private IDeviceChannelService channelService;
@Autowired
private SipConfig sipConfig;
@@ -163,6 +175,11 @@ public class PlayServiceImpl implements IPlayService {
logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流");
}
DeviceChannel channel = channelService.getOne(deviceId, channelId);
if (channel == null) {
logger.warn("[点播] 未找到通道 deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到通道");
}
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfo != null ) {
if (inviteInfo.getStreamInfo() == null) {
@@ -211,7 +228,7 @@ public class PlayServiceImpl implements IPlayService {
null);
return null;
}
play(mediaServerItem, ssrcInfo, device, channelId, callback);
play(mediaServerItem, ssrcInfo, device, channel, callback);
return ssrcInfo;
}
@@ -358,7 +375,7 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
ErrorCallback<Object> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
@@ -367,111 +384,109 @@ public class PlayServiceImpl implements IPlayService {
null);
return;
}
logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, STREAM{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getStream(),
logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification(), ssrcInfo.getPort(), ssrcInfo.getStream(),
device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
//端口获取失败的ssrcInfo 没有必要发送点播指令
if (ssrcInfo.getPort() <= 0) {
logger.info("[点播端口分配异常]deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
logger.info("[点播端口分配异常]deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getChannelId(), ssrcInfo);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
return;
}
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready);
inviteInfo.setSubStream(device.isSwitchPrimarySubStream());
inviteStreamService.updateInviteInfo(inviteInfo);
// 超时处理
String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型{},端口:{}, SSRC: {}",
device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流",
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification(),
ssrcInfo.getPort(), ssrcInfo.getSsrc());
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
cmder.streamByeCmd(device, channel.getChannelId(), ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
logger.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 取消订阅消息监听
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
}
}else {
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型{},端口:{}, SSRC: {}",
device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流",
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification(),
ssrcInfo.getPort(), ssrcInfo.getSsrc());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem.getId(), ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
}
}, userSetting.getPlayTimeout());
try {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInuse, hookParam ) -> {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (mediaServerItemInuse, hookParam ) -> {
logger.info("收到订阅消息: " + hookParam);
dynamicTask.stop(timeOutTaskKey);
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId);
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channel.getChannelId());
if (streamInfo == null){
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channelId,
device.isSwitchPrimarySubStream() ? "辅码流" : "主码流");
snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channelId, ssrcInfo.getStream());
logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getChannelId(),
channel.getStreamIdentification());
snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
}, (eventResult) -> {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId,
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getChannelId(),
timeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAY);
}, (event) -> {
logger.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channelId, event.statusCode, event.msg);
logger.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channel.getChannelId(), event.statusCode, event.msg);
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
});
} catch (InvalidArgumentException | SipException | ParseException e) {
@@ -481,15 +496,15 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
}
}
@@ -626,7 +641,7 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void playBack(String deviceId, String channelId, String startTime,
String endTime, ErrorCallback<Object> callback) {
String endTime, ErrorCallback<Object> callback) {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("[录像回放] 未找到设备 deviceId: {},channelId:{}", deviceId, channelId);
@@ -651,8 +666,8 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
String deviceId, String channelId, String startTime,
String endTime, ErrorCallback<Object> callback) {
String deviceId, String channelId, String startTime,
String endTime, ErrorCallback<Object> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
@@ -1548,16 +1563,16 @@ public class PlayServiceImpl implements IPlayService {
MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
play(newMediaServerItem, deviceId, channelId, null, (code, msg, data)->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
getSnap(deviceId, channelId, fileName, errorCallback);
}else {
errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
}
}else {
errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
}
if (code == InviteErrorCode.SUCCESS.getCode()) {
InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
getSnap(deviceId, channelId, fileName, errorCallback);
}else {
errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
}
}else {
errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
}
});
}

View File

@@ -20,11 +20,11 @@ public interface DeviceChannelMapper {
@Insert("INSERT INTO wvp_device_channel (channel_id, device_id, name, manufacture, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, secrecy, " +
"ip_address, port, password, ptz_type, status, stream_id, longitude, latitude, longitude_gcj02, latitude_gcj02, " +
"longitude_wgs84, latitude_wgs84, has_audio, create_time, update_time, business_group_id, gps_time) " +
"longitude_wgs84, latitude_wgs84, has_audio, create_time, update_time, business_group_id, gps_time, stream_identification) " +
"VALUES (#{channelId}, #{deviceId}, #{name}, #{manufacture}, #{model}, #{owner}, #{civilCode}, #{block}," +
"#{address}, #{parental}, #{parentId}, #{safetyWay}, #{registerWay}, #{certNum}, #{certifiable}, #{errCode}, #{secrecy}, " +
"#{ipAddress}, #{port}, #{password}, #{PTZType}, #{status}, #{streamId}, #{longitude}, #{latitude}, #{longitudeGcj02}, " +
"#{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{hasAudio}, #{createTime}, #{updateTime}, #{businessGroupId}, #{gpsTime})")
"#{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{hasAudio}, #{createTime}, #{updateTime}, #{businessGroupId}, #{gpsTime}, #{streamIdentification})")
int add(DeviceChannel channel);
@Update(value = {" <script>" +
@@ -60,6 +60,7 @@ public interface DeviceChannelMapper {
"<if test='latitudeWgs84 != null'>, latitude_wgs84=#{latitudeWgs84}</if>" +
"<if test='businessGroupId != null'>, business_group_id=#{businessGroupId}</if>" +
"<if test='gpsTime != null'>, gps_time=#{gpsTime}</if>" +
"<if test='streamIdentification != null'>, stream_identification=#{streamIdentification}</if>" +
"WHERE device_id=#{deviceId} AND channel_id=#{channelId}"+
" </script>"})
int update(DeviceChannel channel);
@@ -102,6 +103,7 @@ public interface DeviceChannelMapper {
"dc.longitude_wgs84, " +
"dc.latitude_wgs84, " +
"dc.business_group_id, " +
"dc.stream_identification, " +
"dc.gps_time " +
"from " +
"wvp_device_channel dc " +
@@ -241,7 +243,7 @@ public interface DeviceChannelMapper {
"(channel_id, device_id, name, manufacture, model, owner, civil_code, block, sub_count, " +
" address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, secrecy, " +
" ip_address,port,password,ptz_type,status,stream_id,longitude,latitude,longitude_gcj02,latitude_gcj02,"+
" longitude_wgs84,latitude_wgs84,has_audio,create_time,update_time,business_group_id,gps_time)"+
" longitude_wgs84,latitude_wgs84,has_audio,create_time,update_time,business_group_id,gps_time,stream_identification)"+
"values " +
"<foreach collection='addChannels' index='index' item='item' separator=','> " +
"(#{item.channelId}, #{item.deviceId}, #{item.name}, #{item.manufacture}, #{item.model}, " +
@@ -251,7 +253,7 @@ public interface DeviceChannelMapper {
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.PTZType}, #{item.status}, " +
"#{item.streamId}, #{item.longitude}, #{item.latitude},#{item.longitudeGcj02}, " +
"#{item.latitudeGcj02},#{item.longitudeWgs84}, #{item.latitudeWgs84}, #{item.hasAudio}, now(), now(), " +
"#{item.businessGroupId}, #{item.gpsTime}) " +
"#{item.businessGroupId}, #{item.gpsTime}, #{item.streamIdentification}) " +
"</foreach> " +
"</script>")
int batchAdd(@Param("addChannels") List<DeviceChannel> addChannels);
@@ -349,6 +351,7 @@ public interface DeviceChannelMapper {
"<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
"<if test='item.businessGroupId != null'>, business_group_id=#{item.businessGroupId}</if>" +
"<if test='item.gpsTime != null'>, gps_time=#{item.gpsTime}</if>" +
"<if test='item.streamIdentification != null'>, stream_identification=#{item.streamIdentification}</if>" +
"<if test='item.id > 0'>WHERE id=#{item.id}</if>" +
"<if test='item.id == 0'>WHERE device_id=#{item.deviceId} AND channel_id=#{item.channelId}</if>" +
"</foreach>" +
@@ -542,4 +545,9 @@ public interface DeviceChannelMapper {
" </script>"})
List<DeviceChannel> getSubChannelsByDeviceId(@Param("deviceId") String deviceId, @Param("parentId") String parentId, @Param("onlyCatalog") boolean onlyCatalog);
@Update("<script>" +
"UPDATE wvp_device_channel SET stream_identification=#{streamIdentification} WHERE device_id=#{deviceId}" +
" <if test='channelId != null'> and channel_id = #{channelId} </if>" +
"</script>")
void updateChannelStreamIdentification(DeviceChannel channel);
}

View File

@@ -43,7 +43,6 @@ public interface DeviceMapper {
"geo_coord_sys," +
"on_line," +
"media_server_id," +
"switch_primary_sub_stream," +
"broadcast_push_after_ack," +
"(SELECT count(0) FROM wvp_device_channel WHERE device_id=wvp_device.device_id) as channel_count "+
" FROM wvp_device WHERE device_id = #{deviceId}")
@@ -163,7 +162,6 @@ public interface DeviceMapper {
"geo_coord_sys,"+
"on_line,"+
"media_server_id,"+
"switch_primary_sub_stream switchPrimarySubStream,"+
"(SELECT count(0) FROM wvp_device_channel WHERE device_id=de.device_id) as channel_count " +
"FROM wvp_device de" +
"<if test=\"onLine != null\"> where on_line=${onLine}</if>"+
@@ -256,7 +254,6 @@ public interface DeviceMapper {
"<if test=\"asMessageChannel != null\">, as_message_channel=#{asMessageChannel}</if>" +
"<if test=\"broadcastPushAfterAck != null\">, broadcast_push_after_ack=#{broadcastPushAfterAck}</if>" +
"<if test=\"geoCoordSys != null\">, geo_coord_sys=#{geoCoordSys}</if>" +
"<if test=\"switchPrimarySubStream != null\">, switch_primary_sub_stream=#{switchPrimarySubStream}</if>" +
"<if test=\"mediaServerId != null\">, media_server_id=#{mediaServerId}</if>" +
"WHERE device_id=#{deviceId}"+
" </script>"})
@@ -275,8 +272,7 @@ public interface DeviceMapper {
"broadcastPushAfterAck,"+
"geo_coord_sys,"+
"on_line,"+
"media_server_id,"+
"switch_primary_sub_stream"+
"media_server_id"+
") VALUES (" +
"#{deviceId}," +
"#{name}," +
@@ -290,8 +286,7 @@ public interface DeviceMapper {
"#{broadcastPushAfterAck}," +
"#{geoCoordSys}," +
"#{onLine}," +
"#{mediaServerId}," +
"#{switchPrimarySubStream}" +
"#{mediaServerId}" +
")")
void addCustomDevice(Device device);

View File

@@ -153,10 +153,7 @@ public class MobilePositionController {
Device device = storager.queryVideoDevice(deviceId);
device.setSubscribeCycleForMobilePosition(Integer.parseInt(expires));
device.setMobilePositionSubmissionInterval(Integer.parseInt(interval));
deviceService.updateDevice(device);
if (!deviceService.removeMobilePositionSubscribe(device)) {
throw new ControllerException(ErrorCode.ERROR100);
}
deviceService.updateCustomDevice(device);
}
/**

View File

@@ -199,7 +199,7 @@ public class DeviceQuery {
Runnable runnable = dynamicTask.get(key);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop();
subscribeTask.stop(null);
}
dynamicTask.stop(key);
}
@@ -264,6 +264,14 @@ public class DeviceQuery {
deviceChannelService.updateChannel(deviceId, channel);
}
@Operation(summary = "修改通道的码流类型", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channel", description = "通道信息", required = true)
@PostMapping("/channel/stream/identification/update/")
public void updateChannelStreamIdentification(DeviceChannel channel){
deviceChannelService.updateChannelStreamIdentification(channel);
}
/**
* 修改数据流传输模式
* @param deviceId 设备id