summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors
diff options
context:
space:
mode:
authorJulia <julia@insertdomain.name>2024-12-31 02:30:13 +0000
committerJulia <julia@insertdomain.name>2024-12-31 02:30:13 +0000
commit4c0bbddd0fba7e0d76fb484312e691ee29fe5858 (patch)
tree4bb1a3a2a79c679ac021a2199bd526be469524d4 /packages/backend/src/queue/processors
parentmerge: fixes for 2024.9.4 (if we want to) (!770) (diff)
parentBump version (diff)
downloadsharkey-4c0bbddd0fba7e0d76fb484312e691ee29fe5858.tar.gz
sharkey-4c0bbddd0fba7e0d76fb484312e691ee29fe5858.tar.bz2
sharkey-4c0bbddd0fba7e0d76fb484312e691ee29fe5858.zip
merge: Bump stable version (!842)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/842
Diffstat (limited to 'packages/backend/src/queue/processors')
-rw-r--r--packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts276
-rw-r--r--packages/backend/src/queue/processors/CleanChartsProcessorService.ts26
-rw-r--r--packages/backend/src/queue/processors/DeliverProcessorService.ts31
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts36
-rw-r--r--packages/backend/src/queue/processors/ResyncChartsProcessorService.ts8
-rw-r--r--packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts144
-rw-r--r--packages/backend/src/queue/processors/TickChartsProcessorService.ts26
7 files changed, 493 insertions, 54 deletions
diff --git a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts
new file mode 100644
index 0000000000..b81987cc15
--- /dev/null
+++ b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts
@@ -0,0 +1,276 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Inject, Injectable } from '@nestjs/common';
+import { In } from 'typeorm';
+import type Logger from '@/logger.js';
+import { bindThis } from '@/decorators.js';
+import { MetaService } from '@/core/MetaService.js';
+import { RoleService } from '@/core/RoleService.js';
+import { EmailService } from '@/core/EmailService.js';
+import { MiUser, type UserProfilesRepository } from '@/models/_.js';
+import { DI } from '@/di-symbols.js';
+import { SystemWebhookService } from '@/core/SystemWebhookService.js';
+import { AnnouncementService } from '@/core/AnnouncementService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+
+// モデレーターが不在と判断する日付の閾値
+const MODERATOR_INACTIVITY_LIMIT_DAYS = 7;
+// 警告通知やログ出力を行う残日数の閾値
+const MODERATOR_INACTIVITY_WARNING_REMAINING_DAYS = 2;
+// 期限から6時間ごとに通知を行う
+const MODERATOR_INACTIVITY_WARNING_NOTIFY_INTERVAL_HOURS = 6;
+const ONE_HOUR_MILLI_SEC = 1000 * 60 * 60;
+const ONE_DAY_MILLI_SEC = ONE_HOUR_MILLI_SEC * 24;
+
+export type ModeratorInactivityEvaluationResult = {
+ isModeratorsInactive: boolean;
+ inactiveModerators: MiUser[];
+ remainingTime: ModeratorInactivityRemainingTime;
+}
+
+export type ModeratorInactivityRemainingTime = {
+ time: number;
+ asHours: number;
+ asDays: number;
+};
+
+function generateModeratorInactivityMail(remainingTime: ModeratorInactivityRemainingTime) {
+ const subject = 'Moderator Inactivity Warning';
+
+ const timeVariant = remainingTime.asDays === 0 ? `${remainingTime.asHours} hours` : `${remainingTime.asDays} days`;
+ const timeVariantJa = remainingTime.asDays === 0 ? `${remainingTime.asHours} 時間` : `${remainingTime.asDays} 日間`;
+ const message = [
+ 'To Moderators,',
+ '',
+ `No moderator has been active for a period of time. After further ${timeVariant} of inactivity, the instance will switch to invitation only.`,
+ 'If you do not wish that to happen, please log into Sharkey to update your last active date and time.',
+ ];
+
+ const html = message.join('<br>');
+ const text = message.join('\n');
+
+ return {
+ subject,
+ html,
+ text,
+ };
+}
+
+function generateInvitationOnlyChangedMail() {
+ const subject = 'Switch to invitation only';
+
+ const message = [
+ 'To Moderators,',
+ '',
+ `The instance has been switched to invitation only, because no moderator activity was detected for ${MODERATOR_INACTIVITY_LIMIT_DAYS} days.`,
+ 'To change this, please log in and use the control panel.',
+ ];
+
+ const html = message.join('<br>');
+ const text = message.join('\n');
+
+ return {
+ subject,
+ html,
+ text,
+ };
+}
+
+@Injectable()
+export class CheckModeratorsActivityProcessorService {
+ private logger: Logger;
+
+ constructor(
+ @Inject(DI.userProfilesRepository)
+ private userProfilesRepository: UserProfilesRepository,
+ private metaService: MetaService,
+ private roleService: RoleService,
+ private emailService: EmailService,
+ private announcementService: AnnouncementService,
+ private systemWebhookService: SystemWebhookService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.logger = this.queueLoggerService.logger.createSubLogger('check-moderators-activity');
+ }
+
+ @bindThis
+ public async process(): Promise<void> {
+ this.logger.info('start.');
+
+ const meta = await this.metaService.fetch(false);
+ if (!meta.disableRegistration) {
+ await this.processImpl();
+ } else {
+ this.logger.info('is already invitation only.');
+ }
+
+ this.logger.succ('finish.');
+ }
+
+ @bindThis
+ private async processImpl() {
+ const evaluateResult = await this.evaluateModeratorsInactiveDays();
+ if (evaluateResult.isModeratorsInactive) {
+ this.logger.warn(`The moderator has been inactive for ${MODERATOR_INACTIVITY_LIMIT_DAYS} days. We will move to invitation only.`);
+
+ await this.changeToInvitationOnly();
+ await this.notifyChangeToInvitationOnly();
+ } else {
+ const remainingTime = evaluateResult.remainingTime;
+ if (remainingTime.asDays <= MODERATOR_INACTIVITY_WARNING_REMAINING_DAYS) {
+ const timeVariant = remainingTime.asDays === 0 ? `${remainingTime.asHours} hours` : `${remainingTime.asDays} days`;
+ this.logger.warn(`A moderator has been inactive for a period of time. If you are inactive for an additional ${timeVariant}, it will switch to invitation only.`);
+
+ if (remainingTime.asHours % MODERATOR_INACTIVITY_WARNING_NOTIFY_INTERVAL_HOURS === 0) {
+ // ジョブの実行頻度と同等だと通知が多すぎるため期限から6時間ごとに通知する
+ // つまり、のこり2日を切ったら6時間ごとに通知が送られる
+ await this.notifyInactiveModeratorsWarning(remainingTime);
+ }
+ }
+ }
+ }
+
+ /**
+ * モデレーターが不在であるかどうかを確認する。trueの場合はモデレーターが不在である。
+ * isModerator, isAdministrator, isRootのいずれかがtrueのユーザを対象に、
+ * {@link MiUser.lastActiveDate}の値が実行日時の{@link MODERATOR_INACTIVITY_LIMIT_DAYS}日前よりも古いユーザがいるかどうかを確認する。
+ * {@link MiUser.lastActiveDate}がnullの場合は、そのユーザは確認の対象外とする。
+ *
+ * -----
+ *
+ * ### サンプルパターン
+ * - 実行日時: 2022-01-30 12:00:00
+ * - 判定基準: 2022-01-23 12:00:00(実行日時の{@link MODERATOR_INACTIVITY_LIMIT_DAYS}日前)
+ *
+ * #### パターン①
+ * - モデレータA: lastActiveDate = 2022-01-20 00:00:00 ※アウト
+ * - モデレータB: lastActiveDate = 2022-01-23 12:00:00 ※セーフ(判定基準と同値なのでギリギリ残り0日)
+ * - モデレータC: lastActiveDate = 2022-01-23 11:59:59 ※アウト(残り-1日)
+ * - モデレータD: lastActiveDate = null
+ *
+ * この場合、モデレータBのアクティビティのみ判定基準日よりも古くないため、モデレーターが在席と判断される。
+ *
+ * #### パターン②
+ * - モデレータA: lastActiveDate = 2022-01-20 00:00:00 ※アウト
+ * - モデレータB: lastActiveDate = 2022-01-22 12:00:00 ※アウト(残り-1日)
+ * - モデレータC: lastActiveDate = 2022-01-23 11:59:59 ※アウト(残り-1日)
+ * - モデレータD: lastActiveDate = null
+ *
+ * この場合、モデレータA, B, Cのアクティビティは判定基準日よりも古いため、モデレーターが不在と判断される。
+ */
+ @bindThis
+ public async evaluateModeratorsInactiveDays(): Promise<ModeratorInactivityEvaluationResult> {
+ const today = new Date();
+ const inactivePeriod = new Date(today);
+ inactivePeriod.setDate(today.getDate() - MODERATOR_INACTIVITY_LIMIT_DAYS);
+
+ const moderators = await this.fetchModerators()
+ .then(it => it.filter(it => it.lastActiveDate != null));
+ const inactiveModerators = moderators
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ .filter(it => it.lastActiveDate!.getTime() < inactivePeriod.getTime());
+
+ // 残りの猶予を示したいので、最終アクティブ日時が一番若いモデレータの日数を基準に猶予を計算する
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const newestLastActiveDate = new Date(Math.max(...moderators.map(it => it.lastActiveDate!.getTime())));
+ const remainingTime = newestLastActiveDate.getTime() - inactivePeriod.getTime();
+ const remainingTimeAsDays = Math.floor(remainingTime / ONE_DAY_MILLI_SEC);
+ const remainingTimeAsHours = Math.floor((remainingTime / ONE_HOUR_MILLI_SEC));
+
+ return {
+ isModeratorsInactive: inactiveModerators.length === moderators.length,
+ inactiveModerators,
+ remainingTime: {
+ time: remainingTime,
+ asHours: remainingTimeAsHours,
+ asDays: remainingTimeAsDays,
+ },
+ };
+ }
+
+ @bindThis
+ private async changeToInvitationOnly() {
+ await this.metaService.update({ disableRegistration: true });
+ }
+
+ @bindThis
+ public async notifyInactiveModeratorsWarning(remainingTime: ModeratorInactivityRemainingTime) {
+ // -- モデレータへのメール送信
+
+ const moderators = await this.fetchModerators();
+ const moderatorProfiles = await this.userProfilesRepository
+ .findBy({ userId: In(moderators.map(it => it.id)) })
+ .then(it => new Map(it.map(it => [it.userId, it])));
+
+ const mail = generateModeratorInactivityMail(remainingTime);
+ for (const moderator of moderators) {
+ const profile = moderatorProfiles.get(moderator.id);
+ if (profile && profile.email && profile.emailVerified) {
+ this.emailService.sendEmail(profile.email, mail.subject, mail.html, mail.text);
+ }
+ }
+
+ // -- SystemWebhook
+
+ const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks()
+ .then(it => it.filter(it => it.on.includes('inactiveModeratorsWarning')));
+ for (const systemWebhook of systemWebhooks) {
+ this.systemWebhookService.enqueueSystemWebhook(
+ systemWebhook,
+ 'inactiveModeratorsWarning',
+ { remainingTime: remainingTime },
+ );
+ }
+ }
+
+ @bindThis
+ public async notifyChangeToInvitationOnly() {
+ // -- モデレータへのメールとお知らせ(個人向け)送信
+
+ const moderators = await this.fetchModerators();
+ const moderatorProfiles = await this.userProfilesRepository
+ .findBy({ userId: In(moderators.map(it => it.id)) })
+ .then(it => new Map(it.map(it => [it.userId, it])));
+
+ const mail = generateInvitationOnlyChangedMail();
+ for (const moderator of moderators) {
+ this.announcementService.create({
+ title: mail.subject,
+ text: mail.text,
+ forExistingUsers: true,
+ needConfirmationToRead: true,
+ userId: moderator.id,
+ });
+
+ const profile = moderatorProfiles.get(moderator.id);
+ if (profile && profile.email && profile.emailVerified) {
+ this.emailService.sendEmail(profile.email, mail.subject, mail.html, mail.text);
+ }
+ }
+
+ // -- SystemWebhook
+
+ const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks()
+ .then(it => it.filter(it => it.on.includes('inactiveModeratorsInvitationOnlyChanged')));
+ for (const systemWebhook of systemWebhooks) {
+ this.systemWebhookService.enqueueSystemWebhook(
+ systemWebhook,
+ 'inactiveModeratorsInvitationOnlyChanged',
+ {},
+ );
+ }
+ }
+
+ @bindThis
+ private async fetchModerators() {
+ // TODO: モデレーター以外にも特別な権限を持つユーザーがいる場合は考慮する
+ return this.roleService.getModerators({
+ includeAdmins: true,
+ includeRoot: true,
+ excludeExpire: true,
+ });
+ }
+}
diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
index 110468801c..19f98c0d51 100644
--- a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
@@ -48,20 +48,18 @@ export class CleanChartsProcessorService {
public async process(): Promise<void> {
this.logger.info('Clean charts...');
- await Promise.all([
- this.federationChart.clean(),
- this.notesChart.clean(),
- this.usersChart.clean(),
- this.activeUsersChart.clean(),
- this.instanceChart.clean(),
- this.perUserNotesChart.clean(),
- this.perUserPvChart.clean(),
- this.driveChart.clean(),
- this.perUserReactionsChart.clean(),
- this.perUserFollowingChart.clean(),
- this.perUserDriveChart.clean(),
- this.apRequestChart.clean(),
- ]);
+ await this.federationChart.clean();
+ await this.notesChart.clean();
+ await this.usersChart.clean();
+ await this.activeUsersChart.clean();
+ await this.instanceChart.clean();
+ await this.perUserNotesChart.clean();
+ await this.perUserPvChart.clean();
+ await this.driveChart.clean();
+ await this.perUserReactionsChart.clean();
+ await this.perUserFollowingChart.clean();
+ await this.perUserDriveChart.clean();
+ await this.apRequestChart.clean();
this.logger.succ('All charts successfully cleaned.');
}
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts
index 9590a4fe71..5a16496011 100644
--- a/packages/backend/src/queue/processors/DeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts
@@ -74,8 +74,17 @@ export class DeliverProcessorService {
try {
await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content, job.data.digest);
- // Update stats
- this.federatedInstanceService.fetch(host).then(i => {
+ this.apRequestChart.deliverSucc();
+ this.federationChart.deliverd(host, true);
+
+ // Update instance stats
+ process.nextTick(async () => {
+ const i = await (this.meta.enableStatsForFederatedInstances
+ ? this.federatedInstanceService.fetchOrRegister(host)
+ : this.federatedInstanceService.fetch(host));
+
+ if (i == null) return;
+
if (i.isNotResponding) {
this.federatedInstanceService.update(i.id, {
isNotResponding: false,
@@ -83,9 +92,9 @@ export class DeliverProcessorService {
});
}
- this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
- this.apRequestChart.deliverSucc();
- this.federationChart.deliverd(i.host, true);
+ if (this.meta.enableStatsForFederatedInstances) {
+ this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
+ }
if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.requestSent(i.host, true);
@@ -94,8 +103,11 @@ export class DeliverProcessorService {
return 'Success';
} catch (res) {
- // Update stats
- this.federatedInstanceService.fetch(host).then(i => {
+ this.apRequestChart.deliverFail();
+ this.federationChart.deliverd(host, false);
+
+ // Update instance stats
+ this.federatedInstanceService.fetchOrRegister(host).then(i => {
if (!i.isNotResponding) {
this.federatedInstanceService.update(i.id, {
isNotResponding: true,
@@ -116,9 +128,6 @@ export class DeliverProcessorService {
});
}
- this.apRequestChart.deliverFail();
- this.federationChart.deliverd(i.host, false);
-
if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.requestSent(i.host, false);
}
@@ -129,7 +138,7 @@ export class DeliverProcessorService {
if (!res.isRetryable) {
// 相手が閉鎖していることを明示しているため、配送停止する
if (job.data.isSharedInbox && res.statusCode === 410) {
- this.federatedInstanceService.fetch(host).then(i => {
+ this.federatedInstanceService.fetchOrRegister(host).then(i => {
this.federatedInstanceService.update(i.id, {
suspensionState: 'goneSuspended',
});
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 102e835e24..7727a3e985 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -59,7 +59,7 @@ export class InboxProcessorService implements OnApplicationShutdown {
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
- this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
+ this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
}
@bindThis
@@ -193,31 +193,42 @@ export class InboxProcessorService implements OnApplicationShutdown {
throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`);
}
} else {
- throw new Bull.UnrecoverableError('skip: activity id is not a string');
+ // Activity ID should only be string or undefined.
+ delete activity.id;
}
- // Update stats
- this.federatedInstanceService.fetch(authUser.user.host).then(i => {
+ this.apRequestChart.inbox();
+ this.federationChart.inbox(authUser.user.host);
+
+ // Update instance stats
+ process.nextTick(async () => {
+ const i = await (this.meta.enableStatsForFederatedInstances
+ ? this.federatedInstanceService.fetchOrRegister(authUser.user.host)
+ : this.federatedInstanceService.fetch(authUser.user.host));
+
+ if (i == null) return;
+
this.updateInstanceQueue.enqueue(i.id, {
latestRequestReceivedAt: new Date(),
shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding',
});
- this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
-
- this.apRequestChart.inbox();
- this.federationChart.inbox(i.host);
-
if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.requestReceived(i.host);
}
+
+ this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
});
// アクティビティを処理
try {
const result = await this.apInboxService.performActivity(authUser.user, activity);
if (result && !result.startsWith('ok')) {
- this.logger.warn(`inbox activity ignored (maybe): id=${activity.id} reason=${result}`);
+ if (result.startsWith('skip:')) {
+ this.logger.info(`inbox activity ignored: id=${activity.id} reason=${result}`);
+ } else {
+ this.logger.warn(`inbox activity failed: id=${activity.id} reason=${result}`);
+ }
return result;
}
} catch (e) {
@@ -232,6 +243,11 @@ export class InboxProcessorService implements OnApplicationShutdown {
return e.message;
}
}
+
+ if (e instanceof StatusError && !e.isRetryable) {
+ return `skip: permanent error ${e.statusCode}`;
+ }
+
throw e;
}
return 'ok';
diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
index 570cdf9a75..46e1adf173 100644
--- a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
@@ -31,11 +31,9 @@ export class ResyncChartsProcessorService {
// TODO: ユーザーごとのチャートも更新する
// TODO: インスタンスごとのチャートも更新する
- await Promise.all([
- this.driveChart.resync(),
- this.notesChart.resync(),
- this.usersChart.resync(),
- ]);
+ await this.driveChart.resync();
+ await this.notesChart.resync();
+ await this.usersChart.resync();
this.logger.succ('All charts successfully resynced.');
}
diff --git a/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts b/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts
new file mode 100644
index 0000000000..62e3d1072f
--- /dev/null
+++ b/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts
@@ -0,0 +1,144 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Inject, Injectable } from '@nestjs/common';
+import type Logger from '@/logger.js';
+import { bindThis } from '@/decorators.js';
+import { NoteCreateService } from '@/core/NoteCreateService.js';
+import type { ChannelsRepository, DriveFilesRepository, MiDriveFile, NoteScheduleRepository, NotesRepository, UsersRepository } from '@/models/_.js';
+import { DI } from '@/di-symbols.js';
+import { NotificationService } from '@/core/NotificationService.js';
+import { IdentifiableError } from '@/misc/identifiable-error.js';
+import type { MiScheduleNoteType } from '@/models/NoteSchedule.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type * as Bull from 'bullmq';
+import type { ScheduleNotePostJobData } from '../types.js';
+
+@Injectable()
+export class ScheduleNotePostProcessorService {
+ private logger: Logger;
+
+ constructor(
+ @Inject(DI.noteScheduleRepository)
+ private noteScheduleRepository: NoteScheduleRepository,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+ @Inject(DI.notesRepository)
+ private notesRepository: NotesRepository,
+ @Inject(DI.channelsRepository)
+ private channelsRepository: ChannelsRepository,
+
+ private noteCreateService: NoteCreateService,
+ private queueLoggerService: QueueLoggerService,
+ private notificationService: NotificationService,
+ ) {
+ this.logger = this.queueLoggerService.logger.createSubLogger('schedule-note-post');
+ }
+
+ @bindThis
+ private async isValidNoteSchedule(note: MiScheduleNoteType, id: string): Promise<boolean> {
+ const reply = note.reply ? await this.notesRepository.findOneBy({ id: note.reply }) : undefined;
+ const renote = note.reply ? await this.notesRepository.findOneBy({ id: note.renote }) : undefined;
+ const channel = note.channel ? await this.channelsRepository.findOneBy({ id: note.channel, isArchived: false }) : undefined;
+ if (note.reply && !reply) {
+ this.logger.warn('Schedule Note Failed Reason: parent note to reply does not exist');
+ this.notificationService.createNotification(id, 'scheduledNoteFailed', {
+ reason: 'Replied to note on your scheduled note no longer exists',
+ });
+ return false;
+ }
+ if (note.renote && !renote) {
+ this.logger.warn('Schedule Note Failed Reason: attached quote note no longer exists');
+ this.notificationService.createNotification(id, 'scheduledNoteFailed', {
+ reason: 'A quoted note from one of your scheduled notes no longer exists',
+ });
+ return false;
+ }
+ if (note.channel && !channel) {
+ this.logger.warn('Schedule Note Failed Reason: Channel does not exist');
+ this.notificationService.createNotification(id, 'scheduledNoteFailed', {
+ reason: 'An attached channel on your scheduled note no longer exists',
+ });
+ return false;
+ }
+ return true;
+ }
+
+ @bindThis
+ public async process(job: Bull.Job<ScheduleNotePostJobData>): Promise<void> {
+ this.noteScheduleRepository.findOneBy({ id: job.data.scheduleNoteId }).then(async (data) => {
+ if (!data) {
+ this.logger.warn(`Schedule note ${job.data.scheduleNoteId} not found`);
+ } else {
+ const me = await this.usersRepository.findOneBy({ id: data.userId });
+ const note = data.note;
+ const reply = note.reply ? await this.notesRepository.findOneBy({ id: note.reply }) : undefined;
+ const renote = note.reply ? await this.notesRepository.findOneBy({ id: note.renote }) : undefined;
+ const channel = note.channel ? await this.channelsRepository.findOneBy({ id: note.channel, isArchived: false }) : undefined;
+
+ let files: MiDriveFile[] = [];
+ const fileIds = note.files;
+
+ if (fileIds.length > 0 && me) {
+ files = await this.driveFilesRepository.createQueryBuilder('file')
+ .where('file.userId = :userId AND file.id IN (:...fileIds)', {
+ userId: me.id,
+ fileIds,
+ })
+ .orderBy('array_position(ARRAY[:...fileIds], "id"::text)')
+ .setParameters({ fileIds })
+ .getMany();
+ }
+
+ if (!data.userId || !me) {
+ this.logger.warn('Schedule Note Failed Reason: User Not Found');
+ await this.noteScheduleRepository.remove(data);
+ return;
+ }
+
+ if (!await this.isValidNoteSchedule(note, me.id)) {
+ await this.noteScheduleRepository.remove(data);
+ return;
+ }
+
+ if (note.files.length !== files.length) {
+ this.logger.warn('Schedule Note Failed Reason: files are missing in the user\'s drive');
+ this.notificationService.createNotification(me.id, 'scheduledNoteFailed', {
+ reason: 'Some attached files on your scheduled note no longer exist',
+ });
+ await this.noteScheduleRepository.remove(data);
+ return;
+ }
+
+ const createdNote = await this.noteCreateService.create(me, {
+ ...note,
+ createdAt: new Date(),
+ files,
+ poll: note.poll ? {
+ choices: note.poll.choices,
+ multiple: note.poll.multiple,
+ expiresAt: note.poll.expiresAt ? new Date(note.poll.expiresAt) : null,
+ } : undefined,
+ reply,
+ renote,
+ channel,
+ }).catch(async (err: IdentifiableError) => {
+ this.notificationService.createNotification(me.id, 'scheduledNoteFailed', {
+ reason: err.message,
+ });
+ await this.noteScheduleRepository.remove(data);
+ throw this.logger.error(`Schedule Note Failed Reason: ${err.message}`);
+ });
+ await this.noteScheduleRepository.remove(data);
+ this.notificationService.createNotification(me.id, 'scheduledNotePosted', {
+ noteId: createdNote.id,
+ });
+ }
+ });
+ }
+}
diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts
index 93ec34162d..c09cbccc57 100644
--- a/packages/backend/src/queue/processors/TickChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts
@@ -48,20 +48,18 @@ export class TickChartsProcessorService {
public async process(): Promise<void> {
this.logger.info('Tick charts...');
- await Promise.all([
- this.federationChart.tick(false),
- this.notesChart.tick(false),
- this.usersChart.tick(false),
- this.activeUsersChart.tick(false),
- this.instanceChart.tick(false),
- this.perUserNotesChart.tick(false),
- this.perUserPvChart.tick(false),
- this.driveChart.tick(false),
- this.perUserReactionsChart.tick(false),
- this.perUserFollowingChart.tick(false),
- this.perUserDriveChart.tick(false),
- this.apRequestChart.tick(false),
- ]);
+ await this.federationChart.tick(false);
+ await this.notesChart.tick(false);
+ await this.usersChart.tick(false);
+ await this.activeUsersChart.tick(false);
+ await this.instanceChart.tick(false);
+ await this.perUserNotesChart.tick(false);
+ await this.perUserPvChart.tick(false);
+ await this.driveChart.tick(false);
+ await this.perUserReactionsChart.tick(false);
+ await this.perUserFollowingChart.tick(false);
+ await this.perUserDriveChart.tick(false);
+ await this.apRequestChart.tick(false);
this.logger.succ('All charts successfully ticked.');
}