I have an Azure Function written in Java that listens for a queue message on Azure. The queue message contains the path to a zip file in an Azure blob container. When the message is received, the function takes the zip file from that location and unzips it into a container on Azure. It works fine for small files, but for files larger than about 80 MB it fails with FailureException: OutOfMemoryError: Java heap space.
My code is as below:
@FunctionName("queueprocessor") public void run(@QueueTrigger(name = "msg", queueName = "queuetest", dataType = "", connection = "AzureWebJobsStorage") Details message, final ExecutionContext executionContext, @BlobInput(name = "file", dataType = "binary", connection = "AzureWebJobsStorage", path = "{Path}") byte[] content) { executionContext.getLogger().info("PATH: " + message.getPath()); CloudStorageAccount storageAccount = null; CloudBlobClient blobClient = null; CloudBlobContainer container = null; try { String connectStr = "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=mykey;EndpointSuffix=core.windows.net"; //unique name of the container String containerName = "output"; // Config to upload file size > 1MB in chunks int deltaBackoff = 2; int maxAttempts = 2; BlobRequestOptions blobReqOption = new BlobRequestOptions(); blobReqOption.setSingleBlobPutThresholdInBytes(1024 * 1024); // 1MB blobReqOption.setRetryPolicyFactory(new RetryExponentialRetry(deltaBackoff, maxAttempts)); // Parse the connection string and create a blob client to interact with Blob storage storageAccount = CloudStorageAccount.parse(connectStr); blobClient = storageAccount.createCloudBlobClient(); blobClient.setDefaultRequestOptions(blobReqOption); container = blobClient.getContainerReference(containerName); container.createIfNotExists(BlobContainerPublicAccessType.CONTAINER, new BlobRequestOptions(), new OperationContext()); ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(content)); ZipEntry zipEntry = zipIn.getNextEntry(); while (zipEntry != null) { executionContext.getLogger().info("ZipEntry name: " + zipEntry.getName()); //Getting a blob reference CloudBlockBlob blob = container.getBlockBlobReference(zipEntry.getName()); ByteArrayOutputStream outputB = new ByteArrayOutputStream(); byte[] buf = new byte[1024]; int n; while ((n = zipIn.read(buf, 0, 1024)) != -1) { outputB.write(buf, 0, n); } // Upload to container ByteArrayInputStream inputS = new ByteArrayInputStream(outputB.toByteArray()); blob.setStreamWriteSizeInBytes(256 * 1024); // 256K blob.upload(inputS, inputS.available()); executionContext.getLogger().info("ZipEntry name: " + zipEntry.getName() + " extracted"); zipIn.closeEntry(); zipEntry = zipIn.getNextEntry(); } zipIn.close(); executionContext.getLogger().info("FILE EXTRACTION FINISHED"); } catch(Exception e) { e.printStackTrace(); } }
The Details message has an ID and a file path; the path is given as input to @BlobInput(..., path = "{Path}", ...). From my analysis I believe @BlobInput is loading the full file into memory, which is why I am getting the OutOfMemoryError. If I am right, please let me know another way to avoid it, because in future the file size might go up to 2 GB. Also, if there is any mistake in the unzipping code, please let me know. Thanks.
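For reference, the Details class bound by the queue trigger is not shown above; a minimal sketch, assuming it only carries the ID and path fields mentioned (field names are assumptions based on message.getPath() and the "{Path}" binding expression), could look like this:

// Hypothetical sketch of the Details POJO referenced by the queue trigger.
// Field names are assumptions; only getPath() is actually used in the code above.
public class Details {
    private String id;
    private String path;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getPath() { return path; }
    public void setPath(String path) { this.path = path; }
}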
Answer
To summarize @JoachimSauer's suggestion:
When we use the Azure Functions blob storage binding to process blob content in a Java function application, the binding holds the whole content in memory. Using it to process large files can therefore cause an OutOfMemoryError. So if we want to process large Azure blobs, we should use the Blob Storage SDK to open an input stream and process the content from that stream.
For example
SDK
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-storage-blob</artifactId>
    <version>12.9.0</version>
</dependency>
Code
String accountName=""; String accountKey=""; StorageSharedKeyCredential sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey); BlobServiceClient blobServiceClient = new BlobServiceClientBuilder() .credential(sharedKeyCredential) .endpoint("https://" + accountName + ".blob.core.windows.net") .buildClient(); BlobContainerClient desContainerClient = blobServiceClient.getBlobContainerClient("output"); BlobContainerClient sourceContainerClient = blobServiceClient.getBlobContainerClient("upload"); BlobInputStreamOptions option = new BlobInputStreamOptions(); //The size of each data chunk returned from the service option.setBlockSize(1024*1024); ZipInputStream zipInput = null; try { zipInput= new ZipInputStream( sourceContainerClient.getBlobClient("<read file name deom queue message>").openInputStream(option)); ZipEntry zipEntry= zipInput.getNextEntry(); while(zipEntry != null){ System.out.println("ZipEntry name: " + zipEntry.getName()); BlobOutputStream outputB = desContainerClient.getBlobClient(zipEntry.getName()).getBlockBlobClient().getBlobOutputStream(); byte[] bytesIn = new byte[1024*1024]; int read = 0; while ((read = zipInput.read(bytesIn)) != -1) { outputB.write(bytesIn, 0, read); } outputB.flush(); outputB.close(); zipInput.closeEntry(); zipEntry =zipInput.getNextEntry(); } } catch (IOException e) { e.printStackTrace(); }finally { try { zipInput.close(); } catch (IOException e) { e.printStackTrace(); } }
For more details, please refer to here.