Merge branch 'master' into 重构/1078

# Conflicts:
#	src/main/resources/application.yml
This commit is contained in:
lin
2025-06-03 17:21:21 +08:00
25 changed files with 138 additions and 86 deletions

View File

@@ -1,23 +0,0 @@
package com.genersoft.iot.vmp.gb28181.bean;
/**
* 国标类型编码,国标编码中11-13位为类型编码
* 详见 附 录 D 编码规则 A
* @author lin
*/
public class ChannelIdType {
/**
* 中心信令控制服务器编码
*/
public final static String CENTRAL_SIGNALING_CONTROL_SERVER = "200";
/**
* 业务分组编码
*/
public final static String BUSINESS_GROUP = "215";
/**
* 虚拟组织编码
*/
public final static String VIRTUAL_ORGANIZATION = "216";
}

View File

@@ -26,19 +26,19 @@ public class Device {
*/
@Schema(description = "名称")
private String name;
/**
* 生产厂商
*/
@Schema(description = "生产厂商")
private String manufacturer;
/**
* 型号
*/
@Schema(description = "型号")
private String model;
/**
* 固件版本
*/
@@ -78,7 +78,7 @@ public class Device {
*/
@Schema(description = "wan地址")
private String hostAddress;
/**
* 在线
*/

View File

@@ -123,7 +123,12 @@ public class DeviceQuery {
log.debug("设备通道信息同步API调用deviceId" + deviceId);
}
Device device = deviceService.getDeviceByDeviceId(deviceId);
if (device.getRegisterTime() == null) {
WVPResult<SyncStatus> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("设备尚未注册过");
return wvpResult;
}
return deviceService.devicesSync(device);
}

View File

@@ -578,4 +578,8 @@ public interface CommonGBChannelMapper {
" <foreach collection='channelIdsForClear' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
" </script>"})
void removeParentIdByChannelIds(List<Integer> channelIdsForClear);
@SelectProvider(type = ChannelProvider.class, method = "queryOnlineListsByGbDeviceId")
List<CommonGBChannel> queryOnlineListsByGbDeviceId(@Param("deviceId") int deviceId);
}

View File

@@ -666,4 +666,8 @@ public interface DeviceChannelMapper {
" where data_type = 1 and data_device_id=#{dataDeviceId} and device_id = #{channelId}" +
" </script>"})
DeviceChannel getOneBySourceChannelId(@Param("dataDeviceId") int dataDeviceId, @Param("channelId") String channelId);
@Update(value = {"UPDATE wvp_device_channel SET status = 'OFF' WHERE data_type = 1 and data_device_id=#{deviceId}"})
void offlineByDeviceId(@Param("deviceId") int deviceId);
}

View File

@@ -144,7 +144,7 @@ public interface DeviceMapper {
", subscribe_cycle_for_alarm=#{subscribeCycleForAlarm}" +
", expires=#{expires}" +
", server_id=#{serverId}" +
"WHERE device_id=#{deviceId}"+
" WHERE device_id=#{deviceId}"+
" </script>"})
int update(Device device);

View File

@@ -147,8 +147,8 @@ public interface PlatformChannelMapper {
" <if test='query != null'> " +
" AND (coalesce(wdc.gb_device_id, wdc.device_id) LIKE concat('%',#{query},'%') OR wpgc.custom_device_id LIKE concat('%',#{query},'%') " +
" OR coalesce(wdc.gb_name, wdc.name) LIKE concat('%',#{query},'%') OR wpgc.custom_name LIKE concat('%',#{query},'%'))</if> " +
" <if test='online == true'> AND coalesce(wpgc.status, wdc.gb_status, wdc.status) = 'ON'</if> " +
" <if test='online == false'> AND coalesce(wpgc.status, wdc.gb_status, wdc.status) = 'OFF'</if> " +
" <if test='online == true'> AND coalesce(wpgc.custom_status, wdc.gb_status, wdc.status) = 'ON'</if> " +
" <if test='online == false'> AND coalesce(wpgc.custom_status, wdc.gb_status, wdc.status) = 'OFF'</if> " +
" <if test='hasShare == true'> AND wpgc.platform_id = #{platformId}</if> " +
" <if test='hasShare == false'> AND wpgc.platform_id is null</if> " +
" <if test='dataType != null'> AND wdc.data_type = #{dataType}</if> " +

View File

@@ -455,6 +455,13 @@ public class ChannelProvider {
return sqlBuild.toString();
}
public String queryOnlineListsByGbDeviceId(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL_TABLE_NAME);
sqlBuild.append(" where wdc.channel_type = 0 AND coalesce(wdc.gb_status, wdc.status) = 'ON' AND wdc.data_type = 1 AND data_device_id = #{deviceId}");
return sqlBuild.toString();
}
public String queryAllForUnusualCivilCode(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append("select wdc.id from wvp_device_channel wdc ");

View File

@@ -68,11 +68,7 @@ public class EventPublisher {
deviceChannelList.add(deviceChannel);
catalogEventPublish(platform, deviceChannelList, type);
}
public void catalogEventPublish(Platform platform, List<CommonGBChannel> deviceChannels, String type) {
catalogEventPublish(platform, deviceChannels, type, true);
}
public void catalogEventPublish(Platform platform, List<CommonGBChannel> deviceChannels, String type, boolean share) {
if (platform != null && !userSetting.getServerId().equals(platform.getServerId())) {
log.info("[国标级联] 目录状态推送, 此上级平台由其他服务处理,消息已经忽略");
return;

View File

@@ -47,7 +47,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
public void onApplicationEvent(CatalogEvent event) {
SubscribeInfo subscribe = null;
Platform parentPlatform = null;
log.info("[Catalog事件: {}] 通道数量: {}", event.getType(), event.getChannels().size());
log.info("[Catalog事件: {}]通道数量: {}", event.getType(), event.getChannels().size());
Map<String, List<Platform>> platformMap = new HashMap<>();
Map<String, CommonGBChannel> channelMap = new HashMap<>();
if (event.getPlatform() != null) {

View File

@@ -224,7 +224,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Override
public List<Device> getDeviceByChannelId(String channelId) {
return channelMapper.getDeviceByChannelDeviceId(channelId);
}
@@ -574,7 +573,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
deviceChannel.setStreamId(channelInDb.getStreamId());
deviceChannel.setHasAudio(channelInDb.isHasAudio());
deviceChannel.setId(channelInDb.getId());
if (channelInDb.getStatus() != null && channelInDb.getStatus().equalsIgnoreCase(deviceChannel.getStatus())){
if (channelInDb.getStatus() != null && !channelInDb.getStatus().equalsIgnoreCase(deviceChannel.getStatus())){
List<Platform> platformList = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getDeviceId());
if (!CollectionUtils.isEmpty(platformList)){
platformList.forEach(platform->{

View File

@@ -3,13 +3,15 @@ package com.genersoft.iot.vmp.gb28181.service.impl;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
@@ -64,9 +66,6 @@ import java.util.concurrent.TimeUnit;
@Order(value=16)
public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired
private DynamicTask dynamicTask;
@Autowired
private ISIPCommander sipCommander;
@@ -88,6 +87,12 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired
private DeviceChannelMapper deviceChannelMapper;
@Autowired
private CommonGBChannelMapper commonGBChannelMapper;
@Autowired
private EventPublisher eventPublisher;
@Autowired
private ISendRtpServerService sendRtpServerService;
@@ -156,7 +161,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
// 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接
DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(),
taskInfo.getTransactionInfo(), taskInfo.getExpireTime(), this::deviceStatusExpire);
taskInfo.getTransactionInfo(), taskInfo.getExpireTime() + 1000 + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(deviceStatusTask);
onlineDeviceIds.add(taskInfo.getDeviceId());
}
@@ -238,8 +243,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
}
//进行通道离线
// deviceChannelMapper.offlineByDeviceId(deviceId);
// 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
@@ -308,7 +311,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
device.setCreateTime(now);
device.setUpdateTime(now);
log.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
addCustomDevice(device);
if(device.getStreamMode() == null) {
device.setStreamMode("TCP-PASSIVE");
}
deviceMapper.add(device);
redisCatchStorage.updateDevice(device);
try {
@@ -333,7 +338,10 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
log.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
sync(device);
// TODO 如果设备下的通道级联到了其他平台那么需要发送事件或者notify给上级平台
}else {
if (isDevice(device.getDeviceId())) {
sync(device);
}
}
// 上线添加订阅
if (device.getSubscribeCycleForCatalog() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
@@ -361,20 +369,21 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
if (sipTransactionInfo == null) {
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), System.currentTimeMillis() + expiresTime);
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}else {
deviceStatusTaskRunner.removeTask(device.getDeviceId());
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire);
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(task);
}
}else {
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire);
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(task);
}
}
@Override
@Transactional
public void offline(String deviceId, String reason) {
Device device = getDeviceByDeviceIdFromDb(deviceId);
if (device == null) {
@@ -382,12 +391,14 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
return;
}
// 主动查询设备状态
Boolean deviceStatus = getDeviceStatus(device);
if (deviceStatus != null && deviceStatus) {
log.info("[设备离线] 主动探测发现设备在线,暂不处理 device{}", deviceId);
online(device, null);
return;
// 主动查询设备状态, 没有HostAddress无法发送请求可能是手动添加的设备
if (device.getHostAddress() != null) {
Boolean deviceStatus = getDeviceStatus(device);
if (deviceStatus != null && deviceStatus) {
log.info("[设备离线] 主动探测发现设备在线,暂不处理 device{}", deviceId);
online(device, null);
return;
}
}
log.info("[设备离线] {}, device{} 心跳间隔: {},心跳超时次数: {} 上次心跳时间:{} 上次注册时间: {}", reason, deviceId,
device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime());
@@ -395,18 +406,45 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
cleanOfflineDevice(device);
redisCatchStorage.updateDevice(device);
deviceMapper.update(device);
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
}
if (isDevice(deviceId)) {
channelOfflineByDevice(device);
}
}
private void channelOfflineByDevice(Device device) {
// 进行通道离线
List<CommonGBChannel> channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceId(device.getId());
if (channelList.isEmpty()) {
return;
}
deviceChannelMapper.offlineByDeviceId(device.getId());
// 发送通道离线通知
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.OFF);
}
private boolean isDevice(String deviceId) {
GbCode decode = GbCode.decode(deviceId);
if (decode == null) {
return true;
}
int code = Integer.parseInt(decode.getTypeCode());
return code <= 199;
}
// 订阅丢失检查
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS)
public void lostCheck(){
public void lostCheckForSubscribe(){
// 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices();
if (deviceList.isEmpty()) {
return;
}
for (Device device : deviceList) {
if (device == null || !device.isOnLine() || !device.getServerId().equals(userSetting.getServerId())) {
if (device == null || !device.isOnLine() || !userSetting.getServerId().equals(device.getServerId())) {
continue;
}
if (device.getSubscribeCycleForCatalog() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
@@ -420,6 +458,25 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
// 设备状态丢失检查
@Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
public void lostCheckForStatus(){
// 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices();
if (deviceList.isEmpty()) {
return;
}
for (Device device : deviceList) {
if (device == null || !device.isOnLine() || !userSetting.getServerId().equals(device.getServerId())) {
continue;
}
if (!deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
log.debug("[状态丢失] 执行设备离线, 编号: {},", device.getDeviceId());
offline(device.getDeviceId(), "");
}
}
}
private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[目录订阅] 到期, 编号: {}", deviceId);
Device device = getDeviceByDeviceId(deviceId);
@@ -862,7 +919,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public WVPResult<SyncStatus> devicesSync(Device device) {
if (!userSetting.getServerId().equals(device.getServerId())) {
if (device.getServerId() != null && !userSetting.getServerId().equals(device.getServerId())) {
return redisRpcService.devicesSync(device.getServerId(), device.getDeviceId());
}
// 已存在则返回进度

View File

@@ -28,7 +28,7 @@ public class DeviceStatusTask implements Delayed {
DeviceStatusTask deviceStatusTask = new DeviceStatusTask();
deviceStatusTask.setDeviceId(deviceId);
deviceStatusTask.setTransactionInfo(transactionInfo);
deviceStatusTask.setDelayTime((delayTime * 1000L - 500L) + System.currentTimeMillis());
deviceStatusTask.setDelayTime(delayTime);
deviceStatusTask.setCallback(callback);
return deviceStatusTask;
}

View File

@@ -93,7 +93,7 @@ public class DeviceStatusTaskRunner {
if (task == null) {
return false;
}
log.info("[更新状态任务时间] 编号: {}", key);
log.debug("[更新状态任务时间] 编号: {}", key);
if (delayQueue.contains(task)) {
boolean remove = delayQueue.remove(task);
if (!remove) {

View File

@@ -49,7 +49,7 @@ public class SIPRequestHeaderProvider {
// sipuri
SipURI requestURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress());
// via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ArrayList<ViaHeader> viaHeaders = new ArrayList<>();
ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(device.getLocalIp()), sipConfig.getPort(), device.getTransport(), viaTag);
viaHeader.setRPort();
viaHeaders.add(viaHeader);

View File

@@ -69,8 +69,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
responseAck(request, Response.BAD_REQUEST);
return;
}
String platformId = SipUtils.getUserIdFromFromHeader(request);
String cmd = XmlUtil.getText(rootElement, "CmdType");
log.info("[收到订阅请求] 类型: {}", cmd);
log.info("[收到订阅请求] 类型: {}, 来自: {}", cmd, platformId);
if (CmdType.MOBILE_POSITION.equals(cmd)) {
processNotifyMobilePosition(request, rootElement);
// } else if (CmdType.ALARM.equals(cmd)) {

View File

@@ -102,7 +102,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
SIPRequest request = (SIPRequest) evt.getRequest();
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
if (device.getIp() == null || !device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
log.info("[收到心跳] 地址变化, {}({}), {}:{}->{}", device.getName(), device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort(), request.getLocalAddress().getHostAddress());
device.setPort(remoteAddressInfo.getPort());
device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort())));

View File

@@ -131,6 +131,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
Element itemDevice = deviceListIterator.next();
Element channelDeviceElement = itemDevice.element("DeviceID");
if (channelDeviceElement == null) {
// 总数减一, 避免最后总数不对 无法确定问题
continue;
}
// 从xml解析内容到 DeviceChannel 对象

View File

@@ -28,7 +28,7 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Override
public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String requesterId,
@@ -230,31 +230,27 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
log.warn("{}获取redis连接信息失败", mediaServer.getId());
return -1;
}
return getSendPort(startPort, endPort, sendIndexKey, sendRtpSet);
}
private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Set<Integer> sendRtpPortSet){
// TODO 这里改为只取偶数端口
RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
if (redisAtomicInteger.get() < startPort) {
redisAtomicInteger.set(startPort);
return startPort;
}else {
int port = redisAtomicInteger.getAndIncrement();
if (port > endPort) {
redisAtomicInteger.set(startPort);
if (sendRtpPortSet.contains(startPort)) {
return getSendPort(startPort, endPort, sendIndexKey, sendRtpPortSet);
}else {
return startPort;
for (int i = 0; i < endPort - startPort; i++) {
int port = redisAtomicInteger.getAndIncrement();
if (port > endPort) {
redisAtomicInteger.set(startPort);
if (sendRtpSet.contains(startPort)) {
continue;
}else {
return startPort;
}
}
if (!sendRtpSet.contains(port)) {
return port;
}
}
if (sendRtpPortSet.contains(port)) {
return getSendPort(startPort, endPort, sendIndexKey, sendRtpPortSet);
}else {
return port;
}
}
log.warn("{}获取发送端口失败, 无可用端口", mediaServer.getId());
return -1;
}
}

View File

@@ -62,6 +62,11 @@ public class RedisRpcDeviceController extends RpcController {
response.setBody("param error");
return response;
}
if (device.getRegisterTime() == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("设备尚未注册过");
return response;
}
WVPResult<SyncStatus> result = deviceService.devicesSync(device);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(result));

View File

@@ -74,7 +74,7 @@ public class RedisRpcPlatformController extends RpcController {
List<CommonGBChannel> channels = jsonObject.getJSONArray("channels").toJavaList(CommonGBChannel.class);
String type = jsonObject.getString("type");
eventPublisher.catalogEventPublish(platform, channels, type, false);
eventPublisher.catalogEventPublish(platform, channels, type);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;