首页 > 其他分享 >Query Guide-Stream

Query Guide-Stream

时间:2023-09-01 17:35:08浏览次数:47  
标签:接收器 转换 映射 Stream Siddhi JSON 事件 Query Guide

事件流

事件流定义包含流名称和一组属性,这些属性具有特定类型和流范围内唯一可识别的名称。

目的

       接收事件的输入,接收查询处理结果的输出。

语法

define stream <stream

                             <attribute name> <attribute type>, ... );

以下参数用于配置流定义

参数

描述

<stream name>

流名称

<attribute name>

参数名称

<attribute type>

参数类型(STRING, INT, LONG, DOUBLE, FLOAT, BOOL or OBJECT)

例子

define stream TempStream (deviceID long, roomNo int, temp double);

来源Source

源通过多种传输和各种数据格式接收事件,并将它们引导到流中进行处理。

源配置允许定义映射,以便将每个传入事件从其本地数据格式转换为Siddhi事件。当没有提供对此类映射的自定义时,Siddhi假设到达的事件遵循基于流定义和配置的消息映射类型的预定义格式。

目的

提供了一种使用外部系统事件并将其转换为流处理的方式。

语法

Query Guide-Stream_HTTP

@source annotation的type参数定义了接收事件的源类型。

@source annotation的其他参数取决于所选的源类型,其中一些参数可以是可选的。

以下是Siddhi支持的源类型列表:

源类型

描述

In-memory

允许SiddhiApp使用在同一JVM上运行的其他SiddhiApp中的事件。

HTTP

HTTP服务

Kafka

订阅Kafka主题以消费事件

TCP

TCP服务

Email

通过POP3和IMAP协议使用电子邮件

JMS

订阅JMS主题或队列以使用事件

File

读取文件中的事件

CDC

数据库CDC日志数据

Prometheus

监控系统数据

In-memory是Siddhi中唯一内置的源,所有其他源类型都作为扩展实现

源映射

每个@source配置都可以有一个由@map注释表示的映射,该注释定义了如何将传入事件格式转换为Siddhi事件。

@map的type参数定义了在转换传入事件时要使用的映射类型。@map annotation的其他参数取决于所选的映射器,其中一些参数可以是可选的。

映射Attributes

@attributes是与@map一起使用的可选注释,用于定义自定义映射

支持的源映射类型

源映射类型

描述

PassThrough

省略Siddhi事件的数据转换

JSON

将JSON消息转换为Siddhi事件

XML

将XML消息转换为Siddhi事件

TEXT

将TEXT中消息转换为Siddhi事件

Avro

将Avro事件转换为Siddhi事件

Binary

将Siddhi特定的二进制事件转换为Siddhi事件

Key Value

将键值HashMaps转换为Siddhi事件

CSV

将类似CSV分隔符的事件转换为Siddhi事件

提示:

当没有提供@map注释时,使用@map(类型=“passThrough”)作为默认值,它将消耗的Siddhi事件直接传递到流,而不进行任何数据转换。

PassThrough是Siddhi中唯一内置的源映射器,所有其他源映射器都是作为扩展实现的。

例子1

通过公开HTTP服务接收JSON消息,并将它们引导到InputStream流中进行处理。在这里,HTTP服务将通过基本身份验证进行保护,在端口8080和context/foo上的所有网络接口上接收事件。该服务要求JSON消息采用JSON映射器支持的默认数据格式,如下所示。

Query Guide-Stream_JSON_02

HTTP源和JSON源映射器的配置实现了上述功能,如下所示。

Query Guide-Stream_HTTP_03

例子2

通过公开HTTP服务接收JSON消息,并将它们引导到StockStream流中进行处理。在这里,传入的JSON,如下所示,不遵守JSON映射器支持的默认数据格式。

Query Guide-Stream_HTTP_04

配置HTTP源和自定义JSON源映射以实现上述功能如下。

Query Guide-Stream_HTTP_05

接收器Sink

目的

Sink通过将事件转换为支持的格式,提供了一种将流的Siddhi事件发布到外部系统的方法。

语法

Query Guide-Stream_自定义_06

以下是Siddhi支持的接收器类型列表

接收器类型

描述

In-memory

允许SiddhiApp使用在同一JVM上运行的其他SiddhiApp中的事件。

HTTP

将事件发布到HTTP服务。

Kafka

将事件发送到Kafka主题

TCP

T将事件发布到TCP服务

Email

通过SMTP协议发送电子邮件

JMS

将事件发布到JMS主题或队列

File

写事件到文件

Log

记录流中出现的事件

Prometheus

将数据发布到监控系统

分布式接收器

分布式接收器使用负载平衡或分区策略将事件从定义的流发布到多个端点。

任何接收器都可以用作分布式接收器。分布式接收器配置允许用户定义一个通用映射,以转换和发送所有目标端点的Siddhi事件。

目的

分布式接收器提供了一种以配置的事件格式将Siddhi事件发布到多个端点的方法。

语法

轮询分布式接收器

以循环方式将事件发布到定义的目的地

Query Guide-Stream_HTTP_07

分区分布式接收器

通过基于分区键对事件进行分区,将事件发布到定义的目标。

Query Guide-Stream_自定义_08

接收器映射Sink Mapper

使用@map做映射

@payload是与@map一起使用的可选注释,用于定义自定义映射

有两种方法可以配置@payload注释

1一些映射器(如XML、JSON和Test)只接受一个输出负载

@payload( 'This is a test message from {{user}}.')

2某些映射器(例如键值)接受一系列映射值

@payload( key1='mapping_1', 'key2'='user : {{user}}')

以下是Siddhi支持的接收器映射类型列表

类型

描述

PassThrough

省略传出Siddhi事件的数据转换

JSON

将Siddhi事件转换为JSON消息

XML

将Siddhi事件转换为XML消息

TEXT

将Siddhi事件转换为纯文本消息

Avro

将Siddhi事件转换为Avro事件

Binary

将Siddhi事件转换为Siddhi特定的二进制事件

Key Value

将Siddhi事件转换为键值HashMaps

CSV

将Siddhi事件转换为类似CSV的分隔符分隔事件

例子1

Sink通过将OutputStream事件转换为默认格式的JSON消息并发送到HTTP端点来发布这些事件http://localhost:8005/endpoint1,使用POST方法,Accept头,并且具有admin的基本身份验证是用户名和密码。

HTTP接收器和JSON接收器映射器的配置实现了上述功能,如下所示。

Query Guide-Stream_JSON_09

这将以以下格式发布JSON消息

Query Guide-Stream_HTTP_10

例子2

Sink通过将StockStream事件转换为用户定义的JSON消息并将其发送到HTTP端点来发布StockStream事件http://localhost:8005/stocks.

配置HTTP接收器和自定义JSON接收器映射以实现上述功能如下。

Query Guide-Stream_HTTP_11

这将以以下格式将单个事件发布为JSON消息

Query Guide-Stream_JSON_12

这也可以将多个事件一起发布为JSON消息,格式如下

Query Guide-Stream_自定义_13

例子3

Sink使用分区策略将OutputStream流中的事件发布到多个HTTP端点。在这里,事件被发送到http://localhost:8005/endpoint1http://localhost:8006/endpoint2基于划分关键国家。当发布到两个端点时,它使用默认的JSON映射、POST方法和admin作为用户名和密码。

Query Guide-Stream_自定义_14

这将对传出事件进行分区,并将具有相同国家/地区属性值的所有事件发布到同一个端点。发布的JSON消息将采用以下格式:

Query Guide-Stream_自定义_15

错误处理Error Handling

Siddhi中的错误可以在Streams和Sink中处理

流中的错误处理

当订阅流的Siddhi元素抛出错误时,错误会传播到将事件传递给这些Siddhi元件的流。默认情况下,会在流中记录并删除错误,但可以通过在相应的流定义中添加@OnError注释来更改此行为@OnError注释可以帮助用户捕获错误和相关事件,并通过将它们发送到故障流来优雅地处理它们。

@OnError注释和需要指定的操作如下。

Query Guide-Stream_JSON_16

@OnError注释的action参数定义了在失败场景中要执行的操作。可以为@OnError注释指定以下操作来处理错误场景。

1日志:记录带有错误的事件,并删除该事件。即使未定义@OnError注释,这也是执行的默认操作。

2 STREAM:创建一个故障流,并将事件和错误重定向到它。创建的故障流将具有在基流中定义的所有属性,以捕获导致错误的事件,此外,它还包含包含错误信息的object类型的_error属性。可以通过添加来引用故障流!在基流前面的名称为<流名称>。

例子

通过将错误重定向到故障流来处理TempStream中的错误

TempStream流和@OnError注释的配置如下

Query Guide-Stream_JSON_17

Siddhi将推断并自动定义TempStream的故障流,如下所示。

Query Guide-Stream_自定义_18

SiddhiApp通过使用查询添加故障生成和错误处理来扩展上述用例,如下所示。

注:通过查询编写处理逻辑的详细信息将在后面的章节中解释

Query Guide-Stream_JSON_19

接收器错误处理

在某些情况下,当事件发布到外部系统时,外部系统可能会变得不可用或出现错误。默认情况下,sink会记录并删除导致事件丢失的事件,这可以通过配置@sink注释的.error参数来正常处理。

@sink注释的on.error参数可以指定如下。

Query Guide-Stream_HTTP_20

可以为@sink注释的on.error参数指定以下操作以处理错误场景。

1日志:记录带有错误的事件,并删除该事件。这是即使在@sink注释上未定义on.error参数时执行的默认操作

2 WAIT:发布线程在后退和重试模式下等待,并且仅在重新建立连接时发送事件。在此期间,线程将不会消耗任何新消息,从而导致系统在发布到它的系统上引入背压。

3 STREAM:将具有相应错误的失败事件推送到接收器所属的相关故障流。

例子1

当系统无法连接到Kafka时,等待重试

TempStream流和带有on.error属性的@ssink Kafka注释的配置如下。

Query Guide-Stream_HTTP_21

例子2

当系统无法连接到Kafka时,将事件发送到TempStream的故障流

Query Guide-Stream_HTTP_22



标签:接收器,转换,映射,Stream,Siddhi,JSON,事件,Query,Guide
From: https://blog.51cto.com/u_14602923/7324891

相关文章

  • 从零开始学习jQuery (四) 使用jQuery操作元素的属性与样式
    [导读] 一摘要本篇文章讲解如何使用jQuery获取和操作元素的属性和CSS样式其中DOM属性和元素属性的区分值得大家学习二前言通过前面几章我们已经能够完全控制jQuery包装集了,无论是通过选择器选取对象,或者从包一.摘要本篇文章讲解如何使用jQuery获取和操作元素的属性和CSS......
  • 从零开始学习jQuery (六) AJAX快餐
    [导读] 一摘要本系列文章将带您进入jQuery的精彩世界,其中有很多作者具体的使用经验和解决方案,即使你会使用jQuery也能在阅读中发现些许秘籍本篇文章讲解如何使用jQuery方便快捷的实现Ajax功能统一所有开发人员使一.摘要本系列文章将带您进入jQuery的精彩世界,其中有很多......
  • 从零开始学习jQuery (五) 事件与事件对象
    [导读] 一摘要事件是脚本编程的灵魂所以本章内容也是jQuery学习的重点本文将对jQuery中的事件处理以及事件对象进行详细的讲解二前言本篇文章是至今为止本系列内容最多的一篇,足以可见其重要性大家反映要多一.摘要事件是脚本编程的灵魂.所以本章内容也是jQuery学习的重......
  • Java8之Stream流
    先贴上几个案例,水平高超的同学可以挑战一下:1.从员工集合中筛选出salary大于8000的员工,并放置到新的集合里。2.统计员工的最高薪资、平均薪资、薪资之和。3.将员工按薪资从高到低排序,同样薪资者年龄小者在前。4.将员工按性别分类,将员工按性别和地区分类,将员工按薪资是否高于8000......
  • nginx--添加stream模块
    使用的是openEuler22.03(LTS-SP2)系统,yum源选择清华大学的源清楚yum缓存后重新加载,nginx版本变成1.23.2[[email protected]]#yuminfonginxLastmetadataexpirationcheck:0:21:11agoonFri01Sep202310:29:45AMCST.InstalledPackagesName:......
  • RabbitMQ Stream类型队列
    RabbitMQ提供了三种类型的队列:ClassicQuorumStream官方文档对于流队列的描述是:高性能、可持久化、可复制、非破坏性消费、只追加写入的日志使用场景:一个队列将同一条消息分发给不同消费者可重复消费消息更高的性能存储大量消息而不影响性能更高的吞......
  • java练习:使用Stream
    packagecom.example.ss_0203_array.test.test_0830;importjava.util.ArrayList;importjava.util.Collections;importjava.util.stream.Stream;publicclasstest3{publicstaticvoidmain(String[]args){/***按照下面的要求完成集合的创......
  • Stream流
    Stream流获取Stream流流作用:结合了lambda表达式,简化集合,数组操作使用步骤先得到一条stream流并把数据放上去利用stream流中的API进行各种操作中间方法:过滤,转换终结方法:统计,打印ArrayList<String>list1=newArrayList<>();list1.add("张三丰");list1.add("张文......
  • Jquery的load()方法在IE中不运行
    在ie中load()方法去是拿缓存的数据而不是向服务器拿  在script开头加上这句js 让ie不读取缓存就好 如果只是一个方法用到load()那就在load()之前加上这句不需要全局设置 $.ajaxSetup({cache:false});......
  • 20230621 java.io.OutputStream
    介绍java.io.OutputStreampublicabstractclassOutputStreamimplementsCloseable,FlushableFilterOutputStream是典型的装饰器设计模式,很多子类继承这个类,提供额外的功能protectedOutputStreamout;publicFilterOutputStream(OutputStreamout){this.out=ou......