Bulk upload CSV file into Elasticsearch using JavaAPI

Tags: , , ,



I want to bulk upload the csv file into Elasticsearch using JAVA API (without using logstash).

Elasticsearch version – 6.6

I have tried the below program using Jackson format to get source Map for IndexRequest. Because I can’t predefined the POJO variables. So I used dynamic Map from CSV file

import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Logger;

import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.zoho.dedupe.connection.DukeESConnection;

public class BulkImport {

    private static Logger logger = Logger.getLogger(BulkImport.class.getName());    


public static void main(String args[]) {
        long starttime = System.currentTimeMillis();
        logger.info("ElasticSearchServiceImpl => bulkInsert Service Started");
        FileInputStream fis = null;

        BulkRequest request;
        RestHighLevelClient client;
        //elastic Search Index Name
        String esIndex = "post";
        try {
            boolean isHeaderSet = false;
            Set<String> header = new HashSet<String>();

            fis = new FileInputStream("/Users/test/Documents/Test.csv");
            request = new BulkRequest();
            MappingIterator<Map<String, Object>> data = parse(fis);
            while (data.hasNext()) {
                Map<?,?> value = data.next();
                if(!isHeaderSet) {
                    header.add("id");
                    header = (Set<String>) value.keySet();
                    isHeaderSet= true;
                }
                System.out.println(value);
                request.add(getIndexRequest(value, esIndex));
            }
            fis.close();

            if(request.numberOfActions()>0) {
               String hostsInString = "localhost";
                List<HttpHost> httpHosts = new ArrayList<HttpHost> ( );
    String[] hosts = hostsInString.split (","); 
    for (String host : hosts)
    {
        HttpHost httpHost = new HttpHost (host, 9200, "http");
        httpHosts.add (httpHost);
    }
                client = client =  new RestHighLevelClient (RestClient.builder (
            httpHosts.toArray(new HttpHost[]{})).setMaxRetryTimeoutMillis (10 * 60000).setRequestConfigCallback(
            new RestClientBuilder.RequestConfigCallback() {
                @Override
                public RequestConfig.Builder customizeRequestConfig(
                        RequestConfig.Builder requestConfigBuilder) {
                    return requestConfigBuilder
                        .setConnectTimeout (60000)
                        .setSocketTimeout (10 * 60000);
                }

            }));
                CreateIndexRequest crrequest = new CreateIndexRequest(esIndex); 
                Map<String, Object> jsonMap = new HashMap<>();
                Map<String, Object> message = new HashMap<>();
                message.put("type", "text");
                Map<String, Object> keyword = new HashMap<>();
                Map<String, Object> type = new HashMap<>();
                type.put("type", "keyword");
                type.put("ignore_above", 256);
                keyword.put("keyword", type);
                message.put("fields", keyword);
                Map<String, Object> properties = new HashMap<>();
                for (Object hdr :header) {
                    properties.put(hdr.toString(), message);
                }
                Map<String, Object> mapping = new HashMap<>();
                mapping.put("properties", properties);
                jsonMap.put("_doc", mapping);
                crrequest.mapping("_doc", jsonMap); 
                CreateIndexResponse createIndexResponse = client.indices().create(crrequest, RequestOptions.DEFAULT);
                boolean acknowledged = createIndexResponse.isAcknowledged(); 
                System.out.println(acknowledged);
                BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
                if(bulkResponse.hasFailures()) {
                    logger.info("ElasticSearchServiceImpl => bulkInsert : Some of the record has failed.Please reinitiate the process");
                } else {
                    logger.info("ElasticSearchServiceImpl => bulkInsert : Success");
                }
            } else {
                logger.info("ElasticSearchServiceImpl => bulkInsert : No request for BulkInsert ="+request.numberOfActions());
            }

        } catch (Exception e) {
            logger.info("ElasticSearchServiceImpl => bulkInsert : Exception =" + e.getMessage());
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        logger.info("ElasticSearchServiceImpl => bulkInsert End "  + (endTime - starttime));
    }
public static MappingIterator<Map<String, Object>> parse(FileInputStream input) throws Exception {


    MappingIterator<Map<String, Object>> map = readObjectsFromCsv(input);
    return  map;
    //writeAsJson(data);
}

public static MappingIterator<Map<String, Object>> readObjectsFromCsv(FileInputStream file) throws IOException {
    CsvSchema bootstrap = CsvSchema.emptySchema().withHeader().withColumnSeparator(',');
    CsvMapper csvMapper = new CsvMapper();
    MappingIterator<Map<String, Object>> mappingIterator = csvMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).reader(Map.class).with(bootstrap).readValues(file);
   // System.out.println("Column names: " + mappingIterator.next().keySet());
    return mappingIterator;
}

public static void writeAsJson(List<Map<?, ?>> data) throws IOException, JSONException {
    ObjectMapper mapper = new ObjectMapper();
    String value = mapper.writeValueAsString(data);
    JSONArray json = new JSONArray(value);
    System.out.println(json);
}

        public static IndexRequest getIndexRequest(Map data,String index)throws Exception {
            IndexRequest indexRequest = null;

        indexRequest = new IndexRequest(index).id(UUID.randomUUID().toString()).source(data);
        System.out.println(indexRequest.toString());
        return indexRequest;
        }
}

I got the below exception while running the program

    {Document Name=dhjajga, Title=sdas, Name=asd, DOB=14-43-22}
index {[post][null][c2148857-87e0-4407-b5f5-b4f5f52c40d2], source[{"Document Name":"dhjajga","Title":"sdas","Name":"asd","DOB":"14-43-22"}]}
Jun 11, 2020 4:06:18 PM com.zoho.dedupe.connection.DukeESConnection connect
INFO: Client org.elasticsearch.client.RestHighLevelClient@7c51f34b
true
Jun 11, 2020 4:06:18 PM BulkImport main
INFO: ElasticSearchServiceImpl => bulkInsert : Exception =Validation Failed: 1: type is missing;2: type is missing;3: type is missing;
org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: type is missing;2: type is missing;3: type is missing;
    at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:612)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1728)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1694)
    at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:470)
    at BulkImport.main(BulkImport.java:85)
Jun 11, 2020 4:06:18 PM BulkImport main
INFO: ElasticSearchServiceImpl => bulkInsert End 1432

When I try to insert the same above indexrequest its working fine.

curl -X POST "localhost:9200/post/_doc/?pretty" -H 'Content-Type: application/json' -d'
{
    "Document Name":"dhjajga","Title":"sdas","Name":"asd","DOB":"14-43-22"
}
'

{
  "_index" : "post",
  "_type" : "_doc",
  "_id" : "jBPronIB0Wb3XTTasBjG",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

Please help to fix the issue in java program. Thanks in advance

Answer

Before Elasticsearch version 7 you have to specify a type with your Indexrequest. It is recommended to use the type “_doc”.



Source: stackoverflow