diff options
Diffstat (limited to 'src/server')
| -rw-r--r-- | src/server/ApiManagers/DataVizManager.ts | 26 | ||||
| -rw-r--r-- | src/server/ApiManagers/SearchManager.ts | 151 | ||||
| -rw-r--r-- | src/server/ApiManagers/UploadManager.ts | 90 | ||||
| -rw-r--r-- | src/server/ApiManagers/UserManager.ts | 1 | ||||
| -rw-r--r-- | src/server/DashStats.ts | 293 | ||||
| -rw-r--r-- | src/server/DashUploadUtils.ts | 26 | ||||
| -rw-r--r-- | src/server/DataVizUtils.ts | 26 | ||||
| -rw-r--r-- | src/server/Message.ts | 2 | ||||
| -rw-r--r-- | src/server/index.ts | 28 | ||||
| -rw-r--r-- | src/server/server_Initialization.ts | 29 | ||||
| -rw-r--r-- | src/server/updateProtos.ts | 24 | ||||
| -rw-r--r-- | src/server/websocket.ts | 109 |
12 files changed, 636 insertions, 169 deletions
diff --git a/src/server/ApiManagers/DataVizManager.ts b/src/server/ApiManagers/DataVizManager.ts new file mode 100644 index 000000000..0d43130d1 --- /dev/null +++ b/src/server/ApiManagers/DataVizManager.ts @@ -0,0 +1,26 @@ +import { csvParser, csvToString } from "../DataVizUtils"; +import { Method, _success } from "../RouteManager"; +import ApiManager, { Registration } from "./ApiManager"; +import { Directory, serverPathToFile } from "./UploadManager"; +import * as path from 'path'; + +export default class DataVizManager extends ApiManager { + protected initialize(register: Registration): void { + register({ + method: Method.GET, + subscription: "/csvData", + secureHandler: async ({ req, res }) => { + const uri = req.query.uri as string; + + return new Promise<void>(resolve => { + const name = path.basename(uri); + const sPath = serverPathToFile(Directory.csv, name); + const parsedCsv = csvParser(csvToString(sPath)); + _success(res, parsedCsv); + resolve(); + }); + } + }); + } + +}
\ No newline at end of file diff --git a/src/server/ApiManagers/SearchManager.ts b/src/server/ApiManagers/SearchManager.ts index a74e13a62..186f0bcd3 100644 --- a/src/server/ApiManagers/SearchManager.ts +++ b/src/server/ApiManagers/SearchManager.ts @@ -1,94 +1,89 @@ -import { exec } from "child_process"; -import { cyan, green, red, yellow } from "colors"; +import { exec } from 'child_process'; +import { cyan, green, red, yellow } from 'colors'; import * as path from 'path'; -import { log_execution } from "../ActionUtilities"; -import { Database } from "../database"; -import { Method } from "../RouteManager"; -import RouteSubscriber from "../RouteSubscriber"; -import { Search } from "../Search"; -import ApiManager, { Registration } from "./ApiManager"; -import { Directory, pathToDirectory } from "./UploadManager"; +import { log_execution } from '../ActionUtilities'; +import { Database } from '../database'; +import { Method } from '../RouteManager'; +import RouteSubscriber from '../RouteSubscriber'; +import { Search } from '../Search'; +import ApiManager, { Registration } from './ApiManager'; +import { Directory, pathToDirectory } from './UploadManager'; const findInFiles = require('find-in-files'); export class SearchManager extends ApiManager { - protected initialize(register: Registration): void { - register({ method: Method.GET, - subscription: new RouteSubscriber("solr").add("action"), + subscription: new RouteSubscriber('solr').add('action'), secureHandler: async ({ req, res }) => { const { action } = req.params; switch (action) { - case "start": - case "stop": - const status = req.params.action === "start"; + case 'start': + case 'stop': + const status = req.params.action === 'start'; SolrManager.SetRunning(status); break; - case "update": + case 'update': await SolrManager.update(); break; default: console.log(yellow(`${action} is an unknown solr operation.`)); } - res.redirect("/home"); - } + res.redirect('/home'); + }, }); register({ method: Method.GET, - subscription: "/textsearch", + subscription: '/textsearch', secureHandler: async ({ req, res }) => { const q = req.query.q; if (q === undefined) { res.send([]); return; } - const resObj: { ids: string[], numFound: number, lines: string[] } = { ids: [], numFound: 0, lines: [] }; + const resObj: { ids: string[]; numFound: number; lines: string[] } = { ids: [], numFound: 0, lines: [] }; let results: any; const dir = pathToDirectory(Directory.text); try { const regex = new RegExp(q.toString()); - results = await findInFiles.find({ 'term': q, 'flags': 'ig' }, dir, ".txt$"); + results = await findInFiles.find({ term: q, flags: 'ig' }, dir, '.txt$'); for (const result in results) { - resObj.ids.push(path.basename(result, ".txt").replace(/upload_/, "")); + resObj.ids.push(path.basename(result, '.txt').replace(/upload_/, '')); resObj.lines.push(results[result].line); resObj.numFound++; } res.send(resObj); } catch (e) { - console.log(red("textsearch:bad RegExp" + q.toString())); + console.log(red('textsearch:bad RegExp' + q.toString())); res.send([]); return; } - } + }, }); register({ method: Method.GET, - subscription: "/dashsearch", + subscription: '/dashsearch', secureHandler: async ({ req, res }) => { const solrQuery: any = {}; - ["q", "fq", "start", "rows", "sort", "hl.maxAnalyzedChars", "hl", "hl.fl"].forEach(key => solrQuery[key] = req.query[key]); + ['q', 'fq', 'start', 'rows', 'sort', 'hl.maxAnalyzedChars', 'hl', 'hl.fl'].forEach(key => (solrQuery[key] = req.query[key])); if (solrQuery.q === undefined) { res.send([]); return; } const results = await Search.search(solrQuery); res.send(results); - } + }, }); - } - } export namespace SolrManager { - export function SetRunning(status: boolean) { - const args = status ? "start" : "stop -p 8983"; + const args = status ? 'start' : 'stop -p 8983'; console.log(`solr management: trying to ${args}`); - exec(`solr ${args}`, { cwd: "./solr-8.3.1/bin" }, (error, stdout, stderr) => { + exec(`solr ${args}`, { cwd: './solr-8.3.1/bin' }, (error, stdout, stderr) => { if (error) { console.log(red(`solr management error: unable to ${args} server`)); console.log(red(error.message)); @@ -97,39 +92,39 @@ export namespace SolrManager { console.log(yellow(stderr)); }); if (status) { - console.log(cyan("Start script is executing: please allow 15 seconds for solr to start on port 8983.")); + console.log(cyan('Start script is executing: please allow 15 seconds for solr to start on port 8983.')); } } export async function update() { - console.log(green("Beginning update...")); + console.log(green('Beginning update...')); await log_execution<void>({ - startMessage: "Clearing existing Solr information...", - endMessage: "Solr information successfully cleared", + startMessage: 'Clearing existing Solr information...', + endMessage: 'Solr information successfully cleared', action: Search.clear, - color: cyan + color: cyan, }); const cursor = await log_execution({ - startMessage: "Connecting to and querying for all documents from database...", + startMessage: 'Connecting to and querying for all documents from database...', endMessage: ({ result, error }) => { const success = error === null && result !== undefined; if (!success) { - console.log(red("Unable to connect to the database.")); + console.log(red('Unable to connect to the database.')); process.exit(0); } - return "Connection successful and query complete"; + return 'Connection successful and query complete'; }, action: () => Database.Instance.query({}), - color: yellow + color: yellow, }); const updates: any[] = []; let numDocs = 0; function updateDoc(doc: any) { numDocs++; - if ((numDocs % 50) === 0) { + if (numDocs % 50 === 0) { console.log(`Batch of 50 complete, total of ${numDocs}`); } - if (doc.__type !== "Doc") { + if (doc.__type !== 'Doc') { return; } const fields = doc.fields; @@ -143,8 +138,8 @@ export namespace SolrManager { const term = ToSearchTerm(value); if (term !== undefined) { const { suffix, value } = term; - if (key.endsWith('lastModified')) { - update["lastModified" + suffix] = value; + if (key.endsWith('modificationDate')) { + update['modificationDate' + suffix] = value; } update[key + suffix] = value; dynfield = true; @@ -157,51 +152,54 @@ export namespace SolrManager { await cursor?.forEach(updateDoc); const result = await log_execution({ startMessage: `Dispatching updates for ${updates.length} documents`, - endMessage: "Dispatched updates complete", + endMessage: 'Dispatched updates complete', action: () => Search.updateDocuments(updates), - color: cyan + color: cyan, }); try { if (result) { const { status } = JSON.parse(result).responseHeader; - console.log(status ? red(`Failed with status code (${status})`) : green("Success!")); + console.log(status ? red(`Failed with status code (${status})`) : green('Success!')); } else { - console.log(red("Solr is likely not running!")); + console.log(red('Solr is likely not running!')); } } catch (e) { - console.log(red("Error:")); + console.log(red('Error:')); console.log(e); - console.log("\n"); + console.log('\n'); } await cursor?.close(); } - const suffixMap: { [type: string]: (string | [string, string | ((json: any) => any)]) } = { - "number": "_n", - "string": "_t", - "boolean": "_b", - "image": ["_t", "url"], - "video": ["_t", "url"], - "pdf": ["_t", "url"], - "audio": ["_t", "url"], - "web": ["_t", "url"], - "map": ["_t", "url"], - "date": ["_d", value => new Date(value.date).toISOString()], - "proxy": ["_i", "fieldId"], - "prefetch_proxy": ["_i", "fieldId"], - "list": ["_l", list => { - const results = []; - for (const value of list.fields) { - const term = ToSearchTerm(value); - if (term) { - results.push(term.value); + const suffixMap: { [type: string]: string | [string, string | ((json: any) => any)] } = { + number: '_n', + string: '_t', + boolean: '_b', + image: ['_t', 'url'], + video: ['_t', 'url'], + pdf: ['_t', 'url'], + audio: ['_t', 'url'], + web: ['_t', 'url'], + map: ['_t', 'url'], + date: ['_d', value => new Date(value.date).toISOString()], + proxy: ['_i', 'fieldId'], + prefetch_proxy: ['_i', 'fieldId'], + list: [ + '_l', + list => { + const results = []; + for (const value of list.fields) { + const term = ToSearchTerm(value); + if (term) { + results.push(term.value); + } } - } - return results.length ? results : null; - }] + return results.length ? results : null; + }, + ], }; - function ToSearchTerm(val: any): { suffix: string, value: any } | undefined { + function ToSearchTerm(val: any): { suffix: string; value: any } | undefined { if (val === null || val === undefined) { return; } @@ -213,7 +211,7 @@ export namespace SolrManager { if (Array.isArray(suffix)) { const accessor = suffix[1]; - if (typeof accessor === "function") { + if (typeof accessor === 'function') { val = accessor(val); } else { val = val[accessor]; @@ -223,5 +221,4 @@ export namespace SolrManager { return { suffix, value: val }; } - -}
\ No newline at end of file +} diff --git a/src/server/ApiManagers/UploadManager.ts b/src/server/ApiManagers/UploadManager.ts index 0a16bd8ec..de1661ed6 100644 --- a/src/server/ApiManagers/UploadManager.ts +++ b/src/server/ApiManagers/UploadManager.ts @@ -1,19 +1,19 @@ -import ApiManager, { Registration } from './ApiManager'; -import { Method, _success } from '../RouteManager'; import * as formidable from 'formidable'; -import v4 = require('uuid/v4'); -const AdmZip = require('adm-zip'); -import { extname, basename, dirname } from 'path'; import { createReadStream, createWriteStream, unlink, writeFile } from 'fs'; -import { publicDirectory, filesDirectory } from '..'; -import { Database } from '../database'; -import { DashUploadUtils, InjectSize, SizeSuffix } from '../DashUploadUtils'; +import { basename, dirname, extname, normalize } from 'path'; import * as sharp from 'sharp'; -import { AcceptableMedia, Upload } from '../SharedMediaTypes'; -import { normalize } from 'path'; +import { filesDirectory, publicDirectory } from '..'; +import { retrocycle } from '../../decycler/decycler'; +import { DashUploadUtils, InjectSize, SizeSuffix } from '../DashUploadUtils'; +import { Database } from '../database'; +import { Method, _success } from '../RouteManager'; import RouteSubscriber from '../RouteSubscriber'; -const imageDataUri = require('image-data-uri'); +import { AcceptableMedia, Upload } from '../SharedMediaTypes'; +import ApiManager, { Registration } from './ApiManager'; import { SolrManager } from './SearchManager'; +import v4 = require('uuid/v4'); +const AdmZip = require('adm-zip'); +const imageDataUri = require('image-data-uri'); const fs = require('fs'); export enum Directory { @@ -85,7 +85,7 @@ export default class UploadManager extends ApiManager { for (const key in files) { const f = files[key]; if (!Array.isArray(f)) { - const result = await DashUploadUtils.upload(f); + const result = await DashUploadUtils.upload(f, key); // key is the guid used by the client to track upload progress. result && !(result.result instanceof Error) && results.push(result); } } @@ -190,13 +190,9 @@ export default class UploadManager extends ApiManager { const ids: { [id: string]: string } = {}; let remap = true; const getId = (id: string): string => { - if (!remap) return id; - if (id.endsWith('Proto')) return id; - if (id in ids) { - return ids[id]; - } else { - return (ids[id] = v4()); - } + if (!remap || id.endsWith('Proto')) return id; + if (id in ids) return ids[id]; + return (ids[id] = v4()); }; const mapFn = (doc: any) => { if (doc.id) { @@ -238,48 +234,50 @@ export default class UploadManager extends ApiManager { form.parse(req, async (_err, fields, files) => { remap = fields.remap !== 'false'; let id: string = ''; + let docids: string[] = []; + let linkids: string[] = []; try { for (const name in files) { const f = files[name]; const path_2 = Array.isArray(f) ? '' : f.path; const zip = new AdmZip(path_2); zip.getEntries().forEach((entry: any) => { - if (!entry.entryName.startsWith('files/')) return; - let directory = dirname(entry.entryName) + '/'; - const extension = extname(entry.entryName); - const base = basename(entry.entryName).split('.')[0]; + let entryName = entry.entryName.replace(/%%%/g, '/'); + if (!entryName.startsWith('files/')) { + return; + } + const extension = extname(entryName); + const pathname = publicDirectory + '/' + entry.entryName; + const targetname = publicDirectory + '/' + entryName; try { zip.extractEntryTo(entry.entryName, publicDirectory, true, false); - directory = '/' + directory; - - createReadStream(publicDirectory + directory + base + extension).pipe(createWriteStream(publicDirectory + directory + base + '_o' + extension)); - createReadStream(publicDirectory + directory + base + extension).pipe(createWriteStream(publicDirectory + directory + base + '_s' + extension)); - createReadStream(publicDirectory + directory + base + extension).pipe(createWriteStream(publicDirectory + directory + base + '_m' + extension)); - createReadStream(publicDirectory + directory + base + extension).pipe(createWriteStream(publicDirectory + directory + base + '_l' + extension)); + createReadStream(pathname).pipe(createWriteStream(targetname)); + if (extension !== '.pdf') { + createReadStream(pathname).pipe(createWriteStream(targetname.replace('_o' + extension, '_s' + extension))); + createReadStream(pathname).pipe(createWriteStream(targetname.replace('_o' + extension, '_m' + extension))); + createReadStream(pathname).pipe(createWriteStream(targetname.replace('_o' + extension, '_l' + extension))); + } + unlink(pathname, () => {}); } catch (e) { console.log(e); } }); - const json = zip.getEntry('doc.json'); + const json = zip.getEntry('docs.json'); try { - const data = JSON.parse(json.getData().toString('utf8')); - const datadocs = data.docs; + const data = JSON.parse(json.getData().toString('utf8'), retrocycle()); + const { docs, links } = data; id = getId(data.id); - const docs = Object.keys(datadocs).map(key => datadocs[key]); - docs.forEach(mapFn); + const rdocs = Object.keys(docs).map(key => docs[key]); + const ldocs = Object.keys(links).map(key => links[key]); + [...rdocs, ...ldocs].forEach(mapFn); + docids = rdocs.map(doc => doc.id); + linkids = ldocs.map(link => link.id); await Promise.all( - docs.map( - (doc: any) => + [...rdocs, ...ldocs].map( + doc => new Promise<void>(res => { - Database.Instance.replace( - doc.id, - doc, - (err, r) => { - err && console.log(err); - res(); - }, - true - ); + // overwrite mongo doc with json doc contents + Database.Instance.replace(doc.id, doc, (err, r) => res(err && console.log(err)), true); }) ) ); @@ -289,7 +287,7 @@ export default class UploadManager extends ApiManager { unlink(path_2, () => {}); } SolrManager.update(); - res.send(JSON.stringify(id || 'error')); + res.send(JSON.stringify({ id, docids, linkids } || 'error')); } catch (e) { console.log(e); } diff --git a/src/server/ApiManagers/UserManager.ts b/src/server/ApiManagers/UserManager.ts index 53e55c1c3..c3dadd821 100644 --- a/src/server/ApiManagers/UserManager.ts +++ b/src/server/ApiManagers/UserManager.ts @@ -5,6 +5,7 @@ import { msToTime } from '../ActionUtilities'; import * as bcrypt from 'bcrypt-nodejs'; import { Opt } from '../../fields/Doc'; import { WebSocket } from '../websocket'; +import { DashStats } from '../DashStats'; export const timeMap: { [id: string]: number } = {}; interface ActivityUnit { diff --git a/src/server/DashStats.ts b/src/server/DashStats.ts new file mode 100644 index 000000000..8d341db63 --- /dev/null +++ b/src/server/DashStats.ts @@ -0,0 +1,293 @@ +import { cyan, magenta } from 'colors'; +import { Response } from 'express'; +import SocketIO from 'socket.io'; +import { timeMap } from './ApiManagers/UserManager'; +import { WebSocket } from './websocket'; +const fs = require('fs'); + +/** + * DashStats focuses on tracking user data for each session. + * + * This includes time connected, number of operations, and + * the rate of their operations + */ +export namespace DashStats { + export const SAMPLING_INTERVAL = 1000; // in milliseconds (ms) - Time interval to update the frontend. + export const RATE_INTERVAL = 10; // in seconds (s) - Used to calculate rate + + const statsCSVFilename = './src/server/stats/userLoginStats.csv'; + const columns = ['USERNAME', 'ACTION', 'TIME']; + + /** + * UserStats holds the stats associated with a particular user. + */ + interface UserStats { + socketId: string; + username: string; + time: string; + operations: number; + rate: number; + } + + /** + * UserLastOperations is the queue object for each user + * storing their past operations. + */ + interface UserLastOperations { + sampleOperations: number; // stores how many operations total are in this rate section (10 sec, for example) + lastSampleOperations: number; // stores how many total operations were recorded at the last sample + previousOperationsQueue: number[]; // stores the operations to calculate rate. + } + + /** + * StatsDataBundle represents an object that will be sent to the frontend view + * on each websocket update. + */ + interface StatsDataBundle { + connectedUsers: UserStats[]; + } + + /** + * CSVStore represents how objects will be stored in the CSV + */ + interface CSVStore { + USERNAME: string; + ACTION: string; + TIME: string; + } + + /** + * ServerTraffic describes the current traffic going to the backend. + */ + enum ServerTraffic { + NOT_BUSY, + BUSY, + VERY_BUSY + } + + // These values can be changed after further testing how many + // users correspond to each traffic level in Dash. + const BUSY_SERVER_BOUND = 2; + const VERY_BUSY_SERVER_BOUND = 3; + + const serverTrafficMessages = [ + "Not Busy", + "Busy", + "Very Busy" + ] + + // lastUserOperations maps each username to a UserLastOperations + // structure + export const lastUserOperations = new Map<string, UserLastOperations>(); + + /** + * handleStats is called when the /stats route is called, providing a JSON + * object with relevant stats. In this case, we return the number of + * current connections and + * @param res Response object from Express + */ + export function handleStats(res: Response) { + let current = getCurrentStats(); + const results: CSVStore[] = []; + res.json({ + currentConnections: current.length, + socketMap: current, + }); + } + + /** + * getUpdatedStatesBundle() sends an updated copy of the current stats to the + * frontend /statsview route via websockets. + * + * @returns a StatsDataBundle that is sent to the frontend view on each websocket update + */ + export function getUpdatedStatsBundle(): StatsDataBundle { + let current = getCurrentStats(); + + return { + connectedUsers: current, + } + } + + /** + * handleStatsView() is called when the /statsview route is called. This + * will use pug to render a frontend view of the current stats + * + * @param res + */ + export function handleStatsView(res: Response) { + let current = getCurrentStats(); + + let connectedUsers = current.map((socketPair) => { + return socketPair.time + " - " + socketPair.username + " Operations: " + socketPair.operations; + }) + + let serverTraffic = ServerTraffic.NOT_BUSY; + if(current.length < BUSY_SERVER_BOUND) { + serverTraffic = ServerTraffic.NOT_BUSY; + } else if(current.length >= BUSY_SERVER_BOUND && current.length < VERY_BUSY_SERVER_BOUND) { + serverTraffic = ServerTraffic.BUSY; + } else { + serverTraffic = ServerTraffic.VERY_BUSY; + } + + res.render("stats.pug", { + title: "Dash Stats", + numConnections: connectedUsers.length, + serverTraffic: serverTraffic, + serverTrafficMessage : serverTrafficMessages[serverTraffic], + connectedUsers: connectedUsers + }); + } + + /** + * logUserLogin() writes a login event to the CSV file. + * + * @param username the username in the format of "username@domain.com logged in" + * @param socket the websocket associated with the current connection + */ + export function logUserLogin(username: string | undefined, socket: SocketIO.Socket) { + if (!(username === undefined)) { + let currentDate = new Date(); + console.log(magenta(`User ${username.split(' ')[0]} logged in at: ${currentDate.toISOString()}`)); + + let toWrite: CSVStore = { + USERNAME : username, + ACTION : "loggedIn", + TIME : currentDate.toISOString() + } + + let statsFile = fs.createWriteStream(statsCSVFilename, { flags: "a"}); + statsFile.write(convertToCSV(toWrite)); + statsFile.end(); + console.log(cyan(convertToCSV(toWrite))); + } + } + + /** + * logUserLogout() writes a logout event to the CSV file. + * + * @param username the username in the format of "username@domain.com logged in" + * @param socket the websocket associated with the current connection. + */ + export function logUserLogout(username: string | undefined, socket: SocketIO.Socket) { + if (!(username === undefined)) { + let currentDate = new Date(); + + let statsFile = fs.createWriteStream(statsCSVFilename, { flags: "a"}); + let toWrite: CSVStore = { + USERNAME : username, + ACTION : "loggedOut", + TIME : currentDate.toISOString() + } + statsFile.write(convertToCSV(toWrite)); + statsFile.end(); + } + } + + /** + * getLastOperationsOrDefault() is a helper method that will attempt + * to query the lastUserOperations map for a specified username. If the + * username is not in the map, an empty UserLastOperations object is returned. + * @param username + * @returns the user's UserLastOperations structure or an empty + * UserLastOperations object (All values set to 0) if the username is not found. + */ + function getLastOperationsOrDefault(username: string): UserLastOperations { + if(lastUserOperations.get(username) === undefined) { + let initializeOperationsQueue = []; + for(let i = 0; i < RATE_INTERVAL; i++) { + initializeOperationsQueue.push(0); + } + return { + sampleOperations: 0, + lastSampleOperations: 0, + previousOperationsQueue: initializeOperationsQueue + } + } + return lastUserOperations.get(username)!; + } + + /** + * updateLastOperations updates a specific user's UserLastOperations information + * for the current sampling cycle. The method removes old/outdated counts for + * operations from the queue and adds new data for the current sampling + * cycle to the queue, updating the total count as it goes. + * @param lastOperationData the old UserLastOperations data that must be updated + * @param currentOperations the total number of operations measured for this sampling cycle. + * @returns the udpated UserLastOperations structure. + */ + function updateLastOperations(lastOperationData: UserLastOperations, currentOperations: number): UserLastOperations { + // create a copy of the UserLastOperations to modify + let newLastOperationData: UserLastOperations = { + sampleOperations: lastOperationData.sampleOperations, + lastSampleOperations: lastOperationData.lastSampleOperations, + previousOperationsQueue: lastOperationData.previousOperationsQueue.slice() + } + + let newSampleOperations = newLastOperationData.sampleOperations; + newSampleOperations -= newLastOperationData.previousOperationsQueue.shift()!; // removes and returns the first element of the queue + let operationsThisCycle = currentOperations - lastOperationData.lastSampleOperations; + newSampleOperations += operationsThisCycle; // add the operations this cycle to find out what our count for the interval should be (e.g operations in the last 10 seconds) + + // update values for the copy object + newLastOperationData.sampleOperations = newSampleOperations; + + newLastOperationData.previousOperationsQueue.push(operationsThisCycle); + newLastOperationData.lastSampleOperations = currentOperations; + + return newLastOperationData; + } + + /** + * getUserOperationsOrDefault() is a helper method to get the user's total + * operations for the CURRENT sampling interval. The method will return 0 + * if the username is not in the userOperations map. + * @param username the username to search the map for + * @returns the total number of operations recorded up to this sampling cycle. + */ + function getUserOperationsOrDefault(username: string): number { + return WebSocket.userOperations.get(username) === undefined ? 0 : WebSocket.userOperations.get(username)! + } + + /** + * getCurrentStats() calculates the total stats for this cycle. In this case, + * getCurrentStats() returns an Array of UserStats[] objects describing + * the stats for each user + * @returns an array of UserStats storing data for each user at the current moment. + */ + function getCurrentStats(): UserStats[] { + let socketPairs: UserStats[] = []; + for (let [key, value] of WebSocket.socketMap) { + let username = value.split(' ')[0]; + let connectionTime = new Date(timeMap[username]); + + let connectionTimeString = connectionTime.toLocaleDateString() + " " + connectionTime.toLocaleTimeString(); + + if (!key.disconnected) { + let lastRecordedOperations = getLastOperationsOrDefault(username); + let currentUserOperationCount = getUserOperationsOrDefault(username); + + socketPairs.push({ + socketId: key.id, + username: username, + time: connectionTimeString.includes("Invalid Date") ? "" : connectionTimeString, + operations : WebSocket.userOperations.get(username) ? WebSocket.userOperations.get(username)! : 0, + rate: lastRecordedOperations.sampleOperations + }); + lastUserOperations.set(username, updateLastOperations(lastRecordedOperations,currentUserOperationCount)); + } + } + return socketPairs; + } + + /** + * convertToCSV() is a helper method that stringifies a CSVStore object + * that can be written to the CSV file later. + * @param dataObject the object to stringify + * @returns the object as a string. + */ + function convertToCSV(dataObject: CSVStore): string { + return `${dataObject.USERNAME},${dataObject.ACTION},${dataObject.TIME}\n`; + } +} diff --git a/src/server/DashUploadUtils.ts b/src/server/DashUploadUtils.ts index 4870d218b..eaaac4e6d 100644 --- a/src/server/DashUploadUtils.ts +++ b/src/server/DashUploadUtils.ts @@ -70,7 +70,14 @@ export namespace DashUploadUtils { // make a list of paths to create the ordered text file for ffmpeg const filePathsText = filePaths.map(filePath => `file '${filePath}'`).join('\n'); // write the text file to the file system - writeFile(textFilePath, filePathsText, err => console.log(err)); + await new Promise<void>((res, reject) => + writeFile(textFilePath, filePathsText, err => { + if (err) { + reject(); + console.log(err); + } else res(); + }) + ); // make output file name based on timestamp const outputFileName = `output-${Utils.GenerateGuid()}.mp4`; @@ -86,7 +93,10 @@ export namespace DashUploadUtils { .outputOptions('-c copy') //.videoCodec("copy") .save(outputFilePath) - .on('error', reject) + .on('error', (err: any) => { + console.log(err); + reject(); + }) .on('end', resolve); }); @@ -171,9 +181,10 @@ export namespace DashUploadUtils { }); } - export async function upload(file: File): Promise<Upload.FileResponse> { + export async function upload(file: File, overwriteGuid?: string): Promise<Upload.FileResponse> { const { type, path, name } = file; const types = type?.split('/') ?? []; + uploadProgress.set(overwriteGuid ?? name, 'uploading'); // If the client sent a guid it uses to track upload progress, use that guid. Otherwise, use the file's name. const category = types[0]; let format = `.${types[1]}`; @@ -195,6 +206,7 @@ export namespace DashUploadUtils { .videoCodec('copy') // this will copy the data instead of reencode it .save(file.path.replace('.mkv', '.mp4')) .on('end', res) + .on('error', (e: any) => console.log(e)) ); file.path = file.path.replace('.mkv', '.mp4'); format = '.mp4'; @@ -224,7 +236,7 @@ export namespace DashUploadUtils { return { source: file, result: { name: 'Unsupported video format', message: `Could not upload unsupported file (${name}). Please convert to an .mp4` } }; } } - if (videoFormats.includes(format)) { + if (videoFormats.includes(format) || format.includes('.webm')) { return MoveParsedFile(file, Directory.videos); } fs.unlink(path, () => {}); @@ -398,7 +410,9 @@ export namespace DashUploadUtils { // Use the request library to parse out file level image information in the headers const { headers } = await new Promise<any>((resolve, reject) => { return request.head(resolvedUrl, (error, res) => (error ? reject(error) : resolve(res))); - }).catch(e => console.log(e)); + }).catch(e => { + console.log('Error processing headers: ', e); + }); try { // Compute the native width and height ofthe image with an npm module @@ -499,7 +513,7 @@ export namespace DashUploadUtils { const parseExifData = async (source: string) => { const image = await request.get(source, { encoding: null }); - const { data, error } = await new Promise(resolve => { + const { data, error } = await new Promise<{ data: any; error: any }>(resolve => { new ExifImage({ image }, (error, data) => { let reason: Opt<string> = undefined; if (error) { diff --git a/src/server/DataVizUtils.ts b/src/server/DataVizUtils.ts index 4fd0ca6ff..15f03b319 100644 --- a/src/server/DataVizUtils.ts +++ b/src/server/DataVizUtils.ts @@ -1,13 +1,17 @@ +import { readFileSync } from 'fs'; + export function csvParser(csv: string) { - const lines = csv.split("\n"); - const headers = lines[0].split(","); - const data = lines.slice(1).map(line => { - const values = line.split(","); - const obj: any = {}; - for (let i = 0; i < headers.length; i++) { - obj[headers[i]] = values[i]; - } - return obj; - }); + const lines = csv.split('\n'); + const headers = lines[0].split(',').map(header => header.trim()); + const data = lines.slice(1).map(line => + line.split(',').reduce((last, value, i) => { + last[headers[i]] = value.trim(); + return last; + }, {} as any) + ); return data; -}
\ No newline at end of file +} + +export function csvToString(path: string) { + return readFileSync(path, 'utf8'); +} diff --git a/src/server/Message.ts b/src/server/Message.ts index d87ae5027..8f0af08bc 100644 --- a/src/server/Message.ts +++ b/src/server/Message.ts @@ -94,4 +94,6 @@ export namespace MessageStore { export const YoutubeApiQuery = new Message<YoutubeQueryInput>("Youtube Api Query"); export const DeleteField = new Message<string>("Delete field"); export const DeleteFields = new Message<string[]>("Delete fields"); + + export const UpdateStats = new Message<string>("updatestats"); } diff --git a/src/server/index.ts b/src/server/index.ts index 6e6bde3cb..8b2e18847 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -4,6 +4,7 @@ import * as mobileDetect from 'mobile-detect'; import * as path from 'path'; import * as qs from 'query-string'; import { log_execution } from './ActionUtilities'; +import DataVizManager from './ApiManagers/DataVizManager'; import DeleteManager from './ApiManagers/DeleteManager'; import DownloadManager from './ApiManagers/DownloadManager'; import GeneralGoogleManager from './ApiManagers/GeneralGoogleManager'; @@ -18,6 +19,7 @@ import { GoogleCredentialsLoader, SSL } from './apis/google/CredentialsLoader'; import { GoogleApiServerUtils } from './apis/google/GoogleApiServerUtils'; import { DashSessionAgent } from './DashSession/DashSessionAgent'; import { AppliedSessionAgent } from './DashSession/Session/agents/applied_session_agent'; +import { DashStats } from './DashStats'; import { DashUploadUtils } from './DashUploadUtils'; import { Database } from './database'; import { Logger } from './ProcessFactory'; @@ -62,7 +64,19 @@ async function preliminaryFunctions() { * with the server */ function routeSetter({ isRelease, addSupervisedRoute, logRegistrationOutcome }: RouteManager) { - const managers = [new SessionManager(), new UserManager(), new UploadManager(), new DownloadManager(), new SearchManager(), new PDFManager(), new DeleteManager(), new UtilManager(), new GeneralGoogleManager(), new GooglePhotosManager()]; + const managers = [ + new SessionManager(), + new UserManager(), + new UploadManager(), + new DownloadManager(), + new SearchManager(), + new PDFManager(), + new DeleteManager(), + new UtilManager(), + new GeneralGoogleManager(), + new GooglePhotosManager(), + new DataVizManager(), + ]; // initialize API Managers console.log(yellow('\nregistering server routes...')); @@ -85,6 +99,18 @@ function routeSetter({ isRelease, addSupervisedRoute, logRegistrationOutcome }: addSupervisedRoute({ method: Method.GET, + subscription: '/stats', + secureHandler: ({ res }) => DashStats.handleStats(res), + }); + + addSupervisedRoute({ + method: Method.GET, + subscription: '/statsview', + secureHandler: ({ res }) => DashStats.handleStatsView(res), + }); + + addSupervisedRoute({ + method: Method.GET, subscription: '/resolvedPorts', secureHandler: ({ res }) => res.send(resolvedPorts), publicHandler: ({ res }) => res.send(resolvedPorts), diff --git a/src/server/server_Initialization.ts b/src/server/server_Initialization.ts index b0db71f9c..805da1d43 100644 --- a/src/server/server_Initialization.ts +++ b/src/server/server_Initialization.ts @@ -22,6 +22,7 @@ import { Database } from './database'; import RouteManager from './RouteManager'; import RouteSubscriber from './RouteSubscriber'; import { WebSocket } from './websocket'; +import brotli = require('brotli'); import expressFlash = require('express-flash'); import flash = require('connect-flash'); const MongoStore = require('connect-mongo')(session); @@ -171,31 +172,29 @@ function registerCorsProxy(server: express.Express) { function proxyServe(req: any, requrl: string, response: any) { const htmlBodyMemoryStream = new (require('memorystream'))(); var retrieveHTTPBody: any; + var wasinBrFormat = false; const sendModifiedBody = () => { const header = response.headers['content-encoding']; + const httpsToCors = (match: any, href: string, offset: any, string: any) => `href="${resolvedServerUrl + '/corsProxy/http' + href}"`; if (header?.includes('gzip')) { try { - const replacer = (match: any, href: string, offset: any, string: any) => { - return `href="${resolvedServerUrl + '/corsProxy/http' + href}"`; - }; - const zipToStringDecoder = new (require('string_decoder').StringDecoder)('utf8'); const bodyStream = htmlBodyMemoryStream.read(); if (bodyStream) { - const htmlText = zipToStringDecoder.write( - zlib - .gunzipSync(bodyStream) - .toString('utf8') - .replace('<head>', '<head> <style>[id ^= "google"] { display: none; } </style>') - .replace(/href="https?([^"]*)"/g, replacer) - .replace(/target="_blank"/g, '') - ); + const htmlInputText = wasinBrFormat ? Buffer.from(brotli.decompress(bodyStream)) : zlib.gunzipSync(bodyStream); + const htmlText = htmlInputText + .toString('utf8') + .replace('<head>', '<head> <style>[id ^= "google"] { display: none; } </style>') + .replace(/href="https?([^"]*)"/g, httpsToCors) + .replace(/data-srcset="[^"]*"/g, '') + .replace(/srcset="[^"]*"/g, '') + .replace(/target="_blank"/g, ''); response.send(zlib.gzipSync(htmlText)); } else { req.pipe(request(requrl)).pipe(response); console.log('EMPTY body:' + req.url); } } catch (e) { - console.log('EROR?: ', e); + console.log('ERROR?: ', e); } } else { req.pipe(htmlBodyMemoryStream).pipe(response); @@ -216,6 +215,10 @@ function proxyServe(req: any, requrl: string, response: any) { } else if (headerCharRegex.test(header || '')) { delete res.headers[headerName]; } else res.headers[headerName] = header; + if (headerName === 'content-encoding') { + wasinBrFormat = res.headers[headerName] === 'br'; + res.headers[headerName] = 'gzip'; + } }); res.headers['x-permitted-cross-domain-policies'] = 'all'; res.headers['x-frame-options'] = ''; diff --git a/src/server/updateProtos.ts b/src/server/updateProtos.ts index c5552f6bf..2f3772a77 100644 --- a/src/server/updateProtos.ts +++ b/src/server/updateProtos.ts @@ -1,14 +1,22 @@ -import { Database } from "./database"; +import { Database } from './database'; -const protos = - ["text", "image", "web", "collection", "kvp", "video", "audio", "pdf", "icon", "import", "linkdoc", "map"]; +const protos = ['text', 'image', 'web', 'collection', 'kvp', 'video', 'audio', 'pdf', 'icon', 'import', 'linkdoc', 'map']; (async function () { await Promise.all( - protos.map(protoId => new Promise(res => Database.Instance.update(protoId, { - $set: { "fields.baseProto": true } - }, res))) + protos.map( + protoId => + new Promise(res => + Database.Instance.update( + protoId, + { + $set: { 'fields.isBaseProto': true }, + }, + res + ) + ) + ) ); - console.log("done"); -})();
\ No newline at end of file + console.log('done'); +})(); diff --git a/src/server/websocket.ts b/src/server/websocket.ts index 9b91a35a6..be5cdb202 100644 --- a/src/server/websocket.ts +++ b/src/server/websocket.ts @@ -12,16 +12,19 @@ import { GoogleCredentialsLoader, SSL } from './apis/google/CredentialsLoader'; import YoutubeApi from './apis/youtube/youtubeApiSample'; import { initializeGuest } from './authentication/DashUserModel'; import { Client } from './Client'; +import { DashStats } from './DashStats'; import { Database } from './database'; import { DocumentsCollection } from './IDatabase'; import { Diff, GestureContent, MessageStore, MobileDocumentUploadContent, MobileInkOverlayContent, Transferable, Types, UpdateMobileInkOverlayPositionContent, YoutubeQueryInput, YoutubeQueryTypes } from './Message'; import { Search } from './Search'; import { resolvedPorts } from './server_Initialization'; +var _ = require('lodash'); export namespace WebSocket { export let _socket: Socket; - const clients: { [key: string]: Client } = {}; + export const clients: { [key: string]: Client } = {}; export const socketMap = new Map<SocketIO.Socket, string>(); + export const userOperations = new Map<string, number>(); export let disconnect: Function; export async function initialize(isRelease: boolean, app: express.Express) { @@ -49,6 +52,8 @@ export namespace WebSocket { next(); }); + socket.emit(MessageStore.UpdateStats.Message, DashStats.getUpdatedStatsBundle()); + // convenience function to log server messages on the client function log(message?: any, ...optionalParams: any[]) { socket.emit('log', ['Message from server:', message, ...optionalParams]); @@ -97,6 +102,15 @@ export namespace WebSocket { console.log('received bye'); }); + socket.on('disconnect', function () { + let currentUser = socketMap.get(socket); + if (!(currentUser === undefined)) { + let currentUsername = currentUser.split(' ')[0]; + DashStats.logUserLogout(currentUsername, socket); + delete timeMap[currentUsername]; + } + }); + Utils.Emit(socket, MessageStore.Foo, 'handshooken'); Utils.AddServerHandler(socket, MessageStore.Bar, guid => barReceived(socket, guid)); @@ -130,6 +144,12 @@ export namespace WebSocket { socket.disconnect(true); }; }); + + setInterval(function () { + // Utils.Emit(socket, MessageStore.UpdateStats, DashStats.getUpdatedStatsBundle()); + + io.emit(MessageStore.UpdateStats.Message, DashStats.getUpdatedStatsBundle()); + }, DashStats.SAMPLING_INTERVAL); } function processGesturePoints(socket: Socket, content: GestureContent) { @@ -176,7 +196,12 @@ export namespace WebSocket { const currentdate = new Date(); const datetime = currentdate.getDate() + '/' + (currentdate.getMonth() + 1) + '/' + currentdate.getFullYear() + ' @ ' + currentdate.getHours() + ':' + currentdate.getMinutes() + ':' + currentdate.getSeconds(); console.log(blue(`user ${userEmail} has connected to the web socket at: ${datetime}`)); + printActiveUsers(); + + timeMap[userEmail] = Date.now(); socketMap.set(socket, userEmail + ' at ' + datetime); + userOperations.set(userEmail, 0); + DashStats.logUserLogin(userEmail, socket); } function getField([id, callback]: [string, (result?: Transferable) => void]) { @@ -199,7 +224,7 @@ export namespace WebSocket { return Database.Instance.getDocument(id, callback); } function GetRefField([id, callback]: [string, (result?: Transferable) => void]) { - process.stdout.write(`.`); + process.stdout.write(`+`); GetRefFieldLocal([id, callback]); } @@ -293,15 +318,79 @@ export namespace WebSocket { ); } + /** + * findClosestIndex() is a helper function that will try to find + * the closest index of a list that has the same value as + * a specified argument/index pair. + * @param list the list to search through + * @param indexesToDelete a list of indexes that are already marked for deletion + * so they will be ignored + * @param value the value of the item to remove + * @param hintIndex the index that the element was at on the client's copy of + * the data + * @returns the closest index with the same value or -1 if the element was not found. + */ + function findClosestIndex(list: any, indexesToDelete: number[], value: any, hintIndex: number) { + let closestIndex = -1; + for (let i = 0; i < list.length; i++) { + if (list[i] === value && !indexesToDelete.includes(i)) { + if (Math.abs(i - hintIndex) < Math.abs(closestIndex - hintIndex)) { + closestIndex = i; + } + } + } + return closestIndex; + } + + /** + * remFromListField() receives the items to remove and a hint + * from the client, and attempts to make the modification to the + * server's copy of the data. If server's copy does not match + * the client's after removal, the server will SEND BACk + * its version to the client. + * @param socket the socket that the client is connected on + * @param diff an object containing the items to remove and a hint + * (the hint contains start index and deleteCount, the number of + * items to delete) + * @param curListItems the server's current copy of the data + */ function remFromListField(socket: Socket, diff: Diff, curListItems?: Transferable): void { diff.diff.$set = diff.diff.$remFromSet; delete diff.diff.$remFromSet; const updatefield = Array.from(Object.keys(diff.diff.$set))[0]; const remListItems = diff.diff.$set[updatefield].fields; const curList = (curListItems as any)?.fields?.[updatefield.replace('fields.', '')]?.fields.filter((f: any) => f !== null) || []; - diff.diff.$set[updatefield].fields = curList?.filter( - (curItem: any) => !remListItems.some((remItem: any) => (remItem.fieldId ? remItem.fieldId === curItem.fieldId : remItem.heading ? remItem.heading === curItem.heading : remItem === curItem)) - ); + const hint = diff.diff.$set.hint; + + if (hint) { + // indexesToRemove stores the indexes that we mark for deletion, which is later used to filter the list (delete the elements) + let indexesToRemove: number[] = []; + for (let i = 0; i < hint.deleteCount; i++) { + if (curList.length > i + hint.start && _.isEqual(curList[i + hint.start], remListItems[i])) { + indexesToRemove.push(i + hint.start); + continue; + } + + let closestIndex = findClosestIndex(curList, indexesToRemove, remListItems[i], i + hint.start); + if (closestIndex !== -1) { + indexesToRemove.push(closestIndex); + } else { + console.log('Item to delete was not found - index = -1'); + } + } + + diff.diff.$set[updatefield].fields = curList?.filter((curItem: any, index: number) => !indexesToRemove.includes(index)); + } else { + // go back to the original way to delete if we didn't receive + // a hint from the client + diff.diff.$set[updatefield].fields = curList?.filter( + (curItem: any) => !remListItems.some((remItem: any) => (remItem.fieldId ? remItem.fieldId === curItem.fieldId : remItem.heading ? remItem.heading === curItem.heading : remItem === curItem)) + ); + } + + // if the client and server have different versions of the data after + // deletion, they will have different lengths and the server will + // send its version of the data to the client const sendBack = diff.diff.length !== diff.diff.$set[updatefield].fields.length; delete diff.diff.length; Database.Instance.update( @@ -309,6 +398,7 @@ export namespace WebSocket { diff.diff, () => { if (sendBack) { + // the two copies are different, so the server sends its copy. console.log('SEND BACK'); const id = socket.id; socket.id = ''; @@ -346,6 +436,11 @@ export namespace WebSocket { var CurUser: string | undefined = undefined; function UpdateField(socket: Socket, diff: Diff) { + const curUser = socketMap.get(socket); + if (!curUser) return; + let currentUsername = curUser.split(' ')[0]; + userOperations.set(currentUsername, userOperations.get(currentUsername) !== undefined ? userOperations.get(currentUsername)! + 1 : 0); + if (CurUser !== socketMap.get(socket)) { CurUser = socketMap.get(socket); console.log('Switch User: ' + CurUser); @@ -381,8 +476,8 @@ export namespace WebSocket { if (term !== undefined) { const { suffix, value } = term; update[key + suffix] = { set: value }; - if (key.endsWith('lastModified')) { - update['lastModified' + suffix] = value; + if (key.endsWith('modificationDate')) { + update['modificationDate' + suffix] = value; } } } |
