diff options
Diffstat (limited to 'packages/backend/src/queue')
4 files changed, 88 insertions, 2 deletions
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index f13dd3ef19..620296498c 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -29,6 +29,7 @@ import { ImportMutingProcessorService } from './processors/ImportMutingProcessor import { ImportUserListsProcessorService } from './processors/ImportUserListsProcessorService.js'; import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js'; import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js'; +import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; @Module({ imports: [ @@ -63,6 +64,7 @@ import { TickChartsProcessorService } from './processors/TickChartsProcessorServ EndedPollNotificationProcessorService, DeliverProcessorService, InboxProcessorService, + AggregateRetentionProcessorService, QueueProcessorService, ], exports: [ diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 1d2feb5ef8..2123815c4c 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -4,6 +4,7 @@ import type { Config } from '@/config.js'; import { DI } from '@/di-symbols.js'; import type Logger from '@/logger.js'; import { QueueService } from '@/core/QueueService.js'; +import { bindThis } from '@/decorators.js'; import { getJobInfo } from './get-job-info.js'; import { SystemQueueProcessorsService } from './SystemQueueProcessorsService.js'; import { ObjectStorageQueueProcessorsService } from './ObjectStorageQueueProcessorsService.js'; @@ -13,7 +14,6 @@ import { EndedPollNotificationProcessorService } from './processors/EndedPollNot import { DeliverProcessorService } from './processors/DeliverProcessorService.js'; import { InboxProcessorService } from './processors/InboxProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; -import { bindThis } from '@/decorators.js'; @Injectable() export class QueueProcessorService { @@ -133,6 +133,12 @@ export class QueueProcessorService { repeat: { cron: '0 0 * * *' }, removeOnComplete: true, }); + + this.queueService.systemQueue.add('aggregateRetention', { + }, { + repeat: { cron: '0 0 * * *' }, + removeOnComplete: true, + }); this.queueService.systemQueue.add('clean', { }, { diff --git a/packages/backend/src/queue/SystemQueueProcessorsService.ts b/packages/backend/src/queue/SystemQueueProcessorsService.ts index 1ce4152b2c..7fb0da4b10 100644 --- a/packages/backend/src/queue/SystemQueueProcessorsService.ts +++ b/packages/backend/src/queue/SystemQueueProcessorsService.ts @@ -1,13 +1,14 @@ import { Inject, Injectable } from '@nestjs/common'; import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; +import { bindThis } from '@/decorators.js'; import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js'; import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js'; import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js'; import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js'; import { CleanProcessorService } from './processors/CleanProcessorService.js'; +import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import type Bull from 'bull'; -import { bindThis } from '@/decorators.js'; @Injectable() export class SystemQueueProcessorsService { @@ -18,6 +19,7 @@ export class SystemQueueProcessorsService { private tickChartsProcessorService: TickChartsProcessorService, private resyncChartsProcessorService: ResyncChartsProcessorService, private cleanChartsProcessorService: CleanChartsProcessorService, + private aggregateRetentionProcessorService: AggregateRetentionProcessorService, private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService, private cleanProcessorService: CleanProcessorService, ) { @@ -28,6 +30,7 @@ export class SystemQueueProcessorsService { q.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done)); q.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done)); q.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done)); + q.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done)); q.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done)); q.process('clean', (job, done) => this.cleanProcessorService.process(job, done)); } diff --git a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts new file mode 100644 index 0000000000..4650da76bb --- /dev/null +++ b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts @@ -0,0 +1,75 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { In, IsNull, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import type { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { bindThis } from '@/decorators.js'; +import type { RetentionAggregationsRepository, UsersRepository } from '@/models/index.js'; +import { deepClone } from '@/misc/clone.js'; +import { IdService } from '@/core/IdService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; + +@Injectable() +export class AggregateRetentionProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.retentionAggregationsRepository) + private retentionAggregationsRepository: RetentionAggregationsRepository, + + private idService: IdService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('aggregate-retention'); + } + + @bindThis + public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> { + this.logger.info('Aggregating retention...'); + + const now = new Date(); + const dateKey = `${now.getFullYear()}-${now.getMonth() + 1}-${now.getDate()}`; + + // 過去(だいたい)30日分のレコードを取得 + const pastRecords = await this.retentionAggregationsRepository.findBy({ + createdAt: MoreThan(new Date(Date.now() - (1000 * 60 * 60 * 24 * 31))), + }); + + // 今日登録したユーザーを全て取得 + const targetUsers = await this.usersRepository.findBy({ + host: IsNull(), + createdAt: MoreThan(new Date(Date.now() - (1000 * 60 * 60 * 24))), + }); + const targetUserIds = targetUsers.map(u => u.id); + + await this.retentionAggregationsRepository.insert({ + id: this.idService.genId(), + createdAt: now, + updatedAt: now, + userIds: targetUserIds, + usersCount: targetUserIds.length, + }); + + for (const record of pastRecords) { + const retention = record.userIds.filter(id => targetUserIds.includes(id)).length; + + const data = deepClone(record.data); + data[dateKey] = retention; + + this.retentionAggregationsRepository.update(record.id, { + updatedAt: now, + data, + }); + } + + this.logger.succ('Retention aggregated.'); + done(); + } +} |