summaryrefslogtreecommitdiff
path: root/packages/backend/src/daemons/QueueStatsService.ts
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2022-09-18 03:27:08 +0900
committerGitHub <noreply@github.com>2022-09-18 03:27:08 +0900
commitb75184ec8e3436200bacdcd832e3324702553d20 (patch)
tree8b7e316f29e95df921db57289c8b8da476d18f07 /packages/backend/src/daemons/QueueStatsService.ts
parentUpdate ROADMAP.md (diff)
downloadsharkey-b75184ec8e3436200bacdcd832e3324702553d20.tar.gz
sharkey-b75184ec8e3436200bacdcd832e3324702553d20.tar.bz2
sharkey-b75184ec8e3436200bacdcd832e3324702553d20.zip
なんかもうめっちゃ変えた
Diffstat (limited to 'packages/backend/src/daemons/QueueStatsService.ts')
-rw-r--r--packages/backend/src/daemons/QueueStatsService.ts77
1 files changed, 77 insertions, 0 deletions
diff --git a/packages/backend/src/daemons/QueueStatsService.ts b/packages/backend/src/daemons/QueueStatsService.ts
new file mode 100644
index 0000000000..f05d3ce33f
--- /dev/null
+++ b/packages/backend/src/daemons/QueueStatsService.ts
@@ -0,0 +1,77 @@
+import { Inject, Injectable } from '@nestjs/common';
+import Xev from 'xev';
+import { DI } from '@/di-symbols.js';
+import { QueueService } from '@/core/QueueService.js';
+import type { OnApplicationShutdown } from '@nestjs/common';
+
+const ev = new Xev();
+
+const interval = 10000;
+
+@Injectable()
+export class QueueStatsService implements OnApplicationShutdown {
+ #intervalId: NodeJS.Timer;
+
+ constructor(
+ private queueService: QueueService,
+ ) {
+ }
+
+ /**
+ * Report queue stats regularly
+ */
+ public start(): void {
+ const log = [] as any[];
+
+ ev.on('requestQueueStatsLog', x => {
+ ev.emit(`queueStatsLog:${x.id}`, log.slice(0, x.length ?? 50));
+ });
+
+ let activeDeliverJobs = 0;
+ let activeInboxJobs = 0;
+
+ this.queueService.deliverQueue.on('global:active', () => {
+ activeDeliverJobs++;
+ });
+
+ this.queueService.inboxQueue.on('global:active', () => {
+ activeInboxJobs++;
+ });
+
+ const tick = async () => {
+ const deliverJobCounts = await this.queueService.deliverQueue.getJobCounts();
+ const inboxJobCounts = await this.queueService.inboxQueue.getJobCounts();
+
+ const stats = {
+ deliver: {
+ activeSincePrevTick: activeDeliverJobs,
+ active: deliverJobCounts.active,
+ waiting: deliverJobCounts.waiting,
+ delayed: deliverJobCounts.delayed,
+ },
+ inbox: {
+ activeSincePrevTick: activeInboxJobs,
+ active: inboxJobCounts.active,
+ waiting: inboxJobCounts.waiting,
+ delayed: inboxJobCounts.delayed,
+ },
+ };
+
+ ev.emit('queueStats', stats);
+
+ log.unshift(stats);
+ if (log.length > 200) log.pop();
+
+ activeDeliverJobs = 0;
+ activeInboxJobs = 0;
+ };
+
+ tick();
+
+ this.#intervalId = setInterval(tick, interval);
+ }
+
+ public onApplicationShutdown(signal?: string | undefined) {
+ clearInterval(this.#intervalId);
+ }
+}