先安装Hadoop,并配置好LD_LIBRARY_PATH等环境变量:
# Hadoop release to use and the directory it is installed in.
export HADOOP_VERSION=2.10.1
export HADOOP_HOME="/opt/hadoop-$HADOOP_VERSION"
# Add Hadoop Java libraries to your CLASSPATH, and
# add native libraries to LD_LIBRARY_PATH
export CLASSPATH="$("$HADOOP_HOME/bin/hadoop" classpath --glob)"
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_HOME/lib/native"
# Only emit the ':' separator when LD_LIBRARY_PATH already has a value.
# A bare leading ':' creates an empty search-path entry, which the dynamic
# loader typically treats as the current working directory.
export LD_LIBRARY_PATH="${LD_LIBRARY_PATH:+$LD_LIBRARY_PATH:}$HADOOP_HOME/lib/native/"
这几个库目前暂时用不到,不过后面的CMakeLists.txt会链接其中的libhdfs.so,稍后再说明。环境配置好以后,就可以用Apache Arrow来读写HDFS上的parquet文件了。先看CMakeLists.txt:
# Builds one executable per *.cpp file found directly in this directory.
# Every executable is linked against Boost, Arrow, Parquet and libhdfs.
cmake_minimum_required(VERSION 3.13...3.28)
project(lexical_cast LANGUAGES CXX)

# Request plain C++14 (-std=c++14) through CMake's language-standard
# variables instead of passing a raw compiler flag via add_definitions().
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

# Hadoop installation root. Defaults to $HADOOP_HOME from the environment when
# it is set, otherwise to the path this project was originally written against.
# Override with -DHADOOP_HOME=/path/to/hadoop.
if(NOT "$ENV{HADOOP_HOME}" STREQUAL "")
  set(default_hadoop_home "$ENV{HADOOP_HOME}")
else()
  set(default_hadoop_home "/opt/hadoop-2.10.1")
endif()
set(HADOOP_HOME "${default_hadoop_home}" CACHE PATH
    "Hadoop installation root (expected to contain lib/native/libhdfs.so)")

# NOTE: set(ENV{ARROW_LIBHDFS_DIR} ...) was removed from this file. It only
# changes the environment of the CMake process at configure time, not that of
# the build or of the programs when they run. If Arrow needs ARROW_LIBHDFS_DIR
# to locate libhdfs.so, export it in the shell that runs the executables.

find_package(Threads REQUIRED)

# CONFIGURE_DEPENDS re-runs the glob at build time so newly added .cpp files
# are picked up without a manual re-configure.
file(GLOB app_sources CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/*.cpp")

foreach(source_file IN LISTS app_sources)
  file(RELATIVE_PATH relative_name "${CMAKE_CURRENT_SOURCE_DIR}" "${source_file}")
  # Strip only the trailing ".cpp"; a plain string(REPLACE) would also remove
  # ".cpp" occurring in the middle of a file name.
  string(REGEX REPLACE "\\.cpp$" "" target_name "${relative_name}")

  add_executable(${target_name} "${source_file}")
  target_include_directories(${target_name} PRIVATE
    "/usr/local/include"
    "/usr/include"
  )
  target_link_directories(${target_name} PRIVATE
    "/usr/local/lib"
    "/usr/lib/x86_64-linux-gnu"
    "${HADOOP_HOME}/lib/native"
  )
  target_link_libraries(${target_name} PRIVATE
    boost_filesystem
    boost_thread
    boost_system
    boost_serialization
    Threads::Threads
    boost_chrono
    arrow
    parquet
    hdfs
  )
endforeach()
注意,这里比读写本地parquet文件时多链接了一个hdfs库(libhdfs.so)。它位于/opt/hadoop-2.10.1/lib/native目录,也就是本地Hadoop的安装目录;如果不把该目录加入链接路径,链接时会报找不到库文件的错误。
写入HDFS parquet文件
#include <arrow/io/hdfs.h>
#include <parquet/stream_writer.h>
#include <iostream>
#include <string>
#include <vector>
#include <map>
// One record of the sample data set.
// The member order is significant: instances are created with positional
// brace initialization, e.g. Article {"name", 35.0f, 20}.
struct Article {
    std::string name;   // article name (UTF-8 text)
    float       price;  // price
    int         quantity;  // item count
};
std::vector<Article> get_articles() {
std::vector<Article> articles {
Article {"南昌好景色", 35.0f, 20},
Article {"武汉好风景", 24.0f, 30},
Article {"北京王府井", 50.0f, 10}
};
return std::move(articles);
}
// Writes the sample articles to /test.parquet on HDFS.
//
// Returns 0 on success, -1 if the HDFS connection fails and -2 if the output
// file cannot be opened. argc/argv are unused.
int main(int argc, char* argv[]) {
    // NOTE(review): the config fields appear to be host, port, user, Kerberos
    // ticket and extra configuration, in that order — confirm against
    // arrow/io/hdfs.h.
    std::shared_ptr<arrow::io::HadoopFileSystem> fs;
    std::unordered_map<std::string, std::string> extraConf;
    arrow::io::HdfsConnectionConfig connectCfg {"172.18.0.2", 0, "root", "", extraConf};
    auto connectRes = arrow::io::HadoopFileSystem::Connect(&connectCfg, &fs);
    if (!connectRes.ok()) {
        std::cerr << "连接到HDFS失败, Error: " << connectRes.message() << std::endl;
        return -1;
    }

    // Open the target file; the `false` argument is passed as the second
    // (append) parameter of OpenWritable.
    std::shared_ptr<arrow::io::HdfsOutputStream> out_file;
    auto streamRes = fs->OpenWritable("/test.parquet", false, &out_file);
    if (!streamRes.ok()) {
        // This branch reports a failure to open the file, not a connection
        // failure; the message used to be a copy of the one above.
        std::cerr << "打开HDFS文件失败, Error: " << streamRes.message() << std::endl;
        return -2;
    }

    // Parquet schema with three columns matching the Article members:
    // name (optional UTF-8 byte array), price (required float),
    // quantity (required 32-bit int).
    parquet::WriterProperties::Builder builder;
    parquet::schema::NodeVector fields;
    fields.push_back(parquet::schema::PrimitiveNode::Make(
        "name", parquet::Repetition::OPTIONAL, parquet::Type::BYTE_ARRAY,
        parquet::ConvertedType::UTF8));
    fields.push_back(parquet::schema::PrimitiveNode::Make(
        "price", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
        parquet::ConvertedType::NONE, -1));
    fields.push_back(parquet::schema::PrimitiveNode::Make(
        "quantity", parquet::Repetition::REQUIRED, parquet::Type::INT32,
        parquet::ConvertedType::INT_32, -1));
    std::shared_ptr<parquet::schema::GroupNode> schema =
        std::static_pointer_cast<parquet::schema::GroupNode>(
            parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

    // Stream one row per article, in schema column order.
    // NOTE(review): nothing closes the writer or the HDFS stream explicitly;
    // presumably both are flushed by their destructors at scope exit — confirm.
    parquet::StreamWriter os {parquet::ParquetFileWriter::Open(out_file, schema, builder.build())};
    for (const auto& a : get_articles()) {
        os << a.name << a.price << a.quantity << parquet::EndRow;
    }
    return 0;
}
读出HDFS parquet文件
#include <arrow/io/hdfs.h>
#include <parquet/stream_reader.h>
#include <iostream>
// One record of the sample data set; main() fills an instance from each row
// it reads, in the order name, price, quantity.
struct Article {
    std::string name;   // article name (UTF-8 text)
    float       price;  // price
    int         quantity;  // item count
};
// Reads /test.parquet from HDFS and prints every row to stdout as
// "name price quantity".
//
// Returns 0 on success, -1 if the HDFS connection fails and -2 if the input
// file cannot be opened. argc/argv are unused.
int main(int argc, char* argv[]) {
    // NOTE(review): the config fields appear to be host, port, user, Kerberos
    // ticket and extra configuration, in that order — confirm against
    // arrow/io/hdfs.h.
    std::shared_ptr<arrow::io::HadoopFileSystem> fs;
    std::unordered_map<std::string, std::string> extraConf;
    arrow::io::HdfsConnectionConfig connectCfg {"172.18.0.2", 0, "root", "", extraConf};
    auto connectRes = arrow::io::HadoopFileSystem::Connect(&connectCfg, &fs);
    if (!connectRes.ok()) {
        std::cerr << "连接到HDFS失败, Error: " << connectRes.message() << std::endl;
        return -1;
    }

    // NOTE(review): the second argument of OpenReadable looks like a buffer
    // size, not a boolean flag. The original passed `false` (probably copied
    // from the OpenWritable call), which converts to the same value 0.
    // Confirm against arrow/io/hdfs.h.
    std::shared_ptr<arrow::io::HdfsReadableFile> infile;
    auto streamRes = fs->OpenReadable("/test.parquet", 0, &infile);
    if (!streamRes.ok()) {
        // This branch reports a failure to open the file, not a connection
        // failure; the message used to be a copy of the one above.
        std::cerr << "打开HDFS文件失败, Error: " << streamRes.message() << std::endl;
        return -2;
    }

    // Read rows until the stream reports end-of-file; columns are extracted
    // in the order name, price, quantity.
    parquet::StreamReader is {parquet::ParquetFileReader::Open(infile)};
    Article arti;
    while (!is.eof()) {
        is >> arti.name >> arti.price >> arti.quantity >> parquet::EndRow;
        std::cout << arti.name << " " << arti.price << " " << arti.quantity << std::endl;
    }
    return 0;
}