'use strict'; const fs = require('fs'); const FileUtil = require('./FileUtils'); const Client = require("ftp"); class FileUploader { ftps = {}; maxClients = 3; options = {}; queue = []; setOption(options, maxThread) { this.options = options; this.maxClients = maxThread || 1; } prepare() { const currLen = Object.keys(this.ftps).length; logger.info('准备创建上传线程:', this.maxClients, currLen); if (currLen < this.maxClients) { for (let i = this.maxClients - Object.keys(this.ftps).length; i > 0; i--) { let name = "Thread-" + Math.random(); this.ftps[name] = new UploadThread(this.options); } } } destroy() { for (let id in this.ftps) { this.ftps[id].destroy(); delete this.ftps[id]; } } /** * 上传文件、目录到ftp服务器。 * @param source * @param dst */ upload(source, dst) { const st = fs.statSync(source); if (st.isFile()) { this.queue.push({src: source, dst: dst}) } else if (st.isDirectory()) { const files = FileUtil.listDirsAndFiles(source); const startIndex = (source.endsWith('\\') || source.endsWith('/')) ? source.length - 1 : source.length; const remoteStr = (dst.endsWith('\\') || dst.endsWith('/')) ? dst.substring(0, dst.length - 1) : dst; for (let i of files.files) { //bugfix: 这里目标文件路径必须是 linux文件路径.否则ftp会做为普通文件直接上传,而不包含目录. this.queue.push({src: i, dst: (remoteStr + i.substring(startIndex)).replace(/\\/g, '/')}); } } this.checkUpload(); } async checkUpload() { // 准备上传线程. this.prepare(); let once = null; while ((once = this.queue.shift()) != null) { const readyFtps = Object.values(this.ftps); const index = Math.floor(Math.random() * readyFtps.length); readyFtps[index].addTask(once); } } } class UploadThread { ftp = null; queue = []; ready = false; stopped = false; options = null; constructor(options) { this.options = options; this.ftp = this.newConnection(); this.run(); } addTask(once) { this.queue.push(once); if (this.stopped) { this.ftp = this.newConnection(); } } newConnection() { this.ready = false; const ftp = new Client(); ftp.on('ready', () => { logger.log('ftp ready'); this.ready = true; }); ftp.on('close', () => { logger.log('ftp client has close'); this.ready = false; }); ftp.on('end', () => { logger.log('ftp client has end'); this.ready = false; }); ftp.on('error', (err) => { logger.log('ftp client has an error : ', JSON.stringify(err)); this.ready = false; }); ftp.connect(this.options); return ftp; } destroy() { this.stopped = true; } async sleep(ms) { return new Promise(resolve => { setTimeout(resolve, ms); }); } // 定时循环.直到destroy()执行. async run() { while (!this.stopped) { if (!this.ready || this.queue.length <= 0) { await this.sleep(500); continue; } const once = this.queue[0]; const err = await this.uploadOnce(this.ftp, once); if (!err) { this.queue.shift(); logger.info('上传完成:', once.src); continue; } else { logger.warn('上传失败,准备重试:', once.src); this.ftp.end(); // 上传失败有可能是线程强制结束导致的上传失败. if (!this.stopped) { this.ftp = this.newConnection(); } } } this.ftp.end(); } uploadOnce(ftp, once) { return new Promise(resolve => { ftp.put(once.src, once.dst, false, resolve) }); } } module.exports = function () { return new FileUploader(); }