PR #589: fix native Conductor dispatch and stale terminal state (Battlelamb)

Resolved swarm-dispatch.ts conflict by taking buildHermesChatQueryArgs helper
(correct -q prompt adjacency). Adds 8 passing regression tests for native
dispatch/runtime + terminal active-mission persistence. eslint --fix on touched files.
This commit is contained in:
Aurora
2026-06-05 05:58:15 -04:00
parent 8d3c400f83
commit ef2e4ba02b
13 changed files with 424 additions and 91 deletions

View File

@@ -8,6 +8,8 @@ import {
import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import { useEffect, useState } from 'react'
import appCss from '../styles.css?url'
import { getRootSurfaceState } from './-root-layout-state'
import type {AuthStatus} from '@/lib/claude-auth';
import { SearchModal } from '@/components/search/search-modal'
import { UsageMeter } from '@/components/usage-meter'
import { TerminalShortcutListener } from '@/components/terminal-shortcut-listener'
@@ -27,8 +29,7 @@ import {
} from '@/components/onboarding/claude-onboarding'
import { ErrorBoundary } from '@/components/error-boundary'
import { LoginScreen } from '@/components/auth/login-screen'
import { fetchClaudeAuthStatus, type AuthStatus } from '@/lib/claude-auth'
import { getRootSurfaceState } from './-root-layout-state'
import { fetchClaudeAuthStatus } from '@/lib/claude-auth'
const APP_CSP = [
"default-src 'self'",
@@ -114,6 +115,13 @@ const themeColorScript = `
})()
`
/**
 * Default inner markup for the `#splash-screen` container, injected through
 * `dangerouslySetInnerHTML` in the root document.
 *
 * Contents, in order: the avatar image, the banner image, a "Workspace" label,
 * and a 140px progress track whose fill element has id `splash-bar` (starts at
 * `width:0%` and carries a CSS width transition).
 *
 * NOTE(review): the `splash-bar` width is presumably advanced by the inline
 * splash bootstrap script — confirm against that script before renaming the id.
 */
const DEFAULT_SPLASH_HTML = `
<img src="/claude-avatar.webp" alt="Hermes Agent" style="width:80px;height:80px;margin-bottom:20px;border-radius:16px;filter:drop-shadow(0 8px 32px color-mix(in srgb,#FFAC02 45%, transparent))" />
<img src="/claude-banner.png" alt="Hermes Workspace" style="width:280px;height:auto;margin-bottom:8px;filter:drop-shadow(0 4px 16px rgba(0,0,0,0.5))" />
<div style="font:400 14px/1 system-ui,-apple-system,sans-serif;letter-spacing:0.04em;color:#9CB2AE">Workspace</div>
<div style="margin-top:28px;width:140px;height:3px;background:rgba(255,255,255,0.08);border-radius:3px;overflow:hidden;position:relative"><div id="splash-bar" style="width:0%;height:100%;background:#FFAC02;border-radius:3px;transition:width 0.4s ease"></div></div>
`
export const Route = createRootRoute({
head: () => ({
meta: [
@@ -416,7 +424,16 @@ function RootDocument({ children }: { children: React.ReactNode }) {
/>
</head>
<body>
<div id="splash-screen" aria-hidden="true" style={{ display: 'none' }} />
{/* The inline splash bootstrap mutates this node before React hydrates.
Keep the default splash markup in the server/client tree, and set
suppressHydrationWarning so React does not report the bootstrap's
style/theme changes on this intentionally browser-owned node. */}
<div
id="splash-screen"
aria-hidden="true"
suppressHydrationWarning
style={{ display: 'none' }}
dangerouslySetInnerHTML={{ __html: DEFAULT_SPLASH_HTML }}
/>
<script
dangerouslySetInnerHTML={{
__html: wrapInlineScript(`

View File

@@ -14,22 +14,31 @@ describe('native Conductor fallback', () => {
supervised: false,
})
expect(assignments.map((assignment) => assignment.workerId)).toEqual(['swarm2', 'swarm5', 'swarm6', 'swarm11'])
expect(assignments.map((assignment) => assignment.workerId)).toEqual(['ops-watch', 'builder', 'reviewer', 'qa'])
expect(assignments[0].task).toContain('Conductor mission: Fix conductor')
expect(assignments.every((assignment) => assignment.direct === true)).toBe(true)
expect(assignments.every((assignment) => assignment.reviewRequired === false)).toBe(true)
})
it('uses Scribe when the mission asks for documentation even with a smaller lane count', () => {
it('uses KM Agent when the mission asks for documentation even with a smaller lane count', () => {
const assignments = buildNativeConductorAssignments('Write docs and handoff for the release', {
maxParallel: 3,
supervised: true,
})
expect(assignments.map((assignment) => assignment.workerId)).toContain('swarm7')
expect(assignments.map((assignment) => assignment.workerId)).toContain('km-agent')
expect(assignments.some((assignment) => assignment.task.includes('Supervised mode'))).toBe(true)
})
it('does not collapse generic two-lane missions to a single worker', () => {
  // Two lanes requested for a generic goal: both lanes must be filled,
  // builder first and reviewer second.
  const laneOptions = { maxParallel: 2, supervised: false }
  const workerIds = buildNativeConductorAssignments('Create a small UI prototype', laneOptions).map(
    ({ workerId }) => workerId,
  )

  expect(workerIds).toEqual(['builder', 'reviewer'])
})
it('normalizes native swarm missions into the Conductor mission status contract', () => {
const mission: SwarmMission = {
id: 'conductor-test',
@@ -40,9 +49,9 @@ describe('native Conductor fallback', () => {
assignments: [
{
id: 'a1',
workerId: 'swarm2',
workerId: 'builder',
task: 'Run smoke',
rationale: 'Foundation',
rationale: 'Builder',
dependsOn: [],
reviewRequired: false,
state: 'dispatched',
@@ -64,6 +73,6 @@ describe('native Conductor fallback', () => {
expect(record.nativeSwarm).toBe(true)
expect(record.modeOfficialOotb).toBe(true)
expect(record.modeNote).toBe(NATIVE_CONDUCTOR_MODE_NOTE)
expect(record.lines.join('\n')).toContain('swarm2 dispatched')
expect(record.lines.join('\n')).toContain('builder dispatched')
})
})

View File

@@ -1,8 +1,10 @@
import { describe, expect, it } from 'vitest'
import {
buildHermesChatQueryArgs,
buildHermesTmuxLaunchCommand,
buildWorkerPrompt,
checkpointFromRuntimeSnapshot,
dispatchBlockReason,
runtimeCheckpointSignature,
runtimeSnapshotIsFresh,
} from './swarm-dispatch'
@@ -46,6 +48,14 @@ describe('checkpointFromRuntimeSnapshot', () => {
})
})
describe('dispatchBlockReason', () => {
  it('turns failed or timed-out dispatch results into mission blocker text', () => {
    // Failed dispatch: the error text is surfaced as the blocker reason.
    const failedReason = dispatchBlockReason({
      ok: false,
      error: 'Command failed: worker exited',
      output: '',
      checkpointStatus: undefined,
    })
    expect(failedReason).toBe('Command failed: worker exited')

    // Delivered, but no checkpoint arrived before the poll window closed.
    const timedOutReason = dispatchBlockReason({
      ok: true,
      error: null,
      output: 'Delivered',
      checkpointStatus: 'timeout',
    })
    expect(timedOutReason).toBe('No fresh checkpoint before poll timeout.')

    // Delivered and checkpointed: nothing is blocking.
    const healthyReason = dispatchBlockReason({
      ok: true,
      error: null,
      output: 'Checkpoint DONE',
      checkpointStatus: 'checkpointed',
    })
    expect(healthyReason).toBeNull()
  })
})
describe('runtimeSnapshotIsFresh', () => {
it('requires a changed snapshot with post-dispatch activity', () => {
const baseline = {
@@ -109,6 +119,20 @@ describe('buildHermesTmuxLaunchCommand', () => {
})
})
describe('buildHermesChatQueryArgs', () => {
  it('passes the prompt immediately after -q so flags are not parsed as the query', () => {
    const prompt = 'STATE: DONE\nRESULT: ok'
    const args = buildHermesChatQueryArgs(prompt)
    const leadingArgs = args.slice(0, 3)

    // The subcommand, -q, and the prompt must be the first three argv items.
    expect(leadingArgs).toEqual(['chat', '-q', prompt])
    expect(args[1]).toBe('-q')
    expect(args[2]).toBe(prompt)

    // -Q comes only after the prompt, never between -q and its value.
    expect(args[3]).toBe('-Q')
    expect(args).toContain('-Q')
    expect(args).toContain('--source')
  })
})
describe('buildWorkerPrompt', () => {
const roster = {
id: 'swarm5',
@@ -117,9 +141,15 @@ describe('buildWorkerPrompt', () => {
specialty: 'full-stack implementation across Hermes Workspace and Swarm2',
model: 'GPT-5.5',
mission: 'Ship focused product slices with tests and clean diffs.',
modes: [],
tools: [],
skills: ['swarm-ui-worker', 'swarm-worker-core'],
plugins: [],
pluginToolsets: [],
mcpServers: [],
capabilities: ['code-editing', 'ui-implementation', 'build-verification'],
preferredTaskTypes: ['implementation'],
greenlightRequiredFor: [],
maxConcurrentTasks: 1,
acceptsBroadcast: true,
reviewRequired: false,

View File

@@ -7,13 +7,12 @@ import { isAuthenticated } from '../../server/auth-middleware'
import { requireJsonContentType } from '../../server/rate-limit'
import { dashboardFetch, ensureGatewayProbed } from '../../server/gateway-capabilities'
import { sanitizeConductorMissionGoal } from '../../server/conductor-mission-sanitize'
import { getSwarmMission } from '../../server/swarm-missions'
import { dispatchSwarmAssignments, readRuntimeCheckpointSnapshot, checkpointFromRuntimeSnapshot, runtimeCheckpointSignature } from './swarm-dispatch'
import type { SwarmMission } from '../../server/swarm-missions'
import { recordMissionCheckpoint } from '../../server/swarm-missions'
import { getSwarmMission, recordMissionCheckpoint } from '../../server/swarm-missions'
import { getSwarmProfilePath } from '../../server/swarm-foundation'
import { readWorkerMessages } from '../../server/swarm-chat-reader'
import { newestCheckpointFromMessages } from '../../server/swarm-checkpoints'
import { checkpointFromRuntimeSnapshot, dispatchSwarmAssignments, readRuntimeCheckpointSnapshot, runtimeCheckpointSignature } from './swarm-dispatch'
import type { SwarmMission } from '../../server/swarm-missions'
let cachedSkill: string | null = null
@@ -148,96 +147,115 @@ function clipText(value: string, max = 8000): string {
export function buildNativeConductorAssignments(goal: string, options: { maxParallel: number; supervised: boolean }): Array<NativeConductorAssignment> {
const maxParallel = Math.min(5, Math.max(1, options.maxParallel || 1))
const normalizedGoal = goal.toLowerCase()
const wantsProduction = /production|ready|harden|audit|clean|fix|bug|test|build|release|deploy|operational/.test(normalizedGoal)
const wantsDocs = /doc|handoff|readme|spec|plan|summary/.test(normalizedGoal)
const wantsOps = /production|ready|harden|audit|clean|fix|bug|test|build|release|deploy|operational|runtime|gateway|tmux|service|health/.test(normalizedGoal)
const wantsDocs = /doc|handoff|readme|spec|plan|summary|knowledge|note/.test(normalizedGoal)
const assignments: Array<NativeConductorAssignment> = []
assignments.push({
workerId: wantsProduction ? 'swarm2' : 'swarm5',
rationale: wantsProduction ? 'Foundation owns runtime contracts and production blockers.' : 'Builder owns the primary implementation lane.',
const pushUnique = (assignment: NativeConductorAssignment) => {
if (!assignments.some((existing) => existing.workerId === assignment.workerId)) assignments.push(assignment)
}
pushUnique({
workerId: wantsOps ? 'ops-watch' : 'builder',
rationale: wantsOps ? 'Ops Watch owns runtime health, service quality, and production blockers.' : 'Builder owns scoped implementation and concrete progress.',
reviewRequired: false,
direct: true,
task: [
`Conductor mission: ${goal}`,
'',
'Lane: Foundation / primary implementation.',
'Find the smallest safe execution plan, make concrete progress, and produce a checkpoint. If code changes are required, keep them scoped and testable.',
wantsOps ? 'Lane: Ops Watch / runtime quality.' : 'Lane: Builder / primary implementation.',
wantsOps
? 'Diagnose the runtime path, make the smallest safe operational improvement, and return proof. Avoid destructive changes unless explicitly approved.'
: 'Find the smallest safe execution plan, make concrete progress, and produce a checkpoint. If code changes are required, keep them scoped and testable.',
options.supervised ? 'Supervised mode: stop before destructive writes or commits and report the exact approval needed.' : 'Do not ask for confirmation unless blocked; start immediately.',
].join('\n'),
})
if (maxParallel >= 2) {
assignments.push({
workerId: 'swarm5',
rationale: 'Builder executes implementation or patch work in parallel with foundation analysis.',
pushUnique({
workerId: wantsOps ? 'builder' : 'reviewer',
rationale: wantsOps
? 'Builder executes implementation or patch work in parallel with runtime analysis.'
: 'Reviewer provides the second-lane quality gate for implementation work.',
reviewRequired: false,
direct: true,
task: [
`Conductor mission: ${goal}`,
'',
'Lane: Builder.',
'Implement or prototype the concrete fix/feature path. Avoid broad refactors. Report files changed, tests run, and remaining risks.',
wantsOps ? 'Lane: Builder.' : 'Lane: Reviewer / quality gate.',
wantsOps
? 'Implement or prototype the concrete fix/feature path. Avoid broad refactors. Report files changed, tests run, and remaining risks.'
: 'Review the execution path and any changes. Look for regressions, missing tests, unsafe assumptions, and production-readiness gaps.',
options.supervised ? 'Supervised mode: prepare patches but stop before destructive writes or commits if approval is needed.' : 'Proceed without asking unless blocked.',
].join('\n'),
})
}
if (maxParallel >= 3) {
assignments.push({
workerId: 'swarm6',
rationale: 'Reviewer independently checks correctness, regressions, and merge risk.',
pushUnique({
workerId: wantsOps ? 'reviewer' : 'qa',
rationale: wantsOps
? 'Reviewer independently checks correctness, regressions, and merge risk.'
: 'QA validates user-visible behavior with focused smoke checks.',
reviewRequired: false,
direct: true,
task: [
`Conductor mission: ${goal}`,
'',
'Lane: Reviewer / merge gate.',
'Review the implementation plan and any changes from Foundation/Builder. Look for regressions, missing tests, unsafe assumptions, and production-readiness gaps. Do not make broad edits unless needed to unblock correctness.',
wantsOps ? 'Lane: Reviewer / quality gate.' : 'Lane: QA.',
wantsOps
? 'Review the implementation plan and any changes from Ops/Builder. Look for regressions, missing tests, unsafe assumptions, and production-readiness gaps. Do not make broad edits unless needed to unblock correctness.'
: 'Run or design focused verification. Prefer targeted tests/build/smoke checks. Report exact commands and results. If tests are missing, identify the minimal regression coverage needed.',
].join('\n'),
})
}
if (maxParallel >= 4) {
assignments.push({
workerId: 'swarm11',
rationale: 'QA validates behavior with targeted tests and smoke checks.',
pushUnique({
workerId: wantsOps ? 'qa' : 'ops-watch',
rationale: wantsOps
? 'QA validates behavior with targeted tests and smoke checks.'
: 'Ops Watch checks runtime/service risks for implementation missions.',
reviewRequired: false,
direct: true,
task: [
`Conductor mission: ${goal}`,
'',
'Lane: QA.',
'Run or design focused verification. Prefer targeted tests/build/smoke checks. Report exact commands and results. If tests are missing, identify the minimal regression coverage needed.',
wantsOps ? 'Lane: QA.' : 'Lane: Ops Watch / runtime quality.',
wantsOps
? 'Run or design focused verification. Prefer targeted tests/build/smoke checks. Report exact commands and results. If tests are missing, identify the minimal regression coverage needed.'
: 'Check runtime, service, deployment, and operational risk. Report only concrete blockers, verification gaps, and safe next actions.',
].join('\n'),
})
}
if (maxParallel >= 5 || wantsDocs) {
assignments.push({
workerId: 'swarm7',
rationale: 'Scribe captures handoff, docs, and operational notes.',
pushUnique({
workerId: 'km-agent',
rationale: 'KM Agent captures handoff, docs, and durable knowledge notes without leaking secrets.',
reviewRequired: false,
direct: true,
task: [
`Conductor mission: ${goal}`,
'',
'Lane: Scribe.',
'Lane: KM Agent / handoff and knowledge hygiene.',
'Create a concise handoff/status note: what changed, how to operate it, verification, caveats, and next actions. Do not expose secrets.',
options.supervised ? 'Supervised mode: stop before destructive writes or commits and report the exact approval needed.' : 'Proceed without asking unless blocked.',
].join('\n'),
})
}
const selected = assignments.slice(0, maxParallel)
if (wantsDocs && !selected.some((assignment) => assignment.workerId === 'swarm7')) {
if (wantsDocs && !selected.some((assignment) => assignment.workerId === 'km-agent')) {
selected[selected.length - 1] = {
workerId: 'swarm7',
rationale: 'Scribe captures handoff, docs, and operational notes.',
workerId: 'km-agent',
rationale: 'KM Agent captures handoff, docs, and durable knowledge notes without leaking secrets.',
reviewRequired: false,
direct: true,
task: [
`Conductor mission: ${goal}`,
'',
'Lane: Scribe.',
'Lane: KM Agent / handoff and knowledge hygiene.',
'Create a concise handoff/status note: what changed, how to operate it, verification, caveats, and next actions. Do not expose secrets.',
options.supervised ? 'Supervised mode: stop before destructive writes or commits and report the exact approval needed.' : 'Proceed without asking unless blocked.',
].join('\n'),

View File

@@ -7,7 +7,7 @@ import { join } from 'node:path'
import { isAuthenticated } from '../../server/auth-middleware'
import { newestCheckpointFromMessages, parseSwarmCheckpoint, type ParsedSwarmCheckpoint } from '../../server/swarm-checkpoints'
import { readWorkerMessages } from '../../server/swarm-chat-reader'
import { createOrUpdateMission, markMissionAssignmentDispatched, recordMissionCheckpoint } from '../../server/swarm-missions'
import { createOrUpdateMission, getSwarmMission, markMissionAssignmentDispatched, recordMissionAssignmentBlocked, recordMissionCheckpoint } from '../../server/swarm-missions'
import { appendSwarmMemoryEvent, buildSwarmStartupSnapshot } from '../../server/swarm-memory'
import { rosterByWorkerId, type SwarmRosterWorker } from '../../server/swarm-roster'
import { publishSwarmCheckpointNotification } from '../../server/swarm-notifications'
@@ -502,6 +502,33 @@ function markDispatchResult(workerId: string, result: WorkerResult): void {
})
}
/**
 * Derives the blocker text for a dispatch result, or `null` when nothing is
 * blocking.
 *
 * @param result - The `ok`, `error`, `output` and `checkpointStatus` fields of a worker result.
 * @returns For a non-ok result: the trimmed error, else the trimmed output,
 *   else a generic failure message. For an ok result: a timeout message when
 *   `checkpointStatus` is `'timeout'`, otherwise `null`.
 */
export function dispatchBlockReason(result: Pick<WorkerResult, 'ok' | 'error' | 'output' | 'checkpointStatus'>): string | null {
  if (result.ok) {
    return result.checkpointStatus === 'timeout' ? 'No fresh checkpoint before poll timeout.' : null
  }

  // Prefer the explicit error; fall back to captured output, then a fixed message.
  const errorText = result.error?.trim()
  if (errorText) return errorText

  const outputText = result.output?.trim()
  if (outputText) return outputText

  return 'Dispatch failed before a worker checkpoint was recorded.'
}
/**
 * Records a failed or timed-out dispatch as a blocked assignment.
 *
 * Does nothing when `dispatchBlockReason(result)` yields no reason. Otherwise
 * it first records the block against the mission assignment, then writes a
 * `blocked` runtime patch for the worker carrying the same reason.
 *
 * @param workerId - Worker whose runtime state is patched.
 * @param assignment - Assignment being dispatched; its `assignmentId` is recorded (or `null`).
 * @param result - Dispatch outcome used to derive the blocker reason.
 * @param options - Optional `missionId` forwarded to the mission record.
 */
function recordDispatchBlock(workerId: string, assignment: AssignmentRequest, result: WorkerResult, options?: { missionId?: string | null }): void {
  const blockedBecause = dispatchBlockReason(result)

  if (blockedBecause) {
    const missionId = options?.missionId
    const assignmentId = assignment.assignmentId ?? null

    recordMissionAssignmentBlocked({
      missionId,
      assignmentId,
      workerId,
      reason: blockedBecause,
      source: 'swarm-dispatch',
    })

    // Timestamps are taken after the mission record is written.
    writeRuntimePatch(workerId, {
      state: 'blocked',
      phase: 'blocked',
      checkpointStatus: 'blocked',
      blockedReason: blockedBecause,
      lastDispatchResult: blockedBecause,
      lastCheckIn: new Date().toISOString(),
      lastOutputAt: Date.now(),
    })
  }
}
function markCheckpointResult(workerId: string, checkpoint: ParsedSwarmCheckpoint, notifySessionKey?: string | null): void {
// When the checkpoint reaches any terminal status (anything other than
// 'in_progress' — i.e. done/blocked/needs_input/handoff) the worker is no
@@ -757,6 +784,14 @@ async function sendPromptToLiveSession(workerId: string, prompt: string): Promis
}
}
/**
 * Builds the argv for a one-shot `hermes chat` query.
 *
 * The prompt is placed directly after `-q` because `hermes chat -q` takes the
 * query as the *immediate* next argv item. If another flag (for example `-Q`)
 * sat between them, argparse would treat the query as missing and fail with
 * "argument -q/--query: expected one argument".
 *
 * @param prompt - Query text passed as the value of `-q`.
 * @returns The argument list, starting with the `chat` subcommand.
 */
export function buildHermesChatQueryArgs(prompt: string): string[] {
  const queryPair = ['-q', prompt]
  const trailingFlags = ['-Q', '--yolo', '--ignore-rules', '--source', 'swarm-dispatch']
  return ['chat', ...queryPair, ...trailingFlags]
}
function runWorker(assignment: AssignmentRequest, timeoutMs: number, roster: SwarmRosterWorker | undefined, options?: { waitForCheckpoint?: boolean; checkpointPollMs?: number; missionId?: string | null; notifySessionKey?: string | null }): Promise<WorkerResult> {
return new Promise(async (resolve) => {
const workerId = assignment.workerId
@@ -866,6 +901,7 @@ function runWorker(assignment: AssignmentRequest, timeoutMs: number, roster: Swa
} else {
liveResult.checkpointStatus = 'not-requested'
}
recordDispatchBlock(workerId, assignment, liveResult, options)
resolve(liveResult)
return
}
@@ -881,15 +917,14 @@ function runWorker(assignment: AssignmentRequest, timeoutMs: number, roster: Swa
delivery: 'oneshot',
}
markDispatchResult(workerId, result)
recordDispatchBlock(workerId, assignment, result, options)
resolve(result)
return
}
const useWrapper = existsSync(wrapperPath)
const cmd = useWrapper ? wrapperPath : resolveHermesBin()
const args = useWrapper
? ['chat', '-Q', '-q', prompt, '--yolo', '--ignore-rules', '--source', 'swarm-dispatch']
: ['chat', '-Q', '-q', prompt, '--yolo', '--ignore-rules', '--source', 'swarm-dispatch']
const args = buildHermesChatQueryArgs(prompt)
const env: NodeJS.ProcessEnv = {
...process.env,
HERMES_HOME: profilePath,
@@ -909,7 +944,6 @@ function runWorker(assignment: AssignmentRequest, timeoutMs: number, roster: Swa
timeout: timeoutMs,
maxBuffer: MAX_OUTPUT_CHARS,
killSignal: 'SIGTERM',
input: prompt,
},
(error, stdout, stderr) => {
const durationMs = Date.now() - startedAt
@@ -929,6 +963,7 @@ function runWorker(assignment: AssignmentRequest, timeoutMs: number, roster: Swa
delivery: 'oneshot',
}
markDispatchResult(workerId, result)
recordDispatchBlock(workerId, assignment, result, options)
resolve(result)
return
}
@@ -985,6 +1020,7 @@ function runWorker(assignment: AssignmentRequest, timeoutMs: number, roster: Swa
result.checkpointStatus = 'not-requested'
}
markDispatchResult(workerId, result)
recordDispatchBlock(workerId, assignment, result, options)
resolve(result)
},
)
@@ -1000,6 +1036,7 @@ function runWorker(assignment: AssignmentRequest, timeoutMs: number, roster: Swa
delivery: 'oneshot',
}
markDispatchResult(workerId, result)
recordDispatchBlock(workerId, assignment, result, options)
resolve(result)
})
})
@@ -1094,11 +1131,13 @@ export async function dispatchSwarmAssignments(body: DispatchRequest) {
{ waitForCheckpoint, checkpointPollMs: checkpointPollSeconds * 1000, missionId: mission.id, notifySessionKey },
)))
const latestMission = getSwarmMission(mission.id) ?? mission
return {
dispatchedAt,
completedAt: Date.now(),
missionId: mission.id,
mission,
mission: latestMission,
prompt: assignments.length === 1 ? assignments[0].task : `${assignments.length} assigned tasks`,
assignments,
timeoutSeconds,

View File

@@ -1,5 +1,6 @@
import { execFile } from 'node:child_process'
import { existsSync, readFileSync, readdirSync, statSync } from 'node:fs'
import { homedir } from 'node:os'
import { join } from 'node:path'
import { json } from '@tanstack/react-start'
import { createFileRoute } from '@tanstack/react-router'
@@ -94,19 +95,22 @@ function lastLogTail(
}
}
/**
 * Picks the tmux executable to run.
 *
 * Order of preference: `HERMES_TMUX_BIN`, then `CLAUDE_TMUX_BIN`, then
 * `~/.local/bin/tmux` if that path exists, and finally the bare name `tmux`.
 *
 * @returns The override value, the user-local path, or `'tmux'`.
 */
function resolveTmuxBin(): string {
  const { HERMES_TMUX_BIN, CLAUDE_TMUX_BIN } = process.env
  const envOverride = HERMES_TMUX_BIN || CLAUDE_TMUX_BIN
  if (envOverride) return envOverride

  const userLocalTmux = join(homedir(), '.local', 'bin', 'tmux')
  if (existsSync(userLocalTmux)) return userLocalTmux
  return 'tmux'
}
function tmuxHasSession(name: string): Promise<boolean> {
return new Promise((resolve) => {
execFile('tmux', ['has-session', '-t', name], (error) => resolve(!error))
execFile(resolveTmuxBin(), ['has-session', '-t', name], (error) => resolve(!error))
})
}
function tmuxIsInstalled(): Promise<boolean> {
// Honour HERMES_TMUX_BIN so a custom-path install isn't reported as
// 'tmux not installed' just because PATH doesn't include it. See #244.
const bin =
process.env.HERMES_TMUX_BIN || process.env.CLAUDE_TMUX_BIN || 'tmux'
return new Promise((resolve) => {
execFile(bin, ['-V'], (error) => resolve(!error))
execFile(resolveTmuxBin(), ['-V'], (error) => resolve(!error))
})
}

View File

@@ -0,0 +1,15 @@
import { describe, expect, it } from 'vitest'
import { shouldPersistActiveConductorMission } from './use-conductor-gateway'
describe('Conductor active mission persistence', () => {
  it('persists only resumable in-flight phases', () => {
    // Phases that are still in flight and can be resumed.
    for (const phase of ['decomposing', 'running'] as const) {
      expect(shouldPersistActiveConductorMission(phase)).toBe(true)
    }
  })

  it('does not persist terminal or idle phases as the active mission', () => {
    // Neither an idle nor a completed mission counts as the active one.
    for (const phase of ['idle', 'complete'] as const) {
      expect(shouldPersistActiveConductorMission(phase)).toBe(false)
    }
  })
})

View File

@@ -11,11 +11,11 @@ type HistoryMessagePart = {
type HistoryMessage = {
role?: string
content?: string | HistoryMessagePart[]
content?: string | Array<HistoryMessagePart>
}
type HistoryResponse = {
messages?: HistoryMessage[]
messages?: Array<HistoryMessage>
error?: string
}
@@ -65,6 +65,10 @@ const DEFAULT_CONDUCTOR_SETTINGS: ConductorSettings = {
supervised: false,
}
/**
 * Tells whether a mission in the given phase should be kept as the persisted
 * active mission.
 *
 * @param phase - Current mission phase.
 * @returns `true` for `'decomposing'` and `'running'`; `false` for every other phase.
 */
export function shouldPersistActiveConductorMission(phase: MissionPhase): boolean {
  switch (phase) {
    case 'decomposing':
    case 'running':
      return true
    default:
      return false
  }
}
type PersistedMission = {
missionId: string | null
missionJobId: string | null
@@ -75,13 +79,13 @@ type PersistedMission = {
pausedElapsedMs: number
accumulatedPausedMs: number
pauseStartedAt: string | null
workerKeys: string[]
workerLabels: string[]
workerKeys: Array<string>
workerLabels: Array<string>
workerOutputs: Record<string, string>
streamText: string
planText: string
completedAt: string | null
tasks: ConductorTask[]
tasks: Array<ConductorTask>
}
type StreamEvent =
@@ -156,11 +160,11 @@ export type MissionHistoryEntry = {
status: 'completed' | 'failed'
projectPath: string | null
outputPath?: string | null
workerSummary?: string[]
workerSummary?: Array<string>
outputText?: string
streamText?: string
completeSummary?: string
workerDetails?: MissionHistoryWorkerDetail[]
workerDetails?: Array<MissionHistoryWorkerDetail>
error?: string | null
}
@@ -177,8 +181,8 @@ function getAgentPersona(index: number) {
}
}
function extractTasksFromPlan(planText: string): ConductorTask[] {
const tasks: ConductorTask[] = []
function extractTasksFromPlan(planText: string): Array<ConductorTask> {
const tasks: Array<ConductorTask> = []
const patterns = [/^\s*(\d+)\.\s+(.+)$/gm, /^\s*#{1,3}\s+(?:Step\s+)?(\d+)[.:]\s*(.+)$/gm, /^\s*-\s+\*\*(?:Task\s+)?(\d+)[.:]\s*\*\*\s*(.+)$/gm]
const seen = new Set<string>()
@@ -250,7 +254,7 @@ function getSessionSearchText(session: GatewaySession): string {
.join(' ')
}
function buildMissionNeedles(goal: string): string[] {
function buildMissionNeedles(goal: string): Array<string> {
const words = normalizeMatchText(goal).split(' ').filter(Boolean)
const prefixes = [5, 8, 12]
.map((count) => words.slice(0, count).join(' ').trim())
@@ -261,7 +265,7 @@ function buildMissionNeedles(goal: string): string[] {
function sessionMatchesMissionContext(
session: GatewaySession,
missionStartMs: number,
missionNeedles: string[],
missionNeedles: Array<string>,
): boolean {
const createdAt = toIso(session.createdAt ?? session.startedAt ?? session.updatedAt)
if (!createdAt) return false
@@ -331,6 +335,16 @@ function loadPersistedMission(): PersistedMission | null {
if (!goal || (phase !== 'idle' && phase !== 'decomposing' && phase !== 'running' && phase !== 'complete') || streamText === null || planText === null || !workerKeys || !workerLabels) {
return null
}
// Completed/stopped missions are already represented in mission history.
// Restoring them as the active mission causes stale terminal records to be
// re-queried on page load and can surface an old failure as if Conductor is
// currently broken.
if (!shouldPersistActiveConductorMission(phase)) {
clearPersistedMission()
return null
}
return {
missionId,
missionJobId,
@@ -379,7 +393,7 @@ function persistConductorSettings(settings: ConductorSettings): void {
}
}
function loadMissionHistory(): MissionHistoryEntry[] {
function loadMissionHistory(): Array<MissionHistoryEntry> {
try {
const raw = globalThis.localStorage?.getItem(HISTORY_STORAGE_KEY)
if (!raw) return []
@@ -481,7 +495,7 @@ function deriveWorkerStatus(session: GatewaySession, updatedAt: string | null):
return 'running'
}
function workersLookComplete(workers: ConductorWorker[], staleAfterMs: number): boolean {
function workersLookComplete(workers: Array<ConductorWorker>, staleAfterMs: number): boolean {
if (workers.length === 0) return false
return workers.every((worker) => {
@@ -563,7 +577,7 @@ function extractHistoryMessageText(message: HistoryMessage | undefined): string
return ''
}
function getLastAssistantMessage(messages: HistoryMessage[] | undefined): string {
function getLastAssistantMessage(messages: Array<HistoryMessage> | undefined): string {
if (!Array.isArray(messages)) return ''
// Return the longest assistant message so we prefer the substantive work output.
let best = ''
@@ -576,7 +590,7 @@ function getLastAssistantMessage(messages: HistoryMessage[] | undefined): string
return best
}
function readMissionLines(mission: ConductorMissionRecord | null | undefined): string[] {
function readMissionLines(mission: ConductorMissionRecord | null | undefined): Array<string> {
if (!Array.isArray(mission?.lines)) return []
return mission.lines.filter((line): line is string => typeof line === 'string')
}
@@ -593,7 +607,7 @@ function extractSessionIdFromMission(mission: ConductorMissionRecord | null | un
return null
}
function formatMissionLog(lines: string[]): string {
function formatMissionLog(lines: Array<string>): string {
return lines
.map((line) => line.trimEnd())
.filter((line) => line.trim().length > 0)
@@ -652,8 +666,8 @@ function extractProjectPath(text: string): string | null {
return null
}
function buildMissionOutputPath(workers: ConductorWorker[], workerOutputs: Record<string, string>, tasks: ConductorTask[], streamText: string): string | null {
const workerOutputTexts = [...Object.values(workerOutputs), ...workers.map((worker) => getLastAssistantMessage(worker.raw.messages as HistoryMessage[] | undefined))].filter(Boolean)
function buildMissionOutputPath(workers: Array<ConductorWorker>, workerOutputs: Record<string, string>, tasks: Array<ConductorTask>, streamText: string): string | null {
const workerOutputTexts = [...Object.values(workerOutputs), ...workers.map((worker) => getLastAssistantMessage(worker.raw.messages as Array<HistoryMessage> | undefined))].filter(Boolean)
for (const text of workerOutputTexts) {
const extractedPath = extractProjectPath(text)
@@ -672,9 +686,9 @@ function buildMissionOutputPath(workers: ConductorWorker[], workerOutputs: Recor
return null
}
function summarizeWorkers(workers: ConductorWorker[]): string[] {
function summarizeWorkers(workers: Array<ConductorWorker>): Array<string> {
return workers.map((worker) => {
const output = getLastAssistantMessage(worker.raw.messages as HistoryMessage[] | undefined)
const output = getLastAssistantMessage(worker.raw.messages as Array<HistoryMessage> | undefined)
const firstLine = output
.split(/\n+/)
.map((line) => line.trim())
@@ -714,10 +728,10 @@ function buildCompleteSummary(params: {
return lines.join('\n')
}
function buildMissionOutputText(workers: ConductorWorker[], workerOutputs: Record<string, string>, streamText: string): string {
function buildMissionOutputText(workers: Array<ConductorWorker>, workerOutputs: Record<string, string>, streamText: string): string {
const workerSections = workers
.map((worker) => {
const output = (workerOutputs[worker.key] ?? getLastAssistantMessage(worker.raw.messages as HistoryMessage[] | undefined)).trim()
const output = (workerOutputs[worker.key] ?? getLastAssistantMessage(worker.raw.messages as Array<HistoryMessage> | undefined)).trim()
if (!output) return null
return `### ${worker.displayName}\n\n${output}`
})
@@ -905,7 +919,7 @@ export function useConductorGateway() {
const [orchestratorSessionKey, setOrchestratorSessionKey] = useState<string | null>(() => initialMission?.workerKeys[0] ?? null)
const [streamText, setStreamText] = useState(() => initialMission?.streamText ?? '')
const [planText, setPlanText] = useState(() => initialMission?.planText ?? '')
const [streamEvents, setStreamEvents] = useState<StreamEvent[]>([])
const [streamEvents, setStreamEvents] = useState<Array<StreamEvent>>([])
const [missionStartedAt, setMissionStartedAt] = useState<string | null>(() => initialMission?.missionStartedAt ?? null)
const [isPaused, setIsPaused] = useState(() => initialMission?.isPaused ?? false)
const [pausedElapsedMs, setPausedElapsedMs] = useState(() => initialMission?.pausedElapsedMs ?? 0)
@@ -917,8 +931,8 @@ export function useConductorGateway() {
const [missionWorkerKeys, setMissionWorkerKeys] = useState<Set<string>>(() => new Set(initialMission?.workerKeys ?? []))
const [missionWorkerLabels, setMissionWorkerLabels] = useState<Set<string>>(() => new Set(initialMission?.workerLabels ?? []))
const [workerOutputs, setWorkerOutputs] = useState<Record<string, string>>(() => initialMission?.workerOutputs ?? {})
const [tasks, setTasks] = useState<ConductorTask[]>(() => initialMission?.tasks ?? [])
const [missionHistory, setMissionHistory] = useState<MissionHistoryEntry[]>(() => loadMissionHistory())
const [tasks, setTasks] = useState<Array<ConductorTask>>(() => initialMission?.tasks ?? [])
const [missionHistory, setMissionHistory] = useState<Array<MissionHistoryEntry>>(() => loadMissionHistory())
const [selectedHistoryEntry, setSelectedHistoryEntry] = useState<MissionHistoryEntry | null>(null)
const [conductorSettings, setConductorSettings] = useState<ConductorSettings>(() => loadConductorSettings())
const doneRef = useRef(initialMission?.phase === 'complete')
@@ -1020,7 +1034,7 @@ export function useConductorGateway() {
if (!missionId) return null
return fetchConductorMission(missionId)
},
enabled: Boolean(missionId) && phase !== 'idle',
enabled: Boolean(missionId) && shouldPersistActiveConductorMission(phase),
refetchInterval: phase === 'decomposing' || phase === 'running' ? 2_500 : false,
retry: Infinity,
retryDelay: (attemptIndex: number) => Math.min(2000 * 2 ** attemptIndex, 10_000),
@@ -1032,7 +1046,7 @@ export function useConductorGateway() {
// assignments so the UI shows progress instead of "Spawning workers..." forever.
const swarmAssignments = missionStatusQuery.data?.assignments
const isNativeSwarm = missionStatusQuery.data?.nativeSwarm === true
const virtualWorkers = useMemo<ConductorWorker[]>(() => {
const virtualWorkers = useMemo<Array<ConductorWorker>>(() => {
if (!isNativeSwarm || !swarmAssignments || swarmAssignments.length === 0) return []
const missionUpdatedAt = new Date(missionStatusQuery.data?.updatedAt ?? Date.now()).toISOString()
return swarmAssignments.map((assignment, index) => {
@@ -1374,7 +1388,7 @@ export function useConductorGateway() {
}, [conductorSettings])
useEffect(() => {
if (phase === 'idle') {
if (!shouldPersistActiveConductorMission(phase)) {
try {
localStorage.removeItem(ACTIVE_MISSION_STORAGE_KEY)
} catch {}

View File

@@ -104,6 +104,68 @@ describe('swarm-missions', () => {
expect(existsSync(mod.SWARM_MISSIONS_PATH)).toBe(true)
})
it('does not infer review-required from dispatch/checkpoint wording alone', async () => {
  // The task and rationale both contain "dispatch" (which embeds "patch") but
  // no explicit reviewRequired flag, so the inferred default is what is tested.
  const swarm = await loadModule()
  const created = swarm.createOrUpdateMission({
    missionId: 'mission-dispatch-smoke-review',
    title: 'Diagnostic dispatch smoke',
    assignments: [
      {
        workerId: 'builder',
        task: 'Diagnostic smoke only. Return RESULT: workspace swarm dispatch API smoke passed.',
        rationale: 'diagnostic dispatch smoke',
      },
    ],
  })
  const [smokeAssignment] = created.assignments

  expect(smokeAssignment?.reviewRequired).toBe(false)

  // A DONE checkpoint for the only assignment should complete the mission.
  const afterCheckpoint = swarm.recordMissionCheckpoint({
    missionId: created.id,
    assignmentId: smokeAssignment?.id,
    workerId: 'builder',
    checkpoint: {
      stateLabel: 'DONE',
      runtimeState: 'idle',
      checkpointStatus: 'done',
      filesChanged: 'none',
      commandsRun: 'none',
      result: 'workspace swarm dispatch API smoke passed',
      blocker: null,
      nextAction: 'none',
      raw: 'STATE: DONE\nFILES_CHANGED: none\nCOMMANDS_RUN: none\nRESULT: workspace swarm dispatch API smoke passed\nBLOCKER: none\nNEXT_ACTION: none',
    },
    source: 'swarm-dispatch',
  })

  expect(afterCheckpoint?.state).toBe('complete')
})
it('records dispatch failures as blocked mission assignments', async () => {
  const swarm = await loadModule()
  const probeTask = 'Probe runtime health'
  const created = swarm.createOrUpdateMission({
    missionId: 'mission-dispatch-failure',
    title: 'Dispatch failure test',
    assignments: [{ workerId: 'builder', task: probeTask, reviewRequired: false }],
  })
  const [probeAssignment] = created.assignments

  // Dispatch first so the failure is recorded against an in-flight assignment.
  swarm.markMissionAssignmentDispatched({ missionId: created.id, workerId: 'builder', task: probeTask })

  const outcome = swarm.recordMissionAssignmentBlocked({
    missionId: created.id,
    assignmentId: probeAssignment?.id,
    workerId: 'builder',
    reason: 'No fresh checkpoint before poll timeout.',
    source: 'swarm-dispatch',
  })

  // Both the assignment and the mission derived from it must report blocked.
  expect(outcome?.mission.state).toBe('blocked')
  expect(outcome?.assignment.state).toBe('blocked')
  expect(outcome?.assignment.checkpoint).toMatchObject({
    stateLabel: 'BLOCKED',
    checkpointStatus: 'blocked',
    blocker: 'No fresh checkpoint before poll timeout.',
  })

  // The most recent mission event is the blocked notification.
  const latestEvent = outcome?.mission.events.at(-1)
  expect(latestEvent?.type).toBe('blocked')
})
it('keeps dependent work queued until review-required assignments are reviewed', async () => {
const mod = await loadModule()
const mission = mod.createOrUpdateMission({

View File

@@ -126,6 +126,13 @@ function deriveMissionState(assignments: Array<SwarmMissionAssignment>): SwarmMi
return 'planning'
}
/**
 * Infer whether an assignment should default to requiring review, based on
 * intent-bearing keywords in its task and rationale text.
 *
 * Keywords are matched case-insensitively as whole words. A loose substring
 * alternation matched "patch" inside "dispatch" and left simple smoke runs in
 * review forever, so word boundaries are mandatory. Every keyword accepts its
 * common inflections (e.g. "implements", "PRs", "benchmarking", "coding") so
 * that a word form does not silently bypass review.
 *
 * @param task - Assignment task text.
 * @param rationale - Optional rationale text; null/undefined is treated as empty.
 * @returns true when the combined text mentions code-changing work.
 */
function inferReviewRequired(task: string, rationale?: string | null): boolean {
  const reviewKeywords =
    /\b(?:cod(?:e[sd]?|ing)|patch(?:es|ed|ing)?|implement(?:s|ations?|ed|ing)?|prs?|benchmark(?:s|ed|ing)?)\b/i
  return reviewKeywords.test(`${task} ${rationale ?? ''}`)
}
// Assignment states treated as terminal. Note that 'blocked' is not one of them.
const TERMINAL_ASSIGNMENT_STATES = new Set<SwarmMissionAssignmentState>(['done', 'cancelled'])
function isTerminalAssignment(assignment: SwarmMissionAssignment): boolean {
@@ -197,7 +204,7 @@ export function createOrUpdateMission(input: {
task: assignment.task,
rationale: assignment.rationale ?? null,
dependsOn: assignment.dependsOn ?? [],
reviewRequired: assignment.reviewRequired ?? /code|patch|implement|pr|benchmark/i.test(`${assignment.task} ${assignment.rationale ?? ''}`),
reviewRequired: assignment.reviewRequired ?? inferReviewRequired(assignment.task, assignment.rationale),
state: 'queued',
dispatchedAt: null,
completedAt: null,
@@ -297,6 +304,63 @@ export function recordMissionCheckpoint(input: {
return Object.assign(mission, { _completed: completed })
}
/**
 * Mark one assignment of a stored mission as blocked and persist the result.
 *
 * Assignment lookup order: the explicit `assignmentId` if given and found,
 * otherwise the most recent non-terminal assignment for `workerId`, otherwise
 * the most recent assignment for `workerId` of any state.
 *
 * @param input.missionId - Mission to update; a missing id yields null.
 * @param input.assignmentId - Optional explicit assignment to block.
 * @param input.workerId - Worker used for fallback lookup and event text.
 * @param input.reason - Blocker text; blank/missing falls back to a default message.
 * @param input.source - Passed through to the generated checkpoint report.
 * @returns null when the mission id is missing, the mission is unknown or
 *   already cancelled/complete, or no assignment matches. Returns
 *   `changed: false` without writing when the assignment is already
 *   done/cancelled. Otherwise returns the updated mission and assignment,
 *   with `changed` telling whether a new 'blocked' event was appended.
 */
export function recordMissionAssignmentBlocked(input: {
  missionId?: string | null
  assignmentId?: string | null
  workerId: string
  reason?: string | null
  source?: string | null
}): { mission: SwarmMission; assignment: SwarmMissionAssignment; changed: boolean } | null {
  if (!input.missionId) return null

  const store = readStore()
  const mission = store.missions.find((candidate) => candidate.id === input.missionId)
  if (!mission || mission.state === 'cancelled' || mission.state === 'complete') return null

  // Search newest-first so the latest assignment for the worker wins.
  const newestFirst = [...mission.assignments].reverse()
  const explicitMatch = input.assignmentId
    ? mission.assignments.find((candidate) => candidate.id === input.assignmentId)
    : undefined
  const assignment =
    explicitMatch
    ?? newestFirst.find((candidate) => candidate.workerId === input.workerId && !isTerminalAssignment(candidate))
    ?? newestFirst.find((candidate) => candidate.workerId === input.workerId)
  if (!assignment) return null

  // Finished assignments are left untouched and nothing is written.
  if (assignment.state === 'done' || assignment.state === 'cancelled') {
    return { mission, assignment, changed: false }
  }

  const trimmedReason = input.reason?.trim()
  const reason = trimmedReason ? trimmedReason : 'Dispatch failed before a worker checkpoint was recorded.'
  const blockedAt = now()

  // Synthesize the checkpoint a worker would have reported for this blocker.
  const checkpoint: ParsedSwarmCheckpoint = {
    stateLabel: 'BLOCKED',
    runtimeState: 'blocked',
    checkpointStatus: 'blocked',
    filesChanged: 'none',
    commandsRun: 'none',
    result: null,
    blocker: reason,
    nextAction: 'Fix blocker and retry dispatch.',
    raw: `STATE: BLOCKED\nFILES_CHANGED: none\nCOMMANDS_RUN: none\nRESULT: none\nBLOCKER: ${reason}\nNEXT_ACTION: Fix blocker and retry dispatch.`,
  }

  // Must be computed before the assignment is mutated below.
  const wasAlreadyBlocked = assignment.state === 'blocked'
  const sameCheckpoint = assignment.checkpoint?.raw === checkpoint.raw
  const changed = !(wasAlreadyBlocked && sameCheckpoint)

  Object.assign(assignment, { state: 'blocked', completedAt: blockedAt, checkpoint })

  const report = reportFromCheckpoint({
    missionId: mission.id,
    assignmentId: assignment.id,
    workerId: input.workerId,
    checkpoint,
    source: input.source,
  })

  // Repeating the identical blocker does not append a duplicate event.
  if (changed) {
    const blockedEvent = event('blocked', `${input.workerId} blocked: ${reason}`, {
      workerId: input.workerId,
      assignmentId: assignment.id,
      data: report,
    })
    mission.events.push(blockedEvent)
  }

  mission.updatedAt = blockedAt
  mission.state = deriveMissionState(mission.assignments)
  writeStore(store)

  return { mission, assignment, changed }
}
export function appendMissionContinuation(input: {
missionId?: string | null
workerId: string

View File

@@ -14,7 +14,7 @@
* matches, so re-running on a healthy profile is free.
*/
import { copyFileSync, existsSync, lstatSync, mkdirSync, readFileSync, renameSync, symlinkSync, unlinkSync, writeFileSync } from 'node:fs'
import { copyFileSync, existsSync, lstatSync, mkdirSync, readFileSync, readdirSync, renameSync, symlinkSync, unlinkSync, writeFileSync } from 'node:fs'
import { homedir } from 'node:os'
import { join } from 'node:path'
import * as yaml from 'yaml'
@@ -28,6 +28,7 @@ export type ProfileBootstrapResult = {
configCreated: boolean
envLinked: boolean
authLinked: boolean
mcpTokensLinked: number
error?: string
}
@@ -42,6 +43,24 @@ export type SwarmWorkerIdentity = {
capabilities?: Array<string>
}
/**
 * Replace `target` with a symlink pointing at `source`.
 *
 * - Returns false without touching `target` when `source` does not exist.
 * - An existing symlink at `target` (including a dangling one) is removed.
 * - Any other existing entry is preserved by renaming it to
 *   `<target>.profile-local.bak-<timestamp>`.
 *
 * @param source - Path the new symlink should point to.
 * @param target - Path at which the symlink is created.
 * @returns true once the symlink exists; false when `source` is missing or
 *   the existing `target` could not be inspected, removed, or renamed.
 * @throws Whatever `symlinkSync` throws if creating the link itself fails.
 */
function linkSharedFile(source: string, target: string): boolean {
  if (!existsSync(source)) return false

  try {
    // Probe with lstat rather than existsSync: existsSync follows symlinks, so
    // a dangling symlink at `target` would look absent and the symlinkSync
    // call below would then fail with EEXIST.
    const existing = lstatSync(target, { throwIfNoEntry: false })
    if (existing) {
      if (existing.isSymbolicLink()) {
        unlinkSync(target)
      } else {
        // Keep profile-local content instead of destroying it.
        renameSync(target, `${target}.profile-local.bak-${Date.now()}`)
      }
    }
  } catch {
    return false
  }

  symlinkSync(source, target)
  return true
}
/**
* Ensure a worker HERMES_HOME has enough runtime config to boot Hermes.
*
@@ -52,7 +71,7 @@ export type SwarmWorkerIdentity = {
* The config is copied (not symlinked) because per-worker model sync edits it.
*/
export function ensureSwarmProfileConfig(profilePath: string): ProfileBootstrapResult {
const result: ProfileBootstrapResult = { ok: true, configCreated: false, envLinked: false, authLinked: false }
const result: ProfileBootstrapResult = { ok: true, configCreated: false, envLinked: false, authLinked: false, mcpTokensLinked: 0 }
try {
mkdirSync(profilePath, { recursive: true })
@@ -101,6 +120,18 @@ export function ensureSwarmProfileConfig(profilePath: string): ProfileBootstrapR
}
}
const mcpTokensDir = join(profilePath, 'mcp-tokens')
const sourceMcpTokensDir = join(homedir(), '.hermes', 'mcp-tokens')
if (existsSync(sourceMcpTokensDir)) {
mkdirSync(mcpTokensDir, { recursive: true })
for (const name of readdirSync(sourceMcpTokensDir)) {
if (!name.endsWith('.json')) continue
if (linkSharedFile(join(sourceMcpTokensDir, name), join(mcpTokensDir, name))) {
result.mcpTokensLinked += 1
}
}
}
if (!existsSync(configPath)) {
return { ...result, ok: false, error: `config.yaml missing at ${configPath}` }
}

View File

@@ -1,5 +1,5 @@
import { describe, expect, it } from 'vitest'
import { remoteUrlMatches } from './update-system'
import { remoteUrlMatches, updateAvailableFromDivergence } from './update-system'
describe('update-system helpers', () => {
it('matches GitHub URL forms against expected repo aliases', () => {
@@ -19,4 +19,12 @@ describe('update-system helpers', () => {
]),
).toBe(false)
})
it('only reports update availability when the remote side is ahead', () => {
  // Each row: [divergence, headsDiffer, expected updateAvailable].
  const cases: Array<[{ ahead: number; behind: number } | null, boolean, boolean]> = [
    [{ ahead: 2, behind: 0 }, true, false], // local-only commits
    [{ ahead: 0, behind: 3 }, true, true], // remote is ahead
    [{ ahead: 2, behind: 3 }, true, true], // diverged histories
    [{ ahead: 0, behind: 0 }, false, false], // identical
    [null, true, true], // no divergence data: fall back to head comparison
  ]

  for (const [divergence, headsDiffer, expected] of cases) {
    expect(updateAvailableFromDivergence(divergence, headsDiffer)).toBe(expected)
  }
})
})

View File

@@ -221,6 +221,26 @@ function canResetToRemote(repoPath: string, remoteRef: string): boolean {
return Boolean(git(['rev-parse', '--verify', remoteRef], repoPath, 10_000))
}
/**
 * Count how far HEAD and `remoteRef` have diverged in `repoPath`.
 *
 * Runs `git rev-list --left-right --count HEAD...<remoteRef>`, which prints
 * two integers: commits only on the left side (HEAD) followed by commits only
 * on the right side (`remoteRef`).
 *
 * @param repoPath - Repository directory handed to the `git` helper.
 * @param remoteRef - Ref compared against HEAD, e.g. `origin/main`.
 * @returns `{ ahead, behind }`, or null when git produced no output or the
 *   output is not exactly two non-negative integers.
 */
function branchDivergence(repoPath: string, remoteRef: string): { ahead: number; behind: number } | null {
  // NOTE(review): 10_000 is git()'s third argument — presumably a timeout in ms; confirm.
  const raw = git(['rev-list', '--left-right', '--count', `HEAD...${remoteRef}`], repoPath, 10_000)
  if (!raw) return null

  // Require exactly "<digits><whitespace><digits>". Splitting and coercing
  // with Number() is too lenient: Number('') is 0, so leading whitespace or
  // malformed output could pass a finite check as bogus zero counts.
  const counts = /^(\d+)\s+(\d+)$/.exec(raw.trim())
  if (!counts) return null

  const ahead = Number(counts[1])
  const behind = Number(counts[2])
  if (!Number.isFinite(ahead) || !Number.isFinite(behind)) return null
  return { ahead, behind }
}
/**
 * Decide whether an upstream update should be offered.
 *
 * @param divergence - Ahead/behind commit counts relative to the remote, or
 *   null when they could not be determined.
 * @param headsDiffer - Whether the local and remote heads differ; used only
 *   as a fallback when `divergence` is unavailable.
 * @returns true when the remote has commits the local checkout lacks.
 */
export function updateAvailableFromDivergence(
  divergence: { ahead: number; behind: number } | null,
  headsDiffer: boolean,
): boolean {
  // No divergence data: the best available signal is the head comparison.
  if (!divergence) return headsDiffer

  // A checkout that is merely ahead of origin carries hotfixes or unpublished
  // commits; that is not an upstream update. Only commits we are behind by
  // (remote-ahead or diverged histories) make an update available.
  const remoteHasNewCommits = divergence.behind > 0
  return remoteHasNewCommits
}
function syncRepoToRemote(repoPath: string, remoteRef: string): string {
if (canFastForward(repoPath, remoteRef)) {
return execOrThrow('git', ['merge', '--ff-only', remoteRef], {
@@ -338,10 +358,11 @@ export function readWorkspaceUpdateStatus(
const latestHead =
repoMatches && supportedBranch ? remoteHead(gitRepo, 'origin') : null
const dirty = isDirty(gitRepo)
const updateAvailable = Boolean(
supportedBranch && currentHead && latestHead && currentHead !== latestHead,
)
const remoteRef = `origin/${branch || 'main'}`
const divergence = latestHead ? branchDivergence(gitRepo, remoteRef) : null
const updateAvailable = Boolean(
supportedBranch && currentHead && latestHead && updateAvailableFromDivergence(divergence, currentHead !== latestHead),
)
const canSync = updateAvailable ? canResetToRemote(gitRepo, remoteRef) : true
const ff = updateAvailable ? canFastForward(gitRepo, remoteRef) : true
const canUpdate = Boolean(
@@ -443,8 +464,9 @@ export function readAgentUpdateStatus(): ProductUpdateStatus {
const latestHead = repoMatches ? remoteHead(repoPath, 'origin') : null
const remoteRef = repoMatches ? `origin/${branch || 'main'}` : null
const dirty = isDirty(repoPath)
const divergence = remoteRef ? branchDivergence(repoPath, remoteRef) : null
const updateAvailable = Boolean(
currentHead && latestHead && currentHead !== latestHead && remoteRef,
currentHead && latestHead && remoteRef && updateAvailableFromDivergence(divergence, currentHead !== latestHead),
)
const canSync = remoteRef ? canResetToRemote(repoPath, remoteRef) : false
const ff = remoteRef ? canFastForward(repoPath, remoteRef) : false