首页 > 其他分享 >SparkStreaming 自定义数据采集器

SparkStreaming 自定义数据采集器

时间:2024-01-15 19:56:55浏览次数:18  
标签:定义数据 SparkStreaming 采集器 new apache import public flg

本文的前提条件: SparkStreaming in Java
参考地址:Spark Streaming Custom Receivers

1.自定义数据采集器

package cn.coreqi.receiver;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import java.util.Random;

/**
 * 自定义数据采集器
 */
public class MyReceiver extends Receiver<String> {
    private boolean flg = true;

    public boolean isFlg() {
        return flg;
    }

    public void setFlg(boolean flg) {
        this.flg = flg;
    }

    public MyReceiver(){
        super(StorageLevel.MEMORY_ONLY());
    }

    /**
     *
     * @param storageLevel 存储级别
     */
    public MyReceiver(StorageLevel storageLevel) {
        super(storageLevel);
    }

    /**
     * 启动采集器时的操作
     */
    @Override
    public void onStart() {
        new Thread(() -> {
            while (flg){
                try {
                    String message = "采集的数据为:" + new Random().nextInt(10);
                    store(message);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }

    /**
     * 停止采集器时的操作
     */
    @Override
    public void onStop() {
        setFlg(false);
    }
}

2.注册并使用

package cn.coreqi;

import cn.coreqi.receiver.MyReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 创建SparkConf对象
        SparkConf sparkConf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("sparkSql");

        // 第一个参数表示环境配置,第二个参数表示批量处理的周期(采集周期)
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));

        JavaReceiverInputDStream<String> messageDS = ssc.receiverStream(new MyReceiver());
        messageDS.print();
        
        // 由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭
        // 如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕
        ssc.start();              // 启动采集器

        ssc.awaitTermination();   // 等待采集器的关闭
    }
}

标签:定义数据,SparkStreaming,采集器,new,apache,import,public,flg
From: https://www.cnblogs.com/fanqisoft/p/17966182

相关文章

  • SparkStreaming in Java
    参考地址:SparkStreamingProgrammingGuide1.新建Maven项目,POM引入依赖<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.13</artifactId><version>3.5.0</ve......
  • 大一统的监控探针采集器 cprobe
    需求背景监控数据采集领域,比如Prometheus生态有非常多的Exporter,虽然生态繁荣,但是无法达到开箱即用的大一统体验,Exporter体系的核心问题有:良莠不齐:有的Exporter写的非常棒,有的则并不完善,有些监控类别甚至有多个Exporter,选择困难写法各异:Exporter所用的日志库、配置文件管理......
  • 大一统的监控探针采集器 cprobe
    需求背景监控数据采集领域,比如Prometheus生态有非常多的Exporter,虽然生态繁荣,但是无法达到开箱即用的大一统体验,Exporter体系的核心问题有:良莠不齐:有的Exporter写的非常棒,有的则并不完善,有些监控类别甚至有多个Exporter,选择困难写法各异:Exporter所用的日志库、配置文......
  • 谷歌地图数据采集器
    易谷歌地图数据采集大师说明谷歌地图数据采集器(易谷歌地图数据采集大师)是一款采集全球200多个国家或地区客户数据的软件,是你开发外贸客户的好帮手。软件采集数据范围广,功能强,又简单易用。其智能挖掘功能可以全方位获取外贸客户联系方式,包括邮箱、Facebook、推特、Linkin、YouTube......
  • Grafana 自定义数据源支持 RESTful API 查询
    背景数据爆炸的时代,信息化步伐越来越快,接入互联网的服务越来越多。随着业务迭代变更越来越复杂化,需求/产品者对系统的要求越来越高,对业务走势及健康状态需要更直观的感知。这意味着我们需要随时能够“看见”系统的状态,对系统/业务的实时监控以及可视化是技术演进的必然。Grafana......
  • C语言自定义数据类型-结构体
    在讨论自定义数据类型之前,我们不妨先回忆一下C语言的内置类型。例如字符型的char,整型中的intshortlong以及浮点型的floatdouble,这些都会C语言本身提供的数据类型,但仅仅有这些,是不足以满足我们的开发的。那么也就意味着需要一些复杂类型来帮助我们实现对复杂对象的操作,例如结构......
  • C语言【自定义数据类型、typedef、动态内存分配】
    C语言【自定义数据类型、typedef、动态内存分配】一、自定义数据类型。​ 关于下面讲到的所有自定义数据类型(enum、struct、union),有一点要说的是:定义类型不是声明变量,做这步操作时不分配内存,也不能在定义类型时赋值(枚举那个不是赋值,是做一个限定,赋值时赋限定之外的值也不报错。)......
  • 探索向量数据库 | 重新定义数据存储与分析
          随着大模型带来的应用需求提升,最近以来多家海外知名向量数据库创业企业传出融资喜讯。随着AI时代的到来,向量数据库市场空间巨大,目前处于从0-1阶段,预测到2030年,全球向量数据库市场规模有望达到500亿美元,国内向量数据库市场规模有望超过600亿人民币。      今天......
  • Spring自定义数据校验并实现国际化功能
    通常,当我们需要验证用户输入时,SpringMVC提供标准的预定义验证器。我们会引入spring-boot-starter-validation依赖来实现数据校验功能。但是,当我们需要验证特定类型的输入时,我们就需要创建自己的自定义校验逻辑。这里我们取一个相对简单的校验手机号码的功能来实现。为了校验手......
  • 如何定义数据库表之间的关系
    特别说明数据库的正规化是关系型数据库理论的基础。随着数据库的正规化工作的完成,数据库中的各个数据表中的数据关系也就建立起来了。在设计关系型数据库时,最主要的一部分工作是将数据元素如何分配到各个关系数据表中。一旦完成了对这些数据元素的分类,对于数据的操作将依赖于这些数......