summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/QueueService.ts
diff options
context:
space:
mode:
authorNamekuji <11836635+nmkj-io@users.noreply.github.com>2023-04-11 20:13:58 -0400
committerGitHub <noreply@github.com>2023-04-12 09:13:58 +0900
commitda833222005ddf712db2c40bcf3848874f7ee3f2 (patch)
treea8b45580a94a212fa5f05e056fcc59c9db7a7b73 /packages/backend/src/core/QueueService.ts
parentUpdate CHANGELOG.md (#10591) (diff)
downloadsharkey-da833222005ddf712db2c40bcf3848874f7ee3f2.tar.gz
sharkey-da833222005ddf712db2c40bcf3848874f7ee3f2.tar.bz2
sharkey-da833222005ddf712db2c40bcf3848874f7ee3f2.zip
feat: queueing bulk follow/unfollow and block/unblock (#10544)
* wrap follow/unfollow and block/unblock as job queue * create import job to follow in each iteration * make relationship jobs concurrent * replace to job queue if called repeatedly * use addBulk to import * omit stream when importing * fix job caller * use ThinUser instead of User to reduce redis memory consumption * createImportFollowingToDbJobの呼び出し方を変える, 型補強 * Force ThinUser * オブジェクト操作のみのメソッド名はgenerate...Data * Force ThinUser in generateRelationshipJobData * silent bulk unfollow at admin api endpoint --------- Co-authored-by: tamaina <tamaina@hotmail.co.jp>
Diffstat (limited to 'packages/backend/src/core/QueueService.ts')
-rw-r--r--packages/backend/src/core/QueueService.ts113
1 files changed, 94 insertions, 19 deletions
diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts
index 498ceced7a..375ac49911 100644
--- a/packages/backend/src/core/QueueService.ts
+++ b/packages/backend/src/core/QueueService.ts
@@ -6,9 +6,10 @@ import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js';
import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
-import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
-import type { ThinUser } from '../queue/types.js';
+import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
+import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
import type httpSignature from '@peertube/http-signature';
+import Bull from 'bull';
@Injectable()
export class QueueService {
@@ -21,6 +22,7 @@ export class QueueService {
@Inject('queue:deliver') public deliverQueue: DeliverQueue,
@Inject('queue:inbox') public inboxQueue: InboxQueue,
@Inject('queue:db') public dbQueue: DbQueue,
+ @Inject('queue:relationship') public relationshipQueue: RelationshipQueue,
@Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue,
@Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue,
) {}
@@ -56,7 +58,7 @@ export class QueueService {
activity: activity,
signature,
};
-
+
return this.inboxQueue.add(data, {
attempts: this.config.inboxJobMaxAttempts ?? 8,
timeout: 5 * 60 * 1000, // 5min
@@ -71,7 +73,7 @@ export class QueueService {
@bindThis
public createDeleteDriveFilesJob(user: ThinUser) {
return this.dbQueue.add('deleteDriveFiles', {
- user: user,
+ user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
@@ -81,7 +83,7 @@ export class QueueService {
@bindThis
public createExportCustomEmojisJob(user: ThinUser) {
return this.dbQueue.add('exportCustomEmojis', {
- user: user,
+ user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
@@ -91,7 +93,7 @@ export class QueueService {
@bindThis
public createExportNotesJob(user: ThinUser) {
return this.dbQueue.add('exportNotes', {
- user: user,
+ user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
@@ -101,7 +103,7 @@ export class QueueService {
@bindThis
public createExportFavoritesJob(user: ThinUser) {
return this.dbQueue.add('exportFavorites', {
- user: user,
+ user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
@@ -111,7 +113,7 @@ export class QueueService {
@bindThis
public createExportFollowingJob(user: ThinUser, excludeMuting = false, excludeInactive = false) {
return this.dbQueue.add('exportFollowing', {
- user: user,
+ user: { id: user.id },
excludeMuting,
excludeInactive,
}, {
@@ -123,7 +125,7 @@ export class QueueService {
@bindThis
public createExportMuteJob(user: ThinUser) {
return this.dbQueue.add('exportMuting', {
- user: user,
+ user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
@@ -133,7 +135,7 @@ export class QueueService {
@bindThis
public createExportBlockingJob(user: ThinUser) {
return this.dbQueue.add('exportBlocking', {
- user: user,
+ user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
@@ -143,7 +145,7 @@ export class QueueService {
@bindThis
public createExportUserListsJob(user: ThinUser) {
return this.dbQueue.add('exportUserLists', {
- user: user,
+ user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
@@ -153,7 +155,7 @@ export class QueueService {
@bindThis
public createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) {
return this.dbQueue.add('importFollowing', {
- user: user,
+ user: { id: user.id },
fileId: fileId,
}, {
removeOnComplete: true,
@@ -162,9 +164,15 @@ export class QueueService {
}
@bindThis
+ public createImportFollowingToDbJob(user: ThinUser, targets: string[]) {
+ const jobs = targets.map(rel => this.generateToDbJobData('importFollowingToDb', { user, target: rel }));
+ return this.dbQueue.addBulk(jobs);
+ }
+
+ @bindThis
public createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) {
return this.dbQueue.add('importMuting', {
- user: user,
+ user: { id: user.id },
fileId: fileId,
}, {
removeOnComplete: true,
@@ -175,7 +183,7 @@ export class QueueService {
@bindThis
public createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) {
return this.dbQueue.add('importBlocking', {
- user: user,
+ user: { id: user.id },
fileId: fileId,
}, {
removeOnComplete: true,
@@ -184,9 +192,31 @@ export class QueueService {
}
@bindThis
+ public createImportBlockingToDbJob(user: ThinUser, targets: string[]) {
+ const jobs = targets.map(rel => this.generateToDbJobData('importBlockingToDb', { user, target: rel }));
+ return this.dbQueue.addBulk(jobs);
+ }
+
+ @bindThis
+ private generateToDbJobData<T extends 'importFollowingToDb' | 'importBlockingToDb', D extends DbJobData<T>>(name: T, data: D): {
+ name: string,
+ data: D,
+ opts: Bull.JobOptions,
+ } {
+ return {
+ name,
+ data,
+ opts: {
+ removeOnComplete: true,
+ removeOnFail: true,
+ },
+ };
+ }
+
+ @bindThis
public createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) {
return this.dbQueue.add('importUserLists', {
- user: user,
+ user: { id: user.id },
fileId: fileId,
}, {
removeOnComplete: true,
@@ -197,7 +227,7 @@ export class QueueService {
@bindThis
public createImportCustomEmojisJob(user: ThinUser, fileId: DriveFile['id']) {
return this.dbQueue.add('importCustomEmojis', {
- user: user,
+ user: { id: user.id },
fileId: fileId,
}, {
removeOnComplete: true,
@@ -208,7 +238,7 @@ export class QueueService {
@bindThis
public createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) {
return this.dbQueue.add('deleteAccount', {
- user: user,
+ user: { id: user.id },
soft: opts.soft,
}, {
removeOnComplete: true,
@@ -217,6 +247,51 @@ export class QueueService {
}
@bindThis
+ public createFollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string, silent?: boolean }[]) {
+ const jobs = followings.map(rel => this.generateRelationshipJobData('follow', rel));
+ return this.relationshipQueue.addBulk(jobs);
+ }
+
+ @bindThis
+ public createUnfollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string }[]) {
+ const jobs = followings.map(rel => this.generateRelationshipJobData('unfollow', rel));
+ return this.relationshipQueue.addBulk(jobs);
+ }
+
+ @bindThis
+ public createBlockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) {
+ const jobs = blockings.map(rel => this.generateRelationshipJobData('block', rel));
+ return this.relationshipQueue.addBulk(jobs);
+ }
+
+ @bindThis
+ public createUnblockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) {
+ const jobs = blockings.map(rel => this.generateRelationshipJobData('unblock', rel));
+ return this.relationshipQueue.addBulk(jobs);
+ }
+
+ @bindThis
+ private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData): {
+ name: string,
+ data: RelationshipJobData,
+ opts: Bull.JobOptions,
+ } {
+ return {
+ name,
+ data: {
+ from: { id: data.from.id },
+ to: { id: data.to.id },
+ silent: data.silent,
+ requestId: data.requestId,
+ },
+ opts: {
+ removeOnComplete: true,
+ removeOnFail: true,
+ },
+ };
+ }
+
+ @bindThis
public createDeleteObjectStorageFileJob(key: string) {
return this.objectStorageQueue.add('deleteFile', {
key: key,
@@ -246,7 +321,7 @@ export class QueueService {
createdAt: Date.now(),
eventId: uuid(),
};
-
+
return this.webhookDeliverQueue.add(data, {
attempts: 4,
timeout: 1 * 60 * 1000, // 1min
@@ -264,7 +339,7 @@ export class QueueService {
//deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
this.deliverQueue.clean(0, 'delayed');
-
+
this.inboxQueue.once('cleaned', (jobs, status) => {
//inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});