fix(core): properly hash tasks that depend on outputs of other tasks (#17512)
parent 9fa9fda61d
commit 83895e4573
Cargo.lock (generated)
@@ -1090,6 +1090,7 @@ dependencies = [
  "anyhow",
  "assert_fs",
  "crossbeam-channel",
+ "globset",
  "ignore",
  "ignore-files",
  "itertools",
Cargo.toml
@@ -10,6 +10,7 @@ napi-derive = '2.9.3'
 ignore = '0.4'
 crossbeam-channel = '0.5'

+globset = "0.4.10"
 ignore-files = "1.3.0"
 watchexec = "2.3.0"
 watchexec-filterer-ignore = "1.2.1"
packages/nx/src/hasher/file-hasher.ts
@@ -28,11 +28,10 @@ export class FileHasher {
     return hashFile(path).hash;
   }

-  hashFolder(path: string): Map<string, string> {
+  hashFilesMatchingGlobs(path: string, globs: string[]): string {
     // Import as needed. There is also an issue running unit tests in Nx repo if this is a top-level import.
-    const { hashFiles } = require('../native');
-    const filesObject = hashFiles(path) as Record<string, string>;
-    return new Map(Object.entries(filesObject));
+    const { hashFilesMatchingGlobs } = require('../native');
+    return hashFilesMatchingGlobs(path, globs);
   }

   clear(): void {
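The wrapper now returns one combined hash string instead of the per-file Map that hashFolder produced. A minimal usage sketch (the instance, directory, and glob below are illustrative, not from the commit):

import { FileHasher } from './file-hasher'; // assumed import path

declare const fileHasher: FileHasher; // however the instance is obtained

// One combined hash for every .d.ts file under a build output directory;
// the native side does the walking, matching, and hashing in one call.
const combined: string = fileHasher.hashFilesMatchingGlobs('dist/libs/child', [
  '**/*.d.ts',
]);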
packages/nx/src/hasher/hash-task.ts
@@ -1,15 +1,17 @@
 import { Task, TaskGraph } from '../config/task-graph';
 import { getCustomHasher } from '../tasks-runner/utils';
 import { readProjectsConfigurationFromProjectGraph } from '../project-graph/project-graph';
-import { TaskHasher } from './task-hasher';
+import { getInputs, TaskHasher } from './task-hasher';
 import { ProjectGraph } from '../config/project-graph';
 import { Workspaces } from '../config/workspaces';
+import { NxJsonConfiguration } from '../config/nx-json';

-export async function hashTasksThatDoNotDependOnOtherTasks(
+export async function hashTasksThatDoNotDependOnOutputsOfOtherTasks(
   workspaces: Workspaces,
   hasher: TaskHasher,
   projectGraph: ProjectGraph,
-  taskGraph: TaskGraph
+  taskGraph: TaskGraph,
+  nxJson: NxJsonConfiguration
 ) {
   const tasks = Object.values(taskGraph.tasks);
   const tasksWithHashers = await Promise.all(
@@ -25,7 +27,17 @@ export async function hashTasksThatDoNotDependOnOutputsOfOtherTasks(
   );

   const tasksToHash = tasksWithHashers
-    .filter((t) => !t.customHasher)
+    .filter(({ task, customHasher }) => {
+      // If a task has a custom hasher, it might depend on the outputs of other tasks
+      if (customHasher) {
+        return false;
+      }
+
+      return (
+        taskGraph.dependencies[task.id].length === 0 &&
+        !!getInputs(task, projectGraph, nxJson).depsOutputs
+      );
+    })
     .map((t) => t.task);

   const hashes = await hasher.hashTasks(tasksToHash);
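The intent of the new filter: a task may be hashed before anything executes only when no other task's outputs can flow into its inputs. A simplified standalone sketch of that predicate (types reduced for illustration, not the commit's exact shapes):

function isSafeToHashEagerly(
  taskId: string,
  dependencies: Record<string, string[]>, // taskGraph.dependencies
  hasCustomHasher: boolean
): boolean {
  // A custom hasher might read the outputs of other tasks, so never pre-hash.
  if (hasCustomHasher) {
    return false;
  }
  // With no upstream tasks in the graph, no task outputs can affect this hash.
  return dependencies[taskId].length === 0;
}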
packages/nx/src/hasher/task-hasher.spec.ts
@@ -1499,10 +1499,19 @@ describe('TaskHasher', () => {
       } as any,
       {},
       {
-        // allFileData: () => allWorkspaceFiles,
-        hashFolder: (path: string) => {
-          const files = distFolder.filter((f) => f[0].startsWith(path));
-          return new Map(files.map((f) => [f[0], f[1]]));
+        hashFilesMatchingGlobs: (path: string, globs: string[]) => {
+          const hashes = [];
+          for (const [file, hash] of distFolder) {
+            if (!file.startsWith(path)) {
+              continue;
+            }
+            for (const glob of globs) {
+              if (file.endsWith(glob.split('**/*')[1])) {
+                hashes.push(hash);
+              }
+            }
+          }
+          return hashes.join('|');
         },
       } as any
     );
@@ -1519,8 +1528,8 @@ describe('TaskHasher', () => {
       expect(hash.value).toContain('c.d.ts.hash');

       assertFilesets(hash, {
-        'child:output': { contains: '**/*.d.ts' },
-        'grandchild:output': { contains: '**/*.d.ts' },
+        'dist/libs/child/**/*.d.ts': { contains: 'b.d.ts.hash' },
+        'dist/libs/grandchild/**/*.d.ts': { contains: 'c.d.ts.hash' },
       });
     });
   });
packages/nx/src/hasher/task-hasher.ts
@@ -16,7 +16,6 @@ import { createProjectRootMappings } from '../project-graph/utils/find-project-f
 import { findMatchingProjects } from '../utils/find-matching-projects';
 import { FileHasher, hashArray } from './file-hasher';
 import { getOutputsForTargetAndConfiguration } from '../tasks-runner/utils';
-import { workspaceRoot } from '../devkit-exports';
 import { join } from 'path';

 type ExpandedSelfInput =
@@ -203,19 +202,11 @@ class TaskHasherImpl {

   async hashTask(task: Task, visited: string[]): Promise<PartialHash> {
     return Promise.resolve().then(async () => {
-      const projectNode = this.projectGraph.nodes[task.target.project];
-      const namedInputs = getNamedInputs(this.nxJson, projectNode);
-      const targetData = projectNode.data.targets[task.target.target];
-      const targetDefaults = (this.nxJson.targetDefaults || {})[
-        task.target.target
-      ];
-      const { selfInputs, depsInputs, depsOutputs, projectInputs } =
-        splitInputsIntoSelfAndDependencies(
-          targetData.inputs ||
-            targetDefaults?.inputs ||
-            (DEFAULT_INPUTS as any),
-          namedInputs
-        );
+      const { selfInputs, depsInputs, depsOutputs, projectInputs } = getInputs(
+        task,
+        this.projectGraph,
+        this.nxJson
+      );

       const selfAndInputs = await this.hashSelfAndDepsInputs(
         task.target.project,
@@ -391,24 +382,18 @@ class TaskHasherImpl {
         childTask,
         this.projectGraph.nodes[childTask.target.project]
       );
-      const files: FileData[] = [];
-      const patterns: string[] = [];
+      const hashes = {};
       for (const outputDir of outputDirs) {
-        const fileHashes = this.fileHasher.hashFolder(outputDir);
-        for (const [file, hash] of fileHashes) {
-          files.push({ file, hash });
-        }
-        patterns.push(join(outputDir, dependentTasksOutputFiles));
+        hashes[join(outputDir, dependentTasksOutputFiles)] =
+          this.fileHasher.hashFilesMatchingGlobs(outputDir, [
+            dependentTasksOutputFiles,
+          ]);
       }
-      const filtered = filterUsingGlobPatterns(workspaceRoot, files, patterns);
-      if (filtered.length > 0) {
-        partialHashes.push({
-          value: hashArray(filtered.map((f) => f.hash)),
-          details: {
-            [`${childTask.target.project}:output`]: dependentTasksOutputFiles,
-          },
-        });
-      }
+
+      partialHashes.push({
+        value: hashArray(Object.values(hashes)),
+        details: hashes,
+      });
       if (transitive) {
         partialHashes.push(
           ...this.hashDepOuputs(
@@ -761,7 +746,24 @@ export function extractPatternsFromFileSets(
     .map((c) => c['fileset']);
 }

-export function splitInputsIntoSelfAndDependencies(
+export function getInputs(
+  task: Task,
+  projectGraph: ProjectGraph,
+  nxJson: NxJsonConfiguration
+) {
+  const projectNode = projectGraph.nodes[task.target.project];
+  const namedInputs = getNamedInputs(nxJson, projectNode);
+  const targetData = projectNode.data.targets[task.target.target];
+  const targetDefaults = (nxJson.targetDefaults || {})[task.target.target];
+  const { selfInputs, depsInputs, depsOutputs, projectInputs } =
+    splitInputsIntoSelfAndDependencies(
+      targetData.inputs || targetDefaults?.inputs || (DEFAULT_INPUTS as any),
+      namedInputs
+    );
+  return { selfInputs, depsInputs, depsOutputs, projectInputs };
+}
+
+function splitInputsIntoSelfAndDependencies(
   inputs: ReadonlyArray<InputDefinition | string>,
   namedInputs: { [inputName: string]: ReadonlyArray<InputDefinition | string> }
 ): {
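getInputs now owns the input-resolution precedence that hashTask previously inlined. A minimal sketch of that precedence (simplified types; the DEFAULT_INPUTS value is a placeholder, not the commit's actual constant):

type RawInputs = ReadonlyArray<string>;
const DEFAULT_INPUTS: RawInputs = ['default', '^default']; // placeholder value

function pickInputs(
  targetInputs: RawInputs | undefined,
  targetDefaultInputs: RawInputs | undefined
): RawInputs {
  // Explicit target inputs win, then nx.json targetDefaults, then built-ins.
  return targetInputs || targetDefaultInputs || DEFAULT_INPUTS;
}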
packages/nx/src/native/index.d.ts (vendored)
@@ -10,6 +10,7 @@ export interface FileData {
 export function hashArray(input: Array<string>): string
 export function hashFile(file: string): FileData | null
 export function hashFiles(workspaceRoot: string): Record<string, string>
+export function hashFilesMatchingGlobs(directory: string, globPatterns: Array<string>): string | null
 /**
  * Newly created files will have the `update` EventType as well.
  * This simplifies logic between OS's, IDEs and git operations
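Note the nullable return: when nothing matches (for example, an output directory that does not exist yet), the Rust side returns Ok(None), which surfaces in JS as null. A hedged handling sketch (the require path and arguments are illustrative):

const { hashFilesMatchingGlobs } = require('../native'); // assumed binding path

const result: string | null = hashFilesMatchingGlobs('dist/libs/child', ['**/*.d.ts']);
if (result === null) {
  // No files matched: contribute nothing to the task hash rather than failing.
}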
packages/nx/src/native/index.js
@@ -246,10 +246,11 @@ if (!nativeBinding) {
   throw new Error(`Failed to load native binding`)
 }

-const { hashArray, hashFile, hashFiles, EventType, Watcher } = nativeBinding
+const { hashArray, hashFile, hashFiles, hashFilesMatchingGlobs, EventType, Watcher } = nativeBinding

 module.exports.hashArray = hashArray
 module.exports.hashFile = hashFile
 module.exports.hashFiles = hashFiles
+module.exports.hashFilesMatchingGlobs = hashFilesMatchingGlobs
 module.exports.EventType = EventType
 module.exports.Watcher = Watcher
packages/nx/src/native/hasher.rs
@@ -1,7 +1,10 @@
 #![allow(unused)]

+use anyhow::anyhow;
 use crossbeam_channel::unbounded;
+use globset::{Glob, GlobSetBuilder};
 use ignore::WalkBuilder;
+use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::path::Path;
 use std::thread::{self, available_parallelism};
@@ -15,6 +18,26 @@ pub struct FileData {
     pub hash: String,
 }

+impl Eq for FileData {}
+
+impl PartialEq<Self> for FileData {
+    fn eq(&self, other: &Self) -> bool {
+        self.file.eq(&other.file)
+    }
+}
+
+impl PartialOrd<Self> for FileData {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        self.file.partial_cmp(&other.file)
+    }
+}
+
+impl Ord for FileData {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.file.cmp(&other.file)
+    }
+}
+
 #[napi]
 fn hash_array(input: Vec<String>) -> String {
     let joined = input.join(",");
@@ -97,6 +120,87 @@ fn hash_files(workspace_root: String) -> HashMap<String, String> {
     receiver_thread.join().unwrap()
 }

+#[napi]
+fn hash_files_matching_globs(
+    directory: String,
+    glob_patterns: Vec<String>,
+) -> anyhow::Result<Option<String>> {
+    let mut globset_builder = GlobSetBuilder::new();
+
+    for pattern in glob_patterns {
+        globset_builder.add(Glob::new(&pattern).map_err(|_| anyhow!("Invalid Glob {pattern}"))?);
+    }
+    let globset = globset_builder
+        .build()
+        .map_err(|_| anyhow!("Error building globset builder"))?;
+
+    let cpus = available_parallelism().map_or(2, |n| n.get()) - 1;
+
+    let mut walker = WalkBuilder::new(&directory);
+    walker.hidden(false);
+
+    let (sender, receiver) = unbounded::<(String, Vec<u8>)>();
+
+    let receiver_thread = thread::spawn(move || {
+        let mut collection: Vec<FileData> = Vec::new();
+        for (path, content) in receiver {
+            if globset.is_match(&path) {
+                collection.push(FileData {
+                    file: path,
+                    hash: xxh3::xxh3_64(&content).to_string(),
+                });
+            }
+        }
+        collection
+    });
+
+    walker.threads(cpus).build_parallel().run(|| {
+        let tx = sender.clone();
+        let directory = directory.clone();
+        Box::new(move |entry| {
+            use ignore::WalkState::*;
+
+            #[rustfmt::skip]
+            let Ok(dir_entry) = entry else {
+                return Continue;
+            };
+
+            let Ok(content) = std::fs::read(dir_entry.path()) else {
+                return Continue;
+            };
+
+            let Ok(file_path) = dir_entry.path().strip_prefix(&directory) else {
+                return Continue;
+            };
+
+            let Some(file_path) = file_path.to_str() else {
+                return Continue;
+            };
+
+            // convert back-slashes in Windows paths, since the js expects only forward-slash path separators
+            #[cfg(target_os = "windows")]
+            let file_path = file_path.replace('\\', "/");
+
+            tx.send((file_path.to_string(), content)).ok();
+
+            Continue
+        })
+    });
+    drop(sender);
+
+    let mut hashes = receiver_thread.join().unwrap();
+    if hashes.is_empty() {
+        return Ok(None);
+    }
+
+    // Sort the file data so that it's deterministically ordered by file path
+    hashes.sort();
+
+    let sorted_file_hashes: Vec<String> =
+        hashes.into_iter().map(|file_data| file_data.hash).collect();
+    Ok(Some(hash_array(sorted_file_hashes)))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
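Determinism of the combined hash hinges on the sort: the parallel walker delivers files in nondeterministic order, and FileData's Ord implementation (ordering by path) lets hashes.sort() normalize it before combining. A TypeScript analog of that combine step, illustrative only (Node's built-in crypto stands in for the xxh3_64 hashing used on the Rust side):

import { createHash } from 'crypto';

function combineFileHashes(
  files: Array<{ file: string; hash: string }>
): string | null {
  if (files.length === 0) return null; // mirrors Ok(None) for empty results
  // Sort by file path so the result is independent of walk order.
  const sorted = [...files].sort((a, b) =>
    a.file < b.file ? -1 : a.file > b.file ? 1 : 0
  );
  // Like hash_array: join the per-file hashes with ',' and hash the string.
  return createHash('sha256')
    .update(sorted.map((f) => f.hash).join(','))
    .digest('hex');
}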
@@ -157,6 +261,25 @@ mod tests {
         );
     }

+    #[test]
+    fn it_hashes_files_matching_globs() -> anyhow::Result<()> {
+        // handle empty workspaces
+        let content =
+            hash_files_matching_globs("/does/not/exist".into(), Vec::from([String::from("**/*")]))?;
+        assert!(content.is_none());
+
+        let temp_dir = setup_fs();
+
+        let content = hash_files_matching_globs(
+            temp_dir.display().to_string(),
+            Vec::from([String::from("fo*.txt")]),
+        )?;
+        // println!("{:?}", content);
+        assert_eq!(content.unwrap(), String::from("12742692716897613184"),);
+
+        Ok(())
+    }
+
     #[test]
     fn handles_nx_ignore() {
         let temp_dir = setup_fs();
packages/nx/src/tasks-runner/run-command.ts
@@ -29,7 +29,7 @@ import {
   DaemonBasedTaskHasher,
   InProcessTaskHasher,
 } from '../hasher/task-hasher';
-import { hashTasksThatDoNotDependOnOtherTasks } from '../hasher/hash-task';
+import { hashTasksThatDoNotDependOnOutputsOfOtherTasks } from '../hasher/hash-task';
 import { daemonClient } from '../daemon/client/client';
 import { StoreRunInformationLifeCycle } from './life-cycles/store-run-information-life-cycle';
 import { fileHasher } from '../hasher/file-hasher';
@@ -253,11 +253,12 @@ export async function invokeTasksRunner({
   // to submit everything that is known in advance to Nx Cloud to run in
   // a distributed fashion
   performance.mark('hashing:start');
-  await hashTasksThatDoNotDependOnOtherTasks(
+  await hashTasksThatDoNotDependOnOutputsOfOtherTasks(
     new Workspaces(workspaceRoot),
     hasher,
     projectGraph,
-    taskGraph
+    taskGraph,
+    nxJson
   );
   performance.mark('hashing:end');
   performance.measure('hashing', 'hashing:start', 'hashing:end');