# Stream
流的介绍
# 1.1 java8 stream
介绍
java8
新增了 stream
流的特性,能够让用户以函数式的方式、更为简单的操纵集合等数据结构,并实现了用户无感知的并行计算。
# 1.2 从零开始实现一个 stream
流
相信很多人在使用过 java8
的 streamAPI
接口之后,都会对其实现原理感到好奇,但往往在看到 jdk
的 stream
源码后却被其复杂的抽象、封装给弄糊涂了,而无法很好的理解其背后的原理。究其原因,是因为 jdk
的 stream
源码是高度工程化的代码,工程化的代码为了效率和满足各式各样的需求,会将代码实现的极其复杂,不易理解。
在这里,我们将抛开 jdk
的实现思路,从零开始实现一个 stream
流。
我们的 stream
流同样拥有惰性求值,函数式编程接口等特性,并兼容 jdk
的 Collection
等数据结构 (但不支持并行计算 orz
)。
相信在亲手实现一个 stream
流的框架之后,大家能更好的理解流计算的原理。
# stream
的优点
在探讨探究 stream
的实现原理和动手实现之前,我们先要体会 stream
流计算的独特之处。
举个例子: 有一个 List<Person>
列表,我们需要获得年龄为 70
岁的前 10
个 Person
的姓名。
过程式的解决方案:
稍加思考,我们很快就写出了一个过程式的解决方案 (伪代码):
1 2 3 4 5 6 7 8 9 10 11 12
| List<Person> personList = fromDB(); int limit = 10; List<String> nameList = new ArrayList(); for(Person personItem : personList){ if(personItem.age == 70){ nameList.add(personItem.name); if(nameList.size() >= 10){ break; } } } return nameList;
|
函数式 stream
解决方案:
下面我们给出一种基于 stream
流的解决方案 (伪代码):
1 2 3 4 5 6 7 8
| List<Person> personList = fromDB(); List<String> nameList = personList.stream() .filter(item->item.age == 70) .limit(10) .map(item->item.name) .collect(Collector.toList());
return nameList;
|
两种方案的不同之处:
从函数式的角度上看,过程式的代码实现将收集元素、循环迭代、各种逻辑判断耦合在一起,暴露了太多细节。当未来需求变动和变得更加复杂的情况下,过程式的代码将变得难以理解和维护 (需要控制台打印出 年龄为 70
岁的前 10 个 Person
中,姓王的 Person
的名称)。
函数式的解决方案解开了代码细节和业务逻辑的耦合,类似于 sql
语句,表达的是 "要做什么" 而不是 "如何去做",使程序员可以更加专注于业务逻辑,写出易于理解和维护的代码。
1 2 3 4 5 6 7
| List<Person> personList = fromDB(); personList.stream() .filter(item->item.age == 70) .limit(10) .filter(item->item.name.startWith("王")) .map(item->item.name) .forEach(System.out::println);
|
# stream API
接口介绍
stream API
的接口是函数式的,尽管 java 8
也引入了 lambda
表达式,但 java
实质上依然是由接口 - 匿名内部类来实现函数传参的,所以需要事先定义一系列的函数式接口。
Function:
类似于 y = F(x)
1 2 3 4 5 6 7 8 9
| @FunctionalInterface public interface Function<R,T> {
R apply(T t); }
|
BiFunction
: 类似于 z = F(x,y)
1 2 3 4 5 6 7 8 9
| @FunctionalInterface public interface BiFunction<R, T, U> {
R apply(T t, U u); }
|
ForEach
: 遍历处理
1 2 3 4 5 6 7 8 9
| @FunctionalInterface public interface ForEach <T>{
void apply(T item); }
|
Comparator
: 比较器
1 2 3 4 5 6 7 8 9 10 11 12 13
| @FunctionalInterface public interface Comparator<T> {
int compare(T o1, T o2); }
|
Predicate:
条件判断
1 2 3 4 5 6 7 8 9 10 11
| @FunctionalInterface public interface Predicate <T>{
boolean satisfy(T item); }
|
Supplier:
提供初始值
1 2 3 4 5 6 7 8 9
| @FunctionalInterface public interface Supplier<T> {
T get(); }
|
EvalFunction:stream
求值函数
1 2 3 4 5 6 7 8 9
| @FunctionalInterface public interface EvalFunction<T> {
MyStream<T> apply(); }
|
# stream API
接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
|
public interface Stream<T> {
<R> MyStream<R> map(Function<R,T> mapper);
<R> MyStream<R> flatMap(Function<? extends MyStream<R>, T> mapper);
MyStream<T> filter(Predicate<T> predicate);
MyStream<T> limit(int n);
MyStream<T> distinct();
MyStream<T> peek(ForEach<T> consumer);
void forEach(ForEach<T> consumer);
<R> R reduce(R initVal, BiFunction<R, R, T> accumulator);
<R, A> R collect(Collector<T,A,R> collector);
T max(Comparator<T> comparator);
T min(Comparator<T> comparator);
int count();
boolean anyMatch(Predicate<? super T> predicate);
boolean allMatch(Predicate<? super T> predicate);
static <T> MyStream<T> makeEmptyStream(){ return new MyStream.Builder<T>().isEnd(true).build(); } }
|
# MyStream
实现细节
简单介绍了 API
接口定义之后,我们开始深入探讨流的内部实现。
流由两个重要的部分所组成,当前数据项 (head)"和" 下一数据项的求值函数 (nextItemEvalProcess)
。
其中, nextItemEvalProcess
是流能够实现 "惰性求值" 的关键。
流的基本属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| public class MyStream<T> implements Stream<T> {
private T head;
private NextItemEvalProcess nextItemEvalProcess;
private boolean isEnd;
public static class Builder<T>{ private MyStream<T> target;
public Builder() { this.target = new MyStream<>(); }
public Builder<T> head(T head){ target.head = head; return this; }
Builder<T> isEnd(boolean isEnd){ target.isEnd = isEnd; return this; }
public Builder<T> nextItemEvalProcess(NextItemEvalProcess nextItemEvalProcess){ target.nextItemEvalProcess = nextItemEvalProcess; return this; }
public MyStream<T> build(){ return target; } }
private MyStream<T> eval(){ return this.nextItemEvalProcess.eval(); }
private boolean isEmptyStream(){ return this.isEnd; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
public class NextItemEvalProcess {
private EvalFunction evalFunction;
public NextItemEvalProcess(EvalFunction evalFunction) { this.evalFunction = evalFunction; }
MyStream eval(){ return evalFunction.apply(); } }
|
# 4.1 stream
流在使用过程中的三个阶段
-
生成并构造一个流 (List.stream () 等方法)
-
在流的处理过程中添加、绑定惰性求值流程 (map、filter、limit 等方法)
-
对流使用强制求值函数,生成最终结果 (max、collect、forEach 等方法)
# 4.2 生成并构造一个流
流在生成时是 "纯净" 的,其最初的 NextItemEvalProcess
求值之后就是指向自己的下一个元素。
我们以一个 Integer
整数流的生成为例。 IntegerStreamGenerator.getIntegerStream(1,10)
会返回一个流结构,其逻辑上等价于一个从 1
到 10
的整数流。但实质是一个惰性求值的 stream
对象,这里称其为 IntStream
,其 NextItemEvalProcess
是一个闭包,方法体是一个递归结构的求值函数,其中下界参数 low = low + 1
。
当 IntStream
第一次被求值时,流开始初始化, isStart = false
。当初始化完成之后,每一次求值,都会生成一个新的流对象,其中 head(low) = low + 1
。当 low > high
时,流被终止,返回空的流对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
public class IntegerStreamGenerator {
public static MyStream<Integer> getIntegerStream(int low, int high){ return getIntegerStreamInner(low,high,true); }
private static MyStream<Integer> getIntegerStreamInner(int low, int high, boolean isStart){ if(low > high){ return Stream.makeEmptyStream(); } if(isStart){ return new MyStream.Builder<Integer>() .process(new NextItemEvalProcess(()->getIntegerStreamInner(low,high,false))) .build(); }else{ return new MyStream.Builder<Integer>() .head(low) .process(new NextItemEvalProcess(()->getIntegerStreamInner(low+1,high,false))) .build(); } } }
|
可以看到,生成一个流的关键在于确定如何求值下一项元素。对于整数流来说, low = low + 1
就是其下一项的求值过程。
那么对于我们非常关心的 jdk
集合容器,又该如何生成对应的流呢?
答案是 Iterator
迭代器, jdk
的集合容器都实现了 Iterator
迭代器接口,通过迭代器我们可以轻易的取得容器的下一项元素,而不用关心容器内部实现细节。换句话说,只要实现过迭代器接口,就可以自然的转化为 stream
流,从而获得流计算的所有能力。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
public class CollectionStreamGenerator {
public static <T> MyStream<T> getListStream(List<T> list){ return getListStream(list.iterator(),true); }
private static <T> MyStream<T> getListStream(Iterator<T> iterator, boolean isStart){ if(!iterator.hasNext()){ return Stream.makeEmptyStream(); }
if(isStart){ return new MyStream.Builder<T>() .nextItemEvalProcess(new NextItemEvalProcess(()-> getListStream(iterator,false))) .build(); }else{ return new MyStream.Builder<T>() .head(iterator.next()) .nextItemEvalProcess(new NextItemEvalProcess(()-> getListStream(iterator,false))) .build(); } } }
|
# 举例分析
我们选择一个简单而又不失一般性的例子,串联起这些内容。通过完整的描述一个流求值的全过程,加深大家对流的理解。
1 2 3 4 5 6 7 8 9
| public static void main(String[] args){ Integer sum = IntegerStreamGenerator.getIntegerStream(1,10) .filter(item-> item%2 == 0) .map(item-> item * item) .limit(2) .reduce(0,(i1,i2)-> i1+i2);
System.out.println(sum); }
|
由于我们的 stream
实现采用的是链式编程的方式,不太好理解,将其展开为逻辑等价的形式。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public static void main(String[] args){ Stream<Integer> intStream = IntegerStreamGenerator.getIntegerStream(1,10); Stream<Integer> filterStream = intStream.filter(item-> item%2 == 0); Stream<Integer> mapStream = filterStream.map(item-> item * item); Stream<Integer> limitStream = mapStream.limit(2); Integer sum = limitStream.reduce(0,(i1,i2)-> i1+i2);
System.out.println(sum); }
|
reduce
强制求值操作之前的执行过程图:
reduce
强制求值过程中的执行过程图 :
可以看到, stream
的求值过程并不会一口气将初始的流全部求值,而是按需的、一个一个的进行求值。
stream
的一次求值过程至多只会遍历流中元素一次;如果存在短路操作 ( limit、anyMatch
等),实际迭代的次数会更少。
因此不必担心多层的 map、filter
处理逻辑的嵌套会让流进行多次迭代,导致效率急剧下降。
# 具体用法
# 1. 流的常用创建方法
# 1.1 使用 Collection
下的 stream()
和 parallelStream()
方法
1 2 3
| List<String> list = new ArrayList<>(); Stream<String> stream = list.stream(); Stream<String> parallelStream = list.parallelStream();
|
# 1.2 使用 Arrays
中的 stream()
方法,将数组转成流
1 2
| Integer[] nums = new Integer[10]; Stream<Integer> stream = Arrays.stream(nums);
|
# 1.3 使用 Stream
中的静态方法: of()、iterate()、generate()
1 2 3 4 5 6 7
| Stream<Integer> stream = Stream.of(1,2,3,4,5,6); Stream<Integer> stream2 = Stream.iterate(0, (x) -> x + 2).limit(6); stream2.forEach(System.out::println); Stream<Double> stream3 = Stream.generate(Math::random).limit(2); stream3.forEach(System.out::println);
|
# 1.4 使用 BufferedReader.lines()
方法,将每行内容转成流
1 2 3
| BufferedReader reader = new BufferedReader(new FileReader("F:\\test_stream.txt")); Stream<String> lineStream = reader.lines(); lineStream.forEach(System.out::println);
|
# 1.5 使用 Pattern.splitAsStream()
方法,将字符串分隔成流
1 2 3
| Pattern pattern = Pattern.compile(","); Stream<String> stringStream = pattern.splitAsStream("a,b,c,d"); stringStream.forEach(System.out::println);
|
# 2.
流的中间操作
# 2.1
筛选与切片
filter:
过滤流中的某些元素
limit(n):
获取 n
个元素
skip(n):
跳过 n
元素,配合 limit(n)
可实现分页
distinct:
通过流中元素的 hashCode()
和 equals()
去除重复元素
1 2 3 4 5 6 7
| Stream<Integer> stream = Stream.of(6, 4, 6, 7, 3, 9, 8, 10, 12, 14, 14); Stream<Integer> newStream = stream.filter(s -> s > 5) .distinct() .skip(2) .limit(2); newStream.forEach(System.out::println);
|
# 2.2
映射
map:
接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素。
flatMap:
接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流。
1 2 3 4 5 6 7 8 9 10 11 12 13
| List<String> list = Arrays.asList("a,b,c", "1,2,3");
Stream<String> s1 = list.stream().map(s -> s.replaceAll(",", "")); s1.forEach(System.out::println); Stream<String> s3 = list.stream().flatMap(s -> { String[] split = s.split(","); Stream<String> s2 = Arrays.stream(split); return s2; }); s3.forEach(System.out::println);
|
# 2.3
排序
sorted():
自然排序,流中元素需实现 Comparable
接口
sorted(Comparator com):
定制排序,自定义 Comparator
排序器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| List<String> list = Arrays.asList("aa", "ff", "dd");
list.stream().sorted().forEach(System.out::println); Student s1 = new Student("aa", 10); Student s2 = new Student("bb", 20); Student s3 = new Student("aa", 30); Student s4 = new Student("dd", 40); List<Student> studentList = Arrays.asList(s1, s2, s3, s4);
studentList.stream().sorted( (o1, o2) -> { if (o1.getName().equals(o2.getName())) { return o1.getAge() - o2.getAge(); } else { return o1.getName().compareTo(o2.getName()); } } ).forEach(System.out::println);
|
# 2.4 消费
peek:
如同于 map
,能得到流中的每一个元素。但 map
接收的是一个 Function
表达式,有返回值;而 peek
接收的是 Consumer
表达式,没有返回值。
1 2 3 4 5 6 7 8 9 10 11
| Student s1 = new Student("aa", 10); Student s2 = new Student("bb", 20); List<Student> studentList = Arrays.asList(s1, s2); studentList.stream() .peek(o -> o.setAge(100)) .forEach(System.out::println);
Student{name='aa', age=100} Student{name='bb', age=100}
|
# 3. 流的终止操作
# 3.1 匹配、聚合操作
allMatch
:接收一个 Predicate
函数,当流中每个元素都符合该断言时才返回 true
,否则返回 false
noneMatch
:接收一个 Predicate
函数,当流中每个元素都不符合该断言时才返回 true
,否则返回 false
anyMatch
:接收一个 Predicate
函数,只要流中有一个元素满足该断言则返回 true
,否则返回 false
findFirst
:返回流中第一个元素
findAny
:返回流中的任意元素
count
:返回流中元素的总个数
max
:返回流中元素最大值
min
:返回流中元素最小值
1 2 3 4 5 6 7 8 9 10 11 12
| List<Integer> list = Arrays.asList(1, 2, 3, 4, 5); boolean allMatch = list.stream().allMatch(e -> e > 10); boolean noneMatch = list.stream().noneMatch(e -> e > 10); boolean anyMatch = list.stream().anyMatch(e -> e > 4); Integer findFirst = list.stream().findFirst().get(); Integer findAny = list.stream().findAny().get(); long count = list.stream().count(); Integer max = list.stream().max(Integer::compareTo).get(); Integer min = list.stream().min(Integer::compareTo).get();
|
# 3.2 规约操作
Optional<T> reduce(BinaryOperator<T> accumulator):
第一次执行时, accumulator
函数的第一个参数为流中的第一个元素,第二个参数为流中元素的第二个元素;第二次执行时,第一个参数为第一次函数执行的结果,第二个参数为流中的第三个元素;依次类推。
T reduce(T identity, BinaryOperator<T> accumulator):
流程跟上面一样,只是第一次执行时, accumulator
函数的第一个参数为 identity
,而第二个参数为流中的第一个元素。
<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner):
在串行流 ( stream
) 中,该方法跟第二个方法一样,即第三个参数 combiner
不会起作用。在并行流 ( parallelStream
) 中,我们知道流被 fork join
出多个线程进行执行,此时每个线程的执行流程就跟第二个方法 reduce(identity,accumulator)
一样,而第三个参数 combiner
函数,则是将每个线程的执行结果当成一个新的流,然后使用第一个方法 reduce(accumulator)
流程进行规约。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24); Integer v = list.stream().reduce((x1, x2) -> x1 + x2).get(); System.out.println(v); Integer v1 = list.stream().reduce(10, (x1, x2) -> x1 + x2); System.out.println(v1); Integer v2 = list.stream().reduce(0, (x1, x2) -> { System.out.println("stream accumulator: x1:" + x1 + " x2:" + x2); return x1 - x2; }, (x1, x2) -> { System.out.println("stream combiner: x1:" + x1 + " x2:" + x2); return x1 * x2; }); System.out.println(v2); Integer v3 = list.parallelStream().reduce(0, (x1, x2) -> { System.out.println("parallelStream accumulator: x1:" + x1 + " x2:" + x2); return x1 - x2; }, (x1, x2) -> { System.out.println("parallelStream combiner: x1:" + x1 + " x2:" + x2); return x1 * x2; }); System.out.println(v3);
|
# 3.3 收集操作
collect
:接收一个 Collector
实例,将流中元素收集成另外一个数据结构。
Collector<T, A, R>
是一个接口,有以下 5 个抽象方法:
Supplier<A> supplier()
:创建一个结果容器 A
BiConsumer<A, T> accumulator()
:消费型接口,第一个参数为容器 A,第二个参数为流中元素 T
。
BinaryOperator<A> combiner()
:函数接口,该参数的作用跟上一个方法 ( reduce
) 中的 combiner
参数一样,将并行流中各个子进程的运行结果 ( accumulator
函数操作后的容器 A
) 进行合并。
Function<A, R> finisher()
:函数式接口,参数为:容器 A
,返回类型为: collect
方法最终想要的结果 R
。
Set<Characteristics> characteristics()
:返回一个不可变的 Set
集合,用来表明该 Collector
的特征。有以下三个特征:
CONCURRENT
:表示此收集器支持并发。(官方文档还有其他描述,暂时没去探索,故不作过多翻译)
UNORDERED
:表示该收集操作不会保留流中元素原有的顺序。
IDENTITY_FINISH
:表示 finisher
参数只是标识而已,可忽略。
# 3.3.1 Collector
工具库:Collectors
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| Student s1 = new Student("aa", 10,1); Student s2 = new Student("bb", 20,2); Student s3 = new Student("cc", 10,3); List<Student> list = Arrays.asList(s1, s2, s3);
List<Integer> ageList = list.stream().map(Student::getAge).collect(Collectors.toList());
Set<Integer> ageSet = list.stream().map(Student::getAge).collect(Collectors.toSet());
Map<String, Integer> studentMap = list.stream().collect(Collectors.toMap(Student::getName, Student::getAge));
String joinName = list.stream().map(Student::getName).collect(Collectors.joining(",", "(", ")"));
Long count = list.stream().collect(Collectors.counting());
Integer maxAge = list.stream().map(Student::getAge).collect(Collectors.maxBy(Integer::compare)).get();
Integer sumAge = list.stream().collect(Collectors.summingInt(Student::getAge));
Double averageAge = list.stream().collect(Collectors.averagingDouble(Student::getAge));
DoubleSummaryStatistics statistics = list.stream().collect(Collectors.summarizingDouble(Student::getAge)); System.out.println("count:" + statistics.getCount() + ",max:" + statistics.getMax() + ",sum:" + statistics.getSum() + ",average:" + statistics.getAverage());
Map<Integer, List<Student>> ageMap = list.stream().collect(Collectors.groupingBy(Student::getAge));
Map<Integer, Map<Integer, List<Student>>> typeAgeMap = list.stream().collect(Collectors.groupingBy(Student::getType, Collectors.groupingBy(Student::getAge)));
Map<Boolean, List<Student>> partMap = list.stream().collect(Collectors.partitioningBy(v -> v.getAge() > 10));
Integer allAge = list.stream().map(Student::getAge).collect(Collectors.reducing(Integer::sum)).get();
|
# 3.3.2 Collectors.toList()
解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public static <T> Collector<T, ?, List<T>> toList() { return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add, (left, right) -> { left.addAll(right); return left; }, CH_ID); }
public <T> Collector<T, ?, List<T>> toList() { Supplier<List<T>> supplier = () -> new ArrayList(); BiConsumer<List<T>, T> accumulator = (list, t) -> list.add(t); BinaryOperator<List<T>> combiner = (list1, list2) -> { list1.addAll(list2); return list1; }; Function<List<T>, List<T>> finisher = (list) -> list; Set<Collector.Characteristics> characteristics = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH)); return new Collector<T, List<T>, List<T>>() { @Override public Supplier supplier() { return supplier; } @Override public BiConsumer accumulator() { return accumulator; } @Override public BinaryOperator combiner() { return combiner; } @Override public Function finisher() { return finisher; } @Override public Set<Characteristics> characteristics() { return characteristics; } }; }
|