博客
关于我
java8-CompleableFuture的使用1
阅读量:457 次
发布时间:2019-03-06

本文共 6933 字,大约阅读时间需要 23 分钟。

CompletableFuture与并发编程的深入探讨

在当今的软件开发中,多核心CPU和分布式架构已经成为常态。功能API的混聚方式不仅让用户生活变得便利,也推动了软件架构的演进。然而,如何充分发挥多核能力,切分大型任务,让每个子任务并行运行,仍然是一个值得深入探讨的问题。

并发与并行的区别

并发和并行是两个常见的概念,但它们在实现细节上有显著区别。

项目 区别1 实现技术
并行 每个任务跑在单独的CPU核心上 分支合并框架,并行流
并发 不同任务共享CPU核心,基于时间片调度 CompletableFuture

Future接口

Java5引入了Future接口,用于建模将来某个时刻发生的事情。通过Future,我们可以异步执行耗时任务,避免主线程白白等待。以下是一个使用Future的示例:

package com.test.completable;
import com.google.common.base.Stopwatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureTest {
static final ExecutorService pool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
Stopwatch stopwatch = Stopwatch.createStarted();
Future
longFuture = pool.submit(() -> doSomethingLongTime());
doSomething2();
try {
final Long longValue = longFuture.get(3, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + " future return value :" + longValue + " : " + stopwatch.stop());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
pool.shutdown();
}
private static void doSomething2() {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " doSomething2 :" + stopwatch.stop());
}
private static Long doSomethingLongTime() {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " doSomethingLongTime : " + stopwatch.stop());
return 1000L;
}
}

CompletableFuture的优势

Java8引入了CompletableFuture类,优化了异步任务的处理。它支持更灵活的异常管理和异步API实现。以下是一个基于CompletableFuture的价格查询示例:

package com.test.completable;
import com.google.common.base.Stopwatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
Shop shop = new Shop("BestShop");
Stopwatch stopwatch = Stopwatch.createStarted();
Future
doubleFuture = shop.getPriceFuture("pizza");
System.out.println("getPriceFuture return after: " + stopwatch.stop());
doSomethingElse();
try {
final Double price = doubleFuture.get();
System.out.println("price is " + price + " return after: " + stopwatch.stop());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private static void doSomethingElse() {
Stopwatch stopwatch = Stopwatch.createStarted();
DelayUtil.delay();
System.out.println("doSomethingElse " + stopwatch.stop());
}
}

异步API的实现

提供异步API需要考虑以下技能点:

  • 提供异步API:通过CompletableFuture实现异步调用。
  • 同步API到异步API的转换:使用流水线将任务合并为一个异步计算操作。
  • 响应式处理:响应异步操作的完成事件。
  • 以下是一个修改后的同步API为异步API的示例:

    public class Shop {
    private final String name;
    public Shop(String name) {
    this.name = name;
    }
    public Future
    getPriceFuture(String product) {
    final CompletableFuture
    doubleCompletableFuture = new CompletableFuture<>();
    new Thread(() -> {
    try {
    doubleCompletableFuture.complete(getPrice(product));
    } catch (Exception ex) {
    doubleCompletableFuture.completeExceptionally(ex);
    }
    }).start();
    return doubleCompletableFuture;
    }
    public Double getPrice(String product) throws InterruptedException {
    DelayUtil.delay();
    return 100.0;
    }
    }

    错误处理

    在异步任务中,错误处理也是关键。CompletableFuture支持异常传播,确保主线程能够接管子任务抛出的异常:

    private static void test2() {
    Shop shop = new Shop("BestShop");
    Stopwatch stopwatch = Stopwatch.createStarted();
    Future
    doubleFuture = shop.getPriceFutureException("pizza");
    System.out.println("getPriceFuture return after: " + stopwatch.stop());
    doSomethingElse();
    try {
    final Double price = doubleFuture.get();
    System.out.println("price is " + price + " return after: " + stopwatch.stop());
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    }

    无堵塞处理

    无堵塞处理即让多个线程异步并行执行任务,汇聚结果。以下是一个使用CompletableFuture的无堵塞处理示例:

    private static void test3(String productName) {
    Stopwatch stopwatch = Stopwatch.createStarted();
    final List
    stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城"))
    .map(item -> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName)))
    .collect(Collectors.toList());
    System.out.println(stringList);
    System.out.println("test3 done in " + stopwatch.stop());
    }

    多个异步任务的流水线操作

    在有依赖关系的多个任务中,使用流水线操作可以有效管理依赖关系。以下是一个复杂的流水线操作示例:

    private static void test4(String productName) {
    Stopwatch stopwatch = Stopwatch.createStarted();
    final List
    stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城"))
    .map(shop -> shop.getPrice_discount(productName))
    .map(Quote::parse)
    .map(DisCount::applyDiscount)
    .collect(Collectors.toList());
    System.out.println(stringList);
    System.out.println("test4 done in " + stopwatch.stop());
    }

    Completion事件

    Completion事件允许任务尽快完成,无需等待。以下是一个等待所有任务完成的示例:

    public void findPriceStream(String productName) {
    List
    shops = Arrays.asList(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城"));
    final CompletableFuture[] completableFutureArray = shops.stream()
    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice_discount(productName), pool))
    .map(future -> future.thenApply(Quote::parse))
    .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> DisCount.applyDiscount(quote), pool)))
    .map(f -> f.thenAccept(System.out::println))
    .toArray(size -> new CompletableFuture[size]);
    CompletableFuture.allOf(completableFutureArray).join();
    }

    小结

  • 异步任务的应用:在需要执行耗时操作时,尤其是依赖远程服务的操作,可以通过异步任务提升程序性能,加快响应速度。
  • CompletableFuture的优势:它提供了灵活的异常管理机制,使主线程能够接管子任务抛出的异常。
  • API转换:将同步API封装到CompletableFuture中,可以异步获得结果。
  • 任务聚合:在多个异步任务之间,使用thenCompose等方法进行聚合,处理任务间的依赖关系。
  • 事件响应:通过CompletionStage的事件响应机制,实现更高效的异步任务管理。
  • 通过以上方法,可以更高效地管理并发任务,提升程序的性能和用户体验。

    转载地址:http://rsxfz.baihongyu.com/

    你可能感兴趣的文章
    Numpy.ndarray对象不可调用
    查看>>
    Numpy如何使用np.umprod重写range函数中i的python
    查看>>
    numpy数组替换其中的值(如1替换为255)
    查看>>
    numpy数组索引-ChatGPT4o作答
    查看>>
    numpy转PIL 报错TypeError: Cannot handle this data type
    查看>>
    NutzCodeInsight 2.0.7 发布,为 nutz-sqltpl 提供友好的 ide 支持
    查看>>
    NUUO网络视频录像机 css_parser.php 任意文件读取漏洞复现
    查看>>
    NVelocity标签使用详解
    查看>>
    nvidia-htop 使用教程
    查看>>
    oauth2-shiro 添加 redis 实现版本
    查看>>
    OAuth2.0_JWT令牌-生成令牌和校验令牌_Spring Security OAuth2.0认证授权---springcloud工作笔记148
    查看>>
    OAuth2.0_JWT令牌介绍_Spring Security OAuth2.0认证授权---springcloud工作笔记147
    查看>>
    OAuth2.0_介绍_Spring Security OAuth2.0认证授权---springcloud工作笔记137
    查看>>
    OAuth2.0_完善环境配置_把资源微服务客户端信息_授权码存入到数据库_Spring Security OAuth2.0认证授权---springcloud工作笔记149
    查看>>
    OAuth2.0_授权服务配置_Spring Security OAuth2.0认证授权---springcloud工作笔记140
    查看>>
    OAuth2.0_授权服务配置_令牌服务和令牌端点配置_Spring Security OAuth2.0认证授权---springcloud工作笔记143
    查看>>
    OAuth2.0_授权服务配置_客户端详情配置_Spring Security OAuth2.0认证授权---springcloud工作笔记142
    查看>>
    OAuth2.0_授权服务配置_密码模式及其他模式_Spring Security OAuth2.0认证授权---springcloud工作笔记145
    查看>>
    OAuth2.0_授权服务配置_资源服务测试_Spring Security OAuth2.0认证授权---springcloud工作笔记146
    查看>>
    OAuth2.0_环境介绍_授权服务和资源服务_Spring Security OAuth2.0认证授权---springcloud工作笔记138
    查看>>