Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions meshtastic/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,31 @@ def onConnected(interface):
# Must turn off encryption on primary channel
interface.getNode(args.dest, **getNode_kwargs).turnOffEncryptionOnPrimaryChannel()

if args.cp:
closeNow = True
src, dst = args.cp
import os
node = interface.localNode
# Direction: if src is an existing local file → upload; otherwise → download
if os.path.isfile(src):
print(f"Uploading {src} → {dst}")
def _upload_progress(sent, total):
pct = 100 * sent // total
bar = '#' * (pct // 5) + '.' * (20 - pct // 5)
print(f"\r [{bar}] {pct}%", end="", flush=True)
ok = node.uploadFile(src, dst, on_progress=_upload_progress)
print(f"\r {'OK' if ok else 'FAILED'}: {src} → {dst} ")
if not ok:
our_exit("Upload failed", 1)
else:
print(f"Downloading {src} → {dst}")
def _download_progress(received, _total):
print(f"\r {received} bytes received...", end="", flush=True)
ok = node.downloadFile(src, dst, on_progress=_download_progress)
print(f"\r {'OK' if ok else 'FAILED'}: {src} → {dst} ")
if not ok:
our_exit("Download failed", 1)

if args.reboot:
closeNow = True
waitForAckNak = True
Expand Down Expand Up @@ -1876,6 +1901,19 @@ def addLocalActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars
default=None
)

group.add_argument(
"--cp",
help=(
"Copy a file to or from the device via XModem. "
"Usage: --cp <src> <dst>. "
"If <src> is a local file it is uploaded to <dst> on the device. "
"If <src> starts with / it is downloaded from the device to <dst> locally. "
"Use /__ext__/ or /__int__/ prefixes to target external or internal flash."
),
nargs=2,
metavar=("SRC", "DST"),
)

return parser

def addRemoteActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
Expand Down
321 changes: 320 additions & 1 deletion meshtastic/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

import base64
import logging
import threading
import time

from typing import Optional, Union, List

from meshtastic.protobuf import admin_pb2, apponly_pb2, channel_pb2, config_pb2, localonly_pb2, mesh_pb2, portnums_pb2
from meshtastic.protobuf import admin_pb2, apponly_pb2, channel_pb2, config_pb2, localonly_pb2, mesh_pb2, portnums_pb2, xmodem_pb2
from meshtastic.util import (
Timeout,
camel_to_snake,
Expand Down Expand Up @@ -1076,3 +1077,321 @@ def get_channels_with_hash(self):
"hash": hash_val,
})
return result

# ── XModem file transfer ───────────────────────────────────────────────────

_XMODEM_BUFFER_MAX = 128 # meshtastic_XModem_buffer_t::bytes
_XMODEM_MAX_RETRY = 10
_XMODEM_TIMEOUT_S = 5.0
_MFLIST_PREFIX = "MFLIST "

@staticmethod
def _xmodem_crc16(data: bytes) -> int:
"""CRC-16-CCITT matching the firmware's XModemAdapter::crc16_ccitt."""
crc = 0
for b in data:
crc = ((crc >> 8) | (crc << 8)) & 0xFFFF
crc ^= b
crc ^= ((crc & 0xFF) >> 4) & 0xFFFF
crc ^= ((crc << 8) << 4) & 0xFFFF
crc ^= (((crc & 0xFF) << 4) << 1) & 0xFFFF
return crc & 0xFFFF

def _xmodem_send(self, xm: xmodem_pb2.XModem) -> None:
"""Wrap an XModem protobuf in ToRadio and send to the device."""
tr = mesh_pb2.ToRadio()
tr.xmodemPacket.CopyFrom(xm)
self.iface._sendToRadio(tr)

def _xmodem_roundtrip(self, xm: xmodem_pb2.XModem,
timeout_s: float = _XMODEM_TIMEOUT_S) -> Optional[xmodem_pb2.XModem]:
"""Subscribe to xmodempacket, send, then wait for response (subscribe-first to avoid race)."""
from pubsub import pub # type: ignore[import-untyped]
event = threading.Event()
result: list = [None]

def _on_xmodem(packet, interface):
result[0] = packet
event.set()

# Subscribe BEFORE sending so we don't miss a fast response
pub.subscribe(_on_xmodem, "meshtastic.xmodempacket")
try:
self._xmodem_send(xm)
event.wait(timeout=timeout_s)
finally:
try:
pub.unsubscribe(_on_xmodem, "meshtastic.xmodempacket")
except Exception:
pass

return result[0]

def uploadFile(self, local_path: str, device_path: str,
on_progress=None, timeout_s: float = _XMODEM_TIMEOUT_S) -> bool:
"""Upload a local file to the device via XModem.

Args:
local_path: Path to the local file to upload.
device_path: Destination path on the device. Use ``/__ext__/`` or
``/__int__/`` prefixes to target external or internal
flash respectively; bare ``/`` paths go to InternalFS.
on_progress: Optional callback ``fn(bytes_sent, total_bytes)``.
timeout_s: Per-packet ACK timeout in seconds.

Returns:
True on success, False on failure.

Example::

iface.localNode.uploadFile("wordle.bin", "/__ext__/bbs/kb/wordle.bin")
"""
if self.noProto:
logger.warning("uploadFile: protocol disabled (noProto)")
return False

try:
data = open(local_path, "rb").read()
except OSError as e:
logger.error(f"uploadFile: cannot read {local_path}: {e}")
return False

XC = xmodem_pb2.XModem

# SOH seq=0 — filename handshake
xm = xmodem_pb2.XModem()
xm.control = XC.SOH
xm.seq = 0
xm.buffer = device_path.encode("utf-8")[: self._XMODEM_BUFFER_MAX]

for attempt in range(self._XMODEM_MAX_RETRY):
resp = self._xmodem_roundtrip(xm, timeout_s)
logger.debug(f"uploadFile: OPEN attempt {attempt+1} resp={resp.control if resp else None}")
if resp and resp.control == XC.ACK:
break
if attempt == self._XMODEM_MAX_RETRY - 1:
logger.error(f"uploadFile: OPEN rejected for {device_path}")
return False

# STX data packets
seq = 1
offset = 0
total = len(data)
while offset < total:
chunk = data[offset: offset + self._XMODEM_BUFFER_MAX]
crc = self._xmodem_crc16(chunk)

xm = xmodem_pb2.XModem()
xm.control = XC.STX
xm.seq = seq
xm.crc16 = crc
xm.buffer = bytes(chunk)

acked = False
for retry in range(self._XMODEM_MAX_RETRY):
resp = self._xmodem_roundtrip(xm, timeout_s)
if resp and resp.control == XC.ACK:
acked = True
break
if resp and resp.control == XC.CAN:
logger.error(f"uploadFile: transfer cancelled at offset {offset}")
return False
if not acked:
logger.error(f"uploadFile: no ACK for seq {seq} at offset {offset}")
return False

offset += len(chunk)
# Firmware uses monotonic uint16 packet numbers (not 8-bit XMODEM wrap).
seq += 1
if on_progress:
on_progress(offset, total)

# EOT
xm = xmodem_pb2.XModem()
xm.control = XC.EOT
for attempt in range(self._XMODEM_MAX_RETRY):
resp = self._xmodem_roundtrip(xm, timeout_s)
if resp and resp.control == XC.ACK:
logger.debug(f"uploadFile: {local_path} → {device_path} complete ({total} bytes)")
return True
logger.error(f"uploadFile: EOT not acknowledged for {device_path}")
return False

def _xmodem_wait_next(self, timeout_s: float = _XMODEM_TIMEOUT_S) -> Optional[xmodem_pb2.XModem]:
"""Wait for the next xmodem packet without sending anything first."""
from pubsub import pub # type: ignore[import-untyped]
event = threading.Event()
result: list = [None]

def _cb(packet, interface):
result[0] = packet
event.set()

pub.subscribe(_cb, "meshtastic.xmodempacket")
try:
event.wait(timeout=timeout_s)
finally:
try:
pub.unsubscribe(_cb, "meshtastic.xmodempacket")
except Exception:
pass
return result[0]

def downloadFile(self, device_path: str, local_path: str,
on_progress=None, timeout_s: float = _XMODEM_TIMEOUT_S) -> bool:
"""Download a file from the device via XModem.

Args:
device_path: Source path on the device (``/__ext__/``, ``/__int__/``,
or bare ``/`` for InternalFS).
local_path: Destination path on the local filesystem.
on_progress: Optional callback ``fn(bytes_received, total_bytes)``.
``total_bytes`` is -1 (unknown) during transfer.
timeout_s: Per-packet response timeout in seconds.

Returns:
True on success, False on failure.

Example::

iface.localNode.downloadFile("/__ext__/bbs/kb/wordle.bin", "wordle.bin")
"""
if self.noProto:
logger.warning("downloadFile: protocol disabled (noProto)")
return False

XC = xmodem_pb2.XModem

# STX seq=0 — request device to transmit the file
xm = xmodem_pb2.XModem()
xm.control = XC.STX
xm.seq = 0
xm.buffer = device_path.encode("utf-8")[: self._XMODEM_BUFFER_MAX]

chunks: list = []
expected_seq = 1

# Subscribe first, then send, so we don't miss the first response
resp = self._xmodem_roundtrip(xm, timeout_s)

while True:
if resp is None:
logger.error(f"downloadFile: timeout waiting for data from {device_path}")
return False

if resp.control == XC.EOT:
# Final ACK — no more packets expected after this
ack = xmodem_pb2.XModem()
ack.control = XC.ACK
self._xmodem_send(ack)
break

if resp.control in (XC.NAK, XC.CAN):
logger.error(f"downloadFile: device sent error control for {device_path}")
return False

if resp.control in (XC.SOH, XC.STX):
chunk = bytes(resp.buffer)
if resp.seq == expected_seq and self._xmodem_crc16(chunk) == resp.crc16:
chunks.append(chunk)
if on_progress:
on_progress(sum(len(c) for c in chunks), -1)
ack = xmodem_pb2.XModem()
ack.control = XC.ACK
expected_seq += 1
else:
ack = xmodem_pb2.XModem()
ack.control = XC.NAK

# Subscribe BEFORE sending ACK/NAK so we don't miss the next packet
resp = self._xmodem_roundtrip(ack, timeout_s)
continue

# Unexpected control — skip
resp = self._xmodem_wait_next(timeout_s)

data = b"".join(chunks)
try:
with open(local_path, "wb") as f:
f.write(data)
except OSError as e:
logger.error(f"downloadFile: cannot write {local_path}: {e}")
return False

logger.debug(f"downloadFile: {device_path} → {local_path} complete ({len(data)} bytes)")
return True

def listDir(self, device_path: str, depth: int = 0, timeout_s: float = _XMODEM_TIMEOUT_S):
"""List files on the device under ``device_path`` via XMODEM ``MFLIST`` (matching firmware).

Args:
device_path: Directory on the device (``/__ext__/``, ``/__int__/``, or bare ``/``).
depth: Recursion depth (0 = files in that directory only; each increment adds one tree level).
timeout_s: Per-packet timeout.

Returns:
List of ``(path, size_bytes)`` for each file, or ``None`` on failure.
Lines starting with ``#`` in the payload are ignored (comments / truncation markers).
"""
if self.noProto:
logger.warning("listDir: protocol disabled (noProto)")
return None

d = max(0, min(255, int(depth)))
cmd = f"{self._MFLIST_PREFIX}{device_path} {d}".encode("utf-8")[: self._XMODEM_BUFFER_MAX]
XC = xmodem_pb2.XModem

xm = xmodem_pb2.XModem()
xm.control = XC.SOH
xm.seq = 0
xm.buffer = bytes(cmd)

chunks: list = []
expected_seq = 1
resp = self._xmodem_roundtrip(xm, timeout_s)

while True:
if resp is None:
logger.error(f"listDir: timeout waiting for data from {device_path}")
return None

if resp.control == XC.EOT:
ack = xmodem_pb2.XModem()
ack.control = XC.ACK
self._xmodem_send(ack)
break

if resp.control in (XC.NAK, XC.CAN):
logger.error(f"listDir: device rejected or cancelled for {device_path}")
return None

if resp.control in (XC.SOH, XC.STX):
chunk = bytes(resp.buffer)
if resp.seq == expected_seq and self._xmodem_crc16(chunk) == resp.crc16:
chunks.append(chunk)
ack = xmodem_pb2.XModem()
ack.control = XC.ACK
expected_seq += 1
else:
ack = xmodem_pb2.XModem()
ack.control = XC.NAK

resp = self._xmodem_roundtrip(ack, timeout_s)
continue

resp = self._xmodem_wait_next(timeout_s)

raw = b"".join(chunks).decode("utf-8", errors="replace")
out: list = []
for line in raw.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "\t" not in line:
continue
path, sz = line.split("\t", 1)
try:
out.append((path, int(sz)))
except ValueError:
logger.debug("listDir: skip unparsable line %r", line)
return out
Loading