summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/QueueProcessorModule.ts2
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts8
-rw-r--r--packages/backend/src/queue/SystemQueueProcessorsService.ts5
-rw-r--r--packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts75
4 files changed, 88 insertions, 2 deletions
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts
index f13dd3ef19..620296498c 100644
--- a/packages/backend/src/queue/QueueProcessorModule.ts
+++ b/packages/backend/src/queue/QueueProcessorModule.ts
@@ -29,6 +29,7 @@ import { ImportMutingProcessorService } from './processors/ImportMutingProcessor
import { ImportUserListsProcessorService } from './processors/ImportUserListsProcessorService.js';
import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js';
+import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
@Module({
imports: [
@@ -63,6 +64,7 @@ import { TickChartsProcessorService } from './processors/TickChartsProcessorServ
EndedPollNotificationProcessorService,
DeliverProcessorService,
InboxProcessorService,
+ AggregateRetentionProcessorService,
QueueProcessorService,
],
exports: [
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 1d2feb5ef8..2123815c4c 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -4,6 +4,7 @@ import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
import { QueueService } from '@/core/QueueService.js';
+import { bindThis } from '@/decorators.js';
import { getJobInfo } from './get-job-info.js';
import { SystemQueueProcessorsService } from './SystemQueueProcessorsService.js';
import { ObjectStorageQueueProcessorsService } from './ObjectStorageQueueProcessorsService.js';
@@ -13,7 +14,6 @@ import { EndedPollNotificationProcessorService } from './processors/EndedPollNot
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
import { InboxProcessorService } from './processors/InboxProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
-import { bindThis } from '@/decorators.js';
@Injectable()
export class QueueProcessorService {
@@ -133,6 +133,12 @@ export class QueueProcessorService {
repeat: { cron: '0 0 * * *' },
removeOnComplete: true,
});
+
+ this.queueService.systemQueue.add('aggregateRetention', {
+ }, {
+ repeat: { cron: '0 0 * * *' },
+ removeOnComplete: true,
+ });
this.queueService.systemQueue.add('clean', {
}, {
diff --git a/packages/backend/src/queue/SystemQueueProcessorsService.ts b/packages/backend/src/queue/SystemQueueProcessorsService.ts
index 1ce4152b2c..7fb0da4b10 100644
--- a/packages/backend/src/queue/SystemQueueProcessorsService.ts
+++ b/packages/backend/src/queue/SystemQueueProcessorsService.ts
@@ -1,13 +1,14 @@
import { Inject, Injectable } from '@nestjs/common';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
+import { bindThis } from '@/decorators.js';
import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js';
import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
import { CleanProcessorService } from './processors/CleanProcessorService.js';
+import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import type Bull from 'bull';
-import { bindThis } from '@/decorators.js';
@Injectable()
export class SystemQueueProcessorsService {
@@ -18,6 +19,7 @@ export class SystemQueueProcessorsService {
private tickChartsProcessorService: TickChartsProcessorService,
private resyncChartsProcessorService: ResyncChartsProcessorService,
private cleanChartsProcessorService: CleanChartsProcessorService,
+ private aggregateRetentionProcessorService: AggregateRetentionProcessorService,
private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService,
private cleanProcessorService: CleanProcessorService,
) {
@@ -28,6 +30,7 @@ export class SystemQueueProcessorsService {
q.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done));
q.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done));
q.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done));
+ q.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done));
q.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done));
q.process('clean', (job, done) => this.cleanProcessorService.process(job, done));
}
diff --git a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
new file mode 100644
index 0000000000..4650da76bb
--- /dev/null
+++ b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
@@ -0,0 +1,75 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { In, IsNull, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import type { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { bindThis } from '@/decorators.js';
+import type { RetentionAggregationsRepository, UsersRepository } from '@/models/index.js';
+import { deepClone } from '@/misc/clone.js';
+import { IdService } from '@/core/IdService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class AggregateRetentionProcessorService {
+ private logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.retentionAggregationsRepository)
+ private retentionAggregationsRepository: RetentionAggregationsRepository,
+
+ private idService: IdService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.logger = this.queueLoggerService.logger.createSubLogger('aggregate-retention');
+ }
+
+ @bindThis
+ public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ this.logger.info('Aggregating retention...');
+
+ const now = new Date();
+ const dateKey = `${now.getFullYear()}-${now.getMonth() + 1}-${now.getDate()}`;
+
+ // 過去(だいたい)30日分のレコードを取得
+ const pastRecords = await this.retentionAggregationsRepository.findBy({
+ createdAt: MoreThan(new Date(Date.now() - (1000 * 60 * 60 * 24 * 31))),
+ });
+
+ // 今日登録したユーザーを全て取得
+ const targetUsers = await this.usersRepository.findBy({
+ host: IsNull(),
+ createdAt: MoreThan(new Date(Date.now() - (1000 * 60 * 60 * 24))),
+ });
+ const targetUserIds = targetUsers.map(u => u.id);
+
+ await this.retentionAggregationsRepository.insert({
+ id: this.idService.genId(),
+ createdAt: now,
+ updatedAt: now,
+ userIds: targetUserIds,
+ usersCount: targetUserIds.length,
+ });
+
+ for (const record of pastRecords) {
+ const retention = record.userIds.filter(id => targetUserIds.includes(id)).length;
+
+ const data = deepClone(record.data);
+ data[dateKey] = retention;
+
+ this.retentionAggregationsRepository.update(record.id, {
+ updatedAt: now,
+ data,
+ });
+ }
+
+ this.logger.succ('Retention aggregated.');
+ done();
+ }
+}