1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
import cluster from 'cluster';
import { green, red, white, yellow } from 'colors';
import { get } from 'request-promise';
import { ExitHandler } from './applied_session_agent';
import { Monitor } from './monitor';
import IPCMessageReceiver from './process_message_router';
import { ErrorLike, manage } from './promisified_ipc_manager';
/**
 * The worker (child process) side of a monitored server session.
 *
 * A worker runs the supplied `work` function (which is expected to start the server),
 * then repeatedly polls the configured heartbeat route. It reports lifecycle events to
 * the monitor (parent process) over IPC, and terminates itself, after notifying the
 * monitor of the crash, when an uncaught exception / unhandled rejection occurs or when
 * the heartbeat fails more often than the configured tolerance allows.
 */
export class ServerWorker extends IPCMessageReceiver {
    /** Number of times `Create` has been invoked in this process; only one worker is allowed. */
    private static count = 0;
    /** True once a poll has succeeded, i.e. from then on a failed poll counts as a real failure. */
    private shouldServerBeResponsive = false;
    private exitHandlers: ExitHandler[] = [];
    // NOTE(review): this counter is never reset after a successful poll, so the tolerance
    // applies to the total number of failures over the process lifetime - confirm this is intended.
    private pollingFailureCount = 0;
    private pollingIntervalSeconds: number;
    private pollingFailureTolerance: number;
    private pollTarget: string;
    private serverPort: number;
    /** False until the first successful poll; used to flag the first `ServerRunning` event. */
    private isInitialized = false;
    /** Set as soon as crash handling begins, so that it runs at most once. */
    private isExiting = false;

    /**
     * Creates the single worker allowed in this process.
     * Exits the process with code 1 when called on the primary (monitor) process,
     * or when a worker has already been created in this process.
     * @param work the function invoked by the constructor to start the server
     */
    public static Create(work: Function) {
        if (cluster.isPrimary) {
            console.error(red('cannot create a worker on the monitor process.'));
            process.exit(1);
        } else if (++ServerWorker.count > 1) {
            ServerWorker.IPCManager.emit('kill', {
                reason: 'cannot create more than one worker on a given worker process.',
                graceful: false,
                errorCode: 1,
            });
            process.exit(1);
        }
        return new ServerWorker(work);
    }

    /**
     * Allows developers to invoke application specific logic
     * by hooking into the exiting of the server process.
     */
    public addExitHandler = (handler: ExitHandler) => this.exitHandlers.push(handler);

    /**
     * Kill the session monitor (parent process) from this
     * server worker (child process). This will also kill
     * this process (child process).
     */
    public killSession = (reason: string, graceful = true, errorCode = 0) => this.emit<never>('kill', { reason, graceful, errorCode });

    /**
     * A convenience wrapper to tell the session monitor (parent process)
     * to carry out the action with the specified message and arguments.
     */
    public emit = async <T = any>(name: string, args?: any) => ServerWorker.IPCManager.emit<T>(name, args);

    /**
     * Registers the internal handlers, sets up the IPC manager, reads the polling
     * configuration from the environment, runs `work` and starts the polling loop.
     */
    private constructor(work: Function) {
        super();
        this.configureInternalHandlers();
        ServerWorker.IPCManager = manage(process, this.handlers);
        this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(' ')}]`)}`));

        // NOTE(review): these values are not validated; a missing variable converts to NaN.
        const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env;
        this.serverPort = Number(serverPort);
        this.pollingIntervalSeconds = Number(pollingIntervalSeconds);
        this.pollingFailureTolerance = Number(pollingFailureTolerance);
        this.pollTarget = `http://localhost:${serverPort}${pollingRoute}`;

        work();
        this.pollServer();
    }

    /**
     * Set up message and uncaught exception handlers for this
     * server process.
     */
    protected configureInternalHandlers = () => {
        // updates the local polling interval to the value sent by the monitor
        this.on('updatePollingInterval', ({ newPollingIntervalSeconds }) => {
            this.pollingIntervalSeconds = newPollingIntervalSeconds;
        });
        // planned shutdown requested by the monitor
        this.on('manualExit', async ({ isSessionEnd }) => {
            await ServerWorker.IPCManager.destroy();
            await this.executeExitHandlers(isSessionEnd);
            process.exit(0);
        });
        // one reason to exit, as the process might be in an inconsistent state after such an exception
        process.on('uncaughtException', this.proactiveUnplannedExit);
        process.on('unhandledRejection', reason => {
            const appropriateError = reason instanceof Error ? reason : new Error(`unhandled rejection: ${reason}`);
            this.proactiveUnplannedExit(appropriateError);
        });
    };

    /**
     * Execute the list of functions registered to be called
     * whenever the process exits. Rejects if any handler rejects.
     */
    private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason)));

    /**
     * Sends a 'lifecycle' event carrying the given text to the monitor via IPC.
     */
    public lifecycleNotification = (event: string) => this.emit('lifecycle', { event });

    /**
     * Called whenever the process has a reason to terminate, either through an uncaught exception
     * in the process (potentially inconsistent state) or because the server cannot be reached.
     *
     * Runs at most once: this function is itself reachable from the 'unhandledRejection'
     * handler, so without the guard a rejection raised while shutting down (a failing exit
     * handler, a failed IPC send) would re-enter it indefinitely and the process would never exit.
     */
    private proactiveUnplannedExit = async (error: Error): Promise<void> => {
        if (this.isExiting) {
            return;
        }
        this.isExiting = true;
        this.shouldServerBeResponsive = false;

        // tell the monitor about the crash (it is expected to dispatch the crash notification)
        const { name, message, stack } = error;
        const errorLike: ErrorLike = { name, message, stack };
        this.emit(Monitor.IntrinsicEvents.CrashDetected, { error: errorLike });

        try {
            await this.executeExitHandlers(error);
        } catch (handlerError: unknown) {
            // a broken exit handler must not prevent the process from terminating
            const detail = handlerError instanceof Error ? handlerError.message : String(handlerError);
            this.lifecycleNotification(red(`exit handler failed during crash handling: ${detail}`));
        }

        // notify the monitor (which will log the update in the console) of the crash event via IPC
        this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`));
        this.lifecycleNotification(red(error.message));

        try {
            await ServerWorker.IPCManager.destroy();
        } finally {
            // exit even if tearing down the IPC channel fails
            process.exit(1);
        }
    };

    /**
     * This monitors the health of the server by submitting a get request to whatever port / route specified
     * by the configuration every n seconds, where n is also given by the configuration.
     */
    private pollServer = async (): Promise<void> => {
        await new Promise<void>(resolve => setTimeout(resolve, 1000 * this.pollingIntervalSeconds));
        if (this.isExiting) {
            // crash handling has started; do not report the server as running again
            return;
        }
        try {
            await get(this.pollTarget);
            if (!this.shouldServerBeResponsive) {
                // notify the monitor that the server is up and running
                this.lifecycleNotification(green(`listening on ${this.serverPort}...`));
                this.emit(Monitor.IntrinsicEvents.ServerRunning, { isFirstTime: !this.isInitialized });
                this.isInitialized = true;
            }
            this.shouldServerBeResponsive = true;
        } catch (error: unknown) {
            // failures are ignored until a poll has succeeded at least once (the server may
            // still be starting up); after that, each failure counts against the tolerance
            if (this.shouldServerBeResponsive) {
                if (++this.pollingFailureCount > this.pollingFailureTolerance) {
                    const failure = error instanceof Error ? error : new Error(String(error));
                    this.proactiveUnplannedExit(failure);
                } else {
                    this.lifecycleNotification(yellow(`the server has encountered ${this.pollingFailureCount} of ${this.pollingFailureTolerance} tolerable failures`));
                }
            }
        }
        // controlled, asynchronous recursion achieves a persistent poll that does not submit
        // a new request until the previous one has completed
        if (!this.isExiting) {
            this.pollServer();
        }
    };
}
|