踩坑之CompletableFuture异步处理吞异常阻塞排查

背景: 之前排查过一个问题,项目中定时任务走着走着就停了,后台日志也看不到任何报错,一直找不到原因。

private void sendJz(String rpUrl, Date latestHyPushTime, Date curDate, List<CompletableFuture<RpPushLogEntity>> futureList,
                    DzjzJzDO dzjzJzDO) {
	// 代码……
    CompletableFuture<RpPushLogEntity> future = CompletableFuture.supplyAsync(() ->
            doSendJz(dzjzJzDO, latestRpPushLog, rpUrl, curDate, latestHyPushTime), pool);
    futureList.add(future);
}

public RpPushLogEntity doSendJz(DzjzJzDO dzjzJzDO, RpPushLogEntity latestRpPushLogEntity, String rpUrl, Date pushTime,
                                Date latestHyPushTime) {
    //代码……
    dealSecondMlCl(dossierId);
    //代码……
    String xml;
    try {
        xml = creatXml(dzjzJzDO, rpXml, latestRpPushLogEntity.getVersion(), latestHyPushTime, pushTime);
    } catch (Exception e) {
        //代码……
    }
    xml = formatXML(xml);
    //代码……
}

通过各种加日志排查定位,发现是在doSendJz()方法中有报错,但是报错信息被吞了,于是怀疑到是CompletableFuture.supplyAsync的问题,在网上经过一番搜索之后,发现这个玩意儿确实会吞掉异常并且发生阻塞。

后来,我们在doSendJz()中,把所有可能抛异常的位置都进行了异常捕获,问题就解决了!

public RpPushLogEntity doSendJz(DzjzJzDO dzjzJzDO, RpPushLogEntity latestRpPushLogEntity, String rpUrl, Date pushTime,
                                Date latestHyPushTime) {
    //代码……
    try {
        dealSecondMlCl(dossierId);
    } catch (Exception e) {
        log.error("推送异常!", dossierId, e);
        message = "推送失败!";
        hyPushLogEntity.setMessage(message);
        hyPushLogEntity.setResult(RpHyPushResultEnum.PUSH_FAILED.getStatus());
        return hyPushLogEntity;
    }
    //代码……
    String xml;
    try {
        xml = creatXml(dzjzJzDO, rpXml, latestRpPushLogEntity.getVersion(), latestHyPushTime, pushTime);
    } catch (Exception e) {
        //代码……
    }
    try {
        xml = formatXML(xml);
    } catch (Exception e) {
        log.error("格式化xml失败,案件标识:{}", entityBs, e);
        hyPushLogEntity.setResult(RpHyPushResultEnum.PUSH_FAILED.getStatus());
        hyPushLogEntity.setMessage("格式化xml出错了," + e.getMessage());
        return hyPushLogEntity;
    }
    //代码……
}

关于CompletableFuture异步处理吞异常阻塞的其他实践

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            int a = 0;
            int b = 100;
            int c = b / a;
            return true;
        }, new ThreadPoolExecutor(5, 5, 5L,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(50)))
                .thenAccept(result -> System.out.println("result:" + result));

可以看到 int c = b / a; 这一行本该会抛出异常byZero,可是事实是运行调试至这一行就不再往下运行了,也没有任何的报错。

Future不同于Runnable需要获取返回值,才能获取异常信息,而Runnable可以直接抛出异常。所以可以使用

future.get();

来获取异步调用的返回值,但是该方法会阻塞等待,如果这个任务执行了9分钟,那么接口调用方还是会等待很长时间,不推荐使用。

解决这个问题也可以使用下面的方法:

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            int a = 0;
            int b = 100;
            int c = b / a;
            return true;
        }, new ThreadPoolExecutor(5, 5, 5L,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(50)))
                .exceptionally(ex -> {
                    System.out.println(ex);
                    return false;
                })
                .thenAccept(result -> System.out.println("result:" + result));

CompletableFuture异步处理使用

CompletableFuture 是 Java 中用于异步编程的类,它提供了一种方便的方式来处理异步操作的结果。它是 java.util.concurrent.CompletionStage 接口的一个实现,用于表示一个可能异步计算的结果,并且能够以链式的方式对其进行操作。

以下是 CompletableFuture 的一些重要特性和用法:

创建 CompletableFuture

  1. 使用静态方法创建

    CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
    
  2. 手动创建

    CompletableFuture<String> future = new CompletableFuture<>();
    // 执行某个操作,完成 CompletableFuture
    future.complete("World");
    

异步执行任务

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // 异步执行的任务
    System.out.println("Doing some task asynchronously");
});

异步执行任务并返回结果

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 异步执行的任务,返回结果
    return "Task Result";
});

链式操作

CompletableFuture 支持链式操作,允许你按顺序组合多个异步操作,处理其结果或进行其他操作。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApplyAsync(result -> result + " World")
    .thenApplyAsync(String::toUpperCase);

异常处理

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 有可能抛出异常的异步操作
    throw new RuntimeException("Exception occurred!");
}).exceptionally(ex -> "Handled Exception: " + ex.getMessage());

组合多个 CompletableFuture

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);

等待 CompletableFuture 完成并获取结果

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
String result = future.get(); // 阻塞直到获取结果
异步任务组合和并发处理是 CompletableFuture 的强大之处,使得编写并发、异步的程序变得更加简单和灵活。但是,需要小心处理异常、确保资源的正确释放,并了解并发操作可能带来的竞态条件和同步问题。