全部產品
Search
文件中心

Lindorm:通過Pull模式建立資料訂閱通道

更新時間:Jul 06, 2024

本文介紹通過Pull模式建立資料訂閱功能,建立後訂閱通道會即時拉取資料庫執行個體的增量資料,並將增量資料儲存在訂閱通道中,您可以使用Lindorm提供的SDK從訂閱通道中訂閱增量資料並進行消費。同時,您可以在LTS頁面進行訂閱通道的建立、查看及刪除等操作。

前提條件

已將用戶端IP添加至白名單中,具體操作請參見設定白名單

已開通資料訂閱功能,具體操作,請參見開通資料訂閱

操作步驟

  1. 進入LTS(原BDS)頁面,在左側導覽列中,選擇資料訂閱 > Pull模式

    streamone

  2. 單擊建立資料訂閱通道,並配置以下參數。

    建立訂閱通道

    名稱

    描述

    源叢集

    填寫Lindorm執行個體ID。

    Lindorm表名

    選擇需要建立資料訂閱通道的Lindorm執行個體表,一條通道只能選擇一張表格。

    主題名

    用於消費資料的主題名稱。

    資料到期時間(天)

    表示資料可以儲存的天數,預設為7天。

    主題分區數

    表示Kafka用戶端為該主題設定多個分區,多分區可以並發消費資料,預設為4個分區。

  3. 單擊提交

  4. (可選)找到目標訂閱通道,單擊操作列的詳情,可以查看資料訂閱通道詳情、消費詳情和儲存詳情。

    詳細資料

  5. (可選)您可以通過以下代碼在Kafka用戶端對訂閱資料進行消費。

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class TestConsume {
      public static void main(String[] args) throws Exception {
        // 建立訂閱通道時填寫的topic名稱
        String topic = "test-topic";
    
        // 連結endpoint的配置項
        Properties props = new Properties();
        // 指定連結endpoint地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092");
        // 指定Key序列化器,不可更改
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 指定Value序列化器,不可更改
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 指定消費組名稱,在消費時會自動建立
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0");
    
        // 建立消費者
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // 訂閱主題
        consumer.subscribe(Arrays.asList(topic));
    
        // 用消費者拉取資料
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // 查看資料內容
          System.out.println("key: " + Bytes.toString(record.key()));
          System.out.println("value: " + Bytes.toString(record.value()));
        }
        // 提交當前消費位移
        consumer.commitSync();
        // 關閉消費者
        consumer.close();
      }
    }
    說明

    資料消費格式說明請參見資料消費格式