API手册 https://hbase.apache.org/2.4/apidocs/index.html
pom.xml
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.4.11</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
<version>3.0.1-b06</version>
</dependency>
log4j.properties
log4j.rootLogger=info,file
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss.SSS} [%5p] %r %l : %m%n
log4j.appender.file.File=./log/chenxb.log
单线程使用连接
package com.chen;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
/**
 * Single-threaded connection example: opens one synchronous and one asynchronous
 * HBase connection, prints the synchronous one, and closes both before exiting.
 */
public class HBaseConnectSingle {
    /**
     * Runs the example.
     *
     * @param args unused
     * @throws IOException if a connection cannot be created or closed
     */
    public static void main(String[] args) throws IOException {
        // Configuration object
        Configuration configuration = new Configuration();
        // ZooKeeper quorum the client uses to reach the cluster
        configuration.set("hbase.zookeeper.quorum", "hadoop100,hadoop101,hadoop102");
        // try-with-resources guarantees the synchronous connection is closed
        // even if something below throws.
        try (Connection connection = ConnectionFactory.createConnection(configuration)) {
            // Asynchronous connection (creation completes in the background)
            CompletableFuture<AsyncConnection> asyncConnection =
                    ConnectionFactory.createAsyncConnection(configuration);
            try {
                // Use the connection
                System.out.println(connection);
            } finally {
                // The async connection was previously never closed. Wait for its
                // creation to finish and release it; join() rethrows a creation
                // failure as an unchecked CompletionException.
                asyncConnection.join().close();
            }
        }
    }
}
多线程创建连接
hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop100,hadoop101,hadoop102</value>
<description>Comma-separated list of hosts in the ZooKeeper ensemble used by HBase clients.</description>
</property>
</configuration>
HBaseConnect
package com.chen;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
/**
 * Holds a single HBase {@link Connection} that is created once, when this class is
 * initialized, and shared by the other helper classes through the static field.
 *
 * @author chenxiaobin
 */
public class HBaseConnect {
    /**
     * Shared connection, assigned in the static initializer below.
     */
    public static Connection connection = null;

    static {
        try {
            // No-arg overload: no Configuration is passed in here, so the settings
            // must come from the client's default configuration lookup
            // (NOTE(review): expected to be hbase-site.xml on the classpath - confirm).
            connection = ConnectionFactory.createConnection();
        } catch (IOException e) {
            // Fail fast with the real cause. Previously the exception was only printed
            // and the field stayed null, so callers failed later with an unexplained
            // NullPointerException.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Closes the shared connection if it was created.
     *
     * @throws IOException if closing the connection fails
     */
    public static void closeConnection() throws IOException {
        // Nothing to do when no connection exists
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Prints the shared connection and then closes it.
     *
     * @param args unused
     * @throws IOException if closing the connection fails
     */
    public static void main(String[] args) throws IOException {
        System.out.println(HBaseConnect.connection);
        // Close
        HBaseConnect.closeConnection();
    }
}
DDL
package com.chen;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * Namespace and table administration (DDL) helpers built on the shared HBase connection.
 *
 * <p>Each method obtains its own {@link Admin} and closes it before returning. Per the
 * original notes, an Admin is lightweight and not thread-safe, so it should not be
 * pooled or cached.
 *
 * @author chen
 */
public class HBaseDDL {
    private static final Logger logger = LoggerFactory.getLogger(HBaseDDL.class);

    /**
     * Shared connection taken from {@link HBaseConnect}.
     */
    public static Connection connection = HBaseConnect.connection;

    /**
     * Creates a namespace carrying the configuration entry {@code user=chenxb}.
     * A failure of the create call itself is logged and not rethrown.
     *
     * @param namespace namespace name
     * @throws IOException if the Admin cannot be obtained or closed
     */
    public static void createNamespace(String namespace) throws IOException {
        // try-with-resources closes the Admin on every exit path
        try (Admin admin = connection.getAdmin()) {
            // Builder for the namespace descriptor
            NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);
            builder.addConfiguration("user", "chenxb");
            // A failed create is this method's own concern: log it instead of propagating
            try {
                admin.createNamespace(builder.build());
            } catch (IOException e) {
                logger.error("命名空间已存在 ", e);
            }
        }
    }

    /**
     * Checks whether a table exists.
     *
     * @param namespace namespace name
     * @param tableName table name
     * @return {@code true} if the table exists; {@code false} if it does not or if the
     *         lookup failed (the failure is logged)
     * @throws IOException if the Admin cannot be obtained or closed
     */
    public static boolean isTableExist(String namespace, String tableName) throws IOException {
        try (Admin admin = connection.getAdmin()) {
            try {
                return admin.tableExists(TableName.valueOf(namespace, tableName));
            } catch (IOException e) {
                // The previous message here was copy-pasted from createNamespace
                logger.error("判断表是否存在失败 ", e);
                return false;
            }
        }
    }

    /**
     * Creates a table whose column families each keep up to 5 versions.
     * Does nothing (besides logging) when no column family is given or the table
     * already exists.
     *
     * @param namespace      namespace name
     * @param tableName      table name
     * @param columnFamilies column family names; at least one is required
     * @throws IOException if the Admin cannot be obtained or closed
     */
    public static void createTable(String namespace, String tableName, String... columnFamilies) throws IOException {
        // A table without column families cannot be created
        if (columnFamilies == null || columnFamilies.length == 0) {
            logger.error("至少有一个列族");
            return;
        }
        // Refuse to create a table that is already there
        if (isTableExist(namespace, tableName)) {
            logger.error("表已存在");
            return;
        }
        try (Admin admin = connection.getAdmin()) {
            // Table descriptor builder
            TableDescriptorBuilder tableBuilder =
                    TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName));
            for (String columnFamily : columnFamilies) {
                // Column family descriptor builder
                ColumnFamilyDescriptorBuilder familyBuilder =
                        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
                // Maximum number of versions kept per cell
                familyBuilder.setMaxVersions(5);
                tableBuilder.setColumnFamily(familyBuilder.build());
            }
            try {
                admin.createTable(tableBuilder.build());
            } catch (IOException e) {
                logger.error("表已存在 ", e);
            }
        }
    }

    /**
     * Changes the maximum number of versions of one column family of an existing table.
     * Does nothing (besides logging) when the table or the column family does not exist.
     *
     * @param namespace    namespace name
     * @param tableName    table name
     * @param columnFamily column family to modify
     * @param version      new maximum number of versions
     * @throws IOException if the Admin cannot be obtained or closed
     */
    public static void modifyTable(String namespace, String tableName, String columnFamily, int version) throws IOException {
        // Check existence BEFORE opening the Admin so the early return cannot leak it
        if (!isTableExist(namespace, tableName)) {
            logger.error("表不存在");
            return;
        }
        try (Admin admin = connection.getAdmin()) {
            try {
                // Start from the current table descriptor so other settings are kept
                TableDescriptor descriptor = admin.getDescriptor(TableName.valueOf(namespace, tableName));
                TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(descriptor);
                // Start from the current column family descriptor as well
                ColumnFamilyDescriptor currentFamily = descriptor.getColumnFamily(Bytes.toBytes(columnFamily));
                if (currentFamily == null) {
                    // Unknown column family: building from null would throw
                    logger.error("列族不存在");
                    return;
                }
                ColumnFamilyDescriptorBuilder familyBuilder =
                        ColumnFamilyDescriptorBuilder.newBuilder(currentFamily);
                // Maximum number of versions kept per cell
                familyBuilder.setMaxVersions(version);
                // Replace the column family in the table descriptor and apply it
                tableBuilder.modifyColumnFamily(familyBuilder.build());
                admin.modifyTable(tableBuilder.build());
            } catch (IOException e) {
                logger.error("修改表格错误 ", e);
            }
        }
    }

    /**
     * Deletes a table, disabling it first if it is still enabled.
     *
     * @param namespace namespace name
     * @param tableName table name
     * @return {@code true} if the table was deleted; {@code false} if it does not exist
     *         or the deletion failed (the failure is logged)
     * @throws IOException if the Admin cannot be obtained or closed
     */
    public static boolean deleteTable(String namespace, String tableName) throws IOException {
        if (!isTableExist(namespace, tableName)) {
            logger.error("表不存在");
            return false;
        }
        try (Admin admin = connection.getAdmin()) {
            TableName target = TableName.valueOf(namespace, tableName);
            try {
                // A table must be disabled before it can be deleted; skip the call when
                // it is already disabled so a retry after a partial failure still works
                if (admin.isTableEnabled(target)) {
                    admin.disableTable(target);
                }
                admin.deleteTable(target);
                return true;
            } catch (IOException e) {
                logger.error("删除表失败 ", e);
                // Report the failure instead of claiming success
                return false;
            }
        }
    }

    /**
     * Manual driver; enable the call to try out.
     *
     * @param args unused
     * @throws IOException if an operation or closing the connection fails
     */
    public static void main(String[] args) throws IOException {
        try {
            // Create a namespace
            //createNamespace("chenxb");
            // Create a table
            //createTable("chenxb", "mytable", "info");
            // Check whether a table exists
            //System.out.println(isTableExist("chenxb", "mytable"));
            // Modify a table
            //modifyTable("chenxb", "mytable", "info", 10);
            // Delete a table
            deleteTable("chenxb", "mytable");
            // Other code
            System.out.println("业务代码");
        } finally {
            // Close the shared connection even if an operation above threw
            HBaseConnect.closeConnection();
        }
    }
}
DML
package com.chen;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.ColumnValueFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * Data manipulation (DML) helpers built on the shared HBase connection:
 * put, get, scan, filtered scan and column delete.
 *
 * <p>Each method opens its own {@link Table} and closes it before returning.
 * Failures of the data operation itself are logged and not rethrown.
 *
 * @author chen
 */
public class HBaseDML {
    // Was created with HBaseDDL.class, which attributed every log line to the wrong class
    private static final Logger logger = LoggerFactory.getLogger(HBaseDML.class);

    /**
     * Shared connection taken from {@link HBaseConnect}.
     */
    public static Connection connection = HBaseConnect.connection;

    /**
     * Writes one cell.
     *
     * @param namespace    namespace name
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param columnName   column qualifier
     * @param value        value to store
     * @throws IOException if the table cannot be obtained or closed
     */
    public static void putCell(String namespace, String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {
        // try-with-resources closes the table on every exit path
        try (Table table = connection.getTable(TableName.valueOf(namespace, tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));
            try {
                table.put(put);
            } catch (IOException e) {
                logger.error("插入数据失败 ", e);
            }
        }
    }

    /**
     * Reads all versions of one column of one row and prints each value on its own line.
     * Prints nothing when the row or column has no data.
     *
     * @param namespace    namespace name
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param columnName   column qualifier
     * @throws IOException if the table cannot be obtained or closed
     */
    public static void getCell(String namespace, String tableName, String rowKey, String columnFamily, String columnName) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(namespace, tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            // Restrict the read to one column (without this the whole row is returned)
            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
            // Return every stored version, not just the newest
            get.readAllVersions();
            try {
                Result result = table.get(get);
                Cell[] cells = result.rawCells();
                // rawCells() may be null for an empty result; guard before iterating
                if (cells != null) {
                    for (Cell cell : cells) {
                        // Bytes.toString decodes as UTF-8 regardless of the platform charset
                        System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
                    }
                }
            } catch (IOException e) {
                logger.error("获取数据失败 ", e);
            }
        }
    }

    /**
     * Scans the rows between {@code startRow} and {@code stopRow} and prints one output
     * line per row, each cell formatted as {@code row-family-value} followed by a tab.
     *
     * @param namespace namespace name
     * @param tableName table name
     * @param startRow  first row key of the scan
     * @param stopRow   row key at which the scan stops
     * @throws IOException if the table cannot be obtained or closed
     */
    public static void scanRows(String namespace, String tableName, String startRow, String stopRow) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(namespace, tableName))) {
            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(startRow));
            scan.withStopRow(Bytes.toBytes(stopRow));
            printScan(table, scan);
        }
    }

    /**
     * Scans the rows between {@code startRow} and {@code stopRow}, keeping rows whose
     * given column equals {@code value}, and prints them like {@link #scanRows}.
     *
     * <p>A {@link SingleColumnValueFilter} is used, which per the original notes keeps
     * the whole row. The notes list {@code ColumnValueFilter} (same four constructor
     * arguments) as the alternative that keeps only the matched column.
     *
     * @param namespace    namespace name
     * @param tableName    table name
     * @param startRow     first row key of the scan
     * @param stopRow      row key at which the scan stops
     * @param columnFamily column family name
     * @param columnName   column qualifier
     * @param value        value the column must equal
     * @throws IOException if the table cannot be obtained or closed
     */
    public static void filterScan(String namespace, String tableName, String startRow, String stopRow, String columnFamily, String columnName, String value) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(namespace, tableName))) {
            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(startRow));
            scan.withStopRow(Bytes.toBytes(stopRow));
            // A FilterList can hold several filters; only one is needed here
            FilterList filterList = new FilterList();
            SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                    // column family
                    Bytes.toBytes(columnFamily),
                    // column qualifier
                    Bytes.toBytes(columnName),
                    // comparison
                    CompareOperator.EQUAL,
                    // value
                    Bytes.toBytes(value)
            );
            filterList.addFilter(singleColumnValueFilter);
            scan.setFilter(filterList);
            printScan(table, scan);
        }
    }

    /**
     * Runs a scan and prints every returned row; a failure to open the scanner is logged.
     */
    private static void printScan(Table table, Scan scan) {
        // The scanner is a resource of its own and must be closed after use
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                Cell[] cells = result.rawCells();
                if (cells != null) {
                    for (Cell cell : cells) {
                        System.out.print(Bytes.toString(CellUtil.cloneRow(cell)) + "-"
                                + Bytes.toString(CellUtil.cloneFamily(cell)) + "-"
                                + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
                    }
                }
                System.out.println();
            }
        } catch (IOException e) {
            logger.error("扫描数据失败 ", e);
        }
    }

    /**
     * Deletes all versions of one column of one row.
     *
     * @param namespace    namespace name
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param columnName   column qualifier
     * @throws IOException if the table cannot be obtained or closed
     */
    public static void deleteColumn(String namespace, String tableName, String rowKey, String columnFamily, String columnName) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(namespace, tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            // addColumns removes every version (the usual business need). The extra
            // addColumn call that removed only the newest version was redundant next
            // to it and has been dropped.
            delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
            try {
                table.delete(delete);
            } catch (IOException e) {
                logger.error("删除数据失败 ", e);
            }
        }
    }

    /**
     * Manual driver; enable the call to try out.
     *
     * @param args unused
     * @throws IOException if an operation or closing the connection fails
     */
    public static void main(String[] args) throws IOException {
        try {
            // Insert data
            //putCell("chen", "student", "2001", "msg", "name1", "cxb1");
            // Read data
            //getCell("chen", "student", "2001", "msg", "name1");
            // Scan data
            //scanRows("chen", "student", "1000", "3001");
            // Filtered scan
            //filterScan("chen", "student", "1000", "3001", "msg", "name1", "cxb1");
            // Delete data
            deleteColumn("chen", "student", "2001", "info", "name1");
            // Other code
            System.out.println("业务代码");
        } finally {
            // Close the shared connection even if an operation above threw
            HBaseConnect.closeConnection();
        }
    }
}