首页 > 其他分享 >[Spark streaming举例]-- 消费kafka中的数据

[Spark streaming举例]-- 消费kafka中的数据

时间:2022-11-03 14:37:05浏览次数:51  
标签:val -- 9092 kafka streaming apache org ssc


第一种方式

package com.kafka.my.scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import java.util.Properties
/**
*
* @author root
* sparkstreaming获取kafka方式一
* 测试结果:通过
*测试:1\需要先在h15\h16\h17上启动zookeeper,再启动kafka,创建kafka的topic
* 2\在h15kafka的bin目录下执行
* #sh kafka-console-producer.sh --topic 20160510a --broker-list h15:9092,h16:9092,h17:9092
* 让其等待输入
* 3\启动本程序接受数据
* 4\在h15上刚才的窗口输入数据
* 5\查看本程序是否正常接收
*
*错误:java.nio.channels.ClosedChannelException
* fetching topic metadata for topics [Set(20160510aa)] from broker [ArrayBuffer(id:2,host:h17,port:9092, id:1,host:h16,port:9092, id:0,host:h15,port:9092)] failed
原因:server.propertis中的host.name=h15或者注释掉,否则报错
*/


object KafkaReceiverCountWord {
def main(args: Array[String]): Unit = {
//创建ssc
val ssc=new StreamingContext(new SparkConf().setAppName("wordCount").setMaster("local[2]"),Durations.seconds(5))
//创建properties
val topicThreadMap=Array("20160510a").map { (_,1) }.toMap
//创建客户端接收
val lines =KafkaUtils.createStream(ssc,
"192.168.142.115:2181,192.168.142.116:2181,192.168.142.117:2181"
, "WordcountConsumerGroup"
, topicThreadMap)
//切割
val words =lines.flatMap(_._2.split(" "))
//tuple
val pairs=words.map { (_,1) }
//reduceByKey
val wordcounts=pairs.reduceByKey(_+_)
//必须触发
wordcounts.print()
//开启
ssc.start()
//等待
ssc.awaitTermination()
//关闭
ssc.stop()
}
}

第二种方式

package com.kafka.my.scala

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Durations
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
/**
*
* @author root
* sparkstreaming获取kafka方式二
* 测试结果:通过
* 区别于第一种方式:
* 1、offset不会更新到zookeeper
* 2、使用的节点端口是9092kafka的broker端口
* 3、不存在group
*
*/
object KafkaDirectCountWord {
def main(args: Array[String]): Unit = {
//获取sparkstreaming
val ssc=new StreamingContext(new SparkConf().setAppName("directCount").setMaster("local[2]"),Durations.seconds(5))
//创建kafkaParams
val kafkaParams=Array("metadata.broker.list").map {(_,"192.168.142.115:9092,192.168.142.116:9092,192.168.142.117:9092") }.toMap
//创建topic
val topics=Array("20160510a").toSet
// val topics=Set("20160510a")
//获取lines
val lines=KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics)
//切割
val words =lines.flatMap(_._2.split(" "))
//tuple
val pairs=words.map { (_,1) }
//reduceByKey
val wordcounts=pairs.reduceByKey(_+_)
//必须触发
wordcounts.print()
//开启
ssc.start()
//等待
ssc.awaitTermination()
//关闭
ssc.stop()
}
}

 

标签:val,--,9092,kafka,streaming,apache,org,ssc
From: https://blog.51cto.com/u_13966077/5819839

相关文章

  • 静态代理模式
    前言为某个对象提供一个代理,以控制对这个对象的访问。代理类和委托类有共同的父类或父接口,这样在任何使用委托类对象的地方都可以用代理对象替代。代理类负责请求的预处......
  • 小感
    未来会怎样的,大多时候我们是在带着焦虑前进,一边后悔昨天,一边担心明天,一边浪费今天。我们或许看不透遥远的未来,但做好局部最优的贪心一定不会错,同时换个角度考虑,我们已经拼......
  • [Java基础]-- 接口、抽象类
    一直都在使用接口,没怎么用过抽象类,今天面试遇到了面试官提问:抽象类和接口有什么区别啊?下面就关于这个问题好好研究一下,希望在以后的工作中能牢记。。以下是《疯狂java讲义》......
  • SpringBoot配置swagger
    1、引入依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><......
  • 什么是mysql数据库?MySQL的特点有哪些?
    MySQL是一个关系型数据库管理系统,由瑞典MySQLAB公司开发,属于Oracle旗下产品MySQL是最流行的关系型数据库管理系统之一,在WEB应用方面,MySQL是最好的RDBMS(Re......
  • 5+:非肿瘤能量代谢思路来喽
    原文公众号:一起实验网对下述分析方法感兴趣或者没有研究思路的小伙伴,欢迎踊跃探讨! 早!今天小编和大家分享一篇10月刚刚发表在FrontCardiovascMed杂志(IF:5.846)的文章《I......
  • [Java应用]-- 拼接多张图片
    实现代码如下importjava.awt.image.BufferedImage;importjava.io.File;importjavax.imageio.ImageIO;/****@类功能说明:java拼接多张图片,生成的格式是jpg、bmp......
  • [数据库基础]-- 字符串截取函数substr、substring以及 case when函数使用
    使用说明:1、使用:substr使用范围:oracle、mysql、sqlserversubstring使用范围:mysql、sqlserver 2、举例:现有表:t_user name、age字段查询需求:如果name字段中的第5个字符有“......
  • 第七章Python实训
    test7-1    test7-2   test7-3    test7-4    test7-5   ......
  • gitlab正确上传文件方法
    一.用户邮箱绑定打开电脑命令提示符首先要配置好用户名和邮箱查看用户名和邮箱gitconfiguser.namegitconfiguser.email修改用户名和邮箱gitconfig--globaluser.n......