首页 > 其他分享 >【flink学习系列1】sink文件到kafka

【flink学习系列1】sink文件到kafka

时间:2022-10-30 20:23:17浏览次数:53  
标签:flink kafka sink env org apache import TODO

import java.util.Properties;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

/**
 * Desc 演示DataStream-Sink-基于控制台和文件
 */
public class SinkDemoFileToKafka {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> ds = env.readTextFile("C:\\Users\\K21\\Desktop\\temp\\1200.unl");

        //TODO 2.transformation
        
        //TODO 3.sink
        Properties props2 = new Properties();
        props2.setProperty("bootstrap.servers", "192.168.78.203:9092,192.168.78.204:9092,192.168.78.205:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("FileToKafka", new SimpleStringSchema(), props2);
        ds.addSink(kafkaSink).setParallelism(4);

        //TODO 4.execute
        env.execute();
    }
}

 

标签:flink,kafka,sink,env,org,apache,import,TODO
From: https://www.cnblogs.com/qsds/p/16842132.html

相关文章

  • Kafka简明教程(三)Kafka‘s Ecosystem
    ​​LinkedinBurrow​​:Burrow是linkedin开源的一个监控ApacheKafka的工具,burrow可以将消费者滞后检查作为一项服务来对外提供。它监视所有消费者的承诺偏移量,并根据需要......
  • kafka常用命令
    相关概念Brokerkafka节点,多个broker组成kafka集群。Topic即主题,kafka通过Topic对消息进行分类,发布到kafka的消息都需要指定Topic。Producer即消息生产者,向Broker发送......
  • flink开发环境执行sql及生产环境提交sql文件
    flink提供了sql-client.sh工具可直接操作sql,该工具一般在开发环境用于调试,在生产环境还是要打成jar文件。为了避免在java文件中写大量sql,我们可以将sql提取出来放到一......
  • Flink通过Side Outputs侧输出流处理迟到数据(Trigger、设置水位线延迟时间、允许窗口
    前言:迟到数据,是指在watermark之后到来的数据,事件时间在水位线之前。所以只有在事件时间语义下,讨论迟到数据的处理才有意义。对于乱序流,可以设置一个延迟时间;对于窗口计......
  • Kafka的启动与关闭
    以Windows系统为例启动zookeeper打开cmd并进入kafka的安装目录cdC:\BigData\kafka_2.13-3.3.1启动zookeeper并指定启动的配置文件bin\windows\zookeeper-server-st......
  • Apache Flink 流计算基准测试框架
    每一种引擎有其优势的地方,如何选择适合自己业务的流计算引擎成了一个由来已久的话题。除了比较各个引擎提供的不同的功能矩阵之外,性能是一个无法绕开的评估因素。基准测试(b......
  • Flink性能测试case案例
    在我们做测试之前,调研了一些已有的大数据平台性能测试报告,比如,雅虎的Streaming-benchmarks,或者Intel的HiBench等等。除此之外,还有很多的论文也从不同的角度对分布式计算平......
  • Kafka 为什么那么快?
    有人说:他曾在一台配置较好的机子上对​​Kafka​​​进行性能压测,压测结果是​​Kafka​​​单个节点的极限处理能力接近每秒​​2000万​​​条消息,吞吐量达到每秒......
  • zookeeper和kafka安装
    系统:CentOSLinux7(Core)x86_64一、JDK安装1.安装准备#查看系统jdk版本java-version#检查jdk自带安装包rpm-qa|grepjava#卸载jdkyum-yremovetzdat......
  • kafka 从入门到精通2 、 创建kafka 生产者与消费者实例
    上一篇:​​kafka单机版和分布式版安装​​首先创建一个生产者:packageorg.training.hadoop.kafka;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.a......