summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2018-04-04 22:48:27 +0900
committerGitHub <noreply@github.com>2018-04-04 22:48:27 +0900
commitcf160285f82a200a6f042cf924272809280bac79 (patch)
tree9cf0211613474ecf5868e8a2bd59469a83bc9bb3 /src
parentMerge pull request #1393 from akihikodaki/duplicate (diff)
parentRetry HTTP requests (diff)
downloadmisskey-cf160285f82a200a6f042cf924272809280bac79.tar.gz
misskey-cf160285f82a200a6f042cf924272809280bac79.tar.bz2
misskey-cf160285f82a200a6f042cf924272809280bac79.zip
Merge pull request #1394 from akihikodaki/duplicate
Retry HTTP requests
Diffstat (limited to 'src')
-rw-r--r--src/following/distribute.ts42
-rw-r--r--src/index.ts2
-rw-r--r--src/post/distribute.ts98
-rw-r--r--src/processor/http/deliver-post.ts93
-rw-r--r--src/processor/http/follow.ts69
-rw-r--r--src/processor/http/perform-activitypub.ts7
-rw-r--r--src/processor/http/unfollow.ts56
-rw-r--r--src/processor/index.ts18
-rw-r--r--src/queue.ts10
-rw-r--r--src/queue/index.ts38
-rw-r--r--src/queue/processors/db/delete-post-dependents.ts (renamed from src/processor/db/delete-post-dependents.ts)12
-rw-r--r--src/queue/processors/db/index.ts (renamed from src/processor/db/index.ts)0
-rw-r--r--src/queue/processors/http/deliver-post.ts21
-rw-r--r--src/queue/processors/http/follow.ts69
-rw-r--r--src/queue/processors/http/index.ts (renamed from src/processor/http/index.ts)0
-rw-r--r--src/queue/processors/http/perform-activitypub.ts7
-rw-r--r--src/queue/processors/http/process-inbox.ts (renamed from src/processor/http/process-inbox.ts)10
-rw-r--r--src/queue/processors/http/report-github-failure.ts (renamed from src/processor/http/report-github-failure.ts)4
-rw-r--r--src/queue/processors/http/unfollow.ts63
-rw-r--r--src/remote/activitypub/act/follow.ts4
-rw-r--r--src/remote/activitypub/act/undo/unfollow.ts4
-rw-r--r--src/remote/activitypub/delete/post.ts4
-rw-r--r--src/remote/activitypub/resolve-person.ts4
-rw-r--r--src/server/activitypub/inbox.ts4
-rw-r--r--src/server/api/endpoints/following/create.ts4
-rw-r--r--src/server/api/endpoints/following/delete.ts4
-rw-r--r--src/server/api/service/github.ts4
27 files changed, 361 insertions, 290 deletions
diff --git a/src/following/distribute.ts b/src/following/distribute.ts
new file mode 100644
index 0000000000..10ff988814
--- /dev/null
+++ b/src/following/distribute.ts
@@ -0,0 +1,42 @@
+import User, { pack as packUser } from '../models/user';
+import FollowingLog from '../models/following-log';
+import FollowedLog from '../models/followed-log';
+import event from '../publishers/stream';
+import notify from '../publishers/notify';
+
+export default async (follower, followee) => Promise.all([
+ // Increment following count
+ User.update(follower._id, {
+ $inc: {
+ followingCount: 1
+ }
+ }),
+
+ FollowingLog.insert({
+ createdAt: new Date(),
+ userId: followee._id,
+ count: follower.followingCount + 1
+ }),
+
+ // Increment followers count
+ User.update({ _id: followee._id }, {
+ $inc: {
+ followersCount: 1
+ }
+ }),
+
+ FollowedLog.insert({
+ createdAt: new Date(),
+ userId: follower._id,
+ count: followee.followersCount + 1
+ }),
+
+ followee.host === null && Promise.all([
+ // Notify
+ notify(followee.id, follower.id, 'follow'),
+
+ // Publish follow event
+ packUser(follower, followee)
+ .then(packed => event(followee._id, 'followed', packed))
+ ])
+]);
diff --git a/src/index.ts b/src/index.ts
index 29c4f3431a..21fb2f5530 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -99,7 +99,7 @@ async function workerMain(opt) {
if (!opt['only-server']) {
// start processor
- require('./processor').default();
+ require('./queue').process();
}
// Send a 'ready' message to parent process
diff --git a/src/post/distribute.ts b/src/post/distribute.ts
index 49c6eb22df..f748a620c0 100644
--- a/src/post/distribute.ts
+++ b/src/post/distribute.ts
@@ -1,11 +1,14 @@
+import Channel from '../models/channel';
+import ChannelWatching from '../models/channel-watching';
+import Following from '../models/following';
import Mute from '../models/mute';
import Post, { pack } from '../models/post';
import Watching from '../models/post-watching';
-import User from '../models/user';
-import stream from '../publishers/stream';
+import User, { isLocalUser } from '../models/user';
+import stream, { publishChannelStream } from '../publishers/stream';
import notify from '../publishers/notify';
import pushSw from '../publishers/push-sw';
-import queue from '../queue';
+import { createHttp } from '../queue';
import watch from './watch';
export default async (user, mentions, post) => {
@@ -21,10 +24,6 @@ export default async (user, mentions, post) => {
latestPost: post._id
}
}),
- new Promise((resolve, reject) => queue.create('http', {
- type: 'deliverPost',
- id: post._id,
- }).save(error => error ? reject(error) : resolve())),
] as Array<Promise<any>>;
function addMention(promisedMentionee, reason) {
@@ -50,6 +49,91 @@ export default async (user, mentions, post) => {
}));
}
+ // タイムラインへの投稿
+ if (!post.channelId) {
+ promises.push(
+ // Publish event to myself's stream
+ promisedPostObj.then(postObj => {
+ stream(post.userId, 'post', postObj);
+ }),
+
+ Promise.all([
+ User.findOne({ _id: post.userId }),
+
+ // Fetch all followers
+ Following.aggregate([{
+ $lookup: {
+ from: 'users',
+ localField: 'followerId',
+ foreignField: '_id',
+ as: 'follower'
+ }
+ }, {
+ $match: {
+ followeeId: post.userId
+ }
+ }], {
+ _id: false
+ })
+ ]).then(([user, followers]) => Promise.all(followers.map(following => {
+ if (isLocalUser(following.follower)) {
+ // Publish event to followers stream
+ return promisedPostObj.then(postObj => {
+ stream(following.followerId, 'post', postObj);
+ });
+ }
+
+ return new Promise((resolve, reject) => {
+ createHttp({
+ type: 'deliverPost',
+ fromId: user._id,
+ toId: following.followerId,
+ postId: post._id
+ }).save(error => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve();
+ }
+ });
+ });
+ })))
+ );
+ }
+
+ // チャンネルへの投稿
+ if (post.channelId) {
+ promises.push(
+ // Increment channel index(posts count)
+ Channel.update({ _id: post.channelId }, {
+ $inc: {
+ index: 1
+ }
+ }),
+
+ // Publish event to channel
+ promisedPostObj.then(postObj => {
+ publishChannelStream(post.channelId, 'post', postObj);
+ }),
+
+ Promise.all([
+ promisedPostObj,
+
+ // Get channel watchers
+ ChannelWatching.find({
+ channelId: post.channelId,
+ // 削除されたドキュメントは除く
+ deletedAt: { $exists: false }
+ })
+ ]).then(([postObj, watches]) => {
+ // チャンネルの視聴者(のタイムライン)に配信
+ watches.forEach(w => {
+ stream(w.userId, 'post', postObj);
+ });
+ })
+ );
+ }
+
// If has in reply to post
if (post.replyId) {
promises.push(
diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts
deleted file mode 100644
index c00ab912c9..0000000000
--- a/src/processor/http/deliver-post.ts
+++ /dev/null
@@ -1,93 +0,0 @@
-import Channel from '../../models/channel';
-import Following from '../../models/following';
-import ChannelWatching from '../../models/channel-watching';
-import Post, { pack } from '../../models/post';
-import User, { isLocalUser } from '../../models/user';
-import stream, { publishChannelStream } from '../../publishers/stream';
-import context from '../../remote/activitypub/renderer/context';
-import renderCreate from '../../remote/activitypub/renderer/create';
-import renderNote from '../../remote/activitypub/renderer/note';
-import request from '../../remote/request';
-
-export default ({ data }) => Post.findOne({ _id: data.id }).then(post => {
- const promisedPostObj = pack(post);
- const promises = [];
-
- // タイムラインへの投稿
- if (!post.channelId) {
- promises.push(
- // Publish event to myself's stream
- promisedPostObj.then(postObj => {
- stream(post.userId, 'post', postObj);
- }),
-
- Promise.all([
- User.findOne({ _id: post.userId }),
-
- // Fetch all followers
- Following.aggregate([{
- $lookup: {
- from: 'users',
- localField: 'followerId',
- foreignField: '_id',
- as: 'follower'
- }
- }, {
- $match: {
- followeeId: post.userId
- }
- }], {
- _id: false
- })
- ]).then(([user, followers]) => Promise.all(followers.map(following => {
- if (isLocalUser(following.follower)) {
- // Publish event to followers stream
- return promisedPostObj.then(postObj => {
- stream(following.followerId, 'post', postObj);
- });
- }
-
- return renderNote(user, post).then(note => {
- const create = renderCreate(note);
- create['@context'] = context;
- return request(user, following.follower[0].account.inbox, create);
- });
- })))
- );
- }
-
- // チャンネルへの投稿
- if (post.channelId) {
- promises.push(
- // Increment channel index(posts count)
- Channel.update({ _id: post.channelId }, {
- $inc: {
- index: 1
- }
- }),
-
- // Publish event to channel
- promisedPostObj.then(postObj => {
- publishChannelStream(post.channelId, 'post', postObj);
- }),
-
- Promise.all([
- promisedPostObj,
-
- // Get channel watchers
- ChannelWatching.find({
- channelId: post.channelId,
- // 削除されたドキュメントは除く
- deletedAt: { $exists: false }
- })
- ]).then(([postObj, watches]) => {
- // チャンネルの視聴者(のタイムライン)に配信
- watches.forEach(w => {
- stream(w.userId, 'post', postObj);
- });
- })
- );
- }
-
- return Promise.all(promises);
-});
diff --git a/src/processor/http/follow.ts b/src/processor/http/follow.ts
deleted file mode 100644
index 8bf890efbc..0000000000
--- a/src/processor/http/follow.ts
+++ /dev/null
@@ -1,69 +0,0 @@
-import User, { isLocalUser, pack as packUser } from '../../models/user';
-import Following from '../../models/following';
-import FollowingLog from '../../models/following-log';
-import FollowedLog from '../../models/followed-log';
-import event from '../../publishers/stream';
-import notify from '../../publishers/notify';
-import context from '../../remote/activitypub/renderer/context';
-import render from '../../remote/activitypub/renderer/follow';
-import request from '../../remote/request';
-
-export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => {
- const promisedFollower = User.findOne({ _id: followerId });
- const promisedFollowee = User.findOne({ _id: followeeId });
-
- return Promise.all([
- // Increment following count
- User.update(followerId, {
- $inc: {
- followingCount: 1
- }
- }),
-
- promisedFollower.then(({ followingCount }) => FollowingLog.insert({
- createdAt: data.following.createdAt,
- userId: followerId,
- count: followingCount + 1
- })),
-
- // Increment followers count
- User.update({ _id: followeeId }, {
- $inc: {
- followersCount: 1
- }
- }),
-
- promisedFollowee.then(({ followersCount }) => FollowedLog.insert({
- createdAt: data.following.createdAt,
- userId: followerId,
- count: followersCount + 1
- })),
-
- // Notify
- promisedFollowee.then(followee => followee.host === null ?
- notify(followeeId, followerId, 'follow') : null),
-
- // Publish follow event
- Promise.all([promisedFollower, promisedFollowee]).then(([follower, followee]) => {
- let followerEvent;
- let followeeEvent;
-
- if (isLocalUser(follower)) {
- followerEvent = packUser(followee, follower)
- .then(packed => event(follower._id, 'follow', packed));
- }
-
- if (isLocalUser(followee)) {
- followeeEvent = packUser(follower, followee)
- .then(packed => event(followee._id, 'followed', packed));
- } else if (isLocalUser(follower)) {
- const rendered = render(follower, followee);
- rendered['@context'] = context;
-
- followeeEvent = request(follower, followee.account.inbox, rendered);
- }
-
- return Promise.all([followerEvent, followeeEvent]);
- })
- ]);
-});
diff --git a/src/processor/http/perform-activitypub.ts b/src/processor/http/perform-activitypub.ts
deleted file mode 100644
index 963e532fe5..0000000000
--- a/src/processor/http/perform-activitypub.ts
+++ /dev/null
@@ -1,7 +0,0 @@
-import User from '../../models/user';
-import act from '../../remote/activitypub/act';
-import Resolver from '../../remote/activitypub/resolver';
-
-export default ({ data }) => User.findOne({ _id: data.actor })
- .then(actor => act(new Resolver(), actor, data.outbox))
- .then(Promise.all);
diff --git a/src/processor/http/unfollow.ts b/src/processor/http/unfollow.ts
deleted file mode 100644
index d3d5f2246f..0000000000
--- a/src/processor/http/unfollow.ts
+++ /dev/null
@@ -1,56 +0,0 @@
-import FollowedLog from '../../models/followed-log';
-import Following from '../../models/following';
-import FollowingLog from '../../models/following-log';
-import User, { isRemoteUser, pack as packUser } from '../../models/user';
-import stream from '../../publishers/stream';
-import renderFollow from '../../remote/activitypub/renderer/follow';
-import renderUndo from '../../remote/activitypub/renderer/undo';
-import context from '../../remote/activitypub/renderer/context';
-import request from '../../remote/request';
-
-export default async ({ data }) => {
- // Delete following
- const following = await Following.findOneAndDelete({ _id: data.id });
- if (following === null) {
- return;
- }
-
- const promisedFollower = User.findOne({ _id: following.followerId });
- const promisedFollowee = User.findOne({ _id: following.followeeId });
-
- await Promise.all([
- // Decrement following count
- User.update({ _id: following.followerId }, { $inc: { followingCount: -1 } }),
- promisedFollower.then(({ followingCount }) => FollowingLog.insert({
- createdAt: new Date(),
- userId: following.followerId,
- count: followingCount - 1
- })),
-
- // Decrement followers count
- User.update({ _id: following.followeeId }, { $inc: { followersCount: -1 } }),
- promisedFollowee.then(({ followersCount }) => FollowedLog.insert({
- createdAt: new Date(),
- userId: following.followeeId,
- count: followersCount - 1
- })),
-
- // Publish follow event
- Promise.all([promisedFollower, promisedFollowee]).then(async ([follower, followee]) => {
- if (isRemoteUser(follower)) {
- return;
- }
-
- const promisedPackedUser = packUser(followee, follower);
-
- if (isRemoteUser(followee)) {
- const undo = renderUndo(renderFollow(follower, followee));
- undo['@context'] = context;
-
- await request(follower, followee.account.inbox, undo);
- }
-
- stream(follower._id, 'unfollow', promisedPackedUser);
- })
- ]);
-};
diff --git a/src/processor/index.ts b/src/processor/index.ts
deleted file mode 100644
index 172048ddae..0000000000
--- a/src/processor/index.ts
+++ /dev/null
@@ -1,18 +0,0 @@
-import queue from '../queue';
-import db from './db';
-import http from './http';
-
-export default () => {
- queue.process('db', db);
-
- /*
- 256 is the default concurrency limit of Mozilla Firefox and Google
- Chromium.
-
- a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
- https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
- Network.http.max-connections - MozillaZine Knowledge Base
- http://kb.mozillazine.org/Network.http.max-connections
- */
- queue.process('http', 256, http);
-};
diff --git a/src/queue.ts b/src/queue.ts
deleted file mode 100644
index 08ea13c2a3..0000000000
--- a/src/queue.ts
+++ /dev/null
@@ -1,10 +0,0 @@
-import { createQueue } from 'kue';
-import config from './config';
-
-export default createQueue({
- redis: {
- port: config.redis.port,
- host: config.redis.host,
- auth: config.redis.pass
- }
-});
diff --git a/src/queue/index.ts b/src/queue/index.ts
new file mode 100644
index 0000000000..f90754a561
--- /dev/null
+++ b/src/queue/index.ts
@@ -0,0 +1,38 @@
+import { createQueue } from 'kue';
+import config from '../config';
+import db from './processors/db';
+import http from './processors/http';
+
+const queue = createQueue({
+ redis: {
+ port: config.redis.port,
+ host: config.redis.host,
+ auth: config.redis.pass
+ }
+});
+
+export function createHttp(data) {
+ return queue
+ .create('http', data)
+ .attempts(16)
+ .backoff({ delay: 16384, type: 'exponential' });
+}
+
+export function createDb(data) {
+ return queue.create('db', data);
+}
+
+export function process() {
+ queue.process('db', db);
+
+ /*
+ 256 is the default concurrency limit of Mozilla Firefox and Google
+ Chromium.
+
+ a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
+ https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
+ Network.http.max-connections - MozillaZine Knowledge Base
+ http://kb.mozillazine.org/Network.http.max-connections
+ */
+ queue.process('http', 256, http);
+}
diff --git a/src/processor/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts
index 879c41ec9c..6de21eb053 100644
--- a/src/processor/db/delete-post-dependents.ts
+++ b/src/queue/processors/db/delete-post-dependents.ts
@@ -1,9 +1,9 @@
-import Favorite from '../../models/favorite';
-import Notification from '../../models/notification';
-import PollVote from '../../models/poll-vote';
-import PostReaction from '../../models/post-reaction';
-import PostWatching from '../../models/post-watching';
-import Post from '../../models/post';
+import Favorite from '../../../models/favorite';
+import Notification from '../../../models/notification';
+import PollVote from '../../../models/poll-vote';
+import PostReaction from '../../../models/post-reaction';
+import PostWatching from '../../../models/post-watching';
+import Post from '../../../models/post';
export default async ({ data }) => Promise.all([
Favorite.remove({ postId: data._id }),
diff --git a/src/processor/db/index.ts b/src/queue/processors/db/index.ts
index 75838c099b..75838c099b 100644
--- a/src/processor/db/index.ts
+++ b/src/queue/processors/db/index.ts
diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts
new file mode 100644
index 0000000000..e743fc5f68
--- /dev/null
+++ b/src/queue/processors/http/deliver-post.ts
@@ -0,0 +1,21 @@
+import Post from '../../../models/post';
+import User, { IRemoteUser } from '../../../models/user';
+import context from '../../../remote/activitypub/renderer/context';
+import renderCreate from '../../../remote/activitypub/renderer/create';
+import renderNote from '../../../remote/activitypub/renderer/note';
+import request from '../../../remote/request';
+
+export default async ({ data }) => {
+ const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>;
+ const [from, post] = await Promise.all([
+ User.findOne({ _id: data.fromId }),
+ Post.findOne({ _id: data.postId })
+ ]);
+ const note = await renderNote(from, post);
+ const to = await promisedTo;
+ const create = renderCreate(note);
+
+ create['@context'] = context;
+
+ return request(from, to.account.inbox, create);
+};
diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts
new file mode 100644
index 0000000000..4cb72828e7
--- /dev/null
+++ b/src/queue/processors/http/follow.ts
@@ -0,0 +1,69 @@
+import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user';
+import Following from '../../../models/following';
+import FollowingLog from '../../../models/following-log';
+import FollowedLog from '../../../models/followed-log';
+import event from '../../../publishers/stream';
+import notify from '../../../publishers/notify';
+import context from '../../../remote/activitypub/renderer/context';
+import render from '../../../remote/activitypub/renderer/follow';
+import request from '../../../remote/request';
+import Logger from '../../../utils/logger';
+
+export default async ({ data }) => {
+ const { followerId, followeeId } = await Following.findOne({ _id: data.following });
+ const [follower, followee] = await Promise.all([
+ User.findOne({ _id: followerId }),
+ User.findOne({ _id: followeeId })
+ ]);
+
+ if (isLocalUser(follower) && isRemoteUser(followee)) {
+ const rendered = render(follower, followee);
+ rendered['@context'] = context;
+
+ await request(follower, followee.account.inbox, rendered);
+ }
+
+ try {
+ await Promise.all([
+ // Increment following count
+ User.update(followerId, {
+ $inc: {
+ followingCount: 1
+ }
+ }),
+
+ FollowingLog.insert({
+ createdAt: data.following.createdAt,
+ userId: followerId,
+ count: follower.followingCount + 1
+ }),
+
+ // Increment followers count
+ User.update({ _id: followeeId }, {
+ $inc: {
+ followersCount: 1
+ }
+ }),
+
+ FollowedLog.insert({
+ createdAt: data.following.createdAt,
+ userId: followerId,
+ count: followee.followersCount + 1
+ }),
+
+ // Publish follow event
+ isLocalUser(follower) && packUser(followee, follower)
+ .then(packed => event(follower._id, 'follow', packed)),
+
+ isLocalUser(followee) && Promise.all([
+ packUser(follower, followee)
+ .then(packed => event(followee._id, 'followed', packed)),
+
+ // Notify
+ isLocalUser(followee) && notify(followeeId, followerId, 'follow')
+ ])
+ ]);
+ } catch (error) {
+ Logger.error(error.toString());
+ }
+};
diff --git a/src/processor/http/index.ts b/src/queue/processors/http/index.ts
index 8f9aa717c3..8f9aa717c3 100644
--- a/src/processor/http/index.ts
+++ b/src/queue/processors/http/index.ts
diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts
new file mode 100644
index 0000000000..7b84400d5c
--- /dev/null
+++ b/src/queue/processors/http/perform-activitypub.ts
@@ -0,0 +1,7 @@
+import User from '../../../models/user';
+import act from '../../../remote/activitypub/act';
+import Resolver from '../../../remote/activitypub/resolver';
+
+export default ({ data }) => User.findOne({ _id: data.actor })
+ .then(actor => act(new Resolver(), actor, data.outbox))
+ .then(Promise.all);
diff --git a/src/processor/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts
index f102f8d6b4..de1dbd2f98 100644
--- a/src/processor/http/process-inbox.ts
+++ b/src/queue/processors/http/process-inbox.ts
@@ -1,9 +1,9 @@
import { verifySignature } from 'http-signature';
-import parseAcct from '../../acct/parse';
-import User, { IRemoteUser } from '../../models/user';
-import act from '../../remote/activitypub/act';
-import resolvePerson from '../../remote/activitypub/resolve-person';
-import Resolver from '../../remote/activitypub/resolver';
+import parseAcct from '../../../acct/parse';
+import User, { IRemoteUser } from '../../../models/user';
+import act from '../../../remote/activitypub/act';
+import resolvePerson from '../../../remote/activitypub/resolve-person';
+import Resolver from '../../../remote/activitypub/resolver';
export default async ({ data }): Promise<void> => {
const keyIdLower = data.signature.keyId.toLowerCase();
diff --git a/src/processor/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts
index 4f6f5ccee5..21683ba3c2 100644
--- a/src/processor/http/report-github-failure.ts
+++ b/src/queue/processors/http/report-github-failure.ts
@@ -1,6 +1,6 @@
import * as request from 'request-promise-native';
-import User from '../../models/user';
-const createPost = require('../../server/api/endpoints/posts/create');
+import User from '../../../models/user';
+const createPost = require('../../../server/api/endpoints/posts/create');
export default async ({ data }) => {
const asyncBot = User.findOne({ _id: data.userId });
diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts
new file mode 100644
index 0000000000..801a3612a7
--- /dev/null
+++ b/src/queue/processors/http/unfollow.ts
@@ -0,0 +1,63 @@
+import FollowedLog from '../../../models/followed-log';
+import Following from '../../../models/following';
+import FollowingLog from '../../../models/following-log';
+import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user';
+import stream from '../../../publishers/stream';
+import renderFollow from '../../../remote/activitypub/renderer/follow';
+import renderUndo from '../../../remote/activitypub/renderer/undo';
+import context from '../../../remote/activitypub/renderer/context';
+import request from '../../../remote/request';
+import Logger from '../../../utils/logger';
+
+export default async ({ data }) => {
+ const following = await Following.findOne({ _id: data.id });
+ if (following === null) {
+ return;
+ }
+
+ const [follower, followee] = await Promise.all([
+ User.findOne({ _id: following.followerId }),
+ User.findOne({ _id: following.followeeId })
+ ]);
+
+ if (isLocalUser(follower) && isRemoteUser(followee)) {
+ const undo = renderUndo(renderFollow(follower, followee));
+ undo['@context'] = context;
+
+ await request(follower, followee.account.inbox, undo);
+ }
+
+ try {
+ await Promise.all([
+ // Delete following
+ Following.findOneAndDelete({ _id: data.id }),
+
+ // Decrement following count
+ User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }),
+ FollowingLog.insert({
+ createdAt: new Date(),
+ userId: follower._id,
+ count: follower.followingCount - 1
+ }),
+
+ // Decrement followers count
+ User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }),
+ FollowedLog.insert({
+ createdAt: new Date(),
+ userId: followee._id,
+ count: followee.followersCount - 1
+ })
+ ]);
+
+ if (isLocalUser(follower)) {
+ return;
+ }
+
+ const promisedPackedUser = packUser(followee, follower);
+
+ // Publish follow event
+ stream(follower._id, 'unfollow', promisedPackedUser);
+ } catch (error) {
+ Logger.error(error.toString());
+ }
+};
diff --git a/src/remote/activitypub/act/follow.ts b/src/remote/activitypub/act/follow.ts
index 23fa41df8e..222a257e1a 100644
--- a/src/remote/activitypub/act/follow.ts
+++ b/src/remote/activitypub/act/follow.ts
@@ -3,7 +3,7 @@ import parseAcct from '../../../acct/parse';
import Following, { IFollowing } from '../../../models/following';
import User from '../../../models/user';
import config from '../../../config';
-import queue from '../../../queue';
+import { createHttp } from '../../../queue';
import context from '../renderer/context';
import renderAccept from '../renderer/accept';
import request from '../../request';
@@ -44,7 +44,7 @@ export default async (resolver: Resolver, actor, activity, distribute) => {
followerId: actor._id,
followeeId: followee._id
}).then(following => new Promise((resolve, reject) => {
- queue.create('http', {
+ createHttp({
type: 'follow',
following: following._id
}).save(error => {
diff --git a/src/remote/activitypub/act/undo/unfollow.ts b/src/remote/activitypub/act/undo/unfollow.ts
index c17e06e8a9..4f15d9a3e4 100644
--- a/src/remote/activitypub/act/undo/unfollow.ts
+++ b/src/remote/activitypub/act/undo/unfollow.ts
@@ -1,7 +1,7 @@
-import queue from '../../../../queue';
+import { createHttp } from '../../../../queue';
export default ({ $id }) => new Promise((resolve, reject) => {
- queue.create('http', { type: 'unfollow', id: $id }).save(error => {
+ createHttp({ type: 'unfollow', id: $id }).save(error => {
if (error) {
reject(error);
} else {
diff --git a/src/remote/activitypub/delete/post.ts b/src/remote/activitypub/delete/post.ts
index f6c816647d..59ae8c2b94 100644
--- a/src/remote/activitypub/delete/post.ts
+++ b/src/remote/activitypub/delete/post.ts
@@ -1,10 +1,10 @@
import Post from '../../../models/post';
-import queue from '../../../queue';
+import { createDb } from '../../../queue';
export default async ({ $id }) => {
const promisedDeletion = Post.findOneAndDelete({ _id: $id });
- await new Promise((resolve, reject) => queue.create('db', {
+ await new Promise((resolve, reject) => createDb({
type: 'deletePostDependents',
id: $id
}).delay(65536).save(error => error ? reject(error) : resolve()));
diff --git a/src/remote/activitypub/resolve-person.ts b/src/remote/activitypub/resolve-person.ts
index 59be65908e..2cf3ad32d8 100644
--- a/src/remote/activitypub/resolve-person.ts
+++ b/src/remote/activitypub/resolve-person.ts
@@ -1,7 +1,7 @@
import { JSDOM } from 'jsdom';
import { toUnicode } from 'punycode';
import User, { validateUsername, isValidName, isValidDescription } from '../../models/user';
-import queue from '../../queue';
+import { createHttp } from '../../queue';
import webFinger from '../webfinger';
import create from './create';
import Resolver from './resolver';
@@ -69,7 +69,7 @@ export default async (value, verifier?: string) => {
},
});
- queue.create('http', {
+ createHttp({
type: 'performActivityPub',
actor: user._id,
outbox
diff --git a/src/server/activitypub/inbox.ts b/src/server/activitypub/inbox.ts
index 5de8433850..0907823b23 100644
--- a/src/server/activitypub/inbox.ts
+++ b/src/server/activitypub/inbox.ts
@@ -1,7 +1,7 @@
import * as bodyParser from 'body-parser';
import * as express from 'express';
import { parseRequest } from 'http-signature';
-import queue from '../../queue';
+import { createHttp } from '../../queue';
const app = express();
@@ -22,7 +22,7 @@ app.post('/@:user/inbox', bodyParser.json({
return res.sendStatus(401);
}
- queue.create('http', {
+ createHttp({
type: 'processInbox',
inbox: req.body,
signature,
diff --git a/src/server/api/endpoints/following/create.ts b/src/server/api/endpoints/following/create.ts
index e568595215..9ccbe20171 100644
--- a/src/server/api/endpoints/following/create.ts
+++ b/src/server/api/endpoints/following/create.ts
@@ -4,7 +4,7 @@
import $ from 'cafy';
import User from '../../../../models/user';
import Following from '../../../../models/following';
-import queue from '../../../../queue';
+import { createHttp } from '../../../../queue';
/**
* Follow a user
@@ -56,7 +56,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
followeeId: followee._id
});
- queue.create('http', { type: 'follow', following: _id }).save();
+ createHttp({ type: 'follow', following: _id }).save();
// Send response
res();
diff --git a/src/server/api/endpoints/following/delete.ts b/src/server/api/endpoints/following/delete.ts
index bf21bf0cb7..0684b87504 100644
--- a/src/server/api/endpoints/following/delete.ts
+++ b/src/server/api/endpoints/following/delete.ts
@@ -4,7 +4,7 @@
import $ from 'cafy';
import User from '../../../../models/user';
import Following from '../../../../models/following';
-import queue from '../../../../queue';
+import { createHttp } from '../../../../queue';
/**
* Unfollow a user
@@ -49,7 +49,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
return rej('already not following');
}
- queue.create('http', {
+ createHttp({
type: 'unfollow',
id: exist._id
}).save(error => {
diff --git a/src/server/api/service/github.ts b/src/server/api/service/github.ts
index 4fd59c2a94..5fc4a92f57 100644
--- a/src/server/api/service/github.ts
+++ b/src/server/api/service/github.ts
@@ -3,7 +3,7 @@ import * as express from 'express';
//const crypto = require('crypto');
import User from '../../../models/user';
import config from '../../../config';
-import queue from '../../../queue';
+import { createHttp } from '../../../queue';
module.exports = async (app: express.Application) => {
if (config.github_bot == null) return;
@@ -42,7 +42,7 @@ module.exports = async (app: express.Application) => {
const commit = event.commit;
const parent = commit.parents[0];
- queue.create('http', {
+ createHttp({
type: 'gitHubFailureReport',
userId: bot._id,
parentUrl: parent.url,