由于工作需要,基于目前公司集群存在较多的服务器且存在大量的内存,因此考虑直接将数据Load进内存进行数据处理,测试是否能够加快处理速度;鉴于以上目的,版主尝试使用Parquet的Java API读入Parquet文件。

  目前关于使用Java API访问HDFS的文章较多,但是没有相关的配置比较容易出错;同时Java API读写Parquet虽然文章较多,但多数为基于本地文件的读写实例。因此,有必要研究一下Java API读写HDFS上的Parquet文件。

开发环境

相关程序的运行,需要一个运行环境。本文的运行环境为:

  • Eclipse+Maven
  • CDH5.8.0
  • JDK1.8.0

需要的相关JAR

  • *Hadoop-Common、Hadoop-Client(Maven) *
  • parquet-avro-1.5.0-cdh5.8.0
  • parquet-format-2.1.0-cdh5.8.0.
  • parquet-hadoop-1.5.0-cdh5.8.0
  • parquet-column-1.5.0-cdh5.8.0
  • htrace-core4-4.0.1-incubating

相关代码(单节点)

public public static Configuration conf;
static {
	conf = new Configuration();
	conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
}

public static void main(String[] args) throws IllegalArgumentException,
		IOException {
	long begin = System.currentTimeMillis();
	
	if (args.length < 1) {// Input arguments
		System.out.println("Less params");
		return;
	}
	String date = args[0];// Input Date Arguments
	String hdfsPath = "hdfs://NameNodeURL:port/user/hive/default/ip24data_parquet_all/pt="
			+ date;
	HashMap map = new HashMap();
	for (int i = 0; i < 5; i++) {
		@SuppressWarnings("deprecation")
		AvroParquetReader reader = new AvroParquetReader(
				conf, new Path(hdfsPath + "//00000" + i + "_0"));
		GenericRecord record = null;
		while ((record = reader.read()) != null) {
			String key = record.get("vin").toString()
					+ record.get("data_date").toString();
			DATA cnt = new DATA(record.get("vin"), record.get("data_date"),
					record.get("latitude"), record.get("longitude"),
					record.get("work_model"));
			map.put(key, cnt);
		}
		reader.close();
	}
	
	System.out.println("Left APP: "+new Date().toString());
	System.out.println("Total TIme used : ms: "+(System.currentTimeMillis()-begin));
	System.out.println("Only Load Data: size: "+map.size());
}

需要注意的地方:

  1. htrace-core4-4.0.1-incubating.jar,可以在相应的环境中找到,或到Maven/Htrace的官网上去找到;Htrace管网上的一般为源码,需要自行编译。
  2. 关于HDFS识别问题:第一次测试,没有添加任何配置,结果报错:

NO FileSystem for scheme: hdfs,因此添加了fs.hdfs.impl的配置信息。关于HDFS其他的配置信息,可以根据hdfs的配置信息设置。

参考链接

  1. ParquetWrite Java Code Example
  2. ParquetReader Java Coder Example

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

Chromeheadless安装与使用 上一篇
Hadoop集群配置 下一篇