Case Study: Concurrent Thread Access
Saturday, May 30th, 2009
Whenever I encounter a scenario that I know will benefit from asynchronous, multithreaded code, I still dread the implementation. Concurrent programming stretches my brain and my patience: things that seem like they should work end up not working at all, or, even worse, code runs smoothly in 99 out of 100 scenarios but sneaks up to bite at the last moment, when it turns out that what you thought was thread-safe really was not.
The other day I encountered such a scenario. I needed to write a stand-alone application that could act as a server and respond to events on the local network, so that it could be controlled remotely. In general terms, the application consisted of the following components:
- Server: The main class, responsible for making the network connections
- ServerThread: Started and referenced by Server, to listen for incoming events on the opened connections
- OtherClass1 … OtherClassN: Classes which contain the application’s actual functionality
The goal was to keep networking code from spreading throughout the whole application by having ServerThread capture network requests and delegate them to listeners. This would decouple the program’s functionality (the OtherClassN classes) from the guts of the network connection. The intended flow was:
- (setup) Server initiates connection, starts ServerThread, registers itself as a listener
- ServerThread (producer) receives an incoming request, and fires an event to all listeners
- Server (consumer) receives the event and delegates accordingly to OtherClass
- ServerThread returns to listening
- OtherClass does its thing
If you have written multithreaded applications, you will probably spot the problem with this approach quickly. However, my first attempt (shown below) actually worked fine until ServerThread received an event such as “exit” and fired it to its listeners, whereupon my program would throw a ConcurrentModificationException and go unresponsive. I tracked the error down to a collision between two operations on the same list: ServerThread was looping through the list of IListener references, and a notified listener tried to remove itself from that list before ServerThread had finished notifying the remaining listeners.
```java
import java.util.ArrayList;
import java.util.List;

class BadProducer extends Thread {
    //
    // Network object declarations here (including reader)
    // ...

    List< IListener > listeners = new ArrayList< IListener >();
    Object listenerLock = new Object();
    boolean stop = false;

    public void addListener( IListener listener ) {
        synchronized ( listenerLock ) {
            listeners.add( listener );
        }
    }

    public void removeListener( IListener listener ) {
        synchronized ( listenerLock ) {
            if ( listeners.contains( listener ) )
                listeners.remove( listener );
        }
    }

    public void fireEvent( String szMessage ) {
        synchronized ( listenerLock ) {
            // Listeners are called while the list is being iterated
            for ( IListener l : listeners ) {
                l.addMessage( szMessage );
            }
        }
    }

    public void run() {
        while ( !stop ) {
            String szLast = reader.getLine();
            fireEvent( szLast );
        }
    }
}

class BadConsumer implements IListener {
    private BadProducer serverThread;
    private SomeClass c;

    /*
     * Implemented from IListener
     */
    public void addMessage( String szMessage ) {
        //
        // Handle message directly
        //
        if ( szMessage.equals( "whatever" ) ) {
            // works fine as long as we don't call into serverThread!
            c.doWhatever();
        } else if ( szMessage.equals( "exit" ) ) {
            // throws ConcurrentModificationException
            serverThread.removeListener( this );
        }
    }

    public static void main( String[] args ) throws InterruptedException {
        // Setup: start the server thread and register as a listener
        BadConsumer server = new BadConsumer();
        server.serverThread = new BadProducer();
        server.serverThread.addListener( server );
        server.serverThread.start();
        server.serverThread.join();
    }
}
```
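Fine, I thought, I’ll make sure to lock the list of IListeners with a synchronized block. But the exception was still thrown. After begging the advice of one of my coworkers and examining the logic of my code, I realized that calling a method on another thread’s object (in this case, from ServerThread into Server) does not transfer execution to that thread: addMessage() runs on ServerThread itself, inside the fireEvent() loop. Java’s intrinsic locks are reentrant, so when Server calls removeListener( this ) from there, ServerThread already holds listenerLock, the synchronized block is entered immediately, and the list is modified in the middle of its own iteration. The iterator notices the change on its next step and throws the exception.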
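The failure can be reproduced without any networking and without a second thread. The following is a minimal sketch, assuming a list of plain strings in place of the IListener objects; the class name ReentrantRemovalDemo and its methods are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class; names are illustrative, not from the original application.
class ReentrantRemovalDemo {
    private final List<String> listeners = new ArrayList<>();
    private final Object lock = new Object();

    void add(String name) {
        synchronized (lock) {
            listeners.add(name);
        }
    }

    void remove(String name) {
        // The calling thread already holds the lock, so it enters this block immediately.
        synchronized (lock) {
            listeners.remove(name);
        }
    }

    // Returns true if removing during iteration raised ConcurrentModificationException.
    boolean fireAndRemove() {
        synchronized (lock) {
            try {
                for (String name : listeners) {
                    remove(name); // stands in for a listener unregistering itself
                }
                return false;
            } catch (ConcurrentModificationException e) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        ReentrantRemovalDemo demo = new ReentrantRemovalDemo();
        demo.add("server");
        System.out.println("exception thrown: " + demo.fireAndRemove());
    }
}
```

Everything here happens on one thread, which is why the synchronized blocks do not help: they only keep other threads out.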
The solution is based on a concurrency pattern called Producer-Consumer. The key reason this works where the other didn’t is that the IListener no longer acts on an event inside fireEvent(). Its addMessage() only appends the message to a queue and returns, so execution goes straight back to ServerThread and nothing touches the listener list while it is being iterated. Meanwhile, the Server class runs a loop of its own on a separate thread, occasionally checking for a new message in a vector of its own. (The vector acts as a message queue, which preserves every request in order even when ServerThread receives several before Server has handled the last one.) This is as clear as mud conceptually, so here’s the modified source code:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

class Consumer extends Thread implements IListener {
    //
    // Server object declarations here
    // ...

    private Vector< String > messageQueue = new Vector< String >();
    private Object mqLock = new Object();

    public void messageLoop() throws InterruptedException {
        while ( true ) {
            String szNextMessage = null;

            // Check for messages; remove the head so it is handled only once
            synchronized ( mqLock ) {
                if ( messageQueue.size() > 0 ) {
                    szNextMessage = messageQueue.remove( 0 );
                }
            }

            if ( szNextMessage != null ) {
                // Handle message
                // ...
            }

            // Don't lock the producer out
            sleep( 50 );
        }
    }

    /*
     * Implemented from IListener; called on the producer's thread
     */
    public void addMessage( String message ) {
        synchronized ( mqLock ) {
            messageQueue.add( message );
        }
    }

    public void run() {
        try {
            messageLoop();
        } catch ( InterruptedException e ) {
            // Interrupted while sleeping: restore the flag and let the thread end
            Thread.currentThread().interrupt();
        }
    }
}

class Producer extends Thread {
    // Network object declarations (including reader) omitted
    private volatile boolean stop = false;
    private List< IListener > listeners = new ArrayList< IListener >();

    public void checkForClientMessages() {
        while ( !stop ) {
            String sz = reader.readLine();
            fireEvent( sz );
        }
    }

    private void fireEvent( String event ) {
        for ( IListener l : listeners ) {
            l.addMessage( event );
        }
    }

    public void run() {
        checkForClientMessages();
    }
}
```
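The Producer listing does not show how listeners are registered or removed. One possible way to make that safe, even when a listener unregisters while events are being fired, is java.util.concurrent.CopyOnWriteArrayList, whose iterators work on a snapshot of the list. This is a swapped-in technique rather than the code from the original application; MessageListener (a stand-in for IListener), ListenerRegistry, and ListenerRegistryDemo are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the article's IListener interface.
interface MessageListener {
    void addMessage(String message);
}

// Hypothetical registry; a producer could hold one of these instead of a bare ArrayList.
class ListenerRegistry {
    // Iteration sees a snapshot, so add/remove during fireEvent does not disturb the loop.
    private final List<MessageListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(MessageListener l) { listeners.add(l); }

    void removeListener(MessageListener l) { listeners.remove(l); }

    int size() { return listeners.size(); }

    void fireEvent(String message) {
        for (MessageListener l : listeners) {
            l.addMessage(message);
        }
    }
}

class ListenerRegistryDemo {
    public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        MessageListener selfRemoving = new MessageListener() {
            @Override
            public void addMessage(String message) {
                // Unregistering from inside the callback is safe with this list type.
                if (message.equals("exit")) registry.removeListener(this);
            }
        };
        registry.addListener(selfRemoving);
        registry.fireEvent("exit");
        System.out.println("listeners left: " + registry.size());
    }
}
```

The trade-off is that every add or remove copies the underlying array, which suits listener lists that change rarely and are iterated often.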
The new flow resembles the following:
- ServerThread receives an incoming request and calls fireEvent(), which queues the message with each listener
- fireEvent() returns immediately and ServerThread goes back to listening
- Server, looping on its own thread, notices a new message in the message queue.
- Server gets the next message from the queue and releases its lock
- Server interprets the event and delegates accordingly
- Server sleeps, then checks the queue again. Meanwhile, ServerThread may have received several events and added them to the message queue, but without having to stop and wait for Server to handle them.
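The flow above can also be sketched with a java.util.concurrent.BlockingQueue in place of the Vector and the sleep( 50 ) polling loop, so the consumer blocks until a message arrives instead of waking up on a timer. This is an alternative under stated assumptions, not the code from the original application: QueueingConsumer and runDemo are hypothetical names, and the calling thread stands in for ServerThread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical consumer; class and method names are illustrative.
class QueueingConsumer implements Runnable {
    private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();

    // Called on the producer's thread: enqueue and return immediately.
    void addMessage(String message) {
        messageQueue.add(message);
    }

    // Runs on the consumer's own thread.
    @Override
    public void run() {
        try {
            while (true) {
                String next = messageQueue.take(); // blocks until a message is available
                if (next.equals("exit")) {
                    return;
                }
                synchronized (handled) {
                    handled.add(next); // stands in for delegating to OtherClass
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> handledMessages() {
        synchronized (handled) {
            return new ArrayList<>(handled);
        }
    }

    // Demo helper: the calling thread plays the producer, then waits for the consumer to finish.
    static List<String> runDemo(String... messages) {
        QueueingConsumer consumer = new QueueingConsumer();
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
        for (String m : messages) {
            consumer.addMessage(m);
        }
        consumer.addMessage("exit");
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumer", e);
        }
        return consumer.handledMessages();
    }

    public static void main(String[] args) {
        System.out.println(runDemo("play", "pause", "stop"));
    }
}
```

Because the queue is first-in, first-out and there is a single consumer thread, messages are handled in the order they were received.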

[Figure: Sequence Diagram]
The new model responded well under testing, even with rapid-fire requests sent with the intent to bog it down. The vector (message queue) acts as a buffer between those events and the event handler, which allows for asynchronous handling. And best of all: no exceptions!










