使用Spark RDD或DataFrame,有时需要在foreachPartition或foreachWith里面保存数据到本地或HDFS。
1. 直接保存数据
当然如果不需要在map里面保存数据,那么针对RDD可以有如下方式
val rdd = // target rdd
rdd.saveAsHadoopFile // add some parameters
针对DataFrame可以有如下方式保存数据
val df = // target dataframe
// 保存中间数据
df.registerTempTable("temp table name")
// 持久化数据
df.save // 使用save函数,指定模式等参数
df.saveAsParquetFile // depressed
df.saveAsTable // depressed
2. foreach里面保存数据
调用foreachXXX之后,里面的每条记录都是Iterator[YYY]形式的数据,是可迭代数据。
2.1. 保存到文件
保存到文件相对简单,可以直接使用上面的save保存,例如
def save2HDFS(sc: SparkContext, input: Iterator[Row]): Unit = {
val result = input.map(item => item.getString(0) + "," + item.getInt(1)).toSeq
val tmpRDD = sc.parallelize(result)
tmpRDD.saveAsObjectFile("//path") // 1
tmpRDD.saveAsTextFile("//path") // 2
tmpRDD.saveAsTextFile("",CompressClass) // 3 内容编码类,继承自org.apache.hadoop.io.compress.CompressionCodec
}
2.2. 保存到数据库
在foreachXXX里面,可以将数据保存到数据库,这里使用的方式为JDBC的方式。
def save2DB(input: Iterator[Row]): def save2DB(input: Iterator[Row]): Unit = {
var temp: Row = null
while (input.hasNext) {
temp = input.next // 将迭代数据保存为入库数据
}
var dbconn: Connection = null
var stmt: Statement = null
try {
dbconn = DriverManager.getConnection("", "", "")
stmt = dbconn.createStatement()
stmt.execute("truncate table TableName")
} catch {
case e: Exception => {
// println(">>>>>>>>>>>>清空表失败")
// e.printStackTrace()
}
} finally {
{ // close connection
if (stmt != null)
stmt.close()
if (dbconn != null)
dbconn.close()
}
{ // modify poiner to NULL
stmt = null
dbconn = null
}
}
}
3. DataFrame读入写出操作
DataFrame可以方便的将要各种数据源的数据,读入到内存中,也可以方便的将DF数据写为各种格式的数据。
3.1. 读入操作
sqlContext.read.jdbcsqlContext.read.jdbc// JDBC数据源
sqlContext.read.json// JSON数据源
sqlContext.read.parquet// Parquet数据源
3.2. 写出操作
val tarDF = val tarDF = // target dataframe
tarDF.write.jdbc// 写入JDBC数据库
tarDF.write.json// 写入JSON数据源
tarDF.write.parquet// 写入Parquet数据源
以上几种数据源,是Spark自身带有驱动程序的。其他文件格式,需要相应的驱动程序,或相应的安装包支持。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!