fix(core): daemon process should propagate file watcher errors to client

This commit is contained in:
Victor Savkin 2021-12-02 13:54:04 -05:00 committed by Victor Savkin
parent 99164baaa0
commit 738708c11c
5 changed files with 116 additions and 80 deletions

View File

@ -48,8 +48,6 @@ describe('serverLogger', () => {
serverLogger.log('Server started');
serverLogger.watcherLog('Watching started');
serverLogger.requestLog('A request has come in');
serverLogger.nestedLog('Doing something with the request...');
serverLogger.nestedLog('Done with the request');
serverLogger.watcherLog('Watching stopped');
serverLogger.log('Server stopped');
// prettier-ignore
@ -57,8 +55,6 @@ describe('serverLogger', () => {
['[NX Daemon Server] - 2021-10-11T17:18:45.980Z - Server started'],
['[NX Daemon Server] - 2021-10-11T17:18:45.980Z - [WATCHER]: Watching started'],
['[NX Daemon Server] - 2021-10-11T17:18:45.980Z - [REQUEST]: A request has come in'],
['[NX Daemon Server] - 2021-10-11T17:18:45.980Z - Doing something with the request...'],
['[NX Daemon Server] - 2021-10-11T17:18:45.980Z - Done with the request'],
['[NX Daemon Server] - 2021-10-11T17:18:45.980Z - [WATCHER]: Watching stopped'],
['[NX Daemon Server] - 2021-10-11T17:18:45.980Z - Server stopped'],
]);

View File

@ -32,10 +32,6 @@ class ServerLogger {
this.log(`[WATCHER]: ${s.join(' ')}`);
}
nestedLog(...s: unknown[]) {
this.log(` ${s.join(' ')}`);
}
private formatLogMessage(message: string) {
return `[NX Daemon Server] - ${this.getNow()} - ${message}`;
}

View File

@ -33,26 +33,30 @@ const collectedDeletedFiles = new Set<string>();
let waitPeriod = 100;
let scheduledTimeoutId;
export function getCachedSerializedProjectGraphPromise() {
// recomputing it now on demand. we can ignore the scheduled timeout
if (scheduledTimeoutId) {
clearTimeout(scheduledTimeoutId);
scheduledTimeoutId = undefined;
}
export async function getCachedSerializedProjectGraphPromise() {
try {
// recomputing it now on demand. we can ignore the scheduled timeout
if (scheduledTimeoutId) {
clearTimeout(scheduledTimeoutId);
scheduledTimeoutId = undefined;
}
// reset the wait time
waitPeriod = 100;
resetInternalStateIfNxDepsMissing();
if (collectedUpdatedFiles.size == 0 && collectedDeletedFiles.size == 0) {
if (!cachedSerializedProjectGraphPromise) {
processCollectedUpdatedAndDeletedFiles(); // this creates a project graph
// reset the wait time
waitPeriod = 100;
resetInternalStateIfNxDepsMissing();
if (collectedUpdatedFiles.size == 0 && collectedDeletedFiles.size == 0) {
if (!cachedSerializedProjectGraphPromise) {
processCollectedUpdatedAndDeletedFiles(); // this creates a project graph
cachedSerializedProjectGraphPromise = createAndSerializeProjectGraph();
}
} else {
processCollectedUpdatedAndDeletedFiles();
cachedSerializedProjectGraphPromise = createAndSerializeProjectGraph();
}
} else {
processCollectedUpdatedAndDeletedFiles();
cachedSerializedProjectGraphPromise = createAndSerializeProjectGraph();
return await cachedSerializedProjectGraphPromise;
} catch (e) {
return { error: e, serializedProjectGraph: null };
}
return cachedSerializedProjectGraphPromise;
}
export function addUpdatedAndDeletedFiles(
@ -96,7 +100,7 @@ function processCollectedUpdatedAndDeletedFiles() {
);
defaultFileHasher.incrementalUpdate(updatedFiles, deletedFiles);
const workspaceJson = readWorkspaceJson();
serverLogger.nestedLog(
serverLogger.requestLog(
`Updated file-hasher based on watched changes, recomputing project graph...`
);
// when workspace.json changes we cannot be sure about the correctness of the project file map

View File

@ -1,4 +1,4 @@
import { logger, normalizePath } from '@nrwl/devkit';
import { logger, normalizePath, stripIndents } from '@nrwl/devkit';
import { appRootPath } from '@nrwl/tao/src/utils/app-root';
import { createServer, Server, Socket } from 'net';
import { join } from 'path';
@ -24,57 +24,88 @@ import {
import {
addUpdatedAndDeletedFiles,
getCachedSerializedProjectGraphPromise,
resetInternalState,
} from './project-graph-incremental-recomputation';
import { statSync } from 'fs';
function respondToClient(socket: Socket, message: string) {
socket.write(message, () => {
// Close the connection once all data has been written so that the client knows when to read it.
socket.end();
serverLogger.nestedLog(`Closed Connection to Client`);
return new Promise((res) => {
socket.write(message, () => {
// Close the connection once all data has been written so that the client knows when to read it.
socket.end();
serverLogger.log(`Closed Connection to Client`);
res(null);
});
});
}
let watcherSubscription: WatcherSubscription | undefined;
let performanceObserver: PerformanceObserver | undefined;
let watcherError: Error | undefined;
const server = createServer((socket) => {
async function respondWithErrorAndExit(
socket: Socket,
description: string,
error: Error
) {
// print some extra stuff in the error message
serverLogger.requestLog(
`Responding to the client with an error`,
description,
error.message
);
console.error(error);
error.message = stripIndents`
${error.message}
${description}
Because of the error the Nx daemon process has exited. The next Nx command is going to restart the daemon process.
If the error persists, please run "nx reset".
`;
await respondToClient(socket, serializeResult(error, null));
process.exit(1);
}
const server = createServer(async (socket) => {
resetInactivityTimeout(handleInactivityTimeout);
if (!performanceObserver) {
performanceObserver = new PerformanceObserver((list) => {
const entry = list.getEntries()[0];
serverLogger.nestedLog(
`Time taken for '${entry.name}'`,
`${entry.duration}ms`
);
serverLogger.log(`Time taken for '${entry.name}'`, `${entry.duration}ms`);
});
performanceObserver.observe({ entryTypes: ['measure'], buffered: false });
}
socket.on('data', async (data) => {
if (watcherError) {
await respondWithErrorAndExit(
socket,
`File watcher error.`,
watcherError
);
}
resetInactivityTimeout(handleInactivityTimeout);
const payload = data.toString();
if (payload !== 'REQUEST_PROJECT_GRAPH_PAYLOAD') {
throw new Error(`Unsupported payload sent to daemon server: ${payload}`);
await respondWithErrorAndExit(
socket,
null,
new Error(`Unsupported payload sent to daemon server: ${payload}`)
);
}
performance.mark('server-connection');
serverLogger.requestLog('Client Request for Project Graph Received');
const result = await getCachedSerializedProjectGraphPromise();
if (result.error) {
resetInternalState();
serverLogger.nestedLog(
`Error when preparing serialized project graph: ${result.error.message}`
);
respondToClient(
await respondWithErrorAndExit(
socket,
serializeResult(result.error, result.serializedProjectGraph)
`Error when preparing serialized project graph.`,
result.error
);
return;
}
const serializedResult = serializeResult(
@ -82,18 +113,13 @@ const server = createServer((socket) => {
result.serializedProjectGraph
);
if (!serializedResult) {
resetInternalState();
serverLogger.nestedLog(`Error when serializing project graph result`);
respondToClient(
await respondWithErrorAndExit(
socket,
serializeResult(
new Error(
'Critical error when serializing server result, check server logs'
),
null
`Error when serializing project graph result.`,
new Error(
'Critical error when serializing server result, check server logs'
)
);
return;
}
performance.mark('serialized-project-graph-ready');
@ -121,7 +147,7 @@ const server = createServer((socket) => {
result.serializedProjectGraph,
'utf-8'
);
serverLogger.nestedLog(
serverLogger.requestLog(
`Closed Connection to Client (${bytesWritten} bytes transferred)`
);
});
@ -178,8 +204,8 @@ function resolveCurrentNxVersion(): string | null {
version: string;
};
return version;
} catch {
serverLogger.nestedLog(
} catch (err) {
serverLogger.log(
`Error: Could not determine the current Nx version by inspecting: ${nrwlWorkspacePackageJsonPath}`
);
return null;
@ -203,26 +229,34 @@ const handleWorkspaceChanges: SubscribeToWorkspaceChangesCallback = async (
err,
changeEvents
) => {
resetInactivityTimeout(handleInactivityTimeout);
if (!isNxVersionSame(resolveCurrentNxVersion())) {
await handleServerProcessTermination({
server,
watcherSubscription,
reason: '@nrwl/workspace installation changed',
});
if (watcherError) {
serverLogger.watcherLog(
'Skipping handleWorkspaceChanges because of a previously recorded watcher error.'
);
return;
}
if (err || !changeEvents || !changeEvents.length) {
serverLogger.watcherLog('Unexpected Error');
console.error(err);
return;
}
serverLogger.watcherLog(convertChangeEventsToLogMessage(changeEvents));
try {
resetInactivityTimeout(handleInactivityTimeout);
if (!isNxVersionSame(resolveCurrentNxVersion())) {
await handleServerProcessTermination({
server,
watcherSubscription,
reason: '@nrwl/workspace installation changed',
});
return;
}
if (err || !changeEvents || !changeEvents.length) {
serverLogger.watcherLog('Unexpected watcher error', err.message);
console.error(err);
watcherError = err;
return;
}
serverLogger.watcherLog(convertChangeEventsToLogMessage(changeEvents));
const filesToHash = [];
const deletedFiles = [];
for (const event of changeEvents) {
@ -241,8 +275,9 @@ const handleWorkspaceChanges: SubscribeToWorkspaceChangesCallback = async (
}
addUpdatedAndDeletedFiles(filesToHash, deletedFiles);
} catch (err) {
serverLogger.log(`Unexpected Error`);
serverLogger.watcherLog(`Unexpected error`, err.message);
console.error(err);
watcherError = err;
}
};

View File

@ -21,13 +21,18 @@ export async function handleServerProcessTermination({
reason,
watcherSubscription,
}: HandleServerProcessTerminationParams) {
server.close();
if (watcherSubscription) {
await watcherSubscription.unsubscribe();
serverLogger.watcherLog(`Unsubscribed from changes within: ${appRootPath}`);
try {
server.close();
if (watcherSubscription) {
await watcherSubscription.unsubscribe();
serverLogger.watcherLog(
`Unsubscribed from changes within: ${appRootPath}`
);
}
serverLogger.log(`Server stopped because: "${reason}"`);
} finally {
process.exit(0);
}
serverLogger.log(`Server stopped because: "${reason}"`);
process.exit(0);
}
let serverInactivityTimerId: NodeJS.Timeout | undefined;