I am new to DynamoDB and I need to process 5M records. Each record has an id and a status. I need to query for records by status, process each one, and finally update its status.
I am using DynamoDbEnhancedClient, but I could not find an example of how to query on the range key alone, without the hash key, while avoiding a scan.
I tried to create a query with a condition and a limit of 1 but it did not work.
Here is what I have:
My Customer model:
    @DynamoDbPartitionKey
    private String id;

    @DynamoDbSecondarySortKey(indexNames = "status")
    private String status;

    private String name;
configs:
    @Bean
    public DynamoDbEnhancedClient dynamoDbEnhancedClient() {
        return DynamoDbEnhancedClient.builder()
                .dynamoDbClient(dynamoDbClient())
                .extensions(AutoGeneratedTimestampRecordExtension.create())
                .build();
    }
My query:
    static final TableSchema<Customer> CUSTOMER_TABLE = TableSchema.fromClass(Customer.class);

    public Customer findByStatus() {
        DynamoDbTable<Customer> customerTable = dynamoDbEnhancedClient.table("customer", CUSTOMER_TABLE);

        QueryConditional queryConditionalPerPartition = new EqualToConditional(Key.builder()
                .partitionValue("status")
                .build());

        QueryEnhancedRequest request = QueryEnhancedRequest.builder()
                .limit(1)
                .queryConditional(queryConditionalPerPartition)
                .build();

        PageIterable<Customer> pageIterable = customerTable.query(request);
        Customer customer = pageIterable.stream().findFirst().get().items().get(0);
        return customer;
    }
However, that does not work. How can I query by status and get only a single result? I have no restrictions on the table structure and can change it however I need to.
Answer
I finally figured it out: I needed to create a GSI (Global Secondary Index).
When creating the GSI, you define the table's sort key (status) as the index's hash key, and then you can query the index by status.
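To illustrate why re-keying by status helps, here is a minimal in-memory sketch in plain Java. It is not the DynamoDB API: the class `GsiSketch` and its methods are hypothetical names made up for this example, and the sketch only assumes that an index keeps a second copy of the keys grouped by status, so a lookup by status touches one group instead of scanning every item.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Toy in-memory model of a table plus a status-keyed index. Not the DynamoDB API.
final class GsiSketch {
    // Base table: id -> status.
    private final Map<String, String> table = new HashMap<>();
    // Index: status (index partition key) -> ids in sorted order (index sort key).
    private final Map<String, TreeSet<String>> statusIndex = new HashMap<>();

    // Writes an item and keeps the index in step with the table.
    void put(String id, String status) {
        String previous = table.put(id, status);
        if (previous != null) {
            statusIndex.get(previous).remove(id); // drop the stale index entry
        }
        statusIndex.computeIfAbsent(status, s -> new TreeSet<>()).add(id);
    }

    // Key lookup on the index: touches only the requested status partition.
    List<String> queryByStatus(String status, int limit) {
        List<String> ids = new ArrayList<>();
        TreeSet<String> partition = statusIndex.get(status);
        if (partition == null) {
            return ids;
        }
        for (String id : partition) {
            if (ids.size() >= limit) {
                break;
            }
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        GsiSketch customers = new GsiSketch();
        customers.put("c2", "PENDING");
        customers.put("c1", "PENDING");
        customers.put("c3", "DONE");
        System.out.println(customers.queryByStatus("PENDING", 1)); // prints [c1]
    }
}
```

In the real service the index is maintained by DynamoDB itself; the sketch only shows the shape of the lookup.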
Create the table:
    aws dynamodb create-table \
        --table-name customer \
        --key-schema AttributeName=id,KeyType=HASH AttributeName=status,KeyType=RANGE \
        --attribute-definitions AttributeName=id,AttributeType=S AttributeName=status,AttributeType=S \
        --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
        --table-class STANDARD \
        --global-secondary-indexes '[
            {
                "IndexName": "id-status",
                "KeySchema": [
                    { "AttributeName": "status", "KeyType": "HASH" },
                    { "AttributeName": "id", "KeyType": "RANGE" }
                ],
                "Projection": { "ProjectionType": "ALL" },
                "ProvisionedThroughput": { "ReadCapacityUnits": 1, "WriteCapacityUnits": 1 }
            }
        ]'
My model in Java:
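    // Annotations: software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.*
    // and ...enhanced.dynamodb.extensions.annotations.DynamoDbAutoGeneratedTimestampAttribute
    @DynamoDbBean
    public class Customer {
        private String id;
        private String status;
        private Instant created;
        private Instant updated;

        public Customer() {
        }

        // Table partition key; sort key of the "id-status" index.
        @DynamoDbPartitionKey
        @DynamoDbSecondarySortKey(indexNames = "id-status")
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }

        // Table sort key; partition key of the "id-status" index.
        @DynamoDbSortKey
        @DynamoDbSecondaryPartitionKey(indexNames = "id-status")
        public String getStatus() { return status; }
        public void setStatus(String status) { this.status = status; }

        // The enhanced client converts Instant natively, so no custom converter is needed.
        @DynamoDbAutoGeneratedTimestampAttribute
        public Instant getCreated() { return created; }
        public void setCreated(Instant created) { this.created = created; }

        @DynamoDbAutoGeneratedTimestampAttribute
        public Instant getUpdated() { return updated; }
        public void setUpdated(Instant updated) { this.updated = updated; }
    }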
Then query the index:
    // SDK types: software.amazon.awssdk.enhanced.dynamodb (and its .model package);
    // SdkIterable: software.amazon.awssdk.core.pagination.sync
    @Service
    public class CustomerDAO {

        static final TableSchema<Customer> CUSTOMER_TABLE = TableSchema.fromBean(Customer.class);

        @Autowired
        private DynamoDbEnhancedClient dynamoDbEnhancedClient;

        public Customer findByStatus() {
            DynamoDbTable<Customer> customerTable = dynamoDbEnhancedClient.table("customer", CUSTOMER_TABLE);
            DynamoDbIndex<Customer> secIndex = customerTable.index("id-status");

            // On the index, status is the partition key, so it can be queried directly.
            QueryConditional queryConditional = QueryConditional
                    .keyEqualTo(Key.builder().partitionValue("PENDING").build());

            // DynamoDbIndex.query returns SdkIterable<Page<T>>, so no cast to PageIterable.
            SdkIterable<Page<Customer>> results = secIndex.query(QueryEnhancedRequest.builder()
                    .queryConditional(queryConditional)
                    .limit(1)
                    .build());

            // Return the first matching item, or null if no item has this status.
            return results.stream()
                    .flatMap(page -> page.items().stream())
                    .findFirst()
                    .orElse(null);
        }
    }
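The question also asks to process each record and then update its status. Below is a rough, self-contained sketch of that loop in plain Java, with an in-memory map standing in for DynamoDB. The `CustomerStore` interface, `InMemoryStore`, and `drain` are hypothetical names invented for illustration, not SDK calls. One assumption worth stating: with the create-table command above, status is part of the table's primary key, and DynamoDB does not let you change key attributes in place, so moving an item to a new status would probably mean deleting it and writing it again.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical sketch of the processing loop; an in-memory map stands in for DynamoDB.
final class StatusDrainSketch {

    // The two operations the loop needs. Names are illustrative, not SDK calls.
    interface CustomerStore {
        Optional<String> findOneIdByStatus(String status);
        void moveToStatus(String id, String newStatus);
    }

    static final class InMemoryStore implements CustomerStore {
        private final TreeMap<String, String> statusById = new TreeMap<>();

        void put(String id, String status) {
            statusById.put(id, status);
        }

        // Stands in for the index query with a limit of 1.
        @Override
        public Optional<String> findOneIdByStatus(String status) {
            for (Map.Entry<String, String> entry : statusById.entrySet()) {
                if (entry.getValue().equals(status)) {
                    return Optional.of(entry.getKey());
                }
            }
            return Optional.empty();
        }

        // Stands in for delete-then-put, since status is a key attribute.
        @Override
        public void moveToStatus(String id, String newStatus) {
            statusById.remove(id);
            statusById.put(id, newStatus);
        }
    }

    // Processes PENDING items one at a time until none remain; returns the count.
    static int drain(CustomerStore store, Consumer<String> processor) {
        int processed = 0;
        Optional<String> next = store.findOneIdByStatus("PENDING");
        while (next.isPresent()) {
            processor.accept(next.get());
            store.moveToStatus(next.get(), "DONE");
            processed++;
            next = store.findOneIdByStatus("PENDING");
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.put("a", "PENDING");
        store.put("b", "DONE");
        store.put("c", "PENDING");
        int count = drain(store, id -> System.out.println("processing " + id));
        System.out.println(count); // prints 2
    }
}
```

Reads from a GSI are eventually consistent, so a real loop may briefly see an item it has just moved; the sketch does not model that, and fetching larger pages per query would likely suit 5M records better than one item at a time.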