summaryrefslogtreecommitdiff
path: root/packages/backend/src/server/api/stream/index.ts
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2023-09-15 14:28:29 +0900
committerGitHub <noreply@github.com>2023-09-15 14:28:29 +0900
commit6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd (patch)
treeb3623e27ae94471fa4d58dda9e9296b21afbd20e /packages/backend/src/server/api/stream/index.ts
parentenhance nodeinfo by prpoagate the standart keys for homepage and repository (... (diff)
downloadsharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.tar.gz
sharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.tar.bz2
sharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.zip
update deps (#11820)
* update deps * fix * wip * wip * wip * Update docker-compose.yml.example * Delete reviewer-lottery.yml * Update RepositoryModule.ts * wip * wip * clean up * update deps * wip * wip
Diffstat (limited to 'packages/backend/src/server/api/stream/index.ts')
-rw-r--r--packages/backend/src/server/api/stream/index.ts299
1 files changed, 0 insertions, 299 deletions
diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts
deleted file mode 100644
index 232ec5700d..0000000000
--- a/packages/backend/src/server/api/stream/index.ts
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-
-import * as WebSocket from 'ws';
-import type { MiUser } from '@/models/entities/User.js';
-import type { MiAccessToken } from '@/models/entities/AccessToken.js';
-import type { Packed } from '@/misc/json-schema.js';
-import type { NoteReadService } from '@/core/NoteReadService.js';
-import type { NotificationService } from '@/core/NotificationService.js';
-import { bindThis } from '@/decorators.js';
-import { CacheService } from '@/core/CacheService.js';
-import { MiUserProfile } from '@/models/index.js';
-import type { ChannelsService } from './ChannelsService.js';
-import type { EventEmitter } from 'events';
-import type Channel from './channel.js';
-import type { StreamEventEmitter, StreamMessages } from './types.js';
-
-/**
- * Main stream connection
- */
-// eslint-disable-next-line import/no-default-export
-export default class Connection {
- public user?: MiUser;
- public token?: MiAccessToken;
- private wsConnection: WebSocket.WebSocket;
- public subscriber: StreamEventEmitter;
- private channels: Channel[] = [];
- private subscribingNotes: any = {};
- private cachedNotes: Packed<'Note'>[] = [];
- public userProfile: MiUserProfile | null = null;
- public following: Set<string> = new Set();
- public followingChannels: Set<string> = new Set();
- public userIdsWhoMeMuting: Set<string> = new Set();
- public userIdsWhoBlockingMe: Set<string> = new Set();
- public userIdsWhoMeMutingRenotes: Set<string> = new Set();
- private fetchIntervalId: NodeJS.Timeout | null = null;
-
- constructor(
- private channelsService: ChannelsService,
- private noteReadService: NoteReadService,
- private notificationService: NotificationService,
- private cacheService: CacheService,
-
- user: MiUser | null | undefined,
- token: MiAccessToken | null | undefined,
- ) {
- if (user) this.user = user;
- if (token) this.token = token;
- }
-
- @bindThis
- public async fetch() {
- if (this.user == null) return;
- const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([
- this.cacheService.userProfileCache.fetch(this.user.id),
- this.cacheService.userFollowingsCache.fetch(this.user.id),
- this.cacheService.userFollowingChannelsCache.fetch(this.user.id),
- this.cacheService.userMutingsCache.fetch(this.user.id),
- this.cacheService.userBlockedCache.fetch(this.user.id),
- this.cacheService.renoteMutingsCache.fetch(this.user.id),
- ]);
- this.userProfile = userProfile;
- this.following = following;
- this.followingChannels = followingChannels;
- this.userIdsWhoMeMuting = userIdsWhoMeMuting;
- this.userIdsWhoBlockingMe = userIdsWhoBlockingMe;
- this.userIdsWhoMeMutingRenotes = userIdsWhoMeMutingRenotes;
- }
-
- @bindThis
- public async init() {
- if (this.user != null) {
- await this.fetch();
-
- if (!this.fetchIntervalId) {
- this.fetchIntervalId = setInterval(this.fetch, 1000 * 10);
- }
- }
- }
-
- @bindThis
- public async listen(subscriber: EventEmitter, wsConnection: WebSocket.WebSocket) {
- this.subscriber = subscriber;
-
- this.wsConnection = wsConnection;
- this.wsConnection.on('message', this.onWsConnectionMessage);
-
- this.subscriber.on('broadcast', data => {
- this.onBroadcastMessage(data);
- });
- }
-
- /**
- * クライアントからメッセージ受信時
- */
- @bindThis
- private async onWsConnectionMessage(data: WebSocket.RawData) {
- let obj: Record<string, any>;
-
- try {
- obj = JSON.parse(data.toString());
- } catch (e) {
- return;
- }
-
- const { type, body } = obj;
-
- switch (type) {
- case 'readNotification': this.onReadNotification(body); break;
- case 'subNote': this.onSubscribeNote(body); break;
- case 's': this.onSubscribeNote(body); break; // alias
- case 'sr': this.onSubscribeNote(body); this.readNote(body); break;
- case 'unsubNote': this.onUnsubscribeNote(body); break;
- case 'un': this.onUnsubscribeNote(body); break; // alias
- case 'connect': this.onChannelConnectRequested(body); break;
- case 'disconnect': this.onChannelDisconnectRequested(body); break;
- case 'channel': this.onChannelMessageRequested(body); break;
- case 'ch': this.onChannelMessageRequested(body); break; // alias
- }
- }
-
- @bindThis
- private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) {
- this.sendMessageToWs(data.type, data.body);
- }
-
- @bindThis
- public cacheNote(note: Packed<'Note'>) {
- const add = (note: Packed<'Note'>) => {
- const existIndex = this.cachedNotes.findIndex(n => n.id === note.id);
- if (existIndex > -1) {
- this.cachedNotes[existIndex] = note;
- return;
- }
-
- this.cachedNotes.unshift(note);
- if (this.cachedNotes.length > 32) {
- this.cachedNotes.splice(32);
- }
- };
-
- add(note);
- if (note.reply) add(note.reply);
- if (note.renote) add(note.renote);
- }
-
- @bindThis
- private readNote(body: any) {
- const id = body.id;
-
- const note = this.cachedNotes.find(n => n.id === id);
- if (note == null) return;
-
- if (this.user && (note.userId !== this.user.id)) {
- this.noteReadService.read(this.user.id, [note]);
- }
- }
-
- @bindThis
- private onReadNotification(payload: any) {
- this.notificationService.readAllNotification(this.user!.id);
- }
-
- /**
- * 投稿購読要求時
- */
- @bindThis
- private onSubscribeNote(payload: any) {
- if (!payload.id) return;
-
- if (this.subscribingNotes[payload.id] == null) {
- this.subscribingNotes[payload.id] = 0;
- }
-
- this.subscribingNotes[payload.id]++;
-
- if (this.subscribingNotes[payload.id] === 1) {
- this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
- }
- }
-
- /**
- * 投稿購読解除要求時
- */
- @bindThis
- private onUnsubscribeNote(payload: any) {
- if (!payload.id) return;
-
- this.subscribingNotes[payload.id]--;
- if (this.subscribingNotes[payload.id] <= 0) {
- delete this.subscribingNotes[payload.id];
- this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
- }
- }
-
- @bindThis
- private async onNoteStreamMessage(data: StreamMessages['note']['payload']) {
- this.sendMessageToWs('noteUpdated', {
- id: data.body.id,
- type: data.type,
- body: data.body.body,
- });
- }
-
- /**
- * チャンネル接続要求時
- */
- @bindThis
- private onChannelConnectRequested(payload: any) {
- const { channel, id, params, pong } = payload;
- this.connectChannel(id, params, channel, pong);
- }
-
- /**
- * チャンネル切断要求時
- */
- @bindThis
- private onChannelDisconnectRequested(payload: any) {
- const { id } = payload;
- this.disconnectChannel(id);
- }
-
- /**
- * クライアントにメッセージ送信
- */
- @bindThis
- public sendMessageToWs(type: string, payload: any) {
- this.wsConnection.send(JSON.stringify({
- type: type,
- body: payload,
- }));
- }
-
- /**
- * チャンネルに接続
- */
- @bindThis
- public connectChannel(id: string, params: any, channel: string, pong = false) {
- const channelService = this.channelsService.getChannelService(channel);
-
- if (channelService.requireCredential && this.user == null) {
- return;
- }
-
- // 共有可能チャンネルに接続しようとしていて、かつそのチャンネルに既に接続していたら無意味なので無視
- if (channelService.shouldShare && this.channels.some(c => c.chName === channel)) {
- return;
- }
-
- const ch: Channel = channelService.create(id, this);
- this.channels.push(ch);
- ch.init(params ?? {});
-
- if (pong) {
- this.sendMessageToWs('connected', {
- id: id,
- });
- }
- }
-
- /**
- * チャンネルから切断
- * @param id チャンネルコネクションID
- */
- @bindThis
- public disconnectChannel(id: string) {
- const channel = this.channels.find(c => c.id === id);
-
- if (channel) {
- if (channel.dispose) channel.dispose();
- this.channels = this.channels.filter(c => c.id !== id);
- }
- }
-
- /**
- * チャンネルへメッセージ送信要求時
- * @param data メッセージ
- */
- @bindThis
- private onChannelMessageRequested(data: any) {
- const channel = this.channels.find(c => c.id === data.id);
- if (channel != null && channel.onMessage != null) {
- channel.onMessage(data.type, data.body);
- }
- }
-
- /**
- * ストリームが切れたとき
- */
- @bindThis
- public dispose() {
- if (this.fetchIntervalId) clearInterval(this.fetchIntervalId);
- for (const c of this.channels.filter(c => c.dispose)) {
- if (c.dispose) c.dispose();
- }
- }
-}