我正在尝试在本地托管的 Flink 集群上运行一个简单的 Beam 管道,但在执行此操作时遇到错误。我已经尝试了在互联网上可以找到的所有内容。
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window
def run():
options = PipelineOptions([
"--runner=FlinkRunner",
"--flink_master=localhost:8081",
"--environment_type=EXTERNAL",
"--environment_config=localhost:50000"
])
with beam.Pipeline(options=options) as p:
(p
| 'Create words' >> beam.Create(['to be or not to be'])
| 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
| 'Write to file' >> WriteToText('testproject.txt')
)
if __name__ == "__main__":
run()
这是我尝试运行的代码,但我收到这些错误:
(env) shivakhatri@Shivas-Air beam-starter-python % python3.8 app.py 警告:root:等待 grpc 通道在 localhost:55999 处准备好。警告:root:等待 grpc 通道在 localhost:55999 处准备就绪。警告:root:等待 grpc 通道在 localhost:55999 处准备就绪。警告:root:等待 grpc 通道在 localhost:55999 处准备就绪。错误:root:java.lang.NullPointerException Traceback(最近一次调用):文件“app.py”,第23行,在<module> run()文件“app.py”,第16行,运行(p文件“ /Users/shivakhatri/Documents/work/beam-starter-python/env/lib/python3.8/site-packages/apache_beam/pipeline.py”,第 601 行,在 _ exit _ self.result 中.wait_until_finish() 文件“/Users/shivakhatri/Documents/work/beam-starter-python/env/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py”,第 614 行,在 wait_until_finish 中引发self._runtime_exception RuntimeError:管道 BeamApp-shivakhatri-0405201752-c62cb18a_fd177a56-ec3e-448a-a46a-4530de13702b 状态失败失败:java.lang.NullPointerException zsh:分段错误 python3.8 app.py(env)shivakhatri@Shivas-Air 光束-starter-python%
java.lang.NullPointerException
错误表明你的 Beam 管道在 Flink 集群上执行期间遇到了问题。虽然错误消息没有提供确切的原因,但有一些常见的问题需要检查:
-
Flink 集群可用性: 请确保你的 Flink 集群正在运行并且可以通过
localhost:8081
访问。在运行 Beam 管道之前,请验证 Flink 集群是否正常运行。 -
环境配置: 选项
--environment_config=localhost:50000
指示 Flink 集群正在侦听作业提交的端口。此配置适用于使用外部资源管理器(如 YARN 或 Mesos)的 Flink 集群。如果你正在运行独立的 Flink 集群,则可以删除此选项。 -
依赖版本: 不兼容的 Beam 和 Flink 版本可能会导致问题。请确保你使用的是兼容的版本。请参阅 Beam 文档以获取受支持的版本组合。
-
日志记录: 启用 Beam 和 Flink 的详细日志记录,以获取有关错误的更多信息。这可以帮助查明问题的原因。
-
简单示例: 首先尝试运行一个更简单的 Beam 管道,以排除代码中的任何问题。例如,尝试从文本文件中读取并将其写入控制台:
import apache_beam as beam
with beam.Pipeline() as pipeline:
(pipeline
| 'ReadFromText' >> beam.io.ReadFromText('input.txt')
| 'WriteToConsole' >> beam.io.WriteToText(sys.stdout)
)
如果简单示例运行成功,则问题可能出在你的管道代码中。如果问题仍然存在,则问题可能出在你的 Flink 集群配置中。
调试步骤:
-
验证你的 Flink 集群是否正在运行并且可以通过
localhost:8081
访问。 -
如果你是独立运行 Flink,请删除
--environment_config
选项。 - 检查 Beam 和 Flink 版本的兼容性。
- 启用详细日志记录以获取有关错误的更多信息。
- 首先尝试运行一个简单的 Beam 管道示例。
通过仔细检查这些问题并执行调试步骤,你应该能够识别并解决导致
java.lang.NullPointerException
错误的原因。