版本1.6的ApacheIgnite提供了一种基于KafkaConnect进行数据处理的新方法。Kafka Connect是ApacheKafka 0.9中引入的一个新特性,它支持ApacheKafka和其他数据系统之间的可伸缩和可靠的流数据。它使得在内存中向您的可伸缩和安全的流数据管道中添加新系统变得非常容易。在本文中,我们将研究如何设置和配置IgniteSource连接器,以便在Ignite集群之间执行数据复制。
apache ignite,开箱即用,提供点燃-卡夫卡模块采用三种不同的解决方案(API)来实现健壮的数据处理管道,将数据从/到Kafka主题流到ApacheIgnite。
简而言之,ApacheIgniteSourceConnector用于订阅Ignite缓存事件并将其流到Kafka主题。换句话说,它可以用于从Ignite缓存导出数据(更改的数据集),并仅使用配置文件将内容写入Kafka主题。点燃源连接器侦听注册的Ignite网格事件(如PUT),并将它们转发到Kafka主题。这使得保存到Ignite缓存中的数据能够很容易地转换为事件流。每个事件流包含一个键和两个值:旧的和新的。
这篇文章的部分内容摘自这本书。阿帕奇·艾格尼特的书。如果它让你感兴趣,看看书的其余部分,以获得更有帮助的信息。
这,这个,那,那个IgniteSourceConnector对于支持以下用例可能很有用:
- 若要在发生缓存事件时自动通知任何客户端,例如,每当有新条目进入缓存时。
- 若要使用从Ignite缓存流到1-N目的地的异步事件流,请执行以下操作。目标可以是任何数据库,也可以是另一个Ignite集群。这些使您能够通过Kafka在两个Ignite集群之间进行数据复制。
阿帕奇IgniteSourceConnector与IgniteSinkConnector一起使用的工具,可在ignite-kafka-x.x.x.jar分配。IgniteSourceConnector需要以下配置参数:
的高级体系结构。IgniteSinkConnector如图1所示。
在本文中,我们将使用两个IgniteSourceConnector和IgniteSinkConnector对于从一个Ignite集群到另一个Ignite集群的流事件。IgniteSourceConnector将事件从一个Ignite集群(源集群)流到Kafka主题,IgniteSinkConnector将更改从主题流到另一个Ignite集群(目标集群)。我们将演示逐步配置和运行Source和Sink连接器的说明.为了完成Ignite集群之间的数据复制,我们将执行以下操作:
- 在一台机器上执行两个独立的Ignite集群。
- 在发送到Ignite目标集群之前,开发一个流提取器来解析传入的数据。
- 在不同的独立卡夫卡工人中配置并启动IgniteSource和Sink连接器。
- 向Ignite源集群中添加或修改某些数据。
在完成所有配置之后,您应该有一个典型的管道,即将数据从一个Ignite集群流到另一个Ignite集群,如图2所示。
让我们从Ignite集群配置开始。
第一步。我们将在一台机器上启动两个孤立的集群。要做到这一点,我们必须使用另一组TcpDiscoverySpi和TcpConfigurationSpi将两个集群分离到一个主机上。因此,对于第一个集群中的节点,我们将使用以下方法TcpDiscoverySpi和TcpConfigurationSpi组合:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="peerClassLoadingEnabled" value="true"/>
<property name="cacheConfiguration">
<list>
<!-- Partitioned cache example configuration (Atomic mode). -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="myCacheSource"/>
<property name="atomicityMode" value="ATOMIC"/>
<property name="backups" value="1"/>
</bean>
</list>
</property>
<!-- Enable cache events. -->
<property name="includeEventTypes">
<list>
<!-- Cache events. -->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
</list>
</property>
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<!-- Initial local port to listen to. -->
<property name="localPort" value="48500"/>
<!-- Changing local port range. This is an optional action. -->
<property name="localPortRange" value="20"/>
<!-- Setting up IP finder for this cluster -->
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>127.0.0.1:48500..48520</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<!--
Explicitly configure TCP communication SPI changing local
port number for the nodes from the first cluster.
-->
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<property name="localPort" value="48100"/>
</bean>
</property>
</bean>
</beans>
我们指定了本地港口48500
要侦听并使用静态IP查找器来发现节点,请执行以下操作。此外,我们还显式地将tcp通信端口配置为48100
。从上述配置开始的每个Ignite节点将只加入到此集群,并且在同一主机上的另一个集群中不可见。注意,我们还启用了缓存对象放置用于获取的事件PUT
缓存中每个条目的事件通知。作为数据源,我们将使用myCacheSource复制缓存。用名称保存文件isolated-cluster-1-kafka-source.xml
在.。$IGNITE_HOME/examples/config
文件夹。
对于来自第二个集群的节点,我们必须使用另一组端口。配置如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="peerClassLoadingEnabled" value="true"/>
<property name="cacheConfiguration">
<list>
<!-- Partitioned cache example configuration (Atomic mode). -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="myCacheTarget"/>
<property name="atomicityMode" value="ATOMIC"/>
<property name="backups" value="1"/>
</bean>
</list>
</property>
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<!-- Initial local port to listen to. -->
<property name="localPort" value="49500"/>
<!-- Changing local port range. This is an optional action. -->
<property name="localPortRange" value="20"/>
<!-- Setting up IP finder for this cluster -->
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>127.0.0.1:49500..49520</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<!--
Explicitly configure TCP communication SPI changing local port number
for the nodes from the second cluster.
-->
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<property name="localPort" value="49100"/>
</bean>
</property>
</bean>
</beans>
对于第二个集群中的节点,我们将发现端口定义为49500
和通信端口49100
。这两种配置之间的差别是微不足道的,只有Spis和IP查找程序的端口号不同。将此配置保存为具有名称的文件isolated-cluster-1.xml
并将文件放入文件夹中。$IGNITE_HOME/examples/config
.
让我们测试一下配置。使用不同的配置文件在单独的控制台中启动两个Ignite节点。下面是一个如何运行Ignite节点的示例。
ignite.sh $IGNITE_HOME/examples/config/isolated-cluster-1-kafka-source.xml
ignite.sh $IGNITE_HOME/examples/config/isolated-cluster-2.xml
下一个屏幕截图显示了上述命令的结果。正如预期的那样,两个独立的Ignite节点在不同的集群中启动和运行。
注意,所有清单和配置文件都可以在GitHub储存库.
第二步。接下来,您需要定义流提取器来将数据转换为键值元组。创建一个Maven项目,并将以下依赖项添加到pom.xml
.
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-kafka</artifactId>
<version>2.6.0</version>
</dependency>
我们用ignite-kafka
模块作为我们的依赖项。添加以下具有名称的Java类CsvStreamExtractor
进入com.blu.imdg
包,它将实现StreamSingleTupleExtractor
接口如下:
public class CsvStreamExtractor implements StreamSingleTupleExtractor<SinkRecord, String, String> {
public Map.Entry<String, String> extract(SinkRecord sinkRecord) {
System.out.println("SinkRecord:"+ sinkRecord.value().toString());
String[] parts = sinkRecord.value().toString().split(",");
String key = ((String[])parts[2].split("="))[1];
String val= ((String[])parts[7].split("="))[1];
return new AbstractMap.SimpleEntry<String, String>(key, val);
}
}
方法extract
是班里的工作人员吗?CsvStreamExtractor
。这里的代码很简单:它从事件的每个元组中检索键和值,其中每个元组都公开为SinkRecord
在小溪里。这,这个,那,那个extract
方法返回键值对,它将被发送到Ignite集群(目标),以便在缓存中进一步存储。
使用Maven命令编译和构建项目:mvn clean install
。成功编译项目后,一个名为kafka-1.0.jar
应该在项目目标目录中创建。将库复制到文件夹$KAFKA_HOME/libs
.
步骤3。现在我们的流提取器已经准备好使用了,让我们配置Ignite源和接收器连接器,并让它们从复制数据开始。让我们创建一个名为ignite-connector- source.properties
进入$KAFKA_HOME/myconfig
目录。添加以下属性并保存文件。
# connector
name=my-ignite-source-connector
connector.class=org.apache.ignite.stream.kafka.connect.IgniteSourceConnector
tasks.max=2
topicNames=test2
# cache
cacheName=myCacheSource
cacheAllowOverwrite=true
cacheEvts=put
igniteCfg=PATH_TO_THE_FILE/isolated-cluster-1-kafka-source.xml
在前面的连接器配置中,我们定义了org.apache.ignite.stream.kafka.connect.IgniteSourceConnector
作为连接器类。我们还将test 2指定为主题名称,其中将存储流事件。接下来,对于缓存配置,我们将PUT事件定义为网格远程事件。在我们的例子中,我们使用myCacheSource作为源缓存。在这里,另一个关键属性是igniteCfg
,其中我们显式地指定了一个孤立的集群配置。第一组将是我们的事件来源。
接下来,让我们配置Ignite接收器连接器。用名称创建另一个文件ignite- connector-sink.properties
进入$KAFKA_HOME/myconfig
目录。从下面的清单中添加以下属性。
# connector
name=my-ignite-sink-connector
connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
tasks.max=2
topics=test2
# cache
cacheName=myCacheTarget
cacheAllowOverwrite=true
igniteCfg=PATH_TO_THE_FILE/isolated-cluster-2.xml
singleTupleExtractorCls=com.blu.imdg.CsvStreamExtractor
配置与我们在上一节中使用的配置相同。主要区别在于singleTupleExtractorCls
属性,其中指定了我们在步骤2中开发的流提取器。
步骤4。启动动物园管理员和Kafka Broker(服务器),如Kafka文献.
步骤5。您可以猜到,我们必须用test 2来创建一个新的Kafka主题。让我们使用以下命令创建主题。
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor \ 1 --partitions 1 --topic test2
步骤6。让我们在一个单独的控制台中启动源和接收器连接器。首先,使用以下命令启动源连接器。
bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connecto\ r-source.properties
这将使用默认连接器独立属性启动源连接器。请注意,此连接器还将启动Ignite服务器节点,我们将该节点加入到点燃团簇1.
拼图的最后一部分是沉槽连接器。我们现在已经准备好启动接收器连接器了。但是,在以独立模式启动另一个Kafka连接器之前,我们必须更改连接器的REST端口和存储文件名。创建一个名为Connection的文件-standalone-sink.properties
进入$KAFKA_HOME/myconfig
文件夹。向其添加以下属性。
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect-1.offsets
rest.port=8888
offset.flush.interval.ms=10000
大多数配置与以前相同,只有rest.port
和offset.storage.file.filename
是不同的。我们已经显式地定义了一个新的端口。8888
对于此连接器,还指定了另一个文件存储。使用此配置从$KAFKA_HOME
目录。
bin/connect-standalone.sh myconfig/connect-standalone-sink.properties myconfig/ignite-con\ nector-sink.properties
上面的命令将在另一个控制台上启动接收器连接器。下一个图显示了在独立控制台上启动和运行的两个连接器的屏幕截图。
步骤7。现在我们已经设置了连接器,现在是测试流管道的时候了。此时,如果我们在myCacheSource
在集群1上创建的缓存,应该将条目复制到myCacheTarget
集群2上的缓存。我们有几种方法可以使用IgniteRESTAPI或Java客户机将一些条目加载到缓存myCacheSource中。让我们使用Ignite Java客户机IsolatedCluster
从…第二章写这篇文章的书。
$ java -jar ./target/IsolatedCLient-runnable.jar
这个Java客户机加载22
进入缓存的条目myCacheSource
。让我们观察一下在Ignite星系团上发生了什么。使用两个Ignite管理程序工具连接到集群,每个集群一个。执行cache -scan
命令来扫描缓存,您应该得到一个非常类似于图5所示的截图。
如图5所示,不同集群中的每个缓存包含相同的条目集。如果仔细查看控制台上的Ignite接收器连接器日志,您应该会发现类似于以下日志的日志:
CacheEvent [cacheName=myCacheSource, part=64, key=Key:150, xid=null, lockId=GridCacheVersion [topVer=150300733, order=1538826349084, nodeOrder=4], newVal=Hello World!!: 150, oldVal=null, hasOldVal=false, hasNewVal=true, near=false, subjId=572ac224-f48b-4a0c-a844-496f4d609b6a, cloClsName=null, taskName=null, nodeId8=fb6ae4b6, evtNodeId8=572ac224, msg=Cache event., type=CACHE_OBJECT_PUT, tstamp=1538829094472]
Key:150
Val:Hello World!!:
启动源连接器流缓存。PUT
事件进入主题test2
作为一个元组,它包含元数据以及键和值:旧值和新值。点火器接收器连接器使用CsvStreamExtractor
提取器从元组中检索值并将键值对存储到缓存中,myCacheTarget
.
在上面的例子中,我们只配置了Ignite集群之间的单向实时数据复制。但是,ApacheIgniteKafka连接器将大量电源打包到一个小模块中。通过利用它的通用性和易用性,您可以开发强大的双向数据复制管道,或者在网格中发生任何缓存事件时通知任何客户端应用程序。此外,您还可以使用任何Kafka JDBC接收器连接器和Ignite源连接器将数据推入任何RDBMS。但是,Ignite源连接器也有一些限制,在生产环境中使用它之前应该考虑到这些限制:
- 点火源连接器不能并行工作。它不能分割工作,一个任务实例处理流。
- 它不处理多个缓存。为了处理多个缓存,必须定义在Kafka中配置和运行的多个连接器。
- 点火器源连接器需要在嵌入式模式下启动服务器节点才能获得通知的事件。
- 它不支持动态重新配置。