diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0107.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0107.java index 0e7b1b89b..b97aadd79 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0107.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0107.java @@ -27,6 +27,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.charset.Charset; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -89,7 +90,14 @@ public class J0107 extends Re { deviceAttribute.setGnssAttribute(JGnssAttribute.getInstance(buf.getUnsignedByte(79 + n + m))); deviceAttribute.setCommunicationModuleAttribute(JCommunicationModuleAttribute.getInstance(buf.getUnsignedByte(80 + n + m))); System.out.println(deviceAttribute); - SessionManager.INSTANCE.response(header.getTerminalId(), "0107", (long) respNo, deviceAttribute); + List allRequestKey = SessionManager.INSTANCE.getAllRequestKey(); + String prefix = String.join("_", header.getTerminalId().replaceFirst("^0*", ""), "0107"); + for (String key : allRequestKey) { + if (key.startsWith(prefix)) { + SessionManager.INSTANCE.response(key, deviceAttribute); + } + } + return null; } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java b/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java index 02ecd04c7..91c359ff2 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java @@ -5,6 +5,8 @@ import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.SynchronousQueue; @@ -109,6 +111,23 @@ public enum SessionManager { return false; } + public Boolean response(String key, Object data) { + SynchronousQueue queue = topicSubscribers.get(key); + if (queue != null) { + try { + return queue.offer(data, 2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("{}", e.getMessage(), e); + } + } + log.warn("Not find response,key:{} data:{} ", key, data); + return false; + } + + public List getAllRequestKey() { + return new ArrayList<>(topicSubscribers.keySet()); + } + private void unsubscribe(String key) { topicSubscribers.remove(key); }