首页 > 其他分享 >pyFlink 入门总结

pyFlink 入门总结

时间:2024-05-13 10:21:42浏览次数:25  
标签:总结 city 入门 age tbl sql col pyFlink name

一 整体流程

1. 初始化pyFlink执行环境 2. 加载数据集 3. 执行数据分析 4. 导出分析结果   二 初始化执行环境

2.1 初始化

参考代码如下 from pyflink.table import EnvironmentSettings, StreamTableEnvironment es = EnvironmentSettings.new_instance().in_batch_mode().build() tv = StreamTableEnvironment.create(environment_settings=es)

2.2 其它

待补充其它初始化方法,如流处理等

三 加载数据集

3.1 基于变量

参考代码如下 data = [['T1', 34, 'XY'],['T2', 34, 'NY'],['T3', 33, 'XX'],['T4', 33, 'JZ'],['T5', 33, 'SZ'],['T6', 33, 'PDS'],['T7', 32, 'XC'],['T8', 32, 'NY']] tbl = tv.from_elements(data, ['name','age','city'],['STRING','INT','STRING']) tv.create_temporary_view('itable', tbl)  # 注册为flinksql能访问的对向

3.2 基于pandas.DataFrame

dfa = pd.DataFrame(data, columns='name age city'.split()) tbl = tv.from_pandas(dfa) 

3.3 基于csv

csv_path = 'iexample.csv' csv_schema = 'name string, age int, city string' csv_sql = F"create table iTable({csv_schema}) with ('connector' = 'filesystem', 'path' = '{csv_path}', 'format' = 'csv'))" tv.execute_sql(csv_sql) tbl = tv.from_path('iTable') 经验1 包含表头的csv会报错 经验2 csv_sql 后面的with 中的引号必须是 单引号 双引号会报错 经验3 不要创建重名的表,会报错 此条有待进一步确认  

3.4 连接postgresql

from pyflink.table import EnvironmentSettings, StreamTableEnvironment es = EnvironmentSettings.new_instance().in_batch_mode().build() tv = StreamTableEnvironment.create(environment_settings=es)   pg_schema = 'name STRING, age INT, city string' dsn = F'jdbc:postgresql://{host}:{port}/{database}' pg_sql = F"create table pg_table ({pg_schema})with ('connector'='jdbc','url'='{dsn}','table-name'='{table_name}','driver'='org.postgresql.Driver','username'='{user}','password'='{pwd}')" print(pg_sql) tv.execute_sql(pg_sql) tbl = tv.from_path('pg_table') tbl.limit(5).execute().print() 经验4 需要下载flink-connector-jdbc-*.jar文件 和 postgresql-*.jar文件 对应目录是pyflink安装目录的 lib文件夹下 经验5 连接依赖文件与数据库版本也有关系

四 数据处理

4.1 简单处理

1) select

from pyflink.table.expressions import col, call tt = tbl.select(col("city")) tt.limit(3).execute().print()

2)group_by

tbl.group_by(col('city')).select(col('city'),call("count", col('city')).alias('cnt')).execute().print() tv.register_table('itable', tbl) tv.sql_query('select city, count(*)cnt from itable group by city').execute().print()

3)order_by

tbl.order_by(col('age').desc).execute().print()

4)buildin function

tbl.select(call('avg',col('age')).alias('age_avg')).execute().print() tbl.select(call('sum',col('age')).alias('age_sum')).execute().print()

5)normalized



@udf(result_type='Row<_name string, _age float>', func_type='pandas')
def inormal(data: pd.DataFrame) -> pd.DataFrame:
    data['_age'] = (data['age']-data['age'].mean())/data['age'].std()
    return data[['name', '_age']]
tbl.map(inormal).execute().print()

6)map & udf



@udf(result_type='Row<_name string, province string>', func_type='pandas')
def itown(data: pd.DataFrame) -> pd.DataFrame:
    data['province'] = data.city.apply(lambda x: dct.get(x))
    return data[['name', 'province']]
tbl.map(itown).execute().print()

4.2 其它处理

待补充

五 输出

5.1 print

tbl.map(itown).execute().print()

5.2 CSV

# 定义输出 CSV 文件的 schema
sink_schema = "name STRING, age int, _age float, city string, province string"
# 定义输出 CSV 文件的目录
sink_path = "tmpfile"
# 注册输出表
tv.execute_sql(f"""
    CREATE TABLE sink_table (
        {sink_schema}
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{sink_path}',
        'format' = 'csv'
    )
""")

_age = tbl.map(inormal)
_town = tbl.map(itown)
t1 = tbl.join(_age).where(col('name')==col('_name')).select(col('name'), col('age'), col('_age'), col('city'))
t2 = t1.join(_town).where(col('name')==col('_name')).select(col('name'), col('age'), col('_age'), col('city'), col('province'))
tv.create_temporary_view('jtable', t2)  # old api tv.register_table

# 执行查询并将结果写入输出表
sql = 'INSERT INTO sink_table(name, age, _age, city, province) SELECT name, age, _age, city, province FROM jtable'
tv.execute_sql(sql).wait()
 

经验6 csv输出路径只能指定目录 不能指定名称

经验7 数据库输出需要提前创建供写入的表

5.3 POSTGRESQL



pg_schema = "name STRING, age int, _age float, city string, province string"
dsn = F'jdbc:postgresql://{host}:{port}/{database}'
pg_sql = F"create table pg_table ({pg_schema})with ('connector'='jdbc','url'='{dsn}','table-name'='{table_name}','driver'='org.postgresql.Driver','username'='{user}','password'='{pwd}')"
tv.execute_sql(pg_sql)  # create

sql = 'INSERT INTO pg_table(name, age, _age, city, province) SELECT name, age, _age, city, province FROM jtable'
tv.execute_sql(sql).wait() 




标签:总结,city,入门,age,tbl,sql,col,pyFlink,name
From: https://www.cnblogs.com/ddzhen/p/18188654

相关文章

  • kettle从入门到精通 第五十九课 ETL之kettle 邮件发送多个附件,使用正则轻松解决
    问题场景:一个朋友说他用kettle将生成好的多个文件(a.xls和b.xls,文件在data目录下)发送给客户,但是data目录下还有其他的文件,他如果指定data目录发送会把data目录下面的所有文件都作为附件进行发送,显然不符合要求,所以他当时的临时解决方法是创建个临时目录,里面只放a.xls和b.xls两个......
  • CTF总结
    CTFmisccryptorepyjail总结MISC常见文件16进制头尾一些CTF题目的附件会去掉文件头,需要补全文件头,在一些文件里也可能隐藏多个文件,这就需要熟悉文件尾/文件头以方便提取或做判断,题目附件在下载的时候可能会没有文件后缀,无法判断是那种文件,由于各文件的16进制都有规定格式,在......
  • 关于为随机函数PRF的入门认知
    伪随机函数(PseudoRandomFunction,即PRF)在密码学中是一个重要的概念,是一个基础的密码学原语。基本概念PRF是一个确定性的函数。我们记定义在$(K,X,Y)$上的函数$F$,其中$K$是密钥空间,$X$和$Y$分别是输入和输出空间。对于PRF,给定确定的密钥k,函数$F$应该看上去是一个定义在$X\rig......
  • rdp利用技巧总结
    近期在项目中管理员在rdp挂载之后搞掉了管理员,想着有时间就整理下针对rdp的利用方法。针对挂盘的利用方法复制文件这个不多说,可以根据的不同的挂盘来决定是拖文件还是放启动项。有一些自动文件监控和拷贝的应用,如:https://github.com/cnucky/DarkGuardianDarkGuardian是一款用于监......
  • 软考知识点总结
    第一章计算机组成,操作系统CPU:运算器,控制器PU(中央处理器,CentralProcessingUnit)是计算机系统中负责执行程序指令和数据运算的核心部件。一个典型的CPU通常包括以下几个主要组成部分:   控制单元:控制单元负责从内存中获取待执行的指令并解码、译码。它还负责生成相应的操作......
  • Vue.js的Vue@Cli入门指南
    Vue.js是一款流行的JavaScript框架,它使得构建交互式的Web界面变得简单和快捷。Vue@Cli是Vue.js官方提供的脚手架工具,它能够帮助我们快速搭建Vue.js项目,并提供了丰富的功能和插件。准备工作在开始之前,确保您已经安装了node.js和npm。然后,您可以通过以下命令安装Vue@Cli:npminsta......
  • ThreadLocal入门笔记
    ThreadLocal入门笔记最近学习小傅哥的面经手册,学习到ThreadLocal,这里做个笔记加深印象,也方便日后复习。ThreadLocal是除了加锁这种同步方式之外的一种规避多线程访问出现线程不安全的方法,它的核心思想是:共享变量在每个线程都有一个副本,每个线程操作的都是自己的副本,对另外的线程......
  • 【vue3入门】-【22】 插槽
    插槽-基本使用方式我们已经了解了组件能够接收任意类型的JavaScript值作为props,但是组件要如何接收模版内容呢?在某些场景中,我们可能想要为子组件传递一些模版片段,让子组件在他们的组件中渲染这些片段。最基本的使用方式app.vue<template><!--单标签就是仅应用当前组件-->......
  • Windows 下 PyTorch 入门深度学习环境安装(CPU版本)
    Windows下PyTorch入门深度学习环境安装(CPU版本)一、安装Anaconda二、虚拟环境配置2.1基础命令列出虚拟环境condaenvlist创建虚拟环境https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/maincondacreate-n虚拟环境名字python=版本-c镜像地址激活环境conda......
  • PyTorch深度学习快速入门教程
    PyTorch深度学习快速入门教程一、基础知识1.1Python学习中的两大法宝1.2pycharm以及jupyter使用及对比将环境写入Notebook的kernel中:python-mipykernelinstall--user--name环境名称--display-name"Python(环境名称)"打开Jupyternotebook,新建Python文件,这时候......