I want to bulk-upload a CSV file into Elasticsearch using the Java API (without using Logstash).
Elasticsearch version – 6.6
I tried the program below, which uses Jackson to build the source Map for each IndexRequest. Because I can’t predefine the POJO fields, I use a dynamic Map built from the CSV file.
import java.io.FileInputStream; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.logging.Logger; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.fasterxml.jackson.dataformat.csv.CsvSchema; import com.zoho.dedupe.connection.DukeESConnection; public class BulkImport { private static Logger logger = Logger.getLogger(BulkImport.class.getName()); public static void main(String args[]) { long starttime = System.currentTimeMillis(); logger.info("ElasticSearchServiceImpl => bulkInsert Service Started"); FileInputStream fis = null; BulkRequest request; RestHighLevelClient client; //elastic Search Index Name String esIndex = "post"; try { boolean isHeaderSet = false; Set<String> header = new HashSet<String>(); fis = new FileInputStream("/Users/test/Documents/Test.csv"); request = new BulkRequest(); MappingIterator<Map<String, Object>> data = parse(fis); while (data.hasNext()) { Map<?,?> value = data.next(); if(!isHeaderSet) { header.add("id"); header = (Set<String>) value.keySet(); isHeaderSet= true; } System.out.println(value); request.add(getIndexRequest(value, esIndex)); } fis.close(); if(request.numberOfActions()>0) { String hostsInString = "localhost"; 
List<HttpHost> httpHosts = new ArrayList<HttpHost> ( ); String[] hosts = hostsInString.split (","); for (String host : hosts) { HttpHost httpHost = new HttpHost (host, 9200, "http"); httpHosts.add (httpHost); } client = client = new RestHighLevelClient (RestClient.builder ( httpHosts.toArray(new HttpHost[]{})).setMaxRetryTimeoutMillis (10 * 60000).setRequestConfigCallback( new RestClientBuilder.RequestConfigCallback() { @Override public RequestConfig.Builder customizeRequestConfig( RequestConfig.Builder requestConfigBuilder) { return requestConfigBuilder .setConnectTimeout (60000) .setSocketTimeout (10 * 60000); } })); CreateIndexRequest crrequest = new CreateIndexRequest(esIndex); Map<String, Object> jsonMap = new HashMap<>(); Map<String, Object> message = new HashMap<>(); message.put("type", "text"); Map<String, Object> keyword = new HashMap<>(); Map<String, Object> type = new HashMap<>(); type.put("type", "keyword"); type.put("ignore_above", 256); keyword.put("keyword", type); message.put("fields", keyword); Map<String, Object> properties = new HashMap<>(); for (Object hdr :header) { properties.put(hdr.toString(), message); } Map<String, Object> mapping = new HashMap<>(); mapping.put("properties", properties); jsonMap.put("_doc", mapping); crrequest.mapping("_doc", jsonMap); CreateIndexResponse createIndexResponse = client.indices().create(crrequest, RequestOptions.DEFAULT); boolean acknowledged = createIndexResponse.isAcknowledged(); System.out.println(acknowledged); BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); if(bulkResponse.hasFailures()) { logger.info("ElasticSearchServiceImpl => bulkInsert : Some of the record has failed.Please reinitiate the process"); } else { logger.info("ElasticSearchServiceImpl => bulkInsert : Success"); } } else { logger.info("ElasticSearchServiceImpl => bulkInsert : No request for BulkInsert ="+request.numberOfActions()); } } catch (Exception e) { logger.info("ElasticSearchServiceImpl => bulkInsert : 
Exception =" + e.getMessage()); e.printStackTrace(); } long endTime = System.currentTimeMillis(); logger.info("ElasticSearchServiceImpl => bulkInsert End " + (endTime - starttime)); } public static MappingIterator<Map<String, Object>> parse(FileInputStream input) throws Exception { MappingIterator<Map<String, Object>> map = readObjectsFromCsv(input); return map; //writeAsJson(data); } public static MappingIterator<Map<String, Object>> readObjectsFromCsv(FileInputStream file) throws IOException { CsvSchema bootstrap = CsvSchema.emptySchema().withHeader().withColumnSeparator(','); CsvMapper csvMapper = new CsvMapper(); MappingIterator<Map<String, Object>> mappingIterator = csvMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).reader(Map.class).with(bootstrap).readValues(file); // System.out.println("Column names: " + mappingIterator.next().keySet()); return mappingIterator; } public static void writeAsJson(List<Map<?, ?>> data) throws IOException, JSONException { ObjectMapper mapper = new ObjectMapper(); String value = mapper.writeValueAsString(data); JSONArray json = new JSONArray(value); System.out.println(json); } public static IndexRequest getIndexRequest(Map data,String index)throws Exception { IndexRequest indexRequest = null; indexRequest = new IndexRequest(index).id(UUID.randomUUID().toString()).source(data); System.out.println(indexRequest.toString()); return indexRequest; } }
I got the below exception while running the program
{Document Name=dhjajga, Title=sdas, Name=asd, DOB=14-43-22} index {[post][null][c2148857-87e0-4407-b5f5-b4f5f52c40d2], source[{"Document Name":"dhjajga","Title":"sdas","Name":"asd","DOB":"14-43-22"}]} Jun 11, 2020 4:06:18 PM com.zoho.dedupe.connection.DukeESConnection connect INFO: Client org.elasticsearch.client.RestHighLevelClient@7c51f34b true Jun 11, 2020 4:06:18 PM BulkImport main INFO: ElasticSearchServiceImpl => bulkInsert : Exception =Validation Failed: 1: type is missing;2: type is missing;3: type is missing; org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: type is missing;2: type is missing;3: type is missing; at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:612) at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1728) at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1694) at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:470) at BulkImport.main(BulkImport.java:85) Jun 11, 2020 4:06:18 PM BulkImport main INFO: ElasticSearchServiceImpl => bulkInsert End 1432
When I insert the same document with a curl request, it works fine:
curl -X POST "localhost:9200/post/_doc/?pretty" -H 'Content-Type: application/json' -d' { "Document Name":"dhjajga","Title":"sdas","Name":"asd","DOB":"14-43-22" } ' { "_index" : "post", "_type" : "_doc", "_id" : "jBPronIB0Wb3XTTasBjG", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 }, "_seq_no" : 0, "_primary_term" : 1 }
Please help me fix the issue in the Java program. Thanks in advance.
Advertisement
Answer
Before Elasticsearch version 7, you have to specify a type with your IndexRequest. It is recommended to use the type “_doc”, for example `new IndexRequest(index, "_doc")` or `new IndexRequest(index).type("_doc")`.