diff options
Diffstat (limited to 'packages/backend/src/queue/processors/db')
12 files changed, 979 insertions, 0 deletions
diff --git a/packages/backend/src/queue/processors/db/delete-account.ts b/packages/backend/src/queue/processors/db/delete-account.ts new file mode 100644 index 0000000000..e54f38e35e --- /dev/null +++ b/packages/backend/src/queue/processors/db/delete-account.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import { queueLogger } from '../../logger'; +import { DriveFiles, Notes, UserProfiles, Users } from '@/models/index'; +import { DbUserDeleteJobData } from '@/queue/types'; +import { Note } from '@/models/entities/note'; +import { DriveFile } from '@/models/entities/drive-file'; +import { MoreThan } from 'typeorm'; +import { deleteFileSync } from '@/services/drive/delete-file'; +import { sendEmail } from '@/services/send-email'; + +const logger = queueLogger.createSubLogger('delete-account'); + +export async function deleteAccount(job: Bull.Job<DbUserDeleteJobData>): Promise<string | void> { + logger.info(`Deleting account of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + return; + } + + { // Delete notes + let cursor: Note['id'] | null = null; + + while (true) { + const notes = await Notes.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (notes.length === 0) { + break; + } + + cursor = notes[notes.length - 1].id; + + await Notes.delete(notes.map(note => note.id)); + } + + logger.succ(`All of notes deleted`); + } + + { // Delete files + let cursor: DriveFile['id'] | null = null; + + while (true) { + const files = await DriveFiles.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 10, + order: { + id: 1 + } + }); + + if (files.length === 0) { + break; + } + + cursor = files[files.length - 1].id; + + for (const file of files) { + await deleteFileSync(file); + } + } + + logger.succ(`All of files deleted`); + } + + { // Send email notification + const profile = await UserProfiles.findOneOrFail(user.id); + if (profile.email && profile.emailVerified) { + sendEmail(profile.email, 'Account deleted', + `Your account has been deleted.`, + `Your account has been deleted.`); + } + } + + // soft指定されている場合は物理削除しない + if (job.data.soft) { + // nop + } else { + await Users.delete(job.data.user.id); + } + + return 'Account deleted'; +} diff --git a/packages/backend/src/queue/processors/db/delete-drive-files.ts b/packages/backend/src/queue/processors/db/delete-drive-files.ts new file mode 100644 index 0000000000..8a28468b0d --- /dev/null +++ b/packages/backend/src/queue/processors/db/delete-drive-files.ts @@ -0,0 +1,56 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import { deleteFileSync } from '@/services/drive/delete-file'; +import { Users, DriveFiles } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('delete-drive-files'); + +export async function deleteDriveFiles(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { + logger.info(`Deleting drive files of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + let deletedCount = 0; + let cursor: any = null; + + while (true) { + const files = await DriveFiles.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (files.length === 0) { + job.progress(100); + break; + } + + cursor = files[files.length - 1].id; + + for (const file of files) { + await deleteFileSync(file); + deletedCount++; + } + + const total = await DriveFiles.count({ + userId: user.id, + }); + + job.progress(deletedCount / total); + } + + logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-blocking.ts b/packages/backend/src/queue/processors/db/export-blocking.ts new file mode 100644 index 0000000000..8b8aa259d4 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-blocking.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, Blockings } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-blocking'); + +export async function exportBlocking(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { + logger.info(`Exporting blocking of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const blockings = await Blockings.find({ + where: { + blockerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (blockings.length === 0) { + job.progress(100); + break; + } + + cursor = blockings[blockings.length - 1].id; + + for (const block of blockings) { + const u = await Users.findOne({ id: block.blockeeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = getFullApAccount(u.username, u.host); + await new Promise<void>((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Blockings.count({ + blockerId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-following.ts b/packages/backend/src/queue/processors/db/export-following.ts new file mode 100644 index 0000000000..a0ecf5f560 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-following.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, Followings } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-following'); + +export async function exportFollowing(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { + logger.info(`Exporting following of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const followings = await Followings.find({ + where: { + followerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (followings.length === 0) { + job.progress(100); + break; + } + + cursor = followings[followings.length - 1].id; + + for (const following of followings) { + const u = await Users.findOne({ id: following.followeeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = getFullApAccount(u.username, u.host); + await new Promise<void>((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Followings.count({ + followerId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-mute.ts b/packages/backend/src/queue/processors/db/export-mute.ts new file mode 100644 index 0000000000..d5976f7d56 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-mute.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, Mutings } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-mute'); + +export async function exportMute(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { + logger.info(`Exporting mute of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const mutes = await Mutings.find({ + where: { + muterId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (mutes.length === 0) { + job.progress(100); + break; + } + + cursor = mutes[mutes.length - 1].id; + + for (const mute of mutes) { + const u = await Users.findOne({ id: mute.muteeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = getFullApAccount(u.username, u.host); + await new Promise<void>((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Mutings.count({ + muterId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-notes.ts b/packages/backend/src/queue/processors/db/export-notes.ts new file mode 100644 index 0000000000..49850aa706 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-notes.ts @@ -0,0 +1,133 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { Users, Notes, Polls } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { Note } from '@/models/entities/note'; +import { Poll } from '@/models/entities/poll'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-notes'); + +export async function exportNotes(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { + logger.info(`Exporting notes of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + await new Promise<void>((res, rej) => { + stream.write('[', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + + let exportedNotesCount = 0; + let cursor: any = null; + + while (true) { + const notes = await Notes.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (notes.length === 0) { + job.progress(100); + break; + } + + cursor = notes[notes.length - 1].id; + + for (const note of notes) { + let poll: Poll | undefined; + if (note.hasPoll) { + poll = await Polls.findOneOrFail({ noteId: note.id }); + } + const content = JSON.stringify(serialize(note, poll)); + await new Promise<void>((res, rej) => { + stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedNotesCount++; + } + + const total = await Notes.count({ + userId: user.id, + }); + + job.progress(exportedNotesCount / total); + } + + await new Promise<void>((res, rej) => { + stream.write(']', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} + +function serialize(note: Note, poll: Poll | null = null): any { + return { + id: note.id, + text: note.text, + createdAt: note.createdAt, + fileIds: note.fileIds, + replyId: note.replyId, + renoteId: note.renoteId, + poll: poll, + cw: note.cw, + viaMobile: note.viaMobile, + visibility: note.visibility, + visibleUserIds: note.visibleUserIds, + localOnly: note.localOnly + }; +} diff --git a/packages/backend/src/queue/processors/db/export-user-lists.ts b/packages/backend/src/queue/processors/db/export-user-lists.ts new file mode 100644 index 0000000000..8a86c4df5d --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-user-lists.ts @@ -0,0 +1,71 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, UserLists, UserListJoinings } from '@/models/index'; +import { In } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-user-lists'); + +export async function exportUserLists(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { + logger.info(`Exporting user lists of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const lists = await UserLists.find({ + userId: user.id + }); + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + for (const list of lists) { + const joinings = await UserListJoinings.find({ userListId: list.id }); + const users = await Users.find({ + id: In(joinings.map(j => j.userId)) + }); + + for (const u of users) { + const acct = getFullApAccount(u.username, u.host); + const content = `${list.name},${acct}`; + await new Promise<void>((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + } + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/import-blocking.ts b/packages/backend/src/queue/processors/db/import-blocking.ts new file mode 100644 index 0000000000..2e77107034 --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-blocking.ts @@ -0,0 +1,74 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles, Blockings } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; +import block from '@/services/blocking/create'; + +const logger = queueLogger.createSubLogger('import-blocking'); + +export async function importBlocking(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { + logger.info(`Importing blocking of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Block[${linenum}] ${target.id} ...`); + + await block(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} + diff --git a/packages/backend/src/queue/processors/db/import-following.ts b/packages/backend/src/queue/processors/db/import-following.ts new file mode 100644 index 0000000000..2bd079e4bc --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-following.ts @@ -0,0 +1,73 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import follow from '@/services/following/create'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('import-following'); + +export async function importFollowing(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { + logger.info(`Importing following of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Follow[${linenum}] ${target.id} ...`); + + follow(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} diff --git a/packages/backend/src/queue/processors/db/import-muting.ts b/packages/backend/src/queue/processors/db/import-muting.ts new file mode 100644 index 0000000000..8060980625 --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-muting.ts @@ -0,0 +1,83 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles, Mutings } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; +import { User } from '@/models/entities/user'; +import { genId } from '@/misc/gen-id'; + +const logger = queueLogger.createSubLogger('import-muting'); + +export async function importMuting(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { + logger.info(`Importing muting of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Mute[${linenum}] ${target.id} ...`); + + await mute(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} + +async function mute(user: User, target: User) { + await Mutings.insert({ + id: genId(), + createdAt: new Date(), + muterId: user.id, + muteeId: target.id, + }); +} diff --git a/packages/backend/src/queue/processors/db/import-user-lists.ts b/packages/backend/src/queue/processors/db/import-user-lists.ts new file mode 100644 index 0000000000..46b728b387 --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-user-lists.ts @@ -0,0 +1,80 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { pushUserToUserList } from '@/services/user-list/push'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index'; +import { genId } from '@/misc/gen-id'; +import { DbUserImportJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('import-user-lists'); + +export async function importUserLists(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { + logger.info(`Importing user lists of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const listName = line.split(',')[0].trim(); + const { username, host } = Acct.parse(line.split(',')[1].trim()); + + let list = await UserLists.findOne({ + userId: user.id, + name: listName + }); + + if (list == null) { + list = await UserLists.save({ + id: genId(), + createdAt: new Date(), + userId: user.id, + name: listName, + userIds: [] + }); + } + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (target == null) { + target = await resolveUser(username, host); + } + + if (await UserListJoinings.findOne({ userListId: list.id, userId: target.id }) != null) continue; + + pushUserToUserList(target, list); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} diff --git a/packages/backend/src/queue/processors/db/index.ts b/packages/backend/src/queue/processors/db/index.ts new file mode 100644 index 0000000000..97087642b7 --- /dev/null +++ b/packages/backend/src/queue/processors/db/index.ts @@ -0,0 +1,33 @@ +import * as Bull from 'bull'; +import { DbJobData } from '@/queue/types'; +import { deleteDriveFiles } from './delete-drive-files'; +import { exportNotes } from './export-notes'; +import { exportFollowing } from './export-following'; +import { exportMute } from './export-mute'; +import { exportBlocking } from './export-blocking'; +import { exportUserLists } from './export-user-lists'; +import { importFollowing } from './import-following'; +import { importUserLists } from './import-user-lists'; +import { deleteAccount } from './delete-account'; +import { importMuting } from './import-muting'; +import { importBlocking } from './import-blocking'; + +const jobs = { + deleteDriveFiles, + exportNotes, + exportFollowing, + exportMute, + exportBlocking, + exportUserLists, + importFollowing, + importMuting, + importBlocking, + importUserLists, + deleteAccount, +} as Record<string, Bull.ProcessCallbackFunction<DbJobData> | Bull.ProcessPromiseFunction<DbJobData>>; + +export default function(dbQueue: Bull.Queue<DbJobData>) { + for (const [k, v] of Object.entries(jobs)) { + dbQueue.process(k, v); + } +} |