From bf69a4fd35378d83b5cc6a32714edcaeccaefb9c Mon Sep 17 00:00:00 2001 From: "David A. van Leeuwen" Date: Mon, 13 Apr 2026 21:18:56 +0200 Subject: [PATCH 1/2] Fix issue in returning cursor after print_above() --- src/meshcore_cli/meshcore_cli.py | 70 ++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/src/meshcore_cli/meshcore_cli.py b/src/meshcore_cli/meshcore_cli.py index 021d70e..96535f3 100644 --- a/src/meshcore_cli/meshcore_cli.py +++ b/src/meshcore_cli/meshcore_cli.py @@ -12,8 +12,10 @@ import serial.tools.list_ports from pathlib import Path import traceback +from prompt_toolkit.application.current import get_app_or_none from prompt_toolkit.shortcuts import PromptSession from prompt_toolkit.shortcuts import CompleteStyle +from prompt_toolkit.shortcuts import print_formatted_text from prompt_toolkit.completion import NestedCompleter, PathCompleter from prompt_toolkit.completion import CompleteEvent, Completer, Completion from prompt_toolkit.history import FileHistory @@ -98,26 +100,42 @@ def escape_ansi(line): ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') return ansi_escape.sub('', line) -def print_one_line_above(str): - """ prints a string above current line """ - width = os.get_terminal_size().columns - stringlen = len(escape_ansi(str))-1 - lines = divmod(stringlen, width)[0] + 1 - print("\u001B[s", end="") # Save current cursor position - print("\u001B[A", end="") # Move cursor up one line - print("\u001B[999D", end="") # Move cursor to beginning of line - for _ in range(lines): - print("\u001B[S", end="") # Scroll up/pan window down 1 line - print("\u001B[L", end="") # Insert new line - for _ in range(lines - 1): - print("\u001B[A", end="") # Move cursor up one line - print(str, end="") # Print output status msg - print("\u001B[u", end="", flush=True) # Jump back to saved cursor position - -def print_above(str): - lines = str.split('\n') - for l in lines: - print_one_line_above(l) +def print_one_line_above(text): + """Status line(s) above the prompt; delegates to print_above.""" + print_above(text) + +def print_above(text): + """ + Show text above the current input line. + + When prompt_toolkit has a running Application (interactive prompt), plain + print() and DEC save/restore fight the renderer — use print_formatted_text, + which suspends the UI, writes via the Output layer (UTF-8), and redraws. + + With no active app (scripts, pipes), use CSI save/move/clear/restore only. + """ + if text == "": + return + if get_app_or_none() is not None: + print_formatted_text(ANSI(text), end="\n") + return + lines = text.split("\n") + n = len(lines) + chunks = ["\033[s", f"\033[{n}A"] + for i, line in enumerate(lines): + chunks.append("\033[1G\033[2K") + chunks.append(line) + if i < n - 1: + chunks.append("\033[B") + chunks.append("\033[u") + combined = "".join(chunks) + enc = getattr(sys.stdout, "encoding", None) or "utf-8" + try: + sys.stdout.buffer.write(combined.encode(enc, errors="replace")) + sys.stdout.buffer.flush() + except (AttributeError, TypeError): + sys.stdout.write(combined) + sys.stdout.flush() async def process_event_message(mc, ev, json_output, end="\n", above=False): """ display incoming message """ @@ -2785,7 +2803,7 @@ async def next_cmd(mc, cmds, json_output=False): sess = PromptSession("Password: ", is_password=True) password = await sess.prompt_async() - timeout = 0 if not "timeout" in contact else contact["timeout"] + timeout = 0 if not "timeout" in contact else contact["timeout"] res = await mc.commands.send_login_sync(contact, password, timeout = timeout) logger.debug(res) if res is None: @@ -3373,7 +3391,7 @@ async def next_cmd(mc, cmds, json_output=False): else: if json_output: print(json.dumps(res.payload)) - else : + else: path_len = res.payload['path_len'] if (path_len == 0) : print("0 hop") @@ -3940,7 +3958,7 @@ def get_help_for (cmdname, context="line") : elif cmdname == "trace" or cmdname == "tr" : print("""Trace -Trace is a command used to get signal information (SNR) along a path. +Trace is a command used to get signal information (SNR) along a path. Basic call to trace takes the path to follow as an argument, specifying each repeater along the path with its hash (separated or not with a comma). @@ -3993,9 +4011,9 @@ def get_help_for (cmdname, context="line") : * advert_path : path taken by an advert * disc_path : discover in and out path for a contact -When using change_path, you specify manually the path to the contact. Path is given as an hex string containing hashes for all repeaters in the way (you can use commas to separate hashes). By default hash_size will be the one of the node. If using commas, it will be guessed from first hash. You can also use a colon to specify path_hash_mode. +When using change_path, you specify manually the path to the contact. Path is given as an hex string containing hashes for all repeaters in the way (you can use commas to separate hashes). By default hash_size will be the one of the node. If using commas, it will be guessed from first hash. You can also use a colon to specify path_hash_mode. -If you want to set the path for a node through 112233 445566 778899, you can use +If you want to set the path for a node through 112233 445566 778899, you can use - 114477:0 or 11,44,77 for one byte hash - 112244557788:1 or 1122,4455,7788 for two byte hash - 112233445566778899:2 or 112233,445566,778899 for three byte hash @@ -4003,7 +4021,7 @@ def get_help_for (cmdname, context="line") : To set an empty path use 0. To get the path for a contact, you can use three commands: - - path will gives you the path stored in the node. + - path will gives you the path stored in the node. - You can also get a path from a key using advert_path which will give you the path taken for last advert from that node to come. disc_path will send a path request and give you input and output path for a node. From c3f7257777d1623b5d12dd5c0dd21fb06e72a08e Mon Sep 17 00:00:00 2001 From: "David A. van Leeuwen" Date: Wed, 15 Apr 2026 10:46:35 +0200 Subject: [PATCH 2/2] Fix width calculations for right-aligned texts --- src/meshcore_cli/meshcore_cli.py | 41 +++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/src/meshcore_cli/meshcore_cli.py b/src/meshcore_cli/meshcore_cli.py index 96535f3..2526fff 100644 --- a/src/meshcore_cli/meshcore_cli.py +++ b/src/meshcore_cli/meshcore_cli.py @@ -24,6 +24,7 @@ from prompt_toolkit.shortcuts import radiolist_dialog from prompt_toolkit.completion.word_completer import WordCompleter from prompt_toolkit.document import Document +from prompt_toolkit.utils import get_cwidth try: from bleak import BleakScanner, BleakClient @@ -100,6 +101,19 @@ def escape_ansi(line): ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') return ansi_escape.sub('', line) +def truncate_to_display_width(text, max_width): + """Return the longest prefix of text whose terminal display width is <= max_width.""" + if max_width <= 0: + return "" + lo, hi = 0, len(text) + while lo < hi: + mid = (lo + hi + 1) // 2 + if get_cwidth(text[:mid]) <= max_width: + lo = mid + else: + hi = mid - 1 + return text[:lo] + def print_one_line_above(text): """Status line(s) above the prompt; delegates to print_above.""" print_above(text) @@ -270,9 +284,30 @@ async def handle_log_rx(event): if chan_name != "" : width = os.get_terminal_size().columns - cars = width - 13 - len(event.payload["path"]) - len(chan_name) - 1 - dispmsg = message.replace("\n","")[0:cars] - txt = f"{ANSI_LIGHT_GRAY}{chan_name} {ANSI_DGREEN}{dispmsg+(cars-len(dispmsg))*' '} {ANSI_START}{width-11-len(event.payload['path'])}G{ANSI_YELLOW}[{event.payload['path']}]{ANSI_LIGHT_GRAY}{event.payload['snr']:6,.2f}{event.payload['rssi']:4}{ANSI_END}" + path = event.payload["path"] + snr_str = f"{event.payload['snr']:6,.2f}" + rssi_str = f"{event.payload['rssi']:4}" + rhs_plain = f"[{path}]{snr_str}{rssi_str}" + rhs_w = get_cwidth(rhs_plain) + rhs_col = max(1, width - rhs_w + 1) + prefix_plain = chan_name + " " + prefix_w = get_cwidth(prefix_plain) + trailing_before_csi_w = 1 + max_msg_w = max(0, rhs_col - 1 - prefix_w - trailing_before_csi_w) + raw_msg = message.replace("\n", "") + dispmsg = truncate_to_display_width(raw_msg, max_msg_w) + while True: + left_w = get_cwidth(prefix_plain + dispmsg + " ") + if left_w >= rhs_col - 1: + break + nxt = dispmsg + " " + if get_cwidth(prefix_plain + nxt + " ") > rhs_col - 1: + break + dispmsg = nxt + txt = ( + f"{ANSI_LIGHT_GRAY}{prefix_plain}{ANSI_DGREEN}{dispmsg} " + f"{ANSI_START}{rhs_col}G{ANSI_YELLOW}[{path}]{ANSI_LIGHT_GRAY}{snr_str}{rssi_str}{ANSI_END}" + ) if handle_message.above: print_above(txt)