Spark中的那些坑
前言
在微软内部,除了使用Cosmos+Scope处理大数据外,最近Hadoop+Spark的运用也逐渐多了起来。
一方面Cosmos毕竟是微软自家的东西,很多新招来的员工以前都没有接触过,要熟练运用有一个熟悉的过程;而Hadoop+Spark就不一样了,这可是大数据分析的标配,国外很多大学生毕业的时候已经运用相当熟练了,这样在技术上能很快上手,减少了培训成本;另一方面Hadoop+Spark等是开源产品,代码由社区维护,使用起来成本减少了不少,按照微软一贯的做法,开源软件外边套个壳就可以直接上线跑了。
我们使用的Spark平台称之为MT,我看了目前有17个集群,提供约104+万个token(1个token约等于1.3个CPU核心);我们把Spark脚本写好,通过上传或者使用Python脚本提交到服务器上,就可以运行了。Spark对任务的配置相当地宽松和灵活,可以分别指定Driver和Excuter的核心数、内存、重试次数等,也可以使用一系列的工具对任务进行调优。目前在MT上的Haddop版本和Spark版本都是2.x,据说3.x有非常的大的改动,对性能有进一步的提升,希望微软内部相关人员在测试后能够尽可能早日上线,使大家早日受益。
关于Spark的运用场景我就不多说了,园内有很多大神都写了相关介绍文章进行介绍。总的来说有两方面,一方面是Spark结合大规模的Kafka集群,对大量数据进行近乎实时的数据分析(QPS数千到上万不等),称之为DLIS。在这种场景下,一些实时数据(比如浏览数据、或者传感器数据)放到Kafka队列中,由订阅相关topic的Spark脚本进行消费,部分数据处理后就直接送到DL(深度学习)/ML(机器学习)的模型中去了,近乎实时地、不断地对模型进行训练(adhoc inferencing),微软在这方面也有其它类似的产品(比如Xap)。另一种场景就是DLOP(Deep Learning Offline Processing),可以写一些脚本,对每日生成的数据进行离线分析,同样送到DL/ML的模型中去,使用另一个开源产品Airflow进行定时触发,这种场景很类似Cosmos运用的场景,所以存在平台互换的可能性。
正因为两者的功能可以互换,所以越来越多的Cosmos使用者开始使用Spark。但是,进行语言切换是有使用成本的,.net平台下的语言,换成java/Spark/scala运行的结果会有很大的不同,再此我把使用过程中碰到的Spark语言的一些坑记录下来,以供大家参考,避免在今后的工作中踩坑。
读取配置文件
这是一份普通的配置文件 Sample.config
input {
path = "/aa/bb/cc";
format = "parquet";
saveMode = "overwrite";
}
当Spark读取这份配置后,不会抛出任何错误,但是程序中始终无法读取该input文件,提示路径不对。
解决方法:
java或者Spark的配置文件在每行末尾不需要添加任何结束符号,包括“;”、“,”等,像这个例子中,实际读取到的path是"/aa/bb/cc;",自然是不存在的路径。
正确的配置文件格式
input {
path = "/aa/bb/cc"
format = "parquet"
saveMode = "overwrite"
}
时区陷阱
我们在国际化的程序中,需要对保存的时间进行处理,保存为统一的UTC时间戳。当Spark读取到UTC时间,使用hour函数读取该日期的小时值,在不同时区的电脑上获得的值不同。
示例:
Dataset<Row> data = data.withColumn("Hour", hour(col("SomeTime")));
我们假设某一行SomeTime的值为 2022-10-23 5:17:14 (UTC时间),那么在美国的群集上运行,hour函数取得的值为22(太平洋夏令时时间),在香港的服务器上运行,那么获取到的hour值为13(UTC+8)。
解决方法
在对UTC时间进行计算前先根据本地时区转换为UTC标准时间,例:
在美国的服务器上运行:
Dataset<Row> data = data.withColumn("Hour", hour(to_utc_timestamp(col("SomeTime"), "America/Los_Angeles")));
或
Dataset<Row> data = data.withColumn("Hour", hour(to_utc_timestamp(col("SomeTime"), "GMT-7")));
在香港的服务器上运行:
Dataset<Row> data = data.withColumn("Hour", hour(to_utc_timestamp(col("SomeTime"), "China/Beijing")));
或
Dataset<Row> data = data.withColumn("Hour", hour(to_utc_timestamp(col("SomeTime"), "GMT+8")));
*使用.net处理时区没有问题,不管电脑在那里运行,不会把UTC-0的时间转换为本地时间再进行运算。
怪异的DayOfWeek
一个不难理解的日期函数,在不同的语言上,竟得出不一样的结果。
C#
var date = DateTime.Parse("2022-11-11");
Console.WriteLine(date.DayOfWeek);
结果是:Friday
java:
import java.time.DayOfWeek;
import java.time.LocalDate;
...
LocalDate date = LocalDate.of(2022, 11, 11);
DayOfWeek week = DayOfWeek.from(date);
System.out.println(week);
结果是:FRIDAY
Spark:
Dataset<Row> data = data.withColumn("Hour", dayofweek(col("SomeTime")));
结果是:6
问题原因
Spark用了个很怪的序列存储Dayofweek的值,它的序列从Sunday开始,到Saturday结束,返回的值分别是1,2,3,4,5,6,7,Saturday的值是7,我也是无语了。
解决方法
如果要与java或者.net返回的dayofweek值保持一致,写一个UDF(User Define Function)来进行转换,比如:
/**
* Get the C# DayOfWeek Name by Spark Value
*/
UserDefinedFunction udfGetDayOfWeekNameBySparkValue = udf((Integer dayOfWeek) -> {
if(dayOfWeek > 7 || dayOfWeek < 1 ){
return "0";
}
String[] DayOfWeeks = (String[]) Arrays.asList("0", "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday").toArray();
return DayOfWeeks[dayOfWeek];
}, StringType);
substring陷阱
在Spark中使用substring函数获取字符串中的一部分,比如说有一个字符串的日期格式是20221204(YYYYMMDD),那么我需要分别获取他的年、月、日,使用如下代码
Dataset<Row> data = data.withColumn("YEAR", substring(col("SomeTime"), 0, 4));
.withColumn("MONTH", substring(col("SomeTime"), 4, 2));
.withColumn("DAY", substring(col("SomeTime"), 6, 2));
结果是:
YEAR --> 2022 --> 正确!
MONTH --> 21 --> 错误!
DAY --> 20 --> 错误!
问题原因
在Spark,substring的下标从1开始,这点和c#或者java有很大的不同。
解决方法
把初始下标换成1,那么就能取得正确的值,如下:
Dataset<Row> data = data.withColumn("YEAR", substring(col("SomeTime"), 1, 4));
.withColumn("MONTH", substring(col("SomeTime"), 5, 2));
.withColumn("DAY", substring(col("SomeTime"), 7, 2));
这点在Spark的官方文档 functions (Spark 3.3.1 JavaDoc) 中有一行小字说明 。
但是,如果是从第一个字符开始取值,那么使用0或者1会取得相同的结果,也就是
Dataset<Row> data = data.withColumn("YEAR", substring(col("SomeTime"), 0, 4));
和
Dataset<Row> data = data.withColumn("YEAR", substring(col("SomeTime"), 1, 4));
取到的值居然是相同的!
但,如果在Spark中使用数组,那么下标仍然是从0开始!晕~
IP地址解析
在工作中,我们经常要对IP地址进行解析,使用C#代码可以方便地对IPV4地址进行校验和划分网段:
string IpAddress = "202.96.205.133";
string[] octets = IpAddress.Split(".");
Console.WriteLine(octets[0]);
输出:202
我们把它转换成java代码:
String IpAddress = "202.96.205.133";
String[] octets = IpAddress.split(".");
System.out.println(octets[0]);
运行,它居然报错了:java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
问题原因
java的split函数接收的参数其实是正则表达式,一些符号需要做转义,比如 * ^ : | . \ 等。
解决方法
在“.”前加上双斜杠 \\ 即可。
String IpAddress = "202.96.205.133";
String[] octets = IpAddress.split("\\.");
System.out.println(octets[0]);
这样就得到和C#一样的结果。
枚举的数值
有这么一个枚举
C# & java
public enum Color {
Red,
Green,
Blue,
Gray
........
}
有时候我们需要获取枚举的数值,比如Red所代表的数值,使用C#可以方便地取得枚举相应的值:
C#
Color.Red.ToString("D")
或者
(int)Color.Red
输出: 0
但是,如果你用java,却无论如何获取不到枚举所代表的数值。
解决方法
在java中,要获取枚举所代表的数值,需要在枚举定义中添加一些额外的代码。
public enum Color {
Red(0),
Green(1),
Blue(2),
Gray(3)
........
}
private final int value;
private LocationLevelType(int value){
this.value = value;
}
public int getValue() {
return value;
}
这样使用如下代码就可以获取每个枚举所代表的数值了:
Color.Red.value
或者
Color.valueOf("Red").value
输出:0
posexplode函数
有时候我们需要对一个数组类型的列进行拆分,获得每一项的值,单独生成行值和索引,这时候可以使用posexplode函数。在Spark的官方文档 functions (Spark 3.3.1 JavaDoc) 中,明确说明该函数从2.1.0版本就得到支持;但是,我们在Spark2.12环境中,该函数仍然不见踪影。
解决方法
使用selectExpr的方法进行替代,如下所示:
Dataset<Row> data = data.selectExpr("*", "posexplode(some_array) as (index, content)")
为什么我的程序运行那么慢?慎用Count()和Show()
“我的程序运行了好长时间,还停留在初始步骤,我已经分配了足够的资源,输入的数据量也不是特别大啊?”
问题原因
检查一下你的代码,是不是加了很多埋点,使用Count()和Show()来计算Dataset的行数?
在Spark中,Count()和Show()是个非常消耗资源的操作,他会把该中间过程的所有数据都计算出来,然后给出一个统计值,或者若干行的输出。在不缓存数据的情况下,加一个Count()或者Show(),都会使运行时间成倍增长。
解决方法
去掉那些埋点,原先几天都跑不完的代码几小时就跑完了。
在生产环境中运行的代码万万不建议使用Count()和Show()函数。实在不得已要用,也要适当增加计算资源,做好job失败和成倍的运行时间的准备。
为什么我的程序运行那么慢?(2)优化、优化
“什么,这么简单job用了500个token,跑了一天还跑不完?那么多节点重试出错?赶紧去优化!”
问题原因
Spark是个非常灵活的数据处理平台,可以运行各种符合规范的java代码,限制明显比Scope、Sql少很多。它的机制是把任务分解到各个容器中去运行,直到有输出的时候再计算,并没有统一缓存的中间过程,所以运行效率取决于你写程序的好坏,它能做的优化有限。
比方说,原始表有25TB数据,255列,你在代码里Count()了一下、选择某些列进行和其他表join、再从原始表中根据某些条件筛选一些数据,和join的数据union。那么,一共需要扫描三遍原始数据表,读取75TB的数据量,运行时间惊人。
解决方法
选择只需要用到的列(一般不会超过50列),缓存,进行Count()(小心这一步往往会失败),再按照正常步骤join和union,这样读取一遍,缓存5TB数据;如果不需要Count,还可以事先缓存和过滤。
其它
使用java代码解析windows下生成的文本文件要注意换行符是\r\n,不是\n,否则从第二行开始,每一行的开头都会多一个空格。在对对象进行BASE64编码的时候也要选择合适的类,否则和C#编译出来的会不同。
题外话
微软的Cosmos是内部使用的大数据平台,没有对外开放(参见我的聊聊我在微软外服大数据分析部门的工作经历及一些个人见解),.net缺少类似Spark这样的大数据平台和生态,目前微软在这方面的解决方案是Azure上的Datalake(数据湖)+ Synapase SQl;希望.net今后能够向开源大数据发展方向发力,从而打造起自己的生态,和java一争高下!
Intel最近发布了针对数据中心、AI学习的旗舰级志强芯片Intel Xeon Max,它具有56个核心、64GB HBM2e内存,在一些横向测试中比竞品和上一代产品功耗降低、性能提升2~4倍不等。我不知道根据美国芯片法禁令,这样的芯片能不能进口,如果不能进口的话,我国在大数据分析、AI领域方面的差距将进一步与国际拉大。希望我们国家能够自立自强,早日生产出能够运用于商业化的超算芯片,使得华为这样的公司不再处处受排挤。
微软外服工作札记系列
①聊聊我在微软外服大数据分析部门的工作经历及一些个人见解
②聊聊微软的知识管理服务平台和一些编程风格
③窗口函数的介绍
④Spark中的那些坑
⑤微软内部的知识图谱Satori介绍
⑥聊聊我认识的那些印度人