67 Javanotes 9.0, Section 12.4 — Threads and Networking
Section 12.4
Threads and Networking
In the previous chapter, we looked at several
examples of network programming. Those examples showed how to create
network connections and communicate through them, but they didn’t deal
with one of the fundamental characteristics of network programming,
the fact that network communication is asynchronous.
From the point of view of a program on one end of a network connection,
messages can arrive from the other side of the connection at any time;
the arrival of a message is an event that is not under the
control of the program that is receiving the message. Perhaps an
event-oriented networking API would be a good approach to dealing
with the asynchronous nature of network communication, but that is
not the approach that is taken in Java.
Instead, network programming in Java typically uses threads.
12.4.1 The Blocking I/O Problem
As covered in Section 11.4, network programming uses
sockets. A socket, in the sense that we are using the term here,
represents one end of a network connection.
Every socket has an associated input stream and output stream.
Data written to the output stream on one end of the connection is
transmitted over the network and appears in the input stream
at the other end.
A program that wants to read data from a socket’s input stream
calls one of that input stream’s input methods. It is possible
that the data has already arrived before the input method is called;
in that case, the input method retrieves the data and returns
immediately. More likely, however, the input method will have
to wait for data to arrive from the other side of the connection.
Until the data arrives, the input method and the thread that
called it will be blocked.
It is also possible for an output method in a socket’s output
stream to block. This can happen if the program tries to output
data to the socket faster than the data can be transmitted over
the network. (It’s a little complicated: a socket uses a
“buffer” to hold data that is supposed to be transmitted over
the network. A buffer is just a block of memory that is
used like a queue. The output method drops its data
into the buffer; lower-level software removes data from
the buffer and transmits it over the network. If the buffer fills up,
the output method will block until space becomes available in the buffer. Note that when
the output method returns, it doesn’t mean that the data
has gone out over the network—it just means that the
data has gone into the buffer and is scheduled for later
transmission.)
We say that network communication uses blocking I/O,
because input and output operations on the network can block for
indefinite periods of time. Programs that use the network must
be prepared to deal with this blocking. In some cases, it’s
acceptable for a program to simply shut down all other processing
and wait for input. (This is what happens when a command line program
reads input typed by the user. User input is another type
of blocking I/O.) However, threads make it possible for some
parts of a program to continue doing useful work while other
parts are blocked. A network client program that sends requests
to a server might get by with a single thread, if it has nothing
else to do while waiting for the server’s responses.
A network server program, on the other hand, can typically
be connected to several clients at the same time. While waiting
for data to arrive from a client, the server certainly has other
things that it can do, namely communicate with other clients.
When a server uses different threads to handle the communication
with different clients, the fact that I/O with one client is
blocked won’t stop the server from communicating with other clients.
It’s important to understand that using threads to deal with
blocking I/O differs in a fundamental way from using threads to
speed up computation. When using threads for speed-up in
Subsection 12.3.2, it made sense to use one thread
for each available processor. If only one processor is available,
using more than one thread will yield no speed-up at all; in fact,
it would slow things down because of the extra overhead involved
in creating and managing the threads.
In the case of blocking I/O, on the other hand, it can make
sense to have many more threads than there are processors, since
at any given time many of the threads can be blocked. Only
the active, unblocked threads are competing for processing time.
In the ideal case, to keep all the processors busy, you would
want to have one active thread per processor (actually
somewhat less than that, on average, to allow for variations
over time in the number of active threads).
On a network server program, for example, threads generally
spend most of their time blocked waiting for I/O operations
to complete. If threads are blocked, say, about 90% of the time,
you’d like to have about ten times as many threads as there
are processors. So even on a computer that has just a single
processor, server programs can make good use of large numbers
of threads.
12.4.2 An Asynchronous Network Chat Program
As a first example of using threads for network communication,
we consider a GUI chat program.
The command-line chat programs, CLChatClient.java and
CLChatServer.java, from Subsection 11.4.5
use a straight-through, step-by-step protocol for communication.
After a user on one side of a connection enters a message,
the user must wait for a reply from the other side of the connection.
An asynchronous chat program would be much nicer. In such a program,
a user could just keep typing lines and sending messages without waiting
for any response. Messages that arrive—asynchronously—from the
other side would be displayed as soon as they arrive. It’s not easy to do this
in a command-line interface, but it’s a natural application for a graphical user
interface. The basic idea for a GUI chat program is to create a
thread whose job is to read messages that arrive from the other side
of the connection. As soon as the message arrives, it is displayed to
the user; then, the message-reading thread blocks until the next incoming
message arrives. While it is blocked, however, other threads can continue
to run. In particular, the event-handling thread that responds to
user actions keeps running; that thread can send outgoing messages
as soon as the user generates them.
The GUIChat program can act as either the client end or the server end
of a connection. (Refer back to Subsection 11.4.3 for information about how clients
and servers work.) The program has a “Listen on port” button that the user can click to create a server socket
that will listen for an incoming connection request; this makes the program act
as a server. It also has a “Connect” button that the user can click to send
a connection request; this makes the program act as a client. As usual, the server
listens on a specified port number. The client needs to know the computer on which the
server is running and the port on which the server is listening. There are
input boxes in the GUIChat window where the user can enter this
information.
Once a connection has been established between two GUIChat
windows, each user can send messages to the other. The window has an input box
where the user types a message. Pressing return
sends the message. This means that the sending of the message is handled by the
usual event-handling thread, in response to an event generated by a user action.
Messages are received by a separate thread that just sits around waiting for
incoming messages. This thread blocks while waiting for a message to arrive;
when a message does arrive, it displays that message to the user. The window
contains a large transcript area that displays both incoming and outgoing
messages, along with other information about the network connection.
I urge you to compile the source code, GUIChat.java, and
try the program. To try it on single computer,
you can run two copies of the program on that computer, and
make a connection between one program window and the other program window,
using “localhost” or “127.0.0.1” as the name of the computer. I also urge you to read the source code.
I will discuss only parts of it here.
The program uses a nested class, ConnectionHandler,
to handle most network-related tasks. ConnectionHandler
is a subclass of Thread. The ConnectionHandler
thread is responsible for opening the network connection and then for reading
incoming messages once the connection has been opened. By putting the connection-opening
code in a separate thread,
we make sure that the GUI is not blocked while the connection is being opened.
(Like reading incoming messages, opening a connection is a blocking operation
that can take some time to complete.) The ConnectionHandler
handles opening the connection both when the program acts as a server and
when it acts as a client. The thread is created
when the user clicks either the “Listen” button or the “Connect” button. The “Listen” button
makes the thread act as a server, while “Connect” makes it act as
a client. To distinguish these two cases, the ConnectionHandler
class has the two constructors that are shown below. Note that the postMessage() method posts
a message to the transcript area of the window, where it will be visible
to the user:
/**
* Listen for a connection on a specified port. The constructor
* does not perform any network operations; it just sets some
* instance variables and starts the thread. Note that the
* thread will only listen for one connection, and then will
* close its server socket.
*/
ConnectionHandler(int port) { // For acting as the "server."
state = ConnectionState.LISTENING;
this.port = port;
postMessage("\nLISTENING ON PORT " + port + "\n");
try { setDaemon(true); }
catch (Exception e) {}
start();
}
/**
* Open a connection to a specified computer and port. The constructor
* does not perform any network operations; it just sets some
* instance variables and starts the thread.
*/
ConnectionHandler(String remoteHost, int port) { // For acting as "client."
state = ConnectionState.CONNECTING;
this.remoteHost = remoteHost;
this.port = port;
postMessage("\nCONNECTING TO " + remoteHost + " ON PORT " + port + "\n");
try { setDaemon(true); }
catch (Exception e) {}
start();
}
Here, state is an instance variable whose type is defined by
an enumerated type:
enum ConnectionState { LISTENING, CONNECTING, CONNECTED, CLOSED };
The values of this enum represent different possible states
of the network connection. It is often useful to treat a network connection
as a state machine (see Subsection 6.3.6), since the response to various
events can depend on the state of the connection when the event occurs.
Setting the state variable to LISTENING or
CONNECTING tells the thread whether to act as a server or as a client
when setting up the connection.
Once the thread has been started, it executes the following run()
method:
/**
* The run() method that is executed by the thread. It opens a
* connection as a client or as a server (depending on which
* constructor was used).
*/
public void run() {
try {
if (state == ConnectionState.LISTENING) {
// Open a connection as a server.
listener = new ServerSocket(port);
socket = listener.accept();
listener.close();
}
else if (state == ConnectionState.CONNECTING) {
// Open a connection as a client.
socket = new Socket(remoteHost,port);
}
connectionOpened(); // Sets up to use the connection (including
// creating a BufferedReader, in, for reading
// incoming messages).
while (state == ConnectionState.CONNECTED) {
// Read one line of text from the other side of
// the connection, and report it to the user.
String input = in.readLine();
if (input == null)
connectionClosedFromOtherSide(); // Close socket and report to user.
else
received(input); // Report message to user.
}
}
catch (Exception e) {
// An error occurred. Report it to the user, but not
// if the connection has been closed (since the error
// might be the expected error that is generated when
// a socket is closed).
if (state != ConnectionState.CLOSED)
postMessage("\n\n ERROR: " + e);
}
finally { // Clean up before terminating the thread.
cleanUp();
}
}
This method calls several other methods to do some of its work, but
you can see the general outline of how it works. After opening the
connection as either a server or client, the run() method enters
a while loop in which it receives and processes messages from
the other side of the connection until the connection is closed. It is
important to understand how the connection can be closed. The GUIChat
window has a “Disconnect” button that the user can click to close the connection.
The program responds to this event by closing the socket that represents
the connection and by setting the connection state to CLOSED.
It is likely that when this happens, the connection-handling
thread is blocked in the in.readLine() method, waiting for an incoming
message. When the socket is closed by the GUI thread, this method will fail and
will throw an exception; this exception causes the thread to terminate.
(If the connection-handling thread happens to be between calls to in.readLine()
when the socket is closed, the while loop will terminate because
the connection state changes from CONNECTED to CLOSED.)
Note that closing the window will also close the connection in the same way.
It is also possible for the user on the other side of the connection to close
the connection. When that happens, the stream of incoming messages ends,
and the in.readLine() on this side of the connection
returns the value null, which indicates end-of-stream and acts
as a signal that the connection has been closed by the remote user.
For a final look into the GUIChat code, consider the methods
that send and receive messages. These methods are called from different
threads. The send() method is called by the event-handling
thread in response to a user action. Its purpose is to transmit a
message to the remote user. (It is conceivable, though not likely, that
the data output operation could block, if the socket’s output buffer fills up. A more
sophisticated program might take this possibility into account by using another
thread to transmit outgoing messages.)
The send() method uses a PrintWriter,
out, that writes to the socket’s output stream.
Synchronization of this method prevents
the connection state from changing in the middle of the send operation:
/**
* Send a message to the other side of the connection, and post the
* message to the transcript. This should only be called when the
* connection state is ConnectionState.CONNECTED; if it is called at
* other times, it is ignored.
*/
synchronized void send(String message) {
if (state == ConnectionState.CONNECTED) {
postMessage("SEND: " + message);
out.println(message);
out.flush();
if (out.checkError()) {
postMessage("\nERROR OCCURRED WHILE TRYING TO SEND DATA.");
close(); // Closes the connection.
}
}
}
The received() method is called by the connection-handling
thread after a message
has been read from the remote user. Its only job is to display the
message to the user, but again it is synchronized to avoid the race
condition that could occur if the connection state were changed by
another thread while this method is being executed:
/**
* This is called by the run() method when a message is received from
* the other side of the connection. The message is posted to the
* transcript, but only if the connection state is CONNECTED. (This
* is because a message might be received after the user has clicked
* the "Disconnect" button; that message should not be seen by the
* user.)
*/
synchronized private void received(String message) {
if (state == ConnectionState.CONNECTED)
postMessage("RECEIVE: " + message);
}
12.4.3 A Threaded Network Server
Threads are often used in network server programs. They
allow the server to deal with several clients at the same time. When a
client can stay connected for an extended period of time, other clients
shouldn’t have to wait for service.
Even if the interaction with each client is expected to be very brief,
you can’t always assume that that will be the case. You have to
allow for the possibility of a misbehaving client—one
that stays connected without sending data that the server expects.
This can hang up a thread indefinitely, but in a threaded server there
will be other threads that can carry on with other clients.
The DateServer.java sample program, from
Subsection 11.4.4, is an extremely simple network server program.
It does not use threads, so the server must finish with one client
before it can accept a connection from another client. Let’s see
how we can turn DateServer into a threaded server.
(This server is so simple that doing so doesn’t make a great deal
of sense. However, the same techniques will work for more complicated
servers. See, for example, Exercise 12.5. Also note that
the client program, DateClient.java,
which implements a client for the date server, does not need to use threads,
since the client only uses one connection. The original client program
will work with the new versions of the server.)
As a first attempt, consider DateServerWithThreads.java.
This sample program creates a new thread every time a connection request
is received, instead of handling the connection itself by calling a subroutine.
The main program simply creates the thread and hands the
connection to the thread. This takes very little time, and in particular
will not block. The run() method of the thread handles the
connection in exactly the same way that it would be handled by the
original program. This is not at all difficult to program. Here’s
the new version of the program, with significant changes shown in red italic.
Note again that the constructor for the connection thread does very little and in particular
cannot block; this is very important since the constructor runs in the main thread:
import java.net.*; import java.io.*; import java.util.Date; /** * This program is a server that takes connection requests on * the port specified by the constant LISTENING_PORT. When a * connection is opened, the program sends the current time to * the connected socket. The program will continue to receive * and process connections until it is killed (by a CONTROL-C, * for example). * * This version of the program creates a new thread for * every connection request. */ public class DateServerWithThreads { public static final int LISTENING_PORT = 32007; public static void main(String[] args) { ServerSocket listener; // Listens for incoming connections. Socket connection; // For communication with the connecting program. /* Accept and process connections forever, or until some error occurs. */ try { listener = new ServerSocket(LISTENING_PORT); System.out.println("Listening on port " + LISTENING_PORT); while (true) { // Accept next connection request and create a thread to handle it. connection = listener.accept(); ConnectionHandler handler = new ConnectionHandler(connection); handler.start(); } } catch (Exception e) { System.out.println("Sorry, the server has shut down."); System.out.println("Error: " + e); return; } } // end main() /** * Defines a thread that handles the connection with one * client. */ private static class ConnectionHandler extends Thread { Socket client; // The connection to the client. ConnectionHandler(Socket socket) { client = socket; } public void run() { // (code copied from the original DateServer program) String clientAddress = client.getInetAddress().toString(); try { System.out.println("Connection from " + clientAddress ); Date now = new Date(); // The current date and time. PrintWriter outgoing; // Stream for sending data. outgoing = new PrintWriter( client.getOutputStream() ); outgoing.println( now.toString() ); outgoing.flush(); // Make sure the data is actually sent! client.close(); } catch (Exception e){ System.out.println("Error on connection with: " + clientAddress + ": " + e); } } } } //end class DateServerWithThreads
One interesting change is at the end of the run() method, where
I’ve added the clientAddress to the output of the error message.
I did this to identify which connection the error message refers to.
Since threads run in parallel, it’s possible for outputs from different
threads to be intermingled in various orders. Messages from
the same thread don’t necessarily come together in the output; they might
be separated by messages from other threads. This is just one of the
complications that you have to keep in mind when working with threads!
12.4.4 Using a Thread Pool
It’s not very efficient to create a new thread for every
connection, especially when the connections are typically very
short-lived. Fortunately, we have an alternative: thread
pools (Subsection 12.3.2).
DateServerWithThreadPool.java is an improved
version of our server that uses a thread pool. Each thread in the
pool runs in an infinite loop. Each time through the loop, it handles
one connection. We need a way for the main program to send
connections to the threads. It’s natural to use a blocking
queue (Subsection 12.3.3) named connectionQueue for that purpose.
A connection-handling thread takes connections from this queue.
Since it is a blocking queue, the thread blocks when the queue is
empty and wakes up when a connection becomes available in the
queue. No other synchronization or communication technique is
needed; it’s all built into the blocking queue. Here is
the run() method for the connection-handling threads:
public void run() {
while (true) {
Socket client;
try {
client = connectionQueue.take(); // Blocks until item is available.
}
catch (InterruptedException e) {
continue; // (If interrupted, just go back to start of while loop.)
}
String clientAddress = client.getInetAddress().toString();
try {
System.out.println("Connection from " + clientAddress );
System.out.println("Handled by thread " + this);
Date now = new Date(); // The current date and time.
PrintWriter outgoing; // Stream for sending data.
outgoing = new PrintWriter( client.getOutputStream() );
outgoing.println( now.toString() );
outgoing.flush(); // Make sure the data is actually sent!
client.close();
}
catch (Exception e){
System.out.println("Error on connection with: "
+ clientAddress + ": " + e);
}
}
}
The main program, in the meantime, runs in an infinite loop in which
connections are accepted and added to the queue:
while (true) {
// Accept next connection request and put it in the queue.
connection = listener.accept();
try {
connectionQueue.put(connection); // Blocks if queue is full.
}
catch (InterruptedException e) {
}
}
The queue in this program is of type ArrayBlockingQueue<Socket>.
As such, it has a limited capacity, and the put() operation on the
queue will block if the queue is full. But wait—didn’t we want to avoid blocking
the main program? When the main program is blocked, the server is no longer accepting
connections, and clients who are trying to connect are kept waiting.
Would it be better to use a LinkedBlockingQueue, with
an unlimited capacity?
In fact, connections in the blocking queue are waiting anyway; they are not being
serviced. If the queue grows unreasonably long, connections in the queue will
have to wait for an unreasonable amount of time. If the queue keeps growing
indefinitely, that just means that the server is receiving connection requests
faster than it can process them. That could happen for several reasons:
Your server might simply not be powerful enough to handle the volume of traffic
that you are getting; you need to buy a new server. Or perhaps the thread pool
doesn’t have enough threads to fully utilize your server; you should
increase the size of the thread pool to match the server’s capabilities.
Or maybe your server is under a “Denial Of Service” attack, in which some bad guy
is deliberately sending your server more requests than it can handle in an attempt
to keep other, legitimate clients from getting service.
In any case, ArrayBlockingQueue with limited capacity
is the correct choice. The queue should be short enough so that connections
in the queue will not have to wait too long for service.
In a real server, the size of the queue and the number
of threads in the thread pool should be adjusted to “tune” the server to account
for the particular hardware and network on which the server is running and for
the nature of the client requests that it typically processes. Optimal tuning is,
in general, a difficult problem.
There is, by the way, another way that things can go wrong:
Suppose that the server needs to read some data from the client, but
the client doesn’t send any data. The thread that is trying
to read the data can then block indefinitely, waiting for the input. If a thread pool is
being used, this could happen to every thread in the pool. In that
case, no further processing can ever take place! The solution to
this problem is to have connections “time out” if they are inactive for
an excessive period of time. Typically, each connection thread will
keep track of the time when it last received data from the client.
The server runs another thread (sometimes called a “reaper thread”,
after the Grim Reaper) that wakes up periodically and checks each
connection thread to see how long it has been inactive. A connection
thread that has been waiting too long for input is terminated, and
a new thread is started in its place. The question of how long
the timeout period should be is another difficult tuning issue.
12.4.5 Distributed Computing
We have seen how threads can be used to
do parallel processing, where a number of processors work together to complete
some task. So far, we have assumed that all the processors were
inside one multi-processor computer. But parallel processing can also
be done using processors that are in different computers, as long as those
computers are connected to a network over which they can communicate.
This type of parallel processing—in which a number of computers
work together on a task and communicate over a network—is called
distributed computing.
In some sense, the whole Internet is an immense distributed computation,
but here I am interested in how computers on a network can cooperate to
solve some computational problem. There are several approaches that can be used
for distributed computing in Java. One general technique that is a
a standard part of Java is RMI (Remote Method Invocation).
RMI enables a program running
on one computer to call methods in objects that exist on other computers.
This makes it possible to design an object-oriented program in which
different parts of the program are executed on different computers.
As is commonly
the case in networking, there is the problem of locating services (where
in this case, a “service” means an object that is available to be called
over the network). That is, how can one computer know which computer a service is located
on and what port it is listening on? RMI solves this problem
using a “request broker”—a
server program running at a known location that keeps a list of services that are
available on other computers. Computers that offer services register
those services with the request broker; computers that need services must know
the location of the broker, and they contact it
to find out what services are available and where they are located.
RMI is a complex system that is not very easy to use. I mention
it here because they are part of Java’s standard network API, but I will not discuss
it further. Instead, we will look at a relatively simple demonstration of
distributed computing that uses only basic networking.
The problem that we will consider is the same one that was used
in Section 12.2 and Section 12.3
for MultiprocessingDemo1.java and its variations, namely the
computation of a complex image. In this case, however, the program is not
a GUI program and the image is not shown on the screen. The computation is one that
uses the simplest type of parallel programming, in which
the problem can be broken down into tasks that can be performed independently,
with no communication between the tasks. To apply distributed computing to
this type of problem, we can use one “master” program that divides the problem
into tasks and sends those tasks over the network to “worker” programs
that do the actual work. The worker programs send their results back to
the master program, which combines the results from all the tasks into
a solution of the overall problem. In this context, the worker programs
are often called “slaves,” and the program uses the so-called
master/slave approach to distributed computing.
The demonstration program is defined by three source code files:
CLMandelbrotMaster.java defines the master program;
CLMandelbrotWorker.java defines the worker programs;
and CLMandelbrotTask.java defines the class that represents
the individual tasks that are performed by the workers. The master divides
the overall problem into a collection of tasks; it distributes
those tasks to the workers that will execute the tasks and send the
results back to the master; and the master applies the results from all
the individual tasks to the overall problem.
To run the
demonstration, you must first start the CLMandelbrotWorker
program on several computers (probably by running it on the command line).
This program uses CLMandelbrotTask,
so both class files, CLMandelbrotWorker.class
and CLMandelbrotTask.class, must be present on the worker
computers. You can then run CLMandelbrotMaster on
the master computer. Note that the master program also requires the
class CLMandelbrotTask.
You must specify the host name or IP address
of each of the worker computers as command line arguments for
CLMandelbrotMaster. The worker programs listen for
connection requests from the master program, and the master
program must be told where to send those requests.
For example, if the worker
program is running on three computers with IP addresses
172.21.7.101, 172.21.7.102, and 172.21.7.103, then you
can run CLMandelbrotMaster with the command
java CLMandelbrotMaster 172.21.7.101 172.21.7.102 172.21.7.103
The master will make a network connection to the worker at each IP
address; these connections will be used for communication between the master
program and the workers.
It is possible to run several copies of CLMandelbrotWorker on
the same computer, but they must listen for network connections on different
ports. It is also possible to run CLMandelbrotWorker on
the same computer as CLMandelbrotMaster. You might even see some
speed-up when you do this, if your computer has several processors. See the
comments in the program source code files for more information, but here are
some commands that you can use to run the master program and two copies of the worker
program on the same computer. Give these commands in separate command windows:
java CLMandelbrotWorker (Listens on default port) java CLMandelbrotWorker 2501 (Listens on port 2501) java CLMandelbrotMaster localhost localhost:2501
Every time CLMandelbrotMaster is run, it solves exactly
the same problem. (For this demonstration, the nature of the problem is not
important, but the problem is to compute the data needed for a picture
of a small piece of the famous “Mandelbrot Set.” If you are interested in
seeing the picture that is produced, uncomment the call to the saveImage()
method at the end of the main() routine in CLMandelbrotMaster.java.)
You can run CLMandelbrotMaster with different numbers of worker programs
to see how the time required to solve the problem depends on the number of workers.
(Note that the worker programs continue to run after the master program exits,
so you can run the master program several times without having to restart the workers.)
In addition, if you run CLMandelbrotMaster with no command line
arguments, it will solve the entire problem on its own, so you can see how long
it takes to do so without using distributed computing. In a trial that I
ran on some very old, slow computers, it took 40 seconds for CLMandelbrotMaster to solve the
problem on its own. Using just one worker, it took 43 seconds. The extra
time represents extra work involved in using the network; it takes time to
set up a network connection and to send messages over the network. Using
two workers (on different computers), the problem was solved in 22 seconds. In this case, each worker
did about half of the work, and their computations were performed in parallel,
so that the job was done in about half the time. With larger numbers of
workers, the time continued to decrease, but only up to a point. The master
program itself has a certain amount of work to do, no matter how many
workers there are, and the total time to solve the problem can never be
less than the time it takes for the master program to do its part. In this
case, the minimum time seemed to be about five seconds.
Let’s take a look at how this distributed application is programmed.
The master program divides the overall problem into a set of tasks.
Each task is represented by an object of type CLMandelbrotTask.
These tasks have to be communicated to the worker programs, and the worker
programs must send back their results. Some protocol is needed for this
communication. I decided to use character streams. The master
encodes a task as a line of text, which is sent to a worker. The worker
decodes the text (into an object of type CLMandelbrotTask)
to find out what task it is supposed to perform.
It performs the assigned task. It encodes the results as another line of
text, which it sends back to the master program. Finally, the master decodes
the results and combines them with the results from other tasks. After
all the tasks have been completed and their results have been combined,
the problem has been solved.
A CLMandelbrotWorker receives
not just one task, but a sequence of tasks. Each time it finishes a task and
sends back the result, it is assigned a new task. After all tasks are completed,
the worker receives a “close” command that tells it to close the connection.
In CLMandelbrotWorker.java, all this is done in
a method named handleConnection() that is called to handle
a connection that has already been opened to the master program. It uses
a method readTask() to decode a task that it receives from
the master and a method writeResults() to encode the results
of the task for transmission back to the master. It must also handle
any errors that occur:
private static void handleConnection(Socket connection) {
try {
BufferedReader in = new BufferedReader(
new InputStreamReader( connection.getInputStream()) );
PrintWriter out = new PrintWriter(connection.getOutputStream());
while (true) {
String line = in.readLine(); // Message from the master.
if (line == null) {
// End-of-stream encountered -- should not happen.
throw new Exception("Connection closed unexpectedly.");
}
if (line.startsWith(CLOSE_CONNECTION_COMMAND)) {
// Represents the normal termination of the connection.
System.out.println("Received close command.");
break;
}
else if (line.startsWith(TASK_COMMAND)) {
// Represents a CLMandelbrotTask that this worker is
// supposed to perform.
CLMandelbrotTask task = readTask(line); // Decode the message.
task.compute(); // Perform the task.
out.println(writeResults(task)); // Send back the results.
out.flush(); // Make sure data is sent promptly!
}
else {
// No other messages are part of the protocol.
throw new Exception("Illegal command received.");
}
}
}
catch (Exception e) {
System.out.println("Client connection closed with error " + e);
}
finally {
try {
connection.close(); // Make sure the socket is closed.
}
catch (Exception e) {
}
}
}
Note that this method is not executed in a separate thread. The
worker has only one thing to do at a time and does not need to be multithreaded.
Turning to the master program, CLMandelbrotMaster.java,
we encounter a more complex situation. The master program must communicate
with several workers over several network connections. To accomplish this,
the master program is multi-threaded, with one thread to manage communication
with each worker. A pseudocode outline of the main() routine
is quite simple:
create the tasks that must be performed and add them to a queue
if there are no command line arguments {
// The master program does all the tasks itself.
Remove each task from the queue and perform it.
}
else {
// The tasks will be performed by worker programs.
for each command line argument:
Get information about a worker from command line argument.
Create and start a thread to send tasks to workers.
Wait for all threads to terminate.
}
// All tasks are now complete (assuming no error occurred).
The tasks are put into a variable of type
ConcurrentBlockingQueue<CLMandelbrotTask> named tasks
(see Subsection 12.3.2.) The
communication threads take tasks from this queue and send them
to worker programs. The method tasks.poll() is used to remove
a task from the queue. If the queue is empty, it returns
null, which acts as a signal that all tasks have
been assigned and the communication thread can terminate.
The job of a thread is to send a sequence of tasks to a worker
thread and to receive the results that the worker sends back. The thread
is also responsible for opening the connection in the first place. A
pseudocode outline for the process executed by the thread might
look like:
Create a socket connected to the worker program.
Create input and output streams for communicating with the worker.
while (true) {
Let task = tasks.poll().
If task == null
break; // All tasks have been assigned.
Encode the task into a message and transmit it to the worker.
Read the response from the worker.
Decode and process the response.
}
Send a "close" command to the worker.
Close the socket.
This would work OK. However, there are a few subtle points. First of
all, the thread must be ready to deal with a network error. For example,
a worker might shut down unexpectedly. But if that happens, the master
program can continue, provided other workers are still available.
(You can try this when you run the program: Stop one of the
worker programs, with CONTROL-C, and observe that the
master program still completes successfully.) A difficulty arises
if an error occurs while the thread is working on a task: If the
problem as a whole is going to be completed, that task will have
to be reassigned to another worker. I take care of this by putting
the uncompleted task back into the task list. (Unfortunately, my
program does not handle all possible errors. If the last worker thread
fails, there will be no one left to take over the uncompleted task.
Also, if a network connection
“hangs” indefinitely without actually generating an error, my program
will also hang, waiting for a response from a worker that will never
arrive. A more robust program would have some way of detecting
the problem and reassigning the task.)
Another defect in the procedure outlined above is that it leaves
the worker program idle while the thread in the master program is processing the
worker’s response. It would be nice to get a new task to the
worker before processing the response from the previous task.
This would keep the worker busy and allow two operations to
proceed simultaneously instead of sequentially. (In this example,
the time it takes to process a response is so short that keeping
the worker waiting while it is done probably makes no significant
difference. But as a general principle, it’s desirable to have
as much parallelism as possible in the algorithm.) We can modify
the procedure to take this into account:
try {
Create a socket connected to the worker program.
Create input and output streams for communicating with the worker.
Let currentTask = tasks.poll()
if (currentTask != null)
Encode currentTask into a message and send it to the worker.
while (currentTask != null) {
Read the response from the worker.
Let nextTask = tasks.poll().
If nextTask != null {
// Send nextTask to the worker before processing the
// response to currentTask.
Encode nextTask into a message and send it to the worker.
}
Decode and process the response to currentTask.
currentTask = nextTask.
}
Send a "close" command to the worker.
Close the socket.
}
catch (Exception e) {
Put uncompleted task, if any, back into the task queue.
}
finally {
Close the connection.
}
To see how this all translates into Java, check out the
WorkerConnection nested class in
CLMandelbrotMaster.java