本文代码通过spark-shell运行
spark-shell --master yarn --driver-class-path /home/hadoop/software/spark/spark-2.4.4-bin-hadoop2.7/jars/hbase/*:/home/hadoop/software/hbase-1.4.10/conf
1.Put API
Put API可能是将数据快速导入HBase表的最直接的方法。但是在导入大量数据时不建议使用!但是可以作为简单数据迁移的选择,直接写个代码批量处理,开发简单、方便、可控强。
使用saveAsHadoopDataset写入数据
// 这里我在spark-shell,所以没有引用SparkContext和SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
val conf = HBaseConfiguration.create()
//设置连接的节点和端口,也可以将hbase-site.xml导入classpath
conf.set("hbase.zookeeper.quorum","master,slave1,slave2")
conf.set("hbase.zookeeper.property.clientPort","2181")
val tablename = "info"
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,tablename)
val indataRDD = sc.makeRDD(Array("1,Jack,15","2,Lily,16","3,Mike,16"))
val rdd = indataRDD.map(_.split(",")).map{arr=>{
// Put建立一行,并输入指定的rowkey--主键
val put = new Put(Bytes.toBytes(arr(0).toInt))
// 所有数据需要通过toBytes转换 按照格式:(列簇,列名,数据)
put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
// 转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
(new ImmutableBytesWritable,put)
}}
rdd.saveAsHadoopDataset(jobConf)
使用saveAsNewAPIHadoopDataset写入数据
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
val tablename = "account"
sc.hadoopConfiguration.set("hbase.zookeeper.quorum","master,slave1,slave2")
sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort","2181")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,tablename)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val inputRDD = sc.makeRDD(Array("1,Jack,15","2,Lily,16","3,Mike,16"))
val rdd = innputRDD.map(_.split(',')).map{arr=>{
val put = new Put(Bytes.toBytes(arr(0)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2)))
(new ImmutableBytesWritable,put)
}}
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
从hbase读取数据转化为RDD
import org.apache.hadoop.hbase.{HBaseConfiguration,HTableDescriptor,TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark._
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io._
val tablename = "account"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum","master,slave1,slave2")
conf.set("hbase.zookeeper.property.clientPort","2181")
conf.set(TableInputFormat.INPUT_TABLE,tablename)
val admin = new HBaseAdmin(conf)
if(!admin,isTableAvailable(tablename)){
// 表描述:表名
val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
admin.createTable(tableDesc)}
val hBaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
val count = hBaseRDD.count()
println(count)
hBaseRDD.foreach{case (_,result)=>{
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Age:"+age)
}}
参考spark将数据写入hbase以及从hbase读取数据 未完待续…
2、MapReduce Job
推荐使用sqoop,它的底层实现是mapreduce,数据并行导入的,这样无须自己开发代码,过滤条件通过query参数可以实现。
Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将MySQL中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到Mysql中。
参考Index of /docs。
采用如下命令:sqoop import
–connect jdbc:mysql://localhost/db
–username root -P
–table mysql_order
–columns “id,name”
–hbase-table hbase_order
–column-family f
–hbase-row-key id
–query “select id,name from mysql_order where…”
-m 1
3、采用Bulk load装载数据
bulk-load的作用是用mapreduce的方式将hdfs上的文件装载到hbase中,对于海量数据装载入hbase非常有用。
需要将MySQL的表数据导出为TSV格式(因为后面使用Import TSV工具),还需要确保有一个字段可以表示HBase表行的row key。