Java Concurrency Tutorial: Fork-Join Framework

The fork-join framework lets you split a task into smaller pieces, hand those pieces to several worker threads, and then wait for the partial results so they can be combined. It is designed to make good use of a multi-processor machine's capacity. The following are the core concepts and classes used in the fork-join framework.

Fork

Fork is the step in which a task splits itself into smaller, independent subtasks that can be executed concurrently.

Syntax

Sum left = new Sum(array, low, mid);
left.fork();

Here Sum is a subclass of RecursiveTask, and left.fork() schedules the subtask left for asynchronous execution in the pool that is running the current task.

Join

Join is the step in which a task collects the results of its subtasks once they have finished executing; until then, it keeps waiting.

Syntax

left.join();

Here left is an object of the Sum class. The call returns the result computed by left once that subtask is done.
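The fork and join steps usually appear as a pair inside compute(). The following is a minimal sketch of that pairing, not part of the original tutorial: the class names ForkJoinPairDemo, Square and SumOfSquares are hypothetical and chosen only for illustration. One subtask is forked, the other is computed in the current thread, and join() fetches the forked result.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; the names here are illustrative assumptions.
class ForkJoinPairDemo {

   // Leaf task: squares a single number.
   static class Square extends RecursiveTask<Integer> {
      final int n;

      Square(int n) {
         this.n = n;
      }

      @Override
      protected Integer compute() {
         return n * n;
      }
   }

   // Parent task: forks one subtask, computes the other directly, then joins.
   static class SumOfSquares extends RecursiveTask<Integer> {
      final int a;
      final int b;

      SumOfSquares(int a, int b) {
         this.a = a;
         this.b = b;
      }

      @Override
      protected Integer compute() {
         Square left = new Square(a);
         Square right = new Square(b);
         left.fork();                       // schedule left asynchronously
         int rightResult = right.compute(); // run right in the current thread
         int leftResult = left.join();      // wait for left and take its result
         return leftResult + rightResult;
      }
   }

   static int sumOfSquares(int a, int b) {
      return ForkJoinPool.commonPool().invoke(new SumOfSquares(a, b));
   }

   public static void main(String[] args) {
      System.out.println(sumOfSquares(3, 4)); // prints 25
   }
}
```

Computing one half directly instead of forking both halves keeps the current worker thread busy rather than leaving it idle while it waits.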

ForkJoinPool

It is a special thread pool designed to work with fork-and-join task splitting.

Syntax

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

Here a new ForkJoinPool is created with a parallelism level of 4, meaning the pool targets four worker threads running at once.
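As a rough illustration of how a pool might be created, queried and shut down, here is a small sketch. The class PoolDemo and the trivial task Answer are hypothetical names introduced for this example; getParallelism(), invoke(), shutdown() and getCommonPoolParallelism() are standard ForkJoinPool methods.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; not part of the original tutorial.
class PoolDemo {

   // Trivial task used only to show that the pool can run something.
   static class Answer extends RecursiveTask<Integer> {
      @Override
      protected Integer compute() {
         return 42;
      }
   }

   // Creates a pool with the requested parallelism and reports what it holds.
   static int parallelismOf(int requested) {
      ForkJoinPool pool = new ForkJoinPool(requested);
      try {
         return pool.getParallelism();
      } finally {
         pool.shutdown(); // release the worker threads
      }
   }

   // Runs the trivial task on a dedicated pool and returns its result.
   static int runAnswer() {
      ForkJoinPool pool = new ForkJoinPool(2);
      try {
         return pool.invoke(new Answer());
      } finally {
         pool.shutdown();
      }
   }

   public static void main(String[] args) {
      System.out.println(parallelismOf(4)); // prints 4
      System.out.println(runAnswer());      // prints 42
      // The shared common pool sizes itself from the machine, so this varies.
      System.out.println(ForkJoinPool.getCommonPoolParallelism());
   }
}
```

When no particular parallelism level is needed, ForkJoinPool.commonPool() is often enough and avoids managing the pool's lifecycle by hand.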

RecursiveAction

RecursiveAction represents a task which does not return any value.

Syntax

class Writer extends RecursiveAction {
   @Override
   protected void compute() { }
}
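Because a RecursiveAction returns nothing, it is typically used for work that has a side effect, such as updating an array in place. The sketch below is one possible illustration under that assumption: the names DoubleAllDemo, DoubleAll and THRESHOLD are hypothetical, and the threshold value is arbitrary. It uses the inherited invokeAll() method, which runs both halves and returns once both are complete.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical demo class; not part of the original tutorial.
class DoubleAllDemo {

   // Doubles every element of array in the range [low, high) in place.
   static class DoubleAll extends RecursiveAction {
      static final int THRESHOLD = 4; // arbitrary cut-off for this sketch
      final int[] array;
      final int low;
      final int high;

      DoubleAll(int[] array, int low, int high) {
         this.array = array;
         this.low = low;
         this.high = high;
      }

      @Override
      protected void compute() {
         if (high - low <= THRESHOLD) {
            // Small enough: do the work directly.
            for (int i = low; i < high; i++) {
               array[i] *= 2;
            }
         } else {
            // Split in half; invokeAll returns when both halves are done.
            int mid = low + (high - low) / 2;
            invokeAll(new DoubleAll(array, low, mid),
                      new DoubleAll(array, mid, high));
         }
      }
   }

   // Returns a doubled copy, leaving the caller's array untouched.
   static int[] doubled(int[] input) {
      int[] copy = input.clone();
      ForkJoinPool.commonPool().invoke(new DoubleAll(copy, 0, copy.length));
      return copy;
   }

   public static void main(String[] args) {
      int[] result = doubled(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
      System.out.println(Arrays.toString(result));
      // prints [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
   }
}
```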

RecursiveTask

RecursiveTask represents a task which returns a value.

Syntax

class Sum extends RecursiveTask<Long> {
   @Override
   protected Long compute() { return null; }
}
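To show a RecursiveTask whose compute() returns something meaningful, here is a small sketch that finds the largest element of an array. The names MaxDemo, Max and THRESHOLD are hypothetical and the threshold is arbitrary; the result type is the type parameter given to RecursiveTask, here Integer.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; not part of the original tutorial.
class MaxDemo {

   // Finds the largest element of array in the range [low, high).
   static class Max extends RecursiveTask<Integer> {
      static final int THRESHOLD = 8; // arbitrary cut-off for this sketch
      final int[] array;
      final int low;
      final int high;

      Max(int[] array, int low, int high) {
         this.array = array;
         this.low = low;
         this.high = high;
      }

      @Override
      protected Integer compute() {
         if (high - low <= THRESHOLD) {
            // Small enough: scan the range directly.
            int max = Integer.MIN_VALUE;
            for (int i = low; i < high; i++) {
               max = Math.max(max, array[i]);
            }
            return max;
         }
         int mid = low + (high - low) / 2;
         Max left = new Max(array, low, mid);
         Max right = new Max(array, mid, high);
         left.fork();                       // run the left half asynchronously
         int rightMax = right.compute();    // run the right half here
         int leftMax = left.join();         // wait for the left half
         return Math.max(leftMax, rightMax);
      }
   }

   // Assumes a non-empty array; an empty one yields Integer.MIN_VALUE.
   static int maxOf(int[] values) {
      return ForkJoinPool.commonPool().invoke(new Max(values, 0, values.length));
   }

   public static void main(String[] args) {
      int[] values = {3, 9, -2, 7, 9, 1, 0, 5, 4, 8, 6, 2};
      System.out.println(maxOf(values)); // prints 9
   }
}
```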

Example

The following TestThread program shows the usage of the fork-join framework in a thread-based environment.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException, 
      ExecutionException {
      
      // Use one worker thread per available processor.
      int nThreads = Runtime.getRuntime().availableProcessors();
      System.out.println(nThreads);
      
      // Fill the array with the values 0..999.
      int[] numbers = new int[1000]; 

      for(int i = 0; i < numbers.length; i++) {
         numbers[i] = i;
      }

      ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
      Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
      System.out.println(result);
   }  

   // Sums the elements of array in the range [low, high).
   static class Sum extends RecursiveTask<Long> {
      int low;
      int high;
      int[] array;

      Sum(int[] array, int low, int high) {
         this.array = array;
         this.low   = low;
         this.high  = high;
      }

      @Override
      protected Long compute() {
         
         if(high - low <= 10) {
            // Small enough: sum the range directly.
            long sum = 0;
            
            for(int i = low; i < high; ++i) {
               sum += array[i];
            }
            return sum;
         } else {
            // Split the range in half and process both halves.
            int mid = low + (high - low) / 2;
            Sum left  = new Sum(array, low, mid);
            Sum right = new Sum(array, mid, high);
            left.fork();
            long rightResult = right.compute();
            long leftResult  = left.join();
            return leftResult + rightResult;
         }
      }
   }
}

This will produce the following result. The first line is the number of available processors, so it varies from machine to machine.

Output

32
499500
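The second output line can be cross-checked: the array holds 0 through 999, so its sum is 999 * 1000 / 2 = 499500. The sketch below compares a fork-join sum with a plain sequential sum and with that closed form. The names SumCheck and RangeSum are hypothetical; RangeSum only mirrors the shape of the summing task in the example and is not the tutorial's own class.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

// Hypothetical demo class; not part of the original tutorial.
class SumCheck {

   // Sums array elements in [low, high), splitting ranges longer than 10.
   static class RangeSum extends RecursiveTask<Long> {
      final int[] array;
      final int low;
      final int high;

      RangeSum(int[] array, int low, int high) {
         this.array = array;
         this.low = low;
         this.high = high;
      }

      @Override
      protected Long compute() {
         if (high - low <= 10) {
            long sum = 0;
            for (int i = low; i < high; i++) {
               sum += array[i];
            }
            return sum;
         }
         int mid = low + (high - low) / 2;
         RangeSum left = new RangeSum(array, low, mid);
         RangeSum right = new RangeSum(array, mid, high);
         left.fork();
         long rightResult = right.compute();
         long leftResult = left.join();
         return leftResult + rightResult;
      }
   }

   // Fork-join sum of 0..n-1.
   static long forkJoinSum(int n) {
      int[] numbers = new int[n];
      for (int i = 0; i < n; i++) {
         numbers[i] = i;
      }
      return ForkJoinPool.commonPool().invoke(new RangeSum(numbers, 0, n));
   }

   // Sequential sum of 0..n-1, used as a reference value.
   static long sequentialSum(int n) {
      return LongStream.range(0, n).sum();
   }

   public static void main(String[] args) {
      System.out.println(forkJoinSum(1000));   // prints 499500
      System.out.println(sequentialSum(1000)); // prints 499500
      System.out.println(999L * 1000 / 2);     // prints 499500
   }
}
```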