首页 > 其他分享 >大数据NiFi(十九):实时Json日志数据导入到Hive

大数据NiFi(十九):实时Json日志数据导入到Hive

时间:2023-06-11 23:04:54浏览次数:46  
标签:导入到 NiFi 文件 配置 flowfile Json 处理器 root EvaluateJsonPath


大数据NiFi(十九):实时Json日志数据导入到Hive_json

文章目录

实时Json日志数据导入到Hive

一、配置“TailFile”处理器

1、创建“TailFile”处理器

2、配置“PROPERTIES”

二、配置“EvaluateJsonPath”处理器

1、创建“EvaluateJsonPath”处理器

2、配置“PROPERTIES”

3、连接“TailFile”处理器和“EvaluateJsonPath”处理器

三、配置“ReplaceText”处理器

1、新建“ReplaceText”处理器,配置“PROPERTIES”处理器

2、连接“EvaluateJsonPath”处理器与“ReplaceText”处理器,同时设置

四、配置“PutHDFS”处理器

1、创建“PutHDFS”处理器并配置

2、连接“ReplaceText”处理器与“PutHDFS”处理器并配置

五、运行测试

1、在Hive中创建外表personinfo在Hive中创建外表personinfo

2、启动NiFi处理数据流程,处理数据

3、查看结果

六、配置“ConvertRecord”处理器

1、创建“ConvertRecord”处理器

2、配置“PROPERTIES”中“Record Reader”配置

3、配置“PROPERTIES”中“Record Writer”:

4、连接“TailFile”处理器和“ConvertRecord”处理器

5、连接“ConvertRecord”处理器与“PutHDFS”处理器

七、运行测试


实时Json日志数据导入到Hive

案例:使用NiFi将某个目录下产生的json类型的日志文件导入到Hive。这里首先将数据通过NiFi将Json数据解析属性,然后手动设置数据格式,将数据导入到HDFS中,Hive建立外表映射此路径实现外部数据导入到Hive中。

使用到的处理器有:“TailFile”、“EvaluateJsonPath”、“ReplaceText”、“PutHDFS”四个处理器。

一、配置“TailFile”处理器

“TailFile”处理器作用是"Tails"一个文件或文件列表,在文件写入文件时从文件中摄取数据。监控的文件为文本格式,当写入新行时会接收数据。如果要Tail的文件是定期"rolled over(滚动)"的(日志文件通常是这样),则可以使用可选的"Rolling Filename Pattern"从已滚动的文件中检索数据,NiFi未运行时产生的滚动文件在NiFi重启后仍会监控到。建议将运行计划设置为几秒,不使用默认0秒运行,否则此处理器将消耗大量资源。此处理器不支持监控压缩的文件。

关于“TailFile”处理器的“Properties”配置的说明如下:

配置项

默认值

允许值

描述

Tailing mode

(监控模式)

Single file

▪Single file

▪Multiple files

single file只会tail一个文件, multiple file将tail一个文件列表。在multiple file模式下,需要配置"Base directory"。

File(s) to Tail

(监控文件)



在Single file模式下,配置文件的全路径名称。如果使用multiple file模式,这里配置正则表达式,在Base directory中匹配查找要tail的文件,如果"Recursive lookup"设置为true,则正则表达式将用于匹配从"Base directory"开始的路径递归查找。

Rolling Filename Pattern

(滚动文件名匹配)



配置滚动文件匹配名称,支持通配符*和?,支持${filename}属性指定模式。如果NiFi重启,已经滚动的文件也能从停止的位置监控到。

Base directory

(基本目录)



用于查找需要tail的文件的基本目录。使用multiple file模式时,必需设置。

Initial Start Position

(初始监控位置)

Beginning of File


当处理器首次开始tail数据时,此属性指定处理器应在何处开始读取数据。当处理器从文件中提取数据后,处理器将从上一次接收数据的最位置继续tail数据。

有如下可选项:

▪Beginning of File

▪Beginning of Time

▪Beginning of File

▪Current Time

State Location

(数据位置)

Local

▪Local

▪Remote

指定状态位于本地或群集的位置,以便可以适当地存储状态,保证数据不被重复tail。

Recursive lookup(递归查找)

false

▪Local

▪Remote

使用"multiple file"模式时,此属性定义是否必须在基目录中递归列出文件。

Lookup frequency(查询频率)

10 minutes


仅用于"multiple file"模式。它指定处理器在再次列出需要tail的文件之前将等待的最短时间。

Maximum age

(最大时间)

24 hours


仅用于"multiple file"模式。如果自文件最后一次修改以来经过的时间大于此配置时间段,则不会tail文件。

配置步骤如下:

1、创建“TailFile”处理器

大数据NiFi(十九):实时Json日志数据导入到Hive_NiFi_02

2、配置“PROPERTIES”

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_03

注意:以上需要在NiFi集群中的每个节点上创建“/root/test/jsonfile”文件,“jsonfile”是文件,而非目录。

二、配置“EvaluateJsonPath”处理器

“EvaluateJsonPath”处理器根据FlowFile的内容计算一个或多个JsonPath表达式。根据处理器的配置,这些表达式的结果被赋值给FlowFile属性,或者被写入FlowFile本身的内容。

通过添加用户自定义的属性来输入Jsonpath,添加的属性的名称映射到输出流中的属性名称,属性的值必须是有效的JsonPath表达式(例如:$.name)。"auto-detect"的返回类型将根据配置的目标进行确定。当"Destination"被设置为"flowfile-attribute"时,将使用"scalar"(标量)的返回类型。当"Destination"被设置为"flowfile-content"时,将使用"JSON"返回类型。

如果JsonPath计算为JSON数组或JSON对象,并且返回类型设置为"scalar",则流文件将不进行修改,并将路由到失败。如果所提供的JsonPath计算为指定的值,JSON的返回类型可以返回"scalar"。如果目标是"flowfile-content",并且JsonPath没有计算到对应的值,那么流文件将被路由到"unmatched",无需修改其内容。如果目标是"flowfile-attribute",而表达式不匹配任何内容,那么将使用空字符串作为属性的值,并且FlowFile将始终被路由到"matched"。

关于“EvaluateJsonPath”处理器的“Properties”配置的说明如下:

配置项

默认值

允许值

描述

Destination

(目标)

flowfile-content

▪flowfile-content

▪flowfile-attribute

指示是否将JsonPath计算结果写入FlowFile内容或FlowFile属性;如果使用flowfile-attribute,则必须指定属性名称。如果设置为flowfile-content,则只能指定一个JsonPath,并且忽略属性名。

Return Type

(返回类型)

auto-detect

▪auto-detect

▪json

▪scalar

指示JSON路径表达式的期望返回类型。选择"auto-detect","flowfile-content"的返回类型自动设置为"json","flowfile-attribute"的返回类型自动设置为"scalar"标量。

Path Not Found Behavior

(未找到路径)

ignore

▪warn

▪ignore

指示在将Destination设置为"flowfile-attribute"时如何处理丢失的JSON路径表达式。当没有找到JSON路径表达式时,选择"warn"将生成一个警告。

Null Value Representation

(Null值表示)

empty string

▪empty string

▪the string 'null'

指示产生空值的JSON路径表达式的所需表示形式。

示例说明:

  • 提取流文件json内容,作为输出流的属性。(注意:当输出选择flowfile-attribute时,即使jsonpath匹配不到值,流文件也会路由到matched)

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_04

输入json如下:

大数据NiFi(十九):实时Json日志数据导入到Hive_json_05

输出结果如下:

大数据NiFi(十九):实时Json日志数据导入到Hive_hadoop_06

  • 提取流文件json内容,作为输出流的内容。(注意:当选择flowfile-content时,用户只能自定义添加一个属性;如果jsonPath匹配不到,会路由到unmatched)

大数据NiFi(十九):实时Json日志数据导入到Hive_hadoop_07

输出流内容:

大数据NiFi(十九):实时Json日志数据导入到Hive_json_08

介绍完“EvaluateJsonPath”如何使用,下面来配置,配置步骤如下:

1、创建“EvaluateJsonPath”处理器

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_09

2、配置“PROPERTIES”

大数据NiFi(十九):实时Json日志数据导入到Hive_NiFi_10

3、连接“TailFile”处理器和“EvaluateJsonPath”处理器

大数据NiFi(十九):实时Json日志数据导入到Hive_NiFi_11

三、配置“ReplaceText”处理器

“ReplaceText”处理器会替换正则表达式匹配到的FlowFile中的内容,生成新的FlowFile内容。这里我们使用“ReplaceText”处理器将上个处理器“EvaluateJsonPath”处理后的每个FlowFile内容替换成自定义的内容,这里自定义内容都是从FlowFile的属性中获取的值,按照“\t”制表符隔开,方便后期存储到HDFS中映射Hive表。

配置步骤如下:

1、新建“ReplaceText”处理器,配置“PROPERTIES”处理器

大数据NiFi(十九):实时Json日志数据导入到Hive_大数据_12

2、连接“EvaluateJsonPath”处理器与“ReplaceText”处理器,同时设置

大数据NiFi(十九):实时Json日志数据导入到Hive_hadoop_13

在“EvaluateJsonPath”中设置“failure”与“unmatched”的传递关系为自动终止:

大数据NiFi(十九):实时Json日志数据导入到Hive_大数据_14

四、配置“PutHDFS”处理器

这里创建“PutHDFS”处理器将上游处理的数据写入HDFS目录中。配置步骤如下:

1、创建“PutHDFS”处理器并配置

大数据NiFi(十九):实时Json日志数据导入到Hive_json_15

2、连接“ReplaceText”处理器与“PutHDFS”处理器并配置

大数据NiFi(十九):实时Json日志数据导入到Hive_json_16

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_17

配置“ReplaceText”处理器“failure”的FlowFile传递关系为自动终止:

大数据NiFi(十九):实时Json日志数据导入到Hive_hadoop_18

配置“PutHDFS”处理器“failure”和“success”的FlowFile传递关系为自动终止:

大数据NiFi(十九):实时Json日志数据导入到Hive_NiFi_19

五、运行测试

1、在Hive中创建外表personinfo在Hive中创建外表personinfo

CREATE TABLE personinfo(
id int,
name string,
age int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/mycluster/personinfo'

2、启动NiFi处理数据流程,处理数据

向任意NiFi节点/root/test/jsonfile文件中写入以下数据写入以下数据:

echo "{\"id\":1,\"name\":\"zhangsan\",\"age\":18}" >> /root/test/jsonfile
echo "{\"id\":2,\"name\":\"lisi\",\"age\":19}"	>> /root/test/jsonfile
echo "{\"id\":3,\"name\":\"wangwu\",\"age\":20}"	>> /root/test/jsonfile
echo "{\"id\":4,\"name\":\"maliu\",\"age\":21}"	>> /root/test/jsonfile
echo "{\"id\":5,\"name\":\"tianqi\",\"age\":22}"	>> /root/test/jsonfile

3、查看结果

NiFi页面:

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_20

hive中结果:

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_21

问题:当我们一次性向某个NiFi节点的“/root/test/jsonfile”文件中写入数据时,这时“EvaluateJsonPath”一个FlowFile中会有多条json数据,当获取json属性时,只会获取第一条json对应的属性。当数据流向下游“ReplaceText”处理器时,由于设置每行替换成指定格式的行,这时会出现将本批次所有行数据都替换成了第一行的json格式数据。如下图:

大数据NiFi(十九):实时Json日志数据导入到Hive_hadoop_22

当一次性向tail的文件输入多条数据,我们不希望全部json行内容替换成第一行json内容,那么可以将“TailFile”处理器处理的数据直接传递给“ConvertRecord”处理器,将数据由json格式转换成自定义文本格式数据,再传递到“PutHDFS”处理器即可,所以解决以上问题,我们这里复用之前的“TailFile”和“PutHDFS”处理器即可,下面只需要配置“ConvertRecord”处理器即可。

六、配置“ConvertRecord”处理器

“ConvertRecord”根据配置的“记录读取器”和“记录写出控制器”来将记录从一种数据格式转换为另一种数据格式。

关于“ConvertRecord”处理器的“Properties”配置的说明如下:

配置项

默认值

允许值

描述

Record Reader

(记录读取器)



指定读取数据的Controller Service。

Record Writer

(记录写出)



指定写出数据的Controller Service。

Include Zero Record FlowFiles(没有记录的FlowFiles)

true

▪true

▪false

在转换传入的流文件时,如果转换没有产生数据,则此属性指定是否将流文件发送到相应的关系。

配置步骤如下:

1、创建“ConvertRecord”处理器

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_23

2、配置“PROPERTIES”中“Record Reader”配置

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_24

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_25

点击以上之后,进入配置:

大数据NiFi(十九):实时Json日志数据导入到Hive_NiFi_26

大数据NiFi(十九):实时Json日志数据导入到Hive_大数据_27

配置好以上之后,需要启动:

大数据NiFi(十九):实时Json日志数据导入到Hive_NiFi_28

3、配置“PROPERTIES”中“Record Writer”:

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_29

点击以上之后进入配置:

大数据NiFi(十九):实时Json日志数据导入到Hive_json_30

大数据NiFi(十九):实时Json日志数据导入到Hive_hadoop_31

大数据NiFi(十九):实时Json日志数据导入到Hive_NiFi_32

4、连接“TailFile”处理器和“ConvertRecord”处理器

大数据NiFi(十九):实时Json日志数据导入到Hive_大数据_33

5、连接“ConvertRecord”处理器与“PutHDFS”处理器

大数据NiFi(十九):实时Json日志数据导入到Hive_大数据_34

同时设置“ConvertRecord”的处理“failure”关系为自动终止:

大数据NiFi(十九):实时Json日志数据导入到Hive_NiFi_35

七、运行测试

删除HDFS中原有的“/personinfo”路径,启动NiFi处理数据流程,处理数据:

大数据NiFi(十九):实时Json日志数据导入到Hive_json_36

向任意NiFi集群节点“/root/test/jsonfile”中一次性写入以下数据:

echo "{\"id\":1,\"name\":\"zhangsan\",\"age\":18}" >> /root/test/jsonfile
echo "{\"id\":2,\"name\":\"lisi\",\"age\":19}"	>> /root/test/jsonfile
echo "{\"id\":3,\"name\":\"wangwu\",\"age\":20}"	>> /root/test/jsonfile
echo "{\"id\":4,\"name\":\"maliu\",\"age\":21}"	>> /root/test/jsonfile
echo "{\"id\":5,\"name\":\"tianqi\",\"age\":22}"	>> /root/test/jsonfile

查询Hive表personinfo数据:

大数据NiFi(十九):实时Json日志数据导入到Hive_hive_37


相关文章

  • Java常用的几种JSON解析工具
    一、Gson:Google开源的JSON解析库1.添加依赖<!--gson--><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId></dependency><!--lombok--><dependency><groupId>org.proje......
  • 使用C#把Json转换成DataTable
    要将JSON转换为DataTable,你可以使用Newtonsoft.Json库(也称为Json.NET),它是一个流行的用于处理JSON数据的库。以下是使用C#和Newtonsoft.Json将JSON首先,确保你已经安装了Newtonsoft.Json库。你可以在VisualStudio中通过NuGet然后,使用以下代码将JSON转换为......
  • jmeter005:察看结果树之以(txt、css、html、json)格式查看结果
     txt:这里就不用说了,已txt文件展示,形式比较单一,但也是用的比较多的 css:css取样测试其实与txt也差不多,区别就是比txt多了“选择器”筛选 html:html有三种模式,(HTML以基本的界面形式展示数据)、(HTMLSourceFormatted会下载图像来展示)、(HTMLSourceformatted:如果选择了HTML......
  • Caused by: java.lang.ClassNotFoundException: com.alibaba.fastjson2.util.Wrap
    1.情景展示使用fastjson2,运行时报错:Causedby:java.lang.ClassNotFoundException:com.alibaba.fastjson2.util.Wrap2.具体分析出现这个问题,是因为pom.xml当中引用的有关fastjson的jar包冲突造成的。只要我们把冲突的jar包排除掉就可以了。3.解决方案在idea当中,使用插件......
  • .NET的8种JSON序列化反序列化工具供你选择
    在.NET开发中,.NET的JSON序列化反序列化工具除了Newtonsoft.Json和System.Text.Json其实还有很多优秀的开源的序列化和反序列化工具,这些工具有的性能更加优秀,更加轻量等特征。本文将汇总介绍这些.NET中常用的JSON序列化和反序列化工具,供大家选择参考使用。1、Newtonsoft.Json(Jso......
  • Qt+QtWebApp开发笔记(五):http服务器html中使用json触发ajax与后台交互实现数据更新传递
    前言  前面完成了页面的跳转、登录,很多时候不刷新页面就想刷新局部数据,此时ajax就是此种技术,且是异步的。  本篇实现网页内部使用js调用ajax实现异步交互数据。  在js中使用ajax是通过XMLHttpRequest来实现的。下载地址  链接:https://pan.baidu.com/s/1tJMTPhIIyVE40......
  • jmeter-json断言
    1.JSON断言所在位置:断言->JSON断言2.JSON断言中的字段解析AssertJSONPathexists:json表达式,判断所字段是否存在,存在则为True,否则为FalseAdditionallyassertvalue:附加断言字段对应的值,匹配则为True,否则为FalseMatchasregularexpression:断言表达式,判断字段是否存在,......
  • JSON是什么?JSON的简单介绍及使用
    一、JSON介绍JSON(JavaScriptObjectNotation)是一种数据结构,当我们需要在不同的应用程序之间传递数据时,我们需要一种通用的格式来表示数据。相比xml交换格式来说,因为解析xml比较的复杂且需要编写大段的代码,而JSON数据更小,也更容易解析。1、JSON基本语法JSON数据与JAVA中......
  • JS 将form表单数据快速转化为object对象(json对象)
    JS将form表单数据快速转化为object对象(json对象)jaymou于2020-03-0311:11:05发布3534收藏3分类专栏:前端文章标签:javascriptjquery版权前端专栏收录该内容5篇文章0订阅订阅专栏直接上代码/***将Form的数据转化成Javascript的Json对象*/$.fn.seri......
  • Java Kafka简单地将Map对象序列化为json
    最近用到kafka,想简单地把Map对象序列化为json发送到主题,直接用string序列化,生成的结果不是json,虽然格式很像,key都没有引号,可能是直接调用的toString方法。但是网上搜了一圈,都是spring组合或者其他不太简单的方案。在哔站看了一段视频受到启发,就实现了一个自定义json序列化类,......