Merge pull request #151 from laurentenhoor/feature/150-centralize-notifications
refactor: centralize notifications in core dispatch/completion functions (#150)
This commit is contained in:
@@ -15,6 +15,7 @@ import {
|
|||||||
getWorker,
|
getWorker,
|
||||||
} from "./projects.js";
|
} from "./projects.js";
|
||||||
import { resolveModel, levelEmoji } from "./tiers.js";
|
import { resolveModel, levelEmoji } from "./tiers.js";
|
||||||
|
import { notify, getNotificationConfig } from "./notify.js";
|
||||||
|
|
||||||
export type DispatchOpts = {
|
export type DispatchOpts = {
|
||||||
workspaceDir: string;
|
workspaceDir: string;
|
||||||
@@ -36,8 +37,10 @@ export type DispatchOpts = {
|
|||||||
transitionLabel: (issueId: number, from: string, to: string) => Promise<void>;
|
transitionLabel: (issueId: number, from: string, to: string) => Promise<void>;
|
||||||
/** Issue provider for fetching comments */
|
/** Issue provider for fetching comments */
|
||||||
provider: import("./providers/provider.js").IssueProvider;
|
provider: import("./providers/provider.js").IssueProvider;
|
||||||
/** Plugin config for model resolution */
|
/** Plugin config for model resolution and notification config */
|
||||||
pluginConfig?: Record<string, unknown>;
|
pluginConfig?: Record<string, unknown>;
|
||||||
|
/** Channel for notifications (e.g. "telegram", "whatsapp") */
|
||||||
|
channel?: string;
|
||||||
/** Orchestrator's session key (used as spawnedBy for subagent tracking) */
|
/** Orchestrator's session key (used as spawnedBy for subagent tracking) */
|
||||||
sessionKey?: string;
|
sessionKey?: string;
|
||||||
};
|
};
|
||||||
@@ -204,6 +207,28 @@ export async function dispatchTask(
|
|||||||
|
|
||||||
const announcement = buildAnnouncement(level, role, session.action, issueId, issueTitle, issueUrl);
|
const announcement = buildAnnouncement(level, role, session.action, issueId, issueTitle, issueUrl);
|
||||||
|
|
||||||
|
// Notify workerStart (non-fatal)
|
||||||
|
const notifyConfig = getNotificationConfig(pluginConfig);
|
||||||
|
await notify(
|
||||||
|
{
|
||||||
|
type: "workerStart",
|
||||||
|
project: project.name,
|
||||||
|
groupId,
|
||||||
|
issueId,
|
||||||
|
issueTitle,
|
||||||
|
issueUrl,
|
||||||
|
role,
|
||||||
|
level,
|
||||||
|
sessionAction: session.action,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
workspaceDir,
|
||||||
|
config: notifyConfig,
|
||||||
|
groupId,
|
||||||
|
channel: opts.channel ?? "telegram",
|
||||||
|
},
|
||||||
|
).catch(() => { /* non-fatal */ });
|
||||||
|
|
||||||
return { sessionAction: session.action, sessionKey: session.key, level, model, announcement };
|
return { sessionAction: session.action, sessionKey: session.key, level, model, announcement };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,6 @@
|
|||||||
* - workerComplete: Worker completed task (→ project group)
|
* - workerComplete: Worker completed task (→ project group)
|
||||||
*/
|
*/
|
||||||
import { log as auditLog } from "./audit.js";
|
import { log as auditLog } from "./audit.js";
|
||||||
import type { TickAction } from "./services/tick.js";
|
|
||||||
import { runCommand } from "./run-command.js";
|
import { runCommand } from "./run-command.js";
|
||||||
|
|
||||||
/** Per-event-type toggle. All default to true — set to false to suppress. */
|
/** Per-event-type toggle. All default to true — set to false to suppress. */
|
||||||
@@ -159,43 +158,6 @@ export async function notify(
|
|||||||
return sendMessage(target, message, channel, opts.workspaceDir);
|
return sendMessage(target, message, channel, opts.workspaceDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Send workerStart notifications for each tick pickup.
|
|
||||||
*
|
|
||||||
* Called after projectTick() returns pickups — callers pass the array
|
|
||||||
* so each dispatched task gets a visible start notification in the project group.
|
|
||||||
*/
|
|
||||||
export async function notifyTickPickups(
|
|
||||||
pickups: TickAction[],
|
|
||||||
opts: {
|
|
||||||
workspaceDir: string;
|
|
||||||
config?: NotificationConfig;
|
|
||||||
channel?: string;
|
|
||||||
},
|
|
||||||
): Promise<void> {
|
|
||||||
for (const pickup of pickups) {
|
|
||||||
await notify(
|
|
||||||
{
|
|
||||||
type: "workerStart",
|
|
||||||
project: pickup.project,
|
|
||||||
groupId: pickup.groupId,
|
|
||||||
issueId: pickup.issueId,
|
|
||||||
issueTitle: pickup.issueTitle,
|
|
||||||
issueUrl: pickup.issueUrl,
|
|
||||||
role: pickup.role,
|
|
||||||
level: pickup.level,
|
|
||||||
sessionAction: pickup.sessionAction,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
workspaceDir: opts.workspaceDir,
|
|
||||||
config: opts.config,
|
|
||||||
groupId: pickup.groupId,
|
|
||||||
channel: opts.channel,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extract notification config from plugin config.
|
* Extract notification config from plugin config.
|
||||||
* All event types default to enabled (true).
|
* All event types default to enabled (true).
|
||||||
|
|||||||
@@ -18,7 +18,6 @@ import { log as auditLog } from "../audit.js";
|
|||||||
import { checkWorkerHealth, fetchGatewaySessions, type SessionLookup } from "./health.js";
|
import { checkWorkerHealth, fetchGatewaySessions, type SessionLookup } from "./health.js";
|
||||||
import { projectTick } from "./tick.js";
|
import { projectTick } from "./tick.js";
|
||||||
import { createProvider } from "../providers/index.js";
|
import { createProvider } from "../providers/index.js";
|
||||||
import { notifyTickPickups, getNotificationConfig } from "../notify.js";
|
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Types
|
// Types
|
||||||
@@ -281,15 +280,7 @@ export async function tick(opts: {
|
|||||||
result.totalPickups += tickResult.pickups.length;
|
result.totalPickups += tickResult.pickups.length;
|
||||||
result.totalSkipped += tickResult.skipped.length;
|
result.totalSkipped += tickResult.skipped.length;
|
||||||
|
|
||||||
// Notify project group about any pickups
|
// Notifications now handled by dispatchTask
|
||||||
if (tickResult.pickups.length > 0) {
|
|
||||||
const notifyConfig = getNotificationConfig(pluginConfig);
|
|
||||||
await notifyTickPickups(tickResult.pickups, {
|
|
||||||
workspaceDir,
|
|
||||||
config: notifyConfig,
|
|
||||||
channel: project.channel,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if (isProjectActive || tickResult.pickups.length > 0) activeProjects++;
|
if (isProjectActive || tickResult.pickups.length > 0) activeProjects++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@
|
|||||||
import type { StateLabel, IssueProvider } from "../providers/provider.js";
|
import type { StateLabel, IssueProvider } from "../providers/provider.js";
|
||||||
import { deactivateWorker } from "../projects.js";
|
import { deactivateWorker } from "../projects.js";
|
||||||
import { runCommand } from "../run-command.js";
|
import { runCommand } from "../run-command.js";
|
||||||
|
import { notify, getNotificationConfig } from "../notify.js";
|
||||||
|
|
||||||
export type CompletionRule = {
|
export type CompletionRule = {
|
||||||
from: StateLabel;
|
from: StateLabel;
|
||||||
@@ -70,8 +71,11 @@ export async function executeCompletion(opts: {
|
|||||||
prUrl?: string;
|
prUrl?: string;
|
||||||
provider: IssueProvider;
|
provider: IssueProvider;
|
||||||
repoPath: string;
|
repoPath: string;
|
||||||
|
projectName: string;
|
||||||
|
channel?: string;
|
||||||
|
pluginConfig?: Record<string, unknown>;
|
||||||
}): Promise<CompletionOutput> {
|
}): Promise<CompletionOutput> {
|
||||||
const { workspaceDir, groupId, role, result, issueId, summary, provider, repoPath } = opts;
|
const { workspaceDir, groupId, role, result, issueId, summary, provider, repoPath, projectName, channel, pluginConfig } = opts;
|
||||||
const key = `${role}:${result}`;
|
const key = `${role}:${result}`;
|
||||||
const rule = COMPLETION_RULES[key];
|
const rule = COMPLETION_RULES[key];
|
||||||
if (!rule) throw new Error(`No completion rule for ${key}`);
|
if (!rule) throw new Error(`No completion rule for ${key}`);
|
||||||
@@ -108,6 +112,28 @@ export async function executeCompletion(opts: {
|
|||||||
if (prUrl) announcement += `\n🔗 PR: ${prUrl}`;
|
if (prUrl) announcement += `\n🔗 PR: ${prUrl}`;
|
||||||
announcement += `\n${NEXT_STATE[key]}.`;
|
announcement += `\n${NEXT_STATE[key]}.`;
|
||||||
|
|
||||||
|
// Notify workerComplete (non-fatal)
|
||||||
|
const notifyConfig = getNotificationConfig(pluginConfig);
|
||||||
|
await notify(
|
||||||
|
{
|
||||||
|
type: "workerComplete",
|
||||||
|
project: projectName,
|
||||||
|
groupId,
|
||||||
|
issueId,
|
||||||
|
issueUrl: issue.web_url,
|
||||||
|
role,
|
||||||
|
result: result as "done" | "pass" | "fail" | "refine" | "blocked",
|
||||||
|
summary,
|
||||||
|
nextState: NEXT_STATE[key],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
workspaceDir,
|
||||||
|
config: notifyConfig,
|
||||||
|
groupId,
|
||||||
|
channel: channel ?? "telegram",
|
||||||
|
},
|
||||||
|
).catch(() => { /* non-fatal */ });
|
||||||
|
|
||||||
return {
|
return {
|
||||||
labelTransition: `${rule.from} → ${rule.to}`,
|
labelTransition: `${rule.from} → ${rule.to}`,
|
||||||
announcement,
|
announcement,
|
||||||
|
|||||||
@@ -176,7 +176,9 @@ export async function projectTick(opts: {
|
|||||||
role, level: selectedLevel, fromLabel: currentLabel, toLabel: targetLabel,
|
role, level: selectedLevel, fromLabel: currentLabel, toLabel: targetLabel,
|
||||||
transitionLabel: (id, from, to) => provider.transitionLabel(id, from as StateLabel, to as StateLabel),
|
transitionLabel: (id, from, to) => provider.transitionLabel(id, from as StateLabel, to as StateLabel),
|
||||||
provider: provider as IssueProvider,
|
provider: provider as IssueProvider,
|
||||||
pluginConfig, sessionKey,
|
pluginConfig,
|
||||||
|
channel: fresh.channel,
|
||||||
|
sessionKey,
|
||||||
});
|
});
|
||||||
pickups.push({
|
pickups.push({
|
||||||
project: project.name, groupId, issueId: issue.iid, issueTitle: issue.title, issueUrl: issue.web_url,
|
project: project.name, groupId, issueId: issue.iid, issueTitle: issue.title, issueUrl: issue.web_url,
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import type { ToolContext } from "./types.js";
|
|||||||
import { readProjects, getProject, type Project, type ProjectsData } from "./projects.js";
|
import { readProjects, getProject, type Project, type ProjectsData } from "./projects.js";
|
||||||
import { createProvider, type ProviderWithType } from "./providers/index.js";
|
import { createProvider, type ProviderWithType } from "./providers/index.js";
|
||||||
import { projectTick, type TickAction } from "./services/tick.js";
|
import { projectTick, type TickAction } from "./services/tick.js";
|
||||||
import { notifyTickPickups, getNotificationConfig } from "./notify.js";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Require workspaceDir from context or throw a clear error.
|
* Require workspaceDir from context or throw a clear error.
|
||||||
@@ -51,7 +50,7 @@ export function getPluginConfig(api: OpenClawPluginApi): Record<string, unknown>
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run projectTick (non-fatal) and send workerStart notifications for any pickups.
|
* Run projectTick (non-fatal). Notifications are now handled by dispatchTask.
|
||||||
* Returns the pickups array (empty on failure).
|
* Returns the pickups array (empty on failure).
|
||||||
*/
|
*/
|
||||||
export async function tickAndNotify(opts: {
|
export async function tickAndNotify(opts: {
|
||||||
@@ -61,9 +60,7 @@ export async function tickAndNotify(opts: {
|
|||||||
pluginConfig?: Record<string, unknown>;
|
pluginConfig?: Record<string, unknown>;
|
||||||
sessionKey?: string;
|
sessionKey?: string;
|
||||||
targetRole?: "dev" | "qa";
|
targetRole?: "dev" | "qa";
|
||||||
channel?: string;
|
|
||||||
}): Promise<TickAction[]> {
|
}): Promise<TickAction[]> {
|
||||||
let pickups: TickAction[] = [];
|
|
||||||
try {
|
try {
|
||||||
const result = await projectTick({
|
const result = await projectTick({
|
||||||
workspaceDir: opts.workspaceDir,
|
workspaceDir: opts.workspaceDir,
|
||||||
@@ -73,17 +70,9 @@ export async function tickAndNotify(opts: {
|
|||||||
sessionKey: opts.sessionKey,
|
sessionKey: opts.sessionKey,
|
||||||
targetRole: opts.targetRole,
|
targetRole: opts.targetRole,
|
||||||
});
|
});
|
||||||
pickups = result.pickups;
|
return result.pickups;
|
||||||
} catch { /* non-fatal: tick failure shouldn't break the caller */ }
|
} catch {
|
||||||
|
/* non-fatal: tick failure shouldn't break the caller */
|
||||||
if (pickups.length) {
|
return [];
|
||||||
const notifyConfig = getNotificationConfig(opts.pluginConfig);
|
|
||||||
await notifyTickPickups(pickups, {
|
|
||||||
workspaceDir: opts.workspaceDir,
|
|
||||||
config: notifyConfig,
|
|
||||||
channel: opts.channel,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pickups;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import type { ToolContext } from "../types.js";
|
|||||||
import { getWorker, resolveRepoPath } from "../projects.js";
|
import { getWorker, resolveRepoPath } from "../projects.js";
|
||||||
import { executeCompletion, getRule, NEXT_STATE } from "../services/pipeline.js";
|
import { executeCompletion, getRule, NEXT_STATE } from "../services/pipeline.js";
|
||||||
import { log as auditLog } from "../audit.js";
|
import { log as auditLog } from "../audit.js";
|
||||||
import { notify, getNotificationConfig } from "../notify.js";
|
|
||||||
import { requireWorkspaceDir, resolveProject, resolveProvider, getPluginConfig, tickAndNotify } from "../tool-helpers.js";
|
import { requireWorkspaceDir, resolveProject, resolveProvider, getPluginConfig, tickAndNotify } from "../tool-helpers.js";
|
||||||
|
|
||||||
export function createWorkFinishTool(api: OpenClawPluginApi) {
|
export function createWorkFinishTool(api: OpenClawPluginApi) {
|
||||||
@@ -58,9 +57,14 @@ export function createWorkFinishTool(api: OpenClawPluginApi) {
|
|||||||
const repoPath = resolveRepoPath(project.repo);
|
const repoPath = resolveRepoPath(project.repo);
|
||||||
const issue = await provider.getIssue(issueId);
|
const issue = await provider.getIssue(issueId);
|
||||||
|
|
||||||
// Execute completion (pipeline service)
|
const pluginConfig = getPluginConfig(api);
|
||||||
|
|
||||||
|
// Execute completion (pipeline service handles notification)
|
||||||
const completion = await executeCompletion({
|
const completion = await executeCompletion({
|
||||||
workspaceDir, groupId, role, result, issueId, summary, prUrl, provider, repoPath,
|
workspaceDir, groupId, role, result, issueId, summary, prUrl, provider, repoPath,
|
||||||
|
projectName: project.name,
|
||||||
|
channel: project.channel,
|
||||||
|
pluginConfig,
|
||||||
});
|
});
|
||||||
|
|
||||||
const output: Record<string, unknown> = {
|
const output: Record<string, unknown> = {
|
||||||
@@ -68,18 +72,9 @@ export function createWorkFinishTool(api: OpenClawPluginApi) {
|
|||||||
...completion,
|
...completion,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Notify completion
|
// Tick: fill free slots (notifications handled by dispatchTask)
|
||||||
const pluginConfig = getPluginConfig(api);
|
|
||||||
const notifyConfig = getNotificationConfig(pluginConfig);
|
|
||||||
await notify(
|
|
||||||
{ type: "workerComplete", project: project.name, groupId, issueId, issueUrl: issue.web_url, role, result: result as "done" | "pass" | "fail" | "refine" | "blocked", summary, nextState: NEXT_STATE[`${role}:${result}`] },
|
|
||||||
{ workspaceDir, config: notifyConfig, groupId, channel: project.channel ?? "telegram" },
|
|
||||||
);
|
|
||||||
|
|
||||||
// Tick: fill free slots + notify starts
|
|
||||||
const tickPickups = await tickAndNotify({
|
const tickPickups = await tickAndNotify({
|
||||||
workspaceDir, groupId, agentId: ctx.agentId, pluginConfig, sessionKey: ctx.sessionKey,
|
workspaceDir, groupId, agentId: ctx.agentId, pluginConfig, sessionKey: ctx.sessionKey,
|
||||||
channel: project.channel ?? "telegram",
|
|
||||||
});
|
});
|
||||||
if (tickPickups.length) output.tickPickups = tickPickups;
|
if (tickPickups.length) output.tickPickups = tickPickups;
|
||||||
|
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ import type { StateLabel } from "../providers/provider.js";
|
|||||||
import { selectLevel } from "../model-selector.js";
|
import { selectLevel } from "../model-selector.js";
|
||||||
import { getWorker } from "../projects.js";
|
import { getWorker } from "../projects.js";
|
||||||
import { dispatchTask } from "../dispatch.js";
|
import { dispatchTask } from "../dispatch.js";
|
||||||
import { notify, getNotificationConfig } from "../notify.js";
|
|
||||||
import { findNextIssue, detectRoleFromLabel, detectLevelFromLabels } from "../services/tick.js";
|
import { findNextIssue, detectRoleFromLabel, detectLevelFromLabels } from "../services/tick.js";
|
||||||
import { isDevLevel } from "../tiers.js";
|
import { isDevLevel } from "../tiers.js";
|
||||||
import { requireWorkspaceDir, resolveProject, resolveProvider, getPluginConfig } from "../tool-helpers.js";
|
import { requireWorkspaceDir, resolveProject, resolveProvider, getPluginConfig } from "../tool-helpers.js";
|
||||||
@@ -98,16 +97,11 @@ export function createWorkStartTool(api: OpenClawPluginApi) {
|
|||||||
role, level: selectedLevel, fromLabel: currentLabel, toLabel: targetLabel,
|
role, level: selectedLevel, fromLabel: currentLabel, toLabel: targetLabel,
|
||||||
transitionLabel: (id, from, to) => provider.transitionLabel(id, from as StateLabel, to as StateLabel),
|
transitionLabel: (id, from, to) => provider.transitionLabel(id, from as StateLabel, to as StateLabel),
|
||||||
provider,
|
provider,
|
||||||
pluginConfig, sessionKey: ctx.sessionKey,
|
pluginConfig,
|
||||||
|
channel: project.channel,
|
||||||
|
sessionKey: ctx.sessionKey,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Notify
|
|
||||||
const notifyConfig = getNotificationConfig(pluginConfig);
|
|
||||||
await notify(
|
|
||||||
{ type: "workerStart", project: project.name, groupId, issueId: issue.iid, issueTitle: issue.title, issueUrl: issue.web_url, role, level: dr.level, sessionAction: dr.sessionAction },
|
|
||||||
{ workspaceDir, config: notifyConfig, groupId, channel: project.channel ?? "telegram" },
|
|
||||||
);
|
|
||||||
|
|
||||||
// Auto-tick disabled per issue #125 - work_start should only pick up the explicitly requested issue
|
// Auto-tick disabled per issue #125 - work_start should only pick up the explicitly requested issue
|
||||||
// The heartbeat service fills parallel slots automatically
|
// The heartbeat service fills parallel slots automatically
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user