From a87930542a2975af05799adc5250771a08301811 Mon Sep 17 00:00:00 2001 From: Zero King Date: Fri, 12 Mar 2021 17:07:18 +0000 Subject: Fix typo (#7334) --- src/services/following/create.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/services') diff --git a/src/services/following/create.ts b/src/services/following/create.ts index c0583cdb86..6bc98aee87 100644 --- a/src/services/following/create.ts +++ b/src/services/following/create.ts @@ -93,7 +93,7 @@ export async function insertFollowingDoc(followee: User, follower: User) { // Publish followed event if (Users.isLocalUser(followee)) { - Users.pack(follower, followee).then(packed => publishMainStream(followee.id, 'followed', packed)), + Users.pack(follower, followee).then(packed => publishMainStream(followee.id, 'followed', packed)); // 通知を作成 createNotification(followee.id, 'follow', { -- cgit v1.2.3-freya From 8aa089178a54559cbc4e4fe84a618fc7535f178c Mon Sep 17 00:00:00 2001 From: syuilo Date: Thu, 18 Mar 2021 10:49:14 +0900 Subject: Improve server performance --- src/misc/cache.ts | 25 +++++++++++++++++++++++++ src/queue/processors/deliver.ts | 19 +++++++++++++------ src/server/api/authenticate.ts | 12 ++++++++---- src/services/register-or-fetch-instance-doc.ts | 8 ++++++++ 4 files changed, 54 insertions(+), 10 deletions(-) create mode 100644 src/misc/cache.ts (limited to 'src/services') diff --git a/src/misc/cache.ts b/src/misc/cache.ts new file mode 100644 index 0000000000..356a3de7b9 --- /dev/null +++ b/src/misc/cache.ts @@ -0,0 +1,25 @@ +export class Cache { + private cache: Map; + private lifetime: number; + + constructor(lifetime: Cache['lifetime']) { + this.lifetime = lifetime; + } + + public set(key: string | null, value: T):void { + this.cache.set(key, { + date: Date.now(), + value + }); + } + + public get(key: string | null): T | null { + const cached = this.cache.get(key); + if (cached == null) return null; + if ((Date.now() - cached.date) > this.lifetime) { + this.cache.delete(key); + return null; + } + return cached.value; + } +} diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts index cb7587ef81..a8b4ed4fe3 100644 --- a/src/queue/processors/deliver.ts +++ b/src/queue/processors/deliver.ts @@ -7,11 +7,15 @@ import { instanceChart } from '../../services/chart'; import { fetchInstanceMetadata } from '../../services/fetch-instance-metadata'; import { fetchMeta } from '../../misc/fetch-meta'; import { toPuny } from '../../misc/convert-host'; +import { Cache } from '../../misc/cache'; +import { Instance } from '../../models/entities/instance'; const logger = new Logger('deliver'); let latest: string | null = null; +const suspendedHostsCache = new Cache(1000 * 60 * 60); + export default async (job: Bull.Job) => { const { host } = new URL(job.data.to); @@ -22,12 +26,15 @@ export default async (job: Bull.Job) => { } // isSuspendedなら中断 - const suspendedHosts = await Instances.find({ - where: { - isSuspended: true - }, - cache: 60 * 1000 - }); + let suspendedHosts = suspendedHostsCache.get(null); + if (suspendedHosts == null) { + suspendedHosts = await Instances.find({ + where: { + isSuspended: true + }, + }); + suspendedHostsCache.set(null, suspendedHosts); + } if (suspendedHosts.map(x => x.host).includes(toPuny(host))) { return 'skip (suspended)'; } diff --git a/src/server/api/authenticate.ts b/src/server/api/authenticate.ts index 7404c477fd..9c9ef74352 100644 --- a/src/server/api/authenticate.ts +++ b/src/server/api/authenticate.ts @@ -2,8 +2,11 @@ import isNativeToken from './common/is-native-token'; import { User } from '../../models/entities/user'; import { Users, AccessTokens, Apps } from '../../models'; import { AccessToken } from '../../models/entities/access-token'; +import { Cache } from '../../misc/cache'; -const cache = {} as Record; +// TODO: TypeORMのカスタムキャッシュプロバイダを使っても良いかも +// ref. https://github.com/typeorm/typeorm/blob/master/docs/caching.md +const cache = new Cache(1000 * 60 * 60); export default async (token: string): Promise<[User | null | undefined, AccessToken | null | undefined]> => { if (token == null) { @@ -11,8 +14,9 @@ export default async (token: string): Promise<[User | null | undefined, AccessTo } if (isNativeToken(token)) { - if (cache[token]) { // TODO: キャッシュされてから一定時間経過していたら破棄する - return [cache[token], null]; + const cached = cache.get(token); + if (cached) { + return [cached, null]; } // Fetch user @@ -23,7 +27,7 @@ export default async (token: string): Promise<[User | null | undefined, AccessTo throw new Error('user not found'); } - cache[token] = user; + cache.set(token, user); return [user, null]; } else { diff --git a/src/services/register-or-fetch-instance-doc.ts b/src/services/register-or-fetch-instance-doc.ts index 3501e20de1..2c39502288 100644 --- a/src/services/register-or-fetch-instance-doc.ts +++ b/src/services/register-or-fetch-instance-doc.ts @@ -3,10 +3,16 @@ import { Instances } from '../models'; import { federationChart } from './chart'; import { genId } from '../misc/gen-id'; import { toPuny } from '../misc/convert-host'; +import { Cache } from '../misc/cache'; + +const cache = new Cache(1000 * 60 * 60); export async function registerOrFetchInstanceDoc(host: string): Promise { host = toPuny(host); + const cached = cache.get(host); + if (cached) return cached; + const index = await Instances.findOne({ host }); if (index == null) { @@ -19,8 +25,10 @@ export async function registerOrFetchInstanceDoc(host: string): Promise Date: Thu, 18 Mar 2021 11:17:05 +0900 Subject: Improve chart performance (#7360) * wip * wip * wip * wip * wip * Update chart.ts * wip * Improve server performance * wip * wip --- migration/1615965918224-chart-v2.ts | 218 ++++++++++++++++ migration/1615966519402-chart-v2-2.ts | 22 ++ package.json | 2 +- src/daemons/queue-stats.ts | 2 +- src/db/postgre.ts | 4 + src/global.d.ts | 1 + src/misc/before-shutdown.ts | 88 +++++++ src/queue/index.ts | 23 +- src/queue/initialize.ts | 18 ++ src/queue/queues.ts | 7 + src/services/chart/charts/classes/active-users.ts | 18 +- src/services/chart/charts/classes/drive.ts | 22 ++ src/services/chart/charts/classes/federation.ts | 11 + src/services/chart/charts/classes/hashtag.ts | 18 +- src/services/chart/charts/classes/instance.ts | 44 ++++ src/services/chart/charts/classes/network.ts | 11 + src/services/chart/charts/classes/notes.ts | 26 ++ .../chart/charts/classes/per-user-drive.ts | 12 + .../chart/charts/classes/per-user-following.ts | 30 +++ .../chart/charts/classes/per-user-notes.ts | 14 + .../chart/charts/classes/per-user-reactions.ts | 12 + src/services/chart/charts/classes/test-grouped.ts | 11 + src/services/chart/charts/classes/test-unique.ts | 13 +- src/services/chart/charts/classes/test.ts | 11 + src/services/chart/charts/classes/users.ts | 16 ++ src/services/chart/charts/schemas/active-users.ts | 12 +- src/services/chart/charts/schemas/hashtag.ts | 12 +- src/services/chart/charts/schemas/test-unique.ts | 7 +- src/services/chart/core.ts | 282 ++++++++++++--------- src/services/chart/index.ts | 25 ++ test/chart.ts | 54 +++- yarn.lock | 8 +- 32 files changed, 891 insertions(+), 163 deletions(-) create mode 100644 migration/1615965918224-chart-v2.ts create mode 100644 migration/1615966519402-chart-v2-2.ts create mode 100644 src/global.d.ts create mode 100644 src/misc/before-shutdown.ts create mode 100644 src/queue/initialize.ts create mode 100644 src/queue/queues.ts (limited to 'src/services') diff --git a/migration/1615965918224-chart-v2.ts b/migration/1615965918224-chart-v2.ts new file mode 100644 index 0000000000..cacbd1945b --- /dev/null +++ b/migration/1615965918224-chart-v2.ts @@ -0,0 +1,218 @@ +import {MigrationInterface, QueryRunner} from "typeorm"; + +export class chartV21615965918224 implements MigrationInterface { + name = 'chartV21615965918224' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DELETE FROM "__chart__active_users" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__drive" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__federation" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__hashtag" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__instance" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__network" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__notes" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__per_user_drive" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__per_user_following" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__per_user_notes" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__per_user_reaction" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__test" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__test_grouped" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__test_unique" WHERE "span" = 'day'`); + await queryRunner.query(`DELETE FROM "__chart__users" WHERE "span" = 'day'`); + + await queryRunner.query(`DROP INDEX "IDX_15e91a03aeeac9dbccdf43fc06"`); + await queryRunner.query(`DROP INDEX "IDX_20f57cc8f142c131340ee16742"`); + await queryRunner.query(`DROP INDEX "IDX_c26e2c1cbb6e911e0554b27416"`); + await queryRunner.query(`DROP INDEX "IDX_3fa0d0f17ca72e3dc80999a032"`); + await queryRunner.query(`DROP INDEX "IDX_6e1df243476e20cbf86572ecc0"`); + await queryRunner.query(`DROP INDEX "IDX_06690fc959f1c9fdaf21928222"`); + await queryRunner.query(`DROP INDEX "IDX_e447064455928cf627590ef527"`); + await queryRunner.query(`DROP INDEX "IDX_2d416e6af791a82e338c79d480"`); + await queryRunner.query(`DROP INDEX "IDX_e9cd07672b37d8966cf3709283"`); + await queryRunner.query(`DROP INDEX "IDX_fcc181fb8283009c61cc4083ef"`); + await queryRunner.query(`DROP INDEX "IDX_49975586f50ed7b800fdd88fbd"`); + await queryRunner.query(`DROP INDEX "IDX_6d6f156ceefc6bc5f273a0e370"`); + await queryRunner.query(`DROP INDEX "IDX_c12f0af4a66cdd30c2287ce8aa"`); + await queryRunner.query(`DROP INDEX "IDX_d0a4f79af5a97b08f37b547197"`); + await queryRunner.query(`DROP INDEX "IDX_f5448d9633cff74208d850aabe"`); + await queryRunner.query(`DROP INDEX "IDX_f8dd01baeded2ffa833e0a610a"`); + await queryRunner.query(`DROP INDEX "IDX_08fac0eb3b11f04c200c0b40dd"`); + await queryRunner.query(`DROP INDEX "IDX_9ff6944f01acb756fdc92d7563"`); + await queryRunner.query(`DROP INDEX "IDX_e69096589f11e3baa98ddd64d0"`); + await queryRunner.query(`DROP INDEX "IDX_0c9a159c5082cbeef3ca6706b5"`); + await queryRunner.query(`DROP INDEX "IDX_924fc196c80ca24bae01dd37e4"`); + await queryRunner.query(`DROP INDEX "IDX_328f259961e60c4fa0bfcf55ca"`); + await queryRunner.query(`DROP INDEX "IDX_42ea9381f0fda8dfe0fa1c8b53"`); + await queryRunner.query(`DROP INDEX "IDX_f2aeafde2ae6fbad38e857631b"`); + await queryRunner.query(`DROP INDEX "IDX_f92dd6d03f8d994f29987f6214"`); + await queryRunner.query(`DROP INDEX "IDX_57b5458d0d3d6d1e7f13d4e57f"`); + await queryRunner.query(`DROP INDEX "IDX_4db3b84c7be0d3464714f3e0b1"`); + await queryRunner.query(`DROP INDEX "IDX_8d2cbbc8114d90d19b44d626b6"`); + await queryRunner.query(`DROP INDEX "IDX_046feeb12e9ef5f783f409866a"`); + await queryRunner.query(`DROP INDEX "IDX_f68a5ab958f9f5fa17a32ac23b"`); + await queryRunner.query(`DROP INDEX "IDX_65633a106bce43fc7c5c30a5c7"`); + await queryRunner.query(`DROP INDEX "IDX_edeb73c09c3143a81bcb34d569"`); + await queryRunner.query(`DROP INDEX "IDX_e316f01a6d24eb31db27f88262"`); + await queryRunner.query(`DROP INDEX "IDX_2be7ec6cebddc14dc11e206686"`); + await queryRunner.query(`DROP INDEX "IDX_a5133470f4825902e170328ca5"`); + await queryRunner.query(`DROP INDEX "IDX_84e661abb7bd1e51b690d4b017"`); + await queryRunner.query(`DROP INDEX "IDX_5c73bf61da4f6e6f15bae88ed1"`); + await queryRunner.query(`DROP INDEX "IDX_d70c86baedc68326be11f9c0ce"`); + await queryRunner.query(`DROP INDEX "IDX_66e1e1ecd2f29e57778af35b59"`); + await queryRunner.query(`DROP INDEX "IDX_92255988735563f0fe4aba1f05"`); + await queryRunner.query(`DROP INDEX "IDX_c5870993e25c3d5771f91f5003"`); + await queryRunner.query(`DROP INDEX "IDX_f170de677ea75ad4533de2723e"`); + await queryRunner.query(`DROP INDEX "IDX_7c184198ecf66a8d3ecb253ab3"`); + await queryRunner.query(`DROP INDEX "IDX_f091abb24193d50c653c6b77fc"`); + await queryRunner.query(`DROP INDEX "IDX_a770a57c70e668cc61590c9161"`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__active_users_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" DROP COLUMN "___local_count"`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" DROP COLUMN "___remote_count"`); + await queryRunner.query(`ALTER TABLE "__chart__drive" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__drive_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__drive" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__federation" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__federation_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__federation" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__hashtag_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" DROP COLUMN "___local_count"`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" DROP COLUMN "___remote_count"`); + await queryRunner.query(`ALTER TABLE "__chart__instance" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__instance_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__instance" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__network" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__network_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__network" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__notes" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__notes_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__notes" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_drive" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__per_user_drive_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_drive" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_following" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__per_user_following_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_following" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_notes" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__per_user_notes_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_notes" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_reaction" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__per_user_reaction_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_reaction" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__test_grouped" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__test_grouped_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__test_grouped" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__test_unique" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__test_unique_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__test_unique" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__test_unique" DROP COLUMN "___foo"`); + await queryRunner.query(`ALTER TABLE "__chart__test" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__test_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__test" DROP COLUMN "unique"`); + await queryRunner.query(`ALTER TABLE "__chart__users" DROP COLUMN "span"`); + await queryRunner.query(`DROP TYPE "public"."__chart__users_span_enum"`); + await queryRunner.query(`ALTER TABLE "__chart__users" DROP COLUMN "unique"`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "__chart__users" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__users_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__users" ADD "span" "__chart__users_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__test" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__test_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__test" ADD "span" "__chart__test_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__test_unique" ADD "___foo" bigint NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__test_unique" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__test_unique_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__test_unique" ADD "span" "__chart__test_unique_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__test_grouped" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__test_grouped_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__test_grouped" ADD "span" "__chart__test_grouped_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_reaction" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__per_user_reaction_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_reaction" ADD "span" "__chart__per_user_reaction_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_notes" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__per_user_notes_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_notes" ADD "span" "__chart__per_user_notes_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_following" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__per_user_following_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_following" ADD "span" "__chart__per_user_following_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_drive" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__per_user_drive_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__per_user_drive" ADD "span" "__chart__per_user_drive_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__notes" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__notes_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__notes" ADD "span" "__chart__notes_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__network" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__network_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__network" ADD "span" "__chart__network_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__instance" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__instance_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__instance" ADD "span" "__chart__instance_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" ADD "___remote_count" bigint NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" ADD "___local_count" bigint NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__hashtag_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" ADD "span" "__chart__hashtag_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__federation" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__federation_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__federation" ADD "span" "__chart__federation_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__drive" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__drive_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__drive" ADD "span" "__chart__drive_span_enum" NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" ADD "___remote_count" bigint NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" ADD "___local_count" bigint NOT NULL`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" ADD "unique" jsonb NOT NULL DEFAULT '{}'`); + await queryRunner.query(`CREATE TYPE "public"."__chart__active_users_span_enum" AS ENUM('hour', 'day')`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" ADD "span" "__chart__active_users_span_enum" NOT NULL`); + await queryRunner.query(`CREATE INDEX "IDX_a770a57c70e668cc61590c9161" ON "__chart__users" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_f091abb24193d50c653c6b77fc" ON "__chart__users" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_7c184198ecf66a8d3ecb253ab3" ON "__chart__users" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_f170de677ea75ad4533de2723e" ON "__chart__test" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_c5870993e25c3d5771f91f5003" ON "__chart__test" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_92255988735563f0fe4aba1f05" ON "__chart__test" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_66e1e1ecd2f29e57778af35b59" ON "__chart__test_unique" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_d70c86baedc68326be11f9c0ce" ON "__chart__test_unique" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_5c73bf61da4f6e6f15bae88ed1" ON "__chart__test_unique" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_84e661abb7bd1e51b690d4b017" ON "__chart__test_grouped" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_a5133470f4825902e170328ca5" ON "__chart__test_grouped" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_2be7ec6cebddc14dc11e206686" ON "__chart__test_grouped" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_e316f01a6d24eb31db27f88262" ON "__chart__per_user_reaction" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_edeb73c09c3143a81bcb34d569" ON "__chart__per_user_reaction" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_65633a106bce43fc7c5c30a5c7" ON "__chart__per_user_reaction" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_f68a5ab958f9f5fa17a32ac23b" ON "__chart__per_user_notes" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_046feeb12e9ef5f783f409866a" ON "__chart__per_user_notes" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_8d2cbbc8114d90d19b44d626b6" ON "__chart__per_user_notes" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_4db3b84c7be0d3464714f3e0b1" ON "__chart__per_user_following" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_57b5458d0d3d6d1e7f13d4e57f" ON "__chart__per_user_following" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_f92dd6d03f8d994f29987f6214" ON "__chart__per_user_following" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_f2aeafde2ae6fbad38e857631b" ON "__chart__per_user_drive" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_42ea9381f0fda8dfe0fa1c8b53" ON "__chart__per_user_drive" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_328f259961e60c4fa0bfcf55ca" ON "__chart__per_user_drive" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_924fc196c80ca24bae01dd37e4" ON "__chart__notes" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_0c9a159c5082cbeef3ca6706b5" ON "__chart__notes" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_e69096589f11e3baa98ddd64d0" ON "__chart__notes" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_9ff6944f01acb756fdc92d7563" ON "__chart__network" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_08fac0eb3b11f04c200c0b40dd" ON "__chart__network" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_f8dd01baeded2ffa833e0a610a" ON "__chart__network" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_f5448d9633cff74208d850aabe" ON "__chart__instance" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_d0a4f79af5a97b08f37b547197" ON "__chart__instance" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_c12f0af4a66cdd30c2287ce8aa" ON "__chart__instance" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_6d6f156ceefc6bc5f273a0e370" ON "__chart__hashtag" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_49975586f50ed7b800fdd88fbd" ON "__chart__hashtag" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_fcc181fb8283009c61cc4083ef" ON "__chart__hashtag" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_e9cd07672b37d8966cf3709283" ON "__chart__federation" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_2d416e6af791a82e338c79d480" ON "__chart__federation" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_e447064455928cf627590ef527" ON "__chart__federation" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_06690fc959f1c9fdaf21928222" ON "__chart__drive" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_6e1df243476e20cbf86572ecc0" ON "__chart__drive" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_3fa0d0f17ca72e3dc80999a032" ON "__chart__drive" ("span") `); + await queryRunner.query(`CREATE INDEX "IDX_c26e2c1cbb6e911e0554b27416" ON "__chart__active_users" ("date", "group", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_20f57cc8f142c131340ee16742" ON "__chart__active_users" ("date", "span") `); + await queryRunner.query(`CREATE INDEX "IDX_15e91a03aeeac9dbccdf43fc06" ON "__chart__active_users" ("span") `); + } + +} diff --git a/migration/1615966519402-chart-v2-2.ts b/migration/1615966519402-chart-v2-2.ts new file mode 100644 index 0000000000..a694f9542a --- /dev/null +++ b/migration/1615966519402-chart-v2-2.ts @@ -0,0 +1,22 @@ +import {MigrationInterface, QueryRunner} from "typeorm"; + +export class chartV221615966519402 implements MigrationInterface { + name = 'chartV221615966519402' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "__chart__active_users" ADD "___local_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" ADD "___remote_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" ADD "___local_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" ADD "___remote_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`); + await queryRunner.query(`ALTER TABLE "__chart__test_unique" ADD "___foo" character varying array NOT NULL DEFAULT '{}'::varchar[]`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "__chart__test_unique" DROP COLUMN "___foo"`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" DROP COLUMN "___remote_users"`); + await queryRunner.query(`ALTER TABLE "__chart__hashtag" DROP COLUMN "___local_users"`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" DROP COLUMN "___remote_users"`); + await queryRunner.query(`ALTER TABLE "__chart__active_users" DROP COLUMN "___local_users"`); + } + +} diff --git a/package.json b/package.json index 67d7dfca89..4d7f3c8353 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,7 @@ "@koa/router": "9.0.1", "@sentry/browser": "5.29.2", "@sentry/tracing": "5.29.2", - "@sinonjs/fake-timers": "6.0.1", + "@sinonjs/fake-timers": "7.0.2", "@syuilo/aiscript": "0.11.1", "@types/bcryptjs": "2.4.2", "@types/bull": "3.15.0", diff --git a/src/daemons/queue-stats.ts b/src/daemons/queue-stats.ts index 288e855ae9..77f09b18d6 100644 --- a/src/daemons/queue-stats.ts +++ b/src/daemons/queue-stats.ts @@ -1,5 +1,5 @@ import Xev from 'xev'; -import { deliverQueue, inboxQueue } from '../queue'; +import { deliverQueue, inboxQueue } from '../queue/queues'; const ev = new Xev(); diff --git a/src/db/postgre.ts b/src/db/postgre.ts index 2f3c910163..831e5e0592 100644 --- a/src/db/postgre.ts +++ b/src/db/postgre.ts @@ -1,3 +1,7 @@ +// https://github.com/typeorm/typeorm/issues/2400 +const types = require('pg').types; +types.setTypeParser(20, Number); + import { createConnection, Logger, getConnection } from 'typeorm'; import config from '../config'; import { entities as charts } from '../services/chart/entities'; diff --git a/src/global.d.ts b/src/global.d.ts new file mode 100644 index 0000000000..7343aa1994 --- /dev/null +++ b/src/global.d.ts @@ -0,0 +1 @@ +type FIXME = any; diff --git a/src/misc/before-shutdown.ts b/src/misc/before-shutdown.ts new file mode 100644 index 0000000000..58d0ea5108 --- /dev/null +++ b/src/misc/before-shutdown.ts @@ -0,0 +1,88 @@ +// https://gist.github.com/nfantone/1eaa803772025df69d07f4dbf5df7e58 + +'use strict'; + +/** + * @callback BeforeShutdownListener + * @param {string} [signalOrEvent] The exit signal or event name received on the process. + */ + +/** + * System signals the app will listen to initiate shutdown. + * @const {string[]} + */ +const SHUTDOWN_SIGNALS = ['SIGINT', 'SIGTERM']; + +/** + * Time in milliseconds to wait before forcing shutdown. + * @const {number} + */ +const SHUTDOWN_TIMEOUT = 15000; + +/** + * A queue of listener callbacks to execute before shutting + * down the process. + * @type {BeforeShutdownListener[]} + */ +const shutdownListeners = []; + +/** + * Listen for signals and execute given `fn` function once. + * @param {string[]} signals System signals to listen to. + * @param {function(string)} fn Function to execute on shutdown. + */ +const processOnce = (signals, fn) => { + return signals.forEach(sig => process.once(sig, fn)); +}; + +/** + * Sets a forced shutdown mechanism that will exit the process after `timeout` milliseconds. + * @param {number} timeout Time to wait before forcing shutdown (milliseconds) + */ +const forceExitAfter = timeout => () => { + setTimeout(() => { + // Force shutdown after timeout + console.warn(`Could not close resources gracefully after ${timeout}ms: forcing shutdown`); + return process.exit(1); + }, timeout).unref(); +}; + +/** + * Main process shutdown handler. Will invoke every previously registered async shutdown listener + * in the queue and exit with a code of `0`. Any `Promise` rejections from any listener will + * be logged out as a warning, but won't prevent other callbacks from executing. + * @param {string} signalOrEvent The exit signal or event name received on the process. + */ +async function shutdownHandler(signalOrEvent) { + console.warn(`Shutting down: received [${signalOrEvent}] signal`); + + for (const listener of shutdownListeners) { + try { + await listener(signalOrEvent); + } catch (err) { + console.warn(`A shutdown handler failed before completing with: ${err.message || err}`); + } + } + + return process.exit(0); +} + +/** + * Registers a new shutdown listener to be invoked before exiting + * the main process. Listener handlers are guaranteed to be called in the order + * they were registered. + * @param {BeforeShutdownListener} listener The shutdown listener to register. + * @returns {BeforeShutdownListener} Echoes back the supplied `listener`. + */ +export function beforeShutdown(listener) { + shutdownListeners.push(listener); + return listener; +} + +// Register shutdown callback that kills the process after `SHUTDOWN_TIMEOUT` milliseconds +// This prevents custom shutdown handlers from hanging the process indefinitely +processOnce(SHUTDOWN_SIGNALS, forceExitAfter(SHUTDOWN_TIMEOUT)); + +// Register process shutdown callback +// Will listen to incoming signal events and execute all registered handlers in the stack +processOnce(SHUTDOWN_SIGNALS, shutdownHandler); diff --git a/src/queue/index.ts b/src/queue/index.ts index 163c57d691..9fb4595a35 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,4 +1,3 @@ -import * as Queue from 'bull'; import * as httpSignature from 'http-signature'; import config from '../config'; @@ -13,22 +12,7 @@ import { queueLogger } from './logger'; import { DriveFile } from '../models/entities/drive-file'; import { getJobInfo } from './get-job-info'; import { IActivity } from '../remote/activitypub/type'; - -function initializeQueue(name: string, limitPerSec = -1) { - return new Queue(name, { - redis: { - port: config.redis.port, - host: config.redis.host, - password: config.redis.pass, - db: config.redis.db || 0, - }, - prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue', - limiter: limitPerSec > 0 ? { - max: limitPerSec * 5, - duration: 5000 - } : undefined - }); -} +import { dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues'; export type InboxJobData = { activity: IActivity, @@ -44,11 +28,6 @@ function renderError(e: Error): any { }; } -export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128); -export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16); -export const dbQueue = initializeQueue('db'); -export const objectStorageQueue = initializeQueue('objectStorage'); - const deliverLogger = queueLogger.createSubLogger('deliver'); const inboxLogger = queueLogger.createSubLogger('inbox'); const dbLogger = queueLogger.createSubLogger('db'); diff --git a/src/queue/initialize.ts b/src/queue/initialize.ts new file mode 100644 index 0000000000..92579531e4 --- /dev/null +++ b/src/queue/initialize.ts @@ -0,0 +1,18 @@ +import * as Queue from 'bull'; +import config from '../config'; + +export function initialize(name: string, limitPerSec = -1) { + return new Queue(name, { + redis: { + port: config.redis.port, + host: config.redis.host, + password: config.redis.pass, + db: config.redis.db || 0, + }, + prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue', + limiter: limitPerSec > 0 ? { + max: limitPerSec * 5, + duration: 5000 + } : undefined + }); +} diff --git a/src/queue/queues.ts b/src/queue/queues.ts new file mode 100644 index 0000000000..d589d9f7da --- /dev/null +++ b/src/queue/queues.ts @@ -0,0 +1,7 @@ +import config from '../config'; +import { initialize as initializeQueue } from './initialize'; + +export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128); +export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16); +export const dbQueue = initializeQueue('db'); +export const objectStorageQueue = initializeQueue('objectStorage'); diff --git a/src/services/chart/charts/classes/active-users.ts b/src/services/chart/charts/classes/active-users.ts index 5128150de6..4820f8281b 100644 --- a/src/services/chart/charts/classes/active-users.ts +++ b/src/services/chart/charts/classes/active-users.ts @@ -17,6 +17,18 @@ export default class ActiveUsersChart extends Chart { return {}; } + @autobind + protected aggregate(logs: ActiveUsersLog[]): ActiveUsersLog { + return { + local: { + users: logs.reduce((a, b) => a.concat(b.local.users), [] as ActiveUsersLog['local']['users']), + }, + remote: { + users: logs.reduce((a, b) => a.concat(b.remote.users), [] as ActiveUsersLog['remote']['users']), + }, + }; + } + @autobind protected async fetchActual(): Promise> { return {}; @@ -25,11 +37,11 @@ export default class ActiveUsersChart extends Chart { @autobind public async update(user: User) { const update: Obj = { - count: 1 + users: [user.id] }; - await this.incIfUnique({ + await this.inc({ [Users.isLocalUser(user) ? 'local' : 'remote']: update - }, 'users', user.id); + }); } } diff --git a/src/services/chart/charts/classes/drive.ts b/src/services/chart/charts/classes/drive.ts index 57bb120beb..46399a34d8 100644 --- a/src/services/chart/charts/classes/drive.ts +++ b/src/services/chart/charts/classes/drive.ts @@ -27,6 +27,28 @@ export default class DriveChart extends Chart { }; } + @autobind + protected aggregate(logs: DriveLog[]): DriveLog { + return { + local: { + totalCount: logs[0].local.totalCount, + totalSize: logs[0].local.totalSize, + incCount: logs.reduce((a, b) => a + b.local.incCount, 0), + incSize: logs.reduce((a, b) => a + b.local.incSize, 0), + decCount: logs.reduce((a, b) => a + b.local.decCount, 0), + decSize: logs.reduce((a, b) => a + b.local.decSize, 0), + }, + remote: { + totalCount: logs[0].remote.totalCount, + totalSize: logs[0].remote.totalSize, + incCount: logs.reduce((a, b) => a + b.remote.incCount, 0), + incSize: logs.reduce((a, b) => a + b.remote.incSize, 0), + decCount: logs.reduce((a, b) => a + b.remote.decCount, 0), + decSize: logs.reduce((a, b) => a + b.remote.decSize, 0), + }, + }; + } + @autobind protected async fetchActual(): Promise> { const [localCount, remoteCount, localSize, remoteSize] = await Promise.all([ diff --git a/src/services/chart/charts/classes/federation.ts b/src/services/chart/charts/classes/federation.ts index bd2c497e7b..ab6ec2d4dd 100644 --- a/src/services/chart/charts/classes/federation.ts +++ b/src/services/chart/charts/classes/federation.ts @@ -20,6 +20,17 @@ export default class FederationChart extends Chart { }; } + @autobind + protected aggregate(logs: FederationLog[]): FederationLog { + return { + instance: { + total: logs[0].instance.total, + inc: logs.reduce((a, b) => a + b.instance.inc, 0), + dec: logs.reduce((a, b) => a + b.instance.dec, 0), + }, + }; + } + @autobind protected async fetchActual(): Promise> { const [total] = await Promise.all([ diff --git a/src/services/chart/charts/classes/hashtag.ts b/src/services/chart/charts/classes/hashtag.ts index 38c3a94f0c..43db5b0a83 100644 --- a/src/services/chart/charts/classes/hashtag.ts +++ b/src/services/chart/charts/classes/hashtag.ts @@ -17,6 +17,18 @@ export default class HashtagChart extends Chart { return {}; } + @autobind + protected aggregate(logs: HashtagLog[]): HashtagLog { + return { + local: { + users: logs.reduce((a, b) => a.concat(b.local.users), [] as HashtagLog['local']['users']), + }, + remote: { + users: logs.reduce((a, b) => a.concat(b.remote.users), [] as HashtagLog['remote']['users']), + }, + }; + } + @autobind protected async fetchActual(): Promise> { return {}; @@ -25,11 +37,11 @@ export default class HashtagChart extends Chart { @autobind public async update(hashtag: string, user: User) { const update: Obj = { - count: 1 + users: [user.id] }; - await this.incIfUnique({ + await this.inc({ [Users.isLocalUser(user) ? 'local' : 'remote']: update - }, 'users', user.id, hashtag); + }, hashtag); } } diff --git a/src/services/chart/charts/classes/instance.ts b/src/services/chart/charts/classes/instance.ts index 7575abfb6f..c32b864d87 100644 --- a/src/services/chart/charts/classes/instance.ts +++ b/src/services/chart/charts/classes/instance.ts @@ -36,6 +36,50 @@ export default class InstanceChart extends Chart { }; } + @autobind + protected aggregate(logs: InstanceLog[]): InstanceLog { + return { + requests: { + failed: logs.reduce((a, b) => a + b.requests.failed, 0), + succeeded: logs.reduce((a, b) => a + b.requests.succeeded, 0), + received: logs.reduce((a, b) => a + b.requests.received, 0), + }, + notes: { + total: logs[0].notes.total, + inc: logs.reduce((a, b) => a + b.notes.inc, 0), + dec: logs.reduce((a, b) => a + b.notes.dec, 0), + diffs: { + reply: logs.reduce((a, b) => a + b.notes.diffs.reply, 0), + renote: logs.reduce((a, b) => a + b.notes.diffs.renote, 0), + normal: logs.reduce((a, b) => a + b.notes.diffs.normal, 0), + }, + }, + users: { + total: logs[0].users.total, + inc: logs.reduce((a, b) => a + b.users.inc, 0), + dec: logs.reduce((a, b) => a + b.users.dec, 0), + }, + following: { + total: logs[0].following.total, + inc: logs.reduce((a, b) => a + b.following.inc, 0), + dec: logs.reduce((a, b) => a + b.following.dec, 0), + }, + followers: { + total: logs[0].followers.total, + inc: logs.reduce((a, b) => a + b.followers.inc, 0), + dec: logs.reduce((a, b) => a + b.followers.dec, 0), + }, + drive: { + totalFiles: logs[0].drive.totalFiles, + totalUsage: logs[0].drive.totalUsage, + incFiles: logs.reduce((a, b) => a + b.drive.incFiles, 0), + incUsage: logs.reduce((a, b) => a + b.drive.incUsage, 0), + decFiles: logs.reduce((a, b) => a + b.drive.decFiles, 0), + decUsage: logs.reduce((a, b) => a + b.drive.decUsage, 0), + }, + }; + } + @autobind protected async fetchActual(group: string): Promise> { const [ diff --git a/src/services/chart/charts/classes/network.ts b/src/services/chart/charts/classes/network.ts index 8b26e5c4c2..693af48f73 100644 --- a/src/services/chart/charts/classes/network.ts +++ b/src/services/chart/charts/classes/network.ts @@ -15,6 +15,17 @@ export default class NetworkChart extends Chart { return {}; } + @autobind + protected aggregate(logs: NetworkLog[]): NetworkLog { + return { + incomingRequests: logs.reduce((a, b) => a + b.incomingRequests, 0), + outgoingRequests: logs.reduce((a, b) => a + b.outgoingRequests, 0), + totalTime: logs.reduce((a, b) => a + b.totalTime, 0), + incomingBytes: logs.reduce((a, b) => a + b.incomingBytes, 0), + outgoingBytes: logs.reduce((a, b) => a + b.outgoingBytes, 0), + }; + } + @autobind protected async fetchActual(): Promise> { return {}; diff --git a/src/services/chart/charts/classes/notes.ts b/src/services/chart/charts/classes/notes.ts index 815061c445..965087bc08 100644 --- a/src/services/chart/charts/classes/notes.ts +++ b/src/services/chart/charts/classes/notes.ts @@ -25,6 +25,32 @@ export default class NotesChart extends Chart { }; } + @autobind + protected aggregate(logs: NotesLog[]): NotesLog { + return { + local: { + total: logs[0].local.total, + inc: logs.reduce((a, b) => a + b.local.inc, 0), + dec: logs.reduce((a, b) => a + b.local.dec, 0), + diffs: { + reply: logs.reduce((a, b) => a + b.local.diffs.reply, 0), + renote: logs.reduce((a, b) => a + b.local.diffs.renote, 0), + normal: logs.reduce((a, b) => a + b.local.diffs.normal, 0), + }, + }, + remote: { + total: logs[0].remote.total, + inc: logs.reduce((a, b) => a + b.remote.inc, 0), + dec: logs.reduce((a, b) => a + b.remote.dec, 0), + diffs: { + reply: logs.reduce((a, b) => a + b.remote.diffs.reply, 0), + renote: logs.reduce((a, b) => a + b.remote.diffs.renote, 0), + normal: logs.reduce((a, b) => a + b.remote.diffs.normal, 0), + }, + }, + }; + } + @autobind protected async fetchActual(): Promise> { const [localCount, remoteCount] = await Promise.all([ diff --git a/src/services/chart/charts/classes/per-user-drive.ts b/src/services/chart/charts/classes/per-user-drive.ts index aed9f6fce7..e778f7bf61 100644 --- a/src/services/chart/charts/classes/per-user-drive.ts +++ b/src/services/chart/charts/classes/per-user-drive.ts @@ -20,6 +20,18 @@ export default class PerUserDriveChart extends Chart { }; } + @autobind + protected aggregate(logs: PerUserDriveLog[]): PerUserDriveLog { + return { + totalCount: logs[0].totalCount, + totalSize: logs[0].totalSize, + incCount: logs.reduce((a, b) => a + b.incCount, 0), + incSize: logs.reduce((a, b) => a + b.incSize, 0), + decCount: logs.reduce((a, b) => a + b.decCount, 0), + decSize: logs.reduce((a, b) => a + b.decSize, 0), + }; + } + @autobind protected async fetchActual(group: string): Promise> { const [count, size] = await Promise.all([ diff --git a/src/services/chart/charts/classes/per-user-following.ts b/src/services/chart/charts/classes/per-user-following.ts index 8295c0cb0d..8b536009c8 100644 --- a/src/services/chart/charts/classes/per-user-following.ts +++ b/src/services/chart/charts/classes/per-user-following.ts @@ -35,6 +35,36 @@ export default class PerUserFollowingChart extends Chart { }; } + @autobind + protected aggregate(logs: PerUserFollowingLog[]): PerUserFollowingLog { + return { + local: { + followings: { + total: logs[0].local.followings.total, + inc: logs.reduce((a, b) => a + b.local.followings.inc, 0), + dec: logs.reduce((a, b) => a + b.local.followings.dec, 0), + }, + followers: { + total: logs[0].local.followers.total, + inc: logs.reduce((a, b) => a + b.local.followers.inc, 0), + dec: logs.reduce((a, b) => a + b.local.followers.dec, 0), + }, + }, + remote: { + followings: { + total: logs[0].remote.followings.total, + inc: logs.reduce((a, b) => a + b.remote.followings.inc, 0), + dec: logs.reduce((a, b) => a + b.remote.followings.dec, 0), + }, + followers: { + total: logs[0].remote.followers.total, + inc: logs.reduce((a, b) => a + b.remote.followers.inc, 0), + dec: logs.reduce((a, b) => a + b.remote.followers.dec, 0), + }, + }, + }; + } + @autobind protected async fetchActual(group: string): Promise> { const [ diff --git a/src/services/chart/charts/classes/per-user-notes.ts b/src/services/chart/charts/classes/per-user-notes.ts index cccd495604..8d1fb8d2b0 100644 --- a/src/services/chart/charts/classes/per-user-notes.ts +++ b/src/services/chart/charts/classes/per-user-notes.ts @@ -20,6 +20,20 @@ export default class PerUserNotesChart extends Chart { }; } + @autobind + protected aggregate(logs: PerUserNotesLog[]): PerUserNotesLog { + return { + total: logs[0].total, + inc: logs.reduce((a, b) => a + b.inc, 0), + dec: logs.reduce((a, b) => a + b.dec, 0), + diffs: { + reply: logs.reduce((a, b) => a + b.diffs.reply, 0), + renote: logs.reduce((a, b) => a + b.diffs.renote, 0), + normal: logs.reduce((a, b) => a + b.diffs.normal, 0), + }, + }; + } + @autobind protected async fetchActual(group: string): Promise> { const [count] = await Promise.all([ diff --git a/src/services/chart/charts/classes/per-user-reactions.ts b/src/services/chart/charts/classes/per-user-reactions.ts index 124fb4153c..b4cdced40c 100644 --- a/src/services/chart/charts/classes/per-user-reactions.ts +++ b/src/services/chart/charts/classes/per-user-reactions.ts @@ -18,6 +18,18 @@ export default class PerUserReactionsChart extends Chart { return {}; } + @autobind + protected aggregate(logs: PerUserReactionsLog[]): PerUserReactionsLog { + return { + local: { + count: logs.reduce((a, b) => a + b.local.count, 0), + }, + remote: { + count: logs.reduce((a, b) => a + b.remote.count, 0), + }, + }; + } + @autobind protected async fetchActual(group: string): Promise> { return {}; diff --git a/src/services/chart/charts/classes/test-grouped.ts b/src/services/chart/charts/classes/test-grouped.ts index e32cbcf416..92c8df636e 100644 --- a/src/services/chart/charts/classes/test-grouped.ts +++ b/src/services/chart/charts/classes/test-grouped.ts @@ -21,6 +21,17 @@ export default class TestGroupedChart extends Chart { }; } + @autobind + protected aggregate(logs: TestGroupedLog[]): TestGroupedLog { + return { + foo: { + total: logs[0].foo.total, + inc: logs.reduce((a, b) => a + b.foo.inc, 0), + dec: logs.reduce((a, b) => a + b.foo.dec, 0), + }, + }; + } + @autobind protected async fetchActual(group: string): Promise> { return { diff --git a/src/services/chart/charts/classes/test-unique.ts b/src/services/chart/charts/classes/test-unique.ts index 1eb396c293..5680d713ec 100644 --- a/src/services/chart/charts/classes/test-unique.ts +++ b/src/services/chart/charts/classes/test-unique.ts @@ -15,6 +15,13 @@ export default class TestUniqueChart extends Chart { return {}; } + @autobind + protected aggregate(logs: TestUniqueLog[]): TestUniqueLog { + return { + foo: logs.reduce((a, b) => a.concat(b.foo), [] as TestUniqueLog['foo']), + }; + } + @autobind protected async fetchActual(): Promise> { return {}; @@ -22,8 +29,8 @@ export default class TestUniqueChart extends Chart { @autobind public async uniqueIncrement(key: string) { - await this.incIfUnique({ - foo: 1 - }, 'foos', key); + await this.inc({ + foo: [key] + }); } } diff --git a/src/services/chart/charts/classes/test.ts b/src/services/chart/charts/classes/test.ts index ea64040f3e..d37d298de7 100644 --- a/src/services/chart/charts/classes/test.ts +++ b/src/services/chart/charts/classes/test.ts @@ -21,6 +21,17 @@ export default class TestChart extends Chart { }; } + @autobind + protected aggregate(logs: TestLog[]): TestLog { + return { + foo: { + total: logs[0].foo.total, + inc: logs.reduce((a, b) => a + b.foo.inc, 0), + dec: logs.reduce((a, b) => a + b.foo.dec, 0), + }, + }; + } + @autobind protected async fetchActual(): Promise> { return { diff --git a/src/services/chart/charts/classes/users.ts b/src/services/chart/charts/classes/users.ts index 47e4caa1b7..87b19d88f9 100644 --- a/src/services/chart/charts/classes/users.ts +++ b/src/services/chart/charts/classes/users.ts @@ -25,6 +25,22 @@ export default class UsersChart extends Chart { }; } + @autobind + protected aggregate(logs: UsersLog[]): UsersLog { + return { + local: { + total: logs[0].local.total, + inc: logs.reduce((a, b) => a + b.local.inc, 0), + dec: logs.reduce((a, b) => a + b.local.dec, 0), + }, + remote: { + total: logs[0].remote.total, + inc: logs.reduce((a, b) => a + b.remote.inc, 0), + dec: logs.reduce((a, b) => a + b.remote.dec, 0), + }, + }; + } + @autobind protected async fetchActual(): Promise> { const [localCount, remoteCount] = await Promise.all([ diff --git a/src/services/chart/charts/schemas/active-users.ts b/src/services/chart/charts/schemas/active-users.ts index 6e26bb4698..cdf0579efb 100644 --- a/src/services/chart/charts/schemas/active-users.ts +++ b/src/services/chart/charts/schemas/active-users.ts @@ -1,11 +1,15 @@ export const logSchema = { /** - * アクティブユーザー数 + * アクティブユーザー */ - count: { - type: 'number' as const, + users: { + type: 'array' as const, optional: false as const, nullable: false as const, - description: 'アクティブユーザー数', + description: 'アクティブユーザー', + items: { + type: 'string' as const, + optional: false as const, nullable: false as const, + } }, }; diff --git a/src/services/chart/charts/schemas/hashtag.ts b/src/services/chart/charts/schemas/hashtag.ts index 4dfd61c97f..791d0d1721 100644 --- a/src/services/chart/charts/schemas/hashtag.ts +++ b/src/services/chart/charts/schemas/hashtag.ts @@ -1,11 +1,15 @@ export const logSchema = { /** - * 投稿された数 + * 投稿したユーザー */ - count: { - type: 'number' as const, + users: { + type: 'array' as const, optional: false as const, nullable: false as const, - description: '投稿された数', + description: '投稿したユーザー', + items: { + type: 'string' as const, + optional: false as const, nullable: false as const, + } }, }; diff --git a/src/services/chart/charts/schemas/test-unique.ts b/src/services/chart/charts/schemas/test-unique.ts index 075a8092d9..51280400ac 100644 --- a/src/services/chart/charts/schemas/test-unique.ts +++ b/src/services/chart/charts/schemas/test-unique.ts @@ -3,9 +3,12 @@ export const schema = { optional: false as const, nullable: false as const, properties: { foo: { - type: 'number' as const, + type: 'array' as const, optional: false as const, nullable: false as const, - description: '' + items: { + type: 'string' as const, + optional: false as const, nullable: false as const, + } }, } }; diff --git a/src/services/chart/core.ts b/src/services/chart/core.ts index dc09923ae4..10621be073 100644 --- a/src/services/chart/core.ts +++ b/src/services/chart/core.ts @@ -24,8 +24,6 @@ type ArrayValue = { [P in keyof T]: T[P] extends number ? T[P][] : ArrayValue; }; -type Span = 'day' | 'hour'; - type Log = { id: number; @@ -38,22 +36,14 @@ type Log = { * 集計日時のUnixタイムスタンプ(秒) */ date: number; - - /** - * 集計期間 - */ - span: Span; - - /** - * ユニークインクリメント用 - */ - unique?: Record; }; const camelToSnake = (str: string) => { return str.replace(/([A-Z])/g, s => '_' + s.charAt(0).toLowerCase()); }; +const removeDuplicates = (array: any[]) => Array.from(new Set(array)); + /** * 様々なチャートの管理を司るクラス */ @@ -62,10 +52,21 @@ export default abstract class Chart> { private static readonly columnDot = '_'; private name: string; + private queue: { + diff: DeepPartial; + group: string | null; + }[] = []; public schema: Schema; protected repository: Repository; + protected abstract genNewLog(latest: T): DeepPartial; - protected abstract async fetchActual(group: string | null): Promise>; + + /** + * @param logs 日時が新しい方が先頭 + */ + protected abstract aggregate(logs: T[]): T; + + protected abstract fetchActual(group: string | null): Promise>; @autobind private static convertSchemaToFlatColumnDefinitions(schema: Schema) { @@ -75,10 +76,15 @@ export default abstract class Chart> { const p = path ? `${path}${this.columnDot}${k}` : k; if (v.type === 'object') { flatColumns(v.properties, p); - } else { + } else if (v.type === 'number') { columns[this.columnPrefix + p] = { type: 'bigint', }; + } else if (v.type === 'array' && v.items.type === 'string') { + columns[this.columnPrefix + p] = { + type: 'varchar', + array: true, + }; } } }; @@ -99,11 +105,11 @@ export default abstract class Chart> { @autobind private static convertObjectToFlattenColumns(x: Record) { - const columns = {} as Record; + const columns = {} as Record; const flatten = (x: Obj, path?: string) => { for (const [k, v] of Object.entries(x)) { const p = path ? `${path}${this.columnDot}${k}` : k; - if (typeof v === 'object') { + if (typeof v === 'object' && !Array.isArray(v)) { flatten(v, p); } else { columns[this.columnPrefix + p] = v; @@ -115,14 +121,37 @@ export default abstract class Chart> { } @autobind - private static convertQuery(x: Record) { - const query: Record = {}; + private static countUniqueFields(x: Record) { + const exec = (x: Obj) => { + const res = {} as Record; + for (const [k, v] of Object.entries(x)) { + if (typeof v === 'object' && !Array.isArray(v)) { + res[k] = exec(v); + } else if (Array.isArray(v)) { + res[k] = Array.from(new Set(v)).length; + } else { + res[k] = v; + } + } + return res; + }; + return exec(x); + } - const columns = Chart.convertObjectToFlattenColumns(x); + @autobind + private static convertQuery(diff: Record) { + const query: Record = {}; - for (const [k, v] of Object.entries(columns)) { - if (v > 0) query[k] = () => `"${k}" + ${v}`; - if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`; + for (const [k, v] of Object.entries(diff)) { + if (typeof v === 'number') { + if (v > 0) query[k] = () => `"${k}" + ${v}`; + if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`; + } else if (Array.isArray(v)) { + // TODO: item が文字列以外の場合も対応 + // TODO: item をSQLエスケープ + const items = v.map(item => `"${item}"`).join(','); + query[k] = () => `array_cat("${k}", '{${items}}'::varchar[])`; + } } return query; @@ -169,28 +198,14 @@ export default abstract class Chart> { length: 128, nullable: true }, - span: { - type: 'enum', - enum: ['hour', 'day'] - }, - unique: { - type: 'jsonb', - default: {} - }, ...Chart.convertSchemaToFlatColumnDefinitions(schema) }, indices: [{ columns: ['date'] - }, { - columns: ['span'] }, { columns: ['group'] - }, { - columns: ['span', 'date'] }, { columns: ['date', 'group'] - }, { - columns: ['span', 'date', 'group'] }] }); } @@ -200,7 +215,7 @@ export default abstract class Chart> { this.schema = schema; const entity = Chart.schemaToEntity(name, schema); - const keys = ['span', 'date']; + const keys = ['date']; if (grouped) keys.push('group'); entity.options.uniques = [{ @@ -220,7 +235,8 @@ export default abstract class Chart> { flatColumns(v.properties, p); } else { if (nestedProperty.get(log, p) == null) { - nestedProperty.set(log, p, 0); + const emptyValue = v.type === 'number' ? 0 : []; + nestedProperty.set(log, p, emptyValue); } } } @@ -230,10 +246,9 @@ export default abstract class Chart> { } @autobind - private getLatestLog(span: Span, group: string | null = null): Promise { + private getLatestLog(group: string | null = null): Promise { return this.repository.findOne({ group: group, - span: span }, { order: { date: -1 @@ -242,17 +257,13 @@ export default abstract class Chart> { } @autobind - private async getCurrentLog(span: Span, group: string | null = null): Promise { + private async getCurrentLog(group: string | null = null): Promise { const [y, m, d, h] = Chart.getCurrentDate(); - const current = - span == 'day' ? dateUTC([y, m, d, 0]) : - span == 'hour' ? dateUTC([y, m, d, h]) : - null as never; + const current = dateUTC([y, m, d, h]); - // 現在(今日または今のHour)のログ + // 現在(=今のHour)のログ const currentLog = await this.repository.findOne({ - span: span, date: Chart.dateToTimestamp(current), ...(group ? { group: group } : {}) }); @@ -271,7 +282,7 @@ export default abstract class Chart> { // * 昨日何もチャートを更新するような出来事がなかった場合は、 // * ログがそもそも作られずドキュメントが存在しないということがあり得るため、 // * 「昨日の」と決め打ちせずに「もっとも最近の」とします - const latest = await this.getLatestLog(span, group); + const latest = await this.getLatestLog(group); if (latest != null) { const obj = Chart.convertFlattenColumnsToObject( @@ -286,17 +297,16 @@ export default abstract class Chart> { // 初期ログデータを作成 data = this.getNewLog(null); - logger.info(`${this.name + (group ? `:${group}` : '')} (${span}): Initial commit created`); + logger.info(`${this.name + (group ? `:${group}` : '')}: Initial commit created`); } const date = Chart.dateToTimestamp(current); - const lockKey = `${this.name}:${date}:${group}:${span}`; + const lockKey = `${this.name}:${date}:${group}`; const unlock = await getChartInsertLock(lockKey); try { // ロック内でもう1回チェックする const currentLog = await this.repository.findOne({ - span: span, date: date, ...(group ? { group: group } : {}) }); @@ -307,12 +317,11 @@ export default abstract class Chart> { // 新規ログ挿入 log = await this.repository.save({ group: group, - span: span, date: date, ...Chart.convertObjectToFlattenColumns(data) }); - logger.info(`${this.name + (group ? `:${group}` : '')} (${span}): New commit created`); + logger.info(`${this.name + (group ? `:${group}` : '')}: New commit created`); return log; } finally { @@ -321,38 +330,62 @@ export default abstract class Chart> { } @autobind - protected commit(query: Record, group: string | null = null, uniqueKey?: string, uniqueValue?: string): Promise { + protected commit(diff: DeepPartial, group: string | null = null): void { + this.queue.push({ + diff, group, + }); + } + + @autobind + public async save() { + if (this.queue.length === 0) { + logger.info(`${this.name}: Write skipped`); + return; + } + + // TODO: 前の時間のログがqueueにあった場合のハンドリング + // 例えば、save が20分ごとに行われるとして、前回行われたのは 01:50 だったとする。 + // 次に save が行われるのは 02:10 ということになるが、もし 01:55 に新規ログが queue に追加されたとすると、 + // そのログは本来は 01:00~ のログとしてDBに保存されて欲しいのに、02:00~ のログ扱いになってしまう。 + // これを回避するための実装は複雑になりそうなため、一旦保留。 + const update = async (log: Log) => { - // ユニークインクリメントの場合、指定のキーに指定の値が既に存在していたら弾く - if ( - uniqueKey && log.unique && - log.unique[uniqueKey] && - log.unique[uniqueKey].includes(uniqueValue) - ) return; - - // ユニークインクリメントの指定のキーに値を追加 - if (uniqueKey && log.unique) { - if (log.unique[uniqueKey]) { - const sql = `jsonb_set("unique", '{${uniqueKey}}', ("unique"->>'${uniqueKey}')::jsonb || '["${uniqueValue}"]'::jsonb)`; - query['unique'] = () => sql; - } else { - const sql = `jsonb_set("unique", '{${uniqueKey}}', '["${uniqueValue}"]')`; - query['unique'] = () => sql; + const finalDiffs = {} as Record; + + for (const diff of this.queue.filter(q => q.group === log.group).map(q => q.diff)) { + const columns = Chart.convertObjectToFlattenColumns(diff); + + for (const [k, v] of Object.entries(columns)) { + if (finalDiffs[k] == null) { + finalDiffs[k] = v; + } else { + if (typeof finalDiffs[k] === 'number') { + (finalDiffs[k] as number) += v as number; + } else { + (finalDiffs[k] as unknown[]) = (finalDiffs[k] as unknown[]).concat(v); + } + } } } + const query = Chart.convertQuery(finalDiffs); + // ログ更新 await this.repository.createQueryBuilder() .update() .set(query) .where('id = :id', { id: log.id }) .execute(); + + logger.info(`${this.name + (log.group ? `:${log.group}` : '')}: Updated`); + + // TODO: この一連の処理が始まった後に新たにqueueに入ったものは消さないようにする + this.queue = this.queue.filter(q => q.group !== log.group); }; - return Promise.all([ - this.getCurrentLog('day', group).then(log => update(log)), - this.getCurrentLog('hour', group).then(log => update(log)), - ]); + const groups = removeDuplicates(this.queue.map(log => log.group)); + + await Promise.all(groups.map(group => this.getCurrentLog(group).then(log => update(log)))); } @autobind @@ -367,39 +400,30 @@ export default abstract class Chart> { .execute(); }; - return Promise.all([ - this.getCurrentLog('day', group).then(log => update(log)), - this.getCurrentLog('hour', group).then(log => update(log)), - ]); + return this.getCurrentLog(group).then(log => update(log)); } @autobind protected async inc(inc: DeepPartial, group: string | null = null): Promise { - await this.commit(Chart.convertQuery(inc as any), group); + await this.commit(inc, group); } @autobind - protected async incIfUnique(inc: DeepPartial, key: string, value: string, group: string | null = null): Promise { - await this.commit(Chart.convertQuery(inc as any), group, key, value); - } - - @autobind - public async getChart(span: Span, amount: number, begin: Date | null, group: string | null = null): Promise> { - const [y, m, d, h, _m, _s, _ms] = begin ? Chart.parseDate(subtractTime(addTime(begin, 1, span), 1)) : Chart.getCurrentDate(); - const [y2, m2, d2, h2] = begin ? Chart.parseDate(addTime(begin, 1, span)) : [] as never; + public async getChart(span: 'hour' | 'day', amount: number, cursor: Date | null, group: string | null = null): Promise> { + const [y, m, d, h, _m, _s, _ms] = cursor ? Chart.parseDate(subtractTime(addTime(cursor, 1, span), 1)) : Chart.getCurrentDate(); + const [y2, m2, d2, h2] = cursor ? Chart.parseDate(addTime(cursor, 1, span)) : [] as never; const lt = dateUTC([y, m, d, h, _m, _s, _ms]); const gt = - span === 'day' ? subtractTime(begin ? dateUTC([y2, m2, d2, 0]) : dateUTC([y, m, d, 0]), amount - 1, 'day') : - span === 'hour' ? subtractTime(begin ? dateUTC([y2, m2, d2, h2]) : dateUTC([y, m, d, h]), amount - 1, 'hour') : + span === 'day' ? subtractTime(cursor ? dateUTC([y2, m2, d2, 0]) : dateUTC([y, m, d, 0]), amount - 1, 'day') : + span === 'hour' ? subtractTime(cursor ? dateUTC([y2, m2, d2, h2]) : dateUTC([y, m, d, h]), amount - 1, 'hour') : null as never; // ログ取得 let logs = await this.repository.find({ where: { group: group, - span: span, date: Between(Chart.dateToTimestamp(gt), Chart.dateToTimestamp(lt)) }, order: { @@ -413,7 +437,6 @@ export default abstract class Chart> { // (すくなくともひとつログが無いと隙間埋めできないため) const recentLog = await this.repository.findOne({ group: group, - span: span }, { order: { date: -1 @@ -430,7 +453,6 @@ export default abstract class Chart> { // (隙間埋めできないため) const outdatedLog = await this.repository.findOne({ group: group, - span: span, date: LessThan(Chart.dateToTimestamp(gt)) }, { order: { @@ -445,23 +467,56 @@ export default abstract class Chart> { const chart: T[] = []; - // 整形 - for (let i = (amount - 1); i >= 0; i--) { - const current = - span === 'day' ? subtractTime(dateUTC([y, m, d, 0]), i, 'day') : - span === 'hour' ? subtractTime(dateUTC([y, m, d, h]), i, 'hour') : - null as never; - - const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current)); - - if (log) { - const data = Chart.convertFlattenColumnsToObject(log as Record); - chart.unshift(data); - } else { - // 隙間埋め - const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current)); - const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record) : null; - chart.unshift(this.getNewLog(data)); + if (span === 'hour') { + for (let i = (amount - 1); i >= 0; i--) { + const current = subtractTime(dateUTC([y, m, d, h]), i, 'hour'); + + const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current)); + + if (log) { + const data = Chart.convertFlattenColumnsToObject(log as Record); + chart.unshift(Chart.countUniqueFields(data)); + } else { + // 隙間埋め + const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current)); + const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record) : null; + chart.unshift(Chart.countUniqueFields(this.getNewLog(data))); + } + } + } else if (span === 'day') { + const logsForEachDays: T[][] = []; + let currentDay = -1; + let currentDayIndex = -1; + for (let i = ((amount - 1) * 24) + h; i >= 0; i--) { + const current = subtractTime(dateUTC([y, m, d, h]), i, 'hour'); + const _currentDay = Chart.parseDate(current)[2]; + if (currentDay != _currentDay) currentDayIndex++; + currentDay = _currentDay; + + const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current)); + + if (log) { + if (logsForEachDays[currentDayIndex]) { + logsForEachDays[currentDayIndex].unshift(Chart.convertFlattenColumnsToObject(log)); + } else { + logsForEachDays[currentDayIndex] = [Chart.convertFlattenColumnsToObject(log)]; + } + } else { + // 隙間埋め + const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current)); + const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record) : null; + const newLog = this.getNewLog(data); + if (logsForEachDays[currentDayIndex]) { + logsForEachDays[currentDayIndex].unshift(newLog); + } else { + logsForEachDays[currentDayIndex] = [newLog]; + } + } + } + + for (const logs of logsForEachDays) { + const log = this.aggregate(logs); + chart.unshift(Chart.countUniqueFields(log)); } } @@ -473,20 +528,19 @@ export default abstract class Chart> { * { foo: [1, 2, 3], bar: [5, 6, 7] } * にする */ - const dive = (x: Obj, path?: string) => { + const compact = (x: Obj, path?: string) => { for (const [k, v] of Object.entries(x)) { const p = path ? `${path}.${k}` : k; - if (typeof v == 'object') { - dive(v, p); + if (typeof v === 'object' && !Array.isArray(v)) { + compact(v, p); } else { - const values = chart.map(s => nestedProperty.get(s, p)) - .map(v => parseInt(v, 10)); // TypeORMのバグ(?)で何故か数値カラムの値が文字列型になっているので数値に戻す + const values = chart.map(s => nestedProperty.get(s, p)); nestedProperty.set(res, p, values); } } }; - dive(chart[0]); + compact(chart[0]); return res; } diff --git a/src/services/chart/index.ts b/src/services/chart/index.ts index 9626e3d6b3..dde02bd64d 100644 --- a/src/services/chart/index.ts +++ b/src/services/chart/index.ts @@ -10,6 +10,7 @@ import PerUserReactionsChart from './charts/classes/per-user-reactions'; import HashtagChart from './charts/classes/hashtag'; import PerUserFollowingChart from './charts/classes/per-user-following'; import PerUserDriveChart from './charts/classes/per-user-drive'; +import { beforeShutdown } from '../../misc/before-shutdown'; export const federationChart = new FederationChart(); export const notesChart = new NotesChart(); @@ -23,3 +24,27 @@ export const perUserReactionsChart = new PerUserReactionsChart(); export const hashtagChart = new HashtagChart(); export const perUserFollowingChart = new PerUserFollowingChart(); export const perUserDriveChart = new PerUserDriveChart(); + +const charts = [ + federationChart, + notesChart, + usersChart, + networkChart, + activeUsersChart, + instanceChart, + perUserNotesChart, + driveChart, + perUserReactionsChart, + hashtagChart, + perUserFollowingChart, + perUserDriveChart, +]; + +// 20分おきにメモリ情報をDBに書き込み +setInterval(() => { + for (const chart of charts) { + chart.save(); + } +}, 1000 * 60 * 20); + +beforeShutdown(() => Promise.all(charts.map(chart => chart.save()))); diff --git a/test/chart.ts b/test/chart.ts index 25b083db17..55f6bd696c 100644 --- a/test/chart.ts +++ b/test/chart.ts @@ -72,7 +72,7 @@ describe('Chart', () => { testUniqueChart = new TestUniqueChart(); clock = lolex.install({ - now: new Date('2000-01-01 00:00:00') + now: new Date(Date.UTC(2000, 0, 1, 0, 0, 0)) }); done(); }); @@ -85,6 +85,7 @@ describe('Chart', () => { it('Can updates', async(async () => { await testChart.increment(); + await testChart.save(); const chartHours = await testChart.getChart('hour', 3, null); const chartDays = await testChart.getChart('day', 3, null); @@ -105,9 +106,10 @@ describe('Chart', () => { }, }); })); - + it('Can updates (dec)', async(async () => { await testChart.decrement(); + await testChart.save(); const chartHours = await testChart.getChart('hour', 3, null); const chartDays = await testChart.getChart('day', 3, null); @@ -154,6 +156,7 @@ describe('Chart', () => { await testChart.increment(); await testChart.increment(); await testChart.increment(); + await testChart.save(); const chartHours = await testChart.getChart('hour', 3, null); const chartDays = await testChart.getChart('day', 3, null); @@ -177,10 +180,12 @@ describe('Chart', () => { it('Can updates at different times', async(async () => { await testChart.increment(); + await testChart.save(); clock.tick('01:00:00'); await testChart.increment(); + await testChart.save(); const chartHours = await testChart.getChart('hour', 3, null); const chartDays = await testChart.getChart('day', 3, null); @@ -202,12 +207,45 @@ describe('Chart', () => { }); })); + // 仕様上はこうなってほしいけど、実装は難しそうなのでskip + /* + it('Can updates at different times without save', async(async () => { + await testChart.increment(); + + clock.tick('01:00:00'); + + await testChart.increment(); + await testChart.save(); + + const chartHours = await testChart.getChart('hour', 3, null); + const chartDays = await testChart.getChart('day', 3, null); + + assert.deepStrictEqual(chartHours, { + foo: { + dec: [0, 0, 0], + inc: [1, 1, 0], + total: [2, 1, 0] + }, + }); + + assert.deepStrictEqual(chartDays, { + foo: { + dec: [0, 0, 0], + inc: [2, 0, 0], + total: [2, 0, 0] + }, + }); + })); + */ + it('Can padding', async(async () => { await testChart.increment(); + await testChart.save(); clock.tick('02:00:00'); await testChart.increment(); + await testChart.save(); const chartHours = await testChart.getChart('hour', 3, null); const chartDays = await testChart.getChart('day', 3, null); @@ -232,6 +270,7 @@ describe('Chart', () => { // 要求された範囲にログがひとつもない場合でもパディングできる it('Can padding from past range', async(async () => { await testChart.increment(); + await testChart.save(); clock.tick('05:00:00'); @@ -259,8 +298,12 @@ describe('Chart', () => { // Issue #3190 it('Can padding from past range 2', async(async () => { await testChart.increment(); + await testChart.save(); + clock.tick('05:00:00'); + await testChart.increment(); + await testChart.save(); const chartHours = await testChart.getChart('hour', 3, null); const chartDays = await testChart.getChart('day', 3, null); @@ -284,10 +327,12 @@ describe('Chart', () => { it('Can specify offset', async(async () => { await testChart.increment(); + await testChart.save(); clock.tick('01:00:00'); await testChart.increment(); + await testChart.save(); const chartHours = await testChart.getChart('hour', 3, new Date(Date.UTC(2000, 0, 1, 0, 0, 0))); const chartDays = await testChart.getChart('day', 3, new Date(Date.UTC(2000, 0, 1, 0, 0, 0))); @@ -313,10 +358,12 @@ describe('Chart', () => { clock.tick('00:30:00'); await testChart.increment(); + await testChart.save(); clock.tick('01:30:00'); await testChart.increment(); + await testChart.save(); const chartHours = await testChart.getChart('hour', 3, new Date(Date.UTC(2000, 0, 1, 0, 0, 0))); const chartDays = await testChart.getChart('day', 3, new Date(Date.UTC(2000, 0, 1, 0, 0, 0))); @@ -341,6 +388,7 @@ describe('Chart', () => { describe('Grouped', () => { it('Can updates', async(async () => { await testGroupedChart.increment('alice'); + await testGroupedChart.save(); const aliceChartHours = await testGroupedChart.getChart('hour', 3, null, 'alice'); const aliceChartDays = await testGroupedChart.getChart('day', 3, null, 'alice'); @@ -386,6 +434,7 @@ describe('Chart', () => { await testUniqueChart.uniqueIncrement('alice'); await testUniqueChart.uniqueIncrement('alice'); await testUniqueChart.uniqueIncrement('bob'); + await testUniqueChart.save(); const chartHours = await testUniqueChart.getChart('hour', 3, null); const chartDays = await testUniqueChart.getChart('day', 3, null); @@ -428,6 +477,7 @@ describe('Chart', () => { it('Can resync (2)', async(async () => { await testChart.increment(); + await testChart.save(); clock.tick('01:00:00'); diff --git a/yarn.lock b/yarn.lock index 274f07631c..c838362820 100644 --- a/yarn.lock +++ b/yarn.lock @@ -358,10 +358,10 @@ dependencies: type-detect "4.0.8" -"@sinonjs/fake-timers@6.0.1": - version "6.0.1" - resolved "https://registry.yarnpkg.com/@sinonjs/fake-timers/-/fake-timers-6.0.1.tgz#293674fccb3262ac782c7aadfdeca86b10c75c40" - integrity sha512-MZPUxrmFubI36XS1DI3qmI0YdN1gks62JtFZvxR67ljjSNCeK6U08Zx4msEWOXuofgqUt6zPHSi1H9fbjR/NRA== +"@sinonjs/fake-timers@7.0.2": + version "7.0.2" + resolved "https://registry.yarnpkg.com/@sinonjs/fake-timers/-/fake-timers-7.0.2.tgz#a53e71d4154ee704ea9b36a6d0b0780e246fadd1" + integrity sha512-dF84L5YC90gIOegPDCYymPIsDmwMWWSh7BwfDXQYePi8lVIEp7IZ1UVGkME8FjXOsDPxan12x4aaK+Lo6wVh9A== dependencies: "@sinonjs/commons" "^1.7.0" -- cgit v1.2.3-freya From 449ea4b669252cff3dee0de0192ce4a353106444 Mon Sep 17 00:00:00 2001 From: syuilo Date: Thu, 18 Mar 2021 17:35:47 +0900 Subject: perf: reduce query --- src/services/note/reaction/create.ts | 47 +++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 20 deletions(-) (limited to 'src/services') diff --git a/src/services/note/reaction/create.ts b/src/services/note/reaction/create.ts index adc96ddc1f..8de9f53a17 100644 --- a/src/services/note/reaction/create.ts +++ b/src/services/note/reaction/create.ts @@ -11,34 +11,41 @@ import { perUserReactionsChart } from '../../chart'; import { genId } from '../../../misc/gen-id'; import { createNotification } from '../../create-notification'; import deleteReaction from './delete'; +import { isDuplicateKeyValueError } from '../../../misc/is-duplicate-key-value-error'; export default async (user: User, note: Note, reaction?: string) => { reaction = await toDbReaction(reaction, user.host); - const exist = await NoteReactions.findOne({ - noteId: note.id, - userId: user.id, - }); + let record; + + // Create reaction + try { + record = await NoteReactions.save({ + id: genId(), + createdAt: new Date(), + noteId: note.id, + userId: user.id, + reaction + }); + } catch (e) { + if (isDuplicateKeyValueError(e)) { + record = await NoteReactions.findOneOrFail({ + noteId: note.id, + userId: user.id, + }); - if (exist) { - if (exist.reaction !== reaction) { - // 別のリアクションがすでにされていたら置き換える - await deleteReaction(user, note); + if (record.reaction !== reaction) { + // 別のリアクションがすでにされていたら置き換える + await deleteReaction(user, note); + } else { + // 同じリアクションがすでにされていたら何もしない + return; + } } else { - // 同じリアクションがすでにされていたら何もしない - return; + throw e; } } - // Create reaction - const inserted = await NoteReactions.save({ - id: genId(), - createdAt: new Date(), - noteId: note.id, - userId: user.id, - reaction - }); - // Increment reactions count const sql = `jsonb_set("reactions", '{${reaction}}', (COALESCE("reactions"->>'${reaction}', '0')::int + 1)::text::jsonb)`; await Notes.createQueryBuilder().update() @@ -101,7 +108,7 @@ export default async (user: User, note: Note, reaction?: string) => { //#region 配信 if (Users.isLocalUser(user) && !note.localOnly) { - const content = renderActivity(await renderLike(inserted, note)); + const content = renderActivity(await renderLike(record, note)); const dm = new DeliverManager(user, content); if (note.userHost !== null) { const reactee = await Users.findOne(note.userId); -- cgit v1.2.3-freya From bffdfea58af44c2faa543ed98cfb878b6d283935 Mon Sep 17 00:00:00 2001 From: syuilo Date: Thu, 18 Mar 2021 17:38:42 +0900 Subject: refactor --- src/services/note/reaction/create.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/services') diff --git a/src/services/note/reaction/create.ts b/src/services/note/reaction/create.ts index 8de9f53a17..30a17ad3e9 100644 --- a/src/services/note/reaction/create.ts +++ b/src/services/note/reaction/create.ts @@ -12,11 +12,12 @@ import { genId } from '../../../misc/gen-id'; import { createNotification } from '../../create-notification'; import deleteReaction from './delete'; import { isDuplicateKeyValueError } from '../../../misc/is-duplicate-key-value-error'; +import { NoteReaction } from '../../../models/entities/note-reaction'; export default async (user: User, note: Note, reaction?: string) => { reaction = await toDbReaction(reaction, user.host); - let record; + let record: NoteReaction; // Create reaction try { -- cgit v1.2.3-freya From 65e7204ec94cccb53a66f681efa9c057bb9580a9 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 19 Mar 2021 10:53:09 +0900 Subject: perf: myReaction の取得をまとめて行うように MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Related #6813 --- src/models/repositories/note.ts | 47 +++++++++++++++++++++++++++++++++--- src/services/note/reaction/create.ts | 1 + 2 files changed, 44 insertions(+), 4 deletions(-) (limited to 'src/services') diff --git a/src/models/repositories/note.ts b/src/models/repositories/note.ts index 32552db2fe..43caaf94b2 100644 --- a/src/models/repositories/note.ts +++ b/src/models/repositories/note.ts @@ -9,6 +9,7 @@ import { toString } from '../../mfm/to-string'; import { parse } from '../../mfm/parse'; import { Emoji } from '../entities/emoji'; import { concat } from '../../prelude/array'; +import { NoteReaction } from '../entities/note-reaction'; export type PackedNote = SchemaType; @@ -83,6 +84,9 @@ export class NoteRepository extends Repository { options?: { detail?: boolean; skipHide?: boolean; + _hint_?: { + myReactions: Map; + }; } ): Promise { const opts = Object.assign({ @@ -188,6 +192,16 @@ export class NoteRepository extends Repository { } async function populateMyReaction() { + if (options?._hint_?.myReactions) { + const reaction = options._hint_.myReactions.get(note.id); + if (reaction) { + return convertLegacyReaction(reaction.reaction); + } else if (reaction === null) { + return undefined; + } + // 実装上抜けがあるだけかもしれないので、「ヒントに含まれてなかったら(=undefinedなら)return」のようにはしない + } + const reaction = await NoteReactions.findOne({ userId: meId!, noteId: note.id, @@ -245,11 +259,13 @@ export class NoteRepository extends Repository { ...(opts.detail ? { reply: note.replyId ? this.pack(note.replyId, meId, { - detail: false + detail: false, + _hint_: options?._hint_ }) : undefined, renote: note.renoteId ? this.pack(note.renoteId, meId, { - detail: true + detail: true, + _hint_: options?._hint_ }) : undefined, poll: note.hasPoll ? populatePoll() : undefined, @@ -272,7 +288,7 @@ export class NoteRepository extends Repository { return packed; } - public packMany( + public async packMany( notes: (Note['id'] | Note)[], me?: User['id'] | User | null | undefined, options?: { @@ -280,7 +296,30 @@ export class NoteRepository extends Repository { skipHide?: boolean; } ) { - return Promise.all(notes.map(n => this.pack(n, me, options))); + if (notes.length === 0) return []; + + const meId = me ? typeof me === 'string' ? me : me.id : null; + const noteIds = notes.map(n => typeof n === 'object' ? n.id : n); + const myReactionsMap = new Map(); + if (meId) { + const renoteIds = notes.filter(n => (typeof n === 'object') && (n.renoteId != null)).map(n => (n as Note).renoteId!); + const targets = [...noteIds, ...renoteIds]; + const myReactions = await NoteReactions.find({ + userId: meId, + noteId: In(targets), + }); + + for (const target of targets) { + myReactionsMap.set(target, myReactions.find(reaction => reaction.noteId === target) || null); + } + } + + return await Promise.all(notes.map(n => this.pack(n, me, { + ...options, + _hint_: { + myReactions: myReactionsMap + } + }))); } } diff --git a/src/services/note/reaction/create.ts b/src/services/note/reaction/create.ts index 30a17ad3e9..6c0a852f34 100644 --- a/src/services/note/reaction/create.ts +++ b/src/services/note/reaction/create.ts @@ -15,6 +15,7 @@ import { isDuplicateKeyValueError } from '../../../misc/is-duplicate-key-value-e import { NoteReaction } from '../../../models/entities/note-reaction'; export default async (user: User, note: Note, reaction?: string) => { + // TODO: cache reaction = await toDbReaction(reaction, user.host); let record: NoteReaction; -- cgit v1.2.3-freya From 87c8f9ff953499340496e9c5db09c93eaff08851 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 19 Mar 2021 20:43:24 +0900 Subject: perf: Reduce database query --- src/client/components/note-detailed.vue | 10 ++- src/client/components/note.vue | 10 ++- src/client/ui/chat/note.vue | 10 ++- src/server/api/endpoints/notes/mentions.ts | 6 +- src/server/api/stream/index.ts | 38 ++++++++--- src/services/note/read-mention.ts | 29 ++++++++ src/services/note/read-specified-note.ts | 29 ++++++++ src/services/note/read.ts | 105 ----------------------------- 8 files changed, 114 insertions(+), 123 deletions(-) create mode 100644 src/services/note/read-mention.ts create mode 100644 src/services/note/read-specified-note.ts delete mode 100644 src/services/note/read.ts (limited to 'src/services') diff --git a/src/client/components/note-detailed.vue b/src/client/components/note-detailed.vue index 1ef3f43389..4ad3d2d898 100644 --- a/src/client/components/note-detailed.vue +++ b/src/client/components/note-detailed.vue @@ -350,7 +350,15 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + this.connection.send('sn', { id: this.appearNote.id }); + if (this.appearNote.userId !== this.$i.id) { + if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { + this.connection.send('readMention', { id: this.appearNote.id }); + } + if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { + this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); + } + } if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/components/note.vue b/src/client/components/note.vue index 65e09b7802..3b59afd71d 100644 --- a/src/client/components/note.vue +++ b/src/client/components/note.vue @@ -325,7 +325,15 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + this.connection.send('sn', { id: this.appearNote.id }); + if (this.appearNote.userId !== this.$i.id) { + if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { + this.connection.send('readMention', { id: this.appearNote.id }); + } + if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { + this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); + } + } if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/ui/chat/note.vue b/src/client/ui/chat/note.vue index 5a4a13d889..29bc61d9c5 100644 --- a/src/client/ui/chat/note.vue +++ b/src/client/ui/chat/note.vue @@ -325,7 +325,15 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + this.connection.send('sn', { id: this.appearNote.id }); + if (this.appearNote.userId !== this.$i.id) { + if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { + this.connection.send('readMention', { id: this.appearNote.id }); + } + if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { + this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); + } + } if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/server/api/endpoints/notes/mentions.ts b/src/server/api/endpoints/notes/mentions.ts index 8a9d295d38..1e3014bd46 100644 --- a/src/server/api/endpoints/notes/mentions.ts +++ b/src/server/api/endpoints/notes/mentions.ts @@ -1,12 +1,12 @@ import $ from 'cafy'; import { ID } from '../../../../misc/cafy-id'; import define from '../../define'; -import read from '../../../../services/note/read'; import { Notes, Followings } from '../../../../models'; import { generateVisibilityQuery } from '../../common/generate-visibility-query'; import { generateMutedUserQuery } from '../../common/generate-muted-user-query'; import { makePaginationQuery } from '../../common/make-pagination-query'; import { Brackets } from 'typeorm'; +import { readMention } from '../../../../services/note/read-mention'; export const meta = { desc: { @@ -79,9 +79,7 @@ export default define(meta, async (ps, user) => { const mentions = await query.take(ps.limit!).getMany(); - for (const note of mentions) { - read(user.id, note.id); - } + readMention(user.id, mentions.map(n => n.id)); return await Notes.packMany(mentions, user); }); diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index bb37cfa622..4a87f61e7f 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -2,7 +2,6 @@ import autobind from 'autobind-decorator'; import * as websocket from 'websocket'; import { readNotification } from '../common/read-notification'; import call from '../call'; -import readNote from '../../../services/note/read'; import Channel from './channel'; import channels from './channels'; import { EventEmitter } from 'events'; @@ -14,6 +13,8 @@ import { AccessToken } from '../../../models/entities/access-token'; import { UserProfile } from '../../../models/entities/user-profile'; import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '../../../services/stream'; import { UserGroup } from '../../../models/entities/user-group'; +import { readMention } from '../../../services/note/read-mention'; +import { readSpecifiedNote } from '../../../services/note/read-specified-note'; /** * Main stream connection @@ -86,9 +87,10 @@ export default class Connection { switch (type) { case 'api': this.onApiRequest(body); break; case 'readNotification': this.onReadNotification(body); break; - case 'subNote': this.onSubscribeNote(body, true); break; - case 'sn': this.onSubscribeNote(body, true); break; // alias - case 's': this.onSubscribeNote(body, false); break; + case 'readMention': this.onReadMention(body); break; + case 'readSpecifiedNote': this.onReadSpecifiedNote(body); break; + case 'subNote': this.onSubscribeNote(body); break; + case 'sn': this.onSubscribeNote(body); break; // alias case 'unsubNote': this.onUnsubscribeNote(body); break; case 'un': this.onUnsubscribeNote(body); break; // alias case 'connect': this.onChannelConnectRequested(body); break; @@ -141,11 +143,31 @@ export default class Connection { readNotification(this.user!.id, [payload.id]); } + @autobind + private onReadMention(payload: any) { + if (!payload.id) return; + if (this.user) { + // TODO: ある程度まとめてreadMentionするようにする + // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadMentionに渡すような実装にする + readMention(this.user.id, [payload.id]); + } + } + + @autobind + private onReadSpecifiedNote(payload: any) { + if (!payload.id) return; + if (this.user) { + // TODO: ある程度まとめてreadSpecifiedNoteするようにする + // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadSpecifiedNoteに渡すような実装にする + readSpecifiedNote(this.user.id, [payload.id]); + } + } + /** * 投稿購読要求時 */ @autobind - private onSubscribeNote(payload: any, read: boolean) { + private onSubscribeNote(payload: any) { if (!payload.id) return; if (this.subscribingNotes[payload.id] == null) { @@ -157,12 +179,6 @@ export default class Connection { if (this.subscribingNotes[payload.id] === 1) { this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); } - - if (this.user && read) { - // TODO: クライアントでタイムライン読み込みなどすると、一度に大量のreadNoteが発生しクエリ数がすごいことになるので、ある程度まとめてreadNoteするようにする - // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadNoteに渡すような実装にする - readNote(this.user.id, payload.id); - } } /** diff --git a/src/services/note/read-mention.ts b/src/services/note/read-mention.ts new file mode 100644 index 0000000000..2a668ecd6c --- /dev/null +++ b/src/services/note/read-mention.ts @@ -0,0 +1,29 @@ +import { publishMainStream } from '../stream'; +import { Note } from '../../models/entities/note'; +import { User } from '../../models/entities/user'; +import { NoteUnreads } from '../../models'; +import { In } from 'typeorm'; + +/** + * Mark a mention note as read + */ +export async function readMention( + userId: User['id'], + noteIds: Note['id'][] +) { + // Remove the records + await NoteUnreads.delete({ + userId: userId, + noteId: In(noteIds), + }); + + const mentionsCount = await NoteUnreads.count({ + userId: userId, + isMentioned: true + }); + + if (mentionsCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadMentions'); + } +} diff --git a/src/services/note/read-specified-note.ts b/src/services/note/read-specified-note.ts new file mode 100644 index 0000000000..0fcb66bf98 --- /dev/null +++ b/src/services/note/read-specified-note.ts @@ -0,0 +1,29 @@ +import { publishMainStream } from '../stream'; +import { Note } from '../../models/entities/note'; +import { User } from '../../models/entities/user'; +import { NoteUnreads } from '../../models'; +import { In } from 'typeorm'; + +/** + * Mark a specified note as read + */ +export async function readSpecifiedNote( + userId: User['id'], + noteIds: Note['id'][] +) { + // Remove the records + await NoteUnreads.delete({ + userId: userId, + noteId: In(noteIds), + }); + + const specifiedCount = await NoteUnreads.count({ + userId: userId, + isSpecified: true + }); + + if (specifiedCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); + } +} diff --git a/src/services/note/read.ts b/src/services/note/read.ts deleted file mode 100644 index 5a39ab30b7..0000000000 --- a/src/services/note/read.ts +++ /dev/null @@ -1,105 +0,0 @@ -import { publishMainStream } from '../stream'; -import { Note } from '../../models/entities/note'; -import { User } from '../../models/entities/user'; -import { NoteUnreads, Antennas, AntennaNotes, Users } from '../../models'; -import { Not, IsNull } from 'typeorm'; - -/** - * Mark a note as read - */ -export default async function( - userId: User['id'], - noteId: Note['id'] -) { - async function careNoteUnreads() { - const exist = await NoteUnreads.findOne({ - userId: userId, - noteId: noteId, - }); - - if (!exist) return; - - // Remove the record - await NoteUnreads.delete({ - userId: userId, - noteId: noteId, - }); - - if (exist.isMentioned) { - NoteUnreads.count({ - userId: userId, - isMentioned: true - }).then(mentionsCount => { - if (mentionsCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadMentions'); - } - }); - } - - if (exist.isSpecified) { - NoteUnreads.count({ - userId: userId, - isSpecified: true - }).then(specifiedCount => { - if (specifiedCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); - } - }); - } - - if (exist.noteChannelId) { - NoteUnreads.count({ - userId: userId, - noteChannelId: Not(IsNull()) - }).then(channelNoteCount => { - if (channelNoteCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllChannels'); - } - }); - } - } - - async function careAntenna() { - const beforeUnread = await Users.getHasUnreadAntenna(userId); - if (!beforeUnread) return; - - const antennas = await Antennas.find({ userId }); - - await Promise.all(antennas.map(async antenna => { - const countBefore = await AntennaNotes.count({ - antennaId: antenna.id, - read: false - }); - - if (countBefore === 0) return; - - await AntennaNotes.update({ - antennaId: antenna.id, - noteId: noteId - }, { - read: true - }); - - const countAfter = await AntennaNotes.count({ - antennaId: antenna.id, - read: false - }); - - if (countAfter === 0) { - publishMainStream(userId, 'readAntenna', antenna); - } - })); - - Users.getHasUnreadAntenna(userId).then(unread => { - if (!unread) { - publishMainStream(userId, 'readAllAntennas'); - } - }); - } - - careNoteUnreads(); - careAntenna(); -} -- cgit v1.2.3-freya From 8050352ad88798be222f735a3217367acaee277f Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 15:14:03 +0900 Subject: perf: 各ストリーミング接続ごとにポーリングしないように MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/api/endpoints/channels/follow.ts | 3 ++ src/server/api/endpoints/channels/unfollow.ts | 3 ++ src/server/api/endpoints/i/update.ts | 3 +- src/server/api/endpoints/mute/create.ts | 3 ++ src/server/api/endpoints/mute/delete.ts | 3 ++ src/server/api/stream/index.ts | 57 +++++++++++++++++++-------- src/services/blocking/create.ts | 12 ++++-- src/services/following/create.ts | 7 +++- src/services/following/delete.ts | 7 +++- src/services/following/requests/reject.ts | 7 +++- src/services/stream.ts | 5 +++ 11 files changed, 83 insertions(+), 27 deletions(-) (limited to 'src/services') diff --git a/src/server/api/endpoints/channels/follow.ts b/src/server/api/endpoints/channels/follow.ts index bf2f2bbb57..11c6e37ff7 100644 --- a/src/server/api/endpoints/channels/follow.ts +++ b/src/server/api/endpoints/channels/follow.ts @@ -4,6 +4,7 @@ import define from '../../define'; import { ApiError } from '../../error'; import { Channels, ChannelFollowings } from '../../../../models'; import { genId } from '../../../../misc/gen-id'; +import { publishUserEvent } from '../../../../services/stream'; export const meta = { tags: ['channels'], @@ -42,4 +43,6 @@ export default define(meta, async (ps, user) => { followerId: user.id, followeeId: channel.id, }); + + publishUserEvent(user.id, 'followChannel', channel); }); diff --git a/src/server/api/endpoints/channels/unfollow.ts b/src/server/api/endpoints/channels/unfollow.ts index 8cab5c36a6..3eb0f1519b 100644 --- a/src/server/api/endpoints/channels/unfollow.ts +++ b/src/server/api/endpoints/channels/unfollow.ts @@ -3,6 +3,7 @@ import { ID } from '../../../../misc/cafy-id'; import define from '../../define'; import { ApiError } from '../../error'; import { Channels, ChannelFollowings } from '../../../../models'; +import { publishUserEvent } from '../../../../services/stream'; export const meta = { tags: ['channels'], @@ -39,4 +40,6 @@ export default define(meta, async (ps, user) => { followerId: user.id, followeeId: channel.id, }); + + publishUserEvent(user.id, 'unfollowChannel', channel); }); diff --git a/src/server/api/endpoints/i/update.ts b/src/server/api/endpoints/i/update.ts index a1faf8f1c2..92be2e9e6d 100644 --- a/src/server/api/endpoints/i/update.ts +++ b/src/server/api/endpoints/i/update.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import { ID } from '../../../../misc/cafy-id'; -import { publishMainStream } from '../../../../services/stream'; +import { publishMainStream, publishUserEvent } from '../../../../services/stream'; import acceptAllFollowRequests from '../../../../services/following/requests/accept-all'; import { publishToFollowers } from '../../../../services/i/update'; import define from '../../define'; @@ -317,6 +317,7 @@ export default define(meta, async (ps, user, token) => { // Publish meUpdated event publishMainStream(user.id, 'meUpdated', iObj); + publishUserEvent(user.id, 'updateUserProfile', await UserProfiles.findOne(user.id)); // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 if (user.isLocked && ps.isLocked === false) { diff --git a/src/server/api/endpoints/mute/create.ts b/src/server/api/endpoints/mute/create.ts index 437ad96107..ebfc6028ed 100644 --- a/src/server/api/endpoints/mute/create.ts +++ b/src/server/api/endpoints/mute/create.ts @@ -6,6 +6,7 @@ import { getUser } from '../../common/getters'; import { genId } from '../../../../misc/gen-id'; import { Mutings, NoteWatchings } from '../../../../models'; import { Muting } from '../../../../models/entities/muting'; +import { publishUserEvent } from '../../../../services/stream'; export const meta = { desc: { @@ -82,6 +83,8 @@ export default define(meta, async (ps, user) => { muteeId: mutee.id, } as Muting); + publishUserEvent(user.id, 'mute', mutee); + NoteWatchings.delete({ userId: muter.id, noteUserId: mutee.id diff --git a/src/server/api/endpoints/mute/delete.ts b/src/server/api/endpoints/mute/delete.ts index 217352acb4..67a59e3ae4 100644 --- a/src/server/api/endpoints/mute/delete.ts +++ b/src/server/api/endpoints/mute/delete.ts @@ -4,6 +4,7 @@ import define from '../../define'; import { ApiError } from '../../error'; import { getUser } from '../../common/getters'; import { Mutings } from '../../../../models'; +import { publishUserEvent } from '../../../../services/stream'; export const meta = { desc: { @@ -76,4 +77,6 @@ export default define(meta, async (ps, user) => { await Mutings.delete({ id: exist.id }); + + publishUserEvent(user.id, 'unmute', mutee); }); diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index a94923484d..748e894f83 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -30,10 +30,6 @@ export default class Connection { public subscriber: EventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; - private followingClock: ReturnType; - private mutingClock: ReturnType; - private followingChannelsClock: ReturnType; - private userProfileClock: ReturnType; constructor( wsConnection: websocket.connection, @@ -52,19 +48,51 @@ export default class Connection { this.onBroadcastMessage(type, body); }); - // TODO: reidsでイベントをもらうようにし、ポーリングはしないようにする if (this.user) { this.updateFollowing(); - this.followingClock = setInterval(this.updateFollowing, 5000); - this.updateMuting(); - this.mutingClock = setInterval(this.updateMuting, 5000); - this.updateFollowingChannels(); - this.followingChannelsClock = setInterval(this.updateFollowingChannels, 5000); - this.updateUserProfile(); - this.userProfileClock = setInterval(this.updateUserProfile, 5000); + + this.subscriber.on(`user:${this.user.id}`, ({ type, body }) => { + this.onUserEvent(type, body); + }); + } + } + + @autobind + private onUserEvent(type: string, body: any) { + switch (type) { + case 'follow': + this.following.add(body.id); + break; + + case 'unfollow': + this.following.delete(body.id); + break; + + case 'mute': + this.muting.add(body.id); + break; + + case 'unmute': + this.muting.delete(body.id); + break; + + case 'followChannel': + this.followingChannels.add(body.id); + break; + + case 'unfollowChannel': + this.followingChannels.delete(body.id); + break; + + case 'updateUserProfile': + this.userProfile = body; + break; + + default: + break; } } @@ -354,10 +382,5 @@ export default class Connection { for (const c of this.channels.filter(c => c.dispose)) { if (c.dispose) c.dispose(); } - - if (this.followingClock) clearInterval(this.followingClock); - if (this.mutingClock) clearInterval(this.mutingClock); - if (this.followingChannelsClock) clearInterval(this.followingChannelsClock); - if (this.userProfileClock) clearInterval(this.userProfileClock); } } diff --git a/src/services/blocking/create.ts b/src/services/blocking/create.ts index def4f33585..4f0238db91 100644 --- a/src/services/blocking/create.ts +++ b/src/services/blocking/create.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from '../stream'; +import { publishMainStream, publishUserEvent } from '../stream'; import { renderActivity } from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderUndo from '../../remote/activitypub/renderer/undo'; @@ -55,7 +55,10 @@ async function cancelRequest(follower: User, followee: User) { if (Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } // リモートにフォローリクエストをしていたらUndoFollow送信 @@ -97,7 +100,10 @@ async function unFollow(follower: User, followee: User) { if (Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } // リモートにフォローをしていたらUndoFollow送信 diff --git a/src/services/following/create.ts b/src/services/following/create.ts index 6bc98aee87..eb6699b0bf 100644 --- a/src/services/following/create.ts +++ b/src/services/following/create.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from '../stream'; +import { publishMainStream, publishUserEvent } from '../stream'; import { renderActivity } from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderAccept from '../../remote/activitypub/renderer/accept'; @@ -88,7 +88,10 @@ export async function insertFollowingDoc(followee: User, follower: User) { if (Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'follow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'follow', packed); + publishMainStream(follower.id, 'follow', packed); + }); } // Publish followed event diff --git a/src/services/following/delete.ts b/src/services/following/delete.ts index 8821611515..32c47ea7f4 100644 --- a/src/services/following/delete.ts +++ b/src/services/following/delete.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from '../stream'; +import { publishMainStream, publishUserEvent } from '../stream'; import { renderActivity } from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderUndo from '../../remote/activitypub/renderer/undo'; @@ -30,7 +30,10 @@ export default async function(follower: User, followee: User, silent = false) { if (!silent && Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) { diff --git a/src/services/following/requests/reject.ts b/src/services/following/requests/reject.ts index 9a8b14bbfd..d8d3788088 100644 --- a/src/services/following/requests/reject.ts +++ b/src/services/following/requests/reject.ts @@ -2,7 +2,7 @@ import { renderActivity } from '../../../remote/activitypub/renderer'; import renderFollow from '../../../remote/activitypub/renderer/follow'; import renderReject from '../../../remote/activitypub/renderer/reject'; import { deliver } from '../../../queue'; -import { publishMainStream } from '../../stream'; +import { publishMainStream, publishUserEvent } from '../../stream'; import { User, ILocalUser } from '../../../models/entities/user'; import { Users, FollowRequests, Followings } from '../../../models'; import { decrementFollowing } from '../delete'; @@ -39,5 +39,8 @@ export default async function(followee: User, follower: User) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } diff --git a/src/services/stream.ts b/src/services/stream.ts index d833d700fe..75385847ce 100644 --- a/src/services/stream.ts +++ b/src/services/stream.ts @@ -20,6 +20,10 @@ class Publisher { })); } + public publishUserEvent = (userId: User['id'], type: string, value?: any): void => { + this.publish(`user:${userId}`, type, typeof value === 'undefined' ? null : value); + } + public publishBroadcastStream = (type: string, value?: any): void => { this.publish('broadcast', type, typeof value === 'undefined' ? null : value); } @@ -84,6 +88,7 @@ const publisher = new Publisher(); export default publisher; +export const publishUserEvent = publisher.publishUserEvent; export const publishBroadcastStream = publisher.publishBroadcastStream; export const publishMainStream = publisher.publishMainStream; export const publishDriveStream = publisher.publishDriveStream; -- cgit v1.2.3-freya From 630464f38d2524ddc5d11d2abd4fddcccc4240d4 Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 15:35:02 +0900 Subject: Revert "perf: Reduce database query" This reverts commit 87c8f9ff953499340496e9c5db09c93eaff08851. --- src/client/components/note-detailed.vue | 10 +-- src/client/components/note.vue | 10 +-- src/client/ui/chat/note.vue | 10 +-- src/server/api/endpoints/notes/mentions.ts | 6 +- src/server/api/stream/index.ts | 38 +++-------- src/services/note/read-mention.ts | 29 -------- src/services/note/read-specified-note.ts | 29 -------- src/services/note/read.ts | 105 +++++++++++++++++++++++++++++ 8 files changed, 123 insertions(+), 114 deletions(-) delete mode 100644 src/services/note/read-mention.ts delete mode 100644 src/services/note/read-specified-note.ts create mode 100644 src/services/note/read.ts (limited to 'src/services') diff --git a/src/client/components/note-detailed.vue b/src/client/components/note-detailed.vue index 4ad3d2d898..1ef3f43389 100644 --- a/src/client/components/note-detailed.vue +++ b/src/client/components/note-detailed.vue @@ -350,15 +350,7 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send('sn', { id: this.appearNote.id }); - if (this.appearNote.userId !== this.$i.id) { - if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { - this.connection.send('readMention', { id: this.appearNote.id }); - } - if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { - this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); - } - } + this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/components/note.vue b/src/client/components/note.vue index 3b59afd71d..65e09b7802 100644 --- a/src/client/components/note.vue +++ b/src/client/components/note.vue @@ -325,15 +325,7 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send('sn', { id: this.appearNote.id }); - if (this.appearNote.userId !== this.$i.id) { - if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { - this.connection.send('readMention', { id: this.appearNote.id }); - } - if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { - this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); - } - } + this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/ui/chat/note.vue b/src/client/ui/chat/note.vue index 29bc61d9c5..5a4a13d889 100644 --- a/src/client/ui/chat/note.vue +++ b/src/client/ui/chat/note.vue @@ -325,15 +325,7 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send('sn', { id: this.appearNote.id }); - if (this.appearNote.userId !== this.$i.id) { - if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { - this.connection.send('readMention', { id: this.appearNote.id }); - } - if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { - this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); - } - } + this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/server/api/endpoints/notes/mentions.ts b/src/server/api/endpoints/notes/mentions.ts index 56640ec1ab..30844774e0 100644 --- a/src/server/api/endpoints/notes/mentions.ts +++ b/src/server/api/endpoints/notes/mentions.ts @@ -1,12 +1,12 @@ import $ from 'cafy'; import { ID } from '../../../../misc/cafy-id'; import define from '../../define'; +import read from '../../../../services/note/read'; import { Notes, Followings } from '../../../../models'; import { generateVisibilityQuery } from '../../common/generate-visibility-query'; import { generateMutedUserQuery } from '../../common/generate-muted-user-query'; import { makePaginationQuery } from '../../common/make-pagination-query'; import { Brackets } from 'typeorm'; -import { readMention } from '../../../../services/note/read-mention'; export const meta = { desc: { @@ -83,7 +83,9 @@ export default define(meta, async (ps, user) => { const mentions = await query.take(ps.limit!).getMany(); - readMention(user.id, mentions.map(n => n.id)); + for (const note of mentions) { + read(user.id, note.id); + } return await Notes.packMany(mentions, user); }); diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index 748e894f83..f67faee1ce 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -2,6 +2,7 @@ import autobind from 'autobind-decorator'; import * as websocket from 'websocket'; import { readNotification } from '../common/read-notification'; import call from '../call'; +import readNote from '../../../services/note/read'; import Channel from './channel'; import channels from './channels'; import { EventEmitter } from 'events'; @@ -13,8 +14,6 @@ import { AccessToken } from '../../../models/entities/access-token'; import { UserProfile } from '../../../models/entities/user-profile'; import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '../../../services/stream'; import { UserGroup } from '../../../models/entities/user-group'; -import { readMention } from '../../../services/note/read-mention'; -import { readSpecifiedNote } from '../../../services/note/read-specified-note'; /** * Main stream connection @@ -116,10 +115,9 @@ export default class Connection { switch (type) { case 'api': this.onApiRequest(body); break; case 'readNotification': this.onReadNotification(body); break; - case 'readMention': this.onReadMention(body); break; - case 'readSpecifiedNote': this.onReadSpecifiedNote(body); break; - case 'subNote': this.onSubscribeNote(body); break; - case 'sn': this.onSubscribeNote(body); break; // alias + case 'subNote': this.onSubscribeNote(body, true); break; + case 'sn': this.onSubscribeNote(body, true); break; // alias + case 's': this.onSubscribeNote(body, false); break; case 'unsubNote': this.onUnsubscribeNote(body); break; case 'un': this.onUnsubscribeNote(body); break; // alias case 'connect': this.onChannelConnectRequested(body); break; @@ -172,31 +170,11 @@ export default class Connection { readNotification(this.user!.id, [payload.id]); } - @autobind - private onReadMention(payload: any) { - if (!payload.id) return; - if (this.user) { - // TODO: ある程度まとめてreadMentionするようにする - // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadMentionに渡すような実装にする - readMention(this.user.id, [payload.id]); - } - } - - @autobind - private onReadSpecifiedNote(payload: any) { - if (!payload.id) return; - if (this.user) { - // TODO: ある程度まとめてreadSpecifiedNoteするようにする - // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadSpecifiedNoteに渡すような実装にする - readSpecifiedNote(this.user.id, [payload.id]); - } - } - /** * 投稿購読要求時 */ @autobind - private onSubscribeNote(payload: any) { + private onSubscribeNote(payload: any, read: boolean) { if (!payload.id) return; if (this.subscribingNotes[payload.id] == null) { @@ -208,6 +186,12 @@ export default class Connection { if (this.subscribingNotes[payload.id] === 1) { this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); } + + if (this.user && read) { + // TODO: クライアントでタイムライン読み込みなどすると、一度に大量のreadNoteが発生しクエリ数がすごいことになるので、ある程度まとめてreadNoteするようにする + // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadNoteに渡すような実装にする + readNote(this.user.id, payload.id); + } } /** diff --git a/src/services/note/read-mention.ts b/src/services/note/read-mention.ts deleted file mode 100644 index 2a668ecd6c..0000000000 --- a/src/services/note/read-mention.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { publishMainStream } from '../stream'; -import { Note } from '../../models/entities/note'; -import { User } from '../../models/entities/user'; -import { NoteUnreads } from '../../models'; -import { In } from 'typeorm'; - -/** - * Mark a mention note as read - */ -export async function readMention( - userId: User['id'], - noteIds: Note['id'][] -) { - // Remove the records - await NoteUnreads.delete({ - userId: userId, - noteId: In(noteIds), - }); - - const mentionsCount = await NoteUnreads.count({ - userId: userId, - isMentioned: true - }); - - if (mentionsCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadMentions'); - } -} diff --git a/src/services/note/read-specified-note.ts b/src/services/note/read-specified-note.ts deleted file mode 100644 index 0fcb66bf98..0000000000 --- a/src/services/note/read-specified-note.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { publishMainStream } from '../stream'; -import { Note } from '../../models/entities/note'; -import { User } from '../../models/entities/user'; -import { NoteUnreads } from '../../models'; -import { In } from 'typeorm'; - -/** - * Mark a specified note as read - */ -export async function readSpecifiedNote( - userId: User['id'], - noteIds: Note['id'][] -) { - // Remove the records - await NoteUnreads.delete({ - userId: userId, - noteId: In(noteIds), - }); - - const specifiedCount = await NoteUnreads.count({ - userId: userId, - isSpecified: true - }); - - if (specifiedCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); - } -} diff --git a/src/services/note/read.ts b/src/services/note/read.ts new file mode 100644 index 0000000000..5a39ab30b7 --- /dev/null +++ b/src/services/note/read.ts @@ -0,0 +1,105 @@ +import { publishMainStream } from '../stream'; +import { Note } from '../../models/entities/note'; +import { User } from '../../models/entities/user'; +import { NoteUnreads, Antennas, AntennaNotes, Users } from '../../models'; +import { Not, IsNull } from 'typeorm'; + +/** + * Mark a note as read + */ +export default async function( + userId: User['id'], + noteId: Note['id'] +) { + async function careNoteUnreads() { + const exist = await NoteUnreads.findOne({ + userId: userId, + noteId: noteId, + }); + + if (!exist) return; + + // Remove the record + await NoteUnreads.delete({ + userId: userId, + noteId: noteId, + }); + + if (exist.isMentioned) { + NoteUnreads.count({ + userId: userId, + isMentioned: true + }).then(mentionsCount => { + if (mentionsCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadMentions'); + } + }); + } + + if (exist.isSpecified) { + NoteUnreads.count({ + userId: userId, + isSpecified: true + }).then(specifiedCount => { + if (specifiedCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); + } + }); + } + + if (exist.noteChannelId) { + NoteUnreads.count({ + userId: userId, + noteChannelId: Not(IsNull()) + }).then(channelNoteCount => { + if (channelNoteCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllChannels'); + } + }); + } + } + + async function careAntenna() { + const beforeUnread = await Users.getHasUnreadAntenna(userId); + if (!beforeUnread) return; + + const antennas = await Antennas.find({ userId }); + + await Promise.all(antennas.map(async antenna => { + const countBefore = await AntennaNotes.count({ + antennaId: antenna.id, + read: false + }); + + if (countBefore === 0) return; + + await AntennaNotes.update({ + antennaId: antenna.id, + noteId: noteId + }, { + read: true + }); + + const countAfter = await AntennaNotes.count({ + antennaId: antenna.id, + read: false + }); + + if (countAfter === 0) { + publishMainStream(userId, 'readAntenna', antenna); + } + })); + + Users.getHasUnreadAntenna(userId).then(unread => { + if (!unread) { + publishMainStream(userId, 'readAllAntennas'); + } + }); + } + + careNoteUnreads(); + careAntenna(); +} -- cgit v1.2.3-freya From 667d58bad4544d6e9dc75cfc4e6216179e2bc1aa Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 17:38:09 +0900 Subject: better note read handling --- src/client/components/note-detailed.vue | 3 +- src/client/components/note.vue | 3 +- src/client/ui/chat/note.vue | 3 +- src/daemons/janitor.ts | 2 + src/server/api/endpoints/notes/mentions.ts | 4 +- src/server/api/stream/channels/antenna.ts | 2 + src/server/api/stream/channels/channel.ts | 2 + src/server/api/stream/channels/global-timeline.ts | 2 + src/server/api/stream/channels/hashtag.ts | 2 + src/server/api/stream/channels/home-timeline.ts | 2 + src/server/api/stream/channels/hybrid-timeline.ts | 2 + src/server/api/stream/channels/local-timeline.ts | 2 + src/server/api/stream/channels/main.ts | 8 ++- src/server/api/stream/index.ts | 58 +++++++++++++--- src/services/note/read.ts | 80 +++++++++-------------- 15 files changed, 109 insertions(+), 66 deletions(-) (limited to 'src/services') diff --git a/src/client/components/note-detailed.vue b/src/client/components/note-detailed.vue index 1ef3f43389..ea26d31100 100644 --- a/src/client/components/note-detailed.vue +++ b/src/client/components/note-detailed.vue @@ -350,7 +350,8 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + // TODO: このノートがストリーミング経由で流れてきた場合のみ sr する + this.connection.send(document.body.contains(this.$el) ? 'sr' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/components/note.vue b/src/client/components/note.vue index 65e09b7802..70f49fef7e 100644 --- a/src/client/components/note.vue +++ b/src/client/components/note.vue @@ -325,7 +325,8 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + // TODO: このノートがストリーミング経由で流れてきた場合のみ sr する + this.connection.send(document.body.contains(this.$el) ? 'sr' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/ui/chat/note.vue b/src/client/ui/chat/note.vue index 5a4a13d889..97275875ca 100644 --- a/src/client/ui/chat/note.vue +++ b/src/client/ui/chat/note.vue @@ -325,7 +325,8 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + // TODO: このノートがストリーミング経由で流れてきた場合のみ sr する + this.connection.send(document.body.contains(this.$el) ? 'sr' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/daemons/janitor.ts b/src/daemons/janitor.ts index 462ebf915c..c079086427 100644 --- a/src/daemons/janitor.ts +++ b/src/daemons/janitor.ts @@ -1,3 +1,5 @@ +// TODO: 消したい + const interval = 30 * 60 * 1000; import { AttestationChallenges } from '../models'; import { LessThan } from 'typeorm'; diff --git a/src/server/api/endpoints/notes/mentions.ts b/src/server/api/endpoints/notes/mentions.ts index 30844774e0..30368ea578 100644 --- a/src/server/api/endpoints/notes/mentions.ts +++ b/src/server/api/endpoints/notes/mentions.ts @@ -83,9 +83,7 @@ export default define(meta, async (ps, user) => { const mentions = await query.take(ps.limit!).getMany(); - for (const note of mentions) { - read(user.id, note.id); - } + read(user.id, mentions.map(note => note.id)); return await Notes.packMany(mentions, user); }); diff --git a/src/server/api/stream/channels/antenna.ts b/src/server/api/stream/channels/antenna.ts index b5a792f814..36a474f2ac 100644 --- a/src/server/api/stream/channels/antenna.ts +++ b/src/server/api/stream/channels/antenna.ts @@ -27,6 +27,8 @@ export default class extends Channel { // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (isMutedUserRelated(note, this.muting)) return; + this.connection.cacheNote(note); + this.send('note', note); } else { this.send(type, body); diff --git a/src/server/api/stream/channels/channel.ts b/src/server/api/stream/channels/channel.ts index aa570d1ef4..47a52465b2 100644 --- a/src/server/api/stream/channels/channel.ts +++ b/src/server/api/stream/channels/channel.ts @@ -43,6 +43,8 @@ export default class extends Channel { // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (isMutedUserRelated(note, this.muting)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/global-timeline.ts b/src/server/api/stream/channels/global-timeline.ts index 8c97e67226..8353f45323 100644 --- a/src/server/api/stream/channels/global-timeline.ts +++ b/src/server/api/stream/channels/global-timeline.ts @@ -56,6 +56,8 @@ export default class extends Channel { // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/hashtag.ts b/src/server/api/stream/channels/hashtag.ts index 41447039d5..1b7f8efcc1 100644 --- a/src/server/api/stream/channels/hashtag.ts +++ b/src/server/api/stream/channels/hashtag.ts @@ -37,6 +37,8 @@ export default class extends Channel { // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (isMutedUserRelated(note, this.muting)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/home-timeline.ts b/src/server/api/stream/channels/home-timeline.ts index 6cfa6eae7b..59ba31c316 100644 --- a/src/server/api/stream/channels/home-timeline.ts +++ b/src/server/api/stream/channels/home-timeline.ts @@ -64,6 +64,8 @@ export default class extends Channel { // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/hybrid-timeline.ts b/src/server/api/stream/channels/hybrid-timeline.ts index a9e577cacb..9715e9973f 100644 --- a/src/server/api/stream/channels/hybrid-timeline.ts +++ b/src/server/api/stream/channels/hybrid-timeline.ts @@ -73,6 +73,8 @@ export default class extends Channel { // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/local-timeline.ts b/src/server/api/stream/channels/local-timeline.ts index a3a5e491fc..e159c72d60 100644 --- a/src/server/api/stream/channels/local-timeline.ts +++ b/src/server/api/stream/channels/local-timeline.ts @@ -58,6 +58,8 @@ export default class extends Channel { // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/main.ts b/src/server/api/stream/channels/main.ts index b69c2ec355..780bc0b89f 100644 --- a/src/server/api/stream/channels/main.ts +++ b/src/server/api/stream/channels/main.ts @@ -18,18 +18,22 @@ export default class extends Channel { case 'notification': { if (this.muting.has(body.userId)) return; if (body.note && body.note.isHidden) { - body.note = await Notes.pack(body.note.id, this.user, { + const note = await Notes.pack(body.note.id, this.user, { detail: true }); + this.connection.cacheNote(note); + body.note = note; } break; } case 'mention': { if (this.muting.has(body.userId)) return; if (body.isHidden) { - body = await Notes.pack(body.id, this.user, { + const note = await Notes.pack(body.id, this.user, { detail: true }); + this.connection.cacheNote(note); + body = note; } break; } diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index f67faee1ce..99ae558696 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -14,6 +14,7 @@ import { AccessToken } from '../../../models/entities/access-token'; import { UserProfile } from '../../../models/entities/user-profile'; import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '../../../services/stream'; import { UserGroup } from '../../../models/entities/user-group'; +import { PackedNote } from '../../../models/repositories/note'; /** * Main stream connection @@ -29,6 +30,7 @@ export default class Connection { public subscriber: EventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; + private cachedNotes: PackedNote[] = []; constructor( wsConnection: websocket.connection, @@ -115,9 +117,9 @@ export default class Connection { switch (type) { case 'api': this.onApiRequest(body); break; case 'readNotification': this.onReadNotification(body); break; - case 'subNote': this.onSubscribeNote(body, true); break; - case 'sn': this.onSubscribeNote(body, true); break; // alias - case 's': this.onSubscribeNote(body, false); break; + case 'subNote': this.onSubscribeNote(body); break; + case 's': this.onSubscribeNote(body); break; // alias + case 'sr': this.onSubscribeNote(body); this.readNote(body); break; case 'unsubNote': this.onUnsubscribeNote(body); break; case 'un': this.onUnsubscribeNote(body); break; // alias case 'connect': this.onChannelConnectRequested(body); break; @@ -138,6 +140,48 @@ export default class Connection { this.sendMessageToWs(type, body); } + @autobind + public cacheNote(note: PackedNote) { + const add = (note: PackedNote) => { + const existIndex = this.cachedNotes.findIndex(n => n.id === note.id); + if (existIndex > -1) { + this.cachedNotes[existIndex] = note; + return; + } + + this.cachedNotes.unshift(note); + if (this.cachedNotes.length > 32) { + this.cachedNotes.splice(32); + } + }; + + add(note); + if (note.reply) add(note.reply); + if (note.renote) add(note.renote); + } + + @autobind + private readNote(body: any) { + const id = body.id; + + const note = this.cachedNotes.find(n => n.id === id); + if (note == null) return; + + if (this.user && (note.userId !== this.user.id)) { + if (note.mentions && note.mentions.includes(this.user.id)) { + readNote(this.user.id, [note]); + } else if (note.visibleUserIds && note.visibleUserIds.includes(this.user.id)) { + readNote(this.user.id, [note]); + } + + if (this.followingChannels.has(note.channelId)) { + // TODO + } + + // TODO: アンテナの既読処理 + } + } + /** * APIリクエスト要求時 */ @@ -174,7 +218,7 @@ export default class Connection { * 投稿購読要求時 */ @autobind - private onSubscribeNote(payload: any, read: boolean) { + private onSubscribeNote(payload: any) { if (!payload.id) return; if (this.subscribingNotes[payload.id] == null) { @@ -186,12 +230,6 @@ export default class Connection { if (this.subscribingNotes[payload.id] === 1) { this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); } - - if (this.user && read) { - // TODO: クライアントでタイムライン読み込みなどすると、一度に大量のreadNoteが発生しクエリ数がすごいことになるので、ある程度まとめてreadNoteするようにする - // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadNoteに渡すような実装にする - readNote(this.user.id, payload.id); - } } /** diff --git a/src/services/note/read.ts b/src/services/note/read.ts index 5a39ab30b7..35279db411 100644 --- a/src/services/note/read.ts +++ b/src/services/note/read.ts @@ -2,70 +2,54 @@ import { publishMainStream } from '../stream'; import { Note } from '../../models/entities/note'; import { User } from '../../models/entities/user'; import { NoteUnreads, Antennas, AntennaNotes, Users } from '../../models'; -import { Not, IsNull } from 'typeorm'; +import { Not, IsNull, In } from 'typeorm'; /** - * Mark a note as read + * Mark notes as read */ export default async function( userId: User['id'], - noteId: Note['id'] + noteIds: Note['id'][] ) { async function careNoteUnreads() { - const exist = await NoteUnreads.findOne({ - userId: userId, - noteId: noteId, - }); - - if (!exist) return; - // Remove the record await NoteUnreads.delete({ userId: userId, - noteId: noteId, + noteId: In(noteIds), }); - if (exist.isMentioned) { - NoteUnreads.count({ - userId: userId, - isMentioned: true - }).then(mentionsCount => { - if (mentionsCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadMentions'); - } - }); - } + NoteUnreads.count({ + userId: userId, + isMentioned: true + }).then(mentionsCount => { + if (mentionsCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadMentions'); + } + }); - if (exist.isSpecified) { - NoteUnreads.count({ - userId: userId, - isSpecified: true - }).then(specifiedCount => { - if (specifiedCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); - } - }); - } + NoteUnreads.count({ + userId: userId, + isSpecified: true + }).then(specifiedCount => { + if (specifiedCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); + } + }); - if (exist.noteChannelId) { - NoteUnreads.count({ - userId: userId, - noteChannelId: Not(IsNull()) - }).then(channelNoteCount => { - if (channelNoteCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllChannels'); - } - }); - } + NoteUnreads.count({ + userId: userId, + noteChannelId: Not(IsNull()) + }).then(channelNoteCount => { + if (channelNoteCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllChannels'); + } + }); } async function careAntenna() { - const beforeUnread = await Users.getHasUnreadAntenna(userId); - if (!beforeUnread) return; - const antennas = await Antennas.find({ userId }); await Promise.all(antennas.map(async antenna => { @@ -78,7 +62,7 @@ export default async function( await AntennaNotes.update({ antennaId: antenna.id, - noteId: noteId + noteId: In(noteIds) }, { read: true }); -- cgit v1.2.3-freya From c4c20bee7c58ea7330dbc890b9564bd100ac6e25 Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 21:27:09 +0900 Subject: wip #6441 --- src/models/entities/note-reaction.ts | 4 ++-- src/server/api/endpoints/admin/invite.ts | 2 +- src/server/api/endpoints/admin/promo/create.ts | 2 +- src/server/api/endpoints/auth/accept.ts | 2 +- src/server/api/endpoints/channels/follow.ts | 2 +- src/server/api/endpoints/clips/add-note.ts | 2 +- src/server/api/endpoints/i/read-announcement.ts | 2 +- src/server/api/endpoints/miauth/gen-token.ts | 2 +- src/server/api/endpoints/notes/favorites/create.ts | 2 +- src/server/api/endpoints/pages/like.ts | 2 +- src/server/api/endpoints/promo/read.ts | 2 +- src/server/api/endpoints/sw/register.ts | 2 +- src/server/api/endpoints/users/groups/create.ts | 2 +- .../api/endpoints/users/groups/invitations/accept.ts | 2 +- src/server/api/private/signin.ts | 4 ++-- src/services/add-note-to-antenna.ts | 2 +- src/services/blocking/create.ts | 2 +- src/services/following/create.ts | 2 +- src/services/i/pin.ts | 2 +- src/services/insert-moderation-log.ts | 2 +- src/services/messages/create.ts | 6 ++++-- src/services/note/create.ts | 2 +- src/services/note/polls/vote.ts | 2 +- src/services/note/reaction/create.ts | 16 ++++++++-------- src/services/note/unread.ts | 6 ++++-- src/services/note/watch.ts | 2 +- src/services/update-hashtag.ts | 4 ++-- src/services/user-list/push.ts | 2 +- 28 files changed, 44 insertions(+), 40 deletions(-) (limited to 'src/services') diff --git a/src/models/entities/note-reaction.ts b/src/models/entities/note-reaction.ts index 69bb663fd3..674dc3639e 100644 --- a/src/models/entities/note-reaction.ts +++ b/src/models/entities/note-reaction.ts @@ -23,7 +23,7 @@ export class NoteReaction { onDelete: 'CASCADE' }) @JoinColumn() - public user: User | null; + public user?: User | null; @Index() @Column(id()) @@ -33,7 +33,7 @@ export class NoteReaction { onDelete: 'CASCADE' }) @JoinColumn() - public note: Note | null; + public note?: Note | null; // TODO: 対象noteのuserIdを非正規化したい(「受け取ったリアクション一覧」のようなものを(JOIN無しで)実装したいため) diff --git a/src/server/api/endpoints/admin/invite.ts b/src/server/api/endpoints/admin/invite.ts index 4529d16adf..987105791f 100644 --- a/src/server/api/endpoints/admin/invite.ts +++ b/src/server/api/endpoints/admin/invite.ts @@ -38,7 +38,7 @@ export default define(meta, async () => { chars: '2-9A-HJ-NP-Z', // [0-9A-Z] w/o [01IO] (32 patterns) }); - await RegistrationTickets.save({ + await RegistrationTickets.insert({ id: genId(), createdAt: new Date(), code, diff --git a/src/server/api/endpoints/admin/promo/create.ts b/src/server/api/endpoints/admin/promo/create.ts index 8b96d563c2..aa22e68ebd 100644 --- a/src/server/api/endpoints/admin/promo/create.ts +++ b/src/server/api/endpoints/admin/promo/create.ts @@ -53,7 +53,7 @@ export default define(meta, async (ps, user) => { throw new ApiError(meta.errors.alreadyPromoted); } - await PromoNotes.save({ + await PromoNotes.insert({ noteId: note.id, createdAt: new Date(), expiresAt: new Date(ps.expiresAt), diff --git a/src/server/api/endpoints/auth/accept.ts b/src/server/api/endpoints/auth/accept.ts index 6d4d31fa1e..444053a946 100644 --- a/src/server/api/endpoints/auth/accept.ts +++ b/src/server/api/endpoints/auth/accept.ts @@ -58,7 +58,7 @@ export default define(meta, async (ps, user) => { const now = new Date(); // Insert access token doc - await AccessTokens.save({ + await AccessTokens.insert({ id: genId(), createdAt: now, lastUsedAt: now, diff --git a/src/server/api/endpoints/channels/follow.ts b/src/server/api/endpoints/channels/follow.ts index 11c6e37ff7..c5976a8a34 100644 --- a/src/server/api/endpoints/channels/follow.ts +++ b/src/server/api/endpoints/channels/follow.ts @@ -37,7 +37,7 @@ export default define(meta, async (ps, user) => { throw new ApiError(meta.errors.noSuchChannel); } - await ChannelFollowings.save({ + await ChannelFollowings.insert({ id: genId(), createdAt: new Date(), followerId: user.id, diff --git a/src/server/api/endpoints/clips/add-note.ts b/src/server/api/endpoints/clips/add-note.ts index 4f5cc649e3..ee6a117b2d 100644 --- a/src/server/api/endpoints/clips/add-note.ts +++ b/src/server/api/endpoints/clips/add-note.ts @@ -68,7 +68,7 @@ export default define(meta, async (ps, user) => { throw new ApiError(meta.errors.alreadyClipped); } - await ClipNotes.save({ + await ClipNotes.insert({ id: genId(), noteId: note.id, clipId: clip.id diff --git a/src/server/api/endpoints/i/read-announcement.ts b/src/server/api/endpoints/i/read-announcement.ts index 4a4a021af9..d6acb3d2e6 100644 --- a/src/server/api/endpoints/i/read-announcement.ts +++ b/src/server/api/endpoints/i/read-announcement.ts @@ -52,7 +52,7 @@ export default define(meta, async (ps, user) => { } // Create read - await AnnouncementReads.save({ + await AnnouncementReads.insert({ id: genId(), createdAt: new Date(), announcementId: ps.announcementId, diff --git a/src/server/api/endpoints/miauth/gen-token.ts b/src/server/api/endpoints/miauth/gen-token.ts index 0634debb1e..401ed16389 100644 --- a/src/server/api/endpoints/miauth/gen-token.ts +++ b/src/server/api/endpoints/miauth/gen-token.ts @@ -52,7 +52,7 @@ export default define(meta, async (ps, user) => { const now = new Date(); // Insert access token doc - await AccessTokens.save({ + await AccessTokens.insert({ id: genId(), createdAt: now, lastUsedAt: now, diff --git a/src/server/api/endpoints/notes/favorites/create.ts b/src/server/api/endpoints/notes/favorites/create.ts index 952bbfd0eb..d66ce37a46 100644 --- a/src/server/api/endpoints/notes/favorites/create.ts +++ b/src/server/api/endpoints/notes/favorites/create.ts @@ -61,7 +61,7 @@ export default define(meta, async (ps, user) => { } // Create favorite - await NoteFavorites.save({ + await NoteFavorites.insert({ id: genId(), createdAt: new Date(), noteId: note.id, diff --git a/src/server/api/endpoints/pages/like.ts b/src/server/api/endpoints/pages/like.ts index 5c7e13f1c8..3fc2b6ca23 100644 --- a/src/server/api/endpoints/pages/like.ts +++ b/src/server/api/endpoints/pages/like.ts @@ -68,7 +68,7 @@ export default define(meta, async (ps, user) => { } // Create like - await PageLikes.save({ + await PageLikes.insert({ id: genId(), createdAt: new Date(), pageId: page.id, diff --git a/src/server/api/endpoints/promo/read.ts b/src/server/api/endpoints/promo/read.ts index 57eb0681e5..63c90e5d7f 100644 --- a/src/server/api/endpoints/promo/read.ts +++ b/src/server/api/endpoints/promo/read.ts @@ -46,7 +46,7 @@ export default define(meta, async (ps, user) => { return; } - await PromoReads.save({ + await PromoReads.insert({ id: genId(), createdAt: new Date(), noteId: note.id, diff --git a/src/server/api/endpoints/sw/register.ts b/src/server/api/endpoints/sw/register.ts index ceb70a9274..9fc70b5609 100644 --- a/src/server/api/endpoints/sw/register.ts +++ b/src/server/api/endpoints/sw/register.ts @@ -58,7 +58,7 @@ export default define(meta, async (ps, user) => { }; } - await SwSubscriptions.save({ + await SwSubscriptions.insert({ id: genId(), createdAt: new Date(), userId: user.id, diff --git a/src/server/api/endpoints/users/groups/create.ts b/src/server/api/endpoints/users/groups/create.ts index ca011d5cd6..78d2714874 100644 --- a/src/server/api/endpoints/users/groups/create.ts +++ b/src/server/api/endpoints/users/groups/create.ts @@ -39,7 +39,7 @@ export default define(meta, async (ps, user) => { } as UserGroup); // Push the owner - await UserGroupJoinings.save({ + await UserGroupJoinings.insert({ id: genId(), createdAt: new Date(), userId: user.id, diff --git a/src/server/api/endpoints/users/groups/invitations/accept.ts b/src/server/api/endpoints/users/groups/invitations/accept.ts index e86709f83b..2fa22bcf7e 100644 --- a/src/server/api/endpoints/users/groups/invitations/accept.ts +++ b/src/server/api/endpoints/users/groups/invitations/accept.ts @@ -52,7 +52,7 @@ export default define(meta, async (ps, user) => { } // Push the user - await UserGroupJoinings.save({ + await UserGroupJoinings.insert({ id: genId(), createdAt: new Date(), userId: user.id, diff --git a/src/server/api/private/signin.ts b/src/server/api/private/signin.ts index 7a5efc6cc9..d8f2e6d516 100644 --- a/src/server/api/private/signin.ts +++ b/src/server/api/private/signin.ts @@ -53,7 +53,7 @@ export default async (ctx: Koa.Context) => { async function fail(status?: number, failure?: { error: string }) { // Append signin history - await Signins.save({ + await Signins.insert({ id: genId(), createdAt: new Date(), userId: user.id, @@ -198,7 +198,7 @@ export default async (ctx: Koa.Context) => { const challengeId = genId(); - await AttestationChallenges.save({ + await AttestationChallenges.insert({ userId: user.id, id: challengeId, challenge: hash(Buffer.from(challenge, 'utf-8')).toString('hex'), diff --git a/src/services/add-note-to-antenna.ts b/src/services/add-note-to-antenna.ts index 2c893488c3..3ba3d1eef5 100644 --- a/src/services/add-note-to-antenna.ts +++ b/src/services/add-note-to-antenna.ts @@ -10,7 +10,7 @@ export async function addNoteToAntenna(antenna: Antenna, note: Note, noteUser: U // 通知しない設定になっているか、自分自身の投稿なら既読にする const read = !antenna.notify || (antenna.userId === noteUser.id); - AntennaNotes.save({ + AntennaNotes.insert({ id: genId(), antennaId: antenna.id, noteId: note.id, diff --git a/src/services/blocking/create.ts b/src/services/blocking/create.ts index 4f0238db91..dec48d26de 100644 --- a/src/services/blocking/create.ts +++ b/src/services/blocking/create.ts @@ -18,7 +18,7 @@ export default async function(blocker: User, blockee: User) { unFollow(blockee, blocker) ]); - await Blockings.save({ + await Blockings.insert({ id: genId(), createdAt: new Date(), blockerId: blocker.id, diff --git a/src/services/following/create.ts b/src/services/following/create.ts index eb6699b0bf..1ce75caca0 100644 --- a/src/services/following/create.ts +++ b/src/services/following/create.ts @@ -22,7 +22,7 @@ export async function insertFollowingDoc(followee: User, follower: User) { let alreadyFollowed = false; - await Followings.save({ + await Followings.insert({ id: genId(), createdAt: new Date(), followerId: follower.id, diff --git a/src/services/i/pin.ts b/src/services/i/pin.ts index 1ff5476b40..f727a10fb6 100644 --- a/src/services/i/pin.ts +++ b/src/services/i/pin.ts @@ -37,7 +37,7 @@ export async function addPinned(user: User, noteId: Note['id']) { throw new IdentifiableError('23f0cf4e-59a3-4276-a91d-61a5891c1514', 'That note has already been pinned.'); } - await UserNotePinings.save({ + await UserNotePinings.insert({ id: genId(), createdAt: new Date(), userId: user.id, diff --git a/src/services/insert-moderation-log.ts b/src/services/insert-moderation-log.ts index 33dab97259..87587d3bed 100644 --- a/src/services/insert-moderation-log.ts +++ b/src/services/insert-moderation-log.ts @@ -3,7 +3,7 @@ import { ModerationLogs } from '../models'; import { genId } from '../misc/gen-id'; export async function insertModerationLog(moderator: ILocalUser, type: string, info?: Record) { - await ModerationLogs.save({ + await ModerationLogs.insert({ id: genId(), createdAt: new Date(), userId: moderator.id, diff --git a/src/services/messages/create.ts b/src/services/messages/create.ts index 8646ce37fc..413266d029 100644 --- a/src/services/messages/create.ts +++ b/src/services/messages/create.ts @@ -14,7 +14,7 @@ import { renderActivity } from '../../remote/activitypub/renderer'; import { deliver } from '../../queue'; export async function createMessage(user: User, recipientUser: User | undefined, recipientGroup: UserGroup | undefined, text: string | undefined, file: DriveFile | null, uri?: string) { - const message = await MessagingMessages.save({ + const message = { id: genId(), createdAt: new Date(), fileId: file ? file.id : null, @@ -25,7 +25,9 @@ export async function createMessage(user: User, recipientUser: User | undefined, isRead: false, reads: [] as any[], uri - } as MessagingMessage); + } as MessagingMessage; + + await MessagingMessages.insert(message); const messageObj = await MessagingMessages.pack(message); diff --git a/src/services/note/create.ts b/src/services/note/create.ts index 563eaac758..7c7e8d9a08 100644 --- a/src/services/note/create.ts +++ b/src/services/note/create.ts @@ -247,7 +247,7 @@ export default async (user: User, data: Option, silent = false) => new Promise { if (shouldMute) { - MutedNotes.save({ + MutedNotes.insert({ id: genId(), userId: u.userId, noteId: note.id, diff --git a/src/services/note/polls/vote.ts b/src/services/note/polls/vote.ts index bfcaaa09be..b4ce03ab60 100644 --- a/src/services/note/polls/vote.ts +++ b/src/services/note/polls/vote.ts @@ -29,7 +29,7 @@ export default async function(user: User, note: Note, choice: number) { } // Create vote - await PollVotes.save({ + await PollVotes.insert({ id: genId(), createdAt: new Date(), noteId: note.id, diff --git a/src/services/note/reaction/create.ts b/src/services/note/reaction/create.ts index 6c0a852f34..897c816de8 100644 --- a/src/services/note/reaction/create.ts +++ b/src/services/note/reaction/create.ts @@ -18,17 +18,17 @@ export default async (user: User, note: Note, reaction?: string) => { // TODO: cache reaction = await toDbReaction(reaction, user.host); - let record: NoteReaction; + let record: NoteReaction = { + id: genId(), + createdAt: new Date(), + noteId: note.id, + userId: user.id, + reaction + }; // Create reaction try { - record = await NoteReactions.save({ - id: genId(), - createdAt: new Date(), - noteId: note.id, - userId: user.id, - reaction - }); + await NoteReactions.insert(record); } catch (e) { if (isDuplicateKeyValueError(e)) { record = await NoteReactions.findOneOrFail({ diff --git a/src/services/note/unread.ts b/src/services/note/unread.ts index 6fd9ee2cfe..8e6fb4abe8 100644 --- a/src/services/note/unread.ts +++ b/src/services/note/unread.ts @@ -17,7 +17,7 @@ export default async function(userId: User['id'], note: Note, params: { if (mute.map(m => m.muteeId).includes(note.userId)) return; //#endregion - const unread = await NoteUnreads.save({ + const unread = { id: genId(), noteId: note.id, userId: userId, @@ -25,7 +25,9 @@ export default async function(userId: User['id'], note: Note, params: { isMentioned: params.isMentioned, noteChannelId: note.channelId, noteUserId: note.userId, - }); + }; + + await NoteUnreads.insert(unread); // 2秒経っても既読にならなかったら「未読の投稿がありますよ」イベントを発行する setTimeout(async () => { diff --git a/src/services/note/watch.ts b/src/services/note/watch.ts index d3c9553696..966b7f0054 100644 --- a/src/services/note/watch.ts +++ b/src/services/note/watch.ts @@ -10,7 +10,7 @@ export default async (me: User['id'], note: Note) => { return; } - await NoteWatchings.save({ + await NoteWatchings.insert({ id: genId(), createdAt: new Date(), noteId: note.id, diff --git a/src/services/update-hashtag.ts b/src/services/update-hashtag.ts index 1dcb582791..3e22846731 100644 --- a/src/services/update-hashtag.ts +++ b/src/services/update-hashtag.ts @@ -86,7 +86,7 @@ export async function updateHashtag(user: User, tag: string, isUserAttached = fa } } else { if (isUserAttached) { - Hashtags.save({ + Hashtags.insert({ id: genId(), name: tag, mentionedUserIds: [], @@ -103,7 +103,7 @@ export async function updateHashtag(user: User, tag: string, isUserAttached = fa attachedRemoteUsersCount: Users.isRemoteUser(user) ? 1 : 0, } as Hashtag); } else { - Hashtags.save({ + Hashtags.insert({ id: genId(), name: tag, mentionedUserIds: [user.id], diff --git a/src/services/user-list/push.ts b/src/services/user-list/push.ts index e67be4b027..ba54c04475 100644 --- a/src/services/user-list/push.ts +++ b/src/services/user-list/push.ts @@ -8,7 +8,7 @@ import { fetchProxyAccount } from '../../misc/fetch-proxy-account'; import createFollowing from '../following/create'; export async function pushUserToUserList(target: User, list: UserList) { - await UserListJoinings.save({ + await UserListJoinings.insert({ id: genId(), createdAt: new Date(), userId: target.id, -- cgit v1.2.3-freya From a4a9b8707d2d8100e3601679d479dd3b13e73c9a Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 21:27:49 +0900 Subject: perf(server): Reduce database query --- src/services/note/reaction/create.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/services') diff --git a/src/services/note/reaction/create.ts b/src/services/note/reaction/create.ts index 897c816de8..181099cc2d 100644 --- a/src/services/note/reaction/create.ts +++ b/src/services/note/reaction/create.ts @@ -53,12 +53,11 @@ export default async (user: User, note: Note, reaction?: string) => { await Notes.createQueryBuilder().update() .set({ reactions: () => sql, + score: () => '"score" + 1' }) .where('id = :id', { id: note.id }) .execute(); - Notes.increment({ id: note.id }, 'score', 1); - perUserReactionsChart.update(user, note); // カスタム絵文字リアクションだったら絵文字情報も送る -- cgit v1.2.3-freya From fb194b855b7548672d95071adfb5f354e78d3f32 Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 22:09:32 +0900 Subject: perf(server): Reduce database query --- src/services/note/create.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'src/services') diff --git a/src/services/note/create.ts b/src/services/note/create.ts index 7c7e8d9a08..a85e72c5f9 100644 --- a/src/services/note/create.ts +++ b/src/services/note/create.ts @@ -594,10 +594,13 @@ function saveReply(reply: Note, note: Note) { } function incNotesCountOfUser(user: User) { - Users.increment({ id: user.id }, 'notesCount', 1); - Users.update({ id: user.id }, { - updatedAt: new Date() - }); + Users.createQueryBuilder().update() + .set({ + updatedAt: new Date(), + notesCount: () => '"notesCount" + 1' + }) + .where('id = :id', { id: user.id }) + .execute(); } async function extractMentionedUsers(user: User, tokens: ReturnType): Promise { -- cgit v1.2.3-freya From 82de8b7c50767e71f5414481fbb4e5ff7a449593 Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 22:15:45 +0900 Subject: perf(server): Reduce database query --- src/services/note/create.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'src/services') diff --git a/src/services/note/create.ts b/src/services/note/create.ts index a85e72c5f9..96177e9758 100644 --- a/src/services/note/create.ts +++ b/src/services/note/create.ts @@ -444,8 +444,13 @@ async function renderNoteOrRenoteActivity(data: Option, note: Note) { } function incRenoteCount(renote: Note) { - Notes.increment({ id: renote.id }, 'renoteCount', 1); - Notes.increment({ id: renote.id }, 'score', 1); + Notes.createQueryBuilder().update() + .set({ + renoteCount: () => '"renoteCount" + 1', + score: () => '"score" + 1' + }) + .where('id = :id', { id: renote.id }) + .execute(); } async function insertNote(user: User, data: Option, tags: string[], emojis: string[], mentionedUsers: User[]) { @@ -525,7 +530,7 @@ async function insertNote(user: User, data: Option, tags: string[], emojis: stri await Notes.insert(insert); } - return await Notes.findOneOrFail(insert.id); + return insert; } catch (e) { // duplicate key error if (isDuplicateKeyValueError(e)) { -- cgit v1.2.3-freya From 2f2a8e537d03cea8d43b1fa84b8f1a48934f5d63 Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 22:26:45 +0900 Subject: fix bug --- src/models/repositories/notification.ts | 2 +- src/services/create-notification.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/services') diff --git a/src/models/repositories/notification.ts b/src/models/repositories/notification.ts index 1027155873..986ddb1d42 100644 --- a/src/models/repositories/notification.ts +++ b/src/models/repositories/notification.ts @@ -16,7 +16,7 @@ export class NotificationRepository extends Repository { public async pack( src: Notification['id'] | Notification, options: { - _hintForEachNotes_: { + _hintForEachNotes_?: { emojis: Emoji[] | null; myReactions: Map; }; diff --git a/src/services/create-notification.ts b/src/services/create-notification.ts index 6cd116040a..dedb8eac8d 100644 --- a/src/services/create-notification.ts +++ b/src/services/create-notification.ts @@ -30,7 +30,7 @@ export async function createNotification( ...data } as Partial); - const packed = await Notifications.pack(notification); + const packed = await Notifications.pack(notification, {}); // Publish notification event publishMainStream(notifieeId, 'notification', packed); -- cgit v1.2.3-freya From e881e1bfb3d1bf0e1a453235b5fe104f276c1392 Mon Sep 17 00:00:00 2001 From: syuilo Date: Mon, 22 Mar 2021 10:45:07 +0900 Subject: perf(server): Reduce database query --- src/services/note/create.ts | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) (limited to 'src/services') diff --git a/src/services/note/create.ts b/src/services/note/create.ts index 96177e9758..4a737e8516 100644 --- a/src/services/note/create.ts +++ b/src/services/note/create.ts @@ -259,21 +259,21 @@ export default async (user: User, data: Option, silent = false) => new Promise { - const followings = await Followings.createQueryBuilder('following') - .andWhere(`following.followeeId = :userId`, { userId: note.userId }) - .getMany(); - - const followers = followings.map(f => f.followerId); - - for (const antenna of antennas) { - checkHitAntenna(antenna, note, user, followers).then(hit => { - if (hit) { - addNoteToAntenna(antenna, note, user); + Followings.createQueryBuilder('following') + .andWhere(`following.followeeId = :userId`, { userId: note.userId }) + .getMany() + .then(followings => { + const followers = followings.map(f => f.followerId); + Antennas.find().then(async antennas => { + for (const antenna of antennas) { + checkHitAntenna(antenna, note, user, followers).then(hit => { + if (hit) { + addNoteToAntenna(antenna, note, user); + } + }); } }); - } - }); + }); // Channel if (note.channelId) { -- cgit v1.2.3-freya