summaryrefslogtreecommitdiff
path: root/src/caelestia/subcommands
diff options
context:
space:
mode:
Diffstat (limited to 'src/caelestia/subcommands')
-rw-r--r--src/caelestia/subcommands/record.py111
1 files changed, 42 insertions, 69 deletions
diff --git a/src/caelestia/subcommands/record.py b/src/caelestia/subcommands/record.py
index 3cfde82..2b946c6 100644
--- a/src/caelestia/subcommands/record.py
+++ b/src/caelestia/subcommands/record.py
@@ -1,4 +1,5 @@
import json
+import re
import shutil
import subprocess
import time
@@ -8,108 +9,80 @@ from datetime import datetime
from caelestia.utils.notify import close_notification, notify
from caelestia.utils.paths import recording_notif_path, recording_path, recordings_dir
+RECORDER = "gpu-screen-recorder"
+
class Command:
args: Namespace
- recorder: str
def __init__(self, args: Namespace) -> None:
self.args = args
- self.recorder = self._detect_recorder()
-
- def _detect_recorder(self) -> str:
- """Detect which screen recorder to use based on GPU."""
- try:
- # Check for NVIDIA GPU
- lspci_output = subprocess.check_output(["lspci"], text=True)
- if "nvidia" in lspci_output.lower():
- # Check if wf-recorder is available
- if shutil.which("wf-recorder"):
- return "wf-recorder"
-
- # Default to wl-screenrec if available
- if shutil.which("wl-screenrec"):
- return "wl-screenrec"
-
- # Fallback to wf-recorder if wl-screenrec is not available
- if shutil.which("wf-recorder"):
- return "wf-recorder"
-
- raise RuntimeError("No compatible screen recorder found")
- except subprocess.CalledProcessError:
- # If lspci fails, default to wl-screenrec
- return "wl-screenrec" if shutil.which("wl-screenrec") else "wf-recorder"
def run(self) -> None:
- if self.proc_running():
+ if self.args.pause:
+ subprocess.run(["pkill", "-USR2", "-f", RECORDER], stdout=subprocess.DEVNULL)
+ elif self.proc_running():
self.stop()
else:
self.start()
def proc_running(self) -> bool:
- return subprocess.run(["pidof", self.recorder], stdout=subprocess.DEVNULL).returncode == 0
+ return subprocess.run(["pidof", RECORDER], stdout=subprocess.DEVNULL).returncode == 0
+
+ def intersects(self, a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> bool:
+ return a[0] < b[0] + b[2] and a[0] + a[2] > b[0] and a[1] < b[1] + b[3] and a[1] + a[3] > b[1]
def start(self) -> None:
- args = []
+ args = ["-w"]
+ monitors = json.loads(subprocess.check_output(["hyprctl", "monitors", "-j"]))
if self.args.region:
if self.args.region == "slurp":
- region = subprocess.check_output(["slurp"], text=True)
+ region = subprocess.check_output(["slurp", "-f", "%wx%h+%x+%y"], text=True)
else:
- region = self.args.region
- args += ["-g", region.strip()]
+ region = self.args.region.strip()
+ args += ["region", "-region", region]
+
+ m = re.match(r"(\d+)x(\d+)\+(\d+)\+(\d+)", region)
+ if not m:
+ raise ValueError(f"Invalid region: {region}")
+
+ w, h, x, y = map(int, m.groups())
+ r = x, y, w, h
+ max_rr = 0
+ for monitor in monitors:
+ if self.intersects((monitor["x"], monitor["y"], monitor["width"], monitor["height"]), r):
+ rr = round(monitor["refreshRate"])
+ max_rr = max(max_rr, rr)
+ args += ["-f", str(max_rr)]
else:
- monitors = json.loads(subprocess.check_output(["hyprctl", "monitors", "-j"]))
focused_monitor = next(monitor for monitor in monitors if monitor["focused"])
if focused_monitor:
- args += ["-o", focused_monitor["name"]]
+ args += [focused_monitor["name"], "-f", str(round(focused_monitor["refreshRate"]))]
if self.args.sound:
- sources = subprocess.check_output(["pactl", "list", "short", "sources"], text=True).splitlines()
- audio_source = None
-
- for source in sources:
- if "RUNNING" in source:
- audio_source = source.split()[1]
- break
-
- # Fallback to IDLE source if no RUNNING source
- if not audio_source:
- for source in sources:
- if "IDLE" in source:
- audio_source = source.split()[1]
- break
-
- if not audio_source:
- raise ValueError("No audio source found")
-
- if self.recorder == "wf-recorder":
- args += [f"--audio={audio_source}"]
- else:
- args += ["--audio", "--audio-device", audio_source]
+ args += ["-a", "default_output"]
recording_path.parent.mkdir(parents=True, exist_ok=True)
- proc = subprocess.Popen(
- [self.recorder, *args, "-f", recording_path],
- stderr=subprocess.PIPE,
- text=True,
- start_new_session=True,
- )
+ proc = subprocess.Popen([RECORDER, *args, "-o", str(recording_path)], start_new_session=True)
notif = notify("-p", "Recording started", "Recording...")
recording_notif_path.write_text(notif)
- for _ in range(5):
- if proc.poll() is not None:
- if proc.returncode != 0:
- close_notification(notif)
- notify("Recording failed", f"Recording error: {proc.communicate()[1]}")
- return
- time.sleep(0.2)
+ try:
+ if proc.wait(1) != 0:
+ close_notification(notif)
+ notify(
+ "Recording failed",
+ "An error occurred attempting to start recorder. "
+ f"Command `{' '.join(proc.args)}` failed with exit code {proc.returncode}",
+ )
+ except subprocess.TimeoutExpired:
+ pass
def stop(self) -> None:
# Start killing recording process
- subprocess.run(["pkill", self.recorder])
+ subprocess.run(["pkill", "-f", RECORDER], stdout=subprocess.DEVNULL)
# Wait for recording to finish to avoid corrupted video file
while self.proc_running():