From a1760ecbb780dc17a7675bd60fb50aa1103fa961 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Fri, 10 Jan 2020 06:35:30 -0500 Subject: added hierarchical structure to session --- src/server/session/agents/applied_session_agent.ts | 55 ++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 src/server/session/agents/applied_session_agent.ts (limited to 'src/server/session/agents/applied_session_agent.ts') diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts new file mode 100644 index 000000000..cb7f63c34 --- /dev/null +++ b/src/server/session/agents/applied_session_agent.ts @@ -0,0 +1,55 @@ +import { isMaster } from "cluster"; +import { Monitor } from "./monitor"; +import { ServerWorker } from "./server_worker"; + +export type ExitHandler = (reason: Error | boolean) => void | Promise; + +export abstract class AppliedSessionAgent { + + // the following two methods allow the developer to create a custom + // session and use the built in customization options for each thread + protected abstract async launchMonitor(): Promise; + protected abstract async launchServerWorker(): Promise; + + private launched = false; + + public killSession = (reason: string, graceful = true, errorCode = 0) => { + const target = isMaster ? this.sessionMonitor : this.serverWorker; + target.killSession(reason, graceful, errorCode); + } + + private sessionMonitorRef: Monitor | undefined; + public get sessionMonitor(): Monitor { + if (!isMaster) { + this.serverWorker.sendMonitorAction("kill", { + graceful: false, + reason: "Cannot access the session monitor directly from the server worker thread.", + errorCode: 1 + }); + throw new Error(); + } + return this.sessionMonitorRef!; + } + + private serverWorkerRef: ServerWorker | undefined; + public get serverWorker(): ServerWorker { + if (isMaster) { + throw new Error("Cannot access the server worker directly from the session monitor thread"); + } + return this.serverWorkerRef!; + } + + public async launch(): Promise { + if (!this.launched) { + this.launched = true; + if (isMaster) { + this.sessionMonitorRef = await this.launchMonitor(); + } else { + this.serverWorkerRef = await this.launchServerWorker(); + } + } else { + throw new Error("Cannot launch a session thread more than once per process."); + } + } + +} \ No newline at end of file -- cgit v1.2.3-70-g09d2 From 109abe78646c94903ef423aeb7db213087c4b92d Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Fri, 10 Jan 2020 13:57:21 -0500 Subject: event emitter, streamlined initialization --- src/server/DashSession/DashSessionAgent.ts | 213 +++++++++++++++++++++ src/server/DashSession/crash_instructions.txt | 14 ++ .../DashSession/remote_debug_instructions.txt | 16 ++ src/server/DashSessionAgent.ts | 168 ---------------- src/server/index.ts | 2 +- src/server/remote_debug_instructions.txt | 16 -- src/server/repl.ts | 128 ------------- src/server/session/agents/applied_session_agent.ts | 9 +- src/server/session/agents/monitor.ts | 84 ++++---- src/server/session/agents/server_worker.ts | 4 + src/server/session/utilities/repl.ts | 128 +++++++++++++ 11 files changed, 418 insertions(+), 364 deletions(-) create mode 100644 src/server/DashSession/DashSessionAgent.ts create mode 100644 src/server/DashSession/crash_instructions.txt create mode 100644 src/server/DashSession/remote_debug_instructions.txt delete mode 100644 src/server/DashSessionAgent.ts delete mode 100644 src/server/remote_debug_instructions.txt delete mode 100644 src/server/repl.ts create mode 100644 src/server/session/utilities/repl.ts (limited to 'src/server/session/agents/applied_session_agent.ts') diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts new file mode 100644 index 000000000..b031c177e --- /dev/null +++ b/src/server/DashSession/DashSessionAgent.ts @@ -0,0 +1,213 @@ +import { Email, pathFromRoot } from "../ActionUtilities"; +import { red, yellow, green, cyan } from "colors"; +import { get } from "request-promise"; +import { Utils } from "../../Utils"; +import { WebSocket } from "../Websocket/Websocket"; +import { MessageStore } from "../Message"; +import { launchServer, onWindows } from ".."; +import { existsSync, mkdirSync, readdirSync, statSync, createWriteStream, readFileSync } from "fs"; +import * as Archiver from "archiver"; +import { resolve } from "path"; +import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; +import { Monitor } from "../session/agents/monitor"; +import { ServerWorker } from "../session/agents/server_worker"; + +/** + * If we're the monitor (master) thread, we should launch the monitor logic for the session. + * Otherwise, we must be on a worker thread that was spawned *by* the monitor (master) thread, and thus + * our job should be to run the server. + */ +export class DashSessionAgent extends AppliedSessionAgent { + + private readonly notificationRecipients = ["brownptcdash@gmail.com"]; + private readonly signature = "-Dash Server Session Manager"; + private readonly releaseDesktop = pathFromRoot("../../Desktop"); + + /** + * The core method invoked when the single master thread is initialized. + * Installs event hooks, repl commands and additional IPC listeners. + */ + protected async initializeMonitor(monitor: Monitor) { + monitor.addReplCommand("pull", [], () => monitor.exec("git pull")); + monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); + monitor.addReplCommand("backup", [], this.backup); + monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.addServerMessageListener("backup", this.backup); + monitor.addServerMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.on(Monitor.IntrinsicEvents.KeyGenerated, this.dispatchSessionPassword); + monitor.on(Monitor.IntrinsicEvents.CrashDetected, this.dispatchCrashReport); + } + + /** + * The core method invoked when a server worker thread is initialized. + * Installs logic to be executed when the server worker dies. + */ + protected async initializeServerWorker() { + const worker = ServerWorker.Create(launchServer); // server initialization delegated to worker + worker.addExitHandler(this.notifyClient); + return worker; + } + + /** + * Prepares the body of the email with instructions on restoring the transmitted remote database backup locally. + */ + private _remoteDebugInstructions: string | undefined; + private generateDebugInstructions = (zipName: string, target: string) => { + if (!this._remoteDebugInstructions) { + this._remoteDebugInstructions = readFileSync(resolve(__dirname, "./remote_debug_instructions.txt"), { encoding: "utf8" }); + } + return this._remoteDebugInstructions + .replace(/__zipname__/, zipName) + .replace(/__target__/, target) + .replace(/__signature__/, this.signature); + } + + /** + * Prepares the body of the email with information regarding a crash event. + */ + private _crashInstructions: string | undefined; + private generateCrashInstructions({ name, message, stack }: Error) { + if (!this._crashInstructions) { + this._crashInstructions = readFileSync(resolve(__dirname, "./crash_instructions.txt"), { encoding: "utf8" }); + } + return this._crashInstructions + .replace(/__name__/, name || "[no error name found]") + .replace(/__message__/, message || "[no error message found]") + .replace(/__stack__/, stack || "[no error stack found]") + .replace(/__signature__/, this.signature); + } + + /** + * This sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone + * to kill the server via the /kill/:key route. + */ + private dispatchSessionPassword = async (key: string) => { + const { mainLog } = this.sessionMonitor; + mainLog(green("dispatching session key...")); + const failures = await Email.dispatchAll({ + to: this.notificationRecipients, + subject: "Dash Release Session Admin Authentication Key", + content: `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${this.signature}` + }); + if (failures) { + failures.map(({ recipient, error: { message } }) => this.sessionMonitor.mainLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); + mainLog(red("distribution of session key experienced errors")); + } else { + mainLog(green("successfully distributed session key to recipients")); + } + } + + /** + * This sends an email with the generated crash report. + */ + private dispatchCrashReport = async (crashCause: Error) => { + const { mainLog } = this.sessionMonitor; + const failures = await Email.dispatchAll({ + to: this.notificationRecipients, + subject: "Dash Web Server Crash", + content: this.generateCrashInstructions(crashCause) + }); + if (failures) { + failures.map(({ recipient, error: { message } }) => this.sessionMonitor.mainLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); + mainLog(red("distribution of crash notification experienced errors")); + } else { + mainLog(green("successfully distributed crash notification to recipients")); + } + } + + /** + * Logic for interfacing with Solr. Either starts it, + * stops it, or rebuilds its indicies. + */ + private executeSolrCommand = async (args: string[]) => { + const { exec, mainLog } = this.sessionMonitor; + const action = args[0]; + if (action === "index") { + exec("npx ts-node ./updateSearch.ts", { cwd: pathFromRoot("./src/server") }); + } else { + const command = `${onWindows ? "solr.cmd" : "solr"} ${args[0] === "start" ? "start" : "stop -p 8983"}`; + await exec(command, { cwd: "./solr-8.3.1/bin" }); + try { + await get("http://localhost:8983"); + mainLog(green("successfully connected to 8983 after running solr initialization")); + } catch { + mainLog(red("unable to connect at 8983 after running solr initialization")); + } + } + } + + /** + * Broadcast to all clients that their connection + * is no longer valid, and explain why / what to expect. + */ + private notifyClient: ExitHandler = reason => { + const { _socket } = WebSocket; + if (_socket) { + const message = typeof reason === "boolean" ? (reason ? "exit" : "temporary") : "crash"; + Utils.Emit(_socket, MessageStore.ConnectionTerminated, message); + } + } + + /** + * Performs a backup of the database, saved to the desktop subdirectory. + * This should work as is only on our specific release server. + */ + private backup = async () => this.sessionMonitor.exec("backup.bat", { cwd: this.releaseDesktop }); + + /** + * Compress either a brand new backup or the most recent backup and send it + * as an attachment to an email, dispatched to the requested recipient. + * @param mode specifies whether or not to make a new backup before exporting + * @param to the recipient of the email + */ + private async dispatchZippedDebugBackup(mode: string, to: string) { + const { mainLog } = this.sessionMonitor; + try { + // if desired, complete an immediate backup to send + if (mode === "active") { + await this.backup(); + mainLog("backup complete"); + } + + // ensure the directory for compressed backups exists + const backupsDirectory = `${this.releaseDesktop}/backups`; + const compressedDirectory = `${this.releaseDesktop}/compressed`; + if (!existsSync(compressedDirectory)) { + mkdirSync(compressedDirectory); + } + + // sort all backups by their modified time, and choose the most recent one + const target = readdirSync(backupsDirectory).map(filename => ({ + modifiedTime: statSync(`${backupsDirectory}/${filename}`).mtimeMs, + filename + })).sort((a, b) => b.modifiedTime - a.modifiedTime)[0].filename; + mainLog(`targeting ${target}...`); + + // create a zip file and to it, write the contents of the backup directory + const zipName = `${target}.zip`; + const zipPath = `${compressedDirectory}/${zipName}`; + const output = createWriteStream(zipPath); + const zip = Archiver('zip'); + zip.pipe(output); + zip.directory(`${backupsDirectory}/${target}/Dash`, false); + await zip.finalize(); + mainLog(`zip finalized with size ${statSync(zipPath).size} bytes, saved to ${zipPath}`); + + // dispatch the email to the recipient, containing the finalized zip file + const error = await Email.dispatch({ + to, + subject: `Remote debug: compressed backup of ${target}...`, + content: this.generateDebugInstructions(zipName, target), + attachments: [{ filename: zipName, path: zipPath }] + }); + + // indicate success or failure + mainLog(`${error === null ? green("successfully dispatched") : red("failed to dispatch")} ${zipName} to ${cyan(to)}`); + error && mainLog(red(error.message)); + } catch (error) { + mainLog(red("unable to dispatch zipped backup...")); + mainLog(red(error.message)); + } + } + +} \ No newline at end of file diff --git a/src/server/DashSession/crash_instructions.txt b/src/server/DashSession/crash_instructions.txt new file mode 100644 index 000000000..65417919d --- /dev/null +++ b/src/server/DashSession/crash_instructions.txt @@ -0,0 +1,14 @@ +You, as a Dash Administrator, are being notified of a server crash event. Here's what we know: + +name: +__name__ + +message: +__message__ + +stack: +__stack__ + +The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress. + +__signature__ \ No newline at end of file diff --git a/src/server/DashSession/remote_debug_instructions.txt b/src/server/DashSession/remote_debug_instructions.txt new file mode 100644 index 000000000..c279c460a --- /dev/null +++ b/src/server/DashSession/remote_debug_instructions.txt @@ -0,0 +1,16 @@ +Instructions: + +Download this attachment, open your downloads folder and find this file (__zipname__). +Right click on the zip file and select 'Extract to __target__\'. +Open up the command line, and remember that you can get the path to any file or directory by literally dragging it from the file system and dropping it onto the terminal. +Unless it's in your path, you'll want to navigate to the MongoDB bin directory, given for Windows: + +cd '/c/Program Files/MongoDB/Server/[your version, i.e. 4.0, goes here]/bin' + +Then run the following command (if you're in the bin folder, make that ./mongorestore ...): + +mongorestore --gzip [/path/to/directory/you/just/unzipped] --db Dash + +Assuming everything runs well, this will mirror your local database with that of the server. Now, just start the server locally and debug. + +__signature__ \ No newline at end of file diff --git a/src/server/DashSessionAgent.ts b/src/server/DashSessionAgent.ts deleted file mode 100644 index 3073e69c3..000000000 --- a/src/server/DashSessionAgent.ts +++ /dev/null @@ -1,168 +0,0 @@ -import { Email, pathFromRoot } from "./ActionUtilities"; -import { red, yellow, green, cyan } from "colors"; -import { get } from "request-promise"; -import { Utils } from "../Utils"; -import { WebSocket } from "./Websocket/Websocket"; -import { MessageStore } from "./Message"; -import { launchServer, onWindows } from "."; -import { existsSync, mkdirSync, readdirSync, statSync, createWriteStream, readFileSync } from "fs"; -import * as Archiver from "archiver"; -import { resolve } from "path"; -import { AppliedSessionAgent, ExitHandler } from "./session/agents/applied_session_agent"; -import { Monitor } from "./session/agents/monitor"; -import { ServerWorker } from "./session/agents/server_worker"; - -/** - * If we're the monitor (master) thread, we should launch the monitor logic for the session. - * Otherwise, we must be on a worker thread that was spawned *by* the monitor (master) thread, and thus - * our job should be to run the server. - */ -export class DashSessionAgent extends AppliedSessionAgent { - - private readonly notificationRecipients = ["samuel_wilkins@brown.edu"]; - private readonly signature = "-Dash Server Session Manager"; - private readonly releaseDesktop = pathFromRoot("../../Desktop"); - private _instructions: string | undefined; - private get instructions() { - if (!this._instructions) { - this._instructions = readFileSync(resolve(__dirname, "./remote_debug_instructions.txt"), { encoding: "utf8" }); - } - return this._instructions; - } - - protected async launchMonitor() { - const monitor = Monitor.Create(this.notifiers); - monitor.addReplCommand("pull", [], () => monitor.exec("git pull")); - monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); - monitor.addReplCommand("backup", [], this.backup); - monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.addServerMessageListener("backup", this.backup); - monitor.addServerMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - return monitor; - } - - protected async launchServerWorker() { - const worker = ServerWorker.Create(launchServer); // server initialization delegated to worker - worker.addExitHandler(this.notifyClient); - return worker; - } - - private readonly notifiers: Monitor.NotifierHooks = { - key: async key => { - // this sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone - // to kill the server via the /kill/:key route - const content = `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${this.signature}`; - const failures = await Email.dispatchAll({ - to: this.notificationRecipients, - subject: "Dash Release Session Admin Authentication Key", - content - }); - if (failures) { - failures.map(({ recipient, error: { message } }) => this.sessionMonitor.mainLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); - return false; - } - return true; - }, - crash: async ({ name, message, stack }) => { - const body = [ - "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:", - `name:\n${name}`, - `message:\n${message}`, - `stack:\n${stack}`, - "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.", - ].join("\n\n"); - const content = `${body}\n\n${this.signature}`; - const failures = await Email.dispatchAll({ - to: this.notificationRecipients, - subject: "Dash Web Server Crash", - content - }); - if (failures) { - failures.map(({ recipient, error: { message } }) => this.sessionMonitor.mainLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); - return false; - } - return true; - } - }; - - private executeSolrCommand = async (args: string[]) => { - const { exec, mainLog } = this.sessionMonitor; - const action = args[0]; - if (action === "index") { - exec("npx ts-node ./updateSearch.ts", { cwd: pathFromRoot("./src/server") }); - } else { - const command = `${onWindows ? "solr.cmd" : "solr"} ${args[0] === "start" ? "start" : "stop -p 8983"}`; - await exec(command, { cwd: "./solr-8.3.1/bin" }); - try { - await get("http://localhost:8983"); - mainLog(green("successfully connected to 8983 after running solr initialization")); - } catch { - mainLog(red("unable to connect at 8983 after running solr initialization")); - } - } - } - - private notifyClient: ExitHandler = reason => { - const { _socket } = WebSocket; - if (_socket) { - const message = typeof reason === "boolean" ? (reason ? "exit" : "temporary") : "crash"; - Utils.Emit(_socket, MessageStore.ConnectionTerminated, message); - } - } - - private backup = async () => this.sessionMonitor.exec("backup.bat", { cwd: this.releaseDesktop }); - - private async dispatchZippedDebugBackup(mode: string, to: string) { - const { mainLog } = this.sessionMonitor; - try { - // if desired, complete an immediate backup to send - if (mode === "active") { - await this.backup(); - mainLog("backup complete"); - } - - // ensure the directory for compressed backups exists - const backupsDirectory = `${this.releaseDesktop}/backups`; - const compressedDirectory = `${this.releaseDesktop}/compressed`; - if (!existsSync(compressedDirectory)) { - mkdirSync(compressedDirectory); - } - - // sort all backups by their modified time, and choose the most recent one - const target = readdirSync(backupsDirectory).map(filename => ({ - modifiedTime: statSync(`${backupsDirectory}/${filename}`).mtimeMs, - filename - })).sort((a, b) => b.modifiedTime - a.modifiedTime)[0].filename; - mainLog(`targeting ${target}...`); - - // create a zip file and to it, write the contents of the backup directory - const zipName = `${target}.zip`; - const zipPath = `${compressedDirectory}/${zipName}`; - const output = createWriteStream(zipPath); - const zip = Archiver('zip'); - zip.pipe(output); - zip.directory(`${backupsDirectory}/${target}/Dash`, false); - await zip.finalize(); - mainLog(`zip finalized with size ${statSync(zipPath).size} bytes, saved to ${zipPath}`); - - // dispatch the email to the recipient, containing the finalized zip file - const error = await Email.dispatch({ - to, - subject: `Remote debug: compressed backup of ${target}...`, - content: this.instructions // prepare the body of the email with instructions on restoring the local database - .replace(/__zipname__/, zipName) - .replace(/__target__/, target) - .replace(/__signature__/, this.signature), - attachments: [{ filename: zipName, path: zipPath }] - }); - - // indicate success or failure - mainLog(`${error === null ? green("successfully dispatched") : red("failed to dispatch")} ${zipName} to ${cyan(to)}`); - error && mainLog(red(error.message)); - } catch (error) { - mainLog(red("unable to dispatch zipped backup...")); - mainLog(red(error.message)); - } - } - -} \ No newline at end of file diff --git a/src/server/index.ts b/src/server/index.ts index de56d31bf..0cce0dc54 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -22,7 +22,7 @@ import GeneralGoogleManager from "./ApiManagers/GeneralGoogleManager"; import GooglePhotosManager from "./ApiManagers/GooglePhotosManager"; import { Logger } from "./ProcessFactory"; import { yellow } from "colors"; -import { DashSessionAgent } from "./DashSessionAgent"; +import { DashSessionAgent } from "./DashSession/DashSessionAgent"; import SessionManager from "./ApiManagers/SessionManager"; import { AppliedSessionAgent } from "./session/agents/applied_session_agent"; diff --git a/src/server/remote_debug_instructions.txt b/src/server/remote_debug_instructions.txt deleted file mode 100644 index c279c460a..000000000 --- a/src/server/remote_debug_instructions.txt +++ /dev/null @@ -1,16 +0,0 @@ -Instructions: - -Download this attachment, open your downloads folder and find this file (__zipname__). -Right click on the zip file and select 'Extract to __target__\'. -Open up the command line, and remember that you can get the path to any file or directory by literally dragging it from the file system and dropping it onto the terminal. -Unless it's in your path, you'll want to navigate to the MongoDB bin directory, given for Windows: - -cd '/c/Program Files/MongoDB/Server/[your version, i.e. 4.0, goes here]/bin' - -Then run the following command (if you're in the bin folder, make that ./mongorestore ...): - -mongorestore --gzip [/path/to/directory/you/just/unzipped] --db Dash - -Assuming everything runs well, this will mirror your local database with that of the server. Now, just start the server locally and debug. - -__signature__ \ No newline at end of file diff --git a/src/server/repl.ts b/src/server/repl.ts deleted file mode 100644 index ad55b6aaa..000000000 --- a/src/server/repl.ts +++ /dev/null @@ -1,128 +0,0 @@ -import { createInterface, Interface } from "readline"; -import { red, green, white } from "colors"; - -export interface Configuration { - identifier: () => string | string; - onInvalid?: (command: string, validCommand: boolean) => string | string; - onValid?: (success?: string) => string | string; - isCaseSensitive?: boolean; -} - -export type ReplAction = (parsedArgs: Array) => any | Promise; -export interface Registration { - argPatterns: RegExp[]; - action: ReplAction; -} - -export default class Repl { - private identifier: () => string | string; - private onInvalid: ((command: string, validCommand: boolean) => string) | string; - private onValid: ((success: string) => string) | string; - private isCaseSensitive: boolean; - private commandMap = new Map(); - public interface: Interface; - private busy = false; - private keys: string | undefined; - - constructor({ identifier: prompt, onInvalid, onValid, isCaseSensitive }: Configuration) { - this.identifier = prompt; - this.onInvalid = onInvalid || this.usage; - this.onValid = onValid || this.success; - this.isCaseSensitive = isCaseSensitive ?? true; - this.interface = createInterface(process.stdin, process.stdout).on('line', this.considerInput); - } - - private resolvedIdentifier = () => typeof this.identifier === "string" ? this.identifier : this.identifier(); - - private usage = (command: string, validCommand: boolean) => { - if (validCommand) { - const formatted = white(command); - const patterns = green(this.commandMap.get(command)!.map(({ argPatterns }) => `${formatted} ${argPatterns.join(" ")}`).join('\n')); - return `${this.resolvedIdentifier()}\nthe given arguments do not match any registered patterns for ${formatted}\nthe list of valid argument patterns is given by:\n${patterns}`; - } else { - const resolved = this.keys; - if (resolved) { - return resolved; - } - const members: string[] = []; - const keys = this.commandMap.keys(); - let next: IteratorResult; - while (!(next = keys.next()).done) { - members.push(next.value); - } - return `${this.resolvedIdentifier()} commands: { ${members.sort().join(", ")} }`; - } - } - - private success = (command: string) => `${this.resolvedIdentifier()} completed execution of ${white(command)}`; - - public registerCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => { - const existing = this.commandMap.get(basename); - const converted = argPatterns.map(input => input instanceof RegExp ? input : new RegExp(input)); - const registration = { argPatterns: converted, action }; - if (existing) { - existing.push(registration); - } else { - this.commandMap.set(basename, [registration]); - } - } - - private invalid = (command: string, validCommand: boolean) => { - console.log(red(typeof this.onInvalid === "string" ? this.onInvalid : this.onInvalid(command, validCommand))); - this.busy = false; - } - - private valid = (command: string) => { - console.log(green(typeof this.onValid === "string" ? this.onValid : this.onValid(command))); - this.busy = false; - } - - private considerInput = async (line: string) => { - if (this.busy) { - console.log(red("Busy")); - return; - } - this.busy = true; - line = line.trim(); - if (this.isCaseSensitive) { - line = line.toLowerCase(); - } - const [command, ...args] = line.split(/\s+/g); - if (!command) { - return this.invalid(command, false); - } - const registered = this.commandMap.get(command); - if (registered) { - const { length } = args; - const candidates = registered.filter(({ argPatterns: { length: count } }) => count === length); - for (const { argPatterns, action } of candidates) { - const parsed: string[] = []; - let matched = true; - if (length) { - for (let i = 0; i < length; i++) { - let matches: RegExpExecArray | null; - if ((matches = argPatterns[i].exec(args[i])) === null) { - matched = false; - break; - } - parsed.push(matches[0]); - } - } - if (!length || matched) { - const result = action(parsed); - const resolve = () => this.valid(`${command} ${parsed.join(" ")}`); - if (result instanceof Promise) { - result.then(resolve); - } else { - resolve(); - } - return; - } - } - this.invalid(command, true); - } else { - this.invalid(command, false); - } - } - -} \ No newline at end of file diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts index cb7f63c34..53293d3bf 100644 --- a/src/server/session/agents/applied_session_agent.ts +++ b/src/server/session/agents/applied_session_agent.ts @@ -8,8 +8,8 @@ export abstract class AppliedSessionAgent { // the following two methods allow the developer to create a custom // session and use the built in customization options for each thread - protected abstract async launchMonitor(): Promise; - protected abstract async launchServerWorker(): Promise; + protected abstract async initializeMonitor(monitor: Monitor): Promise; + protected abstract async initializeServerWorker(): Promise; private launched = false; @@ -43,9 +43,10 @@ export abstract class AppliedSessionAgent { if (!this.launched) { this.launched = true; if (isMaster) { - this.sessionMonitorRef = await this.launchMonitor(); + await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create()); + this.sessionMonitorRef.finalize(); } else { - this.serverWorkerRef = await this.launchServerWorker(); + this.serverWorkerRef = await this.initializeServerWorker(); } } else { throw new Error("Cannot launch a session thread more than once per process."); diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index 673be99be..e1709f5e6 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -1,6 +1,6 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; -import Repl, { ReplAction } from "../../repl"; +import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; import { IPC } from "../utilities/ipc"; import { red, cyan, white, yellow, blue, green } from "colors"; @@ -9,39 +9,24 @@ import { Utils } from "../../../Utils"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; - -export namespace Monitor { - - export interface NotifierHooks { - key?: (key: string) => (boolean | Promise); - crash?: (error: Error) => (boolean | Promise); - } - - export interface Action { - message: string; - args: any; - } - - export type ServerMessageHandler = (action: Action) => void | Promise; - -} +import { EventEmitter } from "events"; /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ -export class Monitor { +export class Monitor extends EventEmitter { private static count = 0; + private finalized = false; private exitHandlers: ExitHandler[] = []; - private readonly notifiers: Monitor.NotifierHooks | undefined; private readonly config: Configuration; private onMessage: { [message: string]: Monitor.ServerMessageHandler[] | undefined } = {}; private activeWorker: Worker | undefined; private key: string | undefined; private repl: Repl; - public static Create(notifiers?: Monitor.NotifierHooks) { + public static Create() { if (isWorker) { IPC.dispatchMessage(process, { action: { @@ -58,7 +43,7 @@ export class Monitor { console.error(red("cannot create more than one monitor.")); process.exit(1); } else { - return new Monitor(notifiers); + return new Monitor(); } } @@ -141,14 +126,12 @@ export class Monitor { */ public clearServerMessageListeners = (message: string) => this.onMessage[message] = undefined; - private constructor(notifiers?: Monitor.NotifierHooks) { - this.notifiers = notifiers; + private constructor() { + super(); console.log(this.timestamp(), cyan("initializing session...")); - this.config = this.loadAndValidateConfiguration(); - this.initializeSessionKey(); // determines whether or not we see the compilation / initialization / runtime output of each child server process const output = this.config.showServerOutput ? "inherit" : "ignore"; setupMaster({ stdio: ["ignore", output, output, "ipc"] }); @@ -174,6 +157,14 @@ export class Monitor { }); this.repl = this.initializeRepl(); + } + + public finalize = (): void => { + if (this.finalized) { + throw new Error("Session monitor is already finalized"); + } + this.finalized = true; + this.emit(Monitor.IntrinsicEvents.KeyGenerated, this.key = Utils.GenerateGuid()); this.spawn(); } @@ -197,22 +188,6 @@ export class Monitor { console.log(this.timestamp(), this.config.identifiers.exec.text, ...optionalParams); } - /** - * If the caller has indicated an interest - * in being notified of this feature, creates - * a GUID for this session that can, for example, - * be used as authentication for killing the server - * (checked externally). - */ - private initializeSessionKey = async (): Promise => { - if (this.notifiers?.key) { - this.key = Utils.GenerateGuid(); - const success = await this.notifiers.key(this.key); - const statement = success ? green("distributed session key to recipients") : red("distribution of session key failed"); - this.mainLog(statement); - } - } - /** * Reads in configuration .json file only once, in the master thread * and pass down any variables the pertinent to the child processes as environment variables. @@ -351,12 +326,10 @@ export class Monitor { this.killSession(reason, graceful, errorCode); break; case "notify_crash": - if (this.notifiers?.crash) { - const { error } = args; - const success = await this.notifiers.crash(error); - const statement = success ? green("distributed crash notification to recipients") : red("distribution of crash notification failed"); - this.mainLog(statement); - } + this.emit(Monitor.IntrinsicEvents.CrashDetected, args.error); + break; + case Monitor.IntrinsicEvents.ServerRunning: + this.emit(Monitor.IntrinsicEvents.ServerRunning, args.firstTime); break; case "set_port": const { port, value, immediateRestart } = args; @@ -374,4 +347,21 @@ export class Monitor { }); } +} + +export namespace Monitor { + + export interface Action { + message: string; + args: any; + } + + export type ServerMessageHandler = (action: Action) => void | Promise; + + export enum IntrinsicEvents { + KeyGenerated = "key_generated", + CrashDetected = "crash_detected", + ServerRunning = "server_running" + } + } \ No newline at end of file diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 6ed385151..e9fdaf923 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -3,6 +3,7 @@ import { isMaster } from "cluster"; import { IPC } from "../utilities/ipc"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; +import { Monitor } from "./monitor"; /** * Effectively, each worker repairs the connection to the server by reintroducing a consistent state @@ -19,6 +20,7 @@ export class ServerWorker { private pollingFailureTolerance: number; private pollTarget: string; private serverPort: number; + private isInitialized = false; public static Create(work: Function) { if (isMaster) { @@ -136,6 +138,8 @@ export class ServerWorker { if (!this.shouldServerBeResponsive) { // notify monitor thread that the server is up and running this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); + this.sendMonitorAction(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized }); + this.isInitialized = true; } this.shouldServerBeResponsive = true; } catch (error) { diff --git a/src/server/session/utilities/repl.ts b/src/server/session/utilities/repl.ts new file mode 100644 index 000000000..643141286 --- /dev/null +++ b/src/server/session/utilities/repl.ts @@ -0,0 +1,128 @@ +import { createInterface, Interface } from "readline"; +import { red, green, white } from "colors"; + +export interface Configuration { + identifier: () => string | string; + onInvalid?: (command: string, validCommand: boolean) => string | string; + onValid?: (success?: string) => string | string; + isCaseSensitive?: boolean; +} + +export type ReplAction = (parsedArgs: Array) => any | Promise; +export interface Registration { + argPatterns: RegExp[]; + action: ReplAction; +} + +export default class Repl { + private identifier: () => string | string; + private onInvalid: ((command: string, validCommand: boolean) => string) | string; + private onValid: ((success: string) => string) | string; + private isCaseSensitive: boolean; + private commandMap = new Map(); + public interface: Interface; + private busy = false; + private keys: string | undefined; + + constructor({ identifier: prompt, onInvalid, onValid, isCaseSensitive }: Configuration) { + this.identifier = prompt; + this.onInvalid = onInvalid || this.usage; + this.onValid = onValid || this.success; + this.isCaseSensitive = isCaseSensitive ?? true; + this.interface = createInterface(process.stdin, process.stdout).on('line', this.considerInput); + } + + private resolvedIdentifier = () => typeof this.identifier === "string" ? this.identifier : this.identifier(); + + private usage = (command: string, validCommand: boolean) => { + if (validCommand) { + const formatted = white(command); + const patterns = green(this.commandMap.get(command)!.map(({ argPatterns }) => `${formatted} ${argPatterns.join(" ")}`).join('\n')); + return `${this.resolvedIdentifier()}\nthe given arguments do not match any registered patterns for ${formatted}\nthe list of valid argument patterns is given by:\n${patterns}`; + } else { + const resolved = this.keys; + if (resolved) { + return resolved; + } + const members: string[] = []; + const keys = this.commandMap.keys(); + let next: IteratorResult; + while (!(next = keys.next()).done) { + members.push(next.value); + } + return `${this.resolvedIdentifier()} commands: { ${members.sort().join(", ")} }`; + } + } + + private success = (command: string) => `${this.resolvedIdentifier()} completed local execution of ${white(command)}`; + + public registerCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => { + const existing = this.commandMap.get(basename); + const converted = argPatterns.map(input => input instanceof RegExp ? input : new RegExp(input)); + const registration = { argPatterns: converted, action }; + if (existing) { + existing.push(registration); + } else { + this.commandMap.set(basename, [registration]); + } + } + + private invalid = (command: string, validCommand: boolean) => { + console.log(red(typeof this.onInvalid === "string" ? this.onInvalid : this.onInvalid(command, validCommand))); + this.busy = false; + } + + private valid = (command: string) => { + console.log(green(typeof this.onValid === "string" ? this.onValid : this.onValid(command))); + this.busy = false; + } + + private considerInput = async (line: string) => { + if (this.busy) { + console.log(red("Busy")); + return; + } + this.busy = true; + line = line.trim(); + if (this.isCaseSensitive) { + line = line.toLowerCase(); + } + const [command, ...args] = line.split(/\s+/g); + if (!command) { + return this.invalid(command, false); + } + const registered = this.commandMap.get(command); + if (registered) { + const { length } = args; + const candidates = registered.filter(({ argPatterns: { length: count } }) => count === length); + for (const { argPatterns, action } of candidates) { + const parsed: string[] = []; + let matched = true; + if (length) { + for (let i = 0; i < length; i++) { + let matches: RegExpExecArray | null; + if ((matches = argPatterns[i].exec(args[i])) === null) { + matched = false; + break; + } + parsed.push(matches[0]); + } + } + if (!length || matched) { + const result = action(parsed); + const resolve = () => this.valid(`${command} ${parsed.join(" ")}`); + if (result instanceof Promise) { + result.then(resolve); + } else { + resolve(); + } + return; + } + } + this.invalid(command, true); + } else { + this.invalid(command, false); + } + } + +} \ No newline at end of file -- cgit v1.2.3-70-g09d2 From 27c93abd49ca8a519d2aa3cf7938434fe25947d7 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 09:54:48 -0500 Subject: extends message, removed duplicate handlers, IPC streamlined --- src/server/ApiManagers/SessionManager.ts | 4 +- src/server/DashSession/DashSessionAgent.ts | 15 ++-- src/server/session/agents/applied_session_agent.ts | 8 +- src/server/session/agents/message_router.ts | 45 +++++++++++ src/server/session/agents/monitor.ts | 30 +++---- src/server/session/agents/server_worker.ts | 17 ++-- src/server/session/utilities/ipc.ts | 93 ++++++++-------------- 7 files changed, 116 insertions(+), 96 deletions(-) create mode 100644 src/server/session/agents/message_router.ts (limited to 'src/server/session/agents/applied_session_agent.ts') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 21103fdd5..91ef7e298 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -33,7 +33,7 @@ export default class SessionManager extends ApiManager { const { mode } = req.params; if (["passive", "active"].includes(mode)) { const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; - const response = await sessionAgent.serverWorker.sendMonitorAction("debug", { mode, recipient }, true); + const response = await sessionAgent.serverWorker.emitToMonitor("debug", { mode, recipient }, true); if (response instanceof Error) { res.send(response); } else { @@ -49,7 +49,7 @@ export default class SessionManager extends ApiManager { method: Method.GET, subscription: this.secureSubscriber("backup"), secureHandler: this.authorizedAction(async ({ res }) => { - const response = await sessionAgent.serverWorker.sendMonitorAction("backup"); + const response = await sessionAgent.serverWorker.emitToMonitor("backup"); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 0d9486757..b7e741525 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -11,6 +11,7 @@ import { resolve } from "path"; import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; import { Monitor } from "../session/agents/monitor"; import { ServerWorker } from "../session/agents/server_worker"; +import { Message } from "../session/utilities/ipc"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -26,14 +27,14 @@ export class DashSessionAgent extends AppliedSessionAgent { * The core method invoked when the single master thread is initialized. * Installs event hooks, repl commands and additional IPC listeners. */ - protected async initializeMonitor(monitor: Monitor) { + protected async initializeMonitor(monitor: Monitor, sessionKey: string) { + await this.dispatchSessionPassword(sessionKey); monitor.addReplCommand("pull", [], () => monitor.exec("git pull")); monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); monitor.addReplCommand("backup", [], this.backup); monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.addServerMessageListener("backup", this.backup); - monitor.addServerMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.onKeyGenerated(this.dispatchSessionPassword); + monitor.addMessageListener("backup", this.backup); + monitor.addMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); monitor.onCrashDetected(this.dispatchCrashReport); } @@ -80,14 +81,14 @@ export class DashSessionAgent extends AppliedSessionAgent { * This sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone * to kill the server via the /kill/:key route. */ - private dispatchSessionPassword = async (key: string) => { + private dispatchSessionPassword = async (sessionKey: string) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; mainLog(green("dispatching session key...")); const error = await Email.dispatch({ to: notificationRecipient, subject: "Dash Release Session Admin Authentication Key", - content: `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${this.signature}` + content: `The key for this session (started @ ${new Date().toUTCString()}) is ${sessionKey}.\n\n${this.signature}` }); if (error) { this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); @@ -100,7 +101,7 @@ export class DashSessionAgent extends AppliedSessionAgent { /** * This sends an email with the generated crash report. */ - private dispatchCrashReport = async (crashCause: Error) => { + private dispatchCrashReport = async ({ args: { error: crashCause } }: Message) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; const error = await Email.dispatch({ diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts index 53293d3bf..48226dab6 100644 --- a/src/server/session/agents/applied_session_agent.ts +++ b/src/server/session/agents/applied_session_agent.ts @@ -1,6 +1,7 @@ import { isMaster } from "cluster"; import { Monitor } from "./monitor"; import { ServerWorker } from "./server_worker"; +import { Utils } from "../../../Utils"; export type ExitHandler = (reason: Error | boolean) => void | Promise; @@ -8,7 +9,7 @@ export abstract class AppliedSessionAgent { // the following two methods allow the developer to create a custom // session and use the built in customization options for each thread - protected abstract async initializeMonitor(monitor: Monitor): Promise; + protected abstract async initializeMonitor(monitor: Monitor, key: string): Promise; protected abstract async initializeServerWorker(): Promise; private launched = false; @@ -21,7 +22,7 @@ export abstract class AppliedSessionAgent { private sessionMonitorRef: Monitor | undefined; public get sessionMonitor(): Monitor { if (!isMaster) { - this.serverWorker.sendMonitorAction("kill", { + this.serverWorker.emitToMonitor("kill", { graceful: false, reason: "Cannot access the session monitor directly from the server worker thread.", errorCode: 1 @@ -43,7 +44,8 @@ export abstract class AppliedSessionAgent { if (!this.launched) { this.launched = true; if (isMaster) { - await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create()); + const sessionKey = Utils.GenerateGuid(); + await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create(sessionKey), sessionKey); this.sessionMonitorRef.finalize(); } else { this.serverWorkerRef = await this.initializeServerWorker(); diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts new file mode 100644 index 000000000..5848e27ab --- /dev/null +++ b/src/server/session/agents/message_router.ts @@ -0,0 +1,45 @@ +import { MessageHandler, Message } from "../utilities/ipc"; + +export default abstract class MessageRouter { + + private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; + + /** + * Add a listener at this message. When the monitor process + * receives a message, it will invoke all registered functions. + */ + public addMessageListener = (name: string, handler: MessageHandler, exclusive = false) => { + const handlers = this.onMessage[name]; + if (exclusive || !handlers) { + this.onMessage[name] = [handler]; + } else { + handlers.push(handler); + } + } + + /** + * Unregister a given listener at this message. + */ + public removeMessageListener = (name: string, handler: MessageHandler) => { + const handlers = this.onMessage[name]; + if (handlers) { + const index = handlers.indexOf(handler); + if (index > -1) { + handlers.splice(index, 1); + } + } + } + + /** + * Unregister all listeners at this message. + */ + public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); + + protected route: MessageHandler = async ({ name, args }) => { + const handlers = this.onMessage[name]; + if (handlers) { + await Promise.all(handlers.map(handler => handler({ name, args }))); + } + } + +} \ No newline at end of file diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index 18fa6df24..96f1f8130 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,20 +2,19 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { PromisifiedIPCManager, suffix, IPC, Message } from "../utilities/ipc"; +import { PromisifiedIPCManager, suffix, IPC, MessageHandler, Message } from "../utilities/ipc"; import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; -import { Utils } from "../../../Utils"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; -import { EventEmitter } from "events"; +import MessageRouter from "./message_router"; /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ -export class Monitor extends EventEmitter { +export class Monitor extends MessageRouter { private static IPCManager: PromisifiedIPCManager; private static count = 0; private finalized = false; @@ -25,7 +24,7 @@ export class Monitor extends EventEmitter { private key: string | undefined; private repl: Repl; - public static Create() { + public static Create(sessionKey: string) { if (isWorker) { IPC(process).emit("kill", { reason: "cannot create a monitor on the worker process.", @@ -37,13 +36,12 @@ export class Monitor extends EventEmitter { console.error(red("cannot create more than one monitor.")); process.exit(1); } else { - return new Monitor(); + return new Monitor(sessionKey); } } - public onCrashDetected = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener); - public onKeyGenerated = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.KeyGenerated, listener); - public onServerRunning = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener); + public onCrashDetected = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.CrashDetected, listener); + public onServerRunning = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.ServerRunning, listener); /** * Kill this session and its active child @@ -93,10 +91,10 @@ export class Monitor extends EventEmitter { }); } - private constructor() { + private constructor(sessionKey: string) { super(); - console.log(this.timestamp(), cyan("initializing session...")); + this.key = sessionKey; this.config = this.loadAndValidateConfiguration(); // determines whether or not we see the compilation / initialization / runtime output of each child server process @@ -131,7 +129,6 @@ export class Monitor extends EventEmitter { throw new Error("Session monitor is already finalized"); } this.finalized = true; - this.emit(Monitor.IntrinsicEvents.KeyGenerated, this.key = Utils.GenerateGuid()); this.spawn(); } @@ -284,11 +281,10 @@ export class Monitor extends EventEmitter { Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - const { addMessageListener } = Monitor.IPCManager; - addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode)); - addMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error)); - addMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime)); - addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`)); + this.addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); + this.addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + + Monitor.IPCManager.setRouter(this.route); } } diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 9e471366a..01e1cf971 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,7 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; import { PromisifiedIPCManager } from "../utilities/ipc"; +import MessageRouter from "./message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; import { Monitor } from "./monitor"; @@ -10,7 +11,7 @@ import { Monitor } from "./monitor"; * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification * email if the server encounters an uncaught exception or if the server cannot be reached. */ -export class ServerWorker { +export class ServerWorker extends MessageRouter { private static IPCManager = new PromisifiedIPCManager(process); private static count = 0; private shouldServerBeResponsive = false; @@ -58,6 +59,7 @@ export class ServerWorker { public emitToMonitor = (name: string, args?: any, expectResponse = false) => ServerWorker.IPCManager.emit(name, args, expectResponse); private constructor(work: Function) { + super(); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -76,10 +78,13 @@ export class ServerWorker { * server process. */ private configureProcess = () => { + ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - const { addMessageListener } = ServerWorker.IPCManager; - addMessageListener("updatePollingInterval", ({ args }) => this.pollingIntervalSeconds = args.newPollingIntervalSeconds); - addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { + this.addMessageListener("updatePollingInterval", ({ args }) => { + this.pollingIntervalSeconds = args.newPollingIntervalSeconds; + return new Promise(resolve => setTimeout(resolve, 1000 * 10)); + }); + this.addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); }); @@ -110,7 +115,7 @@ export class ServerWorker { private proactiveUnplannedExit = async (error: Error): Promise => { this.shouldServerBeResponsive = false; // communicates via IPC to the master thread that it should dispatch a crash notification email - this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error }); + this.emitToMonitor(Monitor.IntrinsicEvents.CrashDetected, { error }); await this.executeExitHandlers(error); // notify master thread (which will log update in the console) of crash event via IPC this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); @@ -130,7 +135,7 @@ export class ServerWorker { if (!this.shouldServerBeResponsive) { // notify monitor thread that the server is up and running this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); - this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized }); + this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized }); this.isInitialized = true; } this.shouldServerBeResponsive = true; diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index 2faf9f63e..37aaa6757 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -8,95 +8,66 @@ export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; export interface Message { name: string; - args: any; + args?: any; } +type InternalMessage = Message & { metadata: any }; -export type MessageHandler = (message: Message) => any | Promise; +export type MessageHandler = (message: T) => any | Promise; export class PromisifiedIPCManager { - private onMessage: { [message: string]: MessageHandler[] | undefined } = {}; private readonly target: IPCTarget; private readonly ipc_id = `ipc_id_${suffix}`; - private readonly response_expected = `response_expected_${suffix}`; private readonly is_response = `is_response_${suffix}`; constructor(target: IPCTarget) { this.target = target; - - this.target.addListener("message", async ({ name, args }: Message) => { - let error: Error | undefined; - try { - const handlers = this.onMessage[name]; - if (handlers) { - await Promise.all(handlers.map(handler => handler({ name, args }))); - } - } catch (e) { - error = e; - } - if (args[this.response_expected] && this.target.send) { - const response: any = { error }; - response[this.ipc_id] = args[this.ipc_id]; - response[this.is_response] = true; - this.target.send(response); - } - }); - } - - /** - * Add a listener at this message. When the monitor process - * receives a message, it will invoke all registered functions. - */ - public addMessageListener = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - handlers.push(handler); - } else { - this.onMessage[name] = [handler]; - } - } - - /** - * Unregister a given listener at this message. - */ - public removeMessageListener = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - const index = handlers.indexOf(handler); - if (index > -1) { - handlers.splice(index, 1); - } - } } - /** - * Unregister all listeners at this message. - */ - public clearMessageListeners = (message: string) => this.onMessage[message] = undefined; - - public emit = async (name: string, args: any, expectResponse = false): Promise => { + public emit = async (name: string, args?: any, expectResponse = false): Promise => { if (!this.target.send) { return new Error("Cannot dispatch when send is undefined."); } - args[this.response_expected] = expectResponse; if (expectResponse) { return new Promise(resolve => { const messageId = Utils.GenerateGuid(); - args[this.ipc_id] = messageId; - const responseHandler: (args: any) => void = response => { - const { error } = response; - if (response[this.is_response] && response[this.ipc_id] === messageId) { + const metadata: any = {}; + metadata[this.ipc_id] = messageId; + const responseHandler: MessageHandler = ({ args, metadata }) => { + if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { + const { error } = args; this.target.removeListener("message", responseHandler); resolve(error); } }; this.target.addListener("message", responseHandler); - this.target.send!({ name, args }); + this.target.send?.({ name, args, metadata }); }); } else { - this.target.send({ name, args }); + this.target.send?.({ name, args }); } } + public setRouter = (router: Router) => { + this.target.addListener("message", async ({ name, args, metadata }: InternalMessage) => { + if (name && (!metadata || !metadata[this.is_response])) { + let error: Error | undefined; + try { + await router({ name, args }); + } catch (e) { + error = e; + } + if (metadata && this.target.send) { + metadata[this.is_response] = true; + this.target.send({ + name, + args: { error }, + metadata + }); + } + } + }); + } + } export function IPC(target: IPCTarget) { -- cgit v1.2.3-70-g09d2