In this series we will discuss different concurrent structures provided in the new Java 5 Concurrent package. We will start with a simple parallel processing concept called "barrier" that we have all studied and forgotten. By definition if there are set of independent tasks (T1..Tn) that need to fully completed to initiate another task (Tx) which is dependent on all these tasks (T1...Tn) we can synchronize them using a barrier.
why is the barrier not more commonly used?
The functionality is simple enough that it can be accomplished with the low-level tools provided by Java. We can solve the coordination problem in two ways, without using a barrier.
First, we can simply have the threads wait on a condition variable. The last thread releases the barrier by notifying all of the other threads.
A second option is to simply await termination of the threads by using the join() method. Once all threads have been joined, we can start new threads for the next phase of the program
However, in some cases it is preferable to use barriers. When using the join() method, threads are exiting and we're starting new ones. Therefore, the threads lose any state that they have stored in their previous thread object; they need to store that state prior to terminating. Furthermore, if we must always create new threads, logical operations cannot be placed together; since new threads have to be created for each subtask, the code for each subtask must be placed in separate run() methods. It may be easier to code all of the logic as one method, particularly if the subtasks are very small.
CyclicBarrier – a pair of threads computes the sum of all numbers in a
matrix by iterating over neighbour rows. Imagine you have a matrix (with an
even number of columns). You want to sum all its elements using 2 threads. The
simple way is to run 2 threads from inside main () to process rows 0 and 1, wait
(using busy-wait) until computation is complete; sum the resulting values; then
run 2 threads on rows 2 and 3, wait, sum and so on. This is demonstrated in
cyclsimple example
Below is the example:
package Barrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample
{
private static int matrix[][] =
{
{ 1 },
{ 2, 2 },
{ 3, 3, 3 },
{ 4, 4, 4, 4 },
{ 5, 5, 5, 5, 5 } };
private static int results[];
private static class Summer extends Thread
{
int row;
CyclicBarrier barrier;
Summer(CyclicBarrier barrier, int row)
{
this.barrier = barrier;
this.row = row;
}
public void run()
{
int columns = matrix[row].length;
int sum = 0;
for (int i = 0; i < columns; i++)
{
sum += matrix[row][i];
}
results[row] = sum;
System.out.println("Results for row " + row + " are : " + sum);
// wait for others
try
{
barrier.await();
} catch (InterruptedException ex)
{
ex.printStackTrace();
} catch (BrokenBarrierException ex)
{
ex.printStackTrace();
}
}
}
public static void main(String args[])
{
final int rows = matrix.length;
results = new int[rows];
Runnable merger = new Runnable()
{
public void run()
{
int sum = 0;
for (int i = 0; i < rows; i++)
{
sum += results[i];
}
System.out.println("Results are: " + sum);
}
};
/*
* public CyclicBarrier(int parties,Runnable barrierAction)
* Creates a new CyclicBarrier that will trip when the given number
* of parties (threads) are waiting upon it, and which will execute
* the merger task when the barrier is tripped, performed
* by the last thread entering the barrier.
*/
CyclicBarrier barrier = new CyclicBarrier(rows, merger);
for (int i = 0; i < rows; i++)
{
new Summer(barrier, i).start();
}
System.out.println("Waiting...");
}
}
output :
To access the source code:
https://bitbucket.org/nkancharla/javaexamples/src/bf0dcc13573e0782807788b99e934b64f592dd41/Barrier?at=master
why is the barrier not more commonly used?
The functionality is simple enough that it can be accomplished with the low-level tools provided by Java. We can solve the coordination problem in two ways, without using a barrier.
First, we can simply have the threads wait on a condition variable. The last thread releases the barrier by notifying all of the other threads.
A second option is to simply await termination of the threads by using the join() method. Once all threads have been joined, we can start new threads for the next phase of the program
However, in some cases it is preferable to use barriers. When using the join() method, threads are exiting and we're starting new ones. Therefore, the threads lose any state that they have stored in their previous thread object; they need to store that state prior to terminating. Furthermore, if we must always create new threads, logical operations cannot be placed together; since new threads have to be created for each subtask, the code for each subtask must be placed in separate run() methods. It may be easier to code all of the logic as one method, particularly if the subtasks are very small.
CyclicBarrier – a pair of threads computes the sum of all numbers in a
matrix by iterating over neighbour rows. Imagine you have a matrix (with an
even number of columns). You want to sum all its elements using 2 threads. The
simple way is to run 2 threads from inside main () to process rows 0 and 1, wait
(using busy-wait) until computation is complete; sum the resulting values; then
run 2 threads on rows 2 and 3, wait, sum and so on. This is demonstrated in
cyclsimple example
Below is the example:
package Barrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample
{
private static int matrix[][] =
{
{ 1 },
{ 2, 2 },
{ 3, 3, 3 },
{ 4, 4, 4, 4 },
{ 5, 5, 5, 5, 5 } };
private static int results[];
private static class Summer extends Thread
{
int row;
CyclicBarrier barrier;
Summer(CyclicBarrier barrier, int row)
{
this.barrier = barrier;
this.row = row;
}
public void run()
{
int columns = matrix[row].length;
int sum = 0;
for (int i = 0; i < columns; i++)
{
sum += matrix[row][i];
}
results[row] = sum;
System.out.println("Results for row " + row + " are : " + sum);
// wait for others
try
{
barrier.await();
} catch (InterruptedException ex)
{
ex.printStackTrace();
} catch (BrokenBarrierException ex)
{
ex.printStackTrace();
}
}
}
public static void main(String args[])
{
final int rows = matrix.length;
results = new int[rows];
Runnable merger = new Runnable()
{
public void run()
{
int sum = 0;
for (int i = 0; i < rows; i++)
{
sum += results[i];
}
System.out.println("Results are: " + sum);
}
};
/*
* public CyclicBarrier(int parties,Runnable barrierAction)
* Creates a new CyclicBarrier that will trip when the given number
* of parties (threads) are waiting upon it, and which will execute
* the merger task when the barrier is tripped, performed
* by the last thread entering the barrier.
*/
CyclicBarrier barrier = new CyclicBarrier(rows, merger);
for (int i = 0; i < rows; i++)
{
new Summer(barrier, i).start();
}
System.out.println("Waiting...");
}
}
output :
Results for row 0 are : 1
Waiting...
Results for row 2 are : 9
Results for row 4 are : 25
Results for row 3 are : 16
Results for row 1 are : 4
Results are: 55To access the source code:
https://bitbucket.org/nkancharla/javaexamples/src/bf0dcc13573e0782807788b99e934b64f592dd41/Barrier?at=master