diff options
| author | Batuhan Edgüer <67585935+BestSithInEU@users.noreply.github.com> | 2025-08-18 10:39:35 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-18 17:39:35 +1000 |
| commit | c72223a7e6ede10931be329620d3b76d592b4eef (patch) | |
| tree | ca1fcd619a335d71364f835329b335845d754a07 /src/caelestia | |
| parent | theme: add nvtop, htop, and cava support (#45) (diff) | |
| download | caelestia-cli-c72223a7e6ede10931be329620d3b76d592b4eef.tar.gz caelestia-cli-c72223a7e6ede10931be329620d3b76d592b4eef.tar.bz2 caelestia-cli-c72223a7e6ede10931be329620d3b76d592b4eef.zip | |
feat: window resizer command & daemon (#43)
* resizer: add window resizer daemon command
Implements a continuous window resizer daemon that automatically resizes
windows based on configurable rules. Features include:
- Listens to Hyprland socket events for real-time window detection
- Supports multiple match types: initial_title, title_contains, title_exact
- Configurable via CLI config file with fallback to sensible defaults
- Rate limiting to prevent excessive resize operations
- Window actions: float, center, and custom dimensions
- Integration with existing CLI structure
Usage: caelestia resizer --daemon
* refactor: replace pip daemon with integrated resizer functionality
## Summary
- Remove standalone pip daemon and integrate its functionality into the resizer
- Add regex matching, config support, and active mode to resizer
- Implement clean 'caelestia resizer pip' command for quick PiP operations
- Update keybinds to use new unified resizer command
## Why Replace the Old PiP Method?
### 1. Code Duplication
The old pip daemon duplicated window management logic that already existed in the resizer:
- Both daemons listened to Hyprland socket events
- Both had similar window detection and manipulation code
- Both needed rate limiting and error handling
### 2. Limited Functionality
The old pip daemon was restricted:
- Only worked with regex pattern matching for 'Picture in Picture' titles
- No configuration support for custom rules
- No way to apply PiP to arbitrary windows
- No integration with other window actions
### 3. Maintenance Overhead
Having two separate daemons created maintenance issues:
- Two different codebases to maintain and debug
- Potential conflicts when both daemons run simultaneously
- Inconsistent error handling and logging approaches
### 4. Review Feedback Implementation
The PR review specifically requested this consolidation:
- "This can actually probably replace the pip daemon entirely"
- "consider adding a regex match mode and pip action, then add that to the default rules"
## New Integrated Approach Benefits
### 1. Unified Window Management
- Single daemon handles all window operations (resize, float, center, pip)
- Consistent configuration format using camelCase
- Shared error handling and rate limiting
### 2. Enhanced PiP Functionality
- Works with any window title pattern (regex, contains, exact)
- Configurable through CLI config file
- Active mode: `caelestia resizer pip` for quick PiP on current window
- Better error messages and user guidance
### 3. Future-Proof Architecture
- Easy to add new window actions (e.g., minimize, maximize, workspace move)
- Extensible pattern matching (could add class-based matching)
- Single place to implement new Hyprland features
### 4. Improved User Experience
- Simpler command structure: `caelestia resizer pip` vs complex arguments
- Better error messages when windows aren't floating
- Consistent CLI interface across all window operations
## Implementation Details
- Added pip action to WindowRule system
- Integrated original pip calculation with minimum size constraints
- Added type safety improvements throughout
- Maintained backward compatibility for existing users
- Updated keybind: `bind = $kbWindowPip, exec, caelestia resizer pip`
* fix: unpack dispatch_commands list in hypr.batch call
- Fix 'sequence item 0: expected str instance, list found' error
- hypr.batch() expects individual string arguments, not a list
- Use *dispatch_commands to unpack the list properly
* fix: handle Hyprland event format with triple > separators
- Fix window ID parsing for events with >>> instead of >>
- Add .lstrip('>') to remove any leading > characters
- Support both >> and >>> formats for compatibility
- Fixes 'Invalid window ID format: >555ee935ba30' errors
* resizer: implement active mode for all matching windows
Active mode now searches through all open windows and applies the rule to
any that match the specified pattern, rather than just checking if the
currently active window matches. This allows for batch operations on
multiple windows with the same pattern.
Special case: using pattern "active" will still target only the currently
active window, allowing users to apply rules to just the focused window
when needed.
This addresses the latest review feedback requesting that active mode work
on any open window that matches the given pattern.
* parser: better resizer help
* completions: add for resizer
---------
Co-authored-by: 2 * r + 2 * t <61896496+soramanew@users.noreply.github.com>
Diffstat (limited to 'src/caelestia')
| -rw-r--r-- | src/caelestia/parser.py | 25 | ||||
| -rw-r--r-- | src/caelestia/subcommands/pip.py | 46 | ||||
| -rw-r--r-- | src/caelestia/subcommands/resizer.py | 463 |
3 files changed, 483 insertions, 51 deletions
diff --git a/src/caelestia/parser.py b/src/caelestia/parser.py index 838a608..311b782 100644 --- a/src/caelestia/parser.py +++ b/src/caelestia/parser.py @@ -1,6 +1,6 @@ import argparse -from caelestia.subcommands import clipboard, emoji, pip, record, scheme, screenshot, shell, toggle, wallpaper +from caelestia.subcommands import clipboard, emoji, record, resizer, scheme, screenshot, shell, toggle, wallpaper from caelestia.utils.paths import wallpapers_dir from caelestia.utils.scheme import get_scheme_names, scheme_variants from caelestia.utils.wallpaper import get_wallpaper @@ -106,9 +106,24 @@ def parse_args() -> (argparse.ArgumentParser, argparse.Namespace): help="do not automatically change the scheme mode based on wallpaper colour", ) - # Create parser for pip opts - pip_parser = command_parser.add_parser("pip", help="picture in picture utilities") - pip_parser.set_defaults(cls=pip.Command) - pip_parser.add_argument("-d", "--daemon", action="store_true", help="start the daemon") + # Create parser for resizer opts + resizer_parser = command_parser.add_parser("resizer", help="window resizer daemon") + resizer_parser.set_defaults(cls=resizer.Command) + resizer_parser.add_argument("-d", "--daemon", action="store_true", help="start the resizer daemon") + resizer_parser.add_argument( + "pattern", + nargs="?", + help="pattern to match against windows ('active' for current window only, 'pip' for quick pip mode)", + ) + resizer_parser.add_argument( + "match_type", + nargs="?", + metavar="match_type", + choices=["titleContains", "titleExact", "titleRegex", "initialTitle"], + help="type of pattern matching (titleContains,titleExact,titleRegex,initialTitle)", + ) + resizer_parser.add_argument("width", nargs="?", help="width to resize to") + resizer_parser.add_argument("height", nargs="?", help="height to resize to") + resizer_parser.add_argument("actions", nargs="?", help="comma-separated actions to apply (float,center,pip)") return parser, parser.parse_args() diff --git a/src/caelestia/subcommands/pip.py b/src/caelestia/subcommands/pip.py deleted file mode 100644 index 6f4727d..0000000 --- a/src/caelestia/subcommands/pip.py +++ /dev/null @@ -1,46 +0,0 @@ -import re -import socket -from argparse import Namespace - -from caelestia.utils import hypr - - -class Command: - args: Namespace - - def __init__(self, args: Namespace) -> None: - self.args = args - - def run(self) -> None: - if self.args.daemon: - self.daemon() - else: - win = hypr.message("activewindow") - if win["floating"]: - self.handle_window(win["address"], win["workspace"]["name"]) - - def handle_window(self, address: str, ws: str) -> None: - mon_id = next(w for w in hypr.message("workspaces") if w["name"] == ws)["monitorID"] - mon = next(m for m in hypr.message("monitors") if m["id"] == mon_id) - width, height = next(c for c in hypr.message("clients") if c["address"] == address)["size"] - - scale_factor = mon["height"] / 4 / height - scaled_win_size = f"{int(width * scale_factor)} {int(height * scale_factor)}" - off = min(mon["width"], mon["height"]) * 0.03 - move_to = f"{int(mon['x']) + int(mon['width'] - off - width * scale_factor)} {int(mon['y']) + int(mon['height'] - off - height * scale_factor)}" - - hypr.batch( - f"dispatch resizewindowpixel exact {scaled_win_size},address:{address}", - f"dispatch movewindowpixel exact {move_to},address:{address}", - ) - - def daemon(self) -> None: - with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: - sock.connect(hypr.socket2_path) - - while True: - data = sock.recv(4096).decode() - if data.startswith("openwindow>>"): - address, ws, cls, title = data[12:].split(",") - if re.match(r"^[Pp]icture(-| )in(-| )[Pp]icture$", title): - self.handle_window(f"0x{address}", ws) diff --git a/src/caelestia/subcommands/resizer.py b/src/caelestia/subcommands/resizer.py new file mode 100644 index 0000000..142a725 --- /dev/null +++ b/src/caelestia/subcommands/resizer.py @@ -0,0 +1,463 @@ +import json +import re +import socket +import time +from argparse import Namespace +from pathlib import Path +from typing import Any, Dict, Optional + +from caelestia.utils import hypr +from caelestia.utils.paths import user_config_path + + +class WindowRule: + def __init__(self, name: str, match_type: str, width: str, height: str, actions: list[str]): + self.name = name + self.match_type = match_type + self.width = width + self.height = height + self.actions = actions + + +class Command: + def __init__(self, args: Namespace) -> None: + self.args = args + self.timeout_tracker: dict[str, float] = {} + self.window_rules = self._load_window_rules() + + def _load_window_rules(self) -> list[WindowRule]: + default_rules = [ + WindowRule("(Bitwarden", "titleContains", "20%", "54%", ["float", "center"]), + WindowRule("Sign in - Google Accounts", "titleContains", "35%", "65%", ["float", "center"]), + WindowRule("oauth", "titleContains", "30%", "60%", ["float", "center"]), + WindowRule("^[Pp]icture(-| )in(-| )[Pp]icture$", "titleRegex", "", "", ["pip"]), + ] + + try: + config = json.loads(user_config_path.read_text()) + if "resizer" in config and "rules" in config["resizer"]: + rules = [] + for rule_config in config["resizer"]["rules"]: + rules.append( + WindowRule( + rule_config["name"], + rule_config["matchType"], + rule_config["width"], + rule_config["height"], + rule_config["actions"], + ) + ) + return rules + except (json.JSONDecodeError, KeyError): + self._log_message("ERROR: invalid config") + except FileNotFoundError: + pass + + return default_rules + + def _log_message(self, message: str) -> None: + timestamp = time.strftime("%Y-%m-%d %H:%M:%S") + print(f"[{timestamp}] {message}") + + def _is_rate_limited(self, key: str) -> bool: + current_time = time.time() + last_time = self.timeout_tracker.get(key, 0) + + if current_time < last_time + 1: + return True + + self.timeout_tracker[key] = current_time + return False + + def _get_window_info(self, window_id: str) -> Optional[Dict[str, Any]]: + try: + clients = hypr.message("clients") + if isinstance(clients, list): + for client in clients: + if isinstance(client, dict) and client.get("address") == f"0x{window_id}": + return client + except Exception: + pass + + return None + + def _apply_pip_action(self, window_id: str) -> None: + try: + address = f"0x{window_id}" + clients_result = hypr.message("clients") + if not isinstance(clients_result, list): + return + + window = None + for c in clients_result: + if isinstance(c, dict) and c.get("address") == address: + window = c + break + + if not window or not isinstance(window, dict) or not window.get("floating", False): + return + + workspaces_result = hypr.message("workspaces") + if not isinstance(workspaces_result, list): + return + + workspace_info = window.get("workspace") + if not isinstance(workspace_info, dict): + return + + workspace_name = workspace_info.get("name") + workspace = None + for w in workspaces_result: + if isinstance(w, dict) and w.get("name") == workspace_name: + workspace = w + break + + if not workspace or not isinstance(workspace, dict): + return + + monitors_result = hypr.message("monitors") + if not isinstance(monitors_result, list): + return + + monitor_id = workspace.get("monitorID") + monitor = None + for m in monitors_result: + if isinstance(m, dict) and m.get("id") == monitor_id: + monitor = m + break + + if not monitor or not isinstance(monitor, dict): + return + + window_size = window.get("size") + if not isinstance(window_size, list) or len(window_size) < 2: + return + + width, height = window_size[0], window_size[1] + if not isinstance(width, (int, float)) or not isinstance(height, (int, float)): + return + + monitor_height = monitor.get("height") + monitor_width = monitor.get("width") + monitor_x = monitor.get("x") + monitor_y = monitor.get("y") + + if not all(isinstance(x, (int, float)) for x in [monitor_height, monitor_width, monitor_x, monitor_y]): + return + + scale_factor = monitor_height / 4 / height + scaled_width = int(width * scale_factor) + scaled_height = int(height * scale_factor) + + # Ensure minimum reasonable size + min_width = 200 + min_height = 150 + scaled_width = max(scaled_width, min_width) + scaled_height = max(scaled_height, min_height) + + # Use offset to ensure window stays on screen with some margin + offset = min(monitor_width, monitor_height) * 0.03 + + # Position in bottom-right corner with offset + move_x = monitor_x + monitor_width - scaled_width - offset + move_y = monitor_y + monitor_height - scaled_height - offset + + command1 = f"dispatch resizewindowpixel exact {scaled_width} {scaled_height},address:{address}" + command2 = f"dispatch movewindowpixel exact {int(move_x)} {int(move_y)},address:{address}" + hypr.batch(command1, command2) + + self._log_message( + f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})" + ) + + except Exception as e: + self._log_message(f"ERROR: Failed to apply PiP action to window 0x{window_id}: {e}") + + def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool: + dispatch_commands = [] + + if "float" in actions: + window_info = self._get_window_info(window_id) + if window_info and not window_info.get("floating", False): + dispatch_commands.append(f"dispatch togglefloating address:0x{window_id}") + + if "pip" in actions: + self._apply_pip_action(window_id) + return True + + dispatch_commands.append(f"dispatch resizewindowpixel exact {width} {height},address:0x{window_id}") + + if "center" in actions: + dispatch_commands.append("dispatch centerwindow") + + try: + hypr.batch(*dispatch_commands) + self._log_message(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})") + return True + except Exception as e: + self._log_message(f"ERROR: Failed to apply window actions for window 0x{window_id}: {e}") + return False + + def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None: + for rule in self.window_rules: + if rule.match_type == "initialTitle": + if initial_title == rule.name: + return rule + elif rule.match_type == "titleContains": + if rule.name in window_title: + return rule + elif rule.match_type == "titleExact": + if window_title == rule.name: + return rule + elif rule.match_type == "titleRegex": + try: + if re.search(rule.name, window_title): + return rule + except re.error: + self._log_message(f"ERROR: Invalid regex pattern in rule '{rule.name}'") + + return None + + def _handle_window_event(self, event: str) -> None: + if event.startswith("windowtitle"): + self._handle_title_event(event) + elif event.startswith("openwindow"): + self._handle_open_event(event) + + def _handle_title_event(self, event: str) -> None: + try: + # Handle both >> and >>> separators (different Hyprland versions) + if ">>>" in event: + window_id = event.split(">>>")[1].split(",")[0] + else: + window_id = event.split(">>")[1].split(",")[0] + + # Remove any leading > characters + window_id = window_id.lstrip(">") + + if not all(c in "0123456789abcdefABCDEF" for c in window_id): + self._log_message(f"ERROR: Invalid window ID format: {window_id}") + return + + window_info = self._get_window_info(window_id) + if not window_info: + return + + window_title = window_info.get("title", "") + initial_title = window_info.get("initialTitle", "") + + self._log_message(f"DEBUG: Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'") + + rule = self._match_window_rule(window_title, initial_title) + if rule: + if self._is_rate_limited(window_id): + self._log_message(f"Rate limited: skipping window 0x{window_id}") + return + + self._log_message(f"Matched rule '{rule.name}' for window 0x{window_id}") + self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) + + except (IndexError, ValueError) as e: + self._log_message(f"ERROR: Failed to parse window title event: {e}") + + def _handle_open_event(self, event: str) -> None: + try: + # Handle both >> and >>> separators + if "openwindow>>>" in event: + data = event[13:] # Remove "openwindow>>>" + else: + data = event[12:] # Remove "openwindow>>" + + window_id, workspace, window_class, title = data.split(",", 3) + + # Remove any leading > characters + window_id = window_id.lstrip(">") + + if not all(c in "0123456789abcdefABCDEF" for c in window_id): + self._log_message(f"ERROR: Invalid window ID format: {window_id}") + return + + self._log_message(f"DEBUG: New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'") + + rule = self._match_window_rule(title, title) + if rule: + if self._is_rate_limited(window_id): + self._log_message(f"Rate limited: skipping window 0x{window_id}") + return + + self._log_message(f"Matched rule '{rule.name}' for new window 0x{window_id}") + self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) + + except (IndexError, ValueError) as e: + self._log_message(f"ERROR: Failed to parse window open event: {e}") + + def run(self) -> None: + if self.args.daemon: + self._run_daemon() + elif hasattr(self.args, "pattern") and self.args.pattern == "pip": + self._run_pip_mode() + elif all( + hasattr(self.args, attr) and getattr(self.args, attr) + for attr in ["pattern", "match_type", "width", "height", "actions"] + ): + self._run_active_mode() + else: + print( + "Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode" + ) + + def _run_pip_mode(self) -> None: + """Quick pip mode - applies pip action to the active window if it's floating""" + try: + active_window_result = hypr.message("activewindow") + if not isinstance(active_window_result, dict) or not active_window_result.get("address"): + print("ERROR: No active window found") + return + + address = active_window_result.get("address", "") + if not isinstance(address, str) or not address.startswith("0x"): + print("ERROR: Invalid window address") + return + + window_id = address[2:] # Remove "0x" prefix + window_title = active_window_result.get("title", "") + + if not active_window_result.get("floating", False): + print(f"Window '{window_title}' is not floating. PIP only works on floating windows.") + print("Try making it floating first with: hyprctl dispatch togglefloating") + return + + print(f"Applying PIP to active window: '{window_title}'") + self._apply_pip_action(window_id) + print("PIP applied successfully") + + except Exception as e: + print(f"ERROR: Failed to apply PIP to active window: {e}") + + def _run_active_mode(self) -> None: + try: + # Create a temporary rule from command line arguments + actions = self.args.actions.split(",") if self.args.actions else [] + temp_rule = WindowRule(self.args.pattern, self.args.match_type, self.args.width, self.args.height, actions) + + # Special case: "active" pattern means only target the currently active window + if temp_rule.name.lower() == "active": + self._apply_to_active_window(temp_rule) + return + + # Find all windows that match the pattern + matching_windows = self._find_matching_windows(temp_rule) + + if not matching_windows: + print(f"No windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'") + return + + print(f"Found {len(matching_windows)} matching window(s)") + + # Apply rule to all matching windows + success_count = 0 + for window in matching_windows: + window_id = window["address"][2:] # Remove "0x" prefix + window_title = window.get("title", "") + + print(f"Applying rule to window 0x{window_id}: '{window_title}'") + success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) + if success: + success_count += 1 + + print(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows") + + except Exception as e: + print(f"ERROR: Failed to apply rule: {e}") + + def _apply_to_active_window(self, temp_rule: WindowRule) -> None: + """Apply rule only to the currently active window""" + try: + active_window_result = hypr.message("activewindow") + if not isinstance(active_window_result, dict) or not active_window_result.get("address"): + print("ERROR: No active window found") + return + + window_title = active_window_result.get("title", "") + address = active_window_result.get("address", "") + if not isinstance(address, str) or not address.startswith("0x"): + print("ERROR: Invalid window address") + return + + window_id = address[2:] # Remove "0x" prefix + + print(f"Applying rule to active window 0x{window_id}: '{window_title}'") + success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) + if success: + print("Rule applied successfully") + else: + print("Failed to apply rule") + + except Exception as e: + print(f"ERROR: Failed to apply rule to active window: {e}") + + def _find_matching_windows(self, temp_rule: WindowRule) -> list: + """Find all windows that match the given rule pattern""" + try: + clients_result = hypr.message("clients") + if not isinstance(clients_result, list): + return [] + + matching_windows = [] + for window in clients_result: + if not isinstance(window, dict): + continue + + window_title = window.get("title", "") + initial_title = window.get("initialTitle", "") + + # Check if window matches the pattern + matches = False + if temp_rule.match_type == "initialTitle": + matches = initial_title == temp_rule.name + elif temp_rule.match_type == "titleContains": + matches = temp_rule.name in window_title + elif temp_rule.match_type == "titleExact": + matches = window_title == temp_rule.name + elif temp_rule.match_type == "titleRegex": + try: + matches = bool(re.search(temp_rule.name, window_title)) + except re.error: + print(f"ERROR: Invalid regex pattern '{temp_rule.name}'") + return [] + + if matches: + matching_windows.append(window) + + return matching_windows + + except Exception as e: + print(f"ERROR: Failed to find matching windows: {e}") + return [] + + def _run_daemon(self) -> None: + self._log_message("Hyprland window resizer started") + self._log_message(f"Loaded {len(self.window_rules)} window rules") + + socket_path = Path(hypr.socket2_path) + if not socket_path.exists(): + self._log_message(f"ERROR: Hyprland socket not found at {socket_path}") + return + + try: + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: + sock.connect(hypr.socket2_path) + + self._log_message("Connected to Hyprland socket, listening for events...") + + while True: + data = sock.recv(4096).decode() + if data: + for line in data.strip().split("\n"): + if line: + self._handle_window_event(line) + + except KeyboardInterrupt: + self._log_message("Resizer daemon stopped") + except Exception as e: + self._log_message(f"ERROR: {e}") |