diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java index 6327688b2..58bb6631f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java @@ -132,6 +132,9 @@ public class CommonGBChannel { @Schema(description = "关联的拉流代理Id(流来源是拉流代理时有效)") private Integer streamProxyId; + @Schema(description = "关联的部标标通道ID") + private Integer jtChannelId; + @Schema(description = "创建时间") private String createTime; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTChannel.java b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTChannel.java index a5222fa36..1ea227e69 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTChannel.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import lombok.EqualsAndHashCode; +import org.springframework.util.ObjectUtils; /** * JT 通道 @@ -49,4 +50,16 @@ public class JTChannel extends CommonGBChannel { ", hasAudio='" + hasAudio + '\'' + '}'; } + + public CommonGBChannel buildCommonGBChannel() { + if (ObjectUtils.isEmpty(this.getGbDeviceId())) { + return null; + } + if (ObjectUtils.isEmpty(this.getGbName())) { + this.setGbName(this.getName()); + } + this.setJtChannelId(this.getId()); + return this; + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java index f0e8ee352..03e12daec 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java @@ -71,7 +71,7 @@ public class JT1078Controller { @Operation(summary = "1078-开始点播", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "phoneNumber", description = "设备手机号", required = true) - @Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true) + @Parameter(name = "channelId", description = "通道编号, 一般为从1开始的数字", required = true) @Parameter(name = "type", description = "类型:0:音视频,1:视频,3:音频", required = true) @GetMapping("/live/start") public DeferredResult> startLive(HttpServletRequest request, diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/dao/JTChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/jt1078/dao/JTChannelMapper.java index 555a899c7..5b55576c9 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/dao/JTChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/dao/JTChannelMapper.java @@ -1,7 +1,7 @@ package com.genersoft.iot.vmp.jt1078.dao; import com.genersoft.iot.vmp.jt1078.bean.JTChannel; -import com.genersoft.iot.vmp.jt1078.bean.JTDevice; +import com.genersoft.iot.vmp.jt1078.dao.provider.JTChannelProvider; import org.apache.ibatis.annotations.*; import java.util.List; @@ -9,70 +9,13 @@ import java.util.List; @Mapper public interface JTChannelMapper { - @Select(value = {" "}) - List getAll(@Param("terminalDbId") int terminalDbId, @Param("query") String query); + @SelectProvider(type = JTChannelProvider.class, method = "selectAll") + List selectAll(@Param("terminalDbId") int terminalDbId, @Param("query") String query); @Update(value = {" "}) void update(JTChannel channel); @@ -85,52 +28,6 @@ public interface JTChannelMapper { "has_audio,"+ "create_time,"+ "update_time"+ - ", gb_device_id" + - ", gb_name" + - ", gb_manufacturer" + - ", gb_model" + - ", gb_civil_code" + - ", gb_block" + - ", gb_address" + - ", gb_parental" + - ", gb_parent_id" + - ", gb_register_way" + - ", gb_security_level_code" + - ", gb_secrecy" + - ", gb_ip_address" + - ", gb_port" + - ", gb_password" + - ", gb_status" + - ", gb_longitude" + - ", gb_latitude" + - ", gb_business_group_id" + - ", gb_ptz_type" + - ", gb_photoelectric_imaging_typ" + - ", gb_capture_position_type" + - ", gb_room_type" + - ", gb_supply_light_type" + - ", gb_direction_type" + - ", gb_resolution" + - ", gb_stream_number_list" + - ", gb_download_speed" + - ", gb_svc_space_support_mod" + - ", gb_svc_time_support_mode" + - ", gb_ssvc_ratio_support_list" + - ", gb_mobile_device_type" + - ", gb_horizontal_field_angle" + - ", gb_vertical_field_angle" + - ", gb_max_view_distance" + - ", gb_grassroots_code" + - ", gb_po_type" + - ", gb_po_common_name" + - ", gb_mac" + - ", gb_function_type" + - ", gb_encode_type" + - ", gb_install_time" + - ", gb_management_unit" + - ", gb_contact_info" + - ", gb_record_save_days" + - ", gb_industrial_classification" + ") VALUES (" + "#{terminalDbId}," + "#{channelId}," + @@ -138,66 +35,15 @@ public interface JTChannelMapper { "#{hasAudio}," + "#{createTime}," + "#{updateTime}" + - ", #{gbDeviceId}" + - ", #{gbName}" + - ", #{gbManufacturer}" + - ", #{gbModel}" + - ", #{gbCivilCode}" + - ", #{gbBlock}" + - ", #{gbAddress}" + - ", #{gbParental}" + - ", #{gbParentId}" + - ", #{gbRegisterWay}" + - ", #{gbSecurityLevelCode}" + - ", #{gbSecrecy}" + - ", #{gbIpAddress}" + - ", #{gbPort}" + - ", #{gbPassword}" + - ", #{gbStatus}" + - ", #{gbLongitude}" + - ", #{gbLatitude}" + - ", #{gbBusinessGroupId}" + - ", #{gbPtzType}" + - ", #{gbPhotoelectricImagingTyp}" + - ", #{gbCapturePositionType}" + - ", #{gbRoomType}" + - ", #{gbSupplyLightType}" + - ", #{gbDirectionType}" + - ", #{gbResolution}" + - ", #{gbStreamNumberList}" + - ", #{gbDownloadSpeed}" + - ", #{gbSvcSpaceSupportMod}" + - ", #{gbSvcTimeSupportMode}" + - ", #{gbSsvcRatioSupportList}" + - ", #{gbMobileDeviceType}" + - ", #{gbHorizontalFieldAngle}" + - ", #{gbVerticalFieldAngle}" + - ", #{gbMaxViewDistance}" + - ", #{gbGrassrootsCode}" + - ", #{gbPoType}" + - ", #{gbPoCommonName}" + - ", #{gbMac}" + - ", #{gbFunctionType}" + - ", #{gbEncodeType}" + - ", #{gbInstallTime}" + - ", #{gbManagementUnit}" + - ", #{gbContactInfo}" + - ", #{gbRecordSaveDays}" + - ", #{gbIndustrialClassification}" + " )"}) + @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") void add(JTChannel channel); @Delete("delete from wvp_jt_channel where id = #{id}") void delete(@Param("id") int id); - @Select(value = {" "}) - JTChannel getChannel(@Param("terminalDbId") int terminalDbId, @Param("channelId") Integer channelId); + @SelectProvider(type = JTChannelProvider.class, method = "selectChannelByChannelId") + JTChannel selectChannelByChannelId(@Param("terminalDbId") int terminalDbId, @Param("channelId") Integer channelId); @Select(value = {" "}) - JTChannel getChannelByDbId(@Param("id") Integer id); + @SelectProvider(type = JTChannelProvider.class, method = "selectChannelById") + JTChannel selectChannelById(@Param("id") Integer id); } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/dao/provider/JTChannelProvider.java b/src/main/java/com/genersoft/iot/vmp/jt1078/dao/provider/JTChannelProvider.java new file mode 100644 index 000000000..d07887f06 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/dao/provider/JTChannelProvider.java @@ -0,0 +1,37 @@ +package com.genersoft.iot.vmp.jt1078.dao.provider; + +import java.util.Map; + +public class JTChannelProvider { + + public final static String BASE_SQL = + "SELECT jc.*, jc.id as jt_channel_id, wdc.*, wdc.id as gb_id " + + " from wvp_jt_channel jc " + + " LEFT join wvp_device_channel wdc " + + " on jc.id = wdc.jt_channel_id "; + + public String selectChannelByChannelId(Map params ){ + StringBuilder sqlBuild = new StringBuilder(); + sqlBuild.append(BASE_SQL); + sqlBuild.append(" WHERE jc.terminal_db_id = #{terminalDbId} and jc.channel_id = #{channelId} "); + return sqlBuild.toString(); + } + public String selectChannelById(Map params ){ + StringBuilder sqlBuild = new StringBuilder(); + sqlBuild.append(BASE_SQL); + sqlBuild.append(" WHERE jc.id = #{id}"); + return sqlBuild.toString(); + } + public String selectAll(Map params ){ + StringBuilder sqlBuild = new StringBuilder(); + sqlBuild.append(BASE_SQL); + sqlBuild.append(" WHERE 1=1 "); + if (params.get("query") != null) { + sqlBuild.append(" AND ") + .append(" jc.name LIKE ").append("'%").append(params.get("query")).append("%'") + ; + } + sqlBuild.append(" ORDER BY jc.channel_id "); + return sqlBuild.toString(); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078PlayService.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078PlayService.java new file mode 100644 index 000000000..aa2a3f234 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078PlayService.java @@ -0,0 +1,35 @@ +package com.genersoft.iot.vmp.jt1078.service; + +import com.genersoft.iot.vmp.common.GeneralCallback; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.jt1078.bean.*; +import com.genersoft.iot.vmp.jt1078.proc.request.J1205; +import com.github.pagehelper.PageInfo; + +import java.util.List; + +public interface Ijt1078PlayService { + JTMediaStreamType checkStreamFromJt(String stream); + + void play(String phoneNumber, Integer channelId, int type, GeneralCallback callback); + + void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type, + Integer rate, Integer playbackType, Integer playbackSpeed, GeneralCallback callback); + + void stopPlay(String phoneNumber, Integer channelId); + + void pausePlay(String phoneNumber, Integer channelId); + + void continueLivePlay(String phoneNumber, Integer channelId); + + List getRecordList(String phoneNumber, Integer channelId, String startTime, String endTime); + + void stopPlayback(String phoneNumber, Integer channelId); + + void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, GeneralCallback callback); + + void stopTalk(String phoneNumber, Integer channelId); + + void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time); + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java index c0092d46b..23c10b4b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java @@ -25,20 +25,6 @@ public interface Ijt1078Service { void updateDeviceStatus(boolean connected, String phoneNumber); - void play(String phoneNumber, Integer channelId, int type, GeneralCallback callback); - - void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type, - Integer rate, Integer playbackType, Integer playbackSpeed, GeneralCallback callback); - - void stopPlay(String phoneNumber, Integer channelId); - - void pausePlay(String phoneNumber, Integer channelId); - - void continueLivePlay(String phoneNumber, Integer channelId); - - List getRecordList(String phoneNumber, Integer channelId, String startTime, String endTime); - - void stopPlayback(String phoneNumber, Integer channelId); void ptzControl(String phoneNumber, Integer channelId, String command, int speed); @@ -112,14 +98,8 @@ public interface Ijt1078Service { JTMediaAttribute queryMediaAttribute(String phoneNumber); - void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, GeneralCallback callback); - - void stopTalk(String phoneNumber, Integer channelId); - void changeStreamType(String phoneNumber, Integer channelId, Integer streamType); - void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time); - void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, GeneralCallback fileCallback); PageInfo getChannelList(int page, int count, int deviceId, String query); diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java new file mode 100644 index 000000000..01122918e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java @@ -0,0 +1,651 @@ +package com.genersoft.iot.vmp.jt1078.service.impl; + +import com.genersoft.iot.vmp.common.GeneralCallback; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; +import com.genersoft.iot.vmp.jt1078.bean.*; +import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; +import com.genersoft.iot.vmp.jt1078.dao.JTChannelMapper; +import com.genersoft.iot.vmp.jt1078.dao.JTTerminalMapper; +import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent; +import com.genersoft.iot.vmp.jt1078.proc.request.J1205; +import com.genersoft.iot.vmp.jt1078.proc.response.*; +import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService; +import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; +import com.genersoft.iot.vmp.media.bean.MediaInfo; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookData; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.utils.MediaServerUtils; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Service +@Slf4j +public class jt1078PlayServiceImpl implements Ijt1078PlayService { + + @Autowired + private ISendRtpServerService sendRtpServerService; + + @Autowired + private Ijt1078Service jt1078Service; + + @Autowired + private JT1078Template jt1078Template; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private HookSubscribe subscribe; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private UserSetting userSetting; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private FtpSetting ftpSetting; + + /** + * 流到来的处理 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaArrivalEvent event) { + + } + + /** + * 流离开的处理 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaDepartureEvent event) { + + } + + /** + * 流未找到的处理 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaNotFoundEvent event) { + if (!userSetting.isAutoApplyPlay()) { + return; + } + JTMediaStreamType jtMediaStreamType = checkStreamFromJt(event.getStream()); + if (jtMediaStreamType == null){ + return; + } + String[] streamParamArray = event.getStream().split("_"); + String phoneNumber = streamParamArray[1]; + int channelId = Integer.parseInt(streamParamArray[2]); + String params = event.getParams(); + Map paramMap = MediaServerUtils.urlParamToMap(params); + int type = 0; + try { + type = Integer.parseInt(paramMap.get("type")); + }catch (NumberFormatException ignored) {} + if (jtMediaStreamType.equals(JTMediaStreamType.PLAY)) { + play(phoneNumber, channelId, 0, null); + }else if (jtMediaStreamType.equals(JTMediaStreamType.PLAYBACK)) { + String startTimeParam = DateUtil.jt1078Toyyyy_MM_dd_HH_mm_ss(streamParamArray[3]); + String endTimeParam = DateUtil.jt1078Toyyyy_MM_dd_HH_mm_ss(streamParamArray[4]); + int rate = 0; + int playbackType = 0; + int playbackSpeed = 0; + try { + rate = Integer.parseInt(paramMap.get("rate")); + playbackType = Integer.parseInt(paramMap.get("playbackType")); + playbackSpeed = Integer.parseInt(paramMap.get("playbackSpeed")); + }catch (NumberFormatException ignored) {} + playback(phoneNumber, channelId, startTimeParam, endTimeParam, type, rate, playbackType, playbackSpeed, null); + } + } + + + /** + * 校验流是否是属于部标的 + */ + @Override + public JTMediaStreamType checkStreamFromJt(String stream) { + if (!stream.startsWith("jt_")) { + return null; + } + String[] streamParamArray = stream.split("_"); + if (streamParamArray.length == 3) { + return JTMediaStreamType.PLAY; + }else if (streamParamArray.length == 5) { + return JTMediaStreamType.PLAYBACK; + }else if (streamParamArray.length == 4) { + return JTMediaStreamType.TALK; + }else { + return null; + } + } + + private final Map>> inviteErrorCallbackMap = new ConcurrentHashMap<>(); + + @Override + public void play(String phoneNumber, Integer channelId, int type, GeneralCallback callback) { + JTDevice device = jt1078Service.getDevice(phoneNumber); + if (device == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在"); + } + jt1078Template.checkTerminalStatus(phoneNumber); + JTChannel channel = jt1078Service.getChannel(device.getId(), channelId); + if (channel == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道不存在"); + } + // 检查流是否已经存在,存在则返回 + String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; + List> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>()); + errorCallbacks.add(callback); + StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); + if (streamInfo != null) { + MediaServer mediaServer = streamInfo.getMediaServer(); + if (mediaServer != null) { + // 查询流是否存在,不存在则删除缓存数据 + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "rtp", streamInfo.getStream()); + if (mediaInfo != null) { + log.info("[1078-点播] 点播已经存在,直接返回, phoneNumber: {}, channelId: {}", phoneNumber, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + } + return; + } + } + // 清理数据 + redisTemplate.delete(playKey); + } + String stream = "jt_" + phoneNumber + "_" + channelId; + MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + if (mediaServer == null) { + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo); + } + return; + } + // 设置hook监听 + Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId()); + subscribe.addSubscribe(hook, (hookData) -> { + dynamicTask.stop(playKey); + log.info("[1078-点播] 点播成功, 手机号: {}, 通道: {}", phoneNumber, channelId); + // TODO 发送9105 实时音视频传输状态通知, 通知丢包率 + StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId); + + for (GeneralCallback errorCallback : errorCallbacks) { + if (errorCallback == null) { + continue; + } + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); + } + subscribe.removeSubscribe(hook); + redisTemplate.opsForValue().set(playKey, info); + // 截图 + String streamUrl; + if (mediaServer.getRtspPort() != 0) { + streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServer.getRtspPort(), "rtp", stream); + } else { + streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServer.getHttpPort(), "rtp", stream); + } + String path = "snap"; + String fileName = phoneNumber + "_" + channelId + ".jpg"; + // 请求截图 + log.info("[请求截图]: " + fileName); + mediaServerService.getSnap(mediaServer, streamUrl, 15, 1, path, fileName); + }); + // 开启收流端口 + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, "000", false, false, 0, false, !channel.isHasAudio(), false, 1); + if (ssrcInfo == null) { + stopPlay(phoneNumber, channelId); + return; + } + // 设置超时监听 + dynamicTask.startDelay(playKey, () -> { + log.info("[1078-点播] 超时, phoneNumber: {}, channelId: {}", phoneNumber, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), + InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); + } + mediaServerService.closeRTPServer(mediaServer, stream); + subscribe.removeSubscribe(hook); + }, userSetting.getPlayTimeout()); + + log.info("[1078-点播] phoneNumber: {}, channelId: {}, 端口: {}", phoneNumber, channelId, ssrcInfo.getPort()); + J9101 j9101 = new J9101(); + j9101.setChannel(Integer.valueOf(channelId)); + j9101.setIp(mediaServer.getSdpIp()); + j9101.setRate(1); + j9101.setTcpPort(ssrcInfo.getPort()); + j9101.setUdpPort(ssrcInfo.getPort()); + j9101.setType(type); + jt1078Template.startLive(phoneNumber, j9101, 6); + } + + public StreamInfo onPublishHandler(MediaServer mediaServerItem, HookData hookData, String phoneNumber, Integer channelId) { + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookData.getStream(), hookData.getMediaInfo(), null); + streamInfo.setDeviceId(phoneNumber); + streamInfo.setChannelId(channelId); + return streamInfo; + } + + @Override + public void stopPlay(String phoneNumber, Integer channelId) { + String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; + dynamicTask.stop(playKey); + // 清理回调 + List> generalCallbacks = inviteErrorCallbackMap.get(playKey); + if (generalCallbacks != null && !generalCallbacks.isEmpty()) { + for (GeneralCallback callback : generalCallbacks) { + callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null); + } + } + jt1078Template.checkTerminalStatus(phoneNumber); + StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); + // 发送停止命令 + J9102 j9102 = new J9102(); + j9102.setChannel(Integer.valueOf(channelId)); + j9102.setCommand(0); + j9102.setCloseType(0); + j9102.setStreamType(1); + jt1078Template.stopLive(phoneNumber, j9102, 6); + log.info("[1078-停止点播] phoneNumber: {}, channelId: {}", phoneNumber, channelId); + // 删除缓存数据 + if (streamInfo != null) { + // 关闭rtpServer + mediaServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getStream()); + redisTemplate.delete(playKey); + } + + } + + @Override + public void pausePlay(String phoneNumber, Integer channelId) { + String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; + dynamicTask.stop(playKey); + StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); + if (streamInfo == null) { + log.info("[1078-暂停点播] 未找到点播信息 phoneNumber: {}, channelId: {}", phoneNumber, channelId); + } + log.info("[1078-暂停点播] phoneNumber: {}, channelId: {}", phoneNumber, channelId); + // 发送暂停命令 + J9102 j9102 = new J9102(); + j9102.setChannel(Integer.valueOf(channelId)); + j9102.setCommand(2); + j9102.setCloseType(0); + j9102.setStreamType(1); + jt1078Template.stopLive(phoneNumber, j9102, 6); + } + + @Override + public void continueLivePlay(String phoneNumber, Integer channelId) { + String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; + dynamicTask.stop(playKey); + StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); + if (streamInfo == null) { + log.info("[1078-继续点播] 未找到点播信息 phoneNumber: {}, channelId: {}", phoneNumber, channelId); + } + log.info("[1078-继续点播] phoneNumber: {}, channelId: {}", phoneNumber, channelId); + // 发送暂停命令 + J9102 j9102 = new J9102(); + j9102.setChannel(Integer.valueOf(channelId)); + j9102.setCommand(2); + j9102.setCloseType(0); + j9102.setStreamType(1); + jt1078Template.stopLive(phoneNumber, j9102, 6); + } + + @Override + public List getRecordList(String phoneNumber, Integer channelId, String startTime, String endTime) { + log.info("[1078-查询录像列表] phoneNumber: {}, channelId: {}, startTime: {}, endTime: {}" + , phoneNumber, channelId, startTime, endTime); + // 发送请求录像列表命令 + J9205 j9205 = new J9205(); + j9205.setChannelId(channelId); + j9205.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime)); + j9205.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime)); + j9205.setMediaType(0); + j9205.setStreamType(0); + j9205.setStorageType(0); + List JRecordItemList = (List) jt1078Template.queryBackTime(phoneNumber, j9205, 20); + if (JRecordItemList == null || JRecordItemList.isEmpty()) { + return null; + } + log.info("[1078-查询录像列表] phoneNumber: {}, channelId: {}, startTime: {}, endTime: {}, 结果: {}条" + , phoneNumber, channelId, startTime, endTime, JRecordItemList.size()); + return JRecordItemList; + } + + @Override + public void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type, + Integer rate, Integer playbackType, Integer playbackSpeed, GeneralCallback callback) { + log.info("[1078-回放] 回放,设备:{}, 通道: {}, 开始时间: {}, 结束时间: {}, 音视频类型: {}, 码流类型: {}, " + + "回放方式: {}, 快进或快退倍数: {}", phoneNumber, channelId, startTime, endTime, type, rate, playbackType, playbackSpeed); + // 检查流是否已经存在,存在则返回 + String playbackKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId; + List> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playbackKey, k -> new ArrayList<>()); + errorCallbacks.add(callback); + String logInfo = String.format("phoneNumber:%s, channelId:%s, startTime:%s, endTime:%s", phoneNumber, channelId, startTime, endTime); + StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey); + if (streamInfo != null) { + MediaServer mediaServer = streamInfo.getMediaServer(); + if (mediaServer != null) { + // 查询流是否存在,不存在则删除缓存数据 + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "rtp", streamInfo.getStream()); + if (mediaInfo != null) { + log.info("[1078-回放] 回放已经存在,直接返回, logInfo: {}", logInfo); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + } + return; + } + } + // 清理数据 + redisTemplate.delete(playbackKey); + } + String startTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime); + String endTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime); + String stream = "jt_" + phoneNumber + "_" + channelId + "_" + startTimeParam + "_" + endTimeParam; + MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + if (mediaServer == null) { + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo); + } + return; + } + // 设置hook监听 + Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId()); + subscribe.addSubscribe(hookSubscribe, (hookData) -> { + dynamicTask.stop(playbackKey); + log.info("[1078-回放] 回放成功, logInfo: {}", logInfo); + StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId); + + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); + } + subscribe.removeSubscribe(hookSubscribe); + redisTemplate.opsForValue().set(playbackKey, info); + }); + // 设置超时监听 + dynamicTask.startDelay(playbackKey, () -> { + log.info("[1078-回放] 回放超时, logInfo: {}", logInfo); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), + InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); + } + + }, userSetting.getPlayTimeout()); + + // 开启收流端口 + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false, 1); + log.info("[1078-回放] logInfo: {}, 端口: {}", logInfo, ssrcInfo.getPort()); + J9201 j9201 = new J9201(); + j9201.setChannel(channelId); + j9201.setIp(mediaServer.getSdpIp()); + if (rate != null) { + j9201.setRate(rate); + } + if (playbackType != null) { + j9201.setPlaybackType(playbackType); + } + if (playbackSpeed != null) { + j9201.setPlaybackSpeed(playbackSpeed); + } + + j9201.setTcpPort(ssrcInfo.getPort()); + j9201.setUdpPort(ssrcInfo.getPort()); + j9201.setType(type); + j9201.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime)); + j9201.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime)); + jt1078Template.startBackLive(phoneNumber, j9201, 20); + + } + + @Override + public void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time) { + log.info("[1078-回放控制] phoneNumber: {}, channelId: {}, command: {}, playbackSpeed: {}, time: {}", + phoneNumber, channelId, command, playbackSpeed, time); + String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId; + dynamicTask.stop(playKey); + if (command == 2) { + // 结束回放 + StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); + // 删除缓存数据 + if (streamInfo != null) { + // 关闭rtpServer + mediaServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getStream()); + } + // 清理回调 + List> generalCallbacks = inviteErrorCallbackMap.get(playKey); + if (generalCallbacks != null && !generalCallbacks.isEmpty()) { + for (GeneralCallback callback : generalCallbacks) { + callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null); + } + } + } + // 发送停止命令 + J9202 j9202 = new J9202(); + j9202.setChannel(channelId); + j9202.setPlaybackType(command); + if (playbackSpeed != null) { + j9202.setPlaybackSpeed(playbackSpeed); + } + if (!ObjectUtils.isEmpty(time)) { + j9202.setPlaybackTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(time)); + } + jt1078Template.controlBackLive(phoneNumber, j9202, 6); + } + + @Override + public void stopPlayback(String phoneNumber, Integer channelId) { + playbackControl(phoneNumber, channelId, 2, null, null); + } + + private Map> fileUploadMap = new ConcurrentHashMap<>(); + + @EventListener + public void onApplicationEvent(FtpUploadEvent event) { + if (fileUploadMap.isEmpty()) { + return; + } + fileUploadMap.keySet().forEach(key -> { + if (!event.getFileName().contains(key)) { + return; + } + GeneralCallback callback = fileUploadMap.get(key); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), event.getFileName()); + fileUploadMap.remove(key); + } + }); + } + + /** + * 监听发流停止 + */ + @EventListener + public void onApplicationEvent(MediaSendRtpStoppedEvent event) { + + List sendRtpInfos = sendRtpServerService.queryByStream(event.getStream()); + if (sendRtpInfos.isEmpty()) { + return; + } + for (SendRtpInfo sendRtpInfo : sendRtpInfos) { + if (!sendRtpInfo.isOnlyAudio() || ObjectUtils.isEmpty(sendRtpInfo.getChannelId())) { + continue; + } + if (!sendRtpInfo.getSsrc().contains("_")) { + continue; + } + sendRtpServerService.delete(sendRtpInfo); + String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + sendRtpInfo.getApp() + ":" + sendRtpInfo.getStream(); + redisTemplate.delete(playKey); + } + } + + @Override + public void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, + GeneralCallback callback) { + // 检查流是否已经存在,存在则返回 + String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId; + List> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>()); + errorCallbacks.add(callback); + StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); + if (streamInfo != null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "对讲进行中"); + } + + String receiveStream = "jt_" + phoneNumber + "_" + channelId + "_talk"; + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); + if (mediaServer == null) { + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo); + } + return; + } + // 检查待发送的流是否存在, + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, stream); + if (mediaInfo == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), app + "/" + stream + "流不存在"); + } + // 开启收流端口, zlm发送1078的rtp流需要将ssrc字段设置为 imei_channel格式 + String ssrc = phoneNumber + "_" + channelId; + sendRtpServerService.createSendRtpInfo(mediaServer, ) + + SendRtpInfo.getInstance(app, stream, ssrc, ) + + SendRtpInfo sendRtpInfo = new SendRtpInfo(); + sendRtpInfo.setMediaServerId(mediaServerId); + sendRtpInfo.setPort(0); + sendRtpInfo.setSsrc(ssrc); + sendRtpInfo.setChannelId(channelId ); + sendRtpInfo.setRtcp(false); + sendRtpInfo.setApp(app); + sendRtpInfo.setStream(stream); + sendRtpInfo.setTcp(true); + sendRtpInfo.setTcpActive(true); + sendRtpInfo.setUsePs(false); + sendRtpInfo.setOnlyAudio(true); + if (onlySend == null || !onlySend) { + sendRtpInfo.setReceiveStream(receiveStream); + } + sendRtpInfo.setPlatformId(phoneNumber); + if (onlySend == null || !onlySend) { + // 设置hook监听 + Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", receiveStream, mediaServer.getId()); + subscribe.addSubscribe(hook, (hookData) -> { + dynamicTask.stop(playKey); + log.info("[1078-对讲] 对讲成功, phoneNumber: {}, channelId: {}", phoneNumber, channelId); + StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId); + + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); + } + subscribe.removeSubscribe(hook); + redisTemplate.opsForValue().set(playKey, info); + // 存储发流信息 + redisCatchStorage.updateSendRTPSever(sendRtpInfo); + }); + Hook hookForDeparture = Hook.getInstance(HookType.on_media_departure, "rtp", receiveStream, mediaServer.getId()); + subscribe.addSubscribe(hookForDeparture, (hookData) -> { + log.info("[1078-对讲] 对讲时源流注销, app: {}. stream: {}, phoneNumber: {}, channelId: {}", app, stream, phoneNumber, channelId); + stopTalk(phoneNumber, channelId); + }); + // 设置超时监听 + dynamicTask.startDelay(playKey, () -> { + log.info("[1078-对讲] 超时, phoneNumber: {}, channelId: {}", phoneNumber, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), + InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); + } + + }, userSetting.getPlayTimeout()); + } + + Integer localPort = mediaServerService.startSendRtpPassive(mediaServer, sendRtpInfo, 15000); + + log.info("[1078-对讲] phoneNumber: {}, channelId: {}, 收发端口: {}, app: {}, stream: {}", + phoneNumber, channelId, localPort, app, stream); + J9101 j9101 = new J9101(); + j9101.setChannel(Integer.valueOf(channelId)); + j9101.setIp(mediaServer.getSdpIp()); + j9101.setRate(1); + j9101.setTcpPort(localPort); + j9101.setUdpPort(localPort); + j9101.setType(2); + jt1078Template.startLive(phoneNumber, j9101, 6); + if (onlySend != null && onlySend) { + log.info("[1078-对讲] 对讲成功, phoneNumber: {}, channelId: {}", phoneNumber, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), null); + } + // 存储发流信息 + redisCatchStorage.updateSendRTPSever(sendRtpInfo); + } + } + + @Override + public void stopTalk(String phoneNumber, Integer channelId) { + String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId; + dynamicTask.stop(playKey); + StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); + // 发送停止命令 + J9102 j9102 = new J9102(); + j9102.setChannel(Integer.valueOf(channelId)); + j9102.setCommand(4); + j9102.setCloseType(0); + j9102.setStreamType(1); + jt1078Template.stopLive(phoneNumber, j9102, 6); + log.info("[1078-停止对讲] phoneNumber: {}, channelId: {}", phoneNumber, channelId); + // 删除缓存数据 + if (streamInfo != null) { + redisTemplate.delete(playKey); + // 关闭rtpServer + mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); + } + // 清理回调 + List> generalCallbacks = inviteErrorCallbackMap.get(playKey); + if (generalCallbacks != null && !generalCallbacks.isEmpty()) { + for (GeneralCallback callback : generalCallbacks) { + callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null); + } + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index 90a2992e9..53aef875a 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -7,35 +7,27 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.jt1078.bean.*; import com.genersoft.iot.vmp.jt1078.bean.common.ConfigAttribute; import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; import com.genersoft.iot.vmp.jt1078.dao.JTChannelMapper; import com.genersoft.iot.vmp.jt1078.dao.JTTerminalMapper; import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent; -import com.genersoft.iot.vmp.jt1078.proc.request.J1205; import com.genersoft.iot.vmp.jt1078.proc.response.*; import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; -import com.genersoft.iot.vmp.media.bean.MediaInfo; -import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.event.hook.Hook; -import com.genersoft.iot.vmp.media.event.hook.HookData; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; -import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.MediaServerUtils; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,17 +37,16 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.springframework.web.bind.annotation.CrossOrigin; +import org.springframework.transaction.annotation.Transactional; import java.lang.reflect.Field; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @Service +@Slf4j public class jt1078ServiceImpl implements Ijt1078Service { - private final static Logger logger = LoggerFactory.getLogger(jt1078ServiceImpl.class); - @Autowired private JTTerminalMapper jtDeviceMapper; @@ -69,10 +60,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { private RedisTemplate redisTemplate; @Autowired - private HookSubscribe subscribe; - - @Autowired - private IMediaServerService mediaServerService; + private IGbChannelService channelService; @Autowired private DynamicTask dynamicTask; @@ -80,9 +68,6 @@ public class jt1078ServiceImpl implements Ijt1078Service { @Autowired private UserSetting userSetting; - @Autowired - private IRedisCatchStorage redisCatchStorage; - @Autowired private FtpSetting ftpSetting; @@ -104,45 +89,6 @@ public class jt1078ServiceImpl implements Ijt1078Service { } - /** - * 流未找到的处理 - */ - @Async("taskExecutor") - @EventListener - public void onApplicationEvent(MediaNotFoundEvent event) { - if (!userSetting.isAutoApplyPlay()) { - return; - } - JTMediaStreamType jtMediaStreamType = checkStreamFromJt(event.getStream()); - if (jtMediaStreamType == null){ - return; - } - String[] streamParamArray = event.getStream().split("_"); - String phoneNumber = streamParamArray[1]; - int channelId = Integer.parseInt(streamParamArray[2]); - String params = event.getParams(); - Map paramMap = MediaServerUtils.urlParamToMap(params); - int type = 0; - try { - type = Integer.parseInt(paramMap.get("type")); - }catch (NumberFormatException ignored) {} - if (jtMediaStreamType.equals(JTMediaStreamType.PLAY)) { - play(phoneNumber, channelId, 0, null); - }else if (jtMediaStreamType.equals(JTMediaStreamType.PLAYBACK)) { - String startTimeParam = DateUtil.jt1078Toyyyy_MM_dd_HH_mm_ss(streamParamArray[3]); - String endTimeParam = DateUtil.jt1078Toyyyy_MM_dd_HH_mm_ss(streamParamArray[4]); - int rate = 0; - int playbackType = 0; - int playbackSpeed = 0; - try { - rate = Integer.parseInt(paramMap.get("rate")); - playbackType = Integer.parseInt(paramMap.get("playbackType")); - playbackSpeed = Integer.parseInt(paramMap.get("playbackSpeed")); - }catch (NumberFormatException ignored) {} - playback(phoneNumber, channelId, startTimeParam, endTimeParam, type, rate, playbackType, playbackSpeed, null); - } - } - /** * 校验流是否是属于部标的 @@ -171,7 +117,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { @Override public JTChannel getChannel(Integer terminalDbId, Integer channelId) { - return jtChannelMapper.getChannel(terminalDbId, channelId); + return jtChannelMapper.selectChannelByChannelId(terminalDbId, channelId); } @Override @@ -208,326 +154,6 @@ public class jt1078ServiceImpl implements Ijt1078Service { jtDeviceMapper.updateDeviceStatus(connected, phoneNumber); } - private final Map>> inviteErrorCallbackMap = new ConcurrentHashMap<>(); - - @Override - public void play(String phoneNumber, Integer channelId, int type, GeneralCallback callback) { - JTDevice device = getDevice(phoneNumber); - if (device == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在"); - } - jt1078Template.checkTerminalStatus(phoneNumber); - JTChannel channel = getChannel(device.getId(), channelId); - if (channel == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道不存在"); - } - // 检查流是否已经存在,存在则返回 - String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; - List> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>()); - errorCallbacks.add(callback); - StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); - if (streamInfo != null) { - String mediaServerId = streamInfo.getMediaServerId(); - MediaServer mediaServer = mediaServerService.getOne(mediaServerId); - if (mediaServer != null) { - // 查询流是否存在,不存在则删除缓存数据 - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "rtp", streamInfo.getStream()); - if (mediaInfo != null) { - logger.info("[1078-点播] 点播已经存在,直接返回, phoneNumber: {}, channelId: {}", phoneNumber, channelId); - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - } - return; - } - } - // 清理数据 - redisTemplate.delete(playKey); - } - String stream = "jt_" + phoneNumber + "_" + channelId; - MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); - if (mediaServer == null) { - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo); - } - return; - } - // 设置hook监听 - Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId()); - subscribe.addSubscribe(hook, (hookData) -> { - dynamicTask.stop(playKey); - logger.info("[1078-点播] 点播成功, 手机号: {}, 通道: {}", phoneNumber, channelId); - // TODO 发送9105 实时音视频传输状态通知, 通知丢包率 - StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId); - - for (GeneralCallback errorCallback : errorCallbacks) { - if (errorCallback == null) { - continue; - } - errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); - } - subscribe.removeSubscribe(hook); - redisTemplate.opsForValue().set(playKey, info); - // 截图 - String streamUrl; - if (mediaServer.getRtspPort() != 0) { - streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServer.getRtspPort(), "rtp", stream); - } else { - streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServer.getHttpPort(), "rtp", stream); - } - String path = "snap"; - String fileName = phoneNumber + "_" + channelId + ".jpg"; - // 请求截图 - logger.info("[请求截图]: " + fileName); - mediaServerService.getSnap(mediaServer, streamUrl, 15, 1, path, fileName); - }); - // 开启收流端口 - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, "000", false, false, 0, false, !channel.getHasAudio(), false, 1); - if (ssrcInfo == null) { - stopPlay(phoneNumber, channelId); - return; - } - // 设置超时监听 - dynamicTask.startDelay(playKey, () -> { - logger.info("[1078-点播] 超时, phoneNumber: {}, channelId: {}", phoneNumber, channelId); - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), - InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); - } - mediaServerService.closeRTPServer(mediaServer, stream); - subscribe.removeSubscribe(hook); - }, userSetting.getPlayTimeout()); - - logger.info("[1078-点播] phoneNumber: {}, channelId: {}, 端口: {}", phoneNumber, channelId, ssrcInfo.getPort()); - J9101 j9101 = new J9101(); - j9101.setChannel(Integer.valueOf(channelId)); - j9101.setIp(mediaServer.getSdpIp()); - j9101.setRate(1); - j9101.setTcpPort(ssrcInfo.getPort()); - j9101.setUdpPort(ssrcInfo.getPort()); - j9101.setType(type); - jt1078Template.startLive(phoneNumber, j9101, 6); - } - - public StreamInfo onPublishHandler(MediaServer mediaServerItem, HookData hookData, String phoneNumber, Integer channelId) { - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookData.getStream(), hookData.getMediaInfo(), null); - streamInfo.setDeviceID(phoneNumber); - streamInfo.setChannelId(channelId + ""); - return streamInfo; - } - - @Override - public void stopPlay(String phoneNumber, Integer channelId) { - String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; - dynamicTask.stop(playKey); - // 清理回调 - List> generalCallbacks = inviteErrorCallbackMap.get(playKey); - if (generalCallbacks != null && !generalCallbacks.isEmpty()) { - for (GeneralCallback callback : generalCallbacks) { - callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null); - } - } - jt1078Template.checkTerminalStatus(phoneNumber); - StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); - // 发送停止命令 - J9102 j9102 = new J9102(); - j9102.setChannel(Integer.valueOf(channelId)); - j9102.setCommand(0); - j9102.setCloseType(0); - j9102.setStreamType(1); - jt1078Template.stopLive(phoneNumber, j9102, 6); - logger.info("[1078-停止点播] phoneNumber: {}, channelId: {}", phoneNumber, channelId); - // 删除缓存数据 - if (streamInfo != null) { - // 关闭rtpServer - mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); - redisTemplate.delete(playKey); - } - - } - - @Override - public void pausePlay(String phoneNumber, Integer channelId) { - String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; - dynamicTask.stop(playKey); - StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); - if (streamInfo == null) { - logger.info("[1078-暂停点播] 未找到点播信息 phoneNumber: {}, channelId: {}", phoneNumber, channelId); - } - logger.info("[1078-暂停点播] phoneNumber: {}, channelId: {}", phoneNumber, channelId); - // 发送暂停命令 - J9102 j9102 = new J9102(); - j9102.setChannel(Integer.valueOf(channelId)); - j9102.setCommand(2); - j9102.setCloseType(0); - j9102.setStreamType(1); - jt1078Template.stopLive(phoneNumber, j9102, 6); - } - - @Override - public void continueLivePlay(String phoneNumber, Integer channelId) { - String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; - dynamicTask.stop(playKey); - StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); - if (streamInfo == null) { - logger.info("[1078-继续点播] 未找到点播信息 phoneNumber: {}, channelId: {}", phoneNumber, channelId); - } - logger.info("[1078-继续点播] phoneNumber: {}, channelId: {}", phoneNumber, channelId); - // 发送暂停命令 - J9102 j9102 = new J9102(); - j9102.setChannel(Integer.valueOf(channelId)); - j9102.setCommand(2); - j9102.setCloseType(0); - j9102.setStreamType(1); - jt1078Template.stopLive(phoneNumber, j9102, 6); - } - - @Override - public List getRecordList(String phoneNumber, Integer channelId, String startTime, String endTime) { - logger.info("[1078-查询录像列表] phoneNumber: {}, channelId: {}, startTime: {}, endTime: {}" - , phoneNumber, channelId, startTime, endTime); - // 发送请求录像列表命令 - J9205 j9205 = new J9205(); - j9205.setChannelId(channelId); - j9205.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime)); - j9205.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime)); - j9205.setMediaType(0); - j9205.setStreamType(0); - j9205.setStorageType(0); - List JRecordItemList = (List) jt1078Template.queryBackTime(phoneNumber, j9205, 20); - if (JRecordItemList == null || JRecordItemList.isEmpty()) { - return null; - } - logger.info("[1078-查询录像列表] phoneNumber: {}, channelId: {}, startTime: {}, endTime: {}, 结果: {}条" - , phoneNumber, channelId, startTime, endTime, JRecordItemList.size()); - return JRecordItemList; - } - - @Override - public void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type, - Integer rate, Integer playbackType, Integer playbackSpeed, GeneralCallback callback) { - logger.info("[1078-回放] 回放,设备:{}, 通道: {}, 开始时间: {}, 结束时间: {}, 音视频类型: {}, 码流类型: {}, " + - "回放方式: {}, 快进或快退倍数: {}", phoneNumber, channelId, startTime, endTime, type, rate, playbackType, playbackSpeed); - // 检查流是否已经存在,存在则返回 - String playbackKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId; - List> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playbackKey, k -> new ArrayList<>()); - errorCallbacks.add(callback); - String logInfo = String.format("phoneNumber:%s, channelId:%s, startTime:%s, endTime:%s", phoneNumber, channelId, startTime, endTime); - StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey); - if (streamInfo != null) { - String mediaServerId = streamInfo.getMediaServerId(); - MediaServer mediaServer = mediaServerService.getOne(mediaServerId); - if (mediaServer != null) { - // 查询流是否存在,不存在则删除缓存数据 - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "rtp", streamInfo.getStream()); - if (mediaInfo != null) { - logger.info("[1078-回放] 回放已经存在,直接返回, logInfo: {}", logInfo); - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - } - return; - } - } - // 清理数据 - redisTemplate.delete(playbackKey); - } - String startTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime); - String endTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime); - String stream = "jt_" + phoneNumber + "_" + channelId + "_" + startTimeParam + "_" + endTimeParam; - MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); - if (mediaServer == null) { - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo); - } - return; - } - // 设置hook监听 - Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId()); - subscribe.addSubscribe(hookSubscribe, (hookData) -> { - dynamicTask.stop(playbackKey); - logger.info("[1078-回放] 回放成功, logInfo: {}", logInfo); - StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId); - - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); - } - subscribe.removeSubscribe(hookSubscribe); - redisTemplate.opsForValue().set(playbackKey, info); - }); - // 设置超时监听 - dynamicTask.startDelay(playbackKey, () -> { - logger.info("[1078-回放] 回放超时, logInfo: {}", logInfo); - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), - InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); - } - - }, userSetting.getPlayTimeout()); - - // 开启收流端口 - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false, 1); - logger.info("[1078-回放] logInfo: {}, 端口: {}", logInfo, ssrcInfo.getPort()); - J9201 j9201 = new J9201(); - j9201.setChannel(channelId); - j9201.setIp(mediaServer.getSdpIp()); - if (rate != null) { - j9201.setRate(rate); - } - if (playbackType != null) { - j9201.setPlaybackType(playbackType); - } - if (playbackSpeed != null) { - j9201.setPlaybackSpeed(playbackSpeed); - } - - j9201.setTcpPort(ssrcInfo.getPort()); - j9201.setUdpPort(ssrcInfo.getPort()); - j9201.setType(type); - j9201.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime)); - j9201.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime)); - jt1078Template.startBackLive(phoneNumber, j9201, 20); - - } - - @Override - public void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time) { - logger.info("[1078-回放控制] phoneNumber: {}, channelId: {}, command: {}, playbackSpeed: {}, time: {}", - phoneNumber, channelId, command, playbackSpeed, time); - String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId; - dynamicTask.stop(playKey); - if (command == 2) { - // 结束回放 - StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); - // 删除缓存数据 - if (streamInfo != null) { - // 关闭rtpServer - mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); - } - // 清理回调 - List> generalCallbacks = inviteErrorCallbackMap.get(playKey); - if (generalCallbacks != null && !generalCallbacks.isEmpty()) { - for (GeneralCallback callback : generalCallbacks) { - callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null); - } - } - } - // 发送停止命令 - J9202 j9202 = new J9202(); - j9202.setChannel(channelId); - j9202.setPlaybackType(command); - if (playbackSpeed != null) { - j9202.setPlaybackSpeed(playbackSpeed); - } - if (!ObjectUtils.isEmpty(time)) { - j9202.setPlaybackTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(time)); - } - jt1078Template.controlBackLive(phoneNumber, j9202, 6); - } - - @Override - public void stopPlayback(String phoneNumber, Integer channelId) { - playbackControl(phoneNumber, channelId, 2, null, null); - } - private Map> fileUploadMap = new ConcurrentHashMap<>(); @EventListener @@ -554,7 +180,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { dynamicTask.startDelay(filePath, ()->{ fileUploadMap.remove(filePath); }, 2*60*60*1000); - logger.info("[1078-录像] 下载,设备:{}, 通道: {}, 开始时间: {}, 结束时间: {},等待上传文件路径: {} ", + log.info("[1078-录像] 下载,设备:{}, 通道: {}, 开始时间: {}, 结束时间: {},等待上传文件路径: {} ", phoneNumber, channelId, startTime, endTime, filePath); // 发送停止命令 J9206 j92026 = new J9206(); @@ -949,162 +575,15 @@ public class jt1078ServiceImpl implements Ijt1078Service { return (JTMediaAttribute) jt1078Template.queryMediaAttribute(phoneNumber, j9003, 300); } - /** - * 监听发流停止 - */ - @EventListener - public void onApplicationEvent(MediaSendRtpStoppedEvent event) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); - for (SendRtpItem sendRtpItem : sendRtpItems) { - if (!sendRtpItem.isOnlyAudio() - || ObjectUtils.isEmpty(sendRtpItem.getDeviceId()) - || ObjectUtils.isEmpty(sendRtpItem.getChannelId())) { - continue; - } - if (!sendRtpItem.getSsrc().contains("_")) { - continue; - } - redisCatchStorage.deleteSendRTPServer(sendRtpItem); - String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + sendRtpItem.getDeviceId() + ":" + sendRtpItem.getChannelId(); - redisTemplate.delete(playKey); - } - } - - @Override - public void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, - GeneralCallback callback) { - // 检查流是否已经存在,存在则返回 - String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId; - List> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>()); - errorCallbacks.add(callback); - StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); - if (streamInfo != null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "对讲进行中"); - } - - String receiveStream = "jt_" + phoneNumber + "_" + channelId + "_talk"; - MediaServer mediaServer = mediaServerService.getOne(mediaServerId); - if (mediaServer == null) { - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo); - } - return; - } - // 检查待发送的流是否存在, - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, stream); - if (mediaInfo == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), app + "/" + stream + "流不存在"); - } - // 开启收流端口, zlm发送1078的rtp流需要将ssrc字段设置为 imei_channel格式 - String ssrc = phoneNumber + "_" + channelId; - SendRtpItem sendRtpItem = new SendRtpItem(); - sendRtpItem.setMediaServerId(mediaServerId); - sendRtpItem.setPort(0); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setDeviceId(phoneNumber); - sendRtpItem.setChannelId(channelId + ""); - sendRtpItem.setRtcp(false); - sendRtpItem.setApp(app); - sendRtpItem.setStream(stream); - sendRtpItem.setTcp(true); - sendRtpItem.setTcpActive(true); - sendRtpItem.setUsePs(false); - sendRtpItem.setOnlyAudio(true); - if (onlySend == null || !onlySend) { - sendRtpItem.setReceiveStream(receiveStream); - } - sendRtpItem.setPlatformId(phoneNumber); - if (onlySend == null || !onlySend) { - // 设置hook监听 - Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", receiveStream, mediaServer.getId()); - subscribe.addSubscribe(hook, (hookData) -> { - dynamicTask.stop(playKey); - logger.info("[1078-对讲] 对讲成功, phoneNumber: {}, channelId: {}", phoneNumber, channelId); - StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId); - - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); - } - subscribe.removeSubscribe(hook); - redisTemplate.opsForValue().set(playKey, info); - // 存储发流信息 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - }); - Hook hookForDeparture = Hook.getInstance(HookType.on_media_departure, "rtp", receiveStream, mediaServer.getId()); - subscribe.addSubscribe(hookForDeparture, (hookData) -> { - logger.info("[1078-对讲] 对讲时源流注销, app: {}. stream: {}, phoneNumber: {}, channelId: {}", app, stream, phoneNumber, channelId); - stopTalk(phoneNumber, channelId); - }); - // 设置超时监听 - dynamicTask.startDelay(playKey, () -> { - logger.info("[1078-对讲] 超时, phoneNumber: {}, channelId: {}", phoneNumber, channelId); - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), - InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); - } - - }, userSetting.getPlayTimeout()); - } - - Integer localPort = mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 15000); - - logger.info("[1078-对讲] phoneNumber: {}, channelId: {}, 收发端口: {}, app: {}, stream: {}", - phoneNumber, channelId, localPort, app, stream); - J9101 j9101 = new J9101(); - j9101.setChannel(Integer.valueOf(channelId)); - j9101.setIp(mediaServer.getSdpIp()); - j9101.setRate(1); - j9101.setTcpPort(localPort); - j9101.setUdpPort(localPort); - j9101.setType(2); - jt1078Template.startLive(phoneNumber, j9101, 6); - if (onlySend != null && onlySend) { - logger.info("[1078-对讲] 对讲成功, phoneNumber: {}, channelId: {}", phoneNumber, channelId); - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), null); - } - // 存储发流信息 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } - } - - @Override - public void stopTalk(String phoneNumber, Integer channelId) { - String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId; - dynamicTask.stop(playKey); - StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); - // 发送停止命令 - J9102 j9102 = new J9102(); - j9102.setChannel(Integer.valueOf(channelId)); - j9102.setCommand(4); - j9102.setCloseType(0); - j9102.setStreamType(1); - jt1078Template.stopLive(phoneNumber, j9102, 6); - logger.info("[1078-停止对讲] phoneNumber: {}, channelId: {}", phoneNumber, channelId); - // 删除缓存数据 - if (streamInfo != null) { - redisTemplate.delete(playKey); - // 关闭rtpServer - mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); - } - // 清理回调 - List> generalCallbacks = inviteErrorCallbackMap.get(playKey); - if (generalCallbacks != null && !generalCallbacks.isEmpty()) { - for (GeneralCallback callback : generalCallbacks) { - callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null); - } - } - } - @Override public void changeStreamType(String phoneNumber, Integer channelId, Integer streamType) { String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; dynamicTask.stop(playKey); StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); if (streamInfo == null) { - logger.info("[1078-切换码流类型] 未找到点播信息 phoneNumber: {}, channelId: {}, streamType: {}", phoneNumber, channelId, streamType); + log.info("[1078-切换码流类型] 未找到点播信息 phoneNumber: {}, channelId: {}, streamType: {}", phoneNumber, channelId, streamType); } - logger.info("[1078-切换码流类型] phoneNumber: {}, channelId: {}, streamType: {}", phoneNumber, channelId, streamType); + log.info("[1078-切换码流类型] phoneNumber: {}, channelId: {}, streamType: {}", phoneNumber, channelId, streamType); // 发送暂停命令 J9102 j9102 = new J9102(); j9102.setChannel(Integer.valueOf(channelId)); @@ -1122,7 +601,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在"); } PageHelper.startPage(page, count); - List all = jtChannelMapper.getAll(deviceId, query); + List all = jtChannelMapper.selectAll(deviceId, query); PageInfo jtChannelPageInfo = new PageInfo<>(all); for (JTChannel jtChannel : jtChannelPageInfo.getList()) { String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + device.getPhoneNumber() + ":" + jtChannel.getChannelId(); @@ -1135,24 +614,44 @@ public class jt1078ServiceImpl implements Ijt1078Service { } @Override + @Transactional public void updateChannel(JTChannel channel) { channel.setUpdateTime(DateUtil.getNow()); jtChannelMapper.update(channel); + if (!ObjectUtils.isEmpty(channel.getGbDeviceId())) { + if (channel.getGbId() > 0) { + channelService.update(channel.buildCommonGBChannel()); + }else { + channelService.add(channel.buildCommonGBChannel()); + } + } } @Override + @Transactional public void addChannel(JTChannel channel) { - JTChannel channelInDb = jtChannelMapper.getChannel(channel.getTerminalDbId(), channel.getChannelId()); + JTChannel channelInDb = jtChannelMapper.selectChannelByChannelId(channel.getTerminalDbId(), channel.getChannelId()); if (channelInDb != null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道已存在"); } channel.setCreateTime(DateUtil.getNow()); channel.setUpdateTime(DateUtil.getNow()); jtChannelMapper.add(channel); + if (!ObjectUtils.isEmpty(channel.getGbDeviceId())) { + channelService.add(channel.buildCommonGBChannel()); + } } @Override + @Transactional public void deleteChannelById(Integer id) { + JTChannel jtChannel = jtChannelMapper.selectChannelById(id); + if (jtChannel == null) { + return; + } + if (jtChannel.getGbId() > 0) { + channelService.delete(jtChannel.getGbId()); + } jtChannelMapper.delete(id); } @@ -1190,6 +689,6 @@ public class jt1078ServiceImpl implements Ijt1078Service { @Override public JTChannel getChannelByDbId(Integer id) { - return jtChannelMapper.getChannelByDbId(id); + return jtChannelMapper.selectChannelById(id); } }