Is this the correct way to extract counts from a Concurrent Hash Map without missing some or double counting?

I'm working on something where I need to count how many times an event happens. Instead of hitting the database with millions of calls, I'm summing the updates in memory and then writing the results to the database once per second (for example, turning ten +1 updates into a single +10).

I’ve noticed some strange inconsistency with the counts (like there should be exactly 1 million transactions but instead there are 1,000,016 or something).

I’m looking into other possible causes but I wanted to check that this is the correct way of doing things. The use case is that it needs to be eventually correct, so it’s okay as long as the counts aren’t double counted or dropped.

Here is my sample implementation.

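import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class Aggregator {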
    private Map<String, LongAdder> transactionsPerUser = new ConcurrentHashMap<>();
    private StatisticsDAO statisticsDAO;

    public Aggregator(StatisticsDAO statisticsDAO) {
        this.statisticsDAO = statisticsDAO;
    }

    public void incrementCount(String userId) {
        transactionsPerUser.computeIfAbsent(userId, k -> new LongAdder()).increment();
    }

    @Scheduled(every = "1s")
    public void sendAggregatedStatisticsToDatabase() {
        for (String userId : transactionsPerUser.keySet()) {
            long count = transactionsPerUser.remove(userId).sum();
            statisticsDAO.updateCount(userId, count);
        }
    }
}

Answer

You will have updates dropped in the following scenario:

  • Thread A calls incrementCount and finds an existing LongAdder instance for the given userId; computeIfAbsent returns this instance.
  • At the same time, Thread B is handling a sendAggregatedStatisticsToDatabase call, which removes that LongAdder instance from the map.
  • Thread B calls sum() on the LongAdder instance.
  • Thread A, still executing that same incrementCount invocation, now calls increment() on the LongAdder instance.

This update is now dropped. It will not be seen by the next invocation of sendAggregatedStatisticsToDatabase, because the increment() call happened on an instance that was removed from the map in between the calls to computeIfAbsent() and increment() in the incrementCount method.
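
To make the interleaving concrete, here is a minimal sketch that replays the four steps above in a single thread, so the outcome is deterministic. The class name LostUpdateDemo, the replay() helper, and the key "u1" are made up for illustration and are not part of the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo class: replays the race sequentially instead of relying on thread timing.
public class LostUpdateDemo {

    /** Returns {count flushed by "thread B", count still visible to the next flush}. */
    public static long[] replay() {
        Map<String, LongAdder> counts = new ConcurrentHashMap<>();

        // An earlier increment, so that an adder already exists for this user.
        counts.computeIfAbsent("u1", k -> new LongAdder()).increment();

        // Thread A: computeIfAbsent returns the existing adder, increment() not called yet.
        LongAdder seenByA = counts.computeIfAbsent("u1", k -> new LongAdder());

        // Thread B: removes the adder from the map and sums it.
        long flushed = counts.remove("u1").sum();

        // Thread A: increments the adder that is no longer in the map.
        seenByA.increment();

        // What the next flush would find for this user.
        LongAdder current = counts.get("u1");
        long visibleToNextFlush = (current == null) ? 0 : current.sum();
        return new long[] {flushed, visibleToNextFlush};
    }

    public static void main(String[] args) {
        long[] result = replay();
        System.out.println("flushed=" + result[0] + ", visible to next flush=" + result[1]);
    }
}
```

Two increments were made, but only one is ever reported: the second one lives on an adder that nothing references from the map any more.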

You might be better off leaving the LongAdder instances in the map and reusing them, by doing something like this inside the loop in sendAggregatedStatisticsToDatabase:

        LongAdder longAdder = transactionsPerUser.get(userId);
        long count = longAdder.sum();
        longAdder.add(-count);
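
Put together, the reuse approach might look like the sketch below. It is only an illustration under some assumptions: ReusingAggregator, drain(), and runDemo() are hypothetical names, and drain() returns the counts in a map as a stand-in for the statisticsDAO.updateCount calls so that the example runs without a database or scheduler. An increment that lands between sum() and add(-count) is not lost; it simply stays in the adder and is picked up by the next drain.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical variant of the Aggregator that never removes adders from the map.
public class ReusingAggregator {
    private final Map<String, LongAdder> transactionsPerUser = new ConcurrentHashMap<>();

    public void incrementCount(String userId) {
        transactionsPerUser.computeIfAbsent(userId, k -> new LongAdder()).increment();
    }

    /** Stand-in for the scheduled flush: returns the counts it would send to the DAO. */
    public Map<String, Long> drain() {
        Map<String, Long> drained = new HashMap<>();
        for (Map.Entry<String, LongAdder> entry : transactionsPerUser.entrySet()) {
            LongAdder adder = entry.getValue();
            long count = adder.sum();
            if (count != 0) {
                // Subtract exactly what was read; concurrent increments remain for the next drain.
                adder.add(-count);
                drained.put(entry.getKey(), count);
            }
        }
        return drained;
    }

    private static long total(Map<String, Long> drained) {
        return drained.values().stream().mapToLong(Long::longValue).sum();
    }

    /** Runs 4 writer threads (250,000 increments each) while draining concurrently. */
    public static long runDemo() {
        ReusingAggregator aggregator = new ReusingAggregator();
        int perWriter = 250_000;
        Thread[] writers = new Thread[4];
        for (int i = 0; i < writers.length; i++) {
            String userId = "user-" + (i % 2);
            writers[i] = new Thread(() -> {
                for (int n = 0; n < perWriter; n++) {
                    aggregator.incrementCount(userId);
                }
            });
            writers[i].start();
        }

        long reported = 0;
        boolean anyAlive = true;
        while (anyAlive) {
            anyAlive = false;
            for (Thread writer : writers) {
                anyAlive |= writer.isAlive();
            }
            reported += total(aggregator.drain());
        }
        for (Thread writer : writers) {
            try {
                writer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for writers", e);
            }
        }
        // Final drain after all writers have finished.
        reported += total(aggregator.drain());
        return reported;
    }

    public static void main(String[] args) {
        System.out.println("reported=" + runDemo());
    }
}
```

One trade-off to keep in mind: because entries are never removed, the map keeps one LongAdder per distinct userId for the lifetime of the aggregator, which is why the sketch skips users whose count is zero rather than sending empty updates.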
User contributions licensed under: CC BY-SA