From 55d835ad51dd4e114d367b3711ce0025a15fe26f Mon Sep 17 00:00:00 2001 From: anatawa12 Date: Wed, 2 Apr 2025 10:37:16 +0900 Subject: Fix: 通知のページネーションで2つ以上読み込めなくなることがある問題 (#15277) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: notifications-groupedのinclude/exclude typesに:groupedを指定できてしまう問題 * refactor: 通知の取得処理を Notification Service に移動 * feat: add function to parse additional part of id * fix: 通知のページネーションが正しく動かない問題 Redisにのページネーションで使用する時間及びidとRedis上のものが混同されていたので、Misskeyが生成するものに寄せました。 * pnpm run build-misskey-js-with-types * chore: XADDをretryするように * fix: notifications-groupedでxrevrangeしているのを消し忘れていた --- packages/backend/src/core/NotificationService.ts | 123 +++++++++++++++++++---- 1 file changed, 106 insertions(+), 17 deletions(-) (limited to 'packages/backend/src/core/NotificationService.ts') diff --git a/packages/backend/src/core/NotificationService.ts b/packages/backend/src/core/NotificationService.ts index 68ad92f396..eeade4569b 100644 --- a/packages/backend/src/core/NotificationService.ts +++ b/packages/backend/src/core/NotificationService.ts @@ -7,6 +7,7 @@ import { setTimeout } from 'node:timers/promises'; import * as Redis from 'ioredis'; import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import { In } from 'typeorm'; +import { ReplyError } from 'ioredis'; import { DI } from '@/di-symbols.js'; import type { UsersRepository } from '@/models/_.js'; import type { MiUser } from '@/models/User.js'; @@ -19,7 +20,7 @@ import { IdService } from '@/core/IdService.js'; import { CacheService } from '@/core/CacheService.js'; import type { Config } from '@/config.js'; import { UserListService } from '@/core/UserListService.js'; -import type { FilterUnionByProperty } from '@/types.js'; +import { FilterUnionByProperty, groupedNotificationTypes, obsoleteNotificationTypes } from '@/types.js'; import { trackPromise } from '@/misc/promise-tracker.js'; @Injectable() @@ -145,21 +146,36 @@ export class NotificationService implements OnApplicationShutdown { } } - const notification = { - id: this.idService.gen(), - createdAt: new Date(), - type: type, - ...(notifierId ? { - notifierId, - } : {}), - ...data, - } as any as FilterUnionByProperty; - - const redisIdPromise = this.redisClient.xadd( - `notificationTimeline:${notifieeId}`, - 'MAXLEN', '~', this.config.perUserNotificationsMaxCount.toString(), - '*', - 'data', JSON.stringify(notification)); + const createdAt = new Date(); + let notification: FilterUnionByProperty; + let redisId: string; + + do { + notification = { + id: this.idService.gen(), + createdAt, + type: type, + ...(notifierId ? { + notifierId, + } : {}), + ...data, + } as unknown as FilterUnionByProperty; + + try { + redisId = (await this.redisClient.xadd( + `notificationTimeline:${notifieeId}`, + 'MAXLEN', '~', this.config.perUserNotificationsMaxCount.toString(), + this.toXListId(notification.id), + 'data', JSON.stringify(notification)))!; + } catch (e) { + // The ID specified in XADD is equal or smaller than the target stream top item で失敗することがあるのでリトライ + if (e instanceof ReplyError) continue; + throw e; + } + + break; + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + } while (true); const packed = await this.notificationEntityService.pack(notification, notifieeId, {}); @@ -173,7 +189,7 @@ export class NotificationService implements OnApplicationShutdown { const interval = notification.type === 'test' ? 0 : 2000; setTimeout(interval, 'unread notification', { signal: this.#shutdownController.signal }).then(async () => { const latestReadNotificationId = await this.redisClient.get(`latestReadNotification:${notifieeId}`); - if (latestReadNotificationId && (latestReadNotificationId >= (await redisIdPromise)!)) return; + if (latestReadNotificationId && (latestReadNotificationId >= redisId)) return; this.globalEventService.publishMainStream(notifieeId, 'unreadNotification', packed); this.pushNotificationService.pushNotification(notifieeId, 'notification', packed); @@ -228,6 +244,79 @@ export class NotificationService implements OnApplicationShutdown { this.#shutdownController.abort(); } + private toXListId(id: string): string { + const { date, additional } = this.idService.parseFull(id); + return date.toString() + '-' + additional.toString(); + } + + @bindThis + public async getNotifications( + userId: MiUser['id'], + { + sinceId, + untilId, + limit = 20, + includeTypes, + excludeTypes, + }: { + sinceId?: string, + untilId?: string, + limit?: number, + // any extra types are allowed, those are no-op + includeTypes?: (MiNotification['type'] | string)[], + excludeTypes?: (MiNotification['type'] | string)[], + }, + ): Promise { + let sinceTime = sinceId ? this.toXListId(sinceId) : null; + let untilTime = untilId ? this.toXListId(untilId) : null; + + let notifications: MiNotification[]; + for (;;) { + let notificationsRes: [id: string, fields: string[]][]; + + // sinceidのみの場合は古い順、そうでない場合は新しい順。 QueryService.makePaginationQueryも参照 + if (sinceTime && !untilTime) { + notificationsRes = await this.redisClient.xrange( + `notificationTimeline:${userId}`, + '(' + sinceTime, + '+', + 'COUNT', limit); + } else { + notificationsRes = await this.redisClient.xrevrange( + `notificationTimeline:${userId}`, + untilTime ? '(' + untilTime : '+', + sinceTime ? '(' + sinceTime : '-', + 'COUNT', limit); + } + + if (notificationsRes.length === 0) { + return []; + } + + notifications = notificationsRes.map(x => JSON.parse(x[1][1])) as MiNotification[]; + + if (includeTypes && includeTypes.length > 0) { + notifications = notifications.filter(notification => includeTypes.includes(notification.type)); + } else if (excludeTypes && excludeTypes.length > 0) { + notifications = notifications.filter(notification => !excludeTypes.includes(notification.type)); + } + + if (notifications.length !== 0) { + // 通知が1件以上ある場合は返す + break; + } + + // フィルタしたことで通知が0件になった場合、次のページを取得する + if (sinceId && !untilId) { + sinceTime = notificationsRes[notificationsRes.length - 1][0]; + } else { + untilTime = notificationsRes[notificationsRes.length - 1][0]; + } + } + + return notifications; + } + @bindThis public onApplicationShutdown(signal?: string | undefined): void { this.dispose(); -- cgit v1.2.3-freya