安装PyAlink
提前安装好anaconda并创建或切换到要安装的环境,运行pip install pyalink即可安装alink(所支持最新版本的flink,目前是1.13),也可以运行pip install pyalink-flink-***安装指定flink版本的alink,例如pip install pyalink-flink-1.12表示安装支持flink-1.12版本的alink
下载插件
alink任务中如果使用到hive、kafka等数据源,则需要提前下载对应的插件才能使用。可参考这篇文章进行下载:https://www.yuque.com/pinshu/alink_guide/plugin_downloader
插件会默认下载到/data1/tools/miniconda3/envs/alink/lib/python3.6/site-packages/pyalink/lib/plugins(每个人的anaconda安装位置不同,环境名也不同,所以位置也会有所不同)
在flink-conf.yaml文件中添加以下配置:
# java环境加载变量
env.java.opts: -Djava.security.krb5.conf=/etc/krb5.conf -DALINK_PLUGINS_DIR=/data1/tools/miniconda3/envs/alink/lib/python3.6/site-packages/pyalink/lib/plugins -DALINK_HIVE_HDFS_CONFIG=/etc/hadoop/conf
# Alink插件及hadoop配置文件位置
ALINK_HIVE_HDFS_CONFIG: /etc/hadoop/conf
PATH_HADOOP_CONFIG: /etc/hadoop/conf
HADOOP_CONF_DIR: /etc/hadoop/conf
# 配置执行flink run时使用的python解释器:
python.client.executable: /data1/tools/miniconda3/envs/alink/bin/python
# 配置执行flink run时提交到yarn上及提交到的applicationId(重启flink之前需要先把这两个参数注释掉)
execution.target: yarn-session
yarn.application.id: application_1679465143385_16077
# 配置jobmanager和taskmanager之间发送的消息的最大大小
akka.framesize: "524288000b"
# 客户端和服务端最大能接收和处理的大小
rest.client.max-content-length: 1024288000
rest.server.max-content-length: 1024288000
脚本flink.sh、start-cluster.sh、stop-cluster.sh开头新增对flink位置的配置
由于我本地环境变量之前配置过flink-1.10.0的变量,为了提交任务时不干扰到低版本的flink,故此配置
export FLINK_HOME=/data1/tools/flink-1.13.3
export FLINK_LIB_DIR=/data1/tools/flink-1.13.3/lib
export FLINK_BIN_DIR=/data1/tools/flink-1.13.3/bin
踩坑过程:
1、Alink输出的hive表用presto查询会报以下错误:
Output format org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat with SerDe org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe is not supported
从报错信息来看该表是采用默认的存储格式TextFile的outputformat --> IgnoreKeyTextOutputFormat,presto查询的时候使用的LazySimpleSerDe不支持该格式,因为alink的代码中不能单独设置hive的outpformat等参数,所以在hive-site.xml中设置默认的存储格式为Orc。
# 打开hive-site.xml
vim /etc/hive/conf/hive-site.xml
# 添加下面这个属性,将Hive的默认存储属性改为orc后保存退出
<property>
<name>hive.default.fileformat</name>
<value>Orc</value>
</property>
2、当低版本的Hive(我是2.3.4)去读取由高版本Hive(我是3.1.2)写入的数据时,会报以下数组越界异常:
java.lang.RuntimeException:ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 6
参考网上的文章:https://blog.csdn.net/lixiaoksi/article/details/106855509
发现是hive源码中对版本进行了限制,可以参考上面这篇文章对hive-exec和hive-orc这两个模块进行修改后重编译替换掉hive原来的这两个包即可解决该问题
3、当Alink任务提交到yarn上运行去读取hive里的大量数据并调用collectToDataFrame方法将operator转换为dataframe时会抛出以下异常
Execution is unpectedly no longer running on task executor
因为从这个报错信息不能看出具体原因,所以我便将该任务提交到本地集群的Flink上,运行后依旧报错了,此时查看报错信息,出现以下关键语句
the rpc invocation size 108982514 exceeds the maximum akka framesize
通过以上报错信息可得知,应该是JobManager和TaskManagers之间发送的消息超过了默认的最大大小10485760b导致报错,在flink-conf.yaml文件中增加以下配置调大该值即可解决该问题
akka.framesize: "204800000b"
4、当Alink任务在调用useLocalEnv(1)初始化本地环境时,会报以下错误
py4j.protocol.Py4JError: org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE does not exist in the JVM
检查一番后发现是环境变量配置的问题,我的环境变量中之前配置过flink-1.10.0的flink路径,但是我的alink对应的版本是flink-1.13.3的,将环境变量中flink路径改为1.13.3版本的flink即可
5、当Alink任务提交到yarn上运行且有从hive中读取数据或写入数据到hive的代码时,会报以下错误:
javax.security.auth.LoginException: Unable to obtain password from user
这个问题在网上的回答都是说没有权限读取keytab或者是找不到keytab导致的,我在nodemanager每台机都检查了一下,发现我在代码中指定的keytab都是存在的,权限也都是777,这让我百思不得其解,之后我尝试把keytab从/opt文件夹复制一份到根目录/下,重新运行,竟然没有报错,可能是权限问题导致无法读取到/opt文件夹里的内容
6、如果没有配置以下参数,则运行过程中可能会抛出异常
# java环境加载变量
env.java.opts: -Djava.security.krb5.conf=/etc/krb5.conf -DALINK_PLUGINS_DIR=/data1/tools/miniconda3/envs/alink/lib/python3.6/site-packages/pyalink/lib/plugins -DALINK_HIVE_HDFS_CONFIG=/etc/hadoop/conf
# Alink插件及hadoop配置文件位置
ALINK_HIVE_HDFS_CONFIG: /etc/hadoop/conf
PATH_HADOOP_CONFIG: /etc/hadoop/conf
HADOOP_CONF_DIR: /etc/hadoop/conf
Caused by: java.lang.Exception: Configuring the output format (com.alibaba.alink.common.io.plugin.wrapper.RichOutputFormatWithClassLoader@1ef83a78) failed: There should config the PATH_HADOOP_CONFIG in flink configure.
at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:105)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:174)
... 19 more
Caused by: java.lang.IllegalStateException: There should config the PATH_HADOOP_CONFIG in flink configure.
at com.alibaba.alink.common.io.catalog.plugin.HiveClassLoaderFactory.loginClassLoader(HiveClassLoaderFactory.java:216)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at com.alibaba.alink.common.io.catalog.plugin.HiveClassLoaderFactory.installSecurity(HiveClassLoaderFactory.java:143)
at com.alibaba.alink.common.io.catalog.plugin.HiveClassLoaderFactory.create(HiveClassLoaderFactory.java:130)
at com.alibaba.alink.common.io.plugin.wrapper.RichOutputFormatWithClassLoader.getOutputFormat(RichOutputFormatWithClassLoader.java:41)
at com.alibaba.alink.common.io.plugin.wrapper.RichOutputFormatWithClassLoader.configure(RichOutputFormatWithClassLoader.java:70)
at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:99)
... 20 more
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
... 16 more
7、更换Hive插件版本
由于我本地的hive版本是Hive 2.1.1-cdh6.3.2,alink提供的hive版本只有2.3.4,为了能兼容,需要对下载好的hive插件目录下的部分jar包进行替换。用hive-exec-2.1.1-cdh6.3.2.jar 和 hive-metastore-2.1.1-cdh6.3.2.jar替换掉原来的hive-exec-2.3.4、hive-metastore-2.3.4,并加入新的jar包libfb303-0.9.3.jar就能成功使用了
8、alink代码中hive版本指定错误抛出异常
当执行下面的语句创建hive的catalog时,需要指定hive插件的版本
hive = HiveCatalog("hive_catalog", None, "2.1.1", "/etc/hive/conf", principal, keytab)
当你指定的版本跟hive插件的版本不一致时就会抛出下面的异常
java.lang.NosuchMethodException:org.apache.hadoop.hive.metastore.RetryingMetastoreClient.getProxy(org.apche.hadoop.hive.conf.HiveConf,[Ljava.lang.Class;,[Ljava.lang.Object;,java.lang.string)
9、alink在用Hive catalog读取hive数据且源表存在timestamp字段时存在bug
在com.alibaba.alink.common.io.catalog.HiveCatalog类的RowDataToRow私有类的map方法里(634行),手动进行timestamp的类型判断,添加以下代码
else if (o instanceof TimestampData){
o = ((TimestampData) o).toLocalDateTime();
}
添加好后重新编译,替换掉原来的alink-core包
标签:java,Alink,flink,环境,hive,alink,apache,org,搭建 From: https://www.cnblogs.com/zoufh/p/17831693.html