[1078] 完善录像下载

This commit is contained in:
lin
2025-07-10 18:48:25 +08:00
parent 1655e5903b
commit b7a2b6816b
20 changed files with 691 additions and 107 deletions

View File

@@ -0,0 +1,8 @@
package com.genersoft.iot.vmp.conf.ftpServer;
import java.io.OutputStream;
public interface FileCallback {
OutputStream run(String path);
}

View File

@@ -0,0 +1,33 @@
package com.genersoft.iot.vmp.conf.ftpServer;
import org.apache.ftpserver.ftplet.FileSystemFactory;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class FtpFileSystemFactory implements FileSystemFactory {
private final Map<String, OutputStream> outputStreamMap = new ConcurrentHashMap<>();
@Override
public FileSystemView createFileSystemView(User user) throws FtpException {
return new FtpFileSystemView(user, path -> {
return outputStreamMap.get(path);
});
}
public void addOutputStream(String filePath, OutputStream outputStream) {
outputStreamMap.put(filePath, outputStream);
}
public void removeOutputStream(String filePath) {
outputStreamMap.remove(filePath);
}
}

View File

@@ -0,0 +1,63 @@
package com.genersoft.iot.vmp.conf.ftpServer;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.User;
import java.io.OutputStream;
public class FtpFileSystemView implements FileSystemView {
private User user;
private FileCallback fileCallback;
public FtpFileSystemView(User user, FileCallback fileCallback) {
this.user = user;
this.fileCallback = fileCallback;
}
public static String HOME_PATH = "root";
public FtpFile workDir = VirtualFtpFile.getDir(HOME_PATH);
@Override
public FtpFile getHomeDirectory() throws FtpException {
return VirtualFtpFile.getDir(HOME_PATH);
}
@Override
public FtpFile getWorkingDirectory() throws FtpException {
return workDir;
}
@Override
public boolean changeWorkingDirectory(String dir) throws FtpException {
workDir = VirtualFtpFile.getDir(dir);
return true;
}
@Override
public FtpFile getFile(String file) throws FtpException {
VirtualFtpFile ftpFile = VirtualFtpFile.getFile(file);
if (fileCallback != null) {
OutputStream outputStream = fileCallback.run(workDir.getName());
if (outputStream != null) {
ftpFile.setOutputStream(outputStream);
}
}
return ftpFile;
}
@Override
public boolean isRandomAccessible() throws FtpException {
return true;
}
@Override
public void dispose() {
}
}

View File

@@ -22,7 +22,10 @@ public class FtpServerConfig {
private UserManager userManager;
@Autowired
private Ftplet ftpPlet;
private FtpFileSystemFactory fileSystemFactory;
@Autowired
private Ftplet ftplet;
@Autowired
private FtpSetting ftpSetting;
@@ -45,11 +48,12 @@ public class FtpServerConfig {
serverFactory.addListener("default", listener);
// 5、配置自定义用户事件
Map<String, org.apache.ftpserver.ftplet.Ftplet> ftpLets = new HashMap<>();
ftpLets.put("ftpService", ftpPlet);
ftpLets.put("ftpService", ftplet);
serverFactory.setFtplets(ftpLets);
// 6、读取用户的配置信息
// 6.2、设置用信息
serverFactory.setUserManager(userManager);
serverFactory.setFileSystem(fileSystemFactory);
// 7、实例化FTP Server
FtpServer server = serverFactory.createServer();
try {

View File

@@ -42,6 +42,8 @@ public class Ftplet extends DefaultFtplet {
return super.onUploadUniqueEnd(session, request);
}
@Override
public FtpletResult onAppendEnd(FtpSession session, FtpRequest request) throws FtpException, IOException {
FtpFile file = session.getFileSystemView().getFile(request.getArgument());

View File

@@ -0,0 +1,167 @@
package com.genersoft.iot.vmp.conf.ftpServer;
import lombok.Setter;
import org.apache.ftpserver.ftplet.FtpFile;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
public class VirtualFtpFile implements FtpFile {
@Setter
private String name;
@Setter
private boolean hidden = false;
@Setter
private boolean directory = false;
@Setter
private String ownerName;
private Long lastModified = null;
@Setter
private long size = 0;
@Setter
private OutputStream outputStream;
public static VirtualFtpFile getFile(String name) {
VirtualFtpFile virtualFtpFile = new VirtualFtpFile();
virtualFtpFile.setName(name);
return virtualFtpFile;
}
public static VirtualFtpFile getDir(String name) {
if (name.endsWith("/")) {
name = name.replaceAll("/", "");
}
VirtualFtpFile virtualFtpFile = new VirtualFtpFile();
virtualFtpFile.setName(name);
virtualFtpFile.setDirectory(true);
return virtualFtpFile;
}
@Override
public String getAbsolutePath() {
return FtpFileSystemView.HOME_PATH + "/" + name;
}
@Override
public String getName() {
return name;
}
@Override
public boolean isHidden() {
return hidden;
}
@Override
public boolean isDirectory() {
return directory;
}
@Override
public boolean isFile() {
return !directory;
}
@Override
public boolean doesExist() {
return false;
}
@Override
public boolean isReadable() {
return true;
}
@Override
public boolean isWritable() {
return true;
}
@Override
public boolean isRemovable() {
return true;
}
@Override
public String getOwnerName() {
return ownerName;
}
@Override
public String getGroupName() {
return "root";
}
@Override
public int getLinkCount() {
return 0;
}
@Override
public long getLastModified() {
if (lastModified == null) {
lastModified = System.currentTimeMillis();
}
return lastModified;
}
@Override
public boolean setLastModified(long time) {
lastModified = time;
return true;
}
@Override
public long getSize() {
return size;
}
@Override
public Object getPhysicalFile() {
System.err.println("getPhysicalFile");
return null;
}
@Override
public boolean mkdir() {
return true;
}
@Override
public boolean delete() {
return true;
}
@Override
public boolean move(FtpFile destination) {
this.name = destination.getName();
return true;
}
@Override
public List<? extends FtpFile> listFiles() {
return Collections.emptyList();
}
@Override
public OutputStream createOutputStream(long offset) throws IOException {
System.out.println("createOutputStream++++");
return outputStream;
}
@Override
public InputStream createInputStream(long offset) throws IOException {
System.out.println("createInputStream----");
return null;
}
}

View File

@@ -100,6 +100,7 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
defaultExcludes.add("/index/hook/**");
defaultExcludes.add("/api/device/query/snap/**");
defaultExcludes.add("/index/hook/abl/**");
defaultExcludes.add("/api/jt1078/playback/download");

View File

@@ -0,0 +1,19 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import lombok.Data;
@Data
public class SipSendFailEvent extends SipSubscribe.EventResult<String> {
private String callId;
private String msg;
public static SipSendFailEvent getInstance(String callId, String msg){
SipSendFailEvent sipSendFailEvent = new SipSendFailEvent();
sipSendFailEvent.setMsg(msg);
sipSendFailEvent.setCallId(callId);
return sipSendFailEvent;
}
}

View File

@@ -0,0 +1,41 @@
package com.genersoft.iot.vmp.jt1078.bean;
import com.genersoft.iot.vmp.jt1078.proc.response.J9206;
import lombok.Getter;
import lombok.Setter;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class JTRecordDownloadCatch implements Delayed {
@Getter
@Setter
private String phoneNumber;
@Getter
@Setter
private String path;
@Getter
@Setter
private J9206 j9206;
/**
* 超时时间(单位: 毫秒)
*/
@Getter
@Setter
private long delayTime;
@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(@NotNull Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}

View File

@@ -337,7 +337,67 @@ public class JT1078Controller {
jt1078PlayService.stopPlayback(phoneNumber, channelId);
}
@Operation(summary = "1078-录像-下载", security = @SecurityRequirement(name = JwtUtils.HEADER))
// @Operation(summary = "1078-录像-下载", security = @SecurityRequirement(name = JwtUtils.HEADER))
// @Parameter(name = "phoneNumber", description = "设备手机号", required = true)
// @Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true)
// @Parameter(name = "startTime", description = "开始时间,格式: yyyy-MM-dd HH:mm:ss", required = true)
// @Parameter(name = "endTime", description = "结束时间,格式: yyyy-MM-dd HH:mm:ss", required = true)
// @Parameter(name = "alarmSign", description = "报警标志", required = true)
// @Parameter(name = "mediaType", description = "音视频资源类型: 0.音视频 1.音频 2.视频 3.视频或音视频", required = true)
// @Parameter(name = "streamType", description = "码流类型0.所有码流 1.主码流 2.子码流(如果此通道只传输音频,此字段置0)", required = true)
// @Parameter(name = "storageType", description = "存储器类型", required = true)
// @GetMapping("/playback/download")
// public DeferredResult<Void> recordDownload(HttpServletRequest request,
// HttpServletResponse response,
// @Parameter(required = true) String phoneNumber,
// @Parameter(required = true) Integer channelId,
// @Parameter(required = true) String startTime,
// @Parameter(required = true) String endTime,
// @Parameter(required = false) Integer alarmSign,
// @Parameter(required = false) Integer mediaType,
// @Parameter(required = false) Integer streamType,
// @Parameter(required = false) Integer storageType
//
// ) throws IOException {
// log.info("[JT-录像] 下载,设备:{} 通道: {} 开始时间: {} 结束时间: {},报警标志: {}, 音视频类型: {} 码流类型: {},存储器类型: {} ",
// phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType);
// if (!ftpSetting.getEnable()) {
// throw new ControllerException(ErrorCode.ERROR100.getCode(), "未启用ftp服务无法下载录像");
// }
// DeferredResult<Void> result = new DeferredResult<>(600000L);
// ServletOutputStream outputStream = response.getOutputStream();
// response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE);
// response.addHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(phoneNumber + "_" + channelId + ".mp4", "UTF-8"));
// response.setStatus(HttpServletResponse.SC_OK);
//
// service.recordDownload(phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType, outputStream, wvpResult -> {
// String filePath = "ftp" + wvpResult.getData();
// File file = new File(filePath);
// if (!file.exists()) {
// log.warn("[下载录像] 收到通知时未找到录像文件: {}", filePath);
// return;
// }
// try {
// final InputStream in = Files.newInputStream(file.toPath());
// IOUtils.copy(in, outputStream);
// outputStream.flush();
// in.close();
// } catch (IOException e) {
// log.warn("[下载录像] 读取文件异常: {}", filePath, e);
// return;
// } finally {
// try {
// outputStream.close();
// result.setResult(null);
// } catch (IOException ignored) {
// }
// }
// });
// return result;
// }
@Operation(summary = "1078-录像-获取下载地址", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "phoneNumber", description = "设备手机号", required = true)
@Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true)
@Parameter(name = "startTime", description = "开始时间,格式: yyyy-MM-dd HH:mm:ss", required = true)
@@ -346,53 +406,39 @@ public class JT1078Controller {
@Parameter(name = "mediaType", description = "音视频资源类型: 0.音视频 1.音频 2.视频 3.视频或音视频", required = true)
@Parameter(name = "streamType", description = "码流类型0.所有码流 1.主码流 2.子码流(如果此通道只传输音频,此字段置0)", required = true)
@Parameter(name = "storageType", description = "存储器类型", required = true)
@GetMapping("/playback/download")
public DeferredResult<Void> recordDownload(HttpServletRequest request,
HttpServletResponse response,
@Parameter(required = true) String phoneNumber,
@Parameter(required = true) Integer channelId,
@Parameter(required = true) String startTime,
@Parameter(required = true) String endTime,
@Parameter(required = false) Integer alarmSign,
@Parameter(required = false) Integer mediaType,
@Parameter(required = false) Integer streamType,
@Parameter(required = false) Integer storageType
@GetMapping("/playback/downloadUrl")
public String getRecordTempUrl(HttpServletRequest request,
@Parameter(required = true) String phoneNumber,
@Parameter(required = true) Integer channelId,
@Parameter(required = true) String startTime,
@Parameter(required = true) String endTime,
@Parameter(required = false) Integer alarmSign,
@Parameter(required = false) Integer mediaType,
@Parameter(required = false) Integer streamType,
@Parameter(required = false) Integer storageType
) throws IOException {
){
log.info("[JT-录像] 下载,设备:{} 通道: {} 开始时间: {} 结束时间: {},报警标志: {}, 音视频类型: {} 码流类型: {},存储器类型: {} ",
phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType);
if (!ftpSetting.getEnable()) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未启用ftp服务无法下载录像");
}
DeferredResult<Void> result = new DeferredResult<>(600000L);
return service.getRecordTempUrl(phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType);
}
@Operation(summary = "1078-录像-下载", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "path", description = "临时下载路径", required = true)
@GetMapping("/playback/download")
public void download(HttpServletRequest request, HttpServletResponse response, @Parameter(required = true) String path) throws IOException {
if (!ftpSetting.getEnable()) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未启用ftp服务无法下载录像");
}
DeferredResult<String> result = new DeferredResult<>();
ServletOutputStream outputStream = response.getOutputStream();
response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE);
response.addHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(phoneNumber + "_" + channelId + ".mp4", "UTF-8"));
response.addHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(path + ".mp4", "UTF-8"));
response.setStatus(HttpServletResponse.SC_OK);
service.recordDownload(phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType, wvpResult -> {
String filePath = "ftp" + wvpResult.getData();
File file = new File(filePath);
if (!file.exists()) {
log.warn("[下载录像] 收到通知时未找到录像文件: {}", filePath);
return;
}
try {
final InputStream in = Files.newInputStream(file.toPath());
IOUtils.copy(in, outputStream);
outputStream.flush();
in.close();
} catch (IOException e) {
log.warn("[下载录像] 读取文件异常: {}", filePath, e);
return;
} finally {
try {
outputStream.close();
result.setResult(null);
} catch (IOException ignored) {
}
}
});
return result;
service.recordDownload(path, outputStream);
}
@Operation(summary = "1078-云台控制", security = @SecurityRequirement(name = JwtUtils.HEADER))

View File

@@ -7,6 +7,8 @@ import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import javax.servlet.ServletOutputStream;
import java.io.OutputStream;
import java.util.List;
public interface Ijt1078Service {
@@ -101,7 +103,7 @@ public interface Ijt1078Service {
void changeStreamType(String phoneNumber, Integer channelId, Integer streamType);
void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, CommonCallback<WVPResult<String>> fileCallback);
void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, OutputStream outputStream, CommonCallback<WVPResult<String>> fileCallback);
PageInfo<JTChannel> getChannelList(int page, int count, int deviceId, String query);
@@ -116,4 +118,8 @@ public interface Ijt1078Service {
void updateDevicePosition(String phoneNumber, Double longitude, Double latitude);
JTChannel getChannelByDbId(Integer id);
String getRecordTempUrl(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType);
void recordDownload(String filePath, ServletOutputStream outputStream);
}

View File

@@ -444,7 +444,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
@Override
public void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time) {
long l = System.currentTimeMillis();
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
if (command == 2) {
@@ -471,8 +470,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
log.info("[JT-回放控制] phoneNumber {} channelId {} command {} playbackSpeed {} time {}",
phoneNumber, channelId, command, playbackSpeed, time);
}
System.out.println("清理回调 " + (System.currentTimeMillis() - l));
l = System.currentTimeMillis();
// 发送停止命令
J9202 j9202 = new J9202();
j9202.setChannel(channelId);
@@ -486,7 +483,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
j9202.setPlaybackTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(time));
}
jt1078Template.controlBackLive(phoneNumber, j9202, 4);
System.out.println("发送指令 " + (System.currentTimeMillis() - l));
}
@Override

View File

@@ -4,10 +4,14 @@ import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.ftpServer.FtpFileSystemFactory;
import com.genersoft.iot.vmp.conf.ftpServer.FtpFileSystemView;
import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTask;
import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.bean.common.ConfigAttribute;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
@@ -16,6 +20,7 @@ import com.genersoft.iot.vmp.jt1078.dao.JTTerminalMapper;
import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.jt1078.session.DownloadManager;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -32,10 +37,16 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.ServletOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
@@ -65,6 +76,12 @@ public class jt1078ServiceImpl implements Ijt1078Service {
@Autowired
private FtpSetting ftpSetting;
@Autowired
private FtpFileSystemFactory fileSystemFactory;
@Autowired
private DownloadManager downloadManager;
/**
* 流到来的处理
*/
@@ -148,31 +165,14 @@ public class jt1078ServiceImpl implements Ijt1078Service {
jtDeviceMapper.updateDeviceStatus(connected, phoneNumber);
}
private Map<String, CommonCallback<WVPResult<String>>> fileUploadMap = new ConcurrentHashMap<>();
@EventListener
public void onApplicationEvent(FtpUploadEvent event) {
if (fileUploadMap.isEmpty()) {
return;
}
fileUploadMap.keySet().forEach(key -> {
if (!event.getFileName().contains(key)) {
return;
}
CommonCallback<WVPResult<String>> callback = fileUploadMap.get(key);
if (callback != null) {
callback.run(new WVPResult<>(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), event.getFileName()));
fileUploadMap.remove(key);
}
});
}
@Override
public void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, CommonCallback<WVPResult<String>> fileCallback) {
public void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign,
Integer mediaType, Integer streamType, Integer storageType, OutputStream outputStream, CommonCallback<WVPResult<String>> fileCallback) {
String filePath = UUID.randomUUID().toString();
fileUploadMap.put(filePath, fileCallback);
fileSystemFactory.addOutputStream(filePath, outputStream);
dynamicTask.startDelay(filePath, ()->{
fileUploadMap.remove(filePath);
fileSystemFactory.removeOutputStream(filePath);
}, 2*60*60*1000);
log.info("[JT-录像] 下载,设备:{} 通道: {} 开始时间: {} 结束时间: {},等待上传文件路径: {} ",
phoneNumber, channelId, startTime, endTime, filePath);
@@ -685,4 +685,51 @@ public class jt1078ServiceImpl implements Ijt1078Service {
public JTChannel getChannelByDbId(Integer id) {
return jtChannelMapper.selectChannelById(id);
}
@Override
public String getRecordTempUrl(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType) {
String filePath = UUID.randomUUID().toString();
log.info("[JT-录像] 下载,设备:{} 通道: {} 开始时间: {} 结束时间: {},等待上传文件路径: {} ",
phoneNumber, channelId, startTime, endTime, filePath);
// 发送停止命令
J9206 j9206 = new J9206();
j9206.setChannelId(channelId);
j9206.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime));
j9206.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime));
j9206.setServerIp(ftpSetting.getIp());
j9206.setPort(ftpSetting.getPort());
j9206.setUsername(ftpSetting.getUsername());
j9206.setPassword(ftpSetting.getPassword());
j9206.setPath(filePath);
if (mediaType != null) {
j9206.setMediaType(mediaType);
}
if (streamType != null) {
j9206.setStreamType(streamType);
}
if (storageType != null) {
j9206.setStorageType(storageType);
}
if (alarmSign != null) {
j9206.setAlarmSign(alarmSign);
}
downloadManager.addCatch(filePath, phoneNumber, j9206);
return filePath;
}
@Override
public void recordDownload(String filePath, ServletOutputStream outputStream) {
JTRecordDownloadCatch downloadCatch = downloadManager.getCatch(filePath);
Assert.notNull(downloadCatch, "地址不存在");
fileSystemFactory.addOutputStream(filePath, outputStream);
jt1078Template.fileUpload(downloadCatch.getPhoneNumber(), downloadCatch.getJ9206(), 7200);
downloadManager.runDownload(filePath, 2 * 60 * 60);
fileSystemFactory.removeOutputStream(filePath);
}
}

View File

@@ -0,0 +1,108 @@
package com.genersoft.iot.vmp.jt1078.session;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.jt1078.bean.JTRecordDownloadCatch;
import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent;
import com.genersoft.iot.vmp.jt1078.proc.response.J9206;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class DownloadManager {
private final Map<String, JTRecordDownloadCatch> downloadCatchMap = new ConcurrentHashMap<>();
private final DelayQueue<JTRecordDownloadCatch> downloadCatchQueue = new DelayQueue<>();
private final Map<String, SynchronousQueue<Object>> topicSubscribers = new ConcurrentHashMap<>();
// 下载过期检查
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS)
public void downloadCatchCheck(){
while (!downloadCatchQueue.isEmpty()) {
try {
JTRecordDownloadCatch take = downloadCatchQueue.take();
downloadCatchMap.remove(take.getPath());
} catch (InterruptedException e) {
log.error("[下载过期] ", e);
}
}
}
public void addCatch(String path, String phoneNumber, J9206 j9206) {
JTRecordDownloadCatch downloadCatch = new JTRecordDownloadCatch();
downloadCatch.setPhoneNumber(phoneNumber);
downloadCatch.setPath(path);
downloadCatch.setJ9206(j9206);
// 10分钟临时地址无法访问则删除
downloadCatch.setDelayTime(System.currentTimeMillis() + 10 * 60 * 1000L);
downloadCatchMap.put(path, downloadCatch);
downloadCatchQueue.add(downloadCatch);
}
public JTRecordDownloadCatch getCatch(String path) {
return downloadCatchMap.get(path);
}
@EventListener
public void onApplicationEvent(FtpUploadEvent event) {
if (topicSubscribers.isEmpty()) {
return;
}
topicSubscribers.keySet().forEach(key -> {
if (!event.getFileName().contains(key)) {
return;
}
SynchronousQueue<Object> synchronousQueue = topicSubscribers.get(key);
if (synchronousQueue != null) {
synchronousQueue.offer(null);
}
});
}
public Object runDownload(String path, long timeOut) {
SynchronousQueue<Object> subscribe = subscribe(path);
if (subscribe == null) {
log.error("[JT-下载] 暂停进程失败");
return null;
}
try {
return subscribe.poll(timeOut, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("[JT-下载] 暂停进程超时", e);
} finally {
this.unsubscribe(path);
JTRecordDownloadCatch downloadCatch = getCatch(path);
if (downloadCatch != null) {
downloadCatchMap.remove(path);
downloadCatchQueue.remove(downloadCatch);
}
}
return null;
}
private SynchronousQueue<Object> subscribe(String key) {
SynchronousQueue<Object> queue = null;
if (!topicSubscribers.containsKey(key))
topicSubscribers.put(key, queue = new SynchronousQueue<>());
return queue;
}
private void unsubscribe(String key) {
topicSubscribers.remove(key);
}
}

View File

@@ -45,9 +45,7 @@ public class Session {
public void writeObject(Object message) {
log.info("<<<<<<<<<< cmd{},{}", this, message);
System.out.println(message);
ChannelFuture channelFuture = channel.writeAndFlush(message);
System.out.println(222);
channel.writeAndFlush(message);
}
/**

View File

@@ -58,7 +58,6 @@ public enum SessionManager {
*/
protected void put(Object clientId, Session newSession) {
sessionMap.put(clientId, newSession);
System.out.println(sessionMap.size());
}
@@ -79,7 +78,6 @@ public enum SessionManager {
return null;
}
String requestKey = requestKey(cmd.getPhoneNumber(), cmd.getRespId(), cmd.getPackageNo());
System.out.println("requestKey==" + requestKey);
SynchronousQueue<Object> subscribe = subscribe(requestKey);
if (subscribe == null) {
log.error("DevId: {} key:{} send repaid", cmd.getPhoneNumber(), requestKey);
@@ -103,7 +101,6 @@ public enum SessionManager {
if (responseNo == null) {
for (String key : topicSubscribers.keySet()) {
if (key.startsWith(requestKey)) {
System.out.println(key);
SynchronousQueue<Object> queue = topicSubscribers.get(key);
if (queue != null) {
result = true;

View File

@@ -106,8 +106,6 @@ public class ABLRESTfulUtils {
responseJSON = JSON.parseObject(responseStr);
}
}else {
System.out.println( 2222);
System.out.println( response.code());
response.close();
Objects.requireNonNull(response.body()).close();
}
@@ -388,8 +386,6 @@ public class ABLRESTfulUtils {
param.put("stream", stream);
param.put("starttime", startTime);
param.put("endtime", endTime);
System.out.println("starttime: "+ startTime);
System.out.println("endtime: "+ endTime);
return sendPost(mediaServer,"queryRecordList", param, null);
}