Merge remote-tracking branch 'origin/master' into 重构/1078

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/ChannelProvider.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java
#	数据库/2.7.3/初始化-mysql-2.7.3.sql
#	数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql
This commit is contained in:
lin
2025-03-18 15:30:35 +08:00
231 changed files with 15701 additions and 4618 deletions

View File

@@ -2,12 +2,14 @@ package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.media.event.media.MediaRecordMp4Event;
import com.genersoft.iot.vmp.utils.MediaServerUtils;
import lombok.Data;
import java.util.Map;
/**
* 云端录像数据
*/
@Data
public class CloudRecordItem {
/**
* 主键
@@ -79,6 +81,11 @@ public class CloudRecordItem {
*/
private long timeLen;
/**
* 所属服务ID
*/
private String serverId;
public static CloudRecordItem getInstance(MediaRecordMp4Event param) {
CloudRecordItem cloudRecordItem = new CloudRecordItem();
cloudRecordItem.setApp(param.getApp());
@@ -98,115 +105,4 @@ public class CloudRecordItem {
return cloudRecordItem;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
public String getMediaServerId() {
return mediaServerId;
}
public void setMediaServerId(String mediaServerId) {
this.mediaServerId = mediaServerId;
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public String getFilePath() {
return filePath;
}
public void setFilePath(String filePath) {
this.filePath = filePath;
}
public String getFolder() {
return folder;
}
public void setFolder(String folder) {
this.folder = folder;
}
public long getFileSize() {
return fileSize;
}
public void setFileSize(long fileSize) {
this.fileSize = fileSize;
}
public long getTimeLen() {
return timeLen;
}
public void setTimeLen(long timeLen) {
this.timeLen = timeLen;
}
public Boolean getCollect() {
return collect;
}
public void setCollect(Boolean collect) {
this.collect = collect;
}
public Boolean getReserve() {
return reserve;
}
public void setReserve(Boolean reserve) {
this.reserve = reserve;
}
}

View File

@@ -7,12 +7,14 @@ public class SSRCInfo {
private int port;
private String ssrc;
private String app;
private String Stream;
private String timeOutTaskKey;
public SSRCInfo(int port, String ssrc, String stream, String timeOutTaskKey) {
public SSRCInfo(int port, String ssrc, String app, String stream, String timeOutTaskKey) {
this.port = port;
this.ssrc = ssrc;
this.app = app;
this.Stream = stream;
this.timeOutTaskKey = timeOutTaskKey;
}

View File

@@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.service.ICloudRecordService;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.bean.CloudRecordItem;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
@@ -36,7 +37,6 @@ import java.util.Set;
@Slf4j
@Service
@DS("share")
public class CloudRecordServiceImpl implements ICloudRecordService {
@Autowired
@@ -51,8 +51,15 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
@Autowired
private AssistRESTfulUtils assistRESTfulUtils;
@Autowired
private UserSetting userSetting;
@Autowired
private IRedisRpcPlayService redisRpcPlayService;
@Override
public PageInfo<CloudRecordItem> getList(int page, int count, String query, String app, String stream, String startTime, String endTime, List<MediaServer> mediaServerItems, String callId) {
public PageInfo<CloudRecordItem> getList(int page, int count, String query, String app, String stream, String startTime,
String endTime, List<MediaServer> mediaServerItems, String callId) {
// 开始时间和结束时间在数据库中都是以秒为单位的
Long startTimeStamp = null;
Long endTimeStamp = null;
@@ -109,6 +116,7 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
@EventListener
public void onApplicationEvent(MediaRecordMp4Event event) {
CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(event);
cloudRecordItem.setServerId(userSetting.getServerId());
if (ObjectUtils.isEmpty(cloudRecordItem.getCallId())) {
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
if (streamAuthorityInfo != null) {
@@ -237,6 +245,9 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
if (recordItem == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "资源不存在");
}
if (!userSetting.getServerId().equals(recordItem.getServerId())) {
return redisRpcPlayService.getRecordPlayUrl(recordItem.getServerId(), recordId);
}
String filePath = recordItem.getFilePath();
MediaServer mediaServerItem = mediaServerService.getOne(recordItem.getMediaServerId());
return CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath);

View File

@@ -24,7 +24,7 @@ public class LogServiceImpl implements ILogService {
@Override
public List<LogFileInfo> queryList(String query, String startTime, String endTime) {
File logFile = getLogDir();
if (logFile == null && !logFile.exists()) {
if (logFile == null || !logFile.exists()) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取日志文件目录失败");
}
File[] files = logFile.listFiles();

View File

@@ -147,17 +147,17 @@ public class MediaServiceImpl implements IMediaService {
ResultForOnPublish result = new ResultForOnPublish();
result.setEnable_audio(true);
// 是否录像
if ("rtp".equals(app)) {
result.setEnable_mp4(userSetting.getRecordSip());
} else {
result.setEnable_mp4(userSetting.getRecordPushLive());
}
// 国标流
if ("rtp".equals(app)) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
if (inviteInfo != null) {
result.setEnable_mp4(inviteInfo.getRecord());
}else {
result.setEnable_mp4(userSetting.getRecordSip());
}
// 单端口模式下修改流 ID
if (!mediaServer.isRtpEnable() && inviteInfo == null) {
String ssrc = String.format("%010d", Long.parseLong(stream, 16));
@@ -170,7 +170,7 @@ public class MediaServiceImpl implements IMediaService {
}
// 设置音频信息及录制信息
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream);
if (ssrcTransaction != null ) {
// 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用
@@ -191,9 +191,9 @@ public class MediaServiceImpl implements IMediaService {
if (ssrcTransaction.getType() == InviteSessionType.DOWNLOAD) {
// 获取录像的总时长,然后设置为这个视频的时长
InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, channelId, stream);
if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) {
String startTime = inviteInfoForDownload.getStreamInfo().getStartTime();
String endTime = inviteInfoForDownload.getStreamInfo().getEndTime();
if (inviteInfoForDownload != null) {
String startTime = inviteInfoForDownload.getStartTime();
String endTime = inviteInfoForDownload.getEndTime();
long difference = DateUtil.getDifference(startTime, endTime) / 1000;
result.setMp4_max_second((int) difference);
result.setEnable_mp4(true);
@@ -208,8 +208,12 @@ public class MediaServiceImpl implements IMediaService {
}
} else if (app.equals("broadcast")) {
result.setEnable_audio(true);
result.setEnable_mp4(userSetting.getRecordSip());
} else if (app.equals("talk")) {
result.setEnable_audio(true);
result.setEnable_mp4(userSetting.getRecordSip());
}else {
result.setEnable_mp4(userSetting.getRecordPushLive());
}
if (app.equalsIgnoreCase("rtp")) {
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;

View File

@@ -66,7 +66,7 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
return;
}
// 开启点播,
channelPlayService.play(channel, null, ((code, msg, streamInfo) -> {
channelPlayService.play(channel, null, true, ((code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) {
log.info("[录像] 流离开时拉起需要录像的流, 开启成功, 通道ID: {}", channel.getGbId());
recordStreamMap.put(channel.getGbId(), streamInfo);
@@ -79,7 +79,6 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
Map<Integer, StreamInfo> recordStreamMap = new HashMap<>();
// @Scheduled(cron = "0 */30 * * * *")
@Scheduled(fixedRate = 10, timeUnit = TimeUnit.MINUTES)
public void execution() {
log.info("[录制计划] 执行");
@@ -89,7 +88,8 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
if (startChannelIdList.isEmpty()) {
// 当前没有录像任务, 如果存在旧的正在录像的就移除
if(!recordStreamMap.isEmpty()) {
stopStreams(recordStreamMap.keySet(), recordStreamMap);
Set<Integer> recordStreamSet = new HashSet<>(recordStreamMap.keySet());
stopStreams(recordStreamSet, recordStreamMap);
recordStreamMap.clear();
}
}else {
@@ -110,7 +110,7 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
// 查找是否已经开启录像, 如果没有则开启录像
for (CommonGBChannel channel : channelList) {
// 开启点播,
channelPlayService.play(channel, null, ((code, msg, streamInfo) -> {
channelPlayService.play(channel, null, true, ((code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) {
log.info("[录像] 开启成功, 通道ID: {}", channel.getGbId());
recordStreamMap.put(channel.getGbId(), streamInfo);
@@ -266,14 +266,14 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
}
@Override
public PageInfo<CommonGBChannel> queryChannelList(int page, int count, String query, Integer channelType, Boolean online, Integer planId, Boolean hasLink) {
public PageInfo<CommonGBChannel> queryChannelList(int page, int count, String query, Integer dataType, Boolean online, Integer planId, Boolean hasLink) {
PageHelper.startPage(page, count);
if (query != null) {
query = query.replaceAll("/", "//")
.replaceAll("%", "/%")
.replaceAll("_", "/_");
}
List<CommonGBChannel> all = channelMapper.queryForRecordPlanForWebList(planId, query, channelType, online, hasLink);
List<CommonGBChannel> all = channelMapper.queryForRecordPlanForWebList(planId, query, dataType, online, hasLink);
return new PageInfo<>(all);
}

View File

@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.service.IRoleService;
import com.genersoft.iot.vmp.storager.dao.RoleMapper;
import com.genersoft.iot.vmp.storager.dao.dto.Role;
@@ -10,7 +9,6 @@ import org.springframework.stereotype.Service;
import java.util.List;
@Service
@DS("master")
public class RoleServerImpl implements IRoleService {
@Autowired

View File

@@ -117,11 +117,11 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
// 设置流超时的定时任务
String timeOutTaskKey = UUID.randomUUID().toString();
SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, streamId, timeOutTaskKey);
SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, "rtp", streamId, timeOutTaskKey);
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setSsrcInfo(ssrcInfo);
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", streamId, rtpServerParam.getMediaServerItem().getId());
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, ssrcInfo.getApp(), streamId, rtpServerParam.getMediaServerItem().getId());
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 收流超时
// 释放ssrc

View File

@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.service.IUserApiKeyService;
import com.genersoft.iot.vmp.storager.dao.UserApiKeyMapper;
import com.genersoft.iot.vmp.storager.dao.dto.UserApiKey;
@@ -15,7 +14,6 @@ import org.springframework.stereotype.Service;
import java.util.List;
@Service
@DS("master")
public class UserApiKeyServiceImpl implements IUserApiKeyService {
@Autowired

View File

@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.service.IUserService;
import com.genersoft.iot.vmp.storager.dao.UserMapper;
import com.genersoft.iot.vmp.storager.dao.dto.User;
@@ -13,7 +12,6 @@ import org.springframework.util.DigestUtils;
import java.util.List;
@Service
@DS("master")
public class UserServiceImpl implements IUserService {
@Autowired

View File

@@ -0,0 +1,39 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
public interface IRedisRpcPlayService {
void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback);
void stop(String serverId, InviteSessionType type, int channelId, String stream);
void playback(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<StreamInfo> callback);
void download(String serverId, Integer channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback);
void queryRecordInfo(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<RecordInfo> callback);
void pauseRtp(String serverId, String streamId);
void resumeRtp(String serverId, String streamId);
String frontEndCommand(String serverId, Integer channelId, int cmdCode, int parameter1, int parameter2, int combindCode2);
void playPush(Integer id, ErrorCallback<StreamInfo> callback);
StreamInfo playProxy(String serverId, int id);
void stopProxy(String serverId, int id);
DownloadFileInfo getRecordPlayUrl(String serverId, Integer recordId);
AudioBroadcastResult audioBroadcast(String serverId, String deviceId, String channelDeviceId, Boolean broadcastMode);
}

View File

@@ -2,7 +2,8 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
public interface IRedisRpcService {
@@ -23,4 +24,44 @@ public interface IRedisRpcService {
long onStreamOnlineEvent(String app, String stream, CommonCallback<StreamInfo> callback);
void unPushStreamOnlineEvent(String app, String stream);
void subscribeCatalog(int id, int cycle);
void subscribeMobilePosition(int id, int cycle, int interval);
boolean updatePlatform(String serverId, Platform platform);
void catalogEventPublish(String serverId, CatalogEvent catalogEvent);
WVPResult<SyncStatus> devicesSync(String serverId, String deviceId);
SyncStatus getChannelSyncStatus(String serverId, String deviceId);
WVPResult<String> deviceBasicConfig(String serverId, Device device, BasicParam basicParam);
WVPResult<String> deviceConfigQuery(String serverId, Device device, String channelId, String configType);
void teleboot(String serverId, Device device);
WVPResult<String> recordControl(String serverId, Device device, String channelId, String recordCmdStr);
WVPResult<String> guard(String serverId, Device device, String guardCmdStr);
WVPResult<String> resetAlarm(String serverId, Device device, String channelId, String alarmMethod, String alarmType);
void iFrame(String serverId, Device device, String channelId);
WVPResult<String> homePosition(String serverId, Device device, String channelId, Boolean enabled, Integer resetTime, Integer presetIndex);
void dragZoomIn(String serverId, Device device, String channelId, int length, int width, int midpointx, int midpointy, int lengthx, int lengthy);
void dragZoomOut(String serverId, Device device, String channelId, int length, int width, int midpointx, int midpointy, int lengthx, int lengthy);
WVPResult<String> deviceStatus(String serverId, Device device);
WVPResult<String> alarm(String serverId, Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, String startTime, String endTime);
WVPResult<Object> deviceInfo(String serverId, Device device);
WVPResult<Object> queryPreset(String serverId, Device device, String channelId);
}

View File

@@ -105,7 +105,7 @@ public class RedisAlarmMsgListener implements MessageListener {
if (ObjectUtils.isEmpty(gbId)) {
if (userSetting.getSendToPlatformsWhenIdLost()) {
// 发送给所有的上级
List<Platform> parentPlatforms = platformService.queryEnablePlatformList();
List<Platform> parentPlatforms = platformService.queryEnablePlatformList(userSetting.getServerId());
if (!parentPlatforms.isEmpty()) {
for (Platform parentPlatform : parentPlatforms) {
try {

View File

@@ -0,0 +1,348 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPTZService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sip.message.Response;
@Component
@Slf4j
@RedisRpcController("channel")
public class RedisRpcChannelPlayController extends RpcController {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private IGbChannelService channelService;
@Autowired
private IGbChannelPlayService channelPlayService;
@Autowired
private IPTZService iptzService;
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
* 点播国标设备
*/
@RedisRpcMapping("play")
public RedisRpcResponse playChannel(RedisRpcRequest request) {
int channelId = Integer.parseInt(request.getParam().toString());
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
InviteMessageInfo inviteInfo = new InviteMessageInfo();
inviteInfo.setSessionName("Play");
channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(data);
}else {
response.setStatusCode(code);
}
// 手动发送结果
sendResponse(response);
});
return null;
}
/**
* 点播国标设备
*/
@RedisRpcMapping("queryRecordInfo")
public RedisRpcResponse queryRecordInfo(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
int channelId = paramJson.getIntValue("channelId");
String startTime = paramJson.getString("startTime");
String endTime = paramJson.getString("endTime");
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
channelService.queryRecordInfo(channel, startTime, endTime, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(code);
response.setBody(data);
}else {
response.setStatusCode(code);
}
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(ErrorCode.ERROR100.getCode());
response.setBody(e.getMessage());
}
return null;
}
/**
* 暂停录像回放
*/
@RedisRpcMapping("pauseRtp")
public RedisRpcResponse pauseRtp(RedisRpcRequest request) {
String streamId = request.getParam().toString();
RedisRpcResponse response = request.getResponse();
if (streamId == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
channelPlayService.pauseRtp(streamId);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}catch (ControllerException e) {
response.setStatusCode(ErrorCode.ERROR100.getCode());
response.setBody(e.getMessage());
}
return response;
}
/**
* 恢复录像回放
*/
@RedisRpcMapping("resumeRtp")
public RedisRpcResponse resumeRtp(RedisRpcRequest request) {
String streamId = request.getParam().toString();
RedisRpcResponse response = request.getResponse();
if (streamId == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
channelPlayService.resumeRtp(streamId);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}catch (ControllerException e) {
response.setStatusCode(ErrorCode.ERROR100.getCode());
response.setBody(e.getMessage());
}
return response;
}
/**
* 停止点播国标设备
*/
@RedisRpcMapping("stop")
public RedisRpcResponse stop(RedisRpcRequest request) {
JSONObject jsonObject = JSONObject.parseObject(request.getParam().toString());
RedisRpcResponse response = request.getResponse();
Integer channelId = jsonObject.getIntValue("channelId");
if (channelId == null || channelId <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
String stream = jsonObject.getString("stream");
InviteSessionType type = jsonObject.getObject("inviteSessionType", InviteSessionType.class);
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
channelPlayService.stopPlay(type, channel, stream);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}catch (Exception e){
response.setStatusCode(Response.SERVER_INTERNAL_ERROR);
response.setBody(e.getMessage());
}
return response;
}
/**
* 录像回放国标设备
*/
@RedisRpcMapping("playback")
public RedisRpcResponse playbackChannel(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
int channelId = paramJson.getIntValue("channelId");
String startTime = paramJson.getString("startTime");
String endTime = paramJson.getString("endTime");
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
InviteMessageInfo inviteInfo = new InviteMessageInfo();
inviteInfo.setSessionName("Playback");
inviteInfo.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime));
inviteInfo.setStopTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime));
channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(data);
}else {
response.setStatusCode(code);
}
// 手动发送结果
sendResponse(response);
});
return null;
}
/**
* 录像回放国标设备
*/
@RedisRpcMapping("download")
public RedisRpcResponse downloadChannel(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
int channelId = paramJson.getIntValue("channelId");
String startTime = paramJson.getString("startTime");
String endTime = paramJson.getString("endTime");
int downloadSpeed = paramJson.getIntValue("downloadSpeed");
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
InviteMessageInfo inviteInfo = new InviteMessageInfo();
inviteInfo.setSessionName("Download");
inviteInfo.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime));
inviteInfo.setStopTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime));
inviteInfo.setDownloadSpeed(downloadSpeed + "");
channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(data);
}else {
response.setStatusCode(code);
}
// 手动发送结果
sendResponse(response);
});
return null;
}
/**
* 云台控制
*/
@RedisRpcMapping("ptz/frontEndCommand")
public RedisRpcResponse frontEndCommand(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
int channelId = paramJson.getIntValue("channelId");
int cmdCode = paramJson.getIntValue("cmdCode");
int parameter1 = paramJson.getIntValue("parameter1");
int parameter2 = paramJson.getIntValue("parameter2");
int combindCode2 = paramJson.getIntValue("combindCode2");
RedisRpcResponse response = request.getResponse();
if (channelId <= 0 || cmdCode < 0 || parameter1 < 0 || parameter2 < 0 || combindCode2 < 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
iptzService.frontEndCommand(channel, cmdCode, parameter1, parameter2, combindCode2);
}catch (ControllerException e) {
response.setStatusCode(ErrorCode.ERROR100.getCode());
response.setBody(e.getMessage());
return response;
}
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
}

View File

@@ -0,0 +1,66 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.service.ICloudRecordService;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RedisRpcController("cloudRecord")
public class RedisRpcCloudRecordController extends RpcController {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private ICloudRecordService cloudRecordService;
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
* 播放
*/
@RedisRpcMapping("play")
public RedisRpcResponse play(RedisRpcRequest request) {
int id = Integer.parseInt(request.getParam().toString());
RedisRpcResponse response = request.getResponse();
if (id <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
DownloadFileInfo downloadFileInfo = cloudRecordService.getPlayUrlPath(id);
if (downloadFileInfo == null) {
response.setStatusCode(ErrorCode.ERROR100.getCode());
response.setBody("get play url error");
return response;
}
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(downloadFileInfo));
return response;
}
}

View File

@@ -0,0 +1,498 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.BasicParam;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RedisRpcController("device")
public class RedisRpcDeviceController extends RpcController {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private IDeviceService deviceService;
@Autowired
private IStreamProxyService streamProxyService;
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
* 通道同步
*/
@RedisRpcMapping("devicesSync")
public RedisRpcResponse devicesSync(RedisRpcRequest request) {
String deviceId = request.getParam().toString();
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
WVPResult<SyncStatus> result = deviceService.devicesSync(device);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(result));
return response;
}
/**
* 获取通道同步状态
*/
@RedisRpcMapping("getChannelSyncStatus")
public RedisRpcResponse getChannelSyncStatus(RedisRpcRequest request) {
String deviceId = request.getParam().toString();
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
SyncStatus channelSyncStatus = deviceService.getChannelSyncStatus(deviceId);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(channelSyncStatus));
return response;
}
@RedisRpcMapping("deviceBasicConfig")
public RedisRpcResponse deviceBasicConfig(RedisRpcRequest request) {
BasicParam basicParam = JSONObject.parseObject(request.getParam().toString(), BasicParam.class);
Device device = deviceService.getDeviceByDeviceId(basicParam.getDeviceId());
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
deviceService.deviceBasicConfig(device, basicParam, (code, msg, data) -> {
response.setStatusCode(code);
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
return null;
}
@RedisRpcMapping("deviceConfigQuery")
public RedisRpcResponse deviceConfigQuery(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
String configType = paramJson.getString("configType");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
deviceService.deviceConfigQuery(device, channelId, configType, (code, msg, data) -> {
response.setStatusCode(code);
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
return null;
}
@RedisRpcMapping("teleboot")
public RedisRpcResponse teleboot(RedisRpcRequest request) {
String deviceId = request.getParam().toString();
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.teleboot(device);
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
return response;
}
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(WVPResult.success());
return response;
}
@RedisRpcMapping("record")
public RedisRpcResponse record(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
String recordCmdStr = paramJson.getString("recordCmdStr");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.record(device, channelId, recordCmdStr, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("guard")
public RedisRpcResponse guard(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String guardCmdStr = paramJson.getString("guardCmdStr");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.guard(device, guardCmdStr, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("resetAlarm")
public RedisRpcResponse resetAlarm(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
String alarmMethod = paramJson.getString("alarmMethod");
String alarmType = paramJson.getString("alarmType");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.resetAlarm(device, channelId, alarmMethod, alarmType, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("iFrame")
public RedisRpcResponse iFrame(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.iFrame(device, channelId);
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("homePosition")
public RedisRpcResponse homePosition(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
Boolean enabled = paramJson.getBoolean("enabled");
Integer resetTime = paramJson.getInteger("resetTime");
Integer presetIndex = paramJson.getInteger("presetIndex");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.homePosition(device, channelId, enabled, resetTime, presetIndex, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("dragZoomIn")
public RedisRpcResponse dragZoomIn(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
Integer length = paramJson.getInteger("length");
Integer width = paramJson.getInteger("width");
Integer midpointx = paramJson.getInteger("midpointx");
Integer midpointy = paramJson.getInteger("midpointy");
Integer lengthx = paramJson.getInteger("lengthx");
Integer lengthy = paramJson.getInteger("lengthy");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.dragZoomIn(device, channelId, length, width, midpointx, midpointy, lengthx, lengthy, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("dragZoomOut")
public RedisRpcResponse dragZoomOut(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
Integer length = paramJson.getInteger("length");
Integer width = paramJson.getInteger("width");
Integer midpointx = paramJson.getInteger("midpointx");
Integer midpointy = paramJson.getInteger("midpointy");
Integer lengthx = paramJson.getInteger("lengthx");
Integer lengthy = paramJson.getInteger("lengthy");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.dragZoomOut(device, channelId, length, width, midpointx, midpointy, lengthx, lengthy, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("alarm")
public RedisRpcResponse alarm(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String startPriority = paramJson.getString("startPriority");
String endPriority = paramJson.getString("endPriority");
String alarmMethod = paramJson.getString("alarmMethod");
String alarmType = paramJson.getString("alarmType");
String startTime = paramJson.getString("startTime");
String endTime = paramJson.getString("endTime");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.alarm(device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("deviceStatus")
public RedisRpcResponse deviceStatus(RedisRpcRequest request) {
String deviceId = request.getParam().toString();
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.deviceStatus(device, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("info")
public RedisRpcResponse info(RedisRpcRequest request) {
String deviceId = request.getParam().toString();
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.deviceInfo(device, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("info")
public RedisRpcResponse queryPreset(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.queryPreset(device, channelId, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
}

View File

@@ -0,0 +1,74 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RedisRpcController("devicePlay")
public class RedisRpcDevicePlayController extends RpcController {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private IDeviceService deviceService;
@Autowired
private IPlayService playService;
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
* 获取通道同步状态
*/
@RedisRpcMapping("audioBroadcast")
public RedisRpcResponse audioBroadcast(RedisRpcRequest request) {
JSONObject paramJson = JSON.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelDeviceId = paramJson.getString("channelDeviceId");
Boolean broadcastMode = paramJson.getBoolean("broadcastMode");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
AudioBroadcastResult audioBroadcastResult = playService.audioBroadcast(deviceId, channelDeviceId, broadcastMode);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(audioBroadcastResult));
return response;
}
}

View File

@@ -0,0 +1,99 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPTZService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sip.message.Response;
@Component
@Slf4j
@RedisRpcController("device")
public class RedisRpcGbDeviceController extends RpcController {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private IDeviceService deviceService;
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
* 目录订阅
*/
@RedisRpcMapping("subscribeCatalog")
public RedisRpcResponse subscribeCatalog(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
int id = paramJson.getIntValue("id");
int cycle = paramJson.getIntValue("cycle");
RedisRpcResponse response = request.getResponse();
if (id <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
deviceService.subscribeCatalog(id, cycle);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
/**
* 移动位置订阅
*/
@RedisRpcMapping("subscribeMobilePosition")
public RedisRpcResponse subscribeMobilePosition(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
int id = paramJson.getIntValue("id");
int cycle = paramJson.getIntValue("cycle");
int interval = paramJson.getIntValue("interval");
RedisRpcResponse response = request.getResponse();
if (id <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
deviceService.subscribeMobilePosition(id, cycle, interval);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
}

View File

@@ -0,0 +1,83 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
@RedisRpcController("platform")
public class RedisRpcPlatformController extends RpcController {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private IPlatformService platformService;
@Autowired
private IPlatformChannelService platformChannelService;
@Autowired
private EventPublisher eventPublisher;
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
* 更新
*/
@RedisRpcMapping("update")
public RedisRpcResponse play(RedisRpcRequest request) {
Platform platform = JSONObject.parseObject(request.getParam().toString(), Platform.class);
RedisRpcResponse response = request.getResponse();
boolean update = platformService.update(platform);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(Boolean.toString(update));
return response;
}
/**
* 目录更新推送
*/
@RedisRpcMapping("catalogEventPublish")
public RedisRpcResponse catalogEventPublish(RedisRpcRequest request) {
JSONObject jsonObject = JSONObject.parseObject(request.getParam().toString());
Platform platform = jsonObject.getObject("platform", Platform.class);
List<CommonGBChannel> channels = jsonObject.getJSONArray("channels").toJavaList(CommonGBChannel.class);
String type = jsonObject.getString("type");
eventPublisher.catalogEventPublish(platform, channels, type, false);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
}

View File

@@ -0,0 +1,165 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.message.Response;
@Component
@Slf4j
@RedisRpcController("sendRtp")
public class RedisRpcSendRtpController extends RpcController {
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private ISendRtpServerService sendRtpServerService;
@Autowired
private UserSetting userSetting;
/**
* 获取发流的信息
*/
@RedisRpcMapping("getSendRtpItem")
public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
if (sendRtpItem == null) {
log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息 callId{}", callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
log.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 查询本级是否有这个流
MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaServerItem == null) {
RedisRpcResponse response = request.getResponse();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}
// 自平台内容
int localPort = sendRtpServerService.getNextPort(mediaServerItem);
if (localPort == 0) {
log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
RedisRpcResponse response = request.getResponse();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
sendRtpItem.setSsrc(ssrc);
}
sendRtpServerService.update(sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(callId);
return response;
}
/**
* 开始发流
*/
@RedisRpcMapping("startSendRtp")
public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
if (sendRtpItem == null) {
log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息 callId{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer == null) {
log.info("[redis-rpc] startSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult);
return response;
}
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaInfo == null) {
log.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线");
response.setBody(wvpResult);
return response;
}
try {
mediaServerService.startSendRtp(mediaServer, sendRtpItem);
}catch (ControllerException exception) {
log.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}{} {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg());
WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg());
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
WVPResult wvpResult = WVPResult.success();
response.setBody(wvpResult);
return response;
}
/**
* 停止发流
*/
@RedisRpcMapping("stopSendRtp")
public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
if (sendRtpItem == null) {
log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息 key{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer == null) {
log.info("[redis-rpc] stopSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult);
return response;
}
try {
mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
}catch (ControllerException exception) {
log.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}{} code {}, msg: {}", sendRtpItem.getApp(),
sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getCode(), exception.getMsg() );
response.setBody(WVPResult.fail(exception.getCode(), exception.getMsg()));
return response;
}
log.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
response.setBody(WVPResult.success());
return response;
}
}

View File

@@ -0,0 +1,95 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RedisRpcController("streamProxy")
public class RedisRpcStreamProxyController extends RpcController {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private IStreamProxyPlayService streamProxyPlayService;
@Autowired
private IStreamProxyService streamProxyService;
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
* 播放
*/
@RedisRpcMapping("play")
public RedisRpcResponse play(RedisRpcRequest request) {
int id = Integer.parseInt(request.getParam().toString());
RedisRpcResponse response = request.getResponse();
if (id <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
StreamProxy streamProxy = streamProxyService.getStreamProxy(id);
if (streamProxy == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
StreamInfo streamInfo = streamProxyPlayService.startProxy(streamProxy);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(streamInfo));
return response;
}
/**
* 停止
*/
@RedisRpcMapping("stop")
public RedisRpcResponse stop(RedisRpcRequest request) {
int id = Integer.parseInt(request.getParam().toString());
RedisRpcResponse response = request.getResponse();
if (id <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
StreamProxy streamProxy = streamProxyService.getStreamProxy(id);
if (streamProxy == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
streamProxyPlayService.stopProxy(streamProxy);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
}

View File

@@ -3,33 +3,32 @@ package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushPlayService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
* 其他wvp发起的rpc调用这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用
*/
@Slf4j
@Component
public class RedisRpcController {
@Slf4j
@RedisRpcController("streamPush")
public class RedisRpcStreamPushController extends RpcController {
@Autowired
private SSRCFactory ssrcFactory;
@@ -49,52 +48,22 @@ public class RedisRpcController {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private IStreamPushPlayService streamPushPlayService;
/**
* 获取发流的信息
*/
public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
if (sendRtpItem == null) {
log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息 callId{}", callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
return response;
}
log.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 查询本级是否有这个流
MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaServerItem == null) {
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
}
// 自平台内容
int localPort = sendRtpServerService.getNextPort(mediaServerItem);
if (localPort == 0) {
log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
}
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
sendRtpItem.setSsrc(ssrc);
}
sendRtpServerService.update(sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setBody(callId);
return response;
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
* 监听流上线
*/
@RedisRpcMapping("waitePushStreamOnline")
public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
SendRtpInfo sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpInfo.class);
log.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
@@ -115,7 +84,7 @@ public class RedisRpcController {
sendRtpServerService.update(sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setBody(sendRtpItem.getChannelId());
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}
// 监听流上线 流上线直接发送sendRtpItem消息给实际的信令处理者
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
@@ -134,7 +103,7 @@ public class RedisRpcController {
redisTemplate.opsForValue().set(sendRtpItem.getChannelId(), sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setBody(sendRtpItem.getChannelId());
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
// 手动发送结果
sendResponse(response);
hookSubscribe.removeSubscribe(hook);
@@ -146,6 +115,7 @@ public class RedisRpcController {
/**
* 监听流上线
*/
@RedisRpcMapping("onStreamOnlineEvent")
public RedisRpcResponse onStreamOnlineEvent(RedisRpcRequest request) {
StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class);
log.info("[redis-rpc] 监听流信息,等待流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream());
@@ -155,7 +125,7 @@ public class RedisRpcController {
log.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}", streamInfo.getApp(), streamInfo.getStream());
RedisRpcResponse response = request.getResponse();
response.setBody(JSONObject.toJSONString(streamInfoInServer));
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
// 监听流上线 流上线直接发送sendRtpItem消息给实际的信令处理者
@@ -168,7 +138,7 @@ public class RedisRpcController {
streamInfo.getApp(), streamInfo.getStream(), hookData.getMediaInfo(),
hookData.getMediaInfo() != null ? hookData.getMediaInfo().getCallId() : null);
response.setBody(JSONObject.toJSONString(streamInfoByAppAndStream));
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
// 手动发送结果
sendResponse(response);
hookSubscribe.removeSubscribe(hook);
@@ -179,6 +149,7 @@ public class RedisRpcController {
/**
* 停止监听流上线
*/
@RedisRpcMapping("stopWaitePushStreamOnline")
public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
SendRtpInfo sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpInfo.class);
log.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
@@ -186,13 +157,14 @@ public class RedisRpcController {
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
hookSubscribe.removeSubscribe(hook);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
/**
* 停止监听流上线
*/
@RedisRpcMapping("unPushStreamOnlineEvent")
public RedisRpcResponse unPushStreamOnlineEvent(RedisRpcRequest request) {
StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class);
log.info("[redis-rpc] 停止监听流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream());
@@ -200,94 +172,30 @@ public class RedisRpcController {
Hook hook = Hook.getInstance(HookType.on_media_arrival, streamInfo.getApp(), streamInfo.getStream(), null);
hookSubscribe.removeSubscribe(hook);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
return response;
}
/**
* 开始发流
*/
public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
if (sendRtpItem == null) {
log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息 callId{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer == null) {
log.info("[redis-rpc] startSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult);
return response;
}
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaInfo == null) {
log.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线");
response.setBody(wvpResult);
return response;
}
try {
mediaServerService.startSendRtp(mediaServer, sendRtpItem);
}catch (ControllerException exception) {
log.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}{} {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg());
WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg());
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
WVPResult wvpResult = WVPResult.success();
response.setBody(wvpResult);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
/**
* 停止发流
* 停止监听流上线
*/
public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
@RedisRpcMapping("play")
public RedisRpcResponse play(RedisRpcRequest request) {
int id = Integer.parseInt(request.getParam().toString());
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
if (sendRtpItem == null) {
log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息 key{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult);
if (id <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
log.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer == null) {
log.info("[redis-rpc] stopSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult);
return response;
}
try {
mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
}catch (ControllerException exception) {
log.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}{} code {}, msg: {}", sendRtpItem.getApp(),
sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getCode(), exception.getMsg() );
response.setBody(WVPResult.fail(exception.getCode(), exception.getMsg()));
return response;
}
log.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
response.setBody(WVPResult.success());
return response;
streamPushPlayService.start(id, (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(data));
sendResponse(response);
}
}, null, null);
return null;
}
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
}

View File

@@ -0,0 +1,13 @@
package com.genersoft.iot.vmp.service.redisMsg.dto;
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisRpcController {
/**
* 请求路径
*/
String value() default "";
}

View File

@@ -0,0 +1,13 @@
package com.genersoft.iot.vmp.service.redisMsg.dto;
import java.lang.annotation.*;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisRpcMapping {
/**
* 请求路径
*/
String value() default "";
}

View File

@@ -0,0 +1,33 @@
package com.genersoft.iot.vmp.service.redisMsg.dto;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcClassHandler;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
public class RpcController {
@Autowired
private RedisRpcConfig redisRpcConfig;
@PostConstruct
public void init() {
String controllerPath = this.getClass().getAnnotation(RedisRpcController.class).value();
// 扫描其下的方法
Method[] methods = this.getClass().getDeclaredMethods();
for (Method method : methods) {
RedisRpcMapping annotation = method.getAnnotation(RedisRpcMapping.class);
if (annotation != null) {
String methodPath = annotation.value();
if (methodPath != null) {
redisRpcConfig.addHandler(controllerPath + "/" + methodPath, new RedisRpcClassHandler(this, method));
}
}
}
}
}

View File

@@ -0,0 +1,257 @@
package com.genersoft.iot.vmp.service.redisMsg.service;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
@Autowired
private RedisRpcConfig redisRpcConfig;
@Autowired
private UserSetting userSetting;
private RedisRpcRequest buildRequest(String uri, Object param) {
RedisRpcRequest request = new RedisRpcRequest();
request.setFromId(userSetting.getServerId());
request.setParam(param);
request.setUri(uri);
return request;
}
@Override
public void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback) {
RedisRpcRequest request = buildRequest("channel/play", channelId);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MILLISECONDS);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
}
@Override
public void stop(String serverId, InviteSessionType type, int channelId, String stream) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("channelId", channelId);
jsonObject.put("stream", stream);
jsonObject.put("inviteSessionType", type);
RedisRpcRequest request = buildRequest("channel/stop", jsonObject.toJSONString());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MICROSECONDS);
if (response == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}else {
if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}
}
}
@Override
public void queryRecordInfo(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<RecordInfo> callback) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("channelId", channelId);
jsonObject.put("startTime", startTime);
jsonObject.put("endTime", endTime);
RedisRpcRequest request = buildRequest("channel/queryRecordInfo", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getRecordInfoTimeout(), TimeUnit.MILLISECONDS);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
RecordInfo recordInfo = JSON.parseObject(response.getBody().toString(), RecordInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), recordInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
}
@Override
public void playback(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<StreamInfo> callback) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("channelId", channelId);
jsonObject.put("startTime", startTime);
jsonObject.put("endTime", endTime);
RedisRpcRequest request = buildRequest("channel/playback", jsonObject.toString());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MILLISECONDS);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
}
@Override
public void pauseRtp(String serverId, String streamId) {
RedisRpcRequest request = buildRequest("channel/pauseRtp", streamId);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 5, TimeUnit.SECONDS);
if (response == null) {
log.info("[RPC 暂停回放] 失败, streamId: {}", streamId);
}else {
if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) {
log.info("[RPC 暂停回放] 失败, {}, streamId: {}", response.getBody(), streamId);
}
}
}
@Override
public void resumeRtp(String serverId, String streamId) {
RedisRpcRequest request = buildRequest("channel/resumeRtp", streamId);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 5, TimeUnit.SECONDS);
if (response == null) {
log.info("[RPC 恢复回放] 失败, streamId: {}", streamId);
}else {
if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) {
log.info("[RPC 恢复回放] 失败, {}, streamId: {}", response.getBody(), streamId);
}
}
}
@Override
public void download(String serverId, Integer channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("channelId", channelId);
jsonObject.put("startTime", startTime);
jsonObject.put("endTime", endTime);
jsonObject.put("downloadSpeed", downloadSpeed);
RedisRpcRequest request = buildRequest("channel/download", jsonObject.toString());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MILLISECONDS);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
}
@Override
public String frontEndCommand(String serverId, Integer channelId, int cmdCode, int parameter1, int parameter2, int combindCode2) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("channelId", channelId);
jsonObject.put("cmdCode", cmdCode);
jsonObject.put("parameter1", parameter1);
jsonObject.put("parameter2", parameter2);
jsonObject.put("combindCode2", combindCode2);
RedisRpcRequest request = buildRequest("channel/ptz/frontEndCommand", jsonObject.toString());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MILLISECONDS);
if (response == null) {
return ErrorCode.ERROR100.getMsg();
}else {
if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) {
return response.getBody().toString();
}
}
return null;
}
@Override
public void playPush(Integer id, ErrorCallback<StreamInfo> callback) {
RedisRpcRequest request = buildRequest("streamPush/play", id);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.SECONDS);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
}
@Override
public StreamInfo playProxy(String serverId, int id) {
RedisRpcRequest request = buildRequest("streamProxy/play", id);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.SECONDS);
if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
return JSON.parseObject(response.getBody().toString(), StreamInfo.class);
}
return null;
}
@Override
public void stopProxy(String serverId, int id) {
RedisRpcRequest request = buildRequest("streamProxy/stop", id);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.SECONDS);
if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
log.info("[rpc 拉流代理] 停止成功: id: {}", id);
}else {
log.info("[rpc 拉流代理] 停止失败 id: {}", id);
}
}
@Override
public DownloadFileInfo getRecordPlayUrl(String serverId, Integer recordId) {
RedisRpcRequest request = buildRequest("cloudRecord/play", recordId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.SECONDS);
if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
return JSON.parseObject(response.getBody().toString(), DownloadFileInfo.class);
}
return null;
}
@Override
public AudioBroadcastResult audioBroadcast(String serverId, String deviceId, String channelDeviceId, Boolean broadcastMode) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("deviceId", deviceId);
jsonObject.put("channelDeviceId", channelDeviceId);
jsonObject.put("broadcastMode", broadcastMode);
RedisRpcRequest request = buildRequest("devicePlay/audioBroadcast", jsonObject.toString());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.SECONDS);
if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
return JSON.parseObject(response.getBody().toString(), AudioBroadcastResult.class);
}
return null;
}
}

View File

@@ -1,13 +1,16 @@
package com.genersoft.iot.vmp.service.redisMsg.service;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
@@ -22,6 +25,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class RedisRpcServiceImpl implements IRedisRpcService {
@@ -58,8 +63,8 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
@Override
public SendRtpInfo getSendRtpItem(String callId) {
RedisRpcRequest request = buildRequest("getSendRtpItem", callId);
RedisRpcResponse response = redisRpcConfig.request(request, 10);
RedisRpcRequest request = buildRequest("sendRtp/getSendRtpItem", callId);
RedisRpcResponse response = redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
if (response.getBody() == null) {
return null;
}
@@ -69,9 +74,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
@Override
public WVPResult startSendRtp(String callId, SendRtpInfo sendRtpItem) {
log.info("[请求其他WVP] 开始推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("startSendRtp", callId);
RedisRpcRequest request = buildRequest("sendRtp/startSendRtp", callId);
request.setToId(sendRtpItem.getServerId());
RedisRpcResponse response = redisRpcConfig.request(request, 10);
RedisRpcResponse response = redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@@ -83,9 +88,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息");
}
log.info("[请求其他WVP] 停止推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("stopSendRtp", callId);
RedisRpcRequest request = buildRequest("sendRtp/stopSendRtp", callId);
request.setToId(sendRtpItem.getServerId());
RedisRpcResponse response = redisRpcConfig.request(request, 10);
RedisRpcResponse response = redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@@ -94,7 +99,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
log.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
RedisRpcRequest request = buildRequest("streamPush/waitePushStreamOnline", sendRtpItem);
request.setToId(sendRtpItem.getServerId());
hookSubscribe.addSubscribe(hook, (hookData) -> {
@@ -135,9 +140,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
log.info("[停止WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
hookSubscribe.removeSubscribe(hook);
RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
RedisRpcRequest request = buildRequest("streamPush/stopWaitePushStreamOnline", sendRtpItem);
request.setToId(sendRtpItem.getServerId());
redisRpcConfig.request(request, 10);
redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
}
@Override
@@ -147,9 +152,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
log.info("[停止WVP监听流上线] 未找到redis中的发流信息 key{}", callId);
return;
}
RedisRpcRequest request = buildRequest("rtpSendStopped", callId);
RedisRpcRequest request = buildRequest("streamPush/rtpSendStopped", callId);
request.setToId(sendRtpItem.getServerId());
redisRpcConfig.request(request, 10);
redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
}
@Override
@@ -166,7 +171,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
StreamInfo streamInfoParam = new StreamInfo();
streamInfoParam.setApp(app);
streamInfoParam.setStream(stream);
RedisRpcRequest request = buildRequest("onStreamOnlineEvent", streamInfoParam);
RedisRpcRequest request = buildRequest("streamPush/onStreamOnlineEvent", streamInfoParam);
hookSubscribe.addSubscribe(hook, (hookData) -> {
log.info("[请求所有WVP监听流上线] 监听流上线 {}/{}", app, stream);
if (callback != null) {
@@ -198,7 +203,231 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
StreamInfo streamInfoParam = new StreamInfo();
streamInfoParam.setApp(app);
streamInfoParam.setStream(stream);
RedisRpcRequest request = buildRequest("unPushStreamOnlineEvent", streamInfoParam);
redisRpcConfig.request(request, 10);
RedisRpcRequest request = buildRequest("streamPush/unPushStreamOnlineEvent", streamInfoParam);
redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
}
@Override
public void subscribeCatalog(int id, int cycle) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", id);
jsonObject.put("cycle", cycle);
RedisRpcRequest request = buildRequest("device/subscribeCatalog", jsonObject);
redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
}
@Override
public void subscribeMobilePosition(int id, int cycle, int interval) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", id);
jsonObject.put("cycle", cycle);
jsonObject.put("interval", cycle);
RedisRpcRequest request = buildRequest("device/subscribeMobilePosition", jsonObject);
redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
}
@Override
public boolean updatePlatform(String serverId, Platform platform) {
RedisRpcRequest request = buildRequest("platform/update", platform);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 40, TimeUnit.MILLISECONDS);
return Boolean.parseBoolean(response.getBody().toString());
}
@Override
public void catalogEventPublish(String serverId, CatalogEvent event) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("platform", event.getPlatform());
jsonObject.put("channels", event.getChannels());
jsonObject.put("type", event.getType());
RedisRpcRequest request = buildRequest("platform/catalogEventPublish", jsonObject);
if (serverId != null) {
request.setToId(serverId);
}
redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
}
@Override
public WVPResult<SyncStatus> devicesSync(String serverId, String deviceId) {
RedisRpcRequest request = buildRequest("device/devicesSync", deviceId);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 100, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public SyncStatus getChannelSyncStatus(String serverId, String deviceId) {
RedisRpcRequest request = buildRequest("device/getChannelSyncStatus", deviceId);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 100, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), SyncStatus.class);
}
@Override
public WVPResult<String> deviceBasicConfig(String serverId, Device device, BasicParam basicParam) {
RedisRpcRequest request = buildRequest("device/deviceBasicConfig", JSONObject.toJSONString(basicParam));
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public WVPResult<String> deviceConfigQuery(String serverId, Device device, String channelId, String configType) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
jsonObject.put("configType", configType);
RedisRpcRequest request = buildRequest("device/deviceConfigQuery", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public void teleboot(String serverId, Device device) {
RedisRpcRequest request = buildRequest("device/teleboot", device.getDeviceId());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) {
throw new ControllerException(response.getStatusCode(), response.getBody().toString());
}
}
@Override
public WVPResult<String> recordControl(String serverId, Device device, String channelId, String recordCmdStr) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
jsonObject.put("recordCmdStr", recordCmdStr);
RedisRpcRequest request = buildRequest("device/record", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public WVPResult<String> guard(String serverId, Device device, String guardCmdStr) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("guardCmdStr", guardCmdStr);
RedisRpcRequest request = buildRequest("device/guard", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public WVPResult<String> resetAlarm(String serverId, Device device, String channelId, String alarmMethod, String alarmType) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
jsonObject.put("alarmMethod", alarmMethod);
jsonObject.put("alarmType", alarmType);
RedisRpcRequest request = buildRequest("device/resetAlarm", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public void iFrame(String serverId, Device device, String channelId) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
RedisRpcRequest request = buildRequest("device/iFrame", jsonObject);
request.setToId(serverId);
redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
}
@Override
public WVPResult<String> homePosition(String serverId, Device device, String channelId, Boolean enabled, Integer resetTime, Integer presetIndex) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
jsonObject.put("enabled", enabled);
jsonObject.put("resetTime", resetTime);
jsonObject.put("presetIndex", presetIndex);
RedisRpcRequest request = buildRequest("device/homePosition", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public void dragZoomIn(String serverId, Device device, String channelId, int length, int width, int midpointx,
int midpointy, int lengthx, int lengthy) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
jsonObject.put("length", length);
jsonObject.put("width", width);
jsonObject.put("midpointx", midpointx);
jsonObject.put("midpointy", midpointy);
jsonObject.put("lengthx", lengthx);
jsonObject.put("lengthy", lengthy);
RedisRpcRequest request = buildRequest("device/dragZoomIn", jsonObject);
request.setToId(serverId);
redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
}
@Override
public void dragZoomOut(String serverId, Device device, String channelId, int length, int width, int midpointx, int midpointy, int lengthx, int lengthy) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
jsonObject.put("length", length);
jsonObject.put("width", width);
jsonObject.put("midpointx", midpointx);
jsonObject.put("midpointy", midpointy);
jsonObject.put("lengthx", lengthx);
jsonObject.put("lengthy", lengthy);
RedisRpcRequest request = buildRequest("device/dragZoomOut", jsonObject);
request.setToId(serverId);
redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
}
@Override
public WVPResult<String> deviceStatus(String serverId, Device device) {
RedisRpcRequest request = buildRequest("device/deviceStatus", device.getDeviceId());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public WVPResult<Object> deviceInfo(String serverId, Device device) {
RedisRpcRequest request = buildRequest("device/info", device.getDeviceId());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public WVPResult<Object> queryPreset(String serverId, Device device, String channelId) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
RedisRpcRequest request = buildRequest("device/queryPreset", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 60000, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public WVPResult<String> alarm(String serverId, Device device, String startPriority, String endPriority,
String alarmMethod, String alarmType, String startTime, String endTime) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
// jsonObject.put("channelId", channelId);
jsonObject.put("startPriority", startPriority);
jsonObject.put("endPriority", endPriority);
jsonObject.put("alarmMethod", alarmMethod);
jsonObject.put("alarmType", alarmType);
jsonObject.put("startTime", startTime);
jsonObject.put("endTime", endTime);
RedisRpcRequest request = buildRequest("device/alarm", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
}