node-kafka中获取消息
发布于 9 年前 作者 songqinghehe 14033 次预览 最后一次回复是 8 年前 来自 问答
最近在写node-kafka的应用,但是遇到一个问题,就是读取消息的时候,node读取kafka的消息还没读完,如果不延时获取信息的话,是拿不到数据的,请问该如何操作? 以下为我写的代码:
exports.ognvPullMsg = function (req, res) {
var userId = req.body.userId;
if (userId == 'undefined' || userId <= 0) {
res.send('{"state":"301","message":"\u975e\u6cd5userId"}');
}
var Consumer = kafka.Consumer;
var client = new kafka.Client();
var consumer = new Consumer(client,[{ topic: 'ognv-backend-log-'+userId, partition: 0 }],{autoCommit: false});
var realData = [];
consumer.on('message', function (message) {
console.log(message);
realData.push(message);
});
setTimeout(function() {
res.send(JSON.stringify(realData));
}, 200);
}
12 回复
‘use strict’;
@yxz1025 你好,我第一次看你的代码是在csdn上,给你私信了,没有回复我,没想到在这里看到你了。
@yxz1025 我看你的代码时,每向args push信息的时候 都会调用load事件,最后可以拿到全部的数据么?
之前一直很忙 不好意思
你是要从kafka取数据是吧,kafka-node这种需要用co函数包装一下 不然你回调的数据保存不到数据库的
@yxz1025 目前的想法是获取此用户下面的所有数据并展示给用户,没有考虑存到数据库中
我这里通过emit函数将获取到的数据转到loader了,你可以引入一个 var Promise = require(“bluebird”); 然后写一个promise函数,详细代码如下:
@songqinghehe 下面写了一个 你看看,
@yxz1025 谢谢,我先看看,不懂得在请教你一下。
@songqinghehe 好的 只要把获取的流返回到promise就好了
@yxz1025 上面的代码有完整的代码库么,谢谢!
@yxz1025 consumerGroup也可以这些实现么?