首页 > 编程语言 >SparkStreaming in Java

SparkStreaming in Java

时间:2024-01-15 19:44:07浏览次数:38  
标签:Java log4j import apache org spark SparkStreaming appender

参考地址:Spark Streaming Programming Guide

1.新建Maven项目,POM引入依赖

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.13</artifactId>
            <version>3.5.0</version>
        </dependency>

2.项目添加Scala依赖库

image

3.在资源目录添加日志配置文件log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4.代码

package cn.coreqi;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

import java.util.Arrays;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 创建SparkConf对象
        SparkConf sparkConf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("sparkSql");

        // 第一个参数表示环境配置,第二个参数表示批量处理的周期(采集周期)
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));

        // 1.从端口获取数据
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
        // 处理数据
        JavaDStream<String> words = lines.flatMap(l -> Arrays.asList(l.split(" ")).iterator());
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

        // 打印结果
        wordCounts.print();

        // 由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭
        // 如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕
        ssc.start();              // 启动采集器
        
        ssc.awaitTermination();   // 等待采集器的关闭
    }
}

5.测试

安装netcat https://eternallybored.org/misc/netcat/

nc -lp 9999

标签:Java,log4j,import,apache,org,spark,SparkStreaming,appender
From: https://www.cnblogs.com/fanqisoft/p/17966165

相关文章

  • Java中的ThreadLocal和 InheritableThreadLocal
    Java中的ThreadLocal和InheritableThreadLocalpackagecom.example.core.mydemo.java;/***output*Thread-0ThreadLocalvalue:null*Thread-0InheritableThreadLocalvalue:InheritableThreadLocalstring*/publicclassThreadLocalTest{publicstati......
  • 【Java 进阶篇】使用 Stream 流和 Lambda 组装复杂父子树形结构(List 集合形式)
    目录前言一、以部门结构为例1.1实体1.2返回VO1.3具体实现1.4效果展示二、以省市县结构为例2.1实体2.2返回VO2.3具体实现2.4效果展示三、文章小结前言在最近的开发中,一星期内遇到了两个类似的需求:返回组装好的部门树、返回组装好的地区信息树,最终都需要返回List集合对象给前端......
  • Java的类加载机制
    Java的类加载机制是指在Java程序运行时,将类文件加载到内存中的一系列步骤。Java的类加载机制遵循着“按需加载”的原则,也就是说,只有在需要用到某个类的时候,才会将这个类的相关信息加载到内存中。这种“按需加载”的设计使得Java程序具备了很好的灵活性和效率。Java的类加载器......
  • Vue 项目离线安装 ArcGIS for JavaScript
    注意:arcgis-js-api在4.18及之后版本,可以通过npminstall@arcgis/core@4.18.1直接安装在写些博客时,npm能安装的最新版为4.28.10,下面以4.28.10为例,讲解离线安装。在vue3项目中,通过npminstall@arcgis/core@4.28.10安装,但默认是半本地化的,因为assests资源是通过https://js.ar......
  • java Flink 校验接口数据
    要使用Java编写Flink程序来校验接口的数据,可以按照以下步骤进行操作。首先,需要引入相关依赖包。在pom.xml文件中添加如下依赖项:org.apache.flinkflink-streaming-java_2.12{FLINK版本号}其中{FLINK版本号}应该被替换为所使用的Flink版本号。创建一个新的Java类,并导入必要......
  • 精彩推荐 |【Java技术专题】「重塑技术功底」攻破Java技术盲点之剖析动态代理的实现原
    背景介绍在Java编程中,动态代理的应用非常广泛。它被广泛应用于SpringAOP框架、Hibernate数据查询、测试框架的后端mock、RPC以及Java注解对象获取等领域。静态代理和动态代理与静态代理不同,动态代理的代理关系是在运行时确定的,这使得它在灵活性上更胜一筹。相比之下,静态代理的代理......
  • Java小细节之数组什么情况下相等,什么情况下不相等
    int[]a={1,2,3};int[]b=a;System.out.println(a==b);此时输出trueint[]a={1,2,3};int[]b={1,2,3};System.out.println(a==b);此时输出为false这是因为数组的机制,int[]b=a,相当于让b和a同时管理这个数组,a和b都是代表同一个数组,所以a==b是正确的,此时对数......
  • SparkSQL 操作Hive In Java
    本文的前提条件:SparkSQLinJava1.增加POM依赖<dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.33</version></dependenc......
  • 【JaveWeb教程】(2)Web前端基础:JavaScript入门不再难:一篇文章教你轻松搞定JavaScript的
    目录1介绍2引入方式3基础语法3.1书写语法3.2变量3.3数据类型和运算符4函数4.1第一种定义格式4.2第二种定义格式html完成了架子,css做了美化,但是网页是死的,我们需要给他注入灵魂,所以接下来我们需要学习JavaScript,这门语言会让我们的页面能够和用户进行交互。1介绍通过代......
  • SpringBoot 2.x 正式停更了。Java 8 就看 Solon 的了!
    最近有好多个新闻说:SpringBoot2.x正式停更了,Java8怎么办?当然用Solon喽!Solon,同时支持jdk8,jdk11,jdk17,jdk21。也支持graalvmnativeimage。既支持java8,也支持java21的:@SolonMainpublicclassApp{publicstaticvoidmain(String[]args){Sol......