首页 > 其他分享 >数据源:flume采集到的端口

数据源:flume采集到的端口

时间:2022-08-31 19:56:36浏览次数:65  
标签:flume sinks 数据源 端口 a1 sources node1 spark

推送式

  • 将flume采集的数据主动推送给Spark程序,容易导致Spark程序接受数据出问题,推送式整合是基于avro端口下沉地方式完成
  • 引入SparkStreaming和Flume整合的依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
</dependencies>
  • 定义Flume采集数据进程脚本,把sink下沉地指定为avro类型的端口下沉底
[root@node1 data]# vi portToSpark.conf 
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type = netcat
a1.sources.s1.bind = node1
a1.sources.s1.port = 44444
#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#整合flume进程中source channel sink
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  • 通过FileUtils.createStream方法从avro的端口中获取flume采集到avro端口的实时数据
package SparkStreaming.flume

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ByFlumePush {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("hdfs")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
    val ds = FlumeUtils.createStream(ssc, "node1", 8888, StorageLevel.MEMORY_ONLY)
    ds.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 启动
1. 启动flume
flume-ng agent -n a1 -f portToSpark.conf -Dflume.root.logger=INFO
2. 运行主类,将java代码打包上传到node1上
spark-submit --class flume.Demo01 ssc.jar
3. 开启监听的端口号
[root@node1 ~]# telnet node1 44444
  • 注意:
    必须保证Spark Streaming运行程序和Flume采集进程在同一个节点上,保证Spark Streaming打包的jar包必须把spark-streaming-flume_2.11:2.3.1版本的依赖包全部打包到jar包中
    (这里用的别人打的包,保存在G://shixun//ssc.jar路径下了)

拉取式

  • 将Flume采集的数据发送给sink了,sink并不是直接把数据立马给了Spark,而是先把数据缓冲,Spark接收器可以按照我的需求主动去sink中拉取数据.
    拉取式整合方式是基于Spark下沉地完成----建议使用
  • 引入依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
</dependencies>
  • 定义flume脚本文件,和上面的方式同,但把sink的下沉地改为SparkSink
[root@node1 data]# vi portToSpark2.conf 
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type = netcat
a1.sources.s1.bind = node1
a1.sources.s1.port = 44444
#配置sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#整合flume进程中source channel sink
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  • 定义读取方法
package SparkStreaming.flume

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ByFlumePush {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("hdfs")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
    val ds = FlumeUtils.createPollingStream(ssc, "node1", 8888, StorageLevel.MEMORY_ONLY)
    ds.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  • [注意]:
    SparkStreaming的依赖jar包复制到flume软件的lib目录下,把spark-streaming-flume的依赖jar包放到flume软件的lib目录下
[root@node1 jars]# pwd
/opt/app/spark-2.3.1/jars
[root@node1 jars]# cp spark-streaming_2.11-2.3.1.jar /opt/app/flume-1.8.0/lib/
[root@node1 data]# pwd
/opt/data
[root@node1 data]# cp ssc.jar /opt/app/flume-1.8.0/lib/

(ssc.jar为别人打的包,保存在G://shixun//ssc.jar路径下了)

  • 启动
[root@node1 data]# flume-ng agent -n a1 -f portToSpark2.conf -Dflume.root.logger=INFO,console
[root@node1 data]# spark-submit --class flume.ByFlumePush ssc2.jar
[root@node1 data]# telnet node1 44444

标签:flume,sinks,数据源,端口,a1,sources,node1,spark
From: https://www.cnblogs.com/jsqup/p/16643991.html

相关文章

  • 处理不同的数据源(端口,HDFS)
    端口//地址,端口号,级别(将数据存储在所设置的级别中,这里设置级别为spark的内存)valds:DStream[String]=ssc.socketTextStream("node1",44444,StorageLevel.MEMORY_......
  • OPNsense 防火墙系列二:SSL 证书和监听端口更改
    SSL证书默认OPNsense在初始化时,就默认监听80和443端口,强制SSL跳转,规定使用自签名的证书,有效期一年.我们只需要添加第三方证书并设置使用即可。申请证书需要拥......
  • Linux操作系统中修改putty工具的ssh端口号(22)
    Linux服务器为了保证安全,需修改putty远程的默认端口22,具体操作步骤:1.在Linux服务器中登录用户名和密码(用root用户登录);2.输入vim /etc/ssh/sshd_config3.上下箭头移动......
  • VTP | DTP | 端口镜像 | 端口聚合 | 端口隔离
    1、VTP(vlan数据库同步协议):凡是同个网络中开启了VTP协议的网络设备,客户端上的vlan全部来自与服务端的vlan。就是全网vlan数据库同步,不用自己一个一个的去配置。①VTP具有......
  • 大数据技术Flume框架详解
    Flume的概述Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。高可用(HA)flume框架(故障转移机制)高......
  • [Bug0044] NT Kernel & System进程占用80端口并且关闭不掉
    1、问题NTKernel&System进程占用80端口并且关闭不掉问题排查cmd命令行运行netstat-ano发现80端口被pid=4的进程占用打开任务管理器,发现pid=4的进程,其实是system......
  • 端口、进程和socket
    端口:端口是tcp/ip五层模型中的概念,用来唯一标识一个应用程序。socket:插座是网络进程通信中的概念,用来唯一标识一个网络进程。因为pid或者其他只能唯一标识本地的进程,如......
  • echarts-dataset数据源配置项
     如下效果图:   代码入下:letbox4=document.querySelector('.box4')letmyCharts3=echarts.init(box4)myCharts3.setOption({......
  • EdgeX使用问题日志:端口被占用
    问题:本来以为可以只使用C语言就可以完成EdgeXFoundry的开发,最终发现go语言是绕不过去的坎,于是从头开始,把go语言相关的教程做一遍,但是在安装了ZeroMq和VsCode之后,Edge的部......
  • 3.Linux更新数据源
    在一个没有安装vim等命令的Linux环境中,没办法更新数据源,没办法软件的安装等的解决方案:编辑数据源vi/etc/apt/sources.list删除全部内容并修改为debhttp://mirrors......