【Hadoop】HBase Java API 案例

发布于:2025-05-09 ⋅ 阅读:(19) ⋅ 点赞:(0)

 代码实现:

HBaseConnection.java

package com.peizheng.bigdata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

/**
 * Holds one shared HBase {@link Connection} for the whole process.
 *
 * <p>The connection is created once, when this class is initialized, against the
 * ZooKeeper quorum {@code master,slave1,slave2} on client port {@code 2181}.
 * Call {@link #closeConnection()} when the application is done with HBase.
 */
public class HBaseConnection {
    /**
     * The shared connection. Kept public and non-final for backward compatibility
     * with existing callers that read or assign this field directly.
     */
    public static Connection connection = null;

    static {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            // Fail fast with the real cause instead of leaving the field null and
            // letting every later caller hit an unrelated NullPointerException.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Closes the shared connection if one exists.
     *
     * <p>Closing is best-effort: an {@link IOException} raised by {@code close()} is
     * reported on standard error and not rethrown, so shutdown paths are not
     * interrupted. Does nothing when no connection is set.
     */
    public static void closeConnection() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            System.err.println("Failed to close the HBase connection: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

HBaseOperation.java

package com.peizheng.bigdata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

/**
 * Small collection of HBase client examples: namespace and table creation,
 * single-cell writes, point reads, range scans and a filtered scan.
 *
 * <p>Every method uses the shared connection exposed by
 * {@code HBaseConnection.connection} and releases the {@link Admin}, {@link Table}
 * and {@link ResultScanner} handles it opens, even when an operation fails.
 */
public class HBaseOperation {

    /**
     * Creates a namespace.
     *
     * @param namespace name of the namespace to create
     * @throws IOException if the request to HBase fails
     */
    public static void createNameSpace(String namespace) throws IOException {
        // The Admin handle is used for DDL-style operations (Table is used for data).
        try (Admin admin = HBaseConnection.connection.getAdmin()) {
            // Custom properties could be attached through the builder
            // (NamespaceDescriptor.Builder#addConfiguration) before build().
            NamespaceDescriptor descriptor = NamespaceDescriptor.create(namespace).build();
            admin.createNamespace(descriptor);
        }
    }

    /**
     * Creates a table whose column families each keep up to 5 versions per cell.
     *
     * @param name table name
     * @param cols column family names to create
     * @throws IOException if the request to HBase fails
     */
    public static void createTable(String name, String[] cols) throws IOException {
        try (Admin admin = HBaseConnection.connection.getAdmin()) {
            // Use the TableName overload; the String constructor is deprecated.
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(name));

            for (String col : cols) {
                HColumnDescriptor familyDescriptor = new HColumnDescriptor(col);
                familyDescriptor.setMaxVersions(5);
                tableDescriptor.addFamily(familyDescriptor);
            }

            admin.createTable(tableDescriptor);
        }
    }

    /**
     * Writes one cell.
     *
     * @param tableName    table to write to
     * @param rowKey       row key
     * @param columnFamily column family of the cell
     * @param columnName   column qualifier of the cell
     * @param value        value to store
     * @throws IOException if the write fails
     */
    public static void putCell(String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {
        try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));
            table.put(put);
        }
    }

    /**
     * Reads a whole row and prints each cell on its own line as
     * {@code family:qualifier,value}. Prints nothing when the row has no cells.
     *
     * @param tableName table to read from
     * @param rowKey    row key to fetch
     * @throws IOException if the read fails
     */
    public static void getRow(String tableName, String rowKey) throws IOException {
        try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            printCellsOnePerLine(table.get(get));
        }
    }

    /**
     * Reads a single column of a row and prints each returned cell on its own line
     * as {@code family:qualifier,value}. Prints nothing when no cell is found.
     *
     * @param tableName  table to read from
     * @param rowKey     row key to fetch
     * @param familyName column family of the requested column
     * @param columnName column qualifier of the requested column
     * @throws IOException if the read fails
     */
    public static void getCell(String tableName, String rowKey, String familyName, String columnName) throws IOException {
        try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            printCellsOnePerLine(table.get(get));
        }
    }

    /**
     * Scans a row-key range and prints one output line per row, with the row's
     * cells tab-separated as {@code family:qualifier,value}.
     *
     * @param tableName   table to scan
     * @param startRowKey first row key of the range (inclusive)
     * @param endRowKey   end of the range (exclusive by default)
     * @throws IOException if the scan fails
     */
    public static void scanRows(String tableName, String startRowKey, String endRowKey) throws IOException {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(startRowKey));
        scan.setStopRow(Bytes.toBytes(endRowKey));

        // The scanner holds server-side resources, so it is closed together with the table.
        try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(scan)) {
            printRowsTabSeparated(scanner);
        }
    }

    /**
     * Scans a row-key range, keeping rows whose given column compares
     * {@code LESS_OR_EQUAL} to {@code val}, and prints them like {@link #scanRows}.
     *
     * <p>The comparison is done on the stored bytes, not on numeric values.
     * NOTE(review): with the filter's default settings, rows that do not contain the
     * column are presumably still returned; call {@code setFilterIfMissing(true)} on
     * the filter if they should be dropped — confirm against the HBase version in use.
     *
     * @param tableName   table to scan
     * @param startRowKey first row key of the range (inclusive)
     * @param endRowKey   end of the range (exclusive by default)
     * @param familyName  column family of the column to test
     * @param columnName  column qualifier of the column to test
     * @param val         upper bound for the column value
     * @throws IOException if the scan fails
     */
    public static void filterScan(String tableName, String startRowKey, String endRowKey, String familyName, String columnName, String val) throws IOException {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(startRowKey));
        scan.setStopRow(Bytes.toBytes(endRowKey));

        SingleColumnValueFilter valueFilter = new SingleColumnValueFilter(
                Bytes.toBytes(familyName),
                Bytes.toBytes(columnName),
                CompareFilter.CompareOp.LESS_OR_EQUAL,
                Bytes.toBytes(val)
        );

        FilterList filterList = new FilterList();
        filterList.addFilter(valueFilter);
        scan.setFilter(filterList);

        // Close the table only after the whole scan has been consumed. Closing it
        // inside the result loop would end the scan after the first row and would
        // never close it when the scan returns no rows.
        try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(scan)) {
            printRowsTabSeparated(scanner);
        }
    }

    /**
     * Loads comma-separated {@code id,year,temperature} lines into the
     * {@code temperature} table (column family {@code cf}), creating the table if it
     * does not exist. The row key is {@code id:year}.
     *
     * @param args optional; {@code args[0]} is the input file path, defaulting to
     *             {@code F:/temperature.log}
     * @throws Exception if the connection is unavailable, or reading or writing fails
     */
    public static void main(String[] args) throws Exception {
        // Reuse the connection HBaseConnection already built when it was initialized.
        // Opening a second one here would leak the first.
        if (HBaseConnection.connection == null) {
            throw new IllegalStateException("HBase connection is not available");
        }

        String inputPath = (args != null && args.length > 0) ? args[0] : "F:/temperature.log";
        String tableName = "temperature";
        String[] cols = {"cf"};
        byte[] family = Bytes.toBytes("cf");

        try {
            boolean tableExists;
            try (Admin admin = HBaseConnection.connection.getAdmin()) {
                tableExists = admin.tableExists(TableName.valueOf(tableName));
            }
            if (!tableExists) {
                createTable(tableName, cols);
            }

            // One table handle for the whole load, and one Put per input line
            // carrying all three columns of the row.
            try (Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));
                 BufferedReader reader = new BufferedReader(new FileReader(inputPath))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.trim().isEmpty()) {
                        continue;
                    }
                    String[] fields = line.split(",");
                    if (fields.length < 3) {
                        System.err.println("Skipping malformed line: " + line);
                        continue;
                    }
                    String id = fields[0].trim();
                    String year = fields[1].trim();
                    String temperature = fields[2].trim();

                    Put put = new Put(Bytes.toBytes(id + ":" + year));
                    put.addColumn(family, Bytes.toBytes("id"), Bytes.toBytes(id));
                    put.addColumn(family, Bytes.toBytes("year"), Bytes.toBytes(year));
                    put.addColumn(family, Bytes.toBytes("temperature"), Bytes.toBytes(temperature));
                    table.put(put);
                }
            }
        } finally {
            HBaseConnection.closeConnection();
        }
    }

    /** Formats a cell as {@code family:qualifier,value}, decoding the bytes as UTF-8. */
    private static String formatCell(Cell cell) {
        // Bytes.toString decodes UTF-8, matching how Bytes.toBytes(String) encoded the
        // data; new String(byte[]) would use the platform default charset instead.
        String family = Bytes.toString(CellUtil.cloneFamily(cell));
        String column = Bytes.toString(CellUtil.cloneQualifier(cell));
        String value = Bytes.toString(CellUtil.cloneValue(cell));
        return family + ":" + column + "," + value;
    }

    /** Prints every cell of a single-row result on its own line. */
    private static void printCellsOnePerLine(Result result) {
        // Guard against an empty result (row or column not found) before iterating.
        if (result == null || result.isEmpty()) {
            return;
        }
        for (Cell cell : result.rawCells()) {
            System.out.println(formatCell(cell));
        }
    }

    /** Prints one line per scanned row, each cell followed by a tab. */
    private static void printRowsTabSeparated(ResultScanner scanner) {
        // A Result holds the cells of one row; the scanner iterates over rows.
        for (Result row : scanner) {
            if (!row.isEmpty()) {
                for (Cell cell : row.rawCells()) {
                    System.out.print(formatCell(cell) + "\t");
                }
            }
            System.out.println();
        }
    }
}

相关运行结果:

java程序运行结果:

 hbase客户端运行结果:

scan 'temperature'

 

报错解决

如果程序一直处于运行中(卡住不结束),可能是因为连接配置里填写的是 IP,而不是 master、slave1、slave2 这样的主机名;也可能报错 Caused by: org.apache.hadoop.hbase.MasterNotRunningException: java.net.UnknownHostException,即运行程序的机器无法解析集群的主机名。如果在网上找了半天也没有找到原因,可以参考下面的文章修改 Windows 的 SSH 配置文件,并确认本机能够把这些主机名解析到对应的 IP:

ip,主机名供参考:

【hadoop】创建 SSH 别名来连接远程 linux-CSDN博客


网站公告

今日签到

点亮在社区的每一天
去签到