summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/QueueService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/core/QueueService.ts')
-rw-r--r--packages/backend/src/core/QueueService.ts118
1 files changed, 63 insertions, 55 deletions
diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts
index 04bbc7e38a..0f225a8242 100644
--- a/packages/backend/src/core/QueueService.ts
+++ b/packages/backend/src/core/QueueService.ts
@@ -17,6 +17,7 @@ import { bindThis } from '@/decorators.js';
import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js';
import { ApRequestCreator } from '@/core/activitypub/ApRequestService.js';
import { type SystemWebhookPayload } from '@/core/SystemWebhookService.js';
+import type { Packed } from '@/misc/json-schema.js';
import { type UserWebhookPayload } from './UserWebhookService.js';
import type {
DbJobData,
@@ -39,7 +40,6 @@ import type {
} from './QueueModule.js';
import type httpSignature from '@peertube/http-signature';
import type * as Bull from 'bullmq';
-import type { Packed } from '@/misc/json-schema.js';
export const QUEUE_TYPES = [
'system',
@@ -53,6 +53,37 @@ export const QUEUE_TYPES = [
'systemWebhookDeliver',
] as const;
+const REPEATABLE_SYSTEM_JOB_DEF = [{
+ name: 'tickCharts',
+ pattern: '55 * * * *',
+}, {
+ name: 'resyncCharts',
+ pattern: '0 0 * * *',
+}, {
+ name: 'cleanCharts',
+ pattern: '0 0 * * *',
+}, {
+ name: 'aggregateRetention',
+ pattern: '0 0 * * *',
+}, {
+ name: 'clean',
+ pattern: '0 0 * * *',
+}, {
+ name: 'checkExpiredMutings',
+ pattern: '*/5 * * * *',
+}, {
+ name: 'bakeBufferedReactions',
+ pattern: '0 0 * * *',
+}, {
+ name: 'checkModeratorsActivity',
+ // 毎時30分に起動
+ pattern: '30 * * * *',
+}, {
+ name: 'cleanRemoteNotes',
+ // 毎日午前4時に起動(最も人の少ない時間帯)
+ pattern: '0 4 * * *',
+}];
+
@Injectable()
export class QueueService {
constructor(
@@ -69,61 +100,31 @@ export class QueueService {
@Inject('queue:userWebhookDeliver') public userWebhookDeliverQueue: UserWebhookDeliverQueue,
@Inject('queue:systemWebhookDeliver') public systemWebhookDeliverQueue: SystemWebhookDeliverQueue,
) {
- this.systemQueue.add('tickCharts', {
- }, {
- repeat: { pattern: '55 * * * *' },
- removeOnComplete: 10,
- removeOnFail: 30,
- });
-
- this.systemQueue.add('resyncCharts', {
- }, {
- repeat: { pattern: '0 0 * * *' },
- removeOnComplete: 10,
- removeOnFail: 30,
- });
-
- this.systemQueue.add('cleanCharts', {
- }, {
- repeat: { pattern: '0 0 * * *' },
- removeOnComplete: 10,
- removeOnFail: 30,
- });
-
- this.systemQueue.add('aggregateRetention', {
- }, {
- repeat: { pattern: '0 0 * * *' },
- removeOnComplete: 10,
- removeOnFail: 30,
- });
-
- this.systemQueue.add('clean', {
- }, {
- repeat: { pattern: '0 0 * * *' },
- removeOnComplete: 10,
- removeOnFail: 30,
- });
-
- this.systemQueue.add('checkExpiredMutings', {
- }, {
- repeat: { pattern: '*/5 * * * *' },
- removeOnComplete: 10,
- removeOnFail: 30,
- });
-
- this.systemQueue.add('bakeBufferedReactions', {
- }, {
- repeat: { pattern: '0 0 * * *' },
- removeOnComplete: 10,
- removeOnFail: 30,
- });
+ for (const def of REPEATABLE_SYSTEM_JOB_DEF) {
+ this.systemQueue.upsertJobScheduler(def.name, {
+ pattern: def.pattern,
+ immediately: false,
+ }, {
+ name: def.name,
+ opts: {
+ // 期限ではなくcountで設定したいが、ジョブごとではなくキュー全体でカウントされるため、高頻度で実行されるジョブによって低頻度で実行されるジョブのログが消えることになる
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ },
+ },
+ });
+ }
- this.systemQueue.add('checkModeratorsActivity', {
- }, {
- // 毎時30分に起動
- repeat: { pattern: '30 * * * *' },
- removeOnComplete: 10,
- removeOnFail: 30,
+ // 古いバージョンで作成され現在使われなくなったrepeatableジョブをクリーンアップ
+ this.systemQueue.getJobSchedulers().then(schedulers => {
+ for (const scheduler of schedulers) {
+ if (!REPEATABLE_SYSTEM_JOB_DEF.some(def => def.name === scheduler.key)) {
+ this.systemQueue.removeJobScheduler(scheduler.key);
+ }
+ }
});
}
@@ -811,6 +812,13 @@ export class QueueService {
}
@bindThis
+ public async queueGetJobLogs(queueType: typeof QUEUE_TYPES[number], jobId: string) {
+ const queue = this.getQueue(queueType);
+ const result = await queue.getJobLogs(jobId);
+ return result.logs;
+ }
+
+ @bindThis
public async queueGetJobs(queueType: typeof QUEUE_TYPES[number], jobTypes: JobType[], search?: string) {
const RETURN_LIMIT = 100;
const queue = this.getQueue(queueType);