临时提交

This commit is contained in:
648540858
2024-07-23 18:00:47 +08:00
parent 05fabc87a2
commit 6bd0cdd37b
70 changed files with 603 additions and 436 deletions

View File

@@ -1,55 +0,0 @@
package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson2.JSONArray;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.CloudRecordItem;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
import com.github.pagehelper.PageInfo;
import java.util.List;
/**
* 云端录像管理
* @author lin
*/
public interface ICloudRecordService {
/**
* 分页回去云端录像列表
*/
PageInfo<CloudRecordItem> getList(int page, int count, String query, String app, String stream, String startTime, String endTime, List<MediaServer> mediaServerItems, String callId);
/**
* 获取所有的日期
*/
List<String> getDateList(String app, String stream, int year, int month, List<MediaServer> mediaServerItems);
/**
* 添加合并任务
*/
String addTask(String app, String stream, MediaServer mediaServerItem, String startTime,
String endTime, String callId, String remoteHost, boolean filterMediaServer);
/**
* 查询合并任务列表
*/
JSONArray queryTask(String app, String stream, String callId, String taskId, String mediaServerId, Boolean isEnd, String scheme);
/**
* 收藏视频,收藏的视频过期不会删除
*/
int changeCollect(boolean result, String app, String stream, String mediaServerId, String startTime, String endTime, String callId);
/**
* 添加指定录像收藏
*/
int changeCollectById(Integer recordId, boolean result);
/**
* 获取播放地址
*/
DownloadFileInfo getPlayUrlPath(Integer recordId);
List<CloudRecordItem> getAllList(String query, String app, String stream, String startTime, String endTime, List<MediaServer> mediaServerItems, String callId, List<Integer> ids);
}

View File

@@ -1,43 +0,0 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.github.pagehelper.PageInfo;
import java.util.List;
/**
* 报警相关业务处理
*/
public interface IDeviceAlarmService {
/**
* 根据多个添加获取报警列表
* @param page 当前页
* @param count 每页数量
* @param deviceId 设备id
* @param alarmPriority 报警级别, 1为一级警情, 2为二级警情, 3为三级警情, 4为四级 警情-
* @param alarmMethod 报警方式 , 1为电话报警, 2为设备报警, 3为短信报警, 4为 GPS报警, 5为视频报警, 6为设备故障报警,
* 7其他报警;可以为直接组合如12为电话报警或 设备报警-
* @param alarmType 报警类型
* @param startTime 开始时间
* @param endTime 结束时间
* @return 报警列表
*/
PageInfo<DeviceAlarm> getAllAlarm(int page, int count, String deviceId, String alarmPriority, String alarmMethod,
String alarmType, String startTime, String endTime);
/**
* 添加一个报警
* @param deviceAlarm 添加报警
*/
void add(DeviceAlarm deviceAlarm);
/**
* 清空时间以前的报警
* @param id 数据库id
* @param deviceIdList 制定需要清理的设备id
* @param time 不写时间则清空所有时间的
*/
int clearAlarmBeforeTime(Integer id, List<String> deviceIdList, String time);
}

View File

@@ -1,121 +0,0 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
import com.github.pagehelper.PageInfo;
import java.util.List;
/**
* 国标通道业务类
* @author lin
*/
public interface IDeviceChannelService {
/**
* 添加设备通道
*
* @param deviceId 设备id
* @param channel 通道
*/
void updateChannel(String deviceId, DeviceChannel channel);
/**
* 批量添加设备通道
*/
int updateChannels(Device device, List<DeviceChannel> channels);
/**
* 获取统计信息
* @return
*/
ResourceBaseInfo getOverview();
/**
* 查询所有未分配的通道
* @param platformId
* @return
*/
List<ChannelReduce> queryAllChannelList(String platformId);
PageInfo<ChannelReduce> queryAllChannelList(int page, int count, String query, Boolean online, Boolean channelType, String platformId, String catalogId);
/**
* 查询通道所属的设备
*/
List<Device> getDeviceByChannelId(String channelId);
/**
* 批量删除通道
* @param deleteChannelList 待删除的通道列表
*/
int deleteChannels(List<DeviceChannel> deleteChannelList);
/**
* 批量上线
*/
int channelsOnline(List<DeviceChannel> channels);
/**
* 批量下线
*/
int channelsOffline(List<DeviceChannel> channels);
/**
* 获取一个通道
*/
DeviceChannel getOne(String deviceId, String channelId);
/**
* 直接批量更新通道
*/
void batchUpdateChannel(List<DeviceChannel> channels);
/**
* 直接批量添加
*/
void batchAddChannel(List<DeviceChannel> deviceChannels);
/**
* 修改通道的码流类型
*/
void updateChannelStreamIdentification(DeviceChannel channel);
List<DeviceChannel> queryChaneListByDeviceId(String deviceId);
void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition);
void startPlay(String deviceId, String channelId, String stream);
void stopPlay(String deviceId, String channelId);
void batchUpdateChannelGPS(List<DeviceChannel> channelList);
void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
void online(DeviceChannel channel);
void offline(DeviceChannel channel);
void delete(DeviceChannel channel);
void cleanChannelsForDevice(int deviceId);
boolean resetChannels(int deviceDbId, List<DeviceChannel> deviceChannels);
PageInfo<DeviceChannel> getSubChannels(int deviceDbId, String channelId, String query, Boolean channelType, Boolean online, int page, int count);
List<DeviceChannelExtend> queryChannelExtendsByDeviceId(String deviceId, List<String> channelIds, Boolean online);
PageInfo<DeviceChannel> queryChannelsByDeviceId(String deviceId, String query, Boolean channelType, Boolean online, int page, int count);
List<Device> queryDeviceWithAsMessageChannel();
DeviceChannel getRawChannel(int id);
}

View File

@@ -1,167 +0,0 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.github.pagehelper.PageInfo;
import java.util.List;
/**
* 设备相关业务处理
* @author lin
*/
public interface IDeviceService {
/**
* 设备上线
* @param device 设备信息
*/
void online(Device device, SipTransactionInfo sipTransactionInfo);
/**
* 设备下线
* @param deviceId 设备编号
*/
void offline(String deviceId, String reason);
/**
* 添加目录订阅
* @param device 设备信息
* @return 布尔
*/
boolean addCatalogSubscribe(Device device);
/**
* 移除目录订阅
* @param device 设备信息
* @return 布尔
*/
boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback);
/**
* 添加移动位置订阅
* @param device 设备信息
* @return 布尔
*/
boolean addMobilePositionSubscribe(Device device);
/**
* 移除移动位置订阅
* @param device 设备信息
* @return 布尔
*/
boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback);
/**
* 移除移动位置订阅
* @param deviceId 设备ID
* @return 同步状态
*/
SyncStatus getChannelSyncStatus(String deviceId);
/**
* 查看是否仍在同步
* @param deviceId 设备ID
* @return 布尔
*/
Boolean isSyncRunning(String deviceId);
/**
* 通道同步
* @param device 设备信息
*/
void sync(Device device);
/**
* 查询设备信息
* @param deviceId 设备编号
* @return 设备信息
*/
Device getDevice(String deviceId);
/**
* 获取所有在线设备
* @return 设备列表
*/
List<Device> getAllOnlineDevice();
List<Device> getAllByStatus(boolean status);
/**
* 判断是否注册已经失效
* @param device 设备信息
* @return 布尔
*/
boolean expire(Device device);
/**
* 检查设备状态
* @param device 设备信息
*/
void checkDeviceStatus(Device device);
/**
* 根据IP和端口获取设备信息
* @param host IP
* @param port 端口
* @return 设备信息
*/
Device getDeviceByHostAndPort(String host, int port);
/**
* 更新设备
* @param device 设备信息
*/
void updateDevice(Device device);
/**
* 查询树节点下的通道
* @param deviceId 设备ID
* @param parentId 父ID
* @return
*/
List<DeviceChannel> queryVideoDeviceInTreeNode(String deviceId, String parentId);
/**
* 检查设备编号是否已经存在
* @param deviceId 设备编号
* @return
*/
boolean isExist(String deviceId);
/**
* 添加设备
* @param device
*/
void addDevice(Device device);
/**
* 页面表单更新设备信息
* @param device
*/
void updateCustomDevice(Device device);
/**
* 删除设备
* @param deviceId
* @return
*/
boolean delete(String deviceId);
/**
* 获取统计信息
* @return
*/
ResourceBaseInfo getOverview();
/**
* 获取所有设备
*/
List<Device> getAll();
PageInfo<Device> getAll(int page, int count, String query, Boolean status);
}

View File

@@ -1,88 +0,0 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
/**
* 记录国标点播的状态,包括实时预览,下载,录像回放
*/
public interface IInviteStreamService {
/**
* 更新点播的状态信息
*/
void updateInviteInfo(InviteInfo inviteInfo);
void updateInviteInfo(InviteInfo inviteInfo, Long time);
InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream);
/**
* 获取点播的状态信息
*/
InviteInfo getInviteInfo(InviteSessionType type,
String deviceId,
String channelId,
String stream);
/**
* 移除点播的状态信息
*/
void removeInviteInfo(InviteSessionType type,
String deviceId,
String channelId,
String stream);
/**
* 移除点播的状态信息
*/
void removeInviteInfo(InviteInfo inviteInfo);
/**
* 移除点播的状态信息
*/
void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId);
/**
* 获取点播的状态信息
*/
InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type,
String deviceId,
String channelId);
/**
* 获取点播的状态信息
*/
InviteInfo getInviteInfoByStream(InviteSessionType type, String stream);
/**
* 添加一个invite回调
*/
void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback);
/**
* 调用一个invite回调
*/
void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data);
/**
* 清空一个设备的所有invite信息
*/
void clearInviteInfo(String deviceId);
/**
* 统计同一个zlm下的国标收流个数
*/
int getStreamInfoCount(String mediaServerId);
/**
* 获取MediaServer下的流信息
*/
InviteInfo getInviteInfoBySSRC(String ssrc);
/**
* 更新ssrc
*/
InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrcInResponse);
}

View File

@@ -1,29 +0,0 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
import java.util.List;
/**
* 平台关联通道管理
* @author lin
*/
public interface IPlatformChannelService {
/**
* 更新目录下的通道
* @param platformId 平台编号
* @param channelReduces 通道信息
* @param catalogId 目录编号
* @return
*/
int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId);
/**
* 移除目录下的所有通道
* @param platformId
* @param catalogId
* @return
*/
int delAllChannelForGB(String platformId, String catalogId);
}

View File

@@ -1,85 +0,0 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.github.pagehelper.PageInfo;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
/**
* 国标平台的业务类
* @author lin
*/
public interface IPlatformService {
ParentPlatform queryPlatformByServerGBId(String platformGbId);
/**
* 分页获取上级平台
* @param page
* @param count
* @return
*/
PageInfo<ParentPlatform> queryParentPlatformList(int page, int count);
/**
* 添加级联平台
* @param parentPlatform 级联平台
*/
boolean add(ParentPlatform parentPlatform);
/**
* 添加级联平台
* @param parentPlatform 级联平台
*/
boolean update(ParentPlatform parentPlatform);
/**
* 平台上线
* @param parentPlatform 平台信息
*/
void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo);
/**
* 平台离线
* @param parentPlatform 平台信息
*/
void offline(ParentPlatform parentPlatform, boolean stopRegisterTask);
/**
* 向上级平台发起注册
* @param parentPlatform
*/
void login(ParentPlatform parentPlatform);
/**
* 向上级平台发送位置订阅
* @param platformId 平台
*/
void sendNotifyMobilePosition(String platformId);
/**
* 向上级发送语音喊话的消息
* @param platform 平台
* @param channelId 通道
* @param hookEvent hook事件
* @param errorEvent 信令错误事件
* @param timeoutCallback 超时事件
*/
void broadcastInvite(ParentPlatform platform, String channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException;
/**
* 语音喊话回复BYE
*/
void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream,boolean sendBye, MediaServer mediaServerItem);
void addSimulatedSubscribeInfo(ParentPlatform parentPlatform);
}

View File

@@ -1,69 +0,0 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.ServiceException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.gb28181.controller.bean.AudioBroadcastEvent;
import gov.nist.javax.sip.message.SIPResponse;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import java.text.ParseException;
/**
* 点播处理
*/
public interface IPlayService {
void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channelId,
ErrorCallback<Object> callback);
SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback);
StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId);
MediaServer getNewMediaServerItem(Device device);
void playBack(String deviceId, String channelId, String startTime, String endTime, ErrorCallback<Object> callback);
void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback<Object> callback);
void zlmServerOffline(String mediaServerId);
void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback);
void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback);
StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream);
void zlmServerOnline(String mediaServerId);
AudioBroadcastResult audioBroadcast(Device device, String channelId, Boolean broadcastMode);
boolean audioBroadcastCmd(Device device, String channelId, MediaServer mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException;
boolean audioBroadcastInUse(Device device, String channelId);
void stopAudioBroadcast(String deviceId, String channelId);
void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;
void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;
void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader);
void startSendRtpStreamFailHand(SendRtpItem sendRtpItem,ParentPlatform platform, CallIdHeader callIdHeader);
void talkCmd(Device device, String channelId, MediaServer mediaServerItem, String stream, AudioBroadcastEvent event);
void stopTalk(Device device, String channelId, Boolean streamIsReady);
void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback);
void stopPlay(Device device, String channelId);
}

View File

@@ -10,7 +10,7 @@ import com.genersoft.iot.vmp.media.event.media.MediaRecordMp4Event;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.ICloudRecordService;
import com.genersoft.iot.vmp.gb28181.service.ICloudRecordService;
import com.genersoft.iot.vmp.service.bean.CloudRecordItem;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;

View File

@@ -1,37 +0,0 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.service.IDeviceAlarmService;
import com.genersoft.iot.vmp.gb28181.dao.DeviceAlarmMapper;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@DS("master")
public class DeviceAlarmServiceImpl implements IDeviceAlarmService {
@Autowired
private DeviceAlarmMapper deviceAlarmMapper;
@Override
public PageInfo<DeviceAlarm> getAllAlarm(int page, int count, String deviceId, String alarmPriority, String alarmMethod, String alarmType, String startTime, String endTime) {
PageHelper.startPage(page, count);
List<DeviceAlarm> all = deviceAlarmMapper.query(deviceId, alarmPriority, alarmMethod, alarmType, startTime, endTime);
return new PageInfo<>(all);
}
@Override
public void add(DeviceAlarm deviceAlarm) {
deviceAlarmMapper.add(deviceAlarm);
}
@Override
public int clearAlarmBeforeTime(Integer id, List<String> deviceIdList, String time) {
return deviceAlarmMapper.clearAlarmBeforeTime(id, deviceIdList, time);
}
}

View File

@@ -1,584 +0,0 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author lin
*/
@Slf4j
@Service
@DS("master")
public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Autowired
private EventPublisher eventPublisher;
@Autowired
private IInviteStreamService inviteStreamService;
@Autowired
private DeviceChannelMapper channelMapper;
@Autowired
private PlatformChannelMapper platformChannelMapper;
@Autowired
private DeviceMapper deviceMapper;
@Autowired
private DeviceMobilePositionMapper deviceMobilePositionMapper;
@Autowired
private UserSetting userSetting;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Override
public void updateChannel(String deviceId, DeviceChannel channel) {
String channelId = channel.getDeviceId();
channel.setDeviceId(deviceId);
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
channel.setStreamId(inviteInfo.getStreamInfo().getStream());
}
String now = DateUtil.getNow();
channel.setUpdateTime(now);
DeviceChannel deviceChannel = getOne(deviceId, channelId);
if (deviceChannel == null) {
channel.setCreateTime(now);
channelMapper.add(channel);
}else {
channelMapper.update(channel);
}
channelMapper.updateChannelSubCount(channel.getDeviceDbId(),channel.getParentId());
}
@Override
public int updateChannels(Device device, List<DeviceChannel> channels) {
List<DeviceChannel> addChannels = new ArrayList<>();
List<DeviceChannel> updateChannels = new ArrayList<>();
HashMap<String, DeviceChannel> channelsInStore = new HashMap<>();
int result = 0;
if (channels != null && !channels.isEmpty()) {
List<DeviceChannel> channelList = channelMapper.queryChannelsByDeviceDbId(device.getId());
if (channelList.isEmpty()) {
for (DeviceChannel channel : channels) {
channel.setDeviceDbId(device.getId());
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getDeviceId());
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
channel.setStreamId(inviteInfo.getStreamInfo().getStream());
}
String now = DateUtil.getNow();
channel.setUpdateTime(now);
channel.setCreateTime(now);
addChannels.add(channel);
}
}else {
for (DeviceChannel deviceChannel : channelList) {
channelsInStore.put(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId(), deviceChannel);
}
for (DeviceChannel channel : channels) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getDeviceId());
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
channel.setStreamId(inviteInfo.getStreamInfo().getStream());
}
String now = DateUtil.getNow();
channel.setUpdateTime(now);
DeviceChannel deviceChannelInDb = channelsInStore.get(channel.getDeviceDbId() + channel.getDeviceId());
if ( deviceChannelInDb != null) {
channel.setId(deviceChannelInDb.getId());
channel.setUpdateTime(now);
updateChannels.add(channel);
}else {
channel.setCreateTime(now);
channel.setUpdateTime(now);
addChannels.add(channel);
}
}
}
Set<String> channelSet = new HashSet<>();
// 滤重
List<DeviceChannel> addChannelList = new ArrayList<>();
List<DeviceChannel> updateChannelList = new ArrayList<>();
addChannels.forEach(channel -> {
if (channelSet.add(channel.getDeviceId())) {
addChannelList.add(channel);
}
});
channelSet.clear();
updateChannels.forEach(channel -> {
if (channelSet.add(channel.getDeviceId())) {
updateChannelList.add(channel);
}
});
int limitCount = 500;
if (!addChannelList.isEmpty()) {
if (addChannelList.size() > limitCount) {
for (int i = 0; i < addChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > addChannelList.size()) {
toIndex = addChannelList.size();
}
result += channelMapper.batchAdd(addChannelList.subList(i, toIndex));
}
}else {
result += channelMapper.batchAdd(addChannelList);
}
}
if (!updateChannelList.isEmpty()) {
if (updateChannelList.size() > limitCount) {
for (int i = 0; i < updateChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > updateChannelList.size()) {
toIndex = updateChannelList.size();
}
result += channelMapper.batchUpdate(updateChannelList.subList(i, toIndex));
}
}else {
result += channelMapper.batchUpdate(updateChannelList);
}
}
}
return result;
}
@Override
public ResourceBaseInfo getOverview() {
int online = channelMapper.getOnlineCount();
int total = channelMapper.getAllChannelCount();
return new ResourceBaseInfo(total, online);
}
@Override
public List<ChannelReduce> queryAllChannelList(String platformId) {
return channelMapper.queryChannelListInAll(null, null, null, platformId, null);
}
@Override
public PageInfo<ChannelReduce> queryAllChannelList(int page, int count, String query, Boolean online, Boolean channelType, String platformId, String catalogId) {
PageHelper.startPage(page, count);
List<ChannelReduce> all = channelMapper.queryChannelListInAll(query, online, channelType, platformId, catalogId);
return new PageInfo<>(all);
}
@Override
public List<Device> getDeviceByChannelId(String channelId) {
return channelMapper.getDeviceByChannelId(channelId);
}
@Override
public int deleteChannels(List<DeviceChannel> deleteChannelList) {
return channelMapper.batchDel(deleteChannelList);
}
@Override
public int channelsOnline(List<DeviceChannel> channels) {
return channelMapper.batchOnline(channels);
}
@Override
public void online(DeviceChannel channel) {
channelMapper.online(channel.getId());
}
@Override
public int channelsOffline(List<DeviceChannel> channels) {
return channelMapper.batchOffline(channels);
}
@Override
public void offline(DeviceChannel channel) {
channelMapper.offline(channel.getId());
}
@Override
public void delete(DeviceChannel channel) {
channelMapper.del(channel.getId());
}
@Override
public DeviceChannel getOne(String deviceId, String channelId){
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备:" + deviceId);
}
return channelMapper.getOneByDeviceId(device.getId(), channelId);
}
@Override
public synchronized void batchUpdateChannel(List<DeviceChannel> channels) {
String now = DateUtil.getNow();
for (DeviceChannel channel : channels) {
channel.setUpdateTime(now);
}
int limitCount = 1000;
if (!channels.isEmpty()) {
if (channels.size() > limitCount) {
for (int i = 0; i < channels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > channels.size()) {
toIndex = channels.size();
}
channelMapper.batchUpdate(channels.subList(i, toIndex));
}
}else {
channelMapper.batchUpdate(channels);
}
}
}
@Override
public void batchAddChannel(List<DeviceChannel> channels) {
channelMapper.batchAdd(channels);
for (DeviceChannel channel : channels) {
if (channel.getParentId() != null) {
channelMapper.updateChannelSubCount(channel.getDeviceDbId(), channel.getParentId());
}
}
}
@Override
public void updateChannelStreamIdentification(DeviceChannel channel) {
assert !ObjectUtils.isEmpty(channel.getId());
assert !ObjectUtils.isEmpty(channel.getStreamIdentification());
if (ObjectUtils.isEmpty(channel.getStreamIdentification())) {
log.info("[重置通道码流类型] 设备: {}, 码流: {}", channel.getDeviceId(), channel.getStreamIdentification());
}else {
log.info("[更新通道码流类型] 设备: {}, 通道:{} 码流: {}", channel.getDeviceId(), channel.getDeviceId(),
channel.getStreamIdentification());
}
channelMapper.updateChannelStreamIdentification(channel);
}
@Override
public List<DeviceChannel> queryChaneListByDeviceId(String deviceId) {
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到通道:" + deviceId);
}
return channelMapper.queryAllChannels(device.getId());
}
@Override
public void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition) {
if (userSetting.getSavePositionHistory()) {
deviceMobilePositionMapper.insertNewPosition(mobilePosition);
}
if (deviceChannel.getDeviceId().equals(deviceChannel.getDeviceId())) {
deviceChannel.setDeviceId(null);
}
if (deviceChannel.getGpsTime() == null) {
deviceChannel.setGpsTime(DateUtil.getNow());
}
int updated = channelMapper.updatePosition(deviceChannel);
if (updated == 0) {
return;
}
List<DeviceChannel> deviceChannels = new ArrayList<>();
if (deviceChannel.getDeviceId() == null) {
// 有的设备这里上报的deviceId与通道Id是一样这种情况更新设备下的全部通道
List<DeviceChannel> deviceChannelsInDb = queryChaneListByDeviceId(device.getDeviceId());
deviceChannels.addAll(deviceChannelsInDb);
}else {
deviceChannels.add(deviceChannel);
}
if (deviceChannels.isEmpty()) {
return;
}
if (deviceChannels.size() > 100) {
log.warn("[更新通道位置信息后发送通知] 设备可能是平台,上报的位置信息未标明通道编号," +
"导致所有通道被更新位置, deviceId:{}", device.getDeviceId());
}
for (DeviceChannel channel : deviceChannels) {
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
mobilePosition.setChannelId(channel.getDeviceId());
try {
eventPublisher.mobilePositionEventPublish(mobilePosition);
}catch (Exception e) {
log.error("[向上级转发移动位置失败] ", e);
}
// 发送redis消息。 通知位置信息的变化
JSONObject jsonObject = new JSONObject();
jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
jsonObject.put("serial", mobilePosition.getDeviceId());
jsonObject.put("code", mobilePosition.getChannelId());
jsonObject.put("longitude", mobilePosition.getLongitude());
jsonObject.put("latitude", mobilePosition.getLatitude());
jsonObject.put("altitude", mobilePosition.getAltitude());
jsonObject.put("direction", mobilePosition.getDirection());
jsonObject.put("speed", mobilePosition.getSpeed());
redisCatchStorage.sendMobilePositionMsg(jsonObject);
}
}
@Override
public void startPlay(String deviceId, String channelId, String stream) {
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备: " +deviceId);
}
channelMapper.startPlay(device.getId(), channelId, stream);
}
@Override
public void stopPlay(String deviceId, String channelId) {
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备: " +deviceId);
}
channelMapper.stopPlay(device.getId(), channelId);
}
@Override
@Transactional
public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
for (DeviceChannel deviceChannel : channelList) {
deviceChannel.setUpdateTime(DateUtil.getNow());
if (deviceChannel.getGpsTime() == null) {
deviceChannel.setGpsTime(DateUtil.getNow());
}
}
int count = 1000;
if (channelList.size() > count) {
for (int i = 0; i < channelList.size(); i+=count) {
int toIndex = i+count;
if ( i + count > channelList.size()) {
toIndex = channelList.size();
}
List<DeviceChannel> channels = channelList.subList(i, toIndex);
channelMapper.batchUpdatePosition(channels);
}
}else {
channelMapper.batchUpdatePosition(channelList);
}
}
@Override
@Transactional
public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
// int count = 500;
// if (mobilePositions.size() > count) {
// for (int i = 0; i < mobilePositions.size(); i+=count) {
// int toIndex = i+count;
// if ( i + count > mobilePositions.size()) {
// toIndex = mobilePositions.size();
// }
// List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex);
// deviceMobilePositionMapper.batchadd(mobilePositionsSub);
// }
// }else {
// deviceMobilePositionMapper.batchadd(mobilePositions);
// }
deviceMobilePositionMapper.batchadd(mobilePositions);
}
@Override
public void cleanChannelsForDevice(int deviceId) {
channelMapper.cleanChannelsByDeviceId(deviceId);
}
@Override
@Transactional
public boolean resetChannels(int deviceDbId, List<DeviceChannel> deviceChannelList) {
if (CollectionUtils.isEmpty(deviceChannelList)) {
return false;
}
List<DeviceChannel> allChannels = channelMapper.queryAllChannels(deviceDbId);
Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
if (!allChannels.isEmpty()) {
for (DeviceChannel deviceChannel : allChannels) {
allChannelMap.put(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId(), deviceChannel);
}
}
// 数据去重
List<DeviceChannel> channels = new ArrayList<>();
List<DeviceChannel> updateChannels = new ArrayList<>();
List<DeviceChannel> addChannels = new ArrayList<>();
List<DeviceChannel> deleteChannels = new ArrayList<>();
StringBuilder stringBuilder = new StringBuilder();
Map<String, Integer> subContMap = new HashMap<>();
// 数据去重
Set<String> gbIdSet = new HashSet<>();
for (DeviceChannel deviceChannel : deviceChannelList) {
if (gbIdSet.contains(deviceChannel.getDeviceId())) {
stringBuilder.append(deviceChannel.getDeviceId()).append(",");
continue;
}
gbIdSet.add(deviceChannel.getDeviceId());
DeviceChannel channelInDb = allChannelMap.get(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId());
if (channelInDb != null) {
deviceChannel.setStreamId(channelInDb.getStreamId());
deviceChannel.setHasAudio(channelInDb.isHasAudio());
deviceChannel.setId(channelInDb.getId());
if (channelInDb.getStatus().equalsIgnoreCase(deviceChannel.getStatus())){
List<String> strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getDeviceId());
if (!CollectionUtils.isEmpty(strings)){
strings.forEach(platformId->{
eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.getStatus().equals("ON")? CatalogEvent.ON:CatalogEvent.OFF);
});
}
}
deviceChannel.setUpdateTime(DateUtil.getNow());
updateChannels.add(deviceChannel);
}else {
deviceChannel.setCreateTime(DateUtil.getNow());
deviceChannel.setUpdateTime(DateUtil.getNow());
addChannels.add(deviceChannel);
}
allChannelMap.remove(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId());
channels.add(deviceChannel);
if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
if (subContMap.get(deviceChannel.getParentId()) == null) {
subContMap.put(deviceChannel.getParentId(), 1);
}else {
Integer count = subContMap.get(deviceChannel.getParentId());
subContMap.put(deviceChannel.getParentId(), count++);
}
}
}
deleteChannels.addAll(allChannelMap.values());
if (!channels.isEmpty()) {
for (DeviceChannel channel : channels) {
if (subContMap.get(channel.getDeviceId()) != null){
Integer count = subContMap.get(channel.getDeviceId());
if (count > 0) {
channel.setSubCount(count);
channel.setParental(1);
}
}
}
}
if (stringBuilder.length() > 0) {
log.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
}
if(CollectionUtils.isEmpty(channels)){
log.info("通道重设,数据为空={}" , deviceChannelList);
return false;
}
int limitCount = 50;
boolean result = false;
if (!addChannels.isEmpty()) {
if (addChannels.size() > limitCount) {
for (int i = 0; i < addChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > addChannels.size()) {
toIndex = addChannels.size();
}
result = result || channelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0;
}
}else {
result = result || channelMapper.batchAdd(addChannels) < 0;
}
}
if (!result && !updateChannels.isEmpty()) {
if (updateChannels.size() > limitCount) {
for (int i = 0; i < updateChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > updateChannels.size()) {
toIndex = updateChannels.size();
}
result = result || channelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0;
}
}else {
result = result || channelMapper.batchUpdate(updateChannels) < 0;
}
}
if (!result && !deleteChannels.isEmpty()) {
System.out.println("删除: " + deleteChannels.size());
if (deleteChannels.size() > limitCount) {
for (int i = 0; i < deleteChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > deleteChannels.size()) {
toIndex = deleteChannels.size();
}
result = result || channelMapper.batchDel(deleteChannels.subList(i, toIndex)) < 0;
}
}else {
result = result || channelMapper.batchDel(deleteChannels) < 0;
}
}
return true;
}
@Override
public PageInfo<DeviceChannel> getSubChannels(int deviceDbId, String channelId, String query, Boolean channelType, Boolean online, int page, int count) {
PageHelper.startPage(page, count);
List<DeviceChannel> all = channelMapper.queryChannels(deviceDbId, channelId, query, channelType, online,null);
return new PageInfo<>(all);
}
@Override
public List<DeviceChannelExtend> queryChannelExtendsByDeviceId(String deviceId, List<String> channelIds, Boolean online) {
return channelMapper.queryChannelsWithDeviceInfo(deviceId, null,null, null, online,channelIds);
}
@Override
public PageInfo queryChannelsByDeviceId(String deviceId, String query, Boolean hasSubChannel, Boolean online, int page, int count) {
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备:" + deviceId);
}
// 获取到所有正在播放的流
PageHelper.startPage(page, count);
List<DeviceChannel> all = channelMapper.queryChannels(device.getId(), null, query, hasSubChannel, online,null);
return new PageInfo<>(all);
}
@Override
public List<Device> queryDeviceWithAsMessageChannel() {
return deviceMapper.queryDeviceWithAsMessageChannel();
}
@Override
public DeviceChannel getRawChannel(int id) {
return deviceMapper.getRawChannel(id);
}
}

View File

@@ -1,572 +0,0 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 设备业务(目录订阅)
*/
@Slf4j
@Service
@DS("master")
public class DeviceServiceImpl implements IDeviceService {
@Autowired
private SIPCommander cmder;
@Autowired
private DynamicTask dynamicTask;
@Autowired
private ISIPCommander sipCommander;
@Autowired
private CatalogResponseMessageHandler catalogResponseMessageHandler;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IInviteStreamService inviteStreamService;
@Autowired
private DeviceMapper deviceMapper;
@Autowired
private PlatformChannelMapper platformChannelMapper;
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired
private DeviceChannelMapper deviceChannelMapper;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
TransactionDefinition transactionDefinition;
@Autowired
private UserSetting userSetting;
@Autowired
private ISIPCommander commander;
@Autowired
private VideoStreamSessionManager streamSession;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@Override
public void online(Device device, SipTransactionInfo sipTransactionInfo) {
log.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId());
Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
String now = DateUtil.getNow();
if (deviceInRedis != null && deviceInDb == null) {
// redis 存在脏数据
inviteStreamService.clearInviteInfo(device.getDeviceId());
}
device.setUpdateTime(now);
device.setKeepaliveTime(now);
if (device.getKeepaliveIntervalTime() == 0) {
// 默认心跳间隔60
device.setKeepaliveIntervalTime(60);
}
if (sipTransactionInfo != null) {
device.setSipTransactionInfo(sipTransactionInfo);
}else {
if (deviceInRedis != null) {
device.setSipTransactionInfo(deviceInRedis.getSipTransactionInfo());
}
}
// 第一次上线 或则设备之前是离线状态--进行通道同步和设备信息查询
if (deviceInDb == null) {
device.setOnLine(true);
device.setCreateTime(now);
device.setUpdateTime(now);
log.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
deviceMapper.add(device);
redisCatchStorage.updateDevice(device);
try {
commander.deviceInfoQuery(device);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
sync(device);
}else {
if(!device.isOnLine()){
device.setOnLine(true);
device.setCreateTime(now);
deviceMapper.update(device);
redisCatchStorage.updateDevice(device);
if (userSetting.getSyncChannelOnDeviceOnline()) {
log.info("[设备上线,离线状态下重新注册]: {},查询设备信息以及通道信息", device.getDeviceId());
try {
commander.deviceInfoQuery(device);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
sync(device);
// TODO 如果设备下的通道级联到了其他平台那么需要发送事件或者notify给上级平台
}
// 上线添加订阅
if (device.getSubscribeCycleForCatalog() > 0) {
// 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
addCatalogSubscribe(device);
}
if (device.getSubscribeCycleForMobilePosition() > 0) {
addMobilePositionSubscribe(device);
}
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true);
}
}else {
if (deviceChannelMapper.queryAllChannels(device.getId()).isEmpty()) {
log.info("[设备上线]: {}通道数为0,查询通道信息", device.getDeviceId());
sync(device);
}
deviceMapper.update(device);
redisCatchStorage.updateDevice(device);
}
}
// 刷新过期任务
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
// 如果第一次注册那么必须在60 * 3时间内收到一个心跳否则设备离线
dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "首次注册后未能收到心跳"), device.getKeepaliveIntervalTime() * 1000 * 3);
//
// try {
// cmder.alarmSubscribe(device, 600, "0", "4", "0", "2023-7-27T00:00:00", "2023-7-28T00:00:00");
// } catch (InvalidArgumentException e) {
// throw new RuntimeException(e);
// } catch (SipException e) {
// throw new RuntimeException(e);
// } catch (ParseException e) {
// throw new RuntimeException(e);
// }
}
@Override
public void offline(String deviceId, String reason) {
log.warn("[设备离线]{}, device{}", reason, deviceId);
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
return;
}
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + deviceId;
dynamicTask.stop(registerExpireTaskKey);
if (device.isOnLine()) {
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
}
}
device.setOnLine(false);
redisCatchStorage.updateDevice(device);
deviceMapper.update(device);
//进行通道离线
// deviceChannelMapper.offlineByDeviceId(deviceId);
// 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(deviceId, null, null, null);
if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
streamSession.removeByCallId(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getCallId());
}
}
// 移除订阅
removeCatalogSubscribe(device, null);
removeMobilePositionSubscribe(device, null);
List<AudioBroadcastCatch> audioBroadcastCatches = audioBroadcastManager.get(deviceId);
if (audioBroadcastCatches.size() > 0) {
for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
if (sendRtpItem != null) {
redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
}
audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId());
}
}
}
@Override
public boolean addCatalogSubscribe(Device device) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
log.info("[添加目录订阅] 设备{}", device.getDeviceId());
// 添加目录订阅
CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander, dynamicTask);
// 刷新订阅
int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30);
// 设置最小值为30
dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000);
catalogSubscribeTask.run();
return true;
}
@Override
public boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
log.info("[移除目录订阅]: {}", device.getDeviceId());
String taskKey = device.getDeviceId() + "catalog";
if (device.isOnLine()) {
Runnable runnable = dynamicTask.get(taskKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop(callback);
}
}
dynamicTask.stop(taskKey);
return true;
}
@Override
public boolean addMobilePositionSubscribe(Device device) {
if (device == null || device.getSubscribeCycleForMobilePosition() < 0) {
return false;
}
log.info("[添加移动位置订阅] 设备{}", device.getDeviceId());
// 添加目录订阅
MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander, dynamicTask);
// 设置最小值为30
int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30);
// 刷新订阅
dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog * 1000);
mobilePositionSubscribeTask.run();
return true;
}
@Override
public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
log.info("[移除移动位置订阅]: {}", device.getDeviceId());
String taskKey = device.getDeviceId() + "mobile_position";
if (device.isOnLine()) {
Runnable runnable = dynamicTask.get(taskKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop(callback);
}
}
dynamicTask.stop(taskKey);
return true;
}
@Override
public SyncStatus getChannelSyncStatus(String deviceId) {
return catalogResponseMessageHandler.getChannelSyncProgress(deviceId);
}
@Override
public Boolean isSyncRunning(String deviceId) {
return catalogResponseMessageHandler.isSyncRunning(deviceId);
}
@Override
public void sync(Device device) {
if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) {
log.info("开启同步时发现同步已经存在");
return;
}
int sn = (int)((Math.random()*9+1)*100000);
catalogResponseMessageHandler.setChannelSyncReady(device, sn);
try {
sipCommander.catalogQuery(device, sn, event -> {
String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
});
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[同步通道], 信令发送失败:{}", e.getMessage() );
String errorMsg = String.format("同步通道失败,信令发送失败: %s", e.getMessage());
catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
}
}
@Override
public Device getDevice(String deviceId) {
Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) {
device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device != null) {
redisCatchStorage.updateDevice(device);
}
}
return device;
}
@Override
public List<Device> getAllOnlineDevice() {
return deviceMapper.getOnlineDevices();
}
@Override
public List<Device> getAllByStatus(boolean status) {
return deviceMapper.getDevices(status);
}
@Override
public boolean expire(Device device) {
Instant registerTimeDate = Instant.from(DateUtil.formatter.parse(device.getRegisterTime()));
Instant expireInstant = registerTimeDate.plusMillis(TimeUnit.SECONDS.toMillis(device.getExpires()));
return expireInstant.isBefore(Instant.now());
}
@Override
public void checkDeviceStatus(Device device) {
if (device == null || !device.isOnLine()) {
return;
}
try {
sipCommander.deviceStatusQuery(device, null);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 设备状态查询: {}", e.getMessage());
}
}
@Override
public Device getDeviceByHostAndPort(String host, int port) {
return deviceMapper.getDeviceByHostAndPort(host, port);
}
@Override
public void updateDevice(Device device) {
String now = DateUtil.getNow();
device.setUpdateTime(now);
device.setCharset(device.getCharset().toUpperCase());
device.setUpdateTime(DateUtil.getNow());
if (deviceMapper.update(device) > 0) {
redisCatchStorage.updateDevice(device);
}
}
@Override
public List<DeviceChannel> queryVideoDeviceInTreeNode(String deviceId, String parentId) {
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
return null;
}
if (ObjectUtils.isEmpty(parentId) || parentId.equals(deviceId)) {
return deviceChannelMapper.getSubChannelsByDeviceId(device.getId(), null, false);
}else {
return deviceChannelMapper.getSubChannelsByDeviceId(device.getId(), parentId, false);
}
}
@Override
public boolean isExist(String deviceId) {
return deviceMapper.getDeviceByDeviceId(deviceId) != null;
}
@Override
public void addDevice(Device device) {
device.setOnLine(false);
device.setCreateTime(DateUtil.getNow());
device.setUpdateTime(DateUtil.getNow());
deviceMapper.addCustomDevice(device);
}
@Override
public void updateCustomDevice(Device device) {
Device deviceInStore = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
if (deviceInStore == null) {
log.warn("更新设备时未找到设备信息");
return;
}
if (!ObjectUtils.isEmpty(device.getName())) {
deviceInStore.setName(device.getName());
}
if (!ObjectUtils.isEmpty(device.getCharset())) {
deviceInStore.setCharset(device.getCharset());
}
if (!ObjectUtils.isEmpty(device.getMediaServerId())) {
deviceInStore.setMediaServerId(device.getMediaServerId());
}
if (!ObjectUtils.isEmpty(device.getCharset())) {
deviceInStore.setCharset(device.getCharset());
}
if (!ObjectUtils.isEmpty(device.getSdpIp())) {
deviceInStore.setSdpIp(device.getSdpIp());
}
if (!ObjectUtils.isEmpty(device.getPassword())) {
deviceInStore.setPassword(device.getPassword());
}
if (!ObjectUtils.isEmpty(device.getStreamMode())) {
deviceInStore.setStreamMode(device.getStreamMode());
}
// 目录订阅相关的信息
if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
if (device.getSubscribeCycleForCatalog() > 0) {
// 若已开启订阅,但订阅周期不同,则先取消
if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
removeCatalogSubscribe(deviceInStore, result->{
// 开启订阅
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
addCatalogSubscribe(deviceInStore);
// 因为是异步执行,需要在这里更新下数据
deviceMapper.updateCustom(deviceInStore);
redisCatchStorage.updateDevice(deviceInStore);
});
}else {
// 开启订阅
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
addCatalogSubscribe(deviceInStore);
}
}else if (device.getSubscribeCycleForCatalog() == 0) {
// 取消订阅
deviceInStore.setSubscribeCycleForCatalog(0);
removeCatalogSubscribe(deviceInStore, null);
}
}
// 移动位置订阅相关的信息
if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
if (device.getSubscribeCycleForMobilePosition() > 0) {
// 若已开启订阅,但订阅周期不同,则先取消
if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
removeMobilePositionSubscribe(deviceInStore, result->{
// 开启订阅
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
addMobilePositionSubscribe(deviceInStore);
// 因为是异步执行,需要在这里更新下数据
deviceMapper.updateCustom(deviceInStore);
redisCatchStorage.updateDevice(deviceInStore);
});
}else {
// 开启订阅
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
addMobilePositionSubscribe(deviceInStore);
}
}else if (device.getSubscribeCycleForMobilePosition() == 0) {
// 取消订阅
deviceInStore.setSubscribeCycleForMobilePosition(0);
deviceInStore.setMobilePositionSubmissionInterval(0);
removeMobilePositionSubscribe(deviceInStore, null);
}
}
if (deviceInStore.getGeoCoordSys() != null) {
// 坐标系变化需要重新计算GCJ02坐标和WGS84坐标
if (!deviceInStore.getGeoCoordSys().equals(device.getGeoCoordSys())) {
deviceInStore.setGeoCoordSys(device.getGeoCoordSys());
}
}else {
deviceInStore.setGeoCoordSys("WGS84");
}
if (device.getCharset() == null) {
deviceInStore.setCharset("GB2312");
}
//SSRC校验
deviceInStore.setSsrcCheck(device.isSsrcCheck());
//作为消息通道
deviceInStore.setAsMessageChannel(device.isAsMessageChannel());
deviceMapper.updateCustom(deviceInStore);
redisCatchStorage.updateDevice(deviceInStore);
}
@Override
@Transactional
public boolean delete(String deviceId) {
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备:" + deviceId);
}
platformChannelMapper.delChannelForDeviceId(deviceId);
deviceChannelMapper.cleanChannelsByDeviceId(device.getId());
deviceMapper.del(deviceId);
redisCatchStorage.removeDevice(deviceId);
return true;
}
@Override
public ResourceBaseInfo getOverview() {
List<Device> onlineDevices = deviceMapper.getOnlineDevices();
List<Device> all = deviceMapper.getAll();
return new ResourceBaseInfo(all.size(), onlineDevices.size());
}
@Override
public List<Device> getAll() {
return deviceMapper.getAll();
}
@Override
public PageInfo<Device> getAll(int page, int count, String query, Boolean status) {
PageHelper.startPage(page, count);
List<Device> all = deviceMapper.getDeviceList(query, status);
return new PageInfo<>(all);
}
}

View File

@@ -1,340 +0,0 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSON;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
@DS("master")
public class InviteStreamServiceImpl implements IInviteStreamService {
private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private UserSetting userSetting;
@Autowired
private DeviceMapper deviceMapper;
@Autowired
private DeviceChannelMapper deviceChannelMapper;
/**
* 流到来的处理
*/
@Async("taskExecutor")
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
// if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
//
// }
}
/**
* 流离开的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
removeInviteInfo(inviteInfo);
Device device = deviceMapper.getDeviceByDeviceId(inviteInfo.getDeviceId());
if (device != null) {
deviceChannelMapper.stopPlay(device.getId(), inviteInfo.getChannelId());
}
}
}
}
@Override
public void updateInviteInfo(InviteInfo inviteInfo) {
if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
updateInviteInfo(inviteInfo, Long.valueOf(userSetting.getPlayTimeout()) * 2);
}else {
updateInviteInfo(inviteInfo, null);
}
}
@Override
public void updateInviteInfo(InviteInfo inviteInfo, Long time) {
if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
log.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
return;
}
InviteInfo inviteInfoForUpdate = null;
if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
if (inviteInfo.getDeviceId() == null
|| inviteInfo.getChannelId() == null
|| inviteInfo.getType() == null
|| inviteInfo.getStream() == null
) {
return;
}
inviteInfoForUpdate = inviteInfo;
} else {
InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream());
if (inviteInfoInRedis == null) {
log.warn("[更新Invite信息]未从缓存中读取到Invite信息 deviceId: {}, channel: {}, stream: {}",
inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
return;
}
if (inviteInfo.getStreamInfo() != null) {
inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
}
if (inviteInfo.getSsrcInfo() != null) {
inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
}
if (inviteInfo.getStreamMode() != null) {
inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
}
if (inviteInfo.getReceiveIp() != null) {
inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
}
if (inviteInfo.getReceivePort() != null) {
inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
}
if (inviteInfo.getStatus() != null) {
inviteInfoInRedis.setStatus(inviteInfo.getStatus());
}
inviteInfoForUpdate = inviteInfoInRedis;
}
String key = VideoManagerConstants.INVITE_PREFIX +
":" + inviteInfoForUpdate.getType() +
":" + inviteInfoForUpdate.getDeviceId() +
":" + inviteInfoForUpdate.getChannelId() +
":" + inviteInfoForUpdate.getStream()+
":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
if (time != null && time > 0) {
redisTemplate.opsForValue().set(key, inviteInfoForUpdate, time, TimeUnit.SECONDS);
}else {
redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
}
}
@Override
public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
if (inviteInfoInDb == null) {
return null;
}
removeInviteInfo(inviteInfoInDb);
String key = VideoManagerConstants.INVITE_PREFIX +
":" + inviteInfo.getType() +
":" + inviteInfo.getDeviceId() +
":" + inviteInfo.getChannelId() +
":" + stream +
":" + inviteInfo.getSsrcInfo().getSsrc();
inviteInfoInDb.setStream(stream);
if (inviteInfoInDb.getSsrcInfo() != null) {
inviteInfoInDb.getSsrcInfo().setStream(stream);
}
if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
redisTemplate.opsForValue().set(key, inviteInfoInDb, userSetting.getPlayTimeout() * 2, TimeUnit.SECONDS);
}else {
redisTemplate.opsForValue().set(key, inviteInfoInDb);
}
return inviteInfoInDb;
}
@Override
public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
String key = VideoManagerConstants.INVITE_PREFIX +
":" + (type != null ? type : "*") +
":" + (deviceId != null ? deviceId : "*") +
":" + (channelId != null ? channelId : "*") +
":" + (stream != null ? stream : "*")
+ ":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.isEmpty()) {
return null;
}
if (scanResult.size() != 1) {
log.warn("[获取InviteInfo] 发现 key: {}存在多条", key);
}
return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
}
@Override
public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) {
return getInviteInfo(type, deviceId, channelId, null);
}
@Override
public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) {
return getInviteInfo(type, null, null, stream);
}
@Override
public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
String scanKey = VideoManagerConstants.INVITE_PREFIX +
":" + (type != null ? type : "*") +
":" + (deviceId != null ? deviceId : "*") +
":" + (channelId != null ? channelId : "*") +
":" + (stream != null ? stream : "*") +
":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
if (scanResult.size() > 0) {
for (Object keyObj : scanResult) {
String key = (String) keyObj;
InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
if (inviteInfo == null) {
continue;
}
redisTemplate.delete(key);
inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
}
}
}
@Override
public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) {
removeInviteInfo(inviteSessionType, deviceId, channelId, null);
}
@Override
public void removeInviteInfo(InviteInfo inviteInfo) {
removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
}
@Override
public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback) {
String key = buildKey(type, deviceId, channelId, stream);
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
if (callbacks == null) {
callbacks = new CopyOnWriteArrayList<>();
inviteErrorCallbackMap.put(key, callbacks);
}
callbacks.add(callback);
}
private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
String key = type + ":" + deviceId + ":" + channelId;
// 如果ssrc未null那么可以实现一个通道只能一次操作ssrc不为null则可以支持一个通道多次invite
if (stream != null) {
key += (":" + stream);
}
return key;
}
@Override
public void clearInviteInfo(String deviceId) {
removeInviteInfo(null, deviceId, null, null);
}
@Override
public int getStreamInfoCount(String mediaServerId) {
int count = 0;
String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() == 0) {
return 0;
}else {
for (Object keyObj : scanResult) {
String keyStr = (String) keyObj;
InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr);
if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) {
if (inviteInfo.getType().equals(InviteSessionType.DOWNLOAD) && inviteInfo.getStreamInfo().getProgress() == 1) {
continue;
}
count++;
}
}
}
return count;
}
@Override
public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
String key = buildSubStreamKey(type, deviceId, channelId, stream);
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
if (callbacks == null) {
return;
}
for (ErrorCallback<Object> callback : callbacks) {
callback.run(code, msg, data);
}
inviteErrorCallbackMap.remove(key);
}
private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
String key = type + ":" + ":" + deviceId + ":" + channelId;
// 如果ssrc为null那么可以实现一个通道只能一次操作ssrc不为null则可以支持一个通道多次invite
if (stream != null) {
key += (":" + stream);
}
return key;
}
@Override
public InviteInfo getInviteInfoBySSRC(String ssrc) {
String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() != 1) {
return null;
}
return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
}
@Override
public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
if (inviteInfoInDb == null) {
return null;
}
removeInviteInfo(inviteInfoInDb);
String key = VideoManagerConstants.INVITE_PREFIX +
":" + inviteInfo.getType() +
":" + inviteInfo.getDeviceId() +
":" + inviteInfo.getChannelId() +
":" + inviteInfo.getStream() +
":" + ssrc;
if (inviteInfoInDb.getSsrcInfo() != null) {
inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
}
redisTemplate.opsForValue().set(key, inviteInfoInDb);
return inviteInfoInDb;
}
}

View File

@@ -7,6 +7,9 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;

View File

@@ -1,173 +0,0 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformCatalogMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author lin
*/
@Slf4j
@Service
@DS("master")
public class PlatformChannelServiceImpl implements IPlatformChannelService {
@Autowired
private PlatformChannelMapper platformChannelMapper;
@Autowired
TransactionDefinition transactionDefinition;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private SubscribeHolder subscribeHolder;
@Autowired
private DeviceChannelMapper deviceChannelMapper;
@Autowired
private PlatformCatalogMapper catalogManager;
@Autowired
private ParentPlatformMapper platformMapper;
@Autowired
EventPublisher eventPublisher;
@Override
public int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId) {
ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
if (platform == null) {
log.warn("更新级联通道信息时未找到平台{}的信息", platformId);
return 0;
}
Map<Integer, ChannelReduce> deviceAndChannels = new HashMap<>();
for (ChannelReduce channelReduce : channelReduces) {
channelReduce.setCatalogId(catalogId);
deviceAndChannels.put(channelReduce.getId(), channelReduce);
}
List<Integer> deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet());
// 查询当前已经存在的
List<Integer> channelIds = platformChannelMapper.findChannelRelatedPlatform(platformId, channelReduces);
if (deviceAndChannelList != null) {
deviceAndChannelList.removeAll(channelIds);
}
for (Integer channelId : channelIds) {
deviceAndChannels.remove(channelId);
}
List<ChannelReduce> channelReducesToAdd = new ArrayList<>(deviceAndChannels.values());
// 对剩下的数据进行存储
int allCount = 0;
boolean result = false;
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
int limitCount = 50;
if (channelReducesToAdd.size() > 0) {
if (channelReducesToAdd.size() > limitCount) {
for (int i = 0; i < channelReducesToAdd.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > channelReducesToAdd.size()) {
toIndex = channelReducesToAdd.size();
}
int count = platformChannelMapper.addChannels(platformId, channelReducesToAdd.subList(i, toIndex));
result = result || count < 0;
allCount += count;
log.info("[关联通道]国标通道 平台:{}, 共需关联通道数:{}, 已关联:{}", platformId, channelReducesToAdd.size(), toIndex);
}
}else {
allCount = platformChannelMapper.addChannels(platformId, channelReducesToAdd);
result = result || allCount < 0;
log.info("[关联通道]国标通道 平台:{}, 关联通道数:{}", platformId, channelReducesToAdd.size());
}
if (result) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
allCount = 0;
}else {
log.info("[关联通道]国标通道 平台:{}, 正在存入数据库", platformId);
dataSourceTransactionManager.commit(transactionStatus);
}
SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(platformId);
if (catalogSubscribe != null) {
List<CommonGBChannel> deviceChannelList = getDeviceChannelListByChannelReduceList(channelReducesToAdd, catalogId, platform);
if (deviceChannelList != null) {
eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
}
}
log.info("[关联通道]国标通道 平台:{}, 存入数据库成功", platformId);
}
return allCount;
}
private List<CommonGBChannel> getDeviceChannelListByChannelReduceList(List<ChannelReduce> channelReduces, String catalogId, ParentPlatform platform) {
List<CommonGBChannel> deviceChannelList = new ArrayList<>();
if (!channelReduces.isEmpty()){
PlatformCatalog catalog = catalogManager.selectByPlatFormAndCatalogId(platform.getServerGBId(),catalogId);
if (catalog == null && catalogId.equals(platform.getDeviceGBId())) {
for (ChannelReduce channelReduce : channelReduces) {
DeviceChannel deviceChannel = deviceChannelMapper.getOne(channelReduce.getId());
deviceChannel.setParental(0);
deviceChannel.setCivilCode(platform.getServerGBDomain());
deviceChannelList.add(deviceChannel);
}
return deviceChannelList;
} else if (catalog == null && !catalogId.equals(platform.getDeviceGBId())) {
log.warn("未查询到目录{}的信息", catalogId);
return null;
}
for (ChannelReduce channelReduce : channelReduces) {
DeviceChannel deviceChannel = deviceChannelMapper.getOne(channelReduce.getId());
deviceChannel.setParental(0);
deviceChannel.setCivilCode(catalog.getCivilCode());
deviceChannel.setParentId(catalog.getParentId());
deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
deviceChannelList.add(deviceChannel);
}
}
return deviceChannelList;
}
@Override
public int delAllChannelForGB(String platformId, String catalogId) {
int result;
if (platformId == null) {
return 0;
}
ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
if (platform == null) {
return 0;
}
if (ObjectUtils.isEmpty(catalogId)) {
catalogId = null;
}
List<CommonGBChannel> deviceChannels = platformChannelMapper.queryAllChannelInCatalog(platformId, catalogId);
eventPublisher.catalogEventPublish(platformId, deviceChannels, CatalogEvent.DEL);
return platformChannelMapper.delChannelForGBByCatalogId(platformId, catalogId);
}
}

View File

@@ -1,798 +0,0 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.gb28181.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.List;
import java.util.UUID;
import java.util.Vector;
/**
* @author lin
*/
@Slf4j
@Service
@DS("master")
public class PlatformServiceImpl implements IPlatformService {
private final static String REGISTER_KEY_PREFIX = "platform_register_";
private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_";
private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_";
@Autowired
private ParentPlatformMapper platformMapper;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private ISIPCommanderForPlatform commanderForPlatform;
@Autowired
private DynamicTask dynamicTask;
@Autowired
private SubscribeHolder subscribeHolder;
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private UserSetting userSetting;
@Autowired
private VideoStreamSessionManager streamSession;
@Autowired
private IPlayService playService;
@Autowired
private IInviteStreamService inviteStreamService;
/**
* 流离开的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
String platformId = sendRtpItem.getPlatformId();
ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
try {
if (platform != null) {
commanderForPlatform.streamByeCmd(platform, sendRtpItem);
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStream());
}
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 发送BYE: {}", e.getMessage());
}
}
}
}
}
/**
* 发流停止
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaSendRtpStoppedEvent event) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getPlatformId());
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
try {
commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStream());
}
}
}
@Override
public ParentPlatform queryPlatformByServerGBId(String platformGbId) {
return platformMapper.getParentPlatByServerGBId(platformGbId);
}
@Override
public PageInfo<ParentPlatform> queryParentPlatformList(int page, int count) {
PageHelper.startPage(page, count);
List<ParentPlatform> all = platformMapper.getParentPlatformList();
return new PageInfo<>(all);
}
@Override
public boolean add(ParentPlatform parentPlatform) {
if (parentPlatform.getCatalogGroup() == 0) {
// 每次发送目录的数量默认为1
parentPlatform.setCatalogGroup(1);
}
parentPlatform.setCatalogId(parentPlatform.getDeviceGBId());
int result = platformMapper.addParentPlatform(parentPlatform);
// 添加缓存
ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch();
parentPlatformCatch.setParentPlatform(parentPlatform);
parentPlatformCatch.setId(parentPlatform.getServerGBId());
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
if (parentPlatform.isEnable()) {
// 保存时启用就发送注册
// 注册成功时由程序直接调用了online方法
try {
commanderForPlatform.register(parentPlatform, eventResult -> {
log.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
}
}
return result > 0;
}
@Override
public boolean update(ParentPlatform parentPlatform) {
log.info("[国标级联]更新平台 {}", parentPlatform.getDeviceGBId());
parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase());
ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId());
ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId());
parentPlatform.setUpdateTime(DateUtil.getNow());
// 停止心跳定时
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId();
dynamicTask.stop(keepaliveTaskKey);
// 停止注册定时
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatformOld.getServerGBId();
dynamicTask.stop(registerTaskKey);
// 注销旧的
try {
if (parentPlatformOld.isStatus() && parentPlatformCatchOld != null) {
log.info("保存平台{}时发现旧平台在线,发送注销命令", parentPlatformOld.getServerGBId());
commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
log.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId());
});
}
} catch (InvalidArgumentException | ParseException | SipException e) {
log.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
}
// 更新数据库
if (parentPlatform.getCatalogGroup() == 0) {
parentPlatform.setCatalogGroup(1);
}
platformMapper.updateParentPlatform(parentPlatform);
// 更新redis
redisCatchStorage.delPlatformCatchInfo(parentPlatformOld.getServerGBId());
ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch();
parentPlatformCatch.setParentPlatform(parentPlatform);
parentPlatformCatch.setId(parentPlatform.getServerGBId());
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
// 注册
if (parentPlatform.isEnable()) {
// 保存时启用就发送注册
// 注册成功时由程序直接调用了online方法
try {
log.info("[国标级联] 平台注册 {}", parentPlatform.getDeviceGBId());
commanderForPlatform.register(parentPlatform, eventResult -> {
log.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
}
}
return false;
}
@Override
public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
log.info("[国标级联]{}, 平台上线", parentPlatform.getServerGBId());
final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
dynamicTask.stop(registerFailAgainTaskKey);
platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true);
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
if (parentPlatformCatch == null) {
parentPlatformCatch = new ParentPlatformCatch();
parentPlatformCatch.setParentPlatform(parentPlatform);
parentPlatformCatch.setId(parentPlatform.getServerGBId());
parentPlatform.setStatus(true);
parentPlatformCatch.setParentPlatform(parentPlatform);
}
parentPlatformCatch.getParentPlatform().setStatus(true);
parentPlatformCatch.setSipTransactionInfo(sipTransactionInfo);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
if (!dynamicTask.isAlive(registerTaskKey)) {
log.info("[国标级联]{}, 添加定时注册任务", parentPlatform.getServerGBId());
// 添加注册任务
dynamicTask.startCron(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法
()-> registerTask(parentPlatform, sipTransactionInfo),
parentPlatform.getExpires() * 1000);
}
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
if (!dynamicTask.contains(keepaliveTaskKey)) {
log.info("[国标级联]{}, 添加定时心跳任务", parentPlatform.getServerGBId());
// 添加心跳任务
dynamicTask.startCron(keepaliveTaskKey,
()-> {
try {
commanderForPlatform.keepalive(parentPlatform, eventResult -> {
// 心跳失败
if (eventResult.type != SipSubscribe.EventResultType.timeout) {
log.warn("[国标级联]发送心跳收到错误code {}, msg: {}", eventResult.statusCode, eventResult.msg);
}
// 心跳失败
ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
// 此时是第三次心跳超时, 平台离线
if (platformCatch.getKeepAliveReply() == 2) {
// 设置平台离线,并重新注册
log.info("[国标级联] 三次心跳失败, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
offline(parentPlatform, false);
}else {
platformCatch.setKeepAliveReply(platformCatch.getKeepAliveReply() + 1);
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
}
}, eventResult -> {
// 心跳成功
// 清空之前的心跳超时计数
ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
if (platformCatch != null && platformCatch.getKeepAliveReply() > 0) {
platformCatch.setKeepAliveReply(0);
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
}
log.info("[发送心跳] 国标级联 发送心跳, code {}, msg: {}", eventResult.statusCode, eventResult.msg);
});
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
}
},
(parentPlatform.getKeepTimeout())*1000);
}
if (parentPlatform.isAutoPushChannel()) {
if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) {
log.info("[国标级联]{}, 添加自动通道推送模拟订阅信息", parentPlatform.getServerGBId());
addSimulatedSubscribeInfo(parentPlatform);
}
}else {
SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
if (catalogSubscribe != null && catalogSubscribe.getExpires() == -1) {
subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
}
}
}
@Override
public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) {
// 自动添加一条模拟的订阅信息
SubscribeInfo subscribeInfo = new SubscribeInfo();
subscribeInfo.setId(parentPlatform.getServerGBId());
subscribeInfo.setExpires(-1);
subscribeInfo.setEventType("Catalog");
int random = (int) Math.floor(Math.random() * 10000);
subscribeInfo.setEventId(random + "");
subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP());
subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", ""));
subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", ""));
subscribeHolder.putCatalogSubscribe(parentPlatform.getServerGBId(), subscribeInfo);
}
private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
try {
// 不在同一个会话中续订则每次全新注册
if (!userSetting.isRegisterKeepIntDialog()) {
sipTransactionInfo = null;
}
if (sipTransactionInfo == null) {
log.info("[国标级联] 平台:{}注册即将到期,开始重新注册", parentPlatform.getServerGBId());
}else {
log.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId());
}
commanderForPlatform.register(parentPlatform, sipTransactionInfo, eventResult -> {
log.info("[国标级联] 平台:{}注册失败,{}:{}", parentPlatform.getServerGBId(),
eventResult.statusCode, eventResult.msg);
offline(parentPlatform, false);
}, null);
} catch (Exception e) {
log.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
}
}
@Override
public void offline(ParentPlatform parentPlatform, boolean stopRegister) {
log.info("[平台离线]{}", parentPlatform.getServerGBId());
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
parentPlatformCatch.setKeepAliveReply(0);
parentPlatformCatch.setRegisterAliveReply(0);
ParentPlatform parentPlatformInCatch = parentPlatformCatch.getParentPlatform();
parentPlatformInCatch.setStatus(false);
parentPlatformCatch.setParentPlatform(parentPlatformInCatch);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), false);
// 停止所有推流
log.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId());
stopAllPush(parentPlatform.getServerGBId());
// 清除注册定时
log.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
if (dynamicTask.contains(registerTaskKey)) {
dynamicTask.stop(registerTaskKey);
}
// 清除心跳定时
log.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId());
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
if (dynamicTask.contains(keepaliveTaskKey)) {
// 清除心跳任务
dynamicTask.stop(keepaliveTaskKey);
}
// 停止订阅回复
SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
if (catalogSubscribe != null) {
if (catalogSubscribe.getExpires() > 0) {
log.info("[平台离线] {}, 停止目录订阅回复", parentPlatform.getServerGBId());
subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
}
}
log.info("[平台离线] {}, 停止移动位置订阅回复", parentPlatform.getServerGBId());
subscribeHolder.removeMobilePositionSubscribe(parentPlatform.getServerGBId());
// 发起定时自动重新注册
if (!stopRegister) {
// 设置为60秒自动尝试重新注册
final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
ParentPlatform platform = platformMapper.getParentPlatById(parentPlatform.getId());
if (platform.isEnable()) {
dynamicTask.startCron(registerFailAgainTaskKey,
()-> registerTask(platform, null),
userSetting.getRegisterAgainAfterTime() * 1000);
}
}
}
private void stopAllPush(String platformId) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
if (sendRtpItems != null && sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
}
}
}
@Override
public void login(ParentPlatform parentPlatform) {
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
try {
commanderForPlatform.register(parentPlatform, eventResult1 -> {
log.info("[国标级联] {}开始定时发起注册间隔为1分钟", parentPlatform.getServerGBId());
// 添加注册任务
dynamicTask.startCron(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法
()-> log.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),
60*1000);
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
log.error("[命令发送失败] 国标级联注册: {}", e.getMessage());
}
}
@Override
public void sendNotifyMobilePosition(String platformId) {
ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
if (platform == null) {
return;
}
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
if (subscribe != null) {
// TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId(), userSetting.isUsePushingAsStatus());
if (gbStreams.size() == 0) {
return;
}
for (DeviceChannel deviceChannel : gbStreams) {
String gbId = deviceChannel.getDeviceId();
GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
// 无最新位置不发送
if (gpsMsgInfo != null) {
// 经纬度都为0不发送
if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
continue;
}
// 发送GPS消息
try {
commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 移动位置通知: {}", e.getMessage());
}
}
}
}
}
@Override
public void broadcastInvite(ParentPlatform platform, String channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException {
if (mediaServerItem == null) {
log.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId());
return;
}
InviteInfo inviteInfoForOld = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId);
if (inviteInfoForOld != null && inviteInfoForOld.getStreamInfo() != null) {
// 如果zlm不存在这个流则删除数据即可
MediaServer mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfoForOld.getStreamInfo().getMediaServerId());
if (mediaServerItemForStreamInfo != null) {
Boolean ready = mediaServerService.isStreamReady(mediaServerItemForStreamInfo, inviteInfoForOld.getStreamInfo().getApp(), inviteInfoForOld.getStreamInfo().getStream());
if (!ready) {
// 错误存在于redis中的数据
inviteStreamService.removeInviteInfo(inviteInfoForOld);
}else {
// 流确实尚在推流,直接回调结果
HookData hookData = new HookData();
hookData.setApp(inviteInfoForOld.getStreamInfo().getApp());
hookData.setStream(inviteInfoForOld.getStreamInfo().getStream());
hookData.setMediaServer(mediaServerItemForStreamInfo);
hookEvent.response(hookData);
return;
}
}
}
String streamId = null;
if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", platform.getServerGBId(), channelId);
}
// 默认不进行SSRC校验 TODO 后续可改为配置
boolean ssrcCheck = false;
int tcpMode;
if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-PASSIVE")) {
tcpMode = 1;
}else if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) {
tcpMode = 2;
} else {
tcpMode = 0;
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode);
if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel {}", platform.getServerGBId(), channelId);
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
eventResult.statusCode = -1;
eventResult.msg = "端口监听失败";
eventResult.type = SipSubscribe.EventResultType.failedToGetPort;
errorEvent.response(eventResult);
return;
}
log.info("[国标级联] 语音喊话发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}",
platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck);
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channelId, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo);
String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channelId, null);
if (inviteInfoForBroadcast == null) {
log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
}
}
}, userSetting.getPlayTimeout());
commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (hookData)->{
log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
dynamicTask.stop(timeOutTaskKey);
// hook响应
playService.onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId);
// 收到流
if (hookEvent != null) {
hookEvent.response(hookData);
}
}, event -> {
inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channelId, timeOutTaskKey,
null, inviteInfo, InviteSessionType.BROADCAST);
// // 收到200OK 检测ssrc是否有变化防止上级自定义了ssrc
// ResponseEvent responseEvent = (ResponseEvent) event.event;
// String contentString = new String(responseEvent.getResponse().getRawContent());
// // 获取ssrc
// int ssrcIndex = contentString.indexOf("y=");
// // 检查是否有y字段
// if (ssrcIndex >= 0) {
// //ssrc规定长度为10字节不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
// String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
// if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) {
// tcpActiveHandler(platform, )
// return;
// }
// logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
// if (!mediaServerItem.isRtpEnable()) {
// logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// // 释放ssrc
// mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
// // 单端口模式streamId也有变化需要重新设置监听
// if (!mediaServerItem.isRtpEnable()) {
// // 添加订阅
// HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
// subscribe.removeSubscribe(hookSubscribe);
// hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
// subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
// logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam);
// dynamicTask.stop(timeOutTaskKey);
// // hook响应
// playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId);
// hookEvent.response(mediaServerItemInUse, hookParam);
// });
// }
// // 关闭rtp server
// mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// // 重新开启ssrc server
// mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true, false, tcpMode);
// }
// }
}, eventResult -> {
// 收到错误回复
if (errorEvent != null) {
errorEvent.response(eventResult);
}
});
}
private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem,
ParentPlatform platform, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback,
InviteInfo inviteInfo, InviteSessionType inviteSessionType){
inviteInfo.setStatus(InviteSessionStatus.ok);
ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
System.out.println(contentString);
String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString);
// 兼容回复的消息中缺少ssrc(y字段)的情况
if (ssrcInResponse == null) {
ssrcInResponse = ssrcInfo.getSsrc();
}
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
// ssrc 一致
if (mediaServerItem.isRtpEnable()) {
// 多端口
if (tcpMode == 2) {
tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
}
}else {
// 单端口
if (tcpMode == 2) {
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
}
}else {
log.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
// ssrc 不一致
if (mediaServerItem.isRtpEnable()) {
// 多端口
if (ssrcCheck) {
// ssrc检验
// 更新ssrc
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
log.warn("[Invite 200OK] 更新ssrc失败停止喊话 {}/{}", platform.getServerGBId(), channelId);
commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
}
dynamicTask.stop(timeOutTaskKey);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
inviteStreamService.call(inviteSessionType, platform.getServerGBId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
}else {
ssrcInfo.setSsrc(ssrcInResponse);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
if (tcpMode == 2) {
if (mediaServerItem.isRtpEnable()) {
tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
}else {
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
}
inviteStreamService.updateInviteInfo(inviteInfo);
}
}else {
ssrcInfo.setSsrc(ssrcInResponse);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
if (tcpMode == 2) {
if (mediaServerItem.isRtpEnable()) {
tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
}else {
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
}
inviteStreamService.updateInviteInfo(inviteInfo);
}
}else {
if (ssrcInResponse != null) {
// 单端口
// 重新订阅流上线
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), null, inviteInfo.getStream());
streamSession.remove(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
streamSession.put(platform.getServerGBId(), channelId, ssrcTransaction.getCallId(),
inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
}
}
}
}
private void tcpActiveHandler(ParentPlatform platform, String channelId, String contentString,
MediaServer mediaServerItem, int tcpMode, boolean ssrcCheck,
String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
if (tcpMode != 2) {
return;
}
String substring;
if (contentString.indexOf("y=") > 0) {
substring = contentString.substring(0, contentString.indexOf("y="));
}else {
substring = contentString;
}
try {
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
int port = -1;
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
for (Object description : mediaDescriptions) {
MediaDescription mediaDescription = (MediaDescription) description;
Media media = mediaDescription.getMedia();
Vector mediaFormats = media.getMediaFormats(false);
if (mediaFormats.contains("8") || mediaFormats.contains("0")) {
port = media.getMediaPort();
break;
}
}
log.info("[TCP主动连接对方] serverGbId: {}, channelId: {}, 连接对方的地址:{}:{}, SSRC: {}, SSRC校验{}",
platform.getServerGBId(), channelId, sdp.getConnection().getAddress(), port, ssrcInfo.getSsrc(), ssrcCheck);
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
log.info("[TCP主动连接对方] 结果: {}", result);
} catch (SdpException e) {
log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channelId, e);
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, platform.getServerGBId(), channelId, null,
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
}
}
@Override
public void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream, boolean sendBye, MediaServer mediaServerItem) {
try {
if (sendBye) {
commanderForPlatform.streamByeCmd(platform, channel.getDeviceId(), stream, null, null);
}
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getDeviceId() );
} finally {
mediaServerService.closeRTPServer(mediaServerItem, stream);
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, platform.getServerGBId(), channel.getDeviceId(), stream);
if (inviteInfo != null) {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc());
inviteStreamService.removeInviteInfo(inviteInfo);
}
streamSession.remove(platform.getServerGBId(), channel.getDeviceId(), stream);
}
}
}

View File

@@ -8,8 +8,8 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.extern.slf4j.Slf4j;