首页 > 其他分享 >使用benthos 实现stream load入库到doris

使用benthos 实现stream load入库到doris

时间:2024-05-26 10:23:34浏览次数:12  
标签:load http stream format json 100 doris

下面给出yaml配置,只有input和output,中间可以自定义数据转换pipeline
当前的数据从kafka中取出来就是json格式,所以不需要进行处理转换,输出段使用http_client组件,配置批处理提高吞吐量

input:
  broker:
    copies: 9
    inputs:
      - kafka:
          addresses:
            - 222.222.222.5:9092
          topics:
            - test_stream_load 
          consumer_group: abc_live
          target_version: 1.1.0
          checkpoint_limit: 10000
          batching: 
            count: 5000
            period: 5s
  processors:
    - log:
        level: debug
        message: "kafka read: *****${! content().string()}*********"

# Config fields, showing default values
# Common config fields, showing default values
output:
  broker:
    copies: 3
    pattern: round_robin
    outputs:
      - http_client:
          url: http://222.222.222.5:8030/api/db/table/_stream_load
          verb: PUT
          headers:
            Content-Type: application/json
            #Connection: keep-alive
            Expect: 100-continue
            Authorization: Basic ******************
            format: json
            read_json_by_line: true
          rate_limit: "" # No default (optional)
          timeout: 10s
          max_in_flight: 100
          batching:
            count: 5000
            byte_size: 0
            period: 5s
            check: ""
            processors:
              - archive:
                  format: lines
      - http_client:
          url: http://222.222.222.5:8030/api/db/table/_stream_load
          verb: PUT
          headers:
            Content-Type: application/json
            #Connection: keep-alive
            Expect: 100-continue
            Authorization: Basic ******************
            format: json
            read_json_by_line: true
          rate_limit: "" # No default (optional)
          timeout: 10s
          max_in_flight: 100
          batching:
            count: 5000
            byte_size: 0
            period: 5s
            check: ""
            processors:
              - archive:
                  format: lines
      - http_client:
          url: http://222.222.222.5:8030/api/db/table/_stream_load
          verb: PUT
          headers:
            Content-Type: application/json
            #Connection: keep-alive
            Expect: 100-continue
            Authorization: Basic ******************
            format: json
            read_json_by_line: true
          rate_limit: "" # No default (optional)
          timeout: 10s
          max_in_flight: 20
          batching:
            ##优化点:频率过高的提交,会导致be publish timeout
            count: 100000
            byte_size: 0
            period: 5s
            check: ""
            processors:
              - archive:
                  format: lines
              ##优化点:1.每一批次的消息增加一个label,加上重试机制,实现exactly-once
              - bloblang: meta stream_label = hostname()+now().ts_format("20060102-15:04:05")
              - log:
                  level: DEBUG
                  message: ${! meta("stream_label")}

标签:load,http,stream,format,json,100,doris
From: https://www.cnblogs.com/hamsure/p/18213381

相关文章

  • THREE.JS中 CubeTextureLoader 使用避坑
    最近在跟着教程学THREE.JS,毕竟在现在的前端开发市场上,THREE.JS太火爆了。今天学到“纹理”这一块的时候,跟着教程敲代码,发现自己的没有正确显示,百思不得其解,打开控制台发现,如下WARNING:localhost/:1[.WebGL-000060380A191C00]GL_INVALID_VALUE:Eachcubemapfacemusthavee......
  • Qt - Qt6中QTextStream类的setCodec方法没有了,怎么解决写中文文本乱码
    简介场景:程序在linux下运行,将中英文写入文本,将文本在windows上打开时,中文出现乱码 原Qt5中:QFilefile;file.open(QIODevice::WriteOnly|QIODevice::Text);QTextStreamtextStream(&file);textStream.setCodec("GBK");使用 QTextStream类的 setCodec方法即可解决上......
  • Redis Stream消息队列
    工具类部分内容packagecom.hwd.campus.common.redis.utils;importcom.hwd.campus.common.redis.constant.RedisKeyPrefixConst;importcom.hwd.campus.common.redis.service.RedisListSelect;importcom.hwd.campus.common.redis.service.RedisSelect;importlombok.AllA......
  • Apache DorisDB 线上部署
    ApacheDorisDB线上部署一、机器资源(初始)机器IPHostname内存CPU磁盘172.16.203.151dorisdb203-15116g4核500G172.16.203.152dorisdb203-15216g4核500G172.16.203.153dorisdb203-15316g4核500G二、角色分配机器IP角色172.16.203.15......
  • Doris:数据导入导出
    数据导入导入(Load)功能就是将用户的原始数据导入到Doris中。导入成功后,用户即可通过Mysql客户端查询数据。为适配不同的数据导入需求,Doris系统提供了6种不同的导入方式(Broker、Stream、Insert、Multi、Routine、S3)。每种导入方式支持不同的数据源,存在不同的使用方式(异步,......
  • stream流
    Stream流作用:结合lamada表达式,简化集合、数组的操作操作步骤1、获取Stream流单列集合⭐️Collection中的默认方法双列集合⭐️无法直接使用,需要配合keyset、entryset数组Arrays.stream(arr)零散数据需要同种数据类型Stream.of()2、中间方法filter......
  • 说说Loader和Plugin的区别?编写Loader,Plugin的思路?
    一、区别前面两节我们有提到Loader与Plugin对应的概念,先来回顾下loader是文件加载器,能够加载资源文件,并对这些文件进行一些处理,诸如编译、压缩等,最终一起打包到指定的文件中plugin赋予了webpack各种灵活的功能,例如打包优化、资源管理、环境变量注入等,目的是解决loader......
  • tensorflow.js示例笔记 - predict-download-time
    预测下载时间。<!DOCTYPEhtml><html><head><title>predict-download-time</title><style>canvas{border:1pxsolid#d3d3d3;}</style><sc......
  • Flask + React 框架 和 Streamlit 的比较
    使用Flask+React这种框架相对于Streamlit有以下优缺点:优点灵活性和扩展性:Flask+React:提供了高度的灵活性和扩展性。你可以根据需求使用各种前端和后端技术,构建复杂的应用程序,增加额外的功能模块,并进行微服务架构的扩展。Streamlit:主要用于快速构建数据应用,适用于较小规......
  • Stream流常用方法总结
    Stream流思想:先得到集合或者数组的Stream流(就是一根传送带);把元素放上去;然后就用这个Stream流简化的API来方便的操作元素。 Stream流的三类方法:1、获取Stream流:创建一条流水线,并把数据流放到流水线上准备进行操作;2、中间方法:流水线上的操作,一次操作完毕之后,还可以继续进行其......