0%

Java-流(二)

收集结果

处理完流之后,通常会想要查看其元素,此时可以调用iterator()方法,它会产生可以用来访问元素的旧时风格的迭代器

或者,可以用forEach()方法,将某函数应用于每一个元素

在并行流上,forEach()方法会以任意顺序遍历各个元素,如果想要按照流中的顺序来处理元素,可以用forEachOrdered()方法,不过这个方法会丧失并行处理的优势

有时,我们也会想把结果收集到数据结构中,可以用toArray()方法获得由流的元素构成的数组
不过因为无法在运行时创建泛型数组,toArray()方法会返回一个Object[]数组,如果想要让数组拥有正确的类型,可以传递数组构造器

1
String result[] = stream.toArray(String[]::new);

针对将流中的元素收集到另一个目标中,可以用collect()方法,它会接受一个Collevtor接口的实例,Collectors类提供了大量用于生成公共收集器的工厂方法

一些简单收集结果的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 将流收集到列表中
List<String> result = stream.collect(Collectors.toList());
// 将流收集到集合中
Set<String> result2 = stream.collect(Collectors.toSet());

// 控制获得的集合的种类
TreeSet<String> result3 = stream.collect(Collectors.toCollection(TreeSet::new));

// 通过连接操作来收集流中的所有字符串
String result4 = stream.collect(Collectors.joining());
// 在元素之间添加分隔符
String result5 = stream.collect(Collectors.joining(", "));

// 如果流中包含除字符串以外的其他对象,我们需要现将其转换为字符串
String result6 = stream.map(Object::toString).collect(Collectors.joining(", "));

如果想要将流的结果约简为总和、平均值、最大或最小值,可以用summarizing[Int|Long|Double]方法中的某一个,这些方法会接受一个将流对象映射为数据的函数,同时产生类型为[Int|Long|Double]SummaryStatistics对象的结果,同时计算总和、平均值、最大或最小值

1
2
3
IntSummaryStatistics summary = stream.collect(Collectors.summarizingInt(String::length));
double averageWordLength = summary.getAverage();
double maxWordLength = summary.getMax();

API java.util.stream.Stream

  • void forEach(Consumer<? super T> action);
  • Object[] toArray();
  • <A> A[] toArray(IntFunction<A[]> generator);
  • <R, A> R collect(Collector<? super T, A, R> collector);

API java.util.stream.Collectors

  • public static <T> Collector<T, ?, List<T>> toList()
  • public static <T> Collector<T, ?, Set<T>> toSet()
  • public static <T, C extends Collection<T>> Collector<T, ?, C> toCollection(Supplier<C> collectionFactory)
    将元素收集到任意集合中的收集器,可以传入构造器引用
  • public static Collector<CharSequence, ?, String> joining()
  • public static Collector<CharSequence, ?, String> joining(CharSequence delimiter)
  • public static Collector<CharSequence, ?, String> joining(CharSequence delimiter, CharSequence prefix, CharSequence suffix)
    产生一个连接字符串的收集器,分隔符会置于字符串之间,且第一个字符串可以有前缀,最后一个可以有后缀
  • public static <T> Collector<T, ?, Integer> summingInt|Long|Double(ToIntFunction<? super T> mapper)

API java.util.Int|Long|DoubleSummaryStatistics

  • public final long getCount()
    产生汇总后的元素个数
  • public final long|double getSum()
  • public final double getAverage()
    产生汇总后的元素的总和或平均值,没有任何元素时返回0
  • public final int|long|double getMin()
  • public final int|long|double getMax()
    产生汇总后的元素的最大值或最小值,没有任何元素时,产生Integer|Long|Double.MAX|MIN_VALUE

收集到映射表,群组与分区、下游收集器

示例 1

首先,再主类中定义一个静态内部类Person

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static class Person {
private int id;
private String name;

public Person(int id, String name) {
this.id = id;
this.name = name;
}

// 省略 getter and setter

@Override
public String toString() {
return getClass().getName() + "[id=" + id + ",name=" + name + "]";
}
}

再定义一个静态方法perple()

1
2
3
4
5
public static Stream<Person> people() {
return Stream.of(new Person(1001, "Peter"),
new Person(1002, "Paul"),
new Person(1003, "Mary"));
}

如果有一个Stream<Person>对象,想要将其元素收集到一个映射表中,后续就可以通过ID查找人员

Collectors.toMap()方法有有很多重载形式,其中一个两个引元,用来产生映射表的键和值,例如

1
2
3
Map<Integer, String> idToName = people().collect(
Collectors.toMap(Person::getId, Person::getName));
System.out.println("idToName: " + idToName);

输出结果如下

1
idToName: {1001=Peter, 1002=Paul, 1003=Mary}

通常情况下,值应该是实际的元素,第二个函数就可以使用Function.identity()

1
2
3
Map<Integer, Person> idToPerson = people().collect(
Collectors.toMap(Person::getId, Function.identity()));
System.out.println("idToPerson: " + idToPerson.getClass().getName() + idToPerson);

输出结果如下

1
2
3
4
idToPerson: java.util.HashMap
{1001=cn.nopech.stream.CollectingIntoMaps$Person[id=1001,name=Peter],
1002=cn.nopech.stream.CollectingIntoMaps$Person[id=1002,name=Paul],
1003=cn.nopech.stream.CollectingIntoMaps$Person[id=1003,name=Mary]}

如果有多个元素具有相同的键,就会存在冲突,收集器会抛出IllegalStateExecption异常

可以通过toMap方法的第三个函数引元来覆盖行为,该函数会针对给定的已有值和新值来解决冲突并决定键对应的值,示例代码如下

1
2
3
4
5
6
7
idToPerson = people().collect(
Collectors.toMap(Person::getId, Function.identity(),
(exisitingValue, newValue) -> {
throw new IllegalStateException();
}, TreeMap::new)
);
System.out.println("idToPerson: " + idToPerson.getClass().getName() + idToPerson);

示例 2

首先,我们构建一个映射表,存储所有可用Locale中的每种语言
它以默认Locale类中的名字(如"German")为键,以其本地化的名字(如"Deutsch")为值

1
2
3
4
5
6
7
8
Stream<Locale> locales = Stream.of(Locale.getAvailableLocales());
Map<String, String> languageNames = locales.collect(
Collectors.toMap(
Locale::getDisplayLanguage,
l -> l.getDisplayLanguage(l),
(existingValue, newValue) -> existingValue)
);
System.out.println("languagesNames: " + languageNames);

我们不用关心同一种语言是否可能出现两次,只要记录第一项就好,部分输出如下

1
2
3
languagesNames: {旺杜语=ewondo, 乌兹别克语=o‘zbek, =, 桑布鲁语=Kisampur, 
爪哇语=Jawa, 宿务语=Cebuano, 隆迪语=Ikirundi, 索加语=Olusoga, 克罗地亚语=hrvatski,
尼昂科勒语=Runyankore, 亚美尼亚语=հայերեն, 瑞士德语=Schwiizertüütsch, ...

现在,假设我们需要了解给定国家的所有语言,就需要一个Map<String, Set<String>>对象,例如,"Switzerland"的值是集合[French, German, Italian]

首先,为每种语言都存储一个单例集,无论何时,只要找到了给定国家的新语言,都将已有集合和新集合做并集操作

1
2
3
4
5
6
7
8
9
10
11
12
13
Stream<Locale> locales = Stream.of(Locale.getAvailableLocales());
Map<String, Set<String>> countryLanguageSets = locales.collect(
Collectors.toMap(
Locale::getDisplayCountry,
l -> Collections.singleton(l.getDisplayLanguage()),
(a, b) -> { // union of a and b
Set<String> union = new HashSet<>(a);
union.addAll(b);
return union;
}
)
);
System.out.println("countryLanguageSets: " + countryLanguageSets);

展示部分输出

1
2
3
countryLanguageSets: {巴西=[西班牙语, 葡萄牙语], 
中国=[中文, 四川彝语, 粤语, 维吾尔语, 藏语], 澳大利亚=[英语], 蒙古=[蒙古语],
佛得角=[卡布佛得鲁语, 葡萄牙语], 台湾=[中文], 萨摩亚=[英语], ...

API java.util.stream.Collectors

  • 1
    2
    3
    public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
    Function<? super T, ? extends U> valueMapper)
  • 1
    2
    3
    4
    public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
    Function<? super T, ? extends U> valueMapper,
    BinaryOperator<U> mergeFunction)
  • 1
    2
    3
    4
    5
    public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
    Function<? super T, ? extends U> valueMapper,
    BinaryOperator<U> mergeFunction,
    Supplier<M> mapFactory)
  • 1
    2
    3
    4
    5
    public static <T, K, U, M extends ConcurrentMap<K, U>>
    Collector<T, ?, M> toConcurrentMap(Function<? super T, ? extends K> keyMapper,
    Function<? super T, ? extends U> valueMapper,
    BinaryOperator<U> mergeFunction,
    Supplier<M> mapFactory)

产生一个收集器,会产生一个映射表或者并发映射表

keyMappervalueMapper会产生一个键值对,默认情况下,键相同会抛出IllegalStateExecption异常,可用提供一个mergeFunction来合并具有相同键的值

默认情况下,结果是一个HashMap或者ConcurrentHashMap对象,可用提供一个mapFactory,来产生期望的映射表实例

群组与分区

刚刚展示了如何收集给定国家的所有语言,但处理过程有些冗长,必须为每个映射表的值生成单例集合,然后指定如何合并现有集合和新集合

将具有相同特性的值群聚成组,可用用groupingBy()方法

1
2
3
Map<String, List<Locale>> countryToLocale = locales.collect(
Collectors.groupingBy(Locale::getCountry)
);

Locale::getCountry是群组的分类函数

现在,可用查找给定国家代码对应的地点了

1
2
3
List<Locale> chinaLocales = countryToLocale.get("CN");
chinaLocales.forEach(s -> System.out.print(s + " "));
// Yield locales [yue_CN_#Hans zh_CN zh_CN_#Hans ug_CN bo_CN ii_CN]

当分类函数是断言函数时,流的元素可以区分为两个列表:该函数返回ture的元素和其他元素

这种情况下,使用partitioningBy()就比groupingBy()高效

下面的例子,将所有Locale对象分成了使用用于和使用所有其他语言的两类

1
2
3
4
5
6
7
8
Map<Boolean, List<Locale>> englishAndOtherLocales = locales.collect(
Collectors.partitioningBy(l -> l.getLanguage().equals("en"))
);
englishAndOtherLocales.forEach((k, v) -> System.out.println(k + ": " + v));

// 输出
false: [, nds, ti_ET, ta_SG, lv, zh_SG_#Hans, ...
true: [en_NU, en_JM, en_LR, en_PW, en_US_POSIX, ...

API ``

  • 1
    2
    public static <T, K> Collector<T, ?, Map<K, List<T>>>
    groupingBy(Function<? super T, ? extends K> classifier)
  • 1
    2
    public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>>
    groupingByConcurrent(Function<? super T, ? extends K> classifier)

产生一个收集器,它会产生一个映射表或者并发映射表,其键是将classifier应用于所有收集到的元素上所产生的结果,而值是由具有相同键的元素构成的列表

  • 1
    2
    public static <T> Collector<T, ?, Map<Boolean, List<T>>> 
    partitioningBy(Predicate<? super T> predicate)

产生一个收集器,他会产生一个映射表,其键是true/false,而值是由满足/不满足断言的元素构成的列表

下游收集器

groupingBy()方法产生的映射表,每个值都是一个列表,若想要处理列表,就要提供一个“下游收集器”

例如,想要获得集合而非列表,就可以用之前提到的Collectors.toSet()方法

1
2
3
Map<String, Set<Locale>> countryToLocaleSet = locales.collect(
Collectors.groupingBy(Locale::getCountry, Collectors.toSet())
);

API

Java 提供了多种可用将群组元素约简为数字的收集器,这里简单列举几个API

  • public static <T> Collector<T, ?, Long> counting()
    产生一个可以对收集到的元素进行计数的计数器

  • 1
    2
    3
    public static <T> Collector<T, ?, Integer>
    summingInt|Long|Double
    (ToInt|Long|DoubleFunction<? super T> mapper)

    产生一个收集器,计算总和

  • public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator)

  • public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator)
    产生一个收集器,收集最大/最小值

  • 1
    2
    3
    public static <T, U, A, R> Collector<T, ?, R> 
    mapping(Function<? super T, ? extends U> mapper,
    Collector<? super U, A, R> downstream)

    产生一个收集器,它会产生一个映射表,其键时将mapper应用到收集到的数据上而产生的,其值是使用downstream收集器收集到的具有相同键的元素

约简操作

reduce方法

reduce()方法是一种用于流中计算某个值得通用机制,有多种重载形式

API java.util.stream.Stream

  • Optional reduce(BinaryOperator accumulator);
  • T reduce(T identity, BinaryOperator accumulator);
  • 1
    2
    3
    <U> U reduce(U identity,
    BiFunction<U, ? super T, U> accumulator,
    BinaryOperator<U> combiner);

用给定得accumulator函数产生流中元素的累积总和
如果提供了单位元(Identity Element,也叫幺元),那么第一个被累计的元素就是该单位元
如果提供了组合器,那么它可以用来将分别累积的各个部分整合成总和

collect方法

有时reduce并不够通用,例如,想要收集BitSet中的结果,若收集操作时并行的,那么就不能直接将元素放在单个BitSet对象中,因为其不是线程安全的

每个部分都要以其自己的空集开始,此时就该使用collect,它会接受单个引元
1)一个Supplier<R>对象,可以创建目标类型的新实例
2)一个BiConsumer<R, ? super T>对象,可以将一个元素添加到一个实例上
3)一个BiConsumer<R, R>对象,可以将两个实例合并成一个

简单举个collect方法操作位集的例子

1
BitSet result = stream.collect(BitSet::new, BitSet::set, BitSet::or);

API java.util.stream.Stream

1
2
3
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);

将元素收集到类型R的结果中
在每个部分上,都会调用supplier来提供初始结果,调用accumulator来交替的将元素添加到结果中,并调用combiner来整合两个结果

基本类型流

流库中提供了专门的类型IntStream, LongStream, DoubleStream,用来直接存储基本类型值,无需使用包装器
如果想要存储short, char, byte, boolean,可以直接使用IntStream
对于float,可以使用DoubleStream

详情可以参照API ,里面提供了丰富的方法

  • java.util.stream.IntStream
  • java.util.stream.LongStream
  • java.util.stream.DoubleStream

并行流

流使得并行处理块操作变得很容易,这个过程几乎是自动的

创建并行流

利用Collection<E>接口的parallelStream()方法从任意集合中获取一个并行流

1
2
3
List<String> words = ...;

Stream<String> parallelWords = words.parallelStream();

parallel()方法可以将任意的顺序流转换为并行流

1
2
3
String[] wordArray = {...};

Stream<String> parallelWords2 = Stream.of(wordArray).parallel();

只要在终结方法执行时,流处于并行模式,那么所有的中间流操作都将被并行化

当流操作并行运行时,目标是要让其返回结果与顺序执行时的返回结果相同,重要的是,这些操作可以以任意顺序执行

例如,想要对字符串流中的所有短单词计数,我们需要确保传递给并行流操作的任何函数都可以安全并行执行

如果用长度将字符串群组,然后分别对它们进行计数,那么就可以安全并行化这项计算

1
2
3
4
5
6
Map<Integer, Long> shortWordCounts = words.parallelStream()
.filter(s -> s.length() < 10)
.collect(Collectors.groupingBy(
String::length,
Collectors.counting()
));

注意,传递给并行流操作的函数不应该被堵塞

默认情况下,从有序集合(数组和列表)、生成器和迭代产生的流,或通过调用Stream.sorted产生的流,都是有序的,排序并不排斥高效的并行处理

通过在流上调用unsorted方法,就可以明确表示我们对排序不感兴趣

之前讨论过,合并映射表的代价很高,不过Collectors.groupByConcurrent方法使用了共享的并发映射表,为从并行化中受益,映射表中的值顺序不会与流中的顺序相同

1
2
3
Map<Integer, List<String>> result = words.parallelStream().collect(
Collectors.groupingByConcurrent(String::length)
);

当然,如果使用独立于排序的下游收集器,就不用太在意了

1
2
3
4
5
6
7
Map<Integer, Long> wordCounts = words.parallelStream()
.collect(
Collectors.groupingByConcurrent(
String::length,
Collectors.counting()
)
);

注意:不要修改在执行某项流操作后将元素返回到流中的集合,因为流不会收集它们的数据,数据总是在单独的集合中。如果修改了这样的集合,那么流操作的结果就是未定义的

为了让并行流正常工作,需要满足许多条件:

  • 数据应该在内存中
  • 流应该可以被高效地分成若干个字部份
  • 流操作的工作量应该具有较大的规模
  • 流操作不应该被阻塞

总之,不要将所有流操作都转换为并行流,只有在对已经位于内存中的数据执行大量计算操作时,才应该使用并行流

API java.util.stream.BaseStream<T, S extends BaseStream<T, S>>接口

  • 1
    S parallel();

    产生一个与当前流中元素相同的并行流

  • 1
    S unordered();

    产生一个与当前流中元素相同的无序流

API java.util.Collection<E>接口

1
2
3
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}

用当前集合中的元素产生一个并行流

最后附上应该并行流的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ParallelStreams {
public static void main(String[] args) throws IOException {
String contents = new String(Files.readAllBytes(
Paths.get("...\\microservice.txt")),
StandardCharsets.UTF_8);
List<String> wordList = Arrays.asList(contents.split("\\PL+"));

// Group and count
Map<Integer, Long> shortWordCounts = wordList.parallelStream()
.filter(s -> s.length() < 10)
.collect(Collectors.groupingBy(String::length, Collectors.counting()));
System.out.println("shortWordCounts: " + shortWordCounts);

// Downstream order not deterministic
Map<Integer, List<String>> result = wordList.parallelStream().collect(
Collectors.groupingByConcurrent(String::length)
);
System.out.println("length(13): " + result.get(13));

// for comparing
result = wordList.parallelStream().collect(
Collectors.groupingByConcurrent(String::length)
);
System.out.println("length(13): " + result.get(13));

Map<Integer, Long> wordCounts = wordList.parallelStream().collect(
Collectors.groupingByConcurrent(String::length, Collectors.counting())
);
System.out.println("wordCounts: " + wordCounts);
}
}

输出如下:

1
2
3
4
shortWordCounts: {0=1, 1=33, 2=99, 3=94, 4=94, 5=48, 6=47, 7=42, 8=51, 9=17}
length(13): [architectural, communicating, architectural, independently, independently, Microservices]
length(13): [architectural, architectural, Microservices, communicating, independently, independently]
wordCounts: {0=1, 1=33, 2=99, 3=94, 4=94, 5=48, 6=47, 7=42, 8=51, 9=17, 10=27, 11=17, 12=21, 13=6}