Wordcount在大数据中有点像Hello World,当我们输出Hello World的时候,就说明程序执行成功了,同样在大数据项目中如果成功的统计出了文本或者socket流中的单词数量,也相当于成功运行了第一个项目。flink是一个流批一体的计算引擎,所以wordcount分为两种,从文本或者其它存储中读取的批处理和从socket读取的流处理wordcount。
项目使用maven进行项目管理,首先编写pom文件,添加下面两项依赖。
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
最好也添加flink-clients的包,否则可能会报错。
添加依赖后发现在第二个依赖中有2.12的后缀,flink使用Java编写,但是他依赖的底层使用的是akka进行通信,akka使用Scala进行编写,这里的2.12是指Scala的版本。
Spark原先使用akka进行通信,后来从Spark1.3.1版本开始,为了解决大块数据(如Shuffle)的传输问题,Spark引入了Netty通信框架,到了1.6.0版本,Netty完全取代了Akka,承担Spark内部所有的RPC通信以及数据流传输。
配置日志管理,减少不必要的日志的输出
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
批处理WordCount
public class BatchWordCount {
public static void main(String[] args) throws Exception {
//1.创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.从文件读取数据 按行读取 (存储的元素就是每行的文本)
DataSource<String> lineDS = env.readTextFile("input/word.txt");
//3.转换数据格式
FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS
.flatMap(
(String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}
)
//当 Lambda 表达式使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息
.returns(Types.TUPLE(Types.STRING, Types.LONG));
//4.按照word进行分组
UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
//5.分组内聚合统计
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
//6.打印结果
sum.print();
}
}
Flink本身是流批一体的处理框架,批量的数据集本质上也是流,没有必要用两套不同的API来实现,可以直接使用DataStream API,提交任务时通过将执行模式设为Batch来进行批处理。
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
流处理WordCount-读取文件
public class BoundedStreamWordCount {
public static void main(String[] args) throws Exception {
//1.创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.读取文件
DataStreamSource<String> lineDSS = env.readTextFile("input/word.txt");
//3.转换数据格式
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
.flatMap(
(String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
.sum(1);
// 6. 打印
result.print();
// 7. 执行
env.execute();
}
}
流处理WordCount-读取文本流
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文本流
DataStreamSource<String> lineDSS = env.socketTextStream("hadoop200", 7777);
// 3. 转换数据格式
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS.flatMap(
(String line, Collector<String> word) -> {
Arrays.stream(line.split(" ")).forEach(word::collect);
}
)
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
.sum(1);
// 6. 打印
result.print();
// 7. 执行
env.execute();
// //需要在命令行输入参数 --host localhost --port 7777
// ParameterTool param = ParameterTool.fromArgs(args);
// String host = param.get("host");
// int port = param.getInt("port");
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStream<String> stream = env.socketTextStream(host, port);
// DataStream<Tuple2<String, Integer>> resultStream = stream.flatMap(new BatchWordCount.MyFlatMapFunction())
// .keyBy(0)
// .sum(1);
// resultStream.print();
// env.execute();
}
}
测试——在Linux中通过netcat命令进行发送测试。
nc -lk 7777
标签:flink,word,String,Flink,env,快速,log4j,Types
From: https://www.cnblogs.com/hanxuefeng/p/16823399.html