Java8 中的Stream API
Stream 是 Java8 中处理集合的关键抽象概念。可以对集合进行操作,执行复杂的查找、过滤、映射数据等相关操作。使用Stream API 对集合操作类似于SQL查询。
一 、Stream (流)是什么
stream 是一种数据渠道,用来操作数据源并生成新的元素序列的过程。是一个来自数据源的元素队列并支持查找过滤映射等聚合操作。
需要注意点:
(1)Stream 自己不会存存储元素。(所以不是集合)
(2)Stream 的来源可以是集合、数组、IO推导、产生器generator等。
(3)Stream 不会改变源对象,在 Stream 结束操作之后会返回一个持有操作结果的新 Stream 。
(4)Stream 的聚合操作类似SQL语句,如:filter、map、reduce、find、match、sorted等
(4)Stream 操作是一种 Pipelining (流水线技术),中间操作返回流对象本身,这样可以将多个操作串联成一个管道,如fluent style(流式风格)
,可以实现延迟执行(Laziness)和短路(Short-Circuiting)的优化。
(5)Stream 通过访问者模式(Visitor)实现内部迭代方式,无须通过Iterator或For进行外部显示迭代。
二、Stream 操作步骤
Stream 操作分三步:
(1)创建 Stream
利用数据源(集合、数组、IO推导、产生器generator等)获取流。
(2)中间操作
中间操作(intermediate operation) 可以是一个中间链,对数据源的数据进行处理
(3)最终操作(terminal operation)
最终操作,也可以叫终止操作、终端操作。最终操作执行中间操作链,并生成结果。
流程大概如图:
+--------------------+ +------+ +------+ +---+ +-------+ +--------+
| stream of elements +-----> |filter+-> |sorted+-> |map+-> |flatMap|-----> |collect +
+--------------------+ +------+ +------+ +---+ +-------+ +--------+
1、创建流
(1)Collection接口获取流
java8 的Collection
接口被扩展,提供了两个获取流的方法:
default Stream<E> stream() //返回一个串行流
default Stream<E> parallelStream() //返回一个并行流
如:
//Collection 提供了两个方法 stream() 与 parallelStream()
List<String> list = new ArrayList<String>();
Stream<String> stream = list.stream(); //获取一个顺序流
Stream<String> parallelStream = list.parallelStream(); //获取一个并行流
(2)数组获取数组流
java8 中的Arrays
的静态方法stream()
获取数组流
static <T> Stream<T> stream(T[] array) //返回数组流
还重载形式:
public static IntStream stream(int[] array)
public static LongStream stream(long[] array)
public static DoubleStream stream(double[] array)
如:
//通过 Arrays 中的 stream() 获取一个数组流
Integer[] nums = new Integer[10];
Stream<Integer> stream1 = Arrays.stream(nums);
(3)根据值获取流
可以使用静态方法Stream.of()
利用显示值创建流,它可以接受任意数量的参数。
public static<T> Stream<T> stream(T... values)
如:
// 通过 Stream 类中静态方法 of()
Stream<Integer> stream2 = Stream.of(1,2,3,4,5,6);
(4)创建无限流
可以使用静态方法Stream.iterate()
和Stream.generate()
创建无限流。
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)
public static<T> Stream<T> generate(Supplier<T> s)
如:
//迭代
Stream<Integer> stream3 = Stream.iterate(0, (x) -> x + 2).limit(10);
stream3.forEach(System.out::println);
//生成
Stream<Double> stream4 = Stream.generate(Math::random).limit(2);
stream4.forEach(System.out::println);
2、中间操作
中间操作是对数据源进行处理的操作,多个中间操作连接起来形成中间链/流水线,除非中间链上触发终止操作,否则中间操作不会执行任何处理,而是在终止操作时一次性全部处理,这个过程称为”延迟加载”或“惰性求值”。
(1)筛选和切片
方法 | 说明 |
---|---|
filter(Predicate p) | 接收Lamba,从流中排除某些元素 |
distinct() | 筛选,通过流所生成元素的hashCode()和equals()去除重复元素 |
limit(long maxsize) | 截断流,使其元素不超过给定数量 |
skip(long n) | 跳过元素,返回一个扔掉了前n个元素的流。 若流中元素不足n个,则返回一个空流。与limit(n)互补。 |
如:
List<User> users = Arrays.asList(
new User(102, "李四", 59, 6666.66),
new User(101, "张三", 18, 9999.99),
new User(103, "王五", 28, 3333.33),
new User(104, "赵六", 8, 7777.77),
new User(104, "赵六", 8, 7777.77),
new User(104, "赵六", 8, 7777.77),
new User(105, "田七", 38, 5555.55)
);
//所有的中间操作不会做任何的处理
Stream<User> stream = users.stream()
.filter((e) -> {
System.out.println("测试中间操作执行");
return e.getAge() >= 30;
});
//只有当做终止操作时,所有的中间操作会一次性的全部执行,称为“惰性求值”
stream.forEach(System.out::println);
//去重复
users.stream().distinct().forEach(System.out::println);
users.stream().filter((e) -> {
System.out.println("短路!"); // && ||
return e.getSalary() >= 5000;
}).limit(3)
.forEach(System.out::println);
users.parallelStream().filter((e) -> e.getSalary() >= 5000)
.skip(2)
.forEach(System.out::println);
(2)映射
方法 | 说明 |
---|---|
map(Function f) | 接收一个函数作为参数,该函数会被应用到每个元素上, 并将其映射成一个新的元素。 |
mapToDouble(ToDoubleFunction df) | 接收一个函数作为参数,该函数会被应用到每个元素上, 产生一个新的DoubleStream。 |
mapToInt(ToIntFunction f) | 接收一个函数作为参数,该函数会被应用到每个元素上, 产生一个新的IntStream。 |
mapToLong(ToLongFunction f) | 接收一个函数作为参数,该函数会被应用到每个元素上, 产生一个新的LongStream。 |
flatMap(Function f) | 接收一个函数作为参数,将流中的每个值都换成另一个流, 然后把所有的流连接成一个新的流。 |
map(Function f)
和flatMap(Function f)
的区别关于有点类似于List
的add(Object o)
和addAll(Collection<?> c)
的关系区别;类似于Map
的 put(K key, V value)
和putAll(Map<?> m)
的关系区别。
如:
package com.zhangxiaocai.cn.java8.stream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import org.junit.Test;
public class StreamAPITest1 {
@Test
public void mapTest(){
Stream<String> str = users.stream().map((e) -> e.getName());
System.out.println("-------------------------------------------");
List<String> strList = Arrays.asList("hello", "stream", "API", "small", "rose");
Stream<String> stream = strList.stream().map(String::toUpperCase);
//打印结果
stream.forEach(System.out::println);
Stream<Stream<Character>> stream2 = strList.stream().map(TestStreamAPI1::filterCharacter);
//显示迭代,打印结果
stream2.forEach((sm) -> {
sm.forEach(System.out::println);
});
System.out.println("---------------------------------------------");
//扁平化的连接流
Stream<Character> stream3 = strList.stream().flatMap(TestStreamAPI1::filterCharacter);
stream3.forEach(System.out::println);
}
public static Stream<Character> filterCharacter(String str){
List<Character> list = new ArrayList<>();
for (Character ch : str.toCharArray()) {
list.add(ch);
}
return list.stream();
}
}
(3)排序
方法 | 说明 |
---|---|
sorted() | 产生一个新流,其中按自然顺序排序 |
sorted(Comparator comp) | 产生一个新流,其中按自定义比较器顺序排序 |
如:
//自然排序
users.stream().map(User::getName)
.sorted()
.forEach(System.out::println);
//比较器排序
users.stream().sorted((x, y) -> {
if(x.getAge() == y.getAge()){
return x.getName().compareTo(y.getName());
}else{
return Integer.compare(x.getAge(), y.getAge());
}
}).forEach(System.out::println);
3、终止操作
终端操作是让流的中间链生成结果。它的结果可以是任何不是流的值,比如:List
、Integer
、Map
,也可以是void
。
(1)查找匹配
方法 | 返回类型 | 说明 |
---|---|---|
allMatch(Predicate p) | Boolean | 检查是否匹配所有元素 |
anyMatch(Predicate p) | Boolean | 检查是否至少匹配一个元素 |
noneMatch(Predicate p) | Boolean | 检查是否没有匹配所有元素 |
findFirst() | Optional<T> | 返回第一个元素 |
findAny() | Optional<T> | 返回当前流中的任意元素 |
使用示例:
List<User> users = Arrays.asList(
new User(102, "李四", 59, 6666.66, Status.BUSY),
new User(101, "张三", 18, 9999.99, Status.FREE),
new User(103, "王五", 28, 3333.33, Status.VOCATION),
new User(104, "赵六", 8, 7777.77, Status.BUSY),
new User(104, "赵六", 8, 7777.77, Status.FREE),
new User(104, "赵六", 8, 7777.77, Status.FREE),
new User(105, "田七", 38, 5555.55, Status.BUSY)
);
boolean bl = users.stream().allMatch((e) -> e.getStatus().equals(Status.BUSY));
System.out.println(bl); //false
boolean bl1 = users.stream().anyMatch((e) -> e.getStatus().equals(Status.BUSY));
System.out.println(bl1); //true
boolean bl2 = users.stream().noneMatch((e) -> e.getStatus().equals(Status.BUSY));
System.out.println(bl2); //false
Optional<User> op = users.stream()
.sorted((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary()))
.findFirst();
System.out.println(op.get());
System.err.println("--------------------------------");
Optional<User> op2 = users.parallelStream()
.filter((e) -> e.getStatus().equals(Status.FREE))
.findAny();
System.out.println(op2.get());
(2)聚合
方法 | 返回类型 | 说明 |
---|---|---|
count() | Long | 返回流中元素总数 |
max(Comparator c) | Optional<T> | 返回流中最大值 |
min(Comparator c) | Optional<T> | 返回流中最小值 |
forEach(Consumer c) | Stream API的内部迭代 |
使用示例:
long count = users.stream().filter((e) -> e.getStatus().equals(Status.FREE))
.count();
System.out.println(count);
Optional<Double> op = users.stream()
.map(User::getSalary)
.max(Double::compare);
System.out.println(op.get());
Optional<User> op2 = users.stream()
.min((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary()));
System.out.println(op2.get());
Stream<User> stream = users.stream()
.filter((e) -> e.getStatus().equals(Status.FREE));
long count = stream.count();
//流进行了终止操作后,不能再次使用
stream.map(User::getSalary).max(Double::compare);
(3)归约
方法 | 说明 |
---|---|
reduce(T iden, BinaryOperator b) | 可以将流中元素反复结合起来,得到一个值,返回T |
reduce(BinaryOperator b) | 可以将流中元素反复结合起来,得到一个值,返回Optional<T> |
map
和reduce
的连接通常称为map-reduce
模式,因Google用它来进行网络搜索而出名。
使用示例:
List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
Integer sum = list.stream().reduce(0, (x, y) -> x + y);
System.out.println(sum);
System.out.println("----------------------------------------");
Optional<Double> op = users.stream()
.map(User::getSalary)
.reduce(Double::sum);
System.out.println(op.get());
//查名字里出现"六"的次数
Optional<Integer> sum = users.stream()
.map(User::getName)
.flatMap(TestStreamAPI1::filterCharacter)
.map((ch) -> {
if(ch.equals('六'))
return 1;
else
return 0;
}).reduce(Integer::sum);
System.out.println(sum.get());
(4)收集
方法 | 说明 |
---|---|
collect(Collector c) | 将流转换为其他形式。接受一个Collector 接口的实现,用于给Stream中元素做汇总的方法。 |
Collector
接口中方法的实现决定了如何对流执行收集操作(如何收集到List
、Set
、Map
)。但是Collectors
实用类提供了很多静态方法,可以方便创建常见收集器实例,具体方法如下:
方法 | 返回类型 | 说明 |
---|---|---|
toList | List<T> | 把流中元素收集到List 集合 |
toSet | Set<T> | 把流中元素收集到Set 集合 |
toCollection | Collection<T> | 把流中元素收集到创建的集合 |
counting | Long | 计算流中元素的个数 |
summingInt | Integer | 对流中元素的整数属性求和 |
averagingInt | Double | 计算流中元素Intger 属性的平均值 |
sunmarizingInt | IntSummaryStatistics | 收集流中Intger 属性的统计值。如:平均值 |
joining | String | 连接流中每个字符串 |
maxBy | Optional<T> | 根据比较器选择最大值 |
minBy | Optional<T> | 根据比较器选择最小值 |
reducing | 归约产生的类型 | 从一个作为累加器的初始值开始,利用BinaryOperator 与流中元素逐个结合,从而归约成单个值 |
CollectingAndThen | 转换函数返回的类型 | 包裹另一个收集器,对其结果转换函数 |
groupingBy | Map<K,List<T>> | 根据某属性值对流分组,属性为K,结果为V |
partitioningBy | Map<Boolean,List<T>> | 根据true 或false 进行分区 |
方式使用示例:
// toList
List<String> list = users.stream()
.map(User::getName)
.collect(Collectors.toList());
list.forEach(System.out::println);
System.out.println("----------------------------------");
// toSet
Set<String> set = users.stream()
.map(User::getName)
.collect(Collectors.toSet());
set.forEach(System.out::println);
System.out.println("----------------------------------");
// toCollection
HashSet<String> hs = users.stream()
.map(User::getName)
.collect(Collectors.toCollection(HashSet::new));
hs.forEach(System.out::println);
// maxBy 最大值
Optional<Double> max = users.stream()
.map(User::getSalary)
.collect(Collectors.maxBy(Double::compare));
System.out.println(max.get());
// minBy 最小值
Optional<User> op = users.stream()
.collect(Collectors.minBy((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary())));
System.out.println(op.get());
// summingDouble
Double sum = users.stream()
.collect(Collectors.summingDouble(User::getSalary));
System.out.println(sum);
// averagingDouble 平均值
Double avg = users.stream()
.collect(Collectors.averagingDouble(User::getSalary));
System.out.println(avg);
// counting 总数
Long count = users.stream()
.collect(Collectors.counting());
System.out.println(count);
System.out.println("--------------------------------------------");
// summarizingDouble 总和
DoubleSummaryStatistics dss = users.stream()
.collect(Collectors.summarizingDouble(User::getSalary));
System.out.println(dss.getMax());
System.out.println(dss.getSum());
System.out.println(dss.getAverage());
// groupingBy 分组
Map<Status, List<User>> map = users.stream()
.collect(Collectors.groupingBy(User::getStatus));
System.out.println(map);
//多级分组
Map<Status, Map<String, List<User>>> map = users.stream()
.collect(Collectors.groupingBy(User::getStatus, Collectors.groupingBy((e) -> {
if(e.getAge() >= 60)
return "老年";
else if(e.getAge() >= 35)
return "中年";
else
return "成年";
})));
System.out.println(map);
// partitioningBy 分区
Map<Boolean, List<User>> map = users.stream()
.collect(Collectors.partitioningBy((e) -> e.getSalary() >= 5000));
System.out.println(map);
// joining 连接
String str = users.stream()
.map(User::getName)
.collect(Collectors.joining("," , "----", "----"));
System.out.println(str); //以逗号分隔,前后加 ------
// reducing 归约
Optional<Double> sum = users.stream()
.map(User::getSalary)
.collect(Collectors.reducing(Double::sum));
System.out.println(sum.get());
三、并行流与串行流
并行流 就是把一个内容分成多个数据库,并用不同的线程分别处理每个数据块的流。
java8 中将并行进行优化,可以很容易对数据进行并行操作。Stream API 可以声明性地通过parallel()
和sequential()
在并行流和串行流之间进行切换。
一个整数累加的示例:
@Test
public void paralleltTest(){
long start = System.currentTimeMillis();
Long sum = LongStream.rangeClosed(0L, 10000000000L)
.parallel()
.sum();
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("耗费的时间为: " + (end - start));
}
四、ForK /Join 框架简介
ForK /Join 框架: 就是在比要时,可以将一个大任务,进行拆分(fork
)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行Join
汇总。
ForK / Join 框架和传统线程池区别:
ForK / Join 框架采用的是“Working-Stealing(工作窃取)”
模式:当执行新的任务时它可以将其拆分成更小颗粒的任务执行,并将小任务添加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己队列中。
相比于一般线程池实现,ForK / Join 的优势是在对其中包含的任务的处理方式上,在一般的线程池中,如果一个线程正在执行的任务有余某些原因无法继续执行,那么该线程会处于等待状态。而ForK / Join 框架实现中,如果某个子问题由于等待另一个子问题的完成而无法继续执行时,处理该子问题的线程会主动寻找其他尚未运行的子问题来执行。这种方式减少了线程的等待时间,可以有效的提高性能。
Fork/Join框架主要类
名称 | 作用 |
---|---|
ForkJoinPool | 用来执行Task,或生成新的ForkJoinWorkerThread,执行 ForkJoinWorkerThread 间的 work-stealing 逻辑。ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。 |
ForkJoinTask | 执行具体的分支逻辑,声明以同步/异步方式进行执行 |
ForkJoinWorkerThread | 是 ForkJoinPool 内的 worker thread,执行ForkJoinTask, 内部有 ForkJoinPool.WorkQueue来保存要执行的ForkJoinTask。 |
ForkJoinPool.WorkQueue | 保存要执行的ForkJoinTask。 |
Fork/Join框架主要逻辑:
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了fork()
)时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是LIFO
方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
一个整数累加的示例:
package com.atguigu.java8;
import java.util.concurrent.RecursiveTask;
public class ForkJoinHandle extends RecursiveTask<Long>{
private static final long serialVersionUID = 13475679780L;
private long start;
private long end;
private static final long THRESHOLD = 10000L; //临界值
public ForkJoinHandle(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
if(length <= THRESHOLD){
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else{
long middle = (start + end) / 2;
ForkJoinHandle left = new ForkJoinHandle(start, middle);
left.fork(); //拆分,并将该子任务压入线程队列
ForkJoinHandle right = new ForkJoinHandle(middle+1, end);
right.fork();
return left.join() + right.join();
}
}
}
测试类:
@Test
public void forkJoinTest(){
long start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinHandle(0L, 20000000000L);
long sum = pool.invoke(task);
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("耗费的时间为: " + (end - start));
}
@Test
public void forkJoinTest2(){
ForkJoinPool forkjoinPool = new ForkJoinPool();
ForkJoinHandle task = new ForkJoinHandle(1, 100);
//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result" + result.get());
} catch (Exception e) {
e.printstacktrace();
}
注意事宜:
(1)ForkJoinPool 使用submit 或 invoke 提交的区别:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行,只有在Future调用get的时候会阻塞。
(2)这里继承的是RecursiveTask,还可以继承RecursiveAction。前者适用于有返回值的场景,而后者适合于没有返回值的场景
(3)这里执行子任务调用fork方法并不是最佳的选择,最佳的选择是invokeAll方法。
left.fork();
right.fork();
替换为
invokeAll(left, right);