From ad38cd2605a7cd857618a5498651d5b10e7b1bfc Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Mon, 2 Apr 2018 13:33:46 +0900 Subject: Introduce publishers directory --- src/drive/add-file.ts | 2 +- src/event.ts | 80 ---------------------- src/notify.ts | 50 -------------- src/processor/http/follow.ts | 4 +- src/publishers/notify.ts | 50 ++++++++++++++ src/publishers/push-sw.ts | 52 ++++++++++++++ src/publishers/stream.ts | 73 ++++++++++++++++++++ src/push-sw.ts | 52 -------------- src/server/api/common/read-messaging-message.ts | 6 +- src/server/api/common/read-notification.ts | 2 +- src/server/api/endpoints/drive/files/update.ts | 2 +- src/server/api/endpoints/drive/folders/create.ts | 2 +- src/server/api/endpoints/drive/folders/update.ts | 2 +- src/server/api/endpoints/following/delete.ts | 2 +- src/server/api/endpoints/i/regenerate_token.ts | 2 +- src/server/api/endpoints/i/update.ts | 2 +- .../api/endpoints/i/update_client_setting.ts | 2 +- src/server/api/endpoints/i/update_home.ts | 2 +- src/server/api/endpoints/i/update_mobile_home.ts | 2 +- .../api/endpoints/messaging/messages/create.ts | 5 +- .../endpoints/notifications/mark_as_read_all.ts | 2 +- src/server/api/endpoints/othello/match.ts | 2 +- src/server/api/endpoints/posts/create.ts | 15 ++-- src/server/api/endpoints/posts/polls/vote.ts | 4 +- src/server/api/endpoints/posts/reactions/create.ts | 5 +- src/server/api/endpoints/posts/reactions/delete.ts | 2 +- src/server/api/private/signin.ts | 2 +- src/server/api/service/twitter.ts | 2 +- src/server/api/stream/othello-game.ts | 2 +- src/server/api/stream/othello.ts | 2 +- 30 files changed, 214 insertions(+), 218 deletions(-) delete mode 100644 src/event.ts delete mode 100644 src/notify.ts create mode 100644 src/publishers/notify.ts create mode 100644 src/publishers/push-sw.ts create mode 100644 src/publishers/stream.ts delete mode 100644 src/push-sw.ts (limited to 'src') diff --git a/src/drive/add-file.ts b/src/drive/add-file.ts index db13e04be0..4a718a9da0 100644 --- a/src/drive/add-file.ts +++ b/src/drive/add-file.ts @@ -13,7 +13,7 @@ import prominence = require('prominence'); import DriveFile, { getGridFSBucket } from '../models/drive-file'; import DriveFolder from '../models/drive-folder'; import { pack } from '../models/drive-file'; -import event, { publishDriveStream } from '../event'; +import event, { publishDriveStream } from '../publishers/stream'; import getAcct from '../user/get-acct'; import config from '../config'; diff --git a/src/event.ts b/src/event.ts deleted file mode 100644 index 81876b3cf4..0000000000 --- a/src/event.ts +++ /dev/null @@ -1,80 +0,0 @@ -import * as mongo from 'mongodb'; -import * as redis from 'redis'; -import swPush from './push-sw'; -import config from './config'; - -type ID = string | mongo.ObjectID; - -class MisskeyEvent { - private redisClient: redis.RedisClient; - - constructor() { - // Connect to Redis - this.redisClient = redis.createClient( - config.redis.port, config.redis.host); - } - - public publishUserStream(userId: ID, type: string, value?: any): void { - this.publish(`user-stream:${userId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishSw(userId: ID, type: string, value?: any): void { - swPush(userId, type, value); - } - - public publishDriveStream(userId: ID, type: string, value?: any): void { - this.publish(`drive-stream:${userId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishPostStream(postId: ID, type: string, value?: any): void { - this.publish(`post-stream:${postId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishMessagingStream(userId: ID, otherpartyId: ID, type: string, value?: any): void { - this.publish(`messaging-stream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishMessagingIndexStream(userId: ID, type: string, value?: any): void { - this.publish(`messaging-index-stream:${userId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishOthelloStream(userId: ID, type: string, value?: any): void { - this.publish(`othello-stream:${userId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishOthelloGameStream(gameId: ID, type: string, value?: any): void { - this.publish(`othello-game-stream:${gameId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishChannelStream(channelId: ID, type: string, value?: any): void { - this.publish(`channel-stream:${channelId}`, type, typeof value === 'undefined' ? null : value); - } - - private publish(channel: string, type: string, value?: any): void { - const message = value == null ? - { type: type } : - { type: type, body: value }; - - this.redisClient.publish(`misskey:${channel}`, JSON.stringify(message)); - } -} - -const ev = new MisskeyEvent(); - -export default ev.publishUserStream.bind(ev); - -export const pushSw = ev.publishSw.bind(ev); - -export const publishDriveStream = ev.publishDriveStream.bind(ev); - -export const publishPostStream = ev.publishPostStream.bind(ev); - -export const publishMessagingStream = ev.publishMessagingStream.bind(ev); - -export const publishMessagingIndexStream = ev.publishMessagingIndexStream.bind(ev); - -export const publishOthelloStream = ev.publishOthelloStream.bind(ev); - -export const publishOthelloGameStream = ev.publishOthelloGameStream.bind(ev); - -export const publishChannelStream = ev.publishChannelStream.bind(ev); diff --git a/src/notify.ts b/src/notify.ts deleted file mode 100644 index 228317b88f..0000000000 --- a/src/notify.ts +++ /dev/null @@ -1,50 +0,0 @@ -import * as mongo from 'mongodb'; -import Notification from './models/notification'; -import Mute from './models/mute'; -import event from './event'; -import { pack } from './models/notification'; - -export default ( - notifiee: mongo.ObjectID, - notifier: mongo.ObjectID, - type: string, - content?: any -) => new Promise(async (resolve, reject) => { - if (notifiee.equals(notifier)) { - return resolve(); - } - - // Create notification - const notification = await Notification.insert(Object.assign({ - createdAt: new Date(), - notifieeId: notifiee, - notifierId: notifier, - type: type, - isRead: false - }, content)); - - resolve(notification); - - // Publish notification event - event(notifiee, 'notification', - await pack(notification)); - - // 3秒経っても(今回作成した)通知が既読にならなかったら「未読の通知がありますよ」イベントを発行する - setTimeout(async () => { - const fresh = await Notification.findOne({ _id: notification._id }, { isRead: true }); - if (!fresh.isRead) { - //#region ただしミュートしているユーザーからの通知なら無視 - const mute = await Mute.find({ - muterId: notifiee, - deletedAt: { $exists: false } - }); - const mutedUserIds = mute.map(m => m.muteeId.toString()); - if (mutedUserIds.indexOf(notifier.toString()) != -1) { - return; - } - //#endregion - - event(notifiee, 'unread_notification', await pack(notification)); - } - }, 3000); -}); diff --git a/src/processor/http/follow.ts b/src/processor/http/follow.ts index d6ce00006f..a7e4fa23d5 100644 --- a/src/processor/http/follow.ts +++ b/src/processor/http/follow.ts @@ -3,8 +3,8 @@ import { sign } from 'http-signature'; import { URL } from 'url'; import User, { isLocalUser, pack as packUser, ILocalUser } from '../../models/user'; import Following from '../../models/following'; -import event from '../../event'; -import notify from '../../notify'; +import event from '../../publishers/stream'; +import notify from '../../publishers/notify'; import context from '../../remote/activitypub/renderer/context'; import render from '../../remote/activitypub/renderer/follow'; import config from '../../config'; diff --git a/src/publishers/notify.ts b/src/publishers/notify.ts new file mode 100644 index 0000000000..2b89515d42 --- /dev/null +++ b/src/publishers/notify.ts @@ -0,0 +1,50 @@ +import * as mongo from 'mongodb'; +import Notification from '../models/notification'; +import Mute from '../models/mute'; +import { pack } from '../models/notification'; +import stream from './stream'; + +export default ( + notifiee: mongo.ObjectID, + notifier: mongo.ObjectID, + type: string, + content?: any +) => new Promise(async (resolve, reject) => { + if (notifiee.equals(notifier)) { + return resolve(); + } + + // Create notification + const notification = await Notification.insert(Object.assign({ + createdAt: new Date(), + notifieeId: notifiee, + notifierId: notifier, + type: type, + isRead: false + }, content)); + + resolve(notification); + + // Publish notification event + stream(notifiee, 'notification', + await pack(notification)); + + // 3秒経っても(今回作成した)通知が既読にならなかったら「未読の通知がありますよ」イベントを発行する + setTimeout(async () => { + const fresh = await Notification.findOne({ _id: notification._id }, { isRead: true }); + if (!fresh.isRead) { + //#region ただしミュートしているユーザーからの通知なら無視 + const mute = await Mute.find({ + muterId: notifiee, + deletedAt: { $exists: false } + }); + const mutedUserIds = mute.map(m => m.muteeId.toString()); + if (mutedUserIds.indexOf(notifier.toString()) != -1) { + return; + } + //#endregion + + stream(notifiee, 'unread_notification', await pack(notification)); + } + }, 3000); +}); diff --git a/src/publishers/push-sw.ts b/src/publishers/push-sw.ts new file mode 100644 index 0000000000..aab91df62f --- /dev/null +++ b/src/publishers/push-sw.ts @@ -0,0 +1,52 @@ +const push = require('web-push'); +import * as mongo from 'mongodb'; +import Subscription from '../models/sw-subscription'; +import config from '../config'; + +if (config.sw) { + // アプリケーションの連絡先と、サーバーサイドの鍵ペアの情報を登録 + push.setVapidDetails( + config.maintainer.url, + config.sw.public_key, + config.sw.private_key); +} + +export default async function(userId: mongo.ObjectID | string, type, body?) { + if (!config.sw) return; + + if (typeof userId === 'string') { + userId = new mongo.ObjectID(userId); + } + + // Fetch + const subscriptions = await Subscription.find({ + userId: userId + }); + + subscriptions.forEach(subscription => { + const pushSubscription = { + endpoint: subscription.endpoint, + keys: { + auth: subscription.auth, + p256dh: subscription.publickey + } + }; + + push.sendNotification(pushSubscription, JSON.stringify({ + type, body + })).catch(err => { + //console.log(err.statusCode); + //console.log(err.headers); + //console.log(err.body); + + if (err.statusCode == 410) { + Subscription.remove({ + userId: userId, + endpoint: subscription.endpoint, + auth: subscription.auth, + publickey: subscription.publickey + }); + } + }); + }); +} diff --git a/src/publishers/stream.ts b/src/publishers/stream.ts new file mode 100644 index 0000000000..498ff33f31 --- /dev/null +++ b/src/publishers/stream.ts @@ -0,0 +1,73 @@ +import * as mongo from 'mongodb'; +import * as redis from 'redis'; +import config from '../config'; + +type ID = string | mongo.ObjectID; + +class MisskeyEvent { + private redisClient: redis.RedisClient; + + constructor() { + // Connect to Redis + this.redisClient = redis.createClient( + config.redis.port, config.redis.host); + } + + public publishUserStream(userId: ID, type: string, value?: any): void { + this.publish(`user-stream:${userId}`, type, typeof value === 'undefined' ? null : value); + } + + public publishDriveStream(userId: ID, type: string, value?: any): void { + this.publish(`drive-stream:${userId}`, type, typeof value === 'undefined' ? null : value); + } + + public publishPostStream(postId: ID, type: string, value?: any): void { + this.publish(`post-stream:${postId}`, type, typeof value === 'undefined' ? null : value); + } + + public publishMessagingStream(userId: ID, otherpartyId: ID, type: string, value?: any): void { + this.publish(`messaging-stream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value); + } + + public publishMessagingIndexStream(userId: ID, type: string, value?: any): void { + this.publish(`messaging-index-stream:${userId}`, type, typeof value === 'undefined' ? null : value); + } + + public publishOthelloStream(userId: ID, type: string, value?: any): void { + this.publish(`othello-stream:${userId}`, type, typeof value === 'undefined' ? null : value); + } + + public publishOthelloGameStream(gameId: ID, type: string, value?: any): void { + this.publish(`othello-game-stream:${gameId}`, type, typeof value === 'undefined' ? null : value); + } + + public publishChannelStream(channelId: ID, type: string, value?: any): void { + this.publish(`channel-stream:${channelId}`, type, typeof value === 'undefined' ? null : value); + } + + private publish(channel: string, type: string, value?: any): void { + const message = value == null ? + { type: type } : + { type: type, body: value }; + + this.redisClient.publish(`misskey:${channel}`, JSON.stringify(message)); + } +} + +const ev = new MisskeyEvent(); + +export default ev.publishUserStream.bind(ev); + +export const publishDriveStream = ev.publishDriveStream.bind(ev); + +export const publishPostStream = ev.publishPostStream.bind(ev); + +export const publishMessagingStream = ev.publishMessagingStream.bind(ev); + +export const publishMessagingIndexStream = ev.publishMessagingIndexStream.bind(ev); + +export const publishOthelloStream = ev.publishOthelloStream.bind(ev); + +export const publishOthelloGameStream = ev.publishOthelloGameStream.bind(ev); + +export const publishChannelStream = ev.publishChannelStream.bind(ev); diff --git a/src/push-sw.ts b/src/push-sw.ts deleted file mode 100644 index fcef7796d9..0000000000 --- a/src/push-sw.ts +++ /dev/null @@ -1,52 +0,0 @@ -const push = require('web-push'); -import * as mongo from 'mongodb'; -import Subscription from './models/sw-subscription'; -import config from './config'; - -if (config.sw) { - // アプリケーションの連絡先と、サーバーサイドの鍵ペアの情報を登録 - push.setVapidDetails( - config.maintainer.url, - config.sw.public_key, - config.sw.private_key); -} - -export default async function(userId: mongo.ObjectID | string, type, body?) { - if (!config.sw) return; - - if (typeof userId === 'string') { - userId = new mongo.ObjectID(userId); - } - - // Fetch - const subscriptions = await Subscription.find({ - userId: userId - }); - - subscriptions.forEach(subscription => { - const pushSubscription = { - endpoint: subscription.endpoint, - keys: { - auth: subscription.auth, - p256dh: subscription.publickey - } - }; - - push.sendNotification(pushSubscription, JSON.stringify({ - type, body - })).catch(err => { - //console.log(err.statusCode); - //console.log(err.headers); - //console.log(err.body); - - if (err.statusCode == 410) { - Subscription.remove({ - userId: userId, - endpoint: subscription.endpoint, - auth: subscription.auth, - publickey: subscription.publickey - }); - } - }); - }); -} diff --git a/src/server/api/common/read-messaging-message.ts b/src/server/api/common/read-messaging-message.ts index 755cf1f502..c52f9363b5 100644 --- a/src/server/api/common/read-messaging-message.ts +++ b/src/server/api/common/read-messaging-message.ts @@ -1,9 +1,9 @@ import * as mongo from 'mongodb'; import Message from '../../../models/messaging-message'; import { IMessagingMessage as IMessage } from '../../../models/messaging-message'; -import publishUserStream from '../../../event'; -import { publishMessagingStream } from '../../../event'; -import { publishMessagingIndexStream } from '../../../event'; +import publishUserStream from '../../../publishers/stream'; +import { publishMessagingStream } from '../../../publishers/stream'; +import { publishMessagingIndexStream } from '../../../publishers/stream'; /** * Mark as read message(s) diff --git a/src/server/api/common/read-notification.ts b/src/server/api/common/read-notification.ts index b51c0ca004..9bd41519fb 100644 --- a/src/server/api/common/read-notification.ts +++ b/src/server/api/common/read-notification.ts @@ -1,6 +1,6 @@ import * as mongo from 'mongodb'; import { default as Notification, INotification } from '../../../models/notification'; -import publishUserStream from '../../../event'; +import publishUserStream from '../../../publishers/stream'; /** * Mark as read notification(s) diff --git a/src/server/api/endpoints/drive/files/update.ts b/src/server/api/endpoints/drive/files/update.ts index 85bd2110f2..c783ad8b3b 100644 --- a/src/server/api/endpoints/drive/files/update.ts +++ b/src/server/api/endpoints/drive/files/update.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import DriveFolder from '../../../../../models/drive-folder'; import DriveFile, { validateFileName, pack } from '../../../../../models/drive-file'; -import { publishDriveStream } from '../../../../../event'; +import { publishDriveStream } from '../../../../../publishers/stream'; /** * Update a file diff --git a/src/server/api/endpoints/drive/folders/create.ts b/src/server/api/endpoints/drive/folders/create.ts index d9d39a9184..f34d0019d7 100644 --- a/src/server/api/endpoints/drive/folders/create.ts +++ b/src/server/api/endpoints/drive/folders/create.ts @@ -3,7 +3,7 @@ */ import $ from 'cafy'; import DriveFolder, { isValidFolderName, pack } from '../../../../../models/drive-folder'; -import { publishDriveStream } from '../../../../../event'; +import { publishDriveStream } from '../../../../../publishers/stream'; /** * Create drive folder diff --git a/src/server/api/endpoints/drive/folders/update.ts b/src/server/api/endpoints/drive/folders/update.ts index 1cea05d71c..dd7e8f5c86 100644 --- a/src/server/api/endpoints/drive/folders/update.ts +++ b/src/server/api/endpoints/drive/folders/update.ts @@ -3,7 +3,7 @@ */ import $ from 'cafy'; import DriveFolder, { isValidFolderName, pack } from '../../../../../models/drive-folder'; -import { publishDriveStream } from '../../../../../event'; +import { publishDriveStream } from '../../../../../publishers/stream'; /** * Update a folder diff --git a/src/server/api/endpoints/following/delete.ts b/src/server/api/endpoints/following/delete.ts index 77a6cebee0..3facfdcdd4 100644 --- a/src/server/api/endpoints/following/delete.ts +++ b/src/server/api/endpoints/following/delete.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import User, { pack as packUser } from '../../../../models/user'; import Following from '../../../../models/following'; -import event from '../../../../event'; +import event from '../../../../publishers/stream'; /** * Unfollow a user diff --git a/src/server/api/endpoints/i/regenerate_token.ts b/src/server/api/endpoints/i/regenerate_token.ts index d7cb697848..9aa6725f8c 100644 --- a/src/server/api/endpoints/i/regenerate_token.ts +++ b/src/server/api/endpoints/i/regenerate_token.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import * as bcrypt from 'bcryptjs'; import User from '../../../../models/user'; -import event from '../../../../event'; +import event from '../../../../publishers/stream'; import generateUserToken from '../../common/generate-native-user-token'; /** diff --git a/src/server/api/endpoints/i/update.ts b/src/server/api/endpoints/i/update.ts index c4ec413399..279b062f52 100644 --- a/src/server/api/endpoints/i/update.ts +++ b/src/server/api/endpoints/i/update.ts @@ -3,7 +3,7 @@ */ import $ from 'cafy'; import User, { isValidName, isValidDescription, isValidLocation, isValidBirthday, pack } from '../../../../models/user'; -import event from '../../../../event'; +import event from '../../../../publishers/stream'; import config from '../../../../config'; /** diff --git a/src/server/api/endpoints/i/update_client_setting.ts b/src/server/api/endpoints/i/update_client_setting.ts index 263ac6d07d..10741aceba 100644 --- a/src/server/api/endpoints/i/update_client_setting.ts +++ b/src/server/api/endpoints/i/update_client_setting.ts @@ -3,7 +3,7 @@ */ import $ from 'cafy'; import User, { pack } from '../../../../models/user'; -import event from '../../../../event'; +import event from '../../../../publishers/stream'; /** * Update myself diff --git a/src/server/api/endpoints/i/update_home.ts b/src/server/api/endpoints/i/update_home.ts index 227b8dd5a4..91be0714d7 100644 --- a/src/server/api/endpoints/i/update_home.ts +++ b/src/server/api/endpoints/i/update_home.ts @@ -3,7 +3,7 @@ */ import $ from 'cafy'; import User from '../../../../models/user'; -import event from '../../../../event'; +import event from '../../../../publishers/stream'; module.exports = async (params, user) => new Promise(async (res, rej) => { // Get 'home' parameter diff --git a/src/server/api/endpoints/i/update_mobile_home.ts b/src/server/api/endpoints/i/update_mobile_home.ts index 007eb2eab3..1efda120d5 100644 --- a/src/server/api/endpoints/i/update_mobile_home.ts +++ b/src/server/api/endpoints/i/update_mobile_home.ts @@ -3,7 +3,7 @@ */ import $ from 'cafy'; import User from '../../../../models/user'; -import event from '../../../../event'; +import event from '../../../../publishers/stream'; module.exports = async (params, user) => new Promise(async (res, rej) => { // Get 'home' parameter diff --git a/src/server/api/endpoints/messaging/messages/create.ts b/src/server/api/endpoints/messaging/messages/create.ts index 604da32d2e..085e75e6cf 100644 --- a/src/server/api/endpoints/messaging/messages/create.ts +++ b/src/server/api/endpoints/messaging/messages/create.ts @@ -9,8 +9,9 @@ import User from '../../../../../models/user'; import Mute from '../../../../../models/mute'; import DriveFile from '../../../../../models/drive-file'; import { pack } from '../../../../../models/messaging-message'; -import publishUserStream from '../../../../../event'; -import { publishMessagingStream, publishMessagingIndexStream, pushSw } from '../../../../../event'; +import publishUserStream from '../../../../../publishers/stream'; +import { publishMessagingStream, publishMessagingIndexStream } from '../../../../../publishers/stream'; +import pushSw from '../../../../../publishers/push-sw'; import html from '../../../../../text/html'; import parse from '../../../../../text/parse'; import config from '../../../../../config'; diff --git a/src/server/api/endpoints/notifications/mark_as_read_all.ts b/src/server/api/endpoints/notifications/mark_as_read_all.ts index 3ba00a9070..01c9145837 100644 --- a/src/server/api/endpoints/notifications/mark_as_read_all.ts +++ b/src/server/api/endpoints/notifications/mark_as_read_all.ts @@ -2,7 +2,7 @@ * Module dependencies */ import Notification from '../../../../models/notification'; -import event from '../../../../event'; +import event from '../../../../publishers/stream'; /** * Mark as read all notifications diff --git a/src/server/api/endpoints/othello/match.ts b/src/server/api/endpoints/othello/match.ts index 1fc46ab908..d9274f8f9c 100644 --- a/src/server/api/endpoints/othello/match.ts +++ b/src/server/api/endpoints/othello/match.ts @@ -2,7 +2,7 @@ import $ from 'cafy'; import Matching, { pack as packMatching } from '../../../../models/othello-matching'; import OthelloGame, { pack as packGame } from '../../../../models/othello-game'; import User from '../../../../models/user'; -import publishUserStream, { publishOthelloStream } from '../../../../event'; +import publishUserStream, { publishOthelloStream } from '../../../../publishers/stream'; import { eighteight } from '../../../../othello/maps'; module.exports = (params, user) => new Promise(async (res, rej) => { diff --git a/src/server/api/endpoints/posts/create.ts b/src/server/api/endpoints/posts/create.ts index bf08fe2839..34a3aa1901 100644 --- a/src/server/api/endpoints/posts/create.ts +++ b/src/server/api/endpoints/posts/create.ts @@ -15,8 +15,9 @@ import Watching from '../../../../models/post-watching'; import ChannelWatching from '../../../../models/channel-watching'; import { pack } from '../../../../models/post'; import watch from '../../common/watch-post'; -import event, { pushSw, publishChannelStream } from '../../../../event'; -import notify from '../../../../notify'; +import stream, { publishChannelStream } from '../../../../publishers/stream'; +import notify from '../../../../publishers/notify'; +import pushSw from '../../../../publishers/push-sw'; import getAcct from '../../../../user/get-acct'; import parseAcct from '../../../../user/parse-acct'; import config from '../../../../config'; @@ -306,7 +307,7 @@ module.exports = (params, user: ILocalUser, app) => new Promise(async (res, rej) }); const mentioneesMutedUserIds = mentioneeMutes.map(m => m.muteeId.toString()); if (mentioneesMutedUserIds.indexOf(user._id.toString()) == -1) { - event(mentionee, reason, postObj); + stream(mentionee, reason, postObj); pushSw(mentionee, reason, postObj); } } @@ -315,7 +316,7 @@ module.exports = (params, user: ILocalUser, app) => new Promise(async (res, rej) // タイムラインへの投稿 if (!channel) { // Publish event to myself's stream - event(user._id, 'post', postObj); + stream(user._id, 'post', postObj); // Fetch all followers const followers = await Following @@ -330,7 +331,7 @@ module.exports = (params, user: ILocalUser, app) => new Promise(async (res, rej) // Publish event to followers stream followers.forEach(following => - event(following.followerId, 'post', postObj)); + stream(following.followerId, 'post', postObj)); } // チャンネルへの投稿 @@ -354,7 +355,7 @@ module.exports = (params, user: ILocalUser, app) => new Promise(async (res, rej) // チャンネルの視聴者(のタイムライン)に配信 watches.forEach(w => { - event(w.userId, 'post', postObj); + stream(w.userId, 'post', postObj); }); } @@ -448,7 +449,7 @@ module.exports = (params, user: ILocalUser, app) => new Promise(async (res, rej) } else { // Publish event if (!user._id.equals(repost.userId)) { - event(repost.userId, 'repost', postObj); + stream(repost.userId, 'repost', postObj); } } diff --git a/src/server/api/endpoints/posts/polls/vote.ts b/src/server/api/endpoints/posts/polls/vote.ts index be1fd7b5d0..029fb93230 100644 --- a/src/server/api/endpoints/posts/polls/vote.ts +++ b/src/server/api/endpoints/posts/polls/vote.ts @@ -6,8 +6,8 @@ import Vote from '../../../../../models/poll-vote'; import Post from '../../../../../models/post'; import Watching from '../../../../../models/post-watching'; import watch from '../../../common/watch-post'; -import { publishPostStream } from '../../../../../event'; -import notify from '../../../../../notify'; +import { publishPostStream } from '../../../../../publishers/stream'; +import notify from '../../../../../publishers/notify'; /** * Vote poll of a post diff --git a/src/server/api/endpoints/posts/reactions/create.ts b/src/server/api/endpoints/posts/reactions/create.ts index 408b2483af..8b5f1e57d0 100644 --- a/src/server/api/endpoints/posts/reactions/create.ts +++ b/src/server/api/endpoints/posts/reactions/create.ts @@ -7,8 +7,9 @@ import Post, { pack as packPost } from '../../../../../models/post'; import { pack as packUser } from '../../../../../models/user'; import Watching from '../../../../../models/post-watching'; import watch from '../../../common/watch-post'; -import { publishPostStream, pushSw } from '../../../../../event'; -import notify from '../../../../../notify'; +import { publishPostStream } from '../../../../../publishers/stream'; +import notify from '../../../../../publishers/notify'; +import pushSw from '../../../../../publishers/push-sw'; /** * React to a post diff --git a/src/server/api/endpoints/posts/reactions/delete.ts b/src/server/api/endpoints/posts/reactions/delete.ts index 11f5c7dafa..3a88bbd7ca 100644 --- a/src/server/api/endpoints/posts/reactions/delete.ts +++ b/src/server/api/endpoints/posts/reactions/delete.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import Reaction from '../../../../../models/post-reaction'; import Post from '../../../../../models/post'; -// import event from '../../../event'; +// import event from '../../../publishers/stream'; /** * Unreact to a post diff --git a/src/server/api/private/signin.ts b/src/server/api/private/signin.ts index bf883ee27a..e0bd67d1ca 100644 --- a/src/server/api/private/signin.ts +++ b/src/server/api/private/signin.ts @@ -3,7 +3,7 @@ import * as bcrypt from 'bcryptjs'; import * as speakeasy from 'speakeasy'; import User, { ILocalUser } from '../../../models/user'; import Signin, { pack } from '../../../models/signin'; -import event from '../../../event'; +import event from '../../../publishers/stream'; import signin from '../common/signin'; import config from '../../../config'; diff --git a/src/server/api/service/twitter.ts b/src/server/api/service/twitter.ts index 69fa5f3c67..77b932b13b 100644 --- a/src/server/api/service/twitter.ts +++ b/src/server/api/service/twitter.ts @@ -6,7 +6,7 @@ import * as uuid from 'uuid'; import autwh from 'autwh'; import redis from '../../../db/redis'; import User, { pack } from '../../../models/user'; -import event from '../../../event'; +import event from '../../../publishers/stream'; import config from '../../../config'; import signin from '../common/signin'; diff --git a/src/server/api/stream/othello-game.ts b/src/server/api/stream/othello-game.ts index 6eb610331c..841e542610 100644 --- a/src/server/api/stream/othello-game.ts +++ b/src/server/api/stream/othello-game.ts @@ -2,7 +2,7 @@ import * as websocket from 'websocket'; import * as redis from 'redis'; import * as CRC32 from 'crc-32'; import OthelloGame, { pack } from '../../../models/othello-game'; -import { publishOthelloGameStream } from '../../../event'; +import { publishOthelloGameStream } from '../../../publishers/stream'; import Othello from '../../../othello/core'; import * as maps from '../../../othello/maps'; import { ParsedUrlQuery } from 'querystring'; diff --git a/src/server/api/stream/othello.ts b/src/server/api/stream/othello.ts index 4c292056da..fa62b05836 100644 --- a/src/server/api/stream/othello.ts +++ b/src/server/api/stream/othello.ts @@ -2,7 +2,7 @@ import * as mongo from 'mongodb'; import * as websocket from 'websocket'; import * as redis from 'redis'; import Matching, { pack } from '../../../models/othello-matching'; -import publishUserStream from '../../../event'; +import publishUserStream from '../../../publishers/stream'; export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { // Subscribe othello stream -- cgit v1.2.3-freya