I. Development environment
- OpenJDK version >= 17
- ClickHouse: 20.7+
1. Supported data types
Data type | Support | Comment |
AggregatedFunction | ❌ | limited to |
Array(*) | ✅ | |
Bool | ✅ | |
Date* | ✅ | |
DateTime* | ✅ | |
Decimal* | ✅ | |
Enum* | ✅ | can be treated as both string and integer |
Geo Types | ✅ | Point, Ring, Polygon, and MultiPolygon |
Int*, UInt* | ✅ | UInt64 is mapped to |
IPv* | ✅ | |
Map(*) | ✅ | |
Nested(*) | ✅ | |
Object('JSON') | ✅ | |
SimpleAggregateFunction | ✅ | |
*String | ✅ | |
Tuple(*) | ✅ | |
UUID | ✅ | |
II. Java client
1. Add the dependency
<dependency>
    <groupId>com.clickhouse</groupId>
    <!-- or clickhouse-grpc-client if you prefer gRPC -->
    <artifactId>clickhouse-http-client</artifactId>
    <version>0.4.0</version>
</dependency>
2. Connecting to ClickHouse
Connection string: protocol://host[:port][/database][?param[=value][&param[=value]]][#tag[,tag]]
Examples:
http://localhost:8443?ssl=true&sslmode=NONE
http://(https://[email protected]:443
tcp://localhost?!auto_discovery#experimental),(grpc://localhost#experimental)?failover=3#test
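To make the parts of the connection string format concrete, here is a minimal sketch that splits a single-endpoint string into protocol, host, port, database, parameters, and tags using java.net.URI. The class name EndpointParts is hypothetical and this is not the client's own parser; it assumes one plain endpoint and does not handle the parenthesized multi-endpoint form.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the ClickHouse client.
class EndpointParts {
    // Splits "protocol://host[:port][/database][?params][#tags]" into named parts.
    static Map<String, String> parse(String endpoint) {
        URI uri = URI.create(endpoint);
        Map<String, String> parts = new LinkedHashMap<>();
        parts.put("protocol", uri.getScheme());
        parts.put("host", uri.getHost());
        // URI reports -1 when no port is given; an empty string marks "use the default".
        parts.put("port", uri.getPort() < 0 ? "" : String.valueOf(uri.getPort()));
        String path = uri.getPath();
        parts.put("database", path == null || path.length() <= 1 ? "" : path.substring(1));
        parts.put("params", uri.getQuery() == null ? "" : uri.getQuery());
        parts.put("tags", uri.getFragment() == null ? "" : uri.getFragment());
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(parse("http://localhost:8443/my_db?ssl=true&sslmode=NONE#test"));
    }
}
```

The database, parameters, and tags are all optional, which is why the sketch falls back to empty strings rather than failing.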
ClickHouseNodes servers = ClickHouseNodes.of(
    "jdbc:ch:http://server1.domain,server2.domain,server3.domain/my_db"
    + "?load_balancing_policy=random&health_check_interval=5000&failover=2");
3. Query
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
    ClickHouseResponse response = client.connect(endpoint) // or client.connect(endpoints)
        // you'll have to parse response manually if using a different format
        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
        .query("select * from numbers(:limit)")
        .params(1000).executeAndWait()) {
    ClickHouseResponseSummary summary = response.getSummary();
    long totalRows = summary.getTotalRowsToRead();
}
4. Streaming query
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
    ClickHouseResponse response = client.connect(endpoint) // or client.connect(endpoints)
        // you'll have to parse response manually if using a different format
        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
        .query("select * from numbers(:limit)")
        .params(1000).executeAndWait()) {
    // records are read from the response one at a time as the loop advances
    for (ClickHouseRecord r : response.records()) {
        int num = r.getValue(0).asInteger();
        // type conversion
        String str = r.getValue(0).asString();
        LocalDate date = r.getValue(0).asDate();
    }
}
5. Insert
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
    ClickHouseRequest<?> request = client.connect(servers).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
    // load data into a table and wait until it's completed
    request.write()
        .query("insert into my_table select c2, c3 from input('c1 UInt8, c2 String, c3 Int32')")
        .data(myInputStream).execute().thenAccept(response -> {
            response.close();
        }).join(); // block until the load has finished and the response is closed
}
6. Multi-statement execution
Execute multiple statements one after another in a worker thread, within the same session:
CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(servers.get(),
    "create database if not exists my_base",
    "use my_base",
    "create table if not exists test_table(s String) engine=Memory",
    "insert into test_table values('1')('2')('3')",
    "select * from test_table limit 1",
    "truncate table test_table",
    "drop table if exists test_table");
// block current thread until queries completed, and then retrieve summaries
List<ClickHouseResponseSummary> results = future.get();
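The "worker thread, one statement after another" pattern can be sketched with the JDK alone. The example below is only a model of that control flow: SequentialStatements and its executor function are hypothetical stand-ins, nothing here talks to ClickHouse, and session state is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of sequential execution on a worker thread; not the client's code.
class SequentialStatements {
    // Runs each statement in order on a background thread and collects one result per statement.
    static CompletableFuture<List<String>> send(Function<String, String> executor, String... statements) {
        return CompletableFuture.supplyAsync(() -> {
            List<String> results = new ArrayList<>();
            for (String sql : statements) {
                // The next statement starts only after the previous one has returned.
                results.add(executor.apply(sql));
            }
            return results;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> future = send(sql -> "done: " + sql,
            "create table if not exists test_table(s String) engine=Memory",
            "insert into test_table values('1')('2')('3')",
            "drop table if exists test_table");
        // join() blocks the calling thread until every statement has run.
        System.out.println(future.join());
    }
}
```

Because the loop runs inside a single task, the statements keep their order even though the caller is not blocked until it asks for the results.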
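III. JDBC driver
clickhouse-jdbc implements the standard JDBC interface. It is built on top of clickhouse-client and provides additional features such as custom type mapping, transaction support, and standard synchronous UPDATE and DELETE statements, so it can easily be used with legacy applications and tools.
The clickhouse-jdbc API is synchronous and generally carries more overhead (for example, SQL parsing and type mapping/conversion). Consider clickhouse-client when performance is critical, or when you prefer more direct access to ClickHouse.
1. Add the dependency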
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.4.0</version>
    <!-- use uber jar with all dependencies included, change classifier to http for smaller jar -->
    <classifier>all</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2. Configuration
Driver class: com.clickhouse.jdbc.ClickHouseDriver
Connection string: jdbc:(ch|clickhouse)[:<protocol>]://endpoint1[,endpoint2,...][/<database>][?param1=value1&param2=value2][#tag1,tag2,...]
Examples:
jdbc:ch://localhost is equivalent to jdbc:clickhouse:http://localhost:8123
jdbc:ch:https://localhost is equivalent to jdbc:clickhouse:http://localhost:8443?ssl=true&sslmode=STRICT
jdbc:ch:grpc://localhost is equivalent to jdbc:clickhouse:grpc://localhost:9100
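The three equivalences can be read as a small set of defaults: no protocol means HTTP on port 8123, https means port 8443 with SSL enabled, and grpc means port 9100. The sketch below encodes exactly those three cases. JdbcUrlShorthand is a hypothetical helper, not driver code, and it assumes a bare host[:port] with no database, parameters, or tags.

```java
// Hypothetical helper; it only mirrors the three equivalences listed above.
class JdbcUrlShorthand {
    static String expand(String url) {
        String prefix = url.startsWith("jdbc:clickhouse:") ? "jdbc:clickhouse:" : "jdbc:ch:";
        if (!url.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a ClickHouse JDBC URL: " + url);
        }
        String rest = url.substring(prefix.length()); // "[protocol:]//host[:port]"
        int slashes = rest.indexOf("//");
        if (slashes < 0) {
            throw new IllegalArgumentException("Missing '//' in " + url);
        }
        // No explicit protocol means HTTP.
        String protocol = slashes == 0 ? "http" : rest.substring(0, slashes - 1);
        String hostPort = rest.substring(slashes + 2);
        boolean hasPort = hostPort.contains(":");
        switch (protocol) {
            case "http":
                return "jdbc:clickhouse:http://" + hostPort + (hasPort ? "" : ":8123");
            case "https":
                return "jdbc:clickhouse:http://" + hostPort + (hasPort ? "" : ":8443")
                    + "?ssl=true&sslmode=STRICT";
            case "grpc":
                return "jdbc:clickhouse:grpc://" + hostPort + (hasPort ? "" : ":9100");
            default:
                throw new IllegalArgumentException("Protocol not covered by this sketch: " + protocol);
        }
    }

    public static void main(String[] args) {
        System.out.println(expand("jdbc:ch://localhost"));
        System.out.println(expand("jdbc:ch:https://localhost"));
        System.out.println(expand("jdbc:ch:grpc://localhost"));
    }
}
```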
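Connection properties:
Property | Default | Description |
continueBatchOnError | | Whether to continue batch processing when an error occurs |
createDatabaseIfNotExist | | Whether to create the database if it does not exist |
custom_http_headers | | Comma-separated custom HTTP headers, for example: User-Agent=client1,X-Gateway-Id=123 |
custom_http_params | | Comma-separated custom HTTP query parameters, for example: extremes=0,max_result_rows=100 |
nullAsDefault | | 0: treat null as is and throw an exception when null is inserted into a non-nullable column; 1: treat null as is and disable the null check on insert; 2: replace null with the default value of the corresponding data type, for both query and insert |
jdbcCompliance | | Whether to support standard synchronous UPDATE/DELETE and fake transactions |
typeMappings | | Custom mapping between ClickHouse data types and Java classes, which affects the results of getColumnType() and getObject(Class<?>). For example: UInt128=java.lang.String,UInt256=java.lang.String |
wrapperObject | | Whether getObject() should return java.sql.Array / java.sql.Struct for Array / Tuple |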
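The three nullAsDefault modes are easiest to compare side by side. The sketch below models the documented behaviour for a single value. NullAsDefaultSketch and its parameters are hypothetical, and it is not the driver's implementation. How mode 2 treats nullable columns is not stated in the table, so the sketch substitutes the default regardless, which is an assumption.

```java
// Hypothetical model of the nullAsDefault modes described in the table; not driver code.
class NullAsDefaultSketch {
    // Returns the value that would be written for one column under the given mode.
    static Object resolve(Object value, int nullAsDefault, boolean columnNullable, Object typeDefault) {
        if (value != null) {
            return value; // non-null values are never altered
        }
        switch (nullAsDefault) {
            case 0:
                // Mode 0: null is kept, but rejected for non-nullable columns.
                if (!columnNullable) {
                    throw new IllegalArgumentException("Cannot insert null into a non-nullable column");
                }
                return null;
            case 1:
                // Mode 1: null is kept and no check is performed on insert.
                return null;
            case 2:
                // Mode 2: null is replaced by the data type's default (assumed for all columns here).
                return typeDefault;
            default:
                throw new IllegalArgumentException("Unsupported nullAsDefault: " + nullAsDefault);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 1, false, 0));
        System.out.println(resolve(null, 2, false, 0));
    }
}
```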
3. Connecting to ClickHouse
String url = "jdbc:ch://my-server/system"; // use http protocol and port 8123 by default
// String url = "jdbc:ch://my-server:8443/system?ssl=true&sslmode=strict&sslrootcert=/mine.crt";
Properties properties = new Properties();
// properties.setProperty("ssl", "true");
// properties.setProperty("sslmode", "NONE"); // NONE to trust all servers; STRICT for trusted only
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties); // pass the properties configured above
try (Connection conn = dataSource.getConnection("default", "password");
    Statement stmt = conn.createStatement()) {
}
4. Query
try (Connection conn = dataSource.getConnection(...);
    Statement stmt = conn.createStatement()) {
    ResultSet rs = stmt.executeQuery("select * from numbers(50000)");
    while (rs.next()) {
        // ...
    }
}
5. Insert
Using the input table function
Recommended; this approach gives the best performance.
try (PreparedStatement ps = conn.prepareStatement(
    "insert into mytable select col1, col2 from input('col1 String, col2 DateTime64(3), col3 Int32')")) {
    // the column definition will be parsed so the driver knows there are 3 parameters: col1, col2 and col3
    ps.setString(1, "test"); // col1
    ps.setObject(2, LocalDateTime.now()); // col2, setTimestamp is slow and not recommended
    ps.setInt(3, 123); // col3
    ps.addBatch(); // parameters are written into the buffered stream immediately, in binary format
    ...
    ps.executeBatch(); // stream everything on-hand into ClickHouse
}
Insert
Easier to use, but slower than the input function.
try (PreparedStatement ps = conn.prepareStatement("insert into mytable(* except (description))")) {
    // the driver will issue query "select * except (description) from mytable where 0" for type inferring
    // since description column is excluded, we know there are only two parameters: col1 and col2
    ps.setString(1, "test"); // id
    ps.setObject(2, LocalDateTime.now()); // timestamp
    ps.addBatch(); // parameters are written into the buffered stream immediately, in binary format
    ...
    ps.executeBatch(); // stream everything on-hand into ClickHouse
}
Insert with placeholders
Not recommended, because it is based on one large SQL statement.
// Note: "insert into mytable values(?,?,?)" is treated as "insert into mytable"
try (PreparedStatement ps = conn.prepareStatement("insert into mytable values(trim(?),?,?)")) {
    ps.setString(1, "test"); // id
    ps.setObject(2, LocalDateTime.now()); // timestamp
    ps.setString(3, null); // description
    ps.addBatch(); // append parameters to the query
    ...
    ps.executeBatch(); // issue the composed query: insert into mytable values(...)(...)...(...)
}
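To see why the placeholder approach produces a large SQL statement, it helps to picture how each batched row is appended as another (...) group. The sketch below is a deliberately simplified illustration of that composition. BatchSqlSketch is hypothetical, the quoting rules are an assumption, and the driver's real logic is not shown here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of a composed "values(...)(...)" statement; not driver code.
class BatchSqlSketch {
    // Renders one parameter as a SQL literal (simplified escaping, assumed for this sketch).
    static String literal(String value) {
        if (value == null) {
            return "NULL";
        }
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }

    // Appends one parenthesized group per batched row to the insert prefix.
    static String compose(String insertPrefix, List<String[]> rows) {
        StringBuilder sql = new StringBuilder(insertPrefix);
        for (String[] row : rows) {
            sql.append('(');
            for (int i = 0; i < row.length; i++) {
                if (i > 0) {
                    sql.append(',');
                }
                sql.append(literal(row[i]));
            }
            sql.append(')');
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] { "test", "2023-01-15 12:00:00", null },
            new String[] { "test2", "2023-01-15 12:00:01", "second row" });
        System.out.println(compose("insert into mytable values", rows));
    }
}
```

The statement text grows with every addBatch() call, whereas the two approaches above stream rows in binary form instead of building SQL.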
6. Advanced API
Handling DateTime and time zones
Use java.time.LocalDateTime or java.time.OffsetDateTime instead of java.sql.Timestamp, and java.time.LocalDate instead of java.sql.Date.
try (PreparedStatement ps = conn.prepareStatement("select date_time from mytable where date_time > ?")) {
    ps.setObject(1, LocalDateTime.now()); // the statement has a single parameter, so its index is 1
    ResultSet rs = ps.executeQuery();
    while (rs.next()) {
        LocalDateTime dateTime = (LocalDateTime) rs.getObject(1);
    }
    ...
}
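One practical reason to prefer the java.time types is that a LocalDateTime carries no zone at all and an OffsetDateTime carries an explicit one, whereas java.sql.Timestamp.valueOf(LocalDateTime) interprets the value in the JVM's default time zone. The sketch below uses only the JDK to show how the same wall-clock value maps to different instants depending on the zone. DateTimeSketch and the chosen zones are illustrative assumptions.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;

// Illustrative helper built on java.time only; no JDBC driver is involved.
class DateTimeSketch {
    // Pins a wall-clock value to an explicit zone, yielding an unambiguous instant.
    static OffsetDateTime withZone(LocalDateTime local, ZoneId zone) {
        return local.atZone(zone).toOffsetDateTime();
    }

    // Re-expresses the same instant as wall-clock time in another zone.
    static LocalDateTime wallClockIn(OffsetDateTime value, ZoneId zone) {
        return value.atZoneSameInstant(zone).toLocalDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime local = LocalDateTime.of(2023, 1, 15, 12, 0);
        OffsetDateTime shanghai = withZone(local, ZoneId.of("Asia/Shanghai"));
        OffsetDateTime utc = withZone(local, ZoneId.of("UTC"));
        // Same wall-clock value, different instants.
        System.out.println(shanghai.toInstant());
        System.out.println(utc.toInstant());
        System.out.println(wallClockIn(shanghai, ZoneId.of("UTC")));
    }
}
```

Passing an OffsetDateTime to setObject makes the intended instant explicit, while a LocalDateTime leaves the zone interpretation to the server or column definition.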
7. Handling aggregate functions
// batch insert using input function
try (ClickHouseConnection conn = newConnection(props);
    Statement s = conn.createStatement();
    PreparedStatement stmt = conn.prepareStatement(
        "insert into test_batch_input select id, name, value from input('id Int32, name Nullable(String), desc Nullable(String), value AggregateFunction(groupBitmap, UInt32)')")) {
    s.execute("drop table if exists test_batch_input;"
        + "create table test_batch_input(id Int32, name Nullable(String), value AggregateFunction(groupBitmap, UInt32))engine=Memory");
    Object[][] objs = new Object[][] {
        new Object[] { 1, "a", "aaaaa", ClickHouseBitmap.wrap(1, 2, 3, 4, 5) },
        new Object[] { 2, "b", null, ClickHouseBitmap.wrap(6, 7, 8, 9, 10) },
        new Object[] { 3, null, "33333", ClickHouseBitmap.wrap(11, 12, 13) }
    };
    for (Object[] v : objs) {
        stmt.setInt(1, (int) v[0]);
        stmt.setString(2, (String) v[1]);
        stmt.setString(3, (String) v[2]);
        stmt.setObject(4, v[3]);
        stmt.addBatch();
    }
    int[] results = stmt.executeBatch();
    ...
}
// use bitmap as query parameter
try (PreparedStatement stmt = conn.prepareStatement(
    "SELECT bitmapContains(my_bitmap, toUInt32(1)) as v1, bitmapContains(my_bitmap, toUInt32(2)) as v2 from {tt 'ext_table'}")) {
    stmt.setObject(1, ClickHouseExternalTable.builder().name("ext_table")
        .columns("my_bitmap AggregateFunction(groupBitmap,UInt32)").format(ClickHouseFormat.RowBinary)
        .content(new ByteArrayInputStream(ClickHouseBitmap.wrap(1, 3, 5).toBytes()))
        .asTempTable()
        .build());
    ResultSet rs = stmt.executeQuery();
    Assert.assertTrue(rs.next());
    Assert.assertEquals(rs.getInt(1), 1);
    Assert.assertEquals(rs.getInt(2), 0);
    Assert.assertFalse(rs.next());
}