https://qianwen.aliyun.com/?chatId=5c0d24e5abc14cfa91165c6ad4a1638f
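The following is a simple Java program that consumes data from Kafka and inserts it into a ClickHouse database. It uses Kafka's consumer API to read records and the ClickHouse Java JDBC driver to write them. As listed, it writes one row per Kafka record; the HashSet check that avoids duplicate inserts is added in the revised insertToClickHouse method further down.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaClickHouseConsumer {
        private static final String KAFKA_BROKER_LIST = "localhost:9092";
        private static final String KAFKA_TOPIC = "test_topic";
        private static final String CLICKHOUSE_HOST = "localhost";
        // This JDBC driver talks to ClickHouse over HTTP, so use the HTTP port
        // (8123 by default), not the native TCP port 9000.
        private static final String CLICKHOUSE_PORT = "8123";
        private static final String CLICKHOUSE_DATABASE = "default";
        private static final String CLICKHOUSE_TABLE = "test_table";
        // Placeholder credentials; adjust them to your server's configuration.
        private static final String CLICKHOUSE_USERNAME = "default";
        private static final String CLICKHOUSE_PASSWORD = "default";

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", KAFKA_BROKER_LIST);
            props.put("group.id", "clickhouse-consumer");
            // Offsets are committed automatically once per second.
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        String data = record.value();
                        insertToClickHouse(data);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }

        // Opens a connection and inserts a single row into the "data" column.
        private static void insertToClickHouse(String data) {
            try (Connection conn = DriverManager.getConnection(
                     String.format("jdbc:clickhouse://%s:%s/%s",
                         CLICKHOUSE_HOST, CLICKHOUSE_PORT, CLICKHOUSE_DATABASE),
                     CLICKHOUSE_USERNAME, CLICKHOUSE_PASSWORD);
                 PreparedStatement pstmt = conn.prepareStatement(
                     "INSERT INTO " + CLICKHOUSE_TABLE + " (data) VALUES (?)")) {
                pstmt.setString(1, data);
                pstmt.executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

Note: you need to add the ClickHouse Java JDBC driver to your project (the Kafka consumer classes come from the separate kafka-clients artifact). If you use Maven, add the following dependency to pom.xml:

    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.6</version>
    </dependency>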
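The listing above issues one INSERT per Kafka record, whereas the stated goal is batch insertion. A minimal sketch of one way to batch is given here, under some assumptions: the class BatchInserter, its methods add and flush, the jdbcSink helper, and the batch size are all hypothetical names introduced for illustration and are not part of the original program. Only standard JDBC calls (addBatch, executeBatch) are used.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers rows and hands them to a sink one batch at a time. */
class BatchInserter {
    private final int batchSize;
    private final Consumer<List<String>> sink;
    private final List<String> buffer = new ArrayList<>();

    BatchInserter(int batchSize, Consumer<List<String>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one row and flushes once batchSize rows have accumulated. */
    void add(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Passes a copy of the buffered rows to the sink, then clears the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    /** Sink that writes each batch with one PreparedStatement and one executeBatch() call. */
    static Consumer<List<String>> jdbcSink(Connection conn, String table) {
        return rows -> {
            String sql = "INSERT INTO " + table + " (data) VALUES (?)";
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                for (String row : rows) {
                    pstmt.setString(1, row);
                    pstmt.addBatch();
                }
                pstmt.executeBatch();
            } catch (SQLException e) {
                // Surface the failure instead of silently dropping the batch.
                throw new IllegalStateException("batch insert failed", e);
            }
        };
    }
}
```

In the poll loop, one would call add(record.value()) for each record and flush() after each poll so that a partially filled buffer is not held indefinitely. With enable.auto.commit set to true, offsets can be committed before buffered rows are written, so a crash may lose rows; committing offsets manually after a successful flush is one way to address that.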
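To avoid inserting the same data twice, the program keeps a HashSet of values it has already written. The insertToClickHouse method first checks whether the value is in the set; if it is, the insert is skipped. Otherwise the value is inserted into ClickHouse and then added to the set. The revised method, together with the dataSet field it relies on, looks like this:

    // Values already written by this process (requires java.util.HashSet and java.util.Set).
    private static final Set<String> dataSet = new HashSet<>();

    private static void insertToClickHouse(String data) {
        // Skip values that were already inserted.
        if (dataSet.contains(data)) {
            return;
        }
        try (Connection conn = DriverManager.getConnection(
                 String.format("jdbc:clickhouse://%s:%s/%s",
                     CLICKHOUSE_HOST, CLICKHOUSE_PORT, CLICKHOUSE_DATABASE),
                 CLICKHOUSE_USERNAME, CLICKHOUSE_PASSWORD);
             PreparedStatement pstmt = conn.prepareStatement(
                 "INSERT INTO " + CLICKHOUSE_TABLE + " (data) VALUES (?)")) {
            pstmt.setString(1, data);
            pstmt.executeUpdate();
            // Remember the value only after the insert succeeded.
            dataSet.add(data);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

Because the value is added to dataSet only after a successful insert, a failed insert can be retried, and any later attempt to insert the same value is skipped. Note that the set lives in memory: it only catches duplicates seen by the current process, and it is empty again after a restart.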
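A plain HashSet grows with every distinct value the consumer ever sees, which can become a problem for a long-running process. One possible mitigation, sketched here as an assumption rather than as part of the original answer, is to remember only the most recently seen values. The class name BoundedSeenSet, the method markIfNew, and the capacity are hypothetical; the sketch relies on the standard LinkedHashMap.removeEldestEntry hook and Collections.newSetFromMap.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical bounded replacement for the HashSet: remembers at most `capacity` values. */
class BoundedSeenSet {
    private final Set<String> seen;

    BoundedSeenSet(final int capacity) {
        // An access-ordered LinkedHashMap drops its least recently used entry
        // whenever an insertion pushes the size above the capacity.
        this.seen = Collections.newSetFromMap(
            new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > capacity;
                }
            });
    }

    /** Returns true if the value was not remembered yet, false for a remembered duplicate. */
    boolean markIfNew(String value) {
        return seen.add(value);
    }
}
```

The trade-off is that a value evicted from the set is treated as new if it shows up again, so this only filters duplicates that arrive close together. Unlike the method above, markIfNew records the value before any insert is attempted, so a caller that wants to retry failed inserts would need to handle that separately.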
From: https://www.cnblogs.com/qsds/p/17756038.html