187 lines
5.0 KiB
JavaScript
187 lines
5.0 KiB
JavaScript
'use strict';
|
|
const fs = require('fs');
|
|
const FileUtil = require('./FileUtils');
|
|
const Client = require("ftp");
|
|
|
|
/**
 * Distributes file upload tasks across a small pool of UploadThread workers
 * and reports transitions between "idle" and "busy" to an optional listener.
 *
 * A polling timer is started in the constructor; call destroy() to stop it
 * and to stop every worker.
 */
class FileUploader {
    /** Worker pool, keyed by a generated thread name. */
    ftps = {};

    /** Maximum number of workers kept in the pool. */
    maxClients = 3;

    /** Connection options handed unchanged to every new UploadThread. */
    options = {};

    /** Tasks ({src, dst}) collected by upload() and not yet given to a worker. */
    queue = [];

    /** Listener invoked with the new idle flag whenever the idle state changes. */
    idleCallback = null;

    /** Handle of the state-polling interval, or 0 when it is not running. */
    loopId = 0;

    /** Idle flag most recently reported to the listener. */
    lastIdleState = true;

    constructor() {
        this.loopId = setInterval(this.loop.bind(this), 200);
    }

    /**
     * Stores the connection options and the pool size.
     *
     * @param options connection options passed to each UploadThread.
     * @param maxThread desired number of workers; anything that is not a
     *     finite number >= 1 (after numeric coercion) falls back to 1.
     */
    setOption(options, maxThread) {
        this.options = options;
        // A zero, negative, NaN or infinite pool size would either leave the
        // pool empty or make prepare() spin forever, so clamp it here.
        const requested = Math.floor(Number(maxThread));
        this.maxClients = Number.isFinite(requested) && requested >= 1 ? requested : 1;
    }

    /**
     * Registers the idle-state listener.
     *
     * @param callback called as callback(isIdle) on every state change.
     */
    setStateListener(callback) {
        this.idleCallback = callback;
    }

    /** Creates workers until the pool holds maxClients of them. */
    prepare() {
        const currLen = Object.keys(this.ftps).length;
        logger.info('准备创建上传线程:', this.maxClients, currLen);
        for (let missing = this.maxClients - currLen; missing > 0; missing--) {
            const name = `Thread-${Math.random()}`;
            this.ftps[name] = new UploadThread(this.options);
        }
    }

    /** Stops every worker, empties the pool and cancels the polling timer. */
    destroy() {
        for (const [id, thread] of Object.entries(this.ftps)) {
            thread.destroy();
            delete this.ftps[id];
        }
        if (this.loopId) {
            clearInterval(this.loopId);
            this.loopId = 0;
        }
    }

    /** Timer tick: notifies the listener when the idle state has changed. */
    loop() {
        const curState = this.checkState();
        if (curState === this.lastIdleState) {
            return;
        }
        this.lastIdleState = curState;
        if (typeof this.idleCallback === 'function') {
            this.idleCallback(curState);
        }
    }

    /**
     * @returns {boolean} true when no task is waiting in this uploader's own
     *     queue and every worker reports itself idle.
     */
    checkState() {
        return this.queue.length === 0
            && Object.values(this.ftps).every((thread) => Boolean(thread.idleState));
    }

    /**
     * Queues a file, or every file found under a directory, for upload to the
     * FTP server and hands the queued tasks to the workers.
     *
     * @param source local file or directory path; fs.statSync throws if it
     *     cannot be stat'ed.
     * @param dst remote path (for a file) or remote base directory (for a
     *     directory).
     */
    upload(source, dst) {
        const st = fs.statSync(source);
        if (st.isFile()) {
            this.queue.push({ src: source, dst });
        } else if (st.isDirectory()) {
            const listing = FileUtil.listDirsAndFiles(source);
            const hasTrailingSeparator = (p) => p.endsWith('\\') || p.endsWith('/');
            // Length of the local prefix to strip; the separator that follows
            // it is kept so it can be appended to the remote base.
            const startIndex = hasTrailingSeparator(source) ? source.length - 1 : source.length;
            const remoteBase = hasTrailingSeparator(dst) ? dst.slice(0, -1) : dst;
            for (const file of listing.files) {
                // bugfix: the remote path must use Linux-style separators,
                // otherwise the FTP server stores it as a plain file name
                // without the directory part.
                const remotePath = (remoteBase + file.substring(startIndex)).replace(/\\/g, '/');
                this.queue.push({ src: file, dst: remotePath });
            }
        }
        // checkUpload() is async; handle its rejection here so a dispatch
        // failure is logged instead of surfacing as an unhandled rejection.
        this.checkUpload().catch((err) => {
            logger.warn('分发上传任务失败:', err);
        });
    }

    /**
     * Makes sure the worker pool exists, then moves every queued task to the
     * worker that currently has the fewest pending tasks.
     *
     * @returns {Promise<void>} resolves once all queued tasks are dispatched.
     */
    async checkUpload() {
        // Make sure the upload workers exist.
        this.prepare();
        const threads = Object.values(this.ftps);
        if (threads.length === 0) {
            // Keep the tasks queued rather than shifting them off and losing them.
            logger.warn('没有可用的上传线程,任务保留在队列中:', this.queue.length);
            return;
        }
        const pendingOf = (thread) => thread.queue?.length ?? 0;
        let once = null;
        while ((once = this.queue.shift()) != null) {
            let target = threads[0];
            for (const candidate of threads) {
                if (pendingOf(candidate) < pendingOf(target)) {
                    target = candidate;
                }
            }
            target.addTask(once);
        }
    }
}
|
|
|
|
/**
 * One upload worker: owns a single FTP client connection and uploads the
 * tasks in its queue one at a time, retrying a failed task on a fresh
 * connection until it succeeds or the worker is destroyed.
 */
class UploadThread {
    /** Delay in milliseconds between polls while there is nothing to do. */
    static POLL_INTERVAL_MS = 500;

    /** The client connection currently in use. */
    ftp = null;

    /** Pending tasks ({src, dst}); the head is the task being uploaded. */
    queue = [];

    /** True once the current connection has emitted 'ready'. */
    ready = false;

    /** True once the current connection has closed, ended or errored. */
    closed = false;

    /** Set by destroy(); makes the worker loop exit. */
    stopped = false;

    /** True while a worker loop is active, so at most one loop runs. */
    running = false;

    /** Connection options passed to Client#connect. */
    options = null;

    /** True when this worker has no pending task. */
    idleState = true;

    /**
     * @param options connection options passed to Client#connect.
     */
    constructor(options) {
        this.options = options;
        this.ftp = this.newConnection();
        this.start();
    }

    /**
     * Appends a task. If the worker was destroyed it is revived: a new
     * connection is opened and the worker loop is started again.
     *
     * @param once task object with `src` (local path) and `dst` (remote path).
     */
    addTask(once) {
        this.queue.push(once);
        // Report busy right away instead of waiting for the next poll.
        this.idleState = false;
        if (this.stopped) {
            this.stopped = false;
            this.reconnect();
            this.start();
        }
    }

    /**
     * Opens a new client connection, makes it the current one (this.ftp) and
     * wires its events to the ready/closed flags.
     *
     * @returns the new client.
     */
    newConnection() {
        this.ready = false;
        this.closed = false;
        const ftp = new Client();
        this.ftp = ftp;
        // Events from a connection that has since been replaced must not
        // touch the flags that describe the current connection.
        const isCurrent = () => this.ftp === ftp;
        const markUnusable = () => {
            if (isCurrent()) {
                this.ready = false;
                this.closed = true;
            }
        };
        ftp.on('ready', () => {
            if (isCurrent()) {
                this.ready = true;
            }
        });
        ftp.on('close', markUnusable);
        ftp.on('end', markUnusable);
        ftp.on('error', (err) => {
            logger.warn('FTP连接错误:', err);
            markUnusable();
        });
        ftp.connect(this.options);
        return ftp;
    }

    /** Ends the current connection and replaces it with a new one. */
    reconnect() {
        if (this.ftp) {
            this.ftp.end();
        }
        this.ftp = this.newConnection();
    }

    /** Asks the worker loop to exit; the loop ends the connection on its way out. */
    destroy() {
        this.stopped = true;
    }

    /**
     * @param ms delay in milliseconds.
     * @returns {Promise<void>} resolves after the delay.
     */
    async sleep(ms) {
        return new Promise((resolve) => {
            setTimeout(resolve, ms);
        });
    }

    /** Launches the worker loop and logs any error that escapes it. */
    start() {
        this.run().catch((err) => {
            logger.warn('上传线程异常退出:', err);
        });
    }

    /**
     * Worker loop; keeps polling until destroy() is called.
     *
     * Returns immediately when a loop is already active.
     */
    async run() {
        if (this.running) {
            return;
        }
        this.running = true;
        try {
            while (!this.stopped) {
                if (this.queue.length === 0) {
                    this.idleState = true;
                    await this.sleep(UploadThread.POLL_INTERVAL_MS);
                    continue;
                }
                // Work is pending, so the worker is busy even while it is
                // still waiting for the connection to become ready.
                this.idleState = false;
                if (!this.ready) {
                    if (this.closed) {
                        // The connection is gone; without a new one the
                        // queued tasks would wait forever.
                        this.reconnect();
                    }
                    await this.sleep(UploadThread.POLL_INTERVAL_MS);
                    continue;
                }
                const once = this.queue[0];
                const err = await this.uploadOnce(this.ftp, once);
                if (!err) {
                    this.queue.shift();
                    logger.info('上传完成:', once.src);
                    continue;
                }
                logger.warn('上传失败,准备重试:', once.src);
                // The failure may have been caused by the worker being
                // stopped; in that case do not open another connection.
                if (!this.stopped) {
                    this.reconnect();
                }
            }
        } finally {
            this.running = false;
            this.ftp.end();
        }
    }

    /**
     * Uploads one task over the given client.
     *
     * @param ftp client used for the transfer.
     * @param once task object with `src` and `dst`.
     * @returns {Promise<*>} resolves with the error reported by `put` (or
     *     thrown by it), or with a falsy value on success; never rejects.
     */
    uploadOnce(ftp, once) {
        return new Promise((resolve) => {
            try {
                ftp.put(once.src, once.dst, false, resolve);
            } catch (err) {
                resolve(err);
            }
        });
    }
}
|
|
|
|
module.exports = function () {
|
|
return new FileUploader();
|
|
} |