summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHazelnoot <acomputerdog@gmail.com>2025-05-21 21:51:36 -0400
committerHazelnoot <acomputerdog@gmail.com>2025-06-09 11:02:36 -0400
commitaa7cadbb6c5068e231638ff25922c439ff949783 (patch)
tree86832ab2646e9b66d2ea03110e7108eab3c99b42
parentmerge: Reduce log spam (!1004) (diff)
downloadsharkey-aa7cadbb6c5068e231638ff25922c439ff949783.tar.gz
sharkey-aa7cadbb6c5068e231638ff25922c439ff949783.tar.bz2
sharkey-aa7cadbb6c5068e231638ff25922c439ff949783.zip
implement InternalEventService
-rw-r--r--packages/backend/src/core/CoreModule.ts6
-rw-r--r--packages/backend/src/core/GlobalEventService.ts9
-rw-r--r--packages/backend/src/core/InternalEventService.ts102
-rw-r--r--packages/backend/test/misc/FakeInternalEventService.ts92
4 files changed, 207 insertions, 2 deletions
diff --git a/packages/backend/src/core/CoreModule.ts b/packages/backend/src/core/CoreModule.ts
index dd8e61d322..6839ba0159 100644
--- a/packages/backend/src/core/CoreModule.ts
+++ b/packages/backend/src/core/CoreModule.ts
@@ -41,6 +41,7 @@ import { HttpRequestService } from './HttpRequestService.js';
import { IdService } from './IdService.js';
import { ImageProcessingService } from './ImageProcessingService.js';
import { SystemAccountService } from './SystemAccountService.js';
+import { InternalEventService } from './InternalEventService.js';
import { InternalStorageService } from './InternalStorageService.js';
import { MetaService } from './MetaService.js';
import { MfmService } from './MfmService.js';
@@ -186,6 +187,7 @@ const $HashtagService: Provider = { provide: 'HashtagService', useExisting: Hash
const $HttpRequestService: Provider = { provide: 'HttpRequestService', useExisting: HttpRequestService };
const $IdService: Provider = { provide: 'IdService', useExisting: IdService };
const $ImageProcessingService: Provider = { provide: 'ImageProcessingService', useExisting: ImageProcessingService };
+const $InternalEventService: Provider = { provide: 'InternalEventService', useExisting: InternalEventService };
const $InternalStorageService: Provider = { provide: 'InternalStorageService', useExisting: InternalStorageService };
const $MetaService: Provider = { provide: 'MetaService', useExisting: MetaService };
const $MfmService: Provider = { provide: 'MfmService', useExisting: MfmService };
@@ -345,6 +347,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
HttpRequestService,
IdService,
ImageProcessingService,
+ InternalEventService,
InternalStorageService,
MetaService,
MfmService,
@@ -500,6 +503,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
$HttpRequestService,
$IdService,
$ImageProcessingService,
+ $InternalEventService,
$InternalStorageService,
$MetaService,
$MfmService,
@@ -656,6 +660,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
HttpRequestService,
IdService,
ImageProcessingService,
+ InternalEventService,
InternalStorageService,
MetaService,
MfmService,
@@ -810,6 +815,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
$HttpRequestService,
$IdService,
$ImageProcessingService,
+ $InternalEventService,
$InternalStorageService,
$MetaService,
$MfmService,
diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts
index c0027ae129..d1a5cabd85 100644
--- a/packages/backend/src/core/GlobalEventService.ts
+++ b/packages/backend/src/core/GlobalEventService.ts
@@ -353,12 +353,12 @@ export class GlobalEventService {
}
@bindThis
- private publish(channel: StreamChannels, type: string | null, value?: any): void {
+ private async publish(channel: StreamChannels, type: string | null, value?: any): Promise<void> {
const message = type == null ? value : value == null ?
{ type: type, body: null } :
{ type: type, body: value };
- this.redisForPub.publish(this.config.host, JSON.stringify({
+ await this.redisForPub.publish(this.config.host, JSON.stringify({
channel: channel,
message: message,
}));
@@ -370,6 +370,11 @@ export class GlobalEventService {
}
@bindThis
+ public async publishInternalEventAsync<K extends keyof InternalEventTypes>(type: K, value?: InternalEventTypes[K]): Promise<void> {
+ await this.publish('internal', type, typeof value === 'undefined' ? null : value);
+ }
+
+ @bindThis
public publishBroadcastStream<K extends keyof BroadcastTypes>(type: K, value?: BroadcastTypes[K]): void {
this.publish('broadcast', type, typeof value === 'undefined' ? null : value);
}
diff --git a/packages/backend/src/core/InternalEventService.ts b/packages/backend/src/core/InternalEventService.ts
new file mode 100644
index 0000000000..375ee928c4
--- /dev/null
+++ b/packages/backend/src/core/InternalEventService.ts
@@ -0,0 +1,102 @@
+/*
+ * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
+import Redis from 'ioredis';
+import { DI } from '@/di-symbols.js';
+import { GlobalEventService } from '@/core/GlobalEventService.js';
+import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js';
+import { bindThis } from '@/decorators.js';
+
+export type Listener<K extends keyof InternalEventTypes> = (value: InternalEventTypes[K], key: K) => void | Promise<void>;
+
+export interface ListenerProps {
+ ignoreLocal?: boolean,
+}
+
+@Injectable()
+export class InternalEventService implements OnApplicationShutdown {
+ private readonly listeners = new Map<keyof InternalEventTypes, Map<Listener<keyof InternalEventTypes>, ListenerProps>>();
+
+ constructor(
+ @Inject(DI.redisForSub)
+ private readonly redisForSub: Redis.Redis,
+
+ private readonly globalEventService: GlobalEventService,
+ ) {
+ this.redisForSub.on('message', this.onMessage);
+ }
+
+ @bindThis
+ public on<K extends keyof InternalEventTypes>(type: K, listener: Listener<K>, props?: ListenerProps): void {
+ let set = this.listeners.get(type);
+ if (!set) {
+ set = new Map();
+ this.listeners.set(type, set);
+ }
+
+ // Functionally, this is just a set with metadata on the values.
+ set.set(listener as Listener<keyof InternalEventTypes>, props ?? {});
+ }
+
+ @bindThis
+ public off<K extends keyof InternalEventTypes>(type: K, listener: Listener<K>): void {
+ this.listeners.get(type)?.delete(listener as Listener<keyof InternalEventTypes>);
+ }
+
+ @bindThis
+ public async emit<K extends keyof InternalEventTypes>(type: K, value: InternalEventTypes[K]): Promise<void> {
+ await this.emitInternal(type, value, true);
+ await this.globalEventService.publishInternalEventAsync(type, { ...value, _pid: process.pid });
+ }
+
+ @bindThis
+ private async emitInternal<K extends keyof InternalEventTypes>(type: K, value: InternalEventTypes[K], isLocal: boolean): Promise<void> {
+ const listeners = this.listeners.get(type);
+ if (!listeners) {
+ return;
+ }
+
+ const promises: Promise<void>[] = [];
+ for (const [listener, props] of listeners) {
+ if (!isLocal || !props.ignoreLocal) {
+ const promise = Promise.resolve(listener(value, type));
+ promises.push(promise);
+ }
+ }
+ await Promise.all(promises);
+ }
+
+ @bindThis
+ private async onMessage(_: string, data: string): Promise<void> {
+ const obj = JSON.parse(data);
+
+ if (obj.channel === 'internal') {
+ const { type, body } = obj.message as GlobalEvents['internal']['payload'];
+ if (!isLocalInternalEvent(body) || body._pid !== process.pid) {
+ await this.emitInternal(type, body as InternalEventTypes[keyof InternalEventTypes], false);
+ }
+ }
+ }
+
+ @bindThis
+ public dispose(): void {
+ this.redisForSub.off('message', this.onMessage);
+ this.listeners.clear();
+ }
+
+ @bindThis
+ public onApplicationShutdown(): void {
+ this.dispose();
+ }
+}
+
+interface LocalInternalEvent {
+ _pid: number;
+}
+
+function isLocalInternalEvent(body: object): body is LocalInternalEvent {
+ return '_pid' in body && typeof(body._pid) === 'number';
+}
diff --git a/packages/backend/test/misc/FakeInternalEventService.ts b/packages/backend/test/misc/FakeInternalEventService.ts
new file mode 100644
index 0000000000..ffe8b81d78
--- /dev/null
+++ b/packages/backend/test/misc/FakeInternalEventService.ts
@@ -0,0 +1,92 @@
+/*
+ * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import type { Listener, ListenerProps } from '@/core/InternalEventService.js';
+import type Redis from 'ioredis';
+import type { GlobalEventService, InternalEventTypes } from '@/core/GlobalEventService.js';
+import { InternalEventService } from '@/core/InternalEventService.js';
+import { bindThis } from '@/decorators.js';
+
+type FakeCall<K extends keyof InternalEventService> = [K, Parameters<InternalEventService[K]>];
+type FakeListener<K extends keyof InternalEventTypes> = [K, Listener<K>, ListenerProps];
+
+/**
+ * Minimal implementation of InternalEventService meant for use in unit tests.
+ * There is no redis connection, and metadata is tracked in the public _calls and _listeners arrays.
+ * The on/off/emit methods are fully functional and can be called in tests to invoke any registered listeners.
+ */
+export class FakeInternalEventService extends InternalEventService {
+ /**
+ * List of calls to public methods, in chronological order.
+ */
+ public _calls: FakeCall<keyof InternalEventService>[] = [];
+
+ /**
+ * List of currently registered listeners.
+ */
+ public _listeners: FakeListener<keyof InternalEventTypes>[] = [];
+
+ /**
+ * Resets the mock.
+ * Clears all listeners and tracked calls.
+ */
+ public _reset() {
+ this._calls = [];
+ this._listeners = [];
+ }
+
+ /**
+ * Simulates a remote event sent from another process in the cluster via redis.
+ */
+ @bindThis
+ public async _emitRedis<K extends keyof InternalEventTypes>(type: K, value: InternalEventTypes[K]): Promise<void> {
+ await this.emit(type, value, false);
+ }
+
+ constructor() {
+ super(
+ { on: () => {} } as unknown as Redis.Redis,
+ {} as unknown as GlobalEventService,
+ );
+ }
+
+ @bindThis
+ public on<K extends keyof InternalEventTypes>(type: K, listener: Listener<K>, props?: ListenerProps): void {
+ if (!this._listeners.some(l => l[0] === type && l[1] === listener)) {
+ this._listeners.push([type, listener as Listener<keyof InternalEventTypes>, props ?? {}]);
+ }
+ this._calls.push(['on', [type, listener as Listener<keyof InternalEventTypes>, props]]);
+ }
+
+ @bindThis
+ public off<K extends keyof InternalEventTypes>(type: K, listener: Listener<K>): void {
+ this._listeners = this._listeners.filter(l => l[0] !== type || l[1] !== listener);
+ this._calls.push(['off', [type, listener as Listener<keyof InternalEventTypes>]]);
+ }
+
+ @bindThis
+ public async emit<K extends keyof InternalEventTypes>(type: K, value: InternalEventTypes[K], isLocal = true): Promise<void> {
+ for (const listener of this._listeners) {
+ if (listener[0] === type) {
+ if (!isLocal || !listener[2].ignoreLocal) {
+ await listener[1](value, type);
+ }
+ }
+ }
+ this._calls.push(['emit', [type, value]]);
+ }
+
+ @bindThis
+ public dispose(): void {
+ this._listeners = [];
+ this._calls.push(['dispose', []]);
+ }
+
+ @bindThis
+ public onApplicationShutdown(): void {
+ this._calls.push(['onApplicationShutdown', []]);
+ }
+}
+