购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

第4章 分布式消息队列Kafka

第2章、第3章介绍了两大类常见数据类型,即关系型数据和非关系型数据的收集。读者通过这些内容的学习已经了解了如何构建数据收集流水线,进而将不同服务器上的数据收集到中央化的存储系统中。接下来,我们将介绍构建数据流水线过程中常用的另外一类组件——消息队列。

在实际应用中,不同服务器( 数据生产者 )产生的日志,比如指标监控数据、用户搜索日志、用点击日志等,需要同时传送到多个系统中以便进行相应的逻辑处理和挖掘,比如指标监控数据可能被同时写入Hadoop和Storm集群( 数据消费者 )进行离线和实时分析。为了降低数据生产者和消费者之间的耦合性、平衡两者处理能力的不对等,消息队列出现了。消息队列是位于生产者和消费者之间的“中间件”,它解除了生产者和消费者的直接依赖关系,使得软件架构更容易扩展和伸缩;它能够缓冲生产者产生的数据,防止消费者无法及时处理生产者产生的数据。本章将以大数据领域最常用的分布式消息队列Kafka为例,剖析分布式消息队列的设计动机、基本架构以及应用场景等。

4.1 概述

4.1.1 Kafka设计动机

每个公司的业务复杂度及产生的数据量都是在不断增加的。如图4-1所示,公司刚起步时,业务简单,此时只需要一条数据流水线即可,即从前端机器上收集日志,直接导入后端的存储系统中进行分析;当业务规模发展到一定程度后,业务逻辑会变得复杂起来,数据量也会越来越多,此时可能需要增加多条数据线,每条数据线将收集到的数据导入不同的存储和分析系统中。此时若仍采用之前的数据收集模式,将收集到的数据直接写入后端,则会产生以下几个潜在的问题:

图4-1 数据流水演化

图4-2 Kafka在数据流中扮演的角色

为了解决以上这些问题,降低数据生产者(比如Web Server)与消费者(比如Hadoop集群、实时监控系统等)之间的耦合性,使系统更易扩展,需引入一层“中间件”,这正是Kafka担任的角色,如图4-2所示。可从以下几个角度理解Kafka的重要地位:

Kafka与AMQP的关系: 尽管Kafka可看作一个消息队列,但与ZeroMQ 、RabbitMQ 等消息队列不同,它不遵循AMQP(Advanced Message Queuing Protocol)协议标准 ,而是在大数据场景下设计的,有自己的特色和优势。

Flume与Kafka的区别: Flume和Kafka在架构和应用定位上均有较大不同,Kafka中存储的数据是多副本的,能够做到数据不丢,而Flume提供的memory channel和file channel均做不到;Kafka可将数据暂存一段时间(默认是一周),供消费者重复读取,提供了类似于“发布订阅模式”的功能,而Flume Sink发送数据成功后会立刻将之删除;Kafka的生产者和消费者均需要用户使用API编写,仅提供了少量的与外部系统集成的组件,而Flume则提供了大量的Source和Sink实现,能够更容易地完成数据收集工作。由于两者各具长,我们通常会选择同时使用这两个系统,具体会在后面几节介绍。另外,网上有大量讨论Flume和Kafka区别的文章,读者可自行查阅。

4.1.2 Kafka特点

Kafka是在大数据背景下产生的,能应对海量数据的处理场景,具有高性能、良好的扩展性、数据持久性等特点。

4.2 Kafka设计架构

Kafka是一个分布式消息队列,它将数据分区保存,并将每个分区保存成多份以提高数据可靠性。本节将从设计架构角度剖析Kafka。

4.2.1 Kafka基本架构

如图4-3所示,Kafka架构由Producer、Broker和Consumer三类组件构成,其中Producer将数据写入Broker,Consumer则从Broker上读取数据进行处理,而Broker构成了连接Producer和Consumer的“缓冲区”。Broker和Consumer通过ZooKeeper做协调和服务发现 。多个Broker构成一个可靠的分布式消息存储系统,避免数据丢失。Broker中的消息被划分成若干个topic,同属一个topic的所有数据按照某种策略被分成多个partition,以实现负载分摊和数据并行处理。

图4-3 Kafka基本架构

Kafka采用了不同于其他消息队列的push-push架构,而是采用了push-pull架构,即Producer将数据直接“push”给Broker,而Consumer从Broker端“pull”数据,这种架构优势主要体现在以下两点:

4.2.2 Kafka各组件详解

本节将详细剖析Kafka各组件,涉及Producer、Broker、Consumer及ZooKeeper各自的功能、设计要点等。

1. Kafka Producer

Kafka Producer是由用户使用Kafka提供的SDK开发的,Producer将数据转化成“消息”,并通过网络发送给Broker。

在Kafka中,每条数据被称为“消息”,每条消息表示为一个三元组:

<topic, key, message>

每个元素表示的含义如下:

图4-4 Kafka Producer写消息过程

Kafka Producer发送消息时,不需要指定所有Broker的地址,只需给定一个或几个初始化Broker地址即可(一般给定多于一个以达到容错的目的),Producer可通过指定的Broker获取其他所有Broker的位置信息,并自动实现负载均衡。

2. Kafka Broker

在Kafka中,Broker一般有多个,它们组成一个分布式高容错的集群。Broker的主要职责是接受Producer和Consumer的请求,并把消息持久化到本地磁盘。如图4-5所示,Broker以topic为单位将消息分成不同的分区(partition),每个分区可以有多个副本,通过数据冗余的方式实现容错。当partition存在多个副本时,其中有一个是leader,对外提供读写请求,其他均是follower,不对外提供读写服务,只是同步leader中的数据,并在leader出现问题时,通过选举算法将其中的某一个提升为leader。

图4-5 Kafka Broker集群

Kafka Broker能够保证同一topic下同一partition内部的消息是有序的,但无法保证partition之间的消息全局有序,这意味着一个Consumer读取某个topic下(多个分区中,如图4-6所示)的消息时,可能得到跟写入顺序不一致的消息序列。但在实际应用中,合理利用分区内部有序这一特征即可完成时序相关的需求。

图4-6 Kafka Broker数据分区

Kafka Broker以追加的方式将消息写到磁盘文件中,且每个分区中的消息被赋予了唯一整数标识,称之为“offset”(偏移量),如图4-6所示,Broker仅提供基于offset的读取方式,不会维护各个Consumer当前已消费消息的offset值,而是由Consumer各自维护当前读取的进度。Consumer读取数据时告诉Broker请求消息的起始offset值,Broker将之后的消息流式发送过去。

Broker中保存的数据是有有效期的,比如7天,一旦超过了有效期,对应的数据将被移除以释放磁盘空间。只要数据在有效期内,Consumer可以重复读取而不受限制。

3. Kafka Consumer

Kafka Consumer主动从Kafka Broker拉取消息进行处理。每个Kafka Consumer自己维护最后一个已读取消息的offset,并在下次请求从这个offset开始的消息,这一点不同于ZeroMQ、RabbitMQ等其他消息队列,这种基于pull的机制大大降低了Broker的压力,使得Kafka Broker的吞吐率很高。

如图4-7所示,Kafka允许多个Consumer构成一个Consumer Group,共同读取同一topic中的数据,提高数据读取效率。Kafka可自动为同一Group中的Consumer分摊负载,从而实现消息的并发读取,并在某个Consumer发生故障时,自动将它处理的partition转移给同Group中其他Consumer处理。

图4-7 Kafka Consumer Group原理

4. ZooKeeper

在一个Kafka集群中,ZooKeeper担任分布式服务协调的作用,Broker和Consumer直接依赖于ZooKeeper才能正常工作:

4.2.3 Kafka关键技术点

4.1.2节中提到,Kafka作为一个分布式消息队列,具有高性能、良好的扩展性、数据持久性等特点,本节将从几个方面深入剖析Kafka实现这些设计目标所采用的关键技术点。

1. 可控的可靠性级别

Producer可通过两种方式向Broker发送数据:同步方式与异步方式,其中异步方式通过批处理的方式,可大大提高数据写入效率。不管是何种数据发送方式,Producer均能通过控制消息应答方式,在写性能与可靠性之间做一个较好的权衡。

当Producer向Broker发送一条消息时,可通过设置该消息的确认应答方式,控制写性能与可靠性级别。在实际系统中,写性能和可靠性级别是两个此消彼长的指标,当可靠性级别较高时(每条消息确保成功写入多个副本),写性能则会降低,用户可根据实际需要进行设置。目前Kafka支持三种消息应答方式,可通过参数request.required.acks控制:

2. 数据多副本

Kafka Broker允许为每个topic中的数据存放多个副本,以达到容错的目的。Kafka采用了强一致的数据复制策略,如图4-8所示,消息首先被写入leader partition,之后由leader partition负责将收到的消息同步给其他副本。Leader Partition负责对外的读写服务,而follower partition仅负责同步数据,并在leader partition出现故障时,通过选举的方式竞选成为leader。Kafka Broker负载均衡实际上是对leader partition的负载均衡,即保证leader partition在各个Broker上数目尽可能相近。

图4-8 Kafka Broker多副本放置

3. 高效的持久化机制

为了应对大数据应用场景,Kafka Broker直接将消息持久化到磁盘上而不是内存中,这要求必须采用高效的数据写入和存储方式。实际上,当将数据写入磁盘时,采用顺序写的速度要远高于随机写,经测试 ,在同样环境下,顺序写速度为600MB/s,而随机写仅达到100KB/s,两者相差6000倍。基于此,Kafka Broker将收到的数据顺序写入磁盘,并结合基于offset的数据组织方式,能达到很高效的读速度和写速度。

4. 数据传输优化:批处理与zero-copy技术

为了优化Broker与Consumer之间的网络数据传输效率,Kafka引入了大量优化技术,典型的两个代表是批处理和zero-copy技术。

5. 可控的消息传递语义

在消息系统中,根据接收者可能收到重复消息的次数,将消息传递语义分为三种:

1)at most once: 发送者将消息发送给消费者后,立刻返回,不会关心消费者是否成功收到消息。这种情况下,消息可能被消费者成功接收,也可能丢失。

图4-9 application-copy与zero-copy对比

2)at least once: 发送者将消息发送给消费者后,需等待确认,如果未收到确认消息,则会重发消息。这种语义能保证消费者收到消息,但可能会收到多次。

3)exactly once: 消费者会且只会处理一次同一条消息。为实现该语义,通常有两种常用技术手段:

对于Kafka而言,Producer与Broker,以及Broker与Consumer之间,均存在消息传递语义问题,下面分别讨论:

4.3 Kafka程序设计

前面对Kafka的基本架构和关键技术进行了介绍,而本节将介绍如何使用Kafka提供的SDK开发Producer和Consumer,即如何往Kafka Broker中写数据,以及如何从Broker上读取数据。

4.3.1 Producer程序设计

Producer负责将数据写入Broker,通常由用户根据实际需要编写,其相关类如下:

//  K: key的数据类型, V:message的数据类型
class kafka.javaapi.producer.Producer<K,V> {
    // 构造函数,参数为配置对象ProducerConfig
    public Producer(ProducerConfig config);

    /**
     * 将数据同步或异步发送到Broker上,并按照key进行分区
     * @param message 封装了topic,key和message三元组的对象
     */
    public void send(KeyedMessage<K,V> message);

    // 将一批消息发送出去
    public void send(List<KeyedMessage<K,V>> messages);

    // 关闭与所有Kafka Broker的连接
    public void close();
}

为了使用该类设计一个Producer程序,可按照以下步骤操作:

(1)创建配置对象ProducerConfig

用户可为Producer设置一些参数,控制Producer的行为,常见的配置参数如表4-1所示(更多参数可参考Kafka官方文档 ):

表4-1 Producer常见的配置参数

举例如下:

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092");
// 在此使用StringEncoder,注意,该类必须与KeyedMessage中的定义一致
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 设置自定义分区类example.producer.SimplePartitioner
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);

(2)定义分区类SimplePartitioner

用户可通过实现kafka.producer.Partitioner接口实现自己的分区类(重载并实现partition方法),前一个实例中SimplePartitioner实现如下:

public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {}
    // 根据ip最后一段分区,划分到partitions个分区(用户创建topic时指定的)中
public int partition(Object key, int partitions) {
int partition = 0;
String sKey= (String) key;
int offset = sKey.lastIndexOf('.');
if (offset > 0) {
        partition = Integer.parseInt(sKey.substring(offset+1)) % partitions;
}
return partition;
}
}

(3)创建Producer对象,并发送数据

下面的实例尝试发送一些构造的消息,消息的topic为“page_visits”,key为IP,message为访问时间与被访问的url,代码如下:

// 创建producer
Producer<String, String> producer = new Producer<String, String>(config);
// 产生并发送消息
for (long i = 0; i < events; i++) {
    long runtime = new Date().getTime();
    String ip = "192.168.2." + i;
    String msg = runtime + ",www.example.com," + ip;
    //如果topic不存在,则会自动创建,默认数据的副本数为1
    KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                    "page_visits", ip, msg);
    producer.send(data);
}
producer.close();

从0.8.2版本开始,Kafka提供了一个新的线程安全的Producer实现org.apache.kafka.clients.producer,它在后台维护了一个线程处理I/O请求和与各个Broker的网络连接,并在Producer退出时自动关闭这些网络连接。有兴趣的读者可尝试该Producer实现。

4.3.2 Consumer程序设计

Kafka提供了两种Consumer API,分别为high-level API和low-level API,其中high-level API是一种高度封装的API,它自动帮你管理和持久化offset,并将多个Consumer抽象成一个Consumer Group,为这些Consumer分摊负载,处理容错等,这使得用户开发程序更加容易;low-level API是一种底层原生态的API,仅提供了消息获取接口,用户需自己管理offset,处理容错(Broker失败时,其上leader partition会转移到其他Broker上)等,一般而言,使用high-level API即可满足实际需求,除非遇到以下场景:

Cousumer负责从Broker中读取数据,通常由用户根据实际需要编写,high-level API相关类如下:

class Consumer {
// 创建一个Consumer连接句柄ConsumerConnector
    public static kafka.javaapi.consumer.ConsumerConnector
        createJavaConsumerConnector(ConsumerConfig config);
}

public interface kafka.javaapi.consumer.ConsumerConnector {
public Map<String, List<KafkaStream<byte[], byte[]>>> 
        createMessageStreams(Map<String, Integer> topicCountMap);
…… //其中createMessageStreams重载函数

    // 提交当前已读消息的offset(持久化到zookeeper)
    public void commitOffsets();

//关闭connector
    public void shutdown();
}

为了使用以上两个类设计一个Consumer程序,可按照以下步骤操作:

步骤1:创建配置对象ConsumerConfig

用户可为Consumer设置一些参数,控制Consumer的行为,常见的配置参数如表4-2所示(更多参数可参考Kafka官方文档 )。

表4-2 常见的Consumer配置参数

举例如下:

Properties props = new Properties();
props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
props.put("group.id", "kafka-test");
//启动时,从最小offset处开始读取消息
props.put("auto.offset.reset", "smallest"); 
ConsumerConfig consumerConfig = new ConsumerConfig(props);

步骤2:创建Consumer Group并启动所有Consumer

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
int numThreads = 5;
topicCountMap.put(topic, numThreads);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// 启动所有consumer
executor = Executors.newFixedThreadPool(numThreads);
for (final KafkaStream stream : streams) {
    executor.submit(new ConsumerRunner(stream));
}

其中ConsumerRunner会通过KafkaStream中的迭代器流式读取Broker中的数据,实现如下:

public class ConsumerRunner implements Runnable {
private KafkaStream stream; 
public ConsumerRunner(KafkaStream stream) {
this.stream = stream;
}
public void run() {
    // 得到消息迭代器
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
    // 通过it.next().message()获取消息,并处理
        }
}

使用high-level API时,需注意以下几点:

4.3.3 开源Producer与Consumer实现

由于Kafka在大数据领域应用越来越广泛,很多大数据开源系统均主动增加了对Kafka的支持,本节列举了几个常用的组件:

Kafka Producer常用组件为Flume Kafka Sink: Flume软件包中内置的Kafka Producer,可将Flume Channel中的数据写入Kafka。

Kafka Consumer

4.4 Kafka典型应用场景

Kafka作为一个分布式消息队列,在多种大数据应用场景中得到广泛使用,举例如下:

· 消息队列

与RabbitMQ和ZeroMQ等开源消息队列相比,Kafka具有高吞吐率、自动分区、多副本以及良好的容错性等特点,这使得它非常适合大数据应用场景。

· 流式计算框架的数据源

在流式计算框架(比如Storm,Spark Streaming等,具体可参考第13章流式实时计算引擎)中,为了保证数据不丢失,具备“at least once”数据发送语意,通常在数据源中使用一个高性能的消息队列,从而形成了“分布式消息队列+分布式流式计算框架”的实时计算架构,架构如图4-10所示。

图4-10 Kafka与流式计算框架组合架构

· 分布式日志收集系统中的Source或Sink

可与日志收集组件Flume或Logstash 组合使用,担任Source或Sink的角色,如图4-11所示,Flume或Logstash提供可配置化的Source和Sink,Kafka提供分布式高可用的消息系统,进而形成一个在扩展性、吞吐率等方面都非常优秀的分布式系统。

图4-11 Kafka在日志收集系统中用作Source或Sink

· Lambda Architecture中的Source

同时为批处理和流式处理两条流水线提供数据源,如图4-12所示,各种流式数据直接写入Kafka,之后由Hadoop批处理集群和Storm流式处理集群分别读取,进行相应的处理,最终将结果合并后,呈献给用户。

图4-12 Kafka同时担任批处理和流式处理系统的数据源

4.5 小结

Kafka是在大数据背景下产生的,能应对海量数据的处理场景,具有高性能、良好的扩展性、数据持久性等特点。

Kafka架构由Producer,Broker和Consumer三类组件构成,其中Producer将数据写入Broker,Consumer则从Broker上读取数据进行处理,而Broker构成了连接Producer和Consumer的“缓冲区”。Kafka Broker中的消息被划分成若干个topic,同属一个topic的所有数据按照某种策略被分成多个partition,以实现负载分摊和数据并行处理。Kafka采用了多种优化手段保证它的高性能、扩展性等优点。

本章首先介绍了Kafka设计动机和特点,之后对其设计架构进行了剖析,最后从应用开发的角度,介绍了如何开发Kafka Producer和Consumer两类组件。

4.6 本章问题

问题1: Kafka不能保证数据的时序性,即Producer依次将数据X、Y写入Broker,Consumer读出的数据顺序可能是Y、X。如果想在使用Kafka时,能够保证数据时序,有哪些可行方案?

问题2: 试比较Kafka与Flume的异同。

问题3: 尝试使用low-level API编写Consumer读取数据,并将offset保存到ZooKeeper以便故障时恢复。

问题4: 设置合理的Source或Sink,分别使用以下两种方式集成Flume和Kafka:

问题5: 解释Kafka中以下几个概念:

问题6: LinkedIn开源了大数据集成框架Gobblin( https://github.com/linkedin/gobblin ),可以将Kafka中的数据批量导入HDFS中,尝试使用Gobblin将Kafka中出题为“log”的数据写入HDFS下的/data/log目录中。 4CLEW89rux87zSDTo3lyo/HJfFCh7ljXMN3/4qrVtXCRsLxu9ERIirfANeV4qKjO

点击中间区域
呼出菜单
上一章
目录
下一章
×