1 /* 2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 package org.openjdk.tests.java.util.stream; 24 25 import java.util.ArrayList; 26 import java.util.Arrays; 27 import java.util.Collection; 28 import java.util.Collections; 29 import java.util.Comparator; 30 import java.util.HashMap; 31 import java.util.HashSet; 32 import java.util.IntSummaryStatistics; 33 import java.util.Iterator; 34 import java.util.List; 35 import java.util.Map; 36 import java.util.Optional; 37 import java.util.Set; 38 import java.util.StringJoiner; 39 import java.util.TreeMap; 40 import java.util.concurrent.ConcurrentHashMap; 41 import java.util.concurrent.ConcurrentSkipListMap; 42 import java.util.concurrent.atomic.AtomicInteger; 43 import java.util.function.BiFunction; 44 import java.util.function.BinaryOperator; 45 import java.util.function.Function; 46 import java.util.function.Predicate; 47 import java.util.function.Supplier; 48 import java.util.stream.Collector; 49 import java.util.stream.Collectors; 50 import java.util.stream.LambdaTestHelpers; 51 import java.util.stream.OpTestCase; 52 import java.util.stream.Stream; 53 import java.util.stream.StreamOpFlagTestHelper; 54 import java.util.stream.StreamTestDataProvider; 55 import java.util.stream.TestData; 56 57 import org.testng.annotations.Test; 58 59 import static java.util.stream.Collectors.collectingAndThen; 60 import static java.util.stream.Collectors.flatMapping; 61 import static java.util.stream.Collectors.filtering; 62 import static java.util.stream.Collectors.groupingBy; 63 import static java.util.stream.Collectors.groupingByConcurrent; 64 import static java.util.stream.Collectors.mapping; 65 import static java.util.stream.Collectors.partitioningBy; 66 import static java.util.stream.Collectors.reducing; 67 import static java.util.stream.Collectors.toCollection; 68 import static java.util.stream.Collectors.toConcurrentMap; 69 import static java.util.stream.Collectors.toList; 70 import static java.util.stream.Collectors.toMap; 71 import static java.util.stream.Collectors.toSet; 72 import static java.util.stream.LambdaTestHelpers.assertContents; 73 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered; 74 import static java.util.stream.LambdaTestHelpers.mDoubler; 75 76 /* 77 * @test 78 * @bug 8071600 8144675 79 * @summary Test for collectors. 80 */ 81 public class CollectorsTest extends OpTestCase { 82 83 private abstract static class CollectorAssertion<T, U> { 84 abstract void assertValue(U value, 85 Supplier<Stream<T>> source, 86 boolean ordered) throws ReflectiveOperationException; 87 } 88 89 static class MappingAssertion<T, V, R> extends CollectorAssertion<T, R> { 90 private final Function<T, V> mapper; 91 private final CollectorAssertion<V, R> downstream; 92 93 MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream) { 94 this.mapper = mapper; 95 this.downstream = downstream; 96 } 97 98 @Override 99 void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException { 100 downstream.assertValue(value, 101 () -> source.get().map(mapper::apply), 102 ordered); 103 } 104 } 105 106 static class FlatMappingAssertion<T, V, R> extends CollectorAssertion<T, R> { 107 private final Function<T, Stream<V>> mapper; 108 private final CollectorAssertion<V, R> downstream; 109 110 FlatMappingAssertion(Function<T, Stream<V>> mapper, 111 CollectorAssertion<V, R> downstream) { 112 this.mapper = mapper; 113 this.downstream = downstream; 114 } 115 116 @Override 117 void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException { 118 downstream.assertValue(value, 119 () -> source.get().flatMap(mapper::apply), 120 ordered); 121 } 122 } 123 124 static class FilteringAssertion<T, R> extends CollectorAssertion<T, R> { 125 private final Predicate<T> filter; 126 private final CollectorAssertion<T, R> downstream; 127 128 public FilteringAssertion(Predicate<T> filter, CollectorAssertion<T, R> downstream) { 129 this.filter = filter; 130 this.downstream = downstream; 131 } 132 133 @Override 134 void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException { 135 downstream.assertValue(value, 136 () -> source.get().filter(filter), 137 ordered); 138 } 139 } 140 141 static class GroupingByAssertion<T, K, V, M extends Map<K, ? extends V>> extends CollectorAssertion<T, M> { 142 private final Class<? extends Map> clazz; 143 private final Function<T, K> classifier; 144 private final CollectorAssertion<T,V> downstream; 145 146 GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz, 147 CollectorAssertion<T, V> downstream) { 148 this.clazz = clazz; 149 this.classifier = classifier; 150 this.downstream = downstream; 151 } 152 153 @Override 154 void assertValue(M map, 155 Supplier<Stream<T>> source, 156 boolean ordered) throws ReflectiveOperationException { 157 if (!clazz.isAssignableFrom(map.getClass())) 158 fail(String.format("Class mismatch in GroupingByAssertion: %s, %s", clazz, map.getClass())); 159 assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet())); 160 for (Map.Entry<K, ? extends V> entry : map.entrySet()) { 161 K key = entry.getKey(); 162 downstream.assertValue(entry.getValue(), 163 () -> source.get().filter(e -> classifier.apply(e).equals(key)), 164 ordered); 165 } 166 } 167 } 168 169 static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends CollectorAssertion<T, M> { 170 private final Class<? extends Map> clazz; 171 private final Function<T, K> keyFn; 172 private final Function<T, V> valueFn; 173 private final BinaryOperator<V> mergeFn; 174 175 ToMapAssertion(Function<T, K> keyFn, 176 Function<T, V> valueFn, 177 BinaryOperator<V> mergeFn, 178 Class<? extends Map> clazz) { 179 this.clazz = clazz; 180 this.keyFn = keyFn; 181 this.valueFn = valueFn; 182 this.mergeFn = mergeFn; 183 } 184 185 @Override 186 void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException { 187 if (!clazz.isAssignableFrom(map.getClass())) 188 fail(String.format("Class mismatch in ToMapAssertion: %s, %s", clazz, map.getClass())); 189 Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet()); 190 assertEquals(uniqueKeys, map.keySet()); 191 source.get().forEach(t -> { 192 K key = keyFn.apply(t); 193 V v = source.get() 194 .filter(e -> key.equals(keyFn.apply(e))) 195 .map(valueFn) 196 .reduce(mergeFn) 197 .get(); 198 assertEquals(map.get(key), v); 199 }); 200 } 201 } 202 203 static class PartitioningByAssertion<T, D> extends CollectorAssertion<T, Map<Boolean,D>> { 204 private final Predicate<T> predicate; 205 private final CollectorAssertion<T,D> downstream; 206 207 PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream) { 208 this.predicate = predicate; 209 this.downstream = downstream; 210 } 211 212 @Override 213 void assertValue(Map<Boolean, D> map, 214 Supplier<Stream<T>> source, 215 boolean ordered) throws ReflectiveOperationException { 216 if (!Map.class.isAssignableFrom(map.getClass())) 217 fail(String.format("Class mismatch in PartitioningByAssertion: %s", map.getClass())); 218 assertEquals(2, map.size()); 219 downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered); 220 downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered); 221 } 222 } 223 224 static class ToListAssertion<T> extends CollectorAssertion<T, List<T>> { 225 @Override 226 void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered) 227 throws ReflectiveOperationException { 228 if (!List.class.isAssignableFrom(value.getClass())) 229 fail(String.format("Class mismatch in ToListAssertion: %s", value.getClass())); 230 Stream<T> stream = source.get(); 231 List<T> result = new ArrayList<>(); 232 for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add 233 result.add(it.next()); 234 if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered) 235 assertContents(value, result); 236 else 237 assertContentsUnordered(value, result); 238 } 239 } 240 241 static class ToCollectionAssertion<T> extends CollectorAssertion<T, Collection<T>> { 242 private final Class<? extends Collection> clazz; 243 private final boolean targetOrdered; 244 245 ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) { 246 this.clazz = clazz; 247 this.targetOrdered = targetOrdered; 248 } 249 250 @Override 251 void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered) 252 throws ReflectiveOperationException { 253 if (!clazz.isAssignableFrom(value.getClass())) 254 fail(String.format("Class mismatch in ToCollectionAssertion: %s, %s", clazz, value.getClass())); 255 Stream<T> stream = source.get(); 256 Collection<T> result = clazz.newInstance(); 257 for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add 258 result.add(it.next()); 259 if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered) 260 assertContents(value, result); 261 else 262 assertContentsUnordered(value, result); 263 } 264 } 265 266 static class ReducingAssertion<T, U> extends CollectorAssertion<T, U> { 267 private final U identity; 268 private final Function<T, U> mapper; 269 private final BinaryOperator<U> reducer; 270 271 ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) { 272 this.identity = identity; 273 this.mapper = mapper; 274 this.reducer = reducer; 275 } 276 277 @Override 278 void assertValue(U value, Supplier<Stream<T>> source, boolean ordered) 279 throws ReflectiveOperationException { 280 Optional<U> reduced = source.get().map(mapper).reduce(reducer); 281 if (value == null) 282 assertTrue(!reduced.isPresent()); 283 else if (!reduced.isPresent()) { 284 assertEquals(value, identity); 285 } 286 else { 287 assertEquals(value, reduced.get()); 288 } 289 } 290 } 291 292 static class PairingAssertion<T, R1, R2, RR> extends CollectorAssertion<T, RR> { 293 private final Collector<T, ?, R1> c1; 294 private final Collector<T, ?, R2> c2; 295 private final BiFunction<? super R1, ? super R2, ? extends RR> finisher; 296 297 PairingAssertion(Collector<T, ?, R1> c1, Collector<T, ?, R2> c2, 298 BiFunction<? super R1, ? super R2, ? extends RR> finisher) { 299 this.c1 = c1; 300 this.c2 = c2; 301 this.finisher = finisher; 302 } 303 304 @Override 305 void assertValue(RR value, Supplier<Stream<T>> source, boolean ordered) { 306 R1 r1 = source.get().collect(c1); 307 R2 r2 = source.get().collect(c2); 308 RR expected = finisher.apply(r1, r2); 309 assertEquals(value, expected); 310 } 311 } 312 313 private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) { 314 return (act, exp, ord, par) -> { 315 if (par && (!ordered || !ord)) { 316 CollectorsTest.nestedMapEqualityAssertion(act, exp); 317 } 318 else { 319 LambdaTestHelpers.assertContentsEqual(act, exp); 320 } 321 }; 322 } 323 324 private<T, M extends Map> 325 void exerciseMapCollection(TestData<T, Stream<T>> data, 326 Collector<T, ?, ? extends M> collector, 327 CollectorAssertion<T, M> assertion) 328 throws ReflectiveOperationException { 329 boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED); 330 331 M m = withData(data) 332 .terminal(s -> s.collect(collector)) 333 .resultAsserter(mapTabulationAsserter(ordered)) 334 .exercise(); 335 assertion.assertValue(m, () -> data.stream(), ordered); 336 337 m = withData(data) 338 .terminal(s -> s.unordered().collect(collector)) 339 .resultAsserter(mapTabulationAsserter(ordered)) 340 .exercise(); 341 assertion.assertValue(m, () -> data.stream(), false); 342 } 343 344 private static void nestedMapEqualityAssertion(Object o1, Object o2) { 345 if (o1 instanceof Map) { 346 Map m1 = (Map) o1; 347 Map m2 = (Map) o2; 348 assertContentsUnordered(m1.keySet(), m2.keySet()); 349 for (Object k : m1.keySet()) 350 nestedMapEqualityAssertion(m1.get(k), m2.get(k)); 351 } 352 else if (o1 instanceof Collection) { 353 assertContentsUnordered(((Collection) o1), ((Collection) o2)); 354 } 355 else 356 assertEquals(o1, o2); 357 } 358 359 private<T, R> void assertCollect(TestData.OfRef<T> data, 360 Collector<T, ?, R> collector, 361 Function<Stream<T>, R> streamReduction) { 362 R check = streamReduction.apply(data.stream()); 363 withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise(); 364 } 365 366 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 367 public void testReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 368 assertCollect(data, Collectors.reducing(0, Integer::sum), 369 s -> s.reduce(0, Integer::sum)); 370 assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min), 371 s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE)); 372 assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max), 373 s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE)); 374 375 assertCollect(data, Collectors.reducing(Integer::sum), 376 s -> s.reduce(Integer::sum)); 377 assertCollect(data, Collectors.minBy(Comparator.naturalOrder()), 378 s -> s.min(Integer::compare)); 379 assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()), 380 s -> s.max(Integer::compare)); 381 382 assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum), 383 s -> s.map(x -> x*2).reduce(0, Integer::sum)); 384 385 assertCollect(data, Collectors.summingLong(x -> x * 2L), 386 s -> s.map(x -> x*2L).reduce(0L, Long::sum)); 387 assertCollect(data, Collectors.summingInt(x -> x * 2), 388 s -> s.map(x -> x*2).reduce(0, Integer::sum)); 389 assertCollect(data, Collectors.summingDouble(x -> x * 2.0d), 390 s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum)); 391 392 assertCollect(data, Collectors.averagingInt(x -> x * 2), 393 s -> s.mapToInt(x -> x * 2).average().orElse(0)); 394 assertCollect(data, Collectors.averagingLong(x -> x * 2), 395 s -> s.mapToLong(x -> x * 2).average().orElse(0)); 396 assertCollect(data, Collectors.averagingDouble(x -> x * 2), 397 s -> s.mapToDouble(x -> x * 2).average().orElse(0)); 398 399 // Test explicit Collector.of 400 Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2], 401 (a, b) -> { 402 a[0] += b * 2; 403 a[1]++; 404 }, 405 (a, b) -> { 406 a[0] += b[0]; 407 a[1] += b[1]; 408 return a; 409 }, 410 a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]); 411 assertCollect(data, avg2xint, 412 s -> s.mapToInt(x -> x * 2).average().orElse(0)); 413 } 414 415 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 416 public void testJoining(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 417 withData(data) 418 .terminal(s -> s.map(Object::toString).collect(Collectors.joining())) 419 .expectedResult(join(data, "")) 420 .exercise(); 421 422 Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString); 423 withData(data) 424 .terminal(s -> s.map(Object::toString).collect(likeJoining)) 425 .expectedResult(join(data, "")) 426 .exercise(); 427 428 withData(data) 429 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(","))) 430 .expectedResult(join(data, ",")) 431 .exercise(); 432 433 withData(data) 434 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]"))) 435 .expectedResult("[" + join(data, ",") + "]") 436 .exercise(); 437 438 withData(data) 439 .terminal(s -> s.map(Object::toString) 440 .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append) 441 .toString()) 442 .expectedResult(join(data, "")) 443 .exercise(); 444 445 withData(data) 446 .terminal(s -> s.map(Object::toString) 447 .collect(() -> new StringJoiner(","), 448 (sj, cs) -> sj.add(cs), 449 (j1, j2) -> j1.merge(j2)) 450 .toString()) 451 .expectedResult(join(data, ",")) 452 .exercise(); 453 454 withData(data) 455 .terminal(s -> s.map(Object::toString) 456 .collect(() -> new StringJoiner(",", "[", "]"), 457 (sj, cs) -> sj.add(cs), 458 (j1, j2) -> j1.merge(j2)) 459 .toString()) 460 .expectedResult("[" + join(data, ",") + "]") 461 .exercise(); 462 } 463 464 private<T> String join(TestData.OfRef<T> data, String delim) { 465 StringBuilder sb = new StringBuilder(); 466 boolean first = true; 467 for (T i : data) { 468 if (!first) 469 sb.append(delim); 470 sb.append(i.toString()); 471 first = false; 472 } 473 return sb.toString(); 474 } 475 476 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 477 public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 478 Function<Integer, Integer> keyFn = i -> i * 2; 479 Function<Integer, Integer> valueFn = i -> i * 4; 480 481 List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new)); 482 Set<Integer> dataAsSet = new HashSet<>(dataAsList); 483 484 BinaryOperator<Integer> sum = Integer::sum; 485 for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u, 486 (u, v) -> v, 487 sum)) { 488 try { 489 exerciseMapCollection(data, toMap(keyFn, valueFn), 490 new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class)); 491 if (dataAsList.size() != dataAsSet.size()) 492 fail("Expected ISE on input with duplicates"); 493 } 494 catch (IllegalStateException e) { 495 if (dataAsList.size() == dataAsSet.size()) 496 fail("Expected no ISE on input without duplicates"); 497 } 498 499 exerciseMapCollection(data, toMap(keyFn, valueFn, op), 500 new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class)); 501 502 exerciseMapCollection(data, toMap(keyFn, valueFn, op, TreeMap::new), 503 new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class)); 504 } 505 506 // For concurrent maps, only use commutative merge functions 507 try { 508 exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn), 509 new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class)); 510 if (dataAsList.size() != dataAsSet.size()) 511 fail("Expected ISE on input with duplicates"); 512 } 513 catch (IllegalStateException e) { 514 if (dataAsList.size() == dataAsSet.size()) 515 fail("Expected no ISE on input without duplicates"); 516 } 517 518 exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum), 519 new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class)); 520 521 exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new), 522 new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class)); 523 } 524 525 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 526 public void testSimpleGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 527 Function<Integer, Integer> classifier = i -> i % 3; 528 529 // Single-level groupBy 530 exerciseMapCollection(data, groupingBy(classifier), 531 new GroupingByAssertion<>(classifier, HashMap.class, 532 new ToListAssertion<>())); 533 exerciseMapCollection(data, groupingByConcurrent(classifier), 534 new GroupingByAssertion<>(classifier, ConcurrentHashMap.class, 535 new ToListAssertion<>())); 536 537 // With explicit constructors 538 exerciseMapCollection(data, 539 groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)), 540 new GroupingByAssertion<>(classifier, TreeMap.class, 541 new ToCollectionAssertion<Integer>(HashSet.class, false))); 542 exerciseMapCollection(data, 543 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, 544 toCollection(HashSet::new)), 545 new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class, 546 new ToCollectionAssertion<Integer>(HashSet.class, false))); 547 } 548 549 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 550 public void testGroupingByWithMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 551 Function<Integer, Integer> classifier = i -> i % 3; 552 Function<Integer, Integer> mapper = i -> i * 2; 553 554 exerciseMapCollection(data, 555 groupingBy(classifier, mapping(mapper, toList())), 556 new GroupingByAssertion<>(classifier, HashMap.class, 557 new MappingAssertion<>(mapper, 558 new ToListAssertion<>()))); 559 } 560 561 @Test(groups = { "serialization-hostile" }) 562 public void testFlatMappingClose() { 563 Function<Integer, Integer> classifier = i -> i; 564 AtomicInteger ai = new AtomicInteger(); 565 Function<Integer, Stream<Integer>> flatMapper = i -> Stream.of(i, i).onClose(ai::getAndIncrement); 566 Map<Integer, List<Integer>> m = Stream.of(1, 2).collect(groupingBy(classifier, flatMapping(flatMapper, toList()))); 567 assertEquals(m.size(), ai.get()); 568 } 569 570 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 571 public void testGroupingByWithFlatMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 572 Function<Integer, Integer> classifier = i -> i % 3; 573 Function<Integer, Stream<Integer>> flatMapperByNull = i -> null; 574 Function<Integer, Stream<Integer>> flatMapperBy0 = i -> Stream.empty(); 575 Function<Integer, Stream<Integer>> flatMapperBy2 = i -> Stream.of(i, i); 576 577 exerciseMapCollection(data, 578 groupingBy(classifier, flatMapping(flatMapperByNull, toList())), 579 new GroupingByAssertion<>(classifier, HashMap.class, 580 new FlatMappingAssertion<>(flatMapperBy0, 581 new ToListAssertion<>()))); 582 exerciseMapCollection(data, 583 groupingBy(classifier, flatMapping(flatMapperBy0, toList())), 584 new GroupingByAssertion<>(classifier, HashMap.class, 585 new FlatMappingAssertion<>(flatMapperBy0, 586 new ToListAssertion<>()))); 587 exerciseMapCollection(data, 588 groupingBy(classifier, flatMapping(flatMapperBy2, toList())), 589 new GroupingByAssertion<>(classifier, HashMap.class, 590 new FlatMappingAssertion<>(flatMapperBy2, 591 new ToListAssertion<>()))); 592 } 593 594 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 595 public void testGroupingByWithFiltering(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 596 Function<Integer, Integer> classifier = i -> i % 3; 597 Predicate<Integer> filteringByMod2 = i -> i % 2 == 0; 598 Predicate<Integer> filteringByUnder100 = i -> i % 2 < 100; 599 Predicate<Integer> filteringByTrue = i -> true; 600 Predicate<Integer> filteringByFalse = i -> false; 601 602 exerciseMapCollection(data, 603 groupingBy(classifier, filtering(filteringByMod2, toList())), 604 new GroupingByAssertion<>(classifier, HashMap.class, 605 new FilteringAssertion<>(filteringByMod2, 606 new ToListAssertion<>()))); 607 exerciseMapCollection(data, 608 groupingBy(classifier, filtering(filteringByUnder100, toList())), 609 new GroupingByAssertion<>(classifier, HashMap.class, 610 new FilteringAssertion<>(filteringByUnder100, 611 new ToListAssertion<>()))); 612 exerciseMapCollection(data, 613 groupingBy(classifier, filtering(filteringByTrue, toList())), 614 new GroupingByAssertion<>(classifier, HashMap.class, 615 new FilteringAssertion<>(filteringByTrue, 616 new ToListAssertion<>()))); 617 exerciseMapCollection(data, 618 groupingBy(classifier, filtering(filteringByFalse, toList())), 619 new GroupingByAssertion<>(classifier, HashMap.class, 620 new FilteringAssertion<>(filteringByFalse, 621 new ToListAssertion<>()))); 622 } 623 624 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 625 public void testTwoLevelGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 626 Function<Integer, Integer> classifier = i -> i % 6; 627 Function<Integer, Integer> classifier2 = i -> i % 23; 628 629 // Two-level groupBy 630 exerciseMapCollection(data, 631 groupingBy(classifier, groupingBy(classifier2)), 632 new GroupingByAssertion<>(classifier, HashMap.class, 633 new GroupingByAssertion<>(classifier2, HashMap.class, 634 new ToListAssertion<>()))); 635 // with concurrent as upstream 636 exerciseMapCollection(data, 637 groupingByConcurrent(classifier, groupingBy(classifier2)), 638 new GroupingByAssertion<>(classifier, ConcurrentHashMap.class, 639 new GroupingByAssertion<>(classifier2, HashMap.class, 640 new ToListAssertion<>()))); 641 // with concurrent as downstream 642 exerciseMapCollection(data, 643 groupingBy(classifier, groupingByConcurrent(classifier2)), 644 new GroupingByAssertion<>(classifier, HashMap.class, 645 new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class, 646 new ToListAssertion<>()))); 647 // with concurrent as upstream and downstream 648 exerciseMapCollection(data, 649 groupingByConcurrent(classifier, groupingByConcurrent(classifier2)), 650 new GroupingByAssertion<>(classifier, ConcurrentHashMap.class, 651 new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class, 652 new ToListAssertion<>()))); 653 654 // With explicit constructors 655 exerciseMapCollection(data, 656 groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))), 657 new GroupingByAssertion<>(classifier, TreeMap.class, 658 new GroupingByAssertion<>(classifier2, TreeMap.class, 659 new ToCollectionAssertion<Integer>(HashSet.class, false)))); 660 // with concurrent as upstream 661 exerciseMapCollection(data, 662 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())), 663 new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class, 664 new GroupingByAssertion<>(classifier2, TreeMap.class, 665 new ToListAssertion<>()))); 666 // with concurrent as downstream 667 exerciseMapCollection(data, 668 groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())), 669 new GroupingByAssertion<>(classifier, TreeMap.class, 670 new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class, 671 new ToListAssertion<>()))); 672 // with concurrent as upstream and downstream 673 exerciseMapCollection(data, 674 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())), 675 new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class, 676 new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class, 677 new ToListAssertion<>()))); 678 } 679 680 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 681 public void testGroupubgByWithReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 682 Function<Integer, Integer> classifier = i -> i % 3; 683 684 // Single-level simple reduce 685 exerciseMapCollection(data, 686 groupingBy(classifier, reducing(0, Integer::sum)), 687 new GroupingByAssertion<>(classifier, HashMap.class, 688 new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); 689 // with concurrent 690 exerciseMapCollection(data, 691 groupingByConcurrent(classifier, reducing(0, Integer::sum)), 692 new GroupingByAssertion<>(classifier, ConcurrentHashMap.class, 693 new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); 694 695 // With explicit constructors 696 exerciseMapCollection(data, 697 groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)), 698 new GroupingByAssertion<>(classifier, TreeMap.class, 699 new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); 700 // with concurrent 701 exerciseMapCollection(data, 702 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)), 703 new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class, 704 new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); 705 706 // Single-level map-reduce 707 exerciseMapCollection(data, 708 groupingBy(classifier, reducing(0, mDoubler, Integer::sum)), 709 new GroupingByAssertion<>(classifier, HashMap.class, 710 new ReducingAssertion<>(0, mDoubler, Integer::sum))); 711 // with concurrent 712 exerciseMapCollection(data, 713 groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)), 714 new GroupingByAssertion<>(classifier, ConcurrentHashMap.class, 715 new ReducingAssertion<>(0, mDoubler, Integer::sum))); 716 717 // With explicit constructors 718 exerciseMapCollection(data, 719 groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)), 720 new GroupingByAssertion<>(classifier, TreeMap.class, 721 new ReducingAssertion<>(0, mDoubler, Integer::sum))); 722 // with concurrent 723 exerciseMapCollection(data, 724 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)), 725 new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class, 726 new ReducingAssertion<>(0, mDoubler, Integer::sum))); 727 } 728 729 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 730 public void testSimplePartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 731 Predicate<Integer> classifier = i -> i % 3 == 0; 732 733 // Single-level partition to downstream List 734 exerciseMapCollection(data, 735 partitioningBy(classifier), 736 new PartitioningByAssertion<>(classifier, new ToListAssertion<>())); 737 exerciseMapCollection(data, 738 partitioningBy(classifier, toList()), 739 new PartitioningByAssertion<>(classifier, new ToListAssertion<>())); 740 } 741 742 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 743 public void testTwoLevelPartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 744 Predicate<Integer> classifier = i -> i % 3 == 0; 745 Predicate<Integer> classifier2 = i -> i % 7 == 0; 746 747 // Two level partition 748 exerciseMapCollection(data, 749 partitioningBy(classifier, partitioningBy(classifier2)), 750 new PartitioningByAssertion<>(classifier, 751 new PartitioningByAssertion(classifier2, new ToListAssertion<>()))); 752 753 // Two level partition with reduce 754 exerciseMapCollection(data, 755 partitioningBy(classifier, reducing(0, Integer::sum)), 756 new PartitioningByAssertion<>(classifier, 757 new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); 758 } 759 760 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 761 public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 762 List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList())); 763 List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList))); 764 assertEquals(asList, asImmutableList); 765 try { 766 asImmutableList.add(0); 767 fail("Expecting immutable result"); 768 } 769 catch (UnsupportedOperationException ignored) { } 770 } 771 772 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 773 public void testPairing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 774 Collector<Integer, ?, Long> summing = Collectors.summingLong(Integer::valueOf); 775 Collector<Integer, ?, Long> counting = Collectors.counting(); 776 Collector<Integer, ?, Integer> min = collectingAndThen(Collectors.<Integer>minBy(Comparator.naturalOrder()), 777 opt -> opt.orElse(Integer.MAX_VALUE)); 778 Collector<Integer, ?, Integer> max = collectingAndThen(Collectors.<Integer>maxBy(Comparator.naturalOrder()), 779 opt -> opt.orElse(Integer.MIN_VALUE)); 780 Collector<Integer, ?, String> joining = mapping(String::valueOf, Collectors.joining(", ", "[", "]")); 781 782 Collector<Integer, ?, Map.Entry<Long, Long>> sumAndCount = Collectors.pairing(summing, counting, Map::entry); 783 Collector<Integer, ?, Map.Entry<Integer, Integer>> minAndMax = Collectors.pairing(min, max, Map::entry); 784 Collector<Integer, ?, Double> averaging = Collectors.pairing(summing, counting, 785 (sum, count) -> ((double)sum) / count); 786 Collector<Integer, ?, String> summaryStatistics = Collectors.pairing(sumAndCount, minAndMax, 787 (sumCountEntry, minMaxEntry) -> new IntSummaryStatistics( 788 sumCountEntry.getValue(), minMaxEntry.getKey(), 789 minMaxEntry.getValue(), sumCountEntry.getKey()).toString()); 790 Collector<Integer, ?, String> countAndContent = Collectors.pairing(counting, joining, 791 (count, content) -> count+": "+content); 792 793 assertCollect(data, sumAndCount, stream -> { 794 List<Integer> list = stream.collect(toList()); 795 return Map.entry(list.stream().mapToLong(Integer::intValue).sum(), (long) list.size()); 796 }); 797 assertCollect(data, averaging, stream -> stream.mapToInt(Integer::intValue).average().orElse(Double.NaN)); 798 assertCollect(data, summaryStatistics, 799 stream -> stream.mapToInt(Integer::intValue).summaryStatistics().toString()); 800 assertCollect(data, countAndContent, stream -> { 801 List<Integer> list = stream.collect(toList()); 802 return list.size()+": "+list; 803 }); 804 805 Function<Integer, Integer> classifier = i -> i % 3; 806 exerciseMapCollection(data, groupingBy(classifier, sumAndCount), 807 new GroupingByAssertion<>(classifier, Map.class, 808 new PairingAssertion<>(summing, counting, Map::entry))); 809 } 810 }