移除webflux依赖

更改调用模式为线程池,取消rabbitMQ队列使用
This commit is contained in:
Luoye_W 2021-07-28 15:39:54 +08:00
parent 3b21a1cb34
commit 3cc4b0992f
4 changed files with 73 additions and 51 deletions

View File

@ -152,10 +152,10 @@
<scope>compile</scope>
</dependency>
<!--webflux依赖-->
<dependency>
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependency>-->
<!--这个包同时包含了一些其它的包spring-context、spring-tx、spring-web、spring-messaging、spring-retry、spring-amqp、amqp-client如果想单纯一点可以单独引入。-->
<!-- <dependency>

View File

@ -1,11 +1,14 @@
package io.qyi.e5.config;
import io.qyi.e5.outlook.bean.OutlookMq;
import io.qyi.e5.outlook.entity.Outlook;
import io.qyi.e5.outlook.service.IOutlookService;
import io.qyi.e5.service.task.ITask;
import io.qyi.e5.util.redis.RedisUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -13,6 +16,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.*;
/**
* @program: e5
@ -32,22 +36,81 @@ public class Start {
@Autowired
IOutlookService outlookService;
@Value("${e5.system.threadPool}")
private int poolSize = 10;
private ExecutorService threadPool = new ThreadPoolExecutor(
//指定了线程池中的线程数量它的数量决定了添加的任务是开辟新的线程去执行还是放到workQueue任务队列中去
poolSize,
//指定了线程池中的最大线程数量这个参数会根据你使用的workQueue任务队列的类型决定线程池会开辟的最大线程数量
poolSize,
//当线程池中空闲线程数量超过corePoolSize时多余的线程会在多长时间内被销毁
0,
//unit:keepAliveTime的单位
TimeUnit.MILLISECONDS,
//任务队列被添加到线程池中但尚未被执行的任务它一般分为直接提交队列有界任务队列无界任务队列优先任务队列几种
new LinkedBlockingQueue<>(poolSize), // 有界队列
//线程工厂用于创建线程一般用默认即可 new CustThreadFactory(),
Executors.defaultThreadFactory(),
//拒绝策略当任务太多来不及处理时如何拒绝任务
new CustRejectedExecutionHandler()
);
@PostConstruct
public void initRedis() {
public void init() {
log.info("清空redis...... ");
redisUtil.delAll();
/* log.info("重新添加队列...... ");
Task.sendTaskOutlookMQALL();*/
}
@Scheduled(cron = "0/10 * * * * ?")
private void distributeTask() {
List<Outlook> runOutlookList = outlookService.findRunOutlookList();
CountDownLatch cdl = new CountDownLatch(runOutlookList.size());
log.info("查询到待调用的数量: {}",runOutlookList.size());
runOutlookList.forEach(outlook -> {
Task.submit(outlook);
threadPool.execute(new task(outlook,cdl));
});
// Task.submit(runOutlookList.get(0));
/*等待线程池内的线程执行完毕*/
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/*任务执行*/
class task implements Runnable {
Outlook value ;
CountDownLatch cdl;
public task(Outlook outlook,CountDownLatch cdl) {
value = outlook;
this.cdl = cdl;
}
@SneakyThrows
@Override
public void run() {
System.out.println("消费数据: " + value);
Task.listen(new OutlookMq(value.getGithubId(), value.getId()));
this.cdl.countDown();
}
}
/*拒绝策略*/
class CustRejectedExecutionHandler implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

View File

@ -1,5 +1,6 @@
package io.qyi.e5.service.task;
import io.qyi.e5.outlook.bean.OutlookMq;
import io.qyi.e5.outlook.entity.Outlook;
/**
@ -15,5 +16,5 @@ public interface ITask {
boolean executeE5(int github_id,int outlookId);
void submit(Outlook mq);
void listen(OutlookMq mq);
}

View File

@ -28,7 +28,7 @@ import java.util.concurrent.SubmissionPublisher;
**/
@Service
@Slf4j
public class TaskImpl implements ITask, Flow.Subscriber<Outlook> {
public class TaskImpl implements ITask {
@Autowired
IOutlookService outlookService;
@ -42,47 +42,7 @@ public class TaskImpl implements ITask, Flow.Subscriber<Outlook> {
@Value("${outlook.error.countMax}")
int errorCountMax;
private Flow.Subscription subscription;
private SubmissionPublisher publisher = new SubmissionPublisher<OutlookMq>();
{
publisher.subscribe(this);
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("建立订阅关系");
this.subscription = subscription;
this.subscription.request(10);
}
@Override
public void onNext(Outlook item) {
System.out.println("接收到一个数据" + item);
System.out.println("该任务处理完成,再次请求");
listen(new OutlookMq(item.getGithubId(), item.getId()));
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
log.error("Flow.Subscription.error: \n{}",throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("处理完成");
}
@PreDestroy
public void close(){
log.info("关闭publisher......");
publisher.close();
}
/**
* 更新下次调用时间
@ -194,11 +154,8 @@ public class TaskImpl implements ITask, Flow.Subscriber<Outlook> {
return isExecuteE5;
}
@Override
public void submit(Outlook mq){
publisher.submit(mq);
}
@Override
public void listen(OutlookMq mq) {
boolean b = executeE5(mq.getGithubId(),mq.getOutlookId());
/*再次进行添加任务*/
@ -229,4 +186,5 @@ public class TaskImpl implements ITask, Flow.Subscriber<Outlook> {
return Expiration;
}
}