summaryrefslogtreecommitdiff
path: root/src/server/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/api')
-rw-r--r--src/server/api/stream/index.ts6
-rw-r--r--src/server/api/streaming.ts32
2 files changed, 33 insertions, 5 deletions
diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts
index 838d847004..afedd4362c 100644
--- a/src/server/api/stream/index.ts
+++ b/src/server/api/stream/index.ts
@@ -1,6 +1,5 @@
import autobind from 'autobind-decorator';
import * as websocket from 'websocket';
-import Xev from 'xev';
import * as debug from 'debug';
import User, { IUser } from '../../../models/user';
@@ -11,6 +10,7 @@ import readNote from '../../../services/note/read';
import Channel from './channel';
import channels from './channels';
+import { EventEmitter } from 'events';
const log = debug('misskey');
@@ -21,14 +21,14 @@ export default class Connection {
public user?: IUser;
public app: IApp;
private wsConnection: websocket.connection;
- public subscriber: Xev;
+ public subscriber: EventEmitter;
private channels: Channel[] = [];
private subscribingNotes: any = {};
public sendMessageToWsOverride: any = null; // 後方互換性のため
constructor(
wsConnection: websocket.connection,
- subscriber: Xev,
+ subscriber: EventEmitter,
user: IUser,
app: IApp
) {
diff --git a/src/server/api/streaming.ts b/src/server/api/streaming.ts
index a0a219a317..249157e222 100644
--- a/src/server/api/streaming.ts
+++ b/src/server/api/streaming.ts
@@ -1,11 +1,14 @@
import * as http from 'http';
import * as websocket from 'websocket';
+import * as redis from 'redis';
import Xev from 'xev';
import MainStreamConnection from './stream';
import { ParsedUrlQuery } from 'querystring';
import authenticate from './authenticate';
import channels from './stream/channels';
+import { EventEmitter } from 'events';
+import config from '../../config';
module.exports = (server: http.Server) => {
// Init websocket server
@@ -16,11 +19,36 @@ module.exports = (server: http.Server) => {
ws.on('request', async (request) => {
const connection = request.accept();
- const ev = new Xev();
-
const q = request.resourceURL.query as ParsedUrlQuery;
const [user, app] = await authenticate(q.i as string);
+ let ev: EventEmitter;
+
+ if (config.redis) {
+ // Connect to Redis
+ const subscriber = redis.createClient(
+ config.redis.port, config.redis.host);
+
+ subscriber.subscribe('misskey');
+
+ ev = new EventEmitter();
+
+ subscriber.on('message', async (_, data) => {
+ const obj = JSON.parse(data);
+
+ console.log(obj);
+
+ ev.emit(obj.channel, obj.message);
+ });
+
+ connection.once('close', () => {
+ subscriber.unsubscribe();
+ subscriber.quit();
+ });
+ } else {
+ ev = new Xev();
+ }
+
const main = new MainStreamConnection(connection, ev, user, app);
// 後方互換性のため