1 部署一个Knative事件驱动应用
通过以下9个步骤完成一个完整的Knative事件驱动应用的部署与验证。
1)下载官方示例代码:
$ git clone-b "release-0.16" https://github.com/knative/docs knative-docs
2)创建镜像,{username}需要替换为自己的Docker Hub的用户名:
$ cd knative-docs/docs/eventing/samples/helloworld/helloworld-go
$ docker build-t {username}/helloworld-go.
$ docker push {username}/helloworld-go
3)创建应用配置。以下是配置示例文件sample-app.yaml中的内容:
$ vim sample-app.yaml
# 命名空间启用Eventing Injection
apiVersion: v1
kind: Namespace
metadata:
name: knative-samples
labels:
knative-eventing-injection: enabled
---
# 部署Helloworld-go应用
apiVersion: apps/v1
kind: Deployment
metadata:
name: helloworld-go
namespace: knative-samples
spec:
replicas: 1
selector:
matchLabels: &labels
app: helloworld-go
template:
metadata:
labels: *labels
spec:
containers:
- name: helloworld-go
image: docker.io/{username}/helloworld-go
---
# 创建Service,Trigger可以通过Service来确定subscriber
kind: Service
apiVersion: v1
metadata:
name: helloworld-go
namespace: knative-samples
spec:
selector:
app: helloworld-go
ports:
- protocol: TCP
port: 80
targetPort: 8080
---
# 创建Trigger
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: helloworld-go
namespace: knative-samples
spec:
broker: default
filter:
attributes:
type: dev.knative.samples.helloworld
source: dev.knative.samples/helloworldsource
subscriber:
ref:
apiVersion: v1
kind: Service
name: helloworld-go
注意:Trigger中的订阅者可以是Kubernetes的Service,也可以是Knative的Service,这取决于apiVersion定义的值。这体现了Knative的松散组合、不绑定的设计原则。在这个例子中,我们使用的是Kubernetes的Service。
4)部署应用程序:
$ kubectl apply-f sample-app.yaml
5)验证knative-eventing-injection标签是否正常启用:
$ kubectl get ns knative-samples--show-labels
NAME STATUS AGE LABELS
knative-samples Active 12m knative-eventing-injection=enabled
注意:当某命名空间中开启knative-eventing-injection=enabled标签后,Knative Eventing Controller会自动在此命名空间创建代理(Broker)。
6)验证资源运行状态:
$ kubectl-n knative-samples get deploy helloworld-go
$ kubectl-n knative-samples get svc helloworld-go
$ kubectl-n knative-samples get trigger helloworld-go
7)获取Broker URL:
$ kubectl--namespace knative-samples get broker default
NAME READY REASON URL AGE
default True http://broker-ingress.knative-eventing.svc. 2d
cluster.local/knative-samples/default
8)按照CloudEvent规范发送HTTP请求到Broker。
可以按照CloudEvent规范创建一个HTTP请求发送给Broker,再通过Trigger传递给已订阅的应用,然后验证此Web应用程序(helloworld-go)对事件的接收与响应
# 创建一个带有curl命令的Pod
$ kubectl-n knative-samples run curl--image=radial/busyboxplus:curl-it--
generator=run-pod/v1
# 通过curl命令向Broker发起请求
[ root@curl:/ ]$ curl-v "broker-ingress.knative-eventing.svc.cluster.local/
knative-samples/default" \
-X POST \
-H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \
-H "Ce-specversion: 1.0" \
-H "Ce-Type: dev.knative.samples.helloworld" \
-H "Ce-Source: dev.knative.samples/helloworldsource" \
-H "Content-Type: application/json" \
-d '{"msg":"Hello World from the curl pod."}'
9)验证helloworld-go应用的事件接收情况。
查看helloworld-go应用的日志信息来验证事件是否已经接收。
$ kubectl--namespace knative-samples logs-l app=helloworld-go--tail=50
2020/08/16 11:42:22 Event received. Context: Context Attributes,
specversion: 1.0
type: dev.knative.samples.helloworld
source: dev.knative.samples/helloworldsource
id: 536808d3-88be-4077-9d7a-a3f162705f79
time: 2020-08-16T11:42:22.813410631Z
datacontenttype: application/json
Extensions,
knativearrivaltime: 2020-08-16T11:42:22.813310554Z
knativehistory: default-kne-trigger-kn-channel.knative-samples.svc.cluster.local
traceparent: 00-59756cc2d11f022c86ff67cd9e33f61f-50f538cf949e0c44-00
2020/08/16 11:42:22 Hello World Message from received event "Hello World from
the curl pod."
2020/08/16 11:42:22 Responded with event Validation: valid
Context Attributes,
specversion: 1.0 # CloudEvent规范的版本
type: dev.knative.samples.hifromknative # 事件类型
source: knative/eventing/samples/hello-world # 事件源
id: 5e42b46b-9629-4b73-8fc6-e01ec06b8d83 # 事件ID
Data, # 事件的内容
{"msg":"Hi from helloworld-go app!"}
2 使用通道与订阅方式传递事件
如果想要有多个事件接收器,每个接收器带有各自的服务来响应事件,可以采用通道与订阅方式来实现。使用通道与订阅的方式可以实现将事件的生产者和消费者解耦。
通道是一个事件转发和持久层,每个通道是一个独立的Kubernetes定制资源。通道的后端可能是通过Apache Kafka或InMemoryChannel实现的。订阅是将服务注册监听到特定通道的方法。
接下来,通过范例来演示使用通道与订阅的方式处理事件。
1)创建一个通道:
apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
name: eventing-channel
将上面的配置文本保存成channel.yaml文件,然后执行以下命令。
kubectl apply-f channel.yaml
2)创建一个事件源,将事件发送到通道:
apiVersion: sources.knative.dev/v1alpha2
kind: PingSource
metadata:
name: channel-ping-source
spec:
schedule: "*/2 * * * *"
jsonData: '{"message": "Hello world!"}'
sink:
ref:
apiVersion: messaging.knative.dev/v1
kind: Channel
name: eventing-channel
将上面的配置文本保存成ping-source.yaml文件,然后执行以下命令:
kubectl apply-f ping-source.yaml
3)创建接收事件的服务:
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: event-display
spec:
template:
spec:
containers:
- image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/
event_display
将上面的配置文本保存成service.yaml文件,然后执行以下命令:
kubectl apply-f service.yaml
4)创建服务的订阅,即将服务订阅到制定的通道:
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
name: eventing-subscription
spec:
channel:
apiVersion: messaging.knative.dev/v1
kind: Channel
name: eventing-channel
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
将上面的配置文本保存成subscription.yaml文件,然后执行以下命令:
kubectl apply-f subscription.yaml
3 与Kafka集成
1)安装Apache Kafka:
# curl-L "https://knative.dev/v0.16-docs/eventing/samples/kafka/kafka_setup.sh" | sh-
2)创建事件显示服务(event-display.yaml):
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: event-display
namespace: default
spec:
template:
spec:
containers:
- # This corresponds to
# https://github.com/knative/eventing-contrib/tree/master/cmd/event_
display/main.go
image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/
event_display
执行以下命令部署event-display服务:
# kubectl apply--filename event-display.yaml
3)创建Kafka Topic(strimzi-topic.yaml):
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
name: knative-demo-topic
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 3
replicas: 1
config:
retention.ms: 7200000
segment.bytes: 1073741824
执行以下命令部署KafkaTopic:
# kubectl apply-f strimzi-topic.yaml
4)创建Kafka Source到服务:
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # 注意kafka的命名空间
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
可以根据实际情况修改bootstrapServers、topics字段值。
部署事件源:
# kubectl apply-f event-source.yaml
生产一个消息{"msg":"This is a test!"}发送到Kafka topic,命令如下:
# kubectl-n kafka run kafka-producer-ti--image=strimzi/kafka:0.14.0-
kafka-2.3.0--rm=true--restart=Never-- bin/kafka-console-producer.sh--
broker-list my-cluster-kafka-bootstrap:9092--topic knative-demo-topic
If you don't see a command prompt, try pressing enter.
>{"msg": "This is a test!"}
检查event-display容器的日志输出:
5)创建KafkaChannel和Broker。
创建KafkaChannel示例:
cat <<-EOF | kubectl apply-f-
---
apiVersion: messaging.knative.dev/v1alpha1
kind: KafkaChannel
metadata:
name: my-kafka-channel
spec:
numPartitions: 3
replicationFactor: 1
EOF
指定默认的Channel配置:
cat <<-EOF | kubectl apply-f-
---
apiVersion: v1
kind: ConfigMap
metadata:
name: default-ch-webhook
namespace: knative-eventing
data:
# Configuration for defaulting channels that do not specify CRD implementations.
default-ch-config: |
clusterDefault:
apiVersion: messaging.knative.dev/v1alpha1
kind: KafkaChannel
spec:
numPartitions: 3
replicationFactor: 1
EOF
4 ContainerSource事件源
ContainerSource事件源是应用开发中最常用的一种事件通信方式。通常,ContainerSource事件源代表特定业务含义的事件。使用ContainerSource创建新的事件源时,需要创建一个容器镜像,然后使用容器镜像的URI和特定参数值进行创建。该容器镜像将在特定业务场景下生成事件并将消息发送到接收器的URI。这也是在Knative中支持自定义事件源的简便方法。以下展示了如何将ContainerSource配置为函数的事件源,并概述创建ContainerSource自定义事件源的准则。
创建heartbeats容器镜像
假设运行环境中已经安装有ko部署工具和golang开发工具。使用Knative的eventing-contrib项目中heartbeats事件源。heartbeats容器每1秒将发送一个CloudEvent事件。
1)克隆项目源代码:
# mkdir-p ${GOPATH}/src/knative.dev
# cd ${GOPATH}/src/knative.dev
# git clone-b "release-0.15" https://github.com/knative/eventing-contrib.git
2)构建heartbeats容器镜像并发布到容器镜像仓库:
# export KO_DOCKER_REPO="docker.io/cnlab" ##容器镜像仓库的地址可以修改成自己的仓库地址
# ko publish knative.dev/eventing-contrib/cmd/heartbeats
3)创建Knative Service event-display。
为了验证ContainerSource是否正常运行,我们将创建一个事件显示event-display服务。该服务将收到的消息转存到日志中。event-display服务的配置文件service.yaml示例如下:
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: event-display
spec:
template:
spec:
containers:
- image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/
event_display
使用以下命令创建event-display服务:
kubectl apply-f service.yaml
4)使用heartbeats容器镜像创建ContainerSource:
为了将heartbeats容器作为事件源运行,必须创建具有特定参数和环境设置的ContainerSource。这里要将.spec.template.spec.containers[0].image设置为第1步生成的镜像URL。
apiVersion: sources.knative.dev/v1alpha2
kind: ContainerSource
metadata:
name: test-heartbeats
spec:
template:
spec:
containers:
- image: cnlab/heartbeats-007104604b758f52b70a5535e662802b #事件源容器
name: heartbeats
args:
---period=1
env:
- name: POD_NAME
value: "mypod"
- name: POD_NAMESPACE
value: "event-test"
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
5)验证结果:
通过查看event-display服务的日志,验证消息已发送到Knative事件系统。
kubectl logs-l serving.knative.dev/service=event-display-c user-container--since=10m
以下是相应的日志行,显示heartbeatsg事件源发送给event-display函数事件消息的请求头和消息体内容:
标签:name,helloworld,dev,事件驱动,Knative,samples,组件,event,knative From: https://www.cnblogs.com/muzinan110/p/17067020.html