diff options
Diffstat (limited to 'src/server/api')
| -rw-r--r-- | src/server/api/endpoints/channels/timeline.ts | 2 | ||||
| -rw-r--r-- | src/server/api/stream/channels/channel.ts | 39 | ||||
| -rw-r--r-- | src/server/api/stream/channels/games/reversi-game.ts | 2 | ||||
| -rw-r--r-- | src/server/api/stream/channels/messaging.ts | 47 | ||||
| -rw-r--r-- | src/server/api/stream/index.ts | 34 |
5 files changed, 114 insertions, 10 deletions
diff --git a/src/server/api/endpoints/channels/timeline.ts b/src/server/api/endpoints/channels/timeline.ts index 3ae28fc67a..acb34f124d 100644 --- a/src/server/api/endpoints/channels/timeline.ts +++ b/src/server/api/endpoints/channels/timeline.ts @@ -85,7 +85,7 @@ export default define(meta, async (ps, user) => { } //#region Construct query - const query = makePaginationQuery(Notes.createQueryBuilder('note'), ps.sinceId, ps.untilId) + const query = makePaginationQuery(Notes.createQueryBuilder('note'), ps.sinceId, ps.untilId, ps.sinceDate, ps.untilDate) .andWhere('note.channelId = :channelId', { channelId: channel.id }) .leftJoinAndSelect('note.user', 'user') .leftJoinAndSelect('note.channel', 'channel'); diff --git a/src/server/api/stream/channels/channel.ts b/src/server/api/stream/channels/channel.ts index c24b3db937..aa570d1ef4 100644 --- a/src/server/api/stream/channels/channel.ts +++ b/src/server/api/stream/channels/channel.ts @@ -1,14 +1,17 @@ import autobind from 'autobind-decorator'; import Channel from '../channel'; -import { Notes } from '../../../../models'; +import { Notes, Users } from '../../../../models'; import { isMutedUserRelated } from '../../../../misc/is-muted-user-related'; import { PackedNote } from '../../../../models/repositories/note'; +import { User } from '../../../../models/entities/user'; export default class extends Channel { public readonly chName = 'channel'; public static shouldShare = false; public static requireCredential = false; private channelId: string; + private typers: Record<User['id'], Date> = {}; + private emitTypersIntervalId: ReturnType<typeof setInterval>; @autobind public async init(params: any) { @@ -16,6 +19,8 @@ export default class extends Channel { // Subscribe stream this.subscriber.on('notesStream', this.onNote); + this.subscriber.on(`channelStream:${this.channelId}`, this.onEvent); + this.emitTypersIntervalId = setInterval(this.emitTypers, 5000); } @autobind @@ -42,8 +47,40 @@ export default class extends Channel { } @autobind + private onEvent(data: any) { + if (data.type === 'typing') { + const id = data.body; + const begin = this.typers[id] == null; + this.typers[id] = new Date(); + if (begin) { + this.emitTypers(); + } + } + } + + @autobind + private async emitTypers() { + const now = new Date(); + + // Remove not typing users + for (const [userId, date] of Object.entries(this.typers)) { + if (now.getTime() - date.getTime() > 5000) delete this.typers[userId]; + } + + const users = await Users.packMany(Object.keys(this.typers), null, { detail: false }); + + this.send({ + type: 'typers', + body: users, + }); + } + + @autobind public dispose() { // Unsubscribe events this.subscriber.off('notesStream', this.onNote); + this.subscriber.off(`channelStream:${this.channelId}`, this.onEvent); + + clearInterval(this.emitTypersIntervalId); } } diff --git a/src/server/api/stream/channels/games/reversi-game.ts b/src/server/api/stream/channels/games/reversi-game.ts index ea62ab1e88..e1c2116ac6 100644 --- a/src/server/api/stream/channels/games/reversi-game.ts +++ b/src/server/api/stream/channels/games/reversi-game.ts @@ -15,7 +15,7 @@ export default class extends Channel { private gameId: ReversiGame['id'] | null = null; private watchers: Record<User['id'], Date> = {}; - private emitWatchersIntervalId: any; + private emitWatchersIntervalId: ReturnType<typeof setInterval>; @autobind public async init(params: any) { diff --git a/src/server/api/stream/channels/messaging.ts b/src/server/api/stream/channels/messaging.ts index 8456871e6a..4c41dc820b 100644 --- a/src/server/api/stream/channels/messaging.ts +++ b/src/server/api/stream/channels/messaging.ts @@ -12,6 +12,9 @@ export default class extends Channel { private otherpartyId: string | null; private otherparty?: User; private groupId: string | null; + private subCh: string; + private typers: Record<User['id'], Date> = {}; + private emitTypersIntervalId: ReturnType<typeof setInterval>; @autobind public async init(params: any) { @@ -31,14 +34,28 @@ export default class extends Channel { } } - const subCh = this.otherpartyId + this.emitTypersIntervalId = setInterval(this.emitTypers, 5000); + + this.subCh = this.otherpartyId ? `messagingStream:${this.user!.id}-${this.otherpartyId}` : `messagingStream:${this.groupId}`; // Subscribe messaging stream - this.subscriber.on(subCh, data => { + this.subscriber.on(this.subCh, this.onEvent); + } + + @autobind + private onEvent(data: any) { + if (data.type === 'typing') { + const id = data.body; + const begin = this.typers[id] == null; + this.typers[id] = new Date(); + if (begin) { + this.emitTypers(); + } + } else { this.send(data); - }); + } } @autobind @@ -60,4 +77,28 @@ export default class extends Channel { break; } } + + @autobind + private async emitTypers() { + const now = new Date(); + + // Remove not typing users + for (const [userId, date] of Object.entries(this.typers)) { + if (now.getTime() - date.getTime() > 5000) delete this.typers[userId]; + } + + const users = await Users.packMany(Object.keys(this.typers), null, { detail: false }); + + this.send({ + type: 'typers', + body: users, + }); + } + + @autobind + public dispose() { + this.subscriber.off(this.subCh, this.onEvent); + + clearInterval(this.emitTypersIntervalId); + } } diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index 5b975d07db..c56a0a157b 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -12,6 +12,8 @@ import { Users, Followings, Mutings, UserProfiles, ChannelFollowings } from '../ import { ApiError } from '../error'; import { AccessToken } from '../../../models/entities/access-token'; import { UserProfile } from '../../../models/entities/user-profile'; +import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '../../../services/stream'; +import { UserGroup } from '../../../models/entities/user-group'; /** * Main stream connection @@ -27,10 +29,10 @@ export default class Connection { public subscriber: EventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; - private followingClock: NodeJS.Timer; - private mutingClock: NodeJS.Timer; - private followingChannelsClock: NodeJS.Timer; - private userProfileClock: NodeJS.Timer; + private followingClock: ReturnType<typeof setInterval>; + private mutingClock: ReturnType<typeof setInterval>; + private followingChannelsClock: ReturnType<typeof setInterval>; + private userProfileClock: ReturnType<typeof setInterval>; constructor( wsConnection: websocket.connection, @@ -93,6 +95,12 @@ export default class Connection { case 'disconnect': this.onChannelDisconnectRequested(body); break; case 'channel': this.onChannelMessageRequested(body); break; case 'ch': this.onChannelMessageRequested(body); break; // alias + + // 個々のチャンネルではなくルートレベルでこれらのメッセージを受け取る理由は、 + // クライアントの事情を考慮したとき、入力フォームはノートチャンネルやメッセージのメインコンポーネントとは別 + // なこともあるため、それらのコンポーネントがそれぞれ各チャンネルに接続するようにするのは面倒なため。 + case 'typingOnChannel': this.typingOnChannel(body.channel); break; + case 'typingOnMessaging': this.typingOnMessaging(body); break; } } @@ -259,6 +267,24 @@ export default class Connection { } @autobind + private typingOnChannel(channel: ChannelModel['id']) { + if (this.user) { + publishChannelStream(channel, 'typing', this.user.id); + } + } + + @autobind + private typingOnMessaging(param: { partner?: User['id']; group?: UserGroup['id']; }) { + if (this.user) { + if (param.partner) { + publishMessagingStream(param.partner, this.user.id, 'typing', this.user.id); + } else if (param.group) { + publishGroupMessagingStream(param.group, 'typing', this.user.id); + } + } + } + + @autobind private async updateFollowing() { const followings = await Followings.find({ where: { |