summaryrefslogtreecommitdiff
path: root/src/server/api/stream/index.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/api/stream/index.ts')
-rw-r--r--src/server/api/stream/index.ts112
1 files changed, 88 insertions, 24 deletions
diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts
index c56a0a157b..99ae558696 100644
--- a/src/server/api/stream/index.ts
+++ b/src/server/api/stream/index.ts
@@ -14,6 +14,7 @@ import { AccessToken } from '../../../models/entities/access-token';
import { UserProfile } from '../../../models/entities/user-profile';
import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '../../../services/stream';
import { UserGroup } from '../../../models/entities/user-group';
+import { PackedNote } from '../../../models/repositories/note';
/**
* Main stream connection
@@ -29,10 +30,7 @@ export default class Connection {
public subscriber: EventEmitter;
private channels: Channel[] = [];
private subscribingNotes: any = {};
- private followingClock: ReturnType<typeof setInterval>;
- private mutingClock: ReturnType<typeof setInterval>;
- private followingChannelsClock: ReturnType<typeof setInterval>;
- private userProfileClock: ReturnType<typeof setInterval>;
+ private cachedNotes: PackedNote[] = [];
constructor(
wsConnection: websocket.connection,
@@ -53,16 +51,49 @@ export default class Connection {
if (this.user) {
this.updateFollowing();
- this.followingClock = setInterval(this.updateFollowing, 5000);
-
this.updateMuting();
- this.mutingClock = setInterval(this.updateMuting, 5000);
-
this.updateFollowingChannels();
- this.followingChannelsClock = setInterval(this.updateFollowingChannels, 5000);
-
this.updateUserProfile();
- this.userProfileClock = setInterval(this.updateUserProfile, 5000);
+
+ this.subscriber.on(`user:${this.user.id}`, ({ type, body }) => {
+ this.onUserEvent(type, body);
+ });
+ }
+ }
+
+ @autobind
+ private onUserEvent(type: string, body: any) {
+ switch (type) {
+ case 'follow':
+ this.following.add(body.id);
+ break;
+
+ case 'unfollow':
+ this.following.delete(body.id);
+ break;
+
+ case 'mute':
+ this.muting.add(body.id);
+ break;
+
+ case 'unmute':
+ this.muting.delete(body.id);
+ break;
+
+ case 'followChannel':
+ this.followingChannels.add(body.id);
+ break;
+
+ case 'unfollowChannel':
+ this.followingChannels.delete(body.id);
+ break;
+
+ case 'updateUserProfile':
+ this.userProfile = body;
+ break;
+
+ default:
+ break;
}
}
@@ -86,9 +117,9 @@ export default class Connection {
switch (type) {
case 'api': this.onApiRequest(body); break;
case 'readNotification': this.onReadNotification(body); break;
- case 'subNote': this.onSubscribeNote(body, true); break;
- case 'sn': this.onSubscribeNote(body, true); break; // alias
- case 's': this.onSubscribeNote(body, false); break;
+ case 'subNote': this.onSubscribeNote(body); break;
+ case 's': this.onSubscribeNote(body); break; // alias
+ case 'sr': this.onSubscribeNote(body); this.readNote(body); break;
case 'unsubNote': this.onUnsubscribeNote(body); break;
case 'un': this.onUnsubscribeNote(body); break; // alias
case 'connect': this.onChannelConnectRequested(body); break;
@@ -109,6 +140,48 @@ export default class Connection {
this.sendMessageToWs(type, body);
}
+ @autobind
+ public cacheNote(note: PackedNote) {
+ const add = (note: PackedNote) => {
+ const existIndex = this.cachedNotes.findIndex(n => n.id === note.id);
+ if (existIndex > -1) {
+ this.cachedNotes[existIndex] = note;
+ return;
+ }
+
+ this.cachedNotes.unshift(note);
+ if (this.cachedNotes.length > 32) {
+ this.cachedNotes.splice(32);
+ }
+ };
+
+ add(note);
+ if (note.reply) add(note.reply);
+ if (note.renote) add(note.renote);
+ }
+
+ @autobind
+ private readNote(body: any) {
+ const id = body.id;
+
+ const note = this.cachedNotes.find(n => n.id === id);
+ if (note == null) return;
+
+ if (this.user && (note.userId !== this.user.id)) {
+ if (note.mentions && note.mentions.includes(this.user.id)) {
+ readNote(this.user.id, [note]);
+ } else if (note.visibleUserIds && note.visibleUserIds.includes(this.user.id)) {
+ readNote(this.user.id, [note]);
+ }
+
+ if (this.followingChannels.has(note.channelId)) {
+ // TODO
+ }
+
+ // TODO: アンテナの既読処理
+ }
+ }
+
/**
* APIリクエスト要求時
*/
@@ -145,7 +218,7 @@ export default class Connection {
* 投稿購読要求時
*/
@autobind
- private onSubscribeNote(payload: any, read: boolean) {
+ private onSubscribeNote(payload: any) {
if (!payload.id) return;
if (this.subscribingNotes[payload.id] == null) {
@@ -157,10 +230,6 @@ export default class Connection {
if (this.subscribingNotes[payload.id] === 1) {
this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
}
-
- if (this.user && read) {
- readNote(this.user.id, payload.id);
- }
}
/**
@@ -335,10 +404,5 @@ export default class Connection {
for (const c of this.channels.filter(c => c.dispose)) {
if (c.dispose) c.dispose();
}
-
- if (this.followingClock) clearInterval(this.followingClock);
- if (this.mutingClock) clearInterval(this.mutingClock);
- if (this.followingChannelsClock) clearInterval(this.followingChannelsClock);
- if (this.userProfileClock) clearInterval(this.userProfileClock);
}
}