diff --git a/src/main/java/io/qyi/e5/config/rabbitMQ/RabbitMQConfig.java b/src/main/java/io/qyi/e5/config/rabbitMQ/RabbitMQConfig.java index a2ce2d2..d52bc61 100644 --- a/src/main/java/io/qyi/e5/config/rabbitMQ/RabbitMQConfig.java +++ b/src/main/java/io/qyi/e5/config/rabbitMQ/RabbitMQConfig.java @@ -27,18 +27,7 @@ public class RabbitMQConfig { * */ @Bean public Queue fanoutQueue1() { - Map arguments = new HashMap<>(2); - arguments.put("x-dead-letter-exchange", "delay"); - arguments.put("x-dead-letter-routing-key", "delay_key"); - return new Queue("delay_queue1", true, false, false, arguments); - } - /** - * - *创建消息处理队列 - */ - @Bean - public Queue fanoutQueue2() { - return new Queue("delay_queue2", true); // 队列持久 + return new Queue("delay_queue1", true, false, false); } /** @@ -51,8 +40,10 @@ public class RabbitMQConfig { * @return */ @Bean - public DirectExchange fanoutExchangeDelay() { - return new DirectExchange("delay",true, false); + public CustomExchange customExchangeDelay() { + Map arg = new HashMap<>(); + arg.put("x-delayed-type", "direct"); + return new CustomExchange("delay","x-delayed-message",true, false,arg); } /*@Bean @@ -63,13 +54,8 @@ public class RabbitMQConfig { //绑定 将队列和交换机绑定, @Bean public Binding bindingFanoutQueue1() { - return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchangeDelay()).with("delay"); + return BindingBuilder.bind(fanoutQueue1()).to(customExchangeDelay()).with("delay").noargs(); } - @Bean - public Binding bindingFanoutQueue2() { - return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchangeDelay()).with("delay_key"); - } - // 无限循环问题 diff --git a/src/main/java/io/qyi/e5/service/rabbitMQ/impl/ListenerImpl.java b/src/main/java/io/qyi/e5/service/rabbitMQ/impl/ListenerImpl.java index 2fc606e..c2617b6 100644 --- a/src/main/java/io/qyi/e5/service/rabbitMQ/impl/ListenerImpl.java +++ b/src/main/java/io/qyi/e5/service/rabbitMQ/impl/ListenerImpl.java @@ -35,7 +35,7 @@ public class ListenerImpl { ITask Task; @RabbitHandler - @RabbitListener(queues = "delay_queue2", containerFactory = "rabbitListenerContainerFactory") + @RabbitListener(queues = "delay_queue1", containerFactory = "rabbitListenerContainerFactory") public void listen(Message message, Channel channel) throws IOException { logger.info("消费者1开始处理消息: {},时间戳:{}" ,message,System.currentTimeMillis()); System.out.println("消费者1开始处理消息:"+System.currentTimeMillis()); diff --git a/src/main/java/io/qyi/e5/service/task/impl/TaskImpl.java b/src/main/java/io/qyi/e5/service/task/impl/TaskImpl.java index aece88d..29d0a44 100644 --- a/src/main/java/io/qyi/e5/service/task/impl/TaskImpl.java +++ b/src/main/java/io/qyi/e5/service/task/impl/TaskImpl.java @@ -56,7 +56,7 @@ public class TaskImpl implements ITask { Outlook next = iterator.next(); /*根据用户设置生成随机数*/ String Expiration = getRandom(next.getCronTimeRandomStart(), next.getCronTimeRandomEnd()); - System.out.println("生成随机调用时间,github ID" + next.getGithubId() + ",时间:" + Expiration); +// System.out.println("生成随机调用时间,github ID" + next.getGithubId() + ",时间:" + Expiration); send(next.getGithubId(), Expiration); } } @@ -88,7 +88,9 @@ public class TaskImpl implements ITask { rabbitTemplate.convertAndSend("delay", "delay", msg, message -> { MessageProperties messageProperties = message.getMessageProperties(); // 设置这条消息的过期时间 - messageProperties.setExpiration(Expiration); +// messageProperties.setExpiration(Expiration); + + messageProperties.setHeader("x-delay",Expiration); return message; }, correlationData); }