summaryrefslogtreecommitdiff
path: root/src/queue
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2021-08-21 17:59:29 +0900
committersyuilo <Syuilotan@yahoo.co.jp>2021-08-21 17:59:29 +0900
commitf00ceedae48e7969ca9e80f0af2280bf060421ec (patch)
tree620bb82f6a2ce41f3b3b3d187242bd5bc8e35171 /src/queue
parentMerge branch 'develop' (diff)
parent12.89.0 (diff)
downloadmisskey-f00ceedae48e7969ca9e80f0af2280bf060421ec.tar.gz
misskey-f00ceedae48e7969ca9e80f0af2280bf060421ec.tar.bz2
misskey-f00ceedae48e7969ca9e80f0af2280bf060421ec.zip
Merge branch 'develop'
Diffstat (limited to 'src/queue')
-rw-r--r--src/queue/index.ts23
-rw-r--r--src/queue/initialize.ts23
-rw-r--r--src/queue/logger.ts2
-rw-r--r--src/queue/processors/db/delete-account.ts89
-rw-r--r--src/queue/processors/db/delete-drive-files.ts4
-rw-r--r--src/queue/processors/db/export-blocking.ts6
-rw-r--r--src/queue/processors/db/export-following.ts6
-rw-r--r--src/queue/processors/db/export-mute.ts6
-rw-r--r--src/queue/processors/db/export-notes.ts10
-rw-r--r--src/queue/processors/db/export-user-lists.ts6
-rw-r--r--src/queue/processors/db/import-following.ts6
-rw-r--r--src/queue/processors/db/import-user-lists.ts6
-rw-r--r--src/queue/processors/db/index.ts4
-rw-r--r--src/queue/processors/deliver.ts14
-rw-r--r--src/queue/processors/inbox.ts20
-rw-r--r--src/queue/processors/object-storage/clean-remote-files.ts4
-rw-r--r--src/queue/processors/object-storage/delete-file.ts2
-rw-r--r--src/queue/queues.ts2
18 files changed, 173 insertions, 60 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts
index c7b7f0392c..4ca7998e61 100644
--- a/src/queue/index.ts
+++ b/src/queue/index.ts
@@ -1,14 +1,14 @@
import * as httpSignature from 'http-signature';
-import config from '@/config';
+import config from '@/config/index';
import { program } from '../argv';
import processDeliver from './processors/deliver';
import processInbox from './processors/inbox';
-import processDb from './processors/db';
-import procesObjectStorage from './processors/object-storage';
+import processDb from './processors/db/index';
+import procesObjectStorage from './processors/object-storage/index';
import { queueLogger } from './logger';
-import { DriveFile } from '../models/entities/drive-file';
+import { DriveFile } from '@/models/entities/drive-file';
import { getJobInfo } from './get-job-info';
import { dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues';
import { ThinUser } from './types';
@@ -73,8 +73,7 @@ export function deliver(user: ThinUser, content: unknown, to: string | null) {
attempts: config.deliverJobMaxAttempts || 12,
timeout: 1 * 60 * 1000, // 1min
backoff: {
- type: 'exponential',
- delay: 60 * 1000
+ type: 'apBackoff'
},
removeOnComplete: true,
removeOnFail: true
@@ -91,8 +90,7 @@ export function inbox(activity: IActivity, signature: httpSignature.IParsedSigna
attempts: config.inboxJobMaxAttempts || 8,
timeout: 5 * 60 * 1000, // 5min
backoff: {
- type: 'exponential',
- delay: 60 * 1000
+ type: 'apBackoff'
},
removeOnComplete: true,
removeOnFail: true
@@ -173,6 +171,15 @@ export function createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']
});
}
+export function createDeleteAccountJob(user: ThinUser) {
+ return dbQueue.add('deleteAccount', {
+ user: user
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
export function createDeleteObjectStorageFileJob(key: string) {
return objectStorageQueue.add('deleteFile', {
key: key
diff --git a/src/queue/initialize.ts b/src/queue/initialize.ts
index 4c0e5f9d87..31102a3ed2 100644
--- a/src/queue/initialize.ts
+++ b/src/queue/initialize.ts
@@ -1,5 +1,5 @@
import * as Bull from 'bull';
-import config from '@/config';
+import config from '@/config/index';
export function initialize<T>(name: string, limitPerSec = -1) {
return new Bull<T>(name, {
@@ -11,8 +11,23 @@ export function initialize<T>(name: string, limitPerSec = -1) {
},
prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue',
limiter: limitPerSec > 0 ? {
- max: limitPerSec * 5,
- duration: 5000
- } : undefined
+ max: limitPerSec,
+ duration: 1000
+ } : undefined,
+ settings: {
+ backoffStrategies: {
+ apBackoff
+ }
+ }
});
}
+
+// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
+function apBackoff(attemptsMade: number, err: Error) {
+ const baseDelay = 60 * 1000; // 1min
+ const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
+ let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
+ backoff = Math.min(backoff, maxBackoff);
+ backoff += Math.round(backoff * Math.random() * 0.2);
+ return backoff;
+}
diff --git a/src/queue/logger.ts b/src/queue/logger.ts
index d6d0774680..f789b9d079 100644
--- a/src/queue/logger.ts
+++ b/src/queue/logger.ts
@@ -1,3 +1,3 @@
-import Logger from '../services/logger';
+import Logger from '@/services/logger';
export const queueLogger = new Logger('queue', 'orange');
diff --git a/src/queue/processors/db/delete-account.ts b/src/queue/processors/db/delete-account.ts
new file mode 100644
index 0000000000..65327754c2
--- /dev/null
+++ b/src/queue/processors/db/delete-account.ts
@@ -0,0 +1,89 @@
+import * as Bull from 'bull';
+import { queueLogger } from '../../logger';
+import { DriveFiles, Notes, UserProfiles, Users } from '@/models/index';
+import { DbUserJobData } from '@/queue/types';
+import { Note } from '@/models/entities/note';
+import { DriveFile } from '@/models/entities/drive-file';
+import { MoreThan } from 'typeorm';
+import { deleteFileSync } from '@/services/drive/delete-file';
+import { sendEmail } from '@/services/send-email';
+
+const logger = queueLogger.createSubLogger('delete-account');
+
+export async function deleteAccount(job: Bull.Job<DbUserJobData>): Promise<string | void> {
+ logger.info(`Deleting account of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ return;
+ }
+
+ { // Delete notes
+ let cursor: Note['id'] | null = null;
+
+ while (true) {
+ const notes = await Notes.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {})
+ },
+ take: 100,
+ order: {
+ id: 1
+ }
+ });
+
+ if (notes.length === 0) {
+ break;
+ }
+
+ cursor = notes[notes.length - 1].id;
+
+ await Notes.delete(notes.map(note => note.id));
+ }
+
+ logger.succ(`All of notes deleted`);
+ }
+
+ { // Delete files
+ let cursor: DriveFile['id'] | null = null;
+
+ while (true) {
+ const files = await DriveFiles.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {})
+ },
+ take: 10,
+ order: {
+ id: 1
+ }
+ });
+
+ if (files.length === 0) {
+ break;
+ }
+
+ cursor = files[files.length - 1].id;
+
+ for (const file of files) {
+ await deleteFileSync(file);
+ }
+ }
+
+ logger.succ(`All of files deleted`);
+ }
+
+ { // Send email notification
+ const profile = await UserProfiles.findOneOrFail(user.id);
+ if (profile.email && profile.emailVerified) {
+ sendEmail(profile.email, 'Account deleted',
+ `Your account has been deleted.`,
+ `Your account has been deleted.`);
+ }
+ }
+
+ await Users.delete(job.data.user.id);
+
+ return 'Account deleted';
+}
diff --git a/src/queue/processors/db/delete-drive-files.ts b/src/queue/processors/db/delete-drive-files.ts
index 874623204b..8a28468b0d 100644
--- a/src/queue/processors/db/delete-drive-files.ts
+++ b/src/queue/processors/db/delete-drive-files.ts
@@ -1,8 +1,8 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
-import { deleteFileSync } from '../../../services/drive/delete-file';
-import { Users, DriveFiles } from '../../../models';
+import { deleteFileSync } from '@/services/drive/delete-file';
+import { Users, DriveFiles } from '@/models/index';
import { MoreThan } from 'typeorm';
import { DbUserJobData } from '@/queue/types';
diff --git a/src/queue/processors/db/export-blocking.ts b/src/queue/processors/db/export-blocking.ts
index 001b50a22c..a0fc385006 100644
--- a/src/queue/processors/db/export-blocking.ts
+++ b/src/queue/processors/db/export-blocking.ts
@@ -3,10 +3,10 @@ import * as tmp from 'tmp';
import * as fs from 'fs';
import { queueLogger } from '../../logger';
-import addFile from '../../../services/drive/add-file';
-import dateFormat = require('dateformat');
+import addFile from '@/services/drive/add-file';
+import dateFormat from 'dateformat';
import { getFullApAccount } from '@/misc/convert-host';
-import { Users, Blockings } from '../../../models';
+import { Users, Blockings } from '@/models/index';
import { MoreThan } from 'typeorm';
import { DbUserJobData } from '@/queue/types';
diff --git a/src/queue/processors/db/export-following.ts b/src/queue/processors/db/export-following.ts
index c1ccb7af4c..3612150363 100644
--- a/src/queue/processors/db/export-following.ts
+++ b/src/queue/processors/db/export-following.ts
@@ -3,10 +3,10 @@ import * as tmp from 'tmp';
import * as fs from 'fs';
import { queueLogger } from '../../logger';
-import addFile from '../../../services/drive/add-file';
-import dateFormat = require('dateformat');
+import addFile from '@/services/drive/add-file';
+import dateFormat from 'dateformat';
import { getFullApAccount } from '@/misc/convert-host';
-import { Users, Followings } from '../../../models';
+import { Users, Followings } from '@/models/index';
import { MoreThan } from 'typeorm';
import { DbUserJobData } from '@/queue/types';
diff --git a/src/queue/processors/db/export-mute.ts b/src/queue/processors/db/export-mute.ts
index 55d45cc29c..70b2272cdb 100644
--- a/src/queue/processors/db/export-mute.ts
+++ b/src/queue/processors/db/export-mute.ts
@@ -3,10 +3,10 @@ import * as tmp from 'tmp';
import * as fs from 'fs';
import { queueLogger } from '../../logger';
-import addFile from '../../../services/drive/add-file';
-import dateFormat = require('dateformat');
+import addFile from '@/services/drive/add-file';
+import dateFormat from 'dateformat';
import { getFullApAccount } from '@/misc/convert-host';
-import { Users, Mutings } from '../../../models';
+import { Users, Mutings } from '@/models/index';
import { MoreThan } from 'typeorm';
import { DbUserJobData } from '@/queue/types';
diff --git a/src/queue/processors/db/export-notes.ts b/src/queue/processors/db/export-notes.ts
index 2d09c0d201..3f146aff1b 100644
--- a/src/queue/processors/db/export-notes.ts
+++ b/src/queue/processors/db/export-notes.ts
@@ -3,12 +3,12 @@ import * as tmp from 'tmp';
import * as fs from 'fs';
import { queueLogger } from '../../logger';
-import addFile from '../../../services/drive/add-file';
-import dateFormat = require('dateformat');
-import { Users, Notes, Polls } from '../../../models';
+import addFile from '@/services/drive/add-file';
+import dateFormat from 'dateformat';
+import { Users, Notes, Polls } from '@/models/index';
import { MoreThan } from 'typeorm';
-import { Note } from '../../../models/entities/note';
-import { Poll } from '../../../models/entities/poll';
+import { Note } from '@/models/entities/note';
+import { Poll } from '@/models/entities/poll';
import { DbUserJobData } from '@/queue/types';
const logger = queueLogger.createSubLogger('export-notes');
diff --git a/src/queue/processors/db/export-user-lists.ts b/src/queue/processors/db/export-user-lists.ts
index 3f793e064f..89bbd5af18 100644
--- a/src/queue/processors/db/export-user-lists.ts
+++ b/src/queue/processors/db/export-user-lists.ts
@@ -3,10 +3,10 @@ import * as tmp from 'tmp';
import * as fs from 'fs';
import { queueLogger } from '../../logger';
-import addFile from '../../../services/drive/add-file';
-import dateFormat = require('dateformat');
+import addFile from '@/services/drive/add-file';
+import dateFormat from 'dateformat';
import { getFullApAccount } from '@/misc/convert-host';
-import { Users, UserLists, UserListJoinings } from '../../../models';
+import { Users, UserLists, UserListJoinings } from '@/models/index';
import { In } from 'typeorm';
import { DbUserJobData } from '@/queue/types';
diff --git a/src/queue/processors/db/import-following.ts b/src/queue/processors/db/import-following.ts
index 1156b5cafa..3d7b7ea404 100644
--- a/src/queue/processors/db/import-following.ts
+++ b/src/queue/processors/db/import-following.ts
@@ -1,12 +1,12 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
-import follow from '../../../services/following/create';
+import follow from '@/services/following/create';
import { parseAcct } from '@/misc/acct';
-import { resolveUser } from '../../../remote/resolve-user';
+import { resolveUser } from '@/remote/resolve-user';
import { downloadTextFile } from '@/misc/download-text-file';
import { isSelfHost, toPuny } from '@/misc/convert-host';
-import { Users, DriveFiles } from '../../../models';
+import { Users, DriveFiles } from '@/models/index';
import { DbUserImportJobData } from '@/queue/types';
const logger = queueLogger.createSubLogger('import-following');
diff --git a/src/queue/processors/db/import-user-lists.ts b/src/queue/processors/db/import-user-lists.ts
index d04ead869a..3b8c13262a 100644
--- a/src/queue/processors/db/import-user-lists.ts
+++ b/src/queue/processors/db/import-user-lists.ts
@@ -2,11 +2,11 @@ import * as Bull from 'bull';
import { queueLogger } from '../../logger';
import { parseAcct } from '@/misc/acct';
-import { resolveUser } from '../../../remote/resolve-user';
-import { pushUserToUserList } from '../../../services/user-list/push';
+import { resolveUser } from '@/remote/resolve-user';
+import { pushUserToUserList } from '@/services/user-list/push';
import { downloadTextFile } from '@/misc/download-text-file';
import { isSelfHost, toPuny } from '@/misc/convert-host';
-import { DriveFiles, Users, UserLists, UserListJoinings } from '../../../models';
+import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index';
import { genId } from '@/misc/gen-id';
import { DbUserImportJobData } from '@/queue/types';
diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts
index b56b7bfa2c..b051a28e0b 100644
--- a/src/queue/processors/db/index.ts
+++ b/src/queue/processors/db/index.ts
@@ -8,6 +8,7 @@ import { exportBlocking } from './export-blocking';
import { exportUserLists } from './export-user-lists';
import { importFollowing } from './import-following';
import { importUserLists } from './import-user-lists';
+import { deleteAccount } from './delete-account';
const jobs = {
deleteDriveFiles,
@@ -17,7 +18,8 @@ const jobs = {
exportBlocking,
exportUserLists,
importFollowing,
- importUserLists
+ importUserLists,
+ deleteAccount,
} as Record<string, Bull.ProcessCallbackFunction<DbJobData> | Bull.ProcessPromiseFunction<DbJobData>>;
export default function(dbQueue: Bull.Queue<DbJobData>) {
diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts
index f9c53fc8f1..373e57cbd5 100644
--- a/src/queue/processors/deliver.ts
+++ b/src/queue/processors/deliver.ts
@@ -1,15 +1,15 @@
import { URL } from 'url';
import * as Bull from 'bull';
-import request from '../../remote/activitypub/request';
-import { registerOrFetchInstanceDoc } from '../../services/register-or-fetch-instance-doc';
-import Logger from '../../services/logger';
-import { Instances } from '../../models';
-import { instanceChart } from '../../services/chart';
-import { fetchInstanceMetadata } from '../../services/fetch-instance-metadata';
+import request from '@/remote/activitypub/request';
+import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc';
+import Logger from '@/services/logger';
+import { Instances } from '@/models/index';
+import { instanceChart } from '@/services/chart/index';
+import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata';
import { fetchMeta } from '@/misc/fetch-meta';
import { toPuny } from '@/misc/convert-host';
import { Cache } from '@/misc/cache';
-import { Instance } from '../../models/entities/instance';
+import { Instance } from '@/models/entities/instance';
import { DeliverJobData } from '../types';
const logger = new Logger('deliver');
diff --git a/src/queue/processors/inbox.ts b/src/queue/processors/inbox.ts
index 5922c4c560..e2c271cdf8 100644
--- a/src/queue/processors/inbox.ts
+++ b/src/queue/processors/inbox.ts
@@ -1,19 +1,19 @@
import { URL } from 'url';
import * as Bull from 'bull';
import * as httpSignature from 'http-signature';
-import perform from '../../remote/activitypub/perform';
-import Logger from '../../services/logger';
-import { registerOrFetchInstanceDoc } from '../../services/register-or-fetch-instance-doc';
-import { Instances } from '../../models';
-import { instanceChart } from '../../services/chart';
+import perform from '@/remote/activitypub/perform';
+import Logger from '@/services/logger';
+import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc';
+import { Instances } from '@/models/index';
+import { instanceChart } from '@/services/chart/index';
import { fetchMeta } from '@/misc/fetch-meta';
import { toPuny, extractDbHost } from '@/misc/convert-host';
-import { getApId } from '../../remote/activitypub/type';
-import { fetchInstanceMetadata } from '../../services/fetch-instance-metadata';
+import { getApId } from '@/remote/activitypub/type';
+import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata';
import { InboxJobData } from '../types';
-import DbResolver from '../../remote/activitypub/db-resolver';
-import { resolvePerson } from '../../remote/activitypub/models/person';
-import { LdSignature } from '../../remote/activitypub/misc/ld-signature';
+import DbResolver from '@/remote/activitypub/db-resolver';
+import { resolvePerson } from '@/remote/activitypub/models/person';
+import { LdSignature } from '@/remote/activitypub/misc/ld-signature';
const logger = new Logger('inbox');
diff --git a/src/queue/processors/object-storage/clean-remote-files.ts b/src/queue/processors/object-storage/clean-remote-files.ts
index a922755f4d..3b2e4ea939 100644
--- a/src/queue/processors/object-storage/clean-remote-files.ts
+++ b/src/queue/processors/object-storage/clean-remote-files.ts
@@ -1,8 +1,8 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
-import { deleteFileSync } from '../../../services/drive/delete-file';
-import { DriveFiles } from '../../../models';
+import { deleteFileSync } from '@/services/drive/delete-file';
+import { DriveFiles } from '@/models/index';
import { MoreThan, Not, IsNull } from 'typeorm';
const logger = queueLogger.createSubLogger('clean-remote-files');
diff --git a/src/queue/processors/object-storage/delete-file.ts b/src/queue/processors/object-storage/delete-file.ts
index 31050998af..ed22968a27 100644
--- a/src/queue/processors/object-storage/delete-file.ts
+++ b/src/queue/processors/object-storage/delete-file.ts
@@ -1,6 +1,6 @@
import { ObjectStorageFileJobData } from '@/queue/types';
import * as Bull from 'bull';
-import { deleteObjectStorageFile } from '../../../services/drive/delete-file';
+import { deleteObjectStorageFile } from '@/services/drive/delete-file';
export default async (job: Bull.Job<ObjectStorageFileJobData>) => {
const key: string = job.data.key;
diff --git a/src/queue/queues.ts b/src/queue/queues.ts
index 5e2754b83f..d8c09ef86e 100644
--- a/src/queue/queues.ts
+++ b/src/queue/queues.ts
@@ -1,4 +1,4 @@
-import config from '@/config';
+import config from '@/config/index';
import { initialize as initializeQueue } from './initialize';
import { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData } from './types';