Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db9970d040 | ||
|
|
3d4fbf8763 | ||
|
|
9387590696 | ||
|
|
74a04f1d8e | ||
|
|
3c258b0f19 | ||
|
|
6303eef3a2 |
@@ -32,6 +32,14 @@ function getAIBridge() {
|
||||
return (window as unknown as { netcatty?: Record<string, (...args: unknown[]) => unknown> }).netcatty;
|
||||
}
|
||||
|
||||
function cleanupAcpSessions(sessionIds: string[]) {
|
||||
const bridge = getAIBridge();
|
||||
if (!bridge?.aiAcpCleanup || sessionIds.length === 0) return;
|
||||
for (const sessionId of sessionIds) {
|
||||
void bridge.aiAcpCleanup(sessionId).catch(() => {});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/** Maximum number of sessions to keep in localStorage. */
|
||||
const MAX_STORED_SESSIONS = 50;
|
||||
@@ -376,6 +384,7 @@ export function useAIState() {
|
||||
}, [defaultAgentId, persistSessions, setActiveSessionId]);
|
||||
|
||||
const deleteSession = useCallback((sessionId: string, scopeKey?: string) => {
|
||||
cleanupAcpSessions([sessionId]);
|
||||
if (persistTimerRef.current) {
|
||||
clearTimeout(persistTimerRef.current);
|
||||
persistTimerRef.current = null;
|
||||
@@ -394,6 +403,10 @@ export function useAIState() {
|
||||
}, [persistSessions]);
|
||||
|
||||
const deleteSessionsByTarget = useCallback((scopeType: 'terminal' | 'workspace', targetId: string) => {
|
||||
const removedSessionIds = sessionsRef.current
|
||||
.filter(s => s.scope.type === scopeType && s.scope.targetId === targetId)
|
||||
.map(s => s.id);
|
||||
cleanupAcpSessions(removedSessionIds);
|
||||
if (persistTimerRef.current) {
|
||||
clearTimeout(persistTimerRef.current);
|
||||
persistTimerRef.current = null;
|
||||
@@ -420,6 +433,18 @@ export function useAIState() {
|
||||
});
|
||||
}, [persistSessions]);
|
||||
|
||||
const updateSessionExternalSessionId = useCallback((sessionId: string, externalSessionId: string | undefined) => {
|
||||
setSessionsRaw(prev => {
|
||||
const next = prev.map(s => (
|
||||
s.id === sessionId
|
||||
? { ...s, externalSessionId, updatedAt: Date.now() }
|
||||
: s
|
||||
));
|
||||
debouncedPersistSessions();
|
||||
return next;
|
||||
});
|
||||
}, [debouncedPersistSessions]);
|
||||
|
||||
// Maximum messages per session to prevent unbounded memory growth
|
||||
const MAX_MESSAGES_PER_SESSION = 500;
|
||||
|
||||
@@ -484,6 +509,10 @@ export function useAIState() {
|
||||
}, [persistSessions]);
|
||||
|
||||
const cleanupOrphanedSessions = useCallback((activeTargetIds: Set<string>) => {
|
||||
const removedSessionIds = sessionsRef.current
|
||||
.filter(s => s.scope.targetId && !activeTargetIds.has(s.scope.targetId))
|
||||
.map(s => s.id);
|
||||
cleanupAcpSessions(removedSessionIds);
|
||||
setSessionsRaw(prev => {
|
||||
const next = prev.filter(s => {
|
||||
// Keep sessions without a targetId (global scope)
|
||||
@@ -572,6 +601,7 @@ export function useAIState() {
|
||||
deleteSession,
|
||||
deleteSessionsByTarget,
|
||||
updateSessionTitle,
|
||||
updateSessionExternalSessionId,
|
||||
addMessageToSession,
|
||||
updateLastMessage,
|
||||
updateMessageById,
|
||||
|
||||
@@ -52,11 +52,13 @@ interface SyncNowOptions {
|
||||
export const useAutoSync = (config: AutoSyncConfig) => {
|
||||
const { t } = useI18n();
|
||||
const sync = useCloudSync();
|
||||
const { onApplyPayload } = config;
|
||||
const syncTimeoutRef = useRef<NodeJS.Timeout | null>(null);
|
||||
const lastSyncedDataRef = useRef<string>('');
|
||||
const hasCheckedRemoteRef = useRef(false);
|
||||
const isInitializedRef = useRef(false);
|
||||
const isSyncRunningRef = useRef(false);
|
||||
const skipNextSyncRef = useRef(false);
|
||||
|
||||
const getSyncSnapshot = useCallback(() => {
|
||||
let effectivePFRules = config.portForwardingRules;
|
||||
@@ -162,6 +164,16 @@ export const useAutoSync = (config: AutoSyncConfig) => {
|
||||
|
||||
const results = await sync.syncNow(payload);
|
||||
|
||||
// Apply merged payloads first (before checking for failures) so local
|
||||
// state gets updated even when some providers failed
|
||||
for (const result of results.values()) {
|
||||
if (result.mergedPayload) {
|
||||
onApplyPayload(result.mergedPayload);
|
||||
skipNextSyncRef.current = true;
|
||||
break; // All providers share the same merged payload
|
||||
}
|
||||
}
|
||||
|
||||
for (const result of results.values()) {
|
||||
if (!result.success) {
|
||||
if (result.conflictDetected) {
|
||||
@@ -184,7 +196,7 @@ export const useAutoSync = (config: AutoSyncConfig) => {
|
||||
} finally {
|
||||
isSyncRunningRef.current = false;
|
||||
}
|
||||
}, [sync, buildPayload, getDataHash, t]);
|
||||
}, [sync, buildPayload, getDataHash, onApplyPayload, t]);
|
||||
|
||||
// Check remote version and pull if newer (on startup)
|
||||
const checkRemoteVersion = useCallback(async () => {
|
||||
@@ -207,18 +219,26 @@ export const useAutoSync = (config: AutoSyncConfig) => {
|
||||
|
||||
try {
|
||||
console.log('[AutoSync] Checking remote version...');
|
||||
// Load base BEFORE downloading (downloadFromProvider overwrites the base)
|
||||
const base = await manager.loadSyncBase(connectedProvider);
|
||||
const remotePayload = await sync.downloadFromProvider(connectedProvider);
|
||||
|
||||
|
||||
if (remotePayload && remotePayload.syncedAt > state.localUpdatedAt) {
|
||||
console.log('[AutoSync] Remote is newer, applying...');
|
||||
config.onApplyPayload(remotePayload);
|
||||
const { mergeSyncPayloads } = await import('../../domain/syncMerge');
|
||||
const localPayload = buildPayload();
|
||||
const mergeResult = mergeSyncPayloads(base, localPayload, remotePayload);
|
||||
|
||||
console.log('[AutoSync] Remote is newer, merged:', mergeResult.summary);
|
||||
config.onApplyPayload(mergeResult.payload);
|
||||
// Don't save base or skip auto-sync — let the data-change effect
|
||||
// naturally trigger an upload of the merged payload (which will
|
||||
// go through syncAllProviders and save base on success).
|
||||
toast.success(t('sync.autoSync.syncedMessage'), t('sync.autoSync.syncedTitle'));
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[AutoSync] Failed to check remote version:', error);
|
||||
// Don't show error toast for initial check - it's not critical
|
||||
}
|
||||
}, [sync, config, t]);
|
||||
}, [sync, config, buildPayload, t]);
|
||||
|
||||
// Debounced auto-sync when data changes
|
||||
useEffect(() => {
|
||||
@@ -235,7 +255,15 @@ export const useAutoSync = (config: AutoSyncConfig) => {
|
||||
}
|
||||
|
||||
const currentHash = getDataHash();
|
||||
|
||||
|
||||
// After a merge, onApplyPayload changes local state which triggers
|
||||
// this effect. Skip that cycle and just update the hash baseline.
|
||||
if (skipNextSyncRef.current) {
|
||||
skipNextSyncRef.current = false;
|
||||
lastSyncedDataRef.current = currentHash;
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip if data hasn't changed
|
||||
if (currentHash === lastSyncedDataRef.current) {
|
||||
return;
|
||||
|
||||
@@ -90,7 +90,7 @@ export const useTerminalBackend = () => {
|
||||
return bridge.onSessionData(sessionId, cb);
|
||||
}, []);
|
||||
|
||||
const onSessionExit = useCallback((sessionId: string, cb: (evt: { exitCode?: number; signal?: number }) => void) => {
|
||||
const onSessionExit = useCallback((sessionId: string, cb: (evt: { exitCode?: number; signal?: number; error?: string; reason?: "exited" | "error" | "timeout" | "closed" }) => void) => {
|
||||
const bridge = netcattyBridge.get();
|
||||
if (!bridge?.onSessionExit) throw new Error("onSessionExit unavailable");
|
||||
return bridge.onSessionExit(sessionId, cb);
|
||||
|
||||
@@ -42,6 +42,7 @@ import ConversationExport from './ai/ConversationExport';
|
||||
import { useAIChatStreaming, getNetcattyBridge } from './ai/hooks/useAIChatStreaming';
|
||||
import { useToolApproval } from './ai/hooks/useToolApproval';
|
||||
import { useConversationExport } from './ai/hooks/useConversationExport';
|
||||
import type { ExecutorContext } from '../infrastructure/ai/cattyAgent/executor';
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
// Props
|
||||
@@ -55,6 +56,7 @@ interface AIChatSidePanelProps {
|
||||
createSession: (scope: AISessionScope, agentId?: string) => AISession;
|
||||
deleteSession: (sessionId: string, scopeKey?: string) => void;
|
||||
updateSessionTitle: (sessionId: string, title: string) => void;
|
||||
updateSessionExternalSessionId: (sessionId: string, externalSessionId: string | undefined) => void;
|
||||
addMessageToSession: (sessionId: string, message: ChatMessage) => void;
|
||||
updateLastMessage: (
|
||||
sessionId: string,
|
||||
@@ -115,6 +117,35 @@ function generateId(): string {
|
||||
return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
}
|
||||
|
||||
function buildAcpHistoryMessages(messages: ChatMessage[]): Array<{ role: 'user' | 'assistant'; content: string }> {
|
||||
return messages.flatMap((message) => {
|
||||
if (message.role === 'system') return [];
|
||||
|
||||
if (message.role === 'user') {
|
||||
return message.content ? [{ role: 'user' as const, content: message.content }] : [];
|
||||
}
|
||||
|
||||
if (message.role === 'assistant') {
|
||||
const parts: string[] = [];
|
||||
if (message.content) parts.push(message.content);
|
||||
if (message.toolCalls?.length) {
|
||||
parts.push(...message.toolCalls.map((tc) => `Tool call: ${tc.name}(${JSON.stringify(tc.arguments ?? {})})`));
|
||||
}
|
||||
if (!parts.length) return [];
|
||||
return [{ role: 'assistant' as const, content: parts.join('\n\n') }];
|
||||
}
|
||||
|
||||
if (message.role === 'tool' && message.toolResults?.length) {
|
||||
return message.toolResults.map((tr) => ({
|
||||
role: 'assistant' as const,
|
||||
content: `Tool result:\n${tr.content}`,
|
||||
}));
|
||||
}
|
||||
|
||||
return [];
|
||||
});
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
// Component
|
||||
// -------------------------------------------------------------------
|
||||
@@ -126,6 +157,7 @@ const AIChatSidePanelInner: React.FC<AIChatSidePanelProps> = ({
|
||||
createSession,
|
||||
deleteSession,
|
||||
updateSessionTitle,
|
||||
updateSessionExternalSessionId,
|
||||
addMessageToSession,
|
||||
updateLastMessage,
|
||||
updateMessageById,
|
||||
@@ -166,6 +198,14 @@ const AIChatSidePanelInner: React.FC<AIChatSidePanelProps> = ({
|
||||
|
||||
const { images, addImages, removeImage, clearImages } = useImageUpload();
|
||||
const { openSettingsWindow } = useWindowControls();
|
||||
const terminalSessionsRef = useRef(terminalSessions);
|
||||
terminalSessionsRef.current = terminalSessions;
|
||||
const scopeTypeRef = useRef(scopeType);
|
||||
scopeTypeRef.current = scopeType;
|
||||
const scopeTargetIdRef = useRef(scopeTargetId);
|
||||
scopeTargetIdRef.current = scopeTargetId;
|
||||
const scopeLabelRef = useRef(scopeLabel);
|
||||
scopeLabelRef.current = scopeLabel;
|
||||
|
||||
// ── Streaming hook ──
|
||||
const {
|
||||
@@ -242,16 +282,13 @@ const AIChatSidePanelInner: React.FC<AIChatSidePanelProps> = ({
|
||||
}
|
||||
}, [webSearchConfig?.apiHost, webSearchConfig?.apiKey, webSearchConfig?.enabled]);
|
||||
|
||||
// Abort all active streams and clean up on unmount
|
||||
// Preserve active streams across tab switches. The panel is conditionally
|
||||
// mounted per tab, so unmounting here should not cancel in-flight work.
|
||||
useEffect(() => {
|
||||
const controllers = abortControllersRef.current;
|
||||
return () => {
|
||||
controllers.forEach(c => c.abort());
|
||||
controllers.clear();
|
||||
// Clear pending approval (clears timeout too via setPendingApproval)
|
||||
setPendingApproval(null);
|
||||
// no-op: stream lifecycle is managed by explicit stop/delete actions
|
||||
};
|
||||
}, [abortControllersRef, setPendingApproval]);
|
||||
}, []);
|
||||
|
||||
// Agent discovery
|
||||
const {
|
||||
@@ -379,6 +416,12 @@ const AIChatSidePanelInner: React.FC<AIChatSidePanelProps> = ({
|
||||
}
|
||||
}, [updateSessionTitle]);
|
||||
|
||||
const getExecutorContext = useCallback((): ExecutorContext => ({
|
||||
sessions: terminalSessionsRef.current,
|
||||
workspaceId: scopeTypeRef.current === 'workspace' ? scopeTargetIdRef.current : undefined,
|
||||
workspaceName: scopeTypeRef.current === 'workspace' ? scopeLabelRef.current : undefined,
|
||||
}), []);
|
||||
|
||||
/** Ensure a session exists for the current scope and return its ID. */
|
||||
const ensureSession = useCallback((): string => {
|
||||
if (activeSessionId) return activeSessionId;
|
||||
@@ -445,6 +488,9 @@ const AIChatSidePanelInner: React.FC<AIChatSidePanelProps> = ({
|
||||
}
|
||||
try {
|
||||
await sendToExternalAgent(sessionId, trimmed, agentConfig, abortController, attachedImages, {
|
||||
existingSessionId: currentSession?.externalSessionId,
|
||||
updateExternalSessionId: updateSessionExternalSessionId,
|
||||
historyMessages: buildAcpHistoryMessages(currentSession?.messages ?? []),
|
||||
terminalSessions,
|
||||
providers,
|
||||
selectedAgentModel,
|
||||
@@ -468,6 +514,7 @@ const AIChatSidePanelInner: React.FC<AIChatSidePanelProps> = ({
|
||||
commandBlocklist,
|
||||
terminalSessions,
|
||||
webSearchConfig,
|
||||
getExecutorContext,
|
||||
setPendingApproval,
|
||||
autoTitleSession,
|
||||
});
|
||||
@@ -478,8 +525,8 @@ const AIChatSidePanelInner: React.FC<AIChatSidePanelProps> = ({
|
||||
ensureSession, addMessageToSession, updateMessageById, updateLastMessage,
|
||||
setStreamingForScope, setInputValue, clearImages,
|
||||
sendToExternalAgent, sendToCattyAgent, reportStreamError, autoTitleSession, t,
|
||||
abortControllersRef, terminalSessions, providers, selectedAgentModel,
|
||||
scopeType, scopeTargetId, scopeLabel, globalPermissionMode, commandBlocklist, webSearchConfig, setPendingApproval,
|
||||
abortControllersRef, terminalSessions, providers, selectedAgentModel, updateSessionExternalSessionId,
|
||||
scopeType, scopeTargetId, scopeLabel, globalPermissionMode, commandBlocklist, webSearchConfig, getExecutorContext, setPendingApproval,
|
||||
]);
|
||||
|
||||
const handleStop = useCallback(() => {
|
||||
@@ -492,7 +539,7 @@ const AIChatSidePanelInner: React.FC<AIChatSidePanelProps> = ({
|
||||
updateLastMessage(activeSessionId, msg => ({
|
||||
...msg,
|
||||
statusText: '',
|
||||
executionStatus: msg.executionStatus === 'running' ? 'completed' : msg.executionStatus,
|
||||
executionStatus: msg.executionStatus === 'running' ? 'cancelled' : msg.executionStatus,
|
||||
}));
|
||||
// Also clear any pending approval (clears timeout too via setPendingApproval)
|
||||
if (pendingApprovalContextRef.current?.sessionId === activeSessionId) {
|
||||
@@ -516,8 +563,6 @@ const AIChatSidePanelInner: React.FC<AIChatSidePanelProps> = ({
|
||||
const handleDeleteSession = useCallback(
|
||||
(e: React.MouseEvent, sessionId: string) => {
|
||||
e.stopPropagation();
|
||||
const bridge = getNetcattyBridge();
|
||||
void bridge?.aiAcpCleanup?.(sessionId).catch(() => {});
|
||||
deleteSession(sessionId, scopeKey);
|
||||
// Active session clearing is handled by deleteSession with scopeKey
|
||||
},
|
||||
|
||||
@@ -978,6 +978,10 @@ export const SyncDashboard: React.FC<SyncDashboardProps> = ({
|
||||
const result = await sync.syncToProvider(provider, payload);
|
||||
|
||||
if (result.success) {
|
||||
// Apply merged data if a three-way merge happened
|
||||
if (result.mergedPayload && onApplyPayload) {
|
||||
onApplyPayload(result.mergedPayload);
|
||||
}
|
||||
toast.success(t('cloudSync.sync.success', { provider }));
|
||||
} else if (result.conflictDetected) {
|
||||
// Conflict modal will show automatically
|
||||
|
||||
@@ -124,7 +124,7 @@ interface TerminalProps {
|
||||
keyBindings?: KeyBinding[];
|
||||
onHotkeyAction?: (action: string, event: KeyboardEvent) => void;
|
||||
onStatusChange?: (sessionId: string, status: TerminalSession["status"]) => void;
|
||||
onSessionExit?: (sessionId: string) => void;
|
||||
onSessionExit?: (sessionId: string, evt: { exitCode?: number; signal?: number; error?: string; reason?: "exited" | "error" | "timeout" | "closed" }) => void;
|
||||
onTerminalDataCapture?: (sessionId: string, data: string) => void;
|
||||
onOsDetected?: (hostId: string, distro: string) => void;
|
||||
onCloseSession?: (sessionId: string) => void;
|
||||
@@ -247,8 +247,9 @@ const TerminalComponent: React.FC<TerminalProps> = ({
|
||||
// Host-level toggle: undefined = inherit global, true/false = explicit override
|
||||
const hostEnabled = host?.keywordHighlightEnabled;
|
||||
|
||||
// If host explicitly disabled highlighting, disable everything for this terminal
|
||||
const effectiveGlobalEnabled = hostEnabled === false ? false : globalEnabled;
|
||||
// Global and host-level highlights are independent:
|
||||
// global toggle controls global rules, host toggle controls host-specific rules
|
||||
const effectiveGlobalEnabled = globalEnabled;
|
||||
const effectiveHostEnabled = hostEnabled ?? false;
|
||||
|
||||
const mergedRules = [
|
||||
@@ -542,7 +543,7 @@ const TerminalComponent: React.FC<TerminalProps> = ({
|
||||
const hostRules = host?.keywordHighlightRules ?? [];
|
||||
const globalEnabled = terminalSettingsRef.current?.keywordHighlightEnabled ?? false;
|
||||
const hostEnabled = host?.keywordHighlightEnabled;
|
||||
const effectiveGlobalEnabled = hostEnabled === false ? false : globalEnabled;
|
||||
const effectiveGlobalEnabled = globalEnabled;
|
||||
const effectiveHostEnabled = hostEnabled ?? false;
|
||||
const mergedRules = [
|
||||
...(effectiveGlobalEnabled ? globalRules : []),
|
||||
|
||||
@@ -171,9 +171,16 @@ const TerminalLayerInner: React.FC<TerminalLayerProps> = ({
|
||||
onUpdateSessionStatus(sessionId, status);
|
||||
}, [onUpdateSessionStatus]);
|
||||
|
||||
const handleSessionExit = useCallback((sessionId: string) => {
|
||||
onUpdateSessionStatus(sessionId, 'disconnected');
|
||||
}, [onUpdateSessionStatus]);
|
||||
const handleSessionExit = useCallback((sessionId: string, evt: { exitCode?: number; signal?: number; error?: string; reason?: "exited" | "error" | "timeout" | "closed" }) => {
|
||||
// Auto-close the tab/session when the user actively exited (e.g. typed `exit`)
|
||||
// reason === "exited" means the remote process/shell exited normally (stream-level close),
|
||||
// as opposed to network errors, timeouts, or connection-level drops
|
||||
if (evt.reason === "exited") {
|
||||
onCloseSession(sessionId);
|
||||
} else {
|
||||
onUpdateSessionStatus(sessionId, 'disconnected');
|
||||
}
|
||||
}, [onUpdateSessionStatus, onCloseSession]);
|
||||
|
||||
const handleOsDetected = useCallback((hostId: string, distro: string) => {
|
||||
onUpdateHostDistro(hostId, distro);
|
||||
@@ -929,15 +936,7 @@ const TerminalLayerInner: React.FC<TerminalLayerProps> = ({
|
||||
const aiState = useAIState();
|
||||
const { cleanupOrphanedSessions } = aiState;
|
||||
|
||||
// On mount: clean up orphaned AI sessions after a short delay
|
||||
// (allows sessions/workspaces to fully initialize)
|
||||
const hasCleanedUpRef = useRef(false);
|
||||
useEffect(() => {
|
||||
if (hasCleanedUpRef.current) return;
|
||||
// Guard: wait until both sessions AND workspaces have loaded to avoid
|
||||
// racing with partial state (e.g. sessions loaded but workspaces not yet).
|
||||
if (sessions.length === 0 || workspaces.length === 0) return;
|
||||
hasCleanedUpRef.current = true;
|
||||
const activeIds = new Set<string>();
|
||||
for (const s of sessions) activeIds.add(s.id);
|
||||
for (const w of workspaces) activeIds.add(w.id);
|
||||
@@ -1333,6 +1332,7 @@ const TerminalLayerInner: React.FC<TerminalLayerProps> = ({
|
||||
createSession={aiState.createSession}
|
||||
deleteSession={aiState.deleteSession}
|
||||
updateSessionTitle={aiState.updateSessionTitle}
|
||||
updateSessionExternalSessionId={aiState.updateSessionExternalSessionId}
|
||||
addMessageToSession={aiState.addMessageToSession}
|
||||
updateLastMessage={aiState.updateLastMessage}
|
||||
updateMessageById={aiState.updateMessageById}
|
||||
@@ -1358,7 +1358,7 @@ const TerminalLayerInner: React.FC<TerminalLayerProps> = ({
|
||||
}).filter((id): id is string => !!id)
|
||||
: activeSession?.hostId ? [activeSession.hostId] : []
|
||||
}
|
||||
scopeLabel={activeWorkspace?.name ?? activeSession?.label ?? ''}
|
||||
scopeLabel={activeWorkspace?.title ?? activeSession?.hostLabel ?? ''}
|
||||
terminalSessions={aiTerminalSessions}
|
||||
/>
|
||||
</div>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { cn } from '../../lib/utils';
|
||||
import { ChevronDown, ChevronRight, CheckCircle2, Loader2, XCircle } from 'lucide-react';
|
||||
import { ChevronDown, ChevronRight, CheckCircle2, Loader2, XCircle, Slash } from 'lucide-react';
|
||||
import type { HTMLAttributes } from 'react';
|
||||
import { useState } from 'react';
|
||||
|
||||
@@ -9,13 +9,16 @@ export interface ToolCallProps extends HTMLAttributes<HTMLDivElement> {
|
||||
result?: unknown;
|
||||
isError?: boolean;
|
||||
isLoading?: boolean;
|
||||
isInterrupted?: boolean;
|
||||
}
|
||||
|
||||
export const ToolCall = ({ name, args, result, isError, isLoading, className, ...props }: ToolCallProps) => {
|
||||
export const ToolCall = ({ name, args, result, isError, isLoading, isInterrupted, className, ...props }: ToolCallProps) => {
|
||||
const [expanded, setExpanded] = useState(false);
|
||||
|
||||
const statusIcon = isLoading ? (
|
||||
<Loader2 size={12} className="animate-spin text-blue-400/70" />
|
||||
) : isInterrupted ? (
|
||||
<Slash size={12} className="text-muted-foreground/55" />
|
||||
) : isError ? (
|
||||
<XCircle size={12} className="text-red-400/70" />
|
||||
) : result !== undefined ? (
|
||||
@@ -58,6 +61,14 @@ export const ToolCall = ({ name, args, result, isError, isLoading, className, ..
|
||||
</pre>
|
||||
</div>
|
||||
)}
|
||||
{isInterrupted && result === undefined && (
|
||||
<div className="px-3 py-2 border-t border-border/20">
|
||||
<div className="text-[10px] font-medium uppercase tracking-wider text-muted-foreground/30 mb-1">Status</div>
|
||||
<div className="text-[11px] text-muted-foreground/50">
|
||||
Interrupted
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
|
||||
@@ -30,6 +30,11 @@ interface ChatMessageListProps {
|
||||
const ChatMessageList: React.FC<ChatMessageListProps> = ({ messages, isStreaming, onApprove, onReject }) => {
|
||||
const { t } = useI18n();
|
||||
const visibleMessages = messages.filter(m => m.role !== 'system');
|
||||
const resolvedToolCallIds = new Set(
|
||||
visibleMessages
|
||||
.filter((m) => m.role === 'tool')
|
||||
.flatMap((m) => m.toolResults?.map((tr) => tr.toolCallId) ?? []),
|
||||
);
|
||||
|
||||
if (visibleMessages.length === 0 && !isStreaming) {
|
||||
return (
|
||||
@@ -107,6 +112,7 @@ const ChatMessageList: React.FC<ChatMessageListProps> = ({ messages, isStreaming
|
||||
name={tc.name}
|
||||
args={tc.arguments}
|
||||
isLoading={isThisStreaming && message.executionStatus === 'running'}
|
||||
isInterrupted={message.executionStatus === 'cancelled' && !resolvedToolCallIds.has(tc.id)}
|
||||
/>
|
||||
))}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
* - Error reporting
|
||||
*/
|
||||
|
||||
import React, { useCallback, useRef, useState } from 'react';
|
||||
import React, { useCallback, useEffect, useRef, useState } from 'react';
|
||||
import { streamText, stepCountIs, type ModelMessage } from 'ai';
|
||||
import type {
|
||||
AIPermissionMode,
|
||||
@@ -24,7 +24,7 @@ import { isWebSearchReady } from '../../../infrastructure/ai/types';
|
||||
import { buildSystemPrompt } from '../../../infrastructure/ai/cattyAgent/systemPrompt';
|
||||
import { createModelFromConfig } from '../../../infrastructure/ai/sdk/providers';
|
||||
import { createCattyTools } from '../../../infrastructure/ai/sdk/tools';
|
||||
import type { NetcattyBridge } from '../../../infrastructure/ai/cattyAgent/executor';
|
||||
import type { NetcattyBridge, ExecutorContext } from '../../../infrastructure/ai/cattyAgent/executor';
|
||||
import { runExternalAgentTurn } from '../../../infrastructure/ai/externalAgentAdapter';
|
||||
import { runAcpAgentTurn } from '../../../infrastructure/ai/acpAgentAdapter';
|
||||
import { classifyError, sanitizeErrorMessage } from '../../../infrastructure/ai/errorClassifier';
|
||||
@@ -141,6 +141,20 @@ function generateId(): string {
|
||||
return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
}
|
||||
|
||||
const sharedStreamingSessionIds = new Set<string>();
|
||||
const sharedAbortControllers = new Map<string, AbortController>();
|
||||
const streamingSubscribers = new Set<() => void>();
|
||||
|
||||
function emitStreamingStoreChange(): void {
|
||||
streamingSubscribers.forEach(listener => {
|
||||
try {
|
||||
listener();
|
||||
} catch (err) {
|
||||
console.error('[AIChatStreaming] Failed to notify streaming subscriber:', err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
// Hook parameters
|
||||
// -------------------------------------------------------------------
|
||||
@@ -207,12 +221,16 @@ export interface SendToCattyContext {
|
||||
commandBlocklist?: string[];
|
||||
terminalSessions: TerminalSessionInfo[];
|
||||
webSearchConfig?: WebSearchConfig | null;
|
||||
getExecutorContext?: () => ExecutorContext;
|
||||
setPendingApproval: (ctx: PendingApprovalContext | null) => void;
|
||||
autoTitleSession: (sessionId: string, text: string) => void;
|
||||
}
|
||||
|
||||
/** Context values needed by sendToExternalAgent that change frequently. */
|
||||
export interface SendToExternalContext {
|
||||
existingSessionId?: string;
|
||||
updateExternalSessionId?: (sessionId: string, externalSessionId: string | undefined) => void;
|
||||
historyMessages?: Array<{ role: 'user' | 'assistant'; content: string }>;
|
||||
terminalSessions: TerminalSessionInfo[];
|
||||
providers: ProviderConfig[];
|
||||
selectedAgentModel?: string;
|
||||
@@ -229,17 +247,34 @@ export function useAIChatStreaming({
|
||||
updateMessageById,
|
||||
}: UseAIChatStreamingParams): UseAIChatStreamingReturn {
|
||||
// Per-session streaming state (keyed by sessionId)
|
||||
const [streamingSessionIds, setStreamingSessions] = useState<Set<string>>(new Set());
|
||||
const [streamingSessionIds, setStreamingSessions] = useState<Set<string>>(
|
||||
() => new Set(sharedStreamingSessionIds),
|
||||
);
|
||||
useEffect(() => {
|
||||
const syncFromStore = () => {
|
||||
setStreamingSessions(new Set(sharedStreamingSessionIds));
|
||||
};
|
||||
streamingSubscribers.add(syncFromStore);
|
||||
syncFromStore();
|
||||
return () => {
|
||||
streamingSubscribers.delete(syncFromStore);
|
||||
};
|
||||
}, []);
|
||||
|
||||
const setStreamingForScope = useCallback((key: string, val: boolean) => {
|
||||
setStreamingSessions(prev => {
|
||||
const next = new Set(prev);
|
||||
if (val) next.add(key); else next.delete(key);
|
||||
return next;
|
||||
});
|
||||
const hadKey = sharedStreamingSessionIds.has(key);
|
||||
if (val) {
|
||||
sharedStreamingSessionIds.add(key);
|
||||
} else {
|
||||
sharedStreamingSessionIds.delete(key);
|
||||
}
|
||||
if (hadKey !== val) {
|
||||
emitStreamingStoreChange();
|
||||
}
|
||||
}, []);
|
||||
|
||||
// Per-scope abort controllers
|
||||
const abortControllersRef = useRef<Map<string, AbortController>>(new Map());
|
||||
const abortControllersRef = useRef<Map<string, AbortController>>(sharedAbortControllers);
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
// reportStreamError
|
||||
@@ -581,6 +616,9 @@ export function useAIChatStreaming({
|
||||
maybeCreateAssistantMsg();
|
||||
updateLastMessage(sessionId, msg => ({ ...msg, statusText: message }));
|
||||
},
|
||||
onSessionId: (externalSessionId: string) => {
|
||||
context.updateExternalSessionId?.(sessionId, externalSessionId);
|
||||
},
|
||||
onError: (error: string) => {
|
||||
reportStreamError(sessionId, abortController.signal, error);
|
||||
setStreamingForScope(sessionId, false);
|
||||
@@ -590,6 +628,8 @@ export function useAIChatStreaming({
|
||||
abortController.signal,
|
||||
agentProviderId,
|
||||
context.selectedAgentModel,
|
||||
context.existingSessionId,
|
||||
context.historyMessages,
|
||||
attachedImages.length > 0 ? attachedImages : undefined,
|
||||
);
|
||||
} else {
|
||||
@@ -629,11 +669,18 @@ export function useAIChatStreaming({
|
||||
context: SendToCattyContext,
|
||||
) => {
|
||||
const bridge = getNetcattyBridge();
|
||||
const tools = createCattyTools(bridge, {
|
||||
const toolContext = context.getExecutorContext ?? (() => ({
|
||||
sessions: context.terminalSessions,
|
||||
workspaceId: context.scopeTargetId,
|
||||
workspaceName: context.scopeLabel,
|
||||
}, context.commandBlocklist, context.globalPermissionMode, context.webSearchConfig ?? undefined);
|
||||
workspaceId: context.scopeType === 'workspace' ? context.scopeTargetId : undefined,
|
||||
workspaceName: context.scopeType === 'workspace' ? context.scopeLabel : undefined,
|
||||
}));
|
||||
const tools = createCattyTools(
|
||||
bridge,
|
||||
toolContext,
|
||||
context.commandBlocklist,
|
||||
context.globalPermissionMode,
|
||||
context.webSearchConfig ?? undefined,
|
||||
);
|
||||
|
||||
const systemPrompt = buildSystemPrompt({
|
||||
scopeType: context.scopeType, scopeLabel: context.scopeLabel,
|
||||
|
||||
@@ -31,6 +31,9 @@ function generateId(): string {
|
||||
return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
}
|
||||
|
||||
let sharedPendingApprovalContext: PendingApprovalContext | null = null;
|
||||
let sharedPendingApprovalTimeout: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
// Hook parameters
|
||||
// -------------------------------------------------------------------
|
||||
@@ -95,23 +98,23 @@ export function useToolApproval({
|
||||
t,
|
||||
}: UseToolApprovalParams): UseToolApprovalReturn {
|
||||
// Pending approval context — stores SDK state needed to resume after user approves/rejects
|
||||
const pendingApprovalContextRef = useRef<PendingApprovalContext | null>(null);
|
||||
|
||||
// Timeout ID for auto-clearing stale pending approval (Issue #14)
|
||||
const pendingApprovalTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
|
||||
const pendingApprovalContextRef = useRef<PendingApprovalContext | null>(sharedPendingApprovalContext);
|
||||
pendingApprovalContextRef.current = sharedPendingApprovalContext;
|
||||
|
||||
/** Set pending approval context with a 5-minute auto-clear timeout. */
|
||||
const setPendingApproval = useCallback((ctx: PendingApprovalContext | null) => {
|
||||
// Clear any existing timeout
|
||||
if (pendingApprovalTimeoutRef.current) {
|
||||
clearTimeout(pendingApprovalTimeoutRef.current);
|
||||
pendingApprovalTimeoutRef.current = null;
|
||||
if (sharedPendingApprovalTimeout) {
|
||||
clearTimeout(sharedPendingApprovalTimeout);
|
||||
sharedPendingApprovalTimeout = null;
|
||||
}
|
||||
sharedPendingApprovalContext = ctx;
|
||||
pendingApprovalContextRef.current = ctx;
|
||||
if (ctx) {
|
||||
pendingApprovalTimeoutRef.current = setTimeout(() => {
|
||||
sharedPendingApprovalTimeout = setTimeout(() => {
|
||||
// Auto-clear after 5 minutes if user never responds
|
||||
if (pendingApprovalContextRef.current?.sessionId === ctx.sessionId) {
|
||||
if (sharedPendingApprovalContext?.sessionId === ctx.sessionId) {
|
||||
sharedPendingApprovalContext = null;
|
||||
pendingApprovalContextRef.current = null;
|
||||
setStreamingForScope(ctx.sessionId, false);
|
||||
abortControllersRef.current.get(ctx.sessionId)?.abort();
|
||||
@@ -129,7 +132,7 @@ export function useToolApproval({
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
pendingApprovalTimeoutRef.current = null;
|
||||
sharedPendingApprovalTimeout = null;
|
||||
}, 5 * 60 * 1000); // 5 minutes
|
||||
}
|
||||
}, [setStreamingForScope, abortControllersRef, updateLastMessage, addMessageToSession, t]);
|
||||
@@ -219,8 +222,8 @@ export function useToolApproval({
|
||||
const bridge = getNetcattyBridge();
|
||||
const freshTools = createCattyTools(bridge, {
|
||||
sessions: approvalContext.terminalSessions,
|
||||
workspaceId: approvalContext.scopeTargetId,
|
||||
workspaceName: approvalContext.scopeLabel,
|
||||
workspaceId: approvalContext.scopeType === 'workspace' ? approvalContext.scopeTargetId : undefined,
|
||||
workspaceName: approvalContext.scopeType === 'workspace' ? approvalContext.scopeLabel : undefined,
|
||||
}, approvalContext.commandBlocklist, approvalContext.globalPermissionMode, approvalContext.webSearchConfig ?? undefined);
|
||||
const freshSystemPrompt = buildSystemPrompt({
|
||||
scopeType: approvalContext.scopeType, scopeLabel: approvalContext.scopeLabel,
|
||||
|
||||
@@ -41,7 +41,7 @@ type TerminalBackendApi = {
|
||||
onSessionData: (sessionId: string, cb: (data: string) => void) => () => void;
|
||||
onSessionExit: (
|
||||
sessionId: string,
|
||||
cb: (evt: { exitCode?: number; signal?: number }) => void,
|
||||
cb: (evt: { exitCode?: number; signal?: number; error?: string; reason?: "exited" | "error" | "timeout" | "closed" }) => void,
|
||||
) => () => void;
|
||||
onChainProgress: (
|
||||
cb: (hop: number, total: number, label: string, status: string) => void,
|
||||
@@ -100,7 +100,7 @@ export type TerminalSessionStartersContext = {
|
||||
t?: (key: string) => string;
|
||||
|
||||
onSessionAttached?: (sessionId: string) => void;
|
||||
onSessionExit?: (sessionId: string) => void;
|
||||
onSessionExit?: (sessionId: string, evt: { exitCode?: number; signal?: number; error?: string; reason?: "exited" | "error" | "timeout" | "closed" }) => void;
|
||||
onTerminalDataCapture?: (sessionId: string, data: string) => void;
|
||||
onOsDetected?: (hostId: string, distro: string) => void;
|
||||
onCommandExecuted?: (
|
||||
@@ -213,7 +213,7 @@ const attachSessionToTerminal = (
|
||||
}
|
||||
}
|
||||
|
||||
ctx.onSessionExit?.(ctx.sessionId);
|
||||
ctx.onSessionExit?.(ctx.sessionId, evt);
|
||||
});
|
||||
};
|
||||
|
||||
@@ -754,7 +754,7 @@ export const createTerminalSessionStarters = (ctx: TerminalSessionStartersContex
|
||||
}
|
||||
}
|
||||
|
||||
ctx.onSessionExit?.(ctx.sessionId);
|
||||
ctx.onSessionExit?.(ctx.sessionId, evt);
|
||||
});
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
|
||||
@@ -31,9 +31,10 @@ export type SyncState =
|
||||
/**
|
||||
* Conflict Resolution Strategy
|
||||
*/
|
||||
export type ConflictResolution =
|
||||
| 'USE_REMOTE' // Download cloud data, overwrite local
|
||||
| 'USE_LOCAL'; // Upload local data, overwrite cloud
|
||||
export type ConflictResolution =
|
||||
| 'USE_REMOTE' // Download cloud data, overwrite local
|
||||
| 'USE_LOCAL' // Upload local data, overwrite cloud
|
||||
| 'AUTO_MERGED'; // Three-way merge was applied automatically
|
||||
|
||||
// ============================================================================
|
||||
// Cloud Provider Types
|
||||
@@ -275,10 +276,12 @@ export interface UnlockedMasterKey {
|
||||
export interface SyncResult {
|
||||
success: boolean;
|
||||
provider: CloudProvider;
|
||||
action: 'upload' | 'download' | 'none';
|
||||
action: 'upload' | 'download' | 'merge' | 'none';
|
||||
version?: number;
|
||||
error?: string;
|
||||
conflictDetected?: boolean;
|
||||
/** Present when action === 'merge'; caller should apply this to update local state */
|
||||
mergedPayload?: import('./sync').SyncPayload;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -312,7 +315,7 @@ export interface SyncHistoryEntry {
|
||||
id: string;
|
||||
timestamp: number;
|
||||
provider: CloudProvider;
|
||||
action: 'upload' | 'download' | 'conflict_resolved';
|
||||
action: 'upload' | 'download' | 'merge' | 'conflict_resolved';
|
||||
success: boolean;
|
||||
localVersion: number;
|
||||
remoteVersion?: number;
|
||||
@@ -405,6 +408,7 @@ export const SYNC_STORAGE_KEYS = {
|
||||
PROVIDER_S3: 'netcatty_provider_s3_v1',
|
||||
PROVIDER_SMB: 'netcatty_provider_smb_v1',
|
||||
LOCAL_SYNC_META: 'netcatty_local_sync_meta_v1',
|
||||
SYNC_BASE_PAYLOAD: 'netcatty_sync_base_payload_v1',
|
||||
} as const;
|
||||
|
||||
// ============================================================================
|
||||
|
||||
432
domain/syncMerge.ts
Normal file
432
domain/syncMerge.ts
Normal file
@@ -0,0 +1,432 @@
|
||||
/**
|
||||
* Three-Way Merge for Cloud Sync Payloads
|
||||
*
|
||||
* Implements a Git-style three-way merge using a stored "base" snapshot
|
||||
* (the last successfully synced payload) to detect per-entity changes
|
||||
* on both the local and remote sides.
|
||||
*
|
||||
* Algorithm:
|
||||
* For each entity (identified by `id`):
|
||||
* - Only in local → local addition → keep
|
||||
* - Only in remote → remote addition → keep
|
||||
* - In base, removed locally → local deletion → remove (unless remote modified)
|
||||
* - In base, removed remotely → remote deletion → remove (unless local modified)
|
||||
* - Modified only locally → keep local version
|
||||
* - Modified only remotely → keep remote version
|
||||
* - Modified on both sides → prefer local (conflict logged)
|
||||
*
|
||||
* When no base is available (first sync), falls back to a set-union
|
||||
* merge by entity ID, preferring local for duplicates.
|
||||
*/
|
||||
|
||||
import type { SyncPayload } from './sync';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Public types
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export interface MergeSummary {
|
||||
added: { local: number; remote: number };
|
||||
deleted: { local: number; remote: number };
|
||||
modified: { local: number; remote: number; conflicts: number };
|
||||
}
|
||||
|
||||
export interface MergeResult {
|
||||
payload: SyncPayload;
|
||||
/** True when both sides modified the same entity (resolved by preferring local) */
|
||||
hadConflicts: boolean;
|
||||
summary: MergeSummary;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Deterministic JSON string for content comparison.
|
||||
* Sorts object keys to avoid false diffs from key ordering.
|
||||
*/
|
||||
function fingerprint(value: unknown): string {
|
||||
return JSON.stringify(value, (_key, v) => {
|
||||
if (v && typeof v === 'object' && !Array.isArray(v)) {
|
||||
return Object.keys(v).sort().reduce<Record<string, unknown>>((acc, k) => {
|
||||
acc[k] = (v as Record<string, unknown>)[k];
|
||||
return acc;
|
||||
}, {});
|
||||
}
|
||||
return v;
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Entity-array merge (hosts, keys, identities, snippets, etc.)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
interface EntityMergeResult<T> {
|
||||
merged: T[];
|
||||
conflicts: number;
|
||||
added: { local: number; remote: number };
|
||||
deleted: { local: number; remote: number };
|
||||
modified: { local: number; remote: number };
|
||||
}
|
||||
|
||||
function mergeEntityArrays<T extends { id: string }>(
|
||||
base: T[],
|
||||
local: T[],
|
||||
remote: T[],
|
||||
): EntityMergeResult<T> {
|
||||
const baseMap = new Map(base.map((e) => [e.id, e]));
|
||||
const localMap = new Map(local.map((e) => [e.id, e]));
|
||||
const remoteMap = new Map(remote.map((e) => [e.id, e]));
|
||||
|
||||
const allIds = new Set([
|
||||
...baseMap.keys(),
|
||||
...localMap.keys(),
|
||||
...remoteMap.keys(),
|
||||
]);
|
||||
|
||||
const merged: T[] = [];
|
||||
let conflicts = 0;
|
||||
const added = { local: 0, remote: 0 };
|
||||
const deleted = { local: 0, remote: 0 };
|
||||
const modified = { local: 0, remote: 0 };
|
||||
|
||||
for (const id of allIds) {
|
||||
const baseItem = baseMap.get(id);
|
||||
const localItem = localMap.get(id);
|
||||
const remoteItem = remoteMap.get(id);
|
||||
|
||||
const inBase = baseItem !== undefined;
|
||||
const inLocal = localItem !== undefined;
|
||||
const inRemote = remoteItem !== undefined;
|
||||
|
||||
if (!inBase && inLocal && !inRemote) {
|
||||
// Local addition
|
||||
merged.push(localItem);
|
||||
added.local++;
|
||||
} else if (!inBase && !inLocal && inRemote) {
|
||||
// Remote addition
|
||||
merged.push(remoteItem);
|
||||
added.remote++;
|
||||
} else if (!inBase && inLocal && inRemote) {
|
||||
// Both added same ID — prefer local
|
||||
merged.push(localItem);
|
||||
if (fingerprint(localItem) !== fingerprint(remoteItem)) {
|
||||
conflicts++;
|
||||
}
|
||||
} else if (inBase && inLocal && inRemote) {
|
||||
// Exists in all three — compare changes
|
||||
const localChanged = fingerprint(localItem) !== fingerprint(baseItem);
|
||||
const remoteChanged = fingerprint(remoteItem) !== fingerprint(baseItem);
|
||||
|
||||
if (!localChanged && !remoteChanged) {
|
||||
merged.push(baseItem);
|
||||
} else if (localChanged && !remoteChanged) {
|
||||
merged.push(localItem);
|
||||
modified.local++;
|
||||
} else if (!localChanged && remoteChanged) {
|
||||
merged.push(remoteItem);
|
||||
modified.remote++;
|
||||
} else {
|
||||
// Both changed — prefer local
|
||||
merged.push(localItem);
|
||||
if (fingerprint(localItem) !== fingerprint(remoteItem)) {
|
||||
conflicts++;
|
||||
}
|
||||
modified.local++;
|
||||
modified.remote++;
|
||||
}
|
||||
} else if (inBase && !inLocal && inRemote) {
|
||||
// Local deleted
|
||||
const remoteChanged = fingerprint(remoteItem) !== fingerprint(baseItem);
|
||||
if (remoteChanged) {
|
||||
// Remote modified + local deleted → keep modification (safer)
|
||||
merged.push(remoteItem);
|
||||
conflicts++;
|
||||
} else {
|
||||
deleted.local++;
|
||||
}
|
||||
} else if (inBase && inLocal && !inRemote) {
|
||||
// Remote deleted
|
||||
const localChanged = fingerprint(localItem) !== fingerprint(baseItem);
|
||||
if (localChanged) {
|
||||
// Local modified + remote deleted → keep modification (safer)
|
||||
merged.push(localItem);
|
||||
conflicts++;
|
||||
} else {
|
||||
deleted.remote++;
|
||||
}
|
||||
}
|
||||
// inBase && !inLocal && !inRemote → both deleted → gone
|
||||
}
|
||||
|
||||
return { merged, conflicts, added, deleted, modified };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// String-array merge (customGroups, snippetPackages)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function mergeStringArrays(
|
||||
base: string[],
|
||||
local: string[],
|
||||
remote: string[],
|
||||
): string[] {
|
||||
const baseSet = new Set(base);
|
||||
const localSet = new Set(local);
|
||||
const remoteSet = new Set(remote);
|
||||
|
||||
const result = new Set<string>();
|
||||
|
||||
// Start with base items, then apply additions/deletions
|
||||
const allValues = new Set([...baseSet, ...localSet, ...remoteSet]);
|
||||
|
||||
for (const value of allValues) {
|
||||
const inBase = baseSet.has(value);
|
||||
const inLocal = localSet.has(value);
|
||||
const inRemote = remoteSet.has(value);
|
||||
|
||||
if (!inBase) {
|
||||
// Addition — keep if either side added it
|
||||
if (inLocal || inRemote) result.add(value);
|
||||
} else {
|
||||
// Was in base — keep unless both sides deleted
|
||||
const localDeleted = !inLocal;
|
||||
const remoteDeleted = !inRemote;
|
||||
if (localDeleted && remoteDeleted) {
|
||||
// Both deleted — gone
|
||||
} else if (localDeleted || remoteDeleted) {
|
||||
// Only one side deleted — honour the deletion
|
||||
// (If the other side didn't touch it, it's still in their set from base)
|
||||
} else {
|
||||
result.add(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return [...result];
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Settings merge (flat key-value)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type SettingsObj = NonNullable<SyncPayload['settings']>;
|
||||
|
||||
/** Check if an array contains objects with `id` fields (for entity merge). */
|
||||
function isIdArray(arr: unknown[]): boolean {
|
||||
return arr.length > 0 && typeof arr[0] === 'object' && arr[0] !== null && 'id' in arr[0];
|
||||
}
|
||||
|
||||
/** Recursively merge two plain objects against a base using three-way logic. */
|
||||
function mergeSettingsDeep(
|
||||
base: Record<string, unknown>,
|
||||
local: Record<string, unknown>,
|
||||
remote: Record<string, unknown>,
|
||||
): Record<string, unknown> {
|
||||
const allKeys = new Set([
|
||||
...Object.keys(base),
|
||||
...Object.keys(local),
|
||||
...Object.keys(remote),
|
||||
]);
|
||||
const merged: Record<string, unknown> = {};
|
||||
for (const key of allKeys) {
|
||||
const bVal = base[key];
|
||||
const lVal = local[key];
|
||||
const rVal = remote[key];
|
||||
const lChanged = fingerprint(lVal) !== fingerprint(bVal);
|
||||
const rChanged = fingerprint(rVal) !== fingerprint(bVal);
|
||||
|
||||
if (!lChanged && !rChanged) {
|
||||
if (bVal !== undefined) merged[key] = bVal;
|
||||
} else if (lChanged && !rChanged) {
|
||||
if (lVal !== undefined) merged[key] = lVal;
|
||||
} else if (!lChanged && rChanged) {
|
||||
if (rVal !== undefined) merged[key] = rVal;
|
||||
} else {
|
||||
// Both changed — recurse if both are plain objects, else prefer local
|
||||
if (
|
||||
lVal && rVal &&
|
||||
typeof lVal === 'object' && !Array.isArray(lVal) &&
|
||||
typeof rVal === 'object' && !Array.isArray(rVal)
|
||||
) {
|
||||
merged[key] = mergeSettingsDeep(
|
||||
(bVal && typeof bVal === 'object' && !Array.isArray(bVal) ? bVal : {}) as Record<string, unknown>,
|
||||
lVal as Record<string, unknown>,
|
||||
rVal as Record<string, unknown>,
|
||||
);
|
||||
} else if (lVal !== undefined) {
|
||||
merged[key] = lVal;
|
||||
}
|
||||
}
|
||||
}
|
||||
return merged;
|
||||
}
|
||||
|
||||
function mergeSettings(
|
||||
base: SettingsObj | undefined,
|
||||
local: SettingsObj | undefined,
|
||||
remote: SettingsObj | undefined,
|
||||
): SettingsObj | undefined {
|
||||
if (!local && !remote) return undefined;
|
||||
if (!local) return remote;
|
||||
if (!remote) return local;
|
||||
|
||||
const b = base ?? {};
|
||||
const allKeys = new Set([
|
||||
...Object.keys(b),
|
||||
...Object.keys(local),
|
||||
...Object.keys(remote),
|
||||
]);
|
||||
|
||||
const merged: Record<string, unknown> = {};
|
||||
|
||||
for (const key of allKeys) {
|
||||
const bVal = (b as Record<string, unknown>)[key];
|
||||
const lVal = (local as Record<string, unknown>)[key];
|
||||
const rVal = (remote as Record<string, unknown>)[key];
|
||||
|
||||
const lChanged = fingerprint(lVal) !== fingerprint(bVal);
|
||||
const rChanged = fingerprint(rVal) !== fingerprint(bVal);
|
||||
|
||||
if (!lChanged && !rChanged) {
|
||||
if (bVal !== undefined) merged[key] = bVal;
|
||||
} else if (lChanged && !rChanged) {
|
||||
if (lVal !== undefined) merged[key] = lVal;
|
||||
} else if (!lChanged && rChanged) {
|
||||
if (rVal !== undefined) merged[key] = rVal;
|
||||
} else {
|
||||
// Both changed — deep merge if both are plain objects, else prefer local
|
||||
if (
|
||||
lVal && rVal &&
|
||||
typeof lVal === 'object' && !Array.isArray(lVal) &&
|
||||
typeof rVal === 'object' && !Array.isArray(rVal)
|
||||
) {
|
||||
merged[key] = mergeSettingsDeep(
|
||||
(bVal && typeof bVal === 'object' && !Array.isArray(bVal) ? bVal : {}) as Record<string, unknown>,
|
||||
lVal as Record<string, unknown>,
|
||||
rVal as Record<string, unknown>,
|
||||
);
|
||||
} else if (
|
||||
Array.isArray(lVal) && Array.isArray(rVal) &&
|
||||
(isIdArray(lVal) || isIdArray(rVal) || isIdArray(Array.isArray(bVal) ? bVal as unknown[] : []))
|
||||
) {
|
||||
// Array of objects with `id` (e.g. customTerminalThemes) — entity merge
|
||||
const bArr = Array.isArray(bVal) ? bVal as Array<{ id: string }> : [];
|
||||
const result = mergeEntityArrays(bArr, lVal as Array<{ id: string }>, rVal as Array<{ id: string }>);
|
||||
merged[key] = result.merged;
|
||||
} else if (lVal !== undefined) {
|
||||
merged[key] = lVal;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Object.keys(merged).length > 0 ? (merged as SettingsObj) : undefined;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Main merge function
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Three-way merge of sync payloads.
|
||||
*
|
||||
* @param base - The last successfully synced payload (null if unavailable)
|
||||
* @param local - The current device's data
|
||||
* @param remote - The other device's data (downloaded from cloud)
|
||||
*/
|
||||
export function mergeSyncPayloads(
|
||||
base: SyncPayload | null,
|
||||
local: SyncPayload,
|
||||
remote: SyncPayload,
|
||||
): MergeResult {
|
||||
const emptyBase: SyncPayload = {
|
||||
hosts: [],
|
||||
keys: [],
|
||||
identities: [],
|
||||
snippets: [],
|
||||
customGroups: [],
|
||||
snippetPackages: [],
|
||||
knownHosts: [],
|
||||
portForwardingRules: [],
|
||||
settings: undefined,
|
||||
syncedAt: 0,
|
||||
};
|
||||
const b = base ?? emptyBase;
|
||||
|
||||
const summary: MergeSummary = {
|
||||
added: { local: 0, remote: 0 },
|
||||
deleted: { local: 0, remote: 0 },
|
||||
modified: { local: 0, remote: 0, conflicts: 0 },
|
||||
};
|
||||
|
||||
// Merge each entity type
|
||||
const hosts = mergeEntityArrays(b.hosts ?? [], local.hosts ?? [], remote.hosts ?? []);
|
||||
const keys = mergeEntityArrays(b.keys ?? [], local.keys ?? [], remote.keys ?? []);
|
||||
const identities = mergeEntityArrays(b.identities ?? [], local.identities ?? [], remote.identities ?? []);
|
||||
const snippets = mergeEntityArrays(b.snippets ?? [], local.snippets ?? [], remote.snippets ?? []);
|
||||
const knownHostsRaw = mergeEntityArrays(b.knownHosts ?? [], local.knownHosts ?? [], remote.knownHosts ?? []);
|
||||
// Deduplicate known hosts by (hostname, port, keyType) since IDs are random per device
|
||||
const knownHostSeen = new Set<string>();
|
||||
const knownHosts = {
|
||||
...knownHostsRaw,
|
||||
merged: knownHostsRaw.merged.filter((kh) => {
|
||||
const entry = kh as unknown as { hostname: string; port: number; keyType: string };
|
||||
const fp = `${entry.hostname}:${entry.port}:${entry.keyType}`;
|
||||
if (knownHostSeen.has(fp)) return false;
|
||||
knownHostSeen.add(fp);
|
||||
return true;
|
||||
}),
|
||||
};
|
||||
const portForwardingRules = mergeEntityArrays(
|
||||
b.portForwardingRules ?? [],
|
||||
local.portForwardingRules ?? [],
|
||||
remote.portForwardingRules ?? [],
|
||||
);
|
||||
|
||||
// Aggregate stats
|
||||
const entityResults = [hosts, keys, identities, snippets, knownHosts, portForwardingRules];
|
||||
for (const r of entityResults) {
|
||||
summary.added.local += r.added.local;
|
||||
summary.added.remote += r.added.remote;
|
||||
summary.deleted.local += r.deleted.local;
|
||||
summary.deleted.remote += r.deleted.remote;
|
||||
summary.modified.local += r.modified.local;
|
||||
summary.modified.remote += r.modified.remote;
|
||||
summary.modified.conflicts += r.conflicts;
|
||||
}
|
||||
|
||||
// Merge string arrays
|
||||
const customGroups = mergeStringArrays(
|
||||
b.customGroups ?? [],
|
||||
local.customGroups ?? [],
|
||||
remote.customGroups ?? [],
|
||||
);
|
||||
const snippetPackages = mergeStringArrays(
|
||||
b.snippetPackages ?? [],
|
||||
local.snippetPackages ?? [],
|
||||
remote.snippetPackages ?? [],
|
||||
);
|
||||
|
||||
// Merge settings
|
||||
const settings = mergeSettings(b.settings, local.settings, remote.settings);
|
||||
|
||||
const payload: SyncPayload = {
|
||||
hosts: hosts.merged,
|
||||
keys: keys.merged,
|
||||
identities: identities.merged,
|
||||
snippets: snippets.merged,
|
||||
customGroups,
|
||||
snippetPackages,
|
||||
knownHosts: knownHosts.merged,
|
||||
portForwardingRules: portForwardingRules.merged,
|
||||
settings,
|
||||
syncedAt: Date.now(),
|
||||
};
|
||||
|
||||
return {
|
||||
payload,
|
||||
hadConflicts: summary.modified.conflicts > 0,
|
||||
summary,
|
||||
};
|
||||
}
|
||||
@@ -156,7 +156,10 @@ function execViaPty(ptyStream, command, options) {
|
||||
* @param {number} [options.timeoutMs=60000] - Command timeout in milliseconds
|
||||
*/
|
||||
function execViaChannel(sshClient, command, options) {
|
||||
const { timeoutMs = 60000 } = options || {};
|
||||
const {
|
||||
timeoutMs = 60000,
|
||||
trackForCancellation = null,
|
||||
} = options || {};
|
||||
|
||||
return new Promise((resolve) => {
|
||||
sshClient.exec(command, (err, execStream) => {
|
||||
@@ -168,27 +171,40 @@ function execViaChannel(sshClient, command, options) {
|
||||
resolve({ ok: false, error: 'Failed to create exec stream', exitCode: 1 });
|
||||
return;
|
||||
}
|
||||
const marker = `__NCMCP_CH_${Date.now().toString(36)}_${crypto.randomBytes(16).toString('hex')}__`;
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
let finished = false;
|
||||
const timeoutId = setTimeout(() => {
|
||||
if (finished) return;
|
||||
finished = true;
|
||||
try { execStream.close(); } catch { /* ignore */ }
|
||||
const timeoutSec = Math.round(timeoutMs / 1000);
|
||||
resolve({ ok: false, stdout, stderr, exitCode: -1, error: `Command timed out (${timeoutSec}s)` });
|
||||
}, timeoutMs);
|
||||
execStream.on("data", (data) => { stdout += data.toString(); });
|
||||
execStream.stderr.on("data", (data) => { stderr += data.toString(); });
|
||||
execStream.on("close", (code) => {
|
||||
const finish = (result) => {
|
||||
if (finished) return;
|
||||
finished = true;
|
||||
clearTimeout(timeoutId);
|
||||
if (trackForCancellation) {
|
||||
trackForCancellation.delete(marker);
|
||||
}
|
||||
resolve(result);
|
||||
};
|
||||
const timeoutId = setTimeout(() => {
|
||||
try { execStream.close(); } catch { /* ignore */ }
|
||||
const timeoutSec = Math.round(timeoutMs / 1000);
|
||||
finish({ ok: false, stdout, stderr, exitCode: -1, error: `Command timed out (${timeoutSec}s)` });
|
||||
}, timeoutMs);
|
||||
if (trackForCancellation) {
|
||||
trackForCancellation.set(marker, {
|
||||
cleanup: () => {
|
||||
clearTimeout(timeoutId);
|
||||
try { execStream.close(); } catch { /* ignore */ }
|
||||
},
|
||||
});
|
||||
}
|
||||
execStream.on("data", (data) => { stdout += data.toString(); });
|
||||
execStream.stderr.on("data", (data) => { stderr += data.toString(); });
|
||||
execStream.on("close", (code) => {
|
||||
// code is null when SSH disconnects or process is signal-terminated
|
||||
if (code == null) {
|
||||
resolve({ ok: false, stdout, stderr, exitCode: -1, error: "Command terminated unexpectedly (connection lost or signal)" });
|
||||
finish({ ok: false, stdout, stderr, exitCode: -1, error: "Command terminated unexpectedly (connection lost or signal)" });
|
||||
} else {
|
||||
resolve({ ok: code === 0, stdout, stderr, exitCode: code });
|
||||
finish({ ok: code === 0, stdout, stderr, exitCode: code });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -56,6 +56,9 @@ const MAX_CONCURRENT_AGENTS = 5;
|
||||
// ACP providers (module-level so cleanup() can access them)
|
||||
const acpProviders = new Map();
|
||||
const acpActiveStreams = new Map();
|
||||
const acpRequestSessions = new Map();
|
||||
const acpForceProviderReset = new Set();
|
||||
const acpChatRuns = new Map();
|
||||
|
||||
// ── Provider registry (synced from renderer, keys stay encrypted) ──
|
||||
const ENC_PREFIX = "enc:v1:";
|
||||
@@ -137,6 +140,8 @@ function injectApiKeyIntoRequest(url, headers, providerId) {
|
||||
function cleanupAcpProvider(chatSessionId) {
|
||||
const entry = acpProviders.get(chatSessionId);
|
||||
if (!entry) return;
|
||||
const rootPid = entry.provider?.model?.agentProcess?.pid;
|
||||
const childPids = getChildProcessTreePids(rootPid);
|
||||
try {
|
||||
if (typeof entry.provider.forceCleanup === "function") {
|
||||
entry.provider.forceCleanup();
|
||||
@@ -146,9 +151,75 @@ function cleanupAcpProvider(chatSessionId) {
|
||||
} catch (err) {
|
||||
console.warn("[ACP] Provider cleanup failed for session", chatSessionId, err?.message || err);
|
||||
}
|
||||
killTrackedProcessTree(rootPid, childPids);
|
||||
acpProviders.delete(chatSessionId);
|
||||
}
|
||||
|
||||
function isActiveAcpRun(chatSessionId, requestId) {
|
||||
const activeRun = acpChatRuns.get(chatSessionId);
|
||||
return Boolean(activeRun && activeRun.requestId === requestId);
|
||||
}
|
||||
|
||||
function isUnsupportedLoadSessionError(err) {
|
||||
const message = String(err?.message || err || "").toLowerCase();
|
||||
return message.includes("method not found") && message.includes("session/load");
|
||||
}
|
||||
|
||||
function getChildProcessTreePids(rootPid) {
|
||||
if (!Number.isInteger(rootPid) || rootPid <= 0) return [];
|
||||
if (process.platform === "win32") return [];
|
||||
|
||||
const discovered = new Set();
|
||||
const queue = [rootPid];
|
||||
|
||||
while (queue.length > 0) {
|
||||
const pid = queue.shift();
|
||||
if (!Number.isInteger(pid) || pid <= 0) continue;
|
||||
try {
|
||||
const output = execFileSync("pgrep", ["-P", String(pid)], { encoding: "utf8" }).trim();
|
||||
if (!output) continue;
|
||||
for (const line of output.split(/\s+/)) {
|
||||
const childPid = Number(line);
|
||||
if (!Number.isInteger(childPid) || childPid <= 0 || discovered.has(childPid)) continue;
|
||||
discovered.add(childPid);
|
||||
queue.push(childPid);
|
||||
}
|
||||
} catch {
|
||||
// No child processes or pgrep unavailable.
|
||||
}
|
||||
}
|
||||
|
||||
return Array.from(discovered);
|
||||
}
|
||||
|
||||
function killTrackedProcessTree(rootPid, childPids) {
|
||||
if (process.platform === "win32") {
|
||||
if (Number.isInteger(rootPid) && rootPid > 0) {
|
||||
try {
|
||||
execFileSync("taskkill", ["/PID", String(rootPid), "/T", "/F"], { stdio: "ignore" });
|
||||
} catch {
|
||||
// Ignore kill failures; the process may have already exited.
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const pids = [...(Array.isArray(childPids) ? childPids : [])];
|
||||
if (Number.isInteger(rootPid) && rootPid > 0) {
|
||||
pids.push(rootPid);
|
||||
}
|
||||
|
||||
// Kill children before the wrapper so orphaned grandchildren do not survive.
|
||||
for (const pid of pids.reverse()) {
|
||||
if (!Number.isInteger(pid) || pid <= 0) continue;
|
||||
try {
|
||||
process.kill(pid, "SIGKILL");
|
||||
} catch {
|
||||
// Ignore kill failures; the process may have already exited.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely send an IPC message to a renderer, guarding against destroyed senders.
|
||||
*/
|
||||
@@ -1545,12 +1616,14 @@ function registerHandlers(ipcMain) {
|
||||
|
||||
// ── ACP (Agent Client Protocol) streaming ──
|
||||
|
||||
ipcMain.handle("netcatty:ai:acp:stream", async (event, { requestId, chatSessionId, acpCommand, acpArgs, prompt, cwd, providerId, model, images }) => {
|
||||
ipcMain.handle("netcatty:ai:acp:stream", async (event, { requestId, chatSessionId, acpCommand, acpArgs, prompt, cwd, providerId, model, existingSessionId, historyMessages, images }) => {
|
||||
// Validate IPC sender (Issue #17)
|
||||
if (!validateSender(event)) {
|
||||
return { ok: false, error: "Unauthorized IPC sender" };
|
||||
}
|
||||
let abortController = null;
|
||||
try {
|
||||
mcpServerBridge.setChatSessionCancelled?.(chatSessionId, false);
|
||||
const { createACPProvider } = require("@mcpc-tech/acp-ai-provider");
|
||||
const { streamText, stepCountIs } = require("ai");
|
||||
|
||||
@@ -1601,9 +1674,22 @@ function registerHandlers(ipcMain) {
|
||||
mcpSnapshot.fingerprint = getCodexMcpFingerprint(mcpSnapshot.mcpServers);
|
||||
|
||||
const currentPermissionMode = mcpServerBridge.getPermissionMode();
|
||||
const existingRun = acpChatRuns.get(chatSessionId);
|
||||
if (existingRun && existingRun.requestId !== requestId) {
|
||||
existingRun.cancelRequested = true;
|
||||
const existingController = acpActiveStreams.get(existingRun.requestId);
|
||||
if (existingController) {
|
||||
existingController.abort();
|
||||
acpActiveStreams.delete(existingRun.requestId);
|
||||
}
|
||||
acpRequestSessions.delete(existingRun.requestId);
|
||||
cleanupAcpProvider(chatSessionId);
|
||||
}
|
||||
|
||||
let providerEntry = acpProviders.get(chatSessionId);
|
||||
const shouldForceProviderReset = acpForceProviderReset.has(chatSessionId);
|
||||
const shouldReuseProvider = Boolean(
|
||||
!shouldForceProviderReset &&
|
||||
providerEntry &&
|
||||
providerEntry.acpCommand === acpCommand &&
|
||||
providerEntry.cwd === sessionCwd &&
|
||||
@@ -1613,6 +1699,7 @@ function registerHandlers(ipcMain) {
|
||||
);
|
||||
|
||||
if (!shouldReuseProvider) {
|
||||
const resumeSessionId = providerEntry?.provider?.getSessionId?.() || existingSessionId || undefined;
|
||||
cleanupAcpProvider(chatSessionId);
|
||||
|
||||
const agentEnv = { ...shellEnv };
|
||||
@@ -1632,6 +1719,7 @@ function registerHandlers(ipcMain) {
|
||||
cwd: sessionCwd,
|
||||
mcpServers: mcpSnapshot.mcpServers,
|
||||
},
|
||||
...(resumeSessionId ? { existingSessionId: resumeSessionId } : {}),
|
||||
...(isCodexAgent
|
||||
? { authMethodId: apiKey ? "codex-api-key" : "chatgpt" }
|
||||
: {}),
|
||||
@@ -1645,12 +1733,64 @@ function registerHandlers(ipcMain) {
|
||||
authFingerprint,
|
||||
mcpFingerprint: mcpSnapshot.fingerprint,
|
||||
permissionMode: currentPermissionMode,
|
||||
historyReplayFallback: false,
|
||||
};
|
||||
acpProviders.set(chatSessionId, providerEntry);
|
||||
}
|
||||
acpForceProviderReset.delete(chatSessionId);
|
||||
|
||||
const abortController = new AbortController();
|
||||
let modelInstance = providerEntry.provider.languageModel(model || undefined);
|
||||
try {
|
||||
await providerEntry.provider.initSession(providerEntry.provider.tools);
|
||||
} catch (err) {
|
||||
const attemptedResumeSessionId = providerEntry.provider?.getSessionId?.() || existingSessionId;
|
||||
if (!attemptedResumeSessionId || !isUnsupportedLoadSessionError(err)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
cleanupAcpProvider(chatSessionId);
|
||||
|
||||
const fallbackProvider = createACPProvider({
|
||||
command: isCodexAgent
|
||||
? resolveCodexAcpBinaryPath(shellEnv, electronModule)
|
||||
: acpCommand,
|
||||
args: acpArgs || [],
|
||||
env: apiKey ? { ...shellEnv, CODEX_API_KEY: apiKey } : { ...shellEnv },
|
||||
session: {
|
||||
cwd: sessionCwd,
|
||||
mcpServers: mcpSnapshot.mcpServers,
|
||||
},
|
||||
...(isCodexAgent
|
||||
? { authMethodId: apiKey ? "codex-api-key" : "chatgpt" }
|
||||
: {}),
|
||||
persistSession: true,
|
||||
});
|
||||
|
||||
providerEntry = {
|
||||
provider: fallbackProvider,
|
||||
acpCommand,
|
||||
cwd: sessionCwd,
|
||||
authFingerprint,
|
||||
mcpFingerprint: mcpSnapshot.fingerprint,
|
||||
permissionMode: currentPermissionMode,
|
||||
historyReplayFallback: Array.isArray(historyMessages) && historyMessages.length > 0,
|
||||
};
|
||||
acpProviders.set(chatSessionId, providerEntry);
|
||||
modelInstance = providerEntry.provider.languageModel(model || undefined);
|
||||
await providerEntry.provider.initSession(providerEntry.provider.tools);
|
||||
}
|
||||
const activeProviderSessionId = providerEntry.provider.getSessionId?.() || null;
|
||||
if (activeProviderSessionId) {
|
||||
safeSend(event.sender, "netcatty:ai:acp:event", {
|
||||
requestId,
|
||||
event: { type: "session-id", sessionId: activeProviderSessionId },
|
||||
});
|
||||
}
|
||||
|
||||
abortController = new AbortController();
|
||||
acpActiveStreams.set(requestId, abortController);
|
||||
acpRequestSessions.set(requestId, chatSessionId);
|
||||
acpChatRuns.set(chatSessionId, { requestId, cancelRequested: false });
|
||||
|
||||
// Prepend context hint so the agent uses MCP tools for remote hosts
|
||||
const contextualPrompt =
|
||||
@@ -1679,12 +1819,21 @@ function registerHandlers(ipcMain) {
|
||||
return content;
|
||||
}
|
||||
|
||||
const latestPromptMessage = {
|
||||
role: "user",
|
||||
content: buildMessageContent(contextualPrompt, images),
|
||||
};
|
||||
|
||||
const result = streamText({
|
||||
model: providerEntry.provider.languageModel(model || undefined),
|
||||
messages: [{
|
||||
role: "user",
|
||||
content: buildMessageContent(contextualPrompt, images),
|
||||
}],
|
||||
model: modelInstance,
|
||||
messages: providerEntry.historyReplayFallback
|
||||
? [
|
||||
...(Array.isArray(historyMessages)
|
||||
? historyMessages.map((msg) => ({ role: msg.role, content: msg.content }))
|
||||
: []),
|
||||
latestPromptMessage,
|
||||
]
|
||||
: [latestPromptMessage],
|
||||
tools: providerEntry.provider.tools,
|
||||
stopWhen: stepCountIs(mcpServerBridge.getMaxIterations ? mcpServerBridge.getMaxIterations() : 20),
|
||||
abortSignal: abortController.signal,
|
||||
@@ -1698,6 +1847,7 @@ function registerHandlers(ipcMain) {
|
||||
if (stallTimer) clearTimeout(stallTimer);
|
||||
stallTimer = setTimeout(() => {
|
||||
if (!abortController.signal.aborted) {
|
||||
if (!isActiveAcpRun(chatSessionId, requestId)) return;
|
||||
safeSend(event.sender, "netcatty:ai:acp:event", {
|
||||
requestId,
|
||||
event: { type: "status", message: "Waiting for response from agent..." },
|
||||
@@ -1710,6 +1860,7 @@ function registerHandlers(ipcMain) {
|
||||
while (true) {
|
||||
const { done, value: chunk } = await reader.read();
|
||||
if (done || abortController.signal.aborted) break;
|
||||
if (!isActiveAcpRun(chatSessionId, requestId)) break;
|
||||
resetStallTimer();
|
||||
try {
|
||||
const serialized = serializeStreamChunk(chunk);
|
||||
@@ -1733,6 +1884,9 @@ function registerHandlers(ipcMain) {
|
||||
|
||||
// If stream completed with zero content, likely an auth or connection issue
|
||||
if (!hasContent && !abortController.signal.aborted) {
|
||||
if (!isActiveAcpRun(chatSessionId, requestId)) {
|
||||
return { ok: true };
|
||||
}
|
||||
safeSend(event.sender, "netcatty:ai:acp:error", {
|
||||
requestId,
|
||||
error: isCodexAgent
|
||||
@@ -1740,6 +1894,9 @@ function registerHandlers(ipcMain) {
|
||||
: "Agent returned an empty response.",
|
||||
});
|
||||
} else {
|
||||
if (!isActiveAcpRun(chatSessionId, requestId)) {
|
||||
return { ok: true };
|
||||
}
|
||||
safeSend(event.sender, "netcatty:ai:acp:done", { requestId });
|
||||
}
|
||||
} catch (err) {
|
||||
@@ -1761,27 +1918,53 @@ function registerHandlers(ipcMain) {
|
||||
});
|
||||
} finally {
|
||||
acpActiveStreams.delete(requestId);
|
||||
acpRequestSessions.delete(requestId);
|
||||
const activeRun = acpChatRuns.get(chatSessionId);
|
||||
if (activeRun?.requestId === requestId) {
|
||||
if (abortController?.signal?.aborted || activeRun.cancelRequested) {
|
||||
cleanupAcpProvider(chatSessionId);
|
||||
}
|
||||
acpChatRuns.delete(chatSessionId);
|
||||
}
|
||||
}
|
||||
|
||||
return { ok: true };
|
||||
});
|
||||
|
||||
ipcMain.handle("netcatty:ai:acp:cancel", async (event, { requestId }) => {
|
||||
ipcMain.handle("netcatty:ai:acp:cancel", async (event, { requestId, chatSessionId }) => {
|
||||
if (!validateSender(event)) return { ok: false, error: "Unauthorized IPC sender" };
|
||||
// Cancel any active PTY executions (send Ctrl+C)
|
||||
mcpServerBridge.cancelAllPtyExecs();
|
||||
const effectiveChatSessionId = chatSessionId || acpRequestSessions.get(requestId);
|
||||
mcpServerBridge.setChatSessionCancelled?.(effectiveChatSessionId, true);
|
||||
const activeRun = effectiveChatSessionId ? acpChatRuns.get(effectiveChatSessionId) : null;
|
||||
if (activeRun && activeRun.requestId === requestId) {
|
||||
activeRun.cancelRequested = true;
|
||||
}
|
||||
const controller = acpActiveStreams.get(requestId);
|
||||
let cancelled = false;
|
||||
if (controller) {
|
||||
controller.abort();
|
||||
acpActiveStreams.delete(requestId);
|
||||
return { ok: true };
|
||||
cancelled = true;
|
||||
}
|
||||
return { ok: false, error: "Stream not found" };
|
||||
if (effectiveChatSessionId) {
|
||||
acpForceProviderReset.add(effectiveChatSessionId);
|
||||
cleanupAcpProvider(effectiveChatSessionId);
|
||||
}
|
||||
// Preserve the ACP provider session on stop so the next user message can
|
||||
// continue within the same persisted conversation context. Full provider
|
||||
// cleanup is handled by netcatty:ai:acp:cleanup when the chat is deleted.
|
||||
if (effectiveChatSessionId) cancelled = true;
|
||||
acpRequestSessions.delete(requestId);
|
||||
return cancelled ? { ok: true } : { ok: false, error: "Stream not found" };
|
||||
});
|
||||
|
||||
// Cleanup a specific ACP session (when chat session is deleted)
|
||||
ipcMain.handle("netcatty:ai:acp:cleanup", async (event, { chatSessionId }) => {
|
||||
if (!validateSender(event)) return { ok: false, error: "Unauthorized IPC sender" };
|
||||
mcpServerBridge.setChatSessionCancelled?.(chatSessionId, true);
|
||||
acpForceProviderReset.delete(chatSessionId);
|
||||
cleanupAcpProvider(chatSessionId);
|
||||
mcpServerBridge.cleanupScopedMetadata(chatSessionId);
|
||||
return { ok: true };
|
||||
|
||||
@@ -55,6 +55,7 @@ let permissionMode = "confirm";
|
||||
|
||||
// Track active PTY executions for cancellation
|
||||
const activePtyExecs = new Map(); // marker → { ptyStream, cleanup }
|
||||
const cancelledChatSessions = new Set();
|
||||
|
||||
function cancelAllPtyExecs() {
|
||||
for (const [marker, entry] of activePtyExecs) {
|
||||
@@ -116,6 +117,19 @@ function getPermissionMode() {
|
||||
return permissionMode;
|
||||
}
|
||||
|
||||
function setChatSessionCancelled(chatSessionId, cancelled) {
|
||||
if (!chatSessionId) return;
|
||||
if (cancelled) {
|
||||
cancelledChatSessions.add(chatSessionId);
|
||||
} else {
|
||||
cancelledChatSessions.delete(chatSessionId);
|
||||
}
|
||||
}
|
||||
|
||||
function isChatSessionCancelled(chatSessionId) {
|
||||
return Boolean(chatSessionId && cancelledChatSessions.has(chatSessionId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Register metadata for terminal sessions (called from renderer via IPC).
|
||||
* Metadata is stored per-scope (chatSessionId) so different AI chat sessions
|
||||
@@ -336,6 +350,10 @@ async function dispatch(method, params) {
|
||||
return { ok: false, error: `Operation denied: permission mode is "observer" (read-only). Change to "confirm" or "autonomous" in Settings → AI → Safety to allow this action.` };
|
||||
}
|
||||
|
||||
if (WRITE_METHODS.has(method) && isChatSessionCancelled(params?.chatSessionId)) {
|
||||
return { ok: false, error: "Operation cancelled: the ACP session was stopped." };
|
||||
}
|
||||
|
||||
// Scope validation for session-targeted operations
|
||||
if (method !== "netcatty/getContext" && params?.sessionId) {
|
||||
const scopeErr = validateSessionScope(params.sessionId, params?.chatSessionId);
|
||||
@@ -382,20 +400,19 @@ async function dispatch(method, params) {
|
||||
function handleGetContext(params) {
|
||||
if (!sessions) return { hosts: [], instructions: "No sessions available." };
|
||||
|
||||
// Scope resolution: use explicit scopedSessionIds from MCP server env var (per-process, set at spawn).
|
||||
// If scopedSessionIds is provided but empty, that means "no access" (not "all access").
|
||||
// Only fall back to unscoped (show all) when scopedSessionIds is not provided at all.
|
||||
const hasScopeParam = params?.scopedSessionIds != null;
|
||||
const scopedIds = hasScopeParam
|
||||
? new Set(params.scopedSessionIds)
|
||||
: null;
|
||||
|
||||
// chatSessionId may be passed via env for per-scope metadata lookup
|
||||
const chatSessionId = params?.chatSessionId || null;
|
||||
const explicitScopedIds = Array.isArray(params?.scopedSessionIds)
|
||||
? params.scopedSessionIds
|
||||
: null;
|
||||
const resolvedScopedIds = explicitScopedIds ?? (chatSessionId ? getScopedSessionIds(chatSessionId) : null);
|
||||
const hasScopedContext = explicitScopedIds !== null || chatSessionId !== null;
|
||||
const scopedIds = resolvedScopedIds ? new Set(resolvedScopedIds) : null;
|
||||
|
||||
const hosts = [];
|
||||
// When scope param is provided (even if empty Set), enforce it strictly
|
||||
if (hasScopeParam && scopedIds.size === 0) {
|
||||
// When a scoped context exists but currently resolves to zero sessions, treat
|
||||
// it as "no access" rather than falling back to all sessions.
|
||||
if (hasScopedContext && (!resolvedScopedIds || resolvedScopedIds.length === 0)) {
|
||||
return {
|
||||
environment: "netcatty-terminal",
|
||||
description: "No hosts are available in the current scope.",
|
||||
@@ -458,7 +475,10 @@ function handleExec(params) {
|
||||
|
||||
// If no PTY stream, fall back to exec channel (invisible to terminal)
|
||||
if (!ptyStream || typeof ptyStream.write !== "function") {
|
||||
return execViaChannel(sshClient, command, { timeoutMs: commandTimeoutMs });
|
||||
return execViaChannel(sshClient, command, {
|
||||
timeoutMs: commandTimeoutMs,
|
||||
trackForCancellation: activePtyExecs,
|
||||
});
|
||||
}
|
||||
|
||||
// Execute via PTY stream so user sees the command in the terminal
|
||||
@@ -755,7 +775,9 @@ function buildMcpServerConfig(port, scopedSessionIds, chatSessionId) {
|
||||
env.push({ name: "NETCATTY_MCP_TOKEN", value: authToken });
|
||||
}
|
||||
|
||||
if (effectiveIds && effectiveIds.length > 0) {
|
||||
// When chatSessionId is present, the MCP subprocess resolves scope dynamically
|
||||
// through main-process metadata, so avoid freezing session IDs at spawn time.
|
||||
if (!chatSessionId && effectiveIds && effectiveIds.length > 0) {
|
||||
env.push({ name: "NETCATTY_MCP_SESSION_IDS", value: effectiveIds.join(",") });
|
||||
}
|
||||
|
||||
@@ -781,6 +803,7 @@ function buildMcpServerConfig(port, scopedSessionIds, chatSessionId) {
|
||||
function cleanupScopedMetadata(chatSessionId) {
|
||||
if (chatSessionId) {
|
||||
scopedMetadata.delete(chatSessionId);
|
||||
cancelledChatSessions.delete(chatSessionId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -802,6 +825,7 @@ module.exports = {
|
||||
getMaxIterations,
|
||||
setPermissionMode,
|
||||
getPermissionMode,
|
||||
setChatSessionCancelled,
|
||||
checkCommandSafety,
|
||||
updateSessionMetadata,
|
||||
getScopedSessionIds,
|
||||
|
||||
@@ -1017,6 +1017,19 @@ async function startSSHSession(event, options) {
|
||||
bufferData(decoder.write(data));
|
||||
});
|
||||
|
||||
// Capture the real exit code from the remote process.
|
||||
// "exit" fires when the remote shell/process exits normally;
|
||||
// "close" fires whenever the channel closes (could be network drop).
|
||||
// Only treat it as user-initiated exit if "exit" fired with a numeric
|
||||
// code and no signal. Signal terminations (e.g. server kill, idle
|
||||
// timeout) have code=null and signal set — those are not user exits.
|
||||
let streamExitCode = 0;
|
||||
let streamExited = false;
|
||||
stream.on("exit", (code, signal) => {
|
||||
streamExitCode = typeof code === "number" ? code : 0;
|
||||
streamExited = typeof code === "number" && !signal;
|
||||
});
|
||||
|
||||
stream.on("close", () => {
|
||||
// Flush any remaining data before close
|
||||
if (flushTimeout) {
|
||||
@@ -1024,7 +1037,7 @@ async function startSSHSession(event, options) {
|
||||
}
|
||||
flushBuffer();
|
||||
const contents = event.sender;
|
||||
safeSend(contents, "netcatty:exit", { sessionId, exitCode: 0 });
|
||||
safeSend(contents, "netcatty:exit", { sessionId, exitCode: streamExitCode, reason: streamExited ? "exited" : "closed" });
|
||||
sessions.delete(sessionId);
|
||||
sessionEncodings.delete(sessionId);
|
||||
sessionDecoders.delete(sessionId);
|
||||
@@ -1072,7 +1085,7 @@ async function startSSHSession(event, options) {
|
||||
console.error(`${logPrefix} ${options.hostname} error:`, err.message);
|
||||
}
|
||||
|
||||
safeSend(contents, "netcatty:exit", { sessionId, exitCode: 1, error: err.message });
|
||||
safeSend(contents, "netcatty:exit", { sessionId, exitCode: 1, error: err.message, reason: "error" });
|
||||
sessions.delete(sessionId);
|
||||
sessionEncodings.delete(sessionId);
|
||||
sessionDecoders.delete(sessionId);
|
||||
@@ -1086,7 +1099,7 @@ async function startSSHSession(event, options) {
|
||||
console.error(`${logPrefix} ${options.hostname} connection timeout`);
|
||||
const err = new Error(`Connection timeout to ${options.hostname}`);
|
||||
const contents = event.sender;
|
||||
safeSend(contents, "netcatty:exit", { sessionId, exitCode: 1, error: err.message });
|
||||
safeSend(contents, "netcatty:exit", { sessionId, exitCode: 1, error: err.message, reason: "timeout" });
|
||||
sessions.delete(sessionId);
|
||||
sessionEncodings.delete(sessionId);
|
||||
sessionDecoders.delete(sessionId);
|
||||
@@ -1098,7 +1111,7 @@ async function startSSHSession(event, options) {
|
||||
|
||||
conn.once("close", () => {
|
||||
const contents = event.sender;
|
||||
safeSend(contents, "netcatty:exit", { sessionId, exitCode: 0 });
|
||||
safeSend(contents, "netcatty:exit", { sessionId, exitCode: 0, reason: "closed" });
|
||||
sessions.delete(sessionId);
|
||||
sessionEncodings.delete(sessionId);
|
||||
sessionDecoders.delete(sessionId);
|
||||
|
||||
@@ -222,9 +222,13 @@ function startLocalSession(event, payload) {
|
||||
proc.onExit((evt) => {
|
||||
sessions.delete(sessionId);
|
||||
const contents = electronModule.webContents.fromId(session.webContentsId);
|
||||
contents?.send("netcatty:exit", { sessionId, ...evt });
|
||||
// Signal present = killed externally (show disconnected UI).
|
||||
// No signal = process exited normally, even with non-zero code
|
||||
// (e.g. user typed `exit` after a failed command), so auto-close.
|
||||
const reason = evt.signal ? "error" : "exited";
|
||||
contents?.send("netcatty:exit", { sessionId, ...evt, reason });
|
||||
});
|
||||
|
||||
|
||||
return { sessionId };
|
||||
}
|
||||
|
||||
@@ -426,7 +430,7 @@ async function startTelnetSession(event, options) {
|
||||
const session = sessions.get(sessionId);
|
||||
if (session) {
|
||||
const contents = electronModule.webContents.fromId(session.webContentsId);
|
||||
contents?.send("netcatty:exit", { sessionId, exitCode: 1, error: err.message });
|
||||
contents?.send("netcatty:exit", { sessionId, exitCode: 1, error: err.message, reason: "error" });
|
||||
}
|
||||
sessions.delete(sessionId);
|
||||
}
|
||||
@@ -435,11 +439,11 @@ async function startTelnetSession(event, options) {
|
||||
socket.on('close', (hadError) => {
|
||||
console.log(`[Telnet] Connection closed${hadError ? ' with error' : ''}`);
|
||||
clearTimeout(connectTimeout);
|
||||
|
||||
|
||||
const session = sessions.get(sessionId);
|
||||
if (session) {
|
||||
const contents = electronModule.webContents.fromId(session.webContentsId);
|
||||
contents?.send("netcatty:exit", { sessionId, exitCode: hadError ? 1 : 0 });
|
||||
contents?.send("netcatty:exit", { sessionId, exitCode: hadError ? 1 : 0, reason: hadError ? "error" : "closed" });
|
||||
}
|
||||
sessions.delete(sessionId);
|
||||
});
|
||||
@@ -523,7 +527,8 @@ async function startMoshSession(event, options) {
|
||||
proc.onExit((evt) => {
|
||||
sessions.delete(sessionId);
|
||||
const contents = electronModule.webContents.fromId(session.webContentsId);
|
||||
contents?.send("netcatty:exit", { sessionId, ...evt });
|
||||
// Mosh non-zero exit typically means connection/auth failure — show error UI
|
||||
contents?.send("netcatty:exit", { sessionId, ...evt, reason: evt.exitCode === 0 ? "exited" : "error" });
|
||||
});
|
||||
|
||||
return { sessionId };
|
||||
@@ -615,14 +620,14 @@ async function startSerialSession(event, options) {
|
||||
serialPort.on('error', (err) => {
|
||||
console.error(`[Serial] Port error: ${err.message}`);
|
||||
const contents = electronModule.webContents.fromId(session.webContentsId);
|
||||
contents?.send("netcatty:exit", { sessionId, exitCode: 1, error: err.message });
|
||||
contents?.send("netcatty:exit", { sessionId, exitCode: 1, error: err.message, reason: "error" });
|
||||
sessions.delete(sessionId);
|
||||
});
|
||||
|
||||
serialPort.on('close', () => {
|
||||
console.log(`[Serial] Port closed`);
|
||||
const contents = electronModule.webContents.fromId(session.webContentsId);
|
||||
contents?.send("netcatty:exit", { sessionId, exitCode: 0 });
|
||||
contents?.send("netcatty:exit", { sessionId, exitCode: 0, reason: "closed" });
|
||||
sessions.delete(sessionId);
|
||||
});
|
||||
|
||||
|
||||
@@ -168,8 +168,13 @@ const server = new McpServer({
|
||||
version: "1.0.0",
|
||||
});
|
||||
|
||||
// Scope params shared by all tool calls (includes chatSessionId for metadata isolation)
|
||||
const scopeParams = { scopedSessionIds: SCOPED_SESSION_IDS, chatSessionId: CHAT_SESSION_ID };
|
||||
// Scope params shared by all tool calls.
|
||||
// When chatSessionId is present, let the main process resolve the current
|
||||
// workspace membership dynamically so mid-session workspace changes are visible
|
||||
// without restarting the MCP subprocess.
|
||||
const scopeParams = CHAT_SESSION_ID
|
||||
? { chatSessionId: CHAT_SESSION_ID }
|
||||
: { scopedSessionIds: SCOPED_SESSION_IDS, chatSessionId: CHAT_SESSION_ID };
|
||||
|
||||
// Resource: environment context
|
||||
server.resource(
|
||||
@@ -194,7 +199,7 @@ server.tool(
|
||||
"Get information about the current Netcatty workspace: all connected remote hosts, their session IDs, OS, and connection status. Call this first to discover available hosts before executing commands.",
|
||||
{},
|
||||
async () => {
|
||||
process.stderr.write(`[netcatty-mcp] get_environment called, SCOPED_SESSION_IDS: ${JSON.stringify(SCOPED_SESSION_IDS)}\n`);
|
||||
process.stderr.write(`[netcatty-mcp] get_environment called, SCOPED_SESSION_IDS: ${JSON.stringify(SCOPED_SESSION_IDS)}, CHAT_SESSION_ID: ${CHAT_SESSION_ID}\n`);
|
||||
const ctx = await rpcCall("netcatty/getContext", scopeParams);
|
||||
process.stderr.write(`[netcatty-mcp] get_environment result: hostCount=${ctx.hostCount}, hosts=${JSON.stringify(ctx.hosts?.map(h => h.sessionId))}\n`);
|
||||
return { content: [{ type: "text", text: JSON.stringify(ctx, null, 2) }] };
|
||||
@@ -214,7 +219,7 @@ server.tool(
|
||||
if (guardErr) {
|
||||
return { content: [{ type: "text", text: `Error: ${guardErr}` }], isError: true };
|
||||
}
|
||||
const result = await rpcCall("netcatty/exec", { sessionId, command });
|
||||
const result = await rpcCall("netcatty/exec", { ...scopeParams, sessionId, command });
|
||||
if (!result.ok) {
|
||||
return { content: [{ type: "text", text: `Error: ${result.error || "Command failed"}` }], isError: true };
|
||||
}
|
||||
@@ -239,7 +244,7 @@ server.tool(
|
||||
if (guardErr) {
|
||||
return { content: [{ type: "text", text: `Error: ${guardErr}` }], isError: true };
|
||||
}
|
||||
const result = await rpcCall("netcatty/terminalWrite", { sessionId, input });
|
||||
const result = await rpcCall("netcatty/terminalWrite", { ...scopeParams, sessionId, input });
|
||||
if (!result.ok) {
|
||||
return { content: [{ type: "text", text: `Error: ${result.error}` }], isError: true };
|
||||
}
|
||||
@@ -256,7 +261,7 @@ server.tool(
|
||||
path: z.string().describe("The absolute path of the remote directory to list."),
|
||||
},
|
||||
async ({ sessionId, path }) => {
|
||||
const result = await rpcCall("netcatty/sftpList", { sessionId, path });
|
||||
const result = await rpcCall("netcatty/sftpList", { ...scopeParams, sessionId, path });
|
||||
if (result.error) {
|
||||
return { content: [{ type: "text", text: `Error: ${result.error}` }], isError: true };
|
||||
}
|
||||
@@ -275,7 +280,7 @@ server.tool(
|
||||
},
|
||||
async ({ sessionId, path, maxBytes }) => {
|
||||
const safeMaxBytes = Math.max(1, Math.min(10 * 1024 * 1024, Number(maxBytes) || 10000));
|
||||
const result = await rpcCall("netcatty/sftpRead", { sessionId, path, maxBytes: safeMaxBytes });
|
||||
const result = await rpcCall("netcatty/sftpRead", { ...scopeParams, sessionId, path, maxBytes: safeMaxBytes });
|
||||
if (result.error) {
|
||||
return { content: [{ type: "text", text: `Error: ${result.error}` }], isError: true };
|
||||
}
|
||||
@@ -297,7 +302,7 @@ server.tool(
|
||||
if (guardErr) {
|
||||
return { content: [{ type: "text", text: `Error: ${guardErr}` }], isError: true };
|
||||
}
|
||||
const result = await rpcCall("netcatty/sftpWrite", { sessionId, path, content });
|
||||
const result = await rpcCall("netcatty/sftpWrite", { ...scopeParams, sessionId, path, content });
|
||||
if (result.error) {
|
||||
return { content: [{ type: "text", text: `Error: ${result.error}` }], isError: true };
|
||||
}
|
||||
@@ -318,7 +323,7 @@ server.tool(
|
||||
if (guardErr) {
|
||||
return { content: [{ type: "text", text: `Error: ${guardErr}` }], isError: true };
|
||||
}
|
||||
const result = await rpcCall("netcatty/sftpMkdir", { sessionId, path });
|
||||
const result = await rpcCall("netcatty/sftpMkdir", { ...scopeParams, sessionId, path });
|
||||
if (result.error) {
|
||||
return { content: [{ type: "text", text: `Error: ${result.error}` }], isError: true };
|
||||
}
|
||||
@@ -339,7 +344,7 @@ server.tool(
|
||||
if (guardErr) {
|
||||
return { content: [{ type: "text", text: `Error: ${guardErr}` }], isError: true };
|
||||
}
|
||||
const result = await rpcCall("netcatty/sftpRemove", { sessionId, path });
|
||||
const result = await rpcCall("netcatty/sftpRemove", { ...scopeParams, sessionId, path });
|
||||
if (result.error) {
|
||||
return { content: [{ type: "text", text: `Error: ${result.error}` }], isError: true };
|
||||
}
|
||||
@@ -361,7 +366,7 @@ server.tool(
|
||||
if (guardErr) {
|
||||
return { content: [{ type: "text", text: `Error: ${guardErr}` }], isError: true };
|
||||
}
|
||||
const result = await rpcCall("netcatty/sftpRename", { sessionId, oldPath, newPath });
|
||||
const result = await rpcCall("netcatty/sftpRename", { ...scopeParams, sessionId, oldPath, newPath });
|
||||
if (result.error) {
|
||||
return { content: [{ type: "text", text: `Error: ${result.error}` }], isError: true };
|
||||
}
|
||||
@@ -378,7 +383,7 @@ server.tool(
|
||||
path: z.string().describe("The absolute path to stat."),
|
||||
},
|
||||
async ({ sessionId, path }) => {
|
||||
const result = await rpcCall("netcatty/sftpStat", { sessionId, path });
|
||||
const result = await rpcCall("netcatty/sftpStat", { ...scopeParams, sessionId, path });
|
||||
if (result.error) {
|
||||
return { content: [{ type: "text", text: `Error: ${result.error}` }], isError: true };
|
||||
}
|
||||
@@ -401,7 +406,7 @@ server.tool(
|
||||
if (guardErr) {
|
||||
return { content: [{ type: "text", text: `Error: ${guardErr}` }], isError: true };
|
||||
}
|
||||
const result = await rpcCall("netcatty/multiExec", { sessionIds, command, mode, stopOnError });
|
||||
const result = await rpcCall("netcatty/multiExec", { ...scopeParams, sessionIds, command, mode, stopOnError });
|
||||
if (result.error) {
|
||||
return { content: [{ type: "text", text: `Error: ${result.error}` }], isError: true };
|
||||
}
|
||||
|
||||
@@ -1065,11 +1065,11 @@ const api = {
|
||||
return ipcRenderer.invoke("netcatty:ai:mcp:set-permission-mode", { mode });
|
||||
},
|
||||
// ACP streaming
|
||||
aiAcpStream: async (requestId, chatSessionId, acpCommand, acpArgs, prompt, cwd, providerId, model, images) => {
|
||||
return ipcRenderer.invoke("netcatty:ai:acp:stream", { requestId, chatSessionId, acpCommand, acpArgs, prompt, cwd, providerId, model, images });
|
||||
aiAcpStream: async (requestId, chatSessionId, acpCommand, acpArgs, prompt, cwd, providerId, model, existingSessionId, historyMessages, images) => {
|
||||
return ipcRenderer.invoke("netcatty:ai:acp:stream", { requestId, chatSessionId, acpCommand, acpArgs, prompt, cwd, providerId, model, existingSessionId, historyMessages, images });
|
||||
},
|
||||
aiAcpCancel: async (requestId) => {
|
||||
return ipcRenderer.invoke("netcatty:ai:acp:cancel", { requestId });
|
||||
aiAcpCancel: async (requestId, chatSessionId) => {
|
||||
return ipcRenderer.invoke("netcatty:ai:acp:cancel", { requestId, chatSessionId });
|
||||
},
|
||||
aiAcpCleanup: async (chatSessionId) => {
|
||||
return ipcRenderer.invoke("netcatty:ai:acp:cleanup", { chatSessionId });
|
||||
|
||||
6
global.d.ts
vendored
6
global.d.ts
vendored
@@ -244,7 +244,7 @@ declare global {
|
||||
onSessionData(sessionId: string, cb: (data: string) => void): () => void;
|
||||
onSessionExit(
|
||||
sessionId: string,
|
||||
cb: (evt: { exitCode?: number; signal?: number }) => void
|
||||
cb: (evt: { exitCode?: number; signal?: number; error?: string; reason?: "exited" | "error" | "timeout" | "closed" }) => void
|
||||
): () => void;
|
||||
onAuthFailed?(
|
||||
sessionId: string,
|
||||
@@ -689,8 +689,8 @@ declare global {
|
||||
aiWriteToAgent?(agentId: string, data: string): Promise<{ ok: boolean; error?: string }>;
|
||||
aiCloseAgentStdin?(agentId: string): Promise<{ ok: boolean; error?: string }>;
|
||||
aiKillAgent?(agentId: string): Promise<{ ok: boolean; error?: string }>;
|
||||
aiAcpStream?(requestId: string, chatSessionId: string, acpCommand: string, acpArgs: string[], prompt: string, cwd?: string, providerId?: string): Promise<{ ok: boolean; error?: string }>;
|
||||
aiAcpCancel?(requestId: string): Promise<{ ok: boolean; error?: string }>;
|
||||
aiAcpStream?(requestId: string, chatSessionId: string, acpCommand: string, acpArgs: string[], prompt: string, cwd?: string, providerId?: string, model?: string, existingSessionId?: string, historyMessages?: Array<{ role: 'user' | 'assistant'; content: string }>, images?: Array<{ base64Data: string; mediaType: string; filename?: string }>): Promise<{ ok: boolean; error?: string }>;
|
||||
aiAcpCancel?(requestId: string, chatSessionId?: string): Promise<{ ok: boolean; error?: string }>;
|
||||
aiAcpCleanup?(chatSessionId: string): Promise<{ ok: boolean }>;
|
||||
onAiAcpEvent?(requestId: string, cb: (event: Record<string, unknown>) => void): () => void;
|
||||
onAiAcpDone?(requestId: string, cb: () => void): () => void;
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
import type { ExternalAgentConfig } from './types';
|
||||
|
||||
export interface AcpAgentCallbacks {
|
||||
onSessionId?: (sessionId: string) => void;
|
||||
onTextDelta: (text: string) => void;
|
||||
onThinkingDelta: (text: string) => void;
|
||||
onThinkingDone: () => void;
|
||||
@@ -29,9 +30,11 @@ interface AcpBridge {
|
||||
cwd?: string,
|
||||
providerId?: string,
|
||||
model?: string,
|
||||
existingSessionId?: string,
|
||||
historyMessages?: Array<{ role: 'user' | 'assistant'; content: string }>,
|
||||
images?: ImageAttachment[],
|
||||
): Promise<{ ok: boolean; error?: string }>;
|
||||
aiAcpCancel(requestId: string): Promise<{ ok: boolean }>;
|
||||
aiAcpCancel(requestId: string, chatSessionId?: string): Promise<{ ok: boolean }>;
|
||||
onAiAcpEvent(requestId: string, cb: (event: StreamEvent) => void): () => void;
|
||||
onAiAcpDone(requestId: string, cb: () => void): () => void;
|
||||
onAiAcpError(requestId: string, cb: (error: string) => void): () => void;
|
||||
@@ -63,6 +66,8 @@ export async function runAcpAgentTurn(
|
||||
signal?: AbortSignal,
|
||||
providerId?: string,
|
||||
model?: string,
|
||||
existingSessionId?: string,
|
||||
historyMessages?: Array<{ role: 'user' | 'assistant'; content: string }>,
|
||||
images?: ImageAttachment[],
|
||||
): Promise<void> {
|
||||
const acpBridge = bridge as unknown as AcpBridge;
|
||||
@@ -101,7 +106,7 @@ export async function runAcpAgentTurn(
|
||||
return;
|
||||
}
|
||||
const onAbort = () => {
|
||||
acpBridge.aiAcpCancel(requestId).catch(() => {});
|
||||
acpBridge.aiAcpCancel(requestId, chatSessionId).catch(() => {});
|
||||
};
|
||||
signal.addEventListener('abort', onAbort, { once: true });
|
||||
cleanupFns.push(() => signal.removeEventListener('abort', onAbort));
|
||||
@@ -117,6 +122,8 @@ export async function runAcpAgentTurn(
|
||||
undefined, // cwd
|
||||
providerId,
|
||||
model,
|
||||
existingSessionId,
|
||||
historyMessages,
|
||||
images?.length ? images : undefined,
|
||||
).catch((err: Error) => {
|
||||
callbacks.onError(err.message);
|
||||
@@ -177,6 +184,11 @@ function handleStreamEvent(event: StreamEvent, callbacks: AcpAgentCallbacks) {
|
||||
if (msg) callbacks.onStatus?.(msg);
|
||||
break;
|
||||
}
|
||||
case 'session-id': {
|
||||
const sessionId = (event.sessionId as string) || '';
|
||||
if (sessionId) callbacks.onSessionId?.(sessionId);
|
||||
break;
|
||||
}
|
||||
case 'error': {
|
||||
callbacks.onError(String(event.error || 'Unknown error'));
|
||||
break;
|
||||
|
||||
@@ -86,6 +86,20 @@ function extractHeaders(headers?: HeadersInit): Record<string, string> {
|
||||
/** Placeholder API key used by the renderer; main process replaces it with the real key. */
|
||||
export const API_KEY_PLACEHOLDER = '__IPC_SECURED__';
|
||||
|
||||
function toSafeStatusText(message: string, fallback: string): string {
|
||||
const normalized = message
|
||||
.replace(/[\r\n\t]+/g, ' ')
|
||||
.replace(/\s+/g, ' ')
|
||||
.trim();
|
||||
if (!normalized) return fallback;
|
||||
const byteStringSafe = Array.from(normalized, (char) => {
|
||||
const code = char.charCodeAt(0);
|
||||
if (code < 0x20 || code === 0x7f || code > 0xff) return '?';
|
||||
return char;
|
||||
}).join('');
|
||||
return byteStringSafe.slice(0, 120) || fallback;
|
||||
}
|
||||
|
||||
export function createBridgeFetchForSDK(providerId?: string): typeof globalThis.fetch {
|
||||
return async (
|
||||
input: string | URL | Request,
|
||||
@@ -182,7 +196,7 @@ export function createBridgeFetchForSDK(providerId?: string): typeof globalThis.
|
||||
const jsonBody = JSON.stringify({ error: { message: errorMessage } });
|
||||
return new Response(jsonBody, {
|
||||
status: 502,
|
||||
statusText: 'Bad Gateway',
|
||||
statusText: toSafeStatusText(errorMessage, 'Bad Gateway'),
|
||||
headers: { 'content-type': 'application/json' },
|
||||
});
|
||||
}
|
||||
@@ -198,7 +212,7 @@ export function createBridgeFetchForSDK(providerId?: string): typeof globalThis.
|
||||
const jsonBody = JSON.stringify({ error: { message: errorDetail } });
|
||||
return new Response(jsonBody, {
|
||||
status: statusCode,
|
||||
statusText: `Error ${statusCode}`,
|
||||
statusText: toSafeStatusText(errorDetail, `Error ${statusCode}`),
|
||||
headers: { 'content-type': 'application/json' },
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { tool } from 'ai';
|
||||
import { z } from 'zod';
|
||||
import type { NetcattyBridge, ExecutorContext } from '../cattyAgent/executor';
|
||||
import type { NetcattyBridge } from '../cattyAgent/executor';
|
||||
import type { AIPermissionMode } from '../types';
|
||||
import type { WebSearchConfig } from '../types';
|
||||
import { isWebSearchReady } from '../types';
|
||||
@@ -30,7 +30,7 @@ function unwrap<T>(r: ToolExecResult<T>): T | { error: string } {
|
||||
*/
|
||||
export function createCattyTools(
|
||||
bridge: NetcattyBridge,
|
||||
context: ExecutorContext,
|
||||
context: ToolDeps['context'],
|
||||
commandBlocklist?: string[],
|
||||
permissionMode: AIPermissionMode = 'confirm',
|
||||
webSearchConfig?: WebSearchConfig,
|
||||
|
||||
@@ -27,7 +27,7 @@ export type ToolExecResult<T = unknown> =
|
||||
|
||||
export interface ToolDeps {
|
||||
bridge: NetcattyBridge;
|
||||
context: ExecutorContext;
|
||||
context: ExecutorContext | (() => ExecutorContext);
|
||||
commandBlocklist?: string[];
|
||||
permissionMode: AIPermissionMode;
|
||||
webSearchConfig?: WebSearchConfig;
|
||||
@@ -37,11 +37,16 @@ export interface ToolDeps {
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function validSessionIds(ctx: ExecutorContext): Set<string> {
|
||||
return new Set(ctx.sessions.map(s => s.sessionId));
|
||||
function resolveContext(ctx: ToolDeps['context']): ExecutorContext {
|
||||
return typeof ctx === 'function' ? ctx() : ctx;
|
||||
}
|
||||
|
||||
function validateSessionScope(ctx: ExecutorContext, sessionId: string): string | null {
|
||||
function validSessionIds(ctx: ToolDeps['context']): Set<string> {
|
||||
const resolved = resolveContext(ctx);
|
||||
return new Set(resolved.sessions.map(s => s.sessionId));
|
||||
}
|
||||
|
||||
function validateSessionScope(ctx: ToolDeps['context'], sessionId: string): string | null {
|
||||
const ids = validSessionIds(ctx);
|
||||
if (!ids.has(sessionId)) {
|
||||
return `Session "${sessionId}" is not in the current scope. Available sessions: ${[...ids].join(', ')}`;
|
||||
@@ -110,7 +115,7 @@ export function executeWorkspaceGetInfo(
|
||||
connected: boolean;
|
||||
}>;
|
||||
}> {
|
||||
const { context } = deps;
|
||||
const context = resolveContext(deps.context);
|
||||
return {
|
||||
ok: true,
|
||||
data: {
|
||||
@@ -132,7 +137,7 @@ export function executeWorkspaceGetSessionInfo(
|
||||
deps: ToolDeps,
|
||||
args: { sessionId: string },
|
||||
): ToolExecResult<ExecutorContext['sessions'][number]> {
|
||||
const { context } = deps;
|
||||
const context = resolveContext(deps.context);
|
||||
const session = context.sessions.find(s => s.sessionId === args.sessionId);
|
||||
if (!session) {
|
||||
return { ok: false, error: `Session not found: ${args.sessionId}` };
|
||||
|
||||
@@ -48,7 +48,7 @@ export interface ChatMessage {
|
||||
};
|
||||
/** Transient status text shown with shimmer effect (e.g. "Waiting for response...") */
|
||||
statusText?: string;
|
||||
executionStatus?: 'pending' | 'approved' | 'rejected' | 'running' | 'completed' | 'failed';
|
||||
executionStatus?: 'pending' | 'approved' | 'rejected' | 'running' | 'completed' | 'failed' | 'cancelled';
|
||||
pendingApproval?: {
|
||||
approvalId: string;
|
||||
toolCallId: string;
|
||||
@@ -100,6 +100,7 @@ export interface AISession {
|
||||
agentId: string;
|
||||
scope: AISessionScope;
|
||||
messages: ChatMessage[];
|
||||
externalSessionId?: string;
|
||||
createdAt: number;
|
||||
updatedAt: number;
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ import {
|
||||
decryptProviderSecrets,
|
||||
encryptProviderSecrets,
|
||||
} from '../persistence/secureFieldAdapter';
|
||||
import { mergeSyncPayloads } from '../../domain/syncMerge';
|
||||
|
||||
const SYNC_HISTORY_STORAGE_KEY = 'netcatty_sync_history_v1';
|
||||
|
||||
@@ -256,6 +257,15 @@ export class CloudSyncManager {
|
||||
}
|
||||
}
|
||||
|
||||
private removeFromStorage(key: string): void {
|
||||
try {
|
||||
// eslint-disable-next-line no-restricted-globals
|
||||
localStorage.removeItem(key);
|
||||
} catch {
|
||||
// ignore storage removal failures
|
||||
}
|
||||
}
|
||||
|
||||
// ==========================================================================
|
||||
// Cross-window sync (Electron settings window, etc.)
|
||||
// ==========================================================================
|
||||
@@ -757,6 +767,8 @@ export class CloudSyncManager {
|
||||
}
|
||||
|
||||
await this.saveProviderConnection('github', this.state.providers.github);
|
||||
// Clear merge base when (re)authenticating to a potentially different account
|
||||
this.removeFromStorage(this.syncBaseKey('github'));
|
||||
this.emit({
|
||||
type: 'AUTH_COMPLETED',
|
||||
provider: 'github',
|
||||
@@ -810,6 +822,8 @@ export class CloudSyncManager {
|
||||
}
|
||||
|
||||
await this.saveProviderConnection(provider, this.state.providers[provider]);
|
||||
// Clear merge base when (re)authenticating to a potentially different account
|
||||
this.removeFromStorage(this.syncBaseKey(provider));
|
||||
this.emit({
|
||||
type: 'AUTH_COMPLETED',
|
||||
provider,
|
||||
@@ -846,6 +860,8 @@ export class CloudSyncManager {
|
||||
};
|
||||
|
||||
await this.saveProviderConnection(provider, this.state.providers[provider]);
|
||||
// Clear merge base when (re)configuring to a different endpoint/bucket
|
||||
this.removeFromStorage(this.syncBaseKey(provider));
|
||||
this.emit({
|
||||
type: 'AUTH_COMPLETED',
|
||||
provider,
|
||||
@@ -874,6 +890,9 @@ export class CloudSyncManager {
|
||||
};
|
||||
|
||||
await this.saveProviderConnection(provider, this.state.providers[provider]);
|
||||
// Clear the merge base for this provider so reconnecting to a different
|
||||
// account/resource doesn't reuse an unrelated snapshot
|
||||
this.removeFromStorage(this.syncBaseKey(provider));
|
||||
this.notifyStateChange(); // Ensure UI updates immediately after disconnect
|
||||
}
|
||||
|
||||
@@ -1081,30 +1100,81 @@ export class CloudSyncManager {
|
||||
}
|
||||
|
||||
if (checkResult.conflict && checkResult.remoteFile) {
|
||||
const remoteFile = checkResult.remoteFile;
|
||||
// Remote is newer - conflict
|
||||
this.state.syncState = 'CONFLICT';
|
||||
this.state.currentConflict = {
|
||||
provider,
|
||||
localVersion: this.state.localVersion,
|
||||
localUpdatedAt: this.state.localUpdatedAt,
|
||||
localDeviceName: this.state.deviceName,
|
||||
remoteVersion: remoteFile.meta.version,
|
||||
remoteUpdatedAt: remoteFile.meta.updatedAt,
|
||||
remoteDeviceName: remoteFile.meta.deviceName,
|
||||
};
|
||||
// Remote is newer — attempt three-way merge instead of blocking
|
||||
try {
|
||||
const remotePayload = await EncryptionService.decryptPayload(
|
||||
checkResult.remoteFile,
|
||||
this.masterPassword,
|
||||
);
|
||||
const base = await this.loadSyncBase(provider);
|
||||
const mergeResult = mergeSyncPayloads(base, payload, remotePayload);
|
||||
|
||||
this.emit({
|
||||
type: 'CONFLICT_DETECTED',
|
||||
conflict: this.state.currentConflict,
|
||||
});
|
||||
console.log('[CloudSyncManager] Three-way merge completed', mergeResult.summary);
|
||||
|
||||
return {
|
||||
success: false,
|
||||
provider,
|
||||
action: 'none',
|
||||
conflictDetected: true,
|
||||
};
|
||||
// Encrypt and upload merged payload
|
||||
const mergedSyncedFile = await EncryptionService.encryptPayload(
|
||||
mergeResult.payload,
|
||||
this.masterPassword,
|
||||
this.state.deviceId,
|
||||
this.state.deviceName,
|
||||
packageJson.version,
|
||||
checkResult.remoteFile.meta.version, // base on remote version
|
||||
);
|
||||
|
||||
const uploadResult = await this.uploadToProvider(provider, adapter, mergedSyncedFile);
|
||||
|
||||
if (uploadResult.success) {
|
||||
await this.saveSyncBase(mergeResult.payload, provider);
|
||||
this.state.syncState = 'IDLE';
|
||||
|
||||
this.addSyncHistoryEntry({
|
||||
timestamp: Date.now(),
|
||||
provider,
|
||||
action: 'merge',
|
||||
success: true,
|
||||
localVersion: mergedSyncedFile.meta.version,
|
||||
remoteVersion: checkResult.remoteFile.meta.version,
|
||||
deviceName: this.state.deviceName,
|
||||
});
|
||||
|
||||
return {
|
||||
...uploadResult,
|
||||
action: 'merge',
|
||||
mergedPayload: mergeResult.payload,
|
||||
};
|
||||
}
|
||||
|
||||
// Upload after merge failed — set ERROR so sync isn't stuck in SYNCING
|
||||
this.state.syncState = 'ERROR';
|
||||
this.state.lastError = uploadResult.error || 'Upload failed after merge';
|
||||
return uploadResult;
|
||||
} catch (mergeError) {
|
||||
// Merge failed — fall back to conflict UI
|
||||
console.error('[CloudSyncManager] Merge failed, falling back to conflict UI', mergeError);
|
||||
const remoteFile = checkResult.remoteFile;
|
||||
this.state.syncState = 'CONFLICT';
|
||||
this.state.currentConflict = {
|
||||
provider,
|
||||
localVersion: this.state.localVersion,
|
||||
localUpdatedAt: this.state.localUpdatedAt,
|
||||
localDeviceName: this.state.deviceName,
|
||||
remoteVersion: remoteFile.meta.version,
|
||||
remoteUpdatedAt: remoteFile.meta.updatedAt,
|
||||
remoteDeviceName: remoteFile.meta.deviceName,
|
||||
};
|
||||
|
||||
this.emit({
|
||||
type: 'CONFLICT_DETECTED',
|
||||
conflict: this.state.currentConflict,
|
||||
});
|
||||
|
||||
return {
|
||||
success: false,
|
||||
provider,
|
||||
action: 'none',
|
||||
conflictDetected: true,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Encrypt
|
||||
@@ -1121,6 +1191,7 @@ export class CloudSyncManager {
|
||||
const result = await this.uploadToProvider(provider, adapter, syncedFile);
|
||||
|
||||
if (result.success) {
|
||||
await this.saveSyncBase(payload, provider);
|
||||
this.state.syncState = 'IDLE';
|
||||
} else {
|
||||
this.state.syncState = 'ERROR';
|
||||
@@ -1182,6 +1253,7 @@ export class CloudSyncManager {
|
||||
this.state.remoteVersion = remoteFile.meta.version;
|
||||
this.state.remoteUpdatedAt = remoteFile.meta.updatedAt;
|
||||
this.saveSyncConfig();
|
||||
await this.saveSyncBase(payload, provider);
|
||||
this.notifyStateChange(); // Notify UI of state change
|
||||
|
||||
// Add to sync history
|
||||
@@ -1240,8 +1312,10 @@ export class CloudSyncManager {
|
||||
/**
|
||||
* Sync to all connected providers
|
||||
*/
|
||||
async syncAllProviders(payload?: SyncPayload): Promise<Map<CloudProvider, SyncResult>> {
|
||||
async syncAllProviders(inputPayload?: SyncPayload): Promise<Map<CloudProvider, SyncResult>> {
|
||||
const results = new Map<CloudProvider, SyncResult>();
|
||||
let payload = inputPayload;
|
||||
let wasMerged = false;
|
||||
|
||||
if (!payload) {
|
||||
// Caller should provide payload from app state
|
||||
@@ -1293,58 +1367,85 @@ export class CloudSyncManager {
|
||||
|
||||
const checkResults = await Promise.all(checkTasks);
|
||||
|
||||
// 2. Analyze Results & Handle Conflicts
|
||||
const conflict = checkResults.find((r) => !r.error && r.check?.conflict);
|
||||
// 2. Analyze Results & Handle Conflicts — merge ALL conflicting providers
|
||||
const conflicts = checkResults.filter((r) => !r.error && r.check?.conflict && r.check?.remoteFile);
|
||||
|
||||
if (conflict && conflict.check?.remoteFile) {
|
||||
const { provider, check } = conflict;
|
||||
const remoteFile = check.remoteFile!;
|
||||
|
||||
this.state.syncState = 'CONFLICT';
|
||||
this.state.currentConflict = {
|
||||
provider: provider as CloudProvider,
|
||||
localVersion: this.state.localVersion,
|
||||
localUpdatedAt: this.state.localUpdatedAt,
|
||||
localDeviceName: this.state.deviceName,
|
||||
remoteVersion: remoteFile.meta.version,
|
||||
remoteUpdatedAt: remoteFile.meta.updatedAt,
|
||||
remoteDeviceName: remoteFile.meta.deviceName,
|
||||
};
|
||||
|
||||
this.emit({
|
||||
type: 'CONFLICT_DETECTED',
|
||||
conflict: this.state.currentConflict,
|
||||
});
|
||||
|
||||
// Populate results
|
||||
for (const r of checkResults) {
|
||||
if (r.error) {
|
||||
results.set(r.provider as CloudProvider, {
|
||||
success: false,
|
||||
provider: r.provider as CloudProvider,
|
||||
action: 'none',
|
||||
error: r.error,
|
||||
});
|
||||
this.updateProviderStatus(r.provider as CloudProvider, 'error', r.error);
|
||||
this.emit({ type: 'SYNC_ERROR', provider: r.provider as CloudProvider, error: r.error });
|
||||
} else if (r.provider === provider) {
|
||||
results.set(provider as CloudProvider, {
|
||||
success: false,
|
||||
provider: provider as CloudProvider,
|
||||
action: 'none',
|
||||
conflictDetected: true,
|
||||
});
|
||||
} else {
|
||||
// Others are reset to connected
|
||||
this.updateProviderStatus(r.provider as CloudProvider, 'connected');
|
||||
results.set(r.provider as CloudProvider, {
|
||||
success: true, // Should we mark as success if skipped?
|
||||
provider: r.provider as CloudProvider,
|
||||
action: 'none',
|
||||
});
|
||||
if (conflicts.length > 0) {
|
||||
// Three-way merge: incorporate remote data from every conflicting provider
|
||||
try {
|
||||
let merged = payload;
|
||||
for (const c of conflicts) {
|
||||
const providerBase = await this.loadSyncBase(c.provider as CloudProvider);
|
||||
const remotePayload = await EncryptionService.decryptPayload(
|
||||
c.check!.remoteFile!,
|
||||
this.masterPassword,
|
||||
);
|
||||
const result = mergeSyncPayloads(providerBase, merged, remotePayload);
|
||||
merged = result.payload;
|
||||
}
|
||||
const mergeResult = { payload: merged };
|
||||
|
||||
console.log('[CloudSyncManager] syncAll: three-way merge completed');
|
||||
|
||||
// Replace payload with merged payload for upload to all providers
|
||||
payload = mergeResult.payload;
|
||||
wasMerged = true;
|
||||
|
||||
// Re-classify: all providers (including the conflicting one) should now upload
|
||||
// Clear the conflict check result so all go through the upload path
|
||||
for (const r of checkResults) {
|
||||
if (r.check) r.check.conflict = false;
|
||||
}
|
||||
} catch (mergeError) {
|
||||
// Merge failed — fall back to conflict UI
|
||||
console.error('[CloudSyncManager] syncAll: merge failed', mergeError);
|
||||
const { provider, check } = conflicts[0];
|
||||
const remoteFile = check!.remoteFile!;
|
||||
|
||||
this.state.syncState = 'CONFLICT';
|
||||
this.state.currentConflict = {
|
||||
provider: provider as CloudProvider,
|
||||
localVersion: this.state.localVersion,
|
||||
localUpdatedAt: this.state.localUpdatedAt,
|
||||
localDeviceName: this.state.deviceName,
|
||||
remoteVersion: remoteFile.meta.version,
|
||||
remoteUpdatedAt: remoteFile.meta.updatedAt,
|
||||
remoteDeviceName: remoteFile.meta.deviceName,
|
||||
};
|
||||
|
||||
this.emit({
|
||||
type: 'CONFLICT_DETECTED',
|
||||
conflict: this.state.currentConflict,
|
||||
});
|
||||
|
||||
for (const r of checkResults) {
|
||||
if (r.error) {
|
||||
results.set(r.provider as CloudProvider, {
|
||||
success: false,
|
||||
provider: r.provider as CloudProvider,
|
||||
action: 'none',
|
||||
error: r.error,
|
||||
});
|
||||
this.updateProviderStatus(r.provider as CloudProvider, 'error', r.error);
|
||||
this.emit({ type: 'SYNC_ERROR', provider: r.provider as CloudProvider, error: r.error });
|
||||
} else if (r.provider === conflicts[0].provider) {
|
||||
results.set(r.provider as CloudProvider, {
|
||||
success: false,
|
||||
provider: r.provider as CloudProvider,
|
||||
action: 'none',
|
||||
conflictDetected: true,
|
||||
});
|
||||
} else {
|
||||
this.updateProviderStatus(r.provider as CloudProvider, 'connected');
|
||||
results.set(r.provider as CloudProvider, {
|
||||
success: true,
|
||||
provider: r.provider as CloudProvider,
|
||||
action: 'none',
|
||||
});
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
// 3. Encrypt Once
|
||||
@@ -1370,6 +1471,15 @@ export class CloudSyncManager {
|
||||
return results;
|
||||
}
|
||||
|
||||
// Use the highest version as base: either local or any remote that was merged
|
||||
let baseVersion = this.state.localVersion;
|
||||
if (wasMerged) {
|
||||
for (const c of conflicts) {
|
||||
const rv = c.check?.remoteFile?.meta?.version ?? 0;
|
||||
if (rv > baseVersion) baseVersion = rv;
|
||||
}
|
||||
}
|
||||
|
||||
let syncedFile: SyncedFile;
|
||||
try {
|
||||
syncedFile = await EncryptionService.encryptPayload(
|
||||
@@ -1378,7 +1488,7 @@ export class CloudSyncManager {
|
||||
this.state.deviceId,
|
||||
this.state.deviceName,
|
||||
packageJson.version,
|
||||
this.state.localVersion
|
||||
baseVersion
|
||||
);
|
||||
} catch (error) {
|
||||
const msg = String(error);
|
||||
@@ -1411,6 +1521,22 @@ export class CloudSyncManager {
|
||||
const hasSuccess = Array.from(results.values()).some((r) => r.success);
|
||||
if (hasSuccess) {
|
||||
this.state.syncState = 'IDLE';
|
||||
// Save base per provider that successfully uploaded
|
||||
if (payload) {
|
||||
for (const [p, r] of results) {
|
||||
if (r.success) await this.saveSyncBase(payload, p);
|
||||
}
|
||||
}
|
||||
|
||||
// If a merge happened, attach the merged payload to successful results
|
||||
// so callers can apply remote additions to local state
|
||||
if (wasMerged && payload) {
|
||||
for (const [p, r] of results) {
|
||||
if (r.success) {
|
||||
results.set(p, { ...r, action: 'merge', mergedPayload: payload });
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
this.state.syncState = 'ERROR';
|
||||
// lastError is set by uploadToProvider
|
||||
@@ -1494,6 +1620,60 @@ export class CloudSyncManager {
|
||||
});
|
||||
}
|
||||
|
||||
// ==========================================================================
|
||||
// Sync Base (three-way merge snapshot)
|
||||
// ==========================================================================
|
||||
|
||||
private syncBaseKey(provider?: CloudProvider): string {
|
||||
const suffix = provider ? `_${provider}` : '';
|
||||
return `${SYNC_STORAGE_KEYS.SYNC_BASE_PAYLOAD}${suffix}`;
|
||||
}
|
||||
|
||||
async saveSyncBase(payload: SyncPayload, provider?: CloudProvider): Promise<void> {
|
||||
const key = this.state.unlockedKey?.derivedKey;
|
||||
if (!key) return;
|
||||
try {
|
||||
const data = new TextEncoder().encode(JSON.stringify(payload));
|
||||
const iv = crypto.getRandomValues(new Uint8Array(12));
|
||||
const encrypted = await crypto.subtle.encrypt({ name: 'AES-GCM', iv }, key, data);
|
||||
const combined = new Uint8Array(iv.length + encrypted.byteLength);
|
||||
combined.set(iv);
|
||||
combined.set(new Uint8Array(encrypted), iv.length);
|
||||
// Encode in chunks to avoid stack overflow with large buffers
|
||||
let binary = '';
|
||||
const CHUNK = 8192;
|
||||
for (let i = 0; i < combined.length; i += CHUNK) {
|
||||
binary += String.fromCharCode(...combined.subarray(i, i + CHUNK));
|
||||
}
|
||||
this.saveToStorage(this.syncBaseKey(provider), btoa(binary));
|
||||
} catch {
|
||||
console.warn('[CloudSyncManager] Failed to save sync base');
|
||||
}
|
||||
}
|
||||
|
||||
async loadSyncBase(provider?: CloudProvider): Promise<SyncPayload | null> {
|
||||
const key = this.state.unlockedKey?.derivedKey;
|
||||
if (!key) return null;
|
||||
try {
|
||||
const encoded = this.loadFromStorage<string>(this.syncBaseKey(provider));
|
||||
if (!encoded || typeof encoded !== 'string') return null;
|
||||
const combined = Uint8Array.from(atob(encoded), (c) => c.charCodeAt(0));
|
||||
const iv = combined.slice(0, 12);
|
||||
const ciphertext = combined.slice(12);
|
||||
const decrypted = await crypto.subtle.decrypt({ name: 'AES-GCM', iv }, key, ciphertext);
|
||||
return JSON.parse(new TextDecoder().decode(decrypted));
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private clearSyncBase(): void {
|
||||
this.removeFromStorage(SYNC_STORAGE_KEYS.SYNC_BASE_PAYLOAD);
|
||||
for (const p of ['github', 'google', 'onedrive', 'webdav', 's3'] as const) {
|
||||
this.removeFromStorage(this.syncBaseKey(p));
|
||||
}
|
||||
}
|
||||
|
||||
private addSyncHistoryEntry(entry: Omit<SyncHistoryEntry, 'id'>): void {
|
||||
const newEntry: SyncHistoryEntry = {
|
||||
...entry,
|
||||
@@ -1521,6 +1701,7 @@ export class CloudSyncManager {
|
||||
this.state.syncHistory = [];
|
||||
this.saveSyncConfig();
|
||||
this.saveToStorage(SYNC_HISTORY_STORAGE_KEY, []);
|
||||
this.clearSyncBase();
|
||||
this.notifyStateChange();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user