- 创建 maven 工程,pom 文件如下:
<dependencies>
<!-- Storm core; scope "provided" because the Storm cluster supplies it at runtime -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.3</version>
<scope>provided</scope>
</dependency>
<!-- Storm-JDBC integration (JdbcInsertBolt, SimpleJdbcMapper, ConnectionProvider) -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-jdbc</artifactId>
<version>1.0.3</version>
<scope>provided</scope>
</dependency>
<!-- MySQL JDBC driver; default (compile) scope so it is bundled for the JDBC bolt -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.43</version>
</dependency>
</dependencies>
- Spout 任务代码如下:
package storm;
import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
/**
 * Spout that simulates an external data source: every three seconds it
 * emits one randomly chosen sample sentence on the "sentence" field.
 */
public class WordCountSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1571765705181254611L;
    // Sample sentences standing in for a real data source.
    private String[] data = {"I love Beijing", "I love China", "Beijing is the capital of China"};
    // Handed to us by Storm in open(); used to push tuples downstream.
    private SpoutOutputCollector collector;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Keep the collector so nextTuple() can emit through it.
        this.collector = collector;
    }

    public void nextTuple() {
        // Called repeatedly by the Storm framework; throttle to one tuple per 3 s.
        Utils.sleep(3000);
        String picked = data[new Random().nextInt(data.length)];
        System.out.println("发送数据:" + picked);
        this.collector.emit(new Values(picked));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Downstream bolts address this tuple's single value as "sentence".
        declarer.declare(new Fields("sentence"));
    }
}
- 用于分词的 Bolt 任务代码如下:
package storm;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Bolt that splits each incoming sentence into words and emits one
 * ("word", 1) pair per word to the counting bolt.
 */
public class WordCountSplitBolt extends BaseRichBolt {
    private static final long serialVersionUID = -7399165475264468561L;
    // Emits word/count pairs to the downstream counting bolt.
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        // Split on runs of whitespace so consecutive or leading spaces do not
        // produce empty "words" (the original split(" ") emitted them as tuples).
        for (String word : sentence.split("\\s+")) {
            if (!word.isEmpty()) {
                this.collector.emit(new Values(word, 1));
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
- 用于计数的 Bolt 任务:
package storm;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Bolt that maintains a running total per word and forwards each updated
 * ("word", total) pair to the JDBC persistence bolt.
 */
public class WordCountBoltCount extends BaseRichBolt {
    private static final long serialVersionUID = -3206516572376524950L;
    // Used to emit the updated totals downstream.
    private OutputCollector collector;
    // Running word -> total map, local to this bolt task.
    private Map<String, Integer> result = new HashMap<String, Integer>();

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");
        // BUG FIX: the original put a literal 1 for first-seen words, silently
        // ignoring the incoming count; merge() honours count in both cases.
        result.merge(word, count, Integer::sum);
        // 直接输出到屏幕
        System.out.println("输出的结果是:" + result);
        // Forward the updated total so the JDBC bolt can persist it.
        this.collector.emit(new Values(word, result.get(word)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "total"));
    }
}
- 用于连接的 ConnectionProvider 的代码如下:
package jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.storm.jdbc.common.ConnectionProvider;
/**
 * Supplies JDBC connections to the storm-jdbc bolt.
 *
 * NOTE(review): credentials and the URL are hard-coded; move them to
 * configuration before using this outside a demo.
 */
public class MyConnectionProvider implements ConnectionProvider {
    private static final long serialVersionUID = -4784999115987415445L;
    private static String driver = "com.mysql.jdbc.Driver";
    private static String url = "jdbc:mysql://qujianlei:3306/storm";
    private static String user = "root";
    private static String password = "123";

    static {
        try {
            // Explicit registration; harmless no-op under JDBC 4 auto-loading.
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            // BUG FIX: fail fast with the cause instead of printStackTrace(),
            // which deferred a confusing failure to the first getConnection().
            throw new ExceptionInInitializerError(e);
        }
    }

    public Connection getConnection() {
        try {
            return DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            // BUG FIX: the original swallowed the exception and returned null,
            // which later surfaced as an unrelated NullPointerException.
            throw new RuntimeException("Cannot open connection to " + url, e);
        }
    }

    public void prepare() {
        // No pooled resources to initialise.
    }

    public void cleanup() {
        // No pooled resources to release.
    }
}
- 获取 JdbcBolt 的工具类如下:
package jdbc;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.topology.IRichBolt;
/**
 * Factory for the JDBC bolt that persists word counts into the MySQL
 * table "result" (columns: word, total).
 */
public class JdbcBoltUtils {
    // Utility class — not meant to be instantiated.
    private JdbcBoltUtils() {
    }

    /**
     * Builds an insert bolt that maps tuple fields onto the columns of the
     * "result" table by name.
     *
     * @return a configured {@link JdbcInsertBolt} for the "result" table
     */
    public static IRichBolt createJDBCBolt() {
        ConnectionProvider connectionProvider = new MyConnectionProvider();
        // SimpleJdbcMapper reads the table schema to pair tuple fields with columns.
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper("result", connectionProvider);
        return new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                .withTableName("result")
                .withQueryTimeoutSecs(30);
    }
}
注:result 为表的名字,共有两个字段:word, total
- Topology 的代码如下:
package storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import jdbc.JdbcBoltUtils;
/**
 * Wires the word-count topology together — spout -> split bolt -> count
 * bolt -> JDBC bolt — and submits it to an in-process local cluster.
 */
public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Source of sample sentences.
        builder.setSpout("wordcount_spout", new WordCountSpout());
        // Splits sentences into words; shuffle: any split task may get any sentence.
        builder.setBolt("wordcount_splitbolt", new WordCountSplitBolt())
                .shuffleGrouping("wordcount_spout");
        // Counts per word; fieldsGrouping routes the same word to the same task.
        builder.setBolt("wordcount_count", new WordCountBoltCount())
                .fieldsGrouping("wordcount_splitbolt", new Fields("word"));
        // Persists the running totals to MySQL.
        builder.setBolt("wordcount_jdbcBolt", JdbcBoltUtils.createJDBCBolt())
                .shuffleGrouping("wordcount_count");

        StormTopology topology = builder.createTopology();
        Config config = new Config();
        // Run inside an in-process cluster for local testing.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mywordcount", config, topology);
        // To run on a real Storm cluster instead:
        // StormSubmitter.submitTopology(args[0], config, topology);
    }
}
- 右击,运行即可(注:Eclipse 要以管理员身份启动)。