HBase学习笔记

发布于:2025-07-15 ⋅ 阅读:(18) ⋅ 点赞:(0)

 

package com.oneday.hbase.model;

import java.util.Map;
import java.util.HashMap;

/**
 * Mutable holder for one HBase row: the table name, the row key, a nested
 * {@code family -> (qualifier -> value)} map of string cell values, and a single
 * timestamp for the whole entity.
 */
public class HBaseEntity {

    private String rowKey;
    private String tableName;
    /** family -> (qualifier -> value); never {@code null}. */
    private Map<String, Map<String, String>> familyQualifierValue;
    private long timestamp;

    /**
     * Creates an entity with no columns and the timestamp set to the current
     * wall-clock time in milliseconds.
     */
    public HBaseEntity() {
        this.familyQualifierValue = new HashMap<>();
        this.timestamp = System.currentTimeMillis();
    }

    /**
     * Creates an empty entity bound to the given table and row key.
     *
     * @param tableName name of the table the row belongs to
     * @param rowKey    row key of the row
     */
    public HBaseEntity(String tableName, String rowKey) {
        this();
        this.tableName = tableName;
        this.rowKey = rowKey;
    }

    /**
     * Adds (or overwrites) a column value under the given family and qualifier.
     *
     * @param family    column family name
     * @param qualifier column qualifier name
     * @param value     value to store
     */
    public void addColumn(String family, String qualifier, String value) {
        familyQualifierValue.computeIfAbsent(family, k -> new HashMap<>()).put(qualifier, value);
    }

    /**
     * Looks up a column value.
     *
     * @param family    column family name
     * @param qualifier column qualifier name
     * @return the stored value, or {@code null} when the family or the qualifier is absent
     */
    public String getColumnValue(String family, String qualifier) {
        Map<String, String> qualifiers = familyQualifierValue.get(family);
        return qualifiers != null ? qualifiers.get(qualifier) : null;
    }

    // Getters and Setters
    public String getRowKey() { return rowKey; }
    public void setRowKey(String rowKey) { this.rowKey = rowKey; }

    public String getTableName() { return tableName; }
    public void setTableName(String tableName) { this.tableName = tableName; }

    /**
     * Returns the live (not copied) family/qualifier/value map backing this entity.
     */
    public Map<String, Map<String, String>> getFamilyQualifierValue() { return familyQualifierValue; }

    /**
     * Replaces the backing family/qualifier/value map.
     *
     * @param familyQualifierValue the new map; {@code null} is treated as "no columns" and
     *                             replaced by an empty map so that {@link #addColumn} and
     *                             {@link #getColumnValue} keep working afterwards
     */
    public void setFamilyQualifierValue(Map<String, Map<String, String>> familyQualifierValue) {
        this.familyQualifierValue = familyQualifierValue != null ? familyQualifierValue : new HashMap<>();
    }

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "HBaseEntity{" +
                "rowKey='" + rowKey + '\'' +
                ", tableName='" + tableName + '\'' +
                ", familyQualifierValue=" + familyQualifierValue +
                ", timestamp=" + timestamp +
                '}';
    }
}
package com.oneday.hbase.service;

import com.oneday.hbase.model.HBaseEntity;
import org.apache.hadoop.hbase.client.Result;

import java.util.List;
import java.util.Map;

/**
 * Service contract for basic HBase table administration and row-level
 * read/write operations expressed in terms of {@link HBaseEntity}.
 */
public interface HBaseService {

    /**
     * Creates a table.
     *
     * @param tableName      name of the table to create
     * @param columnFamilies names of the column families for the table
     */
    boolean createTable(String tableName, String... columnFamilies);

    /**
     * Deletes a table.
     *
     * @param tableName name of the table to delete
     */
    boolean deleteTable(String tableName);

    /**
     * Checks whether a table exists.
     *
     * @param tableName name of the table to check
     */
    boolean tableExists(String tableName);

    /**
     * Inserts or updates the data carried by one entity.
     *
     * @param entity the row to write
     */
    boolean put(HBaseEntity entity);

    /**
     * Inserts or updates data in batch.
     *
     * @param entities the rows to write
     */
    boolean putBatch(List<HBaseEntity> entities);

    /**
     * Gets the data of a row by its row key.
     *
     * @param tableName name of the table to read
     * @param rowKey    row key of the row
     */
    HBaseEntity get(String tableName, String rowKey);

    /**
     * Gets the data of a row by row key, restricted to one column family.
     *
     * @param tableName name of the table to read
     * @param rowKey    row key of the row
     * @param family    column family name
     */
    HBaseEntity get(String tableName, String rowKey, String family);

    /**
     * Gets a single column value by row key, column family and column qualifier.
     *
     * @param tableName name of the table to read
     * @param rowKey    row key of the row
     * @param family    column family name
     * @param qualifier column qualifier name
     */
    String get(String tableName, String rowKey, String family, String qualifier);

    /**
     * Scans the data of a table.
     *
     * @param tableName name of the table to scan
     */
    List<HBaseEntity> scan(String tableName);

    /**
     * Scans the data of a table within a row-key range.
     * Whether each bound is inclusive is determined by the implementation.
     *
     * @param tableName name of the table to scan
     * @param startRow  row key where the scan starts
     * @param stopRow   row key where the scan stops
     */
    List<HBaseEntity> scan(String tableName, String startRow, String stopRow);

    /**
     * Scans the rows whose row key starts with the given prefix.
     *
     * @param tableName name of the table to scan
     * @param prefix    row-key prefix
     */
    List<HBaseEntity> scanByPrefix(String tableName, String prefix);

    /**
     * Deletes the data of a row.
     *
     * @param tableName name of the table
     * @param rowKey    row key of the row to delete
     */
    boolean delete(String tableName, String rowKey);

    /**
     * Deletes the data of one column family of a row.
     *
     * @param tableName name of the table
     * @param rowKey    row key of the row
     * @param family    column family name
     */
    boolean delete(String tableName, String rowKey, String family);

    /**
     * Deletes the data of one column of a row.
     *
     * @param tableName name of the table
     * @param rowKey    row key of the row
     * @param family    column family name
     * @param qualifier column qualifier name
     */
    boolean delete(String tableName, String rowKey, String family, String qualifier);

    /**
     * Deletes rows in batch.
     *
     * @param tableName name of the table
     * @param rowKeys   row keys of the rows to delete
     */
    boolean deleteBatch(String tableName, List<String> rowKeys);

    /**
     * Gets the number of rows in a table.
     *
     * @param tableName name of the table
     */
    long count(String tableName);
}

package com.oneday.hbase.service.impl;

import com.oneday.hbase.exception.HBaseOperationException;
import com.oneday.hbase.model.HBaseEntity;
import com.oneday.hbase.service.HBaseService;
import com.oneday.hbase.util.HBaseConnectionManager;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * {@link HBaseService} implementation built on the HBase client API.
 *
 * <p>Each operation obtains its {@code Admin} or {@code Table} handle from the injected
 * {@link HBaseConnectionManager} and releases it with try-with-resources. Any
 * {@link IOException} is logged and rethrown as an {@link HBaseOperationException}
 * carrying the operation name, table and row key.
 */
@Service
public class HBaseServiceImpl implements HBaseService {

    private static final Logger logger = LoggerFactory.getLogger(HBaseServiceImpl.class);

    /** Scanner caching value applied to every scan issued by this class. */
    private static final int SCAN_CACHING = 1000;

    @Autowired
    private HBaseConnectionManager connectionManager;

    /**
     * Creates a table with the given column families (max 3 versions, no TTL expiry).
     *
     * @param tableName      name of the table to create
     * @param columnFamilies column family names; at least one is required
     * @return {@code true} if the table was created, {@code false} if it already existed
     * @throws IllegalArgumentException if no column family is given
     * @throws HBaseOperationException  if the HBase call fails
     */
    @Override
    public boolean createTable(String tableName, String... columnFamilies) {
        if (columnFamilies == null || columnFamilies.length == 0) {
            throw new IllegalArgumentException("至少需要一个列族");
        }

        try (Admin admin = connectionManager.getConnection().getAdmin()) {
            TableName table = TableName.valueOf(tableName);

            if (admin.tableExists(table)) {
                logger.warn("表 {} 已存在", tableName);
                return false;
            }

            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);

            for (String family : columnFamilies) {
                ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder
                        .newBuilder(Bytes.toBytes(family))
                        .setMaxVersions(3)
                        .setTimeToLive(Integer.MAX_VALUE)
                        .build();
                builder.setColumnFamily(familyDescriptor);
            }

            admin.createTable(builder.build());
            logger.info("成功创建表: {}", tableName);
            return true;

        } catch (IOException e) {
            logger.error("创建表失败: {}", tableName, e);
            throw new HBaseOperationException("CREATE_TABLE", tableName, null, "创建表失败", e);
        }
    }

    /**
     * Disables (if enabled) and deletes a table.
     *
     * @param tableName name of the table to delete
     * @return {@code true} if the table was deleted, {@code false} if it did not exist
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public boolean deleteTable(String tableName) {
        try (Admin admin = connectionManager.getConnection().getAdmin()) {
            TableName table = TableName.valueOf(tableName);

            if (!admin.tableExists(table)) {
                logger.warn("表 {} 不存在", tableName);
                return false;
            }

            // A table has to be disabled before it can be deleted.
            if (admin.isTableEnabled(table)) {
                admin.disableTable(table);
            }

            admin.deleteTable(table);
            logger.info("成功删除表: {}", tableName);
            return true;

        } catch (IOException e) {
            logger.error("删除表失败: {}", tableName, e);
            throw new HBaseOperationException("DELETE_TABLE", tableName, null, "删除表失败", e);
        }
    }

    /**
     * Checks whether a table exists.
     *
     * @param tableName name of the table to check
     * @return {@code true} if the table exists
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public boolean tableExists(String tableName) {
        try (Admin admin = connectionManager.getConnection().getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (IOException e) {
            logger.error("检查表是否存在失败: {}", tableName, e);
            throw new HBaseOperationException("TABLE_EXISTS", tableName, null, "检查表是否存在失败", e);
        }
    }

    /**
     * Writes all columns of one entity, using the entity timestamp for every cell.
     *
     * @param entity the row to write; it, its row key and its table name must be non-null
     * @return {@code true} when the write succeeded
     * @throws IllegalArgumentException if the entity, row key or table name is {@code null}
     * @throws HBaseOperationException  if the HBase call fails
     */
    @Override
    public boolean put(HBaseEntity entity) {
        requireWritable(entity);

        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(entity.getTableName()))) {
            table.put(buildPut(entity));
            logger.debug("成功插入数据: table={}, rowKey={}", entity.getTableName(), entity.getRowKey());
            return true;

        } catch (IOException e) {
            logger.error("插入数据失败: table={}, rowKey={}", entity.getTableName(), entity.getRowKey(), e);
            throw new HBaseOperationException("PUT", entity.getTableName(), entity.getRowKey(), "插入数据失败", e);
        }
    }

    /**
     * Writes many entities, issuing one batched put per distinct table name.
     *
     * <p>All entities are validated before the first write, so invalid input cannot
     * leave some tables written and others not.
     *
     * @param entities the rows to write; {@code null} or empty is a no-op
     * @return {@code true} when every batch succeeded (or there was nothing to write)
     * @throws IllegalArgumentException if any entity, row key or table name is {@code null}
     * @throws HBaseOperationException  if an HBase call fails
     */
    @Override
    public boolean putBatch(List<HBaseEntity> entities) {
        if (entities == null || entities.isEmpty()) {
            return true;
        }

        // Group by table name, validating every entity the same way put() does.
        Map<String, List<HBaseEntity>> tableGroups = new HashMap<>();
        for (HBaseEntity entity : entities) {
            requireWritable(entity);
            tableGroups.computeIfAbsent(entity.getTableName(), k -> new ArrayList<>()).add(entity);
        }

        try {
            for (Map.Entry<String, List<HBaseEntity>> entry : tableGroups.entrySet()) {
                String tableName = entry.getKey();
                List<HBaseEntity> tableEntities = entry.getValue();

                try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
                    List<Put> puts = new ArrayList<>(tableEntities.size());
                    for (HBaseEntity entity : tableEntities) {
                        puts.add(buildPut(entity));
                    }

                    table.put(puts);
                    logger.debug("成功批量插入数据: table={}, count={}", tableName, puts.size());
                }
            }
            return true;

        } catch (IOException e) {
            logger.error("批量插入数据失败", e);
            throw new HBaseOperationException("PUT_BATCH", null, null, "批量插入数据失败", e);
        }
    }

    /**
     * Reads a whole row.
     *
     * @param tableName name of the table to read
     * @param rowKey    row key of the row
     * @return the row as an entity, or {@code null} when the result is empty
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public HBaseEntity get(String tableName, String rowKey) {
        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = table.get(get);

            if (result.isEmpty()) {
                return null;
            }

            return convertResultToEntity(tableName, result);

        } catch (IOException e) {
            logger.error("获取数据失败: table={}, rowKey={}", tableName, rowKey, e);
            throw new HBaseOperationException("GET", tableName, rowKey, "获取数据失败", e);
        }
    }

    /**
     * Reads one column family of a row.
     *
     * @param tableName name of the table to read
     * @param rowKey    row key of the row
     * @param family    column family to fetch
     * @return the row (restricted to the family) as an entity, or {@code null} when the result is empty
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public HBaseEntity get(String tableName, String rowKey, String family) {
        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addFamily(Bytes.toBytes(family));
            Result result = table.get(get);

            if (result.isEmpty()) {
                return null;
            }

            return convertResultToEntity(tableName, result);

        } catch (IOException e) {
            logger.error("获取数据失败: table={}, rowKey={}, family={}", tableName, rowKey, family, e);
            throw new HBaseOperationException("GET", tableName, rowKey, "获取数据失败", e);
        }
    }

    /**
     * Reads a single column value of a row.
     *
     * @param tableName name of the table to read
     * @param rowKey    row key of the row
     * @param family    column family name
     * @param qualifier column qualifier name
     * @return the value decoded as a string, or {@code null} when the result is empty or has no such column
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public String get(String tableName, String rowKey, String family, String qualifier) {
        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            byte[] familyBytes = Bytes.toBytes(family);
            byte[] qualifierBytes = Bytes.toBytes(qualifier);

            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(familyBytes, qualifierBytes);
            Result result = table.get(get);

            if (result.isEmpty()) {
                return null;
            }

            byte[] value = result.getValue(familyBytes, qualifierBytes);
            return value != null ? Bytes.toString(value) : null;

        } catch (IOException e) {
            logger.error("获取数据失败: table={}, rowKey={}, family={}, qualifier={}",
                        tableName, rowKey, family, qualifier, e);
            throw new HBaseOperationException("GET", tableName, rowKey, "获取数据失败", e);
        }
    }

    /**
     * Scans a whole table and materialises every row in memory.
     *
     * @param tableName name of the table to scan
     * @return all rows as entities
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public List<HBaseEntity> scan(String tableName) {
        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            scan.setCaching(SCAN_CACHING);
            return collect(table, scan, tableName);

        } catch (IOException e) {
            logger.error("扫描表失败: table={}", tableName, e);
            throw new HBaseOperationException("SCAN", tableName, null, "扫描表失败", e);
        }
    }

    /**
     * Scans a row-key range of a table.
     *
     * @param tableName name of the table to scan
     * @param startRow  row key passed to {@code Scan.withStartRow}; {@code null} leaves the start open
     * @param stopRow   row key passed to {@code Scan.withStopRow}; {@code null} leaves the end open
     * @return the rows in the range as entities
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public List<HBaseEntity> scan(String tableName, String startRow, String stopRow) {
        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            // A null bound is treated as "unbounded" instead of failing inside Bytes.toBytes.
            if (startRow != null) {
                scan.withStartRow(Bytes.toBytes(startRow));
            }
            if (stopRow != null) {
                scan.withStopRow(Bytes.toBytes(stopRow));
            }
            scan.setCaching(SCAN_CACHING);
            return collect(table, scan, tableName);

        } catch (IOException e) {
            logger.error("扫描表失败: table={}, startRow={}, stopRow={}", tableName, startRow, stopRow, e);
            throw new HBaseOperationException("SCAN", tableName, null, "扫描表失败", e);
        }
    }

    /**
     * Scans the rows whose row key starts with the given prefix.
     *
     * @param tableName name of the table to scan
     * @param prefix    row-key prefix
     * @return the matching rows as entities
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public List<HBaseEntity> scanByPrefix(String tableName, String prefix) {
        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            byte[] prefixBytes = Bytes.toBytes(prefix);

            Scan scan = new Scan();
            // Start at the prefix itself: rows sorting before it can never match, so there is
            // no point in reading them from the beginning of the table just to filter them out.
            scan.withStartRow(prefixBytes);
            scan.setFilter(new PrefixFilter(prefixBytes));
            scan.setCaching(SCAN_CACHING);
            return collect(table, scan, tableName);

        } catch (IOException e) {
            logger.error("按前缀扫描表失败: table={}, prefix={}", tableName, prefix, e);
            throw new HBaseOperationException("SCAN_PREFIX", tableName, null, "按前缀扫描表失败", e);
        }
    }

    /**
     * Deletes a whole row.
     *
     * @param tableName name of the table
     * @param rowKey    row key of the row to delete
     * @return {@code true} when the delete call completed
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public boolean delete(String tableName, String rowKey) {
        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
            logger.debug("成功删除数据: table={}, rowKey={}", tableName, rowKey);
            return true;

        } catch (IOException e) {
            logger.error("删除数据失败: table={}, rowKey={}", tableName, rowKey, e);
            throw new HBaseOperationException("DELETE", tableName, rowKey, "删除数据失败", e);
        }
    }

    /**
     * Deletes one column family of a row.
     *
     * @param tableName name of the table
     * @param rowKey    row key of the row
     * @param family    column family to delete
     * @return {@code true} when the delete call completed
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public boolean delete(String tableName, String rowKey, String family) {
        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addFamily(Bytes.toBytes(family));
            table.delete(delete);
            logger.debug("成功删除列族数据: table={}, rowKey={}, family={}", tableName, rowKey, family);
            return true;

        } catch (IOException e) {
            logger.error("删除列族数据失败: table={}, rowKey={}, family={}", tableName, rowKey, family, e);
            throw new HBaseOperationException("DELETE", tableName, rowKey, "删除列族数据失败", e);
        }
    }

    /**
     * Deletes one column of a row, including all of its stored versions.
     *
     * @param tableName name of the table
     * @param rowKey    row key of the row
     * @param family    column family name
     * @param qualifier column qualifier name
     * @return {@code true} when the delete call completed
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public boolean delete(String tableName, String rowKey, String family, String qualifier) {
        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            // addColumns (plural) removes every version. addColumn would only remove the newest
            // one, and tables created by createTable keep up to 3 versions, so an older value
            // would become visible again right after the "delete".
            delete.addColumns(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            table.delete(delete);
            logger.debug("成功删除列数据: table={}, rowKey={}, family={}, qualifier={}",
                        tableName, rowKey, family, qualifier);
            return true;

        } catch (IOException e) {
            logger.error("删除列数据失败: table={}, rowKey={}, family={}, qualifier={}",
                        tableName, rowKey, family, qualifier, e);
            throw new HBaseOperationException("DELETE", tableName, rowKey, "删除列数据失败", e);
        }
    }

    /**
     * Deletes many rows of one table in a single batched call.
     *
     * @param tableName name of the table
     * @param rowKeys   row keys to delete; {@code null} or empty is a no-op
     * @return {@code true} when the batch completed (or there was nothing to delete)
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public boolean deleteBatch(String tableName, List<String> rowKeys) {
        if (rowKeys == null || rowKeys.isEmpty()) {
            return true;
        }

        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            List<Delete> deletes = new ArrayList<>(rowKeys.size());
            for (String rowKey : rowKeys) {
                deletes.add(new Delete(Bytes.toBytes(rowKey)));
            }

            table.delete(deletes);
            // Table.delete(List) removes the successfully applied deletes from the list it is
            // given, so the requested count has to come from rowKeys, not from deletes.
            logger.debug("成功批量删除数据: table={}, count={}", tableName, rowKeys.size());
            return true;

        } catch (IOException e) {
            logger.error("批量删除数据失败: table={}", tableName, e);
            throw new HBaseOperationException("DELETE_BATCH", tableName, null, "批量删除数据失败", e);
        }
    }

    /**
     * Counts the rows of a table with a client-side scan that only fetches the first
     * key of each row.
     *
     * @param tableName name of the table
     * @return number of rows returned by the scan
     * @throws HBaseOperationException if the HBase call fails
     */
    @Override
    public long count(String tableName) {
        try (Table table = connectionManager.getConnection().getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            scan.setFilter(new org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter());
            scan.setCaching(SCAN_CACHING);

            long count = 0;
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result ignored : scanner) {
                    count++;
                }
            }
            return count;

        } catch (IOException e) {
            logger.error("统计表行数失败: table={}", tableName, e);
            throw new HBaseOperationException("COUNT", tableName, null, "统计表行数失败", e);
        }
    }

    /** Rejects entities that cannot be written because entity, row key or table name is null. */
    private static void requireWritable(HBaseEntity entity) {
        if (entity == null || entity.getRowKey() == null || entity.getTableName() == null) {
            throw new IllegalArgumentException("实体、rowKey和tableName不能为空");
        }
    }

    /** Builds a Put holding every family/qualifier/value of the entity at the entity timestamp. */
    private static Put buildPut(HBaseEntity entity) {
        Put put = new Put(Bytes.toBytes(entity.getRowKey()));
        for (Map.Entry<String, Map<String, String>> familyEntry : entity.getFamilyQualifierValue().entrySet()) {
            byte[] family = Bytes.toBytes(familyEntry.getKey());
            for (Map.Entry<String, String> qualifierEntry : familyEntry.getValue().entrySet()) {
                put.addColumn(family, Bytes.toBytes(qualifierEntry.getKey()),
                            entity.getTimestamp(), Bytes.toBytes(qualifierEntry.getValue()));
            }
        }
        return put;
    }

    /** Runs the scan, converts every returned row to an entity and closes the scanner. */
    private List<HBaseEntity> collect(Table table, Scan scan, String tableName) throws IOException {
        try (ResultScanner scanner = table.getScanner(scan)) {
            List<HBaseEntity> entities = new ArrayList<>();
            for (Result result : scanner) {
                entities.add(convertResultToEntity(tableName, result));
            }
            return entities;
        }
    }

    /**
     * Converts an HBase {@code Result} into an {@link HBaseEntity}.
     *
     * <p>Family, qualifier and value bytes are decoded with {@code Bytes.toString}. The entity
     * timestamp is overwritten per cell, so it ends up as the timestamp of the last cell
     * iterated; a result without cells keeps the timestamp set by the entity constructor.
     */
    private HBaseEntity convertResultToEntity(String tableName, Result result) {
        HBaseEntity entity = new HBaseEntity(tableName, Bytes.toString(result.getRow()));

        // listCells() may return null for a result that carries no cells.
        List<Cell> cells = result.listCells();
        if (cells == null) {
            return entity;
        }

        for (Cell cell : cells) {
            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toString(CellUtil.cloneValue(cell));

            entity.addColumn(family, qualifier, value);
            entity.setTimestamp(cell.getTimestamp());
        }

        return entity;
    }
}
package com.oneday.hbase;

import com.oneday.hbase.model.HBaseEntity;
import com.oneday.hbase.service.HBaseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Arrays;
import java.util.List;

/**
 * Spring Boot entry point that, once the context is up, runs a small end-to-end
 * walkthrough of {@link HBaseService} against the {@code user_info} table and prints
 * each step's outcome to standard output.
 */
@SpringBootApplication
public class HBaseServiceApplication implements CommandLineRunner {

    @Autowired
    private HBaseService hbaseService;

    /** Boots the Spring application. */
    public static void main(String[] args) {
        SpringApplication.run(HBaseServiceApplication.class, args);
    }

    /** Invoked by Spring Boot after startup; runs the HBase walkthrough. */
    @Override
    public void run(String... args) throws Exception {
        testHBaseOperations();
    }

    /**
     * Executes, in order: create table, put one row, get the row, get one column,
     * scan the table, count rows, delete the row. Any exception is reported on
     * standard error together with its stack trace instead of being propagated.
     */
    private void testHBaseOperations() {
        final String table = "user_info";
        final String userRow = "user001";

        try {
            // Step 1: create the table with two column families.
            System.out.println("=== 创建表 ===");
            System.out.println("创建表结果: " + hbaseService.createTable(table, "basic", "contact"));

            // Step 2: write one sample row.
            System.out.println("\n=== 插入数据 ===");
            HBaseEntity sampleUser = buildSampleUser(table, userRow);
            System.out.println("插入数据结果: " + hbaseService.put(sampleUser));

            // Step 3: read the full row back.
            System.out.println("\n=== 查询数据 ===");
            System.out.println("查询结果: " + hbaseService.get(table, userRow));

            // Step 4: read a single column.
            System.out.println("\n=== 查询特定列 ===");
            System.out.println("用户姓名: " + hbaseService.get(table, userRow, "basic", "name"));

            // Step 5: full table scan.
            System.out.println("\n=== 扫描表 ===");
            List<HBaseEntity> everyRow = hbaseService.scan(table);
            System.out.println("表中所有数据: " + everyRow);

            // Step 6: row count.
            System.out.println("\n=== 统计行数 ===");
            System.out.println("表行数: " + hbaseService.count(table));

            // Step 7: remove the sample row.
            System.out.println("\n=== 删除数据 ===");
            System.out.println("删除数据结果: " + hbaseService.delete(table, userRow));

        } catch (Exception failure) {
            System.err.println("测试过程中发生错误: " + failure.getMessage());
            failure.printStackTrace();
        }
    }

    /** Builds the sample user row with its basic and contact columns. */
    private HBaseEntity buildSampleUser(String table, String rowKey) {
        HBaseEntity user = new HBaseEntity(table, rowKey);
        user.addColumn("basic", "name", "张三");
        user.addColumn("basic", "age", "25");
        user.addColumn("contact", "email", "zhangsan@example.com");
        user.addColumn("contact", "phone", "13800138000");
        return user;
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到