summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
authordakkar <dakkar@thenautilus.net>2024-06-18 14:25:04 +0000
committerdakkar <dakkar@thenautilus.net>2024-06-18 14:25:04 +0000
commitb7805adc85653d8d789728dfaaa6a7e80d1440b8 (patch)
tree0109695430a1e604e68ddc2b7137f1fa7e354e4c /packages/backend/src/queue
parentmerge: feat: add an option to collapse replies (!545) (diff)
parentmerge: merge up to 2024.5.0 (!537) (diff)
downloadsharkey-b7805adc85653d8d789728dfaaa6a7e80d1440b8.tar.gz
sharkey-b7805adc85653d8d789728dfaaa6a7e80d1440b8.tar.bz2
sharkey-b7805adc85653d8d789728dfaaa6a7e80d1440b8.zip
merge: prepare for 2024.5, 2nd try (!554)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/554 Closes #494 Approved-by: Tess K <me@thvxl.se> Approved-by: Marie <marie@kaifa.ch>
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts1
-rw-r--r--packages/backend/src/queue/processors/DeliverProcessorService.ts14
-rw-r--r--packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts16
-rw-r--r--packages/backend/src/queue/processors/ExportAntennasProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportAntennasProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/ImportNotesProcessorService.ts5
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts43
7 files changed, 59 insertions, 28 deletions
diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
index 4fa414b0b5..81842b221f 100644
--- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
@@ -73,7 +73,6 @@ export class CleanRemoteFilesProcessorService {
});
await job.updateProgress(100 / total * deletedCount);
-
}
this.logger.succ(`All cached remote files processed. Total deleted: ${deletedCount}, Failed: ${errorCount}.`);
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts
index 5fed070929..b73195afc3 100644
--- a/packages/backend/src/queue/processors/DeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts
@@ -5,6 +5,7 @@
import { Inject, Injectable } from '@nestjs/common';
import * as Bull from 'bullmq';
+import { Not } from 'typeorm';
import { DI } from '@/di-symbols.js';
import type { InstancesRepository } from '@/models/_.js';
import type Logger from '@/logger.js';
@@ -62,7 +63,7 @@ export class DeliverProcessorService {
if (suspendedHosts == null) {
suspendedHosts = await this.instancesRepository.find({
where: {
- isSuspended: true,
+ suspensionState: Not('none'),
},
});
this.suspendedHostsCache.set(suspendedHosts);
@@ -79,6 +80,7 @@ export class DeliverProcessorService {
if (i.isNotResponding) {
this.federatedInstanceService.update(i.id, {
isNotResponding: false,
+ notRespondingSince: null,
});
}
@@ -98,7 +100,15 @@ export class DeliverProcessorService {
if (!i.isNotResponding) {
this.federatedInstanceService.update(i.id, {
isNotResponding: true,
+ notRespondingSince: new Date(),
});
+ } else if (i.notRespondingSince) {
+ // 1週間以上不通ならサスペンド
+ if (i.suspensionState === 'none' && i.notRespondingSince.getTime() <= Date.now() - 1000 * 60 * 60 * 24 * 7) {
+ this.federatedInstanceService.update(i.id, {
+ suspensionState: 'autoSuspendedForNotResponding',
+ });
+ }
}
this.apRequestChart.deliverFail();
@@ -116,7 +126,7 @@ export class DeliverProcessorService {
if (job.data.isSharedInbox && res.statusCode === 410) {
this.federatedInstanceService.fetch(host).then(i => {
this.federatedInstanceService.update(i.id, {
- isSuspended: true,
+ suspensionState: 'goneSuspended',
});
});
throw new Bull.UnrecoverableError(`${host} is gone`);
diff --git a/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts b/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts
index f6a4e8e2ec..33a2362c4a 100644
--- a/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts
@@ -1,3 +1,8 @@
+/*
+ * SPDX-FileCopyrightText: marie and other Sharkey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
import * as fs from 'node:fs';
import { Inject, Injectable } from '@nestjs/common';
import { In, IsNull, MoreThan, Not } from 'typeorm';
@@ -287,7 +292,7 @@ export class ExportAccountDataProcessorService {
const mutings = await this.mutingsRepository.findBy({
muterId: user.id,
});
-
+
while (true) {
const followings = await this.followingsRepository.find({
where: {
@@ -353,7 +358,7 @@ export class ExportAccountDataProcessorService {
let followersCursor: MiFollowing['id'] | null = null;
let exportedFollowersCount = 0;
-
+
while (true) {
const followers = await this.followingsRepository.find({
where: {
@@ -680,7 +685,6 @@ export class ExportAccountDataProcessorService {
localOnly: antenna.localOnly,
withReplies: antenna.withReplies,
withFile: antenna.withFile,
- notify: antenna.notify,
}));
if (antennas.length - 1 !== index) {
@@ -749,9 +753,9 @@ export class ExportAccountDataProcessorService {
cleanup();
archiveCleanup();
if (profile.email) {
- this.emailService.sendEmail(profile.email,
- 'Your data archive is ready',
- `Click the following link to download the archive: ${driveFile.url}<br/>It is also available in your drive.`,
+ this.emailService.sendEmail(profile.email,
+ 'Your data archive is ready',
+ `Click the following link to download the archive: ${driveFile.url}<br/>It is also available in your drive.`,
`Click the following link to download the archive: ${driveFile.url}\r\n\r\nIt is also available in your drive.`,
);
}
diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
index af48bad417..88c4ea29c0 100644
--- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
@@ -81,9 +81,9 @@ export class ExportAntennasProcessorService {
}) : null,
caseSensitive: antenna.caseSensitive,
localOnly: antenna.localOnly,
+ excludeBots: antenna.excludeBots,
withReplies: antenna.withReplies,
withFile: antenna.withFile,
- notify: antenna.notify,
}));
if (antennas.length - 1 !== index) {
write(', ');
diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
index 951b560597..e5b7c5ac52 100644
--- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
@@ -44,11 +44,11 @@ const validate = new Ajv().compile({
} },
caseSensitive: { type: 'boolean' },
localOnly: { type: 'boolean' },
+ excludeBots: { type: 'boolean' },
withReplies: { type: 'boolean' },
withFile: { type: 'boolean' },
- notify: { type: 'boolean' },
},
- required: ['name', 'src', 'keywords', 'excludeKeywords', 'users', 'caseSensitive', 'withReplies', 'withFile', 'notify'],
+ required: ['name', 'src', 'keywords', 'excludeKeywords', 'users', 'caseSensitive', 'withReplies', 'withFile'],
});
@Injectable()
@@ -88,9 +88,9 @@ export class ImportAntennasProcessorService {
users: (antenna.src === 'list' && antenna.userListAccts !== null ? antenna.userListAccts : antenna.users).filter(Boolean),
caseSensitive: antenna.caseSensitive,
localOnly: antenna.localOnly,
+ excludeBots: antenna.excludeBots,
withReplies: antenna.withReplies,
withFile: antenna.withFile,
- notify: antenna.notify,
}).then(x => this.antennasRepository.findOneByOrFail(x.identifiers[0]));
this.logger.succ('Antenna created: ' + result.id);
this.globalEventService.publishInternalEvent('antennaCreated', result);
diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
index 58a0ea10ad..f89dc46722 100644
--- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
@@ -1,3 +1,8 @@
+/*
+ * SPDX-FileCopyrightText: marie and other Sharkey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
import * as fs from 'node:fs';
import * as fsp from 'node:fs/promises';
import * as crypto from 'node:crypto';
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 2b5b7c5619..641b8b8607 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -22,7 +22,7 @@ import { ApDbResolverService } from '@/core/activitypub/ApDbResolverService.js';
import { StatusError } from '@/misc/status-error.js';
import { UtilityService } from '@/core/UtilityService.js';
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
-import { LdSignatureService } from '@/core/activitypub/LdSignatureService.js';
+import { JsonLdService } from '@/core/activitypub/JsonLdService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
@@ -39,7 +39,7 @@ export class InboxProcessorService {
private apInboxService: ApInboxService,
private federatedInstanceService: FederatedInstanceService,
private fetchInstanceMetadataService: FetchInstanceMetadataService,
- private ldSignatureService: LdSignatureService,
+ private jsonLdService: JsonLdService,
private apPersonService: ApPersonService,
private apDbResolverService: ApDbResolverService,
private instanceChart: InstanceChart,
@@ -111,7 +111,7 @@ export class InboxProcessorService {
// また、signatureのsignerは、activity.actorと一致する必要がある
if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
let renewKeyFailed = true;
-
+
if (!httpSignatureValidated) {
authUser.key = await this.apDbResolverService.refetchPublicKeyForApId(authUser.user);
@@ -122,20 +122,21 @@ export class InboxProcessorService {
}
// 一致しなくても、でもLD-Signatureがありそうならそっちも見る
- if (activity.signature && renewKeyFailed) {
- if (activity.signature.type !== 'RsaSignature2017') {
- throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${activity.signature.type}`);
+ const ldSignature = activity.signature;
+ if (ldSignature) {
+ if (ldSignature.type !== 'RsaSignature2017') {
+ throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${ldSignature.type}`);
}
- // activity.signature.creator: https://example.oom/users/user#main-key
+ // ldSignature.creator: https://example.oom/users/user#main-key
// みたいになっててUserを引っ張れば公開キーも入ることを期待する
- if (activity.signature.creator) {
- const candicate = activity.signature.creator.replace(/#.*/, '');
+ if (ldSignature.creator) {
+ const candicate = ldSignature.creator.replace(/#.*/, '');
await this.apPersonService.resolvePerson(candicate).catch(() => null);
}
// keyIdからLD-Signatureのユーザーを取得
- authUser = await this.apDbResolverService.getAuthUserFromKeyId(activity.signature.creator);
+ authUser = await this.apDbResolverService.getAuthUserFromKeyId(ldSignature.creator);
if (authUser == null) {
throw new Bull.UnrecoverableError('skip: LD-Signatureのユーザーが取得できませんでした');
}
@@ -144,9 +145,10 @@ export class InboxProcessorService {
throw new Bull.UnrecoverableError('skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした');
}
+ const jsonLd = this.jsonLdService.use();
+
// LD-Signature検証
- const ldSignature = this.ldSignatureService.use();
- const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
+ const verified = await jsonLd.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
if (!verified) {
throw new Bull.UnrecoverableError('skip: LD-Signatureの検証に失敗しました');
}
@@ -154,7 +156,7 @@ export class InboxProcessorService {
// アクティビティを正規化
delete activity.signature;
try {
- activity = await ldSignature.compact(activity) as IActivity;
+ activity = await jsonLd.compact(activity) as IActivity;
} catch (e) {
throw new Bull.UnrecoverableError(`skip: failed to compact activity: ${e}`);
}
@@ -191,6 +193,8 @@ export class InboxProcessorService {
this.federatedInstanceService.update(i.id, {
latestRequestReceivedAt: new Date(),
isNotResponding: false,
+ // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
+ suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined,
});
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
@@ -205,13 +209,22 @@ export class InboxProcessorService {
// アクティビティを処理
try {
- await this.apInboxService.performActivity(authUser.user, activity);
+ const result = await this.apInboxService.performActivity(authUser.user, activity);
+ if (result && !result.startsWith('ok')) {
+ this.logger.warn(`inbox activity ignored (maybe): id=${activity.id} reason=${result}`);
+ return result;
+ }
} catch (e) {
if (e instanceof IdentifiableError) {
if (e.id === '689ee33f-f97c-479a-ac49-1b9f8140af99') {
return 'blocked notes with prohibited words';
}
- if (e.id === '85ab9bd7-3a41-4530-959d-f07073900109') return 'actor has been suspended';
+ if (e.id === '85ab9bd7-3a41-4530-959d-f07073900109') {
+ return 'actor has been suspended';
+ }
+ if (e.id === 'd450b8a9-48e4-4dab-ae36-f4db763fda7c') { // invalid Note
+ return e.message;
+ }
}
throw e;
}