首页 > 其他分享 >pyspark实践

pyspark实践

时间:2024-04-01 09:56:08浏览次数:34  
标签:instance df pyspark hadoop 实践 rdd sql spark

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.executor.memory", "10g")

sc = spark.sparkContext
sql = spark.sql

# hdfs文件读取
rdd=sc.hadoopFile(
    data_path,
    inputFormatClass='com.bytedance.hadoop.mapred.PBInputFormat',
    keyClass='org.apache.hadoop.io.BytesWritable',
    valueClass='org.apache.hadoop.io.BytesWritable'
)


def shuffle(instances):
    for instance in instances:
    # 序列化,并生成shuffle key
        yield random.randint(0, 100000), instance

def serialize(line):
    _, instance = line
    uid = instance.line_id.uid
    gid = instance.line_id.gid
    sort_id = (str(uid) +'#' + str(gid)).encode()
    data = instance.SerializeToString()
    return sort_id, data


# shuffle
rdd.mapPartitions(shuffle).sortByKey()

# 写入hdfs
rdd.map(serialize).saveAsHadoopFile(pb_output_path,
                            outputFormatClass='com.bytedance.hadoop.mapred.PBOutputFormat',
                                keyClass='org.apache.hadoop.io.BytesWritable',
                                valueClass='org.apache.hadoop.io.BytesWritable')

# hive表数据读取
source_df = sql(READ_SQL)
rdd = source_df.rdd

# hive表数据写入
columns = ['uid', 'tag', 'c3_300_labels', 'embedding']
df = output_rdd1.toDF(columns,sampleRatio=0.01)
df.createOrReplaceTempView("tmpv")
sql(WRITE_SQL)

 

标签:instance,df,pyspark,hadoop,实践,rdd,sql,spark
From: https://www.cnblogs.com/xumaomao/p/18107807

相关文章

  • 政安晨:【Keras机器学习实践要点】(十)—— 自定义保存和序列化
    目录导言涵盖的APISetup状态保存自定义构建和编译保存自定义结论政安晨的个人主页:政安晨欢迎 ......
  • 政安晨:【Keras机器学习实践要点】(十一)—— 编写自己的回调
    目录导言设置Keras回调概述回调方法概述全局方法用于训练/测试/预测的批量级方法时代级方法(仅限培训)基本示例日志的使用self.model属性的使用Keras回调应用示例提前停机,损失最小学习率调度内置Keras回调政安晨的个人主页:政安晨欢迎 ......
  • Spring Boot框架中的JDK动态代理实践及其应用场景
    引言在Java编程中,JDK动态代理是一种强大的设计模式,它允许我们在运行时动态地创建并实现代理类,从而对目标对象的行为进行增强或控制。这种机制主要由Java标准库java.lang.reflect.Proxy类和java.lang.reflect.InvocationHandler接口提供支持。在诸如SpringBoot这样的企业级开......
  • 图像分类实践
    图像分类实践了解使用简单的卷积神经网络来完成图像分类。实际上,对于一些较为复杂的数据集,简单的卷积神经网络无法达到一个较高的分类准确度,而深度学习实践中的网络结构通常可以达到几十甚至上百层的数目。知识点数据加载器迁移学习猫狗识别卷积神经网络可视化数据集计......
  • 【Python】如何入门 Python:系统化方法与实践路径
    目录前言一、基础知识打牢基础二、选择合适的学习工具三、实践项目加深理解四、深入学习高级主题五、探索数据科学与机器学习六、加入社区与协作七、持续学习与跟进最新动态总结前言    在当今这个数据驱动的时代,Python作为一门强大、易学且应用广泛的编......
  • 状态机入门实践
    状态机是“有限状态自动机”的简称,是一种描述和处理事物状态变化的数学模型。本质上来讲,就是一种比if...else结构更加优雅,并具备可扩展性的状态转移处理机制。有多种实现方案,如:枚举,SpringStatemachine,colastatemachine。枚举状态机通过在枚举中定义方法来实现状态转移,状态定......
  • Python数据库编程全指南SQLite和MySQL实践
    1.安装必要的库首先,我们需要安装Python的数据库驱动程序,以便与SQLite和MySQL进行交互。对于SQLite,Python自带了支持;而对于MySQL,我们需要安装额外的库,如mysql-connector-python。#安装MySQL连接器pipinstallmysql-connector-python2.连接SQLite数据库SQLite是一......
  • 使用Andorid Studio解决app内存泄漏问题方法与实践
    某项目的app运行一段时间(切换页面、触发交互事件等)后就开始严重卡顿,使用top查看内存的使用情况,发现每次操作过后内存都有小幅增长,且永远不下降,存在内存泄露问题。目录1AndoridStudio内存泄露检测工具使用方法2内存泄露实例分析2.1页面切换后未主动释放​编辑2.2回调......
  • 【在线回放】NVIDIA GTC 2024 大会 | 如何降低 AI 工程成本?蚂蚁从训练到推理的全栈实
    本文内容来源于蚂蚁集团AIInfra部门负责人张科,在GTC2024大会ChinaAIDay线上专场的演讲。在演讲中张科分享了AI工程当前的现状和主要挑战,以及蚂蚁集团在AI工程领域的实践经验和开源项目,也欢迎AI工程领域的同行们共同参与开源项目的共建与共享。张科于2009......
  • 8、.NET Core 实践 2024-03-29 11:44 CPU过高
    Windbg指令记录0:008>!runawayUserModeTimeThreadTime7:35c00days0:03:04.9538:111c0days0:03:01.6406:4d300days0:02:57.2815:84240days0:02:52.6400:6fe80days0:00:00.0312:6c280......