首页 > 其他分享 >学习记录22

学习记录22

时间:2024-02-04 20:11:42浏览次数:22  
标签:22 记录 kafka 学习 usr apache org spark local

  本次学习了spark Streaming里进行读取高级数据源的一些操作

进行安装kafka

网站:https://dblab.xmu.edu.cn/blog/1096/

接下来在Ubuntu系统环境下测试简单的实例,按顺序执行如下命令:

# 进入kafka所在的目录
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

命令执行后不会返回Shell命令输入状态,zookeeper就会按照默认的配置文件启动服务,请千万不要关闭当前终端.启动新的终端,输入如下命令:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

kafka服务端就启动了,请千万不要关闭当前终端。启动另外一个终端,输入如下命令:

cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab

topic是发布消息发布的category,以单节点的配置创建了一个叫dblab的topic.可以用list列出所有创建的topics,来查看刚才创建的主题是否存在。

bin/kafka-topics.sh --list --zookeeper localhost:2181

可以在结果中查看到dblab这个topic存在。

kafka和Flume等高级输入源,需要依赖独立的库(jar文件)

需要在spark-shell下进行import语句进行测试,但是spark库中没有这类文件

需要下载相对应的spark-streaming-kafka的jar包才可以使用kafka,下载的是:spark-streaming-kafka-0-10_2.12-3.2.0这个版本的,其中2.12是scala的版本,3.20是spark的版本

访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.2.0.jar和spark-token-provider-kafka-0-10_2.12-3.2.0.jar文件,其中,2.12表示Scala的版本号,3.2.0表示Spark版本号。或者也可以直接到本教材官网的“下载专区”的“软件”目录中下载这两个文件。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark/jars”目录)。此外,还需要把“/usr/local/kafka/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下

下载后复制到/usr/local/spark/jars/

[atguigu@hadoop102 ~]$ cd /usr/local/spark/jars/
[atguigu@hadoop102 jars]$ mkdir kafka
[atguigu@hadoop102 jars]$ cd
[atguigu@hadoop102 ~]$ cd 下载
[atguigu@hadoop102 下载]$ cp ./spark-streaming-kafka-0-10_2.12-3.2.0.jar /usr/local/spark/jars/kafka/

继续把kafka安装目录下的libs目录的所有jar文件进行复制到“/usr/local/spark/jars/kafka”目录下,

[atguigu@hadoop102 下载]$ cd /usr/local/kafka/libs/
[atguigu@hadoop102 libs]$ cp ./* /usr/local/spark/jars/kafka

编写Spark Streaming程序使用Kafka数据源

步骤:

1.编写生产者(producer)程序

2.编写消费者(consumer)程序

3.编译打包程序

4.运行程序

1.编写生产者(producer)程序

cd /usr/local/spark/mycode
mkdir kafka
cd kafka
mkdir -p src/main/scala
cd src/main/scala
vim KafkaWordProducer.scala

使用vim编辑器新建了KafkaWordProducer.scala,它是产生一系列字符串的程序,会产生随机的整数序列,每个整数被当做一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
 // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString)
          .mkString(" ")
                    print(str)
                    println()
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
     Thread.sleep(1000)
    }
  }
}

2.编写消费者(consumer)程序

保存后退出vim编辑器。然后,继续在当前目录下创建KafkaWordCount.scala代码文件:

vim KafkaWordCount.scala
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaWordCount{
  def main(args:Array[String]){
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc,Seconds(10))
    ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoop
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("wordsender")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.foreachRDD(rdd => {
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))
      val lines = maped.map(_._2)
      val words = lines.flatMap(_.split(" "))
      val pair = words.map(x => (x,1))
      val wordCounts = pair.reduceByKey(_+_)
      wordCounts.foreach(println)
    })
    ssc.start
    ssc.awaitTermination
  }
}

然后,执行下面命令:

cd /usr/local/spark/mycode/kafka/
vim simple.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.2.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.2.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"

3.编译打包程序

cd /usr/local/spark/mycode/kafka/
/usr/local/sbt/sbt package

4.运行程序

启动集群:

[atguigu@hadoop102 ~]$ myhadoop.sh start

启动zookeeper

[atguigu@hadoop102 ~]$ cd /usr/local/kafka
[atguigu@hadoop102 kafka]$ bin/zookeeper-server-start.sh config/zookeeper.properties

命令执行后不会返回Shell命令输入状态,zookeeper就会按照默认的配置文件启动服务,请千万不要关闭当前终端.启动新的终端,输入如下命令:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

kafka服务端就启动了,请千万不要关闭当前终端。启动另外一个终端,输入如下命令:

运行生产者

/usr/local/spark/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/simple-project_2.12-1.0.jar localhost:9092 wordsender 3 5

请新打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd /usr/local/spark/mycode/kafka/
/usr/local/spark/bin/spark-submit  \
--class "KafkaWordCount"  \
./target/scala-2.12/simple-project_2.12-1.0.jar

一直报错,无法解决,判断的是版本之间的冲突,无法进行很好的解决

标签:22,记录,kafka,学习,usr,apache,org,spark,local
From: https://www.cnblogs.com/JIANGzihao0222/p/18006916

相关文章

  • How to unlock Nissan Altima 2019-2022 Smart Remote 5 Buttons 433MHz Keys with Sm
    Howtounlock Nissan Altima2019-2022Smart Remote 5Buttons433MHzKeyswithSmartPro5000U-Plusfirst,youneedhavea SmartPro5000U-PlusProgrammer,ifyoudonothaveaSmartPro5000U-Plus,youcanbuyonefromchinaobd2.com.https://www.chinaobd2.co......
  • [官方培训]22-UE资产优化 | Epic 肖月
    传送门:[官方培训]22-UE资产优化|Epic肖月_哔哩哔哩_bilibili 目标:面向TA/美术,不影响视觉的前提下,优化运行和编辑效率引擎:UE5.1延迟管线(不涉及GamePlay、游戏交互逻辑、前向渲染)平台:支持Namite/Lumen等影视级特性的主流PC,Windows10withDX12一.性能查看1.1......
  • 2022河南萌新联赛第(三)场:河南大学
    A-玉米大炮二分一个时间,然后计算每门大炮可以射击的次数#include<bits/stdc++.h>usingnamespacestd;#defineintlonglong//#defineint__int128#definedoublelongdoubletypedefpair<int,int>PII;typedefpair<string,int>PSI;typedefpair<string,string>PS......
  • 【学习笔记】OI 数学学习笔记
    OI数学学习笔记001_整除001.1_整除基础001.1A基本定义整除与因数倍数的定义:设\(a,b\in\mathbb{Z},b\ne0\),若存在\(q\in\mathbbZ\),使得\(a=bq\),则称\(b\)整除\(a\),记为\(b\mida\),此时称\(a\)为\(b\)的因数,\(b\)为\(a\)的倍数.带余除法与余数的......
  • 云打印会记录打印内容吗?
    随着云打印服务的迅猛发展,现在很多有打印需求的朋友都会去选择易绘创云打印服务。一些比较注重文件安全的用户比较警觉,想要了解云打印的安全性。那么云打印会记录打印内容吗?今天就一起来了解一下。 云打印会记录打印内容吗?相信很多有过打印经历的朋友都知道,很多时候一些打印......
  • 学习MarkDown
    MarkDown学习标题三级标题字体HelloWord!(加粗:两边两个*)HelloWord!(斜体:两边一个*)HelloWord!(斜体且加粗:三个*)HelloWord!(字体划线:两边两个~)引用大于号后面写内容用来摘抄引用分割线用三个-或者三个*图片用感叹号加中括号加括号(名字+地址)![图片](C:\Users\2......
  • Pandas库学习笔记(6) -- Pandas 基本方法
    Pandas基本方法实例到目前为止,我们了解了三个PandasDataStructures以及如何创建它们。由于它在实时数据处理中的重要性,因此我们将主要关注DataFrame对象,并讨论其他一些DataStructures。方法描述axes返回行轴标签的列表dtype返回对象的dtype。empty如果Series......
  • 第一章学习Markdown语法详解
    Markdown学习一、标题:一级标题一个井号空格加标题名字就可以了二级标题两个井号空格加标题名字就可以了三级标题三个井号空格加标题名字就可以了四级标题四个井号空格加标题名字就可以了五级六级标题把对应的#写够即可。注意最多只支持到六级标题二、字体Hello,World!......
  • 软件测试学习笔记丨Seleium的BUG:页面元素文案重复空格处理
    前言需求做WEB的UI自动化练习,其需求为:访问慕课网的实战页面,获取实战页面的课程列表信息,并逐个点击进入详情并且关闭详情,直到最后一个。环境Java8MavenSelenium4.0Junit5初步代码importorg.junit.jupiter.api.AfterAll;importorg.junit.jupiter.api.BeforeAll;importorg......
  • Tcpdump和Wireshark的学习与使用
    Tcpdump和Wireshark的学习与使用背景2024年2月份农历小年时。同事为了解决一个应用忽快忽慢的问题去了上海客户那里。第二天自己在理发时(周末)接到了他的电话,说到了一些问题情况。比较明确的是,应用和数据库的请求经常出现20ms左右的高延迟的情况。其实子很早之前学习过tc......