summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/DeliverProcessorService.ts14
-rw-r--r--packages/backend/src/queue/processors/ExportAntennasProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportAntennasProcessorService.ts10
-rw-r--r--packages/backend/src/queue/processors/ImportUserListsProcessorService.ts4
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts59
6 files changed, 66 insertions, 25 deletions
diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
index 917de8b72c..728fc9e72b 100644
--- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
@@ -63,7 +63,7 @@ export class CleanRemoteFilesProcessorService {
isLink: false,
});
- job.updateProgress(deletedCount / total);
+ job.updateProgress(100 / total * deletedCount);
}
this.logger.succ('All cached remote files has been deleted.');
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts
index 5fed070929..b73195afc3 100644
--- a/packages/backend/src/queue/processors/DeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts
@@ -5,6 +5,7 @@
import { Inject, Injectable } from '@nestjs/common';
import * as Bull from 'bullmq';
+import { Not } from 'typeorm';
import { DI } from '@/di-symbols.js';
import type { InstancesRepository } from '@/models/_.js';
import type Logger from '@/logger.js';
@@ -62,7 +63,7 @@ export class DeliverProcessorService {
if (suspendedHosts == null) {
suspendedHosts = await this.instancesRepository.find({
where: {
- isSuspended: true,
+ suspensionState: Not('none'),
},
});
this.suspendedHostsCache.set(suspendedHosts);
@@ -79,6 +80,7 @@ export class DeliverProcessorService {
if (i.isNotResponding) {
this.federatedInstanceService.update(i.id, {
isNotResponding: false,
+ notRespondingSince: null,
});
}
@@ -98,7 +100,15 @@ export class DeliverProcessorService {
if (!i.isNotResponding) {
this.federatedInstanceService.update(i.id, {
isNotResponding: true,
+ notRespondingSince: new Date(),
});
+ } else if (i.notRespondingSince) {
+ // 1週間以上不通ならサスペンド
+ if (i.suspensionState === 'none' && i.notRespondingSince.getTime() <= Date.now() - 1000 * 60 * 60 * 24 * 7) {
+ this.federatedInstanceService.update(i.id, {
+ suspensionState: 'autoSuspendedForNotResponding',
+ });
+ }
}
this.apRequestChart.deliverFail();
@@ -116,7 +126,7 @@ export class DeliverProcessorService {
if (job.data.isSharedInbox && res.statusCode === 410) {
this.federatedInstanceService.fetch(host).then(i => {
this.federatedInstanceService.update(i.id, {
- isSuspended: true,
+ suspensionState: 'goneSuspended',
});
});
throw new Bull.UnrecoverableError(`${host} is gone`);
diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
index af48bad417..88c4ea29c0 100644
--- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
@@ -81,9 +81,9 @@ export class ExportAntennasProcessorService {
}) : null,
caseSensitive: antenna.caseSensitive,
localOnly: antenna.localOnly,
+ excludeBots: antenna.excludeBots,
withReplies: antenna.withReplies,
withFile: antenna.withFile,
- notify: antenna.notify,
}));
if (antennas.length - 1 !== index) {
write(', ');
diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
index 951b560597..9c033b73e2 100644
--- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
@@ -44,11 +44,11 @@ const validate = new Ajv().compile({
} },
caseSensitive: { type: 'boolean' },
localOnly: { type: 'boolean' },
+ excludeBots: { type: 'boolean' },
withReplies: { type: 'boolean' },
withFile: { type: 'boolean' },
- notify: { type: 'boolean' },
},
- required: ['name', 'src', 'keywords', 'excludeKeywords', 'users', 'caseSensitive', 'withReplies', 'withFile', 'notify'],
+ required: ['name', 'src', 'keywords', 'excludeKeywords', 'users', 'caseSensitive', 'withReplies', 'withFile'],
});
@Injectable()
@@ -76,7 +76,7 @@ export class ImportAntennasProcessorService {
this.logger.warn('Validation Failed');
continue;
}
- const result = await this.antennasRepository.insert({
+ const result = await this.antennasRepository.insertOne({
id: this.idService.gen(now.getTime()),
lastUsedAt: now,
userId: job.data.user.id,
@@ -88,10 +88,10 @@ export class ImportAntennasProcessorService {
users: (antenna.src === 'list' && antenna.userListAccts !== null ? antenna.userListAccts : antenna.users).filter(Boolean),
caseSensitive: antenna.caseSensitive,
localOnly: antenna.localOnly,
+ excludeBots: antenna.excludeBots,
withReplies: antenna.withReplies,
withFile: antenna.withFile,
- notify: antenna.notify,
- }).then(x => this.antennasRepository.findOneByOrFail(x.identifiers[0]));
+ });
this.logger.succ('Antenna created: ' + result.id);
this.globalEventService.publishInternalEvent('antennaCreated', result);
}
diff --git a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
index a5992c28c8..db9255b35d 100644
--- a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
@@ -79,11 +79,11 @@ export class ImportUserListsProcessorService {
});
if (list == null) {
- list = await this.userListsRepository.insert({
+ list = await this.userListsRepository.insertOne({
id: this.idService.gen(),
userId: user.id,
name: listName,
- }).then(x => this.userListsRepository.findOneByOrFail(x.identifiers[0]));
+ });
}
let target = this.utilityService.isSelfHost(host!) ? await this.usersRepository.findOneBy({
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 3addead058..fa7009f8f5 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -15,13 +15,14 @@ import InstanceChart from '@/core/chart/charts/instance.js';
import ApRequestChart from '@/core/chart/charts/ap-request.js';
import FederationChart from '@/core/chart/charts/federation.js';
import { getApId } from '@/core/activitypub/type.js';
+import type { IActivity } from '@/core/activitypub/type.js';
import type { MiRemoteUser } from '@/models/User.js';
import type { MiUserPublickey } from '@/models/UserPublickey.js';
import { ApDbResolverService } from '@/core/activitypub/ApDbResolverService.js';
import { StatusError } from '@/misc/status-error.js';
import { UtilityService } from '@/core/UtilityService.js';
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
-import { LdSignatureService } from '@/core/activitypub/LdSignatureService.js';
+import { JsonLdService } from '@/core/activitypub/JsonLdService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
@@ -38,7 +39,7 @@ export class InboxProcessorService {
private apInboxService: ApInboxService,
private federatedInstanceService: FederatedInstanceService,
private fetchInstanceMetadataService: FetchInstanceMetadataService,
- private ldSignatureService: LdSignatureService,
+ private jsonLdService: JsonLdService,
private apPersonService: ApPersonService,
private apDbResolverService: ApDbResolverService,
private instanceChart: InstanceChart,
@@ -52,7 +53,7 @@ export class InboxProcessorService {
@bindThis
public async process(job: Bull.Job<InboxJobData>): Promise<string> {
const signature = job.data.signature; // HTTP-signature
- const activity = job.data.activity;
+ let activity = job.data.activity;
//#region Log
const info = Object.assign({}, activity);
@@ -110,20 +111,21 @@ export class InboxProcessorService {
// また、signatureのsignerは、activity.actorと一致する必要がある
if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
// 一致しなくても、でもLD-Signatureがありそうならそっちも見る
- if (activity.signature) {
- if (activity.signature.type !== 'RsaSignature2017') {
- throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${activity.signature.type}`);
+ const ldSignature = activity.signature;
+ if (ldSignature) {
+ if (ldSignature.type !== 'RsaSignature2017') {
+ throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${ldSignature.type}`);
}
- // activity.signature.creator: https://example.oom/users/user#main-key
+ // ldSignature.creator: https://example.oom/users/user#main-key
// みたいになっててUserを引っ張れば公開キーも入ることを期待する
- if (activity.signature.creator) {
- const candicate = activity.signature.creator.replace(/#.*/, '');
+ if (ldSignature.creator) {
+ const candicate = ldSignature.creator.replace(/#.*/, '');
await this.apPersonService.resolvePerson(candicate).catch(() => null);
}
// keyIdからLD-Signatureのユーザーを取得
- authUser = await this.apDbResolverService.getAuthUserFromKeyId(activity.signature.creator);
+ authUser = await this.apDbResolverService.getAuthUserFromKeyId(ldSignature.creator);
if (authUser == null) {
throw new Bull.UnrecoverableError('skip: LD-Signatureのユーザーが取得できませんでした');
}
@@ -132,13 +134,31 @@ export class InboxProcessorService {
throw new Bull.UnrecoverableError('skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした');
}
+ const jsonLd = this.jsonLdService.use();
+
// LD-Signature検証
- const ldSignature = this.ldSignatureService.use();
- const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
+ const verified = await jsonLd.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
if (!verified) {
throw new Bull.UnrecoverableError('skip: LD-Signatureの検証に失敗しました');
}
+ // アクティビティを正規化
+ delete activity.signature;
+ try {
+ activity = await jsonLd.compact(activity) as IActivity;
+ } catch (e) {
+ throw new Bull.UnrecoverableError(`skip: failed to compact activity: ${e}`);
+ }
+ // TODO: 元のアクティビティと非互換な形に正規化される場合は転送をスキップする
+ // https://github.com/mastodon/mastodon/blob/664b0ca/app/services/activitypub/process_collection_service.rb#L24-L29
+ activity.signature = ldSignature;
+
+ //#region Log
+ const compactedInfo = Object.assign({}, activity);
+ delete compactedInfo['@context'];
+ this.logger.debug(`compacted: ${JSON.stringify(compactedInfo, null, 2)}`);
+ //#endregion
+
// もう一度actorチェック
if (authUser.user.uri !== activity.actor) {
throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`);
@@ -168,6 +188,8 @@ export class InboxProcessorService {
this.federatedInstanceService.update(i.id, {
latestRequestReceivedAt: new Date(),
isNotResponding: false,
+ // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
+ suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined,
});
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
@@ -182,13 +204,22 @@ export class InboxProcessorService {
// アクティビティを処理
try {
- await this.apInboxService.performActivity(authUser.user, activity);
+ const result = await this.apInboxService.performActivity(authUser.user, activity);
+ if (result && !result.startsWith('ok')) {
+ this.logger.warn(`inbox activity ignored (maybe): id=${activity.id} reason=${result}`);
+ return result;
+ }
} catch (e) {
if (e instanceof IdentifiableError) {
if (e.id === '689ee33f-f97c-479a-ac49-1b9f8140af99') {
return 'blocked notes with prohibited words';
}
- if (e.id === '85ab9bd7-3a41-4530-959d-f07073900109') return 'actor has been suspended';
+ if (e.id === '85ab9bd7-3a41-4530-959d-f07073900109') {
+ return 'actor has been suspended';
+ }
+ if (e.id === 'd450b8a9-48e4-4dab-ae36-f4db763fda7c') { // invalid Note
+ return e.message;
+ }
}
throw e;
}