今天中午接着做大数据的实验
实验8
Flink初级编程实践
1.实验目的
(1)通过实验掌握基本的Flink编程方法。
(2)掌握用IntelliJ IDEA工具编写Flink程序的方法。
2.实验平台
(1)Ubuntu18.04(或Ubuntu16.04)。
(2)IntelliJ IDEA。
(3)Flink1.9.1。
3.实验步骤
(1)使用IntelliJ IDEA工具开发WordCount程序
在Linux系统中安装IntelliJ IDEA,然后使用IntelliJ IDEA工具开发WordCount程序,并打包成JAR文件,提交到Flink中运行。
WordCountData import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; public class WordCountData { public static final String[] WORDS=new String[]{"My name is LCZ, I am a college student studying at Shijiazhuang Railway University."}; public WordCountData() { } public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env){ return env.fromElements(WORDS); } } WordCountTokenizer import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{ public WordCountTokenizer(){} public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split("\\W+"); int len = tokens.length; for(int i = 0; i<len;i++){ String tmp = tokens[i]; if(tmp.length()>0){ out.collect(new Tuple2<String, Integer>(tmp,Integer.valueOf(1))); } } } } WordCount import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.utils.ParameterTool; public class WordCount { public WordCount(){} public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); Object text; if(params.has("input")){ text = env.readTextFile(params.get("input")); }else{ text = WordCountData.getDefaultTextLineDataset(env); } AggregateOperator counts = ((DataSet)text).flatMap(new WordCountTokenizer()).groupBy(new int[]{0}).sum(1); if(params.has("output")){ counts.writeAsCsv(params.get("output"),"\n", " "); env.execute(); }else{ counts.print(); } } }
(2)数据流词频统计
使用Linux系统自带的NC程序模拟生成数据流,不断产生单词并发送出去。编写Flink程序对NC程序发来的单词进行实时处理,计算词频,并把词频统计结果输出。要求首先在IntelliJ IDEA中开发和调试程序,然后,再打成JAR包部署到Flink中运行。
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class TongJi { public static void main(String[] args) throws Exception { //定义socket的端口号 int port; try { ParameterTool parameterTool = ParameterTool.fromArgs(args); port = parameterTool.getInt("port"); } catch (Exception e) { System.err.println("指定port参数,默认值为9000"); port = 9001; } //获取运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //连接socket获取输入的数据 DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n"); //计算数据 DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() { public void flatMap(String value, Collector<WordWithCount> out) throws Exception { String[] splits = value.split("\\s"); for (String word : splits) { out.collect(new WordWithCount(word, 1L)); } } })//打平操作,把每行的单词转为<word,count>类型的数据 .keyBy("word")//针对相同的word数据进行分组 .timeWindow(Time.seconds(2), Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小 .sum("count"); //把数据打印到控制台 windowCount.print() .setParallelism(1);//使用一个并行度 //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行 env.execute("streaming word count"); } /** * 主要为了存储单词以及单词出现的次数 */ public static class WordWithCount { public String word; public long count; public WordWithCount() { } public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}'; } } } Pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>TongJi</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.9.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.9.2</version> <!-- <scope>provided</scope>--> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.9.2</version> </dependency> </dependencies> </project>
标签:总结,每日,flink,12.25,api,import,apache,org,public From: https://www.cnblogs.com/louwangshayu/p/17926667.html