feat(core): use durations from task history to schedule tasks (#27783)

<!-- Please make sure you have read the submission guidelines before
posting an PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

## Current Behavior
<!-- This is the behavior we have today -->

## Expected Behavior
<!-- This is the behavior we should expect with the changes in this PR
-->

## Related Issue(s)
<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #

Co-authored-by: FrozenPandaz <jasonjean1993@gmail.com>
This commit is contained in:
Emily Xiong 2024-09-12 15:53:37 -04:00 committed by GitHub
parent 0e603af8a9
commit 9c4092de8d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 395 additions and 117 deletions

51
Cargo.lock generated
View File

@ -177,7 +177,7 @@ version = "0.69.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"cexpr",
"clang-sys",
"itertools",
@ -202,9 +202,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.5.0"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
name = "bitvec"
@ -268,9 +268,12 @@ checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
[[package]]
name = "cc"
version = "1.0.90"
version = "1.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5"
checksum = "e9d013ecb737093c0e86b151a7b837993cf9ec6c502946cfb44bedc392421e0b"
dependencies = [
"shlex",
]
[[package]]
name = "cexpr"
@ -369,7 +372,7 @@ version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"crossterm_winapi",
"libc",
"mio",
@ -475,9 +478,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fallible-iterator"
version = "0.2.0"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
@ -710,7 +713,7 @@ version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbd06203b1a9b33a78c88252a625031b094d9e1b647260070c25b09910c0a804"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"bstr",
"gix-path",
"libc",
@ -758,7 +761,7 @@ version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5db19298c5eeea2961e5b3bf190767a2d1f09b8802aeb5f258e42276350aff19"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"bstr",
"gix-features",
"gix-path",
@ -844,7 +847,7 @@ version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fddc27984a643b20dd03e97790555804f98cf07404e0e552c0ad8133266a79a1"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"gix-path",
"libc",
"windows-sys 0.52.0",
@ -914,7 +917,7 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf760ebf69878d9fd8f110c89703d90ce35095324d1f1edcb595c63945ee757"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"ignore",
"walkdir",
]
@ -942,9 +945,9 @@ dependencies = [
[[package]]
name = "hashlink"
version = "0.8.4"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7"
checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
dependencies = [
"hashbrown 0.14.5",
]
@ -1133,9 +1136,9 @@ dependencies = [
[[package]]
name = "libsqlite3-sys"
version = "0.26.0"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afc22eff61b133b115c6e8c74e818c628d6d5e7a502afea6f64dee076dd94326"
checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149"
dependencies = [
"cc",
"pkg-config",
@ -1266,7 +1269,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54a63d0570e4c3e0daf7a8d380563610e159f538e20448d6c911337246f40e84"
dependencies = [
"anyhow",
"bitflags 2.5.0",
"bitflags 2.6.0",
"ctor",
"napi-derive",
"napi-sys",
@ -1353,7 +1356,7 @@ version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"cfg-if",
"libc",
]
@ -1380,7 +1383,7 @@ version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"crossbeam-channel",
"filetime",
"fsevent-sys",
@ -1890,11 +1893,11 @@ dependencies = [
[[package]]
name = "rusqlite"
version = "0.29.0"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "549b9d036d571d42e6e85d1c1425e2ac83491075078ca9a15be021c56b1641f2"
checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
@ -1920,7 +1923,7 @@ version = "0.38.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"errno",
"libc",
"linux-raw-sys",
@ -2236,7 +2239,7 @@ version = "0.107.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6528f3dd33e11eae9d7fe9fee4a79d5bbd211c74426ab2eec64dc82bd2eb74d"
dependencies = [
"bitflags 2.5.0",
"bitflags 2.6.0",
"is-macro",
"num-bigint",
"scoped-tls",

View File

@ -34,7 +34,7 @@ nom = '7.1.3'
regex = "1.9.1"
rayon = "1.7.0"
rkyv = { version = "0.7", features = ["validation"] }
rusqlite = { version = "0.29.0", features = ["bundled", "array", "vtab", "wasm32-wasi-vfs"] }
rusqlite = { version = "0.32.1", features = ["bundled", "array", "vtab", "wasm32-wasi-vfs"] }
thiserror = "1.0.40"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }

View File

@ -28,7 +28,7 @@ import {
DaemonProjectGraphError,
ProjectGraphError,
} from '../../project-graph/error-types';
import { IS_WASM, NxWorkspaceFiles, TaskRun } from '../../native';
import { IS_WASM, NxWorkspaceFiles, TaskRun, TaskTarget } from '../../native';
import { HandleGlobMessage } from '../message-types/glob';
import {
GET_NX_WORKSPACE_FILES,
@ -44,7 +44,9 @@ import {
} from '../message-types/get-files-in-directory';
import { HASH_GLOB, HandleHashGlobMessage } from '../message-types/hash-glob';
import {
GET_ESTIMATED_TASK_TIMINGS,
GET_FLAKY_TASKS,
HandleGetEstimatedTaskTimings,
HandleGetFlakyTasks,
HandleRecordTaskRunsMessage,
RECORD_TASK_RUNS,
@ -357,6 +359,17 @@ export class DaemonClient {
return this.sendToDaemonViaQueue(message);
}
async getEstimatedTaskTimings(
targets: TaskTarget[]
): Promise<Record<string, number>> {
const message: HandleGetEstimatedTaskTimings = {
type: GET_ESTIMATED_TASK_TIMINGS,
targets,
};
return this.sendToDaemonViaQueue(message);
}
recordTaskRuns(taskRuns: TaskRun[]): Promise<void> {
const message: HandleRecordTaskRunsMessage = {
type: RECORD_TASK_RUNS,

View File

@ -1,12 +1,24 @@
import type { TaskRun } from '../../native';
import type { TaskRun, TaskTarget } from '../../native';
export const GET_FLAKY_TASKS = 'GET_FLAKY_TASKS' as const;
export const GET_ESTIMATED_TASK_TIMINGS = 'GET_ESTIMATED_TASK_TIMINGS' as const;
export const RECORD_TASK_RUNS = 'RECORD_TASK_RUNS' as const;
export type HandleGetFlakyTasks = {
type: typeof GET_FLAKY_TASKS;
hashes: string[];
};
export type HandleGetEstimatedTaskTimings = {
type: typeof GET_ESTIMATED_TASK_TIMINGS;
targets: TaskTarget[];
};
export type HandleRecordTaskRunsMessage = {
type: typeof RECORD_TASK_RUNS;
taskRuns: TaskRun[];
};
export function isHandleGetFlakyTasksMessage(
message: unknown
): message is HandleGetFlakyTasks {
@ -18,12 +30,16 @@ export function isHandleGetFlakyTasksMessage(
);
}
export const RECORD_TASK_RUNS = 'RECORD_TASK_RUNS' as const;
export type HandleRecordTaskRunsMessage = {
type: typeof RECORD_TASK_RUNS;
taskRuns: TaskRun[];
};
export function isHandleGetEstimatedTaskTimings(
message: unknown
): message is HandleGetEstimatedTaskTimings {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === GET_ESTIMATED_TASK_TIMINGS
);
}
export function isHandleWriteTaskRunsToHistoryMessage(
message: unknown

View File

@ -1,14 +1,5 @@
import { TaskRun } from '../../native';
import { TaskHistory } from '../../utils/task-history';
let taskHistory: TaskHistory;
function getTaskHistory() {
if (!taskHistory) {
taskHistory = new TaskHistory();
}
return taskHistory;
}
import { getTaskHistory } from '../../utils/task-history';
import type { TaskRun, TaskTarget } from '../../native';
export async function handleRecordTaskRuns(taskRuns: TaskRun[]) {
const taskHistory = getTaskHistory();
@ -27,3 +18,12 @@ export async function handleGetFlakyTasks(hashes: string[]) {
description: 'handleGetFlakyTasks',
};
}
export async function handleGetEstimatedTaskTimings(targets: TaskTarget[]) {
const taskHistory = getTaskHistory();
const history = await taskHistory.getEstimatedTaskTimings(targets);
return {
response: JSON.stringify(history),
description: 'handleGetEstimatedTaskTimings',
};
}

View File

@ -74,12 +74,17 @@ import { handleGetFilesInDirectory } from './handle-get-files-in-directory';
import { HASH_GLOB, isHandleHashGlobMessage } from '../message-types/hash-glob';
import { handleHashGlob } from './handle-hash-glob';
import {
GET_ESTIMATED_TASK_TIMINGS,
GET_FLAKY_TASKS,
isHandleGetEstimatedTaskTimings,
isHandleGetFlakyTasksMessage,
isHandleWriteTaskRunsToHistoryMessage,
RECORD_TASK_RUNS,
} from '../message-types/task-history';
import {
handleRecordTaskRuns,
handleGetFlakyTasks,
handleGetEstimatedTaskTimings,
} from './handle-task-history';
import { isHandleForceShutdownMessage } from '../message-types/force-shutdown';
import { handleForceShutdown } from './handle-force-shutdown';
@ -240,11 +245,15 @@ async function handleMessage(socket, data: string) {
handleHashGlob(payload.globs, payload.exclude)
);
} else if (isHandleGetFlakyTasksMessage(payload)) {
await handleResult(socket, 'GET_TASK_HISTORY_FOR_HASHES', () =>
await handleResult(socket, GET_FLAKY_TASKS, () =>
handleGetFlakyTasks(payload.hashes)
);
} else if (isHandleGetEstimatedTaskTimings(payload)) {
await handleResult(socket, GET_ESTIMATED_TASK_TIMINGS, () =>
handleGetEstimatedTaskTimings(payload.targets)
);
} else if (isHandleWriteTaskRunsToHistoryMessage(payload)) {
await handleResult(socket, 'WRITE_TASK_RUNS_TO_HISTORY', () =>
await handleResult(socket, RECORD_TASK_RUNS, () =>
handleRecordTaskRuns(payload.taskRuns)
);
} else if (isHandleForceShutdownMessage(payload)) {

View File

@ -12,8 +12,6 @@ import { logger } from '../utils/logger';
import { output } from '../utils/output';
import { dirname, join, relative, sep } from 'path';
import * as chalk from 'chalk';
import { gt } from 'semver';
import { nxVersion } from '../utils/versions';
/**
* Options to set when writing a file in the Virtual file system tree.

View File

@ -41,6 +41,7 @@ export declare class NxTaskHistory {
constructor(db: ExternalObject<Connection>)
recordTaskRuns(taskRuns: Array<TaskRun>): void
getFlakyTasks(hashes: Array<string>): Array<string>
getEstimatedTaskTimings(targets: Array<TaskTarget>): Record<string, number>
}
export declare class RustPseudoTerminal {

View File

@ -1,9 +1,12 @@
use std::rc::Rc;
use std::collections::HashMap;
use itertools::Itertools;
use napi::bindgen_prelude::*;
use rusqlite::vtab::array;
use rusqlite::{params, types::Value, Connection};
use crate::native::tasks::types::TaskTarget;
#[napi(object)]
pub struct TaskRun {
pub hash: String,
@ -94,4 +97,40 @@ impl NxTaskHistory {
.map(|r| r.map_err(anyhow::Error::from))
.collect()
}
#[napi]
pub fn get_estimated_task_timings(&self, targets: Vec<TaskTarget>) -> anyhow::Result<HashMap<String, f64>> {
let values = Rc::new(
targets
.iter()
.map(|t| Value::from(
match &t.configuration {
Some(configuration) => format!("{}:{}:{}", t.project, t.target, configuration),
_ => format!("{}:{}", t.project, t.target)
}
))
.collect::<Vec<Value>>(),
);
// for older query sql version, need to select: (project || ':' || target || (CASE WHEN coalesce(configuration, '') <> '' THEN ':' || configuration ELSE '' END)) AS target_string,
self.db
.prepare(
"
SELECT
CONCAT_WS(':', project, target, configuration) AS target_string,
AVG(end - start) AS duration
FROM task_history
JOIN task_details ON task_history.hash = task_details.hash
WHERE target_string in rarray(?1)
GROUP BY target_string
",
)?
.query_map([values], |row| {
let target_string: String = row.get(0)?;
let duration: f64 = row.get(1)?;
Ok((target_string, duration))
})?
.map(|r| r.map_err(anyhow::Error::from))
.collect()
}
}

View File

@ -63,7 +63,7 @@ describe('NxTaskHistory', () => {
hash: '123',
code: 0,
status: 'success',
start: Date.now() - 1000 * 60 * 30,
start: Date.now() - 1000 * 60 * 60,
end: Date.now(),
},
{
@ -82,4 +82,38 @@ describe('NxTaskHistory', () => {
expect(r2).not.toContain('123');
expect(r2).not.toContain('234');
});
it('should get estimated task timings', () => {
taskHistory.recordTaskRuns([
{
hash: '123',
code: 1,
status: 'failure',
start: Date.now() - 1000 * 60 * 60,
end: Date.now(),
},
{
hash: '123',
code: 0,
status: 'success',
start: Date.now() - 1000 * 60 * 60,
end: Date.now(),
},
{
hash: '234',
code: 0,
status: 'success',
start: Date.now() - 1000 * 60 * 60,
end: Date.now(),
},
]);
const r = taskHistory.getEstimatedTaskTimings([
{
project: 'proj',
target: 'build',
configuration: 'production',
},
]);
expect(r['proj:build:production']).toEqual(60 * 60 * 1000);
});
});

View File

@ -1,9 +1,9 @@
import { serializeTarget } from '../../utils/serialize-target';
import { Task } from '../../config/task-graph';
import { output } from '../../utils/output';
import { TaskHistory } from '../../utils/task-history';
import { LifeCycle, TaskResult } from '../life-cycle';
import type { TaskRun as NativeTaskRun } from '../../native';
import { getTaskHistory } from '../../utils/task-history';
interface TaskRun extends NativeTaskRun {
target: Task['target'];
@ -12,7 +12,7 @@ interface TaskRun extends NativeTaskRun {
export class TaskHistoryLifeCycle implements LifeCycle {
private startTimings: Record<string, number> = {};
private taskRuns = new Map<string, TaskRun>();
private taskHistory = new TaskHistory();
private taskHistory = getTaskHistory();
startTasks(tasks: Task[]): void {
for (let task of tasks) {

View File

@ -76,7 +76,10 @@ export class TaskOrchestrator {
async run() {
// Init the ForkedProcessTaskRunner
await this.forkedProcessTaskRunner.init();
await Promise.all([
this.forkedProcessTaskRunner.init(),
this.tasksSchedule.init(),
]);
// initial scheduling
await this.scheduleNextTasks();

View File

@ -4,6 +4,8 @@ import { Task, TaskGraph } from '../config/task-graph';
import { DependencyType, ProjectGraph } from '../config/project-graph';
import * as nxJsonUtils from '../config/nx-json';
import * as executorUtils from '../command-line/run/executor-utils';
import * as taskHistoryUtils from '../utils/task-history';
import type { LifeCycle } from './life-cycle';
function createMockTask(id: string, parallelism: boolean = true): Task {
const [project, target] = id.split(':');
@ -20,14 +22,34 @@ function createMockTask(id: string, parallelism: boolean = true): Task {
}
describe('TasksSchedule', () => {
let taskHistory: any;
let lifeCycle: LifeCycle;
beforeEach(() => {
lifeCycle = {
startTask: jest.fn(),
endTask: jest.fn(),
scheduleTask: jest.fn(),
};
taskHistory = {
getEstimatedTaskTimings: jest.fn(),
getFlakyTasks: jest.fn(),
recordTaskRuns: jest.fn(),
};
jest.spyOn(taskHistoryUtils, 'getTaskHistory').mockReturnValue(taskHistory);
});
afterEach(() => {
jest.resetAllMocks();
});
describe('dependent tasks', () => {
let taskSchedule: TasksSchedule;
let taskGraph: TaskGraph;
let app1Build: Task;
let app2Build: Task;
let lib1Build: Task;
let lifeCycle: any;
beforeEach(() => {
beforeEach(async () => {
app1Build = createMockTask('app1:build');
app2Build = createMockTask('app2:build');
lib1Build = createMockTask('lib1:build');
@ -115,15 +137,11 @@ describe('TasksSchedule', () => {
externalNodes: {},
version: '5',
};
lifeCycle = {
startTask: jest.fn(),
endTask: jest.fn(),
scheduleTask: jest.fn(),
};
taskHistory.getEstimatedTaskTimings.mockReturnValue({});
taskSchedule = new TasksSchedule(projectGraph, taskGraph, {
lifeCycle,
});
await taskSchedule.init();
});
describe('Without Batch Mode', () => {
@ -231,25 +249,38 @@ describe('TasksSchedule', () => {
let taskGraph: TaskGraph;
let app1Test: Task;
let app2Test: Task;
let app3Test: Task;
let app4Test: Task;
let lib1Test: Task;
let lifeCycle: any;
beforeEach(() => {
beforeEach(async () => {
app1Test = createMockTask('app1:test');
app2Test = createMockTask('app2:test');
app3Test = createMockTask('app3:test');
app4Test = createMockTask('app4:test');
lib1Test = createMockTask('lib1:test');
taskGraph = {
tasks: {
'app1:test': app1Test,
'app2:test': app2Test,
'app3:test': app3Test,
'app4:test': app4Test,
'lib1:test': lib1Test,
},
dependencies: {
'app1:test': [],
'app2:test': [],
'app3:test': [],
'app4:test': [],
'lib1:test': [],
},
roots: ['app1:test', 'app2:test', 'lib1:test'],
roots: [
'app1:test',
'app2:test',
'lib1:test',
'app3:test',
'app4:test',
],
};
jest.spyOn(nxJsonUtils, 'readNxJson').mockReturnValue({});
jest.spyOn(executorUtils, 'getExecutorInformation').mockReturnValue({
@ -289,6 +320,30 @@ describe('TasksSchedule', () => {
},
},
},
app3: {
name: 'app3',
type: 'app',
data: {
root: 'app3',
targets: {
test: {
executor: 'awesome-executors:app2-test',
},
},
},
},
app4: {
name: 'app4',
type: 'app',
data: {
root: 'app4',
targets: {
test: {
executor: 'awesome-executors:app2-test',
},
},
},
},
lib1: {
name: 'lib1',
type: 'lib',
@ -321,12 +376,6 @@ describe('TasksSchedule', () => {
externalNodes: {},
version: '5',
};
lifeCycle = {
startTask: jest.fn(),
endTask: jest.fn(),
scheduleTask: jest.fn(),
};
taskSchedule = new TasksSchedule(projectGraph, taskGraph, {
lifeCycle,
});
@ -343,34 +392,93 @@ describe('TasksSchedule', () => {
process.env['NX_BATCH_MODE'] = original;
});
it('should begin with no scheduled tasks', () => {
expect(taskSchedule.nextBatch()).toBeNull();
expect(taskSchedule.nextTask()).toBeNull();
describe('when all tasks have same historical runtime', () => {
beforeEach(async () => {
taskHistory.getEstimatedTaskTimings.mockReturnValue({
'app1:test': 100,
'app2:test': 100,
'app3:test': 100,
'app4:test': 100,
'lib1:test': 100,
});
await taskSchedule.init();
});
it('should begin with no scheduled tasks', () => {
expect(taskSchedule.nextBatch()).toBeNull();
expect(taskSchedule.nextTask()).toBeNull();
});
it('should schedule root tasks in topological order', async () => {
await taskSchedule.scheduleNextTasks();
expect(taskSchedule.nextTask()).toEqual(lib1Test);
expect(taskSchedule.nextTask()).toEqual(app1Test);
expect(taskSchedule.nextTask()).toEqual(app2Test);
expect(taskSchedule.nextTask()).toEqual(app3Test);
expect(taskSchedule.nextTask()).toEqual(app4Test);
});
it('should run out of tasks when they are all complete', async () => {
await taskSchedule.scheduleNextTasks();
taskSchedule.nextTask();
taskSchedule.nextTask();
taskSchedule.nextTask();
taskSchedule.nextTask();
taskSchedule.nextTask();
taskSchedule.complete([
lib1Test.id,
app1Test.id,
app2Test.id,
app3Test.id,
app4Test.id,
]);
expect(taskSchedule.hasTasks()).toEqual(false);
});
it('should not schedule batches', async () => {
await taskSchedule.scheduleNextTasks();
expect(taskSchedule.nextTask()).not.toBeNull();
expect(taskSchedule.nextBatch()).toBeNull();
});
});
it('should schedule root tasks in topological order', async () => {
await taskSchedule.scheduleNextTasks();
expect(taskSchedule.nextTask()).toEqual(lib1Test);
expect(taskSchedule.nextTask()).toEqual(app1Test);
expect(taskSchedule.nextTask()).toEqual(app2Test);
});
describe('when all tasks have different historical runtime', () => {
it('should schedule task with longer runtime first', async () => {
taskHistory.getEstimatedTaskTimings.mockReturnValue({
'app1:test': 200,
'app2:test': 300,
'app3:test': 400,
'app4:test': 500,
'lib1:test': 100,
});
await taskSchedule.init();
it('should run out of tasks when they are all complete', async () => {
await taskSchedule.scheduleNextTasks();
taskSchedule.nextTask();
taskSchedule.nextTask();
taskSchedule.nextTask();
taskSchedule.complete([lib1Test.id, app1Test.id, app2Test.id]);
await taskSchedule.scheduleNextTasks();
expect(taskSchedule.nextTask()).toEqual(lib1Test); // lib1 should run first because app1 and app2 depend on it
expect(taskSchedule.nextTask()).toEqual(app4Test); // app4 should run first because it has the longest runtime
expect(taskSchedule.nextTask()).toEqual(app3Test);
expect(taskSchedule.nextTask()).toEqual(app2Test);
expect(taskSchedule.nextTask()).toEqual(app1Test);
});
expect(taskSchedule.hasTasks()).toEqual(false);
});
it('should schedule task with no historial runtime first', async () => {
taskHistory.getEstimatedTaskTimings.mockReturnValue({
'app1:test': 200,
'app4:test': 500,
'lib1:test': 100,
});
await taskSchedule.init();
it('should not schedule batches', async () => {
await taskSchedule.scheduleNextTasks();
expect(taskSchedule.nextTask()).not.toBeNull();
expect(taskSchedule.nextBatch()).toBeNull();
await taskSchedule.scheduleNextTasks();
expect(taskSchedule.nextTask()).toEqual(lib1Test); // lib1 should run first because app1 and app2 depend on it
expect(taskSchedule.nextTask()).toEqual(app2Test); // app2 should run because it has no historical runtime
expect(taskSchedule.nextTask()).toEqual(app3Test); // app3 should run because it has no historical runtime
expect(taskSchedule.nextTask()).toEqual(app4Test); // app4 should run because it has the longest runtime
expect(taskSchedule.nextTask()).toEqual(app1Test); // app1 should run last because it has the shortest runtime
});
});
});
@ -392,7 +500,11 @@ describe('TasksSchedule', () => {
expect(taskSchedule.nextBatch()).toEqual({
executorName: 'awesome-executors:test',
taskGraph: removeTasksFromTaskGraph(taskGraph, ['app2:test']),
taskGraph: removeTasksFromTaskGraph(taskGraph, [
'app2:test',
'app3:test',
'app4:test',
]),
});
expect(taskSchedule.nextBatch()).toEqual({
executorName: 'awesome-executors:app2-test',
@ -420,8 +532,7 @@ describe('TasksSchedule', () => {
let app1Build: Task;
let app2Build: Task;
let lib1Build: Task;
let lifeCycle: any;
beforeEach(() => {
beforeEach(async () => {
// app1 depends on lib1
// app2 does not depend on anything
// lib1 does not depend on anything
@ -513,15 +624,11 @@ describe('TasksSchedule', () => {
externalNodes: {},
version: '5',
};
lifeCycle = {
startTask: jest.fn(),
endTask: jest.fn(),
scheduleTask: jest.fn(),
};
taskHistory.getEstimatedTaskTimings.mockReturnValue({});
taskSchedule = new TasksSchedule(projectGraph, taskGraph, {
lifeCycle,
});
await taskSchedule.init();
});
describe('Without Batch Mode', () => {
@ -594,8 +701,7 @@ describe('TasksSchedule', () => {
let app1Test: Task;
let app2Test: Task;
let lib1Test: Task;
let lifeCycle: any;
beforeEach(() => {
beforeEach(async () => {
// app1, app2, and lib1 do not depend on each other
// all tasks have parallelism set to false
app1Test = createMockTask('app1:test', false);
@ -687,15 +793,11 @@ describe('TasksSchedule', () => {
externalNodes: {},
version: '5',
};
lifeCycle = {
startTask: jest.fn(),
endTask: jest.fn(),
scheduleTask: jest.fn(),
};
taskHistory.getEstimatedTaskTimings.mockReturnValue({});
taskSchedule = new TasksSchedule(projectGraph, taskGraph, {
lifeCycle,
});
await taskSchedule.init();
});
describe('Without Batch Mode', () => {

View File

@ -9,6 +9,7 @@ import { Task, TaskGraph } from '../config/task-graph';
import { ProjectGraph } from '../config/project-graph';
import { findAllProjectNodeDependencies } from '../utils/project-graph-utils';
import { reverse } from '../project-graph/operators';
import { TaskHistory, getTaskHistory } from '../utils/task-history';
export interface Batch {
executorName: string;
@ -19,11 +20,14 @@ export class TasksSchedule {
private notScheduledTaskGraph = this.taskGraph;
private reverseTaskDeps = calculateReverseDeps(this.taskGraph);
private reverseProjectGraph = reverse(this.projectGraph);
private taskHistory: TaskHistory = getTaskHistory();
private scheduledBatches: Batch[] = [];
private scheduledTasks: string[] = [];
private runningTasks = new Set<string>();
private completedTasks = new Set<string>();
private scheduleRequestsExecutionChain = Promise.resolve();
private estimatedTaskTimings: Record<string, number>;
constructor(
private readonly projectGraph: ProjectGraph,
@ -31,6 +35,12 @@ export class TasksSchedule {
private readonly options: DefaultTasksRunnerOptions
) {}
public async init() {
this.estimatedTaskTimings = await this.taskHistory.getEstimatedTaskTimings(
Object.values(this.taskGraph.tasks).map((t) => t.target)
);
}
public async scheduleNextTasks() {
this.scheduleRequestsExecutionChain =
this.scheduleRequestsExecutionChain.then(() => this.scheduleTasks());
@ -112,12 +122,35 @@ export class TasksSchedule {
const project1 = this.taskGraph.tasks[taskId1].target.project;
const project2 = this.taskGraph.tasks[taskId2].target.project;
return (
findAllProjectNodeDependencies(project2, this.reverseProjectGraph)
.length -
findAllProjectNodeDependencies(project1, this.reverseProjectGraph)
.length
);
const project1NodeDependencies = findAllProjectNodeDependencies(
project1,
this.reverseProjectGraph
).length;
const project2NodeDependencies = findAllProjectNodeDependencies(
project2,
this.reverseProjectGraph
).length;
const dependenciesDiff =
project2NodeDependencies - project1NodeDependencies;
if (dependenciesDiff !== 0) {
return dependenciesDiff;
}
const task1Timing: number | undefined =
this.estimatedTaskTimings[taskId1];
if (!task1Timing) {
// if no timing or 0, put task1 at beginning
return -1;
}
const task2Timing: number | undefined =
this.estimatedTaskTimings[taskId2];
if (!task2Timing) {
// if no timing or 0, put task2 at beginning
return 1;
}
return task2Timing - task1Timing;
});
this.runningTasks.add(taskId);
}

View File

@ -1,11 +1,25 @@
import { daemonClient } from '../daemon/client/client';
import { isOnDaemon } from '../daemon/is-on-daemon';
import { NxTaskHistory, TaskRun } from '../native';
import { NxTaskHistory, TaskRun, TaskTarget } from '../native';
import { getDbConnection } from './db-connection';
export class TaskHistory {
taskHistory = new NxTaskHistory(getDbConnection());
/**
* This function returns estimated timings per task
* @param targets
* @returns a map where key is task id (project:target:configuration), value is average time of historical runs
*/
async getEstimatedTaskTimings(
targets: TaskTarget[]
): Promise<Record<string, number>> {
if (isOnDaemon() || !daemonClient.enabled()) {
return this.taskHistory.getEstimatedTaskTimings(targets);
}
return await daemonClient.getEstimatedTaskTimings(targets);
}
async getFlakyTasks(hashes: string[]) {
if (isOnDaemon() || !daemonClient.enabled()) {
return this.taskHistory.getFlakyTasks(hashes);
@ -20,3 +34,16 @@ export class TaskHistory {
return daemonClient.recordTaskRuns(taskRuns);
}
}
let taskHistory: TaskHistory;
/**
* This function returns the singleton instance of TaskHistory
* @returns singleton instance of TaskHistory
*/
export function getTaskHistory(): TaskHistory {
if (!taskHistory) {
taskHistory = new TaskHistory();
}
return taskHistory;
}