java 延迟队列-java kafka 队列
发布时间:2023-02-09 22:16 浏览次数:次 作者:佚名
Rabbitmq通过延迟插件实现延迟队列
文章目录
DLX+TTL有时序问题
由于队列的先进先出特性,延迟队列是使用死信队列(DLX)实现的java 延迟队列,并为每条消息设置过期时间(TTL),会存在定时问题。 也就是队列头部的消息过期,这样time比较长的话,会导致队列后面的过期时间比较短的消息,过期后不会被消费。 可以通过安装Rabbitmq的延迟插件实现延迟队列功能
安装延时插件下载地址
rabbitmq-delayed-message-exchange 插件可以在这里下载:RabbitMQ Delayed Plugin
你也可以从github上下载:RabbitMQ Delayed Message Plugin
(注意插件版本,本插件适配版本为3.5.8及以后版本)
安装
登录Linux服务器java 延迟队列,将插件复制到这个路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/plugins/
然后执行以下命令:
# 开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启 rabbitmq
/sbin/service rabbitmq-server restart
# 查看插件是否安装成功
sudo rabbitmq-plugins list
Java代码实现
@Configuration
public class RabbitConfig implements ApplicationContextAware {
private ApplicationContext applicationContext;
@PostConstruct
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
TopicExchange exchange = new TopicExchange("exchange.delay");
// 交换器设置延迟属性
exchange.setDelayed(true);
rabbitAdmin.declareQueue(new Queue("queue.delay"));
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.delay")).to(exchange).with("rountingkey.delay");
return rabbitAdmin;
}
}
// 消息发送器
public class SendMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送消息
public void sendMessage(String exchange, String routingKey, String msg, Integer expirationTime) {
rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
// 给每条消息设置过期时间
message.getMessageProperties().setExpiration(expirationTime);
return message;
});
}
}
// 消息监听器, 交换器 delayed = "true"
@Component
@RabbitListener(containerFactory = "listenerContainerFactory",
bindings = @QueueBinding(value = @Queue(value = "queue.delay"),
exchange = @Exchange(value = "exchange.delay", type = ExchangeTypes.TOPIC, delayed = "true"),
key = "rountingkey.delay"))
@Slf4j
public class MsgListener {
@RabbitHandler
public void msgHandler(String msg) {
log.info("接收到的延迟消息 [{}]",msg)
}
}