feat: implement work heartbeat service for health checks and task dispatching

- Introduced a new heartbeat service that runs at defined intervals to perform health checks on workers and fill available task slots based on priority.
- Added a health tool to scan worker health across projects with optional auto-fix capabilities.
- Updated the status tool to provide a lightweight overview of worker states and queue counts without health checks.
- Enhanced task creation tool descriptions to clarify task state handling.
- Implemented tests for the work heartbeat logic, ensuring proper project resolution, worker state management, and task prioritization.
This commit is contained in:
Lauren ten Hoor
2026-02-11 01:04:30 +08:00
parent 71a3ea2352
commit f2e71a35d8
13 changed files with 1044 additions and 426 deletions

180
lib/services/heartbeat.ts Normal file
View File

@@ -0,0 +1,180 @@
/**
* Heartbeat service — token-free interval-based queue processing.
*
* Runs as a plugin service (tied to gateway lifecycle). Every N seconds:
* 1. Health pass: auto-fix zombies, stale workers, orphaned state
* 2. Tick pass: fill free worker slots by priority
*
* Zero LLM tokens — all logic is deterministic code + CLI calls.
* Workers only consume tokens when they start processing dispatched tasks.
*/
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
import { readProjects, getProject } from "../projects.js";
import { log as auditLog } from "../audit.js";
import { checkWorkerHealth } from "./health.js";
import { projectTick } from "./tick.js";
import { createProvider } from "../providers/index.js";
// ---------------------------------------------------------------------------
// Config
// ---------------------------------------------------------------------------
/** Runtime settings for the heartbeat service, read from `work_heartbeat` in the plugin config. */
export type HeartbeatConfig = {
  /** When false, the service logs that it is disabled and schedules nothing. */
  enabled: boolean;
  /** Delay between ticks, in seconds. */
  intervalSeconds: number;
  /** Upper bound on tasks dispatched across all projects during one tick. */
  maxPickupsPerTick: number;
};

/** Values used for every setting that is missing or invalid in the plugin config. */
export const HEARTBEAT_DEFAULTS: HeartbeatConfig = {
  enabled: true,
  intervalSeconds: 60,
  maxPickupsPerTick: 4,
};

/**
 * Build the effective heartbeat config from the plugin config.
 *
 * Each field of `pluginConfig.work_heartbeat` is validated individually; a
 * missing or invalid field falls back to `HEARTBEAT_DEFAULTS` instead of being
 * copied through unchecked.
 *
 * @param pluginConfig Raw plugin config; may be undefined.
 * @returns A fresh, fully populated config object (never `HEARTBEAT_DEFAULTS` itself).
 */
export function resolveHeartbeatConfig(
  pluginConfig?: Record<string, unknown>,
): HeartbeatConfig {
  // Node coerces timer delays that are NaN, < 1 ms or > 2^31-1 ms to 1 ms,
  // which would turn a bad interval into a continuously firing heartbeat.
  const MAX_TIMER_DELAY_MS = 2147483647;

  const section = pluginConfig?.work_heartbeat;
  if (typeof section !== "object" || section === null) {
    return { ...HEARTBEAT_DEFAULTS };
  }
  const { enabled, intervalSeconds, maxPickupsPerTick } = section as Record<string, unknown>;

  const delayMs = typeof intervalSeconds === "number" ? intervalSeconds * 1000 : NaN;
  const intervalIsUsable =
    typeof intervalSeconds === "number" && delayMs >= 1 && delayMs <= MAX_TIMER_DELAY_MS;

  const pickupsIsUsable =
    typeof maxPickupsPerTick === "number" && !Number.isNaN(maxPickupsPerTick);

  return {
    enabled: typeof enabled === "boolean" ? enabled : HEARTBEAT_DEFAULTS.enabled,
    intervalSeconds: intervalIsUsable
      ? (intervalSeconds as number)
      : HEARTBEAT_DEFAULTS.intervalSeconds,
    // Negative budgets behave like 0 (no pickups); fractions are rounded down.
    maxPickupsPerTick: pickupsIsUsable
      ? Math.max(0, Math.floor(maxPickupsPerTick as number))
      : HEARTBEAT_DEFAULTS.maxPickupsPerTick,
  };
}
// ---------------------------------------------------------------------------
// Service
// ---------------------------------------------------------------------------
/**
 * Register the heartbeat as a plugin service with id "devclaw-heartbeat".
 *
 * On start the service resolves its config, and — unless disabled or started
 * without a workspace directory — schedules `tick()` every
 * `config.intervalSeconds` seconds. On stop the timer is cleared.
 *
 * Tick errors are caught and logged so one failed tick never kills the timer.
 *
 * @param api Plugin API used to read plugin config and register the service.
 */
export function registerHeartbeatService(api: OpenClawPluginApi) {
  let intervalId: ReturnType<typeof setInterval> | null = null;
  // True while a tick is executing. setInterval does not wait for async
  // callbacks, so without this a slow tick would overlap the next one and
  // both could dispatch work against the same project state.
  let tickInFlight = false;

  api.registerService({
    id: "devclaw-heartbeat",

    start: async (ctx) => {
      // A repeated start() must not leak the previously scheduled timer.
      if (intervalId) {
        clearInterval(intervalId);
        intervalId = null;
      }

      const pluginConfig = api.pluginConfig as Record<string, unknown> | undefined;
      const config = resolveHeartbeatConfig(pluginConfig);

      if (!config.enabled) {
        ctx.logger.info("work_heartbeat service disabled");
        return;
      }

      const workspaceDir = ctx.workspaceDir;
      if (!workspaceDir) {
        ctx.logger.warn("work_heartbeat: no workspaceDir — service not started");
        return;
      }

      const agentId = resolveAgentId(pluginConfig);

      ctx.logger.info(
        `work_heartbeat service started: every ${config.intervalSeconds}s, max ${config.maxPickupsPerTick} pickups/tick`,
      );

      intervalId = setInterval(async () => {
        if (tickInFlight) {
          ctx.logger.warn("work_heartbeat: previous tick still running — skipping this interval");
          return;
        }
        tickInFlight = true;
        try {
          await tick({ workspaceDir, agentId, config, pluginConfig, logger: ctx.logger });
        } catch (err) {
          ctx.logger.error(`work_heartbeat tick failed: ${err}`);
        } finally {
          tickInFlight = false;
        }
      }, config.intervalSeconds * 1000);
    },

    stop: async (ctx) => {
      if (intervalId) {
        clearInterval(intervalId);
        intervalId = null;
        ctx.logger.info("work_heartbeat service stopped");
      }
    },
  });
}
// ---------------------------------------------------------------------------
// Tick
// ---------------------------------------------------------------------------
/**
 * Run one heartbeat pass over every project in the workspace.
 *
 * For each project:
 *  1. Health pass — `checkWorkerHealth` with `autoFix: true` for the "dev" and
 *     "qa" roles. This runs for every project, even after the pickup budget
 *     for this tick is used up.
 *  2. Tick pass — `projectTick` fills free worker slots, limited by what is
 *     left of `config.maxPickupsPerTick`. When `pluginConfig.projectExecution`
 *     is "sequential", an idle project is skipped once another project is
 *     already active in this pass.
 *
 * A failure inside one project is logged as a warning and does not stop the
 * remaining projects from being processed. A failure reading the project list
 * or writing the audit entry still propagates to the caller.
 *
 * @param opts.workspaceDir  Workspace whose projects are scanned.
 * @param opts.agentId       Agent id forwarded to `projectTick` (may be undefined).
 * @param opts.config        Resolved heartbeat config (pickup budget).
 * @param opts.pluginConfig  Raw plugin config, forwarded to `projectTick`.
 * @param opts.logger        Logger for the summary line and per-project warnings.
 */
async function tick(opts: {
  workspaceDir: string;
  agentId?: string;
  config: HeartbeatConfig;
  pluginConfig?: Record<string, unknown>;
  logger: { info(msg: string): void; warn(msg: string): void };
}) {
  const { workspaceDir, agentId, config, pluginConfig, logger } = opts;

  const data = await readProjects(workspaceDir);
  const projectIds = Object.keys(data.projects);
  if (projectIds.length === 0) return;

  const projectExecution =
    (pluginConfig?.projectExecution as string) ?? "parallel";

  let totalPickups = 0;
  let totalHealthFixes = 0;
  let totalSkipped = 0;
  let activeProjects = 0;

  for (const groupId of projectIds) {
    const project = data.projects[groupId];
    if (!project) continue;

    try {
      const { provider } = createProvider({ repo: project.repo });

      // Health pass: auto-fix
      for (const role of ["dev", "qa"] as const) {
        const fixes = await checkWorkerHealth({
          workspaceDir, groupId, project, role,
          activeSessions: [], // No session list in service context
          autoFix: true,
          provider,
        });
        totalHealthFixes += fixes.filter((f) => f.fixed).length;
      }

      // Budget check: once the budget is spent, stop dispatching but keep
      // looping so the remaining projects still get their health pass.
      const remaining = config.maxPickupsPerTick - totalPickups;
      if (remaining <= 0) continue;

      // Sequential project guard — re-read state because the health pass may
      // have just changed it.
      const fresh = (await readProjects(workspaceDir)).projects[groupId];
      if (!fresh) continue;
      const projectActive = fresh.dev.active || fresh.qa.active;
      if (projectExecution === "sequential" && !projectActive && activeProjects >= 1) {
        totalSkipped++;
        continue;
      }

      // Tick pass: fill free slots
      const result = await projectTick({
        workspaceDir, groupId, agentId,
        pluginConfig,
        maxPickups: remaining,
      });
      totalPickups += result.pickups.length;
      totalSkipped += result.skipped.length;
      if (projectActive || result.pickups.length > 0) activeProjects++;
    } catch (err) {
      // Isolate failures: one broken project must not starve the others.
      logger.warn(`work_heartbeat: project ${groupId} failed, continuing with remaining projects: ${err}`);
    }
  }

  // Summary log line only when something happened.
  if (totalPickups > 0 || totalHealthFixes > 0) {
    logger.info(
      `work_heartbeat tick: ${totalPickups} pickups, ${totalHealthFixes} health fixes, ${totalSkipped} skipped`,
    );
  }

  // The audit entry is written on every tick that scanned at least one project.
  await auditLog(workspaceDir, "heartbeat_tick", {
    projectsScanned: projectIds.length,
    healthFixes: totalHealthFixes,
    pickups: totalPickups,
    skipped: totalSkipped,
  });
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
 * Pick the agent id the heartbeat dispatches on behalf of: the first entry of
 * `pluginConfig.devClawAgentIds`.
 *
 * @param pluginConfig Raw plugin config; may be undefined.
 * @returns The first configured agent id, or undefined when the setting is
 *          missing, not an array, empty, or its first entry is not a string.
 */
function resolveAgentId(pluginConfig?: Record<string, unknown>): string | undefined {
  const ids = pluginConfig?.devClawAgentIds;
  // Validate instead of casting: indexing a plain string would otherwise
  // return its first character as the "agent id".
  if (!Array.isArray(ids)) return undefined;
  const first: unknown = ids[0];
  return typeof first === "string" ? first : undefined;
}

View File

@@ -1,8 +1,8 @@
/**
* Queue service — task sequencing and priority logic.
* Queue service — issue queue fetching.
*
* Pure functions for scanning issue queues, building execution sequences,
* and formatting output. No tool registration or I/O concerns.
* Fetches issue queues per project from the issue provider.
* Pure functions, no tool registration or state mutation.
*/
import type { Issue } from "../providers/provider.js";
import { createProvider } from "../providers/index.js";
@@ -13,56 +13,6 @@ import type { Project } from "../projects.js";
// ---------------------------------------------------------------------------
export type QueueLabel = "To Improve" | "To Test" | "To Do";
export type Role = "dev" | "qa";
export interface SequencedTask {
sequence: number;
projectId: string;
projectName: string;
role: Role;
issueId: number;
title: string;
label: QueueLabel;
active: boolean;
}
export interface ProjectTrack {
name: string;
role: Role;
tasks: SequencedTask[];
}
export interface ProjectExecutionConfig {
name: string;
groupId: string;
roleExecution: "parallel" | "sequential";
devActive: boolean;
qaActive: boolean;
devIssueId: string | null;
qaIssueId: string | null;
}
export interface ProjectTaskSequence {
projectId: string;
projectName: string;
roleExecution: "parallel" | "sequential";
tracks: ProjectTrack[];
}
export interface GlobalTaskSequence {
mode: "sequential";
tasks: SequencedTask[];
}
export interface ProjectQueues {
projectId: string;
project: Project;
queues: Record<QueueLabel, Issue[]>;
}
// ---------------------------------------------------------------------------
// Constants & helpers
// ---------------------------------------------------------------------------
export const QUEUE_PRIORITY: Record<QueueLabel, number> = {
"To Improve": 3,
@@ -74,7 +24,7 @@ export function getTaskPriority(label: QueueLabel, issue: Issue): number {
return QUEUE_PRIORITY[label] * 10000 - issue.iid;
}
export function getRoleForLabel(label: QueueLabel): Role {
export function getRoleForLabel(label: QueueLabel): "dev" | "qa" {
return label === "To Test" ? "qa" : "dev";
}
@@ -97,151 +47,3 @@ export async function fetchProjectQueues(project: Project): Promise<Record<Queue
}
return queues;
}
// ---------------------------------------------------------------------------
// Track building
// ---------------------------------------------------------------------------
export function buildProjectTrack(
projectId: string, projectName: string, role: Role,
queues: Record<QueueLabel, Issue[]>,
isActive: boolean, activeIssueId: string | null,
startSeq: number,
): { track: ProjectTrack; nextSequence: number } {
const tasks: SequencedTask[] = [];
let seq = startSeq;
for (const label of ["To Improve", "To Test", "To Do"] as QueueLabel[]) {
if (getRoleForLabel(label) !== role) continue;
for (const issue of queues[label]) {
tasks.push({
sequence: seq++, projectId, projectName, role,
issueId: issue.iid, title: issue.title, label,
active: isActive && activeIssueId === String(issue.iid),
});
}
}
return { track: { name: role === "dev" ? "DEV Track" : "QA Track", role, tasks }, nextSequence: seq };
}
// ---------------------------------------------------------------------------
// Sequence building
// ---------------------------------------------------------------------------
export function buildParallelProjectSequences(projectQueues: ProjectQueues[]): ProjectTaskSequence[] {
return projectQueues.map(({ projectId, project, queues }) => {
const roleExecution = project.roleExecution ?? "parallel";
const tracks: ProjectTrack[] = [];
if (roleExecution === "sequential") {
// Build alternating DEV/QA sequence
const alternating = buildAlternatingTrack(projectId, project, queues);
if (alternating.tasks.length > 0) tracks.push(alternating);
} else {
const dev = buildProjectTrack(projectId, project.name, "dev", queues, project.dev.active, project.dev.issueId, 1);
const qa = buildProjectTrack(projectId, project.name, "qa", queues, project.qa.active, project.qa.issueId, 1);
if (dev.track.tasks.length > 0) tracks.push(dev.track);
if (qa.track.tasks.length > 0) tracks.push(qa.track);
}
return { projectId, projectName: project.name, roleExecution, tracks };
});
}
function buildAlternatingTrack(
projectId: string, project: Project, queues: Record<QueueLabel, Issue[]>,
): ProjectTrack {
const tasks: SequencedTask[] = [];
const added = new Set<number>();
let seq = 1;
const nextForRole = (role: Role): SequencedTask | null => {
for (const label of ["To Improve", "To Test", "To Do"] as QueueLabel[]) {
if (getRoleForLabel(label) !== role) continue;
for (const issue of queues[label]) {
if (added.has(issue.iid)) continue;
const isActive =
(role === "dev" && project.dev.active && project.dev.issueId === String(issue.iid)) ||
(role === "qa" && project.qa.active && project.qa.issueId === String(issue.iid));
return { sequence: 0, projectId, projectName: project.name, role, issueId: issue.iid, title: issue.title, label, active: isActive };
}
}
return null;
};
// Start with active task
for (const role of ["dev", "qa"] as Role[]) {
const w = project[role];
if (w.active && w.issueId) {
const t = nextForRole(role);
if (t) { t.sequence = seq++; t.active = true; tasks.push(t); added.add(t.issueId); break; }
}
}
// Alternate
let lastRole: Role | null = tasks[0]?.role ?? null;
while (true) {
const next = nextForRole(lastRole === "dev" ? "qa" : "dev");
if (!next) break;
next.sequence = seq++;
tasks.push(next);
added.add(next.issueId);
lastRole = next.role;
}
return { name: "DEV/QA Alternating", role: "dev", tasks };
}
export function buildGlobalTaskSequence(projectQueues: ProjectQueues[]): GlobalTaskSequence {
const all: Array<{ projectId: string; projectName: string; role: Role; label: QueueLabel; issue: Issue; priority: number }> = [];
for (const { projectId, project, queues } of projectQueues) {
for (const label of ["To Improve", "To Test", "To Do"] as QueueLabel[]) {
for (const issue of queues[label]) {
all.push({ projectId, projectName: project.name, role: getRoleForLabel(label), label, issue, priority: getTaskPriority(label, issue) });
}
}
}
all.sort((a, b) => b.priority !== a.priority ? b.priority - a.priority : a.issue.iid - b.issue.iid);
const tasks: SequencedTask[] = [];
const added = new Set<string>();
let seq = 1;
// Active task first
const active = projectQueues.find(({ project }) => project.dev.active || project.qa.active);
if (active) {
const { project, projectId } = active;
for (const [role, w] of [["dev", project.dev], ["qa", project.qa]] as const) {
if (w.active && w.issueId) {
const t = all.find((t) => t.projectId === projectId && t.role === role && String(t.issue.iid) === w.issueId);
if (t) {
const key = `${t.projectId}:${t.issue.iid}`;
tasks.push({ sequence: seq++, projectId: t.projectId, projectName: t.projectName, role: t.role, issueId: t.issue.iid, title: t.issue.title, label: t.label, active: true });
added.add(key);
break;
}
}
}
}
for (const t of all) {
const key = `${t.projectId}:${t.issue.iid}`;
if (added.has(key)) continue;
tasks.push({ sequence: seq++, projectId: t.projectId, projectName: t.projectName, role: t.role, issueId: t.issue.iid, title: t.issue.title, label: t.label, active: false });
added.add(key);
}
return { mode: "sequential", tasks };
}
// ---------------------------------------------------------------------------
// Formatting
// ---------------------------------------------------------------------------
export function formatProjectQueues(queues: Record<QueueLabel, Issue[]>) {
const fmt = (label: QueueLabel) => queues[label].map((i) => ({ id: i.iid, title: i.title, priority: QUEUE_PRIORITY[label] }));
return { toImprove: fmt("To Improve"), toTest: fmt("To Test"), toDo: fmt("To Do") };
}

View File

@@ -2,7 +2,7 @@
* tick.ts — Project-level queue scan + dispatch.
*
* Core function: projectTick() scans one project's queue and fills free worker slots.
* Called by: work_start (fill parallel slot), work_finish (next pipeline step), auto_pickup (sweep).
* Called by: work_start (fill parallel slot), work_finish (next pipeline step), work_heartbeat (sweep).
*/
import type { Issue, StateLabel } from "../providers/provider.js";
import type { IssueProvider } from "../providers/provider.js";
@@ -90,7 +90,7 @@ export type TickResult = {
/**
* Scan one project's queue and fill free worker slots.
*
* Does NOT run health checks (that's auto_pickup's job).
* Does NOT run health checks (that's work_heartbeat's job).
* Non-destructive: only dispatches if slots are free and issues are queued.
*/
export async function projectTick(opts: {
@@ -103,13 +103,15 @@ export async function projectTick(opts: {
maxPickups?: number;
/** Only attempt this role. Used by work_start to fill the other slot. */
targetRole?: "dev" | "qa";
/** Optional provider override (for testing). Uses createProvider if omitted. */
provider?: Pick<IssueProvider, "listIssuesByLabel" | "transitionLabel">;
}): Promise<TickResult> {
const { workspaceDir, groupId, agentId, sessionKey, pluginConfig, dryRun, maxPickups, targetRole } = opts;
const project = (await readProjects(workspaceDir)).projects[groupId];
if (!project) return { pickups: [], skipped: [{ reason: `Project not found: ${groupId}` }] };
const { provider } = createProvider({ repo: project.repo });
const provider = opts.provider ?? createProvider({ repo: project.repo }).provider;
const roleExecution = project.roleExecution ?? "parallel";
const roles: Array<"dev" | "qa"> = targetRole ? [targetRole] : ["dev", "qa"];