summaryrefslogtreecommitdiff
path: root/src/server/api/stream/index.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/api/stream/index.ts')
-rw-r--r--src/server/api/stream/index.ts74
1 files changed, 56 insertions, 18 deletions
diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts
index 22f7646cb9..f73f3229d5 100644
--- a/src/server/api/stream/index.ts
+++ b/src/server/api/stream/index.ts
@@ -1,40 +1,50 @@
import autobind from 'autobind-decorator';
import * as websocket from 'websocket';
-
-import User, { IUser } from '../../../models/user';
-import readNotification from '../common/read-notification';
+import { readNotification } from '../common/read-notification';
import call from '../call';
-import { IApp } from '../../../models/app';
import readNote from '../../../services/note/read';
-
import Channel from './channel';
import channels from './channels';
import { EventEmitter } from 'events';
+import { User } from '../../../models/entities/user';
+import { App } from '../../../models/entities/app';
+import { Users, Followings, Mutings } from '../../../models';
/**
* Main stream connection
*/
export default class Connection {
- public user?: IUser;
- public app: IApp;
+ public user?: User;
+ public following: User['id'][] = [];
+ public muting: User['id'][] = [];
+ public app: App;
private wsConnection: websocket.connection;
public subscriber: EventEmitter;
private channels: Channel[] = [];
private subscribingNotes: any = {};
- public sendMessageToWsOverride: any = null; // 後方互換性のため
+ private followingClock: NodeJS.Timer;
+ private mutingClock: NodeJS.Timer;
constructor(
wsConnection: websocket.connection,
subscriber: EventEmitter,
- user: IUser,
- app: IApp
+ user: User | null | undefined,
+ app: App | null | undefined
) {
this.wsConnection = wsConnection;
- this.user = user;
- this.app = app;
this.subscriber = subscriber;
+ if (user) this.user = user;
+ if (app) this.app = app;
this.wsConnection.on('message', this.onWsConnectionMessage);
+
+ if (this.user) {
+ this.updateFollowing();
+ this.followingClock = setInterval(this.updateFollowing, 5000);
+
+ this.updateMuting();
+ this.mutingClock = setInterval(this.updateMuting, 5000);
+ }
}
/**
@@ -42,6 +52,8 @@ export default class Connection {
*/
@autobind
private async onWsConnectionMessage(data: websocket.IMessage) {
+ if (data.utf8Data == null) return;
+
const { type, body } = JSON.parse(data.utf8Data);
switch (type) {
@@ -64,7 +76,7 @@ export default class Connection {
@autobind
private async onApiRequest(payload: any) {
// 新鮮なデータを利用するためにユーザーをフェッチ
- const user = this.user ? await User.findOne({ _id: this.user._id }) : null;
+ const user = this.user ? await Users.findOne(this.user.id) : null;
const endpoint = payload.endpoint || payload.ep; // alias
@@ -79,7 +91,7 @@ export default class Connection {
@autobind
private onReadNotification(payload: any) {
if (!payload.id) return;
- readNotification(this.user._id, payload.id);
+ readNotification(this.user!.id, [payload.id]);
}
/**
@@ -99,8 +111,8 @@ export default class Connection {
this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
}
- if (payload.read) {
- readNote(this.user._id, payload.id);
+ if (payload.read && this.user) {
+ readNote(this.user.id, payload.id);
}
}
@@ -150,7 +162,6 @@ export default class Connection {
*/
@autobind
public sendMessageToWs(type: string, payload: any) {
- if (this.sendMessageToWsOverride) return this.sendMessageToWsOverride(type, payload); // 後方互換性のため
this.wsConnection.send(JSON.stringify({
type: type,
body: payload
@@ -208,13 +219,40 @@ export default class Connection {
}
}
+ @autobind
+ private async updateFollowing() {
+ const followings = await Followings.find({
+ where: {
+ followerId: this.user!.id
+ },
+ select: ['followeeId']
+ });
+
+ this.following = followings.map(x => x.followeeId);
+ }
+
+ @autobind
+ private async updateMuting() {
+ const mutings = await Mutings.find({
+ where: {
+ muterId: this.user!.id
+ },
+ select: ['muteeId']
+ });
+
+ this.muting = mutings.map(x => x.muteeId);
+ }
+
/**
* ストリームが切れたとき
*/
@autobind
public dispose() {
for (const c of this.channels.filter(c => c.dispose)) {
- c.dispose();
+ if (c.dispose) c.dispose();
}
+
+ if (this.followingClock) clearInterval(this.followingClock);
+ if (this.mutingClock) clearInterval(this.mutingClock);
}
}