summaryrefslogtreecommitdiff
path: root/src/queue/processors/db
diff options
context:
space:
mode:
author:    syuilo <syuilotan@yahoo.co.jp> 2019-03-07 23:07:21 +0900
committer: syuilo <syuilotan@yahoo.co.jp> 2019-03-07 23:07:21 +0900
commitc934987b14dc2b0c362f2d13e0664ab275aca522 (patch)
tree4a83b3a22745c9b5ad991a5f7c61d7acee0eba77 /src/queue/processors/db
parentUpdate issue templates (diff)
downloadsharkey-c934987b14dc2b0c362f2d13e0664ab275aca522.tar.gz
sharkey-c934987b14dc2b0c362f2d13e0664ab275aca522.tar.bz2
sharkey-c934987b14dc2b0c362f2d13e0664ab275aca522.zip
Resolve #4444
Diffstat (limited to 'src/queue/processors/db')
-rw-r--r--  src/queue/processors/db/delete-drive-files.ts  |  55
-rw-r--r--  src/queue/processors/db/delete-notes.ts        |  55
-rw-r--r--  src/queue/processors/db/export-blocking.ts     |  89
-rw-r--r--  src/queue/processors/db/export-following.ts    |  89
-rw-r--r--  src/queue/processors/db/export-mute.ts         |  89
-rw-r--r--  src/queue/processors/db/export-notes.ts        | 128
-rw-r--r--  src/queue/processors/db/index.ts               |  20
7 files changed, 525 insertions, 0 deletions
diff --git a/src/queue/processors/db/delete-drive-files.ts b/src/queue/processors/db/delete-drive-files.ts
new file mode 100644
index 0000000000..3de960a25e
--- /dev/null
+++ b/src/queue/processors/db/delete-drive-files.ts
@@ -0,0 +1,55 @@
+import * as Bull from 'bull';
+import * as mongo from 'mongodb';
+
+import { queueLogger } from '../../logger';
+import User from '../../../models/user';
+import DriveFile from '../../../models/drive-file';
+import deleteFile from '../../../services/drive/delete-file';
+
+const logger = queueLogger.createSubLogger('delete-drive-files');
+
+export async function deleteDriveFiles(job: Bull.Job, done: any): Promise<void> {
+ logger.info(`Deleting drive files of ${job.data.user._id} ...`);
+
+ const user = await User.findOne({
+ _id: new mongo.ObjectID(job.data.user._id.toString())
+ });
+
+ let deletedCount = 0;
+ let ended = false;
+ let cursor: any = null;
+
+ while (!ended) {
+ const files = await DriveFile.find({
+ userId: user._id,
+ ...(cursor ? { _id: { $gt: cursor } } : {})
+ }, {
+ limit: 100,
+ sort: {
+ _id: 1
+ }
+ });
+
+ if (files.length === 0) {
+ ended = true;
+ job.progress(100);
+ break;
+ }
+
+ cursor = files[files.length - 1]._id;
+
+ for (const file of files) {
+ await deleteFile(file);
+ deletedCount++;
+ }
+
+ const total = await DriveFile.count({
+ userId: user._id,
+ });
+
+ job.progress(deletedCount / total);
+ }
+
+ logger.succ(`All drive files (${deletedCount}) of ${user._id} has been deleted.`);
+ done();
+}
diff --git a/src/queue/processors/db/delete-notes.ts b/src/queue/processors/db/delete-notes.ts
new file mode 100644
index 0000000000..021db8062e
--- /dev/null
+++ b/src/queue/processors/db/delete-notes.ts
@@ -0,0 +1,55 @@
+import * as Bull from 'bull';
+import * as mongo from 'mongodb';
+
+import { queueLogger } from '../../logger';
+import Note from '../../../models/note';
+import deleteNote from '../../../services/note/delete';
+import User from '../../../models/user';
+
+const logger = queueLogger.createSubLogger('delete-notes');
+
+export async function deleteNotes(job: Bull.Job, done: any): Promise<void> {
+ logger.info(`Deleting notes of ${job.data.user._id} ...`);
+
+ const user = await User.findOne({
+ _id: new mongo.ObjectID(job.data.user._id.toString())
+ });
+
+ let deletedCount = 0;
+ let ended = false;
+ let cursor: any = null;
+
+ while (!ended) {
+ const notes = await Note.find({
+ userId: user._id,
+ ...(cursor ? { _id: { $gt: cursor } } : {})
+ }, {
+ limit: 100,
+ sort: {
+ _id: 1
+ }
+ });
+
+ if (notes.length === 0) {
+ ended = true;
+ job.progress(100);
+ break;
+ }
+
+ cursor = notes[notes.length - 1]._id;
+
+ for (const note of notes) {
+ await deleteNote(user, note, true);
+ deletedCount++;
+ }
+
+ const total = await Note.count({
+ userId: user._id,
+ });
+
+ job.progress(deletedCount / total);
+ }
+
+ logger.succ(`All notes (${deletedCount}) of ${user._id} has been deleted.`);
+ done();
+}
diff --git a/src/queue/processors/db/export-blocking.ts b/src/queue/processors/db/export-blocking.ts
new file mode 100644
index 0000000000..e56aec94ac
--- /dev/null
+++ b/src/queue/processors/db/export-blocking.ts
@@ -0,0 +1,89 @@
+import * as Bull from 'bull';
+import * as tmp from 'tmp';
+import * as fs from 'fs';
+import * as mongo from 'mongodb';
+
+import { queueLogger } from '../../logger';
+import addFile from '../../../services/drive/add-file';
+import User from '../../../models/user';
+import dateFormat = require('dateformat');
+import Blocking from '../../../models/blocking';
+import config from '../../../config';
+
+const logger = queueLogger.createSubLogger('export-blocking');
+
+export async function exportBlocking(job: Bull.Job, done: any): Promise<void> {
+ logger.info(`Exporting blocking of ${job.data.user._id} ...`);
+
+ const user = await User.findOne({
+ _id: new mongo.ObjectID(job.data.user._id.toString())
+ });
+
+ // Create temp file
+ const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
+ tmp.file((e, path, fd, cleanup) => {
+ if (e) return rej(e);
+ res([path, cleanup]);
+ });
+ });
+
+ logger.info(`Temp file is ${path}`);
+
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ let exportedCount = 0;
+ let ended = false;
+ let cursor: any = null;
+
+ while (!ended) {
+ const blockings = await Blocking.find({
+ blockerId: user._id,
+ ...(cursor ? { _id: { $gt: cursor } } : {})
+ }, {
+ limit: 100,
+ sort: {
+ _id: 1
+ }
+ });
+
+ if (blockings.length === 0) {
+ ended = true;
+ job.progress(100);
+ break;
+ }
+
+ cursor = blockings[blockings.length - 1]._id;
+
+ for (const block of blockings) {
+ const u = await User.findOne({ _id: block.blockeeId }, { fields: { username: true, host: true } });
+ const content = u.host ? `${u.username}@${u.host}` : `${u.username}@${config.host}`;
+ await new Promise((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ exportedCount++;
+ }
+
+ const total = await Blocking.count({
+ blockerId: user._id,
+ });
+
+ job.progress(exportedCount / total);
+ }
+
+ stream.end();
+ logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
+ const driveFile = await addFile(user, path, fileName);
+
+ logger.succ(`Exported to: ${driveFile._id}`);
+ cleanup();
+ done();
+}
diff --git a/src/queue/processors/db/export-following.ts b/src/queue/processors/db/export-following.ts
new file mode 100644
index 0000000000..1d8a501b78
--- /dev/null
+++ b/src/queue/processors/db/export-following.ts
@@ -0,0 +1,89 @@
+import * as Bull from 'bull';
+import * as tmp from 'tmp';
+import * as fs from 'fs';
+import * as mongo from 'mongodb';
+
+import { queueLogger } from '../../logger';
+import addFile from '../../../services/drive/add-file';
+import User from '../../../models/user';
+import dateFormat = require('dateformat');
+import Following from '../../../models/following';
+import config from '../../../config';
+
+const logger = queueLogger.createSubLogger('export-following');
+
+export async function exportFollowing(job: Bull.Job, done: any): Promise<void> {
+ logger.info(`Exporting following of ${job.data.user._id} ...`);
+
+ const user = await User.findOne({
+ _id: new mongo.ObjectID(job.data.user._id.toString())
+ });
+
+ // Create temp file
+ const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
+ tmp.file((e, path, fd, cleanup) => {
+ if (e) return rej(e);
+ res([path, cleanup]);
+ });
+ });
+
+ logger.info(`Temp file is ${path}`);
+
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ let exportedCount = 0;
+ let ended = false;
+ let cursor: any = null;
+
+ while (!ended) {
+ const followings = await Following.find({
+ followerId: user._id,
+ ...(cursor ? { _id: { $gt: cursor } } : {})
+ }, {
+ limit: 100,
+ sort: {
+ _id: 1
+ }
+ });
+
+ if (followings.length === 0) {
+ ended = true;
+ job.progress(100);
+ break;
+ }
+
+ cursor = followings[followings.length - 1]._id;
+
+ for (const following of followings) {
+ const u = await User.findOne({ _id: following.followeeId }, { fields: { username: true, host: true } });
+ const content = u.host ? `${u.username}@${u.host}` : `${u.username}@${config.host}`;
+ await new Promise((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ exportedCount++;
+ }
+
+ const total = await Following.count({
+ followerId: user._id,
+ });
+
+ job.progress(exportedCount / total);
+ }
+
+ stream.end();
+ logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
+ const driveFile = await addFile(user, path, fileName);
+
+ logger.succ(`Exported to: ${driveFile._id}`);
+ cleanup();
+ done();
+}
diff --git a/src/queue/processors/db/export-mute.ts b/src/queue/processors/db/export-mute.ts
new file mode 100644
index 0000000000..6f2dd6df13
--- /dev/null
+++ b/src/queue/processors/db/export-mute.ts
@@ -0,0 +1,89 @@
+import * as Bull from 'bull';
+import * as tmp from 'tmp';
+import * as fs from 'fs';
+import * as mongo from 'mongodb';
+
+import { queueLogger } from '../../logger';
+import addFile from '../../../services/drive/add-file';
+import User from '../../../models/user';
+import dateFormat = require('dateformat');
+import Mute from '../../../models/mute';
+import config from '../../../config';
+
+const logger = queueLogger.createSubLogger('export-mute');
+
+export async function exportMute(job: Bull.Job, done: any): Promise<void> {
+ logger.info(`Exporting mute of ${job.data.user._id} ...`);
+
+ const user = await User.findOne({
+ _id: new mongo.ObjectID(job.data.user._id.toString())
+ });
+
+ // Create temp file
+ const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
+ tmp.file((e, path, fd, cleanup) => {
+ if (e) return rej(e);
+ res([path, cleanup]);
+ });
+ });
+
+ logger.info(`Temp file is ${path}`);
+
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ let exportedCount = 0;
+ let ended = false;
+ let cursor: any = null;
+
+ while (!ended) {
+ const mutes = await Mute.find({
+ muterId: user._id,
+ ...(cursor ? { _id: { $gt: cursor } } : {})
+ }, {
+ limit: 100,
+ sort: {
+ _id: 1
+ }
+ });
+
+ if (mutes.length === 0) {
+ ended = true;
+ job.progress(100);
+ break;
+ }
+
+ cursor = mutes[mutes.length - 1]._id;
+
+ for (const mute of mutes) {
+ const u = await User.findOne({ _id: mute.muteeId }, { fields: { username: true, host: true } });
+ const content = u.host ? `${u.username}@${u.host}` : `${u.username}@${config.host}`;
+ await new Promise((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ exportedCount++;
+ }
+
+ const total = await Mute.count({
+ muterId: user._id,
+ });
+
+ job.progress(exportedCount / total);
+ }
+
+ stream.end();
+ logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
+ const driveFile = await addFile(user, path, fileName);
+
+ logger.succ(`Exported to: ${driveFile._id}`);
+ cleanup();
+ done();
+}
diff --git a/src/queue/processors/db/export-notes.ts b/src/queue/processors/db/export-notes.ts
new file mode 100644
index 0000000000..8f3cdc5b99
--- /dev/null
+++ b/src/queue/processors/db/export-notes.ts
@@ -0,0 +1,128 @@
+import * as Bull from 'bull';
+import * as tmp from 'tmp';
+import * as fs from 'fs';
+import * as mongo from 'mongodb';
+
+import { queueLogger } from '../../logger';
+import Note, { INote } from '../../../models/note';
+import addFile from '../../../services/drive/add-file';
+import User from '../../../models/user';
+import dateFormat = require('dateformat');
+
+const logger = queueLogger.createSubLogger('export-notes');
+
+export async function exportNotes(job: Bull.Job, done: any): Promise<void> {
+ logger.info(`Exporting notes of ${job.data.user._id} ...`);
+
+ const user = await User.findOne({
+ _id: new mongo.ObjectID(job.data.user._id.toString())
+ });
+
+ // Create temp file
+ const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
+ tmp.file((e, path, fd, cleanup) => {
+ if (e) return rej(e);
+ res([path, cleanup]);
+ });
+ });
+
+ logger.info(`Temp file is ${path}`);
+
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ await new Promise((res, rej) => {
+ stream.write('[', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+
+ let exportedNotesCount = 0;
+ let ended = false;
+ let cursor: any = null;
+
+ while (!ended) {
+ const notes = await Note.find({
+ userId: user._id,
+ ...(cursor ? { _id: { $gt: cursor } } : {})
+ }, {
+ limit: 100,
+ sort: {
+ _id: 1
+ }
+ });
+
+ if (notes.length === 0) {
+ ended = true;
+ job.progress(100);
+ break;
+ }
+
+ cursor = notes[notes.length - 1]._id;
+
+ for (const note of notes) {
+ const content = JSON.stringify(serialize(note));
+ await new Promise((res, rej) => {
+ stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ exportedNotesCount++;
+ }
+
+ const total = await Note.count({
+ userId: user._id,
+ });
+
+ job.progress(exportedNotesCount / total);
+ }
+
+ await new Promise((res, rej) => {
+ stream.write(']', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+
+ stream.end();
+ logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json';
+ const driveFile = await addFile(user, path, fileName);
+
+ logger.succ(`Exported to: ${driveFile._id}`);
+ cleanup();
+ done();
+}
+
+function serialize(note: INote): any {
+ return {
+ id: note._id,
+ text: note.text,
+ createdAt: note.createdAt,
+ fileIds: note.fileIds,
+ replyId: note.replyId,
+ renoteId: note.renoteId,
+ poll: note.poll,
+ cw: note.cw,
+ viaMobile: note.viaMobile,
+ visibility: note.visibility,
+ visibleUserIds: note.visibleUserIds,
+ appId: note.appId,
+ geo: note.geo,
+ localOnly: note.localOnly
+ };
+}
diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts
new file mode 100644
index 0000000000..91d7f06a4b
--- /dev/null
+++ b/src/queue/processors/db/index.ts
@@ -0,0 +1,20 @@
+import * as Bull from 'bull';
+import { deleteNotes } from './delete-notes';
+import { deleteDriveFiles } from './delete-drive-files';
+import { exportNotes } from './export-notes';
+import { exportFollowing } from './export-following';
+import { exportMute } from './export-mute';
+import { exportBlocking } from './export-blocking';
+
+const jobs = {
+ deleteNotes,
+ deleteDriveFiles,
+ exportNotes,
+ exportFollowing,
+ exportMute,
+ exportBlocking,
+} as any;
+
+export default function(job: Bull.Job, done: any) {
+ jobs[job.data.type](job, done);
+}