refactor: centralize notifications in core dispatch/completion functions (#150)
This commit is contained in:
@@ -15,6 +15,7 @@ import {
|
||||
getWorker,
|
||||
} from "./projects.js";
|
||||
import { resolveModel, levelEmoji } from "./tiers.js";
|
||||
import { notify, getNotificationConfig } from "./notify.js";
|
||||
|
||||
export type DispatchOpts = {
|
||||
workspaceDir: string;
|
||||
@@ -36,8 +37,10 @@ export type DispatchOpts = {
|
||||
transitionLabel: (issueId: number, from: string, to: string) => Promise<void>;
|
||||
/** Issue provider for fetching comments */
|
||||
provider: import("./providers/provider.js").IssueProvider;
|
||||
/** Plugin config for model resolution */
|
||||
/** Plugin config for model resolution and notification config */
|
||||
pluginConfig?: Record<string, unknown>;
|
||||
/** Channel for notifications (e.g. "telegram", "whatsapp") */
|
||||
channel?: string;
|
||||
/** Orchestrator's session key (used as spawnedBy for subagent tracking) */
|
||||
sessionKey?: string;
|
||||
};
|
||||
@@ -204,6 +207,28 @@ export async function dispatchTask(
|
||||
|
||||
const announcement = buildAnnouncement(level, role, session.action, issueId, issueTitle, issueUrl);
|
||||
|
||||
// Notify workerStart (non-fatal)
|
||||
const notifyConfig = getNotificationConfig(pluginConfig);
|
||||
await notify(
|
||||
{
|
||||
type: "workerStart",
|
||||
project: project.name,
|
||||
groupId,
|
||||
issueId,
|
||||
issueTitle,
|
||||
issueUrl,
|
||||
role,
|
||||
level,
|
||||
sessionAction: session.action,
|
||||
},
|
||||
{
|
||||
workspaceDir,
|
||||
config: notifyConfig,
|
||||
groupId,
|
||||
channel: opts.channel ?? "telegram",
|
||||
},
|
||||
).catch(() => { /* non-fatal */ });
|
||||
|
||||
return { sessionAction: session.action, sessionKey: session.key, level, model, announcement };
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
* - workerComplete: Worker completed task (→ project group)
|
||||
*/
|
||||
import { log as auditLog } from "./audit.js";
|
||||
import type { TickAction } from "./services/tick.js";
|
||||
import { runCommand } from "./run-command.js";
|
||||
|
||||
/** Per-event-type toggle. All default to true — set to false to suppress. */
|
||||
@@ -159,43 +158,6 @@ export async function notify(
|
||||
return sendMessage(target, message, channel, opts.workspaceDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send workerStart notifications for each tick pickup.
|
||||
*
|
||||
* Called after projectTick() returns pickups — callers pass the array
|
||||
* so each dispatched task gets a visible start notification in the project group.
|
||||
*/
|
||||
export async function notifyTickPickups(
|
||||
pickups: TickAction[],
|
||||
opts: {
|
||||
workspaceDir: string;
|
||||
config?: NotificationConfig;
|
||||
channel?: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
for (const pickup of pickups) {
|
||||
await notify(
|
||||
{
|
||||
type: "workerStart",
|
||||
project: pickup.project,
|
||||
groupId: pickup.groupId,
|
||||
issueId: pickup.issueId,
|
||||
issueTitle: pickup.issueTitle,
|
||||
issueUrl: pickup.issueUrl,
|
||||
role: pickup.role,
|
||||
level: pickup.level,
|
||||
sessionAction: pickup.sessionAction,
|
||||
},
|
||||
{
|
||||
workspaceDir: opts.workspaceDir,
|
||||
config: opts.config,
|
||||
groupId: pickup.groupId,
|
||||
channel: opts.channel,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract notification config from plugin config.
|
||||
* All event types default to enabled (true).
|
||||
|
||||
@@ -18,7 +18,6 @@ import { log as auditLog } from "../audit.js";
|
||||
import { checkWorkerHealth, fetchGatewaySessions, type SessionLookup } from "./health.js";
|
||||
import { projectTick } from "./tick.js";
|
||||
import { createProvider } from "../providers/index.js";
|
||||
import { notifyTickPickups, getNotificationConfig } from "../notify.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Types
|
||||
@@ -281,15 +280,7 @@ export async function tick(opts: {
|
||||
result.totalPickups += tickResult.pickups.length;
|
||||
result.totalSkipped += tickResult.skipped.length;
|
||||
|
||||
// Notify project group about any pickups
|
||||
if (tickResult.pickups.length > 0) {
|
||||
const notifyConfig = getNotificationConfig(pluginConfig);
|
||||
await notifyTickPickups(tickResult.pickups, {
|
||||
workspaceDir,
|
||||
config: notifyConfig,
|
||||
channel: project.channel,
|
||||
});
|
||||
}
|
||||
// Notifications now handled by dispatchTask
|
||||
if (isProjectActive || tickResult.pickups.length > 0) activeProjects++;
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
import type { StateLabel, IssueProvider } from "../providers/provider.js";
|
||||
import { deactivateWorker } from "../projects.js";
|
||||
import { runCommand } from "../run-command.js";
|
||||
import { notify, getNotificationConfig } from "../notify.js";
|
||||
|
||||
export type CompletionRule = {
|
||||
from: StateLabel;
|
||||
@@ -70,8 +71,11 @@ export async function executeCompletion(opts: {
|
||||
prUrl?: string;
|
||||
provider: IssueProvider;
|
||||
repoPath: string;
|
||||
projectName: string;
|
||||
channel?: string;
|
||||
pluginConfig?: Record<string, unknown>;
|
||||
}): Promise<CompletionOutput> {
|
||||
const { workspaceDir, groupId, role, result, issueId, summary, provider, repoPath } = opts;
|
||||
const { workspaceDir, groupId, role, result, issueId, summary, provider, repoPath, projectName, channel, pluginConfig } = opts;
|
||||
const key = `${role}:${result}`;
|
||||
const rule = COMPLETION_RULES[key];
|
||||
if (!rule) throw new Error(`No completion rule for ${key}`);
|
||||
@@ -108,6 +112,28 @@ export async function executeCompletion(opts: {
|
||||
if (prUrl) announcement += `\n🔗 PR: ${prUrl}`;
|
||||
announcement += `\n${NEXT_STATE[key]}.`;
|
||||
|
||||
// Notify workerComplete (non-fatal)
|
||||
const notifyConfig = getNotificationConfig(pluginConfig);
|
||||
await notify(
|
||||
{
|
||||
type: "workerComplete",
|
||||
project: projectName,
|
||||
groupId,
|
||||
issueId,
|
||||
issueUrl: issue.web_url,
|
||||
role,
|
||||
result: result as "done" | "pass" | "fail" | "refine" | "blocked",
|
||||
summary,
|
||||
nextState: NEXT_STATE[key],
|
||||
},
|
||||
{
|
||||
workspaceDir,
|
||||
config: notifyConfig,
|
||||
groupId,
|
||||
channel: channel ?? "telegram",
|
||||
},
|
||||
).catch(() => { /* non-fatal */ });
|
||||
|
||||
return {
|
||||
labelTransition: `${rule.from} → ${rule.to}`,
|
||||
announcement,
|
||||
|
||||
@@ -176,7 +176,9 @@ export async function projectTick(opts: {
|
||||
role, level: selectedLevel, fromLabel: currentLabel, toLabel: targetLabel,
|
||||
transitionLabel: (id, from, to) => provider.transitionLabel(id, from as StateLabel, to as StateLabel),
|
||||
provider: provider as IssueProvider,
|
||||
pluginConfig, sessionKey,
|
||||
pluginConfig,
|
||||
channel: fresh.channel,
|
||||
sessionKey,
|
||||
});
|
||||
pickups.push({
|
||||
project: project.name, groupId, issueId: issue.iid, issueTitle: issue.title, issueUrl: issue.web_url,
|
||||
|
||||
@@ -9,7 +9,6 @@ import type { ToolContext } from "./types.js";
|
||||
import { readProjects, getProject, type Project, type ProjectsData } from "./projects.js";
|
||||
import { createProvider, type ProviderWithType } from "./providers/index.js";
|
||||
import { projectTick, type TickAction } from "./services/tick.js";
|
||||
import { notifyTickPickups, getNotificationConfig } from "./notify.js";
|
||||
|
||||
/**
|
||||
* Require workspaceDir from context or throw a clear error.
|
||||
@@ -51,7 +50,7 @@ export function getPluginConfig(api: OpenClawPluginApi): Record<string, unknown>
|
||||
}
|
||||
|
||||
/**
|
||||
* Run projectTick (non-fatal) and send workerStart notifications for any pickups.
|
||||
* Run projectTick (non-fatal). Notifications are now handled by dispatchTask.
|
||||
* Returns the pickups array (empty on failure).
|
||||
*/
|
||||
export async function tickAndNotify(opts: {
|
||||
@@ -61,9 +60,7 @@ export async function tickAndNotify(opts: {
|
||||
pluginConfig?: Record<string, unknown>;
|
||||
sessionKey?: string;
|
||||
targetRole?: "dev" | "qa";
|
||||
channel?: string;
|
||||
}): Promise<TickAction[]> {
|
||||
let pickups: TickAction[] = [];
|
||||
try {
|
||||
const result = await projectTick({
|
||||
workspaceDir: opts.workspaceDir,
|
||||
@@ -73,17 +70,9 @@ export async function tickAndNotify(opts: {
|
||||
sessionKey: opts.sessionKey,
|
||||
targetRole: opts.targetRole,
|
||||
});
|
||||
pickups = result.pickups;
|
||||
} catch { /* non-fatal: tick failure shouldn't break the caller */ }
|
||||
|
||||
if (pickups.length) {
|
||||
const notifyConfig = getNotificationConfig(opts.pluginConfig);
|
||||
await notifyTickPickups(pickups, {
|
||||
workspaceDir: opts.workspaceDir,
|
||||
config: notifyConfig,
|
||||
channel: opts.channel,
|
||||
});
|
||||
return result.pickups;
|
||||
} catch {
|
||||
/* non-fatal: tick failure shouldn't break the caller */
|
||||
return [];
|
||||
}
|
||||
|
||||
return pickups;
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ import type { ToolContext } from "../types.js";
|
||||
import { getWorker, resolveRepoPath } from "../projects.js";
|
||||
import { executeCompletion, getRule, NEXT_STATE } from "../services/pipeline.js";
|
||||
import { log as auditLog } from "../audit.js";
|
||||
import { notify, getNotificationConfig } from "../notify.js";
|
||||
import { requireWorkspaceDir, resolveProject, resolveProvider, getPluginConfig, tickAndNotify } from "../tool-helpers.js";
|
||||
|
||||
export function createWorkFinishTool(api: OpenClawPluginApi) {
|
||||
@@ -58,9 +57,14 @@ export function createWorkFinishTool(api: OpenClawPluginApi) {
|
||||
const repoPath = resolveRepoPath(project.repo);
|
||||
const issue = await provider.getIssue(issueId);
|
||||
|
||||
// Execute completion (pipeline service)
|
||||
const pluginConfig = getPluginConfig(api);
|
||||
|
||||
// Execute completion (pipeline service handles notification)
|
||||
const completion = await executeCompletion({
|
||||
workspaceDir, groupId, role, result, issueId, summary, prUrl, provider, repoPath,
|
||||
projectName: project.name,
|
||||
channel: project.channel,
|
||||
pluginConfig,
|
||||
});
|
||||
|
||||
const output: Record<string, unknown> = {
|
||||
@@ -68,18 +72,9 @@ export function createWorkFinishTool(api: OpenClawPluginApi) {
|
||||
...completion,
|
||||
};
|
||||
|
||||
// Notify completion
|
||||
const pluginConfig = getPluginConfig(api);
|
||||
const notifyConfig = getNotificationConfig(pluginConfig);
|
||||
await notify(
|
||||
{ type: "workerComplete", project: project.name, groupId, issueId, issueUrl: issue.web_url, role, result: result as "done" | "pass" | "fail" | "refine" | "blocked", summary, nextState: NEXT_STATE[`${role}:${result}`] },
|
||||
{ workspaceDir, config: notifyConfig, groupId, channel: project.channel ?? "telegram" },
|
||||
);
|
||||
|
||||
// Tick: fill free slots + notify starts
|
||||
// Tick: fill free slots (notifications handled by dispatchTask)
|
||||
const tickPickups = await tickAndNotify({
|
||||
workspaceDir, groupId, agentId: ctx.agentId, pluginConfig, sessionKey: ctx.sessionKey,
|
||||
channel: project.channel ?? "telegram",
|
||||
});
|
||||
if (tickPickups.length) output.tickPickups = tickPickups;
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ import type { StateLabel } from "../providers/provider.js";
|
||||
import { selectLevel } from "../model-selector.js";
|
||||
import { getWorker } from "../projects.js";
|
||||
import { dispatchTask } from "../dispatch.js";
|
||||
import { notify, getNotificationConfig } from "../notify.js";
|
||||
import { findNextIssue, detectRoleFromLabel, detectLevelFromLabels } from "../services/tick.js";
|
||||
import { isDevLevel } from "../tiers.js";
|
||||
import { requireWorkspaceDir, resolveProject, resolveProvider, getPluginConfig } from "../tool-helpers.js";
|
||||
@@ -98,16 +97,11 @@ export function createWorkStartTool(api: OpenClawPluginApi) {
|
||||
role, level: selectedLevel, fromLabel: currentLabel, toLabel: targetLabel,
|
||||
transitionLabel: (id, from, to) => provider.transitionLabel(id, from as StateLabel, to as StateLabel),
|
||||
provider,
|
||||
pluginConfig, sessionKey: ctx.sessionKey,
|
||||
pluginConfig,
|
||||
channel: project.channel,
|
||||
sessionKey: ctx.sessionKey,
|
||||
});
|
||||
|
||||
// Notify
|
||||
const notifyConfig = getNotificationConfig(pluginConfig);
|
||||
await notify(
|
||||
{ type: "workerStart", project: project.name, groupId, issueId: issue.iid, issueTitle: issue.title, issueUrl: issue.web_url, role, level: dr.level, sessionAction: dr.sessionAction },
|
||||
{ workspaceDir, config: notifyConfig, groupId, channel: project.channel ?? "telegram" },
|
||||
);
|
||||
|
||||
// Auto-tick disabled per issue #125 - work_start should only pick up the explicitly requested issue
|
||||
// The heartbeat service fills parallel slots automatically
|
||||
|
||||
|
||||
Reference in New Issue
Block a user