首页 > 其他分享 >Apache Arrow User Guide——使用Apache Arrow读写HDFS中的parquet文件

Apache Arrow User Guide——使用Apache Arrow读写HDFS中的parquet文件

时间:2023-01-14 21:08:52浏览次数:48  
标签:std HDFS file HADOOP Arrow parquet Apache include schema


首先安装Hadoop,并配置CLASSPATH、LD_LIBRARY_PATH等环境变量:

# Hadoop installation used by the examples below.
export HADOOP_VERSION=2.10.1
export HADOOP_HOME=/opt/hadoop-$HADOOP_VERSION

# Add Hadoop Java libraries to your CLASSPATH, and
# add native libraries to LD_LIBRARY_PATH
export CLASSPATH="$("$HADOOP_HOME/bin/hadoop" classpath --glob)"
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_HOME/lib/native"
# Emit the ':' separator only when LD_LIBRARY_PATH already has a value; a
# leading ':' would create an empty entry, which the dynamic loader treats as
# the current working directory.
export LD_LIBRARY_PATH="${LD_LIBRARY_PATH:+$LD_LIBRARY_PATH:}$HADOOP_HOME/lib/native/"

上面配置的这些库暂时不会直接用到,不过后面的CMakeLists.txt需要链接其中的libhdfs.so,稍后再作说明。环境配置完成之后,就可以用Apache Arrow来读写HDFS上的parquet文件了。先来看CMakeLists.txt:

# Builds every *.cpp file in this directory into its own executable, linked
# against Boost, Arrow, Parquet and libhdfs.
#
# 3.5 is the oldest policy version still accepted by current CMake releases;
# CMAKE_CXX_STANDARD below needs at least 3.1.
cmake_minimum_required(VERSION 3.5)
project(lexical_cast)

# Request C++14 through the dedicated variables instead of smuggling a
# compiler flag through add_definitions(), which is meant for -D definitions.
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

# NOTE: set(ENV{...}) only changes the environment of the running CMake
# process (the configure step). It is NOT visible to the built executables at
# run time, so export ARROW_LIBHDFS_DIR in the shell as well if it is needed
# there.
set(ENV{ARROW_LIBHDFS_DIR} /opt/hadoop-2.10.1/lib/native)

include_directories("/usr/local/include" "/usr/include")
# The Hadoop native directory is listed so that the linker can find libhdfs.
link_directories("/usr/local/lib" "/usr/lib/x86_64-linux-gnu" "/opt/hadoop-2.10.1/lib/native")

file(GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
foreach(source_file ${APP_SOURCES})
  # Target name = source file name relative to this directory, minus ".cpp".
  file(RELATIVE_PATH relative_name ${CMAKE_CURRENT_SOURCE_DIR} ${source_file})
  string(REPLACE ".cpp" "" target_name ${relative_name})
  add_executable(${target_name} ${source_file})
  target_link_libraries(${target_name}
    boost_filesystem boost_thread boost_system boost_serialization
    pthread boost_chrono arrow parquet hdfs)
endforeach()

注意:与读写本地parquet文件相比,这里多链接了一个hdfs库(libhdfs.so)。它位于本地Hadoop安装目录下的/opt/hadoop-2.10.1/lib/native中,因此必须把该目录加入link_directories,否则链接时会出现找不到库文件的错误。

写入HDFS parquet文件

#include <arrow/io/hdfs.h>

#include <parquet/stream_writer.h>

#include <iostream>
#include <string>
#include <vector>
#include <map>

/// One row of the sample data set written to the parquet file.
///
/// Every member has a default initializer so that a default-constructed
/// Article never carries indeterminate values; aggregate initialization
/// (`Article{"name", 1.0f, 2}`) keeps working under C++14 and later.
struct Article {
    std::string name{};   ///< Article name (stored as a UTF-8 string column).
    float price{0.0f};    ///< Price (FLOAT column).
    int quantity{0};      ///< Quantity (INT32 column).
};

std::vector<Article> get_articles() {
std::vector<Article> articles {
Article {"南昌好景色", 35.0f, 20},
Article {"武汉好风景", 24.0f, 30},
Article {"北京王府井", 50.0f, 10}
};
return std::move(articles);
}

int main(int argc, char* argv[]) {

std::shared_ptr<arrow::io::HadoopFileSystem> fs;
std::unordered_map<std::string, std::string> extraConf;
arrow::io::HdfsConnectionConfig connectCfg {"172.18.0.2", 0, "root", "", extraConf};

auto connectRes = arrow::io::HadoopFileSystem::Connect(&connectCfg , &fs);

if(!connectRes.ok()) {
std::cerr << "连接到HDFS失败, Error: " << connectRes.message() << std::endl;
return -1;
}

std::shared_ptr<arrow::io::HdfsOutputStream> out_file;
auto streamRes = fs->OpenWritable("/test.parquet", false, &out_file);

if(!streamRes.ok()) {
std::cerr << "连接到HDFS失败, Error: " << streamRes.message() << std::endl;
return -2;
}

parquet::WriterProperties::Builder builder;
parquet::schema::NodeVector fields;

fields.push_back(parquet::schema::PrimitiveNode::Make(
"name", parquet::Repetition::OPTIONAL, parquet::Type::BYTE_ARRAY,
parquet::ConvertedType::UTF8));

fields.push_back(parquet::schema::PrimitiveNode::Make(
"price", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
parquet::ConvertedType::NONE, -1));

fields.push_back(parquet::schema::PrimitiveNode::Make(
"quantity", parquet::Repetition::REQUIRED, parquet::Type::INT32,
parquet::ConvertedType::INT_32, -1));

std::shared_ptr<parquet::schema::GroupNode> schema = std::static_pointer_cast<parquet::schema::GroupNode>(
parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

parquet::StreamWriter os {parquet::ParquetFileWriter::Open(out_file, schema, builder.build())};

for(const auto& a: get_articles()) {
os << a.name << a.price << a.quantity << parquet::EndRow;
}

return 0;
}

读取HDFS parquet文件

#include <arrow/io/hdfs.h>

#include <parquet/stream_reader.h>

#include <iostream>

/// One row read back from the parquet file.
///
/// Every member has a default initializer so that a default-constructed
/// Article (as used by the read loop) never carries indeterminate values.
struct Article {
    std::string name{};   ///< Article name (UTF-8 string column).
    float price{0.0f};    ///< Price (FLOAT column).
    int quantity{0};      ///< Quantity (INT32 column).
};

int main(int argc, char* argv[]) {

std::shared_ptr<arrow::io::HadoopFileSystem> fs;
std::unordered_map<std::string, std::string> extraConf;
arrow::io::HdfsConnectionConfig connectCfg {"172.18.0.2", 0, "root", "", extraConf};

auto connectRes = arrow::io::HadoopFileSystem::Connect(&connectCfg , &fs);

if(!connectRes.ok()) {
std::cerr << "连接到HDFS失败, Error: " << connectRes.message() << std::endl;
return -1;
}


std::shared_ptr<arrow::io::HdfsReadableFile> infile;
auto streamRes = fs->OpenReadable("/test.parquet", false, &infile);

if(!streamRes.ok()) {
std::cerr << "连接到HDFS失败, Error: " << streamRes.message() << std::endl;
return -2;
}

parquet::StreamReader is {parquet::ParquetFileReader::Open(infile)};

Article arti;

while(!is.eof()) {
is >> arti.name >> arti.price >> arti.quantity >> parquet::EndRow;
std::cout << arti.name << " " << arti.price << " " << arti.quantity << std::endl;
}

return 0;
}


标签:std,HDFS,file,HADOOP,Arrow,parquet,Apache,include,schema
From: https://blog.51cto.com/feishujun/6007728

相关文章