summaryrefslogtreecommitdiff
path: root/packages/backend/src/server
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2023-04-05 10:21:10 +0900
committersyuilo <Syuilotan@yahoo.co.jp>2023-04-05 10:21:10 +0900
commitf44504097c360fc84179161abee47b79a936b455 (patch)
treeaec17f0837ccad89754fae24c044ba0b8d7e8def /packages/backend/src/server
parentenhance(backend): チャンネルの既読管理を削除 (diff)
downloadmisskey-f44504097c360fc84179161abee47b79a936b455.tar.gz
misskey-f44504097c360fc84179161abee47b79a936b455.tar.bz2
misskey-f44504097c360fc84179161abee47b79a936b455.zip
enhance(backend): improve cache
Diffstat (limited to 'packages/backend/src/server')
-rw-r--r--packages/backend/src/server/api/StreamingApiServerService.ts21
-rw-r--r--packages/backend/src/server/api/endpoints/admin/accounts/delete.ts5
-rw-r--r--packages/backend/src/server/api/endpoints/admin/suspend-user.ts5
-rw-r--r--packages/backend/src/server/api/endpoints/channels/follow.ts3
-rw-r--r--packages/backend/src/server/api/endpoints/channels/unfollow.ts4
-rw-r--r--packages/backend/src/server/api/endpoints/i/regenerate-token.ts5
-rw-r--r--packages/backend/src/server/api/endpoints/i/revoke-token.ts3
-rw-r--r--packages/backend/src/server/api/endpoints/i/update.ts1
-rw-r--r--packages/backend/src/server/api/endpoints/mute/create.ts21
-rw-r--r--packages/backend/src/server/api/endpoints/mute/delete.ts13
-rw-r--r--packages/backend/src/server/api/endpoints/renote-mute/delete.ts2
-rw-r--r--packages/backend/src/server/api/stream/channel.ts12
-rw-r--r--packages/backend/src/server/api/stream/channels/antenna.ts6
-rw-r--r--packages/backend/src/server/api/stream/channels/channel.ts6
-rw-r--r--packages/backend/src/server/api/stream/channels/global-timeline.ts6
-rw-r--r--packages/backend/src/server/api/stream/channels/hashtag.ts6
-rw-r--r--packages/backend/src/server/api/stream/channels/home-timeline.ts11
-rw-r--r--packages/backend/src/server/api/stream/channels/hybrid-timeline.ts8
-rw-r--r--packages/backend/src/server/api/stream/channels/local-timeline.ts6
-rw-r--r--packages/backend/src/server/api/stream/channels/main.ts4
-rw-r--r--packages/backend/src/server/api/stream/channels/user-list.ts6
-rw-r--r--packages/backend/src/server/api/stream/index.ts185
-rw-r--r--packages/backend/src/server/api/stream/types.ts21
23 files changed, 98 insertions, 262 deletions
diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts
index 13526f277d..bd2d436a23 100644
--- a/packages/backend/src/server/api/StreamingApiServerService.ts
+++ b/packages/backend/src/server/api/StreamingApiServerService.ts
@@ -9,6 +9,7 @@ import { NoteReadService } from '@/core/NoteReadService.js';
import { GlobalEventService } from '@/core/GlobalEventService.js';
import { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js';
+import { CacheService } from '@/core/CacheService.js';
import { AuthenticateService } from './AuthenticateService.js';
import MainStreamConnection from './stream/index.js';
import { ChannelsService } from './stream/ChannelsService.js';
@@ -45,7 +46,7 @@ export class StreamingApiServerService {
@Inject(DI.userProfilesRepository)
private userProfilesRepository: UserProfilesRepository,
- private globalEventService: GlobalEventService,
+ private cacheService: CacheService,
private noteReadService: NoteReadService,
private authenticateService: AuthenticateService,
private channelsService: ChannelsService,
@@ -73,8 +74,6 @@ export class StreamingApiServerService {
return;
}
- const connection = request.accept();
-
const ev = new EventEmitter();
async function onRedisMessage(_: string, data: string): Promise<void> {
@@ -85,19 +84,19 @@ export class StreamingApiServerService {
this.redisSubscriber.on('message', onRedisMessage);
const main = new MainStreamConnection(
- this.followingsRepository,
- this.mutingsRepository,
- this.renoteMutingsRepository,
- this.blockingsRepository,
- this.channelFollowingsRepository,
- this.userProfilesRepository,
this.channelsService,
- this.globalEventService,
this.noteReadService,
this.notificationService,
- connection, ev, user, miapp,
+ this.cacheService,
+ ev, user, miapp,
);
+ await main.init();
+
+ const connection = request.accept();
+
+ main.init2(connection);
+
const intervalId = user ? setInterval(() => {
this.usersRepository.update(user.id, {
lastActiveDate: new Date(),
diff --git a/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts b/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts
index e9f72676f0..16232813a8 100644
--- a/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts
+++ b/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts
@@ -61,11 +61,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
await this.usersRepository.update(user.id, {
isDeleted: true,
});
-
- if (this.userEntityService.isLocalUser(user)) {
- // Terminate streaming
- this.globalEventService.publishUserEvent(user.id, 'terminate', {});
- }
});
}
}
diff --git a/packages/backend/src/server/api/endpoints/admin/suspend-user.ts b/packages/backend/src/server/api/endpoints/admin/suspend-user.ts
index 770b61850a..3c99225272 100644
--- a/packages/backend/src/server/api/endpoints/admin/suspend-user.ts
+++ b/packages/backend/src/server/api/endpoints/admin/suspend-user.ts
@@ -62,11 +62,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
targetId: user.id,
});
- // Terminate streaming
- if (this.userEntityService.isLocalUser(user)) {
- this.globalEventService.publishUserEvent(user.id, 'terminate', {});
- }
-
(async () => {
await this.userSuspendService.doPostSuspend(user).catch(e => {});
await this.unFollowAll(user).catch(e => {});
diff --git a/packages/backend/src/server/api/endpoints/channels/follow.ts b/packages/backend/src/server/api/endpoints/channels/follow.ts
index 91693918f2..8ab59991c7 100644
--- a/packages/backend/src/server/api/endpoints/channels/follow.ts
+++ b/packages/backend/src/server/api/endpoints/channels/follow.ts
@@ -41,7 +41,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
private channelFollowingsRepository: ChannelFollowingsRepository,
private idService: IdService,
- private globalEventService: GlobalEventService,
) {
super(meta, paramDef, async (ps, me) => {
const channel = await this.channelsRepository.findOneBy({
@@ -58,8 +57,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
followerId: me.id,
followeeId: channel.id,
});
-
- this.globalEventService.publishUserEvent(me.id, 'followChannel', channel);
});
}
}
diff --git a/packages/backend/src/server/api/endpoints/channels/unfollow.ts b/packages/backend/src/server/api/endpoints/channels/unfollow.ts
index ac2ef825be..855ba47f8c 100644
--- a/packages/backend/src/server/api/endpoints/channels/unfollow.ts
+++ b/packages/backend/src/server/api/endpoints/channels/unfollow.ts
@@ -38,8 +38,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
@Inject(DI.channelFollowingsRepository)
private channelFollowingsRepository: ChannelFollowingsRepository,
-
- private globalEventService: GlobalEventService,
) {
super(meta, paramDef, async (ps, me) => {
const channel = await this.channelsRepository.findOneBy({
@@ -54,8 +52,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
followerId: me.id,
followeeId: channel.id,
});
-
- this.globalEventService.publishUserEvent(me.id, 'unfollowChannel', channel);
});
}
}
diff --git a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts
index 786e64374c..23ff63f5e9 100644
--- a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts
+++ b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts
@@ -54,11 +54,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
// Publish event
this.globalEventService.publishInternalEvent('userTokenRegenerated', { id: me.id, oldToken, newToken });
this.globalEventService.publishMainStream(me.id, 'myTokenRegenerated');
-
- // Terminate streaming
- setTimeout(() => {
- this.globalEventService.publishUserEvent(me.id, 'terminate', {});
- }, 5000);
});
}
}
diff --git a/packages/backend/src/server/api/endpoints/i/revoke-token.ts b/packages/backend/src/server/api/endpoints/i/revoke-token.ts
index 5e1dddb6b7..93daeb0cd7 100644
--- a/packages/backend/src/server/api/endpoints/i/revoke-token.ts
+++ b/packages/backend/src/server/api/endpoints/i/revoke-token.ts
@@ -35,9 +35,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
id: ps.tokenId,
userId: me.id,
});
-
- // Terminate streaming
- this.globalEventService.publishUserEvent(me.id, 'terminate');
}
});
}
diff --git a/packages/backend/src/server/api/endpoints/i/update.ts b/packages/backend/src/server/api/endpoints/i/update.ts
index 46b16e9dce..c20f2b7913 100644
--- a/packages/backend/src/server/api/endpoints/i/update.ts
+++ b/packages/backend/src/server/api/endpoints/i/update.ts
@@ -284,7 +284,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
// Publish meUpdated event
this.globalEventService.publishMainStream(user.id, 'meUpdated', iObj);
- this.globalEventService.publishUserEvent(user.id, 'updateUserProfile', updatedProfile);
// 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認
if (user.isLocked && ps.isLocked === false) {
diff --git a/packages/backend/src/server/api/endpoints/mute/create.ts b/packages/backend/src/server/api/endpoints/mute/create.ts
index fd062e1cab..6e24e1024d 100644
--- a/packages/backend/src/server/api/endpoints/mute/create.ts
+++ b/packages/backend/src/server/api/endpoints/mute/create.ts
@@ -1,13 +1,10 @@
import { Inject, Injectable } from '@nestjs/common';
import ms from 'ms';
import { Endpoint } from '@/server/api/endpoint-base.js';
-import { IdService } from '@/core/IdService.js';
import type { MutingsRepository } from '@/models/index.js';
-import type { Muting } from '@/models/entities/Muting.js';
-import { GlobalEventService } from '@/core/GlobalEventService.js';
import { DI } from '@/di-symbols.js';
import { GetterService } from '@/server/api/GetterService.js';
-import { CacheService } from '@/core/CacheService.js';
+import { UserMutingService } from '@/core/UserMutingService.js';
import { ApiError } from '../../error.js';
export const meta = {
@@ -63,10 +60,8 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
@Inject(DI.mutingsRepository)
private mutingsRepository: MutingsRepository,
- private globalEventService: GlobalEventService,
private getterService: GetterService,
- private idService: IdService,
- private cacheService: CacheService,
+ private userMutingService: UserMutingService,
) {
super(meta, paramDef, async (ps, me) => {
const muter = me;
@@ -96,17 +91,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
return;
}
- // Create mute
- await this.mutingsRepository.insert({
- id: this.idService.genId(),
- createdAt: new Date(),
- expiresAt: ps.expiresAt ? new Date(ps.expiresAt) : null,
- muterId: muter.id,
- muteeId: mutee.id,
- } as Muting);
-
- this.cacheService.userMutingsCache.delete(muter.id);
- this.globalEventService.publishUserEvent(me.id, 'mute', mutee);
+ await this.userMutingService.mute(muter, mutee, ps.expiresAt ? new Date(ps.expiresAt) : null);
});
}
}
diff --git a/packages/backend/src/server/api/endpoints/mute/delete.ts b/packages/backend/src/server/api/endpoints/mute/delete.ts
index 612c4a4c04..90b74590be 100644
--- a/packages/backend/src/server/api/endpoints/mute/delete.ts
+++ b/packages/backend/src/server/api/endpoints/mute/delete.ts
@@ -1,10 +1,10 @@
import { Inject, Injectable } from '@nestjs/common';
import { Endpoint } from '@/server/api/endpoint-base.js';
import type { MutingsRepository } from '@/models/index.js';
-import { GlobalEventService } from '@/core/GlobalEventService.js';
import { DI } from '@/di-symbols.js';
-import { ApiError } from '../../error.js';
import { GetterService } from '@/server/api/GetterService.js';
+import { UserMutingService } from '@/core/UserMutingService.js';
+import { ApiError } from '../../error.js';
export const meta = {
tags: ['account'],
@@ -49,7 +49,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
@Inject(DI.mutingsRepository)
private mutingsRepository: MutingsRepository,
- private globalEventService: GlobalEventService,
+ private userMutingService: UserMutingService,
private getterService: GetterService,
) {
super(meta, paramDef, async (ps, me) => {
@@ -76,12 +76,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
throw new ApiError(meta.errors.notMuting);
}
- // Delete mute
- await this.mutingsRepository.delete({
- id: exist.id,
- });
-
- this.globalEventService.publishUserEvent(me.id, 'unmute', mutee);
+ await this.userMutingService.unmute([exist]);
});
}
}
diff --git a/packages/backend/src/server/api/endpoints/renote-mute/delete.ts b/packages/backend/src/server/api/endpoints/renote-mute/delete.ts
index 51a895fb7e..70901a1406 100644
--- a/packages/backend/src/server/api/endpoints/renote-mute/delete.ts
+++ b/packages/backend/src/server/api/endpoints/renote-mute/delete.ts
@@ -80,8 +80,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
await this.renoteMutingsRepository.delete({
id: exist.id,
});
-
- // publishUserEvent(user.id, 'unmute', mutee);
});
}
}
diff --git a/packages/backend/src/server/api/stream/channel.ts b/packages/backend/src/server/api/stream/channel.ts
index 32935325aa..e67aec9ecd 100644
--- a/packages/backend/src/server/api/stream/channel.ts
+++ b/packages/backend/src/server/api/stream/channel.ts
@@ -23,16 +23,16 @@ export default abstract class Channel {
return this.connection.following;
}
- protected get muting() {
- return this.connection.muting;
+ protected get userIdsWhoMeMuting() {
+ return this.connection.userIdsWhoMeMuting;
}
- protected get renoteMuting() {
- return this.connection.renoteMuting;
+ protected get userIdsWhoMeMutingRenotes() {
+ return this.connection.userIdsWhoMeMutingRenotes;
}
- protected get blocking() {
- return this.connection.blocking;
+ protected get userIdsWhoBlockingMe() {
+ return this.connection.userIdsWhoBlockingMe;
}
protected get followingChannels() {
diff --git a/packages/backend/src/server/api/stream/channels/antenna.ts b/packages/backend/src/server/api/stream/channels/antenna.ts
index e2a42fbfe9..d48dea7258 100644
--- a/packages/backend/src/server/api/stream/channels/antenna.ts
+++ b/packages/backend/src/server/api/stream/channels/antenna.ts
@@ -35,11 +35,11 @@ class AntennaChannel extends Channel {
const note = await this.noteEntityService.pack(data.body.id, this.user, { detail: true });
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.muting)) return;
+ if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.blocking)) return;
+ if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
- if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return;
+ if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
this.connection.cacheNote(note);
diff --git a/packages/backend/src/server/api/stream/channels/channel.ts b/packages/backend/src/server/api/stream/channels/channel.ts
index 12caa7f233..9e5b40997b 100644
--- a/packages/backend/src/server/api/stream/channels/channel.ts
+++ b/packages/backend/src/server/api/stream/channels/channel.ts
@@ -47,11 +47,11 @@ class ChannelChannel extends Channel {
}
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.muting)) return;
+ if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.blocking)) return;
+ if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
- if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return;
+ if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
this.connection.cacheNote(note);
diff --git a/packages/backend/src/server/api/stream/channels/global-timeline.ts b/packages/backend/src/server/api/stream/channels/global-timeline.ts
index d79247cd6e..5454836fe1 100644
--- a/packages/backend/src/server/api/stream/channels/global-timeline.ts
+++ b/packages/backend/src/server/api/stream/channels/global-timeline.ts
@@ -64,11 +64,11 @@ class GlobalTimelineChannel extends Channel {
if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.muting)) return;
+ if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.blocking)) return;
+ if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
- if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return;
+ if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
// 流れてきたNoteがミュートすべきNoteだったら無視する
// TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)
diff --git a/packages/backend/src/server/api/stream/channels/hashtag.ts b/packages/backend/src/server/api/stream/channels/hashtag.ts
index 98dc858ded..0268fdedde 100644
--- a/packages/backend/src/server/api/stream/channels/hashtag.ts
+++ b/packages/backend/src/server/api/stream/channels/hashtag.ts
@@ -46,11 +46,11 @@ class HashtagChannel extends Channel {
}
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.muting)) return;
+ if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.blocking)) return;
+ if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
- if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return;
+ if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
this.connection.cacheNote(note);
diff --git a/packages/backend/src/server/api/stream/channels/home-timeline.ts b/packages/backend/src/server/api/stream/channels/home-timeline.ts
index c623fef64a..ee874ad81e 100644
--- a/packages/backend/src/server/api/stream/channels/home-timeline.ts
+++ b/packages/backend/src/server/api/stream/channels/home-timeline.ts
@@ -24,7 +24,6 @@ class HomeTimelineChannel extends Channel {
@bindThis
public async init(params: any) {
- // Subscribe events
this.subscriber.on('notesStream', this.onNote);
}
@@ -38,7 +37,7 @@ class HomeTimelineChannel extends Channel {
}
// Ignore notes from instances the user has muted
- if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
+ if (isInstanceMuted(note, new Set<string>(this.userProfile!.mutedInstances ?? []))) return;
if (['followers', 'specified'].includes(note.visibility)) {
note = await this.noteEntityService.pack(note.id, this.user!, {
@@ -71,18 +70,18 @@ class HomeTimelineChannel extends Channel {
}
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.muting)) return;
+ if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.blocking)) return;
+ if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
- if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return;
+ if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
// 流れてきたNoteがミュートすべきNoteだったら無視する
// TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)
// 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、
// レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。
// そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる
- if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return;
+ if (await checkWordMute(note, this.user, this.userProfile!.mutedWords)) return;
this.connection.cacheNote(note);
diff --git a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts
index f54767bc9d..4f7b4e78b6 100644
--- a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts
+++ b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts
@@ -72,7 +72,7 @@ class HybridTimelineChannel extends Channel {
}
// Ignore notes from instances the user has muted
- if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
+ if (isInstanceMuted(note, new Set<string>(this.userProfile!.mutedInstances ?? []))) return;
// 関係ない返信は除外
if (note.reply && !this.user!.showTimelineReplies) {
@@ -82,11 +82,11 @@ class HybridTimelineChannel extends Channel {
}
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.muting)) return;
+ if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.blocking)) return;
+ if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
- if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return;
+ if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
// 流れてきたNoteがミュートすべきNoteだったら無視する
// TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)
diff --git a/packages/backend/src/server/api/stream/channels/local-timeline.ts b/packages/backend/src/server/api/stream/channels/local-timeline.ts
index eb0642900d..836c5aae6c 100644
--- a/packages/backend/src/server/api/stream/channels/local-timeline.ts
+++ b/packages/backend/src/server/api/stream/channels/local-timeline.ts
@@ -61,11 +61,11 @@ class LocalTimelineChannel extends Channel {
}
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.muting)) return;
+ if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.blocking)) return;
+ if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
- if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return;
+ if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
// 流れてきたNoteがミュートすべきNoteだったら無視する
// TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)
diff --git a/packages/backend/src/server/api/stream/channels/main.ts b/packages/backend/src/server/api/stream/channels/main.ts
index 4dd16b530a..139320ce35 100644
--- a/packages/backend/src/server/api/stream/channels/main.ts
+++ b/packages/backend/src/server/api/stream/channels/main.ts
@@ -26,7 +26,7 @@ class MainChannel extends Channel {
case 'notification': {
// Ignore notifications from instances the user has muted
if (isUserFromMutedInstance(data.body, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
- if (data.body.userId && this.muting.has(data.body.userId)) return;
+ if (data.body.userId && this.userIdsWhoMeMuting.has(data.body.userId)) return;
if (data.body.note && data.body.note.isHidden) {
const note = await this.noteEntityService.pack(data.body.note.id, this.user, {
@@ -40,7 +40,7 @@ class MainChannel extends Channel {
case 'mention': {
if (isInstanceMuted(data.body, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
- if (this.muting.has(data.body.userId)) return;
+ if (this.userIdsWhoMeMuting.has(data.body.userId)) return;
if (data.body.isHidden) {
const note = await this.noteEntityService.pack(data.body.id, this.user, {
detail: true,
diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts
index 8a42e99a54..8802fc5ab8 100644
--- a/packages/backend/src/server/api/stream/channels/user-list.ts
+++ b/packages/backend/src/server/api/stream/channels/user-list.ts
@@ -89,11 +89,11 @@ class UserListChannel extends Channel {
}
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.muting)) return;
+ if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
- if (isUserRelated(note, this.blocking)) return;
+ if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
- if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return;
+ if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
this.send('note', note);
}
diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts
index 2f473cd012..a6f9145952 100644
--- a/packages/backend/src/server/api/stream/index.ts
+++ b/packages/backend/src/server/api/stream/index.ts
@@ -1,13 +1,11 @@
import type { User } from '@/models/entities/User.js';
-import type { Channel as ChannelModel } from '@/models/entities/Channel.js';
-import type { FollowingsRepository, MutingsRepository, RenoteMutingsRepository, UserProfilesRepository, ChannelFollowingsRepository, BlockingsRepository } from '@/models/index.js';
import type { AccessToken } from '@/models/entities/AccessToken.js';
-import type { UserProfile } from '@/models/entities/UserProfile.js';
import type { Packed } from '@/misc/json-schema.js';
-import type { GlobalEventService } from '@/core/GlobalEventService.js';
import type { NoteReadService } from '@/core/NoteReadService.js';
import type { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js';
+import { CacheService } from '@/core/CacheService.js';
+import { UserProfile } from '@/models/index.js';
import type { ChannelsService } from './ChannelsService.js';
import type * as websocket from 'websocket';
import type { EventEmitter } from 'events';
@@ -19,106 +17,71 @@ import type { StreamEventEmitter, StreamMessages } from './types.js';
*/
export default class Connection {
public user?: User;
- public userProfile?: UserProfile | null;
- public following: Set<User['id']> = new Set();
- public muting: Set<User['id']> = new Set();
- public renoteMuting: Set<User['id']> = new Set();
- public blocking: Set<User['id']> = new Set(); // "被"blocking
- public followingChannels: Set<ChannelModel['id']> = new Set();
public token?: AccessToken;
private wsConnection: websocket.connection;
public subscriber: StreamEventEmitter;
private channels: Channel[] = [];
private subscribingNotes: any = {};
private cachedNotes: Packed<'Note'>[] = [];
+ public userProfile: UserProfile | null = null;
+ public following: Set<string> = new Set();
+ public followingChannels: Set<string> = new Set();
+ public userIdsWhoMeMuting: Set<string> = new Set();
+ public userIdsWhoBlockingMe: Set<string> = new Set();
+ public userIdsWhoMeMutingRenotes: Set<string> = new Set();
+ private fetchIntervalId: NodeJS.Timer | null = null;
constructor(
- private followingsRepository: FollowingsRepository,
- private mutingsRepository: MutingsRepository,
- private renoteMutingsRepository: RenoteMutingsRepository,
- private blockingsRepository: BlockingsRepository,
- private channelFollowingsRepository: ChannelFollowingsRepository,
- private userProfilesRepository: UserProfilesRepository,
private channelsService: ChannelsService,
- private globalEventService: GlobalEventService,
private noteReadService: NoteReadService,
private notificationService: NotificationService,
+ private cacheService: CacheService,
- wsConnection: websocket.connection,
subscriber: EventEmitter,
user: User | null | undefined,
token: AccessToken | null | undefined,
) {
- this.wsConnection = wsConnection;
this.subscriber = subscriber;
if (user) this.user = user;
if (token) this.token = token;
+ }
- //this.onWsConnectionMessage = this.onWsConnectionMessage.bind(this);
- //this.onUserEvent = this.onUserEvent.bind(this);
- //this.onNoteStreamMessage = this.onNoteStreamMessage.bind(this);
- //this.onBroadcastMessage = this.onBroadcastMessage.bind(this);
-
- this.wsConnection.on('message', this.onWsConnectionMessage);
-
- this.subscriber.on('broadcast', data => {
- this.onBroadcastMessage(data);
- });
+ @bindThis
+ public async fetch() {
+ if (this.user == null) return;
+ const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([
+ this.cacheService.userProfileCache.fetch(this.user.id),
+ this.cacheService.userFollowingsCache.fetch(this.user.id),
+ this.cacheService.userFollowingChannelsCache.fetch(this.user.id),
+ this.cacheService.userMutingsCache.fetch(this.user.id),
+ this.cacheService.userBlockedCache.fetch(this.user.id),
+ this.cacheService.renoteMutingsCache.fetch(this.user.id),
+ ]);
+ this.userProfile = userProfile;
+ this.following = following;
+ this.followingChannels = followingChannels;
+ this.userIdsWhoMeMuting = userIdsWhoMeMuting;
+ this.userIdsWhoBlockingMe = userIdsWhoBlockingMe;
+ this.userIdsWhoMeMutingRenotes = userIdsWhoMeMutingRenotes;
+ }
- if (this.user) {
- this.updateFollowing();
- this.updateMuting();
- this.updateRenoteMuting();
- this.updateBlocking();
- this.updateFollowingChannels();
- this.updateUserProfile();
+ @bindThis
+ public async init() {
+ if (this.user != null) {
+ await this.fetch();
- this.subscriber.on(`user:${this.user.id}`, this.onUserEvent);
+ this.fetchIntervalId = setInterval(this.fetch, 1000 * 10);
}
}
@bindThis
- private onUserEvent(data: StreamMessages['user']['payload']) { // { type, body }と展開するとそれぞれ型が分離してしまう
- switch (data.type) {
- case 'follow':
- this.following.add(data.body.id);
- break;
-
- case 'unfollow':
- this.following.delete(data.body.id);
- break;
-
- case 'mute':
- this.muting.add(data.body.id);
- break;
-
- case 'unmute':
- this.muting.delete(data.body.id);
- break;
-
- // TODO: renote mute events
- // TODO: block events
-
- case 'followChannel':
- this.followingChannels.add(data.body.id);
- break;
-
- case 'unfollowChannel':
- this.followingChannels.delete(data.body.id);
- break;
-
- case 'updateUserProfile':
- this.userProfile = data.body;
- break;
-
- case 'terminate':
- this.wsConnection.close();
- this.dispose();
- break;
+ public async init2(wsConnection: websocket.connection) {
+ this.wsConnection = wsConnection;
+ this.wsConnection.on('message', this.onWsConnectionMessage);
- default:
- break;
- }
+ this.subscriber.on('broadcast', data => {
+ this.onBroadcastMessage(data);
+ });
}
/**
@@ -318,78 +281,12 @@ export default class Connection {
}
}
- @bindThis
- private async updateFollowing() {
- const followings = await this.followingsRepository.find({
- where: {
- followerId: this.user!.id,
- },
- select: ['followeeId'],
- });
-
- this.following = new Set<string>(followings.map(x => x.followeeId));
- }
-
- @bindThis
- private async updateMuting() {
- const mutings = await this.mutingsRepository.find({
- where: {
- muterId: this.user!.id,
- },
- select: ['muteeId'],
- });
-
- this.muting = new Set<string>(mutings.map(x => x.muteeId));
- }
-
- @bindThis
- private async updateRenoteMuting() {
- const renoteMutings = await this.renoteMutingsRepository.find({
- where: {
- muterId: this.user!.id,
- },
- select: ['muteeId'],
- });
-
- this.renoteMuting = new Set<string>(renoteMutings.map(x => x.muteeId));
- }
-
- @bindThis
- private async updateBlocking() { // ここでいうBlockingは被Blockingの意
- const blockings = await this.blockingsRepository.find({
- where: {
- blockeeId: this.user!.id,
- },
- select: ['blockerId'],
- });
-
- this.blocking = new Set<string>(blockings.map(x => x.blockerId));
- }
-
- @bindThis
- private async updateFollowingChannels() {
- const followings = await this.channelFollowingsRepository.find({
- where: {
- followerId: this.user!.id,
- },
- select: ['followeeId'],
- });
-
- this.followingChannels = new Set<string>(followings.map(x => x.followeeId));
- }
-
- @bindThis
- private async updateUserProfile() {
- this.userProfile = await this.userProfilesRepository.findOneBy({
- userId: this.user!.id,
- });
- }
-
/**
* ストリームが切れたとき
*/
@bindThis
public dispose() {
+ if (this.fetchIntervalId) clearInterval(this.fetchIntervalId);
for (const c of this.channels.filter(c => c.dispose)) {
if (c.dispose) c.dispose();
}
diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts
index f4eedc3964..ed73897e73 100644
--- a/packages/backend/src/server/api/stream/types.ts
+++ b/packages/backend/src/server/api/stream/types.ts
@@ -38,6 +38,11 @@ export interface InternalStreamTypes {
antennaDeleted: Antenna;
antennaUpdated: Antenna;
metaUpdated: Meta;
+ followChannel: { userId: User['id']; channelId: Channel['id']; };
+ unfollowChannel: { userId: User['id']; channelId: Channel['id']; };
+ updateUserProfile: UserProfile;
+ mute: { muterId: User['id']; muteeId: User['id']; };
+ unmute: { muterId: User['id']; muteeId: User['id']; };
}
export interface BroadcastTypes {
@@ -56,18 +61,6 @@ export interface BroadcastTypes {
};
}
-export interface UserStreamTypes {
- terminate: Record<string, unknown>;
- followChannel: Channel;
- unfollowChannel: Channel;
- updateUserProfile: UserProfile;
- mute: User;
- unmute: User;
- follow: Packed<'UserDetailedNotMe'>;
- unfollow: Packed<'User'>;
- userAdded: Packed<'User'>;
-}
-
export interface MainStreamTypes {
notification: Packed<'Notification'>;
mention: Packed<'Note'>;
@@ -200,10 +193,6 @@ export type StreamMessages = {
name: 'broadcast';
payload: EventUnionFromDictionary<SerializedAll<BroadcastTypes>>;
};
- user: {
- name: `user:${User['id']}`;
- payload: EventUnionFromDictionary<SerializedAll<UserStreamTypes>>;
- };
main: {
name: `mainStream:${User['id']}`;
payload: EventUnionFromDictionary<SerializedAll<MainStreamTypes>>;