When I try to create a stream on the local NATS server get the error. The connection established successfully but on jsm.addStream(conf)
request broke.
I’m using maven artifact:
<dependency> <groupId>io.nats</groupId> <artifactId>jnats</artifactId> <version>2.12.0</version> </dependency>
Method connection:
private Connection connection; @SneakyThrows public void openConnection() { connection = Nats.connect(); } @SneakyThrows public void openStream(String name, String sub) { JetStreamManagement jsm = connection.jetStreamManagement(); StreamConfiguration conf = StreamConfiguration.builder() .name(name) .subjects(sub) .storageType(StorageType.Memory) .build(); StreamInfo streamInfo = jsm.addStream(conf); JsonUtils.printFormatted(streamInfo); }
Call:
@ResponseStatus(value = HttpStatus.OK) @GetMapping("/connect") public void connect() { natsProducer.openConnection(); natsProducer.openStream("some-name", "com.demosub"); }
NATS running inside docker conainer on localhost
:
C:Userspavel>docker pull nats:latest latest: Pulling from library/nats d16cac695b49: Pull complete c945b005f9f3: Pull complete Digest: sha256:382506a4a72d887560f91eee19d3536c3694aa1750c7f9a172bcc09ebcac6273 Status: Downloaded newer image for nats:latest docker.io/library/nats:latest C:Userspavel>docker run -p 4222:4222 -ti nats:latest [1] 2021/09/25 12:04:15.674548 [INF] Starting nats-server [1] 2021/09/25 12:04:15.674585 [INF] Version: 2.6.1 [1] 2021/09/25 12:04:15.674604 [INF] Git: [c91f0fe] [1] 2021/09/25 12:04:15.674622 [INF] Name: NBJL25OYJYSFE6WOS4F3ZBXUQFVJFOAVGWH2XTBYLUPOHGKVLZI3TQJ4 [1] 2021/09/25 12:04:15.674644 [INF] ID: NBJL25OYJYSFE6WOS4F3ZBXUQFVJFOAVGWH2XTBYLUPOHGKVLZI3TQJ4 [1] 2021/09/25 12:04:15.674670 [INF] Using configuration file: nats-server.conf [1] 2021/09/25 12:04:15.675538 [INF] Starting http monitor on 0.0.0.0:8222 [1] 2021/09/25 12:04:15.675631 [INF] Listening for client connections on 0.0.0.0:4222 [1] 2021/09/25 12:04:15.675871 [INF] Server is ready [1] 2021/09/25 12:04:15.675904 [INF] Cluster name is tM3uBDWH0KKEqa2G2pVFv3 [1] 2021/09/25 12:04:15.675944 [WRN] Cluster name was dynamically generated, consider setting one [1] 2021/09/25 12:04:15.675980 [INF] Listening for route connections on 0.0.0.0:6222
But when I call endpoint get the error:
java.io.IOException: Timeout or no response waiting for NATS JetStream server at io.nats.client.impl.NatsJetStreamImplBase.responseRequired(NatsJetStreamImplBase.java:84) ~[jnats-2.12.0.jar:2.12.0] at io.nats.client.impl.NatsJetStreamImplBase.makeRequestResponseRequired(NatsJetStreamImplBase.java:68) ~[jnats-2.12.0.jar:2.12.0] at io.nats.client.impl.NatsJetStreamManagement.addOrUpdateStream(NatsJetStreamManagement.java:66) ~[jnats-2.12.0.jar:2.12.0] at io.nats.client.impl.NatsJetStreamManagement.addStream(NatsJetStreamManagement.java:47) ~[jnats-2.12.0.jar:2.12.0] at com.example.natsdemo.NatsProducer.openStream(NatsProducer.java:36) ~[classes/:na] at com.example.natsdemo.NatsController.connect(NatsController.java:25) ~[classes/:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na] at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.10.jar:5.3.10] at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.10.jar:5.3.10] at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.10.jar:5.3.10] at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.10.jar:5.3.10] at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.10.jar:5.3.10] at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.10.jar:5.3.10] at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.10.jar:5.3.10] at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.10.jar:5.3.10] at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.10.jar:5.3.10] at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.10.jar:5.3.10] at javax.servlet.http.HttpServlet.service(HttpServlet.java:655) ~[tomcat-embed-core-9.0.53.jar:4.0.FR] at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.10.jar:5.3.10] at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.53.jar:4.0.FR] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.53.jar:9.0.53] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.10.jar:5.3.10] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.10.jar:5.3.10] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.10.jar:5.3.10] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1726) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.53.jar:9.0.53] at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
What is the problem in my solution?
Advertisement
Answer
The problem is the way to open the stream. Correct solution:
Options options = new Options.Builder() .server(url) .connectionListener((conn, type) -> { System.out.println(type); }).build(); connection = Nats.connect(options);
And send async:
@SneakyThrows void pushAsync(@NotNull String sub, @NotNull String msg) { JetStream stream = connection.jetStream(); CompletableFuture<PublishAck> future = stream .publishAsync(sub, msg.getBytes(StandardCharsets.UTF_8)); future.thenApplyAsync(ack -> { log.debug(ack); return ack; }); }