解决串流,解决编辑上级平台却新建了的问题

This commit is contained in:
panlinlin
2021-04-15 17:48:52 +08:00
parent 662ce3b484
commit 937e591430
32 changed files with 574 additions and 153 deletions

View File

@@ -118,7 +118,7 @@ public class SipLayer implements SipListener {
*/
@Override
public void processRequest(RequestEvent evt) {
logger.debug(evt.getRequest().toString());
// logger.debug(evt.getRequest().toString());
// 由于jainsip是单线程程序为提高性能并发处理
processThreadPool.execute(() -> {
if (processorFactory != null) {
@@ -130,7 +130,7 @@ public class SipLayer implements SipListener {
@Override
public void processResponse(ResponseEvent evt) {
Response response = evt.getResponse();
logger.debug(evt.getResponse().toString());
// logger.debug(evt.getResponse().toString());
int status = response.getStatusCode();
if (((status >= 200) && (status < 300)) || status == 401) { // Success!
ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);

View File

@@ -2,6 +2,11 @@ package com.genersoft.iot.vmp.gb28181.bean;
public class ParentPlatform {
/**
* id
*/
private Integer id;
/**
* 是否启用
*/
@@ -99,6 +104,13 @@ public class ParentPlatform {
*/
private int channelCount;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public boolean isEnable() {
return enable;

View File

@@ -49,7 +49,7 @@ public class PlatformKeepaliveExpireEventLister implements ApplicationListener<P
if (logger.isDebugEnabled()) {
logger.debug("平台心跳到期事件事件触发平台国标ID" + event.getPlatformGbID());
}
ParentPlatform parentPlatform = storager.queryParentPlatById(event.getPlatformGbID());
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformGbID());
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(event.getPlatformGbID());
if (parentPlatformCatch == null) {
return;

View File

@@ -35,7 +35,7 @@ public class PlatformNotRegisterEventLister implements ApplicationListener<Platf
logger.debug("平台未注册事件触发平台国标ID" + event.getPlatformGbID());
ParentPlatform parentPlatform = storager.queryParentPlatById(event.getPlatformGbID());
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformGbID());
if (parentPlatform == null) {
logger.debug("平台未注册事件触发,但平台已经删除!!! 平台国标ID" + event.getPlatformGbID());
return;

View File

@@ -365,9 +365,14 @@ public class SIPCommander implements ISIPCommander {
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("id", streamId);
subscribeKey.put("stream", streamId);
subscribeKey.put("regist", true);
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey, event);
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, json->{
if (json.getJSONArray("tracks") == null) return;
event.response(json);
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
});
//
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
@@ -465,9 +470,14 @@ public class SIPCommander implements ISIPCommander {
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("id", streamId);
subscribeKey.put("stream", streamId);
subscribeKey.put("regist", true);
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey, event);
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, json->{
if (json.getJSONArray("tracks") == null) return;
event.response(json);
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
});
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");

View File

@@ -86,7 +86,7 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
}
// 查询请求方是否上级平台
ParentPlatform platform = storager.queryParentPlatById(requesterId);
ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
if (platform != null) {
// 查询平台下是否有该通道
DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);

View File

@@ -238,7 +238,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
// 回复200 OK
responseAck(evt);
String sn = rootElement.element("SN").getText();
ParentPlatform parentPlatform = storager.queryParentPlatById(platformId);
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
cmderFroPlatform.deviceStatusResponse(parentPlatform, sn, fromHeader.getTag());
}
} else {
@@ -303,7 +303,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
if (deviceId.equals(targetGBId)) {
// 远程启动本平台需要在重新启动程序后先对SipStack解绑
logger.info("执行远程启动本平台命令");
ParentPlatform parentPlatform = storager.queryParentPlatById(platformId);
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
cmderFroPlatform.unregister(parentPlatform, null, null);
Thread restartThread = new Thread(new Runnable() {
@@ -463,7 +463,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
// 回复200 OK
responseAck(evt);
String sn = rootElement.element("SN").getText();
ParentPlatform parentPlatform = storager.queryParentPlatById(platformId);
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
cmderFroPlatform.deviceInfoResponse(parentPlatform, sn, fromHeader.getTag());
}
} else {
@@ -517,7 +517,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
// if (deviceListElement == null) { // 存在DeviceList则为响应 catalog 不存在DeviceList则为查询请求
if (name.equalsIgnoreCase("Query")) { // 区分是Response——查询响应还是Query——查询请求
// TODO 后续将代码拆分
ParentPlatform parentPlatform = storager.queryParentPlatById(platformId);
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
if (parentPlatform == null) {
response404Ack(evt);
return;

View File

@@ -82,7 +82,7 @@ public class RegisterResponseProcessor implements ISIPResponseProcessor {
redisCatchStorage.delPlatformRegisterInfo(callId);
parentPlatform.setStatus(true);
// 取回Expires设置避免注销过程中被置为0
ParentPlatform parentPlatformTmp = storager.queryParentPlatById(platformGBId);
ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
String expires = parentPlatformTmp.getExpires();
parentPlatform.setExpires(expires);
storager.updateParentPlatform(parentPlatform);

View File

@@ -240,6 +240,10 @@ public class ZLMHttpHookListener {
if (logger.isDebugEnabled()) {
logger.debug("ZLM HOOK on_stream_changed API调用参数" + json.toString());
}
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
if (subscribe != null) subscribe.response(json);
// 流消失移除redis play
String app = json.getString("app");
String streamId = json.getString("stream");

View File

@@ -163,6 +163,10 @@ public class ZLMRESTfulUtils {
return sendPost("closeRtpServer",param, null);
}
public JSONObject listRtpServer() {
return sendPost("listRtpServer",null, null);
}
public JSONObject startSendRtp(Map<String, Object> param) {
return sendPost("startSendRtp",param, null);
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SsrcUtil;
@@ -27,7 +28,28 @@ public class ZLMRTPServerFactory {
private int currentPort = 0;
private Map<String, Integer> currentStreams = null;
public int createRTPServer(String streamId) {
if (currentStreams == null) {
currentStreams = new HashMap<>();
JSONObject jsonObject = zlmresTfulUtils.listRtpServer();
JSONArray data = jsonObject.getJSONArray("data");
if (data != null) {
for (int i = 0; i < data.size(); i++) {
JSONObject dataItem = data.getJSONObject(i);
currentStreams.put(dataItem.getString("stream_id"), dataItem.getInteger("port"));
}
}
}
// 已经在推流
if (currentStreams.get(streamId) != null) {
Map<String, Object> closeRtpServerParam = new HashMap<>();
closeRtpServerParam.put("stream_id", streamId);
zlmresTfulUtils.closeRtpServer(closeRtpServerParam);
currentStreams.remove(streamId);
}
Map<String, Object> param = new HashMap<>();
int result = -1;
int newPort = getPortFromUdpPortRange();
@@ -35,14 +57,16 @@ public class ZLMRTPServerFactory {
param.put("enable_tcp", 1);
param.put("stream_id", streamId);
JSONObject jsonObject = zlmresTfulUtils.openRtpServer(param);
System.out.println(jsonObject);
if (jsonObject != null) {
switch (jsonObject.getInteger("code")){
case 0:
result= newPort;
break;
case -300: // id已经存在
case -300: // id已经存在, 可能已经在其他端口推流
Map<String, Object> closeRtpServerParam = new HashMap<>();
closeRtpServerParam.put("stream_id", streamId);
zlmresTfulUtils.closeRtpServer(closeRtpServerParam);
result = newPort;
break;
case -400: // 端口占用

View File

@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
/**
@@ -21,6 +23,6 @@ public interface IMediaService {
* @param stream
* @return
*/
StreamInfo getStreamInfoByAppAndStream(String app, String stream);
StreamInfo getStreamInfoByAppAndStream(String app, String stream, JSONArray tracks);
}

View File

@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
@@ -25,7 +27,7 @@ public class MediaServiceImpl implements IMediaService {
@Override
public StreamInfo getStreamInfoByAppAndStream(String app, String stream) {
public StreamInfo getStreamInfoByAppAndStream(String app, String stream, JSONArray tracks) {
MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
StreamInfo streamInfoResult = new StreamInfo();
streamInfoResult.setStreamId(stream);
@@ -41,7 +43,7 @@ public class MediaServiceImpl implements IMediaService {
streamInfoResult.setTs(String.format("http://%s:%s/%s/%s.live.ts", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), app, stream));
streamInfoResult.setWs_ts(String.format("ws://%s:%s/%s/%s.live.ts", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), app, stream));
streamInfoResult.setRtc(String.format("http://%s:%s/index/api/webrtc?app=%s&stream=%s&type=play", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), app, stream));
streamInfoResult.setTracks(tracks);
return streamInfoResult;
}
@@ -50,7 +52,14 @@ public class MediaServiceImpl implements IMediaService {
StreamInfo streamInfo = null;
JSONObject mediaList = zlmresTfulUtils.getMediaList(app, stream);
if (mediaList != null) {
streamInfo = getStreamInfoByAppAndStream(app, stream);
if (mediaList.getInteger("code") == 0) {
JSONArray data = mediaList.getJSONArray("data");
if (data == null) return null;
JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class);
JSONArray tracks = mediaJSON.getJSONArray("tracks");
streamInfo = getStreamInfoByAppAndStream(app, stream, tracks);
}
}
return streamInfo;
}

View File

@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -83,6 +84,13 @@ public class PlayServiceImpl implements IPlayService {
});
} else {
String streamId = streamInfo.getStreamId();
if (streamId == null) {
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
msg.setData(String.format("点播失败, redis缓存streamId等于null"));
resultHolder.invokeResult(msg);
return playResult;
}
JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
if (rtpInfo != null && rtpInfo.getBoolean("exist")) {
RequestMessage msg = new RequestMessage();
@@ -150,8 +158,9 @@ public class PlayServiceImpl implements IPlayService {
}
public StreamInfo onPublishHandler(JSONObject resonse, String deviceId, String channelId, String uuid) {
String streamId = resonse.getString("id");
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream("rtp", streamId);
String streamId = resonse.getString("stream");
JSONArray tracks = resonse.getJSONArray("tracks");
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream("rtp", streamId, tracks);
streamInfo.setDeviceID(deviceId);
streamInfo.setChannelId(channelId);
return streamInfo;

View File

@@ -194,7 +194,7 @@ public interface IVideoManagerStorager {
* @param platformGbId
* @return
*/
ParentPlatform queryParentPlatById(String platformGbId);
ParentPlatform queryParentPlatByServerGBId(String platformGbId);
/**
* 所有平台离线

View File

@@ -25,6 +25,7 @@ public interface ParentPlatformMapper {
"SET enable=#{enable}, " +
"name=#{name}," +
"deviceGBId=#{deviceGBId}," +
"serverGBId=#{serverGBId}, " +
"serverGBDomain=#{serverGBDomain}, " +
"serverIP=#{serverIP}," +
"serverPort=#{serverPort}, " +
@@ -39,7 +40,7 @@ public interface ParentPlatformMapper {
"ptz=#{ptz}, " +
"rtcp=#{rtcp}, " +
"status=#{status} " +
"WHERE serverGBId=#{serverGBId}")
"WHERE id=#{id}")
int updateParentPlatform(ParentPlatform parentPlatform);
@Delete("DELETE FROM parent_platform WHERE serverGBId=#{serverGBId}")
@@ -52,7 +53,10 @@ public interface ParentPlatformMapper {
List<ParentPlatform> getEnableParentPlatformList(boolean enable);
@Select("SELECT * FROM parent_platform WHERE serverGBId=#{platformGbId}")
ParentPlatform getParentPlatById(String platformGbId);
ParentPlatform getParentPlatByServerGBId(String platformGbId);
@Select("SELECT * FROM parent_platform WHERE id=#{id}")
ParentPlatform getParentPlatById(int id);
@Update("UPDATE parent_platform SET status=false" )
void outlineForAllParentPlatform();

View File

@@ -271,15 +271,21 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
public boolean updateParentPlatform(ParentPlatform parentPlatform) {
int result = 0;
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); // .getDeviceGBId());
if ( platformMapper.getParentPlatById(parentPlatform.getServerGBId()) == null) {
if (parentPlatform.getId() == null ) {
result = platformMapper.addParentPlatform(parentPlatform);
if (parentPlatformCatch == null) {
parentPlatformCatch = new ParentPlatformCatch();
parentPlatformCatch.setParentPlatform(parentPlatform);
parentPlatformCatch.setId(parentPlatform.getServerGBId());
}
}else {
if (parentPlatformCatch == null) { // serverGBId 已变化
ParentPlatform parentPlatById = platformMapper.getParentPlatById(parentPlatform.getId());
// 使用旧的查出缓存ID
parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatById.getServerGBId());
parentPlatformCatch.setId(parentPlatform.getServerGBId());
redisCatchStorage.delPlatformCatchInfo(parentPlatById.getServerGBId());
}
result = platformMapper.updateParentPlatform(parentPlatform);
}
// 更新缓存
@@ -305,8 +311,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
}
@Override
public ParentPlatform queryParentPlatById(String platformGbId) {
return platformMapper.getParentPlatById(platformGbId);
public ParentPlatform queryParentPlatByServerGBId(String platformGbId) {
return platformMapper.getParentPlatByServerGBId(platformGbId);
}
@Override

View File

@@ -111,7 +111,7 @@ public class PlatformController {
// TODO 检查是否已经存在,且注册成功, 如果注册成功,需要先注销之前再,修改并注册
// ParentPlatform parentPlatformOld = storager.queryParentPlatById(parentPlatform.getDeviceGBId());
ParentPlatform parentPlatformOld = storager.queryParentPlatById(parentPlatform.getServerGBId());
ParentPlatform parentPlatformOld = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId());
boolean updateResult = storager.updateParentPlatform(parentPlatform);
@@ -123,8 +123,6 @@ public class PlatformController {
} else if (parentPlatformOld != null && parentPlatformOld.isEnable() && !parentPlatform.isEnable()){ // 关闭启用时注销
commanderForPlatform.unregister(parentPlatform, null, null);
}
return new ResponseEntity<>("success", HttpStatus.OK);
} else {
return new ResponseEntity<>("fail", HttpStatus.OK);
@@ -151,7 +149,7 @@ public class PlatformController {
){
return new ResponseEntity<>("missing parameters", HttpStatus.BAD_REQUEST);
}
ParentPlatform parentPlatform = storager.queryParentPlatById(serverGBId);
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(serverGBId);
if (parentPlatform == null) return new ResponseEntity<>("fail", HttpStatus.OK);
// 发送离线消息,无论是否成功都删除缓存
commanderForPlatform.unregister(parentPlatform, (event -> {
@@ -192,7 +190,7 @@ public class PlatformController {
if (logger.isDebugEnabled()) {
logger.debug("查询上级平台是否存在API调用" + serverGBId);
}
ParentPlatform parentPlatform = storager.queryParentPlatById(serverGBId);
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(serverGBId);
return new ResponseEntity<>(String.valueOf(parentPlatform != null), HttpStatus.OK);
}

View File

@@ -73,7 +73,6 @@ public class PlayController {
public DeferredResult<ResponseEntity<String>> play(@PathVariable String deviceId,
@PathVariable String channelId) {
PlayResult playResult = playService.play(deviceId, channelId, null, null);
// 超时处理
@@ -181,7 +180,7 @@ public class PlayController {
JSONObject data = jsonObject.getJSONObject("data");
if (data != null) {
result.put("key", data.getString("key"));
StreamInfo streamInfoResult = mediaService.getStreamInfoByAppAndStream("convert", streamId);
StreamInfo streamInfoResult = mediaService.getStreamInfoByAppAndStreamWithCheck("convert", streamId);
result.put("data", streamInfoResult);
}
}else {

View File

@@ -71,7 +71,7 @@ public class PlaybackController {
logger.debug(String.format("设备回放 API调用deviceId%s channelId%s", deviceId, channelId));
}
UUID uuid = UUID.randomUUID();
DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(30000L);
// 超时处理
result.onTimeout(()->{
logger.warn(String.format("设备回放超时deviceId%s channelId%s", deviceId, channelId));

View File

@@ -21,10 +21,10 @@ import javax.security.sasl.AuthenticationException;
public class UserController {
@Autowired
AuthenticationManager authenticationManager;
private AuthenticationManager authenticationManager;
@Autowired
IUserService userService;
private IUserService userService;
@ApiOperation("登录")
@ApiImplicitParams({
@@ -33,7 +33,7 @@ public class UserController {
})
@GetMapping("/login")
public String login(String username, String password){
LoginUser user = null;
LoginUser user;
try {
user = SecurityUtils.login(username, password, authenticationManager);
} catch (AuthenticationException e) {