Concurrency: Two threads querying multiple databases on different servers and comparing each record synchronously




I want two threads to query (via JDBC) two related tables that live on different servers/databases, each producing ordered output, and then compare the results or apply some logic record by record.

The tables can be very large, so I was thinking that using threads would be the most efficient way to get this done with the smallest footprint.

Example:

Thread1 – query table server1.database1.schema1.tableA ordered by 1;

Thread2 – query table server2.database2.schema2.tableB where [conditions/logics related to A] order by 1;

Synchronize on each record of the ResultSet in both threads and apply the comparison or data logic.

For example: ResultSet from Thread1 = [1,2,3,4,5]

ResultSet from Thread2 = [2,4,6,8,10]

I want to be able to synchronize at each index (0…4) and compare the values, e.g. check that Thread1.ResultSet[0] = Thread2.ResultSet[0]/2.

That means:

1 = 2/2

2 = 4/2

etc…
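The per-index rule in the example can be pinned down on its own before any threading is involved. This is a minimal sketch that assumes both result sets have already been read into in-memory lists (only practical for small data); the class and method names (LockstepRule, rowMatches, firstMismatch) are hypothetical, and List.of needs Java 9+.

```java
import java.util.List;

public class LockstepRule {
    // Hypothetical rule from the example: a row matches when a == b / 2.
    // The evenness check stops integer division from accepting e.g. 3 == 7 / 2.
    static boolean rowMatches(int a, int b) {
        return b % 2 == 0 && a == b / 2;
    }

    // Compares the lists index by index. Returns the first mismatching index,
    // the shorter length if one list runs out early, or -1 if everything matches.
    static int firstMismatch(List<Integer> left, List<Integer> right) {
        int n = Math.min(left.size(), right.size());
        for (int i = 0; i < n; i++) {
            if (!rowMatches(left.get(i), right.get(i))) {
                return i;
            }
        }
        return left.size() == right.size() ? -1 : n;
    }

    public static void main(String[] args) {
        List<Integer> a = List.of(1, 2, 3, 4, 5);
        List<Integer> b = List.of(2, 4, 6, 8, 10);
        System.out.println(firstMismatch(a, b)); // prints -1
    }
}
```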

This is what I have so far, based on another answer I found while researching. I am using an AtomicInteger to synchronize the ResultSet iteration.

// Main class (App.java)
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class App {
    public static void main(String[] args) {
        try {
            ReaderThread t1 = new ReaderThread();
            ReaderThread t2 = new ReaderThread();
            List<ReaderThread> list = new ArrayList<ReaderThread>();
            list.add(t1);
            list.add(t2);
            
            HelperThread helperThread = new HelperThread(list);
            helperThread.start();
            
            t1.setName("Reader1");
            t2.setName("Reader2");
            t1.start();
            t2.start();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

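// Database reader thread (ReaderThread.java)
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;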
public class ReaderThread extends Thread {

    private DatabaseAccessLayer dal = new DatabaseAccessLayer(); //access layer to instantiate connection, statement and execute query and return ResultSet
    private ResultSet rs;
    private final Object hold = new Object();
    private final AtomicInteger lineCount = new AtomicInteger(0);
    private String currentLine;

    public ReaderThread() throws SQLException {
        this.rs = dal.executeStatement(); //execute SQL query on instantiation and get the resultset
    }

    @Override
    public void run() {
        synchronized (hold) {
            try {
                while (rs.next()) {
                    currentLine = rs.getString(1) + rs.getString(2) + rs.getString(3) + rs.getString(4)
                            + rs.getString(5) + rs.getString(5);
                    lineCount.getAndIncrement();
                    System.out.println(this.getName() + " ||| " + currentLine);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }

        }
    }

    public void hold () throws InterruptedException {
        this.hold.wait();
    }

    public void release() {
        this.hold.notify();
    }

    public boolean isLocked() {
        return getState().equals(State.WAITING);
    }

    public Object getHold() {
        return hold;
    }

    public AtomicInteger getLineCount() {
        return lineCount;
    }

    public String getCurrentLine() {
        return currentLine;
    }

}

// The helper class, which watches the two reader threads and determines the lock conditions and subsequent logic (HelperThread.java)
import java.util.List;
public class HelperThread extends Thread {
    private List<ReaderThread> threads;

    @Override
    public void run() {
        while (true) {
            threads.forEach(t -> {
                try {
                    int r1 = 0;
                    int r2 = 0;
                    //======== lock and synchronize logic here =========
                    if (t.getName().equals("Reader1")) r1 = t.getLineCount().get();
                    if (t.getName().equals("Reader2")) r2 = t.getLineCount().get();
                    if (t.getName().equals("Reader1") && r1 == r2) t.hold();
                    if (t.getName().equals("Reader2") && r2 == r1) t.hold();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            if (threads.stream().allMatch(ReaderThread::isLocked)) {
                System.out.println("next line:");

                threads.forEach(t -> {
                    synchronized (t.getHold()) {
                        System.out.println(t.getCurrentLine());
                        t.release();
                    }
                });

                System.out.println("\n");
            }
        }

    }

    public HelperThread(List<ReaderThread> threads) {
        this.threads = threads;
    }
}

The code above executes the queries on both tables concurrently and prints the result set of each. However, the locking/holding logic does not work. I am trying to put the threads on hold when their AtomicInteger counters have the same value. The idea is that each thread iterates through its result set one row at a time; for each row it increments its AtomicInteger and then waits until the other thread's AtomicInteger reaches the same value. Then the comparison logic runs, and both threads are released to move on.

I am unsure whether AtomicInteger is the right tool here.

Any suggestions are much appreciated.
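For comparison, the lockstep behaviour described in the question (each reader reads a row, waits until the other has read its row, the comparison runs, then both continue) is what java.util.concurrent.CyclicBarrier provides directly, with the comparison as the barrier action. The sketch below is an alternative to the AtomicInteger plus wait/notify approach, not the asker's code: it assumes rows arrive from in-memory Iterators instead of JDBC ResultSets, and BarrierLockstep, reader and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierLockstep {
    // Each reader publishes its current row (or null at end of data) in its own slot.
    private final Integer[] current = new Integer[2];
    private final List<String> results = new ArrayList<>();
    private volatile boolean done = false;

    // Barrier action: runs once per round, in the last reader to arrive,
    // before either reader is released from await().
    private final CyclicBarrier barrier = new CyclicBarrier(2, () -> {
        Integer a = current[0];
        Integer b = current[1];
        if (a == null || b == null) {
            if (a != null || b != null) {
                results.add("length mismatch");
            }
            done = true;
            return;
        }
        // Example rule from the question: A = B / 2
        results.add(a + (b == 2 * a ? " = " : " != ") + b + "/2");
    });

    // Stands in for a ResultSet loop (assumption: rows come from an Iterator).
    private Thread reader(int slot, Iterator<Integer> rows) {
        return new Thread(() -> {
            try {
                while (true) {
                    current[slot] = rows.hasNext() ? rows.next() : null;
                    barrier.await();   // wait here until the other reader arrives
                    if (done) {
                        break;         // both readers leave in the same round
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                // the other reader failed; stop reading
            }
        });
    }

    static List<String> run(List<Integer> left, List<Integer> right) {
        BarrierLockstep s = new BarrierLockstep();
        Thread t1 = s.reader(0, left.iterator());
        Thread t2 = s.reader(1, right.iterator());
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return s.results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4, 5), List.of(2, 4, 6, 8, 10)));
        // prints [1 = 2/2, 2 = 4/2, 3 = 6/2, 4 = 8/2, 5 = 10/2]
    }
}
```

The barrier action runs before either reader is released, so it can safely read both slots, and the shared done flag makes both readers stop in the same round even when the tables have different lengths.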

Answer

A good solution is to use two ArrayBlockingQueues as buffers:

ArrayBlockingQueue<MyData> db1Buf = new ArrayBlockingQueue<>(BUF_SIZE);
ArrayBlockingQueue<MyData> db2Buf = new ArrayBlockingQueue<>(BUF_SIZE);

The reading threads simply put rows into their buffer:

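// END_OF_TABLE is a hypothetical sentinel MyData instance shared by all threads
while (rs.next()) {
    MyData data = new MyData(rs.getString(1), rs.getString(2)...);
    db1Buf.put(data); // put() waits if the buffer is full; offer() would return false and drop the row
}
db1Buf.put(END_OF_TABLE); // signal the end of the table (the queue rejects null elements)

A third thread processes the data:

for (;;) {
    MyData db1Record = db1Buf.take(); // waits if the buffer is empty
    MyData db2Record = db2Buf.take();
    if (db1Record == END_OF_TABLE || db2Record == END_OF_TABLE)
        break; // identity check: one of the tables is exhausted

    // do something with db1Record and db2Record
}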
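Two details of ArrayBlockingQueue matter for these loops: offer(e) does not wait when the queue is full (it returns false immediately, so rows would be silently dropped), and the queue rejects null elements with a NullPointerException. So the reading loop needs put(e), which blocks, and a non-null end-of-table marker rather than offer(null). The small check below demonstrates both behaviours; QueueSemantics and check are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class QueueSemantics {
    // Returns {did offer on a full queue succeed?, was a null element accepted?}
    static boolean[] check() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.offer("row1");                        // fills the single slot
        boolean secondOffer = q.offer("row2");  // queue full: returns false at once, no waiting
        boolean nullAccepted;
        try {
            q.clear();
            q.offer(null);                      // null elements are rejected
            nullAccepted = true;
        } catch (NullPointerException e) {
            nullAccepted = false;
        }
        return new boolean[] {secondOffer, nullAccepted};
    }

    public static void main(String[] args) {
        boolean[] r = check();
        System.out.println("offer on full queue: " + r[0]); // prints false
        System.out.println("null accepted: " + r[1]);       // prints false
    }
}
```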

No explicit synchronisation is needed because ArrayBlockingQueue is already thread-safe.

The reading threads feed the buffers and block when they are full; the third thread consumes the data, waiting for the reading threads whenever a buffer is empty. The class MyData is a simple bean with the fields you need.
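Putting the pieces together, a minimal end-to-end sketch of the two-buffer design might look like the following. It assumes in-memory lists stand in for the two ResultSets and a one-column record stands in for MyData; END_OF_TABLE, BUF_SIZE = 100 and QueuePipeline are hypothetical, and records need Java 16+.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueuePipeline {
    // Hypothetical stand-in for the answer's MyData bean (one column only).
    record MyData(int value) {}

    // Non-null end-of-table marker; always compared with ==, never equals().
    static final MyData END_OF_TABLE = new MyData(Integer.MIN_VALUE);
    static final int BUF_SIZE = 100;

    // Reader thread: stands in for the while (rs.next()) loop.
    static Thread reader(Iterator<Integer> rows, BlockingQueue<MyData> buf) {
        return new Thread(() -> {
            try {
                while (rows.hasNext()) {
                    buf.put(new MyData(rows.next())); // blocks while the buffer is full
                }
                buf.put(END_OF_TABLE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Consumer: takes one record from each buffer and applies the comparison.
    static List<String> compare(List<Integer> tableA, List<Integer> tableB) {
        BlockingQueue<MyData> db1Buf = new ArrayBlockingQueue<>(BUF_SIZE);
        BlockingQueue<MyData> db2Buf = new ArrayBlockingQueue<>(BUF_SIZE);
        Thread r1 = reader(tableA.iterator(), db1Buf);
        Thread r2 = reader(tableB.iterator(), db2Buf);
        r1.start();
        r2.start();

        List<String> out = new ArrayList<>();
        try {
            for (;;) {
                MyData db1Record = db1Buf.take(); // blocks while the buffer is empty
                MyData db2Record = db2Buf.take();
                if (db1Record == END_OF_TABLE || db2Record == END_OF_TABLE) {
                    break; // one table is exhausted
                }
                // Example rule from the question: A = B / 2
                boolean ok = db2Record.value() == 2 * db1Record.value();
                out.add(db1Record.value() + (ok ? " = " : " != ") + db2Record.value() + "/2");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Unblock a reader stuck in put() if the other table ended first.
            r1.interrupt();
            r2.interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(compare(List.of(1, 2, 3, 4, 5), List.of(2, 4, 6, 8, 10)));
        // prints [1 = 2/2, 2 = 4/2, 3 = 6/2, 4 = 8/2, 5 = 10/2]
    }
}
```

Interrupting the readers in the finally block is an extra precaution beyond the answer: if one table ends first, the consumer stops taking from the other buffer, and a reader blocked in put() on a full buffer would otherwise never finish.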



Source: stackoverflow