diff options
Diffstat (limited to 'packages/backend/src/queue/processors')
| -rw-r--r-- | packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts | 11 | ||||
| -rw-r--r-- | packages/backend/src/queue/processors/ImportAntennasProcessorService.ts | 4 | ||||
| -rw-r--r-- | packages/backend/src/queue/processors/ImportUserListsProcessorService.ts | 4 | ||||
| -rw-r--r-- | packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts | 87 | ||||
| -rw-r--r-- | packages/backend/src/queue/processors/UserWebhookDeliverProcessorService.ts (renamed from packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts) | 6 |
5 files changed, 102 insertions, 10 deletions
diff --git a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts index 29c1f27bb1..34180e5f2b 100644 --- a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts +++ b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts @@ -7,6 +7,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { DI } from '@/di-symbols.js'; import type { PollVotesRepository, NotesRepository } from '@/models/_.js'; import type Logger from '@/logger.js'; +import { CacheService } from '@/core/CacheService.js'; import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; @@ -24,6 +25,7 @@ export class EndedPollNotificationProcessorService { @Inject(DI.pollVotesRepository) private pollVotesRepository: PollVotesRepository, + private cacheService: CacheService, private notificationService: NotificationService, private queueLoggerService: QueueLoggerService, ) { @@ -47,9 +49,12 @@ export class EndedPollNotificationProcessorService { const userIds = [...new Set([note.userId, ...votes.map(v => v.userId)])]; for (const userId of userIds) { - this.notificationService.createNotification(userId, 'pollEnded', { - noteId: note.id, - }); + const profile = await this.cacheService.userProfileCache.fetch(userId); + if (profile.userHost === null) { + this.notificationService.createNotification(userId, 'pollEnded', { + noteId: note.id, + }); + } } } } diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts index e5b7c5ac52..9c033b73e2 100644 --- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts @@ -76,7 +76,7 @@ export class ImportAntennasProcessorService { this.logger.warn('Validation Failed'); continue; } - const result = await this.antennasRepository.insert({ + const result = await this.antennasRepository.insertOne({ id: this.idService.gen(now.getTime()), lastUsedAt: now, userId: job.data.user.id, @@ -91,7 +91,7 @@ export class ImportAntennasProcessorService { excludeBots: antenna.excludeBots, withReplies: antenna.withReplies, withFile: antenna.withFile, - }).then(x => this.antennasRepository.findOneByOrFail(x.identifiers[0])); + }); this.logger.succ('Antenna created: ' + result.id); this.globalEventService.publishInternalEvent('antennaCreated', result); } diff --git a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts index a5992c28c8..db9255b35d 100644 --- a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts @@ -79,11 +79,11 @@ export class ImportUserListsProcessorService { }); if (list == null) { - list = await this.userListsRepository.insert({ + list = await this.userListsRepository.insertOne({ id: this.idService.gen(), userId: user.id, name: listName, - }).then(x => this.userListsRepository.findOneByOrFail(x.identifiers[0])); + }); } let target = this.utilityService.isSelfHost(host!) ? await this.usersRepository.findOneBy({ diff --git a/packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts new file mode 100644 index 0000000000..f6bef52684 --- /dev/null +++ b/packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts @@ -0,0 +1,87 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import * as Bull from 'bullmq'; +import { DI } from '@/di-symbols.js'; +import type { SystemWebhooksRepository } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { HttpRequestService } from '@/core/HttpRequestService.js'; +import { StatusError } from '@/misc/status-error.js'; +import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import { SystemWebhookDeliverJobData } from '../types.js'; + +@Injectable() +export class SystemWebhookDeliverProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.systemWebhooksRepository) + private systemWebhooksRepository: SystemWebhooksRepository, + + private httpRequestService: HttpRequestService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('webhook'); + } + + @bindThis + public async process(job: Bull.Job<SystemWebhookDeliverJobData>): Promise<string> { + try { + this.logger.debug(`delivering ${job.data.webhookId}`); + + const res = await this.httpRequestService.send(job.data.to, { + method: 'POST', + headers: { + 'User-Agent': 'Misskey-Hooks', + 'X-Misskey-Host': this.config.host, + 'X-Misskey-Hook-Id': job.data.webhookId, + 'X-Misskey-Hook-Secret': job.data.secret, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + server: this.config.url, + hookId: job.data.webhookId, + eventId: job.data.eventId, + createdAt: job.data.createdAt, + type: job.data.type, + body: job.data.content, + }), + }); + + this.systemWebhooksRepository.update({ id: job.data.webhookId }, { + latestSentAt: new Date(), + latestStatus: res.status, + }); + + return 'Success'; + } catch (res) { + this.logger.error(res as Error); + + this.systemWebhooksRepository.update({ id: job.data.webhookId }, { + latestSentAt: new Date(), + latestStatus: res instanceof StatusError ? res.statusCode : 1, + }); + + if (res instanceof StatusError) { + // 4xx + if (!res.isRetryable) { + throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); + } + + // 5xx etc. + throw new Error(`${res.statusCode} ${res.statusMessage}`); + } else { + // DNS error, socket error, timeout ... + throw res; + } + } + } +} diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/UserWebhookDeliverProcessorService.ts index 8c260c0137..9ec630ef70 100644 --- a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/UserWebhookDeliverProcessorService.ts @@ -13,10 +13,10 @@ import { HttpRequestService } from '@/core/HttpRequestService.js'; import { StatusError } from '@/misc/status-error.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type { WebhookDeliverJobData } from '../types.js'; +import { UserWebhookDeliverJobData } from '../types.js'; @Injectable() -export class WebhookDeliverProcessorService { +export class UserWebhookDeliverProcessorService { private logger: Logger; constructor( @@ -33,7 +33,7 @@ export class WebhookDeliverProcessorService { } @bindThis - public async process(job: Bull.Job<WebhookDeliverJobData>): Promise<string> { + public async process(job: Bull.Job<UserWebhookDeliverJobData>): Promise<string> { try { this.logger.debug(`delivering ${job.data.webhookId}`); |