diff options
| author | Marie <github@yuugi.dev> | 2024-12-12 12:50:11 +0000 |
|---|---|---|
| committer | Marie <github@yuugi.dev> | 2024-12-12 12:50:11 +0000 |
| commit | 8eb9c20df73960baf78834da65349fbfb7014f23 (patch) | |
| tree | ab8326ede2f2a475069ad7b820c6689401798c79 /packages/backend/src/queue | |
| parent | merge: fix icons in dev mode (!813) (diff) | |
| parent | Apply suggestions (diff) | |
| download | sharkey-8eb9c20df73960baf78834da65349fbfb7014f23.tar.gz sharkey-8eb9c20df73960baf78834da65349fbfb7014f23.tar.bz2 sharkey-8eb9c20df73960baf78834da65349fbfb7014f23.zip | |
merge: Schedule Notes (!804)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/804
Approved-by: dakkar <dakkar@thenautilus.net>
Approved-by: Hazelnoot <acomputerdog@gmail.com>
Diffstat (limited to 'packages/backend/src/queue')
| -rw-r--r-- | packages/backend/src/queue/QueueProcessorModule.ts | 2 | ||||
| -rw-r--r-- | packages/backend/src/queue/QueueProcessorService.ts | 14 | ||||
| -rw-r--r-- | packages/backend/src/queue/const.ts | 1 | ||||
| -rw-r--r-- | packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts | 144 | ||||
| -rw-r--r-- | packages/backend/src/queue/types.ts | 4 |
5 files changed, 165 insertions, 0 deletions
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index 7c6675b15d..dd588e0115 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -42,6 +42,7 @@ import { TickChartsProcessorService } from './processors/TickChartsProcessorServ import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { ExportFavoritesProcessorService } from './processors/ExportFavoritesProcessorService.js'; import { RelationshipProcessorService } from './processors/RelationshipProcessorService.js'; +import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js'; @Module({ imports: [ @@ -85,6 +86,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor InboxProcessorService, AggregateRetentionProcessorService, QueueProcessorService, + ScheduleNotePostProcessorService, ], exports: [ QueueProcessorService, diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index f130314e74..4cc5446062 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -44,6 +44,7 @@ import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMu import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js'; import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; +import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; import { QUEUE, baseQueueOptions } from './const.js'; import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js'; @@ -86,6 +87,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private relationshipQueueWorker: Bull.Worker; private objectStorageQueueWorker: Bull.Worker; private endedPollNotificationQueueWorker: Bull.Worker; + private schedulerNotePostQueueWorker: Bull.Worker; constructor( @Inject(DI.config) @@ -126,6 +128,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService, private bakeBufferedReactionsProcessorService: BakeBufferedReactionsProcessorService, private cleanProcessorService: CleanProcessorService, + private scheduleNotePostProcessorService: ScheduleNotePostProcessorService, ) { this.logger = this.queueLoggerService.logger; @@ -530,6 +533,15 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } //#endregion + + //#region schedule note post + { + this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.SCHEDULE_NOTE_POST), + autorun: false, + }); + } + //#endregion } @bindThis @@ -544,6 +556,7 @@ export class QueueProcessorService implements OnApplicationShutdown { this.relationshipQueueWorker.run(), this.objectStorageQueueWorker.run(), this.endedPollNotificationQueueWorker.run(), + this.schedulerNotePostQueueWorker.run(), ]); } @@ -559,6 +572,7 @@ export class QueueProcessorService implements OnApplicationShutdown { this.relationshipQueueWorker.close(), this.objectStorageQueueWorker.close(), this.endedPollNotificationQueueWorker.close(), + this.schedulerNotePostQueueWorker.close(), ]); } diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index 67f689b618..fdf012f149 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -16,6 +16,7 @@ export const QUEUE = { OBJECT_STORAGE: 'objectStorage', USER_WEBHOOK_DELIVER: 'userWebhookDeliver', SYSTEM_WEBHOOK_DELIVER: 'systemWebhookDeliver', + SCHEDULE_NOTE_POST: 'scheduleNotePost', }; export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { diff --git a/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts b/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts new file mode 100644 index 0000000000..62e3d1072f --- /dev/null +++ b/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts @@ -0,0 +1,144 @@ +/* + * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import type Logger from '@/logger.js'; +import { bindThis } from '@/decorators.js'; +import { NoteCreateService } from '@/core/NoteCreateService.js'; +import type { ChannelsRepository, DriveFilesRepository, MiDriveFile, NoteScheduleRepository, NotesRepository, UsersRepository } from '@/models/_.js'; +import { DI } from '@/di-symbols.js'; +import { NotificationService } from '@/core/NotificationService.js'; +import { IdentifiableError } from '@/misc/identifiable-error.js'; +import type { MiScheduleNoteType } from '@/models/NoteSchedule.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { ScheduleNotePostJobData } from '../types.js'; + +@Injectable() +export class ScheduleNotePostProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.noteScheduleRepository) + private noteScheduleRepository: NoteScheduleRepository, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + @Inject(DI.notesRepository) + private notesRepository: NotesRepository, + @Inject(DI.channelsRepository) + private channelsRepository: ChannelsRepository, + + private noteCreateService: NoteCreateService, + private queueLoggerService: QueueLoggerService, + private notificationService: NotificationService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('schedule-note-post'); + } + + @bindThis + private async isValidNoteSchedule(note: MiScheduleNoteType, id: string): Promise<boolean> { + const reply = note.reply ? await this.notesRepository.findOneBy({ id: note.reply }) : undefined; + const renote = note.reply ? await this.notesRepository.findOneBy({ id: note.renote }) : undefined; + const channel = note.channel ? await this.channelsRepository.findOneBy({ id: note.channel, isArchived: false }) : undefined; + if (note.reply && !reply) { + this.logger.warn('Schedule Note Failed Reason: parent note to reply does not exist'); + this.notificationService.createNotification(id, 'scheduledNoteFailed', { + reason: 'Replied to note on your scheduled note no longer exists', + }); + return false; + } + if (note.renote && !renote) { + this.logger.warn('Schedule Note Failed Reason: attached quote note no longer exists'); + this.notificationService.createNotification(id, 'scheduledNoteFailed', { + reason: 'A quoted note from one of your scheduled notes no longer exists', + }); + return false; + } + if (note.channel && !channel) { + this.logger.warn('Schedule Note Failed Reason: Channel does not exist'); + this.notificationService.createNotification(id, 'scheduledNoteFailed', { + reason: 'An attached channel on your scheduled note no longer exists', + }); + return false; + } + return true; + } + + @bindThis + public async process(job: Bull.Job<ScheduleNotePostJobData>): Promise<void> { + this.noteScheduleRepository.findOneBy({ id: job.data.scheduleNoteId }).then(async (data) => { + if (!data) { + this.logger.warn(`Schedule note ${job.data.scheduleNoteId} not found`); + } else { + const me = await this.usersRepository.findOneBy({ id: data.userId }); + const note = data.note; + const reply = note.reply ? await this.notesRepository.findOneBy({ id: note.reply }) : undefined; + const renote = note.reply ? await this.notesRepository.findOneBy({ id: note.renote }) : undefined; + const channel = note.channel ? await this.channelsRepository.findOneBy({ id: note.channel, isArchived: false }) : undefined; + + let files: MiDriveFile[] = []; + const fileIds = note.files; + + if (fileIds.length > 0 && me) { + files = await this.driveFilesRepository.createQueryBuilder('file') + .where('file.userId = :userId AND file.id IN (:...fileIds)', { + userId: me.id, + fileIds, + }) + .orderBy('array_position(ARRAY[:...fileIds], "id"::text)') + .setParameters({ fileIds }) + .getMany(); + } + + if (!data.userId || !me) { + this.logger.warn('Schedule Note Failed Reason: User Not Found'); + await this.noteScheduleRepository.remove(data); + return; + } + + if (!await this.isValidNoteSchedule(note, me.id)) { + await this.noteScheduleRepository.remove(data); + return; + } + + if (note.files.length !== files.length) { + this.logger.warn('Schedule Note Failed Reason: files are missing in the user\'s drive'); + this.notificationService.createNotification(me.id, 'scheduledNoteFailed', { + reason: 'Some attached files on your scheduled note no longer exist', + }); + await this.noteScheduleRepository.remove(data); + return; + } + + const createdNote = await this.noteCreateService.create(me, { + ...note, + createdAt: new Date(), + files, + poll: note.poll ? { + choices: note.poll.choices, + multiple: note.poll.multiple, + expiresAt: note.poll.expiresAt ? new Date(note.poll.expiresAt) : null, + } : undefined, + reply, + renote, + channel, + }).catch(async (err: IdentifiableError) => { + this.notificationService.createNotification(me.id, 'scheduledNoteFailed', { + reason: err.message, + }); + await this.noteScheduleRepository.remove(data); + throw this.logger.error(`Schedule Note Failed Reason: ${err.message}`); + }); + await this.noteScheduleRepository.remove(data); + this.notificationService.createNotification(me.id, 'scheduledNotePosted', { + noteId: createdNote.id, + }); + } + }); + } +} diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index c0d246ebbc..9433392df5 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -155,3 +155,7 @@ export type UserWebhookDeliverJobData = { export type ThinUser = { id: MiUser['id']; }; + +export type ScheduleNotePostJobData = { + scheduleNoteId: MiNote['id']; +} |