首页 > 其他分享 >ETL工具-nifi干货系列 第十六讲 nifi Process Group实战教程,一文轻松搞定

ETL工具-nifi干货系列 第十六讲 nifi Process Group实战教程,一文轻松搞定

时间:2024-04-24 22:46:36浏览次数:25  
标签:nifi 文件 Group Process 流程 端口 处理 FlowFile

1、目前nifi系列已经更新了10多篇教程了,跟着教程走的同学应该已经对nifi有了初步的解,但是我相信同学们应该有一个疑问:nifi设计好的数据流列表在哪里?如何同时运行多个数据流?如启停单个数据流?

带着这些疑问,今天的主角nifi Process Group正式登场,先给大家看个图。

2、Process Group(处理组)

1)当数据流变得复杂时,以更高级、更抽象的层次来思考数据流往往是有益的。也就是说通过处理组将一个或多个流程封装起来,屏蔽其复杂性。

2)NiFi允许将多个组件,如处理器(Processors),组合成一个处理组(Process Group)。处理组中可以是一些列的处理器,也可以是处理组。

3)NiFi用户界面使得数据流管理者(DFM)能够轻松将多个处理组连接成一个逻辑数据流。用户能够将多个处理组连成一个逻辑数据流。但是处理组之间进行连接需要用到Input Port和Out Port,后续会进行讲解。

4)同时也允许DFM进入处理组以查看和操作处理组内的组件。操作人员可以很方便的进入(双击处理组)和退出处理组(点击面包屑导航)。

 3、处理组配置,空白画布默认为根处理组,我把根处理组的名字修改为myNiflFlow。myNiflFlow处理组包含处理组【用户信息同步】和【交易订单同步】两个处理组。

当然这里的分组不一定合适,有可能是按照系统分的,如myNiflFlow处理组包含处理组【申请系统】、【审核系统】、【信贷系统等】,自己可以根据业务场景进行分组。

1)鼠标点击空白处,点击▶️运行按钮可以启动多个处理组流程。

2)鼠标点击空白处,点击⏸️暂停按钮可以停止多个处理组流程。

3)选中某个处理组,可以单独启动或者暂停该处理组,而不影响其他处理组。

4)启用,禁用等按钮操作也可以进行类似批量或者单个处理组操作。

 5)Process Group Name:自定义处理组名称

6)Process Group Parameter Context:它用于为流程中的组件提供参数。从这个下拉菜单中,用户可以选择应该绑定到此流程组的参数上下文,并可选择创建一个新的参数上下文以绑定到该流程组。后续参数和参数上下文设置单独讲解,这里先做了解。

7) Process Group FlowFile Concurrency:

FlowFile Concurrency 用于控制数据如何进入流程组。有三种可用选项:

1. 无限制(默认值)

2. 单个节点每次一个流文件

3. 单个节点每次一个批次

当FlowFile并发性设置为“无限制”时,流程组中的输入端口将尽可能快地摄取数据,前提是背压不会阻止它们这样做。

当FlowFile并发性配置为“单个节点每次一个流文件”时,输入端口将只允许一个流文件通过。一旦该流文件进入流程组,直到所有流文件离开流程组(通过从系统中移除/自动终止,或通过输出端口退出)之前,将不会再带入任何其他流文件。这通常会导致性能较慢,因为它减少了NiFi用于处理数据的并行性。然而,用户可能希望使用这种方法的原因有几个。一个常见的用例是每个传入的FlowFile包含对其他几个数据项的引用,比如目录中的文件列表。用户可能希望在允许任何其他数据进入流程组之前处理整个列表。

当FlowFile并发性配置为“单个节点每次一个批次”时,输入端口的行为方式与“单个节点每次一个流文件”模式类似,但是当摄取一个流文件时,输入端口将继续摄取所有数据,直到所有馈送输入端口的队列已被清空。在那时,它们将不会将任何更多的数据带入流程组,直到所有数据已完成处理并离开流程组。

8)Process Group Outbound Policy:出站策略控制了数据从流程组中流出的方式。有两个可用选项:

1. 当可用时流式传输(默认值)
2. 批量输出

当出站策略配置为“当可用时流式传输”时,到达输出端口的数据将立即从流程组中转移出去,假设没有施加任何背压。

当出站策略配置为“批量输出”时,输出端口将不会将数据从流程组中传输出去,直到所有数据都在输出端口排队(即,在所有数据完成处理之前,没有数据离开流程组)。无论数据是否全部排队到同一个输出端口,还是一些数据排队到输出端口A,而另一些数据排队到输出端口B,这些条件在流文件处理完成方面都被视为相同。

将出站策略设置为“批量输出”,并结合使用“单个节点每次一个流文件”的FlowFile并发性,允许用户轻松摄取单个流文件(该流文件本身可能代表一批数据),然后等待直到该流文件的所有处理完成后再继续数据流的下一步(即流程组之外的下一个组件)。此外,在使用此模式时,从流程组传输出的每个流文件都将被赋予一系列属性,属性名称为“batch.output.<Port Name>”,每个流程组中的输出端口都有一个。该值将等于被路由到该输出端口的流文件数量。例如,考虑一个情况,其中一个单个流文件被拆分为5个流文件:两个流文件送到输出端口A,一个送到输出端口B,两个送到输出端口C,而没有流文件送到输出端口D。在这种情况下,每个流文件将具有属性 batch.output.A = 2, batch.output.B = 1, batch.output.C = 2, batch.output.D = 0。

当与“无限制”的FlowFile并发性结合使用时,“批量输出”出站策略不提供任何好处。因此,如果FlowFile并发性设置为“无限制”,则会忽略出站策略。

 9)Process Group Comments:自定义注解  10)Default FlowFile Expiration 、Default FlowFile Expiration, Default Back Pressure Object Threshold, 最后三个元素是默认流文件过期、默认背压对象阈值和默认背压数据大小阈值。这些设置配置了创建新连接时的默认值。每个连接表示一个队列,每个队列都有流文件过期、背压对象计数和背压数据大小的设置。在此指定的设置将影响流程组中创建的所有新连接的默认值;它不会影响现有连接。在配置的流程组内创建的子流程组将继承默认设置。再次强调,现有的流程组不会受到影响。如果没有使用这些选项进行覆盖,根流程组将从nifi.properties获取其默认背压设置,并具有默认的流文件过期时间为“0秒”(即不过期)。  11)Log File Suffix:为处理组指定日志文件后缀,不填写,默认日志文件为nifi-app.log。

 

 

标签:nifi,文件,Group,Process,流程,端口,处理,FlowFile
From: https://www.cnblogs.com/zjBoy/p/18153931

相关文章

  • 10.prometheus监控--监控进程process
    一、进程监控如果想要对主机的进程进行监控,例如chronyd,sshd等服务进程以及自定义脚本程序运行状态监控。我们使用nodeexporter就不能实现需求了,此时就需要使用processexporter来做进程状态的监控。项目地址:https://github.com/ncabatoff/process-exporter二、process-export......
  • 模块(pickle、subprocess、正则re)
    【一】序列化模块【1】json模块将python对象序列化成json字符串将json字符串反序列化成python对象importjsonjson.dump()#写文件json.dumps()#转换字符串json.load()#读数据json.loads()#将字符串转回对象【2】pickle模块用于python特有的类型和python的......
  • 序列化模块,subprocess模块,re模块,常用正则
    Ⅰ序列化模块【1】json模块'''json模块是一个序列化模块,主要用于跨语言传输'''1.由下图可知json格式数据是不同编程语言之间数据交互的媒介2.json格式数据的具体特征 结论一中:数据基于网络传输肯定是二进制格式在python中bytes类型的数据可以直接看成是二进制格式......
  • list all possible combination of group separator and decimal separator by iterat
    一共有7种子组合01[,2C][.2E]en-US02[C2A0][,2C]fr-FR03[.2E][,2C]da-DK04[’E28099][.2E]de-CH05[C2A0][.2E]tn-BW06[,2C][/2F]fa-IR07[’E28099][,2C]wae-CHvarlist=CultureInfo.GetCultures(CultureTypes.AllCultures);Dictionary<string,List<str......
  • 【ArcGIS Pro SDK】ArcGIS Pro SDK Geoprocessor 仿 ArcGIS Engine Geoprocessor、IGP
    Baci.Net.ToolKit.ArcGISProGeoprocessor介绍在ArcGISProSDK(2.8)中仿ArcGISEngine中的Geoprocessor、IGPProcess。ArcGISProSDKGeoprocessor仿ArcGISEngine中的Geoprocessor实现2.8版本下的所有工具的生成。方法、参数的注释,翻译(机翻)。各个工具支持的有效的环......
  • ETL工具-nifi干货系列 第十四讲 nifi处理器QueryDatabaseTableRecord查询表数据实战教
    1、处理器QueryDatabaseTableRecord和处理器QueryDatabaseTable比较相似,该组件生成一个SQL查询,或者使用用户提供的语句,并执行它以获取所有在指定的最大值列中值大于先前所见最大值的行。QueryDatabaseTable的查询结果将被转换为Avro,而QueryDatabaseTableRecord的查询结果则被......
  • ETL工具-nifi干货系列 第十三讲 nifi处理器QueryDatabaseTable查询表数据实战教程
    1、处理器QueryDatabaseTable,该组件生成一个SQL查询,或者使用用户提供的语句,并执行它以获取所有在指定的最大值列中值大于先前所见最大值的行。查询结果将被转换为Avro格式,如下图所示: 本示例通过QueryDatabaseTable处理器连接数据库查询表数据,然后连接到LogMessage打印日志......
  • uniapp checkbox_group实现全选和反选功能
    <template> <view> <label> <checkbox:value="value":checked="allpicks"@tap="allpick"/><text>全选</text> </label> <checkbox-groupname="allpick"> <label......
  • 30 天精通 RxJS (20):Observable Operators - window, windowToggle, groupBy
    前几天我们讲完了能把HigherOrderObservable转成一般的Observable的operators,今天我们要讲能够把一般的Observable转成HigherOrderObservable的operators。其实前端不太有机会用到这类型的Operators,都是在比较特殊的需求下才会看到,但还是会有遇到的时候。Op......
  • process scheduling (进程调度)& practice 1 process operation
    进程切换并发进程的切换并发进程中,一个进程在执行过程中可能被另一个进程替换占有CPU,这个过程称为“进程切换”是什么触发了进程切换?进程切换时要做什么?操作系统到底做了什么操作2中断技术中断是指程序执行过程中当发生某一个事件时,中止cpu上现行的程序的运行in......