本文共 6933 字,大约阅读时间需要 23 分钟。
在当今的软件开发中,多核心CPU和分布式架构已经成为常态。功能API的混聚方式不仅让用户生活变得便利,也推动了软件架构的演进。然而,如何充分发挥多核能力,切分大型任务,让每个子任务并行运行,仍然是一个值得深入探讨的问题。
并发和并行是两个常见的概念,但它们在实现细节上有显著区别。
| 项目 | 区别1 | 实现技术 |
|---|---|---|
| 并行 | 每个任务跑在单独的CPU核心上 | 分支合并框架,并行流 |
| 并发 | 不同任务共享CPU核心,基于时间片调度 | CompletableFuture |
Java5引入了Future接口,用于建模将来某个时刻发生的事情。通过Future,我们可以异步执行耗时任务,避免主线程白白等待。以下是一个使用Future的示例:
package com.test.completable;import com.google.common.base.Stopwatch;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class FutureTest { static final ExecutorService pool = Executors.newFixedThreadPool(2); public static void main(String[] args) { Stopwatch stopwatch = Stopwatch.createStarted(); Future longFuture = pool.submit(() -> doSomethingLongTime()); doSomething2(); try { final Long longValue = longFuture.get(3, TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName() + " future return value :" + longValue + " : " + stopwatch.stop()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } pool.shutdown(); } private static void doSomething2() { Stopwatch stopwatch = Stopwatch.createStarted(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " doSomething2 :" + stopwatch.stop()); } private static Long doSomethingLongTime() { Stopwatch stopwatch = Stopwatch.createStarted(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " doSomethingLongTime : " + stopwatch.stop()); return 1000L; }} Java8引入了CompletableFuture类,优化了异步任务的处理。它支持更灵活的异常管理和异步API实现。以下是一个基于CompletableFuture的价格查询示例:
package com.test.completable;import com.google.common.base.Stopwatch;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;public class Test { public static void main(String[] args) { Shop shop = new Shop("BestShop"); Stopwatch stopwatch = Stopwatch.createStarted(); Future doubleFuture = shop.getPriceFuture("pizza"); System.out.println("getPriceFuture return after: " + stopwatch.stop()); doSomethingElse(); try { final Double price = doubleFuture.get(); System.out.println("price is " + price + " return after: " + stopwatch.stop()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } private static void doSomethingElse() { Stopwatch stopwatch = Stopwatch.createStarted(); DelayUtil.delay(); System.out.println("doSomethingElse " + stopwatch.stop()); }} 提供异步API需要考虑以下技能点:
以下是一个修改后的同步API为异步API的示例:
public class Shop { private final String name; public Shop(String name) { this.name = name; } public Future getPriceFuture(String product) { final CompletableFuture doubleCompletableFuture = new CompletableFuture<>(); new Thread(() -> { try { doubleCompletableFuture.complete(getPrice(product)); } catch (Exception ex) { doubleCompletableFuture.completeExceptionally(ex); } }).start(); return doubleCompletableFuture; } public Double getPrice(String product) throws InterruptedException { DelayUtil.delay(); return 100.0; }} 在异步任务中,错误处理也是关键。CompletableFuture支持异常传播,确保主线程能够接管子任务抛出的异常:
private static void test2() { Shop shop = new Shop("BestShop"); Stopwatch stopwatch = Stopwatch.createStarted(); Future doubleFuture = shop.getPriceFutureException("pizza"); System.out.println("getPriceFuture return after: " + stopwatch.stop()); doSomethingElse(); try { final Double price = doubleFuture.get(); System.out.println("price is " + price + " return after: " + stopwatch.stop()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }} 无堵塞处理即让多个线程异步并行执行任务,汇聚结果。以下是一个使用CompletableFuture的无堵塞处理示例:
private static void test3(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(item -> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName))) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test3 done in " + stopwatch.stop());} 在有依赖关系的多个任务中,使用流水线操作可以有效管理依赖关系。以下是一个复杂的流水线操作示例:
private static void test4(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(shop -> shop.getPrice_discount(productName)) .map(Quote::parse) .map(DisCount::applyDiscount) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test4 done in " + stopwatch.stop());} Completion事件允许任务尽快完成,无需等待。以下是一个等待所有任务完成的示例:
public void findPriceStream(String productName) { List shops = Arrays.asList(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")); final CompletableFuture[] completableFutureArray = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice_discount(productName), pool)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> DisCount.applyDiscount(quote), pool))) .map(f -> f.thenAccept(System.out::println)) .toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(completableFutureArray).join();} 通过以上方法,可以更高效地管理并发任务,提升程序的性能和用户体验。
转载地址:http://rsxfz.baihongyu.com/