ef915a121e
可以在项目代码中直接使用 import um = require('UpdateManager') 然后调用相关接口.
266 lines
7.7 KiB
JavaScript
266 lines
7.7 KiB
JavaScript
'use strict';
|
|
const fs = require('fs');
|
|
const FileUtil = require('./FileUtils');
|
|
const Client = require("ftp");
|
|
const logger = require('./logger');
|
|
|
|
/**
 * Front-end for FTP uploads.
 *
 * Collects upload tasks (`{src, dst}` pairs), keeps a pool of `UploadThread`
 * workers and hands every queued task to a randomly chosen worker. A
 * one-second poller reports transitions between "all workers idle" and
 * "at least one worker busy" to an optional listener.
 *
 * NOTE(review): `destroy()` stops the poller; a later `upload()` re-creates
 * workers but does not restart the poller, so the state listener stays silent
 * after `destroy()`. Confirm whether instances are meant to be reusable.
 */
class FileUploader {
    // Worker pool, keyed by a generated thread name.
    ftps = {};
    // Upper bound on the number of workers created by prepare().
    maxClients = 3;
    // Connection options handed to every new worker.
    options = {};
    // Tasks waiting to be handed to a worker.
    queue = [];
    // Listener invoked with the new idle state whenever it changes.
    idleCallback = null;
    // Handle of the state-polling interval; 0 once destroyed.
    loopId = 0;
    // Idle state reported by the previous poll.
    lastIdleState = true;

    constructor() {
        this.loopId = setInterval(this.loop.bind(this), 1000);
    }

    /**
     * Stores the connection options and the desired pool size.
     *
     * @param {object} options  Options passed through to each worker.
     * @param {number} [maxThread]  Desired number of workers. Anything that
     *   is not a finite number >= 1 (missing, 0, negative, NaN, Infinity)
     *   falls back to 1 so the pool can never be empty. Fractions are rounded
     *   up, which matches the number of workers the countdown in `prepare`
     *   used to create for such values.
     */
    setOption(options, maxThread) {
        this.options = options;
        const requested = Math.ceil(Number(maxThread));
        this.maxClients = Number.isFinite(requested) && requested >= 1 ? requested : 1;
    }

    /**
     * Registers the listener that receives idle-state changes.
     *
     * @param {function(boolean): void} callback  Called with `true` when all
     *   workers are idle and `false` when at least one is busy.
     */
    setStateListener(callback) {
        this.idleCallback = callback;
    }

    /**
     * Restarts the named worker; unknown names are ignored.
     *
     * @param {string} tName  Worker name as reported by `checkThreads`.
     */
    restart(tName) {
        const worker = this.ftps[tName];
        // The type check also guards against names that resolve to inherited
        // Object.prototype members such as "constructor".
        if (worker && typeof worker.restart === 'function') {
            worker.restart();
        }
    }

    /**
     * Creates workers until the pool holds `maxClients` of them.
     */
    prepare() {
        const currLen = Object.keys(this.ftps).length;
        logger.info('准备创建上传线程:', this.maxClients, currLen);
        for (let missing = this.maxClients - currLen; missing > 0; missing--) {
            const name = "Thread-" + Math.random();
            this.ftps[name] = new UploadThread(this.options);
        }
    }

    /**
     * Destroys every worker, empties the pool and stops the state poller.
     */
    destroy() {
        for (const id of Object.keys(this.ftps)) {
            this.ftps[id].destroy();
            delete this.ftps[id];
        }
        if (this.loopId) {
            clearInterval(this.loopId);
            this.loopId = 0;
        }
    }

    /**
     * Poller body: notifies the listener when the combined idle state changes.
     */
    loop() {
        const curState = this.checkState();
        if (curState !== this.lastIdleState) {
            this.lastIdleState = curState;
            logger.info('当前状态:', curState);
            if (this.idleCallback) {
                this.idleCallback(curState);
            }
        }
    }

    /**
     * Combines the `idleState` of all workers.
     *
     * @returns {boolean} `true` when the pool is empty or every worker is
     *   idle; otherwise the first falsy `idleState` that was found.
     */
    checkState() {
        let idleState = true;
        for (const worker of Object.values(this.ftps)) {
            idleState = idleState && worker.idleState;
            if (!idleState) break;
        }
        return idleState;
    }

    /**
     * Reports the running state of every worker.
     *
     * @returns {Array<object>} One entry per worker with `name`, `idle`,
     *   `remain`, `hasError`, `totalTasks`, `curTaskPath`, `taskStartTime`
     *   and `progress` (percentage with two decimals; 0 for idle workers).
     */
    checkThreads() {
        const report = [];
        for (const [name, worker] of Object.entries(this.ftps)) {
            let done = 0;
            if (worker.totalTasks && !worker.idleState) {
                done = (worker.totalTasks - worker.getTaskLength()) / worker.totalTasks;
            }
            // Truncate (not round) to two decimals of a percentage.
            const progress = parseFloat((Math.floor(done * 10000) / 100).toFixed(2));
            report.push({
                name: name,
                idle: worker.idleState,
                remain: worker.getTaskLength(),
                hasError: worker.hasError,
                totalTasks: worker.totalTasks,
                curTaskPath: worker.curTaskPath,
                taskStartTime: worker.taskStartTime,
                progress: progress
            });
        }
        return report;
    }

    /**
     * Queues a file, or every file found under a directory, for upload and
     * dispatches the queue to the workers.
     *
     * Remote paths always use forward slashes: a target containing
     * backslashes would be stored by the FTP server as one flat file name
     * instead of a path inside directories.
     *
     * @param {string} source  Local file or directory.
     * @param {string} dst  Remote file path (for a file) or remote base
     *   directory (for a directory).
     * @throws {Error} Whatever `fs.statSync` throws, e.g. when `source`
     *   does not exist.
     */
    upload(source, dst) {
        const st = fs.statSync(source);
        if (st.isFile()) {
            // Same normalisation as the directory branch below.
            const remote = typeof dst === 'string' ? dst.replace(/\\/g, '/') : dst;
            this.queue.push({src: source, dst: remote});
        } else if (st.isDirectory()) {
            const files = FileUtil.listDirsAndFiles(source);
            // Offset of the path separator that follows `source` in each
            // listed file path; assumes the listed paths start with `source`.
            const startIndex = (source.endsWith('\\') || source.endsWith('/')) ? source.length - 1 : source.length;
            const remoteStr = (dst.endsWith('\\') || dst.endsWith('/')) ? dst.substring(0, dst.length - 1) : dst;
            for (const file of files.files) {
                this.queue.push({src: file, dst: (remoteStr + file.substring(startIndex)).replace(/\\/g, '/')});
            }
        }
        // Dispatching is synchronous today, but checkUpload() returns a
        // promise; make sure a failure is reported instead of becoming an
        // unhandled rejection.
        this.checkUpload().catch((err) => {
            logger.error('上传任务分发失败:', err);
        });
    }

    /**
     * Makes sure the worker pool exists and hands every queued task to a
     * randomly chosen worker.
     *
     * @returns {Promise<void>} Resolves once the queue has been dispatched.
     */
    async checkUpload() {
        // Prepare the upload workers.
        this.prepare();
        const workers = Object.values(this.ftps);
        if (workers.length === 0) {
            // Keep the tasks queued rather than dropping them on a crash.
            if (this.queue.length > 0) {
                logger.warn('没有可用的上传线程,任务保留在队列中:', this.queue.length);
            }
            return;
        }
        while (this.queue.length > 0) {
            const once = this.queue.shift();
            const index = Math.floor(Math.random() * workers.length);
            workers[index].addTask(once);
        }
    }
}
|
|
|
|
/**
 * One upload worker: owns a single FTP connection and uploads the tasks in
 * its queue one at a time, reconnecting and retrying when an upload fails.
 *
 * The worker loop (`run`) starts in the constructor and keeps polling until
 * `destroy()` is called. Adding a task to a destroyed worker revives it.
 */
class UploadThread {
    // Maximum time one upload may take before it is treated as failed.
    static UPLOAD_TIMEOUT_MS = 20000;
    // While in the error state, reconnect again after this long without progress.
    static RECONNECT_AFTER_MS = 60000;
    // Pause after a failed upload before the task is retried.
    static RETRY_DELAY_MS = 5000;
    // Polling interval while idle or waiting for the connection.
    static POLL_INTERVAL_MS = 1000;

    // Current FTP client.
    ftp = null;
    // Pending tasks; the head stays queued until it has been uploaded.
    queue = [];
    // True between the client's 'ready' event and its next close/end/error.
    ready = false;
    // Set by destroy(); makes the worker loop exit.
    stopped = false;
    // True while a run() loop is active; prevents starting a second one.
    running = false;
    // Connection options passed to ftp.connect().
    options = null;
    idleState = true;
    hasError = false;

    // Total number of tasks; reset to zero once all tasks have been uploaded.
    totalTasks = 0;
    // Shows the current task. NOTE(review): run() stores the whole task
    // object here, not just a path - confirm what consumers expect.
    curTaskPath = "";
    // Start time (ms since epoch) of the current task, used for timing.
    taskStartTime = 0;

    /**
     * @param {object} options  Connection options passed to `ftp.connect`.
     */
    constructor(options) {
        this.options = options;
        this.ftp = this.newConnection();
        this.run();
    }

    /**
     * Appends a task to this worker's queue.
     *
     * If the worker was destroyed it is revived: a fresh connection is
     * opened and the worker loop is started again unless the previous loop
     * has not exited yet. Without that, the new connection would be opened
     * but never used or closed.
     *
     * @param {{src: string, dst: string}} once  Local source and remote target.
     */
    addTask(once) {
        this.queue.push(once);
        this.totalTasks += 1;
        if (this.stopped) {
            this.stopped = false;
            this.ftp = this.newConnection();
            if (!this.running) {
                this.run();
            }
        }
    }

    /**
     * Creates a client, wires its events to the `ready` flag and starts
     * connecting with the stored options.
     *
     * @returns {object} The new, still connecting, client.
     */
    newConnection() {
        this.ready = false;
        const ftp = new Client();
        ftp.on('ready', () => {
            this.ready = true;
            if (this.hasError) {
                this.hasError = false;
                logger.success('连接恢复,当前剩余任务:', this.queue.length);
            }
        });
        ftp.on('close', () => {
            this.ready = false;
        });
        ftp.on('end', () => {
            this.ready = false;
        });
        ftp.on('error', (err) => {
            this.ready = false;
            // Report the cause instead of dropping it silently.
            logger.error('FTP连接异常:', err);
        });
        ftp.connect(this.options);
        return ftp;
    }

    /**
     * @returns {number} Number of tasks still queued, including the one in
     *   progress.
     */
    getTaskLength() {
        return this.queue.length;
    }

    /**
     * Stops the worker loop and destroys the connection.
     */
    destroy() {
        this.stopped = true;
        this.ftp.destroy();
        this.totalTasks = 0;
    }

    /**
     * @param {number} ms  Delay in milliseconds.
     * @returns {Promise<boolean>} Resolves with `true` after `ms`.
     */
    sleep(ms) {
        return new Promise(resolve => {
            setTimeout(resolve, ms, true);
        });
    }

    /**
     * Worker loop; keeps polling until `destroy()` has been called.
     *
     * @returns {Promise<void>} Resolves when the loop has exited.
     */
    async run() {
        this.running = true;
        try {
            while (!this.stopped) {
                if (this.hasError || !this.ready) {
                    // Not usable yet: wait, and reconnect again when the
                    // error state has lasted too long.
                    await this.sleep(UploadThread.POLL_INTERVAL_MS);
                    const now = Date.now();
                    if (this.hasError && now - this.taskStartTime > UploadThread.RECONNECT_AFTER_MS) {
                        this.taskStartTime = now;
                        this.ftp.destroy();
                        this.ftp = this.newConnection();
                        logger.log('线程异常超时,尝试重启线程');
                    }
                    continue;
                }
                if (this.queue.length <= 0) {
                    this.idleState = true;
                    this.totalTasks = 0;
                    await this.sleep(UploadThread.POLL_INTERVAL_MS);
                    continue;
                }
                this.idleState = false;
                // Peek only: the task is removed after a successful upload,
                // so a failed one is retried.
                const once = this.queue[0];
                this.curTaskPath = once;
                this.taskStartTime = Date.now();
                const err = await this.uploadOnce(this.ftp, once);
                if (!err) {
                    this.queue.shift();
                    this.curTaskPath = "";
                    continue;
                }
                logger.warn('上传失败,准备重试:', once.src);
                this.ftp.logout();
                this.ftp.destroy();
                // The failure may have been caused by the worker being
                // force-stopped; only reconnect when it was not.
                if (!this.stopped) {
                    this.hasError = true;
                    this.ftp = this.newConnection();
                    await this.sleep(UploadThread.RETRY_DELAY_MS);
                }
            }
            this.ftp.destroy();
        } finally {
            this.running = false;
        }
    }

    /**
     * Reads the local file and uploads it, giving up after
     * `UploadThread.UPLOAD_TIMEOUT_MS`.
     *
     * The timeout timer is cancelled as soon as the upload settles, so a
     * finished upload does not leave a pending timer behind.
     *
     * NOTE(review): a local read error is reported the same way as an upload
     * error, so `run` reconnects and retries such a task indefinitely.
     *
     * @param {object} ftp  Client used for the upload.
     * @param {{src: string, dst: string}} once  The task.
     * @returns {Promise<*>} Resolves (never rejects for I/O errors) with a
     *   falsy value on success, the read/upload error on failure, or `true`
     *   on timeout.
     */
    uploadOnce(ftp, once) {
        return new Promise((resolve) => {
            let settled = false;
            let timer = null;
            const finish = (result) => {
                if (settled) return;
                settled = true;
                clearTimeout(timer);
                resolve(result);
            };
            timer = setTimeout(finish, UploadThread.UPLOAD_TIMEOUT_MS, true);
            fs.readFile(once.src, (err, data) => {
                if (err) {
                    logger.error('文件读取异常:', once.src, err);
                    finish(err);
                    return;
                }
                ftp.put(data, once.dst, false, finish);
            });
        });
    }

    /**
     * Drops the current connection and opens a new one, marking the worker
     * as being in the error state until the new connection is ready.
     */
    restart() {
        this.hasError = true;
        this.ready = false;
        this.ftp.destroy();
        this.ftp = null;
        this.ftp = this.newConnection();
    }
}
|
|
|
|
module.exports = function () {
|
|
return new FileUploader();
|
|
} |