mirror of
https://github.com/luoye663/e5.git
synced 2024-12-26 03:38:53 +00:00
移除webflux依赖
更改调用模式为线程池,取消rabbitMQ队列使用
This commit is contained in:
parent
e21f4d52e9
commit
8a59009db3
4
pom.xml
4
pom.xml
@ -152,10 +152,10 @@
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<!--webflux依赖-->
|
||||
<dependency>
|
||||
<!--<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
</dependency>
|
||||
</dependency>-->
|
||||
|
||||
<!--这个包同时包含了一些其它的包:spring-context、spring-tx、spring-web、spring-messaging、spring-retry、spring-amqp、amqp-client,如果想单纯一点,可以单独引入。-->
|
||||
<!-- <dependency>
|
||||
|
@ -1,11 +1,14 @@
|
||||
package io.qyi.e5.config;
|
||||
|
||||
import io.qyi.e5.outlook.bean.OutlookMq;
|
||||
import io.qyi.e5.outlook.entity.Outlook;
|
||||
import io.qyi.e5.outlook.service.IOutlookService;
|
||||
import io.qyi.e5.service.task.ITask;
|
||||
import io.qyi.e5.util.redis.RedisUtil;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -13,6 +16,7 @@ import org.springframework.stereotype.Component;
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* @program: e5
|
||||
@ -32,22 +36,81 @@ public class Start {
|
||||
@Autowired
|
||||
IOutlookService outlookService;
|
||||
|
||||
@Value("${e5.system.threadPool}")
|
||||
private int poolSize = 10;
|
||||
|
||||
private ExecutorService threadPool = new ThreadPoolExecutor(
|
||||
//指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
|
||||
poolSize,
|
||||
//指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
|
||||
poolSize,
|
||||
//当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;
|
||||
0,
|
||||
//unit:keepAliveTime的单位
|
||||
TimeUnit.MILLISECONDS,
|
||||
//任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
|
||||
new LinkedBlockingQueue<>(poolSize), // 有界队列
|
||||
//线程工厂,用于创建线程,一般用默认即可; new CustThreadFactory(),
|
||||
Executors.defaultThreadFactory(),
|
||||
//拒绝策略;当任务太多来不及处理时,如何拒绝任务;
|
||||
new CustRejectedExecutionHandler()
|
||||
);
|
||||
|
||||
@PostConstruct
|
||||
public void initRedis() {
|
||||
public void init() {
|
||||
log.info("清空redis...... ");
|
||||
redisUtil.delAll();
|
||||
/* log.info("重新添加队列...... ");
|
||||
Task.sendTaskOutlookMQALL();*/
|
||||
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0/10 * * * * ?")
|
||||
private void distributeTask() {
|
||||
List<Outlook> runOutlookList = outlookService.findRunOutlookList();
|
||||
CountDownLatch cdl = new CountDownLatch(runOutlookList.size());
|
||||
|
||||
log.info("查询到待调用的数量: {}",runOutlookList.size());
|
||||
runOutlookList.forEach(outlook -> {
|
||||
Task.submit(outlook);
|
||||
threadPool.execute(new task(outlook,cdl));
|
||||
});
|
||||
// Task.submit(runOutlookList.get(0));
|
||||
/*等待线程池内的线程执行完毕*/
|
||||
try {
|
||||
cdl.await();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/*任务执行*/
|
||||
class task implements Runnable {
|
||||
Outlook value ;
|
||||
CountDownLatch cdl;
|
||||
public task(Outlook outlook,CountDownLatch cdl) {
|
||||
value = outlook;
|
||||
this.cdl = cdl;
|
||||
}
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public void run() {
|
||||
System.out.println("消费数据: " + value);
|
||||
Task.listen(new OutlookMq(value.getGithubId(), value.getId()));
|
||||
this.cdl.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*拒绝策略*/
|
||||
class CustRejectedExecutionHandler implements RejectedExecutionHandler {
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
try {
|
||||
executor.getQueue().put(r);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package io.qyi.e5.service.task;
|
||||
|
||||
import io.qyi.e5.outlook.bean.OutlookMq;
|
||||
import io.qyi.e5.outlook.entity.Outlook;
|
||||
|
||||
/**
|
||||
@ -15,5 +16,5 @@ public interface ITask {
|
||||
|
||||
boolean executeE5(int github_id,int outlookId);
|
||||
|
||||
void submit(Outlook mq);
|
||||
void listen(OutlookMq mq);
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ import java.util.concurrent.SubmissionPublisher;
|
||||
**/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class TaskImpl implements ITask, Flow.Subscriber<Outlook> {
|
||||
public class TaskImpl implements ITask {
|
||||
|
||||
@Autowired
|
||||
IOutlookService outlookService;
|
||||
@ -42,47 +42,7 @@ public class TaskImpl implements ITask, Flow.Subscriber<Outlook> {
|
||||
@Value("${outlook.error.countMax}")
|
||||
int errorCountMax;
|
||||
|
||||
private Flow.Subscription subscription;
|
||||
|
||||
private SubmissionPublisher publisher = new SubmissionPublisher<OutlookMq>();
|
||||
|
||||
{
|
||||
publisher.subscribe(this);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
log.info("建立订阅关系");
|
||||
this.subscription = subscription;
|
||||
this.subscription.request(10);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onNext(Outlook item) {
|
||||
System.out.println("接收到一个数据" + item);
|
||||
System.out.println("该任务处理完成,再次请求");
|
||||
listen(new OutlookMq(item.getGithubId(), item.getId()));
|
||||
this.subscription.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
log.error("Flow.Subscription.error: \n{}",throwable.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
System.out.println("处理完成");
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void close(){
|
||||
log.info("关闭publisher......");
|
||||
publisher.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新下次调用时间
|
||||
@ -194,11 +154,8 @@ public class TaskImpl implements ITask, Flow.Subscriber<Outlook> {
|
||||
return isExecuteE5;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void submit(Outlook mq){
|
||||
publisher.submit(mq);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void listen(OutlookMq mq) {
|
||||
boolean b = executeE5(mq.getGithubId(),mq.getOutlookId());
|
||||
/*再次进行添加任务*/
|
||||
@ -229,4 +186,5 @@ public class TaskImpl implements ITask, Flow.Subscriber<Outlook> {
|
||||
return Expiration;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -102,9 +102,13 @@ outlook.errorMsg = CompactToken validation, Access token has expired, Access tok
|
||||
outlook.replyUrl=https://e5.qyi.io/outlook/auth2/receive
|
||||
outlook.replyUrlDebug=http://localhost:4200/outlook/auth2/receive
|
||||
|
||||
|
||||
# 连续错误次数
|
||||
outlook.error.countMax=3
|
||||
# 公告txt
|
||||
announcement=classpath:announcement.txt
|
||||
# debug模式密码
|
||||
user.admin.debug.passwd=123456
|
||||
# 执行调用线程池大小
|
||||
e5.system.threadPool=10
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user