diff --git a/src/main/java/io/qyi/e5/config/rabbitMQ/RabbitMQConfig.java b/src/main/java/io/qyi/e5/config/rabbitMQ/RabbitMQConfig.java new file mode 100644 index 0000000..4b0e651 --- /dev/null +++ b/src/main/java/io/qyi/e5/config/rabbitMQ/RabbitMQConfig.java @@ -0,0 +1,92 @@ +package io.qyi.e5.config.rabbitMQ; + +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.Map; + +/** + * @program: msgpush + * @description: + * @author: 落叶随风 + * @create: 2020-01-12 22:00 + **/ +@Configuration +public class RabbitMQConfig { + @Value("") + private String DirectQueueName; + /** + * 处理死信队列的消费队列 + * */ + @Bean + public Queue fanoutQueue1() { + Map arguments = new HashMap<>(2); + arguments.put("x-dead-letter-exchange", "delay"); + arguments.put("x-dead-letter-routing-key", "delay_key"); + return new Queue("delay_queue1", true, false, false, arguments); + } + /** + * + *创建消息处理队列 + */ + @Bean + public Queue fanoutQueue2() { + return new Queue("delay_queue2", true); // 队列持久 + } + + /** + * 配置消息交换机 + * 针对消费者配置 + * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 + * HeadersExchange :通过添加属性key-value匹配 + * DirectExchange:按照routingkey分发到指定队列 + * TopicExchange:多关键字匹配 + * @return + */ + @Bean + public DirectExchange fanoutExchangeDelay() { + return new DirectExchange("delay",true, false); + } + + /*@Bean + public FanoutExchange fanoutExchangeTencentMsg() { + return new FanoutExchange(EXCHANGE); + }*/ + + //绑定 将队列和交换机绑定, + @Bean + public Binding bindingFanoutQueue1() { + return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchangeDelay()).with("delay"); + } + @Bean + public Binding bindingFanoutQueue2() { + return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchangeDelay()).with("delay_key"); + } + + + + // 无限循环问题 + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(new Jackson2JsonMessageConverter()); + return template; + } + + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); + factory.setConnectionFactory(connectionFactory); + factory.setMessageConverter(new Jackson2JsonMessageConverter()); + return factory; + } + +} diff --git a/src/main/java/io/qyi/e5/controller/TestController.java b/src/main/java/io/qyi/e5/controller/TestController.java new file mode 100644 index 0000000..0654255 --- /dev/null +++ b/src/main/java/io/qyi/e5/controller/TestController.java @@ -0,0 +1,36 @@ +package io.qyi.e5.controller; + +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.UUID; + +/** + * @program: e5 + * @description: + * @author: 落叶随风 + * @create: 2020-03-16 01:01 + **/ +@Controller +@RestController +public class TestController { + @Autowired + RabbitTemplate rabbitTemplate; + @GetMapping("/send") + public void aaa(){ + CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); + + + rabbitTemplate.convertAndSend("delay", "delay", "ttt", message -> { + MessageProperties messageProperties = message.getMessageProperties(); + // 设置这条消息的过期时间 + messageProperties.setExpiration("10000"); + return message; + }, correlationData); + } +} diff --git a/src/main/java/io/qyi/e5/service/rabbitMQ/Listener.java b/src/main/java/io/qyi/e5/service/rabbitMQ/Listener.java new file mode 100644 index 0000000..1bd24f7 --- /dev/null +++ b/src/main/java/io/qyi/e5/service/rabbitMQ/Listener.java @@ -0,0 +1,11 @@ +package io.qyi.e5.service.rabbitMQ; + +/** + * @program: msgpush + * @description: + * @author: 落叶随风 + * @create: 2020-01-13 23:34 + **/ +public interface Listener { + public void listen(String msg); +} diff --git a/src/main/java/io/qyi/e5/service/rabbitMQ/impl/ListenerImpl.java b/src/main/java/io/qyi/e5/service/rabbitMQ/impl/ListenerImpl.java new file mode 100644 index 0000000..8535324 --- /dev/null +++ b/src/main/java/io/qyi/e5/service/rabbitMQ/impl/ListenerImpl.java @@ -0,0 +1,41 @@ +package io.qyi.e5.service.rabbitMQ.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Service; + +import java.io.IOException; + +/** + * @program: msgpush + * @description: + * @author: 落叶随风 + * @create: 2020-01-13 23:35 + **/ +@Service +public class ListenerImpl { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + + @RabbitHandler + @RabbitListener(queues = "delay_queue2", containerFactory = "rabbitListenerContainerFactory") + public void listen(Message message, Channel channel) throws IOException { + try { + logger.info("消费者开始处理消息: {}" ,new String(message.getBody())); +// JSONObject data = JSON.parseObject(new String(message.getBody())); +// String token = data.getString("token"); + + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + logger.info("处理完成!"); + } catch (IOException e) { + channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); + e.printStackTrace(); + } + } +} diff --git a/src/test/java/io/qyi/e5/test/MailJobListener.java b/src/test/java/io/qyi/e5/test/MailJobListener.java deleted file mode 100644 index 83016ae..0000000 --- a/src/test/java/io/qyi/e5/test/MailJobListener.java +++ /dev/null @@ -1,33 +0,0 @@ -package io.qyi.e5.test; - -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.quartz.JobListener; - -/** - * @program: e5 - * @description: - * @author: 落叶随风 - * @create: 2020-03-03 09:39 - **/ -public class MailJobListener implements JobListener { - @Override - public String getName() { - return "listener of mail job"; - } - - @Override - public void jobToBeExecuted(JobExecutionContext context) { - System.out.println("取消执行:\t "+context.getJobDetail().getKey()); - } - - @Override - public void jobExecutionVetoed(JobExecutionContext context) { - System.out.println("准备执行:\t "+context.getJobDetail().getKey()); - } - - @Override - public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) { - System.out.println("执行结束:\t "+context.getJobDetail().getKey()); - } -} diff --git a/src/test/java/io/qyi/e5/test/RamJob.java b/src/test/java/io/qyi/e5/test/RamJob.java deleted file mode 100644 index 4769e69..0000000 --- a/src/test/java/io/qyi/e5/test/RamJob.java +++ /dev/null @@ -1,25 +0,0 @@ -package io.qyi.e5.test; - -import org.quartz.Job; -import org.quartz.JobDataMap; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; - -import java.time.LocalTime; - -/** - * @program: e5 - * @description: - * @author: 落叶随风 - * @create: 2020-03-02 16:37 - **/ -public class RamJob implements Job { - - @Override - public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { - System.out.println("启动定时任务......每十秒执行一次,共执行三次"); - JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); - System.out.println(LocalTime.now().toString()); - System.out.println(jobDataMap.get("level") + "" + jobDataMap.get("job")); - } -} diff --git a/src/test/java/io/qyi/e5/test/quartzDome01.java b/src/test/java/io/qyi/e5/test/quartzDome01.java deleted file mode 100644 index 2d577c8..0000000 --- a/src/test/java/io/qyi/e5/test/quartzDome01.java +++ /dev/null @@ -1,65 +0,0 @@ -package io.qyi.e5.test; - -import org.junit.jupiter.api.Test; -import org.quartz.*; -import org.quartz.impl.StdSchedulerFactory; -import org.quartz.impl.matchers.KeyMatcher; - -/** - * @program: e5 - * @description: - * @author: 落叶随风 - * @create: 2020-03-02 16:37 - **/ -public class quartzDome01 { - - @Test - public void d0() throws Exception { - try { - demo01(); - } catch (SchedulerException e) { - System.err.println("发现任务已经在数据库存在了,直接从数据库里运行:"+ e.getMessage()); - // TODO Auto-generated catch block - resumeJobFromDatabase(); - } - - } - - private void resumeJobFromDatabase() throws Exception { - Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); - scheduler.start(); - // 等待200秒,让前面的任务都执行完了之后,再关闭调度器 - Thread.sleep(200000); - scheduler.shutdown(true); - } - - public void demo01() throws SchedulerException { - Scheduler scheduler = new StdSchedulerFactory().getScheduler(); - JobDetail jobDetail = JobBuilder.newJob(RamJob.class) - .withDescription("this is a job") - .withIdentity("job1", "group1") - .usingJobData("level", "老") - .build(); - JobDataMap jobDataMap = jobDetail.getJobDataMap(); - jobDataMap.put("job","司机"); - - CronTrigger trigger = TriggerBuilder.newTrigger() - .startNow() -// .withDescription("this is a trigger1") -// .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(3)) - .withIdentity("mailjob1", "mailgroup") //定义任务名称和分组 - .withSchedule(CronScheduleBuilder.cronSchedule("0/5 * * * * ?")) - .build(); - //增加Job监听 - MailJobListener mailJobListener = new MailJobListener(); - KeyMatcher uKeyMatcher = KeyMatcher.keyEquals(jobDetail.getKey()); - scheduler.getListenerManager().addJobListener(mailJobListener,uKeyMatcher); - - - //将触发器以及调度任务详情绑定到调度器上 - scheduler.scheduleJob(jobDetail,trigger); - //启动调度器 - scheduler.start(); - - } -}