Adds heartbeat_tick tool that automates task pickup across all projects: - Runs session health checks (zombie cleanup) before pickups - Loops over all registered projects - Picks up tasks by priority (To Improve > To Test > To Do) - Supports two work modes: - parallel: each project can have DEV+QA running independently - sequential: only 1 DEV + 1 QA globally (can be different projects) - Respects per-project maxDevWorkers/maxQaWorkers settings - Supports dryRun mode and maxPickups limit - Context guard: only works from DM/cron, blocks project groups Also adds: - workMode config option (parallel | sequential) - maxDevWorkers/maxQaWorkers fields to Project type
This commit is contained in:
12
index.ts
12
index.ts
@@ -9,6 +9,7 @@ import { createSetupTool } from "./lib/tools/devclaw-setup.js";
|
||||
import { createOnboardTool } from "./lib/tools/devclaw-onboard.js";
|
||||
import { createAnalyzeChannelBindingsTool } from "./lib/tools/analyze-channel-bindings.js";
|
||||
import { createContextTestTool } from "./lib/tools/context-test.js";
|
||||
import { createHeartbeatTickTool } from "./lib/tools/heartbeat-tick.js";
|
||||
import { registerCli } from "./lib/cli.js";
|
||||
|
||||
const plugin = {
|
||||
@@ -29,6 +30,12 @@ const plugin = {
|
||||
qa: { type: "string", description: "QA engineer model" },
|
||||
},
|
||||
},
|
||||
workMode: {
|
||||
type: "string",
|
||||
enum: ["parallel", "sequential"],
|
||||
description: "Work mode: parallel (each project independent) or sequential (1 DEV + 1 QA globally)",
|
||||
default: "parallel",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -64,6 +71,9 @@ const plugin = {
|
||||
api.registerTool(createContextTestTool(api), {
|
||||
names: ["context_test"],
|
||||
});
|
||||
api.registerTool(createHeartbeatTickTool(api), {
|
||||
names: ["heartbeat_tick"],
|
||||
});
|
||||
|
||||
// CLI: `openclaw devclaw setup`
|
||||
api.registerCli(({ program }: { program: any }) => registerCli(program), {
|
||||
@@ -71,7 +81,7 @@ const plugin = {
|
||||
});
|
||||
|
||||
api.logger.info(
|
||||
"DevClaw plugin registered (10 tools, 1 CLI command)",
|
||||
"DevClaw plugin registered (11 tools, 1 CLI command)",
|
||||
);
|
||||
},
|
||||
};
|
||||
|
||||
@@ -23,6 +23,8 @@ export type Project = {
|
||||
baseBranch: string;
|
||||
deployBranch: string;
|
||||
autoChain: boolean;
|
||||
maxDevWorkers?: number;
|
||||
maxQaWorkers?: number;
|
||||
dev: WorkerState;
|
||||
qa: WorkerState;
|
||||
};
|
||||
|
||||
502
lib/tools/heartbeat-tick.ts
Normal file
502
lib/tools/heartbeat-tick.ts
Normal file
@@ -0,0 +1,502 @@
|
||||
/**
|
||||
* heartbeat_tick — Automated task pickup across all projects.
|
||||
*
|
||||
* Runs on heartbeat/cron context:
|
||||
* 1. Clean zombie sessions (session_health logic)
|
||||
* 2. Loop over all projects
|
||||
* 3. Check worker slots per project
|
||||
* 4. Pick up tasks by priority (To Improve > To Test > To Do)
|
||||
* 5. Respect work mode (parallel vs sequential)
|
||||
* 6. Return summary of actions taken
|
||||
*
|
||||
* Context guard: Only allows from DM/cron context, blocks project groups.
|
||||
*/
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
||||
import { jsonResult } from "openclaw/plugin-sdk";
|
||||
import { dispatchTask } from "../dispatch.js";
|
||||
import { type Issue, type StateLabel } from "../task-managers/task-manager.js";
|
||||
import { createProvider } from "../task-managers/index.js";
|
||||
import { selectModel } from "../model-selector.js";
|
||||
import {
|
||||
getProject,
|
||||
getWorker,
|
||||
getSessionForModel,
|
||||
readProjects,
|
||||
updateWorker,
|
||||
type Project,
|
||||
} from "../projects.js";
|
||||
import type { ToolContext } from "../types.js";
|
||||
import { detectContext, generateGuardrails } from "../context-guard.js";
|
||||
import { type Tier } from "../tiers.js";
|
||||
import { log as auditLog } from "../audit.js";
|
||||
|
||||
/** Labels that map to DEV role */
|
||||
const DEV_LABELS: StateLabel[] = ["To Do", "To Improve"];
|
||||
|
||||
/** Labels that map to QA role */
|
||||
const QA_LABELS: StateLabel[] = ["To Test"];
|
||||
|
||||
/** All pickable labels, in priority order (highest first) */
|
||||
const PRIORITY_ORDER: StateLabel[] = ["To Improve", "To Test", "To Do"];
|
||||
|
||||
/** Tier labels that can appear on issues */
|
||||
const TIER_LABELS: Tier[] = ["junior", "medior", "senior", "qa"];
|
||||
|
||||
type WorkMode = "parallel" | "sequential";
|
||||
|
||||
type PickupAction = {
|
||||
project: string;
|
||||
groupId: string;
|
||||
issueId: number;
|
||||
issueTitle: string;
|
||||
role: "dev" | "qa";
|
||||
model: string;
|
||||
sessionAction: "spawn" | "send";
|
||||
announcement: string;
|
||||
};
|
||||
|
||||
type HealthFix = {
|
||||
project: string;
|
||||
role: "dev" | "qa";
|
||||
type: string;
|
||||
fixed: boolean;
|
||||
};
|
||||
|
||||
type TickResult = {
|
||||
success: boolean;
|
||||
dryRun: boolean;
|
||||
workMode: WorkMode;
|
||||
healthFixes: HealthFix[];
|
||||
pickups: PickupAction[];
|
||||
skipped: Array<{ project: string; reason: string }>;
|
||||
globalState?: { activeDev: number; activeQa: number };
|
||||
};
|
||||
|
||||
/**
|
||||
* Detect role from issue's current state label.
|
||||
*/
|
||||
function detectRoleFromLabel(label: StateLabel): "dev" | "qa" | null {
|
||||
if (DEV_LABELS.includes(label)) return "dev";
|
||||
if (QA_LABELS.includes(label)) return "qa";
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect tier from issue labels (e.g., "junior", "senior").
|
||||
*/
|
||||
function detectTierFromLabels(labels: string[]): Tier | null {
|
||||
const lowerLabels = labels.map((l) => l.toLowerCase());
|
||||
for (const tier of TIER_LABELS) {
|
||||
if (lowerLabels.includes(tier)) {
|
||||
return tier;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the next issue to pick up by priority for a specific role.
|
||||
*/
|
||||
async function findNextIssueForRole(
|
||||
provider: { listIssuesByLabel(label: StateLabel): Promise<Issue[]> },
|
||||
role: "dev" | "qa",
|
||||
): Promise<{ issue: Issue; label: StateLabel } | null> {
|
||||
const labelsToCheck =
|
||||
role === "dev"
|
||||
? PRIORITY_ORDER.filter((l) => DEV_LABELS.includes(l))
|
||||
: PRIORITY_ORDER.filter((l) => QA_LABELS.includes(l));
|
||||
|
||||
for (const label of labelsToCheck) {
|
||||
try {
|
||||
const issues = await provider.listIssuesByLabel(label);
|
||||
if (issues.length > 0) {
|
||||
// Return oldest issue first (FIFO)
|
||||
const oldest = issues[issues.length - 1];
|
||||
return { issue: oldest, label };
|
||||
}
|
||||
} catch {
|
||||
// Continue to next label on error
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run health check logic for a single project/role.
|
||||
* Returns fixes applied (simplified version of session_health).
|
||||
*/
|
||||
async function checkAndFixWorkerHealth(
|
||||
workspaceDir: string,
|
||||
groupId: string,
|
||||
project: Project,
|
||||
role: "dev" | "qa",
|
||||
activeSessions: string[],
|
||||
autoFix: boolean,
|
||||
provider: { transitionLabel(id: number, from: StateLabel, to: StateLabel): Promise<void> },
|
||||
): Promise<HealthFix[]> {
|
||||
const fixes: HealthFix[] = [];
|
||||
const worker = project[role];
|
||||
const currentSessionKey = worker.model
|
||||
? getSessionForModel(worker, worker.model)
|
||||
: null;
|
||||
|
||||
// Check 1: Active but no session key for current model
|
||||
if (worker.active && !currentSessionKey) {
|
||||
if (autoFix) {
|
||||
await updateWorker(workspaceDir, groupId, role, {
|
||||
active: false,
|
||||
issueId: null,
|
||||
});
|
||||
}
|
||||
fixes.push({
|
||||
project: project.name,
|
||||
role,
|
||||
type: "active_no_session",
|
||||
fixed: autoFix,
|
||||
});
|
||||
}
|
||||
|
||||
// Check 2: Active with session but session is dead (zombie)
|
||||
if (
|
||||
worker.active &&
|
||||
currentSessionKey &&
|
||||
activeSessions.length > 0 &&
|
||||
!activeSessions.includes(currentSessionKey)
|
||||
) {
|
||||
if (autoFix) {
|
||||
// Revert issue label
|
||||
const revertLabel: StateLabel = role === "dev" ? "To Do" : "To Test";
|
||||
const currentLabel: StateLabel = role === "dev" ? "Doing" : "Testing";
|
||||
try {
|
||||
if (worker.issueId) {
|
||||
const primaryIssueId = Number(worker.issueId.split(",")[0]);
|
||||
await provider.transitionLabel(primaryIssueId, currentLabel, revertLabel);
|
||||
}
|
||||
} catch {
|
||||
// Best-effort label revert
|
||||
}
|
||||
|
||||
// Clear the dead session
|
||||
const updatedSessions = { ...worker.sessions };
|
||||
if (worker.model) {
|
||||
updatedSessions[worker.model] = null;
|
||||
}
|
||||
|
||||
await updateWorker(workspaceDir, groupId, role, {
|
||||
active: false,
|
||||
issueId: null,
|
||||
sessions: updatedSessions,
|
||||
});
|
||||
}
|
||||
fixes.push({
|
||||
project: project.name,
|
||||
role,
|
||||
type: "zombie_session",
|
||||
fixed: autoFix,
|
||||
});
|
||||
}
|
||||
|
||||
// Check 3: Inactive but still has issueId
|
||||
if (!worker.active && worker.issueId) {
|
||||
if (autoFix) {
|
||||
await updateWorker(workspaceDir, groupId, role, {
|
||||
issueId: null,
|
||||
});
|
||||
}
|
||||
fixes.push({
|
||||
project: project.name,
|
||||
role,
|
||||
type: "inactive_with_issue",
|
||||
fixed: autoFix,
|
||||
});
|
||||
}
|
||||
|
||||
return fixes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get max workers for a role from project config (with defaults).
|
||||
*/
|
||||
function getMaxWorkers(project: Project, role: "dev" | "qa"): number {
|
||||
const key = role === "dev" ? "maxDevWorkers" : "maxQaWorkers";
|
||||
const value = (project as Record<string, unknown>)[key];
|
||||
return typeof value === "number" ? value : 1;
|
||||
}
|
||||
|
||||
export function createHeartbeatTickTool(api: OpenClawPluginApi) {
|
||||
return (ctx: ToolContext) => ({
|
||||
name: "heartbeat_tick",
|
||||
label: "Heartbeat Tick",
|
||||
description: `Automated task pickup across all projects. Runs session health checks, then picks up tasks by priority (To Improve > To Test > To Do). Respects work mode (parallel: each project independent, sequential: 1 DEV + 1 QA globally). Only works from DM/cron context, not project groups.`,
|
||||
parameters: {
|
||||
type: "object",
|
||||
properties: {
|
||||
dryRun: {
|
||||
type: "boolean",
|
||||
description: "Report what would happen without actually picking up tasks. Default: false.",
|
||||
},
|
||||
maxPickups: {
|
||||
type: "number",
|
||||
description: "Maximum number of task pickups per tick. Default: unlimited.",
|
||||
},
|
||||
activeSessions: {
|
||||
type: "array",
|
||||
items: { type: "string" },
|
||||
description: "List of currently alive session IDs from sessions_list. Used for zombie detection.",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
async execute(_id: string, params: Record<string, unknown>) {
|
||||
const dryRun = (params.dryRun as boolean) ?? false;
|
||||
const maxPickups = params.maxPickups as number | undefined;
|
||||
const activeSessions = (params.activeSessions as string[]) ?? [];
|
||||
const workspaceDir = ctx.workspaceDir;
|
||||
|
||||
if (!workspaceDir) {
|
||||
throw new Error("No workspace directory available in tool context");
|
||||
}
|
||||
|
||||
// --- Context detection ---
|
||||
const devClawAgentIds =
|
||||
((api.pluginConfig as Record<string, unknown>)?.devClawAgentIds as
|
||||
| string[]
|
||||
| undefined) ?? [];
|
||||
const context = await detectContext(ctx, devClawAgentIds);
|
||||
|
||||
// Only allow from DM or direct context (not project groups)
|
||||
if (context.type === "group") {
|
||||
return jsonResult({
|
||||
success: false,
|
||||
error: "heartbeat_tick cannot be used in project group chats.",
|
||||
recommendation: "Use this tool from a DM or cron context to manage all projects.",
|
||||
contextGuidance: generateGuardrails(context),
|
||||
});
|
||||
}
|
||||
|
||||
// Get work mode from plugin config
|
||||
const pluginConfig = api.pluginConfig as Record<string, unknown> | undefined;
|
||||
const workMode: WorkMode =
|
||||
(pluginConfig?.workMode as WorkMode) ?? "parallel";
|
||||
|
||||
const result: TickResult = {
|
||||
success: true,
|
||||
dryRun,
|
||||
workMode,
|
||||
healthFixes: [],
|
||||
pickups: [],
|
||||
skipped: [],
|
||||
};
|
||||
|
||||
// Read all projects
|
||||
const data = await readProjects(workspaceDir);
|
||||
const projectEntries = Object.entries(data.projects);
|
||||
|
||||
if (projectEntries.length === 0) {
|
||||
return jsonResult({
|
||||
...result,
|
||||
skipped: [{ project: "(none)", reason: "No projects registered" }],
|
||||
});
|
||||
}
|
||||
|
||||
// Track global worker counts for sequential mode
|
||||
let globalActiveDev = 0;
|
||||
let globalActiveQa = 0;
|
||||
let pickupCount = 0;
|
||||
|
||||
// First pass: count active workers and run health checks
|
||||
for (const [groupId, project] of projectEntries) {
|
||||
const { provider } = createProvider({ repo: project.repo });
|
||||
|
||||
// Health check for both roles
|
||||
for (const role of ["dev", "qa"] as const) {
|
||||
const fixes = await checkAndFixWorkerHealth(
|
||||
workspaceDir,
|
||||
groupId,
|
||||
project,
|
||||
role,
|
||||
activeSessions,
|
||||
!dryRun, // autoFix when not dryRun
|
||||
provider,
|
||||
);
|
||||
result.healthFixes.push(...fixes);
|
||||
}
|
||||
|
||||
// Re-read project after health fixes
|
||||
const refreshedData = await readProjects(workspaceDir);
|
||||
const refreshedProject = refreshedData.projects[groupId];
|
||||
if (refreshedProject) {
|
||||
if (refreshedProject.dev.active) globalActiveDev++;
|
||||
if (refreshedProject.qa.active) globalActiveQa++;
|
||||
}
|
||||
}
|
||||
|
||||
// Second pass: pick up tasks
|
||||
for (const [groupId, _project] of projectEntries) {
|
||||
// Re-read to get post-health-fix state
|
||||
const currentData = await readProjects(workspaceDir);
|
||||
const project = currentData.projects[groupId];
|
||||
if (!project) continue;
|
||||
|
||||
const { provider } = createProvider({ repo: project.repo });
|
||||
|
||||
// Check each role
|
||||
for (const role of ["dev", "qa"] as const) {
|
||||
// Check max pickups limit
|
||||
if (maxPickups !== undefined && pickupCount >= maxPickups) {
|
||||
result.skipped.push({
|
||||
project: project.name,
|
||||
reason: `Max pickups (${maxPickups}) reached`,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if worker slot is available
|
||||
const worker = getWorker(project, role);
|
||||
if (worker.active) {
|
||||
result.skipped.push({
|
||||
project: project.name,
|
||||
reason: `${role.toUpperCase()} already active (issue #${worker.issueId})`,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check max workers per project
|
||||
const maxWorkers = getMaxWorkers(project, role);
|
||||
// For now we only support 1 worker per role, but structure supports more
|
||||
if (maxWorkers < 1) {
|
||||
result.skipped.push({
|
||||
project: project.name,
|
||||
reason: `${role.toUpperCase()} disabled (maxWorkers=0)`,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Sequential mode: check global limits
|
||||
if (workMode === "sequential") {
|
||||
if (role === "dev" && globalActiveDev >= 1) {
|
||||
result.skipped.push({
|
||||
project: project.name,
|
||||
reason: "Sequential mode: DEV slot occupied globally",
|
||||
});
|
||||
continue;
|
||||
}
|
||||
if (role === "qa" && globalActiveQa >= 1) {
|
||||
result.skipped.push({
|
||||
project: project.name,
|
||||
reason: "Sequential mode: QA slot occupied globally",
|
||||
});
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Find next issue for this role
|
||||
const next = await findNextIssueForRole(provider, role);
|
||||
if (!next) {
|
||||
// No tasks available - not a skip, just nothing to do
|
||||
continue;
|
||||
}
|
||||
|
||||
const { issue, label: currentLabel } = next;
|
||||
const targetLabel: StateLabel = role === "dev" ? "Doing" : "Testing";
|
||||
|
||||
// Select model
|
||||
let modelAlias: string;
|
||||
const tierFromLabels = detectTierFromLabels(issue.labels);
|
||||
|
||||
if (tierFromLabels) {
|
||||
// Validate tier matches role
|
||||
if (role === "qa" && tierFromLabels !== "qa") {
|
||||
modelAlias = "qa";
|
||||
} else if (role === "dev" && tierFromLabels === "qa") {
|
||||
const selected = selectModel(issue.title, issue.description ?? "", role);
|
||||
modelAlias = selected.tier;
|
||||
} else {
|
||||
modelAlias = tierFromLabels;
|
||||
}
|
||||
} else {
|
||||
const selected = selectModel(issue.title, issue.description ?? "", role);
|
||||
modelAlias = selected.tier;
|
||||
}
|
||||
|
||||
if (dryRun) {
|
||||
// In dry run, just report what would happen
|
||||
result.pickups.push({
|
||||
project: project.name,
|
||||
groupId,
|
||||
issueId: issue.iid,
|
||||
issueTitle: issue.title,
|
||||
role,
|
||||
model: modelAlias,
|
||||
sessionAction: getSessionForModel(worker, modelAlias) ? "send" : "spawn",
|
||||
announcement: `[DRY RUN] Would pick up #${issue.iid}: ${issue.title}`,
|
||||
});
|
||||
pickupCount++;
|
||||
if (role === "dev") globalActiveDev++;
|
||||
if (role === "qa") globalActiveQa++;
|
||||
} else {
|
||||
// Actually dispatch
|
||||
try {
|
||||
const dispatchResult = await dispatchTask({
|
||||
workspaceDir,
|
||||
agentId: ctx.agentId,
|
||||
groupId,
|
||||
project,
|
||||
issueId: issue.iid,
|
||||
issueTitle: issue.title,
|
||||
issueDescription: issue.description ?? "",
|
||||
issueUrl: issue.web_url,
|
||||
role,
|
||||
modelAlias,
|
||||
fromLabel: currentLabel,
|
||||
toLabel: targetLabel,
|
||||
transitionLabel: (id, from, to) =>
|
||||
provider.transitionLabel(id, from as StateLabel, to as StateLabel),
|
||||
pluginConfig,
|
||||
});
|
||||
|
||||
result.pickups.push({
|
||||
project: project.name,
|
||||
groupId,
|
||||
issueId: issue.iid,
|
||||
issueTitle: issue.title,
|
||||
role,
|
||||
model: dispatchResult.modelAlias,
|
||||
sessionAction: dispatchResult.sessionAction,
|
||||
announcement: dispatchResult.announcement,
|
||||
});
|
||||
pickupCount++;
|
||||
if (role === "dev") globalActiveDev++;
|
||||
if (role === "qa") globalActiveQa++;
|
||||
} catch (err) {
|
||||
result.skipped.push({
|
||||
project: project.name,
|
||||
reason: `Dispatch failed for #${issue.iid}: ${(err as Error).message}`,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add global state for sequential mode visibility
|
||||
if (workMode === "sequential") {
|
||||
result.globalState = {
|
||||
activeDev: globalActiveDev,
|
||||
activeQa: globalActiveQa,
|
||||
};
|
||||
}
|
||||
|
||||
// Audit log
|
||||
await auditLog(workspaceDir, "heartbeat_tick", {
|
||||
dryRun,
|
||||
workMode,
|
||||
projectsScanned: projectEntries.length,
|
||||
healthFixes: result.healthFixes.length,
|
||||
pickups: result.pickups.length,
|
||||
skipped: result.skipped.length,
|
||||
});
|
||||
|
||||
return jsonResult(result);
|
||||
},
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user