首页 > 其他分享 >Flink 消费 Kafka 数据后在写回 Kafka 示例

Flink 消费 Kafka 数据后在写回 Kafka 示例

时间:2022-08-28 07:22:07浏览次数:67  
标签:01 示例 Flink kafka topic split new hadoop103 Kafka

今天介绍一下 Flink从kafka 读取数据后,再将数据写回 kafka 的一个案例

示例代码

/**
 * 从一个 topic 读取数据,在写回另一个 topic  
 */
public class SinkToKafka0824 {
    public static void main(String[] args) throws Exception {
        //1、获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //测试环境设置并行度1,生产环境设置 topic 分区数
        env.setParallelism(1);

        //2、读取数据
        //2.1、kafka 配置西信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop103:9092");
        properties.setProperty("group.id", "consumer.group");
        //2.2、读取 kafka 数据
        DataStreamSource<String> lhcSDtream = env.addSource(new FlinkKafkaConsumer<String>("lhc", new SimpleStringSchema(), properties));
        //2.3、包装读取到的数据
        SingleOutputStreamOperator<String> map = lhcSDtream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] split = value.split(",");
                return new Event(split[0].trim(), split[1].trim(), Long.valueOf(split[2].trim())).toString();
            }
        });

        //3、写回 kafka
        map.addSink(new FlinkKafkaProducer<String>("hadoop103:9092", "tbg", new SimpleStringSchema()));
        //打印测试
        map.print();
        //执行任务
        env.execute();
    }
}

测试

启动生产者发生消息

[hui@hadoop103 ~]$ kafka-console-producer.sh --bootstrap-server hadoop103:9092 --topic lhc
>令狐冲,./home,1000
>任盈盈,./pro?id=1001,1000
>令狐冲,./cart,8000

启动一个消费者观察

[hui@hadoop103 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic tbg
Event{user='令狐冲', url='./home', timestamp=1970-01-01 08:00:01.0}
Event{user='任盈盈', url='./pro?id=1001', timestamp=1970-01-01 08:00:01.0}
Event{user='令狐冲', url='./cart', timestamp=1970-01-01 08:00:08.0}

标签:01,示例,Flink,kafka,topic,split,new,hadoop103,Kafka
From: https://www.cnblogs.com/wdh01/p/16620337.html

相关文章

  • Kafka相关问题
    Kafka有哪几个部分组成生产者、消费者、topic、group、partitionkafka的group1)定义:即消费者组是Kafka提供的可扩展且具有容错性的消费者机制。在Kafka中,消费者组是一个......
  • Kafka的简单使用
    下面目的主要是进行简单测试kafka,比如在其他网络中已提供了IP和地址,进行kafka的读取,自已不想再写代码:环境要求:存在java环境:1、下载Kafka的程序https://kafka.apache.or......
  • kafka基础使用
    生产者/opt/kafka/bin/kafka-console-producer.sh--broker-listkafka.service.consul:9092--topic0bkmonitor_15026750消费者/opt/kafka/bin/kafka-console-consumer.......
  • flink cdc 使用
    flinkcdc使用目前cdc产品非常多,目前我使用canal,flinkcdc(集成debezium)二者对比相对来说flinkcdc更加强大,功能很多但是有很多坑,迭代速度很快,借助flink......
  • 10个快速入门Query函数使用的Pandas的查询示例
    转载:https://mp.weixin.qq.com/s/TJStQDtUfOOXtb__cpivDgpandas.的query函数为我们提供了一种编写查询过滤条件更简单的方法,特别是在的查询条件很多的时候,在本文中整理了1......
  • 微服务架构之服务注册与发现(Consul+gliderlabs/registrator示例)
    在微服务架构中,由于系统的拆分,通常会有很多的服务,而每个服务又可能因为横向扩展而部署在多台服务器上,当服务A需要调用服务B的接口时,服务A该如何知道服务B的主机地址就......
  • stream() 简单示例
    publicstaticvoidmain(String[]args){List<Integer>list=newArrayList<>();list.add(1);list.add(2);list.add(3);......
  • Flink出现network.partition.ProducerFailedException: java.lang.NullPointerExcepti
    一、错误日志org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:Erroratremotetaskmanager'xx.xxx.xxx.xxx/xxx.xxx.xxx.xxx:34750'......
  • 部署ELK及kafka日志收集k8s容器环境
    部署zookeeper      准备三个节点系统并安装jdk       结构图:官网下载地址:   https://zookeeper.apache.org/releases.html  安装JDK环......
  • Kafka重启出错:Corrupt index found
      今天发现一台kafkabroker宕掉,重启kafkabroker集群发现日志中报如下错误,查阅各种资料,解决问题如下一、出现的问题Foundacorruptedindexfileduetorequireme......