The idea is to allow ConcurrentSkipListMap
to store only one ApprovalRequest
which has unique customerId
and its state is PENDING
. I supplied overridden hashCode
and equals
implementations. Moreover, in unit test the ApprovalRequest
suppose to create a new instance using lombok’s @Builder
. How to make it work?
@Component public class LoanRepository { private final ConcurrentSkipListMap<ApprovalRequest, ConcurrentHashMap<String, Decision>> pendingStorage; public synchronized void saveAsPending(final LoanApprovalRequest loanApprovalRequest) { log.info("Trying to save: {}", loanApprovalRequest); if (pendingStorage.containsKey(loanApprovalRequest)) { log.error("Attempt to save duplicate pending LoanApprovalRequest: {}", loanApprovalRequest); throw new BusinessRuleException("Attempt to save duplicate pending LoanApprovalRequest: " + loanApprovalRequest); } ConcurrentHashMap<String, Decision> decisions = new ConcurrentHashMap<>(); for (Approver approver : loanApprovalRequest.getApprovers()) { Decision pendingDecision = Decision.builder() .customerId(loanApprovalRequest.getCustomerId()) .approverUsername(approver.getName()) .state(PENDING) .build(); decisions.put(approver.getName(), pendingDecision); } if (pendingStorage.putIfAbsent(loanApprovalRequest, decisions) == null) { log.info("Successfully added new LoanApprovalRequest: {}", loanApprovalRequest); } else { log.error("Save failed. Duplicate LoanApprovalRequest: {}", loanApprovalRequest); throw new BusinessRuleException("Fail to add LoanApprovalRequest. Duplicate LoanApprovalRequest: " + loanApprovalRequest); } log.info("New storage size: {}", pendingStorage.size()); } }
Test:
ConcurrentSkipListMap<ApprovalRequest, ConcurrentHashMap<String, Decision>> pendingStorage; @BeforeEach public void each() { mainStorage = new ConcurrentSkipListMap<>(); pendingStorage = new ConcurrentSkipListMap<>(); repository = new LoanRepository(mainStorage, pendingStorage, threadPoolTaskScheduler); } @Order(2) @Test public void givenTwoProducers_whenSaving30LoanApprovalRequestsConcurrently_expectCorrectStatistics() throws InterruptedException { final int numberOfThreads = 2; final ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); CountDownLatch completedThreadCounter = new CountDownLatch(numberOfThreads); CountDownLatch readyThreadCounter = new CountDownLatch(numberOfThreads); CountDownLatch callingThreadBlocker = new CountDownLatch(1); Runnable producer1 = () -> { try { readyThreadCounter.countDown(); callingThreadBlocker.await(); Set<Approver> approver = new HashSet<>(); approver.add(new Approver("Under €1_000 Approver")); LoanApprovalRequest request; for (int i = 0; i < 5; i++) { request = LoanApprovalRequest.builder() .customerId("1X-XXXX-XAX") .decisionState(PENDING) .loanAmount(BigDecimal.valueOf(123.01 + i)) .approvers(approver) .timestamp(ZonedDateTime.now()) .build(); try { repository.saveAsPending(request); } catch (BusinessRuleException be) { System.out.println(be.getMessage()); } Thread.sleep(i * 10L); } } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } finally { completedThreadCounter.countDown(); } }; Runnable producer2 = () -> { try { readyThreadCounter.countDown(); callingThreadBlocker.await(); Set<Approver> approver = new HashSet<>(); approver.add(new Approver("Under €9_000 Approver")); LoanApprovalRequest request; for (int i = 0; i < 5; i++) { request = LoanApprovalRequest.builder() .customerId("2X-XXXX-XWX") .loanAmount(BigDecimal.valueOf(1023.55 + i * 10)) .decisionState(PENDING) .approvers(approver) .timestamp(ZonedDateTime.now()) .build(); try { repository.saveAsPending(request); } catch (BusinessRuleException be) { System.out.println(be.getMessage()); } Thread.sleep(i * 10L); } completedThreadCounter.countDown(); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } finally { completedThreadCounter.countDown(); } }; executorService.execute(producer1); executorService.execute(producer2); readyThreadCounter.await(); callingThreadBlocker.countDown(); completedThreadCounter.await(); executorService.shutdown(); Statistics statistics = repository.getStatistics(Duration.ofSeconds(60)); assertEquals(2, statistics.getCount()); }
LoanApprovalRequest
@Builder @Data @NoArgsConstructor @AllArgsConstructor public class LoanApprovalRequest implements ApprovalRequest, Comparable<LoanApprovalRequest> { public LoanApprovalRequest(ZonedDateTime zonedDateTime) { this.timestamp = zonedDateTime; } String customerId; BigDecimal loanAmount; Set<Approver> approvers; ZonedDateTime timestamp; DecisionState decisionState; @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; LoanApprovalRequest that = (LoanApprovalRequest) o; return customerId.equals(that.customerId); } @Override public int hashCode() { return customerId.hashCode(); } @Override public int compareTo(LoanApprovalRequest o) { return this.timestamp.compareTo(o.timestamp); } @Override public String toString() { return "LoanApprovalRequest{" + "customerId='" + customerId + ''' + ", loanAmount=" + loanAmount + // ", approvers=[" + approvers.stream().map(Approver::getName).collect(Collectors.joining(",")) + "]" + // ", timestamp=" + timestamp.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:nnnnnnnnn").withZone(ZoneId.of("UTC"))) + ", decisionState=" + decisionState + '}'; } }
Advertisement
Answer
ConcurrentSkipListMap
is not based on hash codes, but on ordering/comparisons.
So you will have to use that customerId
in compareTo
as well (or supply the Map with a different Comparator
based on customerId
). Otherwise it will not be consistent with equals
and the Map key uniqueness checks won’t work.