Writing Hive data to StarRocks via CSV-format Stream Load (simple version)

Date: 2024-05-16 18:41:21
Tags: STARROCKS, star, String, list, hive, static, streamload, new, append

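Note on escape characters in the strings: append "\n" (or "\t") directly when a real newline or tab is wanted. To emit a literal backslash, as in the null marker \N, write "\\N" in the Java source; the doubled backslash produces a plain backslash rather than an escape sequence.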
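The escaping rule can be checked in isolation. The following is a minimal sketch, not part of the original job: the class name `EscapeDemo` and the helper `toLine` are hypothetical, and the helper simply mirrors the row-formatting idea used in the listing (tab between fields, `\N` for null or empty values).

```java
import java.util.Arrays;
import java.util.List;

public class EscapeDemo {
    // Hypothetical helper: joins fields with a real tab character and
    // writes the two characters backslash + N for null or empty fields.
    public static String toLine(List<String> fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                sb.append("\t");          // "\t" is an escape: one tab character
            }
            String f = fields.get(i);
            if (f == null || f.isEmpty()) {
                sb.append("\\N");         // "\\N" is two characters: '\' and 'N'
            } else {
                sb.append(f);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String line = toLine(Arrays.asList("1", null, "3"));
        // Six characters: '1', tab, '\', 'N', tab, '3'
        System.out.println(line.length());
        System.out.println(line);
    }
}
```

Comparing with `isEmpty()` instead of `== ""` matters here, because `==` on strings compares references and may miss empty values read from a table.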
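import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class GcyDataTrans {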
    private static String STARROCKS_HOST = "IP";
    private static String STARROCKS_HTTP_PORT = "8030";
    private static String STARROCKS_DB = "test";
    private static String STARROCKS_TABLE = "test";
    private static String STARROCKS_USER = "test";
    private static String STARROCKS_PASSWORD = "test12345";
//        String starUtl = "http://IP:8030/api/ic_pms_ods/test_pms_ods_bd_staff_ds_s/_stream_load";


    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("Invalid arguments: expected <hiveTable> <starrocksDb.starrocksTable>");
            return;
        }
        String hTableName = args[0];
        String srTableName = args[1].split("\\.")[1];
        dealData(hTableName, srTableName);
//        StringBuilder list = new StringBuilder();
//        list.append("1");
//        list.append("2\n");
//        list.append("3\t");
//        list.append("\\N");
//        list.append("4");
//        System.out.println(list.toString());
    }

    public static void dealData(String hTableName, String srTableName) {
        SparkSession spark = SparkSession.builder().appName("gcySync").master("yarn").enableHiveSupport().getOrCreate();
        Dataset<Row> table =
                spark.read().table(hTableName);
        table.show(10);

        table.map(new MapFunction<Row, String>() {
                    @Override
                    public String call(Row value) throws Exception {
                        // Build one tab-separated line per row; null or empty fields become \N.
                        StringBuilder line = new StringBuilder();
                        for (int i = 0; i < value.length(); i++) {
                            if (i > 0) {
                                line.append("\t");
                            }
                            // isNullAt/get work for any column type, whereas getString
                            // only works for string columns.
                            String field = value.isNullAt(i) ? null : String.valueOf(value.get(i));
                            // Use isEmpty() rather than == "", which compares references.
                            if (field == null || field.isEmpty()) {
                                line.append("\\N");
                            } else {
                                line.append(field);
                            }
                        }
                        return line.toString();
                    }
                }, Encoders.STRING())
                .mapPartitions(new MapPartitionsFunction<String, String>() {
                    StringBuilder sb = new StringBuilder();

                    @Override
                    public Iterator<String> call(Iterator<String> input) throws Exception {
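                        while (input.hasNext()) {
                            sb.append(input.next()).append("\n");
                            // Flush once the buffer exceeds 500 characters.
                            if (sb.length() > 500) {
                                sendData(sb, srTableName);
                                sb.setLength(0);
                            }
                        }
                        // Flush the remaining rows; skip the request if nothing is buffered.
                        if (sb.length() > 0) {
                            System.out.println("send data to sr size:" + sb.length());
                            sendData(sb, srTableName);
                            sb.setLength(0);
                        }
                        // The input iterator is exhausted here, so this stage emits no rows.
                        return input;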
                    }
                }, Encoders.STRING())
                .show(2);

        spark.stop();
    }


    private static void sendData(StringBuilder content, String srTableName) throws Exception {
        String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                STARROCKS_HOST,
                STARROCKS_HTTP_PORT,
                STARROCKS_DB,
                srTableName);

        final HttpClientBuilder httpClientBuilder = HttpClients
                .custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });

        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(content.toString(), "UTF-8");
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(STARROCKS_USER, STARROCKS_PASSWORD));
            // The label header is optional. Setting one lets StarRocks reject a second
            // load with the same label, which gives at-most-once semantics.
//            put.setHeader("label", "39c25a5c-7000-496e-a98e-348a264c81de");
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                // A 200 status code only shows that the StarRocks BE service responded,
                // not that the Stream Load succeeded; check the response body for the load result.
                if (statusCode != 200) {
                    throw new IOException(
                            String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
                }

                System.out.println(loadResult);
            }
        }
    }

    private static String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
    }
}
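
As the comment in `sendData` points out, an HTTP 200 does not by itself mean the load succeeded; the response body has to be inspected. The following is a minimal sketch of such a check, assuming the body is JSON with a top-level `"Status"` field whose value is `"Success"` for a successful load. The class `StreamLoadStatus`, its methods, and the sample bodies in `main` are hypothetical, and a regular expression is used only to keep the example dependency-free; a real JSON parser would be more robust.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StreamLoadStatus {
    // Matches "Status": "<value>" in a JSON response body (assumed field name).
    private static final Pattern STATUS =
            Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the value of the Status field, or null if the field is absent.
    public static String extractStatus(String loadResult) {
        Matcher m = STATUS.matcher(loadResult);
        return m.find() ? m.group(1) : null;
    }

    // True only when the Status field is exactly "Success".
    public static boolean isSuccess(String loadResult) {
        return "Success".equals(extractStatus(loadResult));
    }

    public static void main(String[] args) {
        // Hypothetical response bodies, for illustration only.
        String ok = "{\"TxnId\": 1, \"Status\": \"Success\", \"Message\": \"OK\"}";
        String bad = "{\"Status\": \"Fail\", \"Message\": \"too many filtered rows\"}";
        System.out.println(isSuccess(ok));   // prints true
        System.out.println(isSuccess(bad));  // prints false
    }
}
```

A call such as `isSuccess(loadResult)` could be placed after the status-code check in `sendData`, throwing an `IOException` when it returns false.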


From: https://www.cnblogs.com/hbym/p/18196505
