首页 > 其他分享 >Spark导入导出Hbase

Spark导入导出Hbase

时间:2022-12-02 15:00:58浏览次数:43  
标签:val Hbase Bytes hadoop 导入 apache org hbase Spark


本文代码通过spark-shell运行

spark-shell --master yarn --driver-class-path /home/hadoop/software/spark/spark-2.4.4-bin-hadoop2.7/jars/hbase/*:/home/hadoop/software/hbase-1.4.10/conf

1.Put API

Put API可能是将数据快速导入HBase表的最直接的方法。但是在导入大量数据时不建议使用!但是可以作为简单数据迁移的选择,直接写个代码批量处理,开发简单、方便、可控强。

使用saveAsHadoopDataset写入数据

// 这里我在spark-shell,所以没有引用SparkContext和SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

val conf = HBaseConfiguration.create()
//设置连接的节点和端口,也可以将hbase-site.xml导入classpath
conf.set("hbase.zookeeper.quorum","master,slave1,slave2")
conf.set("hbase.zookeeper.property.clientPort","2181")
val tablename = "info"
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,tablename)
val indataRDD = sc.makeRDD(Array("1,Jack,15","2,Lily,16","3,Mike,16"))
val rdd = indataRDD.map(_.split(",")).map{arr=>{
// Put建立一行,并输入指定的rowkey--主键
val put = new Put(Bytes.toBytes(arr(0).toInt))
// 所有数据需要通过toBytes转换 按照格式:(列簇,列名,数据)
put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
// 转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
(new ImmutableBytesWritable,put)
}}
rdd.saveAsHadoopDataset(jobConf)

使用saveAsNewAPIHadoopDataset写入数据

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

val tablename = "account"
sc.hadoopConfiguration.set("hbase.zookeeper.quorum","master,slave1,slave2")
sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort","2181")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,tablename)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val inputRDD = sc.makeRDD(Array("1,Jack,15","2,Lily,16","3,Mike,16"))
val rdd = innputRDD.map(_.split(',')).map{arr=>{
val put = new Put(Bytes.toBytes(arr(0)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2)))
(new ImmutableBytesWritable,put)
}}
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())

从hbase读取数据转化为RDD

import org.apache.hadoop.hbase.{HBaseConfiguration,HTableDescriptor,TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark._
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io._

val tablename = "account"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum","master,slave1,slave2")
conf.set("hbase.zookeeper.property.clientPort","2181")
conf.set(TableInputFormat.INPUT_TABLE,tablename)
val admin = new HBaseAdmin(conf)
if(!admin,isTableAvailable(tablename)){
// 表描述:表名
val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
admin.createTable(tableDesc)}
val hBaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
val count = hBaseRDD.count()
println(count)
hBaseRDD.foreach{case (_,result)=>{
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))

val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Age:"+age)
}}

参考​​spark将数据写入hbase以及从hbase读取数据​​ 未完待续…

2、MapReduce Job
推荐使用sqoop,它的底层实现是mapreduce,数据并行导入的,这样无须自己开发代码,过滤条件通过query参数可以实现。
Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将MySQL中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到Mysql中。
参考Index of /docs。
采用如下命令:sqoop import
–connect jdbc:mysql://localhost/db
–username root -P
–table mysql_order
–columns “id,name”
–hbase-table hbase_order
–column-family f
–hbase-row-key id
–query “select id,name from mysql_order where…”
-m 1
3、采用Bulk load装载数据
bulk-load的作用是用mapreduce的方式将hdfs上的文件装载到hbase中,对于海量数据装载入hbase非常有用。
需要将MySQL的表数据导出为TSV格式(因为后面使用Import TSV工具),还需要确保有一个字段可以表示HBase表行的row key。


标签:val,Hbase,Bytes,hadoop,导入,apache,org,hbase,Spark
From: https://blog.51cto.com/u_15899958/5907047

相关文章

  • Hive数据的导入和导出
    导入以下通过hive交互shell执行//创建ai库createdatabaseai;showdatabases;//创建表partition添加分区字段用tab间隔createtabletable_name(namestring,encode......
  • Hbase协处理器详解
    Hbase协处理器一、简述二、协处理器类型    2.1Observer协处理器    2.2Endpoint协处理器三、协处理的加载方式四、静态加载与卸载    4.1静态加......
  • Hbase系统架构及数据结构
    Hbase系统架构及数据结构一、基本概念    1.1RowKey(行键)    1.2ColumnFamily(列族)    1.3ColumnQualifier(列限定符)    1.4Column(列)......
  • Hbase容灾与备份
    Hbase容灾与备份一、前言二、CopyTable    2.1简介    2.2命令格式    2.3常用命令    2.4更多参数三、Export/Import    3.1简介  ......
  • HBase单机环境搭建
    HBase基本环境搭建一、安装前置条件说明二、Standalone模式三、伪集群模式安装(Pseudo-Distributed)一、安装前置条件说明1.1JDK版本说明HBase需要依赖JDK环境,......
  • HBase集群环境搭建
    HBase集群环境配置一、集群规划二、前置条件三、集群搭建        3.1下载并解压        3.2配置环境变量        3.3集群配置    ......
  • hbaseSink
    1.格式:CREATETABLEMyResult(colFamily:colNamecolType,...)WITH(type='hbase',zookeeperQuorum='ip:port[,ip:port]',tableName='table......
  • Hbase_Java_API
    HBaseJavaAPI的基本使用一、简述二、JavaAPI1.x基本使用三、JavaAPI2.x基本使用四、正确连接Hbase一、简述截至到目前(2019.04),HBase有两个主要的版本,......
  • Hbase_Shell
    Hbase常用Shell命令一、基本命令        1.1获取帮助        1.2查看服务器状态        1.3查看版本信息二、关于表的操作    ......
  • hbaseSide
    1.格式CREATETABLEtableName(columnFamily:columnNametypeasalias,...PRIMARYKEY(keyInfo),PERIODFORSYSTEM_TIME)WITH(type......