From f2e71a35d83045c4ec41ff9d2a68f3516a5c7abc Mon Sep 17 00:00:00 2001 From: Lauren ten Hoor Date: Wed, 11 Feb 2026 01:04:30 +0800 Subject: [PATCH] feat: implement work heartbeat service for health checks and task dispatching - Introduced a new heartbeat service that runs at defined intervals to perform health checks on workers and fill available task slots based on priority. - Added a health tool to scan worker health across projects with optional auto-fix capabilities. - Updated the status tool to provide a lightweight overview of worker states and queue counts without health checks. - Enhanced task creation tool descriptions to clarify task state handling. - Implemented tests for the work heartbeat logic, ensuring proper project resolution, worker state management, and task prioritization. --- index.ts | 21 +- lib/dispatch.ts | 2 +- lib/services/heartbeat.ts | 180 ++++++++++++++ lib/services/queue.ts | 206 +--------------- lib/services/tick.ts | 8 +- lib/setup/config.ts | 9 + lib/templates.ts | 23 +- lib/tools/auto-pickup.ts | 125 ---------- lib/tools/health.ts | 74 ++++++ lib/tools/status.ts | 100 +++----- lib/tools/task-create.ts | 11 +- lib/tools/work-heartbeat.test.ts | 401 +++++++++++++++++++++++++++++++ lib/tools/work-heartbeat.ts | 310 ++++++++++++++++++++++++ 13 files changed, 1044 insertions(+), 426 deletions(-) create mode 100644 lib/services/heartbeat.ts delete mode 100644 lib/tools/auto-pickup.ts create mode 100644 lib/tools/health.ts create mode 100644 lib/tools/work-heartbeat.test.ts create mode 100644 lib/tools/work-heartbeat.ts diff --git a/index.ts b/index.ts index 17b0e8d..3120d50 100644 --- a/index.ts +++ b/index.ts @@ -5,11 +5,13 @@ import { createTaskCreateTool } from "./lib/tools/task-create.js"; import { createTaskUpdateTool } from "./lib/tools/task-update.js"; import { createTaskCommentTool } from "./lib/tools/task-comment.js"; import { createStatusTool } from "./lib/tools/status.js"; -import { createAutoPickupTool } from "./lib/tools/auto-pickup.js"; +import { createHealthTool } from "./lib/tools/health.js"; +import { createWorkHeartbeatTool } from "./lib/tools/work-heartbeat.js"; import { createProjectRegisterTool } from "./lib/tools/project-register.js"; import { createSetupTool } from "./lib/tools/setup.js"; import { createOnboardTool } from "./lib/tools/onboard.js"; import { registerCli } from "./lib/cli.js"; +import { registerHeartbeatService } from "./lib/services/heartbeat.js"; const plugin = { id: "devclaw", @@ -44,6 +46,15 @@ const plugin = { workerComplete: { type: "boolean", default: true }, }, }, + work_heartbeat: { + type: "object", + description: "Token-free interval-based heartbeat service. Runs health checks + queue dispatch automatically.", + properties: { + enabled: { type: "boolean", default: true, description: "Enable the heartbeat service." }, + intervalSeconds: { type: "number", default: 60, description: "Seconds between ticks." }, + maxPickupsPerTick: { type: "number", default: 4, description: "Max worker dispatches per tick." }, + }, + }, }, }, @@ -59,7 +70,8 @@ const plugin = { // Operations api.registerTool(createStatusTool(api), { names: ["status"] }); - api.registerTool(createAutoPickupTool(api), { names: ["auto_pickup"] }); + api.registerTool(createHealthTool(api), { names: ["health"] }); + api.registerTool(createWorkHeartbeatTool(api), { names: ["work_heartbeat"] }); // Setup & config api.registerTool(createProjectRegisterTool(api), { names: ["project_register"] }); @@ -71,7 +83,10 @@ const plugin = { commands: ["devclaw"], }); - api.logger.info("DevClaw plugin registered (10 tools, 1 CLI command)"); + // Services + registerHeartbeatService(api); + + api.logger.info("DevClaw plugin registered (11 tools, 1 service, 1 CLI command)"); }, }; diff --git a/lib/dispatch.ts b/lib/dispatch.ts index 4c5b0cb..4d16965 100644 --- a/lib/dispatch.ts +++ b/lib/dispatch.ts @@ -1,5 +1,5 @@ /** - * dispatch.ts — Core dispatch logic shared by work_start, auto_pickup, and projectTick. + * dispatch.ts — Core dispatch logic shared by work_start, work_heartbeat, and projectTick. * * Handles: session lookup, spawn/reuse via Gateway RPC, task dispatch via CLI, * state update (activateWorker), and audit logging. diff --git a/lib/services/heartbeat.ts b/lib/services/heartbeat.ts new file mode 100644 index 0000000..5a27978 --- /dev/null +++ b/lib/services/heartbeat.ts @@ -0,0 +1,180 @@ +/** + * Heartbeat service — token-free interval-based queue processing. + * + * Runs as a plugin service (tied to gateway lifecycle). Every N seconds: + * 1. Health pass: auto-fix zombies, stale workers, orphaned state + * 2. Tick pass: fill free worker slots by priority + * + * Zero LLM tokens — all logic is deterministic code + CLI calls. + * Workers only consume tokens when they start processing dispatched tasks. + */ +import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; +import { readProjects, getProject } from "../projects.js"; +import { log as auditLog } from "../audit.js"; +import { checkWorkerHealth } from "./health.js"; +import { projectTick } from "./tick.js"; +import { createProvider } from "../providers/index.js"; + +// --------------------------------------------------------------------------- +// Config +// --------------------------------------------------------------------------- + +export type HeartbeatConfig = { + enabled: boolean; + intervalSeconds: number; + maxPickupsPerTick: number; +}; + +export const HEARTBEAT_DEFAULTS: HeartbeatConfig = { + enabled: true, + intervalSeconds: 60, + maxPickupsPerTick: 4, +}; + +export function resolveHeartbeatConfig( + pluginConfig?: Record, +): HeartbeatConfig { + const raw = pluginConfig?.work_heartbeat as Partial | undefined; + return { ...HEARTBEAT_DEFAULTS, ...raw }; +} + +// --------------------------------------------------------------------------- +// Service +// --------------------------------------------------------------------------- + +export function registerHeartbeatService(api: OpenClawPluginApi) { + let intervalId: ReturnType | null = null; + + api.registerService({ + id: "devclaw-heartbeat", + + start: async (ctx) => { + const pluginConfig = api.pluginConfig as Record | undefined; + const config = resolveHeartbeatConfig(pluginConfig); + + if (!config.enabled) { + ctx.logger.info("work_heartbeat service disabled"); + return; + } + + const workspaceDir = ctx.workspaceDir; + if (!workspaceDir) { + ctx.logger.warn("work_heartbeat: no workspaceDir — service not started"); + return; + } + + const agentId = resolveAgentId(pluginConfig); + + ctx.logger.info( + `work_heartbeat service started: every ${config.intervalSeconds}s, max ${config.maxPickupsPerTick} pickups/tick`, + ); + + intervalId = setInterval(async () => { + try { + await tick({ workspaceDir, agentId, config, pluginConfig, logger: ctx.logger }); + } catch (err) { + ctx.logger.error(`work_heartbeat tick failed: ${err}`); + } + }, config.intervalSeconds * 1000); + }, + + stop: async (ctx) => { + if (intervalId) { + clearInterval(intervalId); + intervalId = null; + ctx.logger.info("work_heartbeat service stopped"); + } + }, + }); +} + +// --------------------------------------------------------------------------- +// Tick +// --------------------------------------------------------------------------- + +async function tick(opts: { + workspaceDir: string; + agentId?: string; + config: HeartbeatConfig; + pluginConfig?: Record; + logger: { info(msg: string): void; warn(msg: string): void }; +}) { + const { workspaceDir, agentId, config, pluginConfig, logger } = opts; + + const data = await readProjects(workspaceDir); + const projectIds = Object.keys(data.projects); + if (projectIds.length === 0) return; + + const projectExecution = + (pluginConfig?.projectExecution as string) ?? "parallel"; + + let totalPickups = 0; + let totalHealthFixes = 0; + let totalSkipped = 0; + let activeProjects = 0; + + for (const groupId of projectIds) { + const project = data.projects[groupId]; + if (!project) continue; + + const { provider } = createProvider({ repo: project.repo }); + + // Health pass: auto-fix + for (const role of ["dev", "qa"] as const) { + const fixes = await checkWorkerHealth({ + workspaceDir, groupId, project, role, + activeSessions: [], // No session list in service context + autoFix: true, + provider, + }); + totalHealthFixes += fixes.filter((f) => f.fixed).length; + } + + // Budget check + const remaining = config.maxPickupsPerTick - totalPickups; + if (remaining <= 0) break; + + // Sequential project guard + const fresh = (await readProjects(workspaceDir)).projects[groupId]; + if (!fresh) continue; + const projectActive = fresh.dev.active || fresh.qa.active; + if (projectExecution === "sequential" && !projectActive && activeProjects >= 1) { + totalSkipped++; + continue; + } + + // Tick pass: fill free slots + const result = await projectTick({ + workspaceDir, groupId, agentId, + pluginConfig, + maxPickups: remaining, + }); + + totalPickups += result.pickups.length; + totalSkipped += result.skipped.length; + if (projectActive || result.pickups.length > 0) activeProjects++; + } + + // Audit (only when something happened) + if (totalPickups > 0 || totalHealthFixes > 0) { + logger.info( + `work_heartbeat tick: ${totalPickups} pickups, ${totalHealthFixes} health fixes, ${totalSkipped} skipped`, + ); + } + + await auditLog(workspaceDir, "heartbeat_tick", { + projectsScanned: projectIds.length, + healthFixes: totalHealthFixes, + pickups: totalPickups, + skipped: totalSkipped, + }); +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function resolveAgentId(pluginConfig?: Record): string | undefined { + const ids = pluginConfig?.devClawAgentIds as string[] | undefined; + return ids?.[0]; +} diff --git a/lib/services/queue.ts b/lib/services/queue.ts index 3461bcf..d65dda0 100644 --- a/lib/services/queue.ts +++ b/lib/services/queue.ts @@ -1,8 +1,8 @@ /** - * Queue service — task sequencing and priority logic. + * Queue service — issue queue fetching. * - * Pure functions for scanning issue queues, building execution sequences, - * and formatting output. No tool registration or I/O concerns. + * Fetches issue queues per project from the issue provider. + * Pure functions, no tool registration or state mutation. */ import type { Issue } from "../providers/provider.js"; import { createProvider } from "../providers/index.js"; @@ -13,56 +13,6 @@ import type { Project } from "../projects.js"; // --------------------------------------------------------------------------- export type QueueLabel = "To Improve" | "To Test" | "To Do"; -export type Role = "dev" | "qa"; - -export interface SequencedTask { - sequence: number; - projectId: string; - projectName: string; - role: Role; - issueId: number; - title: string; - label: QueueLabel; - active: boolean; -} - -export interface ProjectTrack { - name: string; - role: Role; - tasks: SequencedTask[]; -} - -export interface ProjectExecutionConfig { - name: string; - groupId: string; - roleExecution: "parallel" | "sequential"; - devActive: boolean; - qaActive: boolean; - devIssueId: string | null; - qaIssueId: string | null; -} - -export interface ProjectTaskSequence { - projectId: string; - projectName: string; - roleExecution: "parallel" | "sequential"; - tracks: ProjectTrack[]; -} - -export interface GlobalTaskSequence { - mode: "sequential"; - tasks: SequencedTask[]; -} - -export interface ProjectQueues { - projectId: string; - project: Project; - queues: Record; -} - -// --------------------------------------------------------------------------- -// Constants & helpers -// --------------------------------------------------------------------------- export const QUEUE_PRIORITY: Record = { "To Improve": 3, @@ -74,7 +24,7 @@ export function getTaskPriority(label: QueueLabel, issue: Issue): number { return QUEUE_PRIORITY[label] * 10000 - issue.iid; } -export function getRoleForLabel(label: QueueLabel): Role { +export function getRoleForLabel(label: QueueLabel): "dev" | "qa" { return label === "To Test" ? "qa" : "dev"; } @@ -97,151 +47,3 @@ export async function fetchProjectQueues(project: Project): Promise, - isActive: boolean, activeIssueId: string | null, - startSeq: number, -): { track: ProjectTrack; nextSequence: number } { - const tasks: SequencedTask[] = []; - let seq = startSeq; - - for (const label of ["To Improve", "To Test", "To Do"] as QueueLabel[]) { - if (getRoleForLabel(label) !== role) continue; - for (const issue of queues[label]) { - tasks.push({ - sequence: seq++, projectId, projectName, role, - issueId: issue.iid, title: issue.title, label, - active: isActive && activeIssueId === String(issue.iid), - }); - } - } - - return { track: { name: role === "dev" ? "DEV Track" : "QA Track", role, tasks }, nextSequence: seq }; -} - -// --------------------------------------------------------------------------- -// Sequence building -// --------------------------------------------------------------------------- - -export function buildParallelProjectSequences(projectQueues: ProjectQueues[]): ProjectTaskSequence[] { - return projectQueues.map(({ projectId, project, queues }) => { - const roleExecution = project.roleExecution ?? "parallel"; - const tracks: ProjectTrack[] = []; - - if (roleExecution === "sequential") { - // Build alternating DEV/QA sequence - const alternating = buildAlternatingTrack(projectId, project, queues); - if (alternating.tasks.length > 0) tracks.push(alternating); - } else { - const dev = buildProjectTrack(projectId, project.name, "dev", queues, project.dev.active, project.dev.issueId, 1); - const qa = buildProjectTrack(projectId, project.name, "qa", queues, project.qa.active, project.qa.issueId, 1); - if (dev.track.tasks.length > 0) tracks.push(dev.track); - if (qa.track.tasks.length > 0) tracks.push(qa.track); - } - - return { projectId, projectName: project.name, roleExecution, tracks }; - }); -} - -function buildAlternatingTrack( - projectId: string, project: Project, queues: Record, -): ProjectTrack { - const tasks: SequencedTask[] = []; - const added = new Set(); - let seq = 1; - - const nextForRole = (role: Role): SequencedTask | null => { - for (const label of ["To Improve", "To Test", "To Do"] as QueueLabel[]) { - if (getRoleForLabel(label) !== role) continue; - for (const issue of queues[label]) { - if (added.has(issue.iid)) continue; - const isActive = - (role === "dev" && project.dev.active && project.dev.issueId === String(issue.iid)) || - (role === "qa" && project.qa.active && project.qa.issueId === String(issue.iid)); - return { sequence: 0, projectId, projectName: project.name, role, issueId: issue.iid, title: issue.title, label, active: isActive }; - } - } - return null; - }; - - // Start with active task - for (const role of ["dev", "qa"] as Role[]) { - const w = project[role]; - if (w.active && w.issueId) { - const t = nextForRole(role); - if (t) { t.sequence = seq++; t.active = true; tasks.push(t); added.add(t.issueId); break; } - } - } - - // Alternate - let lastRole: Role | null = tasks[0]?.role ?? null; - while (true) { - const next = nextForRole(lastRole === "dev" ? "qa" : "dev"); - if (!next) break; - next.sequence = seq++; - tasks.push(next); - added.add(next.issueId); - lastRole = next.role; - } - - return { name: "DEV/QA Alternating", role: "dev", tasks }; -} - -export function buildGlobalTaskSequence(projectQueues: ProjectQueues[]): GlobalTaskSequence { - const all: Array<{ projectId: string; projectName: string; role: Role; label: QueueLabel; issue: Issue; priority: number }> = []; - - for (const { projectId, project, queues } of projectQueues) { - for (const label of ["To Improve", "To Test", "To Do"] as QueueLabel[]) { - for (const issue of queues[label]) { - all.push({ projectId, projectName: project.name, role: getRoleForLabel(label), label, issue, priority: getTaskPriority(label, issue) }); - } - } - } - - all.sort((a, b) => b.priority !== a.priority ? b.priority - a.priority : a.issue.iid - b.issue.iid); - - const tasks: SequencedTask[] = []; - const added = new Set(); - let seq = 1; - - // Active task first - const active = projectQueues.find(({ project }) => project.dev.active || project.qa.active); - if (active) { - const { project, projectId } = active; - for (const [role, w] of [["dev", project.dev], ["qa", project.qa]] as const) { - if (w.active && w.issueId) { - const t = all.find((t) => t.projectId === projectId && t.role === role && String(t.issue.iid) === w.issueId); - if (t) { - const key = `${t.projectId}:${t.issue.iid}`; - tasks.push({ sequence: seq++, projectId: t.projectId, projectName: t.projectName, role: t.role, issueId: t.issue.iid, title: t.issue.title, label: t.label, active: true }); - added.add(key); - break; - } - } - } - } - - for (const t of all) { - const key = `${t.projectId}:${t.issue.iid}`; - if (added.has(key)) continue; - tasks.push({ sequence: seq++, projectId: t.projectId, projectName: t.projectName, role: t.role, issueId: t.issue.iid, title: t.issue.title, label: t.label, active: false }); - added.add(key); - } - - return { mode: "sequential", tasks }; -} - -// --------------------------------------------------------------------------- -// Formatting -// --------------------------------------------------------------------------- - -export function formatProjectQueues(queues: Record) { - const fmt = (label: QueueLabel) => queues[label].map((i) => ({ id: i.iid, title: i.title, priority: QUEUE_PRIORITY[label] })); - return { toImprove: fmt("To Improve"), toTest: fmt("To Test"), toDo: fmt("To Do") }; -} diff --git a/lib/services/tick.ts b/lib/services/tick.ts index 06932fd..373d96b 100644 --- a/lib/services/tick.ts +++ b/lib/services/tick.ts @@ -2,7 +2,7 @@ * tick.ts — Project-level queue scan + dispatch. * * Core function: projectTick() scans one project's queue and fills free worker slots. - * Called by: work_start (fill parallel slot), work_finish (next pipeline step), auto_pickup (sweep). + * Called by: work_start (fill parallel slot), work_finish (next pipeline step), work_heartbeat (sweep). */ import type { Issue, StateLabel } from "../providers/provider.js"; import type { IssueProvider } from "../providers/provider.js"; @@ -90,7 +90,7 @@ export type TickResult = { /** * Scan one project's queue and fill free worker slots. * - * Does NOT run health checks (that's auto_pickup's job). + * Does NOT run health checks (that's work_heartbeat's job). * Non-destructive: only dispatches if slots are free and issues are queued. */ export async function projectTick(opts: { @@ -103,13 +103,15 @@ export async function projectTick(opts: { maxPickups?: number; /** Only attempt this role. Used by work_start to fill the other slot. */ targetRole?: "dev" | "qa"; + /** Optional provider override (for testing). Uses createProvider if omitted. */ + provider?: Pick; }): Promise { const { workspaceDir, groupId, agentId, sessionKey, pluginConfig, dryRun, maxPickups, targetRole } = opts; const project = (await readProjects(workspaceDir)).projects[groupId]; if (!project) return { pickups: [], skipped: [{ reason: `Project not found: ${groupId}` }] }; - const { provider } = createProvider({ repo: project.repo }); + const provider = opts.provider ?? createProvider({ repo: project.repo }).provider; const roleExecution = project.roleExecution ?? "parallel"; const roles: Array<"dev" | "qa"> = targetRole ? [targetRole] : ["dev", "qa"]; diff --git a/lib/setup/config.ts b/lib/setup/config.ts index b6e9695..5272dd4 100644 --- a/lib/setup/config.ts +++ b/lib/setup/config.ts @@ -6,6 +6,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import type { Tier } from "../tiers.js"; +import { HEARTBEAT_DEFAULTS } from "../services/heartbeat.js"; function openclawConfigPath(): string { return path.join(process.env.HOME ?? "/home/lauren", ".openclaw", "openclaw.json"); @@ -35,6 +36,7 @@ export async function writePluginConfig( config.plugins.entries.devclaw.config.projectExecution = projectExecution; } + ensureHeartbeatDefaults(config); configureSubagentCleanup(config); if (agentId) { @@ -86,3 +88,10 @@ function addToolRestrictions(config: Record, agentId: string): delete agent.tools.allow; } } + +function ensureHeartbeatDefaults(config: Record): void { + const devclaw = (config as any).plugins.entries.devclaw.config; + if (!devclaw.work_heartbeat) { + devclaw.work_heartbeat = { ...HEARTBEAT_DEFAULTS }; + } +} diff --git a/lib/templates.ts b/lib/templates.ts index 2447fc2..f37eac4 100644 --- a/lib/templates.ts +++ b/lib/templates.ts @@ -26,7 +26,7 @@ Read the comments carefully — they often contain clarifications, decisions, or - Clean up the worktree after merging - When done, call work_finish with role "dev", result "done", and a brief summary - If you discover unrelated bugs, call task_create to file them -- Do NOT call work_start, status, or project_register +- Do NOT call work_start, status, health, work_heartbeat, or project_register `; export const DEFAULT_QA_INSTRUCTIONS = `# QA Worker Instructions @@ -41,7 +41,7 @@ export const DEFAULT_QA_INSTRUCTIONS = `# QA Worker Instructions - result "fail" with specific issues if problems found - result "refine" if you need human input to decide - If you discover unrelated bugs, call task_create to file them -- Do NOT call work_start, status, or project_register +- Do NOT call work_start, status, health, work_heartbeat, or project_register `; export const AGENTS_MD_TEMPLATE = `# AGENTS.md - Development Orchestration (DevClaw) @@ -82,7 +82,7 @@ If you discover unrelated bugs or needed improvements during your work, call \`t ### Tools You Should NOT Use These are orchestrator-only tools. Do not call them: -- \`work_start\`, \`status\`, \`auto_pickup\`, \`project_register\` +- \`work_start\`, \`status\`, \`health\`, \`work_heartbeat\`, \`project_register\` --- @@ -99,7 +99,8 @@ All orchestration goes through these tools. You do NOT manually manage sessions, | \`project_register\` | One-time project setup: creates labels, scaffolds role files, adds to projects.json | | \`task_create\` | Create issues from chat (bugs, features, tasks) | | \`task_update\` | Update issue title, description, or labels | -| \`status\` | Scans issue queue + worker state + health checks | +| \`status\` | Task queue and worker state per project (lightweight dashboard) | +| \`health\` | Scan worker health: zombies, stale workers, orphaned state. Pass fix=true to auto-fix | | \`work_start\` | End-to-end: label transition, tier assignment, session create/reuse, dispatch with role instructions | | \`work_finish\` | End-to-end: label transition, state update, issue close/reopen. Auto-ticks queue after completion. | @@ -149,7 +150,7 @@ Workers receive role-specific instructions appended to their task message. These ### Heartbeats -On heartbeat, call \`auto_pickup\` — it runs health checks and picks up available work automatically. +**Do nothing.** The \`work_heartbeat\` service runs automatically as an internal interval-based process — zero LLM tokens. It handles health checks (zombie detection, stale workers) and queue dispatch (filling free worker slots by priority) every 60 seconds by default. Configure via \`plugins.entries.devclaw.config.work_heartbeat\` in openclaw.json. ### Safety @@ -161,15 +162,5 @@ On heartbeat, call \`auto_pickup\` — it runs health checks and picks up availa export const HEARTBEAT_MD_TEMPLATE = `# HEARTBEAT.md -On each heartbeat, call \`auto_pickup\` (no parameters needed for a full sweep). - -It will automatically: -1. Run health checks (zombie workers, stale sessions) -2. Scan queues across all projects -3. Pick up available work by priority: To Improve > To Test > To Do -4. Choose appropriate developer tier based on task complexity - -If you want to target a single project, pass \`projectGroupId\`. - -If nothing needs attention, it reports HEARTBEAT_OK. +Do nothing. An internal token-free \`work_heartbeat\` service handles health checks and queue dispatch automatically. `; diff --git a/lib/tools/auto-pickup.ts b/lib/tools/auto-pickup.ts deleted file mode 100644 index 58e058c..0000000 --- a/lib/tools/auto-pickup.ts +++ /dev/null @@ -1,125 +0,0 @@ -/** - * auto_pickup — Automated task pickup (heartbeat handler). - * - * Health checks → projectTick per project → notify. - * Optional projectGroupId for single-project or all-project sweep. - */ -import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; -import { jsonResult } from "openclaw/plugin-sdk"; -import type { ToolContext } from "../types.js"; -import { readProjects } from "../projects.js"; -import { log as auditLog } from "../audit.js"; -import { notify, notifyTickPickups, getNotificationConfig } from "../notify.js"; -import { checkWorkerHealth, type HealthFix } from "../services/health.js"; -import { projectTick, type TickAction } from "../services/tick.js"; -import { requireWorkspaceDir, resolveContext, resolveProvider, getPluginConfig } from "../tool-helpers.js"; - -type ExecutionMode = "parallel" | "sequential"; - -export function createAutoPickupTool(api: OpenClawPluginApi) { - return (ctx: ToolContext) => ({ - name: "auto_pickup", - label: "Auto Pickup", - description: `Automated task pickup. With projectGroupId: targets one project. Without: sweeps all projects. Runs health checks, then fills free worker slots by priority.`, - parameters: { - type: "object", - properties: { - projectGroupId: { type: "string", description: "Target a single project. Omit to sweep all." }, - dryRun: { type: "boolean", description: "Report only, don't dispatch. Default: false." }, - maxPickups: { type: "number", description: "Max pickups per tick." }, - activeSessions: { type: "array", items: { type: "string" }, description: "Active session IDs for zombie detection." }, - }, - }, - - async execute(_id: string, params: Record) { - const targetGroupId = params.projectGroupId as string | undefined; - const dryRun = (params.dryRun as boolean) ?? false; - const maxPickups = params.maxPickups as number | undefined; - const activeSessions = (params.activeSessions as string[]) ?? []; - const workspaceDir = requireWorkspaceDir(ctx); - - const pluginConfig = getPluginConfig(api); - const projectExecution: ExecutionMode = (pluginConfig?.projectExecution as ExecutionMode) ?? "parallel"; - - const data = await readProjects(workspaceDir); - const projectEntries = targetGroupId - ? [[targetGroupId, data.projects[targetGroupId]] as const].filter(([, p]) => p) - : Object.entries(data.projects); - - if (projectEntries.length === 0) { - return jsonResult({ success: true, dryRun, healthFixes: [], pickups: [], skipped: [{ project: "(none)", reason: "No projects" }] }); - } - - const healthFixes: Array = []; - const pickups: Array = []; - const skipped: Array<{ project: string; role?: string; reason: string }> = []; - let globalActiveDev = 0, globalActiveQa = 0, activeProjectCount = 0, pickupCount = 0; - - // Pass 1: health checks - for (const [groupId, project] of projectEntries) { - const { provider } = resolveProvider(project); - for (const role of ["dev", "qa"] as const) { - const fixes = await checkWorkerHealth({ workspaceDir, groupId, project, role, activeSessions, autoFix: !dryRun, provider }); - healthFixes.push(...fixes.map((f) => ({ ...f, project: project.name, role }))); - } - const refreshed = (await readProjects(workspaceDir)).projects[groupId]; - if (refreshed) { - if (refreshed.dev.active) globalActiveDev++; - if (refreshed.qa.active) globalActiveQa++; - if (refreshed.dev.active || refreshed.qa.active) activeProjectCount++; - } - } - - // Pass 2: projectTick per project - for (const [groupId] of projectEntries) { - const current = (await readProjects(workspaceDir)).projects[groupId]; - if (!current) continue; - const projectActive = current.dev.active || current.qa.active; - - // Sequential project guard (needs global state) - if (projectExecution === "sequential" && !projectActive && activeProjectCount >= 1) { - skipped.push({ project: current.name, reason: "Sequential: another project active" }); - continue; - } - - const remaining = maxPickups !== undefined ? maxPickups - pickupCount : undefined; - const result = await projectTick({ - workspaceDir, groupId, agentId: ctx.agentId, pluginConfig, sessionKey: ctx.sessionKey, - dryRun, maxPickups: remaining, - }); - - pickups.push(...result.pickups.map((p) => ({ ...p, project: current.name }))); - skipped.push(...result.skipped.map((s) => ({ project: current.name, ...s }))); - pickupCount += result.pickups.length; - - // Send workerStart notifications for each pickup in this project - if (!dryRun && result.pickups.length > 0) { - const notifyConfig = getNotificationConfig(pluginConfig); - await notifyTickPickups(result.pickups, { workspaceDir, config: notifyConfig, channel: current.channel ?? "telegram" }); - } - for (const p of result.pickups) { - if (p.role === "dev") globalActiveDev++; else globalActiveQa++; - } - if (result.pickups.length > 0 && !projectActive) activeProjectCount++; - } - - await auditLog(workspaceDir, "auto_pickup", { - dryRun, projectExecution, projectsScanned: projectEntries.length, - healthFixes: healthFixes.length, pickups: pickups.length, skipped: skipped.length, - }); - - // Notify - const context = await resolveContext(ctx, api); - const notifyConfig = getNotificationConfig(pluginConfig); - await notify( - { type: "heartbeat", projectsScanned: projectEntries.length, healthFixes: healthFixes.length, pickups: pickups.length, skipped: skipped.length, dryRun, pickupDetails: pickups.map((p) => ({ project: p.project, issueId: p.issueId, role: p.role })) }, - { workspaceDir, config: notifyConfig, orchestratorDm: context.type === "direct" ? context.chatId : undefined, channel: "channel" in context ? context.channel : undefined }, - ); - - return jsonResult({ - success: true, dryRun, projectExecution, healthFixes, pickups, skipped, - globalState: { activeProjects: activeProjectCount, activeDev: globalActiveDev, activeQa: globalActiveQa }, - }); - }, - }); -} diff --git a/lib/tools/health.ts b/lib/tools/health.ts new file mode 100644 index 0000000..ad24c68 --- /dev/null +++ b/lib/tools/health.ts @@ -0,0 +1,74 @@ +/** + * health — Worker health scan with optional auto-fix. + * + * Read-only by default (surfaces issues). Pass fix=true to apply fixes. + * Context-aware: auto-filters to project in group chats. + */ +import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; +import { jsonResult } from "openclaw/plugin-sdk"; +import type { ToolContext } from "../types.js"; +import { readProjects, getProject, type Project } from "../projects.js"; +import { log as auditLog } from "../audit.js"; +import { checkWorkerHealth, type HealthFix } from "../services/health.js"; +import { requireWorkspaceDir, resolveContext, resolveProvider } from "../tool-helpers.js"; + +export function createHealthTool(api: OpenClawPluginApi) { + return (ctx: ToolContext) => ({ + name: "health", + label: "Health", + description: `Scan worker health across projects. Detects zombies, stale workers, orphaned state. Pass fix=true to auto-fix. Context-aware: auto-filters in group chats.`, + parameters: { + type: "object", + properties: { + projectGroupId: { type: "string", description: "Filter to specific project. Omit for all." }, + fix: { type: "boolean", description: "Apply fixes for detected issues. Default: false (read-only)." }, + activeSessions: { type: "array", items: { type: "string" }, description: "Active session IDs for zombie detection." }, + }, + }, + + async execute(_id: string, params: Record) { + const workspaceDir = requireWorkspaceDir(ctx); + const fix = (params.fix as boolean) ?? false; + const activeSessions = (params.activeSessions as string[]) ?? []; + + // Auto-filter in group context + const context = await resolveContext(ctx, api); + let groupId = params.projectGroupId as string | undefined; + if (context.type === "group" && !groupId) groupId = context.groupId; + + const data = await readProjects(workspaceDir); + const projectIds = groupId ? [groupId] : Object.keys(data.projects); + + const issues: Array = []; + + for (const pid of projectIds) { + const project = getProject(data, pid); + if (!project) continue; + const { provider } = resolveProvider(project); + + for (const role of ["dev", "qa"] as const) { + const fixes = await checkWorkerHealth({ + workspaceDir, groupId: pid, project, role, activeSessions, + autoFix: fix, provider, + }); + issues.push(...fixes.map((f) => ({ ...f, project: project.name, role }))); + } + } + + await auditLog(workspaceDir, "health", { + projectCount: projectIds.length, + fix, + issuesFound: issues.length, + issuesFixed: issues.filter((i) => i.fixed).length, + }); + + return jsonResult({ + success: true, + fix, + projectsScanned: projectIds.length, + issues, + note: activeSessions.length === 0 ? "No activeSessions provided — zombie detection skipped." : undefined, + }); + }, + }); +} diff --git a/lib/tools/status.ts b/lib/tools/status.ts index e9d98b1..9b557e1 100644 --- a/lib/tools/status.ts +++ b/lib/tools/status.ts @@ -1,40 +1,33 @@ /** - * status — Unified queue + health overview. + * status — Lightweight queue + worker state dashboard. * - * Merges queue_status + session_health into a single tool. + * Shows worker state and queue counts per project. No health checks + * (use `health` tool), no complex sequencing. * Context-aware: auto-filters to project in group chats. */ import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; import { jsonResult } from "openclaw/plugin-sdk"; import type { ToolContext } from "../types.js"; -import { readProjects, getProject, type Project } from "../projects.js"; +import { readProjects, getProject } from "../projects.js"; import { generateGuardrails } from "../context-guard.js"; import { log as auditLog } from "../audit.js"; -import { checkWorkerHealth } from "../services/health.js"; -import { - fetchProjectQueues, buildParallelProjectSequences, buildGlobalTaskSequence, - formatProjectQueues, type ProjectQueues, type ProjectExecutionConfig, -} from "../services/queue.js"; -import { requireWorkspaceDir, resolveContext, resolveProvider, getPluginConfig } from "../tool-helpers.js"; +import { fetchProjectQueues, type QueueLabel } from "../services/queue.js"; +import { requireWorkspaceDir, resolveContext, getPluginConfig } from "../tool-helpers.js"; export function createStatusTool(api: OpenClawPluginApi) { return (ctx: ToolContext) => ({ name: "status", label: "Status", - description: `Show task queue, worker status, and health across projects. Context-aware: auto-filters in group chats. Pass activeSessions for zombie detection.`, + description: `Show task queue and worker state per project. Context-aware: auto-filters in group chats. Use \`health\` tool for worker health checks.`, parameters: { type: "object", properties: { projectGroupId: { type: "string", description: "Filter to specific project. Omit for all." }, - includeHealth: { type: "boolean", description: "Run health checks. Default: true." }, - activeSessions: { type: "array", items: { type: "string" }, description: "Active session IDs for zombie detection." }, }, }, async execute(_id: string, params: Record) { const workspaceDir = requireWorkspaceDir(ctx); - const includeHealth = (params.includeHealth as boolean) ?? true; - const activeSessions = (params.activeSessions as string[]) ?? []; const context = await resolveContext(ctx, api); if (context.type === "via-agent") { @@ -51,75 +44,42 @@ export function createStatusTool(api: OpenClawPluginApi) { if (context.type === "group" && !groupId) groupId = context.groupId; const pluginConfig = getPluginConfig(api); - const projectExecution = (pluginConfig?.projectExecution as "parallel" | "sequential") ?? "parallel"; + const projectExecution = (pluginConfig?.projectExecution as string) ?? "parallel"; const data = await readProjects(workspaceDir); const projectIds = groupId ? [groupId] : Object.keys(data.projects); - // Build execution configs + fetch queues - const configs: ProjectExecutionConfig[] = []; - const projectList: Array<{ id: string; project: Project }> = []; + // Build project summaries with queue counts + const projects = await Promise.all( + projectIds.map(async (pid) => { + const project = getProject(data, pid); + if (!project) return null; - for (const pid of projectIds) { - const project = getProject(data, pid); - if (!project) continue; - projectList.push({ id: pid, project }); - configs.push({ - name: project.name, groupId: pid, - roleExecution: project.roleExecution ?? "parallel", - devActive: project.dev.active, qaActive: project.qa.active, - devIssueId: project.dev.issueId, qaIssueId: project.qa.issueId, - }); - } + const queues = await fetchProjectQueues(project); + const count = (label: QueueLabel) => queues[label].length; - // Health checks (read-only — never auto-fix from status) - const healthIssues: Array> = []; - if (includeHealth) { - for (const { id, project } of projectList) { - const { provider } = resolveProvider(project); - for (const role of ["dev", "qa"] as const) { - const fixes = await checkWorkerHealth({ - workspaceDir, groupId: id, project, role, activeSessions, - autoFix: false, provider, - }); - for (const f of fixes) healthIssues.push({ ...f.issue, fixed: f.fixed }); - } - } - } - - // Fetch queues - const projectQueues: ProjectQueues[] = await Promise.all( - projectList.map(async ({ id, project }) => ({ - projectId: id, project, - queues: await fetchProjectQueues(project), - })), + return { + name: project.name, + groupId: pid, + roleExecution: project.roleExecution ?? "parallel", + dev: { active: project.dev.active, issueId: project.dev.issueId, tier: project.dev.tier, startTime: project.dev.startTime }, + qa: { active: project.qa.active, issueId: project.qa.issueId, tier: project.qa.tier, startTime: project.qa.startTime }, + queue: { toImprove: count("To Improve"), toTest: count("To Test"), toDo: count("To Do") }, + }; + }), ); - // Build sequences - const sequences = projectExecution === "sequential" - ? { mode: "sequential" as const, global: buildGlobalTaskSequence(projectQueues) } - : { mode: "parallel" as const, projects: buildParallelProjectSequences(projectQueues) }; - - // Build project details - const projects = projectQueues.map(({ projectId, project, queues }) => ({ - name: project.name, groupId: projectId, - dev: { active: project.dev.active, issueId: project.dev.issueId, tier: project.dev.tier, sessions: project.dev.sessions }, - qa: { active: project.qa.active, issueId: project.qa.issueId, tier: project.qa.tier, sessions: project.qa.sessions }, - queue: formatProjectQueues(queues), - })); + const filtered = projects.filter(Boolean); await auditLog(workspaceDir, "status", { - projectCount: projects.length, - totalToImprove: projects.reduce((s, p) => s + p.queue.toImprove.length, 0), - totalToTest: projects.reduce((s, p) => s + p.queue.toTest.length, 0), - totalToDo: projects.reduce((s, p) => s + p.queue.toDo.length, 0), - healthIssues: healthIssues.length, + projectCount: filtered.length, + totalQueued: filtered.reduce((s, p) => s + p!.queue.toImprove + p!.queue.toTest + p!.queue.toDo, 0), }); return jsonResult({ - execution: { plugin: { projectExecution }, projects: configs }, - sequences, projects, - health: includeHealth ? { issues: healthIssues, note: activeSessions.length === 0 ? "No activeSessions — zombie detection skipped." : undefined } : undefined, + success: true, + execution: { projectExecution }, + projects: filtered, context: { type: context.type, ...(context.type === "group" && { projectName: context.projectName, autoFiltered: !params.projectGroupId }), diff --git a/lib/tools/task-create.ts b/lib/tools/task-create.ts index b7fdba8..c85d141 100644 --- a/lib/tools/task-create.ts +++ b/lib/tools/task-create.ts @@ -22,12 +22,11 @@ export function createTaskCreateTool(api: OpenClawPluginApi) { label: "Task Create", description: `Create a new task (issue) in the project's issue tracker. Use this to file bugs, features, or tasks from chat. -Examples: -- Simple: { title: "Fix login bug" } -- With body: { title: "Add dark mode", description: "## Why\nUsers want dark mode...\n\n## Acceptance Criteria\n- [ ] Toggle in settings" } -- Ready for dev: { title: "Implement auth", description: "...", label: "To Do", pickup: true } +**IMPORTANT:** Always creates in "Planning" unless the user explicitly asks to start work immediately. Never set label to "To Do" on your own — "Planning" issues require human review before entering the queue. -The issue is created with a state label (defaults to "Planning"). Returns the created issue for immediate pickup.`, +Examples: +- Default: { title: "Fix login bug" } → created in Planning +- User says "create and start working": { title: "Implement auth", description: "...", label: "To Do" }`, parameters: { type: "object", required: ["projectGroupId", "title"], @@ -46,7 +45,7 @@ The issue is created with a state label (defaults to "Planning"). Returns the cr }, label: { type: "string", - description: `State label for the issue. One of: ${STATE_LABELS.join(", ")}. Defaults to "Planning".`, + description: `State label. Defaults to "Planning" — only use "To Do" when the user explicitly asks to start work immediately.`, enum: STATE_LABELS, }, assignees: { diff --git a/lib/tools/work-heartbeat.test.ts b/lib/tools/work-heartbeat.test.ts new file mode 100644 index 0000000..17e9820 --- /dev/null +++ b/lib/tools/work-heartbeat.test.ts @@ -0,0 +1,401 @@ +/** + * Tests for work_heartbeat logic: project resolution, tick behavior, execution guards. + * + * Uses projectTick with dryRun: true to test the decision logic without + * requiring OpenClaw API (sessions, dispatch). Mock providers simulate + * issue queues; real projects.json fixtures simulate worker state. + * + * Run with: npx tsx --test lib/tools/work-heartbeat.test.ts + */ +import { describe, it, afterEach } from "node:test"; +import assert from "node:assert"; +import fs from "node:fs/promises"; +import path from "node:path"; +import os from "node:os"; +import type { Project, WorkerState } from "../projects.js"; +import { readProjects } from "../projects.js"; +import { projectTick } from "../services/tick.js"; +import type { StateLabel } from "../providers/provider.js"; + +// --------------------------------------------------------------------------- +// Test fixtures +// --------------------------------------------------------------------------- + +const INACTIVE_WORKER: WorkerState = { + active: false, issueId: null, startTime: null, tier: null, sessions: {}, +}; + +const ACTIVE_DEV: WorkerState = { + active: true, issueId: "42", startTime: new Date().toISOString(), tier: "medior", + sessions: { medior: "session-dev-42" }, +}; + +const ACTIVE_QA: WorkerState = { + active: true, issueId: "42", startTime: new Date().toISOString(), tier: "qa", + sessions: { qa: "session-qa-42" }, +}; + +function makeProject(overrides: Partial = {}): Project { + return { + name: "Test Project", + repo: "https://github.com/test/repo", + groupName: "Test Group", + deployUrl: "", + baseBranch: "main", + deployBranch: "main", + dev: { ...INACTIVE_WORKER }, + qa: { ...INACTIVE_WORKER }, + ...overrides, + }; +} + +/** Minimal mock provider that returns pre-configured issues per label. */ +function mockProvider(issuesByLabel: Partial>>) { + return { + listIssuesByLabel: async (label: string) => issuesByLabel[label as StateLabel] ?? [], + getIssue: async () => { throw new Error("not implemented"); }, + transitionLabel: async () => {}, + getCurrentStateLabel: () => null, + }; +} + +// --------------------------------------------------------------------------- +// Temp workspace helpers +// --------------------------------------------------------------------------- + +let tmpDir: string; + +async function setupWorkspace(projects: Record): Promise { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "devclaw-test-")); + const memoryDir = path.join(tmpDir, "memory"); + await fs.mkdir(memoryDir, { recursive: true }); + await fs.writeFile( + path.join(memoryDir, "projects.json"), + JSON.stringify({ projects }, null, 2) + "\n", + "utf-8", + ); + return tmpDir; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("work_heartbeat: project resolution", () => { + afterEach(async () => { if (tmpDir) await fs.rm(tmpDir, { recursive: true }).catch(() => {}); }); + + it("resolves all projects when no targetGroupId", async () => { + // Given: two registered projects + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha" }), + "-200": makeProject({ name: "Beta" }), + }); + + const data = await readProjects(workspaceDir); + const entries = Object.entries(data.projects); + + assert.strictEqual(entries.length, 2); + assert.deepStrictEqual(entries.map(([, p]) => p.name).sort(), ["Alpha", "Beta"]); + }); + + it("resolves single project when targetGroupId given", async () => { + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha" }), + "-200": makeProject({ name: "Beta" }), + }); + + const data = await readProjects(workspaceDir); + const project = data.projects["-100"]; + + assert.ok(project); + assert.strictEqual(project.name, "Alpha"); + }); + + it("returns empty for unknown targetGroupId", async () => { + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha" }), + }); + + const data = await readProjects(workspaceDir); + assert.strictEqual(data.projects["-999"], undefined); + }); +}); + +describe("work_heartbeat: global state snapshot", () => { + afterEach(async () => { if (tmpDir) await fs.rm(tmpDir, { recursive: true }).catch(() => {}); }); + + it("counts active workers across projects", async () => { + // Given: Alpha has active DEV, Beta has active QA, Gamma is idle + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha", dev: { ...ACTIVE_DEV } }), + "-200": makeProject({ name: "Beta", qa: { ...ACTIVE_QA } }), + "-300": makeProject({ name: "Gamma" }), + }); + + const data = await readProjects(workspaceDir); + let activeDev = 0, activeQa = 0, activeProjects = 0; + for (const p of Object.values(data.projects)) { + if (p.dev.active) activeDev++; + if (p.qa.active) activeQa++; + if (p.dev.active || p.qa.active) activeProjects++; + } + + assert.strictEqual(activeDev, 1, "One active DEV worker (Alpha)"); + assert.strictEqual(activeQa, 1, "One active QA worker (Beta)"); + assert.strictEqual(activeProjects, 2, "Two projects have active workers"); + }); +}); + +describe("work_heartbeat: priority ordering (dry run)", () => { + afterEach(async () => { if (tmpDir) await fs.rm(tmpDir, { recursive: true }).catch(() => {}); }); + + it("picks To Improve over To Do for dev", async () => { + // Given: project with both "To Improve" and "To Do" issues + // Expected: projectTick picks the To Improve issue (higher priority) + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha", repo: "https://github.com/test/alpha" }), + }); + + // To Improve = fix failures (priority 1), To Do = new work (priority 3) + // Priority order: To Improve > To Test > To Do + const provider = mockProvider({ + "To Improve": [{ iid: 10, title: "Fix login bug", description: "", labels: ["To Improve"], web_url: "https://github.com/test/alpha/issues/10", state: "opened" }], + "To Do": [{ iid: 20, title: "Add dark mode", description: "", labels: ["To Do"], web_url: "https://github.com/test/alpha/issues/20", state: "opened" }], + }); + + // projectTick with dryRun shows what would be picked up + const result = await projectTick({ + workspaceDir, groupId: "-100", dryRun: true, provider, + }); + + // Should pick up #10 (To Improve) for dev, not #20 (To Do) + const devPickup = result.pickups.find((p) => p.role === "dev"); + assert.ok(devPickup, "Should pick up a dev task"); + assert.strictEqual(devPickup.issueId, 10, "Should pick To Improve (#10) over To Do (#20)"); + assert.strictEqual(devPickup.announcement, "[DRY RUN] Would pick up #10"); + }); + + it("picks To Test for qa role", async () => { + // Given: project with "To Test" issue, QA slot free + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha", repo: "https://github.com/test/alpha" }), + }); + + const provider = mockProvider({ + "To Test": [{ iid: 42, title: "Verify auth flow", description: "", labels: ["To Test"], web_url: "https://github.com/test/alpha/issues/42", state: "opened" }], + }); + + const result = await projectTick({ + workspaceDir, groupId: "-100", dryRun: true, provider, + }); + + const qaPickup = result.pickups.find((p) => p.role === "qa"); + assert.ok(qaPickup, "Should pick up a QA task"); + assert.strictEqual(qaPickup.issueId, 42); + assert.strictEqual(qaPickup.role, "qa"); + }); +}); + +describe("work_heartbeat: worker slot guards", () => { + afterEach(async () => { if (tmpDir) await fs.rm(tmpDir, { recursive: true }).catch(() => {}); }); + + it("skips role when worker already active", async () => { + // Given: DEV worker active on #42, To Do issues in queue + // Expected: skips DEV slot, only picks up QA if To Test available + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ + name: "Alpha", + repo: "https://github.com/test/alpha", + dev: { ...ACTIVE_DEV }, + }), + }); + + const provider = mockProvider({ + "To Do": [{ iid: 99, title: "New feature", description: "", labels: ["To Do"], web_url: "https://github.com/test/alpha/issues/99", state: "opened" }], + }); + + const result = await projectTick({ + workspaceDir, groupId: "-100", dryRun: true, provider, + }); + + // DEV already active → skipped, no To Test → QA skipped too + assert.strictEqual(result.pickups.length, 0, "No pickups: DEV busy, no QA work"); + const devSkip = result.skipped.find((s) => s.role === "dev"); + assert.ok(devSkip, "Should have a skip reason for dev"); + assert.ok(devSkip.reason.includes("Already active"), "Skip reason should mention active worker"); + }); + + it("fills both slots in parallel mode", async () => { + // Given: parallel roleExecution (default), both DEV and QA slots free + // To Do issue + To Test issue available + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ + name: "Alpha", + repo: "https://github.com/test/alpha", + roleExecution: "parallel", + }), + }); + + const provider = mockProvider({ + "To Do": [{ iid: 10, title: "Build API", description: "", labels: ["To Do"], web_url: "https://github.com/test/alpha/issues/10", state: "opened" }], + "To Test": [{ iid: 20, title: "Verify API", description: "", labels: ["To Test"], web_url: "https://github.com/test/alpha/issues/20", state: "opened" }], + }); + + const result = await projectTick({ + workspaceDir, groupId: "-100", dryRun: true, provider, + }); + + // Both slots should be filled + assert.strictEqual(result.pickups.length, 2, "Should pick up both DEV and QA"); + assert.ok(result.pickups.some((p) => p.role === "dev"), "Should have a dev pickup"); + assert.ok(result.pickups.some((p) => p.role === "qa"), "Should have a qa pickup"); + }); + + it("respects sequential roleExecution", async () => { + // Given: sequential roleExecution, DEV active on #42 + // To Test issue available for QA + // Expected: QA skipped because DEV is active (sequential = one role at a time) + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ + name: "Alpha", + repo: "https://github.com/test/alpha", + roleExecution: "sequential", + dev: { ...ACTIVE_DEV }, + }), + }); + + const provider = mockProvider({ + "To Test": [{ iid: 20, title: "Verify fix", description: "", labels: ["To Test"], web_url: "https://github.com/test/alpha/issues/20", state: "opened" }], + }); + + const result = await projectTick({ + workspaceDir, groupId: "-100", dryRun: true, provider, + }); + + // DEV active + sequential → QA blocked + assert.strictEqual(result.pickups.length, 0, "No pickups in sequential mode with active DEV"); + const qaSkip = result.skipped.find((s) => s.role === "qa"); + assert.ok(qaSkip, "Should skip QA"); + assert.ok(qaSkip.reason.includes("Sequential"), "Skip reason should mention sequential"); + }); +}); + +describe("work_heartbeat: tier assignment", () => { + afterEach(async () => { if (tmpDir) await fs.rm(tmpDir, { recursive: true }).catch(() => {}); }); + + it("uses label-based tier when present", async () => { + // Given: issue with "senior" label → tier should be "senior" + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha", repo: "https://github.com/test/alpha" }), + }); + + const provider = mockProvider({ + "To Do": [{ iid: 10, title: "Refactor auth", description: "", labels: ["To Do", "senior"], web_url: "https://github.com/test/alpha/issues/10", state: "opened" }], + }); + + const result = await projectTick({ + workspaceDir, groupId: "-100", dryRun: true, provider, + }); + + const pickup = result.pickups.find((p) => p.role === "dev"); + assert.ok(pickup); + assert.strictEqual(pickup.tier, "senior", "Should use label-based tier"); + }); + + it("overrides to qa tier for qa role regardless of label", async () => { + // Given: issue with "senior" label but picked up by QA + // Expected: tier = "qa" (QA always uses qa tier) + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha", repo: "https://github.com/test/alpha" }), + }); + + const provider = mockProvider({ + "To Test": [{ iid: 10, title: "Review auth", description: "", labels: ["To Test", "senior"], web_url: "https://github.com/test/alpha/issues/10", state: "opened" }], + }); + + const result = await projectTick({ + workspaceDir, groupId: "-100", dryRun: true, provider, + }); + + const qaPickup = result.pickups.find((p) => p.role === "qa"); + assert.ok(qaPickup); + assert.strictEqual(qaPickup.tier, "qa", "QA always uses qa tier regardless of issue label"); + }); + + it("falls back to heuristic when no tier label", async () => { + // Given: issue with no tier label → heuristic selects based on title/description + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha", repo: "https://github.com/test/alpha" }), + }); + + const provider = mockProvider({ + "To Do": [{ iid: 10, title: "Fix typo in README", description: "Simple typo fix", labels: ["To Do"], web_url: "https://github.com/test/alpha/issues/10", state: "opened" }], + }); + + const result = await projectTick({ + workspaceDir, groupId: "-100", dryRun: true, provider, + }); + + const pickup = result.pickups.find((p) => p.role === "dev"); + assert.ok(pickup); + // Heuristic should select junior for a typo fix + assert.strictEqual(pickup.tier, "junior", "Heuristic should assign junior for simple typo fix"); + }); +}); + +describe("work_heartbeat: maxPickups budget", () => { + afterEach(async () => { if (tmpDir) await fs.rm(tmpDir, { recursive: true }).catch(() => {}); }); + + it("respects maxPickups limit", async () => { + // Given: both DEV and QA slots free, issues available for both + // maxPickups = 1 + // Expected: only one pickup + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha", repo: "https://github.com/test/alpha" }), + }); + + const provider = mockProvider({ + "To Do": [{ iid: 10, title: "Feature A", description: "", labels: ["To Do"], web_url: "https://github.com/test/alpha/issues/10", state: "opened" }], + "To Test": [{ iid: 20, title: "Review B", description: "", labels: ["To Test"], web_url: "https://github.com/test/alpha/issues/20", state: "opened" }], + }); + + const result = await projectTick({ + workspaceDir, groupId: "-100", dryRun: true, maxPickups: 1, provider, + }); + + assert.strictEqual(result.pickups.length, 1, "Should respect maxPickups=1"); + }); +}); + +describe("work_heartbeat: TickAction output shape", () => { + afterEach(async () => { if (tmpDir) await fs.rm(tmpDir, { recursive: true }).catch(() => {}); }); + + it("includes all fields needed for notifications", async () => { + // The TickAction must include issueUrl for workerStart notifications + const workspaceDir = await setupWorkspace({ + "-100": makeProject({ name: "Alpha", repo: "https://github.com/test/alpha" }), + }); + + const provider = mockProvider({ + "To Do": [{ iid: 10, title: "Build feature", description: "Details here", labels: ["To Do"], web_url: "https://github.com/test/alpha/issues/10", state: "opened" }], + }); + + const result = await projectTick({ + workspaceDir, groupId: "-100", dryRun: true, provider, + }); + + const pickup = result.pickups[0]; + assert.ok(pickup, "Should have a pickup"); + + // Verify all fields needed by notifyTickPickups + assert.strictEqual(pickup.project, "Alpha"); + assert.strictEqual(pickup.groupId, "-100"); + assert.strictEqual(pickup.issueId, 10); + assert.strictEqual(pickup.issueTitle, "Build feature"); + assert.strictEqual(pickup.issueUrl, "https://github.com/test/alpha/issues/10"); + assert.ok(["dev", "qa"].includes(pickup.role)); + assert.ok(typeof pickup.tier === "string"); + assert.ok(["spawn", "send"].includes(pickup.sessionAction)); + assert.ok(pickup.announcement.includes("[DRY RUN]")); + }); +}); diff --git a/lib/tools/work-heartbeat.ts b/lib/tools/work-heartbeat.ts new file mode 100644 index 0000000..d16a445 --- /dev/null +++ b/lib/tools/work-heartbeat.ts @@ -0,0 +1,310 @@ +/** + * work_heartbeat — Heartbeat handler: health fix + dispatch. + * + * Two-pass sweep: + * 1. Health pass: zombie detection + stale worker cleanup per project + * 2. Tick pass: fill free worker slots per project by priority + * + * Execution guards: + * - projectExecution (parallel|sequential): cross-project parallelism (this file) + * - roleExecution (parallel|sequential): DEV/QA parallelism (handled by projectTick) + */ +import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; +import { jsonResult } from "openclaw/plugin-sdk"; +import type { ToolContext } from "../types.js"; +import type { Project } from "../projects.js"; +import { readProjects } from "../projects.js"; +import { log as auditLog } from "../audit.js"; +import { notify, notifyTickPickups, getNotificationConfig } from "../notify.js"; +import { checkWorkerHealth, type HealthFix } from "../services/health.js"; +import { projectTick, type TickAction } from "../services/tick.js"; +import { + requireWorkspaceDir, + resolveContext, + resolveProvider, + getPluginConfig, +} from "../tool-helpers.js"; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +type ProjectEntry = readonly [groupId: string, project: Project]; + +type GlobalState = { + activeProjects: number; + activeDev: number; + activeQa: number; +}; + +// --------------------------------------------------------------------------- +// Tool +// --------------------------------------------------------------------------- + +export function createWorkHeartbeatTool(api: OpenClawPluginApi) { + return (ctx: ToolContext) => ({ + name: "work_heartbeat", + label: "Work Heartbeat", + description: `Heartbeat handler: health fix + dispatch. With projectGroupId: targets one project. Without: sweeps all. Runs health checks, then fills free worker slots by priority.`, + parameters: { + type: "object", + properties: { + projectGroupId: { + type: "string", + description: "Target a single project. Omit to sweep all.", + }, + dryRun: { + type: "boolean", + description: "Report only, don't dispatch. Default: false.", + }, + maxPickups: { type: "number", description: "Max pickups per tick." }, + activeSessions: { + type: "array", + items: { type: "string" }, + description: "Active session IDs for zombie detection.", + }, + }, + }, + + async execute(_id: string, params: Record) { + const targetGroupId = params.projectGroupId as string | undefined; + const dryRun = (params.dryRun as boolean) ?? false; + const maxPickups = params.maxPickups as number | undefined; + const activeSessions = (params.activeSessions as string[]) ?? []; + const workspaceDir = requireWorkspaceDir(ctx); + const pluginConfig = getPluginConfig(api); + const projectExecution = + (pluginConfig?.projectExecution as string) ?? "parallel"; + + // Resolve target projects + const entries = await resolveTargetProjects(workspaceDir, targetGroupId); + if (!entries.length) { + return jsonResult({ + success: true, + dryRun, + healthFixes: [], + pickups: [], + skipped: [{ project: "(none)", reason: "No projects" }], + }); + } + + // Pass 1: health checks (zombie detection, stale worker cleanup) + const healthFixes = await runHealthPass(entries, { + workspaceDir, + activeSessions, + dryRun, + }); + + // Snapshot global state after health fixes + const globalState = await snapshotGlobalState(workspaceDir, entries); + + // Pass 2: fill free worker slots per project + const notifyConfig = getNotificationConfig(pluginConfig); + const { pickups, skipped } = await runTickPass(entries, { + workspaceDir, + pluginConfig, + dryRun, + maxPickups, + notifyConfig, + agentId: ctx.agentId, + sessionKey: ctx.sessionKey, + projectExecution, + initialActiveProjects: globalState.activeProjects, + }); + + // Update global state with new pickups + for (const p of pickups) { + if (p.role === "dev") globalState.activeDev++; + else globalState.activeQa++; + } + globalState.activeProjects += pickups.filter( + (p, i, arr) => arr.findIndex((x) => x.groupId === p.groupId) === i, + ).length; + + // Audit + await auditLog(workspaceDir, "work_heartbeat", { + dryRun, + projectExecution, + projectsScanned: entries.length, + healthFixes: healthFixes.length, + pickups: pickups.length, + skipped: skipped.length, + }); + + // Heartbeat summary notification + const context = await resolveContext(ctx, api); + await notify( + { + type: "heartbeat", + projectsScanned: entries.length, + dryRun, + healthFixes: healthFixes.length, + pickups: pickups.length, + skipped: skipped.length, + pickupDetails: pickups.map((p) => ({ + project: p.project, + issueId: p.issueId, + role: p.role, + })), + }, + { + workspaceDir, + config: notifyConfig, + orchestratorDm: + context.type === "direct" ? context.chatId : undefined, + channel: "channel" in context ? context.channel : undefined, + }, + ); + + return jsonResult({ + success: true, + dryRun, + projectExecution, + healthFixes, + pickups, + skipped, + globalState, + }); + }, + }); +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +async function resolveTargetProjects( + workspaceDir: string, + targetGroupId?: string, +): Promise { + const data = await readProjects(workspaceDir); + if (targetGroupId) { + const project = data.projects[targetGroupId]; + return project ? [[targetGroupId, project]] : []; + } + return Object.entries(data.projects) as ProjectEntry[]; +} + +async function runHealthPass( + entries: ProjectEntry[], + opts: { workspaceDir: string; activeSessions: string[]; dryRun: boolean }, +): Promise> { + const fixes: Array = []; + for (const [groupId, project] of entries) { + const { provider } = resolveProvider(project); + for (const role of ["dev", "qa"] as const) { + const roleFixes = await checkWorkerHealth({ + workspaceDir: opts.workspaceDir, + groupId, + project, + role, + activeSessions: opts.activeSessions, + autoFix: !opts.dryRun, + provider, + }); + fixes.push( + ...roleFixes.map((f) => ({ ...f, project: project.name, role })), + ); + } + } + return fixes; +} + +async function snapshotGlobalState( + workspaceDir: string, + entries: ProjectEntry[], +): Promise { + const data = await readProjects(workspaceDir); + let activeDev = 0, + activeQa = 0, + activeProjects = 0; + for (const [groupId] of entries) { + const p = data.projects[groupId]; + if (!p) continue; + if (p.dev.active) activeDev++; + if (p.qa.active) activeQa++; + if (p.dev.active || p.qa.active) activeProjects++; + } + return { activeDev, activeQa, activeProjects }; +} + +async function runTickPass( + entries: ProjectEntry[], + opts: { + workspaceDir: string; + pluginConfig?: Record; + dryRun: boolean; + maxPickups?: number; + notifyConfig: ReturnType; + agentId?: string; + sessionKey?: string; + projectExecution: string; + initialActiveProjects: number; + }, +): Promise<{ + pickups: Array; + skipped: Array<{ project: string; role?: string; reason: string }>; +}> { + const pickups: Array = []; + const skipped: Array<{ project: string; role?: string; reason: string }> = []; + let pickupCount = 0; + let activeProjects = opts.initialActiveProjects; + + for (const [groupId] of entries) { + const current = (await readProjects(opts.workspaceDir)).projects[groupId]; + if (!current) continue; + + // Budget check + if (opts.maxPickups !== undefined && pickupCount >= opts.maxPickups) { + skipped.push({ project: current.name, reason: "Max pickups reached" }); + continue; + } + + // Sequential project guard: only one project active at a time + const projectActive = current.dev.active || current.qa.active; + if ( + opts.projectExecution === "sequential" && + !projectActive && + activeProjects >= 1 + ) { + skipped.push({ + project: current.name, + reason: "Sequential: another project active", + }); + continue; + } + + // projectTick handles roleExecution (parallel|sequential) internally + const remaining = + opts.maxPickups !== undefined ? opts.maxPickups - pickupCount : undefined; + const result = await projectTick({ + workspaceDir: opts.workspaceDir, + groupId, + agentId: opts.agentId, + pluginConfig: opts.pluginConfig, + sessionKey: opts.sessionKey, + dryRun: opts.dryRun, + maxPickups: remaining, + }); + + pickups.push( + ...result.pickups.map((p) => ({ ...p, project: current.name })), + ); + skipped.push( + ...result.skipped.map((s) => ({ project: current.name, ...s })), + ); + pickupCount += result.pickups.length; + + // Notify workerStart for each pickup in this project + if (!opts.dryRun && result.pickups.length > 0) { + await notifyTickPickups(result.pickups, { + workspaceDir: opts.workspaceDir, + config: opts.notifyConfig, + channel: current.channel ?? "telegram", + }); + if (!projectActive) activeProjects++; + } + } + + return { pickups, skipped }; +}