1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
import { Inject, Injectable } from '@nestjs/common';
import { MoreThan } from 'typeorm';
import { DI } from '@/di-symbols.js';
import type { DriveFilesRepository, InstancesRepository } from '@/models/index.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { MetaService } from '@/core/MetaService.js';
import { ApRequestService } from '@/core/activitypub/ApRequestService.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
import { Cache } from '@/misc/cache.js';
import type { Instance } from '@/models/entities/Instance.js';
import InstanceChart from '@/core/chart/charts/instance.js';
import ApRequestChart from '@/core/chart/charts/ap-request.js';
import FederationChart from '@/core/chart/charts/federation.js';
import { StatusError } from '@/misc/status-error.js';
import { UtilityService } from '@/core/UtilityService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type Bull from 'bull';
import type { DeliverJobData } from '../types.js';
@Injectable()
export class DeliverProcessorService {
private logger: Logger;
private suspendedHostsCache: Cache<Instance[]>;
private latest: string | null;
constructor(
@Inject(DI.config)
private config: Config,
@Inject(DI.instancesRepository)
private instancesRepository: InstancesRepository,
@Inject(DI.driveFilesRepository)
private driveFilesRepository: DriveFilesRepository,
private metaService: MetaService,
private utilityService: UtilityService,
private federatedInstanceService: FederatedInstanceService,
private fetchInstanceMetadataService: FetchInstanceMetadataService,
private apRequestService: ApRequestService,
private instanceChart: InstanceChart,
private apRequestChart: ApRequestChart,
private federationChart: FederationChart,
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('deliver');
this.suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60);
this.latest = null;
}
@bindThis
public async process(job: Bull.Job<DeliverJobData>): Promise<string> {
const { host } = new URL(job.data.to);
// ブロックしてたら中断
const meta = await this.metaService.fetch();
if (meta.blockedHosts.includes(this.utilityService.toPuny(host))) {
return 'skip (blocked)';
}
// isSuspendedなら中断
let suspendedHosts = this.suspendedHostsCache.get(null);
if (suspendedHosts == null) {
suspendedHosts = await this.instancesRepository.find({
where: {
isSuspended: true,
},
});
this.suspendedHostsCache.set(null, suspendedHosts);
}
if (suspendedHosts.map(x => x.host).includes(this.utilityService.toPuny(host))) {
return 'skip (suspended)';
}
try {
if (this.latest !== (this.latest = JSON.stringify(job.data.content, null, 2))) {
this.logger.debug(`delivering ${this.latest}`);
}
await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content);
// Update stats
this.federatedInstanceService.registerOrFetchInstanceDoc(host).then(i => {
this.instancesRepository.update(i.id, {
latestStatus: 200,
isNotResponding: false,
});
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
this.instanceChart.requestSent(i.host, true);
this.apRequestChart.deliverSucc();
this.federationChart.deliverd(i.host, true);
});
return 'Success';
} catch (res) {
// Update stats
this.federatedInstanceService.registerOrFetchInstanceDoc(host).then(i => {
this.instancesRepository.update(i.id, {
latestStatus: res instanceof StatusError ? res.statusCode : null,
isNotResponding: true,
});
this.instanceChart.requestSent(i.host, false);
this.apRequestChart.deliverFail();
this.federationChart.deliverd(i.host, false);
});
if (res instanceof StatusError) {
// 4xx
if (res.isClientError) {
// HTTPステータスコード4xxはクライアントエラーであり、それはつまり
// 何回再送しても成功することはないということなのでエラーにはしないでおく
return `${res.statusCode} ${res.statusMessage}`;
}
// 5xx etc.
throw `${res.statusCode} ${res.statusMessage}`;
} else {
// DNS error, socket error, timeout ...
throw res;
}
}
}
}
|