I am new to DynamoDB and I need to process 5M records. Each record has an id and a status. I need to query for records by their status, process each one, and finally update its status.
I am using DynamoDbEnhancedClient,
but I could not find an example of how to query by the range key alone, without the hash key, while avoiding a scan.
I tried to create a query with a condition and a limit of 1 but it did not work.
Here is what I have:
My Customer model:
@DynamoDbPartitionKey
private String id;
@DynamoDbSecondarySortKey(indexNames = "status")
private String status;
private String name;
configs:
@Bean
public DynamoDbEnhancedClient dynamoDbEnhancedClient(){
return DynamoDbEnhancedClient.builder()
.dynamoDbClient(dynamoDbClient())
.extensions(AutoGeneratedTimestampRecordExtension.create())
.build();
}
My query:
static final TableSchema<Customer> CUSTOMER_TABLE = TableSchema.fromClass(Customer.class);
public Customer findByStatus() {
DynamoDbTable<Customer> customerTable = dynamoDbEnhancedClient.table("customer", CUSTOMER_TABLE);
QueryConditional queryConditionalPerPartition = new EqualToConditional(Key.builder().
partitionValue("status").
build());
QueryEnhancedRequest request = QueryEnhancedRequest.builder()
.limit(1)
.queryConditional(queryConditionalPerPartition)
.build();
PageIterable<Customer> pageIterable = customerTable.query(request);
Customer customer = pageIterable.stream().findFirst().get().items().get(0);
return customer;
}
However, that does not work. How can I query by status and get only a single result? I have no restrictions on the table structure and can change it however I need.
Answer
So I finally figured it out: I needed to create a GSI (Global Secondary Index).
When creating the GSI, you define the table's sort key (status) as the index's hash key, and then you can query the index by status.
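To see why querying the index avoids a scan, it can help to picture the index as a second copy of the data keyed by status. The following is a minimal in-memory sketch in plain Java, not DynamoDB code: the class `GsiSketch` and its methods are hypothetical names used only for illustration, and it assumes the id alone identifies a record. The base map is keyed by id, while the index map is keyed by status and keeps ids sorted, roughly mirroring an index with status as hash key and id as range key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;

// Hypothetical in-memory model of a table plus a GSI; not the DynamoDB API.
class GsiSketch {
    // Base table: partition key (id) -> status.
    private final Map<String, String> table = new HashMap<>();
    // Index: hash key (status) -> ids kept sorted, playing the role of the range key.
    private final Map<String, TreeSet<String>> statusIndex = new HashMap<>();

    // Writes an item and keeps the index in sync. The real service maintains
    // a GSI automatically, with eventual consistency; here it is immediate.
    void put(String id, String status) {
        String old = table.put(id, status);
        if (old != null) {
            TreeSet<String> ids = statusIndex.get(old);
            ids.remove(id);
            if (ids.isEmpty()) {
                statusIndex.remove(old);
            }
        }
        statusIndex.computeIfAbsent(status, s -> new TreeSet<>()).add(id);
    }

    // "Query" by status: a direct key lookup on the index, no pass over the table.
    Optional<String> findFirstIdByStatus(String status) {
        TreeSet<String> ids = statusIndex.get(status);
        return (ids == null || ids.isEmpty()) ? Optional.empty() : Optional.of(ids.first());
    }

    public static void main(String[] args) {
        GsiSketch db = new GsiSketch();
        db.put("c-2", "PENDING");
        db.put("c-1", "PENDING");
        db.put("c-3", "DONE");
        System.out.println(db.findFirstIdByStatus("PENDING")); // Optional[c-1]
        db.put("c-1", "DONE"); // a status update moves the item within the index
        System.out.println(db.findFirstIdByStatus("PENDING")); // Optional[c-2]
    }
}
```

The point of the sketch is only that the lookup starts from the status value, which is what querying the GSI by its hash key gives you.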
Create the table:
aws dynamodb create-table \
--table-name customer \
--key-schema \
AttributeName=id,KeyType=HASH \
AttributeName=status,KeyType=RANGE \
--attribute-definitions \
AttributeName=id,AttributeType=S \
AttributeName=status,AttributeType=S \
--provisioned-throughput \
ReadCapacityUnits=5,WriteCapacityUnits=5 \
--table-class STANDARD \
--global-secondary-indexes '[
{
"IndexName": "id-status",
"KeySchema": [
{
"AttributeName": "status",
"KeyType": "HASH"
},
{
"AttributeName": "id",
"KeyType": "RANGE"
}
],
"Projection": {
"ProjectionType": "ALL"
},
"ProvisionedThroughput": {
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1
}
}
]'
My model in Java:
@DynamoDbBean
public class Customer {
private String id;
private String status;
private Instant created;
private Instant updated;
public Customer() {}
@DynamoDbPartitionKey
@DynamoDbSecondarySortKey(indexNames = "id-status")
public String getId() {
return id;
}
@DynamoDbSortKey
@DynamoDbSecondaryPartitionKey(indexNames = "id-status")
public String getStatus() {
return status;
}
@DynamoDbAutoGeneratedTimestampAttribute
@DynamoDBTypeConverted(converter = InstantToStringTypeConverter.class)
public Instant getCreated() {
return created;
}
@DynamoDbAutoGeneratedTimestampAttribute
@DynamoDBTypeConverted(converter = InstantToStringTypeConverter.class)
public Instant getUpdated() {
return updated;
}
// Setters (setId, setStatus, setCreated, setUpdated) are omitted here;
// @DynamoDbBean maps only properties that have both a getter and a setter.
}
Then query the DB:
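// Besides the enhanced-client types already used, this needs
// software.amazon.awssdk.core.pagination.sync.SdkIterable and
// software.amazon.awssdk.enhanced.dynamodb.model.Page.
@Service
public class CustomerDAO {
    static final TableSchema<Customer> CUSTOMER_TABLE =
            TableSchema.fromBean(Customer.class);

    @Autowired
    private DynamoDbEnhancedClient dynamoDbEnhancedClient;

    public Customer findByStatus() {
        DynamoDbTable<Customer> customerTable = dynamoDbEnhancedClient.table("customer", CUSTOMER_TABLE);
        DynamoDbIndex<Customer> secIndex = customerTable.index("id-status");
        // On the index, status is the partition key, so it can be queried directly.
        QueryConditional queryConditional = QueryConditional
                .keyEqualTo(Key.builder().partitionValue("PENDING").build());
        // DynamoDbIndex.query returns SdkIterable<Page<T>>, so no cast is needed.
        SdkIterable<Page<Customer>> results =
                secIndex.query(QueryEnhancedRequest.builder()
                        .queryConditional(queryConditional)
                        .limit(1)
                        .build());
        // Return the first matching item, or null when no record has this status.
        return results.stream()
                .flatMap(page -> page.items().stream())
                .findFirst()
                .orElse(null);
    }
}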
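For the original goal (fetch records by status, process them, then update the status), the surrounding loop could look roughly like the sketch below. It is only an illustration under stated assumptions: `CustomerStore`, `InMemoryStore`, `StatusProcessor`, and the "DONE" status value are hypothetical names that do not come from the SDK or from the code above, and the in-memory store stands in for DynamoDB so the loop can run on its own. A real `CustomerStore` would wrap the DAO above and query the index instead of filtering a map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical processing loop; none of these types are part of the AWS SDK.
class StatusProcessor {

    // Storage abstraction; a real implementation would delegate to the DAO.
    interface CustomerStore {
        List<String> findIdsByStatus(String status, int limit);
        void updateStatus(String id, String newStatus);
    }

    // In-memory stand-in. The filter below is effectively a scan, which is
    // acceptable only because this is a test double, not the real table.
    static class InMemoryStore implements CustomerStore {
        final Map<String, String> statusById = new LinkedHashMap<>();

        public List<String> findIdsByStatus(String status, int limit) {
            return statusById.entrySet().stream()
                    .filter(e -> e.getValue().equals(status))
                    .map(Map.Entry::getKey)
                    .limit(limit)
                    .collect(Collectors.toList());
        }

        public void updateStatus(String id, String newStatus) {
            statusById.put(id, newStatus);
        }
    }

    // Repeatedly fetches a small batch of PENDING ids, processes each one,
    // and marks it DONE. Returns the ids in the order they were processed.
    static List<String> drainPending(CustomerStore store, int batchSize) {
        List<String> processed = new ArrayList<>();
        List<String> batch;
        while (!(batch = store.findIdsByStatus("PENDING", batchSize)).isEmpty()) {
            for (String id : batch) {
                // Real processing of the record would happen here.
                store.updateStatus(id, "DONE");
                processed.add(id);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.statusById.put("a", "PENDING");
        store.statusById.put("b", "DONE");
        store.statusById.put("c", "PENDING");
        System.out.println(drainPending(store, 1)); // [a, c]
        System.out.println(store.findIdsByStatus("PENDING", 10)); // []
    }
}
```

The loop ends because every processed record leaves the PENDING set. Against a real GSI, which is eventually consistent, a record that was just updated may briefly still appear under its old status, so the processing step should tolerate seeing the same record twice.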