修复集群播放BUG

This commit is contained in:
lin
2025-05-23 17:26:17 +08:00
parent d825b986dd
commit 55f36f660b
21 changed files with 176 additions and 144 deletions

View File

@@ -98,7 +98,7 @@ public class SubscribeHolder {
for (Platform platform : platformList) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platform.getServerGBId());
if (redisTemplate.hasKey(key)) {
result.add(platform.getServerId());
result.add(platform.getServerGBId());
}
}
return result;
@@ -112,7 +112,7 @@ public class SubscribeHolder {
for (Platform platform : platformList) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platform.getServerGBId());
if (redisTemplate.hasKey(key)) {
result.add(platform.getServerId());
result.add(platform.getServerGBId());
}
}
return result;

View File

@@ -89,8 +89,8 @@ public interface PlatformMapper {
@Select("SELECT * FROM wvp_platform WHERE id=#{id}")
Platform query(int id);
@Update("UPDATE wvp_platform SET status=#{online} WHERE id=#{id}" )
int updateStatus(@Param("id") int id, @Param("online") boolean online);
@Update("UPDATE wvp_platform SET status=#{online}, server_id = #{serverId} WHERE id=#{id}" )
int updateStatus(@Param("id") int id, @Param("online") boolean online, @Param("serverId") String serverId);
@Select("SELECT server_id FROM wvp_platform WHERE enable=true and server_id != #{serverId} group by server_id")
List<String> queryServerIdsWithEnableAndNotInServer(@Param("serverId") String serverId);
@@ -104,7 +104,7 @@ public interface PlatformMapper {
@Select("SELECT * FROM wvp_platform WHERE enable=true and server_id = #{serverId}")
List<Platform> queryServerIdsWithEnableAndServer(@Param("serverId") String serverId);
@Update("UPDATE wvp_platform SET status=false" )
void offlineAll();
@Update("UPDATE wvp_platform SET status=false where server_id = #{serverId}" )
void offlineAll(@Param("serverId") String serverId);
}

View File

@@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
@@ -21,11 +22,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
/**
* @description:Event事件通知推送器支持推送在线事件、离线事件
* @author: swwheihei
* @date: 2020年5月6日 上午11:30:50
* @date: 2020年5月6日 上午11:30:50
*/
@Slf4j
@Component
public class EventPublisher {
@@ -72,12 +74,7 @@ public class EventPublisher {
}
public void catalogEventPublish(Platform platform, List<CommonGBChannel> deviceChannels, String type, boolean share) {
if (platform != null && !userSetting.getServerId().equals(platform.getServerId())) {
// 指定了上级平台的推送,则发送到指定的设备,未指定的则全部发送, 接收后各自处理自己的
CatalogEvent outEvent = new CatalogEvent(this);
outEvent.setChannels(deviceChannels);
outEvent.setType(type);
outEvent.setPlatform(platform);
redisRpcService.catalogEventPublish(platform.getServerId(), outEvent);
log.info("[国标级联] 目录状态推送, 此上级平台由其他服务处理,消息已经忽略");
return;
}
CatalogEvent outEvent = new CatalogEvent(this);
@@ -96,12 +93,11 @@ public class EventPublisher {
}
outEvent.setChannels(channels);
outEvent.setType(type);
outEvent.setPlatform(platform);
applicationEventPublisher.publishEvent(outEvent);
if (platform == null && share) {
// 如果没指定上级平台则推送消息到所有在线的wvp处理自己含有的平台的目录更新
redisRpcService.catalogEventPublish(null, outEvent);
if (platform != null) {
outEvent.setPlatform(platform);
}
applicationEventPublisher.publishEvent(outEvent);
}
public void mobilePositionEventPublish(MobilePosition mobilePosition) {

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
@@ -39,25 +40,30 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
@Autowired
private SubscribeHolder subscribeHolder;
@Autowired
private UserSetting userSetting;
@Override
public void onApplicationEvent(CatalogEvent event) {
SubscribeInfo subscribe = null;
Platform parentPlatform = null;
Map<String, List<Platform>> parentPlatformMap = new HashMap<>();
log.info("[Catalog事件: {}] 通道数量: {}", event.getType(), event.getChannels().size());
Map<String, List<Platform>> platformMap = new HashMap<>();
Map<String, CommonGBChannel> channelMap = new HashMap<>();
if (event.getPlatform() != null) {
parentPlatform = event.getPlatform();
if (parentPlatform.getServerGBId() == null) {
log.info("[Catalog事件: {}] 平台服务国标编码未找到", event.getType());
return;
}
subscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
if (subscribe == null) {
log.info("[Catalog事件: {}] 未订阅目录事件", event.getType());
return;
}
}else {
List<Platform> allPlatform = platformService.queryAll();
List<Platform> allPlatform = platformService.queryAll(userSetting.getServerId());
// 获取所用订阅
List<String> platforms = subscribeHolder.getAllCatalogSubscribePlatform(allPlatform);
if (event.getChannels() != null) {
@@ -65,10 +71,14 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
for (CommonGBChannel deviceChannel : event.getChannels()) {
List<Platform> parentPlatformsForGB = platformChannelService.queryPlatFormListByChannelDeviceId(
deviceChannel.getGbId(), platforms);
parentPlatformMap.put(deviceChannel.getGbDeviceId(), parentPlatformsForGB);
platformMap.put(deviceChannel.getGbDeviceId(), parentPlatformsForGB);
channelMap.put(deviceChannel.getGbDeviceId(), deviceChannel);
}
}else {
log.info("[Catalog事件: {}] 未订阅目录事件", event.getType());
}
}else {
log.info("[Catalog事件: {}] 事件内通道数为0", event.getType());
}
}
switch (event.getType()) {
@@ -77,32 +87,32 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
case CatalogEvent.DEL:
if (parentPlatform != null) {
List<CommonGBChannel> deviceChannelList = new ArrayList<>();
List<CommonGBChannel> channels = new ArrayList<>();
if (event.getChannels() != null) {
deviceChannelList.addAll(event.getChannels());
channels.addAll(event.getChannels());
}
if (!deviceChannelList.isEmpty()) {
log.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), parentPlatform.getServerGBId(), deviceChannelList.size());
if (!channels.isEmpty()) {
log.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), parentPlatform.getServerGBId(), channels.size());
try {
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), parentPlatform, deviceChannelList, subscribe, null);
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), parentPlatform, channels, subscribe, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
}
}
}else if (!parentPlatformMap.keySet().isEmpty()) {
for (String gbId : parentPlatformMap.keySet()) {
List<Platform> parentPlatforms = parentPlatformMap.get(gbId);
if (parentPlatforms != null && !parentPlatforms.isEmpty()) {
for (Platform platform : parentPlatforms) {
}else if (!platformMap.keySet().isEmpty()) {
for (String serverGbId : platformMap.keySet()) {
List<Platform> platformList = platformMap.get(serverGbId);
if (platformList != null && !platformList.isEmpty()) {
for (Platform platform : platformList) {
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
if (subscribeInfo == null) {
continue;
}
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), serverGbId);
List<CommonGBChannel> deviceChannelList = new ArrayList<>();
CommonGBChannel deviceChannel = new CommonGBChannel();
deviceChannel.setGbDeviceId(gbId);
deviceChannel.setGbDeviceId(serverGbId);
deviceChannelList.add(deviceChannel);
try {
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, deviceChannelList, subscribeInfo, null);
@@ -111,6 +121,8 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
}
}
}else {
log.info("[Catalog事件: {}] 未找到上级平台: {}", event.getType(), serverGbId);
}
}
}
@@ -135,9 +147,9 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
}
}
}else if (!parentPlatformMap.keySet().isEmpty()) {
for (String gbId : parentPlatformMap.keySet()) {
List<Platform> parentPlatforms = parentPlatformMap.get(gbId);
}else if (!platformMap.keySet().isEmpty()) {
for (String gbId : platformMap.keySet()) {
List<Platform> parentPlatforms = platformMap.get(gbId);
if (parentPlatforms != null && !parentPlatforms.isEmpty()) {
for (Platform platform : parentPlatforms) {
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
@@ -37,12 +38,15 @@ public class MobilePositionEventLister implements ApplicationListener<MobilePosi
@Autowired
private SubscribeHolder subscribeHolder;
@Autowired
private UserSetting userSetting;
@Override
public void onApplicationEvent(MobilePositionEvent event) {
if (event.getMobilePosition().getChannelId() == 0) {
return;
}
List<Platform> allPlatforms = platformService.queryAll();
List<Platform> allPlatforms = platformService.queryAll(userSetting.getServerId());
// 获取所用订阅
List<String> platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms);
if (platforms.isEmpty()) {

View File

@@ -80,6 +80,6 @@ public interface IPlatformService {
void delete(Integer platformId, CommonCallback<Object> callback);
List<Platform> queryAll();
List<Platform> queryAll(String serverId);
}

View File

@@ -192,6 +192,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
sync(device);
}else {
device.setServerId(userSetting.getServerId());
if(!device.isOnLine()){
device.setOnLine(true);
device.setCreateTime(now);
@@ -307,15 +308,15 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
return;
}
for (Device device : deviceList) {
if (device == null || !device.isOnLine()) {
if (device == null || !device.isOnLine() || !device.getServerId().equals(userSetting.getServerId())) {
continue;
}
if (device.getSubscribeCycleForCatalog() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
log.info("[订阅丢失] 目录订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
log.debug("[订阅丢失] 目录订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
addCatalogSubscribe(device, null);
}
if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
log.info("[订阅丢失] 移动位置订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
log.debug("[订阅丢失] 移动位置订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
addMobilePositionSubscribe(device, null);
}
}

View File

@@ -160,30 +160,26 @@ public class GbChannelServiceImpl implements IGbChannelService {
log.warn("[多个通道离线] 通道数量为0更新失败");
return 0;
}
List<CommonGBChannel> onlineChannelList = commonGBChannelMapper.queryInListByStatus(commonGBChannelList, "ON");
if (onlineChannelList.isEmpty()) {
log.info("[多个通道离线] 更新失败, 参数内通道已经离线, 无需更新");
return 0;
}
log.info("[通道离线] 共 {} 个", commonGBChannelList.size());
int limitCount = 1000;
int result = 0;
if (onlineChannelList.size() > limitCount) {
for (int i = 0; i < onlineChannelList.size(); i += limitCount) {
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > onlineChannelList.size()) {
toIndex = onlineChannelList.size();
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
}
result += commonGBChannelMapper.updateStatusForListById(onlineChannelList.subList(i, toIndex), "OFF");
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "OFF");
}
} else {
result += commonGBChannelMapper.updateStatusForListById(onlineChannelList, "OFF");
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "OFF");
}
if (result > 0) {
try {
// 发送catalog
eventPublisher.catalogEventPublish(null, onlineChannelList, CatalogEvent.OFF);
eventPublisher.catalogEventPublish(null, commonGBChannelList, CatalogEvent.OFF);
} catch (Exception e) {
log.warn("[多个通道离线] 发送失败,数量:{}", onlineChannelList.size(), e);
log.warn("[多个通道离线] 发送失败,数量:{}", commonGBChannelList.size(), e);
}
}
return result;
@@ -214,32 +210,25 @@ public class GbChannelServiceImpl implements IGbChannelService {
log.warn("[多个通道上线] 通道数量为0更新失败");
return 0;
}
List<CommonGBChannel> offlineChannelList = commonGBChannelMapper.queryInListByStatus(commonGBChannelList, "OFF");
if (offlineChannelList.isEmpty()) {
log.warn("[多个通道上线] 更新失败, 参数内通道已经上线");
return 0;
}
// 批量更新
int limitCount = 1000;
int result = 0;
if (offlineChannelList.size() > limitCount) {
for (int i = 0; i < offlineChannelList.size(); i += limitCount) {
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > offlineChannelList.size()) {
toIndex = offlineChannelList.size();
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
}
result += commonGBChannelMapper.updateStatusForListById(offlineChannelList.subList(i, toIndex), "ON");
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "ON");
}
} else {
result += commonGBChannelMapper.updateStatusForListById(offlineChannelList, "ON");
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON");
}
if (result > 0) {
try {
// 发送catalog
eventPublisher.catalogEventPublish(null, offlineChannelList, CatalogEvent.ON);
} catch (Exception e) {
log.warn("[多个通道上线] 发送失败,数量:{}", offlineChannelList.size(), e);
}
try {
// 发送catalog
eventPublisher.catalogEventPublish(null, commonGBChannelList, CatalogEvent.ON);
} catch (Exception e) {
log.warn("[多个通道上线] 发送失败,数量:{}", commonGBChannelList.size(), e);
}
return result;

View File

@@ -63,11 +63,6 @@ import java.util.concurrent.TimeUnit;
@Order(value=15)
public class PlatformServiceImpl implements IPlatformService, CommandLineRunner {
private final static String REGISTER_KEY_PREFIX = "platform_register_";
private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_";
private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_";
@Autowired
private PlatformMapper platformMapper;
@@ -133,7 +128,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
sendUnRegister(platform, taskInfo.getSipTransactionInfo());
}
// 启动时所有平台默认离线
platformMapper.offlineAll();
platformMapper.offlineAll(userSetting.getServerId());
}
@Scheduled(fixedDelay = 20, timeUnit = TimeUnit.SECONDS) //每3秒执行一次
public void statusLostCheck(){
@@ -199,6 +194,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
return;
}
log.info("[集群] 检测到 {} 已离线", serverId);
redisCatchStorage.removeOfflineWVPInfo(serverId);
String chooseServerId = redisCatchStorage.chooseOneServer(serverId);
if (!userSetting.getServerId().equals(chooseServerId)){
return;
@@ -390,8 +386,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L,
this::keepaliveExpire);
statusTaskRunner.addKeepAliveTask(keepaliveTask);
platformMapper.updateStatus(platform.getId(), true);
platformMapper.updateStatus(platform.getId(), true, userSetting.getServerId());
if (platform.getAutoPushChannel() != null && platform.getAutoPushChannel()) {
if (subscribeHolder.getCatalogSubscribe(platform.getServerGBId()) == null) {
@@ -481,7 +476,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
subscribeHolder.removeCatalogSubscribe(platform.getServerGBId());
subscribeHolder.removeMobilePositionSubscribe(platform.getServerGBId());
platformMapper.updateStatus(platform.getId(), false);
platformMapper.updateStatus(platform.getId(), false, userSetting.getServerId());
// 停止所有推流
log.info("[平台离线] {}({}), 停止所有推流", platform.getName(), platform.getServerGBId());
@@ -521,7 +516,6 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
gpsMsgInfo = null;
}
if (gpsMsgInfo == null && !userSetting.isSendPositionOnDemand()){
gpsMsgInfo = new GPSMsgInfo();
gpsMsgInfo.setId(channel.getGbDeviceId());
@@ -871,7 +865,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
}
@Override
public List<Platform> queryAll() {
return platformMapper.queryAll();
public List<Platform> queryAll(String serverId) {
return platformMapper.queryByServerId(serverId);
}
}

View File

@@ -339,7 +339,7 @@ public class PlayServiceImpl implements IPlayService {
InviteInfo inviteInfoInCatch = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
if (inviteInfoInCatch != null ) {
if (inviteInfoInCatch.getStreamInfo() == null) {
// 释放生成的ssrc使用上一次申请的322
// 释放生成的ssrc使用上一次申请的
ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
// 点播发起了但是尚未成功, 仅注册回调等待结果即可

View File

@@ -143,39 +143,44 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
}
}
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer != null) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId());
if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
// 来自上级平台的停止对讲
log.info("[停止对讲] 来自上级平台{}, 通道:{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
audioBroadcastManager.del(sendRtpItem.getChannelId());
}
if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer != null) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId());
if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
// 来自上级平台的停止对讲
log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
audioBroadcastManager.del(sendRtpItem.getChannelId());
}
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId);
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId);
if (mediaInfo.getReaderCount() <= 0) {
log.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getTargetId());
if (device == null) {
log.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
return;
}
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId());
if (deviceChannel == null) {
log.info("[收到bye] {} 通知设备停止推流时未找到通道信息", streamId);
return;
}
try {
log.info("[停止点播] {}/{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
cmder.streamByeCmd(device, deviceChannel.getDeviceId(), sendRtpItem.getApp(), sendRtpItem.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
log.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
if (mediaInfo != null && mediaInfo.getReaderCount() <= 0) {
log.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getTargetId());
if (device == null) {
log.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
return;
}
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId());
if (deviceChannel == null) {
log.info("[收到bye] {} 通知设备停止推流时未找到通道信息", streamId);
return;
}
try {
log.info("[停止点播] {}/{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
cmder.streamByeCmd(device, deviceChannel.getDeviceId(), sendRtpItem.getApp(), sendRtpItem.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
log.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
}
}
}
}
} else {
// TODO 流再其他wvp上时应该通知这个wvp停止推流和发送BYE
}
}
// 可能是设备发送的停止

View File

@@ -26,7 +26,7 @@ public interface IRedisRpcPlayService {
String frontEndCommand(String serverId, Integer channelId, int cmdCode, int parameter1, int parameter2, int combindCode2);
void playPush(Integer id, ErrorCallback<StreamInfo> callback);
void playPush(String serverId, Integer id, ErrorCallback<StreamInfo> callback);
StreamInfo playProxy(String serverId, int id);

View File

@@ -93,11 +93,11 @@ public class RedisAlarmMsgListener implements MessageListener {
log.warn("[REDIS的ALARM通知]消息解析失败");
continue;
}
String gbId = alarmChannelMessage.getGbId();
String chanelId = alarmChannelMessage.getGbId();
DeviceAlarm deviceAlarm = new DeviceAlarm();
deviceAlarm.setCreateTime(DateUtil.getNow());
deviceAlarm.setChannelId(gbId);
deviceAlarm.setChannelId(chanelId);
deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
deviceAlarm.setAlarmType("" + alarmChannelMessage.getAlarmType());
@@ -106,7 +106,7 @@ public class RedisAlarmMsgListener implements MessageListener {
deviceAlarm.setLongitude(0);
deviceAlarm.setLatitude(0);
if (ObjectUtils.isEmpty(gbId)) {
if (ObjectUtils.isEmpty(chanelId)) {
if (userSetting.getSendToPlatformsWhenIdLost()) {
// 发送给所有的上级
List<Platform> parentPlatforms = platformService.queryEnablePlatformList(userSetting.getServerId());
@@ -148,24 +148,26 @@ public class RedisAlarmMsgListener implements MessageListener {
}
} else {
// 获取该通道ID是属于设备还是对应的上级平台
Device device = deviceService.getDeviceBySourceChannelDeviceId(gbId);
List<Platform> platforms = platformChannelService.queryByPlatformBySharChannelId(gbId);
if (device != null && (platforms == null || platforms.isEmpty())) {
Device device = deviceService.getDeviceBySourceChannelDeviceId(chanelId);
List<Platform> platforms = platformChannelService.queryByPlatformBySharChannelId(chanelId);
if (device != null && device.getServerId().equals(userSetting.getServerId()) && (platforms == null || platforms.isEmpty())) {
try {
commander.sendAlarmMessage(device, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
}
} else if (device == null && (platforms != null && !platforms.isEmpty())) {
} else if (device == null && (platforms != null && !platforms.isEmpty() )) {
for (Platform platform : platforms) {
try {
commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
if (platform.getServerId().equals(userSetting.getServerId())) {
try {
commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
}
}
}
} else {
log.warn("[REDIS的ALARM通知] 未查询到" + gbId + "所属的平台或设备");
log.warn("[REDIS的ALARM通知] 未查询到" + chanelId + "所属的平台或设备");
}
}
} catch (Exception e) {

View File

@@ -48,7 +48,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
@Override
public void onMessage(Message message, byte[] bytes) {
log.info("[REDIS: 流设备状态变化] {}", new String(message.getBody()));
log.info("[REDIS: 流设备状态变化] {}", new String(message.getBody()));
taskQueue.offer(message);
}
@@ -84,11 +84,13 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
if (streamStatusMessage.getOfflineStreams() != null
&& !streamStatusMessage.getOfflineStreams().isEmpty()) {
// 更新部分设备离线
log.info("[REDIS: 推流设备状态变化] 更新部分设备离线: {}个", streamStatusMessage.getOfflineStreams().size());
streamPushService.offline(streamStatusMessage.getOfflineStreams());
}
if (streamStatusMessage.getOnlineStreams() != null &&
!streamStatusMessage.getOnlineStreams().isEmpty()) {
// 更新部分设备上线
log.info("[REDIS: 推流设备状态变化] 更新部分设备上线: {}个", streamStatusMessage.getOnlineStreams().size());
streamPushService.online(streamStatusMessage.getOnlineStreams());
}
} catch (Exception e) {

View File

@@ -193,8 +193,9 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
}
@Override
public void playPush(Integer id, ErrorCallback<StreamInfo> callback) {
RedisRpcRequest request = buildRequest("streamPush/play", id);
public void playPush(String serverId, Integer id, ErrorCallback<StreamInfo> callback) {
RedisRpcRequest request = buildRequest("streamPush/play", id + "");
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.SECONDS);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);

View File

@@ -27,6 +27,8 @@ public interface IRedisCatchStorage {
*/
void updateWVPInfo(ServerInfo serverInfo, int time);
void removeOfflineWVPInfo(String serverId);
/**
* 发送推流生成与推流消失消息
* @param jsonObject 消息内容

View File

@@ -84,6 +84,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
redisTemplate.opsForZSet().add(setKey, userSetting.getServerId(), System.currentTimeMillis());
}
@Override
public void removeOfflineWVPInfo(String serverId) {
String setKey = VideoManagerConstants.WVP_SERVER_LIST;
// 首次设置就设置为0, 后续值越小说明越是最近启动的
redisTemplate.opsForZSet().remove(setKey, serverId);
}
@Override
public void sendStreamChangeMsg(String type, JSONObject jsonObject) {
String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + type;

View File

@@ -91,7 +91,7 @@ public interface StreamPushMapper {
"(#{item.app}, #{item.stream}) " +
"</foreach>" +
")</script>")
List<StreamPush> getListFromRedis(List<StreamPushItemFromRedis> offlineStreams);
List<StreamPush> getListInList(List<StreamPushItemFromRedis> offlineStreams);
@Select("SELECT CONCAT(app,stream) from wvp_stream_push")

View File

@@ -58,7 +58,7 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService {
Assert.notNull(streamPush, "推流信息未找到");
if (streamPush.isPushing() && !userSetting.getServerId().equals(streamPush.getServerId())) {
redisRpcPlayService.playPush(id, callback);
redisRpcPlayService.playPush(streamPush.getServerId(), id, callback);
return;
}

View File

@@ -458,16 +458,27 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override
public void offline(List<StreamPushItemFromRedis> offlineStreams) {
// 更新部分设备离线
List<StreamPush> streamPushList = streamPushMapper.getListFromRedis(offlineStreams);
List<StreamPush> streamPushList = streamPushMapper.getListInList(offlineStreams);
if (streamPushList.isEmpty()) {
log.info("[推流设备] 设备离线操作未发现可操作数据。");
return;
}
List<CommonGBChannel> commonGBChannelList = gbChannelService.queryListByStreamPushList(streamPushList);
gbChannelService.offline(commonGBChannelList);
}
@Override
public void online(List<StreamPushItemFromRedis> onlineStreams) {
if (onlineStreams.isEmpty()) {
log.info("[设备上线] 推流设备列表为空");
return;
}
// 更新部分设备上线streamPushService
List<StreamPush> streamPushList = streamPushMapper.getListFromRedis(onlineStreams);
List<StreamPush> streamPushList = streamPushMapper.getListInList(onlineStreams);
if (streamPushList.isEmpty()) {
for (StreamPushItemFromRedis onlineStream : onlineStreams) {
log.info("[设备上线] 未查询到这些通道: {}/{}", onlineStream.getApp(), onlineStream.getStream());
}
return;
}
List<CommonGBChannel> commonGBChannelList = gbChannelService.queryListByStreamPushList(streamPushList);

View File

@@ -89,15 +89,21 @@ Vue.prototype.$channelTypeList = {
new Vue({
beforeCreate: function () {
// 获取本平台的服务ID
axios({
method: 'get',
url: `/api/server/system/configInfo`,
}).then( (res)=> {
if (res.data.code === 0) {
Vue.prototype.$myServerId = res.data.data.addOn.serverId;
}
}).catch( (error)=> {
});
console.log("获取本平台的服务ID")
if (!this.$myServerId) {
axios({
method: 'get',
url: `/api/server/system/configInfo`,
}).then( (res)=> {
if (res.data.code === 0) {
console.log(res.data)
console.log("当前服务ID " + res.data.data.addOn.serverId)
Vue.prototype.$myServerId = res.data.data.addOn.serverId;
}
}).catch( (error)=> {
});
}
},
router: router,
render: h => h(App),