首页 > 其他分享 >springboot+Flink 接收、处理数据20220919

springboot+Flink 接收、处理数据20220919

时间:2022-09-19 17:56:27浏览次数:72  
标签:20220919 Flink springboot void flink org apache import public

 

1、pom.xml

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_2.11</artifactId>
   <version>1.10.0</version>
   <scope>provided</scope>
  </dependency>
  <dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <version>1.18.24</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
   <version>1.10.0</version>
   <scope>provided</scope>
  </dependency> 
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
</dependencies>


2、MySlink
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.common.config.Config;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

@Slf4j
class MySlink extends RichSinkFunction<String> {
  private AnnotationConfigApplicationContext context;
    public MySlink() {
      log.info("MySlink new");
    }

  @Override
  public void open(Configuration parameters) throws Exception {
    this.context=new AnnotationConfigApplicationContext(Config.class);
    log.info("MySlink open");
  }

  @Override
  public void invoke(String value, Context context) {
    log.info("调用了invoke方法"+value);
  }

  @Override
  public void close() throws Exception {
    context.close();
    log.info("MySlink close");
  }
}

 

3、Runner

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Runner implements CommandLineRunner {
  @Override
  public void run(String... args) throws Exception {
    StreamExecutionEnvironment environment=StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> stream=environment.addSource(new SourceFunction<String>() {
      @Override
      public void run(SourceContext<String> sourceContext) throws Exception {
        long c=0;
        while (true){
          sourceContext.collect("test"+c++);
          Thread.sleep(3000);
        }
      }

      @Override
      public void cancel() {

      }
    });


  stream.addSink(new MySlink());
  environment.execute("spring flink 20220919...........");


  }
}

 

4、Flink2034Application

@SpringBootApplication
public class Flink2034Application {

  public static void main(String[] args) {
    SpringApplication.run(Flink2034Application.class, args);
  }

}

 

 

 

 

 

标签:20220919,Flink,springboot,void,flink,org,apache,import,public
From: https://www.cnblogs.com/smallfa/p/16708509.html

相关文章

  • 20220919
    计算机网络网络网络(Network)由若干结点(Node)和连接这些结点的链路(Link)组成。internet与Internet的区别internet(互联网)是一个通用名词,泛指有多个计算机网络互联而......
  • 【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生
    问题描述根据AzureEventHub示例文档,[将ApacheFlink与适用于ApacheKafka的Azure事件中心配合使用],配置好 consumer.config文件后,为什么不能自动消费EventHub......
  • springboot拦截器
    packagecom.module.interceptor;importlombok.Data;importlombok.extern.slf4j.Slf4j;importorg.springframework.context.annotation.Configuration;importorg......
  • springboot内置tomcat配置本地文件夹的映射路径
    例如要访问的本地路径是D盘下的PersonalHomePage目录的某个图片1importorg.springframework.context.annotation.Configuration;2importorg.springframework.web.......
  • springboot中解析JSON参数
    解析psot请求中的JSON参数Map<String,String>attrMap=newHashMap<String,String>();BufferedReaderstreamReader=null;try{streamReader=newBufferedRead......
  • Java【SpringBoot】——添加测试依赖
    在pom.xml添加依赖1<dependency>2<groupId>org.springframework.boot</groupId>3<artifactId>spring-boot-starter-test</artifactId>......
  • springboot集成mybatis获取插入数据的主键
    问题:我们想在插入一条数据后同时能够返回这条数据在表中的id,Mybatis提供了@SelectKey注解。student为数据表,主键自增SelectKey的四个属性:selectKey会将SELECTLAS......
  • SpringBoot集成Mybatis 实现InsertOrUpdate功能
    需求场景在项目开发过程中,难免会遇到这样的场景:对一张表,当数据不存在的时候,进行insert插入操作;数据存在的时候,进行update更新操作;下面就来使用Mybatis的InsertOrUpdate功......
  • springboot代码生成器
    一、使用springboot+mybatisplus+swagger完成如下操作1、创建数据库表如下channel字段名称中文类型长度主键外键自增约束cid栏目id......
  • springboot Condition 动态value
    packagecom.example.demo.condtion;importorg.springframework.context.annotation.Conditional;importjava.lang.annotation.*;@Target({ElementType.TYPE,Ele......