From b7a2b6816b52c8e24526da3359ece05524686024 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Thu, 10 Jul 2025 18:48:25 +0800 Subject: [PATCH] =?UTF-8?q?[1078]=20=E5=AE=8C=E5=96=84=E5=BD=95=E5=83=8F?= =?UTF-8?q?=E4=B8=8B=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/ftpServer/FileCallback.java | 8 + .../conf/ftpServer/FtpFileSystemFactory.java | 33 ++++ .../vmp/conf/ftpServer/FtpFileSystemView.java | 63 +++++++ .../vmp/conf/ftpServer/FtpServerConfig.java | 8 +- .../iot/vmp/conf/ftpServer/Ftplet.java | 2 + .../vmp/conf/ftpServer/VirtualFtpFile.java | 167 ++++++++++++++++++ .../vmp/conf/security/WebSecurityConfig.java | 1 + .../vmp/gb28181/bean/SipSendFailEvent.java | 19 ++ .../jt1078/bean/JTRecordDownloadCatch.java | 41 +++++ .../jt1078/controller/JT1078Controller.java | 124 +++++++++---- .../vmp/jt1078/service/Ijt1078Service.java | 8 +- .../service/impl/jt1078PlayServiceImpl.java | 4 - .../service/impl/jt1078ServiceImpl.java | 89 +++++++--- .../vmp/jt1078/session/DownloadManager.java | 108 +++++++++++ .../iot/vmp/jt1078/session/Session.java | 4 +- .../vmp/jt1078/session/SessionManager.java | 3 - .../iot/vmp/media/abl/ABLRESTfulUtils.java | 4 - web/src/api/jtDevice.js | 16 ++ web/src/store/modules/jtDevice.js | 12 +- web/src/views/jtDevice/channel/record.vue | 84 ++++++--- 20 files changed, 691 insertions(+), 107 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FileCallback.java create mode 100644 src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpFileSystemFactory.java create mode 100644 src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpFileSystemView.java create mode 100644 src/main/java/com/genersoft/iot/vmp/conf/ftpServer/VirtualFtpFile.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipSendFailEvent.java create mode 100644 src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTRecordDownloadCatch.java create mode 100644 src/main/java/com/genersoft/iot/vmp/jt1078/session/DownloadManager.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FileCallback.java b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FileCallback.java new file mode 100644 index 000000000..aa564fcd7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FileCallback.java @@ -0,0 +1,8 @@ +package com.genersoft.iot.vmp.conf.ftpServer; + +import java.io.OutputStream; + +public interface FileCallback { + + OutputStream run(String path); +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpFileSystemFactory.java b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpFileSystemFactory.java new file mode 100644 index 000000000..5d771e9e0 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpFileSystemFactory.java @@ -0,0 +1,33 @@ +package com.genersoft.iot.vmp.conf.ftpServer; + +import org.apache.ftpserver.ftplet.FileSystemFactory; +import org.apache.ftpserver.ftplet.FileSystemView; +import org.apache.ftpserver.ftplet.FtpException; +import org.apache.ftpserver.ftplet.User; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class FtpFileSystemFactory implements FileSystemFactory { + + private final Map outputStreamMap = new ConcurrentHashMap<>(); + + @Override + public FileSystemView createFileSystemView(User user) throws FtpException { + return new FtpFileSystemView(user, path -> { + return outputStreamMap.get(path); + }); + } + + public void addOutputStream(String filePath, OutputStream outputStream) { + outputStreamMap.put(filePath, outputStream); + } + + public void removeOutputStream(String filePath) { + outputStreamMap.remove(filePath); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpFileSystemView.java b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpFileSystemView.java new file mode 100644 index 000000000..e7e8d3d57 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpFileSystemView.java @@ -0,0 +1,63 @@ +package com.genersoft.iot.vmp.conf.ftpServer; + +import org.apache.ftpserver.ftplet.FileSystemView; +import org.apache.ftpserver.ftplet.FtpException; +import org.apache.ftpserver.ftplet.FtpFile; +import org.apache.ftpserver.ftplet.User; + +import java.io.OutputStream; + +public class FtpFileSystemView implements FileSystemView { + + private User user; + + private FileCallback fileCallback; + + public FtpFileSystemView(User user, FileCallback fileCallback) { + this.user = user; + this.fileCallback = fileCallback; + } + + public static String HOME_PATH = "root"; + + public FtpFile workDir = VirtualFtpFile.getDir(HOME_PATH); + + @Override + public FtpFile getHomeDirectory() throws FtpException { + return VirtualFtpFile.getDir(HOME_PATH); + } + + @Override + public FtpFile getWorkingDirectory() throws FtpException { + return workDir; + } + + @Override + public boolean changeWorkingDirectory(String dir) throws FtpException { + workDir = VirtualFtpFile.getDir(dir); + return true; + } + + @Override + public FtpFile getFile(String file) throws FtpException { + VirtualFtpFile ftpFile = VirtualFtpFile.getFile(file); + if (fileCallback != null) { + OutputStream outputStream = fileCallback.run(workDir.getName()); + if (outputStream != null) { + ftpFile.setOutputStream(outputStream); + } + } + return ftpFile; + } + + @Override + public boolean isRandomAccessible() throws FtpException { + return true; + } + + @Override + public void dispose() { + } + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpServerConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpServerConfig.java index b2f9c0e13..d04016305 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/FtpServerConfig.java @@ -22,7 +22,10 @@ public class FtpServerConfig { private UserManager userManager; @Autowired - private Ftplet ftpPlet; + private FtpFileSystemFactory fileSystemFactory; + + @Autowired + private Ftplet ftplet; @Autowired private FtpSetting ftpSetting; @@ -45,11 +48,12 @@ public class FtpServerConfig { serverFactory.addListener("default", listener); // 5、配置自定义用户事件 Map ftpLets = new HashMap<>(); - ftpLets.put("ftpService", ftpPlet); + ftpLets.put("ftpService", ftplet); serverFactory.setFtplets(ftpLets); // 6、读取用户的配置信息 // 6.2、设置用信息 serverFactory.setUserManager(userManager); + serverFactory.setFileSystem(fileSystemFactory); // 7、实例化FTP Server FtpServer server = serverFactory.createServer(); try { diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/Ftplet.java b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/Ftplet.java index cf7c990ac..7b764c2cd 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/Ftplet.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/Ftplet.java @@ -42,6 +42,8 @@ public class Ftplet extends DefaultFtplet { return super.onUploadUniqueEnd(session, request); } + + @Override public FtpletResult onAppendEnd(FtpSession session, FtpRequest request) throws FtpException, IOException { FtpFile file = session.getFileSystemView().getFile(request.getArgument()); diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/VirtualFtpFile.java b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/VirtualFtpFile.java new file mode 100644 index 000000000..a30fbddd5 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/VirtualFtpFile.java @@ -0,0 +1,167 @@ +package com.genersoft.iot.vmp.conf.ftpServer; + +import lombok.Setter; +import org.apache.ftpserver.ftplet.FtpFile; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; + +public class VirtualFtpFile implements FtpFile { + + @Setter + private String name; + + @Setter + private boolean hidden = false; + + @Setter + private boolean directory = false; + + @Setter + private String ownerName; + + private Long lastModified = null; + + @Setter + private long size = 0; + + @Setter + private OutputStream outputStream; + + public static VirtualFtpFile getFile(String name) { + VirtualFtpFile virtualFtpFile = new VirtualFtpFile(); + virtualFtpFile.setName(name); + return virtualFtpFile; + } + + public static VirtualFtpFile getDir(String name) { + if (name.endsWith("/")) { + name = name.replaceAll("/", ""); + } + VirtualFtpFile virtualFtpFile = new VirtualFtpFile(); + virtualFtpFile.setName(name); + virtualFtpFile.setDirectory(true); + return virtualFtpFile; + } + + @Override + public String getAbsolutePath() { + return FtpFileSystemView.HOME_PATH + "/" + name; + } + + @Override + public String getName() { + return name; + } + + @Override + public boolean isHidden() { + return hidden; + } + + @Override + public boolean isDirectory() { + return directory; + } + + @Override + public boolean isFile() { + return !directory; + } + + @Override + public boolean doesExist() { + return false; + } + + @Override + public boolean isReadable() { + return true; + } + + @Override + public boolean isWritable() { + return true; + } + + @Override + public boolean isRemovable() { + return true; + } + + @Override + public String getOwnerName() { + return ownerName; + } + + @Override + public String getGroupName() { + return "root"; + } + + @Override + public int getLinkCount() { + return 0; + } + + @Override + public long getLastModified() { + if (lastModified == null) { + lastModified = System.currentTimeMillis(); + } + return lastModified; + } + + @Override + public boolean setLastModified(long time) { + lastModified = time; + return true; + } + + @Override + public long getSize() { + return size; + } + + @Override + public Object getPhysicalFile() { + System.err.println("getPhysicalFile"); + return null; + } + + @Override + public boolean mkdir() { + return true; + } + + @Override + public boolean delete() { + return true; + } + + @Override + public boolean move(FtpFile destination) { + this.name = destination.getName(); + return true; + } + + @Override + public List listFiles() { + return Collections.emptyList(); + } + + @Override + public OutputStream createOutputStream(long offset) throws IOException { + System.out.println("createOutputStream++++"); + return outputStream; + } + + @Override + public InputStream createInputStream(long offset) throws IOException { + System.out.println("createInputStream----"); + return null; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java index 40809beeb..e146a396d 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java @@ -100,6 +100,7 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter { defaultExcludes.add("/index/hook/**"); defaultExcludes.add("/api/device/query/snap/**"); defaultExcludes.add("/index/hook/abl/**"); + defaultExcludes.add("/api/jt1078/playback/download"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipSendFailEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipSendFailEvent.java new file mode 100644 index 000000000..2e2c54e5c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipSendFailEvent.java @@ -0,0 +1,19 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import lombok.Data; + +@Data +public class SipSendFailEvent extends SipSubscribe.EventResult { + + private String callId; + + private String msg; + + public static SipSendFailEvent getInstance(String callId, String msg){ + SipSendFailEvent sipSendFailEvent = new SipSendFailEvent(); + sipSendFailEvent.setMsg(msg); + sipSendFailEvent.setCallId(callId); + return sipSendFailEvent; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTRecordDownloadCatch.java b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTRecordDownloadCatch.java new file mode 100644 index 000000000..f81de5361 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTRecordDownloadCatch.java @@ -0,0 +1,41 @@ +package com.genersoft.iot.vmp.jt1078.bean; + +import com.genersoft.iot.vmp.jt1078.proc.response.J9206; +import lombok.Getter; +import lombok.Setter; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +public class JTRecordDownloadCatch implements Delayed { + + @Getter + @Setter + private String phoneNumber; + + @Getter + @Setter + private String path; + + @Getter + @Setter + private J9206 j9206; + + /** + * 超时时间(单位: 毫秒) + */ + @Getter + @Setter + private long delayTime; + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java index b42610819..44def5ffd 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java @@ -337,7 +337,67 @@ public class JT1078Controller { jt1078PlayService.stopPlayback(phoneNumber, channelId); } - @Operation(summary = "1078-录像-下载", security = @SecurityRequirement(name = JwtUtils.HEADER)) +// @Operation(summary = "1078-录像-下载", security = @SecurityRequirement(name = JwtUtils.HEADER)) +// @Parameter(name = "phoneNumber", description = "设备手机号", required = true) +// @Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true) +// @Parameter(name = "startTime", description = "开始时间,格式: yyyy-MM-dd HH:mm:ss", required = true) +// @Parameter(name = "endTime", description = "结束时间,格式: yyyy-MM-dd HH:mm:ss", required = true) +// @Parameter(name = "alarmSign", description = "报警标志", required = true) +// @Parameter(name = "mediaType", description = "音视频资源类型: 0.音视频 1.音频 2.视频 3.视频或音视频", required = true) +// @Parameter(name = "streamType", description = "码流类型:0.所有码流 1.主码流 2.子码流(如果此通道只传输音频,此字段置0)", required = true) +// @Parameter(name = "storageType", description = "存储器类型", required = true) +// @GetMapping("/playback/download") +// public DeferredResult recordDownload(HttpServletRequest request, +// HttpServletResponse response, +// @Parameter(required = true) String phoneNumber, +// @Parameter(required = true) Integer channelId, +// @Parameter(required = true) String startTime, +// @Parameter(required = true) String endTime, +// @Parameter(required = false) Integer alarmSign, +// @Parameter(required = false) Integer mediaType, +// @Parameter(required = false) Integer streamType, +// @Parameter(required = false) Integer storageType +// +// ) throws IOException { +// log.info("[JT-录像] 下载,设备:{}, 通道: {}, 开始时间: {}, 结束时间: {},报警标志: {}, 音视频类型: {}, 码流类型: {},存储器类型: {}, ", +// phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType); +// if (!ftpSetting.getEnable()) { +// throw new ControllerException(ErrorCode.ERROR100.getCode(), "未启用ftp服务,无法下载录像"); +// } +// DeferredResult result = new DeferredResult<>(600000L); +// ServletOutputStream outputStream = response.getOutputStream(); +// response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE); +// response.addHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(phoneNumber + "_" + channelId + ".mp4", "UTF-8")); +// response.setStatus(HttpServletResponse.SC_OK); +// +// service.recordDownload(phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType, outputStream, wvpResult -> { +// String filePath = "ftp" + wvpResult.getData(); +// File file = new File(filePath); +// if (!file.exists()) { +// log.warn("[下载录像] 收到通知时未找到录像文件: {}", filePath); +// return; +// } +// try { +// final InputStream in = Files.newInputStream(file.toPath()); +// IOUtils.copy(in, outputStream); +// outputStream.flush(); +// in.close(); +// } catch (IOException e) { +// log.warn("[下载录像] 读取文件异常: {}", filePath, e); +// return; +// } finally { +// try { +// outputStream.close(); +// result.setResult(null); +// } catch (IOException ignored) { +// } +// } +// }); +// return result; +// } + + + @Operation(summary = "1078-录像-获取下载地址", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "phoneNumber", description = "设备手机号", required = true) @Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true) @Parameter(name = "startTime", description = "开始时间,格式: yyyy-MM-dd HH:mm:ss", required = true) @@ -346,53 +406,39 @@ public class JT1078Controller { @Parameter(name = "mediaType", description = "音视频资源类型: 0.音视频 1.音频 2.视频 3.视频或音视频", required = true) @Parameter(name = "streamType", description = "码流类型:0.所有码流 1.主码流 2.子码流(如果此通道只传输音频,此字段置0)", required = true) @Parameter(name = "storageType", description = "存储器类型", required = true) - @GetMapping("/playback/download") - public DeferredResult recordDownload(HttpServletRequest request, - HttpServletResponse response, - @Parameter(required = true) String phoneNumber, - @Parameter(required = true) Integer channelId, - @Parameter(required = true) String startTime, - @Parameter(required = true) String endTime, - @Parameter(required = false) Integer alarmSign, - @Parameter(required = false) Integer mediaType, - @Parameter(required = false) Integer streamType, - @Parameter(required = false) Integer storageType + @GetMapping("/playback/downloadUrl") + public String getRecordTempUrl(HttpServletRequest request, + @Parameter(required = true) String phoneNumber, + @Parameter(required = true) Integer channelId, + @Parameter(required = true) String startTime, + @Parameter(required = true) String endTime, + @Parameter(required = false) Integer alarmSign, + @Parameter(required = false) Integer mediaType, + @Parameter(required = false) Integer streamType, + @Parameter(required = false) Integer storageType - ) throws IOException { + ){ log.info("[JT-录像] 下载,设备:{}, 通道: {}, 开始时间: {}, 结束时间: {},报警标志: {}, 音视频类型: {}, 码流类型: {},存储器类型: {}, ", phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType); if (!ftpSetting.getEnable()) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "未启用ftp服务,无法下载录像"); } - DeferredResult result = new DeferredResult<>(600000L); + return service.getRecordTempUrl(phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType); + } + + @Operation(summary = "1078-录像-下载", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "path", description = "临时下载路径", required = true) + @GetMapping("/playback/download") + public void download(HttpServletRequest request, HttpServletResponse response, @Parameter(required = true) String path) throws IOException { + if (!ftpSetting.getEnable()) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未启用ftp服务,无法下载录像"); + } + DeferredResult result = new DeferredResult<>(); ServletOutputStream outputStream = response.getOutputStream(); response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE); - response.addHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(phoneNumber + "_" + channelId + ".mp4", "UTF-8")); + response.addHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(path + ".mp4", "UTF-8")); response.setStatus(HttpServletResponse.SC_OK); - service.recordDownload(phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType, wvpResult -> { - String filePath = "ftp" + wvpResult.getData(); - File file = new File(filePath); - if (!file.exists()) { - log.warn("[下载录像] 收到通知时未找到录像文件: {}", filePath); - return; - } - try { - final InputStream in = Files.newInputStream(file.toPath()); - IOUtils.copy(in, outputStream); - outputStream.flush(); - in.close(); - } catch (IOException e) { - log.warn("[下载录像] 读取文件异常: {}", filePath, e); - return; - } finally { - try { - outputStream.close(); - result.setResult(null); - } catch (IOException ignored) { - } - } - }); - return result; + service.recordDownload(path, outputStream); } @Operation(summary = "1078-云台控制", security = @SecurityRequirement(name = JwtUtils.HEADER)) diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java index dd217ed98..1323925c0 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java @@ -7,6 +7,8 @@ import com.genersoft.iot.vmp.jt1078.proc.request.J1205; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; +import javax.servlet.ServletOutputStream; +import java.io.OutputStream; import java.util.List; public interface Ijt1078Service { @@ -101,7 +103,7 @@ public interface Ijt1078Service { void changeStreamType(String phoneNumber, Integer channelId, Integer streamType); - void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, CommonCallback> fileCallback); + void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, OutputStream outputStream, CommonCallback> fileCallback); PageInfo getChannelList(int page, int count, int deviceId, String query); @@ -116,4 +118,8 @@ public interface Ijt1078Service { void updateDevicePosition(String phoneNumber, Double longitude, Double latitude); JTChannel getChannelByDbId(Integer id); + + String getRecordTempUrl(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType); + + void recordDownload(String filePath, ServletOutputStream outputStream); } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java index dc1de0234..cebdc1815 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java @@ -444,7 +444,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { @Override public void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time) { - long l = System.currentTimeMillis(); String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId; dynamicTask.stop(playKey); if (command == 2) { @@ -471,8 +470,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { log.info("[JT-回放控制] phoneNumber: {}, channelId: {}, command: {}, playbackSpeed: {}, time: {}", phoneNumber, channelId, command, playbackSpeed, time); } - System.out.println("清理回调 " + (System.currentTimeMillis() - l)); - l = System.currentTimeMillis(); // 发送停止命令 J9202 j9202 = new J9202(); j9202.setChannel(channelId); @@ -486,7 +483,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { j9202.setPlaybackTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(time)); } jt1078Template.controlBackLive(phoneNumber, j9202, 4); - System.out.println("发送指令 " + (System.currentTimeMillis() - l)); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index a6847dc71..56ce86d7a 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -4,10 +4,14 @@ import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.ftpServer.FtpFileSystemFactory; +import com.genersoft.iot.vmp.conf.ftpServer.FtpFileSystemView; import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; +import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTask; import com.genersoft.iot.vmp.jt1078.bean.*; import com.genersoft.iot.vmp.jt1078.bean.common.ConfigAttribute; import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; @@ -16,6 +20,7 @@ import com.genersoft.iot.vmp.jt1078.dao.JTTerminalMapper; import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent; import com.genersoft.iot.vmp.jt1078.proc.response.*; import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; +import com.genersoft.iot.vmp.jt1078.session.DownloadManager; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.utils.DateUtil; @@ -32,10 +37,16 @@ import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.Assert; +import org.springframework.web.context.request.async.DeferredResult; +import javax.servlet.ServletOutputStream; +import java.io.OutputStream; import java.lang.reflect.Field; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; @Service @Slf4j @@ -65,6 +76,12 @@ public class jt1078ServiceImpl implements Ijt1078Service { @Autowired private FtpSetting ftpSetting; + @Autowired + private FtpFileSystemFactory fileSystemFactory; + + @Autowired + private DownloadManager downloadManager; + /** * 流到来的处理 */ @@ -148,31 +165,14 @@ public class jt1078ServiceImpl implements Ijt1078Service { jtDeviceMapper.updateDeviceStatus(connected, phoneNumber); } - private Map>> fileUploadMap = new ConcurrentHashMap<>(); - - @EventListener - public void onApplicationEvent(FtpUploadEvent event) { - if (fileUploadMap.isEmpty()) { - return; - } - fileUploadMap.keySet().forEach(key -> { - if (!event.getFileName().contains(key)) { - return; - } - CommonCallback> callback = fileUploadMap.get(key); - if (callback != null) { - callback.run(new WVPResult<>(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), event.getFileName())); - fileUploadMap.remove(key); - } - }); - } @Override - public void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, CommonCallback> fileCallback) { + public void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, + Integer mediaType, Integer streamType, Integer storageType, OutputStream outputStream, CommonCallback> fileCallback) { String filePath = UUID.randomUUID().toString(); - fileUploadMap.put(filePath, fileCallback); + fileSystemFactory.addOutputStream(filePath, outputStream); dynamicTask.startDelay(filePath, ()->{ - fileUploadMap.remove(filePath); + fileSystemFactory.removeOutputStream(filePath); }, 2*60*60*1000); log.info("[JT-录像] 下载,设备:{}, 通道: {}, 开始时间: {}, 结束时间: {},等待上传文件路径: {} ", phoneNumber, channelId, startTime, endTime, filePath); @@ -685,4 +685,51 @@ public class jt1078ServiceImpl implements Ijt1078Service { public JTChannel getChannelByDbId(Integer id) { return jtChannelMapper.selectChannelById(id); } + + + + @Override + public String getRecordTempUrl(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType) { + String filePath = UUID.randomUUID().toString(); + + log.info("[JT-录像] 下载,设备:{}, 通道: {}, 开始时间: {}, 结束时间: {},等待上传文件路径: {} ", + phoneNumber, channelId, startTime, endTime, filePath); + // 发送停止命令 + J9206 j9206 = new J9206(); + j9206.setChannelId(channelId); + j9206.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime)); + j9206.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime)); + j9206.setServerIp(ftpSetting.getIp()); + j9206.setPort(ftpSetting.getPort()); + j9206.setUsername(ftpSetting.getUsername()); + j9206.setPassword(ftpSetting.getPassword()); + j9206.setPath(filePath); + + if (mediaType != null) { + j9206.setMediaType(mediaType); + } + if (streamType != null) { + j9206.setStreamType(streamType); + } + if (storageType != null) { + j9206.setStorageType(storageType); + } + if (alarmSign != null) { + j9206.setAlarmSign(alarmSign); + } + downloadManager.addCatch(filePath, phoneNumber, j9206); + return filePath; + } + + + @Override + public void recordDownload(String filePath, ServletOutputStream outputStream) { + JTRecordDownloadCatch downloadCatch = downloadManager.getCatch(filePath); + Assert.notNull(downloadCatch, "地址不存在"); + fileSystemFactory.addOutputStream(filePath, outputStream); + jt1078Template.fileUpload(downloadCatch.getPhoneNumber(), downloadCatch.getJ9206(), 7200); + downloadManager.runDownload(filePath, 2 * 60 * 60); + fileSystemFactory.removeOutputStream(filePath); + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/session/DownloadManager.java b/src/main/java/com/genersoft/iot/vmp/jt1078/session/DownloadManager.java new file mode 100644 index 000000000..25d91bbe4 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/session/DownloadManager.java @@ -0,0 +1,108 @@ +package com.genersoft.iot.vmp.jt1078.session; + +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.jt1078.bean.JTRecordDownloadCatch; +import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent; +import com.genersoft.iot.vmp.jt1078.proc.response.J9206; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +public class DownloadManager { + + private final Map downloadCatchMap = new ConcurrentHashMap<>(); + private final DelayQueue downloadCatchQueue = new DelayQueue<>(); + + private final Map> topicSubscribers = new ConcurrentHashMap<>(); + + // 下载过期检查 + @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS) + public void downloadCatchCheck(){ + while (!downloadCatchQueue.isEmpty()) { + try { + JTRecordDownloadCatch take = downloadCatchQueue.take(); + downloadCatchMap.remove(take.getPath()); + } catch (InterruptedException e) { + log.error("[下载过期] ", e); + } + } + } + + public void addCatch(String path, String phoneNumber, J9206 j9206) { + JTRecordDownloadCatch downloadCatch = new JTRecordDownloadCatch(); + downloadCatch.setPhoneNumber(phoneNumber); + downloadCatch.setPath(path); + downloadCatch.setJ9206(j9206); + + // 10分钟临时地址无法访问则删除 + downloadCatch.setDelayTime(System.currentTimeMillis() + 10 * 60 * 1000L); + + downloadCatchMap.put(path, downloadCatch); + downloadCatchQueue.add(downloadCatch); + } + + public JTRecordDownloadCatch getCatch(String path) { + return downloadCatchMap.get(path); + } + + @EventListener + public void onApplicationEvent(FtpUploadEvent event) { + if (topicSubscribers.isEmpty()) { + return; + } + topicSubscribers.keySet().forEach(key -> { + if (!event.getFileName().contains(key)) { + return; + } + SynchronousQueue synchronousQueue = topicSubscribers.get(key); + if (synchronousQueue != null) { + synchronousQueue.offer(null); + } + }); + } + + + public Object runDownload(String path, long timeOut) { + SynchronousQueue subscribe = subscribe(path); + if (subscribe == null) { + log.error("[JT-下载] 暂停进程失败"); + return null; + } + try { + return subscribe.poll(timeOut, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.warn("[JT-下载] 暂停进程超时", e); + } finally { + this.unsubscribe(path); + JTRecordDownloadCatch downloadCatch = getCatch(path); + if (downloadCatch != null) { + downloadCatchMap.remove(path); + downloadCatchQueue.remove(downloadCatch); + } + } + return null; + } + + private SynchronousQueue subscribe(String key) { + SynchronousQueue queue = null; + if (!topicSubscribers.containsKey(key)) + topicSubscribers.put(key, queue = new SynchronousQueue<>()); + return queue; + } + + private void unsubscribe(String key) { + topicSubscribers.remove(key); + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java b/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java index 9faea3b74..a0558cc1f 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java @@ -45,9 +45,7 @@ public class Session { public void writeObject(Object message) { log.info("<<<<<<<<<< cmd{},{}", this, message); - System.out.println(message); - ChannelFuture channelFuture = channel.writeAndFlush(message); - System.out.println(222); + channel.writeAndFlush(message); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java b/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java index 3c1bd7390..555ea4379 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java @@ -58,7 +58,6 @@ public enum SessionManager { */ protected void put(Object clientId, Session newSession) { sessionMap.put(clientId, newSession); - System.out.println(sessionMap.size()); } @@ -79,7 +78,6 @@ public enum SessionManager { return null; } String requestKey = requestKey(cmd.getPhoneNumber(), cmd.getRespId(), cmd.getPackageNo()); - System.out.println("requestKey==" + requestKey); SynchronousQueue subscribe = subscribe(requestKey); if (subscribe == null) { log.error("DevId: {} key:{} send repaid", cmd.getPhoneNumber(), requestKey); @@ -103,7 +101,6 @@ public enum SessionManager { if (responseNo == null) { for (String key : topicSubscribers.keySet()) { if (key.startsWith(requestKey)) { - System.out.println(key); SynchronousQueue queue = topicSubscribers.get(key); if (queue != null) { result = true; diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLRESTfulUtils.java index 5e720c1d9..b23c1adf6 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLRESTfulUtils.java @@ -106,8 +106,6 @@ public class ABLRESTfulUtils { responseJSON = JSON.parseObject(responseStr); } }else { - System.out.println( 2222); - System.out.println( response.code()); response.close(); Objects.requireNonNull(response.body()).close(); } @@ -388,8 +386,6 @@ public class ABLRESTfulUtils { param.put("stream", stream); param.put("starttime", startTime); param.put("endtime", endTime); - System.out.println("starttime: "+ startTime); - System.out.println("endtime: "+ endTime); return sendPost(mediaServer,"queryRecordList", param, null); } diff --git a/web/src/api/jtDevice.js b/web/src/api/jtDevice.js index 041d7b6df..7f9874217 100644 --- a/web/src/api/jtDevice.js +++ b/web/src/api/jtDevice.js @@ -169,6 +169,22 @@ export function startPlayback(params) { } }) } +export function getRecordTempUrl({ phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType }) { + return request({ + method: 'get', + url: '/api/jt1078/playback/downloadUrl', + params: { + phoneNumber: phoneNumber, + channelId: channelId, + startTime: startTime, + endTime: endTime, + alarmSign: alarmSign, + mediaType: mediaType, + streamType: streamType, + storageType: storageType + } + }) +} export function controlPlayback(params) { const { phoneNumber, channelId, command, playbackSpeed, time } = params return request({ diff --git a/web/src/store/modules/jtDevice.js b/web/src/store/modules/jtDevice.js index 125cab604..e75dd766d 100644 --- a/web/src/store/modules/jtDevice.js +++ b/web/src/store/modules/jtDevice.js @@ -1,7 +1,7 @@ import { add, addChannel, controlPlayback, deleteDevice, - fillLight, + fillLight, getRecordTempUrl, play, ptz, queryChannels, queryConfig, queryDeviceById, @@ -200,6 +200,16 @@ const actions = { reject(error) }) }) + }, + getRecordTempUrl({ commit }, params) { + return new Promise((resolve, reject) => { + getRecordTempUrl(params).then(response => { + const { data } = response + resolve(data) + }).catch(error => { + reject(error) + }) + }) } } diff --git a/web/src/views/jtDevice/channel/record.vue b/web/src/views/jtDevice/channel/record.vue index 98cc690b5..2fe3a2582 100755 --- a/web/src/views/jtDevice/channel/record.vue +++ b/web/src/views/jtDevice/channel/record.vue @@ -475,36 +475,62 @@ export default { spinner: 'el-icon-loading', background: 'rgba(0, 0, 0, 0.7)' }) - const baseUrl = window.baseUrl ? window.baseUrl : '' - const downloadFileUrl = ((process.env.NODE_ENV === 'development') ? process.env.VUE_APP_BASE_API : baseUrl) + - `/api/jt1078/playback/download?phoneNumber=${this.phoneNumber}&channelId=${this.channelId}&startTime=${row.startTime}&endTime=${row.endTime}` + - `&alarmSign=${row.alarmSign}&mediaType=${row.mediaType}&streamType=${row.streamType}&storageType=${row.storageType}&access-token=${this.$store.getters.token}` - const x = new XMLHttpRequest() - x.open('GET', downloadFileUrl, true) - x.responseType = 'blob' - x.onload = (e) => { - const url = window.URL.createObjectURL(x.response) - const a = document.createElement('a') - a.href = url - a.download = this.phoneNumber + '-' + this.channelId + '.mp4' - a.click() + // 获取下载地址 + this.$store.dispatch('jtDevice/getRecordTempUrl', { + phoneNumber: this.phoneNumber, + channelId: this.channelId, + startTime: row.startTime, + endTime: row.endTime, + alarmSign: row.alarmSign, + mediaType: row.mediaType, + streamType: row.streamType, + storageType: row.storageType + }).then(data => { + console.log(data) + // const a = document.createElement('a') + const baseUrl = window.baseUrl + let dev = (process.env.NODE_ENV === 'development' ? process.env.VUE_APP_BASE_API : baseUrl) + // a.href = + // a.download = data + '.mp4' + // a.click() + + window.open(`${dev}/api/jt1078/playback/download?path=${data}`) + }).finally(() => { loading.close() - } - x.ontimeout = (e) => { - loading.close() - this.$message.error({ - showClose: true, - message: '加载超时' - }) - } - x.onerror = (e) => { - loading.close() - this.$message.error({ - showClose: true, - message: e.error - }) - } - x.send() + }) + + + + // const baseUrl = window.baseUrl ? window.baseUrl : '' + // const downloadFileUrl = ((process.env.NODE_ENV === 'development') ? process.env.VUE_APP_BASE_API : baseUrl) + + // `/api/jt1078/playback/download?phoneNumber=${this.phoneNumber}&channelId=${this.channelId}&startTime=${row.startTime}&endTime=${row.endTime}` + + // `&alarmSign=${row.alarmSign}&mediaType=${row.mediaType}&streamType=${row.streamType}&storageType=${row.storageType}&access-token=${this.$store.getters.token}` + // const x = new XMLHttpRequest() + // x.open('GET', downloadFileUrl, true) + // x.responseType = 'blob' + // x.onload = (e) => { + // const url = window.URL.createObjectURL(x.response) + // const a = document.createElement('a') + // a.href = url + // a.download = this.phoneNumber + '-' + this.channelId + '.mp4' + // a.click() + // loading.close() + // } + // x.ontimeout = (e) => { + // loading.close() + // this.$message.error({ + // showClose: true, + // message: '加载超时' + // }) + // } + // x.onerror = (e) => { + // loading.close() + // this.$message.error({ + // showClose: true, + // message: e.error + // }) + // } + // x.send() }, getFileShowName(item) { return moment(item.startTime).format('HH:mm:ss') + '-' + moment(item.endTime).format('HH:mm:ss')