使用Spark RDD或DataFrame,有时需要在foreachPartition或foreachWith里面保存数据到本地或HDFS。

1. 直接保存数据

  当然如果不需要在map里面保存数据,那么针对RDD可以有如下方式

val rdd = // target rdd
rdd.saveAsHadoopFile // add some parameters

针对DataFrame可以有如下方式保存数据

val df = // target dataframe
// 保存中间数据
df.registerTempTable("temp table name")

// 持久化数据
df.save // 使用save函数,指定模式等参数
df.saveAsParquetFile	// depressed
df.saveAsTable	// depressed

2. foreach里面保存数据

  调用foreachXXX之后,里面的每条记录都是Iterator[YYY]形式的数据,是可迭代数据。

2.1. 保存到文件

保存到文件相对简单,可以直接使用上面的save保存,例如

def save2HDFS(sc: SparkContext, input: Iterator[Row]): Unit = {
	val result = input.map(item => item.getString(0) + "," + item.getInt(1)).toSeq
    val tmpRDD = sc.parallelize(result)
    tmpRDD.saveAsObjectFile("//path") // 1
    tmpRDD.saveAsTextFile("//path") // 2
    tmpRDD.saveAsTextFile("",CompressClass) // 3 内容编码类,继承自org.apache.hadoop.io.compress.CompressionCodec
  }

2.2. 保存到数据库

  在foreachXXX里面,可以将数据保存到数据库,这里使用的方式为JDBC的方式。

def save2DB(input: Iterator[Row]): def save2DB(input: Iterator[Row]): Unit = {

   var temp: Row = null
   while (input.hasNext) {
     temp = input.next // 将迭代数据保存为入库数据
   }

   var dbconn: Connection = null
   var stmt: Statement = null
   try {
     dbconn = DriverManager.getConnection("", "", "")
     stmt = dbconn.createStatement()
     stmt.execute("truncate table TableName")
   } catch {
     case e: Exception => {
       // println(">>>>>>>>>>>>清空表失败")
       // e.printStackTrace()
     }
   } finally {
     { // close connection
       if (stmt != null)
         stmt.close()
       if (dbconn != null)
         dbconn.close()
     }
     { // modify poiner to NULL
       stmt = null
       dbconn = null
     }
   }
 }

3. DataFrame读入写出操作

  DataFrame可以方便的将要各种数据源的数据,读入到内存中,也可以方便的将DF数据写为各种格式的数据。

3.1. 读入操作

sqlContext.read.jdbcsqlContext.read.jdbc// JDBC数据源
sqlContext.read.json// JSON数据源
sqlContext.read.parquet// Parquet数据源

3.2. 写出操作

val tarDF =  val tarDF =  // target dataframe 
tarDF.write.jdbc// 写入JDBC数据库
tarDF.write.json// 写入JSON数据源
tarDF.write.parquet// 写入Parquet数据源

  以上几种数据源,是Spark自身带有驱动程序的。其他文件格式,需要相应的驱动程序,或相应的安装包支持。