首页 > 编程语言 >Flink Java Demo

Flink Java Demo

时间:2024-01-17 19:27:25浏览次数:32  
标签:Flink Java Demo flink streaming api import apache org

1.新建Maven项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>cn.coreqi</groupId>
        <artifactId>Flink_HS</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>FlinkTutorial</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

2.编写代码

package cn.coreqi;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountStreamUnboundedDemo {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 读取socket数据
        DataStreamSource<String> socketDS = env.socketTextStream("192.168.58.130", 7878);

        // 处理数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordToOne = socketDS.flatMap((String s, Collector<Tuple2<String, Integer>> collector) -> {
            // 按照空格切分单词
            String[] words = s.split(" ");
            for (String word : words) {
                Tuple2<String, Integer> wordTuple2 = Tuple2.of(word, 1);
                // 使用 Collector 向下游发送数据
                collector.collect(wordTuple2);
            }
        }).returns(Types.TUPLE(Types.STRING,Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> wordToOneKS = wordToOne.keyBy((Tuple2<String, Integer> stringIntegerTuple2) -> stringIntegerTuple2.f0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordToOneKS.sum(1);

        //输出数据
        sumDS.print();
        // 执行
        env.execute();
    }
}

3.推送端运行netcat

nc -lp 7878

4.运行项目,进行测试

依赖配置为<scope>provided</scope>,因此需要进行如下配置
image
image

5.Maven打包[略]

标签:Flink,Java,Demo,flink,streaming,api,import,apache,org
From: https://www.cnblogs.com/fanqisoft/p/17970828

相关文章

  • Java异步编程详解
    在现代应用程序开发中,异步编程变得越来越重要,特别是在处理I/O密集型任务时。Java提供了一套强大的异步编程工具,使得开发者能够更有效地处理并发任务。本篇博文将深入探讨Java中异步编程的方方面面,通过具体例子详细说明异步编程的实践。异步编程的背景在传统的同步编程模型中,任务......
  • 利用javascript获取并修改伪元素的值
    HEAD中添加style标签强制覆盖初始属性这个方法是利用内部css样式的高优先级来覆盖外部css,好处是简单易理解,实现简单。坏处就是吃相太难看,过于粗暴。varstyle=document.createElement('style');style.innerHTML=".test::before{color:green}";//添加样式内容的话也可以用上面提......
  • java继承
    java三大特征:封装、继承、多态封装:对象代表什么就得封装什么样的数据,并提供数据对应的行为。继承java中提供了一个extends关键字,用这个关键字可以让一个类和另外一个类建立起继承关系publicclassStudebtextendsPerson(){}例如上段代码中,Student类就叫子类(派生类),Person......
  • 【从零开始重学Java】第13天 Java网络功能
    前情提示从零开始重学Java第0天从零开始重学Java第1天Java概述从零开始重学Java第2天标识符和数据类型从零开始重学Java第3天表达式和流程控制语句从零开始重学Java第4天数组、向量和字符串从零开始重学Java第5天对象和类从零开始重学Java第6天异常从零开始......
  • 多模块之间的循环依赖:java: Annotation processing is not supported for module cycl
    问题描述java:Annotationprocessingisnotsupportedformodulecycles.Pleaseensurethatallmodulesfromcycle[BDCloud-business,BDCloud-admin]areexcludedfromannotationprocessing  本质:BDCloud-admin模块为主启动模块,其包含了BDCloud-business模块;但在......
  • Java HttpClient 实战 GET 与 POST 请求一网打尽
    使用JavaHttpClient进行HTTP请求在Java中,HttpClient是进行HTTP通信的一个强大工具。它提供了简单而灵活的API,可以轻松地发送HTTP请求并处理响应。在本篇博文中,我们将深入探讨如何使用HttpClient执行GET、POST等不同类型的HTTP请求。1.引入依赖首先,确保在项目的pom.xml文件中......
  • java代码里如何判断某个IP/域名是否可达?
    在Java中,你可以使用java.net.InetAddress类来实现ping某个IP地址是否可达。下面是一个简单的示例代码:importjava.net.InetAddress;importjava.io.IOException;publicclassPingExample{publicstaticvoidmain(String[]args){StringipAddress="你的......
  • JavaGuide 设计模式
    JavaGuide设计模式1.软件设计原则设计原则名称简单定义开闭原则对扩展开放,对修改关闭单一职责原则一个类只负责一个功能领域中的相应职责里氏替换原则所有引用基类的地方必须能透明地使用其子类的对象依赖倒置原则依赖于抽象,不能依赖于具体实现接......
  • Java Collections.frequency()方法返回集合中指定元素个数
    JavaCollections.frequency()方法具有什么功能呢?下文笔者讲述Collections.frequency()方法的功能简介说明,如下所示:Collections.frequency()方法的功能:返回一个int值,其值给指定对象在集合中出现的次数Collections.frequency()方法的语法publicstaticintfreque......
  • Java 秘钥对相关操作
    生成JKS(JavaKeyStore)文件keytool-genkeypair-keystoremercury.jks-keyalgRSA-validity180-aliasmercury参数说明keytool:这是JavaKeytool工具,用于管理密钥和证书。-genkeypair:指示Keytool生成一个密钥对(公钥和私钥)。-aliasmercury:设置密钥对的别......