Java代码操作Kudu
一、构建maven工程
二、导入依赖
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.9.0-cdh6.2.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client-tools</artifactId>
<version>1.9.0-cdh6.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.9.0-cdh6.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
三、创建包结构
包名 |
说明 |
---|---|
cn.it |
代码所在的包目录 |
四、初始化方法
package cn.it;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.junit.Before;
public class TestKudu {
//定义KuduClient客户端对象
private static KuduClient kuduClient;
//定义表名
private static String tableName = "person";
/**
* 初始化方法
*/
@Before
public void init() {
//指定master地址
String masterAddress = "node2.cn";
//创建kudu的数据库连接
kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
}
//构建表schema的字段信息
//字段名称 数据类型 是否为主键
public ColumnSchema newColumn(String name, Type type, boolean isKey) {
ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
column.key(isKey);
return column.build();
}
}
五、创建表
/** 使用junit进行测试
*
* 创建表
* @throws KuduException
*/
@Test
public void createTable() throws KuduException {
//设置表的schema
List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
columns.add(newColumn("CompanyId", Type.INT32, true));
columns.add(newColumn("WorkId", Type.INT32, false));
columns.add(newColumn("Name", Type.STRING, false));
columns.add(newColumn("Gender", Type.STRING, false));
columns.add(newColumn("Photo", Type.STRING, false));
Schema schema = new Schema(columns);
//创建表时提供的所有选项
CreateTableOptions tableOptions = new CreateTableOptions();
//设置表的副本和分区规则
LinkedList<String> list = new LinkedList<String>();
list.add("CompanyId");
//设置表副本数
tableOptions.setNumReplicas(1);
//设置range分区
//tableOptions.setRangePartitionColumns(list);
//设置hash分区和分区的数量
tableOptions.addHashPartitions(list, 3);
try {
kuduClient.createTable("person", schema, tableOptions);
} catch (Exception e) {
e.printStackTrace();
}
kuduClient.close();
}
六、插入数据
/**
* 向表中加载数据
* @throws KuduException
*/
@Test
public void loadData() throws KuduException {
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//创建KuduSession对象 kudu必须通过KuduSession写入数据
KuduSession kuduSession = kuduClient.newSession();
//采用flush方式 手动刷新
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
kuduSession.setMutationBufferSpace(3000);
//准备数据
for(int i=1; i<=10; i++){
Insert insert = kuduTable.newInsert();
//设置字段的内容
insert.getRow().addInt("CompanyId",i);
insert.getRow().addInt("WorkId",i);
insert.getRow().addString("Name","lisi"+i);
insert.getRow().addString("Gender","male");
insert.getRow().addString("Photo","person"+i);
kuduSession.flush();
kuduSession.apply(insert);
}
kuduSession.close();
kuduClient.close();
}
七、查询数据
/**
* 查询表数据
* @throws KuduException
*/
@Test
public void queryData() throws KuduException {
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//获取scanner扫描器
KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
KuduScanner scanner = scannerBuilder.build();
//遍历
while(scanner.hasMoreRows()){
RowResultIterator rowResults = scanner.nextRows();
while (rowResults.hasNext()){
RowResult result = rowResults.next();
int companyId = result.getInt("CompanyId");
int workId = result.getInt("WorkId");
String name = result.getString("Name");
String gender = result.getString("Gender");
String photo = result.getString("Photo");
System.out.print("companyId:"+companyId+" ");
System.out.print("workId:"+workId+" ");
System.out.print("name:"+name+" ");
System.out.print("gender:"+gender+" ");
System.out.println("photo:"+photo);
}
}
//关闭
scanner.close();
kuduClient.close();
}
八、修改数据
/**
* 修改数据
* @throws KuduException
*/
@Test
public void upDATEData() throws KuduException {
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//构建kuduSession对象
KuduSession kuduSession = kuduClient.newSession();
//设置刷新数据模式,自动提交
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
//更新数据需要获取UpDATE对象
UpDATE upDATE = kuduTable.newUpDATE();
//获取row对象
PartialRow row = upDATE.getRow();
//设置要更新的数据信息
row.addInt("CompanyId",1);
row.addString("Name","kobe");
//操作这个upDATE对象
kuduSession.apply(upDATE);
kuduSession.close();
}
九、删除数据
/**
* 删除表中的数据
*/
@Test
public void deleteData() throws KuduException {
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
KuduSession kuduSession = kuduClient.newSession();
//获取Delete对象
Delete delete = kuduTable.newDelete();
//构建要删除的行对象
PartialRow row = delete.getRow();
//设置删除数据的条件
row.addInt("CompanyId",2);
kuduSession.flush();
kuduSession.apply(delete);
kuduSession.close();
kuduClient.close();
}
十、修改表
package cn.it.kudu;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
/**
* 修改表操作
*/
public class AlterTable {
//定义kudu的客户端对象
private static KuduClient kuduClient;
//定义一张表名称
private static String tableName = "person";
/**
* 初始化操作
*/
@Before
public void init() {
//指定kudu的master地址
String masterAddress = "node2.cn";
//创建kudu的数据库连接
kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
}
/**
* 添加列
*/
@Test
public void alterTableAddColumn() {
AlterTableOptions alterTableOptions = new AlterTableOptions();
alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());
try {
kuduClient.alterTable(tableName, alterTableOptions);
} catch (KuduException e) {
e.printStackTrace();
}
}
/**
* 删除列
*/
@Test
public void alterTableDeleteColumn(){
AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");
try {
kuduClient.alterTable(tableName, alterTableOptions);
} catch (KuduException e) {
e.printStackTrace();
}
}
/**
* 添加分区列
*/
@Test
public void alterTableAddRangePartition(){
int lowerValue = 110;
int upperValue = 120;
try {
KuduTable kuduTable = kuduClient.openTable(tableName);
List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);
boolean flag = true;
for (Partition rangePartition : rangePartitions) {
int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");
if(startKey == lowerValue){
flag = false;
}
}
if(flag) {
PartialRow lower = kuduTable.getSchema().newPartialRow();
lower.addInt("Id", lowerValue);
PartialRow upper = kuduTable.getSchema().newPartialRow();
upper.addInt("Id", upperValue);
kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));
}else{
System.out.println("分区已经存在,不能重复创建!");
}
} catch (KuduException e) {
e.printStackTrace();
} catch (Exception exception) {
exception.printStackTrace();
}
}
/**
* 删除表
* @throws KuduException
*/
@Test
public void dropTable() throws KuduException {
kuduClient.deleteTable(tableName);
}
}
十一、删除表
/**
* 删除表
*/
@Test
public void dropTable() throws KuduException {
//删除表
DeleteTableResponse response = kuduClient.deleteTable(tableName);
//关闭客户端连接
kuduClient.close();
}
标签:Java,org,KuduException,客快,kuduClient,void,kudu,Kudu,public
From: https://www.cnblogs.com/shan13936/p/18104825