diff options
| author | syuilo <syuilotan@yahoo.co.jp> | 2019-03-11 19:43:58 +0900 |
|---|---|---|
| committer | syuilo <syuilotan@yahoo.co.jp> | 2019-03-11 19:43:58 +0900 |
| commit | c3d34bda37ca3b48214b094d54ee22d987a42574 (patch) | |
| tree | cc5c7bb9d1883563e83c3a0165b2c5e6380da9ba /src/queue | |
| parent | 10.92.4 (diff) | |
| download | sharkey-c3d34bda37ca3b48214b094d54ee22d987a42574.tar.gz sharkey-c3d34bda37ca3b48214b094d54ee22d987a42574.tar.bz2 sharkey-c3d34bda37ca3b48214b094d54ee22d987a42574.zip | |
Resolve #4259
Diffstat (limited to 'src/queue')
| -rw-r--r-- | src/queue/index.ts | 11 | ||||
| -rw-r--r-- | src/queue/processors/db/import-user-lists.ts | 140 | ||||
| -rw-r--r-- | src/queue/processors/db/index.ts | 4 |
3 files changed, 154 insertions, 1 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts index 00a4a48f14..09e0ad59c9 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -9,6 +9,7 @@ import processDeliver from './processors/deliver'; import processInbox from './processors/inbox'; import processDb from './processors/db'; import { queueLogger } from './logger'; +import { IDriveFile } from '../models/drive-file'; function initializeQueue(name: string) { return new Queue(name, config.redis != null ? { @@ -145,6 +146,16 @@ export function createExportUserListsJob(user: ILocalUser) { }); } +export function createImportUserListsJob(user: ILocalUser, fileId: IDriveFile['_id']) { + return dbQueue.add('importUserLists', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + export default function() { if (!program.onlyServer) { deliverQueue.process(128, processDeliver); diff --git a/src/queue/processors/db/import-user-lists.ts b/src/queue/processors/db/import-user-lists.ts new file mode 100644 index 0000000000..ee1468d5ae --- /dev/null +++ b/src/queue/processors/db/import-user-lists.ts @@ -0,0 +1,140 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; +import * as util from 'util'; +import * as mongo from 'mongodb'; +import * as request from 'request'; + +import { queueLogger } from '../../logger'; +import User from '../../../models/user'; +import config from '../../../config'; +import UserList from '../../../models/user-list'; +import DriveFile from '../../../models/drive-file'; +import chalk from 'chalk'; +import { getOriginalUrl } from '../../../misc/get-drive-file-url'; +import parseAcct from '../../../misc/acct/parse'; +import resolveUser from '../../../remote/resolve-user'; + +const logger = queueLogger.createSubLogger('import-user-lists'); + +export async function importUserLists(job: Bull.Job, done: any): Promise<void> { + logger.info(`Importing user lists of ${job.data.user._id} ...`); + + const user = await User.findOne({ + _id: new mongo.ObjectID(job.data.user._id.toString()) + }); + + const file = await DriveFile.findOne({ + _id: new mongo.ObjectID(job.data.fileId.toString()) + }); + + const url = getOriginalUrl(file); + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + // write content at URL to temp file + await new Promise((res, rej) => { + logger.info(`Downloading ${chalk.cyan(url)} ...`); + + const writable = fs.createWriteStream(path); + + writable.on('finish', () => { + logger.succ(`Download finished: ${chalk.cyan(url)}`); + res(); + }); + + writable.on('error', error => { + logger.error(`Download failed: ${chalk.cyan(url)}: ${error}`, { + url: url, + e: error + }); + rej(error); + }); + + const requestUrl = new URL(url).pathname.match(/[^\u0021-\u00ff]/) ? encodeURI(url) : url; + + const req = request({ + url: requestUrl, + proxy: config.proxy, + timeout: 10 * 1000, + headers: { + 'User-Agent': config.userAgent + } + }); + + req.pipe(writable); + + req.on('response', response => { + if (response.statusCode !== 200) { + logger.error(`Got ${response.statusCode} (${url})`); + writable.close(); + rej(response.statusCode); + } + }); + + req.on('error', error => { + logger.error(`Failed to start download: ${chalk.cyan(url)}: ${error}`, { + url: url, + e: error + }); + writable.close(); + rej(error); + }); + }); + + logger.succ(`Downloaded to: ${path}`); + + const csv = await util.promisify(fs.readFile)(path, 'utf8'); + + for (const line of csv.trim().split('\n')) { + const listName = line.split(',')[0].trim(); + const { username, host } = parseAcct(line.split(',')[1].trim()); + + let list = await UserList.findOne({ + userId: user._id, + title: listName + }); + + if (list == null) { + list = await UserList.insert({ + createdAt: new Date(), + userId: user._id, + title: listName, + userIds: [] + }); + } + + let target = host === config.host ? await User.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await User.findOne({ + host: host, + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + if (list.userIds.some(id => id.equals(target._id))) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + await UserList.update({ _id: list._id }, { + $push: { + userIds: target._id + } + }); + } + + logger.succ('Imported'); + cleanup(); + done(); +} diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts index 8ac9c1a3d6..4a97a1c884 100644 --- a/src/queue/processors/db/index.ts +++ b/src/queue/processors/db/index.ts @@ -6,6 +6,7 @@ import { exportFollowing } from './export-following'; import { exportMute } from './export-mute'; import { exportBlocking } from './export-blocking'; import { exportUserLists } from './export-user-lists'; +import { importUserLists } from './import-user-lists'; const jobs = { deleteNotes, @@ -14,7 +15,8 @@ const jobs = { exportFollowing, exportMute, exportBlocking, - exportUserLists + exportUserLists, + importUserLists } as any; export default function(dbQueue: Bull.Queue) { |