Merge branch 'master' into 重构/1078

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
#	数据库/2.7.3/初始化-mysql-2.7.3.sql
#	数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql
This commit is contained in:
648540858
2024-12-19 23:19:09 +08:00
60 changed files with 1899 additions and 208 deletions

View File

@@ -15,6 +15,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
/**
@@ -54,7 +55,7 @@ public class CivilCodeFileConf implements CommandLineRunner {
inputStream = Files.newInputStream(civilCodeFile.toPath());
}
BufferedReader inputStreamReader = new BufferedReader(new InputStreamReader(inputStream));
BufferedReader inputStreamReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
int index = -1;
String line;
while ((line = inputStreamReader.readLine()) != null) {

View File

@@ -34,6 +34,7 @@ public class DynamicTask {
threadPoolTaskScheduler.setPoolSize(300);
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(10);
threadPoolTaskScheduler.setThreadNamePrefix("dynamicTask-");
threadPoolTaskScheduler.initialize();
}

View File

@@ -1,13 +1,18 @@
package com.genersoft.iot.vmp.conf;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import static com.genersoft.iot.vmp.conf.ThreadPoolTaskConfig.cpuNum;
/**
* "@Scheduled"是Spring框架提供的一种定时任务执行机制默认情况下它是单线程的在同时执行多个定时任务时可能会出现阻塞和性能问题。
* 为了解决这种单线程瓶颈问题,可以将定时任务的执行机制改为支持多线程
@@ -15,16 +20,21 @@ import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
public static final int cpuNum = Runtime.getRuntime().availableProcessors();
/**
* 核心线程数(默认线程数)
*/
private static final int corePoolSize = Math.max(cpuNum, 20);
private static final int corePoolSize = cpuNum;
private static final String threadNamePrefix = "scheduled-task-pool-%d";
/**
* 线程池名前缀
*/
private static final String threadNamePrefix = "schedule";
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(new ScheduledThreadPoolExecutor(corePoolSize,
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern(threadNamePrefix).daemon(true).build(),
new ThreadPoolExecutor.CallerRunsPolicy()));
new ThreadPoolExecutor.CallerRunsPolicy());
taskRegistrar.setScheduler(scheduledThreadPoolExecutor);
}
}

View File

@@ -28,11 +28,11 @@ public class ThreadPoolTaskConfig {
/**
* 核心线程数(默认线程数)
*/
private static final int corePoolSize = cpuNum;
private static final int corePoolSize = Math.max(cpuNum * 2, 16);
/**
* 最大线程数
*/
private static final int maxPoolSize = cpuNum*2;
private static final int maxPoolSize = corePoolSize * 10;
/**
* 允许线程空闲时间(单位:默认为秒)
*/
@@ -45,12 +45,9 @@ public class ThreadPoolTaskConfig {
/**
* 线程池名前缀
*/
private static final String threadNamePrefix = "wvp-";
private static final String threadNamePrefix = "async-";
/**
*
* @return
*/
@Bean("taskExecutor") // bean的名称默认为首字母小写的方法名
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

View File

@@ -170,4 +170,9 @@ public class UserSetting {
*/
private int gbDeviceOnline = 1;
/**
* 登录超时时间(分钟)
*/
private long loginTimeout = 30;
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.conf.security;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.security.dto.JwtUser;
import com.genersoft.iot.vmp.service.IUserApiKeyService;
import com.genersoft.iot.vmp.service.IUserService;
@@ -46,13 +47,15 @@ public class JwtUtils implements InitializingBean {
/**
* token过期时间(分钟)
*/
public static final long EXPIRATION_TIME = 30 * 24 * 60;
public static final long EXPIRATION_TIME = 30;
private static RsaJsonWebKey rsaJsonWebKey;
private static IUserService userService;
private static IUserApiKeyService userApiKeyService;
private static UserSetting userSetting;
public static String getApiKeyHeader() {
return API_KEY_HEADER;
@@ -68,6 +71,11 @@ public class JwtUtils implements InitializingBean {
JwtUtils.userApiKeyService = userApiKeyService;
}
@Resource
public void setUserSetting(UserSetting userSetting) {
JwtUtils.userSetting = userSetting;
}
@Override
public void afterPropertiesSet() {
try {
@@ -153,7 +161,7 @@ public class JwtUtils implements InitializingBean {
}
public static String createToken(String username) {
return createToken(username, EXPIRATION_TIME);
return createToken(username, userSetting.getLoginTimeout());
}
public static String getHeader() {

View File

@@ -20,6 +20,7 @@ public class CatalogData {
private Device device;
private String errorMsg;
private Set<String> redisKeysForChannel = new HashSet<>();
private Set<String> errorChannel = new HashSet<>();
private Set<String> redisKeysForRegion = new HashSet<>();
private Set<String> redisKeysForGroup = new HashSet<>();

View File

@@ -126,6 +126,9 @@ public class CommonGBChannel {
@Schema(description = "关联的国标设备数据库ID")
private Integer gbDeviceDbId;
@Schema(description = "二进制保存的录制计划, 每一位表示每个小时的前半个小时")
private Long recordPLan;
@Schema(description = "关联的推流Id流来源是推流时有效")
private Integer streamPushId;

View File

@@ -101,11 +101,31 @@ public class CommonChannelController {
return channel;
}
@Operation(summary = "获取通道列表", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "page", description = "当前页", required = true)
@Parameter(name = "count", description = "每页查询数量", required = true)
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "online", description = "是否在线")
@Parameter(name = "hasRecordPlan", description = "是否已设置录制计划")
@Parameter(name = "channelType", description = "通道类型, 0国标设备1推流设备2拉流代理")
@GetMapping("/list")
public PageInfo<CommonGBChannel> queryList(int page, int count,
@RequestParam(required = false) String query,
@RequestParam(required = false) Boolean online,
@RequestParam(required = false) Boolean hasRecordPlan,
@RequestParam(required = false) Integer channelType){
if (ObjectUtils.isEmpty(query)){
query = null;
}
return channelService.queryList(page, count, query, online, hasRecordPlan, channelType);
}
@Operation(summary = "获取关联行政区划通道列表", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "page", description = "当前页", required = true)
@Parameter(name = "count", description = "每页查询数量", required = true)
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "online", description = "是否在线")
@Parameter(name = "channelType", description = "通道类型, 0国标设备1推流设备2拉流代理")
@Parameter(name = "civilCode", description = "行政区划")
@GetMapping("/civilcode/list")
public PageInfo<CommonGBChannel> queryListByCivilCode(int page, int count,
@@ -124,6 +144,7 @@ public class CommonChannelController {
@Parameter(name = "count", description = "每页查询数量", required = true)
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "online", description = "是否在线")
@Parameter(name = "channelType", description = "通道类型, 0国标设备1推流设备2拉流代理")
@Parameter(name = "groupDeviceId", description = "业务分组下的父节点ID")
@GetMapping("/parent/list")
public PageInfo<CommonGBChannel> queryListByParentId(int page, int count,

View File

@@ -269,7 +269,7 @@ public class DeviceQuery {
@Operation(summary = "修改数据流传输模式", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "streamMode", description = "数据流传输模式, 取值:" +
"UDPudp传输TCP-ACTIVEtcp主动模式,暂不支持TCP-PASSIVEtcp被动模式", required = true)
"UDPudp传输TCP-ACTIVEtcp主动模式TCP-PASSIVEtcp被动模式", required = true)
@PostMapping("/transport/{deviceId}/{streamMode}")
public void updateTransport(@PathVariable String deviceId, @PathVariable String streamMode){
Device device = deviceService.getDeviceByDeviceId(deviceId);

View File

@@ -460,4 +460,97 @@ public interface CommonGBChannelMapper {
" </script>"})
void updateGpsByDeviceIdForStreamPush(List<CommonGBChannel> channels);
@SelectProvider(type = ChannelProvider.class, method = "queryList")
List<CommonGBChannel> queryList(@Param("query") String query, @Param("online") Boolean online, @Param("hasRecordPlan") Boolean hasRecordPlan, @Param("channelType") Integer channelType);
@Update(value = {" <script>" +
" UPDATE wvp_device_channel " +
" SET record_plan_id = null" +
" WHERE id in "+
" <foreach collection='channelIds' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
" </script>"})
void removeRecordPlan(List<Integer> channelIds);
@Update(value = {" <script>" +
" UPDATE wvp_device_channel " +
" SET record_plan_id = #{planId}" +
" WHERE id in "+
" <foreach collection='channelIds' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
" </script>"})
void addRecordPlan(List<Integer> channelIds, @Param("planId") Integer planId);
@Update(value = {" <script>" +
" UPDATE wvp_device_channel " +
" SET record_plan_id = #{planId}" +
" </script>"})
void addRecordPlanForAll(@Param("planId") Integer planId);
@Update(value = {" <script>" +
" UPDATE wvp_device_channel " +
" SET record_plan_id = null" +
" WHERE record_plan_id = #{planId} "+
" </script>"})
void removeRecordPlanByPlanId( @Param("planId") Integer planId);
@Select("<script>" +
" select " +
" wdc.id as gb_id,\n" +
" wdc.device_db_id as gb_device_db_id,\n" +
" wdc.stream_push_id,\n" +
" wdc.stream_proxy_id,\n" +
" wdc.create_time,\n" +
" wdc.update_time,\n" +
" wdc.record_plan_id,\n" +
" coalesce( wdc.gb_device_id, wdc.device_id) as gb_device_id,\n" +
" coalesce( wdc.gb_name, wdc.name) as gb_name,\n" +
" coalesce( wdc.gb_manufacturer, wdc.manufacturer) as gb_manufacturer,\n" +
" coalesce( wdc.gb_model, wdc.model) as gb_model,\n" +
" coalesce( wdc.gb_owner, wdc.owner) as gb_owner,\n" +
" coalesce( wdc.gb_civil_code, wdc.civil_code) as gb_civil_code,\n" +
" coalesce( wdc.gb_block, wdc.block) as gb_block,\n" +
" coalesce( wdc.gb_address, wdc.address) as gb_address,\n" +
" coalesce( wdc.gb_parental, wdc.parental) as gb_parental,\n" +
" coalesce( wdc.gb_parent_id, wdc.parent_id) as gb_parent_id,\n" +
" coalesce( wdc.gb_safety_way, wdc.safety_way) as gb_safety_way,\n" +
" coalesce( wdc.gb_register_way, wdc.register_way) as gb_register_way,\n" +
" coalesce( wdc.gb_cert_num, wdc.cert_num) as gb_cert_num,\n" +
" coalesce( wdc.gb_certifiable, wdc.certifiable) as gb_certifiable,\n" +
" coalesce( wdc.gb_err_code, wdc.err_code) as gb_err_code,\n" +
" coalesce( wdc.gb_end_time, wdc.end_time) as gb_end_time,\n" +
" coalesce( wdc.gb_secrecy, wdc.secrecy) as gb_secrecy,\n" +
" coalesce( wdc.gb_ip_address, wdc.ip_address) as gb_ip_address,\n" +
" coalesce( wdc.gb_port, wdc.port) as gb_port,\n" +
" coalesce( wdc.gb_password, wdc.password) as gb_password,\n" +
" coalesce( wdc.gb_status, wdc.status) as gb_status,\n" +
" coalesce( wdc.gb_longitude, wdc.longitude) as gb_longitude,\n" +
" coalesce( wdc.gb_latitude, wdc.latitude) as gb_latitude,\n" +
" coalesce( wdc.gb_ptz_type, wdc.ptz_type) as gb_ptz_type,\n" +
" coalesce( wdc.gb_position_type, wdc.position_type) as gb_position_type,\n" +
" coalesce( wdc.gb_room_type, wdc.room_type) as gb_room_type,\n" +
" coalesce( wdc.gb_use_type, wdc.use_type) as gb_use_type,\n" +
" coalesce( wdc.gb_supply_light_type, wdc.supply_light_type) as gb_supply_light_type,\n" +
" coalesce( wdc.gb_direction_type, wdc.direction_type) as gb_direction_type,\n" +
" coalesce( wdc.gb_resolution, wdc.resolution) as gb_resolution,\n" +
" coalesce( wdc.gb_business_group_id, wdc.business_group_id) as gb_business_group_id,\n" +
" coalesce( wdc.gb_download_speed, wdc.download_speed) as gb_download_speed,\n" +
" coalesce( wdc.gb_svc_space_support_mod, wdc.svc_space_support_mod) as gb_svc_space_support_mod,\n" +
" coalesce( wdc.gb_svc_time_support_mode, wdc.svc_time_support_mode) as gb_svc_time_support_mode \n" +
" from wvp_device_channel wdc" +
" where wdc.channel_type = 0 " +
" <if test='query != null'> " +
" AND (coalesce(wdc.gb_device_id, wdc.device_id) LIKE concat('%',#{query},'%') escape '/' " +
" OR coalesce(wdc.gb_name, wdc.name) LIKE concat('%',#{query},'%') escape '/')</if> " +
" <if test='online == true'> AND coalesce(wdc.gb_status, wdc.status) = 'ON'</if> " +
" <if test='online == false'> AND coalesce(wdc.gb_status, wdc.status) = 'OFF'</if> " +
" <if test='hasLink == true'> AND wdc.record_plan_id = #{planId}</if> " +
" <if test='hasLink == false'> AND wdc.record_plan_id is null</if> " +
" <if test='channelType == 0'> AND wdc.device_db_id is not null</if> " +
" <if test='channelType == 1'> AND wdc.stream_push_id is not null</if> " +
" <if test='channelType == 2'> AND wdc.stream_proxy_id is not null</if> " +
"</script>")
List<CommonGBChannel> queryForRecordPlanForWebList(@Param("planId") Integer planId, @Param("query") String query,
@Param("channelType") Integer channelType, @Param("online") Boolean online,
@Param("hasLink") Boolean hasLink);
}

View File

@@ -93,6 +93,12 @@ public interface DeviceChannelMapper {
@SelectProvider(type = DeviceChannelProvider.class, method = "queryChannelsByDeviceDbId")
List<DeviceChannel> queryChannelsByDeviceDbId(@Param("deviceDbId") int deviceDbId);
@Select(value = {" <script> " +
"select id from wvp_device_channel where device_db_id in " +
" <foreach item='item' index='index' collection='deviceDbIds' open='(' separator=',' close=')'> #{item} </foreach>" +
" </script>"})
List<Integer> queryChaneIdListByDeviceDbIds(List<Integer> deviceDbIds);
@Delete("DELETE FROM wvp_device_channel WHERE device_db_id=#{deviceId}")
int cleanChannelsByDeviceId(@Param("deviceId") int deviceId);
@@ -407,6 +413,10 @@ public interface DeviceChannelMapper {
"</script>")
void updateChannelStreamIdentification(DeviceChannel channel);
@Update("<script>" +
"UPDATE wvp_device_channel SET stream_identification=#{streamIdentification}" +
"</script>")
void updateAllChannelStreamIdentification(@Param("streamIdentification") String streamIdentification);
@Update({"<script>" +
"<foreach collection='channelList' item='item' separator=';'>" +

View File

@@ -18,6 +18,7 @@ public class ChannelProvider {
" jt_channel_id,\n" +
" create_time,\n" +
" update_time,\n" +
" record_plan_id,\n" +
" coalesce(gb_device_id, device_id) as gb_device_id,\n" +
" coalesce(gb_name, name) as gb_name,\n" +
" coalesce(gb_manufacturer, manufacturer) as gb_manufacturer,\n" +
@@ -188,6 +189,37 @@ public class ChannelProvider {
return sqlBuild.toString();
}
public String queryList(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL);
sqlBuild.append(" where channel_type = 0 ");
if (params.get("query") != null) {
sqlBuild.append(" AND (coalesce(gb_device_id, device_id) LIKE concat('%',#{query},'%') escape '/'" +
" OR coalesce(gb_name, name) LIKE concat('%',#{query},'%') escape '/' )")
;
}
if (params.get("online") != null && (Boolean)params.get("online")) {
sqlBuild.append(" AND coalesce(gb_status, status) = 'ON'");
}
if (params.get("online") != null && !(Boolean)params.get("online")) {
sqlBuild.append(" AND coalesce(gb_status, status) = 'OFF'");
}
if (params.get("hasRecordPlan") != null && (Boolean)params.get("hasRecordPlan")) {
sqlBuild.append(" AND record_plan_id > 0");
}
if (params.get("channelType") != null) {
if ((Integer)params.get("channelType") == 0) {
sqlBuild.append(" AND device_db_id is not null");
}else if ((Integer)params.get("channelType") == 1) {
sqlBuild.append(" AND stream_push_id is not null");
}else if ((Integer)params.get("channelType") == 2) {
sqlBuild.append(" AND stream_proxy_id is not null");
}
}
return sqlBuild.toString();
}
public String queryInListByStatus(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL);

View File

@@ -122,4 +122,7 @@ public interface IDeviceChannelService {
DeviceChannel getOneBySourceId(int deviceDbId, String channelId);
List<DeviceChannel> queryChaneListByDeviceDbId(Integer deviceDbId);
List<Integer> queryChaneIdListByDeviceDbIds(List<Integer> deviceDbId);
}

View File

@@ -84,4 +84,7 @@ public interface IGbChannelService {
List<CommonGBChannel> queryListByStreamPushList(List<StreamPush> streamPushList);
void updateGpsByDeviceIdForStreamPush(List<CommonGBChannel> channels);
PageInfo<CommonGBChannel> queryList(int page, int count, String query, Boolean online, Boolean hasRecordPlan, Integer channelType);
}

View File

@@ -69,13 +69,8 @@ public interface IPlatformService {
/**
* 向上级发送语音喊话的消息
* @param platform 平台
* @param channelId 通道
* @param hookEvent hook事件
* @param errorEvent 信令错误事件
* @param timeoutCallback 超时事件
*/
void broadcastInvite(Platform platform, CommonGBChannel channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
void broadcastInvite(Platform platform, CommonGBChannel channel, String sourceId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException;
/**

View File

@@ -338,7 +338,11 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
log.info("[更新通道码流类型] 设备: {}, 通道:{} 码流: {}", channel.getDeviceId(), channel.getDeviceId(),
channel.getStreamIdentification());
}
channelMapper.updateChannelStreamIdentification(channel);
if (channel.getId() > 0) {
channelMapper.updateChannelStreamIdentification(channel);
}else {
channelMapper.updateAllChannelStreamIdentification(channel.getStreamIdentification());
}
}
@Override
@@ -350,6 +354,16 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
return channelMapper.queryChannelsByDeviceDbId(device.getId());
}
@Override
public List<DeviceChannel> queryChaneListByDeviceDbId(Integer deviceDbId) {
return channelMapper.queryChannelsByDeviceDbId(deviceDbId);
}
@Override
public List<Integer> queryChaneIdListByDeviceDbIds(List<Integer> deviceDbIds) {
return channelMapper.queryChaneIdListByDeviceDbIds(deviceDbIds);
}
@Override
public void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition) {
if (userSetting.getSavePositionHistory()) {

View File

@@ -714,4 +714,16 @@ public class GbChannelServiceImpl implements IGbChannelService {
public void updateGpsByDeviceIdForStreamPush(List<CommonGBChannel> channels) {
commonGBChannelMapper.updateGpsByDeviceIdForStreamPush(channels);
}
@Override
public PageInfo<CommonGBChannel> queryList(int page, int count, String query, Boolean online, Boolean hasRecordPlan, Integer channelType) {
PageHelper.startPage(page, count);
if (query != null) {
query = query.replaceAll("/", "//")
.replaceAll("%", "/%")
.replaceAll("_", "/_");
}
List<CommonGBChannel> all = commonGBChannelMapper.queryList(query, online, hasRecordPlan, channelType);
return new PageInfo<>(all);
}
}

View File

@@ -55,6 +55,11 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
@Override
public PageInfo<PlatformChannel> queryChannelList(int page, int count, String query, Integer channelType, Boolean online, Integer platformId, Boolean hasShare) {
PageHelper.startPage(page, count);
if (query != null) {
query = query.replaceAll("/", "//")
.replaceAll("%", "/%")
.replaceAll("_", "/_");
}
List<PlatformChannel> all = platformChannelMapper.queryForPlatformForWebList(platformId, query, channelType, online, hasShare);
return new PageInfo<>(all);
}

View File

@@ -484,7 +484,7 @@ public class PlatformServiceImpl implements IPlatformService {
}
@Override
public void broadcastInvite(Platform platform, CommonGBChannel channel, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
public void broadcastInvite(Platform platform, CommonGBChannel channel, String sourceId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException {
if (mediaServerItem == null) {
@@ -565,7 +565,7 @@ public class PlatformServiceImpl implements IPlatformService {
}
}
}, userSetting.getPlayTimeout());
commanderForPlatform.broadcastInviteCmd(platform, channel, mediaServerItem, ssrcInfo, (hookData)->{
commanderForPlatform.broadcastInviteCmd(platform, channel,sourceId, mediaServerItem, ssrcInfo, (hookData)->{
log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId());
dynamicTask.stop(timeOutTaskKey);
// hook响应
@@ -578,45 +578,6 @@ public class PlatformServiceImpl implements IPlatformService {
inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channel, timeOutTaskKey,
null, inviteInfo, InviteSessionType.BROADCAST);
// // 收到200OK 检测ssrc是否有变化防止上级自定义了ssrc
// ResponseEvent responseEvent = (ResponseEvent) event.event;
// String contentString = new String(responseEvent.getResponse().getRawContent());
// // 获取ssrc
// int ssrcIndex = contentString.indexOf("y=");
// // 检查是否有y字段
// if (ssrcIndex >= 0) {
// //ssrc规定长度为10字节不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
// String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
// if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) {
// tcpActiveHandler(platform, )
// return;
// }
// logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
// if (!mediaServerItem.isRtpEnable()) {
// logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// // 释放ssrc
// mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
// // 单端口模式streamId也有变化需要重新设置监听
// if (!mediaServerItem.isRtpEnable()) {
// // 添加订阅
// HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
// subscribe.removeSubscribe(hookSubscribe);
// hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
// subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
// logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam);
// dynamicTask.stop(timeOutTaskKey);
// // hook响应
// playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId);
// hookEvent.response(mediaServerItemInUse, hookParam);
// });
// }
// // 关闭rtp server
// mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// // 重新开启ssrc server
// mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true, false, tcpMode);
// }
// }
}, eventResult -> {
// 收到错误回复
if (errorEvent != null) {

View File

@@ -283,7 +283,7 @@ public class CatalogDataManager implements CommandLineRunner {
if (catalogData == null) {
return 0;
}
return catalogData.getRedisKeysForChannel().size();
return catalogData.getRedisKeysForChannel().size() + catalogData.getErrorChannel().size();
}
public int sumNum(String deviceId, int sn) {

View File

@@ -140,13 +140,13 @@ public interface ISIPCommanderForPlatform {
* @param sendRtpItem
* @return
*/
void sendMediaStatusNotify(Platform platform, SendRtpInfo sendRtpItem) throws SipException, InvalidArgumentException, ParseException;
void sendMediaStatusNotify(Platform platform, SendRtpInfo sendRtpItem, CommonGBChannel channel) throws SipException, InvalidArgumentException, ParseException;
void streamByeCmd(Platform platform, SendRtpInfo sendRtpItem, CommonGBChannel channel) throws SipException, InvalidArgumentException, ParseException;
void streamByeCmd(Platform platform, CommonGBChannel channel, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void broadcastInviteCmd(Platform platform, CommonGBChannel channel, MediaServer mediaServerItem,
void broadcastInviteCmd(Platform platform, CommonGBChannel channel, String sourceId, MediaServer mediaServerItem,
SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent,
SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException;

View File

@@ -312,12 +312,12 @@ public class SIPRequestHeaderPlarformProvider {
return request;
}
public Request createInviteRequest(Platform platform, String channelId, String content, String viaTag, String fromTag, String ssrc, CallIdHeader callIdHeader) throws PeerUnavailableException, ParseException, InvalidArgumentException {
public Request createInviteRequest(Platform platform,String sourceId, String channelId, String content, String viaTag, String fromTag, String ssrc, CallIdHeader callIdHeader) throws PeerUnavailableException, ParseException, InvalidArgumentException {
Request request = null;
//请求行
String platformHostAddress = platform.getServerIp() + ":" + platform.getServerPort();
String localHostAddress = sipLayer.getLocalIp(platform.getDeviceIp())+":"+ platform.getDevicePort();
SipURI requestLine = SipFactory.getInstance().createAddressFactory().createSipURI(channelId, platformHostAddress);
SipURI requestLine = SipFactory.getInstance().createAddressFactory().createSipURI(sourceId, platformHostAddress);
//via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getDevicePort(), platform.getTransport(), viaTag);
@@ -329,7 +329,7 @@ public class SIPRequestHeaderPlarformProvider {
Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI);
FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, fromTag); //必须要有标记否则无法创建会话无法回应ack
//to
SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(channelId, platformHostAddress);
SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sourceId, platformHostAddress);
Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress,null);
@@ -345,7 +345,7 @@ public class SIPRequestHeaderPlarformProvider {
Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(),localHostAddress));
request.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress));
// Subject
SubjectHeader subjectHeader = SipFactory.getInstance().createHeaderFactory().createSubjectHeader(String.format("%s:%s,%s:%s", sipConfig.getId(), ssrc, channelId, 0));
SubjectHeader subjectHeader = SipFactory.getInstance().createHeaderFactory().createSubjectHeader(String.format("%s:%s,%s:%s", sourceId, ssrc, channelId, 0));
request.addHeader(subjectHeader);
ContentTypeHeader contentTypeHeader = SipFactory.getInstance().createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
request.setContent(content, contentTypeHeader);

View File

@@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
@@ -59,9 +60,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private SipSubscribe sipSubscribe;
@Autowired
private SipLayer sipLayer;
@@ -599,24 +597,23 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
}
@Override
public void sendMediaStatusNotify(Platform parentPlatform, SendRtpInfo sendRtpItem) throws SipException, InvalidArgumentException, ParseException {
if (sendRtpItem == null || parentPlatform == null) {
public void sendMediaStatusNotify(Platform parentPlatform, SendRtpInfo sendRtpInfo, CommonGBChannel channel) throws SipException, InvalidArgumentException, ParseException {
if (channel == null || parentPlatform == null) {
return;
}
String characterSet = parentPlatform.getCharacterSet();
StringBuffer mediaStatusXml = new StringBuffer(200);
mediaStatusXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n")
.append("<Notify>\r\n")
.append("<CmdType>MediaStatus</CmdType>\r\n")
.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n")
.append("<DeviceID>" + sendRtpItem.getChannelId() + "</DeviceID>\r\n")
.append("<DeviceID>" + channel.getGbDeviceId() + "</DeviceID>\r\n")
.append("<NotifyType>121</NotifyType>\r\n")
.append("</Notify>\r\n");
SIPRequest messageRequest = (SIPRequest)headerProviderPlatformProvider.createMessageRequest(parentPlatform, mediaStatusXml.toString(),
sendRtpItem);
sendRtpInfo);
sipSender.transmitRequest(parentPlatform.getDeviceIp(),messageRequest);
@@ -691,7 +688,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
}
@Override
public void broadcastInviteCmd(Platform platform, CommonGBChannel channel, MediaServer mediaServerItem,
public void broadcastInviteCmd(Platform platform, CommonGBChannel channel,String sourceId, MediaServer mediaServerItem,
SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent,
SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException {
String stream = ssrcInfo.getStream();
@@ -712,8 +709,9 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o=" + channel.getGbDeviceId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("o=" + platform.getDeviceGBId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("s=Play\r\n");
content.append("u=" + channel.getGbDeviceId() + ":0\r\n");
content.append("c=IN IP4 " + sdpIp + "\r\n");
content.append("t=0 0\r\n");
@@ -738,10 +736,10 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
// f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
content.append("f=v/////a/1/8/1\r\n");
content.append("f=v/2/5/25/1/4096a/1/8/1\r\n");
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getTransport());
Request request = headerProviderPlatformProvider.createInviteRequest(platform, channel.getGbDeviceId(),
Request request = headerProviderPlatformProvider.createInviteRequest(platform, sourceId, channel.getGbDeviceId(),
content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), ssrcInfo.getSsrc(),
callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> {

View File

@@ -164,7 +164,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
channelPlayService.start(channel, inviteInfo, platform, ((code, msg, streamInfo) -> {
if (code != InviteErrorCode.SUCCESS.getCode()) {
try {
responseAck(request, code, msg);
responseAck(request, Response.BUSY_HERE , msg);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 上级Invite 点播失败: {}", e.getMessage());
}

View File

@@ -199,7 +199,9 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
}
}
} catch (DocumentException e) {
log.error("未处理的异常 ", e);
log.error("[收到移动位置订阅通知] 文档解析异常 \r\n{}", evt.getRequest(), e);
} catch ( Exception e) {
log.error("[收到移动位置订阅通知] 异常: ", e);
}
}
}

View File

@@ -91,7 +91,13 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
}
String targetId = targetIDElement.getText();
Element sourceIdElement = rootElement.element("SourceID");
String sourceId;
if (sourceIdElement != null) {
sourceId = sourceIdElement.getText();
}else {
sourceId = targetId;
}
log.info("[国标级联 语音喊话] platform: {}, channel: {}", platform.getServerGBId(), targetId);
CommonGBChannel channel = channelService.queryOneWithPlatform(platform.getId(), targetId);
@@ -125,7 +131,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
}, eventResult->{
// 消息发送成功, 向上级发送invite获取推流
try {
platformService.broadcastInvite(platform, channel, mediaServerForMinimumLoad, (hookData)->{
platformService.broadcastInvite(platform, channel, sourceId, mediaServerForMinimumLoad, (hookData)->{
// 上级平台推流成功
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(channel.getGbId());
if (broadcastCatch != null ) {

View File

@@ -2,14 +2,8 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderForPlatform;
@@ -56,7 +50,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
private SIPCommanderForPlatform sipCommanderFroPlatform;
@Autowired
private IRedisCatchStorage redisCatchStorage;
private IPlatformChannelService platformChannelService;
@Autowired
private IPlatformService platformService;
@@ -108,15 +102,20 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId());
subscribe.removeSubscribe(hook);
// 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定
SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(ssrcTransaction.getChannelId(), ssrcTransaction.getPlatformId());
if (sendRtpItem != null) {
Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId());
SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(ssrcTransaction.getChannelId(), ssrcTransaction.getPlatformId());
if (sendRtpInfo != null) {
Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpInfo.getTargetId());
if (parentPlatform == null) {
log.warn("[级联消息发送]发送MediaStatus发现上级平台{}不存在", sendRtpItem.getTargetId());
log.warn("[级联消息发送]发送MediaStatus发现上级平台{}不存在", sendRtpInfo.getTargetId());
return;
}
CommonGBChannel channel = platformChannelService.queryChannelByPlatformIdAndChannelId(parentPlatform.getId(), sendRtpInfo.getChannelId());
if (channel == null) {
log.warn("[级联消息发送]发送MediaStatus发现通道{}不存在", sendRtpInfo.getChannelId());
return;
}
try {
sipCommanderFroPlatform.sendMediaStatusNotify(parentPlatform, sendRtpItem);
sipCommanderFroPlatform.sendMediaStatusNotify(parentPlatform, sendRtpInfo, channel);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 录像播放完毕: {}", e.getMessage());
}

View File

@@ -94,4 +94,6 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element element) {
}
}

View File

@@ -107,9 +107,9 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "读取配置失败");
}
mediaServer.setId(zlmServerConfig.getGeneralMediaServerId());
mediaServer.setHttpSSlPort(zlmServerConfig.getHttpPort());
mediaServer.setFlvSSLPort(zlmServerConfig.getHttpPort());
mediaServer.setWsFlvSSLPort(zlmServerConfig.getHttpPort());
mediaServer.setHttpSSlPort(zlmServerConfig.getHttpSSLport());
mediaServer.setFlvSSLPort(zlmServerConfig.getHttpSSLport());
mediaServer.setWsFlvSSLPort(zlmServerConfig.getHttpSSLport());
mediaServer.setRtmpPort(zlmServerConfig.getRtmpPort());
mediaServer.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
mediaServer.setRtspPort(zlmServerConfig.getRtspPort());

View File

@@ -1,7 +1,11 @@
package com.genersoft.iot.vmp.media.zlm.dto.hook;
import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
import lombok.Getter;
import lombok.Setter;
@Setter
@Getter
public class HookResultForOnPublish extends HookResult{
private boolean enable_audio;
@@ -34,54 +38,6 @@ public class HookResultForOnPublish extends HookResult{
setMsg(msg);
}
public boolean isEnable_audio() {
return enable_audio;
}
public void setEnable_audio(boolean enable_audio) {
this.enable_audio = enable_audio;
}
public boolean isEnable_mp4() {
return enable_mp4;
}
public void setEnable_mp4(boolean enable_mp4) {
this.enable_mp4 = enable_mp4;
}
public int getMp4_max_second() {
return mp4_max_second;
}
public void setMp4_max_second(int mp4_max_second) {
this.mp4_max_second = mp4_max_second;
}
public String getMp4_save_path() {
return mp4_save_path;
}
public void setMp4_save_path(String mp4_save_path) {
this.mp4_save_path = mp4_save_path;
}
public String getStream_replace() {
return stream_replace;
}
public void setStream_replace(String stream_replace) {
this.stream_replace = stream_replace;
}
public Integer getModify_stamp() {
return modify_stamp;
}
public void setModify_stamp(Integer modify_stamp) {
this.modify_stamp = modify_stamp;
}
@Override
public String toString() {
return "HookResultForOnPublish{" +

View File

@@ -0,0 +1,31 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.service.bean.RecordPlan;
import com.github.pagehelper.PageInfo;
import java.util.List;
public interface IRecordPlanService {
RecordPlan get(Integer planId);
void update(RecordPlan plan);
void delete(Integer planId);
PageInfo<RecordPlan> query(Integer page, Integer count, String query);
void add(RecordPlan plan);
void link(List<Integer> channelIds, Integer planId);
PageInfo<CommonGBChannel> queryChannelList(int page, int count, String query, Integer channelType, Boolean online, Integer planId, Boolean hasLink);
void linkAll(Integer planId);
void cleanAll(Integer planId);
Integer recording(String app, String stream);
}

View File

@@ -0,0 +1,32 @@
package com.genersoft.iot.vmp.service.bean;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.List;
@Data
@Schema(description = "录制计划")
public class RecordPlan {
@Schema(description = "计划数据库ID")
private int id;
@Schema(description = "计划名称")
private String name;
@Schema(description = "计划关联通道数量")
private int channelCount;
@Schema(description = "是否开启定时截图")
private Boolean snap;
@Schema(description = "创建时间")
private String createTime;
@Schema(description = "更新时间")
private String updateTime;
@Schema(description = "计划内容")
private List<RecordPlanItem> planItemList;
}

View File

@@ -0,0 +1,25 @@
package com.genersoft.iot.vmp.service.bean;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@Data
@Schema(description = "录制计划项")
public class RecordPlanItem {
@Schema(description = "计划项数据库ID")
private int id;
@Schema(description = "计划开始时间的序号, 从0点开始每半个小时增加1")
private Integer start;
@Schema(description = "计划结束时间的序号, 从0点开始每半个小时增加1")
private Integer stop;
@Schema(description = "计划周几执行")
private Integer weekDay;
@Schema(description = "所属计划ID")
private Integer planId;
}

View File

@@ -22,6 +22,7 @@ import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.IRecordPlanService;
import com.genersoft.iot.vmp.service.IUserService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
@@ -77,6 +78,9 @@ public class MediaServiceImpl implements IMediaService {
private ISendRtpServerService sendRtpServerService;
@Autowired
private IRecordPlanService recordPlanService;
@Override
public boolean authenticatePlay(String app, String stream, String callId) {
if (app == null || stream == null) {
@@ -223,6 +227,9 @@ public class MediaServiceImpl implements IMediaService {
@Override
public boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema) {
boolean result = false;
if (recordPlanService.recording(app, stream) != null) {
return false;
}
// 国标类型的流
if ("rtp".equals(app)) {
result = userSetting.getStreamOnDemand();

View File

@@ -61,13 +61,7 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
if (size == null || size == 0) {
return new ArrayList<>();
}
List<MobilePosition> mobilePositions;
if (size > length) {
mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, length);
}else {
mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, size);
}
return mobilePositions;
return redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, Math.min(length, size));
}

View File

@@ -0,0 +1,289 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IRecordPlanService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RecordPlan;
import com.genersoft.iot.vmp.service.bean.RecordPlanItem;
import com.genersoft.iot.vmp.storager.dao.RecordPlanMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class RecordPlanServiceImpl implements IRecordPlanService {
@Autowired
private RecordPlanMapper recordPlanMapper;
@Autowired
private CommonGBChannelMapper channelMapper;
@Autowired
private IGbChannelPlayService channelPlayService;
@Autowired
private IMediaServerService mediaServerService;
/**
* 流离开的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
// 流断开,检查是否还处于录像状态, 如果是则继续录像
Integer channelId = recording(event.getApp(), event.getStream());
if(channelId == null) {
return;
}
// 重新拉起
CommonGBChannel channel = channelMapper.queryById(channelId);
if (channel == null) {
log.warn("[录制计划] 流离开时拉起需要录像的流时, 发现通道不存在, id: {}", channelId);
return;
}
// 开启点播,
channelPlayService.play(channel, null, ((code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) {
log.info("[录像] 流离开时拉起需要录像的流, 开启成功, 通道ID: {}", channel.getGbId());
recordStreamMap.put(channel.getGbId(), streamInfo);
} else {
recordStreamMap.remove(channelId);
log.info("[录像] 流离开时拉起需要录像的流, 开启失败, 十分钟后重试, 通道ID: {}", channel.getGbId());
}
}));
}
Map<Integer, StreamInfo> recordStreamMap = new HashMap<>();
// @Scheduled(cron = "0 */30 * * * *")
@Scheduled(fixedRate = 10, timeUnit = TimeUnit.MINUTES)
public void execution() {
log.info("[录制计划] 执行");
// 查询现在需要录像的通道Id
List<Integer> startChannelIdList = queryCurrentChannelRecord();
if (startChannelIdList.isEmpty()) {
// 当前没有录像任务, 如果存在旧的正在录像的就移除
if(!recordStreamMap.isEmpty()) {
stopStreams(recordStreamMap.keySet(), recordStreamMap);
recordStreamMap.clear();
}
}else {
// 当前存在录像任务, 获取正在录像中存在但是当前录制列表不存在的内容,进行停止; 获取正在录像中没有但是当前需录制的列表中存在的进行开启.
Set<Integer> recordStreamSet = new HashSet<>(recordStreamMap.keySet());
startChannelIdList.forEach(recordStreamSet::remove);
if (!recordStreamSet.isEmpty()) {
// 正在录像中存在但是当前录制列表不存在的内容,进行停止;
stopStreams(recordStreamSet, recordStreamMap);
}
// 移除startChannelIdList中已经在录像的部分, 剩下的都是需要新添加的(正在录像中没有但是当前需录制的列表中存在的进行开启)
recordStreamMap.keySet().forEach(startChannelIdList::remove);
if (!startChannelIdList.isEmpty()) {
// 获取所有的关联的通道
List<CommonGBChannel> channelList = channelMapper.queryByIds(startChannelIdList);
if (!channelList.isEmpty()) {
// 查找是否已经开启录像, 如果没有则开启录像
for (CommonGBChannel channel : channelList) {
// 开启点播,
channelPlayService.play(channel, null, ((code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) {
log.info("[录像] 开启成功, 通道ID: {}", channel.getGbId());
recordStreamMap.put(channel.getGbId(), streamInfo);
} else {
log.info("[录像] 开启失败, 十分钟后重试, 通道ID: {}", channel.getGbId());
}
}));
}
} else {
log.error("[录制计划] 数据异常, 这些关联的通道已经不存在了: {}", Joiner.on(",").join(startChannelIdList));
}
}
}
}
/**
* 获取当前时间段应该录像的通道Id列表
*/
private List<Integer> queryCurrentChannelRecord(){
// 获取当前时间在一周内的序号, 数据库存储的从第几个30分钟开始, 0-47, 包括首尾
LocalDateTime now = LocalDateTime.now();
int week = now.getDayOfWeek().getValue();
int index = now.getHour() * 2 + (now.getMinute() > 30?1:0);
// 查询现在需要录像的通道Id
return recordPlanMapper.queryRecordIng(week, index);
}
private void stopStreams(Collection<Integer> channelIds, Map<Integer, StreamInfo> recordStreamMap) {
for (Integer channelId : channelIds) {
try {
StreamInfo streamInfo = recordStreamMap.get(channelId);
if (streamInfo == null) {
continue;
}
// 查看是否有人观看,存在则不做处理,等待后续自然处理,如果无人观看,则关闭该流
MediaInfo mediaInfo = mediaServerService.getMediaInfo(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
if (mediaInfo.getReaderCount() == null || mediaInfo.getReaderCount() == 0) {
mediaServerService.closeStreams(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
log.info("[录制计划] 停止, 通道ID: {}", channelId);
}
}catch (Exception e) {
log.error("[录制计划] 停止时异常", e);
}finally {
recordStreamMap.remove(channelId);
}
}
}
@Override
public Integer recording(String app, String stream) {
for (Integer channelId : recordStreamMap.keySet()) {
StreamInfo streamInfo = recordStreamMap.get(channelId);
if (streamInfo != null && streamInfo.getApp().equals(app) && streamInfo.getStream().equals(stream)) {
return channelId;
}
}
return null;
}
@Override
@Transactional
public void add(RecordPlan plan) {
plan.setCreateTime(DateUtil.getNow());
plan.setUpdateTime(DateUtil.getNow());
recordPlanMapper.add(plan);
if (plan.getId() > 0 && !plan.getPlanItemList().isEmpty()) {
for (RecordPlanItem recordPlanItem : plan.getPlanItemList()) {
recordPlanItem.setPlanId(plan.getId());
}
recordPlanMapper.batchAddItem(plan.getId(), plan.getPlanItemList());
}
// TODO 更新录像队列
}
@Override
public RecordPlan get(Integer planId) {
RecordPlan recordPlan = recordPlanMapper.get(planId);
if (recordPlan == null) {
return null;
}
List<RecordPlanItem> recordPlanItemList = recordPlanMapper.getItemList(planId);
if (!recordPlanItemList.isEmpty()) {
recordPlan.setPlanItemList(recordPlanItemList);
}
return recordPlan;
}
@Override
@Transactional
public void update(RecordPlan plan) {
plan.setUpdateTime(DateUtil.getNow());
recordPlanMapper.update(plan);
recordPlanMapper.cleanItems(plan.getId());
if (plan.getPlanItemList() != null && !plan.getPlanItemList().isEmpty()){
List<RecordPlanItem> planItemList = new ArrayList<>();
for (RecordPlanItem recordPlanItem : plan.getPlanItemList()) {
if (recordPlanItem.getStart() == null || recordPlanItem.getStop() == null || recordPlanItem.getWeekDay() == null){
continue;
}
if (recordPlanItem.getPlanId() == null) {
recordPlanItem.setPlanId(plan.getId());
}
planItemList.add(recordPlanItem);
}
if(!planItemList.isEmpty()) {
recordPlanMapper.batchAddItem(plan.getId(), planItemList);
}
}
// TODO 更新录像队列
}
@Override
@Transactional
public void delete(Integer planId) {
RecordPlan recordPlan = recordPlanMapper.get(planId);
if (recordPlan == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "录制计划不存在");
}
// 清理关联的通道
channelMapper.removeRecordPlanByPlanId(recordPlan.getId());
recordPlanMapper.cleanItems(planId);
recordPlanMapper.delete(planId);
// TODO 更新录像队列
}
@Override
public PageInfo<RecordPlan> query(Integer page, Integer count, String query) {
PageHelper.startPage(page, count);
if (query != null) {
query = query.replaceAll("/", "//")
.replaceAll("%", "/%")
.replaceAll("_", "/_");
}
List<RecordPlan> all = recordPlanMapper.query(query);
return new PageInfo<>(all);
}
@Override
public void link(List<Integer> channelIds, Integer planId) {
if (channelIds == null || channelIds.isEmpty()) {
log.info("[录制计划] 关联/移除关联时, 通道编号必须存在");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道编号必须存在");
}
if (planId == null) {
channelMapper.removeRecordPlan(channelIds);
}else {
channelMapper.addRecordPlan(channelIds, planId);
}
// 查看当前的待录制列表是否变化,如果变化,则调用录制计划马上开始录制
execution();
}
@Override
public PageInfo<CommonGBChannel> queryChannelList(int page, int count, String query, Integer channelType, Boolean online, Integer planId, Boolean hasLink) {
PageHelper.startPage(page, count);
if (query != null) {
query = query.replaceAll("/", "//")
.replaceAll("%", "/%")
.replaceAll("_", "/_");
}
List<CommonGBChannel> all = channelMapper.queryForRecordPlanForWebList(planId, query, channelType, online, hasLink);
return new PageInfo<>(all);
}
@Override
public void linkAll(Integer planId) {
channelMapper.addRecordPlanForAll(planId);
}
@Override
public void cleanAll(Integer planId) {
channelMapper.removeRecordPlanByPlanId(planId);
}
}

View File

@@ -34,7 +34,7 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String requesterId,
String deviceId, Integer channelId, Boolean isTcp, Boolean rtcp) {
int localPort = getNextPort(mediaServer);
if (localPort == 0) {
if (localPort <= 0) {
return null;
}
return SendRtpInfo.getInstance(localPort, mediaServer, ip, port, ssrc, deviceId, null, channelId,

View File

@@ -0,0 +1,67 @@
package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.service.bean.RecordPlan;
import com.genersoft.iot.vmp.service.bean.RecordPlanItem;
import org.apache.ibatis.annotations.*;
import java.util.List;
@Mapper
public interface RecordPlanMapper {
@Insert(" <script>" +
"INSERT INTO wvp_record_plan (" +
" name," +
" snap," +
" create_time," +
" update_time) " +
"VALUES (" +
" #{name}," +
" #{snap}," +
" #{createTime}," +
" #{updateTime})" +
" </script>")
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
void add(RecordPlan plan);
@Insert(" <script>" +
"INSERT INTO wvp_record_plan_item (" +
"start," +
"stop, " +
"week_day," +
"plan_id) " +
"VALUES" +
"<foreach collection='planItemList' index='index' item='item' separator=','> " +
"(#{item.start}, #{item.stop}, #{item.weekDay},#{planId})" +
"</foreach> " +
" </script>")
void batchAddItem(@Param("planId") int planId, List<RecordPlanItem> planItemList);
@Select("select * from wvp_record_plan where id = #{planId}")
RecordPlan get(@Param("planId") Integer planId);
@Select(" <script>" +
" SELECT wrp.*, (select count(1) from wvp_device_channel where record_plan_id = wrp.id) AS channelCount\n" +
" FROM wvp_record_plan wrp where 1=1" +
" <if test='query != null'> AND (name LIKE concat('%',#{query},'%') escape '/' )</if> " +
" </script>")
List<RecordPlan> query(@Param("query") String query);
@Update("UPDATE wvp_record_plan SET update_time=#{updateTime}, name=#{name}, snap=#{snap} WHERE id=#{id}")
void update(RecordPlan plan);
@Delete("DELETE FROM wvp_record_plan WHERE id=#{planId}")
void delete(@Param("planId") Integer planId);
@Select("select * from wvp_record_plan_item where plan_id = #{planId}")
List<RecordPlanItem> getItemList(@Param("planId") Integer planId);
@Delete("DELETE FROM wvp_record_plan_item WHERE plan_id = #{planId}")
void cleanItems(@Param("planId") Integer planId);
@Select(" <script>" +
" select wdc.id from wvp_device_channel wdc left join wvp_record_plan_item wrpi on wrpi.plan_id = wdc.record_plan_id " +
" where wrpi.week_day = #{week} and wrpi.start &lt;= #{index} and stop &gt;= #{index} group by wdc.id" +
" </script>")
List<Integer> queryRecordIng(@Param("week") int week, @Param("index") int index);
}

View File

@@ -0,0 +1,150 @@
package com.genersoft.iot.vmp.vmanager.recordPlan;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IRecordPlanService;
import com.genersoft.iot.vmp.service.bean.RecordPlan;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.recordPlan.bean.RecordPlanParam;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
@Tag(name = "录制计划")
@Slf4j
@RestController
@RequestMapping("/api/record/plan")
public class RecordPlanController {
@Autowired
private IRecordPlanService recordPlanService;
@Autowired
private IDeviceChannelService deviceChannelService;
@ResponseBody
@PostMapping("/add")
@Operation(summary = "添加录制计划", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "plan", description = "计划", required = true)
public void add(@RequestBody RecordPlan plan) {
if (plan.getPlanItemList() == null || plan.getPlanItemList().isEmpty()) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "添加录制计划时,录制计划不可为空");
}
recordPlanService.add(plan);
}
@ResponseBody
@PostMapping("/link")
@Operation(summary = "通道关联录制计划", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "param", description = "通道关联录制计划", required = true)
public void link(@RequestBody RecordPlanParam param) {
if (param.getAllLink() != null) {
if (param.getAllLink()) {
recordPlanService.linkAll(param.getPlanId());
}else {
recordPlanService.cleanAll(param.getPlanId());
}
return;
}
if (param.getChannelIds() == null && param.getDeviceDbIds() == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道ID和国标设备ID不可都为NULL");
}
List<Integer> channelIds = new ArrayList<>();
if (param.getChannelIds() != null) {
channelIds.addAll(param.getChannelIds());
}else {
List<Integer> chanelIdList = deviceChannelService.queryChaneIdListByDeviceDbIds(param.getDeviceDbIds());
if (chanelIdList != null && !chanelIdList.isEmpty()) {
channelIds = chanelIdList;
}
}
recordPlanService.link(channelIds, param.getPlanId());
}
@ResponseBody
@GetMapping("/get")
@Operation(summary = "查询录制计划", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "planId", description = "计划ID", required = true)
public RecordPlan get(Integer planId) {
if (planId == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "计划ID不可为NULL");
}
return recordPlanService.get(planId);
}
@ResponseBody
@GetMapping("/query")
@Operation(summary = "查询录制计划列表", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "query", description = "检索内容", required = false)
@Parameter(name = "page", description = "当前页", required = true)
@Parameter(name = "count", description = "每页查询数量", required = true)
public PageInfo<RecordPlan> query(@RequestParam(required = false) String query, @RequestParam Integer page, @RequestParam Integer count) {
if (query != null && ObjectUtils.isEmpty(query.trim())) {
query = null;
}
return recordPlanService.query(page, count, query);
}
@Operation(summary = "分页查询级联平台的所有所有通道", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "page", description = "当前页", required = true)
@Parameter(name = "count", description = "每页条数", required = true)
@Parameter(name = "planId", description = "录制计划ID")
@Parameter(name = "channelType", description = "通道类型, 0国标设备1推流设备2拉流代理")
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "online", description = "是否在线")
@Parameter(name = "hasLink", description = "是否已经关联")
@GetMapping("/channel/list")
@ResponseBody
public PageInfo<CommonGBChannel> queryChannelList(int page, int count,
@RequestParam(required = false) Integer planId,
@RequestParam(required = false) String query,
@RequestParam(required = false) Integer channelType,
@RequestParam(required = false) Boolean online,
@RequestParam(required = false) Boolean hasLink) {
Assert.notNull(planId, "录制计划ID不可为NULL");
if (org.springframework.util.ObjectUtils.isEmpty(query)) {
query = null;
}
return recordPlanService.queryChannelList(page, count, query, channelType, online, planId, hasLink);
}
@ResponseBody
@PostMapping("/update")
@Operation(summary = "更新录制计划", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "plan", description = "计划", required = true)
public void update(@RequestBody RecordPlan plan) {
if (plan == null || plan.getId() == 0) {
throw new ControllerException(ErrorCode.ERROR400);
}
recordPlanService.update(plan);
}
@ResponseBody
@DeleteMapping("/delete")
@Operation(summary = "删除录制计划", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "planId", description = "计划ID", required = true)
public void delete(Integer planId) {
if (planId == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "计划IDID不可为NULL");
}
recordPlanService.delete(planId);
}
}

View File

@@ -0,0 +1,23 @@
package com.genersoft.iot.vmp.vmanager.recordPlan.bean;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.List;
@Data
@Schema(description = "录制计划-添加/编辑参数")
public class RecordPlanParam {
@Schema(description = "关联的通道ID")
private List<Integer> channelIds;
@Schema(description = "关联的设备ID会为设备下的所有通道关联此录制计划channelId存在是此项不生效")
private List<Integer> deviceDbIds;
@Schema(description = "全部关联/全部取消关联")
private Boolean allLink;
@Schema(description = "录制计划ID, ID为空是删除关联的计划")
private Integer planId;
}

View File

@@ -0,0 +1,10 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
111
</body>
</html>

View File

@@ -251,6 +251,8 @@ user-settings:
# 0 国标标准实现,设备离线后不回复心跳,直到设备重新注册上线,
# 1默认 对于离线设备,收到心跳就把设备设置为上线,并更新注册时间为上次这次心跳的时间。防止过期时间判断异常
gb-device-online: 0
# 登录超时时间(分钟)
login-timeout: 30
# 关闭在线文档(生产环境建议关闭)
springdoc: