From 502565dcea95dd742245723aa4f6e229ef717c62 Mon Sep 17 00:00:00 2001 From: Luca Armaroli Date: Thu, 24 Sep 2020 02:36:00 +0200 Subject: [PATCH] better webSocket initialization It should solve all timing issues --- src/DownloadManager.ts | 88 +++++++++++++++++++++++++++++------------- src/Errors.ts | 11 +++--- src/destreamer.ts | 36 +++++++---------- 3 files changed, 80 insertions(+), 55 deletions(-) diff --git a/src/DownloadManager.ts b/src/DownloadManager.ts index 3c7d206..3b3c0fa 100644 --- a/src/DownloadManager.ts +++ b/src/DownloadManager.ts @@ -1,3 +1,4 @@ +import { ERROR_CODE } from './Errors'; import { logger } from './Logger'; import cliProgress from 'cli-progress'; @@ -5,7 +6,8 @@ import WebSocket from 'ws'; export class DownloadManager { - private webSocket: WebSocket; + private webSocket!: WebSocket; + private connected: boolean; // TODO: there's a "not a tty" mode for progresBar // NOTE: is there a way to fix the ETA? Can't get size nor ETA from aria that I can see // we initialize this for each download @@ -14,8 +16,8 @@ export class DownloadManager { private queue: Set; private index: number; - public constructor(port: number) { - this.webSocket = new WebSocket(`http://localhost:${port}/jsonrpc`); + public constructor() { + this.connected = false; this.completed = 0; this.queue = new Set(); this.index = 1; @@ -28,7 +30,63 @@ export class DownloadManager { 'Please use PowerShell or cmd.exe to run destreamer on Windows.' ); } + } + /** + * MUST BE CALLED BEFORE ANY OTHER OPERATION + * + * Wait for an established connection between the webSocket + * and Aria2c with a 10s timeout. + * Then send aria2c the global config option if specified. + */ + public async init(port: number, options?: {[option: string]: string}): Promise { + let socTries = 0; + const maxTries = 10; + let timer = 0; + const waitTime = 20; + + const errorHanlder = async (err: WebSocket.ErrorEvent): Promise => { + // we try for 10 sec to initialize a socket on the specified port + if (err.error.code === 'ECONNREFUSED' && socTries < maxTries) { + logger.debug(`[DownloadMangaer] trying webSocket init ${socTries}/${maxTries}`); + await new Promise(r => setTimeout(r, 1000)); + + this.webSocket = new WebSocket(`http://localhost:${port}/jsonrpc`); + this.webSocket.onerror = errorHanlder; + this.webSocket.onopen = openHandler; + socTries++; + } + else { + logger.error(err); + process.exit(ERROR_CODE.NO_CONNECT_ARIA2C); + } + }; + + const openHandler = (event: WebSocket.OpenEvent): void => { + this.connected = true; + logger.debug(`[DownloadMangaer] open event recived ${event}`); + logger.info('Connected to aria2 daemon!'); + }; + + // create webSocket + // FIXME: implement 'onopen' event + this.webSocket = new WebSocket(`http://localhost:${port}/jsonrpc`); + this.webSocket.onerror = errorHanlder; + this.webSocket.onopen = openHandler; + + + // wait for socket connection + while (!this.connected) { + if (timer < waitTime) { + timer++; + await new Promise(r => setTimeout(r, 1000)); + } + else { + process.exit(ERROR_CODE.NO_CONNECT_ARIA2C); + } + } + + // setup messages handling this.webSocket.on('message', (data: WebSocket.Data) => { const parsed = JSON.parse(data.toString()); @@ -42,30 +100,6 @@ export class DownloadManager { logger.info('[INCOMING] \n' + JSON.stringify(parsed, null, 4) + '\n\n'); } }); - } - - /** - * MUST BE CALLED BEFORE ANY OTHER OPERATION - * - * Wait for an established connection between the webSocket - * and Aria2c with a 10s timeout. - * Then send aria2c the global config option if specified. - */ - public async init(options?: {[option: string]: string}): Promise { - let tries = 0; - const waitSec = 10; - - while (this.webSocket.readyState !== this.webSocket.OPEN) { - if (tries < waitSec) { - tries++; - logger.debug(`[DownloadMangaer] Trying to connect to aria deamon ${tries}/${waitSec}`); - await new Promise(r => setTimeout(r, 1000)); - } - else { - throw new Error(); - } - } - logger.info('Connected! \n'); if (options) { logger.info('Now trying to send configs...'); diff --git a/src/Errors.ts b/src/Errors.ts index 5b91c90..3c9a842 100644 --- a/src/Errors.ts +++ b/src/Errors.ts @@ -1,5 +1,3 @@ -import { error } from "winston"; - /* let's start our error codes up high so we don't exit with the wrong message if other modules exit with some code */ export const enum ERROR_CODE { @@ -19,10 +17,11 @@ export const enum ERROR_CODE { export const errors: {[key: number]: string} = { - [ERROR_CODE.UNHANDLED_ERROR]: 'Unhandled error!\n' + - 'Timeout or fatal error, please check your download directory/directories and try again', + [ERROR_CODE.UNHANDLED_ERROR]: 'Unhandled error or uncaught exception! \n' + + 'Please check your download directory/directories and try again. \n' + + 'If this keep happening please report it on github "https://github.com/snobu/destreamer/issues"', - [ERROR_CODE.ELEVATED_SHELL]: 'Destreamer cannot run in an elevated (Administrator/root) shell.\n' + + [ERROR_CODE.ELEVATED_SHELL]: 'Destreamer cannot run in an elevated (Administrator/root) shell. \n' + 'Please run in a regular, non-elevated window.', [ERROR_CODE.CANCELLED_USER_INPUT]: 'Input was cancelled by user', @@ -41,7 +40,7 @@ export const errors: {[key: number]: string} = { [ERROR_CODE.ARIA2C_CRASH]: 'The aria2c rpc server crashed with the previous message', - [ERROR_CODE.NO_CONNECT_ARIA2C]: 'Could not connect to Aria2c json-rpc webSocket', + [ERROR_CODE.NO_CONNECT_ARIA2C]: 'Could not connect to Aria2c json-rpc webSocket before timeout!', [ERROR_CODE.NO_DAEMON_PORT]: 'Could not get a free port to use' }; diff --git a/src/destreamer.ts b/src/destreamer.ts index 1472cff..eb210b6 100644 --- a/src/destreamer.ts +++ b/src/destreamer.ts @@ -24,6 +24,7 @@ import tmp from 'tmp'; // TODO: can we create an export or something for this? const m3u8Parser: any = require('m3u8-parser'); const tokenCache: TokenCache = new TokenCache(); +const downloadManager = new DownloadManager(); export const chromeCacheFolder = '.chrome_data'; tmp.setGracefulCleanup(); @@ -152,12 +153,14 @@ async function downloadVideo(videoGUIDs: Array, logger.info('Trying to launch and connect to aria2c...\n'); - // FIXME: see issue with downloadManager below + + /* FIXME: aria2Exec must be defined here for the scope but if it's not aslo undefined it says + that later on is used without being initialized even if we exit if it's not initialized. + Is there something that im missing? Probably since it's late but Ill leave it to you Adrian*/ let aria2cExec: ChildProcess | undefined; let arai2cExited = false; - let downloadManager: DownloadManager | undefined; - await portfinder.getPortPromise().then( - port => { + await portfinder.getPortPromise({ port: 6800 }).then( + async port => { logger.debug(`[DESTREAMER] Trying to use port ${port}`); // Launch aria2c aria2cExec = exec( @@ -172,27 +175,16 @@ async function downloadVideo(videoGUIDs: Array, } } ); - // bind webSocket - downloadManager = new DownloadManager(port); + // init webSocket + await downloadManager.init(port); + // We are connected }, error => { logger.error(error); - process.exit(ERROR_CODE.NO_DEAMON_PORT); + process.exit(ERROR_CODE.NO_DAEMON_PORT); } ); - // Try to connect to aria2c webSocket - /* FIXME: why does ts not recognize that if we reach here downloadManager is defined - and it forces me to define it as undefined (pun intended) and then check with Optional Chaining - Is there something that im missing? Probably since it's late but Ill leave it to you Adrian*/ - try { - await (downloadManager?.init() ?? process.exit(555)); - } - catch (err) { - process.exit(ERROR_CODE.NO_CONNECT_ARIA2C); - } - // We are connected - for (const video of videos) { const masterParser = new m3u8Parser.Parser(); @@ -264,7 +256,7 @@ async function downloadVideo(videoGUIDs: Array, }); logger.info('\nDownloading video segments \n'); - await downloadManager?.downloadUrls(videoUrls, videoSegmentsDir.name); + await downloadManager.downloadUrls(videoUrls, videoSegmentsDir.name); // audio download const audioSegmentsDir = tmp.dirSync({ @@ -274,7 +266,7 @@ async function downloadVideo(videoGUIDs: Array, }); logger.info('\nDownloading audio segments \n'); - await downloadManager?.downloadUrls(audioUrls, audioSegmentsDir.name); + await downloadManager.downloadUrls(audioUrls, audioSegmentsDir.name); // subs download if (argv.closedCaptions && video.captionsUrl) { @@ -340,7 +332,7 @@ async function downloadVideo(videoGUIDs: Array, logger.info('Exiting, this will take some seconds...'); logger.debug('[destreamer] closing downloader socket'); - await downloadManager?.close(); + await downloadManager.close(); logger.debug('[destreamer] closed downloader. Waiting aria2c deamon exit'); let tries = 0; while (!arai2cExited) {