From bec2cd99581efd8f6fcc4d3b3f694603ef16061a Mon Sep 17 00:00:00 2001 From: Ben Allfree Date: Sun, 12 Apr 2026 17:05:22 -0700 Subject: [PATCH 1/7] feat: xmodem copy (cp) --- meshtastic/__main__.py | 38 +++++++ meshtastic/node.py | 244 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 281 insertions(+), 1 deletion(-) diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index fa3cf1311..1c190c786 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -443,6 +443,31 @@ def onConnected(interface): # Must turn off encryption on primary channel interface.getNode(args.dest, **getNode_kwargs).turnOffEncryptionOnPrimaryChannel() + if args.cp: + closeNow = True + src, dst = args.cp + import os + node = interface.localNode + # Direction: if src is an existing local file → upload; otherwise → download + if os.path.isfile(src): + print(f"Uploading {src} → {dst}") + def _upload_progress(sent, total): + pct = 100 * sent // total + bar = '#' * (pct // 5) + '.' * (20 - pct // 5) + print(f"\r [{bar}] {pct}%", end="", flush=True) + ok = node.uploadFile(src, dst, on_progress=_upload_progress) + print(f"\r {'OK' if ok else 'FAILED'}: {src} → {dst} ") + if not ok: + our_exit("Upload failed", 1) + else: + print(f"Downloading {src} → {dst}") + def _download_progress(received, _total): + print(f"\r {received} bytes received...", end="", flush=True) + ok = node.downloadFile(src, dst, on_progress=_download_progress) + print(f"\r {'OK' if ok else 'FAILED'}: {src} → {dst} ") + if not ok: + our_exit("Download failed", 1) + if args.reboot: closeNow = True waitForAckNak = True @@ -1876,6 +1901,19 @@ def addLocalActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars default=None ) + group.add_argument( + "--cp", + help=( + "Copy a file to or from the device via XModem. " + "Usage: --cp . " + "If is a local file it is uploaded to on the device. " + "If starts with / it is downloaded from the device to locally. " + "Use /__ext__/ or /__int__/ prefixes to target external or internal flash." + ), + nargs=2, + metavar=("SRC", "DST"), + ) + return parser def addRemoteActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: diff --git a/meshtastic/node.py b/meshtastic/node.py index 66b6312ff..24fa8d6b0 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -3,11 +3,12 @@ import base64 import logging +import threading import time from typing import Optional, Union, List -from meshtastic.protobuf import admin_pb2, apponly_pb2, channel_pb2, config_pb2, localonly_pb2, mesh_pb2, portnums_pb2 +from meshtastic.protobuf import admin_pb2, apponly_pb2, channel_pb2, config_pb2, localonly_pb2, mesh_pb2, portnums_pb2, xmodem_pb2 from meshtastic.util import ( Timeout, camel_to_snake, @@ -1076,3 +1077,244 @@ def get_channels_with_hash(self): "hash": hash_val, }) return result + + # ── XModem file transfer ─────────────────────────────────────────────────── + + _XMODEM_BUFFER_MAX = 128 # meshtastic_XModem_buffer_t::bytes + _XMODEM_MAX_RETRY = 10 + _XMODEM_TIMEOUT_S = 5.0 + + @staticmethod + def _xmodem_crc16(data: bytes) -> int: + """CRC-16-CCITT matching the firmware's XModemAdapter::crc16_ccitt.""" + crc = 0 + for b in data: + crc = ((crc >> 8) | (crc << 8)) & 0xFFFF + crc ^= b + crc ^= ((crc & 0xFF) >> 4) & 0xFFFF + crc ^= ((crc << 8) << 4) & 0xFFFF + crc ^= (((crc & 0xFF) << 4) << 1) & 0xFFFF + return crc & 0xFFFF + + def _xmodem_send(self, xm: xmodem_pb2.XModem) -> None: + """Wrap an XModem protobuf in ToRadio and send to the device.""" + tr = mesh_pb2.ToRadio() + tr.xmodemPacket.CopyFrom(xm) + self.iface._sendToRadio(tr) + + def _xmodem_roundtrip(self, xm: xmodem_pb2.XModem, + timeout_s: float = _XMODEM_TIMEOUT_S) -> Optional[xmodem_pb2.XModem]: + """Subscribe to xmodempacket, send, then wait for response (subscribe-first to avoid race).""" + from pubsub import pub # type: ignore[import-untyped] + event = threading.Event() + result: list = [None] + + def _on_xmodem(packet, interface): + result[0] = packet + event.set() + + # Subscribe BEFORE sending so we don't miss a fast response + pub.subscribe(_on_xmodem, "meshtastic.xmodempacket") + try: + self._xmodem_send(xm) + event.wait(timeout=timeout_s) + finally: + try: + pub.unsubscribe(_on_xmodem, "meshtastic.xmodempacket") + except Exception: + pass + + return result[0] + + def uploadFile(self, local_path: str, device_path: str, + on_progress=None, timeout_s: float = _XMODEM_TIMEOUT_S) -> bool: + """Upload a local file to the device via XModem. + + Args: + local_path: Path to the local file to upload. + device_path: Destination path on the device. Use ``/__ext__/`` or + ``/__int__/`` prefixes to target external or internal + flash respectively; bare ``/`` paths go to InternalFS. + on_progress: Optional callback ``fn(bytes_sent, total_bytes)``. + timeout_s: Per-packet ACK timeout in seconds. + + Returns: + True on success, False on failure. + + Example:: + + iface.localNode.uploadFile("wordle.bin", "/__ext__/bbs/kb/wordle.bin") + """ + if self.noProto: + logger.warning("uploadFile: protocol disabled (noProto)") + return False + + try: + data = open(local_path, "rb").read() + except OSError as e: + logger.error(f"uploadFile: cannot read {local_path}: {e}") + return False + + XC = xmodem_pb2.XModem + + # SOH seq=0 — filename handshake + xm = xmodem_pb2.XModem() + xm.control = XC.SOH + xm.seq = 0 + xm.buffer = device_path.encode("utf-8")[: self._XMODEM_BUFFER_MAX] + + for attempt in range(self._XMODEM_MAX_RETRY): + resp = self._xmodem_roundtrip(xm, timeout_s) + logger.debug(f"uploadFile: OPEN attempt {attempt+1} resp={resp.control if resp else None}") + if resp and resp.control == XC.ACK: + break + if attempt == self._XMODEM_MAX_RETRY - 1: + logger.error(f"uploadFile: OPEN rejected for {device_path}") + return False + + # STX data packets + seq = 1 + offset = 0 + total = len(data) + while offset < total: + chunk = data[offset: offset + self._XMODEM_BUFFER_MAX] + crc = self._xmodem_crc16(chunk) + + xm = xmodem_pb2.XModem() + xm.control = XC.STX + xm.seq = seq + xm.crc16 = crc + xm.buffer = bytes(chunk) + + acked = False + for retry in range(self._XMODEM_MAX_RETRY): + resp = self._xmodem_roundtrip(xm, timeout_s) + if resp and resp.control == XC.ACK: + acked = True + break + if resp and resp.control == XC.CAN: + logger.error(f"uploadFile: transfer cancelled at offset {offset}") + return False + if not acked: + logger.error(f"uploadFile: no ACK for seq {seq} at offset {offset}") + return False + + offset += len(chunk) + seq = (seq & 0xFF) + 1 + if on_progress: + on_progress(offset, total) + + # EOT + xm = xmodem_pb2.XModem() + xm.control = XC.EOT + for attempt in range(self._XMODEM_MAX_RETRY): + resp = self._xmodem_roundtrip(xm, timeout_s) + if resp and resp.control == XC.ACK: + logger.debug(f"uploadFile: {local_path} → {device_path} complete ({total} bytes)") + return True + logger.error(f"uploadFile: EOT not acknowledged for {device_path}") + return False + + def _xmodem_wait_next(self, timeout_s: float = _XMODEM_TIMEOUT_S) -> Optional[xmodem_pb2.XModem]: + """Wait for the next xmodem packet without sending anything first.""" + from pubsub import pub # type: ignore[import-untyped] + event = threading.Event() + result: list = [None] + + def _cb(packet, interface): + result[0] = packet + event.set() + + pub.subscribe(_cb, "meshtastic.xmodempacket") + try: + event.wait(timeout=timeout_s) + finally: + try: + pub.unsubscribe(_cb, "meshtastic.xmodempacket") + except Exception: + pass + return result[0] + + def downloadFile(self, device_path: str, local_path: str, + on_progress=None, timeout_s: float = _XMODEM_TIMEOUT_S) -> bool: + """Download a file from the device via XModem. + + Args: + device_path: Source path on the device (``/__ext__/``, ``/__int__/``, + or bare ``/`` for InternalFS). + local_path: Destination path on the local filesystem. + on_progress: Optional callback ``fn(bytes_received, total_bytes)``. + ``total_bytes`` is -1 (unknown) during transfer. + timeout_s: Per-packet response timeout in seconds. + + Returns: + True on success, False on failure. + + Example:: + + iface.localNode.downloadFile("/__ext__/bbs/kb/wordle.bin", "wordle.bin") + """ + if self.noProto: + logger.warning("downloadFile: protocol disabled (noProto)") + return False + + XC = xmodem_pb2.XModem + + # STX seq=0 — request device to transmit the file + xm = xmodem_pb2.XModem() + xm.control = XC.STX + xm.seq = 0 + xm.buffer = device_path.encode("utf-8")[: self._XMODEM_BUFFER_MAX] + + chunks: list = [] + expected_seq = 1 + + # Subscribe first, then send, so we don't miss the first response + resp = self._xmodem_roundtrip(xm, timeout_s) + + while True: + if resp is None: + logger.error(f"downloadFile: timeout waiting for data from {device_path}") + return False + + if resp.control == XC.EOT: + # Final ACK — no more packets expected after this + ack = xmodem_pb2.XModem() + ack.control = XC.ACK + self._xmodem_send(ack) + break + + if resp.control in (XC.NAK, XC.CAN): + logger.error(f"downloadFile: device sent error control for {device_path}") + return False + + if resp.control in (XC.SOH, XC.STX): + chunk = bytes(resp.buffer) + if resp.seq == expected_seq and self._xmodem_crc16(chunk) == resp.crc16: + chunks.append(chunk) + if on_progress: + on_progress(sum(len(c) for c in chunks), -1) + ack = xmodem_pb2.XModem() + ack.control = XC.ACK + expected_seq = (expected_seq & 0xFF) + 1 + else: + ack = xmodem_pb2.XModem() + ack.control = XC.NAK + + # Subscribe BEFORE sending ACK/NAK so we don't miss the next packet + resp = self._xmodem_roundtrip(ack, timeout_s) + continue + + # Unexpected control — skip + resp = self._xmodem_wait_next(timeout_s) + + data = b"".join(chunks) + try: + with open(local_path, "wb") as f: + f.write(data) + except OSError as e: + logger.error(f"downloadFile: cannot write {local_path}: {e}") + return False + + logger.debug(f"downloadFile: {device_path} → {local_path} complete ({len(data)} bytes)") + return True From d04fdb80774b0f681c22facfb8f89eb53a5f332c Mon Sep 17 00:00:00 2001 From: Ben Allfree Date: Sun, 12 Apr 2026 21:32:24 -0700 Subject: [PATCH 2/7] feat: add round-trip XModem testing script for firmware path prefixes --- scripts/test_prefix_routing.py | 523 +++++++++++++++++++++++++++++++++ 1 file changed, 523 insertions(+) create mode 100644 scripts/test_prefix_routing.py diff --git a/scripts/test_prefix_routing.py b/scripts/test_prefix_routing.py new file mode 100644 index 000000000..973a8c802 --- /dev/null +++ b/scripts/test_prefix_routing.py @@ -0,0 +1,523 @@ +#!/usr/bin/env python3 +""" +Round-trip XModem tests for firmware path prefixes /__int__/ and /__ext__/. + +Run one device at a time: connect only the target radio, then pass --device and +either --port (serial) or --host (TCP). Do not rely on auto port discovery. + +Examples: + python scripts/test_prefix_routing.py --device tdeck --port /dev/ttyUSB0 + python scripts/test_prefix_routing.py --device tdeck --port /dev/ttyUSB0 --trace-xmodem --verbose + python scripts/test_prefix_routing.py --device rak4631 --port COM5 + python scripts/test_prefix_routing.py --device techo --host 192.168.1.50 + +Run all suites in order (pauses for you to swap USB between devices): + python scripts/test_prefix_routing.py --all --port /dev/ttyACM0 --pause-between + +Device firmware logs on the same USB link (no second serial client): + python scripts/test_prefix_routing.py --device tdeck --port /dev/ttyUSB0 --device-log + Requires the node setting security.debug_log_api_enabled (enable once in the app / admin). + +Failure triage (for firmware fixes): + - Path mount / fsRoute / extFS init issues -> branch nrf-external-flash + - XModem state (truncate, ACK/NAK, wrong path on remove) -> branch xmodem-external-flash + +If OPEN hangs, the device is not returning an xmodem frame on the wire (meshtastic-python retries +up to 10 times per step). Default per-try timeout is short so a dead link fails fast; raise it on +noisy links: `--xmodem-timeout 15`. Use `--verbose` for DEBUG lines from uploadFile/downloadFile. +""" + +from __future__ import annotations + +import argparse +import hashlib +import logging +import json +import os +import sys +import tempfile +import threading +import time +from dataclasses import dataclass, asdict +from pathlib import Path +from typing import Any, Literal + +# meshtastic-python repo root (parent of `meshtastic/` package) +_PY_ROOT = Path(__file__).resolve().parent.parent +if str(_PY_ROOT) not in sys.path: + sys.path.insert(0, str(_PY_ROOT)) + +from meshtastic.serial_interface import SerialInterface # noqa: E402 +from meshtastic.tcp_interface import TCPInterface # noqa: E402 + +DeviceId = Literal["tdeck", "rak4631", "techo"] + +DEVICE_ORDER: tuple[DeviceId, ...] = ("tdeck", "rak4631", "techo") + +DEVICE_LABELS: dict[DeviceId, str] = { + "tdeck": "LilyGO T-Deck", + "rak4631": "RAK4631 (stock / no ext LittleFS mount)", + "techo": "T-Echo (external QSPI LittleFS when enabled in firmware)", +} + + +@dataclass +class StepResult: + device: str + prefix: str + direction: str + device_path: str + bytes_count: int + sha256_expected: str + sha256_got: str + duration_s: float + ok: bool + error: str | None = None + + +@dataclass +class SuiteResult: + device: str + ok: bool + steps: list[StepResult] + + +def _sha256(data: bytes) -> str: + return hashlib.sha256(data).hexdigest() + + +def _make_payload(size: int, seed: int = 0x4D455348) -> bytes: + """Deterministic pseudo-random payload (repeatable across runs).""" + out = bytearray(size) + state = seed & 0xFFFFFFFF + for i in range(size): + # LCG + state = (1103515245 * state + 12345) & 0xFFFFFFFF + out[i] = state & 0xFF + return bytes(out) + + +def _device_paths(device: DeviceId) -> tuple[str, str]: + base = f"/__int__/meshforge-test/{device}/int.bin" + ext = f"/__ext__/meshforge-test/{device}/ext.bin" + return base, ext + + +def _prompt(msg: str, skip: bool) -> None: + if skip: + print(f"[non-interactive] {msg}") + return + try: + input(f"{msg}\nPress Enter to continue… ") + except EOFError: + print("(EOF — continuing)") + + +def _connect_heartbeat(stop: threading.Event) -> None: + """Print every few seconds until stop is set (covers SerialInterface waitForConfig).""" + elapsed = 0 + interval = 3.0 + while not stop.wait(timeout=interval): + elapsed += int(interval) + print( + f" … API handshake still running (~{elapsed}s) — normal on slow USB / large nodedb; " + "try --verbose for library DEBUG", + flush=True, + ) + + +def _open_interface( + port: str | None, + host: str | None, + tcp_port: int, + timeout: int, + *, + device_log: bool = False, +) -> SerialInterface | TCPInterface: + if port and host: + raise SystemExit("Use only one of --port or --host.") + if not port and not host: + raise SystemExit( + "You must specify exactly one transport: --port SERIAL or --host HOST.\n" + "Auto port scan is disabled for this harness so the wrong device is never touched." + ) + # Routes FromRadio.log_record -> meshtastic.log.line -> stdout (needs debug_log_api_enabled on device). + dbg = sys.stdout if device_log else None + if host: + return TCPInterface(hostname=host, portNumber=tcp_port, timeout=timeout, debugOut=dbg) + return SerialInterface(devPath=port, timeout=timeout, debugOut=dbg) + + +def _run_prefix_roundtrip( + iface: Any, + device: DeviceId, + prefix_name: str, + device_path: str, + payload: bytes, + timeout_s: float, +) -> StepResult: + node = iface.localNode + expected = _sha256(payload) + t0 = time.perf_counter() + + tmp_up = tempfile.NamedTemporaryFile(delete=False, suffix=".bin") + tmp_up.close() + tmp_down = tempfile.NamedTemporaryFile(delete=False, suffix=".bin") + tmp_down.close() + try: + with open(tmp_up.name, "wb") as f: + f.write(payload) + + print( + f" (upload OPEN: meshtastic-python waits up to {timeout_s:g}s per try, max 10 tries — not frozen)", + flush=True, + ) + ok_up = node.uploadFile(tmp_up.name, device_path, timeout_s=timeout_s) + if not ok_up: + return StepResult( + device=device, + prefix=prefix_name, + direction="upload", + device_path=device_path, + bytes_count=len(payload), + sha256_expected=expected, + sha256_got="", + duration_s=time.perf_counter() - t0, + ok=False, + error="uploadFile returned False", + ) + + ok_down = node.downloadFile(device_path, tmp_down.name, timeout_s=timeout_s) + if not ok_down: + return StepResult( + device=device, + prefix=prefix_name, + direction="download", + device_path=device_path, + bytes_count=len(payload), + sha256_expected=expected, + sha256_got="", + duration_s=time.perf_counter() - t0, + ok=False, + error="downloadFile returned False", + ) + + with open(tmp_down.name, "rb") as f: + got = f.read() + got_hash = _sha256(got) + if got != payload: + return StepResult( + device=device, + prefix=prefix_name, + direction="verify", + device_path=device_path, + bytes_count=len(payload), + sha256_expected=expected, + sha256_got=got_hash, + duration_s=time.perf_counter() - t0, + ok=False, + error=f"size local={len(payload)} got={len(got)}", + ) + + return StepResult( + device=device, + prefix=prefix_name, + direction="roundtrip", + device_path=device_path, + bytes_count=len(payload), + sha256_expected=expected, + sha256_got=got_hash, + duration_s=time.perf_counter() - t0, + ok=True, + error=None, + ) + finally: + for p in (tmp_up.name, tmp_down.name): + try: + os.unlink(p) + except OSError: + pass + + +def run_suite( + device: DeviceId, + port: str | None, + host: str | None, + tcp_port: int, + payload_size: int, + iface_timeout: int, + xmodem_timeout: float, + skip_prompt: bool, + device_log: bool = False, +) -> SuiteResult: + label = DEVICE_LABELS[device] + print("\n" + "=" * 72) + print(f"SUITE: {device} — {label}") + print("Connect ONLY this device for the duration of this suite.") + _prompt(f"Ready when {label} is connected.", skip_prompt) + + print( + " Opening transport + Meshtastic API handshake (serial open, want_config, nodedb). " + "This phase can take 10–40s on some boards…", + flush=True, + ) + if device_log: + print( + " --device-log: printing device LogRecord lines to stdout (enable security.debug_log_api_enabled on the node).", + flush=True, + ) + + iface: SerialInterface | TCPInterface | None = None + steps: list[StepResult] = [] + try: + stop_hb = threading.Event() + hb_thread = threading.Thread(target=_connect_heartbeat, args=(stop_hb,), daemon=True) + hb_thread.start() + t_link = time.perf_counter() + open_err: list[Exception] = [] + try: + iface = _open_interface(port, host, tcp_port, iface_timeout, device_log=device_log) + except Exception as exc: # pylint: disable=broad-except + open_err.append(exc) + finally: + stop_hb.set() + hb_thread.join(timeout=2.0) + link_dt = time.perf_counter() - t_link + + if open_err: + err = f"could not open interface: {open_err[0]}" + print(f" FAIL connect ({link_dt:.1f}s) {err}", flush=True) + steps.append( + StepResult( + device=device, + prefix="(none)", + direction="open", + device_path="", + bytes_count=0, + sha256_expected="(n/a)", + sha256_got="(n/a)", + duration_s=0.0, + ok=False, + error=err, + ) + ) + print(f"\nYou may disconnect {label} now (open failed).") + _prompt("Disconnect complete?", skip_prompt) + return SuiteResult(device=device, ok=False, steps=steps) + + print(f" Transport + handshake finished in {link_dt:.1f}s", flush=True) + + mi = getattr(iface, "myInfo", None) + nn = getattr(mi, "my_node_num", None) if mi is not None else None + print(f" API session ready (my_node_num={nn!r}). Starting transfers…", flush=True) + + payload = _make_payload(payload_size) + int_path, ext_path = _device_paths(device) + for prefix_name, path in (("__int__", int_path), ("__ext__", ext_path)): + print(f"\n--- {prefix_name}: {path} ({len(payload)} bytes) ---") + r = _run_prefix_roundtrip(iface, device, prefix_name, path, payload, xmodem_timeout) + steps.append(r) + status = "PASS" if r.ok else "FAIL" + print(f" {status} {r.direction} {r.duration_s:.2f}s sha256={r.sha256_expected[:16]}…") + if not r.ok: + print(f" error: {r.error}") + finally: + if iface is not None: + try: + iface.close() + except Exception: # pylint: disable=broad-except + pass + + print(f"\nYou may disconnect {label} now.") + _prompt("Disconnect complete?", skip_prompt) + + suite_ok = all(s.ok for s in steps) + return SuiteResult(device=device, ok=suite_ok, steps=steps) + + +# ── Optional XModem round-trip tracing (monkeypatch Node._xmodem_roundtrip) ─── + +_trace_xmodem_installed = False +_orig_node_xmodem_roundtrip: Any = None + + +def install_xmodem_trace() -> None: + """Print each XModem ToRadio send and FromRadio response (or timeout).""" + global _trace_xmodem_installed, _orig_node_xmodem_roundtrip + if _trace_xmodem_installed: + return + from meshtastic.node import Node + + _orig_node_xmodem_roundtrip = Node._xmodem_roundtrip + + def _wrapped(self: Any, xm: Any, timeout_s: float = Node._XMODEM_TIMEOUT_S) -> Any: + t0 = time.perf_counter() + bl = len(xm.buffer) if xm.buffer else 0 + preview = (bytes(xm.buffer)[: min(72, bl)] if bl else b"").decode("utf-8", errors="replace") + print( + f" [xmodem] tx control={int(xm.control)} seq={int(xm.seq)} buf_len={bl} " + f"timeout={timeout_s:g}s preview={preview!r}", + flush=True, + ) + assert _orig_node_xmodem_roundtrip is not None + resp = _orig_node_xmodem_roundtrip(self, xm, timeout_s) + dt = time.perf_counter() - t0 + if resp is None: + print(f" [xmodem] rx (no response) after {dt:.2f}s", flush=True) + else: + br = len(resp.buffer) if resp.buffer else 0 + print( + f" [xmodem] rx control={int(resp.control)} seq={int(resp.seq)} buf_len={br} ({dt:.2f}s)", + flush=True, + ) + return resp + + Node._xmodem_roundtrip = _wrapped # type: ignore[method-assign] + _trace_xmodem_installed = True + + +def uninstall_xmodem_trace() -> None: + global _trace_xmodem_installed, _orig_node_xmodem_roundtrip + if not _trace_xmodem_installed or _orig_node_xmodem_roundtrip is None: + return + from meshtastic.node import Node + + Node._xmodem_roundtrip = _orig_node_xmodem_roundtrip # type: ignore[method-assign] + _orig_node_xmodem_roundtrip = None + _trace_xmodem_installed = False + + +def _emit_json(results: list[SuiteResult]) -> None: + def ser(obj: Any) -> Any: + if isinstance(obj, StepResult): + return asdict(obj) + if isinstance(obj, SuiteResult): + return {"device": obj.device, "ok": obj.ok, "steps": [asdict(s) for s in obj.steps]} + raise TypeError(type(obj)) + + print(json.dumps([ser(r) for r in results], indent=2)) + + +def main() -> int: + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument( + "--device", + choices=("tdeck", "rak4631", "techo"), + help="Which device-specific suite to run (one physical radio at a time).", + ) + p.add_argument( + "--all", + action="store_true", + help="Run tdeck, then rak4631, then techo in order. Use --pause-between to swap USB between suites.", + ) + p.add_argument("--port", help="Serial device path (e.g. /dev/ttyACM0). Mutually exclusive with --host.") + p.add_argument("--host", help="TCP hostname or IP for meshtasticd / TCP bridge.") + p.add_argument("--tcp-port", type=int, default=4403, help="TCP port when using --host (default 4403).") + p.add_argument("--size", type=int, default=384, help="Payload size in bytes (default 384 = 3 xmodem chunks).") + p.add_argument("--iface-timeout", type=int, default=300, help="StreamInterface timeout seconds (default 300).") + p.add_argument( + "--xmodem-timeout", + type=float, + default=4.0, + help="Per-call XModem timeout for uploadFile/downloadFile (default 4; increase on flaky USB).", + ) + p.add_argument( + "--pause-between", + action="store_true", + help="With --all: pause for Enter between device suites (recommended when swapping USB).", + ) + p.add_argument( + "--skip-prompt", + action="store_true", + help="Non-interactive: print prompts but do not wait for Enter.", + ) + p.add_argument("--json", action="store_true", help="Print machine-readable JSON summary at end.") + p.add_argument( + "--verbose", + action="store_true", + help="Enable DEBUG logging for meshtastic.* (serial, mesh, node, xmodem retries).", + ) + p.add_argument( + "--trace-xmodem", + action="store_true", + help="Print every XModem round-trip (tx control/seq, rx or timeout). Implies progress during OPEN.", + ) + p.add_argument( + "--device-log", + action="store_true", + help="Print device firmware logs on stdout over the same API link (needs security.debug_log_api_enabled on the node).", + ) + + args = p.parse_args() + if args.verbose: + logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(name)s: %(message)s") + for _logname in ( + "meshtastic", + "meshtastic.node", + "meshtastic.mesh_interface", + "meshtastic.stream_interface", + "meshtastic.serial_interface", + ): + logging.getLogger(_logname).setLevel(logging.DEBUG) + if args.all and args.device: + p.error("Do not pass --device together with --all.") + if not args.all and not args.device: + p.error("Pass --device or use --all.") + if bool(args.port) == bool(args.host): + p.error("Specify exactly one of --port SERIAL or --host HOST (not both, not neither).") + + if args.size < 1: + p.error("--size must be >= 1") + + devices: tuple[DeviceId, ...] = DEVICE_ORDER if args.all else (args.device,) # type: ignore[assignment] + + print("Prefix routing XModem harness") + print(" Transport:", "--host " + args.host if args.host else "--port " + (args.port or "")) + print(" Payload size:", args.size, "bytes") + if args.trace_xmodem: + print(" --trace-xmodem: printing each XModem tx/rx line", flush=True) + if args.device_log: + print(" --device-log: firmware LogRecord -> stdout (same USB session)", flush=True) + + if args.trace_xmodem: + install_xmodem_trace() + try: + results: list[SuiteResult] = [] + for i, dev in enumerate(devices): + if args.all and i > 0 and args.pause_between and not args.skip_prompt: + _prompt(f"Swap USB: next suite is `{dev}` ({DEVICE_LABELS[dev]}).", args.skip_prompt) + r = run_suite( + device=dev, + port=args.port, + host=args.host, + tcp_port=args.tcp_port, + payload_size=args.size, + iface_timeout=args.iface_timeout, + xmodem_timeout=args.xmodem_timeout, + skip_prompt=args.skip_prompt, + device_log=args.device_log, + ) + results.append(r) + line = "PASS" if r.ok else "FAIL" + print(f"\n>>> {dev}: {line} <<<") + + if args.json: + _emit_json(results) + + print("\n" + "=" * 72) + print("SUMMARY (copy/paste)") + for r in results: + print(f" {r.device}: {'PASS' if r.ok else 'FAIL'}") + any_fail = any(not r.ok for r in results) + if any_fail: + print("\nTriage:") + print(" FS routing / extFS / mount -> nrf-external-flash") + print(" XModem truncate / ACK path / wrong remove path -> xmodem-external-flash") + return 1 if any_fail else 0 + finally: + if args.trace_xmodem: + uninstall_xmodem_trace() + + +if __name__ == "__main__": + raise SystemExit(main()) From 88ebd98c280424621cd83a11aa9948674024d716 Mon Sep 17 00:00:00 2001 From: Ben Allfree Date: Sun, 12 Apr 2026 21:53:30 -0700 Subject: [PATCH 3/7] feat: add unit tests for XModem --- meshtastic/node.py | 76 +++++++++ meshtastic/tests/test_node.py | 283 +++++++++++++++++++++++++++++++++- 2 files changed, 358 insertions(+), 1 deletion(-) diff --git a/meshtastic/node.py b/meshtastic/node.py index 24fa8d6b0..98b632307 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -1083,6 +1083,7 @@ def get_channels_with_hash(self): _XMODEM_BUFFER_MAX = 128 # meshtastic_XModem_buffer_t::bytes _XMODEM_MAX_RETRY = 10 _XMODEM_TIMEOUT_S = 5.0 + _MFLIST_PREFIX = "MFLIST " @staticmethod def _xmodem_crc16(data: bytes) -> int: @@ -1318,3 +1319,78 @@ def downloadFile(self, device_path: str, local_path: str, logger.debug(f"downloadFile: {device_path} → {local_path} complete ({len(data)} bytes)") return True + + def listDir(self, device_path: str, depth: int = 0, timeout_s: float = _XMODEM_TIMEOUT_S): + """List files on the device under ``device_path`` via XMODEM ``MFLIST`` (matching firmware). + + Args: + device_path: Directory on the device (``/__ext__/``, ``/__int__/``, or bare ``/``). + depth: Recursion depth (0 = files in that directory only; each increment adds one tree level). + timeout_s: Per-packet timeout. + + Returns: + List of ``(path, size_bytes)`` for each file, or ``None`` on failure. + Lines starting with ``#`` in the payload are ignored (comments / truncation markers). + """ + if self.noProto: + logger.warning("listDir: protocol disabled (noProto)") + return None + + d = max(0, min(255, int(depth))) + cmd = f"{self._MFLIST_PREFIX}{device_path} {d}".encode("utf-8")[: self._XMODEM_BUFFER_MAX] + XC = xmodem_pb2.XModem + + xm = xmodem_pb2.XModem() + xm.control = XC.SOH + xm.seq = 0 + xm.buffer = bytes(cmd) + + chunks: list = [] + expected_seq = 1 + resp = self._xmodem_roundtrip(xm, timeout_s) + + while True: + if resp is None: + logger.error(f"listDir: timeout waiting for data from {device_path}") + return None + + if resp.control == XC.EOT: + ack = xmodem_pb2.XModem() + ack.control = XC.ACK + self._xmodem_send(ack) + break + + if resp.control in (XC.NAK, XC.CAN): + logger.error(f"listDir: device rejected or cancelled for {device_path}") + return None + + if resp.control in (XC.SOH, XC.STX): + chunk = bytes(resp.buffer) + if resp.seq == expected_seq and self._xmodem_crc16(chunk) == resp.crc16: + chunks.append(chunk) + ack = xmodem_pb2.XModem() + ack.control = XC.ACK + expected_seq = (expected_seq & 0xFF) + 1 + else: + ack = xmodem_pb2.XModem() + ack.control = XC.NAK + + resp = self._xmodem_roundtrip(ack, timeout_s) + continue + + resp = self._xmodem_wait_next(timeout_s) + + raw = b"".join(chunks).decode("utf-8", errors="replace") + out: list = [] + for line in raw.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + if "\t" not in line: + continue + path, sz = line.split("\t", 1) + try: + out.append((path, int(sz))) + except ValueError: + logger.debug("listDir: skip unparsable line %r", line) + return out diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index 986c1783c..5d8b41b5f 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -7,7 +7,7 @@ import pytest -from ..protobuf import admin_pb2, localonly_pb2, config_pb2 +from ..protobuf import admin_pb2, localonly_pb2, config_pb2, xmodem_pb2 from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611 from ..node import Node from ..serial_interface import SerialInterface @@ -1610,6 +1610,287 @@ def test_start_ota_remote_node_raises_error(): ) +@pytest.mark.unit +def test_node_xmodem_crc16_known_vectors(): + """Regression vectors for CRC-16-CCITT matching firmware XModem.""" + assert Node._xmodem_crc16(b"") == 0 + assert Node._xmodem_crc16(b"a") == 31879 + assert Node._xmodem_crc16(b"hello") == 50018 + assert Node._xmodem_crc16(b"x" * 128) == 33239 + + +@pytest.mark.unit +def test_node_upload_file_xmodem_happy_path(tmp_path): + """uploadFile: OPEN (SOH/0), one STX data packet, EOT — all ACKed.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + device_path = "/__ext__/t.bin" + payload = b"hello" + src = tmp_path / "src.bin" + src.write_bytes(payload) + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + assert bytes(xm.buffer) == device_path.encode("utf-8") + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + if xm.control == XC.STX and xm.seq == 1: + assert bytes(xm.buffer) == payload + assert xm.crc16 == Node._xmodem_crc16(payload) + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + if xm.control == XC.EOT: + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + assert anode.uploadFile(str(src), device_path) is True + + +@pytest.mark.unit +def test_node_upload_file_xmodem_two_chunks(tmp_path): + """uploadFile spans two STX packets when payload is larger than buffer max (128).""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + device_path = "/__int__/big.bin" + payload = b"Z" * 129 + c0, c1 = payload[:128], payload[128:] + src = tmp_path / "src.bin" + src.write_bytes(payload) + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + if xm.control == XC.STX and xm.seq == 1: + assert bytes(xm.buffer) == c0 + assert xm.crc16 == Node._xmodem_crc16(c0) + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + if xm.control == XC.STX and xm.seq == 2: + assert bytes(xm.buffer) == c1 + assert xm.crc16 == Node._xmodem_crc16(c1) + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + if xm.control == XC.EOT: + p = xmodem_pb2.XModem() + p.control = XC.ACK + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + assert anode.uploadFile(str(src), device_path) is True + + +@pytest.mark.unit +def test_node_upload_file_open_rejected(tmp_path): + """uploadFile returns False when device never ACKs OPEN.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + src = tmp_path / "src.bin" + src.write_bytes(b"x") + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + p = xmodem_pb2.XModem() + p.control = XC.NAK + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + assert anode.uploadFile(str(src), "/x.bin") is False + + +@pytest.mark.unit +def test_node_download_file_xmodem_happy_path(tmp_path): + """downloadFile: request STX/0, receive data STX/1..n, then EOT.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + device_path = "/__ext__/r.bin" + payload = b"hi" + dst = tmp_path / "out.bin" + phase = 0 + + def fake_roundtrip(xm, timeout_s=5.0): + nonlocal phase + if phase == 0: + phase += 1 + assert xm.control == XC.STX and xm.seq == 0 + assert bytes(xm.buffer) == device_path.encode("utf-8") + p = xmodem_pb2.XModem() + p.control = XC.STX + p.seq = 1 + p.buffer = payload + p.crc16 = Node._xmodem_crc16(payload) + return p + if phase == 1: + phase += 1 + assert xm.control == XC.ACK + p = xmodem_pb2.XModem() + p.control = XC.EOT + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + assert anode.downloadFile(device_path, str(dst)) is True + assert dst.read_bytes() == payload + + +@pytest.mark.unit +def test_node_download_file_two_chunks(tmp_path): + """downloadFile reassembles multiple STX payloads before EOT.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + device_path = "/p.bin" + c0, c1 = b"A" * 128, b"B" * 10 + payload = c0 + c1 + dst = tmp_path / "out.bin" + phase = 0 + + def fake_roundtrip(xm, timeout_s=5.0): + nonlocal phase + if phase == 0: + phase += 1 + assert xm.control == XC.STX and xm.seq == 0 + p = xmodem_pb2.XModem() + p.control = XC.STX + p.seq = 1 + p.buffer = c0 + p.crc16 = Node._xmodem_crc16(c0) + return p + if phase == 1: + phase += 1 + assert xm.control == XC.ACK + p = xmodem_pb2.XModem() + p.control = XC.STX + p.seq = 2 + p.buffer = c1 + p.crc16 = Node._xmodem_crc16(c1) + return p + if phase == 2: + phase += 1 + assert xm.control == XC.ACK + p = xmodem_pb2.XModem() + p.control = XC.EOT + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + assert anode.downloadFile(device_path, str(dst)) is True + assert dst.read_bytes() == payload + + +@pytest.mark.unit +def test_node_listdir_parses_mflist_payload(): + """listDir sends MFLIST, collects SOH chunks, parses path\\tsize lines.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + assert bytes(xm.buffer) == b"MFLIST / 0" + chunk = b"/a.txt\t10\n/b.bin\t3\n" + p = xmodem_pb2.XModem() + p.control = XC.SOH + p.seq = 1 + p.buffer = chunk + p.crc16 = Node._xmodem_crc16(chunk) + return p + if xm.control == XC.ACK: + p = xmodem_pb2.XModem() + p.control = XC.EOT + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + rows = anode.listDir("/", depth=0) + assert rows == [("/a.txt", 10), ("/b.bin", 3)] + + +@pytest.mark.unit +def test_node_listdir_skips_comments_and_bad_lines(): + """listDir ignores # lines, lines without tab, and non-integer sizes.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + chunk = ( + b"# meta\n" + b"/ok.txt\t1\n" + b"no-tab-field\n" + b"/badsz\txx\n" + b"/good.bin\t99\n" + ) + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + p = xmodem_pb2.XModem() + p.control = XC.SOH + p.seq = 1 + p.buffer = chunk + p.crc16 = Node._xmodem_crc16(chunk) + return p + if xm.control == XC.ACK: + p = xmodem_pb2.XModem() + p.control = XC.EOT + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + rows = anode.listDir("/__ext__", depth=0) + assert rows == [("/ok.txt", 1), ("/good.bin", 99)] + + +@pytest.mark.unit +def test_node_listdir_depth_clamped_to_byte_range(): + """listDir clamps depth to 0..255 in the MFLIST command.""" + iface = MagicMock(autospec=MeshInterface) + anode = Node(iface, 1234567890, noProto=False) + XC = xmodem_pb2.XModem + first_cmd: list[bytes] = [] + + def fake_roundtrip(xm, timeout_s=5.0): + if xm.control == XC.SOH and xm.seq == 0: + first_cmd.append(bytes(xm.buffer)) + p = xmodem_pb2.XModem() + p.control = XC.SOH + p.seq = 1 + p.buffer = b"/x\t0\n" + p.crc16 = Node._xmodem_crc16(p.buffer) + return p + if xm.control == XC.ACK: + p = xmodem_pb2.XModem() + p.control = XC.EOT + return p + return None + + with patch.object(anode, "_xmodem_roundtrip", side_effect=fake_roundtrip): + with patch.object(anode, "_xmodem_send"): + anode.listDir("/mount", depth=300) + anode.listDir("/mount", depth=-5) + assert first_cmd[0] == b"MFLIST /mount 255" + assert first_cmd[1] == b"MFLIST /mount 0" + + # TODO # @pytest.mark.unitslow # def test_waitForConfig(): From 8fa1bed97796f76760e2597ad0fd1ef96166d9f8 Mon Sep 17 00:00:00 2001 From: Ben Allfree Date: Sun, 12 Apr 2026 22:07:16 -0700 Subject: [PATCH 4/7] feat: --ls --- meshtastic/__main__.py | 34 ++++- scripts/test_prefix_routing.py | 242 ++++++++++++++++++++++++++++++++- 2 files changed, 268 insertions(+), 8 deletions(-) diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 1c190c786..a67a3ad7a 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -443,6 +443,17 @@ def onConnected(interface): # Must turn off encryption on primary channel interface.getNode(args.dest, **getNode_kwargs).turnOffEncryptionOnPrimaryChannel() + if args.ls is not None: + closeNow = True + remote_dir = args.ls + depth = int(getattr(args, "ls_depth", 0) or 0) + node = interface.localNode + rows = node.listDir(remote_dir, depth=depth) + if rows is None: + our_exit("listDir failed", 1) + for path, sz in rows: + print(f"{sz}\t{path}") + if args.cp: closeNow = True src, dst = args.cp @@ -1906,14 +1917,33 @@ def addLocalActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars help=( "Copy a file to or from the device via XModem. " "Usage: --cp . " - "If is a local file it is uploaded to on the device. " - "If starts with / it is downloaded from the device to locally. " + "If is an existing local file it is uploaded to on the device. " + "Otherwise is treated as a device path and downloaded to local . " "Use /__ext__/ or /__int__/ prefixes to target external or internal flash." ), nargs=2, metavar=("SRC", "DST"), ) + group.add_argument( + "--ls", + help=( + "List files on the device under REMOTE_DIR via XMODEM MFLIST (requires matching firmware). " + "Output: size_bytespath (one per line)." + ), + nargs="?", + const="/", + default=None, + metavar="REMOTE_DIR", + ) + + group.add_argument( + "--ls-depth", + help="Max directory depth for --ls (0 = files in REMOTE_DIR only).", + type=int, + default=0, + ) + return parser def addRemoteActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: diff --git a/scripts/test_prefix_routing.py b/scripts/test_prefix_routing.py index 973a8c802..45ddb30d3 100644 --- a/scripts/test_prefix_routing.py +++ b/scripts/test_prefix_routing.py @@ -2,6 +2,11 @@ """ Round-trip XModem tests for firmware path prefixes /__int__/ and /__ext__/. +After each successful upload+download round-trip, the harness runs ``listDir`` +(MFLIST) on the parent directory and checks that the test file path appears in +the listing (requires firmware with MFLIST support). Use ``--skip-listdir`` to +disable that step (e.g. older firmware). + Run one device at a time: connect only the target radio, then pass --device and either --port (serial) or --host (TCP). Do not rely on auto port discovery. @@ -38,7 +43,7 @@ import tempfile import threading import time -from dataclasses import dataclass, asdict +from dataclasses import dataclass, asdict, field from pathlib import Path from typing import Any, Literal @@ -75,11 +80,26 @@ class StepResult: error: str | None = None +@dataclass +class ListStepResult: + """Result of verifying an upload via MFLIST ``listDir`` on the parent directory.""" + + device: str + prefix: str + dir_path: str + expect_path: str + row_count: int + duration_s: float + ok: bool + error: str | None = None + + @dataclass class SuiteResult: device: str ok: bool steps: list[StepResult] + list_steps: list[ListStepResult] = field(default_factory=list) def _sha256(data: bytes) -> str: @@ -239,6 +259,68 @@ def _run_prefix_roundtrip( pass +def _run_listdir_check( + iface: Any, + device: DeviceId, + prefix_name: str, + dir_path: str, + expect_path: str, + timeout_s: float, +) -> ListStepResult: + """Confirm ``expect_path`` appears in ``listDir(dir_path, depth=0)``.""" + t0 = time.perf_counter() + node = iface.localNode + rows = node.listDir(dir_path, depth=0, timeout_s=timeout_s) + dt = time.perf_counter() - t0 + if rows is None: + return ListStepResult( + device=device, + prefix=prefix_name, + dir_path=dir_path, + expect_path=expect_path, + row_count=0, + duration_s=dt, + ok=False, + error="listDir returned None (NAK, timeout, or firmware without MFLIST)", + ) + paths = [p for p, _ in rows] + if expect_path in paths: + return ListStepResult( + device=device, + prefix=prefix_name, + dir_path=dir_path, + expect_path=expect_path, + row_count=len(rows), + duration_s=dt, + ok=True, + error=None, + ) + base = Path(expect_path).name + matched = [p for p in paths if p.rstrip("/").endswith("/" + base) or p.rstrip("/").endswith(base)] + if matched: + return ListStepResult( + device=device, + prefix=prefix_name, + dir_path=dir_path, + expect_path=expect_path, + row_count=len(rows), + duration_s=dt, + ok=True, + error=None, + ) + sample = paths[:12] + return ListStepResult( + device=device, + prefix=prefix_name, + dir_path=dir_path, + expect_path=expect_path, + row_count=len(rows), + duration_s=dt, + ok=False, + error=f"expected path not in listing (sample {sample!r})", + ) + + def run_suite( device: DeviceId, port: str | None, @@ -249,6 +331,8 @@ def run_suite( xmodem_timeout: float, skip_prompt: bool, device_log: bool = False, + *, + skip_listdir: bool = False, ) -> SuiteResult: label = DEVICE_LABELS[device] print("\n" + "=" * 72) @@ -269,6 +353,7 @@ def run_suite( iface: SerialInterface | TCPInterface | None = None steps: list[StepResult] = [] + list_steps: list[ListStepResult] = [] try: stop_hb = threading.Event() hb_thread = threading.Thread(target=_connect_heartbeat, args=(stop_hb,), daemon=True) @@ -303,7 +388,7 @@ def run_suite( ) print(f"\nYou may disconnect {label} now (open failed).") _prompt("Disconnect complete?", skip_prompt) - return SuiteResult(device=device, ok=False, steps=steps) + return SuiteResult(device=device, ok=False, steps=steps, list_steps=list_steps) print(f" Transport + handshake finished in {link_dt:.1f}s", flush=True) @@ -321,6 +406,15 @@ def run_suite( print(f" {status} {r.direction} {r.duration_s:.2f}s sha256={r.sha256_expected[:16]}…") if not r.ok: print(f" error: {r.error}") + elif not skip_listdir: + parent = str(Path(path).parent) + print(f" listDir: {parent} (expect {path!r})", flush=True) + lr = _run_listdir_check(iface, device, prefix_name, parent, path, xmodem_timeout) + list_steps.append(lr) + ls = "PASS" if lr.ok else "FAIL" + print(f" {ls} listDir {lr.duration_s:.2f}s rows={lr.row_count}", flush=True) + if not lr.ok: + print(f" error: {lr.error}", flush=True) finally: if iface is not None: try: @@ -331,8 +425,8 @@ def run_suite( print(f"\nYou may disconnect {label} now.") _prompt("Disconnect complete?", skip_prompt) - suite_ok = all(s.ok for s in steps) - return SuiteResult(device=device, ok=suite_ok, steps=steps) + suite_ok = all(s.ok for s in steps) and all(s.ok for s in list_steps) + return SuiteResult(device=device, ok=suite_ok, steps=steps, list_steps=list_steps) # ── Optional XModem round-trip tracing (monkeypatch Node._xmodem_roundtrip) ─── @@ -391,8 +485,15 @@ def _emit_json(results: list[SuiteResult]) -> None: def ser(obj: Any) -> Any: if isinstance(obj, StepResult): return asdict(obj) + if isinstance(obj, ListStepResult): + return asdict(obj) if isinstance(obj, SuiteResult): - return {"device": obj.device, "ok": obj.ok, "steps": [asdict(s) for s in obj.steps]} + return { + "device": obj.device, + "ok": obj.ok, + "steps": [asdict(s) for s in obj.steps], + "list_steps": [asdict(s) for s in obj.list_steps], + } raise TypeError(type(obj)) print(json.dumps([ser(r) for r in results], indent=2)) @@ -447,6 +548,11 @@ def main() -> int: action="store_true", help="Print device firmware logs on stdout over the same API link (needs security.debug_log_api_enabled on the node).", ) + p.add_argument( + "--skip-listdir", + action="store_true", + help="Do not run MFLIST listDir checks after each successful round-trip (older firmware without MFLIST).", + ) args = p.parse_args() if args.verbose: @@ -478,6 +584,8 @@ def main() -> int: print(" --trace-xmodem: printing each XModem tx/rx line", flush=True) if args.device_log: print(" --device-log: firmware LogRecord -> stdout (same USB session)", flush=True) + if args.skip_listdir: + print(" --skip-listdir: MFLIST verification disabled", flush=True) if args.trace_xmodem: install_xmodem_trace() @@ -496,6 +604,7 @@ def main() -> int: xmodem_timeout=args.xmodem_timeout, skip_prompt=args.skip_prompt, device_log=args.device_log, + skip_listdir=args.skip_listdir, ) results.append(r) line = "PASS" if r.ok else "FAIL" @@ -507,7 +616,11 @@ def main() -> int: print("\n" + "=" * 72) print("SUMMARY (copy/paste)") for r in results: - print(f" {r.device}: {'PASS' if r.ok else 'FAIL'}") + extra = "" + if r.list_steps: + bad = [x for x in r.list_steps if not x.ok] + extra = f" listDir: {len(r.list_steps)} step(s)" + (f", {len(bad)} FAIL" if bad else " OK") + print(f" {r.device}: {'PASS' if r.ok else 'FAIL'}{extra}") any_fail = any(not r.ok for r in results) if any_fail: print("\nTriage:") @@ -519,5 +632,122 @@ def main() -> int: uninstall_xmodem_trace() +# ── Unit tests (no device): `pytest scripts/test_prefix_routing.py -q` from repo root ── + + +def test_run_listdir_check_exact_path(): + from unittest.mock import MagicMock + + node = MagicMock() + node.listDir.return_value = [("/__int__/meshforge-test/tdeck/int.bin", 10)] + iface = MagicMock() + iface.localNode = node + r = _run_listdir_check( + iface, + "tdeck", + "__int__", + "/__int__/meshforge-test/tdeck", + "/__int__/meshforge-test/tdeck/int.bin", + 1.0, + ) + assert r.ok is True + assert r.row_count == 1 + assert r.error is None + node.listDir.assert_called_once_with("/__int__/meshforge-test/tdeck", depth=0, timeout_s=1.0) + + +def test_run_listdir_check_basename_fallback(): + from unittest.mock import MagicMock + + node = MagicMock() + # Listed path differs from expected full virtual path but same basename — still accepted + node.listDir.return_value = [("/some/other/prefix/ext.bin", 10)] + iface = MagicMock() + iface.localNode = node + r = _run_listdir_check( + iface, + "techo", + "__ext__", + "/__ext__/meshforge-test/techo", + "/__ext__/meshforge-test/techo/ext.bin", + 2.0, + ) + assert r.ok is True + + +def test_run_listdir_check_fails_when_path_missing(): + from unittest.mock import MagicMock + + node = MagicMock() + node.listDir.return_value = [("/other/file.bin", 5)] + iface = MagicMock() + iface.localNode = node + r = _run_listdir_check( + iface, + "tdeck", + "__ext__", + "/__ext__/meshforge-test/tdeck", + "/__ext__/meshforge-test/tdeck/ext.bin", + 1.0, + ) + assert r.ok is False + assert r.error is not None + assert r.row_count == 1 + + +def test_run_listdir_check_none_from_node(): + from unittest.mock import MagicMock + + node = MagicMock() + node.listDir.return_value = None + iface = MagicMock() + iface.localNode = node + r = _run_listdir_check(iface, "rak4631", "__ext__", "/__ext__/x", "/__ext__/x/y.bin", 0.5) + assert r.ok is False + assert "None" in (r.error or "") + + +def test_emit_json_includes_list_steps(): + sr = SuiteResult( + device="tdeck", + ok=True, + steps=[ + StepResult( + device="tdeck", + prefix="__int__", + direction="roundtrip", + device_path="/p", + bytes_count=1, + sha256_expected="a", + sha256_got="a", + duration_s=0.1, + ok=True, + ) + ], + list_steps=[ + ListStepResult( + device="tdeck", + prefix="__int__", + dir_path="/", + expect_path="/p", + row_count=3, + duration_s=0.05, + ok=True, + ) + ], + ) + import io + import contextlib + + buf = io.StringIO() + with contextlib.redirect_stdout(buf): + _emit_json([sr]) + out = json.loads(buf.getvalue()) + assert len(out) == 1 + assert "list_steps" in out[0] + assert len(out[0]["list_steps"]) == 1 + assert out[0]["list_steps"][0]["expect_path"] == "/p" + + if __name__ == "__main__": raise SystemExit(main()) From 517bcb6e6cd5e6a358c461842484194589704864 Mon Sep 17 00:00:00 2001 From: Ben Allfree Date: Sun, 12 Apr 2026 22:17:51 -0700 Subject: [PATCH 5/7] refactor: update exit calls to use meshtastic.util.our_exit for consistency --- meshtastic/__main__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index a67a3ad7a..02919a46b 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -450,7 +450,7 @@ def onConnected(interface): node = interface.localNode rows = node.listDir(remote_dir, depth=depth) if rows is None: - our_exit("listDir failed", 1) + meshtastic.util.our_exit("listDir failed", 1) for path, sz in rows: print(f"{sz}\t{path}") @@ -469,7 +469,7 @@ def _upload_progress(sent, total): ok = node.uploadFile(src, dst, on_progress=_upload_progress) print(f"\r {'OK' if ok else 'FAILED'}: {src} → {dst} ") if not ok: - our_exit("Upload failed", 1) + meshtastic.util.our_exit("Upload failed", 1) else: print(f"Downloading {src} → {dst}") def _download_progress(received, _total): @@ -477,7 +477,7 @@ def _download_progress(received, _total): ok = node.downloadFile(src, dst, on_progress=_download_progress) print(f"\r {'OK' if ok else 'FAILED'}: {src} → {dst} ") if not ok: - our_exit("Download failed", 1) + meshtastic.util.our_exit("Download failed", 1) if args.reboot: closeNow = True From 63e230d0c4011ae08328fd90d1f832d1f8798964 Mon Sep 17 00:00:00 2001 From: Ben Allfree Date: Sun, 12 Apr 2026 22:18:03 -0700 Subject: [PATCH 6/7] fix: update packet number handling to use monotonic uint16 instead of 8-bit wrap --- meshtastic/node.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/meshtastic/node.py b/meshtastic/node.py index 98b632307..cf2be3bc1 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -1201,7 +1201,8 @@ def uploadFile(self, local_path: str, device_path: str, return False offset += len(chunk) - seq = (seq & 0xFF) + 1 + # Firmware uses monotonic uint16 packet numbers (not 8-bit XMODEM wrap). + seq += 1 if on_progress: on_progress(offset, total) @@ -1297,7 +1298,7 @@ def downloadFile(self, device_path: str, local_path: str, on_progress(sum(len(c) for c in chunks), -1) ack = xmodem_pb2.XModem() ack.control = XC.ACK - expected_seq = (expected_seq & 0xFF) + 1 + expected_seq += 1 else: ack = xmodem_pb2.XModem() ack.control = XC.NAK @@ -1370,7 +1371,7 @@ def listDir(self, device_path: str, depth: int = 0, timeout_s: float = _XMODEM_T chunks.append(chunk) ack = xmodem_pb2.XModem() ack.control = XC.ACK - expected_seq = (expected_seq & 0xFF) + 1 + expected_seq += 1 else: ack = xmodem_pb2.XModem() ack.control = XC.NAK From 25469fefcc16b858de1dc69b4863375d2c0dc7a8 Mon Sep 17 00:00:00 2001 From: Ben Allfree Date: Mon, 13 Apr 2026 16:23:04 -0700 Subject: [PATCH 7/7] --upload and --download globbing --- meshtastic/__main__.py | 163 ++++++++++-- meshtastic/file_transfer_cli.py | 275 +++++++++++++++++++++ meshtastic/tests/test_file_transfer_cli.py | 129 ++++++++++ 3 files changed, 541 insertions(+), 26 deletions(-) create mode 100644 meshtastic/file_transfer_cli.py create mode 100644 meshtastic/tests/test_file_transfer_cli.py diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 02919a46b..223668474 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -37,6 +37,7 @@ except ImportError as e: have_test = False +import meshtastic.file_transfer_cli as file_transfer_cli import meshtastic.ota import meshtastic.util import meshtastic.serial_interface @@ -454,28 +455,102 @@ def onConnected(interface): for path, sz in rows: print(f"{sz}\t{path}") - if args.cp: + if args.upload is not None: closeNow = True - src, dst = args.cp - import os node = interface.localNode - # Direction: if src is an existing local file → upload; otherwise → download - if os.path.isfile(src): - print(f"Uploading {src} → {dst}") - def _upload_progress(sent, total): - pct = 100 * sent // total - bar = '#' * (pct // 5) + '.' * (20 - pct // 5) - print(f"\r [{bar}] {pct}%", end="", flush=True) - ok = node.uploadFile(src, dst, on_progress=_upload_progress) - print(f"\r {'OK' if ok else 'FAILED'}: {src} → {dst} ") + local_tokens = list(args.upload[:-1]) + remote_base = args.upload[-1] + try: + upload_pairs = file_transfer_cli.plan_upload(local_tokens, remote_base) + except file_transfer_cli.FileTransferCliError as e: + meshtastic.util.our_exit(str(e), 1) + + def _upload_progress(sent, total): + pct = 100 * sent // total if total else 0 + bar = '#' * (pct // 5) + '.' * (20 - pct // 5) + print(f"\r [{bar}] {pct}%", end="", flush=True) + + for i, (lp, devp) in enumerate(upload_pairs, start=1): + print(f"Uploading ({i}/{len(upload_pairs)}) {lp} → {devp}") + ok = node.uploadFile(lp, devp, on_progress=_upload_progress) + print(f"\r {'OK' if ok else 'FAILED'}: {lp} → {devp} ") if not ok: meshtastic.util.our_exit("Upload failed", 1) - else: - print(f"Downloading {src} → {dst}") - def _download_progress(received, _total): - print(f"\r {received} bytes received...", end="", flush=True) - ok = node.downloadFile(src, dst, on_progress=_download_progress) - print(f"\r {'OK' if ok else 'FAILED'}: {src} → {dst} ") + + if args.download is not None: + closeNow = True + node = interface.localNode + rpath, lpath = args.download + lpath_abs = os.path.abspath(os.path.expanduser(lpath)) + if os.path.isdir(lpath_abs): + meshtastic.util.our_exit( + "ERROR: --download LOCAL must be a file path, not a directory (use --download-tree or --download-glob).", + 1, + ) + parent = os.path.dirname(lpath_abs) + if parent: + os.makedirs(parent, exist_ok=True) + print(f"Downloading {rpath} → {lpath}") + def _download_progress(received, _total): + print(f"\r {received} bytes received...", end="", flush=True) + ok = node.downloadFile(rpath, lpath_abs, on_progress=_download_progress) + print(f"\r {'OK' if ok else 'FAILED'}: {rpath} → {lpath_abs} ") + if not ok: + meshtastic.util.our_exit("Download failed", 1) + + if args.download_tree is not None: + closeNow = True + node = interface.localNode + rdir, ldir = args.download_tree + depth = 255 + rows = node.listDir(rdir.rstrip("/") or "/", depth=depth) + if rows is None: + meshtastic.util.our_exit("listDir failed", 1) + try: + tree_pairs = file_transfer_cli.plan_download_tree(rdir, ldir, rows) + except file_transfer_cli.FileTransferCliError as e: + meshtastic.util.our_exit(str(e), 1) + if not tree_pairs: + meshtastic.util.our_exit("No files matched for --download-tree", 1) + + def _download_progress(received, _total): + print(f"\r {received} bytes received...", end="", flush=True) + + for i, (devp, locp) in enumerate(tree_pairs, start=1): + os.makedirs(os.path.dirname(locp) or ".", exist_ok=True) + print(f"Downloading ({i}/{len(tree_pairs)}) {devp} → {locp}") + ok = node.downloadFile(devp, locp, on_progress=_download_progress) + print(f"\r {'OK' if ok else 'FAILED'}: {devp} → {locp} ") + if not ok: + meshtastic.util.our_exit("Download failed", 1) + + if args.download_glob is not None: + closeNow = True + node = interface.localNode + pattern, ldir = args.download_glob + try: + base, _rel = file_transfer_cli.split_remote_glob_pattern(pattern) + except file_transfer_cli.FileTransferCliError as e: + meshtastic.util.our_exit(str(e), 1) + depth = 255 + rows = node.listDir(base, depth=depth) + if rows is None: + meshtastic.util.our_exit("listDir failed", 1) + try: + glob_pairs = file_transfer_cli.plan_download_glob(pattern, ldir, rows) + except file_transfer_cli.FileTransferCliError as e: + meshtastic.util.our_exit(str(e), 1) + if not glob_pairs: + meshtastic.util.our_exit("No files matched for --download-glob", 1) + + def _download_progress(received, _total): + print(f"\r {received} bytes received...", end="", flush=True) + + for i, (devp, locp) in enumerate(glob_pairs, start=1): + os.makedirs(os.path.dirname(locp) or ".", exist_ok=True) + print(f"Downloading ({i}/{len(glob_pairs)}) {devp} → {locp}") + ok = node.downloadFile(devp, locp, on_progress=_download_progress) + print(f"\r {'OK' if ok else 'FAILED'}: {devp} → {locp} ") if not ok: meshtastic.util.our_exit("Download failed", 1) @@ -1912,17 +1987,53 @@ def addLocalActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars default=None ) - group.add_argument( - "--cp", + xfer = group.add_mutually_exclusive_group() + xfer.add_argument( + "--upload", + help=( + "Upload local files to the device via XModem (requires matching firmware). " + "Usage: --upload LOCAL [LOCAL ...] REMOTE. " + "Last argument is the device path: for a single plain file LOCAL, REMOTE is the exact destination file path; " + "otherwise REMOTE is a directory prefix and relative paths are preserved. " + "LOCAL may be files, directories (recursive), or globs (quote patterns with **). " + "Each device path must be <= 128 UTF-8 bytes." + ), + nargs="+", + metavar="SPEC", + default=None, + ) + xfer.add_argument( + "--download", + help=( + "Download one file from the device. " + "Usage: --download REMOTE_FILE LOCAL_FILE. " + "For a directory tree use --download-tree; for remote globs use --download-glob." + ), + nargs=2, + metavar=("REMOTE", "LOCAL"), + default=None, + ) + xfer.add_argument( + "--download-tree", help=( - "Copy a file to or from the device via XModem. " - "Usage: --cp . " - "If is an existing local file it is uploaded to on the device. " - "Otherwise is treated as a device path and downloaded to local . " - "Use /__ext__/ or /__int__/ prefixes to target external or internal flash." + "Download a full remote directory tree via MFLIST + XModem. " + "Usage: --download-tree REMOTE_DIR LOCAL_DIR. " + "Only rows with size > 0 are treated as files." ), nargs=2, - metavar=("SRC", "DST"), + metavar=("REMOTE_DIR", "LOCAL_DIR"), + default=None, + ) + xfer.add_argument( + "--download-glob", + help=( + "Download remote files matching a glob (MFLIST at the literal base + filter). " + "Usage: --download-glob 'REMOTE_PATTERN' LOCAL_DIR. " + "Pattern must include * ? or [; ** matches across / (relative to the literal base)." + ), + nargs=2, + metavar=("REMOTE_PATTERN", "LOCAL_DIR"), + default=None, ) group.add_argument( diff --git a/meshtastic/file_transfer_cli.py b/meshtastic/file_transfer_cli.py new file mode 100644 index 000000000..cb8730453 --- /dev/null +++ b/meshtastic/file_transfer_cli.py @@ -0,0 +1,275 @@ +"""CLI helpers for XModem upload/download (multi-file, globs, listDir-based downloads).""" + +from __future__ import annotations + +import glob +import os +import posixpath +import re +from typing import Iterable, List, Optional, Sequence, Tuple + +# Must match meshtastic.node.MeshInterface._XMODEM_BUFFER_MAX (path in first packet). +XMODEM_DEVICE_PATH_UTF8_MAX = 128 + + +def device_posix_join(base: str, *parts: str) -> str: + """Join device path segments with forward slashes; collapse duplicate slashes.""" + segs: List[str] = [] + for p in (base,) + parts: + if not p: + continue + for seg in p.replace("\\", "/").split("/"): + if seg == "" or seg == ".": + continue + if seg == "..": + if segs: + segs.pop() + continue + segs.append(seg) + return "/" + "/".join(segs) if segs else "/" + + +def device_path_utf8_len(path: str) -> int: + return len(path.encode("utf-8")) + + +def check_device_paths(paths: Iterable[str]) -> Optional[str]: + """Return first error message if any path exceeds XModem limit, else None.""" + for p in paths: + n = device_path_utf8_len(p) + if n > XMODEM_DEVICE_PATH_UTF8_MAX: + return ( + f"Device path exceeds {XMODEM_DEVICE_PATH_UTF8_MAX} UTF-8 bytes ({n}): {p!r}" + ) + return None + + +def _first_glob_magic_index(s: str) -> int: + for i, c in enumerate(s): + if c in "*?[": + return i + return -1 + + +def _expand_local_token(token: str) -> Tuple[List[str], str]: + """ + Expand one local CLI token to absolute file paths and a strip_prefix for relpath. + + Returns (sorted_unique_files, strip_prefix). + """ + exp = os.path.expanduser(token) + idx = _first_glob_magic_index(exp) + + if idx >= 0: + raw = sorted(set(glob.glob(exp, recursive=True))) + files = [os.path.normpath(os.path.abspath(p)) for p in raw if os.path.isfile(p)] + if not files: + raise FileTransferCliError(f"Glob matched no files: {token!r}") + literal = exp[:idx] + if not literal.strip(): + anchor = os.path.abspath(".") + else: + anchor = os.path.normpath(os.path.abspath(literal)) + return files, anchor + + if os.path.isfile(exp): + p = os.path.normpath(os.path.abspath(exp)) + return [p], os.path.dirname(p) + + if os.path.isdir(exp): + root = os.path.normpath(os.path.abspath(exp)) + out: List[str] = [] + for dirpath, _dirnames, filenames in os.walk(root): + for name in filenames: + fp = os.path.join(dirpath, name) + if os.path.isfile(fp): + out.append(os.path.normpath(os.path.abspath(fp))) + return sorted(set(out)), root + + raise FileTransferCliError(f"Not a file, directory, or glob: {token!r}") + + +class FileTransferCliError(Exception): + pass + + +def plan_upload(local_tokens: Sequence[str], remote_base: str) -> List[Tuple[str, str]]: + """ + Build (local_abs_path, device_path) for each upload. + + Rules: + - Exactly one local path token that is a plain file, and expansion yields one file: + device path is ``remote_base`` as given. + - Otherwise: device path is device_posix_join(remote_base, relpath) where relpath uses + os.path.relpath(local_file, strip_prefix) with forward slashes. + """ + if not local_tokens: + raise FileTransferCliError("--upload requires at least LOCAL and REMOTE") + remote_base = remote_base.replace("\\", "/") + if remote_base and not remote_base.startswith("/"): + remote_base = "/" + remote_base.lstrip("/") + + single_token_plain_file = ( + len(local_tokens) == 1 + and _first_glob_magic_index(local_tokens[0]) < 0 + and os.path.isfile(os.path.expanduser(local_tokens[0])) + ) + + entries: List[Tuple[str, str]] = [] + for tok in local_tokens: + files, strip = _expand_local_token(tok) + for f in files: + entries.append((f, strip)) + + if not entries: + raise FileTransferCliError("No files to upload") + + seen: set = set() + deduped: List[Tuple[str, str]] = [] + for f, strip in entries: + if f in seen: + continue + seen.add(f) + deduped.append((f, strip)) + + use_exact_remote = len(deduped) == 1 and single_token_plain_file + out: List[Tuple[str, str]] = [] + for local_path, strip_prefix in deduped: + if use_exact_remote: + dev = remote_base + else: + rel = os.path.relpath(local_path, strip_prefix) + rel_posix = rel.replace(os.sep, "/") + dev = device_posix_join(remote_base, rel_posix) + out.append((local_path, dev)) + + err = check_device_paths(dev for _loc, dev in out) + if err: + raise FileTransferCliError(err) + return out + + +def split_remote_glob_pattern(pattern: str) -> Tuple[str, str]: + """ + Split ``pattern`` into (list_dir_base, relative_glob) for MFLIST + filtering. + + ``list_dir_base`` is the longest leading substring with no glob metacharacters, + normalized to a POSIX path without trailing slash (except root ``/``). + ``relative_glob`` is the remainder (may include ``**``). + """ + pattern = pattern.replace("\\", "/") + idx = _first_glob_magic_index(pattern) + if idx < 0: + raise FileTransferCliError( + "--download-glob pattern must contain at least one of * ? [" + ) + if idx == 0: + base = "/" + rel = pattern.lstrip("/") + else: + literal = pattern[:idx] + rel = pattern[idx:].lstrip("/") + if not rel: + raise FileTransferCliError("Invalid --download-glob pattern") + base = posixpath.normpath(literal.rstrip("/") or "/") + if not base.startswith("/"): + base = "/" + base + return base, rel + + +def remote_rel_glob_to_regex(rel_glob: str) -> re.Pattern[str]: + """ + Match a path relative to list base, using ``/`` separators. + ``**`` matches across directories; ``*`` and ``?`` do not cross ``/``. + """ + rel_glob = rel_glob.replace("\\", "/") + out: List[str] = ["\\A"] + i = 0 + while i < len(rel_glob): + if rel_glob[i : i + 2] == "**": + if i + 2 < len(rel_glob) and rel_glob[i + 2] == "/": + out.append("(?:.*/)?") + i += 3 + else: + out.append(".*") + i += 2 + elif rel_glob[i] == "*": + out.append("[^/]*") + i += 1 + elif rel_glob[i] == "?": + out.append("[^/]") + i += 1 + elif rel_glob[i] in r".^$+{}[]|()\\": + out.append(re.escape(rel_glob[i])) + i += 1 + else: + out.append(re.escape(rel_glob[i])) + i += 1 + out.append("\\Z") + return re.compile("".join(out)) + + +def plan_download_tree( + remote_dir: str, local_dir: str, rows: Sequence[Tuple[str, int]] +) -> List[Tuple[str, str]]: + """From listDir rows, build (device_path, local_abs_path) for every file.""" + remote_dir = remote_dir.rstrip("/").replace("\\", "/") + if not remote_dir: + remote_dir = "/" + if not remote_dir.startswith("/"): + remote_dir = "/" + remote_dir + + local_root = os.path.abspath(os.path.expanduser(local_dir)) + out: List[Tuple[str, str]] = [] + + for path, sz in rows: + if sz <= 0: + continue + path = path.replace("\\", "/") + if not path.startswith(remote_dir): + continue + tail = path[len(remote_dir) :].lstrip("/") + if not tail: + continue + local_path = os.path.join(local_root, *tail.split("/")) + out.append((path, os.path.normpath(local_path))) + + err = check_device_paths(dev for dev, _l in out) + if err: + raise FileTransferCliError(err) + return out + + +def plan_download_glob( + pattern: str, local_dir: str, rows: Sequence[Tuple[str, int]] +) -> List[Tuple[str, str]]: + base, rel_pat = split_remote_glob_pattern(pattern) + rx = remote_rel_glob_to_regex(rel_pat) + base_n = base.rstrip("/") or "/" + + local_root = os.path.abspath(os.path.expanduser(local_dir)) + out: List[Tuple[str, str]] = [] + + for path, sz in rows: + if sz <= 0: + continue + path = path.replace("\\", "/") + if base_n == "/": + if not path.startswith("/"): + continue + rel = path.lstrip("/") + else: + if not (path == base_n or path.startswith(base_n + "/")): + continue + rel = path[len(base_n) :].lstrip("/") + if not rel: + continue + if not rx.match(rel): + continue + local_path = os.path.join(local_root, *rel.split("/")) + out.append((path, os.path.normpath(local_path))) + + err = check_device_paths(dev for dev, _l in out) + if err: + raise FileTransferCliError(err) + return out diff --git a/meshtastic/tests/test_file_transfer_cli.py b/meshtastic/tests/test_file_transfer_cli.py new file mode 100644 index 000000000..e442fd099 --- /dev/null +++ b/meshtastic/tests/test_file_transfer_cli.py @@ -0,0 +1,129 @@ +"""Tests for meshtastic.file_transfer_cli.""" + +import os +import tempfile + +import pytest + +from meshtastic.file_transfer_cli import ( + FileTransferCliError, + XMODEM_DEVICE_PATH_UTF8_MAX, + check_device_paths, + device_posix_join, + plan_download_glob, + plan_download_tree, + plan_upload, + remote_rel_glob_to_regex, + split_remote_glob_pattern, +) + + +def test_device_posix_join(): + assert device_posix_join("/__ext__/d", "a/b") == "/__ext__/d/a/b" + assert device_posix_join("/__ext__/d/", "a", "b") == "/__ext__/d/a/b" + assert device_posix_join("/", "x") == "/x" + + +def test_check_device_paths(): + assert check_device_paths(["/short"]) is None + longp = "/" + "a" * (XMODEM_DEVICE_PATH_UTF8_MAX + 1) + assert check_device_paths([longp]) is not None + + +def test_plan_upload_single_plain_file(): + with tempfile.TemporaryDirectory() as d: + f = os.path.join(d, "one.bin") + with open(f, "wb") as fp: + fp.write(b"x") + pairs = plan_upload([f], "/__ext__/out.bin") + assert pairs == [(f, "/__ext__/out.bin")] + + +def test_plan_upload_glob_recursive(): + with tempfile.TemporaryDirectory() as d: + os.makedirs(os.path.join(d, "p", "q")) + f1 = os.path.join(d, "p", "a.txt") + f2 = os.path.join(d, "p", "q", "b.txt") + for fp in (f1, f2): + with open(fp, "w") as fh: + fh.write("x") + pat = os.path.join(d, "p", "**", "*.txt") + pairs = plan_upload([pat], "/__ext__/out") + assert len(pairs) == 2 + devs = sorted(dev for _loc, dev in pairs) + assert devs == ["/__ext__/out/a.txt", "/__ext__/out/q/b.txt"] + + +def test_plan_upload_directory_preserves_layout(): + with tempfile.TemporaryDirectory() as d: + sub = os.path.join(d, "a", "b") + os.makedirs(sub) + f1 = os.path.join(sub, "f.txt") + with open(f1, "w") as fp: + fp.write("hi") + pairs = plan_upload([d], "/__ext__/dst") + assert len(pairs) == 1 + loc, dev = pairs[0] + assert loc == f1 + assert dev == "/__ext__/dst/a/b/f.txt" + + +def test_plan_upload_dedupe(): + with tempfile.TemporaryDirectory() as d: + f = os.path.join(d, "x.bin") + with open(f, "wb") as fp: + fp.write(b"x") + pairs = plan_upload([f, f], "/__ext__/d") + assert len(pairs) == 1 + + +def test_plan_upload_path_too_long(): + with tempfile.TemporaryDirectory() as d: + f = os.path.join(d, "x.bin") + with open(f, "wb") as fp: + fp.write(b"x") + long_base = "/" + "z" * 200 + with pytest.raises(FileTransferCliError): + plan_upload([f], long_base) + + +def test_split_remote_glob_pattern(): + assert split_remote_glob_pattern("/__ext__/bbs/*.md") == ("/__ext__/bbs", "*.md") + base, rel = split_remote_glob_pattern("*.md") + assert base == "/" + assert rel == "*.md" + + +def test_remote_rel_glob_to_regex(): + rx = remote_rel_glob_to_regex("**/*.png") + assert rx.match("a/b/c.png") + assert rx.match("x.png") + assert not rx.match("a/b/c.jpg") + + +def test_plan_download_tree(): + rows = [ + ("/__ext__/d/a.txt", 3), + ("/__ext__/d/sub/b.bin", 2), + ("/__ext__/d/emptydir", 0), + ] + with tempfile.TemporaryDirectory() as ld: + pairs = plan_download_tree("/__ext__/d", ld, rows) + assert len(pairs) == 2 + by = {os.path.basename(lp): dp for dp, lp in pairs} + assert by["a.txt"] == "/__ext__/d/a.txt" + assert by["b.bin"] == "/__ext__/d/sub/b.bin" + + +def test_plan_download_glob(): + rows = [ + ("/__ext__/bbs/kb/one.md", 10), + ("/__ext__/bbs/kb/two.txt", 5), + ("/__ext__/bbs/other/x.md", 3), + ] + with tempfile.TemporaryDirectory() as ld: + pairs = plan_download_glob("/__ext__/bbs/**/*.md", ld, rows) + assert len(pairs) == 2 + locs = sorted(lp for _dp, lp in pairs) + assert locs[0].endswith(os.path.join("kb", "one.md")) + assert locs[1].endswith(os.path.join("other", "x.md"))