1 /*
2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23 package org.openjdk.tests.java.util.stream;
24
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.Comparator;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.IntSummaryStatistics;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Optional;
37 import java.util.Set;
38 import java.util.StringJoiner;
39 import java.util.TreeMap;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.ConcurrentSkipListMap;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.function.BiFunction;
44 import java.util.function.BinaryOperator;
45 import java.util.function.Function;
46 import java.util.function.Predicate;
47 import java.util.function.Supplier;
48 import java.util.stream.Collector;
49 import java.util.stream.Collectors;
50 import java.util.stream.LambdaTestHelpers;
51 import java.util.stream.OpTestCase;
52 import java.util.stream.Stream;
53 import java.util.stream.StreamOpFlagTestHelper;
54 import java.util.stream.StreamTestDataProvider;
55 import java.util.stream.TestData;
56
57 import org.testng.annotations.Test;
58
59 import static java.util.stream.Collectors.collectingAndThen;
60 import static java.util.stream.Collectors.flatMapping;
61 import static java.util.stream.Collectors.filtering;
62 import static java.util.stream.Collectors.groupingBy;
63 import static java.util.stream.Collectors.groupingByConcurrent;
64 import static java.util.stream.Collectors.mapping;
65 import static java.util.stream.Collectors.partitioningBy;
66 import static java.util.stream.Collectors.reducing;
67 import static java.util.stream.Collectors.toCollection;
68 import static java.util.stream.Collectors.toConcurrentMap;
69 import static java.util.stream.Collectors.toList;
70 import static java.util.stream.Collectors.toMap;
71 import static java.util.stream.Collectors.toSet;
72 import static java.util.stream.LambdaTestHelpers.assertContents;
73 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
74 import static java.util.stream.LambdaTestHelpers.mDoubler;
75
76 /*
77 * @test
78 * @bug 8071600 8144675
79 * @summary Test for collectors.
80 */
81 public class CollectorsTest extends OpTestCase {
82
83 private abstract static class CollectorAssertion<T, U> {
84 abstract void assertValue(U value,
85 Supplier<Stream<T>> source,
86 boolean ordered) throws ReflectiveOperationException;
87 }
88
89 static class MappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
90 private final Function<T, V> mapper;
91 private final CollectorAssertion<V, R> downstream;
92
93 MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream) {
94 this.mapper = mapper;
95 this.downstream = downstream;
96 }
97
98 @Override
99 void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
100 downstream.assertValue(value,
101 () -> source.get().map(mapper::apply),
102 ordered);
103 }
104 }
105
106 static class FlatMappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
107 private final Function<T, Stream<V>> mapper;
108 private final CollectorAssertion<V, R> downstream;
109
110 FlatMappingAssertion(Function<T, Stream<V>> mapper,
111 CollectorAssertion<V, R> downstream) {
112 this.mapper = mapper;
113 this.downstream = downstream;
114 }
115
116 @Override
117 void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
118 downstream.assertValue(value,
119 () -> source.get().flatMap(mapper::apply),
120 ordered);
121 }
122 }
123
124 static class FilteringAssertion<T, R> extends CollectorAssertion<T, R> {
125 private final Predicate<T> filter;
126 private final CollectorAssertion<T, R> downstream;
127
128 public FilteringAssertion(Predicate<T> filter, CollectorAssertion<T, R> downstream) {
129 this.filter = filter;
130 this.downstream = downstream;
131 }
132
133 @Override
134 void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
135 downstream.assertValue(value,
136 () -> source.get().filter(filter),
137 ordered);
138 }
139 }
140
141 static class GroupingByAssertion<T, K, V, M extends Map<K, ? extends V>> extends CollectorAssertion<T, M> {
142 private final Class<? extends Map> clazz;
143 private final Function<T, K> classifier;
144 private final CollectorAssertion<T,V> downstream;
145
146 GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz,
147 CollectorAssertion<T, V> downstream) {
148 this.clazz = clazz;
149 this.classifier = classifier;
150 this.downstream = downstream;
151 }
152
153 @Override
154 void assertValue(M map,
155 Supplier<Stream<T>> source,
156 boolean ordered) throws ReflectiveOperationException {
157 if (!clazz.isAssignableFrom(map.getClass()))
158 fail(String.format("Class mismatch in GroupingByAssertion: %s, %s", clazz, map.getClass()));
159 assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
160 for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
161 K key = entry.getKey();
162 downstream.assertValue(entry.getValue(),
163 () -> source.get().filter(e -> classifier.apply(e).equals(key)),
164 ordered);
165 }
166 }
167 }
168
169 static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends CollectorAssertion<T, M> {
170 private final Class<? extends Map> clazz;
171 private final Function<T, K> keyFn;
172 private final Function<T, V> valueFn;
173 private final BinaryOperator<V> mergeFn;
174
175 ToMapAssertion(Function<T, K> keyFn,
176 Function<T, V> valueFn,
177 BinaryOperator<V> mergeFn,
178 Class<? extends Map> clazz) {
179 this.clazz = clazz;
180 this.keyFn = keyFn;
181 this.valueFn = valueFn;
182 this.mergeFn = mergeFn;
183 }
184
185 @Override
186 void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
187 if (!clazz.isAssignableFrom(map.getClass()))
188 fail(String.format("Class mismatch in ToMapAssertion: %s, %s", clazz, map.getClass()));
189 Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
190 assertEquals(uniqueKeys, map.keySet());
191 source.get().forEach(t -> {
192 K key = keyFn.apply(t);
193 V v = source.get()
194 .filter(e -> key.equals(keyFn.apply(e)))
195 .map(valueFn)
196 .reduce(mergeFn)
197 .get();
198 assertEquals(map.get(key), v);
199 });
200 }
201 }
202
203 static class PartitioningByAssertion<T, D> extends CollectorAssertion<T, Map<Boolean,D>> {
204 private final Predicate<T> predicate;
205 private final CollectorAssertion<T,D> downstream;
206
207 PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream) {
208 this.predicate = predicate;
209 this.downstream = downstream;
210 }
211
212 @Override
213 void assertValue(Map<Boolean, D> map,
214 Supplier<Stream<T>> source,
215 boolean ordered) throws ReflectiveOperationException {
216 if (!Map.class.isAssignableFrom(map.getClass()))
217 fail(String.format("Class mismatch in PartitioningByAssertion: %s", map.getClass()));
218 assertEquals(2, map.size());
219 downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
220 downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
221 }
222 }
223
224 static class ToListAssertion<T> extends CollectorAssertion<T, List<T>> {
225 @Override
226 void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
227 throws ReflectiveOperationException {
228 if (!List.class.isAssignableFrom(value.getClass()))
229 fail(String.format("Class mismatch in ToListAssertion: %s", value.getClass()));
230 Stream<T> stream = source.get();
231 List<T> result = new ArrayList<>();
232 for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
233 result.add(it.next());
234 if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
235 assertContents(value, result);
236 else
237 assertContentsUnordered(value, result);
238 }
239 }
240
241 static class ToCollectionAssertion<T> extends CollectorAssertion<T, Collection<T>> {
242 private final Class<? extends Collection> clazz;
243 private final boolean targetOrdered;
244
245 ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
246 this.clazz = clazz;
247 this.targetOrdered = targetOrdered;
248 }
249
250 @Override
251 void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
252 throws ReflectiveOperationException {
253 if (!clazz.isAssignableFrom(value.getClass()))
254 fail(String.format("Class mismatch in ToCollectionAssertion: %s, %s", clazz, value.getClass()));
255 Stream<T> stream = source.get();
256 Collection<T> result = clazz.newInstance();
257 for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
258 result.add(it.next());
259 if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
260 assertContents(value, result);
261 else
262 assertContentsUnordered(value, result);
263 }
264 }
265
266 static class ReducingAssertion<T, U> extends CollectorAssertion<T, U> {
267 private final U identity;
268 private final Function<T, U> mapper;
269 private final BinaryOperator<U> reducer;
270
271 ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
272 this.identity = identity;
273 this.mapper = mapper;
274 this.reducer = reducer;
275 }
276
277 @Override
278 void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
279 throws ReflectiveOperationException {
280 Optional<U> reduced = source.get().map(mapper).reduce(reducer);
281 if (value == null)
282 assertTrue(!reduced.isPresent());
283 else if (!reduced.isPresent()) {
284 assertEquals(value, identity);
285 }
286 else {
287 assertEquals(value, reduced.get());
288 }
289 }
290 }
291
292 static class PairingAssertion<T, R1, R2, RR> extends CollectorAssertion<T, RR> {
293 private final Collector<T, ?, R1> c1;
294 private final Collector<T, ?, R2> c2;
295 private final BiFunction<? super R1, ? super R2, ? extends RR> finisher;
296
297 PairingAssertion(Collector<T, ?, R1> c1, Collector<T, ?, R2> c2,
298 BiFunction<? super R1, ? super R2, ? extends RR> finisher) {
299 this.c1 = c1;
300 this.c2 = c2;
301 this.finisher = finisher;
302 }
303
304 @Override
305 void assertValue(RR value, Supplier<Stream<T>> source, boolean ordered) {
306 R1 r1 = source.get().collect(c1);
307 R2 r2 = source.get().collect(c2);
308 RR expected = finisher.apply(r1, r2);
309 assertEquals(value, expected);
310 }
311 }
312
313 private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
314 return (act, exp, ord, par) -> {
315 if (par && (!ordered || !ord)) {
316 CollectorsTest.nestedMapEqualityAssertion(act, exp);
317 }
318 else {
319 LambdaTestHelpers.assertContentsEqual(act, exp);
320 }
321 };
322 }
323
324 private<T, M extends Map>
325 void exerciseMapCollection(TestData<T, Stream<T>> data,
326 Collector<T, ?, ? extends M> collector,
327 CollectorAssertion<T, M> assertion)
328 throws ReflectiveOperationException {
329 boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
330
331 M m = withData(data)
332 .terminal(s -> s.collect(collector))
333 .resultAsserter(mapTabulationAsserter(ordered))
334 .exercise();
335 assertion.assertValue(m, () -> data.stream(), ordered);
336
337 m = withData(data)
338 .terminal(s -> s.unordered().collect(collector))
339 .resultAsserter(mapTabulationAsserter(ordered))
340 .exercise();
341 assertion.assertValue(m, () -> data.stream(), false);
342 }
343
344 private static void nestedMapEqualityAssertion(Object o1, Object o2) {
345 if (o1 instanceof Map) {
346 Map m1 = (Map) o1;
347 Map m2 = (Map) o2;
348 assertContentsUnordered(m1.keySet(), m2.keySet());
349 for (Object k : m1.keySet())
350 nestedMapEqualityAssertion(m1.get(k), m2.get(k));
351 }
352 else if (o1 instanceof Collection) {
353 assertContentsUnordered(((Collection) o1), ((Collection) o2));
354 }
355 else
356 assertEquals(o1, o2);
357 }
358
359 private<T, R> void assertCollect(TestData.OfRef<T> data,
360 Collector<T, ?, R> collector,
361 Function<Stream<T>, R> streamReduction) {
362 R check = streamReduction.apply(data.stream());
363 withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
364 }
365
366 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
367 public void testReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
368 assertCollect(data, Collectors.reducing(0, Integer::sum),
369 s -> s.reduce(0, Integer::sum));
370 assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
371 s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
372 assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
373 s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
374
375 assertCollect(data, Collectors.reducing(Integer::sum),
376 s -> s.reduce(Integer::sum));
377 assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
378 s -> s.min(Integer::compare));
379 assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
380 s -> s.max(Integer::compare));
381
382 assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
383 s -> s.map(x -> x*2).reduce(0, Integer::sum));
384
385 assertCollect(data, Collectors.summingLong(x -> x * 2L),
386 s -> s.map(x -> x*2L).reduce(0L, Long::sum));
387 assertCollect(data, Collectors.summingInt(x -> x * 2),
388 s -> s.map(x -> x*2).reduce(0, Integer::sum));
389 assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
390 s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
391
392 assertCollect(data, Collectors.averagingInt(x -> x * 2),
393 s -> s.mapToInt(x -> x * 2).average().orElse(0));
394 assertCollect(data, Collectors.averagingLong(x -> x * 2),
395 s -> s.mapToLong(x -> x * 2).average().orElse(0));
396 assertCollect(data, Collectors.averagingDouble(x -> x * 2),
397 s -> s.mapToDouble(x -> x * 2).average().orElse(0));
398
399 // Test explicit Collector.of
400 Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
401 (a, b) -> {
402 a[0] += b * 2;
403 a[1]++;
404 },
405 (a, b) -> {
406 a[0] += b[0];
407 a[1] += b[1];
408 return a;
409 },
410 a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
411 assertCollect(data, avg2xint,
412 s -> s.mapToInt(x -> x * 2).average().orElse(0));
413 }
414
415 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
416 public void testJoining(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
417 withData(data)
418 .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
419 .expectedResult(join(data, ""))
420 .exercise();
421
422 Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
423 withData(data)
424 .terminal(s -> s.map(Object::toString).collect(likeJoining))
425 .expectedResult(join(data, ""))
426 .exercise();
427
428 withData(data)
429 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
430 .expectedResult(join(data, ","))
431 .exercise();
432
433 withData(data)
434 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
435 .expectedResult("[" + join(data, ",") + "]")
436 .exercise();
437
438 withData(data)
439 .terminal(s -> s.map(Object::toString)
440 .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
441 .toString())
442 .expectedResult(join(data, ""))
443 .exercise();
444
445 withData(data)
446 .terminal(s -> s.map(Object::toString)
447 .collect(() -> new StringJoiner(","),
448 (sj, cs) -> sj.add(cs),
449 (j1, j2) -> j1.merge(j2))
450 .toString())
451 .expectedResult(join(data, ","))
452 .exercise();
453
454 withData(data)
455 .terminal(s -> s.map(Object::toString)
456 .collect(() -> new StringJoiner(",", "[", "]"),
457 (sj, cs) -> sj.add(cs),
458 (j1, j2) -> j1.merge(j2))
459 .toString())
460 .expectedResult("[" + join(data, ",") + "]")
461 .exercise();
462 }
463
464 private<T> String join(TestData.OfRef<T> data, String delim) {
465 StringBuilder sb = new StringBuilder();
466 boolean first = true;
467 for (T i : data) {
468 if (!first)
469 sb.append(delim);
470 sb.append(i.toString());
471 first = false;
472 }
473 return sb.toString();
474 }
475
476 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
477 public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
478 Function<Integer, Integer> keyFn = i -> i * 2;
479 Function<Integer, Integer> valueFn = i -> i * 4;
480
481 List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
482 Set<Integer> dataAsSet = new HashSet<>(dataAsList);
483
484 BinaryOperator<Integer> sum = Integer::sum;
485 for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
486 (u, v) -> v,
487 sum)) {
488 try {
489 exerciseMapCollection(data, toMap(keyFn, valueFn),
490 new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
491 if (dataAsList.size() != dataAsSet.size())
492 fail("Expected ISE on input with duplicates");
493 }
494 catch (IllegalStateException e) {
495 if (dataAsList.size() == dataAsSet.size())
496 fail("Expected no ISE on input without duplicates");
497 }
498
499 exerciseMapCollection(data, toMap(keyFn, valueFn, op),
500 new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
501
502 exerciseMapCollection(data, toMap(keyFn, valueFn, op, TreeMap::new),
503 new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
504 }
505
506 // For concurrent maps, only use commutative merge functions
507 try {
508 exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn),
509 new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
510 if (dataAsList.size() != dataAsSet.size())
511 fail("Expected ISE on input with duplicates");
512 }
513 catch (IllegalStateException e) {
514 if (dataAsList.size() == dataAsSet.size())
515 fail("Expected no ISE on input without duplicates");
516 }
517
518 exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum),
519 new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
520
521 exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
522 new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
523 }
524
525 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
526 public void testSimpleGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
527 Function<Integer, Integer> classifier = i -> i % 3;
528
529 // Single-level groupBy
530 exerciseMapCollection(data, groupingBy(classifier),
531 new GroupingByAssertion<>(classifier, HashMap.class,
532 new ToListAssertion<>()));
533 exerciseMapCollection(data, groupingByConcurrent(classifier),
534 new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
535 new ToListAssertion<>()));
536
537 // With explicit constructors
538 exerciseMapCollection(data,
539 groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
540 new GroupingByAssertion<>(classifier, TreeMap.class,
541 new ToCollectionAssertion<Integer>(HashSet.class, false)));
542 exerciseMapCollection(data,
543 groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
544 toCollection(HashSet::new)),
545 new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
546 new ToCollectionAssertion<Integer>(HashSet.class, false)));
547 }
548
549 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
550 public void testGroupingByWithMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
551 Function<Integer, Integer> classifier = i -> i % 3;
552 Function<Integer, Integer> mapper = i -> i * 2;
553
554 exerciseMapCollection(data,
555 groupingBy(classifier, mapping(mapper, toList())),
556 new GroupingByAssertion<>(classifier, HashMap.class,
557 new MappingAssertion<>(mapper,
558 new ToListAssertion<>())));
559 }
560
561 @Test(groups = { "serialization-hostile" })
562 public void testFlatMappingClose() {
563 Function<Integer, Integer> classifier = i -> i;
564 AtomicInteger ai = new AtomicInteger();
565 Function<Integer, Stream<Integer>> flatMapper = i -> Stream.of(i, i).onClose(ai::getAndIncrement);
566 Map<Integer, List<Integer>> m = Stream.of(1, 2).collect(groupingBy(classifier, flatMapping(flatMapper, toList())));
567 assertEquals(m.size(), ai.get());
568 }
569
570 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
571 public void testGroupingByWithFlatMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
572 Function<Integer, Integer> classifier = i -> i % 3;
573 Function<Integer, Stream<Integer>> flatMapperByNull = i -> null;
574 Function<Integer, Stream<Integer>> flatMapperBy0 = i -> Stream.empty();
575 Function<Integer, Stream<Integer>> flatMapperBy2 = i -> Stream.of(i, i);
576
577 exerciseMapCollection(data,
578 groupingBy(classifier, flatMapping(flatMapperByNull, toList())),
579 new GroupingByAssertion<>(classifier, HashMap.class,
580 new FlatMappingAssertion<>(flatMapperBy0,
581 new ToListAssertion<>())));
582 exerciseMapCollection(data,
583 groupingBy(classifier, flatMapping(flatMapperBy0, toList())),
584 new GroupingByAssertion<>(classifier, HashMap.class,
585 new FlatMappingAssertion<>(flatMapperBy0,
586 new ToListAssertion<>())));
587 exerciseMapCollection(data,
588 groupingBy(classifier, flatMapping(flatMapperBy2, toList())),
589 new GroupingByAssertion<>(classifier, HashMap.class,
590 new FlatMappingAssertion<>(flatMapperBy2,
591 new ToListAssertion<>())));
592 }
593
594 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
595 public void testGroupingByWithFiltering(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
596 Function<Integer, Integer> classifier = i -> i % 3;
597 Predicate<Integer> filteringByMod2 = i -> i % 2 == 0;
598 Predicate<Integer> filteringByUnder100 = i -> i % 2 < 100;
599 Predicate<Integer> filteringByTrue = i -> true;
600 Predicate<Integer> filteringByFalse = i -> false;
601
602 exerciseMapCollection(data,
603 groupingBy(classifier, filtering(filteringByMod2, toList())),
604 new GroupingByAssertion<>(classifier, HashMap.class,
605 new FilteringAssertion<>(filteringByMod2,
606 new ToListAssertion<>())));
607 exerciseMapCollection(data,
608 groupingBy(classifier, filtering(filteringByUnder100, toList())),
609 new GroupingByAssertion<>(classifier, HashMap.class,
610 new FilteringAssertion<>(filteringByUnder100,
611 new ToListAssertion<>())));
612 exerciseMapCollection(data,
613 groupingBy(classifier, filtering(filteringByTrue, toList())),
614 new GroupingByAssertion<>(classifier, HashMap.class,
615 new FilteringAssertion<>(filteringByTrue,
616 new ToListAssertion<>())));
617 exerciseMapCollection(data,
618 groupingBy(classifier, filtering(filteringByFalse, toList())),
619 new GroupingByAssertion<>(classifier, HashMap.class,
620 new FilteringAssertion<>(filteringByFalse,
621 new ToListAssertion<>())));
622 }
623
624 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
625 public void testTwoLevelGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
626 Function<Integer, Integer> classifier = i -> i % 6;
627 Function<Integer, Integer> classifier2 = i -> i % 23;
628
629 // Two-level groupBy
630 exerciseMapCollection(data,
631 groupingBy(classifier, groupingBy(classifier2)),
632 new GroupingByAssertion<>(classifier, HashMap.class,
633 new GroupingByAssertion<>(classifier2, HashMap.class,
634 new ToListAssertion<>())));
635 // with concurrent as upstream
636 exerciseMapCollection(data,
637 groupingByConcurrent(classifier, groupingBy(classifier2)),
638 new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
639 new GroupingByAssertion<>(classifier2, HashMap.class,
640 new ToListAssertion<>())));
641 // with concurrent as downstream
642 exerciseMapCollection(data,
643 groupingBy(classifier, groupingByConcurrent(classifier2)),
644 new GroupingByAssertion<>(classifier, HashMap.class,
645 new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
646 new ToListAssertion<>())));
647 // with concurrent as upstream and downstream
648 exerciseMapCollection(data,
649 groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
650 new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
651 new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
652 new ToListAssertion<>())));
653
654 // With explicit constructors
655 exerciseMapCollection(data,
656 groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
657 new GroupingByAssertion<>(classifier, TreeMap.class,
658 new GroupingByAssertion<>(classifier2, TreeMap.class,
659 new ToCollectionAssertion<Integer>(HashSet.class, false))));
660 // with concurrent as upstream
661 exerciseMapCollection(data,
662 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
663 new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
664 new GroupingByAssertion<>(classifier2, TreeMap.class,
665 new ToListAssertion<>())));
666 // with concurrent as downstream
667 exerciseMapCollection(data,
668 groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
669 new GroupingByAssertion<>(classifier, TreeMap.class,
670 new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
671 new ToListAssertion<>())));
672 // with concurrent as upstream and downstream
673 exerciseMapCollection(data,
674 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
675 new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
676 new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
677 new ToListAssertion<>())));
678 }
679
680 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
681 public void testGroupubgByWithReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
682 Function<Integer, Integer> classifier = i -> i % 3;
683
684 // Single-level simple reduce
685 exerciseMapCollection(data,
686 groupingBy(classifier, reducing(0, Integer::sum)),
687 new GroupingByAssertion<>(classifier, HashMap.class,
688 new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
689 // with concurrent
690 exerciseMapCollection(data,
691 groupingByConcurrent(classifier, reducing(0, Integer::sum)),
692 new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
693 new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
694
695 // With explicit constructors
696 exerciseMapCollection(data,
697 groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
698 new GroupingByAssertion<>(classifier, TreeMap.class,
699 new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
700 // with concurrent
701 exerciseMapCollection(data,
702 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
703 new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
704 new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
705
706 // Single-level map-reduce
707 exerciseMapCollection(data,
708 groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
709 new GroupingByAssertion<>(classifier, HashMap.class,
710 new ReducingAssertion<>(0, mDoubler, Integer::sum)));
711 // with concurrent
712 exerciseMapCollection(data,
713 groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
714 new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
715 new ReducingAssertion<>(0, mDoubler, Integer::sum)));
716
717 // With explicit constructors
718 exerciseMapCollection(data,
719 groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
720 new GroupingByAssertion<>(classifier, TreeMap.class,
721 new ReducingAssertion<>(0, mDoubler, Integer::sum)));
722 // with concurrent
723 exerciseMapCollection(data,
724 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
725 new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
726 new ReducingAssertion<>(0, mDoubler, Integer::sum)));
727 }
728
729 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
730 public void testSimplePartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
731 Predicate<Integer> classifier = i -> i % 3 == 0;
732
733 // Single-level partition to downstream List
734 exerciseMapCollection(data,
735 partitioningBy(classifier),
736 new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
737 exerciseMapCollection(data,
738 partitioningBy(classifier, toList()),
739 new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
740 }
741
742 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
743 public void testTwoLevelPartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
744 Predicate<Integer> classifier = i -> i % 3 == 0;
745 Predicate<Integer> classifier2 = i -> i % 7 == 0;
746
747 // Two level partition
748 exerciseMapCollection(data,
749 partitioningBy(classifier, partitioningBy(classifier2)),
750 new PartitioningByAssertion<>(classifier,
751 new PartitioningByAssertion(classifier2, new ToListAssertion<>())));
752
753 // Two level partition with reduce
754 exerciseMapCollection(data,
755 partitioningBy(classifier, reducing(0, Integer::sum)),
756 new PartitioningByAssertion<>(classifier,
757 new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
758 }
759
760 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
761 public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
762 List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
763 List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
764 assertEquals(asList, asImmutableList);
765 try {
766 asImmutableList.add(0);
767 fail("Expecting immutable result");
768 }
769 catch (UnsupportedOperationException ignored) { }
770 }
771
772 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
773 public void testPairing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
774 Collector<Integer, ?, Long> summing = Collectors.summingLong(Integer::valueOf);
775 Collector<Integer, ?, Long> counting = Collectors.counting();
776 Collector<Integer, ?, Integer> min = collectingAndThen(Collectors.<Integer>minBy(Comparator.naturalOrder()),
777 opt -> opt.orElse(Integer.MAX_VALUE));
778 Collector<Integer, ?, Integer> max = collectingAndThen(Collectors.<Integer>maxBy(Comparator.naturalOrder()),
779 opt -> opt.orElse(Integer.MIN_VALUE));
780 Collector<Integer, ?, String> joining = mapping(String::valueOf, Collectors.joining(", ", "[", "]"));
781
782 Collector<Integer, ?, Map.Entry<Long, Long>> sumAndCount = Collectors.pairing(summing, counting, Map::entry);
783 Collector<Integer, ?, Map.Entry<Integer, Integer>> minAndMax = Collectors.pairing(min, max, Map::entry);
784 Collector<Integer, ?, Double> averaging = Collectors.pairing(summing, counting,
785 (sum, count) -> ((double)sum) / count);
786 Collector<Integer, ?, String> summaryStatistics = Collectors.pairing(sumAndCount, minAndMax,
787 (sumCountEntry, minMaxEntry) -> new IntSummaryStatistics(
788 sumCountEntry.getValue(), minMaxEntry.getKey(),
789 minMaxEntry.getValue(), sumCountEntry.getKey()).toString());
790 Collector<Integer, ?, String> countAndContent = Collectors.pairing(counting, joining,
791 (count, content) -> count+": "+content);
792
793 assertCollect(data, sumAndCount, stream -> {
794 List<Integer> list = stream.collect(toList());
795 return Map.entry(list.stream().mapToLong(Integer::intValue).sum(), (long) list.size());
796 });
797 assertCollect(data, averaging, stream -> stream.mapToInt(Integer::intValue).average().orElse(Double.NaN));
798 assertCollect(data, summaryStatistics,
799 stream -> stream.mapToInt(Integer::intValue).summaryStatistics().toString());
800 assertCollect(data, countAndContent, stream -> {
801 List<Integer> list = stream.collect(toList());
802 return list.size()+": "+list;
803 });
804
805 Function<Integer, Integer> classifier = i -> i % 3;
806 exerciseMapCollection(data, groupingBy(classifier, sumAndCount),
807 new GroupingByAssertion<>(classifier, Map.class,
808 new PairingAssertion<>(summing, counting, Map::entry)));
809 }
810 }