diff options
| author | syuilo <syuilotan@yahoo.co.jp> | 2018-04-04 23:12:35 +0900 |
|---|---|---|
| committer | syuilo <syuilotan@yahoo.co.jp> | 2018-04-04 23:12:35 +0900 |
| commit | e8b42d7e1668679e6a6ee0a7aea1e2ff7f37005b (patch) | |
| tree | 6973284192eb419bd7bfed2361a594e668b81f9a /src/queue | |
| parent | Merge pull request #1393 from akihikodaki/duplicate (diff) | |
| download | sharkey-e8b42d7e1668679e6a6ee0a7aea1e2ff7f37005b.tar.gz sharkey-e8b42d7e1668679e6a6ee0a7aea1e2ff7f37005b.tar.bz2 sharkey-e8b42d7e1668679e6a6ee0a7aea1e2ff7f37005b.zip | |
wip
Diffstat (limited to 'src/queue')
| -rw-r--r-- | src/queue/index.ts | 37 | ||||
| -rw-r--r-- | src/queue/processors/db/delete-post-dependents.ts | 22 | ||||
| -rw-r--r-- | src/queue/processors/db/index.ts | 7 | ||||
| -rw-r--r-- | src/queue/processors/http/deliver.ts | 17 | ||||
| -rw-r--r-- | src/queue/processors/http/follow.ts | 69 | ||||
| -rw-r--r-- | src/queue/processors/http/index.ts | 17 | ||||
| -rw-r--r-- | src/queue/processors/http/perform-activitypub.ts | 7 | ||||
| -rw-r--r-- | src/queue/processors/http/process-inbox.ts | 55 | ||||
| -rw-r--r-- | src/queue/processors/http/report-github-failure.ts | 24 | ||||
| -rw-r--r-- | src/queue/processors/http/unfollow.ts | 56 | ||||
| -rw-r--r-- | src/queue/processors/index.ts | 18 |
11 files changed, 329 insertions, 0 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 0000000000..c8c436b18c --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,37 @@ +import { createQueue } from 'kue'; +import config from '../config'; +import db from './processors/db'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .attempts(16) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function createDb(data) { + return queue.create('db', data); +} + +export function process() { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +} diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts new file mode 100644 index 0000000000..879c41ec9c --- /dev/null +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -0,0 +1,22 @@ +import Favorite from '../../models/favorite'; +import Notification from '../../models/notification'; +import PollVote from '../../models/poll-vote'; +import PostReaction from '../../models/post-reaction'; +import PostWatching from '../../models/post-watching'; +import Post from '../../models/post'; + +export default async ({ data }) => Promise.all([ + Favorite.remove({ postId: data._id }), + Notification.remove({ postId: data._id }), + PollVote.remove({ postId: data._id }), + PostReaction.remove({ postId: data._id }), + PostWatching.remove({ postId: data._id }), + Post.find({ repostId: data._id }).then(reposts => Promise.all([ + Notification.remove({ + postId: { + $in: reposts.map(({ _id }) => _id) + } + }), + Post.remove({ repostId: data._id }) + ])) +]); diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts new file mode 100644 index 0000000000..75838c099b --- /dev/null +++ b/src/queue/processors/db/index.ts @@ -0,0 +1,7 @@ +import deletePostDependents from './delete-post-dependents'; + +const handlers = { + deletePostDependents +}; + +export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts new file mode 100644 index 0000000000..8cd9eb624e --- /dev/null +++ b/src/queue/processors/http/deliver.ts @@ -0,0 +1,17 @@ +import * as kue from 'kue'; + +import Channel from '../../models/channel'; +import Following from '../../models/following'; +import ChannelWatching from '../../models/channel-watching'; +import Post, { pack } from '../../models/post'; +import User, { isLocalUser } from '../../models/user'; +import stream, { publishChannelStream } from '../../publishers/stream'; +import context from '../../remote/activitypub/renderer/context'; +import renderCreate from '../../remote/activitypub/renderer/create'; +import renderNote from '../../remote/activitypub/renderer/note'; +import request from '../../remote/request'; + +export default async (job: kue.Job, done): Promise<void> => { + + request(user, following.follower[0].account.inbox, create); +} diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts new file mode 100644 index 0000000000..8bf890efbc --- /dev/null +++ b/src/queue/processors/http/follow.ts @@ -0,0 +1,69 @@ +import User, { isLocalUser, pack as packUser } from '../../models/user'; +import Following from '../../models/following'; +import FollowingLog from '../../models/following-log'; +import FollowedLog from '../../models/followed-log'; +import event from '../../publishers/stream'; +import notify from '../../publishers/notify'; +import context from '../../remote/activitypub/renderer/context'; +import render from '../../remote/activitypub/renderer/follow'; +import request from '../../remote/request'; + +export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => { + const promisedFollower = User.findOne({ _id: followerId }); + const promisedFollowee = User.findOne({ _id: followeeId }); + + return Promise.all([ + // Increment following count + User.update(followerId, { + $inc: { + followingCount: 1 + } + }), + + promisedFollower.then(({ followingCount }) => FollowingLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: followingCount + 1 + })), + + // Increment followers count + User.update({ _id: followeeId }, { + $inc: { + followersCount: 1 + } + }), + + promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: followersCount + 1 + })), + + // Notify + promisedFollowee.then(followee => followee.host === null ? + notify(followeeId, followerId, 'follow') : null), + + // Publish follow event + Promise.all([promisedFollower, promisedFollowee]).then(([follower, followee]) => { + let followerEvent; + let followeeEvent; + + if (isLocalUser(follower)) { + followerEvent = packUser(followee, follower) + .then(packed => event(follower._id, 'follow', packed)); + } + + if (isLocalUser(followee)) { + followeeEvent = packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)); + } else if (isLocalUser(follower)) { + const rendered = render(follower, followee); + rendered['@context'] = context; + + followeeEvent = request(follower, followee.account.inbox, rendered); + } + + return Promise.all([followerEvent, followeeEvent]); + }) + ]); +}); diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts new file mode 100644 index 0000000000..8f9aa717c3 --- /dev/null +++ b/src/queue/processors/http/index.ts @@ -0,0 +1,17 @@ +import deliverPost from './deliver-post'; +import follow from './follow'; +import performActivityPub from './perform-activitypub'; +import processInbox from './process-inbox'; +import reportGitHubFailure from './report-github-failure'; +import unfollow from './unfollow'; + +const handlers = { + deliverPost, + follow, + performActivityPub, + processInbox, + reportGitHubFailure, + unfollow +}; + +export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts new file mode 100644 index 0000000000..963e532fe5 --- /dev/null +++ b/src/queue/processors/http/perform-activitypub.ts @@ -0,0 +1,7 @@ +import User from '../../models/user'; +import act from '../../remote/activitypub/act'; +import Resolver from '../../remote/activitypub/resolver'; + +export default ({ data }) => User.findOne({ _id: data.actor }) + .then(actor => act(new Resolver(), actor, data.outbox)) + .then(Promise.all); diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts new file mode 100644 index 0000000000..fff1fbf663 --- /dev/null +++ b/src/queue/processors/http/process-inbox.ts @@ -0,0 +1,55 @@ +import * as kue from 'kue'; + +import { verifySignature } from 'http-signature'; +import parseAcct from '../../acct/parse'; +import User, { IRemoteUser } from '../../models/user'; +import act from '../../remote/activitypub/act'; +import resolvePerson from '../../remote/activitypub/resolve-person'; +import Resolver from '../../remote/activitypub/resolver'; + +// ユーザーのinboxにアクティビティが届いた時の処理 +export default async (job: kue.Job, done): Promise<void> => { + const signature = job.data.signature; + const activity = job.data.activity; + + const keyIdLower = signature.keyId.toLowerCase(); + let user; + + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + console.warn(`request was made by local user: @${username}`); + done(); + } + + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': signature.keyId + }) as IRemoteUser; + + // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する + if (user === null) { + user = await resolvePerson(signature.keyId); + } + } + + if (user === null) { + done(new Error('failed to resolve user')); + return; + } + + if (!verifySignature(signature, user.account.publicKey.publicKeyPem)) { + done(new Error('signature verification failed')); + return; + } + + // アクティビティを処理 + try { + await act(new Resolver(), user, activity); + done(); + } catch (e) { + done(e); + } +}; diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts new file mode 100644 index 0000000000..4f6f5ccee5 --- /dev/null +++ b/src/queue/processors/http/report-github-failure.ts @@ -0,0 +1,24 @@ +import * as request from 'request-promise-native'; +import User from '../../models/user'; +const createPost = require('../../server/api/endpoints/posts/create'); + +export default async ({ data }) => { + const asyncBot = User.findOne({ _id: data.userId }); + + // Fetch parent status + const parentStatuses = await request({ + url: `${data.parentUrl}/statuses`, + headers: { + 'User-Agent': 'misskey' + }, + json: true + }); + + const parentState = parentStatuses[0].state; + const stillFailed = parentState == 'failure' || parentState == 'error'; + const text = stillFailed ? + `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : + `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; + + createPost({ text }, await asyncBot); +}; diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts new file mode 100644 index 0000000000..d3d5f2246f --- /dev/null +++ b/src/queue/processors/http/unfollow.ts @@ -0,0 +1,56 @@ +import FollowedLog from '../../models/followed-log'; +import Following from '../../models/following'; +import FollowingLog from '../../models/following-log'; +import User, { isRemoteUser, pack as packUser } from '../../models/user'; +import stream from '../../publishers/stream'; +import renderFollow from '../../remote/activitypub/renderer/follow'; +import renderUndo from '../../remote/activitypub/renderer/undo'; +import context from '../../remote/activitypub/renderer/context'; +import request from '../../remote/request'; + +export default async ({ data }) => { + // Delete following + const following = await Following.findOneAndDelete({ _id: data.id }); + if (following === null) { + return; + } + + const promisedFollower = User.findOne({ _id: following.followerId }); + const promisedFollowee = User.findOne({ _id: following.followeeId }); + + await Promise.all([ + // Decrement following count + User.update({ _id: following.followerId }, { $inc: { followingCount: -1 } }), + promisedFollower.then(({ followingCount }) => FollowingLog.insert({ + createdAt: new Date(), + userId: following.followerId, + count: followingCount - 1 + })), + + // Decrement followers count + User.update({ _id: following.followeeId }, { $inc: { followersCount: -1 } }), + promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ + createdAt: new Date(), + userId: following.followeeId, + count: followersCount - 1 + })), + + // Publish follow event + Promise.all([promisedFollower, promisedFollowee]).then(async ([follower, followee]) => { + if (isRemoteUser(follower)) { + return; + } + + const promisedPackedUser = packUser(followee, follower); + + if (isRemoteUser(followee)) { + const undo = renderUndo(renderFollow(follower, followee)); + undo['@context'] = context; + + await request(follower, followee.account.inbox, undo); + } + + stream(follower._id, 'unfollow', promisedPackedUser); + }) + ]); +}; diff --git a/src/queue/processors/index.ts b/src/queue/processors/index.ts new file mode 100644 index 0000000000..172048ddae --- /dev/null +++ b/src/queue/processors/index.ts @@ -0,0 +1,18 @@ +import queue from '../queue'; +import db from './db'; +import http from './http'; + +export default () => { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +}; |