summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts23
1 files changed, 16 insertions, 7 deletions
diff --git a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
index 02324c6cd4..fcfba75909 100644
--- a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
+++ b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
@@ -7,6 +7,7 @@ import { bindThis } from '@/decorators.js';
import type { RetentionAggregationsRepository, UsersRepository } from '@/models/index.js';
import { deepClone } from '@/misc/clone.js';
import { IdService } from '@/core/IdService.js';
+import { isDuplicateKeyValueError } from '@/misc/is-duplicate-key-value-error.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type Bull from 'bull';
@@ -49,13 +50,21 @@ export class AggregateRetentionProcessorService {
});
const targetUserIds = targetUsers.map(u => u.id);
- await this.retentionAggregationsRepository.insert({
- id: this.idService.genId(),
- createdAt: now,
- updatedAt: now,
- userIds: targetUserIds,
- usersCount: targetUserIds.length,
- });
+ try {
+ await this.retentionAggregationsRepository.insert({
+ id: this.idService.genId(),
+ createdAt: now,
+ updatedAt: now,
+ dateKey,
+ userIds: targetUserIds,
+ usersCount: targetUserIds.length,
+ });
+ } catch (err) {
+ if (isDuplicateKeyValueError(err)) {
+ this.logger.succ('Skip because it has already been processed by another worker.');
+ done();
+ }
+ }
// 今日活動したユーザーを全て取得
const activeUsers = await this.usersRepository.findBy({