diff options
Diffstat (limited to 'src/queue')
| -rw-r--r-- | src/queue/index.ts | 38 | ||||
| -rw-r--r-- | src/queue/processors/db/delete-post-dependents.ts | 22 | ||||
| -rw-r--r-- | src/queue/processors/db/index.ts | 7 | ||||
| -rw-r--r-- | src/queue/processors/http/deliver-post.ts | 21 | ||||
| -rw-r--r-- | src/queue/processors/http/follow.ts | 69 | ||||
| -rw-r--r-- | src/queue/processors/http/index.ts | 17 | ||||
| -rw-r--r-- | src/queue/processors/http/perform-activitypub.ts | 7 | ||||
| -rw-r--r-- | src/queue/processors/http/process-inbox.ts | 39 | ||||
| -rw-r--r-- | src/queue/processors/http/report-github-failure.ts | 24 | ||||
| -rw-r--r-- | src/queue/processors/http/unfollow.ts | 63 |
10 files changed, 307 insertions, 0 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 0000000000..f90754a561 --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,38 @@ +import { createQueue } from 'kue'; +import config from '../config'; +import db from './processors/db'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .attempts(16) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function createDb(data) { + return queue.create('db', data); +} + +export function process() { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +} diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts new file mode 100644 index 0000000000..6de21eb053 --- /dev/null +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -0,0 +1,22 @@ +import Favorite from '../../../models/favorite'; +import Notification from '../../../models/notification'; +import PollVote from '../../../models/poll-vote'; +import PostReaction from '../../../models/post-reaction'; +import PostWatching from '../../../models/post-watching'; +import Post from '../../../models/post'; + +export default async ({ data }) => Promise.all([ + Favorite.remove({ postId: data._id }), + Notification.remove({ postId: data._id }), + PollVote.remove({ postId: data._id }), + PostReaction.remove({ postId: data._id }), + PostWatching.remove({ postId: data._id }), + Post.find({ repostId: data._id }).then(reposts => Promise.all([ + Notification.remove({ + postId: { + $in: reposts.map(({ _id }) => _id) + } + }), + Post.remove({ repostId: data._id }) + ])) +]); diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts new file mode 100644 index 0000000000..75838c099b --- /dev/null +++ b/src/queue/processors/db/index.ts @@ -0,0 +1,7 @@ +import deletePostDependents from './delete-post-dependents'; + +const handlers = { + deletePostDependents +}; + +export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts new file mode 100644 index 0000000000..e743fc5f68 --- /dev/null +++ b/src/queue/processors/http/deliver-post.ts @@ -0,0 +1,21 @@ +import Post from '../../../models/post'; +import User, { IRemoteUser } from '../../../models/user'; +import context from '../../../remote/activitypub/renderer/context'; +import renderCreate from '../../../remote/activitypub/renderer/create'; +import renderNote from '../../../remote/activitypub/renderer/note'; +import request from '../../../remote/request'; + +export default async ({ data }) => { + const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>; + const [from, post] = await Promise.all([ + User.findOne({ _id: data.fromId }), + Post.findOne({ _id: data.postId }) + ]); + const note = await renderNote(from, post); + const to = await promisedTo; + const create = renderCreate(note); + + create['@context'] = context; + + return request(from, to.account.inbox, create); +}; diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts new file mode 100644 index 0000000000..4cb72828e7 --- /dev/null +++ b/src/queue/processors/http/follow.ts @@ -0,0 +1,69 @@ +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import FollowedLog from '../../../models/followed-log'; +import event from '../../../publishers/stream'; +import notify from '../../../publishers/notify'; +import context from '../../../remote/activitypub/renderer/context'; +import render from '../../../remote/activitypub/renderer/follow'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; + +export default async ({ data }) => { + const { followerId, followeeId } = await Following.findOne({ _id: data.following }); + const [follower, followee] = await Promise.all([ + User.findOne({ _id: followerId }), + User.findOne({ _id: followeeId }) + ]); + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const rendered = render(follower, followee); + rendered['@context'] = context; + + await request(follower, followee.account.inbox, rendered); + } + + try { + await Promise.all([ + // Increment following count + User.update(followerId, { + $inc: { + followingCount: 1 + } + }), + + FollowingLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: follower.followingCount + 1 + }), + + // Increment followers count + User.update({ _id: followeeId }, { + $inc: { + followersCount: 1 + } + }), + + FollowedLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: followee.followersCount + 1 + }), + + // Publish follow event + isLocalUser(follower) && packUser(followee, follower) + .then(packed => event(follower._id, 'follow', packed)), + + isLocalUser(followee) && Promise.all([ + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)), + + // Notify + isLocalUser(followee) && notify(followeeId, followerId, 'follow') + ]) + ]); + } catch (error) { + Logger.error(error.toString()); + } +}; diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts new file mode 100644 index 0000000000..8f9aa717c3 --- /dev/null +++ b/src/queue/processors/http/index.ts @@ -0,0 +1,17 @@ +import deliverPost from './deliver-post'; +import follow from './follow'; +import performActivityPub from './perform-activitypub'; +import processInbox from './process-inbox'; +import reportGitHubFailure from './report-github-failure'; +import unfollow from './unfollow'; + +const handlers = { + deliverPost, + follow, + performActivityPub, + processInbox, + reportGitHubFailure, + unfollow +}; + +export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts new file mode 100644 index 0000000000..7b84400d5c --- /dev/null +++ b/src/queue/processors/http/perform-activitypub.ts @@ -0,0 +1,7 @@ +import User from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import Resolver from '../../../remote/activitypub/resolver'; + +export default ({ data }) => User.findOne({ _id: data.actor }) + .then(actor => act(new Resolver(), actor, data.outbox)) + .then(Promise.all); diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts new file mode 100644 index 0000000000..de1dbd2f98 --- /dev/null +++ b/src/queue/processors/http/process-inbox.ts @@ -0,0 +1,39 @@ +import { verifySignature } from 'http-signature'; +import parseAcct from '../../../acct/parse'; +import User, { IRemoteUser } from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import resolvePerson from '../../../remote/activitypub/resolve-person'; +import Resolver from '../../../remote/activitypub/resolver'; + +export default async ({ data }): Promise<void> => { + const keyIdLower = data.signature.keyId.toLowerCase(); + let user; + + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + throw 'request was made by local user'; + } + + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': data.signature.keyId + }) as IRemoteUser; + + if (user === null) { + user = await resolvePerson(data.signature.keyId); + } + } + + if (user === null) { + throw 'failed to resolve user'; + } + + if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { + throw 'signature verification failed'; + } + + await Promise.all(await act(new Resolver(), user, data.inbox, true)); +}; diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts new file mode 100644 index 0000000000..21683ba3c2 --- /dev/null +++ b/src/queue/processors/http/report-github-failure.ts @@ -0,0 +1,24 @@ +import * as request from 'request-promise-native'; +import User from '../../../models/user'; +const createPost = require('../../../server/api/endpoints/posts/create'); + +export default async ({ data }) => { + const asyncBot = User.findOne({ _id: data.userId }); + + // Fetch parent status + const parentStatuses = await request({ + url: `${data.parentUrl}/statuses`, + headers: { + 'User-Agent': 'misskey' + }, + json: true + }); + + const parentState = parentStatuses[0].state; + const stillFailed = parentState == 'failure' || parentState == 'error'; + const text = stillFailed ? + `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : + `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; + + createPost({ text }, await asyncBot); +}; diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts new file mode 100644 index 0000000000..801a3612a7 --- /dev/null +++ b/src/queue/processors/http/unfollow.ts @@ -0,0 +1,63 @@ +import FollowedLog from '../../../models/followed-log'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import stream from '../../../publishers/stream'; +import renderFollow from '../../../remote/activitypub/renderer/follow'; +import renderUndo from '../../../remote/activitypub/renderer/undo'; +import context from '../../../remote/activitypub/renderer/context'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; + +export default async ({ data }) => { + const following = await Following.findOne({ _id: data.id }); + if (following === null) { + return; + } + + const [follower, followee] = await Promise.all([ + User.findOne({ _id: following.followerId }), + User.findOne({ _id: following.followeeId }) + ]); + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const undo = renderUndo(renderFollow(follower, followee)); + undo['@context'] = context; + + await request(follower, followee.account.inbox, undo); + } + + try { + await Promise.all([ + // Delete following + Following.findOneAndDelete({ _id: data.id }), + + // Decrement following count + User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }), + FollowingLog.insert({ + createdAt: new Date(), + userId: follower._id, + count: follower.followingCount - 1 + }), + + // Decrement followers count + User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }), + FollowedLog.insert({ + createdAt: new Date(), + userId: followee._id, + count: followee.followersCount - 1 + }) + ]); + + if (isLocalUser(follower)) { + return; + } + + const promisedPackedUser = packUser(followee, follower); + + // Publish follow event + stream(follower._id, 'unfollow', promisedPackedUser); + } catch (error) { + Logger.error(error.toString()); + } +}; |