第2章介绍了关系型数据的收集,即如何将关系型数据库(如MySQL、Oracle等)导入Hadoop中。本章将介绍另一类型的数据——非关系型数据的收集。
在现实世界中,非关系型数据量远大于关系型数据。非关系型数据种类繁多,包括网页、视频、图片、用户行为日志、机器日志等,其中日志类数据直接反映了(日志)生产者的现状和行为特征,通常会用在行为分析系统、推荐系统、广告系统中。日志数据具有流式、数据量大等特点,通常分散在各种设备上,由不同服务和组件产生,为了高效地收集这些流式日志,需要采用具有良好扩展性、伸缩性和容错性的分布式系统。为了帮助用户解决日志收集问题,Hadoop生态系统提供了Flume,它是Cloudera公司开源的一个分布式高可靠系统,能够对不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化的数据存储系统中。本章将重点剖析Flume设计思想、基本架构以及常见的使用场景。
在生产环境中,通常会部署各种类型的服务,比如搜索、推荐、广告等,这些服务均会记录大量流式日志。比如搜索系统,当用户输入一个查询词时,该搜索行为会以日志的形式被后端系统记录下来,当并发访问用户数非常多时,搜索系统后端将实时产生大量日志,如图3-1所示,如何高效地收集这些日志,并发送到后端存储系统(比如Hadoop、数据仓库等)中进行统一分析和挖掘,是每个公司需要解决的问题。
图3-1 数据收集面临的问题
总结起来,日志收集面临以下问题:
Cloudera公司开源的Flume系统便是解决以上这些流式数据收集问题的,它是一个通用的流式数据收集系统,可以将不同数据源产生的流式数据近实时地发送到后端中心化的存储系统中,具有分布式、良好的可靠性以及可用性等优点。
Flume采用了插拔式软件架构,所有组件均是可插拔的,用户可以根据自己的需要定制每个组件。Flume本质上是一个中间件,它屏蔽了流式数据源和后端中心化存储系统之间的异构性,使得整个数据流非常容易扩展和演化。
Flume最初是Cloudera工程师开发的日志收集和聚集系统,后来逐步演化成支持任何流式数据收集的通用系统。总结起来,Flume主要具备以下几个特点:
Flume目前存在两个版本,分别称为Flume OG(Original Generation)和Flume NG(Next/New Generation),其中Flume OG对应Apache Flume 0.9.x及之前的版本,已经被各大Hadoop发行版(比如CDH和HDP)所弃用;Flume NG对应Apache Flume 1.x版本,被主流Hadoop发行版采用,目前应用广泛。Flume NG在OG架构基础上做了调整,去掉了中心化的组件master以及服务协调组件ZooKeeper,使得架构更加简单和容易部署。Flume NG与OG是完全不兼容的,但沿袭了OG中很多概念,包括Source、Sink等,本节将重点剖析Flume NG的基本架构。
Flume的数据流是通过一系列称为Agent的组件构成的,如图3-2所示,一个Agent可从客户端或前一个Agent接收数据,经过过滤(可选)、路由等操作后,传递给下一个或多个Agent(完全分布式),直到抵达指定的目标系统。用户可根据需要拼接任意多个Agent构成一个数据流水线。
图3-2 Flume基本构成
Flume将数据流水线中传递的数据称为“Event”,每个Event由头部和字节数组(数据内容)两部分构成,其中,头部由一系列key/value对构成,可用于数据路由,字节数组封装了实际要传递的数据内容,通常使用Avro,Thrift,Protobuf(关于Avro、Thrift和Protobuf的具体介绍,可参考第5章)等对象序列化而成。
Flume中Event可由专门的客户端程序产生,这些客户端程序将要发送的数据封装成Event对象,并调用Flume提供的SDK发送给Agent。
接下来重点讲解Agent内部的组件构成,如图3-3所示。
Agent内部主要由三个组件构成,分别是Source,Channel和Sink,其作用和功能如下:
图3-3 Flume Agent基本构成
1. Source
Flume数据流中接收Event的组件,通常从Client程序或上一个Agent接收数据,并写入一个或多个Channel。为了方便用户使用,Flume提供了很多Source实现,主要包括:
当然,用户也可以根据自己的需要定制Source。
如何选择Flume Source?在实际生产环境中,存在两种数据源,一种是文件,可采用Exec Source或Spooling Directory Source收集数据,但考虑到前者无法保证数据完整性,后者实时性较差,通常会自己进行定制,既保证完整性,又具备较高的实时性,taildir source 便是一个非常优秀的解决方案,它能实时监控一个目录下文件的变化,实时读取新增数据,并记录断点,保证重启Agent后数据不丢失或被重复传输;另一种是网络数据,这时候可采用Avro/Thrift source,并自己编写客户端程序传输数据给该source。
2. Channel
Channel是一个缓存区,它暂存Source写入的Event,直到被Sink发送出去。目前Flume主要提供了以下几种Channel实现:
3. Sink
Sink负责从Channel中读取数据,并发送给下一个Agent(的Source)。Flume主要提供了以下几种Sink实现:
Flume使用事务性的方式保证Event传递的可靠性。Sink必须在Event被存入Channel后,或者已经被成功传递给下一个Agent后,才能把Event从Channel中删除掉。这样数据流里的Event无论是在一个Agent里还是多个Agent之间流转,都能保证可靠。
除了Source、Channel和Sink外,Flume Agent还允许用户设置其他组件更灵活地控制数据流,包括Interceptor,Channel Selector和Sink Processor等,如图3-4所示,本节将详细剖析这几个组件。
图3-4 Flume Agent内部高级组件
1. Interceptor
Interceptor组件允许用户修改或丢弃传输过程中的Event。Interceptor是一个实现了org.apache.flume.interceptor.Interceptor接口的类。用户可配置多个Interceptor,形成一个Interceptor链,这样,前一个Interceptor返回的Event将被传递给下一个Interceptor,而传递过程中,任何一个Interceptor均可修改或丢弃当前的Event。Flume自带了很多Interceptor实现,常用的有:
2. Channel Selector
Channel Selector允许Flume Source选择一个或多个目标Channel,并将当前Event写入这些Channel。Flume提供了两种Channel Selector实现,分别如下:
3. Sink Processor
Flume允许将多个Sink组装在一起形成一个逻辑实体(称为“Sink Group”),而Sink Processor则在Sink Group基础上提供负载均衡以及容错的功能(当一个Sink挂掉了,可由另一个Sink接替)。Flume提供多种Sink Processor实现,分别如下
3.2节从理论层面介绍了Flume NG的架构和模块构成,而本节我们将介绍如何使用Flume NG构建数据流拓扑,以满足生产环境需求。
为了使用Flume收集日志,我们需要构建一个完整的数据流水线。为此,我们可按照以下步骤操作:
步骤1:确定流式数据获取方式。
步骤2:根据需求规划Agent,包括Agent数目,Agent依赖关系等。
步骤3:设置每个Agent,包括Source,Channel和Sink等组件的基本配置。可参考Flume官方文档全面而详细地了解各组件的配置项 。
步骤4:测试构建的数据流拓扑。
步骤5:在生产环境部署该数据流拓扑。
本节重点关注步骤1~3的操作方法,接下来重点介绍流式数据获取方式、常见拓扑架构以及Agent配置方式。
1. 流式数据获取方式
Flume支持多种方式供外部数据源将流式数据发送给Flume,常用的方式包括:
1)远程过程调用(RPC): 这是最常用的一种方式,Flume支持目前主流的RPC协议,包括Avro和Thrift,比如下面的示例。
$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/ngnix.log
该命令启动了一个Avro客户端,将/usr/logs/ngnix.log中的数据发送到指定的Avro服务器(启动在本地的41414端口)上。
2)TCP或UDP: Flume提供了syslog source,支持TCP和UDP两种协议,用户可通过这两种协议将外部数据写入flume。
3)执行命令: Flume提供了Exec Source,允许用户执行一个shell命令产生流式数据。
2. 常见拓扑架构
常见的Flume拓扑架构有两种:多路合并和多路复用。
(1)多路合并
在流式日志收集应用中,常见的一种场景是大量的日志产生客户端将日志发送到少数几个聚集节点上,由这些节点对日志进行聚集合并后,写入后端的HDFS中,整个数据流如图3-5所示。
图3-5 多路合并拓扑架构图
在该数据流中,每个Web Server将流式日志发送到一个对应的Flume Agent上,Flume Agent收到数据后,统一发送给一个汇总的Agent,由它写入HDFS。
(2)多路复用
Flume支持将数据路由到多个目标系统中,这是通过Flume内置的多路复用功能实现的,典型拓扑如图3-6所示。
图3-6 多路复用拓扑架构图
在该数据流中,Source产生的数据按照类别被写入不同的Channel,之后由不同Sink写入不同的目标系统中,需要注意的是,Sink可进一步写入另外一个Agent,进而实现Agent级联。
3. Agent配置方式
Flume采用标准的Java property文件格式描述各个组件的配置,当Agent启动时,会读取本地对应的Java property文件,并按照配置要求创建和启动Agent内部的各个组件。举例如下:
$ bin/flume-ng agent -n flume_agent_name -c conf -f conf/flume-conf.properties
该命令指定了Agent名称(flume_agent_name),Flume配置文件存放目录(conf)和该Agent的配置文件(conf/flume-conf.properties),之后Flume将按照conf/flume-conf.properties的描述,启动名为flume_agent_name的Agent。
(1)配置Agent
Agent由Source、Channel和Sink构成,其中Source和Sink充当生产者和消费者的角色,Channel可看作缓冲区,Source可对应多个Channel,但每个Sink只能与一个Channel关联。每个组件的定义描述由name、type和一系列特定属性构成。
Agent定义格式如下:
# 列出agent包含的source, sink和channel,可自定义名称 <agent>.sources = <source> <agent>.sinks = <sink> <agent>.channels = <channel1>,<channel2> # 为source设置channel <agent>.sources.<source>.channels = <channel1>,<channel2>,... # 为sink设置channel <agent>.sinks.<sink>.channel = <channel1>
【LogAgent实例】 如图3-7所示,我们需要从本地磁盘/tmp/logs目录下获取数据,写入分布式文件系统HDFS中,它对应的配置文件logagent.property如下:
LogAgent.sources = mysource LogAgent.channels = mychannel LogAgent.sinks = mysink LogAgent.sources.mysource.channels = mychannel LogAgent.sinks.mysink.channel = mychannel
图3-7 LogAgent实例示意图
(2)配置单个组件
用户可对Agent内部每个组件单独进行配置,格式如下:
# 设置source的属性<someProperty>值为<someValue> <agent>.sources.<source>.<someProperty> = <someValue> # 设置channel的属性<someProperty>值为<someValue> <agent>.channel.<channel>.<someProperty> = <someValue> # 设置sink的属性<someProperty>值为<someValue> <agent>.sources.<sink>.<someProperty> = <someValue>
接着前面的 【LogAgent实例】 ,在配置文件logagent.property中为mysource、mysink和mychannel设置属性:
# 配置名为"mysource"的Source,采用spooldir类型,从本地目录/tmp/logs获取数据 LogAgent.sources.mysource.type = spooldir LogAgent.sources.mysource.channels = mychannel LogAgent.sources.mysource.spoolDir =/tmp/logs # 配置名为"mysink"的Sink,将结果写入HDFS中,每个文件10000行数据 LogAgent.sinks.mysink.type = hdfs LogAgent.sinks.mysink.hdfs.path = hdfs://master:8020/data/logs/%Y/%m/%d/%H/ LogAgent.sinks.mysink.hdfs.batchSize = 1000 LogAgent.sinks.mysink.hdfs.rollSize = 0 LogAgent.sinks.mysink.hdfs.rollCount = 10000 LogAgent.sinks.mysink.hdfs.useLocalTimeStamp = true # 配置名为"mychannel"的Channel,采用memory类型 LogAgent.channels.mychannel.type = memory LogAgent.channels.mychannel.capacity = 10000
一旦logagent.property文件配置完毕后,可通过以下命令启动LogAgent:
bin/flume-ng agent -n LogAgent -c conf -f logagent.properties -Dflume.root.logger=DEBUG,console
3.2.1节已经提到,Flume提供了很多Source,Channel和Sink的实现,而每个实现都有自己独有的配置,查阅某个具体组件的可配置项,可参考Flume官方用户使用文档 ,在此不再赘述。
为了让读者更直观地了解Flume拓扑的构建方法,我们给出了两个综合实例。
1. 多路合并拓扑实例
设想在生产环境中,我们需上线一批应用,这些应用会实时产生用户行为相关的流式日志。我们的一个任务是收集这些日志,并按照日志类别(比如搜索日志,点击日志等)写到不同的HDFS目录中。为了完成这个任务,我们构建了图3-8所示的Flume拓扑。
图3-8 多路合并拓扑实例架构图
在应用内部,我们采用Flume SDK将日志(采用Avro格式)发送到后端的各个Agent上,为了减少HDFS访问并发数和生成的小文件数目,我们设计了两层Agent:第一层Agent采用Avro Source从应用程序端接收Event,并写入File Channel,之后由一组Avro Sink将数据发送给第二层Agent;第二层Agent接收到前一层Event数据后,通过HDFS Sink写入后端的HDFS。接下来介绍如何通过Flume提供的声明性语言构建这个拓扑。
(1)第一层Agent配置
以Agent11为例进行介绍。
首先定义Agent a11,并声明它的Source,Channel和Sink:
a11.sources = r11 a11.channels = c11 a11.sinks = k11,k12
接下来为两个Sink s11和s12构造一个Sink Group,并配置该group的属性:
a11.sinkgroups = g11 #开启负载均衡功能 a11.sinkgroups.g11.processor.type = LOAD_BALANCE #采用轮询方式进行负载均衡 a11.sinkgroups.g11.processor.selector = ROUND_ROBIN #同时开启负载均衡和容错 a11.sinkgroups.g11.processor.backoff = true
配置Source类型为AVRO,并绑定本地的IP和端口号:
a11.sources.r11.channels = c11 a11.sources.r11.type = AVRO a11.sources.r11.bind = 0.0.0.0 #本地IP a11.sources.r11.port = 41414
配置Channel类型为FILE:
a11.channels.c11.type = FILE
配置两个Sink类型为AVRO,并设置目标Agent的Avro Server地址:
a11.sinks.k11.channel = c11 a11.sinks.k11.type = AVRO a11.sinks.k11.hostname = a21.example.org a11.sinks.k11.port = 41414 a11.sinks.k12.channel = c11 a11.sinks.k12.type = AVRO a11.sinks.k12.hostname = a22.example.org a11.sinks.k12.port = 41414
以上配置文件信息可保存到文件agent11.properties中,并通过以下命令启动该Agent:
bin/flume-ng agent -n a11 -c conf -f agent11.properties
(2)第二层Agent配置
以Agent21为例进行介绍。
首先定义Agent a21,并依次声明它的Source,Channel和Sink:
a21.sources = r21 a21.channels = c21 a21.sinks = k21
配置Source类型为AVRO,并绑定本地的IP和端口号:
a21.sources.r21.channels = c21 a21.sources.r21.type = AVRO a21.sources.r21.bind = 0.0.0.0 a21.sources.r21.port = 41414
配置Channel类型为FILE:
a21.channels.c21.type = FILE
配置Sink类型为HDFS:
a21.sinks.k21.channel = c21 a21.sinks.k21.type = hdfs #指定HDFS存放路径 a21.sinks.hdfsSink.hdfs.path = hdfs://bigdata/flume/appdata/%Y-%m-%d/%H%M a21.sinks.hdfsSink.hdfs.filePrefix= log a21.sinks.hdfsSink.hdfs.rollInterval= 600 a21.sinks.hdfsSink.hdfs.rollCount= 10000 a21.sinks.hdfsSink.hdfs.rollSize= 0 a21.sinks.hdfsSink.hdfs.round = true a21.sinks.hdfsSink.hdfs.roundValue = 10 a21.sinks.hdfsSink.hdfs.roundUnit = minute #fileType可以是SequenceFile, DataStream 或CompressedStream,分别表示 #二进制格式,未压缩原始数据格式,经压缩的原始数据格式 a21.sinks.k21.hdfs.fileType = DataStreama21.sinks.k21.channel = c21 a21.sinks.k21.type = hdfs #指定HDFS存放路径 a21.sinks.hdfsSink.hdfs.path = hdfs://bigdata/flume/appdata/%Y-%m-%d/%H%M a21.sinks.hdfsSink.hdfs.filePrefix= log a21.sinks.hdfsSink.hdfs.rollInterval= 600 a21.sinks.hdfsSink.hdfs.rollCount= 10000 a21.sinks.hdfsSink.hdfs.rollSize= 0 a21.sinks.hdfsSink.hdfs.round = true a21.sinks.hdfsSink.hdfs.roundValue = 10 a21.sinks.hdfsSink.hdfs.roundUnit = minute #fileType可以是SequenceFile, DataStream 或CompressedStream,分别表示 #二进制格式,未压缩原始数据格式,经压缩的原始数据格式 a21.sinks.k21.hdfs.fileType = DataStream
以上配置文件信息可保存到文件agent21.properties中,并通过以下命令启动该Agent:
bin/flume-ng agent -n a21 -c conf -f agent21.properties
2. 多路复用拓扑实例
在该实例中,我们直接让应用程序通过TCP发送日志到对应的Agent,之后由Agent将所有数据写入HDFS,此外,Agent会按照Event头部的Severity属性值判断数据的重要性(共分为五种级别的数据:emergency,alert,critical,error和normal,分别用0~4表示),其中重要的数据会往HBase额外写入一份,具体拓扑如图3-9所示,与实例1类似,该拓扑也分为两层,其中第一层两者类似,重点介绍第二层。
以Agent21为例,介绍第二层的拓扑配置方法。Agent a21中的Channel和Sink Group声明和定义方式与实例1类似,在此不再赘述,接下来重点而介绍Source和Sink的配置方法。
图3-9 多路复用拓扑实例架构图
定义Source,类型为AVRO,对应两个Channel:
a21.sources.r21.channels = c21 c22 a21.sources.r21.type = AVRO a21.sources.r21.bind = 0.0.0.0 a21.sources.r21.port = 41414
为Source添加Channel Selector,按照Event头部的Severity属性值判定数据的重要程度,如果该值的范围为0~3,表示为重要数据,需往HBase中额外写一份,具体如下:
a21.sources.r21.selector.type = MULTIPLEXING a21.sources.r21.selector.header = Severity a21.sources.r21.selector.default = c21 # 重要数据需同时写到HDFS和HBase中,显式指定重要度为0~3的数据同时写入c21和c22 a21.sources.r21.selector.mapping.0 = c21 c22 a21.sources.r21.selector.mapping.1 = c21 c22 a21.sources.r21.selector.mapping.2 = c21 c22 a21.sources.r21.selector.mapping.3 = c21 c22
配置HDFS Sink如下:
a21.sinks.k21.channel = c21 a21.sinks.k21.type = hdfs # 指定HDFS写入目录,同时以时间为子目录名 a21.sinks.k21.hdfs.path = hdfs://bigdata/appdata/%Y-%m-%d/%H%M/ # 每次生成的文件的前缀名 a21.sinks.k21.hfds.filePrefix = FlumeData-%{host}- a21.sinks.k21.hdfs.fileType = DataStream # 每隔十分钟,生成一个新文件 a21.sinks.k21.hdfs.round = true a21.sinks.k21.hdfs.roundUnit = minute a21.sinks.k21.hdfs.roundValue = 10
配置HBase Sink如下:
a21.sinks.k22.channel = c22 # 异步写入Hbase a21.sinks.k22.type = asynchbase # 指定Hbase写入的表名和列簇名分别为appdata和log a21.sinks.k22.table = appdata a21.sinks.k22.columnFamily = log
Flume是一个高效的流式数据收集系统,具有良好扩展性、伸缩性和容错性等特点,被互联网公司广泛采用。它采用插拔式软件架构,其数据流是由一系列称为Agent的组件构成的,每个Agent内部由Source、Channel和Sink模块化组件构成,用户可根据实际应用场景选择最合适的Source、Channel和Sink,也可根据需要定制自己的实现。
问题1: 在互联网领域,常使用Ngnix作为Web服务器,假设公司X拥有10台Web服务器,试说明如何收集这些机器上Ngnix产生的日志(存放在目录/var/log/nginx下),并在确保不丢数据的前提下存储到HDFS中(存放在/data/log目录下)。需要给出配置文件内容和Agent启动方式。
问题2: 如何保证在以下情况下,Flume不会丢失数据:
问题3: 尝试定义一个名为“KafkaAgent”的Agent,它将本地目录/data/log下新产生的数据写入Kafka的log主题(topic)中。