feat: enhance workflow and testing infrastructure

- Introduced ExecutionMode type for project execution modes (parallel, sequential).
- Updated SetupOpts to use ExecutionMode instead of string literals.
- Enhanced workflow states to include a new "In Review" state with appropriate transitions.
- Implemented TestHarness for end-to-end testing, including command interception and workspace setup.
- Created TestProvider for in-memory issue tracking during tests.
- Refactored project registration and setup tools to utilize ExecutionMode.
- Updated various tools to ensure compatibility with new workflow and execution modes.
- Added new dependencies: cockatiel for resilience and zod for schema validation.
This commit is contained in:
Lauren ten Hoor
2026-02-16 13:27:14 +08:00
parent a359ffed34
commit 371e760d94
37 changed files with 2444 additions and 263 deletions

View File

@@ -18,7 +18,10 @@ import { log as auditLog } from "../audit.js";
import { DATA_DIR } from "../setup/migrate-layout.js";
import { checkWorkerHealth, scanOrphanedLabels, fetchGatewaySessions, type SessionLookup } from "./health.js";
import { projectTick } from "./tick.js";
import { reviewPass } from "./review.js";
import { createProvider } from "../providers/index.js";
import { loadConfig } from "../config/index.js";
import { ExecutionMode } from "../workflow.js";
// ---------------------------------------------------------------------------
// Types
@@ -39,6 +42,7 @@ type TickResult = {
totalPickups: number;
totalHealthFixes: number;
totalSkipped: number;
totalReviewTransitions: number;
};
type ServiceContext = {
@@ -191,6 +195,7 @@ async function processAllAgents(
totalPickups: 0,
totalHealthFixes: 0,
totalSkipped: 0,
totalReviewTransitions: 0,
};
// Fetch gateway sessions once for all agents/projects
@@ -209,6 +214,7 @@ async function processAllAgents(
result.totalPickups += agentResult.totalPickups;
result.totalHealthFixes += agentResult.totalHealthFixes;
result.totalSkipped += agentResult.totalSkipped;
result.totalReviewTransitions += agentResult.totalReviewTransitions;
}
return result;
@@ -218,9 +224,9 @@ async function processAllAgents(
* Log tick results if anything happened.
*/
function logTickResult(result: TickResult, logger: ServiceContext["logger"]): void {
if (result.totalPickups > 0 || result.totalHealthFixes > 0) {
if (result.totalPickups > 0 || result.totalHealthFixes > 0 || result.totalReviewTransitions > 0) {
logger.info(
`work_heartbeat tick: ${result.totalPickups} pickups, ${result.totalHealthFixes} health fixes, ${result.totalSkipped} skipped`,
`work_heartbeat tick: ${result.totalPickups} pickups, ${result.totalHealthFixes} health fixes, ${result.totalReviewTransitions} review transitions, ${result.totalSkipped} skipped`,
);
}
}
@@ -243,60 +249,83 @@ export async function tick(opts: {
const projectIds = Object.keys(data.projects);
if (projectIds.length === 0) {
return { totalPickups: 0, totalHealthFixes: 0, totalSkipped: 0 };
return { totalPickups: 0, totalHealthFixes: 0, totalSkipped: 0, totalReviewTransitions: 0 };
}
const result: TickResult = {
totalPickups: 0,
totalHealthFixes: 0,
totalSkipped: 0,
totalReviewTransitions: 0,
};
const projectExecution = (pluginConfig?.projectExecution as string) ?? "parallel";
const projectExecution = (pluginConfig?.projectExecution as string) ?? ExecutionMode.PARALLEL;
let activeProjects = 0;
for (const groupId of projectIds) {
const project = data.projects[groupId];
if (!project) continue;
try {
const project = data.projects[groupId];
if (!project) continue;
// Health pass: auto-fix zombies and stale workers
result.totalHealthFixes += await performHealthPass(
workspaceDir,
groupId,
project,
sessions,
);
const { provider } = await createProvider({ repo: project.repo, provider: project.provider });
const resolvedConfig = await loadConfig(workspaceDir, project.name);
// Budget check: stop if we've hit the limit
const remaining = config.maxPickupsPerTick - result.totalPickups;
if (remaining <= 0) break;
// Health pass: auto-fix zombies and stale workers
result.totalHealthFixes += await performHealthPass(
workspaceDir,
groupId,
project,
sessions,
provider,
resolvedConfig.timeouts.staleWorkerHours,
);
// Sequential project guard: don't start new projects if one is active
const isProjectActive = await checkProjectActive(workspaceDir, groupId);
if (projectExecution === "sequential" && !isProjectActive && activeProjects >= 1) {
// Review pass: transition issues whose PR check condition is met
result.totalReviewTransitions += await reviewPass({
workspaceDir,
groupId,
workflow: resolvedConfig.workflow,
provider,
repoPath: project.repo,
gitPullTimeoutMs: resolvedConfig.timeouts.gitPullMs,
});
// Budget check: stop if we've hit the limit
const remaining = config.maxPickupsPerTick - result.totalPickups;
if (remaining <= 0) break;
// Sequential project guard: don't start new projects if one is active
const isProjectActive = await checkProjectActive(workspaceDir, groupId);
if (projectExecution === ExecutionMode.SEQUENTIAL && !isProjectActive && activeProjects >= 1) {
result.totalSkipped++;
continue;
}
// Tick pass: fill free worker slots
const tickResult = await projectTick({
workspaceDir,
groupId,
agentId,
pluginConfig,
maxPickups: remaining,
});
result.totalPickups += tickResult.pickups.length;
result.totalSkipped += tickResult.skipped.length;
// Notifications now handled by dispatchTask
if (isProjectActive || tickResult.pickups.length > 0) activeProjects++;
} catch (err) {
// Per-project isolation: one failing project doesn't crash the entire tick
opts.logger.warn(`Heartbeat tick failed for project ${groupId}: ${(err as Error).message}`);
result.totalSkipped++;
continue;
}
// Tick pass: fill free worker slots
const tickResult = await projectTick({
workspaceDir,
groupId,
agentId,
pluginConfig,
maxPickups: remaining,
});
result.totalPickups += tickResult.pickups.length;
result.totalSkipped += tickResult.skipped.length;
// Notifications now handled by dispatchTask
if (isProjectActive || tickResult.pickups.length > 0) activeProjects++;
}
await auditLog(workspaceDir, "heartbeat_tick", {
projectsScanned: projectIds.length,
healthFixes: result.totalHealthFixes,
reviewTransitions: result.totalReviewTransitions,
pickups: result.totalPickups,
skipped: result.totalSkipped,
});
@@ -312,8 +341,9 @@ async function performHealthPass(
groupId: string,
project: any,
sessions: SessionLookup | null,
provider: import("../providers/provider.js").IssueProvider,
staleWorkerHours?: number,
): Promise<number> {
const { provider } = await createProvider({ repo: project.repo, provider: project.provider });
let fixedCount = 0;
for (const role of Object.keys(project.workers)) {
@@ -326,6 +356,7 @@ async function performHealthPass(
sessions,
autoFix: true,
provider,
staleWorkerHours,
});
fixedCount += healthFixes.filter((f) => f.fixed).length;