summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue/processors')
-rw-r--r--packages/backend/src/queue/processors/DeliverProcessorService.ts15
-rw-r--r--packages/backend/src/queue/processors/ExportAntennasProcessorService.ts103
-rw-r--r--packages/backend/src/queue/processors/ExportBlockingProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportFollowingProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportMutingProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportNotesProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportUserListsProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportAntennasProcessorService.ts96
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts11
-rw-r--r--packages/backend/src/queue/processors/RelationshipProcessorService.ts3
11 files changed, 214 insertions, 26 deletions
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts
index 0e99b7bcd2..f293bd4d7e 100644
--- a/packages/backend/src/queue/processors/DeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts
@@ -79,10 +79,7 @@ export class DeliverProcessorService {
// Update stats
this.federatedInstanceService.fetch(host).then(i => {
if (i.isNotResponding) {
- this.instancesRepository.update(i.id, {
- isNotResponding: false,
- });
- this.federatedInstanceService.updateCachePartial(host, {
+ this.federatedInstanceService.update(i.id, {
isNotResponding: false,
});
}
@@ -101,10 +98,7 @@ export class DeliverProcessorService {
// Update stats
this.federatedInstanceService.fetch(host).then(i => {
if (!i.isNotResponding) {
- this.instancesRepository.update(i.id, {
- isNotResponding: true,
- });
- this.federatedInstanceService.updateCachePartial(host, {
+ this.federatedInstanceService.update(i.id, {
isNotResponding: true,
});
}
@@ -123,10 +117,7 @@ export class DeliverProcessorService {
// 相手が閉鎖していることを明示しているため、配送停止する
if (job.data.isSharedInbox && res.statusCode === 410) {
this.federatedInstanceService.fetch(host).then(i => {
- this.instancesRepository.update(i.id, {
- isSuspended: true,
- });
- this.federatedInstanceService.updateCachePartial(host, {
+ this.federatedInstanceService.update(i.id, {
isSuspended: true,
});
});
diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
new file mode 100644
index 0000000000..894903e79b
--- /dev/null
+++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
@@ -0,0 +1,103 @@
+import fs from 'node:fs';
+import { Inject, Injectable } from '@nestjs/common';
+import { format as DateFormat } from 'date-fns';
+import { In } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import type { AntennasRepository, UsersRepository, UserListJoiningsRepository, User } from '@/models/index.js';
+import type { Config } from '@/config.js';
+import Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { bindThis } from '@/decorators.js';
+import { createTemp } from '@/misc/create-temp.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type { DBExportAntennasData } from '../types.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class ExportAntennasProcessorService {
+ private logger: Logger;
+
+ constructor (
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.antennasRepository)
+ private antennsRepository: AntennasRepository,
+
+ @Inject(DI.userListJoiningsRepository)
+ private userListJoiningsRepository: UserListJoiningsRepository,
+
+ private driveService: DriveService,
+ private utilityService: UtilityService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.logger = this.queueLoggerService.logger.createSubLogger('export-antennas');
+ }
+
+ @bindThis
+ public async process(job: Bull.Job<DBExportAntennasData>, done: () => void): Promise<void> {
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+ const [path, cleanup] = await createTemp();
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+ const write = (input: string): Promise<void> => {
+ return new Promise((resolve, reject) => {
+ stream.write(input, err => {
+ if (err) {
+ this.logger.error(err);
+ reject();
+ } else {
+ resolve();
+ }
+ });
+ });
+ };
+ try {
+ const antennas = await this.antennsRepository.findBy({ userId: job.data.user.id });
+ write('[');
+ for (const [index, antenna] of antennas.entries()) {
+ let users: User[] | undefined;
+ if (antenna.userListId !== null) {
+ const joinings = await this.userListJoiningsRepository.findBy({ userListId: antenna.userListId });
+ users = await this.usersRepository.findBy({
+ id: In(joinings.map(j => j.userId)),
+ });
+ }
+ write(JSON.stringify({
+ name: antenna.name,
+ src: antenna.src,
+ keywords: antenna.keywords,
+ excludeKeywords: antenna.excludeKeywords,
+ users: antenna.users,
+ userListAccts: typeof users !== 'undefined' ? users.map((u) => {
+ return this.utilityService.getFullApAccount(u.username, u.host); // acct
+ }) : null,
+ caseSensitive: antenna.caseSensitive,
+ withReplies: antenna.withReplies,
+ withFile: antenna.withFile,
+ notify: antenna.notify,
+ }));
+ if (antennas.length - 1 !== index) {
+ write(', ');
+ }
+ }
+ write(']');
+ stream.end();
+
+ const fileName = 'antennas-' + DateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
+ this.logger.succ('Exported to: ' + driveFile.id);
+ } finally {
+ cleanup();
+ done();
+ }
+ }
+}
+
diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
index a020006732..c7b54070d6 100644
--- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
@@ -106,7 +106,7 @@ export class ExportBlockingProcessorService {
this.logger.succ(`Exported to: ${path}`);
const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
- const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
this.logger.succ(`Exported to: ${driveFile.id}`);
} finally {
diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
index daefcdf2f5..f2f2383a88 100644
--- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
@@ -121,7 +121,7 @@ export class ExportFavoritesProcessorService {
this.logger.succ(`Exported to: ${path}`);
const fileName = 'favorites-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
- const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
this.logger.succ(`Exported to: ${driveFile.id}`);
} finally {
diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
index 59443de57f..fa9c1ac1ea 100644
--- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
@@ -110,7 +110,7 @@ export class ExportFollowingProcessorService {
this.logger.succ(`Exported to: ${path}`);
const fileName = 'following-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
- const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
this.logger.succ(`Exported to: ${driveFile.id}`);
} finally {
diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
index a2a718b892..b14bf5f5b1 100644
--- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
@@ -110,7 +110,7 @@ export class ExportMutingProcessorService {
this.logger.succ(`Exported to: ${path}`);
const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
- const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
this.logger.succ(`Exported to: ${driveFile.id}`);
} finally {
diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
index 1aa20d6f1d..e4f12ad101 100644
--- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
@@ -117,7 +117,7 @@ export class ExportNotesProcessorService {
this.logger.succ(`Exported to: ${path}`);
const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
- const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
this.logger.succ(`Exported to: ${driveFile.id}`);
} finally {
diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
index ce8ed2f5e8..54bde44044 100644
--- a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
@@ -86,7 +86,7 @@ export class ExportUserListsProcessorService {
this.logger.succ(`Exported to: ${path}`);
const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
- const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
this.logger.succ(`Exported to: ${driveFile.id}`);
} finally {
diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
new file mode 100644
index 0000000000..d06131b8c8
--- /dev/null
+++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
@@ -0,0 +1,96 @@
+import { Injectable, Inject } from '@nestjs/common';
+import Ajv from 'ajv';
+import { IdService } from '@/core/IdService.js';
+import { GlobalEventService } from '@/core/GlobalEventService.js';
+import Logger from '@/logger.js';
+import type { AntennasRepository } from '@/models/index.js';
+import { DI } from '@/di-symbols.js';
+import { bindThis } from '@/decorators.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import { DBAntennaImportJobData } from '../types.js';
+import type Bull from 'bull';
+
+const validate = new Ajv().compile({
+ type: 'object',
+ properties: {
+ name: { type: 'string', minLength: 1, maxLength: 100 },
+ src: { type: 'string', enum: ['home', 'all', 'users', 'list'] },
+ userListAccts: {
+ type: 'array',
+ items: {
+ type: 'string',
+ },
+ nullable: true,
+ },
+ keywords: { type: 'array', items: {
+ type: 'array', items: {
+ type: 'string',
+ },
+ } },
+ excludeKeywords: { type: 'array', items: {
+ type: 'array', items: {
+ type: 'string',
+ },
+ } },
+ users: { type: 'array', items: {
+ type: 'string',
+ } },
+ caseSensitive: { type: 'boolean' },
+ withReplies: { type: 'boolean' },
+ withFile: { type: 'boolean' },
+ notify: { type: 'boolean' },
+ },
+ required: ['name', 'src', 'keywords', 'excludeKeywords', 'users', 'caseSensitive', 'withReplies', 'withFile', 'notify'],
+});
+
+@Injectable()
+export class ImportAntennasProcessorService {
+ private logger: Logger;
+
+ constructor (
+ @Inject(DI.antennasRepository)
+ private antennasRepository: AntennasRepository,
+
+ private queueLoggerService: QueueLoggerService,
+ private idService: IdService,
+ private globalEventService: GlobalEventService,
+ ) {
+ this.logger = this.queueLoggerService.logger.createSubLogger('import-antennas');
+ }
+
+ @bindThis
+ public async process(job: Bull.Job<DBAntennaImportJobData>, done: () => void): Promise<void> {
+ const now = new Date();
+ try {
+ for (const antenna of job.data.antenna) {
+ if (antenna.keywords.length === 0 || antenna.keywords[0].every(x => x === '')) continue;
+ if (!validate(antenna)) {
+ this.logger.warn('Validation Failed');
+ continue;
+ }
+ const result = await this.antennasRepository.insert({
+ id: this.idService.genId(),
+ createdAt: now,
+ lastUsedAt: now,
+ userId: job.data.user.id,
+ name: antenna.name,
+ src: antenna.src === 'list' && antenna.userListAccts ? 'users' : antenna.src,
+ userListId: null,
+ keywords: antenna.keywords,
+ excludeKeywords: antenna.excludeKeywords,
+ users: (antenna.src === 'list' && antenna.userListAccts !== null ? antenna.userListAccts : antenna.users).filter(Boolean),
+ caseSensitive: antenna.caseSensitive,
+ withReplies: antenna.withReplies,
+ withFile: antenna.withFile,
+ notify: antenna.notify,
+ }).then(x => this.antennasRepository.findOneByOrFail(x.identifiers[0]));
+ this.logger.succ('Antenna created: ' + result.id);
+ this.globalEventService.publishInternalEvent('antennaCreated', result);
+ }
+ } catch (err: any) {
+ this.logger.error(err);
+ } finally {
+ done();
+ }
+ }
+}
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index ed7f38d013..ab8b1e9e22 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -84,9 +84,9 @@ export class InboxProcessorService {
// HTTP-Signature keyIdを元にDBから取得
let authUser: {
- user: RemoteUser;
- key: UserPublickey | null;
- } | null = await this.apDbResolverService.getAuthUserFromKeyId(signature.keyId);
+ user: RemoteUser;
+ key: UserPublickey | null;
+ } | null = await this.apDbResolverService.getAuthUserFromKeyId(signature.keyId);
// keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得
if (authUser == null) {
@@ -174,13 +174,10 @@ export class InboxProcessorService {
// Update stats
this.federatedInstanceService.fetch(authUser.user.host).then(i => {
- this.instancesRepository.update(i.id, {
+ this.federatedInstanceService.update(i.id, {
latestRequestReceivedAt: new Date(),
isNotResponding: false,
});
- this.federatedInstanceService.updateCachePartial(host, {
- isNotResponding: false,
- });
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
diff --git a/packages/backend/src/queue/processors/RelationshipProcessorService.ts b/packages/backend/src/queue/processors/RelationshipProcessorService.ts
index a5006dcf03..ff454df455 100644
--- a/packages/backend/src/queue/processors/RelationshipProcessorService.ts
+++ b/packages/backend/src/queue/processors/RelationshipProcessorService.ts
@@ -10,6 +10,7 @@ import { QueueLoggerService } from '../QueueLoggerService.js';
import { RelationshipJobData } from '../types.js';
import type { UsersRepository } from '@/models/index.js';
import { DI } from '@/di-symbols.js';
+import { LocalUser, RemoteUser } from '@/models/entities/User.js';
@Injectable()
export class RelationshipProcessorService {
@@ -39,7 +40,7 @@ export class RelationshipProcessorService {
const [follower, followee] = await Promise.all([
this.usersRepository.findOneByOrFail({ id: job.data.from.id }),
this.usersRepository.findOneByOrFail({ id: job.data.to.id }),
- ]);
+ ]) as [LocalUser | RemoteUser, LocalUser | RemoteUser];
await this.userFollowingService.unfollow(follower, followee, job.data.silent);
return 'ok';
}