【hadoop】Hbase java api 案例
代码实现:
HBaseConnection.java
package com.peizheng.bigdata;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;import java.io.IOException;public class HBaseConnection {public static Connection connection = null;static {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");conf.set("hbase.zookeeper.property.clientPort","2181");try {connection = ConnectionFactory.createConnection(conf);} catch (IOException e) {e.printStackTrace();}}public static void closeConnection(){try {connection.close();} catch (IOException e) {e.printStackTrace();}}
}
HBaseOperation.java
package com.peizheng.bigdata;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;public class HBaseOperation {public static void createNameSpace(String namespace) throws IOException {// 1 获取admin对象 另有Table对象Admin admin = HBaseConnection.connection.getAdmin();// 1.1 Builder类NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);// 1.2 添加需求,这里是添加了自定义的描述信息//builder.addConfiguration("user","peizheng");// 2 调用方法,创建命名空间admin.createNamespace(builder.build());// 3 关闭adminadmin.close();}public static void createTable(String name, String[] cols) throws IOException {Admin admin = HBaseConnection.connection.getAdmin();HTableDescriptor hTableDescriptor = new HTableDescriptor(name);for (String col : cols) {HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(col);hColumnDescriptor.setMaxVersions(5);hTableDescriptor.addFamily(hColumnDescriptor);}admin.createTable(hTableDescriptor);admin.close();}public static void putCell(String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));Put put = new Put(Bytes.toBytes(rowKey));put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(value));table.put(put);table.close();}// 查询// 单行读取public static void getRow(String tableName, String rowKey) throws IOException {Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));Get get = new Get(Bytes.toBytes(rowKey));// Result -> Cell[]Result result = table.get(get);// cell存储非常底层Cell[] cells = result.rawCells();for (Cell cell : cells) {String value = new String(CellUtil.cloneValue(cell));String family = new String(CellUtil.cloneFamily(cell));String colunm = new String(CellUtil.cloneQualifier(cell));System.out.println(family + ":" + colunm + "," + value);}table.close();}public static void getCell(String tableName, String rowKey, String familyName, String columnName) throws IOException {Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));Get get = new Get(Bytes.toBytes(rowKey));get.addColumn(Bytes.toBytes(familyName),Bytes.toBytes(columnName));// Result -> Cell[]Result result = table.get(get);// cell存储非常底层Cell[] cells = result.rawCells();for (Cell cell : cells) {String value = new String(CellUtil.cloneValue(cell));String family = new String(CellUtil.cloneFamily(cell));String colunm = new String(CellUtil.cloneQualifier(cell));System.out.println(family + ":" + colunm + "," + value);}table.close();}public static void scanRows(String tableName, String startRowKey, String endRowKey) throws IOException {Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));Scan scan = new Scan();// 指定起始的行 (包含)scan.setStartRow(Bytes.toBytes(startRowKey));// 指定结束的行 (默认不包含)scan.setStopRow(Bytes.toBytes(endRowKey));ResultScanner scanner = table.getScanner(scan);// Result记录一行数据,Cell数组// ResultScanner记录多行数据,Result数组for (Result result : scanner) {Cell[] cells = result.rawCells();for (Cell cell : cells) {String value = new String(CellUtil.cloneValue(cell));String family = new String(CellUtil.cloneFamily(cell));String colunm = new String(CellUtil.cloneQualifier(cell));System.out.print(family + ":" + colunm + "," + value + "\t");}System.out.println();}table.close();}public static void filterScan(String tableName, String startRowKey, String endRowKey, String familyName, String columnName, String val) throws IOException {Table table = HBaseConnection.connection.getTable(TableName.valueOf(tableName));Scan scan = new Scan();// 指定起始的行 (包含)scan.setStartRow(Bytes.toBytes(startRowKey));// 指定结束的行 (默认不包含)scan.setStopRow(Bytes.toBytes(endRowKey));FilterList filterList = new FilterList();//设置过滤器//SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(familyName),Bytes.toBytes(columnName),CompareFilter.CompareOp.LESS_OR_EQUAL,Bytes.toBytes(val));//添加过滤器filterList.addFilter(singleColumnValueFilter);scan.setFilter(filterList);ResultScanner scanner = table.getScanner(scan);// Result记录一行数据,Cell数组// ResultScanner记录多行数据,Result数组for (Result result : scanner) {Cell[] cells = result.rawCells();for (Cell cell : cells) {String value = new String(CellUtil.cloneValue(cell));String family = new String(CellUtil.cloneFamily(cell));String colunm = new String(CellUtil.cloneQualifier(cell));System.out.print(family + ":" + colunm + "," + value + "\t");}System.out.println();table.close();}}public static void main(String[] args) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");conf.set("hbase.zookeeper.property.clientPort", "2181");HBaseConnection.connection = ConnectionFactory.createConnection(conf);String tableName = "temperature";String[] cols = {"cf"};if (!HBaseConnection.connection.getAdmin().tableExists(TableName.valueOf(tableName))) {createTable(tableName, cols);}BufferedReader bufferedReader = new BufferedReader(new FileReader("F:/temperature.log"));String line;while ((line = bufferedReader.readLine()) != null) {String[] splits = line.split(",");String id = splits[0].trim();String year = splits[1].trim();String temperature = splits[2].trim();String rowKey = id + ":" + year;putCell(tableName, rowKey, "cf", "id", id);putCell(tableName, rowKey, "cf", "year", year);putCell(tableName, rowKey, "cf", "temperature", temperature);}bufferedReader.close();HBaseConnection.closeConnection();}
}
相关运行结果:
java程序运行结果:
hbase客户端运行结果:
scan 'temperature'
报错解决
一直运行中可能是设置连接的是ip,不是master,slave1,slave2,这种,可能报错Caused by: org.apache.hadoop.hbase.MasterNotRunningException: java.net.UnknownHostExce。在网上找了半天的原因也没有找到的话参考下面文章修改 windows的ssh配置文件:
ip,主机名供参考:
【hadoop】创建 SSH 别名来连接远程 linux-CSDN博客