首页 > 数据库 >pyspark 环境搭建和相关操作redis ,es

pyspark 环境搭建和相关操作redis ,es

时间:2023-08-03 23:45:18浏览次数:43  
标签:option pyspark redis spark port es row

一.环境搭建

1. 创建虚拟环境, 指定python包

2. 切换到虚拟环境,安装你所需要的python相关模块包

3. 把整个虚拟环境打成.zip

4. 将 zip上传的hadfs

5. spark-submit 指定python包的路径

可以参考 https://dandelioncloud.cn/article/details/1589470996832964609

二. pyspark数据redis

1. 先要在之前的虚拟环境中安装,redis的python相关包

'''
pip3 install pyspark
pip3 install redis-py-cluster==2.1.3 -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
'''
from pyspark.sql import SparkSession
from rediscluster import RedisCluster

redis_hosts = [
    {"host": "192.168.2.150", "port": 6379},
    {"host": "192.168.2.150", "port": 6380},
    {"host": "192.168.2.150", "port": 6381},
    {"host": "192.168.2.150", "port": 6382}
]
def write_hive2redis(df):
    json_rdd = df.rdd.map(lambda row:row.asDict())
    def write2redis(partition_data_list):
        redis_conn = RedisCluster(startup_nodes=redis_hosts, password="", decode_responses=True)
        for dic in partition_data_list:
            id = dic.get("id")
            name = dic.get("name")
            # 往redis中写数据
            redis_conn.set(id,name)
    json_rdd.foreachPartition(write2redis)

            
        
if __name__ == '__main__':
    # 创建SparkSession对象
    spark = SparkSession.builder.appName("HiveExample").enableHiveSupport().getOrCreate()
    sql="select * from table1"
    df = spark.sql(sql)
    write_hive2redis(df)
    spark.stop()

 

 或写redis这样批量写

def to_redis(part, batch=500):
    redis_pool = redis.ConnectionPool(host='127.0.0.1', port=26379, db=10, password='password')
    redis_cli = redis.StrictRedis(connection_pool=redis_pool)

    cnt = 0
    pipeline = redis_cli.pipeline()
    for row in part:
        pipeline.set(row.name, "\t".join([row.name, row.source, row.end_format]))
        cnt += 1
        if cnt > 0 and cnt % batch == 0:
            pipeline.execute()
    if cnt % batch != 0:
        pipeline.execute()
    pipeline.close()
    redis_cli.close()


sdf = spark.read.csv("/home/testuser/data/csv/", schema=data_schema, header=False, sep="\t")
sdf.show()
# 按照自定义的写入方式和格式 分片写入到redis
sdf.foreachPartition(functools.partial(to_redis, batch=500))

 

三.  pyspark将数据写入es

参考:https://www.jianshu.com/p/3ccd902f0a03

先下载jar包: elasticsearch-spark-20_2.11-7.6.2.jar

在提交spark-submit时,--jars   elasticsearch-spark-20_2.11-7.6.2.jar 

或 如果有网络的情况下:可以用参数 --packages=org.elasticsearch:elasticsearch-spark-20_2.11-7.6.2.jar

 

park=SparkSession.builder.getOrCreate()

# 读es
df=spark.read.format('org.elasticsearch.spark.sql')\
            .option("spark.es.nodes",es_url)\
            .option("spark.es.port",es_port)\
            .option("es.net.http.auth.user",es_user)\
            .option("es.net.http.auth.pass",es_pass)\
            .option("es.mapping.id","id")\
            .option("es.nodes.wan.only","true")\
            .option("es.write.operation","upsert")\
            .option('es.resource', 'cancer_example/_doc')
# 写es
df.write.format('org.elasticsearch.spark.sql')\
            .option("spark.es.nodes",es_url)\
            .option("spark.es.port",es_port)\
            .option("es.net.http.auth.user",es_user)\
            .option("es.net.http.auth.pass",es_pass)\
            .option("es.mapping.id","id")\
            .option("es.nodes.wan.only","true")\
            .option("es.write.operation","upsert")\
            .option('es.resource', 'cancer_example/_doc').mode("Append").save()


原文链接:https://blog.csdn.net/nanfeizhenkuangou/article/details/121894010

 

标签:option,pyspark,redis,spark,port,es,row
From: https://www.cnblogs.com/knighterrant/p/17604738.html

相关文章

  • spring-mvc系列:详解@RequestMapping注解(value、method、params、header等)
    目录一、@RequestMapping注解的功能二、@RequestMapping注解的位置三、@RequestMapping注解的value属性四、@RequestMapping注解的method属性五、@RequestMapping注解的params属性六、@RequestMapping注解的header属性七、SpringMVC支持ant分格的路径八、SpringMVC支持路径中的占......
  • RHEL4 i386下安装rdesktop【原创】
    http://rpmfind.net/1、根据系统下载rdesktop(1)查看Linux版本:#lsb_release-aLSBVersion::core-3.0-ia32:core-3.0-noarch:graphics-3.0-ia32:graphics-3.0-noarchDistributorID:RedHatEnterpriseASDescription:RedHatEnterpriseLinuxASrelease4(NahantUpdate7......
  • .Net Core AlwaysRunResultFilter
    目录作用实现IAlwaysRunResultFilterIAsyncAlwaysRunResultFilterDemoCustomAsyncAlwaysRunResultFilterAttribute.cs全局注册Program.cs作用修改返回值,始终会触发,即使filter已经中断也会执行AlwaysRunFilter任何时刻都会执行一遍,可以在做了缓存的时候(如果有缓存并中......
  • Java中properties文件中的中文乱码问题
    问题代码:1//目标:使用Properties读取属性文件中的内容。2//1、创建properties对象3Propertiesproperties=newProperties();456//2、使用properties对象加载属性文件中的键值对数据。7properties.load(newFileInputSt......
  • ES profile 性能优化用——返回各个shard的耗时
    ProfileAPI都说要致富先修路,要调优当然需要先监控啦,elasticsearch在很多层面都提供了stats方便你来监控调优,但是还不够,其实很多情况下查询速度慢很大一部分原因是糟糕的查询引起的,玩过SQL的人都知道,数据库服务的执行计划(executionplan)非常有用,可以看到那些查询走没走索引和执行时......
  • SLF4J warning or error messages and their meanings(转)
     Themethod o.a.commons.logging.impl.SLF4FLogFactory#release wasinvoked.Giventhestructureofthecommons-loggingAPI,inparticularasimplementedbySLF4J,the o.a.commons.logging.impl.SLF4FLogFactory#release()methodshouldneverbecalled.However,d......
  • Codeforces Round 449 (Div. 1) D. Nephren Runs a Cinema 卡特兰数
    luogu链接题意不再赘述。优先枚举的应该是\(VIP\)用户,枚举范围应该是\([0,n-l]\)之后总客户数为\(s=n-i\)再考虑枚举\(100\)的总人数为\(x\)则要求\(s-2x\in[l,r]\)这部分方案数应该为从\((0,0)\)到达\((s-x,x)\)且不越过\(y=x\)的方案数。利用折线法求出方案数为\(C(s,x)......
  • Redis持久化
    RDB持久化Redis可以通过创建快照来获得存储在内存里面的数据在 某个时间点 上的副本。Redis创建快照之后,可以对快照进行备份,可以将快照复制到其他服务器从而创建具有相同数据的服务器副本(Redis主从结构,主要用来提高Redis性能),还可以将快照留在原地以便重启服务器的时候使......
  • delegate open and send for XMLHttpRequest by rewrite the prototype
     varsendProxied=window.XMLHttpRequest.prototype.send;window.XMLHttpRequest.prototype.send=function(){varobject={};letdata=arguments[0]if(data&&data.forEach){data.forEach((value,key)=>obj......
  • A07、redis
    review#Spring全家桶#Spring#中间件回顾1.springboot特性版本锁定继承官方的parent起步依赖完成某个功能的坐标集合体自动装配可以通过默认规则创建对象2.yml配置文件简单对象/map集合3.整合junit4.整合ssm5.整合日志今日目标1.掌握......