6.4. 反应式流订阅者
还可以获取由订阅支持的反应式流。 为此,必须将项目反应器依赖项 () 添加到项目中。 然后,Pub/Sub 启动器和项目反应器依赖项的组合将使 abean 可用,然后可用于获取 abean。io.projectreactor:reactor-core
PubSubReactiveFactory
Publisher
@Autowired
PubSubReactiveFactory reactiveFactory;
// ...
Flux<AcknowledgeablePubsubMessage> flux
= reactiveFactory.poll("exampleSubscription", 1000);
然后表示通过指定订阅传入的无限 GCP 发布/订阅消息流。 对于无限需求,将定期轮询 Pub/Sub 订阅,时间间隔由创建时传入的参数确定。 对于有界需求,参数未使用。 相反,会立即传递尽可能多的消息(最多为请求的数量),其余消息在可用时传递。Flux
pollingPeriodMs
Flux
pollingPeriodMs
基础消息检索逻辑引发的任何异常都将作为错误传递给流。 错误处理运算符(等)可用于恢复。Flux#retry()
Flux#onErrorResume()
项目反应器的全部操作都可以应用于流。 例如,如果只想获取 5 条消息,则可以使用操作将无限流转换为有限流:limitRequest
Flux<AcknowledgeablePubsubMessage> fiveMessageFlux = flux.limitRequest(5);
流经的消息应手动确认。Flux
flux.doOnNext(AcknowledgeablePubsubMessage::ack);
6.5. 发布/订阅管理
PubSubAdmin
是Spring Cloud GCP提供的用于管理Google Cloud Pub/Sub资源的抽象。 它允许创建,删除和列出主题和订阅。
通常,在引用主题和订阅时,可以使用当前项目中的短规范名称,也可以使用 theformat 使用引用不同项目中的主题或订阅的完全限定名称。 |
用于 GCP 发布/订阅的 Spring 引导启动器使用 Spring 引导 GCP Core 启动器自动配置的对象。PubSubAdmin
GcpProjectIdProvider
CredentialsProvider
6.5.1. 创建主题
PubSubAdmin
实现创建主题的方法:
public Topic createTopic(String topicName)
以下是如何创建 Google Cloud 发布/订阅主题的示例:
public void newTopic() {
pubSubAdmin.createTopic("topicName");
}
6.5.2. 删除主题
PubSubAdmin
实现删除主题的方法:
public void deleteTopic(String topicName)
以下是如何删除 Google Cloud 发布/订阅主题的示例:
public void deleteTopic() {
pubSubAdmin.deleteTopic("topicName");
}
6.5.3. 列出主题
PubSubAdmin
实现列出主题的方法:
public List<Topic> listTopics
以下是如何列出项目中的每个 Google Cloud 发布/订阅主题名称的示例:
List<String> topics =
pubSubAdmin.listTopics().stream().map(Topic::getName).collect(Collectors.toList());
6.5.4. 创建订阅
PubSubAdmin
实现多种方法来创建对现有主题的订阅:
public Subscription createSubscription(String subscriptionName, String topicName)
public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline)
public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline, String pushEndpoint)
public Subscription createSubscription(Subscriber.Builder builder)
默认值为 for0 秒。 如果未指定 if,订阅将改用消息拉取。 您还可以通过 a,完全控制客户端库中可用的任何选项或功能。ackDeadline
pushEndpoint
Subscription.Builder
以下是如何创建 Google Cloud Pub/Sub 订阅的示例:
public Subscription newSubscription() {
return pubSubAdmin.createSubscription("subscriptionName", "topicName", 15);
}
6.5.5. 删除订阅
PubSubAdmin
实现删除订阅的方法:
public void deleteSubscription(String subscriptionName)
以下是如何删除 Google Cloud Pub/Sub 订阅的示例:
public void deleteSubscription() {
pubSubAdmin.deleteSubscription("subscriptionName");
}
6.5.6. 列出订阅
PubSubAdmin
实现列出订阅的方法:
public List<Subscription> listSubscriptions()
下面是如何列出项目中的每个订阅名称的示例:
List<String> subscriptions =
pubSubAdmin.listSubscriptions().stream()
.map(Subscription::getName)
.collect(Collectors.toList());
6.6. 示例
提供了用于使用模板和使用订阅支持的反应式流的示例应用程序。
6.7. 测试
Testcontainers
提供模块。在文档中查看更多信息gcloud
PubSubEmulatorContainer
7. 弹簧集成
Spring Cloud GCP 提供 Spring Integration 适配器,允许您的应用程序使用由 Google Cloud Platform 服务备份的企业集成模式。
7.1. 云发布/订阅的通道适配器
Google Cloud Pub/Sub 的频道适配器将您的 SpringMessageChannels连接到 Google Cloud Pub/Sub 主题和订阅。 这样,就可以在由 Google Cloud Pub/Sub 备份的不同流程、应用或微服务之间进行消息传递。
用于 Google Cloud Pub/Sub 的 Spring 集成通道适配器包含在模块中,可以通过将该模块与 Spring 集成依赖项结合使用来自动配置。spring-cloud-gcp-pubsub
spring-cloud-gcp-starter-pubsub
Maven 坐标,使用Spring Cloud GCP BOM:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
格拉德尔坐标:
dependencies {
implementation("com.google.cloud:spring-cloud-gcp-starter-pubsub")
implementation("org.springframework.integration:spring-integration-core")
}
7.1.1. 入站通道适配器(使用发布/订阅流拉取)
PubSubInboundChannelAdapter
是 GCP 发布/订阅的入站通道适配器,用于侦听新消息的 GCP 发布/订阅。 它将新消息转换为内部 Spring消息,然后将其发送到绑定的输出通道。
Google Pub/Sub 将消息有效负载视为字节数组。 因此,默认情况下,入站通道适配器将构造 Springwithas 有效负载。 但是,您可以通过设置的属性来更改所需的有效负载类型。 将转换委托给所需的有效负载类型到配置。Message
byte[]
payloadType
PubSubInboundChannelAdapter
PubSubInboundChannelAdapter
PubSubMessageConverter
PubSubTemplate
要使用入站通道适配器,必须在用户应用程序端提供和配置。PubSubInboundChannelAdapter
订阅名称可以是当前项目中的短订阅名称,也可以是使用 theformat 引用不同项目中的订阅的完全限定名称。 |
@Bean
public MessageChannel pubsubInputChannel() {
return new PublishSubscribeChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubsubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubsubTemplate, "subscriptionName");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
在示例中,我们首先指定适配器要将传入消息写入的位置。 实现在这里并不重要。 根据您的用例,您可能希望使用其他用途。MessageChannel
MessageChannel
MessageChannel
PublishSubscribeChannel
然后,我们宣布 abean。 它需要我们刚刚创建的通道和一个,它从 Google Cloud Java Client for Pub/Sub 创建对象。 用于 GCP 发布/订阅的 Spring 引导启动器提供了一个已配置的对象。PubSubInboundChannelAdapter
SubscriberFactory
Subscriber
PubSubSubscriberOperations
确认消息和处理失败
在使用 Cloud Pub/Sub 时,了解以下概念非常重要: Cloud Pub/Sub 在尝试重新传递未完成消息之前等待的时间。 每个订阅都有一个默认值,应用于发送给它的所有消息。 此外,Cloud Pub/Sub 客户端库可以扩展每个流消息,直到消息处理完成、失败或最长扩展期过去。ackDeadline
ackDeadline
ackDeadline
在发布/订阅客户端库中,默认的最长扩展期为一小时。但是,Spring Cloud GCP 会禁用此自动扩展行为。 使用该属性重新启用它。 |
确认(确认)消息会将其从 Pub/Sub 的已知未完成消息中删除。取消邮件会将其确认截止时间重置为 0,从而强制立即重新传递。 这在负载平衡体系结构中可能很有用,其中一个订阅者遇到问题,但其他订阅者可用于处理消息。
支持三种确认模式:默认模式(处理成功时自动确认和异常时自动确认),以及两种用于额外手动控制的模式:AckMode.AUTO_ACK(成功时自动确认,但异常时不执行任何操作)和AckMode.MANUAL(完全没有自动操作;确认和等待都必须手动完成)。PubSubInboundChannelAdapter
AckMode.AUTO
Table 1. Acknowledgement mode behavior
自动 | AUTO_ACK | 手动 | |
消息处理成功完成 | 咳咳,无重新交付 | 咳咳,无重新交付 | <无操作>* |
消息处理失败,但错误处理程序成功完成** | 咳咳,无重新交付 | 咳咳,无重新交付 | <无操作>* |
消息处理失败;不存在错误处理程序 | 纳克,立即重新交付 | <无操作>* | <无操作>* |
消息处理失败,错误处理程序引发异常 | 纳克,立即重新交付 | <无操作>* | <无操作>* |
* <无操作>表示消息既不会被确认也不会被裸露。 云发布/订阅将根据订阅设置和客户端库设置尝试重新交付。ackDeadline
max-ack-extension-period
** 对于适配器,“成功”表示 Spring 集成流已处理而不引发异常,因此成功的消息处理和错误处理程序的成功完成都会导致相同的行为(消息将被确认)。 要触发默认错误行为(在模式中挂起;既不执行也不在模式中插入),请通过从错误处理流中引发异常将错误传播回适配器。AUTO
AUTO_ACK
手动捡合/捡拾
适配器将对象附加到标头。 用户可以提取使用密钥并使用它来确认(或纳克)消息。BasicAcknowledgeablePubsubMessage
Message
BasicAcknowledgeablePubsubMessage
GcpPubSubHeaders.ORIGINAL_MESSAGE
@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
return message -> {
LOGGER.info("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
BasicAcknowledgeablePubsubMessage originalMessage =
message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
originalMessage.ack();
};
}
错误处理
如果要在发生错误时对消息处理进行更多控制,则需要将其与 Spring 集成错误通道相关联,并指定要调用的行为。PubSubInboundChannelAdapter
@ServiceActivator
为了激活默认行为(非模式指示;既不占用也不占用非模式),错误处理程序必须引发异常。 否则,适配器将假定处理成功完成,并将确认消息。 |
@Bean
public MessageChannel pubsubInputChannel() {
return new PublishSubscribeChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubsubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubsubTemplate, "subscriptionName");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.AUTO_ACK);
adapter.setErrorChannelName("pubsubErrors");
return adapter;
}
@ServiceActivator(inputChannel = "pubsubErrors")
public void pubsubErrorHandler(Message<MessagingException> message) {
LOGGER.warn("This message will be automatically acked because error handler completes successfully");
}
如果您希望手动确认或处理消息,可以通过检索异常有效负载的标头来完成:
@ServiceActivator(inputChannel = "pubsubErrors")
public void pubsubErrorHandler(Message<MessagingException> exceptionMessage) {
BasicAcknowledgeablePubsubMessage originalMessage =
(BasicAcknowledgeablePubsubMessage)exceptionMessage.getPayload().getFailedMessage()
.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
originalMessage.nack();
}
7.1.2. 可轮询消息源(使用发布/订阅同步拉取)
虽然通过底层异步拉取发布/订阅机制,为接收稳定消息流的大容量应用程序提供最佳性能,但它可能会由于消息缓存而产生负载平衡异常。 在发布大量需要很长时间才能单独处理的小消息时,此行为最为明显。 它表现为一个订阅者占用了大多数消息,即使有多个订阅者可以承担工作。 有关此方案的更详细说明,请参阅GCP 发布/订阅文档。PubSubInboundChannelAdapter
在这种情况下,acan 有助于更均匀地在不同订阅者之间分配负载。PubSubMessageSource
与入站通道适配器一样,消息源具有可配置的确认方式、有效负载类型和标头映射。
默认行为是在不存在消息时立即从同步拉取操作返回。 这可以通过使用方法等待至少一条消息到达来覆盖。setBlockOnPull()
默认情况下,一次从订阅中提取一条消息。 若要在每个请求上拉取一批消息,请使用该方法设置批大小。PubSubMessageSource
setMaxFetchSize()
订阅名称可以是当前项目中的短订阅名称,也可以是使用 theformat 引用不同项目中的订阅的完全限定名称。 |
@Bean
@InboundChannelAdapter(channel = "pubsubInputChannel", poller = @Poller(fixedDelay = "100"))
public MessageSource<Object> pubsubAdapter(PubSubTemplate pubSubTemplate) {
PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, "exampleSubscription");
messageSource.setAckMode(AckMode.MANUAL);
messageSource.setPayloadType(String.class);
messageSource.setBlockOnPull(true);
messageSource.setMaxFetchSize(100);
return messageSource;
}
上面的注释确保配置被轮询消息,然后可以使用消息通道上的任何 Spring 集成机制进行操作。 例如,可以在带有批注的方法中检索消息,如下所示。@InboundChannelAdapter
MessageSource
pubsubInputChannel
@ServiceActivator
为了提高灵活性,请将对象附加到消息头。 该对象可用于手动 (n) 确认消息。PubSubMessageSource
AcknowledgeablePubSubMessage
GcpPubSubHeaders.ORIGINAL_MESSAGE
@ServiceActivator(inputChannel = "pubsubInputChannel")
public void messageReceiver(String payload,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) AcknowledgeablePubsubMessage message)
throws InterruptedException {
LOGGER.info("Message arrived by Synchronous Pull! Payload: " + payload);
message.ack();
}
|
7.1.3. 出站通道适配器
PubSubMessageHandler
是 GCP 发布/订阅的出站通道适配器,用于侦听 Spring 上的新消息。 它用于将它们发布到 GCP 发布/订阅主题。MessageChannel
PubSubTemplate
要构造消息的发布/订阅表示形式,出站通道适配器需要将 Springpayload 转换为 Pub/Sub 期望的字节数组表示形式。 它将此转换委托给。 要自定义转换,您可以指定应该转换 Springto a 的有效载荷和标头。Message
PubSubTemplate
PubSubMessageConverter
PubSubTemplate
Object
Message
PubsubMessage
要使用出站通道适配器,必须在用户应用程序端提供和配置 abean。PubSubMessageHandler
主题名称可以是当前项目中的短主题名称,也可以是使用 theformat 引用不同项目中的主题的完全限定名称。 |
@Bean
@ServiceActivator(inputChannel = "pubsubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
return new PubSubMessageHandler(pubsubTemplate, "topicName");
}
提供的包含将消息发布到 GCP 发布/订阅主题所需的所有配置。PubSubTemplate
PubSubMessageHandler
默认情况下异步发布消息。 可以为同步发布配置发布超时。 如果未提供任何内容,适配器将无限期地等待响应。
可以通过 and方法 为 call in(可以设置一个或两个)设置用户定义的回调。 这些允许在成功时访问发布/订阅发布消息 ID,或在发生错误时访问根本原因异常。 两个回调都包含原始消息作为第二个参数。 仅授予对消息 ID 或根本原因异常的访问权限的旧方法已弃用,并将在将来的发行版中删除。publish()
PubSubMessageHandler
setSuccessCallback()
setFailureCallback()
setPublishCallback()
adapter.setPublishCallback(
new ListenableFutureCallback<String>() {
@Override
public void onFailure(Throwable ex) {}
@Override
public void onSuccess(String result) {}
});
要覆盖默认主题,您可以使用标头。GcpPubSubHeaders.TOPIC
@Autowired
private MessageChannel pubsubOutputChannel;
public void handleMessage(Message<?> msg) throws MessagingException {
final Message<?> message = MessageBuilder
.withPayload(msg.getPayload())
.setHeader(GcpPubSubHeaders.TOPIC, "customTopic").build();
pubsubOutputChannel.send(message);
}
也可以使用理论方法为主题设置 SpEL 表达式。setTopicExpression()
setTopicExpressionString()
PubSubMessageHandler adapter = new PubSubMessageHandler(pubSubTemplate, "myDefaultTopic");
adapter.setTopicExpressionString("headers['sendToTopic']");
7.1.4. 标头映射
这些通道适配器包含标头映射器,允许您映射或过滤掉从 Spring 到 Google Cloud 发布/订阅消息的标头,反之亦然。 默认情况下,入站通道适配器会将 Google Cloud 发布/订阅消息上的每个标头映射到适配器生成的 Spring 消息。 出站通道适配器将 Spring 消息中的每个标头映射到 Google Cloud Pub/Sub 消息,除了 Spring 添加的标头和一些特殊标头,例如带有 key ,,, 和 的标头。 在此过程中,出站映射器还会将标头的值转换为字符串。"id"
"timestamp"
"gcp_pubsub_acknowledgement"
"gcp_pubsub_ordering_key"
请注意,您可以提供 () 标头,该标头将自动映射到属性,并从已发布消息的标头中排除。 如果您使用此标头发布邮件,请记住设置。GcpPubSubHeaders.ORDERING_KEY
"gcp_pubsub_ordering_key"
PubsubMessage.orderingKey
spring.cloud.gcp.pubsub.publisher.enable-message-ordering
true
每个适配器都声明了一个方法,以便您进一步自定义要从Spring映射到Google Cloud Pub/Sub的标头,反之亦然。setHeaderMapper()
例如,要过滤掉标头以及所有以前缀“prefix_”开头的标头,您可以使用此模块提供的实现。"foo"
"bar"
setHeaderMapper()
PubSubHeaderMapper
PubSubMessageHandler adapter = ...
...
PubSubHeaderMapper headerMapper = new PubSubHeaderMapper();
headerMapper.setOutboundHeaderPatterns("!foo", "!bar", "!prefix_*", "*");
adapter.setHeaderMapper(headerMapper);
模式声明为不重要的顺序。 第一种模式优先于以下模式。 |
在前面的示例中,模式意味着映射了每个标头。 但是,由于它在列表中排在最后,因此以前的模式优先。"*"
7.1.5. 示例
可用示例:
- 使用通道适配器发送/接收消息
- 具有 JSON 有效负载的发布/子通道适配器
- Spring 集成和发布/订阅代码实验室
7.2. 谷歌云存储的通道适配器
谷歌云存储的通道适配器允许您读取和写入文件到谷歌云存储。MessageChannels
Spring Cloud GCP 提供两个入站适配器和一个出站适配器。GcsInboundFileSynchronizingMessageSource
GcsStreamingMessageSource
GcsMessageHandler
用于 Google Cloud Storage 的 Spring 集成通道适配器包含在该模块中。spring-cloud-gcp-storage
要使用Spring Integration for Spring Cloud GCP的存储部分,您还必须提供依赖项,因为它不是可传递拉取的。spring-integration-file
Maven 坐标,使用Spring Cloud GCP BOM:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-storage</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>
格拉德尔坐标:
dependencies {
implementation("com.google.cloud:spring-cloud-gcp-starter-storage")
implementation("org.springframework.integration:spring-integration-file")
}
7.2.1. 入站通道适配器
Google Cloud Storage 入站通道适配器轮询 Google Cloud Storage 存储桶以获取新文件,并将每个文件在有效负载中发送到注释中的指定文件。 这些文件临时存储在本地文件系统的文件夹中。Message
MessageChannel
@InboundChannelAdapter
下面是如何配置 Google Cloud Storage 入站通道适配器的示例。
@Bean
@InboundChannelAdapter(channel = "new-file-channel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> synchronizerAdapter(Storage gcs) {
GcsInboundFileSynchronizer synchronizer = new GcsInboundFileSynchronizer(gcs);
synchronizer.setRemoteDirectory("your-gcs-bucket");
GcsInboundFileSynchronizingMessageSource synchAdapter =
new GcsInboundFileSynchronizingMessageSource(synchronizer);
synchAdapter.setLocalDirectory(new File("local-directory"));
return synchAdapter;
}
7.2.2. 入站流通道适配器
入站流通道适配器类似于普通的入站通道适配器,不同之处在于它不需要将文件存储在文件系统中。
下面是如何配置 Google Cloud Storage 入站流通道适配器的示例。
@Bean
@InboundChannelAdapter(channel = "streaming-channel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<InputStream> streamingAdapter(Storage gcs) {
GcsStreamingMessageSource adapter =
new GcsStreamingMessageSource(new GcsRemoteFileTemplate(new GcsSessionFactory(gcs)));
adapter.setRemoteDirectory("your-gcs-bucket");
return adapter;
}
如果您想按特定顺序处理存储桶中的文件,您可以传入 ato 构造函数对正在处理的文件进行排序。Comparator<BlobInfo>
GcsStreamingMessageSource
7.2.3. 出站通道适配器
出站通道适配器允许将文件写入 Google Cloud Storage。 当它收到包含有效负载类型的有效负载时,它会将该文件写入适配器中指定的 Google Cloud Storage 存储桶。Message
File
下面是如何配置 Google Cloud Storage 出站通道适配器的示例。
@Bean
@ServiceActivator(inputChannel = "writeFiles")
public MessageHandler outboundChannelAdapter(Storage gcs) {
GcsMessageHandler outboundChannelAdapter = new GcsMessageHandler(new GcsSessionFactory(gcs));
outboundChannelAdapter.setRemoteDirectoryExpression(new ValueExpression<>("your-gcs-bucket"));
return outboundChannelAdapter;
}
8.春云流
Spring Cloud GCP 为 Google Cloud Pub/Sub 提供了一个Spring Cloud Stream活页夹。
提供的活页夹依赖于Spring Integration Channel Adapters for Google Cloud Pub/Sub。
Maven 坐标,使用Spring Cloud GCP BOM:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
</dependency>
格拉德尔坐标:
dependencies {
implementation("com.google.cloud:spring-cloud-gcp-pubsub-stream-binder")
}
8.1. 概述
此活页夹将生产者绑定到 Google Cloud 发布/订阅主题,将使用者绑定到订阅。
此绑定程序当前不支持分区。 |
8.2. 配置
您可以为 Google Cloud Pub/Sub 配置 Spring Cloud Stream Binder,以自动生成底层资源,例如 Google Cloud Pub/Sub 主题以及生产者和消费者的订阅。 为此,您可以使用默认情况下打开的属性。spring.cloud.stream.gcp.pubsub.bindings.<channelName>.<consumer|producer>.auto-create-resources
从版本 1.1 开始,可以为所有绑定全局配置这些和其他绑定器属性,例如spring.cloud.stream.gcp.pubsub.default.consumer.auto-create-resources
如果您使用的是 Spring Cloud GCP 发布/订阅启动器的发布/订阅自动配置,则应参考配置部分了解其他发布/订阅参数。
若要将此绑定程序与正在运行的模拟器一起使用,请通过以下方式配置其主机和端口。 |
8.2.1. 生产者同步发送配置
默认情况下,此绑定程序将异步将消息发送到云发布/订阅。 如果首选同步发送(例如,允许将错误传播回发送方),请将属性设置为 。spring.cloud.stream.gcp.pubsub.default.producer.sync
true
8.2.2. 生产者目标配置
如果自动资源创建开启,且目标名称对应的主题不存在,则会创建该主题。
例如,对于以下配置,将创建一个名为“将”的主题。myEvents
应用程序属性
spring.cloud.stream.bindings.events.destination=myEvents
spring.cloud.stream.gcp.pubsub.bindings.events.producer.auto-create-resources=true
8.2.3. 消费者目的地配置
将为使用方终结点配置 A。 您可以使用属性调整使用方终结点的确认模式。 确认模式控制成功接收消息时如何确认消息。 三个可能的选项是:(默认)、和。 发布/订阅通道适配器文档中详细介绍了这些选项。PubSubInboundChannelAdapter
ack-mode
AUTO
AUTO_ACK
MANUAL
应用程序属性
# How to set the ACK mode of the consumer endpoint.
spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.ack-mode=AUTO_ACK
为使用者启用自动资源创建后,库会创建主题和/或订阅(如果它们不存在)。 主题名称将与目标名称相同,订阅名称遵循以下规则(按优先级顺序):
- 用户定义的预先存在的订阅(使用
spring.cloud.stream.gcp.pubsub.bindings.{CONSUMER_NAME}.consumer.subscriptionName
) - 使用主题名称的使用者组 (用于创建名为
spring.cloud.stream.bindings.events.group
<topicName>.<group>
) - 如果未指定上述任何一项,则库将创建具有该名称的匿名订阅。 然后,当绑定程序关闭时,库会自动清理为匿名使用者组创建的所有发布/订阅。
anonymous.<destinationName>.<randomUUID>
例如,使用此配置:
应用程序属性
spring.cloud.stream.bindings.events.destination=myEvents
spring.cloud.stream.gcp.pubsub.bindings.events.consumer.auto-create-resources=false
仅创建并稍后清理了匿名订阅 namedis。anonymous.myEvents.a6d83782-c5a3-4861-ac38-e6e2af15a7be
在另一个示例中,具有以下配置:
应用程序属性
spring.cloud.stream.bindings.events.destination=myEvents
spring.cloud.stream.gcp.pubsub.bindings.events.consumer.auto-create-resources=true
# specify consumer group, and avoid anonymous consumer group generation
spring.cloud.stream.bindings.events.group=consumerGroup1
将创建以下资源:
- 名为
myEvents
- 名为
myEvents.consumerGroup1
8.2.4. 标头映射
您可以使用属性过滤传入和传出邮件头。 例如,对于只允许两个标头的使用者,请提供逗号分隔的列表,如下所示:allowHeaders
应用程序属性
spring.cloud.stream.gcp.pubsub.bindings.<consumerFunction>-in-0.consumer.allowedHeaders=allowed1, allowed2
其中<consumerFunction>应替换为使用/读取来自Cloud Pub/Sub的消息的方法,并且允许1,allow2是用户想要保留的逗号分隔的标头列表。
类似的风格也适用于生产者。例如:
应用程序属性
spring.cloud.stream.gcp.pubsub.bindings.<producerFunction>-out-0.producer.allowedHeaders=allowed3,allowed4
其中<producerFunction>应替换为生成/发送消息到Cloud Pub/Sub的方法,并且允许3,allow4是用户想要映射的标题的逗号分隔列表。在将邮件发送到 Cloud Pub/Sub 之前,将删除所有其他标头。
8.2.5. 端点定制
您可以通过在自动配置中定义 a.如果要自定义 Pub/Sub Spring 云流活页夹提供的默认配置,这将非常有用。ConsumerEndpointCustomizer
下面的示例演示如何使用 ato 覆盖绑定程序配置的默认错误通道。ConsumerEndpointCustomizer
@Bean
public ConsumerEndpointCustomizer<PubSubInboundChannelAdapter> messageChannelAdapter() {
return (endpoint, destinationName, group) -> {
NamedComponent namedComponent = (NamedComponent) endpoint.getOutputChannel();
String channelName = namedComponent.getBeanName();
endpoint.setErrorChannelName(channelName + ".errors");
};
}
8.3. 与函数绑定
从 3.0 版本开始,Spring Cloud Stream 原生支持函数式编程模型。 这意味着将应用程序转换为接收器的唯一要求是应用程序上下文中存在 abean。java.util.function.Consumer
@Bean
public Consumer<UserMessage> logUserMessage() {
return userMessage -> {
// process message
}
};
源应用程序是存在 abean 的应用程序。 它可以返回一个对象,在这种情况下,Spring Cloud Stream将重复调用供应商。 或者,该函数可以返回一个反应式流,该流将按原样使用。Supplier
@Bean
Supplier<Flux<UserMessage>> generateUserMessages() {
return () -> /* flux creation logic */;
}
处理器应用程序的工作方式与源应用程序类似,只是它是由 abean 的存在触发的。Function
8.4. 使用注释绑定
从版本 3.0 开始,批注绑定被视为旧版。 |
若要以这种样式设置接收器应用程序,请将类与绑定接口(如内置接口)相关联。Sink
@EnableBinding(Sink.class)
public class SinkExample {
@StreamListener(Sink.INPUT)
public void handleMessage(UserMessage userMessage) {
// process message
}
}
要设置源应用程序,您需要将类与内置接口相关联,并注入Spring Cloud Stream提供的该类的实例。Source
@EnableBinding(Source.class)
public class SourceExample {
@Autowired
private Source source;
public void sendMessage() {
this.source.output().send(new GenericMessage<>(/* your object here */));
}
}
8.5. 流式传输与轮询输入
许多 Spring 云流应用程序将使用内置绑定,这会触发流式输入绑定器的创建。 然后,可以使用标记为注释的输入处理程序使用消息,无论 Pub/Sub 发送它们的速率如何。Sink
@StreamListener(Sink.INPUT)
为了更好地控制消息到达速率,可以通过定义具有注释方法返回的自定义绑定接口来设置轮询输入绑定器。@Input
PollableMessageSource
public interface PollableSink {
@Input("input")
PollableMessageSource input();
}
然后可以根据需要注入和查询。PollableMessageSource
@EnableBinding(PollableSink.class)
public class SinkExample {
@Autowired
PollableMessageSource destIn;
@Bean
public ApplicationRunner singlePollRunner() {
return args -> {
// This will poll only once.
// Add a loop or a scheduler to get more messages.
destIn.poll(message -> System.out.println("Message retrieved: " + message));
};
}
}
默认情况下,轮询一次只会收到 1 条消息。 使用该属性获取每个网络往返的其他消息。spring.cloud.stream.gcp.pubsub.default.consumer.maxFetchSize
8.6. 示例
提供示例应用程序:
- 对于流式输入,基于注释。
- 用于流式输入,功能样式。
- 对于轮询输入。
9. 春云公交
使用 CloudPub/Sub作为Spring Cloud Bus实现就像导入启动器一样简单。spring-cloud-gcp-starter-bus-pubsub
此启动器引入了用于 Cloud Pub/Sub 的 Spring Cloud Stream 活页夹,用于发布和订阅总线。 如果总线主题(默认命名)不存在,则绑定程序会自动创建它。 活页夹还使用启动器为每个项目创建匿名订阅。springCloudBus
spring-cloud-gcp-starter-bus-pubsub
Maven 坐标,使用Spring Cloud GCP BOM:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-bus-pubsub</artifactId>
</dependency>
格拉德尔坐标:
dependencies {
implementation("com.google.cloud:spring-cloud-gcp-starter-bus-pubsub")
}
9.1. 使用弹簧云配置和弹簧云总线进行配置管理
Spring Cloud Bus 可用于将配置更改从 Spring Cloud Config 服务器推送到侦听同一总线的客户端。
要使用 GCP 发布/订阅作为总线实现,配置服务器和配置客户端都需要依赖关系。spring-cloud-gcp-starter-bus-pubsub
所有其他配置都是Spring Cloud Config的标准配置。
Spring Cloud Config Server通常在端口上运行,可以从各种源代码管理系统(如GitHub)甚至本地文件系统中读取配置。 当服务器收到新配置可用的通知时,它会获取更新的配置并通过 Spring Cloud Bus 发送通知 ()。8888
RefreshRemoteApplicationEvent
在本地存储配置时,配置服务器会轮询父目录以进行更改。 将配置存储在源代码管理存储库(如 GitHub)中时,需要通知配置服务器有新版本的配置可用。 在已部署的服务器中,这将通过 GitHub webhook 自动完成,但在本地测试方案中,需要手动调用 HTTP 终结点。/monitor
curl -X POST http://localhost:8888/monitor -H "X-Github-Event: push" -H "Content-Type: application/json" -d '{"commits": [{"modified": ["application.properties"]}]}'
通过添加依赖项,您可以指示 Spring Cloud Bus 使用 Cloud Pub/Sub 广播配置更改。 然后,Spring Cloud Bus 将为每个配置客户端创建一个名为的主题以及订阅。spring-cloud-gcp-starter-bus-pubsub
springCloudBus
配置服务器恰好也是一个配置客户端,订阅它发送的配置更改。 因此,在具有一个配置服务器和一个配置客户端的方案中,将创建两个对主题的匿名订阅。 但是,默认情况下,配置服务器禁用配置刷新(有关更多详细信息,请参阅ConfigServerBootstrapApplicationListener)。springCloudBus
提供演示应用程序,显示通过云发布/子供电总线进行配置管理和分发。 该示例包含两个使用 Spring Cloud Bus 进行配置管理的示例:一个监视本地文件系统,另一个从 GitHub 存储库检索配置。
10. 云跟踪
Google Cloud Platform 提供了一个名为 CloudTrace 的托管分布式跟踪服务,SpringCloud Sleuth可以与它一起使用,以轻松检测 Spring Boot 应用程序的可观察性。
通常,Spring Cloud Sleuth捕获跟踪信息并将跟踪转发到Zipkin等服务进行存储和分析。 但是,在 GCP 上,您可以使用 Cloud Trace 来存储跟踪、查看跟踪详细信息、生成延迟分布图以及生成性能回归报告,而不是运行和维护自己的 Zipkin 实例和存储。
这个Spring Cloud GCP入门器可以将Spring Cloud Sleuth跟踪转发到Cloud Trace,而无需中间的Zipkin服务器。
Maven 坐标,使用Spring Cloud GCP BOM:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-trace</artifactId>
</dependency>
格拉德尔坐标:
dependencies {
implementation("com.google.cloud:spring-cloud-gcp-starter-trace")
}
您必须从 Google Cloud Console 启用云跟踪 API 才能捕获跟踪。 导航到项目的云跟踪 API,并确保它已启用。
如果您已经在使用 Zipkin 服务器从多个平台/框架捕获跟踪信息,您还可以使用Stackdriver Zipkin 代理将这些跟踪转发到 Cloud Trace,而无需修改现有应用程序。 |
10.1. 追踪
春云侦探使用勇敢的追踪器生成痕迹。 这种集成使 Brave 能够使用StackdriverTracePropagation传播。
传播负责从实体中提取跟踪上下文(例如,HTTP servlet 请求)并将跟踪上下文注入实体。 传播用法的一个典型示例是接收 HTTP 请求的 Web 服务器,该请求在向原始调用方返回 HTTP 响应之前触发来自服务器的其他 HTTP 请求。
在这种情况下,它首先在X-B3 标头 (,) 中查找跟踪上下文。 如果未找到这些,将回退到密钥(例如,HTTP 请求标头)。StackdriverTracePropagation
X-B3-TraceId
X-B3-SpanId
StackdriverTracePropagation
x-cloud-trace-context
如果您需要不同的传播行为(例如,主要依赖于混合的Spring/非Spring应用程序环境),Spring Cloud Sleuth允许通过自定义传播类型进行此类自定义。x-cloud-trace-context
密钥的值可以通过三种不同的方式格式化:
|
10.2. 用于云跟踪的 Spring 启动启动器
Spring Boot Starter for Cloud Trace使用Spring Cloud Sdetectth并自动配置一个StackdriverSender,将侦探的跟踪信息发送到Cloud Trace。
所有配置都是可选的:
名字 | 描述 | 必填 | 默认值 |
| 自动配置弹簧云侦探以将跟踪发送到云跟踪。 | 不 | |
| 覆盖来自 Spring Cloud GCP 模块的项目 ID | 不 | |
| 覆盖来自 Spring Cloud GCP 模块的凭据位置 | 不 | |
| 覆盖来自 Spring Cloud GCP 模块的凭据编码密钥 | 不 | |
| Overrides the credentials scopes from the Spring Cloud GCP Module | No | |
| Number of threads used by the Trace executor | No | 4 |
| HTTP/2 authority the channel claims to be connecting to. | No | |
| 要在跟踪调用中使用的压缩的名称 | 不 | |
| 呼叫截止时间(毫秒) | 不 | |
| 入站邮件的最大大小 | 不 | |
| 出站邮件的最大大小 | 不 | |
| 等待通道准备就绪,以防发生暂时性故障 | 不 | |
| 挂起的跨度将批量发送到 GCP 云跟踪之前的超时(以秒为单位)。(以前 | 不 | 1 |
| 服务器响应超时(以毫秒为单位)。 | 不 | |
| (实验性)自动配置跟踪的发布/订阅检测。 | 不 | |
您可以使用核心的泉云侦探属性来控制侦探的采样率等。 阅读侦探文档,了解有关侦探配置的更多信息。
例如,当您进行测试以查看跟踪是否通过时,可以将采样率设置为 100%。
spring.sleuth.sampler.probability=1 # Send 100% of the request traces to Cloud Trace.
spring.sleuth.web.skipPattern=(^cleanup.*|.+favicon.*) # Ignore some URL paths.
spring.sleuth.scheduled.enabled=false # disable executor 'async' traces
默认情况下,Spring Cloud Sleuth 自动配置会检测执行器 bean,如果您的应用程序或其依赖项之一将调度程序 bean 引入 Spring 应用程序上下文,则可能会导致具有 name 的重复跟踪出现在云跟踪中。为避免这种干扰,请禁用通过应用程序配置执行程序的自动检测。 |
Spring Cloud GCP 跟踪会覆盖某些侦探配置:
- 始终使用 128 位跟踪 ID。 这是云跟踪所必需的。
- 不使用跨度联接。 跨度联接将在客户端和服务器跨度之间共享跨度 ID。 云跟踪要求跟踪中的每个 Span ID 都是唯一的,因此不支持 Span 联接。
- 默认情况下用于填充堆栈驱动程序相关字段。
StackdriverHttpRequestParser
10.3. 覆盖自动配置
Spring Cloud Sleuth 从 2.1.0 版本开始支持向多个跟踪系统发送跟踪。 为了使它工作,每个跟踪系统都需要有aand。 如果要覆盖提供的 bean,则需要为它们指定一个特定名称。 为此,您可以分别使用和。Reporter<Span>
Sender
StackdriverTraceAutoConfiguration.REPORTER_BEAN_NAME
StackdriverTraceAutoConfiguration.SENDER_BEAN_NAME
10.4. 自定义跨度
您可以使用 向跨度添加其他标记和注释,这在应用程序上下文中可用。brave.SpanCustomizer
下面是一个用于配置 MVC 拦截器的示例,该拦截器向所有 Web 控制器跨度添加两个额外的标记。WebMvcConfigurer
@SpringBootApplication
public class Application implements WebMvcConfigurer {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Autowired
private SpanCustomizer spanCustomizer;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new HandlerInterceptor() {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
spanCustomizer.tag("session-id", request.getSession().getId());
spanCustomizer.tag("environment", "QA");
return true;
}
});
}
}
然后,您可以根据云跟踪服务中的这些附加标签搜索和筛选跟踪。
10.5. 与日志记录集成
可通过云日志记录支持与云日志记录集成。 如果跟踪集成与日志记录集成一起使用,则请求日志将与相应的跟踪相关联。 可以通过转到Google云控制台跟踪列表,选择跟踪并按部分中的链接来查看跟踪日志。Logs → View
Details
10.6. 发布/订阅跟踪检测(实验)
可以使用属性为发布/订阅消息启用跟踪检测和传播。 默认情况下,它设置为,但设置为 时,只要应用程序通过或在其上构建的任何其他集成(例如 Spring 集成通道适配器和 Spring Cloud Stream Binder)发送或接收消息,就会创建跟踪跨度并将其传播到 Cloud Trace。spring.cloud.gcp.trace.pubsub.enabled=true
false
true
PubSubTemplate
PubSubTemplate
# Enable Pub/Sub tracing using this property
spring.cloud.gcp.trace.pubsub.enabled=true
# You should disable Spring Integration instrumentation by Sleuth as it's unnecessary when Pub/Sub tracing is enabled
spring.sleuth.integration.enabled=false
11. 云日志记录
Maven 坐标,使用Spring Cloud GCP BOM:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-logging</artifactId>
</dependency>
格拉德尔坐标:
dependencies {
implementation("com.google.cloud:spring-cloud-gcp-starter-logging")
}
Cloud Logging 是 Google CloudPlatform 提供的托管日志记录服务。
此模块支持将 Web 请求跟踪 ID 与相应的日志条目相关联。 它通过从映射诊断上下文 (MDC) 中检索值来实现此目的,该上下文由 Spring Cloud Sleuth 设置。 如果未使用春云侦探,则配置提取所需的标头值并将其设置为日志条目的跟踪 ID。 这允许按请求对日志消息进行分组,例如,在Google 云控制台日志查看器中。X-B3-TraceId
TraceIdExtractor
由于日志记录的设置方式,其中定义的 GCP 项目 ID 和凭据将被忽略。 相反,您应该将 andenvironment 变量分别设置为项目 ID 和凭据私钥位置。 如果您使用的是Google Cloud SDK,则可以分别使用 and命令轻松完成此操作。 |
11.1. 网页 MVC 拦截器
为了在基于 Web MVC 的应用程序中使用,提供了使用 aa 从 HTTP 请求中提取请求跟踪 ID 并将其存储在线程本地中,然后可以在日志记录追加器中使用该线程将跟踪 ID 元数据添加到日志消息中。TraceIdLoggingWebMvcInterceptor
TraceIdExtractor
如果启用了 Spring Cloud GCP 跟踪,日志记录模块将禁用自身并将日志关联委托给 Spring Cloud 侦探。 |
LoggingWebMvcConfigurer
还提供了配置类来帮助注册Spring MVC应用程序。TraceIdLoggingWebMvcInterceptor
托管在 Google Cloud Platform 上的应用程序在标头下包含跟踪 ID,这些 ID 将包含在日志条目中。 但是,如果使用侦探,将从 MDC 中选取跟踪 ID。x-cloud-trace-context
11.2. 登录支持
目前,仅支持 Logback,并且有 2 种可能性可以通过此库和 Logback 登录到云日志记录:通过直接 API 调用和通过 JSON 格式的控制台日志。
11.2.1. 通过 API 记录
可以使用云日志记录追加器。 此追加程序从 JUL 或 Logback 日志条目构建云日志记录日志条目,向其添加跟踪 ID 并将其发送到云日志记录。com/google/cloud/spring/logging/logback-appender.xml
STACKDRIVER_LOG_NAME
和环境变量可用于自定义追加器。STACKDRIVER_LOG_FLUSH_LEVEL
STACKDRIVER
然后,您的配置可能如下所示:
<configuration>
<include resource="com/google/cloud/spring/logging/logback-appender.xml" />
<root level="INFO">
<appender-ref ref="STACKDRIVER" />
</root>
</configuration>
如果要对日志输出进行更多控制,可以进一步配置追加器。 以下属性可用(有关完整列表,请参阅java-logging-logback 项目):
财产 | 默认值 | 描述 |
| | 云日志记录日志名称。 这也可以通过环境变量进行设置。 |
| | 如果遇到具有此级别的日志条目,请触发将本地缓冲日志刷新到云日志记录。 这也可以通过环境变量进行设置。 |
| 用于自定义日志记录条目的完全限定类名;必须实施。 | |
| 完全限定的类名,用于自定义给定ILoggingEvent 的日志记录条目;必须实施。 |
11.2.2. 异步日志记录
如果要将日志异步发送到云日志记录,可以使用。AsyncAppender
然后,您的配置可能如下所示:
<configuration>
<include resource="com/google/cloud/spring/logging/logback-appender.xml" />
<appender name="ASYNC_STACKDRIVER" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="STACKDRIVER" />
</appender>
<root level="INFO">
<appender-ref ref="ASYNC_STACKDRIVER" />
</root>
</configuration>
11.2.3. 通过控制台登录
对于 Logback,afile 可用于导入,以便更轻松地配置 JSON Logback 追加程序。com/google/cloud/spring/logging/logback-json-appender.xml
然后,您的配置可能如下所示:
<configuration>
<include resource="com/google/cloud/spring/logging/logback-json-appender.xml" />
<root level="INFO">
<appender-ref ref="CONSOLE_JSON" />
</root>
</configuration>
如果您的应用程序在 Google Kubernetes Engine、Google Compute Engine 或 Google App Engine Flexible 上运行,您的控制台日志记录会自动保存到 Google Cloud Logging。 因此,您可以只在日志记录配置中包含将 JSON 条目记录到控制台。 将正确设置跟踪 ID。com/google/cloud/spring/logging/logback-json-appender.xml
如果要对日志输出进行更多控制,可以进一步配置追加器。 以下属性可用:
财产 | 默认值 | 描述 |
| 如果未设置,则按以下顺序确定默认值:
| 这用于生成完全限定的云跟踪 ID 格式: 需要此格式来关联云跟踪和云日志记录之间的跟踪。 ifis 未设置且无法确定,则它将在没有完全限定格式的情况下进行日志。 |
| | 用于检索跟踪 ID 的 MDC 字段名称 |
| | 用于检索范围 ID 的 MDC 字段名称 |
| | 是否应包含跟踪 ID |
| | 是否应包含跨度 ID |
| | 是否应包括严重性 |
| | 是否应包含线程名称 |
| | 是否应包括所有 MDC 属性。 由Spring Sdetecth提供的MDC属性将被排除在外,因为它们被单独处理 |
| | 是否应包括记录器的名称 |
| | 是否应包含格式化的日志消息。 |
| | 是否应将堆栈跟踪附加到格式化的日志消息中。 此设置仅计算 ifis |
| | 是否应包含日志记录上下文 |
| | 是否应包含带有空白占位符的日志消息 |
| | 堆栈跟踪是否应作为自己的字段包含在内 |
| 没有 | 定义堆栈驱动程序服务上下文数据(服务和版本)。这样,您就可以在 Google Cloud 错误报告视图中过滤服务和版本的错误报告。 |
| 没有 | 定义自定义 JSON 数据。数据将添加到 json 输出中。 |
| 没有 | 实现的类的名称修改 JSON 日志记录输出。此标记是可重复的。 扩展包中提供了示例。 - 日志增强剂 |
这是此类 Logback 配置的示例:
<configuration >
<property name="projectId" value="${projectId:-${GOOGLE_CLOUD_PROJECT}}"/>
<appender name="CONSOLE_JSON" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="com.google.cloud.spring.logging.StackdriverJsonLayout">
<projectId>${projectId}</projectId>
<!--<traceIdMdcField>traceId</traceIdMdcField>-->
<!--<spanIdMdcField>spanId</spanIdMdcField>-->
<!--<includeTraceId>true</includeTraceId>-->
<!--<includeSpanId>true</includeSpanId>-->
<!--<includeLevel>true</includeLevel>-->
<!--<includeThreadName>true</includeThreadName>-->
<!--<includeMDC>true</includeMDC>-->
<!--<includeLoggerName>true</includeLoggerName>-->
<!--<includeFormattedMessage>true</includeFormattedMessage>-->
<!--<includeExceptionInMessage>true</includeExceptionInMessage>-->
<!--<includeContextName>true</includeContextName>-->
<!--<includeMessage>false</includeMessage>-->
<!--<includeException>false</includeException>-->
<!--<serviceContext>
<service>service-name</service>
<version>service-version</version>
</serviceContext>-->
<!--<customJson>{"custom-key": "custom-value"}</customJson>-->
<!--<loggingEventEnhancer>your.package.YourLoggingEventEnhancer</loggingEventEnhancer> -->
</layout>
</encoder>
</appender>
</configuration>
12. 云监控
Google Cloud Platform提供一项名为Cloud Monitoring的服务,Micrometer可以与它一起使用,以轻松检测Spring Boot应用程序的可观察性。
Spring Boot 已经为云监控提供了自动配置。 该模块可实现与的自动检测。 此外,它可以定制。project-id
credentials
Maven 坐标,使用Spring Cloud GCP BOM:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-metrics</artifactId>
</dependency>
格拉德尔坐标:
dependencies {
implementation("com.google.cloud:spring-cloud-gcp-starter-metrics")
}
您必须从 Google 云控制台启用云监控 API 才能捕获指标。 导航到项目的云监视 API,并确保它已启用。
用于云监控的 Spring 启动启动器使用千分尺。
12.1. 配置
所有配置都是可选的:
名字 | 描述 | 必填 | 默认值 |
| 自动配置千分尺以将指标发送到云监控。 | 不 | |
| 覆盖来自 Spring Cloud GCP 模块的项目 ID | 不 | |
| 覆盖来自 Spring Cloud GCP 模块的凭据位置 | 不 | |
| 覆盖来自 Spring Cloud GCP 模块的凭据编码密钥 | 不 | |
| 覆盖来自 Spring Cloud GCP 模块的凭据范围 | 不 |
您可以使用核心 Spring 引导执行器属性来控制报告频率等。 阅读Spring 引导执行器文档,了解有关堆栈驱动程序执行器配置的更多信息。
12.1.1. 指标消歧
默认情况下,/the不会向指标添加任何特定于应用程序/Pod 的标签, 因此,谷歌无法区分多个指标来源。 这可能会导致应用程序日志中出现以下警告:spring-cloud-gcp-starter-metrics
StackdriverMeterRegistry
2022-08-15 10:26:00.248 WARN 1 --- [trics-publisher] i.m.s.StackdriverMeterRegistry : failed to send metrics to Stackdriver
实际原因消息可能会有所不同:
One or more TimeSeries could not be written:
One or more points were written more frequently than the maximum sampling period configured for the metric.: global{} timeSeries[4]: custom.googleapis.com/process/uptime{};
One or more points were written more frequently than the maximum sampling period configured for the metric.: global{} timeSeries[6]: custom.googleapis.com/system/load/average/1m{};
One or more points ...
甚至:
Caused by: io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception: Header size exceeded max allowed size (10240)
(由于错误消息太长)
Google 会拒绝之前(从其他应用)以相同间隔接收的条目的指标更新。 为了避免这些冲突,为了区分应用程序/实例,您应该添加类似于以下内容的配置:
management:
metrics:
tags:
app: my-foobar-service
instance: ${random.uuid}
除了随机的 uuid,您还可以使用 pod id/主机名或其他一些实例 id。 在此处阅读有关自定义代码的更多信息。
标签:订阅,GCP,spring,gcp,Cloud,Spring,cloud From: https://blog.51cto.com/u_15326439/5901865