当前位置: 主页 > JAVA语言

java 延迟队列-java kafka 队列

发布时间:2023-02-09 22:16   浏览次数:次   作者:佚名

Rabbitmq通过延迟插件实现延迟队列

文章目录

DLX+TTL有时序问题

由于队列的先进先出特性,延迟队列是使用死信队列(DLX)实现的java 延迟队列,并为每条消息设置过期时间(TTL),会存在定时问题。 也就是队列头部的消息过期,这样time比较长的话,会导致队列后面的过期时间比较短的消息,过期后不会被消费。 可以通过安装Rabbitmq的延迟插件实现延迟队列功能

安装延时插件下载地址

rabbitmq-delayed-message-exchange 插件可以在这里下载:RabbitMQ Delayed Plugin

你也可以从github上下载:RabbitMQ Delayed Message Plugin

(注意插件版本,本插件适配版本为3.5.8及以后版本)

安装

登录Linux服务器java 延迟队列,将插件复制到这个路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/plugins/

然后执行以下命令:

# 开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启 rabbitmq
/sbin/service rabbitmq-server restart
# 查看插件是否安装成功
sudo rabbitmq-plugins list

Java代码实现

@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;    
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        TopicExchange exchange = new TopicExchange("exchange.delay");
        // 交换器设置延迟属性
        exchange.setDelayed(true);				                  
        rabbitAdmin.declareQueue(new Queue("queue.delay"));
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.delay")).to(exchange).with("rountingkey.delay");
        return rabbitAdmin;
    }
}
 // 消息发送器                                  
public class SendMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // 发送消息
    public void sendMessage(String exchange, String routingKey, String msg, Integer expirationTime) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
            // 给每条消息设置过期时间
            message.getMessageProperties().setExpiration(expirationTime);
            return message;
        });
    }
}
// 消息监听器, 交换器 delayed = "true"
@Component
@RabbitListener(containerFactory = "listenerContainerFactory",
    bindings = @QueueBinding(value = @Queue(value = "queue.delay"),
        exchange = @Exchange(value = "exchange.delay", type = ExchangeTypes.TOPIC, delayed = "true"),
        key = "rountingkey.delay"))
@Slf4j
public class MsgListener {
    @RabbitHandler
    public void msgHandler(String msg) {
        log.info("接收到的延迟消息 [{}]",msg)
    }
}