This example uses Spark to read data from a MySQL database and write it into HBase.
1. Source data
The source data lives in a MySQL database.
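The full layout of the source table is not shown here; the code below only assumes that the wjobs database contains a table tbspatial_region with at least an integer id column and string province and city columns (an assumption inferred from the code, not a documented schema).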
2. Create the HBase table
Start Hadoop, ZooKeeper, and HBase.
Open an HBase shell window and run:
create 'tb_region','cfgeo'
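The table name tb_region and the column family cfgeo must match what the Spark job writes: the job below uses the binary-encoded id column as the row key and stores province and city as qualifiers under cfgeo.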
3. Write the code
package com.soft863.demo

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

object demo1 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ReadMySQLDatas")
      .getOrCreate()

    // Read the source table from MySQL over JDBC
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.1.5:3306/wjobs?useSSL=false&serverTimezone=UTC&characterEncoding=utf-8")
      .option("dbtable", "tbspatial_region")
      .option("user", "root")
      .option("password", "root")
      .load()
    jdbcDF.printSchema()
    jdbcDF.show()

    // Point the Hadoop configuration at the HBase cluster and the target table created above
    val sc = spark.sparkContext
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "hadoop100")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "tb_region")

    // Convert each row into a (rowkey, Put) pair; the row key is the binary-encoded id column
    val rdd = jdbcDF.rdd.map(row => {
      val rowKeyBytes = Bytes.toBytes(row.getAs[Int]("id"))
      val put = new Put(rowKeyBytes)
      val columnFamily = "cfgeo" // column family created in the HBase shell
      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("city"), Bytes.toBytes(row.getAs[String]("city")))
      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("province"), Bytes.toBytes(row.getAs[String]("province")))
      new ImmutableBytesWritable(rowKeyBytes) -> put
    })

    // Configure a Hadoop job that writes the Puts through TableOutputFormat
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

    spark.stop()
  }
}
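Two details worth noting. Bytes.toBytes applied to an Int produces a 4-byte binary row key, so the keys show up as raw bytes (e.g. \x00\x00\x00\x01) when the table is scanned in the HBase shell; convert the id to a String first if human-readable keys are preferred. Also, Bytes.toBytes throws a NullPointerException on a null String, so guard the city and province values if those columns can be NULL in MySQL.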
Required pom.xml dependencies:
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.15</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase.connectors.spark</groupId>
        <artifactId>hbase-spark</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>
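Strictly speaking, only scala-library, spark-core, spark-sql, hadoop-client, the two HBase artifacts, and mysql-connector-java are exercised by this example; the streaming, Kafka, and hbase-spark entries are not needed for this particular job. When submitting to a real cluster instead of local[*], ship the MySQL driver and HBase client jars with --jars or package them into an assembly jar so the executors can load them.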
4. Run the code
After the job finishes, check the contents of the HBase table.
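You can verify the load with scan 'tb_region' in the HBase shell. Alternatively, the following is a minimal read-back sketch (an illustrative addition, not part of the original post, reusing the hadoop100:2181 ZooKeeper settings from above) that pulls the table through TableInputFormat and prints each row:

package com.soft863.demo

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

// Hypothetical verification job: reads tb_region back and prints (id, province, city)
object VerifyRegion {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("VerifyHBaseRegion")
      .getOrCreate()
    val sc = spark.sparkContext

    // Same HBase/ZooKeeper settings as the write job
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop100")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableInputFormat.INPUT_TABLE, "tb_region")

    // Scan the table as (rowkey, Result) pairs
    val hbaseRDD = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Decode the 4-byte id row key and the two cfgeo columns
    hbaseRDD.map { case (key, result) =>
      val id = Bytes.toInt(key.copyBytes())
      val province = Bytes.toString(result.getValue(Bytes.toBytes("cfgeo"), Bytes.toBytes("province")))
      val city = Bytes.toString(result.getValue(Bytes.toBytes("cfgeo"), Bytes.toBytes("city")))
      (id, province, city)
    }.collect().foreach(println)

    spark.stop()
  }
}

Each printed tuple should correspond to one row of the original tbspatial_region table.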
From: https://blog.csdn.net/taogumo/article/details/144752922