socket.io线上生产环境有经验的请进
发布于 12 年前 作者 samoin 22543 次预览 最后一次回复是 12 年前 来自
这两天简单的用socket.io做了下封装来实现用户在线状态的统计,在实际应用中,遇到了很诡异的问题。
简单来说,就说在部分机器上会重复的给服务器发送连接请求,按照昨天出问题后回看的日志中,最多的会发送超过1000次,不知道现在各位是否有遇到同样的情况,如果有的话,求解。
transports策略为[“websocket” , “htmlfile” , “xhr-polling” , “jsonp-polling”],和express进行了整合(socket.io:0.9.14,express:3.2.5,node.js:v0.10.5,策略里没有flashsocket的原因是实际测试发现这种模式建立连接要等待好几秒,同时默认的重连策略感觉大概超过1-2秒以后就无法自动重连)。
大致的流程就是客户端浏览器在建立连接后将一些信息发送给服务端,然后服务端进行状态保持,当监听到用户断开后,再更新用户状态。
举例出问题的连接(最夸张的一个): “Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)-******”:1518 按照useragent来解析,这台用户的机器是winxp,用的是ie8,到出问题为止一共发送了1518次连接请求,但是我用同样的环境来连就不会出现这种情况(我本机就是这样)。
不知道问题这样描述是否详细,。
39 回复
补充下,我在代码里还用到了cluster,默认配置只启动一个子进程
首先,感谢您的回复
确定你的浏览器支持websocket? 这个应该不是重点,用socket.io就是想利用他现成的长连接的解决方案
禁用htmlfile协议,如果启用了,它与gzip是不相容的,忘了具体原因,现在也没空给你看线上配置 这个是否说要在transports策略中去掉htmlfile
前端可以禁止socket.io重连配置,你自己监控disconnect事件,然后重连 现在我用的是默认的自动连接方案,回头我试下。
顺便问下,请问你们现在线上的服务是要求用户使用只支持websocket的浏览器吗?
哦,行吧,谢谢
问题依旧,郁闷,总会有这种不断连接的现象出现,从而导致端口不释放出问题。 (node) warning: possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit. Trace at Socket.EventEmitter.addListener (events.js:160:15) at Socket.Readable.on (_stream_readable.js:663:33) at Socket.EventEmitter.once (events.js:179:8) at TCP.onread (net.js:527:26)
我已经按照网上的一些做法对app,io以及process都增加了设置 app.setMaxListeners(0); process.setMaxListeners(0); io.setMaxListeners(0); 但是仍然出现问题,求各种help
出问题的时候,node的进程cpu占满,请求过去一直不会返回。
你的代码应该写的有问题,方便贴出来吗?
方便,这个需要全部贴出来吗?还是只需要其中某一部分
配置文件socket.io-chat-loginpro.js var arr = []; arr.push({pass : “admin” , pwd : “admin123456” , type : “admin”}); var socketioProperty = { “log level” : 0 , “transports” : [“websocket” , “xhr-polling” , “jsonp-polling”] , “close timeout” : 25 , “heartbeat timeout” : 25 , “heartbeat interval” : 12 , “polling duration” : 10};
//服务开放的端口号 exports.port=443; //用来配置后台可登录的用户,这里没有数据落地,如需数据落地,可进行相关代码的修改 exports.userList=arr; //对express的logger的配置,调用为app.use(express.logger(‘dev’)); exports.expressLogger=""; //对socket.io的一些设置,目前只做了这一些,如有需求,可继续添加和修改 exports.socketioProperty = socketioProperty; //memoryStore的ReapInterval exports.memoryStoreReapInterval = 1000 * 60 * 10; //session的Secret-key exports.sessionSecret = ‘socket.io-chat’; //是否允许唯一key多个浏览器窗口登录 exports.canMoreClientForUqKey = true; //在不允许唯一key多个浏览器窗口登录的前提下,是否发送脚本踢掉上一个登录用户 exports.kickMoreClientForUqKey = true; //如需要踢人,对应调用的函数名 exports.kickMoreClientForUqKeyFuncName = “chat.kickUser()”; //启动node对应的主文件名,用来统计资源消耗用 exports.appName = “socket.io-chat.js”; //统计日志的间隔时间,秒为单位 exports.staticSec = 60; //服务对应的外网地址和port exports.workUrl=“http://127.0.0.1:443”; //mongodb的集群的地址,建议您使用repSet集群配置 exports.mongodbRep = “mongodb://127.0.0.1:47022,127.0.0.1:47023”;
主文件socket.io-chat.js var loginPro = require("./socket.io-chat-loginpro") , routesIndex = require(’./routes/index’); var exec = require(‘child_process’).exec; //服务对应的外网地址和port var workUrl = loginPro.workUrl; //启动node对应的主文件名,用来统计资源消耗用 var appName = loginPro.appName; //统计日志的间隔时间,秒为单位 var staticSec = loginPro.staticSec; //是否允许唯一key多个浏览器窗口登录 var canMoreClientForUqKey = loginPro.canMoreClientForUqKey; //在不允许唯一key多个浏览器窗口登录的前提下,是否发送脚本踢掉上一个登录用户 var kickMoreClientForUqKey = loginPro.kickMoreClientForUqKey; //如需要踢人,对应调用的函数名 var kickMoreClientForUqKeyFuncName = loginPro.kickMoreClientForUqKeyFuncName; //该进程内的所有聊天室的相关信息,map结构 var chatMap = {}; //命令的备注(尽量不要修改,在前台的js调用中需同步复制一份,并遵循命令规则) cmdObj = {“register” : 1 , “leave” : 2 , “talk” : 3 , “func” : 4}; // 当前在线用户人数 var userCount = 0; var parseCookie = require(‘connect’).utils.parseCookie , MemoryStore = require(‘connect/lib/middleware/session/memory’); //建立一个memory store的实例 var storeMemory = new MemoryStore({ reapInterval: loginPro.memoryStoreReapInterval }); var express = require(‘express’), app, server, io; app = express(); app.setMaxListeners(0); var cluster = require(‘cluster’); // mongodb var clusterList = require("./model/clusterList") ,clusterListDao = clusterList.dao; var chatQueen = require("./model/chatQueen") ,chatQueenDao = chatQueen.dao; // 最新同步记录的最大时间 var lastSyncDate = new Date().getTime(); // 是否正在进行同步的相关操作 var isSyncQueen = false; if (cluster.isMaster) { var numCPUs = require(‘os’).cpus().length; numCPUs = 1; for ( var i = 0; i < numCPUs; i++) { cluster.fork(); } //server.serverCluster = cluster.workers; // this is for version 0.8.x + cluster.on(‘exit’, function(worker, code, signal) { console.log(“worker " , worker.process.pid , " exit , restarting …”); cluster.fork(); }); } else if(cluster.isWorker){ process.setMaxListeners(0); createServer(); }
function createServer(){ startSyncMongodb(true); }
function insertChatQueen(data){ var dao = new chatQueenDao(); dao.work_url = workUrl; var msg = data.msg; dao.cmd = data.cmd; dao.userAgent = data.msg.userAgent; dao.uqKey = msg.uqKey; dao.moudle = msg.moudle; dao.randomKey = msg.randomKey; dao.ip = msg.ip; dao.toId = msg.toId; dao.from = msg.from; dao.info = msg.info; dao.create_time = new Date().getTime(); dao.save(function(err){ if(err){ console.log("system > cmd - " , data , " error writing to mongodb , " , err); } }); }
function getLastChatQueenList(date){ if(!isSyncQueen){ isSyncQueen = true; if(!date){ date = new Date().getTime(); } chatQueenDao.find({“create_time” : {$gt : date} , “work_url” : {$ne : workUrl}},null,function(err,data){ if(!err){ //console.log(JSON.stringify(data)); if(data && data.length > 0){ syncQueenList(data); } }else{ console.log(err); } }); } }
function syncQueenList(data){ var tmpTime = 0; for(var i = 0 ; i < data.length ; i++){ var msgObj = data[i]; //console.log(msgObj.toString()); solveQueenMsg(eval(msgObj.toJSON())); if(i == data.length - 1){ tmpTime = msgObj.toJSON().create_time; } } clusterListDao.update({“work_url” : workUrl},{last_synctime_flag : tmpTime},function(err, numAffected){ if(!err){//null:0 isSyncQueen = false; lastSyncDate = tmpTime; console.log(“system > syncQueenList ok”); //console.log(JSON.stringify(chatMap)); } });
}
function solveQueenMsg(msgObj){ var cmdMap = {msg:{isDummySocekt:true},cmd:0}; for(var col in msgObj){ if(col == “cmd”){ cmdMap.cmd = msgObj[col]; }else{ if(col.substring(0,1) != “_”){ cmdMap.msg[col] = msgObj[col]; } } } var dummySocket = cmdMap.msg; var cmd = cmdMap.cmd; // 下面的代码,从逻辑上和真实处理是一致的,但是没有对应的socket,为了满足之前的value结构,所以用一个模拟的对象来代替,在发送的时候要注意排除给这些对象的发送 if(cmd == cmdObj[“register”]){ var msg = dummySocket; var moduleKey = msg.moudle; var uqKey = msg.uqKey; var randomKey = msg.randomKey; if(!chatMap[moduleKey]){ chatMap[moduleKey] = {}; } if(!chatMap[moduleKey][uqKey]){ chatMap[moduleKey][uqKey] = {}; } if( !canMoreClientForUqKey){ chatMap[moduleKey][uqKey] = {}; } chatMap[moduleKey][uqKey][randomKey] = dummySocket; } if(cmd == cmdObj[“leave”]){ var msg = dummySocket; var moduleKey = msg.moudle; var uqKey = msg.uqKey; var randomKey = msg.randomKey; if(chatMap[moduleKey] && chatMap[moduleKey][uqKey] && getCountFromMap(chatMap[moduleKey][uqKey]) == 1){ emitByMoudle(moduleKey , cmdMap); } if(chatMap[moduleKey] && chatMap[moduleKey][uqKey]){ delete(chatMap[moduleKey][uqKey][randomKey]); } if(chatMap[moduleKey] && chatMap[moduleKey][uqKey] && getCountFromMap(chatMap[moduleKey][uqKey]) == 0){ delete(chatMap[moduleKey][uqKey]); } } if(cmd == cmdObj[“talk”] || cmd == cmdObj[“func”]){ var msg = dummySocket; var toId = msg.toId; var moduleKey = msg[“moudle”]; if(toId == “”){ //console.log(moduleKey, “emit to all”); emitByMoudle(moduleKey , msgObj); }else{ //console.log(moduleKey, “emit to <” + toId + “>”); emitByMoudleAndId(moduleKey , toId , msgObj); emitByMoudleAndId(moduleKey , msg[“uqKey”] , msgObj); } } //console.log(JSON.stringify(dummySocket)); }
function startSyncMongodb(isInit){ // 入库mongodb,更新自己的状态为在线 clusterListDao.find({“work_url” : workUrl},null,function(err,data){ if(!err){ var dao = new clusterListDao(); if(data.length == 0){ dao.work_url = workUrl; //console.log(dao.work_url); dao.save(function(err){ if(!err){//null if(isInit){ startServer(); } } console.log(“system > create clusterList ok”); }); }else{ data = data[0]; var obj = {is_alive : true , browser_count : userCount}; if(isInit){ obj[“last_synctime_flag”] = new Date().getTime(); } clusterListDao.update({_id:data._id},obj,function(err, numAffected){ if(!err){//null:0 if(isInit){ startServer(); } console.log(“system > update clusterList ok”); } }); lastSyncDate = data.last_synctime_flag; } } }); }
function startServer(){ setInterval(function(){getResByShell();},staticSec * 1000); server = require(‘http’).createServer(app); io = require(‘socket.io’).listen(server); io.setMaxListeners(0); server.listen(loginPro.port);
function renderAllChatMapJsonStr(isAll){ //console.log(isAll); //console.log(chatMap); var result = {}; for(var room in chatMap){ var userListMap = chatMap[room]; //console.log(userListMap); var map1 = {}; for(uqKey in userListMap){ map1[uqKey] = getCountFromMap(userListMap[uqKey]); } result[room] = map1; } if(getCountFromMap(result) == 0){ return “{}”; } if(isAll){ var c = 0; //console.log©; for(var k in result){ var usrMap = result[k]; for(var k2 in usrMap){ //console.log(usrMap[k2]); c += usrMap[k2]; } } userCount = c; result[“totalCount”] = userCount; } return JSON.stringify(result); } /**
//根据所在模块(房间)给该模块下的所有socket推送消息 function emitByMoudle(moudleKey , data){ for(var key in chatMap[moudleKey]){ emitByMoudleAndId(moudleKey , key , data); } } //根据所在模块(房间)给对应的唯一id推送消息 function emitByMoudleAndId(moudleKey , uqId , data){ if(chatMap[moudleKey]){ var socketTmpObj = chatMap[moudleKey][uqId]; //console.log(uqId); emitByMap(moudleKey,socketTmpObj,data); }else{ console.log(“key > “,moudleKey,” does not exist in chatMap,no socket to send…”); } } //根据所在模块(房间)给对应的socket集合推送消息 function emitByMap(moudleKey,socketTmpObj,data){ for(var key in socketTmpObj){ var socketTmp = socketTmpObj[key]; emitBySocket(moudleKey,socketTmp,data) } } //根据所在模块(房间)给对应的socket推送消息 function emitBySocket(moudleKey,socketTmp,data){ if(socketTmp){ try{ // 下面的代码,从逻辑上和真实处理是一致的,但是没有对应的socket,为了满足之前的value结构,所以用一个模拟的对象来代替,在发送的时候要注意排除给这些对象的发送 if(!socketTmp.isDummySocekt){ socketTmp.emit(“msgs” , data); } }catch(e){ console.log("emit error >> " , e); } }else{ delete(chatMap[moudleKey][uqId][key]); } } //从object(map)对象中,返回存在的key的个数 function getCountFromMap(map){ var count = 0; for(var key in map){ count++; } return count; } //从object(map)对象中,返回存在的key的详细信息汇总 function getStaticInfoFromMap(map){ var arr = [];; for(var key in map){ var objTmp = map[key]; var tmp = {}; tmp[“ip”] = objTmp[“ip”]; tmp[“uqKey”] = objTmp[“uqKey”]; tmp[“moudle”] = objTmp[“moudle”]; tmp[“randomKey”] = objTmp[“randomKey”]; arr.push(tmp); } return arr; } function judgeErrorConnect(){
} var currentCountArr = []; // 获取当前node所占的资源 function getResByShell() { //“ps aux| grep ‘node’ " dir exec(“ps aux| grep ‘node’ “, function(err, output) { if (err) { throw err; } if (output.length > 0) { // USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND // root 1211 0.0 0.1 659800 10512 ? Sl Mar13 0:36 node app2.js var tmpArr = output.split(”\n”); var resultArr = new Array(); for ( var i = 0; i < tmpArr.length; i++) { var infoTmp = tmpArr[i]; if (typeof (infoTmp) == “object”) { continue; } var infoTmpArr = infoTmp.split(” “); var tmpIndex = 0; var nodeInfo = {}; if (infoTmpArr.length < 2) { continue; } for ( var j = 0; j < infoTmpArr.length; j++) { var infoTmpArrTmp = infoTmpArr[j]; if (infoTmpArrTmp.replace(/ /gim, “”).length > 0) { if (tmpIndex == 0) { nodeInfo[“USER”] = infoTmpArrTmp; } if (tmpIndex == 1) { nodeInfo[“PID”] = infoTmpArrTmp; } if (tmpIndex == 2) { nodeInfo[”%CPU"] = infoTmpArrTmp; } if (tmpIndex == 3) { nodeInfo["%MEM"] = infoTmpArrTmp; } if (tmpIndex == 4) { nodeInfo[“VSZ”] = infoTmpArrTmp; } if (tmpIndex == 5) { nodeInfo[“RSS”] = infoTmpArrTmp; } if (tmpIndex == 6) { nodeInfo[“TTY”] = infoTmpArrTmp; } if (tmpIndex == 7) { nodeInfo[“STAT”] = infoTmpArrTmp; } if (tmpIndex == 8) { nodeInfo[“START”] = infoTmpArrTmp; } if (tmpIndex == 9) { nodeInfo[“TIME”] = infoTmpArrTmp; } if (tmpIndex == 10) { nodeInfo[“COMMAND1”] = infoTmpArrTmp; } if (tmpIndex == 11) { nodeInfo[“COMMAND2”] = infoTmpArrTmp; } tmpIndex++;
}
// extend Date Date.prototype.format = function(format) { var o = { “M+” : this.getMonth() + 1, // month “d+” : this.getDate(), // day “h+” : this.getHours(), // hour “m+” : this.getMinutes(), // minute “s+” : this.getSeconds(), // second “q+” : Math.floor((this.getMonth() + 3) / 3), // quarter “S” : this.getMilliseconds() // millisecond }
} function getNow() { return getNowByFmt(“yyyy-MM-dd hh:mm:ss”); } function getToday() { return getNowByFmt(“yyyy-MM-dd”); } function getNowByFmt(fmt) { return new Date().format(fmt); } function getDateByFmt(date,fmt) { return date.format(fmt); }
其它的如route文件和mongoose的文件还需要贴吗。 整体写的比较乱,想做的就是一个实时在线的服务
前端页面里的代码 <input id=“chatName” type=“hidden”/><input id=“chatUserName” type=“hidden”/> <script type=“text/javascript” src=“http://XX.XX.XX.XX:443/socket.io/socket.io.js”></script> <script type=“text/javascript” src=“http://XX.XX.XX.XX:443/javascripts/browser.js”></script> <script type=“text/javascript” > //获得coolie 的值 function cookie(name){
var cookieArray=document.cookie.split("; “); //得到分割的cookie名值对
var cookie=new Object();
for (var i=0;i<cookieArray.length;i++){
var arr=cookieArray[i].split(”="); //将名和值分开
if(arr[0]==name)return unescape(arr[1]); //如果是指定的cookie,则返回它的值
} return “”; } WEBSSO_LID = cookie(“WEBSSO_LID”); $("#chatName").val(location.href); $("#chatUserName").val(“test” + parseInt(Math.random() * 1000000000000)); chat.login();
</script>
js文件browser.js var browserInfo = navigator.userAgent; var socketioHttp = “http://XX.XX.XX.XX:443”;
var chat = {}; chat.kickUser = function (){ alert(“抱歉,您已在其它地方登录!”); location.reload(); } chat.cmdObj = {“register” : 1 , “leave” : 2 , “talk” : 3 , “func” : 4}; chat.login = function (){ chat.connectServer(); }; chat.getParams = function (params){ if (params) { var arr1 = params.split("&"); var map = {}; for ( var i = 0; i < arr1.length; i++) { var arr2 = arr1[i].split("="); map[arr2[0]] = arr2.length > 1 ? arr2[1] : “”; } return map; } return {}; } chat.connectSocketIo = function(socketioHttp){ chat.socket.socket.reconnect(); } var reconnectInterval ; chat.connectServer = function(){ chat.userName = $("#chatUserName").val(); chat.moudle = $("#chatName").val(); if(!chat.userName || !chat.moudle){ return; } //chat.socket = io.connect(‘http://127.0.0.1:443’); chat.socket = io.connect(socketioHttp , {“reconnect” : false});
}; chat.talk = function(){ var talk_info = $("#talk_info").val(); var to_id = $("#to_id").val(); chat.socket.emit(“msgs” , {cmd : chat.cmdObj[“talk”] , msg : {from : chat.userName ,moudle : chat.moudle ,info : talk_info , toId : to_id}}); $("#talk_info").val(""); }; chat.changeTalk = function(uqName){ $("#to_id").val(uqName); };
顺便问下,您用的是node.js的什么版本,我在这个地方用的是v0.10.5,之前在另台机器上写了一个类似的服务,但是没用express和mongodb,那个是用的v0.6.12,那个就不会出这问题。 今天看到一个老外的帖子遇到类似的问题,他是在9和10版本出现,8这个版本就不会。 如您看到,请帮我查看下对应的版本,谢谢。
我用的node0.8.22,没有用mongodb,用了express3
你的代码太多了?看得晕。你先把自动重连给禁用了:
然后自己写重连,大体是这样的:
然后你查看下客户端日志,验证下重连的情况。我猜测你那个短时间内重练了1000多次,是因为那个浏览器使用的通信协议不对,然后一直连不上。
请问intervalID = setInterval(function(){},这句是做什么用的啊?
@sunuxreg 定时轮询、重连
@sumory Socket.IO基于WebSocket,不是没有轮询吗?
你说的这个,我已经修改了,去掉了自动重连,然后在client里增加了相关的重连代码。 按照你这样的情况来看,我大概确定怎么回事了,结合我之前功能的使用版本v0.6.x,应该是node.js的版本问题,我今天看了一篇国外的帖子,他那边跟我类似也是出现这种情况,V0.8.X版本正常,但是用到9,10就出现我这种报错。 感谢您最近这段时间的解答,后面我会切换回旧版本的node.js,传送们 https://github.com/joyent/node/issues/5108#issuecomment-16897815
@sunuxreg 谁说的socketio是基于websocket的?
@samoin 不错!找到问题就好
@sumory 个人觉得,socket.io相当于提供了一套解决方案,可以尽力兼容各种浏览器,维持类似长连接(全双工)的服务。作为技术人员更多的注意的只是业务逻辑,在这套方案的基础上做开发即可。
@sumory 应该是吧,如果支持WebSocket的话就会默认使用WebSocket,如果不支持的话会采用polling等方式,不是吗?
@sunuxreg 是这样的。但不能说是基于websocket的,它提供了很多协议,htmlfile、jsonp-polling、xhr-polling等,各个都是不同的。我记得socket.io的客户端版本在chrome和firefox上默认只能用websocket,要用其他的话,貌似要改源码,我是改了源码才用的。
@samoin
@sumory 那请问clearInterval(intervalID); 是指在*Polling中才用的吗?WebSocket的时候还用吗?
@sumory 不是在握手中,进行协议协商的时候,改变默认的ws协议就可以了吗?还要改动源码?长轮训等式为了兼容IE等不支持ws协议的浏览器而做的,应该是通用的,按理任何浏览器都会支持的吧。你怎么改动源码的?
@sunuxreg 都有用啊,这个只是在连接失败时重连接
socket.socket.reconnect();@halfblood 前端代码有判断浏览器,然后根据浏览器决定使用什么协议,我把那里改了。
@sumory 长轮询的方式应该在性能等方面都会比ws差些吧,你是什么应用场景啊,需要改成长轮询的方式?
@halfblood 要兼容所有浏览器.所以直接把ws禁了,维护起来也方便.
@sumory 那你对时间参数的设置是如何的。比如"close timeout" , “heartbeat timeout” , “heartbeat interval” , “polling duration” 这些,默认的感觉对非ws协议的断开判断等1分钟太长了,又不敢设置得太小,我现在是取的一半的值。
@samoin 我也取了几十秒。你要根据你的业务来判断到底选什么时间合适啊。我到时觉得这些值的设置不是重点,1分钟也好,30秒也好。客户端方面关键是你中断的这段时间内,要友好的提示用户,并做好重连前的善后处理。服务端方面,做好连接断开后的对象处理,比如是否要删掉不可用的socket对象,业务数据是否要重置等等。
@sumory 嗯,友好提示很重要。请问你们上线前有对socket.io压力测试吗,如果有的话请问是用的什么工具。目前为止只是听说要自己弄机器人来压,具体不清楚是怎么是想的,还请指点迷经,谢谢。
@samoin 去年在github上找了一个websocket的库模拟压测过,不过那个很老了都不更新了。我们的应用规模不大,所以没做太多性能测试。
@sumory 哦,好的,谢谢啊
@samoin 哦,对了,腾讯朋友网有做过一个使用node.js做长连接的分享,那个里有性能测试。其实只是用长连接的话,你自己用node.js写也很容易的。
@sumory 哦,谢谢。我之前有听过pamelo的演讲,他们的框架通讯也基于socket.io,我看他说是800+的并发,机器人就是从他那听到的,但是不知道自己怎么弄,呵呵。 嗯,之前在做页游的时候只是用flash来做通讯,现在公司的需求号称要兼容所有浏览器(感觉各种苦逼),就想到用socket.io来试试。现在考虑到在线并发的性能,所以想自己看看能支持多少。呵呵。
@samoin 我在我的普通虚机上测了下,前面加了一层nginx代理,测了下长连接的rps在500左右还能保持较高的响应速度。
@sumory 谢谢