summaryrefslogtreecommitdiff
path: root/packages/backend/src/server/api/stream
diff options
context:
space:
mode:
authorMar0xy <marie@kaifa.ch>2023-10-03 20:21:26 +0200
committerMar0xy <marie@kaifa.ch>2023-10-03 20:21:26 +0200
commitbf3d493d049f30a93aecd8c39fbf433dcfbe959b (patch)
tree7821580eb5161f8182e8f5070baa12e21f74aa75 /packages/backend/src/server/api/stream
parentmerge: upstream (diff)
downloadsharkey-bf3d493d049f30a93aecd8c39fbf433dcfbe959b.tar.gz
sharkey-bf3d493d049f30a93aecd8c39fbf433dcfbe959b.tar.bz2
sharkey-bf3d493d049f30a93aecd8c39fbf433dcfbe959b.zip
Revert "feat: improve tl performance"
Diffstat (limited to 'packages/backend/src/server/api/stream')
-rw-r--r--packages/backend/src/server/api/stream/Connection.ts4
-rw-r--r--packages/backend/src/server/api/stream/channels/global-timeline.ts11
-rw-r--r--packages/backend/src/server/api/stream/channels/home-timeline.ts13
-rw-r--r--packages/backend/src/server/api/stream/channels/hybrid-timeline.ts13
-rw-r--r--packages/backend/src/server/api/stream/channels/local-timeline.ts11
-rw-r--r--packages/backend/src/server/api/stream/channels/user-list.ts31
6 files changed, 53 insertions, 30 deletions
diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
index f981e63871..a73071ea99 100644
--- a/packages/backend/src/server/api/stream/Connection.ts
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -11,7 +11,7 @@ import type { NoteReadService } from '@/core/NoteReadService.js';
import type { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js';
import { CacheService } from '@/core/CacheService.js';
-import { MiFollowing, MiUserProfile } from '@/models/_.js';
+import { MiUserProfile } from '@/models/_.js';
import type { StreamEventEmitter, GlobalEvents } from '@/core/GlobalEventService.js';
import type { ChannelsService } from './ChannelsService.js';
import type { EventEmitter } from 'events';
@@ -30,7 +30,7 @@ export default class Connection {
private subscribingNotes: any = {};
private cachedNotes: Packed<'Note'>[] = [];
public userProfile: MiUserProfile | null = null;
- public following: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {};
+ public following: Set<string> = new Set();
public followingChannels: Set<string> = new Set();
public userIdsWhoMeMuting: Set<string> = new Set();
public userIdsWhoBlockingMe: Set<string> = new Set();
diff --git a/packages/backend/src/server/api/stream/channels/global-timeline.ts b/packages/backend/src/server/api/stream/channels/global-timeline.ts
index f0ac50349c..fef52b6856 100644
--- a/packages/backend/src/server/api/stream/channels/global-timeline.ts
+++ b/packages/backend/src/server/api/stream/channels/global-timeline.ts
@@ -18,6 +18,7 @@ class GlobalTimelineChannel extends Channel {
public readonly chName = 'globalTimeline';
public static shouldShare = true;
public static requireCredential = false;
+ private withReplies: boolean;
private withRenotes: boolean;
constructor(
@@ -37,6 +38,7 @@ class GlobalTimelineChannel extends Channel {
const policies = await this.roleService.getUserPolicies(this.user ? this.user.id : null);
if (!policies.gtlAvailable) return;
+ this.withReplies = params.withReplies ?? false;
this.withRenotes = params.withRenotes ?? true;
// Subscribe events
@@ -62,7 +64,7 @@ class GlobalTimelineChannel extends Channel {
}
// 関係ない返信は除外
- if (note.reply && !this.following[note.userId]?.withReplies) {
+ if (note.reply && !this.withReplies) {
const reply = note.reply;
// 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合
if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return;
@@ -80,6 +82,13 @@ class GlobalTimelineChannel extends Channel {
if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
+ // 流れてきたNoteがミュートすべきNoteだったら無視する
+ // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)
+ // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、
+ // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。
+ // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる
+ if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return;
+
this.connection.cacheNote(note);
this.send('note', note);
diff --git a/packages/backend/src/server/api/stream/channels/home-timeline.ts b/packages/backend/src/server/api/stream/channels/home-timeline.ts
index 1c1b1c2ae4..198c68e1c2 100644
--- a/packages/backend/src/server/api/stream/channels/home-timeline.ts
+++ b/packages/backend/src/server/api/stream/channels/home-timeline.ts
@@ -16,6 +16,7 @@ class HomeTimelineChannel extends Channel {
public readonly chName = 'homeTimeline';
public static shouldShare = true;
public static requireCredential = true;
+ private withReplies: boolean;
private withRenotes: boolean;
constructor(
@@ -30,6 +31,7 @@ class HomeTimelineChannel extends Channel {
@bindThis
public async init(params: any) {
+ this.withReplies = params.withReplies ?? false;
this.withRenotes = params.withRenotes ?? true;
this.subscriber.on('notesStream', this.onNote);
@@ -41,7 +43,7 @@ class HomeTimelineChannel extends Channel {
if (!this.followingChannels.has(note.channelId)) return;
} else {
// その投稿のユーザーをフォローしていなかったら弾く
- if ((this.user!.id !== note.userId) && !Object.hasOwn(this.following, note.userId)) return;
+ if ((this.user!.id !== note.userId) && !this.following.has(note.userId)) return;
}
// Ignore notes from instances the user has muted
@@ -71,7 +73,7 @@ class HomeTimelineChannel extends Channel {
}
// 関係ない返信は除外
- if (note.reply && !this.following[note.userId]?.withReplies) {
+ if (note.reply && !this.withReplies) {
const reply = note.reply;
// 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合
if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return;
@@ -86,6 +88,13 @@ class HomeTimelineChannel extends Channel {
if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
+ // 流れてきたNoteがミュートすべきNoteだったら無視する
+ // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)
+ // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、
+ // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。
+ // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる
+ if (await checkWordMute(note, this.user, this.userProfile!.mutedWords)) return;
+
this.connection.cacheNote(note);
this.send('note', note);
diff --git a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts
index e2f4817bfa..cde4297478 100644
--- a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts
+++ b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts
@@ -18,6 +18,7 @@ class HybridTimelineChannel extends Channel {
public readonly chName = 'hybridTimeline';
public static shouldShare = true;
public static requireCredential = true;
+ private withReplies: boolean;
private withRenotes: boolean;
constructor(
@@ -37,6 +38,7 @@ class HybridTimelineChannel extends Channel {
const policies = await this.roleService.getUserPolicies(this.user ? this.user.id : null);
if (!policies.ltlAvailable) return;
+ this.withReplies = params.withReplies ?? false;
this.withRenotes = params.withRenotes ?? true;
// Subscribe events
@@ -51,7 +53,7 @@ class HybridTimelineChannel extends Channel {
// フォローしているチャンネルの投稿 の場合だけ
if (!(
(note.channelId == null && this.user!.id === note.userId) ||
- (note.channelId == null && Object.hasOwn(this.following, note.userId)) ||
+ (note.channelId == null && this.following.has(note.userId)) ||
(note.channelId == null && (note.user.host == null && note.visibility === 'public')) ||
(note.channelId != null && this.followingChannels.has(note.channelId))
)) return;
@@ -83,7 +85,7 @@ class HybridTimelineChannel extends Channel {
if (isInstanceMuted(note, new Set<string>(this.userProfile!.mutedInstances ?? []))) return;
// 関係ない返信は除外
- if (note.reply && !this.following[note.userId]?.withReplies) {
+ if (note.reply && !this.withReplies) {
const reply = note.reply;
// 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合
if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return;
@@ -98,6 +100,13 @@ class HybridTimelineChannel extends Channel {
if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
+ // 流れてきたNoteがミュートすべきNoteだったら無視する
+ // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)
+ // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、
+ // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。
+ // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる
+ if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return;
+
this.connection.cacheNote(note);
this.send('note', note);
diff --git a/packages/backend/src/server/api/stream/channels/local-timeline.ts b/packages/backend/src/server/api/stream/channels/local-timeline.ts
index ca563b5d19..ef708c4fee 100644
--- a/packages/backend/src/server/api/stream/channels/local-timeline.ts
+++ b/packages/backend/src/server/api/stream/channels/local-timeline.ts
@@ -17,6 +17,7 @@ class LocalTimelineChannel extends Channel {
public readonly chName = 'localTimeline';
public static shouldShare = true;
public static requireCredential = false;
+ private withReplies: boolean;
private withRenotes: boolean;
constructor(
@@ -36,6 +37,7 @@ class LocalTimelineChannel extends Channel {
const policies = await this.roleService.getUserPolicies(this.user ? this.user.id : null);
if (!policies.ltlAvailable) return;
+ this.withReplies = params.withReplies ?? false;
this.withRenotes = params.withRenotes ?? true;
// Subscribe events
@@ -62,7 +64,7 @@ class LocalTimelineChannel extends Channel {
}
// 関係ない返信は除外
- if (note.reply && this.user && !this.following[note.userId]?.withReplies) {
+ if (note.reply && this.user && !this.withReplies) {
const reply = note.reply;
// 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合
if (reply.userId !== this.user.id && note.userId !== this.user.id && reply.userId !== note.userId) return;
@@ -77,6 +79,13 @@ class LocalTimelineChannel extends Channel {
if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
+ // 流れてきたNoteがミュートすべきNoteだったら無視する
+ // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)
+ // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、
+ // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。
+ // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる
+ if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return;
+
this.connection.cacheNote(note);
this.send('note', note);
diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts
index 03f7760d8e..8bbba0b6db 100644
--- a/packages/backend/src/server/api/stream/channels/user-list.ts
+++ b/packages/backend/src/server/api/stream/channels/user-list.ts
@@ -4,7 +4,7 @@
*/
import { Inject, Injectable } from '@nestjs/common';
-import type { MiUserListMembership, UserListMembershipsRepository, UserListsRepository } from '@/models/_.js';
+import type { UserListJoiningsRepository, UserListsRepository } from '@/models/_.js';
import type { MiUser } from '@/models/User.js';
import { isUserRelated } from '@/misc/is-user-related.js';
import type { Packed } from '@/misc/json-schema.js';
@@ -18,12 +18,12 @@ class UserListChannel extends Channel {
public static shouldShare = false;
public static requireCredential = false;
private listId: string;
- public membershipsMap: Record<string, Pick<MiUserListMembership, 'withReplies'> | undefined> = {};
+ public listUsers: MiUser['id'][] = [];
private listUsersClock: NodeJS.Timeout;
constructor(
private userListsRepository: UserListsRepository,
- private userListMembershipsRepository: UserListMembershipsRepository,
+ private userListJoiningsRepository: UserListJoiningsRepository,
private noteEntityService: NoteEntityService,
id: string,
@@ -58,25 +58,19 @@ class UserListChannel extends Channel {
@bindThis
private async updateListUsers() {
- const memberships = await this.userListMembershipsRepository.find({
+ const users = await this.userListJoiningsRepository.find({
where: {
userListId: this.listId,
},
select: ['userId'],
});
- const membershipsMap: Record<string, Pick<MiUserListMembership, 'withReplies'> | undefined> = {};
- for (const membership of memberships) {
- membershipsMap[membership.userId] = {
- withReplies: membership.withReplies,
- };
- }
- this.membershipsMap = membershipsMap;
+ this.listUsers = users.map(x => x.userId);
}
@bindThis
private async onNote(note: Packed<'Note'>) {
- if (!Object.hasOwn(this.membershipsMap, note.userId)) return;
+ if (!this.listUsers.includes(note.userId)) return;
if (['followers', 'specified'].includes(note.visibility)) {
note = await this.noteEntityService.pack(note.id, this.user, {
@@ -101,13 +95,6 @@ class UserListChannel extends Channel {
}
}
- // 関係ない返信は除外
- if (note.reply && !this.membershipsMap[note.userId]?.withReplies) {
- const reply = note.reply;
- // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合
- if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return;
- }
-
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
@@ -137,8 +124,8 @@ export class UserListChannelService {
@Inject(DI.userListsRepository)
private userListsRepository: UserListsRepository,
- @Inject(DI.userListMembershipsRepository)
- private userListMembershipsRepository: UserListMembershipsRepository,
+ @Inject(DI.userListJoiningsRepository)
+ private userListJoiningsRepository: UserListJoiningsRepository,
private noteEntityService: NoteEntityService,
) {
@@ -148,7 +135,7 @@ export class UserListChannelService {
public create(id: string, connection: Channel['connection']): UserListChannel {
return new UserListChannel(
this.userListsRepository,
- this.userListMembershipsRepository,
+ this.userListJoiningsRepository,
this.noteEntityService,
id,
connection,