首页 > 其他分享 >Kafka 与Spark的集成

Kafka 与Spark的集成

时间:2024-04-08 13:58:33浏览次数:23  
标签:集成 val Kafka Streaming Spark spark

Apache Kafka与Apache Spark可以进行深度集成,实现从Kafka中读取实时流数据,并利用Spark的分布式计算能力进行高效的数据处理和分析。以下是如何将Kafka与Spark(特别是Spark Streaming或Structured Streaming)进行集成的示例:

1. Spark Streaming与Kafka集成

对于Spark 2.x之前的版本,通常使用Spark Streaming与Kafka集成。以下是一个使用Scala编写的简单示例:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Spark Streaming配置
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

// Kafka配置
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-consumer",
  "auto.offset.reset" -> "latest"
)

// 订阅的Kafka Topics
val topics = Array("my-topic")

// 创建Kafka DStream
val kafkaStream = KafkaUtils.createDirectStream[
  String,
  String,
  StringDecoder,
  StringDecoder
](ssc, kafkaParams, topics)

// 处理Kafka DStream
kafkaStream.foreachRDD { rdd =>
  rdd.foreach { record =>
    println(s"Received message: ${record.value()} from topic ${record.topic()}, partition ${record.partition()}, offset ${record.offset()}")
  }
}

// 启动Spark Streaming
ssc.start()
ssc.awaitTermination()

在这个示例中:

  • 首先创建一个StreamingContext,设置批处理间隔为1秒。
  • 然后定义Kafka连接参数,包括Broker地址、消费者组ID、键值反序列化器以及自动偏移重置策略。
  • 指定要订阅的Kafka Topic。
  • 使用KafkaUtils.createDirectStream方法创建一个从Kafka接收数据的DStream(离散化流)。
  • 定义对DStream中每一批数据(RDD)的处理逻辑,这里只是简单地打印每条消息的内容、Topic、分区和偏移量。
  • 最后启动Spark Streaming并等待其结束。

2. Spark Structured Streaming与Kafka集成

对于Spark 2.x及更高版本,推荐使用Spark Structured Streaming与Kafka集成,因为它提供了更丰富的SQL-like API和更好的性能。以下是一个使用Scala编写的简单示例:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
  .appName("Spark Structured Streaming with Kafka")
  .getOrCreate()

val kafkaDataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

val parsedDataFrame = kafkaDataFrame.selectExpr("cast(key as string)", "cast(value as string)")

parsedDataFrame.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

在这个示例中:

  • 创建一个SparkSession,它是Spark 2.x及以上版本中统一的入口点。
  • 使用readStream方法从Kafka读取数据,指定Kafka Broker地址、订阅的Topic以及数据源格式为"kafka"。
  • 从Kafka DataFrame中选择键和值列,并将它们转换为字符串类型。
  • 使用writeStream方法将处理后的数据写入控制台输出,设置输出模式为追加(append)。
  • 启动流处理作业并等待其结束。

注意事项

  • 请确保Kafka服务已启动,且指定的Topic中有消息可供消费。
  • 根据实际需求调整Spark Streaming或Structured Streaming的配置,如检查点、故障恢复、输出持久化等。
  • 在生产环境中,需要考虑数据处理的性能、容错性、监控以及与现有系统的集成。

通过以上步骤,您已经成功地将Apache Kafka与Apache Spark(Spark Streaming或Structured Streaming)进行了集成,实现了从Kafka Topic接收数据并使用Spark进行实时流处理。在实际应用中,您可以根据业务需求添加更复杂的Spark SQL查询、机器学习算法等操作。

标签:集成,val,Kafka,Streaming,Spark,spark
From: https://blog.csdn.net/qq_33240556/article/details/137497770

相关文章

  • spark 算子优化 repartiton
    算子调优之使用repartition解决SparkSQL低并行度的性能问题并行度:之前说过,并行度是自己可以调节,或者说是设置的。1、spark.default.parallelism2、textFile(),传入第二个参数,指定partition数量(比较少用)咱们的项目代码中,没有设置并行度,实际上,在生产环境中,是最好自己设置......
  • 微服务集成Spring Cloud Zipkin实现链路追踪并集成Dubbo
    1、什么是ZipKinZipkin是一个根据Google发表的论文“Dapper”进行开源实现的分布式跟踪系统。Dapper是Google公司内部的分布式追踪系统,用于生产环境中的系统分布式跟踪。Google在其论文中对此进行了解释,他们“构建了Dapper,以向Google开发人员提供有关复杂分布式系统行为......
  • Kafka原理剖析之「位点提交」
    一、背景Kafka的位点提交一直是Consumer端非常重要的一部分,业务上我们经常遇到的消息丢失、消息重复也与其息息相关。位点提交说简单也简单,说复杂也确实复杂,没有人能用一段简短的话将其说清楚,最近团队生产环境便遇到一个小概率的报错“Offsetcommitfailedwitharetriablee......
  • Hadoop3.1.3+Spark2.3.4全分布决策树
    该文档是一些配置全分布的注意事项(遇到的坑)与个人的一些指令备注,阅读文档前需要配置好网络,具体可以参考:网络配置。linux系统选择的是Centos7首先是一些小工具:小技巧1.Xshell:可以更方便地批量操控虚拟机进行全分布:这样输入任何指令都可以输入给所有虚拟机,方便全分布的配置......
  • SpringBoot集成mqtt启动就不断报已断开连接问题
    踩坑记录,实在是天坑!!!原因一:clientId相同,即clientId重复导致(不过我不是这个问题)我的问题是:项目启动成功后,控制台不停地反复输出:已断开连接,,,加了重连机制后,则不停地输出:重连失败,已连接客户机,,,尼玛,,关键点还在于我能接收到订阅的消息(不影响消息处理),这又是什么情况,明明没断连,确一直......
  • 软考-系统集成项目管理中级-项目管理一般知识
    本章历年考题分值统计本章重点常考知识点汇总清单(学握部分可直接理解记忆)项目型组织的优点体现在如下方面:本章历年考题及答案解析2019年上半年第29题(此题为常规重点考题,建议举一反三)在(29)组织结构中,项目拥有独立的项目团队,项目经理在调用与项目相关的资源时,......
  • vue websocket电脑端前端集成
    后端数据用websocket推送数据,前端在大屏左上角模块页面接收,用bus发送到其他模块(总共6个模块页面,从左上模块页面发送到其他5个模块页面)页面,数据用于大屏上显示,废话不多说,直接上代码。eventBus.js文件,放到根目录src->assets->js文件夹下,eventBus.js文件内容如下:importVuefr......
  • TalkingData——Unity应用开发中集成统计分析工具
    第一步:帐号注册官方网站:TalkingData-移动.数据.价值第二步:创建应用查看appid可以进入网站注册,注册好以后就可以创建应用 创建好应用后,点击 应用管理-》基本信息就可以查看自己的AppID第三步:申请对应平台的sdk 接下来就是申请sdk这里是申请sdk的网站:SDK定制填写......
  • Spark-Scala语言实战(13)
    在之前的文章中,我们学习了如何在spark中使用键值对中的keys和values,reduceByKey,groupByKey三种方法。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。Spark-Scala语言实战(12)-CSDN博客文章浏览阅读722次,点赞19次......
  • SpringBoot集成微信支付(JAVA)
    微信支付(Java)目录微信支付(Java)简介:登录微信公众平台(JSAPI支付):注意事项:添加依赖:application.yaml:WeixinPayController:PaymentService:PaymentServiceImpl:实体类PaymentJSAPI:简介:        Springboot项目集成微信支付(JSAPI),用于微信公众号对接支付功......