精品 快速 fork 多个进程的 fork-list 模块
 发布于 11 年前  作者 lellansin  7943 次预览  最后一次回复是 11 年前  来自 分享 

Fork List NPM Version Build Status

前一阵子因为工作需要, 要写一个中转服务器, 然后发现用 node.js 做多进程不是想象中的那么轻松, 想找个现成的模块也没有发现什么好的模块 (官方的 cluster 确实没有想象中过的好), 于是就自己写了一个. 如你所见 fork-list 就是 fork a list of child processes. 该模块是一个 轻量级 的多进程 fork 模块, 使用该模块可以轻松的 fork 一堆子进程出来. 由于是一个轻量级的模块, 所以不推荐在较大的项目中使用, 比如一些分布式应用就不推荐. 不过在一些小项目里面用这个模块还是比较赞的. 比如很多人喜欢用 node.js 写爬虫, 但是 node.js 处理复杂计算效率不是很高, 这个时候我们可以把一些耗时的操作 fork 出去让 node.js 可以继续往下爬了, 那么这种情况就比较推荐使用 fork-list 了, 它可以很轻松的插入的你的项目中, 把耗时的操作交给其他进程去做.

安装

npm install fork-list

简单示例

var ForkList = require('fork-list');

// 指定要 fork 多进程的脚本
var path = './script/write';

// 进程数目
var num = 3;

var forks = new ForkList({
    path: path,
    num: num
});

for (var i = 0; i < 10; i++) {
    // 向子进程发送数据
    forks.send('hello~', i);
}

// 关闭所有子进程
forks.shutdown();

./script/write.js 文件内容:

var Forks = require('fork-list');

Forks.proc(function(data1, data2) {
    console.log('Work id:', this.workid, 'recv data1:', data1, 'data2:', data2);
});

输出:

Work id: 1 recv data1: hello~ data2: 1
Work id: 2 recv data1: hello~ data2: 0
Work id: 0 recv data1: hello~ data2: 5
Work id: 1 recv data1: hello~ data2: 2
Work id: 2 recv data1: hello~ data2: 3
Work id: 1 recv data1: hello~ data2: 9
Work id: 2 recv data1: hello~ data2: 4
Work id: 0 recv data1: hello~ data2: 6
Work id: 2 recv data1: hello~ data2: 7
Work id: 2 recv data1: hello~ data2: 8

简单应用

以网络爬虫为例吧, 首先是 curl.js 文件:

var ForkList = require('fork-list');

// 来 fork 四个子进程
var dao_list = new ForkList({
    path: './daoScript',
    num: 4
});

exports.curl = function(str) {
    // 做点什么, 最后得到你想要的如下数据 
    var title, summary, keywork, resource;
    // 扔给子进程
    dao_list.send(title, summary, keywork, resource);
}

这里提供一个趴取网页具体内容的接口, 处理一些基础信息. 然后把基础信息丢给子进程, 让数据库去处理比较耗时的操作 (如文件读写, 数据库读写).

var dao = require('./recordDao');
var forkList = require('fork-list');

forkList.proc(function(title, summary, keywork, resource) {
    // 那么这里就是子进程了, 通过 this.workid 可以看到当前子进程的编号

    // 你可以在这里把 resource down 下来保存成文件
    var path = wget(resource);

    // 然后把你要的数据写库
    dao.record(title, summary, keywork, path);
});

这样就轻松的甩掉了一些耗时的操作. 让主进程可以处理更多的任务. 值得一提的是, 博主拿网页也是通过该模块 fork 来执行的, 情况大概是这样的, 扫面一个 ip 有没有开发 80 端口, 有的话就把这个 ip/host 扔给子进程 curl-list 去请求网页, 然后简单的处理了一下网页之后, 耗时的操作又扔给另一个子进程 dao-list 去从存. 这样一说你应该能够比较直观的感受到这个模块的灵活性了. 由于 cpu 核数目的问题所以我都只是 fork 2个, 这个项目的进程大概像下面这样:

主进程 扫描 ip |—> 爬网页 |—> 耗时的写入文件存库等 |—> 耗时操作… |—> 爬网页 |—> 耗时操作… |—> 耗时操作…

文档

初始化

  • new
  • count
  • setClassifier
  • setLogger

传输

  • send
  • forward
  • proc

控制

  • kill
  • killByPid
  • shutdown

事件

  • onExit
  • onError
  • onFinish

初始化

new(path)

根据指定的 path 路径 fork 单个子进程. ( 你也可以通过 构造参数指定来配置指定简单的fork多个, 详见开头的例子 )

count()

获取当前子进程总数

setClassifier(classifier)

主进程发送给子进程的数据, 是通过分类器来分派给子进程的. 你可以通过该函数来设置自定义的分类器. 例如根据数据内容来指定分流到某几个子进程, 或者根据自己写好的随机算法来指派子进程, 亦或是自己维护一个类似在线列表的东西根据子进程的负载来指定发送到那个子进程.

setLogger(logger)

启用或关闭 fork-list 调试, 或者设置指定的 logger 来输出调试信息 (例如使用 log4js.getLogger 得到的 logger ).

示例

var ForkList = require('fork-list');
var underscore = require('underscore');

var times = 10;
var forks = new ForkList();

forks.new('./script');
forks.new('./script');

forks.setClassifier(function(msg, done) {
    var id = underscore.random(0, forks.count() - 1);
    done(null, id);
});

for (var i = 0; i < times; i++) {
    forks.send(i, 'some msg');
}

forks.shutdown();

传输

send(msg, …, cb)

【主进程】: 传输普通数据

var forkList = require('fork-list');

// 指定要 fork 多进程的脚本
var path = './test';

// 进程数目
var num = 2;

var forks = new ForkList({
    path: path,
    num: num
});

// 发送数据到子进程
forks.send(somedata, ...);

普通数据包括:

  • Number
  • String
  • Array
  • JSON
  • Object*

注意 Object 数据不会完整传输给子进程:

var forks = new ForkList({
    path: path
});

function test(name) {
    this.name = name || 'default';
}

test.prototype.hi = function() {
    console.log('my name is ', this.name);
};

for (var i = 0; i < times; i++) {
    var t = new test('Alan' + i);
    forks.send(i, t);
}

你在子进程中只会受到形如 { name: 'AlanX' } 的基础数据, prototype 上的东西都不会传输, 无法在子进程中调用 .hi 方法.

forward(type, handleObject, …, cb)

【主进程】: 传输 Handle object, 包括 server 对象 和 socket 对象.

Socket 传输示例

server.js

var net = require('net');
var config = require('./my_config');
var ForkList = require("fork-list");

var num = 2;
var path = './worker';

var workers = new ForkList({
    path: path,
    num: num
});

var server = net.createServer();

server.on('connection', function(sock) {

    workers.foward('socket', sock);

    sock.on('error', function(e) {
        console.log('[server] Error:', e);
    });
});

server.listen(config.port);

console.log('test socket server start');

worker.js

var ForkList = require('fork-list');

ForkList.proc(function(sock) {
    var workid = this.workid;
    sock.on('data', function(data) {
        console.log('[worker] id:', workid, 'data:', data.toString());
    });
});

test client.js

var config = require('./my_config');
var net = require('net');

var client = net.connect({
    hostname: config.host,
    port: config.port
});

client.on('connect', function() {
    var str = 'hello ';
    for (var i = 0; i < 100; i++) {
        client.write(str + i + ' ');
    }
    client.end();
});

client.on('end', function() {
    console.log('client send over!');
});

proc(cb)

【子进程】: 获取主进程传输来的数据.

var forkList = require('fork-list');

forkList.proc(function(msg, ...) {
    /* code */
});

控制

kill(workid)

根据子进程的工作id来结束强制子进程. (工作id 从0 开始, 到 .count() - 1) 子进程结束的时候会触发 exit 事件.

killByPid(pid)

根据子进程的 pid 来结束强制子进程

shutdown()

强制结束所有子进程. 当所有子进程 exit 之后, 会触发 finish 事件.

事件

onExit

onError

onFinish

Example:

var ForkList = require('fork-list');

var path = './script';
var num = 3;

var forks = new ForkList({
    path: path,
    num: num,
    log: true
});

for (var i = 10; i >= 0; i--) {
    forks.send('hello ' + i);
};

forks.on('error', function(err, pid) {
    console.log('--> Error:', err.message, 'pid:', pid);
});

forks.on('exit', function(pid) {
    console.log('--> Child process exit, pid:', pid);
    forks.killByPid(pid);
});

forks.on('finish', function() {
    console.log('--> All of child process has exited');
});

forks.shutdown();

script.js

var Forks = require('fork-list');

Forks.proc(function(data1, data2) {
    console.log('Work id:', this.workid, 'recv data1:', data1);
});

Output:

Work id: 0 recv data1: hello 10
Work id: 0 recv data1: hello 8
Work id: 0 recv data1: hello 7
Work id: 2 recv data1: hello 9
Work id: 0 recv data1: hello 6
Work id: 1 recv data1: hello 5
Work id: 0 recv data1: hello 2
Work id: 0 recv data1: hello 0
Work id: 1 recv data1: hello 4
Work id: 1 recv data1: hello 3
Work id: 1 recv data1: hello 1
--> Child process exit, pid: 15064
--> Error: IPC channel is already disconnected pid: 15064
--> Child process exit, pid: 13072
--> Error: IPC channel is already disconnected pid: 13072
--> Child process exit, pid: 4900
--> Error: IPC channel is already disconnected pid: 4900
--> All of child process has exited

测试

测试命令:

cd node_modules/fork-list
npm test

license

MIT

Github

https://github.com/Lellansin/node-forklist

4 回复
alsotang

留个 github 地址

lellansin

@alsotang 刚刚才看到, 已经加了 :-)

xujun52011

进程的开销还是太大了, 而且进程间通信的效率也不是很高, 推荐用tagg2

lellansin

@xujun52011 吴老师的这个也很赞, 只不过这个模块有点 重量级. 很久以前用的时候还不支持 windows, 不过现在好像好很多了, 在 nodejs 里面用多线程确实很赞. fork 多进程的时候每个进程都会话费几M大小的内存开销, 所以如果 fork 上千个的话内存估计就会爆掉, 这也是我写这个 fork list 的初衷—— fork 特定数目的进程然后方便的把消息转发给子进程. 这个 fork-list 的通信是用原生的 nodejs 接口实现的, 所以更轻量级, 某种程度上说也会更稳定一些, 比较小的项目还是推荐的.