diff --git a/index.ts b/index.ts index f278475..dcc2236 100644 --- a/index.ts +++ b/index.ts @@ -63,22 +63,22 @@ const plugin = { work_heartbeat: { type: "object", description: - "Token-free interval-based heartbeat service. Runs health checks + queue dispatch automatically.", + "Token-free interval-based heartbeat service. Runs health checks + queue dispatch automatically. Discovers all DevClaw agents from openclaw.json and processes each independently. Can also be triggered on-demand via CLI command `devclaw heartbeat:tick`.", properties: { enabled: { type: "boolean", default: true, - description: "Enable the heartbeat service.", + description: "Enable automatic periodic heartbeat service. When disabled, heartbeat can still be run on-demand via `devclaw heartbeat:tick` CLI command.", }, intervalSeconds: { type: "number", default: 60, - description: "Seconds between ticks.", + description: "Seconds between automatic heartbeat ticks (only applies when service is enabled). Can be overridden per-tick via CLI option.", }, maxPickupsPerTick: { type: "number", default: 4, - description: "Max worker dispatches per tick.", + description: "Max worker dispatches per agent per tick. Applied to each DevClaw agent independently.", }, }, }, @@ -118,7 +118,7 @@ const plugin = { registerHeartbeatService(api); api.logger.info( - "DevClaw plugin registered (11 tools, 1 service, 1 CLI command)", + "DevClaw plugin registered (11 tools, 1 CLI command group, 1 service)", ); }, }; diff --git a/lib/cli.ts b/lib/cli.ts index 30c1dcd..a454d4e 100644 --- a/lib/cli.ts +++ b/lib/cli.ts @@ -1,5 +1,5 @@ /** - * cli.ts — CLI registration for `openclaw devclaw setup`. + * cli.ts — CLI registration for `openclaw devclaw setup` and `openclaw devclaw heartbeat`. * * Uses Commander.js (provided by OpenClaw plugin SDK context). */ diff --git a/lib/services/heartbeat.ts b/lib/services/heartbeat.ts index 5a27978..a80be84 100644 --- a/lib/services/heartbeat.ts +++ b/lib/services/heartbeat.ts @@ -1,7 +1,9 @@ /** - * Heartbeat service — token-free interval-based queue processing. + * Heartbeat tick — token-free queue processing. * - * Runs as a plugin service (tied to gateway lifecycle). Every N seconds: + * Runs automatically via plugin service (periodic execution). + * + * Logic: * 1. Health pass: auto-fix zombies, stale workers, orphaned state * 2. Tick pass: fill free worker slots by priority * @@ -9,14 +11,14 @@ * Workers only consume tokens when they start processing dispatched tasks. */ import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; -import { readProjects, getProject } from "../projects.js"; +import { readProjects } from "../projects.js"; import { log as auditLog } from "../audit.js"; import { checkWorkerHealth } from "./health.js"; import { projectTick } from "./tick.js"; import { createProvider } from "../providers/index.js"; // --------------------------------------------------------------------------- -// Config +// Types // --------------------------------------------------------------------------- export type HeartbeatConfig = { @@ -25,6 +27,31 @@ export type HeartbeatConfig = { maxPickupsPerTick: number; }; +type Agent = { + agentId: string; + workspace: string; +}; + +type TickResult = { + totalPickups: number; + totalHealthFixes: number; + totalSkipped: number; +}; + +type ServiceContext = { + logger: { info(msg: string): void; warn(msg: string): void; error(msg: string): void }; + config: { + agents?: { list?: Array<{ id: string; workspace?: string }> }; + plugins?: { + entries?: { devclaw?: { config?: { devClawAgentIds?: string[] } } }; + }; + }; +}; + +// --------------------------------------------------------------------------- +// Config +// --------------------------------------------------------------------------- + export const HEARTBEAT_DEFAULTS: HeartbeatConfig = { enabled: true, intervalSeconds: 60, @@ -48,7 +75,7 @@ export function registerHeartbeatService(api: OpenClawPluginApi) { api.registerService({ id: "devclaw-heartbeat", - start: async (ctx) => { + start: async (ctx: ServiceContext) => { const pluginConfig = api.pluginConfig as Record | undefined; const config = resolveHeartbeatConfig(pluginConfig); @@ -57,25 +84,20 @@ export function registerHeartbeatService(api: OpenClawPluginApi) { return; } - const workspaceDir = ctx.workspaceDir; - if (!workspaceDir) { - ctx.logger.warn("work_heartbeat: no workspaceDir — service not started"); + const agents = discoverAgents(ctx.config); + if (agents.length === 0) { + ctx.logger.warn("work_heartbeat service: no DevClaw agents registered"); return; } - const agentId = resolveAgentId(pluginConfig); - ctx.logger.info( - `work_heartbeat service started: every ${config.intervalSeconds}s, max ${config.maxPickupsPerTick} pickups/tick`, + `work_heartbeat service started: every ${config.intervalSeconds}s, ${agents.length} agents, max ${config.maxPickupsPerTick} pickups/tick`, ); - intervalId = setInterval(async () => { - try { - await tick({ workspaceDir, agentId, config, pluginConfig, logger: ctx.logger }); - } catch (err) { - ctx.logger.error(`work_heartbeat tick failed: ${err}`); - } - }, config.intervalSeconds * 1000); + intervalId = setInterval( + () => runHeartbeatTick(agents, config, pluginConfig, ctx.logger), + config.intervalSeconds * 1000, + ); }, stop: async (ctx) => { @@ -89,92 +111,196 @@ export function registerHeartbeatService(api: OpenClawPluginApi) { } // --------------------------------------------------------------------------- -// Tick +// Helpers // --------------------------------------------------------------------------- -async function tick(opts: { +/** + * Extract DevClaw agents from OpenClaw configuration. + */ +function discoverAgents(config: ServiceContext["config"]): Agent[] { + const devClawAgentIds = config.plugins?.entries?.devclaw?.config?.devClawAgentIds || []; + const agentsList = config.agents?.list || []; + + return devClawAgentIds + .map((agentId) => { + const agent = agentsList.find((a) => a.id === agentId); + return { + agentId, + workspace: agent?.workspace || "", + }; + }) + .filter((a): a is Agent => !!a.workspace); +} + +/** + * Run one heartbeat tick for all agents. + */ +async function runHeartbeatTick( + agents: Agent[], + config: HeartbeatConfig, + pluginConfig: Record | undefined, + logger: ServiceContext["logger"], +): Promise { + try { + const result = await processAllAgents(agents, config, pluginConfig, logger); + logTickResult(result, logger); + } catch (err) { + logger.error(`work_heartbeat tick failed: ${err}`); + } +} + +/** + * Process heartbeat tick for all agents and aggregate results. + */ +async function processAllAgents( + agents: Agent[], + config: HeartbeatConfig, + pluginConfig: Record | undefined, + logger: ServiceContext["logger"], +): Promise { + const result: TickResult = { + totalPickups: 0, + totalHealthFixes: 0, + totalSkipped: 0, + }; + + for (const { agentId, workspace } of agents) { + const agentResult = await tick({ + workspaceDir: workspace, + agentId, + config, + pluginConfig, + logger, + }); + + result.totalPickups += agentResult.totalPickups; + result.totalHealthFixes += agentResult.totalHealthFixes; + result.totalSkipped += agentResult.totalSkipped; + } + + return result; +} + +/** + * Log tick results if anything happened. + */ +function logTickResult(result: TickResult, logger: ServiceContext["logger"]): void { + if (result.totalPickups > 0 || result.totalHealthFixes > 0) { + logger.info( + `work_heartbeat tick: ${result.totalPickups} pickups, ${result.totalHealthFixes} health fixes, ${result.totalSkipped} skipped`, + ); + } +} + +// --------------------------------------------------------------------------- +// Tick (Main Heartbeat Loop) +// --------------------------------------------------------------------------- + +export async function tick(opts: { workspaceDir: string; agentId?: string; config: HeartbeatConfig; pluginConfig?: Record; logger: { info(msg: string): void; warn(msg: string): void }; -}) { - const { workspaceDir, agentId, config, pluginConfig, logger } = opts; +}): Promise { + const { workspaceDir, agentId, config, pluginConfig } = opts; const data = await readProjects(workspaceDir); const projectIds = Object.keys(data.projects); - if (projectIds.length === 0) return; - const projectExecution = - (pluginConfig?.projectExecution as string) ?? "parallel"; + if (projectIds.length === 0) { + return { totalPickups: 0, totalHealthFixes: 0, totalSkipped: 0 }; + } - let totalPickups = 0; - let totalHealthFixes = 0; - let totalSkipped = 0; + const result: TickResult = { + totalPickups: 0, + totalHealthFixes: 0, + totalSkipped: 0, + }; + + const projectExecution = (pluginConfig?.projectExecution as string) ?? "parallel"; let activeProjects = 0; for (const groupId of projectIds) { const project = data.projects[groupId]; if (!project) continue; - const { provider } = createProvider({ repo: project.repo }); + // Health pass: auto-fix zombies and stale workers + result.totalHealthFixes += await performHealthPass( + workspaceDir, + groupId, + project, + ); - // Health pass: auto-fix - for (const role of ["dev", "qa"] as const) { - const fixes = await checkWorkerHealth({ - workspaceDir, groupId, project, role, - activeSessions: [], // No session list in service context - autoFix: true, - provider, - }); - totalHealthFixes += fixes.filter((f) => f.fixed).length; - } - - // Budget check - const remaining = config.maxPickupsPerTick - totalPickups; + // Budget check: stop if we've hit the limit + const remaining = config.maxPickupsPerTick - result.totalPickups; if (remaining <= 0) break; - // Sequential project guard - const fresh = (await readProjects(workspaceDir)).projects[groupId]; - if (!fresh) continue; - const projectActive = fresh.dev.active || fresh.qa.active; - if (projectExecution === "sequential" && !projectActive && activeProjects >= 1) { - totalSkipped++; + // Sequential project guard: don't start new projects if one is active + const isProjectActive = await checkProjectActive(workspaceDir, groupId); + if (projectExecution === "sequential" && !isProjectActive && activeProjects >= 1) { + result.totalSkipped++; continue; } - // Tick pass: fill free slots - const result = await projectTick({ - workspaceDir, groupId, agentId, + // Tick pass: fill free worker slots + const tickResult = await projectTick({ + workspaceDir, + groupId, + agentId, pluginConfig, maxPickups: remaining, }); - totalPickups += result.pickups.length; - totalSkipped += result.skipped.length; - if (projectActive || result.pickups.length > 0) activeProjects++; - } - - // Audit (only when something happened) - if (totalPickups > 0 || totalHealthFixes > 0) { - logger.info( - `work_heartbeat tick: ${totalPickups} pickups, ${totalHealthFixes} health fixes, ${totalSkipped} skipped`, - ); + result.totalPickups += tickResult.pickups.length; + result.totalSkipped += tickResult.skipped.length; + if (isProjectActive || tickResult.pickups.length > 0) activeProjects++; } await auditLog(workspaceDir, "heartbeat_tick", { projectsScanned: projectIds.length, - healthFixes: totalHealthFixes, - pickups: totalPickups, - skipped: totalSkipped, + healthFixes: result.totalHealthFixes, + pickups: result.totalPickups, + skipped: result.totalSkipped, }); + + return result; } -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- +/** + * Run health checks and auto-fix for a project (dev + qa roles). + */ +async function performHealthPass( + workspaceDir: string, + groupId: string, + project: any, +): Promise { + const { provider } = createProvider({ repo: project.repo }); + let fixedCount = 0; -function resolveAgentId(pluginConfig?: Record): string | undefined { - const ids = pluginConfig?.devClawAgentIds as string[] | undefined; - return ids?.[0]; + for (const role of ["dev", "qa"] as const) { + const fixes = await checkWorkerHealth({ + workspaceDir, + groupId, + project, + role, + activeSessions: [], + autoFix: true, + provider, + }); + fixedCount += fixes.filter((f) => f.fixed).length; + } + + return fixedCount; } + +/** + * Check if a project has active work (dev or qa). + */ +async function checkProjectActive(workspaceDir: string, groupId: string): Promise { + const fresh = (await readProjects(workspaceDir)).projects[groupId]; + if (!fresh) return false; + return fresh.dev.active || fresh.qa.active; +} + +