Based on our business requirements, data needs to be extracted from Kafka and inserted into HBase. Many related articles can be found online; here we introduce an open-source tool from GitHub.

  The previous chapter explained why SparkOnHbase was chosen as the prototype and adapted to our needs. This chapter presents the modified source code, which meets our business requirements while trying to avoid introducing other unnecessary problems. Execution efficiency will be optimized at a later stage.

Source Code

class HBaseContext(
  @transient sc:        SparkContext,
  @transient config:    Configuration,
  metas:                java.util.HashMap[String, java.util.HashMap[String, java.util.HashMap[String, ColumnInfo]]],
  val tmpHdfsConfgFile: String = null) extends Serializable with Logging {

  @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
  @transient var tmpHdfsConfiguration: Configuration = config
  @transient var appliedCredentials = false;
  @transient var metasLocal = metas
  @transient val job = new Job(config)
  TableMapReduceUtil.initCredentials(job)
  val broadcastedConf = sc.broadcast(new SerializableWritable(config))
  val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials()))
  val broadcastMetas = sc.broadcast(metas)

  if (tmpHdfsConfgFile != null && config != null) {
    val fs = FileSystem.newInstance(config)
    val tmpPath = new Path(tmpHdfsConfgFile)
    if (!fs.exists(tmpPath)) {
      val outputStream = fs.create(tmpPath)
      config.write(outputStream)
      outputStream.close();
    } else {
      logWarning("tmpHdfsConfgFile " + tmpHdfsConfgFile + " already exists!")
    }
  }

  def mapPartition[T, R: ClassTag](
    rdd: RDD[T],
    mp:  (Iterator[T], HConnection) => Iterator[R]): RDD[R] = {

    rdd.mapPartitions[R](it => hbaseMapPartition[T, R](
      broadcastedConf,
      it,
      mp), true)
  }

  def applyCreds[T](configBroadcast: Broadcast[SerializableWritable[Configuration]]) {

    credentials = SparkHadoopUtil.get.getCurrentUserCredentials()

    logInfo("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials);

    if (appliedCredentials == false && credentials != null) {
      appliedCredentials = true
      logCredInformation(credentials)

      @transient val ugi = UserGroupInformation.getCurrentUser();
      ugi.addCredentials(credentials)
      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
      ugi.addCredentials(credentialsConf.value.value)

    }
  }

  def logCredInformation[T](credentials2: Credentials) {
    logInfo("credentials:" + credentials2);
    for (a <- 0 until credentials2.getAllSecretKeys.size()) {
      logInfo("getAllSecretKeys:" + a + ":" + credentials2.getAllSecretKeys.get(a));
    }
    val it = credentials2.getAllTokens.iterator();
    while (it.hasNext) {
      logInfo("getAllTokens:" + it.next());
    }
  }

  def bulkMutation[T](rdd: RDD[T], fun: (T) => (DataEntity), autoFlush: Boolean) {
    
    rdd.foreachPartition(
      it => {
        hbaseForeachPartition[T](
          broadcastedConf, broadcastMetas,
          it,
          (iter, hConnection, metas) => {

            iter.foreach(item => {

              val entity = fun(item)
              val dbName = entity.dbName
              val tabName = entity.tabName
              if (metas.containsKey(dbName) && metas.get(dbName).containsKey(tabName)) {

                val htable = hConnection.getTable(entity.dbName + ":" + entity.tabName)
                htable.setAutoFlush(autoFlush, true)

                entity.`type` match {
                  case "INSERT" | "insert" => {
                    val insertPuts = Instance.insert(entity, metas)
                    if (null != insertPuts && insertPuts.size() > 0)
                      htable.batch(insertPuts)
                  }

                  case "UPDATE" | "update" => {
                    val updatePuts = Instance.update(entity, metas)
                    if (null != updatePuts && updatePuts.size() > 0)
                      htable.batch(updatePuts)
                  }

                  case "DELETE" | "delete" => {
                    val deleteDels = Instance.delete(entity)
                    if (null != deleteDels && deleteDels.size() > 0)
                      htable.batch(deleteDels)
                  }
                  
                  case all: Any => {
                    logInfo("其他操作:" + all)
                  }
                }
                
                htable.flushCommits()
                htable.close()
              }
            })
          })
      })
  }

  def hbaseRDD[U: ClassTag](tableName: String, scan: Scan, f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {

    var job: Job = new Job(getConf(broadcastedConf))

    TableMapReduceUtil.initCredentials(job)
    TableMapReduceUtil.initTableMapperJob(tableName, scan, classOf[IdentityTableMapper], null, null, job)

    sc.newAPIHadoopRDD(
      job.getConfiguration(),
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]).map(f)
  }

  def hbaseRDD(tableName: String, scans: Scan): RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {

    hbaseRDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])](
      tableName,
      scans,
      (r: (ImmutableBytesWritable, Result)) => {
        val it = r._2.list().iterator()
        val list = new ArrayList[(Array[Byte], Array[Byte], Array[Byte])]()

        while (it.hasNext()) {
          val kv = it.next()
          list.add((kv.getFamily(), kv.getQualifier(), kv.getValue()))
        }

        (r._1.copyBytes(), list)
      })
  }

  private def hbaseForeachPartition[T](
    configBroadcast: Broadcast[SerializableWritable[Configuration]],
    metasBroadcast:  Broadcast[HashMap[String, HashMap[String, HashMap[String, ColumnInfo]]]],
    it:              Iterator[T],
    fun:               (Iterator[T], HConnection, HashMap[String, HashMap[String, HashMap[String, ColumnInfo]]]) => Unit) = {
    
    val config = getConf(configBroadcast)
    val metas = getMetas(metasBroadcast)
    applyCreds(configBroadcast)
    val hConnection = HConnectionManager.createConnection(config)
    fun(it, hConnection, metas)
    hConnection.close()

  }

  /**
   * @desc get metas from the broadcast variable or the driver-side copy
   */
  private def getMetas(metasBroadcast: Broadcast[HashMap[String, HashMap[String, HashMap[String, ColumnInfo]]]]): HashMap[String, HashMap[String, HashMap[String, ColumnInfo]]] = {

    if (null != metasLocal) {
      return metasLocal
    } else {
      try {
        metasLocal = metasBroadcast.value
        metasLocal
      } catch {
        case ex: Exception => {
          logInfo("Unable to getConfig from broadcast")
        }
      }
    }
    metasLocal
  }

  private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): Configuration = {

    if (tmpHdfsConfiguration != null) {
      tmpHdfsConfiguration
    } else if (tmpHdfsConfgFile != null) {

      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)

      val inputStream = fs.open(new Path(tmpHdfsConfgFile))
      tmpHdfsConfiguration = new Configuration(false)
      tmpHdfsConfiguration.readFields(inputStream)
      inputStream.close()

      tmpHdfsConfiguration
    }

    if (tmpHdfsConfiguration == null) {
      try {
        tmpHdfsConfiguration = configBroadcast.value.value
        tmpHdfsConfiguration
      } catch {
        case ex: Exception => {
          println("Unable to getConfig from broadcast")
        }
      }
    }

    tmpHdfsConfiguration
  }

  private def hbaseMapPartition[K, U](
    configBroadcast: Broadcast[SerializableWritable[Configuration]],
    it:              Iterator[K],
    mp:              (Iterator[K], HConnection) => Iterator[U]): Iterator[U] = {

    val config = getConf(configBroadcast)
    applyCreds(configBroadcast)
    val hConnection = HConnectionManager.createConnection(config)
    val res = mp(it, hConnection)
    hConnection.close()
    res

  }

  private class GetMapPartition[T, U](
    tableName:     String,
    batchSize:     Integer,
    makeGet:       (T) => Get,
    convertResult: (Result) => U) extends Serializable {

    def run(iterator: Iterator[T], hConnection: HConnection): Iterator[U] = {
      val htable = hConnection.getTable(tableName)

      val gets = new ArrayList[Get]()
      var res = List[U]()

      while (iterator.hasNext) {
        gets.add(makeGet(iterator.next))

        if (gets.size() == batchSize) {
          var results = htable.get(gets)
          res = res ++ results.map(convertResult)
          gets.clear()
        }
      }

      if (gets.size() > 0) {
        val results = htable.get(gets)
        res = res ++ results.map(convertResult)
        gets.clear()
      }

      htable.close()
      res.iterator
    }
  }

  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]

}

  Following our requirements, the HBaseContext source code was refactored and unnecessary code was removed, which guarantees at the source level that the program fits our application scenario.
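For orientation, here is a minimal usage sketch of the refactored class on a plain RDD, outside of Spark Streaming. It assumes the same metas map and Instance.parse helper used by the streaming job below; the input path and application name are purely illustrative.

// Minimal usage sketch. `metas` (dbName -> tableName -> column -> ColumnInfo) is assumed to be
// loaded the same way as in the streaming job below; the input path is hypothetical.
val sc = new SparkContext(new SparkConf().setAppName("HBaseContextSketch"))
val hbaseConf = HBaseConfiguration.create()
hbaseConf.addResource("/etc/hbase/conf.cloudera.hbase/hbase-site.xml")

val hbaseContext = new HBaseContext(sc, hbaseConf, metas)

// Each line is parsed into a DataEntity and written to HBase as batched mutations;
// autoFlush = false mirrors the setting used by the streaming job below.
val lines = sc.textFile("/tmp/kafka-dump.txt")
hbaseContext.bulkMutation(lines, (line: String) => Instance.parse(line), false)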

Spark Streaming Code

   /** initialize ZK UTIL */
   @transient val zkUtil = new CuratorUtil()

   /** load initialization parameters */
   val offsetPath = PropertiesUtil.getProperty(ConstantUtil.ZOOKEEPER_SPARK_PATH)
   zkUtil.createZKNodePer(offsetPath, null)

   val topic = PropertiesUtil.getProperty(ConstantUtil.KAFKA_TOPIC_NAME)
   val recTime = Integer.parseInt(PropertiesUtil.getProperty(ConstantUtil.STREAMING_RECTCKE_TIME))
   val ZK_MYSQL_PATH = PropertiesUtil.getProperty(ConstantUtil.ZOOKEEPER_NAMESPACE_MYSQL_TABLES);
   val brokerList = PropertiesUtil.getProperty(ConstantUtil.KAFKA_BROKER_LIST);

   val kafkaParams = Map[String, String](
     "metadata.broker.list" -> brokerList,
     "zookeeper.connect" -> PropertiesUtil.getProperty(ConstantUtil.ZOOKEEPER_SERVER_LIST),
     "group.id" -> PropertiesUtil.getProperty(ConstantUtil.KAFKA_CONSUMER_GROUPID))

   /** initialize HBASE METAS for filter */
   @transient @volatile var metas: java.util.HashMap[String, java.util.HashMap[String, java.util.HashMap[String, ColumnInfo]]] = Instance.paserMetas(zkUtil, ZK_MYSQL_PATH)
   if (metas.size() < 1) {
     println("load hbase tablem metas failed!")
     return ;
   }

   /**  initialize Context */
   // configure
   @transient val sparkConf = new SparkConf()
     .set("spark.streaming.backpressure.enabled", PropertiesUtil.getProperty(ConstantUtil.STREAMING_BACK_ENABLED)) // 设置可以限制
     .set("spark.streaming.kafka.maxRatePerPartition", PropertiesUtil.getProperty(ConstantUtil.STREAMING_KAFKA_MAXRATE)) // 设置具体限制数量:records/SEC
     .set("spark.streaming.stopGracefullyOnShutdown", PropertiesUtil.getProperty(ConstantUtil.STREAMING_SHUTDOWN_GRACEFULLLY)) // 设置Gracefully stop
     .set("serializer.class", "kafka.serializer.StringEncoder")
   @transient val hbaseConf = HBaseConfiguration.create();
   hbaseConf.addResource("/etc/hbase/conf.cloudera.hbase/hbase-site.xml")
   hbaseConf.addResource("/etc/hbase/conf.cloudera.hbase/core-site.xml")
   @transient val sc = new SparkContext(sparkConf)
   val ssc = new StreamingContext(sc, Seconds(recTime));

   val fromOffsets = readOffsetData(zkUtil, offsetPath, topic, brokerList, 9092)
   val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))

   stream.foreachRDD(rdd => {

     val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges.map { offset => (offset.partition, offset.fromOffset) }
     writeOffsetData(zkUtil, offsetPath, offsets)

     val hbaseContext = new HBaseContext(sc, hbaseConf, metas)
     hbaseContext.bulkMutation(rdd.map(item => item._2), (KV: String) => {
       Instance.parse(KV)
     }, false)

   })

   /** add gracefully stop control */
   Runtime.getRuntime.addShutdownHook(new Thread {
     override def run(): Unit = {
       try {
         zkUtil.close()
       } catch {
         case e: Exception => {
         }
       }
       ssc.stop(true, true)
     }
   })

   /** spark streaming start and wait termination */
   ssc.start()
   ssc.awaitTermination()

 }

/**
  * @desc read data from Zookeeper
  */
 def readOffsetData(zkUtil: CuratorUtil, offsetPath: String, topic: String, brokerList: String, kafkaPort: Integer): Map[TopicAndPartition, Long] = {

   val orgData = zkUtil.readDataForPath(offsetPath)
   if (null == orgData) {
     val util = KafkaUtil.getInstance();
     util.init(brokerList, kafkaPort, topic);
     val offsets = util.getLeastOffsets
     val fromOffsets = for (i <- 0 to offsets.size() - 1)
       yield TopicAndPartition.apply(topic, i) -> offsets.get(i).toLong
     return fromOffsets.toMap
   }

   val data = JSON.parseFull(orgData).get.asInstanceOf[Map[String, String]]
   val fromOffsets = data.map(item => {
     TopicAndPartition.apply(topic, item._1.toInt) -> item._2.toLong
   })
   return fromOffsets

 }

 /**
  * @desc write offset data to Zookeeper
  */
 def writeOffsetData(zkUtil: CuratorUtil, offsetPath: String, data: Array[(Int, Long)]): Unit = {

   val map = data.toMap[Int, Long].map(item => {
     item._1.toString() -> item._2.toString()
   })
   zkUtil.setDataForPath(offsetPath, JSONObject(map).toString)

 }
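
To make the offset bookkeeping concrete: the node at offsetPath holds a flat JSON object mapping each partition id to an offset, with both serialized as strings. A round-trip sketch with made-up values, reusing the zkUtil, offsetPath, topic, and brokerList from the streaming job above:

 // Hypothetical partition/offset values, for illustration only.
 val offsets: Array[(Int, Long)] = Array((0, 10523L), (1, 9876L))
 writeOffsetData(zkUtil, offsetPath, offsets)
 // The ZooKeeper node now holds JSON along the lines of: {"0" : "10523", "1" : "9876"}

 val fromOffsets = readOffsetData(zkUtil, offsetPath, topic, brokerList, 9092)
 // fromOffsets == Map(TopicAndPartition(topic, 0) -> 10523L, TopicAndPartition(topic, 1) -> 9876L)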