配置基础rabbitMQ

This commit is contained in:
APLS 2020-03-16 01:22:59 +08:00
parent 760e26d192
commit d85709c4a2
7 changed files with 180 additions and 123 deletions

View File

@ -0,0 +1,92 @@
package io.qyi.e5.config.rabbitMQ;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @program: msgpush
* @description:
* @author: 落叶随风
* @create: 2020-01-12 22:00
**/
@Configuration
public class RabbitMQConfig {
@Value("")
private String DirectQueueName;
/**
* 处理死信队列的消费队列
* */
@Bean
public Queue fanoutQueue1() {
Map<String, Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange", "delay");
arguments.put("x-dead-letter-routing-key", "delay_key");
return new Queue("delay_queue1", true, false, false, arguments);
}
/**
*
*创建消息处理队列
*/
@Bean
public Queue fanoutQueue2() {
return new Queue("delay_queue2", true); // 队列持久
}
/**
* 配置消息交换机
* 针对消费者配置
* FanoutExchange: 将消息分发到所有的绑定队列无routingkey的概念
* HeadersExchange 通过添加属性key-value匹配
* DirectExchange:按照routingkey分发到指定队列
* TopicExchange:多关键字匹配
* @return
*/
@Bean
public DirectExchange fanoutExchangeDelay() {
return new DirectExchange("delay",true, false);
}
/*@Bean
public FanoutExchange fanoutExchangeTencentMsg() {
return new FanoutExchange(EXCHANGE);
}*/
//绑定 将队列和交换机绑定,
@Bean
public Binding bindingFanoutQueue1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchangeDelay()).with("delay");
}
@Bean
public Binding bindingFanoutQueue2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchangeDelay()).with("delay_key");
}
// 无限循环问题
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}

View File

@ -0,0 +1,36 @@
package io.qyi.e5.controller;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* @program: e5
* @description:
* @author: 落叶随风
* @create: 2020-03-16 01:01
**/
@Controller
@RestController
public class TestController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void aaa(){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay", "delay", "ttt", message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置这条消息的过期时间
messageProperties.setExpiration("10000");
return message;
}, correlationData);
}
}

View File

@ -0,0 +1,11 @@
package io.qyi.e5.service.rabbitMQ;
/**
* @program: msgpush
* @description:
* @author: 落叶随风
* @create: 2020-01-13 23:34
**/
public interface Listener {
public void listen(String msg);
}

View File

@ -0,0 +1,41 @@
package io.qyi.e5.service.rabbitMQ.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @program: msgpush
* @description:
* @author: 落叶随风
* @create: 2020-01-13 23:35
**/
@Service
public class ListenerImpl {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
@RabbitListener(queues = "delay_queue2", containerFactory = "rabbitListenerContainerFactory")
public void listen(Message message, Channel channel) throws IOException {
try {
logger.info("消费者开始处理消息: {}" ,new String(message.getBody()));
// JSONObject data = JSON.parseObject(new String(message.getBody()));
// String token = data.getString("token");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
logger.info("处理完成!");
} catch (IOException e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
e.printStackTrace();
}
}
}

View File

@ -1,33 +0,0 @@
package io.qyi.e5.test;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
/**
* @program: e5
* @description:
* @author: 落叶随风
* @create: 2020-03-03 09:39
**/
public class MailJobListener implements JobListener {
@Override
public String getName() {
return "listener of mail job";
}
@Override
public void jobToBeExecuted(JobExecutionContext context) {
System.out.println("取消执行:\t "+context.getJobDetail().getKey());
}
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
System.out.println("准备执行:\t "+context.getJobDetail().getKey());
}
@Override
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
System.out.println("执行结束:\t "+context.getJobDetail().getKey());
}
}

View File

@ -1,25 +0,0 @@
package io.qyi.e5.test;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import java.time.LocalTime;
/**
* @program: e5
* @description:
* @author: 落叶随风
* @create: 2020-03-02 16:37
**/
public class RamJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("启动定时任务......每十秒执行一次,共执行三次");
JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
System.out.println(LocalTime.now().toString());
System.out.println(jobDataMap.get("level") + "" + jobDataMap.get("job"));
}
}

View File

@ -1,65 +0,0 @@
package io.qyi.e5.test;
import org.junit.jupiter.api.Test;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.KeyMatcher;
/**
* @program: e5
* @description:
* @author: 落叶随风
* @create: 2020-03-02 16:37
**/
public class quartzDome01 {
@Test
public void d0() throws Exception {
try {
demo01();
} catch (SchedulerException e) {
System.err.println("发现任务已经在数据库存在了,直接从数据库里运行:"+ e.getMessage());
// TODO Auto-generated catch block
resumeJobFromDatabase();
}
}
private void resumeJobFromDatabase() throws Exception {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.start();
// 等待200秒让前面的任务都执行完了之后再关闭调度器
Thread.sleep(200000);
scheduler.shutdown(true);
}
public void demo01() throws SchedulerException {
Scheduler scheduler = new StdSchedulerFactory().getScheduler();
JobDetail jobDetail = JobBuilder.newJob(RamJob.class)
.withDescription("this is a job")
.withIdentity("job1", "group1")
.usingJobData("level", "")
.build();
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.put("job","司机");
CronTrigger trigger = TriggerBuilder.newTrigger()
.startNow()
// .withDescription("this is a trigger1")
// .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(3))
.withIdentity("mailjob1", "mailgroup") //定义任务名称和分组
.withSchedule(CronScheduleBuilder.cronSchedule("0/5 * * * * ?"))
.build();
//增加Job监听
MailJobListener mailJobListener = new MailJobListener();
KeyMatcher<JobKey> uKeyMatcher = KeyMatcher.keyEquals(jobDetail.getKey());
scheduler.getListenerManager().addJobListener(mailJobListener,uKeyMatcher);
//将触发器以及调度任务详情绑定到调度器上
scheduler.scheduleJob(jobDetail,trigger);
//启动调度器
scheduler.start();
}
}