
Apache Arrow User Guide: Reading and Writing Parquet Files in HDFS with Apache Arrow

肥叔菌 2023-01-28

Install Hadoop and configure LD_LIBRARY_PATH:

    export HADOOP_VERSION=2.10.1
    export HADOOP_HOME=/opt/hadoop-$HADOOP_VERSION


    # Add Hadoop Java libraries to your CLASSPATH, and
    # add native libraries to LD_LIBRARY_PATH
    export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
    export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_HOME/lib/native"
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/
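One thing that is easy to trip over: libhdfs locates the JVM and the Hadoop jars through these variables at runtime, so they must be visible to the process that eventually runs your program, not just to the shell you built in. A minimal sanity check (plain C++, no Arrow required; CLASSPATH and LD_LIBRARY_PATH come from the exports above, and ARROW_LIBHDFS_DIR shows up in the CMakeLists.txt below):

    #include <cstdlib>
    #include <iostream>

    // Print the variables libhdfs depends on at runtime; "<unset>" means
    // the exports above are not visible to this process.
    int main() {
        for (const char* var : {"CLASSPATH", "LD_LIBRARY_PATH", "ARROW_LIBHDFS_DIR"}) {
            const char* val = std::getenv(var);
            std::cout << var << " = " << (val ? val : "<unset>") << std::endl;
        }
        return 0;
    }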

These variables are not used directly yet, but the CMakeLists.txt will need libhdfs.so; more on that below. With this in place, you can use Apache Arrow to read and write parquet files in HDFS. The code is as follows, starting with CMakeLists.txt:

    cmake_minimum_required(VERSION 2.6)
    project(lexical_cast)

    add_definitions(-std=c++14)
    # Note the leading slash: the original path was missing it
    set(ENV{ARROW_LIBHDFS_DIR} /opt/hadoop-2.10.1/lib/native)

    include_directories("/usr/local/include" "/usr/include")
    link_directories("/usr/local/lib" "/usr/lib/x86_64-linux-gnu" "/opt/hadoop-2.10.1/lib/native")

    # Build one executable per .cpp file in this directory
    file(GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
    foreach(sourcefile ${APP_SOURCES})
        file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
        string(REPLACE ".cpp" "" file ${filename})
        add_executable(${file} ${sourcefile})
        target_link_libraries(${file} boost_filesystem boost_thread boost_system boost_serialization pthread boost_chrono arrow parquet hdfs)
    endforeach(sourcefile ${APP_SOURCES})

Note that, compared with reading and writing local parquet files, one extra library is linked here: hdfs, located in /opt/hadoop-2.10.1/lib/native, i.e. the native library directory of the local Hadoop installation; without that link_directories entry the linker reports a missing-library error. Also note that set(ENV{ARROW_LIBHDFS_DIR} ...) only affects the environment of the cmake process at configure time; when the program actually runs, Arrow locates libhdfs.so through the runtime environment (ARROW_LIBHDFS_DIR, or the LD_LIBRARY_PATH exported earlier).

Writing a parquet file to HDFS

    #include <arrow/io/hdfs.h>
    #include <parquet/stream_writer.h>

    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <vector>

    struct Article {
        std::string name;
        float price;
        int quantity;
    };

    std::vector<Article> get_articles() {
        std::vector<Article> articles {
            Article {"南昌好景色", 35.0f, 20},
            Article {"武汉好风景", 24.0f, 30},
            Article {"北京王府井", 50.0f, 10}
        };
        return articles;
    }

    int main(int argc, char* argv[]) {
        // Connect to HDFS: host, port (0 = default), user, kerberos ticket, extra conf
        std::shared_ptr<arrow::io::HadoopFileSystem> fs;
        std::unordered_map<std::string, std::string> extraConf;
        arrow::io::HdfsConnectionConfig connectCfg {"172.18.0.2", 0, "root", "", extraConf};

        auto connectRes = arrow::io::HadoopFileSystem::Connect(&connectCfg, &fs);
        if (!connectRes.ok()) {
            std::cerr << "Failed to connect to HDFS, Error: " << connectRes.message() << std::endl;
            return -1;
        }

        // Open /test.parquet for writing (append = false)
        std::shared_ptr<arrow::io::HdfsOutputStream> out_file;
        auto streamRes = fs->OpenWritable("/test.parquet", false, &out_file);
        if (!streamRes.ok()) {
            std::cerr << "Failed to open HDFS file, Error: " << streamRes.message() << std::endl;
            return -2;
        }

        // Build the parquet schema: name (optional UTF8 string), price (float), quantity (int32)
        parquet::WriterProperties::Builder builder;
        parquet::schema::NodeVector fields;

        fields.push_back(parquet::schema::PrimitiveNode::Make(
            "name", parquet::Repetition::OPTIONAL, parquet::Type::BYTE_ARRAY,
            parquet::ConvertedType::UTF8));

        fields.push_back(parquet::schema::PrimitiveNode::Make(
            "price", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
            parquet::ConvertedType::NONE, -1));

        fields.push_back(parquet::schema::PrimitiveNode::Make(
            "quantity", parquet::Repetition::REQUIRED, parquet::Type::INT32,
            parquet::ConvertedType::INT_32, -1));

        std::shared_ptr<parquet::schema::GroupNode> schema =
            std::static_pointer_cast<parquet::schema::GroupNode>(
                parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

        // Stream the rows out; parquet::EndRow completes each record
        parquet::StreamWriter os {parquet::ParquetFileWriter::Open(out_file, schema, builder.build())};

        for (const auto& a : get_articles()) {
            os << a.name << a.price << a.quantity << parquet::EndRow;
        }

        return 0;
    }
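As an aside, arrow::io::HadoopFileSystem as used above is the legacy interface; newer Arrow releases steer you toward arrow::fs::HadoopFileSystem, which returns arrow::Result values instead of taking output pointers. A rough sketch of the same connect-and-open step under that API (method names as in recent Arrow headers; worth checking against your installed version):

    #include <arrow/filesystem/hdfs.h>

    #include <iostream>
    #include <memory>

    int main() {
        // Same endpoint and user as the writer above
        arrow::fs::HdfsOptions options;
        options.ConfigureEndPoint("172.18.0.2", 0);
        options.ConfigureUser("root");

        auto maybe_fs = arrow::fs::HadoopFileSystem::Make(options);
        if (!maybe_fs.ok()) {
            std::cerr << "Failed to connect to HDFS, Error: "
                      << maybe_fs.status().message() << std::endl;
            return -1;
        }
        std::shared_ptr<arrow::fs::HadoopFileSystem> fs = *maybe_fs;

        // OpenOutputStream yields an arrow::io::OutputStream that can be
        // passed to parquet::ParquetFileWriter::Open just like out_file above
        auto maybe_out = fs->OpenOutputStream("/test.parquet");
        if (!maybe_out.ok()) {
            std::cerr << "Failed to open HDFS file, Error: "
                      << maybe_out.status().message() << std::endl;
            return -2;
        }
        return 0;
    }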

Reading the parquet file back from HDFS

    #include <arrow/io/hdfs.h>
    #include <parquet/stream_reader.h>

    #include <iostream>
    #include <string>
    #include <unordered_map>

    struct Article {
        std::string name;
        float price;
        int quantity;
    };

    int main(int argc, char* argv[]) {
        // Connect to HDFS with the same parameters the writer used
        std::shared_ptr<arrow::io::HadoopFileSystem> fs;
        std::unordered_map<std::string, std::string> extraConf;
        arrow::io::HdfsConnectionConfig connectCfg {"172.18.0.2", 0, "root", "", extraConf};

        auto connectRes = arrow::io::HadoopFileSystem::Connect(&connectCfg, &fs);
        if (!connectRes.ok()) {
            std::cerr << "Failed to connect to HDFS, Error: " << connectRes.message() << std::endl;
            return -1;
        }

        std::shared_ptr<arrow::io::HdfsReadableFile> infile;
        auto streamRes = fs->OpenReadable("/test.parquet", false, &infile);
        if (!streamRes.ok()) {
            std::cerr << "Failed to open HDFS file, Error: " << streamRes.message() << std::endl;
            return -2;
        }

        // Stream the rows back in until end of file
        parquet::StreamReader is {parquet::ParquetFileReader::Open(infile)};

        Article arti;
        while (!is.eof()) {
            is >> arti.name >> arti.price >> arti.quantity >> parquet::EndRow;
            std::cout << arti.name << " " << arti.price << " " << arti.quantity << std::endl;
        }

        return 0;
    }
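Since the writer prints nothing on success, it can be handy to confirm what actually landed in HDFS before streaming rows back. parquet::ParquetFileReader exposes the file metadata directly; a small helper along these lines (a sketch: pass it the infile handle opened above, which is valid because HdfsReadableFile derives from arrow::io::RandomAccessFile) reports the row and column counts:

    #include <arrow/io/interfaces.h>
    #include <parquet/file_reader.h>
    #include <parquet/metadata.h>

    #include <iostream>
    #include <memory>

    // Print basic stats for an already-opened parquet file,
    // e.g. the HdfsReadableFile `infile` from the reader above.
    void print_parquet_stats(std::shared_ptr<arrow::io::RandomAccessFile> file) {
        std::unique_ptr<parquet::ParquetFileReader> reader =
            parquet::ParquetFileReader::Open(file);
        std::shared_ptr<parquet::FileMetaData> md = reader->metadata();
        std::cout << "rows=" << md->num_rows()
                  << " columns=" << md->num_columns()
                  << " row groups=" << md->num_row_groups() << std::endl;
    }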
