docs: enhance heartbeat service descriptions and CLI registration
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/**
|
||||
* cli.ts — CLI registration for `openclaw devclaw setup`.
|
||||
* cli.ts — CLI registration for `openclaw devclaw setup` and `openclaw devclaw heartbeat`.
|
||||
*
|
||||
* Uses Commander.js (provided by OpenClaw plugin SDK context).
|
||||
*/
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
/**
|
||||
* Heartbeat service — token-free interval-based queue processing.
|
||||
* Heartbeat tick — token-free queue processing.
|
||||
*
|
||||
* Runs as a plugin service (tied to gateway lifecycle). Every N seconds:
|
||||
* Runs automatically via plugin service (periodic execution).
|
||||
*
|
||||
* Logic:
|
||||
* 1. Health pass: auto-fix zombies, stale workers, orphaned state
|
||||
* 2. Tick pass: fill free worker slots by priority
|
||||
*
|
||||
@@ -9,14 +11,14 @@
|
||||
* Workers only consume tokens when they start processing dispatched tasks.
|
||||
*/
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
||||
import { readProjects, getProject } from "../projects.js";
|
||||
import { readProjects } from "../projects.js";
|
||||
import { log as auditLog } from "../audit.js";
|
||||
import { checkWorkerHealth } from "./health.js";
|
||||
import { projectTick } from "./tick.js";
|
||||
import { createProvider } from "../providers/index.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Config
|
||||
// Types
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export type HeartbeatConfig = {
|
||||
@@ -25,6 +27,31 @@ export type HeartbeatConfig = {
|
||||
maxPickupsPerTick: number;
|
||||
};
|
||||
|
||||
type Agent = {
|
||||
agentId: string;
|
||||
workspace: string;
|
||||
};
|
||||
|
||||
type TickResult = {
|
||||
totalPickups: number;
|
||||
totalHealthFixes: number;
|
||||
totalSkipped: number;
|
||||
};
|
||||
|
||||
type ServiceContext = {
|
||||
logger: { info(msg: string): void; warn(msg: string): void; error(msg: string): void };
|
||||
config: {
|
||||
agents?: { list?: Array<{ id: string; workspace?: string }> };
|
||||
plugins?: {
|
||||
entries?: { devclaw?: { config?: { devClawAgentIds?: string[] } } };
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Config
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export const HEARTBEAT_DEFAULTS: HeartbeatConfig = {
|
||||
enabled: true,
|
||||
intervalSeconds: 60,
|
||||
@@ -48,7 +75,7 @@ export function registerHeartbeatService(api: OpenClawPluginApi) {
|
||||
api.registerService({
|
||||
id: "devclaw-heartbeat",
|
||||
|
||||
start: async (ctx) => {
|
||||
start: async (ctx: ServiceContext) => {
|
||||
const pluginConfig = api.pluginConfig as Record<string, unknown> | undefined;
|
||||
const config = resolveHeartbeatConfig(pluginConfig);
|
||||
|
||||
@@ -57,25 +84,20 @@ export function registerHeartbeatService(api: OpenClawPluginApi) {
|
||||
return;
|
||||
}
|
||||
|
||||
const workspaceDir = ctx.workspaceDir;
|
||||
if (!workspaceDir) {
|
||||
ctx.logger.warn("work_heartbeat: no workspaceDir — service not started");
|
||||
const agents = discoverAgents(ctx.config);
|
||||
if (agents.length === 0) {
|
||||
ctx.logger.warn("work_heartbeat service: no DevClaw agents registered");
|
||||
return;
|
||||
}
|
||||
|
||||
const agentId = resolveAgentId(pluginConfig);
|
||||
|
||||
ctx.logger.info(
|
||||
`work_heartbeat service started: every ${config.intervalSeconds}s, max ${config.maxPickupsPerTick} pickups/tick`,
|
||||
`work_heartbeat service started: every ${config.intervalSeconds}s, ${agents.length} agents, max ${config.maxPickupsPerTick} pickups/tick`,
|
||||
);
|
||||
|
||||
intervalId = setInterval(async () => {
|
||||
try {
|
||||
await tick({ workspaceDir, agentId, config, pluginConfig, logger: ctx.logger });
|
||||
} catch (err) {
|
||||
ctx.logger.error(`work_heartbeat tick failed: ${err}`);
|
||||
}
|
||||
}, config.intervalSeconds * 1000);
|
||||
intervalId = setInterval(
|
||||
() => runHeartbeatTick(agents, config, pluginConfig, ctx.logger),
|
||||
config.intervalSeconds * 1000,
|
||||
);
|
||||
},
|
||||
|
||||
stop: async (ctx) => {
|
||||
@@ -89,92 +111,196 @@ export function registerHeartbeatService(api: OpenClawPluginApi) {
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tick
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function tick(opts: {
|
||||
/**
|
||||
* Extract DevClaw agents from OpenClaw configuration.
|
||||
*/
|
||||
function discoverAgents(config: ServiceContext["config"]): Agent[] {
|
||||
const devClawAgentIds = config.plugins?.entries?.devclaw?.config?.devClawAgentIds || [];
|
||||
const agentsList = config.agents?.list || [];
|
||||
|
||||
return devClawAgentIds
|
||||
.map((agentId) => {
|
||||
const agent = agentsList.find((a) => a.id === agentId);
|
||||
return {
|
||||
agentId,
|
||||
workspace: agent?.workspace || "",
|
||||
};
|
||||
})
|
||||
.filter((a): a is Agent => !!a.workspace);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run one heartbeat tick for all agents.
|
||||
*/
|
||||
async function runHeartbeatTick(
|
||||
agents: Agent[],
|
||||
config: HeartbeatConfig,
|
||||
pluginConfig: Record<string, unknown> | undefined,
|
||||
logger: ServiceContext["logger"],
|
||||
): Promise<void> {
|
||||
try {
|
||||
const result = await processAllAgents(agents, config, pluginConfig, logger);
|
||||
logTickResult(result, logger);
|
||||
} catch (err) {
|
||||
logger.error(`work_heartbeat tick failed: ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process heartbeat tick for all agents and aggregate results.
|
||||
*/
|
||||
async function processAllAgents(
|
||||
agents: Agent[],
|
||||
config: HeartbeatConfig,
|
||||
pluginConfig: Record<string, unknown> | undefined,
|
||||
logger: ServiceContext["logger"],
|
||||
): Promise<TickResult> {
|
||||
const result: TickResult = {
|
||||
totalPickups: 0,
|
||||
totalHealthFixes: 0,
|
||||
totalSkipped: 0,
|
||||
};
|
||||
|
||||
for (const { agentId, workspace } of agents) {
|
||||
const agentResult = await tick({
|
||||
workspaceDir: workspace,
|
||||
agentId,
|
||||
config,
|
||||
pluginConfig,
|
||||
logger,
|
||||
});
|
||||
|
||||
result.totalPickups += agentResult.totalPickups;
|
||||
result.totalHealthFixes += agentResult.totalHealthFixes;
|
||||
result.totalSkipped += agentResult.totalSkipped;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Log tick results if anything happened.
|
||||
*/
|
||||
function logTickResult(result: TickResult, logger: ServiceContext["logger"]): void {
|
||||
if (result.totalPickups > 0 || result.totalHealthFixes > 0) {
|
||||
logger.info(
|
||||
`work_heartbeat tick: ${result.totalPickups} pickups, ${result.totalHealthFixes} health fixes, ${result.totalSkipped} skipped`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tick (Main Heartbeat Loop)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export async function tick(opts: {
|
||||
workspaceDir: string;
|
||||
agentId?: string;
|
||||
config: HeartbeatConfig;
|
||||
pluginConfig?: Record<string, unknown>;
|
||||
logger: { info(msg: string): void; warn(msg: string): void };
|
||||
}) {
|
||||
const { workspaceDir, agentId, config, pluginConfig, logger } = opts;
|
||||
}): Promise<TickResult> {
|
||||
const { workspaceDir, agentId, config, pluginConfig } = opts;
|
||||
|
||||
const data = await readProjects(workspaceDir);
|
||||
const projectIds = Object.keys(data.projects);
|
||||
if (projectIds.length === 0) return;
|
||||
|
||||
const projectExecution =
|
||||
(pluginConfig?.projectExecution as string) ?? "parallel";
|
||||
if (projectIds.length === 0) {
|
||||
return { totalPickups: 0, totalHealthFixes: 0, totalSkipped: 0 };
|
||||
}
|
||||
|
||||
let totalPickups = 0;
|
||||
let totalHealthFixes = 0;
|
||||
let totalSkipped = 0;
|
||||
const result: TickResult = {
|
||||
totalPickups: 0,
|
||||
totalHealthFixes: 0,
|
||||
totalSkipped: 0,
|
||||
};
|
||||
|
||||
const projectExecution = (pluginConfig?.projectExecution as string) ?? "parallel";
|
||||
let activeProjects = 0;
|
||||
|
||||
for (const groupId of projectIds) {
|
||||
const project = data.projects[groupId];
|
||||
if (!project) continue;
|
||||
|
||||
const { provider } = createProvider({ repo: project.repo });
|
||||
// Health pass: auto-fix zombies and stale workers
|
||||
result.totalHealthFixes += await performHealthPass(
|
||||
workspaceDir,
|
||||
groupId,
|
||||
project,
|
||||
);
|
||||
|
||||
// Health pass: auto-fix
|
||||
for (const role of ["dev", "qa"] as const) {
|
||||
const fixes = await checkWorkerHealth({
|
||||
workspaceDir, groupId, project, role,
|
||||
activeSessions: [], // No session list in service context
|
||||
autoFix: true,
|
||||
provider,
|
||||
});
|
||||
totalHealthFixes += fixes.filter((f) => f.fixed).length;
|
||||
}
|
||||
|
||||
// Budget check
|
||||
const remaining = config.maxPickupsPerTick - totalPickups;
|
||||
// Budget check: stop if we've hit the limit
|
||||
const remaining = config.maxPickupsPerTick - result.totalPickups;
|
||||
if (remaining <= 0) break;
|
||||
|
||||
// Sequential project guard
|
||||
const fresh = (await readProjects(workspaceDir)).projects[groupId];
|
||||
if (!fresh) continue;
|
||||
const projectActive = fresh.dev.active || fresh.qa.active;
|
||||
if (projectExecution === "sequential" && !projectActive && activeProjects >= 1) {
|
||||
totalSkipped++;
|
||||
// Sequential project guard: don't start new projects if one is active
|
||||
const isProjectActive = await checkProjectActive(workspaceDir, groupId);
|
||||
if (projectExecution === "sequential" && !isProjectActive && activeProjects >= 1) {
|
||||
result.totalSkipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Tick pass: fill free slots
|
||||
const result = await projectTick({
|
||||
workspaceDir, groupId, agentId,
|
||||
// Tick pass: fill free worker slots
|
||||
const tickResult = await projectTick({
|
||||
workspaceDir,
|
||||
groupId,
|
||||
agentId,
|
||||
pluginConfig,
|
||||
maxPickups: remaining,
|
||||
});
|
||||
|
||||
totalPickups += result.pickups.length;
|
||||
totalSkipped += result.skipped.length;
|
||||
if (projectActive || result.pickups.length > 0) activeProjects++;
|
||||
}
|
||||
|
||||
// Audit (only when something happened)
|
||||
if (totalPickups > 0 || totalHealthFixes > 0) {
|
||||
logger.info(
|
||||
`work_heartbeat tick: ${totalPickups} pickups, ${totalHealthFixes} health fixes, ${totalSkipped} skipped`,
|
||||
);
|
||||
result.totalPickups += tickResult.pickups.length;
|
||||
result.totalSkipped += tickResult.skipped.length;
|
||||
if (isProjectActive || tickResult.pickups.length > 0) activeProjects++;
|
||||
}
|
||||
|
||||
await auditLog(workspaceDir, "heartbeat_tick", {
|
||||
projectsScanned: projectIds.length,
|
||||
healthFixes: totalHealthFixes,
|
||||
pickups: totalPickups,
|
||||
skipped: totalSkipped,
|
||||
healthFixes: result.totalHealthFixes,
|
||||
pickups: result.totalPickups,
|
||||
skipped: result.totalSkipped,
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
/**
|
||||
* Run health checks and auto-fix for a project (dev + qa roles).
|
||||
*/
|
||||
async function performHealthPass(
|
||||
workspaceDir: string,
|
||||
groupId: string,
|
||||
project: any,
|
||||
): Promise<number> {
|
||||
const { provider } = createProvider({ repo: project.repo });
|
||||
let fixedCount = 0;
|
||||
|
||||
function resolveAgentId(pluginConfig?: Record<string, unknown>): string | undefined {
|
||||
const ids = pluginConfig?.devClawAgentIds as string[] | undefined;
|
||||
return ids?.[0];
|
||||
for (const role of ["dev", "qa"] as const) {
|
||||
const fixes = await checkWorkerHealth({
|
||||
workspaceDir,
|
||||
groupId,
|
||||
project,
|
||||
role,
|
||||
activeSessions: [],
|
||||
autoFix: true,
|
||||
provider,
|
||||
});
|
||||
fixedCount += fixes.filter((f) => f.fixed).length;
|
||||
}
|
||||
|
||||
return fixedCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a project has active work (dev or qa).
|
||||
*/
|
||||
async function checkProjectActive(workspaceDir: string, groupId: string): Promise<boolean> {
|
||||
const fresh = (await readProjects(workspaceDir)).projects[groupId];
|
||||
if (!fresh) return false;
|
||||
return fresh.dev.active || fresh.qa.active;
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user