feat: implement runCommand wrapper and refactor command executions across modules
This commit is contained in:
@@ -4,11 +4,10 @@
|
||||
* Handles: session lookup, spawn/reuse via Gateway RPC, task dispatch via CLI,
|
||||
* state update (activateWorker), and audit logging.
|
||||
*/
|
||||
import { execFile, spawn } from "node:child_process";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { promisify } from "node:util";
|
||||
import { log as auditLog } from "./audit.js";
|
||||
import { runCommand } from "./run-command.js";
|
||||
import {
|
||||
type Project,
|
||||
activateWorker,
|
||||
@@ -17,8 +16,6 @@ import {
|
||||
} from "./projects.js";
|
||||
import { resolveModel, levelEmoji } from "./tiers.js";
|
||||
|
||||
const execFileAsync = promisify(execFile);
|
||||
|
||||
export type DispatchOpts = {
|
||||
workspaceDir: string;
|
||||
agentId?: string;
|
||||
@@ -142,30 +139,31 @@ export async function dispatchTask(
|
||||
|
||||
await transitionLabel(issueId, fromLabel, toLabel);
|
||||
|
||||
let sessionKey = existingSessionKey;
|
||||
let dispatched = false;
|
||||
let session: { key: string; action: "spawn" | "send" };
|
||||
|
||||
try {
|
||||
sessionKey = await ensureSession(sessionAction, sessionKey, {
|
||||
session = await ensureSession(sessionAction, existingSessionKey, {
|
||||
agentId, projectName: project.name, role, level, model,
|
||||
});
|
||||
|
||||
await sendToAgent(sessionKey!, taskMessage, {
|
||||
sendToAgent(session.key, taskMessage, {
|
||||
agentId, projectName: project.name, issueId, role,
|
||||
orchestratorSessionKey: opts.sessionKey,
|
||||
});
|
||||
|
||||
dispatched = true;
|
||||
|
||||
// Always store session key — a "send" may have fallen back to "spawn"
|
||||
await recordWorkerState(workspaceDir, groupId, role, {
|
||||
issueId, level, sessionKey: sessionKey!, sessionAction,
|
||||
issueId, level, sessionKey: session.key, sessionAction: session.action,
|
||||
});
|
||||
} catch (err) {
|
||||
if (dispatched) {
|
||||
await auditLog(workspaceDir, "work_start", {
|
||||
project: project.name, groupId, issue: issueId, role,
|
||||
warning: "State update failed after successful dispatch",
|
||||
error: (err as Error).message, sessionKey,
|
||||
error: (err as Error).message, sessionKey: session!.key,
|
||||
});
|
||||
throw new Error(
|
||||
`State update failed after successful session dispatch: ${(err as Error).message}. Session is running but projects.json was not updated.`,
|
||||
@@ -179,13 +177,13 @@ export async function dispatchTask(
|
||||
|
||||
await auditDispatch(workspaceDir, {
|
||||
project: project.name, groupId, issueId, issueTitle,
|
||||
role, level, model, sessionAction, sessionKey: sessionKey!,
|
||||
role, level, model, sessionAction: session.action, sessionKey: session.key,
|
||||
fromLabel, toLabel,
|
||||
});
|
||||
|
||||
const announcement = buildAnnouncement(level, role, sessionAction, issueId, issueTitle, issueUrl);
|
||||
const announcement = buildAnnouncement(level, role, session.action, issueId, issueTitle, issueUrl);
|
||||
|
||||
return { sessionAction, sessionKey: sessionKey!, level, model, announcement };
|
||||
return { sessionAction: session.action, sessionKey: session.key, level, model, announcement };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -206,16 +204,28 @@ async function ensureSession(
|
||||
action: "spawn" | "send",
|
||||
existingKey: string | null,
|
||||
opts: { agentId?: string; projectName: string; role: string; level: string; model: string },
|
||||
): Promise<string> {
|
||||
if (action === "send") return existingKey!;
|
||||
): Promise<{ key: string; action: "spawn" | "send" }> {
|
||||
const expectedKey = `agent:${opts.agentId ?? "unknown"}:subagent:${opts.projectName}-${opts.role}-${opts.level}`;
|
||||
|
||||
const sessionKey = `agent:${opts.agentId ?? "unknown"}:subagent:${opts.projectName}-${opts.role}-${opts.level}`;
|
||||
await execFileAsync(
|
||||
"openclaw",
|
||||
["gateway", "call", "sessions.patch", "--params", JSON.stringify({ key: sessionKey, model: opts.model })],
|
||||
{ timeout: 30_000 },
|
||||
// Reuse: validate stored key matches expected format, then verify session exists
|
||||
if (action === "send" && existingKey === expectedKey) {
|
||||
try {
|
||||
await runCommand(
|
||||
["openclaw", "gateway", "call", "sessions.patch", "--params", JSON.stringify({ key: existingKey, model: opts.model })],
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
return { key: existingKey, action: "send" };
|
||||
} catch {
|
||||
// Session gone (deleted, cleanup, etc.) — fall through to spawn
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn: create fresh session (also handles stale/mismatched keys)
|
||||
await runCommand(
|
||||
["openclaw", "gateway", "call", "sessions.patch", "--params", JSON.stringify({ key: expectedKey, model: opts.model })],
|
||||
{ timeoutMs: 30_000 },
|
||||
);
|
||||
return sessionKey;
|
||||
return { key: expectedKey, action: "spawn" };
|
||||
}
|
||||
|
||||
function sendToAgent(
|
||||
@@ -231,27 +241,23 @@ function sendToAgent(
|
||||
lane: "subagent",
|
||||
...(opts.orchestratorSessionKey ? { spawnedBy: opts.orchestratorSessionKey } : {}),
|
||||
});
|
||||
const child = spawn(
|
||||
"openclaw",
|
||||
["gateway", "call", "agent", "--params", gatewayParams, "--expect-final", "--json"],
|
||||
{ detached: true, stdio: "ignore" },
|
||||
);
|
||||
child.unref();
|
||||
// Fire-and-forget: long-running agent turn, don't await
|
||||
runCommand(
|
||||
["openclaw", "gateway", "call", "agent", "--params", gatewayParams, "--expect-final", "--json"],
|
||||
{ timeoutMs: 600_000 },
|
||||
).catch(() => { /* fire-and-forget */ });
|
||||
}
|
||||
|
||||
async function recordWorkerState(
|
||||
workspaceDir: string, groupId: string, role: "dev" | "qa",
|
||||
opts: { issueId: number; level: string; sessionKey: string; sessionAction: "spawn" | "send" },
|
||||
): Promise<void> {
|
||||
const params: { issueId: string; level: string; sessionKey?: string; startTime: string } = {
|
||||
await activateWorker(workspaceDir, groupId, role, {
|
||||
issueId: String(opts.issueId),
|
||||
level: opts.level,
|
||||
sessionKey: opts.sessionKey,
|
||||
startTime: new Date().toISOString(),
|
||||
};
|
||||
if (opts.sessionAction === "spawn") {
|
||||
params.sessionKey = opts.sessionKey;
|
||||
}
|
||||
await activateWorker(workspaceDir, groupId, role, params);
|
||||
});
|
||||
}
|
||||
|
||||
async function auditDispatch(
|
||||
|
||||
Reference in New Issue
Block a user