安装一下HADOOP并配置一下LD_LIBRARY_PATH
export HADOOP_VERSION=2.10.1
export HADOOP_HOME=/opt/hadoop-$HADOOP_VERSION
# Add Hadoop Java libraries to your CLASSPATH, and
# add native libraries to LD_LIBRARY_PATH
export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_HOME/lib/native"
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/
复制
这几个库目前用不到,但是CMakeLists.txt里面会用到一个libhdfs.so。后面再说。完事以后就可以用Apache Arrow来读写HDFS的parquet文件了。代码如下,CMakeLists.txt
cmake_minimum_required(VERSION 2.6)
project(lexical_cast)
add_definitions(-std=c++14)
set( ENV{ARROW_LIBHDFS_DIR} opt/hadoop-2.10.1/lib/native )
include_directories("/usr/local/include" "/usr/include")
link_directories("/usr/local/lib" "/usr/lib/x86_64-linux-gnu" "/opt/hadoop-2.10.1/lib/native")
file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
foreach( sourcefile ${APP_SOURCES} )
file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
string(REPLACE ".cpp" "" file ${filename})
add_executable(${file} ${sourcefile})
target_link_libraries(${file} boost_filesystem boost_thread boost_system boost_serialization pthread boost_chrono arrow parquet hdfs)
endforeach( sourcefile ${APP_SOURCES} )
复制
注意到这里比读写本地parquet文件多了一个hdfs库,位于/opt/hadoop-2.10.1/lib/native目录,就是本地HDFS安装的目录,否则会出现找不到链接库文件错误。
写入HDFS parquet文件
#include <arrow/io/hdfs.h>
#include <parquet/stream_writer.h>
#include <iostream>
#include <string>
#include <vector>
#include <map>
struct Article {
std::string name;
float price;
int quantity;
};
std::vector<Article> get_articles() {
std::vector<Article> articles {
Article {"南昌好景色", 35.0f, 20},
Article {"武汉好风景", 24.0f, 30},
Article {"北京王府井", 50.0f, 10}
};
return std::move(articles);
}
int main(int argc, char* argv[]) {
std::shared_ptr<arrow::io::HadoopFileSystem> fs;
std::unordered_map<std::string, std::string> extraConf;
arrow::io::HdfsConnectionConfig connectCfg {"172.18.0.2", 0, "root", "", extraConf};
auto connectRes = arrow::io::HadoopFileSystem::Connect(&connectCfg , &fs);
if(!connectRes.ok()) {
std::cerr << "连接到HDFS失败, Error: " << connectRes.message() << std::endl;
return -1;
}
std::shared_ptr<arrow::io::HdfsOutputStream> out_file;
auto streamRes = fs->OpenWritable("/test.parquet", false, &out_file);
if(!streamRes.ok()) {
std::cerr << "连接到HDFS失败, Error: " << streamRes.message() << std::endl;
return -2;
}
parquet::WriterProperties::Builder builder;
parquet::schema::NodeVector fields;
fields.push_back(parquet::schema::PrimitiveNode::Make(
"name", parquet::Repetition::OPTIONAL, parquet::Type::BYTE_ARRAY,
parquet::ConvertedType::UTF8));
fields.push_back(parquet::schema::PrimitiveNode::Make(
"price", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
parquet::ConvertedType::NONE, -1));
fields.push_back(parquet::schema::PrimitiveNode::Make(
"quantity", parquet::Repetition::REQUIRED, parquet::Type::INT32,
parquet::ConvertedType::INT_32, -1));
std::shared_ptr<parquet::schema::GroupNode> schema = std::static_pointer_cast<parquet::schema::GroupNode>(
parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));
parquet::StreamWriter os {parquet::ParquetFileWriter::Open(out_file, schema, builder.build())};
for(const auto& a: get_articles()) {
os << a.name << a.price << a.quantity << parquet::EndRow;
}
return 0;
}
复制
读出HDFS parquet文件
#include <arrow/io/hdfs.h>
#include <parquet/stream_reader.h>
#include <iostream>
struct Article {
std::string name;
float price;
int quantity;
};
int main(int argc, char* argv[]) {
std::shared_ptr<arrow::io::HadoopFileSystem> fs;
std::unordered_map<std::string, std::string> extraConf;
arrow::io::HdfsConnectionConfig connectCfg {"172.18.0.2", 0, "root", "", extraConf};
auto connectRes = arrow::io::HadoopFileSystem::Connect(&connectCfg , &fs);
if(!connectRes.ok()) {
std::cerr << "连接到HDFS失败, Error: " << connectRes.message() << std::endl;
return -1;
}
std::shared_ptr<arrow::io::HdfsReadableFile> infile;
auto streamRes = fs->OpenReadable("/test.parquet", false, &infile);
if(!streamRes.ok()) {
std::cerr << "连接到HDFS失败, Error: " << streamRes.message() << std::endl;
return -2;
}
parquet::StreamReader is {parquet::ParquetFileReader::Open(infile)};
Article arti;
while(!is.eof()) {
is >> arti.name >> arti.price >> arti.quantity >> parquet::EndRow;
std::cout << arti.name << " " << arti.price << " " << arti.quantity << std::endl;
}
return 0;
}
复制
欢迎关注微信公众号肥叔菌PostgreSQL数据库专栏:
PostgreSQL数据库守护进程——Postmaster总体流程
PostgreSQL数据库守护进程——读取控制文件
PostgreSQL数据库守护进程——RemovePgTempFiles删除临时文件
PostgreSQL数据库守护进程——RemovePromoteSignalFiles
PostgreSQL数据库信号处理——kill backend
PostgreSQL数据库PMsignal——后端进程\Postmaster信号通信
PostgreSQL数据库后端进程——inter-process latch
PostgreSQL数据库后端进程——监视postmaster death
PostgreSQL数据库后台进程——一等公民
PostgreSQL数据库后台进程——后台一等公民进程保活
PostgreSQL数据库头胎——后台一等公民进程StartupDataBase启动
PostgreSQL数据库头胎——后台一等公民进程StartupDataBase信号通知
PostgreSQL数据库头胎——StarupXLOG函数恢复模式和目标
PostgreSQL数据库状态pmState——PM_STARTUP状态
PostgreSQL数据库复制——Setting Up Asynchronous Replication
PostgreSQL数据库复制——后台一等公民进程WalReceiver启动函数
PostgreSQL数据库复制——后台一等公民进程WalReceiver获知连接
PostgreSQL数据库复制——后台一等公民进程WalReceiver&startup交互
PostgreSQL数据库复制——后台一等公民进程WalReceiver ready_to_display
PostgreSQL数据库复制——后台一等公民进程WalReceiver提取信息
PostgreSQL数据库复制——后台一等公民进程WalReceiver收发逻辑
PostgreSQL数据库复制——后台一等公民进程WalReceiver pg_stat_wal_receiver视图
PostgreSQL数据库复制——walsender后端启动
PostgreSQL数据库守护进程——后台二等公民进程第一波启动maybe_start_bgworkers
PostgreSQL数据库参数——简述GUC
PostgreSQL数据库网络层——libpq连接参数
PostgreSQL数据库动态共享内存管理器——dynamic shared memory segment
PostgreSQL数据库WAL——资源管理器RMGR
PostgreSQL数据库WAL——备机回放checkpoint WAL
PostgreSQL数据库事务系统——phenomena
PostgreSQL数据库统计信息——analyze命令
PostgreSQL数据库统计信息——analyze大致流程
PostgreSQL数据库统计信息——analyze执行函数
PostgreSQL数据库统计信息——查找继承子表find_all_inheritors
PostgreSQL数据库统计信息——analyze流程对不同表的处理
PostgreSQL数据库统计信息——examine_attribute单列预分析
PostgreSQL数据库统计信息——acquire_sample_rows采样函数
PostgreSQL数据库统计信息——acquire_inherited_sample_rows采样函数
PostgreSQL数据库统计信息——计算统计数据
PostgreSQL数据库统计信息——compute_scalar_stats计算统计数
PostgreSQL数据库统计信息——analyze统计信息收集
PostgreSQL数据库统计信息——统计信息系统表
PostgreSQL守护进程(Postmaster)——辅助进程PgStat主流程
PostgreSQL守护进程(Postmaster)——辅助进程PgStat统计消息
PostgreSQL数据库查询监控技术——pg_stat_activity简介
PostgreSQL查询引擎——编译调试
PostgreSQL查询引擎——create table xxx(...)基础建表流程
PostgreSQL查询引擎——create table xxx(...)基础建表transformCreateStmt
PostgreSQL查询引擎——select * from where = transform流程
PostgreSQL数据库查询执行——T_VariableSetStmt
PostgreSQL数据库查询执行——T_TransactionStmt
PostgreSQL数据库查询执行——Parallel Query
PostgreSQL数据库查询执行——SeqScan节点执行
PostgreSQL数据库查询执行——Using GDB To Trace Into a Parallel Worker Spawned By Postmaster During a Large Query
PostgreSQL数据库查询执行——Parallel SeqScan节点执行
PostgreSQL数据库可插拔存储引擎——pg_am系统表
PostgreSQL数据库可插拔存储引擎——Table Access Manager
PostgreSQL数据库可插拔存储引擎——GetTableAmRoutine函数
PostgreSQL数据库可插拔存储引擎——Table scan callbacks
PostgreSQL数据库HeapAM——TupleTableSlot类型
PostgreSQL数据库HeapAM——HeapAM Scan
PostgreSQL数据库HeapAM——HeapAM Parallel table scan
PostgreSQL数据库HeapAM——synchronized scan machinery
PostgreSQL数据库缓冲区管理器——概述
PostgreSQL数据库缓冲区管理器——本地缓冲区管理
PostgreSQL数据库缓冲区管理器——Shared Buffer Pool初始化
PostgreSQL数据库存储介质管理器——SMGR
PostgreSQL数据库存储介质管理器——磁盘管理器
PostgreSQL数据库目录——目录操作封装
PostgreSQL虚拟文件描述符VFD机制——FD LRU池
PostgreSQL虚拟文件描述符VFD机制——FD LRU池其他函数
PostgreSQL数据库FDW——The Internals of PostgreSQL
PostgreSQL数据库FDW——WIP PostgreSQL Sharding
PostgreSQL数据库FDW——Parquet S3 Foreign Data Wrapper
PostgreSQL数据库FDW——Parquet S3 ParquetReader类
PostgreSQL数据库FDW——Parquet S3 ReaderCacheEntry
PostgreSQL数据库FDW——Parquet S3 ParallelCoordinator
PostgreSQL数据库FDW——Parquet S3 DefaultParquetReader类
PostgreSQL数据库FDW——Parquet S3 CachingParquetReader类
PostgreSQL数据库FDW——Parquet S3 ParquetS3FdwExecutionState类
PostgreSQL数据库FDW——Parquet S3 MultifileMergeExecutionStateBaseS3类
PostgreSQL数据库FDW——Parquet S3 读取parquet文件用例
PostgreSQL数据库使用——between and以及日期的使用
PostgreSQL数据库使用——iRedMail定时备份数据库脚本
PostgreSQL数据库使用——iRedMail初始化数据库脚本
PostgreSQL数据库使用——iRedMail创建用户脚本
PostgreSQL数据库插件——定时任务pg_cron
PostgreSQL数据库故障分析——invalid byte sequence for encoding
ETCD、Zookeeper和Consul 分布式数据库的魔法银弹
PostgreSQL数据库高可用——patroni介绍[翻译]
PostgreSQL数据库高可用——patroni配置[翻译]
PostgreSQL数据库高可用——patroni REST API[翻译]
PostgreSQL数据库高可用——将独立集群转换为Patroni集群[翻译]
PostgreSQL数据库高可用——patroni源码学习
PostgreSQL数据库高可用——patroni源码学习——abstract_main
PostgreSQL数据库高可用——patroni源码AbstractPatroniDaemon类
PostgreSQL数据库高可用——patroni源码Patroni子类简介
PostgreSQL数据库高可用——patroni源码PatroniLogger类
PostgreSQL数据库高可用——patroni RestApiServer
PostgreSQL数据库高可用——patroni源码DCS类
PostgreSQL数据库高可用——patroni源码AbstractEtcd类
PostgreSQL数据库高可用——patroni源码EtcdClient类
PostgreSQL数据库高可用——patroni源码Etcd
PostgreSQL数据库高可用——patroni源码学习——Ha类概述
PostgreSQL数据库高可用——Patroni AsyncExecutor类
PostgreSQL数据库高可用——Patroni PostmasterProcess类
PostgreSQL数据库备份恢复迁移——Barman Before you start[翻译]
PostgreSQL数据库备份恢复迁移——Barman Introduction[翻译]
Postgres-XL数据库GTM——概念
Postgres-XL数据库GTM——事务管理
Postgres-XL数据库GTM——GTM and Global Transaction Management[翻译]
Postgres-XL数据库GTM——Master & Standby启动流程
Postgres-XL数据库GTM——Master & Standby子线程
Postgres-XL数据库GTM——Node管理器
Greenplum数据库统计信息——analyze命令 Greenplum数据库统计信息——分布式采样 Greenplum数据库统计信息——auto-analyze特性 Greenplum数据库Hash分布——计算哈希值和映射segment Greenplum数据库Hash分布——GUC gp_use_legacy_hashops Greenplum数据库数据分片策略Hash分布——执行器行为 Greenplum数据库过滤投影——ExecScan执行逻辑 Greenplum数据库外部表——Scan执行节点 Greenplum数据库外部表——fileam封装 Greenplum数据库外部表——external_getnext获取元组 Greenplum数据库外部表——url_curl创建销毁 Greenplum数据库外部协议——Define EXTPROTOCOL Greenplum数据库外部协议——GPHDFS实现协议 Greenplum数据库外部协议——GPHDFS gphdfs_fopen HashData数据库外部表——GPHDFS实现简介 Greenplum数据库高可用——FTS进程 Greenplum数据库高可用——FTS进程ftsConnect Greenplum数据库高可用——FTS进程触发轮询 Greenplum数据库高可用——FTS进程ftsPoll\Send\Receive Greenplum数据库高可用——FTS Pull模型 Greenplum数据库高可用——FTS HandleFtsWalRepProbe函数 Greenplum数据库高可用——FTS HandleFtsWalRepSyncRepOff函数 Greenplum数据库高可用——FTS HandleFtsWalRepPromote函数 Greenplum数据库高可用——FTS processRetry函数 Greenplum数据库高可用——FTS processResponse函数 Greenplum数据库高可用——FTS updateConfiguration更新系统表 Greenplum Python专用库gppylib学习——logging模块 Greenplum Python专用库gppylib学习——GpArray
Greenplum Python专用库gppylib学习——base.py
Greenplum Python工具库gpload学习——gpload类
Greenplum数据库源码分析——Standby Master操作工具分析
Greenplum数据库故障分析——利用GDB调试多线程core文件
Greenplum数据库故障分析——semop(id=,num=11) failed:invalid argument
Greenplum数据库故障分析——能对数据库base文件夹进行软连接嘛
Greenplum数据库故障分析——UDP Packet Lost(packet reassembles failed)
Greenplum数据库故障分析——版本升级后gpstart -a为何返回失败
Greenplum数据库故障分析——can not listen port
HAWQ数据库技术解析——内部架构
HAWQ数据库技术解析(一)——HAWQ简介[转载]
Apache Arrow User Guide——使用Apache Arrow读写HDFS中的parquet文件
文章转载自肥叔菌,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
周边生态|PGRX for Cloudberry 开源,pgvector for Cloudberry 升级到 0.8.0
HashData
232次阅读
2025-04-11 15:35:07
内核探究|Apache Cloudberry™ PAX 行列混存方案技术解析
HashData
79次阅读
2025-04-16 10:33:54
内核探究|Apache Cloudberry 并行查询技术解析
HashData
75次阅读
2025-03-31 10:00:24
纯干货 | Dolphinscheduler Master模块源码剖析
海豚调度
38次阅读
2025-04-01 18:30:33
Apache SeaTunnel 同步 MySQL 到 Doris 的优化策略
SeaTunnel
32次阅读
2025-03-27 09:58:16
即将发布|Apache Doris MCP Server & Client v0.1.0 版本!
一臻数据
31次阅读
2025-04-01 18:30:04
Doris 数据库更新指南:高效与灵活的完美结合
数据极客圈
30次阅读
2025-03-28 15:02:42
第二期 | Apache DolphinScheduler社区答疑Star评选结果公示
海豚调度
25次阅读
2025-04-21 10:13:00
Apache Doris × AI 的5个应用场景(附完整案例)
一臻数据
25次阅读
2025-04-08 11:01:45
GSoC谷歌编程之夏2025招募中,DolphinScheduler需要你的提案!
海豚调度
25次阅读
2025-04-02 09:39:41