TDengine Pom
pom xml
D:\noteMe\md\时序数据库
https://blog.csdn.net/taos_data/article/details/106851359
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>1.0.3</version>
</dependency>
TDengine虽未提供Spark调用的DataSource,但TDengine本身也支持JDBC,因此,这里使用spark-jdbc来读取TDengine,最新版本可以到官网下载,我这里用的是如下版本:
由于TDengine是使用C语言开发的,使用taos-jdbcdriver驱动包时需要依赖系统对应的本地函数库。
- libtaos.so在Linux系统中成功安装TDengine后,依赖的本地函数库libtaos.so文件会被自动拷贝至/usr/lib/libtaos.so,该目录包含在Linux自动扫描路径上,无需单独指定。
2.taos.dll在windows系统中安装完客户端之后,驱动包依赖的taos.dll文件会自动拷贝到系统默认搜索路径C:/Windows/System32下,同样无需要单独指定。
第一次使用时,为了保证机器上有libtaos.so或taos.dll,需要在本地安装TDengine客户端(客户端请至TDengine官网下载)
————————————————
val jdbccdf = spark
.read
.format("jdbc")
.option("url", "jdbc:TAOS://192.168.1.151:6030/log")
.option("driver", "com.taosdata.jdbc.TSDBDriver")
.option("dbtable", "log")
.option("user", "root")
.option("password", "taosdata")
.option("fetchsize", "1000")
.load()
因为在读TDengine的时候,第一个字段ts会被转换为decimal,但是存储时直接存decimal tdengine是不认的,所以需要将ts进行类型转换
jdbccdf.select(($"ts" / 1000000).cast(TimestampType).as("ts"), $"level", $"content", $"ipaddr")
.write.format("jdbc")
.option("url", "jdbc:TAOS://192.168.1.151:6030/test?charset=UTF-8&locale=en_US.UTF-8")
.option("driver", "com.taosdata.jdbc.TSDBDriver")
.option("dbtable", "log2")
.option("user", "root")
.option("password", "taosdata")
.mode(SaveMode.Append)
.save()
- Spark yarn模式运行TDengine
上面的测试都是基于maser为local测试的,如果以yarn模式运行,则在每个节点上都安装TDengine客户端是不现实的,查看taos-jdbcdriver的代码,发现,driver会执行System.load(“taos”),也就是说只要java.library.path中存在 libtaos.so,程序就可正常运行,不必安装TDengine的客户端,因为java.library.path是在jvm启动时就设置好的,要更改它的值,可以采用动态加载,采用如下方法解决了加载libtaos.so的问题:
(1) 将driver端libtaos.so发送到各个executor
spark.sparkContext.addFile("/path/to/libtaos.so")
(2) 重写Spark中JdbcUtils类中的createConnectionFactory方法,添加
loadLibrary(new File(SparkFiles.get("libtaos.so")).getParent)
进行java.library.path的动态加载
def createConnectionFactory(options: JDBCOptions): () => Connection = {
val driverClass: String = options.driverClass
() => {
loadLibrary(new File(SparkFiles.get("libtaos.so")).getParent)
DriverRegistry.register(driverClass)
val driver: Driver = DriverManager.getDrivers.asScala.collectFirst {
case d: DriverWrapper if d.wrapped.getClass.getCanonicalName == driverClass => d
case d if d.getClass.getCanonicalName == driverClass => d
}.getOrElse {
throw new IllegalStateException(
s"Did not find registered driver with class $driverClass")
}
driver.connect(options.url, options.asConnectionProperties)
}
}
(3) loadLibrary方法如下
def loadLibrary(libPath: String): Unit = {
var lib = System.getProperty("java.library.path")
val dirs = lib.split("