summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts')
-rw-r--r--packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts75
1 files changed, 75 insertions, 0 deletions
diff --git a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
new file mode 100644
index 0000000000..4650da76bb
--- /dev/null
+++ b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
@@ -0,0 +1,75 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { In, IsNull, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import type { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { bindThis } from '@/decorators.js';
+import type { RetentionAggregationsRepository, UsersRepository } from '@/models/index.js';
+import { deepClone } from '@/misc/clone.js';
+import { IdService } from '@/core/IdService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class AggregateRetentionProcessorService {
+ private logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.retentionAggregationsRepository)
+ private retentionAggregationsRepository: RetentionAggregationsRepository,
+
+ private idService: IdService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.logger = this.queueLoggerService.logger.createSubLogger('aggregate-retention');
+ }
+
+ @bindThis
+ public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ this.logger.info('Aggregating retention...');
+
+ const now = new Date();
+ const dateKey = `${now.getFullYear()}-${now.getMonth() + 1}-${now.getDate()}`;
+
+ // 過去(だいたい)30日分のレコードを取得
+ const pastRecords = await this.retentionAggregationsRepository.findBy({
+ createdAt: MoreThan(new Date(Date.now() - (1000 * 60 * 60 * 24 * 31))),
+ });
+
+ // 今日登録したユーザーを全て取得
+ const targetUsers = await this.usersRepository.findBy({
+ host: IsNull(),
+ createdAt: MoreThan(new Date(Date.now() - (1000 * 60 * 60 * 24))),
+ });
+ const targetUserIds = targetUsers.map(u => u.id);
+
+ await this.retentionAggregationsRepository.insert({
+ id: this.idService.genId(),
+ createdAt: now,
+ updatedAt: now,
+ userIds: targetUserIds,
+ usersCount: targetUserIds.length,
+ });
+
+ for (const record of pastRecords) {
+ const retention = record.userIds.filter(id => targetUserIds.includes(id)).length;
+
+ const data = deepClone(record.data);
+ data[dateKey] = retention;
+
+ this.retentionAggregationsRepository.update(record.id, {
+ updatedAt: now,
+ data,
+ });
+ }
+
+ this.logger.succ('Retention aggregated.');
+ done();
+ }
+}