I am implementing an elastic pool for my spring boot project also I am using spring boot 2.1.4 and elastic search 7.3.0. I am stuck at this. When any API trying to query it gives java.net.ConnectException: Connection refused
. I want to use configuration with customizeHttpClient
with a setting thread count. So It makes only one connection when application will start and querying to the database using that one connection only till bean will destroy.
I tried with this Elastic Configuration:
import java.io.IOException; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @Component public class ElasticConfig{ public static String host; private static String port; private static String protocol; private static String username; private static String password; private RestHighLevelClient client; @Value("${dselastic.host}") public void setHost(String value) { host = value; } @Value("${dselastic.port}") public void setPort(String value) { port = value; } @Value("${dselastic.protocol}") public void setProtocol(String value) { protocol = value; } @Value("${dselastic.username}") public void setUsername(String value) { username = value; } @Value("${dselastic.password}") public void setPassword(String value) { password = value; } @Bean(destroyMethod = "cleanUp") @Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON) public void prepareConnection() { RestClientBuilder restBuilder = RestClient.builder(new HttpHost(host, Integer.valueOf(port), protocol)); if (username != null & password != null) { final CredentialsProvider creadential = new BasicCredentialsProvider(); creadential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); restBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultCredentialsProvider(creadential) .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build()); } }); restBuilder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(10000) // time until a connection with the server is established. .setSocketTimeout(60000) // time of inactivity to wait for packets[data] to receive. .setConnectionRequestTimeout(0)); // time to fetch a connection from the connection pool 0 for infinite. client = new RestHighLevelClient(restBuilder); } } public void cleanUp() { try { client.close(); } catch (IOException e) { e.printStackTrace(); } } }
I also tried by implementing DisposableBean
interface and its destroy
method but I got same exception.
This is my API where I am trying to querying a document:
public class IndexNameController { @Autowired RestHighLevelClient client; @GetMapping(value = "/listAllNames") public ArrayList<Object> listAllNames(HttpSession session) { ArrayList<Object> results = new ArrayList<>(); try { SearchRequest searchRequest = new SearchRequest(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchRequest.indices("indexname"); String[] fields = { "name", "id" }; searchSourceBuilder.fetchSource(fields, new String[] {}); searchSourceBuilder.query(QueryBuilders.matchAllQuery()).size(10000); searchSourceBuilder = searchSourceBuilder.sort(new FieldSortBuilder("createdTime").order(SortOrder.DESC)); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); SearchHit[] searchHits = searchResponse.getHits().getHits(); for (SearchHit searchHit : searchHits) { Map<String, Object> map = new HashMap<>(); map.put("value", searchHit.getSourceAsMap().get("id")); map.put("name", searchHit.getSourceAsMap().get("name")); results.add(map); } return results; } catch (Exception e) { e.printStackTrace(); } return new ArrayList<>(); } }
When it’s trying to query it gives exception at client.search()
. This is the Stack Trace:
java.net.ConnectException: Connection refused at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:788) at org.elasticsearch.client.RestClient.performRequest(RestClient.java:218) at org.elasticsearch.client.RestClient.performRequest(RestClient.java:205) at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1454) at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424) at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394) at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:930) at com.incident.response.controller.IncidentController.listAllIncidents(IncidentController.java:569) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038) at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942) at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005) at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897) at javax.servlet.http.HttpServlet.service(HttpServlet.java:634) at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882) at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:171) at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:145) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351) at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221) at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64) ... 1 more
Please help me to get rid from this. All help and suggestions will be appriciate.
Advertisement
Answer
- I researched a lot that how to get rid of this and I tried many solutions after that finally I’ll get the solution all things are working as I want.
- I am trying to implement an elastic pool that one client used by the the whole project for any query or aggregation and when bean will destroy the will close and my whole query is done by using only by one connection.
- I changed my Elastic configuration like this:
@Configuration public class ElasticConfig { @Autowired Environment environment; private RestHighLevelClient client; @Bean public RestHighLevelClient prepareConnection() { RestClientBuilder restBuilder = RestClient .builder(new HttpHost(environment.getProperty("zselastic.host").toString(), Integer.valueOf(environment.getProperty("zselastic.port").toString()), environment.getProperty("zselastic.protocol").toString())); String username = new String(environment.getProperty("zselastic.username").toString()); String password = new String(environment.getProperty("zselastic.password").toString()); if (username != null & password != null) { final CredentialsProvider creadential = new BasicCredentialsProvider(); creadential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); restBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultCredentialsProvider(creadential) .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build()); } }); restBuilder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(10000) // time until a connection with the server is established. .setSocketTimeout(60000) // time of inactivity to wait for packets[data] to receive. .setConnectionRequestTimeout(0)); // time to fetch a connection from the connection pool 0 for infinite. client = new RestHighLevelClient(restBuilder); return client; } return null; } /* * it gets called when bean instance is getting removed from the context if * scope is not a prototype */ /* * If there is a method named shutdown or close then spring container will try * to automatically configure them as callback methods when bean is being * destroyed */ @PreDestroy public void clientClose() { try { this.client.close(); } catch (IOException e) { e.printStackTrace(); } } }
- So now my bean return the
RestHighLevelClient
and it will be use by whole project and I got every API repsponse instead ofjava.net.ConnectException: Connection refused
. - Also, I check node stats by using this
http://host:port/_nodes/stats/http
. When my Spring boot application starts it will initiate one connection to the elasticsearch and one entry will be added intocurrent_open
. After that, no connection will increase until my application is running all the queries, and aggregation is being performed by using this connection only. When my application will shutdown or stop connection will be close and entry removed from thecurrent_open
. - So now I can conclude that I applied elastic pool by using this configuration.