Adds a notification system for full visibility into the DevClaw pipeline.

Events and targets:
- workerStart: posted to the project group when a worker spawns/resumes
- workerComplete: posted to the project group on DEV done / QA pass/fail/refine
- heartbeat: posted to the orchestrator DM with a tick summary

Implementation:
- New lib/notify.ts module with buildMessage() and sendMessage()
- Integrated into task_pickup, task_complete, and heartbeat_tick
- Uses the OpenClaw gateway to invoke the message tool

Configuration (optional):
- orchestratorDm: chat ID for heartbeat notifications
- notifications.heartbeatDm: enable/disable heartbeat DM (default: true)
- notifications.workerStart: enable/disable start notifications (default: true)
- notifications.workerComplete: enable/disable completion notifications (default: true)

Notifications fail silently: failures are logged but never break the main flow.
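As a sketch, the optional settings could be wired up as below. The type name and example values are hypothetical; only the keys and defaults come from this change.

```ts
// Hypothetical shape of the optional notification settings described above.
type DevClawNotifyConfig = {
  orchestratorDm?: string; // chat ID that receives heartbeat DMs
  notifications?: {
    heartbeatDm?: boolean; // default: true
    workerStart?: boolean; // default: true
    workerComplete?: boolean; // default: true
  };
};

// Example: keep start and heartbeat notifications, mute completion posts.
const config: DevClawNotifyConfig = {
  orchestratorDm: "123456789", // illustrative chat ID
  notifications: { workerComplete: false },
};
```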
530 lines
16 KiB
TypeScript
/**
 * heartbeat_tick — Automated task pickup across all projects.
 *
 * Runs on heartbeat/cron context:
 * 1. Clean zombie sessions (session_health logic)
 * 2. Loop over all projects
 * 3. Check worker slots per project
 * 4. Pick up tasks by priority (To Improve > To Test > To Do)
 * 5. Respect work mode (parallel vs sequential)
 * 6. Return summary of actions taken
 *
 * Context guard: Only allows from DM/cron context, blocks project groups.
 */
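/*
 * Illustrative example (values are hypothetical, not part of the tool
 * contract): a cron-driven dry run might be invoked with
 *
 *   { dryRun: true, activeSessions: ["agent:dev:main"] }
 *
 * and return a TickResult along these lines:
 *
 *   {
 *     success: true,
 *     dryRun: true,
 *     workMode: "parallel",
 *     healthFixes: [{ project: "api", role: "qa", type: "zombie_session", fixed: false }],
 *     pickups: [{ project: "api", issueId: 42, role: "dev", model: "medior", sessionAction: "spawn", ... }],
 *     skipped: [{ project: "web", reason: "DEV already active (issue #7)" }]
 *   }
 */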
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
import { jsonResult } from "openclaw/plugin-sdk";
import { dispatchTask } from "../dispatch.js";
import { type Issue, type StateLabel } from "../task-managers/task-manager.js";
import { createProvider } from "../task-managers/index.js";
import { selectModel } from "../model-selector.js";
import {
  getProject,
  getWorker,
  getSessionForModel,
  readProjects,
  updateWorker,
  type Project,
} from "../projects.js";
import type { ToolContext } from "../types.js";
import { detectContext, generateGuardrails } from "../context-guard.js";
import { type Tier } from "../tiers.js";
import { log as auditLog } from "../audit.js";
import { notify, getNotificationConfig } from "../notify.js";
/** Labels that map to DEV role */
const DEV_LABELS: StateLabel[] = ["To Do", "To Improve"];

/** Labels that map to QA role */
const QA_LABELS: StateLabel[] = ["To Test"];

/** All pickable labels, in priority order (highest first) */
const PRIORITY_ORDER: StateLabel[] = ["To Improve", "To Test", "To Do"];

/** Tier labels that can appear on issues */
const TIER_LABELS: Tier[] = ["junior", "medior", "senior", "qa"];

type WorkMode = "parallel" | "sequential";

type PickupAction = {
  project: string;
  groupId: string;
  issueId: number;
  issueTitle: string;
  role: "dev" | "qa";
  model: string;
  sessionAction: "spawn" | "send";
  announcement: string;
};

type HealthFix = {
  project: string;
  role: "dev" | "qa";
  type: string;
  fixed: boolean;
};

type TickResult = {
  success: boolean;
  dryRun: boolean;
  workMode: WorkMode;
  healthFixes: HealthFix[];
  pickups: PickupAction[];
  skipped: Array<{ project: string; reason: string }>;
  globalState?: { activeDev: number; activeQa: number };
};
/**
 * Detect role from issue's current state label.
 */
function detectRoleFromLabel(label: StateLabel): "dev" | "qa" | null {
  if (DEV_LABELS.includes(label)) return "dev";
  if (QA_LABELS.includes(label)) return "qa";
  return null;
}

/**
 * Detect tier from issue labels (e.g., "junior", "senior").
 */
function detectTierFromLabels(labels: string[]): Tier | null {
  const lowerLabels = labels.map((l) => l.toLowerCase());
  for (const tier of TIER_LABELS) {
    if (lowerLabels.includes(tier)) {
      return tier;
    }
  }
  return null;
}

/**
 * Find the next issue to pick up by priority for a specific role.
 */
async function findNextIssueForRole(
  provider: { listIssuesByLabel(label: StateLabel): Promise<Issue[]> },
  role: "dev" | "qa",
): Promise<{ issue: Issue; label: StateLabel } | null> {
  const labelsToCheck =
    role === "dev"
      ? PRIORITY_ORDER.filter((l) => DEV_LABELS.includes(l))
      : PRIORITY_ORDER.filter((l) => QA_LABELS.includes(l));

  for (const label of labelsToCheck) {
    try {
      const issues = await provider.listIssuesByLabel(label);
      if (issues.length > 0) {
        // FIFO: the provider is assumed to list newest-first, so the last item is the oldest
        const oldest = issues[issues.length - 1];
        return { issue: oldest, label };
      }
    } catch {
      // Continue to next label on error
    }
  }
  return null;
}
/**
 * Run health check logic for a single project/role.
 * Returns fixes applied (simplified version of session_health).
 */
async function checkAndFixWorkerHealth(
  workspaceDir: string,
  groupId: string,
  project: Project,
  role: "dev" | "qa",
  activeSessions: string[],
  autoFix: boolean,
  provider: { transitionLabel(id: number, from: StateLabel, to: StateLabel): Promise<void> },
): Promise<HealthFix[]> {
  const fixes: HealthFix[] = [];
  const worker = project[role];
  const currentSessionKey = worker.model
    ? getSessionForModel(worker, worker.model)
    : null;

  // Check 1: Active but no session key for current model
  if (worker.active && !currentSessionKey) {
    if (autoFix) {
      await updateWorker(workspaceDir, groupId, role, {
        active: false,
        issueId: null,
      });
    }
    fixes.push({
      project: project.name,
      role,
      type: "active_no_session",
      fixed: autoFix,
    });
  }

  // Check 2: Active with session but session is dead (zombie)
  if (
    worker.active &&
    currentSessionKey &&
    activeSessions.length > 0 &&
    !activeSessions.includes(currentSessionKey)
  ) {
    if (autoFix) {
      // Revert issue label
      const revertLabel: StateLabel = role === "dev" ? "To Do" : "To Test";
      const currentLabel: StateLabel = role === "dev" ? "Doing" : "Testing";
      try {
        if (worker.issueId) {
          const primaryIssueId = Number(worker.issueId.split(",")[0]);
          await provider.transitionLabel(primaryIssueId, currentLabel, revertLabel);
        }
      } catch {
        // Best-effort label revert
      }

      // Clear the dead session
      const updatedSessions = { ...worker.sessions };
      if (worker.model) {
        updatedSessions[worker.model] = null;
      }

      await updateWorker(workspaceDir, groupId, role, {
        active: false,
        issueId: null,
        sessions: updatedSessions,
      });
    }
    fixes.push({
      project: project.name,
      role,
      type: "zombie_session",
      fixed: autoFix,
    });
  }

  // Check 3: Inactive but still has issueId
  if (!worker.active && worker.issueId) {
    if (autoFix) {
      await updateWorker(workspaceDir, groupId, role, {
        issueId: null,
      });
    }
    fixes.push({
      project: project.name,
      role,
      type: "inactive_with_issue",
      fixed: autoFix,
    });
  }

  return fixes;
}
/**
 * Get max workers for a role from project config (with defaults).
 */
function getMaxWorkers(project: Project, role: "dev" | "qa"): number {
  const key = role === "dev" ? "maxDevWorkers" : "maxQaWorkers";
  const value = (project as Record<string, unknown>)[key];
  return typeof value === "number" ? value : 1;
}
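// Example (illustrative): a project entry with { maxDevWorkers: 0 } has DEV
// pickup disabled entirely, while a project that omits both keys defaults to
// one DEV and one QA worker.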
export function createHeartbeatTickTool(api: OpenClawPluginApi) {
  return (ctx: ToolContext) => ({
    name: "heartbeat_tick",
    label: "Heartbeat Tick",
    description: `Automated task pickup across all projects. Runs session health checks, then picks up tasks by priority (To Improve > To Test > To Do). Respects work mode (parallel: each project independent; sequential: 1 DEV + 1 QA globally). Only works from DM/cron context, not project groups.`,
    parameters: {
      type: "object",
      properties: {
        dryRun: {
          type: "boolean",
          description: "Report what would happen without actually picking up tasks. Default: false.",
        },
        maxPickups: {
          type: "number",
          description: "Maximum number of task pickups per tick. Default: unlimited.",
        },
        activeSessions: {
          type: "array",
          items: { type: "string" },
          description: "List of currently alive session IDs from sessions_list. Used for zombie detection.",
        },
      },
    },
    async execute(_id: string, params: Record<string, unknown>) {
      const dryRun = (params.dryRun as boolean) ?? false;
      const maxPickups = params.maxPickups as number | undefined;
      const activeSessions = (params.activeSessions as string[]) ?? [];
      const workspaceDir = ctx.workspaceDir;

      if (!workspaceDir) {
        throw new Error("No workspace directory available in tool context");
      }

      // --- Context detection ---
      const devClawAgentIds =
        ((api.pluginConfig as Record<string, unknown>)?.devClawAgentIds as
          | string[]
          | undefined) ?? [];
      const context = await detectContext(ctx, devClawAgentIds);

      // Only allow from DM or direct context (not project groups)
      if (context.type === "group") {
        return jsonResult({
          success: false,
          error: "heartbeat_tick cannot be used in project group chats.",
          recommendation: "Use this tool from a DM or cron context to manage all projects.",
          contextGuidance: generateGuardrails(context),
        });
      }

      // Get work mode from plugin config
      const pluginConfig = api.pluginConfig as Record<string, unknown> | undefined;
      const workMode: WorkMode =
        (pluginConfig?.workMode as WorkMode) ?? "parallel";

      const result: TickResult = {
        success: true,
        dryRun,
        workMode,
        healthFixes: [],
        pickups: [],
        skipped: [],
      };

      // Read all projects
      const data = await readProjects(workspaceDir);
      const projectEntries = Object.entries(data.projects);

      if (projectEntries.length === 0) {
        return jsonResult({
          ...result,
          skipped: [{ project: "(none)", reason: "No projects registered" }],
        });
      }

      // Track global worker counts for sequential mode
      let globalActiveDev = 0;
      let globalActiveQa = 0;
      let pickupCount = 0;

      // First pass: count active workers and run health checks
      for (const [groupId, project] of projectEntries) {
        const { provider } = createProvider({ repo: project.repo });

        // Health check for both roles
        for (const role of ["dev", "qa"] as const) {
          const fixes = await checkAndFixWorkerHealth(
            workspaceDir,
            groupId,
            project,
            role,
            activeSessions,
            !dryRun, // autoFix when not dryRun
            provider,
          );
          result.healthFixes.push(...fixes);
        }

        // Re-read project after health fixes
        const refreshedData = await readProjects(workspaceDir);
        const refreshedProject = refreshedData.projects[groupId];
        if (refreshedProject) {
          if (refreshedProject.dev.active) globalActiveDev++;
          if (refreshedProject.qa.active) globalActiveQa++;
        }
      }
      // Second pass: pick up tasks
      for (const [groupId, _project] of projectEntries) {
        // Re-read to get post-health-fix state
        const currentData = await readProjects(workspaceDir);
        const project = currentData.projects[groupId];
        if (!project) continue;

        const { provider } = createProvider({ repo: project.repo });

        // Check each role
        for (const role of ["dev", "qa"] as const) {
          // Check max pickups limit
          if (maxPickups !== undefined && pickupCount >= maxPickups) {
            result.skipped.push({
              project: project.name,
              reason: `Max pickups (${maxPickups}) reached`,
            });
            continue;
          }

          // Check if worker slot is available
          const worker = getWorker(project, role);
          if (worker.active) {
            result.skipped.push({
              project: project.name,
              reason: `${role.toUpperCase()} already active (issue #${worker.issueId})`,
            });
            continue;
          }

          // Check max workers per project
          const maxWorkers = getMaxWorkers(project, role);
          // For now we only support 1 worker per role, but the structure supports more
          if (maxWorkers < 1) {
            result.skipped.push({
              project: project.name,
              reason: `${role.toUpperCase()} disabled (maxWorkers=0)`,
            });
            continue;
          }

          // Sequential mode: check global limits
          if (workMode === "sequential") {
            if (role === "dev" && globalActiveDev >= 1) {
              result.skipped.push({
                project: project.name,
                reason: "Sequential mode: DEV slot occupied globally",
              });
              continue;
            }
            if (role === "qa" && globalActiveQa >= 1) {
              result.skipped.push({
                project: project.name,
                reason: "Sequential mode: QA slot occupied globally",
              });
              continue;
            }
          }

          // Find next issue for this role
          const next = await findNextIssueForRole(provider, role);
          if (!next) {
            // No tasks available - not a skip, just nothing to do
            continue;
          }

          const { issue, label: currentLabel } = next;
          const targetLabel: StateLabel = role === "dev" ? "Doing" : "Testing";

          // Select model
          let modelAlias: string;
          const tierFromLabels = detectTierFromLabels(issue.labels);

          if (tierFromLabels) {
            // Validate tier matches role
            if (role === "qa" && tierFromLabels !== "qa") {
              modelAlias = "qa";
            } else if (role === "dev" && tierFromLabels === "qa") {
              const selected = selectModel(issue.title, issue.description ?? "", role);
              modelAlias = selected.tier;
            } else {
              modelAlias = tierFromLabels;
            }
          } else {
            const selected = selectModel(issue.title, issue.description ?? "", role);
            modelAlias = selected.tier;
          }

          if (dryRun) {
            // In dry run, just report what would happen
            result.pickups.push({
              project: project.name,
              groupId,
              issueId: issue.iid,
              issueTitle: issue.title,
              role,
              model: modelAlias,
              sessionAction: getSessionForModel(worker, modelAlias) ? "send" : "spawn",
              announcement: `[DRY RUN] Would pick up #${issue.iid}: ${issue.title}`,
            });
            pickupCount++;
            if (role === "dev") globalActiveDev++;
            if (role === "qa") globalActiveQa++;
          } else {
            // Actually dispatch
            try {
              const dispatchResult = await dispatchTask({
                workspaceDir,
                agentId: ctx.agentId,
                groupId,
                project,
                issueId: issue.iid,
                issueTitle: issue.title,
                issueDescription: issue.description ?? "",
                issueUrl: issue.web_url,
                role,
                modelAlias,
                fromLabel: currentLabel,
                toLabel: targetLabel,
                transitionLabel: (id, from, to) =>
                  provider.transitionLabel(id, from as StateLabel, to as StateLabel),
                pluginConfig,
              });

              result.pickups.push({
                project: project.name,
                groupId,
                issueId: issue.iid,
                issueTitle: issue.title,
                role,
                model: dispatchResult.modelAlias,
                sessionAction: dispatchResult.sessionAction,
                announcement: dispatchResult.announcement,
              });
              pickupCount++;
              if (role === "dev") globalActiveDev++;
              if (role === "qa") globalActiveQa++;
            } catch (err) {
              result.skipped.push({
                project: project.name,
                reason: `Dispatch failed for #${issue.iid}: ${(err as Error).message}`,
              });
            }
          }
        }
      }
      // Add global state for sequential mode visibility
      if (workMode === "sequential") {
        result.globalState = {
          activeDev: globalActiveDev,
          activeQa: globalActiveQa,
        };
      }

      // Audit log
      await auditLog(workspaceDir, "heartbeat_tick", {
        dryRun,
        workMode,
        projectsScanned: projectEntries.length,
        healthFixes: result.healthFixes.length,
        pickups: result.pickups.length,
        skipped: result.skipped.length,
      });

      // Send heartbeat notification to orchestrator DM
      const notifyConfig = getNotificationConfig(pluginConfig);
      const orchestratorDm = pluginConfig?.orchestratorDm as string | undefined;

      await notify(
        {
          type: "heartbeat",
          projectsScanned: projectEntries.length,
          healthFixes: result.healthFixes.length,
          pickups: result.pickups.length,
          skipped: result.skipped.length,
          dryRun,
          pickupDetails: result.pickups.map((p) => ({
            project: p.project,
            issueId: p.issueId,
            role: p.role,
          })),
        },
        {
          workspaceDir,
          config: notifyConfig,
          orchestratorDm,
          channel: "telegram",
        },
      );

      return jsonResult(result);
    },
  });
}