首页 > 编程语言 >JavaSpark 读取 HBASE

JavaSpark 读取 HBASE

时间:2022-11-24 23:31:26浏览次数:37  
标签:读取 getBytes JavaSpark import apache org HBASE spark hbase

1、pom.xml

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.0.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>

2、SparkHbaseDemo1.java

package com.jun.hbase_prac;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

import java.io.IOException;

public class SparkHbaseDemo1 {
    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaSparkContext jsc = new JavaSparkContext(conf);

        String tableName = "t2";

        // 全表扫描
        Scan scan = new Scan();
        byte[] cf1 = "info".getBytes();
        byte[] cf2 = "cf1".getBytes();

        byte[] cn1 = "name".getBytes();
        byte[] cn2 = "gender".getBytes();
        byte[] cn3 = "cn1".getBytes();

        scan.addFamily(cf1);
        scan.addColumn(cf1, cn1);
        scan.addColumn(cf1, cn2);

        // 其他列族
        scan.addFamily(cf2);
        scan.addColumn(cf2, cn3);

        // 将scan编码
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());

        Configuration configuration = HBaseConfiguration.create();
        configuration.set(TableInputFormat.INPUT_TABLE, tableName);
        configuration.set(TableInputFormat.SCAN, scanToString);

        // 配置文件
        // ZooKeeper集群
//        configuration.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
        configuration.addResource(new Path("core-site.xml"));
        configuration.addResource(new Path("hbase-site.xml"));

        // 将HBase数据转成RDD
        JavaPairRDD<ImmutableBytesWritable, Result> HBaseRdd = jsc.newAPIHadoopRDD(configuration, TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class);
        System.out.println(HBaseRdd.count());

        // 将 RDD 转成 String
        JavaRDD<String> HBaseResult = HBaseRdd.map(new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
            @Override
            public String call(Tuple2<ImmutableBytesWritable, Result> tuple2) throws Exception {
                Result result = tuple2._2;
                String rowKey = Bytes.toString(result.getRow());
                String name = Bytes.toString(result.getValue("info".getBytes(), "name".getBytes()));
                String gender = Bytes.toString(result.getValue("info".getBytes(), "gender".getBytes()));

                String cn1 = Bytes.toString(result.getValue("cf1".getBytes(), "cn1".getBytes()));

                System.out.println(tuple2);

                return "rowKey: " + rowKey + " name: " + name + " gender: " + gender + " cn1: " + cn1;
            }
        });

        System.out.println(HBaseResult.collect());  // [rowKey: 10001 name: rose gender: female cn1: value1]

        jsc.stop();
    }
}

标签:读取,getBytes,JavaSpark,import,apache,org,HBASE,spark,hbase
From: https://blog.51cto.com/u_13942374/5885202

相关文章

  • Arduino Wire.requestFrom 函数读取没有反应
    目录前言过程参考文章前言在读取DA217传感器ID的时候,发现把需要读取的寄存器地址写过去之后,再用Wire.requestFrom去读取就没有反应了(程序不会继续运行,会卡死到此处)......
  • java+pgsql实现保存图片到数据库,以及读取数据库存储的图片;java将图片保存到本地、保存
    java将图片保存到本地;pom.xml<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.4.7</version></dependency><de......
  • 用C#语言遍历读取和操纵XML文档
    实验环境:visualstudio2017问题:用C#语言编写控制台应用(.NETFramework)程序,为“Students.xml(该文件在上两篇博客中均写到,由于篇幅所限,不在书写)”为文档增加“总分”与“平均......
  • Kettle 读取Excel时增加排序字段
    选择指定tab:1.ExcelRowIndex:为该行在源excel中的行索引2.RowIndex:为读取数据时新集合中的行索引ExcelRowIndex和RowIndex 并不相等  Reference:Kettle连接......
  • Spring Data(数据) Couchbase
    版本5.0.0本参考文档描述了SpringDataCouchbase库的一般用法。项目信息版本控制-https://github.com/spring-projects/spring-data-couchbase错误跟踪器-https://jir......
  • 安卓读取手机短信
    上次接了一个项目,大致意思是一个页面,有六个输入(EditText),以及两个Button,一个button用于读取短信,并处理读取的信息填充至六个EditText里面,另外一个按钮用于清除掉六个EditTe......
  • EasyExcel导出/读取多个sheet页数据
    1.导入``List<WriteOffBaseInfo>writeOffBaseInfoList=newArrayList<>();List<WriteOffBaseInfo>writeOffBaseInfoList1=newArrayList<>();......
  • Spring Data(数据) Couchbase(二)
    5.4.7.仓库方法的空处理从SpringData2.0开始,返回单个聚合实例的存储库CRUD方法使用Java8来指示可能缺少值。除此之外,SpringData还支持在查询方法上返回以下包装器类......
  • 随想录(png的读取和显示)
       之前在阅读FTK代码的时候,发现工程本身用到了PNGLIB的代码。虽然网上关于pnglib的描述文件很多,但是真正好用、可以用的却没有多少。所以,为了学习的方便,我自己做了一个......
  • C# 读取web.config应用程序默认配置文件
    前面写过一篇自定义config的的读取方法。前文指路......