From 24b35b3a3e1a7e2b702810e766319d9035b9a625 Mon Sep 17 00:00:00 2001 From: Lauren ten Hoor <32955832+laurentenhoor@users.noreply.github.com> Date: Fri, 13 Feb 2026 17:29:25 +0800 Subject: [PATCH] fix: dispatch timeout causing missed Telegram notifications (#153) (#154) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem `dispatchTask()` shells out to `openclaw gateway call sessions.patch` which times out when the gateway is busy, causing: 1. Notifications never fire (they're at the end of dispatchTask) 2. Worker state may not be recorded 3. Workers run silently ## Solution (3 changes) ### 1. Make `ensureSession` fire-and-forget Session key is deterministic, so we don't need to wait for confirmation. Health check catches orphaned state later. ### 2. Use runtime API for notifications instead of CLI Pass `runtime` through opts and use direct API calls: - `runtime.channel.telegram.sendMessageTelegram()` - `runtime.channel.whatsapp.sendMessageWhatsApp()` - etc. ### 3. Move notification before session dispatch Fire workerStart/workerComplete notifications early (after label transition) before the session calls that can timeout. ## Files Changed - lib/dispatch.ts — fire-and-forget ensureSession, early notification, accept runtime - lib/notify.ts — use runtime API for direct channel sends - lib/services/pipeline.ts — early notification, accept runtime - lib/services/tick.ts — pass runtime through to dispatchTask - lib/tool-helpers.ts — accept runtime in tickAndNotify - lib/tools/work-start.ts — pass api.runtime to dispatchTask - lib/tools/work-finish.ts — pass api.runtime to executeCompletion/tickAndNotify --- lib/dispatch.ts | 140 ++++++++++++++++++--------------------- lib/notify.ts | 41 ++++++++++-- lib/services/pipeline.ts | 45 +++++++------ lib/services/tick.ts | 6 +- lib/tool-helpers.ts | 5 +- lib/tools/work-finish.ts | 6 +- lib/tools/work-start.ts | 3 +- 7 files changed, 140 insertions(+), 106 deletions(-) diff --git a/lib/dispatch.ts b/lib/dispatch.ts index a00b128..589bc1c 100644 --- a/lib/dispatch.ts +++ b/lib/dispatch.ts @@ -6,6 +6,7 @@ */ import fs from "node:fs/promises"; import path from "node:path"; +import type { PluginRuntime } from "openclaw/plugin-sdk"; import { log as auditLog } from "./audit.js"; import { runCommand } from "./run-command.js"; import { @@ -43,6 +44,8 @@ export type DispatchOpts = { channel?: string; /** Orchestrator's session key (used as spawnedBy for subagent tracking) */ sessionKey?: string; + /** Plugin runtime for direct API access (avoids CLI subprocess timeouts) */ + runtime?: PluginRuntime; }; export type DispatchResult = { @@ -131,8 +134,14 @@ export async function buildTaskMessage(opts: { /** * Dispatch a task to a worker session. * - * Flow: resolve model → build message → transition label → spawn/send session - * → update worker state → audit → build announcement. + * Flow: + * 1. Resolve model and session key + * 2. Build task message + * 3. Transition label + * 4. Fire notification (early — before session dispatch which can timeout) + * 5. Ensure session (fire-and-forget) + send to agent + * 6. Update worker state + * 7. Audit * * On dispatch failure: rolls back label transition. * On state update failure after dispatch: logs warning (session IS running). @@ -143,7 +152,7 @@ export async function dispatchTask( const { workspaceDir, agentId, groupId, project, issueId, issueTitle, issueDescription, issueUrl, role, level, fromLabel, toLabel, - transitionLabel, provider, pluginConfig, + transitionLabel, provider, pluginConfig, runtime, } = opts; const model = resolveModel(role, level, pluginConfig); @@ -151,6 +160,9 @@ export async function dispatchTask( const existingSessionKey = getSessionForLevel(worker, level); const sessionAction = existingSessionKey ? "send" : "spawn"; + // Compute session key deterministically (avoids waiting for gateway) + const sessionKey = `agent:${agentId ?? "unknown"}:subagent:${project.name}-${role}-${level}`; + // Fetch comments to include in task context const comments = await provider.listComments(issueId); @@ -161,55 +173,13 @@ export async function dispatchTask( comments, }); + // Step 1: Transition label (this is the commitment point) await transitionLabel(issueId, fromLabel, toLabel); - let dispatched = false; - let session: { key: string; action: "spawn" | "send" }; - - try { - session = await ensureSession(sessionAction, existingSessionKey, { - agentId, projectName: project.name, role, level, model, - }); - - sendToAgent(session.key, taskMessage, { - agentId, projectName: project.name, issueId, role, - orchestratorSessionKey: opts.sessionKey, - }); - - dispatched = true; - - // Always store session key — a "send" may have fallen back to "spawn" - await recordWorkerState(workspaceDir, groupId, role, { - issueId, level, sessionKey: session.key, sessionAction: session.action, - }); - } catch (err) { - if (dispatched) { - await auditLog(workspaceDir, "work_start", { - project: project.name, groupId, issue: issueId, role, - warning: "State update failed after successful dispatch", - error: (err as Error).message, sessionKey: session!.key, - }); - throw new Error( - `State update failed after successful session dispatch: ${(err as Error).message}. Session is running but projects.json was not updated.`, - ); - } - try { await transitionLabel(issueId, toLabel, fromLabel); } catch { /* best-effort rollback */ } - throw new Error( - `Session dispatch failed: ${(err as Error).message}. Label reverted to "${fromLabel}".`, - ); - } - - await auditDispatch(workspaceDir, { - project: project.name, groupId, issueId, issueTitle, - role, level, model, sessionAction: session.action, sessionKey: session.key, - fromLabel, toLabel, - }); - - const announcement = buildAnnouncement(level, role, session.action, issueId, issueTitle, issueUrl); - - // Notify workerStart (non-fatal) + // Step 2: Send notification early (before session dispatch which can timeout) + // This ensures users see the notification even if gateway is slow const notifyConfig = getNotificationConfig(pluginConfig); - await notify( + notify( { type: "workerStart", project: project.name, @@ -219,17 +189,51 @@ export async function dispatchTask( issueUrl, role, level, - sessionAction: session.action, + sessionAction, }, { workspaceDir, config: notifyConfig, groupId, channel: opts.channel ?? "telegram", + runtime, }, ).catch(() => { /* non-fatal */ }); - return { sessionAction: session.action, sessionKey: session.key, level, model, announcement }; + // Step 3: Ensure session exists (fire-and-forget — don't wait for gateway) + // Session key is deterministic, so we can proceed immediately + ensureSessionFireAndForget(sessionKey, model); + + // Step 4: Send task to agent (fire-and-forget) + sendToAgent(sessionKey, taskMessage, { + agentId, projectName: project.name, issueId, role, + orchestratorSessionKey: opts.sessionKey, + }); + + // Step 5: Update worker state + try { + await recordWorkerState(workspaceDir, groupId, role, { + issueId, level, sessionKey, sessionAction, + }); + } catch (err) { + // Session is already dispatched — log warning but don't fail + await auditLog(workspaceDir, "work_start", { + project: project.name, groupId, issue: issueId, role, + warning: "State update failed after successful dispatch", + error: (err as Error).message, sessionKey, + }); + } + + // Step 6: Audit + await auditDispatch(workspaceDir, { + project: project.name, groupId, issueId, issueTitle, + role, level, model, sessionAction, sessionKey, + fromLabel, toLabel, + }); + + const announcement = buildAnnouncement(level, role, sessionAction, issueId, issueTitle, issueUrl); + + return { sessionAction, sessionKey, level, model, announcement }; } // --------------------------------------------------------------------------- @@ -246,32 +250,16 @@ async function loadRoleInstructions( return ""; } -async function ensureSession( - action: "spawn" | "send", - existingKey: string | null, - opts: { agentId?: string; projectName: string; role: string; level: string; model: string }, -): Promise<{ key: string; action: "spawn" | "send" }> { - const expectedKey = `agent:${opts.agentId ?? "unknown"}:subagent:${opts.projectName}-${opts.role}-${opts.level}`; - - // Reuse: validate stored key matches expected format, then verify session exists - if (action === "send" && existingKey === expectedKey) { - try { - await runCommand( - ["openclaw", "gateway", "call", "sessions.patch", "--params", JSON.stringify({ key: existingKey, model: opts.model })], - { timeoutMs: 30_000 }, - ); - return { key: existingKey, action: "send" }; - } catch { - // Session gone (deleted, cleanup, etc.) — fall through to spawn - } - } - - // Spawn: create fresh session (also handles stale/mismatched keys) - await runCommand( - ["openclaw", "gateway", "call", "sessions.patch", "--params", JSON.stringify({ key: expectedKey, model: opts.model })], +/** + * Fire-and-forget session creation/update. + * Session key is deterministic, so we don't need to wait for confirmation. + * If this fails, health check will catch orphaned state later. + */ +function ensureSessionFireAndForget(sessionKey: string, model: string): void { + runCommand( + ["openclaw", "gateway", "call", "sessions.patch", "--params", JSON.stringify({ key: sessionKey, model })], { timeoutMs: 30_000 }, - ); - return { key: expectedKey, action: "spawn" }; + ).catch(() => { /* fire-and-forget */ }); } function sendToAgent( diff --git a/lib/notify.ts b/lib/notify.ts index c8f76f4..3b26f07 100644 --- a/lib/notify.ts +++ b/lib/notify.ts @@ -8,7 +8,7 @@ * - workerComplete: Worker completed task (→ project group) */ import { log as auditLog } from "./audit.js"; -import { runCommand } from "./run-command.js"; +import type { PluginRuntime } from "openclaw/plugin-sdk"; /** Per-event-type toggle. All default to true — set to false to suppress. */ export type NotificationConfig = Partial>; @@ -78,19 +78,46 @@ function buildMessage(event: NotifyEvent): string { } /** - * Send a notification message via the native OpenClaw messaging CLI. + * Send a notification message via the plugin runtime API. * - * Uses `openclaw message send` which handles target resolution, chunking, - * retries, and error reporting for all supported channels. - * Fails silently (logs error but doesn't throw) to avoid breaking the main flow. + * Uses the runtime's native send functions to bypass CLI → WebSocket timeouts. + * Falls back gracefully on error (notifications shouldn't break the main flow). */ async function sendMessage( target: string, message: string, channel: string, workspaceDir: string, + runtime?: PluginRuntime, ): Promise { try { + // Use runtime API when available (avoids CLI subprocess timeouts) + if (runtime) { + if (channel === "telegram") { + await runtime.channel.telegram.sendMessageTelegram(target, message, { silent: true }); + return true; + } + if (channel === "whatsapp") { + await runtime.channel.whatsapp.sendMessageWhatsApp(target, message, { verbose: false }); + return true; + } + if (channel === "discord") { + await runtime.channel.discord.sendMessageDiscord(target, message); + return true; + } + if (channel === "slack") { + await runtime.channel.slack.sendMessageSlack(target, message); + return true; + } + if (channel === "signal") { + await runtime.channel.signal.sendMessageSignal(target, message); + return true; + } + } + + // Fallback: use CLI (for unsupported channels or when runtime isn't available) + // Import lazily to avoid circular dependency issues + const { runCommand } = await import("./run-command.js"); await runCommand( [ "openclaw", @@ -132,6 +159,8 @@ export async function notify( groupId?: string; /** Channel type for routing (e.g. "telegram", "whatsapp", "discord", "slack") */ channel?: string; + /** Plugin runtime for direct API access (avoids CLI subprocess timeouts) */ + runtime?: PluginRuntime; }, ): Promise { if (opts.config?.[event.type] === false) return true; @@ -155,7 +184,7 @@ export async function notify( message, }); - return sendMessage(target, message, channel, opts.workspaceDir); + return sendMessage(target, message, channel, opts.workspaceDir, opts.runtime); } /** diff --git a/lib/services/pipeline.ts b/lib/services/pipeline.ts index 19c6389..ba2559f 100644 --- a/lib/services/pipeline.ts +++ b/lib/services/pipeline.ts @@ -3,6 +3,7 @@ * * Replaces 7 if-blocks with a data-driven lookup table. */ +import type { PluginRuntime } from "openclaw/plugin-sdk"; import type { StateLabel, IssueProvider } from "../providers/provider.js"; import { deactivateWorker } from "../projects.js"; import { runCommand } from "../run-command.js"; @@ -74,8 +75,10 @@ export async function executeCompletion(opts: { projectName: string; channel?: string; pluginConfig?: Record; + /** Plugin runtime for direct API access (avoids CLI subprocess timeouts) */ + runtime?: PluginRuntime; }): Promise { - const { workspaceDir, groupId, role, result, issueId, summary, provider, repoPath, projectName, channel, pluginConfig } = opts; + const { workspaceDir, groupId, role, result, issueId, summary, provider, repoPath, projectName, channel, pluginConfig, runtime } = opts; const key = `${role}:${result}`; const rule = COMPLETION_RULES[key]; if (!rule) throw new Error(`No completion rule for ${key}`); @@ -94,27 +97,13 @@ export async function executeCompletion(opts: { try { prUrl = await provider.getMergedMRUrl(issueId) ?? undefined; } catch { /* ignore */ } } - // Deactivate worker + transition label - await deactivateWorker(workspaceDir, groupId, role); - await provider.transitionLabel(issueId, rule.from, rule.to); - - // Close/reopen - if (rule.closeIssue) await provider.closeIssue(issueId); - if (rule.reopenIssue) await provider.reopenIssue(issueId); - - // Build announcement + // Get issue early (for URL in notification) const issue = await provider.getIssue(issueId); - const emoji = EMOJI[key] ?? "📋"; - const label = key.replace(":", " ").toUpperCase(); - let announcement = `${emoji} ${label} #${issueId}`; - if (summary) announcement += ` — ${summary}`; - announcement += `\n📋 Issue: ${issue.web_url}`; - if (prUrl) announcement += `\n🔗 PR: ${prUrl}`; - announcement += `\n${NEXT_STATE[key]}.`; - // Notify workerComplete (non-fatal) + // Send notification early (before deactivation and label transition which can fail) + // This ensures users see the notification even if subsequent steps have issues const notifyConfig = getNotificationConfig(pluginConfig); - await notify( + notify( { type: "workerComplete", project: projectName, @@ -131,9 +120,27 @@ export async function executeCompletion(opts: { config: notifyConfig, groupId, channel: channel ?? "telegram", + runtime, }, ).catch(() => { /* non-fatal */ }); + // Deactivate worker + transition label + await deactivateWorker(workspaceDir, groupId, role); + await provider.transitionLabel(issueId, rule.from, rule.to); + + // Close/reopen + if (rule.closeIssue) await provider.closeIssue(issueId); + if (rule.reopenIssue) await provider.reopenIssue(issueId); + + // Build announcement + const emoji = EMOJI[key] ?? "📋"; + const label = key.replace(":", " ").toUpperCase(); + let announcement = `${emoji} ${label} #${issueId}`; + if (summary) announcement += ` — ${summary}`; + announcement += `\n📋 Issue: ${issue.web_url}`; + if (prUrl) announcement += `\n🔗 PR: ${prUrl}`; + announcement += `\n${NEXT_STATE[key]}.`; + return { labelTransition: `${rule.from} → ${rule.to}`, announcement, diff --git a/lib/services/tick.ts b/lib/services/tick.ts index 7fe89c1..5da132d 100644 --- a/lib/services/tick.ts +++ b/lib/services/tick.ts @@ -4,6 +4,7 @@ * Core function: projectTick() scans one project's queue and fills free worker slots. * Called by: work_start (fill parallel slot), work_finish (next pipeline step), heartbeat service (sweep). */ +import type { PluginRuntime } from "openclaw/plugin-sdk"; import type { Issue, StateLabel } from "../providers/provider.js"; import type { IssueProvider } from "../providers/provider.js"; import { createProvider } from "../providers/index.js"; @@ -118,8 +119,10 @@ export async function projectTick(opts: { targetRole?: "dev" | "qa"; /** Optional provider override (for testing). Uses createProvider if omitted. */ provider?: Pick; + /** Plugin runtime for direct API access (avoids CLI subprocess timeouts) */ + runtime?: PluginRuntime; }): Promise { - const { workspaceDir, groupId, agentId, sessionKey, pluginConfig, dryRun, maxPickups, targetRole } = opts; + const { workspaceDir, groupId, agentId, sessionKey, pluginConfig, dryRun, maxPickups, targetRole, runtime } = opts; const project = (await readProjects(workspaceDir)).projects[groupId]; if (!project) return { pickups: [], skipped: [{ reason: `Project not found: ${groupId}` }] }; @@ -179,6 +182,7 @@ export async function projectTick(opts: { pluginConfig, channel: fresh.channel, sessionKey, + runtime, }); pickups.push({ project: project.name, groupId, issueId: issue.iid, issueTitle: issue.title, issueUrl: issue.web_url, diff --git a/lib/tool-helpers.ts b/lib/tool-helpers.ts index 07be321..b986e86 100644 --- a/lib/tool-helpers.ts +++ b/lib/tool-helpers.ts @@ -4,7 +4,7 @@ * Eliminates repeated boilerplate across tools: workspace validation, * project resolution, provider creation. */ -import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; +import type { OpenClawPluginApi, PluginRuntime } from "openclaw/plugin-sdk"; import type { ToolContext } from "./types.js"; import { readProjects, getProject, type Project, type ProjectsData } from "./projects.js"; import { createProvider, type ProviderWithType } from "./providers/index.js"; @@ -60,6 +60,8 @@ export async function tickAndNotify(opts: { pluginConfig?: Record; sessionKey?: string; targetRole?: "dev" | "qa"; + /** Plugin runtime for direct API access (avoids CLI subprocess timeouts) */ + runtime?: PluginRuntime; }): Promise { try { const result = await projectTick({ @@ -69,6 +71,7 @@ export async function tickAndNotify(opts: { pluginConfig: opts.pluginConfig, sessionKey: opts.sessionKey, targetRole: opts.targetRole, + runtime: opts.runtime, }); return result.pickups; } catch { diff --git a/lib/tools/work-finish.ts b/lib/tools/work-finish.ts index 1ab788d..492a5f7 100644 --- a/lib/tools/work-finish.ts +++ b/lib/tools/work-finish.ts @@ -59,12 +59,13 @@ export function createWorkFinishTool(api: OpenClawPluginApi) { const pluginConfig = getPluginConfig(api); - // Execute completion (pipeline service handles notification) + // Execute completion (pipeline service handles notification with runtime) const completion = await executeCompletion({ workspaceDir, groupId, role, result, issueId, summary, prUrl, provider, repoPath, projectName: project.name, channel: project.channel, pluginConfig, + runtime: api.runtime, }); const output: Record = { @@ -72,9 +73,10 @@ export function createWorkFinishTool(api: OpenClawPluginApi) { ...completion, }; - // Tick: fill free slots (notifications handled by dispatchTask) + // Tick: fill free slots (notifications handled by dispatchTask with runtime) const tickPickups = await tickAndNotify({ workspaceDir, groupId, agentId: ctx.agentId, pluginConfig, sessionKey: ctx.sessionKey, + runtime: api.runtime, }); if (tickPickups.length) output.tickPickups = tickPickups; diff --git a/lib/tools/work-start.ts b/lib/tools/work-start.ts index 96eab97..99c57ad 100644 --- a/lib/tools/work-start.ts +++ b/lib/tools/work-start.ts @@ -89,7 +89,7 @@ export function createWorkStartTool(api: OpenClawPluginApi) { } } - // Dispatch + // Dispatch (pass runtime for direct API access) const pluginConfig = getPluginConfig(api); const dr = await dispatchTask({ workspaceDir, agentId: ctx.agentId, groupId, project, issueId: issue.iid, @@ -100,6 +100,7 @@ export function createWorkStartTool(api: OpenClawPluginApi) { pluginConfig, channel: project.channel, sessionKey: ctx.sessionKey, + runtime: api.runtime, }); // Auto-tick disabled per issue #125 - work_start should only pick up the explicitly requested issue