diff options
| author | dakkar <dakkar@thenautilus.net> | 2024-12-15 17:27:12 +0000 |
|---|---|---|
| committer | dakkar <dakkar@thenautilus.net> | 2024-12-15 17:27:12 +0000 |
| commit | e2352839e4639b09e2e52b2ada1399097fad1d8d (patch) | |
| tree | 9268cda477b8c1dcfb2c78eaabcb173a1566a469 /packages/backend/src/queue | |
| parent | merge: Fix rate limits under multi-node environments (!809) (diff) | |
| parent | upstream merge checklist: remember to check federated profile fields (diff) | |
| download | sharkey-e2352839e4639b09e2e52b2ada1399097fad1d8d.tar.gz sharkey-e2352839e4639b09e2e52b2ada1399097fad1d8d.tar.bz2 sharkey-e2352839e4639b09e2e52b2ada1399097fad1d8d.zip | |
merge: upstream changes for 2024.11 (!742)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/742
Closes #645 and #646
Approved-by: Hazelnoot <acomputerdog@gmail.com>
Approved-by: Marie <github@yuugi.dev>
Diffstat (limited to 'packages/backend/src/queue')
5 files changed, 317 insertions, 20 deletions
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index dd588e0115..51fd97dc97 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -6,6 +6,7 @@ import { Module } from '@nestjs/common'; import { CoreModule } from '@/core/CoreModule.js'; import { GlobalModule } from '@/GlobalModule.js'; +import { CheckModeratorsActivityProcessorService } from '@/queue/processors/CheckModeratorsActivityProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; import { QueueProcessorService } from './QueueProcessorService.js'; import { DeliverProcessorService } from './processors/DeliverProcessorService.js'; @@ -85,6 +86,8 @@ import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostP DeliverProcessorService, InboxProcessorService, AggregateRetentionProcessorService, + CheckExpiredMutingsProcessorService, + CheckModeratorsActivityProcessorService, QueueProcessorService, ScheduleNotePostProcessorService, ], diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 4cc5446062..33c2d02dd8 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -10,6 +10,7 @@ import type { Config } from '@/config.js'; import { DI } from '@/di-symbols.js'; import type Logger from '@/logger.js'; import { bindThis } from '@/decorators.js'; +import { CheckModeratorsActivityProcessorService } from '@/queue/processors/CheckModeratorsActivityProcessorService.js'; import { StatusError } from '@/misc/status-error.js'; import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js'; import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js'; @@ -70,7 +71,7 @@ function getJobInfo(job: Bull.Job | undefined, increment = false): string { // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする const currentAttempts = job.attemptsMade + (increment ? 1 : 0); - const maxAttempts = job.opts ? job.opts.attempts : 0; + const maxAttempts = job.opts.attempts ?? 0; return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`; } @@ -127,6 +128,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private aggregateRetentionProcessorService: AggregateRetentionProcessorService, private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService, private bakeBufferedReactionsProcessorService: BakeBufferedReactionsProcessorService, + private checkModeratorsActivityProcessorService: CheckModeratorsActivityProcessorService, private cleanProcessorService: CleanProcessorService, private scheduleNotePostProcessorService: ScheduleNotePostProcessorService, ) { @@ -171,6 +173,7 @@ export class QueueProcessorService implements OnApplicationShutdown { case 'aggregateRetention': return this.aggregateRetentionProcessorService.process(); case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process(); case 'bakeBufferedReactions': return this.bakeBufferedReactionsProcessorService.process(); + case 'checkModeratorsActivity': return this.checkModeratorsActivityProcessorService.process(); case 'clean': return this.cleanProcessorService.process(); default: throw new Error(`unrecognized job type ${job.name} for system`); } diff --git a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts new file mode 100644 index 0000000000..b81987cc15 --- /dev/null +++ b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts @@ -0,0 +1,276 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import { In } from 'typeorm'; +import type Logger from '@/logger.js'; +import { bindThis } from '@/decorators.js'; +import { MetaService } from '@/core/MetaService.js'; +import { RoleService } from '@/core/RoleService.js'; +import { EmailService } from '@/core/EmailService.js'; +import { MiUser, type UserProfilesRepository } from '@/models/_.js'; +import { DI } from '@/di-symbols.js'; +import { SystemWebhookService } from '@/core/SystemWebhookService.js'; +import { AnnouncementService } from '@/core/AnnouncementService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; + +// モデレーターが不在と判断する日付の閾値 +const MODERATOR_INACTIVITY_LIMIT_DAYS = 7; +// 警告通知やログ出力を行う残日数の閾値 +const MODERATOR_INACTIVITY_WARNING_REMAINING_DAYS = 2; +// 期限から6時間ごとに通知を行う +const MODERATOR_INACTIVITY_WARNING_NOTIFY_INTERVAL_HOURS = 6; +const ONE_HOUR_MILLI_SEC = 1000 * 60 * 60; +const ONE_DAY_MILLI_SEC = ONE_HOUR_MILLI_SEC * 24; + +export type ModeratorInactivityEvaluationResult = { + isModeratorsInactive: boolean; + inactiveModerators: MiUser[]; + remainingTime: ModeratorInactivityRemainingTime; +} + +export type ModeratorInactivityRemainingTime = { + time: number; + asHours: number; + asDays: number; +}; + +function generateModeratorInactivityMail(remainingTime: ModeratorInactivityRemainingTime) { + const subject = 'Moderator Inactivity Warning'; + + const timeVariant = remainingTime.asDays === 0 ? `${remainingTime.asHours} hours` : `${remainingTime.asDays} days`; + const timeVariantJa = remainingTime.asDays === 0 ? `${remainingTime.asHours} 時間` : `${remainingTime.asDays} 日間`; + const message = [ + 'To Moderators,', + '', + `No moderator has been active for a period of time. After further ${timeVariant} of inactivity, the instance will switch to invitation only.`, + 'If you do not wish that to happen, please log into Sharkey to update your last active date and time.', + ]; + + const html = message.join('<br>'); + const text = message.join('\n'); + + return { + subject, + html, + text, + }; +} + +function generateInvitationOnlyChangedMail() { + const subject = 'Switch to invitation only'; + + const message = [ + 'To Moderators,', + '', + `The instance has been switched to invitation only, because no moderator activity was detected for ${MODERATOR_INACTIVITY_LIMIT_DAYS} days.`, + 'To change this, please log in and use the control panel.', + ]; + + const html = message.join('<br>'); + const text = message.join('\n'); + + return { + subject, + html, + text, + }; +} + +@Injectable() +export class CheckModeratorsActivityProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.userProfilesRepository) + private userProfilesRepository: UserProfilesRepository, + private metaService: MetaService, + private roleService: RoleService, + private emailService: EmailService, + private announcementService: AnnouncementService, + private systemWebhookService: SystemWebhookService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('check-moderators-activity'); + } + + @bindThis + public async process(): Promise<void> { + this.logger.info('start.'); + + const meta = await this.metaService.fetch(false); + if (!meta.disableRegistration) { + await this.processImpl(); + } else { + this.logger.info('is already invitation only.'); + } + + this.logger.succ('finish.'); + } + + @bindThis + private async processImpl() { + const evaluateResult = await this.evaluateModeratorsInactiveDays(); + if (evaluateResult.isModeratorsInactive) { + this.logger.warn(`The moderator has been inactive for ${MODERATOR_INACTIVITY_LIMIT_DAYS} days. We will move to invitation only.`); + + await this.changeToInvitationOnly(); + await this.notifyChangeToInvitationOnly(); + } else { + const remainingTime = evaluateResult.remainingTime; + if (remainingTime.asDays <= MODERATOR_INACTIVITY_WARNING_REMAINING_DAYS) { + const timeVariant = remainingTime.asDays === 0 ? `${remainingTime.asHours} hours` : `${remainingTime.asDays} days`; + this.logger.warn(`A moderator has been inactive for a period of time. If you are inactive for an additional ${timeVariant}, it will switch to invitation only.`); + + if (remainingTime.asHours % MODERATOR_INACTIVITY_WARNING_NOTIFY_INTERVAL_HOURS === 0) { + // ジョブの実行頻度と同等だと通知が多すぎるため期限から6時間ごとに通知する + // つまり、のこり2日を切ったら6時間ごとに通知が送られる + await this.notifyInactiveModeratorsWarning(remainingTime); + } + } + } + } + + /** + * モデレーターが不在であるかどうかを確認する。trueの場合はモデレーターが不在である。 + * isModerator, isAdministrator, isRootのいずれかがtrueのユーザを対象に、 + * {@link MiUser.lastActiveDate}の値が実行日時の{@link MODERATOR_INACTIVITY_LIMIT_DAYS}日前よりも古いユーザがいるかどうかを確認する。 + * {@link MiUser.lastActiveDate}がnullの場合は、そのユーザは確認の対象外とする。 + * + * ----- + * + * ### サンプルパターン + * - 実行日時: 2022-01-30 12:00:00 + * - 判定基準: 2022-01-23 12:00:00(実行日時の{@link MODERATOR_INACTIVITY_LIMIT_DAYS}日前) + * + * #### パターン① + * - モデレータA: lastActiveDate = 2022-01-20 00:00:00 ※アウト + * - モデレータB: lastActiveDate = 2022-01-23 12:00:00 ※セーフ(判定基準と同値なのでギリギリ残り0日) + * - モデレータC: lastActiveDate = 2022-01-23 11:59:59 ※アウト(残り-1日) + * - モデレータD: lastActiveDate = null + * + * この場合、モデレータBのアクティビティのみ判定基準日よりも古くないため、モデレーターが在席と判断される。 + * + * #### パターン② + * - モデレータA: lastActiveDate = 2022-01-20 00:00:00 ※アウト + * - モデレータB: lastActiveDate = 2022-01-22 12:00:00 ※アウト(残り-1日) + * - モデレータC: lastActiveDate = 2022-01-23 11:59:59 ※アウト(残り-1日) + * - モデレータD: lastActiveDate = null + * + * この場合、モデレータA, B, Cのアクティビティは判定基準日よりも古いため、モデレーターが不在と判断される。 + */ + @bindThis + public async evaluateModeratorsInactiveDays(): Promise<ModeratorInactivityEvaluationResult> { + const today = new Date(); + const inactivePeriod = new Date(today); + inactivePeriod.setDate(today.getDate() - MODERATOR_INACTIVITY_LIMIT_DAYS); + + const moderators = await this.fetchModerators() + .then(it => it.filter(it => it.lastActiveDate != null)); + const inactiveModerators = moderators + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + .filter(it => it.lastActiveDate!.getTime() < inactivePeriod.getTime()); + + // 残りの猶予を示したいので、最終アクティブ日時が一番若いモデレータの日数を基準に猶予を計算する + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const newestLastActiveDate = new Date(Math.max(...moderators.map(it => it.lastActiveDate!.getTime()))); + const remainingTime = newestLastActiveDate.getTime() - inactivePeriod.getTime(); + const remainingTimeAsDays = Math.floor(remainingTime / ONE_DAY_MILLI_SEC); + const remainingTimeAsHours = Math.floor((remainingTime / ONE_HOUR_MILLI_SEC)); + + return { + isModeratorsInactive: inactiveModerators.length === moderators.length, + inactiveModerators, + remainingTime: { + time: remainingTime, + asHours: remainingTimeAsHours, + asDays: remainingTimeAsDays, + }, + }; + } + + @bindThis + private async changeToInvitationOnly() { + await this.metaService.update({ disableRegistration: true }); + } + + @bindThis + public async notifyInactiveModeratorsWarning(remainingTime: ModeratorInactivityRemainingTime) { + // -- モデレータへのメール送信 + + const moderators = await this.fetchModerators(); + const moderatorProfiles = await this.userProfilesRepository + .findBy({ userId: In(moderators.map(it => it.id)) }) + .then(it => new Map(it.map(it => [it.userId, it]))); + + const mail = generateModeratorInactivityMail(remainingTime); + for (const moderator of moderators) { + const profile = moderatorProfiles.get(moderator.id); + if (profile && profile.email && profile.emailVerified) { + this.emailService.sendEmail(profile.email, mail.subject, mail.html, mail.text); + } + } + + // -- SystemWebhook + + const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks() + .then(it => it.filter(it => it.on.includes('inactiveModeratorsWarning'))); + for (const systemWebhook of systemWebhooks) { + this.systemWebhookService.enqueueSystemWebhook( + systemWebhook, + 'inactiveModeratorsWarning', + { remainingTime: remainingTime }, + ); + } + } + + @bindThis + public async notifyChangeToInvitationOnly() { + // -- モデレータへのメールとお知らせ(個人向け)送信 + + const moderators = await this.fetchModerators(); + const moderatorProfiles = await this.userProfilesRepository + .findBy({ userId: In(moderators.map(it => it.id)) }) + .then(it => new Map(it.map(it => [it.userId, it]))); + + const mail = generateInvitationOnlyChangedMail(); + for (const moderator of moderators) { + this.announcementService.create({ + title: mail.subject, + text: mail.text, + forExistingUsers: true, + needConfirmationToRead: true, + userId: moderator.id, + }); + + const profile = moderatorProfiles.get(moderator.id); + if (profile && profile.email && profile.emailVerified) { + this.emailService.sendEmail(profile.email, mail.subject, mail.html, mail.text); + } + } + + // -- SystemWebhook + + const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks() + .then(it => it.filter(it => it.on.includes('inactiveModeratorsInvitationOnlyChanged'))); + for (const systemWebhook of systemWebhooks) { + this.systemWebhookService.enqueueSystemWebhook( + systemWebhook, + 'inactiveModeratorsInvitationOnlyChanged', + {}, + ); + } + } + + @bindThis + private async fetchModerators() { + // TODO: モデレーター以外にも特別な権限を持つユーザーがいる場合は考慮する + return this.roleService.getModerators({ + includeAdmins: true, + includeRoot: true, + excludeExpire: true, + }); + } +} diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index 9590a4fe71..5a16496011 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -74,8 +74,17 @@ export class DeliverProcessorService { try { await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content, job.data.digest); - // Update stats - this.federatedInstanceService.fetch(host).then(i => { + this.apRequestChart.deliverSucc(); + this.federationChart.deliverd(host, true); + + // Update instance stats + process.nextTick(async () => { + const i = await (this.meta.enableStatsForFederatedInstances + ? this.federatedInstanceService.fetchOrRegister(host) + : this.federatedInstanceService.fetch(host)); + + if (i == null) return; + if (i.isNotResponding) { this.federatedInstanceService.update(i.id, { isNotResponding: false, @@ -83,9 +92,9 @@ export class DeliverProcessorService { }); } - this.fetchInstanceMetadataService.fetchInstanceMetadata(i); - this.apRequestChart.deliverSucc(); - this.federationChart.deliverd(i.host, true); + if (this.meta.enableStatsForFederatedInstances) { + this.fetchInstanceMetadataService.fetchInstanceMetadata(i); + } if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.requestSent(i.host, true); @@ -94,8 +103,11 @@ export class DeliverProcessorService { return 'Success'; } catch (res) { - // Update stats - this.federatedInstanceService.fetch(host).then(i => { + this.apRequestChart.deliverFail(); + this.federationChart.deliverd(host, false); + + // Update instance stats + this.federatedInstanceService.fetchOrRegister(host).then(i => { if (!i.isNotResponding) { this.federatedInstanceService.update(i.id, { isNotResponding: true, @@ -116,9 +128,6 @@ export class DeliverProcessorService { }); } - this.apRequestChart.deliverFail(); - this.federationChart.deliverd(i.host, false); - if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.requestSent(i.host, false); } @@ -129,7 +138,7 @@ export class DeliverProcessorService { if (!res.isRetryable) { // 相手が閉鎖していることを明示しているため、配送停止する if (job.data.isSharedInbox && res.statusCode === 410) { - this.federatedInstanceService.fetch(host).then(i => { + this.federatedInstanceService.fetchOrRegister(host).then(i => { this.federatedInstanceService.update(i.id, { suspensionState: 'goneSuspended', }); diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index eaf2aa3c4c..7dfa4ec704 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -60,7 +60,7 @@ export class InboxProcessorService implements OnApplicationShutdown { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); - this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance); + this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -198,21 +198,27 @@ export class InboxProcessorService implements OnApplicationShutdown { delete activity.id; } - // Update stats - this.federatedInstanceService.fetch(authUser.user.host).then(i => { + this.apRequestChart.inbox(); + this.federationChart.inbox(authUser.user.host); + + // Update instance stats + process.nextTick(async () => { + const i = await (this.meta.enableStatsForFederatedInstances + ? this.federatedInstanceService.fetchOrRegister(authUser.user.host) + : this.federatedInstanceService.fetch(authUser.user.host)); + + if (i == null) return; + this.updateInstanceQueue.enqueue(i.id, { latestRequestReceivedAt: new Date(), shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', }); - this.fetchInstanceMetadataService.fetchInstanceMetadata(i); - - this.apRequestChart.inbox(); - this.federationChart.inbox(i.host); - if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.requestReceived(i.host); } + + this.fetchInstanceMetadataService.fetchInstanceMetadata(i); }); // アクティビティを処理 |