summaryrefslogtreecommitdiff
path: root/src/queue
diff options
context:
space:
mode:
Diffstat (limited to 'src/queue')
-rw-r--r--src/queue/get-job-info.ts15
-rw-r--r--src/queue/index.ts17
-rw-r--r--src/queue/processors/deliver.ts22
3 files changed, 43 insertions, 11 deletions
diff --git a/src/queue/get-job-info.ts b/src/queue/get-job-info.ts
new file mode 100644
index 0000000000..f601ae62d0
--- /dev/null
+++ b/src/queue/get-job-info.ts
@@ -0,0 +1,15 @@
+import * as Bull from 'bull';
+
+export function getJobInfo(job: Bull.Job, increment = false) {
+ const age = Date.now() - job.timestamp;
+
+ const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m`
+ : age > 10000 ? `${Math.floor(age / 1000)}s`
+ : `${age}ms`;
+
+ // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする
+ const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
+ const maxAttempts = job.opts ? job.opts.attempts : 0;
+
+ return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
+}
diff --git a/src/queue/index.ts b/src/queue/index.ts
index 0b20017291..2d3cf6ee39 100644
--- a/src/queue/index.ts
+++ b/src/queue/index.ts
@@ -11,6 +11,7 @@ import processDb from './processors/db';
import procesObjectStorage from './processors/object-storage';
import { queueLogger } from './logger';
import { DriveFile } from '../models/entities/drive-file';
+import { getJobInfo } from './get-job-info';
function initializeQueue(name: string) {
return new Queue(name, {
@@ -44,19 +45,19 @@ const objectStorageLogger = queueLogger.createSubLogger('objectStorage');
deliverQueue
.on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => deliverLogger.debug(`active id=${job.id} to=${job.data.to}`))
- .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) id=${job.id} to=${job.data.to}`))
- .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) id=${job.id} to=${job.data.to}`, { job, e: renderError(err) }))
+ .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
.on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => deliverLogger.warn(`stalled id=${job.id} to=${job.data.to}`));
+ .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
inboxQueue
.on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => inboxLogger.debug(`active id=${job.id}`))
- .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) id=${job.id} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
+ .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
+ .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
+ .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
.on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => inboxLogger.warn(`stalled id=${job.id} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
+ .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
dbQueue
.on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`))
diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts
index b252c20163..980ca3a437 100644
--- a/src/queue/processors/deliver.ts
+++ b/src/queue/processors/deliver.ts
@@ -5,6 +5,8 @@ import Logger from '../../services/logger';
import { Instances } from '../../models';
import { instanceChart } from '../../services/chart';
import { fetchNodeinfo } from '../../services/fetch-nodeinfo';
+import { fetchMeta } from '../../misc/fetch-meta';
+import { toPuny } from '../../misc/convert-host';
const logger = new Logger('deliver');
@@ -13,6 +15,23 @@ let latest: string | null = null;
export default async (job: Bull.Job) => {
const { host } = new URL(job.data.to);
+ // ブロックしてたら中断
+ const meta = await fetchMeta();
+ if (meta.blockedHosts.includes(toPuny(host))) {
+ return 'skip (blocked)';
+ }
+
+ // closedなら中断
+ const closedHosts = await Instances.find({
+ where: {
+ isMarkedAsClosed: true
+ },
+ cache: 60 * 1000
+ });
+ if (closedHosts.map(x => x.host).includes(toPuny(host))) {
+ return 'skip (closed)';
+ }
+
try {
if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
logger.debug(`delivering ${latest}`);
@@ -48,8 +67,6 @@ export default async (job: Bull.Job) => {
});
if (res != null && res.hasOwnProperty('statusCode')) {
- logger.warn(`deliver failed: ${res.statusCode} ${res.statusMessage} to=${job.data.to}`);
-
// 4xx
if (res.statusCode >= 400 && res.statusCode < 500) {
// HTTPステータスコード4xxはクライアントエラーであり、それはつまり
@@ -61,7 +78,6 @@ export default async (job: Bull.Job) => {
throw `${res.statusCode} ${res.statusMessage}`;
} else {
// DNS error, socket error, timeout ...
- logger.warn(`deliver failed: ${res} to=${job.data.to}`);
throw res;
}
}