diff options
Diffstat (limited to 'packages/backend/src/queue/processors')
17 files changed, 118 insertions, 53 deletions
diff --git a/packages/backend/src/queue/processors/db/delete-account.ts b/packages/backend/src/queue/processors/db/delete-account.ts index dbc1f16a46..c1657b4be6 100644 --- a/packages/backend/src/queue/processors/db/delete-account.ts +++ b/packages/backend/src/queue/processors/db/delete-account.ts @@ -13,7 +13,7 @@ const logger = queueLogger.createSubLogger('delete-account'); export async function deleteAccount(job: Bull.Job<DbUserDeleteJobData>): Promise<string | void> { logger.info(`Deleting account of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { return; } @@ -75,7 +75,7 @@ export async function deleteAccount(job: Bull.Job<DbUserDeleteJobData>): Promise } { // Send email notification - const profile = await UserProfiles.findOneOrFail(user.id); + const profile = await UserProfiles.findOneByOrFail({ userId: user.id }); if (profile.email && profile.emailVerified) { sendEmail(profile.email, 'Account deleted', `Your account has been deleted.`, diff --git a/packages/backend/src/queue/processors/db/delete-drive-files.ts b/packages/backend/src/queue/processors/db/delete-drive-files.ts index f6a8699855..b3832d9f04 100644 --- a/packages/backend/src/queue/processors/db/delete-drive-files.ts +++ b/packages/backend/src/queue/processors/db/delete-drive-files.ts @@ -11,7 +11,7 @@ const logger = queueLogger.createSubLogger('delete-drive-files'); export async function deleteDriveFiles(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { logger.info(`Deleting drive files of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; @@ -44,7 +44,7 @@ export async function deleteDriveFiles(job: Bull.Job<DbUserJobData>, done: any): deletedCount++; } - const total = await DriveFiles.count({ + const total = await DriveFiles.countBy({ userId: user.id, }); diff --git a/packages/backend/src/queue/processors/db/export-blocking.ts b/packages/backend/src/queue/processors/db/export-blocking.ts index 83f1ec8fd6..166c9e4cd3 100644 --- a/packages/backend/src/queue/processors/db/export-blocking.ts +++ b/packages/backend/src/queue/processors/db/export-blocking.ts @@ -15,7 +15,7 @@ const logger = queueLogger.createSubLogger('export-blocking'); export async function exportBlocking(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { logger.info(`Exporting blocking of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; @@ -56,7 +56,7 @@ export async function exportBlocking(job: Bull.Job<DbUserJobData>, done: any): P cursor = blockings[blockings.length - 1].id; for (const block of blockings) { - const u = await Users.findOne({ id: block.blockeeId }); + const u = await Users.findOneBy({ id: block.blockeeId }); if (u == null) { exportedCount++; continue; } @@ -75,7 +75,7 @@ export async function exportBlocking(job: Bull.Job<DbUserJobData>, done: any): P exportedCount++; } - const total = await Blockings.count({ + const total = await Blockings.countBy({ blockerId: user.id, }); diff --git a/packages/backend/src/queue/processors/db/export-custom-emojis.ts b/packages/backend/src/queue/processors/db/export-custom-emojis.ts index a65b46cc00..c2467fb5f0 100644 --- a/packages/backend/src/queue/processors/db/export-custom-emojis.ts +++ b/packages/backend/src/queue/processors/db/export-custom-emojis.ts @@ -12,13 +12,14 @@ import { Users, Emojis } from '@/models/index.js'; import { } from '@/queue/types.js'; import { downloadUrl } from '@/misc/download-url.js'; import config from '@/config/index.js'; +import { IsNull } from 'typeorm'; const logger = queueLogger.createSubLogger('export-custom-emojis'); export async function exportCustomEmojis(job: Bull.Job, done: () => void): Promise<void> { logger.info(`Exporting custom emojis ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; @@ -57,7 +58,7 @@ export async function exportCustomEmojis(job: Bull.Job, done: () => void): Promi const customEmojis = await Emojis.find({ where: { - host: null, + host: IsNull(), }, order: { id: 'ASC', diff --git a/packages/backend/src/queue/processors/db/export-following.ts b/packages/backend/src/queue/processors/db/export-following.ts index 162862180b..965500ac27 100644 --- a/packages/backend/src/queue/processors/db/export-following.ts +++ b/packages/backend/src/queue/processors/db/export-following.ts @@ -16,7 +16,7 @@ const logger = queueLogger.createSubLogger('export-following'); export async function exportFollowing(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> { logger.info(`Exporting following of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; @@ -36,7 +36,7 @@ export async function exportFollowing(job: Bull.Job<DbUserJobData>, done: () => let cursor: Following['id'] | null = null; - const mutings = job.data.excludeMuting ? await Mutings.find({ + const mutings = job.data.excludeMuting ? await Mutings.findBy({ muterId: user.id, }) : []; @@ -60,7 +60,7 @@ export async function exportFollowing(job: Bull.Job<DbUserJobData>, done: () => cursor = followings[followings.length - 1].id; for (const following of followings) { - const u = await Users.findOne({ id: following.followeeId }); + const u = await Users.findOneBy({ id: following.followeeId }); if (u == null) { continue; } diff --git a/packages/backend/src/queue/processors/db/export-mute.ts b/packages/backend/src/queue/processors/db/export-mute.ts index 9fb144abb2..0ef81971f1 100644 --- a/packages/backend/src/queue/processors/db/export-mute.ts +++ b/packages/backend/src/queue/processors/db/export-mute.ts @@ -15,7 +15,7 @@ const logger = queueLogger.createSubLogger('export-mute'); export async function exportMute(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { logger.info(`Exporting mute of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; @@ -57,7 +57,7 @@ export async function exportMute(job: Bull.Job<DbUserJobData>, done: any): Promi cursor = mutes[mutes.length - 1].id; for (const mute of mutes) { - const u = await Users.findOne({ id: mute.muteeId }); + const u = await Users.findOneBy({ id: mute.muteeId }); if (u == null) { exportedCount++; continue; } @@ -76,7 +76,7 @@ export async function exportMute(job: Bull.Job<DbUserJobData>, done: any): Promi exportedCount++; } - const total = await Mutings.count({ + const total = await Mutings.countBy({ muterId: user.id, }); diff --git a/packages/backend/src/queue/processors/db/export-notes.ts b/packages/backend/src/queue/processors/db/export-notes.ts index c79679366c..7e12a6fac2 100644 --- a/packages/backend/src/queue/processors/db/export-notes.ts +++ b/packages/backend/src/queue/processors/db/export-notes.ts @@ -16,7 +16,7 @@ const logger = queueLogger.createSubLogger('export-notes'); export async function exportNotes(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { logger.info(`Exporting notes of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; @@ -74,7 +74,7 @@ export async function exportNotes(job: Bull.Job<DbUserJobData>, done: any): Prom for (const note of notes) { let poll: Poll | undefined; if (note.hasPoll) { - poll = await Polls.findOneOrFail({ noteId: note.id }); + poll = await Polls.findOneByOrFail({ noteId: note.id }); } const content = JSON.stringify(serialize(note, poll)); const isFirst = exportedNotesCount === 0; @@ -82,7 +82,7 @@ export async function exportNotes(job: Bull.Job<DbUserJobData>, done: any): Prom exportedNotesCount++; } - const total = await Notes.count({ + const total = await Notes.countBy({ userId: user.id, }); diff --git a/packages/backend/src/queue/processors/db/export-user-lists.ts b/packages/backend/src/queue/processors/db/export-user-lists.ts index 1c04c36789..45852a6038 100644 --- a/packages/backend/src/queue/processors/db/export-user-lists.ts +++ b/packages/backend/src/queue/processors/db/export-user-lists.ts @@ -15,13 +15,13 @@ const logger = queueLogger.createSubLogger('export-user-lists'); export async function exportUserLists(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { logger.info(`Exporting user lists of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; } - const lists = await UserLists.find({ + const lists = await UserLists.findBy({ userId: user.id, }); @@ -38,8 +38,8 @@ export async function exportUserLists(job: Bull.Job<DbUserJobData>, done: any): const stream = fs.createWriteStream(path, { flags: 'a' }); for (const list of lists) { - const joinings = await UserListJoinings.find({ userListId: list.id }); - const users = await Users.find({ + const joinings = await UserListJoinings.findBy({ userListId: list.id }); + const users = await Users.findBy({ id: In(joinings.map(j => j.userId)), }); diff --git a/packages/backend/src/queue/processors/db/import-blocking.ts b/packages/backend/src/queue/processors/db/import-blocking.ts index 857c2629e3..8bddf34bc2 100644 --- a/packages/backend/src/queue/processors/db/import-blocking.ts +++ b/packages/backend/src/queue/processors/db/import-blocking.ts @@ -8,19 +8,20 @@ import { isSelfHost, toPuny } from '@/misc/convert-host.js'; import { Users, DriveFiles, Blockings } from '@/models/index.js'; import { DbUserImportJobData } from '@/queue/types.js'; import block from '@/services/blocking/create.js'; +import { IsNull } from 'typeorm'; const logger = queueLogger.createSubLogger('import-blocking'); export async function importBlocking(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { logger.info(`Importing blocking of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; } - const file = await DriveFiles.findOne({ + const file = await DriveFiles.findOneBy({ id: job.data.fileId, }); if (file == null) { @@ -39,10 +40,10 @@ export async function importBlocking(job: Bull.Job<DbUserImportJobData>, done: a const acct = line.split(',')[0].trim(); const { username, host } = Acct.parse(acct); - let target = isSelfHost(host!) ? await Users.findOne({ - host: null, + let target = isSelfHost(host!) ? await Users.findOneBy({ + host: IsNull(), usernameLower: username.toLowerCase(), - }) : await Users.findOne({ + }) : await Users.findOneBy({ host: toPuny(host!), usernameLower: username.toLowerCase(), }); diff --git a/packages/backend/src/queue/processors/db/import-custom-emojis.ts b/packages/backend/src/queue/processors/db/import-custom-emojis.ts index f862276b47..28e0b867a4 100644 --- a/packages/backend/src/queue/processors/db/import-custom-emojis.ts +++ b/packages/backend/src/queue/processors/db/import-custom-emojis.ts @@ -2,7 +2,6 @@ import Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'node:fs'; import unzipper from 'unzipper'; -import { getConnection } from 'typeorm'; import { queueLogger } from '../../logger.js'; import { downloadUrl } from '@/misc/download-url.js'; @@ -10,6 +9,7 @@ import { DriveFiles, Emojis } from '@/models/index.js'; import { DbUserImportJobData } from '@/queue/types.js'; import { addFile } from '@/services/drive/add-file.js'; import { genId } from '@/misc/gen-id.js'; +import { db } from '@/db/postgre.js'; const logger = queueLogger.createSubLogger('import-custom-emojis'); @@ -17,7 +17,7 @@ const logger = queueLogger.createSubLogger('import-custom-emojis'); export async function importCustomEmojis(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { logger.info(`Importing custom emojis ...`); - const file = await DriveFiles.findOne({ + const file = await DriveFiles.findOneBy({ id: job.data.fileId, }); if (file == null) { @@ -72,10 +72,10 @@ export async function importCustomEmojis(job: Bull.Job<DbUserImportJobData>, don originalUrl: driveFile.url, publicUrl: driveFile.webpublicUrl ?? driveFile.url, type: driveFile.webpublicType ?? driveFile.type, - }).then(x => Emojis.findOneOrFail(x.identifiers[0])); + }).then(x => Emojis.findOneByOrFail(x.identifiers[0])); } - await getConnection().queryResultCache!.remove(['meta_emojis']); + await db.queryResultCache!.remove(['meta_emojis']); cleanup(); diff --git a/packages/backend/src/queue/processors/db/import-following.ts b/packages/backend/src/queue/processors/db/import-following.ts index 235fc28394..8ce2c367d6 100644 --- a/packages/backend/src/queue/processors/db/import-following.ts +++ b/packages/backend/src/queue/processors/db/import-following.ts @@ -8,19 +8,20 @@ import { downloadTextFile } from '@/misc/download-text-file.js'; import { isSelfHost, toPuny } from '@/misc/convert-host.js'; import { Users, DriveFiles } from '@/models/index.js'; import { DbUserImportJobData } from '@/queue/types.js'; +import { IsNull } from 'typeorm'; const logger = queueLogger.createSubLogger('import-following'); export async function importFollowing(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { logger.info(`Importing following of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; } - const file = await DriveFiles.findOne({ + const file = await DriveFiles.findOneBy({ id: job.data.fileId, }); if (file == null) { @@ -39,10 +40,10 @@ export async function importFollowing(job: Bull.Job<DbUserImportJobData>, done: const acct = line.split(',')[0].trim(); const { username, host } = Acct.parse(acct); - let target = isSelfHost(host!) ? await Users.findOne({ - host: null, + let target = isSelfHost(host!) ? await Users.findOneBy({ + host: IsNull(), usernameLower: username.toLowerCase(), - }) : await Users.findOne({ + }) : await Users.findOneBy({ host: toPuny(host!), usernameLower: username.toLowerCase(), }); diff --git a/packages/backend/src/queue/processors/db/import-muting.ts b/packages/backend/src/queue/processors/db/import-muting.ts index 32f5f6bbee..8552b797be 100644 --- a/packages/backend/src/queue/processors/db/import-muting.ts +++ b/packages/backend/src/queue/processors/db/import-muting.ts @@ -9,19 +9,20 @@ import { Users, DriveFiles, Mutings } from '@/models/index.js'; import { DbUserImportJobData } from '@/queue/types.js'; import { User } from '@/models/entities/user.js'; import { genId } from '@/misc/gen-id.js'; +import { IsNull } from 'typeorm'; const logger = queueLogger.createSubLogger('import-muting'); export async function importMuting(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { logger.info(`Importing muting of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; } - const file = await DriveFiles.findOne({ + const file = await DriveFiles.findOneBy({ id: job.data.fileId, }); if (file == null) { @@ -40,10 +41,10 @@ export async function importMuting(job: Bull.Job<DbUserImportJobData>, done: any const acct = line.split(',')[0].trim(); const { username, host } = Acct.parse(acct); - let target = isSelfHost(host!) ? await Users.findOne({ - host: null, + let target = isSelfHost(host!) ? await Users.findOneBy({ + host: IsNull(), usernameLower: username.toLowerCase(), - }) : await Users.findOne({ + }) : await Users.findOneBy({ host: toPuny(host!), usernameLower: username.toLowerCase(), }); diff --git a/packages/backend/src/queue/processors/db/import-user-lists.ts b/packages/backend/src/queue/processors/db/import-user-lists.ts index ae263e19b0..9919b7c53c 100644 --- a/packages/backend/src/queue/processors/db/import-user-lists.ts +++ b/packages/backend/src/queue/processors/db/import-user-lists.ts @@ -9,19 +9,20 @@ import { isSelfHost, toPuny } from '@/misc/convert-host.js'; import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index.js'; import { genId } from '@/misc/gen-id.js'; import { DbUserImportJobData } from '@/queue/types.js'; +import { IsNull } from 'typeorm'; const logger = queueLogger.createSubLogger('import-user-lists'); export async function importUserLists(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { logger.info(`Importing user lists of ${job.data.user.id} ...`); - const user = await Users.findOne(job.data.user.id); + const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { done(); return; } - const file = await DriveFiles.findOne({ + const file = await DriveFiles.findOneBy({ id: job.data.fileId, }); if (file == null) { @@ -40,7 +41,7 @@ export async function importUserLists(job: Bull.Job<DbUserImportJobData>, done: const listName = line.split(',')[0].trim(); const { username, host } = Acct.parse(line.split(',')[1].trim()); - let list = await UserLists.findOne({ + let list = await UserLists.findOneBy({ userId: user.id, name: listName, }); @@ -51,13 +52,13 @@ export async function importUserLists(job: Bull.Job<DbUserImportJobData>, done: createdAt: new Date(), userId: user.id, name: listName, - }).then(x => UserLists.findOneOrFail(x.identifiers[0])); + }).then(x => UserLists.findOneByOrFail(x.identifiers[0])); } - let target = isSelfHost(host!) ? await Users.findOne({ - host: null, + let target = isSelfHost(host!) ? await Users.findOneBy({ + host: IsNull(), usernameLower: username.toLowerCase(), - }) : await Users.findOne({ + }) : await Users.findOneBy({ host: toPuny(host!), usernameLower: username.toLowerCase(), }); @@ -66,7 +67,7 @@ export async function importUserLists(job: Bull.Job<DbUserImportJobData>, done: target = await resolveUser(username, host); } - if (await UserListJoinings.findOne({ userListId: list!.id, userId: target.id }) != null) continue; + if (await UserListJoinings.findOneBy({ userListId: list!.id, userId: target.id }) != null) continue; pushUserToUserList(target, list!); } catch (e) { diff --git a/packages/backend/src/queue/processors/ended-poll-notification.ts b/packages/backend/src/queue/processors/ended-poll-notification.ts index afac27921f..6151c96ad6 100644 --- a/packages/backend/src/queue/processors/ended-poll-notification.ts +++ b/packages/backend/src/queue/processors/ended-poll-notification.ts @@ -8,7 +8,7 @@ import { createNotification } from '@/services/create-notification.js'; const logger = queueLogger.createSubLogger('ended-poll-notification'); export async function endedPollNotification(job: Bull.Job<EndedPollNotificationJobData>, done: any): Promise<void> { - const note = await Notes.findOne(job.data.noteId); + const note = await Notes.findOneBy({ id: job.data.noteId }); if (note == null || !note.hasPoll) { done(); return; diff --git a/packages/backend/src/queue/processors/inbox.ts b/packages/backend/src/queue/processors/inbox.ts index 1b3f94b700..4fbfdb234f 100644 --- a/packages/backend/src/queue/processors/inbox.ts +++ b/packages/backend/src/queue/processors/inbox.ts @@ -15,6 +15,8 @@ import DbResolver from '@/remote/activitypub/db-resolver.js'; import { resolvePerson } from '@/remote/activitypub/models/person.js'; import { LdSignature } from '@/remote/activitypub/misc/ld-signature.js'; import { StatusError } from '@/misc/fetch.js'; +import { CacheableRemoteUser } from '@/models/entities/user.js'; +import { UserPublickey } from '@/models/entities/user-publickey.js'; const logger = new Logger('inbox'); @@ -42,11 +44,13 @@ export default async (job: Bull.Job<InboxJobData>): Promise<string> => { return `Old keyId is no longer supported. ${keyIdLower}`; } - // TDOO: キャッシュ const dbResolver = new DbResolver(); // HTTP-Signature keyIdを元にDBから取得 - let authUser = await dbResolver.getAuthUserFromKeyId(signature.keyId); + let authUser: { + user: CacheableRemoteUser; + key: UserPublickey | null; + } | null = await dbResolver.getAuthUserFromKeyId(signature.keyId); // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得 if (authUser == null) { diff --git a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts index 7d71a20adb..77da162f6e 100644 --- a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts +++ b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts @@ -37,7 +37,7 @@ export default async function cleanRemoteFiles(job: Bull.Job<Record<string, unkn deletedCount += 8; - const total = await DriveFiles.count({ + const total = await DriveFiles.countBy({ userHost: Not(IsNull()), isLink: false, }); diff --git a/packages/backend/src/queue/processors/webhook-deliver.ts b/packages/backend/src/queue/processors/webhook-deliver.ts new file mode 100644 index 0000000000..a4d39d86e4 --- /dev/null +++ b/packages/backend/src/queue/processors/webhook-deliver.ts @@ -0,0 +1,56 @@ +import { URL } from 'node:url'; +import Bull from 'bull'; +import Logger from '@/services/logger.js'; +import { WebhookDeliverJobData } from '../types.js'; +import { getResponse, StatusError } from '@/misc/fetch.js'; +import { Webhooks } from '@/models/index.js'; +import config from '@/config/index.js'; + +const logger = new Logger('webhook'); + +let latest: string | null = null; + +export default async (job: Bull.Job<WebhookDeliverJobData>) => { + try { + if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) { + logger.debug(`delivering ${latest}`); + } + + const res = await getResponse({ + url: job.data.to, + method: 'POST', + headers: { + 'User-Agent': 'Misskey-Hooks', + 'X-Misskey-Host': config.host, + 'X-Misskey-Hook-Id': job.data.webhookId, + 'X-Misskey-Hook-Secret': job.data.secret, + }, + body: JSON.stringify(job.data.content), + }); + + Webhooks.update({ id: job.data.webhookId }, { + latestSentAt: new Date(), + latestStatus: res.status, + }); + + return 'Success'; + } catch (res) { + Webhooks.update({ id: job.data.webhookId }, { + latestSentAt: new Date(), + latestStatus: res instanceof StatusError ? res.statusCode : 1, + }); + + if (res instanceof StatusError) { + // 4xx + if (res.isClientError) { + return `${res.statusCode} ${res.statusMessage}`; + } + + // 5xx etc. + throw `${res.statusCode} ${res.statusMessage}`; + } else { + // DNS error, socket error, timeout ... + throw res; + } + } +}; |