summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
authormisskey-release-bot[bot] <157398866+misskey-release-bot[bot]@users.noreply.github.com>2025-12-06 12:22:58 +0000
committerGitHub <noreply@github.com>2025-12-06 12:22:58 +0000
commite40c84f31df0202351c5585d3edbca000846b73b (patch)
tree548eafb27b758c55de2e750a0ef9efbe3f056357 /packages/backend/src/queue
parentMerge pull request #16840 from misskey-dev/develop (diff)
parentRelease: 2025.12.0 (diff)
downloadmisskey-e40c84f31df0202351c5585d3edbca000846b73b.tar.gz
misskey-e40c84f31df0202351c5585d3edbca000846b73b.tar.bz2
misskey-e40c84f31df0202351c5585d3edbca000846b73b.zip
Merge pull request #16916 from misskey-dev/develop
Release: 2025.12.0
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts44
-rw-r--r--packages/backend/src/queue/processors/ExportClipsProcessorService.ts64
-rw-r--r--packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts34
3 files changed, 83 insertions, 59 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 642d3fc8ad..306fdb41f6 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -5,7 +5,6 @@
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import * as Bull from 'bullmq';
-import * as Sentry from '@sentry/node';
import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
@@ -157,6 +156,13 @@ export class QueueProcessorService implements OnApplicationShutdown {
};
}
+ let Sentry: typeof import('@sentry/node') | undefined;
+ if (Sentry != null) {
+ import('@sentry/node').then((mod) => {
+ Sentry = mod;
+ });
+ }
+
//#region system
{
const processer = (job: Bull.Job) => {
@@ -175,7 +181,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
};
this.systemQueueWorker = new Bull.Worker(QUEUE.SYSTEM, (job) => {
- if (this.config.sentryForBackend) {
+ if (Sentry != null) {
return Sentry.startSpan({ name: 'Queue: System: ' + job.name }, () => processer(job));
} else {
return processer(job);
@@ -192,7 +198,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err: Error) => {
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
- if (config.sentryForBackend) {
+ if (Sentry != null) {
Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
@@ -232,7 +238,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
};
this.dbQueueWorker = new Bull.Worker(QUEUE.DB, (job) => {
- if (this.config.sentryForBackend) {
+ if (Sentry != null) {
return Sentry.startSpan({ name: 'Queue: DB: ' + job.name }, () => processer(job));
} else {
return processer(job);
@@ -249,7 +255,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
- if (config.sentryForBackend) {
+ if (Sentry != null) {
Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
@@ -264,7 +270,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region deliver
{
this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => {
- if (this.config.sentryForBackend) {
+ if (Sentry != null) {
return Sentry.startSpan({ name: 'Queue: Deliver' }, () => this.deliverProcessorService.process(job));
} else {
return this.deliverProcessorService.process(job);
@@ -289,7 +295,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
- if (config.sentryForBackend) {
+ if (Sentry != null) {
Sentry.captureMessage(`Queue: Deliver: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
@@ -304,7 +310,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region inbox
{
this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => {
- if (this.config.sentryForBackend) {
+ if (Sentry != null) {
return Sentry.startSpan({ name: 'Queue: Inbox' }, () => this.inboxProcessorService.process(job));
} else {
return this.inboxProcessorService.process(job);
@@ -329,7 +335,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job: renderJob(job), e: renderError(err) });
- if (config.sentryForBackend) {
+ if (Sentry != null) {
Sentry.captureMessage(`Queue: Inbox: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
@@ -344,7 +350,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region user-webhook deliver
{
this.userWebhookDeliverQueueWorker = new Bull.Worker(QUEUE.USER_WEBHOOK_DELIVER, (job) => {
- if (this.config.sentryForBackend) {
+ if (Sentry != null) {
return Sentry.startSpan({ name: 'Queue: UserWebhookDeliver' }, () => this.userWebhookDeliverProcessorService.process(job));
} else {
return this.userWebhookDeliverProcessorService.process(job);
@@ -369,7 +375,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
- if (config.sentryForBackend) {
+ if (Sentry != null) {
Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
@@ -384,7 +390,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region system-webhook deliver
{
this.systemWebhookDeliverQueueWorker = new Bull.Worker(QUEUE.SYSTEM_WEBHOOK_DELIVER, (job) => {
- if (this.config.sentryForBackend) {
+ if (Sentry != null) {
return Sentry.startSpan({ name: 'Queue: SystemWebhookDeliver' }, () => this.systemWebhookDeliverProcessorService.process(job));
} else {
return this.systemWebhookDeliverProcessorService.process(job);
@@ -409,7 +415,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
- if (config.sentryForBackend) {
+ if (Sentry != null) {
Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
@@ -434,7 +440,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
};
this.relationshipQueueWorker = new Bull.Worker(QUEUE.RELATIONSHIP, (job) => {
- if (this.config.sentryForBackend) {
+ if (Sentry != null) {
return Sentry.startSpan({ name: 'Queue: Relationship: ' + job.name }, () => processer(job));
} else {
return processer(job);
@@ -456,7 +462,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
- if (config.sentryForBackend) {
+ if (Sentry != null) {
Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
@@ -479,7 +485,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
};
this.objectStorageQueueWorker = new Bull.Worker(QUEUE.OBJECT_STORAGE, (job) => {
- if (this.config.sentryForBackend) {
+ if (Sentry != null) {
return Sentry.startSpan({ name: 'Queue: ObjectStorage: ' + job.name }, () => processer(job));
} else {
return processer(job);
@@ -497,7 +503,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
- if (config.sentryForBackend) {
+ if (Sentry != null) {
Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
@@ -512,7 +518,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region ended poll notification
{
this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => {
- if (this.config.sentryForBackend) {
+ if (Sentry != null) {
return Sentry.startSpan({ name: 'Queue: EndedPollNotification' }, () => this.endedPollNotificationProcessorService.process(job));
} else {
return this.endedPollNotificationProcessorService.process(job);
@@ -527,7 +533,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region post scheduled note
{
this.postScheduledNoteQueueWorker = new Bull.Worker(QUEUE.POST_SCHEDULED_NOTE, async (job) => {
- if (this.config.sentryForBackend) {
+ if (Sentry != null) {
return Sentry.startSpan({ name: 'Queue: PostScheduledNote' }, () => this.postScheduledNoteProcessorService.process(job));
} else {
return this.postScheduledNoteProcessorService.process(job);
diff --git a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts
index 486dc4c01f..be7d4e9e21 100644
--- a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts
@@ -5,21 +5,20 @@
import * as fs from 'node:fs';
import { Writable } from 'node:stream';
-import { Inject, Injectable, StreamableFile } from '@nestjs/common';
-import { MoreThan } from 'typeorm';
+import { Inject, Injectable } from '@nestjs/common';
import { format as dateFormat } from 'date-fns';
import { DI } from '@/di-symbols.js';
-import type { ClipNotesRepository, ClipsRepository, MiClip, MiClipNote, MiUser, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js';
+import type { ClipNotesRepository, ClipsRepository, MiClip, MiClipNote, MiUser, PollsRepository, UsersRepository } from '@/models/_.js';
import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
import { createTemp } from '@/misc/create-temp.js';
import type { MiPoll } from '@/models/Poll.js';
import type { MiNote } from '@/models/Note.js';
import { bindThis } from '@/decorators.js';
-import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
-import { Packed } from '@/misc/json-schema.js';
import { IdService } from '@/core/IdService.js';
import { NotificationService } from '@/core/NotificationService.js';
+import { QueryService } from '@/core/QueryService.js';
+import { shouldHideNoteByTime } from '@/misc/should-hide-note-by-time.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
@@ -43,6 +42,7 @@ export class ExportClipsProcessorService {
private driveService: DriveService,
private queueLoggerService: QueueLoggerService,
+ private queryService: QueryService,
private idService: IdService,
private notificationService: NotificationService,
) {
@@ -100,16 +100,16 @@ export class ExportClipsProcessorService {
});
while (true) {
- const clips = await this.clipsRepository.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- });
+ const query = this.clipsRepository.createQueryBuilder('clip')
+ .where('clip.userId = :userId', { userId: user.id })
+ .orderBy('clip.id', 'ASC')
+ .take(100);
+
+ if (cursor) {
+ query.andWhere('clip.id > :cursor', { cursor });
+ }
+
+ const clips = await query.getMany();
if (clips.length === 0) {
job.updateProgress(100);
@@ -124,7 +124,7 @@ export class ExportClipsProcessorService {
const isFirst = exportedClipsCount === 0;
await writer.write(isFirst ? content : ',\n' + content);
- await this.processClipNotes(writer, clip.id);
+ await this.processClipNotes(writer, clip.id, user.id);
await writer.write(']}');
exportedClipsCount++;
@@ -134,22 +134,25 @@ export class ExportClipsProcessorService {
}
}
- async processClipNotes(writer: WritableStreamDefaultWriter, clipId: string): Promise<void> {
+ async processClipNotes(writer: WritableStreamDefaultWriter, clipId: string, userId: string): Promise<void> {
let exportedClipNotesCount = 0;
let cursor: MiClipNote['id'] | null = null;
while (true) {
- const clipNotes = await this.clipNotesRepository.find({
- where: {
- clipId,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- relations: ['note', 'note.user'],
- }) as (MiClipNote & { note: MiNote & { user: MiUser } })[];
+ const query = this.clipNotesRepository.createQueryBuilder('clipNote')
+ .leftJoinAndSelect('clipNote.note', 'note')
+ .leftJoinAndSelect('note.user', 'user')
+ .where('clipNote.clipId = :clipId', { clipId })
+ .orderBy('clipNote.id', 'ASC')
+ .take(100);
+
+ if (cursor) {
+ query.andWhere('clipNote.id > :cursor', { cursor });
+ }
+
+ this.queryService.generateVisibilityQuery(query, { id: userId });
+
+ const clipNotes = await query.getMany() as (MiClipNote & { note: MiNote & { user: MiUser } })[];
if (clipNotes.length === 0) {
break;
@@ -158,6 +161,11 @@ export class ExportClipsProcessorService {
cursor = clipNotes.at(-1)?.id ?? null;
for (const clipNote of clipNotes) {
+ const noteCreatedAt = this.idService.parse(clipNote.note.id).date;
+ if (shouldHideNoteByTime(clipNote.note.user.makeNotesHiddenBefore, noteCreatedAt)) {
+ continue;
+ }
+
let poll: MiPoll | undefined;
if (clipNote.note.hasPoll) {
poll = await this.pollsRepository.findOneByOrFail({ noteId: clipNote.note.id });
diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
index 7918c8ccb5..87a8ded307 100644
--- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
@@ -5,7 +5,6 @@
import * as fs from 'node:fs';
import { Inject, Injectable } from '@nestjs/common';
-import { MoreThan } from 'typeorm';
import { format as dateFormat } from 'date-fns';
import { DI } from '@/di-symbols.js';
import type { MiNoteFavorite, NoteFavoritesRepository, PollsRepository, MiUser, UsersRepository } from '@/models/_.js';
@@ -17,6 +16,8 @@ import type { MiNote } from '@/models/Note.js';
import { bindThis } from '@/decorators.js';
import { IdService } from '@/core/IdService.js';
import { NotificationService } from '@/core/NotificationService.js';
+import { QueryService } from '@/core/QueryService.js';
+import { shouldHideNoteByTime } from '@/misc/should-hide-note-by-time.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
@@ -37,6 +38,7 @@ export class ExportFavoritesProcessorService {
private driveService: DriveService,
private queueLoggerService: QueueLoggerService,
+ private queryService: QueryService,
private idService: IdService,
private notificationService: NotificationService,
) {
@@ -83,17 +85,20 @@ export class ExportFavoritesProcessorService {
});
while (true) {
- const favorites = await this.noteFavoritesRepository.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- relations: ['note', 'note.user'],
- }) as (MiNoteFavorite & { note: MiNote & { user: MiUser } })[];
+ const query = this.noteFavoritesRepository.createQueryBuilder('favorite')
+ .leftJoinAndSelect('favorite.note', 'note')
+ .leftJoinAndSelect('note.user', 'user')
+ .where('favorite.userId = :userId', { userId: user.id })
+ .orderBy('favorite.id', 'ASC')
+ .take(100);
+
+ if (cursor) {
+ query.andWhere('favorite.id > :cursor', { cursor });
+ }
+
+ this.queryService.generateVisibilityQuery(query, { id: user.id });
+
+ const favorites = await query.getMany() as (MiNoteFavorite & { note: MiNote & { user: MiUser } })[];
if (favorites.length === 0) {
job.updateProgress(100);
@@ -103,6 +108,11 @@ export class ExportFavoritesProcessorService {
cursor = favorites.at(-1)?.id ?? null;
for (const favorite of favorites) {
+ const noteCreatedAt = this.idService.parse(favorite.note.id).date;
+ if (shouldHideNoteByTime(favorite.note.user.makeNotesHiddenBefore, noteCreatedAt)) {
+ continue;
+ }
+
let poll: MiPoll | undefined;
if (favorite.note.hasPoll) {
poll = await this.pollsRepository.findOneByOrFail({ noteId: favorite.note.id });