Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 82 additions & 29 deletions src/meshcore_cli/meshcore_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import serial.tools.list_ports
from pathlib import Path
import traceback
from prompt_toolkit.application.current import get_app_or_none
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.shortcuts import CompleteStyle
from prompt_toolkit.shortcuts import print_formatted_text
from prompt_toolkit.completion import NestedCompleter, PathCompleter
from prompt_toolkit.completion import CompleteEvent, Completer, Completion
from prompt_toolkit.history import FileHistory
Expand All @@ -22,6 +24,7 @@
from prompt_toolkit.shortcuts import radiolist_dialog
from prompt_toolkit.completion.word_completer import WordCompleter
from prompt_toolkit.document import Document
from prompt_toolkit.utils import get_cwidth

try:
from bleak import BleakScanner, BleakClient
Expand Down Expand Up @@ -98,26 +101,55 @@ def escape_ansi(line):
ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', line)

def print_one_line_above(str):
""" prints a string above current line """
width = os.get_terminal_size().columns
stringlen = len(escape_ansi(str))-1
lines = divmod(stringlen, width)[0] + 1
print("\u001B[s", end="") # Save current cursor position
print("\u001B[A", end="") # Move cursor up one line
print("\u001B[999D", end="") # Move cursor to beginning of line
for _ in range(lines):
print("\u001B[S", end="") # Scroll up/pan window down 1 line
print("\u001B[L", end="") # Insert new line
for _ in range(lines - 1):
print("\u001B[A", end="") # Move cursor up one line
print(str, end="") # Print output status msg
print("\u001B[u", end="", flush=True) # Jump back to saved cursor position

def print_above(str):
lines = str.split('\n')
for l in lines:
print_one_line_above(l)
def truncate_to_display_width(text, max_width):
"""Return the longest prefix of text whose terminal display width is <= max_width."""
if max_width <= 0:
return ""
lo, hi = 0, len(text)
while lo < hi:
mid = (lo + hi + 1) // 2
if get_cwidth(text[:mid]) <= max_width:
lo = mid
else:
hi = mid - 1
return text[:lo]

def print_one_line_above(text):
"""Status line(s) above the prompt; delegates to print_above."""
print_above(text)

def print_above(text):
"""
Show text above the current input line.

When prompt_toolkit has a running Application (interactive prompt), plain
print() and DEC save/restore fight the renderer — use print_formatted_text,
which suspends the UI, writes via the Output layer (UTF-8), and redraws.

With no active app (scripts, pipes), use CSI save/move/clear/restore only.
"""
if text == "":
return
if get_app_or_none() is not None:
print_formatted_text(ANSI(text), end="\n")
return
lines = text.split("\n")
n = len(lines)
chunks = ["\033[s", f"\033[{n}A"]
for i, line in enumerate(lines):
chunks.append("\033[1G\033[2K")
chunks.append(line)
if i < n - 1:
chunks.append("\033[B")
chunks.append("\033[u")
combined = "".join(chunks)
enc = getattr(sys.stdout, "encoding", None) or "utf-8"
try:
sys.stdout.buffer.write(combined.encode(enc, errors="replace"))
sys.stdout.buffer.flush()
except (AttributeError, TypeError):
sys.stdout.write(combined)
sys.stdout.flush()

async def process_event_message(mc, ev, json_output, end="\n", above=False):
""" display incoming message """
Expand Down Expand Up @@ -252,9 +284,30 @@ async def handle_log_rx(event):

if chan_name != "" :
width = os.get_terminal_size().columns
cars = width - 13 - len(event.payload["path"]) - len(chan_name) - 1
dispmsg = message.replace("\n","")[0:cars]
txt = f"{ANSI_LIGHT_GRAY}{chan_name} {ANSI_DGREEN}{dispmsg+(cars-len(dispmsg))*' '} {ANSI_START}{width-11-len(event.payload['path'])}G{ANSI_YELLOW}[{event.payload['path']}]{ANSI_LIGHT_GRAY}{event.payload['snr']:6,.2f}{event.payload['rssi']:4}{ANSI_END}"
path = event.payload["path"]
snr_str = f"{event.payload['snr']:6,.2f}"
rssi_str = f"{event.payload['rssi']:4}"
rhs_plain = f"[{path}]{snr_str}{rssi_str}"
rhs_w = get_cwidth(rhs_plain)
rhs_col = max(1, width - rhs_w + 1)
prefix_plain = chan_name + " "
prefix_w = get_cwidth(prefix_plain)
trailing_before_csi_w = 1
max_msg_w = max(0, rhs_col - 1 - prefix_w - trailing_before_csi_w)
raw_msg = message.replace("\n", "")
dispmsg = truncate_to_display_width(raw_msg, max_msg_w)
while True:
left_w = get_cwidth(prefix_plain + dispmsg + " ")
if left_w >= rhs_col - 1:
break
nxt = dispmsg + " "
if get_cwidth(prefix_plain + nxt + " ") > rhs_col - 1:
break
dispmsg = nxt
txt = (
f"{ANSI_LIGHT_GRAY}{prefix_plain}{ANSI_DGREEN}{dispmsg} "
f"{ANSI_START}{rhs_col}G{ANSI_YELLOW}[{path}]{ANSI_LIGHT_GRAY}{snr_str}{rssi_str}{ANSI_END}"
)

if handle_message.above:
print_above(txt)
Expand Down Expand Up @@ -2785,7 +2838,7 @@ async def next_cmd(mc, cmds, json_output=False):
sess = PromptSession("Password: ", is_password=True)
password = await sess.prompt_async()

timeout = 0 if not "timeout" in contact else contact["timeout"]
timeout = 0 if not "timeout" in contact else contact["timeout"]
res = await mc.commands.send_login_sync(contact, password, timeout = timeout)
logger.debug(res)
if res is None:
Expand Down Expand Up @@ -3373,7 +3426,7 @@ async def next_cmd(mc, cmds, json_output=False):
else:
if json_output:
print(json.dumps(res.payload))
else :
else:
path_len = res.payload['path_len']
if (path_len == 0) :
print("0 hop")
Expand Down Expand Up @@ -3940,7 +3993,7 @@ def get_help_for (cmdname, context="line") :
elif cmdname == "trace" or cmdname == "tr" :
print("""Trace

Trace is a command used to get signal information (SNR) along a path.
Trace is a command used to get signal information (SNR) along a path.

Basic call to trace takes the path to follow as an argument, specifying each repeater along the path with its hash (separated or not with a comma).

Expand Down Expand Up @@ -3993,17 +4046,17 @@ def get_help_for (cmdname, context="line") :
* advert_path : path taken by an advert
* disc_path : discover in and out path for a contact

When using change_path, you specify manually the path to the contact. Path is given as an hex string containing hashes for all repeaters in the way (you can use commas to separate hashes). By default hash_size will be the one of the node. If using commas, it will be guessed from first hash. You can also use a colon to specify path_hash_mode.
When using change_path, you specify manually the path to the contact. Path is given as an hex string containing hashes for all repeaters in the way (you can use commas to separate hashes). By default hash_size will be the one of the node. If using commas, it will be guessed from first hash. You can also use a colon to specify path_hash_mode.

If you want to set the path for a node through 112233 445566 778899, you can use
If you want to set the path for a node through 112233 445566 778899, you can use
- 114477:0 or 11,44,77 for one byte hash
- 112244557788:1 or 1122,4455,7788 for two byte hash
- 112233445566778899:2 or 112233,445566,778899 for three byte hash

To set an empty path use 0.

To get the path for a contact, you can use three commands:
- path will gives you the path stored in the node.
- path will gives you the path stored in the node.
- You can also get a path from a key using advert_path which will give you the path taken for last advert from that node to come.
disc_path will send a path request and give you input and output path for a node.

Expand Down