首页 > 其他分享 >第2章 Flink快速上手

第2章 Flink快速上手

时间:2022-10-24 23:12:17浏览次数:52  
标签:flink word String Flink env 快速 log4j Types

Wordcount在大数据中有点像Hello World,当我们输出Hello World的时候,就说明程序执行成功了,同样在大数据项目中如果成功的统计出了文本或者socket流中的单词数量,也相当于成功运行了第一个项目。flink是一个流批一体的计算引擎,所以wordcount分为两种,从文本或者其它存储中读取的批处理和从socket读取的流处理wordcount。

项目使用maven进行项目管理,首先编写pom文件,添加下面两项依赖。

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.13.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

最好也添加flink-clients的包,否则可能会报错。

添加依赖后发现在第二个依赖中有2.12的后缀,flink使用Java编写,但是他依赖的底层使用的是akka进行通信,akka使用Scala进行编写,这里的2.12是指Scala的版本。

Spark原先使用akka进行通信,后来从Spark1.3.1版本开始,为了解决大块数据(如Shuffle)的传输问题,Spark引入了Netty通信框架,到了1.6.0版本,Netty完全取代了Akka,承担Spark内部所有的RPC通信以及数据流传输。

配置日志管理,减少不必要的日志的输出

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

批处理WordCount

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.从文件读取数据 按行读取 (存储的元素就是每行的文本)
        DataSource<String> lineDS = env.readTextFile("input/word.txt");
        //3.转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS
                .flatMap(
                        (String line, Collector<Tuple2<String, Long>> out) -> {
                            String[] words = line.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1L));
                            }
                        }
                )
                //当 Lambda 表达式使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.按照word进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
        //5.分组内聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
        //6.打印结果
        sum.print();
    }
}

Flink本身是流批一体的处理框架,批量的数据集本质上也是流,没有必要用两套不同的API来实现,可以直接使用DataStream API,提交任务时通过将执行模式设为Batch来进行批处理。

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

流处理WordCount-读取文件

public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.读取文件
        DataStreamSource<String> lineDSS = env.readTextFile("input/word.txt");
        //3.转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap(
                        (String line, Collector<String> words) -> {
                            Arrays.stream(line.split(" ")).forEach(words::collect);
                        })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);
        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);
        // 6. 打印
        result.print();
        // 7. 执行
        env.execute();
    }
}

流处理WordCount-读取文本流

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文本流
        DataStreamSource<String> lineDSS = env.socketTextStream("hadoop200", 7777);
        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS.flatMap(
                        (String line, Collector<String> word) -> {
                            Arrays.stream(line.split(" ")).forEach(word::collect);
                        }
                )
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);
        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);
        // 6. 打印
        result.print();
        // 7. 执行
        env.execute();
        //        //需要在命令行输入参数  --host localhost --port 7777
        //        ParameterTool param = ParameterTool.fromArgs(args);
        //        String host = param.get("host");
        //        int port = param.getInt("port");
        //        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //        DataStream<String> stream = env.socketTextStream(host, port);
        //        DataStream<Tuple2<String, Integer>> resultStream = stream.flatMap(new BatchWordCount.MyFlatMapFunction())
        //                .keyBy(0)
        //                .sum(1);
        //        resultStream.print();
        //        env.execute();
    }
}

测试——在Linux中通过netcat命令进行发送测试。

nc -lk 7777

标签:flink,word,String,Flink,env,快速,log4j,Types
From: https://www.cnblogs.com/hanxuefeng/p/16823399.html

相关文章