下面给出 YAML 配置:只包含 input 和 output 两部分,中间可以按需加入自定义的数据转换 pipeline。
当前数据从 Kafka 中取出来时已经是 JSON 格式,因此不需要再做处理转换;输出端使用 http_client 组件,并配置批处理以提高吞吐量。
# Input side: a broker that runs 9 copies of the same Kafka consumer.
# NOTE(review): the nesting was reconstructed from a listing whose indentation
# had been flattened; confirm the placement of `batching` and `processors`
# against the config that actually runs.
input:
  broker:
    copies: 9
    inputs:
      - kafka:
          addresses:
            - 222.222.222.5:9092
          topics:
            - test_stream_load
          consumer_group: abc_live
          # Quoted so the version is always read as a string.
          target_version: '1.1.0'
          checkpoint_limit: 10000
          # Batch policy for consumed messages: 5000 messages / 5s period.
          batching:
            count: 5000
            period: 5s
        processors:
          # Debug-log the raw payload of each consumed message.
          - log:
              level: DEBUG
              message: "kafka read: *****${! content().string()}*********"
# Config fields, showing default values
# Common config fields, showing default values
# Output side: a round-robin broker over three http_client outputs (each
# child is additionally replicated by `copies: 3`). Every output PUTs batches
# to the same `_stream_load` URL.
# NOTE(review): the nesting was reconstructed from a listing whose indentation
# had been flattened. `format` and `read_json_by_line` are placed under
# `headers` on the assumption that they are stream-load request headers rather
# than http_client fields; confirm.
# The Authorization value is redacted here; keep real credentials out of
# version control.
output:
  broker:
    copies: 3
    pattern: round_robin
    outputs:
      - http_client:
          url: http://222.222.222.5:8030/api/db/table/_stream_load
          verb: PUT
          headers:
            Content-Type: application/json
            # Connection: keep-alive
            Expect: 100-continue
            Authorization: 'Basic ******************'
            format: json
            # Quoted so the header value stays the string "true".
            read_json_by_line: "true"
          rate_limit: "" # No default (optional)
          timeout: 10s
          max_in_flight: 100
          batching:
            count: 5000
            byte_size: 0
            period: 5s
            check: ""
            processors:
              # Collapse the batch into one line-delimited payload.
              - archive:
                  format: lines

      - http_client:
          url: http://222.222.222.5:8030/api/db/table/_stream_load
          verb: PUT
          headers:
            Content-Type: application/json
            # Connection: keep-alive
            Expect: 100-continue
            Authorization: 'Basic ******************'
            format: json
            # Quoted so the header value stays the string "true".
            read_json_by_line: "true"
          rate_limit: "" # No default (optional)
          timeout: 10s
          max_in_flight: 100
          batching:
            count: 5000
            byte_size: 0
            period: 5s
            check: ""
            processors:
              # Collapse the batch into one line-delimited payload.
              - archive:
                  format: lines

      # Tuned variant: fewer requests in flight and a much larger batch count.
      - http_client:
          url: http://222.222.222.5:8030/api/db/table/_stream_load
          verb: PUT
          headers:
            Content-Type: application/json
            # Connection: keep-alive
            Expect: 100-continue
            Authorization: 'Basic ******************'
            format: json
            # Quoted so the header value stays the string "true".
            read_json_by_line: "true"
          rate_limit: "" # No default (optional)
          timeout: 10s
          max_in_flight: 20
          batching:
            # Optimization: committing too frequently causes BE publish
            # timeouts, hence the larger batch count.
            count: 100000
            byte_size: 0
            period: 5s
            check: ""
            processors:
              # Collapse the batch into one line-delimited payload.
              - archive:
                  format: lines
              # Optimization 1: attach a label to every batch; together with a
              # retry mechanism this is intended to give exactly-once loading.
              # NOTE(review): stream_label is only stored as metadata and
              # logged below; no request header in this config references it,
              # so it is not sent with the request. The timestamp also has
              # one-second resolution, so labels may repeat across batches.
              # Confirm the intended wiring before relying on it.
              - bloblang: 'meta stream_label = hostname()+now().ts_format("20060102-15:04:05")'
              - log:
                  level: DEBUG
                  message: '${! meta("stream_label")}'
标签:load,http,stream,format,json,100,doris
From: https://www.cnblogs.com/hamsure/p/18213381