首页 > 编程语言 >客快物流大数据项目(四十二):Java代码操作Kudu

客快物流大数据项目(四十二):Java代码操作Kudu

时间:2024-03-29 23:22:38浏览次数:26  
标签:Java org KuduException 客快 kuduClient void kudu Kudu public

Java代码操作Kudu

一、构建maven工程

二、导入依赖

三、​​​​​​​创建包结构

四、​​​​​​​初始化方法

五、​​​​​​​创建表

六、​​​​​​​插入数据

七、​​​​​​​查询数据

八、修改数据

九、​​​​​​​删除数据

十、​​​​​​​修改表

十一、​​​​​​​删除表

Java代码操作Kudu

一、​​​​​​​构建maven工程

二、导入依赖

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client-tools</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

三、​​​​​​​创建包结构

包名

说明

cn.it

代码所在的包目录

四、​​​​​​​初始化方法

package cn.it;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.junit.Before;

public class TestKudu {
    //定义KuduClient客户端对象
    private static KuduClient kuduClient;
    //定义表名
    private static String tableName = "person";

    /**
     * 初始化方法
     */
    @Before
    public void init() {
        //指定master地址
        String masterAddress = "node2.cn";
        //创建kudu的数据库连接
        kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
    }

    //构建表schema的字段信息
    //字段名称   数据类型     是否为主键
    public ColumnSchema newColumn(String name, Type type, boolean isKey) {
        ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
        column.key(isKey);
        return column.build();
    }
}

五、​​​​​​​创建表

/**  使用junit进行测试
 *
 * 创建表
 * @throws KuduException
 */
@Test
public void createTable() throws KuduException {
    //设置表的schema
    List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
    columns.add(newColumn("CompanyId", Type.INT32, true));
    columns.add(newColumn("WorkId", Type.INT32, false));
    columns.add(newColumn("Name", Type.STRING, false));
    columns.add(newColumn("Gender", Type.STRING, false));
    columns.add(newColumn("Photo", Type.STRING, false));
    Schema schema = new Schema(columns);
    //创建表时提供的所有选项
    CreateTableOptions tableOptions = new CreateTableOptions();
    //设置表的副本和分区规则
    LinkedList<String> list = new LinkedList<String>();
    list.add("CompanyId");
    //设置表副本数
    tableOptions.setNumReplicas(1);
    //设置range分区
    //tableOptions.setRangePartitionColumns(list);
    //设置hash分区和分区的数量
    tableOptions.addHashPartitions(list, 3);
    try {
        kuduClient.createTable("person", schema, tableOptions);
    } catch (Exception e) {
        e.printStackTrace();
    }
    kuduClient.close();
}

六、​​​​​​​插入数据

/**
 * 向表中加载数据
 * @throws KuduException
 */
@Test
public void loadData() throws KuduException {
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    //创建KuduSession对象 kudu必须通过KuduSession写入数据
    KuduSession kuduSession = kuduClient.newSession();
    //采用flush方式 手动刷新
    kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
    kuduSession.setMutationBufferSpace(3000);
    //准备数据
    for(int i=1; i<=10; i++){
        Insert insert = kuduTable.newInsert();
        //设置字段的内容
        insert.getRow().addInt("CompanyId",i);
        insert.getRow().addInt("WorkId",i);
        insert.getRow().addString("Name","lisi"+i);
        insert.getRow().addString("Gender","male");
        insert.getRow().addString("Photo","person"+i);
        kuduSession.flush();
        kuduSession.apply(insert);
    }
    kuduSession.close();
    kuduClient.close();
}

七、​​​​​​​查询数据

 /**
 * 查询表数据
 * @throws KuduException
 */
@Test
public void queryData() throws KuduException {
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    //获取scanner扫描器
    KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
    KuduScanner scanner = scannerBuilder.build();
    //遍历
    while(scanner.hasMoreRows()){
        RowResultIterator rowResults = scanner.nextRows();
        while (rowResults.hasNext()){
            RowResult result = rowResults.next();
            int companyId = result.getInt("CompanyId");
            int workId = result.getInt("WorkId");
            String name = result.getString("Name");
            String gender = result.getString("Gender");
            String photo = result.getString("Photo");
            System.out.print("companyId:"+companyId+" ");
            System.out.print("workId:"+workId+" ");
            System.out.print("name:"+name+" ");
            System.out.print("gender:"+gender+" ");
            System.out.println("photo:"+photo);
        }
    }
    //关闭
    scanner.close();
    kuduClient.close();
}

八、修改数据

/**
 * 修改数据
 * @throws KuduException
 */
@Test
public void upDATEData() throws KuduException {
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    //构建kuduSession对象
    KuduSession kuduSession = kuduClient.newSession();
    //设置刷新数据模式,自动提交
    kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);

    //更新数据需要获取UpDATE对象
    UpDATE upDATE = kuduTable.newUpDATE();
    //获取row对象
    PartialRow row = upDATE.getRow();
    //设置要更新的数据信息
    row.addInt("CompanyId",1);
    row.addString("Name","kobe");
    //操作这个upDATE对象
    kuduSession.apply(upDATE);
    kuduSession.close();
}

九、​​​​​​​删除数据

/**
 * 删除表中的数据
 */
@Test
public void deleteData() throws KuduException {
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    KuduSession kuduSession = kuduClient.newSession();
    //获取Delete对象
    Delete delete = kuduTable.newDelete();
    //构建要删除的行对象
    PartialRow row = delete.getRow();
    //设置删除数据的条件
    row.addInt("CompanyId",2);
    kuduSession.flush();
    kuduSession.apply(delete);
    kuduSession.close();
    kuduClient.close();
}

十、​​​​​​​修改表

package cn.it.kudu;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

/**
 * 修改表操作
 */
public class AlterTable {
    //定义kudu的客户端对象
    private static KuduClient kuduClient;
    //定义一张表名称
    private static String tableName = "person";

    /**
     * 初始化操作
     */
    @Before
    public void init() {
        //指定kudu的master地址
        String masterAddress = "node2.cn";
        //创建kudu的数据库连接
        kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
    }

    /**
     * 添加列
     */
    @Test
    public void alterTableAddColumn() {
        AlterTableOptions alterTableOptions = new AlterTableOptions();
        alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());
        try {
            kuduClient.alterTable(tableName, alterTableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
       }
    }

    /**
     * 删除列
     */
    @Test
    public void alterTableDeleteColumn(){
        AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");
        try {
            kuduClient.alterTable(tableName, alterTableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
       }
    }

    /**
     * 添加分区列
     */
    @Test
    public void alterTableAddRangePartition(){
        int lowerValue = 110;
        int upperValue = 120;
        try {
            KuduTable kuduTable = kuduClient.openTable(tableName);
            List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);
            boolean flag = true;
            for (Partition rangePartition : rangePartitions) {
                int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");
                if(startKey == lowerValue){
                    flag = false;
                }
            }
            if(flag) {
                PartialRow lower = kuduTable.getSchema().newPartialRow();
                lower.addInt("Id", lowerValue);
                PartialRow upper = kuduTable.getSchema().newPartialRow();
                upper.addInt("Id", upperValue);
                kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));
            }else{
                System.out.println("分区已经存在,不能重复创建!");
            }
        } catch (KuduException e) {
            e.printStackTrace();
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    /**
     * 删除表
     * @throws KuduException
     */
    @Test
    public void dropTable() throws KuduException {
        kuduClient.deleteTable(tableName);
    }
}

十一、​​​​​​​删除表

/**
 * 删除表
 */
@Test
public void dropTable() throws KuduException {
    //删除表
    DeleteTableResponse response = kuduClient.deleteTable(tableName);
    //关闭客户端连接
    kuduClient.close();
}

标签:Java,org,KuduException,客快,kuduClient,void,kudu,Kudu,public
From: https://www.cnblogs.com/shan13936/p/18104825

相关文章

  • 客快物流大数据项目(五十一):数据库表分析 物流项目 数据库表设计
    数据库表分析一、物流运输管理数据库表1、揽件表(tbl_collect_package)2、客户表(tbl_customer)3、物流系统码表(tbl_codes)4、快递单据表(tbl_express_bill)5、快递包裹表(tbl_express_package)​​​​​​​6、客户地址表(tbl_address)​​​​​​​7、客户寄件信息表(tbl_consumer......
  • 客快物流大数据项目(八十五):实时OLAP分析需求 一些组件的特点 一般有用 看1
    ​实时OLAP分析需求一、​​​​​​​背景介绍在之前的文章学习了离线数仓的构建,但是离线数仓的最大问题即:慢,数据无法实时的通过可视化页面展示出来,通常离线数仓分析的是“T+1”的数据,针对于时效性要求比较高的场景,则无法满足需求,例如:快速实时返回“分组+聚合计算+排序聚合指标......
  • 客快物流大数据项目(六十八):工作流调度 azkaban介绍及用法 一般有用 图片偏多 看1
    工作流调度一、工作流产生背景工作流(Workflow),指“业务过程的部分或整体在计算机应用环境下的自动化”。是对工作流程及其各操作步骤之间业务规则的抽象、概括描述。工作流解决的主要问题是:为了实现某个业务目标,利用计算机软件在多个参与者之间按某种预定规则自动传递文档、信息......
  • 客快物流大数据项目(六十二):主题及指标开发 common包下定义的一些内容 一般有用 看1
    主题及指标开发一、主题开发业务流程二、离线模块初始化1、创建包结构2、​​​​​​​创建时间处理工具3、​​​​​​​定义主题宽表及指标结果表的表名4、​​​​​​​物流字典码表数据类型定义枚举类5、​​​​​​​封装公共接口主题及指标开发一、主题开发业......
  • Java 实现缓存的三种方式
    Java实现缓存的三种方式文章目录Java实现缓存的三种方式一、`HashMap`实现缓存`Step-1`:实现一个缓存管理类`Step-2`:将缓存管理类交给`Spring`进行管理`Step-3`:编写接口测试缓存`Step-4`:结果展示二、`guavalocalcache`实现`Step-1`:导入`guava`依赖`Step-2`:使用`......
  • Java 抽象类、接口、内部类
    抽象类Java中的抽象类是一种不能被实例化的类,它用于定义子类必须实现的方法和属性。以下是一些关于Java抽象类的关键点:抽象方法:抽象类可以包含抽象方法,这些方法只有声明没有实现,且必须在任何非抽象子类中被覆写实现。构造方法:虽然抽象类不能直接实例化,但它可以有构造方法。......
  • 华为OD机试 - 传递悄悄话(Java & JS & Python & C & C++)
    须知哈喽,本题库完全免费,收费是为了防止被爬,大家订阅专栏后可以私信联系退款。感谢支持文章目录须知题目描述输入描述输出描述解题思路:题目描述给定一个二叉树,每个节点上站一个人,节点数字表示父节点到该节点传递悄悄话需要花费的时间。初始时,根节点所在......
  • 华为OD机试 - 剩余银饰的重量(Java & JS & Python & C & C++)
    须知哈喽,本题库完全免费,收费是为了防止被爬,大家订阅专栏后可以私信联系退款。感谢支持文章目录须知题目描述输入描述输出描述解题思路:题目描述有N块二手市场收集的银饰,每块银饰的重量都是正整数,收集到的银饰会被熔化用于打造新的饰品。每一回合,从中选......
  • 【 Java系列】--基础篇-1、认识接口中的default关键字
    原创:朱老师缘神日记default关键字:是在Java8中引入的新概念,也可称为Virtualextensionmethods——虚拟扩展方法,它与public、private等都属于修饰符关键字,与其它两个关键字不同之处在于default关键字大部分都用于修饰接口。default修饰方法时只能在接口类中使用,在接口中......
  • 【Java 系列】 -- 1、各种服务器
    一、服务器与容器三、各种项目中的容器原创:数据之恋......