hot-update/packages/update-manager/js/FileUploader.js

269 lines
7.7 KiB
JavaScript
Raw Permalink Normal View History

2021-02-01 16:11:00 +00:00
'use strict';
const fs = require('fs');
const FileUtil = require('./FileUtils');
const Client = require("ftp");
const logger = require('./logger');
2021-02-01 16:11:00 +00:00
class FileUploader {
ftps = {};
maxClients = 3;
options = {};
queue = [];
2021-02-02 03:26:55 +00:00
idleCallback = null;
loopId = 0;
lastIdleState = true;
constructor() {
this.loopId = setInterval(this.loop.bind(this), 1000);
2021-02-02 03:26:55 +00:00
}
2021-02-01 16:11:00 +00:00
setOption(options, maxThread) {
this.options = options;
this.maxClients = maxThread || 1;
}
2021-02-02 03:26:55 +00:00
setStateListener(callback) {
this.idleCallback = callback;
}
restart(tName) {
this.ftps[tName] && this.ftps[tName].restart();
}
2021-02-01 16:11:00 +00:00
prepare() {
const currLen = Object.keys(this.ftps).length;
logger.info('准备创建上传线程:', this.maxClients, currLen);
if (currLen < this.maxClients) {
for (let i = this.maxClients - Object.keys(this.ftps).length; i > 0; i--) {
let name = "Thread-" + Math.random();
this.ftps[name] = new UploadThread(this.options);
}
}
}
destroy() {
for (let id in this.ftps) {
this.ftps[id].destroy();
delete this.ftps[id];
}
2021-02-02 03:26:55 +00:00
if (this.loopId) {
clearInterval(this.loopId)
this.loopId = 0;
}
}
// event loop.
loop() {
const curState = this.checkState();
if (curState != this.lastIdleState) {
this.lastIdleState = curState;
logger.info('当前状态:', curState);
2021-02-02 03:26:55 +00:00
this.idleCallback && this.idleCallback(curState);
}
}
checkState() {
let idleState = true;
for (let i in this.ftps) {
idleState = idleState && this.ftps[i].idleState;
if (!idleState) break;
}
return idleState;
2021-02-01 16:11:00 +00:00
}
// 检测所有线程运行状态
checkThreads() {
let str = [];
for (let i in this.ftps) {
let p = 0;
if (this.ftps[i].totalTasks && !this.ftps[i].idleState) {
p = (this.ftps[i].totalTasks - this.ftps[i].queue.length) / this.ftps[i].totalTasks;
}
p = parseFloat((Math.floor(p * 10000) / 100).toFixed(2));
str.push({
name: i,
idle: this.ftps[i].idleState,
remain: this.ftps[i].getTaskLength(),
hasError: this.ftps[i].hasError,
totalTasks: this.ftps[i].totalTasks,
curTaskPath: this.ftps[i].curTaskPath,
taskStartTime: this.ftps[i].taskStartTime,
progress: p
});
}
return str;
}
2021-02-01 16:11:00 +00:00
/**
* 上传文件目录到ftp服务器
* @param source
* @param dst
*/
upload(source, dst) {
const st = fs.statSync(source);
if (st.isFile()) {
this.queue.push({src: source, dst: dst})
} else if (st.isDirectory()) {
const files = FileUtil.listDirsAndFiles(source);
const startIndex = (source.endsWith('\\') || source.endsWith('/')) ? source.length - 1 : source.length;
const remoteStr = (dst.endsWith('\\') || dst.endsWith('/')) ? dst.substring(0, dst.length - 1) : dst;
for (let i of files.files) {
//bugfix: 这里目标文件路径必须是 linux文件路径.否则ftp会做为普通文件直接上传,而不包含目录.
this.queue.push({src: i, dst: (remoteStr + i.substring(startIndex)).replace(/\\/g, '/')});
}
}
logger.info('待上传文件数量:', this.queue.length);
2021-02-01 16:11:00 +00:00
this.checkUpload();
}
async checkUpload() {
// 准备上传线程.
this.prepare();
let once = null;
while ((once = this.queue.shift()) != null) {
const readyFtps = Object.values(this.ftps);
const index = Math.floor(Math.random() * readyFtps.length);
readyFtps[index].addTask(once);
}
}
}
class UploadThread {
ftp = null;
queue = [];
ready = false;
stopped = false;
options = null;
2021-02-02 03:26:55 +00:00
idleState = true;
hasError = false;
// 总任务数量, 所有任务上传完成时,total清零.
totalTasks = 0;
curTaskPath = "";// 显示当前任务.
taskStartTime = 0; // 显示任务用时.
2021-02-01 16:11:00 +00:00
constructor(options) {
this.options = options;
this.ftp = this.newConnection();
this.run();
}
addTask(once) {
this.queue.push(once);
if (this.stopped) {
this.ftp = this.newConnection();
}
this.totalTasks += 1;
2021-02-01 16:11:00 +00:00
}
newConnection() {
this.ready = false;
const ftp = new Client();
ftp.on('ready', () => {
this.ready = true;
if (this.hasError) {
this.hasError = false;
logger.success('连接恢复,当前剩余任务:', this.queue.length);
}
2021-02-01 16:11:00 +00:00
});
ftp.on('close', () => {
this.ready = false;
});
ftp.on('end', () => {
this.ready = false;
});
ftp.on('error', (err) => {
this.ready = false;
});
ftp.connect(this.options);
return ftp;
}
getTaskLength() {
return this.queue.length;
}
2021-02-01 16:11:00 +00:00
destroy() {
this.stopped = true;
this.ftp.destroy();
this.totalTasks = 0;
2021-02-01 16:11:00 +00:00
}
sleep(ms) {
2021-02-01 16:11:00 +00:00
return new Promise(resolve => {
setTimeout(resolve, ms, true);
2021-02-01 16:11:00 +00:00
});
}
// 定时循环.直到destroy()执行.
async run() {
while (!this.stopped) {
if (this.hasError || !this.ready) {
await this.sleep(1000);
let ts = new Date().getTime();
if (this.hasError && ts - this.taskStartTime > 60000) {
this.taskStartTime = ts;
this.ftp.destroy();
this.ftp = this.newConnection();
logger.log('线程异常超时,尝试重启线程');
}
continue;
}
if (this.queue.length <= 0) {
2021-02-02 03:26:55 +00:00
this.idleState = true;
this.totalTasks = 0;
await this.sleep(1000);
2021-02-01 16:11:00 +00:00
continue;
}
2021-02-02 03:26:55 +00:00
this.idleState = false;
2021-02-01 16:11:00 +00:00
const once = this.queue[0];
this.curTaskPath = once;
this.taskStartTime = new Date().getTime();
2021-02-01 16:11:00 +00:00
const err = await this.uploadOnce(this.ftp, once);
if (!err) {
this.queue.shift();
this.curTaskPath = "";
2021-02-01 16:11:00 +00:00
continue;
} else {
logger.warn('上传失败,准备重试:', once.src);
this.ftp.logout();
this.ftp.destroy();
2021-02-01 16:11:00 +00:00
// 上传失败有可能是线程强制结束导致的上传失败.
if (!this.stopped) {
this.hasError = true;
2021-02-01 16:11:00 +00:00
this.ftp = this.newConnection();
await this.sleep(5000);
2021-02-01 16:11:00 +00:00
}
}
}
this.ftp.destroy();
2021-02-01 16:11:00 +00:00
}
uploadOnce(ftp, once) {
return Promise.race([new Promise(function (resolve) {
fs.readFile(once.src, (err, data) => {
if (err) {
logger.error('文件读取异常:', once.src, err);
resolve(err)
return
} else {
ftp.put(data, once.dst, false, resolve)
}
});
}), this.sleep(20000)]);
}
restart() {
this.hasError = true;
this.ready = false;
this.ftp.destroy();
this.ftp = null;
this.ftp = this.newConnection();
2021-02-01 16:11:00 +00:00
}
}
module.exports = function () {
return new FileUploader();
}