From 2528508cff9d8c90abd33e46b15220a49a00e2e2 Mon Sep 17 00:00:00 2001 From: NoriDev Date: Thu, 31 Oct 2024 13:52:01 +0900 Subject: feat: 노트 게시를 예약할 수 있음 (yojo-art/cherrypick#483, [Type4ny-Project/Type4ny@271c872c](https://github.com/Type4ny-Project/Type4ny/commit/271c872c97f215ef5d8e0be62251dd422a52e5b1)) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/queue/QueueProcessorModule.ts | 2 + .../backend/src/queue/QueueProcessorService.ts | 14 ++++ packages/backend/src/queue/const.ts | 1 + .../processors/ScheduleNotePostProcessorService.ts | 94 ++++++++++++++++++++++ packages/backend/src/queue/types.ts | 4 + 5 files changed, 115 insertions(+) create mode 100644 packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts (limited to 'packages/backend/src/queue') diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index 7c6675b15d..dd588e0115 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -42,6 +42,7 @@ import { TickChartsProcessorService } from './processors/TickChartsProcessorServ import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { ExportFavoritesProcessorService } from './processors/ExportFavoritesProcessorService.js'; import { RelationshipProcessorService } from './processors/RelationshipProcessorService.js'; +import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js'; @Module({ imports: [ @@ -85,6 +86,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor InboxProcessorService, AggregateRetentionProcessorService, QueueProcessorService, + ScheduleNotePostProcessorService, ], exports: [ QueueProcessorService, diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index f130314e74..4cc5446062 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -44,6 +44,7 @@ import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMu import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js'; import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; +import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; import { QUEUE, baseQueueOptions } from './const.js'; import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js'; @@ -86,6 +87,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private relationshipQueueWorker: Bull.Worker; private objectStorageQueueWorker: Bull.Worker; private endedPollNotificationQueueWorker: Bull.Worker; + private schedulerNotePostQueueWorker: Bull.Worker; constructor( @Inject(DI.config) @@ -126,6 +128,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService, private bakeBufferedReactionsProcessorService: BakeBufferedReactionsProcessorService, private cleanProcessorService: CleanProcessorService, + private scheduleNotePostProcessorService: ScheduleNotePostProcessorService, ) { this.logger = this.queueLoggerService.logger; @@ -530,6 +533,15 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } //#endregion + + //#region schedule note post + { + this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.SCHEDULE_NOTE_POST), + autorun: false, + }); + } + //#endregion } @bindThis @@ -544,6 +556,7 @@ export class QueueProcessorService implements OnApplicationShutdown { this.relationshipQueueWorker.run(), this.objectStorageQueueWorker.run(), this.endedPollNotificationQueueWorker.run(), + this.schedulerNotePostQueueWorker.run(), ]); } @@ -559,6 +572,7 @@ export class QueueProcessorService implements OnApplicationShutdown { this.relationshipQueueWorker.close(), this.objectStorageQueueWorker.close(), this.endedPollNotificationQueueWorker.close(), + this.schedulerNotePostQueueWorker.close(), ]); } diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index 67f689b618..fdf012f149 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -16,6 +16,7 @@ export const QUEUE = { OBJECT_STORAGE: 'objectStorage', USER_WEBHOOK_DELIVER: 'userWebhookDeliver', SYSTEM_WEBHOOK_DELIVER: 'systemWebhookDeliver', + SCHEDULE_NOTE_POST: 'scheduleNotePost', }; export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { diff --git a/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts b/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts new file mode 100644 index 0000000000..62d527953d --- /dev/null +++ b/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts @@ -0,0 +1,94 @@ +/* + * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import type Logger from '@/logger.js'; +import { bindThis } from '@/decorators.js'; +import { NoteCreateService } from '@/core/NoteCreateService.js'; +import type { ChannelsRepository, DriveFilesRepository, MiDriveFile, NoteScheduleRepository, NotesRepository, UsersRepository } from '@/models/_.js'; +import { DI } from '@/di-symbols.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { ScheduleNotePostJobData } from '../types.js'; + +@Injectable() +export class ScheduleNotePostProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.noteScheduleRepository) + private noteScheduleRepository: NoteScheduleRepository, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + @Inject(DI.notesRepository) + private notesRepository: NotesRepository, + @Inject(DI.channelsRepository) + private channelsRepository: ChannelsRepository, + + private noteCreateService: NoteCreateService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('schedule-note-post'); + } + + @bindThis + public async process(job: Bull.Job): Promise { + this.noteScheduleRepository.findOneBy({ id: job.data.scheduleNoteId }).then(async (data) => { + if (!data) { + this.logger.warn(`Schedule note ${job.data.scheduleNoteId} not found`); + } else { + const me = await this.usersRepository.findOneBy({ id: data.userId }); + const note = data.note; + + //idの形式でキューに積んであったのをDBから取り寄せる + const reply = note.reply ? await this.notesRepository.findOneBy({ id: note.reply }) : undefined; + const renote = note.reply ? await this.notesRepository.findOneBy({ id: note.renote }) : undefined; + const channel = note.channel ? await this.channelsRepository.findOneBy({ id: note.channel, isArchived: false }) : undefined; + let files: MiDriveFile[] = []; + const fileIds = note.files ?? null; + if (fileIds != null && fileIds.length > 0 && me) { + files = await this.driveFilesRepository.createQueryBuilder('file') + .where('file.userId = :userId AND file.id IN (:...fileIds)', { + userId: me.id, + fileIds, + }) + .orderBy('array_position(ARRAY[:...fileIds], "id"::text)') + .setParameters({ fileIds }) + .getMany(); + } + if ( + !data.userId || + !me || + (note.reply && !reply) || + (note.renote && !renote) || + (note.channel && !channel) || + (note.files.length !== files.length) + ) { + //キューに積んだときは有った物が消滅してたら予約投稿をキャンセルする + this.logger.warn('cancel schedule note'); + await this.noteScheduleRepository.remove(data); + return; + } + await this.noteCreateService.create(me, { + ...note, + createdAt: new Date(note.createdAt), //typeORMのjsonbで何故かstringにされるから戻す + files, + poll: note.poll ? { + choices: note.poll.choices, + multiple: note.poll.multiple, + expiresAt: note.poll.expiresAt ? new Date(note.poll.expiresAt) : null, + } : undefined, + reply, + renote, + channel, + }); + await this.noteScheduleRepository.remove(data); + } + }); + } +} diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index c0d246ebbc..9433392df5 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -155,3 +155,7 @@ export type UserWebhookDeliverJobData = { export type ThinUser = { id: MiUser['id']; }; + +export type ScheduleNotePostJobData = { + scheduleNoteId: MiNote['id']; +} -- cgit v1.2.3-freya