Java code
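The listing below consumes string records from a Kafka topic with Spark Streaming's direct Kafka 0.10 API and inserts each record into PostgreSQL over JDBC, opening one connection per RDD partition.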
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class KafkaToPostgreSQL {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("KafkaToPostgreSQL");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Kafka configuration: ConsumerStrategies expects a Map<String, Object>
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("group.id", "use_a_unique_group_id");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        // Create a direct stream subscribed to the topic
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Arrays.asList("topic"), kafkaParams)
        );

        // Write the data to PostgreSQL, one JDBC connection per partition
        messages.foreachRDD(rdd -> {
            rdd.foreachPartition(partitionOfRecords -> {
                try (Connection connection = DriverManager.getConnection(
                        "jdbc:postgresql://hostname:port/database", "username", "password")) {
                    connection.setAutoCommit(false);
                    try (PreparedStatement statement = connection.prepareStatement(
                            "INSERT INTO your_table (column1, column2) VALUES (?, ?)")) {
                        while (partitionOfRecords.hasNext()) {
                            ConsumerRecord<String, String> record = partitionOfRecords.next();
                            statement.setString(1, record.key());
                            statement.setString(2, record.value());
                            statement.addBatch();
                        }
                        // Send the inserts as one batch, then commit the transaction
                        statement.executeBatch();
                    }
                    connection.commit();
                }
            });
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
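One caveat: with enable.auto.commit set to false, the job above never records its consumer offsets, so a restart replays from wherever auto.offset.reset points. Below is a minimal sketch of manual offset commits using the kafka010 offset APIs (HasOffsetRanges, OffsetRange, CanCommitOffsets), committing only after the database writes have finished, which gives at-least-once delivery.

// Inside main(), replacing the foreachRDD block above.
// Requires: import org.apache.spark.streaming.kafka010.HasOffsetRanges;
//           import org.apache.spark.streaming.kafka010.OffsetRange;
//           import org.apache.spark.streaming.kafka010.CanCommitOffsets;
messages.foreachRDD(rdd -> {
    // Capture the offset ranges this batch covers before any transformation
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    rdd.foreachPartition(partitionOfRecords -> {
        // ... JDBC writes as in the listing above ...
    });

    // Commit the offsets back to Kafka only after the writes have completed
    ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});

Because commitAsync records offsets only after a batch's output completes, a failure mid-batch means the uncommitted records are reprocessed on restart; for exactly-once effects the INSERT should be made idempotent, for example an upsert keyed on a unique column.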
From: https://www.cnblogs.com/yeyuzhuanjia/p/18243516