From da833222005ddf712db2c40bcf3848874f7ee3f2 Mon Sep 17 00:00:00 2001 From: Namekuji <11836635+nmkj-io@users.noreply.github.com> Date: Tue, 11 Apr 2023 20:13:58 -0400 Subject: feat: queueing bulk follow/unfollow and block/unblock (#10544) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * wrap follow/unfollow and block/unblock as job queue * create import job to follow in each iteration * make relationship jobs concurrent * replace to job queue if called repeatedly * use addBulk to import * omit stream when importing * fix job caller * use ThinUser instead of User to reduce redis memory consumption * createImportFollowingToDbJobの呼び出し方を変える, 型補強 * Force ThinUser * オブジェクト操作のみのメソッド名はgenerate...Data * Force ThinUser in generateRelationshipJobData * silent bulk unfollow at admin api endpoint --------- Co-authored-by: tamaina --- packages/backend/src/core/QueueModule.ts | 13 ++- packages/backend/src/core/QueueService.ts | 113 ++++++++++++++++++---- packages/backend/src/core/UserBlockingService.ts | 16 +-- packages/backend/src/core/UserFollowingService.ts | 9 +- packages/backend/src/core/UserListService.ts | 8 +- 5 files changed, 123 insertions(+), 36 deletions(-) (limited to 'packages/backend/src/core') diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 8733a7d7eb..bac85d7a15 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -3,7 +3,7 @@ import Bull from 'bull'; import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; import type { Provider } from '@nestjs/common'; -import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData } from '../queue/types.js'; +import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData, DbJobMap } from '../queue/types.js'; function q(config: Config, name: string, limitPerSec = -1) { return new Bull(name, { @@ -41,7 +41,8 @@ export type SystemQueue = Bull.Queue>; export type EndedPollNotificationQueue = Bull.Queue; export type DeliverQueue = Bull.Queue; export type InboxQueue = Bull.Queue; -export type DbQueue = Bull.Queue; +export type DbQueue = Bull.Queue>; +export type RelationshipQueue = Bull.Queue; export type ObjectStorageQueue = Bull.Queue; export type WebhookDeliverQueue = Bull.Queue; @@ -75,6 +76,12 @@ const $db: Provider = { inject: [DI.config], }; +const $relationship: Provider = { + provide: 'queue:relationship', + useFactory: (config: Config) => q(config, 'relationship'), + inject: [DI.config], +}; + const $objectStorage: Provider = { provide: 'queue:objectStorage', useFactory: (config: Config) => q(config, 'objectStorage'), @@ -96,6 +103,7 @@ const $webhookDeliver: Provider = { $deliver, $inbox, $db, + $relationship, $objectStorage, $webhookDeliver, ], @@ -105,6 +113,7 @@ const $webhookDeliver: Provider = { $deliver, $inbox, $db, + $relationship, $objectStorage, $webhookDeliver, ], diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 498ceced7a..375ac49911 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -6,9 +6,10 @@ import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js'; import type { Config } from '@/config.js'; import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; -import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js'; -import type { ThinUser } from '../queue/types.js'; +import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js'; +import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js'; import type httpSignature from '@peertube/http-signature'; +import Bull from 'bull'; @Injectable() export class QueueService { @@ -21,6 +22,7 @@ export class QueueService { @Inject('queue:deliver') public deliverQueue: DeliverQueue, @Inject('queue:inbox') public inboxQueue: InboxQueue, @Inject('queue:db') public dbQueue: DbQueue, + @Inject('queue:relationship') public relationshipQueue: RelationshipQueue, @Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue, @Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue, ) {} @@ -56,7 +58,7 @@ export class QueueService { activity: activity, signature, }; - + return this.inboxQueue.add(data, { attempts: this.config.inboxJobMaxAttempts ?? 8, timeout: 5 * 60 * 1000, // 5min @@ -71,7 +73,7 @@ export class QueueService { @bindThis public createDeleteDriveFilesJob(user: ThinUser) { return this.dbQueue.add('deleteDriveFiles', { - user: user, + user: { id: user.id }, }, { removeOnComplete: true, removeOnFail: true, @@ -81,7 +83,7 @@ export class QueueService { @bindThis public createExportCustomEmojisJob(user: ThinUser) { return this.dbQueue.add('exportCustomEmojis', { - user: user, + user: { id: user.id }, }, { removeOnComplete: true, removeOnFail: true, @@ -91,7 +93,7 @@ export class QueueService { @bindThis public createExportNotesJob(user: ThinUser) { return this.dbQueue.add('exportNotes', { - user: user, + user: { id: user.id }, }, { removeOnComplete: true, removeOnFail: true, @@ -101,7 +103,7 @@ export class QueueService { @bindThis public createExportFavoritesJob(user: ThinUser) { return this.dbQueue.add('exportFavorites', { - user: user, + user: { id: user.id }, }, { removeOnComplete: true, removeOnFail: true, @@ -111,7 +113,7 @@ export class QueueService { @bindThis public createExportFollowingJob(user: ThinUser, excludeMuting = false, excludeInactive = false) { return this.dbQueue.add('exportFollowing', { - user: user, + user: { id: user.id }, excludeMuting, excludeInactive, }, { @@ -123,7 +125,7 @@ export class QueueService { @bindThis public createExportMuteJob(user: ThinUser) { return this.dbQueue.add('exportMuting', { - user: user, + user: { id: user.id }, }, { removeOnComplete: true, removeOnFail: true, @@ -133,7 +135,7 @@ export class QueueService { @bindThis public createExportBlockingJob(user: ThinUser) { return this.dbQueue.add('exportBlocking', { - user: user, + user: { id: user.id }, }, { removeOnComplete: true, removeOnFail: true, @@ -143,7 +145,7 @@ export class QueueService { @bindThis public createExportUserListsJob(user: ThinUser) { return this.dbQueue.add('exportUserLists', { - user: user, + user: { id: user.id }, }, { removeOnComplete: true, removeOnFail: true, @@ -153,7 +155,7 @@ export class QueueService { @bindThis public createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) { return this.dbQueue.add('importFollowing', { - user: user, + user: { id: user.id }, fileId: fileId, }, { removeOnComplete: true, @@ -161,10 +163,16 @@ export class QueueService { }); } + @bindThis + public createImportFollowingToDbJob(user: ThinUser, targets: string[]) { + const jobs = targets.map(rel => this.generateToDbJobData('importFollowingToDb', { user, target: rel })); + return this.dbQueue.addBulk(jobs); + } + @bindThis public createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) { return this.dbQueue.add('importMuting', { - user: user, + user: { id: user.id }, fileId: fileId, }, { removeOnComplete: true, @@ -175,7 +183,7 @@ export class QueueService { @bindThis public createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) { return this.dbQueue.add('importBlocking', { - user: user, + user: { id: user.id }, fileId: fileId, }, { removeOnComplete: true, @@ -183,10 +191,32 @@ export class QueueService { }); } + @bindThis + public createImportBlockingToDbJob(user: ThinUser, targets: string[]) { + const jobs = targets.map(rel => this.generateToDbJobData('importBlockingToDb', { user, target: rel })); + return this.dbQueue.addBulk(jobs); + } + + @bindThis + private generateToDbJobData>(name: T, data: D): { + name: string, + data: D, + opts: Bull.JobOptions, + } { + return { + name, + data, + opts: { + removeOnComplete: true, + removeOnFail: true, + }, + }; + } + @bindThis public createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) { return this.dbQueue.add('importUserLists', { - user: user, + user: { id: user.id }, fileId: fileId, }, { removeOnComplete: true, @@ -197,7 +227,7 @@ export class QueueService { @bindThis public createImportCustomEmojisJob(user: ThinUser, fileId: DriveFile['id']) { return this.dbQueue.add('importCustomEmojis', { - user: user, + user: { id: user.id }, fileId: fileId, }, { removeOnComplete: true, @@ -208,7 +238,7 @@ export class QueueService { @bindThis public createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) { return this.dbQueue.add('deleteAccount', { - user: user, + user: { id: user.id }, soft: opts.soft, }, { removeOnComplete: true, @@ -216,6 +246,51 @@ export class QueueService { }); } + @bindThis + public createFollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string, silent?: boolean }[]) { + const jobs = followings.map(rel => this.generateRelationshipJobData('follow', rel)); + return this.relationshipQueue.addBulk(jobs); + } + + @bindThis + public createUnfollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string }[]) { + const jobs = followings.map(rel => this.generateRelationshipJobData('unfollow', rel)); + return this.relationshipQueue.addBulk(jobs); + } + + @bindThis + public createBlockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) { + const jobs = blockings.map(rel => this.generateRelationshipJobData('block', rel)); + return this.relationshipQueue.addBulk(jobs); + } + + @bindThis + public createUnblockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) { + const jobs = blockings.map(rel => this.generateRelationshipJobData('unblock', rel)); + return this.relationshipQueue.addBulk(jobs); + } + + @bindThis + private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData): { + name: string, + data: RelationshipJobData, + opts: Bull.JobOptions, + } { + return { + name, + data: { + from: { id: data.from.id }, + to: { id: data.to.id }, + silent: data.silent, + requestId: data.requestId, + }, + opts: { + removeOnComplete: true, + removeOnFail: true, + }, + }; + } + @bindThis public createDeleteObjectStorageFileJob(key: string) { return this.objectStorageQueue.add('deleteFile', { @@ -246,7 +321,7 @@ export class QueueService { createdAt: Date.now(), eventId: uuid(), }; - + return this.webhookDeliverQueue.add(data, { attempts: 4, timeout: 1 * 60 * 1000, // 1min @@ -264,7 +339,7 @@ export class QueueService { //deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); this.deliverQueue.clean(0, 'delayed'); - + this.inboxQueue.once('cleaned', (jobs, status) => { //inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); diff --git a/packages/backend/src/core/UserBlockingService.ts b/packages/backend/src/core/UserBlockingService.ts index b3e306346e..3ca22f8bbc 100644 --- a/packages/backend/src/core/UserBlockingService.ts +++ b/packages/backend/src/core/UserBlockingService.ts @@ -24,7 +24,7 @@ export class UserBlockingService implements OnModuleInit { constructor( private moduleRef: ModuleRef, - + @Inject(DI.followRequestsRepository) private followRequestsRepository: FollowRequestsRepository, @@ -54,12 +54,12 @@ export class UserBlockingService implements OnModuleInit { } @bindThis - public async block(blocker: User, blockee: User) { + public async block(blocker: User, blockee: User, silent = false) { await Promise.all([ - this.cancelRequest(blocker, blockee), - this.cancelRequest(blockee, blocker), - this.userFollowingService.unfollow(blocker, blockee), - this.userFollowingService.unfollow(blockee, blocker), + this.cancelRequest(blocker, blockee, silent), + this.cancelRequest(blockee, blocker, silent), + this.userFollowingService.unfollow(blocker, blockee, silent), + this.userFollowingService.unfollow(blockee, blocker, silent), this.removeFromList(blockee, blocker), ]); @@ -89,7 +89,7 @@ export class UserBlockingService implements OnModuleInit { } @bindThis - private async cancelRequest(follower: User, followee: User) { + private async cancelRequest(follower: User, followee: User, silent = false) { const request = await this.followRequestsRepository.findOneBy({ followeeId: followee.id, followerId: follower.id, @@ -110,7 +110,7 @@ export class UserBlockingService implements OnModuleInit { }).then(packed => this.globalEventService.publishMainStream(followee.id, 'meUpdated', packed)); } - if (this.userEntityService.isLocalUser(follower)) { + if (this.userEntityService.isLocalUser(follower) && !silent) { this.userEntityService.pack(followee, follower, { detail: true, }).then(async packed => { diff --git a/packages/backend/src/core/UserFollowingService.ts b/packages/backend/src/core/UserFollowingService.ts index d7bb8f3920..dacaa7263a 100644 --- a/packages/backend/src/core/UserFollowingService.ts +++ b/packages/backend/src/core/UserFollowingService.ts @@ -43,7 +43,7 @@ export class UserFollowingService implements OnModuleInit { constructor( private moduleRef: ModuleRef, - + @Inject(DI.usersRepository) private usersRepository: UsersRepository, @@ -79,7 +79,7 @@ export class UserFollowingService implements OnModuleInit { } @bindThis - public async follow(_follower: { id: User['id'] }, _followee: { id: User['id'] }, requestId?: string): Promise { + public async follow(_follower: { id: User['id'] }, _followee: { id: User['id'] }, requestId?: string, silent = false): Promise { const [follower, followee] = await Promise.all([ this.usersRepository.findOneByOrFail({ id: _follower.id }), this.usersRepository.findOneByOrFail({ id: _followee.id }), @@ -139,7 +139,7 @@ export class UserFollowingService implements OnModuleInit { } } - await this.insertFollowingDoc(followee, follower); + await this.insertFollowingDoc(followee, follower, silent); if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { const content = this.apRendererService.addContext(this.apRendererService.renderAccept(this.apRendererService.renderFollow(follower, followee, requestId), followee)); @@ -155,6 +155,7 @@ export class UserFollowingService implements OnModuleInit { follower: { id: User['id']; host: User['host']; uri: User['host']; inbox: User['inbox']; sharedInbox: User['sharedInbox'] }, + silent = false, ): Promise { if (follower.id === followee.id) return; @@ -233,7 +234,7 @@ export class UserFollowingService implements OnModuleInit { this.perUserFollowingChart.update(follower, followee, true); // Publish follow event - if (this.userEntityService.isLocalUser(follower)) { + if (this.userEntityService.isLocalUser(follower) && !silent) { this.userEntityService.pack(followee.id, follower, { detail: true, }).then(async packed => { diff --git a/packages/backend/src/core/UserListService.ts b/packages/backend/src/core/UserListService.ts index bc726a1feb..08cc907ebf 100644 --- a/packages/backend/src/core/UserListService.ts +++ b/packages/backend/src/core/UserListService.ts @@ -11,6 +11,7 @@ import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { ProxyAccountService } from '@/core/ProxyAccountService.js'; import { bindThis } from '@/decorators.js'; import { RoleService } from '@/core/RoleService.js'; +import { QueueService } from '@/core/QueueService.js'; @Injectable() export class UserListService { @@ -29,6 +30,7 @@ export class UserListService { private roleService: RoleService, private globalEventService: GlobalEventService, private proxyAccountService: ProxyAccountService, + private queueService: QueueService, ) { } @@ -47,14 +49,14 @@ export class UserListService { userId: target.id, userListId: list.id, } as UserListJoining); - + this.globalEventService.publishUserListStream(list.id, 'userAdded', await this.userEntityService.pack(target)); - + // このインスタンス内にこのリモートユーザーをフォローしているユーザーがいなくても投稿を受け取るためにダミーのユーザーがフォローしたということにする if (this.userEntityService.isRemoteUser(target)) { const proxy = await this.proxyAccountService.fetch(); if (proxy) { - this.userFollowingService.follow(proxy, target); + this.queueService.createFollowJob([{ from: { id: proxy.id }, to: { id: target.id } }]); } } } -- cgit v1.2.3-freya