
Paimon Quick Start Demo


Key takeaways:
1. Put the Paimon and Hadoop jars into Flink's lib directory.
2. Both warehouse path formats work: 'warehouse'='file:/tmp/paimon' and 'warehouse'='file:///tmp/paimon'.
3. Data is persisted under the path from point 2. If you disconnect the SQL client, the INSERT job into the target table keeps running; its lifecycle is cluster-level, not session-level. After reconnecting, recreate the catalog and you can read word_count again, but word_table no longer exists after reconnecting, since it is a TEMPORARY table and therefore session-scoped.
4. Under the path from point 2 you can inspect the persisted files and how they are organized.
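
A minimal sketch of the reconnect flow from point 3, reusing the catalog definition from Step 5 below:

-- after restarting sql-client.sh, recreate the catalog to reach the persisted warehouse
CREATE CATALOG my_catalog WITH (
'type'='paimon',
'warehouse'='file:/tmp/paimon'
);

USE CATALOG my_catalog;

-- word_count was persisted and is readable again
SELECT * FROM word_count;

-- word_table no longer exists: TEMPORARY tables are session-scoped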

Quick Start #
This documentation is a guide for using Paimon in Flink.

Jars #
Paimon currently supports Flink 1.19, 1.18, 1.17, 1.16, 1.15. We recommend the latest Flink version for a better experience.

Download the jar file with corresponding version.

Currently, Paimon provides two types of jars: the bundled jar, used for reading/writing data, and the action jar, used for operations such as manual compaction.

Version       Type         Jar
Flink 1.19    Bundled Jar  paimon-flink-1.19-0.8.2.jar
Flink 1.18    Bundled Jar  paimon-flink-1.18-0.8.2.jar
Flink 1.17    Bundled Jar  paimon-flink-1.17-0.8.2.jar
Flink 1.16    Bundled Jar  paimon-flink-1.16-0.8.2.jar
Flink 1.15    Bundled Jar  paimon-flink-1.15-0.8.2.jar
Flink Action  Action Jar   paimon-flink-action-0.8.2.jar
You can also manually build bundled jar from the source code.

To build from source code, clone the git repository.

Build bundled jar with the following command.

mvn clean install -DskipTests
You can find the bundled jar in ./paimon-flink/paimon-flink-<flink-version>/target/paimon-flink-<flink-version>-0.8.2.jar, and the action jar in ./paimon-flink/paimon-flink-action/target/paimon-flink-action-0.8.2.jar.

Start #
Step 1: Download Flink

If you haven't downloaded Flink yet, download it from the Flink website, then extract the archive with the following command.

tar -xzf flink-*.tgz
Step 2: Copy Paimon Bundled Jar

Copy paimon bundled jar to the lib directory of your Flink home.

cp paimon-flink-*.jar <FLINK_HOME>/lib/
Step 3: Copy Hadoop Bundled Jar

If the machine is in a Hadoop environment, ensure that the value of the environment variable HADOOP_CLASSPATH includes the path to the common Hadoop libraries; in that case you do not need the pre-bundled Hadoop jar below.
Otherwise, download the Pre-bundled Hadoop jar and copy it to the lib directory of your Flink home.

cp flink-shaded-hadoop-2-uber-*.jar <FLINK_HOME>/lib/
Step 4: Start a Flink Local Cluster

In order to run multiple Flink jobs at the same time, you need to modify the cluster configuration in <FLINK_HOME>/conf/flink-conf.yaml.

taskmanager.numberOfTaskSlots: 2
To start a local cluster, run the bash script that comes with Flink:

<FLINK_HOME>/bin/start-cluster.sh
You should be able to navigate to the web UI at localhost:8081 to view the Flink dashboard and see that the cluster is up and running.

You can now start Flink SQL client to execute SQL scripts.

<FLINK_HOME>/bin/sql-client.sh
Step 5: Create a Catalog and a Table

Catalog
-- if you're trying out Paimon in a distributed environment,
-- the warehouse path should be set to a shared file system, such as HDFS or OSS
CREATE CATALOG my_catalog WITH (
'type'='paimon',
'warehouse'='file:/tmp/paimon'
);

USE CATALOG my_catalog;

-- create a word count table
CREATE TABLE word_count (
word STRING PRIMARY KEY NOT ENFORCED,
cnt BIGINT
);
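
As a quick sanity check (standard Flink SQL, not part of the original guide), you can confirm the table was created:

-- should list word_count
SHOW TABLES;

-- shows the schema, with the primary key marked as not enforced
DESCRIBE word_count;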
Step 6: Write Data

-- create a word data generator table
CREATE TEMPORARY TABLE word_table (
word STRING
) WITH (
'connector' = 'datagen',
'fields.word.length' = '1'
);
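
If you want to peek at the generated data before wiring the generator into the sink, here is a sketch using the datagen connector's number-of-rows option; word_table_preview is a hypothetical name, not part of the original guide:

-- bounded variant of the generator: stops after 10 rows
CREATE TEMPORARY TABLE word_table_preview (
word STRING
) WITH (
'connector' = 'datagen',
'fields.word.length' = '1',
'number-of-rows' = '10'
);

SELECT * FROM word_table_preview;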

-- paimon requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';

-- write streaming data to dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
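
The INSERT submits a long-running streaming job. On Flink 1.17 and later the SQL client can list it; on older versions, check the web UI at localhost:8081 instead:

-- list running jobs (Flink 1.17+)
SHOW JOBS;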
Step 7: OLAP Query

-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';

-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- olap query the table
SELECT * FROM word_count;
You can execute the query multiple times and observe the changes in the results.
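
A couple of illustrative batch queries against the same table (not from the original guide):

-- total number of rows written so far, summed over all words
SELECT SUM(cnt) AS total_rows FROM word_count;

-- the five most frequent words
SELECT word, cnt FROM word_count ORDER BY cnt DESC LIMIT 5;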

Step 8: Streaming Query

-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- track the changes of the table and calculate per-interval statistics
-- (cnt / 10000 is integer division, bucketing words by every 10,000 occurrences;
-- interval is a reserved keyword, so it must be escaped with backticks)
SELECT `interval`, COUNT(*) AS interval_cnt FROM
(SELECT cnt / 10000 AS `interval` FROM word_count) GROUP BY `interval`;
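
In streaming mode you can also read the table directly; for a primary-key table this streams the changes as counts are updated (a sketch of Paimon's streaming read, not part of the original guide):

-- stream the ongoing changes of the table
SELECT * FROM word_count;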
Step 9: Exit

Cancel the streaming job in the web UI at localhost:8081, then execute the following SQL script to exit the Flink SQL client.

-- uncomment the following line if you want to drop the dynamic table and clear the files
-- DROP TABLE word_count;

-- exit sql-client
EXIT;
Stop the Flink local cluster.

./bin/stop-cluster.sh
Use Flink Managed Memory #
Paimon tasks can create memory pools from executor memory that is managed by the Flink executor, i.e. the managed memory of the Flink task manager. Letting the executor manage the writer buffers of multiple tasks improves the stability and performance of sinks.

The following properties can be set if using Flink managed memory:

sink.use-managed-memory-allocator (default: false)
If true, the Flink sink will use managed memory for the merge tree; otherwise it will create an independent memory allocator, meaning each task allocates and manages its own memory pool (heap memory). If there are too many tasks in one executor, this may cause performance issues and even OOM.

sink.managed.writer-buffer-memory (default: 256M)
Weight of the writer buffer in managed memory. Flink computes the memory size for the writer from this weight; the actual memory used depends on the running environment. Currently, the size defined by this property equals the exact memory allocated to the write buffer at runtime.
Use In SQL: users can set the memory weight in SQL for Flink managed memory; the Flink sink operator will then obtain the memory pool size and create an allocator for the Paimon writer.

INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='256M') */
SELECT * FROM ....;
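
Since these are table options, they can in principle also be set once in the table definition instead of per query; a sketch, where the schema is illustrative and the option placement is an assumption based on the options table above:

CREATE TABLE paimon_table (
word STRING PRIMARY KEY NOT ENFORCED,
cnt BIGINT
) WITH (
'sink.use-managed-memory-allocator' = 'true', -- assumed valid as a table option
'sink.managed.writer-buffer-memory' = '512M'
);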
Setting dynamic options #
When interacting with a Paimon table, table options can be tuned without changing the options stored in the catalog. Paimon extracts job-level dynamic options and applies them in the current session. The dynamic option's key format is paimon.${catalogName}.${dbName}.${tableName}.${config_key}. Each of catalogName/dbName/tableName can be *, which matches any value for that part.

For example:

-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;
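
Since each part of the key accepts *, you can also match every catalog, database, and table at once (an illustrative extension of the examples above):

-- set scan.timestamp-millis=1697018249000 for every Paimon table in the session
SET 'paimon.*.*.*.scan.timestamp-millis' = '1697018249000';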

From: https://www.cnblogs.com/huft/p/18300228
