如何高效的使用并行流_java8_Silently9527_InfoQ 写作社区

本文由 简悦 SimpRead 转码, 原文地址 xie.infoq.cn

在 Java7 之前想要并行处理大量数据是很困难的,首先把数据拆分成很多个部分,然后把这这些子部分放入到每个线程中去执行计算逻辑,最后在把每个线程返回的计算结果进行合并操作;在 Java7 中提供了一个处理大

在 Java7 之前想要并行处理大量数据是很困难的,首先把数据拆分成很多个部分,然后把这这些子部分放入到每个线程中去执行计算逻辑,最后在把每个线程返回的计算结果进行合并操作;在 Java7 中提供了一个处理大数据的 fork/join 框架,屏蔽掉了线程之间交互的处理,更加专注于数据的处理。

Fork/Join 框架

Fork/Join 框架采用的是思想就是分而治之,把大的任务拆分成小的任务,然后放入到独立的线程中去计算,同时为了最大限度的利用多核 CPU,采用了一个种工作窃取的算法来运行任务,也就是说当某个线程处理完自己工作队列中的任务后,尝试当其他线程的工作队列中窃取一个任务来执行,直到所有任务处理完毕。所以为了减少线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行;在百度找了一张图

https://static001.geekbang.org/infoq/6e/6ebd3bdeb6ffeec79d871eaa011048ab.png

  • 使用RecursiveTask

使用 Fork/Join 框架首先需要创建自己的任务,需要继承RecursiveTask,实现抽象方法

1
protected abstract V compute();

复制代码

实现类需要在该方法中实现任务的拆分,计算,合并;伪代码可以表示成这样:

1
if(任务已经不可拆分){    return 顺序计算结果;} else {    1.任务拆分成两个子任务    2.递归调用本方法,拆分子任务    3.等待子任务执行完成    4.合并子任务的结果}

复制代码

  • Fork/Join 实战

任务:完成对一亿个自然数求和

我们先使用串行的方式实现,代码如下:

1
long result = LongStream.rangeClosed(1, 100000000)                .reduce(0, Long::sum);System.out.println("result:" + result);

复制代码

使用 Fork/Join 框架实现,代码如下:

1
public class SumRecursiveTask extends RecursiveTask<Long> {    private long[] numbers;    private int start;    private int end;    public SumRecursiveTask(long[] numbers) {        this.numbers = numbers;        this.start = 0;        this.end = numbers.length;    }    public SumRecursiveTask(long[] numbers, int start, int end) {        this.numbers = numbers;        this.start = start;        this.end = end;    }    @Override    protected Long compute() {        int length = end - start;        if (length < 20000) {  //小于20000个就不在进行拆分            return sum();        }        SumRecursiveTask leftTask = new SumRecursiveTask(numbers, start, start + length / 2); //进行任务拆分        SumRecursiveTask rightTask = new SumRecursiveTask(numbers, start + (length / 2), end); //进行任务拆分        leftTask.fork(); //把该子任务交友ForkJoinPoll线程池去执行        rightTask.fork(); //把该子任务交友ForkJoinPoll线程池去执行        return leftTask.join() + rightTask.join(); //把子任务的结果相加    }    private long sum() {        int sum = 0;        for (int i = start; i < end; i++) {            sum += numbers[i];        }        return sum;    }    public static void main(String[] args) {        long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();        Long result = new ForkJoinPool().invoke(new SumRecursiveTask(numbers));        System.out.println("result:" +result);    }}

复制代码

Fork/Join 默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().available- Processors()得到的。 但是你可以通过系统属性java.util.concurrent.ForkJoinPool.common. parallelism来改变线程池大小,如下所示: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12"); 这是一个全局设置,因此它将影响代码中所有的并行流。目前还无法专为某个 并行流指定这个值。因为会影响到所有的并行流,所以在任务中经历避免网络 / IO 操作,否则可能会拖慢其他并行流的运行速度

parallelStream

以上我们说到的都是在 Java7 中使用并行流的操作,Java8 并没有止步于此,为我们提供更加便利的方式,那就是parallelStreamparallelStream底层还是通过 Fork/Join 框架来实现的。

  • 常见的使用方式
  1. 串行流转化成并行流
1
LongStream.rangeClosed(1,1000)                .parallel()                .forEach(System.out::println);

复制代码

  1. 直接生成并行流
1
List<Integer> values = new ArrayList<>();        for (int i = 0; i < 10000; i++) {            values.add(i);        }        values.parallelStream()                .forEach(System.out::println);

复制代码

  • 正确的使用 parallelStream

我们使用parallelStream来实现上面的累加例子看看效果,代码如下:

1
public static void main(String[] args) {    Summer summer = new Summer();    LongStream.rangeClosed(1, 100000000)            .parallel()            .forEach(summer::add);    System.out.println("result:" + summer.sum);}static class Summer {    public long sum = 0;    public void add(long value) {        sum += value;    }}

复制代码

运行结果如下:

https://static001.geekbang.org/infoq/67/6756af09b85de9f4aa46ce4bae7f1e6f.png

运行之后,我们发现运行的结果不正确,并且每次运行的结果都不一样,这是为什么呢?

这里其实就是错用parallelStream常见的情况,parallelStream是非线程安全的,在这个里面中使用多个线程去修改了共享变量 sum, 执行了sum += value操作,这个操作本身是非原子性的,所以在使用并行流时应该避免去修改共享变量。

修改上面的例子,正确使用parallelStream来实现,代码如下:

1
long result = LongStream.rangeClosed(1, 100000000)        .parallel()        .reduce(0, Long::sum);System.out.println("result:" + result);

复制代码

在前面我们已经说过了 fork/join 的操作流程是:拆子部分,计算,合并结果;因为parallelStream底层使用的也是 fork/join 框架,所以这些步骤也是需要做的,但是从上面的代码,我们看到Long::sum做了计算,reduce做了合并结果,我们并没有去做任务的拆分,所以这个过程肯定是parallelStream已经帮我们实现了,这个时候就必须的说说Spliterator

Spliterator是 Java8 加入的新接口,是为了并行执行任务而设计的。

1
public interface Spliterator<T> {    boolean tryAdvance(Consumer<? super T> action);    Spliterator<T> trySplit();    long estimateSize();    int characteristics();}

复制代码

tryAdvance: 遍历所有的元素,如果还有可以遍历的就返回 ture,否则返回 false

trySplit: 对所有的元素进行拆分成小的子部分,如果已经不能拆分就返回 null

estimateSize: 当前拆分里面还剩余多少个元素

characteristics: 返回当前 Spliterator 特性集的编码

总结

  1. 要证明并行处理比顺序处理效率高,只能通过测试,不能靠猜测(本文累加的例子在多台电脑上运行了多次,也并不能证明采用并行来处理累加就一定比串行的快多少,所以只能通过多测试,环境不同可能结果就会不同)
  2. 数据量较少,并且计算逻辑简单,通常不建议使用并行流
  3. 需要考虑流的操作时间消耗
  4. 在有些情况下需要自己去实现拆分的逻辑,并行流才能高效

感谢大家可以耐心地读到这里。

当然,文中或许会存在或多或少的不足、错误之处,有建议或者意见也非常欢迎大家在评论交流。

最后,希望朋友们可以点赞评论关注三连,因为这些就是我分享的全部动力来源🙏