1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
import json
import shutil
import subprocess
import time
from argparse import Namespace
from datetime import datetime
from caelestia.utils.notify import close_notification, notify
from caelestia.utils.paths import recording_notif_path, recording_path, recordings_dir
class Command:
    """Toggle a screen recording using ``wl-screenrec`` or ``wf-recorder``.

    ``run()`` stops the recording when a recorder process is already running
    and starts a new one otherwise.
    """

    # Parsed command-line options; this class reads ``region`` and ``sound``.
    args: Namespace
    # Executable name of the recorder chosen by ``_detect_recorder``.
    recorder: str

    def __init__(self, args: Namespace) -> None:
        """Store the parsed arguments and pick the recorder binary."""
        self.args = args
        self.recorder = self._detect_recorder()

    def _detect_recorder(self) -> str:
        """Detect which screen recorder to use based on GPU.

        When ``lspci`` reports an NVIDIA device and ``wf-recorder`` is
        installed, ``wf-recorder`` is used. Otherwise ``wl-screenrec`` is
        preferred, with ``wf-recorder`` as the fallback.

        Returns:
            The executable name of the recorder to launch.

        Raises:
            RuntimeError: ``lspci`` ran successfully but neither recorder is
                on ``PATH``.
        """
        try:
            lspci_output = subprocess.check_output(["lspci"], text=True)
        except (subprocess.CalledProcessError, OSError):
            # lspci exited non-zero or could not be executed at all (e.g. it
            # is not installed): default to wl-screenrec when available.
            return "wl-screenrec" if shutil.which("wl-screenrec") else "wf-recorder"

        # NVIDIA GPU present: use wf-recorder when it is installed.
        if "nvidia" in lspci_output.lower() and shutil.which("wf-recorder"):
            return "wf-recorder"

        # Default to wl-screenrec if available.
        if shutil.which("wl-screenrec"):
            return "wl-screenrec"

        # Fall back to wf-recorder if wl-screenrec is not available.
        if shutil.which("wf-recorder"):
            return "wf-recorder"

        raise RuntimeError("No compatible screen recorder found")

    def run(self) -> None:
        """Stop the recording if one is in progress, otherwise start one."""
        if self.proc_running():
            self.stop()
        else:
            self.start()

    def proc_running(self) -> bool:
        """Return True when ``pidof`` finds a process named ``self.recorder``."""
        return subprocess.run(["pidof", self.recorder], stdout=subprocess.DEVNULL).returncode == 0

    @staticmethod
    def _pick_audio_source(sources: "list[str]") -> "str | None":
        """Pick a source name from ``pactl list short sources`` output lines.

        The first line containing ``RUNNING`` wins; if there is none, the
        first line containing ``IDLE`` is used. The name is taken from the
        second whitespace-separated field of the line.

        Returns:
            The source name, or None when no line matches.
        """
        for state in ("RUNNING", "IDLE"):
            for line in sources:
                fields = line.split()
                # Skip malformed lines that have no name column.
                if state in line and len(fields) > 1:
                    return fields[1]
        return None

    def start(self) -> None:
        """Launch the recorder in its own session and show a notification.

        The capture target is the region in ``self.args.region`` (selected
        interactively with ``slurp`` when the value is ``"slurp"``), or the
        focused Hyprland monitor when no region is given. When
        ``self.args.sound`` is set, an audio source reported by ``pactl`` is
        recorded as well.

        Raises:
            ValueError: Sound was requested but no RUNNING or IDLE audio
                source exists.
            subprocess.CalledProcessError: ``slurp``, ``hyprctl`` or ``pactl``
                exited with a non-zero status.
        """
        cmd_args = []

        if self.args.region:
            if self.args.region == "slurp":
                region = subprocess.check_output(["slurp"], text=True)
            else:
                region = self.args.region
            cmd_args += ["-g", region.strip()]
        else:
            monitors = json.loads(subprocess.check_output(["hyprctl", "monitors", "-j"]))
            # A default of None avoids StopIteration when no monitor is
            # flagged as focused; the recorder then gets no output argument.
            focused_monitor = next((m for m in monitors if m["focused"]), None)
            if focused_monitor:
                cmd_args += ["-o", focused_monitor["name"]]

        if self.args.sound:
            sources = subprocess.check_output(["pactl", "list", "short", "sources"], text=True).splitlines()
            audio_source = self._pick_audio_source(sources)
            if not audio_source:
                raise ValueError("No audio source found")

            # The two recorders take the audio device in different forms.
            if self.recorder == "wf-recorder":
                cmd_args += [f"--audio={audio_source}"]
            else:
                cmd_args += ["--audio", "--audio-device", audio_source]

        recording_path.parent.mkdir(parents=True, exist_ok=True)
        proc = subprocess.Popen(
            [self.recorder, *cmd_args, "-f", recording_path],
            stderr=subprocess.PIPE,
            text=True,
            start_new_session=True,
        )

        # Persist what notify() returned (presumably the notification id) so
        # that a later stop() invocation can close this notification.
        notif = notify("-p", "Recording started", "Recording...")
        recording_notif_path.write_text(notif)

        # Watch the recorder for about a second to catch immediate failures.
        for _ in range(5):
            if proc.poll() is not None:
                if proc.returncode != 0:
                    close_notification(notif)
                    notify("Recording failed", f"Recording error: {proc.communicate()[1]}")
                # The recorder has exited; nothing left to watch.
                return
            time.sleep(0.2)

    def stop(self) -> None:
        """Stop the recorder, archive the video and offer follow-up actions.

        The temporary recording is moved into ``recordings_dir`` under a
        timestamped name, the "recording started" notification is closed, and
        the action returned by the "recording stopped" notification
        (``watch``, ``open`` or ``delete``) is carried out.
        """
        # Start killing recording process
        subprocess.run(["pkill", self.recorder])

        # Wait for recording to finish to avoid corrupted video file.
        # NOTE: there is no timeout; this polls until pidof stops finding it.
        while self.proc_running():
            time.sleep(0.1)

        # Move to recordings folder
        new_path = recordings_dir / f"recording_{datetime.now().strftime('%Y%m%d_%H-%M-%S')}.mp4"
        recordings_dir.mkdir(exist_ok=True, parents=True)
        shutil.move(recording_path, new_path)

        # Close start notification (best effort: the id file may be missing).
        try:
            close_notification(recording_notif_path.read_text())
        except OSError:
            pass

        action = notify(
            "--action=watch=Watch",
            "--action=open=Open",
            "--action=delete=Delete",
            "Recording stopped",
            f"Recording saved in {new_path}",
        )

        if action == "watch":
            subprocess.Popen(["app2unit", "-O", new_path], start_new_session=True)
        elif action == "open":
            # as_uri() percent-encodes the path so that spaces, commas and
            # non-ASCII characters yield a valid file URI for dbus-send.
            p = subprocess.run(
                [
                    "dbus-send",
                    "--session",
                    "--dest=org.freedesktop.FileManager1",
                    "--type=method_call",
                    "/org/freedesktop/FileManager1",
                    "org.freedesktop.FileManager1.ShowItems",
                    f"array:string:{new_path.absolute().as_uri()}",
                    "string:",
                ]
            )
            # If the file manager call fails, open the containing folder.
            if p.returncode != 0:
                subprocess.Popen(["app2unit", "-O", new_path.parent], start_new_session=True)
        elif action == "delete":
            new_path.unlink()
|