mirror of
https://github.com/luoye663/e5.git
synced 2024-12-25 11:18:50 +00:00
调整回调地址
This commit is contained in:
parent
fd52ede2e8
commit
577607dcb6
2
pom.xml
2
pom.xml
@ -10,7 +10,7 @@
|
||||
</parent>
|
||||
<groupId>io.qyi</groupId>
|
||||
<artifactId>e5</artifactId>
|
||||
<version>1.0.8</version>
|
||||
<version>1.1.0</version>
|
||||
<name>e5</name>
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
|
@ -3,6 +3,7 @@ package io.qyi.e5.config;
|
||||
import io.qyi.e5.outlook.bean.OutlookMq;
|
||||
import io.qyi.e5.outlook.entity.Outlook;
|
||||
import io.qyi.e5.outlook.service.IOutlookService;
|
||||
import io.qyi.e5.service.ExecutorPoolService;
|
||||
import io.qyi.e5.service.task.ITask;
|
||||
import io.qyi.e5.util.redis.RedisUtil;
|
||||
import lombok.SneakyThrows;
|
||||
@ -14,6 +15,7 @@ import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
@ -35,44 +37,12 @@ public class Start {
|
||||
@Autowired
|
||||
IOutlookService outlookService;
|
||||
|
||||
@Value("${e5.system.threadPool}")
|
||||
Integer poolSize;
|
||||
|
||||
@Value("${e5.system.maximumPoolSize}")
|
||||
int maximumPoolSize;
|
||||
|
||||
@Value("${e5.system.blockingQueueSize}")
|
||||
int blockingQueueSize;
|
||||
|
||||
@Value("${isdebug:true}")
|
||||
private boolean isdebug;
|
||||
|
||||
@Resource
|
||||
ExecutorPoolService poolService;
|
||||
|
||||
private ExecutorService threadPool;
|
||||
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
// log.info("清空redis...... ");
|
||||
// redisUtil.delAll();
|
||||
threadPool = new ThreadPoolExecutor(
|
||||
//要保留在池中的线程数,即使它们处于空闲状态,除非设置了 allowCoreThreadTimeOut
|
||||
poolSize,
|
||||
//池中允许的最大线程数
|
||||
maximumPoolSize,
|
||||
//当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最长时间。
|
||||
0,
|
||||
//unit:keepAliveTime的单位
|
||||
TimeUnit.MILLISECONDS,
|
||||
//任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
|
||||
new LinkedBlockingQueue<>(blockingQueueSize), // 有界队列
|
||||
//线程工厂,用于创建线程,一般用默认即可; new CustThreadFactory(),
|
||||
Executors.defaultThreadFactory(),
|
||||
//拒绝策略;当任务太多来不及处理时,如何拒绝任务;
|
||||
new CustRejectedExecutionHandler()
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0 0/1 * * * ? ")
|
||||
private void distributeTask() {
|
||||
@ -80,52 +50,51 @@ public class Start {
|
||||
log.debug("Debug模式,跳过执行");
|
||||
return;
|
||||
}
|
||||
ExecutorService threadPool = poolService.getThreadPool();
|
||||
|
||||
List<Outlook> runOutlookList = outlookService.findRunOutlookList();
|
||||
CountDownLatch cdl = new CountDownLatch(runOutlookList.size());
|
||||
|
||||
log.info("查询到待调用的数量: {}",runOutlookList.size());
|
||||
log.info("查询到待调用的数量: {}", runOutlookList.size());
|
||||
|
||||
|
||||
|
||||
runOutlookList.forEach(outlook -> {
|
||||
threadPool.execute(new task(outlook,cdl));
|
||||
// threadPool.execute(new task(outlook,cdl));
|
||||
threadPool.submit(new task(outlook, cdl));
|
||||
});
|
||||
|
||||
/*等待线程池内的线程执行完毕*/
|
||||
try {
|
||||
cdl.await();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
// e.printStackTrace();
|
||||
log.error("等待线程池任务出错,消息: {}",e.getMessage());
|
||||
}
|
||||
log.debug("本轮调用完成!");
|
||||
}
|
||||
|
||||
/*任务执行*/
|
||||
class task implements Runnable {
|
||||
Outlook value ;
|
||||
Outlook value;
|
||||
CountDownLatch cdl;
|
||||
public task(Outlook outlook,CountDownLatch cdl) {
|
||||
|
||||
public task(Outlook outlook, CountDownLatch cdl) {
|
||||
value = outlook;
|
||||
this.cdl = cdl;
|
||||
}
|
||||
@SneakyThrows
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// System.out.println("消费数据: " + value);
|
||||
Task.listen(new OutlookMq(value.getGithubId(), value.getId()));
|
||||
this.cdl.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*拒绝策略*/
|
||||
class CustRejectedExecutionHandler implements RejectedExecutionHandler {
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
try {
|
||||
executor.getQueue().put(r);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
Task.listen(new OutlookMq(value.getGithubId(), value.getId()));
|
||||
} finally {
|
||||
this.cdl.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
@ -1,8 +1,11 @@
|
||||
package io.qyi.e5.controller.admin;
|
||||
|
||||
import io.qyi.e5.bean.result.Result;
|
||||
import io.qyi.e5.outlook.service.IOutlookService;
|
||||
import io.qyi.e5.service.ExecutorPoolService;
|
||||
import io.qyi.e5.service.task.ITask;
|
||||
import io.qyi.e5.util.EncryptUtil;
|
||||
import io.qyi.e5.util.ResultUtil;
|
||||
import io.qyi.e5.util.redis.RedisUtil;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
@ -12,10 +15,13 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* @program: 此类里大多都是测试
|
||||
@ -48,6 +54,9 @@ public class AdminController {
|
||||
@Value("${user.token.expire}")
|
||||
private int tokenExpire;
|
||||
|
||||
@Resource
|
||||
ExecutorPoolService poolService;
|
||||
|
||||
/**
|
||||
* 测试队列
|
||||
*
|
||||
@ -127,5 +136,36 @@ public class AdminController {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 查询线程池状态
|
||||
* @return
|
||||
*/
|
||||
@GetMapping("/findPoolStatus")
|
||||
public Result findPoolStatus() {
|
||||
ExecutorService threadPool = poolService.getThreadPool();
|
||||
|
||||
int activeCount = ((ThreadPoolExecutor) threadPool).getActiveCount();
|
||||
|
||||
long completeTaskCount = ((ThreadPoolExecutor) threadPool).getCompletedTaskCount();
|
||||
|
||||
int poolSize = ((ThreadPoolExecutor) threadPool).getPoolSize();
|
||||
|
||||
long taskCount = ((ThreadPoolExecutor) threadPool).getTaskCount();
|
||||
|
||||
long largestPoolSize = ((ThreadPoolExecutor) threadPool).getLargestPoolSize();
|
||||
|
||||
long maximumPoolSize = ((ThreadPoolExecutor) threadPool).getMaximumPoolSize();
|
||||
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("活动线程数", activeCount);
|
||||
map.put("执行完成的任务数", completeTaskCount);
|
||||
map.put("核心线程数", poolSize);
|
||||
map.put("线程池中的任务总量", taskCount);
|
||||
map.put("过去执行过的最多的任务数", largestPoolSize);
|
||||
map.put("线程池最大线程数", maximumPoolSize);
|
||||
|
||||
return ResultUtil.success(map);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import io.qyi.e5.config.exception.APIException;
|
||||
import io.qyi.e5.github.entity.Github;
|
||||
import io.qyi.e5.github.entity.UserInfo;
|
||||
import io.qyi.e5.github.mapper.GithubMapper;
|
||||
@ -40,11 +41,12 @@ public class GithubServiceImpl extends ServiceImpl<GithubMapper, Github> impleme
|
||||
par.put("code", code);
|
||||
Map<String, Object> head = new HashMap<>();
|
||||
head.put("Content-Type", "application/x-www-form-urlencoded");
|
||||
String s = null;
|
||||
String s;
|
||||
try {
|
||||
s = OkHttpClientUtil.doPost("https://github.com/login/oauth/access_token", head, par);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
// e.printStackTrace();
|
||||
throw new APIException(e.getMessage());
|
||||
}
|
||||
Map<String, String> map = StringUtil.ParsingUrl(s);
|
||||
return map.get("access_token");
|
||||
@ -87,9 +89,9 @@ public class GithubServiceImpl extends ServiceImpl<GithubMapper, Github> impleme
|
||||
}
|
||||
return userInfo;
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
// e.printStackTrace();
|
||||
throw new APIException("请求GITHUB用户信息失败!");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -84,7 +84,7 @@ public class OutlookController {
|
||||
cron_time_random_start = Integer.valueOf(split[0]);
|
||||
cron_time_random_end = Integer.valueOf(split[1]);
|
||||
} catch (NumberFormatException e) {
|
||||
e.printStackTrace();
|
||||
// e.printStackTrace();
|
||||
return ResultUtil.error(ResultEnum.INVALID_format);
|
||||
}
|
||||
if (cron_time_random_start > cron_time_random_end) {
|
||||
@ -167,6 +167,12 @@ public class OutlookController {
|
||||
outlookService.setStart(authentication.getGithub_id(),id);
|
||||
return ResultUtil.success();
|
||||
}
|
||||
|
||||
/**
|
||||
* 临时屏蔽 delete请求
|
||||
* @param id
|
||||
* @return
|
||||
*/
|
||||
@GetMapping("/delete")
|
||||
public Result delete(@RequestParam int id) {
|
||||
UsernamePasswordAuthenticationToken authentication = (UsernamePasswordAuthenticationToken) SecurityContextHolder.getContext().getAuthentication();
|
||||
|
@ -63,7 +63,7 @@ public class OutlookServiceImpl extends ServiceImpl<OutlookMapper, Outlook> impl
|
||||
JSONObject jsonObject = JSON.parseObject(s);
|
||||
logger.debug("请求access_token返回数据:" + s);
|
||||
if (jsonObject.get("error") != null) {
|
||||
logger.error("错授权误!");
|
||||
logger.debug("错授权误!");
|
||||
throw new APIException(jsonObject.get("error_description").toString());
|
||||
} else {
|
||||
int expires_in = jsonObject.getIntValue("expires_in");
|
||||
@ -318,8 +318,7 @@ public class OutlookServiceImpl extends ServiceImpl<OutlookMapper, Outlook> impl
|
||||
par.put("client_secret", outlook.getClientSecret());
|
||||
par.put("grant_type", "refresh_token");
|
||||
par.put("refresh_token", outlook.getRefreshToken());
|
||||
String s = null;
|
||||
s = OkHttpClientUtil.doPost("https://login.microsoftonline.com/" + outlook.getTenantId() + "/oauth2/v2.0/token", head, par);
|
||||
String s = OkHttpClientUtil.doPost("https://login.microsoftonline.com/" + outlook.getTenantId() + "/oauth2/v2.0/token", head, par);
|
||||
logger.debug("请求刷新列表返回数据:" + s);
|
||||
JSONObject jsonObject = JSON.parseObject(s);
|
||||
if (!jsonObject.containsKey("access_token")) {
|
||||
@ -331,10 +330,14 @@ public class OutlookServiceImpl extends ServiceImpl<OutlookMapper, Outlook> impl
|
||||
outlook.setIdToken(jsonObject.getString("id_token"));
|
||||
QueryWrapper<Outlook> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("client_id", outlook.getClientId());
|
||||
if (baseMapper.update(outlook, queryWrapper) != 1) {
|
||||
int update = baseMapper.update(outlook, queryWrapper);
|
||||
if (update > 1) {
|
||||
logger.debug("返更新行数不为1");
|
||||
throw new Exception("更新数据库时发现有重复的key");
|
||||
} else if (update < 1) {
|
||||
throw new Exception("调用成功,但更新状态失败。");
|
||||
}
|
||||
|
||||
return outlook.getAccessToken();
|
||||
// 更新数据库
|
||||
}
|
||||
@ -442,10 +445,12 @@ public class OutlookServiceImpl extends ServiceImpl<OutlookMapper, Outlook> impl
|
||||
QueryWrapper<Outlook> wp = new QueryWrapper<>();
|
||||
wp.eq("github_id", github_id);
|
||||
wp.eq("id", outlookId);
|
||||
if (baseMapper.delete(wp) != 1) {
|
||||
log.error("删除数据失败! github_id:{github_id} - outlookId:{outlookId}");
|
||||
int delete = baseMapper.delete(wp);
|
||||
if (delete != 1) {
|
||||
throw new APIException("删除失败!");
|
||||
}
|
||||
|
||||
log.error("删除数据失败! github_id:{} - outlookId:{} - 删除结果: {}", github_id, outlookId, delete);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
64
src/main/java/io/qyi/e5/service/ExecutorPoolService.java
Normal file
64
src/main/java/io/qyi/e5/service/ExecutorPoolService.java
Normal file
@ -0,0 +1,64 @@
|
||||
package io.qyi.e5.service;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ExecutorPoolService {
|
||||
|
||||
@Value("${e5.system.threadPool}")
|
||||
Integer poolSize;
|
||||
|
||||
@Value("${e5.system.maximumPoolSize}")
|
||||
int maximumPoolSize;
|
||||
@Value("${e5.system.blockingQueueSize}")
|
||||
int blockingQueueSize;
|
||||
|
||||
|
||||
private ExecutorService threadPool;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
threadPool = new ThreadPoolExecutor(
|
||||
//要保留在池中的线程数,即使它们处于空闲状态,除非设置了 allowCoreThreadTimeOut
|
||||
poolSize,
|
||||
//池中允许的最大线程数
|
||||
maximumPoolSize,
|
||||
//当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最长时间。
|
||||
10,
|
||||
//unit:keepAliveTime的单位
|
||||
TimeUnit.SECONDS,
|
||||
//任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
|
||||
new LinkedBlockingQueue<>(blockingQueueSize), // 有界队列
|
||||
//线程工厂,用于创建线程,一般用默认即可; new CustThreadFactory(),
|
||||
Executors.defaultThreadFactory(),
|
||||
//拒绝策略;当任务太多来不及处理时,如何拒绝任务;
|
||||
new CustRejectedExecutionHandler()
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
public ExecutorService getThreadPool() {
|
||||
return threadPool;
|
||||
}
|
||||
|
||||
/*拒绝策略*/
|
||||
class CustRejectedExecutionHandler implements RejectedExecutionHandler {
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
try {
|
||||
executor.getQueue().put(r);
|
||||
} catch (InterruptedException e) {
|
||||
// e.printStackTrace();
|
||||
log.error("队列拒绝策略错误,问题: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -125,7 +125,6 @@ public class TaskImpl implements ITask {
|
||||
}
|
||||
isExecuteE5 = true;
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
/*连续错误判断*/
|
||||
if (!redisUtil.hasKey(errorKey)) {
|
||||
redisUtil.set(errorKey, 1);
|
||||
|
Loading…
Reference in New Issue
Block a user