购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

第2章 关系型数据的收集

从本章开始,我们将介绍与数据收集相关的工具和系统。正如第1章所述,数据可简单分为关系型和非关系型两种,本章重点介绍如何实现关系型数据的收集。

关系型数据是常见的一种数据类型,通常存储在像MySQL、Oracle等关系型数据库中,为了能够利用大数据技术处理和存储这些关系型数据,首先需将这些数据导入到像HDFS、HBase这样的大数据存储系统中,以便使用MapReduce、Spark这样的分布式计算技术进行高效分析和处理。从另一个角度讲,为了便于与前端的数据可视化系统对接,我们通常需要将Hadoop大数据系统分析产生的结果(比如报表,通常数据量不会太大)导回到关系型数据库中。为了解决上述问题,高效地实现关系型数据库与Hadoop之间的数据导入导出,Hadoop生态系统提供了工具Sqoop(SQL to Hadoop),本章将重点剖析Sqoop设计思想、基本架构以及常见的使用场景。

2.1 Sqoop概述

2.1.1 设计动机

Sqoop从工程角度,解决了关系型数据库与Hadoop之间的数据传输问题,它构建了两者之间的“桥梁”,使得数据迁移工作变得异常简单。在实际项目中,如果遇到以下任务,可尝试使用Sqoop完成:

为了解决上述数据收集过程中遇到的问题,Apache Sqoop项目诞生了,它是一个性能高、易用、灵活的数据导入导出工具,在关系型数据库与Hadoop之间搭建了一个桥梁,如图2-1所示,让关系型数据收集变得异常简单。

图2-1 Sqoop的“桥梁”作用

2.1.2 Sqoop基本思想及特点

Sqoop采用插拔式Connector架构,Connector是与特定数据源相关的组件,主要负责(从特定数据源中)抽取和加载数据。用户可选择Sqoop自带的Connector,或者数据库提供商发布的native Connector,甚至根据自己的需要定制Connector,从而把Sqoop打造成一个公司级别的数据迁移统一管理工具。Sqoop主要具备以下特点:

2.2 Sqoop基本架构

Sqoop目前存在两个版本,截至本书出版时,两个版本分别以版本号1.4.x和1.99.x表示,通常简称为“Sqoop1”和“Sqoop2”,Sqoop2在架构和设计思路上对Sqoop1做了重大改进,因此两个版本是完全不兼容的。在这一节中,我们重点关注这两个版本的设计原理和架构。

2.2.1 Sqoop1基本架构

Sqoop1是一个客户端工具,不需要启动任何服务便可以使用,非常简便。Sqoop1是实际上是一个只有Map的MapReduce作业,它充分利用MapReduce高容错性、扩展性好等优点,将数据迁移任务转换为MapReduce作业,整个架构如图2-2所示。

图2-2 Sqoop1基本架构

当用户通过shell命令提交迁移作业后,Sqoop会从关系型数据库中读取元信息,并根据并发度和数据表大小将数据划分成若干分片,每片交给一个Map Task处理,这样,多个Map Task同时读取数据库中的数据,并行将数据写入目标存储系统,比如HDFS、HBase和Hive等。

Sqoop允许用户通过定制各种参数控制作业,包括任务并发度、数据源、超时时间等。总架构上讲,Sqoop1只是一个客户库工具,windows下绿色版软件大多是对原始软件的破解,建议删去这个类比因此使用起来非常简单,但如果你的数据迁移作业很多,Sqoop1则会暴露很多缺点,包括:

2.2.2 Sqoop2基本架构

为了解决Sqoop1客户端架构所带来的问题,Sqoop2对其进行了改进,如图2-3所示,引入了Sqoop Server,将所有管理工作放到Server端,包括Connector管理、MySQL/Hadoop相关的客户端、安全认证等,这使得Sqoop客户端变得非常轻,更易于使用。Sqoop1到Sqoop2的变迁,类似于传统软件架构到云计算架构的变迁,将所有软件运行到“云端”(Sqoop Server),而用户只需通过命令和或浏览器便可随时随处使用Sqoop。

图2-3 Sqoop2基本结构

Sqoop2主要组件及功能如下:

1. Sqoop Client

定义了用户使用Sqoop的方式,包括客户端命令行(CLI)和浏览器两种方式,其中浏览器方式允许用户直接通过HTTP方式完成Sqoop的管理和数据的导入导出。

2. Sqoop Server

Sqoop1中Client端大部分功能在Sqoop2中转移到了Sqoop Server端,包括:

1)Connector:所有Connector的实现放置到Sqoop Server端,且Connector被进一步抽象化和模块化,它的通用部分被抽取出来,本身只关注数据解析和加载相关的功能,包括Partitioner、Extractor和Loader等主要模块。具体功能如下:

从前面介绍可容易看出,整个过程只需一个MapReduce作业即可完成:Partitioner和Extractor在Map阶段完成,Loader在Reduce阶段完成。

2)Metadata:Sqoop中的元信息,包括可用的Connector列表、用户创建的作业和Link(实例化的一个Connector,以便创建作业时使用)等。元信息被存储在数据仓库中,默认使用轻量级数据库Apache Derby,用户也可根据需要替换成MySQL等其他数据库。

3)RESTful和HTTP Server:与客户端对接,响应客户端发出的RESTful请求和HTTP请求。

Sqoop Server会根据用户创建的Sqoop Job生成一个MapReduce作业,提交到Hadoop集群中分布式执行。

2.2.3 Sqoop1与Sqoop2对比

Sqoop2在Sqoop1的基础上进行了重大改进,本节将从易用性、扩展性和安全性三个方面对比Sqoop1和Sqoop2,分别如表2-1~表2-3所示。

表2-1 Sqoop1与Sqoop2在易用性方面对比

表2-2 Sqoop1与Sqoop2在扩展性方面对比

表2-3 Sqoop1与Sqoop2在安全性方面对比

总结起来,Sqoop2通过将访问入口服务化,将所有的复杂功能放到服务器端,大大简化了客户端实现,使其更轻量级,进而变得更加易用。

2.3 Sqoop使用方式

本节将介绍如何使用Sqoop完成数据迁移工作。本节内容假定用户环境中已存在关系型数据库MySQL和分布式文件系统HDFS,且Sqoop已经安装部署完成,在此基础上,介绍如何使用Sqoop1和Sqoop2在MySQL和HDFS之间导入导出数据。

2.3.1 Sqoop1使用方式

Sqoop1仅支持命令行使用方式,主要为用户提供了import和export两种命令:

接下来分别介绍import和export的用法。

1. import用法

import基本用法如下:

$ sqoop import [generic-args] [import-args]

import包含两类参数,[generic-args]是Hadoop通用参数,[import-args]是import特有的参数,含义分别如下:

1)Hadoop通用参数主要包含以下几个:

2)import特有参数:import特有参数非常多,可通过“sqoop import help”命令查看所有参数,常用的如表2-4所示。

表2-4 import特有参数及其含义

【实例1】 将MySQL数据库movie中表data中的数据导出到HDFS中:

$ sqoop import --connect jdbc:mysql://mysql.example.com/movie\
--table data --username xicheng --password 123456

最终导出结果存放在HDFS的用户根目录下的user目录中,比如运行这个命令的linux用户为X,则数据最终存放在HDFS的/user/X/data/目录中。

【实例2】 将MySQL数据库movie表中data中符合某种条件的数据导出到HDFS中:

$ sqoop import --connect jdbc:mysql://mysql.example.com/movie\
    --username xicheng --password 123456 --num-mapper 10 \
    --query "select name, id from data where date > 10" \
--target-dir /prod/data

该命令将SQL筛选出来的数据导出到HDFS的/prod/data目录中,为了防止并发的任务数目过多对MySQL产生过大负载,该命令限制并发任务数目为10。

2. export用法

export基本用法如下:

$ sqoop export [generic-args] [export-args]

与import类似,export也包含两类参数,[generic-args]是Hadoop通用参数,[export-args]是export特有的参数,其中Hadoop通用参数与import中的相同,而特有参数稍有不同,具体如表2-5所示。

表2-5 import特有参数及其含义

【实例1】 将HDFS中/user/X/data/目录下的数据导入MySQL数据库movie中表data中:

$ sqoop export --connect jdbc:mysql://mysql.example.com/movie\
--table data --export-dir /user/X/data/ \
    --username xicheng --password 123456

/user/X/data/中数据列数与类型应该与表data是一一对应的,默认情况下,该目录下所有文件应为文本格式,且数据记录之间用“\n”分割(可使用参数“--lines-terminated-by<char>”修改),记录内部列之间用“,”分割(可使用参数“--fields-terminated-by<char>”修改)。

【实例2】 将HDFS中数据增量导入MySQL表中:

$ sqoop export --connect jdbc:mysql://mysql.example.com/movie\
--table data --export-dir /user/X/data/ \
    --username xicheng --password 123456 \
    --update-key id --update-mode allowinsert

假设表data的定义如下:

CREATE TABLE foo(
    id INT NOT NULL PRIMARY KEY,
    msg VARCHAR(32),
    bar INT);

HDFS中/user/X/data/目录下数据如下:

0,this is a test,42
1,some more data,100
...

则以上命令执行效果等价于:

UPDATE foo SET msg='this is a test', bar=42 WHERE id=0;
UPDATE foo SET msg='some more data', bar=100 WHERE id=1;
...

某一条数据对应的id,若在MySQL中已存在,则会更新这一行内容;否则,会将该条数据作为一条新记录插入表中。

2.3.2 Sqoop2使用方式

在正式介绍Sqoop2使用方法之前,我们先解释几个关键概念:

1)Connector:访问某种数据源的组件,负责从数据源中读取数据,或将数据写入数据源,Sqoop2内置了多种数据源,具体如下:

2)Link:一个Connector实例。

3)Job:完成数据迁移功能的分布式作业,可从某个数据源(称为“FROM link”)中读取数据,并导入到另一种数据源(称为“TO link”)中。

可通过“sqoop.sh client”进入Sqoop2提供的shell命令行,并完成创建Link,创建作业,执行作业等一系列过程。

1. 创建Link

先通过以下命令查看Sqoop2提供的所有可用Connector,如图2-4所示:

图2-4 查看所有可用的Connector

用户可使用“create link–c<connector-id>”创建一个link,接下来具体介绍。

【实例1】 创建一个JDBC类型的Link

sqoop:000> create link -c 4
Creating link for connector with id 4
Please fill following values to create new link object
Name: MySQL-Reader
Link configuration
JDBC Driver Class: com.mysql.jdbc.Driver
JDBC Connection String: jdbc:mysql://localhost/movie
Username: dongxicheng
Password: ******
JDBC Connection Properties: 
There are currently 0 values in the map:
entry# 
New link was successfully created with validation status OK and persistent id 1

【实例2】 创建一个HDFS类型的Link

sqoop:000> create link -c 3
Creating link for connector with id 3
Please fill following values to create new link object
Name: HDFS-Loader
Link configuration
HDFS URI: hdfs://locahost:8020
New link was successfully created with validation status OK and persistent id 2

该Link需要指定两个属性,分别是“Name”和“HDFS URI”,在该实例中,分别设置为“HDFS-Loader”和“hdfs://localhost:8020”。

查看已创建的Link,如图2-5所示。

图2-5 查看已创建的Link

2. 创建Job

可使用“create job-f<link-id1>-t<link-id2>”命令创建一个从link-id1到link-id2的数据迁移作业,实例如下:

sqoop:000> create job -f 1 -t 2
Creating job for links with from id 1 and to id 2
Please fill following values to create new job object
Name: mysql-to-hdfs
From database configuration
Schema name: movie 
Table name: user
Table SQL statement: 
Table column names: 
Partition column name: userid
Null value allowed for the partition column: 
Boundary query: 
ToJob configuration
Override null value: 
Null value: 
Output format: 
  0 : TEXT_FILE
  1 : SEQUENCE_FILE
Choose: 0
Compression format: 
  0 : NONE
  1 : DEFAULT
  2 : DEFLATE
  3 : GZIP
  4 : BZIP2
  5 : LZO
  6 : LZ4
  7 : SNAPPY
  8 : CUSTOM
Choose: 0
Custom compression format: 
Output directory: /tmp/xicheng/user_table

Throttling resources
Extractors: 5
Loaders: 2
New job was successfully created with validation status OK  and persistent id 1

用户只需使用Sqoop2提供的交互式引导流程完成相应的内容填写即可,除了“Name”(job名称),“Schema Name”(数据库名称),“Table Name”(表名)和“Partition column name”(采用指定的列对数据分片,每片由一个任务处理,通常设置为主键),其他均可以使用默认值。需要解释的是,“Extractors”和“Loaders”两栏相当于指定Map Task(默认是10)和Reduce Task的数目。

3. 提交并监控Job

可使用“start job–jid<job-id>”命令将作业提交到集群中,使用“status job–jid<job-id>”查看作业运行状态,实例如下:

sqoop:000> start job -jid 1
Submission details
Job ID: 1
Server URL: http://localhost:12000/sqoop/
Created by: xicheng.dong
Creation date: 2015-09-02 19:39:56 PDT
Lastly updated by: xicheng
External ID: job_1440945009469_0008
http://localhost:8088/proxy/application_1440945009469_0008/

查看作业运行状态:

sqoop:000> status job -jid 1
Submission details
Job ID: 1
Server URL: http://localhost:12000/sqoop/
Created by: xicheng.dong
Creation date: 2015-09-01 19:39:56 PDT
Lastly updated by: xicheng.dong
External ID: job_1440945009469_0008
http://localhost:8088/proxy/application_1440945009469_0008/
2015-09-02 19:41:01 PDT: RUNNING  - 83.35 %

除了以上常用命令外,Sqoop2还提供了针对Link和Job的更新(update)、删除(delete)、复制(clone)等管理命令,读者如果感兴趣,可自行尝试。

2.4 数据增量收集CDC

Sqoop采用MapReduce可进行全量关系型数据的收集,但难以高效地增量收集数据。很多场景下,除了收集数据库全量数据外,还希望只获取增量数据,即MySQL某个表从某个时刻开始修改/插入/删除的数据。捕获数据源中数据的更新,进而获取增量数据的过程,被称为CDC(“Change Data Capture”)。为了实现CDC,可选的方案有:

以上几种方案均存在明显缺点,“定时扫描整表”性能低效,且延迟过高;“写双份数据”需要业务层修改代码,一致性难以保证,不利于系统演化;“利用触发器机制”管理烦琐,且对数据库性能影响较大。

为了克服这几种方案存在的问题,基于事务或提交日志解析的方案出现了。这种方案通过解析数据库更新日志,还原更新的数据,能够在不对业务层代码做任何修改的前提下,高效地获取更新数据。目前常见的开源实现有阿里巴巴的Canal 和LinkedIn的Databus ,本节以Canal为主,介绍CDC设计的主要原理和架构。

2.4.1 CDC动机与应用场景

图2-6 CDC应用场景

CDC系统主要功能是捕获数据库中的数据更新,将增量数据发送给各个订阅者和消费者。CDC系统应用非常广泛,具体可描述为图2-6所示,主要包括:

2.4.2 CDC开源实现Canal

Canal的主要定位是基于数据库增量日志解析,提供增量数据订阅和消费,目前主要支持了MySQL关系型数据库。

Canal的主要原理是,模拟数据库的主备复制协议,接收主数据库产生的binary log(简称“binlog”),进而捕获更新数据,以MySQL为例说明,具体如图2-7所示。

步骤1:Canal实现MySQL主备复制协议,向MySQL Server发送dump协议。

步骤2:MySQL收到dump请求,开始推送binlog给Canal。

步骤3:Canal解析binlog对象,并发送给各个消费者。

为了便于扩展,Canal采用了模块化架构设计,具体如图2-8所示。

图2-7 Canal基本原理

图2-8 Canal基本架构

1)Canal Server:代表一个Canal运行实例,对应于一个JVM。

2)Canal Instance:对应于一个数据队列(1个Canal Server对应1到n个instance),主要包含以下几个模块:

Canal提供了流式API,可更加高效地获取数据库变更数据,代码示例如下:

CanalConnector connector =  //建立与Canal Server的连接
    CanalConnectors.newClusterConnector("hostX:2181", destination, "", "");
while (running) {
    try {
    connector.connect(); //连接Canal
    connector.subscribe(); //订阅数据
    while (running) {
        Message message = connector.getWithoutAck(5 * 1024); // 获取指定数量的数据
        long batchId = message.getId();
        int size = message.getEntries().size();
        if (batchId == -1 || size == 0) {
    // no data, sleep…
    } else {
    processEntry(message.getEntries()); //数据处理函数
        }
        connector.ack(batchId); // 提交确认
        // connector.rollback(batchId); // 处理失败, 回滚数据
}
……

processEntry是捕获更新的函数,实现如下:

void processEntry (List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.ROWDATA) {
            RowChange rowChage = null;
            try {
            rowChage = RowChange.parseFrom(entry.getStoreValue()); //数据反序列化
            } catch (Exception e) {
            throw new RuntimeException("parse event an error:" + entry.toString(), e);
            }
            EventType eventType = rowChage.getEventType();
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {     //删除操作
                //processColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) { //插入操作
                //processColumn(rowData.getAfterColumnsList());
                } else { //其他操作
                //processColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
}

关于Canal的更多细节,可参考Canal官网文档

相比于阿里巴巴的Canal系统,LinkedIn的Databus更加强大,包括支持更多数据源(Oracle和MySQL等)、扩展性更优的架构(比如高扩展的架构允许保存更长时间的更新数据)等,有兴趣的读者可参考Databus官方文档

2.4.3 多机房数据同步系统Otter

为了解决多机房数据同步问题,阿里巴巴基于Canal研发了Otter ,该系统是Canal消费端的一个实现,其定位是分布式数据库同步系统,它基于数据库增量日志解析,准实时同步到本机房或异地机房的MySQL/Oracle数据库。

1. Otter基本原理

Otter基于Canal开源产品,获取数据库增量日志数据,本身采用典型的管理系统架构:Manager(Web管理)+Node(工作节点),具体架构如图2-9所示。

为了解决分布式状态调度,允许多Node节点之间协同工作,Otter采用了开源分布式协调组件ZooKeeper。

图2-9 Otter基本架构

2. Otter的S、E、T、L阶段模型

为了让系统具有良好的扩展性和灵活性,Otter将整个同步流程抽象为Select、Extract、Transform、Load(简称S、E、T、L)四个阶段,具体如图2-10所示。

图2-10 Otter的S/E/T/L四个阶段

3. Otter跨机房数据同步

使用Otter可以构建各种数据同步应用,其中最经典的是异地跨机房数据同步,部署架构如图2-11所示。

图2-11 Otter跨机房部署图

用户通过Manager可设置跨机房同步任务。在原机房中,数据由Canal获取后,经数据接入和数据提取两个阶段处理后,通过网络发送给目标机房,在目标机房中,经数据转换和数据载入两个阶段处理后,最终将数据写入新的数据库或消息队列等系统。整个过程需要说明的是:

2.5 小结

本章介绍了关系型数据收集系统Sqoop,涉及内容包括设计动机、基本架构以及使用方式。Sqoop非常适合全量数据的收集,而在很多应用场景下,还需要进行增量数据的收集,为了解决该问题,CDC系统出现了,本章最后介绍了CDC系统的设计动机、开源实现Canal以及构建在Canal之上,跨机房数据同步方案Otter。

2.6 本章问题

问题1: Hive、HBase和Kafka均是具有存储能力的系统,尝试使用Sqoop将MySQL中的一张表分别导入这三个系统。

问题2: 试说明,Sqoop是如何保证容错能力的?即在机器或网络出现故障的情况下,如何保证数据同步工作顺利完成? /UhMaX8jq/DmqlcJ3Tc3QQ5NZNViNTJjTqJCO5NboH+BwBr+eaYcsCxdnWR+P1ij

点击中间区域
呼出菜单
上一章
目录
下一章
×