Java8 Stream实现原理,源码浅析(上)

发布于:2024-04-26 ⋅ 阅读:(23) ⋅ 点赞:(0)

Stream🚩

Stream提供了对Java集合的一种全新的操作方式,由于简约的风格,以及不错的性能,已经成为Java开发人员必须掌握的基本技能。下面直观地感受一下Stream流的用法:

Stream快速开始

// 排序
numbers = numbers.stream().sorted(Integer::compareTo).collect(Collectors.toList());
// 根据ID去重
List<Long> likeTidList = likeDOs.stream().map(LikeDO::getTid)
                .distinct().collect(Collectors.toList());

前置知识:lambda表达式概述

由于Stream依赖lambda表达式,因此先做一些铺垫:lambda表达式为何能作为方法的入参?要点在于:函数式接口。函数式编程是对行为抽象

函数式接口

定义:只有一个抽象方法的接口

即使接口还包含其它默认方法default静态方法static,只要抽象方法只有一个,就行

比如最常见的:Runnable

// 函数式接口注解
@FunctionalInterface
public interface Runnable {
 public abstract void run();
}

lambda表达式

lambda表达式是对函数式接口的实现。在没有lambda的时代,我们往往会用匿名函数,实现接口的方法。但是这样做代码量大,复杂。lambda表达式的特点就是极其轻量化,能简则简。

Runnable runnable = () -> {  //代码更加简洁
    System.out.println("Hello world!");
};

lambda的原理

关键点在于对字节码的分析,可以参考:

总之,Lambda表达式会被编译生成一个私有的静态函数,并且生成一个实现了函数式接口的内部类。在这个内部类的唯一的抽象方法中,会调用生成的私有的静态函数来实现该唯一的抽象方法

下面正式开始介绍Stream的实现原理。流的原理基本可以划分为三个部分:流的创建,中间操作,以及终止操作,我们逐个来看。

一、Stream的创建

list.stream()

这是获取一个流对象,但我们要知道,流对象到底是什么?有什么核心字段?如何存储数据源?有多少种方式创建流对象?

Stream的创建方式

1、Stream.of(T... values)

public static<T> Stream<T> of(T... values) {
    return Arrays.stream(values);
}
// 本质是Arrays.stream,实质就是:
return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
// 又是StreamSupport.stream

2、Collection.stream()

也就是所有Collection的子类都可以,包括ArrayList,ArrayDeque

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

3、Arrays.stream(T [])

数组当然也支持,前文也说过本质还是StreamSupport.stream

StreamSupport:真正创建Stream对象

来看看它的stream方法

// StreamSupport.stream
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>        (spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);
}

可以看到,返回的实际上是ReferencePipeline的内部类Head(实现了Stream接口)

并且传递了一个参数Spliterator。那么这两个类就是分析的重点

Spliterator:数据源的载体

Spliterator的中文翻译过来是分离器,但是这并不能很好的帮助你理解Spliterator。

Spliterator最大的意义是:替Stream存储数据源,处理数据源,辅助并行操作。

这里以数组为例,ArraySpliterator来分析:(各种Spliterator实例都被封装在Spliterators类的内部类中)

1、存数源数据

// 数组获取spliterator的方法
   return Spliterators.spliterator(array, startInclusive, endExclusive,...);
// Spliterators.spliterator方法
public static <T> Spliterator<T> spliterator(Object[] array, int fromIndex, int toIndex,
                                             int additionalCharacteristics) {
    return new ArraySpliterator<>(array, fromIndex, toIndex, additionalCharacteristics);
}
static final class ArraySpliterator<T> implements Spliterator<T>{
        private final Object[] array;
        private int index;        // current index, modified on advance/split
        private final int fence;  // one past last index
        private final int characteristics;
}

这就可以看到Spliterator的一个作用:存数源数据

2、处理数据的能力

然后再来看看它的核心方法:

// lambda表达式,消费一个元素
public boolean tryAdvance(Consumer<? super T> action) {
    if (index >= 0 && index < fence) {
         T e = (T) array[index++];
        action.accept(e);
        return true;
    }
    return false;
}
// lambda表达式,消费所有元素
public void forEachRemaining(Consumer<? super T> action) {
     Object[] a; int i, hi; // hoist accesses and checks from loop
     if ((a = array).length >= (hi = fence) &&
         (i = index) >= 0 && i < (index = hi)) {
         do { action.accept((T)a[i]); } while (++i < hi);
     }
}

这就是Spliterator的第二个作用:利用lambda表达式处理数据

3、划分数据源

public Spliterator<T> trySplit() {
    int lo = index, mid = (lo + fence) >>> 1;
    return new ArraySpliterator<>(array, lo, index = mid, characteristics);
}

这个就是Spliterator的第三个作用了:划分数据源,生成多个Spliterator,并行处理时用到,我们暂时不关心

Stream是个惰性流,处理数据的时机是终止操作,因此你会看到后面你熟悉的map方法,都只是在做双向链表的插入而已,Spliterator同样会一直到Stream的终止才被调用其方法。

Pipeline:Stream接口的实例

我们这里就分析ReferencePipeline引用类型。

还有intPipeline之类的,原理都差不多。

看标题就知道了,Pipeline才是Stream接口真正的实现类。

在一条完整的用Stream操作集合的语句,众多的方法的链式调用,其实是尾插一个双向链表,只有三种Stream的创建方式是头节点。

ReferencePipeline继承自AbstractPipeline,本身没有别的属性。

核心属性:

// 头节点
private final AbstractPipeline sourceStage;
// 前一个节点
private final AbstractPipeline previousStage;
// 后一个节点
private AbstractPipeline nextStage;
// 是source 还是 Option 的flag标记
protected final int sourceOrOpFlags;
private int depth;
private int combinedFlags;
private Spliterator<?> sourceSpliterator;
private Supplier<? extends Spliterator<?>> sourceSupplier;
private boolean linkedOrConsumed;
private boolean sourceAnyStateful;
private Runnable sourceCloseAction;
// 是否并行
private boolean parallel;

可以看到是没有存储数据实体的,因为被封装在Spliterator内部。

ReferencePipeline下有三个内部类,都继承自ReferencePipeline

Head:创建流返回的对象

Head代表 Source stage of a ReferencePipeline. 也就是双向链表的头节点。

重写了forEach和forEachOrdered方法

Head的构造方法与另外两个不太相同,因为是头节点

// super父类的
AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null;
    // 可以看到 用 source 命名 Spliterator
    // 而在下面两个的构造方法是没有赋值sourceSpliterator的
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

另外两个类都是中间操作的返回对象,也即双向链表的非头节点又分为下面两种。因此我们放到中间操作部分再看。

小结

流对象三种创建方式,都依赖StreamSupport工厂类。返回的是Head对象,是Pipeline的一个子实现,封装了Spliterator。Spliterator类负责存储和处理数据。

二、Stream的中间操作

开始本篇之前,我们需要先了解他们的实体类。也就是ReferencePipeline的另外两个子类。虽然名字末尾是Op,但需要明白,它们也是流Stream的子类

Pipeline的另外两个子类:

StatelessOp

顾名思义:这是个无状态的中间操作流

仅仅是重写了这个方法

final boolean opIsStateful() {
    return false;
}
StatefulOp

这是个有状态的中间操作流,同样

final boolean opIsStateful() {
    return true;
}

这两种Op的构造方法如下:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    // 维护上一个节点的状态   
    previousStage.linkedOrConsumed = true;
    // 维护双向链表,前后节点
    previousStage.nextStage = this;
    this.previousStage = previousStage;
    
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags =                                                                            StreamOpFlag.combineOpFlags(opFlags,previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}
Stateless和Stateful的区别
操作分类 方法
无状态操作 filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek() unordered()
有状态操作 distinct() sorted() limit() skip()

有状态操作:是指如果元素的形式有状态,那么会影响该操作的执行。比如:sorted,你原本就是排好序的,这就是一种状态,你是无序的,就是另一种状态。根据不同的状态用if else做不同的处理来提高效率。

无状态操作:是指无论元素处于什么状态,比如元素全部相等,或是有序,都对这个操作完全没有影响。比如:map映射,和有序没有关系,元素过滤,每个元素都是单独用Predicate去test,和元素的状态无关。

常见中间操作

回顾Stream快速开始,我们知道list.stream().map()...都是链式调用的,因此我们可以大胆地猜测:诸如map,sorted等方法返回值都是一个ReferencePipeline的某个子类,我们来挑几个看看这些方法的具体行为。

无状态操作:

map方法
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    // 果不其然 是一个StatelessOp 无状态
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     // StreamOpFlag记录集合的状态 比如是否已经排序 是否去重了
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        // 最后重写了  opWrapSink 方法 这个先放一放
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}
// 它的begin和end继承自父类
// 相当于:对当前map这个操作,begin和end都是什么也不做,直接交给stream流的下一个stream
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }
​
        @Override
        public void end() {
            downstream.end();
        }

返回了一个StatelessOp

有状态操作

sorted方法
    public final Stream<P_OUT> sorted() {
        return SortedOps.makeRef(this);
    }
// makeRef方法 
// 注意此处参数的名字叫 upstream
    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
        return new OfRef<>(upstream);
    }

发现,返回的是个OfRef,而OfRef正是StatefulOp的一个子类

private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
    // isNaturalSort表示元素本身是否可以排序(传参数不传比较器则为true,否则为false)
    private final boolean isNaturalSort;
    private final Comparator<? super T> comparator;
    
    // 构造方法:
        OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
            // 因此也会作为节点插入到双向链表尾部
            super(upstream, StreamShape.REFERENCE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
            this.isNaturalSort = false;
            this.comparator = Objects.requireNonNull(comparator);
        }    
    
    // 也重写了 opWrapSink 方法
        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
            // If the input is already naturally sorted and this operation
            // also naturally sorted then this is a no-op
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedRefSortingSink<>(sink, comparator);
            else
                return new RefSortingSink<>(sink, comparator);
        }
    
    // 还重写了 opEvaluateParallel 方法
}

RefSortingSink重写三个方法:

@Override
public void begin(long size) {
    if (size >= Nodes.MAX_ARRAY_SIZE)
        throw new IllegalArgumentException(Nodes.BAD_SIZE);
    list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
}
​
@Override
public void end() 
    // 排序
    list.sort(comparator);
    downstream.begin(list.size());
    if (!cancellationWasRequested) {
        list.forEach(downstream::accept);
    }
    else {
        for (T t : list) {
            if (downstream.cancellationRequested()) break;
            downstream.accept(t);
        }
    }
    downstream.end();
    list = null;
}
​
@Override
public void accept(T t) {
    // 并没有向下传递
    list.add(t);
}

还有一点是:map传入的lambda表达式,和sorted(可能)传入的比较器,都与父抽象类的构造方法无关。而是被用在了opWrapSink以及opEvaluateParallel方法内部

小结

根据以上源码分析我们可以知道:

  • 虽然不知道opWrapSink方法是干嘛的,但我们可以确定opWrapSink方法很重要
  • 不论StatefulOp 还是 StatelessOp,这些方法都仅仅是new 了一个实例,并插入到双向链表尾部
  • 只要我们的类继承自ReferencePipeline.StateXXXOp,重写某些方法,我们完全可以自定义中间操作

但是opWrapSink到底做了什么,map等等操作到底是什么时候执行的?由于篇幅问题,这些问题就留到下篇文章分析Stream的终止操作时在解答吧。请参考:

参考文档