diff options
| author | Julia <julia@insertdomain.name> | 2025-05-29 00:07:38 +0000 |
|---|---|---|
| committer | Julia <julia@insertdomain.name> | 2025-05-29 00:07:38 +0000 |
| commit | 6b554c178b81f13f83a69b19d44b72b282a0c119 (patch) | |
| tree | f5537f1a56323a4dd57ba150b3cb84a2d8b5dc63 /packages/backend/src/core/NotificationService.ts | |
| parent | merge: Security fixes (!970) (diff) | |
| parent | bump version for release (diff) | |
| download | sharkey-6b554c178b81f13f83a69b19d44b72b282a0c119.tar.gz sharkey-6b554c178b81f13f83a69b19d44b72b282a0c119.tar.bz2 sharkey-6b554c178b81f13f83a69b19d44b72b282a0c119.zip | |
merge: release 2025.4.2 (!1051)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/1051
Approved-by: Hazelnoot <acomputerdog@gmail.com>
Approved-by: Marie <github@yuugi.dev>
Approved-by: Julia <julia@insertdomain.name>
Diffstat (limited to 'packages/backend/src/core/NotificationService.ts')
| -rw-r--r-- | packages/backend/src/core/NotificationService.ts | 121 |
1 files changed, 105 insertions, 16 deletions
diff --git a/packages/backend/src/core/NotificationService.ts b/packages/backend/src/core/NotificationService.ts index 68ad92f396..0f05f5425d 100644 --- a/packages/backend/src/core/NotificationService.ts +++ b/packages/backend/src/core/NotificationService.ts @@ -7,6 +7,7 @@ import { setTimeout } from 'node:timers/promises'; import * as Redis from 'ioredis'; import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import { In } from 'typeorm'; +import { ReplyError } from 'ioredis'; import { DI } from '@/di-symbols.js'; import type { UsersRepository } from '@/models/_.js'; import type { MiUser } from '@/models/User.js'; @@ -19,7 +20,7 @@ import { IdService } from '@/core/IdService.js'; import { CacheService } from '@/core/CacheService.js'; import type { Config } from '@/config.js'; import { UserListService } from '@/core/UserListService.js'; -import type { FilterUnionByProperty } from '@/types.js'; +import { FilterUnionByProperty, groupedNotificationTypes, obsoleteNotificationTypes } from '@/types.js'; import { trackPromise } from '@/misc/promise-tracker.js'; @Injectable() @@ -145,21 +146,36 @@ export class NotificationService implements OnApplicationShutdown { } } - const notification = { - id: this.idService.gen(), - createdAt: new Date(), - type: type, - ...(notifierId ? { - notifierId, - } : {}), - ...data, - } as any as FilterUnionByProperty<MiNotification, 'type', T>; + const createdAt = new Date(); + let notification: FilterUnionByProperty<MiNotification, 'type', T>; + let redisId: string; - const redisIdPromise = this.redisClient.xadd( - `notificationTimeline:${notifieeId}`, - 'MAXLEN', '~', this.config.perUserNotificationsMaxCount.toString(), - '*', - 'data', JSON.stringify(notification)); + do { + notification = { + id: this.idService.gen(), + createdAt, + type: type, + ...(notifierId ? { + notifierId, + } : {}), + ...data, + } as unknown as FilterUnionByProperty<MiNotification, 'type', T>; + + try { + redisId = (await this.redisClient.xadd( + `notificationTimeline:${notifieeId}`, + 'MAXLEN', '~', this.config.perUserNotificationsMaxCount.toString(), + this.toXListId(notification.id, 0), + 'data', JSON.stringify(notification)))!; + } catch (e) { + // The ID specified in XADD is equal or smaller than the target stream top item で失敗することがあるのでリトライ + if (e instanceof ReplyError) continue; + throw e; + } + + break; + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + } while (true); const packed = await this.notificationEntityService.pack(notification, notifieeId, {}); @@ -173,7 +189,7 @@ export class NotificationService implements OnApplicationShutdown { const interval = notification.type === 'test' ? 0 : 2000; setTimeout(interval, 'unread notification', { signal: this.#shutdownController.signal }).then(async () => { const latestReadNotificationId = await this.redisClient.get(`latestReadNotification:${notifieeId}`); - if (latestReadNotificationId && (latestReadNotificationId >= (await redisIdPromise)!)) return; + if (latestReadNotificationId && (latestReadNotificationId >= redisId)) return; this.globalEventService.publishMainStream(notifieeId, 'unreadNotification', packed); this.pushNotificationService.pushNotification(notifieeId, 'notification', packed); @@ -228,6 +244,79 @@ export class NotificationService implements OnApplicationShutdown { this.#shutdownController.abort(); } + private toXListId(id: string, offset: number): string { + const { date, additional } = this.idService.parseFull(id); + return (date + offset).toString() + '-' + additional.toString(); + } + + @bindThis + public async getNotifications( + userId: MiUser['id'], + { + sinceId, + untilId, + limit = 20, + includeTypes, + excludeTypes, + }: { + sinceId?: string, + untilId?: string, + limit?: number, + // any extra types are allowed, those are no-op + includeTypes?: (MiNotification['type'] | string)[], + excludeTypes?: (MiNotification['type'] | string)[], + }, + ): Promise<MiNotification[]> { + let sinceTime = sinceId ? this.toXListId(sinceId, 1) : null; + let untilTime = untilId ? this.toXListId(untilId, -1) : null; + + let notifications: MiNotification[]; + for (;;) { + let notificationsRes: [id: string, fields: string[]][]; + + // sinceidのみの場合は古い順、そうでない場合は新しい順。 QueryService.makePaginationQueryも参照 + if (sinceTime && !untilTime) { + notificationsRes = await this.redisClient.xrange( + `notificationTimeline:${userId}`, + '(' + sinceTime, + '+', + 'COUNT', limit); + } else { + notificationsRes = await this.redisClient.xrevrange( + `notificationTimeline:${userId}`, + untilTime ? '(' + untilTime : '+', + sinceTime ? '(' + sinceTime : '-', + 'COUNT', limit); + } + + if (notificationsRes.length === 0) { + return []; + } + + notifications = notificationsRes.map(x => JSON.parse(x[1][1])) as MiNotification[]; + + if (includeTypes && includeTypes.length > 0) { + notifications = notifications.filter(notification => includeTypes.includes(notification.type)); + } else if (excludeTypes && excludeTypes.length > 0) { + notifications = notifications.filter(notification => !excludeTypes.includes(notification.type)); + } + + if (notifications.length !== 0) { + // 通知が1件以上ある場合は返す + break; + } + + // フィルタしたことで通知が0件になった場合、次のページを取得する + if (sinceId && !untilId) { + sinceTime = notificationsRes[notificationsRes.length - 1][0]; + } else { + untilTime = notificationsRes[notificationsRes.length - 1][0]; + } + } + + return notifications; + } + @bindThis public onApplicationShutdown(signal?: string | undefined): void { this.dispose(); |