嘘~ 正在从服务器偷取页面 . . .

Java8 中的Stream API


Java8 中的Stream API

Stream 是 Java8 中处理集合的关键抽象概念。可以对集合进行操作,执行复杂的查找、过滤、映射数据等相关操作。使用Stream API 对集合操作类似于SQL查询。

一 、Stream (流)是什么

stream 是一种数据渠道,用来操作数据源并生成新的元素序列的过程。是一个来自数据源的元素队列并支持查找过滤映射等聚合操作。

需要注意点:
(1)Stream 自己不会存存储元素。(所以不是集合)
(2)Stream 的来源可以是集合、数组、IO推导、产生器generator等。
(3)Stream 不会改变源对象,在 Stream 结束操作之后会返回一个持有操作结果的新 Stream 。
(4)Stream 的聚合操作类似SQL语句,如:filter、map、reduce、find、match、sorted等
(4)Stream 操作是一种 Pipelining (流水线技术),中间操作返回流对象本身,这样可以将多个操作串联成一个管道,如fluent style(流式风格),可以实现延迟执行(Laziness)和短路(Short-Circuiting)的优化。
(5)Stream 通过访问者模式(Visitor)实现内部迭代方式,无须通过Iterator或For进行外部显示迭代。

二、Stream 操作步骤

Stream 操作分三步:

(1)创建 Stream

利用数据源(集合、数组、IO推导、产生器generator等)获取流。

(2)中间操作

中间操作(intermediate operation) 可以是一个中间链,对数据源的数据进行处理

(3)最终操作(terminal operation)

最终操作,也可以叫终止操作、终端操作。最终操作执行中间操作链,并生成结果。

流程大概如图:

+--------------------+       +------+   +------+   +---+   +-------+       +--------+
| stream of elements +-----> |filter+-> |sorted+-> |map+-> |flatMap|-----> |collect +
+--------------------+       +------+   +------+   +---+   +-------+       +--------+

1、创建流

(1)Collection接口获取流

java8 的Collection接口被扩展,提供了两个获取流的方法:

default Stream<E> stream() //返回一个串行流
default Stream<E> parallelStream() //返回一个并行流

如:

//Collection 提供了两个方法  stream() 与 parallelStream()
List<String> list = new ArrayList<String>();
Stream<String> stream = list.stream(); //获取一个顺序流
Stream<String> parallelStream = list.parallelStream(); //获取一个并行流

(2)数组获取数组流

java8 中的Arrays的静态方法stream()获取数组流

static <T> Stream<T> stream(T[] array) //返回数组流

还重载形式:

public static IntStream stream(int[] array)
public static LongStream stream(long[] array)
public static DoubleStream stream(double[] array)

如:

//通过 Arrays 中的 stream() 获取一个数组流
Integer[] nums = new Integer[10];
Stream<Integer> stream1 = Arrays.stream(nums);

(3)根据值获取流

可以使用静态方法Stream.of()利用显示值创建流,它可以接受任意数量的参数。

public static<T> Stream<T> stream(T... values)

如:

// 通过 Stream 类中静态方法 of()
Stream<Integer> stream2 = Stream.of(1,2,3,4,5,6);

(4)创建无限流

可以使用静态方法Stream.iterate()Stream.generate()创建无限流。

public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)
 public static<T> Stream<T> generate(Supplier<T> s)

如:

//迭代
Stream<Integer> stream3 = Stream.iterate(0, (x) -> x + 2).limit(10);
stream3.forEach(System.out::println);

//生成
Stream<Double> stream4 = Stream.generate(Math::random).limit(2);
stream4.forEach(System.out::println);

2、中间操作

中间操作是对数据源进行处理的操作,多个中间操作连接起来形成中间链/流水线,除非中间链上触发终止操作,否则中间操作不会执行任何处理,而是在终止操作时一次性全部处理,这个过程称为”延迟加载”或“惰性求值”。

(1)筛选和切片

方法说明
filter(Predicate p)接收Lamba,从流中排除某些元素
distinct()筛选,通过流所生成元素的hashCode()和equals()去除重复元素
limit(long maxsize)截断流,使其元素不超过给定数量
skip(long n)跳过元素,返回一个扔掉了前n个元素的流。
若流中元素不足n个,则返回一个空流。与limit(n)互补。

如:

List<User> users = Arrays.asList(
            new User(102, "李四", 59, 6666.66),
            new User(101, "张三", 18, 9999.99),
            new User(103, "王五", 28, 3333.33),
            new User(104, "赵六", 8, 7777.77),
            new User(104, "赵六", 8, 7777.77),
            new User(104, "赵六", 8, 7777.77),
            new User(105, "田七", 38, 5555.55)
    );

//所有的中间操作不会做任何的处理
Stream<User> stream = users.stream()
            .filter((e) -> {
                System.out.println("测试中间操作执行");
                return e.getAge() >= 30;
        });

//只有当做终止操作时,所有的中间操作会一次性的全部执行,称为“惰性求值”
stream.forEach(System.out::println);

//去重复
users.stream().distinct().forEach(System.out::println);

users.stream().filter((e) -> {
                System.out.println("短路!"); // &&  ||
                return e.getSalary() >= 5000;
            }).limit(3)
            .forEach(System.out::println);

users.parallelStream().filter((e) -> e.getSalary() >= 5000)
            .skip(2)
            .forEach(System.out::println);

(2)映射

方法说明
map(Function f)接收一个函数作为参数,该函数会被应用到每个元素上,
并将其映射成一个新的元素。
mapToDouble(ToDoubleFunction df)接收一个函数作为参数,该函数会被应用到每个元素上,
产生一个新的DoubleStream。
mapToInt(ToIntFunction f)接收一个函数作为参数,该函数会被应用到每个元素上,
产生一个新的IntStream。
mapToLong(ToLongFunction f)接收一个函数作为参数,该函数会被应用到每个元素上,
产生一个新的LongStream。
flatMap(Function f)接收一个函数作为参数,将流中的每个值都换成另一个流,
然后把所有的流连接成一个新的流。

map(Function f)flatMap(Function f)的区别关于有点类似于Listadd(Object o)addAll(Collection<?> c)的关系区别;类似于Mapput(K key, V value)putAll(Map<?> m)的关系区别。

如:

package com.zhangxiaocai.cn.java8.stream;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

import org.junit.Test;

public class StreamAPITest1 {

    @Test
    public void mapTest(){
        Stream<String> str = users.stream().map((e) -> e.getName());

        System.out.println("-------------------------------------------");

        List<String> strList = Arrays.asList("hello", "stream", "API", "small", "rose");

        Stream<String> stream = strList.stream().map(String::toUpperCase);

        //打印结果
        stream.forEach(System.out::println);

        Stream<Stream<Character>> stream2 = strList.stream().map(TestStreamAPI1::filterCharacter);

        //显示迭代,打印结果
        stream2.forEach((sm) -> {
                    sm.forEach(System.out::println);
                });

        System.out.println("---------------------------------------------");

        //扁平化的连接流
        Stream<Character> stream3 = strList.stream().flatMap(TestStreamAPI1::filterCharacter);

        stream3.forEach(System.out::println);
    }

    public static Stream<Character> filterCharacter(String str){
        List<Character> list = new ArrayList<>();
        for (Character ch : str.toCharArray()) {
            list.add(ch);
        }
        return list.stream();
    }
}

(3)排序

方法说明
sorted()产生一个新流,其中按自然顺序排序
sorted(Comparator comp)产生一个新流,其中按自定义比较器顺序排序

如:

//自然排序
users.stream().map(User::getName)
            .sorted()
            .forEach(System.out::println);

//比较器排序
users.stream().sorted((x, y) -> {
                if(x.getAge() == y.getAge()){
                    return x.getName().compareTo(y.getName());
                }else{
                    return Integer.compare(x.getAge(), y.getAge());
                }
            }).forEach(System.out::println);

3、终止操作

终端操作是让流的中间链生成结果。它的结果可以是任何不是流的值,比如:ListIntegerMap,也可以是void

(1)查找匹配

方法返回类型说明
allMatch(Predicate p)Boolean检查是否匹配所有元素
anyMatch(Predicate p)Boolean检查是否至少匹配一个元素
noneMatch(Predicate p)Boolean检查是否没有匹配所有元素
findFirst()Optional<T>返回第一个元素
findAny()Optional<T>返回当前流中的任意元素

使用示例:

List<User> users = Arrays.asList(
            new User(102, "李四", 59, 6666.66, Status.BUSY),
            new User(101, "张三", 18, 9999.99, Status.FREE),
            new User(103, "王五", 28, 3333.33, Status.VOCATION),
            new User(104, "赵六", 8, 7777.77, Status.BUSY),
            new User(104, "赵六", 8, 7777.77, Status.FREE),
            new User(104, "赵六", 8, 7777.77, Status.FREE),
            new User(105, "田七", 38, 5555.55, Status.BUSY)
    );
boolean bl = users.stream().allMatch((e) -> e.getStatus().equals(Status.BUSY));
System.out.println(bl); //false

boolean bl1 = users.stream().anyMatch((e) -> e.getStatus().equals(Status.BUSY));
System.out.println(bl1); //true

boolean bl2 = users.stream().noneMatch((e) -> e.getStatus().equals(Status.BUSY));
System.out.println(bl2); //false

Optional<User> op = users.stream()
            .sorted((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary()))
            .findFirst();

System.out.println(op.get());

System.err.println("--------------------------------");

Optional<User> op2 = users.parallelStream()
            .filter((e) -> e.getStatus().equals(Status.FREE))
            .findAny();

System.out.println(op2.get());

(2)聚合

方法返回类型说明
count()Long返回流中元素总数
max(Comparator c)Optional<T>返回流中最大值
min(Comparator c)Optional<T>返回流中最小值
forEach(Consumer c)Stream API的内部迭代

使用示例:

long count = users.stream().filter((e) -> e.getStatus().equals(Status.FREE))
                         .count();
System.out.println(count);

Optional<Double> op = users.stream()
            .map(User::getSalary)
            .max(Double::compare);

System.out.println(op.get());

Optional<User> op2 = users.stream()
            .min((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary()));

System.out.println(op2.get());


Stream<User> stream = users.stream()
         .filter((e) -> e.getStatus().equals(Status.FREE));

long count = stream.count();
//流进行了终止操作后,不能再次使用
stream.map(User::getSalary).max(Double::compare);

(3)归约

方法说明
reduce(T iden, BinaryOperator b)可以将流中元素反复结合起来,得到一个值,返回T
reduce(BinaryOperator b)可以将流中元素反复结合起来,得到一个值,返回Optional<T>

mapreduce的连接通常称为map-reduce模式,因Google用它来进行网络搜索而出名。

使用示例:

List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

Integer sum = list.stream().reduce(0, (x, y) -> x + y);
System.out.println(sum);

System.out.println("----------------------------------------");

Optional<Double> op = users.stream()
            .map(User::getSalary)
            .reduce(Double::sum);

System.out.println(op.get());

//查名字里出现"六"的次数
Optional<Integer> sum = users.stream()
            .map(User::getName)
            .flatMap(TestStreamAPI1::filterCharacter)
            .map((ch) -> {
                if(ch.equals('六'))
                    return 1;
                else 
                    return 0;
            }).reduce(Integer::sum);

        System.out.println(sum.get());

(4)收集

方法说明
collect(Collector c)将流转换为其他形式。接受一个Collector接口的实现,用于给Stream中元素做汇总的方法。

Collector接口中方法的实现决定了如何对流执行收集操作(如何收集到ListSetMap)。但是Collectors实用类提供了很多静态方法,可以方便创建常见收集器实例,具体方法如下:

方法返回类型说明
toListList<T>把流中元素收集到List集合
toSetSet<T>把流中元素收集到Set集合
toCollectionCollection<T>把流中元素收集到创建的集合
countingLong计算流中元素的个数
summingIntInteger对流中元素的整数属性求和
averagingIntDouble计算流中元素Intger属性的平均值
sunmarizingIntIntSummaryStatistics收集流中Intger属性的统计值。如:平均值
joiningString连接流中每个字符串
maxByOptional<T>根据比较器选择最大值
minByOptional<T>根据比较器选择最小值
reducing归约产生的类型从一个作为累加器的初始值开始,利用BinaryOperator与流中元素逐个结合,从而归约成单个值
CollectingAndThen转换函数返回的类型包裹另一个收集器,对其结果转换函数
groupingByMap<K,List<T>>根据某属性值对流分组,属性为K,结果为V
partitioningByMap<Boolean,List<T>>根据truefalse进行分区

方式使用示例:

// toList
List<String> list = users.stream()
                    .map(User::getName)
                    .collect(Collectors.toList());
list.forEach(System.out::println);
System.out.println("----------------------------------");

// toSet
Set<String> set = users.stream()
                    .map(User::getName)
                    .collect(Collectors.toSet());
set.forEach(System.out::println);
System.out.println("----------------------------------");

// toCollection
HashSet<String> hs = users.stream()
                    .map(User::getName)
                    .collect(Collectors.toCollection(HashSet::new));
hs.forEach(System.out::println);

// maxBy 最大值
Optional<Double> max = users.stream()
            .map(User::getSalary)
            .collect(Collectors.maxBy(Double::compare));
System.out.println(max.get());

// minBy 最小值
Optional<User> op = users.stream()
            .collect(Collectors.minBy((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary())));
System.out.println(op.get());

// summingDouble
Double sum = users.stream()
            .collect(Collectors.summingDouble(User::getSalary));
System.out.println(sum);

// averagingDouble  平均值
Double avg = users.stream()
            .collect(Collectors.averagingDouble(User::getSalary));
System.out.println(avg);

// counting 总数
Long count = users.stream()
            .collect(Collectors.counting());
System.out.println(count);
System.out.println("--------------------------------------------");

// summarizingDouble  总和
DoubleSummaryStatistics dss = users.stream()
            .collect(Collectors.summarizingDouble(User::getSalary));
System.out.println(dss.getMax());
System.out.println(dss.getSum());
System.out.println(dss.getAverage());

// groupingBy 分组
Map<Status, List<User>> map = users.stream()
            .collect(Collectors.groupingBy(User::getStatus));

System.out.println(map);

//多级分组
Map<Status, Map<String, List<User>>> map = users.stream()
            .collect(Collectors.groupingBy(User::getStatus, Collectors.groupingBy((e) -> {
                if(e.getAge() >= 60)
                    return "老年";
                else if(e.getAge() >= 35)
                    return "中年";
                else
                    return "成年";
            })));
System.out.println(map);

// partitioningBy 分区
Map<Boolean, List<User>> map = users.stream()
            .collect(Collectors.partitioningBy((e) -> e.getSalary() >= 5000));
System.out.println(map);

// joining 连接
String str = users.stream()
            .map(User::getName)
            .collect(Collectors.joining("," , "----", "----"));
System.out.println(str); //以逗号分隔,前后加 ------

// reducing 归约
Optional<Double> sum = users.stream()
            .map(User::getSalary)
            .collect(Collectors.reducing(Double::sum));
System.out.println(sum.get());

三、并行流与串行流

并行流 就是把一个内容分成多个数据库,并用不同的线程分别处理每个数据块的流。

java8 中将并行进行优化,可以很容易对数据进行并行操作。Stream API 可以声明性地通过parallel()sequential()在并行流和串行流之间进行切换。

一个整数累加的示例:

@Test
    public void paralleltTest(){
        long start = System.currentTimeMillis();

        Long sum = LongStream.rangeClosed(0L, 10000000000L)
                             .parallel()
                             .sum();

        System.out.println(sum);

        long end = System.currentTimeMillis();

        System.out.println("耗费的时间为: " + (end - start)); 
    }

四、ForK /Join 框架简介

ForK /Join 框架: 就是在比要时,可以将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行Join汇总。

ForK / Join 框架和传统线程池区别:

ForK / Join 框架采用的是“Working-Stealing(工作窃取)”模式:当执行新的任务时它可以将其拆分成更小颗粒的任务执行,并将小任务添加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己队列中。

相比于一般线程池实现,ForK / Join 的优势是在对其中包含的任务的处理方式上,在一般的线程池中,如果一个线程正在执行的任务有余某些原因无法继续执行,那么该线程会处于等待状态。而ForK / Join 框架实现中,如果某个子问题由于等待另一个子问题的完成而无法继续执行时,处理该子问题的线程会主动寻找其他尚未运行的子问题来执行。这种方式减少了线程的等待时间,可以有效的提高性能。

Fork/Join框架主要类

名称作用
ForkJoinPool用来执行Task,或生成新的ForkJoinWorkerThread,执行 ForkJoinWorkerThread 间的 work-stealing 逻辑。ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。
ForkJoinTask执行具体的分支逻辑,声明以同步/异步方式进行执行
ForkJoinWorkerThread是 ForkJoinPool 内的 worker thread,执行ForkJoinTask, 内部有 ForkJoinPool.WorkQueue来保存要执行的ForkJoinTask。
ForkJoinPool.WorkQueue保存要执行的ForkJoinTask。

Fork/Join框架主要逻辑:

ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。

每个工作线程在运行中产生新的任务(通常是因为调用了
fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是LIFO方式,也就是说每次从队尾取出任务来执行。

每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。

在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。

在既没有自己的任务,也没有可以窃取的任务时,进入休眠。

一个整数累加的示例:

package com.atguigu.java8;

import java.util.concurrent.RecursiveTask;

public class ForkJoinHandle extends RecursiveTask<Long>{

    private static final long serialVersionUID = 13475679780L;

    private long start;
    private long end;

    private static final long THRESHOLD = 10000L; //临界值

    public ForkJoinHandle(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long length = end - start;

        if(length <= THRESHOLD){
            long sum = 0;

            for (long i = start; i <= end; i++) {
                sum += i;
            }

            return sum;
        }else{
            long middle = (start + end) / 2;

            ForkJoinHandle left = new ForkJoinHandle(start, middle);
            left.fork(); //拆分,并将该子任务压入线程队列

            ForkJoinHandle right = new ForkJoinHandle(middle+1, end);
            right.fork();

            return left.join() + right.join();
        }

    }

}

测试类:


    @Test
    public void forkJoinTest(){
        long start = System.currentTimeMillis();

        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinHandle(0L, 20000000000L);

        long sum = pool.invoke(task);
        System.out.println(sum);

        long end = System.currentTimeMillis();

        System.out.println("耗费的时间为: " + (end - start)); 
    }

    @Test
    public void forkJoinTest2(){

        ForkJoinPool forkjoinPool = new ForkJoinPool();

        ForkJoinHandle task = new ForkJoinHandle(1, 100);
        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result" + result.get());
        } catch (Exception e) {
            e.printstacktrace();
        }

注意事宜:
(1)ForkJoinPool 使用submit 或 invoke 提交的区别:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行,只有在Future调用get的时候会阻塞。

(2)这里继承的是RecursiveTask,还可以继承RecursiveAction。前者适用于有返回值的场景,而后者适合于没有返回值的场景

(3)这里执行子任务调用fork方法并不是最佳的选择,最佳的选择是invokeAll方法。

left.fork(); 
right.fork();

替换为

invokeAll(left, right);

参考文章:
《并发之Fork/Join框架使用及注意点》



版权声明: 本博客所有文章除特別声明外,均采用 CC BY-SA 4.0 许可协议。转载请注明来源 Small-Rose / 张小菜 !
评论
  目录