由于工作需要,基于目前公司集群存在较多的服务器且存在大量的内存,因此考虑直接将数据Load进内存进行数据处理,测试是否能够加快处理速度;鉴于以上目的,版主尝试使用Parquet的Java API读入Parquet文件。
目前关于使用Java API访问HDFS的文章较多,但是没有相关的配置比较容易出错;同时Java API读写Parquet虽然文章较多,但多数为基于本地文件的读写实例。因此,有必要研究一下Java API读写HDFS上的Parquet文件。
开发环境
相关程序的运行,需要一个运行环境。本文的运行环境为:
- Eclipse+Maven
- CDH5.8.0
- JDK1.8.0
需要的相关JAR
- *Hadoop-Common、Hadoop-Client(Maven) *
- parquet-avro-1.5.0-cdh5.8.0
- parquet-format-2.1.0-cdh5.8.0.
- parquet-hadoop-1.5.0-cdh5.8.0
- parquet-column-1.5.0-cdh5.8.0
- htrace-core4-4.0.1-incubating
相关代码(单节点)
public public static Configuration conf;
static {
conf = new Configuration();
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
}
public static void main(String[] args) throws IllegalArgumentException,
IOException {
long begin = System.currentTimeMillis();
if (args.length < 1) {// Input arguments
System.out.println("Less params");
return;
}
String date = args[0];// Input Date Arguments
String hdfsPath = "hdfs://NameNodeURL:port/user/hive/default/ip24data_parquet_all/pt="
+ date;
HashMap map = new HashMap();
for (int i = 0; i < 5; i++) {
@SuppressWarnings("deprecation")
AvroParquetReader reader = new AvroParquetReader(
conf, new Path(hdfsPath + "//00000" + i + "_0"));
GenericRecord record = null;
while ((record = reader.read()) != null) {
String key = record.get("vin").toString()
+ record.get("data_date").toString();
DATA cnt = new DATA(record.get("vin"), record.get("data_date"),
record.get("latitude"), record.get("longitude"),
record.get("work_model"));
map.put(key, cnt);
}
reader.close();
}
System.out.println("Left APP: "+new Date().toString());
System.out.println("Total TIme used : ms: "+(System.currentTimeMillis()-begin));
System.out.println("Only Load Data: size: "+map.size());
}
需要注意的地方:
- htrace-core4-4.0.1-incubating.jar,可以在相应的环境中找到,或到Maven/Htrace的官网上去找到;Htrace管网上的一般为源码,需要自行编译。
- 关于HDFS识别问题:第一次测试,没有添加任何配置,结果报错:
NO FileSystem for scheme: hdfs,因此添加了fs.hdfs.impl的配置信息。关于HDFS其他的配置信息,可以根据hdfs的配置信息设置。
参考链接
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!