Java Parallel Streams

In this post, we will look at Java parallel streams. Java 8 introduced parallel streams as part of the Stream API to make parallel processing easier and to help us leverage multi-core machines.

Java Parallel Streams – introduction:

We use parallelism in programming to leverage multi-core machines. In simple words, it divides one problem into subproblems, solves the subproblems separately and in parallel, and then joins the partial results to get the final result. The Java Stream API provides an easy way to achieve this: a stream can run in parallel mode, and such a stream is called a parallel stream. In this post, we will learn how Java parallel streams work.

1. Parallel vs Sequential Stream.

Following are the two main differences between sequential and parallel streams:

  1. A sequential stream executes a task on a single thread, i.e. it processes the elements one after another like a `for` loop. A parallel stream divides one task into multiple sub-tasks, uses different cores (if available) to execute the sub-tasks in parallel, and joins the results of the sub-tasks to get the final result.
  2. Different methods are used to create sequential and parallel streams. Collection.stream() returns a sequential stream and Collection.parallelStream() returns a (possibly) parallel stream. Similarly, the BaseStream interface provides a sequential() method to switch a stream to sequential mode and a parallel() method to switch it to parallel mode.
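As a minimal sketch of the second point, the snippet below checks the mode of a stream with isParallel(). The class name StreamModes and the sample list are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

class StreamModes {
    // Returns the mode (true = parallel) of four differently created streams.
    static boolean[] modes(List<String> names) {
        return new boolean[] {
            names.stream().isParallel(),                      // sequential by default
            names.parallelStream().isParallel(),              // parallel from the start
            names.stream().parallel().isParallel(),           // switched to parallel mode
            names.parallelStream().sequential().isParallel()  // switched back to sequential mode
        };
    }

    public static void main(String[] args) {
        // Prints false, true, true, false (one value per line).
        for (boolean parallel : modes(Arrays.asList("a", "b", "c"))) {
            System.out.println(parallel);
        }
    }
}
```

Note that parallel() and sequential() only set a flag on the stream pipeline. The mode that is in effect when the terminal operation starts applies to the whole pipeline.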

If, during a parallel execution, there are more sub-tasks than available worker threads, the extra sub-tasks are queued. They wait for the running sub-tasks to finish. Once a running sub-task is completed, its worker thread picks up a new sub-task from the queue. Sequential and parallel streams behave differently, but both are easy to create. Note that running a stream in parallel does not mean it is always better than running it sequentially. It depends on the problem you are trying to solve.
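To get a feeling for how many sub-tasks can run at the same time on your machine, the following sketch prints the number of available processors and the parallelism of the common fork-join pool that parallel streams use by default. The class and method names are hypothetical. On typical setups the pool parallelism is one less than the processor count, because the calling thread also takes part in the work, but treat that as an assumption to verify on your JVM.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    // Number of processors the JVM reports as available.
    static int availableCores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the shared pool used by parallel streams.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors    : " + availableCores());
        System.out.println("common pool parallelism : " + commonPoolParallelism());
    }
}
```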

2. Different examples of parallel streams.

2.1. Example using BaseStream.parallel() method:

There are different ways to create a parallel stream. The parallel() method of the BaseStream interface is an easy way to do that. Let’s look at the below example:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.stream.Stream;

class Main {
    public static void main(String[] args) throws IOException {

        File file = new File("/Users/user/IdeaProjects/SampleProgram/src/readme.md");
        // Files.lines keeps the file open, so close the stream with try-with-resources.
        try (Stream<String> lines = Files.lines(file.toPath())) {
            // Switch the stream to parallel mode and print each line.
            lines.parallel().forEach(System.out::println);
        }
    }
}

In this example, we are reading the lines of a markdown file and using the forEach method to print them. Before calling forEach, we call the parallel() method to switch the stream of lines to parallel mode. If you execute the program, it may print the lines in an unpredictable order that can change from run to run. Change the path of the file before executing this program.

This is because parallel processing divides the stream into sub-streams and processes them in parallel, and forEach does not guarantee the encounter order on a parallel stream. If you remove the parallel() call, the stream runs sequentially and prints the lines in sequence.
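If the order matters but you still want a parallel pipeline, forEachOrdered is one option. The sketch below uses a small in-memory list instead of a file so that it runs anywhere. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class OrderedParallelDemo {
    // Visits every element of a parallel stream in encounter order and records the visits.
    static List<String> visitInEncounterOrder(List<String> lines) {
        List<String> visited = Collections.synchronizedList(new ArrayList<>());
        lines.parallelStream().forEachOrdered(visited::add);
        return visited;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("line 1", "line 2", "line 3", "line 4");

        // forEach: the order may differ from run to run.
        lines.parallelStream().forEach(System.out::println);

        // forEachOrdered: always line 1, line 2, line 3, line 4.
        lines.parallelStream().forEachOrdered(System.out::println);
    }
}
```

Keep in mind that forEachOrdered gives up part of the parallel speed-up for this last step, because the action has to be applied to one element at a time.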

2.2. Example using Collection.parallelStream() method:

The Collection interface provides a method called parallelStream(). We can call it on a collection to get a parallel stream over its elements. For example:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

class Main {
    public static void main(String[] args) throws IOException {

        File file = new File("/Users/user/IdeaProjects/SampleProgram/src/readme.md");

        List<String> lines = Files.readAllLines(file.toPath());
        lines.parallelStream().forEach(System.out::println);
    }
}

This is like the above example: we are again reading the lines of a file. The only difference is that we use readAllLines to read the lines into a list of strings and then call parallelStream() to get a parallel stream from that list.

It will print output similar to the above example, i.e. it may print the lines of that file in an unpredictable order.

2.3. Example with custom class objects:

Let’s try to use a parallel stream with a custom class:

import java.util.ArrayList;
import java.util.List;

class Student {
    int age;
    String name;

    Student(int age, String name) {
        this.age = age;
        this.name = name;
    }
}

public class ParallelStreamCustom {
    public static void main(String[] args) {
        List<Student> studentList = new ArrayList<>();
        studentList.add(new Student(20, "Alex"));
        studentList.add(new Student(21, "Bob"));
        studentList.add(new Student(22, "Chandler"));
        studentList.add(new Student(19, "Daisy"));

        // Print the name of each student; the order is not guaranteed on a parallel stream.
        studentList.parallelStream().forEach(s -> System.out.println(s.name));
    }
}

In this example, we have created a class Student with two properties: age and name. age is an int and name is a String. The constructor of this class takes the values of age and name and creates one object of this class.

Inside the main method, a new ArrayList studentList is created and filled with four different Student objects. If you execute this program, it will print the names of the students in an unpredictable order, for example:

Chandler
Daisy
Bob
Alex
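The unpredictable order comes from forEach, not from parallel streams in general. As a hedged sketch, the snippet below collects the names into a list instead. For an ordered source such as a List, collect(Collectors.toList()) keeps the encounter order even in parallel mode. The StudentNames class and the namesOfAtLeast method are hypothetical helpers, and Student is nested only to keep the sketch self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StudentNames {
    // Same shape as the Student class above.
    static class Student {
        final int age;
        final String name;

        Student(int age, String name) {
            this.age = age;
            this.name = name;
        }
    }

    // Filters and maps in parallel, then collects the names in encounter order.
    static List<String> namesOfAtLeast(List<Student> students, int minAge) {
        return students.parallelStream()
                .filter(s -> s.age >= minAge)
                .map(s -> s.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student(20, "Alex"),
                new Student(21, "Bob"),
                new Student(22, "Chandler"),
                new Student(19, "Daisy"));

        System.out.println(namesOfAtLeast(students, 20)); // [Alex, Bob, Chandler]
    }
}
```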

3. Parallel Stream Performance:

Java introduced parallel streams to improve the performance of an application using parallelism. On a multi-core system, they can speed up execution, but it is not always a wise choice to move to a parallel stream.

Let’s do a micro-benchmark of a traditional for loop, a sequential stream and a parallel stream. Benchmarking is difficult and error-prone. The process we are using below will not give us accurate execution times, but we will get some relative values. Every time you run this code, you will get a different result. (Use a dedicated benchmarking tool if you are interested in more accurate results.)

Here, we are trying to find the maximum value in a list of 8000000 numbers. We fill this list with random numbers between 0 (inclusive) and 100000 (exclusive). We are using one for loop, one sequential stream and one parallel stream to find the maximum value. Also, we are using System.currentTimeMillis() to record the time before and after each operation.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class StreamPerformance {

    public static void main(String[] args) {
        List<Integer> intList = new ArrayList<>();
        Random r = new Random();

        for (int i = 0; i < 8000000; i++) {
            intList.add(r.nextInt(100000));
        }

        int maxValue = Integer.MIN_VALUE;

        long startTime = System.currentTimeMillis();
        for (Integer integer: intList) {
            if (integer > maxValue) {
                maxValue = integer;
            }
        }
        long endTime = System.currentTimeMillis();

        System.out.println("for loop : max value = " + maxValue + ", time(ms): " + (endTime - startTime));

        long streamMaxValue;
        long startTimeStream = System.currentTimeMillis();
        streamMaxValue = intList.stream().reduce(Integer.MIN_VALUE, Math::max);
        long endTimeStream = System.currentTimeMillis();

        System.out.println("stream : max value = " + streamMaxValue + ", time(ms): " + (endTimeStream - startTimeStream));

        long pStreamMaxValue;
        long startTimePStream = System.currentTimeMillis();
        pStreamMaxValue = intList.parallelStream().reduce(Integer.MIN_VALUE, Math::max);
        long endTimePStream = System.currentTimeMillis();

        System.out.println("Parallel stream : max value = " + pStreamMaxValue + ", time(ms): " + (endTimePStream - startTimePStream));
    }
}

Executing the program will print output like below:

for loop : max value = 99999, time(ms): 24
stream : max value = 99999, time(ms): 186
Parallel stream : max value = 99999, time(ms): 71

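In this run, the parallel stream is faster than the sequential stream, but both are slower than the for loop. Since each operation is timed only once, one-off costs such as class loading and JIT warm-up are included in the measurements, so treat these numbers as rough indications. The result also depends on the hardware you are running on.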
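One way to make such a comparison a little less noisy, without reaching for a benchmarking framework, is to run every variant a few times before timing it and to keep the best of several timed runs. The sketch below does that. The class name, the helper methods and the choice of five warm-up and five timed runs are assumptions made for illustration, and the numbers it prints are still only rough indications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

class WarmedUpBenchmark {
    static int loopMax(List<Integer> values) {
        int max = Integer.MIN_VALUE;
        for (int value : values) {
            if (value > max) {
                max = value;
            }
        }
        return max;
    }

    static int streamMax(List<Integer> values) {
        return values.stream().reduce(Integer.MIN_VALUE, Integer::max);
    }

    static int parallelStreamMax(List<Integer> values) {
        return values.parallelStream().reduce(Integer.MIN_VALUE, Integer::max);
    }

    // Runs the task five times untimed, then returns the best of five timed runs in ms.
    static long bestTimeMillis(Supplier<Integer> task) {
        for (int i = 0; i < 5; i++) {
            task.get();
        }
        long best = Long.MAX_VALUE;
        for (int i = 0; i < 5; i++) {
            long start = System.nanoTime();
            task.get();
            best = Math.min(best, (System.nanoTime() - start) / 1_000_000);
        }
        return best;
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 8_000_000; i++) {
            values.add(random.nextInt(100_000));
        }

        System.out.println("for loop        time(ms): " + bestTimeMillis(() -> loopMax(values)));
        System.out.println("stream          time(ms): " + bestTimeMillis(() -> streamMax(values)));
        System.out.println("parallel stream time(ms): " + bestTimeMillis(() -> parallelStreamMax(values)));
    }
}
```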

4. When not to use a parallel stream:

It may look like using a parallel stream in Java improves the performance significantly, but there is a downside. The gain depends on different factors like the data source, how easy it is to split the source (which a parallel stream requires), etc.

The main problem with a parallel stream is that it uses the JVM’s common fork-join thread pool: ForkJoinPool.commonPool(). If we execute long-running tasks on a parallel stream, there is a high probability that they will occupy all threads in the common pool. Consequently, they will block all other parallel stream operations in the application.
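You can observe the shared pool by recording which threads execute a parallel stream. This is a minimal sketch with a hypothetical class name. On common JDK builds the worker threads have names starting with ForkJoinPool.commonPool-worker, and the calling thread (for example main) usually takes part as well, but the exact names are an implementation detail.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PoolThreadNames {
    // Collects the distinct names of all threads that processed at least one element.
    static Set<String> threadNames() {
        return IntStream.range(0, 10_000)
                .parallel()
                .mapToObj(i -> Thread.currentThread().getName())
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        // The printed names depend on the machine and on the JDK.
        threadNames().forEach(System.out::println);
    }
}
```

Every parallel stream in the application that does not run inside another pool competes for the same worker threads.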

Let’s consider the below example snippet:

private WeatherModel getWeather(String name){
    // network call
}

// Assumes: import static java.util.stream.Collectors.toList;
private List<WeatherModel> getWeatherData(Stream<String> cityNames) {
    return cityNames.parallel()
        .map(e -> this.getWeather(e))
        .collect(toList());
}

Here, getWeatherData is used to get the weather data for a list of cities. We switch the cityNames stream to parallel mode, and getWeather is called for each city name. The getWeather method makes one network call to get details about the weather. A network call is not a CPU-intensive operation, and we can make multiple network calls in parallel. But if the network calls take a long time to complete because of a bad network or any other reason, they will tie up the threads in the common fork-join thread pool.

Even if only one of the network calls takes a long time to complete and the rest finish sooner, the stream has to wait for all of them to complete, affecting the overall performance of the application. Hence, we should make sure that tasks submitted to a parallel stream never get stuck and complete in a reasonable time. Another option is to use a custom thread pool, as explained in the next section.

5. Custom thread pool with a parallel stream:

The above example shows the biggest limitation of parallel streams, i.e. they use the common shared thread pool. To avoid this problem, we can run the stream inside a custom ForkJoinPool. If we use a custom thread pool with the above example, it will look like below:

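// Assumes: import static java.util.stream.Collectors.toList;
private List<WeatherModel> getCurrentWeather(Stream<String> cityNames) throws InterruptedException, ExecutionException {
    ForkJoinPool customPool = new ForkJoinPool(4);
    try {
        // Start the terminal operation from inside the custom pool.
        return customPool.submit(() -> cityNames.parallel()
            .map(this::getWeather)
            .collect(toList())).get();
    } finally {
        // Release the worker threads once the work is done.
        customPool.shutdown();
    }
}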

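Note that get() may throw InterruptedException or ExecutionException. In this example, we have created a ForkJoinPool with parallelism level 4, i.e. it targets up to 4 worker threads running the tasks at a time. It requires some testing to find the optimal parallelism level. Also note that running a parallel stream inside a custom pool relies on the current behaviour of the stream implementation, not on a documented guarantee of the Stream API.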
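For a version you can run without a weather service, here is a hedged sketch of the same idea. The fetchWeather method is a hypothetical stand-in that only sleeps for a moment and returns a made-up string. The class name, the city names and the parallelism level are assumptions as well.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class CustomPoolDemo {
    // Hypothetical stand-in for the network call: waits briefly and returns a fake report.
    static String fetchWeather(String city) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return city + ": sunny";
    }

    // Runs the parallel stream from inside a dedicated pool instead of the common pool.
    static List<String> fetchAll(List<String> cities, int parallelism)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> cities.parallelStream()
                    .map(CustomPoolDemo::fetchWeather)
                    .collect(Collectors.toList())).get();
        } finally {
            // Always release the worker threads of the custom pool.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<String> cities = Arrays.asList("Paris", "Tokyo", "Lima", "Cairo");
        fetchAll(cities, 4).forEach(System.out::println);
    }
}
```

Creating a pool per call keeps the sketch short. In a real application, a single long-lived pool that is shut down with the application is usually the more economical choice.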

Summary:

In this post, we looked at Java parallel streams. A parallel stream works better if there is a large set of data to process, because it has to split the source into sub-streams, hand them to the worker threads of a fork-join pool and combine the partial results. A sequential stream uses only one thread to complete processing. For these reasons, a parallel stream has more overhead than a sequential stream.

Also, running a stream in parallel is not always a good idea, as we have seen above. If you think that a sequential stream in your code is not performing well, do a benchmark before moving to a parallel stream.

The best time to move to a parallel stream is when the sequential stream performs poorly, we have a very large set of data to process and the benchmark shows a significant improvement.
