[codex] Add serial YMODEM receive (#1438)

* Add serial YMODEM receive

* Address YMODEM receive review feedback
This commit is contained in:
陈大猫
2026-06-12 15:47:24 +08:00
committed by GitHub
parent 2b396c14e3
commit 550a37b379
16 changed files with 767 additions and 30 deletions

View File

@@ -29,7 +29,7 @@ const { createPtyOutputBuffer } = require("./ptyOutputBuffer.cjs");
const { enableTcpNoDelay } = require("./tcpNoDelay.cjs");
const { releaseConnectionRef } = require("./sshConnectionPool.cjs");
const { normalizeTerminalEncoding, encodeTerminalInput } = require("./terminalEncoding.cjs");
const { sendYmodemCancel, sendYmodemFile } = require("./ymodemTransfer.cjs");
const { receiveYmodemFiles, sendYmodemCancel, sendYmodemFile } = require("./ymodemTransfer.cjs");
const execFileAsync = promisify(execFile);
@@ -784,6 +784,47 @@ async function sendSerialYmodem(_event, payload) {
}
}
async function receiveSerialYmodem(_event, payload) {
const session = sessions.get(payload?.sessionId);
if (!session || !session.serialPort || session.type !== 'serial') {
return { success: false, error: "YMODEM receive requires an active serial session" };
}
if (session.ymodemActive) {
return { success: false, error: "A YMODEM transfer is already in progress" };
}
if (session.zmodemSentry?.isActive()) {
return { success: false, error: "Another serial file transfer is already in progress" };
}
if (!payload?.destinationDir || typeof payload.destinationDir !== "string") {
return { success: false, error: "No destination directory selected" };
}
const abortController = new AbortController();
session.ymodemActive = true;
session.ymodemAbortController = abortController;
try {
const result = await receiveYmodemFiles(session.serialPort, {
destinationDir: payload.destinationDir,
abortSignal: abortController.signal,
timeoutMs: Number.isFinite(payload.timeoutMs) ? payload.timeoutMs : undefined,
});
return { success: true, ...result };
} catch (error) {
if (error?.code !== "YMODEM_CANCELLED" && error?.code !== "YMODEM_REMOTE_CANCELLED") {
await sendYmodemCancel(session.serialPort);
}
return {
success: false,
error: error instanceof Error ? error.message : String(error),
code: error?.code,
};
} finally {
session.ymodemActive = false;
session.ymodemAbortController = null;
}
}
/**
* Pause or resume a session's source stream for output back-pressure.
* The renderer asks for this when its write backlog crosses a watermark, so a
@@ -950,6 +991,7 @@ function registerHandlers(ipcMain) {
ipcMain.handle("netcatty:serial:start", startSerialSession);
ipcMain.handle("netcatty:serial:list", listSerialPorts);
ipcMain.handle("netcatty:serial:ymodem-send", sendSerialYmodem);
ipcMain.handle("netcatty:serial:ymodem-receive", receiveSerialYmodem);
ipcMain.handle("netcatty:local:defaultShell", getDefaultShell);
ipcMain.handle("netcatty:local:validatePath", validatePath);
ipcMain.handle("netcatty:shells:discover", () => discoverShells());
@@ -1116,6 +1158,7 @@ module.exports = {
bundledEtClient,
startSerialSession,
sendSerialYmodem,
receiveSerialYmodem,
listSerialPorts,
writeToSession,
setSessionEncoding,

View File

@@ -1,5 +1,8 @@
const test = require("node:test");
const assert = require("node:assert/strict");
const fs = require("node:fs");
const os = require("node:os");
const path = require("node:path");
const terminalBridge = require("./terminalBridge.cjs");
const { YMODEM } = require("./ymodemTransfer.cjs");
@@ -75,3 +78,67 @@ test("YMODEM send is refused while ZMODEM owns the same serial session", async (
assert.equal(result.success, false);
assert.match(result.error, /file transfer is already in progress/);
});
test("YMODEM receive is refused while ZMODEM owns the same serial session", async () => {
const sessions = new Map();
sessions.set("serial-1", {
type: "serial",
protocol: "serial",
serialPort: makeSerialPort(),
zmodemSentry: {
isActive() {
return true;
},
},
});
terminalBridge.init({ sessions, electronModule: {} });
const result = await terminalBridge.receiveSerialYmodem({ sender: {} }, {
sessionId: "serial-1",
destinationDir: "/tmp",
});
assert.equal(result.success, false);
assert.match(result.error, /file transfer is already in progress/);
});
test("YMODEM receive sends cancel bytes when the receive setup fails", async () => {
const targetDir = fs.mkdtempSync(path.join(os.tmpdir(), "netcatty-ymodem-bridge-"));
try {
const destinationFile = path.join(targetDir, "not-a-directory");
fs.writeFileSync(destinationFile, "not a directory");
const sessions = new Map();
const serialPort = makeSerialPort();
sessions.set("serial-1", {
type: "serial",
protocol: "serial",
serialPort,
});
terminalBridge.init({ sessions, electronModule: {} });
const result = await terminalBridge.receiveSerialYmodem({ sender: {} }, {
sessionId: "serial-1",
destinationDir: destinationFile,
});
assert.equal(result.success, false);
assert.deepEqual(
[...serialPort.writes[0]],
[
YMODEM.CAN,
YMODEM.CAN,
YMODEM.CAN,
YMODEM.CAN,
YMODEM.CAN,
YMODEM.BACKSPACE,
YMODEM.BACKSPACE,
YMODEM.BACKSPACE,
YMODEM.BACKSPACE,
YMODEM.BACKSPACE,
],
);
} finally {
fs.rmSync(targetDir, { recursive: true, force: true });
}
});

View File

@@ -93,6 +93,52 @@ function createYmodemEndSessionPacket() {
return createPacket(YMODEM.SOH, 0, Buffer.alloc(YMODEM.PACKET_SIZE_128, 0x00));
}
function sanitizeYmodemFilename(filename) {
const normalized = String(filename || "").replace(/\\/g, "/");
const baseName = path.basename(normalized).replace(/[<>:"/\\|?*\x00-\x1f]/g, "_").trim();
if (!baseName || baseName === "." || baseName === "..") {
return "ymodem-received.bin";
}
return baseName;
}
function parseYmodemFileInfoPayload(payload) {
const separatorIndex = payload.indexOf(0x00);
const rawName = (separatorIndex >= 0 ? payload.subarray(0, separatorIndex) : payload).toString("utf8");
if (!rawName) return null;
const metadata = separatorIndex >= 0
? payload.subarray(separatorIndex + 1).toString("ascii").replace(/\0.*$/u, "").trim()
: "";
const [sizeText] = metadata.split(/\s+/u);
if (!/^\d+$/u.test(sizeText || "")) {
throw new YmodemTransferError("YMODEM file header has an invalid size", "YMODEM_INVALID_SIZE");
}
const parsedSize = Number.parseInt(sizeText, 10);
return {
fileName: sanitizeYmodemFilename(rawName),
totalBytes: Number.isFinite(parsedSize) && parsedSize >= 0 ? parsedSize : 0,
};
}
async function resolveUniqueDestinationPath(destinationDir, fileName) {
const parsed = path.parse(fileName);
for (let attempt = 0; attempt < 10_000; attempt += 1) {
const candidateName = attempt === 0
? fileName
: `${parsed.name} (${attempt})${parsed.ext}`;
const candidatePath = path.join(destinationDir, candidateName);
try {
await fs.promises.access(candidatePath);
} catch (error) {
if (error?.code === "ENOENT") return candidatePath;
throw error;
}
}
throw new YmodemTransferError("Could not choose a destination file name", "YMODEM_DESTINATION_EXISTS");
}
function writeAndDrain(serialPort, buffer) {
return new Promise((resolve, reject) => {
let settled = false;
@@ -229,6 +275,268 @@ async function sendPacketWithRetry({ serialPort, reader, packet, timeoutMs, retr
throw new YmodemTransferError("YMODEM receiver rejected the packet too many times", "YMODEM_RETRY_LIMIT");
}
async function readYmodemFrame(reader, timeoutMs) {
const header = await reader.readByte(timeoutMs);
if (header === YMODEM.CAN) {
const next = await reader.readByte(1_000).catch(() => null);
if (next === YMODEM.CAN) {
throw new YmodemTransferError("YMODEM transfer cancelled by sender", "YMODEM_REMOTE_CANCELLED");
}
if (next !== null) {
reader.unreadByte?.(next);
}
return readYmodemFrame(reader, timeoutMs);
}
if (header === YMODEM.EOT) {
return { type: "eot" };
}
if (header !== YMODEM.SOH && header !== YMODEM.STX) {
throw new YmodemTransferError(`Unexpected YMODEM packet header: 0x${header.toString(16)}`);
}
const payloadSize = header === YMODEM.SOH ? YMODEM.PACKET_SIZE_128 : YMODEM.PACKET_SIZE_1024;
const blockNumber = await reader.readByte(timeoutMs);
const blockComplement = await reader.readByte(timeoutMs);
const payload = Buffer.alloc(payloadSize);
for (let i = 0; i < payloadSize; i += 1) {
payload[i] = await reader.readByte(timeoutMs);
}
const sentCrc = ((await reader.readByte(timeoutMs)) << 8) | await reader.readByte(timeoutMs);
const expectedCrc = crc16Xmodem(payload);
const valid = blockComplement === (0xff - blockNumber) && sentCrc === expectedCrc;
return {
type: "packet",
blockNumber,
payload,
valid,
};
}
async function readYmodemPacketWithRetry({
serialPort,
reader,
expectedBlockNumber,
requestByte,
timeoutMs,
retryLimit,
label,
}) {
for (let attempt = 0; attempt < retryLimit; attempt += 1) {
if (requestByte !== undefined) {
await writeAndDrain(serialPort, Buffer.from([requestByte]));
}
let frame;
try {
frame = await readYmodemFrame(reader, timeoutMs);
} catch (error) {
if (error?.code === "YMODEM_TIMEOUT" && attempt < retryLimit - 1) {
continue;
}
throw error;
}
if (frame.type === "eot") {
return frame;
}
if (frame.valid && frame.blockNumber === expectedBlockNumber) {
return frame;
}
if (frame.valid && frame.blockNumber === ((expectedBlockNumber - 1) & 0xff)) {
await writeAndDrain(serialPort, Buffer.from([YMODEM.ACK]));
continue;
}
await writeAndDrain(serialPort, Buffer.from([YMODEM.NAK]));
}
throw new YmodemTransferError(`YMODEM sender did not provide a valid ${label}`, "YMODEM_RETRY_LIMIT");
}
async function receiveYmodemFileData({
serialPort,
reader,
fileHandle,
totalBytes,
timeoutMs,
retryLimit,
onProgress,
}) {
let expectedBlockNumber = 1;
let writtenBytes = 0;
let rejectedPackets = 0;
for (;;) {
let frame;
try {
frame = await readYmodemFrame(reader, timeoutMs);
} catch (error) {
if (error?.code === "YMODEM_TIMEOUT" && rejectedPackets < retryLimit) {
rejectedPackets += 1;
await writeAndDrain(serialPort, Buffer.from([YMODEM.NAK]));
continue;
}
throw error;
}
if (frame.type === "eot") {
await writeAndDrain(serialPort, Buffer.from([YMODEM.NAK]));
const secondEot = await readYmodemFrame(reader, timeoutMs);
if (secondEot.type !== "eot") {
throw new YmodemTransferError("YMODEM sender did not confirm end of file", "YMODEM_EOT_EXPECTED");
}
await writeAndDrain(serialPort, Buffer.from([YMODEM.ACK]));
if (writtenBytes !== totalBytes) {
throw new YmodemTransferError(
`YMODEM received incomplete file (${writtenBytes}/${totalBytes} bytes)`,
"YMODEM_INCOMPLETE_FILE",
);
}
return writtenBytes;
}
if (!frame.valid) {
rejectedPackets += 1;
if (rejectedPackets > retryLimit) {
throw new YmodemTransferError("YMODEM sender sent too many invalid packets", "YMODEM_RETRY_LIMIT");
}
await writeAndDrain(serialPort, Buffer.from([YMODEM.NAK]));
continue;
}
if (frame.blockNumber === ((expectedBlockNumber - 1) & 0xff)) {
await writeAndDrain(serialPort, Buffer.from([YMODEM.ACK]));
continue;
}
if (frame.blockNumber !== expectedBlockNumber) {
rejectedPackets += 1;
if (rejectedPackets > retryLimit) {
throw new YmodemTransferError("YMODEM sender sent packets out of order", "YMODEM_RETRY_LIMIT");
}
await writeAndDrain(serialPort, Buffer.from([YMODEM.NAK]));
continue;
}
const remainingBytes = Math.max(0, totalBytes - writtenBytes);
const bytesToWrite = Math.min(frame.payload.length, remainingBytes);
if (bytesToWrite > 0) {
await fileHandle.write(frame.payload, 0, bytesToWrite);
writtenBytes += bytesToWrite;
}
rejectedPackets = 0;
expectedBlockNumber = (expectedBlockNumber + 1) & 0xff;
await writeAndDrain(serialPort, Buffer.from([YMODEM.ACK]));
onProgress?.({ transferredBytes: writtenBytes, totalBytes, stage: "data" });
}
}
async function receiveYmodemFiles(serialPort, {
destinationDir,
timeoutMs = DEFAULT_TIMEOUT_MS,
retryLimit = DEFAULT_RETRY_LIMIT,
abortSignal,
onProgress,
} = {}) {
if (!serialPort) {
throw new YmodemTransferError("Serial session is not available", "YMODEM_NO_SERIAL");
}
if (!destinationDir || typeof destinationDir !== "string") {
throw new YmodemTransferError("No destination directory selected", "YMODEM_NO_DESTINATION");
}
const resolvedDestinationDir = path.resolve(destinationDir);
let destinationStat;
try {
destinationStat = await fs.promises.stat(resolvedDestinationDir);
} catch (error) {
throw new YmodemTransferError("Selected destination is not a directory", "YMODEM_DESTINATION_NOT_DIRECTORY");
}
if (!destinationStat.isDirectory()) {
throw new YmodemTransferError("Selected destination is not a directory", "YMODEM_DESTINATION_NOT_DIRECTORY");
}
const reader = createSerialByteReader(serialPort, abortSignal);
const files = [];
try {
for (;;) {
const headerFrame = await readYmodemPacketWithRetry({
serialPort,
reader,
expectedBlockNumber: 0,
requestByte: YMODEM.CRC16,
timeoutMs,
retryLimit,
label: "file header",
});
if (headerFrame.type === "eot") {
await writeAndDrain(serialPort, Buffer.from([YMODEM.ACK]));
continue;
}
const fileInfo = parseYmodemFileInfoPayload(headerFrame.payload);
await writeAndDrain(serialPort, Buffer.from([YMODEM.ACK]));
if (!fileInfo) {
break;
}
await writeAndDrain(serialPort, Buffer.from([YMODEM.CRC16]));
onProgress?.({ transferredBytes: 0, totalBytes: fileInfo.totalBytes, stage: "header" });
const filePath = await resolveUniqueDestinationPath(resolvedDestinationDir, fileInfo.fileName);
let fileHandle;
let closeNeeded = false;
let createdFile = false;
try {
fileHandle = await fs.promises.open(filePath, "wx");
closeNeeded = true;
createdFile = true;
const writtenBytes = await receiveYmodemFileData({
serialPort,
reader,
fileHandle,
totalBytes: fileInfo.totalBytes,
timeoutMs,
retryLimit,
onProgress,
});
await fileHandle.close();
closeNeeded = false;
files.push({
fileName: path.basename(filePath),
filePath,
totalBytes: fileInfo.totalBytes,
writtenBytes,
});
} catch (error) {
if (closeNeeded) {
await fileHandle.close().catch(() => {});
}
if (createdFile) {
await fs.promises.rm(filePath, { force: true }).catch(() => {});
}
throw error;
}
}
const totalBytes = files.reduce((sum, file) => sum + file.totalBytes, 0);
const writtenBytes = files.reduce((sum, file) => sum + file.writtenBytes, 0);
return {
files,
fileCount: files.length,
totalBytes,
writtenBytes,
fileName: files[0]?.fileName,
filePath: files[0]?.filePath,
};
} finally {
reader.cleanup();
}
}
async function sendYmodemBuffer(serialPort, {
filename,
buffer,
@@ -355,6 +663,7 @@ module.exports = {
createYmodemFileInfoPacket,
createYmodemDataPackets,
createYmodemEndSessionPacket,
receiveYmodemFiles,
sendYmodemCancel,
sendYmodemBuffer,
sendYmodemFile,

View File

@@ -1,12 +1,16 @@
const test = require("node:test");
const assert = require("node:assert/strict");
const { EventEmitter } = require("node:events");
const fs = require("node:fs");
const os = require("node:os");
const path = require("node:path");
const {
YMODEM,
createYmodemFileInfoPacket,
createYmodemDataPackets,
createYmodemEndSessionPacket,
receiveYmodemFiles,
sendYmodemCancel,
sendYmodemBuffer,
} = require("./ymodemTransfer.cjs");
@@ -200,6 +204,137 @@ test("sends the Tera Term style cancel sequence", async () => {
);
});
test("receives a YMODEM file into the selected directory", async () => {
const targetDir = fs.mkdtempSync(path.join(os.tmpdir(), "netcatty-ymodem-receive-"));
try {
const serial = new FakeSerialPort();
const transfer = receiveYmodemFiles(serial, {
destinationDir: targetDir,
timeoutMs: 200,
});
await waitForWrites(serial, 1);
assert.deepEqual([...serial.writes[0]], [YMODEM.CRC16]);
serial.emit("data", createYmodemFileInfoPacket({
filename: "device.log",
size: 3,
mtime: 0,
}));
await waitForWrites(serial, 3);
assert.deepEqual([...serial.writes[1]], [YMODEM.ACK]);
assert.deepEqual([...serial.writes[2]], [YMODEM.CRC16]);
serial.emit("data", createYmodemDataPackets(Buffer.from("abc"))[0]);
await waitForWrites(serial, 4);
assert.deepEqual([...serial.writes[3]], [YMODEM.ACK]);
serial.emit("data", Buffer.from([YMODEM.EOT]));
await waitForWrites(serial, 5);
assert.deepEqual([...serial.writes[4]], [YMODEM.NAK]);
serial.emit("data", Buffer.from([YMODEM.EOT]));
await waitForWrites(serial, 7);
assert.deepEqual([...serial.writes[5]], [YMODEM.ACK]);
assert.deepEqual([...serial.writes[6]], [YMODEM.CRC16]);
serial.emit("data", createYmodemEndSessionPacket());
await waitForWrites(serial, 8);
assert.deepEqual([...serial.writes[7]], [YMODEM.ACK]);
const result = await transfer;
assert.deepEqual(result, {
files: [{
fileName: "device.log",
filePath: path.join(targetDir, "device.log"),
totalBytes: 3,
writtenBytes: 3,
}],
fileCount: 1,
totalBytes: 3,
writtenBytes: 3,
fileName: "device.log",
filePath: path.join(targetDir, "device.log"),
});
assert.equal(fs.readFileSync(path.join(targetDir, "device.log"), "utf8"), "abc");
assert.equal(serial.listenerCount("data"), 0);
} finally {
fs.rmSync(targetDir, { recursive: true, force: true });
}
});
test("rejects an incomplete received file and removes the partial file", async () => {
const targetDir = fs.mkdtempSync(path.join(os.tmpdir(), "netcatty-ymodem-short-"));
try {
const serial = new FakeSerialPort();
const transfer = receiveYmodemFiles(serial, {
destinationDir: targetDir,
timeoutMs: 200,
});
const rejectedTransfer = assert.rejects(transfer, /incomplete/i);
await waitForWrites(serial, 1);
serial.emit("data", createYmodemFileInfoPacket({
filename: "short.log",
size: 1500,
mtime: 0,
}));
await waitForWrites(serial, 3);
serial.emit("data", createYmodemDataPackets(Buffer.alloc(1024, 0x61))[0]);
await waitForWrites(serial, 4);
serial.emit("data", Buffer.from([YMODEM.EOT]));
await waitForWrites(serial, 5);
serial.emit("data", Buffer.from([YMODEM.EOT]));
await waitForWrites(serial, 6);
await rejectedTransfer;
assert.equal(fs.existsSync(path.join(targetDir, "short.log")), false);
assert.equal(serial.listenerCount("data"), 0);
} finally {
fs.rmSync(targetDir, { recursive: true, force: true });
}
});
test("does not delete an existing file if creating the receive target fails", async () => {
const targetDir = fs.mkdtempSync(path.join(os.tmpdir(), "netcatty-ymodem-race-"));
const targetPath = path.join(targetDir, "race.log");
const originalOpen = fs.promises.open;
try {
fs.promises.open = async (filePath, flags, ...args) => {
if (filePath === targetPath && flags === "wx") {
fs.writeFileSync(targetPath, "existing");
const error = new Error("file exists");
error.code = "EEXIST";
throw error;
}
return originalOpen.call(fs.promises, filePath, flags, ...args);
};
const serial = new FakeSerialPort();
const transfer = receiveYmodemFiles(serial, {
destinationDir: targetDir,
timeoutMs: 200,
});
const rejectedTransfer = assert.rejects(transfer, /file exists/i);
await waitForWrites(serial, 1);
serial.emit("data", createYmodemFileInfoPacket({
filename: "race.log",
size: 3,
mtime: 0,
}));
await waitForWrites(serial, 3);
await rejectedTransfer;
assert.equal(fs.readFileSync(targetPath, "utf8"), "existing");
} finally {
fs.promises.open = originalOpen;
fs.rmSync(targetDir, { recursive: true, force: true });
}
});
function waitForWrites(serial, count) {
return new Promise((resolve, reject) => {
const startedAt = Date.now();

View File

@@ -45,6 +45,9 @@ function createPreloadApi(ctx) {
sendSerialYmodem: async (sessionId, filePath) => {
return ipcRenderer.invoke("netcatty:serial:ymodem-send", { sessionId, filePath });
},
receiveSerialYmodem: async (sessionId, destinationDir) => {
return ipcRenderer.invoke("netcatty:serial:ymodem-receive", { sessionId, destinationDir });
},
getDefaultShell: async () => {
return ipcRenderer.invoke("netcatty:local:defaultShell");
},