1. pom.xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>6.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.6</version>
    </dependency>
</dependencies>
2. SparkHbaseDemo1.java
package com.jun.hbase_prac;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

import java.io.IOException;

public class SparkHbaseDemo1 {
    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkHbaseDemo1");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        String tableName = "t2";

        // Full-table scan, restricted to the columns we need
        Scan scan = new Scan();
        byte[] cf1 = "info".getBytes();
        byte[] cf2 = "cf1".getBytes();
        byte[] cn1 = "name".getBytes();
        byte[] cn2 = "gender".getBytes();
        byte[] cn3 = "cn1".getBytes();
        scan.addFamily(cf1);
        scan.addColumn(cf1, cn1);
        scan.addColumn(cf1, cn2);
        // Second column family
        scan.addFamily(cf2);
        scan.addColumn(cf2, cn3);

        // Serialize the Scan so it can be handed to TableInputFormat through the configuration
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());

        Configuration configuration = HBaseConfiguration.create();
        configuration.set(TableInputFormat.INPUT_TABLE, tableName);
        configuration.set(TableInputFormat.SCAN, scanToString);
        // Load the cluster configuration files; alternatively, set the ZooKeeper quorum directly:
        // configuration.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
        configuration.addResource(new Path("core-site.xml"));
        configuration.addResource(new Path("hbase-site.xml"));

        // Read the HBase table into an RDD of (row key, Result) pairs
        JavaPairRDD<ImmutableBytesWritable, Result> HBaseRdd = jsc.newAPIHadoopRDD(configuration,
                TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        System.out.println(HBaseRdd.count());

        // Map each Result to a readable String
        JavaRDD<String> HBaseResult = HBaseRdd.map(new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
            @Override
            public String call(Tuple2<ImmutableBytesWritable, Result> tuple2) throws Exception {
                Result result = tuple2._2;
                String rowKey = Bytes.toString(result.getRow());
                String name = Bytes.toString(result.getValue("info".getBytes(), "name".getBytes()));
                String gender = Bytes.toString(result.getValue("info".getBytes(), "gender".getBytes()));
                String cn1 = Bytes.toString(result.getValue("cf1".getBytes(), "cn1".getBytes()));
                System.out.println(tuple2);
                return "rowKey: " + rowKey + " name: " + name + " gender: " + gender + " cn1: " + cn1;
            }
        });
        System.out.println(HBaseResult.collect()); // [rowKey: 10001 name: rose gender: female cn1: value1]

        jsc.stop();
    }
}
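3. Preparing the demo table (optional sketch)

The demo assumes an existing table t2 with column families info and cf1 and a single row keyed 10001. Below is a minimal sketch of how that table and row might be created with the plain HBase 1.2 client API so the job produces the output shown above; the class name CreateDemoTable is hypothetical and not part of the original post, and the cell values are taken from the expected output comment.

package com.jun.hbase_prac;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper, not part of the original demo: creates table t2 and inserts the sample row.
public class CreateDemoTable {
    public static void main(String[] args) throws IOException {
        // Assumes hbase-site.xml (ZooKeeper quorum etc.) is on the classpath.
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("t2");
            if (!admin.tableExists(tableName)) {
                HTableDescriptor desc = new HTableDescriptor(tableName);
                desc.addFamily(new HColumnDescriptor("info"));
                desc.addFamily(new HColumnDescriptor("cf1"));
                admin.createTable(desc);
            }
            // Insert the row that the expected output of the Spark job refers to.
            try (Table table = connection.getTable(tableName)) {
                Put put = new Put(Bytes.toBytes("10001"));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("rose"));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes("female"));
                put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("cn1"), Bytes.toBytes("value1"));
                table.put(put);
            }
        }
    }
}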
From: https://blog.51cto.com/u_13942374/5885202