首页 > 其他分享 >storm任务示例

storm任务示例

时间:2023-06-06 13:04:26浏览次数:31  
标签:String 示例 backtype generated 任务 storm new import

LogProcess.java
package mytest;
 
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Map;
 
import mytest.ThroughputTest.GenSpout;
 
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
public class LogProcess {
         public static class FileSpout extends BaseRichSpout {
 
                   /**
                    */
                   private static final long serialVersionUID = 1L;
                   private SpoutOutputCollector _collector;
                   private BufferedReader br;
                   private String dataFile;
                  
                   //定义spout文件
                   FileSpout(String dataFile){
                            this.dataFile = dataFile;
                   }
 
                   //定义如何读取spout文件
                   @Override
                   public void open(Map conf, TopologyContext context,
                                     SpoutOutputCollector collector) {
                            // TODO Auto-generated method stub
                            _collector = collector;
                            File csv = new File(dataFile); // log file
                            try {
                                     br = new BufferedReader(new FileReader(csv));
                            } catch (FileNotFoundException e) {
                                     // TODO Auto-generated catch block
                                     e.printStackTrace();
                            }
                   }
 
                   //获取下一个tuple的方法
                   @Override
                   public void nextTuple() {
                            // TODO Auto-generated method stub
                            try {
                                    
                                     String line = null;
                                     while ((line = br.readLine()) != null) {
                                               _collector.emit(new Values(line));
                                     }
                            } catch (FileNotFoundException e) {
                                     // TODO Auto-generated catch block
                                     e.printStackTrace();
                            } catch (IOException e) {
                                     // TODO Auto-generated catch block
                                     e.printStackTrace();
                            }
                   }
 
 
                   @Override
                   public void declareOutputFields(OutputFieldsDeclarer declarer) {
                            // TODO Auto-generated method stub
                            declarer.declare(new Fields("line"));
                   }
                  
         }
        
 
         public static class Process extends BaseRichBolt{
 
                   private String _seperator;
                   private String _outFile;
                   PrintWriter pw;
                   private OutputCollector _collector;
                   private BufferedWriter bw;
                  
                   public Process(String seperator,String outFile) {
                            this._seperator = seperator;
                            this._outFile   = outFile;
                           
                   }
                  
                   //把输出结果保存到外部文件里面。
                   @Override
                   public void prepare(Map stormConf, TopologyContext context,
                                     OutputCollector collector) {
                            // TODO Auto-generated method stub
                            this._collector = collector;
                            File out = new File(_outFile);
                            try {
//                                  br = new BufferedWriter(new FileWriter(out));
                                     bw = new BufferedWriter(new OutputStreamWriter( 
                             new FileOutputStream(out, true))); 
                            } catch (IOException e1) {
                                     // TODO Auto-generated catch block
                                     e1.printStackTrace();
                            }                
                   }
                  
                   //blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。
                   @Override
                   public void execute(Tuple input) {
                            // TODO Auto-generated method stub
                            String line = input.getString(0);
//                         System.out.println(line);
                            String[] str = line.split(_seperator);
                            System.out.println(str[2]);
                            try {
                                     bw.write(str[2]+",bkeep"+"\n");
                                     bw.flush();
                            } catch (IOException e) {
                                     // TODO Auto-generated catch block
                                     e.printStackTrace();
                            }
                           
                            _collector.emit(new Values(line));
                   }
 
                   @Override
                   public void declareOutputFields(OutputFieldsDeclarer declarer) {
                            // TODO Auto-generated method stub
                            declarer.declare(new Fields("line"));
                   }
                  
         }
        
         public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{
                   String dataFile = argv[0]; //输入文件
                   String seperator = argv[1];      //分隔符
                   String outFile   = argv[2]; //输出文件
                   boolean distribute = Boolean.valueOf(argv[3]);       //本地模式还是集群模式
                   TopologyBuilder builder = new TopologyBuilder();  //build一个topology
        builder.setSpout("spout", new FileSpout(dataFile), 1);   //指定spout
        builder.setBolt("bolt", new Process(seperator,outFile),1).shuffleGrouping("spout");  //指定bolt,包括bolt、process和grouping
        Config conf = new Config();
        if(distribute){
            StormSubmitter.submitTopology("LogProcess", conf, builder.createTopology());
        }else{
                 LocalCluster cluster = new LocalCluster();
                 cluster.submitTopology("LogProcess", conf, builder.createTopology());
        }
         }       
}

 

运行

[admin@vkvm161064 guandao]$ pwd
/home/admin/guandao
[admin@vkvm161064 guandao]$ ls
out.txt  storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar  tmp.txt

 

输入文件:

[admin@vkvm161064 guandao]$ cat tmp.txt

a,b,c,d

1,2,3,4

A,B,C,D

xx,ff,ff,ss

xx,ff,alibaba,ss

xx,ff,taobao,ss

xx,xx,xx,xx

xx,xx,ll,xx

xx,xx,hero,xx

 

输出文件:

[admin@vkvm161064 guandao]$ cat out.txt

c,bkeep

3,bkeep

C,bkeep

ff,bkeep

alibaba,bkeep

taobao,bkeep

xx,bkeep

ll,bkeep

hero,bkeep

 

提交topology

[admin@vkvm161064 guandao]$ storm jar ./storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jarSHOT-jar-with-dependencies.jar mytest.LogProcess /home/admin/guandao/tmp.txt , /home/admin/guandao/out.txt fase

 

语法:storm  jar 自己开发的topology   topology_name  inputfile 分隔符   outputfile  true/false(true代表集群运行)

标签:String,示例,backtype,generated,任务,storm,new,import
From: https://blog.51cto.com/u_2650279/6423988

相关文章

  • Storm-源码分析-Topology Submit-Client
    1StormClient最开始使用storm命令来启动topology,如下stormjarstorm-starter-0.0.1-SNAPSHOT-standalone.jarstorm.starter.WordCountTopology这个storm命令是用python实现的,看看其中的jar函数,很简单,调用exec_storm_class,其中jvmtype=”-client” 而exec_storm_clas......
  • 时间管理是一项重要的技能,对于有效地达成目标和提高生产力至关重要。当我们面临许多任
    时间管理是一项重要的技能,对于有效地达成目标和提高生产力至关重要。当我们面临许多任务时,如何有效地处理这些任务,就需要使用优先级排序技术来帮助我们。以下是几种有效的优先级排序技术:Eisenhower矩阵法Eisenhower矩阵法将任务划分为四个象限,分别为:重要且紧急、重要但不紧......
  • Vuex的五个属性及使用方法示例
    一、Vuex简介Vuex是Vue.js的状态管理库,它通过中心化的状态管理使得组件间的数据共享更加容易。Vuex包含五个核心属性:state、getters、mutations、actions和modules。Vuex是Vue.js的状态管理库,它提供了一种集中式存储管理应用程序中所有组件的状态,并将其分离到一个可预测的状态容器......
  • 定时执行的任务Quartz.net
          ......
  • Linux基础24 定时任务, 发邮件, date命令
    date命令[root@localhost~]#date2023年06月01日星期四00:57:36CST[root@localhost~]#date+%F2023-06-01[root@localhost~]#date+%F-%T2023-06-01-01:17:37[root@localhost~]#date+%Y2023[root@localhost~]#date+%m06[root@localhost~]#date+%d......
  • 使用定时任务+脚本方式对nginx进行日志切分
    使用定时任务+脚本方式对nginx进行日志切分nginx路径:/home/nf/nginxnginx日志路径:/home/nf/nginx/logs切分目标文件:access.logerror.log创建脚本: 1vim/home/nf/cut_nginx_log.sh 1#!/bin/bash2#nginxlogpath3LOGS_PATH=/home/nf/nginx/logs4YESTERDAY=$(d......
  • 【IDE】WebStorm 调整Tab缩进为2空格 -- 为遵循ESLint语法规范
    在使用Vue开发项目的过程中,为了遵循ESLint语法规范,我们需要把Tab缩进改为2个空格IDEversionWebStorm2018.3步骤一修改这三处的值为:2步骤二把这两处默认的勾选去掉,不让其detection当前文件的Tab缩进注意!通过上面两个步骤,细心的同学会发现,我们只是改变了在JS文件的Tab缩进改为2个空格......
  • hibernate annotion多对多关系示例
    实体之间是多对多的关系,如图:错误的实体代码如下JAVA代码:@ManyToMany(cascade=CascadeType.PERSIST,fetch=FetchType.EAGER)@JoinTable(name=......
  • SpringBoot中的定时任务的同步与异步
    SpringBoot中的定时任务的同步与异步你确定真的知道?授人以渔Java领域;架构知识;面试心得;互联网行业最新资讯定时任务调度功能在我们的开发中是非常常见的,随便举几个例子:定时清除一些过期的数据,定时发送邮件等等,实现定时任务调度的方式也十分多样,本篇文章主要学习各种实现定时任务......
  • C语言多线程爬虫代码示例
    使用C语言编写多线程爬虫能够同时处理多条数据,提高了爬虫的并发度和效率。在编写多线程爬虫时仍需要注意线程安全性和错误处理机制,并根据系统资源和目标网站的特点调整线程数和优化并发策略,以提高程序效率和稳定性。以下是一个使用C语言多线程编写的简单爬虫示例,实现了并发爬取多......