【Spark集成HBase】Spark读写HBase表

发布于:2025-05-23 ⋅ 阅读:(15) ⋅ 点赞:(0)

摘要

本文介绍如何使用 Spark 2.3.2 实现对 HBase 1.4.8 表的读写操作,通过 Scala 语言将 CSV 数据写入 HBase,并利用 Spark SQL 分析数据。代码示例涵盖数据批量写入、全表扫描、数据类型转换及结构化查询,适合大数据开发人员快速掌握 Spark 与 HBase 的集成方法。

一、实验环境准备

1. 技术版本

  • Spark:2.3.2
  • HBase:1.4.8
  • Scala:2.11
  • 开发工具:IntelliJ IDEA
  • 依赖管理:Maven

2. Maven 依赖配置

pom.xml 中添加以下依赖:

<dependencies>  
  <!-- Spark 核心与 SQL -->  
  <dependency>  
    <groupId>org.apache.spark</groupId>  
    <artifactId>spark-core_2.11</artifactId>  
    <version>2.3.2</version>  
  </dependency>  
  <dependency>  
    <groupId>org.apache.spark</groupId>  
    <artifactId>spark-sql_2.11</artifactId>  
    <version>2.3.2</version>  
  </dependency>  
  <!-- HBase 客户端与 MapReduce 支持 -->  
  <dependency>  
    <groupId>org.apache.hbase</groupId>  
    <artifactId>hbase-client</artifactId>  
    <version>1.4.8</version>  
  </dependency>  
  <dependency>  
    <groupId>org.apache.hbase</groupId>  
    <artifactId>hbase-common</artifactId>  
    <version>1.4.8</version>  
  </dependency>  
  <dependency>  
    <groupId>org.apache.hbase</groupId>  
    <artifactId>hbase-server</artifactId>  
    <version>1.4.8</version>  
  </dependency>  
  <dependency>  
    <groupId>org.apache.hbase</groupId>  
    <artifactId>hbase-mapreduce</artifactId>  
    <version>1.4.8</version>  
  </dependency>  
  <!-- Hadoop 客户端(与 HBase 兼容) -->  
  <dependency>  
    <groupId>org.apache.hadoop</groupId>  
    <artifactId>hadoop-client</artifactId>  
    <version>2.7.3</version>  
  </dependency>  
</dependencies>  

二、实验步骤

1. 数据准备

  • 文件路径D:\\JavaProjects\\SparkAllProjects\\data\\emp.csv
  • 数据格式(示例):
    7369,SMITH,CLERK,7902,1980-12-17,800,,20  
    7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30  
    
    字段含义:员工ID,姓名,职位,上级ID,入职日期,薪资,奖金,部门ID

2. HBase 表结构设计

  • 表名employee
  • 列族info
  • 列标识
    • ename:姓名
    • job:职位
    • mgr:上级ID
    • hiredate:入职日期
    • salary:薪资
    • comm:奖金
    • deptNo:部门ID

3. 代码实现

3.1 数据写入 HBase(writeDataToHBase 方法)

  • 核心逻辑
    1. 读取 CSV 文件为 DataFrame。
    2. 按分区遍历数据,批量创建 Put 对象。
    3. 通过 HBase 连接将数据写入表中,避免单条写入性能瓶颈。
  • 核心代码
      def writeDataToHBase(spark: SparkSession): Unit = {
    
        // 2.读取数据文件
        val empDF = spark.read.csv("file:///D:\\JavaProjects\\SparkAllProjects\\data\\emp.csv")
    
        // 3.按照DataFrame分区写入HBase表中
        empDF.foreachPartition(p => {
          // 3.1 配置HBase连接地址:初始化conf配置对象、配置zk连接地址及其端口
          val conf = HBaseConfiguration.create()
          conf.set("hbase.zookeeper.quorum", "s1,s2,s3")
          conf.set("hbase.zookeeper.property.clientPort", "2181")
    
          // 3.2 在每个HBase节点中创建HBase的链接对象
          val conn = ConnectionFactory.createConnection(conf)
          // 3.3 获取HBase目标表
          val table = conn.getTable(TableName.valueOf(TABLE_NAME))
    
          // 批量提交时,使用list列表存储put,当达到batchSize大小时提交一次
          val batchSize = 14 // emp.csv就14条记录,所以设置成14条提交一次
          var puts = List[Put]() // puts数组
    
          // 3.4 将dataframe中的每个分区数据写入table表中
          try {
            p.foreach(row => {
              // 3.5 获取每行中的各个列的数据
              val empNo = row.getString(0)
              val ename = row.getString(1)
              val job = row.getString(2)
              val mgr = row.getString(3)
              val hireDate = row.getString(4)
              val salary = row.getString(5)
              val comm = row.getString(6)
              val deptNo = row.getString(7)
    
              var mgrStr = "0"
              if (mgr != null) {
                mgrStr = mgr
              }
    
              var commStr = "0.0"
              if (comm != null) {
                commStr = comm
              }
    
              // 3.6 设置rowkey:按照empNo
              val rowKey = Bytes.toBytes(empNo)
    
              // 3.7 创建Put对象,设置列族中的列和字段
              val put = new Put(rowKey)
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("ename"), Bytes.toBytes(ename))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("job"), Bytes.toBytes(job))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("mgr"), Bytes.toBytes(mgrStr))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("hiredate"), Bytes.toBytes(hireDate))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("salary"), Bytes.toBytes(salary))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("comm"), Bytes.toBytes(commStr))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("deptNo"), Bytes.toBytes(deptNo))
    
              // 3.8 将该Put对象加入Table对象中
              puts = put :: puts
              if (puts.size >= batchSize) {
                // 转成Java中的ArrayList
                val javaLists = new util.ArrayList[Put](puts.size)
                puts.foreach(javaLists.add)
                // table调用put添加javaLists
                table.put(javaLists)
                // 添加完成后,清空
                puts = List[Put]()
              }
    
              // 处理剩余数据: 当不满足14条数据时,把剩余数据写入HBase表中
              if (puts.nonEmpty) {
                val javaLists = new util.ArrayList[Put](puts.size)
                puts.foreach(javaLists.add)
                table.put(javaLists)
              }
            })
          } finally {
            // 确保资源释放
            if (table != null) table.close()
            if (conn != null) conn.close()
          }
        })
      }
    

3.2 数据读取与分析(readHBaseData 方法

  • 核心逻辑
    1. 使用 ResultScanner 全表扫描 HBase 数据。
    2. 将二进制数据转换为样例类 Employee,自动推断 DataFrame 的 Schema。
    3. 通过 Spark SQL 执行聚合查询(如按部门统计薪资总和)。
  • 核心代码
    case class Employee(  
      employee_id: Int,  
      employee_name: String,  
      job_title: String,  
      manager_id: Int,  
      hire_date: String,  
      salary: Double,  
      bonus: Double,  
      department_id: Int  
    )  
    
    def readHBaseData(spark: SparkSession): DataFrame = {
        // 1. 配置 HBase 连接参数
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "s1,s2,s3")  // 替换为你的 ZK 地址
        conf.set("hbase.zookeeper.property.clientPort", "2181")
    
        // 2. 创建 HBase 连接和表对象
        val conn = ConnectionFactory.createConnection(conf)
        val table = conn.getTable(TableName.valueOf(TABLE_NAME))
        var scanner: ResultScanner = null
    
        try {
          // 3. 构造扫描器(Scan)并配置
          val scan = new Scan()
            .addFamily(Bytes.toBytes(CF_NAME))  // 读取指定列族下的所有列
            .setCaching(500)  // 提升批量读取性能
            .setCacheBlocks(false)
    
          // 4. 获取扫描结果迭代器
          scanner = table.getScanner(scan)
    
          // 5. 遍历结果并转换为 Employee 对象
          val employees = ListBuffer[Employee]()
          val it = scanner.iterator()
          while (it.hasNext) {
            val result: Result = it.next()
    
            // 提取行键(假设 rowkey 是 employee_id 的字符串形式)
            val rowKeyStr = Bytes.toString(result.getRow)
            val employeeId = rowKeyStr.toInt  // 转换为 Int(需确保 rowkey 是数字)
    
            // 提取各列数据(根据 HBase 实际存储的列名调整)
            val name = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("ename")))
            val job = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("job")))
            val mgrStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("mgr")))
            val hireDate = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("hiredate")))
            val salaryStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("salary")))
            val commStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("comm")))
            val deptNoStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("deptNo")))
    
            // 处理可能的空值或转换异常(示例:默认值为 0)
            val managerId = if (mgrStr != null && mgrStr.nonEmpty) mgrStr.toInt else 0
            val salary = if (salaryStr != null && salaryStr.nonEmpty) salaryStr.toDouble else 0
            val bonus = if (commStr != null && commStr.nonEmpty) commStr.toDouble else 0
            val departmentId = if (deptNoStr != null && deptNoStr.nonEmpty) deptNoStr.toInt else 0
    
            // 创建 Employee 对象并添加到列表
            employees += Employee(
              employee_id = employeeId,
              employee_name = name,
              job_title = job,
              manager_id = managerId,
              hire_date = hireDate,
              salary = salary,
              bonus = bonus,
              department_id = departmentId
            )
          }
    
          // 6. 将 Employee 列表转换为 DataFrame(自动推断 Schema)
          import spark.implicits._
          spark.createDataFrame(employees.toList)
    
        } catch {
          case e: Exception =>
            println(s"读取 HBase 数据失败: ${e.getMessage}")
            throw e  // 抛异常终止流程
        } finally {
          // 7. 释放所有资源(关键!避免连接泄漏)
          if (scanner != null) scanner.close()
          if (table != null) table.close()
          if (conn != null) conn.close()
        }
      }
    

3.3 Spark SQL 分析

  • 读取数据后,通过 Spark SQL 执行聚合查询:
    val df = readHBaseData(spark)  
    df.createOrReplaceTempView("emp")  
    spark.sql("""  
      SELECT department_id,  
        SUM(salary + bonus) AS total  
      FROM emp  
      GROUP BY department_id  
      ORDER BY total DESC  
    """).show(false)  
    
  • 其他分析查询请自行操作。

3.4 完整代码

  • 在IDEA中创建名为WriteAndReadDataToHBase单例对象
  • 添加如下完成代码:
    package com.lpssfxy.spark.datasource
    
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Result, ResultScanner, Scan}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    import java.util
    import scala.collection.mutable.ListBuffer
    
    case class Employee(
                         employee_id: Int,       // 对应 HBase rowkey(假设 rowkey 是员工ID)
                         employee_name: String,  // 对应列族 info:ename
                         job_title: String,      // 对应列族 info:job
                         manager_id: Int,        // 对应列族 info:mgr
                         hire_date: String,      // 对应列族 info:hiredate
                         salary: Double,            // 对应列族 info:salary
                         bonus: Double,             // 对应列族 info:comm
                         department_id: Int      // 对应列族 info:deptNo
                       )
    
    /**
     * spark读写HBase表employee
     */
    object WriteAndReadDataToHBase {
    
      // 定义表名称及其列族名称
      private val TABLE_NAME = "employee"
      private val CF_NAME = "info"
    
      def main(args: Array[String]): Unit = {
        // 1. 准备环境:SparkSession初始化
        val spark = SparkSession
          .builder()
          .appName("WriteDataToHBase")
          .master("local[*]")
          .getOrCreate()
    
        // 2.调用writeDataToHBase方法将文件写入HBase中
        //writeDataToHBase(spark)
    
        // 3. 读HBase数据
        val df = readHBaseData(spark)
        //df.show(false)
        df.createOrReplaceTempView("emp")
        spark.sql("select department_id,sum(salary+bonus) as total from emp group by department_id order by total desc").show(false)
    
    
        // 4.停止SparkSession对象,释放资源
        spark.stop()
      }
    
      /**
       * 写数据到HBase表中
       * @param spark
       */
      def writeDataToHBase(spark: SparkSession): Unit = {
    
        // 2.读取数据文件
        val empDF = spark.read.csv("file:///D:\\JavaProjects\\SparkAllProjects\\data\\emp.csv")
    
        // 3.按照DataFrame分区写入HBase表中
        empDF.foreachPartition(p => {
          // 3.1 配置HBase连接地址:初始化conf配置对象、配置zk连接地址及其端口
          val conf = HBaseConfiguration.create()
          conf.set("hbase.zookeeper.quorum", "s1,s2,s3")
          conf.set("hbase.zookeeper.property.clientPort", "2181")
    
          // 3.2 在每个HBase节点中创建HBase的链接对象
          val conn = ConnectionFactory.createConnection(conf)
          // 3.3 获取HBase目标表
          val table = conn.getTable(TableName.valueOf(TABLE_NAME))
    
          // 批量提交时,使用list列表存储put,当达到batchSize大小时提交一次
          val batchSize = 14 // emp.csv就14条记录,所以设置成14条提交一次
          var puts = List[Put]() // puts数组
    
          // 3.4 将dataframe中的每个分区数据写入table表中
          try {
            p.foreach(row => {
              // 3.5 获取每行中的各个列的数据
              val empNo = row.getString(0)
              val ename = row.getString(1)
              val job = row.getString(2)
              val mgr = row.getString(3)
              val hireDate = row.getString(4)
              val salary = row.getString(5)
              val comm = row.getString(6)
              val deptNo = row.getString(7)
    
              var mgrStr = "0"
              if (mgr != null) {
                mgrStr = mgr
              }
    
              var commStr = "0.0"
              if (comm != null) {
                commStr = comm
              }
    
              // 3.6 设置rowkey:按照empNo
              val rowKey = Bytes.toBytes(empNo)
    
              // 3.7 创建Put对象,设置列族中的列和字段
              val put = new Put(rowKey)
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("ename"), Bytes.toBytes(ename))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("job"), Bytes.toBytes(job))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("mgr"), Bytes.toBytes(mgrStr))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("hiredate"), Bytes.toBytes(hireDate))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("salary"), Bytes.toBytes(salary))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("comm"), Bytes.toBytes(commStr))
              put.addColumn(Bytes.toBytes(CF_NAME), Bytes.toBytes("deptNo"), Bytes.toBytes(deptNo))
    
              // 3.8 将该Put对象加入Table对象中
              puts = put :: puts
              if (puts.size >= batchSize) {
                // 转成Java中的ArrayList
                val javaLists = new util.ArrayList[Put](puts.size)
                puts.foreach(javaLists.add)
                // table调用put添加javaLists
                table.put(javaLists)
                // 添加完成后,清空
                puts = List[Put]()
              }
    
              // 处理剩余数据: 当不满足14条数据时,把剩余数据写入HBase表中
              if (puts.nonEmpty) {
                val javaLists = new util.ArrayList[Put](puts.size)
                puts.foreach(javaLists.add)
                table.put(javaLists)
              }
            })
          } finally {
            // 确保资源释放
            if (table != null) table.close()
            if (conn != null) conn.close()
          }
        })
      }
    
      /**
       * 读取HBase表数据
       * @param spark
       * @return
       */
      def readHBaseData(spark: SparkSession): DataFrame = {
        // 1. 配置 HBase 连接参数
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "s1,s2,s3")  // 替换为你的 ZK 地址
        conf.set("hbase.zookeeper.property.clientPort", "2181")
    
        // 2. 创建 HBase 连接和表对象
        val conn = ConnectionFactory.createConnection(conf)
        val table = conn.getTable(TableName.valueOf(TABLE_NAME))
        var scanner: ResultScanner = null
    
        try {
          // 3. 构造扫描器(Scan)并配置
          val scan = new Scan()
            .addFamily(Bytes.toBytes(CF_NAME))  // 读取指定列族下的所有列
            .setCaching(500)  // 提升批量读取性能
            .setCacheBlocks(false)
    
          // 4. 获取扫描结果迭代器
          scanner = table.getScanner(scan)
    
          // 5. 遍历结果并转换为 Employee 对象
          val employees = ListBuffer[Employee]()
          val it = scanner.iterator()
          while (it.hasNext) {
            val result: Result = it.next()
    
            // 提取行键(假设 rowkey 是 employee_id 的字符串形式)
            val rowKeyStr = Bytes.toString(result.getRow)
            val employeeId = rowKeyStr.toInt  // 转换为 Int(需确保 rowkey 是数字)
    
            // 提取各列数据(根据 HBase 实际存储的列名调整)
            val name = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("ename")))
            val job = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("job")))
            val mgrStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("mgr")))
            val hireDate = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("hiredate")))
            val salaryStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("salary")))
            val commStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("comm")))
            val deptNoStr = Bytes.toString(result.getValue(Bytes.toBytes(CF_NAME), Bytes.toBytes("deptNo")))
    
            // 处理可能的空值或转换异常(示例:默认值为 0)
            val managerId = if (mgrStr != null && mgrStr.nonEmpty) mgrStr.toInt else 0
            val salary = if (salaryStr != null && salaryStr.nonEmpty) salaryStr.toDouble else 0
            val bonus = if (commStr != null && commStr.nonEmpty) commStr.toDouble else 0
            val departmentId = if (deptNoStr != null && deptNoStr.nonEmpty) deptNoStr.toInt else 0
    
            // 创建 Employee 对象并添加到列表
            employees += Employee(
              employee_id = employeeId,
              employee_name = name,
              job_title = job,
              manager_id = managerId,
              hire_date = hireDate,
              salary = salary,
              bonus = bonus,
              department_id = departmentId
            )
          }
    
          // 6. 将 Employee 列表转换为 DataFrame(自动推断 Schema)
          import spark.implicits._
          spark.createDataFrame(employees.toList)
    
        } catch {
          case e: Exception =>
            println(s"读取 HBase 数据失败: ${e.getMessage}")
            throw e  // 抛异常终止流程
        } finally {
          // 7. 释放所有资源(关键!避免连接泄漏)
          if (scanner != null) scanner.close()
          if (table != null) table.close()
          if (conn != null) conn.close()
        }
      }
    
    }
    
    

三、实验结果

1. 数据写入验证

  • 控制台输出无异常日志,HBase 表 employee 中生成对应 rowkey 的记录。
  • 通过 HBase Shell 命令 scan 'employee' 可查看数据:
    hbase> scan 'employee', {LIMIT => 2}  
    ROW       COLUMN+CELL                                              
    7369      column=info:ename, timestamp=... value=SMITH               
    7369      column=info:job, timestamp=... value=CLERK               
    # ...(其他列省略)  
    

2. 数据读取与分析结果

  • 原始数据展示

    +-----------+-------------+-----------+-----------+----------+------+-----+-------------+  
    |employee_id|employee_name|job_title  |manager_id |hire_date |salary|bonus|department_id|  
    +-----------+-------------+-----------+-----------+----------+------+-----+-------------+  
    |7369       |SMITH        |CLERK      |7902       |1980-12-17|800.0 |0.0  |20           |  
    |7499       |ALLEN        |SALESMAN   |7698       |1981-02-20|1600.0|300.0|30           |  
    # ...(其他行省略)  
    
  • Spark SQL 聚合结果

    +-------------+------------+  
    |department_id|total       |  
    +-------------+------------+  
    |30           |20150.0     |  
    |20           |10875.0     |  
    |10           |8750.0      |  
    +-------------+------------+  
    

四、常见问题与优化

1. 空值处理

  • HBase 列值为 null 时,result.getValue 返回 null,需通过 Option 或判空逻辑处理:
    val commStr = Option(result.getValue(...)).map(Bytes.toString).getOrElse("0.0")  
    

2. 性能优化

  • 批量写入:调整 batchSize(建议 500-2000),减少 RPC 调用次数。
  • 扫描缓存:通过 scan.setCaching(500) 提升全表扫描效率。
  • 数据类型:避免过度使用字符串类型,对数值字段直接存储二进制数据(如 Bytes.toDouble)。

3. 集群配置(可选做)

  • spark-submit 中添加 HBase 配置文件(如 hbase-site.xml),确保 Executor 节点访问 ZooKeeper:
    spark-submit --files hbase-site.xml ...  
    

五、总结

本文通过实际案例演示了 Spark 与 HBase 的集成流程,实现了从 CSV 数据写入 HBase 到结构化数据分析的完整链路。核心要点包括:

  1. 使用 foreachPartition 实现批量写入,避免单条操作性能损耗。
  2. 通过 ResultScanner 和样例类映射,简化 HBase 数据到 DataFrame 的转换。
  3. 利用 Spark SQL 对 HBase 数据进行高效分析,发挥分布式计算优势。

该实验适用于海量结构化数据的存储与分析场景,可进一步扩展至实时数据处理或机器学习模型训练。