2017-04-07 01:29:05 +02:00
'use strict';
2018-04-12 16:40:58 +00:00
const db = require('../models/db');
const Sequelize = require('sequelize');
const Op = Sequelize.Op;
2017-04-07 01:29:05 +02:00
2018-03-30 22:34:27 +02:00
const config = require('config');
2017-04-07 01:29:05 +02:00
const WebSocketServer = require('ws').Server;
2018-04-12 16:40:58 +00:00
//const RedisConnection = require('ioredis');
2017-04-07 01:29:05 +02:00
const async = require('async');
const _ = require("underscore");
const crypto = require('crypto');
2018-03-30 22:34:27 +02:00
const redisMock = require("./redis.js");
2018-01-08 00:08:42 +01:00
2017-04-07 01:29:05 +02:00
module.exports = {
2018-01-08 00:08:42 +01:00
startWebsockets: function(server) {
2017-04-07 01:29:05 +02:00
2018-01-08 00:08:42 +01:00
2018-03-30 22:34:27 +02:00
if (!this.current_websockets) {
if (config.get("redis_mock")) {
this.state = redisMock.getConnection();
} else {
this.state = new RedisConnection(6379, process.env.REDIS_PORT_6379_TCP_ADDR || config.get("redis_host"));
2017-04-07 01:29:05 +02:00
this.current_websockets = [];
const wss = new WebSocketServer({ server:server, path: "/socket" });
wss.on('connection', function(ws) {
this.state.incr("socket_id", function(err, socketCounter) {
const socketId = "socket_" + socketCounter + "_" + crypto.randomBytes(64).toString('hex').substring(0,8);
const serverScope = this;
ws.on('message', function(msgString){
const socket = this;
const msg = JSON.parse(msgString);
if(msg.action == "auth"){
const token = msg.auth_token;
const editorName = msg.editor_name;
const editorAuth = msg.editor_auth;
const spaceId = msg.space_id;
2018-04-12 16:40:58 +00:00
db.Space.findOne({where: {"_id": spaceId}}).then(space => {
2017-04-07 01:29:05 +02:00
if (space) {
const upgradeSocket = function() {
if (token) {
2018-04-12 16:40:58 +00:00
db.findUserBySessionToken(token, function(err, user) {
2017-04-07 01:29:05 +02:00
if (err) {
console.error(err, user);
} else {
if (user) {
serverScope.addUserInSpace(user._id, space, ws, function(err){
serverScope.addLocalUser(user._id, ws);
console.log("[websockets] user " + user.email + " online in space " + space._id);
} else {
const anonymousUserId = space._id + "-" + editorName;
if(space.access_mode == "private" && space.edit_hash != editorAuth){
console.error("closing websocket: unauthed.");
ws.send(JSON.stringify({error: "auth_failed"}));
// ws.close();
serverScope.addUserInSpace(anonymousUserId, space, ws, function(err){
serverScope.addLocalUser(anonymousUserId, ws);
console.log("[websockets] anonymous user " + anonymousUserId + " online in space " + space._id);
if (!ws.id) {
ws['id'] = socketId;
try {
ws.send(JSON.stringify({"action": "init", "channel_id": socketId}));
} catch (e) {
console.log("ws.send error: "+e);
if (ws.space_id) {
serverScope.removeUserInSpace(ws.space_id, ws, function(err) {
} else {
} else {
ws.send(JSON.stringify({error: "space not found"}));
} else if (msg.action == "cursor" || msg.action == "viewport" || msg.action=="media") {
msg.space_id = socket.space_id;
msg.from_socket_id = socket.id;
serverScope.state.publish('cursors', JSON.stringify(msg));
ws.on('close', function(evt) {
console.log("websocket closed: ", ws.id, ws.space_id);
const spaceId = ws.space_id;
serverScope.removeUserInSpace(spaceId, ws, function(err) {
this.removeLocalUser(ws, function(err) {
ws.on('error', function(ws, err) {
console.error(err, res);
setupSubscription: function() {
2018-03-30 22:34:27 +02:00
if (config.get("redis_mock")) {
this.cursorSubscriber = redisMock.getConnection().subscribe(['cursors', 'users', 'updates'], function (err, count) {
console.log("[redis-mock] websockets subscribed to " + count + " topics." );
} else {
this.cursorSubscriber = new RedisConnection(6379, process.env.REDIS_PORT_6379_TCP_ADDR || config.get("redis_host"));
this.cursorSubscriber.subscribe(['cursors', 'users', 'updates'], function (err, count) {
console.log("[redis] websockets subscribed to " + count + " topics." );
2017-04-07 01:29:05 +02:00
this.cursorSubscriber.on('message', function (channel, rawMessage) {
const msg = JSON.parse(rawMessage);
const spaceId = msg.space_id;
const websockets = this.current_websockets;
if(channel === "updates") {
for(let i=0;i<websockets.length;i++) {
const ws = websockets[i];
if(ws.readyState === 1) {
} else if(channel === "users") {
const usersList = msg.users;
if (usersList) {
for(let i=0;i<usersList.length;i++) {
const activeUser = usersList[i];
let user_id;
if (activeUser._id) {
user_id = activeUser._id;
} else {
user_id = spaceId + "-" + (activeUser.nickname||"anonymous");
for (let a=0; a < websockets.length; a++) {
const ws = websockets[a];
if(ws.readyState === 1){
if(ws.space_id == spaceId) {
ws.send(JSON.stringify({"action": "status_update", space_id: spaceId, users: usersList}));
} else {
//console.log("space id not matching", spaceId, ws.space_id);
} else {
console.error("socket in wrong state", ws.readyState);
if(ws.readyState == 3) {
this.removeLocalUser(ws, (err) => {
console.log("old websocket removed");
} else {
console.error("userlist undefined for websocket");
} else if(channel === "cursors") {
const socketId = msg.from_socket_id;
for (let i=0;i<websockets.length;i++) {
const ws = websockets[i];
if (ws.readyState === 1) {
if (ws.space_id && spaceId) {
if ((ws.space_id == spaceId) && (ws.id !== socketId)) {
} else {
console.log("space id not set, ignoring");
addLocalUser: function(username, ws) {
if (ws.added) {
ws.added = true;
removeLocalUser: function(ws, cb) {
const idx = this.current_websockets.indexOf(ws);
if(idx > -1) {
this.removed_items = this.current_websockets.splice(idx, 1);
console.log("removed local socket, current online on this process: ", this.current_websockets.length);
} else {
console.log("websocket not found to remove");
2018-01-08 00:08:42 +01:00
this.state.del(ws.id+"", function(err, res) {
2017-04-07 01:29:05 +02:00
if (err) console.error(err, res);
else {
this.removeUserInSpace(ws.space_id, ws, (err) => {
console.log("removed user from space list");
addUserInSpace: function(username, space, ws, cb) {
console.log("[websockets] user "+username+" in "+space.access_mode +" space " + space._id + " with socket " + ws.id);
2018-01-08 00:08:42 +01:00
this.state.set(ws.id+"", username+"", function(err, res) {
2017-04-07 01:29:05 +02:00
if(err) console.error(err, res);
else {
this.state.sadd("space_" + space._id, ws.id, function(err, res) {
if(err) cb(err);
else {
ws['space_id'] = space._id.toString();
removeUserInSpace: function(spaceId, ws, cb) {
2018-01-08 00:08:42 +01:00
this.state.srem("space_" + spaceId, ws.id+"", function(err, res) {
2017-04-07 01:29:05 +02:00
if (err) cb(err);
else {
console.log("[websockets] socket "+ ws.id + " went offline in space " + spaceId);
ws['space_id'] = null;
if (cb)
distributeUsers: function(spaceId) {
2018-04-12 16:40:58 +00:00
if (!spaceId)
2017-04-07 01:29:05 +02:00
2018-04-12 16:40:58 +00:00
/*this.state.smembers("space_" + spaceId, function(err, list) {
2017-04-07 01:29:05 +02:00
async.map(list, function(item, callback) {
this.state.get(item, function(err, userId) {
console.log(item, "->", userId);
callback(null, userId);
}.bind(this), function(err, userIds) {
const uniqueUserIds = _.unique(userIds);
const validUserIds = _.filter(uniqueUserIds, function(uId) {
return mongoose.Types.ObjectId.isValid(uId);
const nonValidUserIds = _.filter(uniqueUserIds, function(uId) {
return (uId !== null && !mongoose.Types.ObjectId.isValid(uId));
const anonymousUsers = _.map(nonValidUserIds, function(nonValidId) {
const realNickname = nonValidId.slice(nonValidId.indexOf("-")+1);
return {nickname: realNickname, email: null, avatar_thumbnail_uri: null };
2018-04-12 16:40:58 +00:00
db.User.findAll({where: {
"_id" : { "$in" : validUserIds }}, attributes: ["nickname","email","avatar_thumbnail_uri"]})
.then(users) {
2017-04-07 01:29:05 +02:00
const allUsers = users.concat(anonymousUsers);
const strUsers = JSON.stringify({users: allUsers, space_id: spaceId});
this.state.publish("users", strUsers);
2018-04-12 16:40:58 +00:00
2017-04-07 01:29:05 +02:00
2018-04-12 16:40:58 +00:00
2017-04-07 01:29:05 +02:00