From d7c13b975f55c85b695b72a3ded3d5de97227414 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Wed, 4 Apr 2018 22:45:55 +0900 Subject: Retry HTTP requests --- src/following/distribute.ts | 42 +++++++++++++ src/index.ts | 2 +- src/post/distribute.ts | 4 +- src/processor/db/delete-post-dependents.ts | 22 ------- src/processor/db/index.ts | 7 --- src/processor/http/deliver-post.ts | 21 ------- src/processor/http/follow.ts | 69 ---------------------- src/processor/http/index.ts | 17 ------ src/processor/http/perform-activitypub.ts | 7 --- src/processor/http/process-inbox.ts | 39 ------------ src/processor/http/report-github-failure.ts | 24 -------- src/processor/http/unfollow.ts | 63 -------------------- src/processor/index.ts | 18 ------ src/queue.ts | 10 ---- src/queue/index.ts | 38 ++++++++++++ src/queue/processors/db/delete-post-dependents.ts | 22 +++++++ src/queue/processors/db/index.ts | 7 +++ src/queue/processors/http/deliver-post.ts | 21 +++++++ src/queue/processors/http/follow.ts | 69 ++++++++++++++++++++++ src/queue/processors/http/index.ts | 17 ++++++ src/queue/processors/http/perform-activitypub.ts | 7 +++ src/queue/processors/http/process-inbox.ts | 39 ++++++++++++ src/queue/processors/http/report-github-failure.ts | 24 ++++++++ src/queue/processors/http/unfollow.ts | 63 ++++++++++++++++++++ src/remote/activitypub/act/follow.ts | 4 +- src/remote/activitypub/act/undo/unfollow.ts | 4 +- src/remote/activitypub/delete/post.ts | 4 +- src/remote/activitypub/resolve-person.ts | 4 +- src/server/activitypub/inbox.ts | 4 +- src/server/api/endpoints/following/create.ts | 4 +- src/server/api/endpoints/following/delete.ts | 4 +- src/server/api/service/github.ts | 4 +- 32 files changed, 368 insertions(+), 316 deletions(-) create mode 100644 src/following/distribute.ts delete mode 100644 src/processor/db/delete-post-dependents.ts delete mode 100644 src/processor/db/index.ts delete mode 100644 src/processor/http/deliver-post.ts delete mode 100644 src/processor/http/follow.ts delete mode 100644 src/processor/http/index.ts delete mode 100644 src/processor/http/perform-activitypub.ts delete mode 100644 src/processor/http/process-inbox.ts delete mode 100644 src/processor/http/report-github-failure.ts delete mode 100644 src/processor/http/unfollow.ts delete mode 100644 src/processor/index.ts delete mode 100644 src/queue.ts create mode 100644 src/queue/index.ts create mode 100644 src/queue/processors/db/delete-post-dependents.ts create mode 100644 src/queue/processors/db/index.ts create mode 100644 src/queue/processors/http/deliver-post.ts create mode 100644 src/queue/processors/http/follow.ts create mode 100644 src/queue/processors/http/index.ts create mode 100644 src/queue/processors/http/perform-activitypub.ts create mode 100644 src/queue/processors/http/process-inbox.ts create mode 100644 src/queue/processors/http/report-github-failure.ts create mode 100644 src/queue/processors/http/unfollow.ts (limited to 'src') diff --git a/src/following/distribute.ts b/src/following/distribute.ts new file mode 100644 index 0000000000..10ff988814 --- /dev/null +++ b/src/following/distribute.ts @@ -0,0 +1,42 @@ +import User, { pack as packUser } from '../models/user'; +import FollowingLog from '../models/following-log'; +import FollowedLog from '../models/followed-log'; +import event from '../publishers/stream'; +import notify from '../publishers/notify'; + +export default async (follower, followee) => Promise.all([ + // Increment following count + User.update(follower._id, { + $inc: { + followingCount: 1 + } + }), + + FollowingLog.insert({ + createdAt: new Date(), + userId: followee._id, + count: follower.followingCount + 1 + }), + + // Increment followers count + User.update({ _id: followee._id }, { + $inc: { + followersCount: 1 + } + }), + + FollowedLog.insert({ + createdAt: new Date(), + userId: follower._id, + count: followee.followersCount + 1 + }), + + followee.host === null && Promise.all([ + // Notify + notify(followee.id, follower.id, 'follow'), + + // Publish follow event + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)) + ]) +]); diff --git a/src/index.ts b/src/index.ts index 29c4f3431a..21fb2f5530 100644 --- a/src/index.ts +++ b/src/index.ts @@ -99,7 +99,7 @@ async function workerMain(opt) { if (!opt['only-server']) { // start processor - require('./processor').default(); + require('./queue').process(); } // Send a 'ready' message to parent process diff --git a/src/post/distribute.ts b/src/post/distribute.ts index ad699d6b84..f748a620c0 100644 --- a/src/post/distribute.ts +++ b/src/post/distribute.ts @@ -8,7 +8,7 @@ import User, { isLocalUser } from '../models/user'; import stream, { publishChannelStream } from '../publishers/stream'; import notify from '../publishers/notify'; import pushSw from '../publishers/push-sw'; -import queue from '../queue'; +import { createHttp } from '../queue'; import watch from './watch'; export default async (user, mentions, post) => { @@ -84,7 +84,7 @@ export default async (user, mentions, post) => { } return new Promise((resolve, reject) => { - queue.create('http', { + createHttp({ type: 'deliverPost', fromId: user._id, toId: following.followerId, diff --git a/src/processor/db/delete-post-dependents.ts b/src/processor/db/delete-post-dependents.ts deleted file mode 100644 index 879c41ec9c..0000000000 --- a/src/processor/db/delete-post-dependents.ts +++ /dev/null @@ -1,22 +0,0 @@ -import Favorite from '../../models/favorite'; -import Notification from '../../models/notification'; -import PollVote from '../../models/poll-vote'; -import PostReaction from '../../models/post-reaction'; -import PostWatching from '../../models/post-watching'; -import Post from '../../models/post'; - -export default async ({ data }) => Promise.all([ - Favorite.remove({ postId: data._id }), - Notification.remove({ postId: data._id }), - PollVote.remove({ postId: data._id }), - PostReaction.remove({ postId: data._id }), - PostWatching.remove({ postId: data._id }), - Post.find({ repostId: data._id }).then(reposts => Promise.all([ - Notification.remove({ - postId: { - $in: reposts.map(({ _id }) => _id) - } - }), - Post.remove({ repostId: data._id }) - ])) -]); diff --git a/src/processor/db/index.ts b/src/processor/db/index.ts deleted file mode 100644 index 75838c099b..0000000000 --- a/src/processor/db/index.ts +++ /dev/null @@ -1,7 +0,0 @@ -import deletePostDependents from './delete-post-dependents'; - -const handlers = { - deletePostDependents -}; - -export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts deleted file mode 100644 index 48ad4f95a1..0000000000 --- a/src/processor/http/deliver-post.ts +++ /dev/null @@ -1,21 +0,0 @@ -import Post from '../../models/post'; -import User, { IRemoteUser } from '../../models/user'; -import context from '../../remote/activitypub/renderer/context'; -import renderCreate from '../../remote/activitypub/renderer/create'; -import renderNote from '../../remote/activitypub/renderer/note'; -import request from '../../remote/request'; - -export default async ({ data }) => { - const promisedTo = User.findOne({ _id: data.toId }) as Promise; - const [from, post] = await Promise.all([ - User.findOne({ _id: data.fromId }), - Post.findOne({ _id: data.postId }) - ]); - const note = await renderNote(from, post); - const to = await promisedTo; - const create = renderCreate(note); - - create['@context'] = context; - - return request(from, to.account.inbox, create); -}; diff --git a/src/processor/http/follow.ts b/src/processor/http/follow.ts deleted file mode 100644 index ed36fa18d4..0000000000 --- a/src/processor/http/follow.ts +++ /dev/null @@ -1,69 +0,0 @@ -import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../models/user'; -import Following from '../../models/following'; -import FollowingLog from '../../models/following-log'; -import FollowedLog from '../../models/followed-log'; -import event from '../../publishers/stream'; -import notify from '../../publishers/notify'; -import context from '../../remote/activitypub/renderer/context'; -import render from '../../remote/activitypub/renderer/follow'; -import request from '../../remote/request'; -import Logger from '../../utils/logger'; - -export default async ({ data }) => { - const { followerId, followeeId } = await Following.findOne({ _id: data.following }); - const [follower, followee] = await Promise.all([ - User.findOne({ _id: followerId }), - User.findOne({ _id: followeeId }) - ]); - - if (isLocalUser(follower) && isRemoteUser(followee)) { - const rendered = render(follower, followee); - rendered['@context'] = context; - - await request(follower, followee.account.inbox, rendered); - } - - try { - await Promise.all([ - // Increment following count - User.update(followerId, { - $inc: { - followingCount: 1 - } - }), - - FollowingLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: follower.followingCount + 1 - }), - - // Increment followers count - User.update({ _id: followeeId }, { - $inc: { - followersCount: 1 - } - }), - - FollowedLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: followee.followersCount + 1 - }), - - // Publish follow event - isLocalUser(follower) && packUser(followee, follower) - .then(packed => event(follower._id, 'follow', packed)), - - isLocalUser(followee) && Promise.all([ - packUser(follower, followee) - .then(packed => event(followee._id, 'followed', packed)), - - // Notify - isLocalUser(followee) && notify(followeeId, followerId, 'follow') - ]) - ]); - } catch (error) { - Logger.error(error.toString()); - } -}; diff --git a/src/processor/http/index.ts b/src/processor/http/index.ts deleted file mode 100644 index 8f9aa717c3..0000000000 --- a/src/processor/http/index.ts +++ /dev/null @@ -1,17 +0,0 @@ -import deliverPost from './deliver-post'; -import follow from './follow'; -import performActivityPub from './perform-activitypub'; -import processInbox from './process-inbox'; -import reportGitHubFailure from './report-github-failure'; -import unfollow from './unfollow'; - -const handlers = { - deliverPost, - follow, - performActivityPub, - processInbox, - reportGitHubFailure, - unfollow -}; - -export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/processor/http/perform-activitypub.ts b/src/processor/http/perform-activitypub.ts deleted file mode 100644 index 963e532fe5..0000000000 --- a/src/processor/http/perform-activitypub.ts +++ /dev/null @@ -1,7 +0,0 @@ -import User from '../../models/user'; -import act from '../../remote/activitypub/act'; -import Resolver from '../../remote/activitypub/resolver'; - -export default ({ data }) => User.findOne({ _id: data.actor }) - .then(actor => act(new Resolver(), actor, data.outbox)) - .then(Promise.all); diff --git a/src/processor/http/process-inbox.ts b/src/processor/http/process-inbox.ts deleted file mode 100644 index f102f8d6b4..0000000000 --- a/src/processor/http/process-inbox.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { verifySignature } from 'http-signature'; -import parseAcct from '../../acct/parse'; -import User, { IRemoteUser } from '../../models/user'; -import act from '../../remote/activitypub/act'; -import resolvePerson from '../../remote/activitypub/resolve-person'; -import Resolver from '../../remote/activitypub/resolver'; - -export default async ({ data }): Promise => { - const keyIdLower = data.signature.keyId.toLowerCase(); - let user; - - if (keyIdLower.startsWith('acct:')) { - const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); - if (host === null) { - throw 'request was made by local user'; - } - - user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; - } else { - user = await User.findOne({ - host: { $ne: null }, - 'account.publicKey.id': data.signature.keyId - }) as IRemoteUser; - - if (user === null) { - user = await resolvePerson(data.signature.keyId); - } - } - - if (user === null) { - throw 'failed to resolve user'; - } - - if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { - throw 'signature verification failed'; - } - - await Promise.all(await act(new Resolver(), user, data.inbox, true)); -}; diff --git a/src/processor/http/report-github-failure.ts b/src/processor/http/report-github-failure.ts deleted file mode 100644 index 4f6f5ccee5..0000000000 --- a/src/processor/http/report-github-failure.ts +++ /dev/null @@ -1,24 +0,0 @@ -import * as request from 'request-promise-native'; -import User from '../../models/user'; -const createPost = require('../../server/api/endpoints/posts/create'); - -export default async ({ data }) => { - const asyncBot = User.findOne({ _id: data.userId }); - - // Fetch parent status - const parentStatuses = await request({ - url: `${data.parentUrl}/statuses`, - headers: { - 'User-Agent': 'misskey' - }, - json: true - }); - - const parentState = parentStatuses[0].state; - const stillFailed = parentState == 'failure' || parentState == 'error'; - const text = stillFailed ? - `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : - `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; - - createPost({ text }, await asyncBot); -}; diff --git a/src/processor/http/unfollow.ts b/src/processor/http/unfollow.ts deleted file mode 100644 index fbfd7b3420..0000000000 --- a/src/processor/http/unfollow.ts +++ /dev/null @@ -1,63 +0,0 @@ -import FollowedLog from '../../models/followed-log'; -import Following from '../../models/following'; -import FollowingLog from '../../models/following-log'; -import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../models/user'; -import stream from '../../publishers/stream'; -import renderFollow from '../../remote/activitypub/renderer/follow'; -import renderUndo from '../../remote/activitypub/renderer/undo'; -import context from '../../remote/activitypub/renderer/context'; -import request from '../../remote/request'; -import Logger from '../../utils/logger'; - -export default async ({ data }) => { - const following = await Following.findOne({ _id: data.id }); - if (following === null) { - return; - } - - const [follower, followee] = await Promise.all([ - User.findOne({ _id: following.followerId }), - User.findOne({ _id: following.followeeId }) - ]); - - if (isLocalUser(follower) && isRemoteUser(followee)) { - const undo = renderUndo(renderFollow(follower, followee)); - undo['@context'] = context; - - await request(follower, followee.account.inbox, undo); - } - - try { - await Promise.all([ - // Delete following - Following.findOneAndDelete({ _id: data.id }), - - // Decrement following count - User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }), - FollowingLog.insert({ - createdAt: new Date(), - userId: follower._id, - count: follower.followingCount - 1 - }), - - // Decrement followers count - User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }), - FollowedLog.insert({ - createdAt: new Date(), - userId: followee._id, - count: followee.followersCount - 1 - }) - ]); - - if (isLocalUser(follower)) { - return; - } - - const promisedPackedUser = packUser(followee, follower); - - // Publish follow event - stream(follower._id, 'unfollow', promisedPackedUser); - } catch (error) { - Logger.error(error.toString()); - } -}; diff --git a/src/processor/index.ts b/src/processor/index.ts deleted file mode 100644 index 172048ddae..0000000000 --- a/src/processor/index.ts +++ /dev/null @@ -1,18 +0,0 @@ -import queue from '../queue'; -import db from './db'; -import http from './http'; - -export default () => { - queue.process('db', db); - - /* - 256 is the default concurrency limit of Mozilla Firefox and Google - Chromium. - - a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google - https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff - Network.http.max-connections - MozillaZine Knowledge Base - http://kb.mozillazine.org/Network.http.max-connections - */ - queue.process('http', 256, http); -}; diff --git a/src/queue.ts b/src/queue.ts deleted file mode 100644 index 08ea13c2a3..0000000000 --- a/src/queue.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { createQueue } from 'kue'; -import config from './config'; - -export default createQueue({ - redis: { - port: config.redis.port, - host: config.redis.host, - auth: config.redis.pass - } -}); diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 0000000000..f90754a561 --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,38 @@ +import { createQueue } from 'kue'; +import config from '../config'; +import db from './processors/db'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .attempts(16) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function createDb(data) { + return queue.create('db', data); +} + +export function process() { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +} diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts new file mode 100644 index 0000000000..6de21eb053 --- /dev/null +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -0,0 +1,22 @@ +import Favorite from '../../../models/favorite'; +import Notification from '../../../models/notification'; +import PollVote from '../../../models/poll-vote'; +import PostReaction from '../../../models/post-reaction'; +import PostWatching from '../../../models/post-watching'; +import Post from '../../../models/post'; + +export default async ({ data }) => Promise.all([ + Favorite.remove({ postId: data._id }), + Notification.remove({ postId: data._id }), + PollVote.remove({ postId: data._id }), + PostReaction.remove({ postId: data._id }), + PostWatching.remove({ postId: data._id }), + Post.find({ repostId: data._id }).then(reposts => Promise.all([ + Notification.remove({ + postId: { + $in: reposts.map(({ _id }) => _id) + } + }), + Post.remove({ repostId: data._id }) + ])) +]); diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts new file mode 100644 index 0000000000..75838c099b --- /dev/null +++ b/src/queue/processors/db/index.ts @@ -0,0 +1,7 @@ +import deletePostDependents from './delete-post-dependents'; + +const handlers = { + deletePostDependents +}; + +export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts new file mode 100644 index 0000000000..e743fc5f68 --- /dev/null +++ b/src/queue/processors/http/deliver-post.ts @@ -0,0 +1,21 @@ +import Post from '../../../models/post'; +import User, { IRemoteUser } from '../../../models/user'; +import context from '../../../remote/activitypub/renderer/context'; +import renderCreate from '../../../remote/activitypub/renderer/create'; +import renderNote from '../../../remote/activitypub/renderer/note'; +import request from '../../../remote/request'; + +export default async ({ data }) => { + const promisedTo = User.findOne({ _id: data.toId }) as Promise; + const [from, post] = await Promise.all([ + User.findOne({ _id: data.fromId }), + Post.findOne({ _id: data.postId }) + ]); + const note = await renderNote(from, post); + const to = await promisedTo; + const create = renderCreate(note); + + create['@context'] = context; + + return request(from, to.account.inbox, create); +}; diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts new file mode 100644 index 0000000000..4cb72828e7 --- /dev/null +++ b/src/queue/processors/http/follow.ts @@ -0,0 +1,69 @@ +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import FollowedLog from '../../../models/followed-log'; +import event from '../../../publishers/stream'; +import notify from '../../../publishers/notify'; +import context from '../../../remote/activitypub/renderer/context'; +import render from '../../../remote/activitypub/renderer/follow'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; + +export default async ({ data }) => { + const { followerId, followeeId } = await Following.findOne({ _id: data.following }); + const [follower, followee] = await Promise.all([ + User.findOne({ _id: followerId }), + User.findOne({ _id: followeeId }) + ]); + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const rendered = render(follower, followee); + rendered['@context'] = context; + + await request(follower, followee.account.inbox, rendered); + } + + try { + await Promise.all([ + // Increment following count + User.update(followerId, { + $inc: { + followingCount: 1 + } + }), + + FollowingLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: follower.followingCount + 1 + }), + + // Increment followers count + User.update({ _id: followeeId }, { + $inc: { + followersCount: 1 + } + }), + + FollowedLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: followee.followersCount + 1 + }), + + // Publish follow event + isLocalUser(follower) && packUser(followee, follower) + .then(packed => event(follower._id, 'follow', packed)), + + isLocalUser(followee) && Promise.all([ + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)), + + // Notify + isLocalUser(followee) && notify(followeeId, followerId, 'follow') + ]) + ]); + } catch (error) { + Logger.error(error.toString()); + } +}; diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts new file mode 100644 index 0000000000..8f9aa717c3 --- /dev/null +++ b/src/queue/processors/http/index.ts @@ -0,0 +1,17 @@ +import deliverPost from './deliver-post'; +import follow from './follow'; +import performActivityPub from './perform-activitypub'; +import processInbox from './process-inbox'; +import reportGitHubFailure from './report-github-failure'; +import unfollow from './unfollow'; + +const handlers = { + deliverPost, + follow, + performActivityPub, + processInbox, + reportGitHubFailure, + unfollow +}; + +export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts new file mode 100644 index 0000000000..7b84400d5c --- /dev/null +++ b/src/queue/processors/http/perform-activitypub.ts @@ -0,0 +1,7 @@ +import User from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import Resolver from '../../../remote/activitypub/resolver'; + +export default ({ data }) => User.findOne({ _id: data.actor }) + .then(actor => act(new Resolver(), actor, data.outbox)) + .then(Promise.all); diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts new file mode 100644 index 0000000000..de1dbd2f98 --- /dev/null +++ b/src/queue/processors/http/process-inbox.ts @@ -0,0 +1,39 @@ +import { verifySignature } from 'http-signature'; +import parseAcct from '../../../acct/parse'; +import User, { IRemoteUser } from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import resolvePerson from '../../../remote/activitypub/resolve-person'; +import Resolver from '../../../remote/activitypub/resolver'; + +export default async ({ data }): Promise => { + const keyIdLower = data.signature.keyId.toLowerCase(); + let user; + + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + throw 'request was made by local user'; + } + + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': data.signature.keyId + }) as IRemoteUser; + + if (user === null) { + user = await resolvePerson(data.signature.keyId); + } + } + + if (user === null) { + throw 'failed to resolve user'; + } + + if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { + throw 'signature verification failed'; + } + + await Promise.all(await act(new Resolver(), user, data.inbox, true)); +}; diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts new file mode 100644 index 0000000000..21683ba3c2 --- /dev/null +++ b/src/queue/processors/http/report-github-failure.ts @@ -0,0 +1,24 @@ +import * as request from 'request-promise-native'; +import User from '../../../models/user'; +const createPost = require('../../../server/api/endpoints/posts/create'); + +export default async ({ data }) => { + const asyncBot = User.findOne({ _id: data.userId }); + + // Fetch parent status + const parentStatuses = await request({ + url: `${data.parentUrl}/statuses`, + headers: { + 'User-Agent': 'misskey' + }, + json: true + }); + + const parentState = parentStatuses[0].state; + const stillFailed = parentState == 'failure' || parentState == 'error'; + const text = stillFailed ? + `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : + `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; + + createPost({ text }, await asyncBot); +}; diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts new file mode 100644 index 0000000000..801a3612a7 --- /dev/null +++ b/src/queue/processors/http/unfollow.ts @@ -0,0 +1,63 @@ +import FollowedLog from '../../../models/followed-log'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import stream from '../../../publishers/stream'; +import renderFollow from '../../../remote/activitypub/renderer/follow'; +import renderUndo from '../../../remote/activitypub/renderer/undo'; +import context from '../../../remote/activitypub/renderer/context'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; + +export default async ({ data }) => { + const following = await Following.findOne({ _id: data.id }); + if (following === null) { + return; + } + + const [follower, followee] = await Promise.all([ + User.findOne({ _id: following.followerId }), + User.findOne({ _id: following.followeeId }) + ]); + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const undo = renderUndo(renderFollow(follower, followee)); + undo['@context'] = context; + + await request(follower, followee.account.inbox, undo); + } + + try { + await Promise.all([ + // Delete following + Following.findOneAndDelete({ _id: data.id }), + + // Decrement following count + User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }), + FollowingLog.insert({ + createdAt: new Date(), + userId: follower._id, + count: follower.followingCount - 1 + }), + + // Decrement followers count + User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }), + FollowedLog.insert({ + createdAt: new Date(), + userId: followee._id, + count: followee.followersCount - 1 + }) + ]); + + if (isLocalUser(follower)) { + return; + } + + const promisedPackedUser = packUser(followee, follower); + + // Publish follow event + stream(follower._id, 'unfollow', promisedPackedUser); + } catch (error) { + Logger.error(error.toString()); + } +}; diff --git a/src/remote/activitypub/act/follow.ts b/src/remote/activitypub/act/follow.ts index 23fa41df8e..222a257e1a 100644 --- a/src/remote/activitypub/act/follow.ts +++ b/src/remote/activitypub/act/follow.ts @@ -3,7 +3,7 @@ import parseAcct from '../../../acct/parse'; import Following, { IFollowing } from '../../../models/following'; import User from '../../../models/user'; import config from '../../../config'; -import queue from '../../../queue'; +import { createHttp } from '../../../queue'; import context from '../renderer/context'; import renderAccept from '../renderer/accept'; import request from '../../request'; @@ -44,7 +44,7 @@ export default async (resolver: Resolver, actor, activity, distribute) => { followerId: actor._id, followeeId: followee._id }).then(following => new Promise((resolve, reject) => { - queue.create('http', { + createHttp({ type: 'follow', following: following._id }).save(error => { diff --git a/src/remote/activitypub/act/undo/unfollow.ts b/src/remote/activitypub/act/undo/unfollow.ts index c17e06e8a9..4f15d9a3e4 100644 --- a/src/remote/activitypub/act/undo/unfollow.ts +++ b/src/remote/activitypub/act/undo/unfollow.ts @@ -1,7 +1,7 @@ -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; export default ({ $id }) => new Promise((resolve, reject) => { - queue.create('http', { type: 'unfollow', id: $id }).save(error => { + createHttp({ type: 'unfollow', id: $id }).save(error => { if (error) { reject(error); } else { diff --git a/src/remote/activitypub/delete/post.ts b/src/remote/activitypub/delete/post.ts index f6c816647d..59ae8c2b94 100644 --- a/src/remote/activitypub/delete/post.ts +++ b/src/remote/activitypub/delete/post.ts @@ -1,10 +1,10 @@ import Post from '../../../models/post'; -import queue from '../../../queue'; +import { createDb } from '../../../queue'; export default async ({ $id }) => { const promisedDeletion = Post.findOneAndDelete({ _id: $id }); - await new Promise((resolve, reject) => queue.create('db', { + await new Promise((resolve, reject) => createDb({ type: 'deletePostDependents', id: $id }).delay(65536).save(error => error ? reject(error) : resolve())); diff --git a/src/remote/activitypub/resolve-person.ts b/src/remote/activitypub/resolve-person.ts index 59be65908e..2cf3ad32d8 100644 --- a/src/remote/activitypub/resolve-person.ts +++ b/src/remote/activitypub/resolve-person.ts @@ -1,7 +1,7 @@ import { JSDOM } from 'jsdom'; import { toUnicode } from 'punycode'; import User, { validateUsername, isValidName, isValidDescription } from '../../models/user'; -import queue from '../../queue'; +import { createHttp } from '../../queue'; import webFinger from '../webfinger'; import create from './create'; import Resolver from './resolver'; @@ -69,7 +69,7 @@ export default async (value, verifier?: string) => { }, }); - queue.create('http', { + createHttp({ type: 'performActivityPub', actor: user._id, outbox diff --git a/src/server/activitypub/inbox.ts b/src/server/activitypub/inbox.ts index 5de8433850..0907823b23 100644 --- a/src/server/activitypub/inbox.ts +++ b/src/server/activitypub/inbox.ts @@ -1,7 +1,7 @@ import * as bodyParser from 'body-parser'; import * as express from 'express'; import { parseRequest } from 'http-signature'; -import queue from '../../queue'; +import { createHttp } from '../../queue'; const app = express(); @@ -22,7 +22,7 @@ app.post('/@:user/inbox', bodyParser.json({ return res.sendStatus(401); } - queue.create('http', { + createHttp({ type: 'processInbox', inbox: req.body, signature, diff --git a/src/server/api/endpoints/following/create.ts b/src/server/api/endpoints/following/create.ts index e568595215..9ccbe20171 100644 --- a/src/server/api/endpoints/following/create.ts +++ b/src/server/api/endpoints/following/create.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import User from '../../../../models/user'; import Following from '../../../../models/following'; -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; /** * Follow a user @@ -56,7 +56,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => { followeeId: followee._id }); - queue.create('http', { type: 'follow', following: _id }).save(); + createHttp({ type: 'follow', following: _id }).save(); // Send response res(); diff --git a/src/server/api/endpoints/following/delete.ts b/src/server/api/endpoints/following/delete.ts index bf21bf0cb7..0684b87504 100644 --- a/src/server/api/endpoints/following/delete.ts +++ b/src/server/api/endpoints/following/delete.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import User from '../../../../models/user'; import Following from '../../../../models/following'; -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; /** * Unfollow a user @@ -49,7 +49,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => { return rej('already not following'); } - queue.create('http', { + createHttp({ type: 'unfollow', id: exist._id }).save(error => { diff --git a/src/server/api/service/github.ts b/src/server/api/service/github.ts index 4fd59c2a94..5fc4a92f57 100644 --- a/src/server/api/service/github.ts +++ b/src/server/api/service/github.ts @@ -3,7 +3,7 @@ import * as express from 'express'; //const crypto = require('crypto'); import User from '../../../models/user'; import config from '../../../config'; -import queue from '../../../queue'; +import { createHttp } from '../../../queue'; module.exports = async (app: express.Application) => { if (config.github_bot == null) return; @@ -42,7 +42,7 @@ module.exports = async (app: express.Application) => { const commit = event.commit; const parent = commit.parents[0]; - queue.create('http', { + createHttp({ type: 'gitHubFailureReport', userId: bot._id, parentUrl: parent.url, -- cgit v1.2.3-freya