I’m working on a Java app that uses Spark 2.3.1 to load data from Oracle to HDFS and vice versa. I want to create a CSV file in HDFS and then load it into an Oracle (12.2) BLOB.
The code so far:
// create Dataset
Dataset<Row> dataset = SparkService.sql("select * from test_table");
String trgtFileWithPath = "/tmp/test_table.csv";

// save file in HDFS
dataset.write().mode("overwrite").format("csv").save(trgtFileWithPath);

// get file from HDFS
JavaSparkContext jsc = SparkContextUtil.getJavaSparkContext("appId");
JavaRDD<String> textFile = jsc.textFile(trgtFileWithPath);

// call the Oracle package that inserts into a table with a BLOB field
File csvFile = new File("/tmp/ETLFramework/test_table1.csv");
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(csvFile), 500);
Connection conn = tbl.getJdbcConnection(); // tbl is a var holding a java.sql.Connection
CallableStatement cstmt = conn.prepareCall(String.format("{call %s(?, ?, ?)}", "ORACLE_API_FOR_ETL_FRAMEWORK.INSERT_LOB"));
cstmt.setString(1, "FILE_TO_LOB");
cstmt.setString(2, "/tmp/test_table.csv");
cstmt.setClob(3, bis, (int) csvFile.length());
cstmt.execute();
if (!conn.getAutoCommit()) {
    conn.commit();
}
I’m new to Spark, so I’d appreciate any ideas on how to convert the JavaRDD to a BufferedInputStream, or how to get rid of the mess above and load the Dataset into an Oracle BLOB in a saner way.
Thanks
Answer
Finally, after a couple of days of fighting with Oracle, Hadoop, and Spark, I found a solution for my task:
try {
    String trgtFolderPath = "tmp/ETLFramework/csv/form_name";
    Configuration conf = new Configuration();
    String hdfsUri = "hdfs://" + nameNode + ":" + hdfsPort; // name node host and HDFS port supplied elsewhere
    FileSystem fileSystem = FileSystem.get(URI.create(hdfsUri), conf);
    RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem.listFiles(new Path(trgtFolderPath), true);
    while (fileStatusListIterator.hasNext()) {
        LocatedFileStatus fileStatus = fileStatusListIterator.next();
        String fileName = fileStatus.getPath().getName();
        if (fileName.contains(".csv") && fileStatus.getLen() > 0) {
            log.info("fileName=" + fileName);
            log.info("fileStatus.getLen=" + fileStatus.getLen());
            BufferedInputStream bis = new BufferedInputStream(fileSystem.open(new Path(trgtFolderPath + "/" + fileName)), 500);
            ETLParams param = ETLParams.getParams();
            Connection conn = tbl.getJdbcConnection();
            String apiPackageInsertLOB = ETLService.replaceParams(tbl.getConnection().getFullSchema() + "." + tbl.getApiPackage().getDbTableApiPackageInsertLOB(), param.getParamsByName());
            log.info(String.format("Call %s(%s, %s, %s);", apiPackageInsertLOB, tbl.getFullTableName(), trgtFolderPath + "/" + fileName, "p_nInsertedRows"));
            CallableStatement cstmt = conn.prepareCall(String.format("{call %s(?, ?, ?, ?)}", apiPackageInsertLOB));
            cstmt.setString(1, tbl.getFullTableName());
            cstmt.setString(2, trgtFolderPath + "/" + fileName);
            cstmt.setBlob(3, bis, fileStatus.getLen());
            cstmt.registerOutParameter(4, Types.INTEGER);
            cstmt.execute();
            int rowsInsertedCount = cstmt.getInt(4); // the OUT parameter is at index 4; index 3 is the BLOB input
            log.info("Inserted " + rowsInsertedCount + " rows into table blob_file");
            cstmt.close();
        }
    }
    fileSystem.close();
} catch (IOException | SQLException exc) {
    exc.printStackTrace();
}
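Worth noting why the loop above iterates over a folder at all: dataset.write().format("csv").save(path) produces a directory of part files, not a single CSV, so each non-empty part-*.csv has to be streamed into the BLOB separately. If a single file is preferable, the Dataset can be coalesced to one partition before writing. A minimal sketch, assuming the same SparkService helper from the question (the table name and path are illustrative):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// coalesce(1) moves all rows to one partition, so Spark writes exactly one
// part-*.csv under trgtFolderPath, at the cost of funneling everything
// through a single writer task
Dataset<Row> dataset = SparkService.sql("select * from test_table");
String trgtFolderPath = "/tmp/ETLFramework/csv/form_name";
dataset.coalesce(1)
       .write()
       .mode("overwrite")
       .format("csv")
       .save(trgtFolderPath);

For a 2 GB extract that single-writer cost may be noticeable, so keeping the per-part-file loop is a defensible choice.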
Writing a 2 GB CSV from the Spark Dataset into HDFS, and then reading that CSV from HDFS into the Oracle BLOB, took about 5 minutes.
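One caveat with the solution code: bis, cstmt, and fileSystem are only closed on the happy path, so they leak if the Oracle call or the HDFS read throws mid-loop, and the catch block just prints the stack trace. A sketch of the same flow using try-with-resources, under the same assumptions (tbl, apiPackageInsertLOB, hdfsUri, trgtFolderPath, and log come from the surrounding framework code, and the enclosing method declares throws IOException, SQLException):

import java.io.BufferedInputStream;
import java.net.URI;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.Types;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

// Same loop as the answer, but every stream and statement is closed even
// when the Oracle call or the HDFS read fails.
try (FileSystem fileSystem = FileSystem.get(URI.create(hdfsUri), new Configuration())) {
    RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path(trgtFolderPath), true);
    while (files.hasNext()) {
        LocatedFileStatus fileStatus = files.next();
        String fileName = fileStatus.getPath().getName();
        if (!fileName.contains(".csv") || fileStatus.getLen() == 0) {
            continue; // skip _SUCCESS markers and empty part files
        }
        Connection conn = tbl.getJdbcConnection(); // connection lifecycle stays with the framework
        try (BufferedInputStream bis = new BufferedInputStream(
                     fileSystem.open(new Path(trgtFolderPath + "/" + fileName)), 500);
             CallableStatement cstmt = conn.prepareCall(
                     String.format("{call %s(?, ?, ?, ?)}", apiPackageInsertLOB))) {
            cstmt.setString(1, tbl.getFullTableName());
            cstmt.setString(2, trgtFolderPath + "/" + fileName);
            cstmt.setBlob(3, bis, fileStatus.getLen());
            cstmt.registerOutParameter(4, Types.INTEGER);
            cstmt.execute();
            log.info("Inserted " + cstmt.getInt(4) + " rows into table blob_file");
        }
    }
}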