summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
authordakkar <dakkar@thenautilus.net>2024-06-26 19:57:45 +0000
committerdakkar <dakkar@thenautilus.net>2024-06-26 19:57:45 +0000
commitdc4f6c8016ccece79f8f2b75a5378c643e884f0b (patch)
tree3e2c9e998a2f97244a37f3f8cb7f1831e4108ab7 /packages/backend/src/queue
parentmerge: Release 2024.3.3 (!501) (diff)
parentmerge: parse `notRespondingSince` from redis instance cache (!560) (diff)
downloadsharkey-dc4f6c8016ccece79f8f2b75a5378c643e884f0b.tar.gz
sharkey-dc4f6c8016ccece79f8f2b75a5378c643e884f0b.tar.bz2
sharkey-dc4f6c8016ccece79f8f2b75a5378c643e884f0b.zip
merge: release 2024.5.0 (!556)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/556 Approved-by: Tess K <me@thvxl.se>
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts1
-rw-r--r--packages/backend/src/queue/processors/DeliverProcessorService.ts20
-rw-r--r--packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts16
-rw-r--r--packages/backend/src/queue/processors/ExportAntennasProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportAntennasProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/ImportNotesProcessorService.ts26
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts43
7 files changed, 80 insertions, 34 deletions
diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
index 4fa414b0b5..81842b221f 100644
--- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
@@ -73,7 +73,6 @@ export class CleanRemoteFilesProcessorService {
});
await job.updateProgress(100 / total * deletedCount);
-
}
this.logger.succ(`All cached remote files processed. Total deleted: ${deletedCount}, Failed: ${errorCount}.`);
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts
index 5fed070929..d665945861 100644
--- a/packages/backend/src/queue/processors/DeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts
@@ -5,6 +5,7 @@
import { Inject, Injectable } from '@nestjs/common';
import * as Bull from 'bullmq';
+import { Not } from 'typeorm';
import { DI } from '@/di-symbols.js';
import type { InstancesRepository } from '@/models/_.js';
import type Logger from '@/logger.js';
@@ -62,7 +63,7 @@ export class DeliverProcessorService {
if (suspendedHosts == null) {
suspendedHosts = await this.instancesRepository.find({
where: {
- isSuspended: true,
+ suspensionState: Not('none'),
},
});
this.suspendedHostsCache.set(suspendedHosts);
@@ -79,6 +80,7 @@ export class DeliverProcessorService {
if (i.isNotResponding) {
this.federatedInstanceService.update(i.id, {
isNotResponding: false,
+ notRespondingSince: null,
});
}
@@ -98,6 +100,20 @@ export class DeliverProcessorService {
if (!i.isNotResponding) {
this.federatedInstanceService.update(i.id, {
isNotResponding: true,
+ notRespondingSince: new Date(),
+ });
+ } else if (i.notRespondingSince) {
+ // 1週間以上不通ならサスペンド
+ if (i.suspensionState === 'none' && i.notRespondingSince.getTime() <= Date.now() - 1000 * 60 * 60 * 24 * 7) {
+ this.federatedInstanceService.update(i.id, {
+ suspensionState: 'autoSuspendedForNotResponding',
+ });
+ }
+ } else {
+ // isNotRespondingがtrueでnotRespondingSinceがnullの場合はnotRespondingSinceをセット
+ // notRespondingSinceは新たな機能なので、それ以前のデータにはnotRespondingSinceがない場合がある
+ this.federatedInstanceService.update(i.id, {
+ notRespondingSince: new Date(),
});
}
@@ -116,7 +132,7 @@ export class DeliverProcessorService {
if (job.data.isSharedInbox && res.statusCode === 410) {
this.federatedInstanceService.fetch(host).then(i => {
this.federatedInstanceService.update(i.id, {
- isSuspended: true,
+ suspensionState: 'goneSuspended',
});
});
throw new Bull.UnrecoverableError(`${host} is gone`);
diff --git a/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts b/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts
index f6a4e8e2ec..33a2362c4a 100644
--- a/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts
@@ -1,3 +1,8 @@
+/*
+ * SPDX-FileCopyrightText: marie and other Sharkey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
import * as fs from 'node:fs';
import { Inject, Injectable } from '@nestjs/common';
import { In, IsNull, MoreThan, Not } from 'typeorm';
@@ -287,7 +292,7 @@ export class ExportAccountDataProcessorService {
const mutings = await this.mutingsRepository.findBy({
muterId: user.id,
});
-
+
while (true) {
const followings = await this.followingsRepository.find({
where: {
@@ -353,7 +358,7 @@ export class ExportAccountDataProcessorService {
let followersCursor: MiFollowing['id'] | null = null;
let exportedFollowersCount = 0;
-
+
while (true) {
const followers = await this.followingsRepository.find({
where: {
@@ -680,7 +685,6 @@ export class ExportAccountDataProcessorService {
localOnly: antenna.localOnly,
withReplies: antenna.withReplies,
withFile: antenna.withFile,
- notify: antenna.notify,
}));
if (antennas.length - 1 !== index) {
@@ -749,9 +753,9 @@ export class ExportAccountDataProcessorService {
cleanup();
archiveCleanup();
if (profile.email) {
- this.emailService.sendEmail(profile.email,
- 'Your data archive is ready',
- `Click the following link to download the archive: ${driveFile.url}<br/>It is also available in your drive.`,
+ this.emailService.sendEmail(profile.email,
+ 'Your data archive is ready',
+ `Click the following link to download the archive: ${driveFile.url}<br/>It is also available in your drive.`,
`Click the following link to download the archive: ${driveFile.url}\r\n\r\nIt is also available in your drive.`,
);
}
diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
index af48bad417..88c4ea29c0 100644
--- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
@@ -81,9 +81,9 @@ export class ExportAntennasProcessorService {
}) : null,
caseSensitive: antenna.caseSensitive,
localOnly: antenna.localOnly,
+ excludeBots: antenna.excludeBots,
withReplies: antenna.withReplies,
withFile: antenna.withFile,
- notify: antenna.notify,
}));
if (antennas.length - 1 !== index) {
write(', ');
diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
index 951b560597..e5b7c5ac52 100644
--- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
@@ -44,11 +44,11 @@ const validate = new Ajv().compile({
} },
caseSensitive: { type: 'boolean' },
localOnly: { type: 'boolean' },
+ excludeBots: { type: 'boolean' },
withReplies: { type: 'boolean' },
withFile: { type: 'boolean' },
- notify: { type: 'boolean' },
},
- required: ['name', 'src', 'keywords', 'excludeKeywords', 'users', 'caseSensitive', 'withReplies', 'withFile', 'notify'],
+ required: ['name', 'src', 'keywords', 'excludeKeywords', 'users', 'caseSensitive', 'withReplies', 'withFile'],
});
@Injectable()
@@ -88,9 +88,9 @@ export class ImportAntennasProcessorService {
users: (antenna.src === 'list' && antenna.userListAccts !== null ? antenna.userListAccts : antenna.users).filter(Boolean),
caseSensitive: antenna.caseSensitive,
localOnly: antenna.localOnly,
+ excludeBots: antenna.excludeBots,
withReplies: antenna.withReplies,
withFile: antenna.withFile,
- notify: antenna.notify,
}).then(x => this.antennasRepository.findOneByOrFail(x.identifiers[0]));
this.logger.succ('Antenna created: ' + result.id);
this.globalEventService.publishInternalEvent('antennaCreated', result);
diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
index 7cef858c51..f89dc46722 100644
--- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
@@ -1,3 +1,8 @@
+/*
+ * SPDX-FileCopyrightText: marie and other Sharkey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
import * as fs from 'node:fs';
import * as fsp from 'node:fs/promises';
import * as crypto from 'node:crypto';
@@ -19,12 +24,16 @@ import { IdService } from '@/core/IdService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbNoteWithParentImportToDbJobData } from '../types.js';
+import type { Config } from '@/config.js';
@Injectable()
export class ImportNotesProcessorService {
private logger: Logger;
constructor(
+ @Inject(DI.config)
+ private config: Config,
+
@Inject(DI.usersRepository)
private usersRepository: UsersRepository,
@@ -74,6 +83,11 @@ export class ImportNotesProcessorService {
}
@bindThis
+ private downloadUrl(url: string, path:string): Promise<{filename: string}> {
+ return this.downloadService.downloadUrl(url, path, { operationTimeout: this.config.import?.downloadTimeout, maxSize: this.config.import?.maxFileSize });
+ }
+
+ @bindThis
private async recreateChain(idFieldPath: string[], replyFieldPath: string[], arr: any[], includeOrphans: boolean): Promise<any[]> {
type NotesMap = {
[id: string]: any;
@@ -176,7 +190,7 @@ export class ImportNotesProcessorService {
try {
await fsp.writeFile(destPath, '', 'binary');
- await this.downloadService.downloadUrl(file.url, destPath);
+ await this.downloadUrl(file.url, destPath);
} catch (e) { // TODO: 何度か再試行
if (e instanceof Error || typeof e === 'string') {
this.logger.error(e);
@@ -206,7 +220,7 @@ export class ImportNotesProcessorService {
try {
await fsp.writeFile(destPath, '', 'binary');
- await this.downloadService.downloadUrl(file.url, destPath);
+ await this.downloadUrl(file.url, destPath);
} catch (e) { // TODO: 何度か再試行
if (e instanceof Error || typeof e === 'string') {
this.logger.error(e);
@@ -239,7 +253,7 @@ export class ImportNotesProcessorService {
try {
await fsp.writeFile(destPath, '', 'binary');
- await this.downloadService.downloadUrl(file.url, destPath);
+ await this.downloadUrl(file.url, destPath);
} catch (e) { // TODO: 何度か再試行
if (e instanceof Error || typeof e === 'string') {
this.logger.error(e);
@@ -297,7 +311,7 @@ export class ImportNotesProcessorService {
try {
await fsp.writeFile(path, '', 'utf-8');
- await this.downloadService.downloadUrl(file.url, path);
+ await this.downloadUrl(file.url, path);
} catch (e) { // TODO: 何度か再試行
if (e instanceof Error || typeof e === 'string') {
this.logger.error(e);
@@ -349,7 +363,7 @@ export class ImportNotesProcessorService {
if (!exists) {
try {
- await this.downloadService.downloadUrl(file.url, filePath);
+ await this.downloadUrl(file.url, filePath);
} catch (e) { // TODO: 何度か再試行
this.logger.error(e instanceof Error ? e : new Error(e as string));
}
@@ -488,7 +502,7 @@ export class ImportNotesProcessorService {
if (!exists) {
try {
- await this.downloadService.downloadUrl(file.url, filePath);
+ await this.downloadUrl(file.url, filePath);
} catch (e) { // TODO: 何度か再試行
this.logger.error(e instanceof Error ? e : new Error(e as string));
}
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 2b5b7c5619..641b8b8607 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -22,7 +22,7 @@ import { ApDbResolverService } from '@/core/activitypub/ApDbResolverService.js';
import { StatusError } from '@/misc/status-error.js';
import { UtilityService } from '@/core/UtilityService.js';
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
-import { LdSignatureService } from '@/core/activitypub/LdSignatureService.js';
+import { JsonLdService } from '@/core/activitypub/JsonLdService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
@@ -39,7 +39,7 @@ export class InboxProcessorService {
private apInboxService: ApInboxService,
private federatedInstanceService: FederatedInstanceService,
private fetchInstanceMetadataService: FetchInstanceMetadataService,
- private ldSignatureService: LdSignatureService,
+ private jsonLdService: JsonLdService,
private apPersonService: ApPersonService,
private apDbResolverService: ApDbResolverService,
private instanceChart: InstanceChart,
@@ -111,7 +111,7 @@ export class InboxProcessorService {
// また、signatureのsignerは、activity.actorと一致する必要がある
if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
let renewKeyFailed = true;
-
+
if (!httpSignatureValidated) {
authUser.key = await this.apDbResolverService.refetchPublicKeyForApId(authUser.user);
@@ -122,20 +122,21 @@ export class InboxProcessorService {
}
// 一致しなくても、でもLD-Signatureがありそうならそっちも見る
- if (activity.signature && renewKeyFailed) {
- if (activity.signature.type !== 'RsaSignature2017') {
- throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${activity.signature.type}`);
+ const ldSignature = activity.signature;
+ if (ldSignature) {
+ if (ldSignature.type !== 'RsaSignature2017') {
+ throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${ldSignature.type}`);
}
- // activity.signature.creator: https://example.oom/users/user#main-key
+ // ldSignature.creator: https://example.oom/users/user#main-key
// みたいになっててUserを引っ張れば公開キーも入ることを期待する
- if (activity.signature.creator) {
- const candicate = activity.signature.creator.replace(/#.*/, '');
+ if (ldSignature.creator) {
+ const candicate = ldSignature.creator.replace(/#.*/, '');
await this.apPersonService.resolvePerson(candicate).catch(() => null);
}
// keyIdからLD-Signatureのユーザーを取得
- authUser = await this.apDbResolverService.getAuthUserFromKeyId(activity.signature.creator);
+ authUser = await this.apDbResolverService.getAuthUserFromKeyId(ldSignature.creator);
if (authUser == null) {
throw new Bull.UnrecoverableError('skip: LD-Signatureのユーザーが取得できませんでした');
}
@@ -144,9 +145,10 @@ export class InboxProcessorService {
throw new Bull.UnrecoverableError('skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした');
}
+ const jsonLd = this.jsonLdService.use();
+
// LD-Signature検証
- const ldSignature = this.ldSignatureService.use();
- const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
+ const verified = await jsonLd.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
if (!verified) {
throw new Bull.UnrecoverableError('skip: LD-Signatureの検証に失敗しました');
}
@@ -154,7 +156,7 @@ export class InboxProcessorService {
// アクティビティを正規化
delete activity.signature;
try {
- activity = await ldSignature.compact(activity) as IActivity;
+ activity = await jsonLd.compact(activity) as IActivity;
} catch (e) {
throw new Bull.UnrecoverableError(`skip: failed to compact activity: ${e}`);
}
@@ -191,6 +193,8 @@ export class InboxProcessorService {
this.federatedInstanceService.update(i.id, {
latestRequestReceivedAt: new Date(),
isNotResponding: false,
+ // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
+ suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined,
});
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
@@ -205,13 +209,22 @@ export class InboxProcessorService {
// アクティビティを処理
try {
- await this.apInboxService.performActivity(authUser.user, activity);
+ const result = await this.apInboxService.performActivity(authUser.user, activity);
+ if (result && !result.startsWith('ok')) {
+ this.logger.warn(`inbox activity ignored (maybe): id=${activity.id} reason=${result}`);
+ return result;
+ }
} catch (e) {
if (e instanceof IdentifiableError) {
if (e.id === '689ee33f-f97c-479a-ac49-1b9f8140af99') {
return 'blocked notes with prohibited words';
}
- if (e.id === '85ab9bd7-3a41-4530-959d-f07073900109') return 'actor has been suspended';
+ if (e.id === '85ab9bd7-3a41-4530-959d-f07073900109') {
+ return 'actor has been suspended';
+ }
+ if (e.id === 'd450b8a9-48e4-4dab-ae36-f4db763fda7c') { // invalid Note
+ return e.message;
+ }
}
throw e;
}