diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2019-04-07 21:50:36 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-04-07 21:50:36 +0900 |
| commit | f0a29721c9fb10f97faf386bc9d6b1b2fad97895 (patch) | |
| tree | b5c1d38d698589bb444c0881a431391db91eb5bc /src/services/chart/core.ts | |
| parent | Update README.md [AUTOGEN] (#4639) (diff) | |
| download | misskey-f0a29721c9fb10f97faf386bc9d6b1b2fad97895.tar.gz misskey-f0a29721c9fb10f97faf386bc9d6b1b2fad97895.tar.bz2 misskey-f0a29721c9fb10f97faf386bc9d6b1b2fad97895.zip | |
Use PostgreSQL instead of MongoDB (#4572)
* wip
* Update note.ts
* Update timeline.ts
* Update core.ts
* wip
* Update generate-visibility-query.ts
* wip
* wip
* wip
* wip
* wip
* Update global-timeline.ts
* wip
* wip
* wip
* Update vote.ts
* wip
* wip
* Update create.ts
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* Update files.ts
* wip
* wip
* Update CONTRIBUTING.md
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* Update read-notification.ts
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* Update cancel.ts
* wip
* wip
* wip
* Update show.ts
* wip
* wip
* Update gen-id.ts
* Update create.ts
* Update id.ts
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* Docker: Update files about Docker (#4599)
* Docker: Use cache if files used by `yarn install` was not updated
This patch reduces the number of times to installing node_modules.
For example, `yarn install` step will be skipped when only ".config/default.yml" is updated.
* Docker: Migrate MongoDB to Postgresql
Misskey uses Postgresql as a database instead of Mongodb since version 11.
* Docker: Uncomment about data persistence
This patch will save a lot of databases.
* wip
* wip
* wip
* Update activitypub.ts
* wip
* wip
* wip
* Update logs.ts
* wip
* Update drive-file.ts
* Update register.ts
* wip
* wip
* Update mentions.ts
* wip
* wip
* wip
* Update recommendation.ts
* wip
* Update index.ts
* wip
* Update recommendation.ts
* Doc: Update docker.ja.md and docker.en.md (#1) (#4608)
Update how to set up misskey.
* wip
* :v:
* wip
* Update note.ts
* Update postgre.ts
* wip
* wip
* wip
* wip
* Update add-file.ts
* wip
* wip
* wip
* Clean up
* Update logs.ts
* wip
* :pizza:
* wip
* Ad notes
* wip
* Update api-visibility.ts
* Update note.ts
* Update add-file.ts
* tests
* tests
* Update postgre.ts
* Update utils.ts
* wip
* wip
* Refactor
* wip
* Refactor
* wip
* wip
* Update show-users.ts
* Update update-instance.ts
* wip
* Update feed.ts
* Update outbox.ts
* Update outbox.ts
* Update user.ts
* wip
* Update list.ts
* Update update-hashtag.ts
* wip
* Update update-hashtag.ts
* Refactor
* Update update.ts
* wip
* wip
* :v:
* clean up
* docs
* Update push.ts
* wip
* Update api.ts
* wip
* :v:
* Update make-pagination-query.ts
* :v:
* Delete hashtags.ts
* Update instances.ts
* Update instances.ts
* Update create.ts
* Update search.ts
* Update reversi-game.ts
* Update signup.ts
* Update user.ts
* id
* Update example.yml
* :art:
* objectid
* fix
* reversi
* reversi
* Fix bug of chart engine
* Add test of chart engine
* Improve test
* Better testing
* Improve chart engine
* Refactor
* Add test of chart engine
* Refactor
* Add chart test
* Fix bug
* コミットし忘れ
* Refactoring
* :v:
* Add tests
* Add test
* Extarct note tests
* Refactor
* 存在しないユーザーにメンションできなくなっていた問題を修正
* Fix bug
* Update update-meta.ts
* Fix bug
* Update mention.vue
* Fix bug
* Update meta.ts
* Update CONTRIBUTING.md
* Fix bug
* Fix bug
* Fix bug
* Clean up
* Clean up
* Update notification.ts
* Clean up
* Add mute tests
* Add test
* Refactor
* Add test
* Fix test
* Refactor
* Refactor
* Add tests
* Update utils.ts
* Update utils.ts
* Fix test
* Update package.json
* Update update.ts
* Update manifest.ts
* Fix bug
* Fix bug
* Add test
* :art:
* Update endpoint permissions
* Updaye permisison
* Update person.ts
#4299
* データベースと同期しないように
* Fix bug
* Fix bug
* Update reversi-game.ts
* Use a feature of Node v11.7.0 to extract a public key (#4644)
* wip
* wip
* :v:
* Refactoring
#1540
* test
* test
* test
* test
* test
* test
* test
* Fix bug
* Fix test
* :sushi:
* wip
* #4471
* Add test for #4335
* Refactor
* Fix test
* Add tests
* :clock4:
* Fix bug
* Add test
* Add test
* rename
* Fix bug
Diffstat (limited to 'src/services/chart/core.ts')
| -rw-r--r-- | src/services/chart/core.ts | 460 |
1 files changed, 460 insertions, 0 deletions
diff --git a/src/services/chart/core.ts b/src/services/chart/core.ts new file mode 100644 index 0000000000..2a60b1a0a3 --- /dev/null +++ b/src/services/chart/core.ts @@ -0,0 +1,460 @@ +/** + * チャートエンジン + * + * Tests located in test/chart + */ + +import * as moment from 'moment'; +import * as nestedProperty from 'nested-property'; +import autobind from 'autobind-decorator'; +import Logger from '../logger'; +import { Schema } from '../../misc/schema'; +import { EntitySchema, getRepository, Repository, LessThan, MoreThanOrEqual } from 'typeorm'; +import { isDuplicateKeyValueError } from '../../misc/is-duplicate-key-value-error'; + +const logger = new Logger('chart', 'white', process.env.NODE_ENV !== 'test'); + +const utc = moment.utc; + +export type Obj = { [key: string]: any }; + +export type DeepPartial<T> = { + [P in keyof T]?: DeepPartial<T[P]>; +}; + +type ArrayValue<T> = { + [P in keyof T]: T[P] extends number ? T[P][] : ArrayValue<T[P]>; +}; + +type Span = 'day' | 'hour'; + +type Log = { + id: number; + + /** + * 集計のグループ + */ + group: string | null; + + /** + * 集計日時のUnixタイムスタンプ(秒) + */ + date: number; + + /** + * 集計期間 + */ + span: Span; + + /** + * ユニークインクリメント用 + */ + unique?: Record<string, any>; +}; + +const camelToSnake = (str: string) => { + return str.replace(/([A-Z])/g, s => '_' + s.charAt(0).toLowerCase()); +}; + +/** + * 様々なチャートの管理を司るクラス + */ +export default abstract class Chart<T extends Record<string, any>> { + private static readonly columnPrefix = '___'; + private static readonly columnDot = '_'; + + private name: string; + public schema: Schema; + protected repository: Repository<Log>; + protected abstract genNewLog(latest: T): DeepPartial<T>; + protected abstract async fetchActual(group?: string): Promise<DeepPartial<T>>; + + @autobind + private static convertSchemaToFlatColumnDefinitions(schema: Schema) { + const columns = {} as any; + const flatColumns = (x: Obj, path?: string) => { + for (const [k, v] of Object.entries(x)) { + const p = path ? `${path}${this.columnDot}${k}` : k; + if (v.type === 'object') { + flatColumns(v.properties, p); + } else { + columns[this.columnPrefix + p] = { + type: 'integer', + }; + } + } + }; + flatColumns(schema.properties); + return columns; + } + + @autobind + private static convertFlattenColumnsToObject(x: Record<string, number>) { + const obj = {} as any; + for (const k of Object.keys(x).filter(k => k.startsWith(Chart.columnPrefix))) { + // now k is ___x_y_z + const path = k.substr(Chart.columnPrefix.length).split(Chart.columnDot).join('.'); + nestedProperty.set(obj, path, x[k]); + } + return obj; + } + + @autobind + private static convertObjectToFlattenColumns(x: Record<string, any>) { + const columns = {} as Record<string, number>; + const flatten = (x: Obj, path?: string) => { + for (const [k, v] of Object.entries(x)) { + const p = path ? `${path}${this.columnDot}${k}` : k; + if (typeof v === 'object') { + flatten(v, p); + } else { + columns[this.columnPrefix + p] = v; + } + } + }; + flatten(x); + return columns; + } + + @autobind + private static convertQuery(x: Record<string, any>) { + const query: Record<string, Function> = {}; + + const columns = Chart.convertObjectToFlattenColumns(x); + + for (const [k, v] of Object.entries(columns)) { + if (v > 0) query[k] = () => `"${k}" + ${v}`; + if (v < 0) query[k] = () => `"${k}" - ${v}`; + } + + return query; + } + + @autobind + private static momentToTimestamp(x: moment.Moment): Log['date'] { + return x.unix(); + } + + @autobind + public static schemaToEntity(name: string, schema: Schema): EntitySchema { + return new EntitySchema({ + name: `__chart__${camelToSnake(name)}`, + columns: { + id: { + type: 'integer', + primary: true, + generated: true + }, + date: { + type: 'integer', + }, + group: { + type: 'varchar', + length: 128, + nullable: true + }, + span: { + type: 'enum', + enum: ['hour', 'day'] + }, + unique: { + type: 'jsonb', + default: {} + }, + ...Chart.convertSchemaToFlatColumnDefinitions(schema) + }, + }); + } + + constructor(name: string, schema: Schema, grouped = false) { + this.name = name; + this.schema = schema; + const entity = Chart.schemaToEntity(name, schema); + + const keys = ['span', 'date']; + if (grouped) keys.push('group'); + + entity.options.uniques = [{ + columns: keys + }]; + + this.repository = getRepository<Log>(entity); + } + + @autobind + private getNewLog(latest?: T): T { + const log = latest ? this.genNewLog(latest) : {}; + const flatColumns = (x: Obj, path?: string) => { + for (const [k, v] of Object.entries(x)) { + const p = path ? `${path}.${k}` : k; + if (v.type === 'object') { + flatColumns(v.properties, p); + } else { + if (nestedProperty.get(log, p) == null) { + nestedProperty.set(log, p, 0); + } + } + } + }; + flatColumns(this.schema.properties); + return log as T; + } + + @autobind + private getCurrentDate(): [number, number, number, number] { + const now = moment().utc(); + + const y = now.year(); + const m = now.month(); + const d = now.date(); + const h = now.hour(); + + return [y, m, d, h]; + } + + @autobind + private getLatestLog(span: Span, group: string = null): Promise<Log> { + return this.repository.findOne({ + group: group, + span: span + }, { + order: { + date: -1 + } + }); + } + + @autobind + private async getCurrentLog(span: Span, group: string = null): Promise<Log> { + const [y, m, d, h] = this.getCurrentDate(); + + const current = + span == 'day' ? utc([y, m, d]) : + span == 'hour' ? utc([y, m, d, h]) : + null; + + // 現在(今日または今のHour)のログ + const currentLog = await this.repository.findOne({ + span: span, + date: Chart.momentToTimestamp(current), + ...(group ? { group: group } : {}) + }); + + // ログがあればそれを返して終了 + if (currentLog != null) { + return currentLog; + } + + let log: Log; + let data: T; + + // 集計期間が変わってから、初めてのチャート更新なら + // 最も最近のログを持ってくる + // * 例えば集計期間が「日」である場合で考えると、 + // * 昨日何もチャートを更新するような出来事がなかった場合は、 + // * ログがそもそも作られずドキュメントが存在しないということがあり得るため、 + // * 「昨日の」と決め打ちせずに「もっとも最近の」とします + const latest = await this.getLatestLog(span, group); + + if (latest != null) { + const obj = Chart.convertFlattenColumnsToObject( + latest as Record<string, any>); + + // 空ログデータを作成 + data = await this.getNewLog(obj); + } else { + // ログが存在しなかったら + // (Misskeyインスタンスを建てて初めてのチャート更新時) + + // 初期ログデータを作成 + data = await this.getNewLog(null); + + logger.info(`${this.name}: Initial commit created`); + } + + try { + // 新規ログ挿入 + log = await this.repository.save({ + group: group, + span: span, + date: Chart.momentToTimestamp(current), + ...Chart.convertObjectToFlattenColumns(data) + }); + } catch (e) { + // duplicate key error + // 並列動作している他のチャートエンジンプロセスと処理が重なる場合がある + // その場合は再度最も新しいログを持ってくる + if (isDuplicateKeyValueError(e)) { + log = await this.getLatestLog(span, group); + } else { + logger.error(e); + throw e; + } + } + + return log; + } + + @autobind + protected commit(query: Record<string, Function>, group: string = null, uniqueKey?: string, uniqueValue?: string): Promise<any> { + const update = async (log: Log) => { + // ユニークインクリメントの場合、指定のキーに指定の値が既に存在していたら弾く + if ( + uniqueKey && + log.unique[uniqueKey] && + log.unique[uniqueKey].includes(uniqueValue) + ) return; + + // ユニークインクリメントの指定のキーに値を追加 + if (uniqueKey) { + if (log.unique[uniqueKey]) { + const sql = `jsonb_set("unique", '{${uniqueKey}}', ("unique"->>'${uniqueKey}')::jsonb || '["${uniqueValue}"]'::jsonb)`; + query['unique'] = () => sql; + } else { + const sql = `jsonb_set("unique", '{${uniqueKey}}', '["${uniqueValue}"]')`; + query['unique'] = () => sql; + } + } + + // ログ更新 + await this.repository.createQueryBuilder() + .update() + .set(query) + .where('id = :id', { id: log.id }) + .execute(); + }; + + return Promise.all([ + this.getCurrentLog('day', group).then(log => update(log)), + this.getCurrentLog('hour', group).then(log => update(log)), + ]); + } + + @autobind + protected async inc(inc: DeepPartial<T>, group: string = null): Promise<void> { + await this.commit(Chart.convertQuery(inc as any), group); + } + + @autobind + protected async incIfUnique(inc: DeepPartial<T>, key: string, value: string, group: string = null): Promise<void> { + await this.commit(Chart.convertQuery(inc as any), group, key, value); + } + + @autobind + public async getChart(span: Span, range: number, group: string = null): Promise<ArrayValue<T>> { + const [y, m, d, h] = this.getCurrentDate(); + + const gt = + span == 'day' ? utc([y, m, d]).subtract(range, 'days') : + span == 'hour' ? utc([y, m, d, h]).subtract(range, 'hours') : + null; + + // ログ取得 + let logs = await this.repository.find({ + where: { + group: group, + span: span, + date: MoreThanOrEqual(Chart.momentToTimestamp(gt)) + }, + order: { + date: -1 + }, + }); + + // 要求された範囲にログがひとつもなかったら + if (logs.length === 0) { + // もっとも新しいログを持ってくる + // (すくなくともひとつログが無いと隙間埋めできないため) + const recentLog = await this.repository.findOne({ + group: group, + span: span + }, { + order: { + date: -1 + }, + }); + + if (recentLog) { + logs = [recentLog]; + } + + // 要求された範囲の最も古い箇所に位置するログが存在しなかったら + } else if (!utc(logs[logs.length - 1].date * 1000).isSame(gt)) { + // 要求された範囲の最も古い箇所時点での最も新しいログを持ってきて末尾に追加する + // (隙間埋めできないため) + const outdatedLog = await this.repository.findOne({ + group: group, + span: span, + date: LessThan(Chart.momentToTimestamp(gt)) + }, { + order: { + date: -1 + }, + }); + + if (outdatedLog) { + logs.push(outdatedLog); + } + } + + const chart: T[] = []; + + // 整形 + for (let i = (range - 1); i >= 0; i--) { + const current = + span == 'day' ? utc([y, m, d]).subtract(i, 'days') : + span == 'hour' ? utc([y, m, d, h]).subtract(i, 'hours') : + null; + + const log = logs.find(l => utc(l.date * 1000).isSame(current)); + + if (log) { + const data = Chart.convertFlattenColumnsToObject(log as Record<string, any>); + chart.unshift(data); + } else { + // 隙間埋め + const latest = logs.find(l => utc(l.date * 1000).isBefore(current)); + const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record<string, any>) : null; + chart.unshift(this.getNewLog(data)); + } + } + + const res: ArrayValue<T> = {} as any; + + /** + * [{ foo: 1, bar: 5 }, { foo: 2, bar: 6 }, { foo: 3, bar: 7 }] + * を + * { foo: [1, 2, 3], bar: [5, 6, 7] } + * にする + */ + const dive = (x: Obj, path?: string) => { + for (const [k, v] of Object.entries(x)) { + const p = path ? `${path}.${k}` : k; + if (typeof v == 'object') { + dive(v, p); + } else { + nestedProperty.set(res, p, chart.map(s => nestedProperty.get(s, p))); + } + } + }; + + dive(chart[0]); + + return res; + } +} + +export function convertLog(logSchema: Schema): Schema { + const v: Schema = JSON.parse(JSON.stringify(logSchema)); // copy + if (v.type === 'number') { + v.type = 'array'; + v.items = { + type: 'number' + }; + } else if (v.type === 'object') { + for (const k of Object.keys(v.properties)) { + v.properties[k] = convertLog(v.properties[k]); + } + } + return v; +} |