首页 > 其他分享 >快速体验 Flink Table Store进阶篇

快速体验 Flink Table Store进阶篇

时间:2022-11-14 20:00:41浏览次数:82  
标签:Flink -- flink Kafka 进阶篇 table Table kafka

在本地安装单机版本,能够实现快速体验 Flink Table Store 的目的,本文以 Flink 1.15.2、flink-table-store-dist-0.2.1、flink-shaded-hadoop-2-uber-2.8.3-10.0 和 Kafka 3.3.1 为例,系统为 Centos 3.10,演示 TableStore 及与 Kafka 的结合应用。本文使用的 JDK 为 TencentKona-11.0.17,下载路径:https://github.com/Tencent/TencentKona-11/releases/download/kona11.0.17/TencentKona-11.0.17.b1-jdk_linux-x86_64.tar.gz。

下载安装包

  • 1、下载 Flink 1.15.2
https://www.apache.org/dyn/closer.lua/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
  • 2、下载 Table Store 包
https://www.apache.org/dyn/closer.lua/flink/flink-table-store-0.2.1/flink-table-store-dist-0.2.1.jar
  • 3、下载 hadoop 包
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
  • 3、下载 Kafka-3.3.1 包
https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz

安装 Kafka

将下载好的 kafka_2.13-3.3.1.tgz 放到安装目录下,执行以下步骤完成安装:

  • tar xzf kafka_2.13-3.3.1.tgz

  • ln -s kafka_2.13-3.3.1 kafka

配置 Kafka

进入 Kafka 目录,编辑配置文件 config/server.properties,增加配置项 transaction.max.timeout.ms:

vi config/server.properties

transaction.max.timeout.ms=3600000

启动本地(单机模式) Kafka 集群

进入 Kafka 目录,启动 Zookeeper(这里也可使用 KRaft 替代,方法可参考 https://kafka.apache.org/quickstart):

bin/zookeeper-server-start.sh config/zookeeper.properties

创建 Topic:

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

查看 Topic 是否创建成功:

bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

接下来就可启动 Kafka:

bin/kafka-server-start.sh config/server.properties

执行命令 jps 查看 Kafka 是否启动成功:

# jps
12075 QuorumPeerMain
31807 Kafka

这里的 QuorumPeerMain 为 Zookeeper 进程。

将下载好的 flink-1.15.2-bin-scala_2.12.tgz、flink-table-store-dist-0.2.1.jar 和 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar 都放到安装目录下,执行以下步骤完成安装:

  • tar -xzf flink-1.15.2-bin-scala_2.12.tgz

  • ln -s flink-1.15.2 flink

  • cp flink-table-store-dist-0.2.1.jar flink-1.15.2/lib/

  • cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar flink-1.15.2/lib/

进入 Flink 目录,编辑 conf 目录下的配置文件 flink-conf.yaml,将配置项 taskmanager.numberOfTaskSlots 的值改为 2 。

vi conf/flink-conf.yaml

taskmanager.numberOfTaskSlots: 2

进入 Flink 目录,执行:

bin/start-cluster.sh

如果启动成功,则执行 jps 命令可看到 TaskManagerRunner 和 StandaloneSessionClusterEntrypoint 两个进程,执行“netstat -lpnt”可看到开启了 127.0.0.1:8081 看板端口。如果想修改看板端口,则只需要编辑配置文件 conf/flink-conf.yaml,将配置项 rest.port 改为其它值,然后重启(执行 bin/stop-cluster.sh 停止本地集群)即可。默认看板的地址为 localhost,如果需要远程查看,则还需要修改配置项 rest.bind-address,比如可将 localhost 改为 0.0.0.0 。

如果启动报错“Unsupported major.minor version 52.0”,这是因为 JDK 的版本不匹配,可执行命令“java -version”查看 JDK 的版本。根据 flink-1.15 的发布说明,建议 Java 11,但 Java 8 也可以。本文使用“Tencent Kona JDK 11.0.17”,下载地址:

https://github.com/Tencent/TencentKona-11/releases/download/kona11.0.17/TencentKona-11.0.17.b1-jdk_linux-x86_64.tar.gz

可将 TencentKona-11.0.17.b1-jdk_linux-x86_64.tar.gz 上传到 /usr/local 目录,然后解压,再建立软链接:

cd /usr/local
tar xzf TencentKona-11.0.17.b1-jdk_linux-x86_64.tar.gz
ln -s TencentKona-11.0.17.b1 jdk

将 JDK 加入 PATH 中:

export JAVA_HOME=/usr/local/jdk
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=$JAVA_HOME/lib/tools.jar

启动成功后,就可开始体验了。

进入 Flink 目录,以单机模式执行“bin/sql-client.sh embedded”,进入 SQL 操作界面,然后可按如下步骤开始体验:

  • 1、创建表*
CREATE CATALOG my_catalog WITH (
  'type'='table-store',
  'warehouse'='file:/tmp/table_store'
);

USE CATALOG my_catalog;

-- 创建单词计数动态表,并绑定 Kafka
CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
) WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = '127.0.0.1:9092',
    'kafka.topic' = 'quickstart-events',
    'kafka.transaction.timeout.ms' = '900002'
);
  • 2、写数据
-- 创建生成单词的临时表
CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='10',
    'fields.word.length' = '1'
);

-- table store requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';

-- write streaming data to dynamic table
-- 在 flink 的看板可看到产生了一个 job
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;

注意,对表 word_table 只能执行流查询,执行批查询时会报如下所示的错误:

org.apache.flink.table.api.ValidationException: Querying an unbounded table 'my_catalog.default.word_table' in batch mode is not allowed. The table source is unbounded.

这里如果遇到如下错误:

2022-11-14 12:17:22,721 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Writer -> Global Committer -> Sink: end (1/1)#10 (ef09ca23506363aba20557ba3c32e9ad) switched from INITIALIZING to FAILED with failure cause: org.apache.flink.table.store.shaded.org.apache.kafka.common.KafkaException: org.apache.flink.table.store.shaded.org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).

这是因为 Kafka 中的 transaction.max.timeout.ms 值小于 Flink 端的,在 Kafka 端默认为 900 秒,而Flink Kafka Sink 的 transaction.timeout.ms 默认值为 3600 秒。所以要么修改 Kafka 的 config/server.properties 文件,要么创建表时指定“kafka.transaction.timeout.ms” 值不比 transaction.max.timeout.ms 大。

  • 3、批量查询
-- 设置结果输出模式为 tableau,即表哥方式输出
SET 'sql-client.execution.result-mode' = 'tableau';

-- 切换到批模式(执行 SET 不带参数即可查看值)
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

SELECT * FROM word_count;
  • 4、流式查询
-- 切换到流模式
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM word_count; -- log.scan 默认为 full,表示从头查

-- 从最新的开始查
-- “-U” 表示更新前的,“+U” 表示更新后的,“I”表示插入,“D”是删除
SELECT * FROM word_count /*+ OPTIONS ('log.scan'='latest') */;
  • 5、Kafka 上查看

切换到 Kafka 目录,在 Kafka 端查看(假设 Kafka 安装目录为 /data/kafka):

cd /data/kafka
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
  • 6、结束体验
-- drop the dynamic table, clear the files
DROP TABLE word_count;

-- exit sql-client
EXIT;

Flink 程序由 Source、Transformation 和 Sink 三部分组成,其中 Source 负责数据的读取(支持 JDBC、HDFS、HBase、Hive、Kafka、Pulsar、Rabbitmq 和文件等),Transformation 负责对数据的转换操作, Sink 负责数据输出(支持 JDBC、HDFS、HBase、Hive、Kafka、Pulsar、Rabbitmq 和文件等),在各部间流转的数据称为流(Stream ),Flink 的组件 flink-connectors 包含了 Source 和 Sink 的实现。

标签:Flink,--,flink,Kafka,进阶篇,table,Table,kafka
From: https://www.cnblogs.com/aquester/p/16890185.html

相关文章