首页 > 其他分享 >Spark消费Kafka

Spark消费Kafka

时间:2023-06-05 21:12:50浏览次数:55  
标签:消费 scala kafka org apache import Spark Kafka spark

0. 前言

之前先写了处理数据的spark,用文件读写测了一批数据,能跑出结果;今天调通了Kafka,拼在一起,没有半点输出,查了半天,发现是之前的处理部分出了问题,把一个不等号打成了等号,把数据全filter没了。很恐怖,我保证这段时间我没动过这段代码,但上次真的跑出东西了啊(尖叫

1. 配置流程

主节点打开zookeeper、yarn、hdfs、kafka和spark

从节点打开zookeeper和kafka

(有冗余)

cluster1打开生产者:$ kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka

cluster2可以打开消费者调试:$ kafka-console-consumer.sh -zookeeper cluster1:2181,cluster2:2181,cluster3:2181 --topic mykafka --from-beginning

这时从cluster1发送一条消息,可以从cluster2的屏幕上看到。

使用spark的脚本:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Direct {
  val topic = "mykafka"
  val brokerList = "cluster1:9092,cluster2:9092,cluster3:9092"
  val groupId = "test-consumer-group"

  def main(args: Array[String]): Unit = {
    val topics = topic.split(",").toSet
    val conf = new SparkConf()
      .setAppName("Direct")
      .setMaster("local[*]")
        //多个节点要加[*]
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val batchInterval = Seconds(5)
    val kafkaParams: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokerList,
      "group.id" -> groupId,
        //从最早的消息开始读
      "auto.offset.reset" -> "smallest"
    )
    val ssc = new StreamingContext(conf, batchInterval)
    val input: InputDStream[(String, String)] = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
        kafkaParams, topics)

   //...处理input,input为InputDStream格式

    ssc.start()
    ssc.awaitTermination()
  }
}
    

使用Java的代码:

import java.util.*;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import scala.*;
public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // zookeeper地址
        props.put("zookeeper.connect", "cluster1:2181,cluster2:2181,cluster3:2181");
        // 消费者组id
        props.put("group.id", "test-consumer-group");
        // smallest : 从头消费
        // largest : 从最后消费
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig conf = new ConsumerConfig(props);
        ConsumerConnector consumer = new ZookeeperConsumerConnector(conf);
        Map<String, Integer> topicStrams = new HashMap<String, Integer>();
        // 第二个数字是返回几个流,topic几个分区就陪几个流比较合理
        topicStrams.put("mykafka", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreamsMap = consumer.createMessageStreams(topicStrams);
        List<KafkaStream<byte[], byte[]>> messageStreams = messageStreamsMap.get("mykafka");
        for(final KafkaStream<byte[], byte[]> kafkaStream : messageStreams){
            new Thread(new Runnable() {
                public void run() {
                    ArrayList<String> input = new ArrayList<>();
                    for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
                        String msg = new String(mm.message());
                        //...进行操作
                    }

                }

            }).start();

        }
    }
}

可以看到一个配置是用的zookeeper,端口号为2181;一个是用的spark的端口号,9092,不要搞混,否则会报错:

Received -1 when reading from channel, socket has likely been closed.

将代码打成jar包,注意要带依赖,spark或者kafka不自带spark-streaming-kafka,或者在集群上配置相应jar包。

运行java代码:java -jar <包名> KafkaConsumerTest

运行spark代码:spark-submit --class Direct <包名>

依赖:

<dependencies>
        <!-- 导入scala的依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.6</version>
        </dependency>
        <!-- spark相关依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
            <version>2.0.0-preview</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- sql驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.43</version>
        </dependency>
View Code

插件:

<plugins>
            <!-- scala插件需要 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
View Code

第一个是scala需要的插件,第二个是打jar-with-dependency的插件。

 

2. 消费者group id

配置的时候有groupid,可以在该节点的${KafkaHome}/config/consumer.properties中找到。

group的意义:一个topic有很多分区,一份消息可以拆开来放在若干分区上,但很难分辨哪些分区可以拼出全部消息,需要group来记录一下。

 

3. scala object类

object类里的方法都是可以直接使用的静态方法,相应地,class没有static关键字。

同名的object和class形成伴生关系,互相可以访问私有成员,静态类可以提供一些静态方法,听上去有点怪,但实际蛮好用的。

Kotlin也有这样的设计

 

4. Dstream、InputDStream和RDD

DStream、RDD、DataFrame 的相互转换、spark 比 MapReduce 快的原因 - 赤兔胭脂小吕布 - 博客园 (cnblogs.com)

关于DStream:DStream是一串RDD,过一会来一些,过一会来一些,很符合数据生成的样子。

每一个InputDStream对应一个receiver

 

5. maven scope标签

遇到这个是因为所有依赖都打进包里这个包太大了,有144.8M,想能不能指定依赖,但没成功。后面再学吧。

Maven之scope详解 - satire - 博客园 (cnblogs.com)

 

6. kafka和zookeeper的关系

旧版kafka依赖于zookeeper,v2.8之后就不依赖了。

Kafka参数zookeeper和bootstrap-server的区别 - Clotho_Lee - 博客园 (cnblogs.com)

kafka 中 zookeeper 具体是做什么的? - 知乎 (zhihu.com)

标签:消费,scala,kafka,org,apache,import,Spark,Kafka,spark
From: https://www.cnblogs.com/capterlliar/p/17458914.html

相关文章

  • 面试官问:kafka为什么如此之快?
    前言天下武功,唯快不破。同样的,kafka在消息队列领域,也是非常快的,这里的块指的是kafka在单位时间搬运的数据量大小,也就是吞吐量,下图是搬运网上的一个性能测试结果,在同步发送场景下,单机Kafka的吞吐量高达17.3w/s,不愧是高吞吐量消息中间件的行业老大。那究竟是什么原因让kafka如此......
  • Spark搭建
    Spark搭建Local模式主要用于本地开发测试本文档主要介绍如何在IDEA中配置Spark开发环境打开IDEA,创建Maven项目在IDEA设置中安装Scala插件在pom.xml文件中添加Scala依赖<dependency><groupId>org.scala-lang</groupId><artifa......
  • 单节点kafka部署笔记
    1背景因为工作中需要对接kafka,准备在测试环境中自己部署一套,考虑方便决定部署一台单点。2部署2.1scala2.1.1java环境openjdk即可,我使用的是openjdk1.82.1.2下载软件下载scala-2.12.17.tgz并解压,例如解压到/home/scala/scala-2.12.172.1.3环境变量exportSCALA_HOME......
  • python中生产者和消费者理论
    1.模型理论生产者消费者他是一个概念,(由于生产者消费者模型并不局限于某一类技术,因此,有多种实现方式)所以,代码很简单,所以这里首先要弄懂理论。 1.1 生产者消费者模型模型指的是一种解决问题的套路。 1.2生产者消费者模型中包含两类重要角色一类叫生产者,另一类叫消费者......
  • apache kafka系列之迁移与扩容工具用法
    kafka迁移与扩容工具使用参考官网site:https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool说明:当我们对kafka集群扩容时,需要满足2点要求: 将指定topic迁移到集群内新增的node上。将topic的指定partition迁移到新增......
  • 【Azure 事件中心】如何查看Event Hub的生产者或者是消费者端的IP地址呢?
    问题描述哪些客户端IP正在向/从AzureEventHub发送/接收事件?如何来查看EventHub的生产者端,消费者端的IP地址呢? 问题解答如果需要查看EventHub服务端的日志,可以在Azure门户上开启诊断日志来查看。默认情况下,我们并不能看见EventHub的生产者,消费者端所使用的IP地址。在查看官......
  • 【Azure 事件中心】如何查看Event Hub的生产者或者是消费者端的IP地址呢?
    问题描述哪些客户端IP正在向/从AzureEventHub发送/接收事件?如何来查看EventHub的生产者端,消费者端的IP地址呢? 问题解答如果需要查看EventHub服务端的日志,可以在Azure门户上开启诊断日志来查看。默认情况下,我们并不能看见EventHub的生产者,消费者端所使用的IP地址。在查看官......
  • kafka跨集群发送消息
    1.场景集群B有一个应用要向集群A的kafka集群发送消息,但是集群A和集群B不是直接互通的,需要经过一层转发。 ......
  • Use trained sklearn model with pyspark
    Usetrainedsklearnmodelwithpyspark frompysparkimportSparkContextimportnumpyasnpfromsklearnimportensembledefbatch(xs):yieldlist(xs)N=1000train_x=np.random.randn(N,10)train_y=np.random.binomial(1,0.5,N)model=ensemb......
  • JanusGraph架构——gremlin是提交spark任务计算,数据读写转给后端DB做读写
    JanusGraph是一个图形数据库引擎。JanusGraph本身专注于压缩图序列化、丰富图数据建模、高效的查询执行。此外,JanusGraph利用Hadoop进行图分析和批处理。JanusGraph为数据持久化,数据索引和客户端访问实现了强大的模块化接口。JanusGraph的模块化架构使其能够与各种存储,索引和客......