'use strict'; const fs = require('fs'); const FileUtil = require('./FileUtils'); const Client = require("ftp"); const logger = require('./logger'); class FileUploader { ftps = {}; maxClients = 3; options = {}; queue = []; idleCallback = null; loopId = 0; lastIdleState = true; constructor() { this.loopId = setInterval(this.loop.bind(this), 1000); } setOption(options, maxThread) { this.options = options; this.maxClients = maxThread || 1; } setStateListener(callback) { this.idleCallback = callback; } restart(tName) { this.ftps[tName] && this.ftps[tName].restart(); } prepare() { const currLen = Object.keys(this.ftps).length; logger.info('准备创建上传线程:', this.maxClients, currLen); if (currLen < this.maxClients) { for (let i = this.maxClients - Object.keys(this.ftps).length; i > 0; i--) { let name = "Thread-" + Math.random(); this.ftps[name] = new UploadThread(this.options); } } } destroy() { for (let id in this.ftps) { this.ftps[id].destroy(); delete this.ftps[id]; } if (this.loopId) { clearInterval(this.loopId) this.loopId = 0; } } // event loop. loop() { const curState = this.checkState(); if (curState != this.lastIdleState) { this.lastIdleState = curState; logger.info('当前状态:', curState); this.idleCallback && this.idleCallback(curState); } } checkState() { let idleState = true; for (let i in this.ftps) { idleState = idleState && this.ftps[i].idleState; if (!idleState) break; } return idleState; } // 检测所有线程运行状态 checkThreads() { let str = []; for (let i in this.ftps) { let p = 0; if (this.ftps[i].totalTasks && !this.ftps[i].idleState) { p = (this.ftps[i].totalTasks - this.ftps[i].queue.length) / this.ftps[i].totalTasks; } p = parseFloat((Math.floor(p * 10000) / 100).toFixed(2)); str.push({ name: i, idle: this.ftps[i].idleState, remain: this.ftps[i].getTaskLength(), hasError: this.ftps[i].hasError, totalTasks: this.ftps[i].totalTasks, curTaskPath: this.ftps[i].curTaskPath, taskStartTime: this.ftps[i].taskStartTime, progress: p }); } return str; } /** * 上传文件、目录到ftp服务器。 * @param source * @param dst */ upload(source, dst) { const st = fs.statSync(source); if (st.isFile()) { this.queue.push({src: source, dst: dst}) } else if (st.isDirectory()) { const files = FileUtil.listDirsAndFiles(source); const startIndex = (source.endsWith('\\') || source.endsWith('/')) ? source.length - 1 : source.length; const remoteStr = (dst.endsWith('\\') || dst.endsWith('/')) ? dst.substring(0, dst.length - 1) : dst; for (let i of files.files) { //bugfix: 这里目标文件路径必须是 linux文件路径.否则ftp会做为普通文件直接上传,而不包含目录. this.queue.push({src: i, dst: (remoteStr + i.substring(startIndex)).replace(/\\/g, '/')}); } } logger.info('待上传文件数量:', this.queue.length); this.checkUpload(); } async checkUpload() { // 准备上传线程. this.prepare(); let once = null; while ((once = this.queue.shift()) != null) { const readyFtps = Object.values(this.ftps); const index = Math.floor(Math.random() * readyFtps.length); readyFtps[index].addTask(once); } } } class UploadThread { ftp = null; queue = []; ready = false; stopped = false; options = null; idleState = true; hasError = false; // 总任务数量, 所有任务上传完成时,total清零. totalTasks = 0; curTaskPath = "";// 显示当前任务. taskStartTime = 0; // 显示任务用时. constructor(options) { this.options = options; this.ftp = this.newConnection(); this.run(); } addTask(once) { this.queue.push(once); if (this.stopped) { this.ftp = this.newConnection(); } this.totalTasks += 1; } newConnection() { this.ready = false; const ftp = new Client(); ftp.on('ready', () => { this.ready = true; if (this.hasError) { this.hasError = false; logger.success('连接恢复,当前剩余任务:', this.queue.length); } }); ftp.on('close', () => { this.ready = false; }); ftp.on('end', () => { this.ready = false; }); ftp.on('error', (err) => { this.ready = false; }); ftp.connect(this.options); return ftp; } getTaskLength() { return this.queue.length; } destroy() { this.stopped = true; this.ftp.destroy(); this.totalTasks = 0; } sleep(ms) { return new Promise(resolve => { setTimeout(resolve, ms, true); }); } // 定时循环.直到destroy()执行. async run() { while (!this.stopped) { if (this.hasError || !this.ready) { await this.sleep(1000); let ts = new Date().getTime(); if (this.hasError && ts - this.taskStartTime > 60000) { this.taskStartTime = ts; this.ftp.destroy(); this.ftp = this.newConnection(); logger.log('线程异常超时,尝试重启线程'); } continue; } if (this.queue.length <= 0) { this.idleState = true; this.totalTasks = 0; await this.sleep(1000); continue; } this.idleState = false; const once = this.queue[0]; this.curTaskPath = once; this.taskStartTime = new Date().getTime(); const err = await this.uploadOnce(this.ftp, once); if (!err) { this.queue.shift(); this.curTaskPath = ""; continue; } else { logger.warn('上传失败,准备重试:', once.src); this.ftp.logout(); this.ftp.destroy(); // 上传失败有可能是线程强制结束导致的上传失败. if (!this.stopped) { this.hasError = true; this.ftp = this.newConnection(); await this.sleep(5000); } } } this.ftp.destroy(); } uploadOnce(ftp, once) { return Promise.race([new Promise(function (resolve) { fs.readFile(once.src, (err, data) => { if (err) { logger.error('文件读取异常:', once.src, err); resolve(err) return } else { ftp.put(data, once.dst, false, resolve) } }); }), this.sleep(20000)]); } restart() { this.hasError = true; this.ready = false; this.ftp.destroy(); this.ftp = null; this.ftp = this.newConnection(); } } module.exports = function () { return new FileUploader(); }