abl 支持拉流代理

This commit is contained in:
lin
2025-09-08 16:09:01 +08:00
parent 6ebeef33ae
commit 18be8aff6c
11 changed files with 136 additions and 1827 deletions

View File

@@ -1010,7 +1010,7 @@ public class PlayServiceImpl implements IPlayService {
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
log.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channel, startTime, endTime);
log.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channel.getDeviceId(), startTime, endTime);
}else {
if (callback != null) {
callback.run(code, msg, null);

View File

@@ -8,13 +8,13 @@ import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.media.abl.bean.AblServerConfig;
import com.genersoft.iot.vmp.media.abl.bean.hook.OnStreamArriveABLHookParam;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaRecordMp4Event;
import com.genersoft.iot.vmp.media.event.media.MediaRecordProcessEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
@@ -23,6 +23,7 @@ import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +77,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
logger.info("关闭RTP Server " + jsonObject);
if (jsonObject != null ) {
if (jsonObject.getInteger("code") != 0) {
logger.error("[closeRtpServer] 失败: " + jsonObject.getString("msg"));
logger.error("[closeRtpServer] 失败: " + jsonObject.getString("memo"));
}
}else {
// 检查ZLM状态
@@ -99,7 +100,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
logger.info("关闭RTP Server " + jsonObject);
if (jsonObject != null ) {
if (jsonObject.getInteger("code") != 0) {
logger.error("[closeRtpServer] 失败: " + jsonObject.getString("msg"));
logger.error("[closeRtpServer] 失败: " + jsonObject.getString("memo"));
}
}else {
// 检查ZLM状态
@@ -115,7 +116,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
JSONObject jsonObject = ablresTfulUtils.closeStreams(mediaServer, app, streamId);
if (jsonObject != null ) {
if (jsonObject.getInteger("code") != 0) {
logger.error("[closeStreams] 失败: " + jsonObject.getString("msg"));
logger.error("[closeStreams] 失败: " + jsonObject.getString("memo"));
}
}else {
// 检查ZLM状态
@@ -264,32 +265,24 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
@Override
public String getFfmpegCmd(MediaServer mediaServer, String cmdKey) {
logger.warn("[abl-getFfmpegCmd] 未实现");
return null;
}
@Override
public WVPResult<String> addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey) {
logger.warn("[abl-addFFmpegSource] 未实现");
return null;
return "";
}
@Override
public Boolean delFFmpegSource(MediaServer mediaServer, String streamKey) {
logger.warn("[abl-delFFmpegSource] 未实现");
return null;
JSONObject jsonObject = ablresTfulUtils.delFFmpegProxy(mediaServer, streamKey);
return jsonObject.getInteger("code") == 0;
}
@Override
public Boolean delStreamProxy(MediaServer mediaServer, String streamKey) {
logger.warn("[abl-delStreamProxy] 未实现");
return null;
JSONObject jsonObject = ablresTfulUtils.delStreamProxy(mediaServer, streamKey);
return jsonObject.getInteger("code") == 0;
}
@Override
public Map<String, String> getFFmpegCMDs(MediaServer mediaServer) {
logger.warn("[abl-getFFmpegCMDs] 未实现");
return null;
return new HashMap<>();
}
// 接受进度通知
@@ -352,8 +345,13 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
@Override
public WVPResult<String> addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType, Integer timeout) {
logger.warn("[abl-addStreamProxy] 未实现");
return null;
JSONObject jsonObject = ablresTfulUtils.addStreamProxy(mediaServer, app, stream, url, !enableAudio, enableMp4, rtpType, timeout);
if (jsonObject.getInteger("code") != 0) {
return WVPResult.fail(ErrorCode.ERROR100.getCode(), jsonObject.getString("memo"));
}else {
return WVPResult.success(jsonObject.getString("key"));
}
}
@Override
@@ -374,18 +372,70 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public void startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
logger.warn("[abl-startProxy] 未实现");
public String startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
MediaInfo mediaInfo = getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream());
if (mediaInfo != null) {
closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream());
}
JSONObject jsonObject = null;
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){
if (streamProxy.getTimeout() == 0) {
streamProxy.setTimeout(15);
}
jsonObject = ablresTfulUtils.addFFmpegProxy(mediaServer, streamProxy.getApp(), streamProxy.getStream(), streamProxy.getSrcUrl().trim(),
!streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtspType(), streamProxy.getTimeout());
}else {
jsonObject = ablresTfulUtils.addStreamProxy(mediaServer, streamProxy.getApp(), streamProxy.getStream(), streamProxy.getSrcUrl().trim(),
streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtspType(), streamProxy.getTimeout());
}
if (jsonObject == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "请求失败");
}else if (jsonObject.getInteger("code") != 0) {
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("memo"));
}else {
String key = jsonObject.getString("key");
if (key == null) {
throw new ControllerException(jsonObject.getInteger("code"), "代理结果异常: " + jsonObject);
}else {
return key;
}
}
}
@Override
public void stopProxy(MediaServer mediaServer, String streamKey) {
logger.warn("[abl-stopProxy] 未实现");
public void stopProxy(MediaServer mediaServer, String streamKey, String type) {
JSONObject jsonObject = null;
if ("ffmpeg".equalsIgnoreCase(type)){
jsonObject = ablresTfulUtils.delFFmpegProxy(mediaServer, streamKey);
}else {
jsonObject = ablresTfulUtils.delStreamProxy(mediaServer, streamKey);
}
if (jsonObject == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "请求失败");
}else if (jsonObject.getInteger("code") != 0) {
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("memo"));
}
}
@Override
public List<String> listRtpServer(MediaServer mediaServer) {
return Collections.emptyList();
JSONObject jsonObject = ablresTfulUtils.getMediaList(mediaServer, "rtp", null);
if (jsonObject == null || jsonObject.getInteger("code") != 0) {
return null;
}
JSONArray mediaList = jsonObject.getJSONArray("mediaList");
if (mediaList == null || mediaList.isEmpty()) {
return new ArrayList<>();
}
List<String> result = new ArrayList<>();
for (int i = 0; i < mediaList.size(); i++) {
JSONObject mediaJSON = mediaList.getJSONObject(i);
result.add(mediaJSON.getString("stream"));
}
return result;
}
@Override

View File

@@ -4,7 +4,6 @@ import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import okhttp3.*;
import okhttp3.logging.HttpLoggingInterceptor;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,14 +47,6 @@ public class ABLRESTfulUtils {
httpClientBuilder.readTimeout(readTimeOut,TimeUnit.SECONDS);
// 设置连接池
httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES));
if (logger.isDebugEnabled()) {
HttpLoggingInterceptor logging = new HttpLoggingInterceptor(message -> {
logger.debug("http请求参数" + message);
});
logging.setLevel(HttpLoggingInterceptor.Level.BASIC);
// OkHttp進行添加攔截器loggingInterceptor
httpClientBuilder.addInterceptor(logging);
}
client = httpClientBuilder.build();
}
return client;
@@ -376,7 +367,9 @@ public class ABLRESTfulUtils {
public JSONObject getMediaList(MediaServer mediaServer, String app, String stream) {
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream", stream);
if (stream != null) {
param.put("stream", stream);
}
return sendPost(mediaServer,"getMediaList", param, null);
}
@@ -404,4 +397,38 @@ public class ABLRESTfulUtils {
}
public JSONObject addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean disableAudio, boolean enableMp4, String rtpType, Integer timeout) {
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream", stream);
param.put("url", url);
param.put("disableAudio", disableAudio? "1" : "0");
param.put("enable_mp4", enableMp4 ? "1" : "0");
// TODO rtpType timeout 尚不支持
return sendPost(mediaServer,"addStreamProxy", param, null);
}
public JSONObject addFFmpegProxy(MediaServer mediaServer, String app, String stream, String url, boolean disableAudio, boolean enableMp4, String rtpType, Integer timeout) {
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream", stream);
param.put("url", url);
param.put("disableAudio", disableAudio);
param.put("enable_mp4", enableMp4);
// TODO rtpType timeout 尚不支持
return sendPost(mediaServer,"addFFmpegProxy", param, null);
}
public JSONObject delStreamProxy(MediaServer mediaServer, String streamKey) {
Map<String, Object> param = new HashMap<>();
param.put("key", streamKey);
return sendPost(mediaServer,"delStreamProxy", param, null);
}
public JSONObject delFFmpegProxy(MediaServer mediaServer, String streamKey) {
Map<String, Object> param = new HashMap<>();
param.put("key", streamKey);
return sendPost(mediaServer,"delFFmpegProxy", param, null);
}
}

View File

@@ -52,8 +52,6 @@ public interface IMediaNodeServerService {
String getFfmpegCmd(MediaServer mediaServer, String cmdKey);
WVPResult<String> addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey);
WVPResult<String> addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType, Integer timeout);
Boolean delFFmpegSource(MediaServer mediaServer, String streamKey);
@@ -70,9 +68,9 @@ public interface IMediaNodeServerService {
Long updateDownloadProcess(MediaServer mediaServer, String app, String stream);
void startProxy(MediaServer mediaServer, StreamProxy streamProxy);
String startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void stopProxy(MediaServer mediaServer, String streamKey);
void stopProxy(MediaServer mediaServer, String streamKey, String type);
List<String> listRtpServer(MediaServer mediaServer);

View File

@@ -101,8 +101,6 @@ public interface IMediaServerService {
void closeStreams(MediaServer mediaServerItem, String app, String stream);
WVPResult<String> addFFmpegSource(MediaServer mediaServerItem, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey);
WVPResult<String> addStreamProxy(MediaServer mediaServerItem, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType, Integer timeout);
Boolean delFFmpegSource(MediaServer mediaServerItem, String streamKey);
@@ -156,9 +154,9 @@ public interface IMediaServerService {
Long updateDownloadProcess(MediaServer mediaServerItem, String app, String stream);
void startProxy(MediaServer mediaServer, StreamProxy streamProxy);
String startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void stopProxy(MediaServer mediaServer, String streamKey);
void stopProxy(MediaServer mediaServer, String streamKey, String type);
StreamInfo getMediaByAppAndStream(String app, String stream);

View File

@@ -760,16 +760,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
mediaNodeServerService.closeStreams(mediaServer, app, stream);
}
@Override
public WVPResult<String> addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[addFFmpegSource] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return WVPResult.fail(ErrorCode.ERROR400);
}
return mediaNodeServerService.addFFmpegSource(mediaServer, srcUrl, dstUrl, timeoutMs, enableAudio, enableMp4, ffmpegCmdKey);
}
@Override
public WVPResult<String> addStreamProxy(MediaServer mediaServer, String app, String stream, String url,
boolean enableAudio, boolean enableMp4, String rtpType, Integer timeout) {
@@ -998,23 +988,23 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public void startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
public String startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[startProxy] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
}
mediaNodeServerService.startProxy(mediaServer, streamProxy);
return mediaNodeServerService.startProxy(mediaServer, streamProxy);
}
@Override
public void stopProxy(MediaServer mediaServer, String streamKey) {
public void stopProxy(MediaServer mediaServer, String streamKey, String type) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[stopProxy] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
}
mediaNodeServerService.stopProxy(mediaServer, streamKey);
mediaNodeServerService.stopProxy(mediaServer, streamKey, type);
}
@Override

View File

@@ -301,22 +301,6 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
return mediaServerConfig.getString(cmdKey);
}
@Override
public WVPResult<String> addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey) {
JSONObject jsonObject = zlmresTfulUtils.addFFmpegSource(mediaServer, srcUrl, dstUrl, timeoutMs, enableAudio, enableMp4, ffmpegCmdKey);
if (jsonObject.getInteger("code") != 0) {
log.warn("[getFfmpegCmd] 添加FFMPEG代理失败");
return WVPResult.fail(ErrorCode.ERROR100.getCode(), "添加FFMPEG代理失败");
}else {
JSONObject data = jsonObject.getJSONObject("data");
if (data == null) {
return WVPResult.fail(ErrorCode.ERROR100.getCode(), "代理结果异常: " + jsonObject);
}else {
return WVPResult.success(data.getString("key"));
}
}
}
@Override
public WVPResult<String> addStreamProxy(MediaServer mediaServer, String app, String stream, String url,
boolean enableAudio, boolean enableMp4, String rtpType, Integer timeout) {
@@ -461,7 +445,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public void startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
public String startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
String dstUrl;
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())) {
@@ -522,6 +506,8 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
JSONObject data = jsonObject.getJSONObject("data");
if (data == null) {
throw new ControllerException(jsonObject.getInteger("code"), "代理结果异常: " + jsonObject);
}else {
return data.getString("key");
}
}
}
@@ -546,7 +532,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public void stopProxy(MediaServer mediaServer, String streamKey) {
public void stopProxy(MediaServer mediaServer, String streamKey, String type) {
JSONObject jsonObject = zlmresTfulUtils.delStreamProxy(mediaServer, streamKey);
if (jsonObject == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "请求失败");

View File

@@ -247,6 +247,7 @@ public class ZLMMediaServerStatusManager {
}else {
mediaServerItem.setTranscodeSuffix(zlmServerConfig.getTranscodeSuffix());
}
mediaServerItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
mediaServerItem.setHookAliveInterval(10F);
}
@@ -288,8 +289,6 @@ public class ZLMMediaServerStatusManager {
// 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项
if (mediaServerItem.isRtpEnable() && !ObjectUtils.isEmpty(mediaServerItem.getRtpPortRange())) {
param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-"));
}else {
param.put("rtp_proxy.port", mediaServerItem.getRtpProxyPort());
}
if (!ObjectUtils.isEmpty(mediaServerItem.getRecordPath())) {

View File

@@ -117,13 +117,14 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
// hook响应
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
subscribe.removeSubscribe(rtpHook);
streamProxy.setPulling(true);
streamProxyMapper.updateStream(streamProxy);
});
mediaServerService.startProxy(mediaServer, streamProxy);
if (mediaServerId == null || !mediaServerId.equals(mediaServer.getId())) {
streamProxy.setMediaServerId(mediaServer.getId());
streamProxyMapper.updateStream(streamProxy);
}
String key = mediaServerService.startProxy(mediaServer, streamProxy);
streamProxy.setStreamKey(key);
streamProxy.setMediaServerId(mediaServer.getId());
streamProxyMapper.updateStream(streamProxy);
}
@Override
@@ -152,7 +153,7 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
if (ObjectUtils.isEmpty(streamProxy.getStreamKey())) {
mediaServerService.closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream());
}else {
mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey());
mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey(), streamProxy.getType());
}
streamProxyMapper.removeStream(streamProxy.getId());
}

View File

@@ -88,6 +88,9 @@ server:
certificate: xx.pem
# 私钥文件
certificate-private-key: xx.key
# protocols: TLSv1.2
# ciphers: TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
# 作为28181服务器的配置
sip: