summaryrefslogtreecommitdiff
path: root/src/api/stream/messaging.ts
blob: 71bf7a34c6171a4eb74204b97a2ca1ce984befe8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import * as mongodb from 'mongodb';
import * as websocket from 'websocket';
import * as redis from 'redis';
import Message from '../models/messaging-message';
import { publishMessagingStream } from '../event';

/**
 * Wires one websocket connection to the messaging stream between `user`
 * and the party named by the `otherparty` query parameter.
 *
 * - Subscribes `subscriber` to the channel
 *   `misskey:messaging-stream:<user._id>-<otherparty>` and forwards every
 *   message the subscriber receives to the websocket unchanged.
 * - Handles client commands sent as JSON text frames. The only supported
 *   command is `{ type: 'read', id }`, which marks the message `id` as read
 *   (only if `user` is its recipient and it is still unread) and publishes a
 *   `read` event.
 *
 * Frames that are not text, not valid JSON, or that carry an invalid id are
 * ignored rather than raising: the handler is async, so anything thrown in it
 * would surface as an unhandled promise rejection.
 *
 * @param request    The websocket upgrade request; its query string supplies `otherparty`.
 * @param connection The accepted websocket connection to the client.
 * @param subscriber Redis client used for the subscription.
 * @param user       The user owning the connection; only `_id` is read here.
 */
export default function messagingStream(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void {
	const otherparty = request.resourceURL.query.otherparty;

	// Subscribe messaging stream
	subscriber.subscribe(`misskey:messaging-stream:${user._id}-${otherparty}`);
	// The channel name is ignored: everything this subscriber receives is relayed.
	subscriber.on('message', (_, data) => {
		connection.send(data);
	});

	connection.on('message', async (data) => {
		// Commands are JSON text; binary frames carry no utf8Data.
		if (data.type !== 'utf8' || typeof data.utf8Data !== 'string') {
			return;
		}

		// The payload is untrusted client input: malformed JSON is ignored.
		let msg: any;
		try {
			msg = JSON.parse(data.utf8Data);
		} catch (e) {
			return;
		}

		// JSON such as `null` or `42` parses fine but is not a command object.
		if (msg == null || typeof msg !== 'object') {
			return;
		}

		switch (msg.type) {
			case 'read': {
				if (!msg.id) {
					return;
				}

				// The ObjectID constructor throws on ids it cannot interpret.
				let id: mongodb.ObjectID;
				try {
					id = new mongodb.ObjectID(msg.id);
				} catch (e) {
					return;
				}

				try {
					// Fetch message
					// SELECT _id, user_id, is_read
					// Filtering on recipient_id ensures only the recipient can mark it read.
					const message = await Message.findOne({
						_id: id,
						recipient_id: user._id
					}, {
						fields: {
							_id: true,
							user_id: true,
							is_read: true
						}
					});

					if (message == null) {
						return;
					}

					// Already read: nothing to update, and no event to publish.
					if (message.is_read) {
						return;
					}

					// Update documents
					await Message.update({
						_id: id
					}, {
						$set: { is_read: true }
					});

					// Publish event
					publishMessagingStream(message.user_id, user._id, 'read', id.toString());
				} catch (e) {
					// Log instead of letting the rejection escape the async handler.
					console.error('messaging stream: failed to mark message as read', e);
				}
				break;
			}
		}
	});
}