首页 > 其他分享 >SparkStreaming 连接 Kafka数据源

SparkStreaming 连接 Kafka数据源

时间:2024-01-15 21:35:45浏览次数:31  
标签:kafka 数据源 kafkaParams SparkStreaming org apache import spark Kafka

本文的前提条件: SparkStreaming in Java
参考地址:Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

1.添加POM依赖

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.16.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
            <version>3.5.0</version>
        </dependency>

2.使用

package cn.coreqi;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 创建SparkConf对象
        SparkConf sparkConf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("sparkSql");

        // 第一个参数表示环境配置,第二个参数表示批量处理的周期(采集周期)
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));


        // 定义Kafka参数
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "192.168.58.130:9092,192.168.58.131:9092,192.168.58.132:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "coreqi");  // 配置消费者组
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        // 配置 kafka主题
        Collection<String> topics = Arrays.asList("topicA", "topicB");

        //读取kafka数据创建DStream
        // kafka传入的K,V均为string类型
        JavaInputDStream<ConsumerRecord<String, String>> kafkaDataDS  =
                KafkaUtils.createDirectStream(ssc,
                        LocationStrategies.PreferConsistent(),  //位置策略,采集节点和计算节点如何做匹配,此值为'由框架自行匹配'
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)); //消费者策略,订阅
        JavaPairDStream<String, String> mapToPair = kafkaDataDS.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
        mapToPair.print();

        // 由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭
        // 如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕
        ssc.start();              // 启动采集器

        ssc.awaitTermination();   // 等待采集器的关闭
    }
}

标签:kafka,数据源,kafkaParams,SparkStreaming,org,apache,import,spark,Kafka
From: https://www.cnblogs.com/fanqisoft/p/17966384

相关文章

  • SparkStreaming 自定义数据采集器
    本文的前提条件:SparkStreaminginJava参考地址:SparkStreamingCustomReceivers1.自定义数据采集器packagecn.coreqi.receiver;importorg.apache.spark.storage.StorageLevel;importorg.apache.spark.streaming.receiver.Receiver;importjava.util.Random;/**......
  • SparkStreaming in Java
    参考地址:SparkStreamingProgrammingGuide1.新建Maven项目,POM引入依赖<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.13</artifactId><version>3.5.0</ve......
  • springboot项目配置多数据源
    springboot项目配置多数据源//关键:mybatis文件的目录需要区分开来sqlSessionFactoryBean.setMapperLocations(newPathMatchingResourcePatternResolver().getResources("classpath:mybatis.myProjectOne/*.xml"));#从数据库配置,数据库的配置以spring.datasource.myPr......
  • 多数据源
    dynamic:primary:master#设置默认的数据源或者数据源组,默认值即为masterstrict:true#严格匹配数据源,默认false.true未匹配到指定数据源时抛异常,false使用默认数据源druid:#空闲时执行连接测试test-while-idle:true......
  • SpringBoot集成Kafka构建消息系统
    一、前言在我们当前的互联网应用中,消息驱动已经成为一种不可或缺的模式,Kafka作为一款高性能的分布式消息系统,已经成为很多公司在消息驱动架构选择中很重要的工具。我们使用SpringBoot和Kafka快速构建消息驱动应用,应对高并发的消息处理业务。Kafka是分布式发布-订阅消息系统。主要特......
  • Kafka环境安装
    wgethttps://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgzsudomkdir/usr/local/kafka-server&&cd$_sudotar-xvzf~/kafka_2.13-3.6.1.tgz--strip1sudouseradd-r-d/usr/local/kafka-server-s/usr/sbin/nologinkafkasudo-ukafkamkdi......
  • 第二章 Spring Boot 整合 Kafka消息队列 生产者
    ​ 系列文章目录第一章Kafka配置部署及SASL_PLAINTEXT安全认证第二章  SpringBoot整合Kafka消息队列 生产者第三章  SpringBoot整合Kafka消息队列 消息者(待续) 前言        Kafka是一个消息队列产品,基于Topicpartitions的设计,能达到非常高的消息......
  • org.springframework.kafka.listener.ListenerExecutionFailedException: Listener me
    问题描述kafka在yml文件中未开启批量消费时,程序正常运行;但一开启正常消费后,就直接报错;排查问题的过程中一直觉得是配置文件里的问题,最后发现是消费者接受的参数类型错误 问题本质  消费者开启批量消费数据后,不能用单个实体类接收参数,而应该用list 解决方法  修改......
  • Kafka 万字精讲|工作五年这些你都知道吗?
    目录前言一、Kafka简介1.1事件流平台1.2Kafka主要概念和术语1.3Zookeeper二、Kafka集群搭建和使用2.1使用DockerCompose搭建Kafka集群2.2Kafka集群的使用2.3offset偏移量的提交三、Kafka高级3.1生产者发送数据3.1.1发送数据的6个步骤3.1.2生产者分区选择策略3......
  • Mybatis 拦截器实现单数据源内多数据库切换 | 京东物流技术团队
    物流的分拣业务在某些分拣场地只有一个数据源,因为数据量比较大,将所有数据存在一张表内查询速度慢,也为了做不同设备数据的分库管理,便在这个数据源内创建了多个不同库名但表完全相同的数据库,如下图所示:现在需要上线报表服务来查询所有数据库中的数据进行统计,那么现在的问题来了,该如何......