首页 > 其他分享 >pyspark 常用Transform算子

pyspark 常用Transform算子

时间:2023-10-16 17:24:08浏览次数:34  
标签:sc parallelize collect pyspark Transform rdd 算子 print 原始数据

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc = SparkContext(conf=conf)

# 1.map对每一个元素进行一个映射
rdd = sc.parallelize(range(1, 11), 4)
rdd_map = rdd.map(lambda x: x * 2)
print("rdd", rdd.collect())
print("rdd_map", rdd_map.collect())

# rdd [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# rdd_map [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

# 2.flatmap把高维的数组变成一维
rdd = sc.parallelize(["hello hadoop", "hello pyspark"])
print("原始数据", rdd.collect())
print("map", rdd.map(lambda x: x.split(" ")).collect())
print("flatmap", rdd.flatMap(lambda x: x.split(" ")).collect())

# 原始数据 ['hello hadoop', 'hello pyspark']
# map [['hello', 'hadoop'], ['hello', 'pyspark']]
# flatmap ['hello', 'hadoop', 'hello', 'pyspark']

# 3.filter过滤数据
rdd =sc.parallelize(range(1, 11), 4)
print("原始数据", rdd.collect())
print("输出偶数", rdd.filter(lambda x: x % 2 == 0).collect())

# 原始数据 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 输出偶数 [2, 4, 6, 8, 10]

# 4.distinct去重元素
rdd = sc.parallelize([1, 2, 3, 2, 5, 6, 8, 9, 8, 5, 1])
print("原始数据", rdd.collect())
print("去重数据", rdd.distinct().collect())

# 原始数据 [1, 2, 3, 2, 5, 6, 8, 9, 8, 5, 1]
# 去重数据 [8, 1, 5, 9, 2, 6, 3]


# 5.reduceBykey 根据Key来映射数据

rdd = sc.parallelize([("a", 1), ("b", 1), ("c", 1), ("b", 1), ("c", 1)])
print("原始数据", rdd.collect())
5.1 
print("累加后数据", rdd.reduceByKey(lambda a, b: a + b).collect())

5.2 
from operator import add
print("累加后数据", rdd.reduceByKey(add).collect())

# 原始数据 [('a', 1), ('b', 1), ('c', 1), ('b', 1), ('c', 1)]
# 累加后数据 [('b', 2), ('c', 2), ('a', 1)]

# 6.mapPartitions 根据分区内数据进行映射操作
rdd = sc.parallelize([1, 2, 3, 4, 5], 3)


def f(iterator):
    yield sum(iterator)


print("原始数据", rdd.collect())
print("映射数据", rdd.mapPartitions(f).collect())

# 原始数据 [1, 2, 3, 4, 5]
# 映射数据 [1, 5, 9]


# 7.sortBy根据规则进行排序

rdd = sc.parallelize([('b', 1), ('a', 2), ('d', 3)])
print("原始数据", rdd.collect())
print("排序数据", rdd.sortBy(lambda x: x[0]).collect())
print("排序数据", rdd.sortBy(lambda x: x[1]).collect())

# 原始数据 [('b', 1), ('a', 2), ('d', 3)]
# 排序数据 [('a', 2), ('b', 1), ('d', 3)]
# 排序数据 [('b', 1), ('a', 2), ('d', 3)]

# 8.subtract 数据集相减

x = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
y = sc.parallelize([('c', 3), ('b', None)])
print(sorted(x.subtract(y).collect()))

# [('a', 1), ('b', 2)]

# 9.union 合并两个RDD

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
print(rdd1.union(rdd2).collect())

# [1, 2, 3, 4, 5, 6]

# 10.intersection 取两个RDD的交集且去重

rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd2 = sc.parallelize([2, 4, 6, 8, 1])
print(rdd1.intersection(rdd2).collect())

# [1, 2, 4, 6]

# 11.cartesian 生成笛卡尔积

rdd = sc.parallelize([1, 3, 5])
print(rdd.cartesian(rdd).collect())

# [(1, 1), (1, 3), (1, 5), (3, 1), (3, 3), (3, 5), (5, 1), (5, 3), (5, 5)]

# 12.zip  拉链合并 ,需要两个相同长度以及分区数量

x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
print(x.collect())
print(y.collect())
print(x.zip(y).collect())

# [0, 1, 2, 3, 4]
# [1000, 1001, 1002, 1003, 1004]
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

# 13.zipWithIndex 将RDD和一个从0开始的递增序列按照拉链方式连接

rdd_name = sc.parallelize(["hive", "spark", "hbase", "hdfs"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())

# [('hive', 0), ('spark', 1), ('hbase', 2), ('hdfs', 3)]

# 14.groupByKey  按照KEY来聚合数据

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.collect())
print(rdd.groupByKey().mapValues(len).collect())
print(rdd.groupByKey().mapValues(list).collect())

# [('a', 1), ('b', 1), ('a', 1)]
# [('b', 1), ('a', 2)]
# [('b', [1]), ('a', [1, 1])]

# 15.sortByKey(True, 2) True 升序,Fales 倒序
rdd = sc.parallelize([("a", 1), ("b", 2), ("1", 3),("c", 1)])

print(rdd.sortByKey(False, 2).collect())

# [('1', 3), ('a', 1), ('b', 2), ('c', 1)]

# 16.join

x = sc.parallelize([('a', 1), ('b', 3)])
y = sc.parallelize([('a', 2), ('c', 1), ('a', 3)])
print(x.join(y).collect())

# [('a', (1, 2)), ('a', (1, 3))]

# 17.leftOutJoin/rightOutJoin

x = sc.parallelize([('a', 1), ('b', 2)])
y = sc.parallelize([('a', 2)])
print(x.leftOuterJoin(y).collect())

# [('a', (1, 2)), ('b', (2, None))]

标签:sc,parallelize,collect,pyspark,Transform,rdd,算子,print,原始数据
From: https://www.cnblogs.com/whiteY/p/17767844.html

相关文章

  • Transformer
    自注意力机制(self-attention)一堆向量asetofvector:词语、图(每个节点可以看作一个向量)一对一:SequenceLabelingself-attention会吃一整个sequence的咨询全连接是定长的,attention是不定长的α计算关联性(自己也得和自己计算关联性)过程:b1b2b3b4是一致同时计算......
  • 利用Transform 画一个图像的反射,倒影
    例子:Snippet279publicclassSnippet279{ publicstaticvoidmain(String[]args){ Displaydisplay=newDisplay(); Shellshell=newShell(display,SWT.SHELL_TRIM|SWT.DOUBLE_BUFFERED); shell.setLayout(newFillLayout()); finalImageimage=d......
  • 2023ICCV_Retinexformer: One-stage Retinex-based Transformer for Low-light Image
    一.Motivation(1)Retinex理论没有考虑到噪声,并且基于Retinex分解的网络通常需要很多阶段训练。(2)直接使用从CNN从低光图像到正常光图像的映射忽略了人类的颜色感知,CNN更适合捕获局部信息,对于捕获远程依赖和非局部自相似性方面存在局限。二.Contribution(1)设计了一个阶段......
  • windows 安装pyspark环境及pycharm配置
    1.安装JDKhttps://www.cnblogs.com/whiteY/p/13332708.html2.安装hadoop2.7下载hadoop2.7.1安装包链接:https://pan.baidu.com/s/1saGhaKbcvwrE4P3F5_UhZQ提取码:1234解压到指定位置3.下载winutils链接:https://pan.baidu.com/s/1L1iRZQcmaw9voQEJzO4bmA提取码:1234......
  • Transformer
    importmathimporttorchfromtorchimportnnimportmatplotlib.pyplotaspltfromd2limporttorchasd2ldefsequence_mask(X,valid_len,value=0):"""在序列中屏蔽不相关的项"""max_len=X.size(1)mask=torch.arange((max......
  • pyspark-sql
    使用spark-sql操作文件进行sql查询示例代码如下if__name__=='__main__':#SparkSession用于SparkSQL编程作为入口对象#用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext#也可以直接进入pyspark客户端省略该步骤./pyspark--masterlocal[*],会自动创建sc......
  • java fx 报错 java.lang.instrument ASSERTION FAILED ***: “!errorOutstanding“ wi
    问题描述在javafx中遇到的错误在fxml中通过了fx:controller绑定了控制器在控制的controller里面使用了FXMLLoader.load获取这个fxml文件出现报错java.lang.instrumentASSERTIONFAILED***:"!errorOutstanding"withmessagetransformmethodcallfailedat......
  • Personalized Transformer for Explainable Recommendation论文阅读笔记
    PersonalizedTransformerforExplainableRecommendation论文阅读笔记摘要​ 自然语言生成的个性化在大量任务中都起着至关重要的作用。比如可解释的推荐,评审总结和对话系统等。在这些任务中,用户和项目ID是个性化的重要标识符。虽然Transfomer拥有强大的语言建模能力,但是没有......
  • 手敲,Ascend算子开发入门笔记分享
    本文分享自华为云社区《Ascend算子开发入门笔记》,作者:JeffDing。基础概念什么是AscendCAscendC是CANN针对算子开发场景推出的编程语言,原生支持C和C++标准规范,最大化匹配用户开发习惯;通过多层接口抽象、自动并行计算、孪生调试等关键技术,极大提高算子开发效率,助力AI开发者......
  • ICCV 2023 | 当尺度感知调制遇上Transformer,会碰撞出怎样的火花?
    作者|AFzzz1文章介绍近年来,基于Transformer和CNN的视觉基础模型取得巨大成功。有许多研究进一步地将Transformer结构与CNN架构结合,设计出了更为高效的hybridCNN-TransformerNetwork,但它们的精度仍然不尽如意。本文介绍了一种新的基础模型SMT(Scale-AwareModulationTransformer......