Diffstat (limited to 'src/server')
-rw-r--r--  src/server/ApiManagers/DataVizManager.ts |  26
-rw-r--r--  src/server/ApiManagers/SearchManager.ts  | 151
-rw-r--r--  src/server/ApiManagers/UploadManager.ts  |  90
-rw-r--r--  src/server/ApiManagers/UserManager.ts    |   1
-rw-r--r--  src/server/DashStats.ts                  | 293
-rw-r--r--  src/server/DashUploadUtils.ts            |  26
-rw-r--r--  src/server/DataVizUtils.ts               |  26
-rw-r--r--  src/server/Message.ts                    |   2
-rw-r--r--  src/server/index.ts                      |  28
-rw-r--r--  src/server/server_Initialization.ts      |  29
-rw-r--r--  src/server/updateProtos.ts               |  24
-rw-r--r--  src/server/websocket.ts                  | 109
12 files changed, 636 insertions, 169 deletions
diff --git a/src/server/ApiManagers/DataVizManager.ts b/src/server/ApiManagers/DataVizManager.ts
new file mode 100644
index 000000000..0d43130d1
--- /dev/null
+++ b/src/server/ApiManagers/DataVizManager.ts
@@ -0,0 +1,26 @@
+import { csvParser, csvToString } from "../DataVizUtils";
+import { Method, _success } from "../RouteManager";
+import ApiManager, { Registration } from "./ApiManager";
+import { Directory, serverPathToFile } from "./UploadManager";
+import * as path from 'path';
+
+export default class DataVizManager extends ApiManager {
+ protected initialize(register: Registration): void {
+ register({
+ method: Method.GET,
+ subscription: "/csvData",
+ secureHandler: async ({ req, res }) => {
+ const uri = req.query.uri as string;
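+ // e.g. GET /csvData?uri=<path to an uploaded .csv> returns the parsed rows as JSON;
+ // only the basename of the uri is used, so the file is looked up in the server's csv directory.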
+
+ return new Promise<void>(resolve => {
+ const name = path.basename(uri);
+ const sPath = serverPathToFile(Directory.csv, name);
+ const parsedCsv = csvParser(csvToString(sPath));
+ _success(res, parsedCsv);
+ resolve();
+ });
+ }
+ });
+ }
+
+}
\ No newline at end of file
diff --git a/src/server/ApiManagers/SearchManager.ts b/src/server/ApiManagers/SearchManager.ts
index a74e13a62..186f0bcd3 100644
--- a/src/server/ApiManagers/SearchManager.ts
+++ b/src/server/ApiManagers/SearchManager.ts
@@ -1,94 +1,89 @@
-import { exec } from "child_process";
-import { cyan, green, red, yellow } from "colors";
+import { exec } from 'child_process';
+import { cyan, green, red, yellow } from 'colors';
import * as path from 'path';
-import { log_execution } from "../ActionUtilities";
-import { Database } from "../database";
-import { Method } from "../RouteManager";
-import RouteSubscriber from "../RouteSubscriber";
-import { Search } from "../Search";
-import ApiManager, { Registration } from "./ApiManager";
-import { Directory, pathToDirectory } from "./UploadManager";
+import { log_execution } from '../ActionUtilities';
+import { Database } from '../database';
+import { Method } from '../RouteManager';
+import RouteSubscriber from '../RouteSubscriber';
+import { Search } from '../Search';
+import ApiManager, { Registration } from './ApiManager';
+import { Directory, pathToDirectory } from './UploadManager';
const findInFiles = require('find-in-files');
export class SearchManager extends ApiManager {
-
protected initialize(register: Registration): void {
-
register({
method: Method.GET,
- subscription: new RouteSubscriber("solr").add("action"),
+ subscription: new RouteSubscriber('solr').add('action'),
secureHandler: async ({ req, res }) => {
const { action } = req.params;
switch (action) {
- case "start":
- case "stop":
- const status = req.params.action === "start";
+ case 'start':
+ case 'stop':
+ const status = req.params.action === 'start';
SolrManager.SetRunning(status);
break;
- case "update":
+ case 'update':
await SolrManager.update();
break;
default:
console.log(yellow(`${action} is an unknown solr operation.`));
}
- res.redirect("/home");
- }
+ res.redirect('/home');
+ },
});
register({
method: Method.GET,
- subscription: "/textsearch",
+ subscription: '/textsearch',
secureHandler: async ({ req, res }) => {
const q = req.query.q;
if (q === undefined) {
res.send([]);
return;
}
- const resObj: { ids: string[], numFound: number, lines: string[] } = { ids: [], numFound: 0, lines: [] };
+ const resObj: { ids: string[]; numFound: number; lines: string[] } = { ids: [], numFound: 0, lines: [] };
let results: any;
const dir = pathToDirectory(Directory.text);
try {
const regex = new RegExp(q.toString());
- results = await findInFiles.find({ 'term': q, 'flags': 'ig' }, dir, ".txt$");
+ results = await findInFiles.find({ term: q, flags: 'ig' }, dir, '.txt$');
for (const result in results) {
- resObj.ids.push(path.basename(result, ".txt").replace(/upload_/, ""));
+ resObj.ids.push(path.basename(result, '.txt').replace(/upload_/, ''));
resObj.lines.push(results[result].line);
resObj.numFound++;
}
res.send(resObj);
} catch (e) {
- console.log(red("textsearch:bad RegExp" + q.toString()));
+ console.log(red('textsearch: bad RegExp ' + q.toString()));
res.send([]);
return;
}
- }
+ },
});
register({
method: Method.GET,
- subscription: "/dashsearch",
+ subscription: '/dashsearch',
secureHandler: async ({ req, res }) => {
const solrQuery: any = {};
- ["q", "fq", "start", "rows", "sort", "hl.maxAnalyzedChars", "hl", "hl.fl"].forEach(key => solrQuery[key] = req.query[key]);
+ ['q', 'fq', 'start', 'rows', 'sort', 'hl.maxAnalyzedChars', 'hl', 'hl.fl'].forEach(key => (solrQuery[key] = req.query[key]));
if (solrQuery.q === undefined) {
res.send([]);
return;
}
const results = await Search.search(solrQuery);
res.send(results);
- }
+ },
});
-
}
-
}
export namespace SolrManager {
-
export function SetRunning(status: boolean) {
- const args = status ? "start" : "stop -p 8983";
+ const args = status ? 'start' : 'stop -p 8983';
console.log(`solr management: trying to ${args}`);
- exec(`solr ${args}`, { cwd: "./solr-8.3.1/bin" }, (error, stdout, stderr) => {
+ exec(`solr ${args}`, { cwd: './solr-8.3.1/bin' }, (error, stdout, stderr) => {
if (error) {
console.log(red(`solr management error: unable to ${args} server`));
console.log(red(error.message));
@@ -97,39 +92,39 @@ export namespace SolrManager {
console.log(yellow(stderr));
});
if (status) {
- console.log(cyan("Start script is executing: please allow 15 seconds for solr to start on port 8983."));
+ console.log(cyan('Start script is executing: please allow 15 seconds for solr to start on port 8983.'));
}
}
export async function update() {
- console.log(green("Beginning update..."));
+ console.log(green('Beginning update...'));
await log_execution<void>({
- startMessage: "Clearing existing Solr information...",
- endMessage: "Solr information successfully cleared",
+ startMessage: 'Clearing existing Solr information...',
+ endMessage: 'Solr information successfully cleared',
action: Search.clear,
- color: cyan
+ color: cyan,
});
const cursor = await log_execution({
- startMessage: "Connecting to and querying for all documents from database...",
+ startMessage: 'Connecting to and querying for all documents from database...',
endMessage: ({ result, error }) => {
const success = error === null && result !== undefined;
if (!success) {
- console.log(red("Unable to connect to the database."));
+ console.log(red('Unable to connect to the database.'));
process.exit(0);
}
- return "Connection successful and query complete";
+ return 'Connection successful and query complete';
},
action: () => Database.Instance.query({}),
- color: yellow
+ color: yellow,
});
const updates: any[] = [];
let numDocs = 0;
function updateDoc(doc: any) {
numDocs++;
- if ((numDocs % 50) === 0) {
+ if (numDocs % 50 === 0) {
console.log(`Batch of 50 complete, total of ${numDocs}`);
}
- if (doc.__type !== "Doc") {
+ if (doc.__type !== 'Doc') {
return;
}
const fields = doc.fields;
@@ -143,8 +138,8 @@ export namespace SolrManager {
const term = ToSearchTerm(value);
if (term !== undefined) {
const { suffix, value } = term;
- if (key.endsWith('lastModified')) {
- update["lastModified" + suffix] = value;
+ if (key.endsWith('modificationDate')) {
+ update['modificationDate' + suffix] = value;
}
update[key + suffix] = value;
dynfield = true;
@@ -157,51 +152,54 @@ export namespace SolrManager {
await cursor?.forEach(updateDoc);
const result = await log_execution({
startMessage: `Dispatching updates for ${updates.length} documents`,
- endMessage: "Dispatched updates complete",
+ endMessage: 'Dispatched updates complete',
action: () => Search.updateDocuments(updates),
- color: cyan
+ color: cyan,
});
try {
if (result) {
const { status } = JSON.parse(result).responseHeader;
- console.log(status ? red(`Failed with status code (${status})`) : green("Success!"));
+ console.log(status ? red(`Failed with status code (${status})`) : green('Success!'));
} else {
- console.log(red("Solr is likely not running!"));
+ console.log(red('Solr is likely not running!'));
}
} catch (e) {
- console.log(red("Error:"));
+ console.log(red('Error:'));
console.log(e);
- console.log("\n");
+ console.log('\n');
}
await cursor?.close();
}
- const suffixMap: { [type: string]: (string | [string, string | ((json: any) => any)]) } = {
- "number": "_n",
- "string": "_t",
- "boolean": "_b",
- "image": ["_t", "url"],
- "video": ["_t", "url"],
- "pdf": ["_t", "url"],
- "audio": ["_t", "url"],
- "web": ["_t", "url"],
- "map": ["_t", "url"],
- "date": ["_d", value => new Date(value.date).toISOString()],
- "proxy": ["_i", "fieldId"],
- "prefetch_proxy": ["_i", "fieldId"],
- "list": ["_l", list => {
- const results = [];
- for (const value of list.fields) {
- const term = ToSearchTerm(value);
- if (term) {
- results.push(term.value);
+ const suffixMap: { [type: string]: string | [string, string | ((json: any) => any)] } = {
+ number: '_n',
+ string: '_t',
+ boolean: '_b',
+ image: ['_t', 'url'],
+ video: ['_t', 'url'],
+ pdf: ['_t', 'url'],
+ audio: ['_t', 'url'],
+ web: ['_t', 'url'],
+ map: ['_t', 'url'],
+ date: ['_d', value => new Date(value.date).toISOString()],
+ proxy: ['_i', 'fieldId'],
+ prefetch_proxy: ['_i', 'fieldId'],
+ list: [
+ '_l',
+ list => {
+ const results = [];
+ for (const value of list.fields) {
+ const term = ToSearchTerm(value);
+ if (term) {
+ results.push(term.value);
+ }
}
- }
- return results.length ? results : null;
- }]
+ return results.length ? results : null;
+ },
+ ],
};
- function ToSearchTerm(val: any): { suffix: string, value: any } | undefined {
+ function ToSearchTerm(val: any): { suffix: string; value: any } | undefined {
if (val === null || val === undefined) {
return;
}
@@ -213,7 +211,7 @@ export namespace SolrManager {
if (Array.isArray(suffix)) {
const accessor = suffix[1];
- if (typeof accessor === "function") {
+ if (typeof accessor === 'function') {
val = accessor(val);
} else {
val = val[accessor];
@@ -223,5 +221,4 @@ export namespace SolrManager {
return { suffix, value: val };
}
-
-}
\ No newline at end of file
+}
diff --git a/src/server/ApiManagers/UploadManager.ts b/src/server/ApiManagers/UploadManager.ts
index 0a16bd8ec..de1661ed6 100644
--- a/src/server/ApiManagers/UploadManager.ts
+++ b/src/server/ApiManagers/UploadManager.ts
@@ -1,19 +1,19 @@
-import ApiManager, { Registration } from './ApiManager';
-import { Method, _success } from '../RouteManager';
import * as formidable from 'formidable';
-import v4 = require('uuid/v4');
-const AdmZip = require('adm-zip');
-import { extname, basename, dirname } from 'path';
import { createReadStream, createWriteStream, unlink, writeFile } from 'fs';
-import { publicDirectory, filesDirectory } from '..';
-import { Database } from '../database';
-import { DashUploadUtils, InjectSize, SizeSuffix } from '../DashUploadUtils';
+import { basename, dirname, extname, normalize } from 'path';
import * as sharp from 'sharp';
-import { AcceptableMedia, Upload } from '../SharedMediaTypes';
-import { normalize } from 'path';
+import { filesDirectory, publicDirectory } from '..';
+import { retrocycle } from '../../decycler/decycler';
+import { DashUploadUtils, InjectSize, SizeSuffix } from '../DashUploadUtils';
+import { Database } from '../database';
+import { Method, _success } from '../RouteManager';
import RouteSubscriber from '../RouteSubscriber';
-const imageDataUri = require('image-data-uri');
+import { AcceptableMedia, Upload } from '../SharedMediaTypes';
+import ApiManager, { Registration } from './ApiManager';
import { SolrManager } from './SearchManager';
+import v4 = require('uuid/v4');
+const AdmZip = require('adm-zip');
+const imageDataUri = require('image-data-uri');
const fs = require('fs');
export enum Directory {
@@ -85,7 +85,7 @@ export default class UploadManager extends ApiManager {
for (const key in files) {
const f = files[key];
if (!Array.isArray(f)) {
- const result = await DashUploadUtils.upload(f);
+ const result = await DashUploadUtils.upload(f, key); // key is the guid used by the client to track upload progress.
result && !(result.result instanceof Error) && results.push(result);
}
}
@@ -190,13 +190,9 @@ export default class UploadManager extends ApiManager {
const ids: { [id: string]: string } = {};
let remap = true;
const getId = (id: string): string => {
- if (!remap) return id;
- if (id.endsWith('Proto')) return id;
- if (id in ids) {
- return ids[id];
- } else {
- return (ids[id] = v4());
- }
+ if (!remap || id.endsWith('Proto')) return id;
+ if (id in ids) return ids[id];
+ return (ids[id] = v4());
};
const mapFn = (doc: any) => {
if (doc.id) {
@@ -238,48 +234,50 @@ export default class UploadManager extends ApiManager {
form.parse(req, async (_err, fields, files) => {
remap = fields.remap !== 'false';
let id: string = '';
+ let docids: string[] = [];
+ let linkids: string[] = [];
try {
for (const name in files) {
const f = files[name];
const path_2 = Array.isArray(f) ? '' : f.path;
const zip = new AdmZip(path_2);
zip.getEntries().forEach((entry: any) => {
- if (!entry.entryName.startsWith('files/')) return;
- let directory = dirname(entry.entryName) + '/';
- const extension = extname(entry.entryName);
- const base = basename(entry.entryName).split('.')[0];
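+ // zip entry names appear to escape '/' as '%%%'; restore the real path separator first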
+ let entryName = entry.entryName.replace(/%%%/g, '/');
+ if (!entryName.startsWith('files/')) {
+ return;
+ }
+ const extension = extname(entryName);
+ const pathname = publicDirectory + '/' + entry.entryName;
+ const targetname = publicDirectory + '/' + entryName;
try {
zip.extractEntryTo(entry.entryName, publicDirectory, true, false);
- directory = '/' + directory;
-
- createReadStream(publicDirectory + directory + base + extension).pipe(createWriteStream(publicDirectory + directory + base + '_o' + extension));
- createReadStream(publicDirectory + directory + base + extension).pipe(createWriteStream(publicDirectory + directory + base + '_s' + extension));
- createReadStream(publicDirectory + directory + base + extension).pipe(createWriteStream(publicDirectory + directory + base + '_m' + extension));
- createReadStream(publicDirectory + directory + base + extension).pipe(createWriteStream(publicDirectory + directory + base + '_l' + extension));
+ createReadStream(pathname).pipe(createWriteStream(targetname));
+ if (extension !== '.pdf') {
+ createReadStream(pathname).pipe(createWriteStream(targetname.replace('_o' + extension, '_s' + extension)));
+ createReadStream(pathname).pipe(createWriteStream(targetname.replace('_o' + extension, '_m' + extension)));
+ createReadStream(pathname).pipe(createWriteStream(targetname.replace('_o' + extension, '_l' + extension)));
+ }
+ unlink(pathname, () => {});
} catch (e) {
console.log(e);
}
});
- const json = zip.getEntry('doc.json');
+ const json = zip.getEntry('docs.json');
try {
- const data = JSON.parse(json.getData().toString('utf8'));
- const datadocs = data.docs;
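+ // retrocycle() (from the project's decycler module) is used as a reviver to restore
+ // the cyclic references presumably stripped by decycle() when the export was created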
+ const data = JSON.parse(json.getData().toString('utf8'), retrocycle());
+ const { docs, links } = data;
id = getId(data.id);
- const docs = Object.keys(datadocs).map(key => datadocs[key]);
- docs.forEach(mapFn);
+ const rdocs = Object.keys(docs).map(key => docs[key]);
+ const ldocs = Object.keys(links).map(key => links[key]);
+ [...rdocs, ...ldocs].forEach(mapFn);
+ docids = rdocs.map(doc => doc.id);
+ linkids = ldocs.map(link => link.id);
await Promise.all(
- docs.map(
- (doc: any) =>
+ [...rdocs, ...ldocs].map(
+ doc =>
new Promise<void>(res => {
- Database.Instance.replace(
- doc.id,
- doc,
- (err, r) => {
- err && console.log(err);
- res();
- },
- true
- );
+ // overwrite mongo doc with json doc contents
+ Database.Instance.replace(doc.id, doc, (err, r) => res(err && console.log(err)), true);
})
)
);
@@ -289,7 +287,7 @@ export default class UploadManager extends ApiManager {
unlink(path_2, () => {});
}
SolrManager.update();
- res.send(JSON.stringify(id || 'error'));
+ res.send(JSON.stringify(id ? { id, docids, linkids } : 'error')); // an object literal is always truthy, so test id itself
} catch (e) {
console.log(e);
}
diff --git a/src/server/ApiManagers/UserManager.ts b/src/server/ApiManagers/UserManager.ts
index 53e55c1c3..c3dadd821 100644
--- a/src/server/ApiManagers/UserManager.ts
+++ b/src/server/ApiManagers/UserManager.ts
@@ -5,6 +5,7 @@ import { msToTime } from '../ActionUtilities';
import * as bcrypt from 'bcrypt-nodejs';
import { Opt } from '../../fields/Doc';
import { WebSocket } from '../websocket';
+import { DashStats } from '../DashStats';
export const timeMap: { [id: string]: number } = {};
interface ActivityUnit {
diff --git a/src/server/DashStats.ts b/src/server/DashStats.ts
new file mode 100644
index 000000000..8d341db63
--- /dev/null
+++ b/src/server/DashStats.ts
@@ -0,0 +1,293 @@
+import { cyan, magenta } from 'colors';
+import { Response } from 'express';
+import SocketIO from 'socket.io';
+import { timeMap } from './ApiManagers/UserManager';
+import { WebSocket } from './websocket';
+const fs = require('fs');
+
+/**
+ * DashStats focuses on tracking user data for each session.
+ *
+ * This includes time connected, number of operations, and
+ * the rate of their operations
+ */
+export namespace DashStats {
+ export const SAMPLING_INTERVAL = 1000; // in milliseconds (ms) - Time interval to update the frontend.
+ export const RATE_INTERVAL = 10; // in seconds (s) - Used to calculate rate
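+ // With these defaults, each user's operations queue holds RATE_INTERVAL one-second
+ // buckets, so the reported rate covers roughly the last 10 seconds of activity.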
+
+ const statsCSVFilename = './src/server/stats/userLoginStats.csv';
+ const columns = ['USERNAME', 'ACTION', 'TIME'];
+
+ /**
+ * UserStats holds the stats associated with a particular user.
+ */
+ interface UserStats {
+ socketId: string;
+ username: string;
+ time: string;
+ operations: number;
+ rate: number;
+ }
+
+ /**
+ * UserLastOperations is the queue object for each user
+ * storing their past operations.
+ */
+ interface UserLastOperations {
+ sampleOperations: number; // stores how many operations total are in this rate section (10 sec, for example)
+ lastSampleOperations: number; // stores how many total operations were recorded at the last sample
+ previousOperationsQueue: number[]; // stores the operations to calculate rate.
+ }
+
+ /**
+ * StatsDataBundle represents an object that will be sent to the frontend view
+ * on each websocket update.
+ */
+ interface StatsDataBundle {
+ connectedUsers: UserStats[];
+ }
+
+ /**
+ * CSVStore represents how objects will be stored in the CSV
+ */
+ interface CSVStore {
+ USERNAME: string;
+ ACTION: string;
+ TIME: string;
+ }
+
+ /**
+ * ServerTraffic describes the current traffic going to the backend.
+ */
+ enum ServerTraffic {
+ NOT_BUSY,
+ BUSY,
+ VERY_BUSY
+ }
+
+ // These values can be changed after further testing of how many
+ // users correspond to each traffic level in Dash.
+ const BUSY_SERVER_BOUND = 2;
+ const VERY_BUSY_SERVER_BOUND = 3;
+
+ const serverTrafficMessages = ['Not Busy', 'Busy', 'Very Busy'];
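+ // e.g. with the bounds above, 0-1 connected users renders "Not Busy",
+ // 2 renders "Busy", and 3 or more renders "Very Busy".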
+
+ // lastUserOperations maps each username to a UserLastOperations
+ // structure
+ export const lastUserOperations = new Map<string, UserLastOperations>();
+
+ /**
+ * handleStats() handles the /stats route, responding with a JSON object of
+ * relevant stats: the number of current connections and the per-user stats.
+ * @param res Response object from Express
+ */
+ export function handleStats(res: Response) {
+ const current = getCurrentStats();
+ res.json({
+ currentConnections: current.length,
+ socketMap: current,
+ });
+ }
+
+ /**
+ * getUpdatedStatsBundle() assembles an updated copy of the current stats,
+ * which is sent to the frontend /statsview route via websockets.
+ *
+ * @returns a StatsDataBundle that is sent to the frontend view on each websocket update
+ */
+ export function getUpdatedStatsBundle(): StatsDataBundle {
+ return {
+ connectedUsers: getCurrentStats(),
+ };
+ }
+
+ /**
+ * handleStatsView() handles the /statsview route, using pug to render a
+ * frontend view of the current stats.
+ *
+ * @param res Response object from Express
+ */
+ export function handleStatsView(res: Response) {
+ const current = getCurrentStats();
+
+ const connectedUsers = current.map(socketPair => {
+ return socketPair.time + ' - ' + socketPair.username + ' Operations: ' + socketPair.operations;
+ });
+
+ let serverTraffic = ServerTraffic.NOT_BUSY;
+ if (current.length >= VERY_BUSY_SERVER_BOUND) {
+ serverTraffic = ServerTraffic.VERY_BUSY;
+ } else if (current.length >= BUSY_SERVER_BOUND) {
+ serverTraffic = ServerTraffic.BUSY;
+ }
+
+ res.render('stats.pug', {
+ title: 'Dash Stats',
+ numConnections: connectedUsers.length,
+ serverTraffic: serverTraffic,
+ serverTrafficMessage: serverTrafficMessages[serverTraffic],
+ connectedUsers: connectedUsers,
+ });
+ }
+
+ /**
+ * logUserLogin() writes a login event to the CSV file.
+ *
+ * @param username the username, typically an email address (text after the first space is ignored)
+ * @param socket the websocket associated with the current connection
+ */
+ export function logUserLogin(username: string | undefined, socket: SocketIO.Socket) {
+ if (username !== undefined) {
+ const currentDate = new Date();
+ console.log(magenta(`User ${username.split(' ')[0]} logged in at: ${currentDate.toISOString()}`));
+
+ const toWrite: CSVStore = {
+ USERNAME: username,
+ ACTION: 'loggedIn',
+ TIME: currentDate.toISOString(),
+ };
+
+ const statsFile = fs.createWriteStream(statsCSVFilename, { flags: 'a' });
+ statsFile.write(convertToCSV(toWrite));
+ statsFile.end();
+ console.log(cyan(convertToCSV(toWrite)));
+ }
+ }
+
+ /**
+ * logUserLogout() writes a logout event to the CSV file.
+ *
+ * @param username the username, typically an email address
+ * @param socket the websocket associated with the current connection.
+ */
+ export function logUserLogout(username: string | undefined, socket: SocketIO.Socket) {
+ if (username !== undefined) {
+ const currentDate = new Date();
+
+ const statsFile = fs.createWriteStream(statsCSVFilename, { flags: 'a' });
+ const toWrite: CSVStore = {
+ USERNAME: username,
+ ACTION: 'loggedOut',
+ TIME: currentDate.toISOString(),
+ };
+ statsFile.write(convertToCSV(toWrite));
+ statsFile.end();
+ }
+ }
+
+ /**
+ * getLastOperationsOrDefault() is a helper method that will attempt
+ * to query the lastUserOperations map for a specified username. If the
+ * username is not in the map, an empty UserLastOperations object is returned.
+ * @param username
+ * @returns the user's UserLastOperations structure or an empty
+ * UserLastOperations object (All values set to 0) if the username is not found.
+ */
+ function getLastOperationsOrDefault(username: string): UserLastOperations {
+ const existing = lastUserOperations.get(username);
+ if (existing === undefined) {
+ return {
+ sampleOperations: 0,
+ lastSampleOperations: 0,
+ previousOperationsQueue: new Array(RATE_INTERVAL).fill(0),
+ };
+ }
+ return existing;
+ }
+
+ /**
+ * updateLastOperations updates a specific user's UserLastOperations information
+ * for the current sampling cycle. The method removes old/outdated counts for
+ * operations from the queue and adds new data for the current sampling
+ * cycle to the queue, updating the total count as it goes.
+ * @param lastOperationData the old UserLastOperations data that must be updated
+ * @param currentOperations the total number of operations measured for this sampling cycle.
+ * @returns the updated UserLastOperations structure.
+ */
+ function updateLastOperations(lastOperationData: UserLastOperations, currentOperations: number): UserLastOperations {
+ // create a copy of the UserLastOperations to modify
+ const newLastOperationData: UserLastOperations = {
+ sampleOperations: lastOperationData.sampleOperations,
+ lastSampleOperations: lastOperationData.lastSampleOperations,
+ previousOperationsQueue: lastOperationData.previousOperationsQueue.slice(),
+ };
+
+ let newSampleOperations = newLastOperationData.sampleOperations;
+ newSampleOperations -= newLastOperationData.previousOperationsQueue.shift()!; // removes and returns the first element of the queue
+ let operationsThisCycle = currentOperations - lastOperationData.lastSampleOperations;
+ newSampleOperations += operationsThisCycle; // add the operations this cycle to find out what our count for the interval should be (e.g operations in the last 10 seconds)
+
+ // update values for the copy object
+ newLastOperationData.sampleOperations = newSampleOperations;
+
+ newLastOperationData.previousOperationsQueue.push(operationsThisCycle);
+ newLastOperationData.lastSampleOperations = currentOperations;
+
+ return newLastOperationData;
+ }
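+
+ // A worked example, assuming RATE_INTERVAL = 3 for brevity: with queue [2, 0, 5],
+ // sampleOperations = 7, and lastSampleOperations = 20, a call with currentOperations = 24
+ // shifts off the oldest bucket (2) and pushes this cycle's 4 new operations, yielding
+ // queue [0, 5, 4] and sampleOperations = 7 - 2 + 4 = 9.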
+
+ /**
+ * getUserOperationsOrDefault() is a helper method to get the user's total
+ * operations for the CURRENT sampling interval. The method will return 0
+ * if the username is not in the userOperations map.
+ * @param username the username to search the map for
+ * @returns the total number of operations recorded up to this sampling cycle.
+ */
+ function getUserOperationsOrDefault(username: string): number {
+ return WebSocket.userOperations.get(username) ?? 0;
+ }
+
+ /**
+ * getCurrentStats() calculates the total stats for this cycle. In this case,
+ * getCurrentStats() returns an Array of UserStats[] objects describing
+ * the stats for each user
+ * @returns an array of UserStats storing data for each user at the current moment.
+ */
+ function getCurrentStats(): UserStats[] {
+ const socketPairs: UserStats[] = [];
+ for (const [socket, userInfo] of WebSocket.socketMap) {
+ const username = userInfo.split(' ')[0];
+ const connectionTime = new Date(timeMap[username]);
+
+ const connectionTimeString = connectionTime.toLocaleDateString() + ' ' + connectionTime.toLocaleTimeString();
+
+ if (!socket.disconnected) {
+ const lastRecordedOperations = getLastOperationsOrDefault(username);
+ const currentUserOperationCount = getUserOperationsOrDefault(username);
+
+ socketPairs.push({
+ socketId: socket.id,
+ username: username,
+ time: connectionTimeString.includes('Invalid Date') ? '' : connectionTimeString,
+ operations: currentUserOperationCount,
+ rate: lastRecordedOperations.sampleOperations,
+ });
+ lastUserOperations.set(username, updateLastOperations(lastRecordedOperations, currentUserOperationCount));
+ }
+ }
+ return socketPairs;
+ }
+
+ /**
+ * convertToCSV() is a helper method that stringifies a CSVStore object
+ * that can be written to the CSV file later.
+ * @param dataObject the object to stringify
+ * @returns the object as a string.
+ */
+ function convertToCSV(dataObject: CSVStore): string {
+ return `${dataObject.USERNAME},${dataObject.ACTION},${dataObject.TIME}\n`;
+ }
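+
+ // e.g. convertToCSV({ USERNAME: 'ada@dash.com', ACTION: 'loggedIn', TIME: '2021-05-01T12:00:00.000Z' })
+ // yields "ada@dash.com,loggedIn,2021-05-01T12:00:00.000Z\n"; values are assumed to contain no commas.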
+}
diff --git a/src/server/DashUploadUtils.ts b/src/server/DashUploadUtils.ts
index 4870d218b..eaaac4e6d 100644
--- a/src/server/DashUploadUtils.ts
+++ b/src/server/DashUploadUtils.ts
@@ -70,7 +70,14 @@ export namespace DashUploadUtils {
// make a list of paths to create the ordered text file for ffmpeg
const filePathsText = filePaths.map(filePath => `file '${filePath}'`).join('\n');
// write the text file to the file system
- writeFile(textFilePath, filePathsText, err => console.log(err));
+ await new Promise<void>((res, reject) =>
+ writeFile(textFilePath, filePathsText, err => {
+ if (err) {
+ console.log(err);
+ reject(err);
+ } else res();
+ })
+ );
// make a unique output file name using a generated guid
const outputFileName = `output-${Utils.GenerateGuid()}.mp4`;
@@ -86,7 +93,10 @@ export namespace DashUploadUtils {
.outputOptions('-c copy')
//.videoCodec("copy")
.save(outputFilePath)
- .on('error', reject)
+ .on('error', (err: any) => {
+ console.log(err);
+ reject(err);
+ })
.on('end', resolve);
});
@@ -171,9 +181,10 @@ export namespace DashUploadUtils {
});
}
- export async function upload(file: File): Promise<Upload.FileResponse> {
+ export async function upload(file: File, overwriteGuid?: string): Promise<Upload.FileResponse> {
const { type, path, name } = file;
const types = type?.split('/') ?? [];
+ uploadProgress.set(overwriteGuid ?? name, 'uploading'); // If the client sent a guid it uses to track upload progress, use that guid. Otherwise, use the file's name.
const category = types[0];
let format = `.${types[1]}`;
@@ -195,6 +206,7 @@ export namespace DashUploadUtils {
.videoCodec('copy') // this will copy the data instead of reencode it
.save(file.path.replace('.mkv', '.mp4'))
.on('end', res)
+ .on('error', (e: any) => console.log(e))
);
file.path = file.path.replace('.mkv', '.mp4');
format = '.mp4';
@@ -224,7 +236,7 @@ export namespace DashUploadUtils {
return { source: file, result: { name: 'Unsupported video format', message: `Could not upload unsupported file (${name}). Please convert to an .mp4` } };
}
}
- if (videoFormats.includes(format)) {
+ if (videoFormats.includes(format) || format.includes('.webm')) {
return MoveParsedFile(file, Directory.videos);
}
fs.unlink(path, () => {});
@@ -398,7 +410,9 @@ export namespace DashUploadUtils {
// Use the request library to parse out file level image information in the headers
const { headers } = await new Promise<any>((resolve, reject) => {
return request.head(resolvedUrl, (error, res) => (error ? reject(error) : resolve(res)));
- }).catch(e => console.log(e));
+ }).catch(e => {
+ console.log('Error processing headers: ', e);
+ });
try {
// Compute the native width and height of the image with an npm module
@@ -499,7 +513,7 @@ export namespace DashUploadUtils {
const parseExifData = async (source: string) => {
const image = await request.get(source, { encoding: null });
- const { data, error } = await new Promise(resolve => {
+ const { data, error } = await new Promise<{ data: any; error: any }>(resolve => {
new ExifImage({ image }, (error, data) => {
let reason: Opt<string> = undefined;
if (error) {
diff --git a/src/server/DataVizUtils.ts b/src/server/DataVizUtils.ts
index 4fd0ca6ff..15f03b319 100644
--- a/src/server/DataVizUtils.ts
+++ b/src/server/DataVizUtils.ts
@@ -1,13 +1,17 @@
+import { readFileSync } from 'fs';
+
export function csvParser(csv: string) {
- const lines = csv.split("\n");
- const headers = lines[0].split(",");
- const data = lines.slice(1).map(line => {
- const values = line.split(",");
- const obj: any = {};
- for (let i = 0; i < headers.length; i++) {
- obj[headers[i]] = values[i];
- }
- return obj;
- });
+ const lines = csv.split('\n');
+ const headers = lines[0].split(',').map(header => header.trim());
+ const data = lines.slice(1).map(line =>
+ line.split(',').reduce((last, value, i) => {
+ last[headers[i]] = value.trim();
+ return last;
+ }, {} as any)
+ );
return data;
-}
\ No newline at end of file
+}
+
+export function csvToString(path: string) {
+ return readFileSync(path, 'utf8');
+}
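+
+// e.g. csvParser(csvToString(path)) on a file containing "name, age\nada, 36" returns
+// [{ name: 'ada', age: '36' }]; values stay strings, and quoted fields with embedded
+// commas are not handled by this simple split-based parser.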
diff --git a/src/server/Message.ts b/src/server/Message.ts
index d87ae5027..8f0af08bc 100644
--- a/src/server/Message.ts
+++ b/src/server/Message.ts
@@ -94,4 +94,6 @@ export namespace MessageStore {
export const YoutubeApiQuery = new Message<YoutubeQueryInput>("Youtube Api Query");
export const DeleteField = new Message<string>("Delete field");
export const DeleteFields = new Message<string[]>("Delete fields");
+
+ export const UpdateStats = new Message<string>("updatestats");
}
diff --git a/src/server/index.ts b/src/server/index.ts
index 6e6bde3cb..8b2e18847 100644
--- a/src/server/index.ts
+++ b/src/server/index.ts
@@ -4,6 +4,7 @@ import * as mobileDetect from 'mobile-detect';
import * as path from 'path';
import * as qs from 'query-string';
import { log_execution } from './ActionUtilities';
+import DataVizManager from './ApiManagers/DataVizManager';
import DeleteManager from './ApiManagers/DeleteManager';
import DownloadManager from './ApiManagers/DownloadManager';
import GeneralGoogleManager from './ApiManagers/GeneralGoogleManager';
@@ -18,6 +19,7 @@ import { GoogleCredentialsLoader, SSL } from './apis/google/CredentialsLoader';
import { GoogleApiServerUtils } from './apis/google/GoogleApiServerUtils';
import { DashSessionAgent } from './DashSession/DashSessionAgent';
import { AppliedSessionAgent } from './DashSession/Session/agents/applied_session_agent';
+import { DashStats } from './DashStats';
import { DashUploadUtils } from './DashUploadUtils';
import { Database } from './database';
import { Logger } from './ProcessFactory';
@@ -62,7 +64,19 @@ async function preliminaryFunctions() {
* with the server
*/
function routeSetter({ isRelease, addSupervisedRoute, logRegistrationOutcome }: RouteManager) {
- const managers = [new SessionManager(), new UserManager(), new UploadManager(), new DownloadManager(), new SearchManager(), new PDFManager(), new DeleteManager(), new UtilManager(), new GeneralGoogleManager(), new GooglePhotosManager()];
+ const managers = [
+ new SessionManager(),
+ new UserManager(),
+ new UploadManager(),
+ new DownloadManager(),
+ new SearchManager(),
+ new PDFManager(),
+ new DeleteManager(),
+ new UtilManager(),
+ new GeneralGoogleManager(),
+ new GooglePhotosManager(),
+ new DataVizManager(),
+ ];
// initialize API Managers
console.log(yellow('\nregistering server routes...'));
@@ -85,6 +99,18 @@ function routeSetter({ isRelease, addSupervisedRoute, logRegistrationOutcome }:
addSupervisedRoute({
method: Method.GET,
+ subscription: '/stats',
+ secureHandler: ({ res }) => DashStats.handleStats(res),
+ });
+
+ addSupervisedRoute({
+ method: Method.GET,
+ subscription: '/statsview',
+ secureHandler: ({ res }) => DashStats.handleStatsView(res),
+ });
+
+ addSupervisedRoute({
+ method: Method.GET,
subscription: '/resolvedPorts',
secureHandler: ({ res }) => res.send(resolvedPorts),
publicHandler: ({ res }) => res.send(resolvedPorts),
diff --git a/src/server/server_Initialization.ts b/src/server/server_Initialization.ts
index b0db71f9c..805da1d43 100644
--- a/src/server/server_Initialization.ts
+++ b/src/server/server_Initialization.ts
@@ -22,6 +22,7 @@ import { Database } from './database';
import RouteManager from './RouteManager';
import RouteSubscriber from './RouteSubscriber';
import { WebSocket } from './websocket';
+import brotli = require('brotli');
import expressFlash = require('express-flash');
import flash = require('connect-flash');
const MongoStore = require('connect-mongo')(session);
@@ -171,31 +172,29 @@ function registerCorsProxy(server: express.Express) {
function proxyServe(req: any, requrl: string, response: any) {
const htmlBodyMemoryStream = new (require('memorystream'))();
var retrieveHTTPBody: any;
+ var wasinBrFormat = false;
const sendModifiedBody = () => {
const header = response.headers['content-encoding'];
+ const httpsToCors = (match: any, href: string, offset: any, string: any) => `href="${resolvedServerUrl + '/corsProxy/http' + href}"`;
if (header?.includes('gzip')) {
try {
- const replacer = (match: any, href: string, offset: any, string: any) => {
- return `href="${resolvedServerUrl + '/corsProxy/http' + href}"`;
- };
- const zipToStringDecoder = new (require('string_decoder').StringDecoder)('utf8');
const bodyStream = htmlBodyMemoryStream.read();
if (bodyStream) {
- const htmlText = zipToStringDecoder.write(
- zlib
- .gunzipSync(bodyStream)
- .toString('utf8')
- .replace('<head>', '<head> <style>[id ^= "google"] { display: none; } </style>')
- .replace(/href="https?([^"]*)"/g, replacer)
- .replace(/target="_blank"/g, '')
- );
+ const htmlInputText = wasinBrFormat ? Buffer.from(brotli.decompress(bodyStream)) : zlib.gunzipSync(bodyStream);
+ const htmlText = htmlInputText
+ .toString('utf8')
+ .replace('<head>', '<head> <style>[id ^= "google"] { display: none; } </style>')
+ .replace(/href="https?([^"]*)"/g, httpsToCors)
+ .replace(/data-srcset="[^"]*"/g, '')
+ .replace(/srcset="[^"]*"/g, '')
+ .replace(/target="_blank"/g, '');
response.send(zlib.gzipSync(htmlText));
} else {
req.pipe(request(requrl)).pipe(response);
console.log('EMPTY body:' + req.url);
}
} catch (e) {
- console.log('EROR?: ', e);
+ console.log('ERROR?: ', e);
}
} else {
req.pipe(htmlBodyMemoryStream).pipe(response);
@@ -216,6 +215,10 @@ function proxyServe(req: any, requrl: string, response: any) {
} else if (headerCharRegex.test(header || '')) {
delete res.headers[headerName];
} else res.headers[headerName] = header;
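+ // remember when the origin responded with brotli: the body is re-encoded
+ // as gzip before being sent on, so advertise gzip to the client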
+ if (headerName === 'content-encoding') {
+ wasinBrFormat = res.headers[headerName] === 'br';
+ res.headers[headerName] = 'gzip';
+ }
});
res.headers['x-permitted-cross-domain-policies'] = 'all';
res.headers['x-frame-options'] = '';
diff --git a/src/server/updateProtos.ts b/src/server/updateProtos.ts
index c5552f6bf..2f3772a77 100644
--- a/src/server/updateProtos.ts
+++ b/src/server/updateProtos.ts
@@ -1,14 +1,22 @@
-import { Database } from "./database";
+import { Database } from './database';
-const protos =
- ["text", "image", "web", "collection", "kvp", "video", "audio", "pdf", "icon", "import", "linkdoc", "map"];
+const protos = ['text', 'image', 'web', 'collection', 'kvp', 'video', 'audio', 'pdf', 'icon', 'import', 'linkdoc', 'map'];
(async function () {
await Promise.all(
- protos.map(protoId => new Promise(res => Database.Instance.update(protoId, {
- $set: { "fields.baseProto": true }
- }, res)))
+ protos.map(
+ protoId =>
+ new Promise(res =>
+ Database.Instance.update(
+ protoId,
+ {
+ $set: { 'fields.isBaseProto': true },
+ },
+ res
+ )
+ )
+ )
);
- console.log("done");
-})();
\ No newline at end of file
+ console.log('done');
+})();
diff --git a/src/server/websocket.ts b/src/server/websocket.ts
index 9b91a35a6..be5cdb202 100644
--- a/src/server/websocket.ts
+++ b/src/server/websocket.ts
@@ -12,16 +12,19 @@ import { GoogleCredentialsLoader, SSL } from './apis/google/CredentialsLoader';
import YoutubeApi from './apis/youtube/youtubeApiSample';
import { initializeGuest } from './authentication/DashUserModel';
import { Client } from './Client';
+import { DashStats } from './DashStats';
import { Database } from './database';
import { DocumentsCollection } from './IDatabase';
import { Diff, GestureContent, MessageStore, MobileDocumentUploadContent, MobileInkOverlayContent, Transferable, Types, UpdateMobileInkOverlayPositionContent, YoutubeQueryInput, YoutubeQueryTypes } from './Message';
import { Search } from './Search';
import { resolvedPorts } from './server_Initialization';
+const _ = require('lodash');
export namespace WebSocket {
export let _socket: Socket;
- const clients: { [key: string]: Client } = {};
+ export const clients: { [key: string]: Client } = {};
export const socketMap = new Map<SocketIO.Socket, string>();
+ export const userOperations = new Map<string, number>();
export let disconnect: Function;
export async function initialize(isRelease: boolean, app: express.Express) {
@@ -49,6 +52,8 @@ export namespace WebSocket {
next();
});
+ socket.emit(MessageStore.UpdateStats.Message, DashStats.getUpdatedStatsBundle());
+
// convenience function to log server messages on the client
function log(message?: any, ...optionalParams: any[]) {
socket.emit('log', ['Message from server:', message, ...optionalParams]);
@@ -97,6 +102,15 @@ export namespace WebSocket {
console.log('received bye');
});
+ socket.on('disconnect', function () {
+ const currentUser = socketMap.get(socket);
+ if (currentUser !== undefined) {
+ const currentUsername = currentUser.split(' ')[0];
+ DashStats.logUserLogout(currentUsername, socket);
+ delete timeMap[currentUsername];
+ }
+ });
+
Utils.Emit(socket, MessageStore.Foo, 'handshooken');
Utils.AddServerHandler(socket, MessageStore.Bar, guid => barReceived(socket, guid));
@@ -130,6 +144,12 @@ export namespace WebSocket {
socket.disconnect(true);
};
});
+
+ setInterval(function () {
+ // broadcast the latest stats bundle to every connected client once per sampling interval
+ io.emit(MessageStore.UpdateStats.Message, DashStats.getUpdatedStatsBundle());
+ }, DashStats.SAMPLING_INTERVAL);
}
function processGesturePoints(socket: Socket, content: GestureContent) {
@@ -176,7 +196,12 @@ export namespace WebSocket {
const currentdate = new Date();
const datetime = currentdate.getDate() + '/' + (currentdate.getMonth() + 1) + '/' + currentdate.getFullYear() + ' @ ' + currentdate.getHours() + ':' + currentdate.getMinutes() + ':' + currentdate.getSeconds();
console.log(blue(`user ${userEmail} has connected to the web socket at: ${datetime}`));
+ printActiveUsers();
+
+ timeMap[userEmail] = Date.now();
socketMap.set(socket, userEmail + ' at ' + datetime);
+ userOperations.set(userEmail, 0);
+ DashStats.logUserLogin(userEmail, socket);
}
function getField([id, callback]: [string, (result?: Transferable) => void]) {
@@ -199,7 +224,7 @@ export namespace WebSocket {
return Database.Instance.getDocument(id, callback);
}
function GetRefField([id, callback]: [string, (result?: Transferable) => void]) {
- process.stdout.write(`.`);
+ process.stdout.write(`+`);
GetRefFieldLocal([id, callback]);
}
@@ -293,15 +318,79 @@ export namespace WebSocket {
);
}
+ /**
+ * findClosestIndex() is a helper function that will try to find
+ * the closest index of a list that has the same value as
+ * a specified argument/index pair.
+ * @param list the list to search through
+ * @param indexesToDelete a list of indexes that are already marked for deletion
+ * so they will be ignored
+ * @param value the value of the item to remove
+ * @param hintIndex the index that the element was at on the client's copy of
+ * the data
+ * @returns the closest index with the same value or -1 if the element was not found.
+ */
+ function findClosestIndex(list: any, indexesToDelete: number[], value: any, hintIndex: number) {
+ let closestIndex = -1;
+ for (let i = 0; i < list.length; i++) {
+ if (list[i] === value && !indexesToDelete.includes(i)) {
+ // a first match always wins; later matches win only when closer to the hint
+ // (comparing against the initial -1 would wrongly reject matches far from the hint)
+ if (closestIndex === -1 || Math.abs(i - hintIndex) < Math.abs(closestIndex - hintIndex)) {
+ closestIndex = i;
+ }
+ }
+ }
+ return closestIndex;
+ }
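+
+ // e.g. findClosestIndex(['a', 'b', 'a'], [], 'a', 2) returns 2: indexes 0 and 2 both
+ // match 'a', and index 2 is closer to the client's hinted position.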
+
+ /**
+ * remFromListField() receives the items to remove and a hint
+ * from the client, and attempts to make the modification to the
+ * server's copy of the data. If server's copy does not match
+ * the client's after removal, the server will SEND BACk
+ * its version to the client.
+ * @param socket the socket that the client is connected on
+ * @param diff an object containing the items to remove and a hint
+ * (the hint contains start index and deleteCount, the number of
+ * items to delete)
+ * @param curListItems the server's current copy of the data
+ */
function remFromListField(socket: Socket, diff: Diff, curListItems?: Transferable): void {
diff.diff.$set = diff.diff.$remFromSet;
delete diff.diff.$remFromSet;
const updatefield = Array.from(Object.keys(diff.diff.$set))[0];
const remListItems = diff.diff.$set[updatefield].fields;
const curList = (curListItems as any)?.fields?.[updatefield.replace('fields.', '')]?.fields.filter((f: any) => f !== null) || [];
- diff.diff.$set[updatefield].fields = curList?.filter(
- (curItem: any) => !remListItems.some((remItem: any) => (remItem.fieldId ? remItem.fieldId === curItem.fieldId : remItem.heading ? remItem.heading === curItem.heading : remItem === curItem))
- );
+ const hint = diff.diff.$set.hint;
+
+ if (hint) {
+ // indexesToRemove stores the indexes that we mark for deletion, which is later used to filter the list (delete the elements)
+ let indexesToRemove: number[] = [];
+ for (let i = 0; i < hint.deleteCount; i++) {
+ if (curList.length > i + hint.start && _.isEqual(curList[i + hint.start], remListItems[i])) {
+ indexesToRemove.push(i + hint.start);
+ continue;
+ }
+
+ let closestIndex = findClosestIndex(curList, indexesToRemove, remListItems[i], i + hint.start);
+ if (closestIndex !== -1) {
+ indexesToRemove.push(closestIndex);
+ } else {
+ console.log('Item to delete was not found - index = -1');
+ }
+ }
+
+ diff.diff.$set[updatefield].fields = curList?.filter((curItem: any, index: number) => !indexesToRemove.includes(index));
+ } else {
+ // go back to the original way to delete if we didn't receive
+ // a hint from the client
+ diff.diff.$set[updatefield].fields = curList?.filter(
+ (curItem: any) => !remListItems.some((remItem: any) => (remItem.fieldId ? remItem.fieldId === curItem.fieldId : remItem.heading ? remItem.heading === curItem.heading : remItem === curItem))
+ );
+ }
+
+ // if the client and server have different versions of the data after
+ // deletion, they will have different lengths and the server will
+ // send its version of the data to the client
const sendBack = diff.diff.length !== diff.diff.$set[updatefield].fields.length;
delete diff.diff.length;
Database.Instance.update(
@@ -309,6 +398,7 @@ export namespace WebSocket {
diff.diff,
() => {
if (sendBack) {
+ // the two copies are different, so the server sends its copy.
console.log('SEND BACK');
const id = socket.id;
socket.id = '';
@@ -346,6 +436,11 @@ export namespace WebSocket {
var CurUser: string | undefined = undefined;
function UpdateField(socket: Socket, diff: Diff) {
+ const curUser = socketMap.get(socket);
+ if (!curUser) return;
+ const currentUsername = curUser.split(' ')[0];
+ userOperations.set(currentUsername, (userOperations.get(currentUsername) ?? 0) + 1); // count this update, starting from 0 for an unseen user
+
if (CurUser !== socketMap.get(socket)) {
CurUser = socketMap.get(socket);
console.log('Switch User: ' + CurUser);
@@ -381,8 +476,8 @@ export namespace WebSocket {
if (term !== undefined) {
const { suffix, value } = term;
update[key + suffix] = { set: value };
- if (key.endsWith('lastModified')) {
- update['lastModified' + suffix] = value;
+ if (key.endsWith('modificationDate')) {
+ update['modificationDate' + suffix] = value;
}
}
}