From dc51ee0dc771b3ac6ff6adc7c039df94935ef943 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 17:58:16 -0500 Subject: switched out to npm --- src/server/session/agents/applied_session_agent.ts | 58 ---- src/server/session/agents/monitor.ts | 302 --------------------- .../session/agents/process_message_router.ts | 46 ---- .../session/agents/promisified_ipc_manager.ts | 97 ------- src/server/session/agents/server_worker.ts | 160 ----------- 5 files changed, 663 deletions(-) delete mode 100644 src/server/session/agents/applied_session_agent.ts delete mode 100644 src/server/session/agents/monitor.ts delete mode 100644 src/server/session/agents/process_message_router.ts delete mode 100644 src/server/session/agents/promisified_ipc_manager.ts delete mode 100644 src/server/session/agents/server_worker.ts (limited to 'src/server/session/agents') diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts deleted file mode 100644 index 48226dab6..000000000 --- a/src/server/session/agents/applied_session_agent.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { isMaster } from "cluster"; -import { Monitor } from "./monitor"; -import { ServerWorker } from "./server_worker"; -import { Utils } from "../../../Utils"; - -export type ExitHandler = (reason: Error | boolean) => void | Promise; - -export abstract class AppliedSessionAgent { - - // the following two methods allow the developer to create a custom - // session and use the built in customization options for each thread - protected abstract async initializeMonitor(monitor: Monitor, key: string): Promise; - protected abstract async initializeServerWorker(): Promise; - - private launched = false; - - public killSession = (reason: string, graceful = true, errorCode = 0) => { - const target = isMaster ? this.sessionMonitor : this.serverWorker; - target.killSession(reason, graceful, errorCode); - } - - private sessionMonitorRef: Monitor | undefined; - public get sessionMonitor(): Monitor { - if (!isMaster) { - this.serverWorker.emitToMonitor("kill", { - graceful: false, - reason: "Cannot access the session monitor directly from the server worker thread.", - errorCode: 1 - }); - throw new Error(); - } - return this.sessionMonitorRef!; - } - - private serverWorkerRef: ServerWorker | undefined; - public get serverWorker(): ServerWorker { - if (isMaster) { - throw new Error("Cannot access the server worker directly from the session monitor thread"); - } - return this.serverWorkerRef!; - } - - public async launch(): Promise { - if (!this.launched) { - this.launched = true; - if (isMaster) { - const sessionKey = Utils.GenerateGuid(); - await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create(sessionKey), sessionKey); - this.sessionMonitorRef.finalize(); - } else { - this.serverWorkerRef = await this.initializeServerWorker(); - } - } else { - throw new Error("Cannot launch a session thread more than once per process."); - } - } - -} \ No newline at end of file diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts deleted file mode 100644 index 5ea950b2b..000000000 --- a/src/server/session/agents/monitor.ts +++ /dev/null @@ -1,302 +0,0 @@ -import { ExitHandler } from "./applied_session_agent"; -import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; -import Repl, { ReplAction } from "../utilities/repl"; -import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { IPC_Promisify, MessageHandler } from "./promisified_ipc_manager"; -import { red, cyan, white, yellow, blue } from "colors"; -import { exec, ExecOptions } from "child_process"; -import { validate, ValidationError } from "jsonschema"; -import { Utilities } from "../utilities/utilities"; -import { readFileSync } from "fs"; -import ProcessMessageRouter from "./process_message_router"; -import { ServerWorker } from "./server_worker"; - -/** - * Validates and reads the configuration file, accordingly builds a child process factory - * and spawns off an initial process that will respawn as predecessors die. - */ -export class Monitor extends ProcessMessageRouter { - private static count = 0; - private finalized = false; - private exitHandlers: ExitHandler[] = []; - private readonly config: Configuration; - private activeWorker: Worker | undefined; - private key: string | undefined; - private repl: Repl; - - public static Create(sessionKey: string) { - if (isWorker) { - ServerWorker.IPCManager.emit("kill", { - reason: "cannot create a monitor on the worker process.", - graceful: false, - errorCode: 1 - }); - process.exit(1); - } else if (++Monitor.count > 1) { - console.error(red("cannot create more than one monitor.")); - process.exit(1); - } else { - return new Monitor(sessionKey); - } - } - - private constructor(sessionKey: string) { - super(); - this.config = this.loadAndValidateConfiguration(); - this.initialize(sessionKey); - this.repl = this.initializeRepl(); - } - - private initialize = (sessionKey: string) => { - console.log(this.timestamp(), cyan("initializing session...")); - this.key = sessionKey; - - // determines whether or not we see the compilation / initialization / runtime output of each child server process - const output = this.config.showServerOutput ? "inherit" : "ignore"; - setupMaster({ stdio: ["ignore", output, output, "ipc"] }); - - // handle exceptions in the master thread - there shouldn't be many of these - // the IPC (inter process communication) channel closed exception can't seem - // to be caught in a try catch, and is inconsequential, so it is ignored - process.on("uncaughtException", ({ message, stack }): void => { - if (message !== "Channel closed") { - this.mainLog(red(message)); - if (stack) { - this.mainLog(`uncaught exception\n${red(stack)}`); - } - } - }); - - // a helpful cluster event called on the master thread each time a child process exits - on("exit", ({ process: { pid } }, code, signal) => { - const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; - this.mainLog(cyan(prompt)); - // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one - this.spawn(); - }); - } - - public finalize = (): void => { - if (this.finalized) { - throw new Error("Session monitor is already finalized"); - } - this.finalized = true; - this.spawn(); - } - - public readonly coreHooks = Object.freeze({ - onCrashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), - onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) - }); - - /** - * Kill this session and its active child - * server process, either gracefully (may wait - * indefinitely, but at least allows active networking - * requests to complete) or immediately. - */ - public killSession = async (reason: string, graceful = true, errorCode = 0) => { - this.mainLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`)); - this.mainLog(`session exit reason: ${(red(reason))}`); - await this.executeExitHandlers(true); - this.killActiveWorker(graceful, true); - process.exit(errorCode); - } - - /** - * Execute the list of functions registered to be called - * whenever the process exits. - */ - public addExitHandler = (handler: ExitHandler) => this.exitHandlers.push(handler); - - /** - * Extend the default repl by adding in custom commands - * that can invoke application logic external to this module - */ - public addReplCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => { - this.repl.registerCommand(basename, argPatterns, action); - } - - public exec = (command: string, options?: ExecOptions) => { - return new Promise(resolve => { - exec(command, { ...options, encoding: "utf8" }, (error, stdout, stderr) => { - if (error) { - this.execLog(red(`unable to execute ${white(command)}`)); - error.message.split("\n").forEach(line => line.length && this.execLog(red(`(error) ${line}`))); - } else { - let outLines: string[], errorLines: string[]; - if ((outLines = stdout.split("\n").filter(line => line.length)).length) { - outLines.forEach(line => line.length && this.execLog(cyan(`(stdout) ${line}`))); - } - if ((errorLines = stderr.split("\n").filter(line => line.length)).length) { - errorLines.forEach(line => line.length && this.execLog(yellow(`(stderr) ${line}`))); - } - } - resolve(); - }); - }); - } - - /** - * Generates a blue UTC string associated with the time - * of invocation. - */ - private timestamp = () => blue(`[${new Date().toUTCString()}]`); - - /** - * A formatted, identified and timestamped log in color - */ - public mainLog = (...optionalParams: any[]) => { - console.log(this.timestamp(), this.config.identifiers.master.text, ...optionalParams); - } - - /** - * A formatted, identified and timestamped log in color for non- - */ - private execLog = (...optionalParams: any[]) => { - console.log(this.timestamp(), this.config.identifiers.exec.text, ...optionalParams); - } - - /** - * Reads in configuration .json file only once, in the master thread - * and pass down any variables the pertinent to the child processes as environment variables. - */ - private loadAndValidateConfiguration = (): Configuration => { - let config: Configuration; - try { - console.log(this.timestamp(), cyan("validating configuration...")); - config = JSON.parse(readFileSync('./session.config.json', 'utf8')); - const options = { - throwError: true, - allowUnknownAttributes: false - }; - // ensure all necessary and no excess information is specified by the configuration file - validate(config, configurationSchema, options); - config = Utilities.preciseAssign({}, defaultConfig, config); - } catch (error) { - if (error instanceof ValidationError) { - console.log(red("\nSession configuration failed.")); - console.log("The given session.config.json configuration file is invalid."); - console.log(`${error.instance}: ${error.stack}`); - process.exit(0); - } else if (error.code === "ENOENT" && error.path === "./session.config.json") { - console.log(cyan("Loading default session parameters...")); - console.log("Consider including a session.config.json configuration file in your project root for customization."); - config = Utilities.preciseAssign({}, defaultConfig); - } else { - console.log(red("\nSession configuration failed.")); - console.log("The following unknown error occurred during configuration."); - console.log(error.stack); - process.exit(0); - } - } finally { - const { identifiers } = config!; - Object.keys(identifiers).forEach(key => { - const resolved = key as keyof Identifiers; - const { text, color } = identifiers[resolved]; - identifiers[resolved].text = (colorMapping.get(color) || white)(`${text}:`); - }); - return config!; - } - } - - /** - * Builds the repl that allows the following commands to be typed into stdin of the master thread. - */ - private initializeRepl = (): Repl => { - const repl = new Repl({ identifier: () => `${this.timestamp()} ${this.config.identifiers.master.text}` }); - const boolean = /true|false/; - const number = /\d+/; - const letters = /[a-zA-Z]+/; - repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0)); - repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean")); - repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true")); - repl.registerCommand("set", [/polling/, number, boolean], args => { - const newPollingIntervalSeconds = Math.floor(Number(args[1])); - if (newPollingIntervalSeconds < 0) { - this.mainLog(red("the polling interval must be a non-negative integer")); - } else { - if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { - this.config.polling.intervalSeconds = newPollingIntervalSeconds; - if (args[2] === "true") { - Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }); - } - } - } - }); - return repl; - } - - private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason))); - - /** - * Attempts to kill the active worker gracefully, unless otherwise specified. - */ - private killActiveWorker = (graceful = true, isSessionEnd = false): void => { - if (this.activeWorker && !this.activeWorker.isDead()) { - if (graceful) { - Monitor.IPCManager.emit("manualExit", { isSessionEnd }); - } else { - this.activeWorker.process.kill(); - } - } - } - - /** - * Allows the caller to set the port at which the target (be it the server, - * the websocket, some other custom port) is listening. If an immediate restart - * is specified, this monitor will kill the active child and re-launch the server - * at the port. Otherwise, the updated port won't be used until / unless the child - * dies on its own and triggers a restart. - */ - private setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => { - if (value > 1023 && value < 65536) { - this.config.ports[port] = value; - if (immediateRestart) { - this.killActiveWorker(); - } - } else { - this.mainLog(red(`${port} is an invalid port number`)); - } - } - - /** - * Kills the current active worker and proceeds to spawn a new worker, - * feeding in configuration information as environment variables. - */ - private spawn = (): void => { - const { - polling: { - route, - failureTolerance, - intervalSeconds - }, - ports - } = this.config; - this.killActiveWorker(); - this.activeWorker = fork({ - pollingRoute: route, - pollingFailureTolerance: failureTolerance, - serverPort: ports.server, - socketPort: ports.socket, - pollingIntervalSeconds: intervalSeconds, - session_key: this.key - }); - Monitor.IPCManager = IPC_Promisify(this.activeWorker, this.route); - this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - - this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true); - this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); - } - -} - -export namespace Monitor { - - export enum IntrinsicEvents { - KeyGenerated = "key_generated", - CrashDetected = "crash_detected", - ServerRunning = "server_running" - } - -} \ No newline at end of file diff --git a/src/server/session/agents/process_message_router.ts b/src/server/session/agents/process_message_router.ts deleted file mode 100644 index d359e97c3..000000000 --- a/src/server/session/agents/process_message_router.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { MessageHandler, PromisifiedIPCManager } from "./promisified_ipc_manager"; - -export default abstract class ProcessMessageRouter { - - protected static IPCManager: PromisifiedIPCManager; - private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; - - /** - * Add a listener at this message. When the monitor process - * receives a message, it will invoke all registered functions. - */ - public on = (name: string, handler: MessageHandler, exclusive = false) => { - const handlers = this.onMessage[name]; - if (exclusive || !handlers) { - this.onMessage[name] = [handler]; - } else { - handlers.push(handler); - } - } - - /** - * Unregister a given listener at this message. - */ - public off = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - const index = handlers.indexOf(handler); - if (index > -1) { - handlers.splice(index, 1); - } - } - } - - /** - * Unregister all listeners at this message. - */ - public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); - - protected route: MessageHandler = async ({ name, args }) => { - const handlers = this.onMessage[name]; - if (handlers) { - await Promise.all(handlers.map(handler => handler(args))); - } - } - -} \ No newline at end of file diff --git a/src/server/session/agents/promisified_ipc_manager.ts b/src/server/session/agents/promisified_ipc_manager.ts deleted file mode 100644 index 216e9be44..000000000 --- a/src/server/session/agents/promisified_ipc_manager.ts +++ /dev/null @@ -1,97 +0,0 @@ -import { Utils } from "../../../Utils"; -import { isMaster } from "cluster"; - -/** - * Convenience constructor - * @param target the process / worker to which to attach the specialized listeners - */ -export function IPC_Promisify(target: IPCTarget, router: Router) { - return new PromisifiedIPCManager(target, router); -} - -/** - * Essentially, a node process or node cluster worker - */ -export type IPCTarget = NodeJS.EventEmitter & { send?: Function }; - -/** - * Some external code that maps the name of incoming messages to registered handlers, if any - * when this returns, the message is assumed to have been handled in its entirety by the process, so - * await any asynchronous code inside this router. - */ -export type Router = (message: Message) => void | Promise; - -/** - * Specifies a general message format for this API - */ -export type Message = { name: string; args: T; }; -export type MessageHandler = (args: T) => any | Promise; - -/** - * When a message is emitted, it - */ -type InternalMessage = Message & { metadata: any }; -type InternalMessageHandler = (message: InternalMessage) => any | Promise; - -/** - * This is a wrapper utility class that allows the caller process - * to emit an event and return a promise that resolves when it and all - * other processes listening to its emission of this event have completed. - */ -export class PromisifiedIPCManager { - private readonly target: IPCTarget; - - constructor(target: IPCTarget, router: Router) { - this.target = target; - this.target.addListener("message", this.internalHandler(router)); - } - - /** - * A convenience wrapper around the standard process emission. - * Does not wait for a response. - */ - public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); - - /** - * This routine uniquely identifies each message, then adds a general - * message listener that waits for a response with the same id before resolving - * the promise. - */ - public emitPromise = async (name: string, args?: any) => { - return new Promise(resolve => { - const messageId = Utils.GenerateGuid(); - const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args, name }) => { - if (isResponse && id === messageId) { - this.target.removeListener("message", responseHandler); - resolve(args?.error as Error | undefined); - } - }; - this.target.addListener("message", responseHandler); - const message = { name, args, metadata: { id: messageId } }; - this.target.send?.(message); - }); - } - - /** - * This routine receives a uniquely identified message. If the message is itself a response, - * it is ignored to avoid infinite mutual responses. Otherwise, the routine awaits its completion using whatever - * router the caller has installed, and then sends a response containing the original message id, - * which will ultimately invoke the responseHandler of the original emission and resolve the - * sender's promise. - */ - private internalHandler = (router: Router) => async ({ name, args, metadata }: InternalMessage) => { - if (name && (!metadata || !metadata.isResponse)) { - let error: Error | undefined; - try { - await router({ name, args }); - } catch (e) { - error = e; - } - if (metadata && this.target.send) { - metadata.isResponse = true; - this.target.send({ name, args: { error }, metadata }); - } - } - } - -} \ No newline at end of file diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts deleted file mode 100644 index 705307030..000000000 --- a/src/server/session/agents/server_worker.ts +++ /dev/null @@ -1,160 +0,0 @@ -import { ExitHandler } from "./applied_session_agent"; -import { isMaster } from "cluster"; -import { PromisifiedIPCManager } from "./promisified_ipc_manager"; -import ProcessMessageRouter from "./process_message_router"; -import { red, green, white, yellow } from "colors"; -import { get } from "request-promise"; -import { Monitor } from "./monitor"; - -/** - * Effectively, each worker repairs the connection to the server by reintroducing a consistent state - * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification - * email if the server encounters an uncaught exception or if the server cannot be reached. - */ -export class ServerWorker extends ProcessMessageRouter { - private static count = 0; - private shouldServerBeResponsive = false; - private exitHandlers: ExitHandler[] = []; - private pollingFailureCount = 0; - private pollingIntervalSeconds: number; - private pollingFailureTolerance: number; - private pollTarget: string; - private serverPort: number; - private isInitialized = false; - - public static Create(work: Function) { - if (isMaster) { - console.error(red("cannot create a worker on the monitor process.")); - process.exit(1); - } else if (++ServerWorker.count > 1) { - ServerWorker.IPCManager.emit("kill", { - reason: "cannot create more than one worker on a given worker process.", - graceful: false, - errorCode: 1 - }); - process.exit(1); - } else { - return new ServerWorker(work); - } - } - - /** - * Allows developers to invoke application specific logic - * by hooking into the exiting of the server process. - */ - public addExitHandler = (handler: ExitHandler) => this.exitHandlers.push(handler); - - /** - * Kill the session monitor (parent process) from this - * server worker (child process). This will also kill - * this process (child process). - */ - public killSession = (reason: string, graceful = true, errorCode = 0) => this.emitToMonitor("kill", { reason, graceful, errorCode }); - - /** - * A convenience wrapper to tell the session monitor (parent process) - * to carry out the action with the specified message and arguments. - */ - public emitToMonitor = (name: string, args?: any) => ServerWorker.IPCManager.emit(name, args); - - public emitToMonitorPromise = (name: string, args?: any) => ServerWorker.IPCManager.emitPromise(name, args); - - private constructor(work: Function) { - super(); - ServerWorker.IPCManager = new PromisifiedIPCManager(process, this.route); - this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); - - const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; - this.serverPort = Number(serverPort); - this.pollingIntervalSeconds = Number(pollingIntervalSeconds); - this.pollingFailureTolerance = Number(pollingFailureTolerance); - this.pollTarget = `http://localhost:${serverPort}${pollingRoute}`; - - this.configureProcess(); - work(); - this.pollServer(); - } - - /** - * Set up message and uncaught exception handlers for this - * server process. - */ - private configureProcess = () => { - // updates the local values of variables to the those sent from master - this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => this.pollingIntervalSeconds = newPollingIntervalSeconds); - this.on("manualExit", async ({ isSessionEnd }) => { - await this.executeExitHandlers(isSessionEnd); - process.exit(0); - }); - - // one reason to exit, as the process might be in an inconsistent state after such an exception - process.on('uncaughtException', this.proactiveUnplannedExit); - process.on('unhandledRejection', reason => { - const appropriateError = reason instanceof Error ? reason : new Error(`unhandled rejection: ${reason}`); - this.proactiveUnplannedExit(appropriateError); - }); - } - - /** - * Execute the list of functions registered to be called - * whenever the process exits. - */ - private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason))); - - /** - * Notify master thread (which will log update in the console) of initialization via IPC. - */ - public lifecycleNotification = (event: string) => ServerWorker.IPCManager.emit("lifecycle", { event }); - - /** - * Called whenever the process has a reason to terminate, either through an uncaught exception - * in the process (potentially inconsistent state) or the server cannot be reached. - */ - private proactiveUnplannedExit = async (error: Error): Promise => { - this.shouldServerBeResponsive = false; - // communicates via IPC to the master thread that it should dispatch a crash notification email - this.emitToMonitor(Monitor.IntrinsicEvents.CrashDetected, { error }); - await this.executeExitHandlers(error); - // notify master thread (which will log update in the console) of crash event via IPC - this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); - this.lifecycleNotification(red(error.message)); - process.exit(1); - } - - /** - * This monitors the health of the server by submitting a get request to whatever port / route specified - * by the configuration every n seconds, where n is also given by the configuration. - */ - private pollServer = async (): Promise => { - await new Promise(resolve => { - setTimeout(async () => { - try { - await get(this.pollTarget); - if (!this.shouldServerBeResponsive) { - // notify monitor thread that the server is up and running - this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); - this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { isFirstTime: !this.isInitialized }); - this.isInitialized = true; - } - this.shouldServerBeResponsive = true; - } catch (error) { - // if we expect the server to be unavailable, i.e. during compilation, - // the listening variable is false, activeExit will return early and the child - // process will continue - if (this.shouldServerBeResponsive) { - if (++this.pollingFailureCount > this.pollingFailureTolerance) { - this.proactiveUnplannedExit(error); - } else { - this.lifecycleNotification(yellow(`the server has encountered ${this.pollingFailureCount} of ${this.pollingFailureTolerance} tolerable failures`)); - } - } - } finally { - resolve(); - } - }, 1000 * this.pollingIntervalSeconds); - }); - // controlled, asynchronous infinite recursion achieves a persistent poll that does not submit a new request until the previous has completed - this.pollServer(); - } - -} \ No newline at end of file -- cgit v1.2.3-70-g09d2