Kafka是Linkedin于2010年12月份开源的消息系统,它主要用于处理活跃的流式数据。活跃的流式数据在web网站应用中非常常见,这些数据包括网站的pv、用户访问了什么内容,搜索了什么内容等。 这些数据通常以日志的形式记录下来,然后每隔一段时间进行一次统计处理。

传统的日志分析系统提供了一种离线处理日志信息的可扩展方案,但若要进行实时处理,通常会有较大延迟。而现有的消(队列)系统能够很好的处理实时或者近似实时的应用,但未处理的数据通常不会写到磁盘上,这对于Hadoop之类(一小时或者一天只处理一部分数据)的离线应用而言,可能存在问题。Kafka正是为了解决以上问题而设计的,它能够很好地离线和在线应用。

1、下载

最新版本源代码 http://incubator.apache.org/kafka/downloads.html 

 

2、安装

解压后,进入目录,执行

./sbt update
./sbt package

 

3、启动服务

./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties

 

4、编写producer,连续发送10万条消息

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;
public class producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "127.0.0.1:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        ProducerData<String, String> data ;
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         for (int i=0;i<=100000;i++)
        {
            data = new ProducerData<String, String>("test", df.format(new Date()));
            producer.send(data);
        }
    }
}

 

5编写consumer,接收消息

import java.nio.ByteBuffer;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;

public class consumer{
      public static String getMessage(Message message)
      {
        ByteBuffer buffer = message.payload();
        byte [] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes);
      }

    public static void main(String[] args) {
        SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000);
        long offset = 0L;
        while (true) {
          FetchRequest fetchRequest = new FetchRequest("test", 0, 0L, 100000);
          ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
          for (MessageAndOffset messageAndOffset : messages) {
            System.out.println(getMessage(messageAndOffset.message()));
            offset = offset + messageAndOffset.offset(); 
          }
        }
    }
}

经过本机(i5, 8G内存)编译运行测试,接收10万条消息,耗时不超过3秒。

 

 

参考推荐:

Kafka 分布式消息系统架构

Kafka 分布式消息系统示例

Kafka:下一代分布式消息系统