Saturday, December 29, 2012

Implementing Producer / Consumer using SynchronousQueue

Among the many useful classes Java provides for concurrency support, there is one I would like to talk about: SynchronousQueue. In particular, I would like to walk through a Producer / Consumer implementation that uses the handy SynchronousQueue as an exchange mechanism.

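It might not be clear why one would use this type of queue for producer / consumer communication unless we look under the hood of the SynchronousQueue implementation. It turns out that it's not really a queue as we are used to thinking about queues. It has no internal capacity at all, not even for a single element: every insert has to wait for a matching remove by another thread. A better analogy is a hand-off point where at most one element is in transit.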

Why is it useful? Well, there are several reasons. From the producer's point of view, only one element (or message) can be handed over at a time. In order to proceed with the next element (or message), the producer has to wait until the consumer takes the current one. From the consumer's point of view, it just polls the queue for the next element (or message) available. Quite simple, but the great benefit is: the producer cannot send messages faster than the consumer can process them.
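To make the hand-off behaviour concrete, here is a minimal sketch using only the JDK (it assumes Java 8+ for the lambda, and the class and method names are mine, just for illustration). It shows that a plain offer fails when no consumer is waiting, and that put and poll meet each other across two threads:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the comparison code in this post.
class HandoffDemo {
    /** Offers an element while no consumer is waiting and reports whether it was accepted. */
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        // Nobody is blocked in take() or poll(timeout), so the non-blocking offer is rejected.
        return queue.offer("row");
    }

    /** Hands one message from a producer thread over to the calling (consumer) thread. */
    static String handOff(String message) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(message); // blocks until the consumer receives the message
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String received = queue.poll(5, TimeUnit.SECONDS); // waits for the producer
            producer.join();
            return received;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer());                // false
        System.out.println(handOff("row-1"));                      // row-1
        System.out.println(new SynchronousQueue<String>().size()); // 0, the queue never stores anything
    }
}
```

The five-second timeout in handOff is an arbitrary value chosen for the demo; the only point is that neither side gets ahead of the other.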

Here is one of the use cases I encountered recently: compare two database tables (possibly huge) and detect whether they contain the same data (a copy) or different data. The SynchronousQueue is quite a handy tool for this problem: it allows handling each table in its own thread and compensates for possible timeouts / latency while reading from two different databases.

Let's start by defining our compare function, which accepts source and destination data sources as well as the name of the table to compare. I am using the quite useful JdbcTemplate class from the Spring Framework, as it abstracts away extremely well all the boring details of dealing with connections and prepared statements.

public boolean compare( final DataSource source, final DataSource destination, final String table )  {
    final JdbcTemplate from = new JdbcTemplate( source );
    final JdbcTemplate to = new JdbcTemplate( destination );
    // the snippets below fill in the rest of this method
}

Before doing any actual data comparison, it's a good idea to compare the table's row count in the source and destination databases:

if( from.queryForLong("SELECT count(1) FROM " + table ) != to.queryForLong("SELECT count(1) FROM " + table ) ) {
    return false;
}

Now, knowing at least that the table contains the same number of rows in both databases, we can start with the data comparison. The algorithm is very simple:

  • create a separate thread for each of the source (producer) and destination (consumer) databases
  • the producer thread reads a single row from the table and puts it into the SynchronousQueue
  • the consumer thread also reads a single row from the table, then asks the queue for the available row to compare (waiting if necessary), and lastly compares the two rows
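Before bringing databases into the picture, the steps above can be sketched with plain in-memory lists standing in for the two tables. This is only an illustration under my own assumptions (Java 8+, a hypothetical InMemoryCompare class, one-second timeouts, boolean results instead of exceptions), not the JDBC-based code that follows:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: each "table" is just a list of rows, each row a list of values.
class InMemoryCompare {
    static boolean sameRows(List<? extends List<?>> sourceRows, List<? extends List<?>> destinationRows) {
        if (sourceRows.size() != destinationRows.size()) {
            return false; // same idea as the row count check
        }
        SynchronousQueue<List<?>> handoff = new SynchronousQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Producer: hands over the source rows one at a time.
            Callable<Boolean> producer = () -> {
                for (List<?> row : sourceRows) {
                    if (!handoff.offer(row, 1, TimeUnit.SECONDS)) {
                        return false; // nobody took the row in time
                    }
                }
                return true;
            };
            // Consumer: takes one source row per destination row and compares the pair.
            Callable<Boolean> consumer = () -> {
                for (List<?> destination : destinationRows) {
                    List<?> source = handoff.poll(1, TimeUnit.SECONDS);
                    if (source == null || !source.equals(destination)) {
                        return false; // producer went away or the rows differ
                    }
                }
                return true;
            };
            for (Future<Boolean> result : executor.invokeAll(Arrays.asList(producer, consumer))) {
                if (!result.get()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException ex) {
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Note that when the consumer stops early on a mismatch, the producer only notices once its timed offer expires, which is why the timeout in this sketch is kept short.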

Using another great part of the Java concurrency utilities, thread pooling, let's define a thread pool with a fixed number of threads (2).

final ExecutorService executor = Executors.newFixedThreadPool( 2 );
final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();        

Following the described algorithm, the producer functionality could be represented as a single callable:

Callable< Void > producer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        from.query( "SELECT * FROM " + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {                   
                        List< ? > row = ...; // convert ResultSet to List
                        if( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) {
                            throw new SQLException( "Having more data but consumer has already completed" );
                        }
                    } catch( InterruptedException ex ) {
                        Thread.currentThread().interrupt(); // restore the interrupt flag
                        throw new SQLException( "Having more data but producer has been interrupted", ex );
                    }
                }
            }
        );

        return  null;
    }
};

The code is a bit verbose due to Java syntax, but it doesn't actually do much. The producer converts every row read from the table to a list (the implementation is omitted as it's boilerplate) and hands it to the queue (offer). If the consumer is not ready to take the row yet, the producer blocks, waiting up to two minutes for the consumer to finish its work. The consumer, respectively, could be represented as the following callable:

Callable< Void > consumer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        to.query( "SELECT * FROM " + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {
                        List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES );
                        if( source == null ) {
                            throw new SQLException( "Having more data but producer has already completed" );
                        }                                     
 
                        List< ? > destination = ...; // convert ResultSet to List
                        if( !source.equals( destination ) ) {
                            throw new SQLException( "Row data is not the same" );
                        }
                    } catch ( InterruptedException ex ) {
                        Thread.currentThread().interrupt(); // restore the interrupt flag
                        throw new SQLException( "Having more data but consumer has been interrupted", ex );
                    }
                }
            }
        );
                    
        return  null;
    }
};

The consumer does the reverse operation on the queue: instead of putting data, it pulls it (poll) from the queue. If the producer has not offered a row yet, the consumer blocks, waiting up to two minutes for the producer to publish the next row. The only part left is submitting those callables for execution. Any exception thrown by the Future's get method indicates that the tables don't contain the same data (or that there are issues with getting data from the database):

    List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) );
    for( final Future< Void > future: futures ) {
        future.get( 5, TimeUnit.MINUTES );
    }
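The snippet above lets exceptions escape, while compare is declared to return a boolean. One possible way to bridge the two is sketched below; the FutureOutcome class and bothSucceed method are hypothetical names of mine, and the sketch assumes that any failure of either callable should simply mean "not the same". It also shuts the executor down, which the snippets in this post leave out:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, shown only to illustrate the exception-to-result mapping.
class FutureOutcome {
    /** Runs both callables in parallel and returns true only if neither of them throws. */
    static boolean bothSucceed(Callable<Void> producer, Callable<Void> consumer) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // invokeAll returns only after both callables have completed (normally or not).
            List<Future<Void>> futures = executor.invokeAll(Arrays.asList(producer, consumer));
            for (Future<Void> future : futures) {
                future.get(); // rethrows a task failure wrapped in ExecutionException
            }
            return true;
        } catch (ExecutionException ex) {
            return false; // a row mismatch, a timeout or a database error inside a task
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }
}
```

In a real comparison you would probably want to look at ex.getCause() to tell a genuine data difference from an infrastructure failure, rather than collapsing both into false.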

That's basically all for today ... and this year. Happy New Year to everyone!

3 comments:

Colin Yates said...

First: Happy new year!

Second - this pattern has the disadvantage that the producer is stalled waiting for the consumer to work. Imagine the following:

tick p1: pushes row
tick c1: queries their DB, p1 gets next row ready

p1 cannot push now because c1 has stalled (due to a network blip).

Another approach, which essentially accommodates the speed differences between the two, is using a bounded BlockingQueue (http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html). This allows the queue to build up to a (possibly fixed) number. If that number was 5 then p1 can put 5 rows onto the queue before it blocks.

The other point is that this is a perfect case for divide and conquer. If the row count is large then have separate workers for each segment in the database (i.e. worker 1 compares rows 0..100, worker 2 compares rows 101..200 etc.). Create an executor service (or simply thread pool) based on CPU * 2 and off you go. CPU * 4 might work as well because I imagine the CPU effort will be eclipsed by network effort.

Colin Yates said...

Minor point - java returns "void", not "Void". Keep those nasty upper case C# constructs away! :)

Andriy Redko said...

Hi Colin,

Thanks a lot for the valuable comments. I definitely agree that your approach is a traditional producer / consumer implementation (with well-known pros and cons). The case I have specifically described here benefits from using a structure like SynchronousQueue. No matter how fast the producer reads from one database, the data has to be compared with another database. The same applies to the consumer. Another point is that I am relying on server-side cursors (read-only, forward-only) so the whole dataset is streamed (no need to use LIMIT / ROWNUM, etc). Lastly, because Callable< T > requires a return type, you cannot specify void (it's not a type), so Void is used to indicate that you return nothing.

Anyway, without a doubt your approach works as well. The SynchronousQueue stands a bit apart from other well-known alternatives (like buffered blocking queues), so I found it quite useful for this particular use case.

Thank you a lot.