Merge branch 'master' into 重构/1078

# Conflicts:
#	web/src/router/index.js
#	web/src/views/common/channelPlayer/chooseChannelForJt.vue
#	web/src/views/common/channelPlayer/jtDeviceEdit.vue
#	web/src/views/common/channelPlayer/jtDevicePlayer.vue
#	web_src/src/layout/UiHeader.vue
#	web_src/src/main.js
#	web_src/src/router/index.js
This commit is contained in:
lin
2025-07-03 09:41:55 +08:00
262 changed files with 3446 additions and 40754 deletions

View File

@@ -39,9 +39,9 @@ public class SpringDocConfig {
.info(new Info().title("WVP-PRO 接口文档")
.contact(contact)
.description("开箱即用的28181协议视频平台。 <br/>" +
"1. 打开http://127.0.0.1:18080/doc.html#/1.%20全部/用户管理/login_1" +
"1. 打开<a href='/doc.html#/1.%20全部/用户管理/login_1'>登录</a>接口" +
" 登录成功后返回AccessToken。 <br/>" +
"2. 填写到AccessToken到参数值 http://127.0.0.1:18080/doc.html#/Authorize/1.%20全部 <br/>" +
"2. 填写到AccessToken到参数值 <a href='/doc.html#/Authorize/1.%20全部'>Token配置</a> <br/>" +
"后续接口就可以直接测试了")
.version("v3.1.0")
.license(new License().name("Apache 2.0").url("http://springdoc.org")));

View File

@@ -204,6 +204,9 @@ public class UserSetting {
*/
private boolean sipCacheServerConnections = true;
/**
* 禁用date头变相禁用了校时
*/
private boolean disableDateHeader = false;
}

View File

@@ -21,6 +21,12 @@ public class DeviceChannel extends CommonGBChannel {
@Schema(description = "数据库自增ID")
private int id;
@Schema(description = "父设备编码")
private String parentDeviceId;
@Schema(description = "父设备名称")
private String parentName;
@MessageElementForCatalog("DeviceID")
@Schema(description = "编码")
private String deviceId;

View File

@@ -0,0 +1,43 @@
package com.genersoft.iot.vmp.gb28181.bean;
import lombok.Getter;
import lombok.Setter;
import org.dom4j.Element;
import org.jetbrains.annotations.NotNull;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class MessageResponseTask<T> implements Delayed {
@Getter
@Setter
private Element element;
@Getter
@Setter
private List<T> data;
@Getter
@Setter
private String key;
/**
* 超时时间(单位: 毫秒)
*/
@Getter
@Setter
private long delayTime;
@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(@NotNull Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}

View File

@@ -148,6 +148,7 @@ public class AlarmController {
@Parameter(name = "page",description = "当前页",required = true)
@Parameter(name = "count",description = "每页查询数量",required = true)
@Parameter(name = "deviceId",description = "设备id")
@Parameter(name = "channelId",description = "通道id")
@Parameter(name = "alarmPriority",description = "查询内容")
@Parameter(name = "alarmMethod",description = "查询内容")
@Parameter(name = "alarmType",description = "每页查询数量")
@@ -157,7 +158,8 @@ public class AlarmController {
public PageInfo<DeviceAlarm> getAll(
@RequestParam int page,
@RequestParam int count,
@RequestParam(required = false) String deviceId,
@RequestParam(required = false) String deviceId,
@RequestParam(required = false) String channelId,
@RequestParam(required = false) String alarmPriority,
@RequestParam(required = false) String alarmMethod,
@RequestParam(required = false) String alarmType,
@@ -186,7 +188,7 @@ public class AlarmController {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "endTime格式为" + DateUtil.PATTERN);
}
return deviceAlarmService.getAllAlarm(page, count, deviceId, alarmPriority, alarmMethod,
return deviceAlarmService.getAllAlarm(page, count, deviceId, channelId, alarmPriority, alarmMethod,
alarmType, startTime, endTime);
}
}

View File

@@ -113,6 +113,19 @@ public class DeviceQuery {
return deviceChannelService.queryChannelsByDeviceId(deviceId, query, channelType, online, page, count);
}
@GetMapping("/streams")
@Operation(summary = "分页查询存在流的通道", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "page", description = "当前页", required = true)
@Parameter(name = "count", description = "每页查询数量", required = true)
@Parameter(name = "query", description = "查询内容")
public PageInfo<DeviceChannel> streamChannels(int page, int count,
@RequestParam(required = false) String query) {
if (ObjectUtils.isEmpty(query)) {
query = null;
}
return deviceChannelService.queryChannels(query, true, null, null, true, page, count);
}
@Operation(summary = "同步设备通道", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)

View File

@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Group;
import com.genersoft.iot.vmp.gb28181.bean.GroupTree;
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -33,7 +34,7 @@ public class GroupController {
groupService.add(group);
}
@Operation(summary = "查询分组")
@Operation(summary = "查询分组节点")
@Parameter(name = "query", description = "要搜索的内容", required = true)
@Parameter(name = "parent", description = "所属分组编号", required = true)
@ResponseBody
@@ -49,6 +50,17 @@ public class GroupController {
return groupService.queryForTree(query, parent, hasChannel);
}
@Operation(summary = "查询分组")
@Parameter(name = "query", description = "要搜索的内容", required = true)
@Parameter(name = "channel", description = "true为查询通道false为查询节点", required = true)
@ResponseBody
@GetMapping("/tree/query")
public PageInfo<Group> queryTree(Integer page, Integer count,
@RequestParam(required = true) String query
){
return groupService.queryList(page, count, query);
}
@Operation(summary = "更新分组")
@Parameter(name = "group", description = "Group", required = true)
@ResponseBody

View File

@@ -7,19 +7,26 @@ import com.genersoft.iot.vmp.conf.security.SecurityUtils;
import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import java.net.MalformedURLException;
import java.net.URL;
@Tag(name = "媒体流相关")
@@ -52,11 +59,12 @@ public class MediaController {
@Parameter(name = "useSourceIpAsStreamIp", description = "是否使用请求IP作为返回的地址IP")
@GetMapping(value = "/stream_info_by_app_and_stream")
@ResponseBody
public StreamContent getStreamInfoByAppAndStream(HttpServletRequest request, @RequestParam String app,
@RequestParam String stream,
@RequestParam(required = false) String mediaServerId,
@RequestParam(required = false) String callId,
@RequestParam(required = false) Boolean useSourceIpAsStreamIp){
public DeferredResult<WVPResult<StreamContent>> getStreamInfoByAppAndStream(HttpServletRequest request, @RequestParam String app,
@RequestParam String stream,
@RequestParam(required = false) String mediaServerId,
@RequestParam(required = false) String callId,
@RequestParam(required = false) Boolean useSourceIpAsStreamIp){
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>();
boolean authority = false;
if (callId != null) {
// 权限校验
@@ -75,9 +83,7 @@ public class MediaController {
authority = true;
}
}
StreamInfo streamInfo;
if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
String host = request.getHeader("Host");
String localAddr = host.split(":")[0];
@@ -88,30 +94,37 @@ public class MediaController {
}
if (streamInfo != null){
return new StreamContent(streamInfo);
WVPResult<StreamContent> wvpResult = WVPResult.success();
wvpResult.setData(new StreamContent(streamInfo));
result.setResult(wvpResult);
}else {
ErrorCallback<StreamInfo> callback = (code, msg, streamInfoStoStart) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
WVPResult<StreamContent> wvpResult = WVPResult.success();
if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
String host;
try {
URL url=new URL(request.getRequestURL().toString());
host=url.getHost();
} catch (MalformedURLException e) {
host=request.getLocalAddr();
}
streamInfoStoStart.changeStreamIp(host);
}
if (!ObjectUtils.isEmpty(streamInfoStoStart.getMediaServer().getTranscodeSuffix())
&& !"null".equalsIgnoreCase(streamInfoStoStart.getMediaServer().getTranscodeSuffix())) {
streamInfoStoStart.setStream(streamInfoStoStart.getStream() + "_" + streamInfoStoStart.getMediaServer().getTranscodeSuffix());
}
wvpResult.setData(new StreamContent(streamInfoStoStart));
result.setResult(wvpResult);
}else {
result.setResult(WVPResult.fail(code, msg));
}
};
//获取流失败,重启拉流后重试一次
streamProxyService.stopByAppAndStream(app,stream);
boolean start = streamProxyService.startByAppAndStream(app, stream);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("[线程休眠失败] {}", e.getMessage());
}
if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
String host = request.getHeader("Host");
String localAddr = host.split(":")[0];
log.info("使用{}作为返回流的ip", localAddr);
streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority);
}else {
streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
}
if (streamInfo != null){
return new StreamContent(streamInfo);
}else {
throw new ControllerException(ErrorCode.ERROR100);
}
streamProxyService.startByAppAndStream(app, stream, callback);
}
return result;
}
/**
* 获取推流播放地址

View File

@@ -50,20 +50,29 @@ public class RegionController {
return regionService.query(query, page, count);
}
@Operation(summary = "查询区域")
@Operation(summary = "查询区域节点")
@Parameter(name = "query", description = "要搜索的内容", required = true)
@Parameter(name = "parent", description = "所属行政区划编号", required = true)
@Parameter(name = "hasChannel", description = "是否查询通道", required = true)
@ResponseBody
@GetMapping("/tree/list")
public List<RegionTree> queryForTree(
@RequestParam(required = false) String query,
@RequestParam(required = false) Integer parent,
@RequestParam(required = false) Boolean hasChannel
){
if (ObjectUtils.isEmpty(query)) {
query = null;
}
return regionService.queryForTree(query, parent, hasChannel);
return regionService.queryForTree(parent, hasChannel);
}
@Operation(summary = "查询区域")
@Parameter(name = "query", description = "要搜索的内容", required = true)
@Parameter(name = "channel", description = "true为查询通道false为查询节点", required = true)
@ResponseBody
@GetMapping("/tree/query")
public PageInfo<Region> queryTree(Integer page, Integer count,
@RequestParam(required = true) String query
){
return regionService.queryList(page, count, query);
}
@Operation(summary = "更新区域")

View File

@@ -275,10 +275,8 @@ public interface CommonGBChannelMapper {
" true as is_leaf " +
" from wvp_device_channel " +
" where coalesce(gb_civil_code, civil_code) = #{parentDeviceId} " +
" <if test='query != null'> AND (coalesce(gb_device_id, device_id) LIKE concat('%',#{query},'%') " +
" OR coalesce(gb_name, name) LIKE concat('%',#{query},'%'))</if> " +
" </script>")
List<RegionTree> queryForRegionTreeByCivilCode(@Param("query") String query, @Param("parentDeviceId") String parentDeviceId);
List<RegionTree> queryForRegionTreeByCivilCode(@Param("parentDeviceId") String parentDeviceId);
@Update(value = {" <script>" +
" UPDATE wvp_device_channel " +

View File

@@ -26,6 +26,7 @@ public interface DeviceAlarmMapper {
" SELECT * FROM wvp_device_alarm " +
" WHERE 1=1 " +
" <if test=\"deviceId != null\" > AND device_id = #{deviceId}</if>" +
" <if test=\"channelId != null\" > AND channel_id = #{channelId}</if>" +
" <if test=\"alarmPriority != null\" > AND alarm_priority = #{alarmPriority} </if>" +
" <if test=\"alarmMethod != null\" > AND alarm_method = #{alarmMethod} </if>" +
" <if test=\"alarmType != null\" > AND alarm_type = #{alarmType} </if>" +
@@ -33,7 +34,7 @@ public interface DeviceAlarmMapper {
" <if test=\"endTime != null\" > AND alarm_time &lt;= #{endTime} </if>" +
" ORDER BY alarm_time ASC " +
" </script>"})
List<DeviceAlarm> query(@Param("deviceId") String deviceId, @Param("alarmPriority") String alarmPriority, @Param("alarmMethod") String alarmMethod,
List<DeviceAlarm> query(@Param("deviceId") String deviceId, @Param("channelId") String channelId, @Param("alarmPriority") String alarmPriority, @Param("alarmMethod") String alarmMethod,
@Param("alarmType") String alarmType, @Param("startTime") String startTime, @Param("endTime") String endTime);

View File

@@ -86,10 +86,11 @@ public interface DeviceChannelMapper {
int update(DeviceChannel channel);
@SelectProvider(type = DeviceChannelProvider.class, method = "queryChannels")
List<DeviceChannel> queryChannels(@Param("dataDeviceId") int dataDeviceId, @Param("civilCode") String civilCode,
List<DeviceChannel> queryChannels(@Param("dataDeviceId") Integer dataDeviceId, @Param("civilCode") String civilCode,
@Param("businessGroupId") String businessGroupId, @Param("parentChannelId") String parentChannelId,
@Param("query") String query, @Param("hasSubChannel") Boolean hasSubChannel,
@Param("online") Boolean online, @Param("channelIds") List<String> channelIds);
@Param("query") String query, @Param("queryParent") Boolean queryParent,
@Param("hasSubChannel") Boolean hasSubChannel, @Param("online") Boolean online,
@Param("channelIds") List<String> channelIds, @Param("hasStream") Boolean hasStream);
@SelectProvider(type = DeviceChannelProvider.class, method = "queryChannelsByDeviceDbId")
List<DeviceChannel> queryChannelsByDeviceDbId(@Param("dataDeviceId") int dataDeviceId);

View File

@@ -346,38 +346,41 @@ public interface DeviceMapper {
@Select(" <script>" +
"SELECT " +
"id, " +
"device_id, " +
"coalesce(custom_name, name) as name, " +
"password, " +
"manufacturer, " +
"model, " +
"firmware, " +
"transport," +
"stream_mode," +
"ip,"+
"sdp_ip,"+
"local_ip,"+
"port,"+
"host_address,"+
"expires,"+
"register_time,"+
"keepalive_time,"+
"create_time,"+
"update_time,"+
"charset,"+
"subscribe_cycle_for_catalog,"+
"subscribe_cycle_for_mobile_position,"+
"mobile_position_submission_interval,"+
"subscribe_cycle_for_alarm,"+
"ssrc_check,"+
"as_message_channel,"+
"broadcast_push_after_ack,"+
"geo_coord_sys,"+
"on_line,"+
"media_server_id,"+
"server_id,"+
"(SELECT count(0) FROM wvp_device_channel dc WHERE dc.data_type = #{dataType} and dc.data_device_id= de.id) as channel_count " +
"id" +
",device_id" +
",manufacturer" +
",model" +
",firmware" +
",transport" +
",stream_mode" +
",on_line" +
",register_time" +
",keepalive_time" +
",ip" +
",create_time" +
",update_time" +
",port" +
",expires" +
",subscribe_cycle_for_catalog" +
",subscribe_cycle_for_mobile_position" +
",mobile_position_submission_interval" +
",subscribe_cycle_for_alarm" +
",host_address" +
",charset" +
",ssrc_check" +
",geo_coord_sys" +
",media_server_id" +
",sdp_ip" +
",local_ip" +
",password" +
",as_message_channel" +
",heart_beat_interval" +
",heart_beat_count" +
",position_capability" +
",broadcast_push_after_ack" +
",server_id" +
",(SELECT count(0) FROM wvp_device_channel dc WHERE dc.data_type = #{dataType} and dc.data_device_id= de.id) as channel_count " +
" FROM wvp_device de" +
" where 1 = 1 "+
" <if test='status != null'> AND de.on_line=${status}</if>"+
@@ -423,4 +426,36 @@ public interface DeviceMapper {
"<foreach collection='offlineDevices' item='item' open='(' separator=',' close=')' > #{item.id}</foreach>" +
" </script>"})
void offlineByList(List<Device> offlineDevices);
@Update({"<script>" +
"<foreach collection='devices' item='item' separator=';'>" +
" UPDATE" +
" wvp_device" +
" SET update_time=#{item.updateTime}" +
", name=#{item.name}" +
", manufacturer=#{item.manufacturer}" +
", model=#{item.model}" +
", firmware=#{item.firmware}" +
", transport=#{item.transport}" +
", ip=#{item.ip}" +
", local_ip=#{item.localIp}" +
", port=#{item.port}" +
", host_address=#{item.hostAddress}" +
", on_line=#{item.onLine}" +
", register_time=#{item.registerTime}" +
", keepalive_time=#{item.keepaliveTime}" +
", heart_beat_interval=#{item.heartBeatInterval}" +
", position_capability=#{item.positionCapability}" +
", heart_beat_count=#{item.heartBeatCount}" +
", subscribe_cycle_for_catalog=#{item.subscribeCycleForCatalog}" +
", subscribe_cycle_for_mobile_position=#{item.subscribeCycleForMobilePosition}" +
", mobile_position_submission_interval=#{item.mobilePositionSubmissionInterval}" +
", subscribe_cycle_for_alarm=#{item.subscribeCycleForAlarm}" +
", expires=#{item.expires}" +
", server_id=#{item.serverId}" +
" WHERE device_id=#{item.deviceId}"+
"</foreach>" +
"</script>"})
void batchUpdate(List<Device> devices);
}

View File

@@ -80,9 +80,8 @@ public interface RegionMapper {
" where " +
" <if test='parentId != null'> parent_id = #{parentId} </if> " +
" <if test='parentId == null'> parent_id is null </if> " +
" <if test='query != null'> AND (device_id LIKE concat('%',#{query},'%') escape '/' OR name LIKE concat('%',#{query},'%') escape '/')</if> " +
" </script>")
List<RegionTree> queryForTree(@Param("query") String query, @Param("parentId") Integer parentId);
List<RegionTree> queryForTree(@Param("parentId") Integer parentId);
@Delete("<script>" +
" DELETE FROM wvp_common_region WHERE id in " +

View File

@@ -20,6 +20,8 @@ public class DeviceChannelProvider {
" dc.gps_time,\n" +
" dc.stream_identification,\n" +
" dc.channel_type,\n" +
" d.device_id as parent_device_id,\n" +
" coalesce(d.custom_name, d.name) as parent_name,\n" +
" coalesce(dc.gb_device_id, dc.device_id) as device_id,\n" +
" coalesce(dc.gb_name, dc.name) as name,\n" +
" coalesce(dc.gb_manufacturer, dc.manufacturer) as manufacturer,\n" +
@@ -55,13 +57,17 @@ public class DeviceChannelProvider {
" coalesce(dc.gb_svc_space_support_mod, dc.svc_space_support_mod) as svc_space_support_mod,\n" +
" coalesce(dc.gb_svc_time_support_mode,dc.svc_time_support_mode) as svc_time_support_mode\n" +
" from " +
" wvp_device_channel dc "
" wvp_device_channel dc " +
" left join wvp_device d on d.id = dc.data_device_id "
;
}
public String queryChannels(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(getBaseSelectSql());
sqlBuild.append(" where data_type = " + ChannelDataType.GB28181.value + " and dc.data_device_id = #{dataDeviceId} ");
sqlBuild.append(" where data_type = " + ChannelDataType.GB28181.value);
if (params.get("dataDeviceId") != null) {
sqlBuild.append(" AND dc.data_device_id = #{dataDeviceId} ");
}
if (params.get("businessGroupId") != null ) {
sqlBuild.append(" AND coalesce(dc.gb_business_group_id, dc.business_group_id)=#{businessGroupId} AND coalesce(dc.gb_parent_id, dc.parent_id) is null");
}else if (params.get("parentChannelId") != null ) {
@@ -73,8 +79,15 @@ public class DeviceChannelProvider {
}
if (params.get("query") != null && !ObjectUtils.isEmpty(params.get("query"))) {
sqlBuild.append(" AND (coalesce(dc.gb_device_id, dc.device_id) LIKE concat('%',#{query},'%') escape '/'" +
" OR coalesce(dc.gb_name, dc.name) LIKE concat('%',#{query},'%') escape '/')")
;
" OR coalesce(dc.gb_name, dc.name) LIKE concat('%',#{query},'%') escape '/'");
if (params.get("queryParent") != null && (Boolean) params.get("queryParent")) {
sqlBuild.append(" OR d.device_id LIKE concat('%',#{query},'%') escape '/'");
sqlBuild.append(" OR coalesce(d.custom_name, d.name) LIKE concat('%',#{query},'%') escape '/'");
}
sqlBuild.append(")");
}
if (params.get("hasStream") != null && (Boolean) params.get("hasStream")) {
sqlBuild.append(" AND dc.stream_id IS NOT NULL");
}
if (params.get("online") != null && (Boolean)params.get("online")) {
sqlBuild.append(" AND coalesce(gb_status, status) = 'ON'");
@@ -101,7 +114,7 @@ public class DeviceChannelProvider {
}
sqlBuild.append(" )");
}
sqlBuild.append(" ORDER BY device_id");
sqlBuild.append(" ORDER BY d.device_id, dc.device_id");
return sqlBuild.toString();
}

View File

@@ -23,7 +23,7 @@ public interface IDeviceAlarmService {
* @param endTime 结束时间
* @return 报警列表
*/
PageInfo<DeviceAlarm> getAllAlarm(int page, int count, String deviceId, String alarmPriority, String alarmMethod,
PageInfo<DeviceAlarm> getAllAlarm(int page, int count, String deviceId, String channelId, String alarmPriority, String alarmMethod,
String alarmType, String startTime, String endTime);
/**

View File

@@ -1,10 +1,7 @@
package com.genersoft.iot.vmp.gb28181.service;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.common.enums.DeviceControlType;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
@@ -104,6 +101,7 @@ public interface IDeviceChannelService {
PageInfo<DeviceChannel> queryChannelsByDeviceId(String deviceId, String query, Boolean channelType, Boolean online, int page, int count);
PageInfo<DeviceChannel> queryChannels(String query, Boolean queryParent, Boolean channelType, Boolean online, Boolean hasStream, int page, int count);
List<Device> queryDeviceWithAsMessageChannel();

View File

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@@ -117,6 +118,9 @@ public interface IDeviceService {
*/
void updateDevice(Device device);
@Transactional
void updateDeviceList(List<Device> deviceList);
/**
* 检查设备编号是否已经存在
* @param deviceId 设备编号

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service;
import com.genersoft.iot.vmp.gb28181.bean.Group;
import com.genersoft.iot.vmp.gb28181.bean.GroupTree;
import com.github.pagehelper.PageInfo;
import java.util.List;
@@ -23,4 +24,6 @@ public interface IGroupService {
boolean batchAdd(List<Group> groupList);
List<Group> getPath(String deviceId, String businessGroup);
PageInfo<Group> queryList(Integer page, Integer count, String query);
}

View File

@@ -27,7 +27,7 @@ public interface IRegionService {
Region queryRegionByDeviceId(String regionDeviceId);
List<RegionTree> queryForTree(String query, Integer parent, Boolean hasChannel);
List<RegionTree> queryForTree(Integer parent, Boolean hasChannel);
void syncFromChannel();
@@ -40,4 +40,6 @@ public interface IRegionService {
String getDescription(String civilCode);
void addByCivilCode(String civilCode);
PageInfo<Region> queryList(int page, int count, String query);
}

View File

@@ -17,9 +17,9 @@ public class DeviceAlarmServiceImpl implements IDeviceAlarmService {
private DeviceAlarmMapper deviceAlarmMapper;
@Override
public PageInfo<DeviceAlarm> getAllAlarm(int page, int count, String deviceId, String alarmPriority, String alarmMethod, String alarmType, String startTime, String endTime) {
public PageInfo<DeviceAlarm> getAllAlarm(int page, int count, String deviceId, String channelId, String alarmPriority, String alarmMethod, String alarmType, String startTime, String endTime) {
PageHelper.startPage(page, count);
List<DeviceAlarm> all = deviceAlarmMapper.query(deviceId, alarmPriority, alarmMethod, alarmType, startTime, endTime);
List<DeviceAlarm> all = deviceAlarmMapper.query(deviceId, channelId, alarmPriority, alarmMethod, alarmType, startTime, endTime);
return new PageInfo<>(all);
}

View File

@@ -696,7 +696,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
.replaceAll("%", "/%")
.replaceAll("_", "/_");
}
List<DeviceChannel> all = channelMapper.queryChannels(deviceDbId, civilCode, businessGroupId, parentId, query, channelType, online,null);
List<DeviceChannel> all = channelMapper.queryChannels(deviceDbId, civilCode, businessGroupId, parentId, query, false, channelType, online, null, null);
return new PageInfo<>(all);
}
@@ -717,7 +717,19 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
.replaceAll("_", "/_");
}
PageHelper.startPage(page, count);
List<DeviceChannel> all = channelMapper.queryChannels(device.getId(), null,null, null, query, hasSubChannel, online,null);
List<DeviceChannel> all = channelMapper.queryChannels(device.getId(), null, null, null, query, false, hasSubChannel, online, null, null);
return new PageInfo<>(all);
}
@Override
public PageInfo<DeviceChannel> queryChannels(String query, Boolean queryParent, Boolean hasSubChannel, Boolean online, Boolean hasStream, int page, int count) {
PageHelper.startPage(page, count);
if (query != null) {
query = query.replaceAll("/", "//")
.replaceAll("%", "/%")
.replaceAll("_", "/_");
}
List<DeviceChannel> all = channelMapper.queryChannels(null, null, null, null, query, queryParent, hasSubChannel, online, null, hasStream);
return new PageInfo<>(all);
}

View File

@@ -506,7 +506,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
log.info("[添加目录订阅] 设备 {}", device.getDeviceId());
if (transactionInfo == null) {
log.info("[添加目录订阅] 设备 {}", device.getDeviceId());
}else {
log.info("[目录订阅续期] 设备 {}", device.getDeviceId());
}
try {
sipCommander.catalogSubscribe(device, transactionInfo, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
@@ -514,8 +518,8 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
log.info("[目录订阅]成功: {}", device.getDeviceId());
if (!subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
SIPResponse response = (SIPResponse) event.getResponse();
SipTransactionInfo transactionInfoForResonse = new SipTransactionInfo(response);
SubscribeTask subscribeTask = SubscribeTaskForCatalog.getInstance(device, this::catalogSubscribeExpire, transactionInfoForResonse);
SipTransactionInfo transactionInfoForResponse = new SipTransactionInfo(response);
SubscribeTask subscribeTask = SubscribeTaskForCatalog.getInstance(device, this::catalogSubscribeExpire, transactionInfoForResponse);
if (subscribeTask != null) {
subscribeTaskRunner.addSubscribe(subscribeTask);
}
@@ -566,7 +570,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public boolean addMobilePositionSubscribe(@NotNull Device device, SipTransactionInfo transactionInfo) {
log.info("[添加移动位置订阅] 设备 {}", device.getDeviceId());
if (transactionInfo == null) {
log.info("[添加移动位置订阅] 设备 {}", device.getDeviceId());
}else {
log.info("[移动位置订阅续期] 设备 {}", device.getDeviceId());
}
try {
sipCommander.mobilePositionSubscribe(device, transactionInfo, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
@@ -574,13 +582,13 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
log.info("[移动位置订阅]成功: {}", device.getDeviceId());
if (!subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
SIPResponse response = (SIPResponse) event.getResponse();
SipTransactionInfo transactionInfoForResonse = new SipTransactionInfo(response);
SubscribeTask subscribeTask = SubscribeTaskForMobilPosition.getInstance(device, this::catalogSubscribeExpire, transactionInfoForResonse);
SipTransactionInfo transactionInfoForResponse = new SipTransactionInfo(response);
SubscribeTask subscribeTask = SubscribeTaskForMobilPosition.getInstance(device, this::mobilPositionSubscribeExpire, transactionInfoForResponse);
if (subscribeTask != null) {
subscribeTaskRunner.addSubscribe(subscribeTask);
}
}else {
subscribeTaskRunner.updateDelay(SubscribeTaskForMobilPosition.getKey(device), (device.getSubscribeCycleForCatalog() * 1000L - 500L) + System.currentTimeMillis());
subscribeTaskRunner.updateDelay(SubscribeTaskForMobilPosition.getKey(device), (device.getSubscribeCycleForMobilePosition() * 1000L - 500L) + System.currentTimeMillis());
}
},eventResult -> {
@@ -721,8 +729,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public void updateDevice(Device device) {
String now = DateUtil.getNow();
device.setUpdateTime(now);
device.setCharset(device.getCharset() == null ? "" : device.getCharset().toUpperCase());
device.setUpdateTime(DateUtil.getNow());
if (deviceMapper.update(device) > 0) {
@@ -730,6 +736,40 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
@Transactional
@Override
public void updateDeviceList(List<Device> deviceList) {
if (deviceList.isEmpty()){
log.info("[批量更新设备] 列表为空,更细失败");
return;
}
if (deviceList.size() == 1) {
updateDevice(deviceList.get(0));
}else {
for (Device device : deviceList) {
device.setCharset(device.getCharset() == null ? "" : device.getCharset().toUpperCase());
device.setUpdateTime(DateUtil.getNow());
}
int limitCount = 300;
if (!deviceList.isEmpty()) {
if (deviceList.size() > limitCount) {
for (int i = 0; i < deviceList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > deviceList.size()) {
toIndex = deviceList.size();
}
deviceMapper.batchUpdate(deviceList.subList(i, toIndex));
}
}else {
deviceMapper.batchUpdate(deviceList);
}
for (Device device : deviceList) {
redisCatchStorage.updateDevice(device);
}
}
}
}
@Override
public boolean isExist(String deviceId) {
return getDeviceByDeviceIdFromDb(deviceId) != null;
@@ -866,7 +906,16 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
public void subscribeMobilePosition(int id, int cycle, int interval) {
Device device = deviceMapper.query(id);
Assert.notNull(device, "未找到设备");
Assert.isTrue(device.isOnLine(), "设备已离线");
if (!device.isOnLine()) {
// 开启订阅
device.setSubscribeCycleForMobilePosition(cycle);
device.setMobilePositionSubmissionInterval(interval);
updateDevice(device);
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
}
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备已离线");
}
if (device.getSubscribeCycleForMobilePosition() == cycle) {
return;
@@ -882,6 +931,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
// 开启订阅
device.setSubscribeCycleForMobilePosition(cycle);
device.setMobilePositionSubmissionInterval(interval);
updateDevice(device);
if (cycle > 0) {
addMobilePositionSubscribe(device, null);
}

View File

@@ -118,6 +118,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
@Override
public void play(CommonGBChannel channel, Platform platform, Boolean record, ErrorCallback<StreamInfo> callback) {
log.info("[通用通道] 播放, 类型: {} 编号:{}", channel.getDataType(), channel.getGbDeviceId());
if (channel.getDataType() == ChannelDataType.GB28181.value) {
// 国标通道
playGbDeviceChannel(channel, record, callback);

View File

@@ -10,6 +10,8 @@ import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -293,4 +295,16 @@ public class GroupServiceImpl implements IGroupService {
allParent.add(parent);
return allParent;
}
@Override
public PageInfo<Group> queryList(Integer page, Integer count, String query) {
PageHelper.startPage(page, count);
if (query != null) {
query = query.replaceAll("/", "//")
.replaceAll("%", "/%")
.replaceAll("_", "/_");
}
List<Group> all = groupManager.query(query, null, null);
return new PageInfo<>(all);
}
}

View File

@@ -144,17 +144,12 @@ public class RegionServiceImpl implements IRegionService {
}
@Override
public List<RegionTree> queryForTree(String query, Integer parent, Boolean hasChannel) {
if (query != null) {
query = query.replaceAll("/", "//")
.replaceAll("%", "/%")
.replaceAll("_", "/_");
}
List<RegionTree> regionList = regionMapper.queryForTree(query, parent);
public List<RegionTree> queryForTree(Integer parent, Boolean hasChannel) {
List<RegionTree> regionList = regionMapper.queryForTree(parent);
if (parent != null && hasChannel != null && hasChannel) {
Region parentRegion = regionMapper.queryOne(parent);
if (parentRegion != null) {
List<RegionTree> channelList = commonGBChannelMapper.queryForRegionTreeByCivilCode(query, parentRegion.getDeviceId());
List<RegionTree> channelList = commonGBChannelMapper.queryForRegionTreeByCivilCode(parentRegion.getDeviceId());
regionList.addAll(channelList);
}
}
@@ -324,4 +319,16 @@ public class RegionServiceImpl implements IRegionService {
parentId = region.getId();
}
}
@Override
public PageInfo<Region> queryList(int page, int count, String query) {
PageHelper.startPage(page, count);
if (query != null) {
query = query.replaceAll("/", "//")
.replaceAll("%", "/%")
.replaceAll("_", "/_");
}
List<Region> all = regionMapper.query(query, null);
return new PageInfo<>(all);
}
}

View File

@@ -94,14 +94,7 @@ public class DeviceStatusTaskRunner {
return false;
}
log.debug("[更新状态任务时间] 编号: {}", key);
if (delayQueue.contains(task)) {
boolean remove = delayQueue.remove(task);
if (!remove) {
log.info("[更新状态任务时间] 从延时队列内移除失败: {}", key);
}
}
task.setDelayTime(expirationTime);
delayQueue.offer(task);
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getDeviceId());
Duration duration = Duration.ofSeconds((expirationTime - System.currentTimeMillis())/1000);
redisTemplate.expire(redisKey, duration);

View File

@@ -94,14 +94,7 @@ public class SubscribeTaskRunner{
return false;
}
log.info("[更新订阅任务时间] {}, 编号: {}", task.getName(), key);
if (delayQueue.contains(task)) {
boolean remove = delayQueue.remove(task);
if (!remove) {
log.info("[更新订阅任务时间] 从延时队列内移除失败: {}", key);
}
}
task.setDelayTime(expirationTime);
delayQueue.offer(task);
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getKey());
Duration duration = Duration.ofSeconds((expirationTime - System.currentTimeMillis())/1000);
redisTemplate.expire(redisKey, duration);

View File

@@ -12,7 +12,7 @@ public class SubscribeTaskForMobilPosition extends SubscribeTask {
public static final String name = "mobilPosition";
public static SubscribeTask getInstance(Device device, SubscribeCallback callback, SipTransactionInfo transactionInfo) {
if (device.getSubscribeCycleForCatalog() <= 0) {
if (device.getSubscribeCycleForMobilePosition() <= 0) {
return null;
}
SubscribeTaskForMobilPosition subscribeTaskForMobilPosition = new SubscribeTaskForMobilPosition();

View File

@@ -114,14 +114,7 @@ public class PlatformStatusTaskRunner {
return false;
}
log.info("[更新平台注册任务时间] 平台上级编号: {}", platformServerId);
if (registerDelayQueue.contains(task)) {
boolean remove = registerDelayQueue.remove(task);
if (!remove) {
log.info("[更新平台注册任务时间] 从延时队列内移除失败: {}", platformServerId);
}
}
task.setDelayTime(expirationTime);
registerDelayQueue.offer(task);
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), platformServerId);
Duration duration = Duration.ofSeconds((expirationTime - System.currentTimeMillis())/1000);
redisTemplate.expire(redisKey, duration);
@@ -165,14 +158,7 @@ public class PlatformStatusTaskRunner {
return false;
}
log.info("[更新平台心跳任务时间] 平台上级编号: {}", platformServerId);
if (keepaliveTaskDelayQueue.contains(task)) {
boolean remove = keepaliveTaskDelayQueue.remove(task);
if (!remove) {
log.info("[更新平台心跳任务时间] 从延时队列内移除失败: {}", platformServerId);
}
}
task.setDelayTime(expirationTime);
keepaliveTaskDelayQueue.offer(task);
return true;
}

View File

@@ -1136,9 +1136,9 @@ public class SIPCommander implements ISIPCommander {
}
cmdXml.append("</Query>\r\n");
MessageEvent<Object> messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, callback);
MessageEvent<Object> messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 4000L, callback);
messageSubscribe.addSubscribe(messageEvent);
log.info("[预置位查询] 设备编号: {} 通道编号: {} SN {}", device.getDeviceId(), channelId, sn);
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> {
messageSubscribe.removeSubscribe(messageEvent.getKey());

View File

@@ -144,12 +144,15 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
// 携带授权头并且密码正确
response = getMessageFactory().createResponse(Response.OK, request);
// 添加date头
SIPDateHeader dateHeader = new SIPDateHeader();
// 使用自己修改的
GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
dateHeader.setDate(gbSipDate);
response.addHeader(dateHeader);
// 如果主动禁用了Date头则不添加
if (!userSetting.isDisableDateHeader()) {
// 添加date头
SIPDateHeader dateHeader = new SIPDateHeader();
// 使用自己修改的
GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
dateHeader.setDate(gbSipDate);
response.addHeader(dateHeader);
}
if (request.getExpires() == null) {
response = getMessageFactory().createResponse(Response.BAD_REQUEST, request);
@@ -218,12 +221,15 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
private Response getRegisterOkResponse(Request request) throws ParseException {
// 携带授权头并且密码正确
Response response = getMessageFactory().createResponse(Response.OK, request);
// 添加date头
SIPDateHeader dateHeader = new SIPDateHeader();
// 使用自己修改的
GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
dateHeader.setDate(gbSipDate);
response.addHeader(dateHeader);
// 如果主动禁用了Date头则不添加
if (!userSetting.isDisableDateHeader()) {
// 添加date头
SIPDateHeader dateHeader = new SIPDateHeader();
// 使用自己修改的
GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
dateHeader.setDate(gbSipDate);
response.addHeader(dateHeader);
}
// 添加Contact头
response.addHeader(request.getHeader(ContactHeader.NAME));

View File

@@ -69,6 +69,12 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
responseAck(request, Response.BAD_REQUEST);
return;
}
ExpiresHeader expires = request.getExpires();
if (expires == null) {
log.error("处理SUBSCRIBE请求 未获取到ExpiresHeader{}", evt.getRequest());
responseAck(request, Response.BAD_REQUEST, "missing expires");
return;
}
String platformId = SipUtils.getUserIdFromFromHeader(request);
String cmd = XmlUtil.getText(rootElement, "CmdType");
log.info("[收到订阅请求] 类型: {}, 来自: {}", cmd, platformId);
@@ -181,7 +187,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
.append("<Result>OK</Result>\r\n")
.append("</Response>\r\n");
try {
int expires = request.getExpires().getExpires();
Platform parentPlatform = platformService.queryPlatformByServerGBId(platformId);

View File

@@ -2,9 +2,12 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe;
import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.cmd.CatalogQueryMessageHandler;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
@@ -28,6 +31,9 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
@Autowired
private IPlatformService platformService;
@Autowired
private MessageSubscribe messageSubscribe;
public void addHandler(String cmdType, IMessageHandler messageHandler) {
messageHandlerMap.put(cmdType, messageHandler);
}
@@ -54,6 +60,8 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
return;
}
messageHandler.handForDevice(evt, device, element);
}else {
handMessageEvent(element, null);
}
}
@@ -65,4 +73,21 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
messageHandler.handForPlatform(evt, parentPlatform, element);
}
}
public void handMessageEvent(Element element, Object data) {
String cmd = getText(element, "CmdType");
String sn = getText(element, "SN");
MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn);
if (subscribe != null && subscribe.getCallback() != null) {
String result = getText(element, "Result");
if (result == null || "OK".equalsIgnoreCase(result) || data != null) {
subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
}else {
subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result);
}
messageSubscribe.removeSubscribe(cmd + sn);
}
}
}

View File

@@ -87,6 +87,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
if (handlerCatchDataList.isEmpty()) {
return;
}
List<Device> deviceListForUpdate = new ArrayList<>();
for (SipMsgInfo sipMsgInfo : handlerCatchDataList) {
if (sipMsgInfo == null) {
continue;
@@ -113,7 +114,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
device.setKeepaliveTime(DateUtil.getNow());
if (device.isOnLine()) {
deviceService.updateDevice(device);
deviceListForUpdate.add(device);
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (statusTaskRunner.containsKey(device.getDeviceId())) {
statusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
@@ -125,6 +126,9 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
}
}
}
if (!deviceListForUpdate.isEmpty()) {
deviceService.updateDeviceList(deviceListForUpdate);
}
}
@Override

View File

@@ -1,11 +1,8 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe;
import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
@@ -13,8 +10,6 @@ import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
* 命令类型: 请求动作的应答
* 命令类型: 设备控制, 报警通知, 设备目录信息查询, 目录信息查询, 目录收到, 设备信息查询, 设备状态信息查询 ......
@@ -27,8 +22,7 @@ public class ResponseMessageHandler extends MessageHandlerAbstract implements In
@Autowired
private MessageRequestProcessor messageRequestProcessor;
@Autowired
private MessageSubscribe messageSubscribe;
@Override
public void afterPropertiesSet() throws Exception {
@@ -38,21 +32,5 @@ public class ResponseMessageHandler extends MessageHandlerAbstract implements In
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
super.handForDevice(evt, device, element);
handMessageEvent(element, null);
}
public void handMessageEvent(Element element, Object data) {
String cmd = getText(element, "CmdType");
String sn = getText(element, "SN");
MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn);
if (subscribe != null && subscribe.getCallback() != null) {
String result = getText(element, "Result");
if (result == null || "OK".equalsIgnoreCase(result) || data != null) {
subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
}else {
subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result);
}
messageSubscribe.removeSubscribe(cmd + sn);
}
}
}

View File

@@ -1,9 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.MessageResponseTask;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.Preset;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
@@ -13,6 +13,7 @@ import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
@@ -23,6 +24,12 @@ import java.text.ParseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
* 设备预置位查询应答
@@ -36,8 +43,9 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
@Autowired
private ResponseMessageHandler responseMessageHandler;
@Autowired
private DeferredResultHolder deferredResultHolder;
private final Map<String, MessageResponseTask<Preset>> mesageMap = new ConcurrentHashMap<>();
private final DelayQueue<MessageResponseTask<Preset>> delayQueue = new DelayQueue<>();
@Override
@@ -93,7 +101,14 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
presetQuerySipReqList.add(presetQuerySipReq);
}
}
responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList);
// if (presetQuerySipReqList.size() == sumNum) {
// responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList);
// }else {
// String sn = getText(element, "SN");
// addCatch(cmdType + "_" + sn, rootElement, presetQuerySipReqList);
// }
String sn = getText(element, "SN");
addCatch(cmdType + "_" + sn, sumNum, rootElement, presetQuerySipReqList);
try {
responseAck(request, Response.OK);
} catch (InvalidArgumentException | ParseException | SipException e) {
@@ -104,6 +119,63 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
}
}
private void addCatch(String key, int sumNum, Element rootElement, List<Preset> presetQuerySipReqList) {
if (presetQuerySipReqList.size() == sumNum) {
responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList);
if (mesageMap.containsKey(key)) {
MessageResponseTask<Preset> messageResponseTask = mesageMap.get(key);
mesageMap.remove(key);
boolean remove = delayQueue.remove(messageResponseTask);
if (!remove) {
log.info("[移除预置位查询任务] 从延时队列内移除失败: {}", key);
}
}
}else {
if (mesageMap.containsKey(key)) {
MessageResponseTask<Preset> messageResponseTask = mesageMap.get(key);
List<Preset> data = messageResponseTask.getData();
data.addAll(presetQuerySipReqList);
if (data.size() == sumNum) {
responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList);
mesageMap.remove(key);
boolean remove = delayQueue.remove(messageResponseTask);
if (!remove) {
log.info("[移除预置位查询任务] 从延时队列内移除失败: {}", key);
}
return;
}
messageResponseTask.setDelayTime(System.currentTimeMillis() + 1000);
}else {
MessageResponseTask<Preset> messageResponseTask = new MessageResponseTask<>();
messageResponseTask.setElement(rootElement);
messageResponseTask.setData(presetQuerySipReqList);
messageResponseTask.setDelayTime(System.currentTimeMillis() + 1000);
messageResponseTask.setKey(key);
mesageMap.put(key, messageResponseTask);
delayQueue.offer(messageResponseTask);
}
}
}
// 处理过期的缓存
@Scheduled(fixedDelay = 500, timeUnit = TimeUnit.MILLISECONDS)
public void expirationCheck(){
while (!delayQueue.isEmpty()) {
MessageResponseTask<Preset> take = null;
try {
take = delayQueue.take();
try {
responseMessageHandler.handMessageEvent(take.getElement(), take.getData());
mesageMap.remove(take.getKey());
}catch (Exception e) {
log.error("[预置位查询到期] {} 到期处理时出现异常", take.getKey());
}
} catch (InterruptedException e) {
log.error("[设备订阅任务] ", e);
}
}
}
@Override
public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element rootElement) {

View File

@@ -64,7 +64,7 @@ public interface IMediaNodeServerService {
Long updateDownloadProcess(MediaServer mediaServer, String app, String stream);
StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void stopProxy(MediaServer mediaServer, String streamKey);

View File

@@ -150,7 +150,7 @@ public interface IMediaServerService {
Long updateDownloadProcess(MediaServer mediaServerItem, String app, String stream);
StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void stopProxy(MediaServer mediaServer, String streamKey);

View File

@@ -952,13 +952,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
public void startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[startProxy] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
}
return mediaNodeServerService.startProxy(mediaServer, streamProxy);
mediaNodeServerService.startProxy(mediaServer, streamProxy);
}
@Override

View File

@@ -425,7 +425,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
public void startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
String dstUrl;
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())) {
@@ -463,10 +463,6 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
MediaInfo mediaInfo = getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream());
if (mediaInfo != null) {
if (mediaInfo.getOriginUrl() != null && mediaInfo.getOriginUrl().equals(streamProxy.getSrcUrl())) {
log.info("[启动拉流代理] 已存在, 直接返回, app {}, stream: {}", mediaInfo.getApp(), streamProxy.getStream());
return getStreamInfoByAppAndStream(mediaServer, streamProxy.getApp(), streamProxy.getStream(), mediaInfo, null, true);
}
closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream());
}
@@ -490,15 +486,6 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
JSONObject data = jsonObject.getJSONObject("data");
if (data == null) {
throw new ControllerException(jsonObject.getInteger("code"), "代理结果异常: " + jsonObject);
}else {
streamProxy.setStreamKey(data.getString("key"));
// 由于此时流未注册,手动拼装流信息
mediaInfo = new MediaInfo();
mediaInfo.setApp(streamProxy.getApp());
mediaInfo.setStream(streamProxy.getStream());
mediaInfo.setOriginType(4);
mediaInfo.setOriginTypeStr("pull");
return getStreamInfoByAppAndStream(mediaServer, streamProxy.getApp(), streamProxy.getStream(), mediaInfo, null, true);
}
}
}

View File

@@ -378,6 +378,7 @@ public class ZLMRESTfulUtils {
param.put("url", streamUrl);
param.put("timeout_sec", timeout_sec);
param.put("expire_sec", expire_sec);
param.put("async", 1);
sendGetForImg(mediaServerItem, "getSnap", param, targetPath, fileName);
}
@@ -446,7 +447,6 @@ public class ZLMRESTfulUtils {
BigDecimal bigDecimal = new BigDecimal(stamp);
param.put("stamp", bigDecimal);
param.put("schema", schema);
System.out.println(bigDecimal);
return sendPost(mediaServer, "seekRecordStamp",param, null);
}
}

View File

@@ -28,7 +28,7 @@ public interface IRedisRpcPlayService {
void playPush(String serverId, Integer id, ErrorCallback<StreamInfo> callback);
StreamInfo playProxy(String serverId, int id);
void playProxy(String serverId, int id, ErrorCallback<StreamInfo> callback);
void stopProxy(String serverId, int id);

View File

@@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
@@ -63,10 +62,13 @@ public class RedisRpcStreamProxyController extends RpcController {
response.setBody("param error");
return response;
}
StreamInfo streamInfo = streamProxyPlayService.startProxy(streamProxy);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(streamInfo));
return response;
streamProxyPlayService.startProxy(streamProxy, (code, msg, streamInfo) -> {
response.setStatusCode(code);
response.setBody(JSONObject.toJSONString(streamInfo));
sendResponse(response);
});
return null;
}
/**

View File

@@ -212,13 +212,20 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
}
@Override
public StreamInfo playProxy(String serverId, int id) {
public void playProxy(String serverId, int id, ErrorCallback<StreamInfo> callback) {
RedisRpcRequest request = buildRequest("streamProxy/play", id);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.SECONDS);
if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
return JSON.parseObject(response.getBody().toString(), StreamInfo.class);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
return null;
}
@Override

View File

@@ -7,12 +7,15 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@@ -20,8 +23,10 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import java.net.MalformedURLException;
@@ -89,7 +94,7 @@ public class StreamProxyController {
})
@PostMapping(value = "/save")
@ResponseBody
public StreamContent save(@RequestBody StreamProxyParam param){
public DeferredResult<WVPResult<StreamContent>> save(HttpServletRequest request, @RequestBody StreamProxyParam param){
log.info("添加代理: " + JSONObject.toJSONString(param));
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
@@ -97,18 +102,39 @@ public class StreamProxyController {
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
}
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
ErrorCallback<StreamInfo> callback = (code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
WVPResult<StreamContent> wvpResult = WVPResult.success();
if (streamInfo != null) {
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo=streamInfo.clone();//深拷贝
String host;
try {
URL url=new URL(request.getRequestURL().toString());
host=url.getHost();
} catch (MalformedURLException e) {
host=request.getLocalAddr();
}
streamInfo.changeStreamIp(host);
}
if (!ObjectUtils.isEmpty(streamInfo.getMediaServer().getTranscodeSuffix())
&& !"null".equalsIgnoreCase(streamInfo.getMediaServer().getTranscodeSuffix())) {
streamInfo.setStream(streamInfo.getStream() + "_" + streamInfo.getMediaServer().getTranscodeSuffix());
}
wvpResult.setData(new StreamContent(streamInfo));
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
StreamInfo streamInfo = streamProxyService.save(param);
if (param.isEnable()) {
if (streamInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
result.setResult(wvpResult);
}else {
return new StreamContent(streamInfo);
result.setResult(WVPResult.fail(code, msg));
}
}else {
return null;
}
};
streamProxyService.save(param, callback);
return result;
}
@Operation(summary = "新增代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = {
@@ -193,25 +219,46 @@ public class StreamProxyController {
@ResponseBody
@Operation(summary = "启用代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "id", description = "代理Id", required = true)
public StreamContent start(HttpServletRequest request, int id){
public DeferredResult<WVPResult<StreamContent>> start(HttpServletRequest request, int id){
log.info("播放代理: {}", id);
StreamInfo streamInfo = streamProxyPlayService.start(id, null, null);
if (streamInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}else {
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo=streamInfo.clone();//深拷贝
String host;
try {
URL url=new URL(request.getRequestURL().toString());
host=url.getHost();
} catch (MalformedURLException e) {
host=request.getLocalAddr();
StreamProxy streamProxy = streamProxyService.getStreamProxy(id);
Assert.notNull(streamProxy, "代理信息不存在");
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
ErrorCallback<StreamInfo> callback = (code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
WVPResult<StreamContent> wvpResult = WVPResult.success();
if (streamInfo != null) {
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo=streamInfo.clone();//深拷贝
String host;
try {
URL url=new URL(request.getRequestURL().toString());
host=url.getHost();
} catch (MalformedURLException e) {
host=request.getLocalAddr();
}
streamInfo.changeStreamIp(host);
}
if (!ObjectUtils.isEmpty(streamInfo.getMediaServer().getTranscodeSuffix())
&& !"null".equalsIgnoreCase(streamInfo.getMediaServer().getTranscodeSuffix())) {
streamInfo.setStream(streamInfo.getStream() + "_" + streamInfo.getMediaServer().getTranscodeSuffix());
}
wvpResult.setData(new StreamContent(streamInfo));
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
streamInfo.changeStreamIp(host);
result.setResult(wvpResult);
}else {
result.setResult(WVPResult.fail(code, msg));
}
return new StreamContent(streamInfo);
}
};
streamProxyPlayService.start(id, null, callback);
return result;
}
@GetMapping(value = "/stop")

View File

@@ -92,5 +92,5 @@ public interface StreamProxyMapper {
" SET pulling=#{pulling}, media_server_id = #{mediaServerId}, " +
" stream_key = #{streamKey} " +
" WHERE id=#{id}")
void addStream(StreamProxy streamProxy);
void updateStream(StreamProxy streamProxy);
}

View File

@@ -4,13 +4,13 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import javax.validation.constraints.NotNull;
public interface IStreamProxyPlayService {
StreamInfo start(int id, Boolean record, ErrorCallback<StreamInfo> callback);
void start(int id, Boolean record, ErrorCallback<StreamInfo> callback);
void start(int id, ErrorCallback<StreamInfo> callback);
StreamInfo startProxy(StreamProxy streamProxy);
void startProxy(@NotNull StreamProxy streamProxy, ErrorCallback<StreamInfo> callback);
void stop(int id);

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.streamProxy.service;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
@@ -15,7 +16,7 @@ public interface IStreamProxyService {
* 保存视频代理
* @param param
*/
StreamInfo save(StreamProxyParam param);
void save(StreamProxyParam param, ErrorCallback<StreamInfo> callback);
/**
* 分页查询
@@ -38,7 +39,7 @@ public interface IStreamProxyService {
* @param stream
* @return
*/
boolean startByAppAndStream(String app, String stream);
void startByAppAndStream(String app, String stream, ErrorCallback<StreamInfo> callback);
/**
* 停用用视频代理

View File

@@ -1,36 +1,29 @@
package com.genersoft.iot.vmp.streamProxy.service.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import javax.sip.message.Response;
import javax.validation.constraints.NotNull;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* 视频代理业务
@@ -57,96 +50,42 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
@Autowired
private IRedisRpcPlayService redisRpcPlayService;
private ConcurrentHashMap<Integer, ErrorCallback<StreamInfo>> callbackMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<Integer, StreamInfo> streamInfoMap = new ConcurrentHashMap<>();
/**
* 流到来的处理
*/
@Async("taskExecutor")
@Transactional
@EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if ("rtsp".equals(event.getSchema())) {
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(event.getApp(), event.getStream());
if (streamProxy != null) {
ErrorCallback<StreamInfo> callback = callbackMap.remove(streamProxy.getId());
StreamInfo streamInfo = streamInfoMap.remove(streamProxy.getId());
if (callback != null && streamInfo != null) {
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
}
}
}
@Override
public void start(int id, ErrorCallback<StreamInfo> callback) {
StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
StreamInfo streamInfo = startProxy(streamProxy);
if (streamInfo == null) {
callback.run(Response.BUSY_HERE, "busy here", null);
return;
}
callbackMap.put(id, callback);
streamInfoMap.put(id, streamInfo);
MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId());
if (mediaServer != null) {
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream());
if (mediaInfo != null) {
callbackMap.remove(id);
streamInfoMap.remove(id);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
}
}
@Override
public StreamInfo start(int id, Boolean record, ErrorCallback<StreamInfo> callback) {
public void start(int id, Boolean record, ErrorCallback<StreamInfo> callback) {
log.info("[拉流代理] 开始拉流ID{}", id);
StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
log.info("[拉流代理] 类型: {} app{}, stream: {}, 流地址: {}", streamProxy.getType(), streamProxy.getApp(), streamProxy.getStream(), streamProxy.getSrcUrl());
if (record != null) {
streamProxy.setEnableMp4(record);
}
StreamInfo streamInfo = startProxy(streamProxy);
if (callback != null) {
// 设置流超时的定时任务
String timeOutTaskKey = UUID.randomUUID().toString();
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), streamInfo.getMediaServer().getId());
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 收流超时
subscribe.removeSubscribe(rtpHook);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), streamInfo);
}, userSetting.getPlayTimeout());
// 开启流到来的监听
subscribe.addSubscribe(rtpHook, (hookData) -> {
dynamicTask.stop(timeOutTaskKey);
// hook响应
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
subscribe.removeSubscribe(rtpHook);
});
}
return streamInfo;
startProxy(streamProxy, callback);
}
@Override
public StreamInfo startProxy(StreamProxy streamProxy){
public void startProxy(@NotNull StreamProxy streamProxy, ErrorCallback<StreamInfo> callback){
if (!streamProxy.isEnable()) {
return null;
callback.run(ErrorCode.ERROR100.getCode(), "代理未启用", null);
return;
}
if (streamProxy.getServerId() == null) {
streamProxy.setServerId(userSetting.getServerId());
}
if (!userSetting.getServerId().equals(streamProxy.getServerId())) {
return redisRpcPlayService.playProxy(streamProxy.getServerId(), streamProxy.getId());
log.info("[拉流代理] 由其他服务{}管理", streamProxy.getServerId());
redisRpcPlayService.playProxy(streamProxy.getServerId(), streamProxy.getId(), callback);
return;
}
if (streamProxy.getMediaServerId() != null) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), streamProxy.getMediaServerId(), null, false);
if (streamInfo != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
return;
}
}
MediaServer mediaServer;
@@ -159,12 +98,32 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), mediaServerId == null?"未找到可用的媒体节点":"未找到节点" + mediaServerId);
}
StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy);
// 设置流超时的定时任务
String timeOutTaskKey = UUID.randomUUID().toString();
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId());
dynamicTask.startDelay(timeOutTaskKey, () -> {
log.info("[拉流代理] 收流超时app{}stream: {}", streamProxy.getApp(), streamProxy.getStream());
// 收流超时
subscribe.removeSubscribe(rtpHook);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
}, userSetting.getPlayTimeout());
// 开启流到来的监听
subscribe.addSubscribe(rtpHook, (hookData) -> {
log.info("[拉流代理] 收流成功app{}stream: {}", hookData.getApp(), hookData.getStream());
dynamicTask.stop(timeOutTaskKey);
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServer, hookData.getApp(), hookData.getStream(), hookData.getMediaInfo(), null);
// hook响应
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
subscribe.removeSubscribe(rtpHook);
});
mediaServerService.startProxy(mediaServer, streamProxy);
if (mediaServerId == null || !mediaServerId.equals(mediaServer.getId())) {
streamProxy.setMediaServerId(mediaServer.getId());
streamProxyMapper.addStream(streamProxy);
streamProxyMapper.updateStream(streamProxy);
}
return streamInfo;
}
@Override

View File

@@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
@@ -109,7 +110,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
// 拉流代理
StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
startByAppAndStream(event.getApp(), event.getStream());
startByAppAndStream(event.getApp(), event.getStream(), ((code, msg, data) -> {
log.info("[拉流代理] 自动点播成功, app {} stream: {}", event.getApp(), event.getStream());
}));
}
}
@@ -136,7 +139,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
@Transactional
public StreamInfo save(StreamProxyParam param) {
public void save(StreamProxyParam param, ErrorCallback<StreamInfo> callback) {
// 兼容旧接口
StreamProxy streamProxyInDb = getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyInDb != null && streamProxyInDb.getPulling() != null && streamProxyInDb.getPulling()) {
@@ -159,9 +162,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
if (param.isEnable()) {
return playService.startProxy(streamProxy);
} else {
return null;
playService.startProxy(streamProxy, callback);
}
}
@@ -247,13 +248,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public boolean startByAppAndStream(String app, String stream) {
public void startByAppAndStream(String app, String stream, ErrorCallback<StreamInfo> callback) {
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
StreamInfo streamInfo = playService.startProxy(streamProxy);
return streamInfo != null;
playService.startProxy(streamProxy, callback);
}
@Override
@@ -406,7 +406,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
streamProxy.setPulling(status);
streamProxy.setMediaServerId(mediaServerId);
streamProxy.setUpdateTime(DateUtil.getNow());
streamProxyMapper.addStream(streamProxy);
streamProxyMapper.updateStream(streamProxy);
streamProxy.setGbStatus(status ? "ON" : "OFF");
if (streamProxy.getGbId() > 0) {

View File

@@ -15,7 +15,7 @@ import java.time.temporal.TemporalAccessor;
import java.util.Locale;
/**
/**
* 全局时间工具类
* @author lin
*/
@@ -74,7 +74,7 @@ public class DateUtil {
public static String yyyy_MM_dd_HH_mm_ssToISO8601(@NotNull String formatTime) {
return formatterISO8601.format(formatter.parse(formatTime));
}
public static String ISO8601Toyyyy_MM_dd_HH_mm_ss(String formatTime) {
// 三种日期格式都尝试,为了兼容不同厂家的日期格式
if (verification(formatTime, formatterCompatibleISO8601)) {
@@ -211,11 +211,4 @@ public class DateUtil {
return ChronoUnit.MILLIS.between(startInstant, endInstant);
}
public static void main(String[] args) {
long difference = getDifference("2025-05-21 13:00:00", "2025-05-21 13:30:00")/1000;
System.out.println(difference);
}
}

View File

@@ -100,7 +100,7 @@ public class RecordPlanController {
return recordPlanService.query(page, count, query);
}
@Operation(summary = "分页查询级联平台的所有所有通道", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Operation(summary = "分页查询录制计划关联的所有通道", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "page", description = "当前页", required = true)
@Parameter(name = "count", description = "每页条数", required = true)
@Parameter(name = "planId", description = "录制计划ID")