I have a Spring Boot API and would like to add an MQTT client that subscribes to one or more topics. I tried several clients (Paho, Hive) without success. I am now using Spring's default MQTT support (spring-integration-mqtt), which is built on Paho, but I can't get it to work even with the basic configuration: I get a "Connection lost" error as soon as I launch the application. Can you suggest a fix, or an alternative that would work? Thanks!
Maven:

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    ....
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageProducer;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;

    import lombok.extern.slf4j.Slf4j;
    import springfox.documentation.swagger2.annotations.EnableSwagger2;

    @Slf4j
    @SpringBootApplication
    @EnableSwagger2
    public class MainApiSpring {

        public static void main(String[] args) {
            SpringApplication.run(MainApiSpring.class, args);
            // English: "The application started successfully."
            log.trace("L'application a correctement été démarrée.");
        }

        // Channel that carries inbound MQTT messages to the handler
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }

        // Inbound adapter that connects to the broker
        @Bean
        public MessageProducer inbound() {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "test/topic");
            adapter.setCompletionTimeout(5000);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }

        // Prints the payload of every message received on mqttInputChannel
        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    System.out.println(message.getPayload());
                }
            };
        }
    }
The error at startup:

    2020-09-04 10:31:39.099 ERROR 4244 --- [ main] .m.i.MqttPahoMessageDrivenChannelAdapter : Exception while connecting and subscribing, retrying

    org.eclipse.paho.client.mqttv3.MqttException: Connexion perdue
        at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
        at java.base/java.lang.Thread.run(Thread.java:830) ~[na:na]
    Caused by: java.io.EOFException: null
        at java.base/java.io.DataInputStream.readByte(DataInputStream.java:272) ~[na:na]
        at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
        at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
        ... 1 common frames omitted
Answer
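It works once the connection options are defined explicitly in an MqttConnectOptions bean and handed to the adapter through a client factory. Note also that the two-argument constructor call in the question is resolved as (url, clientId, topics...), so "test/topic" is used as the client ID and no topic is subscribed; this may be why the broker drops the connection. The configuration below passes a client ID, the factory, and the topics separately: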
    // Additional imports needed by these beans
    import java.util.UUID;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

    // Connection options shared by every client created from the factory
    @Bean
    public MqttConnectOptions getReceiverMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(30);
        mqttConnectOptions.setKeepAliveInterval(60);
        mqttConnectOptions.setAutomaticReconnect(true);
        // mqttConnectOptions.setUserName("myemail");
        String password = "mypassword!";
        // String hostUrl = "tcp://maqiatto.com:1883";
        String hostUrl = "tcp://localhost:1883";
        // mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[] { hostUrl });
        return mqttConnectOptions;
    }

    // Factory that applies the options above to the Paho client
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getReceiverMqttConnectOptions());
        return factory;
    }

    // Inbound adapter: random client ID, the factory, then the topics to subscribe to
    @Bean
    public MessageProducer inbound() {
        String clientId2 = "uuid-" + UUID.randomUUID().toString();
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId2,
                // mqttClientFactory(), "myemail/test");
                mqttClientFactory(), "test", "test/paho");
        adapter.setCompletionTimeout(20000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
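A side note on the client ID used above: "uuid-" plus a full UUID is 41 characters long and contains hyphens. The MQTT 3.1.1 specification only obliges a broker to accept client IDs of 1 to 23 characters drawn from 0-9, a-z and A-Z; longer IDs are optional, so whether the UUID form is accepted depends on the broker. If yours turns out to be strict, a compact random ID is one possible workaround. The sketch below uses only the JDK; the class name MqttClientIds and the method randomClientId are hypothetical helpers made up for illustration, not part of Paho or Spring Integration.

```java
import java.security.SecureRandom;

// Hypothetical helper (not part of Paho or Spring): builds a random MQTT client ID
// that stays within the 23-character alphanumeric range every 3.1.1 broker must accept.
public final class MqttClientIds {

    private static final String ALPHABET =
            "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
    private static final int MAX_LENGTH = 23;
    private static final SecureRandom RANDOM = new SecureRandom();

    private MqttClientIds() {
    }

    // Returns the prefix padded with random alphanumeric characters up to 23 characters.
    public static String randomClientId(String prefix) {
        if (prefix == null || prefix.length() >= MAX_LENGTH) {
            throw new IllegalArgumentException("prefix must be shorter than " + MAX_LENGTH + " characters");
        }
        for (char c : prefix.toCharArray()) {
            if (ALPHABET.indexOf(c) < 0) {
                throw new IllegalArgumentException("prefix must be alphanumeric: " + prefix);
            }
        }
        StringBuilder id = new StringBuilder(prefix);
        while (id.length() < MAX_LENGTH) {
            id.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        return id.toString();
    }

    public static void main(String[] args) {
        // Prints a different 23-character ID on every run
        System.out.println(randomClientId("api"));
    }
}
```

In the inbound() bean above, the first constructor argument could then be MqttClientIds.randomClientId("api") instead of clientId2. Whichever form you use, keep the ID unique per running instance: a broker that sees a second connection with the same client ID closes the first one.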