首页 > 编程语言 >Storm-源码分析-Topology Submit-Client

Storm-源码分析-Topology Submit-Client

时间:2023-06-06 13:00:54浏览次数:49  
标签:storm args Submit name Client conf new 源码 topology


1 Storm Client

最开始使用storm命令来启动topology, 如下

storm jar storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.WordCountTopology

这个storm命令是用python实现的, 看看其中的jar函数, 很简单, 调用exec_storm_class, 其中jvmtype=”-client” 
而exec_storm_class其实就是拼出一条java执行命令, 然后用os.system(command)去执行, 为何用Python写, 简单? 可以直接使用storm命令? 
这儿的klass就是topology类, 所以java命令只是调用Topology类的main函数

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

Runs the main method of class with the specified arguments.
The storm jars and configs in ~/.storm are put on the classpath.
The process is configured so that StormSubmitter
(http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
will upload the jar at topology-jar-path when the topology is submitted.
"""
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        childopts="-Dstorm.jar=" + jarfile)

def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[]):
    nativepath = confvalue("java.library.path", extrajars)
    args_str = "".join(map(lambda s: "\"" + s + "\"", args))
    command = "java" + jvmtype + " -Dstorm.home=" + STORM_DIR + "" + get_config_opts() + " -Djava.library.path=" + nativepath + "" + childopts + " -cp" + get_classpath(extrajars) + "" + klass + "" + args_str
    print "Running:" + command
    os.system(command)



直接看看WordCountTopology例子的main函数都执行什么?

除了定义topology, 最终会调用StormSubmitter.submitTopology(args[0], conf, builder.createTopology()), 来提交topology

public static void main(String[] args) throws Exception {        
        TopologyBuilder builder = new TopologyBuilder();        
        builder.setSpout("spout", new RandomSentenceSpout(), 5);        
        builder.setBolt("split", new SplitSentence(), 8)
                 .shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12)
                 .fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);
      
        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);            
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {        
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());   
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }



 

StormSubmitter

直接看看submitTopology,

1. 配置参数

   把命令行参数放在stormConf, 从conf/storm.yaml读取配置参数到conf, 再把stormConf也put到conf, 可见命令行参数的优先级更高

   将stormConf转化为Json, 因为这个配置是要发送到服务器的

2. Submit Jar

    StormSubmitter的本质是个Thrift Client, 而Nimbus则是Thrift Server, 所以所有的操作都是通过Thrift RPC来完成, Thrift参考Thrift, Storm-源码分析- Thrift的使用

    先判断topologyNameExists, 通过Thrift client得到现在运行的topology的状况, 并check

    然后Submit Jar, 通过底下三步          
    client.getClient().beginFileUpload();

    client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));

    client.getClient().finishFileUpload(uploadLocation);

    把数据通过RPC发过去, 具体怎么存是nimbus自己的逻辑的事…

3. Submit Topology

    很简单只是简单的调用RPC

    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);



/**
     * Submits a topology to run on the cluster. A topology runs forever or until 
     * explicitly killed.
     *
     *
     * @param name the name of the storm.
     * @param stormConf the topology-specific configuration. See {@link Config}. 
     * @param topology the processing to execute.
     * @param options to manipulate the starting of the topology
     * @throws AlreadyAliveException if a topology with this name is already running
     * @throws InvalidTopologyException if an invalid topology was submitted
     */
    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        try {
            String serConf = JSONValue.toJSONString(stormConf);
            if(localNimbus!=null) {
                LOG.info("Submitting topology " + name + " in local mode");
                localNimbus.submitTopology(name, null, serConf, topology);
            } else {
                NimbusClient client = NimbusClient.getConfiguredClient(conf);
                if(topologyNameExists(conf, name)) {
                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                }
                submitJar(conf);
                try {
                    LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
                    if(opts!=null) {
                        client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);                    
                    } else {
                        // this is for backwards compatibility
                        client.getClient().submitTopology(name, submittedJar, serConf, topology);                                            
                    }
                } catch(InvalidTopologyException e) {
                    LOG.warn("Topology submission exception", e);
                    throw e;
                } catch(AlreadyAliveException e) {
                    LOG.warn("Topology already alive exception", e);
                    throw e;
                } finally {
                    client.close();
                }
            }
            LOG.info("Finished submitting topology: " +  name);
        } catch(TException e) {
            throw new RuntimeException(e);
        }
    }

标签:storm,args,Submit,name,Client,conf,new,源码,topology
From: https://blog.51cto.com/u_2650279/6424062

相关文章

  • iTOP-3588开发板Android12源码定制开发uboot开发
    uboot开发-Uboot源码是v2017.09版本。目前在该平台上已经支持RK所有主流在售芯片。支持的功能主要有:支持RKAndroid固件启动;支持AndroidAOSP固件启动;支持LinuxDistro固件启动;支持Rockchipminiloader和SPL/TPL两种Pre-loader引导;支持LVDS、EDP、MIP......
  • Tinyhttpd:源码分析【3】
    一、问题引入通过Tinyhttpd:运行测试【1】和抓包分析【2】,基本完成了对程序的功能测试和通信原理。此时可以进一步对源码进行分析,本文不考虑代码一行一行的分析,仅对关键部分代码解析。二、解决过程2-1main()函数主函数主要创建http的监听套接字,等待客户端的连接。一旦有新......
  • post请求方式 - 抖音生活服务 使用restTemplate而不使用httpClient
    publicstaticStringdoPostForJson(Stringurl,Stringjson,StringbyteAuthorization){RestTemplaterestTemplate=newRestTemplate();logger.info("restTemplateinvokepostmethod.url:[{}],json:[{}]",url,json);long......
  • Github--源码管理工具介绍
    源代码管理工具在实际软件开发中具有极其重要的作用。相比于相互拷贝源码,使用源代码管理工具更方便开发成员之间进行开发,且使用源码管理工具具有更高的保密性。在此,将对目前相对流行的源代码管理工具--Github进行简要介绍。Github作为源码管理工具,主要由两部分组成:本地数......
  • win10,vs2015深度学习目标检测YOLOV5+deepsort C++多目标跟踪代码实现,源码注释,拿来即
    int8,FP16等选择,而且拿来即用,自己再win10安装上驱动可以立即使用,不用在自己配置,支持答疑。自己辛苦整理的,求大佬打赏一顿饭钱。苦苦苦、平时比较比忙,自己后期会继续发布真实场景项目;欢迎下载。优点:1、架构清晰,yolov5和sort是分开单独写的,可以随意拆解拼接,都是对外接口。2、支持答疑......
  • 1.2 Java基础 数据类型(Integer源码解析)
    Java数据类型和Integer源码解析1.2.1基本数据类型1.2.2什么是拆装箱1.2.3拆装箱是如何实现的1.2.4Integer继承关系1.2.5Integer源码解析1.2.1基本数据类型       如大家所知,Java是一门面向对象的语言,但是java并非完全面向对象,Java中的数据类型分为了       ......
  • HttpClient的使用
    一般的情况下我们都是使用IE或者Navigator浏览器来访问一个WEB服务器,用来浏览页面查看信息或者提交一些数据等等。所访问的这些页面有的仅仅是一些普通的页面,有的需要用户登录后方可使用,或者需要认证以及是一些通过加密方式传输,例如HTTPS。目前我们使用的浏览器处理这些情况都不会......
  • Servlet源码
    顶级接口:Servlet:packagejavax.servlet;importjava.io.IOException;publicinterfaceServlet{publicvoidinit(ServletConfigconfig)throwsServletException;publicServletConfiggetServletConfig();publicvoidser......
  • 视频直播源码,动态合并element-ui el-table列和行
    视频直播源码,动态合并element-uiel-table列和行HTML: <template>  <div>    <el-table     :data="tableData"     show-summary     :span-method="arraySpanMethod"    style="width:100%">      <......
  • 【Netty底层数据交互源码】
    (文章目录)如何学习Netty的底层深入了解Netty的底层实现需要对JavaNIO、OSI模型、TCP/IP协议栈等底层网络知识有一定的了解。下面是一些建议,可以帮助你更深入地了解Netty的底层实现:学习JavaNIO:JavaNIO是Java中用于处理I/O操作的一套库。在深入了解Netty的底层实现时,你需要......