diff options
Diffstat (limited to 'src/services')
39 files changed, 617 insertions, 244 deletions
diff --git a/src/services/add-note-to-antenna.ts b/src/services/add-note-to-antenna.ts index 2c893488c3..3ba3d1eef5 100644 --- a/src/services/add-note-to-antenna.ts +++ b/src/services/add-note-to-antenna.ts @@ -10,7 +10,7 @@ export async function addNoteToAntenna(antenna: Antenna, note: Note, noteUser: U // 通知しない設定になっているか、自分自身の投稿なら既読にする const read = !antenna.notify || (antenna.userId === noteUser.id); - AntennaNotes.save({ + AntennaNotes.insert({ id: genId(), antennaId: antenna.id, noteId: note.id, diff --git a/src/services/blocking/create.ts b/src/services/blocking/create.ts index def4f33585..dec48d26de 100644 --- a/src/services/blocking/create.ts +++ b/src/services/blocking/create.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from '../stream'; +import { publishMainStream, publishUserEvent } from '../stream'; import { renderActivity } from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderUndo from '../../remote/activitypub/renderer/undo'; @@ -18,7 +18,7 @@ export default async function(blocker: User, blockee: User) { unFollow(blockee, blocker) ]); - await Blockings.save({ + await Blockings.insert({ id: genId(), createdAt: new Date(), blockerId: blocker.id, @@ -55,7 +55,10 @@ async function cancelRequest(follower: User, followee: User) { if (Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } // リモートにフォローリクエストをしていたらUndoFollow送信 @@ -97,7 +100,10 @@ async function unFollow(follower: User, followee: User) { if (Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } // リモートにフォローをしていたらUndoFollow送信 diff --git a/src/services/chart/charts/classes/active-users.ts b/src/services/chart/charts/classes/active-users.ts index 5128150de6..4820f8281b 100644 --- a/src/services/chart/charts/classes/active-users.ts +++ b/src/services/chart/charts/classes/active-users.ts @@ -18,6 +18,18 @@ export default class ActiveUsersChart extends Chart<ActiveUsersLog> { } @autobind + protected aggregate(logs: ActiveUsersLog[]): ActiveUsersLog { + return { + local: { + users: logs.reduce((a, b) => a.concat(b.local.users), [] as ActiveUsersLog['local']['users']), + }, + remote: { + users: logs.reduce((a, b) => a.concat(b.remote.users), [] as ActiveUsersLog['remote']['users']), + }, + }; + } + + @autobind protected async fetchActual(): Promise<DeepPartial<ActiveUsersLog>> { return {}; } @@ -25,11 +37,11 @@ export default class ActiveUsersChart extends Chart<ActiveUsersLog> { @autobind public async update(user: User) { const update: Obj = { - count: 1 + users: [user.id] }; - await this.incIfUnique({ + await this.inc({ [Users.isLocalUser(user) ? 'local' : 'remote']: update - }, 'users', user.id); + }); } } diff --git a/src/services/chart/charts/classes/drive.ts b/src/services/chart/charts/classes/drive.ts index 57bb120beb..46399a34d8 100644 --- a/src/services/chart/charts/classes/drive.ts +++ b/src/services/chart/charts/classes/drive.ts @@ -28,6 +28,28 @@ export default class DriveChart extends Chart<DriveLog> { } @autobind + protected aggregate(logs: DriveLog[]): DriveLog { + return { + local: { + totalCount: logs[0].local.totalCount, + totalSize: logs[0].local.totalSize, + incCount: logs.reduce((a, b) => a + b.local.incCount, 0), + incSize: logs.reduce((a, b) => a + b.local.incSize, 0), + decCount: logs.reduce((a, b) => a + b.local.decCount, 0), + decSize: logs.reduce((a, b) => a + b.local.decSize, 0), + }, + remote: { + totalCount: logs[0].remote.totalCount, + totalSize: logs[0].remote.totalSize, + incCount: logs.reduce((a, b) => a + b.remote.incCount, 0), + incSize: logs.reduce((a, b) => a + b.remote.incSize, 0), + decCount: logs.reduce((a, b) => a + b.remote.decCount, 0), + decSize: logs.reduce((a, b) => a + b.remote.decSize, 0), + }, + }; + } + + @autobind protected async fetchActual(): Promise<DeepPartial<DriveLog>> { const [localCount, remoteCount, localSize, remoteSize] = await Promise.all([ DriveFiles.count({ userHost: null }), diff --git a/src/services/chart/charts/classes/federation.ts b/src/services/chart/charts/classes/federation.ts index bd2c497e7b..ab6ec2d4dd 100644 --- a/src/services/chart/charts/classes/federation.ts +++ b/src/services/chart/charts/classes/federation.ts @@ -21,6 +21,17 @@ export default class FederationChart extends Chart<FederationLog> { } @autobind + protected aggregate(logs: FederationLog[]): FederationLog { + return { + instance: { + total: logs[0].instance.total, + inc: logs.reduce((a, b) => a + b.instance.inc, 0), + dec: logs.reduce((a, b) => a + b.instance.dec, 0), + }, + }; + } + + @autobind protected async fetchActual(): Promise<DeepPartial<FederationLog>> { const [total] = await Promise.all([ Instances.count({}) diff --git a/src/services/chart/charts/classes/hashtag.ts b/src/services/chart/charts/classes/hashtag.ts index 38c3a94f0c..43db5b0a83 100644 --- a/src/services/chart/charts/classes/hashtag.ts +++ b/src/services/chart/charts/classes/hashtag.ts @@ -18,6 +18,18 @@ export default class HashtagChart extends Chart<HashtagLog> { } @autobind + protected aggregate(logs: HashtagLog[]): HashtagLog { + return { + local: { + users: logs.reduce((a, b) => a.concat(b.local.users), [] as HashtagLog['local']['users']), + }, + remote: { + users: logs.reduce((a, b) => a.concat(b.remote.users), [] as HashtagLog['remote']['users']), + }, + }; + } + + @autobind protected async fetchActual(): Promise<DeepPartial<HashtagLog>> { return {}; } @@ -25,11 +37,11 @@ export default class HashtagChart extends Chart<HashtagLog> { @autobind public async update(hashtag: string, user: User) { const update: Obj = { - count: 1 + users: [user.id] }; - await this.incIfUnique({ + await this.inc({ [Users.isLocalUser(user) ? 'local' : 'remote']: update - }, 'users', user.id, hashtag); + }, hashtag); } } diff --git a/src/services/chart/charts/classes/instance.ts b/src/services/chart/charts/classes/instance.ts index 7575abfb6f..c32b864d87 100644 --- a/src/services/chart/charts/classes/instance.ts +++ b/src/services/chart/charts/classes/instance.ts @@ -37,6 +37,50 @@ export default class InstanceChart extends Chart<InstanceLog> { } @autobind + protected aggregate(logs: InstanceLog[]): InstanceLog { + return { + requests: { + failed: logs.reduce((a, b) => a + b.requests.failed, 0), + succeeded: logs.reduce((a, b) => a + b.requests.succeeded, 0), + received: logs.reduce((a, b) => a + b.requests.received, 0), + }, + notes: { + total: logs[0].notes.total, + inc: logs.reduce((a, b) => a + b.notes.inc, 0), + dec: logs.reduce((a, b) => a + b.notes.dec, 0), + diffs: { + reply: logs.reduce((a, b) => a + b.notes.diffs.reply, 0), + renote: logs.reduce((a, b) => a + b.notes.diffs.renote, 0), + normal: logs.reduce((a, b) => a + b.notes.diffs.normal, 0), + }, + }, + users: { + total: logs[0].users.total, + inc: logs.reduce((a, b) => a + b.users.inc, 0), + dec: logs.reduce((a, b) => a + b.users.dec, 0), + }, + following: { + total: logs[0].following.total, + inc: logs.reduce((a, b) => a + b.following.inc, 0), + dec: logs.reduce((a, b) => a + b.following.dec, 0), + }, + followers: { + total: logs[0].followers.total, + inc: logs.reduce((a, b) => a + b.followers.inc, 0), + dec: logs.reduce((a, b) => a + b.followers.dec, 0), + }, + drive: { + totalFiles: logs[0].drive.totalFiles, + totalUsage: logs[0].drive.totalUsage, + incFiles: logs.reduce((a, b) => a + b.drive.incFiles, 0), + incUsage: logs.reduce((a, b) => a + b.drive.incUsage, 0), + decFiles: logs.reduce((a, b) => a + b.drive.decFiles, 0), + decUsage: logs.reduce((a, b) => a + b.drive.decUsage, 0), + }, + }; + } + + @autobind protected async fetchActual(group: string): Promise<DeepPartial<InstanceLog>> { const [ notesCount, diff --git a/src/services/chart/charts/classes/network.ts b/src/services/chart/charts/classes/network.ts index 8b26e5c4c2..693af48f73 100644 --- a/src/services/chart/charts/classes/network.ts +++ b/src/services/chart/charts/classes/network.ts @@ -16,6 +16,17 @@ export default class NetworkChart extends Chart<NetworkLog> { } @autobind + protected aggregate(logs: NetworkLog[]): NetworkLog { + return { + incomingRequests: logs.reduce((a, b) => a + b.incomingRequests, 0), + outgoingRequests: logs.reduce((a, b) => a + b.outgoingRequests, 0), + totalTime: logs.reduce((a, b) => a + b.totalTime, 0), + incomingBytes: logs.reduce((a, b) => a + b.incomingBytes, 0), + outgoingBytes: logs.reduce((a, b) => a + b.outgoingBytes, 0), + }; + } + + @autobind protected async fetchActual(): Promise<DeepPartial<NetworkLog>> { return {}; } diff --git a/src/services/chart/charts/classes/notes.ts b/src/services/chart/charts/classes/notes.ts index 815061c445..965087bc08 100644 --- a/src/services/chart/charts/classes/notes.ts +++ b/src/services/chart/charts/classes/notes.ts @@ -26,6 +26,32 @@ export default class NotesChart extends Chart<NotesLog> { } @autobind + protected aggregate(logs: NotesLog[]): NotesLog { + return { + local: { + total: logs[0].local.total, + inc: logs.reduce((a, b) => a + b.local.inc, 0), + dec: logs.reduce((a, b) => a + b.local.dec, 0), + diffs: { + reply: logs.reduce((a, b) => a + b.local.diffs.reply, 0), + renote: logs.reduce((a, b) => a + b.local.diffs.renote, 0), + normal: logs.reduce((a, b) => a + b.local.diffs.normal, 0), + }, + }, + remote: { + total: logs[0].remote.total, + inc: logs.reduce((a, b) => a + b.remote.inc, 0), + dec: logs.reduce((a, b) => a + b.remote.dec, 0), + diffs: { + reply: logs.reduce((a, b) => a + b.remote.diffs.reply, 0), + renote: logs.reduce((a, b) => a + b.remote.diffs.renote, 0), + normal: logs.reduce((a, b) => a + b.remote.diffs.normal, 0), + }, + }, + }; + } + + @autobind protected async fetchActual(): Promise<DeepPartial<NotesLog>> { const [localCount, remoteCount] = await Promise.all([ Notes.count({ userHost: null }), diff --git a/src/services/chart/charts/classes/per-user-drive.ts b/src/services/chart/charts/classes/per-user-drive.ts index aed9f6fce7..e778f7bf61 100644 --- a/src/services/chart/charts/classes/per-user-drive.ts +++ b/src/services/chart/charts/classes/per-user-drive.ts @@ -21,6 +21,18 @@ export default class PerUserDriveChart extends Chart<PerUserDriveLog> { } @autobind + protected aggregate(logs: PerUserDriveLog[]): PerUserDriveLog { + return { + totalCount: logs[0].totalCount, + totalSize: logs[0].totalSize, + incCount: logs.reduce((a, b) => a + b.incCount, 0), + incSize: logs.reduce((a, b) => a + b.incSize, 0), + decCount: logs.reduce((a, b) => a + b.decCount, 0), + decSize: logs.reduce((a, b) => a + b.decSize, 0), + }; + } + + @autobind protected async fetchActual(group: string): Promise<DeepPartial<PerUserDriveLog>> { const [count, size] = await Promise.all([ DriveFiles.count({ userId: group }), diff --git a/src/services/chart/charts/classes/per-user-following.ts b/src/services/chart/charts/classes/per-user-following.ts index 8295c0cb0d..8b536009c8 100644 --- a/src/services/chart/charts/classes/per-user-following.ts +++ b/src/services/chart/charts/classes/per-user-following.ts @@ -36,6 +36,36 @@ export default class PerUserFollowingChart extends Chart<PerUserFollowingLog> { } @autobind + protected aggregate(logs: PerUserFollowingLog[]): PerUserFollowingLog { + return { + local: { + followings: { + total: logs[0].local.followings.total, + inc: logs.reduce((a, b) => a + b.local.followings.inc, 0), + dec: logs.reduce((a, b) => a + b.local.followings.dec, 0), + }, + followers: { + total: logs[0].local.followers.total, + inc: logs.reduce((a, b) => a + b.local.followers.inc, 0), + dec: logs.reduce((a, b) => a + b.local.followers.dec, 0), + }, + }, + remote: { + followings: { + total: logs[0].remote.followings.total, + inc: logs.reduce((a, b) => a + b.remote.followings.inc, 0), + dec: logs.reduce((a, b) => a + b.remote.followings.dec, 0), + }, + followers: { + total: logs[0].remote.followers.total, + inc: logs.reduce((a, b) => a + b.remote.followers.inc, 0), + dec: logs.reduce((a, b) => a + b.remote.followers.dec, 0), + }, + }, + }; + } + + @autobind protected async fetchActual(group: string): Promise<DeepPartial<PerUserFollowingLog>> { const [ localFollowingsCount, diff --git a/src/services/chart/charts/classes/per-user-notes.ts b/src/services/chart/charts/classes/per-user-notes.ts index cccd495604..8d1fb8d2b0 100644 --- a/src/services/chart/charts/classes/per-user-notes.ts +++ b/src/services/chart/charts/classes/per-user-notes.ts @@ -21,6 +21,20 @@ export default class PerUserNotesChart extends Chart<PerUserNotesLog> { } @autobind + protected aggregate(logs: PerUserNotesLog[]): PerUserNotesLog { + return { + total: logs[0].total, + inc: logs.reduce((a, b) => a + b.inc, 0), + dec: logs.reduce((a, b) => a + b.dec, 0), + diffs: { + reply: logs.reduce((a, b) => a + b.diffs.reply, 0), + renote: logs.reduce((a, b) => a + b.diffs.renote, 0), + normal: logs.reduce((a, b) => a + b.diffs.normal, 0), + }, + }; + } + + @autobind protected async fetchActual(group: string): Promise<DeepPartial<PerUserNotesLog>> { const [count] = await Promise.all([ Notes.count({ userId: group }), diff --git a/src/services/chart/charts/classes/per-user-reactions.ts b/src/services/chart/charts/classes/per-user-reactions.ts index 124fb4153c..b4cdced40c 100644 --- a/src/services/chart/charts/classes/per-user-reactions.ts +++ b/src/services/chart/charts/classes/per-user-reactions.ts @@ -19,6 +19,18 @@ export default class PerUserReactionsChart extends Chart<PerUserReactionsLog> { } @autobind + protected aggregate(logs: PerUserReactionsLog[]): PerUserReactionsLog { + return { + local: { + count: logs.reduce((a, b) => a + b.local.count, 0), + }, + remote: { + count: logs.reduce((a, b) => a + b.remote.count, 0), + }, + }; + } + + @autobind protected async fetchActual(group: string): Promise<DeepPartial<PerUserReactionsLog>> { return {}; } diff --git a/src/services/chart/charts/classes/test-grouped.ts b/src/services/chart/charts/classes/test-grouped.ts index e32cbcf416..92c8df636e 100644 --- a/src/services/chart/charts/classes/test-grouped.ts +++ b/src/services/chart/charts/classes/test-grouped.ts @@ -22,6 +22,17 @@ export default class TestGroupedChart extends Chart<TestGroupedLog> { } @autobind + protected aggregate(logs: TestGroupedLog[]): TestGroupedLog { + return { + foo: { + total: logs[0].foo.total, + inc: logs.reduce((a, b) => a + b.foo.inc, 0), + dec: logs.reduce((a, b) => a + b.foo.dec, 0), + }, + }; + } + + @autobind protected async fetchActual(group: string): Promise<DeepPartial<TestGroupedLog>> { return { foo: { diff --git a/src/services/chart/charts/classes/test-unique.ts b/src/services/chart/charts/classes/test-unique.ts index 1eb396c293..5680d713ec 100644 --- a/src/services/chart/charts/classes/test-unique.ts +++ b/src/services/chart/charts/classes/test-unique.ts @@ -16,14 +16,21 @@ export default class TestUniqueChart extends Chart<TestUniqueLog> { } @autobind + protected aggregate(logs: TestUniqueLog[]): TestUniqueLog { + return { + foo: logs.reduce((a, b) => a.concat(b.foo), [] as TestUniqueLog['foo']), + }; + } + + @autobind protected async fetchActual(): Promise<DeepPartial<TestUniqueLog>> { return {}; } @autobind public async uniqueIncrement(key: string) { - await this.incIfUnique({ - foo: 1 - }, 'foos', key); + await this.inc({ + foo: [key] + }); } } diff --git a/src/services/chart/charts/classes/test.ts b/src/services/chart/charts/classes/test.ts index ea64040f3e..d37d298de7 100644 --- a/src/services/chart/charts/classes/test.ts +++ b/src/services/chart/charts/classes/test.ts @@ -22,6 +22,17 @@ export default class TestChart extends Chart<TestLog> { } @autobind + protected aggregate(logs: TestLog[]): TestLog { + return { + foo: { + total: logs[0].foo.total, + inc: logs.reduce((a, b) => a + b.foo.inc, 0), + dec: logs.reduce((a, b) => a + b.foo.dec, 0), + }, + }; + } + + @autobind protected async fetchActual(): Promise<DeepPartial<TestLog>> { return { foo: { diff --git a/src/services/chart/charts/classes/users.ts b/src/services/chart/charts/classes/users.ts index 47e4caa1b7..87b19d88f9 100644 --- a/src/services/chart/charts/classes/users.ts +++ b/src/services/chart/charts/classes/users.ts @@ -26,6 +26,22 @@ export default class UsersChart extends Chart<UsersLog> { } @autobind + protected aggregate(logs: UsersLog[]): UsersLog { + return { + local: { + total: logs[0].local.total, + inc: logs.reduce((a, b) => a + b.local.inc, 0), + dec: logs.reduce((a, b) => a + b.local.dec, 0), + }, + remote: { + total: logs[0].remote.total, + inc: logs.reduce((a, b) => a + b.remote.inc, 0), + dec: logs.reduce((a, b) => a + b.remote.dec, 0), + }, + }; + } + + @autobind protected async fetchActual(): Promise<DeepPartial<UsersLog>> { const [localCount, remoteCount] = await Promise.all([ Users.count({ host: null }), diff --git a/src/services/chart/charts/schemas/active-users.ts b/src/services/chart/charts/schemas/active-users.ts index 6e26bb4698..cdf0579efb 100644 --- a/src/services/chart/charts/schemas/active-users.ts +++ b/src/services/chart/charts/schemas/active-users.ts @@ -1,11 +1,15 @@ export const logSchema = { /** - * アクティブユーザー数 + * アクティブユーザー */ - count: { - type: 'number' as const, + users: { + type: 'array' as const, optional: false as const, nullable: false as const, - description: 'アクティブユーザー数', + description: 'アクティブユーザー', + items: { + type: 'string' as const, + optional: false as const, nullable: false as const, + } }, }; diff --git a/src/services/chart/charts/schemas/hashtag.ts b/src/services/chart/charts/schemas/hashtag.ts index 4dfd61c97f..791d0d1721 100644 --- a/src/services/chart/charts/schemas/hashtag.ts +++ b/src/services/chart/charts/schemas/hashtag.ts @@ -1,11 +1,15 @@ export const logSchema = { /** - * 投稿された数 + * 投稿したユーザー */ - count: { - type: 'number' as const, + users: { + type: 'array' as const, optional: false as const, nullable: false as const, - description: '投稿された数', + description: '投稿したユーザー', + items: { + type: 'string' as const, + optional: false as const, nullable: false as const, + } }, }; diff --git a/src/services/chart/charts/schemas/test-unique.ts b/src/services/chart/charts/schemas/test-unique.ts index 075a8092d9..51280400ac 100644 --- a/src/services/chart/charts/schemas/test-unique.ts +++ b/src/services/chart/charts/schemas/test-unique.ts @@ -3,9 +3,12 @@ export const schema = { optional: false as const, nullable: false as const, properties: { foo: { - type: 'number' as const, + type: 'array' as const, optional: false as const, nullable: false as const, - description: '' + items: { + type: 'string' as const, + optional: false as const, nullable: false as const, + } }, } }; diff --git a/src/services/chart/core.ts b/src/services/chart/core.ts index dc09923ae4..10621be073 100644 --- a/src/services/chart/core.ts +++ b/src/services/chart/core.ts @@ -24,8 +24,6 @@ type ArrayValue<T> = { [P in keyof T]: T[P] extends number ? T[P][] : ArrayValue<T[P]>; }; -type Span = 'day' | 'hour'; - type Log = { id: number; @@ -38,22 +36,14 @@ type Log = { * 集計日時のUnixタイムスタンプ(秒) */ date: number; - - /** - * 集計期間 - */ - span: Span; - - /** - * ユニークインクリメント用 - */ - unique?: Record<string, any>; }; const camelToSnake = (str: string) => { return str.replace(/([A-Z])/g, s => '_' + s.charAt(0).toLowerCase()); }; +const removeDuplicates = (array: any[]) => Array.from(new Set(array)); + /** * 様々なチャートの管理を司るクラス */ @@ -62,10 +52,21 @@ export default abstract class Chart<T extends Record<string, any>> { private static readonly columnDot = '_'; private name: string; + private queue: { + diff: DeepPartial<T>; + group: string | null; + }[] = []; public schema: Schema; protected repository: Repository<Log>; + protected abstract genNewLog(latest: T): DeepPartial<T>; - protected abstract async fetchActual(group: string | null): Promise<DeepPartial<T>>; + + /** + * @param logs 日時が新しい方が先頭 + */ + protected abstract aggregate(logs: T[]): T; + + protected abstract fetchActual(group: string | null): Promise<DeepPartial<T>>; @autobind private static convertSchemaToFlatColumnDefinitions(schema: Schema) { @@ -75,10 +76,15 @@ export default abstract class Chart<T extends Record<string, any>> { const p = path ? `${path}${this.columnDot}${k}` : k; if (v.type === 'object') { flatColumns(v.properties, p); - } else { + } else if (v.type === 'number') { columns[this.columnPrefix + p] = { type: 'bigint', }; + } else if (v.type === 'array' && v.items.type === 'string') { + columns[this.columnPrefix + p] = { + type: 'varchar', + array: true, + }; } } }; @@ -99,11 +105,11 @@ export default abstract class Chart<T extends Record<string, any>> { @autobind private static convertObjectToFlattenColumns(x: Record<string, any>) { - const columns = {} as Record<string, number>; + const columns = {} as Record<string, number | unknown[]>; const flatten = (x: Obj, path?: string) => { for (const [k, v] of Object.entries(x)) { const p = path ? `${path}${this.columnDot}${k}` : k; - if (typeof v === 'object') { + if (typeof v === 'object' && !Array.isArray(v)) { flatten(v, p); } else { columns[this.columnPrefix + p] = v; @@ -115,14 +121,37 @@ export default abstract class Chart<T extends Record<string, any>> { } @autobind - private static convertQuery(x: Record<string, any>) { - const query: Record<string, Function> = {}; + private static countUniqueFields(x: Record<string, any>) { + const exec = (x: Obj) => { + const res = {} as Record<string, any>; + for (const [k, v] of Object.entries(x)) { + if (typeof v === 'object' && !Array.isArray(v)) { + res[k] = exec(v); + } else if (Array.isArray(v)) { + res[k] = Array.from(new Set(v)).length; + } else { + res[k] = v; + } + } + return res; + }; + return exec(x); + } - const columns = Chart.convertObjectToFlattenColumns(x); + @autobind + private static convertQuery(diff: Record<string, number | unknown[]>) { + const query: Record<string, Function> = {}; - for (const [k, v] of Object.entries(columns)) { - if (v > 0) query[k] = () => `"${k}" + ${v}`; - if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`; + for (const [k, v] of Object.entries(diff)) { + if (typeof v === 'number') { + if (v > 0) query[k] = () => `"${k}" + ${v}`; + if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`; + } else if (Array.isArray(v)) { + // TODO: item が文字列以外の場合も対応 + // TODO: item をSQLエスケープ + const items = v.map(item => `"${item}"`).join(','); + query[k] = () => `array_cat("${k}", '{${items}}'::varchar[])`; + } } return query; @@ -169,28 +198,14 @@ export default abstract class Chart<T extends Record<string, any>> { length: 128, nullable: true }, - span: { - type: 'enum', - enum: ['hour', 'day'] - }, - unique: { - type: 'jsonb', - default: {} - }, ...Chart.convertSchemaToFlatColumnDefinitions(schema) }, indices: [{ columns: ['date'] }, { - columns: ['span'] - }, { columns: ['group'] }, { - columns: ['span', 'date'] - }, { columns: ['date', 'group'] - }, { - columns: ['span', 'date', 'group'] }] }); } @@ -200,7 +215,7 @@ export default abstract class Chart<T extends Record<string, any>> { this.schema = schema; const entity = Chart.schemaToEntity(name, schema); - const keys = ['span', 'date']; + const keys = ['date']; if (grouped) keys.push('group'); entity.options.uniques = [{ @@ -220,7 +235,8 @@ export default abstract class Chart<T extends Record<string, any>> { flatColumns(v.properties, p); } else { if (nestedProperty.get(log, p) == null) { - nestedProperty.set(log, p, 0); + const emptyValue = v.type === 'number' ? 0 : []; + nestedProperty.set(log, p, emptyValue); } } } @@ -230,10 +246,9 @@ export default abstract class Chart<T extends Record<string, any>> { } @autobind - private getLatestLog(span: Span, group: string | null = null): Promise<Log | null> { + private getLatestLog(group: string | null = null): Promise<Log | null> { return this.repository.findOne({ group: group, - span: span }, { order: { date: -1 @@ -242,17 +257,13 @@ export default abstract class Chart<T extends Record<string, any>> { } @autobind - private async getCurrentLog(span: Span, group: string | null = null): Promise<Log> { + private async getCurrentLog(group: string | null = null): Promise<Log> { const [y, m, d, h] = Chart.getCurrentDate(); - const current = - span == 'day' ? dateUTC([y, m, d, 0]) : - span == 'hour' ? dateUTC([y, m, d, h]) : - null as never; + const current = dateUTC([y, m, d, h]); - // 現在(今日または今のHour)のログ + // 現在(=今のHour)のログ const currentLog = await this.repository.findOne({ - span: span, date: Chart.dateToTimestamp(current), ...(group ? { group: group } : {}) }); @@ -271,7 +282,7 @@ export default abstract class Chart<T extends Record<string, any>> { // * 昨日何もチャートを更新するような出来事がなかった場合は、 // * ログがそもそも作られずドキュメントが存在しないということがあり得るため、 // * 「昨日の」と決め打ちせずに「もっとも最近の」とします - const latest = await this.getLatestLog(span, group); + const latest = await this.getLatestLog(group); if (latest != null) { const obj = Chart.convertFlattenColumnsToObject( @@ -286,17 +297,16 @@ export default abstract class Chart<T extends Record<string, any>> { // 初期ログデータを作成 data = this.getNewLog(null); - logger.info(`${this.name + (group ? `:${group}` : '')} (${span}): Initial commit created`); + logger.info(`${this.name + (group ? `:${group}` : '')}: Initial commit created`); } const date = Chart.dateToTimestamp(current); - const lockKey = `${this.name}:${date}:${group}:${span}`; + const lockKey = `${this.name}:${date}:${group}`; const unlock = await getChartInsertLock(lockKey); try { // ロック内でもう1回チェックする const currentLog = await this.repository.findOne({ - span: span, date: date, ...(group ? { group: group } : {}) }); @@ -307,12 +317,11 @@ export default abstract class Chart<T extends Record<string, any>> { // 新規ログ挿入 log = await this.repository.save({ group: group, - span: span, date: date, ...Chart.convertObjectToFlattenColumns(data) }); - logger.info(`${this.name + (group ? `:${group}` : '')} (${span}): New commit created`); + logger.info(`${this.name + (group ? `:${group}` : '')}: New commit created`); return log; } finally { @@ -321,38 +330,62 @@ export default abstract class Chart<T extends Record<string, any>> { } @autobind - protected commit(query: Record<string, Function>, group: string | null = null, uniqueKey?: string, uniqueValue?: string): Promise<any> { + protected commit(diff: DeepPartial<T>, group: string | null = null): void { + this.queue.push({ + diff, group, + }); + } + + @autobind + public async save() { + if (this.queue.length === 0) { + logger.info(`${this.name}: Write skipped`); + return; + } + + // TODO: 前の時間のログがqueueにあった場合のハンドリング + // 例えば、save が20分ごとに行われるとして、前回行われたのは 01:50 だったとする。 + // 次に save が行われるのは 02:10 ということになるが、もし 01:55 に新規ログが queue に追加されたとすると、 + // そのログは本来は 01:00~ のログとしてDBに保存されて欲しいのに、02:00~ のログ扱いになってしまう。 + // これを回避するための実装は複雑になりそうなため、一旦保留。 + const update = async (log: Log) => { - // ユニークインクリメントの場合、指定のキーに指定の値が既に存在していたら弾く - if ( - uniqueKey && log.unique && - log.unique[uniqueKey] && - log.unique[uniqueKey].includes(uniqueValue) - ) return; + const finalDiffs = {} as Record<string, number | unknown[]>; - // ユニークインクリメントの指定のキーに値を追加 - if (uniqueKey && log.unique) { - if (log.unique[uniqueKey]) { - const sql = `jsonb_set("unique", '{${uniqueKey}}', ("unique"->>'${uniqueKey}')::jsonb || '["${uniqueValue}"]'::jsonb)`; - query['unique'] = () => sql; - } else { - const sql = `jsonb_set("unique", '{${uniqueKey}}', '["${uniqueValue}"]')`; - query['unique'] = () => sql; + for (const diff of this.queue.filter(q => q.group === log.group).map(q => q.diff)) { + const columns = Chart.convertObjectToFlattenColumns(diff); + + for (const [k, v] of Object.entries(columns)) { + if (finalDiffs[k] == null) { + finalDiffs[k] = v; + } else { + if (typeof finalDiffs[k] === 'number') { + (finalDiffs[k] as number) += v as number; + } else { + (finalDiffs[k] as unknown[]) = (finalDiffs[k] as unknown[]).concat(v); + } + } } } + const query = Chart.convertQuery(finalDiffs); + // ログ更新 await this.repository.createQueryBuilder() .update() .set(query) .where('id = :id', { id: log.id }) .execute(); + + logger.info(`${this.name + (log.group ? `:${log.group}` : '')}: Updated`); + + // TODO: この一連の処理が始まった後に新たにqueueに入ったものは消さないようにする + this.queue = this.queue.filter(q => q.group !== log.group); }; - return Promise.all([ - this.getCurrentLog('day', group).then(log => update(log)), - this.getCurrentLog('hour', group).then(log => update(log)), - ]); + const groups = removeDuplicates(this.queue.map(log => log.group)); + + await Promise.all(groups.map(group => this.getCurrentLog(group).then(log => update(log)))); } @autobind @@ -367,39 +400,30 @@ export default abstract class Chart<T extends Record<string, any>> { .execute(); }; - return Promise.all([ - this.getCurrentLog('day', group).then(log => update(log)), - this.getCurrentLog('hour', group).then(log => update(log)), - ]); + return this.getCurrentLog(group).then(log => update(log)); } @autobind protected async inc(inc: DeepPartial<T>, group: string | null = null): Promise<void> { - await this.commit(Chart.convertQuery(inc as any), group); - } - - @autobind - protected async incIfUnique(inc: DeepPartial<T>, key: string, value: string, group: string | null = null): Promise<void> { - await this.commit(Chart.convertQuery(inc as any), group, key, value); + await this.commit(inc, group); } @autobind - public async getChart(span: Span, amount: number, begin: Date | null, group: string | null = null): Promise<ArrayValue<T>> { - const [y, m, d, h, _m, _s, _ms] = begin ? Chart.parseDate(subtractTime(addTime(begin, 1, span), 1)) : Chart.getCurrentDate(); - const [y2, m2, d2, h2] = begin ? Chart.parseDate(addTime(begin, 1, span)) : [] as never; + public async getChart(span: 'hour' | 'day', amount: number, cursor: Date | null, group: string | null = null): Promise<ArrayValue<T>> { + const [y, m, d, h, _m, _s, _ms] = cursor ? Chart.parseDate(subtractTime(addTime(cursor, 1, span), 1)) : Chart.getCurrentDate(); + const [y2, m2, d2, h2] = cursor ? Chart.parseDate(addTime(cursor, 1, span)) : [] as never; const lt = dateUTC([y, m, d, h, _m, _s, _ms]); const gt = - span === 'day' ? subtractTime(begin ? dateUTC([y2, m2, d2, 0]) : dateUTC([y, m, d, 0]), amount - 1, 'day') : - span === 'hour' ? subtractTime(begin ? dateUTC([y2, m2, d2, h2]) : dateUTC([y, m, d, h]), amount - 1, 'hour') : + span === 'day' ? subtractTime(cursor ? dateUTC([y2, m2, d2, 0]) : dateUTC([y, m, d, 0]), amount - 1, 'day') : + span === 'hour' ? subtractTime(cursor ? dateUTC([y2, m2, d2, h2]) : dateUTC([y, m, d, h]), amount - 1, 'hour') : null as never; // ログ取得 let logs = await this.repository.find({ where: { group: group, - span: span, date: Between(Chart.dateToTimestamp(gt), Chart.dateToTimestamp(lt)) }, order: { @@ -413,7 +437,6 @@ export default abstract class Chart<T extends Record<string, any>> { // (すくなくともひとつログが無いと隙間埋めできないため) const recentLog = await this.repository.findOne({ group: group, - span: span }, { order: { date: -1 @@ -430,7 +453,6 @@ export default abstract class Chart<T extends Record<string, any>> { // (隙間埋めできないため) const outdatedLog = await this.repository.findOne({ group: group, - span: span, date: LessThan(Chart.dateToTimestamp(gt)) }, { order: { @@ -445,23 +467,56 @@ export default abstract class Chart<T extends Record<string, any>> { const chart: T[] = []; - // 整形 - for (let i = (amount - 1); i >= 0; i--) { - const current = - span === 'day' ? subtractTime(dateUTC([y, m, d, 0]), i, 'day') : - span === 'hour' ? subtractTime(dateUTC([y, m, d, h]), i, 'hour') : - null as never; + if (span === 'hour') { + for (let i = (amount - 1); i >= 0; i--) { + const current = subtractTime(dateUTC([y, m, d, h]), i, 'hour'); - const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current)); + const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current)); + + if (log) { + const data = Chart.convertFlattenColumnsToObject(log as Record<string, any>); + chart.unshift(Chart.countUniqueFields(data)); + } else { + // 隙間埋め + const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current)); + const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record<string, any>) : null; + chart.unshift(Chart.countUniqueFields(this.getNewLog(data))); + } + } + } else if (span === 'day') { + const logsForEachDays: T[][] = []; + let currentDay = -1; + let currentDayIndex = -1; + for (let i = ((amount - 1) * 24) + h; i >= 0; i--) { + const current = subtractTime(dateUTC([y, m, d, h]), i, 'hour'); + const _currentDay = Chart.parseDate(current)[2]; + if (currentDay != _currentDay) currentDayIndex++; + currentDay = _currentDay; + + const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current)); + + if (log) { + if (logsForEachDays[currentDayIndex]) { + logsForEachDays[currentDayIndex].unshift(Chart.convertFlattenColumnsToObject(log)); + } else { + logsForEachDays[currentDayIndex] = [Chart.convertFlattenColumnsToObject(log)]; + } + } else { + // 隙間埋め + const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current)); + const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record<string, any>) : null; + const newLog = this.getNewLog(data); + if (logsForEachDays[currentDayIndex]) { + logsForEachDays[currentDayIndex].unshift(newLog); + } else { + logsForEachDays[currentDayIndex] = [newLog]; + } + } + } - if (log) { - const data = Chart.convertFlattenColumnsToObject(log as Record<string, any>); - chart.unshift(data); - } else { - // 隙間埋め - const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current)); - const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record<string, any>) : null; - chart.unshift(this.getNewLog(data)); + for (const logs of logsForEachDays) { + const log = this.aggregate(logs); + chart.unshift(Chart.countUniqueFields(log)); } } @@ -473,20 +528,19 @@ export default abstract class Chart<T extends Record<string, any>> { * { foo: [1, 2, 3], bar: [5, 6, 7] } * にする */ - const dive = (x: Obj, path?: string) => { + const compact = (x: Obj, path?: string) => { for (const [k, v] of Object.entries(x)) { const p = path ? `${path}.${k}` : k; - if (typeof v == 'object') { - dive(v, p); + if (typeof v === 'object' && !Array.isArray(v)) { + compact(v, p); } else { - const values = chart.map(s => nestedProperty.get(s, p)) - .map(v => parseInt(v, 10)); // TypeORMのバグ(?)で何故か数値カラムの値が文字列型になっているので数値に戻す + const values = chart.map(s => nestedProperty.get(s, p)); nestedProperty.set(res, p, values); } } }; - dive(chart[0]); + compact(chart[0]); return res; } diff --git a/src/services/chart/index.ts b/src/services/chart/index.ts index 9626e3d6b3..dde02bd64d 100644 --- a/src/services/chart/index.ts +++ b/src/services/chart/index.ts @@ -10,6 +10,7 @@ import PerUserReactionsChart from './charts/classes/per-user-reactions'; import HashtagChart from './charts/classes/hashtag'; import PerUserFollowingChart from './charts/classes/per-user-following'; import PerUserDriveChart from './charts/classes/per-user-drive'; +import { beforeShutdown } from '../../misc/before-shutdown'; export const federationChart = new FederationChart(); export const notesChart = new NotesChart(); @@ -23,3 +24,27 @@ export const perUserReactionsChart = new PerUserReactionsChart(); export const hashtagChart = new HashtagChart(); export const perUserFollowingChart = new PerUserFollowingChart(); export const perUserDriveChart = new PerUserDriveChart(); + +const charts = [ + federationChart, + notesChart, + usersChart, + networkChart, + activeUsersChart, + instanceChart, + perUserNotesChart, + driveChart, + perUserReactionsChart, + hashtagChart, + perUserFollowingChart, + perUserDriveChart, +]; + +// 20分おきにメモリ情報をDBに書き込み +setInterval(() => { + for (const chart of charts) { + chart.save(); + } +}, 1000 * 60 * 20); + +beforeShutdown(() => Promise.all(charts.map(chart => chart.save()))); diff --git a/src/services/create-notification.ts b/src/services/create-notification.ts index 6cd116040a..dedb8eac8d 100644 --- a/src/services/create-notification.ts +++ b/src/services/create-notification.ts @@ -30,7 +30,7 @@ export async function createNotification( ...data } as Partial<Notification>); - const packed = await Notifications.pack(notification); + const packed = await Notifications.pack(notification, {}); // Publish notification event publishMainStream(notifieeId, 'notification', packed); diff --git a/src/services/following/create.ts b/src/services/following/create.ts index c0583cdb86..1ce75caca0 100644 --- a/src/services/following/create.ts +++ b/src/services/following/create.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from '../stream'; +import { publishMainStream, publishUserEvent } from '../stream'; import { renderActivity } from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderAccept from '../../remote/activitypub/renderer/accept'; @@ -22,7 +22,7 @@ export async function insertFollowingDoc(followee: User, follower: User) { let alreadyFollowed = false; - await Followings.save({ + await Followings.insert({ id: genId(), createdAt: new Date(), followerId: follower.id, @@ -88,12 +88,15 @@ export async function insertFollowingDoc(followee: User, follower: User) { if (Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'follow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'follow', packed); + publishMainStream(follower.id, 'follow', packed); + }); } // Publish followed event if (Users.isLocalUser(followee)) { - Users.pack(follower, followee).then(packed => publishMainStream(followee.id, 'followed', packed)), + Users.pack(follower, followee).then(packed => publishMainStream(followee.id, 'followed', packed)); // 通知を作成 createNotification(followee.id, 'follow', { diff --git a/src/services/following/delete.ts b/src/services/following/delete.ts index 8821611515..32c47ea7f4 100644 --- a/src/services/following/delete.ts +++ b/src/services/following/delete.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from '../stream'; +import { publishMainStream, publishUserEvent } from '../stream'; import { renderActivity } from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderUndo from '../../remote/activitypub/renderer/undo'; @@ -30,7 +30,10 @@ export default async function(follower: User, followee: User, silent = false) { if (!silent && Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) { diff --git a/src/services/following/requests/reject.ts b/src/services/following/requests/reject.ts index 9a8b14bbfd..d8d3788088 100644 --- a/src/services/following/requests/reject.ts +++ b/src/services/following/requests/reject.ts @@ -2,7 +2,7 @@ import { renderActivity } from '../../../remote/activitypub/renderer'; import renderFollow from '../../../remote/activitypub/renderer/follow'; import renderReject from '../../../remote/activitypub/renderer/reject'; import { deliver } from '../../../queue'; -import { publishMainStream } from '../../stream'; +import { publishMainStream, publishUserEvent } from '../../stream'; import { User, ILocalUser } from '../../../models/entities/user'; import { Users, FollowRequests, Followings } from '../../../models'; import { decrementFollowing } from '../delete'; @@ -39,5 +39,8 @@ export default async function(followee: User, follower: User) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } diff --git a/src/services/i/pin.ts b/src/services/i/pin.ts index 1ff5476b40..f727a10fb6 100644 --- a/src/services/i/pin.ts +++ b/src/services/i/pin.ts @@ -37,7 +37,7 @@ export async function addPinned(user: User, noteId: Note['id']) { throw new IdentifiableError('23f0cf4e-59a3-4276-a91d-61a5891c1514', 'That note has already been pinned.'); } - await UserNotePinings.save({ + await UserNotePinings.insert({ id: genId(), createdAt: new Date(), userId: user.id, diff --git a/src/services/insert-moderation-log.ts b/src/services/insert-moderation-log.ts index 33dab97259..87587d3bed 100644 --- a/src/services/insert-moderation-log.ts +++ b/src/services/insert-moderation-log.ts @@ -3,7 +3,7 @@ import { ModerationLogs } from '../models'; import { genId } from '../misc/gen-id'; export async function insertModerationLog(moderator: ILocalUser, type: string, info?: Record<string, any>) { - await ModerationLogs.save({ + await ModerationLogs.insert({ id: genId(), createdAt: new Date(), userId: moderator.id, diff --git a/src/services/messages/create.ts b/src/services/messages/create.ts index 8646ce37fc..413266d029 100644 --- a/src/services/messages/create.ts +++ b/src/services/messages/create.ts @@ -14,7 +14,7 @@ import { renderActivity } from '../../remote/activitypub/renderer'; import { deliver } from '../../queue'; export async function createMessage(user: User, recipientUser: User | undefined, recipientGroup: UserGroup | undefined, text: string | undefined, file: DriveFile | null, uri?: string) { - const message = await MessagingMessages.save({ + const message = { id: genId(), createdAt: new Date(), fileId: file ? file.id : null, @@ -25,7 +25,9 @@ export async function createMessage(user: User, recipientUser: User | undefined, isRead: false, reads: [] as any[], uri - } as MessagingMessage); + } as MessagingMessage; + + await MessagingMessages.insert(message); const messageObj = await MessagingMessages.pack(message); diff --git a/src/services/note/create.ts b/src/services/note/create.ts index 563eaac758..4a737e8516 100644 --- a/src/services/note/create.ts +++ b/src/services/note/create.ts @@ -247,7 +247,7 @@ export default async (user: User, data: Option, silent = false) => new Promise<N for (const u of us) { checkWordMute(note, { id: u.userId }, u.mutedWords).then(shouldMute => { if (shouldMute) { - MutedNotes.save({ + MutedNotes.insert({ id: genId(), userId: u.userId, noteId: note.id, @@ -259,21 +259,21 @@ export default async (user: User, data: Option, silent = false) => new Promise<N }); // Antenna - Antennas.find().then(async antennas => { - const followings = await Followings.createQueryBuilder('following') - .andWhere(`following.followeeId = :userId`, { userId: note.userId }) - .getMany(); - - const followers = followings.map(f => f.followerId); - - for (const antenna of antennas) { - checkHitAntenna(antenna, note, user, followers).then(hit => { - if (hit) { - addNoteToAntenna(antenna, note, user); + Followings.createQueryBuilder('following') + .andWhere(`following.followeeId = :userId`, { userId: note.userId }) + .getMany() + .then(followings => { + const followers = followings.map(f => f.followerId); + Antennas.find().then(async antennas => { + for (const antenna of antennas) { + checkHitAntenna(antenna, note, user, followers).then(hit => { + if (hit) { + addNoteToAntenna(antenna, note, user); + } + }); } }); - } - }); + }); // Channel if (note.channelId) { @@ -444,8 +444,13 @@ async function renderNoteOrRenoteActivity(data: Option, note: Note) { } function incRenoteCount(renote: Note) { - Notes.increment({ id: renote.id }, 'renoteCount', 1); - Notes.increment({ id: renote.id }, 'score', 1); + Notes.createQueryBuilder().update() + .set({ + renoteCount: () => '"renoteCount" + 1', + score: () => '"score" + 1' + }) + .where('id = :id', { id: renote.id }) + .execute(); } async function insertNote(user: User, data: Option, tags: string[], emojis: string[], mentionedUsers: User[]) { @@ -525,7 +530,7 @@ async function insertNote(user: User, data: Option, tags: string[], emojis: stri await Notes.insert(insert); } - return await Notes.findOneOrFail(insert.id); + return insert; } catch (e) { // duplicate key error if (isDuplicateKeyValueError(e)) { @@ -594,10 +599,13 @@ function saveReply(reply: Note, note: Note) { } function incNotesCountOfUser(user: User) { - Users.increment({ id: user.id }, 'notesCount', 1); - Users.update({ id: user.id }, { - updatedAt: new Date() - }); + Users.createQueryBuilder().update() + .set({ + updatedAt: new Date(), + notesCount: () => '"notesCount" + 1' + }) + .where('id = :id', { id: user.id }) + .execute(); } async function extractMentionedUsers(user: User, tokens: ReturnType<typeof parse>): Promise<User[]> { diff --git a/src/services/note/polls/vote.ts b/src/services/note/polls/vote.ts index bfcaaa09be..b4ce03ab60 100644 --- a/src/services/note/polls/vote.ts +++ b/src/services/note/polls/vote.ts @@ -29,7 +29,7 @@ export default async function(user: User, note: Note, choice: number) { } // Create vote - await PollVotes.save({ + await PollVotes.insert({ id: genId(), createdAt: new Date(), noteId: note.id, diff --git a/src/services/note/reaction/create.ts b/src/services/note/reaction/create.ts index adc96ddc1f..181099cc2d 100644 --- a/src/services/note/reaction/create.ts +++ b/src/services/note/reaction/create.ts @@ -11,45 +11,53 @@ import { perUserReactionsChart } from '../../chart'; import { genId } from '../../../misc/gen-id'; import { createNotification } from '../../create-notification'; import deleteReaction from './delete'; +import { isDuplicateKeyValueError } from '../../../misc/is-duplicate-key-value-error'; +import { NoteReaction } from '../../../models/entities/note-reaction'; export default async (user: User, note: Note, reaction?: string) => { + // TODO: cache reaction = await toDbReaction(reaction, user.host); - const exist = await NoteReactions.findOne({ + let record: NoteReaction = { + id: genId(), + createdAt: new Date(), noteId: note.id, userId: user.id, - }); + reaction + }; - if (exist) { - if (exist.reaction !== reaction) { - // 別のリアクションがすでにされていたら置き換える - await deleteReaction(user, note); + // Create reaction + try { + await NoteReactions.insert(record); + } catch (e) { + if (isDuplicateKeyValueError(e)) { + record = await NoteReactions.findOneOrFail({ + noteId: note.id, + userId: user.id, + }); + + if (record.reaction !== reaction) { + // 別のリアクションがすでにされていたら置き換える + await deleteReaction(user, note); + } else { + // 同じリアクションがすでにされていたら何もしない + return; + } } else { - // 同じリアクションがすでにされていたら何もしない - return; + throw e; } } - // Create reaction - const inserted = await NoteReactions.save({ - id: genId(), - createdAt: new Date(), - noteId: note.id, - userId: user.id, - reaction - }); - // Increment reactions count const sql = `jsonb_set("reactions", '{${reaction}}', (COALESCE("reactions"->>'${reaction}', '0')::int + 1)::text::jsonb)`; await Notes.createQueryBuilder().update() .set({ reactions: () => sql, + score: () => '"score" + 1' }) .where('id = :id', { id: note.id }) .execute(); - Notes.increment({ id: note.id }, 'score', 1); - perUserReactionsChart.update(user, note); // カスタム絵文字リアクションだったら絵文字情報も送る @@ -101,7 +109,7 @@ export default async (user: User, note: Note, reaction?: string) => { //#region 配信 if (Users.isLocalUser(user) && !note.localOnly) { - const content = renderActivity(await renderLike(inserted, note)); + const content = renderActivity(await renderLike(record, note)); const dm = new DeliverManager(user, content); if (note.userHost !== null) { const reactee = await Users.findOne(note.userId); diff --git a/src/services/note/read.ts b/src/services/note/read.ts index 5a39ab30b7..35279db411 100644 --- a/src/services/note/read.ts +++ b/src/services/note/read.ts @@ -2,70 +2,54 @@ import { publishMainStream } from '../stream'; import { Note } from '../../models/entities/note'; import { User } from '../../models/entities/user'; import { NoteUnreads, Antennas, AntennaNotes, Users } from '../../models'; -import { Not, IsNull } from 'typeorm'; +import { Not, IsNull, In } from 'typeorm'; /** - * Mark a note as read + * Mark notes as read */ export default async function( userId: User['id'], - noteId: Note['id'] + noteIds: Note['id'][] ) { async function careNoteUnreads() { - const exist = await NoteUnreads.findOne({ - userId: userId, - noteId: noteId, - }); - - if (!exist) return; - // Remove the record await NoteUnreads.delete({ userId: userId, - noteId: noteId, + noteId: In(noteIds), }); - if (exist.isMentioned) { - NoteUnreads.count({ - userId: userId, - isMentioned: true - }).then(mentionsCount => { - if (mentionsCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadMentions'); - } - }); - } + NoteUnreads.count({ + userId: userId, + isMentioned: true + }).then(mentionsCount => { + if (mentionsCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadMentions'); + } + }); - if (exist.isSpecified) { - NoteUnreads.count({ - userId: userId, - isSpecified: true - }).then(specifiedCount => { - if (specifiedCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); - } - }); - } + NoteUnreads.count({ + userId: userId, + isSpecified: true + }).then(specifiedCount => { + if (specifiedCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); + } + }); - if (exist.noteChannelId) { - NoteUnreads.count({ - userId: userId, - noteChannelId: Not(IsNull()) - }).then(channelNoteCount => { - if (channelNoteCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllChannels'); - } - }); - } + NoteUnreads.count({ + userId: userId, + noteChannelId: Not(IsNull()) + }).then(channelNoteCount => { + if (channelNoteCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllChannels'); + } + }); } async function careAntenna() { - const beforeUnread = await Users.getHasUnreadAntenna(userId); - if (!beforeUnread) return; - const antennas = await Antennas.find({ userId }); await Promise.all(antennas.map(async antenna => { @@ -78,7 +62,7 @@ export default async function( await AntennaNotes.update({ antennaId: antenna.id, - noteId: noteId + noteId: In(noteIds) }, { read: true }); diff --git a/src/services/note/unread.ts b/src/services/note/unread.ts index 6fd9ee2cfe..8e6fb4abe8 100644 --- a/src/services/note/unread.ts +++ b/src/services/note/unread.ts @@ -17,7 +17,7 @@ export default async function(userId: User['id'], note: Note, params: { if (mute.map(m => m.muteeId).includes(note.userId)) return; //#endregion - const unread = await NoteUnreads.save({ + const unread = { id: genId(), noteId: note.id, userId: userId, @@ -25,7 +25,9 @@ export default async function(userId: User['id'], note: Note, params: { isMentioned: params.isMentioned, noteChannelId: note.channelId, noteUserId: note.userId, - }); + }; + + await NoteUnreads.insert(unread); // 2秒経っても既読にならなかったら「未読の投稿がありますよ」イベントを発行する setTimeout(async () => { diff --git a/src/services/note/watch.ts b/src/services/note/watch.ts index d3c9553696..966b7f0054 100644 --- a/src/services/note/watch.ts +++ b/src/services/note/watch.ts @@ -10,7 +10,7 @@ export default async (me: User['id'], note: Note) => { return; } - await NoteWatchings.save({ + await NoteWatchings.insert({ id: genId(), createdAt: new Date(), noteId: note.id, diff --git a/src/services/register-or-fetch-instance-doc.ts b/src/services/register-or-fetch-instance-doc.ts index 3501e20de1..2c39502288 100644 --- a/src/services/register-or-fetch-instance-doc.ts +++ b/src/services/register-or-fetch-instance-doc.ts @@ -3,10 +3,16 @@ import { Instances } from '../models'; import { federationChart } from './chart'; import { genId } from '../misc/gen-id'; import { toPuny } from '../misc/convert-host'; +import { Cache } from '../misc/cache'; + +const cache = new Cache<Instance>(1000 * 60 * 60); export async function registerOrFetchInstanceDoc(host: string): Promise<Instance> { host = toPuny(host); + const cached = cache.get(host); + if (cached) return cached; + const index = await Instances.findOne({ host }); if (index == null) { @@ -19,8 +25,10 @@ export async function registerOrFetchInstanceDoc(host: string): Promise<Instance federationChart.update(true); + cache.set(host, i); return i; } else { + cache.set(host, index); return index; } } diff --git a/src/services/stream.ts b/src/services/stream.ts index d833d700fe..75385847ce 100644 --- a/src/services/stream.ts +++ b/src/services/stream.ts @@ -20,6 +20,10 @@ class Publisher { })); } + public publishUserEvent = (userId: User['id'], type: string, value?: any): void => { + this.publish(`user:${userId}`, type, typeof value === 'undefined' ? null : value); + } + public publishBroadcastStream = (type: string, value?: any): void => { this.publish('broadcast', type, typeof value === 'undefined' ? null : value); } @@ -84,6 +88,7 @@ const publisher = new Publisher(); export default publisher; +export const publishUserEvent = publisher.publishUserEvent; export const publishBroadcastStream = publisher.publishBroadcastStream; export const publishMainStream = publisher.publishMainStream; export const publishDriveStream = publisher.publishDriveStream; diff --git a/src/services/update-hashtag.ts b/src/services/update-hashtag.ts index 1dcb582791..3e22846731 100644 --- a/src/services/update-hashtag.ts +++ b/src/services/update-hashtag.ts @@ -86,7 +86,7 @@ export async function updateHashtag(user: User, tag: string, isUserAttached = fa } } else { if (isUserAttached) { - Hashtags.save({ + Hashtags.insert({ id: genId(), name: tag, mentionedUserIds: [], @@ -103,7 +103,7 @@ export async function updateHashtag(user: User, tag: string, isUserAttached = fa attachedRemoteUsersCount: Users.isRemoteUser(user) ? 1 : 0, } as Hashtag); } else { - Hashtags.save({ + Hashtags.insert({ id: genId(), name: tag, mentionedUserIds: [user.id], diff --git a/src/services/user-list/push.ts b/src/services/user-list/push.ts index e67be4b027..ba54c04475 100644 --- a/src/services/user-list/push.ts +++ b/src/services/user-list/push.ts @@ -8,7 +8,7 @@ import { fetchProxyAccount } from '../../misc/fetch-proxy-account'; import createFollowing from '../following/create'; export async function pushUserToUserList(target: User, list: UserList) { - await UserListJoinings.save({ + await UserListJoinings.insert({ id: genId(), createdAt: new Date(), userId: target.id, |