Merge remote-tracking branch 'origin/wvp-28181-2.0' into wvp-pro-record

This commit is contained in:
648540858
2022-02-22 11:55:23 +08:00
22 changed files with 392 additions and 320 deletions

View File

@@ -3,6 +3,11 @@ package com.genersoft.iot.vmp.gb28181.bean;
public class Device {
/**
* Id
*/
private int id;
/**
* 设备Id
*/
@@ -114,7 +119,13 @@ public class Device {
*/
private int subscribeCycleForCatalog ;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getDeviceId() {
return deviceId;
@@ -283,4 +294,6 @@ public class Device {
public void setSubscribeCycleForCatalog(int subscribeCycleForCatalog) {
this.subscribeCycleForCatalog = subscribeCycleForCatalog;
}
}

View File

@@ -5,6 +5,7 @@ package com.genersoft.iot.vmp.gb28181.bean;
*/
public class GbStream extends PlatformGbStream{
private int id;
private String app;
private String stream;
private String gbId;
@@ -19,6 +20,14 @@ public class GbStream extends PlatformGbStream{
*/
public Long createStamp;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getApp() {
return app;
}

View File

@@ -7,6 +7,10 @@ package com.genersoft.iot.vmp.gb28181.bean;
*/
public class MobilePosition {
/**
* Id
*/
private int id;
/**
* 设备Id
*/
@@ -72,6 +76,13 @@ public class MobilePosition {
*/
private String cnLat;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getDeviceId() {
return deviceId;

View File

@@ -1,25 +1,16 @@
package com.genersoft.iot.vmp.gb28181.bean;
public class PlatformGbStream {
private String app;
private String stream;
private String gbStreamId;
private String platformId;
private String catalogId;
public String getApp() {
return app;
public String getGbStreamId() {
return gbStreamId;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
public void setGbStreamId(String gbStreamId) {
this.gbStreamId = gbStreamId;
}
public String getPlatformId() {

View File

@@ -76,7 +76,10 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent
eventResult.callId = callid;
eventResult.msg = "注册超时";
eventResult.type = "register timeout";
sipSubscribe.getErrorSubscribe(callid).response(eventResult);
if (sipSubscribe.getErrorSubscribe(callid) != null) {
sipSubscribe.getErrorSubscribe(callid).response(eventResult);
}
}
}

View File

@@ -102,7 +102,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{
if (event != null) {
logger.info("向上级平台 [ {} ] 注册发错误: {} ",
logger.info("向上级平台 [ {} ] 注册发错误: {} ",
parentPlatform.getServerGBId(),
event.msg);
}

View File

@@ -373,7 +373,8 @@ public class StreamPushServiceImpl implements IStreamPushService {
return;
}
}
platformGbStreamMapper.batchAdd(platformId, catalogId, streamPushItems);
List<GbStream> gbStreamList = gbStreamMapper.selectAllForAppAndStream(streamPushItems);
platformGbStreamMapper.batchAdd(platformId, catalogId, gbStreamList);
eventPublisher.catalogEventPublishForStream(platformId, streamPushItems.toArray(new GbStream[0]), CatalogEvent.ADD);
}
}

View File

@@ -17,7 +17,7 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
private List<StreamPushItem> streamPushItems = new ArrayList<>();
private Map<String, UploadData> streamPushItemsForPlatform = new HashMap<>();
private Set<String> streamPushStreamSet = new HashSet<>();
private Set<String> streamPushGBSet = new HashSet<>();
private Map<String,String> streamPushGBMap = new HashMap<>();
private List<String> errorStreamList = new ArrayList<>();
private List<String> errorGBList = new ArrayList<>();
// 读取数量计数器
@@ -50,13 +50,17 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
|| StringUtils.isEmpty(streamPushExcelDto.getGbId())) {
return;
}
if (streamPushGBSet.contains(streamPushExcelDto.getGbId())) {
errorGBList.add(streamPushExcelDto.getGbId());
if (streamPushGBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()) == null) {
streamPushGBMap.put(streamPushExcelDto.getApp() + streamPushExcelDto.getStream(), streamPushExcelDto.getGbId());
}else {
if (!streamPushGBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()).equals(streamPushExcelDto.getGbId())) {
errorGBList.add(streamPushExcelDto.getGbId() + "(同一组app+stream使用了不同国标ID)");
return;
}
}
if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) {
errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream());
}
if (streamPushGBSet.contains(streamPushExcelDto.getGbId()) || streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) {
errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ "/" + streamPushExcelDto.getPlatformId() + "(同一组app+stream添加在了同一个平台下)");
return;
}
@@ -95,8 +99,7 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
}
streamPushGBSet.add(streamPushExcelDto.getGbId());
streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream());
streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId());
loadedSize ++;
if (loadedSize > 1000) {
saveData();
@@ -111,7 +114,7 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
// 这里也要保存数据,确保最后遗留的数据也存储到数据库
saveData();
streamPushGBSet.clear();
streamPushGBMap.clear();
streamPushStreamSet.clear();
errorDataHandler.handle(errorStreamList, errorGBList);
}

View File

@@ -86,8 +86,6 @@ public interface DeviceChannelMapper {
@Update(value = {"UPDATE device_channel SET streamId=#{streamId} WHERE deviceId=#{deviceId} AND channelId=#{channelId}"})
void startPlay(String deviceId, String channelId, String streamId);
@Select(value = {" <script>" +
"SELECT dc.channelId, "+
"dc.deviceId, " +
@@ -107,7 +105,7 @@ public interface DeviceChannelMapper {
" <if test='online == false' > AND dc.status=0</if> " +
" <if test='hasSubChannel!= null and hasSubChannel == true' > AND dc2.channelId is not null</if> " +
" <if test='hasSubChannel!= null and hasSubChannel == false' > AND dc2.channelId is null</if> " +
" <if test='catalogId == null ' > AND pgc.platformId is null AND pgc.catalogId is null</if> " +
" <if test='catalogId == null ' > AND ((pgc.platformId IS NULL AND pgc.catalogId IS NULL) or (pgc.platformId != #{platformId}))</if> " +
" <if test='catalogId != null ' > AND pgc.platformId =#{platformId} AND pgc.catalogId = #{catalogId}</if> " +
" ORDER BY dc.deviceId, dc.channelId ASC" +
" </script>"})

View File

@@ -38,12 +38,13 @@ public interface GbStreamMapper {
int del(String app, String stream);
@Select("<script> "+
"SELECT gs.*, pgs.platformId AS platformId, pgs.catalogId AS catalogId FROM gb_stream gs " +
"LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +
"SELECT gs.* FROM gb_stream gs " +
"WHERE " +
"1=1 " +
" <if test='catalogId != null'> AND pgs.platformId = #{platformId} AND pgs.catalogId = #{catalogId}</if> " +
" <if test='catalogId == null'> AND pgs.platformId is null AND pgs.catalogId is null</if> " +
" <if test='catalogId != null'> AND gs.id in" +
"(select pgs.gbStreamId from platform_gb_stream pgs where pgs.platformId = #{platformId} and pgs.catalogId=#{catalogId})</if> " +
" <if test='catalogId == null'> AND gs.id not in" +
"(select pgs.gbStreamId from platform_gb_stream pgs where pgs.platformId = #{platformId}) </if> " +
" <if test='query != null'> AND (gs.app LIKE '%${query}%' OR gs.stream LIKE '%${query}%' OR gs.gbId LIKE '%${query}%' OR gs.name LIKE '%${query}%')</if> " +
" <if test='pushing == true' > AND gs.status=1</if>" +
" <if test='pushing == false' > AND gs.status=0</if>" +
@@ -59,18 +60,18 @@ public interface GbStreamMapper {
List<GbStream> selectByGBId(String gbId);
@Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs " +
"LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +
"LEFT JOIN platform_gb_stream pgs ON gs.id = pgs.catalogId " +
"WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'")
GbStream queryStreamInPlatform(String platformId, String gbId);
@Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs " +
"LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +
"WHERE pgs.platformId = '${platformId}'")
"LEFT JOIN platform_gb_stream pgs ON gs.id = pgs.gbStreamId " +
"WHERE pgs.platformId = #{platformId}")
List<GbStream> queryGbStreamListInPlatform(String platformId);
@Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " +
"ON gs.app = pgs.app and gs.stream = pgs.stream WHERE pgs.app is NULL and pgs.stream is NULL")
@Select("SELECT gs.* FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " +
"ON gs.id = pgs.gbStreamId WHERE pgs.gbStreamId is NULL")
List<GbStream> queryStreamNotInPlatform();
@Update("UPDATE gb_stream " +
@@ -124,4 +125,12 @@ public interface GbStreamMapper {
"</foreach>" +
"</script>"})
int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfos);
@Select("<script> "+
"SELECT * FROM gb_stream where " +
"<foreach collection='streamPushItems' item='item' separator='or'>" +
"(app=#{item.app} and stream=#{item.stream}) " +
"</foreach>" +
"</script>")
List<GbStream> selectAllForAppAndStream(List<StreamPushItem> streamPushItems);
}

View File

@@ -21,7 +21,6 @@ public interface LogMapper {
"values ('${name}', '${type}', '${uri}', '${address}', '${result}', ${timing}, '${username}', '${createTime}')")
int add(LogDto logDto);
@Select(value = {"<script>" +
" SELECT * FROM log " +
" WHERE 1=1 " +
@@ -33,7 +32,6 @@ public interface LogMapper {
" </script>"})
List<LogDto> query(String query, String type, String startTime, String endTime);
@Delete("DELETE FROM log")
int clear();
}

View File

@@ -54,7 +54,11 @@ public interface ParentPlatformMapper {
" +\n" +
" (SELECT count(0)\n" +
" FROM platform_gb_stream pgs\n" +
" WHERE pgs.platformId = pp.serverGBId)) as channelCount\n" +
" WHERE pgs.platformId = pp.serverGBId)\n" +
" +\n" +
" (SELECT count(0)\n" +
" FROM platform_catalog pgc\n" +
" WHERE pgc.platformId = pp.serverGBId)) as channelCount\n" +
"FROM parent_platform pp ")
List<ParentPlatform> getParentPlatformList();

View File

@@ -34,7 +34,6 @@ public interface PlatformChannelMapper {
"</script>")
int addChannels(String platformId, List<ChannelReduce> channelReducesToAdd);
@Delete("<script> "+
"DELETE FROM platform_gb_channel WHERE platformId='${platformId}' AND deviceAndChannelId in" +
"<foreach collection='channelReducesToDel' item='item' open='(' separator=',' close=')' > '${item.deviceId}_${item.channelId}'</foreach>" +
@@ -51,12 +50,10 @@ public interface PlatformChannelMapper {
"</script>")
int cleanChannelForGB(String platformId);
@Select("SELECT * FROM device_channel WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE " +
"platformId='${platformId}' AND channelId='${channelId}' ) AND channelId='${channelId}'")
DeviceChannel queryChannelInParentPlatform(String platformId, String channelId);
@Select("select dc.channelId as id, dc.name as name, pgc.platformId as platformId, pgc.catalogId as parentId, 0 as childrenCount, 1 as type " +
"from device_channel dc left join platform_gb_channel pgc on dc.deviceId = pgc.deviceId and dc.channelId = pgc.channelId " +
"where pgc.platformId=#{platformId} and pgc.catalogId=#{catalogId}")

View File

@@ -16,22 +16,22 @@ import java.util.List;
@Repository
public interface PlatformGbStreamMapper {
@Insert("REPLACE INTO platform_gb_stream (app, stream, platformId, catalogId) VALUES" +
"('${app}', '${stream}', '${platformId}', '${catalogId}')")
@Insert("REPLACE INTO platform_gb_stream (gbStreamId, platformId, catalogId) VALUES" +
"( #{id}, #{platformId}, #{catalogId})")
int add(PlatformGbStream platformGbStream);
@Insert("<script> " +
"REPLACE into platform_gb_stream " +
"(app, stream, platformId, catalogId) " +
"(gbStreamId, platformId, catalogId) " +
"values " +
"<foreach collection='streamPushItems' index='index' item='item' separator=','> " +
"('${item.app}', '${item.stream}', '${platformId}', '${catalogId}')" +
"(#{item.id}, #{platformId}, #{catalogId})" +
"</foreach> " +
"</script>")
int batchAdd(String platformId, String catalogId, List<StreamPushItem> streamPushItems);
int batchAdd(String platformId, String catalogId, List<GbStream> streamPushItems);
@Delete("DELETE FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}")
@Delete("DELETE FROM platform_gb_stream WHERE gbStreamId = (select id from gb_stream where app=#{app} AND stream=#{stream})")
int delByAppAndStream(String app, String stream);
@Delete("DELETE FROM platform_gb_stream WHERE platformId=#{platformId}")
@@ -42,27 +42,28 @@ public interface PlatformGbStreamMapper {
"FROM " +
"platform_gb_stream pgs " +
"LEFT JOIN parent_platform pp ON pp.serverGBId = pgs.platformId " +
"LEFT JOIN gb_stream gs ON gs.id = pgs.gbStreamId " +
"WHERE " +
"pgs.app =#{app} " +
"AND pgs.stream =#{stream} ")
"gs.app =#{app} " +
"AND gs.stream =#{stream} ")
List<ParentPlatform> selectByAppAndStream(String app, String stream);
@Select("SELECT pgs.*, gs.gbId FROM platform_gb_stream pgs " +
"LEFT JOIN gb_stream gs ON pgs.app = gs.app AND pgs.stream = gs.stream " +
"WHERE pgs.app=#{app} AND pgs.stream=#{stream} AND pgs.platformId=#{serverGBId}")
"LEFT JOIN gb_stream gs ON pgs.gbStreamId = gs.id " +
"WHERE gs.app=#{app} AND gs.stream=#{stream} AND pgs.platformId=#{serverGBId}")
StreamProxyItem selectOne(String app, String stream, String serverGBId);
@Select("select gs.* \n" +
"from gb_stream gs\n" +
" left join platform_gb_stream pgs\n" +
" on gs.app = pgs.app and gs.stream = pgs.stream\n" +
" on gs.id = pgs.gbStreamId\n" +
"where pgs.platformId=#{platformId} and pgs.catalogId=#{catalogId}")
List<GbStream> queryChannelInParentPlatformAndCatalog(String platformId, String catalogId);
@Select("select gs.gbId as id, gs.name as name, pgs.platformId as platformId, pgs.catalogId as catalogId , 0 as childrenCount, 2 as type\n" +
"from gb_stream gs\n" +
" left join platform_gb_stream pgs\n" +
" on gs.app = pgs.app and gs.stream = pgs.stream\n" +
" on gs.id = pgs.gbStreamId\n" +
"where pgs.platformId=#{platformId} and pgs.catalogId=#{catalogId}")
List<PlatformCatalog> queryChannelInParentPlatformAndCatalogForCatalog(String platformId, String catalogId);
@@ -76,29 +77,31 @@ public interface PlatformGbStreamMapper {
"parent_platform pp " +
"left join platform_gb_stream pgs on " +
"pp.serverGBId = pgs.platformId " +
"left join gb_stream gs " +
"gs.id = pgs.gbStreamId " +
"WHERE " +
"pgs.app = #{app} " +
"AND pgs.stream = #{stream}" +
"gs.app = #{app} " +
"AND gs.stream = #{stream}" +
"AND pp.serverGBId IN" +
"<foreach collection='platforms' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
"</script> ")
List<ParentPlatform> queryPlatFormListForGBWithGBId(String app, String stream, List<String> platforms);
@Delete("DELETE FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream} AND platformId=#{platformId}")
@Delete("DELETE FROM platform_gb_stream WHERE gbStreamId = (select id from gb_stream where app=#{app} AND stream=#{stream}) AND platformId=#{platformId}")
int delByAppAndStreamAndPlatform(String app, String stream, String platformId);
@Delete("<script> "+
"DELETE FROM platform_gb_stream where " +
"<foreach collection='gbStreams' item='item' separator='or'>" +
"(app=#{item.app} and stream=#{item.stream}) " +
"DELETE FROM platform_gb_stream where gbStreamId in " +
"<foreach collection='gbStreams' item='item' open='(' separator=',' close=')' >" +
"#{item.id}" +
"</foreach>" +
"</script>")
void delByGbStreams(List<GbStream> gbStreams);
@Delete("<script> "+
"DELETE FROM platform_gb_stream where " +
"<foreach collection='gbStreams' item='item' separator='or'>" +
"(app=#{item.app} and stream=#{item.stream}) and platformId=#{platformId}" +
"DELETE FROM platform_gb_stream where platformId=#{platformId} and gbStreamId in " +
"<foreach collection='gbStreams' item='item' open='(' separator=',' close=')'>" +
"#{item.id} " +
"</foreach>" +
"</script>")
void delByAppAndStreamsByPlatformId(List<GbStream> gbStreams, String platformId);

View File

@@ -61,7 +61,7 @@ public interface StreamPushMapper {
@Select(value = {" <script>" +
"SELECT " +
"st.*, " +
"pgs.gbId, pgs.status, pgs.name, pgs.longitude, pgs.latitude " +
"pgs.gbId, pgs.status, pgs.name, pgs.longitude, pgs.latitude , pgs.id " +
"from " +
"stream_push st " +
"LEFT JOIN gb_stream pgs " +

View File

@@ -705,9 +705,18 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
streamProxyItem.setCreateTime(now);
streamProxyItem.setCreateStamp(System.currentTimeMillis());
try {
if (gbStreamMapper.add(streamProxyItem)<0 || streamProxyMapper.add(streamProxyItem) < 0) {
if (streamProxyMapper.add(streamProxyItem) > 0) {
if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.add(streamProxyItem) > 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
}else {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
result = true;
dataSourceTransactionManager.commit(transactionStatus); //手动提交
@@ -731,10 +740,20 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
boolean result = false;
streamProxyItem.setStreamType("proxy");
try {
if (gbStreamMapper.update(streamProxyItem)<0 || streamProxyMapper.update(streamProxyItem) < 0) {
if (streamProxyMapper.update(streamProxyItem) > 0) {
if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.update(streamProxyItem) > 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
}else {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交
result = true;
}catch (Exception e) {

View File

@@ -67,6 +67,8 @@ public class GbStreamController {
mediaServerId = null;
}
// catalogId 为null 查询未在平台下分配的数据
// catalogId 不为null 查询平台下这个,目录下的通道
return gbStreamService.getAll(page, count, platformId, catalogId, query, pushing, mediaServerId);
}

View File

@@ -71,6 +71,7 @@ public class StreamProxyController {
logger.info("添加代理: " + JSONObject.toJSONString(param));
if (StringUtils.isEmpty(param.getMediaServerId())) param.setMediaServerId("auto");
if (StringUtils.isEmpty(param.getType())) param.setType("default");
if (StringUtils.isEmpty(param.getGbId())) param.setGbId(null);
WVPResult<StreamInfo> result = streamProxyService.save(param);
return result;
}