summaryrefslogtreecommitdiff
path: root/packages/backend/src/core
diff options
context:
space:
mode:
authortamaina <tamaina@hotmail.co.jp>2023-07-05 12:15:48 +0900
committerGitHub <noreply@github.com>2023-07-05 12:15:48 +0900
commit22227fa641f80240ab5f134007067d1b1048095a (patch)
tree31e3cb94c364d4c6172f895a8764f4ae310c4056 /packages/backend/src/core
parentenhance(frontend): Better Timeline(MkPagination) Experience (#11066) (diff)
downloadsharkey-22227fa641f80240ab5f134007067d1b1048095a.tar.gz
sharkey-22227fa641f80240ab5f134007067d1b1048095a.tar.bz2
sharkey-22227fa641f80240ab5f134007067d1b1048095a.zip
perf(backend): Use addBulk to add deliver queues (#11114)
Diffstat (limited to 'packages/backend/src/core')
-rw-r--r--packages/backend/src/core/QueueService.ts36
-rw-r--r--packages/backend/src/core/activitypub/ApDeliverManagerService.ts19
2 files changed, 45 insertions, 10 deletions
diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts
index 5b7359074e..48ff00c8ce 100644
--- a/packages/backend/src/core/QueueService.ts
+++ b/packages/backend/src/core/QueueService.ts
@@ -8,7 +8,7 @@ import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js';
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
-import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
+import type { DbJobData, DeliverJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
import type httpSignature from '@peertube/http-signature';
import type * as Bull from 'bullmq';
@@ -69,7 +69,7 @@ export class QueueService {
if (content == null) return null;
if (to == null) return null;
- const data = {
+ const data: DeliverJobData = {
user: {
id: user.id,
},
@@ -88,6 +88,38 @@ export class QueueService {
});
}
+ /**
+ * ApDeliverManager-DeliverManager.execute()からinboxesを突っ込んでaddBulkしたい
+ * @param user `{ id: string; }` この関数ではThinUserに変換しないので前もって変換してください
+ * @param content IActivity | null
+ * @param inboxes `Map<string, boolean>` / key: to (inbox url), value: isSharedInbox (whether it is sharedInbox)
+ * @returns void
+ */
+ @bindThis
+ public async deliverMany(user: ThinUser, content: IActivity | null, inboxes: Map<string, boolean>) {
+ const opts = {
+ attempts: this.config.deliverJobMaxAttempts ?? 12,
+ backoff: {
+ type: 'custom',
+ },
+ removeOnComplete: true,
+ removeOnFail: true,
+ };
+
+ await this.deliverQueue.addBulk(Array.from(inboxes.entries()).map(d => ({
+ name: d[0],
+ data: {
+ user,
+ content,
+ to: d[0],
+ isSharedInbox: d[1],
+ } as DeliverJobData,
+ opts,
+ })));
+
+ return;
+ }
+
@bindThis
public inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
const data = {
diff --git a/packages/backend/src/core/activitypub/ApDeliverManagerService.ts b/packages/backend/src/core/activitypub/ApDeliverManagerService.ts
index 62a2a33a19..66e7761187 100644
--- a/packages/backend/src/core/activitypub/ApDeliverManagerService.ts
+++ b/packages/backend/src/core/activitypub/ApDeliverManagerService.ts
@@ -7,6 +7,7 @@ import type { LocalUser, RemoteUser, User } from '@/models/entities/User.js';
import { QueueService } from '@/core/QueueService.js';
import { UserEntityService } from '@/core/entities/UserEntityService.js';
import { bindThis } from '@/decorators.js';
+import { ThinUser } from '@/queue/types.js';
interface IRecipe {
type: string;
@@ -94,7 +95,7 @@ export class ApDeliverManagerService {
}
class DeliverManager {
- private actor: { id: User['id']; host: null; };
+ private actor: ThinUser;
private activity: any;
private recipes: IRecipe[] = [];
@@ -111,7 +112,13 @@ class DeliverManager {
actor: { id: User['id']; host: null; },
activity: any,
) {
- this.actor = actor;
+ // 型で弾いてはいるが一応ローカルユーザーかチェック
+ if (actor.host != null) throw new Error('actor.host must be null');
+
+ // パフォーマンス向上のためキューに突っ込むのはidのみに絞る
+ this.actor = {
+ id: actor.id,
+ };
this.activity = activity;
}
@@ -155,9 +162,8 @@ class DeliverManager {
*/
@bindThis
public async execute() {
- if (!this.userEntityService.isLocalUser(this.actor)) return;
-
// The value flags whether it is shared or not.
+ // key: inbox URL, value: whether it is sharedInbox
const inboxes = new Map<string, boolean>();
/*
@@ -201,9 +207,6 @@ class DeliverManager {
.forEach(recipe => inboxes.set(recipe.to.inbox!, false));
// deliver
- for (const inbox of inboxes) {
- // inbox[0]: inbox, inbox[1]: whether it is sharedInbox
- this.queueService.deliver(this.actor, this.activity, inbox[0], inbox[1]);
- }
+ this.queueService.deliverMany(this.actor, this.activity, inboxes);
}
}