临时提交

This commit is contained in:
648540858
2024-09-06 17:56:24 +08:00
parent 26919170f4
commit da441a67f8
52 changed files with 576 additions and 508 deletions

View File

@@ -51,7 +51,9 @@ public class VideoManagerConstants {
public static final String MEDIA_SSRC_USED_PREFIX = "VMP_MEDIA_USED_SSRC_";
public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_MEDIA_TRANSACTION_";
public static final String SIP_INVITE_SESSION = "VMP_SIP_INVITE_SESSION:";
public static final String SIP_INVITE_SESSION_CALL_ID = SIP_INVITE_SESSION + "CALL_ID" + ":";
public static final String SIP_INVITE_SESSION_STREAM = SIP_INVITE_SESSION + "STREAM" + ":";
public static final String MEDIA_STREAM_AUTHORITY = "VMP_MEDIA_STREAM_AUTHORITY_";

View File

@@ -6,7 +6,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import lombok.Data;
@Data
public class SendRtpItem {
public class SendRtpInfo {
/**
* 推流ip
@@ -143,8 +143,8 @@ public class SendRtpItem {
*/
private String sessionName;
public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) {
SendRtpItem sendRtpItem = new SendRtpItem();
public static SendRtpInfo getInstance(RequestPushStreamMsg requestPushStreamMsg) {
SendRtpInfo sendRtpItem = new SendRtpInfo();
sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId());
sendRtpItem.setApp(requestPushStreamMsg.getApp());
sendRtpItem.setStream(requestPushStreamMsg.getStream());
@@ -160,8 +160,8 @@ public class SendRtpItem {
}
public static SendRtpItem getInstance(String app, String stream, String ssrc, String dstIp, Integer dstPort, boolean tcp, int sendLocalPort, Integer pt) {
SendRtpItem sendRtpItem = new SendRtpItem();
public static SendRtpInfo getInstance(String app, String stream, String ssrc, String dstIp, Integer dstPort, boolean tcp, int sendLocalPort, Integer pt) {
SendRtpInfo sendRtpItem = new SendRtpInfo();
sendRtpItem.setApp(app);
sendRtpItem.setStream(stream);
sendRtpItem.setSsrc(ssrc);

View File

@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.common.InviteSessionType;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.Data;
@Data
@@ -18,18 +19,31 @@ public class SsrcTransaction {
private InviteSessionType type;
public SsrcTransaction(String deviceId, String platformId, Integer channelId, String callId,
String stream, String mediaServerId, String ssrc,
SipTransactionInfo sipTransactionInfo, InviteSessionType type) {
this.deviceId = deviceId;
this.platformId = platformId;
this.channelId = channelId;
this.callId = callId;
this.stream = stream;
this.mediaServerId = mediaServerId;
this.ssrc = ssrc;
this.sipTransactionInfo = sipTransactionInfo;
this.type = type;
public static SsrcTransaction buildForDevice(String deviceId, Integer channelId, String callId, String stream,
String ssrc, String mediaServerId, SIPResponse response, InviteSessionType type) {
SsrcTransaction ssrcTransaction = new SsrcTransaction();
ssrcTransaction.setDeviceId(deviceId);
ssrcTransaction.setChannelId(channelId);
ssrcTransaction.setCallId(callId);
ssrcTransaction.setStream(stream);
ssrcTransaction.setMediaServerId(mediaServerId);
ssrcTransaction.setSsrc(ssrc);
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo(response));
ssrcTransaction.setType(type);
return ssrcTransaction;
}
public static SsrcTransaction buildForPlatform(String platformId, Integer channelId, String callId, String stream,
String ssrc, String mediaServerId, SIPResponse response, InviteSessionType type) {
SsrcTransaction ssrcTransaction = new SsrcTransaction();
ssrcTransaction.setPlatformId(platformId);
ssrcTransaction.setChannelId(channelId);
ssrcTransaction.setCallId(callId);
ssrcTransaction.setStream(stream);
ssrcTransaction.setMediaServerId(mediaServerId);
ssrcTransaction.setSsrc(ssrc);
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo(response));
ssrcTransaction.setType(type);
return ssrcTransaction;
}
public SsrcTransaction() {

View File

@@ -13,7 +13,7 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@@ -52,7 +52,7 @@ import java.util.UUID;
public class PlayController {
@Autowired
private VideoStreamSessionManager streamSession;
private SipInviteSessionManager sessionManager;
@Autowired
private IInviteStreamService inviteStreamService;
@@ -240,7 +240,7 @@ public class PlayController {
log.debug("获取所有的ssrc");
}
JSONArray objects = new JSONArray();
List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
List<SsrcTransaction> allSsrc = sessionManager.getAll();
for (SsrcTransaction transaction : allSsrc) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("deviceId", transaction.getDeviceId());

View File

@@ -749,4 +749,57 @@ public interface DeviceChannelMapper {
@Update(value = {"UPDATE wvp_device_channel SET stream_id=null WHERE id=#{channelId}"})
void stopPlayById(@Param("channelId") Integer channelId);
@Select(value = {" <script>" +
" SELECT " +
" id,\n" +
" device_db_id,\n" +
" create_time,\n" +
" update_time,\n" +
" sub_count,\n" +
" stream_id,\n" +
" has_audio,\n" +
" gps_time,\n" +
" stream_identification,\n" +
" channel_type,\n" +
" coalesce(gb_device_id, device_id) as device_id,\n" +
" coalesce(gb_name, name) as name,\n" +
" coalesce(gb_manufacturer, manufacturer) as manufacturer,\n" +
" coalesce(gb_model, model) as model,\n" +
" coalesce(gb_owner, owner) as owner,\n" +
" coalesce(gb_civil_code, civil_code) as civil_code,\n" +
" coalesce(gb_block, block) as block,\n" +
" coalesce(gb_address, address) as address,\n" +
" coalesce(gb_parental, parental) as parental,\n" +
" coalesce(gb_parent_id, parent_id) as parent_id,\n" +
" coalesce(gb_safety_way, safety_way) as safety_way,\n" +
" coalesce(gb_register_way, register_way) as register_way,\n" +
" coalesce(gb_cert_num, cert_num) as cert_num,\n" +
" coalesce(gb_certifiable, certifiable) as certifiable,\n" +
" coalesce(gb_err_code, err_code) as err_code,\n" +
" coalesce(gb_end_time, end_time) as end_time,\n" +
" coalesce(gb_secrecy, secrecy) as secrecy,\n" +
" coalesce(gb_ip_address, ip_address) as ip_address,\n" +
" coalesce(gb_port, port) as port,\n" +
" coalesce(gb_password, password) as password,\n" +
" coalesce(gb_status, status) as status,\n" +
" coalesce(gb_longitude, longitude) as longitude,\n" +
" coalesce(gb_latitude, latitude) as latitude,\n" +
" coalesce(gb_ptz_type, ptz_type) as ptz_type,\n" +
" coalesce(gb_position_type, position_type) as position_type,\n" +
" coalesce(gb_room_type, room_type) as room_type,\n" +
" coalesce(gb_use_type, use_type) as use_type,\n" +
" coalesce(gb_supply_light_type, supply_light_type) as supply_light_type,\n" +
" coalesce(gb_direction_type, direction_type) as direction_type,\n" +
" coalesce(gb_resolution, resolution) as resolution,\n" +
" coalesce(gb_business_group_id, business_group_id) as business_group_id,\n" +
" coalesce(gb_download_speed, download_speed) as download_speed,\n" +
" coalesce(gb_svc_space_support_mod, svc_space_support_mod) as svc_space_support_mod,\n" +
" coalesce(gb_svc_time_support_mode, svc_time_support_mode) as svc_time_support_mode\n" +
" from wvp_device_channel " +
" where device_db_id=#{deviceDbId}" +
" </script>"})
List<DeviceChannel> getByDeviceId(@Param("deviceDbId") int deviceDbId);
}

View File

@@ -121,4 +121,6 @@ public interface IDeviceChannelService {
DeviceChannel getRawChannel(int id);
DeviceChannel getOneById(Integer channelId);
DeviceChannel getBroadcastChannel(int deviceDbId);
}

View File

@@ -51,9 +51,9 @@ public interface IPlayService {
void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;
void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, Platform platform, CallIdHeader callIdHeader);
void startPushStream(SendRtpInfo sendRtpItem, SIPResponse sipResponse, Platform platform, CallIdHeader callIdHeader);
void startSendRtpStreamFailHand(SendRtpItem sendRtpItem, Platform platform, CallIdHeader callIdHeader);
void startSendRtpStreamFailHand(SendRtpInfo sendRtpItem, Platform platform, CallIdHeader callIdHeader);
void talkCmd(Device device, String channelId, MediaServer mediaServerItem, String stream, AudioBroadcastEvent event);

View File

@@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@@ -606,4 +607,19 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
public DeviceChannel getOneById(Integer channelId) {
return channelMapper.getOne(channelId);
}
@Override
public DeviceChannel getBroadcastChannel(int deviceDbId) {
List<DeviceChannel> channels = channelMapper.getByDeviceId(deviceDbId);
if (channels.size() == 1) {
return channels.get(0);
}
for (DeviceChannel channel : channels) {
// 获取137类型的
if (SipUtils.isFrontEnd(channel.getDeviceId())) {
return channel;
}
}
return null;
}
}

View File

@@ -11,7 +11,7 @@ import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
@@ -95,7 +95,7 @@ public class DeviceServiceImpl implements IDeviceService {
private ISIPCommander commander;
@Autowired
private VideoStreamSessionManager streamSession;
private SipInviteSessionManager sessionManager;
@Autowired
private IMediaServerService mediaServerService;
@@ -223,12 +223,12 @@ public class DeviceServiceImpl implements IDeviceService {
//进行通道离线
// deviceChannelMapper.offlineByDeviceId(deviceId);
// 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(deviceId, null, null, null);
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(deviceId);
if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
streamSession.removeByCallId(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getCallId());
sessionManager.removeByCallId(ssrcTransaction.getCallId());
}
}
// 移除订阅
@@ -239,7 +239,7 @@ public class DeviceServiceImpl implements IDeviceService {
if (audioBroadcastCatches.size() > 0) {
for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
if (sendRtpItem != null) {
redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());

View File

@@ -14,7 +14,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
@@ -89,7 +89,7 @@ public class PlatformServiceImpl implements IPlatformService {
private UserSetting userSetting;
@Autowired
private VideoStreamSessionManager streamSession;
private SipInviteSessionManager sessionManager;
@Autowired
private IInviteStreamService inviteStreamService;
@@ -106,9 +106,9 @@ public class PlatformServiceImpl implements IPlatformService {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
for (SendRtpInfo sendRtpItem : sendRtpItems) {
if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
String platformId = sendRtpItem.getPlatformId();
Platform platform = platformMapper.getParentPlatByServerGBId(platformId);
@@ -133,9 +133,9 @@ public class PlatformServiceImpl implements IPlatformService {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaSendRtpStoppedEvent event) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
for (SendRtpInfo sendRtpItem : sendRtpItems) {
Platform platform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getPlatformId());
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
try {
@@ -414,9 +414,9 @@ public class PlatformServiceImpl implements IPlatformService {
}
private void stopAllPush(String platformId) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
if (sendRtpItems != null && sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
for (SendRtpInfo sendRtpItem : sendRtpItems) {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@@ -558,7 +558,7 @@ public class PlatformServiceImpl implements IPlatformService {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
}
}
@@ -684,7 +684,7 @@ public class PlatformServiceImpl implements IPlatformService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
@@ -724,9 +724,8 @@ public class PlatformServiceImpl implements IPlatformService {
if (ssrcInResponse != null) {
// 单端口
// 重新订阅流上线
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getChannelId(), null, inviteInfo.getStream());
streamSession.remove(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(inviteInfo.getStream());
sessionManager.removeByStream(inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
ssrcTransaction.setPlatformId(platform.getServerGBId());
@@ -737,7 +736,7 @@ public class PlatformServiceImpl implements IPlatformService {
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse()));
ssrcTransaction.setType(inviteSessionType);
streamSession.put(ssrcTransaction);
sessionManager.put(ssrcTransaction);
}
}
}
@@ -782,7 +781,7 @@ public class PlatformServiceImpl implements IPlatformService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
@@ -797,19 +796,19 @@ public class PlatformServiceImpl implements IPlatformService {
try {
if (sendBye) {
commanderForPlatform.streamByeCmd(platform, channel.getGbDeviceId(), stream, null, null);
commanderForPlatform.streamByeCmd(platform, channel, stream, null, null);
}
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getGbDeviceId() );
} finally {
mediaServerService.closeRTPServer(mediaServerItem, stream);
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, platform.getServerGBId(), channel.getGbDeviceId(), stream);
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, channel.getGbId(), stream);
if (inviteInfo != null) {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc());
inviteStreamService.removeInviteInfo(inviteInfo);
}
streamSession.remove(platform.getServerGBId(), channel.getGbDeviceId(), stream);
sessionManager.removeByStream(stream);
}
}

View File

@@ -14,7 +14,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
@@ -100,7 +100,7 @@ public class PlayServiceImpl implements IPlayService {
private IMediaServerService mediaServerService;
@Autowired
private VideoStreamSessionManager streamSession;
private SipInviteSessionManager sessionManager;
@Autowired
private UserSetting userSetting;
@@ -169,15 +169,16 @@ public class PlayServiceImpl implements IPlayService {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
for (SendRtpInfo sendRtpItem : sendRtpItems) {
if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
String platformId = sendRtpItem.getPlatformId();
Device device = deviceService.getDeviceByDeviceId(platformId);
DeviceChannel channel = channelService.getOneById(sendRtpItem.getChannelId());
try {
if (device != null) {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId());
if (device != null && channel != null) {
cmder.streamByeCmd(device, channel.getDeviceId(), event.getStream(), sendRtpItem.getCallId());
if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
|| sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.getByDeviceId(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
@@ -345,7 +346,7 @@ public class PlayServiceImpl implements IPlayService {
audioEvent.call("ssrc已经用尽");
return;
}
SendRtpItem sendRtpItem = new SendRtpItem();
SendRtpInfo sendRtpItem = new SendRtpInfo();
sendRtpItem.setApp("talk");
sendRtpItem.setStream(stream);
sendRtpItem.setSsrc(playSsrc);
@@ -388,7 +389,7 @@ public class PlayServiceImpl implements IPlayService {
} finally {
timeoutCallback.run();
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
sessionManager.removeByStream(sendRtpItem.getStream());
}
}, userSetting.getPlayTimeout());
@@ -425,9 +426,11 @@ public class PlayServiceImpl implements IPlayService {
sendRtpItem.setCallId(response.getCallIdHeader().getCallId());
redisCatchStorage.updateSendRTPSever(sendRtpItem);
streamSession.put(device.getDeviceId(), channelId, "talk",
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channelId, "talk",
sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(),
response, InviteSessionType.TALK);
sessionManager.put(ssrcTransaction);
} else {
log.error("[语音对讲]收到的消息错误response不是SIPResponse");
}
@@ -440,7 +443,7 @@ public class PlayServiceImpl implements IPlayService {
mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
sessionManager.removeByStream(sendRtpItem.getStream());
errorEvent.response(event);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
@@ -451,7 +454,7 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
sessionManager.removeByStream(sendRtpItem.getStream());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
eventResult.statusCode = -1;
@@ -484,7 +487,7 @@ public class PlayServiceImpl implements IPlayService {
log.info("[点播端口分配异常]deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getDeviceId(), ssrcInfo);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
}
@@ -521,7 +524,7 @@ public class PlayServiceImpl implements IPlayService {
} finally {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 取消订阅消息监听
subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream()));
@@ -534,7 +537,7 @@ public class PlayServiceImpl implements IPlayService {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem.getId(), ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
}
}, userSetting.getPlayTimeout());
@@ -576,7 +579,7 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
if (callback != null) {
callback.run(event.statusCode, event.msg, null);
}
@@ -594,7 +597,7 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
@@ -644,7 +647,7 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
@@ -659,7 +662,7 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
@@ -819,7 +822,7 @@ public class PlayServiceImpl implements IPlayService {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(deviceId, channel.getGbDeviceId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
}
}, userSetting.getPlayTimeout());
@@ -830,7 +833,7 @@ public class PlayServiceImpl implements IPlayService {
String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null);
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channel.getGbDeviceId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
};
@@ -916,7 +919,7 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
@@ -942,13 +945,18 @@ public class PlayServiceImpl implements IPlayService {
if (ssrcInResponse != null) {
// 单端口
// 重新订阅流上线
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), null, inviteInfo.getStream());
streamSession.remove(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(inviteInfo.getStream());
sessionManager.removeByStream(inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
streamSession.put(device.getDeviceId(), channelId, ssrcTransaction.getCallId(),
inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
ssrcTransaction.setDeviceId(device.getDeviceId());
ssrcTransaction.setChannelId(ssrcTransaction.getChannelId());
ssrcTransaction.setCallId(ssrcTransaction.getCallId());
ssrcTransaction.setSsrc(ssrcInResponse);
ssrcTransaction.setMediaServerId(mediaServerItem.getId());
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse()));
ssrcTransaction.setType(inviteSessionType);
sessionManager.put(ssrcTransaction);
}
}
}
@@ -1017,7 +1025,7 @@ public class PlayServiceImpl implements IPlayService {
} catch (SsrcTransactionNotFoundException e) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
}
}, userSetting.getPlayTimeout());
@@ -1026,7 +1034,7 @@ public class PlayServiceImpl implements IPlayService {
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg), null);
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
};
HookSubscribe.Event hookEvent = (hookData) -> {
@@ -1150,9 +1158,9 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void zlmServerOffline(String mediaServerId) {
// 处理正在向上推流的上级平台
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
for (SendRtpInfo sendRtpItem : sendRtpItems) {
if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
try {
@@ -1164,7 +1172,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
// 处理正在观看的国标设备
List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
List<SsrcTransaction> allSsrc = sessionManager.getAll();
if (allSsrc.size() > 0) {
for (SsrcTransaction ssrcTransaction : allSsrc) {
if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
@@ -1224,7 +1232,7 @@ public class PlayServiceImpl implements IPlayService {
}
// 查询通道使用状态
if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
// 查询流是否存在,不存在则认为是异常状态
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
@@ -1264,7 +1272,7 @@ public class PlayServiceImpl implements IPlayService {
@Override
public boolean audioBroadcastInUse(Device device, String channelId) {
if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
// 查询流是否存在,不存在则认为是异常状态
MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@@ -1294,7 +1302,7 @@ public class PlayServiceImpl implements IPlayService {
if (device == null || audioBroadcastCatch == null) {
return;
}
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
if (sendRtpItem != null) {
redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@@ -1422,7 +1430,7 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, Platform platform, CallIdHeader callIdHeader) {
public void startPushStream(SendRtpInfo sendRtpItem, SIPResponse sipResponse, Platform platform, CallIdHeader callIdHeader) {
// 开始发流
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@@ -1447,7 +1455,7 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public void startSendRtpStreamFailHand(SendRtpItem sendRtpItem, Platform platform, CallIdHeader callIdHeader) {
public void startSendRtpStreamFailHand(SendRtpInfo sendRtpItem, Platform platform, CallIdHeader callIdHeader) {
if (sendRtpItem.isOnlyAudio()) {
Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getDeviceId());
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId());
@@ -1487,7 +1495,7 @@ public class PlayServiceImpl implements IPlayService {
}
// 查询通道使用状态
if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
// 查询流是否存在,不存在则认为是异常状态
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@@ -1502,7 +1510,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null);
if (sendRtpItem != null) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
@@ -1538,7 +1546,7 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void stopTalk(Device device, String channelId, Boolean streamIsReady) {
log.info("[语音对讲] 停止, {}/{}", device.getDeviceId(), channelId);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
if (sendRtpItem == null) {
log.info("[语音对讲] 停止失败, 未找到发送信息,可能已经停止");
return;
@@ -1557,7 +1565,7 @@ public class PlayServiceImpl implements IPlayService {
ssrcFactory.releaseSsrc(mediaServerId, sendRtpItem.getSsrc());
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, null, sendRtpItem.getStream());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpItem.getStream());
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null);
@@ -1632,7 +1640,6 @@ public class PlayServiceImpl implements IPlayService {
}
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
channelService.stopPlay(device.getDeviceId(), channelId);
channelService.stopPlay(device.getDeviceId(), channelId);
if (inviteInfo.getStreamInfo() != null) {
mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStream());
}

View File

@@ -0,0 +1,101 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* 视频流session管理器管理视频预览、预览回放的通信句柄
*/
@Component
public class SipInviteSessionManager {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
/**
* 添加一个点播/回放的事务信息
*/
public void put(SsrcTransaction ssrcTransaction){
redisTemplate.opsForValue().set(VideoManagerConstants.SIP_INVITE_SESSION_STREAM + userSetting.getServerId()
+ ":" + ssrcTransaction.getStream(), ssrcTransaction);
redisTemplate.opsForValue().set(VideoManagerConstants.SIP_INVITE_SESSION_CALL_ID + userSetting.getServerId()
+ ":" + ssrcTransaction.getCallId(), ssrcTransaction);
}
public SsrcTransaction getSsrcTransactionByStream(String stream){
String key = VideoManagerConstants.SIP_INVITE_SESSION_STREAM + userSetting.getServerId() + ":" + stream;
return (SsrcTransaction)redisTemplate.opsForValue().get(key);
}
public SsrcTransaction getSsrcTransactionByCallId(String callId){
String key = VideoManagerConstants.SIP_INVITE_SESSION_CALL_ID + userSetting.getServerId() + ":" + callId;
return (SsrcTransaction)redisTemplate.opsForValue().get(key);
}
public List<SsrcTransaction> getSsrcTransactionByDeviceId(String deviceId){
String key = VideoManagerConstants.SIP_INVITE_SESSION_CALL_ID + userSetting.getServerId() + ":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.isEmpty()) {
return new ArrayList<>();
}
List<SsrcTransaction> result = new ArrayList<>();
for (Object keyObj : scanResult) {
SsrcTransaction ssrcTransaction = (SsrcTransaction)redisTemplate.opsForValue().get(keyObj);
if (ssrcTransaction != null && ssrcTransaction.getDeviceId().equals(deviceId)) {
result.add(ssrcTransaction);
}
}
return result;
}
public void removeByStream(String stream) {
SsrcTransaction ssrcTransaction = getSsrcTransactionByStream(stream);
if (ssrcTransaction == null ) {
return;
}
redisTemplate.delete(VideoManagerConstants.SIP_INVITE_SESSION_STREAM + userSetting.getServerId() + ":" + stream);
if (ssrcTransaction.getCallId() != null) {
redisTemplate.delete(VideoManagerConstants.SIP_INVITE_SESSION_CALL_ID + userSetting.getServerId() + ":" + ssrcTransaction.getCallId());
}
}
public void removeByCallId(String callId) {
SsrcTransaction ssrcTransaction = getSsrcTransactionByCallId(callId);
if (ssrcTransaction == null ) {
return;
}
redisTemplate.delete(VideoManagerConstants.SIP_INVITE_SESSION_CALL_ID + userSetting.getServerId() + ":" + callId);
if (ssrcTransaction.getStream() != null) {
redisTemplate.delete(VideoManagerConstants.SIP_INVITE_SESSION_STREAM + userSetting.getServerId() + ":" + ssrcTransaction.getStream());
}
}
public List<SsrcTransaction> getAll() {
String key = VideoManagerConstants.SIP_INVITE_SESSION_CALL_ID + userSetting.getServerId() + ":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.isEmpty()) {
return new ArrayList<>();
}
List<SsrcTransaction> result = new ArrayList<>();
for (Object keyObj : scanResult) {
SsrcTransaction ssrcTransaction = (SsrcTransaction)redisTemplate.opsForValue().get(keyObj);
result.add(ssrcTransaction);
}
return result;
}
}

View File

@@ -1,172 +0,0 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.utils.JsonUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import gov.nist.javax.sip.message.SIPResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
/**
* 视频流session管理器管理视频预览、预览回放的通信句柄
*/
@Component
public class VideoStreamSessionManager {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
/**
* 添加一个点播/回放的事务信息
* 后续可以通过流Id/callID
* @param deviceId 设备ID
* @param channelId 通道ID
* @param callId 一次请求的CallID
* @param stream 流名称
* @param mediaServerId 所使用的流媒体ID
* @param response 回复
*/
public void put(String deviceId, Integer channelId, String callId, String stream, String ssrc, String mediaServerId, SIPResponse response, InviteSessionType type){
SsrcTransaction ssrcTransaction = new SsrcTransaction();
ssrcTransaction.setDeviceId(deviceId);
ssrcTransaction.setChannelId(channelId);
ssrcTransaction.setStream(stream);
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo(response));
ssrcTransaction.setCallId(callId);
ssrcTransaction.setSsrc(ssrc);
ssrcTransaction.setMediaServerId(mediaServerId);
ssrcTransaction.setType(type);
redisTemplate.opsForValue().set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId()
+ ":" + deviceId + ":" + channelId + ":" + callId + ":" + stream, ssrcTransaction);
}
public void put(SsrcTransaction ssrcTransaction){
redisTemplate.opsForValue().set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId()
+ ":" + ssrcTransaction.getChannelId() + ":" + ssrcTransaction.getCallId() + ":" + ssrcTransaction.getStream(), ssrcTransaction);
}
public SsrcTransaction getSsrcTransaction(Integer channelId, String callId, String stream){
String chanelStr = channelId==null?"*":channelId.toString();
if (ObjectUtils.isEmpty(callId)) {
callId ="*";
}
if (ObjectUtils.isEmpty(stream)) {
stream ="*";
}
String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":" + chanelStr + ":" + callId+ ":" + stream;
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() == 0) {
return null;
}
return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0));
}
public SsrcTransaction getSsrcTransactionByCallId(String callId){
if (ObjectUtils.isEmpty(callId)) {
return null;
}
String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":*:" + callId+ ":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (!scanResult.isEmpty()) {
return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0));
}else {
key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":*:play:*";
scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.isEmpty()) {
return null;
}
for (Object keyObj : scanResult) {
SsrcTransaction ssrcTransaction = (SsrcTransaction)redisTemplate.opsForValue().get(keyObj);
if (ssrcTransaction.getSipTransactionInfo() != null &&
ssrcTransaction.getSipTransactionInfo().getCallId().equals(callId)) {
return ssrcTransaction;
}
}
return null;
}
}
public List<SsrcTransaction> getSsrcTransactionForAll(Integer channelId, String callId, String stream){
String chanelStr = channelId==null?"*":channelId.toString();
if (ObjectUtils.isEmpty(callId)) {
callId ="*";
}
if (ObjectUtils.isEmpty(stream)) {
stream ="*";
}
String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":" + chanelStr + ":" + callId+ ":" + stream;
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() == 0) {
return null;
}
List<SsrcTransaction> result = new ArrayList<>();
for (Object keyObj : scanResult) {
result.add((SsrcTransaction)redisTemplate.opsForValue().get(keyObj));
}
return result;
}
public String getMediaServerId(Integer channelId, String stream){
SsrcTransaction ssrcTransaction = getSsrcTransaction( channelId, null, stream);
if (ssrcTransaction == null) {
return null;
}
return ssrcTransaction.getMediaServerId();
}
public String getSSRC(Integer channelId, String stream){
SsrcTransaction ssrcTransaction = getSsrcTransaction(channelId, null, stream);
if (ssrcTransaction == null) {
return null;
}
return ssrcTransaction.getSsrc();
}
public void remove(String deviceId, Integer channelId, String stream) {
List<SsrcTransaction> ssrcTransactionList = getSsrcTransactionForAll(channelId, null, stream);
if (ssrcTransactionList == null || ssrcTransactionList.isEmpty()) {
return;
}
for (SsrcTransaction ssrcTransaction : ssrcTransactionList) {
redisTemplate.delete(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":"
+ deviceId + ":" + channelId + ":" + ssrcTransaction.getCallId() + ":" + ssrcTransaction.getStream());
}
}
public void removeByCallId(String deviceId, Integer channelId, String callId) {
SsrcTransaction ssrcTransaction = getSsrcTransaction(channelId, callId, null);
if (ssrcTransaction == null ) {
return;
}
redisTemplate.delete(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":"
+ deviceId + ":" + channelId + ":" + ssrcTransaction.getCallId() + ":" + ssrcTransaction.getStream());
}
public List<SsrcTransaction> getAllSsrc() {
List<Object> ssrcTransactionKeys = RedisUtil.scan(redisTemplate, String.format("%s_*_*_*_*", VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX+ userSetting.getServerId()));
List<SsrcTransaction> result= new ArrayList<>();
for (Object ssrcTransactionKey : ssrcTransactionKeys) {
String key = (String) ssrcTransactionKey;
SsrcTransaction ssrcTransaction = JsonUtil.redisJsonToObject(redisTemplate, key, SsrcTransaction.class);
result.add(ssrcTransaction);
}
return result;
}
}

View File

@@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.gb28181.task;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@@ -86,9 +86,9 @@ public class SipRunner implements CommandLineRunner {
// 查找国标推流
List<SendRtpItem> sendRtpItems = redisCatchStorage.queryAllSendRTPServer();
List<SendRtpInfo> sendRtpItems = redisCatchStorage.queryAllSendRTPServer();
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
for (SendRtpInfo sendRtpItem : sendRtpItems) {
MediaServer mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStream());
if (mediaServerItem != null) {

View File

@@ -106,11 +106,11 @@ public interface ISIPCommander {
* 请求回放视频流
*
* @param device 视频设备
* @param channelId 预览通道
* @param channel 预览通道
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/
void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime, HookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInf, Device device, DeviceChannel channel, String startTime, String endTime, HookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/**
* 请求历史媒体下载
@@ -121,7 +121,7 @@ public interface ISIPCommander {
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
* @param downloadSpeed 下载倍速参数
*/
void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
String startTime, String endTime, int downloadSpeed, HookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException;
@@ -131,7 +131,7 @@ public interface ISIPCommander {
*/
void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void talkStreamCmd(MediaServer mediaServerItem, SendRtpItem sendRtpItem, Device device, String channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException;

View File

@@ -140,7 +140,7 @@ public interface ISIPCommanderForPlatform {
* @param sendRtpItem
* @return
*/
void sendMediaStatusNotify(Platform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException;
void sendMediaStatusNotify(Platform platform, SendRtpInfo sendRtpItem) throws SipException, InvalidArgumentException, ParseException;
/**
* 向发起点播的上级回复bye
@@ -150,7 +150,7 @@ public interface ISIPCommanderForPlatform {
*/
void streamByeCmd(Platform platform, String callId) throws SipException, InvalidArgumentException, ParseException;
void streamByeCmd(Platform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException;
void streamByeCmd(Platform platform, SendRtpInfo sendRtpItem) throws SipException, InvalidArgumentException, ParseException;
void streamByeCmd(Platform platform, CommonGBChannel channel, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;

View File

@@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
@@ -165,7 +165,7 @@ public class SIPRequestHeaderPlarformProvider {
return registerRequest;
}
public Request createMessageRequest(Platform parentPlatform, String content, SendRtpItem sendRtpItem) throws PeerUnavailableException, ParseException, InvalidArgumentException {
public Request createMessageRequest(Platform parentPlatform, String content, SendRtpInfo sendRtpItem) throws PeerUnavailableException, ParseException, InvalidArgumentException {
CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sendRtpItem.getCallId());
callIdHeader.setCallId(sendRtpItem.getCallId());
return createMessageRequest(parentPlatform, content, sendRtpItem.getToTag(), SipUtils.getNewViaTag(), sendRtpItem.getFromTag(), callIdHeader);
@@ -269,7 +269,7 @@ public class SIPRequestHeaderPlarformProvider {
return request;
}
public SIPRequest createByeRequest(Platform platform, SendRtpItem sendRtpItem) throws PeerUnavailableException, ParseException, InvalidArgumentException {
public SIPRequest createByeRequest(Platform platform, SendRtpInfo sendRtpItem) throws PeerUnavailableException, ParseException, InvalidArgumentException {
if (sendRtpItem == null ) {
return null;

View File

@@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.GitUtil;
@@ -44,8 +43,6 @@ public class SIPRequestHeaderProvider {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private VideoStreamSessionManager streamSession;
public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null;

View File

@@ -8,7 +8,7 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
@@ -36,7 +36,6 @@ import javax.sip.SipFactory;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import java.text.ParseException;
import java.util.List;
/**
* @description:设备能力接口,用于定义设备的控制、查询能力
@@ -61,7 +60,7 @@ public class SIPCommander implements ISIPCommander {
private SIPRequestHeaderProvider headerProvider;
@Autowired
private VideoStreamSessionManager streamSession;
private SipInviteSessionManager sessionManager;
@Autowired
private UserSetting userSetting;
@@ -350,15 +349,16 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
streamSession.remove(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
String callId = response.getCallIdHeader().getCallId();
streamSession.put(device.getDeviceId(), channel.getDeviceId(), callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
InviteSessionType.PLAY);
sessionManager.put(ssrcTransaction);
okEvent.response(e);
});
}
@@ -367,12 +367,12 @@ public class SIPCommander implements ISIPCommander {
* 请求回放视频流
*
* @param device 视频设备
* @param channelId 预览通道
* @param channel 预览通道
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/
@Override
public void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
public void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
String startTime, String endTime, HookSubscribe.Event hookEvent,
SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
@@ -386,9 +386,9 @@ public class SIPCommander implements ISIPCommander {
}
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("o=" + channel.getDeviceId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("s=Playback\r\n");
content.append("u=" + channelId + ":0\r\n");
content.append("u=" + channel.getDeviceId() + ":0\r\n");
content.append("c=IN IP4 " + sdpIp + "\r\n");
content.append("t=" + DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime) + " "
+ DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) + "\r\n");
@@ -454,27 +454,23 @@ public class SIPCommander implements ISIPCommander {
}
subscribe.removeSubscribe(rtpHook);
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc());
Request request = headerProvider.createPlaybackInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc());
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
streamSession.put(device.getDeviceId(), channelId,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAYBACK);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(),
channel.getId(), sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAYBACK);
sessionManager.put(ssrcTransaction);
okEvent.response(event);
});
}
/**
* 请求历史媒体下载
*
* @param device 视频设备
* @param channelId 预览通道
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
* @param downloadSpeed 下载倍速参数
*/
@Override
public void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
public void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
String startTime, String endTime, int downloadSpeed,
HookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException {
@@ -488,9 +484,9 @@ public class SIPCommander implements ISIPCommander {
}
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("o=" + channel.getDeviceId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("s=Download\r\n");
content.append("u=" + channelId + ":0\r\n");
content.append("u=" + channel.getDeviceId() + ":0\r\n");
content.append("c=IN IP4 " + sdpIp + "\r\n");
content.append("t=" + DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime) + " "
+ DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) + "\r\n");
@@ -561,7 +557,7 @@ public class SIPCommander implements ISIPCommander {
(departureHookData) -> {
log.info("[录像]下载结束, 发送BYE");
try {
streamByeCmd(device, channelId, ssrcInfo.getStream(), callId);
streamByeCmd(device, channel.getDeviceId(), ssrcInfo.getStream(), callId);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
log.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage());
@@ -569,20 +565,21 @@ public class SIPCommander implements ISIPCommander {
});
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
Request request = headerProvider.createPlaybackInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
String contentString =new String(response.getRawContent());
String ssrc = SipUtils.getSsrcFromSdp(contentString);
streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD);
sessionManager.put(ssrcTransaction);
okEvent.response(event);
});
}
@Override
public void talkStreamCmd(MediaServer mediaServerItem, SendRtpItem sendRtpItem, Device device, String channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channel, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
String stream = sendRtpItem.getStream();
@@ -615,7 +612,7 @@ public class SIPCommander implements ISIPCommander {
//
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
content.append("o=" + channel.getDeviceId() + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
content.append("s=Talk\r\n");
content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
content.append("t=0 0\r\n");
@@ -630,17 +627,18 @@ public class SIPCommander implements ISIPCommander {
// f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
content.append("f=v/////a/1/8/1" + "\r\n");
Request request = headerProvider.createInviteRequest(device, channelId, content.toString(),
Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(),
SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sendRtpItem.getSsrc(), callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
sessionManager.removeByStream(sendRtpItem.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
errorEvent.response(e);
}), e -> {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
streamSession.put(device.getDeviceId(), channelId, "talk", stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), "talk", stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
sessionManager.put(ssrcTransaction);
okEvent.response(e);
});
}
@@ -662,21 +660,25 @@ public class SIPCommander implements ISIPCommander {
log.warn("[发送BYE] device为null");
return;
}
List<SsrcTransaction> ssrcTransactionList = streamSession.getSsrcTransactionForAll(device.getDeviceId(), channelId, callId, stream);
if (ssrcTransactionList == null || ssrcTransactionList.isEmpty()) {
SsrcTransaction ssrcTransaction = null;
if (callId != null) {
ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callId);
}else if (stream != null) {
ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream);
}
if (ssrcTransaction == null) {
log.info("[发送BYE] 未找到事务信息,设备: device: {}, channel: {}", device.getDeviceId(), channelId);
throw new SsrcTransactionNotFoundException(device.getDeviceId(), channelId, callId, stream);
}
for (SsrcTransaction ssrcTransaction : ssrcTransactionList) {
log.info("[发送BYE] 设备: device: {}, channel: {}, callId: {}", device.getDeviceId(), channelId, ssrcTransaction.getCallId());
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
log.info("[发送BYE] 设备: device: {}, channel: {}, callId: {}", device.getDeviceId(), channelId, ssrcTransaction.getCallId());
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
streamSession.removeByCallId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getCallId());
Request byteRequest = headerProvider.createByteRequest(device, channelId, ssrcTransaction.getSipTransactionInfo());
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), byteRequest, null, okEvent);
}
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
sessionManager.removeByCallId(ssrcTransaction.getCallId());
Request byteRequest = headerProvider.createByteRequest(device, channelId, ssrcTransaction.getSipTransactionInfo());
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), byteRequest, null, okEvent);
}
@Override
@@ -1403,7 +1405,7 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playbackControlCmd(Device device, StreamInfo streamInfo, String content, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws SipException, InvalidArgumentException, ParseException {
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), streamInfo.getChannelId(), null, streamInfo.getStream());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(streamInfo.getStream());
if (ssrcTransaction == null) {
log.info("[回放控制]未找到视频流信息,设备:{}, 流ID: {}", device.getDeviceId(), streamInfo.getStream());
return;

View File

@@ -8,7 +8,7 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
@@ -76,7 +76,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Autowired
private VideoStreamSessionManager streamSession;
private SipInviteSessionManager sessionManager;
@Autowired
private DynamicTask dynamicTask;
@@ -597,7 +597,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
@Override
public void sendMediaStatusNotify(Platform parentPlatform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException {
public void sendMediaStatusNotify(Platform parentPlatform, SendRtpInfo sendRtpItem) throws SipException, InvalidArgumentException, ParseException {
if (sendRtpItem == null || parentPlatform == null) {
return;
}
@@ -625,14 +625,14 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (platform == null) {
return;
}
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId);
if (sendRtpItem != null) {
streamByeCmd(platform, sendRtpItem);
}
}
@Override
public synchronized void streamByeCmd(Platform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException {
public synchronized void streamByeCmd(Platform platform, SendRtpInfo sendRtpItem) throws SipException, InvalidArgumentException, ParseException {
if (sendRtpItem == null ) {
log.info("[向上级发送BYE] sendRtpItem 为NULL");
return;
@@ -657,14 +657,20 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override
public void streamByeCmd(Platform platform, CommonGBChannel channel, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(channel.getGbId(), callId, stream);
SsrcTransaction ssrcTransaction = null;
if (callId != null) {
ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callId);
}else if (stream != null) {
ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream);
}
if (ssrcTransaction == null) {
throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channel.getGbDeviceId(), callId, stream);
}
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
sessionManager.removeByStream(ssrcTransaction.getStream());
Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channel.getGbDeviceId(), ssrcTransaction.getSipTransactionInfo());
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), byteRequest, null, okEvent);
@@ -746,14 +752,15 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), ssrcInfo.getSsrc(),
callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> {
streamSession.remove(platform.getServerGBId(), channel.getGbId(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
subscribe.removeSubscribe(hook);
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
streamSession.put(platform.getServerGBId(), channel.getGbId(), callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.BROADCAST);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForPlatform(platform.getServerGBId(), channel.getGbId(), callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.BROADCAST);
sessionManager.put(ssrcTransaction);
okEvent.response(e);
});
}

View File

@@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService;
@@ -83,7 +83,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
String fromUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String toUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
log.info("[收到ACK] 来自->{}", fromUserId);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
if (sendRtpItem == null) {
log.warn("[收到ACK]:未找到来自{}callId: {}", fromUserId, callIdHeader.getCallId());
return;

View File

@@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -79,7 +79,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
private SIPProcessorObserver sipProcessorObserver;
@Autowired
private VideoStreamSessionManager streamSession;
private SipInviteSessionManager sessionManager;
@Autowired
private IPlayService playService;
@@ -112,7 +112,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
log.error("[回复BYE信息失败]{}", e.getMessage());
}
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
// 收流端发送的停止
if (sendRtpItem != null){
@@ -180,7 +180,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
}
}
// 可能是设备发送的停止
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callIdHeader.getCallId());
if (ssrcTransaction == null) {
return;
}
@@ -247,7 +247,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
}
streamSession.removeByCallId(device.getDeviceId(), channel.getDeviceId(), ssrcTransaction.getCallId());
sessionManager.removeByCallId(ssrcTransaction.getCallId());
}
}
}

View File

@@ -10,7 +10,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
@@ -117,7 +117,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private SipConfig config;
@Autowired
private VideoStreamSessionManager streamSession;
private SipInviteSessionManager sessionManager;
@Autowired
private SendRtpPortManager sendRtpPortManager;
@@ -182,7 +182,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}else {
// 点播成功, TODO 可以在此处检测cancel命令是否存在存在则不发送
// 构建sendRTP内容
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(streamInfo.getMediaServer(),
SendRtpInfo sendRtpItem = mediaServerService.createSendRtpItem(streamInfo.getMediaServer(),
inviteInfo.getIp(), inviteInfo.getPort(), inviteInfo.getSsrc(), platform.getServerGBId(),
streamInfo.getApp(), streamInfo.getStream(),
channel.getGbId(), inviteInfo.isTcp(), platform.isRtcp());
@@ -700,7 +700,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
private String createSendSdp(SendRtpItem sendRtpItem, InviteInfo inviteInfo, String sdpIp) {
private String createSendSdp(SendRtpInfo sendRtpItem, InviteInfo inviteInfo, String sdpIp) {
StringBuilder content = new StringBuilder(200);
content.append("v=0\r\n");
content.append("o=" + inviteInfo.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
@@ -867,7 +867,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
SendRtpInfo sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
device.getDeviceId(), deviceChannel.getId(),
mediaTransmissionTCP, false);
@@ -929,7 +929,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
SIPResponse sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, SIPRequest request, MediaServer mediaServerItem, boolean mediaTransmissionTCP, String ssrc) {
SIPResponse sendOk(Device device, SendRtpInfo sendRtpItem, SessionDescription sdp, SIPRequest request, MediaServer mediaServerItem, boolean mediaTransmissionTCP, String ssrc) {
SIPResponse sipResponse = null;
try {
sendRtpItem.setStatus(2);
@@ -973,7 +973,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
audioBroadcastCatch.setSipTransactionInfoByRequest(sipResponse);
audioBroadcastManager.update(audioBroadcastCatch);
streamSession.put(device.getDeviceId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), sipResponse, InviteSessionType.BROADCAST);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), sipResponse, InviteSessionType.BROADCAST);
sessionManager.put(ssrcTransaction);
// 开启发流大华在收到200OK后就会开始建立连接
if (!device.isBroadcastPushAfterAck()) {
log.info("[语音喊话] 回复200OK后发现 BroadcastPushAfterAck为False现在开始推流");

View File

@@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
@@ -58,7 +58,7 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
private SIPCommander cmder;
@Autowired
private VideoStreamSessionManager sessionManager;
private SipInviteSessionManager sessionManager;
@Override
public void afterPropertiesSet() throws Exception {
@@ -72,7 +72,7 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
SIPRequest request = (SIPRequest) evt.getRequest();
CallIdHeader callIdHeader = request.getCallIdHeader();
// 先从会话内查找
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callIdHeader.getCallId());
// 查询设备是否存在
Device device = redisCatchStorage.getDevice(ssrcTransaction.getDeviceId());
@@ -104,7 +104,7 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
String contentType = header.getContentType();
String contentSubType = header.getContentSubType();
if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
String streamId = sendRtpItem.getStream();
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo) {

View File

@@ -6,7 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -50,7 +50,7 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
private IRedisCatchStorage redisCatchStorage;
@Autowired
private VideoStreamSessionManager sessionManager;
private SipInviteSessionManager sessionManager;
@Override
public void afterPropertiesSet() throws Exception {
@@ -69,7 +69,7 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
CallIdHeader callIdHeader = sipRequest.getCallIdHeader();
// 先从会话内查找
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callIdHeader.getCallId());
// 兼容海康 媒体通知 消息from字段不是设备ID的问题
if (ssrcTransaction != null) {
deviceId = ssrcTransaction.getDeviceId();

View File

@@ -127,7 +127,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
try {
platformService.broadcastInvite(platform, channel, mediaServerForMinimumLoad, (hookData)->{
// 上级平台推流成功
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.getByDeviceId(device.getDeviceId(), targetId);
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(channel.getGbId());
if (broadcastCatch != null ) {
if (playService.audioBroadcastInUse(device, targetId)) {
log.info("[国标级联] 语音喊话 设备正在使用中 platform {} channel: {}",
@@ -141,7 +141,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
broadcastCatch.setMediaServerItem(hookData.getMediaServer());
audioBroadcastManager.update(broadcastCatch);
// 推流到设备
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, hookData.getStream(), null);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, hookData.getStream(), null);
if (sendRtpItem == null) {
log.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, hookData.getStream());
log.info("[国标级联] 语音喊话 重新开始channelId: {}, stream: {}", targetId, hookData.getStream());

View File

@@ -5,11 +5,11 @@ import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -66,7 +66,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
private IInviteStreamService inviteStreamService;
@Autowired
private VideoStreamSessionManager streamSession;
private SipInviteSessionManager sessionManager;
@Override
public void afterPropertiesSet() throws Exception {
@@ -87,7 +87,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
if ("121".equals(NotifyType)){
log.info("[录像流]推送完毕,收到关流通知");
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callIdHeader.getCallId());
if (ssrcTransaction != null) {
log.info("[录像流]推送完毕,关流通知, device: {}, channelId: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
@@ -105,7 +105,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId());
subscribe.removeSubscribe(hook);
// 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null);
if (sendRtpItem != null) {
Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
if (parentPlatform == null) {

View File

@@ -1,10 +1,8 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
@@ -35,7 +33,7 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
private ResponseMessageHandler responseMessageHandler;
@Autowired
private DynamicTask dynamicTask;
private IDeviceChannelService deviceChannelService;
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@@ -54,7 +52,19 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
SIPRequest request = (SIPRequest) evt.getRequest();
try {
String channelId = getText(rootElement, "DeviceID");
if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
DeviceChannel channel = null;
if (!channelId.equals(device.getDeviceId())) {
channel = deviceChannelService.getOne(device.getDeviceId(), channelId);
}else {
channel = deviceChannelService.getBroadcastChannel(device.getId());
}
if (channel == null) {
log.info("[语音广播]回复: 未找到通道{}/{}", device.getDeviceId(), channelId );
// 回复410
responseAck((SIPRequest) evt.getRequest(), Response.NOT_FOUND);
return;
}
if (!audioBroadcastManager.exit(channel.getId())) {
// 回复410
responseAck((SIPRequest) evt.getRequest(), Response.GONE);
return;
@@ -70,7 +80,7 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
// 回复200 OK
responseAck(request, Response.OK);
if (result.equalsIgnoreCase("OK")) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.getByDeviceId(device.getDeviceId(), channelId);
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(channel.getId());
audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite);
audioBroadcastManager.update(audioBroadcastCatch);
}else {

View File

@@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.media.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
@@ -58,9 +58,9 @@ public interface IMediaNodeServerService {
Map<String, String> getFFmpegCMDs(MediaServer mediaServer);
void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout);
void startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
void startSendRtpStream(MediaServer mediaServer, SendRtpItem sendRtpItem);
void startSendRtpStream(MediaServer mediaServer, SendRtpInfo sendRtpItem);
Long updateDownloadProcess(MediaServer mediaServer, String app, String stream);

View File

@@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.media.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
@@ -142,13 +142,13 @@ public interface IMediaServerService {
Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId);
void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout);
void startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem);
void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem);
SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, Integer channelId, boolean mediaTransmissionTCP, boolean rtcp);
SendRtpInfo createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, Integer channelId, boolean mediaTransmissionTCP, boolean rtcp);
SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
SendRtpInfo createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, Integer channelId, boolean tcp, boolean rtcp);
MediaServer getMediaServerByAppAndStream(String app, String stream);

View File

@@ -8,7 +8,7 @@ import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.PlayException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@@ -846,7 +846,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) {
public void startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[startSendRtpPassive] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
@@ -856,7 +856,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem) {
public void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[startSendRtpStream] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
@@ -868,12 +868,12 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, Integer channelId, boolean isTcp, boolean rtcp) {
public SendRtpInfo createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, Integer channelId, boolean isTcp, boolean rtcp) {
int localPort = sendRtpPortManager.getNextPort(mediaServer);
if (localPort == 0) {
return null;
}
SendRtpItem sendRtpItem = new SendRtpItem();
SendRtpInfo sendRtpItem = new SendRtpInfo();
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);
@@ -889,14 +889,14 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
public SendRtpInfo createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, Integer channelId, boolean tcp, boolean rtcp){
int localPort = sendRtpPortManager.getNextPort(serverItem);
if (localPort <= 0) {
throw new PlayException(javax.sip.message.Response.SERVER_INTERNAL_ERROR, "server internal error");
}
SendRtpItem sendRtpItem = new SendRtpItem();
SendRtpInfo sendRtpItem = new SendRtpInfo();
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);

View File

@@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.media.zlm;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
@@ -37,10 +37,10 @@ public class SendRtpPortManager {
String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*";
List<Object> queryResult = RedisUtil.scan(redisTemplate, key);
Map<Integer, SendRtpItem> sendRtpItemMap = new HashMap<>();
Map<Integer, SendRtpInfo> sendRtpItemMap = new HashMap<>();
for (Object o : queryResult) {
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(o);
SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(o);
if (sendRtpItem != null) {
sendRtpItemMap.put(sendRtpItem.getLocalPort(), sendRtpItem);
}
@@ -80,7 +80,7 @@ public class SendRtpPortManager {
return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
}
private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Map<Integer, SendRtpItem> sendRtpItemMap){
private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Map<Integer, SendRtpInfo> sendRtpItemMap){
// TODO 这里改为只取偶数端口
RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
if (redisAtomicInteger.get() < startPort) {

View File

@@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
@@ -331,7 +331,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) {
public void startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app", sendRtpItem.getApp());
@@ -366,7 +366,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public void startSendRtpStream(MediaServer mediaServer, SendRtpItem sendRtpItem) {
public void startSendRtpStream(MediaServer mediaServer, SendRtpInfo sendRtpItem) {
Map<String, Object> param = new HashMap<>(12);
param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp());

View File

@@ -1,10 +1,9 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -202,7 +201,7 @@ public class ZLMServerFactory {
return mediaInfo.getInteger("totalReaderCount");
}
public JSONObject startSendRtp(MediaServer mediaInfo, SendRtpItem sendRtpItem) {
public JSONObject startSendRtp(MediaServer mediaInfo, SendRtpInfo sendRtpItem) {
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
log.info("rtp/{}开始推流, 目标={}:{}SSRC={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
Map<String, Object> param = new HashMap<>(12);
@@ -261,7 +260,7 @@ public class ZLMServerFactory {
return result;
}
public JSONObject stopSendRtpStream(MediaServer mediaServerItem, SendRtpItem sendRtpItem) {
public JSONObject stopSendRtpStream(MediaServer mediaServerItem, SendRtpInfo sendRtpItem) {
Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp());

View File

@@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import java.text.ParseException;
@@ -9,5 +9,5 @@ import java.text.ParseException;
*/
public interface ChannelOnlineEvent {
void run(SendRtpItem sendRtpItem) throws ParseException;
void run(SendRtpInfo sendRtpItem) throws ParseException;
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.RTPServerParam;
@@ -7,4 +8,6 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo;
public interface IReceiveRtpServerService {
SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<HookData> callback);
void closeRTPServer(MediaServer mediaServer, String stream);
}

View File

@@ -0,0 +1,5 @@
package com.genersoft.iot.vmp.service;
public interface ISendRtpServerService {
}

View File

@@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
/**
* redis消息请求下级推送流信息
@@ -82,7 +82,7 @@ public class RequestPushStreamMsg {
return requestPushStreamMsg;
}
public static RequestPushStreamMsg getInstance(SendRtpItem sendRtpItem) {
public static RequestPushStreamMsg getInstance(SendRtpInfo sendRtpItem) {
RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg();
requestPushStreamMsg.setMediaServerId(sendRtpItem.getMediaServerId());
requestPushStreamMsg.setApp(sendRtpItem.getApp());

View File

@@ -1,12 +1,12 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
public class RequestStopPushStreamMsg {
private SendRtpItem sendRtpItem;
private SendRtpInfo sendRtpItem;
private String platformName;
@@ -14,11 +14,11 @@ public class RequestStopPushStreamMsg {
private int platFormIndex;
public SendRtpItem getSendRtpItem() {
public SendRtpInfo getSendRtpItem() {
return sendRtpItem;
}
public void setSendRtpItem(SendRtpItem sendRtpItem) {
public void setSendRtpItem(SendRtpInfo sendRtpItem) {
this.sendRtpItem = sendRtpItem;
}
@@ -39,7 +39,7 @@ public class RequestStopPushStreamMsg {
this.platFormIndex = platFormIndex;
}
public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) {
public static RequestStopPushStreamMsg getInstance(SendRtpInfo sendRtpItem, String platformName, int platFormIndex) {
RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg();
streamMsg.setSendRtpItem(sendRtpItem);
streamMsg.setPlatformName(platformName);

View File

@@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
/**
@@ -9,15 +9,15 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
*/
public class ResponseSendItemMsg {
private SendRtpItem sendRtpItem;
private SendRtpInfo sendRtpItem;
private MediaServer mediaServerItem;
public SendRtpItem getSendRtpItem() {
public SendRtpInfo getSendRtpItem() {
return sendRtpItem;
}
public void setSendRtpItem(SendRtpItem sendRtpItem) {
public void setSendRtpItem(SendRtpInfo sendRtpItem) {
this.sendRtpItem = sendRtpItem;
}

View File

@@ -4,13 +4,12 @@ import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.service.ICloudRecordService;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaRecordMp4Event;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.gb28181.service.ICloudRecordService;
import com.genersoft.iot.vmp.service.bean.CloudRecordItem;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -52,9 +51,6 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
@Autowired
private AssistRESTfulUtils assistRESTfulUtils;
@Autowired
private VideoStreamSessionManager streamSession;
@Override
public PageInfo<CloudRecordItem> getList(int page, int count, String query, String app, String stream, String startTime, String endTime, List<MediaServer> mediaServerItems, String callId) {
// 开始时间和结束时间在数据库中都是以秒为单位的

View File

@@ -11,7 +11,7 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@@ -64,7 +64,7 @@ public class MediaServiceImpl implements IMediaService {
private IDeviceChannelService deviceChannelService;
@Autowired
private VideoStreamSessionManager sessionManager;
private SipInviteSessionManager sessionManager;
@Autowired
private IPlatformService platformService;
@@ -161,25 +161,25 @@ public class MediaServiceImpl implements IMediaService {
}
// 设置音频信息及录制信息
List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, stream);
if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream);
if (ssrcTransaction != null ) {
// 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用
StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
streamAuthorityInfo.setApp(app);
streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream());
streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId());
streamAuthorityInfo.setStream(ssrcTransaction.getStream());
streamAuthorityInfo.setCallId(ssrcTransaction.getSipTransactionInfo().getCallId());
redisCatchStorage.updateStreamAuthorityInfo(app, ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo);
redisCatchStorage.updateStreamAuthorityInfo(app, ssrcTransaction.getStream(), streamAuthorityInfo);
String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
Integer channelId = ssrcTransactionForAll.get(0).getChannelId();
String deviceId = ssrcTransaction.getDeviceId();
Integer channelId = ssrcTransaction.getChannelId();
DeviceChannel deviceChannel = deviceChannelService.getOneById(channelId);
if (deviceChannel != null) {
result.setEnable_audio(deviceChannel.isHasAudio());
}
// 如果是录像下载就设置视频间隔十秒
if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
if (ssrcTransaction.getType() == InviteSessionType.DOWNLOAD) {
// 获取录像的总时长,然后设置为这个视频的时长
InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, channelId, stream);
if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) {
@@ -193,7 +193,7 @@ public class MediaServiceImpl implements IMediaService {
}
}
// 如果是talk对讲则默认获取声音
if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.TALK) {
if (ssrcTransaction.getType() == InviteSessionType.TALK) {
result.setEnable_audio(true);
}
}
@@ -231,9 +231,9 @@ public class MediaServiceImpl implements IMediaService {
}
// 收到无人观看说明流也没有在往上级推送
if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(inviteInfo.getChannelId());
List<SendRtpInfo> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(inviteInfo.getChannelId());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
for (SendRtpInfo sendRtpItem : sendRtpItems) {
Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
try {
commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
@@ -271,7 +271,7 @@ public class MediaServiceImpl implements IMediaService {
deviceChannelService.stopPlay(inviteInfo.getChannelId());
return result;
}
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null);
SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null);
if (sendRtpItem != null && "talk".equals(sendRtpItem.getApp())) {
return false;
}

View File

@@ -2,7 +2,9 @@ package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookData;
@@ -10,7 +12,6 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
@@ -44,6 +45,9 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
@Autowired
private HookSubscribe subscribe;
@Autowired
private SipInviteSessionManager sessionManager;
/**
* 流到来的处理
*/
@@ -135,12 +139,15 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
}
@Override
public void closeRTPServer(MediaServer mediaServer, String streamId) {
public void closeRTPServer(MediaServer mediaServer, String stream) {
if (mediaServer == null) {
return;
}
// 释放ssrc
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream);
if (ssrcTransaction != null) {
// 释放ssrc
ssrcFactory.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
}
mediaServerService.closeRTPServer(mediaServer, stream);
}
}

View File

@@ -0,0 +1,12 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class SendRtpServerServiceImpl implements ISendRtpServerService {
}

View File

@@ -2,20 +2,20 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
public interface IRedisRpcService {
SendRtpItem getSendRtpItem(String sendRtpItemKey);
SendRtpInfo getSendRtpItem(String sendRtpItemKey);
WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem);
WVPResult startSendRtp(String sendRtpItemKey, SendRtpInfo sendRtpItem);
WVPResult stopSendRtp(String sendRtpItemKey);
long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback<String> callback);
void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
void stopWaitePushStreamOnline(SendRtpInfo sendRtpItem);
void rtpSendStopped(String sendRtpItemKey);

View File

@@ -8,7 +8,7 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -69,7 +69,7 @@ public class RedisRpcController {
*/
public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString();
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(sendRtpItemKey);
if (sendRtpItem == null) {
log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息 key{}", sendRtpItemKey);
RedisRpcResponse response = request.getResponse();
@@ -111,7 +111,7 @@ public class RedisRpcController {
* 监听流上线
*/
public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
SendRtpInfo sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpInfo.class);
log.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 查询本级是否有这个流
MediaServer mediaServer = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
@@ -195,7 +195,7 @@ public class RedisRpcController {
* 停止监听流上线
*/
public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
SendRtpInfo sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpInfo.class);
log.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
@@ -225,7 +225,7 @@ public class RedisRpcController {
*/
public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString();
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(sendRtpItemKey);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
if (sendRtpItem == null) {
@@ -268,7 +268,7 @@ public class RedisRpcController {
*/
public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString();
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(sendRtpItemKey);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
if (sendRtpItem == null) {

View File

@@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
@@ -53,17 +53,17 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
}
@Override
public SendRtpItem getSendRtpItem(String sendRtpItemKey) {
public SendRtpInfo getSendRtpItem(String sendRtpItemKey) {
RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey);
RedisRpcResponse response = redisRpcConfig.request(request, 10);
if (response.getBody() == null) {
return null;
}
return (SendRtpItem)redisTemplate.opsForValue().get(response.getBody().toString());
return (SendRtpInfo)redisTemplate.opsForValue().get(response.getBody().toString());
}
@Override
public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) {
public WVPResult startSendRtp(String sendRtpItemKey, SendRtpInfo sendRtpItem) {
log.info("[请求其他WVP] 开始推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey);
request.setToId(sendRtpItem.getServerId());
@@ -73,7 +73,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
@Override
public WVPResult stopSendRtp(String sendRtpItemKey) {
SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(sendRtpItemKey);
if (sendRtpItem == null) {
log.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息 key{}", sendRtpItemKey);
return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息");
@@ -86,7 +86,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
}
@Override
public long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) {
public long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback<String> callback) {
log.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
@@ -127,7 +127,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
}
@Override
public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
public void stopWaitePushStreamOnline(SendRtpInfo sendRtpItem) {
log.info("[停止WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
hookSubscribe.removeSubscribe(hook);
@@ -138,7 +138,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
@Override
public void rtpSendStopped(String sendRtpItemKey) {
SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(sendRtpItemKey);
if (sendRtpItem == null) {
log.info("[停止WVP监听流上线] 未找到redis中的发流信息 key{}", sendRtpItemKey);
return;

View File

@@ -38,9 +38,9 @@ public interface IRedisCatchStorage {
void delPlatformRegisterInfo(String callId);
void updateSendRTPSever(SendRtpItem sendRtpItem);
void updateSendRTPSever(SendRtpInfo sendRtpItem);
List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId);
List<SendRtpInfo> querySendRTPServer(String platformGbId, String channelId, String streamId);
/**
* 查询RTP推送信息缓存
@@ -48,9 +48,9 @@ public interface IRedisCatchStorage {
* @param channelId
* @return sendRtpItem
*/
SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId);
SendRtpInfo querySendRTPServer(String platformGbId, String channelId, String streamId, String callId);
List<SendRtpItem> querySendRTPServer(String platformGbId);
List<SendRtpInfo> querySendRTPServer(String platformGbId);
/**
* 删除RTP推送信息缓存
@@ -182,9 +182,9 @@ public interface IRedisCatchStorage {
*/
void sendStreamPushRequestedMsgForStatus();
List<SendRtpItem> querySendRTPServerByChannelId(String channelId);
List<SendRtpInfo> querySendRTPServerByChannelId(String channelId);
List<SendRtpItem> querySendRTPServerByStream(String stream);
List<SendRtpInfo> querySendRTPServerByStream(String stream);
SystemAllInfo getSystemInfo();
@@ -196,9 +196,9 @@ public interface IRedisCatchStorage {
void addDiskInfo(List<Map<String, Object>> diskInfo);
void deleteSendRTPServer(SendRtpItem sendRtpItem);
void deleteSendRTPServer(SendRtpInfo sendRtpItem);
List<SendRtpItem> queryAllSendRTPServer();
List<SendRtpInfo> queryAllSendRTPServer();
List<Device> getAllDevices();
@@ -208,9 +208,9 @@ public interface IRedisCatchStorage {
void sendChannelAddOrDelete(String deviceId, String channelId, boolean add);
void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, Platform platform);
void sendPlatformStartPlayMsg(SendRtpInfo sendRtpItem, Platform platform);
void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, Platform platform, CommonGBChannel channel);
void sendPlatformStopPlayMsg(SendRtpInfo sendRtpItem, Platform platform, CommonGBChannel channel);
void addPushListItem(String app, String stream, MediaInfo param);
@@ -220,11 +220,11 @@ public interface IRedisCatchStorage {
void sendPushStreamClose(MessageForPushChannel messageForPushChannel);
void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout);
void addWaiteSendRtpItem(SendRtpInfo sendRtpItem, int platformPlayTimeout);
SendRtpItem getWaiteSendRtpItem(String app, String stream);
SendRtpInfo getWaiteSendRtpItem(String app, String stream);
void sendStartSendRtp(SendRtpItem sendRtpItem);
void sendStartSendRtp(SendRtpInfo sendRtpItem);
void sendPushStreamOnline(SendRtpItem sendRtpItem);
void sendPushStreamOnline(SendRtpInfo sendRtpItem);
}

View File

@@ -134,31 +134,31 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
@Override
public void updateSendRTPSever(SendRtpItem sendRtpItem) {
public void updateSendRTPSever(SendRtpInfo sendRtpItem) {
redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
}
@Override
public List<SendRtpItem> querySendRTPServer(String platformGbId, String channelId, String streamId) {
public List<SendRtpInfo> querySendRTPServer(String platformGbId, String channelId, String streamId) {
String scanKey = VideoManagerConstants.SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*_"
+ platformGbId + "_"
+ channelId + "_"
+ streamId + "_"
+ "*";
List<SendRtpItem> result = new ArrayList<>();
List<SendRtpInfo> result = new ArrayList<>();
List<Object> scan = RedisUtil.scan(redisTemplate, scanKey);
if (!scan.isEmpty()) {
for (Object o : scan) {
String key = (String) o;
result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class));
result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class));
}
}
return result;
}
@Override
public SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
public SendRtpInfo querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
if (platformGbId == null) {
platformGbId = "*";
}
@@ -179,14 +179,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
+ callId;
List<Object> scan = RedisUtil.scan(redisTemplate, key);
if (scan.size() > 0) {
return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
return (SendRtpInfo)redisTemplate.opsForValue().get(scan.get(0));
}else {
return null;
}
}
@Override
public List<SendRtpItem> querySendRTPServerByChannelId(String channelId) {
public List<SendRtpInfo> querySendRTPServerByChannelId(String channelId) {
if (channelId == null) {
return null;
}
@@ -200,15 +200,15 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
+ streamId + "_"
+ callId;
List<Object> scan = RedisUtil.scan(redisTemplate, key);
List<SendRtpItem> result = new ArrayList<>();
List<SendRtpInfo> result = new ArrayList<>();
for (Object o : scan) {
result.add((SendRtpItem) redisTemplate.opsForValue().get(o));
result.add((SendRtpInfo) redisTemplate.opsForValue().get(o));
}
return result;
}
@Override
public List<SendRtpItem> querySendRTPServerByStream(String stream) {
public List<SendRtpInfo> querySendRTPServerByStream(String stream) {
if (stream == null) {
return null;
}
@@ -222,15 +222,15 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
+ stream + "_"
+ callId;
List<Object> scan = RedisUtil.scan(redisTemplate, key);
List<SendRtpItem> result = new ArrayList<>();
List<SendRtpInfo> result = new ArrayList<>();
for (Object o : scan) {
result.add((SendRtpItem) redisTemplate.opsForValue().get(o));
result.add((SendRtpInfo) redisTemplate.opsForValue().get(o));
}
return result;
}
@Override
public List<SendRtpItem> querySendRTPServer(String platformGbId) {
public List<SendRtpInfo> querySendRTPServer(String platformGbId) {
if (platformGbId == null) {
platformGbId = "*";
}
@@ -238,11 +238,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
+ userSetting.getServerId() + "_*_"
+ platformGbId + "_*" + "_*" + "_*";
List<Object> queryResult = RedisUtil.scan(redisTemplate, key);
List<SendRtpItem> result= new ArrayList<>();
List<SendRtpInfo> result= new ArrayList<>();
for (Object o : queryResult) {
String keyItem = (String) o;
result.add((SendRtpItem) redisTemplate.opsForValue().get(keyItem));
result.add((SendRtpInfo) redisTemplate.opsForValue().get(keyItem));
}
return result;
@@ -278,20 +278,20 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
* 删除RTP推送信息缓存
*/
@Override
public void deleteSendRTPServer(SendRtpItem sendRtpItem) {
public void deleteSendRTPServer(SendRtpInfo sendRtpItem) {
deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream());
}
@Override
public List<SendRtpItem> queryAllSendRTPServer() {
public List<SendRtpInfo> queryAllSendRTPServer() {
String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*";
List<Object> queryResult = RedisUtil.scan(redisTemplate, key);
List<SendRtpItem> result= new ArrayList<>();
List<SendRtpInfo> result= new ArrayList<>();
for (Object o : queryResult) {
String keyItem = (String) o;
result.add((SendRtpItem) redisTemplate.opsForValue().get(keyItem));
result.add((SendRtpInfo) redisTemplate.opsForValue().get(keyItem));
}
return result;
@@ -662,7 +662,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
@Override
public void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, Platform platform) {
public void sendPlatformStartPlayMsg(SendRtpInfo sendRtpItem, Platform platform) {
if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform != null) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
@@ -675,7 +675,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
@Override
public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, Platform platform, CommonGBChannel channel) {
public void sendPlatformStopPlayMsg(SendRtpInfo sendRtpItem, Platform platform, CommonGBChannel channel) {
MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStream(), channel.getGbDeviceId(),
@@ -716,26 +716,26 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
@Override
public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
public void addWaiteSendRtpItem(SendRtpInfo sendRtpItem, int platformPlayTimeout) {
String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
redisTemplate.opsForValue().set(key, sendRtpItem);
}
@Override
public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
public SendRtpInfo getWaiteSendRtpItem(String app, String stream) {
String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class);
return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class);
}
@Override
public void sendStartSendRtp(SendRtpItem sendRtpItem) {
public void sendStartSendRtp(SendRtpInfo sendRtpItem) {
String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
log.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
}
@Override
public void sendPushStreamOnline(SendRtpItem sendRtpItem) {
public void sendPushStreamOnline(SendRtpInfo sendRtpItem) {
String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
log.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));

View File

@@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
@@ -202,7 +202,7 @@ public class PsController {
sendInfo.setPushApp(app);
sendInfo.setPushStream(stream);
sendInfo.setPushSSRC(ssrc);
SendRtpItem sendRtpItem = SendRtpItem.getInstance(app, stream, ssrc, dstIp, dstPort, !isUdp, sendInfo.getSendLocalPort(), null);
SendRtpInfo sendRtpItem = SendRtpInfo.getInstance(app, stream, ssrc, dstIp, dstPort, !isUdp, sendInfo.getSendLocalPort(), null);
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
if (streamReady) {
mediaServerService.startSendRtp(mediaServer, sendRtpItem);

View File

@@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
@@ -225,15 +225,15 @@ public class RtpController {
sendInfo.setPushSSRC(ssrc);
SendRtpItem sendRtpItemForVideo;
SendRtpItem sendRtpItemForAudio;
SendRtpInfo sendRtpItemForVideo;
SendRtpInfo sendRtpItemForAudio;
if (!ObjectUtils.isEmpty(dstIpForAudio) && dstPortForAudio > 0) {
sendRtpItemForAudio = SendRtpItem.getInstance(app, stream, ssrc, dstIpForAudio, dstPortForAudio, !isUdp, sendInfo.getSendLocalPortForAudio(), ptForAudio);
sendRtpItemForAudio = SendRtpInfo.getInstance(app, stream, ssrc, dstIpForAudio, dstPortForAudio, !isUdp, sendInfo.getSendLocalPortForAudio(), ptForAudio);
} else {
sendRtpItemForAudio = null;
}
if (!ObjectUtils.isEmpty(dstIpForVideo) && dstPortForVideo > 0) {
sendRtpItemForVideo = SendRtpItem.getInstance(app, stream, ssrc, dstIpForAudio, dstPortForAudio, !isUdp, sendInfo.getSendLocalPortForVideo(), ptForVideo);
sendRtpItemForVideo = SendRtpInfo.getInstance(app, stream, ssrc, dstIpForAudio, dstPortForAudio, !isUdp, sendInfo.getSendLocalPortForVideo(), ptForVideo);
} else {
sendRtpItemForVideo = null;
}

View File

@@ -8,14 +8,13 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@@ -38,9 +37,6 @@ public class ApiStreamController {
@Autowired
private SIPCommander cmder;
@Autowired
private IVideoManagerStorage storager;
@Autowired
private UserSetting userSetting;
@@ -94,15 +90,7 @@ public class ApiStreamController {
result.setResult(resultJSON);
return result;
}
result.onTimeout(()->{
log.info("播放等待超时");
JSONObject resultJSON = new JSONObject();
resultJSON.put("error","timeout");
result.setResult(resultJSON);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code);
deviceChannelService.stopPlay(serial, code);
// 清理RTP server
});
DeviceChannel deviceChannel = deviceChannelService.getOne(serial, code);
if (deviceChannel == null) {
@@ -116,6 +104,17 @@ public class ApiStreamController {
result.setResult(resultJSON);
return result;
}
result.onTimeout(()->{
log.info("播放等待超时");
JSONObject resultJSON = new JSONObject();
resultJSON.put("error","timeout");
result.setResult(resultJSON);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceChannel.getId());
deviceChannelService.stopPlay(serial, code);
// 清理RTP server
});
MediaServer newMediaServerItem = playService.getNewMediaServerItem(device);
playService.play(newMediaServerItem, serial, code, null, (errorCode, msg, data) -> {
@@ -224,18 +223,26 @@ public class ApiStreamController {
){
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code);
if (inviteInfo == null) {
JSONObject result = new JSONObject();
result.put("error","未找到流信息");
return result;
}
Device device = deviceService.getDeviceByDeviceId(serial);
if (device == null) {
JSONObject result = new JSONObject();
result.put("error","未找到设备");
return result;
}
DeviceChannel deviceChannel = deviceChannelService.getOne(serial, code);
if (deviceChannel == null) {
JSONObject result = new JSONObject();
result.put("error","未找到通道");
return result;
}
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceChannel.getId());
if (inviteInfo == null) {
JSONObject result = new JSONObject();
result.put("error","未找到流信息");
return result;
}
try {
cmder.streamByeCmd(device, code, inviteInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {