插件优化,增加FTP上传进度监控.

This commit is contained in:
andrewlu
2021-02-02 22:20:01 +08:00
parent 46b65e4f9c
commit 99ec923ebc
8 changed files with 292 additions and 103 deletions

View File

@@ -2,6 +2,7 @@
const fs = require('fs');
const FileUtil = require('./FileUtils');
const Client = require("ftp");
const logger = require('./logger');
class FileUploader {
ftps = {};
@@ -13,7 +14,7 @@ class FileUploader {
lastIdleState = true;
constructor() {
this.loopId = setInterval(this.loop.bind(this), 200);
this.loopId = setInterval(this.loop.bind(this), 1000);
}
setOption(options, maxThread) {
@@ -25,6 +26,10 @@ class FileUploader {
this.idleCallback = callback;
}
restart(tName) {
this.ftps[tName] && this.ftps[tName].restart();
}
prepare() {
const currLen = Object.keys(this.ftps).length;
logger.info('准备创建上传线程:', this.maxClients, currLen);
@@ -52,6 +57,7 @@ class FileUploader {
const curState = this.checkState();
if (curState != this.lastIdleState) {
this.lastIdleState = curState;
logger.info('当前状态:', curState);
this.idleCallback && this.idleCallback(curState);
}
}
@@ -65,6 +71,29 @@ class FileUploader {
return idleState;
}
// 检测所有线程运行状态
checkThreads() {
let str = [];
for (let i in this.ftps) {
let p = 0;
if (this.ftps[i].totalTasks && !this.ftps[i].idleState) {
p = (this.ftps[i].totalTasks - this.ftps[i].queue.length) / this.ftps[i].totalTasks;
}
p = Math.floor(p * 10000) / 100;
str.push({
name: i,
idle: this.ftps[i].idleState,
remain: this.ftps[i].queue.length,
hasError: this.ftps[i].hasError,
totalTasks: this.ftps[i].totalTasks,
curTaskPath: this.ftps[i].curTaskPath,
taskStartTime: this.ftps[i].taskStartTime,
progress: p
});
}
return str;
}
/**
* 上传文件、目录到ftp服务器。
* @param source
@@ -105,6 +134,12 @@ class UploadThread {
stopped = false;
options = null;
idleState = true;
hasError = false;
// 总任务数量, 所有任务上传完成时,total清零.
totalTasks = 0;
curTaskPath = "";// 显示当前任务.
taskStartTime = 0; // 显示任务用时.
constructor(options) {
this.options = options;
@@ -117,6 +152,7 @@ class UploadThread {
if (this.stopped) {
this.ftp = this.newConnection();
}
this.totalTasks += 1;
}
newConnection() {
@@ -124,6 +160,10 @@ class UploadThread {
const ftp = new Client();
ftp.on('ready', () => {
this.ready = true;
if (this.hasError) {
this.hasError = false;
logger.success('连接恢复,当前剩余任务:', this.queue.length);
}
});
ftp.on('close', () => {
this.ready = false;
@@ -138,47 +178,78 @@ class UploadThread {
return ftp;
}
destroy() {
this.stopped = true;
getTaskLength() {
return this.queue.length;
}
async sleep(ms) {
destroy() {
this.stopped = true;
this.ftp.destroy();
this.totalTasks = 0;
}
sleep(ms) {
return new Promise(resolve => {
setTimeout(resolve, ms);
setTimeout(resolve, ms, true);
});
}
// 定时循环.直到destroy()执行.
async run() {
while (!this.stopped) {
if (!this.ready || this.queue.length <= 0) {
await this.sleep(500);
if (this.hasError || !this.ready) {
await this.sleep(1000);
let ts = new Date().getTime();
if (this.hasError && ts - this.taskStartTime > 60000) {
this.taskStartTime = ts;
this.ftp.destroy();
this.ftp = this.newConnection();
logger.log('线程异常超时,尝试重启线程');
}
continue;
}
if (this.queue.length <= 0) {
this.idleState = true;
this.totalTasks = 0;
await this.sleep(1000);
continue;
}
this.idleState = false;
const once = this.queue[0];
this.curTaskPath = once;
this.taskStartTime = new Date().getTime();
const err = await this.uploadOnce(this.ftp, once);
if (!err) {
this.queue.shift();
logger.info('上传完成:', once.src);
this.curTaskPath = "";
continue;
} else {
logger.warn('上传失败,准备重试:', once.src);
this.ftp.end();
this.ftp.logout();
this.ftp.destroy();
// 上传失败有可能是线程强制结束导致的上传失败.
if (!this.stopped) {
this.hasError = true;
this.ftp = this.newConnection();
await this.sleep(5000);
}
}
}
this.ftp.end();
this.ftp.destroy();
}
uploadOnce(ftp, once) {
return new Promise(resolve => {
return Promise.race([new Promise(function (resolve) {
ftp.put(once.src, once.dst, false, resolve)
});
}), this.sleep(20000)]);
}
restart() {
this.hasError = true;
this.ready = false;
this.ftp.destroy();
this.ftp = null;
this.ftp = this.newConnection();
}
}

View File

@@ -1,14 +1,6 @@
const fs = require('fs')
const path = require('path')
// console adapter.
global.logger = global.logger || {};
logger.log = (Editor && Editor.log) || console.log;
logger.info = (Editor && Editor.info) || console.info;
logger.warn = (Editor && Editor.warn) || console.warn;
logger.error = (Editor && Editor.error) || console.error;
logger.success = (Editor && Editor.success) || (Editor && Editor.info) || console.log;
class FileUtil {
/*
* 获取window上的文件目录以及文件列表信息

View File

@@ -0,0 +1,12 @@
const logger = (global && global.Editor) || (global && global.console) || (window && window.console);
/**
* 适配不同Logger 对象.
* @type {{warn: (message?: any, ...optionalParams: any[]) => void, log: (message?: any, ...optionalParams: any[]) => void, success: any | ((message?: any, ...optionalParams: any[]) => void), error: (message?: any, ...optionalParams: any[]) => void, info: (message?: any, ...optionalParams: any[]) => void}}
*/
module.exports = {
log: logger.log,
info: logger.info,
warn: logger.warn,
error: logger.error,
success: logger.success
}