事件流
事件流定义包含流名称和一组属性,这些属性具有特定类型和流范围内唯一可识别的名称。
目的
接收事件的输入,接收查询处理结果的输出。
语法
define stream <stream
<attribute name> <attribute type>, ... );
以下参数用于配置流定义
参数 | 描述 |
<stream name> | 流名称 |
<attribute name> | 参数名称 |
<attribute type> | 参数类型(STRING, INT, LONG, DOUBLE, FLOAT, BOOL or OBJECT) |
例子
define stream TempStream (deviceID long, roomNo int, temp double);
来源Source
源通过多种传输和各种数据格式接收事件,并将它们引导到流中进行处理。
源配置允许定义映射,以便将每个传入事件从其本地数据格式转换为Siddhi事件。当没有提供对此类映射的自定义时,Siddhi假设到达的事件遵循基于流定义和配置的消息映射类型的预定义格式。
目的
提供了一种使用外部系统事件并将其转换为流处理的方式。
语法
@source annotation的type参数定义了接收事件的源类型。
@source annotation的其他参数取决于所选的源类型,其中一些参数可以是可选的。
以下是Siddhi支持的源类型列表:
源类型 | 描述 |
In-memory | 允许SiddhiApp使用在同一JVM上运行的其他SiddhiApp中的事件。 |
HTTP | HTTP服务 |
Kafka | 订阅Kafka主题以消费事件 |
TCP | TCP服务 |
通过POP3和IMAP协议使用电子邮件 | |
JMS | 订阅JMS主题或队列以使用事件 |
File | 读取文件中的事件 |
CDC | 数据库CDC日志数据 |
Prometheus | 监控系统数据 |
In-memory是Siddhi中唯一内置的源,所有其他源类型都作为扩展实现
源映射
每个@source配置都可以有一个由@map注释表示的映射,该注释定义了如何将传入事件格式转换为Siddhi事件。
@map的type参数定义了在转换传入事件时要使用的映射类型。@map annotation的其他参数取决于所选的映射器,其中一些参数可以是可选的。
映射Attributes
@attributes是与@map一起使用的可选注释,用于定义自定义映射
支持的源映射类型
源映射类型 | 描述 |
PassThrough | 省略Siddhi事件的数据转换 |
JSON | 将JSON消息转换为Siddhi事件 |
XML | 将XML消息转换为Siddhi事件 |
TEXT | 将TEXT中消息转换为Siddhi事件 |
Avro | 将Avro事件转换为Siddhi事件 |
Binary | 将Siddhi特定的二进制事件转换为Siddhi事件 |
Key Value | 将键值HashMaps转换为Siddhi事件 |
CSV | 将类似CSV分隔符的事件转换为Siddhi事件 |
提示:
当没有提供@map注释时,使用@map(类型=“passThrough”)作为默认值,它将消耗的Siddhi事件直接传递到流,而不进行任何数据转换。
PassThrough是Siddhi中唯一内置的源映射器,所有其他源映射器都是作为扩展实现的。
例子1
通过公开HTTP服务接收JSON消息,并将它们引导到InputStream流中进行处理。在这里,HTTP服务将通过基本身份验证进行保护,在端口8080和context/foo上的所有网络接口上接收事件。该服务要求JSON消息采用JSON映射器支持的默认数据格式,如下所示。
HTTP源和JSON源映射器的配置实现了上述功能,如下所示。
例子2
通过公开HTTP服务接收JSON消息,并将它们引导到StockStream流中进行处理。在这里,传入的JSON,如下所示,不遵守JSON映射器支持的默认数据格式。
配置HTTP源和自定义JSON源映射以实现上述功能如下。
接收器Sink
目的
Sink通过将事件转换为支持的格式,提供了一种将流的Siddhi事件发布到外部系统的方法。
语法
以下是Siddhi支持的接收器类型列表
接收器类型 | 描述 |
In-memory | 允许SiddhiApp使用在同一JVM上运行的其他SiddhiApp中的事件。 |
HTTP | 将事件发布到HTTP服务。 |
Kafka | 将事件发送到Kafka主题 |
TCP | T将事件发布到TCP服务 |
通过SMTP协议发送电子邮件 | |
JMS | 将事件发布到JMS主题或队列 |
File | 写事件到文件 |
Log | 记录流中出现的事件 |
Prometheus | 将数据发布到监控系统 |
分布式接收器
分布式接收器使用负载平衡或分区策略将事件从定义的流发布到多个端点。
任何接收器都可以用作分布式接收器。分布式接收器配置允许用户定义一个通用映射,以转换和发送所有目标端点的Siddhi事件。
目的
分布式接收器提供了一种以配置的事件格式将Siddhi事件发布到多个端点的方法。
语法
轮询分布式接收器
以循环方式将事件发布到定义的目的地
分区分布式接收器
通过基于分区键对事件进行分区,将事件发布到定义的目标。
接收器映射Sink Mapper
使用@map做映射
@payload是与@map一起使用的可选注释,用于定义自定义映射
有两种方法可以配置@payload注释
1一些映射器(如XML、JSON和Test)只接受一个输出负载
@payload( 'This is a test message from {{user}}.')
2某些映射器(例如键值)接受一系列映射值
@payload( key1='mapping_1', 'key2'='user : {{user}}')
以下是Siddhi支持的接收器映射类型列表
类型 | 描述 |
PassThrough | 省略传出Siddhi事件的数据转换 |
JSON | 将Siddhi事件转换为JSON消息 |
XML | 将Siddhi事件转换为XML消息 |
TEXT | 将Siddhi事件转换为纯文本消息 |
Avro | 将Siddhi事件转换为Avro事件 |
Binary | 将Siddhi事件转换为Siddhi特定的二进制事件 |
Key Value | 将Siddhi事件转换为键值HashMaps |
CSV | 将Siddhi事件转换为类似CSV的分隔符分隔事件 |
例子1
Sink通过将OutputStream事件转换为默认格式的JSON消息并发送到HTTP端点来发布这些事件http://localhost:8005/endpoint1,使用POST方法,Accept头,并且具有admin的基本身份验证是用户名和密码。
HTTP接收器和JSON接收器映射器的配置实现了上述功能,如下所示。
这将以以下格式发布JSON消息
例子2
Sink通过将StockStream事件转换为用户定义的JSON消息并将其发送到HTTP端点来发布StockStream事件http://localhost:8005/stocks.
配置HTTP接收器和自定义JSON接收器映射以实现上述功能如下。
这将以以下格式将单个事件发布为JSON消息
这也可以将多个事件一起发布为JSON消息,格式如下
例子3
Sink使用分区策略将OutputStream流中的事件发布到多个HTTP端点。在这里,事件被发送到http://localhost:8005/endpoint1或http://localhost:8006/endpoint2基于划分关键国家。当发布到两个端点时,它使用默认的JSON映射、POST方法和admin作为用户名和密码。
这将对传出事件进行分区,并将具有相同国家/地区属性值的所有事件发布到同一个端点。发布的JSON消息将采用以下格式:
错误处理Error Handling
Siddhi中的错误可以在Streams和Sink中处理
流中的错误处理
当订阅流的Siddhi元素抛出错误时,错误会传播到将事件传递给这些Siddhi元件的流。默认情况下,会在流中记录并删除错误,但可以通过在相应的流定义中添加@OnError注释来更改此行为@OnError注释可以帮助用户捕获错误和相关事件,并通过将它们发送到故障流来优雅地处理它们。
@OnError注释和需要指定的操作如下。
@OnError注释的action参数定义了在失败场景中要执行的操作。可以为@OnError注释指定以下操作来处理错误场景。
1日志:记录带有错误的事件,并删除该事件。即使未定义@OnError注释,这也是执行的默认操作。
2 STREAM:创建一个故障流,并将事件和错误重定向到它。创建的故障流将具有在基流中定义的所有属性,以捕获导致错误的事件,此外,它还包含包含错误信息的object类型的_error属性。可以通过添加来引用故障流!在基流前面的名称为<流名称>。
例子
通过将错误重定向到故障流来处理TempStream中的错误
TempStream流和@OnError注释的配置如下
Siddhi将推断并自动定义TempStream的故障流,如下所示。
SiddhiApp通过使用查询添加故障生成和错误处理来扩展上述用例,如下所示。
注:通过查询编写处理逻辑的详细信息将在后面的章节中解释
接收器错误处理
在某些情况下,当事件发布到外部系统时,外部系统可能会变得不可用或出现错误。默认情况下,sink会记录并删除导致事件丢失的事件,这可以通过配置@sink注释的.error参数来正常处理。
@sink注释的on.error参数可以指定如下。
可以为@sink注释的on.error参数指定以下操作以处理错误场景。
1日志:记录带有错误的事件,并删除该事件。这是即使在@sink注释上未定义on.error参数时执行的默认操作
2 WAIT:发布线程在后退和重试模式下等待,并且仅在重新建立连接时发送事件。在此期间,线程将不会消耗任何新消息,从而导致系统在发布到它的系统上引入背压。
3 STREAM:将具有相应错误的失败事件推送到接收器所属的相关故障流。
例子1
当系统无法连接到Kafka时,等待重试
TempStream流和带有on.error属性的@ssink Kafka注释的配置如下。
例子2
当系统无法连接到Kafka时,将事件发送到TempStream的故障流