本文介紹通過Pull模式建立資料訂閱功能,建立後訂閱通道會即時拉取資料庫執行個體的增量資料,並將增量資料儲存在訂閱通道中,您可以使用Lindorm提供的SDK從訂閱通道中訂閱增量資料並進行消費。同時,您可以在LTS頁面進行訂閱通道的建立、查看及刪除等操作。
前提條件
已將用戶端IP添加至白名單中,具體操作請參見設定白名單。
已開通資料訂閱功能,具體操作,請參見開通資料訂閱。
操作步驟
進入LTS(原BDS)頁面,在左側導覽列中,選擇資料訂閱 > Pull模式。
單擊建立資料訂閱通道,並配置以下參數。
名稱
描述
源叢集
填寫Lindorm執行個體ID。
Lindorm表名
選擇需要建立資料訂閱通道的Lindorm執行個體表,一條通道只能選擇一張表格。
主題名
用於消費資料的主題名稱。
資料到期時間(天)
表示資料可以儲存的天數,預設為7天。
主題分區數
表示Kafka用戶端為該主題設定多個分區,多分區可以並發消費資料,預設為4個分區。
單擊提交。
(可選)找到目標訂閱通道,單擊操作列的詳情,可以查看資料訂閱通道詳情、消費詳情和儲存詳情。
(可選)您可以通過以下代碼在Kafka用戶端對訂閱資料進行消費。
import org.apache.hadoop.hbase.util.Bytes; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class TestConsume { public static void main(String[] args) throws Exception { // 建立訂閱通道時填寫的topic名稱 String topic = "test-topic"; // 連結endpoint的配置項 Properties props = new Properties(); // 指定連結endpoint地址 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092"); // 指定Key序列化器,不可更改 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); // 指定Value序列化器,不可更改 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); // 指定消費組名稱,在消費時會自動建立 props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0"); // 建立消費者 KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props); // 訂閱主題 consumer.subscribe(Arrays.asList(topic)); // 用消費者拉取資料 ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000)); for (ConsumerRecord<byte[], byte[]> record : records) { // 查看資料內容 System.out.println("key: " + Bytes.toString(record.key())); System.out.println("value: " + Bytes.toString(record.value())); } // 提交當前消費位移 consumer.commitSync(); // 關閉消費者 consumer.close(); } }
說明資料消費格式說明請參見資料消費格式。