From da833222005ddf712db2c40bcf3848874f7ee3f2 Mon Sep 17 00:00:00 2001 From: Namekuji <11836635+nmkj-io@users.noreply.github.com> Date: Tue, 11 Apr 2023 20:13:58 -0400 Subject: feat: queueing bulk follow/unfollow and block/unblock (#10544) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * wrap follow/unfollow and block/unblock as job queue * create import job to follow in each iteration * make relationship jobs concurrent * replace to job queue if called repeatedly * use addBulk to import * omit stream when importing * fix job caller * use ThinUser instead of User to reduce redis memory consumption * createImportFollowingToDbJobの呼び出し方を変える, 型補強 * Force ThinUser * オブジェクト操作のみのメソッド名はgenerate...Data * Force ThinUser in generateRelationshipJobData * silent bulk unfollow at admin api endpoint --------- Co-authored-by: tamaina --- .../queue/RelationshipQueueProcessorsService.ts | 26 ++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 packages/backend/src/queue/RelationshipQueueProcessorsService.ts (limited to 'packages/backend/src/queue/RelationshipQueueProcessorsService.ts') diff --git a/packages/backend/src/queue/RelationshipQueueProcessorsService.ts b/packages/backend/src/queue/RelationshipQueueProcessorsService.ts new file mode 100644 index 0000000000..af086fa4e7 --- /dev/null +++ b/packages/backend/src/queue/RelationshipQueueProcessorsService.ts @@ -0,0 +1,26 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { bindThis } from '@/decorators.js'; +import { RelationshipProcessorService } from './processors/RelationshipProcessorService.js'; +import type Bull from 'bull'; +import { DI } from '@/di-symbols.js'; +import type { Config } from '@/config.js'; + +@Injectable() +export class RelationshipQueueProcessorsService { + constructor( + @Inject(DI.config) + private config: Config, + + private relationshipProcessorService: RelationshipProcessorService, + ) { + } + + @bindThis + public start(q: Bull.Queue): void { + const maxJobs = (this.config.deliverJobConcurrency ?? 128) / 4; // conservative? + q.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job)); + q.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job)); + q.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job)); + q.process('unblock', maxJobs, (job) => this.relationshipProcessorService.processUnblock(job)); + } +} -- cgit v1.2.3-freya