summaryrefslogtreecommitdiff
path: root/src/server/api/stream
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/api/stream')
-rw-r--r--src/server/api/stream/channel.ts12
-rw-r--r--src/server/api/stream/drive.ts10
-rw-r--r--src/server/api/stream/home.ts95
-rw-r--r--src/server/api/stream/messaging-index.ts10
-rw-r--r--src/server/api/stream/messaging.ts24
-rw-r--r--src/server/api/stream/othello-game.ts331
-rw-r--r--src/server/api/stream/othello.ts29
-rw-r--r--src/server/api/stream/requests.ts19
-rw-r--r--src/server/api/stream/server.ts19
9 files changed, 549 insertions, 0 deletions
diff --git a/src/server/api/stream/channel.ts b/src/server/api/stream/channel.ts
new file mode 100644
index 0000000000..d67d77cbf4
--- /dev/null
+++ b/src/server/api/stream/channel.ts
@@ -0,0 +1,12 @@
+import * as websocket from 'websocket';
+import * as redis from 'redis';
+
+export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient): void {
+ const channel = request.resourceURL.query.channel;
+
+ // Subscribe channel stream
+ subscriber.subscribe(`misskey:channel-stream:${channel}`);
+ subscriber.on('message', (_, data) => {
+ connection.send(data);
+ });
+}
diff --git a/src/server/api/stream/drive.ts b/src/server/api/stream/drive.ts
new file mode 100644
index 0000000000..c97ab80dcc
--- /dev/null
+++ b/src/server/api/stream/drive.ts
@@ -0,0 +1,10 @@
+import * as websocket from 'websocket';
+import * as redis from 'redis';
+
+export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void {
+ // Subscribe drive stream
+ subscriber.subscribe(`misskey:drive-stream:${user._id}`);
+ subscriber.on('message', (_, data) => {
+ connection.send(data);
+ });
+}
diff --git a/src/server/api/stream/home.ts b/src/server/api/stream/home.ts
new file mode 100644
index 0000000000..1ef0f33b4b
--- /dev/null
+++ b/src/server/api/stream/home.ts
@@ -0,0 +1,95 @@
+import * as websocket from 'websocket';
+import * as redis from 'redis';
+import * as debug from 'debug';
+
+import User from '../models/user';
+import Mute from '../models/mute';
+import { pack as packPost } from '../models/post';
+import readNotification from '../common/read-notification';
+
+const log = debug('misskey');
+
+export default async function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any) {
+ // Subscribe Home stream channel
+ subscriber.subscribe(`misskey:user-stream:${user._id}`);
+
+ const mute = await Mute.find({
+ muter_id: user._id,
+ deleted_at: { $exists: false }
+ });
+ const mutedUserIds = mute.map(m => m.mutee_id.toString());
+
+ subscriber.on('message', async (channel, data) => {
+ switch (channel.split(':')[1]) {
+ case 'user-stream':
+ try {
+ const x = JSON.parse(data);
+
+ if (x.type == 'post') {
+ if (mutedUserIds.indexOf(x.body.user_id) != -1) {
+ return;
+ }
+ if (x.body.reply != null && mutedUserIds.indexOf(x.body.reply.user_id) != -1) {
+ return;
+ }
+ if (x.body.repost != null && mutedUserIds.indexOf(x.body.repost.user_id) != -1) {
+ return;
+ }
+ } else if (x.type == 'notification') {
+ if (mutedUserIds.indexOf(x.body.user_id) != -1) {
+ return;
+ }
+ }
+
+ connection.send(data);
+ } catch (e) {
+ connection.send(data);
+ }
+ break;
+ case 'post-stream':
+ const postId = channel.split(':')[2];
+ log(`RECEIVED: ${postId} ${data} by @${user.username}`);
+ const post = await packPost(postId, user, {
+ detail: true
+ });
+ connection.send(JSON.stringify({
+ type: 'post-updated',
+ body: {
+ post: post
+ }
+ }));
+ break;
+ }
+ });
+
+ connection.on('message', data => {
+ const msg = JSON.parse(data.utf8Data);
+
+ switch (msg.type) {
+ case 'api':
+ // TODO
+ break;
+
+ case 'alive':
+ // Update lastUsedAt
+ User.update({ _id: user._id }, {
+ $set: {
+ 'account.last_used_at': new Date()
+ }
+ });
+ break;
+
+ case 'read_notification':
+ if (!msg.id) return;
+ readNotification(user._id, msg.id);
+ break;
+
+ case 'capture':
+ if (!msg.id) return;
+ const postId = msg.id;
+ log(`CAPTURE: ${postId} by @${user.username}`);
+ subscriber.subscribe(`misskey:post-stream:${postId}`);
+ break;
+ }
+ });
+}
diff --git a/src/server/api/stream/messaging-index.ts b/src/server/api/stream/messaging-index.ts
new file mode 100644
index 0000000000..c1b2fbc806
--- /dev/null
+++ b/src/server/api/stream/messaging-index.ts
@@ -0,0 +1,10 @@
+import * as websocket from 'websocket';
+import * as redis from 'redis';
+
+export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void {
+ // Subscribe messaging index stream
+ subscriber.subscribe(`misskey:messaging-index-stream:${user._id}`);
+ subscriber.on('message', (_, data) => {
+ connection.send(data);
+ });
+}
diff --git a/src/server/api/stream/messaging.ts b/src/server/api/stream/messaging.ts
new file mode 100644
index 0000000000..a4a12426a3
--- /dev/null
+++ b/src/server/api/stream/messaging.ts
@@ -0,0 +1,24 @@
+import * as websocket from 'websocket';
+import * as redis from 'redis';
+import read from '../common/read-messaging-message';
+
+export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void {
+ const otherparty = request.resourceURL.query.otherparty;
+
+ // Subscribe messaging stream
+ subscriber.subscribe(`misskey:messaging-stream:${user._id}-${otherparty}`);
+ subscriber.on('message', (_, data) => {
+ connection.send(data);
+ });
+
+ connection.on('message', async (data) => {
+ const msg = JSON.parse(data.utf8Data);
+
+ switch (msg.type) {
+ case 'read':
+ if (!msg.id) return;
+ read(user._id, otherparty, msg.id);
+ break;
+ }
+ });
+}
diff --git a/src/server/api/stream/othello-game.ts b/src/server/api/stream/othello-game.ts
new file mode 100644
index 0000000000..1c846f27ae
--- /dev/null
+++ b/src/server/api/stream/othello-game.ts
@@ -0,0 +1,331 @@
+import * as websocket from 'websocket';
+import * as redis from 'redis';
+import * as CRC32 from 'crc-32';
+import Game, { pack } from '../models/othello-game';
+import { publishOthelloGameStream } from '../event';
+import Othello from '../../common/othello/core';
+import * as maps from '../../common/othello/maps';
+
+export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user?: any): void {
+ const gameId = request.resourceURL.query.game;
+
+ // Subscribe game stream
+ subscriber.subscribe(`misskey:othello-game-stream:${gameId}`);
+ subscriber.on('message', (_, data) => {
+ connection.send(data);
+ });
+
+ connection.on('message', async (data) => {
+ const msg = JSON.parse(data.utf8Data);
+
+ switch (msg.type) {
+ case 'accept':
+ accept(true);
+ break;
+
+ case 'cancel-accept':
+ accept(false);
+ break;
+
+ case 'update-settings':
+ if (msg.settings == null) return;
+ updateSettings(msg.settings);
+ break;
+
+ case 'init-form':
+ if (msg.body == null) return;
+ initForm(msg.body);
+ break;
+
+ case 'update-form':
+ if (msg.id == null || msg.value === undefined) return;
+ updateForm(msg.id, msg.value);
+ break;
+
+ case 'message':
+ if (msg.body == null) return;
+ message(msg.body);
+ break;
+
+ case 'set':
+ if (msg.pos == null) return;
+ set(msg.pos);
+ break;
+
+ case 'check':
+ if (msg.crc32 == null) return;
+ check(msg.crc32);
+ break;
+ }
+ });
+
+ async function updateSettings(settings) {
+ const game = await Game.findOne({ _id: gameId });
+
+ if (game.is_started) return;
+ if (!game.user1_id.equals(user._id) && !game.user2_id.equals(user._id)) return;
+ if (game.user1_id.equals(user._id) && game.user1_accepted) return;
+ if (game.user2_id.equals(user._id) && game.user2_accepted) return;
+
+ await Game.update({ _id: gameId }, {
+ $set: {
+ settings
+ }
+ });
+
+ publishOthelloGameStream(gameId, 'update-settings', settings);
+ }
+
+ async function initForm(form) {
+ const game = await Game.findOne({ _id: gameId });
+
+ if (game.is_started) return;
+ if (!game.user1_id.equals(user._id) && !game.user2_id.equals(user._id)) return;
+
+ const set = game.user1_id.equals(user._id) ? {
+ form1: form
+ } : {
+ form2: form
+ };
+
+ await Game.update({ _id: gameId }, {
+ $set: set
+ });
+
+ publishOthelloGameStream(gameId, 'init-form', {
+ user_id: user._id,
+ form
+ });
+ }
+
+ async function updateForm(id, value) {
+ const game = await Game.findOne({ _id: gameId });
+
+ if (game.is_started) return;
+ if (!game.user1_id.equals(user._id) && !game.user2_id.equals(user._id)) return;
+
+ const form = game.user1_id.equals(user._id) ? game.form2 : game.form1;
+
+ const item = form.find(i => i.id == id);
+
+ if (item == null) return;
+
+ item.value = value;
+
+ const set = game.user1_id.equals(user._id) ? {
+ form2: form
+ } : {
+ form1: form
+ };
+
+ await Game.update({ _id: gameId }, {
+ $set: set
+ });
+
+ publishOthelloGameStream(gameId, 'update-form', {
+ user_id: user._id,
+ id,
+ value
+ });
+ }
+
+ async function message(message) {
+ message.id = Math.random();
+ publishOthelloGameStream(gameId, 'message', {
+ user_id: user._id,
+ message
+ });
+ }
+
+ async function accept(accept: boolean) {
+ const game = await Game.findOne({ _id: gameId });
+
+ if (game.is_started) return;
+
+ let bothAccepted = false;
+
+ if (game.user1_id.equals(user._id)) {
+ await Game.update({ _id: gameId }, {
+ $set: {
+ user1_accepted: accept
+ }
+ });
+
+ publishOthelloGameStream(gameId, 'change-accepts', {
+ user1: accept,
+ user2: game.user2_accepted
+ });
+
+ if (accept && game.user2_accepted) bothAccepted = true;
+ } else if (game.user2_id.equals(user._id)) {
+ await Game.update({ _id: gameId }, {
+ $set: {
+ user2_accepted: accept
+ }
+ });
+
+ publishOthelloGameStream(gameId, 'change-accepts', {
+ user1: game.user1_accepted,
+ user2: accept
+ });
+
+ if (accept && game.user1_accepted) bothAccepted = true;
+ } else {
+ return;
+ }
+
+ if (bothAccepted) {
+ // 3秒後、まだacceptされていたらゲーム開始
+ setTimeout(async () => {
+ const freshGame = await Game.findOne({ _id: gameId });
+ if (freshGame == null || freshGame.is_started || freshGame.is_ended) return;
+ if (!freshGame.user1_accepted || !freshGame.user2_accepted) return;
+
+ let bw: number;
+ if (freshGame.settings.bw == 'random') {
+ bw = Math.random() > 0.5 ? 1 : 2;
+ } else {
+ bw = freshGame.settings.bw as number;
+ }
+
+ function getRandomMap() {
+ const mapCount = Object.entries(maps).length;
+ const rnd = Math.floor(Math.random() * mapCount);
+ return Object.entries(maps).find((x, i) => i == rnd)[1].data;
+ }
+
+ const map = freshGame.settings.map != null ? freshGame.settings.map : getRandomMap();
+
+ await Game.update({ _id: gameId }, {
+ $set: {
+ started_at: new Date(),
+ is_started: true,
+ black: bw,
+ 'settings.map': map
+ }
+ });
+
+ //#region 盤面に最初から石がないなどして始まった瞬間に勝敗が決定する場合があるのでその処理
+ const o = new Othello(map, {
+ isLlotheo: freshGame.settings.is_llotheo,
+ canPutEverywhere: freshGame.settings.can_put_everywhere,
+ loopedBoard: freshGame.settings.looped_board
+ });
+
+ if (o.isEnded) {
+ let winner;
+ if (o.winner === true) {
+ winner = freshGame.black == 1 ? freshGame.user1_id : freshGame.user2_id;
+ } else if (o.winner === false) {
+ winner = freshGame.black == 1 ? freshGame.user2_id : freshGame.user1_id;
+ } else {
+ winner = null;
+ }
+
+ await Game.update({
+ _id: gameId
+ }, {
+ $set: {
+ is_ended: true,
+ winner_id: winner
+ }
+ });
+
+ publishOthelloGameStream(gameId, 'ended', {
+ winner_id: winner,
+ game: await pack(gameId, user)
+ });
+ }
+ //#endregion
+
+ publishOthelloGameStream(gameId, 'started', await pack(gameId, user));
+ }, 3000);
+ }
+ }
+
+ // 石を打つ
+ async function set(pos) {
+ const game = await Game.findOne({ _id: gameId });
+
+ if (!game.is_started) return;
+ if (game.is_ended) return;
+ if (!game.user1_id.equals(user._id) && !game.user2_id.equals(user._id)) return;
+
+ const o = new Othello(game.settings.map, {
+ isLlotheo: game.settings.is_llotheo,
+ canPutEverywhere: game.settings.can_put_everywhere,
+ loopedBoard: game.settings.looped_board
+ });
+
+ game.logs.forEach(log => {
+ o.put(log.color, log.pos);
+ });
+
+ const myColor =
+ (game.user1_id.equals(user._id) && game.black == 1) || (game.user2_id.equals(user._id) && game.black == 2)
+ ? true
+ : false;
+
+ if (!o.canPut(myColor, pos)) return;
+ o.put(myColor, pos);
+
+ let winner;
+ if (o.isEnded) {
+ if (o.winner === true) {
+ winner = game.black == 1 ? game.user1_id : game.user2_id;
+ } else if (o.winner === false) {
+ winner = game.black == 1 ? game.user2_id : game.user1_id;
+ } else {
+ winner = null;
+ }
+ }
+
+ const log = {
+ at: new Date(),
+ color: myColor,
+ pos
+ };
+
+ const crc32 = CRC32.str(game.logs.map(x => x.pos.toString()).join('') + pos.toString());
+
+ await Game.update({
+ _id: gameId
+ }, {
+ $set: {
+ crc32,
+ is_ended: o.isEnded,
+ winner_id: winner
+ },
+ $push: {
+ logs: log
+ }
+ });
+
+ publishOthelloGameStream(gameId, 'set', Object.assign(log, {
+ next: o.turn
+ }));
+
+ if (o.isEnded) {
+ publishOthelloGameStream(gameId, 'ended', {
+ winner_id: winner,
+ game: await pack(gameId, user)
+ });
+ }
+ }
+
+ async function check(crc32) {
+ const game = await Game.findOne({ _id: gameId });
+
+ if (!game.is_started) return;
+
+ // 互換性のため
+ if (game.crc32 == null) return;
+
+ if (crc32 !== game.crc32) {
+ connection.send(JSON.stringify({
+ type: 'rescue',
+ body: await pack(game, user)
+ }));
+ }
+ }
+}
diff --git a/src/server/api/stream/othello.ts b/src/server/api/stream/othello.ts
new file mode 100644
index 0000000000..bd3b4a7637
--- /dev/null
+++ b/src/server/api/stream/othello.ts
@@ -0,0 +1,29 @@
+import * as mongo from 'mongodb';
+import * as websocket from 'websocket';
+import * as redis from 'redis';
+import Matching, { pack } from '../models/othello-matching';
+import publishUserStream from '../event';
+
+export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void {
+ // Subscribe othello stream
+ subscriber.subscribe(`misskey:othello-stream:${user._id}`);
+ subscriber.on('message', (_, data) => {
+ connection.send(data);
+ });
+
+ connection.on('message', async (data) => {
+ const msg = JSON.parse(data.utf8Data);
+
+ switch (msg.type) {
+ case 'ping':
+ if (msg.id == null) return;
+ const matching = await Matching.findOne({
+ parent_id: user._id,
+ child_id: new mongo.ObjectID(msg.id)
+ });
+ if (matching == null) return;
+ publishUserStream(matching.child_id, 'othello_invited', await pack(matching, matching.child_id));
+ break;
+ }
+ });
+}
diff --git a/src/server/api/stream/requests.ts b/src/server/api/stream/requests.ts
new file mode 100644
index 0000000000..d7bb5e6c5c
--- /dev/null
+++ b/src/server/api/stream/requests.ts
@@ -0,0 +1,19 @@
+import * as websocket from 'websocket';
+import Xev from 'xev';
+
+const ev = new Xev();
+
+export default function(request: websocket.request, connection: websocket.connection): void {
+ const onRequest = request => {
+ connection.send(JSON.stringify({
+ type: 'request',
+ body: request
+ }));
+ };
+
+ ev.addListener('request', onRequest);
+
+ connection.on('close', () => {
+ ev.removeListener('request', onRequest);
+ });
+}
diff --git a/src/server/api/stream/server.ts b/src/server/api/stream/server.ts
new file mode 100644
index 0000000000..4ca2ad1b10
--- /dev/null
+++ b/src/server/api/stream/server.ts
@@ -0,0 +1,19 @@
+import * as websocket from 'websocket';
+import Xev from 'xev';
+
+const ev = new Xev();
+
+export default function(request: websocket.request, connection: websocket.connection): void {
+ const onStats = stats => {
+ connection.send(JSON.stringify({
+ type: 'stats',
+ body: stats
+ }));
+ };
+
+ ev.addListener('stats', onStats);
+
+ connection.on('close', () => {
+ ev.removeListener('stats', onStats);
+ });
+}