Kafka 分布式消息系统示例
288 views
0
Kafka是Linkedin于2010年12月份开源的消息系统,它主要用于处理活跃的流式数据。活跃的流式数据在web网站应用中非常常见,这些数据包括网站的pv、用户访问了什么内容,搜索了什么内容等。 这些数据通常以日志的形式记录下来,然后每隔一段时间进行一次统计处理。
传统的日志分析系统提供了一种离线处理日志信息的可扩展方案,但若要进行实时处理,通常会有较大延迟。而现有的消(队列)系统能够很好的处理实时或者近似实时的应用,但未处理的数据通常不会写到磁盘上,这对于Hadoop之类(一小时或者一天只处理一部分数据)的离线应用而言,可能存在问题。Kafka正是为了解决以上问题而设计的,它能够很好地离线和在线应用。
1、下载
最新版本源代码 http://incubator.apache.org/kafka/downloads.html
2、安装
解压后,进入目录,执行
./sbt update ./sbt package
3、启动服务
./bin/zookeeper-server-start.sh config/zookeeper.properties ./bin/kafka-server-start.sh config/server.properties
4、编写producer,连续发送10万条消息
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;
public class producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
ProducerData<String, String> data ;
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (int i=0;i<=100000;i++)
{
data = new ProducerData<String, String>("test", df.format(new Date()));
producer.send(data);
}
}
}
5、编写consumer,接收消息
import java.nio.ByteBuffer;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
public class consumer{
public static String getMessage(Message message)
{
ByteBuffer buffer = message.payload();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new String(bytes);
}
public static void main(String[] args) {
SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000);
long offset = 0L;
while (true) {
FetchRequest fetchRequest = new FetchRequest("test", 0, 0L, 100000);
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for (MessageAndOffset messageAndOffset : messages) {
System.out.println(getMessage(messageAndOffset.message()));
offset = offset + messageAndOffset.offset();
}
}
}
}
经过本机(i5, 8G内存)编译运行测试,接收10万条消息,耗时不超过3秒。
参考推荐:
版权所有: 本文系米扑博客原创、转载、摘录,或修订后发表,最后更新于 2021-01-18 23:26:58
侵权处理: 本个人博客,不盈利,若侵犯了您的作品权,请联系博主删除,莫恶意,索钱财,感谢!
转载注明: Kafka 分布式消息系统示例 (米扑博客)