首页 > 其他分享 >Spark+HBase数据处理与存储实验部分内容

Spark+HBase数据处理与存储实验部分内容

时间:2023-04-20 12:13:07浏览次数:53  
标签:hbase val apache import 数据处理 org Spark spark HBase

0. Scala+Spark+HBase的IDEA环境配置

需要下载的内容:Scala、Java,注意两者之间版本是否匹配。

环境:Win10,Scala2.10.6,JDK1.7,IDEA2022.3.1

创建maven工程。

下载Scala插件。

右键项目,添加Scala框架支持。

项目结果如图所示:

scala添加为源目录,下存scala代码

添加依赖包。将property的版本号换成对应版本。依赖和插件作用在注释中。如果下载很慢记得换源。换源教程

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>untitled1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>7</maven.compiler.source>
        <maven.compiler.target>7</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.10</scala.version>
        <spark.version>1.6.3</spark.version>
        <hbase.version>1.2.6</hbase.version>
    </properties>

    <dependencies>
        <!-- 导入scala的依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.6</version>
        </dependency>
        <!-- 读取csv的依赖,非必须 -->
        <dependency>
            <groupId>au.com.bytecode</groupId>
            <artifactId>opencsv</artifactId>
            <version>2.4</version>
        </dependency>
        <!-- spark相关依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- sql驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.43</version>
        </dependency
        >
        <!-- hbase相关依赖 -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <!-- scala插件需要 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
View Code

scala文件夹下添加测试代码:

import org.apache.spark.{SparkConf, SparkContext}
import au.com.bytecode.opencsv.CSVReader
import org.apache.spark.rdd.RDD

import java.io.StringReader

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkDemo")
    val sc = SparkContext.getOrCreate(conf)
    val input = sc.textFile("file:///home/hadoop/dream.txt");
    val wordCount = input.flatMap(
      line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
    wordCount.foreach(println)
  }
}

maven clean,maven package,打包放到集群上跑一跑:

如果跑不通建议从scala输出helloworld开始调,一点一点添加依赖。

1. 了解Spark的数据读取与保存操作,尝试完成csv文件的读取和保存;

直接用sc.textFile打开会乱码,当前spark版本也不支持sparkSession,于是选择引入opencsv的依赖包

代码:

import org.apache.spark.{SparkConf, SparkContext}
import au.com.bytecode.opencsv.CSVReader

import java.io.StringReader

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkDemo")
    val sc = SparkContext.getOrCreate(conf)
    val input = sc.textFile("file:///home/hadoop/test.csv");

    input.collect().foreach(println)
    val result = input.map { line =>
      val reader = new CSVReader(new StringReader(line));
      reader.readNext()
    }
    println(result.getClass)
    result.collect().foreach(x => {
      x.foreach(println); println("======")
    })
  }
}
View Code

可以发现spark的info很多,很吵,输出结果被淹没了,修改下配置:[转]Spark如何设置不打印INFO日志

2. 请使用Spark编程实现对每个学生所有课程总成绩与平均成绩的统计聚合,并将聚合结果存储到HBase表。

HBase表结构:

行键(number)

列簇1(information)

列簇2(score)

列簇3(stat_score)

列名

(name)

列名

(sex)

列名

(age)

列名

(123001)

列名

(123002)

列名

(123003)

列名

(sum)

列名

(avg)

学号

姓名

性别

年龄

成绩

成绩

成绩

总成绩

平均成绩

 

首先修改一下集群上的配置,不然会出现java.lang.NoClassDefFoundError

java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration

读写HBase有两种方式,RDD和DataFrame,其中DataFrame扩展出DataSet。

读时使用HBase,写时使用DataSet。

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job

import scala.collection.mutable.ArrayBuffer

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkDemo")
    val sc = SparkContext.getOrCreate(conf)

    val tablename = "student"

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "192.168.56.121")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)

    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val count = hBaseRDD.count()
    println(count)

    val res=hBaseRDD.map { case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val s1 = Bytes.toString(result.getValue("score".getBytes, "123001".getBytes))
      val s2 = Bytes.toString(result.getValue("score".getBytes, "123002".getBytes))
      val s3 = Bytes.toString(result.getValue("score".getBytes, "123003".getBytes))
      println(key,s1,s2,s3)
      val total=Integer.parseInt(s1)+Integer.parseInt(s2)+Integer.parseInt(s3);
      val aver=total/3;
      key+","+total+","+aver
    }
    println(res)

    val hbaseConf2 = HBaseConfiguration.create()
    hbaseConf2.set("hbase.zookeeper.quorum", "192.168.56.121")
    hbaseConf2.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf2.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    val job = new Job(hbaseConf2)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val rdd=res.map(_.split(",")).map{arr=>
      println(arr(0),arr(1),arr(2))
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("stat_score"), Bytes.toBytes("sum"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("stat_score"), Bytes.toBytes("avg"), Bytes.toBytes(arr(2)))
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
View Code

 

标签:hbase,val,apache,import,数据处理,org,Spark,spark,HBase
From: https://www.cnblogs.com/capterlliar/p/17336304.html

相关文章

  • padans关于数据处理的杂谈
    情况:业务数据基本字段会有如下:Index(['时间','地区','产品','字段','数值'],dtype='object')这样就会引发一个经典“三角不可能定理”,如何同时简约展现分时序、分产品、分字段数据。)一般来说,1、时序为作为单独的分类,2、然后剩下两个标签就是,要么:2.1、每个字段一张表,......
  • DNA序列数据处理
    dna序列数据处理通常包括以下步骤:数据预处理:首先,需要对原始dna序列数据进行预处理。其中包括测序错误的纠正、碱基质量过滤和去除低质量序列等。这个阶段是非常重要的,因为数据预处理的质量直接影响后续的特征提取和模型学习。特征提取:在dna序列分析中,会涉及到许多不同的特征......
  • pandas数据处理基础-数据读取/数据选择
    数据读取df=pd.read_csv("相对路径或者网址")呈现的结果是一个二维数组,dataframe结构;df.head()----显示的是前5行数据df.tail(7)----显示的是后7行数据df.describe()----对数据进行描述df.values----将dataframe转换为numpy数组结构df.index--查看索引df.columns--查看行......
  • PySpark学习
    学习基于AmitNandi的SparkforPythonDevelopers 1.1 wordcountexample  Chapter5  StreamingLiveDatawithSpark 目的:“investigatevariousimplementationsusinglivesourcesofdatasuchasTCPsocketstotheTwitterfirehoseandputinpl......
  • 高通量测序的数据处理与分析指北(二)-宏基因组篇
    宏基因组篇前言之前的一篇文章已经从生物实验的角度讲述了高通量测序的原理,这篇文章旨在介绍宏基因组二代测序数据的处理方式及其原理。在正文开始之前,我们先来认识一下什么是宏基因组。以我的理解,宏基因组就是某环境中所有生物的基因组的合集,这个环境可以是下水道,河流等自然环......
  • 数据处理的两个基本问题
    bx、si、di、bp在8086CPU下,只有bx、si、di、bp这四个可以用在[...]中进行内存单元的寻址就好像[ax]是错误的,[bx]是正确的[...]的用法在[...]中,如上四个寄存器可以单个出现,或者以以下的组合出现,其他的都是不合法的bx和si、bx和di、bp和si、bp和di如[bx+bp]、[......
  • 解决Spark读取tmp结尾的文件报错的问题
    业务场景flume采集文件到hdfs中,在采集中的文件会添加.tmp后缀。一个批次完成提交后,会将.tmp后缀重名名,将tmp去掉。所以,当Spark程序读取到该hive外部表映射的路径时,在出现找不到xxx.tmp文件的问题出现。解决思路:Hdfs提供了读取文件筛选的接口PathFilter。这个接口在hadoop-co......
  • 轻松应对亿级数据,HBase Scan读取速度翻倍!
    HBase是一种基于Hadoop的分布式列存储数据库,它支持大规模结构化数据的存储和随机访问。在HBase中,扫描(Scan)是一种读取表中数据的方式,它可以返回表中满足条件的一部分或全部数据。本文将介绍HBase中扫描的概念、使用方法和性能优化。1扫描的概念扫描是一种读取表中数据的方式,它可以......
  • Java语言在Spark3.2.4集群中使用Spark MLlib库完成朴素贝叶斯分类器
    一、贝叶斯定理贝叶斯定理是关于随机事件A和B的条件概率,生活中,我们可能很容易知道P(A|B),但是我需要求解P(B|A),学习了贝叶斯定理,就可以解决这类问题,计算公式如下:  P(A)是A的先验概率P(B)是B的先验概率P(A|B)是A的后验概率(已经知道B发生过了)P(B|A)是B的后验概率(已经知道A发生过了)二......
  • spark stream冷启动处理kafka中积压的数据
    因为首次启动JOB的时候,由于冷启动会造成内存使用太大,为了防止这种情况出现,限制首次处理的数据量spark.streaming.backpressure.enabled=truespark.streaming.backpressure.initialRate=200forexample:#!/bin/shTaskName="funnel"UserName="hadoop"cd`dirname$0`nohupsudo......