首页 > 其他分享 >Apache Cassandra 支持

Apache Cassandra 支持

时间:2022-12-08 12:06:22浏览次数:34  
标签:CassandraMessageHandler String author 负载 支持 book Apache Cassandra

Apache Cassandra 支持_Apache

Spring Integration 提供了通道适配器(从版本 6.0 开始),用于对 Apache Cassandra 集群执行数据库操作。 它完全基于Apache Cassandra项目的Spring Data。

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-cassandra</artifactId>
<version>6.0.0</version>
</dependency>

卡桑德拉出站组件

这是一个实现,可以在单向(默认)和请求-答复模式(选项)下工作。 默认情况下,它是异步的(重置),并对提供的 执行反应式 、 或操作。 可以通过该选项配置操作类型。 将模式设置为 ;或 、 或 将模式设置为 .​​CassandraMessageHandler​​​​AbstractReplyProducingMessageHandler​​​​producesReply​​​​setAsync(false)​​​​INSERT​​​​UPDATE​​​​DELETE​​​​STATEMENT​​​​ReactiveCassandraOperations​​​​CassandraMessageHandler.Type​​​​ingestQuery​​​​INSERT​​​​query​​​​statementExpression​​​​statementProcessor​​​​STATEMENT​

以下代码片段演示了此通道适配器或网关的各种配置:

@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
return flow -> flow
.handle(Cassandra.outboundGateway(cassandraOperations)
.query("SELECT * FROM book WHERE author = :author limit :size")
.parameter("author", "payload")
.parameter("size", m -> m.getHeaders().get("limit")))
.channel(c -> c.flux("resultChannel"));
}

如果在默认异步模式下将 a 用作网关,则生成 a,根据提供的实现进行处理。 对于真正的反应式处理,建议对输出通道配置使用。 在同步模式下调用以获取回复值。​​CassandraMessageHandler​​​​Mono<WriteResult>​​​​MessageChannel​​​​FluxMessageChannel​​​​Mono.block()​

如果执行 或操作,则请求消息有效负载中应有一个实体(标记为 )。 如果有效负载是实体列表,则执行相应的批处理操作。​​INSERT​​​​UPDATE​​​​DELETE​​​​org.springframework.data.cassandra.core.mapping.Table​

该模式期望有效负载以要插入的值矩阵的形式存在 - 。 例如,如果实体如下所示:​​ingestQuery​​​​List<List<?>>​

@Table("book")
public record Book(@PrimaryKey String isbn,
String title,
@Indexed String author,
int pages,
LocalDate saleDate,
boolean isInStock) {

}

通道适配器具有以下配置:

@Bean
public MessageHandler cassandraMessageHandler3() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
cassandraMessageHandler.setIngestQuery(cqlIngest);
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}

请求消息有效负载必须按如下方式转换:

List<List<Object>> ingestBooks =
payload.stream()
.map(book ->
List.<Object>of(
book.isbn(),
book.title(),
book.author(),
book.pages(),
book.saleDate(),
book.isInStock()))
.toList();

对于更复杂的用例,有效负载可以作为 的实例。 建议使用 API 构建各种语句来针对 Apache Cassandra 执行。 例如,要从表中删除所有数据,可以将具有此类有效负载的消息发送到 : 。 或者,对于基于请求消息的逻辑,可以为 提供 或 以构建基于该消息的 。 为方便起见,a 被注册为 SpEL 求值上下文中的 ,因此目标表达式可以像这样简单:​​com.datastax.oss.driver.api.core.cql.Statement​​​​com.datastax.oss.driver.api.querybuilder.QueryBuilder​​​​Book​​​​CassandraMessageHandler​​​​QueryBuilder.truncate("book").build()​​​​statementExpression​​​​statementProcessor​​​​CassandraMessageHandler​​​​Statement​​​​com.datastax.oss.driver.api.querybuilder​​​​import​

statement-expression="T(QueryBuilder).selectFrom("book").all()"

表示可绑定的命名查询参数,并且仅与选项一起使用。 请参阅上面提到的 Java 和 XML 示例。​​setParameterExpressions(Map<String, Expression> parameterExpressions)​​​​setQuery(String query)​

标签:CassandraMessageHandler,String,author,负载,支持,book,Apache,Cassandra
From: https://blog.51cto.com/u_15326439/5920840

相关文章