Files
devclaw-gitea/lib/services/heartbeat.ts
Peter Foster b26ed1eb53 feat: add business hours scheduling to heartbeat
- Configurable start/end hours (0-23)
- Timezone support (default: UTC)
- Silent skip outside business hours (no log spam)
- Backward compatible (no schedule = always run)
2026-02-16 11:27:27 +00:00

418 lines
12 KiB
TypeScript

/**
* Heartbeat tick — token-free queue processing.
*
* Runs automatically via plugin service (periodic execution).
*
* Logic:
* 1. Health pass: auto-fix zombies, stale workers, orphaned state
* 2. Review pass: transition issues whose PR check condition is met
* 3. Tick pass: fill free worker slots by priority
*
* Zero LLM tokens — all logic is deterministic code + CLI calls.
* Workers only consume tokens when they start processing dispatched tasks.
*/
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
import fs from "node:fs";
import path from "node:path";
import { readProjects } from "../projects.js";
import { log as auditLog } from "../audit.js";
import { DATA_DIR } from "../setup/migrate-layout.js";
import { checkWorkerHealth, scanOrphanedLabels, fetchGatewaySessions, type SessionLookup } from "./health.js";
import { projectTick } from "./tick.js";
import { reviewPass } from "./review.js";
import { createProvider } from "../providers/index.js";
import { loadConfig } from "../config/index.js";
import { ExecutionMode } from "../workflow.js";
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/**
* Settings for the heartbeat service, resolved from the plugin's
* `work_heartbeat` config section.
*/
export type HeartbeatConfig = {
// When false, the service logs that it is disabled and never schedules a tick.
enabled: boolean;
// Period between ticks, in seconds (multiplied by 1000 for the timer).
intervalSeconds: number;
// Upper bound on task pickups within a single tick() call.
maxPickupsPerTick: number;
// Optional run window; when `schedule.enabled_hours` is absent every tick runs.
schedule?: {
enabled_hours?: {
start: number; // first hour of the window, inclusive (0-23)
end: number; // hour at which the window closes, exclusive
timezone?: string; // IANA timezone (default: UTC)
};
};
};
/** A workspace the heartbeat processes, paired with the agent id used for it. */
type Agent = {
// Id from the configured agent list, or "main" for the default workspace.
agentId: string;
// Workspace directory in which a projects.json was found.
workspace: string;
};
/** Counters accumulated during a tick; summed across agents for the log line. */
type TickResult = {
// Number of pickups reported by projectTick.
totalPickups: number;
// Health and orphaned-label findings that were reported as fixed.
totalHealthFixes: number;
// Skips reported by projectTick, projects held back in sequential mode,
// and projects whose processing threw.
totalSkipped: number;
// Transitions reported by reviewPass.
totalReviewTransitions: number;
};
/** The subset of the plugin service context that this module declares. */
type ServiceContext = {
// Logger used for service lifecycle messages and per-tick summaries/errors.
logger: { info(msg: string): void; warn(msg: string): void; error(msg: string): void };
// Host configuration as seen by the service. NOTE(review): the code visible in
// this file discovers agents from `api.config`, not from this field.
config: {
agents?: { list?: Array<{ id: string; workspace?: string }> };
};
};
// ---------------------------------------------------------------------------
// Config
// ---------------------------------------------------------------------------
/** Values used for any heartbeat setting the plugin config omits or mis-specifies. */
export const HEARTBEAT_DEFAULTS: HeartbeatConfig = {
  enabled: true,
  intervalSeconds: 60,
  maxPickupsPerTick: 4,
};

/**
 * Build the effective heartbeat config from the plugin's `work_heartbeat` section.
 *
 * Defaults are overlaid with the user's values, with three safeguards:
 * - a section that is not a plain object is ignored entirely (spreading a
 *   string would otherwise inject index keys);
 * - keys explicitly set to `undefined`/`null` are treated as unset, so they do
 *   not erase a default;
 * - `intervalSeconds` must be a finite number greater than zero and
 *   `maxPickupsPerTick` must be a finite number, otherwise the default is used.
 *   An invalid interval would reach `setInterval` as a near-zero delay, and a
 *   NaN budget would defeat the `remaining <= 0` check in the tick loop.
 *
 * @param pluginConfig Raw plugin configuration, if any.
 * @returns A fresh config object; `HEARTBEAT_DEFAULTS` is never returned by reference.
 */
export function resolveHeartbeatConfig(
  pluginConfig?: Record<string, unknown>,
): HeartbeatConfig {
  const section = pluginConfig?.work_heartbeat;
  if (typeof section !== "object" || section === null || Array.isArray(section)) {
    return { ...HEARTBEAT_DEFAULTS };
  }

  // Drop nullish entries so "key present but unset" falls back to the default.
  const overrides = Object.fromEntries(
    Object.entries(section).filter(([, value]) => value !== undefined && value !== null),
  ) as Partial<HeartbeatConfig>;
  const merged: HeartbeatConfig = { ...HEARTBEAT_DEFAULTS, ...overrides };

  const isFiniteNumber = (value: unknown): value is number =>
    typeof value === "number" && Number.isFinite(value);

  if (!isFiniteNumber(merged.intervalSeconds) || merged.intervalSeconds <= 0) {
    merged.intervalSeconds = HEARTBEAT_DEFAULTS.intervalSeconds;
  }
  if (!isFiniteNumber(merged.maxPickupsPerTick)) {
    merged.maxPickupsPerTick = HEARTBEAT_DEFAULTS.maxPickupsPerTick;
  }
  return merged;
}
// ---------------------------------------------------------------------------
// Service
// ---------------------------------------------------------------------------
/**
 * Register the "devclaw-heartbeat" plugin service.
 *
 * On start the service resolves the heartbeat config, discovers agent
 * workspaces, and schedules a periodic tick. It does nothing (beyond logging)
 * when the heartbeat is disabled or no agent workspace has projects.
 * On stop the timer is cleared.
 *
 * @param api Plugin API used to read configuration and register the service.
 */
export function registerHeartbeatService(api: OpenClawPluginApi) {
  let intervalId: ReturnType<typeof setInterval> | null = null;
  // True while a tick is still awaiting I/O. Ticks that fire in the meantime
  // are dropped so two ticks never process the same workspaces concurrently.
  let tickInFlight = false;

  api.registerService({
    id: "devclaw-heartbeat",
    start: async (ctx: ServiceContext) => {
      // A repeated start() must not leave the previous timer running.
      if (intervalId) {
        clearInterval(intervalId);
        intervalId = null;
      }

      const pluginConfig = api.pluginConfig as Record<string, unknown> | undefined;
      const config = resolveHeartbeatConfig(pluginConfig);
      if (!config.enabled) {
        ctx.logger.info("work_heartbeat service disabled");
        return;
      }

      const agents = discoverAgents(api.config);
      if (agents.length === 0) {
        ctx.logger.warn("work_heartbeat service: no DevClaw agents registered");
        return;
      }

      ctx.logger.info(
        `work_heartbeat service started: every ${config.intervalSeconds}s, ${agents.length} agents, max ${config.maxPickupsPerTick} pickups/tick`,
      );

      intervalId = setInterval(() => {
        if (tickInFlight) return;
        tickInFlight = true;
        // The timer discards the callback's return value, so the promise is
        // settled here; otherwise a rejection would surface as an unhandled rejection.
        void runHeartbeatTick(agents, config, pluginConfig, ctx.logger)
          .catch((err: unknown) => {
            ctx.logger.error(`work_heartbeat tick failed: ${err}`);
          })
          .finally(() => {
            tickInFlight = false;
          });
      }, config.intervalSeconds * 1000);
    },
    stop: async (ctx) => {
      if (intervalId) {
        clearInterval(intervalId);
        intervalId = null;
        ctx.logger.info("work_heartbeat service stopped");
      }
    },
  });
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
 * Work out which agents the heartbeat should process.
 *
 * Every listed agent whose workspace holds a projects.json is included. The
 * default workspace (agents.defaults.workspace) is added under the id "main"
 * when it holds projects and was not already contributed by a listed agent.
 * Probing is best-effort: a workspace whose check throws is simply left out.
 */
function discoverAgents(config: {
  agents?: {
    list?: Array<{ id: string; workspace?: string }>;
    defaults?: { workspace?: string };
  };
}): Agent[] {
  // Any error raised while probing a workspace counts as "no projects here".
  const holdsProjects = (dir: string): boolean => {
    try {
      return hasProjects(dir);
    } catch {
      return false;
    }
  };

  const found: Agent[] = [];
  const claimed = new Set<string>();

  // Explicitly listed agents first.
  const listed = config.agents?.list || [];
  for (const entry of listed) {
    const dir = entry.workspace;
    if (dir && holdsProjects(dir)) {
      found.push({ agentId: entry.id, workspace: dir });
      claimed.add(dir);
    }
  }

  // Then the default workspace, unless a listed agent already covers it.
  const fallback = config.agents?.defaults?.workspace;
  if (fallback && !claimed.has(fallback) && holdsProjects(fallback)) {
    found.push({ agentId: "main", workspace: fallback });
  }

  return found;
}
/**
 * Report whether a workspace contains a projects.json, looking in the data
 * directory first and then in the two older locations.
 */
function hasProjects(workspace: string): boolean {
  // Checked in order; the first existing file wins.
  const candidates = [
    [DATA_DIR, "projects.json"],
    ["projects.json"],
    ["projects", "projects.json"],
  ];
  return candidates.some((segments) => fs.existsSync(path.join(workspace, ...segments)));
}
/**
 * Decide whether a tick may run at `now` under the configured schedule.
 *
 * With no `schedule.enabled_hours` the answer is always true. Otherwise the
 * current hour (0-23) in the configured timezone is tested against the window
 * `[start, end)`:
 * - `start < end`: a same-day window, e.g. 9 → 17.
 * - `start > end`: an overnight window that wraps past midnight, e.g. 22 → 6.
 * - `start === end`: an empty window (never enabled).
 *
 * @param config Heartbeat configuration holding the optional schedule.
 * @param now Instant to evaluate; defaults to the current time.
 * @returns true when the tick is allowed to run.
 * @throws RangeError if `timezone` is not a timezone the runtime recognises.
 */
function isWithinBusinessHours(config: HeartbeatConfig, now: Date = new Date()): boolean {
  const enabledHours = config.schedule?.enabled_hours;
  if (!enabledHours) return true;

  const { start, end, timezone } = enabledHours;

  // hourCycle "h23" pins the hour to 0-23. `hour12: false` is not equivalent:
  // for en-US some engines then render midnight as "24", which would fail
  // every `start: 0` window during the first hour of the day.
  const parts = new Intl.DateTimeFormat("en-US", {
    timeZone: timezone || "UTC", // also covers an empty or null timezone
    hour: "numeric",
    hourCycle: "h23",
  }).formatToParts(now);
  const hourText = parts.find((part) => part.type === "hour")?.value ?? "";
  // `% 24` folds a stray "24" back to 0 should an engine ignore hourCycle.
  const currentHour = Number.parseInt(hourText, 10) % 24;

  if (start <= end) {
    return currentHour >= start && currentHour < end;
  }
  // Overnight window: enabled from `start` until midnight and from midnight until `end`.
  return currentHour >= start || currentHour < end;
}
/**
 * Run one heartbeat tick for all agents.
 *
 * The tick is skipped without logging when the current time falls outside the
 * configured business hours. Any error, including one raised while evaluating
 * the schedule (for example an invalid timezone), is logged and swallowed so
 * the returned promise never rejects into the timer that drives this function.
 *
 * @param agents Agents (workspaces) to process.
 * @param config Resolved heartbeat configuration.
 * @param pluginConfig Raw plugin configuration, forwarded to the per-agent tick.
 * @param logger Sink for the tick summary and for failures.
 */
async function runHeartbeatTick(
  agents: Agent[],
  config: HeartbeatConfig,
  pluginConfig: Record<string, unknown> | undefined,
  logger: ServiceContext["logger"],
): Promise<void> {
  try {
    // Inside the try on purpose: the schedule check itself can throw.
    if (!isWithinBusinessHours(config)) {
      return; // Silent skip - no logging spam
    }
    const result = await processAllAgents(agents, config, pluginConfig, logger);
    logTickResult(result, logger);
  } catch (err) {
    logger.error(`work_heartbeat tick failed: ${err}`);
  }
}
/**
 * Run the heartbeat tick for every agent, one after another, and sum the results.
 *
 * Gateway sessions are fetched once and shared by all agents. Each agent is
 * isolated: if its tick throws, the failure is logged and the remaining agents
 * are still processed, mirroring the per-project isolation inside `tick`.
 *
 * @param agents Agents (workspaces) to process, in order.
 * @param config Resolved heartbeat configuration.
 * @param pluginConfig Raw plugin configuration, forwarded to `tick`.
 * @param logger Passed to `tick` and used to report a failed agent.
 * @returns Totals aggregated over the agents whose tick completed.
 * @throws Whatever `fetchGatewaySessions` throws; that failure is not isolated.
 */
async function processAllAgents(
  agents: Agent[],
  config: HeartbeatConfig,
  pluginConfig: Record<string, unknown> | undefined,
  logger: ServiceContext["logger"],
): Promise<TickResult> {
  const result: TickResult = {
    totalPickups: 0,
    totalHealthFixes: 0,
    totalSkipped: 0,
    totalReviewTransitions: 0,
  };

  // Fetch gateway sessions once for all agents/projects
  const sessions = await fetchGatewaySessions();

  for (const { agentId, workspace } of agents) {
    try {
      const agentResult = await tick({
        workspaceDir: workspace,
        agentId,
        config,
        pluginConfig,
        sessions,
        logger,
      });
      result.totalPickups += agentResult.totalPickups;
      result.totalHealthFixes += agentResult.totalHealthFixes;
      result.totalSkipped += agentResult.totalSkipped;
      result.totalReviewTransitions += agentResult.totalReviewTransitions;
    } catch (err) {
      // One broken workspace must not starve the agents that come after it.
      const reason = err instanceof Error ? err.message : String(err);
      logger.error(`work_heartbeat tick failed for agent ${agentId}: ${reason}`);
    }
  }

  return result;
}
/**
 * Emit a one-line summary of a tick, but only when it picked up work, fixed a
 * health issue, or performed a review transition. Skips alone are not logged.
 */
function logTickResult(result: TickResult, logger: ServiceContext["logger"]): void {
  const { totalPickups, totalHealthFixes, totalReviewTransitions, totalSkipped } = result;

  const didSomething = [totalPickups, totalHealthFixes, totalReviewTransitions].some(
    (count) => count > 0,
  );
  if (!didSomething) return;

  logger.info(
    `work_heartbeat tick: ${totalPickups} pickups, ${totalHealthFixes} health fixes, ${totalReviewTransitions} review transitions, ${totalSkipped} skipped`,
  );
}
// ---------------------------------------------------------------------------
// Tick (Main Heartbeat Loop)
// ---------------------------------------------------------------------------
/**
* Run one heartbeat pass over every project in a workspace.
*
* For each project, in order: the health pass, the review pass, and then
* (pickup budget and execution mode permitting) projectTick to fill worker
* slots. An error thrown while handling one project is logged as a warning and
* counted as skipped; the loop then moves on to the next project. After the
* loop an audit entry with the totals is written. A workspace with no projects
* returns zeroed totals immediately and writes no audit entry.
*
* @param opts.workspaceDir Workspace whose projects are loaded via readProjects.
* @param opts.agentId Forwarded to projectTick.
* @param opts.config Supplies maxPickupsPerTick, the pickup budget for this call.
* @param opts.pluginConfig Read for `projectExecution`; also forwarded to projectTick.
* @param opts.sessions Gateway session lookup handed to the health pass (may be null).
* @param opts.logger Receives the per-project failure warning.
* @returns Totals for pickups, health fixes, skips and review transitions.
*/
export async function tick(opts: {
workspaceDir: string;
agentId?: string;
config: HeartbeatConfig;
pluginConfig?: Record<string, unknown>;
sessions: SessionLookup | null;
logger: { info(msg: string): void; warn(msg: string): void };
}): Promise<TickResult> {
const { workspaceDir, agentId, config, pluginConfig, sessions } = opts;
const data = await readProjects(workspaceDir);
const projectIds = Object.keys(data.projects);
// Nothing to do for a workspace without projects.
if (projectIds.length === 0) {
return { totalPickups: 0, totalHealthFixes: 0, totalSkipped: 0, totalReviewTransitions: 0 };
}
const result: TickResult = {
totalPickups: 0,
totalHealthFixes: 0,
totalSkipped: 0,
totalReviewTransitions: 0,
};
// Execution mode falls back to parallel when the plugin config does not set one.
const projectExecution = (pluginConfig?.projectExecution as string) ?? ExecutionMode.PARALLEL;
// Projects counted as active during this call; consulted by the sequential guard below.
let activeProjects = 0;
for (const groupId of projectIds) {
try {
const project = data.projects[groupId];
if (!project) continue;
const { provider } = await createProvider({ repo: project.repo, provider: project.provider });
const resolvedConfig = await loadConfig(workspaceDir, project.name);
// Health pass: auto-fix zombies and stale workers
result.totalHealthFixes += await performHealthPass(
workspaceDir,
groupId,
project,
sessions,
provider,
resolvedConfig.timeouts.staleWorkerHours,
);
// Review pass: transition issues whose PR check condition is met
result.totalReviewTransitions += await reviewPass({
workspaceDir,
groupId,
workflow: resolvedConfig.workflow,
provider,
repoPath: project.repo,
gitPullTimeoutMs: resolvedConfig.timeouts.gitPullMs,
});
// Budget check: stop if we've hit the limit
// NOTE(review): this `break` ends the whole loop, so once the budget is
// spent the projects that follow get no health or review pass in this call.
const remaining = config.maxPickupsPerTick - result.totalPickups;
if (remaining <= 0) break;
// Sequential project guard: don't start new projects if one is active
// (the project state is re-read from disk, after the passes above have run)
const isProjectActive = await checkProjectActive(workspaceDir, groupId);
if (projectExecution === ExecutionMode.SEQUENTIAL && !isProjectActive && activeProjects >= 1) {
result.totalSkipped++;
continue;
}
// Tick pass: fill free worker slots
const tickResult = await projectTick({
workspaceDir,
groupId,
agentId,
pluginConfig,
maxPickups: remaining,
});
result.totalPickups += tickResult.pickups.length;
result.totalSkipped += tickResult.skipped.length;
// Notifications now handled by dispatchTask
// A project counts as active if it already had an active worker or just picked up work.
if (isProjectActive || tickResult.pickups.length > 0) activeProjects++;
} catch (err) {
// Per-project isolation: one failing project doesn't crash the entire tick
// (the cast assumes an Error was thrown; any other value logs "undefined" as the message)
opts.logger.warn(`Heartbeat tick failed for project ${groupId}: ${(err as Error).message}`);
result.totalSkipped++;
}
}
// Record the outcome of this pass; a failure here propagates to the caller.
await auditLog(workspaceDir, "heartbeat_tick", {
projectsScanned: projectIds.length,
healthFixes: result.totalHealthFixes,
reviewTransitions: result.totalReviewTransitions,
pickups: result.totalPickups,
skipped: result.totalSkipped,
});
return result;
}
/**
 * Run the health checks for one project with auto-fix enabled and report how
 * many findings were fixed.
 *
 * Every role key in `project.workers` is visited in turn: first the worker
 * health check, then the scan for orphaned labels.
 *
 * @returns Number of findings, across all roles, whose `fixed` flag is truthy.
 */
async function performHealthPass(
  workspaceDir: string,
  groupId: string,
  project: any,
  sessions: SessionLookup | null,
  provider: import("../providers/provider.js").IssueProvider,
  staleWorkerHours?: number,
): Promise<number> {
  // Count the findings that were actually repaired.
  const repaired = (findings: ReadonlyArray<{ fixed?: unknown }>): number =>
    findings.reduce((count, finding) => (finding.fixed ? count + 1 : count), 0);

  let total = 0;
  const roles = Object.keys(project.workers);
  for (const role of roles) {
    // Worker health: session liveness, label consistency, etc.
    const workerFindings = await checkWorkerHealth({
      workspaceDir,
      groupId,
      project,
      role,
      sessions,
      autoFix: true,
      provider,
      staleWorkerHours,
    });
    total += repaired(workerFindings);

    // Orphaned labels: active labels with no tracking worker.
    const labelFindings = await scanOrphanedLabels({
      workspaceDir,
      groupId,
      project,
      role,
      autoFix: true,
      provider,
    });
    total += repaired(labelFindings);
  }
  return total;
}
/**
 * Re-read the workspace's projects and report whether the given project
 * currently has at least one worker flagged active. An unknown project
 * counts as inactive.
 */
async function checkProjectActive(workspaceDir: string, groupId: string): Promise<boolean> {
  const { projects } = await readProjects(workspaceDir);
  const current = projects[groupId];
  if (!current) {
    return false;
  }
  for (const worker of Object.values(current.workers)) {
    if (worker.active) return true;
  }
  return false;
}