diff options
Diffstat (limited to 'packages/backend/src/queue/processors/InboxProcessorService.ts')
| -rw-r--r-- | packages/backend/src/queue/processors/InboxProcessorService.ts | 36 |
1 files changed, 26 insertions, 10 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 102e835e24..7727a3e985 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -59,7 +59,7 @@ export class InboxProcessorService implements OnApplicationShutdown { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); - this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance); + this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -193,31 +193,42 @@ export class InboxProcessorService implements OnApplicationShutdown { throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`); } } else { - throw new Bull.UnrecoverableError('skip: activity id is not a string'); + // Activity ID should only be string or undefined. + delete activity.id; } - // Update stats - this.federatedInstanceService.fetch(authUser.user.host).then(i => { + this.apRequestChart.inbox(); + this.federationChart.inbox(authUser.user.host); + + // Update instance stats + process.nextTick(async () => { + const i = await (this.meta.enableStatsForFederatedInstances + ? this.federatedInstanceService.fetchOrRegister(authUser.user.host) + : this.federatedInstanceService.fetch(authUser.user.host)); + + if (i == null) return; + this.updateInstanceQueue.enqueue(i.id, { latestRequestReceivedAt: new Date(), shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', }); - this.fetchInstanceMetadataService.fetchInstanceMetadata(i); - - this.apRequestChart.inbox(); - this.federationChart.inbox(i.host); - if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.requestReceived(i.host); } + + this.fetchInstanceMetadataService.fetchInstanceMetadata(i); }); // アクティビティを処理 try { const result = await this.apInboxService.performActivity(authUser.user, activity); if (result && !result.startsWith('ok')) { - this.logger.warn(`inbox activity ignored (maybe): id=${activity.id} reason=${result}`); + if (result.startsWith('skip:')) { + this.logger.info(`inbox activity ignored: id=${activity.id} reason=${result}`); + } else { + this.logger.warn(`inbox activity failed: id=${activity.id} reason=${result}`); + } return result; } } catch (e) { @@ -232,6 +243,11 @@ export class InboxProcessorService implements OnApplicationShutdown { return e.message; } } + + if (e instanceof StatusError && !e.isRetryable) { + return `skip: permanent error ${e.statusCode}`; + } + throw e; } return 'ok'; |