第2章、第3章介绍了两大类常见数据类型,即关系型数据和非关系型数据的收集。读者通过这些内容的学习已经了解了如何构建数据收集流水线,进而将不同服务器上的数据收集到中央化的存储系统中。接下来,我们将介绍构建数据流水线过程中常用的另外一类组件——消息队列。
在实际应用中,不同服务器( 数据生产者 )产生的日志,比如指标监控数据、用户搜索日志、用点击日志等,需要同时传送到多个系统中以便进行相应的逻辑处理和挖掘,比如指标监控数据可能被同时写入Hadoop和Storm集群( 数据消费者 )进行离线和实时分析。为了降低数据生产者和消费者之间的耦合性、平衡两者处理能力的不对等,消息队列出现了。消息队列是位于生产者和消费者之间的“中间件”,它解除了生产者和消费者的直接依赖关系,使得软件架构更容易扩展和伸缩;它能够缓冲生产者产生的数据,防止消费者无法及时处理生产者产生的数据。本章将以大数据领域最常用的分布式消息队列Kafka为例,剖析分布式消息队列的设计动机、基本架构以及应用场景等。
每个公司的业务复杂度及产生的数据量都是在不断增加的。如图4-1所示,公司刚起步时,业务简单,此时只需要一条数据流水线即可,即从前端机器上收集日志,直接导入后端的存储系统中进行分析;当业务规模发展到一定程度后,业务逻辑会变得复杂起来,数据量也会越来越多,此时可能需要增加多条数据线,每条数据线将收集到的数据导入不同的存储和分析系统中。此时若仍采用之前的数据收集模式,将收集到的数据直接写入后端,则会产生以下几个潜在的问题:
图4-1 数据流水演化
图4-2 Kafka在数据流中扮演的角色
为了解决以上这些问题,降低数据生产者(比如Web Server)与消费者(比如Hadoop集群、实时监控系统等)之间的耦合性,使系统更易扩展,需引入一层“中间件”,这正是Kafka担任的角色,如图4-2所示。可从以下几个角度理解Kafka的重要地位:
Kafka与AMQP的关系: 尽管Kafka可看作一个消息队列,但与ZeroMQ 、RabbitMQ 等消息队列不同,它不遵循AMQP(Advanced Message Queuing Protocol)协议标准 ,而是在大数据场景下设计的,有自己的特色和优势。
Flume与Kafka的区别: Flume和Kafka在架构和应用定位上均有较大不同,Kafka中存储的数据是多副本的,能够做到数据不丢,而Flume提供的memory channel和file channel均做不到;Kafka可将数据暂存一段时间(默认是一周),供消费者重复读取,提供了类似于“发布订阅模式”的功能,而Flume Sink发送数据成功后会立刻将之删除;Kafka的生产者和消费者均需要用户使用API编写,仅提供了少量的与外部系统集成的组件,而Flume则提供了大量的Source和Sink实现,能够更容易地完成数据收集工作。由于两者各具长,我们通常会选择同时使用这两个系统,具体会在后面几节介绍。另外,网上有大量讨论Flume和Kafka区别的文章,读者可自行查阅。
Kafka是在大数据背景下产生的,能应对海量数据的处理场景,具有高性能、良好的扩展性、数据持久性等特点。
Kafka是一个分布式消息队列,它将数据分区保存,并将每个分区保存成多份以提高数据可靠性。本节将从设计架构角度剖析Kafka。
如图4-3所示,Kafka架构由Producer、Broker和Consumer三类组件构成,其中Producer将数据写入Broker,Consumer则从Broker上读取数据进行处理,而Broker构成了连接Producer和Consumer的“缓冲区”。Broker和Consumer通过ZooKeeper做协调和服务发现 。多个Broker构成一个可靠的分布式消息存储系统,避免数据丢失。Broker中的消息被划分成若干个topic,同属一个topic的所有数据按照某种策略被分成多个partition,以实现负载分摊和数据并行处理。
图4-3 Kafka基本架构
Kafka采用了不同于其他消息队列的push-push架构,而是采用了push-pull架构,即Producer将数据直接“push”给Broker,而Consumer从Broker端“pull”数据,这种架构优势主要体现在以下两点:
本节将详细剖析Kafka各组件,涉及Producer、Broker、Consumer及ZooKeeper各自的功能、设计要点等。
1. Kafka Producer
Kafka Producer是由用户使用Kafka提供的SDK开发的,Producer将数据转化成“消息”,并通过网络发送给Broker。
在Kafka中,每条数据被称为“消息”,每条消息表示为一个三元组:
<topic, key, message>
每个元素表示的含义如下:
图4-4 Kafka Producer写消息过程
Kafka Producer发送消息时,不需要指定所有Broker的地址,只需给定一个或几个初始化Broker地址即可(一般给定多于一个以达到容错的目的),Producer可通过指定的Broker获取其他所有Broker的位置信息,并自动实现负载均衡。
2. Kafka Broker
在Kafka中,Broker一般有多个,它们组成一个分布式高容错的集群。Broker的主要职责是接受Producer和Consumer的请求,并把消息持久化到本地磁盘。如图4-5所示,Broker以topic为单位将消息分成不同的分区(partition),每个分区可以有多个副本,通过数据冗余的方式实现容错。当partition存在多个副本时,其中有一个是leader,对外提供读写请求,其他均是follower,不对外提供读写服务,只是同步leader中的数据,并在leader出现问题时,通过选举算法将其中的某一个提升为leader。
图4-5 Kafka Broker集群
Kafka Broker能够保证同一topic下同一partition内部的消息是有序的,但无法保证partition之间的消息全局有序,这意味着一个Consumer读取某个topic下(多个分区中,如图4-6所示)的消息时,可能得到跟写入顺序不一致的消息序列。但在实际应用中,合理利用分区内部有序这一特征即可完成时序相关的需求。
图4-6 Kafka Broker数据分区
Kafka Broker以追加的方式将消息写到磁盘文件中,且每个分区中的消息被赋予了唯一整数标识,称之为“offset”(偏移量),如图4-6所示,Broker仅提供基于offset的读取方式,不会维护各个Consumer当前已消费消息的offset值,而是由Consumer各自维护当前读取的进度。Consumer读取数据时告诉Broker请求消息的起始offset值,Broker将之后的消息流式发送过去。
Broker中保存的数据是有有效期的,比如7天,一旦超过了有效期,对应的数据将被移除以释放磁盘空间。只要数据在有效期内,Consumer可以重复读取而不受限制。
3. Kafka Consumer
Kafka Consumer主动从Kafka Broker拉取消息进行处理。每个Kafka Consumer自己维护最后一个已读取消息的offset,并在下次请求从这个offset开始的消息,这一点不同于ZeroMQ、RabbitMQ等其他消息队列,这种基于pull的机制大大降低了Broker的压力,使得Kafka Broker的吞吐率很高。
如图4-7所示,Kafka允许多个Consumer构成一个Consumer Group,共同读取同一topic中的数据,提高数据读取效率。Kafka可自动为同一Group中的Consumer分摊负载,从而实现消息的并发读取,并在某个Consumer发生故障时,自动将它处理的partition转移给同Group中其他Consumer处理。
图4-7 Kafka Consumer Group原理
4. ZooKeeper
在一个Kafka集群中,ZooKeeper担任分布式服务协调的作用,Broker和Consumer直接依赖于ZooKeeper才能正常工作:
4.1.2节中提到,Kafka作为一个分布式消息队列,具有高性能、良好的扩展性、数据持久性等特点,本节将从几个方面深入剖析Kafka实现这些设计目标所采用的关键技术点。
1. 可控的可靠性级别
Producer可通过两种方式向Broker发送数据:同步方式与异步方式,其中异步方式通过批处理的方式,可大大提高数据写入效率。不管是何种数据发送方式,Producer均能通过控制消息应答方式,在写性能与可靠性之间做一个较好的权衡。
当Producer向Broker发送一条消息时,可通过设置该消息的确认应答方式,控制写性能与可靠性级别。在实际系统中,写性能和可靠性级别是两个此消彼长的指标,当可靠性级别较高时(每条消息确保成功写入多个副本),写性能则会降低,用户可根据实际需要进行设置。目前Kafka支持三种消息应答方式,可通过参数request.required.acks控制:
2. 数据多副本
Kafka Broker允许为每个topic中的数据存放多个副本,以达到容错的目的。Kafka采用了强一致的数据复制策略,如图4-8所示,消息首先被写入leader partition,之后由leader partition负责将收到的消息同步给其他副本。Leader Partition负责对外的读写服务,而follower partition仅负责同步数据,并在leader partition出现故障时,通过选举的方式竞选成为leader。Kafka Broker负载均衡实际上是对leader partition的负载均衡,即保证leader partition在各个Broker上数目尽可能相近。
图4-8 Kafka Broker多副本放置
3. 高效的持久化机制
为了应对大数据应用场景,Kafka Broker直接将消息持久化到磁盘上而不是内存中,这要求必须采用高效的数据写入和存储方式。实际上,当将数据写入磁盘时,采用顺序写的速度要远高于随机写,经测试 ,在同样环境下,顺序写速度为600MB/s,而随机写仅达到100KB/s,两者相差6000倍。基于此,Kafka Broker将收到的数据顺序写入磁盘,并结合基于offset的数据组织方式,能达到很高效的读速度和写速度。
4. 数据传输优化:批处理与zero-copy技术
为了优化Broker与Consumer之间的网络数据传输效率,Kafka引入了大量优化技术,典型的两个代表是批处理和zero-copy技术。
5. 可控的消息传递语义
在消息系统中,根据接收者可能收到重复消息的次数,将消息传递语义分为三种:
1)at most once: 发送者将消息发送给消费者后,立刻返回,不会关心消费者是否成功收到消息。这种情况下,消息可能被消费者成功接收,也可能丢失。
图4-9 application-copy与zero-copy对比
2)at least once: 发送者将消息发送给消费者后,需等待确认,如果未收到确认消息,则会重发消息。这种语义能保证消费者收到消息,但可能会收到多次。
3)exactly once: 消费者会且只会处理一次同一条消息。为实现该语义,通常有两种常用技术手段:
对于Kafka而言,Producer与Broker,以及Broker与Consumer之间,均存在消息传递语义问题,下面分别讨论:
前面对Kafka的基本架构和关键技术进行了介绍,而本节将介绍如何使用Kafka提供的SDK开发Producer和Consumer,即如何往Kafka Broker中写数据,以及如何从Broker上读取数据。
Producer负责将数据写入Broker,通常由用户根据实际需要编写,其相关类如下:
// K: key的数据类型, V:message的数据类型 class kafka.javaapi.producer.Producer<K,V> { // 构造函数,参数为配置对象ProducerConfig public Producer(ProducerConfig config); /** * 将数据同步或异步发送到Broker上,并按照key进行分区 * @param message 封装了topic,key和message三元组的对象 */ public void send(KeyedMessage<K,V> message); // 将一批消息发送出去 public void send(List<KeyedMessage<K,V>> messages); // 关闭与所有Kafka Broker的连接 public void close(); }
为了使用该类设计一个Producer程序,可按照以下步骤操作:
(1)创建配置对象ProducerConfig
用户可为Producer设置一些参数,控制Producer的行为,常见的配置参数如表4-1所示(更多参数可参考Kafka官方文档 ):
表4-1 Producer常见的配置参数
举例如下:
Properties props = new Properties(); props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // 在此使用StringEncoder,注意,该类必须与KeyedMessage中的定义一致 props.put("serializer.class", "kafka.serializer.StringEncoder"); // 设置自定义分区类example.producer.SimplePartitioner props.put("partitioner.class", "example.producer.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props);
(2)定义分区类SimplePartitioner
用户可通过实现kafka.producer.Partitioner接口实现自己的分区类(重载并实现partition方法),前一个实例中SimplePartitioner实现如下:
public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) {} // 根据ip最后一段分区,划分到partitions个分区(用户创建topic时指定的)中 public int partition(Object key, int partitions) { int partition = 0; String sKey= (String) key; int offset = sKey.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt(sKey.substring(offset+1)) % partitions; } return partition; } }
(3)创建Producer对象,并发送数据
下面的实例尝试发送一些构造的消息,消息的topic为“page_visits”,key为IP,message为访问时间与被访问的url,代码如下:
// 创建producer Producer<String, String> producer = new Producer<String, String>(config); // 产生并发送消息 for (long i = 0; i < events; i++) { long runtime = new Date().getTime(); String ip = "192.168.2." + i; String msg = runtime + ",www.example.com," + ip; //如果topic不存在,则会自动创建,默认数据的副本数为1 KeyedMessage<String, String> data = new KeyedMessage<String, String>( "page_visits", ip, msg); producer.send(data); } producer.close();
从0.8.2版本开始,Kafka提供了一个新的线程安全的Producer实现org.apache.kafka.clients.producer,它在后台维护了一个线程处理I/O请求和与各个Broker的网络连接,并在Producer退出时自动关闭这些网络连接。有兴趣的读者可尝试该Producer实现。
Kafka提供了两种Consumer API,分别为high-level API和low-level API,其中high-level API是一种高度封装的API,它自动帮你管理和持久化offset,并将多个Consumer抽象成一个Consumer Group,为这些Consumer分摊负载,处理容错等,这使得用户开发程序更加容易;low-level API是一种底层原生态的API,仅提供了消息获取接口,用户需自己管理offset,处理容错(Broker失败时,其上leader partition会转移到其他Broker上)等,一般而言,使用high-level API即可满足实际需求,除非遇到以下场景:
Cousumer负责从Broker中读取数据,通常由用户根据实际需要编写,high-level API相关类如下:
class Consumer { // 创建一个Consumer连接句柄ConsumerConnector public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config); } public interface kafka.javaapi.consumer.ConsumerConnector { public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap); …… //其中createMessageStreams重载函数 // 提交当前已读消息的offset(持久化到zookeeper) public void commitOffsets(); //关闭connector public void shutdown(); }
为了使用以上两个类设计一个Consumer程序,可按照以下步骤操作:
步骤1:创建配置对象ConsumerConfig 。
用户可为Consumer设置一些参数,控制Consumer的行为,常见的配置参数如表4-2所示(更多参数可参考Kafka官方文档 )。
表4-2 常见的Consumer配置参数
举例如下:
Properties props = new Properties(); props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181"); props.put("group.id", "kafka-test"); //启动时,从最小offset处开始读取消息 props.put("auto.offset.reset", "smallest"); ConsumerConfig consumerConfig = new ConsumerConfig(props);
步骤2:创建Consumer Group并启动所有Consumer 。
Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); int numThreads = 5; topicCountMap.put(topic, numThreads); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // 启动所有consumer executor = Executors.newFixedThreadPool(numThreads); for (final KafkaStream stream : streams) { executor.submit(new ConsumerRunner(stream)); }
其中ConsumerRunner会通过KafkaStream中的迭代器流式读取Broker中的数据,实现如下:
public class ConsumerRunner implements Runnable { private KafkaStream stream; public ConsumerRunner(KafkaStream stream) { this.stream = stream; } public void run() { // 得到消息迭代器 ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); while (it.hasNext()) // 通过it.next().message()获取消息,并处理 } }
使用high-level API时,需注意以下几点:
由于Kafka在大数据领域应用越来越广泛,很多大数据开源系统均主动增加了对Kafka的支持,本节列举了几个常用的组件:
Kafka Producer常用组件为Flume Kafka Sink: Flume软件包中内置的Kafka Producer,可将Flume Channel中的数据写入Kafka。
Kafka Consumer
Kafka作为一个分布式消息队列,在多种大数据应用场景中得到广泛使用,举例如下:
· 消息队列
与RabbitMQ和ZeroMQ等开源消息队列相比,Kafka具有高吞吐率、自动分区、多副本以及良好的容错性等特点,这使得它非常适合大数据应用场景。
· 流式计算框架的数据源
在流式计算框架(比如Storm,Spark Streaming等,具体可参考第13章流式实时计算引擎)中,为了保证数据不丢失,具备“at least once”数据发送语意,通常在数据源中使用一个高性能的消息队列,从而形成了“分布式消息队列+分布式流式计算框架”的实时计算架构,架构如图4-10所示。
图4-10 Kafka与流式计算框架组合架构
· 分布式日志收集系统中的Source或Sink
可与日志收集组件Flume或Logstash 组合使用,担任Source或Sink的角色,如图4-11所示,Flume或Logstash提供可配置化的Source和Sink,Kafka提供分布式高可用的消息系统,进而形成一个在扩展性、吞吐率等方面都非常优秀的分布式系统。
图4-11 Kafka在日志收集系统中用作Source或Sink
· Lambda Architecture中的Source
同时为批处理和流式处理两条流水线提供数据源,如图4-12所示,各种流式数据直接写入Kafka,之后由Hadoop批处理集群和Storm流式处理集群分别读取,进行相应的处理,最终将结果合并后,呈献给用户。
图4-12 Kafka同时担任批处理和流式处理系统的数据源
Kafka是在大数据背景下产生的,能应对海量数据的处理场景,具有高性能、良好的扩展性、数据持久性等特点。
Kafka架构由Producer,Broker和Consumer三类组件构成,其中Producer将数据写入Broker,Consumer则从Broker上读取数据进行处理,而Broker构成了连接Producer和Consumer的“缓冲区”。Kafka Broker中的消息被划分成若干个topic,同属一个topic的所有数据按照某种策略被分成多个partition,以实现负载分摊和数据并行处理。Kafka采用了多种优化手段保证它的高性能、扩展性等优点。
本章首先介绍了Kafka设计动机和特点,之后对其设计架构进行了剖析,最后从应用开发的角度,介绍了如何开发Kafka Producer和Consumer两类组件。
问题1: Kafka不能保证数据的时序性,即Producer依次将数据X、Y写入Broker,Consumer读出的数据顺序可能是Y、X。如果想在使用Kafka时,能够保证数据时序,有哪些可行方案?
问题2: 试比较Kafka与Flume的异同。
问题3: 尝试使用low-level API编写Consumer读取数据,并将offset保存到ZooKeeper以便故障时恢复。
问题4: 设置合理的Source或Sink,分别使用以下两种方式集成Flume和Kafka:
问题5: 解释Kafka中以下几个概念:
问题6: LinkedIn开源了大数据集成框架Gobblin( https://github.com/linkedin/gobblin ),可以将Kafka中的数据批量导入HDFS中,尝试使用Gobblin将Kafka中出题为“log”的数据写入HDFS下的/data/log目录中。