临时提交

This commit is contained in:
648540858
2024-10-19 00:04:32 +08:00
parent 277f2db137
commit 46fa390387
9 changed files with 783 additions and 718 deletions

View File

@@ -132,6 +132,9 @@ public class CommonGBChannel {
@Schema(description = "关联的拉流代理Id流来源是拉流代理时有效")
private Integer streamProxyId;
@Schema(description = "关联的部标标通道ID")
private Integer jtChannelId;
@Schema(description = "创建时间")
private String createTime;

View File

@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.util.ObjectUtils;
/**
* JT 通道
@@ -49,4 +50,16 @@ public class JTChannel extends CommonGBChannel {
", hasAudio='" + hasAudio + '\'' +
'}';
}
public CommonGBChannel buildCommonGBChannel() {
if (ObjectUtils.isEmpty(this.getGbDeviceId())) {
return null;
}
if (ObjectUtils.isEmpty(this.getGbName())) {
this.setGbName(this.getName());
}
this.setJtChannelId(this.getId());
return this;
}
}

View File

@@ -71,7 +71,7 @@ public class JT1078Controller {
@Operation(summary = "1078-开始点播", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "phoneNumber", description = "设备手机号", required = true)
@Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true)
@Parameter(name = "channelId", description = "通道编号, 一般为从1开始的数字", required = true)
@Parameter(name = "type", description = "类型0:音视频,1:视频,3:音频", required = true)
@GetMapping("/live/start")
public DeferredResult<WVPResult<StreamContent>> startLive(HttpServletRequest request,

View File

@@ -1,7 +1,7 @@
package com.genersoft.iot.vmp.jt1078.dao;
import com.genersoft.iot.vmp.jt1078.bean.JTChannel;
import com.genersoft.iot.vmp.jt1078.bean.JTDevice;
import com.genersoft.iot.vmp.jt1078.dao.provider.JTChannelProvider;
import org.apache.ibatis.annotations.*;
import java.util.List;
@@ -9,70 +9,13 @@ import java.util.List;
@Mapper
public interface JTChannelMapper {
@Select(value = {" <script>" +
"SELECT * " +
"from " +
"wvp_jt_channel jc " +
"WHERE " +
"terminal_db_id = #{terminalDbId}" +
" <if test='query != null'> AND " +
"jc.name LIKE concat('%',#{query},'%') " +
"</if> " +
"ORDER BY jc.channel_id " +
" </script>"})
List<JTChannel> getAll(@Param("terminalDbId") int terminalDbId, @Param("query") String query);
@SelectProvider(type = JTChannelProvider.class, method = "selectAll")
List<JTChannel> selectAll(@Param("terminalDbId") int terminalDbId, @Param("query") String query);
@Update(value = {" <script>" +
"UPDATE wvp_jt_channel " +
"SET update_time=#{updateTime}" +
"<if test=\"terminalDbId != null\">, terminal_db_id=#{terminalDbId}</if>" +
"<if test=\"hasAudio != null\">, has_audio=#{hasAudio}</if>" +
"<if test=\"name != null\">, name=#{name}</if>" +
"<if test=\"channelId != null\">, channel_id=#{channelId}</if>" +
"<if test='gbManufacturer != null'>, gb_manufacturer=#{gbManufacturer}</if>" +
"<if test='gbModel != null'>, gb_model=#{gbModel}</if>" +
"<if test='gbCivilCode != null'>, gb_civil_code=#{gbCivilCode}</if>" +
"<if test='gbBlock != null'>, gb_block=#{gbBlock}</if>" +
"<if test='gbAddress != null'>, gb_address=#{gbAddress}</if>" +
"<if test='gbParental != null'>, gb_parental=#{gbParental}</if>" +
"<if test='gbParentId != null'>, gb_parent_id=#{gbParentId}</if>" +
"<if test='gbRegisterWay != null'>, gb_register_way=#{gbRegisterWay}</if>" +
"<if test='gbSecurityLevelCode != null'>, gb_security_level_code=#{gbSecurityLevelCode}</if>" +
"<if test='gbSecrecy != null'>, gb_secrecy=#{gbSecrecy}</if>" +
"<if test='gbIpAddress != null'>, gb_ip_address=#{gbIpAddress}</if>" +
"<if test='gbPort != null'>, gb_port=#{gbPort}</if>" +
"<if test='gbPassword != null'>, gb_password=#{gbPassword}</if>" +
"<if test='gbStatus != null'>, gb_status=#{gbStatus}</if>" +
"<if test='gbLongitude != null'>, gb_longitude=#{gbLongitude}</if>" +
"<if test='gbLatitude != null'>, gb_latitude=#{gbLatitude}</if>" +
"<if test='gbBusinessGroupId != null'>, gb_business_group_id=#{gbBusinessGroupId}</if>" +
"<if test='gbPtzType != null'>, gb_ptz_type=#{gbPtzType}</if>" +
"<if test='gbPhotoelectricImagingTyp != null'>, gb_photoelectric_imaging_typ=#{gbPhotoelectricImagingTyp}</if>" +
"<if test='gbCapturePositionType != null'>, gb_capture_position_type=#{gbCapturePositionType}</if>" +
"<if test='gbRoomType != null'>, gb_room_type=#{gbRoomType}</if>" +
"<if test='gbSupplyLightType != null'>, gb_supply_light_type=#{gbSupplyLightType}</if>" +
"<if test='gbDirectionType != null'>, gb_direction_type=#{gbDirectionType}</if>" +
"<if test='gbResolution != null'>, gb_resolution=#{gbResolution}</if>" +
"<if test='gbStreamNumberList != null'>, gb_stream_number_list=#{gbStreamNumberList}</if>" +
"<if test='gbDownloadSpeed != null'>, gb_download_speed=#{gbDownloadSpeed}</if>" +
"<if test='gbSvcSpaceSupportMod != null'>, gb_svc_space_support_mod=#{gbSvcSpaceSupportMod}</if>" +
"<if test='gbSvcTimeSupportMode != null'>, gb_svc_time_support_mode=#{gbSvcTimeSupportMode}</if>" +
"<if test='gbSsvcRatioSupportList != null'>, gb_ssvc_ratio_support_list=#{gbSsvcRatioSupportList}</if>" +
"<if test='gbMobileDeviceType != null'>, gb_mobile_device_type=#{gbMobileDeviceType}</if>" +
"<if test='gbHorizontalFieldAngle != null'>, gb_horizontal_field_angle=#{gbHorizontalFieldAngle}</if>" +
"<if test='gbVerticalFieldAngle != null'>, gb_vertical_field_angle=#{gbVerticalFieldAngle}</if>" +
"<if test='gbMaxViewDistance != null'>, gb_max_view_distance=#{gbMaxViewDistance}</if>" +
"<if test='gbGrassrootsCode != null'>, gb_grassroots_code=#{gbGrassrootsCode}</if>" +
"<if test='gbPoType != null'>, gb_po_type=#{gbPoType}</if>" +
"<if test='gbPoCommonName != null'>, gb_po_common_name=#{gbPoCommonName}</if>" +
"<if test='gbMac != null'>, gb_mac=#{gbMac}</if>" +
"<if test='gbFunctionType != null'>, gb_function_type=#{gbFunctionType}</if>" +
"<if test='gbEncodeType != null'>, gb_encode_type=#{gbEncodeType}</if>" +
"<if test='gbInstallTime != null'>, gb_install_time=#{gbInstallTime}</if>" +
"<if test='gbManagementUnit != null'>, gb_management_unit=#{gbManagementUnit}</if>" +
"<if test='gbContactInfo != null'>, gb_contact_info=#{gbContactInfo}</if>" +
"<if test='gbRecordSaveDays != null'>, gb_record_save_days=#{gbRecordSaveDays}</if>" +
"<if test='gbIndustrialClassification != null'>, gb_industrial_classification=#{gbIndustrialClassification}</if>" +
"SET update_time=#{updateTime}, terminal_db_id=#{terminalDbId}, has_audio=#{hasAudio}, name=#{name}" +
", channel_id=#{channelId}" +
"WHERE id=#{id}"+
" </script>"})
void update(JTChannel channel);
@@ -85,52 +28,6 @@ public interface JTChannelMapper {
"has_audio,"+
"create_time,"+
"update_time"+
"<if test='gbDeviceId != null'>, gb_device_id</if>" +
"<if test='gbName != null'>, gb_name</if>" +
"<if test='gbManufacturer != null'>, gb_manufacturer</if>" +
"<if test='gbModel != null'>, gb_model</if>" +
"<if test='gbCivilCode != null'>, gb_civil_code</if>" +
"<if test='gbBlock != null'>, gb_block</if>" +
"<if test='gbAddress != null'>, gb_address</if>" +
"<if test='gbParental != null'>, gb_parental</if>" +
"<if test='gbParentId != null'>, gb_parent_id</if>" +
"<if test='gbRegisterWay != null'>, gb_register_way</if>" +
"<if test='gbSecurityLevelCode != null'>, gb_security_level_code</if>" +
"<if test='gbSecrecy != null'>, gb_secrecy</if>" +
"<if test='gbIpAddress != null'>, gb_ip_address</if>" +
"<if test='gbPort != null'>, gb_port</if>" +
"<if test='gbPassword != null'>, gb_password</if>" +
"<if test='gbStatus != null'>, gb_status</if>" +
"<if test='gbLongitude != null'>, gb_longitude</if>" +
"<if test='gbLatitude != null'>, gb_latitude</if>" +
"<if test='gbBusinessGroupId != null'>, gb_business_group_id</if>" +
"<if test='gbPtzType != null'>, gb_ptz_type</if>" +
"<if test='gbPhotoelectricImagingTyp != null'>, gb_photoelectric_imaging_typ</if>" +
"<if test='gbCapturePositionType != null'>, gb_capture_position_type</if>" +
"<if test='gbRoomType != null'>, gb_room_type</if>" +
"<if test='gbSupplyLightType != null'>, gb_supply_light_type</if>" +
"<if test='gbDirectionType != null'>, gb_direction_type</if>" +
"<if test='gbResolution != null'>, gb_resolution</if>" +
"<if test='gbStreamNumberList != null'>, gb_stream_number_list</if>" +
"<if test='gbDownloadSpeed != null'>, gb_download_speed</if>" +
"<if test='gbSvcSpaceSupportMod != null'>, gb_svc_space_support_mod</if>" +
"<if test='gbSvcTimeSupportMode != null'>, gb_svc_time_support_mode</if>" +
"<if test='gbSsvcRatioSupportList != null'>, gb_ssvc_ratio_support_list</if>" +
"<if test='gbMobileDeviceType != null'>, gb_mobile_device_type</if>" +
"<if test='gbHorizontalFieldAngle != null'>, gb_horizontal_field_angle</if>" +
"<if test='gbVerticalFieldAngle != null'>, gb_vertical_field_angle</if>" +
"<if test='gbMaxViewDistance != null'>, gb_max_view_distance</if>" +
"<if test='gbGrassrootsCode != null'>, gb_grassroots_code</if>" +
"<if test='gbPoType != null'>, gb_po_type</if>" +
"<if test='gbPoCommonName != null'>, gb_po_common_name</if>" +
"<if test='gbMac != null'>, gb_mac</if>" +
"<if test='gbFunctionType != null'>, gb_function_type</if>" +
"<if test='gbEncodeType != null'>, gb_encode_type</if>" +
"<if test='gbInstallTime != null'>, gb_install_time</if>" +
"<if test='gbManagementUnit != null'>, gb_management_unit</if>" +
"<if test='gbContactInfo != null'>, gb_contact_info</if>" +
"<if test='gbRecordSaveDays != null'>, gb_record_save_days</if>" +
"<if test='gbIndustrialClassification != null'>, gb_industrial_classification</if>" +
") VALUES (" +
"#{terminalDbId}," +
"#{channelId}," +
@@ -138,66 +35,15 @@ public interface JTChannelMapper {
"#{hasAudio}," +
"#{createTime}," +
"#{updateTime}" +
"<if test='gbDeviceId != null'>, #{gbDeviceId}</if>" +
"<if test='gbName != null'>, #{gbName}</if>" +
"<if test='gbManufacturer != null'>, #{gbManufacturer}</if>" +
"<if test='gbModel != null'>, #{gbModel}</if>" +
"<if test='gbCivilCode != null'>, #{gbCivilCode}</if>" +
"<if test='gbBlock != null'>, #{gbBlock}</if>" +
"<if test='gbAddress != null'>, #{gbAddress}</if>" +
"<if test='gbParental != null'>, #{gbParental}</if>" +
"<if test='gbParentId != null'>, #{gbParentId}</if>" +
"<if test='gbRegisterWay != null'>, #{gbRegisterWay}</if>" +
"<if test='gbSecurityLevelCode != null'>, #{gbSecurityLevelCode}</if>" +
"<if test='gbSecrecy != null'>, #{gbSecrecy}</if>" +
"<if test='gbIpAddress != null'>, #{gbIpAddress}</if>" +
"<if test='gbPort != null'>, #{gbPort}</if>" +
"<if test='gbPassword != null'>, #{gbPassword}</if>" +
"<if test='gbStatus != null'>, #{gbStatus}</if>" +
"<if test='gbLongitude != null'>, #{gbLongitude}</if>" +
"<if test='gbLatitude != null'>, #{gbLatitude}</if>" +
"<if test='gbBusinessGroupId != null'>, #{gbBusinessGroupId}</if>" +
"<if test='gbPtzType != null'>, #{gbPtzType}</if>" +
"<if test='gbPhotoelectricImagingTyp != null'>, #{gbPhotoelectricImagingTyp}</if>" +
"<if test='gbCapturePositionType != null'>, #{gbCapturePositionType}</if>" +
"<if test='gbRoomType != null'>, #{gbRoomType}</if>" +
"<if test='gbSupplyLightType != null'>, #{gbSupplyLightType}</if>" +
"<if test='gbDirectionType != null'>, #{gbDirectionType}</if>" +
"<if test='gbResolution != null'>, #{gbResolution}</if>" +
"<if test='gbStreamNumberList != null'>, #{gbStreamNumberList}</if>" +
"<if test='gbDownloadSpeed != null'>, #{gbDownloadSpeed}</if>" +
"<if test='gbSvcSpaceSupportMod != null'>, #{gbSvcSpaceSupportMod}</if>" +
"<if test='gbSvcTimeSupportMode != null'>, #{gbSvcTimeSupportMode}</if>" +
"<if test='gbSsvcRatioSupportList != null'>, #{gbSsvcRatioSupportList}</if>" +
"<if test='gbMobileDeviceType != null'>, #{gbMobileDeviceType}</if>" +
"<if test='gbHorizontalFieldAngle != null'>, #{gbHorizontalFieldAngle}</if>" +
"<if test='gbVerticalFieldAngle != null'>, #{gbVerticalFieldAngle}</if>" +
"<if test='gbMaxViewDistance != null'>, #{gbMaxViewDistance}</if>" +
"<if test='gbGrassrootsCode != null'>, #{gbGrassrootsCode}</if>" +
"<if test='gbPoType != null'>, #{gbPoType}</if>" +
"<if test='gbPoCommonName != null'>, #{gbPoCommonName}</if>" +
"<if test='gbMac != null'>, #{gbMac}</if>" +
"<if test='gbFunctionType != null'>, #{gbFunctionType}</if>" +
"<if test='gbEncodeType != null'>, #{gbEncodeType}</if>" +
"<if test='gbInstallTime != null'>, #{gbInstallTime}</if>" +
"<if test='gbManagementUnit != null'>, #{gbManagementUnit}</if>" +
"<if test='gbContactInfo != null'>, #{gbContactInfo}</if>" +
"<if test='gbRecordSaveDays != null'>, #{gbRecordSaveDays}</if>" +
"<if test='gbIndustrialClassification != null'>, #{gbIndustrialClassification}</if>" +
" )</script>"})
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
void add(JTChannel channel);
@Delete("delete from wvp_jt_channel where id = #{id}")
void delete(@Param("id") int id);
@Select(value = {" <script>" +
"SELECT * " +
"from " +
"wvp_jt_channel jc " +
"WHERE " +
"terminal_db_id = #{terminalDbId} and channel_id = #{channelId}" +
" </script>"})
JTChannel getChannel(@Param("terminalDbId") int terminalDbId, @Param("channelId") Integer channelId);
@SelectProvider(type = JTChannelProvider.class, method = "selectChannelByChannelId")
JTChannel selectChannelByChannelId(@Param("terminalDbId") int terminalDbId, @Param("channelId") Integer channelId);
@Select(value = {" <script>" +
" SELECT * " +
@@ -206,5 +52,6 @@ public interface JTChannelMapper {
" WHERE " +
" id = #{id}" +
" </script>"})
JTChannel getChannelByDbId(@Param("id") Integer id);
@SelectProvider(type = JTChannelProvider.class, method = "selectChannelById")
JTChannel selectChannelById(@Param("id") Integer id);
}

View File

@@ -0,0 +1,37 @@
package com.genersoft.iot.vmp.jt1078.dao.provider;
import java.util.Map;
public class JTChannelProvider {
public final static String BASE_SQL =
"SELECT jc.*, jc.id as jt_channel_id, wdc.*, wdc.id as gb_id " +
" from wvp_jt_channel jc " +
" LEFT join wvp_device_channel wdc " +
" on jc.id = wdc.jt_channel_id ";
public String selectChannelByChannelId(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL);
sqlBuild.append(" WHERE jc.terminal_db_id = #{terminalDbId} and jc.channel_id = #{channelId} ");
return sqlBuild.toString();
}
public String selectChannelById(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL);
sqlBuild.append(" WHERE jc.id = #{id}");
return sqlBuild.toString();
}
public String selectAll(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL);
sqlBuild.append(" WHERE 1=1 ");
if (params.get("query") != null) {
sqlBuild.append(" AND ")
.append(" jc.name LIKE ").append("'%").append(params.get("query")).append("%'")
;
}
sqlBuild.append(" ORDER BY jc.channel_id ");
return sqlBuild.toString();
}
}

View File

@@ -0,0 +1,35 @@
package com.genersoft.iot.vmp.jt1078.service;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.github.pagehelper.PageInfo;
import java.util.List;
public interface Ijt1078PlayService {
JTMediaStreamType checkStreamFromJt(String stream);
void play(String phoneNumber, Integer channelId, int type, GeneralCallback<StreamInfo> callback);
void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type,
Integer rate, Integer playbackType, Integer playbackSpeed, GeneralCallback<StreamInfo> callback);
void stopPlay(String phoneNumber, Integer channelId);
void pausePlay(String phoneNumber, Integer channelId);
void continueLivePlay(String phoneNumber, Integer channelId);
List<J1205.JRecordItem> getRecordList(String phoneNumber, Integer channelId, String startTime, String endTime);
void stopPlayback(String phoneNumber, Integer channelId);
void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, GeneralCallback<StreamInfo> callback);
void stopTalk(String phoneNumber, Integer channelId);
void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time);
}

View File

@@ -25,20 +25,6 @@ public interface Ijt1078Service {
void updateDeviceStatus(boolean connected, String phoneNumber);
void play(String phoneNumber, Integer channelId, int type, GeneralCallback<StreamInfo> callback);
void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type,
Integer rate, Integer playbackType, Integer playbackSpeed, GeneralCallback<StreamInfo> callback);
void stopPlay(String phoneNumber, Integer channelId);
void pausePlay(String phoneNumber, Integer channelId);
void continueLivePlay(String phoneNumber, Integer channelId);
List<J1205.JRecordItem> getRecordList(String phoneNumber, Integer channelId, String startTime, String endTime);
void stopPlayback(String phoneNumber, Integer channelId);
void ptzControl(String phoneNumber, Integer channelId, String command, int speed);
@@ -112,14 +98,8 @@ public interface Ijt1078Service {
JTMediaAttribute queryMediaAttribute(String phoneNumber);
void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, GeneralCallback<StreamInfo> callback);
void stopTalk(String phoneNumber, Integer channelId);
void changeStreamType(String phoneNumber, Integer channelId, Integer streamType);
void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time);
void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, GeneralCallback<String> fileCallback);
PageInfo<JTChannel> getChannelList(int page, int count, int deviceId, String query);

View File

@@ -0,0 +1,651 @@
package com.genersoft.iot.vmp.jt1078.service.impl;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.dao.JTChannelMapper;
import com.genersoft.iot.vmp.jt1078.dao.JTTerminalMapper;
import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.MediaServerUtils;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class jt1078PlayServiceImpl implements Ijt1078PlayService {
@Autowired
private ISendRtpServerService sendRtpServerService;
@Autowired
private Ijt1078Service jt1078Service;
@Autowired
private JT1078Template jt1078Template;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private HookSubscribe subscribe;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private DynamicTask dynamicTask;
@Autowired
private UserSetting userSetting;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private FtpSetting ftpSetting;
/**
* 流到来的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
}
/**
* 流离开的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
}
/**
* 流未找到的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if (!userSetting.isAutoApplyPlay()) {
return;
}
JTMediaStreamType jtMediaStreamType = checkStreamFromJt(event.getStream());
if (jtMediaStreamType == null){
return;
}
String[] streamParamArray = event.getStream().split("_");
String phoneNumber = streamParamArray[1];
int channelId = Integer.parseInt(streamParamArray[2]);
String params = event.getParams();
Map<String, String> paramMap = MediaServerUtils.urlParamToMap(params);
int type = 0;
try {
type = Integer.parseInt(paramMap.get("type"));
}catch (NumberFormatException ignored) {}
if (jtMediaStreamType.equals(JTMediaStreamType.PLAY)) {
play(phoneNumber, channelId, 0, null);
}else if (jtMediaStreamType.equals(JTMediaStreamType.PLAYBACK)) {
String startTimeParam = DateUtil.jt1078Toyyyy_MM_dd_HH_mm_ss(streamParamArray[3]);
String endTimeParam = DateUtil.jt1078Toyyyy_MM_dd_HH_mm_ss(streamParamArray[4]);
int rate = 0;
int playbackType = 0;
int playbackSpeed = 0;
try {
rate = Integer.parseInt(paramMap.get("rate"));
playbackType = Integer.parseInt(paramMap.get("playbackType"));
playbackSpeed = Integer.parseInt(paramMap.get("playbackSpeed"));
}catch (NumberFormatException ignored) {}
playback(phoneNumber, channelId, startTimeParam, endTimeParam, type, rate, playbackType, playbackSpeed, null);
}
}
/**
* 校验流是否是属于部标的
*/
@Override
public JTMediaStreamType checkStreamFromJt(String stream) {
if (!stream.startsWith("jt_")) {
return null;
}
String[] streamParamArray = stream.split("_");
if (streamParamArray.length == 3) {
return JTMediaStreamType.PLAY;
}else if (streamParamArray.length == 5) {
return JTMediaStreamType.PLAYBACK;
}else if (streamParamArray.length == 4) {
return JTMediaStreamType.TALK;
}else {
return null;
}
}
private final Map<String, List<GeneralCallback<StreamInfo>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
@Override
public void play(String phoneNumber, Integer channelId, int type, GeneralCallback<StreamInfo> callback) {
JTDevice device = jt1078Service.getDevice(phoneNumber);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在");
}
jt1078Template.checkTerminalStatus(phoneNumber);
JTChannel channel = jt1078Service.getChannel(device.getId(), channelId);
if (channel == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道不存在");
}
// 检查流是否已经存在,存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) {
MediaServer mediaServer = streamInfo.getMediaServer();
if (mediaServer != null) {
// 查询流是否存在,不存在则删除缓存数据
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "rtp", streamInfo.getStream());
if (mediaInfo != null) {
log.info("[1078-点播] 点播已经存在,直接返回, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
return;
}
}
// 清理数据
redisTemplate.delete(playKey);
}
String stream = "jt_" + phoneNumber + "_" + channelId;
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
}
return;
}
// 设置hook监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
dynamicTask.stop(playKey);
log.info("[1078-点播] 点播成功, 手机号: {} 通道: {}", phoneNumber, channelId);
// TODO 发送9105 实时音视频传输状态通知, 通知丢包率
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
}
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
// 截图
String streamUrl;
if (mediaServer.getRtspPort() != 0) {
streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServer.getRtspPort(), "rtp", stream);
} else {
streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServer.getHttpPort(), "rtp", stream);
}
String path = "snap";
String fileName = phoneNumber + "_" + channelId + ".jpg";
// 请求截图
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServer, streamUrl, 15, 1, path, fileName);
});
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, "000", false, false, 0, false, !channel.isHasAudio(), false, 1);
if (ssrcInfo == null) {
stopPlay(phoneNumber, channelId);
return;
}
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
log.info("[1078-点播] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
}
mediaServerService.closeRTPServer(mediaServer, stream);
subscribe.removeSubscribe(hook);
}, userSetting.getPlayTimeout());
log.info("[1078-点播] phoneNumber {} channelId {} 端口: {}", phoneNumber, channelId, ssrcInfo.getPort());
J9101 j9101 = new J9101();
j9101.setChannel(Integer.valueOf(channelId));
j9101.setIp(mediaServer.getSdpIp());
j9101.setRate(1);
j9101.setTcpPort(ssrcInfo.getPort());
j9101.setUdpPort(ssrcInfo.getPort());
j9101.setType(type);
jt1078Template.startLive(phoneNumber, j9101, 6);
}
public StreamInfo onPublishHandler(MediaServer mediaServerItem, HookData hookData, String phoneNumber, Integer channelId) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookData.getStream(), hookData.getMediaInfo(), null);
streamInfo.setDeviceId(phoneNumber);
streamInfo.setChannelId(channelId);
return streamInfo;
}
@Override
public void stopPlay(String phoneNumber, Integer channelId) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
// 清理回调
List<GeneralCallback<StreamInfo>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
if (generalCallbacks != null && !generalCallbacks.isEmpty()) {
for (GeneralCallback<StreamInfo> callback : generalCallbacks) {
callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null);
}
}
jt1078Template.checkTerminalStatus(phoneNumber);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
// 发送停止命令
J9102 j9102 = new J9102();
j9102.setChannel(Integer.valueOf(channelId));
j9102.setCommand(0);
j9102.setCloseType(0);
j9102.setStreamType(1);
jt1078Template.stopLive(phoneNumber, j9102, 6);
log.info("[1078-停止点播] phoneNumber {} channelId {}", phoneNumber, channelId);
// 删除缓存数据
if (streamInfo != null) {
// 关闭rtpServer
mediaServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getStream());
redisTemplate.delete(playKey);
}
}
@Override
public void pausePlay(String phoneNumber, Integer channelId) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo == null) {
log.info("[1078-暂停点播] 未找到点播信息 phoneNumber {} channelId {}", phoneNumber, channelId);
}
log.info("[1078-暂停点播] phoneNumber {} channelId {}", phoneNumber, channelId);
// 发送暂停命令
J9102 j9102 = new J9102();
j9102.setChannel(Integer.valueOf(channelId));
j9102.setCommand(2);
j9102.setCloseType(0);
j9102.setStreamType(1);
jt1078Template.stopLive(phoneNumber, j9102, 6);
}
@Override
public void continueLivePlay(String phoneNumber, Integer channelId) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo == null) {
log.info("[1078-继续点播] 未找到点播信息 phoneNumber {} channelId {}", phoneNumber, channelId);
}
log.info("[1078-继续点播] phoneNumber {} channelId {}", phoneNumber, channelId);
// 发送暂停命令
J9102 j9102 = new J9102();
j9102.setChannel(Integer.valueOf(channelId));
j9102.setCommand(2);
j9102.setCloseType(0);
j9102.setStreamType(1);
jt1078Template.stopLive(phoneNumber, j9102, 6);
}
@Override
public List<J1205.JRecordItem> getRecordList(String phoneNumber, Integer channelId, String startTime, String endTime) {
log.info("[1078-查询录像列表] phoneNumber {} channelId {} startTime {} endTime {}"
, phoneNumber, channelId, startTime, endTime);
// 发送请求录像列表命令
J9205 j9205 = new J9205();
j9205.setChannelId(channelId);
j9205.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime));
j9205.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime));
j9205.setMediaType(0);
j9205.setStreamType(0);
j9205.setStorageType(0);
List<J1205.JRecordItem> JRecordItemList = (List<J1205.JRecordItem>) jt1078Template.queryBackTime(phoneNumber, j9205, 20);
if (JRecordItemList == null || JRecordItemList.isEmpty()) {
return null;
}
log.info("[1078-查询录像列表] phoneNumber {} channelId {} startTime {} endTime {}, 结果: {}条"
, phoneNumber, channelId, startTime, endTime, JRecordItemList.size());
return JRecordItemList;
}
@Override
public void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type,
Integer rate, Integer playbackType, Integer playbackSpeed, GeneralCallback<StreamInfo> callback) {
log.info("[1078-回放] 回放,设备:{} 通道: {} 开始时间: {} 结束时间: {} 音视频类型: {} 码流类型: {} " +
"回放方式: {} 快进或快退倍数: {}", phoneNumber, channelId, startTime, endTime, type, rate, playbackType, playbackSpeed);
// 检查流是否已经存在,存在则返回
String playbackKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playbackKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
String logInfo = String.format("phoneNumber:%s, channelId:%s, startTime:%s, endTime:%s", phoneNumber, channelId, startTime, endTime);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey);
if (streamInfo != null) {
MediaServer mediaServer = streamInfo.getMediaServer();
if (mediaServer != null) {
// 查询流是否存在,不存在则删除缓存数据
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "rtp", streamInfo.getStream());
if (mediaInfo != null) {
log.info("[1078-回放] 回放已经存在,直接返回, logInfo {}", logInfo);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
return;
}
}
// 清理数据
redisTemplate.delete(playbackKey);
}
String startTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime);
String endTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime);
String stream = "jt_" + phoneNumber + "_" + channelId + "_" + startTimeParam + "_" + endTimeParam;
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
}
return;
}
// 设置hook监听
Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId());
subscribe.addSubscribe(hookSubscribe, (hookData) -> {
dynamicTask.stop(playbackKey);
log.info("[1078-回放] 回放成功, logInfo {}", logInfo);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
}
subscribe.removeSubscribe(hookSubscribe);
redisTemplate.opsForValue().set(playbackKey, info);
});
// 设置超时监听
dynamicTask.startDelay(playbackKey, () -> {
log.info("[1078-回放] 回放超时, logInfo {}", logInfo);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
}
}, userSetting.getPlayTimeout());
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false, 1);
log.info("[1078-回放] logInfo {} 端口: {}", logInfo, ssrcInfo.getPort());
J9201 j9201 = new J9201();
j9201.setChannel(channelId);
j9201.setIp(mediaServer.getSdpIp());
if (rate != null) {
j9201.setRate(rate);
}
if (playbackType != null) {
j9201.setPlaybackType(playbackType);
}
if (playbackSpeed != null) {
j9201.setPlaybackSpeed(playbackSpeed);
}
j9201.setTcpPort(ssrcInfo.getPort());
j9201.setUdpPort(ssrcInfo.getPort());
j9201.setType(type);
j9201.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime));
j9201.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime));
jt1078Template.startBackLive(phoneNumber, j9201, 20);
}
@Override
public void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time) {
log.info("[1078-回放控制] phoneNumber {} channelId {} command {} playbackSpeed {} time {}",
phoneNumber, channelId, command, playbackSpeed, time);
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
if (command == 2) {
// 结束回放
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
// 删除缓存数据
if (streamInfo != null) {
// 关闭rtpServer
mediaServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getStream());
}
// 清理回调
List<GeneralCallback<StreamInfo>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
if (generalCallbacks != null && !generalCallbacks.isEmpty()) {
for (GeneralCallback<StreamInfo> callback : generalCallbacks) {
callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null);
}
}
}
// 发送停止命令
J9202 j9202 = new J9202();
j9202.setChannel(channelId);
j9202.setPlaybackType(command);
if (playbackSpeed != null) {
j9202.setPlaybackSpeed(playbackSpeed);
}
if (!ObjectUtils.isEmpty(time)) {
j9202.setPlaybackTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(time));
}
jt1078Template.controlBackLive(phoneNumber, j9202, 6);
}
@Override
public void stopPlayback(String phoneNumber, Integer channelId) {
playbackControl(phoneNumber, channelId, 2, null, null);
}
private Map<String, GeneralCallback<String>> fileUploadMap = new ConcurrentHashMap<>();
@EventListener
public void onApplicationEvent(FtpUploadEvent event) {
if (fileUploadMap.isEmpty()) {
return;
}
fileUploadMap.keySet().forEach(key -> {
if (!event.getFileName().contains(key)) {
return;
}
GeneralCallback<String> callback = fileUploadMap.get(key);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), event.getFileName());
fileUploadMap.remove(key);
}
});
}
/**
* 监听发流停止
*/
@EventListener
public void onApplicationEvent(MediaSendRtpStoppedEvent event) {
List<SendRtpInfo> sendRtpInfos = sendRtpServerService.queryByStream(event.getStream());
if (sendRtpInfos.isEmpty()) {
return;
}
for (SendRtpInfo sendRtpInfo : sendRtpInfos) {
if (!sendRtpInfo.isOnlyAudio() || ObjectUtils.isEmpty(sendRtpInfo.getChannelId())) {
continue;
}
if (!sendRtpInfo.getSsrc().contains("_")) {
continue;
}
sendRtpServerService.delete(sendRtpInfo);
String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + sendRtpInfo.getApp() + ":" + sendRtpInfo.getStream();
redisTemplate.delete(playKey);
}
}
@Override
public void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend,
GeneralCallback<StreamInfo> callback) {
// 检查流是否已经存在,存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "对讲进行中");
}
String receiveStream = "jt_" + phoneNumber + "_" + channelId + "_talk";
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
}
return;
}
// 检查待发送的流是否存在,
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, stream);
if (mediaInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), app + "/" + stream + "流不存在");
}
// 开启收流端口, zlm发送1078的rtp流需要将ssrc字段设置为 imei_channel格式
String ssrc = phoneNumber + "_" + channelId;
sendRtpServerService.createSendRtpInfo(mediaServer, )
SendRtpInfo.getInstance(app, stream, ssrc, )
SendRtpInfo sendRtpInfo = new SendRtpInfo();
sendRtpInfo.setMediaServerId(mediaServerId);
sendRtpInfo.setPort(0);
sendRtpInfo.setSsrc(ssrc);
sendRtpInfo.setChannelId(channelId );
sendRtpInfo.setRtcp(false);
sendRtpInfo.setApp(app);
sendRtpInfo.setStream(stream);
sendRtpInfo.setTcp(true);
sendRtpInfo.setTcpActive(true);
sendRtpInfo.setUsePs(false);
sendRtpInfo.setOnlyAudio(true);
if (onlySend == null || !onlySend) {
sendRtpInfo.setReceiveStream(receiveStream);
}
sendRtpInfo.setPlatformId(phoneNumber);
if (onlySend == null || !onlySend) {
// 设置hook监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", receiveStream, mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
dynamicTask.stop(playKey);
log.info("[1078-对讲] 对讲成功, phoneNumber {} channelId {}", phoneNumber, channelId);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
}
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
// 存储发流信息
redisCatchStorage.updateSendRTPSever(sendRtpInfo);
});
Hook hookForDeparture = Hook.getInstance(HookType.on_media_departure, "rtp", receiveStream, mediaServer.getId());
subscribe.addSubscribe(hookForDeparture, (hookData) -> {
log.info("[1078-对讲] 对讲时源流注销, app: {}. stream: {}, phoneNumber {} channelId {}", app, stream, phoneNumber, channelId);
stopTalk(phoneNumber, channelId);
});
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
log.info("[1078-对讲] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
}
}, userSetting.getPlayTimeout());
}
Integer localPort = mediaServerService.startSendRtpPassive(mediaServer, sendRtpInfo, 15000);
log.info("[1078-对讲] phoneNumber {} channelId {} 收发端口: {} app: {}, stream: {}",
phoneNumber, channelId, localPort, app, stream);
J9101 j9101 = new J9101();
j9101.setChannel(Integer.valueOf(channelId));
j9101.setIp(mediaServer.getSdpIp());
j9101.setRate(1);
j9101.setTcpPort(localPort);
j9101.setUdpPort(localPort);
j9101.setType(2);
jt1078Template.startLive(phoneNumber, j9101, 6);
if (onlySend != null && onlySend) {
log.info("[1078-对讲] 对讲成功, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), null);
}
// 存储发流信息
redisCatchStorage.updateSendRTPSever(sendRtpInfo);
}
}
@Override
public void stopTalk(String phoneNumber, Integer channelId) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
// 发送停止命令
J9102 j9102 = new J9102();
j9102.setChannel(Integer.valueOf(channelId));
j9102.setCommand(4);
j9102.setCloseType(0);
j9102.setStreamType(1);
jt1078Template.stopLive(phoneNumber, j9102, 6);
log.info("[1078-停止对讲] phoneNumber {} channelId {}", phoneNumber, channelId);
// 删除缓存数据
if (streamInfo != null) {
redisTemplate.delete(playKey);
// 关闭rtpServer
mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
}
// 清理回调
List<GeneralCallback<StreamInfo>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
if (generalCallbacks != null && !generalCallbacks.isEmpty()) {
for (GeneralCallback<StreamInfo> callback : generalCallbacks) {
callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null);
}
}
}
}

View File

@@ -7,35 +7,27 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.bean.common.ConfigAttribute;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.dao.JTChannelMapper;
import com.genersoft.iot.vmp.jt1078.dao.JTTerminalMapper;
import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.MediaServerUtils;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,17 +37,16 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.transaction.annotation.Transactional;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class jt1078ServiceImpl implements Ijt1078Service {
private final static Logger logger = LoggerFactory.getLogger(jt1078ServiceImpl.class);
@Autowired
private JTTerminalMapper jtDeviceMapper;
@@ -69,10 +60,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private HookSubscribe subscribe;
@Autowired
private IMediaServerService mediaServerService;
private IGbChannelService channelService;
@Autowired
private DynamicTask dynamicTask;
@@ -80,9 +68,6 @@ public class jt1078ServiceImpl implements Ijt1078Service {
@Autowired
private UserSetting userSetting;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private FtpSetting ftpSetting;
@@ -104,45 +89,6 @@ public class jt1078ServiceImpl implements Ijt1078Service {
}
/**
* 流未找到的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if (!userSetting.isAutoApplyPlay()) {
return;
}
JTMediaStreamType jtMediaStreamType = checkStreamFromJt(event.getStream());
if (jtMediaStreamType == null){
return;
}
String[] streamParamArray = event.getStream().split("_");
String phoneNumber = streamParamArray[1];
int channelId = Integer.parseInt(streamParamArray[2]);
String params = event.getParams();
Map<String, String> paramMap = MediaServerUtils.urlParamToMap(params);
int type = 0;
try {
type = Integer.parseInt(paramMap.get("type"));
}catch (NumberFormatException ignored) {}
if (jtMediaStreamType.equals(JTMediaStreamType.PLAY)) {
play(phoneNumber, channelId, 0, null);
}else if (jtMediaStreamType.equals(JTMediaStreamType.PLAYBACK)) {
String startTimeParam = DateUtil.jt1078Toyyyy_MM_dd_HH_mm_ss(streamParamArray[3]);
String endTimeParam = DateUtil.jt1078Toyyyy_MM_dd_HH_mm_ss(streamParamArray[4]);
int rate = 0;
int playbackType = 0;
int playbackSpeed = 0;
try {
rate = Integer.parseInt(paramMap.get("rate"));
playbackType = Integer.parseInt(paramMap.get("playbackType"));
playbackSpeed = Integer.parseInt(paramMap.get("playbackSpeed"));
}catch (NumberFormatException ignored) {}
playback(phoneNumber, channelId, startTimeParam, endTimeParam, type, rate, playbackType, playbackSpeed, null);
}
}
/**
* 校验流是否是属于部标的
@@ -171,7 +117,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
@Override
public JTChannel getChannel(Integer terminalDbId, Integer channelId) {
return jtChannelMapper.getChannel(terminalDbId, channelId);
return jtChannelMapper.selectChannelByChannelId(terminalDbId, channelId);
}
@Override
@@ -208,326 +154,6 @@ public class jt1078ServiceImpl implements Ijt1078Service {
jtDeviceMapper.updateDeviceStatus(connected, phoneNumber);
}
private final Map<String, List<GeneralCallback<StreamInfo>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
@Override
public void play(String phoneNumber, Integer channelId, int type, GeneralCallback<StreamInfo> callback) {
JTDevice device = getDevice(phoneNumber);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在");
}
jt1078Template.checkTerminalStatus(phoneNumber);
JTChannel channel = getChannel(device.getId(), channelId);
if (channel == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道不存在");
}
// 检查流是否已经存在,存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) {
String mediaServerId = streamInfo.getMediaServerId();
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer != null) {
// 查询流是否存在,不存在则删除缓存数据
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "rtp", streamInfo.getStream());
if (mediaInfo != null) {
logger.info("[1078-点播] 点播已经存在,直接返回, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
return;
}
}
// 清理数据
redisTemplate.delete(playKey);
}
String stream = "jt_" + phoneNumber + "_" + channelId;
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
}
return;
}
// 设置hook监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
dynamicTask.stop(playKey);
logger.info("[1078-点播] 点播成功, 手机号: {} 通道: {}", phoneNumber, channelId);
// TODO 发送9105 实时音视频传输状态通知, 通知丢包率
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
}
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
// 截图
String streamUrl;
if (mediaServer.getRtspPort() != 0) {
streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServer.getRtspPort(), "rtp", stream);
} else {
streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServer.getHttpPort(), "rtp", stream);
}
String path = "snap";
String fileName = phoneNumber + "_" + channelId + ".jpg";
// 请求截图
logger.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServer, streamUrl, 15, 1, path, fileName);
});
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, "000", false, false, 0, false, !channel.getHasAudio(), false, 1);
if (ssrcInfo == null) {
stopPlay(phoneNumber, channelId);
return;
}
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
logger.info("[1078-点播] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
}
mediaServerService.closeRTPServer(mediaServer, stream);
subscribe.removeSubscribe(hook);
}, userSetting.getPlayTimeout());
logger.info("[1078-点播] phoneNumber {} channelId {} 端口: {}", phoneNumber, channelId, ssrcInfo.getPort());
J9101 j9101 = new J9101();
j9101.setChannel(Integer.valueOf(channelId));
j9101.setIp(mediaServer.getSdpIp());
j9101.setRate(1);
j9101.setTcpPort(ssrcInfo.getPort());
j9101.setUdpPort(ssrcInfo.getPort());
j9101.setType(type);
jt1078Template.startLive(phoneNumber, j9101, 6);
}
public StreamInfo onPublishHandler(MediaServer mediaServerItem, HookData hookData, String phoneNumber, Integer channelId) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookData.getStream(), hookData.getMediaInfo(), null);
streamInfo.setDeviceID(phoneNumber);
streamInfo.setChannelId(channelId + "");
return streamInfo;
}
@Override
public void stopPlay(String phoneNumber, Integer channelId) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
// 清理回调
List<GeneralCallback<StreamInfo>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
if (generalCallbacks != null && !generalCallbacks.isEmpty()) {
for (GeneralCallback<StreamInfo> callback : generalCallbacks) {
callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null);
}
}
jt1078Template.checkTerminalStatus(phoneNumber);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
// 发送停止命令
J9102 j9102 = new J9102();
j9102.setChannel(Integer.valueOf(channelId));
j9102.setCommand(0);
j9102.setCloseType(0);
j9102.setStreamType(1);
jt1078Template.stopLive(phoneNumber, j9102, 6);
logger.info("[1078-停止点播] phoneNumber {} channelId {}", phoneNumber, channelId);
// 删除缓存数据
if (streamInfo != null) {
// 关闭rtpServer
mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
redisTemplate.delete(playKey);
}
}
@Override
public void pausePlay(String phoneNumber, Integer channelId) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo == null) {
logger.info("[1078-暂停点播] 未找到点播信息 phoneNumber {} channelId {}", phoneNumber, channelId);
}
logger.info("[1078-暂停点播] phoneNumber {} channelId {}", phoneNumber, channelId);
// 发送暂停命令
J9102 j9102 = new J9102();
j9102.setChannel(Integer.valueOf(channelId));
j9102.setCommand(2);
j9102.setCloseType(0);
j9102.setStreamType(1);
jt1078Template.stopLive(phoneNumber, j9102, 6);
}
@Override
public void continueLivePlay(String phoneNumber, Integer channelId) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo == null) {
logger.info("[1078-继续点播] 未找到点播信息 phoneNumber {} channelId {}", phoneNumber, channelId);
}
logger.info("[1078-继续点播] phoneNumber {} channelId {}", phoneNumber, channelId);
// 发送暂停命令
J9102 j9102 = new J9102();
j9102.setChannel(Integer.valueOf(channelId));
j9102.setCommand(2);
j9102.setCloseType(0);
j9102.setStreamType(1);
jt1078Template.stopLive(phoneNumber, j9102, 6);
}
@Override
public List<J1205.JRecordItem> getRecordList(String phoneNumber, Integer channelId, String startTime, String endTime) {
logger.info("[1078-查询录像列表] phoneNumber {} channelId {} startTime {} endTime {}"
, phoneNumber, channelId, startTime, endTime);
// 发送请求录像列表命令
J9205 j9205 = new J9205();
j9205.setChannelId(channelId);
j9205.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime));
j9205.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime));
j9205.setMediaType(0);
j9205.setStreamType(0);
j9205.setStorageType(0);
List<J1205.JRecordItem> JRecordItemList = (List<J1205.JRecordItem>) jt1078Template.queryBackTime(phoneNumber, j9205, 20);
if (JRecordItemList == null || JRecordItemList.isEmpty()) {
return null;
}
logger.info("[1078-查询录像列表] phoneNumber {} channelId {} startTime {} endTime {}, 结果: {}条"
, phoneNumber, channelId, startTime, endTime, JRecordItemList.size());
return JRecordItemList;
}
@Override
public void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type,
Integer rate, Integer playbackType, Integer playbackSpeed, GeneralCallback<StreamInfo> callback) {
logger.info("[1078-回放] 回放,设备:{} 通道: {} 开始时间: {} 结束时间: {} 音视频类型: {} 码流类型: {} " +
"回放方式: {} 快进或快退倍数: {}", phoneNumber, channelId, startTime, endTime, type, rate, playbackType, playbackSpeed);
// 检查流是否已经存在,存在则返回
String playbackKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playbackKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
String logInfo = String.format("phoneNumber:%s, channelId:%s, startTime:%s, endTime:%s", phoneNumber, channelId, startTime, endTime);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey);
if (streamInfo != null) {
String mediaServerId = streamInfo.getMediaServerId();
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer != null) {
// 查询流是否存在,不存在则删除缓存数据
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "rtp", streamInfo.getStream());
if (mediaInfo != null) {
logger.info("[1078-回放] 回放已经存在,直接返回, logInfo {}", logInfo);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
return;
}
}
// 清理数据
redisTemplate.delete(playbackKey);
}
String startTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime);
String endTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime);
String stream = "jt_" + phoneNumber + "_" + channelId + "_" + startTimeParam + "_" + endTimeParam;
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
}
return;
}
// 设置hook监听
Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId());
subscribe.addSubscribe(hookSubscribe, (hookData) -> {
dynamicTask.stop(playbackKey);
logger.info("[1078-回放] 回放成功, logInfo {}", logInfo);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
}
subscribe.removeSubscribe(hookSubscribe);
redisTemplate.opsForValue().set(playbackKey, info);
});
// 设置超时监听
dynamicTask.startDelay(playbackKey, () -> {
logger.info("[1078-回放] 回放超时, logInfo {}", logInfo);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
}
}, userSetting.getPlayTimeout());
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false, 1);
logger.info("[1078-回放] logInfo {} 端口: {}", logInfo, ssrcInfo.getPort());
J9201 j9201 = new J9201();
j9201.setChannel(channelId);
j9201.setIp(mediaServer.getSdpIp());
if (rate != null) {
j9201.setRate(rate);
}
if (playbackType != null) {
j9201.setPlaybackType(playbackType);
}
if (playbackSpeed != null) {
j9201.setPlaybackSpeed(playbackSpeed);
}
j9201.setTcpPort(ssrcInfo.getPort());
j9201.setUdpPort(ssrcInfo.getPort());
j9201.setType(type);
j9201.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime));
j9201.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime));
jt1078Template.startBackLive(phoneNumber, j9201, 20);
}
@Override
public void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time) {
logger.info("[1078-回放控制] phoneNumber {} channelId {} command {} playbackSpeed {} time {}",
phoneNumber, channelId, command, playbackSpeed, time);
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
if (command == 2) {
// 结束回放
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
// 删除缓存数据
if (streamInfo != null) {
// 关闭rtpServer
mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
}
// 清理回调
List<GeneralCallback<StreamInfo>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
if (generalCallbacks != null && !generalCallbacks.isEmpty()) {
for (GeneralCallback<StreamInfo> callback : generalCallbacks) {
callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null);
}
}
}
// 发送停止命令
J9202 j9202 = new J9202();
j9202.setChannel(channelId);
j9202.setPlaybackType(command);
if (playbackSpeed != null) {
j9202.setPlaybackSpeed(playbackSpeed);
}
if (!ObjectUtils.isEmpty(time)) {
j9202.setPlaybackTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(time));
}
jt1078Template.controlBackLive(phoneNumber, j9202, 6);
}
@Override
public void stopPlayback(String phoneNumber, Integer channelId) {
playbackControl(phoneNumber, channelId, 2, null, null);
}
private Map<String, GeneralCallback<String>> fileUploadMap = new ConcurrentHashMap<>();
@EventListener
@@ -554,7 +180,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
dynamicTask.startDelay(filePath, ()->{
fileUploadMap.remove(filePath);
}, 2*60*60*1000);
logger.info("[1078-录像] 下载,设备:{} 通道: {} 开始时间: {} 结束时间: {},等待上传文件路径: {} ",
log.info("[1078-录像] 下载,设备:{} 通道: {} 开始时间: {} 结束时间: {},等待上传文件路径: {} ",
phoneNumber, channelId, startTime, endTime, filePath);
// 发送停止命令
J9206 j92026 = new J9206();
@@ -949,162 +575,15 @@ public class jt1078ServiceImpl implements Ijt1078Service {
return (JTMediaAttribute) jt1078Template.queryMediaAttribute(phoneNumber, j9003, 300);
}
/**
* 监听发流停止
*/
@EventListener
public void onApplicationEvent(MediaSendRtpStoppedEvent event) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (!sendRtpItem.isOnlyAudio()
|| ObjectUtils.isEmpty(sendRtpItem.getDeviceId())
|| ObjectUtils.isEmpty(sendRtpItem.getChannelId())) {
continue;
}
if (!sendRtpItem.getSsrc().contains("_")) {
continue;
}
redisCatchStorage.deleteSendRTPServer(sendRtpItem);
String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + sendRtpItem.getDeviceId() + ":" + sendRtpItem.getChannelId();
redisTemplate.delete(playKey);
}
}
@Override
public void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend,
GeneralCallback<StreamInfo> callback) {
// 检查流是否已经存在,存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "对讲进行中");
}
String receiveStream = "jt_" + phoneNumber + "_" + channelId + "_talk";
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
}
return;
}
// 检查待发送的流是否存在,
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, stream);
if (mediaInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), app + "/" + stream + "流不存在");
}
// 开启收流端口, zlm发送1078的rtp流需要将ssrc字段设置为 imei_channel格式
String ssrc = phoneNumber + "_" + channelId;
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setMediaServerId(mediaServerId);
sendRtpItem.setPort(0);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setDeviceId(phoneNumber);
sendRtpItem.setChannelId(channelId + "");
sendRtpItem.setRtcp(false);
sendRtpItem.setApp(app);
sendRtpItem.setStream(stream);
sendRtpItem.setTcp(true);
sendRtpItem.setTcpActive(true);
sendRtpItem.setUsePs(false);
sendRtpItem.setOnlyAudio(true);
if (onlySend == null || !onlySend) {
sendRtpItem.setReceiveStream(receiveStream);
}
sendRtpItem.setPlatformId(phoneNumber);
if (onlySend == null || !onlySend) {
// 设置hook监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", receiveStream, mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
dynamicTask.stop(playKey);
logger.info("[1078-对讲] 对讲成功, phoneNumber {} channelId {}", phoneNumber, channelId);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
}
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
// 存储发流信息
redisCatchStorage.updateSendRTPSever(sendRtpItem);
});
Hook hookForDeparture = Hook.getInstance(HookType.on_media_departure, "rtp", receiveStream, mediaServer.getId());
subscribe.addSubscribe(hookForDeparture, (hookData) -> {
logger.info("[1078-对讲] 对讲时源流注销, app: {}. stream: {}, phoneNumber {} channelId {}", app, stream, phoneNumber, channelId);
stopTalk(phoneNumber, channelId);
});
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
logger.info("[1078-对讲] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
}
}, userSetting.getPlayTimeout());
}
Integer localPort = mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 15000);
logger.info("[1078-对讲] phoneNumber {} channelId {} 收发端口: {} app: {}, stream: {}",
phoneNumber, channelId, localPort, app, stream);
J9101 j9101 = new J9101();
j9101.setChannel(Integer.valueOf(channelId));
j9101.setIp(mediaServer.getSdpIp());
j9101.setRate(1);
j9101.setTcpPort(localPort);
j9101.setUdpPort(localPort);
j9101.setType(2);
jt1078Template.startLive(phoneNumber, j9101, 6);
if (onlySend != null && onlySend) {
logger.info("[1078-对讲] 对讲成功, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), null);
}
// 存储发流信息
redisCatchStorage.updateSendRTPSever(sendRtpItem);
}
}
@Override
public void stopTalk(String phoneNumber, Integer channelId) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
// 发送停止命令
J9102 j9102 = new J9102();
j9102.setChannel(Integer.valueOf(channelId));
j9102.setCommand(4);
j9102.setCloseType(0);
j9102.setStreamType(1);
jt1078Template.stopLive(phoneNumber, j9102, 6);
logger.info("[1078-停止对讲] phoneNumber {} channelId {}", phoneNumber, channelId);
// 删除缓存数据
if (streamInfo != null) {
redisTemplate.delete(playKey);
// 关闭rtpServer
mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
}
// 清理回调
List<GeneralCallback<StreamInfo>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
if (generalCallbacks != null && !generalCallbacks.isEmpty()) {
for (GeneralCallback<StreamInfo> callback : generalCallbacks) {
callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null);
}
}
}
@Override
public void changeStreamType(String phoneNumber, Integer channelId, Integer streamType) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo == null) {
logger.info("[1078-切换码流类型] 未找到点播信息 phoneNumber {} channelId {}, streamType: {}", phoneNumber, channelId, streamType);
log.info("[1078-切换码流类型] 未找到点播信息 phoneNumber {} channelId {}, streamType: {}", phoneNumber, channelId, streamType);
}
logger.info("[1078-切换码流类型] phoneNumber {} channelId {}, streamType: {}", phoneNumber, channelId, streamType);
log.info("[1078-切换码流类型] phoneNumber {} channelId {}, streamType: {}", phoneNumber, channelId, streamType);
// 发送暂停命令
J9102 j9102 = new J9102();
j9102.setChannel(Integer.valueOf(channelId));
@@ -1122,7 +601,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在");
}
PageHelper.startPage(page, count);
List<JTChannel> all = jtChannelMapper.getAll(deviceId, query);
List<JTChannel> all = jtChannelMapper.selectAll(deviceId, query);
PageInfo<JTChannel> jtChannelPageInfo = new PageInfo<>(all);
for (JTChannel jtChannel : jtChannelPageInfo.getList()) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + device.getPhoneNumber() + ":" + jtChannel.getChannelId();
@@ -1135,24 +614,44 @@ public class jt1078ServiceImpl implements Ijt1078Service {
}
@Override
@Transactional
public void updateChannel(JTChannel channel) {
channel.setUpdateTime(DateUtil.getNow());
jtChannelMapper.update(channel);
if (!ObjectUtils.isEmpty(channel.getGbDeviceId())) {
if (channel.getGbId() > 0) {
channelService.update(channel.buildCommonGBChannel());
}else {
channelService.add(channel.buildCommonGBChannel());
}
}
}
@Override
@Transactional
public void addChannel(JTChannel channel) {
JTChannel channelInDb = jtChannelMapper.getChannel(channel.getTerminalDbId(), channel.getChannelId());
JTChannel channelInDb = jtChannelMapper.selectChannelByChannelId(channel.getTerminalDbId(), channel.getChannelId());
if (channelInDb != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道已存在");
}
channel.setCreateTime(DateUtil.getNow());
channel.setUpdateTime(DateUtil.getNow());
jtChannelMapper.add(channel);
if (!ObjectUtils.isEmpty(channel.getGbDeviceId())) {
channelService.add(channel.buildCommonGBChannel());
}
}
@Override
@Transactional
public void deleteChannelById(Integer id) {
JTChannel jtChannel = jtChannelMapper.selectChannelById(id);
if (jtChannel == null) {
return;
}
if (jtChannel.getGbId() > 0) {
channelService.delete(jtChannel.getGbId());
}
jtChannelMapper.delete(id);
}
@@ -1190,6 +689,6 @@ public class jt1078ServiceImpl implements Ijt1078Service {
@Override
public JTChannel getChannelByDbId(Integer id) {
return jtChannelMapper.getChannelByDbId(id);
return jtChannelMapper.selectChannelById(id);
}
}