使用异步接口, 更好的并发, 对hook使用订阅机制

替换前段播放器, 支持h265的播放
放弃循环获取编码信息,
This commit is contained in:
648540858
2020-12-08 18:11:02 +08:00
parent 5a4859f05c
commit f75b3e6cda
20 changed files with 681 additions and 359 deletions

View File

@@ -5,12 +5,18 @@ import com.alibaba.fastjson.JSONArray;
public class StreamInfo {
private String ssrc;
private String streamId;
private String deviceID;
private String cahnnelId;
private String flv;
private String ws_flv;
private String rtmp;
private String fmp4;
private String ws_fmp4;
private String hls;
private String ws_hls;
private String ts;
private String ws_ts;
private String rtmp;
private String rtsp;
private JSONArray tracks;
@@ -85,4 +91,52 @@ public class StreamInfo {
public void setTracks(JSONArray tracks) {
this.tracks = tracks;
}
public String getFmp4() {
return fmp4;
}
public void setFmp4(String fmp4) {
this.fmp4 = fmp4;
}
public String getWs_fmp4() {
return ws_fmp4;
}
public void setWs_fmp4(String ws_fmp4) {
this.ws_fmp4 = ws_fmp4;
}
public String getWs_hls() {
return ws_hls;
}
public void setWs_hls(String ws_hls) {
this.ws_hls = ws_hls;
}
public String getTs() {
return ts;
}
public void setTs(String ts) {
this.ts = ts;
}
public String getWs_ts() {
return ws_ts;
}
public void setWs_ts(String ws_ts) {
this.ws_ts = ws_ts;
}
public String getStreamId() {
return streamId;
}
public void setStreamId(String streamId) {
this.streamId = streamId;
}
}

View File

@@ -22,6 +22,8 @@ public class DeferredResultHolder {
public static final String CALLBACK_CMD_RECORDINFO = "CALLBACK_RECORDINFO";
public static final String CALLBACK_CMD_PlAY = "CALLBACK_PLAY";
private Map<String, DeferredResult> map = new HashMap<String, DeferredResult>();
public void put(String key, DeferredResult result) {

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
/**
* @Description:设备能力接口,用于定义设备的控制、查询能力
@@ -19,7 +20,7 @@ public interface ISIPCommander {
* @param upDown 镜头上移下移 0:停止 1:上移 2:下移
* @param moveSpeed 镜头移动速度
*/
public boolean ptzdirectCmd(Device device,String channelId,int leftRight, int upDown);
boolean ptzdirectCmd(Device device,String channelId,int leftRight, int upDown);
/**
* 云台方向放控制
@@ -30,7 +31,7 @@ public interface ISIPCommander {
* @param upDown 镜头上移下移 0:停止 1:上移 2:下移
* @param moveSpeed 镜头移动速度
*/
public boolean ptzdirectCmd(Device device,String channelId,int leftRight, int upDown, int moveSpeed);
boolean ptzdirectCmd(Device device,String channelId,int leftRight, int upDown, int moveSpeed);
/**
* 云台缩放控制,使用配置文件中的默认镜头缩放速度
@@ -39,7 +40,7 @@ public interface ISIPCommander {
* @param channelId 预览通道
* @param inOut 镜头放大缩小 0:停止 1:缩小 2:放大
*/
public boolean ptzZoomCmd(Device device,String channelId,int inOut);
boolean ptzZoomCmd(Device device,String channelId,int inOut);
/**
* 云台缩放控制
@@ -49,7 +50,7 @@ public interface ISIPCommander {
* @param inOut 镜头放大缩小 0:停止 1:缩小 2:放大
* @param zoomSpeed 镜头缩放速度
*/
public boolean ptzZoomCmd(Device device,String channelId,int inOut, int moveSpeed);
boolean ptzZoomCmd(Device device,String channelId,int inOut, int moveSpeed);
/**
* 云台控制,支持方向与缩放控制
@@ -62,7 +63,7 @@ public interface ISIPCommander {
* @param moveSpeed 镜头移动速度
* @param zoomSpeed 镜头缩放速度
*/
public boolean ptzCmd(Device device,String channelId,int leftRight, int upDown, int inOut, int moveSpeed, int zoomSpeed);
boolean ptzCmd(Device device,String channelId,int leftRight, int upDown, int inOut, int moveSpeed, int zoomSpeed);
/**
* 前端控制包括PTZ指令、FI指令、预置位指令、巡航指令、扫描指令和辅助开关指令
@@ -74,7 +75,7 @@ public interface ISIPCommander {
* @param parameter2 数据2
* @param combineCode2 组合码2
*/
public boolean frontEndCmd(Device device, String channelId, int cmdCode, int parameter1, int parameter2, int combineCode2);
boolean frontEndCmd(Device device, String channelId, int cmdCode, int parameter1, int parameter2, int combineCode2);
/**
* 请求预览视频流
@@ -82,7 +83,7 @@ public interface ISIPCommander {
* @param device 视频设备
* @param channelId 预览通道
*/
public StreamInfo playStreamCmd(Device device, String channelId);
void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event);
/**
* 请求回放视频流
@@ -92,14 +93,14 @@ public interface ISIPCommander {
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/
public StreamInfo playbackStreamCmd(Device device,String channelId, String startTime, String endTime);
void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event);
/**
* 视频流停止
*
* @param ssrc ssrc
*/
public void streamByeCmd(String ssrc);
void streamByeCmd(String ssrc);
/**
* 语音广播
@@ -107,7 +108,7 @@ public interface ISIPCommander {
* @param device 视频设备
* @param channelId 预览通道
*/
public boolean audioBroadcastCmd(Device device,String channelId);
boolean audioBroadcastCmd(Device device,String channelId);
/**
* 音视频录像控制
@@ -115,21 +116,21 @@ public interface ISIPCommander {
* @param device 视频设备
* @param channelId 预览通道
*/
public boolean recordCmd(Device device,String channelId);
boolean recordCmd(Device device,String channelId);
/**
* 报警布防/撤防命令
*
* @param device 视频设备
*/
public boolean guardCmd(Device device);
boolean guardCmd(Device device);
/**
* 报警复位命令
*
* @param device 视频设备
*/
public boolean alarmCmd(Device device);
boolean alarmCmd(Device device);
/**
* 强制关键帧命令,设备收到此命令应立刻发送一个IDR帧
@@ -137,21 +138,21 @@ public interface ISIPCommander {
* @param device 视频设备
* @param channelId 预览通道
*/
public boolean iFameCmd(Device device,String channelId);
boolean iFameCmd(Device device,String channelId);
/**
* 看守位控制命令
*
* @param device 视频设备
*/
public boolean homePositionCmd(Device device);
boolean homePositionCmd(Device device);
/**
* 设备配置命令
*
* @param device 视频设备
*/
public boolean deviceConfigCmd(Device device);
boolean deviceConfigCmd(Device device);
/**
@@ -159,7 +160,7 @@ public interface ISIPCommander {
*
* @param device 视频设备
*/
public boolean deviceStatusQuery(Device device);
boolean deviceStatusQuery(Device device);
/**
* 查询设备信息
@@ -167,14 +168,14 @@ public interface ISIPCommander {
* @param device 视频设备
* @return
*/
public boolean deviceInfoQuery(Device device);
boolean deviceInfoQuery(Device device);
/**
* 查询目录列表
*
* @param device 视频设备
*/
public boolean catalogQuery(Device device);
boolean catalogQuery(Device device);
/**
* 查询录像信息
@@ -183,33 +184,33 @@ public interface ISIPCommander {
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/
public boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime);
boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime);
/**
* 查询报警信息
*
* @param device 视频设备
*/
public boolean alarmInfoQuery(Device device);
boolean alarmInfoQuery(Device device);
/**
* 查询设备配置
*
* @param device 视频设备
*/
public boolean configQuery(Device device);
boolean configQuery(Device device);
/**
* 查询设备预置位置
*
* @param device 视频设备
*/
public boolean presetQuery(Device device);
boolean presetQuery(Device device);
/**
* 查询移动设备位置数据
*
* @param device 视频设备
*/
public boolean mobilePostitionQuery(Device device);
boolean mobilePostitionQuery(Device device);
}

View File

@@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMUtils;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.beans.factory.annotation.Autowired;
@@ -67,6 +68,9 @@ public class SIPCommander implements ISIPCommander {
@Value("${media.rtp.enable}")
private boolean rtpEnable;
@Autowired
private ZLMHttpHookSubscribe subscribe;
/**
@@ -264,12 +268,12 @@ public class SIPCommander implements ISIPCommander {
}
/**
* 请求预览视频流
*
*
* @param device 视频设备
* @param channelId 预览通道
*/
*/
@Override
public StreamInfo playStreamCmd(Device device, String channelId) {
public void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event) {
try {
String ssrc = streamSession.createPlaySsrc();
@@ -282,53 +286,57 @@ public class SIPCommander implements ISIPCommander {
}else {
mediaPort = mediaInfo.getRtpProxyPort();
}
String streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("id", streamId);
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey, event);
//
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+channelId+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("t=0 0\r\n");
if("TCP-PASSIVE".equals(streamMode)) {
content.append("m=video "+ mediaPort +" TCP/RTP/AVP 96 98 97\r\n");
content.append("v=0\r\n");
content.append("o="+channelId+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("t=0 0\r\n");
if("TCP-PASSIVE".equals(streamMode)) {
content.append("m=video "+ mediaPort +" TCP/RTP/AVP 96 98 97\r\n");
}else if ("TCP-ACTIVE".equals(streamMode)) {
content.append("m=video "+ mediaPort +" TCP/RTP/AVP 96 98 97\r\n");
}else if("UDP".equals(streamMode)) {
content.append("m=video "+ mediaPort +" RTP/AVP 96 98 97\r\n");
content.append("m=video "+ mediaPort +" RTP/AVP 96 98 97\r\n");
}
content.append("a=recvonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n");
content.append("a=rtpmap:98 H264/90000\r\n");
content.append("a=rtpmap:97 MPEG4/90000\r\n");
if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式
content.append("a=setup:passive\r\n");
content.append("a=recvonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n");
content.append("a=rtpmap:98 H264/90000\r\n");
content.append("a=rtpmap:97 MPEG4/90000\r\n");
if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式
content.append("a=setup:passive\r\n");
content.append("a=connection:new\r\n");
}else if ("TCP-ACTIVE".equals(streamMode)) { // tcp主动模式
}else if ("TCP-ACTIVE".equals(streamMode)) { // tcp主动模式
content.append("a=setup:active\r\n");
content.append("a=connection:new\r\n");
}
content.append("y="+ssrc+"\r\n");//ssrc
Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "live", null, ssrc);
ClientTransaction transaction = transmitRequest(device, request);
streamSession.put(ssrc, transaction);
content.append("y="+ssrc+"\r\n");//ssrc
Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "live", null, ssrc);
ClientTransaction transaction = transmitRequest(device, request);
streamSession.put(ssrc, transaction);
DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
if (deviceChannel != null) {
deviceChannel.setSsrc(ssrc);
storager.updateChannel(device.getDeviceId(), deviceChannel);
}
StreamInfo streamInfo = new StreamInfo();
streamInfo.setSsrc(ssrc);
streamInfo.setCahnnelId(channelId);
streamInfo.setDeviceID(device.getDeviceId());
storager.startPlay(streamInfo);
return streamInfo;
// TODO 订阅SIP response处理对方的错误返回
} catch ( SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
return null;
}
}
}
/**
@@ -340,10 +348,18 @@ public class SIPCommander implements ISIPCommander {
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/
@Override
public StreamInfo playbackStreamCmd(Device device, String channelId, String startTime, String endTime) {
public void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event) {
try {
MediaServerConfig mediaInfo = storager.getMediaInfo();
String ssrc = streamSession.createPlayBackSsrc();
String streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("id", streamId);
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey, event);
//
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
@@ -386,16 +402,8 @@ public class SIPCommander implements ISIPCommander {
ClientTransaction transaction = transmitRequest(device, request);
streamSession.put(ssrc, transaction);
StreamInfo streamInfo = new StreamInfo();
streamInfo.setSsrc(ssrc);
streamInfo.setCahnnelId(channelId);
streamInfo.setDeviceID(device.getDeviceId());
boolean b = storager.startPlayback(streamInfo);
return streamInfo;
} catch ( SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
return null;
}
}
@@ -433,6 +441,7 @@ public class SIPCommander implements ISIPCommander {
clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest);
}
dialog.sendRequest(clientTransaction);
streamSession.remove(ssrc);
} catch (TransactionDoesNotExistException e) {
e.printStackTrace();
} catch (SipException e) {

View File

@@ -1,9 +1,14 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import java.text.ParseException;
/**
* @Description: BYE请求处理器
* @author: swwheihei
@@ -11,18 +16,35 @@ import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcesso
*/
public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
/**
/**
* 处理BYE请求
*
* @param evt
* @param layer
* @param transaction
* @param config
*/
*/
@Override
public void process(RequestEvent evt) {
try {
responseAck(evt);
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
// TODO 优先级99 Bye Request消息实现此消息一般为级联消息上级给下级发送视频停止指令
}
/***
* 回复200 OK
* @param evt
* @throws SipException
* @throws InvalidArgumentException
* @throws ParseException
*/
private void responseAck(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
}
}

View File

@@ -49,6 +49,9 @@ public class ZLMHttpHookListener {
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private ZLMHttpHookSubscribe subscribe;
@Value("${media.ip}")
private String mediaIp;
@@ -128,30 +131,38 @@ public class ZLMHttpHookListener {
}
String app = json.getString("app");
String streamId = json.getString("id");
if ("rtp".equals(app)) {
String ssrc = new DecimalFormat("0000000000").format(Integer.parseInt(streamId, 16));
StreamInfo streamInfoForPlay = storager.queryPlayBySSRC(ssrc);
if ("rtp".equals(app) && streamInfoForPlay != null ) {
MediaServerConfig mediaInfo = storager.getMediaInfo();
streamInfoForPlay.setFlv(String.format("http://%s:%s/rtp/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoForPlay.setWs_flv(String.format("ws://%s:%s/rtp/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoForPlay.setRtmp(String.format("rtmp://%s:%s/rtp/%s", mediaInfo.getWanIp(), mediaInfo.getRtmpPort(), streamId));
streamInfoForPlay.setHls(String.format("http://%s:%s/rtp/%s/hls.m3u8", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoForPlay.setRtsp(String.format("rtsp://%s:%s/rtp/%s", mediaInfo.getWanIp(), mediaInfo.getRtspPort(), streamId));
storager.startPlay(streamInfoForPlay);
}
StreamInfo streamInfoForPlayBack = storager.queryPlaybackBySSRC(ssrc);
if ("rtp".equals(app) && streamInfoForPlayBack != null ) {
MediaServerConfig mediaInfo = storager.getMediaInfo();
streamInfoForPlayBack.setFlv(String.format("http://%s:%s/rtp/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoForPlayBack.setWs_flv(String.format("ws://%s:%s/rtp/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoForPlayBack.setRtmp(String.format("rtmp://%s:%s/rtp/%s", mediaInfo.getWanIp(), mediaInfo.getRtmpPort(), streamId));
streamInfoForPlayBack.setHls(String.format("http://%s:%s/rtp/%s/hls.m3u8", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoForPlayBack.setRtsp(String.format("rtsp://%s:%s/rtp/%s", mediaInfo.getWanIp(), mediaInfo.getRtspPort(), streamId));
storager.startPlayback(streamInfoForPlayBack);
}
}
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
if (subscribe != null) subscribe.response(json);
// if ("rtp".equals(app)) {
// String ssrc = new DecimalFormat("0000000000").format(Integer.parseInt(streamId, 16));
// StreamInfo streamInfoForPlay = storager.queryPlayBySSRC(ssrc);
// if ("rtp".equals(app) && streamInfoForPlay != null ) {
// MediaServerConfig mediaInfo = storager.getMediaInfo();
// streamInfoForPlay.setFlv(String.format("http://%s:%s/rtp/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoForPlay.setWs_flv(String.format("ws://%s:%s/rtp/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoForPlay.setFmp4(String.format("http://%s:%s/rtp/%s.live.mp4", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoForPlay.setWs_fmp4(String.format("ws://%s:%s/rtp/%s.live.mp4", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoForPlay.setRtmp(String.format("rtmp://%s:%s/rtp/%s", mediaInfo.getWanIp(), mediaInfo.getRtmpPort(), streamId));
// streamInfoForPlay.setHls(String.format("http://%s:%s/rtp/%s/hls.m3u8", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoForPlay.setRtsp(String.format("rtsp://%s:%s/rtp/%s", mediaInfo.getWanIp(), mediaInfo.getRtspPort(), streamId));
// storager.startPlay(streamInfoForPlay);
// }
//
// StreamInfo streamInfoForPlayBack = storager.queryPlaybackBySSRC(ssrc);
// if ("rtp".equals(app) && streamInfoForPlayBack != null ) {
// MediaServerConfig mediaInfo = storager.getMediaInfo();
// streamInfoForPlayBack.setFlv(String.format("http://%s:%s/rtp/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoForPlayBack.setWs_flv(String.format("ws://%s:%s/rtp/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoForPlayBack.setFmp4(String.format("http://%s:%s/rtp/%s.live.mp4", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoForPlayBack.setWs_fmp4(String.format("ws://%s:%s/rtp/%s.live.mp4", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoForPlayBack.setRtmp(String.format("rtmp://%s:%s/rtp/%s", mediaInfo.getWanIp(), mediaInfo.getRtmpPort(), streamId));
// streamInfoForPlayBack.setHls(String.format("http://%s:%s/rtp/%s/hls.m3u8", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoForPlayBack.setRtsp(String.format("rtsp://%s:%s/rtp/%s", mediaInfo.getWanIp(), mediaInfo.getRtspPort(), streamId));
// storager.startPlayback(streamInfoForPlayBack);
// }
// }
// TODO Auto-generated method stub

View File

@@ -0,0 +1,88 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.math.BigInteger;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;
/**
* @Description:针对 ZLMediaServer的hook事件订阅
* @author: pan
* @date: 2020年12月2日 21:17:32
*/
@Component
public class ZLMHttpHookSubscribe {
private final static Logger logger = LoggerFactory.getLogger(ZLMHttpHookSubscribe.class);
public enum HookType{
on_flow_report,
on_http_access,
on_play,
on_publish,
on_record_mp4,
on_rtsp_auth,
on_rtsp_realm,
on_shell_login,
on_stream_changed,
on_stream_none_reader,
on_stream_not_found,
on_server_started
}
public interface Event{
void response(JSONObject response);
}
private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new HashMap<>();
public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) {
Map<JSONObject, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) {
eventMap = new HashMap<JSONObject, Event>();
allSubscribes.put(type,eventMap);
}
eventMap.put(hookResponse, event);
}
public ZLMHttpHookSubscribe.Event getSubscribe(HookType type, JSONObject hookResponse) {
ZLMHttpHookSubscribe.Event event= null;
Map<JSONObject, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) {
return null;
}
for (JSONObject key : eventMap.keySet()) {
Boolean result = null;
for (String s : key.keySet()) {
String string = hookResponse.getString(s);
String string1 = key.getString(s);
if (result == null) {
result = key.getString(s).equals(hookResponse.getString(s));
}else {
result = result && key.getString(s).equals(hookResponse.getString(s));
}
}
if (result) {
event = eventMap.get(key);
}
}
return event;
}
}

View File

@@ -4,7 +4,10 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -22,6 +25,10 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.web.context.request.async.DeferredResult;
import java.text.DecimalFormat;
import java.util.UUID;
@CrossOrigin
@RestController
@@ -39,95 +46,56 @@ public class PlayController {
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Value("${media.closeWaitRTPInfo}")
private boolean closeWaitRTPInfo;
@Autowired
private DeferredResultHolder resultHolder;
@Autowired
private IPlayService playService;
@GetMapping("/play/{deviceId}/{channelId}")
public ResponseEntity<String> play(@PathVariable String deviceId, @PathVariable String channelId,
Integer getEncoding) {
public DeferredResult<ResponseEntity<String>> play(@PathVariable String deviceId,
@PathVariable String channelId) {
if (getEncoding == null) getEncoding = 0;
getEncoding = closeWaitRTPInfo ? 0 : getEncoding;
Device device = storager.queryVideoDevice(deviceId);
StreamInfo streamInfo = storager.queryPlayByDevice(deviceId, channelId);
UUID uuid = UUID.randomUUID();
DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
// 超时处理
result.onTimeout(()->{
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
msg.setData("Timeout");
resultHolder.invokeResult(msg);
});
// 录像查询以channelId作为deviceId查询
resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
if (streamInfo == null) {
streamInfo = cmder.playStreamCmd(device, channelId);
// TODO playStreamCmd 超时处理
cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
});
} else {
String streamId = String.format("%08x", Integer.parseInt(streamInfo.getSsrc())).toUpperCase();
JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
if (rtpInfo.getBoolean("exist")) {
return new ResponseEntity<String>(JSON.toJSONString(streamInfo), HttpStatus.OK);
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
msg.setData(JSON.toJSONString(streamInfo));
resultHolder.invokeResult(msg);
} else {
storager.stopPlay(streamInfo);
streamInfo = cmder.playStreamCmd(device, channelId);
// TODO playStreamCmd 超时处理
cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
});
}
}
String streamId = String.format("%08x", Integer.parseInt(streamInfo.getSsrc())).toUpperCase();
// 等待推流, TODO 默认超时30s
boolean lockFlag = true;
boolean rtpPushed = false;
long startTime = System.currentTimeMillis();
JSONObject rtpInfo = null;
if (getEncoding == 1) {
while (lockFlag) {
try {
if (System.currentTimeMillis() - startTime > 60 * 1000) {
storager.stopPlay(streamInfo);
logger.info("播放等待超时");
return new ResponseEntity<String>("timeout", HttpStatus.OK);
} else {
streamInfo = storager.queryPlayByDevice(deviceId, channelId);
if (!rtpPushed) {
logger.info("查询RTP推流信息...");
rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
}
if (rtpInfo != null && rtpInfo.getBoolean("exist") && streamInfo != null
&& streamInfo.getFlv() != null) {
logger.info("查询流编码信息:" + streamInfo.getFlv());
rtpPushed = true;
Thread.sleep(2000);
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId);
if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) {
lockFlag = false;
logger.info("流编码信息已获取");
JSONArray tracks = mediaInfo.getJSONArray("tracks");
logger.info(tracks.toJSONString());
streamInfo.setTracks(tracks);
storager.startPlay(streamInfo);
} else {
logger.info("流编码信息未获取2秒后重试...");
}
} else {
Thread.sleep(2000);
continue;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else {
String flv = storager.getMediaInfo().getWanIp() + ":" + storager.getMediaInfo().getHttpPort() + "/rtp/"
+ streamId + ".flv";
streamInfo.setFlv("http://" + flv);
streamInfo.setWs_flv("ws://" + flv);
storager.startPlay(streamInfo);
}
if (logger.isDebugEnabled()) {
logger.debug(String.format("设备预览 API调用deviceId%s channelId%s", deviceId, channelId));
logger.debug("设备预览 API调用ssrc" + streamInfo.getSsrc() + ",ZLMedia streamId:"
+ Integer.toHexString(Integer.parseInt(streamInfo.getSsrc())));
}
if (streamInfo != null) {
return new ResponseEntity<String>(JSON.toJSONString(streamInfo), HttpStatus.OK);
} else {
logger.warn("设备预览API调用失败");
return new ResponseEntity<String>(HttpStatus.INTERNAL_SERVER_ERROR);
}
return result;
}
@PostMapping("/play/{ssrc}/stop")
@@ -180,10 +148,20 @@ public class PlayController {
result.put("code", 0);
JSONObject data = jsonObject.getJSONObject("data");
if (data != null) {
result.put("key", data.getString("key"));
result.put("rtmp", dstUrl);
result.put("flv", String.format("http://%s:%s/convert/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
result.put("ws_flv", String.format("ws://%s:%s/convert/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
result.put("key", data.getString("key"));
StreamInfo streamInfoResult = new StreamInfo();
streamInfoResult.setRtmp(dstUrl);
streamInfoResult.setRtsp(String.format("rtsp://%s:%s/convert/%s", mediaInfo.getWanIp(), mediaInfo.getRtspPort(), streamId));
streamInfoResult.setStreamId(streamId);
streamInfoResult.setFlv(String.format("http://%s:%s/convert/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoResult.setWs_flv(String.format("ws://%s:%s/convert/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoResult.setHls(String.format("http://%s:%s/convert/%s/hls.m3u8", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoResult.setWs_hls(String.format("ws://%s:%s/convert/%s/hls.m3u8", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoResult.setFmp4(String.format("http://%s:%s/convert/%s.live.mp4", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoResult.setWs_fmp4(String.format("ws://%s:%s/convert/%s.live.mp4", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoResult.setTs(String.format("http://%s:%s/convert/%s.live.ts", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
streamInfoResult.setWs_ts(String.format("ws://%s:%s/convert/%s.live.ts", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
result.put("data", streamInfoResult);
}
}else {
result.put("code", 1);

View File

@@ -3,7 +3,10 @@ package com.genersoft.iot.vmp.vmanager.playback;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -22,6 +25,9 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.UUID;
@CrossOrigin
@RestController
@@ -39,105 +45,41 @@ public class PlaybackController {
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Value("${media.closeWaitRTPInfo}")
private boolean closeWaitRTPInfo;
@Autowired
private IPlayService playService;
@Autowired
private DeferredResultHolder resultHolder;
@GetMapping("/playback/{deviceId}/{channelId}")
public ResponseEntity<String> play(@PathVariable String deviceId, @PathVariable String channelId, String startTime,
String endTime) {
public DeferredResult<ResponseEntity<String>> play(@PathVariable String deviceId, @PathVariable String channelId, String startTime,
String endTime) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("设备回放 API调用deviceId%s channelId%s", deviceId, channelId));
}
if (StringUtils.isEmpty(deviceId) || StringUtils.isEmpty(channelId)) {
String log = String.format("设备回放 API调用失败deviceId%s channelId%s", deviceId, channelId);
logger.warn(log);
return new ResponseEntity<String>(log, HttpStatus.BAD_REQUEST);
}
UUID uuid = UUID.randomUUID();
DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
// 超时处理
result.onTimeout(()->{
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
msg.setData("Timeout");
resultHolder.invokeResult(msg);
});
Device device = storager.queryVideoDevice(deviceId);
StreamInfo streamInfo = storager.queryPlaybackByDevice(deviceId, channelId);
if (streamInfo != null) {
// 停止之前的回放
cmder.streamByeCmd(streamInfo.getSsrc());
}
resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
cmder.playbackStreamCmd(device, channelId, startTime, endTime, (JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
playService.onPublishHandlerForPlayBack(response, deviceId, channelId, uuid.toString());
});
// }else {
// String streamId = String.format("%08x",
// Integer.parseInt(streamInfo.getSsrc())).toUpperCase();
// JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
// if (rtpInfo.getBoolean("exist")) {
// return new
// ResponseEntity<String>(JSON.toJSONString(streamInfo),HttpStatus.OK);
// }else {
// storager.stopPlayback(streamInfo);
// streamInfo = cmder.playbackStreamCmd(device, channelId, startTime, endTime);
// }
// }
streamInfo = cmder.playbackStreamCmd(device, channelId, startTime, endTime);
String streamId = String.format("%08x", Integer.parseInt(streamInfo.getSsrc())).toUpperCase();
if (logger.isDebugEnabled()) {
logger.debug("设备回放 API调用ssrc" + streamInfo.getSsrc() + ",ZLMedia streamId:" + streamId);
}
// 等待推流, TODO 默认超时15s
boolean lockFlag = true;
boolean rtpPushed = false;
long lockStartTime = System.currentTimeMillis();
JSONObject rtpInfo = null;
if (closeWaitRTPInfo) {
String flv = storager.getMediaInfo().getWanIp() + ":" + storager.getMediaInfo().getHttpPort() + "/rtp/"
+ streamId + ".flv";
streamInfo.setFlv("http://" + flv);
streamInfo.setWs_flv("ws://" + flv);
storager.startPlayback(streamInfo);
} else {
while (lockFlag) {
try {
if (System.currentTimeMillis() - lockStartTime > 75 * 1000) {
storager.stopPlayback(streamInfo);
logger.info("播放等待超时");
return new ResponseEntity<String>("timeout", HttpStatus.OK);
} else {
streamInfo = storager.queryPlaybackByDevice(deviceId, channelId);
if (!rtpPushed) {
logger.info("查询RTP推流信息...");
rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
}
if (rtpInfo != null && rtpInfo.getBoolean("exist") && streamInfo != null
&& streamInfo.getFlv() != null) {
logger.info("查询流编码信息:" + streamInfo.getFlv());
rtpPushed = true;
Thread.sleep(2000);
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId);
if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) {
lockFlag = false;
logger.info("流编码信息已获取");
JSONArray tracks = mediaInfo.getJSONArray("tracks");
streamInfo.setTracks(tracks);
storager.startPlayback(streamInfo);
} else {
logger.info("流编码信息未获取2秒后重试...");
}
} else {
Thread.sleep(2000);
continue;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
if (streamInfo != null) {
return new ResponseEntity<String>(JSON.toJSONString(streamInfo), HttpStatus.OK);
} else {
logger.warn("设备回放API调用失败");
return new ResponseEntity<String>(HttpStatus.INTERNAL_SERVER_ERROR);
}
return result;
}
@RequestMapping("/playback/{ssrc}/stop")

View File

@@ -0,0 +1,13 @@
package com.genersoft.iot.vmp.vmanager.service;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
/**
* 点播处理
*/
public interface IPlayService {
void onPublishHandlerForPlayBack(JSONObject resonse, String deviceId, String channelId, String uuid);
void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, String uuid);
}

View File

@@ -0,0 +1,90 @@
package com.genersoft.iot.vmp.vmanager.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.play.PlayController;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.DecimalFormat;
@Service
public class PlayServiceImpl implements IPlayService {
private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
@Autowired
private IVideoManagerStorager storager;
@Autowired
private DeferredResultHolder resultHolder;
@Override
public void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, String uuid) {
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
StreamInfo streamInfo = onPublishHandler(resonse, deviceId, channelId, uuid);
if (streamInfo != null) {
storager.startPlay(streamInfo);
msg.setData(JSON.toJSONString(streamInfo));
resultHolder.invokeResult(msg);
} else {
logger.warn("设备预览API调用失败");
msg.setData("设备预览API调用失败");
resultHolder.invokeResult(msg);
}
}
@Override
public void onPublishHandlerForPlayBack(JSONObject resonse, String deviceId, String channelId, String uuid) {
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
StreamInfo streamInfo = onPublishHandler(resonse, deviceId, channelId, uuid);
if (streamInfo != null) {
storager.startPlayback(streamInfo);
msg.setData(JSON.toJSONString(streamInfo));
resultHolder.invokeResult(msg);
} else {
logger.warn("设备预览API调用失败");
msg.setData("设备预览API调用失败");
resultHolder.invokeResult(msg);
}
}
public StreamInfo onPublishHandler(JSONObject resonse, String deviceId, String channelId, String uuid) {
String streamId = resonse.getString("id");
String ssrc = new DecimalFormat("0000000000").format(Integer.parseInt(streamId, 16));
StreamInfo streamInfo = new StreamInfo();
streamInfo.setSsrc(ssrc);
streamInfo.setStreamId(streamId);
streamInfo.setDeviceID(deviceId);
streamInfo.setCahnnelId(channelId);
MediaServerConfig mediaServerConfig = storager.getMediaInfo();
streamInfo.setFlv(String.format("http://%s:%s/rtp/%s.flv", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
streamInfo.setWs_flv(String.format("ws://%s:%s/rtp/%s.flv", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
streamInfo.setFmp4(String.format("http://%s:%s/rtp/%s.live.mp4", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
streamInfo.setWs_fmp4(String.format("ws://%s:%s/rtp/%s.live.mp4", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
streamInfo.setHls(String.format("http://%s:%s/rtp/%s/hls.m3u8", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
streamInfo.setWs_hls(String.format("ws://%s:%s/rtp/%s/hls.m3u8", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
streamInfo.setTs(String.format("http://%s:%s/rtp/%s.live.ts", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
streamInfo.setWs_ts(String.format("ws://%s:%s/rtp/%s.live.ts", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
streamInfo.setRtmp(String.format("rtmp://%s:%s/rtp/%s", mediaServerConfig.getWanIp(), mediaServerConfig.getRtmpPort(), streamId));
streamInfo.setRtsp(String.format("rtsp://%s:%s/rtp/%s", mediaServerConfig.getWanIp(), mediaServerConfig.getRtspPort(), streamId));
return streamInfo;
}
}

View File

@@ -34,8 +34,7 @@ public class ApiStreamController {
@Autowired
private IVideoManagerStorager storager;
@Value("${media.closeWaitRTPInfo}")
private boolean closeWaitRTPInfo;
private boolean closeWaitRTPInfo = false;
@Autowired
@@ -94,7 +93,7 @@ public class ApiStreamController {
StreamInfo streamInfo = storager.queryPlayByDevice(device.getDeviceId(), code);
if (streamInfo == null) {
logger.debug("streamInfo 等于null, 重新点播");
streamInfo = cmder.playStreamCmd(device, code);
// streamInfo = cmder.playStreamCmd(device, code);
}else {
logger.debug("streamInfo 不等于null, 向流媒体查询是否正在推流");
String streamId = String.format("%08x", Integer.parseInt(streamInfo.getSsrc())).toUpperCase();
@@ -136,7 +135,7 @@ public class ApiStreamController {
} else {
logger.debug("向流媒体查询没有推流, 重新点播");
storager.stopPlay(streamInfo);
streamInfo = cmder.playStreamCmd(device, code);
// streamInfo = cmder.playStreamCmd(device, code);
}
}