divisor.cli_menu
Command handlers for interactive denoising state changes.
1# SPDX-License-Identifier: MPL-2.0 AND LicenseRef-Commons-Clause-License-Condition-1.0 2# <!-- // /* d a r k s h a p e s */ --> 3 4"""Command handlers for interactive denoising state changes.""" 5 6import inspect 7from typing import Any, Callable 8 9from nnll.console import nfo 10 11from divisor.controller import ManualTimestepController 12from divisor.interaction_context import InteractionContext 13from divisor.keybinds import _CHOICE_REGISTRY 14from divisor.state import MenuState 15 16 17def _format_menu_line( 18 key: str, 19 entry: dict, 20 state: MenuState, 21) -> str: 22 """ 23 Build a printable menu line for a registry entry.\n 24 param key: Shortcut key used in the menu (e.g. ``"g"`` for guidance). 25 param entry: Registry entry containing at least a ``desc`` string and optionally a `state_keys`` list that maps to attributes on ``state``. 26 param state: The current input state instance. 27 returns: Formatted line ready for ``nfo`` output.""" 28 import torch 29 30 shortcut = f"[{key.upper()}]" # e.g. "[G]" 31 description = entry.get("desc", "") # human‑readable label 32 33 state_keys = entry.get("state_keys", []) 34 if not state_keys: 35 return f"{shortcut}{description}" 36 37 state_values = [] 38 for attr_name in state_keys: 39 attr_value = getattr(state, attr_name, "?") 40 if isinstance(attr_value, torch.Tensor): 41 attr_value = attr_value.item() if attr_value.numel() == 1 else f"tensor{tuple(attr_value.shape)}" 42 state_values.append(f"{attr_value}") 43 44 current_state = " / ".join(state_values) # join multiple values 45 return f"{shortcut}{description}: {current_state}" 46 47 48def route_choices( 49 controller: ManualTimestepController, 50 state: MenuState, 51 interaction_context: InteractionContext, 52 **kwargs: Any, 53) -> MenuState: 54 """Process user choice input and return updated state.\n 55 :param controller: ManualTimestepController instance 56 :param state: Current input state 57 :param interaction_context: InteractionContext instance 58 :returns: Updated input state 59 """ 60 61 step = state.timestep_index 62 nfo(f"Step {step}/{state.total_timesteps - 1} @ noise level {state.current_timestep:.4f}") 63 for choice_letter in sorted(_CHOICE_REGISTRY): 64 choice_description = _CHOICE_REGISTRY[choice_letter] 65 if not choice_letter: 66 continue 67 line = _format_menu_line(choice_letter, choice_description, state) 68 nfo(line) 69 70 menu_keybinds: dict[str, Callable[[], Any]] = {} 71 for choice_letter, choice_function in _CHOICE_REGISTRY.items(): 72 fn = choice_function["fn"] 73 sig = inspect.signature(fn) 74 param_names = list(sig.parameters.keys()) 75 kwargs = {} 76 for name in param_names: 77 if name not in kwargs and hasattr(controller, name): 78 kwargs[name] = getattr(interaction_context, name, None) 79 menu_keybinds[choice_letter] = lambda fn=fn, kwargs=kwargs: fn(controller, state, interaction_context, **kwargs) 80 prompt = "".join(k.upper() for k in _CHOICE_REGISTRY if k) + "/q" 81 choice = input(f": [{prompt}] or advance with Enter:").lower().strip() 82 83 if choice == "q": 84 import sys 85 86 nfo("Quitting...") 87 sys.exit(0) 88 89 elif choice == "/": 90 return controller.current_state 91 elif choice in menu_keybinds: 92 result = menu_keybinds[choice]() 93 state = result if isinstance(result, MenuState) else state 94 else: 95 nfo("Invalid choice, please try again") 96 97 return controller.current_state
def
route_choices( controller: divisor.controller.ManualTimestepController, state: divisor.state.MenuState, interaction_context: divisor.interaction_context.InteractionContext, **kwargs: Any) -> divisor.state.MenuState:
49def route_choices( 50 controller: ManualTimestepController, 51 state: MenuState, 52 interaction_context: InteractionContext, 53 **kwargs: Any, 54) -> MenuState: 55 """Process user choice input and return updated state.\n 56 :param controller: ManualTimestepController instance 57 :param state: Current input state 58 :param interaction_context: InteractionContext instance 59 :returns: Updated input state 60 """ 61 62 step = state.timestep_index 63 nfo(f"Step {step}/{state.total_timesteps - 1} @ noise level {state.current_timestep:.4f}") 64 for choice_letter in sorted(_CHOICE_REGISTRY): 65 choice_description = _CHOICE_REGISTRY[choice_letter] 66 if not choice_letter: 67 continue 68 line = _format_menu_line(choice_letter, choice_description, state) 69 nfo(line) 70 71 menu_keybinds: dict[str, Callable[[], Any]] = {} 72 for choice_letter, choice_function in _CHOICE_REGISTRY.items(): 73 fn = choice_function["fn"] 74 sig = inspect.signature(fn) 75 param_names = list(sig.parameters.keys()) 76 kwargs = {} 77 for name in param_names: 78 if name not in kwargs and hasattr(controller, name): 79 kwargs[name] = getattr(interaction_context, name, None) 80 menu_keybinds[choice_letter] = lambda fn=fn, kwargs=kwargs: fn(controller, state, interaction_context, **kwargs) 81 prompt = "".join(k.upper() for k in _CHOICE_REGISTRY if k) + "/q" 82 choice = input(f": [{prompt}] or advance with Enter:").lower().strip() 83 84 if choice == "q": 85 import sys 86 87 nfo("Quitting...") 88 sys.exit(0) 89 90 elif choice == "/": 91 return controller.current_state 92 elif choice in menu_keybinds: 93 result = menu_keybinds[choice]() 94 state = result if isinstance(result, MenuState) else state 95 else: 96 nfo("Invalid choice, please try again") 97 98 return controller.current_state
Process user choice input and return updated state.
Parameters
- controller: ManualTimestepController instance
- state: Current input state
- interaction_context: InteractionContext instance :returns: Updated input state