3分钟搞懂Arrow Flight SQL,让数据传输提速100倍的秘密
此时,数据分析师小华揉着发酸的眼睛,望着电脑屏幕发呆。
他忍不住抱怨道:“这数据导出也太慢了吧!”
是的,又一次等待MySQL协议传输大批量数据,这感觉像是用吸管在喝一桶水,得喝到什么时候才能见底?
就在小华等待传输过程中,翻查着Doris官方文档,发现了个能提升数据传输效率 100 倍的神器 - Arrow Flight SQL!
传统的数据传输像是在玩"倒水"游戏,要经过好几道工序;而Arrow Flight SQL直接架起了一条高速公路,让数据畅通无阻地奔跑。
接下来,追随着小华的故事,3分钟搞懂Arrow Flight SQL!
数据传输提速100倍!
数据分析师小华最近遇到了一个棘手的问题。
他需要从Apache Doris中读取海量数据进行实时分析,传统的MySQL协议每次查询都要经过繁琐的序列化和反序列化过程,速度慢得让人抓狂。
“有没有更快的数据传输方案?” 小华苦恼地挠着头。
于是,小华在Doris官方不停地翻阅着…终于找到Apache Doris 2.1版本带来了革命性的突破 - Arrow Flight SQL高速数据链路。
这个基于Apache Arrow的解决方案:数据传输性能相较于 MySQL 协议提升了惊人的100倍!
如何做到100倍提升?
小华自言自语喃喃道:“你知道传统的MySQL协议数据传输有多慢吗?”
好比是把一桶水倒进另一个桶里,还要经过一个漏斗。数据从Doris的列存格式要先转成MySQL的行存格式,再由客户端转回列存格式,这个过程浪费了大量时间。
Arrow Flight SQL则完全不同。它直接架设起一条直通管道,数据从Doris直接以Arrow列存格式传输到客户端,无需任何转换。这种零拷贝传输方式让数据传输效率提升了近100倍!
如上图所示,在 Doris 中查询结果以列存格式的 Block 组织。在 2.1 以前版本,可以通过 MySQL Client 或 JDBC/ODBC 驱动传输至目标客户端,需要将行存格式的 Bytes 再反序列化为列存格式。
基于 Arrow Flight SQL 构建高速数据传输链路,若目标客户端同样支持 Arrow 列存格式,整体传输过程将完全避免序列化/反序列化操作,彻底消除因此带来时间及性能损耗。
更令人amazing的是,Arrow Flight SQL还支持多节点并行传输,充分利用现代硬件的多核优势。对于数据科学家和分析师来说,这意味着可以在几秒钟内获取海量数据进行分析,大大提升了工作效率!
让数据传输起飞!
了解完基本原理后,小华迫不及待地想试试这个"神器"。
实际上,在Python和Java中使用Arrow Flight SQL非常简单。
在Python中,只需几步就能快速建立高速数据通道:
# Doris Arrow Flight SQL Test
# step 1, library is released on PyPI and can be easily installed.
# pip install adbc_driver_manager
# pip install adbc_driver_flightsql
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
# step 2, create a client that interacts with the Doris Arrow Flight SQL service.
# Modify arrow_flight_sql_port in fe/conf/fe.conf to an available port, such as 9090.
# Modify arrow_flight_sql_port in be/conf/be.conf to an available port, such as 9091.
conn = flight_sql.connect(uri="grpc://{FE_HOST}:{fe.conf:arrow_flight_sql_port}", db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
})
cursor = conn.cursor()
# interacting with Doris via SQL using Cursor
def execute(sql):
print("\n### execute query: ###\n " + sql)
cursor.execute(sql)
print("### result: ###")
print(cursor.fetchallarrow().to_pandas())
# step3, execute DDL statements, create database/table, show stmt.
execute("DROP DATABASE IF EXISTS arrow_flight_sql FORCE;")
execute("show databases;")
execute("create database arrow_flight_sql;")
execute("show databases;")
execute("use arrow_flight_sql;")
execute("""CREATE TABLE arrow_flight_sql_test
(
k0 INT,
k1 DOUBLE,
K2 varchar(32) NULL DEFAULT "" COMMENT "",
k3 DECIMAL(27,9) DEFAULT "0",
k4 BIGINT NULL DEFAULT '10',
k5 DATE,
)
DISTRIBUTED BY HASH(k5) BUCKETS 5
PROPERTIES("replication_num" = "1");""")
execute("show create table arrow_flight_sql_test;")
# step4, insert into
execute("""INSERT INTO arrow_flight_sql_test VALUES
('0', 0.1, "ID", 0.0001, 9999999999, '2023-10-21'),
('1', 0.20, "ID_1", 1.00000001, 0, '2023-10-21'),
('2', 3.4, "ID_1", 3.1, 123456, '2023-10-22'),
('3', 4, "ID", 4, 4, '2023-10-22'),
('4', 122345.54321, "ID", 122345.54321, 5, '2023-10-22');""")
# step5, execute queries, aggregation, sort, set session variable
execute("select * from arrow_flight_sql_test order by k0;")
execute("set exec_mem_limit=2000;")
execute("show variables like \"%exec_mem_limit%\";")
execute("select k5, sum(k1), count(1), avg(k3) from arrow_flight_sql_test group by k5;")
# step6, close cursor
cursor.close()
对Java开发者来说,Arrow Flight SQL同样提供了优雅的解决方案。你可以选择JDBC风格的API:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver");
String DB_URL = "jdbc:arrow-flight-sql://{FE_HOST}:{fe.conf:arrow_flight_sql_port}?useServerPrepStmts=false"
+ "&cachePrepStmts=true&useSSL=false&useEncryption=false";
String USER = "root";
String PASS = "";
Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
Statement stmt = conn.createStatement();
ResultSet resultSet = stmt.executeQuery("select * from information_schema.tables;");
while (resultSet.next()) {
System.out.println(resultSet.toString());
}
resultSet.close();
stmt.close();
conn.close();
经过一番测试体验之后,小华在实践中总结出了几个提升性能的关键技巧:
1. 智能批处理
默认的批处理大小是1024行,可以根据实际场景调整对应值:setTargetBatchSize。
对于内存充足的环境,适当增大批处理大小能显著提升吞吐量。
2. 并行加速
Java开发者可以使用FlightClient实现多Endpoint并行读取,更加灵活地利用集群资源。
一个典型的优化是:
FlightClient client = FlightClient.builder()
.setHost("localhost")
.setPort(8080)
.build();
3. 列式计算
保持数据在Arrow格式下进行计算,避免不必要的格式转换。
Python用户可以直接使用pandas进行高效的列式计算:
cursor.fetchallarrow().to_pandas()
...
小结
回到故事的开头,小华用Arrow Flight SQL重构了数据分析流程,查询速度提升了近百倍,内存占用也大幅下降。老板看到这个改进非常满意,还给他加薪升职!
从这件事上,小华也悟出:
技术创新不仅能解决实际问题,还能为个人带来职业发展机会。Arrow Flight SQL好比是给数据插上了翅膀,让数据分析工作真"飞"起来了。
下期,我们将一起探讨其它更有趣有用有价值的内容,敬请期待!
标签:execute,Flight,sql,Arrow,SQL,flight,搞懂 From: https://blog.csdn.net/yzData/article/details/145091708