踩坑之CompletableFuture异步处理吞异常阻塞排查
背景: 之前排查过一个问题,项目中定时任务走着走着就停了,后台日志也看不到任何报错,一直找不到原因。
private void sendJz(String rpUrl, Date latestHyPushTime, Date curDate, List<CompletableFuture<RpPushLogEntity>> futureList,
DzjzJzDO dzjzJzDO) {
// 代码……
CompletableFuture<RpPushLogEntity> future = CompletableFuture.supplyAsync(() ->
doSendJz(dzjzJzDO, latestRpPushLog, rpUrl, curDate, latestHyPushTime), pool);
futureList.add(future);
}
public RpPushLogEntity doSendJz(DzjzJzDO dzjzJzDO, RpPushLogEntity latestRpPushLogEntity, String rpUrl, Date pushTime,
Date latestHyPushTime) {
//代码……
dealSecondMlCl(dossierId);
//代码……
String xml;
try {
xml = creatXml(dzjzJzDO, rpXml, latestRpPushLogEntity.getVersion(), latestHyPushTime, pushTime);
} catch (Exception e) {
//代码……
}
xml = formatXML(xml);
//代码……
}
通过各种加日志排查定位,发现是在doSendJz()方法中有报错,但是报错信息被吞了,于是怀疑到是CompletableFuture.supplyAsync的问题,在网上经过一番搜索之后,发现这个玩意儿确实会吞掉异常并且发生阻塞。
后来,我们在doSendJz()中,把所有可能抛异常的位置都进行了异常捕获,问题就解决了!
public RpPushLogEntity doSendJz(DzjzJzDO dzjzJzDO, RpPushLogEntity latestRpPushLogEntity, String rpUrl, Date pushTime,
Date latestHyPushTime) {
//代码……
try {
dealSecondMlCl(dossierId);
} catch (Exception e) {
log.error("推送异常!", dossierId, e);
message = "推送失败!";
hyPushLogEntity.setMessage(message);
hyPushLogEntity.setResult(RpHyPushResultEnum.PUSH_FAILED.getStatus());
return hyPushLogEntity;
}
//代码……
String xml;
try {
xml = creatXml(dzjzJzDO, rpXml, latestRpPushLogEntity.getVersion(), latestHyPushTime, pushTime);
} catch (Exception e) {
//代码……
}
try {
xml = formatXML(xml);
} catch (Exception e) {
log.error("格式化xml失败,案件标识:{}", entityBs, e);
hyPushLogEntity.setResult(RpHyPushResultEnum.PUSH_FAILED.getStatus());
hyPushLogEntity.setMessage("格式化xml出错了," + e.getMessage());
return hyPushLogEntity;
}
//代码……
}
关于CompletableFuture异步处理吞异常阻塞的其他实践
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int a = 0;
int b = 100;
int c = b / a;
return true;
}, new ThreadPoolExecutor(5, 5, 5L,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(50)))
.thenAccept(result -> System.out.println("result:" + result));
可以看到 int c = b / a; 这一行本该会抛出异常byZero,可是事实是运行调试至这一行就不再往下运行了,也没有任何的报错。
Future不同于Runnable需要获取返回值,才能获取异常信息,而Runnable可以直接抛出异常。所以可以使用
future.get();
来获取异步调用的返回值,但是该方法会阻塞等待,如果这个任务执行了9分钟,那么接口调用方还是会等待很长时间,不推荐使用。
解决这个问题也可以使用下面的方法:
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int a = 0;
int b = 100;
int c = b / a;
return true;
}, new ThreadPoolExecutor(5, 5, 5L,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(50)))
.exceptionally(ex -> {
System.out.println(ex);
return false;
})
.thenAccept(result -> System.out.println("result:" + result));
CompletableFuture异步处理使用
CompletableFuture
是 Java 中用于异步编程的类,它提供了一种方便的方式来处理异步操作的结果。它是 java.util.concurrent.CompletionStage
接口的一个实现,用于表示一个可能异步计算的结果,并且能够以链式的方式对其进行操作。
以下是 CompletableFuture
的一些重要特性和用法:
创建 CompletableFuture
-
使用静态方法创建:
CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
-
手动创建:
CompletableFuture<String> future = new CompletableFuture<>(); // 执行某个操作,完成 CompletableFuture future.complete("World");
异步执行任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 异步执行的任务
System.out.println("Doing some task asynchronously");
});
异步执行任务并返回结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 异步执行的任务,返回结果
return "Task Result";
});
链式操作
CompletableFuture
支持链式操作,允许你按顺序组合多个异步操作,处理其结果或进行其他操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApplyAsync(result -> result + " World")
.thenApplyAsync(String::toUpperCase);
异常处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 有可能抛出异常的异步操作
throw new RuntimeException("Exception occurred!");
}).exceptionally(ex -> "Handled Exception: " + ex.getMessage());
组合多个 CompletableFuture
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);
等待 CompletableFuture 完成并获取结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
String result = future.get(); // 阻塞直到获取结果