Kudu大数据库零基础从入门到实战

Apache Kudu介绍

背景介绍

在KUDU之前,大数据主要以两种方式存储;
(1)静态数据:
以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大数据分析场景。
这类存储的局限性是数据无法进行随机的读写。
(2)动态数据:
以 HBase、Cassandra 作为存储引擎,适用于大数据随机读写场景。
局限性是批量读取吞吐量远不如 HDFS,不适用于批量数据分析的场景。
从上面分析可知,这两种数据在存储方式上完全不同,进而导致使用场景完全不同,但在真实的场景中,边界可能没有那么清晰,面对既需要随机读写,又需要批量分析的大数据场景,该如何选择呢?
这个场景中,单种存储引擎无法满足业务需求,我们需要通过多种大数据工具组合来满足这一需求,如下图所示:

image.png

如上图所示,数据实时写入HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP 需求,我们定时将 HBase 数据写成静态的文件(如:Parquet)导入到 OLAP 引擎(如:Impala、hive)。这一架构能满足既需要随机读写,又可以支持 OLAP 分析的场景,但他有如下缺点:

(1) 架构复杂 。从架构上看,数据在HBase、消息队列、HDFS 间流转,涉及环节太多,运维成本很高。并且每个环节需要保证高可用,都需要维护多个副本,存储空间也有一定的浪费。最后数据在多个系统上,对数据安全策略、监控等都提出了挑战。

(2) 时效性低 。数据从HBase导出成静态文件是周期性的,一般这个周期是一天(或一小时),在时效性上不是很高。

(3) 难以应对后续的更新 。真实场景中,总会有数据是延迟到达的。如果这些数据之前已经从HBase导出到HDFS,新到的变更数据就难以处理了,一个方案是把原有数据应用上新的变更后重写一遍,但这代价又很高。

为了解决上述架构的这些问题,KUDU应运而生。KUDU的定位是Fast Analytics on Fast Data, 是一个既支持随机读写、又支持OLAP 分析的大数据存储引擎

image.pngimage.png

从上图可以看出,KUDU 是一个折中的产品,在 HDFS 和 HBase 这两个偏科生中平衡了随机读写和批量分析的性能。从 KUDU 的诞生可以说明一个观点:底层的技术发展很多时候都是上层的业务推动的,脱离业务的技术很可能是空中楼阁。

kudu是什么

Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。它是一个融合HDFS和HBase的功能的新组件,具备介于两者之间的新存储组件。

Kudu支持水平扩展,并且与Cloudera Impala和Apache Spark等当前流行的大数据查询和分析工具结合紧密。

image.png

kudu应用场景

适用于那些既有随机访问,也有批量数据扫描的复合场景

高计算量的场景

使用了高性能的存储设备,包括使用更多的内存

支持数据更新,避免数据反复迁移

支持跨地域的实时数据备份和查询

国内使用的kudu一些案例可以查看《构建近实时分析系统.pdf》文档。

Apache Kudu架构

与HDFS和HBase相似,Kudu使用单个的Master节点,用来管理集群的元数据,并且使用任意数量的Tablet Server(类似HBase中的RegionServer角色)节点用来存储实际数据。可以部署多个Master节点来提高容错性。

image.png

image.png

Table

表(Table)是数据库中用来存储数据的对象,是有结构的数据集合。kudu中的表具有schema(纲要)和全局有序的primary key(主键)。kudu中一个table会被水平分成多个被称之为tablet的片段。

Tablet

一个tablet 是一张table连续的片段,tablet是kudu表的水平分区,类似于HBase的region。每个tablet存储着一定连续range的数据(key),且tablet两两间的range不会重叠。一张表的所有tablet包含了这张表的所有key空间。

tablet 会冗余存储。放置到多个tablet server上,并且在任何给定的时间点,其中一个副本被认为是leader tablet,其余的被认之为follower tablet。每个tablet都可以进行数据的读请求,但只有Leader tablet负责写数据请求。

Tablet Server

tablet server集群中的小弟,负责数据存储,并提供数据读写服务

一个tablet server 存储了table表的tablet,向kudu client 提供读取数据服务。对于给定的 tablet,一个tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本。

只有leader服务写请求,然而 leader 或 followers 为每个服务提供读请求 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着。

Master Server

集群中的老大,负责集群管理、元数据管理等功能。

Apache Kudu安装

节点规划

节点kudu-masterkudu-tserver
node1
node2
node3

本地yum源配置

cdh包下载

http://archive.cloudera.com/cdh5/repo-as-tarball/5.14.0/cdh5.14.0-centos6.tar.gz

下载cdh5.14.0-centos6.tar.gz文件,大小约5G左右。

上传解压

把5个G的压缩文件上传其中某一台服务器,作为本地yum源服务器。(这里需要确保服务器的磁盘空间是充足的,如果磁盘容量不够,就需要扩容,增大磁盘的容量,具体操作可以参考附件)。

cd /cloudera_data

tar -zxvf cdh5.14.0-centos6.tar.gz

制作本地yum源

使用Apache Server来充当web服务器,使得其他机器可以通过http方式读取本地制作的yum源软件。这里我们选用第三台机器(node-3)作为yum源。执行以下命令安装apache Server:

yum -y install httpd

service httpd start

然后创建新增一个解析本地yum源的配置文件

cd /etc/yum.repos.d

vim localimp.repo

[localimp]

name=localimp

baseurl=http://node3/cdh5.14.0

gpgcheck=0

enabled=1

创建连接、启动httpd

ln -s /export/servers/cdh/5.14.0 /var/www/html/cdh5.14.0

访问http://node-3/cdh5.14.0验证是否成功

image.png

如果出现访问异常:You don't have permission to access /cdh5.14.0/ on this server,则需要关闭Selinux服务
    (1)临时关闭
            执行命令:setenforce 0
     (2) 永久关闭
            vim /etc/sysconfig/selinux
            SELINUX=enforcing 改为 SELINUX=disabled
            重启服务reboot
将node-3上制作好的localimp配置文件发放到所有需要kudu的节点上去
scp /etc/yum.repos.d/localimp.repo node-1:/etc/yum.repos.d
scp /etc/yum.repos.d/localimp.repo node-2:/etc/yum.repos.d

安装kudu

使用yum命令,在不同的服务器下载对应的服务。

服务器安装命令
node-1yum install -y kudu kudu-master kudu-tserver kudu-client0 kudu-client-devel
node-2yum install -y kudu kudu-master kudu-tserver kudu-client0 kudu-client-devel
node-3yum install -y kudu kudu-master kudu-tserver kudu-client0 kudu-client-devel

yum install kudu # Kudu的基本包

yum install kudu-master # KuduMaster

yum install kudu-tserver # KuduTserver

yum install kudu-client0 #Kudu C ++客户端共享库

yum install kudu-client-devel # Kudu C ++客户端共享库 SDK

image.png

kudu节点配置

安装完成之后。需要在所有节点的/etc/kudu/conf目录下有两个文件:master.gflagfile和tserver.gflagfile。

修改master.gflagfile

# cat /etc/kudu/conf/master.gflagfile
# Do not modify these two lines. If you wish to change these variables,
# modify them in /etc/default/kudu-master.
--fromenv=rpc_bind_addresses
--fromenv=log_dir
--fs_wal_dir=/export/servers/kudu/master
--fs_data_dirs=/export/servers/kudu/master
--master_addresses=node-1:7051,node-2:7051,node-3:7051

修改tserver.gflagfile

# Do not modify these two lines. If you wish to change these variables,
# modify them in /etc/default/kudu-tserver.
--fromenv=rpc_bind_addresses
--fromenv=log_dir
--fs_wal_dir=/export/servers/kudu/tserver
--fs_data_dirs=/export/servers/kudu/tserver
--tserver_master_addrs=node-1:7051,node-2:7051,node-3:7051

修改 /etc/default/kudu-master

export FLAGS_log_dir=/var/log/kudu
#每台机器的master地址要与主机名一致,这里是在node-1上
export FLAGS_rpc_bind_addresses=node-1:7051

修改 /etc/default/kudu-tserver

export FLAGS_log_dir=/var/log/kudu
#每台机器的tserver地址要与主机名一致,这里是在node-1上
export FLAGS_rpc_bind_addresses=node-1:7050

kudu默认用户就是KUDU,所以需要将/export/servers/kudu权限修改成kudu:

mkdir /export/servers/kudu

chown -R kudu:kudu /export/servers/kudu

(如果使用的是普通的用户,那么最好配置sudo权限)/etc/sudoers文件中添加:

image.png

kudu集群启动和关闭

安装ntp服务

启动的时候要注意时间同步

安装ntp服务

yum -y install ntp

设置开机启动

**service ntpd start **

chkconfig ntpd on

可以在每台服务器执行

/etc/init.d/ntpd restart

启动kudu集群

在每台服务器上都执行下面脚本

service kudu-master start

service kudu-tserver start

如果启动失败,请前往日志目录下查看输出日志信息进行排错。

image.png

关闭kudu集群

在每台服务器上都执行下面脚本

service kudu-master stop

service kudu-tserver stop

kudu web UI

kudu的web管理界面。http://master主机名:8051

image.png

Master的web地址

可以查看每个机器上master相关信息。http://node-1:8051/masters

image.png

TServer的web地址

http://node1:8051/tablet-servers

image.png

安装注意事项

给普通用户授予sudo出错

sudo: /etc/sudoers is world writable

解决方式:pkexec chmod 555 /etc/sudoers

启动kudu的时候报错

Failed to start Kudu Master Server. Return value: 1 [FAILED]
去日志文件中查看:
Service unavailable: Cannot initialize clock: Error reading clock. Clock considered
unsynchronized
解决:
第一步:首先检查是否有安装ntp:如果没有安装则使用以下命令安装:
yum -y install ntp
第二步:设置随机启动:
service ntpd start
chkconfig ntpd on

启动过程中报错

Invalid argument: Unable to initialize catalog manager: Failed to initialize sys
tables
async: on-disk master list
解决:
(1):停掉master和tserver
(2):删除掉之前所有的/export/servers/kudu/master/*和/export/servers/kudu/tserver/*

启动过程中报错

error: Could not create new FS layout: unable to create file system roots: unable to
write instance metadata: Call to mkstemp() failed on name template
/export/servers/kudu/master/instance.kudutmp.XXXXXX: Permission denied (error 13)
这是因为kudu默认使用kudu权限进行执行,可能遇到文件夹的权限不一致情况,更改文件夹权限即可

java 操作kudu

构建maven工程、导入依赖

<dependencies>  
   <dependency>
      <groupId>org.apache.kudu</groupId>
      <artifactId>kudu-client</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
</dependencies>

初始化方法

public class TestKudu {

    //声明全局变量 KuduClient后期通过它来操作kudu表
    private KuduClient kuduClient;
    //指定kuduMaster地址
    private String kuduMaster;
    //指定表名
    private String tableName;

    @Before
    public void init(){
        //初始化操作
        kuduMaster="node1:7051,node2:7051,node3:7051";
        //指定表名
        tableName="student";
        KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(kuduMaster);
        kuduClientBuilder.defaultSocketReadTimeoutMs(10000);
        kuduClient=kuduClientBuilder.build();
    } 

创建表

    /**
     * 创建表
     */
    @Test
    public void createTable() throws KuduException {
        //判断表是否存在,不存在就构建
        if(!kuduClient.tableExists(tableName)){

            //构建创建表的schema信息-----就是表的字段和类型
            ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
            columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
            columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build());
            columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build());
            columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).build());
            Schema schema = new Schema(columnSchemas);

            //指定创建表的相关属性
            CreateTableOptions options = new CreateTableOptions();
            ArrayList<String> partitionList = new ArrayList<String>();
            //指定kudu表的分区字段是什么
            partitionList.add("id");    //  按照 id.hashcode % 分区数 = 分区号
            options.addHashPartitions(partitionList,6);

            kuduClient.createTable(tableName,schema,options);
        }
    }

插入数据

    /**
     * 向表加载数据
     */
    @Test
    public void insertTable() throws KuduException {
        //向表加载数据需要一个kuduSession对象
        KuduSession kuduSession = kuduClient.newSession();
        kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);

        //需要使用kuduTable来构建Operation的子类实例对象
        KuduTable kuduTable = kuduClient.openTable(tableName);

        for(int i=1;i<=10;i++){
            Insert insert = kuduTable.newInsert();
            PartialRow row = insert.getRow();
            row.addInt("id",i);
            row.addString("name","zhangsan-"+i);
            row.addInt("age",20+i);
            row.addInt("sex",i%2);

            kuduSession.apply(insert);//最后实现执行数据的加载操作
        }
    }

查询数据

/**
     * 查询表的数据结果
     */
    @Test
    public void queryData() throws KuduException {

        //构建一个查询的扫描器
        KuduScanner.KuduScannerBuilder kuduScannerBuilder = kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
        ArrayList<String> columnsList = new ArrayList<String>();
        columnsList.add("id");
        columnsList.add("name");
        columnsList.add("age");
        columnsList.add("sex");
        kuduScannerBuilder.setProjectedColumnNames(columnsList);
        //返回结果集
        KuduScanner kuduScanner = kuduScannerBuilder.build();
        //遍历
        while (kuduScanner.hasMoreRows()){
            RowResultIterator rowResults = kuduScanner.nextRows();

             while (rowResults.hasNext()){
                 RowResult row = rowResults.next();
                 int id = row.getInt("id");
                 String name = row.getString("name");
                 int age = row.getInt("age");
                 int sex = row.getInt("sex");

                 System.out.println("id="+id+"  name="+name+"  age="+age+"  sex="+sex);
             }
        }

    }

修改数据

/**
     * 修改表的数据
     */
    @Test
    public void updateData() throws KuduException {
        //修改表的数据需要一个kuduSession对象
        KuduSession kuduSession = kuduClient.newSession();
        kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);

        //需要使用kuduTable来构建Operation的子类实例对象
        KuduTable kuduTable = kuduClient.openTable(tableName);

        //Update update = kuduTable.newUpdate();
        Upsert upsert = kuduTable.newUpsert(); //如果id存在就表示修改,不存在就新增
        PartialRow row = upsert.getRow();
        row.addInt("id",100);
        row.addString("name","zhangsan-100");
        row.addInt("age",100);
        row.addInt("sex",0);

        kuduSession.apply(upsert);//最后实现执行数据的修改操作
    }

删除数据

/**
     * 删除数据
     */
    @Test
    public void deleteData() throws KuduException {
        //删除表的数据需要一个kuduSession对象
        KuduSession kuduSession = kuduClient.newSession();
        kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);

        //需要使用kuduTable来构建Operation的子类实例对象
        KuduTable kuduTable = kuduClient.openTable(tableName);

        Delete delete = kuduTable.newDelete();
        PartialRow row = delete.getRow();
        row.addInt("id",100);


        kuduSession.apply(delete);//最后实现执行数据的删除操作
    }

删除表

    @Test
    public void dropTable() throws KuduException {

        if(kuduClient.tableExists(tableName)){
            kuduClient.deleteTable(tableName);
        }

    }

kudu分区方式

为了提供可扩展性,Kudu 表被划分为称为 tablet 的单元,并分布在许多 tablet servers 上。行总是属于单个tablet 。将行分配给 tablet 的方法由在表创建期间设置的表的分区决定。 kudu提供了3种分区方式。

Range Partitioning ( 范围分区 )

范围分区可以根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象.

   /**
     * 测试分区:
     * RangePartition
     */
    @Test
    public void testRangePartition() throws KuduException {
        //设置表的schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
        columnSchemas.add(newColumn("WorkId", Type.INT32,false));
        columnSchemas.add(newColumn("Name", Type.STRING,false));
        columnSchemas.add(newColumn("Gender", Type.STRING,false));
        columnSchemas.add(newColumn("Photo", Type.STRING,false));

        //创建schema
        Schema schema = new Schema(columnSchemas);

        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(1);
        //设置范围分区的规则
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("CompanyId");
        //设置按照那个字段进行range分区
        tableOptions.setRangePartitionColumns(parcols);

        /**
         * range
         *  0 < value < 10
         * 10 <= value < 20
         * 20 <= value < 30
         * ........
         * 80 <= value < 90
         * */
        int count=0;
        for(int i =0;i<10;i++){
            //范围开始
            PartialRow lower = schema.newPartialRow();
            lower.addInt("CompanyId",count);

            //范围结束
            PartialRow upper = schema.newPartialRow();
            count +=10;
            upper.addInt("CompanyId",count);

            //设置每一个分区的范围
            tableOptions.addRangePartition(lower,upper);
        }

        try {
            kuduClient.createTable("student",schema,tableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
        }
         kuduClient.close();

    }

Hash Partitioning ( 哈希分区 )

哈希分区通过哈希值将行分配到许多 buckets ( 存储桶 )之一; 哈希分区是一种有效的策略,当不需要对表进行有序访问时。哈希分区对于在 tablet 之间随机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀。

/**
     * 测试分区:
     * hash分区
     */
    @Test
    public void testHashPartition() throws KuduException {
        //设置表的schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
        columnSchemas.add(newColumn("WorkId", Type.INT32,false));
        columnSchemas.add(newColumn("Name", Type.STRING,false));
        columnSchemas.add(newColumn("Gender", Type.STRING,false));
        columnSchemas.add(newColumn("Photo", Type.STRING,false));

        //创建schema
        Schema schema = new Schema(columnSchemas);

        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(1);
        //设置范围分区的规则
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("CompanyId");
        //设置按照那个字段进行range分区
        tableOptions.addHashPartitions(parcols,6);
        try {
            kuduClient.createTable("dog",schema,tableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
        }

        kuduClient.close();
    }

Multilevel Partitioning ( 多级分区 )

Kudu 允许一个表在单个表上组合多级分区。 当正确使用时,多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点 需求

/**
     * 测试分区:
     * 多级分区
     * Multilevel Partition
     * 混合使用hash分区和range分区
     *
     * 哈希分区有利于提高写入数据的吞吐量,而范围分区可以避免tablet无限增长问题,
     * hash分区和range分区结合,可以极大的提升kudu的性能
     */
    @Test
    public void testMultilevelPartition() throws KuduException {
        //设置表的schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
        columnSchemas.add(newColumn("WorkId", Type.INT32,false));
        columnSchemas.add(newColumn("Name", Type.STRING,false));
        columnSchemas.add(newColumn("Gender", Type.STRING,false));
        columnSchemas.add(newColumn("Photo", Type.STRING,false));

        //创建schema
        Schema schema = new Schema(columnSchemas);
        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(1);
        //设置范围分区的规则
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("CompanyId");

        //hash分区
        tableOptions.addHashPartitions(parcols,5);

        //range分区
        int count=0;
        for(int i=0;i<10;i++){
            PartialRow lower = schema.newPartialRow();
            lower.addInt("CompanyId",count);
            count+=10;

            PartialRow upper = schema.newPartialRow();
            upper.addInt("CompanyId",count);
            tableOptions.addRangePartition(lower,upper);
        }

        try {
            kuduClient.createTable("cat",schema,tableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
        }
        kuduClient.close();


    }

spark操作kudu

到目前为止,我们已经听说过几个上下文,例如SparkContext,SQLContext,HiveContext, SparkSession,现在,我们将使用Kudu引入一个KuduContext。这是可在Spark应用程序中广播的主要可序列化对象。此类代表在Spark执行程序中与Kudu Java客户端进行交互。 KuduContext提供执行DDL操作所需的方法,与本机Kudu RDD的接口,对数据执行更新/插入/删除,将数据类型从Kudu转换为Spark等。

引入依赖

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>   

<dependencies>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client-tools</artifactId>
            <version>1.6.0-cdh5.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.6.0-cdh5.14.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>1.6.0-cdh5.14.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
</dependencies>

创建表

定义kudu的表需要分成5个步骤:

1:提供表名

2:提供schema

3:提供主键

4:定义重要选项;例如:定义分区的schema

5:调用create Table api

object SparkKuduTest {
  def main(args: Array[String]): Unit = {
    //构建sparkConf对象
     val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")

    //构建SparkSession对象
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //获取sparkContext对象
      val sc: SparkContext = sparkSession.sparkContext
      sc.setLogLevel("warn")

    //构建KuduContext对象
      val kuduContext = new KuduContext("node1:7051,node2:7051,node3:7051",sc)

    //1.创建表操作
    createTable(kuduContext)

    /**
    * 创建表
    * @param kuduContext
    * @return
    */
  private def createTable(kuduContext: KuduContext) = {

    //1.1定义表名
    val tableName = "spark_kudu"

    //1.2 定义表的schema
    val schema = StructType(
        StructField("userId", StringType, false) ::
        StructField("name", StringType, false) ::
        StructField("age", IntegerType, false) ::
        StructField("sex", StringType, false) :: Nil)

    //1.3 定义表的主键
    val primaryKey = Seq("userId")

    //1.4 定义分区的schema
    val options = new CreateTableOptions
    //设置分区
    options.setRangePartitionColumns(List("userId").asJava)
    //设置副本
    options.setNumReplicas(1)

    //1.5 创建表
    if(!kuduContext.tableExists(tableName)){
      kuduContext.createTable(tableName, schema, primaryKey, options)
    }

  }

}

定义表时要注意的是Kudu表选项值。你会注意到在指定组成范围分区列的列名列表时我们调用“asJava”方法。这是因为在这里,我们调用了Kudu Java客户端本身,它需要Java对象(即java.util.List)而不是Scala的List对象;(要使“asJava”方法可用,请记住导入JavaConverters库。)

创建表后,通过将浏览器指向http// master主机名:8051/tables来查看Kudu主UI可以找到创建的表,通过单击表ID,能够看到表模式和分区信息。

image.png

点击Table id 可以观察到表的schema等信息:

image.png

dataFrame操作kudu

Kudu支持许多DML类型的操作,其中一些操作包含在Spark on Kudu集成.

包括:

INSERT – 将DataFrame的行插入Kudu表。请注意,虽然API完全支持INSERT,但不鼓励在Spark中使用它。使用INSERT是有风险的,因为Spark任务可能需要重新执行,这意味着可能要求再次插入已插入的行。这样做会导致失败,因为如果行已经存在,INSERT将不允许插入行(导致失败)。相反,我们鼓励使用下面描述的INSERT_IGNORE。

INSERT-IGNORE – 将DataFrame的行插入Kudu表。如果表存在,则忽略插入动作。

DELETE – 从Kudu表中删除DataFrame中的行

UPSERT – 如果存在,则在Kudu表中更新DataFrame中的行,否则执行插入操作。

UPDATE – 更新dataframe中的行

插入数据insert操作

先创建一张表,然后把数据插入到表中。

case class People(id:Int,name:String,age:Int)
object DataFrameKudu {
  def main(args: Array[String]): Unit = {
      //构建SparkConf对象
     val sparkConf: SparkConf = new SparkConf().setAppName("DataFrameKudu").setMaster("local[2]")
     //构建SparkSession对象
     val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
     //获取SparkContext对象
     val sc: SparkContext = sparkSession.sparkContext
    sc.setLogLevel("warn")
      //指定kudu的master地址
     val kuduMaster="node1:7051,node2:7051,node3:7051"
      //构建KuduContext对象
     val kuduContext = new KuduContext(kuduMaster,sc)

       //定义表名
       val tableName="people"
       //1、创建表
      createTable(kuduContext, tableName)

      //2、插入数据到表中
    insertData2table(sparkSession,sc, kuduContext, tableName)

  }

 /**
    * 创建表
    * @param kuduContext
    * @param tableName
    */
  private def createTable(kuduContext: KuduContext, tableName: String): Unit = {
    //定义表的schema
    val schema = StructType(
        StructField("id", IntegerType, false) ::
        StructField("name", StringType, false) ::
        StructField("age", IntegerType, false) :: Nil
    )

    //定义表的主键
    val tablePrimaryKey = List("id")

    //定义表的选项配置
    val options = new CreateTableOptions
    options.setRangePartitionColumns(List("id").asJava)
    options.setNumReplicas(1)

    //创建表
    if (!kuduContext.tableExists(tableName)) {
      kuduContext.createTable(tableName, schema, tablePrimaryKey, options)
    }
  } 

      /**
    * 插入数据到表中
    * @param sparkSession
    * @param sc
    * @param kuduContext
    * @param tableName
    */
  private def insertData2table(sparkSession:SparkSession,sc: SparkContext, kuduContext: KuduContext, tableName: String): Unit = {
    //准备数据
    val data = List(People(1, "zhangsan", 20), People(2, "lisi", 30), People(3, "wangwu", 40))
    val peopleRDD: RDD[People] = sc.parallelize(data)
    import sparkSession.implicits._
    val peopleDF: DataFrame = peopleRDD.toDF
    kuduContext.insertRows(peopleDF, tableName)


  }

}

删除数据delete操作

  /**
    * 删除表的数据
    * @param sparkSession
    * @param sc
    * @param kuduMaster
    * @param kuduContext
    * @param tableName
    */
  private def deleteData(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = {
    //定义一个map集合,封装kudu的相关信息
    val options = Map(
      "kudu.master" -> kuduMaster,
      "kudu.table" -> tableName
    )

    import sparkSession.implicits._
    val data = List(People(1, "zhangsan", 20), People(2, "lisi", 30), People(3, "wangwu", 40))
    val dataFrame: DataFrame = sc.parallelize(data).toDF
    dataFrame.createTempView("temp")
    //获取年龄大于30的所有用户id
    val result: DataFrame = sparkSession.sql("select id from temp where age >30")
    //删除对应的数据,这里必须要是主键字段
    kuduContext.deleteRows(result, tableName)
  }

更新数据upsert操作

/**
    * 更新数据--添加数据
    *
    * @param sc
    * @param kuduMaster
    * @param kuduContext
    * @param tableName
    */
  private def UpsertData(sparkSession: SparkSession,sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = {
    //更新表中的数据
    //定义一个map集合,封装kudu的相关信息
    val options = Map(
      "kudu.master" -> kuduMaster,
      "kudu.table" -> tableName
    )

    import sparkSession.implicits._
    val data = List(People(1, "zhangsan", 50), People(5, "tom", 30))
    val dataFrame: DataFrame = sc.parallelize(data).toDF
    //如果存在就是更新,否则就是插入
    kuduContext.upsertRows(dataFrame, tableName)
  }

更新数据update操作

  /**
    * 更新数据
    * @param sparkSession
    * @param sc
    * @param kuduMaster
    * @param kuduContext
    * @param tableName
    */
  private def updateData(sparkSession: SparkSession,sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = {
    //定义一个map集合,封装kudu的相关信息
    val options = Map(
      "kudu.master" -> kuduMaster,
      "kudu.table" -> tableName
    )

    import sparkSession.implicits._
    val data = List(People(1, "zhangsan", 60), People(6, "tom", 30))
    val dataFrame: DataFrame = sc.parallelize(data).toDF
    //如果存在就是更新,否则就是报错
    kuduContext.updateRows(dataFrame, tableName)
  }

DataFrame API读取kudu数据

虽然我们可以通过上面显示的KuduContext执行大量操作,但我们还可以直接从默认数据源本身调用读/写API。要设置读取,我们需要为Kudu表指定选项,命名我们要读取的表以及为表提供服务的Kudu集群的Kudu主服务器列表。

  /**
    * 使用DataFrameApi读取kudu表中的数据
    * @param sparkSession
    * @param kuduMaster
    * @param tableName
    */
  private def getTableData(sparkSession: SparkSession, kuduMaster: String, tableName: String): Unit = {
     //定义map集合,封装kudu的master地址和要读取的表名
    val options = Map(
      "kudu.master" -> kuduMaster,
      "kudu.table" -> tableName
    )
    sparkSession.read.options(options).kudu.show()
  }

DataFrameApi写数据到kudu表

在通过DataFrame API编写时,目前只支持一种模式“append”。尚未实现的“覆盖”模式。

  /**
    * DataFrame api 写数据到kudu表
    * @param sparkSession
    * @param sc
    * @param kuduMaster
    * @param tableName
    */
  private def dataFrame2kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {
 //定义map集合,封装kudu的master地址和要读取的表名
    val options = Map(
      "kudu.master" -> kuduMaster,
      "kudu.table" -> tableName
    )
    val data = List(People(7, "jim", 30), People(8, "xiaoming", 40))
    import sparkSession.implicits._
    val dataFrame: DataFrame = sc.parallelize(data).toDF
    //把dataFrame结果写入到kudu表中  ,目前只支持append追加
    dataFrame.write.options(options).mode("append").kudu

    //查看结果
    //导包
    import org.apache.kudu.spark.kudu._
   //加载表的数据,导包调用kudu方法,转换为dataFrame,最后在使用show方法显示结果
   sparkSession.read.options(options).kudu.show()
  }

使用sparksql操作kudu

可以选择使用Spark SQL直接使用INSERT语句写入Kudu表;与’append’类似,INSERT语句实际上将默认使用 UPSERT语义处理.

/**
    * 使用sparksql操作kudu表
    * @param sparkSession
    * @param sc
    * @param kuduMaster
    * @param tableName
    */
  private def SparkSql2Kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {
   //定义map集合,封装kudu的master地址和表名
    val options = Map(
      "kudu.master" -> kuduMaster,
      "kudu.table" -> tableName
    )
    val data = List(People(10, "小张", 30), People(11, "小王", 40))
    import sparkSession.implicits._
    val dataFrame: DataFrame = sc.parallelize(data).toDF
      //把dataFrame注册成一张表
    dataFrame.createTempView("temp1")

    //获取kudu表中的数据,然后注册成一张表
    sparkSession.read.options(options).kudu.createTempView("temp2")
      //使用sparkSQL的insert操作插入数据
    sparkSession.sql("insert into table temp2 select * from temp1")
    sparkSession.sql("select * from temp2 where age >30").show()
  }

kudu native RDD

Spark与Kudu的集成同时提供了kudu RDD.

    //使用kuduContext对象调用kuduRDD方法,需要sparkContext对象,表名,想要的字段名称
   val kuduRDD: RDD[Row] = kuduContext.kuduRDD(sc,tableName,Seq("name","age"))
    //操作该rdd 打印输出
    val result: RDD[(String, Int)] = kuduRDD.map {
      case Row(name: String, age: Int) => (name, age)
    }
    result.foreach(println)

kudu集成impala

impala配置修改

在每一个服务器的impala的配置文件中添加如下配置。

vim /etc/default/impala

在IMPALA_SERVER_ARGS下添加:

-kudu_master_hosts=node1:7051,node2:7051,node3:7051

创建kudu表

需要先启动hdfs、hive、kudu、impala。使用impala的shell控制台。

image.png

内部表

内部表由Impala管理,当您从Impala中删除时,数据和表确实被删除。当您使用Impala创建新表时,它通常是内部表。

CREATE TABLE my_first_table
(
id BIGINT,
name STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16 
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051',
'kudu.table_name' = 'my_first_table'
);

在 CREATE TABLE 语句中,必须首先列出构成主键的列。

外部表

外部表(创建者CREATE EXTERNAL TABLE)不受Impala管理,并且删除此表不会将表从其源位置(此处为Kudu)丢弃。相反,它只会去除Impala和Kudu之间的映射。这是Kudu提供的用于将现有表映射到Impala的语法。

首先使用java创建kudu表:

public class CreateTable {
        private static ColumnSchema newColumn(String name, Type type, boolean iskey) {
                ColumnSchema.ColumnSchemaBuilder column = new
                    ColumnSchema.ColumnSchemaBuilder(name, type);
                column.key(iskey);
                return column.build();
        }
    public static void main(String[] args) throws KuduException {
        // master地址
        final String masteraddr = "node1,node2,node3";
        // 创建kudu的数据库链接
        KuduClient client = new
     KuduClient.KuduClientBuilder(masteraddr).defaultSocketReadTimeoutMs(6000).build();

        // 设置表的schema
        List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
        columns.add(newColumn("CompanyId", Type.INT32, true));
        columns.add(newColumn("WorkId", Type.INT32, false));
        columns.add(newColumn("Name", Type.STRING, false));
        columns.add(newColumn("Gender", Type.STRING, false));
        columns.add(newColumn("Photo", Type.STRING, false));
        Schema schema = new Schema(columns);
    //创建表时提供的所有选项
    CreateTableOptions options = new CreateTableOptions();

    // 设置表的replica备份和分区规则
    List<String> parcols = new LinkedList<String>();

    parcols.add("CompanyId");
    //设置表的备份数
        options.setNumReplicas(1);
    //设置range分区
    options.setRangePartitionColumns(parcols);

    //设置hash分区和数量
    options.addHashPartitions(parcols, 3);
    try {
    client.createTable("person", schema, options);
    } catch (KuduException e) {
    e.printStackTrace();
    }
    client.close();
    }
}

使用impala创建外部表 , 将kudu的表映射到impala上

CREATE EXTERNAL TABLE `person` STORED AS KUDU
TBLPROPERTIES(
    'kudu.table_name' = 'person',
    'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051')

image.png

使用impala对kudu进行DML

插入数据

impala 允许使用标准 SQL 语句将数据插入 Kudu 。

首先建表:

CREATE TABLE my_first_table1
(
id BIGINT,
name STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU 
TBLPROPERTIES(
    'kudu.table_name' = 'person1',
    'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051');

此示例插入单个行:

INSERT INTO my_first_table VALUES (50, “zhangsan”);

image.png

此示例插入3行:

INSERT INTO my_first_table VALUES (1, “john”), (2, “jane”), (3, “jim”);

image.png

批量导入数据:

从Impala 和 Kudu 的角度来看,通常表现最好的方法通常是使用 Impala 中的 SELECT FROM 语句导入数据。

INSERT INTO my_first_table SELECT * FROM temp1;

更新数据

UPDATE my_first_table SET name=”xiaowang” where id =1 ;

image.png

删除数据

delete from my_first_table where id =2;

image.png

更改表属性

重命名impala表

ALTER TABLE PERSON RENAME TO person_temp;

image.png

重新命名内部表的基础kudu表

创建内部表:

CREATE TABLE kudu_student
(
CompanyId INT,
WorkId INT,
Name STRING,
Gender STRING,
Photo STRING,
PRIMARY KEY(CompanyId)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051',
'kudu.table_name' = 'student'
);

如果表是内部表,则可以通过更改kudu.table_name 属性重命名底层的 Kudu 表。

ALTER TABLE kudu_student SET TBLPROPERTIES(‘kudu.table_name’ = ‘new_student’);

将外部表重新映射kudu表

如果用户在使用过程中发现其他应用程序重新命名了kudu表,那么此时的外部表需要重新映射到kudu上。

首先创建一个外部表:

CREATE EXTERNAL TABLE external_table
    STORED AS KUDU
    TBLPROPERTIES (
    'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051',
    'kudu.table_name' = 'person'
);

重新映射外部表,指向不同的kudu表:

ALTER TABLE external_table
SET TBLPROPERTIES('kudu.table_name' = 'hashTable')

上面的操作是:将external_table映射的PERSON表重新指向hashTable表。

更改kudu master地址

ALTER TABLE my_table
SET TBLPROPERTIES('kudu.master_addresses' = 'kudu-new-master.example.com:7051');

将内部表改为外部表

ALTER TABLE my_table SET TBLPROPERTIES('EXTERNAL' = 'TRUE');

impala使用java操作kudu

对于impala而言,开发人员是可以通过JDBC连接impala的,有了JDBC,开发人员可以通过impala来间接操作 kudu。

引入依赖

       <!--impala的jdbc操作--> 
       <dependency>
            <groupId>com.cloudera</groupId>
            <artifactId>ImpalaJDBC41</artifactId>
            <version>2.5.42</version>
        </dependency>

        <!--Caused by : ClassNotFound : thrift.protocol.TPro-->
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libfb303</artifactId>
            <version>0.9.3</version>
            <type>pom</type>
        </dependency>

        <!--Caused by : ClassNotFound : thrift.protocol.TPro-->
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.9.3</version>
            <type>pom</type>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-service-rpc</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-service</artifactId>
                </exclusion>
            </exclusions>
            <version>1.1.0</version>
        </dependency>

        <!--导入hive-->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-service</artifactId>
            <version>1.1.0</version>
        </dependency>

jdbc连接impala操作kudu

使用JDBC连接impala操作kudu,与JDBC连接mysql做更重增删改查基本一样。

创建实体类

package cn.itcast.impala.impala;

public class Person {
    private int companyId;
    private int workId;
    private  String name;
    private  String gender;
    private  String photo;

    public Person(int companyId, int workId, String name, String gender, String photo) {
        this.companyId = companyId;
        this.workId = workId;
        this.name = name;
        this.gender = gender;
        this.photo = photo;
    }

    public int getCompanyId() {
        return companyId;
    }

    public void setCompanyId(int companyId) {
        this.companyId = companyId;
    }

    public int getWorkId() {
        return workId;
    }

    public void setWorkId(int workId) {
        this.workId = workId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public String getPhoto() {
        return photo;
    }

    public void setPhoto(String photo) {
        this.photo = photo;
    }
}

JDBC连接impala对kudu进行增删改查

package cn.itcast.impala.impala;

import java.sql.*;

public class Contants {
    private static String JDBC_DRIVER="com.cloudera.impala.jdbc41.Driver";
    private static  String CONNECTION_URL="jdbc:impala://node1:21050/default;auth=noSasl";
     //定义数据库连接
    static Connection conn=null;
    //定义PreparedStatement对象
    static PreparedStatement ps=null;
    //定义查询的结果集
    static ResultSet rs= null;


    //数据库连接
    public static Connection getConn(){
        try {
            Class.forName(JDBC_DRIVER);
            conn=DriverManager.getConnection(CONNECTION_URL);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return  conn;

    }

    //创建一个表
    public static void createTable(){
        conn=getConn();
        String sql="CREATE TABLE impala_kudu_test" +
                "(" +
                "companyId BIGINT," +
                "workId BIGINT," +
                "name STRING," +
                "gender STRING," +
                "photo STRING," +
                "PRIMARY KEY(companyId)" +
                ")" +
                "PARTITION BY HASH PARTITIONS 16 " +
                "STORED AS KUDU " +
                "TBLPROPERTIES (" +
                "'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051'," +
                "'kudu.table_name' = 'impala_kudu_test'" +
                ");";

        try {
            ps = conn.prepareStatement(sql);
            ps.execute();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }


    //查询数据
    public static ResultSet queryRows(){
        try {
            //定义执行的sql语句
            String sql="select * from impala_kudu_test";
            ps = getConn().prepareStatement(sql);
            rs= ps.executeQuery();
        } catch (SQLException e) {
            e.printStackTrace();
        }

        return  rs;
    }

    //打印结果
    public  static void printRows(ResultSet rs){
        /**
         private int companyId;
         private int workId;
         private  String name;
         private  String gender;
         private  String photo;
         */

        try {
            while (rs.next()){
                //获取表的每一行字段信息
                int companyId = rs.getInt("companyId");
                int workId = rs.getInt("workId");
                String name = rs.getString("name");
                String gender = rs.getString("gender");
                String photo = rs.getString("photo");
                System.out.print("companyId:"+companyId+" ");
                System.out.print("workId:"+workId+" ");
                System.out.print("name:"+name+" ");
                System.out.print("gender:"+gender+" ");
                System.out.println("photo:"+photo);

            }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            if(ps!=null){
                try {
                    ps.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }

            if(conn !=null){
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    //插入数据
    public static void insertRows(Person person){
        conn=getConn();
        String sql="insert into table impala_kudu_test(companyId,workId,name,gender,photo) values(?,?,?,?,?)";

        try {
            ps=conn.prepareStatement(sql);
            //给占位符?赋值
            ps.setInt(1,person.getCompanyId());
            ps.setInt(2,person.getWorkId());
            ps.setString(3,person.getName());
            ps.setString(4,person.getGender());
            ps.setString(5,person.getPhoto());
            ps.execute();

        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            if(ps !=null){
                try {
                    //关闭
                    ps.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }

            if(conn !=null){
                try {
                      //关闭
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    //更新数据
    public static void updateRows(Person person){
       //定义执行的sql语句
        String sql="update impala_kudu_test set workId="+person.getWorkId()+
                ",name='"+person.getName()+"' ,"+"gender='"+person.getGender()+"' ,"+
                "photo='"+person.getPhoto()+"' where companyId="+person.getCompanyId();

        try {
            ps= getConn().prepareStatement(sql);
            ps.execute();
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            if(ps !=null){
                try {
                      //关闭
                    ps.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }

            if(conn !=null){
                try {
                      //关闭
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //删除数据
    public   static void deleteRows(int companyId){

        //定义sql语句
        String sql="delete from impala_kudu_test where companyId="+companyId;
        try {
            ps =getConn().prepareStatement(sql);
            ps.execute();
        } catch (SQLException e) {
            e.printStackTrace();

        }
    }

   //删除表
    public static void dropTable() {
        String sql="drop table if exists impala_kudu_test";
        try {
            ps =getConn().prepareStatement(sql);
            ps.execute();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

代码测试运行

package cn.itcast.impala.impala;

import java.sql.Connection;

public class ImpalaJdbcClient {
    public static void main(String[] args) {
        Connection conn = Contants.getConn();

        //创建一个表
       Contants.createTable();

        //插入数据
       Contants.insertRows(new Person(1,100,"lisi","male","lisi-photo"));

        //查询表的数据
        ResultSet rs = Contants.queryRows();
        Contants.printRows(rs);

        //更新数据
        Contants.updateRows(new Person(1,200,"zhangsan","male","zhangsan-photo"));

        //删除数据
        Contants.deleteRows(1);

        //删除表
        Contants.dropTable();

    }
}

Apache Kudu原理

table与schema

Kudu设计是面向结构化存储的,因此,Kudu的表需要用户在建表时定义它的Schema信息,这些Schema信息包含:列定义(含类型),Primary Key定义(用户指定的若干个列的有序组合)。数据的唯一性,依赖于用户所提供的Primary Key中的Column组合的值的唯一性。Kudu提供了Alter命令来增删列,但位于Primary Key中的列是不允许删除的。

从用户角度来看,Kudu是一种存储结构化数据表的存储系统。在一个Kudu集群中可以定义任意数量的table,每个table都需要预先定义好schema。每个table的列数是确定的,每一列都需要有名字和类型,每个表中可以把其中一列或多列定义为主键。这么看来,Kudu更像关系型数据库,而不是像HBase、Cassandra和MongoDB这些NoSQL数据库。不过Kudu目前还不能像关系型数据一样支持二级索引。

Kudu使用确定的列类型,而不是类似于NoSQL的“everything is byte”。带来好处:确定的列类型使Kudu可以进行类型特有的编码,可以提供元数据给其他上层查询工具。

kudu底层数据模型

Kudu的底层数据文件的存储,未采用HDFS这样的较高抽象层次的分布式文件系统,而是自行开发了一套可基于Table/Tablet/Replica视图级别的底层存储系统。

这套实现基于如下的几个设计目标:

• 可提供快速的列式查询

• 可支持快速的随机更新

• 可提供更为稳定的查询性能保障

image.png

一张table会分成若干个tablet,每个tablet包括MetaData元信息及若干个RowSet。

RowSet包含一个MemRowSet及若干个DiskRowSet,DiskRowSet中包含一个BloomFile、Ad_hoc Index、BaseData、DeltaMem及若干个RedoFile和UndoFile。

MemRowSet:用于新数据insert及已在MemRowSet中的数据的更新,一个MemRowSet写满后会将数据刷到磁盘形成若干个DiskRowSet。默认是1G或者或者120S。

DiskRowSet:用于老数据的变更,后台定期对DiskRowSet做compaction,以删除没用的数据及合并历史数据,减少查询过程中的IO开销。

BloomFile:根据一个DiskRowSet中的key生成一个bloom filter,用于快速模糊定位某个key是否在DiskRowSet中。

Ad_hocIndex:是主键的索引,用于定位key在DiskRowSet中的具体哪个偏移位置。

BaseData是MemRowSet flush下来的数据,按列存储,按主键有序。

UndoFile是基于BaseData之前时间的历史数据,通过在BaseData上apply UndoFile中的记录,可以获得历史数据。

RedoFile是基于BaseData之后时间的变更记录,通过在BaseData上apply RedoFile中的记录,可获得较新的数据。

DeltaMem用于DiskRowSet中数据的变更,先写到内存中,写满后flush到磁盘形成RedoFile。

REDO与UNDO与关系型数据库中的REDO与UNDO日志类似(在关系型数据库中,REDO日志记录了更新后的数据,可以用来恢复尚未写入Data File的已成功事务更新的数据。而UNDO日志用来记录事务更新之前的数据,可以用来在事务失败时进行回滚)

image.png

MemRowSets可以对比理解成HBase中的MemStore, 而DiskRowSets可理解成HBase中的HFile。

MemRowSets中的数据被Flush到磁盘之后,形成DiskRowSets。 DisRowSets中的数据,按照32MB大小为单位,按序划分为一个个的DiskRowSet。 DiskRowSet中的数据按照Column进行组织,与Parquet类似。

这是Kudu可支持一些分析性查询的基础。每一个Column的数据被存储在一个相邻的数据区域,而这个数据区域进一步被细分成一个个的小的Page单元,与HBase File中的Block类似,对每一个Column Page可采用一些Encoding算法,以及一些通用的Compression算法。 既然可对Column Page可采用Encoding以及Compression算法,那么,对单条记录的更改就会比较困难了。

前面提到了Kudu可支持单条记录级别的更新/删除,是如何做到的?

与HBase类似,也是通过增加一条新的记录来描述这次更新/删除操作的。DiskRowSet是不可修改了,那么 KUDU 要如何应对数据的更新呢?在KUDU中,把DiskRowSet分为了两部分:base data、delta stores。base data 负责存储基础数据,delta stores负责存储 base data 中的变更数据.

image.png

如上图所示,数据从MemRowSet 刷到磁盘后就形成了一份 DiskRowSet(只包含 base data),每份 DiskRowSet 在内存中都会有一个对应的 DeltaMemStore,负责记录此 DiskRowSet 后续的数据变更(更新、删除)。DeltaMemStore 内部维护一个 B-树索引,映射到每个 row_offset 对应的数据变更。DeltaMemStore 数据增长到一定程度后转化成二进制文件存储到磁盘,形成一个 DeltaFile,随着 base data 对应数据的不断变更,DeltaFile 逐渐增长。

tablet发现过程

当创建Kudu客户端时,其会从主服务器上获取tablet位置信息,然后直接与服务于该tablet的服务器进行交谈。

为了优化读取和写入路径,客户端将保留该信息的本地缓存,以防止他们在每个请求时需要查询主机的tablet位置信息。随着时间的推移,客户端的缓存可能会变得过时,并且当写入被发送到不再是tablet领导者的tablet服务器时,则将被拒绝。然后客户端将通过查询主服务器发现新领导者的位置来更新其缓存。

image.png

kudu写流程

当Client 请求写数据时,先根据主键从Master Server中获取要访问的目标 Tablets,然后到依次对应的Tablet获取数据。

因为KUDU表存在主键约束,所以需要进行主键是否已经存在的判断,这里就涉及到之前说的索引结构对读写的优化了。一个Tablet中存在很多个RowSets,为了提升性能,我们要尽可能地减少要扫描的RowSets数量。

首先,我们先通过每个RowSet 中记录的主键的(最大最小)范围,过滤掉一批不存在目标主键的RowSets,然后在根据RowSet中的布隆过滤器,过滤掉确定不存在目标主键的 RowSets,最后再通过RowSets中的 B-树索引,精确定位目标主键是否存在。

如果主键已经存在,则报错(主键重复),否则就进行写数据(写MemRowSet)。

image.png

kudu读流程

数据读取过程大致如下:先根据要扫描数据的主键范围,定位到目标的Tablets,然后读取Tablets 中的RowSets。

在读取每个RowSet时,先根据主键过滤要scan范围,然后加载范围内的base data,再找到对应的delta stores,应用所有变更,最后union上MemRowSet中的内容,返回数据给Client。

image.png

kudu更新流程

数据更新的核心是定位到待更新数据的位置,这块与写入的时候类似,就不展开了,等定位到具体位置后,然后将变更写到对应的delta store 中。

image.png

版权声明:除特殊说明,博客文章均为Mark原创,依据CC BY-SA 4.0许可证进行授权,转载请附上出处链接及本声明。VIP内容严禁转载! | 广告招租请留言
暂无评论

发送评论 编辑评论

|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇