首页 > 编程问答 >当我尝试在 flink 集群上运行 Beam Pipeline 时,为什么会出现 ERROR:root:java.lang.NullPointerException?

当我尝试在 flink 集群上运行 Beam Pipeline 时,为什么会出现 ERROR:root:java.lang.NullPointerException?

时间:2024-07-30 13:01:03浏览次数:16  
标签:python apache-flink apache-beam flink-streaming

我正在尝试在本地托管的 Flink 集群上运行一个简单的 Beam 管道,但在执行此操作时遇到错误。我已经尝试了在互联网上可以找到的所有内容。

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window

def run():
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=localhost:8081",
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000"
    ])

    with beam.Pipeline(options=options) as p:
        (p
            | 'Create words' >> beam.Create(['to be or not to be'])
            | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
            | 'Write to file' >> WriteToText('testproject.txt')
        )

if __name__ == "__main__":
    run()

这是我尝试运行的代码,但我收到这些错误:

(env) shivakhatri@Shivas-Air beam-starter-python % python3.8 app.py 警告:root:等待 grpc 通道在 localhost:55999 处准备好。警告:root:等待 grpc 通道在 localhost:55999 处准备就绪。警告:root:等待 grpc 通道在 localhost:55999 处准备就绪。警告:root:等待 grpc 通道在 localhost:55999 处准备就绪。错误:root:java.lang.NullPointerException Traceback(最近一次调用):文件“app.py”,第23行,在<module> run()文件“app.py”,第16行,运行(p文件“ /Users/shivakhatri/Documents/work/beam-starter-python/env/lib/python3.8/site-packages/apache_beam/pipeline.py”,第 601 行,在 _ exit _ self.result 中.wait_until_finish() 文件“/Users/shivakhatri/Documents/work/beam-starter-python/env/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py”,第 614 行,在 wait_until_finish 中引发self._runtime_exception RuntimeError:管道 BeamApp-shivakhatri-0405201752-c62cb18a_fd177a56-ec3e-448a-a46a-4530de13702b 状态失败失败:java.lang.NullPointerException zsh:分段错误 python3.8 app.py(env)shivakhatri@Shivas-Air 光束-starter-python%


java.lang.NullPointerException 错误表明你的 Beam 管道在 Flink 集群上执行期间遇到了问题。虽然错误消息没有提供确切的原因,但有一些常见的问题需要检查:

  1. Flink 集群可用性: 请确保你的 Flink 集群正在运行并且可以通过 localhost:8081 访问。在运行 Beam 管道之前,请验证 Flink 集群是否正常运行。

  2. 环境配置: 选项 --environment_config=localhost:50000 指示 Flink 集群正在侦听作业提交的端口。此配置适用于使用外部资源管理器(如 YARN 或 Mesos)的 Flink 集群。如果你正在运行独立的 Flink 集群,则可以删除此选项。

  3. 依赖版本: 不兼容的 Beam 和 Flink 版本可能会导致问题。请确保你使用的是兼容的版本。请参阅 Beam 文档以获取受支持的版本组合。

  4. 日志记录: 启用 Beam 和 Flink 的详细日志记录,以获取有关错误的更多信息。这可以帮助查明问题的原因。

  5. 简单示例: 首先尝试运行一个更简单的 Beam 管道,以排除代码中的任何问题。例如,尝试从文本文件中读取并将其写入控制台:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    (pipeline
     | 'ReadFromText' >> beam.io.ReadFromText('input.txt')
     | 'WriteToConsole' >> beam.io.WriteToText(sys.stdout)
    )

如果简单示例运行成功,则问题可能出在你的管道代码中。如果问题仍然存在,则问题可能出在你的 Flink 集群配置中。

调试步骤:

  1. 验证你的 Flink 集群是否正在运行并且可以通过 localhost:8081 访问。
  2. 如果你是独立运行 Flink,请删除 --environment_config 选项。
  3. 检查 Beam 和 Flink 版本的兼容性。
  4. 启用详细日志记录以获取有关错误的更多信息。
  5. 首先尝试运行一个简单的 Beam 管道示例。

通过仔细检查这些问题并执行调试步骤,你应该能够识别并解决导致 java.lang.NullPointerException 错误的原因。

标签:python,apache-flink,apache-beam,flink-streaming
From: 78282191

相关文章

  • 如何将多个变量分配给 python 函数中的单个参数?
    我正在尝试编写一个程序,如果可能的话,它需要一个三项式并对其进行因式分解。每当用户输入A、B和C时,三项式应该通过Factor(product,summation)函数获取,但我似乎无法弄清楚如何将A和C分配给乘积arg,将B分配给我尝试在函数外部声明不同的变量,product=(a*c)和summati......
  • python - 从文本生成音乐
    请给我一些建议为了解释一下,我输入“深度睡眠的睡眠音乐”,它将返回一个wav文件:https://www.youtube.com/watch?v=1wAdQhFJy54或者我给出一个wav文件,它会返回相同的现在这是我尝试过的:https://github.com/facebookresearch/audiocraft......
  • 从零开始的Python开发日记(7):短信验证功能开发流程
    短信验证功能开发流程在开发一个包含登录、注册以及短信验证的功能时,你需要遵循一个系统的开发流程。以下是实现这一功能的基本步骤,包括所需的技术和代码示例。1.环境配置首先,确保你的开发环境已经配置好,并安装了必要的库和工具。pipinstallfastapiuvicornsqlalche......
  • 【Python数值分析】革命:引领【数学建模】新时代的插值与拟合前沿技术
    目录​编辑第一部分:插值的基本原理及应用1.插值的基本原理1.1插值多项式1.2拉格朗日插值 1.3牛顿插值 1.4样条插值2.插值的Python实现2.1使用NumPy进行插值2.2使用SciPy进行插值2.2.1一维插值​编辑2.2.2二维插值3.插值的应用场景3.1数据平......
  • 在家用电脑上设置 Python 和 Jupyter,尝试打开 Jupyter 笔记本并显示错误,无法获取
    我有最新的Python版本3.12.4和以下版本的Jupyter:SelectedJupytercorepackages...IPython:8.26.0ipykernel:6.29.5ipywidgets:notinstalledjupyter_client:8.6.2jupyter_core:5.7.2jupyter_server:2.14.2jupyterlab......
  • Python - Reloading a module
    Eachmoduleisloadedintomemoryonlyonceduringaninterpretersessionorduringaprogramrun,regardlessofthenumberoftimesitisimportedintoaprogram.Ifmultipleimportsoccur,themodule’scodewillnotbeexecutedagainandagain.Suppose......
  • vscode python 3.7 pylance debugpy 插件 vsix
    可能报错  crashed5timesinthelast3minutes.Theserverwillnotberestarted.  ---pylance 可能报错  cannotreadpropertiesofundefinedreadingresolveEnvironment   --- debugger可能      vscodepython3.7调试没有反应......
  • Python获取秒级时间戳与毫秒级时间戳的方法[通俗易懂]
    参考资料:https://cloud.tencent.com/developer/article/21581481、获取秒级时间戳与毫秒级时间戳、微秒级时间戳代码语言:javascript复制importtimeimportdatetimet=time.time()print(t)#原始时间数据print(int(t))......
  • CEFPython
    在Tkinter界面中直接嵌入Selenium的浏览器视图并不是一件直接的事情,因为Selenium本身并不提供图形界面嵌入的功能。Selenium主要用于自动化web浏览器,但它并不直接控制浏览器窗口的显示方式,而是依赖于WebDriver来与浏览器交互。然而,你可以使用一些替代方案来在Tkinter应用中模拟或......
  • 《最新出炉》系列初窥篇-Python+Playwright自动化测试-58 - 文件下载
    1.简介前边几篇文章讲解完如何上传文件,既然有上传,那么就可能会有下载文件。因此宏哥就接着讲解和分享一下:自动化测试下载文件。可能有的小伙伴或者童鞋们会觉得这不是很简单吗,还用你介绍和讲解啊,不说就是访问到下载页面,然后定位到要下载的文件的下载按钮后,点击按钮就可以了。其实......