Files
Netcatty/lib/uploadService.ts
2026-06-12 17:46:32 +08:00

1009 lines
36 KiB
TypeScript

/**
* Shared Upload Service
*
* Provides core upload logic for both SftpView and SftpModal components.
* Handles bundled folder uploads with aggregate progress tracking,
* cancellation support, and works for both local and remote (SFTP) uploads.
*/
import { extractDropEntries, DropEntry, getPathForFile } from "./sftpFileUtils";
import { logger } from "./logger";
import { uploadFoldersCompressed } from "./uploadCompressed";
import {
canReplaceSftpConflict,
describeSftpExistingKind,
describeSftpIncomingKind,
getSftpConflictTypeKey,
} from "../domain/sftpConflict";
// ============================================================================
// Types
// ============================================================================
export type {
UploadBridge,
UploadCallbacks,
UploadConfig,
UploadProgress,
UploadResult,
UploadTaskInfo,
} from "./uploadService.types";
import type { UploadBridge, UploadCallbacks, UploadConfig, UploadResult } from "./uploadService.types";
// ============================================================================
// Helper Functions
// ============================================================================
const formatUploadError = (error: unknown): string =>
error instanceof Error ? error.message : String(error);
const getDropEntrySize = (entry: DropEntry): number => entry.file?.size ?? entry.size ?? 0;
const getDropEntryLocalPath = (entry: DropEntry): string | undefined =>
entry.localPath ?? (entry.file as (File & { path?: string }) | null | undefined)?.path;
const isUploadableFileEntry = (entry: DropEntry): boolean =>
!entry.isDirectory && (!!entry.file || !!getDropEntryLocalPath(entry));
export interface UploadScanningTask {
taskId: string;
complete: () => void;
fail: (error: unknown) => void;
cancel: () => void;
isOpen: () => boolean;
}
export function startUploadScanningTask(
callbacks?: UploadCallbacks,
taskId = crypto.randomUUID(),
): UploadScanningTask {
let open = true;
callbacks?.onScanningStart?.(taskId);
const close = (settle: () => void) => {
if (!open) return;
open = false;
settle();
};
return {
taskId,
complete: () => close(() => callbacks?.onScanningEnd?.(taskId)),
fail: (error) => close(() => callbacks?.onTaskFailed?.(taskId, formatUploadError(error))),
cancel: () => close(() => callbacks?.onTaskCancelled?.(taskId)),
isOpen: () => open,
};
}
/**
* Detect root folders from drop entries for bundled task creation
*/
export function detectRootFolders(entries: DropEntry[]): Map<string, DropEntry[]> {
const rootFolders = new Map<string, DropEntry[]>();
for (const entry of entries) {
const parts = entry.relativePath.split('/');
const rootName = parts[0];
// Group if there's more than one part (from a folder) or the entry is a directory
if (parts.length > 1 || entry.isDirectory) {
if (!rootFolders.has(rootName)) {
rootFolders.set(rootName, []);
}
rootFolders.get(rootName)!.push(entry);
} else {
// Standalone file - use its name as key with special prefix
const key = `__file__${entry.relativePath}`;
rootFolders.set(key, [entry]);
}
}
return rootFolders;
}
/**
* Sort entries: directories first, then by path depth
*/
export function sortEntries(entries: DropEntry[]): DropEntry[] {
return [...entries].sort((a, b) => {
if (a.isDirectory && !b.isDirectory) return -1;
if (!a.isDirectory && b.isDirectory) return 1;
const aDepth = a.relativePath.split('/').length;
const bDepth = b.relativePath.split('/').length;
return aDepth - bDepth;
});
}
// ============================================================================
// Upload Controller
// ============================================================================
/**
* Controller for managing upload operations with cancellation support
*/
export { UploadController } from "./uploadController";
import { UploadController } from "./uploadController";
// ============================================================================
// Core Upload Function
// ============================================================================
/**
* Upload files from a DataTransfer object with bundled folder support
*
* @param dataTransfer - The DataTransfer object from a drop event
* @param config - Upload configuration
* @param controller - Optional upload controller for cancellation
* @returns Array of upload results
*/
export async function uploadFromDataTransfer(
dataTransfer: DataTransfer,
config: UploadConfig,
controller?: UploadController
): Promise<UploadResult[]> {
const { targetPath, sftpId, isLocal, bridge, joinPath, callbacks, useCompressedUpload, resolveConflict } = config;
// Reset controller if provided
if (controller) {
controller.reset();
controller.setBridge(bridge);
}
// Create scanning placeholder
const scanT0 = performance.now();
const scanningTask = startUploadScanningTask(callbacks);
let entries: DropEntry[];
try {
entries = await extractDropEntries(dataTransfer);
} catch (error) {
scanningTask.complete();
throw error;
}
scanningTask.complete();
logger.debug(`[SFTP:perf] extractDropEntries — ${entries.length} entries — ${(performance.now() - scanT0).toFixed(0)}ms`);
if (entries.length === 0) {
return [];
}
// Check if this is a folder upload and compressed upload is enabled
if (useCompressedUpload && !resolveConflict && !isLocal && sftpId) {
const rootFolders = detectRootFolders(entries);
const folderEntries = Array.from(rootFolders.entries()).filter(([key]) => !key.startsWith("__file__"));
const standaloneFileEntries = Array.from(rootFolders.entries()).filter(([key]) => key.startsWith("__file__"));
if (folderEntries.length > 0) {
try {
const compressedResults = await uploadFoldersCompressed(folderEntries, targetPath, sftpId, callbacks, controller);
// Check if any folders failed due to lack of compression support
const failedFolders = compressedResults.filter(result =>
!result.success && result.error === "Compressed upload not supported - fallback needed"
);
const successfulFolders = compressedResults.filter(result =>
result.success || result.error !== "Compressed upload not supported - fallback needed"
);
let fallbackResults: UploadResult[] = [];
if (failedFolders.length > 0) {
// Get entries only for failed folders, not already successful ones
const failedFolderNames = new Set(failedFolders.map(f => f.fileName));
const failedFolderEntries = entries.filter(entry => {
const topFolder = entry.relativePath.split('/')[0];
return failedFolderNames.has(topFolder);
});
if (failedFolderEntries.length > 0) {
fallbackResults = await uploadEntries(failedFolderEntries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
}
// Upload standalone files using regular upload if any exist
let standaloneResults: UploadResult[] = [];
if (standaloneFileEntries.length > 0) {
const standaloneEntries = standaloneFileEntries.flatMap(([, entries]) => entries);
standaloneResults = await uploadEntries(standaloneEntries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
// Combine results: successful compressed + fallback results + standalone files
return [...successfulFolders, ...fallbackResults, ...standaloneResults];
} catch {
// Fall back to regular upload
return uploadEntries(entries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
}
}
return uploadEntries(entries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
/**
* Upload a FileList or File array with bundled folder support
*/
export async function uploadFromFileList(
fileList: FileList | File[],
config: UploadConfig,
controller?: UploadController
): Promise<UploadResult[]> {
const { targetPath, sftpId, isLocal, bridge, joinPath, callbacks, useCompressedUpload, resolveConflict } = config;
if (controller) {
controller.reset();
controller.setBridge(bridge);
}
// Convert FileList to DropEntry array
// Use webkitRelativePath for folder uploads, fallback to file.name for regular file uploads
const entries: DropEntry[] = Array.from(fileList).map(file => {
const localPath = getPathForFile(file);
// Use webkitRelativePath if available (folder upload), otherwise use file.name (regular file upload)
const relativePath = (file as File & { webkitRelativePath?: string }).webkitRelativePath || file.name;
if (localPath) {
// Set the path property on the file for stream transfer
(file as File & { path?: string }).path = localPath;
}
return {
file,
relativePath,
isDirectory: false,
};
});
if (entries.length === 0) {
return [];
}
// Check if this is a folder upload and compressed upload is enabled
if (useCompressedUpload && !resolveConflict && !isLocal && sftpId) {
const rootFolders = detectRootFolders(entries);
const folderEntries = Array.from(rootFolders.entries()).filter(([key]) => !key.startsWith("__file__"));
const standaloneFileEntries = Array.from(rootFolders.entries()).filter(([key]) => key.startsWith("__file__"));
if (folderEntries.length > 0) {
try {
const compressedResults = await uploadFoldersCompressed(folderEntries, targetPath, sftpId, callbacks, controller);
// Check if any folders failed due to lack of compression support
const failedFolders = compressedResults.filter(result =>
!result.success && result.error === "Compressed upload not supported - fallback needed"
);
const successfulFolders = compressedResults.filter(result =>
result.success || result.error !== "Compressed upload not supported - fallback needed"
);
let fallbackResults: UploadResult[] = [];
if (failedFolders.length > 0) {
// Get entries only for failed folders, not already successful ones
const failedFolderNames = new Set(failedFolders.map(f => f.fileName));
const failedFolderEntries = entries.filter(entry => {
const topFolder = entry.relativePath.split('/')[0];
return failedFolderNames.has(topFolder);
});
if (failedFolderEntries.length > 0) {
fallbackResults = await uploadEntries(failedFolderEntries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
}
// Upload standalone files using regular upload if any exist
let standaloneResults: UploadResult[] = [];
if (standaloneFileEntries.length > 0) {
const standaloneEntries = standaloneFileEntries.flatMap(([, entries]) => entries);
standaloneResults = await uploadEntries(standaloneEntries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
// Combine results: successful compressed + fallback results + standalone files
return [...successfulFolders, ...fallbackResults, ...standaloneResults];
} catch {
// Fall back to regular upload
return uploadEntries(entries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
}
}
return uploadEntries(entries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
/**
* Core upload logic for entries
*/
async function uploadEntries(
entries: DropEntry[],
targetPath: string,
sftpId: string | null,
isLocal: boolean,
bridge: UploadBridge,
joinPath: (base: string, name: string) => string,
callbacks?: UploadCallbacks,
controller?: UploadController,
resolveConflict?: UploadConfig["resolveConflict"]
): Promise<UploadResult[]> {
const results: UploadResult[] = [];
const createdDirs = new Set<string>();
const failedDirs = new Map<string, string>();
const reportedDirectoryFailures = new Set<string>();
let wasCancelled = false;
if (controller?.isCancelled()) {
return [{ fileName: "", success: false, cancelled: true }];
}
const statTarget = async (path: string) => {
try {
if (isLocal) return await bridge.statLocal?.(path);
if (sftpId) return await bridge.statSftp?.(sftpId, path);
} catch {
return null;
}
return null;
};
const deleteTarget = async (path: string) => {
if (isLocal) {
await bridge.deleteLocalFile?.(path);
} else if (sftpId) {
await bridge.deleteSftp?.(sftpId, path);
}
};
const splitNameForDuplicate = (name: string, isDirectory: boolean) => {
if (isDirectory) return { baseName: name, ext: "" };
const lastDot = name.lastIndexOf(".");
if (lastDot <= 0) return { baseName: name, ext: "" };
return { baseName: name.slice(0, lastDot), ext: name.slice(lastDot) };
};
const getDuplicateName = async (name: string, isDirectory: boolean) => {
const { baseName, ext } = splitNameForDuplicate(name, isDirectory);
for (let index = 1; index < 1000; index++) {
const suffix = index === 1 ? " (copy)" : ` (copy ${index})`;
const candidate = `${baseName}${suffix}${ext}`;
const candidatePath = joinPath(targetPath, candidate);
const existing = await statTarget(candidatePath);
if (!existing) return candidate;
}
return `${baseName} (copy ${Date.now()})${ext}`;
};
const renameRoot = (entry: DropEntry, oldName: string, newName: string): DropEntry => {
if (entry.relativePath === oldName) {
return { ...entry, relativePath: newName };
}
if (entry.relativePath.startsWith(`${oldName}/`)) {
return { ...entry, relativePath: `${newName}/${entry.relativePath.slice(oldName.length + 1)}` };
}
return entry;
};
const ensureDirectory = async (dirPath: string): Promise<string | null> => {
if (createdDirs.has(dirPath)) return null;
const previousFailure = failedDirs.get(dirPath);
if (previousFailure) return previousFailure;
try {
if (isLocal) {
if (bridge.mkdirLocal) {
await bridge.mkdirLocal(dirPath);
}
} else if (sftpId) {
await bridge.mkdirSftp(sftpId, dirPath);
}
createdDirs.add(dirPath);
return null;
} catch (error) {
const errorMessage = formatUploadError(error);
failedDirs.set(dirPath, errorMessage);
return errorMessage;
}
};
// Group entries by root folder
const rootFolders = detectRootFolders(entries);
let resolvedEntries = entries;
if (resolveConflict) {
const resolved: DropEntry[] = [];
let stop = false;
const groupInfos = await Promise.all(Array.from(rootFolders.entries()).map(async ([key, groupEntries]) => {
const isStandaloneFile = key.startsWith("__file__");
const rootName = isStandaloneFile ? key.slice("__file__".length) : key;
const isDirectory = !isStandaloneFile;
const rootTargetPath = joinPath(targetPath, rootName);
const existing = await statTarget(rootTargetPath);
return {
groupEntries,
rootName,
isDirectory,
rootTargetPath,
existing,
};
}));
const conflictCounts = new Map<string, number>();
for (const info of groupInfos) {
if (!info.existing) continue;
const conflictKey = getSftpConflictTypeKey(info.isDirectory, info.existing.type);
conflictCounts.set(conflictKey, (conflictCounts.get(conflictKey) ?? 0) + 1);
}
for (const { groupEntries, rootName, isDirectory, rootTargetPath, existing } of groupInfos) {
if (stop || controller?.isCancelled()) break;
if (!existing) {
resolved.push(...groupEntries);
continue;
}
const conflictKey = getSftpConflictTypeKey(isDirectory, existing.type);
const newSize = groupEntries.reduce((sum, entry) => sum + getDropEntrySize(entry), 0);
const action = await resolveConflict({
fileName: rootName,
targetPath: rootTargetPath,
isDirectory,
existingType: existing.type,
existingSize: existing.size,
newSize,
existingModified: existing.lastModified,
newModified: Date.now(),
applyToAllCount: conflictCounts.get(conflictKey) ?? 1,
});
if (action === "stop") {
stop = true;
await controller?.cancel();
resolved.length = 0;
results.push({ fileName: rootName, success: false, cancelled: true });
break;
}
if (action === "skip") {
results.push({ fileName: rootName, success: false, cancelled: true });
continue;
}
if (action === "replace") {
if (!canReplaceSftpConflict(isDirectory, existing.type)) {
results.push({
fileName: rootName,
success: false,
error: `Cannot replace existing ${describeSftpExistingKind(existing.type)} with ${describeSftpIncomingKind(isDirectory)}: ${rootTargetPath}`,
});
continue;
}
await deleteTarget(rootTargetPath);
resolved.push(...groupEntries);
continue;
}
if (action === "duplicate") {
const duplicateName = await getDuplicateName(rootName, isDirectory);
resolved.push(...groupEntries.map((entry) => renameRoot(entry, rootName, duplicateName)));
continue;
}
if (action === "merge" && !(isDirectory && existing.type === "directory")) {
results.push({
fileName: rootName,
success: false,
error: `Cannot merge existing ${describeSftpExistingKind(existing.type)} with ${describeSftpIncomingKind(isDirectory)}: ${rootTargetPath}`,
});
continue;
}
resolved.push(...groupEntries);
}
resolvedEntries = resolved;
}
if (resolvedEntries.length === 0) {
return results;
}
const resolvedRootFolders = detectRootFolders(resolvedEntries);
const sortedEntries = sortEntries(resolvedEntries);
const explicitDirectoryPaths = new Map<string, string>();
// Pre-create all needed directories in batch before file transfers
const uploadT0 = performance.now();
logger.debug(`[SFTP:perf] uploadEntries START — ${sortedEntries.length} entries, ${sortedEntries.filter(e => !e.isDirectory).length} files`);
const allDirPaths = new Set<string>();
for (const entry of sortedEntries) {
if (entry.isDirectory) {
const dirPath = joinPath(targetPath, entry.relativePath);
allDirPaths.add(dirPath);
explicitDirectoryPaths.set(dirPath, entry.relativePath);
} else {
const pathParts = entry.relativePath.split('/');
if (pathParts.length > 1) {
let parentPath = targetPath;
for (let i = 0; i < pathParts.length - 1; i++) {
parentPath = joinPath(parentPath, pathParts[i]);
allDirPaths.add(parentPath);
}
}
}
}
// Create directories in sorted order (parents before children) with limited concurrency
const sortedDirPaths = Array.from(allDirPaths).sort();
// Group by depth and create each depth level in parallel
const dirsByDepth = new Map<number, string[]>();
for (const dirPath of sortedDirPaths) {
const depth = dirPath.split('/').length;
const group = dirsByDepth.get(depth) || [];
group.push(dirPath);
dirsByDepth.set(depth, group);
}
const sortedDepths = Array.from(dirsByDepth.keys()).sort((a, b) => a - b);
for (const depth of sortedDepths) {
const dirs = dirsByDepth.get(depth)!;
const directoryResults = await Promise.all(dirs.map(async (dirPath) => ({
dirPath,
error: await ensureDirectory(dirPath),
})));
for (const { dirPath, error } of directoryResults) {
if (!error) continue;
const relativePath = explicitDirectoryPaths.get(dirPath);
if (!relativePath || reportedDirectoryFailures.has(relativePath)) continue;
reportedDirectoryFailures.add(relativePath);
results.push({ fileName: relativePath, success: false, error });
}
if (controller?.isCancelled()) {
wasCancelled = true;
break;
}
}
logger.debug(`[SFTP:perf] batch mkdir done — ${allDirPaths.size} dirs — ${(performance.now() - uploadT0).toFixed(0)}ms`);
// Track bundled task progress
const bundleProgress = new Map<string, {
totalBytes: number;
transferredBytes: number;
fileCount: number;
completedCount: number;
failedCount: number;
currentSpeed: number;
completedFilesBytes: number;
}>();
const pendingTaskIds = new Set<string>();
// Create bundled tasks for each root folder
const bundleTaskIds = new Map<string, string>(); // rootName -> bundleTaskId
for (const [rootName, rootEntries] of resolvedRootFolders) {
const isStandaloneFile = rootName.startsWith("__file__");
if (isStandaloneFile) continue;
// Calculate total bytes for this folder
let totalBytes = 0;
let fileCount = 0;
for (const entry of rootEntries) {
if (!entry.isDirectory && entry.file) {
totalBytes += entry.file.size;
fileCount++;
}
}
if (fileCount === 0) continue;
const bundleTaskId = crypto.randomUUID();
bundleTaskIds.set(rootName, bundleTaskId);
bundleProgress.set(bundleTaskId, {
totalBytes,
transferredBytes: 0,
fileCount,
completedCount: 0,
failedCount: 0,
currentSpeed: 0,
completedFilesBytes: 0,
});
// Notify task created
if (callbacks?.onTaskCreated) {
const displayName = rootName;
callbacks.onTaskCreated({
id: bundleTaskId,
fileName: rootName,
displayName,
isDirectory: true,
progressMode: 'files',
totalBytes: fileCount,
transferredBytes: 0,
speed: 0,
fileCount,
completedCount: 0,
});
pendingTaskIds.add(bundleTaskId);
}
}
// Helper to get bundle task ID for an entry
const getBundleTaskId = (entry: DropEntry): string | null => {
const parts = entry.relativePath.split('/');
const rootName = parts[0];
if (parts.length > 1 || entry.isDirectory) {
return bundleTaskIds.get(rootName) || null;
}
return null;
};
// Upload a single file entry — returns result and handles progress
const uploadSingleFile = async (
entry: DropEntry,
entryTargetPath: string,
standaloneTransferId: string,
fileTotalBytes: number,
): Promise<{ cancelled?: boolean; error?: string }> => {
const localFilePath = getDropEntryLocalPath(entry);
// Progress callback factory for both stream and memory paths
const makeOnProgress = () => {
let pendingProgressUpdate: { transferred: number; total: number; speed: number } | null = null;
let rafScheduled = false;
return (transferred: number, total: number, speed: number) => {
if (controller?.isCancelled()) return;
pendingProgressUpdate = { transferred, total, speed };
if (!rafScheduled) {
rafScheduled = true;
requestAnimationFrame(() => {
rafScheduled = false;
const update = pendingProgressUpdate;
pendingProgressUpdate = null;
if (!update || controller?.isCancelled() || !callbacks?.onTaskProgress) return;
if (standaloneTransferId) {
callbacks.onTaskProgress(standaloneTransferId, {
transferred: update.transferred,
total: update.total,
speed: update.speed,
percent: update.total > 0 ? (update.transferred / update.total) * 100 : 0,
});
}
});
}
};
};
if (localFilePath && bridge.startStreamTransfer && (!isLocal ? !!sftpId : true)) {
const onProgress = makeOnProgress();
const fileTransferId = crypto.randomUUID();
controller?.addActiveTransfer(fileTransferId);
let streamResult: { transferId: string; totalBytes?: number; error?: string; cancelled?: boolean } | undefined;
try {
streamResult = await bridge.startStreamTransfer(
{
transferId: fileTransferId,
sourcePath: localFilePath,
targetPath: entryTargetPath,
sourceType: 'local',
targetType: isLocal ? 'local' : 'sftp',
targetSftpId: isLocal ? undefined : sftpId,
totalBytes: fileTotalBytes,
},
onProgress,
undefined,
undefined
);
} finally {
controller?.removeActiveTransfer(fileTransferId);
}
if (streamResult?.cancelled || streamResult?.error?.includes('cancelled')) {
return { cancelled: true };
}
if (streamResult?.error) {
return { error: streamResult.error };
}
} else {
if (!entry.file) {
return { error: "No local file data available" };
}
const arrayBuffer = await entry.file.arrayBuffer();
if (isLocal) {
if (!bridge.writeLocalFile) throw new Error("writeLocalFile not available");
await bridge.writeLocalFile(entryTargetPath, arrayBuffer);
} else if (sftpId) {
if (bridge.writeSftpBinaryWithProgress) {
const onProgress = makeOnProgress();
const fileTransferId = crypto.randomUUID();
controller?.addActiveTransfer(fileTransferId);
let result;
try {
result = await bridge.writeSftpBinaryWithProgress(
sftpId,
entryTargetPath,
arrayBuffer,
fileTransferId,
onProgress,
() => {},
() => {}
);
} finally {
controller?.removeActiveTransfer(fileTransferId);
}
if (result?.cancelled) {
return { cancelled: true };
}
if (!result || result.success === false) {
if (bridge.writeSftpBinary) {
await bridge.writeSftpBinary(sftpId, entryTargetPath, arrayBuffer);
} else {
return { error: "Upload failed and no fallback method available" };
}
}
} else if (bridge.writeSftpBinary) {
await bridge.writeSftpBinary(sftpId, entryTargetPath, arrayBuffer);
} else {
return { error: "No SFTP write method available" };
}
}
}
return {};
};
// Filter to only file entries (directories are pre-created above)
const fileEntries = sortedEntries.filter(isUploadableFileEntry);
// Create standalone task entries upfront so they're visible immediately.
// Bundled child tasks are created lazily when upload actually starts, so
// large folder uploads don't flood React state before work begins.
const standaloneTaskIds = new Map<string, string>(); // relativePath -> taskId
for (const entry of fileEntries) {
const bundleTaskId = getBundleTaskId(entry);
if (!bundleTaskId) {
const taskId = crypto.randomUUID();
standaloneTaskIds.set(entry.relativePath, taskId);
if (callbacks?.onTaskCreated) {
callbacks.onTaskCreated({
id: taskId,
fileName: entry.relativePath,
displayName: entry.relativePath,
isDirectory: false,
progressMode: 'bytes',
totalBytes: getDropEntrySize(entry),
transferredBytes: 0,
speed: 0,
fileCount: 1,
completedCount: 0,
});
pendingTaskIds.add(taskId);
}
}
}
const createBundledChildTask = (entry: DropEntry, bundleTaskId: string): string => {
const taskId = crypto.randomUUID();
if (callbacks?.onTaskCreated) {
callbacks.onTaskCreated({
id: taskId,
fileName: entry.relativePath,
displayName: entry.relativePath,
isDirectory: false,
progressMode: 'bytes',
parentTaskId: bundleTaskId,
totalBytes: getDropEntrySize(entry),
transferredBytes: 0,
speed: 0,
fileCount: 1,
completedCount: 0,
});
pendingTaskIds.add(taskId);
}
return taskId;
};
const settleTask = (
taskId: string,
settle: (taskId: string) => void,
) => {
if (!taskId) return;
if (!pendingTaskIds.delete(taskId)) return;
settle(taskId);
};
const UPLOAD_CONCURRENCY = 4;
try {
let entryIndex = 0;
const worker = async () => {
while (entryIndex < fileEntries.length) {
if (controller?.isCancelled() || wasCancelled) break;
const idx = entryIndex++;
const entry = fileEntries[idx];
const entryTargetPath = joinPath(targetPath, entry.relativePath);
const bundleTaskId = getBundleTaskId(entry);
const bundledChildTaskId = bundleTaskId ? createBundledChildTask(entry, bundleTaskId) : "";
const standaloneTransferId = standaloneTaskIds.get(entry.relativePath) || "";
const fileTotalBytes = getDropEntrySize(entry);
try {
const uploadResult = await uploadSingleFile(
entry,
entryTargetPath,
bundledChildTaskId || standaloneTransferId,
fileTotalBytes,
);
if (uploadResult.cancelled) {
wasCancelled = true;
settleTask(bundledChildTaskId, (taskId) => callbacks?.onTaskCancelled?.(taskId));
settleTask(bundleTaskId ?? "", (taskId) => callbacks?.onTaskCancelled?.(taskId));
settleTask(!bundleTaskId ? standaloneTransferId : "", (taskId) => callbacks?.onTaskCancelled?.(taskId));
break;
}
if (uploadResult.error) {
throw new Error(uploadResult.error);
}
results.push({ fileName: entry.relativePath, success: true });
// Update progress tracking
if (bundleTaskId) {
const progress = bundleProgress.get(bundleTaskId);
if (bundledChildTaskId) {
settleTask(bundledChildTaskId, (taskId) => callbacks?.onTaskCompleted?.(taskId, fileTotalBytes));
}
if (progress) {
progress.completedCount++;
progress.completedFilesBytes += fileTotalBytes;
progress.transferredBytes = progress.completedCount;
if (progress.completedCount >= progress.fileCount) {
callbacks?.onTaskProgress?.(bundleTaskId, {
transferred: progress.fileCount,
total: progress.fileCount,
speed: 0,
percent: 100,
});
settleTask(bundleTaskId, (taskId) => callbacks?.onTaskCompleted?.(taskId, progress.fileCount));
} else {
callbacks?.onTaskProgress?.(bundleTaskId, {
transferred: progress.completedCount,
total: progress.fileCount,
speed: 0,
percent: progress.fileCount > 0 ? (progress.completedCount / progress.fileCount) * 100 : 0,
});
}
}
} else if (standaloneTransferId) {
settleTask(standaloneTransferId, (taskId) => callbacks?.onTaskCompleted?.(taskId, fileTotalBytes));
}
} catch (error) {
if (controller?.isCancelled()) {
wasCancelled = true;
settleTask(bundledChildTaskId, (taskId) => callbacks?.onTaskCancelled?.(taskId));
settleTask(bundleTaskId ?? "", (taskId) => callbacks?.onTaskCancelled?.(taskId));
settleTask(!bundleTaskId ? standaloneTransferId : "", (taskId) => callbacks?.onTaskCancelled?.(taskId));
break;
}
const errorMessage = formatUploadError(error);
results.push({ fileName: entry.relativePath, success: false, error: errorMessage });
if (bundleTaskId) {
const progress = bundleProgress.get(bundleTaskId);
if (progress) {
progress.failedCount++;
}
}
settleTask(bundledChildTaskId, (taskId) => callbacks?.onTaskFailed?.(taskId, errorMessage));
settleTask(!bundleTaskId ? standaloneTransferId : "", (taskId) => callbacks?.onTaskFailed?.(taskId, errorMessage));
}
}
};
const workers = Array.from(
{ length: Math.min(UPLOAD_CONCURRENCY, fileEntries.length || 1) },
() => worker(),
);
await Promise.all(workers);
if (!wasCancelled) {
for (const [bundleTaskId, progress] of bundleProgress) {
if (progress.failedCount > 0) {
settleTask(bundleTaskId, (taskId) => {
callbacks?.onTaskFailed?.(
taskId,
progress.failedCount === progress.fileCount
? `All ${progress.fileCount} files failed`
: `${progress.failedCount} of ${progress.fileCount} files failed`,
);
});
}
}
}
// Mark any remaining incomplete tasks as cancelled if upload was cancelled
if (wasCancelled) {
for (const pendingTaskId of Array.from(pendingTaskIds)) {
settleTask(pendingTaskId, (taskId) => callbacks?.onTaskCancelled?.(taskId));
}
}
} finally {
controller?.clearCurrentTransfer();
}
if (wasCancelled) {
results.push({ fileName: "", success: false, cancelled: true });
}
return results;
}
/**
* Upload entries directly (used when entries are already extracted)
*/
export async function uploadEntriesDirect(
entries: DropEntry[],
config: UploadConfig,
controller?: UploadController
): Promise<UploadResult[]> {
const { targetPath, sftpId, isLocal, bridge, joinPath, callbacks, useCompressedUpload, resolveConflict } = config;
if (controller?.isCancelled()) {
return [{ fileName: "", success: false, cancelled: true }];
}
if (controller) {
controller.reset();
controller.setBridge(bridge);
}
if (entries.length === 0) {
return [];
}
// Support compressed folder uploads (same logic as uploadFromDataTransfer)
if (useCompressedUpload && !resolveConflict && !isLocal && sftpId) {
const rootFolders = detectRootFolders(entries);
const folderEntries = Array.from(rootFolders.entries()).filter(([key]) => !key.startsWith("__file__"));
const standaloneFileEntries = Array.from(rootFolders.entries()).filter(([key]) => key.startsWith("__file__"));
if (folderEntries.length > 0) {
try {
const compressedResults = await uploadFoldersCompressed(folderEntries, targetPath, sftpId, callbacks, controller);
const failedFolders = compressedResults.filter(result =>
!result.success && result.error === "Compressed upload not supported - fallback needed"
);
const successfulFolders = compressedResults.filter(result =>
result.success || result.error !== "Compressed upload not supported - fallback needed"
);
let fallbackResults: UploadResult[] = [];
if (failedFolders.length > 0) {
const failedFolderNames = new Set(failedFolders.map(f => f.fileName));
const failedFolderEntries = entries.filter(entry => {
const topFolder = entry.relativePath.split('/')[0];
return failedFolderNames.has(topFolder);
});
if (failedFolderEntries.length > 0) {
fallbackResults = await uploadEntries(failedFolderEntries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
}
let standaloneResults: UploadResult[] = [];
if (standaloneFileEntries.length > 0) {
const standaloneEntries = standaloneFileEntries.flatMap(([, e]) => e);
standaloneResults = await uploadEntries(standaloneEntries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
return [...successfulFolders, ...fallbackResults, ...standaloneResults];
} catch {
return uploadEntries(entries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}
}
}
return uploadEntries(entries, targetPath, sftpId, isLocal, bridge, joinPath, callbacks, controller, resolveConflict);
}