我如何调用新的@Async方法,但要等待条件?

来源:爱站网时间:2021-09-16编辑:网友分享
我有一个实现AMQP MessageListener的Spring Boot应用。该侦听器以池大小调用由ThreadPoolTask Executor管理的@Async方法。当有很多...

问题描述


我有一个实现AMQP MessageListener的Spring Boot应用。该侦听器以池大小调用由ThreadPoolTask​​Executor管理的@Async方法。当有很多传入消息时,会发生此问题,因此这些消息会丢失,因为没有可用的异步工作器。

我正在使用Spring Core 5.0.7-RELEASE,Java 8

这是我的代码:

AsyncConfigurator:

@EnableAsync
@Configuration
public class AsyncConfiguration extends AsyncConfigurerSupport {

    @Override
    @Bean("docThreadPoolTaskExecutor")
    public Executor getAsyncExecutor() {
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("DocWorkerThread-");
        executor.initialize();
        return executor;
    }

我的后备服务(MyAsyncService):


    @Async("docThreadPoolTaskExecutor")
    @Override
    public void generaDocumento(String idUser, int year) {
       //... some heavy and slow process
    }

我的消息侦听器:

...
    @Autowired
    private MyAsyncService myAsyncService;

    @Override
    @EntryPoint
    public void onMessage(Message message) {
        try {
            final String mensaje = new String(message.getBody(), StandardCharsets.UTF_8); 
            final MyPojo payload = JsonUtils.readFromJson(mensaje , MyPojo.class);

            myAsyncService.generaDocumento(payload.getIdUser(), Integer.valueOf(payload.getYear()));

        } catch ( Throwable t ) { 
            throw new AmqpRejectAndDontRequeueException( t );
        }
    }

我需要有人给我一些解决方案。

思路:


ThreadPoolTaskExecutor默认情况下有一个队列。到达corePoolSize时,应将消息添加到队列中。当队列已满时,将启动更多工作线程,直到您到达maxPoolSize。您可以通过executor.getThreadPoolExecutor().getQueue().size()检查当前队列的大小,以验证消息是否真的丢失或只是停留在队列中。

上一篇:水平滚动屏幕上的Appium滚动问题

下一篇:从SQL查询休眠Java中获取对象

您可能感兴趣的文章

相关阅读

热门软件源码

最新软件源码下载