Kafka代码API

卡夫卡是运用scala语言开垦,相通于RabbitMQ的布满式信息系统。卡夫卡是布满式的,它经过方可多少个broker组成壹个集群。卡夫卡重视于Zookeeper。

卡夫卡基本原理

1.起家工程,导入相应的jar包

Topic 特定项目标音讯流。音讯是字节的得力载荷,话题是音信的归类或种子名。

Apache 卡夫卡是遍及式发布-订阅音信系统。它最先由LinkedIn集团花销,之后成为Apache项指标豆蔻年华局地。卡夫卡是风华正茂种高效、可扩张的、设计内在正是布满式的,分区的和可复制的提交日志服务。

Procuder类

Broker 或称Kafka集群。用于保存信息的服务器。

卡夫卡构造组件

package cn.itcast.kafka;

Producer 能够透露新闻到话题的别的对象。

话题(Topic):是一定项指标消息流。音信是字节的实用载荷(Payload),话题是音信的归类名或种子(Feed)名。

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

Consumer 能够订阅多个或多个话题,并从Broker拉取数据,进而开销那个已宣告的消息。

生产者(Producer):是能够透露音讯到话题的其他对象。

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

劳动者接受本人的种类化方法对音信内容开展编码。然后向broker发起消息。为了提升作用,一个发表央浼中得以包罗大器晚成组新闻。

劳动代办(Broker):已揭露的消息保存在一组服务器中,它们被叫作代理(Broker)或卡夫卡集群。

public class ConsumerDemo {
 
 //要读取的数据主旨
 private static final String topic = "kfc";
 //消费者的数目
 private static final Integer threads = 2;
 
 public static void main(String[] args) {
 
  Properties props = new Properties();
  //指定zookeeper的地址
  props.put("zookeeper.connect", "storm01:2181,storm02:2181,storm03:2181");
  //花费组的编号
  props.put("group.id", "1111");
  //偏移量,从哪些岗位读
  props.put("auto.offset.reset", "smallest");
 
  ConsumerConfig config = new ConsumerConfig(props);
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
  HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>();
  topicCountmap.put(topic, threads);
 
  //依照map获取具有的大旨遥相呼应的信息流
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap);
  //获取有些大旨的消息流
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
  //开启多个顾客进程,读取大旨下的流
  for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
   new Thread(new Runnable() {
   
    @Override
    public void run() {
     for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
      System.err.println(new String(messageAndMetadata.message()));
     }
     
    }
   }).start();
  }
 
 }
}

买主订阅话题,并为话题创造二个或八个音信流。公布到该话题的音讯被平均的散发到那些流中。

消费者(Consumer):能够订阅叁个或八个话题,并从Broker拉数量,进而花费那个已发布的音讯。

consumer--消费者类

各种信息流为不断发出的音讯提供了迭代接口。

kafka储存战略

package cn.itcast.kafka;

买主迭代流中每一条音讯,并拍卖音讯的一蹴而就载荷。

kafka以topic来开展消息管理,每一种topic包括三个partition,每一种partition对应五个逻辑log,有五个segment组成。

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

迭代器不会停下。借使当前尚无音信,迭代器将封堵直至有新的消息表露到该话题。

每一种segment中储存多条新闻(见下图),音讯id由其论理地点决定,即从消息id可一直固定到音信的蕴藏地点,防止id到岗位的额外映射。

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

卡夫卡辅助点到点分发模型(Proint-to-point delivery model卡塔尔(英语:State of Qatar),即七个买主一同花费队列中某些音信的单个副本;也支撑宣布-订阅模型(Publish-subscribe model卡塔尔(قطر‎,即七个客商选择本身的新闻别本。

种种part在内部存储器中对应一个index,记录每种segment中的第一条信息偏移。

public class ConsumerDemo {
 
 //要读取的数码宗旨
 private static final String topic = "kfc";
 //消费者的多少
 private static final Integer threads = 2;
 
 public static void main(String[] args) {
 
  Properties props = new Properties();
  //指定zookeeper的地址
  props.put("zookeeper.connect", "storm01:2181,storm02:2181,storm03:2181");
  //花费组的号码
  props.put("group.id", "1111");
  //偏移量,从哪个地方读
  props.put("auto.offset.reset", "smallest");
 
  ConsumerConfig config = new ConsumerConfig(props);
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
  HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>();
  topicCountmap.put(topic, threads);
 
  //依照map获取具有的主旨一呼百诺的音讯流
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap);
  //获取某些核心的音信流
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
  //开启多个买主进程,读取宗旨下的流
  for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
   new Thread(new Runnable() {
   
    @Override
    public void run() {
     for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
      System.err.println(new String(messageAndMetadata.message()));
     }
     
    }
   }).start();
  }
 
 }
}

kafka-proc-topic-part-con.png

发表者发到某个topic的新闻会被均匀的遍及到三个partition上(或基于客商钦赐的路由法则进行布满),broker收到发布新闻往对应partition的最后四个segment上增添该消息,当有些segment上的音信条数达到配置值或音讯公布时间抢先阈值时,segment上的音讯会被flush到磁盘,只有flush到磁盘上的音信订阅者本领订阅到,segment到达自然的深浅后将不会再往该segment写多少,broker会创设新的segment。

分布式发布订阅消息系统 卡夫卡 结构划虚构计 http://www.linuxidc.com/Linux/2013-11/92751.htm

kafka的蕴藏,话题的各样分区对应一个逻辑日志。物理上,二个日记为同样大小的朝气蓬勃段分组文件。

kafka删除攻略

Apache 卡夫卡 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

每一趟临蓐者公布新闻到一个分区,代理就将新闻追加到后一个段文件中。

1.N天前的删减。

Apache 卡夫卡 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

当公布的音信数量达到设定值或透过生机勃勃段时间后,段文件真正写入磁盘中。

2.封存目前的MGB数据。

卡夫卡使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm

写入实现后,音信公开给买主。

Kafka broker

卡夫卡的详实介绍:请点这里
Kafka的下载地址:请点这里

与历史观的消息分裂,kafka系统中储存的新闻并未有分明性的id,而是经过日记的逻辑偏移量来公开。比较其它措施,这种管理更为便捷。

与此外新闻系统分裂,卡夫卡broker是无状态的。那代表消费者必需保险已费用的情景新闻。那个新闻由购买者自身维护,broker完全不管(有offset managerbroker管理)。

本文长久更新链接地址:http://www.linuxidc.com/Linux/2014-09/107383.htm

消费者始终从卓绝分区顺序的拿走信息。

从代理删除音信变得很讨厌,因为代理并不知道消费者是还是不是已经采取了该音讯。卡夫卡校勘性地消除了这些题目,它将三个粗略的基于时间的SLA应用于保留战术。当音讯在代理中国足球联赛过一准时间后,将会被机关删除。这种翻新设计有不小的平价,消费者能够故意倒回来老的偏移量再一次花费数据。这违反了队列的周围约定,但被验证是比超级多买主的基本特征。

澳门新葡亰网站所有平台 1

不相同于其余新闻系统,kafka代理是无状态的,即购买者必需保证已花销的情状消息,而代理完全不管。

以下来自kafka官方文书档案:

这种规划的换代在于:

Kafka Design

代办以一个依照时间的SLA应用于保留战略。当音信在代理中中国足球球联赛越一按期期后,将会被电动删除。

目标

买主能够故意倒回来老的偏移量再度开支数量。就算这违规了队列的大范围约定,但大面积于广大作业中。

  1. 高吞吐量来帮助高体积的平地风波流管理

与zookeeper的关系

2.支撑从离线系统加载数据

kafka使用ZooKeeper用于管理、协和代理。每一个卡夫卡代理通过Zookeeper和煦其余卡夫卡代理。

  1. 低顺延的音讯系统

当卡夫卡系统中新添了代理或有个别代理失效时,Zookeeper服务将布告劳动者和买主。

持久化

坐褥者与买主要原由此开头与别的代理和睦职业。

  1. 依傍文件系统,漫长化到本地

  2. 数码长久化到log

wget tar -xzvf kafka_2.11-0.8.2.1.tgz

效率

config/server.properties

1、 解决”small IO problem“:

# 若是安顿多少个kafka节点,id需安装为分歧的值broker.id=1# !!务必将host.name配置为ip地址。# 在java代码里总是kafka时,服务端会把host.name的值传给zookeeper# 若是应用暗中认可配置的localhost,会现身连输的特别host.name=192.168.1.1port=9092log.dir=./logs# 假使有几个zookeeper服务,用,号隔离就可以。# zookeeper使用私下认可配置的2181端口zookeeper.connect=192.168.1.1:2181zookeeper.connection.timeout.ms=6000# 先启动zookeeper服务./zookeeper-server-start.sh ../config/zookeeper.properties # 再启动kafka服务./kafka-server-start.sh ../config/server.properties./kafka-topics.sh --create --zookeeper 192.168.1.1:2181 --replication-factor 2 --partitions 1 --topic test./kafka-topics.sh --describe --zookeeper 192.168.1.1:2181 --topic test

利用”message set“组合音信。

起步临蓐者临盆该Topic的新闻

server使用”chunks of messages“写到log。

./kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic test

consumer二遍拿走大的音讯块。

起步消费者花费该Topic的消息

2、解决”byte copying“:

./kafka-console-consumer.sh --zookeeper 192.168.1.1:2181 --from-beginning -topic test

在producer、broker和consumer之间使用统后生可畏的binary message format。

Java代码Producer 测试

选择系统的pagecache。

import java.util.Date;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;import kafka.serializer.StringEncoder;public class KafkaProducer { public static void testProducer() { Properties props = new Properties(); props.put("metadata.broker.list", "192.168.1.1:9092"); props.put("serializer.class", StringEncoder.class.getName; //props.put("partitioner.class", ); props.put("request.required.arks", "1"); ProducerConfig config = new ProducerConfig; Producer String, String producer = new Producer String, String ; String msg = new Date()

应用sendfile传输log,制止拷贝。

  • " - hello world : 测试 " ; KeyedMessage String, String data = new KeyedMessage String, String ; producer.send; producer.close(); System.out.println("-- producer sended: " msg); public static void main { testProducer();}

端到端的批量减少(End-to-end Batch Compression)

Consumer 测试

卡夫卡辅助GZIP和Snappy压缩契约。

import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;public class KafkaConsumer { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public KafkaConsumer(String a_zookeeper, String a_groupId, String a_topic) { this.consumer = kafka.consumer.Consumer .createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "1000"); props.put("zookeeper.sync.time.ms", "1000"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); return new ConsumerConfig; public void shutdown() { if consumer.shutdown(); if executor.shutdown(); public void run { Map String, Integer topicCountMap = new HashMap String, Integer topicCountMap.put(topic, new Integer; Map String, List KafkaStream byte[], byte[] consumerMap = consumer .createMessageStreams; List KafkaStream byte[], byte[] streams = consumerMap.get; System.out.println("streams.size = " streams.size; // now launch all the threads executor = Executors.newFixedThreadPool; // now create an object to consume the messages int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest); threadNumber ; public static void main { String zooKeeper = "192.168.212.100:2181"; String groupId = "group1"; String topic = "test"; int threads = 3; KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topic); example.run; public class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; public void run() { System.out.println("calling ConsumerTest.run; ConsumerIterator byte[], byte[] it = m_stream.iterator(); while { System.out.println("-- consumer Thread " m_threadNumber ": " new String; System.out.println("Shutting down Thread: " m_threadNumber);}

The Producer

负载均衡

producer能够自定义发送到哪个partition的路由法则。默许路由准绳:hash(key卡塔尔%numPartitions,假使key为null则随机选用一个partition。

自定义路由:要是key是三个user id,能够把同叁个user的音信发送到同二个partition,当时consumer就可以从同三个partition读取同二个user的新闻。

异步批量发送

批量出殡和下葬:配置非常少于一定新闻数目一齐发送而且等待时间低于四个一定延迟的数额。

The Consumer

consumer调节新闻的读取。

Push vs Pull

1)producer push data to broker,consumer pull data from broker

2卡塔尔(英语:State of Qatar)consumer pull的长处:consumer本人主宰新闻的读取速度和数码。

3卡塔尔(英语:State of Qatar)consumer pull的弱项:假若broker十分的少,则只怕要pull数次忙等待,卡夫卡能够配备consumer long pull一向等到有数量。

Consumer Position

1卡塔尔国当先六分之三音信系统由broker记录哪些音信被开支了,但卡夫卡不是。

2卡塔尔(英语:State of Qatar)卡夫卡由consumer调节音讯的开支,consumer以致可以回来叁个old offset的岗位再度花费音讯。

Message Delivery Semantics

三种:

At most once—Messages may be lost but are never redelivered.

At least once—Messages are never lost but may be redelivered.

Exactly once—this is what people actually want, each message is delivered once and only once.

Producer:有个”acks“配置能够垄断选拔的leader的在怎么着状态下就回应producer新闻写入成功。

Consumer:

* 读废除息,写log,管理信息。纵然拍卖新闻战败,log已经写入,则无从再一次拍卖失利的信息,对应”At most once“。

* 读取音讯,处理音讯,写log。假诺音讯管理成功,写log退步,则信息会被拍卖两回,对应”At least once“。

* 读取音讯,同期管理音信并把result和log同有的时候候写入。那样保险result和log同一时候更新或同期失败,对应”Exactly once“。

卡夫卡私下认可有限支撑at-least-once delivery,容许顾客完毕at-most-once语义,exactly-once的完成决议于指标存款和储蓄系统,kafka提供了读取offset,完结也未有时常。

复制(Replication)

1)二个partition的复制个数(replication factor)包括这一个partition的leader本身。

2)全数对partition的读和写都经过leader。

3)Followers通过pull获取leader上log(message和offset)

4)尽管多个follower挂掉、卡住恐怕联合太慢,leader会把那个follower从”in sync replicas“(IS福特Explorer)列表中去除。

5)当有着的”in sync replicas“的follower把四个音讯写入到和煦的log中时,那么些新闻才被以为是”committed“的。

6)假使针对有个别partition的享有复制节点都挂了,卡夫卡采取最初复活的十一分节点作为leader(那么些节点不必然在ISSportage里)。

日记压缩(Log Compaction)

1)针对贰个topic的partition,压缩使得卡夫卡起码知道种种key对应的结尾叁个值。

2)压缩不会重排序新闻。

3)消息的offset是不会变的。

4)新闻的offset是逐意气风发的。

Distribution

Consumer Offset Tracking

1)High-level consumer记录各类partition所开支的maximum offset,并限制期限commit到offset manager(broker)。

2)Simple consumer供给手动处理offset。现在的Simple consumer Java API只扶助commit offset到zookeeper。

Consumers and Consumer Groups

1)consumer注册到zookeeper

2)归属同三个group的consumer(group id同样)平均分配partition,每一个partition只会被一个consumer消费。

3)当broker或同三个group的其他consumer的情景爆发变化的时候,consumer rebalance就能够发出。

Zookeeper协和约束

1)管理broker与consumer的动态参加与离开。

2)触发负载均衡,当broker或consumer参与或离开时会触发负载均衡算法,使得二个consumer group内的几个consumer的订阅负载平衡。

3)维护花费关系及各样partition的花费新闻。

劳动者代码示例:

import java.util.*;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

public class TestProducer {

public static void main(String[] args) {

long events = Long.parseLong(args[0]);

Random rnd = new Random();

Properties props = new Properties();

props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");

props.put("serializer.class", "kafka.serializer.StringEncoder");

props.put("partitioner.class", "example.producer.SimplePartitioner");

props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

Producer producer = new Producer(config);

for (long nEvents = 0; nEvents < events; nEvents ) {

long runtime = new Date().getTime();

String ip = “192.168.2.” rnd.nextInt(255);

String msg = runtime “,www.example.com,” ip;

KeyedMessage data = new KeyedMessage("page_visits", ip, msg);

producer.send(data);

}

producer.close();

}

}

Partitioning Code:

import kafka.producer.Partitioner;

import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {

public SimplePartitioner (VerifiableProperties props) {

}

public int partition(Object key, int a_numPartitions) {

int partition = 0;

String stringKey = (String) key;

int offset = stringKey.lastIndexOf('.');

if (offset > 0) {

partition = Integer.parseInt( stringKey.substring(offset 1)) % a_numPartitions;

}

return partition;

}

}

买主代码示例:

import kafka.consumer.ConsumerConfig;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class ConsumerGroupExample {

private final ConsumerConnector consumer;

private final String topic;

private ExecutorService executor;

public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig(a_zookeeper, a_groupId));

this.topic = a_topic;

}

public void shutdown() {

if (consumer != null) consumer.shutdown();

if (executor != null) executor.shutdown();

try {

if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {

System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");

}

} catch (InterruptedException e) {

System.out.println("Interrupted during shutdown, exiting uncleanly");

}

}

public void run(int a_numThreads) {

Map topicCountMap = new HashMap();

topicCountMap.put(topic, new Integer(a_numThreads));

Map>> consumerMap = consumer.createMessageStreams(topicCountMap);

List> streams = consumerMap.get(topic);

// now launch all the threads

//

澳门新葡亰网站所有平台,executor = Executors.newFixedThreadPool(a_numThreads);

// now create an object to consume the messages

//

int threadNumber = 0;

for (final KafkaStream stream : streams) {

executor.submit(new ConsumerTest(stream, threadNumber));

threadNumber ;

}

}

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {

Properties props = new Properties();

props.put("zookeeper.connect", a_zookeeper);

props.put("group.id", a_groupId);

props.put("zookeeper.session.timeout.ms", "400");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);

}

public static void main(String[] args) {

String zooKeeper = args[0];

String groupId = args[1];

String topic = args[2];

int threads = Integer.parseInt(args[3]);

ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);

example.run(threads);

try {

Thread.sleep(10000);

} catch (InterruptedException ie) {

}

example.shutdown();

}

}

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {

private KafkaStream m_stream;

private int m_threadNumber;

public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {

m_threadNumber = a_threadNumber;

m_stream = a_stream;

}

public void run() {

ConsumerIterator it = m_stream.iterator();

while (it.hasNext())

System.out.println("Thread " m_threadNumber ": " new String(it.next().message()));

System.out.println("Shutting down Thread: " m_threadNumber);

}

}

本文由澳门新葡亰平台发布于古典文学,转载请注明出处:Kafka代码API

您可能还会对下面的文章感兴趣: