## Problem `dispatchTask()` shells out to `openclaw gateway call sessions.patch` which times out when the gateway is busy, causing: 1. Notifications never fire (they're at the end of dispatchTask) 2. Worker state may not be recorded 3. Workers run silently ## Solution (3 changes) ### 1. Make `ensureSession` fire-and-forget Session key is deterministic, so we don't need to wait for confirmation. Health check catches orphaned state later. ### 2. Use runtime API for notifications instead of CLI Pass `runtime` through opts and use direct API calls: - `runtime.channel.telegram.sendMessageTelegram()` - `runtime.channel.whatsapp.sendMessageWhatsApp()` - etc. ### 3. Move notification before session dispatch Fire workerStart/workerComplete notifications early (after label transition) before the session calls that can timeout. ## Files Changed - lib/dispatch.ts — fire-and-forget ensureSession, early notification, accept runtime - lib/notify.ts — use runtime API for direct channel sends - lib/services/pipeline.ts — early notification, accept runtime - lib/services/tick.ts — pass runtime through to dispatchTask - lib/tool-helpers.ts — accept runtime in tickAndNotify - lib/tools/work-start.ts — pass api.runtime to dispatchTask - lib/tools/work-finish.ts — pass api.runtime to executeCompletion/tickAndNotify
This commit is contained in:
140
lib/dispatch.ts
140
lib/dispatch.ts
@@ -6,6 +6,7 @@
|
||||
*/
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import type { PluginRuntime } from "openclaw/plugin-sdk";
|
||||
import { log as auditLog } from "./audit.js";
|
||||
import { runCommand } from "./run-command.js";
|
||||
import {
|
||||
@@ -43,6 +44,8 @@ export type DispatchOpts = {
|
||||
channel?: string;
|
||||
/** Orchestrator's session key (used as spawnedBy for subagent tracking) */
|
||||
sessionKey?: string;
|
||||
/** Plugin runtime for direct API access (avoids CLI subprocess timeouts) */
|
||||
runtime?: PluginRuntime;
|
||||
};
|
||||
|
||||
export type DispatchResult = {
|
||||
@@ -131,8 +134,14 @@ export async function buildTaskMessage(opts: {
|
||||
/**
|
||||
* Dispatch a task to a worker session.
|
||||
*
|
||||
* Flow: resolve model → build message → transition label → spawn/send session
|
||||
* → update worker state → audit → build announcement.
|
||||
* Flow:
|
||||
* 1. Resolve model and session key
|
||||
* 2. Build task message
|
||||
* 3. Transition label
|
||||
* 4. Fire notification (early — before session dispatch which can timeout)
|
||||
* 5. Ensure session (fire-and-forget) + send to agent
|
||||
* 6. Update worker state
|
||||
* 7. Audit
|
||||
*
|
||||
* On dispatch failure: rolls back label transition.
|
||||
* On state update failure after dispatch: logs warning (session IS running).
|
||||
@@ -143,7 +152,7 @@ export async function dispatchTask(
|
||||
const {
|
||||
workspaceDir, agentId, groupId, project, issueId, issueTitle,
|
||||
issueDescription, issueUrl, role, level, fromLabel, toLabel,
|
||||
transitionLabel, provider, pluginConfig,
|
||||
transitionLabel, provider, pluginConfig, runtime,
|
||||
} = opts;
|
||||
|
||||
const model = resolveModel(role, level, pluginConfig);
|
||||
@@ -151,6 +160,9 @@ export async function dispatchTask(
|
||||
const existingSessionKey = getSessionForLevel(worker, level);
|
||||
const sessionAction = existingSessionKey ? "send" : "spawn";
|
||||
|
||||
// Compute session key deterministically (avoids waiting for gateway)
|
||||
const sessionKey = `agent:${agentId ?? "unknown"}:subagent:${project.name}-${role}-${level}`;
|
||||
|
||||
// Fetch comments to include in task context
|
||||
const comments = await provider.listComments(issueId);
|
||||
|
||||
@@ -161,55 +173,13 @@ export async function dispatchTask(
|
||||
comments,
|
||||
});
|
||||
|
||||
// Step 1: Transition label (this is the commitment point)
|
||||
await transitionLabel(issueId, fromLabel, toLabel);
|
||||
|
||||
let dispatched = false;
|
||||
let session: { key: string; action: "spawn" | "send" };
|
||||
|
||||
try {
|
||||
session = await ensureSession(sessionAction, existingSessionKey, {
|
||||
agentId, projectName: project.name, role, level, model,
|
||||
});
|
||||
|
||||
sendToAgent(session.key, taskMessage, {
|
||||
agentId, projectName: project.name, issueId, role,
|
||||
orchestratorSessionKey: opts.sessionKey,
|
||||
});
|
||||
|
||||
dispatched = true;
|
||||
|
||||
// Always store session key — a "send" may have fallen back to "spawn"
|
||||
await recordWorkerState(workspaceDir, groupId, role, {
|
||||
issueId, level, sessionKey: session.key, sessionAction: session.action,
|
||||
});
|
||||
} catch (err) {
|
||||
if (dispatched) {
|
||||
await auditLog(workspaceDir, "work_start", {
|
||||
project: project.name, groupId, issue: issueId, role,
|
||||
warning: "State update failed after successful dispatch",
|
||||
error: (err as Error).message, sessionKey: session!.key,
|
||||
});
|
||||
throw new Error(
|
||||
`State update failed after successful session dispatch: ${(err as Error).message}. Session is running but projects.json was not updated.`,
|
||||
);
|
||||
}
|
||||
try { await transitionLabel(issueId, toLabel, fromLabel); } catch { /* best-effort rollback */ }
|
||||
throw new Error(
|
||||
`Session dispatch failed: ${(err as Error).message}. Label reverted to "${fromLabel}".`,
|
||||
);
|
||||
}
|
||||
|
||||
await auditDispatch(workspaceDir, {
|
||||
project: project.name, groupId, issueId, issueTitle,
|
||||
role, level, model, sessionAction: session.action, sessionKey: session.key,
|
||||
fromLabel, toLabel,
|
||||
});
|
||||
|
||||
const announcement = buildAnnouncement(level, role, session.action, issueId, issueTitle, issueUrl);
|
||||
|
||||
// Notify workerStart (non-fatal)
|
||||
// Step 2: Send notification early (before session dispatch which can timeout)
|
||||
// This ensures users see the notification even if gateway is slow
|
||||
const notifyConfig = getNotificationConfig(pluginConfig);
|
||||
await notify(
|
||||
notify(
|
||||
{
|
||||
type: "workerStart",
|
||||
project: project.name,
|
||||
@@ -219,17 +189,51 @@ export async function dispatchTask(
|
||||
issueUrl,
|
||||
role,
|
||||
level,
|
||||
sessionAction: session.action,
|
||||
sessionAction,
|
||||
},
|
||||
{
|
||||
workspaceDir,
|
||||
config: notifyConfig,
|
||||
groupId,
|
||||
channel: opts.channel ?? "telegram",
|
||||
runtime,
|
||||
},
|
||||
).catch(() => { /* non-fatal */ });
|
||||
|
||||
return { sessionAction: session.action, sessionKey: session.key, level, model, announcement };
|
||||
// Step 3: Ensure session exists (fire-and-forget — don't wait for gateway)
|
||||
// Session key is deterministic, so we can proceed immediately
|
||||
ensureSessionFireAndForget(sessionKey, model);
|
||||
|
||||
// Step 4: Send task to agent (fire-and-forget)
|
||||
sendToAgent(sessionKey, taskMessage, {
|
||||
agentId, projectName: project.name, issueId, role,
|
||||
orchestratorSessionKey: opts.sessionKey,
|
||||
});
|
||||
|
||||
// Step 5: Update worker state
|
||||
try {
|
||||
await recordWorkerState(workspaceDir, groupId, role, {
|
||||
issueId, level, sessionKey, sessionAction,
|
||||
});
|
||||
} catch (err) {
|
||||
// Session is already dispatched — log warning but don't fail
|
||||
await auditLog(workspaceDir, "work_start", {
|
||||
project: project.name, groupId, issue: issueId, role,
|
||||
warning: "State update failed after successful dispatch",
|
||||
error: (err as Error).message, sessionKey,
|
||||
});
|
||||
}
|
||||
|
||||
// Step 6: Audit
|
||||
await auditDispatch(workspaceDir, {
|
||||
project: project.name, groupId, issueId, issueTitle,
|
||||
role, level, model, sessionAction, sessionKey,
|
||||
fromLabel, toLabel,
|
||||
});
|
||||
|
||||
const announcement = buildAnnouncement(level, role, sessionAction, issueId, issueTitle, issueUrl);
|
||||
|
||||
return { sessionAction, sessionKey, level, model, announcement };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -246,32 +250,16 @@ async function loadRoleInstructions(
|
||||
return "";
|
||||
}
|
||||
|
||||
async function ensureSession(
|
||||
action: "spawn" | "send",
|
||||
existingKey: string | null,
|
||||
opts: { agentId?: string; projectName: string; role: string; level: string; model: string },
|
||||
): Promise<{ key: string; action: "spawn" | "send" }> {
|
||||
const expectedKey = `agent:${opts.agentId ?? "unknown"}:subagent:${opts.projectName}-${opts.role}-${opts.level}`;
|
||||
|
||||
// Reuse: validate stored key matches expected format, then verify session exists
|
||||
if (action === "send" && existingKey === expectedKey) {
|
||||
try {
|
||||
await runCommand(
|
||||
["openclaw", "gateway", "call", "sessions.patch", "--params", JSON.stringify({ key: existingKey, model: opts.model })],
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
return { key: existingKey, action: "send" };
|
||||
} catch {
|
||||
// Session gone (deleted, cleanup, etc.) — fall through to spawn
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn: create fresh session (also handles stale/mismatched keys)
|
||||
await runCommand(
|
||||
["openclaw", "gateway", "call", "sessions.patch", "--params", JSON.stringify({ key: expectedKey, model: opts.model })],
|
||||
/**
|
||||
* Fire-and-forget session creation/update.
|
||||
* Session key is deterministic, so we don't need to wait for confirmation.
|
||||
* If this fails, health check will catch orphaned state later.
|
||||
*/
|
||||
function ensureSessionFireAndForget(sessionKey: string, model: string): void {
|
||||
runCommand(
|
||||
["openclaw", "gateway", "call", "sessions.patch", "--params", JSON.stringify({ key: sessionKey, model })],
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
return { key: expectedKey, action: "spawn" };
|
||||
).catch(() => { /* fire-and-forget */ });
|
||||
}
|
||||
|
||||
function sendToAgent(
|
||||
|
||||
Reference in New Issue
Block a user