From 9afa318697a0ac84084308cce7ad874a3ae98d8e Mon Sep 17 00:00:00 2001 From: Lauren ten Hoor <32955832+laurentenhoor@users.noreply.github.com> Date: Mon, 9 Feb 2026 23:49:13 +0800 Subject: [PATCH] feat: add heartbeat_tick tool for automated task pickup (#13) (#14) Adds heartbeat_tick tool that automates task pickup across all projects: - Runs session health checks (zombie cleanup) before pickups - Loops over all registered projects - Picks up tasks by priority (To Improve > To Test > To Do) - Supports two work modes: - parallel: each project can have DEV+QA running independently - sequential: only 1 DEV + 1 QA globally (can be different projects) - Respects per-project maxDevWorkers/maxQaWorkers settings - Supports dryRun mode and maxPickups limit - Context guard: only works from DM/cron, blocks project groups Also adds: - workMode config option (parallel | sequential) - maxDevWorkers/maxQaWorkers fields to Project type --- index.ts | 12 +- lib/projects.ts | 2 + lib/tools/heartbeat-tick.ts | 502 ++++++++++++++++++++++++++++++++++++ 3 files changed, 515 insertions(+), 1 deletion(-) create mode 100644 lib/tools/heartbeat-tick.ts diff --git a/index.ts b/index.ts index b137dde..6bdeebe 100644 --- a/index.ts +++ b/index.ts @@ -9,6 +9,7 @@ import { createSetupTool } from "./lib/tools/devclaw-setup.js"; import { createOnboardTool } from "./lib/tools/devclaw-onboard.js"; import { createAnalyzeChannelBindingsTool } from "./lib/tools/analyze-channel-bindings.js"; import { createContextTestTool } from "./lib/tools/context-test.js"; +import { createHeartbeatTickTool } from "./lib/tools/heartbeat-tick.js"; import { registerCli } from "./lib/cli.js"; const plugin = { @@ -29,6 +30,12 @@ const plugin = { qa: { type: "string", description: "QA engineer model" }, }, }, + workMode: { + type: "string", + enum: ["parallel", "sequential"], + description: "Work mode: parallel (each project independent) or sequential (1 DEV + 1 QA globally)", + default: "parallel", + }, }, }, @@ -64,6 +71,9 @@ const plugin = { api.registerTool(createContextTestTool(api), { names: ["context_test"], }); + api.registerTool(createHeartbeatTickTool(api), { + names: ["heartbeat_tick"], + }); // CLI: `openclaw devclaw setup` api.registerCli(({ program }: { program: any }) => registerCli(program), { @@ -71,7 +81,7 @@ const plugin = { }); api.logger.info( - "DevClaw plugin registered (10 tools, 1 CLI command)", + "DevClaw plugin registered (11 tools, 1 CLI command)", ); }, }; diff --git a/lib/projects.ts b/lib/projects.ts index bf4135b..f85d50c 100644 --- a/lib/projects.ts +++ b/lib/projects.ts @@ -23,6 +23,8 @@ export type Project = { baseBranch: string; deployBranch: string; autoChain: boolean; + maxDevWorkers?: number; + maxQaWorkers?: number; dev: WorkerState; qa: WorkerState; }; diff --git a/lib/tools/heartbeat-tick.ts b/lib/tools/heartbeat-tick.ts new file mode 100644 index 0000000..acb4ab6 --- /dev/null +++ b/lib/tools/heartbeat-tick.ts @@ -0,0 +1,502 @@ +/** + * heartbeat_tick — Automated task pickup across all projects. + * + * Runs on heartbeat/cron context: + * 1. Clean zombie sessions (session_health logic) + * 2. Loop over all projects + * 3. Check worker slots per project + * 4. Pick up tasks by priority (To Improve > To Test > To Do) + * 5. Respect work mode (parallel vs sequential) + * 6. Return summary of actions taken + * + * Context guard: Only allows from DM/cron context, blocks project groups. + */ +import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; +import { jsonResult } from "openclaw/plugin-sdk"; +import { dispatchTask } from "../dispatch.js"; +import { type Issue, type StateLabel } from "../task-managers/task-manager.js"; +import { createProvider } from "../task-managers/index.js"; +import { selectModel } from "../model-selector.js"; +import { + getProject, + getWorker, + getSessionForModel, + readProjects, + updateWorker, + type Project, +} from "../projects.js"; +import type { ToolContext } from "../types.js"; +import { detectContext, generateGuardrails } from "../context-guard.js"; +import { type Tier } from "../tiers.js"; +import { log as auditLog } from "../audit.js"; + +/** Labels that map to DEV role */ +const DEV_LABELS: StateLabel[] = ["To Do", "To Improve"]; + +/** Labels that map to QA role */ +const QA_LABELS: StateLabel[] = ["To Test"]; + +/** All pickable labels, in priority order (highest first) */ +const PRIORITY_ORDER: StateLabel[] = ["To Improve", "To Test", "To Do"]; + +/** Tier labels that can appear on issues */ +const TIER_LABELS: Tier[] = ["junior", "medior", "senior", "qa"]; + +type WorkMode = "parallel" | "sequential"; + +type PickupAction = { + project: string; + groupId: string; + issueId: number; + issueTitle: string; + role: "dev" | "qa"; + model: string; + sessionAction: "spawn" | "send"; + announcement: string; +}; + +type HealthFix = { + project: string; + role: "dev" | "qa"; + type: string; + fixed: boolean; +}; + +type TickResult = { + success: boolean; + dryRun: boolean; + workMode: WorkMode; + healthFixes: HealthFix[]; + pickups: PickupAction[]; + skipped: Array<{ project: string; reason: string }>; + globalState?: { activeDev: number; activeQa: number }; +}; + +/** + * Detect role from issue's current state label. + */ +function detectRoleFromLabel(label: StateLabel): "dev" | "qa" | null { + if (DEV_LABELS.includes(label)) return "dev"; + if (QA_LABELS.includes(label)) return "qa"; + return null; +} + +/** + * Detect tier from issue labels (e.g., "junior", "senior"). + */ +function detectTierFromLabels(labels: string[]): Tier | null { + const lowerLabels = labels.map((l) => l.toLowerCase()); + for (const tier of TIER_LABELS) { + if (lowerLabels.includes(tier)) { + return tier; + } + } + return null; +} + +/** + * Find the next issue to pick up by priority for a specific role. + */ +async function findNextIssueForRole( + provider: { listIssuesByLabel(label: StateLabel): Promise }, + role: "dev" | "qa", +): Promise<{ issue: Issue; label: StateLabel } | null> { + const labelsToCheck = + role === "dev" + ? PRIORITY_ORDER.filter((l) => DEV_LABELS.includes(l)) + : PRIORITY_ORDER.filter((l) => QA_LABELS.includes(l)); + + for (const label of labelsToCheck) { + try { + const issues = await provider.listIssuesByLabel(label); + if (issues.length > 0) { + // Return oldest issue first (FIFO) + const oldest = issues[issues.length - 1]; + return { issue: oldest, label }; + } + } catch { + // Continue to next label on error + } + } + return null; +} + +/** + * Run health check logic for a single project/role. + * Returns fixes applied (simplified version of session_health). + */ +async function checkAndFixWorkerHealth( + workspaceDir: string, + groupId: string, + project: Project, + role: "dev" | "qa", + activeSessions: string[], + autoFix: boolean, + provider: { transitionLabel(id: number, from: StateLabel, to: StateLabel): Promise }, +): Promise { + const fixes: HealthFix[] = []; + const worker = project[role]; + const currentSessionKey = worker.model + ? getSessionForModel(worker, worker.model) + : null; + + // Check 1: Active but no session key for current model + if (worker.active && !currentSessionKey) { + if (autoFix) { + await updateWorker(workspaceDir, groupId, role, { + active: false, + issueId: null, + }); + } + fixes.push({ + project: project.name, + role, + type: "active_no_session", + fixed: autoFix, + }); + } + + // Check 2: Active with session but session is dead (zombie) + if ( + worker.active && + currentSessionKey && + activeSessions.length > 0 && + !activeSessions.includes(currentSessionKey) + ) { + if (autoFix) { + // Revert issue label + const revertLabel: StateLabel = role === "dev" ? "To Do" : "To Test"; + const currentLabel: StateLabel = role === "dev" ? "Doing" : "Testing"; + try { + if (worker.issueId) { + const primaryIssueId = Number(worker.issueId.split(",")[0]); + await provider.transitionLabel(primaryIssueId, currentLabel, revertLabel); + } + } catch { + // Best-effort label revert + } + + // Clear the dead session + const updatedSessions = { ...worker.sessions }; + if (worker.model) { + updatedSessions[worker.model] = null; + } + + await updateWorker(workspaceDir, groupId, role, { + active: false, + issueId: null, + sessions: updatedSessions, + }); + } + fixes.push({ + project: project.name, + role, + type: "zombie_session", + fixed: autoFix, + }); + } + + // Check 3: Inactive but still has issueId + if (!worker.active && worker.issueId) { + if (autoFix) { + await updateWorker(workspaceDir, groupId, role, { + issueId: null, + }); + } + fixes.push({ + project: project.name, + role, + type: "inactive_with_issue", + fixed: autoFix, + }); + } + + return fixes; +} + +/** + * Get max workers for a role from project config (with defaults). + */ +function getMaxWorkers(project: Project, role: "dev" | "qa"): number { + const key = role === "dev" ? "maxDevWorkers" : "maxQaWorkers"; + const value = (project as Record)[key]; + return typeof value === "number" ? value : 1; +} + +export function createHeartbeatTickTool(api: OpenClawPluginApi) { + return (ctx: ToolContext) => ({ + name: "heartbeat_tick", + label: "Heartbeat Tick", + description: `Automated task pickup across all projects. Runs session health checks, then picks up tasks by priority (To Improve > To Test > To Do). Respects work mode (parallel: each project independent, sequential: 1 DEV + 1 QA globally). Only works from DM/cron context, not project groups.`, + parameters: { + type: "object", + properties: { + dryRun: { + type: "boolean", + description: "Report what would happen without actually picking up tasks. Default: false.", + }, + maxPickups: { + type: "number", + description: "Maximum number of task pickups per tick. Default: unlimited.", + }, + activeSessions: { + type: "array", + items: { type: "string" }, + description: "List of currently alive session IDs from sessions_list. Used for zombie detection.", + }, + }, + }, + + async execute(_id: string, params: Record) { + const dryRun = (params.dryRun as boolean) ?? false; + const maxPickups = params.maxPickups as number | undefined; + const activeSessions = (params.activeSessions as string[]) ?? []; + const workspaceDir = ctx.workspaceDir; + + if (!workspaceDir) { + throw new Error("No workspace directory available in tool context"); + } + + // --- Context detection --- + const devClawAgentIds = + ((api.pluginConfig as Record)?.devClawAgentIds as + | string[] + | undefined) ?? []; + const context = await detectContext(ctx, devClawAgentIds); + + // Only allow from DM or direct context (not project groups) + if (context.type === "group") { + return jsonResult({ + success: false, + error: "heartbeat_tick cannot be used in project group chats.", + recommendation: "Use this tool from a DM or cron context to manage all projects.", + contextGuidance: generateGuardrails(context), + }); + } + + // Get work mode from plugin config + const pluginConfig = api.pluginConfig as Record | undefined; + const workMode: WorkMode = + (pluginConfig?.workMode as WorkMode) ?? "parallel"; + + const result: TickResult = { + success: true, + dryRun, + workMode, + healthFixes: [], + pickups: [], + skipped: [], + }; + + // Read all projects + const data = await readProjects(workspaceDir); + const projectEntries = Object.entries(data.projects); + + if (projectEntries.length === 0) { + return jsonResult({ + ...result, + skipped: [{ project: "(none)", reason: "No projects registered" }], + }); + } + + // Track global worker counts for sequential mode + let globalActiveDev = 0; + let globalActiveQa = 0; + let pickupCount = 0; + + // First pass: count active workers and run health checks + for (const [groupId, project] of projectEntries) { + const { provider } = createProvider({ repo: project.repo }); + + // Health check for both roles + for (const role of ["dev", "qa"] as const) { + const fixes = await checkAndFixWorkerHealth( + workspaceDir, + groupId, + project, + role, + activeSessions, + !dryRun, // autoFix when not dryRun + provider, + ); + result.healthFixes.push(...fixes); + } + + // Re-read project after health fixes + const refreshedData = await readProjects(workspaceDir); + const refreshedProject = refreshedData.projects[groupId]; + if (refreshedProject) { + if (refreshedProject.dev.active) globalActiveDev++; + if (refreshedProject.qa.active) globalActiveQa++; + } + } + + // Second pass: pick up tasks + for (const [groupId, _project] of projectEntries) { + // Re-read to get post-health-fix state + const currentData = await readProjects(workspaceDir); + const project = currentData.projects[groupId]; + if (!project) continue; + + const { provider } = createProvider({ repo: project.repo }); + + // Check each role + for (const role of ["dev", "qa"] as const) { + // Check max pickups limit + if (maxPickups !== undefined && pickupCount >= maxPickups) { + result.skipped.push({ + project: project.name, + reason: `Max pickups (${maxPickups}) reached`, + }); + continue; + } + + // Check if worker slot is available + const worker = getWorker(project, role); + if (worker.active) { + result.skipped.push({ + project: project.name, + reason: `${role.toUpperCase()} already active (issue #${worker.issueId})`, + }); + continue; + } + + // Check max workers per project + const maxWorkers = getMaxWorkers(project, role); + // For now we only support 1 worker per role, but structure supports more + if (maxWorkers < 1) { + result.skipped.push({ + project: project.name, + reason: `${role.toUpperCase()} disabled (maxWorkers=0)`, + }); + continue; + } + + // Sequential mode: check global limits + if (workMode === "sequential") { + if (role === "dev" && globalActiveDev >= 1) { + result.skipped.push({ + project: project.name, + reason: "Sequential mode: DEV slot occupied globally", + }); + continue; + } + if (role === "qa" && globalActiveQa >= 1) { + result.skipped.push({ + project: project.name, + reason: "Sequential mode: QA slot occupied globally", + }); + continue; + } + } + + // Find next issue for this role + const next = await findNextIssueForRole(provider, role); + if (!next) { + // No tasks available - not a skip, just nothing to do + continue; + } + + const { issue, label: currentLabel } = next; + const targetLabel: StateLabel = role === "dev" ? "Doing" : "Testing"; + + // Select model + let modelAlias: string; + const tierFromLabels = detectTierFromLabels(issue.labels); + + if (tierFromLabels) { + // Validate tier matches role + if (role === "qa" && tierFromLabels !== "qa") { + modelAlias = "qa"; + } else if (role === "dev" && tierFromLabels === "qa") { + const selected = selectModel(issue.title, issue.description ?? "", role); + modelAlias = selected.tier; + } else { + modelAlias = tierFromLabels; + } + } else { + const selected = selectModel(issue.title, issue.description ?? "", role); + modelAlias = selected.tier; + } + + if (dryRun) { + // In dry run, just report what would happen + result.pickups.push({ + project: project.name, + groupId, + issueId: issue.iid, + issueTitle: issue.title, + role, + model: modelAlias, + sessionAction: getSessionForModel(worker, modelAlias) ? "send" : "spawn", + announcement: `[DRY RUN] Would pick up #${issue.iid}: ${issue.title}`, + }); + pickupCount++; + if (role === "dev") globalActiveDev++; + if (role === "qa") globalActiveQa++; + } else { + // Actually dispatch + try { + const dispatchResult = await dispatchTask({ + workspaceDir, + agentId: ctx.agentId, + groupId, + project, + issueId: issue.iid, + issueTitle: issue.title, + issueDescription: issue.description ?? "", + issueUrl: issue.web_url, + role, + modelAlias, + fromLabel: currentLabel, + toLabel: targetLabel, + transitionLabel: (id, from, to) => + provider.transitionLabel(id, from as StateLabel, to as StateLabel), + pluginConfig, + }); + + result.pickups.push({ + project: project.name, + groupId, + issueId: issue.iid, + issueTitle: issue.title, + role, + model: dispatchResult.modelAlias, + sessionAction: dispatchResult.sessionAction, + announcement: dispatchResult.announcement, + }); + pickupCount++; + if (role === "dev") globalActiveDev++; + if (role === "qa") globalActiveQa++; + } catch (err) { + result.skipped.push({ + project: project.name, + reason: `Dispatch failed for #${issue.iid}: ${(err as Error).message}`, + }); + } + } + } + } + + // Add global state for sequential mode visibility + if (workMode === "sequential") { + result.globalState = { + activeDev: globalActiveDev, + activeQa: globalActiveQa, + }; + } + + // Audit log + await auditLog(workspaceDir, "heartbeat_tick", { + dryRun, + workMode, + projectsScanned: projectEntries.length, + healthFixes: result.healthFixes.length, + pickups: result.pickups.length, + skipped: result.skipped.length, + }); + + return jsonResult(result); + }, + }); +}