Like Share Discussion Bookmark Smile

J.J. Huang   2020-04-09   Java   瀏覽次數:

Java 8 | Stream (下)

Stream的另一個價值是創造性地支持並行處理(parallel processing)。Stream操作可以是順序的,也可以是並行的。順序操作通過單線程執行,而並行操作則通過多線程執行。

並行(parallel)程序

parallelStream是流並行處理程序的代替方法。以下實例我們使用parallelStream來輸出空字符串的數量:(可以很容易的在順序運行和並行直接切換)

1
2
3
List<String> strings = Arrays.asList("abc", "", "bc", "efg", "abcd","", "jkl");
// 獲取空字符串的數量
long count = strings.parallelStream().filter(string -> string.isEmpty()).count();

註:這邊一定會有疑問到底Stream和Parallel差在哪邊,可以參考這篇文章Stream 與平行化,有更詳細的說明。

下面的例子就演示瞭如何使用並行流進行操作來提高運行效率,程式碼非常簡單。

首先我們創建一個大的list,裡面的元素都是唯一的:

1
2
3
4
5
6
int max = 1000000;
List<String> values = new ArrayList<>(max);
for (int i = 0; i < max; i++) {
UUID uuid = UUID.randomUUID();
values.add(uuid.toString());
}
  • 順序排序
1
2
3
4
5
6
7
8
9
10
11
long t0 = System.nanoTime();

long count = values.stream().sorted().count();
System.out.println(count);

long t1 = System.nanoTime();

long millis = TimeUnit.NANOSECONDS.toMillis(t1 - t0);
System.out.println(String.format("sequential sort took: %d ms", millis));

// sequential sort took: 899 ms
  • 並行排序
1
2
3
4
5
6
7
8
9
10
11
long t0 = System.nanoTime();

long count = values.parallelStream().sorted().count();
System.out.println(count);

long t1 = System.nanoTime();

long millis = TimeUnit.NANOSECONDS.toMillis(t1 - t0);
System.out.println(String.format("parallel sort took: %d ms", millis));

// parallel sort took: 472 ms

如你所見,所有的程式碼段幾乎都相同,唯一的不同就是把stream()改成了parallelStream(),結果並行排序快了50%`。

Collectors

Collectors類實現了很多歸約操作,例如將流轉換成集合和聚合元素。Collectors可用於返回列表或字符串:

1
2
3
4
5
6
List<String>strings = Arrays.asList("abc", "", "bc", "efg", "abcd","", "jkl");
List<String> filtered = strings.stream().filter(string -> !string.isEmpty()).collect(Collectors.toList());
 
System.out.println("篩選列表: " + filtered);
String mergedString = strings.stream().filter(string -> !string.isEmpty()).collect(Collectors.joining(", "));
System.out.println("合併字符串: " + mergedString);

輸出的結果為:

1
2
篩選列表: [abc, bc, efg, abcd, jkl]
合併字符串: abc, bc, efg, abcd, jkl

範例

對於上一篇的tasks集合,我們可以用下面的程式碼計算所有任務的點數之和:

1
2
3
4
5
6
7
8
// Calculate total points of all tasks
final double totalPoints = tasks
.stream()
.parallel()
.map( task -> task.getPoints() ) // or map( Task::getPoints )
.reduce( 0, Integer::sum );

System.out.println( "Total points (all tasks): " + totalPoints );

這裡我們使用parallel方法並行處理所有的task,並使用reduce方法計算最終的結果。輸出如下:

1
Total points(all tasks): 26.0

對於一個集合,經常需要根據某些條件對其中的元素分組。利用stream提供的API可以很快完成這類任務,程式碼如下:

1
2
3
4
5
// Group tasks by their status
final Map< Status, List< Task > > map = tasks
.stream()
.collect( Collectors.groupingBy( Task::getStatus ) );
System.out.println( map );

輸出的結果為:

1
{CLOSED=[[CLOSED, 8]], OPEN=[[OPEN, 5], [OPEN, 13]]}

最後關於tasks集合的例子問題是:如何計算集合中每個任務的點數在集合中所佔的比重,具體處理的程式碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
// Calculate the weight of each tasks (as percent of total points) 
final Collection< String > result = tasks
.stream() // Stream< String >
.mapToInt( Task::getPoints ) // IntStream
.asLongStream() // LongStream
.mapToDouble( points -> points / totalPoints ) // DoubleStream
.boxed() // Stream< Double >
.mapToLong( weigth -> ( long )( weigth * 100 ) ) // LongStream
.mapToObj( percentage -> percentage + "%" ) // Stream< String>
.collect( Collectors.toList() ); // List< String >

System.out.println( result );

輸出的結果為:

1
[19%, 50%, 30%]

最後,正如之前所說,Stream API不僅可以作用於Java集合,傳統的IO操作(從文件或者網絡一行一行得讀取資料)可以受益於stream處理,這裡有一個小例子:

1
2
3
4
final Path path = new File( filename ).toPath();
try( Stream< String > lines = Files.lines( path, StandardCharsets.UTF_8 ) ) {
lines.onClose( () -> System.out.println("Done!") ).forEach( System.out::println );
}

Stream的方法onClose返回一個等價的有額外句柄的Stream,當Streamclose()方法被調用的時候這個句柄會被執行。Stream APILambda表達式還有接口默認方法和靜態方法支持的方法引用,是Java 8對軟件開發的現代範式的響應。


註:以上參考了
Stream 與平行化
Java 8 Lambda新語法,簡化程式,增強效能
Java 8 新特性
Java 8的新特性—终极版
现代化 Java - Java8 指南