summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAkihiko Odaki <nekomanma@pixiv.co.jp>2018-04-04 22:45:55 +0900
committerAkihiko Odaki <nekomanma@pixiv.co.jp>2018-04-04 22:45:55 +0900
commitd7c13b975f55c85b695b72a3ded3d5de97227414 (patch)
treee197f64e6b977c1f8199b3b2a93815a5529e704d /src
parentMake HTTP request first in unfollow job (diff)
downloadsharkey-d7c13b975f55c85b695b72a3ded3d5de97227414.tar.gz
sharkey-d7c13b975f55c85b695b72a3ded3d5de97227414.tar.bz2
sharkey-d7c13b975f55c85b695b72a3ded3d5de97227414.zip
Retry HTTP requests
Diffstat (limited to 'src')
-rw-r--r--src/following/distribute.ts42
-rw-r--r--src/index.ts2
-rw-r--r--src/post/distribute.ts4
-rw-r--r--src/processor/http/perform-activitypub.ts7
-rw-r--r--src/processor/index.ts18
-rw-r--r--src/queue.ts10
-rw-r--r--src/queue/index.ts38
-rw-r--r--src/queue/processors/db/delete-post-dependents.ts (renamed from src/processor/db/delete-post-dependents.ts)12
-rw-r--r--src/queue/processors/db/index.ts (renamed from src/processor/db/index.ts)0
-rw-r--r--src/queue/processors/http/deliver-post.ts (renamed from src/processor/http/deliver-post.ts)12
-rw-r--r--src/queue/processors/http/follow.ts (renamed from src/processor/http/follow.ts)20
-rw-r--r--src/queue/processors/http/index.ts (renamed from src/processor/http/index.ts)0
-rw-r--r--src/queue/processors/http/perform-activitypub.ts7
-rw-r--r--src/queue/processors/http/process-inbox.ts (renamed from src/processor/http/process-inbox.ts)10
-rw-r--r--src/queue/processors/http/report-github-failure.ts (renamed from src/processor/http/report-github-failure.ts)4
-rw-r--r--src/queue/processors/http/unfollow.ts (renamed from src/processor/http/unfollow.ts)20
-rw-r--r--src/remote/activitypub/act/follow.ts4
-rw-r--r--src/remote/activitypub/act/undo/unfollow.ts4
-rw-r--r--src/remote/activitypub/delete/post.ts4
-rw-r--r--src/remote/activitypub/resolve-person.ts4
-rw-r--r--src/server/activitypub/inbox.ts4
-rw-r--r--src/server/api/endpoints/following/create.ts4
-rw-r--r--src/server/api/endpoints/following/delete.ts4
-rw-r--r--src/server/api/service/github.ts4
24 files changed, 145 insertions, 93 deletions
diff --git a/src/following/distribute.ts b/src/following/distribute.ts
new file mode 100644
index 0000000000..10ff988814
--- /dev/null
+++ b/src/following/distribute.ts
@@ -0,0 +1,42 @@
+import User, { pack as packUser } from '../models/user';
+import FollowingLog from '../models/following-log';
+import FollowedLog from '../models/followed-log';
+import event from '../publishers/stream';
+import notify from '../publishers/notify';
+
+export default async (follower, followee) => Promise.all([
+ // Increment following count
+ User.update(follower._id, {
+ $inc: {
+ followingCount: 1
+ }
+ }),
+
+ FollowingLog.insert({
+ createdAt: new Date(),
+ userId: followee._id,
+ count: follower.followingCount + 1
+ }),
+
+ // Increment followers count
+ User.update({ _id: followee._id }, {
+ $inc: {
+ followersCount: 1
+ }
+ }),
+
+ FollowedLog.insert({
+ createdAt: new Date(),
+ userId: follower._id,
+ count: followee.followersCount + 1
+ }),
+
+ followee.host === null && Promise.all([
+ // Notify
+ notify(followee.id, follower.id, 'follow'),
+
+ // Publish follow event
+ packUser(follower, followee)
+ .then(packed => event(followee._id, 'followed', packed))
+ ])
+]);
diff --git a/src/index.ts b/src/index.ts
index 29c4f3431a..21fb2f5530 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -99,7 +99,7 @@ async function workerMain(opt) {
if (!opt['only-server']) {
// start processor
- require('./processor').default();
+ require('./queue').process();
}
// Send a 'ready' message to parent process
diff --git a/src/post/distribute.ts b/src/post/distribute.ts
index ad699d6b84..f748a620c0 100644
--- a/src/post/distribute.ts
+++ b/src/post/distribute.ts
@@ -8,7 +8,7 @@ import User, { isLocalUser } from '../models/user';
import stream, { publishChannelStream } from '../publishers/stream';
import notify from '../publishers/notify';
import pushSw from '../publishers/push-sw';
-import queue from '../queue';
+import { createHttp } from '../queue';
import watch from './watch';
export default async (user, mentions, post) => {
@@ -84,7 +84,7 @@ export default async (user, mentions, post) => {
}
return new Promise((resolve, reject) => {
- queue.create('http', {
+ createHttp({
type: 'deliverPost',
fromId: user._id,
toId: following.followerId,
diff --git a/src/processor/http/perform-activitypub.ts b/src/processor/http/perform-activitypub.ts
deleted file mode 100644
index 963e532fe5..0000000000
--- a/src/processor/http/perform-activitypub.ts
+++ /dev/null
@@ -1,7 +0,0 @@
-import User from '../../models/user';
-import act from '../../remote/activitypub/act';
-import Resolver from '../../remote/activitypub/resolver';
-
-export default ({ data }) => User.findOne({ _id: data.actor })
- .then(actor => act(new Resolver(), actor, data.outbox))
- .then(Promise.all);
diff --git a/src/processor/index.ts b/src/processor/index.ts
deleted file mode 100644
index 172048ddae..0000000000
--- a/src/processor/index.ts
+++ /dev/null
@@ -1,18 +0,0 @@
-import queue from '../queue';
-import db from './db';
-import http from './http';
-
-export default () => {
- queue.process('db', db);
-
- /*
- 256 is the default concurrency limit of Mozilla Firefox and Google
- Chromium.
-
- a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
- https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
- Network.http.max-connections - MozillaZine Knowledge Base
- http://kb.mozillazine.org/Network.http.max-connections
- */
- queue.process('http', 256, http);
-};
diff --git a/src/queue.ts b/src/queue.ts
deleted file mode 100644
index 08ea13c2a3..0000000000
--- a/src/queue.ts
+++ /dev/null
@@ -1,10 +0,0 @@
-import { createQueue } from 'kue';
-import config from './config';
-
-export default createQueue({
- redis: {
- port: config.redis.port,
- host: config.redis.host,
- auth: config.redis.pass
- }
-});
diff --git a/src/queue/index.ts b/src/queue/index.ts
new file mode 100644
index 0000000000..f90754a561
--- /dev/null
+++ b/src/queue/index.ts
@@ -0,0 +1,38 @@
+import { createQueue } from 'kue';
+import config from '../config';
+import db from './processors/db';
+import http from './processors/http';
+
+const queue = createQueue({
+ redis: {
+ port: config.redis.port,
+ host: config.redis.host,
+ auth: config.redis.pass
+ }
+});
+
+export function createHttp(data) {
+ return queue
+ .create('http', data)
+ .attempts(16)
+ .backoff({ delay: 16384, type: 'exponential' });
+}
+
+export function createDb(data) {
+ return queue.create('db', data);
+}
+
+export function process() {
+ queue.process('db', db);
+
+ /*
+ 256 is the default concurrency limit of Mozilla Firefox and Google
+ Chromium.
+
+ a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
+ https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
+ Network.http.max-connections - MozillaZine Knowledge Base
+ http://kb.mozillazine.org/Network.http.max-connections
+ */
+ queue.process('http', 256, http);
+}
diff --git a/src/processor/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts
index 879c41ec9c..6de21eb053 100644
--- a/src/processor/db/delete-post-dependents.ts
+++ b/src/queue/processors/db/delete-post-dependents.ts
@@ -1,9 +1,9 @@
-import Favorite from '../../models/favorite';
-import Notification from '../../models/notification';
-import PollVote from '../../models/poll-vote';
-import PostReaction from '../../models/post-reaction';
-import PostWatching from '../../models/post-watching';
-import Post from '../../models/post';
+import Favorite from '../../../models/favorite';
+import Notification from '../../../models/notification';
+import PollVote from '../../../models/poll-vote';
+import PostReaction from '../../../models/post-reaction';
+import PostWatching from '../../../models/post-watching';
+import Post from '../../../models/post';
export default async ({ data }) => Promise.all([
Favorite.remove({ postId: data._id }),
diff --git a/src/processor/db/index.ts b/src/queue/processors/db/index.ts
index 75838c099b..75838c099b 100644
--- a/src/processor/db/index.ts
+++ b/src/queue/processors/db/index.ts
diff --git a/src/processor/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts
index 48ad4f95a1..e743fc5f68 100644
--- a/src/processor/http/deliver-post.ts
+++ b/src/queue/processors/http/deliver-post.ts
@@ -1,9 +1,9 @@
-import Post from '../../models/post';
-import User, { IRemoteUser } from '../../models/user';
-import context from '../../remote/activitypub/renderer/context';
-import renderCreate from '../../remote/activitypub/renderer/create';
-import renderNote from '../../remote/activitypub/renderer/note';
-import request from '../../remote/request';
+import Post from '../../../models/post';
+import User, { IRemoteUser } from '../../../models/user';
+import context from '../../../remote/activitypub/renderer/context';
+import renderCreate from '../../../remote/activitypub/renderer/create';
+import renderNote from '../../../remote/activitypub/renderer/note';
+import request from '../../../remote/request';
export default async ({ data }) => {
const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>;
diff --git a/src/processor/http/follow.ts b/src/queue/processors/http/follow.ts
index ed36fa18d4..4cb72828e7 100644
--- a/src/processor/http/follow.ts
+++ b/src/queue/processors/http/follow.ts
@@ -1,13 +1,13 @@
-import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../models/user';
-import Following from '../../models/following';
-import FollowingLog from '../../models/following-log';
-import FollowedLog from '../../models/followed-log';
-import event from '../../publishers/stream';
-import notify from '../../publishers/notify';
-import context from '../../remote/activitypub/renderer/context';
-import render from '../../remote/activitypub/renderer/follow';
-import request from '../../remote/request';
-import Logger from '../../utils/logger';
+import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user';
+import Following from '../../../models/following';
+import FollowingLog from '../../../models/following-log';
+import FollowedLog from '../../../models/followed-log';
+import event from '../../../publishers/stream';
+import notify from '../../../publishers/notify';
+import context from '../../../remote/activitypub/renderer/context';
+import render from '../../../remote/activitypub/renderer/follow';
+import request from '../../../remote/request';
+import Logger from '../../../utils/logger';
export default async ({ data }) => {
const { followerId, followeeId } = await Following.findOne({ _id: data.following });
diff --git a/src/processor/http/index.ts b/src/queue/processors/http/index.ts
index 8f9aa717c3..8f9aa717c3 100644
--- a/src/processor/http/index.ts
+++ b/src/queue/processors/http/index.ts
diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts
new file mode 100644
index 0000000000..7b84400d5c
--- /dev/null
+++ b/src/queue/processors/http/perform-activitypub.ts
@@ -0,0 +1,7 @@
+import User from '../../../models/user';
+import act from '../../../remote/activitypub/act';
+import Resolver from '../../../remote/activitypub/resolver';
+
+export default ({ data }) => User.findOne({ _id: data.actor })
+ .then(actor => act(new Resolver(), actor, data.outbox))
+ .then(Promise.all);
diff --git a/src/processor/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts
index f102f8d6b4..de1dbd2f98 100644
--- a/src/processor/http/process-inbox.ts
+++ b/src/queue/processors/http/process-inbox.ts
@@ -1,9 +1,9 @@
import { verifySignature } from 'http-signature';
-import parseAcct from '../../acct/parse';
-import User, { IRemoteUser } from '../../models/user';
-import act from '../../remote/activitypub/act';
-import resolvePerson from '../../remote/activitypub/resolve-person';
-import Resolver from '../../remote/activitypub/resolver';
+import parseAcct from '../../../acct/parse';
+import User, { IRemoteUser } from '../../../models/user';
+import act from '../../../remote/activitypub/act';
+import resolvePerson from '../../../remote/activitypub/resolve-person';
+import Resolver from '../../../remote/activitypub/resolver';
export default async ({ data }): Promise<void> => {
const keyIdLower = data.signature.keyId.toLowerCase();
diff --git a/src/processor/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts
index 4f6f5ccee5..21683ba3c2 100644
--- a/src/processor/http/report-github-failure.ts
+++ b/src/queue/processors/http/report-github-failure.ts
@@ -1,6 +1,6 @@
import * as request from 'request-promise-native';
-import User from '../../models/user';
-const createPost = require('../../server/api/endpoints/posts/create');
+import User from '../../../models/user';
+const createPost = require('../../../server/api/endpoints/posts/create');
export default async ({ data }) => {
const asyncBot = User.findOne({ _id: data.userId });
diff --git a/src/processor/http/unfollow.ts b/src/queue/processors/http/unfollow.ts
index fbfd7b3420..801a3612a7 100644
--- a/src/processor/http/unfollow.ts
+++ b/src/queue/processors/http/unfollow.ts
@@ -1,13 +1,13 @@
-import FollowedLog from '../../models/followed-log';
-import Following from '../../models/following';
-import FollowingLog from '../../models/following-log';
-import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../models/user';
-import stream from '../../publishers/stream';
-import renderFollow from '../../remote/activitypub/renderer/follow';
-import renderUndo from '../../remote/activitypub/renderer/undo';
-import context from '../../remote/activitypub/renderer/context';
-import request from '../../remote/request';
-import Logger from '../../utils/logger';
+import FollowedLog from '../../../models/followed-log';
+import Following from '../../../models/following';
+import FollowingLog from '../../../models/following-log';
+import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user';
+import stream from '../../../publishers/stream';
+import renderFollow from '../../../remote/activitypub/renderer/follow';
+import renderUndo from '../../../remote/activitypub/renderer/undo';
+import context from '../../../remote/activitypub/renderer/context';
+import request from '../../../remote/request';
+import Logger from '../../../utils/logger';
export default async ({ data }) => {
const following = await Following.findOne({ _id: data.id });
diff --git a/src/remote/activitypub/act/follow.ts b/src/remote/activitypub/act/follow.ts
index 23fa41df8e..222a257e1a 100644
--- a/src/remote/activitypub/act/follow.ts
+++ b/src/remote/activitypub/act/follow.ts
@@ -3,7 +3,7 @@ import parseAcct from '../../../acct/parse';
import Following, { IFollowing } from '../../../models/following';
import User from '../../../models/user';
import config from '../../../config';
-import queue from '../../../queue';
+import { createHttp } from '../../../queue';
import context from '../renderer/context';
import renderAccept from '../renderer/accept';
import request from '../../request';
@@ -44,7 +44,7 @@ export default async (resolver: Resolver, actor, activity, distribute) => {
followerId: actor._id,
followeeId: followee._id
}).then(following => new Promise((resolve, reject) => {
- queue.create('http', {
+ createHttp({
type: 'follow',
following: following._id
}).save(error => {
diff --git a/src/remote/activitypub/act/undo/unfollow.ts b/src/remote/activitypub/act/undo/unfollow.ts
index c17e06e8a9..4f15d9a3e4 100644
--- a/src/remote/activitypub/act/undo/unfollow.ts
+++ b/src/remote/activitypub/act/undo/unfollow.ts
@@ -1,7 +1,7 @@
-import queue from '../../../../queue';
+import { createHttp } from '../../../../queue';
export default ({ $id }) => new Promise((resolve, reject) => {
- queue.create('http', { type: 'unfollow', id: $id }).save(error => {
+ createHttp({ type: 'unfollow', id: $id }).save(error => {
if (error) {
reject(error);
} else {
diff --git a/src/remote/activitypub/delete/post.ts b/src/remote/activitypub/delete/post.ts
index f6c816647d..59ae8c2b94 100644
--- a/src/remote/activitypub/delete/post.ts
+++ b/src/remote/activitypub/delete/post.ts
@@ -1,10 +1,10 @@
import Post from '../../../models/post';
-import queue from '../../../queue';
+import { createDb } from '../../../queue';
export default async ({ $id }) => {
const promisedDeletion = Post.findOneAndDelete({ _id: $id });
- await new Promise((resolve, reject) => queue.create('db', {
+ await new Promise((resolve, reject) => createDb({
type: 'deletePostDependents',
id: $id
}).delay(65536).save(error => error ? reject(error) : resolve()));
diff --git a/src/remote/activitypub/resolve-person.ts b/src/remote/activitypub/resolve-person.ts
index 59be65908e..2cf3ad32d8 100644
--- a/src/remote/activitypub/resolve-person.ts
+++ b/src/remote/activitypub/resolve-person.ts
@@ -1,7 +1,7 @@
import { JSDOM } from 'jsdom';
import { toUnicode } from 'punycode';
import User, { validateUsername, isValidName, isValidDescription } from '../../models/user';
-import queue from '../../queue';
+import { createHttp } from '../../queue';
import webFinger from '../webfinger';
import create from './create';
import Resolver from './resolver';
@@ -69,7 +69,7 @@ export default async (value, verifier?: string) => {
},
});
- queue.create('http', {
+ createHttp({
type: 'performActivityPub',
actor: user._id,
outbox
diff --git a/src/server/activitypub/inbox.ts b/src/server/activitypub/inbox.ts
index 5de8433850..0907823b23 100644
--- a/src/server/activitypub/inbox.ts
+++ b/src/server/activitypub/inbox.ts
@@ -1,7 +1,7 @@
import * as bodyParser from 'body-parser';
import * as express from 'express';
import { parseRequest } from 'http-signature';
-import queue from '../../queue';
+import { createHttp } from '../../queue';
const app = express();
@@ -22,7 +22,7 @@ app.post('/@:user/inbox', bodyParser.json({
return res.sendStatus(401);
}
- queue.create('http', {
+ createHttp({
type: 'processInbox',
inbox: req.body,
signature,
diff --git a/src/server/api/endpoints/following/create.ts b/src/server/api/endpoints/following/create.ts
index e568595215..9ccbe20171 100644
--- a/src/server/api/endpoints/following/create.ts
+++ b/src/server/api/endpoints/following/create.ts
@@ -4,7 +4,7 @@
import $ from 'cafy';
import User from '../../../../models/user';
import Following from '../../../../models/following';
-import queue from '../../../../queue';
+import { createHttp } from '../../../../queue';
/**
* Follow a user
@@ -56,7 +56,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
followeeId: followee._id
});
- queue.create('http', { type: 'follow', following: _id }).save();
+ createHttp({ type: 'follow', following: _id }).save();
// Send response
res();
diff --git a/src/server/api/endpoints/following/delete.ts b/src/server/api/endpoints/following/delete.ts
index bf21bf0cb7..0684b87504 100644
--- a/src/server/api/endpoints/following/delete.ts
+++ b/src/server/api/endpoints/following/delete.ts
@@ -4,7 +4,7 @@
import $ from 'cafy';
import User from '../../../../models/user';
import Following from '../../../../models/following';
-import queue from '../../../../queue';
+import { createHttp } from '../../../../queue';
/**
* Unfollow a user
@@ -49,7 +49,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
return rej('already not following');
}
- queue.create('http', {
+ createHttp({
type: 'unfollow',
id: exist._id
}).save(error => {
diff --git a/src/server/api/service/github.ts b/src/server/api/service/github.ts
index 4fd59c2a94..5fc4a92f57 100644
--- a/src/server/api/service/github.ts
+++ b/src/server/api/service/github.ts
@@ -3,7 +3,7 @@ import * as express from 'express';
//const crypto = require('crypto');
import User from '../../../models/user';
import config from '../../../config';
-import queue from '../../../queue';
+import { createHttp } from '../../../queue';
module.exports = async (app: express.Application) => {
if (config.github_bot == null) return;
@@ -42,7 +42,7 @@ module.exports = async (app: express.Application) => {
const commit = event.commit;
const parent = commit.parents[0];
- queue.create('http', {
+ createHttp({
type: 'gitHubFailureReport',
userId: bot._id,
parentUrl: parent.url,