首页 > 编程语言 >使用Python调用Hadoop Hdfs的API

使用Python调用Hadoop Hdfs的API

时间:2023-09-13 09:11:53浏览次数:55  
标签:Hdfs fs java Python Hadoop hadoop org apache path

一、Java调用hdfs的api

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;


public class HadoopClient {

    private static FileSystem fs;

    @Before
    public  void init() throws URISyntaxException, IOException, InterruptedException {
        URI uri=new URI("hdfs://hadoop01:9000");
        Configuration config=new Configuration();
        config.set("dfs.clent.use.datanode.hostname","true");
        config.set("dfs.replication","1");
        fs= FileSystem.get(uri,config,"root");
    }

    @Test
    public void mkdir() throws IOException {
        Path path= new Path("/java");
        fs.mkdirs(path);
    }

    @Test
    public void put() throws IOException {

        Path srcpath= new Path("D:/HadoopDemo/local.txt");
        Path dstpath= new Path("/java/local.txt");

        fs.copyFromLocalFile(srcpath,dstpath);
    }

//    @Test
//    public void rmdir() throws IOException {
//        Path path= new Path("/java");
//        fs.deleteOnExit(path);
//    }
    @After
    public void close() throws IOException {
        fs.close();
    }

}
View Code

看着尚硅谷的hadoop课程学习的,我也尝试着使用Java调用hdfs的api,在调用的时候能正常在hdfs上新建文件夹,当上传本地文件时就报错了,通过hdfs的web页面也可以看到文件名但size=0,应该是namanode起作用了,datanode未起作用。

org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /java/local.txt could only be written to 0 of the 1 minReplication nodes. There are 1 datanode(s) running and 1 node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2350)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2989)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:912)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:595)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)


    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1567)
    at org.apache.hadoop.ipc.Client.call(Client.java:1513)
    at org.apache.hadoop.ipc.Client.call(Client.java:1410)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:258)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:139)
    at jdk.proxy2/jdk.proxy2.$Proxy18.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:531)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:433)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:166)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:158)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:96)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:362)
    at jdk.proxy2/jdk.proxy2.$Proxy19.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1088)
    at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1915)
    at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1717)
    at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:713)
View Code
File /java/local.txt could only be written to 0 of the 1 minReplication nodes. There are 1 datanode(s) running and 1 node(s) are excluded in this operation.

 网上搜解决办法,参考https://blog.csdn.net/xiaoyao_zhy/article/details/127134090,有说是域名配置问题,还有说是磁盘满了的,我还把hdfs重新格式化了下,在前面hadoop部署篇 我配置的core-site.xml的fs.defaultFS属性是locahost,我看别人教程配置的都是节点名称,我当时怀疑是不是这个地方出现的问题,我改成了fs.defaultFS=hdfs://hadoop01:9000,这样就把机器实例名也改成了hadoop01,在云主机/etc/hosts中设置机器名hadoop与id地址映射。

<configuration>
 <property>
 <name>fs.defaultFS</name>
 <value>hdfs://localhost:9000</value>
 </property>
 <property>
 <name>hadoop.tmp.dir</name>
 <value>/home/Hadoop/hadooptest/hdata</value>
 </property>
 </configuration>

在我window本地机子上修改了hosts文件,云主机ip hadoop01。昨天是一顿操作之后,我想着能把上面的问题解决了,可还是报上面一样的问题,我真是服了,都想着放弃了。今天想着既然能不能用python试下,看到底是我hadoop环境部署问题还是java问题,不试不知道,一试有新发现,没想到使用python就能正常从本地上传文件到服务器。所以就有了标题写的python调用hdfs的api,虽然网上教程是用java调用的,那我就使用python按教程来调用。

二、python调用hdfs api

python调用hadoop主要使用PyHDFS类库, pip install PyHDFS即可使用。

1.创建文件夹、上传文件

下面代码是在hdfs上创建了个cyw的文件夹,同时将本地test.txt文本拷贝到hdfs上。下面是打印的结果和hdfs的web显示的文件信息,是能正常显示的,说明python调用hdfs api没问题,说明hadoop环境配置的没问题,应该是java的问题,估计是我java项目哪里配置的不正确,这个后续再说。

# -*- coding: utf-8 -*-
import pyhdfs
fs = pyhdfs.HdfsClient(hosts='hadoop01:9870',user_name='root')

home_directory=fs.get_home_directory()#返回这个用户的根目录
print(home_directory)
active_namenode=fs.get_active_namenode()#返回可用的namenode节点
print(active_namenode)
path='/cyw/'
if not fs.exists(path):
    fs.mkdirs(path) #创建hdfs文件夹
file='test.txt'
file_name=path+file
if not fs.exists(path+file) :
    fs.copy_from_local('test.txt',path+file,) #从本地拷贝文件到hdfs

下面代码是使用pyhdfs操作hdfs api的测试。包括常用的上传、下载、写入、合并、删除等操作。

# -*- coding: utf-8 -*-

import pyhdfs
fs = pyhdfs.HdfsClient(hosts='hadoop01:9870',user_name='root')

home_directory=fs.get_home_directory()#返回这个用户的根目录
print(home_directory)
active_namenode=fs.get_active_namenode()#返回可用的namenode节点
print(active_namenode)
path='/cyw/'
if not fs.exists(path):
    fs.mkdirs(path) #创建hdfs文件夹

# 递归创建文件
path='/cyw/sub1'
if not fs.exists(path) :
    fs.mkdirs(path)

# 从本地拷贝文件到hdfs
file='test.txt'
file_name=path+file
if not fs.exists(path+file) :
    fs.copy_from_local('test.txt',path+file,) #从本地拷贝文件到hdfs

# 写入
file='/cyw/test.txt'
if not fs.exists(file) :
    fs.append(path=file, data="new hello", encoding="utf-8")

# 通过open打开 通过read读取
response=fs.open('/cyw/test.txt')
print(response.read())

# 通过copy_to_local将hdfs文件拷贝到本地
fs.copy_to_local('/cyw/test.txt','test1.txt')

# concat 文件合并 将/java/data.txt合并到/java/move.txt,/java/data.txt文件消失
path='/java/data.txt'
if  fs.exists(path) :
    fs.concat('/java/move.txt',['/java/data.txt'])

# 重命名文件夹
path='/java/move.txt'
if fs.exists(path) :
    fs.rename("/java/move.txt", "/java/new_move.txt")

# 先创建后删除
path='/cyw/delete'
if not fs.exists(path):
    fs.mkdirs(path)
if fs.exists(path):
    fs.delete(path)

path='/cyw/sub1'
if fs.exists(path):
    # recursive不传或Flase 删除必须是空文件夹才能删除
    # recursive=True 递归删除
    fs.delete(path,recursive=True)
# 路径总览信息
path='/cyw/test.txt'
content_summary=fs.get_content_summary(path)
print(content_summary)

# 路径状态
path='/cyw/test.txt'
file_status=fs.get_file_status(path)
print(file_status)

path='/cyw'
file_status=fs.get_file_status(path)
print(file_status)
filetype=file_status.get('type') #获取文件类型
print(filetype)
# 路径校验和
path='/cyw/test.txt'
file_checksum=fs.get_file_checksum(path)
print(file_checksum)

# 获取文件夹下文件列表
path='/cyw'
dirlist=fs.listdir(path)
print(dirlist)
/user/root
hadoop01:9870
b'abc\r\n123new hellonew hellonew hellonew hellonew hello'
ContentSummary(directoryCount=0, ecPolicy='Replicated', fileCount=1, length=53, quota=-1, snapshotDirectoryCount=0, snapshotFileCount=0, snapshotLength=0, snapshotSpaceConsumed=0, spaceConsumed=53, spaceQuota=-1, typeQuota={})
FileStatus(accessTime=1694563976129, blockSize=134217728, childrenNum=0, fileId=16443, group='supergroup', length=53, modificationTime=1694564428530, owner='root', pathSuffix='', permission='644', replication=1, storagePolicy=0, type='FILE')
FileStatus(accessTime=0, blockSize=0, childrenNum=2, fileId=16442, group='supergroup', length=0, modificationTime=1694566764561, owner='root', pathSuffix='', permission='755', replication=0, storagePolicy=0, type='DIRECTORY')
DIRECTORY
FileChecksum(algorithm='MD5-of-0MD5-of-512CRC32C', bytes='0000020000000000000000007e9365afb9323129fbe488ed4bc6071500000000', length=28)
['sub1test.txt', 'test.txt']

 

标签:Hdfs,fs,java,Python,Hadoop,hadoop,org,apache,path
From: https://www.cnblogs.com/5ishare/p/17698311.html

相关文章

  • Python用正则化Lasso、岭回归预测房价、随机森林交叉验证鸢尾花数据可视化2案例
    全文链接:https://tecdat.cn/?p=33632原文出处:拓端数据部落公众号机器学习模型的表现不佳通常是由于过度拟合或欠拟合引起的,我们将重点关注客户经常遇到的过拟合情况。过度拟合是指学习的假设在训练数据上拟合得非常好,以至于对未见数据的模型性能造成负面影响。该模型对于训练数......
  • Python使用 - NumPy用法1
    NumPy的核心数据结构之一是ndarray,表示一个多维数组,他存储的是单一数据类型。 导入包importnumpyasnp 1维数组list1=[1,2,3,4,5,6]ndarr1=np.array(list1,dtype=np.float32)print(type(ndarr1),ndarr1)#<class'numpy.ndarray'>[1.2.3.4.5.6.]......
  • Working With Strings In Python.
    #字符串操作在Python中,`string`是一种不可变的数据类型,用于表示文本或字符序列,可以使用单引号或双引号将字符串括起来。<fontcolor="#C7EDCC">所有修改和生成字符串的操作的实现方法都是另一个内存片段中新生成一个字符串对象。</font>##创建字符串```pystr1="Lefti......
  • C++系列三:Qt-for-Python
    目录代码参考:代码参考:官方文档、博客参考代码参考:self.ui.pushButton.setText("demo")lable=QLabel("<fontcolor=redsize=40>HelloWorld!</font>")lable.show()SignalsandSlots:fromPySide6.QtCoreimportSlot@Slot()defsay_hello():......
  • python实现结构体排序
    python默认提供的列表sort方法,仅支持列表的元素排序。若想实现结构提排序,可参考下面方法:1、使用 lambda表达式1classstruct:2def__init__(self):3self.a=04self.b='a'56struct_list=[]7char_list=['a','b','c']8......
  • Python数据可视化:Matplotlib
    Matplotlib是Python中最受欢迎的数据可视化库之一,它提供了多种绘图函数和参数,可以创建各种类型的图形,包括线图、散点图、柱状图、饼图等等。下面是一些Matplotlib的入门知识和具体案例。安装Matplotlib在开始使用Matplotlib之前,需要先安装它。可以使用pip命令来安装:pipinstallmat......
  • 《Flask Web开发:基于Python的Web应用开发实战》高清高质量PDF电子书+源码
    网盘下载:https://pan.quark.cn/s/cc9dc7402cdb......
  • 《Python数据分析基础教程:NumPy学习指南.第2版》高清高质量PDF电子书+源码
    罕见的NumPy中文入门教程,Python数据分析首选从最基础的知识讲起,手把手带你进入大数据挖掘领域囊括大量具有启发性与实用价值的实战案例下载:https://pan.quark.cn/s/730b594117c0......
  • difflib: Python 比较数据集
    difflib 是一个专注于比较数据集(尤其是字符串)的Python模块。为了具体了解您可以使用此模块完成的几件事,让我们检查一下它的一些最常见的函数。SequenceMatcherSequenceMatcher 是一个比较两个字符串并根据它们的相似性返回数据的函数。通过使用 ratio(),我们将能够根据比率/......
  • python实现md5签名
    在Python中,hashlib.md5是一个用于计算MD5哈希值的模块。MD5是一种常用的哈希算法,它将输入数据转换为固定长度的哈希值。defget_api_sign():"""签名的计算方式:sign=md5(accountId+"zjkj@2023"+time)*2重复两次"""acco......