diff options
| author | misskey-release-bot[bot] <157398866+misskey-release-bot[bot]@users.noreply.github.com> | 2025-12-06 12:22:58 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-06 12:22:58 +0000 |
| commit | e40c84f31df0202351c5585d3edbca000846b73b (patch) | |
| tree | 548eafb27b758c55de2e750a0ef9efbe3f056357 /packages/backend/src/queue | |
| parent | Merge pull request #16840 from misskey-dev/develop (diff) | |
| parent | Release: 2025.12.0 (diff) | |
| download | misskey-e40c84f31df0202351c5585d3edbca000846b73b.tar.gz misskey-e40c84f31df0202351c5585d3edbca000846b73b.tar.bz2 misskey-e40c84f31df0202351c5585d3edbca000846b73b.zip | |
Merge pull request #16916 from misskey-dev/develop
Release: 2025.12.0
Diffstat (limited to 'packages/backend/src/queue')
3 files changed, 83 insertions, 59 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 642d3fc8ad..306fdb41f6 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -5,7 +5,6 @@ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import * as Bull from 'bullmq'; -import * as Sentry from '@sentry/node'; import type { Config } from '@/config.js'; import { DI } from '@/di-symbols.js'; import type Logger from '@/logger.js'; @@ -157,6 +156,13 @@ export class QueueProcessorService implements OnApplicationShutdown { }; } + let Sentry: typeof import('@sentry/node') | undefined; + if (Sentry != null) { + import('@sentry/node').then((mod) => { + Sentry = mod; + }); + } + //#region system { const processer = (job: Bull.Job) => { @@ -175,7 +181,7 @@ export class QueueProcessorService implements OnApplicationShutdown { }; this.systemQueueWorker = new Bull.Worker(QUEUE.SYSTEM, (job) => { - if (this.config.sentryForBackend) { + if (Sentry != null) { return Sentry.startSpan({ name: 'Queue: System: ' + job.name }, () => processer(job)); } else { return processer(job); @@ -192,7 +198,7 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err: Error) => { logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) }); - if (config.sentryForBackend) { + if (Sentry != null) { Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, @@ -232,7 +238,7 @@ export class QueueProcessorService implements OnApplicationShutdown { }; this.dbQueueWorker = new Bull.Worker(QUEUE.DB, (job) => { - if (this.config.sentryForBackend) { + if (Sentry != null) { return Sentry.startSpan({ name: 'Queue: DB: ' + job.name }, () => processer(job)); } else { return processer(job); @@ -249,7 +255,7 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) }); - if (config.sentryForBackend) { + if (Sentry != null) { Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, @@ -264,7 +270,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region deliver { this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => { - if (this.config.sentryForBackend) { + if (Sentry != null) { return Sentry.startSpan({ name: 'Queue: Deliver' }, () => this.deliverProcessorService.process(job)); } else { return this.deliverProcessorService.process(job); @@ -289,7 +295,7 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) .on('failed', (job, err) => { logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); - if (config.sentryForBackend) { + if (Sentry != null) { Sentry.captureMessage(`Queue: Deliver: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, @@ -304,7 +310,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region inbox { this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => { - if (this.config.sentryForBackend) { + if (Sentry != null) { return Sentry.startSpan({ name: 'Queue: Inbox' }, () => this.inboxProcessorService.process(job)); } else { return this.inboxProcessorService.process(job); @@ -329,7 +335,7 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) .on('failed', (job, err) => { logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job: renderJob(job), e: renderError(err) }); - if (config.sentryForBackend) { + if (Sentry != null) { Sentry.captureMessage(`Queue: Inbox: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, @@ -344,7 +350,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region user-webhook deliver { this.userWebhookDeliverQueueWorker = new Bull.Worker(QUEUE.USER_WEBHOOK_DELIVER, (job) => { - if (this.config.sentryForBackend) { + if (Sentry != null) { return Sentry.startSpan({ name: 'Queue: UserWebhookDeliver' }, () => this.userWebhookDeliverProcessorService.process(job)); } else { return this.userWebhookDeliverProcessorService.process(job); @@ -369,7 +375,7 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) .on('failed', (job, err) => { logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); - if (config.sentryForBackend) { + if (Sentry != null) { Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, @@ -384,7 +390,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region system-webhook deliver { this.systemWebhookDeliverQueueWorker = new Bull.Worker(QUEUE.SYSTEM_WEBHOOK_DELIVER, (job) => { - if (this.config.sentryForBackend) { + if (Sentry != null) { return Sentry.startSpan({ name: 'Queue: SystemWebhookDeliver' }, () => this.systemWebhookDeliverProcessorService.process(job)); } else { return this.systemWebhookDeliverProcessorService.process(job); @@ -409,7 +415,7 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) .on('failed', (job, err) => { logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); - if (config.sentryForBackend) { + if (Sentry != null) { Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, @@ -434,7 +440,7 @@ export class QueueProcessorService implements OnApplicationShutdown { }; this.relationshipQueueWorker = new Bull.Worker(QUEUE.RELATIONSHIP, (job) => { - if (this.config.sentryForBackend) { + if (Sentry != null) { return Sentry.startSpan({ name: 'Queue: Relationship: ' + job.name }, () => processer(job)); } else { return processer(job); @@ -456,7 +462,7 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) }); - if (config.sentryForBackend) { + if (Sentry != null) { Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, @@ -479,7 +485,7 @@ export class QueueProcessorService implements OnApplicationShutdown { }; this.objectStorageQueueWorker = new Bull.Worker(QUEUE.OBJECT_STORAGE, (job) => { - if (this.config.sentryForBackend) { + if (Sentry != null) { return Sentry.startSpan({ name: 'Queue: ObjectStorage: ' + job.name }, () => processer(job)); } else { return processer(job); @@ -497,7 +503,7 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) }); - if (config.sentryForBackend) { + if (Sentry != null) { Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, @@ -512,7 +518,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region ended poll notification { this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => { - if (this.config.sentryForBackend) { + if (Sentry != null) { return Sentry.startSpan({ name: 'Queue: EndedPollNotification' }, () => this.endedPollNotificationProcessorService.process(job)); } else { return this.endedPollNotificationProcessorService.process(job); @@ -527,7 +533,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region post scheduled note { this.postScheduledNoteQueueWorker = new Bull.Worker(QUEUE.POST_SCHEDULED_NOTE, async (job) => { - if (this.config.sentryForBackend) { + if (Sentry != null) { return Sentry.startSpan({ name: 'Queue: PostScheduledNote' }, () => this.postScheduledNoteProcessorService.process(job)); } else { return this.postScheduledNoteProcessorService.process(job); diff --git a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts index 486dc4c01f..be7d4e9e21 100644 --- a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts @@ -5,21 +5,20 @@ import * as fs from 'node:fs'; import { Writable } from 'node:stream'; -import { Inject, Injectable, StreamableFile } from '@nestjs/common'; -import { MoreThan } from 'typeorm'; +import { Inject, Injectable } from '@nestjs/common'; import { format as dateFormat } from 'date-fns'; import { DI } from '@/di-symbols.js'; -import type { ClipNotesRepository, ClipsRepository, MiClip, MiClipNote, MiUser, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js'; +import type { ClipNotesRepository, ClipsRepository, MiClip, MiClipNote, MiUser, PollsRepository, UsersRepository } from '@/models/_.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import { createTemp } from '@/misc/create-temp.js'; import type { MiPoll } from '@/models/Poll.js'; import type { MiNote } from '@/models/Note.js'; import { bindThis } from '@/decorators.js'; -import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js'; -import { Packed } from '@/misc/json-schema.js'; import { IdService } from '@/core/IdService.js'; import { NotificationService } from '@/core/NotificationService.js'; +import { QueryService } from '@/core/QueryService.js'; +import { shouldHideNoteByTime } from '@/misc/should-hide-note-by-time.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbJobDataWithUser } from '../types.js'; @@ -43,6 +42,7 @@ export class ExportClipsProcessorService { private driveService: DriveService, private queueLoggerService: QueueLoggerService, + private queryService: QueryService, private idService: IdService, private notificationService: NotificationService, ) { @@ -100,16 +100,16 @@ export class ExportClipsProcessorService { }); while (true) { - const clips = await this.clipsRepository.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - }); + const query = this.clipsRepository.createQueryBuilder('clip') + .where('clip.userId = :userId', { userId: user.id }) + .orderBy('clip.id', 'ASC') + .take(100); + + if (cursor) { + query.andWhere('clip.id > :cursor', { cursor }); + } + + const clips = await query.getMany(); if (clips.length === 0) { job.updateProgress(100); @@ -124,7 +124,7 @@ export class ExportClipsProcessorService { const isFirst = exportedClipsCount === 0; await writer.write(isFirst ? content : ',\n' + content); - await this.processClipNotes(writer, clip.id); + await this.processClipNotes(writer, clip.id, user.id); await writer.write(']}'); exportedClipsCount++; @@ -134,22 +134,25 @@ export class ExportClipsProcessorService { } } - async processClipNotes(writer: WritableStreamDefaultWriter, clipId: string): Promise<void> { + async processClipNotes(writer: WritableStreamDefaultWriter, clipId: string, userId: string): Promise<void> { let exportedClipNotesCount = 0; let cursor: MiClipNote['id'] | null = null; while (true) { - const clipNotes = await this.clipNotesRepository.find({ - where: { - clipId, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - relations: ['note', 'note.user'], - }) as (MiClipNote & { note: MiNote & { user: MiUser } })[]; + const query = this.clipNotesRepository.createQueryBuilder('clipNote') + .leftJoinAndSelect('clipNote.note', 'note') + .leftJoinAndSelect('note.user', 'user') + .where('clipNote.clipId = :clipId', { clipId }) + .orderBy('clipNote.id', 'ASC') + .take(100); + + if (cursor) { + query.andWhere('clipNote.id > :cursor', { cursor }); + } + + this.queryService.generateVisibilityQuery(query, { id: userId }); + + const clipNotes = await query.getMany() as (MiClipNote & { note: MiNote & { user: MiUser } })[]; if (clipNotes.length === 0) { break; @@ -158,6 +161,11 @@ export class ExportClipsProcessorService { cursor = clipNotes.at(-1)?.id ?? null; for (const clipNote of clipNotes) { + const noteCreatedAt = this.idService.parse(clipNote.note.id).date; + if (shouldHideNoteByTime(clipNote.note.user.makeNotesHiddenBefore, noteCreatedAt)) { + continue; + } + let poll: MiPoll | undefined; if (clipNote.note.hasPoll) { poll = await this.pollsRepository.findOneByOrFail({ noteId: clipNote.note.id }); diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts index 7918c8ccb5..87a8ded307 100644 --- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts @@ -5,7 +5,6 @@ import * as fs from 'node:fs'; import { Inject, Injectable } from '@nestjs/common'; -import { MoreThan } from 'typeorm'; import { format as dateFormat } from 'date-fns'; import { DI } from '@/di-symbols.js'; import type { MiNoteFavorite, NoteFavoritesRepository, PollsRepository, MiUser, UsersRepository } from '@/models/_.js'; @@ -17,6 +16,8 @@ import type { MiNote } from '@/models/Note.js'; import { bindThis } from '@/decorators.js'; import { IdService } from '@/core/IdService.js'; import { NotificationService } from '@/core/NotificationService.js'; +import { QueryService } from '@/core/QueryService.js'; +import { shouldHideNoteByTime } from '@/misc/should-hide-note-by-time.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbJobDataWithUser } from '../types.js'; @@ -37,6 +38,7 @@ export class ExportFavoritesProcessorService { private driveService: DriveService, private queueLoggerService: QueueLoggerService, + private queryService: QueryService, private idService: IdService, private notificationService: NotificationService, ) { @@ -83,17 +85,20 @@ export class ExportFavoritesProcessorService { }); while (true) { - const favorites = await this.noteFavoritesRepository.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - relations: ['note', 'note.user'], - }) as (MiNoteFavorite & { note: MiNote & { user: MiUser } })[]; + const query = this.noteFavoritesRepository.createQueryBuilder('favorite') + .leftJoinAndSelect('favorite.note', 'note') + .leftJoinAndSelect('note.user', 'user') + .where('favorite.userId = :userId', { userId: user.id }) + .orderBy('favorite.id', 'ASC') + .take(100); + + if (cursor) { + query.andWhere('favorite.id > :cursor', { cursor }); + } + + this.queryService.generateVisibilityQuery(query, { id: user.id }); + + const favorites = await query.getMany() as (MiNoteFavorite & { note: MiNote & { user: MiUser } })[]; if (favorites.length === 0) { job.updateProgress(100); @@ -103,6 +108,11 @@ export class ExportFavoritesProcessorService { cursor = favorites.at(-1)?.id ?? null; for (const favorite of favorites) { + const noteCreatedAt = this.idService.parse(favorite.note.id).date; + if (shouldHideNoteByTime(favorite.note.user.makeNotesHiddenBefore, noteCreatedAt)) { + continue; + } + let poll: MiPoll | undefined; if (favorite.note.hasPoll) { poll = await this.pollsRepository.findOneByOrFail({ noteId: favorite.note.id }); |