I've been learning Spark recently. Last week I got a Spark cluster up and running, and today I wrote a test program in IDEA that reads data from a Hive database. The program itself is trivial, but I hit a few pitfalls that cost me four or five hours, so I'm writing them down.
Setting up a Spark cluster is fairly straightforward:
1. Install Scala. The Scala version must match the Scala version your Spark build uses (this is the issue that cost me five or six hours).
2. Download and unpack Spark, then set up the environment variables.
3. Configure conf/spark-env.sh:
export JAVA_HOME=/opt/java/jdk1.8.0_181
export SCALA_HOME=/opt/scala/scala-2.12.15
export HADOOP_HOME=/opt/hadoop/hadoop-3.2.4
export SPARK_HOME=/opt/spark/spark-3.2.4-bin-hadoop3.2
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=1G
export SPARK_MASTER_HOST=192.168.31.66
export SPARK_MASTER_PORT=7077
4. Go into the sbin directory and run ./start-all.sh to start the cluster.
5. Test the connection first from a terminal on the VM:
spark-shell --driver-class-path /opt/hive/hive-3.1.2/lib/postgresql-42.6.0.jar --master spark://192.168.31.66:7077
It starts up normally; test it:
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
hc.sql("select * from test.table1").show
The data in the Hive table shows up as expected.
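A quick way to guard against the version trap from step 1 is to print, inside spark-shell, the Scala and Spark versions the cluster build was compiled with and compare them against what the local project uses. A minimal check (both calls are standard API):

// Scala version this Spark build was compiled against, e.g. "2.12.15"
println(scala.util.Properties.versionNumberString)
// Spark version, e.g. "3.2.4"
println(org.apache.spark.SPARK_VERSION)

As a side note, HiveContext has been deprecated since Spark 2.0; the spark session that spark-shell creates already has Hive support when the build ships the Hive classes (as spark-3.2.4-bin-hadoop3.2 does), so spark.sql("select * from test.table1").show gives the same result.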
6. Set up the project in IDEA locally (other blog posts cover the details): import the dependencies below and add Scala framework support.
Dependencies:
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.codehaus.janino</groupId>
                <artifactId>janino</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>janino</artifactId>
        <version>3.0.8</version>
    </dependency>
</dependencies>
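The pom above references ${scala.version} and ${spark.version} without defining them, so a properties block is also needed; for example (the versions here are my assumption, chosen to match the cluster from spark-env.sh):

<properties>
    <scala.version>2.12.15</scala.version>
    <spark.version>3.2.4</spark.version>
</properties>

Note that the _2.12 suffix in the Spark artifact IDs is the Scala binary version and must agree with scala.version, which foreshadows the problem described at the end of this post.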
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.4.2</version>
            <configuration>
                <archive>
                    <manifest>
                        <!-- entry point of the fat jar -->
                        <mainClass>com.chenxii.jinghong.hadoop.SparkHiveTest</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.7.0</version>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                    <configuration>
                        <includes>
                            <include>**/*.scala</include>
                        </includes>
                    </configuration>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
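With the assembly plugin configured like this, the project can also be packaged and submitted from the command line (the jar name below is a placeholder; it depends on your artifactId and version):

mvn clean package
spark-submit --master spark://192.168.31.66:7077 \
  --class com.chenxii.jinghong.hadoop.SparkHiveTest \
  target/<your-artifact>-jar-with-dependencies.jar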
Test class:
package com.chenxii.jinghong.hadoop

import org.apache.spark.sql.SparkSession

@SerialVersionUID(11111L)
object SparkHiveTest extends Serializable {
  def main(args: Array[String]): Unit = {
    // point the session at the standalone cluster, not local[*]
    val sc = SparkSession.builder()
      .master("spark://192.168.31.66:7077")
      .appName("SparkHiveTest")
      // warehouse location on HDFS, matching the Hive setup
      .config("spark.sql.warehouse.dir", "hdfs://192.168.31.66:9000/opt/hive/hive-3.1.2/warehouse")
      .enableHiveSupport()
      .getOrCreate()
    println("Connected successfully")
    val sql = "select * from jinghong.rate"
    val result = sc.sql(sql)
    result.show()
    sc.stop()
  }
}
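One assumption baked into this class is that Spark can locate the Hive metastore, which usually happens via a hive-site.xml on the classpath. If it is not picked up automatically, an explicit setting like the following is the usual fix (the URI is an assumption; adjust host and port to your metastore):

// hypothetical: point the session at the Hive metastore explicitly
.config("hive.metastore.uris", "thrift://192.168.31.66:9083")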
Output:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.hive.conf.HiveConf).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected successfully
+---------+----------+----+
| uid| goods_no|rate|
+---------+----------+----+
| U0001|GOODS_0001| 1|
| U0001|GOODS_0002| 2|
| U0001|GOODS_0003| 3|
| U0001|GOODS_0004| 5|
| U0001|GOODS_0006| 4|
| U0001|GOODS_0021| 0|
| U0001|GOODS_0026| 1|
| U0001|GOODS_0054| 4|
| U0002|GOODS_0001| 2|
| U0002|GOODS_0002| 5|
| U0002|GOODS_0003| 2|
| U0002|GOODS_0004| 1|
| U0003|GOODS_0001| 0|
| U0003|GOODS_0002| 1|
| U0003|GOODS_0003| 5|
| U0003|GOODS_0004| 4|
|undefined|GOODS_0021| 2|
+---------+----------+----+
Problems encountered:
On startup it reported that it could not create the hive.metastore.warehouse location. The cause was a wrong master setting: most posts online use
.setMaster("local[*]"), but here the address has to be the Spark cluster's URL. Once the cluster is running, it is shown on the Spark master UI as spark://192.168.31.66:7077; the IP and port are the ones configured in spark-env.sh.
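For reference, the two kinds of master values look like this (the cluster URL is the one from my spark-env.sh):

// runs everything inside the local JVM, no cluster needed:
.master("local[*]")
// submits to the standalone cluster; host and port come from spark-env.sh:
.master("spark://192.168.31.66:7077")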
After that change the program connected to the cluster, but it never returned a result and just hung for several minutes.
The backend log showed:
23/05/27 14:40:49 WARN TransportChannelHandler: Exception in connection from /192.168.31.66:36969
java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible: stream classdesc serialVersionUID = -5578289228658167167, local class serialVersionUID = 3456489343829468865
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:299)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:352)
at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:298)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:298)
at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7(NettyRpcEnv.scala:246)
at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7$adapted(NettyRpcEnv.scala:246)
at org.apache.spark.rpc.netty.RpcOutboxMessage.onSuccess(Outbox.scala:90)
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:196)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
This problem had me stuck for five or six hours with no fix in sight. I finally searched with the international version of Bing and found the same problem reported on the Scala bug tracker on GitHub:
WrappedArray missing explicit SerialVersionUID annotation
kaaquist hit the same error while configuring Spark on August 20, 2022, and suspected a Scala bug; on August 24 he upgraded Scala and found the error was gone:
I can confirm that bumping the scala version from 2.12.11 -> 2.12.14 on my spark setup made the error disappear.
I checked my own Scala version: it was indeed 2.12.5, older than 2.12.11.
When I had searched this problem in the afternoon, someone did in fact mention a Scala version mismatch, and I had already reinstalled Scala. I meant to go from 2.13.10 down to 2.12.15, the Scala version my Spark build uses, but misread the number and installed 2.13.5 instead.
After switching Scala to 2.12.15 and rerunning the program, the results came back normally.
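The takeaway is that the Scala binary version has to line up in three places: the installation the cluster's SCALA_HOME points at, the scala.version and _2.12 artifacts in the pom, and the Scala SDK configured in IDEA. A quick check of the locally installed compiler, assuming scala is on the PATH:

scala -version
# prints something like: Scala code runner version 2.12.15 -- Copyright 2002-2021, LAMP/EPFL and Lightbend, Inc.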