全部產品
Search
文件中心

Lindorm:通過Spark訪問寬表引擎

更新時間:Jul 06, 2024

本文介紹如何通過Spark以開源的方式訪問Lindorm寬表。

前提條件

  • 寬表引擎為2.4.3及以上版本。

  • 已將用戶端IP地址添加至Lindorm白名單。如何添加,請參見設定白名單

  • 已擷取寬表引擎的HBase Java API串連地址。如何擷取,請參見查看串連地址

注意事項

  • 如果您想要通過公網訪問或您的執行個體類型為Lindorm單節點,在執行本文操作前,需要先升級SDK並更改配置。具體操作,請參見通過HBase Java API串連並使用寬表引擎章節中的步驟1。

  • 如果應用部署在ECS執行個體,通過專用網路訪問Lindorm執行個體前,需要確保Lindorm執行個體和ECS執行個體滿足以下條件,以保證網路的連通性。

    • 所在地區相同,並建議所在可用性區域相同(以減少網路延時)。

    • ECS執行個體與Lindorm執行個體屬於同一專用網路。

添加Lindorm訪問配置

  • 方式一:通過設定檔添加訪問配置。

    在設定檔hbase-site.xml中增加下列配置項:

    <configuration>
          <!--
        寬表引擎的HBase Java API串連地址
        -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020</value>
        </property>
    </configuration>
  • 方式二:通過代碼在Configuration中添加參數。

    // 建立一個Configuration
    Configuration conf = HBaseConfiguration.create();
    // 寬表引擎的HBase Java API串連地址
    conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020");

Spark訪問樣本

test(" test the spark sql count result") {
  //1. 添加HBaseue訪問配置
  var conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "ld-m5ef25p66n5es****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020")
  //2. 建立表
  val hbaseTableName = "testTable"
  val cf = "f"
  val column1 = cf + ":a"
  val column2 = cf + ":b"
  var rowsCount: Int = -1
  var namespace = "spark_test"
  val admin = ConnectionFactory.createConnection(conf).getAdmin()
  val tableName = TableName.valueOf(namespace, hbaseTableName)   
  val htd = new HTableDescriptor(tableName)
  htd.addFamily(new HColumnDescriptor(cf))
  admin.createTable(htd)
  //3. 插入測試資料
  val rng = new Random()
  val k: Array[Byte] = new Array[Byte](3)
  val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column))
  val puts = new util.ArrayList[Put]()
  var i = 0
  for (b1 <- ('a' to 'z')) {
      for (b2 <- ('a' to 'z')) {
        for (b3 <- ('a' to 'z')) {
          if(i < 10) {
            k(0) = b1.toByte
            k(1) = b2.toByte
            k(2) = b3.toByte
            val put = new Put(k)
            put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
            puts.add(put)
            i = i + 1
          }
        }
      }
  }
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(tableName)
  table.put(puts)
  //4. 建立spark表
  val sparkTableName = "spark_hbase"
  val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
                         |    OPTIONS ('catalog'=
                         |    '{"table":{"namespace":"$${hbaseTableName}",                   "name":"${hbaseTableName}"},"rowkey":"rowkey",
                         |    "columns":{
                         |    "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
                         |    "col1":{"cf":"cf1", "col":"a", "type":"string"},
                         |    "col2":{"cf":"cf1", "col":"b", "type":"String"}}}'
                         |    )""".stripMargin
  println(" createCmd: \n" + createCmd + " rows : " + rowsCount)
  sparkSession.sql(createCmd)
  //5. 執行count sql
  val result = sparkSession.sql("select count(*) from " + sparkTableName)
  val sparkCounts = result.collect().apply(0).getLong(0)
  println(" sparkCounts : " + sparkCounts)