diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java index 110c0cbb5..7aa0f982e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java @@ -1,8 +1,12 @@ package com.genersoft.iot.vmp.media.abl; +import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.abl.bean.AblServerConfig; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; @@ -24,6 +28,9 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { @Autowired private ABLRESTfulUtils ablresTfulUtils; + @Autowired + private SipConfig sipConfig; + @Override public int createRTPServer(MediaServer mediaServer, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) { return ablresTfulUtils.openRtpServer(mediaServer, "rtp", stream, 96, port, tcpMode, disableAudio?1:0); @@ -46,17 +53,28 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { logger.info("关闭RTP Server " + jsonObject); if (jsonObject != null ) { if (jsonObject.getInteger("code") != 0) { - logger.error("关闭RTP Server 失败: " + jsonObject.getString("msg")); + logger.error("[closeRtpServer] 失败: " + jsonObject.getString("msg")); } }else { // 检查ZLM状态 - logger.error("关闭RTP Server 失败: 请检查ZLM服务"); + logger.error("[closeRtpServer] 失败: 请检查ZLM服务"); } } @Override - public void closeStreams(MediaServer mediaServerItem, String rtp, String streamId) { - + public void closeStreams(MediaServer mediaServer, String app, String streamId) { + Map param = new HashMap<>(); + param.put("stream_id", streamId); + param.put("force", 1); + JSONObject jsonObject = ablresTfulUtils.closeStreams(mediaServer, app, streamId); + if (jsonObject != null ) { + if (jsonObject.getInteger("code") != 0) { + logger.error("[closeStreams] 失败: " + jsonObject.getString("msg")); + } + }else { + // 检查ZLM状态 + logger.error("[closeStreams] 失败: 请检查ZLM服务"); + } } @Override @@ -66,86 +84,125 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { @Override public boolean checkNodeId(MediaServer mediaServerItem) { + logger.warn("[abl-checkNodeId] 未实现"); return false; } @Override public void online(MediaServer mediaServerItem) { - + logger.warn("[abl-online] 未实现"); } @Override public MediaServer checkMediaServer(String ip, int port, String secret) { + MediaServer mediaServer = new MediaServer(); + mediaServer.setIp(ip); + mediaServer.setHttpPort(port); + mediaServer.setSecret(secret); + JSONObject responseJSON = ablresTfulUtils.getServerConfig(mediaServer); + JSONArray data = responseJSON.getJSONArray("params"); + if (data != null && !data.isEmpty()) { + AblServerConfig config = AblServerConfig.getInstance(data); + config.setServerIp(ip); + config.setHttpServerPort(port); + return new MediaServer(config, sipConfig.getIp()); + } return null; } @Override public boolean stopSendRtp(MediaServer mediaInfo, String app, String stream, String ssrc) { + // TODO 需要记录开始发流返回的KEY,暂不做实现 + logger.warn("[abl-stopSendRtp] 未实现"); +// ablresTfulUtils.stopSendRtp() return false; } @Override public boolean deleteRecordDirectory(MediaServer mediaServerItem, String app, String stream, String date, String fileName) { + logger.warn("[abl-deleteRecordDirectory] 未实现"); return false; } @Override public List getMediaList(MediaServer mediaServerItem, String app, String stream, String callId) { + logger.warn("[abl-getMediaList] 未实现"); + return null; } @Override public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream) { + logger.warn("[abl-connectRtpServer] 未实现"); return null; } @Override public void getSnap(MediaServer mediaServerItem, String streamUrl, int timeoutSec, int expireSec, String path, String fileName) { - + logger.warn("[abl-getSnap] 未实现"); } @Override public MediaInfo getMediaInfo(MediaServer mediaServerItem, String app, String stream) { + logger.warn("[abl-getMediaInfo] 未实现"); return null; } @Override public Boolean pauseRtpCheck(MediaServer mediaServer, String streamKey) { + logger.warn("[abl-pauseRtpCheck] 未实现"); return null; } @Override public Boolean resumeRtpCheck(MediaServer mediaServer, String streamKey) { + logger.warn("[abl-resumeRtpCheck] 未实现"); return null; } @Override public String getFfmpegCmd(MediaServer mediaServer, String cmdKey) { + logger.warn("[abl-getFfmpegCmd] 未实现"); return null; } @Override public WVPResult addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey) { + logger.warn("[abl-addFFmpegSource] 未实现"); return null; } @Override public WVPResult addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType) { + logger.warn("[abl-addStreamProxy] 未实现"); return null; } @Override public Boolean delFFmpegSource(MediaServer mediaServer, String streamKey) { + logger.warn("[abl-delFFmpegSource] 未实现"); return null; } @Override public Boolean delStreamProxy(MediaServer mediaServer, String streamKey) { + logger.warn("[abl-delStreamProxy] 未实现"); return null; } @Override public Map getFFmpegCMDs(MediaServer mediaServer) { + logger.warn("[abl-getFFmpegCMDs] 未实现"); return null; } + + @Override + public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) { + logger.warn("[abl-startSendRtpPassive] 未实现"); + } + + @Override + public void startSendRtpStream(MediaServer mediaServer, SendRtpItem sendRtpItem) { + logger.warn("[abl-startSendRtpStream] 未实现"); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLRESTfulUtils.java new file mode 100644 index 000000000..4d0af781c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLRESTfulUtils.java @@ -0,0 +1,325 @@ +package com.genersoft.iot.vmp.media.abl; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import okhttp3.*; +import okhttp3.logging.HttpLoggingInterceptor; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +@Component +public class ABLRESTfulUtils { + + private final static Logger logger = LoggerFactory.getLogger(ABLRESTfulUtils.class); + + private OkHttpClient client; + + + + public interface RequestCallback{ + void run(JSONObject response); + } + + private OkHttpClient getClient(){ + return getClient(null); + } + + private OkHttpClient getClient(Integer readTimeOut){ + if (client == null) { + if (readTimeOut == null) { + readTimeOut = 10; + } + OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); + //todo 暂时写死超时时间 均为5s + // 设置连接超时时间 + httpClientBuilder.connectTimeout(8,TimeUnit.SECONDS); + // 设置读取超时时间 + httpClientBuilder.readTimeout(readTimeOut,TimeUnit.SECONDS); + // 设置连接池 + httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES)); + if (logger.isDebugEnabled()) { + HttpLoggingInterceptor logging = new HttpLoggingInterceptor(message -> { + logger.debug("http请求参数:" + message); + }); + logging.setLevel(HttpLoggingInterceptor.Level.BASIC); + // OkHttp進行添加攔截器loggingInterceptor + httpClientBuilder.addInterceptor(logging); + } + client = httpClientBuilder.build(); + } + return client; + + } + + public JSONObject sendPost(MediaServer mediaServerItem, String api, Map param, RequestCallback callback) { + return sendPost(mediaServerItem, api, param, callback, null); + } + + + public JSONObject sendPost(MediaServer mediaServerItem, String api, Map param, RequestCallback callback, Integer readTimeOut) { + OkHttpClient client = getClient(readTimeOut); + + if (mediaServerItem == null) { + return null; + } + String url = String.format("http://%s:%s/index/api/%s", mediaServerItem.getIp(), mediaServerItem.getHttpPort(), api); + JSONObject responseJSON = new JSONObject(); + //-2自定义流媒体 调用错误码 + responseJSON.put("code",-2); + responseJSON.put("msg","流媒体调用失败"); + + FormBody.Builder builder = new FormBody.Builder(); + builder.add("secret",mediaServerItem.getSecret()); + if (param != null && param.keySet().size() > 0) { + for (String key : param.keySet()){ + if (param.get(key) != null) { + builder.add(key, param.get(key).toString()); + } + } + } + + FormBody body = builder.build(); + + Request request = new Request.Builder() + .post(body) + .url(url) + .build(); + if (callback == null) { + try { + Response response = client.newCall(request).execute(); + + if (response.isSuccessful()) { + ResponseBody responseBody = response.body(); + if (responseBody != null) { + String responseStr = responseBody.string(); + responseJSON = JSON.parseObject(responseStr); + } + }else { + System.out.println( 2222); + System.out.println( response.code()); + response.close(); + Objects.requireNonNull(response.body()).close(); + } + }catch (IOException e) { + logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage())); + + if(e instanceof SocketTimeoutException){ + //读取超时超时异常 + logger.error(String.format("读取ABL数据超时失败: %s, %s", url, e.getMessage())); + } + if(e instanceof ConnectException){ + //判断连接异常,我这里是报Failed to connect to 10.7.5.144 + logger.error(String.format("连接ABL连接失败: %s, %s", url, e.getMessage())); + } + + }catch (Exception e){ + logger.error(String.format("访问ABL失败: %s, %s", url, e.getMessage())); + } + }else { + client.newCall(request).enqueue(new Callback(){ + + @Override + public void onResponse(@NotNull Call call, @NotNull Response response){ + if (response.isSuccessful()) { + try { + String responseStr = Objects.requireNonNull(response.body()).string(); + callback.run(JSON.parseObject(responseStr)); + } catch (IOException e) { + logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage())); + } + + }else { + response.close(); + Objects.requireNonNull(response.body()).close(); + } + } + + @Override + public void onFailure(@NotNull Call call, @NotNull IOException e) { + logger.error(String.format("连接ABL失败: %s, %s", call.request().toString(), e.getMessage())); + + if(e instanceof SocketTimeoutException){ + //读取超时超时异常 + logger.error(String.format("读取ABL数据失败: %s, %s", call.request().toString(), e.getMessage())); + } + if(e instanceof ConnectException){ + //判断连接异常,我这里是报Failed to connect to 10.7.5.144 + logger.error(String.format("连接ABL失败: %s, %s", call.request().toString(), e.getMessage())); + } + } + }); + } + + + + return responseJSON; + } + + public JSONObject sendGet(MediaServer mediaServerItem, String api, Map param) { + OkHttpClient client = getClient(); + + if (mediaServerItem == null) { + return null; + } + JSONObject responseJSON = null; + StringBuilder stringBuffer = new StringBuilder(); + stringBuffer.append(String.format("http://%s:%s/index/api/%s", mediaServerItem.getIp(), mediaServerItem.getHttpPort(), api)); + if (param != null && !param.keySet().isEmpty()) { + stringBuffer.append("?secret=").append(mediaServerItem.getSecret()).append("&"); + int index = 1; + for (String key : param.keySet()){ + if (param.get(key) != null) { + stringBuffer.append(key + "=" + param.get(key)); + if (index < param.size()) { + stringBuffer.append("&"); + } + } + index++; + } + } + String url = stringBuffer.toString(); + logger.info("[访问ABL]: {}", url); + Request request = new Request.Builder() + .get() + .url(url) + .build(); + try { + Response response = client.newCall(request).execute(); + if (response.isSuccessful()) { + ResponseBody responseBody = response.body(); + if (responseBody != null) { + String responseStr = responseBody.string(); + responseJSON = JSON.parseObject(responseStr); + } + }else { + response.close(); + Objects.requireNonNull(response.body()).close(); + } + } catch (ConnectException e) { + logger.error(String.format("连接ABL失败: %s, %s", e.getCause().getMessage(), e.getMessage())); + logger.info("请检查media配置并确认ABL已启动..."); + }catch (IOException e) { + logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage())); + } + + + + return responseJSON; + } + + public void sendGetForImg(MediaServer mediaServerItem, String api, Map params, String targetPath, String fileName) { + String url = String.format("http://%s:%s/index/api/%s", mediaServerItem.getIp(), mediaServerItem.getHttpPort(), api); + HttpUrl parseUrl = HttpUrl.parse(url); + if (parseUrl == null) { + return; + } + HttpUrl.Builder httpBuilder = parseUrl.newBuilder(); + + httpBuilder.addQueryParameter("secret", mediaServerItem.getSecret()); + if (params != null) { + for (Map.Entry param : params.entrySet()) { + httpBuilder.addQueryParameter(param.getKey(), param.getValue().toString()); + } + } + + Request request = new Request.Builder() + .url(httpBuilder.build()) + .build(); + logger.info(request.toString()); + try { + OkHttpClient client = getClient(); + Response response = client.newCall(request).execute(); + if (response.isSuccessful()) { + if (targetPath != null) { + File snapFolder = new File(targetPath); + if (!snapFolder.exists()) { + if (!snapFolder.mkdirs()) { + logger.warn("{}路径创建失败", snapFolder.getAbsolutePath()); + } + + } + File snapFile = new File(targetPath + File.separator + fileName); + FileOutputStream outStream = new FileOutputStream(snapFile); + + outStream.write(Objects.requireNonNull(response.body()).bytes()); + outStream.flush(); + outStream.close(); + } else { + logger.error(String.format("[ %s ]请求失败: %s %s", url, response.code(), response.message())); + } + } else { + logger.error(String.format("[ %s ]请求失败: %s %s", url, response.code(), response.message())); + } + Objects.requireNonNull(response.body()).close(); + } catch (ConnectException e) { + logger.error(String.format("连接ABL失败: %s, %s", e.getCause().getMessage(), e.getMessage())); + logger.info("请检查media配置并确认ABL已启动..."); + } catch (IOException e) { + logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage())); + } + } + + + public Integer openRtpServer(MediaServer mediaServer, String app, String stream, int payload, Integer port, Integer tcpMode, Integer disableAudio) { + Map param = new HashMap<>(); + param.put("vhost", "_defaultVhost_"); + param.put("app", app); + param.put("stream_id", stream); + param.put("payload", payload); + if (port != null) { + param.put("port", port); + } + if (tcpMode != null) { + param.put("enable_tcp", tcpMode); + } + if (disableAudio != null) { + param.put("disableAudio", disableAudio); + } + + JSONObject jsonObject = sendPost(mediaServer, "openRtpServer", param, null); + if (jsonObject.getInteger("code") == 0) { + return jsonObject.getInteger("port"); + }else { + return 0; + } + } + + public JSONObject closeStreams(MediaServer mediaServerItem, String app, String stream) { + Map param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", app); + param.put("stream", stream); + param.put("force", 1); + return sendPost(mediaServerItem, "close_streams",param, null); + } + + public JSONObject getServerConfig(MediaServer mediaServerItem){ + return sendPost(mediaServerItem, "getServerConfig",null, null); + } + + public JSONObject setConfigParamValue(MediaServer mediaServerItem, String key, Object value){ + Map param = new HashMap<>(); + param.put("key", key); + param.put("value", value); + return sendGet(mediaServerItem,"setConfigParamValue", param); + } + + public void stopSendRtp(MediaServer mediaServer,String key) { + Map param = new HashMap<>(); + param.put("key", key); + sendPost(mediaServer,"stopSendRtp", param, null); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaServer.java b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaServer.java index decbd4e80..288d948cd 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaServer.java +++ b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaServer.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.media.bean; +import com.genersoft.iot.vmp.media.abl.bean.AblServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; import io.swagger.v3.oas.annotations.media.Schema; import org.springframework.util.ObjectUtils; @@ -129,6 +130,32 @@ public class MediaServer { } + public MediaServer(AblServerConfig config, String sipIp) { + id = config.getMediaServerId(); + ip = config.getServerIp(); + hookIp = sipIp; + sdpIp = config.getServerIp(); + streamIp = config.getServerIp(); + httpPort = config.getHttpServerPort(); + flvPort = config.getHttpFlvPort(); + wsFlvPort = config.getWsPort(); +// httpSSlPort = config.getHttpSSLport(); +// flvSSLPort = config.getHttpSSLport(); +// wsFlvSSLPort = config.getHttpSSLport(); + rtmpPort = config.getRtmpPort(); +// rtmpSSlPort = config.getRtmpSslPort(); + rtpProxyPort = config.getPsTsRecvPort(); + rtspPort = config.getRtspPort(); +// rtspSSLPort = config.getRtspSSlport(); + autoConfig = true; // 默认值true; + secret = config.getSecret(); +// hookAliveInterval = config.getHookAliveInterval(); + rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口 +// rtpPortRange = config.getPortRange().replace("_",","); // 默认使用30000,30500作为级联时发送流的端口号 + rtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号 + recordAssistPort = 0; // 默认关闭 + } + public String getId() { return id; }