Kafka 分布式消息系统示例
268 views
0
Kafka是Linkedin于2010年12月份开源的消息系统,它主要用于处理活跃的流式数据。活跃的流式数据在web网站应用中非常常见,这些数据包括网站的pv、用户访问了什么内容,搜索了什么内容等。 这些数据通常以日志的形式记录下来,然后每隔一段时间进行一次统计处理。
传统的日志分析系统提供了一种离线处理日志信息的可扩展方案,但若要进行实时处理,通常会有较大延迟。而现有的消(队列)系统能够很好的处理实时或者近似实时的应用,但未处理的数据通常不会写到磁盘上,这对于Hadoop之类(一小时或者一天只处理一部分数据)的离线应用而言,可能存在问题。Kafka正是为了解决以上问题而设计的,它能够很好地离线和在线应用。
1、下载
最新版本源代码 http://incubator.apache.org/kafka/downloads.html
2、安装
解压后,进入目录,执行
./sbt update ./sbt package
3、启动服务
./bin/zookeeper-server-start.sh config/zookeeper.properties ./bin/kafka-server-start.sh config/server.properties
4、编写producer,连续发送10万条消息
import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.javaapi.producer.ProducerData; import kafka.producer.ProducerConfig; public class producer { public static void main(String[] args) { Properties props = new Properties(); props.put("zk.connect", "127.0.0.1:2181"); props.put("serializer.class", "kafka.serializer.StringEncoder"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); ProducerData<String, String> data ; SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); for (int i=0;i<=100000;i++) { data = new ProducerData<String, String>("test", df.format(new Date())); producer.send(data); } } }
5、编写consumer,接收消息
import java.nio.ByteBuffer; import kafka.api.FetchRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; import kafka.message.MessageAndOffset; public class consumer{ public static String getMessage(Message message) { ByteBuffer buffer = message.payload(); byte [] bytes = new byte[buffer.remaining()]; buffer.get(bytes); return new String(bytes); } public static void main(String[] args) { SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000); long offset = 0L; while (true) { FetchRequest fetchRequest = new FetchRequest("test", 0, 0L, 100000); ByteBufferMessageSet messages = consumer.fetch(fetchRequest); for (MessageAndOffset messageAndOffset : messages) { System.out.println(getMessage(messageAndOffset.message())); offset = offset + messageAndOffset.offset(); } } } }
经过本机(i5, 8G内存)编译运行测试,接收10万条消息,耗时不超过3秒。
参考推荐:
版权所有: 本文系米扑博客原创、转载、摘录,或修订后发表,最后更新于 2021-01-18 23:26:58
侵权处理: 本个人博客,不盈利,若侵犯了您的作品权,请联系博主删除,莫恶意,索钱财,感谢!
转载注明: Kafka 分布式消息系统示例 (米扑博客)