首页 > 编程语言 >python操作hdfs

python操作hdfs

时间:2022-11-16 18:33:38浏览次数:50  
标签:hdfs 文件 python client 操作 print path txt

安装

安装hadoop

关于hadoop的安装配置会在另一篇文章中介绍,这里只介绍pythonhdfs库的安装.

安装hdfs库

所有python的三方模块均采用pip来安装.

pip install hdfs

hdfs库的使用

下面将介绍hdfs库的方法列表,并会与hadoop自带的命令行工具进行比较

注:hdfs dfs开头是hadoop自带的命令行工具命令

连接hadoop

通过http协议连接hadoopdatanode节点,默认端口50070

from hdfs.client import Client
client = Client("http://127.0.0.1:50070/")

注:为了节省篇幅,下面的所有代码片段默认包含上两行,此外,后续所有的hdfs指代hadoop的hdfs模块,而非python的hdfs库

list()

list()会列出hdfs指定路径的所有文件信息,接收两个参数

  • hdfs_path 要列出的hdfs路径
  • status 默认为False,是否显示详细信息
print("hdfs中的目录为:", client.list(hdfs_path="/",status=True))

查看hdfs根目录下的文件信息,等同于hdfs dfs -ls /

status()

查看文件或者目录状态,接收两个参数

  • hdfs_path 要列出的hdfs路径
  • strict 是否开启严格模式,严格模式下目录或文件不存在不会返回None,而是raise
print(client.status(hdfs_path="/b.txt",strict=True))

checksum()

checksum() 计算目录下的文件数量,只有一个参数.

print("根目录下的文件数量为:", client.checksum(hdfs_path="/input.txt"))

parts()

列出路径下的part file,接收三个参数

  • hdfs_path 要列出的hdfs路径
  • parts 要显示的parts数量 默认全部显示
  • status 默认为False,是否显示详细信息
print("", client.parts(hdfs_path="/log", parts=0, status=True))

content()

列出目录或文件详情,接收两个参数

  • hdfs_path 要列出的hdfs路径
  • strict 是否开启严格模式,严格模式下目录或文件不存在不会返回None,而是raise
print(client.content(hdfs_path="/",strict=True))

makedirs()

创建目录,同hdfs dfs -mkdirhdfs dfs -chmod的结合体,接收两个参数

  • hdfs_path hdfs路径
  • permission 文件权限
print("创建目录", client.makedirs(hdfs_path="/t", permission="755"))

rename()

文件或目录重命名,接收两个参数

  • hdfs_src_path 原始路径或名称
  • hdfs_dst_path 修改后的文件或路径
client.rename(hdfs_src_path="/d.txt",hdfs_dst_path="/d.bak.txt")

resolve()

返回绝对路径,接收一个参数hdfs_path

print(client.resolve("d.txt"))

set_replication()

设置文件在hdfs上的副本(datanode上)数量,接收两个参数,集群模式下的hadoop默认保存3份

  • hdfs_path hdfs路径
  • replication 副本数量
client.set_replication(hdfs_path="/b.txt",replication=2)

read()

读取文件信息 类似与 hdfs dfs -cat hfds_path,参数如下:

  • hdfs_path hdfs路径
  • offset 读取位置
  • length 读取长度
  • buffer_size 设置buffer_size 不设置使用hdfs默认100MB 对于大文件 buffer够大的化 sort与shuffle都更快
  • encoding 指定编码
  • chunk_size 字节的生成器,必须和encodeing一起使用 满足chunk_size设置即 yield
  • delimiter 设置分隔符 必须和encodeing一起设置
  • progress 读取进度回调函数 读取一个chunk_size回调一次
# 读取200长度
with client.read("/input.txt", length=200, encoding='utf-8') as obj:
    for i in obj:
        print(i)

# 从200位置读取200长度
with client.read("/input.txt", offset=200, length=200, encoding='utf-8') as obj:
    for i in obj:
        print(i)

# 设置buffer为1024,读取
with client.read("/input.txt", buffer_size=1024, encoding='utf-8') as obj:
    for i in obj:
        print(i)

# 设置分隔符为换行
p = client.read("/input.txt", encoding='utf-8', delimiter='\n')
with p as d:
    print(d, type(d), next(d))

# 设置读取每个块的大小为8
p = client.read("/input.txt", encoding='utf-8', chunk_size=8)
with p as d:
    print(d, type(d), next(d))

download()

hdfs下载文件到本地,参数列表如下.

  • hdfs_path hdfs路径
  • local_path 下载到的本地路径
  • overwrite 是否覆盖(如果有同名文件) 默认为Flase
  • n_threads 启动线程数量,默认为1,不启用多线程
  • temp_dir下载过程中文件的临时路径
  • **kwargs其他属性
print("下载文件结果input.txt:", client.download(hdfs_path="/input.txt", local_path="~/",overwrite=True))

等同 hdfs dfs copyToLocal /input ~/

upload()

上传文件到hdfs 同hdfs dfs -copyFromLocal local_file hdfs_path,参数列表如下:

  • hdfs_path, hdfs上位置
  • local_path, 本地文件位置
  • n_threads=1 并行线程数量 temp_dir=None, overwrite=True或者文件已存在的情况下的临时路径
  • chunk_size=2 ** 16 块大小
  • progress=None, 报告进度的回调函数 完成一个chunk_size回调一次 chunk_size可以设置大点 如果大文件的话
  • cleanup=True, 上传错误时 是否删除已经上传的文件
  • **kwargs 上传的一些关键字 一般设置为 overwrite 来覆盖上传
def callback(filename, size):
    print(filename, "完成了一个chunk上传", "当前大小:", size)
    if size == -1:
        print("文件上传完成")
        
# 上传成功返回 hdfs_path
client.upload(hdfs_path="/a_bak14.txt", local_path="a.txt", chunk_size=2 << 19, progress=callback,cleanup=True)

delete()

删除文件,接收三个参数

  • hdfs_path
  • recursive=False 是否递归删除
  • skip_trash=True 是否移到垃圾箱而不是直接删除 hadoop 2.9+版本支持
client.delete("/a.s")

等同hdfs dfs -rm (-r)

set_owner()

类似与 hdfs dfs -chown root root hdfs_path修改目录或文件的所属用户,用户组,接收三个参数

  • hdfs_path hdfs路径
  • owner 用户
  • group 用户组

注意:对于默认用户,只能修改自己的文件.

client.set_owner(hdfs_path="/a.txt", owner="root", group="root")

set_permission

修改权限,类似于hdfs dfs -chmod 777 hdfs_path,接收两个参数

  • hdfs_path hdfs路径
  • permission 权限
client.set_permission(hdfs_path="/b.txt",permission='755')

注意:对于默认用户,只能修改自己的文件.

set_acl()与acl_status()

查看和修改访问权限控制 需要开启acl支持

set_times()

设置文件时间,接收参数如下:

  • hdfs_path: hdfs路径.
  • access_time: 最后访问时间 时间戳 毫秒
  • modification_time: 最后修改时间 时间戳 毫秒
import time

client.set_times(hdfs_path="/b.txt", access_time=int(time.time())*1000,
                 modification_time=int(time.time())*1000)

使用案例

@staticmethod
    def hdfs_put_all(path, h_path='/testkk/', all_files = []):
        """
        批量上传文件
        :param h_path: hdfs 路径
        @Author: kk
        """
        # 首先遍历当前目录所有文件及文件夹
        try:
            client = pyhdfs.HdfsClient(hosts="", user_name="root")
            file_list = os.listdir(path)
            # 准备循环判断每个元素是否是文件夹还是文件,是文件的话,把名称传入list,是文件夹的话,递归
            for file in file_list:
                # 利用os.path.join()方法取得路径全名,并存入cur_path变量,否则每次只能遍历一层目录
                cur_path = os.path.join(path, file)
                # 判断是否是文件夹
                if os.path.isdir(cur_path):
                    client.mkdirs(h_path + file)
                    Property.hdfs_put_all(cur_path, h_path + file + '/', all_files)
                else:
                    all_files.append(file)
                    client.copy_from_local(cur_path, h_path + file)
            return all_files
        except Exception as e:
            print(e)

标签:hdfs,文件,python,client,操作,print,path,txt
From: https://www.cnblogs.com/xiaofubase/p/16897087.html

相关文章

  • 20221115-Python列表与元组
    1.列表的概念:  列表是可变对象  2.列表元素的新增与删除    3.列表的下标和切片同字符串一致4.元组   ......
  • ArcGIS Python API可视化及分析系列教程(一):入门与简介(2)安装与配置
    前文再续,本节主要讲安装……前置要求:1、有Python软件安装的经验。2、离线安装的话,需要有ArcGISJavascriptAPI部署经验和能力。如果这两个都从来没有弄过的话,就用在线的......
  • Python实验报告——第10章 文件及目录操作
    Python实验报告——第10章文件及目录操作 实验报告【实验目的】 1.掌握Python自带的函数进行基本文件操作。2.掌握Python内置的os模块及其子模块os.path进行目......
  • Python实验报告——第8章 模块
    Python实验报告——第8章模块 实验报告【实验目的】 1.掌握Python内置的标准模块和第三方模块的使用。【实验条件】1.PC机或者远程编程环境。 【实验内容......
  • C语言《数据结构与数据库/操作系统》实验测试数据集
    C语言《数据结构与数据库/操作系统》实验测试数据集实验二、栈的应用注意需要根据实验内容文件实现相应的数据结构——栈,以及菜单(程序要能循环使用,不要计算一次就必须重......
  • python JSON模块
    一、JSON介绍JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式,易于人阅读和编写。二、常用方法方法描述json.loads()将JSON字符串转化为Python对......
  • 银河麒麟服务器操作系统安装达梦数据库DM8
    DM8达梦数据库安装(银河麒麟服务器V10)一、准备工作:1、银河麒麟服务器操作系统iso(官方网站申请试用,普通下载地址太慢,建议使用Bt软件下载,不要用迅雷,可能存在数据下载不一致)......
  • python垃圾回收机制
    python垃圾回收机制主要分为:1.引用计数2.标记清除3.分代回收python的引用计数机制:python是根据对象的引用计数是否为0,来进行垃圾回收,释放内......
  • python的文件操作
    步骤1、打开文件:使用内置函数open2、进行操作(读或者写)读:read方法或者写:write方法3、关闭文件close方法#1、打开文件,返回文件的句柄f=open(file="xxx",mode=......
  • python源码通过词语标记化器tokenize提取注释并正则匹配测试用例作者名
    提取代码如下importtokenizeimportrewithtokenize.open('readcomment.py')asf:list=[]fortoktype,tok,start,end,lineintokenize.generate_t......