
Connecting local Spark to Hive through a Spark cluster

Posted: 2023-05-27 23:45:15

I've been learning Spark recently. Last week I got a Spark cluster up and running, and today I created a test program in IDEA to read data from a Hive database. The program itself is very simple, but I hit a few pitfalls that cost me four or five hours, so I'm writing them down here.

Setting up the Spark cluster is fairly straightforward:
1. Install Scala. The Scala version must match the Scala version your Spark build uses (this mismatch is what later cost me five or six hours).
2. Download Spark, unpack it, and configure the environment variables.
3. Configure conf/spark-env.sh:

export JAVA_HOME=/opt/java/jdk1.8.0_181
export SCALA_HOME=/opt/scala/scala-2.12.15
export HADOOP_HOME=/opt/hadoop/hadoop-3.2.4
export SPARK_HOME=/opt/spark/spark-3.2.4-bin-hadoop3.2
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=1G
export SPARK_MASTER_HOST=192.168.31.66
export SPARK_MASTER_PORT=7077

4. Go into the sbin directory and run ./start-all.sh to start the cluster.
5. First test the connection from a terminal on the VM:

spark-shell --driver-class-path /opt/hive/hive-3.1.2/lib/postgresql-42.6.0.jar --master spark://192.168.31.66:7077

It starts up normally. Run a quick test:

import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
hc.sql("select * from test.table1").show

The data in the Hive table shows up as expected.
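
HiveContext still works here, but it has been deprecated since Spark 2.0; in a Spark 3.x spark-shell the predefined spark SparkSession can run the same query directly (assuming, as with this pre-built distribution, that Spark was built with Hive support):

// Equivalent check using spark-shell's built-in SparkSession, which already
// has Hive support enabled in the pre-built spark-3.2.4-bin-hadoop3.2 package
spark.sql("select * from test.table1").show()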

6. Set up the project in IDEA locally (other blogs cover the details step by step): add the dependencies and Scala support.
Dependencies:

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>janino</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.0.8</version>
        </dependency>
    </dependencies>
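
    <!-- The ${scala.version} and ${spark.version} properties are defined elsewhere in the
         pom and are not shown here; to line up with the cluster versions above
         (Scala 2.12.15, Spark 3.2.4) they would presumably be declared like this: -->
    <properties>
        <scala.version>2.12.15</scala.version>
        <spark.version>3.2.4</spark.version>
    </properties>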

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.4.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- specify the class used as the jar's entry point -->
                            <mainClass>com.chenxii.jinghong.hadoop.SparkHiveTest</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.7.0</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Test class:

package com.chenxii.jinghong.hadoop

import org.apache.spark.sql.SparkSession

@SerialVersionUID(11111L)
object SparkHiveTest extends Serializable {

  def main(args: Array[String]): Unit = {
    // Build a SparkSession against the standalone cluster master
    // (SPARK_MASTER_HOST:SPARK_MASTER_PORT from spark-env.sh), with Hive support
    // enabled and the warehouse directory pointing at the Hive warehouse on HDFS.
    val spark = SparkSession.builder()
      .master("spark://192.168.31.66:7077")
      .appName("SparkHiveTest")
      .config("spark.sql.warehouse.dir", "hdfs://192.168.31.66:9000/opt/hive/hive-3.1.2/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    println("Connected successfully")

    // Query a Hive table, print the result, then shut the session down.
    val sql = "select * from jinghong.rate"
    val result = spark.sql(sql)
    result.show()
    spark.stop()
  }

}
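
One thing the test class does not show: spark.sql.warehouse.dir alone does not tell enableHiveSupport() where the Hive metastore is; that normally comes from a hive-site.xml on the classpath (for example under src/main/resources) or from an explicit hive.metastore.uris setting, otherwise Spark falls back to a local Derby metastore and cannot see the cluster's tables. A minimal sketch of the explicit variant (the thrift host and port 9083 below are assumptions, not part of the original setup):

// Sketch only: point the session at the Hive metastore service explicitly
// instead of shipping hive-site.xml with the project. Host and port are
// assumptions -- adjust them to wherever the metastore thrift service runs.
val spark = SparkSession.builder()
  .master("spark://192.168.31.66:7077")
  .appName("SparkHiveTest")
  .config("hive.metastore.uris", "thrift://192.168.31.66:9083")
  .enableHiveSupport()
  .getOrCreate()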

Output:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.hive.conf.HiveConf).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected successfully
+---------+----------+----+
| uid| goods_no|rate|
+---------+----------+----+
| U0001|GOODS_0001| 1|
| U0001|GOODS_0002| 2|
| U0001|GOODS_0003| 3|
| U0001|GOODS_0004| 5|
| U0001|GOODS_0006| 4|
| U0001|GOODS_0021| 0|
| U0001|GOODS_0026| 1|
| U0001|GOODS_0054| 4|
| U0002|GOODS_0001| 2|
| U0002|GOODS_0002| 5|
| U0002|GOODS_0003| 2|
| U0002|GOODS_0004| 1|
| U0003|GOODS_0001| 0|
| U0003|GOODS_0002| 1|
| U0003|GOODS_0003| 5|
| U0003|GOODS_0004| 4|
|undefined|GOODS_0021| 2|
+---------+----------+----+

Problems encountered:
At startup it complained that it could not create the hive.metastore.warehouse directory. The cause was a wrong master setting: the examples online all use .setMaster("local[*]"), but here the address has to be the Spark cluster's address. Once the cluster is running, the Spark master web UI shows it as spark://192.168.31.66:7077; the IP and port are exactly the ones configured in spark-env.sh.
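
To make that concrete, here is a small sketch (not from the original post) that takes the master URL as an argument, so the same code can run either locally or against the standalone cluster:

import org.apache.spark.sql.SparkSession

// Minimal sketch: pass "local[*]" for a purely local run, or the standalone
// master URL from spark-env.sh (spark://SPARK_MASTER_HOST:SPARK_MASTER_PORT,
// also shown on the master web UI) to actually use the cluster.
object MasterUrlDemo {
  def main(args: Array[String]): Unit = {
    val masterUrl = args.headOption.getOrElse("spark://192.168.31.66:7077")
    val spark = SparkSession.builder()
      .master(masterUrl)
      .appName("MasterUrlDemo")
      .getOrCreate()
    println(s"connected to master: ${spark.sparkContext.master}")
    spark.stop()
  }
}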

After that fix, the program connected to the cluster successfully, but no result ever came back; it just hung for several minutes. The backend logs showed:

23/05/27 14:40:49 WARN TransportChannelHandler: Exception in connection from /192.168.31.66:36969
java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible: stream classdesc serialVersionUID = -5578289228658167167, local class serialVersionUID = 3456489343829468865
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
	at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:299)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:352)
	at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:298)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:298)
	at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7(NettyRpcEnv.scala:246)
	at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7$adapted(NettyRpcEnv.scala:246)
	at org.apache.spark.rpc.netty.RpcOutboxMessage.onSuccess(Outbox.scala:90)
	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:196)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

This problem had me stuck for five or six hours with no solution in sight. In the end I searched with the international version of Bing and found the same issue on the Scala bug tracker on GitHub:
WrappedArray missing explicit SerialVersionUID annotation
kaaquist had run into the same problem while setting up Spark on August 20, 2022 and suspected a Scala bug; on August 24 he reported that the error disappeared after he upgraded Scala:
I can confirm that bumping the scala version from 2.12.11 -> 2.12.14 on my spark setup made the error disappear.
I checked, and my own Scala version was indeed 2.12.5, which is earlier than 2.12.11.
When I had searched the problem that afternoon, some answers did point to a Scala version mismatch, and I had already reinstalled Scala once: I meant to go from 2.13.10 down to 2.12.15, the version my Spark build uses, but misread it and ended up on 2.13.5 instead.
After switching Scala to 2.12.15 and re-running the program, the results came back normally.
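
A quick sanity check that would have surfaced this much earlier: print the Scala library version and the Spark version on both sides (once in spark-shell on the cluster, once in the local program) and make sure they agree; a minimal sketch:

// Prints the version of the scala-library actually on the classpath of the
// running process plus the Spark version; run this both in spark-shell on the
// cluster and from the local project -- here both should report Scala 2.12.15
// and Spark 3.2.4.
println(s"Scala: ${scala.util.Properties.versionNumberString}")
println(s"Spark: ${org.apache.spark.SPARK_VERSION}")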

From: https://www.cnblogs.com/chenxii81/p/17437589.html
