diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2023-05-09 09:17:34 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-05-09 09:17:34 +0900 |
| commit | 94690c835e3179e3fd616758ad00a8b66d844a0a (patch) | |
| tree | 3171356ca8298aa6caae7c95df7232844163f913 /packages/backend/src/queue/processors | |
| parent | Merge pull request #10608 from misskey-dev/develop (diff) | |
| parent | [ci skip] 13.12.0 (diff) | |
| download | misskey-94690c835e3179e3fd616758ad00a8b66d844a0a.tar.gz misskey-94690c835e3179e3fd616758ad00a8b66d844a0a.tar.bz2 misskey-94690c835e3179e3fd616758ad00a8b66d844a0a.zip | |
Merge pull request #10774 from misskey-dev/develop
Release: 13.12.0
Diffstat (limited to 'packages/backend/src/queue/processors')
11 files changed, 214 insertions, 26 deletions
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index 0e99b7bcd2..f293bd4d7e 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -79,10 +79,7 @@ export class DeliverProcessorService { // Update stats this.federatedInstanceService.fetch(host).then(i => { if (i.isNotResponding) { - this.instancesRepository.update(i.id, { - isNotResponding: false, - }); - this.federatedInstanceService.updateCachePartial(host, { + this.federatedInstanceService.update(i.id, { isNotResponding: false, }); } @@ -101,10 +98,7 @@ export class DeliverProcessorService { // Update stats this.federatedInstanceService.fetch(host).then(i => { if (!i.isNotResponding) { - this.instancesRepository.update(i.id, { - isNotResponding: true, - }); - this.federatedInstanceService.updateCachePartial(host, { + this.federatedInstanceService.update(i.id, { isNotResponding: true, }); } @@ -123,10 +117,7 @@ export class DeliverProcessorService { // 相手が閉鎖していることを明示しているため、配送停止する if (job.data.isSharedInbox && res.statusCode === 410) { this.federatedInstanceService.fetch(host).then(i => { - this.instancesRepository.update(i.id, { - isSuspended: true, - }); - this.federatedInstanceService.updateCachePartial(host, { + this.federatedInstanceService.update(i.id, { isSuspended: true, }); }); diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts new file mode 100644 index 0000000000..894903e79b --- /dev/null +++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts @@ -0,0 +1,103 @@ +import fs from 'node:fs'; +import { Inject, Injectable } from '@nestjs/common'; +import { format as DateFormat } from 'date-fns'; +import { In } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import type { AntennasRepository, UsersRepository, UserListJoiningsRepository, User } from '@/models/index.js'; +import type { Config } from '@/config.js'; +import Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { bindThis } from '@/decorators.js'; +import { createTemp } from '@/misc/create-temp.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type { DBExportAntennasData } from '../types.js'; +import type Bull from 'bull'; + +@Injectable() +export class ExportAntennasProcessorService { + private logger: Logger; + + constructor ( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.antennasRepository) + private antennsRepository: AntennasRepository, + + @Inject(DI.userListJoiningsRepository) + private userListJoiningsRepository: UserListJoiningsRepository, + + private driveService: DriveService, + private utilityService: UtilityService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('export-antennas'); + } + + @bindThis + public async process(job: Bull.Job<DBExportAntennasData>, done: () => void): Promise<void> { + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + const [path, cleanup] = await createTemp(); + const stream = fs.createWriteStream(path, { flags: 'a' }); + const write = (input: string): Promise<void> => { + return new Promise((resolve, reject) => { + stream.write(input, err => { + if (err) { + this.logger.error(err); + reject(); + } else { + resolve(); + } + }); + }); + }; + try { + const antennas = await this.antennsRepository.findBy({ userId: job.data.user.id }); + write('['); + for (const [index, antenna] of antennas.entries()) { + let users: User[] | undefined; + if (antenna.userListId !== null) { + const joinings = await this.userListJoiningsRepository.findBy({ userListId: antenna.userListId }); + users = await this.usersRepository.findBy({ + id: In(joinings.map(j => j.userId)), + }); + } + write(JSON.stringify({ + name: antenna.name, + src: antenna.src, + keywords: antenna.keywords, + excludeKeywords: antenna.excludeKeywords, + users: antenna.users, + userListAccts: typeof users !== 'undefined' ? users.map((u) => { + return this.utilityService.getFullApAccount(u.username, u.host); // acct + }) : null, + caseSensitive: antenna.caseSensitive, + withReplies: antenna.withReplies, + withFile: antenna.withFile, + notify: antenna.notify, + })); + if (antennas.length - 1 !== index) { + write(', '); + } + } + write(']'); + stream.end(); + + const fileName = 'antennas-' + DateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); + this.logger.succ('Exported to: ' + driveFile.id); + } finally { + cleanup(); + done(); + } + } +} + diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts index a020006732..c7b54070d6 100644 --- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts @@ -106,7 +106,7 @@ export class ExportBlockingProcessorService { this.logger.succ(`Exported to: ${path}`); const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; - const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); this.logger.succ(`Exported to: ${driveFile.id}`); } finally { diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts index daefcdf2f5..f2f2383a88 100644 --- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts @@ -121,7 +121,7 @@ export class ExportFavoritesProcessorService { this.logger.succ(`Exported to: ${path}`); const fileName = 'favorites-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; - const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); this.logger.succ(`Exported to: ${driveFile.id}`); } finally { diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts index 59443de57f..fa9c1ac1ea 100644 --- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts @@ -110,7 +110,7 @@ export class ExportFollowingProcessorService { this.logger.succ(`Exported to: ${path}`); const fileName = 'following-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; - const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); this.logger.succ(`Exported to: ${driveFile.id}`); } finally { diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts index a2a718b892..b14bf5f5b1 100644 --- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts @@ -110,7 +110,7 @@ export class ExportMutingProcessorService { this.logger.succ(`Exported to: ${path}`); const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; - const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); this.logger.succ(`Exported to: ${driveFile.id}`); } finally { diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts index 1aa20d6f1d..e4f12ad101 100644 --- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts @@ -117,7 +117,7 @@ export class ExportNotesProcessorService { this.logger.succ(`Exported to: ${path}`); const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; - const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); this.logger.succ(`Exported to: ${driveFile.id}`); } finally { diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts index ce8ed2f5e8..54bde44044 100644 --- a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts @@ -86,7 +86,7 @@ export class ExportUserListsProcessorService { this.logger.succ(`Exported to: ${path}`); const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; - const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); this.logger.succ(`Exported to: ${driveFile.id}`); } finally { diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts new file mode 100644 index 0000000000..d06131b8c8 --- /dev/null +++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts @@ -0,0 +1,96 @@ +import { Injectable, Inject } from '@nestjs/common'; +import Ajv from 'ajv'; +import { IdService } from '@/core/IdService.js'; +import { GlobalEventService } from '@/core/GlobalEventService.js'; +import Logger from '@/logger.js'; +import type { AntennasRepository } from '@/models/index.js'; +import { DI } from '@/di-symbols.js'; +import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import { DBAntennaImportJobData } from '../types.js'; +import type Bull from 'bull'; + +const validate = new Ajv().compile({ + type: 'object', + properties: { + name: { type: 'string', minLength: 1, maxLength: 100 }, + src: { type: 'string', enum: ['home', 'all', 'users', 'list'] }, + userListAccts: { + type: 'array', + items: { + type: 'string', + }, + nullable: true, + }, + keywords: { type: 'array', items: { + type: 'array', items: { + type: 'string', + }, + } }, + excludeKeywords: { type: 'array', items: { + type: 'array', items: { + type: 'string', + }, + } }, + users: { type: 'array', items: { + type: 'string', + } }, + caseSensitive: { type: 'boolean' }, + withReplies: { type: 'boolean' }, + withFile: { type: 'boolean' }, + notify: { type: 'boolean' }, + }, + required: ['name', 'src', 'keywords', 'excludeKeywords', 'users', 'caseSensitive', 'withReplies', 'withFile', 'notify'], +}); + +@Injectable() +export class ImportAntennasProcessorService { + private logger: Logger; + + constructor ( + @Inject(DI.antennasRepository) + private antennasRepository: AntennasRepository, + + private queueLoggerService: QueueLoggerService, + private idService: IdService, + private globalEventService: GlobalEventService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('import-antennas'); + } + + @bindThis + public async process(job: Bull.Job<DBAntennaImportJobData>, done: () => void): Promise<void> { + const now = new Date(); + try { + for (const antenna of job.data.antenna) { + if (antenna.keywords.length === 0 || antenna.keywords[0].every(x => x === '')) continue; + if (!validate(antenna)) { + this.logger.warn('Validation Failed'); + continue; + } + const result = await this.antennasRepository.insert({ + id: this.idService.genId(), + createdAt: now, + lastUsedAt: now, + userId: job.data.user.id, + name: antenna.name, + src: antenna.src === 'list' && antenna.userListAccts ? 'users' : antenna.src, + userListId: null, + keywords: antenna.keywords, + excludeKeywords: antenna.excludeKeywords, + users: (antenna.src === 'list' && antenna.userListAccts !== null ? antenna.userListAccts : antenna.users).filter(Boolean), + caseSensitive: antenna.caseSensitive, + withReplies: antenna.withReplies, + withFile: antenna.withFile, + notify: antenna.notify, + }).then(x => this.antennasRepository.findOneByOrFail(x.identifiers[0])); + this.logger.succ('Antenna created: ' + result.id); + this.globalEventService.publishInternalEvent('antennaCreated', result); + } + } catch (err: any) { + this.logger.error(err); + } finally { + done(); + } + } +} diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index ed7f38d013..ab8b1e9e22 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -84,9 +84,9 @@ export class InboxProcessorService { // HTTP-Signature keyIdを元にDBから取得 let authUser: { - user: RemoteUser; - key: UserPublickey | null; - } | null = await this.apDbResolverService.getAuthUserFromKeyId(signature.keyId); + user: RemoteUser; + key: UserPublickey | null; + } | null = await this.apDbResolverService.getAuthUserFromKeyId(signature.keyId); // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得 if (authUser == null) { @@ -174,13 +174,10 @@ export class InboxProcessorService { // Update stats this.federatedInstanceService.fetch(authUser.user.host).then(i => { - this.instancesRepository.update(i.id, { + this.federatedInstanceService.update(i.id, { latestRequestReceivedAt: new Date(), isNotResponding: false, }); - this.federatedInstanceService.updateCachePartial(host, { - isNotResponding: false, - }); this.fetchInstanceMetadataService.fetchInstanceMetadata(i); diff --git a/packages/backend/src/queue/processors/RelationshipProcessorService.ts b/packages/backend/src/queue/processors/RelationshipProcessorService.ts index a5006dcf03..ff454df455 100644 --- a/packages/backend/src/queue/processors/RelationshipProcessorService.ts +++ b/packages/backend/src/queue/processors/RelationshipProcessorService.ts @@ -10,6 +10,7 @@ import { QueueLoggerService } from '../QueueLoggerService.js'; import { RelationshipJobData } from '../types.js'; import type { UsersRepository } from '@/models/index.js'; import { DI } from '@/di-symbols.js'; +import { LocalUser, RemoteUser } from '@/models/entities/User.js'; @Injectable() export class RelationshipProcessorService { @@ -39,7 +40,7 @@ export class RelationshipProcessorService { const [follower, followee] = await Promise.all([ this.usersRepository.findOneByOrFail({ id: job.data.from.id }), this.usersRepository.findOneByOrFail({ id: job.data.to.id }), - ]); + ]) as [LocalUser | RemoteUser, LocalUser | RemoteUser]; await this.userFollowingService.unfollow(follower, followee, job.data.silent); return 'ok'; } |