summaryrefslogtreecommitdiff
path: root/src/server/api/stream/index.ts
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2018-10-07 11:06:17 +0900
committerGitHub <noreply@github.com>2018-10-07 11:06:17 +0900
commitd0570d7fe3a3bf3c6b0312dece74bacc04c3534a (patch)
tree698218279a38f9c78b0350e81b8ac77ae52e4a0d /src/server/api/stream/index.ts
parentFix お知らせが確認中...のままになる(Announcement Fetching...) (... (diff)
downloadmisskey-d0570d7fe3a3bf3c6b0312dece74bacc04c3534a.tar.gz
misskey-d0570d7fe3a3bf3c6b0312dece74bacc04c3534a.tar.bz2
misskey-d0570d7fe3a3bf3c6b0312dece74bacc04c3534a.zip
V10 (#2826)
* wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * Update CHANGELOG.md * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * Update CHANGELOG.md * Update CHANGELOG.md * wip * Update CHANGELOG.md * wip * wip * wip * wip
Diffstat (limited to 'src/server/api/stream/index.ts')
-rw-r--r--src/server/api/stream/index.ts213
1 files changed, 213 insertions, 0 deletions
diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts
new file mode 100644
index 0000000000..bd99f2755e
--- /dev/null
+++ b/src/server/api/stream/index.ts
@@ -0,0 +1,213 @@
+import autobind from 'autobind-decorator';
+import * as websocket from 'websocket';
+import Xev from 'xev';
+import * as debug from 'debug';
+
+import User, { IUser } from '../../../models/user';
+import readNotification from '../common/read-notification';
+import call from '../call';
+import { IApp } from '../../../models/app';
+import readNote from '../../../services/note/read';
+
+import Channel from './channel';
+import channels from './channels';
+
+const log = debug('misskey');
+
+/**
+ * Main stream connection
+ */
+export default class Connection {
+ public user?: IUser;
+ public app: IApp;
+ private wsConnection: websocket.connection;
+ public subscriber: Xev;
+ private channels: Channel[] = [];
+ private subscribingNotes: any = {};
+
+ constructor(
+ wsConnection: websocket.connection,
+ subscriber: Xev,
+ user: IUser,
+ app: IApp
+ ) {
+ this.wsConnection = wsConnection;
+ this.user = user;
+ this.app = app;
+ this.subscriber = subscriber;
+
+ this.wsConnection.on('message', this.onWsConnectionMessage);
+ }
+
+ /**
+ * クライアントからメッセージ受信時
+ */
+ @autobind
+ private async onWsConnectionMessage(data: websocket.IMessage) {
+ const { type, body } = JSON.parse(data.utf8Data);
+
+ switch (type) {
+ case 'api': this.onApiRequest(body); break;
+ case 'alive': this.onAlive(); break;
+ case 'readNotification': this.onReadNotification(body); break;
+ case 'subNote': this.onSubscribeNote(body); break;
+ case 'sn': this.onSubscribeNote(body); break; // alias
+ case 'unsubNote': this.onUnsubscribeNote(body); break;
+ case 'un': this.onUnsubscribeNote(body); break; // alias
+ case 'connect': this.onChannelConnectRequested(body); break;
+ case 'disconnect': this.onChannelDisconnectRequested(body); break;
+ case 'channel': this.onChannelMessageRequested(body); break;
+ }
+ }
+
+ /**
+ * APIリクエスト要求時
+ */
+ @autobind
+ private async onApiRequest(payload: any) {
+ // 新鮮なデータを利用するためにユーザーをフェッチ
+ const user = this.user ? await User.findOne({ _id: this.user._id }) : null;
+
+ const endpoint = payload.endpoint || payload.ep; // alias
+
+ // 呼び出し
+ call(endpoint, user, this.app, payload.data).then(res => {
+ this.sendMessageToWs(`api:${payload.id}`, { res });
+ }).catch(e => {
+ this.sendMessageToWs(`api:${payload.id}`, { e });
+ });
+ }
+
+ @autobind
+ private onAlive() {
+ // Update lastUsedAt
+ User.update({ _id: this.user._id }, {
+ $set: {
+ 'lastUsedAt': new Date()
+ }
+ });
+ }
+
+ @autobind
+ private onReadNotification(payload: any) {
+ if (!payload.id) return;
+ readNotification(this.user._id, payload.id);
+ }
+
+ /**
+ * 投稿購読要求時
+ */
+ @autobind
+ private onSubscribeNote(payload: any) {
+ if (!payload.id) return;
+
+ if (this.subscribingNotes[payload.id] == null) {
+ this.subscribingNotes[payload.id] = 0;
+ }
+
+ this.subscribingNotes[payload.id]++;
+
+ if (this.subscribingNotes[payload.id] == 1) {
+ this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
+ }
+
+ if (payload.read) {
+ readNote(this.user._id, payload.id);
+ }
+ }
+
+ /**
+ * 投稿購読解除要求時
+ */
+ @autobind
+ private onUnsubscribeNote(payload: any) {
+ if (!payload.id) return;
+
+ this.subscribingNotes[payload.id]--;
+ if (this.subscribingNotes[payload.id] <= 0) {
+ delete this.subscribingNotes[payload.id];
+ this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
+ }
+ }
+
+ @autobind
+ private async onNoteStreamMessage(data: any) {
+ this.sendMessageToWs('noteUpdated', {
+ id: data.body.id,
+ type: data.type,
+ body: data.body.body,
+ });
+ }
+
+ /**
+ * チャンネル接続要求時
+ */
+ @autobind
+ private onChannelConnectRequested(payload: any) {
+ const { channel, id, params } = payload;
+ log(`CH CONNECT: ${id} ${channel} by @${this.user.username}`);
+ this.connectChannel(id, params, (channels as any)[channel]);
+ }
+
+ /**
+ * チャンネル切断要求時
+ */
+ @autobind
+ private onChannelDisconnectRequested(payload: any) {
+ const { id } = payload;
+ log(`CH DISCONNECT: ${id} by @${this.user.username}`);
+ this.disconnectChannel(id);
+ }
+
+ /**
+ * クライアントにメッセージ送信
+ */
+ @autobind
+ public sendMessageToWs(type: string, payload: any) {
+ this.wsConnection.send(JSON.stringify({
+ type: type,
+ body: payload
+ }));
+ }
+
+ /**
+ * チャンネルに接続
+ */
+ @autobind
+ private connectChannel(id: string, params: any, channelClass: { new(id: string, connection: Connection): Channel }) {
+ const channel = new channelClass(id, this);
+ this.channels.push(channel);
+ channel.init(params);
+ }
+
+ /**
+ * チャンネルから切断
+ */
+ @autobind
+ private disconnectChannel(id: string) {
+ const channel = this.channels.find(c => c.id === id);
+
+ if (channel) {
+ if (channel.dispose) channel.dispose();
+ this.channels = this.channels.filter(c => c.id !== id);
+ }
+ }
+
+ @autobind
+ private onChannelMessageRequested(data: any) {
+ const channel = this.channels.find(c => c.id === data.id);
+ if (channel != null && channel.onMessage != null) {
+ channel.onMessage(data.type, data.body);
+ }
+ }
+
+ /**
+ * ストリームが切れたとき
+ */
+ @autobind
+ public dispose() {
+ this.channels.forEach(c => {
+ if (c.dispose) c.dispose();
+ });
+ }
+}