1
0
mirror of https://github.com/snobu/destreamer.git synced 2026-01-24 08:52:18 +00:00

better webSocket initialization

It should solve all timing issues
This commit is contained in:
Luca Armaroli
2020-09-24 02:36:00 +02:00
parent c7e0415786
commit 502565dcea
3 changed files with 80 additions and 55 deletions

View File

@@ -1,3 +1,4 @@
import { ERROR_CODE } from './Errors';
import { logger } from './Logger';
import cliProgress from 'cli-progress';
@@ -5,7 +6,8 @@ import WebSocket from 'ws';
export class DownloadManager {
private webSocket: WebSocket;
private webSocket!: WebSocket;
private connected: boolean;
// TODO: there's a "not a tty" mode for progresBar
// NOTE: is there a way to fix the ETA? Can't get size nor ETA from aria that I can see
// we initialize this for each download
@@ -14,8 +16,8 @@ export class DownloadManager {
private queue: Set<string>;
private index: number;
public constructor(port: number) {
this.webSocket = new WebSocket(`http://localhost:${port}/jsonrpc`);
public constructor() {
this.connected = false;
this.completed = 0;
this.queue = new Set<string>();
this.index = 1;
@@ -28,7 +30,63 @@ export class DownloadManager {
'Please use PowerShell or cmd.exe to run destreamer on Windows.'
);
}
}
/**
* MUST BE CALLED BEFORE ANY OTHER OPERATION
*
* Wait for an established connection between the webSocket
* and Aria2c with a 10s timeout.
* Then send aria2c the global config option if specified.
*/
public async init(port: number, options?: {[option: string]: string}): Promise<void> {
let socTries = 0;
const maxTries = 10;
let timer = 0;
const waitTime = 20;
const errorHanlder = async (err: WebSocket.ErrorEvent): Promise<void> => {
// we try for 10 sec to initialize a socket on the specified port
if (err.error.code === 'ECONNREFUSED' && socTries < maxTries) {
logger.debug(`[DownloadMangaer] trying webSocket init ${socTries}/${maxTries}`);
await new Promise(r => setTimeout(r, 1000));
this.webSocket = new WebSocket(`http://localhost:${port}/jsonrpc`);
this.webSocket.onerror = errorHanlder;
this.webSocket.onopen = openHandler;
socTries++;
}
else {
logger.error(err);
process.exit(ERROR_CODE.NO_CONNECT_ARIA2C);
}
};
const openHandler = (event: WebSocket.OpenEvent): void => {
this.connected = true;
logger.debug(`[DownloadMangaer] open event recived ${event}`);
logger.info('Connected to aria2 daemon!');
};
// create webSocket
// FIXME: implement 'onopen' event
this.webSocket = new WebSocket(`http://localhost:${port}/jsonrpc`);
this.webSocket.onerror = errorHanlder;
this.webSocket.onopen = openHandler;
// wait for socket connection
while (!this.connected) {
if (timer < waitTime) {
timer++;
await new Promise(r => setTimeout(r, 1000));
}
else {
process.exit(ERROR_CODE.NO_CONNECT_ARIA2C);
}
}
// setup messages handling
this.webSocket.on('message', (data: WebSocket.Data) => {
const parsed = JSON.parse(data.toString());
@@ -42,30 +100,6 @@ export class DownloadManager {
logger.info('[INCOMING] \n' + JSON.stringify(parsed, null, 4) + '\n\n');
}
});
}
/**
* MUST BE CALLED BEFORE ANY OTHER OPERATION
*
* Wait for an established connection between the webSocket
* and Aria2c with a 10s timeout.
* Then send aria2c the global config option if specified.
*/
public async init(options?: {[option: string]: string}): Promise<void> {
let tries = 0;
const waitSec = 10;
while (this.webSocket.readyState !== this.webSocket.OPEN) {
if (tries < waitSec) {
tries++;
logger.debug(`[DownloadMangaer] Trying to connect to aria deamon ${tries}/${waitSec}`);
await new Promise(r => setTimeout(r, 1000));
}
else {
throw new Error();
}
}
logger.info('Connected! \n');
if (options) {
logger.info('Now trying to send configs...');

View File

@@ -1,5 +1,3 @@
import { error } from "winston";
/* let's start our error codes up high so we
don't exit with the wrong message if other modules exit with some code */
export const enum ERROR_CODE {
@@ -19,10 +17,11 @@ export const enum ERROR_CODE {
export const errors: {[key: number]: string} = {
[ERROR_CODE.UNHANDLED_ERROR]: 'Unhandled error!\n' +
'Timeout or fatal error, please check your download directory/directories and try again',
[ERROR_CODE.UNHANDLED_ERROR]: 'Unhandled error or uncaught exception! \n' +
'Please check your download directory/directories and try again. \n' +
'If this keep happening please report it on github "https://github.com/snobu/destreamer/issues"',
[ERROR_CODE.ELEVATED_SHELL]: 'Destreamer cannot run in an elevated (Administrator/root) shell.\n' +
[ERROR_CODE.ELEVATED_SHELL]: 'Destreamer cannot run in an elevated (Administrator/root) shell. \n' +
'Please run in a regular, non-elevated window.',
[ERROR_CODE.CANCELLED_USER_INPUT]: 'Input was cancelled by user',
@@ -41,7 +40,7 @@ export const errors: {[key: number]: string} = {
[ERROR_CODE.ARIA2C_CRASH]: 'The aria2c rpc server crashed with the previous message',
[ERROR_CODE.NO_CONNECT_ARIA2C]: 'Could not connect to Aria2c json-rpc webSocket',
[ERROR_CODE.NO_CONNECT_ARIA2C]: 'Could not connect to Aria2c json-rpc webSocket before timeout!',
[ERROR_CODE.NO_DAEMON_PORT]: 'Could not get a free port to use'
};

View File

@@ -24,6 +24,7 @@ import tmp from 'tmp';
// TODO: can we create an export or something for this?
const m3u8Parser: any = require('m3u8-parser');
const tokenCache: TokenCache = new TokenCache();
const downloadManager = new DownloadManager();
export const chromeCacheFolder = '.chrome_data';
tmp.setGracefulCleanup();
@@ -152,12 +153,14 @@ async function downloadVideo(videoGUIDs: Array<string>,
logger.info('Trying to launch and connect to aria2c...\n');
// FIXME: see issue with downloadManager below
/* FIXME: aria2Exec must be defined here for the scope but if it's not aslo undefined it says
that later on is used without being initialized even if we exit if it's not initialized.
Is there something that im missing? Probably since it's late but Ill leave it to you Adrian*/
let aria2cExec: ChildProcess | undefined;
let arai2cExited = false;
let downloadManager: DownloadManager | undefined;
await portfinder.getPortPromise().then(
port => {
await portfinder.getPortPromise({ port: 6800 }).then(
async port => {
logger.debug(`[DESTREAMER] Trying to use port ${port}`);
// Launch aria2c
aria2cExec = exec(
@@ -172,27 +175,16 @@ async function downloadVideo(videoGUIDs: Array<string>,
}
}
);
// bind webSocket
downloadManager = new DownloadManager(port);
// init webSocket
await downloadManager.init(port);
// We are connected
},
error => {
logger.error(error);
process.exit(ERROR_CODE.NO_DEAMON_PORT);
process.exit(ERROR_CODE.NO_DAEMON_PORT);
}
);
// Try to connect to aria2c webSocket
/* FIXME: why does ts not recognize that if we reach here downloadManager is defined
and it forces me to define it as undefined (pun intended) and then check with Optional Chaining
Is there something that im missing? Probably since it's late but Ill leave it to you Adrian*/
try {
await (downloadManager?.init() ?? process.exit(555));
}
catch (err) {
process.exit(ERROR_CODE.NO_CONNECT_ARIA2C);
}
// We are connected
for (const video of videos) {
const masterParser = new m3u8Parser.Parser();
@@ -264,7 +256,7 @@ async function downloadVideo(videoGUIDs: Array<string>,
});
logger.info('\nDownloading video segments \n');
await downloadManager?.downloadUrls(videoUrls, videoSegmentsDir.name);
await downloadManager.downloadUrls(videoUrls, videoSegmentsDir.name);
// audio download
const audioSegmentsDir = tmp.dirSync({
@@ -274,7 +266,7 @@ async function downloadVideo(videoGUIDs: Array<string>,
});
logger.info('\nDownloading audio segments \n');
await downloadManager?.downloadUrls(audioUrls, audioSegmentsDir.name);
await downloadManager.downloadUrls(audioUrls, audioSegmentsDir.name);
// subs download
if (argv.closedCaptions && video.captionsUrl) {
@@ -340,7 +332,7 @@ async function downloadVideo(videoGUIDs: Array<string>,
logger.info('Exiting, this will take some seconds...');
logger.debug('[destreamer] closing downloader socket');
await downloadManager?.close();
await downloadManager.close();
logger.debug('[destreamer] closed downloader. Waiting aria2c deamon exit');
let tries = 0;
while (!arai2cExited) {