首页 > 数据库 >spark读取Kafka数据写入postgreSQL

spark读取Kafka数据写入postgreSQL

时间:2024-06-12 10:56:09浏览次数:26  
标签:postgreSQL java kafkaParams Kafka org apache import spark

Java代码

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Arrays;
import java.util.Properties;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
 
public class KafkaToPostgreSQL {
 
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("KafkaToPostgreSQL");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));
 
        // Kafka配置
        Properties kafkaParams = new Properties();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("group.id", "use_a_unique_group_id");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
 
        // 创建DStream
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Arrays.asList("topic"), kafkaParams)
        );
 
        // 将数据写入PostgreSQL
        messages.foreachRDD(rdd -> {
            rdd.foreachPartition(partitionOfRecords -> {
                Connection connection = DriverManager.getConnection("jdbc:postgresql://hostname:port/database", "username", "password");
                connection.setAutoCommit(false);
                PreparedStatement statement = connection.prepareStatement("INSERT INTO your_table (column1, column2) VALUES (?, ?)");
 
                while (partitionOfRecords.hasNext()) {
                    ConsumerRecord<String, String> record = partitionOfRecords.next();
                    statement.setString(1, record.key());
                    statement.setString(2, record.value());
                    statement.executeUpdate();
                }
 
                connection.commit();
                connection.close();
            });
        });
 
        ssc.start();
        ssc.awaitTermination();
    }
}

 

标签:postgreSQL,java,kafkaParams,Kafka,org,apache,import,spark
From: https://www.cnblogs.com/yeyuzhuanjia/p/18243516

相关文章

  • PostgreSQL教程
    PostgreSQL教程可以按照以下步骤进行,我将根据参考文章中的信息,以清晰、分点的方式进行归纳和总结:一、PostgreSQL简介PostgreSQL是一个功能强大的开源关系型数据库管理系统,基于C语言开发。它提供了丰富的特性和良好的性能,被广泛应用于各种应用场景中。二、PostgreSQL安装......
  • Apache Kafka框架
    简述:ApacheKafka是一个基于发布/订阅模式的分布式流数据处理系统,用于实时事件流处理和数据流中转。其架构包括生产者、代理、主题、分区、消费者和ZooKeeper组件,通过它们的协作实现高吞吐量、可水平扩展、持久性、容错性等特点,适用于构建实时数据管道、实时数据分析等场景。......
  • Kafka源码分析(十八)——Broker:日志子系统——整体架构
    作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬学习必须往深处挖,挖的越深,基础越扎实!阶段1、深入多线程阶段2、深入多线程设计模式阶段3、深入juc源码解析阶段4、深入jdk其余源码解析......
  • Kafka源码分析(十九)——Broker:日志子系统——Log
    作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬学习必须往深处挖,挖的越深,基础越扎实!阶段1、深入多线程阶段2、深入多线程设计模式阶段3、深入juc源码解析阶段4、深入jdk其余源码解析......
  • Kafka 主题 CLI 教程
    KafkaTopicsCLI,即kafka-topics用于创建、删除、描述或更改Kafka中的主题。请确保您已预先启动Kafka 如何创建Kafka主题?要创建Kafka主题,我们需要提供必需的参数:如果是Kafkav2.2+,请使用Kafka主机名和端口,例如,localhost:9092如果是旧版本的Kafka,请使用Zook......
  • spark-3.5.1+Hadoop 3.4.0+Hive4.0 分布式集群 安装配置
    Hadoop安装参考:Hadoop3.4.0+HBase2.5.8+ZooKeeper3.8.4+Hive4.0+Sqoop分布式高可用集群部署安装大数据系列二-CSDN博客一下载:Downloads|ApacheSpark1下载Maven–WelcometoApacheMaven# maven安装及配置教程wgethttps://dlcdn.apache.org/maven/maven-3/......
  • postgresql 基本查询
    建表语句--========sys_dict_typecreatetablesys_dict_type(idbigintprimarykey,namevarchar(100),typevarchar(100),group_codevarchar(100),statuschar(1));commentontablesys_dict_typeis'系统字典类型表';commentoncolumnsys_dict_type.nam......
  • postgresql 数据库基本管理
    逻辑结构PostgreSQL教程--逻辑结构:实例、数据库、schema、表之间的关系数据库基本管理--查询所有数据库selectdatnamefrompg_catalog.pg_database;--创建数据库createdatabasejxwithencoding'UTF8'LC_COLLATE='C'LC_CTYPE='C'TEMPLATE=template1;--查询......
  • 深入解析Kafka消息丢失的原因与解决方案
    深入解析Kafka消息丢失的原因与解决方案ApacheKafka是一种高吞吐量、分布式的消息系统,广泛应用于实时数据流处理。然而,在某些情况下,Kafka可能会出现消息丢失的情况,这对于数据敏感的应用来说是不可接受的。本文将深入解析Kafka消息丢失的各种原因,包括生产者、broker和消费......
  • 深入解析Kafka消息传递的可靠性保证机制
    深入解析Kafka消息传递的可靠性保证机制Kafka在设计上提供了不同层次的消息传递保证,包括atmostonce(至多一次)、atleastonce(至少一次)和exactlyonce(精确一次)。每种保证通过不同的机制实现,下面详细介绍Kafka如何实现这些消息传递保证。1.AtMostOnce(至多一次)在这种模......