修复rabbitmq ttl 过期时间问题

This commit is contained in:
APLS 2020-04-21 01:15:20 +08:00
parent 7aa57ae727
commit 8a5f152315
3 changed files with 11 additions and 23 deletions

View File

@ -27,18 +27,7 @@ public class RabbitMQConfig {
* */ * */
@Bean @Bean
public Queue fanoutQueue1() { public Queue fanoutQueue1() {
Map<String, Object> arguments = new HashMap<>(2); return new Queue("delay_queue1", true, false, false);
arguments.put("x-dead-letter-exchange", "delay");
arguments.put("x-dead-letter-routing-key", "delay_key");
return new Queue("delay_queue1", true, false, false, arguments);
}
/**
*
*创建消息处理队列
*/
@Bean
public Queue fanoutQueue2() {
return new Queue("delay_queue2", true); // 队列持久
} }
/** /**
@ -51,8 +40,10 @@ public class RabbitMQConfig {
* @return * @return
*/ */
@Bean @Bean
public DirectExchange fanoutExchangeDelay() { public CustomExchange customExchangeDelay() {
return new DirectExchange("delay",true, false); Map<String, Object> arg = new HashMap<>();
arg.put("x-delayed-type", "direct");
return new CustomExchange("delay","x-delayed-message",true, false,arg);
} }
/*@Bean /*@Bean
@ -63,13 +54,8 @@ public class RabbitMQConfig {
//绑定 将队列和交换机绑定, //绑定 将队列和交换机绑定,
@Bean @Bean
public Binding bindingFanoutQueue1() { public Binding bindingFanoutQueue1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchangeDelay()).with("delay"); return BindingBuilder.bind(fanoutQueue1()).to(customExchangeDelay()).with("delay").noargs();
} }
@Bean
public Binding bindingFanoutQueue2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchangeDelay()).with("delay_key");
}
// 无限循环问题 // 无限循环问题

View File

@ -35,7 +35,7 @@ public class ListenerImpl {
ITask Task; ITask Task;
@RabbitHandler @RabbitHandler
@RabbitListener(queues = "delay_queue2", containerFactory = "rabbitListenerContainerFactory") @RabbitListener(queues = "delay_queue1", containerFactory = "rabbitListenerContainerFactory")
public void listen(Message message, Channel channel) throws IOException { public void listen(Message message, Channel channel) throws IOException {
logger.info("消费者1开始处理消息 {},时间戳:{}" ,message,System.currentTimeMillis()); logger.info("消费者1开始处理消息 {},时间戳:{}" ,message,System.currentTimeMillis());
System.out.println("消费者1开始处理消息"+System.currentTimeMillis()); System.out.println("消费者1开始处理消息"+System.currentTimeMillis());

View File

@ -56,7 +56,7 @@ public class TaskImpl implements ITask {
Outlook next = iterator.next(); Outlook next = iterator.next();
/*根据用户设置生成随机数*/ /*根据用户设置生成随机数*/
String Expiration = getRandom(next.getCronTimeRandomStart(), next.getCronTimeRandomEnd()); String Expiration = getRandom(next.getCronTimeRandomStart(), next.getCronTimeRandomEnd());
System.out.println("生成随机调用时间,github ID" + next.getGithubId() + ",时间:" + Expiration); // System.out.println("生成随机调用时间,github ID" + next.getGithubId() + ",时间:" + Expiration);
send(next.getGithubId(), Expiration); send(next.getGithubId(), Expiration);
} }
} }
@ -88,7 +88,9 @@ public class TaskImpl implements ITask {
rabbitTemplate.convertAndSend("delay", "delay", msg, message -> { rabbitTemplate.convertAndSend("delay", "delay", msg, message -> {
MessageProperties messageProperties = message.getMessageProperties(); MessageProperties messageProperties = message.getMessageProperties();
// 设置这条消息的过期时间 // 设置这条消息的过期时间
messageProperties.setExpiration(Expiration); // messageProperties.setExpiration(Expiration);
messageProperties.setHeader("x-delay",Expiration);
return message; return message;
}, correlationData); }, correlationData);
} }