1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
import json
import re
import shutil
import subprocess
import time
from argparse import Namespace
from datetime import datetime
from caelestia.utils.notify import close_notification, notify
from caelestia.utils.paths import recording_notif_path, recording_path, recordings_dir
# Name of the external recorder binary. It is used both to launch the recorder
# and to locate/signal its running process (via `pidof` and `pkill -f`).
RECORDER = "gpu-screen-recorder"
class Command:
    """Start, stop or signal a screen recording made with the external recorder.

    The behaviour is driven by the parsed command-line options in ``args``;
    this class reads the ``pause``, ``region`` and ``sound`` attributes.
    """

    # Parsed command-line options (``pause``, ``region`` and ``sound`` are read).
    args: Namespace

    def __init__(self, args: Namespace) -> None:
        """Keep a reference to the parsed command-line arguments."""
        self.args = args

    def run(self) -> None:
        """Dispatch on the requested action and the recorder's current state.

        With ``args.pause`` set, SIGUSR2 is sent to the recorder (presumably
        its pause toggle -- confirm against the recorder's documentation).
        Otherwise a running recorder is stopped and an idle one is started.
        """
        if self.args.pause:
            subprocess.run(["pkill", "-USR2", "-f", RECORDER], stdout=subprocess.DEVNULL)
        elif self.proc_running():
            self.stop()
        else:
            self.start()

    def proc_running(self) -> bool:
        """Return ``True`` when ``pidof`` finds a running recorder process."""
        return subprocess.run(["pidof", RECORDER], stdout=subprocess.DEVNULL).returncode == 0

    def intersects(self, a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> bool:
        """Return whether two ``(x, y, width, height)`` rectangles overlap.

        Rectangles that merely share an edge are not considered overlapping.
        """
        return a[0] < b[0] + b[2] and a[0] + a[2] > b[0] and a[1] < b[1] + b[3] and a[1] + a[3] > b[1]

    @staticmethod
    def _parse_region(region: str) -> tuple[int, int, int, int]:
        """Parse a ``WxH+X+Y`` region string into an ``(x, y, w, h)`` rectangle.

        Raises:
            ValueError: If ``region`` is not exactly of the form ``WxH+X+Y``.
        """
        # fullmatch (rather than match) so trailing garbage is rejected too.
        m = re.fullmatch(r"(\d+)x(\d+)\+(\d+)\+(\d+)", region)
        if not m:
            raise ValueError(f"Invalid region: {region}")
        w, h, x, y = map(int, m.groups())
        return x, y, w, h

    def _region_refresh_rate(self, rect: tuple[int, int, int, int], monitors: list[dict]) -> int:
        """Return the highest rounded refresh rate among monitors overlapping ``rect``.

        Returns ``0`` when ``rect`` overlaps no monitor.
        """
        return max(
            (
                round(monitor["refreshRate"])
                for monitor in monitors
                if self.intersects((monitor["x"], monitor["y"], monitor["width"], monitor["height"]), rect)
            ),
            default=0,
        )

    @staticmethod
    def _pick_monitor(monitors: list[dict]) -> dict:
        """Return the focused monitor, falling back to the first one listed.

        Raises:
            RuntimeError: If ``monitors`` is empty.
        """
        focused = next((monitor for monitor in monitors if monitor.get("focused")), None)
        if focused is not None:
            return focused
        if not monitors:
            raise RuntimeError("No monitors available to record")
        return monitors[0]

    def start(self) -> None:
        """Launch the recorder for a region or for a whole monitor.

        When ``args.region`` is set, that region is recorded (``"slurp"`` asks
        the ``slurp`` tool for a selection); otherwise the focused monitor is
        recorded. A notification is shown, and if the recorder exits with a
        non-zero status within one second a failure notification replaces it.

        Raises:
            ValueError: If the region is not of the form ``WxH+X+Y``.
            RuntimeError: If no monitor is available to record.
            subprocess.CalledProcessError: If ``hyprctl`` or ``slurp`` fails.
        """
        args = ["-w"]
        monitors = json.loads(subprocess.check_output(["hyprctl", "monitors", "-j"]))

        if self.args.region:
            if self.args.region == "slurp":
                region = subprocess.check_output(["slurp", "-f", "%wx%h+%x+%y"], text=True)
            else:
                region = self.args.region
            # Strip in both cases so stray whitespace never reaches the recorder.
            region = region.strip()
            rect = self._parse_region(region)
            args += ["region", "-region", region]

            max_rr = self._region_refresh_rate(rect, monitors)
            # A region overlapping no monitor yields 0; skip the flag rather
            # than asking the recorder for a frame rate of zero.
            if max_rr > 0:
                args += ["-f", str(max_rr)]
        else:
            monitor = self._pick_monitor(monitors)
            args += [monitor["name"], "-f", str(round(monitor["refreshRate"]))]

        if self.args.sound:
            args += ["-a", "default_output"]

        recording_path.parent.mkdir(parents=True, exist_ok=True)
        # Detach the recorder so it outlives this short-lived command.
        proc = subprocess.Popen([RECORDER, *args, "-o", str(recording_path)], start_new_session=True)

        # The value returned by notify is persisted so that stop() can close
        # this notification later.
        notif = notify("-p", "Recording started", "Recording...")
        recording_notif_path.write_text(notif)

        try:
            if proc.wait(1) != 0:
                close_notification(notif)
                notify(
                    "Recording failed",
                    "An error occurred attempting to start recorder. "
                    f"Command `{' '.join(proc.args)}` failed with exit code {proc.returncode}",
                )
        except subprocess.TimeoutExpired:
            # Still running after one second: the recorder started successfully.
            pass

    def stop(self) -> None:
        """Stop the recorder, archive the recording and offer follow-up actions.

        The recording is moved into ``recordings_dir`` under a timestamped
        name, the start notification is closed, and a new notification offers
        to watch the file, reveal it in the file manager or delete it.
        """
        # Start killing recording process
        subprocess.run(["pkill", "-f", RECORDER], stdout=subprocess.DEVNULL)

        # Wait for recording to finish to avoid corrupted video file
        while self.proc_running():
            time.sleep(0.1)

        # Move to recordings folder
        new_path = recordings_dir / f"recording_{datetime.now().strftime('%Y%m%d_%H-%M-%S')}.mp4"
        recordings_dir.mkdir(exist_ok=True, parents=True)
        shutil.move(recording_path, new_path)

        # Close start notification (best effort: the id file may be missing)
        try:
            close_notification(recording_notif_path.read_text())
        except OSError:
            pass

        action = notify(
            "--action=watch=Watch",
            "--action=open=Open",
            "--action=delete=Delete",
            "Recording stopped",
            f"Recording saved in {new_path}",
        )

        if action == "watch":
            subprocess.Popen(["app2unit", "-O", new_path], start_new_session=True)
        elif action == "open":
            p = subprocess.run(
                [
                    "dbus-send",
                    "--session",
                    "--dest=org.freedesktop.FileManager1",
                    "--type=method_call",
                    "/org/freedesktop/FileManager1",
                    "org.freedesktop.FileManager1.ShowItems",
                    f"array:string:file://{new_path}",
                    "string:",
                ]
            )
            # Fall back to opening the containing folder if no file manager
            # answered the ShowItems call.
            if p.returncode != 0:
                subprocess.Popen(["app2unit", "-O", new_path.parent], start_new_session=True)
        elif action == "delete":
            # The file may already have been removed while the notification
            # was waiting for a response.
            new_path.unlink(missing_ok=True)
|