核心概念
topology拓扑组成
storm分布式计算结构称为topology拓扑,由stream(数据流)、spout(数据流生成者)、bolt(运算)组成。
tuple
storm的核心数据结构是tuple,tuple中包含了一个或者多个键值对的列表。
Stream由无限制的tuple组成的序列。
spout
spout代表数据入口,充当采集器的角色,链接到数据源,将数据转化为一个个tuple,并将tuple作为数据流进行发射。
spout通常不会实现业务逻辑,所以spout可以作为多个topology的入口。
bolt
bolt可以理解为计算程序中的运算或者函数,将一个或者多个数据流作为输入,对数据实施运算后,选择性的输出一个或者多个数据流。
bolt可以订阅多个有spout或者其他bolt发射的数据流,这样可以简历一个数据流转换网络。
bolt作用:
- 过滤tuple
- 链接或者聚合操作
- 计算
- 数据库读写
使用
spout
继承BaseRichSpout抽象类
open:获取collector,collector用于发送数据
nextTuple:实际执行发送数据逻辑,collector.emit(new Values(xxxxxx))
declareOutputFields:定义发送出的字段名称,下游根据字段名称获取数据,例如outputFieldsDeclarer.declare(new Fields("sentence"))
bolt
继承BaseRichBolt
prepare:获取collector,collector用于获取数据(collector.getStringByField("sentence"))、发送数据collector.emit(new Values(xxxx))
execute:实际执行运算逻辑或者继续发送tuple到下游,例如collector.emit(new Values(xxxxxx))
declareOutputFields:定义发送到下游bolt的字段名称,下游根据字段名称获取数据,例如outputFieldsDeclarer.declare(new Fields("word"))
cleanup:storm终止一个bolt时会调用cleanup方法,cleanup不保证一定会调用到,如果集群出现问题可能存在未调用cleanup的情况。
topology编排
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spoutTest", new SentenceSpout(), 1);
topologyBuilder.setBolt("boltTest", new WordBolt()).shuffleGrouping("spoutTest", "sentence");
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("topologyTest", config, topologyBuilder.createTopology());
cluster.killTopology("topologyTest");
cluster.shutdown();