深入介绍和使用 Java 8 的 Collector 接口和 Collectors 工具类

1 概述

  • java.util.stream.Collector: Collector 接口用于将 Stream 流中的数据加工,转换,处理,最后返回结果。
  • java.util.stream.Collectors: Collectors 工具类内置了一些 Collector 接口的实现。

本文详解 Java 8 的 Collector 接口方法以及 Collectors 工具类的使用,具体内容如下

  1. Stream api 中的 collect 方法介绍以及使用
  2. Collector 接口的实现
  3. Collectors 工具类介绍以及使用

2 Stream 接口中的 collect 方法

collect 方法的功能是将 Stream 中数据转换为最终的结果,具体如下

  1. <R, A> R collect(Collector<? super T, A, R> collector):参数类型为 Collector 接口
  2. <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner):参数类型为 Supplier, BiConsumer,BiConsumer 三个对象。

2.1 collect 方法使用举例

需求:将集合中的英文字符串首字母大写,下面提供两种实现的方法:

  1. collect 方法接口参数为 Collector 对象实现
  2. collect 方法接口参数为 Supplier, BiConsumer, BiConsumer 三个对象实现

具体实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Test
public void test_1() {
List<String> dataList = new ArrayList<>();
dataList.add("john");
dataList.add("kain");
dataList.add("mike");
dataList.add("milk");
dataList.add("kav");

// 1. 接口参数为 Collector 对象实现

// 1.1 Supplier 对象负责创建容器对象,以便容纳 stream 中分隔后的数据,这里使用 List<Integer> 作为数据的容器对象
// 1.2 accumulator 对象负责将 stream 中分隔后的数据放入上一步创建的容器中
// 1.3 combiner 对象负责将分隔的容器中的数据合并起来
// 1.4 finisher 对象负责将最终返回的结果进行最终的处理
// 1.5 characteristics 负责 Collector 执行效率的优化,具体要根据是否使用了 parallelStream 以及 数据是否有顺序,不会影响最终的执行结果:
List<String> resultList = dataList.parallelStream().collect(new Collector<String, List<String>, List<String>>() {
@Override
public Supplier<List<String>> supplier() {
return () -> new ArrayList<>();
}

@Override
public BiConsumer<List<String>, String> accumulator() {
return (list, data) -> list.add(data.substring(0,1).toUpperCase().concat(data.substring(1)));
}

@Override
public BinaryOperator<List<String>> combiner() {
return (list1,list2) -> {
list1.addAll(list2);
return list1;
};
}

@Override
public Function<List<String>, List<String>> finisher() {
return list -> list;
}

@Override
public Set<Characteristics> characteristics() {
Set<Characteristics> characteristicsSet = new HashSet<>();
characteristicsSet.add(Characteristics.CONCURRENT);
return characteristicsSet;
}
});

resultList.forEach(System.out::println);

// 2. 接口参数为 Supplier, BiConsumer, BiConsumer 三个对象

// 2.1 Supplier 对象负责创建容器对象,以便容纳 stream 中分隔后的数据,这里使用 List<Integer> 作为数据的容器对象
// 2.2 BiConsumer 对象负责将 stream 中分隔后的数据放入上一步创建的容器中
// 2.3 BiConsumer 对象负责将分隔的容器中的数据合并起来
List<String> resultList2 = dataList.parallelStream().collect(
ArrayList::new,
(list, data) -> list.add(data.substring(0,1).toUpperCase().concat(data.substring(1))),
(list1,list2) -> list1.addAll(list2));

resultList2.forEach(System.out::println);
}
1
2
3
4
5
6
7
8
9
10
11
new supplier
John
Kain
Mike
Milk
Kav
John
Kain
Mike
Milk
Kav

2.2 collect 方法接口参数为 Collector 对象实现

在 Collector 对象实现中主要包括如下 5 个接口对象

  1. Supplier 对象负责创建容器对象,以便容纳 stream 中分隔后的数据,这里使用 List 作为数据的容器对象
  2. accumulator 对象负责将 stream 中分隔后的数据放入上一步创建的容器中
  3. combiner 对象负责将分隔的容器中的数据合并起来
  4. finisher 对象负责将最终返回的结果进行最终的处理
  5. characteristics 负责 Collector 执行效率的优化,具体要根据是否使用了 parallelStream 以及 数据是否有顺序,不会影响最终的执行结果

2.3 collect 方法接口参数为 Supplier, BiConsumer, BiConsumer 三个对象实现

Supplier, BiConsumer, BiConsumer 三个对象的功能如下

  1. Supplier 对象负责创建容器对象,以便容纳 stream 中分隔后的数据,这里使用 List 作为数据的容器对象
  2. BiConsumer 对象负责将 stream 中分隔后的数据放入上一步创建的容器中
  3. BiConsumer 对象负责将分隔的容器中的数据合并起来

2.4 Characteristics 枚举

在接口参数为 Collector 对象实现中有一个 characteristics 方法,返回的是一个 Set 集合,负责 Collector 执行效率的优化,具体要根据是否使用了 parallelStream 以及数据是否有顺序,最终不会影响执行结果。

具体内容为 Collector 接口中定义的 Characteristics 泛型,具体如下

1
2
3
4
5
6
7
8
enum Characteristics {
// 如果使用 parallelStream,characteristics 方法返回的 set 集合中可以加上这个枚举
CONCURRENT,
// 如果流中的数据是没有顺序的,也可以加上这个枚举
UNORDERED,
// 加上这个表示 finisher 方法中的参数和返回值完全一致,finisher 方法不会执行了
IDENTITY_FINISH
}

2.5 parallelStream 线程安全问题

  • 其中上面的两个例子中都使用了 parallelStream,并且使用了 collect 方法
  • parallelStream 可以提高数据处理效率,具体是通过将流中的数据分隔成若干端后再并行处理最后将结果合并
  • 如果不使用 collect 方法会发生什么情况,会不会出现线程安全问题?

先看下面这个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static Lock lock = new ReentrantLock();

@Test
public void test_unsafe() {
List<Integer> list1 = new ArrayList<>();
List<Integer> list2 = new ArrayList<>();
List<Integer> list3 = new ArrayList<>();

// 串行
IntStream.range(0, 10000).forEach(list1::add);

// 并行
IntStream.range(0, 10000).parallel().forEach(list2::add);

// 并行加锁
IntStream.range(0, 10000).parallel().forEach(i -> {
lock.lock();
try {
list3.add(i);
} finally {
lock.unlock();
}
});

System.out.println(String.format("串行执行的大小:%s", list1.size()));
System.out.println(String.format("有线程安全的并行问题大小:%s", list2.size()));
System.out.println(String.format("并行加锁:%s", list1.size()));
}

执行结果如下,可见在 parallelStream 中会出现线程安全问题,虽然通过锁可以避免线程安全问题,但是会降低程序处理效率,从而影响系统的吞吐率。

1
2
3
串行执行的大小:10000
有线程安全的并行问题大小:7351
并行加锁:10000

正确的解决的方法就是通过 collect 方法和 Collector 接口,具体如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Test
public void test_safe() {
List<Integer> list1 = new ArrayList<>();

// 串行
IntStream.range(0, 10000).forEach(list1::add);

// 通过 collect 接口解决并发问题
// 1. Supplier 对象负责创建容器对象,以便容纳 stream 中分隔后的数据,这里使用 List<Integer> 作为数据的容器对象
// 2. ObjIntConsumer 对象负责将 stream 中分隔后的数据放入上一步创建的容器中
// 3. BiConsumer 对象负责将分隔的容器中的数据合并起来
List<Integer> dataList = IntStream.range(0, 10000).parallel().collect(new Supplier<List<Integer>>() {
@Override
public List<Integer> get() {
System.out.println("new Supplier");
return new ArrayList<>();
}
}, new ObjIntConsumer<List<Integer>>() {
@Override
public void accept(List<Integer> dataList, int value) {
dataList.add(value);
}
}, new BiConsumer<List<Integer>, List<Integer>>() {
@Override
public void accept(List<Integer> dataList1, List<Integer> dataList2) {
System.out.println("merge result");
dataList1.addAll(dataList2);
}
});

List<Integer> dataList1 = IntStream.range(0, 10000).collect(ArrayList::new, (list, i) -> list.add(i), (lista, listb) -> lista.addAll(listb));
List<Integer> dataList2 = IntStream.range(0, 10000).parallel().collect(ArrayList::new, (list, i) -> list.add(i), (lista, listb) -> lista.addAll(listb));

System.out.println(String.format("串行执行的大小:%s", list1.size()));
System.out.println(String.format("线程安全的并行大小:%s", dataList.size()));
System.out.println(String.format("线程安全的并行大小:%s", dataList1.size()));
System.out.println(String.format("线程安全的并行大小:%s", dataList2.size()));
}
  • 通过 collect 方法解决并发问题
  1. Supplier 对象负责创建容器对象,以便容纳 stream 中分隔后的数据,这里使用 List 作为数据的容器对象
  2. ObjIntConsumer 对象负责将 stream 中分隔后的数据放入上一步创建的容器中
  3. BiConsumer 对象负责将分隔的容器中的数据合并起来

3 Collectors 工具类介绍以及使用

在上面的例子中都使用了 collect 方法以及自行实现了 Collector 接口。有没有 jdk 已经实现好的 Collector 接口呢?

3.1 Collectors 工具类方法介绍

答案是有的,就是这里将要介绍的 Collectors 工具类:java.util.stream.Collectors,完整的方法如下:

image

下面来简要介绍一下,不谈具体的参数,返回类型都是 Collector 接口对象,具体如下

  1. toCollection,toList,toSet:处理并将结果返回成集合对象
  2. joining:处理并将结果返回成字符串
  3. mapping:首先将流中的元素从 T 转成 U,然后再将含有 U 的流转给 downstream 继续处理
  4. collectingAndThen:首先将流传给 downstream 处理,然后将 downstream 中的 R 转成 RR,最后的结果为 RR
  5. counting:求流中元素的总数,类型为 Long
  6. minBy: 求流中最小的元素
  7. maxBy:求流中最大的元素
  8. summingInt,summingLong,summingDouble:求流中元素之和,和的类型分别是 int, long, double 类型
  9. averagingInt,averagingLong,averagingDouble:求流中元素的平均数,平均数的类型分别是 int, long, double 类型
  10. reducing:减少流中的元素
  11. groupingBy:将流中的元素分组处理
  12. groupingByConcurrent:以并行的方式将流中的元素分组处理
  13. partitioningBy:将流中的元素分区处理
  14. toMap, toConcurrentMap:处理并将结果返回成 Map 对象
  15. summarizingInt,summarizingLong,summarizingDouble:汇总流中的元素,汇总后的类型分别是 IntSummaryStatistics, LongSummaryStatistics, DoubleSummaryStatistics

3.2 Collectors 工具类使用

下面来介绍上面的 15 中方法具体使用

  • 增加 Student 实体如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import org.apache.commons.lang3.builder.ToStringBuilder;

public class Student {

private int age;//年龄
private String name;//姓名
private String enName;// 英文 姓名
private char gender;//性别,0未知,1男,2女
private double credit; // 学分

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getEnName() {
return enName;
}

public void setEnName(String enName) {
this.enName = enName;
}

public char getGender() {
return gender;
}

public void setGender(char gender) {
this.gender = gender;
}

public double getCredit() {
return credit;
}

public void setCredit(double credit) {
this.credit = credit;
}

public Student(int age, String name, String enName, char gender, double credit) {
this.age = age;
this.name = name;
this.enName = enName;
this.gender = gender;
this.credit = credit;
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("age", age)
.append("name", name)
.append("enName", enName)
.append("gender", gender)
.append("credit", credit)
.toString();
}

public Student() {
}
}
  • 具体的测试如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.util.*;
import java.util.stream.Collectors;

public class TestCollectorsApi {

private static final Logger logger = LoggerFactory.getLogger(TestCollectorsApi.class);

private static List<Student> userList = new ArrayList<>();

/**
* 初始化 Student 集合
*/
@Before
public void initEveryTestBefore() {
userList.add(new Student(22, "王旭", "wang.xu", '1', 4));
userList.add(new Student(21, "孙萍", "sun.ping", '2', 3.6));
userList.add(new Student(23, "步传宇", "bu.zhuan.yu", '0', 4.1));
userList.add(new Student(18, "蔡明浩", "cai.ming.hao", '1', 3.5));
userList.add(new Student(17, "郭林杰", "guo.lin.jie", '1', 3.4));
userList.add(new Student(29, "韩凯", "han.kai", '0', 4));
userList.add(new Student(22, "韩天琪", "han.tian.qi", '1', 4));
userList.add(new Student(21, "郝玮", "hao.wei", '2', 3.9));
userList.add(new Student(19, "胡亚强", "hu.ya.qing", '1', 3.5));
userList.add(new Student(14, "季恺", "ji.kai", '2', 4.2));
userList.add(new Student(17, "荆帅", "jing.shuai", '1', 4.1));
userList.add(new Student(16, "姜有琪", "jiang.you.qi", '1', 2.5));
logger.info("initEveryTestBefore, size {}", userList.size());
}

/**
* 测试结束,清理数据
*/
@After
public void destroyEveryTestAfter() {
logger.info("destroyEveryTestAfter, size {}", userList.size());
userList.clear();
}

@Test
public void toList() {
// 按照学分排序
userList = userList
.stream()
.sorted(Comparator.comparing(Student::getCredit))
.collect(Collectors.toList());

// 打印数据
userList.forEach(t -> logger.info(t.toString()));
}

@Test
public void joining() {
// 将学生姓名合并成以,分隔的字符串
String names = userList
.stream()
.map(Student::getName)
.collect(Collectors.joining(","));

// 打印数据
System.out.println(names);
}

@Test
public void mapping() {
// 将学生姓名合并成以,分隔的字符串
String names = userList
.stream()
.collect(Collectors.mapping(Student::getName, Collectors.joining(",")));

// 打印数据
System.out.println(names);
}

@Test
public void collectingAndThen() {
// 将学生姓名合并成以,分隔的字符串, 再通过 Function 对象将字符串转成 String[] 对象
String[] nameArr = userList
.stream()
.map(Student::getName)
.collect(Collectors.collectingAndThen(Collectors.joining(","), data -> data.split(",")));

// 打印数据
for (int i = 0; i < nameArr.length; i++) {
System.out.println(nameArr[i]);
}
}

@Test
public void counting() {
// 将学生姓名合并成以,分隔的字符串, 再通过 Function 对象将字符串转成 String[] 对象
Long count = userList
.stream()
.filter(student -> student.getCredit() > 4)
.collect(Collectors.counting());

// 打印数据
System.out.println(String.format("学分大于 4 的学生人数为:%s", count));
}

@Test
public void minBy() {
// 先去重,然后获取学分最小的学生
Optional<Student> student = userList
.stream()
.distinct()
.collect(Collectors.minBy((stu1, stu2) -> BigDecimal.valueOf(stu1.getCredit()).compareTo(BigDecimal.valueOf(stu2.getCredit()))));

// 打印数据
System.out.println(String.format("学分最小的学生:%s", student.get().toString()));
}

@Test
public void maxBy() {
// 先去重,然后获取学分最大的学生
// 使用 Comparator.comparing
Optional<Student> student = userList
.stream()
.distinct()
.collect(Collectors.maxBy(Comparator.comparing(Student::getCredit)));

// 打印数据
System.out.println(String.format("学分最大的学生:%s", student.get().toString()));
}

@Test
public void summingDouble() {
double total = userList
.stream()
.distinct()
.collect(Collectors.summingDouble(Student::getCredit));

// 打印数据
System.out.println(String.format("学分总计:%s", total));
}

@Test
public void averagingDouble() {
double average = userList
.stream()
.distinct()
.collect(Collectors.averagingDouble(Student::getCredit));

// 打印数据
System.out.println(String.format("学分平均:%s", average));
}

@Test
public void reducing() {
// 1 返回学分最大的学生
// reducing 第一个参数 T identity 表示最终的返回类型或者结果的容器
Student studentMax = userList
.stream()
.collect(Collectors.reducing(new Student(), (stu1, stu2) -> stu1.getCredit() > stu2.getCredit() ? stu1 : stu2));

// 打印数据
System.out.println(String.format("返回学分最大的学生:%s", studentMax));

// 2 返回学分最大的学生,不需要指定 返回类型或者结果的容器
Optional<Student> studentMin = userList
.stream()
.collect(Collectors.reducing((stu1, stu2) -> stu1.getCredit() > stu2.getCredit() ? stu1 : stu2));

// 打印数据
System.out.println(String.format("返回学分最大的学生:%s", studentMin.get().toString()));

// 3 所有学生的学分之和
BigDecimal total = BigDecimal.ZERO;

BigDecimal totalCredit = userList
.stream()
.collect(Collectors.reducing(total, stu -> total.add(BigDecimal.valueOf(stu.getCredit())), (data1, data2) -> data1.add(data2)));

// 打印数据
System.out.println(String.format("所有学生的学分之和:%s", totalCredit));
}

@Test
public void groupingBy() {
// 按照性别分组
Map<Character, List<Student>> dataMap = userList
.stream()
.collect(Collectors.groupingBy(Student::getGender));

// 打印数据
dataMap.forEach((k,v) -> {
System.out.println(String.format("性别:%s", k));
v.forEach(System.out::println);
System.out.println("---------------------------");
});
}

@Test
public void groupingByConcurrent() {
// 按照性别分组
Map<Character, List<Student>> dataMap = userList
.stream()
.collect(Collectors.groupingByConcurrent(Student::getGender));

// 打印数据
dataMap.forEach((k,v) -> {
System.out.println(String.format("性别:%s", k));
v.forEach(System.out::println);
System.out.println("---------------------------");
});
}

@Test
public void partitioningBy() {
// 按照学分是否大于 4 分组
Map<Boolean, List<Student>> dataMap = userList
.stream()
.collect(Collectors.partitioningBy(stu -> stu.getCredit() > 4));

// 打印数据
dataMap.forEach((k,v) -> {
System.out.println(k ? "学分大于4:" : "学分小于4:");
v.forEach(System.out::println);
System.out.println("---------------------------");
});
}

@Test
public void toMap() {
// 将数据转成 Map
Map<String, Double> dataMap = userList
.stream()
.collect(Collectors.toMap(stu -> stu.getName(), stu -> stu.getCredit()));

// 打印数据
dataMap.forEach((k,v) -> {
System.out.println(String.format("姓名:%s, 学分:%s:", k, v));
});
}

@Test
public void summarizingDouble() {
// 数据统计
DoubleSummaryStatistics doubleSummaryStatistics = userList
.stream()
.distinct()
.collect(Collectors.summarizingDouble(Student::getCredit));

System.out.println(String.format("学生总数:%s", doubleSummaryStatistics.getCount()));
System.out.println(String.format("所有学生的平均学分:%s", doubleSummaryStatistics.getAverage()));
System.out.println(String.format("最高学分:%s", doubleSummaryStatistics.getMax()));
System.out.println(String.format("最低学分:%s", doubleSummaryStatistics.getMin()));
System.out.println(String.format("学分总和:%s", doubleSummaryStatistics.getSum()));

}

}
Buy me a cup of coffee