我如何调用新的@Async方法,但要等待条件?
来源:爱站网时间:2021-09-16编辑:网友分享
我有一个实现AMQP MessageListener的Spring Boot应用。该侦听器以池大小调用由ThreadPoolTask Executor管理的@Async方法。当有很多...
问题描述
我有一个实现AMQP MessageListener的Spring Boot应用。该侦听器以池大小调用由ThreadPoolTaskExecutor管理的@Async方法。当有很多传入消息时,会发生此问题,因此这些消息会丢失,因为没有可用的异步工作器。
我正在使用Spring Core 5.0.7-RELEASE,Java 8
这是我的代码:
AsyncConfigurator:
@EnableAsync
@Configuration
public class AsyncConfiguration extends AsyncConfigurerSupport {
@Override
@Bean("docThreadPoolTaskExecutor")
public Executor getAsyncExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("DocWorkerThread-");
executor.initialize();
return executor;
}
我的后备服务(MyAsyncService):
@Async("docThreadPoolTaskExecutor")
@Override
public void generaDocumento(String idUser, int year) {
//... some heavy and slow process
}
我的消息侦听器:
...
@Autowired
private MyAsyncService myAsyncService;
@Override
@EntryPoint
public void onMessage(Message message) {
try {
final String mensaje = new String(message.getBody(), StandardCharsets.UTF_8);
final MyPojo payload = JsonUtils.readFromJson(mensaje , MyPojo.class);
myAsyncService.generaDocumento(payload.getIdUser(), Integer.valueOf(payload.getYear()));
} catch ( Throwable t ) {
throw new AmqpRejectAndDontRequeueException( t );
}
}
我需要有人给我一些解决方案。
思路:
ThreadPoolTaskExecutor
默认情况下有一个队列。到达corePoolSize
时,应将消息添加到队列中。当队列已满时,将启动更多工作线程,直到您到达maxPoolSize
。您可以通过executor.getThreadPoolExecutor().getQueue().size()
检查当前队列的大小,以验证消息是否真的丢失或只是停留在队列中。