首页 > 其他分享 >springboot 使用 doris-streamloader 到doris 防止批量更新 事务卡主

springboot 使用 doris-streamloader 到doris 防止批量更新 事务卡主

时间:2024-06-18 18:09:49浏览次数:10  
标签:http streamloader private org apache import 卡主 doris String

背景:

使用mybatis 批量实时和更新doris时 经常出现连接不上的错误,导致kafka死信队列堆积很多滞后消费

https://doris.apache.org/zh-CN/docs/2.0/ecosystem/doris-streamloader/

package com.jiaoda.sentiment.data.etl.service.update;

import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jiaoda.sentiment.data.etl.service.biz.DwdPublicOpinionDataService;
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import sun.misc.BASE64Encoder;

import javax.annotation.PostConstruct;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static java.util.jar.Pack200.Unpacker.TRUE;

/**
 * @author by jerryjhhe
 * @description todo
 * @create 2024/5/22 13:41
 */
@Service
@Log4j2
public class DorisStreamLoadClient {


    @Value("${spring.datasource.dynamic.datasource.master.url}")
    private String dorisIP;

    private final String user = "root";

    private final String password = "";

    private final String credentials = user + ":" + password;
    BASE64Encoder encoder = new BASE64Encoder();
    //通过BASE64对账号密码加密
    String ticket = encoder.encode(credentials.getBytes());
    private final static String DATABASE = "analysis";   // 要导入的数据库
    private final static String TABLE = "dwd_public_opinion_data";     // 要导入的表
   /* private final  String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            dorisIP, 8030, DATABASE, TABLE);*/
    private String urlTemplateContent = "http://{}:8030/api/{}/{}/_stream_load";

    private final CloseableHttpClient client = httpClientBuilder.build();


    @PostConstruct
    public void init() {
        dorisIP = dorisIP.split(":")[2].replace("//", "");
        log.info("DorisStreamLoadClient doris ip :{}", dorisIP);
    }

    private final static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // 如果连接目标是 FE,则需要处理 307 redirect。
                    return true;
                }
            });

    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }

    public StreamLoadResult putData(File file, String db, String table) throws IOException {
        String loadUrl = CharSequenceUtil.format(urlTemplateContent, dorisIP, db, table);

        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user,password));

            // 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
            put.setHeader("label", "label_" + StrUtil.uuid());
            put.setHeader("format", "json");
            put.setHeader("Content-Type", ContentType.APPLICATION_JSON.toString());
            put.setHeader("strip_outer_array", TRUE);
            // Array 中的每行数据的字段顺序完全一致。Doris 仅会根据第一行的字段顺序做解析,然后以下标的形式访问之后的数据。该方式可以提升 3-5X 的导入效率。
            put.setHeader("fuzzy_parse", TRUE);

//            put.setHeader("jsonpaths","[\"$.siteid\",\"$.username\"]");
//            put.setHeader("columns","siteid,username,doris_update_time=current_timestamp()");

            FileEntity entity = new FileEntity(file);
            put.setEntity(entity);
            System.out.print(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                response.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user,password));


                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }

                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    throw new IOException(
                            String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }

               log.info("Get load result: {}" , loadResult);

                return JSON.parseObject(loadResult,StreamLoadResult.class);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        DorisStreamLoadClient dorisStreamLoadClient = new DorisStreamLoadClient();
        StreamLoadResult streamLoadResult = dorisStreamLoadClient.putData(new File("C:\\home\\doris_stream_load\\update_dwdPublicOpinionData.csv"), "analysis", "dwd_public_opinion_data");
        System.out.println(streamLoadResult);
    }

    @Data
    public static class StreamLoadResult {
        private long Txnid;
        private String Label;
        private String Comment;
        private boolean TwoPhaseCommit;
        private String Status;
        private String Message;
        private long numberTotalRows;
        private long numberLoadedRows;
        private long numberFilteredRows;
        private long numberUnselectedRows;
        private long loadBytes;
        private long loadTimeMs;
        private long beginTxnTimeMs;
        private long streamLoadPutTimeMs;
        private long readDataTimeMs;
        private long writeDataTimeMs;
        private long commitAndPublishTimeMs;
    }
}

 使用:


public void test(Object dwdPublicOpinionDataList){

try {
String jsonString = JSON.toJSONString(dwdPublicOpinionDataList);

FileUtils.write(new File(path), jsonString, "utf-8", true);
DorisStreamLoadClient.StreamLoadResult streamLoadResult = dorisStreamLoadClient.putData(new File(path), "analysis", "dwd_public_opinion_data");
if ("Success".equals(streamLoadResult.getStatus())) {
//成功后的逻辑
}

} catch (
IOException e) {
log.error("dorisStreamLoadClient{}失败 :{}", path, e);
} finally {
try {
FileUtils.delete(new File(path));
} catch (IOException e) {
log.error("删除{}失败 :{}", path, e);
return;
}
}

  

标签:http,streamloader,private,org,apache,import,卡主,doris,String
From: https://www.cnblogs.com/hejunhong/p/18254879

相关文章

  • Doris开发手记5:一场链接引发“吊诡”的性能问题
    近期正在对Doris的性能问题展开排查,发现了一个极为“吊诡”的函数执行性能问题。经过一系列的CPU热点代码分析之后,发现“罪魁祸首”居然是libtoolchain中的静态库导致的。借用本篇手记记录下问题的发现,希望记录下一些对于C/C++程序链接问题的分析思路,也希望读者也能有所收获。......
  • 使用spark-sql处理Doris大表关联
    背景最近项目上有一个需求,需要将两张表(A表和B表)的数据进行关联并回写入其中一张表(A表),两张表都是分区表,但是关联条件不包括分区字段。分析过程方案一最朴素的想法,直接关联执行,全表关联,一条SQL搞定全部逻辑。想法越简单,执行越困难。由于数据量大,服务器规模较小,尽管各台服务......
  • doris配置自动拉起服务-supervisor
    服务自动拉起本文档主要介绍如何配置Doris集群的自动拉起,保证生产环境中出现特殊情况导致服务宕机后未及时拉起服务从而影响到业务的正常运行。Doris集群必须完全搭建完成后再配置FE和BE的自动拉起服务。Supervisor配置Doris服务​Supervisor具体使用以及参数解......
  • datax 抽取hive表到doris
    datax读取hive表有两种方式,一种是读取hdfs文件路径HDFSReader,因为hive是存储在hdfs上。第二种是读取hive表RDBMSReader。HDFSReader{"job":{"setting":{"speed":{"channel":3},"......
  • 使用benthos 实现stream load入库到doris
    下面给出yaml配置,只有input和output,中间可以自定义数据转换pipeline当前的数据从kafka中取出来就是json格式,所以不需要进行处理转换,输出段使用http_client组件,配置批处理提高吞吐量input:broker:copies:9inputs:-kafka:addresses:......
  • Apache DorisDB 线上部署
    ApacheDorisDB线上部署一、机器资源(初始)机器IPHostname内存CPU磁盘172.16.203.151dorisdb203-15116g4核500G172.16.203.152dorisdb203-15216g4核500G172.16.203.153dorisdb203-15316g4核500G二、角色分配机器IP角色172.16.203.15......
  • Doris:数据导入导出
    数据导入导入(Load)功能就是将用户的原始数据导入到Doris中。导入成功后,用户即可通过Mysql客户端查询数据。为适配不同的数据导入需求,Doris系统提供了6种不同的导入方式(Broker、Stream、Insert、Multi、Routine、S3)。每种导入方式支持不同的数据源,存在不同的使用方式(异步,......
  • Doris:概念与基础操作
    Doris一款现代化的MPP分析性数据库产品支持亚秒级响应支持10PB以上数据集兼容MySQL协议基础概念doris有3种基础表:明细表(Duplicate):一张普普通通的表,doris默认表模式,支持数据预排序主键表(Unique):一种特殊的聚合表,如果主键重复,会自动更新其他值聚合表(Aggregate):聚合模......
  • 首次尝试SeaTunnel同步Doris至Hive?这些坑你不能不避
    笔者使用SeaTunnel2.3.2版本将Doris数据同步到Hive(cdh-6.3.2)首次运行时有如下报错,并附上报错的解决方案:java.lang.NoClassDefFoundError:org/apache/hadoop/hive/metastore/api/MetaExceptionjava.lang.NoClassDefFoundError:org/apache/thrift/TBasejava.lang.NoClassDe......
  • Doris、StarRocks 压测对比
    先说结论:0、本次测试,未调优二者的参数,开箱起服务,直接测试的,部署架构一致。1、在单表查询下,StarRocks在部分场景下优于Doris,但是二次查询,二者不分伯仲。2、在多表查询下,仅在一个场景下Doris速度逊于StarRocks,大部分场景是Doris优于StarRocks的。3、在cpu和内存的使用上,dori......