修改为反应式

This commit is contained in:
Luoye_W 2021-07-26 17:25:02 +08:00
parent e6d0bba0ed
commit 3b21a1cb34
14 changed files with 205 additions and 171 deletions

43
pom.xml
View File

@ -10,7 +10,7 @@
</parent>
<groupId>io.qyi</groupId>
<artifactId>e5</artifactId>
<version>1.0.6</version>
<version>1.0.7</version>
<name>e5</name>
<description>Demo project for Spring Boot</description>
@ -19,11 +19,22 @@
</properties>
<dependencies>
<!--排除自带的logging-->
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
@ -35,23 +46,13 @@
</exclusion>
</exclusions>
</dependency>
<!--排除自带的logging-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--log4j2-->
<dependency>
<!-- 引入log4j2依赖 -->
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
</dependency>-->
<!--权限管理插件-->
<dependency>
<groupId>org.springframework.boot</groupId>
@ -72,7 +73,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
<optional>true</optional>
</dependency>
<!-- commons-lang -->
<dependency>
@ -171,6 +172,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>9</source>
<target>9</target>
</configuration>
</plugin>
</plugins>
</build>

View File

@ -1,12 +1,18 @@
package io.qyi.e5.config;
import io.qyi.e5.outlook.entity.Outlook;
import io.qyi.e5.outlook.service.IOutlookService;
import io.qyi.e5.service.task.ITask;
import io.qyi.e5.util.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.List;
/**
* @program: e5
@ -16,18 +22,32 @@ import javax.annotation.PostConstruct;
**/
@Component
@Slf4j
@EnableScheduling
public class Start {
@Autowired
RedisUtil redisUtil;
@Autowired
ITask Task;
@Autowired
IOutlookService outlookService;
@PostConstruct
public void initRedis() {
log.info("清空redis...... ");
redisUtil.delAll();
/* log.info("重新添加队列...... ");
Task.sendTaskOutlookMQALL();*/
}
@Scheduled(cron = "0/10 * * * * ?")
private void distributeTask() {
List<Outlook> runOutlookList = outlookService.findRunOutlookList();
log.info("查询到待调用的数量: {}",runOutlookList.size());
runOutlookList.forEach(outlook -> {
Task.submit(outlook);
});
// Task.submit(runOutlookList.get(0));
}
}

View File

@ -25,7 +25,7 @@ import java.util.Iterator;
public class UrlAccessDecisionManager implements AccessDecisionManager {
@Override
public void decide(Authentication authentication, Object o, Collection<ConfigAttribute> collection) throws AccessDeniedException, InsufficientAuthenticationException {
log.info("进入权限判断!");
log.debug("进入权限判断!");
if (collection == null) {
return;
}
@ -36,7 +36,7 @@ public class UrlAccessDecisionManager implements AccessDecisionManager {
ConfigAttribute configAttribute = iterator.next();
//访问所请求资源所需要的权限
String needPermission = configAttribute.getAttribute();
log.info("访问 " + o.toString() + " 需要的权限是:" + needPermission);
log.debug("访问 " + o.toString() + " 需要的权限是:" + needPermission);
if (needPermission == null) {
break;
}
@ -44,8 +44,8 @@ public class UrlAccessDecisionManager implements AccessDecisionManager {
Collection<? extends GrantedAuthority> authorities = authentication.getAuthorities();
for (GrantedAuthority ga : authorities) {
if (needPermission.equals(ga.getAuthority())) {
log.info("当前角色: " + ga.getAuthority());
log.info("访问 " + o.toString() + " 已授权!");
log.debug("当前角色: " + ga.getAuthority());
log.debug("访问 " + o.toString() + " 已授权!");
return;
}
}

View File

@ -44,10 +44,10 @@ public class LinkTokenAuthenticationFilter extends OncePerRequestFilter {
UsernamePasswordAuthenticationToken authenticationToken1 = new UsernamePasswordAuthenticationToken(userInfo.get("github_name") == null ? "" : userInfo.get("github_name").toString(),
userInfo.get("avatar_url").toString(), (int) userInfo.get("github_id"), AuthorityUtils.createAuthorityList(authority));
SecurityContextHolder.getContext().setAuthentication(authenticationToken1);
log.info("完成授权,角色:{}" , Arrays.toString(authority) );
log.debug("完成授权,角色:{}" , Arrays.toString(authority) );
}
}
log.info("--------------Token鉴权---------------");
log.debug("--------------Token鉴权---------------");
/*设置跨域 最好在nginx处设置*/
HttpServletResponse response = httpServletResponse;
response.setHeader("Access-Control-Allow-Origin", "*");

View File

@ -48,8 +48,6 @@ public class AdminController {
@Value("${user.token.expire}")
private int tokenExpire;
/**
* 测试队列
*
@ -59,7 +57,7 @@ public class AdminController {
*/
@GetMapping("/send")
public void send(@RequestParam int githubId, @RequestParam int outlookId) {
Task.sendTaskOutlookMQ(githubId, outlookId);
Task.updateOutlookExecDateTime(githubId, outlookId);
}
@GetMapping("/execute")
@ -67,18 +65,6 @@ public class AdminController {
Task.executeE5(githubId, outlookId);
}
/**
* 对所有队列重新添加
*
* @Author: 落叶随风
* @Date: 2020/9/7 14:43
* @Return: * @return: java.lang.String
*/
@GetMapping("/sendAll")
public String sendAll() {
Task.sendTaskOutlookMQALL();
return "ok";
}
/**
* 清空redis
@ -93,6 +79,13 @@ public class AdminController {
return "ok";
}
/**
* 设置公告
* @param text:
* @Author: 落叶随风
* @Date: 2021/7/26 15:30
* @Return: * @return: java.lang.String
*/
@RequestMapping("setAnnouncement")
public String setAnnouncement(String text) throws IOException {
File file = ResourceUtils.getFile("classpath:announcement.txt");
@ -102,6 +95,13 @@ public class AdminController {
return "ok";
}
/**
* 通过配置的密码获取管理员token
* @param passwd:
* @Author: 落叶随风
* @Date: 2021/7/26 15:29
* @Return: * @return: java.lang.String
*/
@RequestMapping("getDebugAdminToken")
public String getDebugAdminToken(String passwd) {
if (userAdminDebugPasswd.equals(passwd)) {
@ -126,4 +126,6 @@ public class AdminController {
return "la la la";
}
}

View File

@ -2,8 +2,6 @@ package io.qyi.e5.outlook.controller;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import io.qyi.e5.bean.result.Result;
import io.qyi.e5.bean.result.ResultEnum;
import io.qyi.e5.config.ResultVO;
import io.qyi.e5.config.exception.APIException;
import io.qyi.e5.config.security.UsernamePasswordAuthenticationToken;
import io.qyi.e5.outlook.entity.Outlook;
@ -15,13 +13,9 @@ import io.qyi.e5.util.redis.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletResponse;
import java.util.UUID;
/**
@ -85,7 +79,7 @@ public class AuthController {
throw new APIException("clientId 或 clientSecret 填写错误!授权失败!");
}
/*添加此用户进消息队列*/
Task.sendTaskOutlookMQ(outlook.getGithubId(),outlookId);
Task.updateOutlookExecDateTime(outlook.getGithubId(),outlookId);
return ResultUtil.success();
}

View File

@ -27,6 +27,8 @@ public interface IOutlookService extends IService<Outlook> {
List<Outlook> findAll();
List<Outlook> findRunOutlookList();
int deleteInfo(int github_id);
List<Outlook> getOutlooklist(int github_id);
@ -45,7 +47,7 @@ public interface IOutlookService extends IService<Outlook> {
* @Date: 2020/12/19 21:29
* @Return: * @return: void
*/
void update(int github_id, int outlookId, Outlook outlook);
void update(Outlook outlook);
void delete(int github_id, int outlookId);

View File

@ -22,7 +22,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -191,6 +190,13 @@ public class OutlookServiceImpl extends ServiceImpl<OutlookMapper, Outlook> impl
return baseMapper.selectList(null);
}
@Override
public List<Outlook> findRunOutlookList(){
int nowDateTime = (int) (System.currentTimeMillis() / 1000);
List<Outlook> outlooks = baseMapper.selectList(new QueryWrapper<Outlook>().eq("status", 3).lt("next_time", nowDateTime));
return outlooks;
}
/**
* 删除用户outlook
*
@ -342,7 +348,6 @@ public class OutlookServiceImpl extends ServiceImpl<OutlookMapper, Outlook> impl
* @updateTime 2020/3/5 14:47
*/
public boolean errorCheck(String msg) {
System.out.println(Arrays.toString(errorMsg));
for (String s : errorMsg) {
if (msg.indexOf(s) != -1) {
return true;
@ -414,18 +419,16 @@ public class OutlookServiceImpl extends ServiceImpl<OutlookMapper, Outlook> impl
}
/**
* 更新数据
* @param github_id: github_id
* @param outlookId: outlookId
* @param outlook: 更新的数据
* @Author: 落叶随风
* @Date: 2020/12/19 21:29
* @Return: * @return: void
*/
@Override
public void update(int github_id, int outlookId, Outlook outlook) {
public void update( Outlook outlook) {
UpdateWrapper<Outlook> uw = new UpdateWrapper<>();
uw.eq("id", outlookId);
uw.eq("github_id", github_id);
uw.eq("id", outlook.getId());
uw.eq("github_id", outlook.getGithubId());
baseMapper.update(outlook, uw);
}

View File

@ -1,11 +0,0 @@
package io.qyi.e5.service.rabbitMQ;
/**
* @program: msgpush
* @description:
* @author: 落叶随风
* @create: 2020-01-13 23:34
**/
public interface Listener {
public void listen(String msg);
}

View File

@ -1,57 +0,0 @@
package io.qyi.e5.service.rabbitMQ.impl;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import io.qyi.e5.outlook.bean.OutlookMq;
import io.qyi.e5.outlook.service.IOutlookService;
import io.qyi.e5.outlook_log.service.IOutlookLogService;
import io.qyi.e5.service.task.ITask;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @program: msgpush
* @description:
* @author: 落叶随风
* @create: 2020-01-13 23:35
**/
@Slf4j
@Service
public class ListenerImpl {
@Autowired
IOutlookService outlookService;
@Autowired
ITask Task;
@Autowired
IOutlookLogService outlookLogService;
private static final Gson gson = new Gson();
@RabbitHandler
@RabbitListener(queues = "delay_queue1", containerFactory = "rabbitListenerContainerFactory")
public void listen(Message message, Channel channel) throws IOException {
log.info("消费者1开始处理消息 {},时间戳:{}" ,message,System.currentTimeMillis());
OutlookMq mq = gson.fromJson(new String(message.getBody()), OutlookMq.class);
boolean b = Task.executeE5(mq.getGithubId(),mq.getOutlookId());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
/*再次进行添加任务*/
if (b) {
if (outlookService.isStatusRun(mq.getGithubId(), mq.getOutlookId())) {
Task.sendTaskOutlookMQ(mq.getGithubId(), mq.getOutlookId());
} else {
outlookLogService.addLog(mq.getGithubId(), mq.getOutlookId(), "error", 0, "检测到手动设置了运行状态,停止调用!");
}
} else {
outlookLogService.addLog(mq.getGithubId(), mq.getOutlookId(), "error", 0, "执行失败,结束调用!");
}
}
}

View File

@ -1,5 +1,7 @@
package io.qyi.e5.service.task;
import io.qyi.e5.outlook.entity.Outlook;
/**
* @program: e5
* @description:
@ -7,10 +9,11 @@ package io.qyi.e5.service.task;
* @create: 2020-04-16 16:51
**/
public interface ITask {
void sendTaskOutlookMQ(int github_id, int outlookId);
void updateOutlookExecDateTime(int github_id, int outlookId);
void sendTaskOutlookMQALL();
boolean executeE5(int github_id,int outlookId);
void submit(Outlook mq);
}

View File

@ -7,17 +7,18 @@ import io.qyi.e5.outlook.service.IOutlookService;
import io.qyi.e5.outlook_log.service.IOutlookLogService;
import io.qyi.e5.service.task.ITask;
import io.qyi.e5.util.redis.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
* @program: e5
@ -26,8 +27,8 @@ import java.util.UUID;
* @create: 2020-04-16 16:53
**/
@Service
public class TaskImpl implements ITask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Slf4j
public class TaskImpl implements ITask, Flow.Subscriber<Outlook> {
@Autowired
IOutlookService outlookService;
@ -41,32 +42,93 @@ public class TaskImpl implements ITask {
@Value("${outlook.error.countMax}")
int errorCountMax;
private Flow.Subscription subscription;
private SubmissionPublisher publisher = new SubmissionPublisher<OutlookMq>();
{
publisher.subscribe(this);
}
@Override
@Async
public void sendTaskOutlookMQ(int github_id, int outlookId) {
public void onSubscribe(Flow.Subscription subscription) {
log.info("建立订阅关系");
this.subscription = subscription;
this.subscription.request(10);
}
@Override
public void onNext(Outlook item) {
System.out.println("接收到一个数据" + item);
System.out.println("该任务处理完成,再次请求");
listen(new OutlookMq(item.getGithubId(), item.getId()));
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
log.error("Flow.Subscription.error: \n{}",throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("处理完成");
}
@PreDestroy
public void close(){
log.info("关闭publisher......");
publisher.close();
}
/**
* 更新下次调用时间
* TODO 这一步待删除
* @param github_id:
* @param outlookId:
* @Author: 落叶随风
* @Date: 2021/7/26 15:41
* @Return: * @return: void
*/
@Override
public void updateOutlookExecDateTime(int github_id, int outlookId) {
Outlook Outlook = outlookService.getOne(new QueryWrapper<Outlook>().eq("github_id", github_id).eq("id", outlookId));
if (Outlook == null) {
logger.warn("未找到此用户,github_id: {}", github_id);
log.warn("未找到此用户,github_id: {}", github_id);
return;
}
/*根据用户设置生成随机数*/
int Expiration = getRandom(Outlook.getCronTimeRandomStart(), Outlook.getCronTimeRandomEnd());
Outlook ol = new Outlook();
ol.setId(outlookId).setGithubId(github_id);
ol.setNextTime((int) ((System.currentTimeMillis() / 1000) + Expiration));
outlookService.update(ol);
/*将此用户信息加入redis如果存在则代表在队列中同时提前10秒过期*/
String rsKey = "user.mq:" + github_id + ".outlookId:" + outlookId;
/* String rsKey = "user.mq:" + github_id + ".outlookId:" + outlookId;
if (!redisUtil.hasKey(rsKey)) {
redisUtil.set(rsKey, (System.currentTimeMillis() / 1000) + Expiration, Expiration - 10);
OutlookMq mq = new OutlookMq(github_id, outlookId);
Outlook ol = new Outlook();
ol.setId(outlookId).setGithubId(github_id);
ol.setNextTime((int) ((System.currentTimeMillis() / 1000) + Expiration));
outlookService.update(github_id,outlookId,ol);
send(mq, Expiration * 1000);
outlookService.update(ol);
// send(mq, Expiration * 1000);
} else {
logger.info("Key 存在,不执行{}",rsKey);
}
log.info("Key 存在,不执行{}",rsKey);
}*/
}
/**
* 将所有outlook账户列表加入队列
* @Author: 落叶随风
* @Date: 2021/7/26 15:40
* @Return: * @return: void
*/
@Override
@Async
public void sendTaskOutlookMQALL() {
List<Outlook> all = outlookService.findAll();
Iterator<Outlook> iterator = all.iterator();
@ -77,16 +139,24 @@ public class TaskImpl implements ITask {
/*将此用户信息加入redis如果存在则代表在队列中同时提前10秒过期*/
if (!redisUtil.hasKey("user.mq:" + next.getGithubId())) {
redisUtil.set("user.mq:" + next.getGithubId(), 0, Expiration - 10);
send(next.getGithubId(), Expiration * 1000);
// send(next.getGithubId(), Expiration * 1000);
}
}
}
/**
* 调用一次邮件
* @param github_id: github_id
* @param outlookId: outlookId
* @Author: 落叶随风
* @Date: 2021/7/26 15:39
* @Return: * @return: boolean
*/
@Override
public boolean executeE5(int github_id,int outlookId) {
Outlook Outlook = outlookService.getOne(new QueryWrapper<Outlook>().eq("github_id", github_id).eq("id",outlookId));
if (Outlook == null) {
logger.warn("未找到此用户,github_id: {}", github_id);
log.warn("未找到此用户,github_id: {}", github_id);
return false;
}
boolean isExecuteE5;
@ -111,8 +181,8 @@ public class TaskImpl implements ITask {
outlookLogService.addLog(github_id, outlookId,"error", 0, "检测到3次连续错误下次将不再自动调用请修正错误后再授权开启续订。");
/*设置状态为停止*/
Outlook outlook = new Outlook();
outlook.setStatus(5);
outlookService.update(github_id,outlookId,outlook);
outlook.setStatus(5).setId(outlookId).setGithubId(github_id);
outlookService.update(outlook);
isExecuteE5 = false;
} else {
redisUtil.incr(errorKey, 1);
@ -124,26 +194,23 @@ public class TaskImpl implements ITask {
return isExecuteE5;
}
/**
* 发送消息到队列
*
* @param Expiration
* @Description:
* @param: * @param msg
* @return: void
* @Author: 落叶随风
* @Date: 2020/4/16
*/
public void send(Object msg, int Expiration) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
@Override
public void submit(Outlook mq){
publisher.submit(mq);
}
rabbitTemplate.convertAndSend("delay", "routing_delay", msg, message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置这条消息的过期时间
// messageProperties.setExpiration(Expiration);
messageProperties.setHeader("x-delay", Expiration);
return message;
}, correlationData);
public void listen(OutlookMq mq) {
boolean b = executeE5(mq.getGithubId(),mq.getOutlookId());
/*再次进行添加任务*/
if (b) {
if (outlookService.isStatusRun(mq.getGithubId(), mq.getOutlookId())) {
updateOutlookExecDateTime(mq.getGithubId(), mq.getOutlookId());
} else {
outlookLogService.addLog(mq.getGithubId(), mq.getOutlookId(), "error", 0, "检测到手动设置了运行状态,停止调用!");
}
} else {
outlookLogService.addLog(mq.getGithubId(), mq.getOutlookId(), "error", 0, "执行失败,结束调用!");
}
}
/**
@ -161,4 +228,5 @@ public class TaskImpl implements ITask {
int Expiration = (r.nextInt(end - start + 1) + start);
return Expiration;
}
}

View File

@ -1,8 +1,6 @@
package io.qyi.e5.util;
import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.Mac;
@ -10,6 +8,7 @@ import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Base64;
public class EncryptUtil {
public static final String MD5 = "MD5";
@ -153,7 +152,7 @@ public class EncryptUtil {
}
private String base64(byte[] res) {
return Base64.encode(res);
return Base64.getEncoder().encodeToString(res);
}
/**
@ -324,7 +323,7 @@ public class EncryptUtil {
* @return
*/
public String Base64Encode(String res) {
return Base64.encode(res.getBytes());
return base64(res.getBytes());
}
/**
@ -334,6 +333,6 @@ public class EncryptUtil {
* @return
*/
public String Base64Decode(String res) {
return new String(Base64.decode(res));
return new String(Base64.getDecoder().decode(res));
}
}

View File

@ -5,9 +5,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
@ -99,7 +97,11 @@ public class RedisUtil {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
Collection<String> keys = new ArrayList<>();
for (int i = 0; i < key.length; i++) {
keys.add(key[i]);
}
redisTemplate.delete(keys);
}
}
}