diff --git a/Lib/_pyrepl/commands.py b/Lib/_pyrepl/commands.py index 10127e58897a58..418862bbd21998 100644 --- a/Lib/_pyrepl/commands.py +++ b/Lib/_pyrepl/commands.py @@ -22,6 +22,7 @@ from __future__ import annotations import os import time +from typing import TYPE_CHECKING # Categories of actions: # killing @@ -32,10 +33,11 @@ # finishing # [completion] +from .render import RenderedScreen from .trace import trace # types -if False: +if TYPE_CHECKING: from .historical_reader import HistoricalReader @@ -74,7 +76,7 @@ def kill_range(self, start: int, end: int) -> None: else: r.kill_ring.append(text) r.pos = start - r.dirty = True + r.invalidate_buffer(start) class YankCommand(Command): @@ -125,24 +127,27 @@ def do(self) -> None: r.arg = 10 * r.arg - d else: r.arg = 10 * r.arg + d - r.dirty = True + r.invalidate_prompt() class clear_screen(Command): def do(self) -> None: r = self.reader + trace("command.clear_screen") r.console.clear() - r.dirty = True + r.invalidate_full() class refresh(Command): def do(self) -> None: - self.reader.dirty = True + trace("command.refresh") + self.reader.invalidate_full() class repaint(Command): def do(self) -> None: - self.reader.dirty = True + trace("command.repaint") + self.reader.invalidate_full() self.reader.console.repaint() @@ -208,9 +213,10 @@ def do(self) -> None: repl = len(r.kill_ring[-1]) r.kill_ring.insert(0, r.kill_ring.pop()) t = r.kill_ring[-1] + start = r.pos - repl b[r.pos - repl : r.pos] = t r.pos = r.pos - repl + len(t) - r.dirty = True + r.invalidate_buffer(start) class interrupt(FinishCommand): @@ -242,8 +248,9 @@ def do(self) -> None: r.console.prepare() r.pos = p # r.posxy = 0, 0 # XXX this is invalid - r.dirty = True - r.console.screen = [] + r.invalidate_full() + trace("command.suspend sync_rendered_screen") + r.console.sync_rendered_screen(RenderedScreen.empty(), r.console.posxy) class up(MotionCommand): @@ -369,6 +376,7 @@ class self_insert(EditCommand): def do(self) -> None: r = self.reader text = self.event * r.get_arg() + start = r.pos r.insert(text) if r.paste_mode: data = "" @@ -376,7 +384,7 @@ def do(self) -> None: data += ev.data if data: r.insert(data) - r.last_refresh_cache.invalidated = True + r.invalidate_buffer(start) class insert_nl(EditCommand): @@ -400,20 +408,23 @@ def do(self) -> None: del b[s] b.insert(t, c) r.pos = t - r.dirty = True + r.invalidate_buffer(s) class backspace(EditCommand): def do(self) -> None: r = self.reader b = r.buffer + changed_from: int | None = None for i in range(r.get_arg()): if r.pos > 0: r.pos -= 1 del b[r.pos] - r.dirty = True + changed_from = r.pos if changed_from is None else min(changed_from, r.pos) else: self.reader.error("can't backspace at start") + if changed_from is not None: + r.invalidate_buffer(changed_from) class delete(EditCommand): @@ -431,12 +442,15 @@ def do(self) -> None: r.console.finish() raise EOFError + changed_from: int | None = None for i in range(r.get_arg()): if r.pos != len(b): del b[r.pos] - r.dirty = True + changed_from = r.pos if changed_from is None else min(changed_from, r.pos) else: self.reader.error("end of buffer") + if changed_from is not None: + r.invalidate_buffer(changed_from) class accept(FinishCommand): @@ -478,14 +492,17 @@ def do(self) -> None: # We need to copy over the state so that it's consistent between # console and reader, and console does not overwrite/append stuff - self.reader.console.screen = self.reader.screen.copy() - self.reader.console.posxy = self.reader.cxy + trace("command.show_history sync_rendered_screen") + self.reader.console.sync_rendered_screen( + self.reader.rendered_screen, + self.reader.cxy, + ) class paste_mode(Command): def do(self) -> None: self.reader.paste_mode = not self.reader.paste_mode - self.reader.dirty = True + self.reader.invalidate_prompt() class perform_bracketed_paste(Command): @@ -502,4 +519,3 @@ def do(self) -> None: s=time.time() - start, ) self.reader.insert(data.replace(done, "")) - self.reader.last_refresh_cache.invalidated = True diff --git a/Lib/_pyrepl/completing_reader.py b/Lib/_pyrepl/completing_reader.py index 9d2d43be5144e8..a8b94727a04cfc 100644 --- a/Lib/_pyrepl/completing_reader.py +++ b/Lib/_pyrepl/completing_reader.py @@ -21,16 +21,18 @@ from __future__ import annotations from dataclasses import dataclass, field +from typing import TYPE_CHECKING import re from . import commands, console, reader +from .render import RenderLine, ScreenOverlay from .reader import Reader # types Command = commands.Command -if False: - from .types import KeySpec, CommandName +if TYPE_CHECKING: + from .types import CommandName, KeySpec, Keymap def prefix(wordlist: list[str], j: int = 0) -> str: @@ -180,7 +182,7 @@ def do(self) -> None: elif len(completions) == 1: if completions_unchangable and len(completions[0]) == len(stem): r.msg = "[ sole completion ]" - r.dirty = True + r.invalidate_message() r.insert(completions[0][len(stem):]) else: p = prefix(completions, len(stem)) @@ -188,19 +190,20 @@ def do(self) -> None: r.insert(p) if last_is_completer: r.cmpltn_menu_visible = True - r.cmpltn_message_visible = False r.cmpltn_menu, r.cmpltn_menu_end = build_menu( r.console, completions, r.cmpltn_menu_end, r.use_brackets, r.sort_in_column) - r.dirty = True + if r.msg: + r.msg = "" + r.invalidate_message() + r.invalidate_overlay() elif not r.cmpltn_menu_visible: - r.cmpltn_message_visible = True if stem + p in completions: r.msg = "[ complete but not unique ]" - r.dirty = True + r.invalidate_message() else: r.msg = "[ not unique ]" - r.dirty = True + r.invalidate_message() class self_insert(commands.self_insert): @@ -220,6 +223,7 @@ def do(self) -> None: r.cmpltn_menu, r.cmpltn_menu_end = build_menu( r.console, completions, 0, r.use_brackets, r.sort_in_column) + r.invalidate_overlay() else: r.cmpltn_reset() @@ -237,7 +241,6 @@ class CompletingReader(Reader): ### Instance variables cmpltn_menu: list[str] = field(init=False) cmpltn_menu_visible: bool = field(init=False) - cmpltn_message_visible: bool = field(init=False) cmpltn_menu_end: int = field(init=False) cmpltn_menu_choices: list[str] = field(init=False) @@ -248,7 +251,7 @@ def __post_init__(self) -> None: self.commands[c.__name__] = c self.commands[c.__name__.replace('_', '-')] = c - def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]: + def collect_keymap(self) -> Keymap: return super().collect_keymap() + ( (r'\t', 'complete'),) @@ -257,28 +260,26 @@ def after_command(self, cmd: Command) -> None: if not isinstance(cmd, (complete, self_insert)): self.cmpltn_reset() - def calc_screen(self) -> list[str]: - screen = super().calc_screen() - if self.cmpltn_menu_visible: - # We display the completions menu below the current prompt - ly = self.lxy[1] + 1 - screen[ly:ly] = self.cmpltn_menu - # If we're not in the middle of multiline edit, don't append to screeninfo - # since that screws up the position calculation in pos2xy function. - # This is a hack to prevent the cursor jumping - # into the completions menu when pressing left or down arrow. - if self.pos != len(self.buffer): - self.screeninfo[ly:ly] = [(0, [])]*len(self.cmpltn_menu) - return screen + def get_screen_overlays(self) -> tuple[ScreenOverlay, ...]: + if not self.cmpltn_menu_visible: + return () + return ( + ScreenOverlay( + self.lxy[1] + 1, + tuple(RenderLine.from_rendered_text(line) for line in self.cmpltn_menu), + insert=True, + ), + ) def finish(self) -> None: super().finish() self.cmpltn_reset() def cmpltn_reset(self) -> None: + if getattr(self, "cmpltn_menu_visible", False): + self.invalidate_overlay() self.cmpltn_menu = [] self.cmpltn_menu_visible = False - self.cmpltn_message_visible = False self.cmpltn_menu_end = 0 self.cmpltn_menu_choices = [] diff --git a/Lib/_pyrepl/console.py b/Lib/_pyrepl/console.py index e0535d50396316..2a53d5ff581fa2 100644 --- a/Lib/_pyrepl/console.py +++ b/Lib/_pyrepl/console.py @@ -19,23 +19,25 @@ from __future__ import annotations +import os import _colorize from abc import ABC, abstractmethod import ast import code import linecache -from dataclasses import dataclass, field -import os.path +from dataclasses import dataclass import re import sys +from typing import TYPE_CHECKING - -TYPE_CHECKING = False +from .render import RenderedScreen +from .trace import trace if TYPE_CHECKING: - from typing import IO - from typing import Callable + from typing import Callable, IO + + from .types import CursorXY @dataclass @@ -47,10 +49,17 @@ class Event: @dataclass class Console(ABC): - posxy: tuple[int, int] - screen: list[str] = field(default_factory=list) + posxy: CursorXY = (0, 0) height: int = 25 width: int = 80 + _redraw_debug_palette: tuple[str, ...] = ( + "\x1b[41m", + "\x1b[42m", + "\x1b[43m", + "\x1b[44m", + "\x1b[45m", + "\x1b[46m", + ) def __init__( self, @@ -71,8 +80,52 @@ def __init__( else: self.output_fd = f_out.fileno() + self.posxy = (0, 0) + self.height = 25 + self.width = 80 + self._rendered_screen = RenderedScreen.empty() + self._redraw_visual_cycle = 0 + + @property + def screen(self) -> list[str]: + return list(self._rendered_screen.screen_lines) + + def sync_rendered_screen( + self, + rendered_screen: RenderedScreen, + posxy: CursorXY | None = None, + ) -> None: + if posxy is None: + posxy = rendered_screen.cursor + self.posxy = posxy + self._rendered_screen = rendered_screen + trace( + "console.sync_rendered_screen lines={lines} cursor={cursor}", + lines=len(rendered_screen.composed_lines), + cursor=posxy, + ) + + def invalidate_render_state(self) -> None: + self._rendered_screen = RenderedScreen.empty() + trace("console.invalidate_render_state") + + def begin_redraw_visualization(self) -> str | None: + if "PYREPL_VISUALIZE_REDRAWS" not in os.environ: + return None + + palette = self._redraw_debug_palette + cycle = self._redraw_visual_cycle + style = palette[cycle % len(palette)] + self._redraw_visual_cycle = cycle + 1 + trace( + "console.begin_redraw_visualization cycle={cycle} style={style!r}", + cycle=cycle, + style=style, + ) + return style + @abstractmethod - def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: ... + def refresh(self, rendered_screen: RenderedScreen) -> None: ... @abstractmethod def prepare(self) -> None: ... diff --git a/Lib/_pyrepl/content.py b/Lib/_pyrepl/content.py new file mode 100644 index 00000000000000..f9932df497d43b --- /dev/null +++ b/Lib/_pyrepl/content.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from .utils import ColorSpan, StyleRef, THEME, iter_display_chars, unbracket, wlen + + +@dataclass(frozen=True, slots=True) +class ContentFragment: + text: str + width: int + style: StyleRef = StyleRef() + + +@dataclass(frozen=True, slots=True) +class PromptContent: + leading_lines: tuple[ContentFragment, ...] + text: str + width: int + + +@dataclass(frozen=True, slots=True) +class SourceLine: + lineno: int + text: str + start_offset: int + has_newline: bool + cursor_index: int | None = None + + @property + def cursor_on_line(self) -> bool: + return self.cursor_index is not None + + +@dataclass(frozen=True, slots=True) +class ContentLine: + source: SourceLine + prompt: PromptContent + body: tuple[ContentFragment, ...] + + +def process_prompt(prompt: str) -> PromptContent: + r"""Return prompt content with width measured without zero-width markup.""" + + prompt_text = unbracket(prompt, including_content=False) + visible_prompt = unbracket(prompt, including_content=True) + leading_lines: list[ContentFragment] = [] + + while "\n" in prompt_text: + leading_text, _, prompt_text = prompt_text.partition("\n") + visible_leading, _, visible_prompt = visible_prompt.partition("\n") + leading_lines.append(ContentFragment(leading_text, wlen(visible_leading))) + + return PromptContent(tuple(leading_lines), prompt_text, wlen(visible_prompt)) + + +def build_body_fragments( + buffer: str, + colors: list[ColorSpan] | None, + start_index: int, +) -> tuple[ContentFragment, ...]: + if colors is None: + return tuple( + ContentFragment( + styled_char.text, + styled_char.width, + StyleRef(), + ) + for styled_char in iter_display_chars(buffer, colors, start_index) + ) + + theme = THEME() + return tuple( + ContentFragment( + styled_char.text, + styled_char.width, + StyleRef.from_tag(styled_char.tag, theme[styled_char.tag]) + if styled_char.tag + else StyleRef(), + ) + for styled_char in iter_display_chars(buffer, colors, start_index) + ) diff --git a/Lib/_pyrepl/historical_reader.py b/Lib/_pyrepl/historical_reader.py index 5cc82690ae36fc..09b969d80bc231 100644 --- a/Lib/_pyrepl/historical_reader.py +++ b/Lib/_pyrepl/historical_reader.py @@ -90,7 +90,7 @@ def do(self) -> None: if r.get_unicode() != r.history[r.historyi]: r.buffer = list(r.history[r.historyi]) r.pos = len(r.buffer) - r.dirty = True + r.invalidate_buffer(0) class first_history(commands.Command): @@ -130,10 +130,11 @@ def do(self) -> None: o = len(r.yank_arg_yanked) else: o = 0 + start = r.pos - o b[r.pos - o : r.pos] = list(w) r.yank_arg_yanked = w r.pos += len(w) - o - r.dirty = True + r.invalidate_buffer(start) class forward_history_isearch(commands.Command): @@ -142,7 +143,7 @@ def do(self) -> None: r.isearch_direction = ISEARCH_DIRECTION_FORWARDS r.isearch_start = r.historyi, r.pos r.isearch_term = "" - r.dirty = True + r.invalidate_prompt() r.push_input_trans(r.isearch_trans) @@ -150,7 +151,7 @@ class reverse_history_isearch(commands.Command): def do(self) -> None: r = self.reader r.isearch_direction = ISEARCH_DIRECTION_BACKWARDS - r.dirty = True + r.invalidate_prompt() r.isearch_term = "" r.push_input_trans(r.isearch_trans) r.isearch_start = r.historyi, r.pos @@ -163,7 +164,7 @@ def do(self) -> None: r.pop_input_trans() r.select_item(r.isearch_start[0]) r.pos = r.isearch_start[1] - r.dirty = True + r.invalidate_prompt() class isearch_add_character(commands.Command): @@ -171,7 +172,7 @@ def do(self) -> None: r = self.reader b = r.buffer r.isearch_term += self.event[-1] - r.dirty = True + r.invalidate_prompt() p = r.pos + len(r.isearch_term) - 1 if b[p : p + 1] != [r.isearch_term[-1]]: r.isearch_next() @@ -182,7 +183,7 @@ def do(self) -> None: r = self.reader if len(r.isearch_term) > 0: r.isearch_term = r.isearch_term[:-1] - r.dirty = True + r.invalidate_prompt() else: r.error("nothing to rubout") @@ -207,7 +208,7 @@ def do(self) -> None: r.isearch_direction = ISEARCH_DIRECTION_NONE r.console.forgetinput() r.pop_input_trans() - r.dirty = True + r.invalidate_prompt() @dataclass @@ -278,8 +279,7 @@ def select_item(self, i: int) -> None: self.buffer = list(buf) self.historyi = i self.pos = len(self.buffer) - self.dirty = True - self.last_refresh_cache.invalidated = True + self.invalidate_buffer(0) def get_item(self, i: int) -> str: if i != len(self.history): @@ -357,7 +357,7 @@ def search_next(self, *, forwards: bool) -> None: if forwards and not match_prefix: self.pos = 0 self.buffer = [] - self.dirty = True + self.invalidate_buffer(0) else: self.error("not found") return diff --git a/Lib/_pyrepl/input.py b/Lib/_pyrepl/input.py index 21c24eb5cde3e3..2d65246c700f27 100644 --- a/Lib/_pyrepl/input.py +++ b/Lib/_pyrepl/input.py @@ -38,10 +38,11 @@ from abc import ABC, abstractmethod import unicodedata from collections import deque +from typing import TYPE_CHECKING # types -if False: +if TYPE_CHECKING: from .types import EventTuple diff --git a/Lib/_pyrepl/layout.py b/Lib/_pyrepl/layout.py new file mode 100644 index 00000000000000..39054c490037ef --- /dev/null +++ b/Lib/_pyrepl/layout.py @@ -0,0 +1,230 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Self + +from .content import ContentFragment, ContentLine +from .types import CursorXY, ScreenInfoRow + + +@dataclass(frozen=True, slots=True) +class LayoutRow: + prompt_width: int + char_widths: tuple[int, ...] + suffix_width: int = 0 + buffer_advance: int = 0 + + @property + def width(self) -> int: + return self.prompt_width + sum(self.char_widths) + self.suffix_width + + @property + def screeninfo(self) -> ScreenInfoRow: + widths = list(self.char_widths) + if self.suffix_width: + widths.append(self.suffix_width) + return self.prompt_width, widths + + +@dataclass(frozen=True, slots=True) +class LayoutMap: + rows: tuple[LayoutRow, ...] + + @classmethod + def empty(cls) -> Self: + return cls((LayoutRow(0, ()),)) + + @property + def screeninfo(self) -> list[ScreenInfoRow]: + return [row.screeninfo for row in self.rows] + + def max_column(self, y: int) -> int: + return self.rows[y].width + + def max_row(self) -> int: + return len(self.rows) - 1 + + def pos_to_xy(self, pos: int) -> CursorXY: + if not self.rows: + return 0, 0 + + remaining = pos + for y, row in enumerate(self.rows): + if remaining <= len(row.char_widths): + # Prompt-only leading rows are terminal scenery, not real + # buffer positions. Treating them as real just manufactures + # bugs. + if remaining == 0 and not row.char_widths and row.buffer_advance == 0 and y < len(self.rows) - 1: + continue + x = row.prompt_width + for width in row.char_widths[:remaining]: + x += width + return x, y + remaining -= row.buffer_advance + last_row = self.rows[-1] + return last_row.width - last_row.suffix_width, len(self.rows) - 1 + + def xy_to_pos(self, x: int, y: int) -> int: + if not self.rows: + return 0 + + pos = 0 + for row in self.rows[:y]: + pos += row.buffer_advance + + row = self.rows[y] + cur_x = row.prompt_width + char_widths = row.char_widths + i = 0 + for i, width in enumerate(char_widths): + if cur_x >= x: + # Include trailing zero-width (combining) chars at this position + for trailing_width in char_widths[i:]: + if trailing_width == 0: + pos += 1 + else: + break + return pos + if width == 0: + pos += 1 + continue + cur_x += width + pos += 1 + return pos + + +@dataclass(frozen=True, slots=True) +class WrappedRow: + prompt_text: str = "" + prompt_width: int = 0 + fragments: tuple[ContentFragment, ...] = () + layout_widths: tuple[int, ...] = () + suffix: str = "" + suffix_width: int = 0 + buffer_advance: int = 0 + line_end_offset: int = 0 + + +@dataclass(frozen=True, slots=True) +class LayoutResult: + wrapped_rows: tuple[WrappedRow, ...] + layout_map: LayoutMap + line_end_offsets: tuple[int, ...] + + +def layout_content_lines( + lines: tuple[ContentLine, ...], + width: int, + start_offset: int, +) -> LayoutResult: + if width <= 0: + return LayoutResult((), LayoutMap(()), ()) + + offset = start_offset + wrapped_rows: list[WrappedRow] = [] + layout_rows: list[LayoutRow] = [] + line_end_offsets: list[int] = [] + + for line in lines: + newline_advance = int(line.source.has_newline) + for leading in line.prompt.leading_lines: + line_end_offsets.append(offset) + wrapped_rows.append( + WrappedRow( + fragments=(leading,), + line_end_offset=offset, + ) + ) + layout_rows.append(LayoutRow(0, (), buffer_advance=0)) + + prompt_text = line.prompt.text + prompt_width = line.prompt.width + body = tuple(line.body) + body_widths = tuple(fragment.width for fragment in body) + + if not body_widths or (sum(body_widths) + prompt_width) < width: + offset += len(body) + newline_advance + line_end_offsets.append(offset) + wrapped_rows.append( + WrappedRow( + prompt_text=prompt_text, + prompt_width=prompt_width, + fragments=body, + layout_widths=body_widths, + buffer_advance=len(body) + newline_advance, + line_end_offset=offset, + ) + ) + layout_rows.append( + LayoutRow( + prompt_width, + body_widths, + buffer_advance=len(body) + newline_advance, + ) + ) + continue + + current_prompt = prompt_text + current_prompt_width = prompt_width + start = 0 + total = len(body) + while True: + index_to_wrap_before = 0 + column = 0 + for char_width in body_widths[start:]: + if column + char_width + current_prompt_width >= width: + break + index_to_wrap_before += 1 + column += char_width + + if index_to_wrap_before == 0 and start < total: + index_to_wrap_before = 1 # force progress + + at_line_end = (start + index_to_wrap_before) >= total + if at_line_end: + offset += index_to_wrap_before + newline_advance + suffix = "" + suffix_width = 0 + buffer_advance = index_to_wrap_before + newline_advance + else: + offset += index_to_wrap_before + suffix = "\\" + suffix_width = 1 + buffer_advance = index_to_wrap_before + + end = start + index_to_wrap_before + row_fragments = body[start:end] + row_widths = body_widths[start:end] + line_end_offsets.append(offset) + wrapped_rows.append( + WrappedRow( + prompt_text=current_prompt, + prompt_width=current_prompt_width, + fragments=row_fragments, + layout_widths=row_widths, + suffix=suffix, + suffix_width=suffix_width, + buffer_advance=buffer_advance, + line_end_offset=offset, + ) + ) + layout_rows.append( + LayoutRow( + current_prompt_width, + row_widths, + suffix_width=suffix_width, + buffer_advance=buffer_advance, + ) + ) + + start = end + current_prompt = "" + current_prompt_width = 0 + if at_line_end: + break + + return LayoutResult( + tuple(wrapped_rows), + LayoutMap(tuple(layout_rows)), + tuple(line_end_offsets), + ) diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py index 9ab92f64d1ef63..03dfb9a3535918 100644 --- a/Lib/_pyrepl/reader.py +++ b/Lib/_pyrepl/reader.py @@ -25,16 +25,40 @@ import _colorize from contextlib import contextmanager -from dataclasses import dataclass, field, fields +from dataclasses import dataclass, field, fields, replace +from typing import Self from . import commands, console, input -from .utils import wlen, unbracket, disp_str, gen_colors, THEME +from .content import ( + ContentFragment, + ContentLine, + SourceLine, + build_body_fragments, + process_prompt as build_prompt_content, +) +from .layout import LayoutMap, LayoutResult, LayoutRow, WrappedRow, layout_content_lines +from .render import RenderCell, RenderLine, RenderedScreen, ScreenOverlay +from .utils import ANSI_ESCAPE_SEQUENCE, THEME, StyleRef, wlen, gen_colors from .trace import trace # types Command = commands.Command -from .types import Callback, SimpleContextManager, KeySpec, CommandName +from .types import ( + Callback, + CommandName, + CursorXY, + Dimensions, + EventData, + KeySpec, + Keymap, + ScreenInfoRow, + SimpleContextManager, +) + +type CommandClass = type[Command] +type CommandInput = tuple[CommandName | CommandClass, EventData] +type PromptCellCacheKey = tuple[str, bool] # syntax classes @@ -52,8 +76,8 @@ def make_default_syntax_table() -> dict[str, int]: return st -def make_default_commands() -> dict[CommandName, type[Command]]: - result: dict[CommandName, type[Command]] = {} +def make_default_commands() -> dict[CommandName, CommandClass]: + result: dict[CommandName, CommandClass] = {} for v in vars(commands).values(): if isinstance(v, type) and issubclass(v, Command) and v.__name__[0].islower(): result[v.__name__] = v @@ -61,7 +85,7 @@ def make_default_commands() -> dict[CommandName, type[Command]]: return result -default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple( +default_keymap: Keymap = tuple( [ (r"\C-a", "beginning-of-line"), (r"\C-b", "left"), @@ -131,6 +155,75 @@ def make_default_commands() -> dict[CommandName, type[Command]]: ) +@dataclass(frozen=True, slots=True) +class RefreshInvalidation: + cursor_only: bool = False + buffer_from_pos: int | None = None + prompt: bool = False + layout: bool = False + theme: bool = False + message: bool = False + overlay: bool = False + full: bool = False + + @classmethod + def empty(cls) -> Self: + return cls() + + @property + def needs_screen_refresh(self) -> bool: + return any( + ( + self.buffer_from_pos is not None, + self.prompt, + self.layout, + self.theme, + self.message, + self.overlay, + self.full, + ) + ) + + @property + def is_cursor_only(self) -> bool: + return self.cursor_only and not self.needs_screen_refresh + + @property + def buffer_rebuild_from_pos(self) -> int | None: + if self.full or self.prompt or self.layout or self.theme: + return 0 + return self.buffer_from_pos + + def with_cursor(self) -> RefreshInvalidation: + if self.needs_screen_refresh: + return self + return replace(self, cursor_only=True) + + def with_buffer(self, from_pos: int) -> RefreshInvalidation: + current = from_pos + if self.buffer_from_pos is not None: + current = min(current, self.buffer_from_pos) + return replace(self, cursor_only=False, buffer_from_pos=current) + + def with_prompt(self) -> RefreshInvalidation: + return replace(self, cursor_only=False, prompt=True) + + def with_layout(self) -> RefreshInvalidation: + return replace(self, cursor_only=False, layout=True) + + def with_theme(self) -> RefreshInvalidation: + return replace(self, cursor_only=False, theme=True) + + def with_message(self) -> RefreshInvalidation: + return replace(self, cursor_only=False, message=True) + + def with_overlay(self) -> RefreshInvalidation: + return replace(self, cursor_only=False, overlay=True) + + def with_full(self) -> RefreshInvalidation: + return replace(self, cursor_only=False, full=True) + + @dataclass(slots=True) class Reader: """The Reader class implements the bare bones of a command reader, @@ -148,10 +241,9 @@ class Reader: * pos: A 0-based index into 'buffer' for where the insertion point is. - * screeninfo: - A list of screen position tuples. Each list element is a tuple - representing information on visible line length for a given line. - Allows for efficient skipping of color escape sequences. + * layout: + A mapping between buffer positions and rendered rows/columns. + It is the internal source of truth for cursor placement. * cxy, lxy: the position of the insertion point in screen ... * syntax_table: @@ -162,8 +254,6 @@ class Reader: * arg: The emacs-style prefix argument. It will be None if no such argument has been provided. - * dirty: - True if we need to refresh the display. * kill_ring: The emacs-style kill-ring; manipulated with yank & yank-pop * ps1, ps2, ps3, ps4: @@ -198,62 +288,69 @@ class Reader: kill_ring: list[list[str]] = field(default_factory=list) msg: str = "" arg: int | None = None - dirty: bool = False finished: bool = False paste_mode: bool = False - commands: dict[str, type[Command]] = field(default_factory=make_default_commands) - last_command: type[Command] | None = None + commands: dict[CommandName, CommandClass] = field(default_factory=make_default_commands) + last_command: CommandClass | None = None syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table) - keymap: tuple[tuple[str, str], ...] = () + keymap: Keymap = () input_trans: input.KeymapTranslator = field(init=False) input_trans_stack: list[input.KeymapTranslator] = field(default_factory=list) - screen: list[str] = field(default_factory=list) - screeninfo: list[tuple[int, list[int]]] = field(init=False) - cxy: tuple[int, int] = field(init=False) - lxy: tuple[int, int] = field(init=False) - scheduled_commands: list[str] = field(default_factory=list) + rendered_screen: RenderedScreen = field(init=False) + layout: LayoutMap = field(init=False) + cxy: CursorXY = field(init=False) + lxy: CursorXY = field(init=False) + scheduled_commands: list[CommandName] = field(default_factory=list) can_colorize: bool = False threading_hook: Callback | None = None + invalidation: RefreshInvalidation = field(init=False) ## cached metadata to speed up screen refreshes @dataclass class RefreshCache: - screen: list[str] = field(default_factory=list) - screeninfo: list[tuple[int, list[int]]] = field(init=False) + render_lines: list[RenderLine] = field(default_factory=list) + layout_rows: list[LayoutRow] = field(init=False) line_end_offsets: list[int] = field(default_factory=list) pos: int = field(init=False) - cxy: tuple[int, int] = field(init=False) - dimensions: tuple[int, int] = field(init=False) - invalidated: bool = False + dimensions: Dimensions = field(init=False) def update_cache(self, reader: Reader, - screen: list[str], - screeninfo: list[tuple[int, list[int]]], + render_lines: list[RenderLine], + layout_rows: list[LayoutRow], + line_end_offsets: list[int], ) -> None: - self.screen = screen.copy() - self.screeninfo = screeninfo.copy() + self.render_lines = render_lines.copy() + self.layout_rows = layout_rows.copy() + self.line_end_offsets = line_end_offsets.copy() self.pos = reader.pos - self.cxy = reader.cxy self.dimensions = reader.console.width, reader.console.height - self.invalidated = False def valid(self, reader: Reader) -> bool: - if self.invalidated: - return False dimensions = reader.console.width, reader.console.height dimensions_changed = dimensions != self.dimensions return not dimensions_changed - def get_cached_location(self, reader: Reader) -> tuple[int, int]: - if self.invalidated: - raise ValueError("Cache is invalidated") + def get_cached_location( + self, + reader: Reader, + buffer_from_pos: int | None = None, + *, + reuse_full: bool = False, + ) -> tuple[int, int]: + if reuse_full: + if self.line_end_offsets: + last_offset = self.line_end_offsets[-1] + if last_offset >= len(reader.buffer): + return last_offset, len(self.line_end_offsets) + return 0, 0 + if buffer_from_pos is None: + buffer_from_pos = min(reader.pos, self.pos) offset = 0 - earliest_common_pos = min(reader.pos, self.pos) num_common_lines = len(self.line_end_offsets) while num_common_lines > 0: offset = self.line_end_offsets[num_common_lines - 1] - if earliest_common_pos > offset: + if buffer_from_pos > offset: break num_common_lines -= 1 else: @@ -270,123 +367,253 @@ def __post_init__(self) -> None: self.input_trans = input.KeymapTranslator( self.keymap, invalid_cls="invalid-key", character_cls="self-insert" ) - self.screeninfo = [(0, [])] + self.layout = LayoutMap.empty() self.cxy = self.pos2xy() self.lxy = (self.pos, 0) + self.rendered_screen = RenderedScreen.empty() self.can_colorize = _colorize.can_colorize() + self.invalidation = RefreshInvalidation.empty() - self.last_refresh_cache.screeninfo = self.screeninfo + self.last_refresh_cache.layout_rows = list(self.layout.rows) self.last_refresh_cache.pos = self.pos - self.last_refresh_cache.cxy = self.cxy + self.last_refresh_cache.dimensions = (0, 0) - def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]: - return default_keymap + @property + def screen(self) -> list[str]: + return list(self.rendered_screen.screen_lines) - def calc_screen(self) -> list[str]: - """Translate changes in self.buffer into changes in self.console.screen.""" - # Since the last call to calc_screen: - # screen and screeninfo may differ due to a completion menu being shown - # pos and cxy may differ due to edits, cursor movements, or completion menus + @property + def screeninfo(self) -> list[ScreenInfoRow]: + return self.layout.screeninfo - # Lines that are above both the old and new cursor position can't have changed, - # unless the terminal has been resized (which might cause reflowing) or we've - # entered or left paste mode (which changes prompts, causing reflowing). + def collect_keymap(self) -> Keymap: + return default_keymap + + def calc_screen(self) -> RenderedScreen: + """Translate the editable buffer into a base rendered screen.""" num_common_lines = 0 offset = 0 if self.last_refresh_cache.valid(self): - offset, num_common_lines = self.last_refresh_cache.get_cached_location(self) - - screen = self.last_refresh_cache.screen - del screen[num_common_lines:] - - screeninfo = self.last_refresh_cache.screeninfo - del screeninfo[num_common_lines:] - - last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets - del last_refresh_line_end_offsets[num_common_lines:] + if ( + self.invalidation.buffer_from_pos is None + and not ( + self.invalidation.full + or self.invalidation.prompt + or self.invalidation.layout + or self.invalidation.theme + ) + and (self.invalidation.message or self.invalidation.overlay) + ): + offset, num_common_lines = self.last_refresh_cache.get_cached_location( + self, + reuse_full=True, + ) + else: + offset, num_common_lines = self.last_refresh_cache.get_cached_location( + self, + self._buffer_refresh_from_pos(), + ) + + base_render_lines = self.last_refresh_cache.render_lines[:num_common_lines] + layout_rows = self.last_refresh_cache.layout_rows[:num_common_lines] + last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets[:num_common_lines] + + source_lines = self._build_source_lines(offset, num_common_lines) + content_lines = self._build_content_lines( + source_lines, + offset, + prompt_from_cache=bool(offset and self.buffer[offset - 1] != "\n"), + ) + layout_result = self._layout_content(content_lines, offset) + base_render_lines.extend(self._render_wrapped_rows(layout_result.wrapped_rows)) + layout_rows.extend(layout_result.layout_map.rows) + last_refresh_line_end_offsets.extend(layout_result.line_end_offsets) - pos = self.pos - pos -= offset + self.layout = LayoutMap(tuple(layout_rows)) + self.cxy = self.pos2xy() + if not source_lines: + # reuse_full path: _build_source_lines didn't run, + # so lxy wasn't updated. Derive it from the buffer. + buf = self.buffer[:self.pos] + lineno = buf.count("\n") + if lineno: + last_nl = len(buf) - 1 - buf[::-1].index("\n") + col = self.pos - last_nl - 1 + else: + col = self.pos + self.lxy = col, lineno + self.last_refresh_cache.update_cache( + self, + base_render_lines, + layout_rows, + last_refresh_line_end_offsets, + ) + return RenderedScreen(tuple(base_render_lines), self.cxy) - prompt_from_cache = (offset and self.buffer[offset - 1] != "\n") + def _buffer_refresh_from_pos(self) -> int: + """Return buffer position from which to rebuild content. - if self.can_colorize: - colors = list(gen_colors(self.get_unicode())) - else: - colors = None - trace("colors = {colors}", colors=colors) + Returns 0 (full rebuild) when no incremental position is known. + """ + buffer_from_pos = self.invalidation.buffer_rebuild_from_pos + if buffer_from_pos is not None: + return buffer_from_pos + return 0 + + def _build_source_lines( + self, + offset: int, + first_lineno: int, + ) -> tuple[SourceLine, ...]: + if offset == len(self.buffer) and first_lineno > 0: + return () + + pos = self.pos - offset lines = "".join(self.buffer[offset:]).split("\n") cursor_found = False lines_beyond_cursor = 0 - for ln, line in enumerate(lines, num_common_lines): + source_lines: list[SourceLine] = [] + current_offset = offset + + for line_index, line in enumerate(lines): + lineno = first_lineno + line_index + has_newline = line_index < len(lines) - 1 line_len = len(line) + cursor_index: int | None = None if 0 <= pos <= line_len: - self.lxy = pos, ln + cursor_index = pos + self.lxy = pos, lineno cursor_found = True elif cursor_found: lines_beyond_cursor += 1 if lines_beyond_cursor > self.console.height: - # No need to keep formatting lines. - # The console can't show them. break + + source_lines.append( + SourceLine( + lineno=lineno, + text=line, + start_offset=current_offset, + has_newline=has_newline, + cursor_index=cursor_index, + ) + ) + pos -= line_len + 1 + current_offset += line_len + (1 if has_newline else 0) + + return tuple(source_lines) + + def _build_content_lines( + self, + source_lines: tuple[SourceLine, ...], + offset: int, + *, + prompt_from_cache: bool, + ) -> tuple[ContentLine, ...]: + if self.can_colorize: + colors = list(gen_colors(self.get_unicode())) + else: + colors = None + trace("colors = {colors}", colors=colors) + + content_lines: list[ContentLine] = [] + for source_line in source_lines: if prompt_from_cache: - # Only the first line's prompt can come from the cache prompt_from_cache = False prompt = "" else: - prompt = self.get_prompt(ln, line_len >= pos >= 0) - while "\n" in prompt: - pre_prompt, _, prompt = prompt.partition("\n") - last_refresh_line_end_offsets.append(offset) - screen.append(pre_prompt) - screeninfo.append((0, [])) - pos -= line_len + 1 - prompt, prompt_len = self.process_prompt(prompt) - chars, char_widths = disp_str(line, colors, offset) - wrapcount = (sum(char_widths) + prompt_len) // self.console.width - if wrapcount == 0 or not char_widths: - offset += line_len + 1 # Takes all of the line plus the newline - last_refresh_line_end_offsets.append(offset) - screen.append(prompt + "".join(chars)) - screeninfo.append((prompt_len, char_widths)) - else: - pre = prompt - prelen = prompt_len - for wrap in range(wrapcount + 1): - index_to_wrap_before = 0 - column = 0 - for char_width in char_widths: - if column + char_width + prelen >= self.console.width: - break - index_to_wrap_before += 1 - column += char_width - if len(chars) > index_to_wrap_before: - offset += index_to_wrap_before - post = "\\" - after = [1] - else: - offset += index_to_wrap_before + 1 # Takes the newline - post = "" - after = [] - last_refresh_line_end_offsets.append(offset) - render = pre + "".join(chars[:index_to_wrap_before]) + post - render_widths = char_widths[:index_to_wrap_before] + after - screen.append(render) - screeninfo.append((prelen, render_widths)) - chars = chars[index_to_wrap_before:] - char_widths = char_widths[index_to_wrap_before:] - pre = "" - prelen = 0 - self.screeninfo = screeninfo - self.cxy = self.pos2xy() - if self.msg: - for mline in self.msg.split("\n"): - screen.append(mline) - screeninfo.append((0, [])) + prompt = self.get_prompt(source_line.lineno, source_line.cursor_on_line) + content_lines.append( + ContentLine( + source=source_line, + prompt=build_prompt_content(prompt), + body=build_body_fragments( + source_line.text, + colors, + source_line.start_offset, + ), + ) + ) + return tuple(content_lines) + + def _layout_content( + self, + content_lines: tuple[ContentLine, ...], + offset: int, + ) -> LayoutResult: + return layout_content_lines(content_lines, self.console.width, offset) + + def _render_wrapped_rows( + self, + wrapped_rows: tuple[WrappedRow, ...], + ) -> list[RenderLine]: + return [ + self._render_line( + row.prompt_text, + row.fragments, + row.suffix, + ) + for row in wrapped_rows + ] + + def _render_message_lines(self) -> tuple[RenderLine, ...]: + if not self.msg: + return () + return tuple( + RenderLine.from_rendered_text(message_line) + for message_line in self.msg.split("\n") + ) - self.last_refresh_cache.update_cache(self, screen, screeninfo) - return screen + def get_screen_overlays(self) -> tuple[ScreenOverlay, ...]: + return () + + def compose_rendered_screen(self, base_screen: RenderedScreen) -> RenderedScreen: + overlays = list(self.get_screen_overlays()) + message_lines = self._render_message_lines() + if message_lines: + overlays.append(ScreenOverlay(len(base_screen.lines), message_lines)) + if not overlays: + return base_screen + return RenderedScreen(base_screen.lines, base_screen.cursor, tuple(overlays)) + + _prompt_cell_cache: dict[PromptCellCacheKey, tuple[RenderCell, ...]] = field( + init=False, default_factory=dict, repr=False + ) + + def _render_line( + self, + prefix: str, + fragments: tuple[ContentFragment, ...], + suffix: str = "", + ) -> RenderLine: + cells: list[RenderCell] = [] + if prefix: + cache_key = (prefix, self.can_colorize) + cached = self._prompt_cell_cache.get(cache_key) + if cached is None: + prompt_cells = RenderLine.from_rendered_text(prefix).cells + if self.can_colorize and prompt_cells and not ANSI_ESCAPE_SEQUENCE.search(prefix): + prompt_style = StyleRef.from_tag("prompt", THEME()["prompt"]) + prompt_cells = tuple( + RenderCell( + cell.text, + cell.width, + style=prompt_style if cell.text else cell.style, + controls=cell.controls, + ) + for cell in prompt_cells + ) + self._prompt_cell_cache[cache_key] = prompt_cells + cached = prompt_cells + cells.extend(cached) + cells.extend( + RenderCell(fragment.text, fragment.width, style=fragment.style) + for fragment in fragments + ) + if suffix: + cells.extend(RenderLine.from_rendered_text(suffix).cells) + return RenderLine.from_cells(cells) @staticmethod def process_prompt(prompt: str) -> tuple[str, int]: @@ -396,9 +623,8 @@ def process_prompt(prompt: str) -> tuple[str, int]: (\x01 and \x02) removed. The length ignores anything between those brackets as well as any ANSI escape sequences. """ - out_prompt = unbracket(prompt, including_content=False) - visible_prompt = unbracket(prompt, including_content=True) - return out_prompt, wlen(visible_prompt) + prompt_content = build_prompt_content(prompt) + return prompt_content.text, prompt_content.width def bow(self, p: int | None = None) -> int: """Return the 0-based index of the word break preceding p most @@ -460,10 +686,10 @@ def eol(self, p: int | None = None) -> int: def max_column(self, y: int) -> int: """Return the last x-offset for line y""" - return self.screeninfo[y][0] + sum(self.screeninfo[y][1]) + return self.layout.max_column(y) def max_row(self) -> int: - return len(self.screeninfo) - 1 + return self.layout.max_row() def get_arg(self, default: int = 1) -> int: """Return any prefix argument that the user has supplied, @@ -489,10 +715,6 @@ def get_prompt(self, lineno: int, cursor_on_line: bool) -> str: prompt = self.ps3 else: prompt = self.ps1 - - if self.can_colorize: - t = THEME() - prompt = f"{t.prompt}{prompt}{t.reset}" return prompt def push_input_trans(self, itrans: input.KeymapTranslator) -> None: @@ -504,65 +726,48 @@ def pop_input_trans(self) -> None: def setpos_from_xy(self, x: int, y: int) -> None: """Set pos according to coordinates x, y""" - pos = 0 - i = 0 - while i < y: - prompt_len, char_widths = self.screeninfo[i] - offset = len(char_widths) - in_wrapped_line = prompt_len + sum(char_widths) >= self.console.width - if in_wrapped_line: - pos += offset - 1 # -1 cause backslash is not in buffer - else: - pos += offset + 1 # +1 cause newline is in buffer - i += 1 - - j = 0 - cur_x = self.screeninfo[i][0] - while cur_x < x: - if self.screeninfo[i][1][j] == 0: - j += 1 # prevent potential future infinite loop - continue - cur_x += self.screeninfo[i][1][j] - j += 1 - pos += 1 - - self.pos = pos - - def pos2xy(self) -> tuple[int, int]: + self.pos = self.layout.xy_to_pos(x, y) + + def pos2xy(self) -> CursorXY: """Return the x, y coordinates of position 'pos'.""" + assert 0 <= self.pos <= len(self.buffer) + return self.layout.pos_to_xy(self.pos) - prompt_len, y = 0, 0 - char_widths: list[int] = [] - pos = self.pos - assert 0 <= pos <= len(self.buffer) + def insert(self, text: str | list[str]) -> None: + """Insert 'text' at the insertion point.""" + start = self.pos + self.buffer[self.pos : self.pos] = list(text) + self.pos += len(text) + self.invalidate_buffer(start) - # optimize for the common case: typing at the end of the buffer - if pos == len(self.buffer) and len(self.screeninfo) > 0: - y = len(self.screeninfo) - 1 - prompt_len, char_widths = self.screeninfo[y] - return prompt_len + sum(char_widths), y + def invalidate_cursor(self) -> None: + self.invalidation = self.invalidation.with_cursor() - for prompt_len, char_widths in self.screeninfo: - offset = len(char_widths) - in_wrapped_line = prompt_len + sum(char_widths) >= self.console.width - if in_wrapped_line: - offset -= 1 # need to remove line-wrapping backslash + def invalidate_buffer(self, from_pos: int) -> None: + self.invalidation = self.invalidation.with_buffer(from_pos) - if offset >= pos: - break + def invalidate_prompt(self) -> None: + self._prompt_cell_cache.clear() + self.invalidation = self.invalidation.with_prompt() - if not in_wrapped_line: - offset += 1 # there's a newline in buffer + def invalidate_layout(self) -> None: + self.invalidation = self.invalidation.with_layout() - pos -= offset - y += 1 - return prompt_len + sum(char_widths[:pos]), y + def invalidate_theme(self) -> None: + self._prompt_cell_cache.clear() + self.invalidation = self.invalidation.with_theme() - def insert(self, text: str | list[str]) -> None: - """Insert 'text' at the insertion point.""" - self.buffer[self.pos : self.pos] = list(text) - self.pos += len(text) - self.dirty = True + def invalidate_message(self) -> None: + self.invalidation = self.invalidation.with_message() + + def invalidate_overlay(self) -> None: + self.invalidation = self.invalidation.with_overlay() + + def invalidate_full(self) -> None: + self.invalidation = self.invalidation.with_full() + + def clear_invalidation(self) -> None: + self.invalidation = RefreshInvalidation.empty() def update_cursor(self) -> None: """Move the cursor to reflect changes in self.pos""" @@ -574,7 +779,7 @@ def after_command(self, cmd: Command) -> None: """This function is called to allow post command cleanup.""" if getattr(cmd, "kills_digit_arg", True): if self.arg is not None: - self.dirty = True + self.invalidate_prompt() self.arg = None def prepare(self) -> None: @@ -587,9 +792,15 @@ def prepare(self) -> None: self.finished = False del self.buffer[:] self.pos = 0 - self.dirty = True + self.layout = LayoutMap.empty() + self.cxy = self.pos2xy() + self.lxy = (self.pos, 0) + self.rendered_screen = RenderedScreen.empty() + self.invalidate_full() self.last_command = None - self.calc_screen() + base_screen = self.calc_screen() + self.rendered_screen = self.compose_rendered_screen(base_screen) + self.invalidation = RefreshInvalidation.empty() except BaseException: self.restore() raise @@ -598,7 +809,7 @@ def prepare(self) -> None: cmd = self.scheduled_commands.pop() self.do_cmd((cmd, [])) - def last_command_is(self, cls: type) -> bool: + def last_command_is(self, cls: CommandClass) -> bool: if not self.last_command: return False return issubclass(cls, self.last_command) @@ -635,21 +846,35 @@ def finish(self) -> None: def error(self, msg: str = "none") -> None: self.msg = "! " + msg + " " - self.dirty = True + self.invalidate_message() self.console.beep() def update_screen(self) -> None: - if self.dirty: + if self.invalidation.is_cursor_only: + self.update_cursor() + self.clear_invalidation() + elif self.invalidation.needs_screen_refresh: self.refresh() def refresh(self) -> None: """Recalculate and refresh the screen.""" # this call sets up self.cxy, so call it first. - self.screen = self.calc_screen() - self.console.refresh(self.screen, self.cxy) - self.dirty = False + base_screen = self.calc_screen() + rendered_screen = self.compose_rendered_screen(base_screen) + self.rendered_screen = rendered_screen + trace( + "reader.refresh cursor={cursor} lines={lines} " + "dims=({width},{height}) invalidation={invalidation}", + cursor=self.cxy, + lines=len(rendered_screen.composed_lines), + width=self.console.width, + height=self.console.height, + invalidation=self.invalidation, + ) + self.console.refresh(rendered_screen) + self.clear_invalidation() - def do_cmd(self, cmd: tuple[str, list[str]]) -> None: + def do_cmd(self, cmd: CommandInput) -> None: """`cmd` is a tuple of "event_name" and "event", which in the current implementation is always just the "buffer" which happens to be a list of single-character strings.""" @@ -666,13 +891,14 @@ def do_cmd(self, cmd: tuple[str, list[str]]) -> None: command.do() self.after_command(command) - - if self.dirty: - self.refresh() - else: - self.update_cursor() - - if not isinstance(cmd, commands.digit_arg): + if ( + not self.invalidation.needs_screen_refresh + and not self.invalidation.is_cursor_only + ): + self.invalidate_cursor() + self.update_screen() + + if command_type is not commands.digit_arg: self.last_command = command_type self.finished = bool(command.finish) @@ -705,7 +931,7 @@ def handle1(self, block: bool = True) -> bool: if self.msg: self.msg = "" - self.dirty = True + self.invalidate_message() while True: # We use the same timeout as in readline.c: 100ms @@ -722,9 +948,13 @@ def handle1(self, block: bool = True) -> bool: if event.evt == "key": self.input_trans.push(event) elif event.evt == "scroll": + self.invalidate_full() self.refresh() + return True elif event.evt == "resize": + self.invalidate_full() self.refresh() + return True else: translate = False diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py index 23b8fa6b9c7625..e3dbd254f68e98 100644 --- a/Lib/_pyrepl/readline.py +++ b/Lib/_pyrepl/readline.py @@ -276,7 +276,7 @@ class maybe_accept(commands.Command): def do(self) -> None: r: ReadlineAlikeReader r = self.reader # type: ignore[assignment] - r.dirty = True # this is needed to hide the completion menu, if visible + r.invalidate_overlay() # hide completion menu, if visible # if there are already several lines and the cursor # is not on the last one, always insert a new \n. @@ -336,7 +336,7 @@ def do(self) -> None: break r.pos -= repeat del b[r.pos : r.pos + repeat] - r.dirty = True + r.invalidate_buffer(r.pos) else: self.reader.error("can't backspace at start") @@ -412,8 +412,12 @@ def set_completer_delims(self, delimiters: Collection[str]) -> None: def get_completer_delims(self) -> str: return "".join(sorted(self.config.completer_delims)) - def _histline(self, line: str) -> str: + def _histline(self, line: str, *, sanitize_nuls: bool = False) -> str: line = line.rstrip("\n") + if "\0" in line: + if not sanitize_nuls: + raise ValueError("embedded null character") + line = line.replace("\0", "") return line def get_history_length(self) -> int: @@ -446,9 +450,12 @@ def read_history_file(self, filename: str = gethistoryfile()) -> None: if line.endswith("\r"): buffer.append(line+'\n') else: - line = self._histline(line) + line = self._histline(line, sanitize_nuls=True) if buffer: - line = self._histline("".join(buffer).replace("\r", "") + line) + line = self._histline( + "".join(buffer).replace("\r", "") + line, + sanitize_nuls=True, + ) del buffer[:] if line: history.append(line) diff --git a/Lib/_pyrepl/render.py b/Lib/_pyrepl/render.py new file mode 100644 index 00000000000000..d626f33bc8d783 --- /dev/null +++ b/Lib/_pyrepl/render.py @@ -0,0 +1,323 @@ +from __future__ import annotations + +from collections.abc import Iterable, Sequence +from dataclasses import dataclass, field +from typing import Literal, Protocol, Self + +from .utils import ANSI_ESCAPE_SEQUENCE, THEME, StyleRef, str_width +from .types import CursorXY + +type RenderStyle = StyleRef | str | None +type LineUpdateKind = Literal[ + "insert_char", + "replace_char", + "replace_span", + "delete_then_insert", + "rewrite_suffix", +] + + +class _ThemeSyntax(Protocol): + def __getitem__(self, key: str, /) -> str: ... + + +@dataclass(frozen=True, slots=True) +class RenderCell: + text: str + width: int + style: StyleRef = field(default_factory=StyleRef) + controls: tuple[str, ...] = () + + @property + def terminal_text(self) -> str: + return render_cells((self,)) + + +def _theme_style(theme: _ThemeSyntax, tag: str) -> str: + return theme[tag] + + +def _style_escape(style: StyleRef) -> str: + if style.sgr: + return style.sgr + if style.tag is None: + return "" + return _theme_style(THEME(), style.tag) + + +def _update_terminal_state(state: str, escape: str) -> str: + if escape in {"\x1b[0m", "\x1b[m"}: + return "" + return state + escape + + +def _cells_from_rendered_text(text: str) -> tuple[RenderCell, ...]: + if not text: + return () + + cells: list[RenderCell] = [] + pending_controls: list[str] = [] + active_sgr = "" + index = 0 + + def append_plain_text(segment: str) -> None: + nonlocal pending_controls + if not segment: + return + if pending_controls: + cells.append(RenderCell("", 0, controls=tuple(pending_controls))) + pending_controls = [] + for char in segment: + cells.append( + RenderCell( + char, + str_width(char), + style=StyleRef.from_sgr(active_sgr), + ) + ) + + for match in ANSI_ESCAPE_SEQUENCE.finditer(text): + append_plain_text(text[index : match.start()]) + escape = match.group(0) + if escape.endswith("m"): + active_sgr = _update_terminal_state(active_sgr, escape) + else: + pending_controls.append(escape) + index = match.end() + + append_plain_text(text[index:]) + if pending_controls: + cells.append(RenderCell("", 0, controls=tuple(pending_controls))) + + return tuple(cells) + + +@dataclass(frozen=True, slots=True) +class RenderLine: + cells: tuple[RenderCell, ...] + text: str + width: int + + @classmethod + def from_cells(cls, cells: Iterable[RenderCell]) -> Self: + cell_tuple = tuple(cells) + return cls( + cells=cell_tuple, + text=render_cells(cell_tuple), + width=sum(cell.width for cell in cell_tuple), + ) + + @classmethod + def from_parts( + cls, + parts: Sequence[str], + widths: Sequence[int], + styles: Sequence[RenderStyle] | None = None, + ) -> Self: + if styles is None: + return cls.from_cells( + RenderCell(text, width) + for text, width in zip(parts, widths) + ) + + cells: list[RenderCell] = [] + for text, width, style in zip(parts, widths, styles): + if isinstance(style, StyleRef): + cells.append(RenderCell(text, width, style=style)) + elif style is None: + cells.append(RenderCell(text, width)) + else: + cells.append(RenderCell(text, width, style=StyleRef.from_tag(style))) + return cls.from_cells(cells) + + @classmethod + def from_rendered_text(cls, text: str) -> Self: + return cls.from_cells(_cells_from_rendered_text(text)) + + +@dataclass(frozen=True, slots=True) +class ScreenOverlay: + y: int + lines: tuple[RenderLine, ...] + insert: bool = False + + +@dataclass(frozen=True, slots=True) +class RenderedScreen: + lines: tuple[RenderLine, ...] + cursor: CursorXY + overlays: tuple[ScreenOverlay, ...] = () + composed_lines: tuple[RenderLine, ...] = field(init=False, default=()) + + def __post_init__(self) -> None: + object.__setattr__(self, "composed_lines", self._compose()) + + def _compose(self) -> tuple[RenderLine, ...]: + if not self.overlays: + return self.lines + + lines = list(self.lines) + y_offset = 0 + for overlay in self.overlays: + adjusted_y = overlay.y + y_offset + if overlay.insert: + lines[adjusted_y:adjusted_y] = overlay.lines + y_offset += len(overlay.lines) + else: + target_len = adjusted_y + len(overlay.lines) + if len(lines) < target_len: + lines.extend([EMPTY_RENDER_LINE] * (target_len - len(lines))) + for index, line in enumerate(overlay.lines): + lines[adjusted_y + index] = line + return tuple(lines) + + @classmethod + def empty(cls) -> Self: + return cls((), (0, 0), ()) + + @classmethod + def from_screen_lines( + cls, + screen: Sequence[str], + cursor: CursorXY, + ) -> Self: + return cls( + tuple(RenderLine.from_rendered_text(line) for line in screen), + cursor, + (), + ) + + def with_overlay( + self, + y: int, + lines: Iterable[RenderLine], + ) -> Self: + return type(self)( + self.lines, + self.cursor, + self.overlays + (ScreenOverlay(y, tuple(lines)),), + ) + + @property + def screen_lines(self) -> tuple[str, ...]: + return tuple(line.text for line in self.composed_lines) + + +@dataclass(frozen=True, slots=True) +class LineDiff: + start_cell: int + start_x: int + old_cells: tuple[RenderCell, ...] + new_cells: tuple[RenderCell, ...] + old_width: int + new_width: int + + @property + def old_text(self) -> str: + return render_cells(self.old_cells) + + @property + def new_text(self) -> str: + return render_cells(self.new_cells) + + @property + def old_changed_width(self) -> int: + return sum(cell.width for cell in self.old_cells) + + @property + def new_changed_width(self) -> int: + return sum(cell.width for cell in self.new_cells) + + +EMPTY_RENDER_LINE = RenderLine(cells=(), text="", width=0) + + +@dataclass(frozen=True, slots=True) +class LineUpdate: + kind: LineUpdateKind + y: int + start_cell: int + start_x: int + cells: tuple[RenderCell, ...] + char_width: int = 0 + clear_eol: bool = False + reset_to_margin: bool = False + text: str = field(init=False, default="") + + def __post_init__(self) -> None: + object.__setattr__(self, "text", render_cells(self.cells)) + + +def _controls_require_cursor_resync(controls: Sequence[str]) -> bool: + # Anything beyond SGR means the cursor may no longer be where we left it. + return any(not control.endswith("m") for control in controls) + + +def requires_cursor_resync(cells: Sequence[RenderCell]) -> bool: + return any(_controls_require_cursor_resync(cell.controls) for cell in cells) + + +def render_cells( + cells: Sequence[RenderCell], + visual_style: str | None = None, +) -> str: + rendered: list[str] = [] + active_escape = "" + for cell in cells: + if cell.controls: + rendered.extend(cell.controls) + if not cell.text: + continue + + target_escape = _style_escape(cell.style) + if visual_style is not None: + target_escape += visual_style + if target_escape != active_escape: + if active_escape: + rendered.append("\x1b[0m") + if target_escape: + rendered.append(target_escape) + active_escape = target_escape + rendered.append(cell.text) + + if active_escape: + rendered.append("\x1b[0m") + return "".join(rendered) + + +def diff_render_lines(old: RenderLine, new: RenderLine) -> LineDiff | None: + if old == new: + return None + + prefix = 0 + start_x = 0 + max_prefix = min(len(old.cells), len(new.cells)) + while prefix < max_prefix and old.cells[prefix] == new.cells[prefix]: + if old.cells[prefix].controls: + break + start_x += old.cells[prefix].width + prefix += 1 + + old_suffix = len(old.cells) + new_suffix = len(new.cells) + while old_suffix > prefix and new_suffix > prefix: + old_cell = old.cells[old_suffix - 1] + new_cell = new.cells[new_suffix - 1] + if old_cell.controls or new_cell.controls or old_cell != new_cell: + break + old_suffix -= 1 + new_suffix -= 1 + + while old_suffix < len(old.cells) and old.cells[old_suffix].width == 0: + old_suffix += 1 + while new_suffix < len(new.cells) and new.cells[new_suffix].width == 0: + new_suffix += 1 + + return LineDiff( + start_cell=prefix, + start_x=start_x, + old_cells=old.cells[prefix:old_suffix], + new_cells=new.cells[prefix:new_suffix], + old_width=old.width, + new_width=new.width, + ) diff --git a/Lib/_pyrepl/simple_interact.py b/Lib/_pyrepl/simple_interact.py index 0da9f91baf6cfc..c169d0191bd833 100644 --- a/Lib/_pyrepl/simple_interact.py +++ b/Lib/_pyrepl/simple_interact.py @@ -161,7 +161,7 @@ def maybe_run_command(statement: str) -> bool: if r.input_trans is r.isearch_trans: r.do_cmd(("isearch-end", [""])) r.pos = len(r.get_unicode()) - r.dirty = True + r.invalidate_full() r.refresh() console.write("\nKeyboardInterrupt\n") console.resetbuffer() diff --git a/Lib/_pyrepl/trace.py b/Lib/_pyrepl/trace.py index 943ee12f964b29..395867805196a5 100644 --- a/Lib/_pyrepl/trace.py +++ b/Lib/_pyrepl/trace.py @@ -32,3 +32,9 @@ def trace(line: str, *k: object, **kw: object) -> None: line = line.format(*k, **kw) trace_file.write(line + "\n") trace_file.flush() + + +def trace_text(text: str, limit: int = 60) -> str: + if len(text) > limit: + text = text[:limit] + "..." + return repr(text) diff --git a/Lib/_pyrepl/types.py b/Lib/_pyrepl/types.py index c5b7ebc1a406bd..c0937ff585e6aa 100644 --- a/Lib/_pyrepl/types.py +++ b/Lib/_pyrepl/types.py @@ -4,7 +4,12 @@ type SimpleContextManager = Iterator[None] type KeySpec = str # like r"\C-c" type CommandName = str # like "interrupt" -type EventTuple = tuple[CommandName, str] +type EventData = list[str] +type EventTuple = tuple[CommandName, EventData] +type CursorXY = tuple[int, int] +type Dimensions = tuple[int, int] +type ScreenInfoRow = tuple[int, list[int]] +type Keymap = tuple[tuple[KeySpec, CommandName], ...] type Completer = Callable[[str, int], str | None] type CharBuffer = list[str] type CharWidths = list[int] diff --git a/Lib/_pyrepl/unix_console.py b/Lib/_pyrepl/unix_console.py index 937b5df6ff7d4c..294d4917c7149b 100644 --- a/Lib/_pyrepl/unix_console.py +++ b/Lib/_pyrepl/unix_console.py @@ -31,14 +31,25 @@ import time import types import platform +from collections.abc import Callable +from dataclasses import dataclass from fcntl import ioctl +from typing import TYPE_CHECKING, cast, overload from . import terminfo from .console import Console, Event from .fancy_termios import tcgetattr, tcsetattr, TermState -from .trace import trace +from .render import ( + EMPTY_RENDER_LINE, + LineUpdate, + RenderLine, + RenderedScreen, + requires_cursor_resync, + diff_render_lines, + render_cells, +) +from .trace import trace, trace_text from .unix_eventqueue import EventQueue -from .utils import wlen # declare posix optional to allow None assignment on other platforms posix: types.ModuleType | None @@ -47,14 +58,12 @@ except ImportError: posix = None -TYPE_CHECKING = False - # types if TYPE_CHECKING: - from typing import AbstractSet, IO, Literal, overload, cast -else: - overload = lambda func: None - cast = lambda typ, val: val + from typing import AbstractSet, IO, Literal + +type _MoveFunc = Callable[[int, int], None] +type _PendingWrite = tuple[str | bytes, bool] class InvalidTerminal(RuntimeError): @@ -140,7 +149,25 @@ def poll(self, timeout: float | None = None) -> list[int]: poll = MinimalPoll # type: ignore[assignment] +@dataclass(frozen=True, slots=True) +class UnixRefreshPlan: + grow_lines: int + use_tall_mode: bool + offset: int + reverse_scroll: int + forward_scroll: int + line_updates: tuple[LineUpdate, ...] + cleared_lines: tuple[int, ...] + rendered_screen: RenderedScreen + cursor: tuple[int, int] + + class UnixConsole(Console): + __buffer: list[_PendingWrite] + __gone_tall: bool + __move: _MoveFunc + __offset: int + def __init__( self, f_in: IO[bytes] | int = 0, @@ -219,7 +246,7 @@ def _my_getstr(cap: str, optional: bool = False) -> bytes | None: self.event_queue = EventQueue( self.input_fd, self.encoding, self.terminfo ) - self.cursor_visible = 1 + self.cursor_visible = True signal.signal(signal.SIGCONT, self._sigcont_handler) @@ -239,34 +266,50 @@ def change_encoding(self, encoding: str) -> None: """ self.encoding = encoding - def refresh(self, screen, c_xy): + def refresh(self, rendered_screen: RenderedScreen) -> None: """ Refresh the console screen. Parameters: - - screen (list): List of strings representing the screen contents. - - c_xy (tuple): Cursor position (x, y) on the screen. + - rendered_screen: Structured rendered screen contents and cursor. """ + c_xy = rendered_screen.cursor + trace( + "unix.refresh start cursor={cursor} lines={lines} prev_lines={prev_lines} " + "offset={offset} posxy={posxy}", + cursor=c_xy, + lines=len(rendered_screen.composed_lines), + prev_lines=len(self._rendered_screen.composed_lines), + offset=self.__offset, + posxy=self.posxy, + ) + plan = self.__plan_refresh(rendered_screen, c_xy) + self.__apply_refresh_plan(plan) + + def __plan_refresh( + self, + rendered_screen: RenderedScreen, + c_xy: tuple[int, int], + ) -> UnixRefreshPlan: cx, cy = c_xy - if not self.__gone_tall: - while len(self.screen) < min(len(screen), self.height): - self.__hide_cursor() - if self.screen: - self.__move(0, len(self.screen) - 1) - self.__write("\n") - self.posxy = 0, len(self.screen) - self.screen.append("") - else: - while len(self.screen) < len(screen): - self.screen.append("") + height = self.height + old_offset = offset = self.__offset + prev_composed = self._rendered_screen.composed_lines + previous_lines = list(prev_composed) + next_lines = list(rendered_screen.composed_lines) + line_count = len(next_lines) - if len(screen) > self.height: - self.__gone_tall = 1 - self.__move = self.__move_tall + grow_lines = 0 + if not self.__gone_tall: + grow_lines = max( + min(line_count, height) - len(prev_composed), + 0, + ) + previous_lines.extend([EMPTY_RENDER_LINE] * grow_lines) + elif len(previous_lines) < line_count: + previous_lines.extend([EMPTY_RENDER_LINE] * (line_count - len(previous_lines))) - px, py = self.posxy - old_offset = offset = self.__offset - height = self.height + use_tall_mode = self.__gone_tall or line_count > height # we make sure the cursor is on the screen, and that we're # using all of the screen if we can @@ -274,56 +317,115 @@ def refresh(self, screen, c_xy): offset = cy elif cy >= offset + height: offset = cy - height + 1 - elif offset > 0 and len(screen) < offset + height: - offset = max(len(screen) - height, 0) - screen.append("") + elif offset > 0 and line_count < offset + height: + offset = max(line_count - height, 0) + next_lines.append(EMPTY_RENDER_LINE) - oldscr = self.screen[old_offset : old_offset + height] - newscr = screen[offset : offset + height] + oldscr = previous_lines[old_offset : old_offset + height] + newscr = next_lines[offset : offset + height] - # use hardware scrolling if we have it. + reverse_scroll = 0 + forward_scroll = 0 if old_offset > offset and self._ri: + reverse_scroll = old_offset - offset + for _ in range(reverse_scroll): + if oldscr: + oldscr.pop(-1) + oldscr.insert(0, EMPTY_RENDER_LINE) + elif old_offset < offset and self._ind: + forward_scroll = offset - old_offset + for _ in range(forward_scroll): + if oldscr: + oldscr.pop(0) + oldscr.append(EMPTY_RENDER_LINE) + + line_updates: list[LineUpdate] = [] + px, _ = self.posxy + for y, oldline, newline in zip(range(offset, offset + height), oldscr, newscr): + update = self.__plan_changed_line(y, oldline, newline, px) + if update is not None: + line_updates.append(update) + + cleared_lines = tuple(range(len(newscr), len(oldscr))) + console_rendered_screen = RenderedScreen(tuple(next_lines), c_xy) + trace( + "unix.refresh plan grow={grow} tall={tall} offset={offset} " + "reverse_scroll={reverse_scroll} forward_scroll={forward_scroll} " + "updates={updates} clears={clears}", + grow=grow_lines, + tall=use_tall_mode, + offset=offset, + reverse_scroll=reverse_scroll, + forward_scroll=forward_scroll, + updates=len(line_updates), + clears=len(cleared_lines), + ) + return UnixRefreshPlan( + grow_lines=grow_lines, + use_tall_mode=use_tall_mode, + offset=offset, + reverse_scroll=reverse_scroll, + forward_scroll=forward_scroll, + line_updates=tuple(line_updates), + cleared_lines=cleared_lines, + rendered_screen=console_rendered_screen, + cursor=(cx, cy), + ) + + def __apply_refresh_plan(self, plan: UnixRefreshPlan) -> None: + cx, cy = plan.cursor + trace( + "unix.refresh apply cursor={cursor} updates={updates} clears={clears}", + cursor=plan.cursor, + updates=len(plan.line_updates), + clears=len(plan.cleared_lines), + ) + visual_style = self.begin_redraw_visualization() + screen_line_count = len(self._rendered_screen.composed_lines) + + for _ in range(plan.grow_lines): + self.__hide_cursor() + if screen_line_count: + self.__move(0, screen_line_count - 1) + self.__write("\n") + self.posxy = 0, screen_line_count + screen_line_count += 1 + + if plan.use_tall_mode and not self.__gone_tall: + self.__gone_tall = True + self.__move = self.__move_tall + + old_offset = self.__offset + if plan.reverse_scroll: self.__hide_cursor() self.__write_code(self._cup, 0, 0) self.posxy = 0, old_offset - for i in range(old_offset - offset): + for _ in range(plan.reverse_scroll): self.__write_code(self._ri) - oldscr.pop(-1) - oldscr.insert(0, "") - elif old_offset < offset and self._ind: + elif plan.forward_scroll: self.__hide_cursor() self.__write_code(self._cup, self.height - 1, 0) self.posxy = 0, old_offset + self.height - 1 - for i in range(offset - old_offset): + for _ in range(plan.forward_scroll): self.__write_code(self._ind) - oldscr.pop(0) - oldscr.append("") - self.__offset = offset + self.__offset = plan.offset - for ( - y, - oldline, - newline, - ) in zip(range(offset, offset + height), oldscr, newscr): - if oldline != newline: - self.__write_changed_line(y, oldline, newline, px) + for update in plan.line_updates: + self.__apply_line_update(update, visual_style) - y = len(newscr) - while y < len(oldscr): + for y in plan.cleared_lines: self.__hide_cursor() self.__move(0, y) self.posxy = 0, y self.__write_code(self._el) - y += 1 self.__show_cursor() - - self.screen = screen.copy() self.move_cursor(cx, cy) self.flushoutput() + self.sync_rendered_screen(plan.rendered_screen, self.posxy) - def move_cursor(self, x, y): + def move_cursor(self, x: int, y: int) -> None: """ Move the cursor to the specified position on the screen. @@ -332,16 +434,25 @@ def move_cursor(self, x, y): - y (int): Y coordinate. """ if y < self.__offset or y >= self.__offset + self.height: - self.event_queue.insert(Event("scroll", None)) + trace( + "unix.move_cursor offscreen x={x} y={y} offset={offset} height={height}", + x=x, + y=y, + offset=self.__offset, + height=self.height, + ) + self.event_queue.insert(Event("scroll", "")) else: + trace("unix.move_cursor x={x} y={y}", x=x, y=y) self.__move(x, y) self.posxy = x, y self.flushoutput() - def prepare(self): + def prepare(self) -> None: """ Prepare the console for input/output operations. """ + trace("unix.prepare") self.__buffer = [] self.__svtermstate = tcgetattr(self.input_fd) @@ -353,21 +464,22 @@ def prepare(self): raw.iflag |= termios.BRKINT raw.lflag &= ~(termios.ICANON | termios.ECHO | termios.IEXTEN) raw.lflag |= termios.ISIG - raw.cc[termios.VMIN] = 1 - raw.cc[termios.VTIME] = 0 + raw.cc[termios.VMIN] = b"\x01" + raw.cc[termios.VTIME] = b"\x00" self.__input_fd_set(raw) - # In macOS terminal we need to deactivate line wrap via ANSI escape code + # Apple Terminal will re-wrap lines for us unless we preempt the + # damage. if self.is_apple_terminal: os.write(self.output_fd, b"\033[?7l") - self.screen = [] self.height, self.width = self.getheightwidth() self.posxy = 0, 0 - self.__gone_tall = 0 + self.__gone_tall = False self.__move = self.__move_short self.__offset = 0 + self.sync_rendered_screen(RenderedScreen.empty(), self.posxy) self.__maybe_write_code(self._smkx) @@ -378,10 +490,11 @@ def prepare(self): self.__enable_bracketed_paste() - def restore(self): + def restore(self) -> None: """ Restore the console to the default state """ + trace("unix.restore") self.__disable_bracketed_paste() self.__maybe_write_code(self._rmkx) self.flushoutput() @@ -446,7 +559,7 @@ def wait(self, timeout: float | None = None) -> bool: or bool(self.pollob.poll(timeout)) ) - def set_cursor_vis(self, visible): + def set_cursor_vis(self, visible: bool) -> None: """ Set the visibility of the cursor. @@ -514,8 +627,9 @@ def finish(self): """ Finish console operations and flush the output buffer. """ - y = len(self.screen) - 1 - while y >= 0 and not self.screen[y]: + rendered_lines = self._rendered_screen.composed_lines + y = len(rendered_lines) - 1 + while y >= 0 and not rendered_lines[y].text: y -= 1 self.__move(0, min(y, self.height + self.__offset - 1)) self.__write("\n\r") @@ -542,7 +656,7 @@ def getpending(self): while not self.event_queue.empty(): e2 = self.event_queue.get() e.data += e2.data - e.raw += e.raw + e.raw += e2.raw amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0] trace("getpending({a})", a=amount) @@ -566,7 +680,7 @@ def getpending(self): while not self.event_queue.empty(): e2 = self.event_queue.get() e.data += e2.data - e.raw += e.raw + e.raw += e2.raw amount = 10000 raw = self.__read(amount) @@ -579,11 +693,12 @@ def clear(self): """ Clear the console screen. """ + trace("unix.clear") self.__write_code(self._clear) - self.__gone_tall = 1 + self.__gone_tall = True self.__move = self.__move_tall self.posxy = 0, 0 - self.screen = [] + self.sync_rendered_screen(RenderedScreen.empty(), self.posxy) @property def input_hook(self): @@ -634,98 +749,178 @@ def __setup_movement(self): self.__move = self.__move_short - def __write_changed_line(self, y, oldline, newline, px_coord): - # this is frustrating; there's no reason to test (say) - # self.dch1 inside the loop -- but alternative ways of - # structuring this function are equally painful (I'm trying to - # avoid writing code generators these days...) - minlen = min(wlen(oldline), wlen(newline)) - x_pos = 0 - x_coord = 0 - - px_pos = 0 - j = 0 - for c in oldline: - if j >= px_coord: - break - j += wlen(c) - px_pos += 1 - - # reuse the oldline as much as possible, but stop as soon as we - # encounter an ESCAPE, because it might be the start of an escape - # sequence - while ( - x_coord < minlen - and oldline[x_pos] == newline[x_pos] - and newline[x_pos] != "\x1b" - ): - x_coord += wlen(newline[x_pos]) - x_pos += 1 + @staticmethod + def __cell_index_from_x(line: RenderLine, x_coord: int) -> int: + width = 0 + index = 0 + while index < len(line.cells) and width < x_coord: + width += line.cells[index].width + index += 1 + return index - # if we need to insert a single character right after the first detected change - if oldline[x_pos:] == newline[x_pos + 1 :] and self.ich1: + def __plan_changed_line( + self, + y: int, + oldline: RenderLine, + newline: RenderLine, + px_coord: int, + ) -> LineUpdate | None: + # NOTE: The shared replace_char / replace_span / rewrite_suffix logic + # is duplicated in WindowsConsole.__plan_changed_line. Keep changes to + # these common cases synchronised between the two files. Yes, this is + # duplicated on purpose; the two backends agree just enough to make a + # shared helper a trap. Unix-only cases (insert_char, delete_then_insert) + # rely on terminal capabilities (ich1/dch1) that are unavailable on + # Windows. + diff = diff_render_lines(oldline, newline) + if diff is None: + return None + + start_cell = diff.start_cell + start_x = diff.start_x + + if ( + self.ich1 + and not diff.old_cells + and (visible_new_cells := tuple( + cell for cell in diff.new_cells if cell.width + )) + and len(visible_new_cells) == 1 + and all(cell.width == 0 for cell in diff.new_cells[1:]) + and oldline.cells[start_cell:] == newline.cells[start_cell + 1 :] + ): + px_cell = self.__cell_index_from_x(oldline, px_coord) if ( y == self.posxy[1] - and x_coord > self.posxy[0] - and oldline[px_pos:x_pos] == newline[px_pos + 1 : x_pos + 1] + and start_x > self.posxy[0] + and oldline.cells[px_cell:start_cell] + == newline.cells[px_cell + 1 : start_cell + 1] ): - x_pos = px_pos - x_coord = px_coord - character_width = wlen(newline[x_pos]) - self.__move(x_coord, y) - self.__write_code(self.ich1) - self.__write(newline[x_pos]) - self.posxy = x_coord + character_width, y - - # if it's a single character change in the middle of the line - elif ( - x_coord < minlen - and oldline[x_pos + 1 :] == newline[x_pos + 1 :] - and wlen(oldline[x_pos]) == wlen(newline[x_pos]) + start_cell = px_cell + start_x = px_coord + planned_cells = diff.new_cells + changed_cell = visible_new_cells[0] + return LineUpdate( + kind="insert_char", + y=y, + start_cell=start_cell, + start_x=start_x, + cells=planned_cells, + char_width=changed_cell.width, + reset_to_margin=requires_cursor_resync(planned_cells), + ) + + if ( + len(diff.old_cells) == 1 + and len(diff.new_cells) == 1 + and diff.old_cells[0].width == diff.new_cells[0].width ): - character_width = wlen(newline[x_pos]) - self.__move(x_coord, y) - self.__write(newline[x_pos]) - self.posxy = x_coord + character_width, y - - # if this is the last character to fit in the line and we edit in the middle of the line - elif ( + planned_cells = diff.new_cells + changed_cell = planned_cells[0] + return LineUpdate( + kind="replace_char", + y=y, + start_cell=start_cell, + start_x=start_x, + cells=planned_cells, + char_width=changed_cell.width, + reset_to_margin=requires_cursor_resync(planned_cells), + ) + + if diff.old_changed_width == diff.new_changed_width: + planned_cells = diff.new_cells + return LineUpdate( + kind="replace_span", + y=y, + start_cell=start_cell, + start_x=start_x, + cells=planned_cells, + char_width=diff.new_changed_width, + reset_to_margin=requires_cursor_resync(planned_cells), + ) + + if ( self.dch1 and self.ich1 - and wlen(newline) == self.width - and x_coord < wlen(newline) - 2 - and newline[x_pos + 1 : -1] == oldline[x_pos:-2] + and newline.width == self.width + and start_x < newline.width - 2 + and newline.cells[start_cell + 1 : -1] == oldline.cells[start_cell:-2] ): + planned_cells = (newline.cells[start_cell],) + changed_cell = planned_cells[0] + return LineUpdate( + kind="delete_then_insert", + y=y, + start_cell=start_cell, + start_x=start_x, + cells=planned_cells, + char_width=changed_cell.width, + reset_to_margin=requires_cursor_resync(planned_cells), + ) + + suffix_cells = newline.cells[start_cell:] + return LineUpdate( + kind="rewrite_suffix", + y=y, + start_cell=start_cell, + start_x=start_x, + cells=suffix_cells, + char_width=sum(cell.width for cell in suffix_cells), + clear_eol=oldline.width > newline.width, + reset_to_margin=requires_cursor_resync(suffix_cells), + ) + + def __apply_line_update( + self, + update: LineUpdate, + visual_style: str | None = None, + ) -> None: + text = render_cells(update.cells, visual_style) if visual_style else update.text + trace( + "unix.refresh update kind={kind} y={y} x={x} text={text} " + "clear_eol={clear_eol} reset_to_margin={reset}", + kind=update.kind, + y=update.y, + x=update.start_x, + text=trace_text(text), + clear_eol=update.clear_eol, + reset=update.reset_to_margin, + ) + if update.kind == "insert_char": + self.__move(update.start_x, update.y) + self.__write_code(self.ich1) + self.__write(text) + self.posxy = update.start_x + update.char_width, update.y + elif update.kind in {"replace_char", "replace_span"}: + self.__move(update.start_x, update.y) + self.__write(text) + self.posxy = update.start_x + update.char_width, update.y + elif update.kind == "delete_then_insert": self.__hide_cursor() - self.__move(self.width - 2, y) - self.posxy = self.width - 2, y + self.__move(self.width - 2, update.y) + self.posxy = self.width - 2, update.y self.__write_code(self.dch1) - - character_width = wlen(newline[x_pos]) - self.__move(x_coord, y) + self.__move(update.start_x, update.y) self.__write_code(self.ich1) - self.__write(newline[x_pos]) - self.posxy = character_width + 1, y - + self.__write(text) + self.posxy = update.start_x + update.char_width, update.y else: self.__hide_cursor() - self.__move(x_coord, y) - if wlen(oldline) > wlen(newline): + self.__move(update.start_x, update.y) + if update.clear_eol: self.__write_code(self._el) - self.__write(newline[x_pos:]) - self.posxy = wlen(newline), y + self.__write(text) + self.posxy = update.start_x + update.char_width, update.y - if "\x1b" in newline: - # ANSI escape characters are present, so we can't assume - # anything about the position of the cursor. Moving the cursor - # to the left margin should work to get to a known position. - self.move_cursor(0, y) + if update.reset_to_margin: + # Non-SGR terminal controls can affect the cursor position. + self.move_cursor(0, update.y) def __write(self, text): - self.__buffer.append((text, 0)) + self.__buffer.append((text, False)) def __write_code(self, fmt, *args): - self.__buffer.append((terminfo.tparm(fmt, *args), 1)) + self.__buffer.append((terminfo.tparm(fmt, *args), True)) def __maybe_write_code(self, fmt, *args): if fmt: @@ -777,29 +972,38 @@ def __move_tall(self, x, y): def __sigwinch(self, signum, frame): self.height, self.width = self.getheightwidth() - self.event_queue.insert(Event("resize", None)) + self.event_queue.insert(Event("resize", "")) def __hide_cursor(self): if self.cursor_visible: self.__maybe_write_code(self._civis) - self.cursor_visible = 0 + self.cursor_visible = False def __show_cursor(self): if not self.cursor_visible: self.__maybe_write_code(self._cnorm) - self.cursor_visible = 1 + self.cursor_visible = True def repaint(self): + composed = self._rendered_screen.composed_lines + trace( + "unix.repaint gone_tall={gone_tall} screen_lines={lines} offset={offset}", + gone_tall=self.__gone_tall, + lines=len(composed), + offset=self.__offset, + ) if not self.__gone_tall: self.posxy = 0, self.posxy[1] self.__write("\r") - ns = len(self.screen) * ["\000" * self.width] - self.screen = ns + ns = len(composed) * ["\000" * self.width] else: self.posxy = 0, self.__offset self.__move(0, self.__offset) ns = self.height * ["\000" * self.width] - self.screen = ns + self.sync_rendered_screen( + RenderedScreen.from_screen_lines(ns, self.posxy), + self.posxy, + ) def __tputs(self, fmt, prog=delayprog): """A Python implementation of the curses tputs function; the diff --git a/Lib/_pyrepl/utils.py b/Lib/_pyrepl/utils.py index 25d7ac1bd0b14e..baeb543d6df9d2 100644 --- a/Lib/_pyrepl/utils.py +++ b/Lib/_pyrepl/utils.py @@ -9,6 +9,7 @@ import _colorize from collections import deque +from dataclasses import dataclass from io import StringIO from tokenize import TokenInfo as TI from typing import Iterable, Iterator, Match, NamedTuple, Self @@ -59,6 +60,21 @@ class ColorSpan(NamedTuple): tag: str +class StyledChar(NamedTuple): + text: str + width: int + tag: str | None = None + + +def _ascii_control_repr(c: str) -> str | None: + code = ord(c) + if code < 32: + return "^" + chr(code + 64) + if code == 127: + return "^?" + return None + + @functools.cache def str_width(c: str) -> int: if ord(c) < 128: @@ -286,6 +302,61 @@ def is_soft_keyword_used(*tokens: TI | None) -> bool: return False +def iter_display_chars( + buffer: str, + colors: list[ColorSpan] | None = None, + start_index: int = 0, +) -> Iterator[StyledChar]: + """Yield visible display characters with widths and semantic color tags. + + Note: ``colors`` is consumed in place as spans are processed -- callers + that split a buffer across multiple calls rely on this mutation to track + which spans have already been handled. + """ + + if not buffer: + return + + color_idx = 0 + if colors: + while color_idx < len(colors) and colors[color_idx].span.end < start_index: + color_idx += 1 + + active_tag = None + if colors and color_idx < len(colors) and colors[color_idx].span.start < start_index: + active_tag = colors[color_idx].tag + + for i, c in enumerate(buffer, start_index): + if colors and color_idx < len(colors) and colors[color_idx].span.start == i: + active_tag = colors[color_idx].tag + + if control := _ascii_control_repr(c): + text = control + width = len(control) + elif ord(c) < 128: + text = c + width = 1 + elif unicodedata.category(c).startswith("C"): + text = r"\u%04x" % ord(c) + width = len(text) + else: + text = c + width = str_width(c) + + yield StyledChar(text, width, active_tag) + + if colors and color_idx < len(colors) and colors[color_idx].span.end == i: + color_idx += 1 + active_tag = None + # Check if the next span starts at the same position + if color_idx < len(colors) and colors[color_idx].span.start == i: + active_tag = colors[color_idx].tag + + # Remove consumed spans so callers see the mutation + if color_idx > 0 and colors: + del colors[:color_idx] + + def disp_str( buffer: str, colors: list[ColorSpan] | None = None, @@ -321,53 +392,18 @@ def disp_str( (['\x1b[1;34mw', 'h', 'i', 'l', 'e\x1b[0m', ' ', '1', ':'], [1, 1, 1, 1, 1, 1, 1, 1]) """ + styled_chars = list(iter_display_chars(buffer, colors, start_index)) chars: CharBuffer = [] char_widths: CharWidths = [] - - if not buffer: - return chars, char_widths - - while colors and colors[0].span.end < start_index: - # move past irrelevant spans - colors.pop(0) - theme = THEME(force_color=force_color) - pre_color = "" - post_color = "" - if colors and colors[0].span.start < start_index: - # looks like we're continuing a previous color (e.g. a multiline str) - pre_color = theme[colors[0].tag] - - for i, c in enumerate(buffer, start_index): - if colors and colors[0].span.start == i: # new color starts now - pre_color = theme[colors[0].tag] - - if c == "\x1a": # CTRL-Z on Windows - chars.append(c) - char_widths.append(2) - elif ord(c) < 128: - chars.append(c) - char_widths.append(1) - elif unicodedata.category(c).startswith("C"): - c = r"\u%04x" % ord(c) - chars.append(c) - char_widths.append(len(c)) - else: - chars.append(c) - char_widths.append(str_width(c)) - - if colors and colors[0].span.end == i: # current color ends now - post_color = theme.reset - colors.pop(0) - - chars[-1] = pre_color + chars[-1] + post_color - pre_color = "" - post_color = "" - if colors and colors[0].span.start < i and colors[0].span.end > i: - # even though the current color should be continued, reset it for now. - # the next call to `disp_str()` will revive it. - chars[-1] += theme.reset + for index, styled_char in enumerate(styled_chars): + previous_tag = styled_chars[index - 1].tag if index else None + next_tag = styled_chars[index + 1].tag if index + 1 < len(styled_chars) else None + prefix = theme[styled_char.tag] if styled_char.tag and styled_char.tag != previous_tag else "" + suffix = theme.reset if styled_char.tag and styled_char.tag != next_tag else "" + chars.append(prefix + styled_char.text + suffix) + char_widths.append(styled_char.width) return chars, char_widths @@ -385,13 +421,35 @@ def prev_next_window[T]( """ iterator = iter(iterable) - window = deque((None, next(iterator)), maxlen=3) + try: + first = next(iterator) + except StopIteration: + return + window = deque((None, first), maxlen=3) try: for x in iterator: window.append(x) yield tuple(window) - except Exception: - raise finally: window.append(None) yield tuple(window) + + +@dataclass(frozen=True, slots=True) +class StyleRef: + tag: str | None = None + sgr: str = "" + + @classmethod + def from_tag(cls, tag: str, sgr: str = "") -> Self: + return cls(tag=tag, sgr=sgr) + + @classmethod + def from_sgr(cls, sgr: str) -> Self: + if not sgr: + return cls() + return cls(sgr=sgr) + + @property + def is_plain(self) -> bool: + return self.tag is None and not self.sgr diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py index cb1834168e881c..e8e8d9604161c8 100644 --- a/Lib/_pyrepl/windows_console.py +++ b/Lib/_pyrepl/windows_console.py @@ -25,6 +25,7 @@ import ctypes import types +from dataclasses import dataclass from ctypes.wintypes import ( _COORD, WORD, @@ -37,9 +38,18 @@ SHORT, ) from ctypes import Structure, POINTER, Union +from typing import TYPE_CHECKING from .console import Event, Console -from .trace import trace -from .utils import wlen +from .render import ( + EMPTY_RENDER_LINE, + LineUpdate, + RenderLine, + RenderedScreen, + requires_cursor_resync, + diff_render_lines, + render_cells, +) +from .trace import trace, trace_text from .windows_eventqueue import EventQueue try: @@ -63,8 +73,6 @@ def __init__(self, err: int | None, descr: str | None = None) -> None: except ImportError: nt = None -TYPE_CHECKING = False - if TYPE_CHECKING: from typing import IO @@ -123,6 +131,17 @@ def __init__(self, err: int | None, descr: str | None = None) -> None: class _error(Exception): pass + +@dataclass(frozen=True, slots=True) +class WindowsRefreshPlan: + grow_lines: int + offset: int + scroll_lines: int + line_updates: tuple[LineUpdate, ...] + cleared_lines: tuple[int, ...] + rendered_screen: RenderedScreen + cursor: tuple[int, int] + def _supports_vt(): try: return nt._supports_virtual_terminal() @@ -159,7 +178,6 @@ def __init__( ): raise WinError(get_last_error()) - self.screen: list[str] = [] self.width = 80 self.height = 25 self.__offset = 0 @@ -170,74 +188,124 @@ def __init__( # Console I/O is redirected, fallback... self.out = None - def refresh(self, screen: list[str], c_xy: tuple[int, int]) -> None: + def refresh(self, rendered_screen: RenderedScreen) -> None: """ Refresh the console screen. Parameters: - - screen (list): List of strings representing the screen contents. - - c_xy (tuple): Cursor position (x, y) on the screen. + - rendered_screen: Structured rendered screen contents and cursor. """ - cx, cy = c_xy - - while len(self.screen) < min(len(screen), self.height): - self._hide_cursor() - if self.screen: - self._move_relative(0, len(self.screen) - 1) - self.__write("\n") - self.posxy = 0, len(self.screen) - self.screen.append("") + c_xy = rendered_screen.cursor + trace( + "windows.refresh start cursor={cursor} lines={lines} prev_lines={prev_lines} " + "offset={offset} posxy={posxy}", + cursor=c_xy, + lines=len(rendered_screen.composed_lines), + prev_lines=len(self._rendered_screen.composed_lines), + offset=self.__offset, + posxy=self.posxy, + ) + plan = self.__plan_refresh(rendered_screen, c_xy) + self.__apply_refresh_plan(plan) - px, py = self.posxy - old_offset = offset = self.__offset + def __plan_refresh( + self, + rendered_screen: RenderedScreen, + c_xy: tuple[int, int], + ) -> WindowsRefreshPlan: + cx, cy = c_xy height = self.height + old_offset = offset = self.__offset + prev_composed = self._rendered_screen.composed_lines + previous_lines = list(prev_composed) + next_lines = list(rendered_screen.composed_lines) + line_count = len(next_lines) + + grow_lines = max( + min(line_count, height) - len(prev_composed), + 0, + ) + previous_lines.extend([EMPTY_RENDER_LINE] * grow_lines) - # we make sure the cursor is on the screen, and that we're - # using all of the screen if we can + scroll_lines = 0 if cy < offset: offset = cy elif cy >= offset + height: offset = cy - height + 1 scroll_lines = offset - old_offset + previous_lines.extend([EMPTY_RENDER_LINE] * scroll_lines) + elif offset > 0 and line_count < offset + height: + offset = max(line_count - height, 0) + next_lines.append(EMPTY_RENDER_LINE) + + oldscr = previous_lines[old_offset : old_offset + height] + newscr = next_lines[offset : offset + height] + + line_updates: list[LineUpdate] = [] + px, _ = self.posxy + for y, oldline, newline in zip(range(offset, offset + height), oldscr, newscr): + update = self.__plan_changed_line(y, oldline, newline, px) + if update is not None: + line_updates.append(update) + + cleared_lines = tuple(range(len(newscr), len(oldscr))) + console_rendered_screen = RenderedScreen(tuple(next_lines), c_xy) + trace( + "windows.refresh plan grow={grow} offset={offset} scroll_lines={scroll_lines} " + "updates={updates} clears={clears}", + grow=grow_lines, + offset=offset, + scroll_lines=scroll_lines, + updates=len(line_updates), + clears=len(cleared_lines), + ) + return WindowsRefreshPlan( + grow_lines=grow_lines, + offset=offset, + scroll_lines=scroll_lines, + line_updates=tuple(line_updates), + cleared_lines=cleared_lines, + rendered_screen=console_rendered_screen, + cursor=(cx, cy), + ) - # Scrolling the buffer as the current input is greater than the visible - # portion of the window. We need to scroll the visible portion and the - # entire history - self._scroll(scroll_lines, self._getscrollbacksize()) - self.posxy = self.posxy[0], self.posxy[1] + scroll_lines - self.__offset += scroll_lines + def __apply_refresh_plan(self, plan: WindowsRefreshPlan) -> None: + cx, cy = plan.cursor + trace( + "windows.refresh apply cursor={cursor} updates={updates} clears={clears}", + cursor=plan.cursor, + updates=len(plan.line_updates), + clears=len(plan.cleared_lines), + ) + visual_style = self.begin_redraw_visualization() + screen_line_count = len(self._rendered_screen.composed_lines) - for i in range(scroll_lines): - self.screen.append("") - elif offset > 0 and len(screen) < offset + height: - offset = max(len(screen) - height, 0) - screen.append("") + for _ in range(plan.grow_lines): + self._hide_cursor() + if screen_line_count: + self._move_relative(0, screen_line_count - 1) + self.__write("\n") + self.posxy = 0, screen_line_count + screen_line_count += 1 - oldscr = self.screen[old_offset : old_offset + height] - newscr = screen[offset : offset + height] + if plan.scroll_lines: + self._scroll(plan.scroll_lines, self._getscrollbacksize()) + self.posxy = self.posxy[0], self.posxy[1] + plan.scroll_lines - self.__offset = offset + self.__offset = plan.offset self._hide_cursor() - for ( - y, - oldline, - newline, - ) in zip(range(offset, offset + height), oldscr, newscr): - if oldline != newline: - self.__write_changed_line(y, oldline, newline, px) - - y = len(newscr) - while y < len(oldscr): + for update in plan.line_updates: + self.__apply_line_update(update, visual_style) + + for y in plan.cleared_lines: self._move_relative(0, y) self.posxy = 0, y self._erase_to_end() - y += 1 self._show_cursor() - - self.screen = screen self.move_cursor(cx, cy) + self.sync_rendered_screen(plan.rendered_screen, self.posxy) @property def input_hook(self): @@ -246,37 +314,98 @@ def input_hook(self): if nt is not None and nt._is_inputhook_installed(): return nt._inputhook - def __write_changed_line( - self, y: int, oldline: str, newline: str, px_coord: int - ) -> None: - minlen = min(wlen(oldline), wlen(newline)) - x_pos = 0 - x_coord = 0 - - # reuse the oldline as much as possible, but stop as soon as we - # encounter an ESCAPE, because it might be the start of an escape - # sequence - while ( - x_coord < minlen - and oldline[x_pos] == newline[x_pos] - and newline[x_pos] != "\x1b" + def __plan_changed_line( # keep in sync with UnixConsole.__plan_changed_line + self, + y: int, + oldline: RenderLine, + newline: RenderLine, + px_coord: int, + ) -> LineUpdate | None: + diff = diff_render_lines(oldline, newline) + if diff is None: + return None + + start_cell = diff.start_cell + start_x = diff.start_x + if ( + len(diff.old_cells) == 1 + and len(diff.new_cells) == 1 + and diff.old_cells[0].width == diff.new_cells[0].width ): - x_coord += wlen(newline[x_pos]) - x_pos += 1 + changed_cell = diff.new_cells[0] + # Ctrl-Z (SUB) can reach here via RenderLine.from_rendered_text() + # for prompt/message lines, which bypasses iter_display_chars(). + # On Windows, raw \x1a causes console cursor anomalies, so we + # force a cursor resync when it appears. + return LineUpdate( + kind="replace_char", + y=y, + start_cell=start_cell, + start_x=start_x, + cells=diff.new_cells, + char_width=changed_cell.width, + reset_to_margin=( + requires_cursor_resync(diff.new_cells) + or "\x1a" in changed_cell.text + ), + ) + + if diff.old_changed_width == diff.new_changed_width: + return LineUpdate( + kind="replace_span", + y=y, + start_cell=start_cell, + start_x=start_x, + cells=diff.new_cells, + char_width=diff.new_changed_width, + reset_to_margin=( + requires_cursor_resync(diff.new_cells) + or any("\x1a" in cell.text for cell in diff.new_cells) + ), + ) + + suffix_cells = newline.cells[start_cell:] + return LineUpdate( + kind="rewrite_suffix", + y=y, + start_cell=start_cell, + start_x=start_x, + cells=suffix_cells, + char_width=sum(cell.width for cell in suffix_cells), + clear_eol=oldline.width > newline.width, + reset_to_margin=( + requires_cursor_resync(suffix_cells) + or any("\x1a" in cell.text for cell in suffix_cells) + ), + ) - self._hide_cursor() - self._move_relative(x_coord, y) - if wlen(oldline) > wlen(newline): + def __apply_line_update( + self, + update: LineUpdate, + visual_style: str | None = None, + ) -> None: + text = render_cells(update.cells, visual_style) if visual_style else update.text + trace( + "windows.refresh update kind={kind} y={y} x={x} text={text} " + "clear_eol={clear_eol} reset_to_margin={reset}", + kind=update.kind, + y=update.y, + x=update.start_x, + text=trace_text(text), + clear_eol=update.clear_eol, + reset=update.reset_to_margin, + ) + original_y = self.posxy[1] + self._move_relative(update.start_x, update.y) + if update.clear_eol: self._erase_to_end() - self.__write(newline[x_pos:]) - self.posxy = min(wlen(newline), self.width - 1), y + self.__write(text) + self.posxy = min(update.start_x + update.char_width, self.width - 1), update.y - if "\x1b" in newline or y != self.posxy[1] or '\x1a' in newline: - # ANSI escape characters are present, so we can't assume - # anything about the position of the cursor. Moving the cursor - # to the left margin should work to get to a known position. - self.move_cursor(0, y) + if update.reset_to_margin or update.y != original_y: + # Non-SGR terminal controls or vertical movement require a cursor sync. + self.move_cursor(0, update.y) def _scroll( self, top: int, bottom: int, left: int | None = None, right: int | None = None @@ -336,12 +465,12 @@ def _erase_to_end(self) -> None: self.__write(ERASE_IN_LINE) def prepare(self) -> None: - trace("prepare") - self.screen = [] + trace("windows.prepare") self.height, self.width = self.getheightwidth() self.posxy = 0, 0 self.__offset = 0 + self.sync_rendered_screen(RenderedScreen.empty(), self.posxy) if self.__vt_support: if not SetConsoleMode(InHandle, self.__original_input_mode | ENABLE_VIRTUAL_TERMINAL_INPUT): @@ -349,6 +478,7 @@ def prepare(self) -> None: self._enable_bracketed_paste() def restore(self) -> None: + trace("windows.restore") if self.__vt_support: # Recover to original mode before running REPL self._disable_bracketed_paste() @@ -374,8 +504,16 @@ def move_cursor(self, x: int, y: int) -> None: raise ValueError(f"Bad cursor position {x}, {y}") if y < self.__offset or y >= self.__offset + self.height: + trace( + "windows.move_cursor offscreen x={x} y={y} offset={offset} height={height}", + x=x, + y=y, + offset=self.__offset, + height=self.height, + ) self.event_queue.insert(Event("scroll", "")) else: + trace("windows.move_cursor x={x} y={y}", x=x, y=y) self._move_relative(x, y) self.posxy = x, y @@ -496,15 +634,17 @@ def beep(self) -> None: def clear(self) -> None: """Wipe the screen""" + trace("windows.clear") self.__write(CLEAR) self.posxy = 0, 0 - self.screen = [] + self.sync_rendered_screen(RenderedScreen.empty(), self.posxy) def finish(self) -> None: """Move the cursor to the end of the display and otherwise get ready for end. XXX could be merged with restore? Hmm.""" - y = len(self.screen) - 1 - while y >= 0 and not self.screen[y]: + rendered_lines = self._rendered_screen.composed_lines + y = len(rendered_lines) - 1 + while y >= 0 and not rendered_lines[y].text: y -= 1 self._move_relative(0, min(y, self.height + self.__offset - 1)) self.__write("\r\n") @@ -573,6 +713,7 @@ def wait(self, timeout: float | None) -> bool: ) def repaint(self) -> None: + trace("windows.repaint unsupported") raise NotImplementedError("No repaint support") diff --git a/Lib/test/test_pyrepl/support.py b/Lib/test/test_pyrepl/support.py index 4f7f9d77933336..4f1125eb500ba8 100644 --- a/Lib/test/test_pyrepl/support.py +++ b/Lib/test/test_pyrepl/support.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock from _pyrepl.console import Console, Event +from _pyrepl.render import RenderLine, RenderedScreen from _pyrepl.readline import ReadlineAlikeReader, ReadlineConfig from _pyrepl.simple_interact import _strip_final_indent from _pyrepl.utils import unbracket, ANSI_ESCAPE_SEQUENCE @@ -15,7 +16,13 @@ def assert_screen_equal( ): actual = clean_screen(reader) if clean else reader.screen expected = expected.split("\n") - self.assertListEqual(actual, expected) + if clean: + self.assertListEqual(actual, expected) + return + + actual_lines = [RenderLine.from_rendered_text(line) for line in actual] + expected_lines = [RenderLine.from_rendered_text(line) for line in expected] + self.assertListEqual(actual_lines, expected_lines) def multiline_input(reader: ReadlineAlikeReader, namespace: dict | None = None): @@ -118,9 +125,11 @@ class FakeConsole(Console): def __init__(self, events, encoding="utf-8") -> None: self.events = iter(events) self.encoding = encoding - self.screen = [] + self._rendered_screen = RenderedScreen.empty() self.height = 100 self.width = 80 + self.posxy = (0, 0) + self._redraw_visual_cycle = 0 def get_event(self, block: bool = True) -> Event | None: return next(self.events) @@ -131,7 +140,7 @@ def getpending(self) -> Event: def getheightwidth(self) -> tuple[int, int]: return self.height, self.width - def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: + def refresh(self, rendered_screen: RenderedScreen) -> None: pass def prepare(self) -> None: diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py index 082215da0a3fba..a7730b772c1c08 100644 --- a/Lib/test/test_pyrepl/test_pyrepl.py +++ b/Lib/test/test_pyrepl/test_pyrepl.py @@ -10,6 +10,7 @@ import subprocess import sys import tempfile +from functools import partial from pkgutil import ModuleInfo from unittest import TestCase, skipUnless, skipIf, SkipTest from unittest.mock import Mock, patch @@ -752,6 +753,64 @@ def test_history_with_multiline_entries(self): self.assert_screen_equal(reader, expected, clean=True) self.assertEqual(output, expected) + def test_up_arrow_stays_within_recalled_multiline_entry(self): + code = ( + "def fo():\n" + "...\n" + "...\n" + "a = 1\n" + "b = 2\n" + "x = 1\n" + "\n" + "def fo():\n" + "...\n" + "...\n" + "a = 1\n" + "b = 2\n" + "x = 1\n" + "z = 2\n" + "\n" + ) + events = list(itertools.chain( + code_to_events(code), + [ + Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), + Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), + ] + )) + + reader = self.prepare_reader(events) + multiline_input(reader) + multiline_input(reader) + + expected = ( + "def fo():\n" + " ...\n" + " ...\n" + " a = 1\n" + " b = 2\n" + " x = 1\n" + " z = 2" + ) + reader.more_lines = partial(more_lines, namespace=None) + reader.ps1 = reader.ps2 = ">>> " + reader.ps3 = reader.ps4 = "... " + try: + reader.prepare() + reader.refresh() + + reader.handle1() + self.assertEqual(reader.historyi, 1) + self.assertEqual(reader.get_unicode(), expected) + first_cxy = reader.cxy + + reader.handle1() + self.assertEqual(reader.historyi, 1) + self.assertEqual(reader.get_unicode(), expected) + self.assertLess(reader.cxy[1], first_cxy[1]) + finally: + reader.restore() + def test_history_navigation_with_down_arrow(self): events = itertools.chain( @@ -797,12 +856,28 @@ def test_history_search(self): self.assertEqual(output, "1+1") self.assert_screen_equal(reader, "1+1", clean=True) + def test_history_file_embedded_nuls_are_sanitized(self): + reader = self.prepare_reader([]) + wrapper = _ReadlineWrapper(reader=reader, f_in=0, f_out=1) + with tempfile.NamedTemporaryFile("wb", delete=False) as history_file: + history_file.write(b"good\n") + history_file.write(b"ba\0d\n") + history_file.write(b"line1\r\nline2\0\n") + filename = history_file.name + + try: + wrapper.read_history_file(filename) + finally: + unlink(filename) + + self.assertEqual(reader.history, ["good", "bad", "line1\nline2"]) + def test_control_character(self): events = code_to_events("c\x1d\n") reader = self.prepare_reader(events) output = multiline_input(reader) self.assertEqual(output, "c\x1d") - self.assert_screen_equal(reader, "c\x1d", clean=True) + self.assert_screen_equal(reader, "c^]", clean=True) def test_history_search_backward(self): # Test history search backward with "imp" input @@ -2111,12 +2186,13 @@ def test_ctrl_d_single_line_end_no_newline(self): class TestWindowsConsoleEolWrap(TestCase): def _make_mock_console(self, width=80): from _pyrepl import windows_console as wc + from _pyrepl.render import RenderedScreen console = object.__new__(wc.WindowsConsole) console.width = width console.posxy = (0, 0) - console.screen = [""] + console._rendered_screen = RenderedScreen.from_screen_lines([""], (0, 0)) console._hide_cursor = Mock() console._show_cursor = Mock() @@ -2127,25 +2203,29 @@ def _make_mock_console(self, width=80): return console, wc + def _apply_changed_line(self, console, wc, y, old_line, new_line, px=0): + from _pyrepl.render import RenderLine + + old_render = RenderLine.from_rendered_text(old_line) + new_render = RenderLine.from_rendered_text(new_line) + update = wc.WindowsConsole._WindowsConsole__plan_changed_line( + console, y, old_render, new_render, px + ) + if update is not None: + wc.WindowsConsole._WindowsConsole__apply_line_update( + console, update + ) + def test_short_line_sets_posxy_normally(self): width = 10 y = 3 console, wc = self._make_mock_console(width=width) - old_line = "" - new_line = "a" * 3 - wc.WindowsConsole._WindowsConsole__write_changed_line( - console, y, old_line, new_line, 0 - ) + self._apply_changed_line(console, wc, y, "", "a" * 3) self.assertEqual(console.posxy, (3, y)) def test_exact_width_line_does_not_wrap(self): width = 10 y = 3 console, wc = self._make_mock_console(width=width) - old_line = "" - new_line = "a" * width - - wc.WindowsConsole._WindowsConsole__write_changed_line( - console, y, old_line, new_line, 0 - ) + self._apply_changed_line(console, wc, y, "", "a" * width) self.assertEqual(console.posxy, (width - 1, y)) diff --git a/Lib/test/test_pyrepl/test_reader.py b/Lib/test/test_pyrepl/test_reader.py index b1b6ae16a1e592..6c0c655b6d1428 100644 --- a/Lib/test/test_pyrepl/test_reader.py +++ b/Lib/test/test_pyrepl/test_reader.py @@ -10,6 +10,8 @@ from .support import ScreenEqualMixin, code_to_events from .support import prepare_reader, prepare_console from _pyrepl.console import Event +from _pyrepl.layout import LayoutMap +from _pyrepl.readline import ReadlineAlikeReader, ReadlineConfig from _pyrepl.reader import Reader from _colorize import default_theme @@ -102,6 +104,22 @@ def test_calc_screen_backspace(self): reader, _ = handle_all_events(events) self.assert_screen_equal(reader, "aa") + def test_refresh_escapes_control_bytes_in_buffer(self): + console = prepare_console(()) + config = ReadlineConfig(readline_completer=None) + reader = ReadlineAlikeReader(console=console, config=config) + reader.can_colorize = False + reader.ps1 = reader.ps2 = ">>> " + reader.ps3 = reader.ps4 = "... " + reader.buffer = ["\x00", "\x1b"] + reader.pos = len(reader.buffer) + reader.invalidate_full() + + reader.refresh() + + self.assert_screen_equal(reader, ">>> ^@^[") + self.assertEqual(reader.cxy, (8, 0)) + def test_calc_screen_wrap_removes_after_backspace(self): events = itertools.chain( code_to_events(10 * "a"), @@ -176,7 +194,7 @@ def test_up_arrow_after_ctrl_r(self): ) reader, _ = handle_all_events(events) - self.assert_screen_equal(reader, "") + self.assertIn(reader.screen, ([], [""])) def test_newline_within_block_trailing_whitespace(self): # fmt: off @@ -300,6 +318,21 @@ def test_prompt_length(self): self.assertEqual(prompt, "\033[0;32m樂>\033[0m> ") self.assertEqual(l, 5) + def test_prepare_with_zero_width_does_not_crash(self): + console = prepare_console([], width=0) + reader = ReadlineAlikeReader(console=console, config=ReadlineConfig()) + reader.ps1 = ">>> " + reader.ps2 = ">>> " + reader.ps3 = "... " + reader.ps4 = "" + reader.can_colorize = False + reader.paste_mode = False + + reader.prepare() + + self.assertEqual(reader.cxy, (0, 0)) + self.assertEqual(reader.screen, []) + def test_completions_updated_on_key_press(self): namespace = {"itertools": itertools} code = "itertools." @@ -346,8 +379,7 @@ def test_key_press_on_tab_press_once(self): def test_pos2xy_with_no_columns(self): console = prepare_console([]) reader = prepare_reader(console) - # Simulate a resize to 0 columns - reader.screeninfo = [] + reader.layout = LayoutMap(()) self.assertEqual(reader.pos2xy(), (0, 0)) def test_setpos_from_xy_for_non_printing_char(self): diff --git a/Lib/test/test_pyrepl/test_unix_console.py b/Lib/test/test_pyrepl/test_unix_console.py index a1ee6d4878fe93..d95f2fbb1adced 100644 --- a/Lib/test/test_pyrepl/test_unix_console.py +++ b/Lib/test/test_pyrepl/test_unix_console.py @@ -6,13 +6,13 @@ import threading import unittest from functools import partial -from test.support import os_helper, force_not_colorized_test_class +from test.support import force_color, os_helper, force_not_colorized_test_class from test.support import threading_helper from unittest import TestCase from unittest.mock import MagicMock, call, patch, ANY, Mock -from .support import handle_all_events, code_to_events +from .support import handle_all_events, code_to_events, more_lines try: from _pyrepl.console import Event @@ -100,6 +100,46 @@ def unix_console(events, **kwargs): @patch("os.write") @force_not_colorized_test_class class TestConsole(TestCase): + @staticmethod + def _prepare_reader_with_prompts(console, **kwargs): + from _pyrepl.readline import ReadlineAlikeReader, ReadlineConfig + + config = ReadlineConfig( + readline_completer=kwargs.pop("readline_completer", None) + ) + reader = ReadlineAlikeReader(console=console, config=config) + reader.paste_mode = False + for key, val in kwargs.items(): + setattr(reader, key, val) + return reader + + def test_colorized_multiline_typing_does_not_redraw_previous_line(self, _os_write): + def prepare_reader_with_prompts(console, **kwargs): + reader = self._prepare_reader_with_prompts(console, **kwargs) + reader.more_lines = partial(more_lines, namespace=None) + return reader + + with force_color(True): + events = itertools.chain( + code_to_events("def foo():"), + [Event(evt="key", data="\n", raw=bytearray(b"\n"))], + code_to_events("x = 1"), + [Event(evt="key", data="\n", raw=bytearray(b"\n"))], + code_to_events("y"), + ) + _, con = handle_all_events( + events, + prepare_console=unix_console, + prepare_reader=prepare_reader_with_prompts, + ) + con.restore() + + self.assertNotIn( + call(ANY, b" \x1b[0m x \x1b[0m=\x1b[0m "), + _os_write.mock_calls, + ) + self.assertIn(call(ANY, b"y"), _os_write.mock_calls) + def test_no_newline(self, _os_write): code = "1" events = code_to_events(code) diff --git a/Lib/test/test_pyrepl/test_windows_console.py b/Lib/test/test_pyrepl/test_windows_console.py index f03f84e0985c1f..02ac1baf7f5f46 100644 --- a/Lib/test/test_pyrepl/test_windows_console.py +++ b/Lib/test/test_pyrepl/test_windows_console.py @@ -33,7 +33,6 @@ def _mock_console_init(self, f_in=0, f_out=1, term="", encoding="utf-8"): """Mock __init__ to avoid real Windows API calls in headless environments.""" super(WindowsConsole, self).__init__(f_in, f_out, term, encoding) - self.screen = [] self.width = 80 self.height = 25 self._WindowsConsole__offset = 0 @@ -312,7 +311,8 @@ def same_console(events): call(self.move_left(5)), call(self.move_up()), call(b"def f():"), - call(self.move_left(3)), + call(self.move_left(8)), + call(self.move_right(5)), call(self.move_down()), ] )