diff --git a/pom.xml b/pom.xml index 1c987f4..b75732b 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ io.qyi e5 - 1.0.8 + 1.1.0 e5 Demo project for Spring Boot diff --git a/src/main/java/io/qyi/e5/config/Start.java b/src/main/java/io/qyi/e5/config/Start.java index e495dc0..02ee354 100644 --- a/src/main/java/io/qyi/e5/config/Start.java +++ b/src/main/java/io/qyi/e5/config/Start.java @@ -3,6 +3,7 @@ package io.qyi.e5.config; import io.qyi.e5.outlook.bean.OutlookMq; import io.qyi.e5.outlook.entity.Outlook; import io.qyi.e5.outlook.service.IOutlookService; +import io.qyi.e5.service.ExecutorPoolService; import io.qyi.e5.service.task.ITask; import io.qyi.e5.util.redis.RedisUtil; import lombok.SneakyThrows; @@ -14,6 +15,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.Resource; import java.util.List; import java.util.concurrent.*; @@ -35,44 +37,12 @@ public class Start { @Autowired IOutlookService outlookService; - @Value("${e5.system.threadPool}") - Integer poolSize; - - @Value("${e5.system.maximumPoolSize}") - int maximumPoolSize; - - @Value("${e5.system.blockingQueueSize}") - int blockingQueueSize; - @Value("${isdebug:true}") private boolean isdebug; + @Resource + ExecutorPoolService poolService; - private ExecutorService threadPool; - - - @PostConstruct - public void init() { - // log.info("清空redis...... "); - // redisUtil.delAll(); - threadPool = new ThreadPoolExecutor( - //要保留在池中的线程数,即使它们处于空闲状态,除非设置了 allowCoreThreadTimeOut - poolSize, - //池中允许的最大线程数 - maximumPoolSize, - //当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最长时间。 - 0, - //unit:keepAliveTime的单位 - TimeUnit.MILLISECONDS, - //任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种; - new LinkedBlockingQueue<>(blockingQueueSize), // 有界队列 - //线程工厂,用于创建线程,一般用默认即可; new CustThreadFactory(), - Executors.defaultThreadFactory(), - //拒绝策略;当任务太多来不及处理时,如何拒绝任务; - new CustRejectedExecutionHandler() - ); - - } @Scheduled(cron = "0 0/1 * * * ? ") private void distributeTask() { @@ -80,52 +50,51 @@ public class Start { log.debug("Debug模式,跳过执行"); return; } + ExecutorService threadPool = poolService.getThreadPool(); + List runOutlookList = outlookService.findRunOutlookList(); CountDownLatch cdl = new CountDownLatch(runOutlookList.size()); - log.info("查询到待调用的数量: {}",runOutlookList.size()); + log.info("查询到待调用的数量: {}", runOutlookList.size()); + + + runOutlookList.forEach(outlook -> { - threadPool.execute(new task(outlook,cdl)); + // threadPool.execute(new task(outlook,cdl)); + threadPool.submit(new task(outlook, cdl)); }); /*等待线程池内的线程执行完毕*/ try { cdl.await(); } catch (InterruptedException e) { - e.printStackTrace(); + // e.printStackTrace(); + log.error("等待线程池任务出错,消息: {}",e.getMessage()); } log.debug("本轮调用完成!"); } /*任务执行*/ class task implements Runnable { - Outlook value ; + Outlook value; CountDownLatch cdl; - public task(Outlook outlook,CountDownLatch cdl) { + + public task(Outlook outlook, CountDownLatch cdl) { value = outlook; this.cdl = cdl; } - @SneakyThrows + @Override public void run() { - // System.out.println("消费数据: " + value); - Task.listen(new OutlookMq(value.getGithubId(), value.getId())); - this.cdl.countDown(); - } - } - - - /*拒绝策略*/ - class CustRejectedExecutionHandler implements RejectedExecutionHandler { - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { - executor.getQueue().put(r); - } catch (InterruptedException e) { - e.printStackTrace(); + Task.listen(new OutlookMq(value.getGithubId(), value.getId())); + } finally { + this.cdl.countDown(); } } - } + + } diff --git a/src/main/java/io/qyi/e5/controller/admin/AdminController.java b/src/main/java/io/qyi/e5/controller/admin/AdminController.java index 55290ee..58a4027 100644 --- a/src/main/java/io/qyi/e5/controller/admin/AdminController.java +++ b/src/main/java/io/qyi/e5/controller/admin/AdminController.java @@ -1,8 +1,11 @@ package io.qyi.e5.controller.admin; +import io.qyi.e5.bean.result.Result; import io.qyi.e5.outlook.service.IOutlookService; +import io.qyi.e5.service.ExecutorPoolService; import io.qyi.e5.service.task.ITask; import io.qyi.e5.util.EncryptUtil; +import io.qyi.e5.util.ResultUtil; import io.qyi.e5.util.redis.RedisUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -12,10 +15,13 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import javax.annotation.Resource; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * @program: 此类里大多都是测试 @@ -48,6 +54,9 @@ public class AdminController { @Value("${user.token.expire}") private int tokenExpire; + @Resource + ExecutorPoolService poolService; + /** * 测试队列 * @@ -127,5 +136,36 @@ public class AdminController { } + /** + * 查询线程池状态 + * @return + */ + @GetMapping("/findPoolStatus") + public Result findPoolStatus() { + ExecutorService threadPool = poolService.getThreadPool(); + + int activeCount = ((ThreadPoolExecutor) threadPool).getActiveCount(); + + long completeTaskCount = ((ThreadPoolExecutor) threadPool).getCompletedTaskCount(); + + int poolSize = ((ThreadPoolExecutor) threadPool).getPoolSize(); + + long taskCount = ((ThreadPoolExecutor) threadPool).getTaskCount(); + + long largestPoolSize = ((ThreadPoolExecutor) threadPool).getLargestPoolSize(); + + long maximumPoolSize = ((ThreadPoolExecutor) threadPool).getMaximumPoolSize(); + + Map map = new HashMap<>(); + map.put("活动线程数", activeCount); + map.put("执行完成的任务数", completeTaskCount); + map.put("核心线程数", poolSize); + map.put("线程池中的任务总量", taskCount); + map.put("过去执行过的最多的任务数", largestPoolSize); + map.put("线程池最大线程数", maximumPoolSize); + + return ResultUtil.success(map); + + } } diff --git a/src/main/java/io/qyi/e5/github/service/impl/GithubServiceImpl.java b/src/main/java/io/qyi/e5/github/service/impl/GithubServiceImpl.java index ad75054..2b79563 100644 --- a/src/main/java/io/qyi/e5/github/service/impl/GithubServiceImpl.java +++ b/src/main/java/io/qyi/e5/github/service/impl/GithubServiceImpl.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import io.qyi.e5.config.exception.APIException; import io.qyi.e5.github.entity.Github; import io.qyi.e5.github.entity.UserInfo; import io.qyi.e5.github.mapper.GithubMapper; @@ -40,11 +41,12 @@ public class GithubServiceImpl extends ServiceImpl impleme par.put("code", code); Map head = new HashMap<>(); head.put("Content-Type", "application/x-www-form-urlencoded"); - String s = null; + String s; try { s = OkHttpClientUtil.doPost("https://github.com/login/oauth/access_token", head, par); } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); + throw new APIException(e.getMessage()); } Map map = StringUtil.ParsingUrl(s); return map.get("access_token"); @@ -87,9 +89,9 @@ public class GithubServiceImpl extends ServiceImpl impleme } return userInfo; } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); + throw new APIException("请求GITHUB用户信息失败!"); } - return null; } @Override diff --git a/src/main/java/io/qyi/e5/outlook/controller/OutlookController.java b/src/main/java/io/qyi/e5/outlook/controller/OutlookController.java index 8956867..ecf32c2 100644 --- a/src/main/java/io/qyi/e5/outlook/controller/OutlookController.java +++ b/src/main/java/io/qyi/e5/outlook/controller/OutlookController.java @@ -84,7 +84,7 @@ public class OutlookController { cron_time_random_start = Integer.valueOf(split[0]); cron_time_random_end = Integer.valueOf(split[1]); } catch (NumberFormatException e) { - e.printStackTrace(); + // e.printStackTrace(); return ResultUtil.error(ResultEnum.INVALID_format); } if (cron_time_random_start > cron_time_random_end) { @@ -167,6 +167,12 @@ public class OutlookController { outlookService.setStart(authentication.getGithub_id(),id); return ResultUtil.success(); } + + /** + * 临时屏蔽 delete请求 + * @param id + * @return + */ @GetMapping("/delete") public Result delete(@RequestParam int id) { UsernamePasswordAuthenticationToken authentication = (UsernamePasswordAuthenticationToken) SecurityContextHolder.getContext().getAuthentication(); diff --git a/src/main/java/io/qyi/e5/outlook/service/impl/OutlookServiceImpl.java b/src/main/java/io/qyi/e5/outlook/service/impl/OutlookServiceImpl.java index 7c66b1b..63a8cf1 100644 --- a/src/main/java/io/qyi/e5/outlook/service/impl/OutlookServiceImpl.java +++ b/src/main/java/io/qyi/e5/outlook/service/impl/OutlookServiceImpl.java @@ -63,7 +63,7 @@ public class OutlookServiceImpl extends ServiceImpl impl JSONObject jsonObject = JSON.parseObject(s); logger.debug("请求access_token返回数据:" + s); if (jsonObject.get("error") != null) { - logger.error("错授权误!"); + logger.debug("错授权误!"); throw new APIException(jsonObject.get("error_description").toString()); } else { int expires_in = jsonObject.getIntValue("expires_in"); @@ -318,8 +318,7 @@ public class OutlookServiceImpl extends ServiceImpl impl par.put("client_secret", outlook.getClientSecret()); par.put("grant_type", "refresh_token"); par.put("refresh_token", outlook.getRefreshToken()); - String s = null; - s = OkHttpClientUtil.doPost("https://login.microsoftonline.com/" + outlook.getTenantId() + "/oauth2/v2.0/token", head, par); + String s = OkHttpClientUtil.doPost("https://login.microsoftonline.com/" + outlook.getTenantId() + "/oauth2/v2.0/token", head, par); logger.debug("请求刷新列表返回数据:" + s); JSONObject jsonObject = JSON.parseObject(s); if (!jsonObject.containsKey("access_token")) { @@ -331,10 +330,14 @@ public class OutlookServiceImpl extends ServiceImpl impl outlook.setIdToken(jsonObject.getString("id_token")); QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("client_id", outlook.getClientId()); - if (baseMapper.update(outlook, queryWrapper) != 1) { + int update = baseMapper.update(outlook, queryWrapper); + if (update > 1) { logger.debug("返更新行数不为1"); throw new Exception("更新数据库时发现有重复的key"); + } else if (update < 1) { + throw new Exception("调用成功,但更新状态失败。"); } + return outlook.getAccessToken(); // 更新数据库 } @@ -442,10 +445,12 @@ public class OutlookServiceImpl extends ServiceImpl impl QueryWrapper wp = new QueryWrapper<>(); wp.eq("github_id", github_id); wp.eq("id", outlookId); - if (baseMapper.delete(wp) != 1) { - log.error("删除数据失败! github_id:{github_id} - outlookId:{outlookId}"); + int delete = baseMapper.delete(wp); + if (delete != 1) { throw new APIException("删除失败!"); } + + log.error("删除数据失败! github_id:{} - outlookId:{} - 删除结果: {}", github_id, outlookId, delete); } @Override diff --git a/src/main/java/io/qyi/e5/service/ExecutorPoolService.java b/src/main/java/io/qyi/e5/service/ExecutorPoolService.java new file mode 100644 index 0000000..f534839 --- /dev/null +++ b/src/main/java/io/qyi/e5/service/ExecutorPoolService.java @@ -0,0 +1,64 @@ +package io.qyi.e5.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.concurrent.*; + +@Slf4j +@Component +public class ExecutorPoolService { + + @Value("${e5.system.threadPool}") + Integer poolSize; + + @Value("${e5.system.maximumPoolSize}") + int maximumPoolSize; + @Value("${e5.system.blockingQueueSize}") + int blockingQueueSize; + + + private ExecutorService threadPool; + + @PostConstruct + public void init() { + threadPool = new ThreadPoolExecutor( + //要保留在池中的线程数,即使它们处于空闲状态,除非设置了 allowCoreThreadTimeOut + poolSize, + //池中允许的最大线程数 + maximumPoolSize, + //当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最长时间。 + 10, + //unit:keepAliveTime的单位 + TimeUnit.SECONDS, + //任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种; + new LinkedBlockingQueue<>(blockingQueueSize), // 有界队列 + //线程工厂,用于创建线程,一般用默认即可; new CustThreadFactory(), + Executors.defaultThreadFactory(), + //拒绝策略;当任务太多来不及处理时,如何拒绝任务; + new CustRejectedExecutionHandler() + ); + + } + + public ExecutorService getThreadPool() { + return threadPool; + } + + /*拒绝策略*/ + class CustRejectedExecutionHandler implements RejectedExecutionHandler { + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + // e.printStackTrace(); + log.error("队列拒绝策略错误,问题: {}", e.getMessage()); + } + } + + } + + +} diff --git a/src/main/java/io/qyi/e5/service/task/impl/TaskImpl.java b/src/main/java/io/qyi/e5/service/task/impl/TaskImpl.java index 9c9d017..475520f 100644 --- a/src/main/java/io/qyi/e5/service/task/impl/TaskImpl.java +++ b/src/main/java/io/qyi/e5/service/task/impl/TaskImpl.java @@ -125,7 +125,6 @@ public class TaskImpl implements ITask { } isExecuteE5 = true; } catch (Exception e) { - e.printStackTrace(); /*连续错误判断*/ if (!redisUtil.hasKey(errorKey)) { redisUtil.set(errorKey, 1);