summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/NotificationService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/core/NotificationService.ts')
-rw-r--r--packages/backend/src/core/NotificationService.ts121
1 files changed, 105 insertions, 16 deletions
diff --git a/packages/backend/src/core/NotificationService.ts b/packages/backend/src/core/NotificationService.ts
index 68ad92f396..0b699a934e 100644
--- a/packages/backend/src/core/NotificationService.ts
+++ b/packages/backend/src/core/NotificationService.ts
@@ -7,6 +7,7 @@ import { setTimeout } from 'node:timers/promises';
import * as Redis from 'ioredis';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { In } from 'typeorm';
+import { ReplyError } from 'ioredis';
import { DI } from '@/di-symbols.js';
import type { UsersRepository } from '@/models/_.js';
import type { MiUser } from '@/models/User.js';
@@ -19,7 +20,7 @@ import { IdService } from '@/core/IdService.js';
import { CacheService } from '@/core/CacheService.js';
import type { Config } from '@/config.js';
import { UserListService } from '@/core/UserListService.js';
-import type { FilterUnionByProperty } from '@/types.js';
+import { FilterUnionByProperty, groupedNotificationTypes, obsoleteNotificationTypes } from '@/types.js';
import { trackPromise } from '@/misc/promise-tracker.js';
@Injectable()
@@ -145,21 +146,36 @@ export class NotificationService implements OnApplicationShutdown {
}
}
- const notification = {
- id: this.idService.gen(),
- createdAt: new Date(),
- type: type,
- ...(notifierId ? {
- notifierId,
- } : {}),
- ...data,
- } as any as FilterUnionByProperty<MiNotification, 'type', T>;
+ const createdAt = new Date();
+ let notification: FilterUnionByProperty<MiNotification, 'type', T>;
+ let redisId: string;
- const redisIdPromise = this.redisClient.xadd(
- `notificationTimeline:${notifieeId}`,
- 'MAXLEN', '~', this.config.perUserNotificationsMaxCount.toString(),
- '*',
- 'data', JSON.stringify(notification));
+ do {
+ notification = {
+ id: this.idService.gen(),
+ createdAt,
+ type: type,
+ ...(notifierId ? {
+ notifierId,
+ } : {}),
+ ...data,
+ } as unknown as FilterUnionByProperty<MiNotification, 'type', T>;
+
+ try {
+ redisId = (await this.redisClient.xadd(
+ `notificationTimeline:${notifieeId}`,
+ 'MAXLEN', '~', this.config.perUserNotificationsMaxCount.toString(),
+ this.toXListId(notification.id),
+ 'data', JSON.stringify(notification)))!;
+ } catch (e) {
+ // The ID specified in XADD is equal or smaller than the target stream top item で失敗することがあるのでリトライ
+ if (e instanceof ReplyError) continue;
+ throw e;
+ }
+
+ break;
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ } while (true);
const packed = await this.notificationEntityService.pack(notification, notifieeId, {});
@@ -173,7 +189,7 @@ export class NotificationService implements OnApplicationShutdown {
const interval = notification.type === 'test' ? 0 : 2000;
setTimeout(interval, 'unread notification', { signal: this.#shutdownController.signal }).then(async () => {
const latestReadNotificationId = await this.redisClient.get(`latestReadNotification:${notifieeId}`);
- if (latestReadNotificationId && (latestReadNotificationId >= (await redisIdPromise)!)) return;
+ if (latestReadNotificationId && (latestReadNotificationId >= redisId)) return;
this.globalEventService.publishMainStream(notifieeId, 'unreadNotification', packed);
this.pushNotificationService.pushNotification(notifieeId, 'notification', packed);
@@ -228,6 +244,79 @@ export class NotificationService implements OnApplicationShutdown {
this.#shutdownController.abort();
}
+ private toXListId(id: string, offset: number): string {
+ const { date, additional } = this.idService.parseFull(id);
+ return (date + offset).toString() + '-' + additional.toString();
+ }
+
+ @bindThis
+ public async getNotifications(
+ userId: MiUser['id'],
+ {
+ sinceId,
+ untilId,
+ limit = 20,
+ includeTypes,
+ excludeTypes,
+ }: {
+ sinceId?: string,
+ untilId?: string,
+ limit?: number,
+ // any extra types are allowed, those are no-op
+ includeTypes?: (MiNotification['type'] | string)[],
+ excludeTypes?: (MiNotification['type'] | string)[],
+ },
+ ): Promise<MiNotification[]> {
+ let sinceTime = sinceId ? this.toXListId(sinceId, 1) : null;
+ let untilTime = untilId ? this.toXListId(untilId, -1) : null;
+
+ let notifications: MiNotification[];
+ for (;;) {
+ let notificationsRes: [id: string, fields: string[]][];
+
+ // sinceidのみの場合は古い順、そうでない場合は新しい順。 QueryService.makePaginationQueryも参照
+ if (sinceTime && !untilTime) {
+ notificationsRes = await this.redisClient.xrange(
+ `notificationTimeline:${userId}`,
+ '(' + sinceTime,
+ '+',
+ 'COUNT', limit);
+ } else {
+ notificationsRes = await this.redisClient.xrevrange(
+ `notificationTimeline:${userId}`,
+ untilTime ? '(' + untilTime : '+',
+ sinceTime ? '(' + sinceTime : '-',
+ 'COUNT', limit);
+ }
+
+ if (notificationsRes.length === 0) {
+ return [];
+ }
+
+ notifications = notificationsRes.map(x => JSON.parse(x[1][1])) as MiNotification[];
+
+ if (includeTypes && includeTypes.length > 0) {
+ notifications = notifications.filter(notification => includeTypes.includes(notification.type));
+ } else if (excludeTypes && excludeTypes.length > 0) {
+ notifications = notifications.filter(notification => !excludeTypes.includes(notification.type));
+ }
+
+ if (notifications.length !== 0) {
+ // 通知が1件以上ある場合は返す
+ break;
+ }
+
+ // フィルタしたことで通知が0件になった場合、次のページを取得する
+ if (sinceId && !untilId) {
+ sinceTime = notificationsRes[notificationsRes.length - 1][0];
+ } else {
+ untilTime = notificationsRes[notificationsRes.length - 1][0];
+ }
+ }
+
+ return notifications;
+ }
+
@bindThis
public onApplicationShutdown(signal?: string | undefined): void {
this.dispose();