全部產品
Search
文件中心

:使用Flume同步EMR Kafka叢集的資料至OSS-HDFS服務

更新時間:Jun 19, 2024

本文為您介紹如何使用Flume同步EMR Kafka叢集的資料至阿里雲OSS-HDFS服務。

前提條件

操作步驟

  1. 配置Flume。

    1. 進入Flume的配置頁面。

      1. 登入E-MapReduce控制台

      2. 在頂部功能表列處,根據實際情況選擇地區和資源群組

      3. EMR on ECS頁面,單擊目的地組群操作列的叢集服務

      4. 叢集服務頁簽,單擊FLUME服務地區的配置

    2. 設定JVM最大可用記憶體(Xmx)。

      Flume向OSS-HDFS寫入資料時需要佔用較大的JVM記憶體,建議增加Flume Agent的Xmx。具體步驟如下:

      1. 單擊flume-env.sh頁簽。

        本文採用了全域配置方式。如果您希望按照節點配置,可以在FLUME服務配置頁面的下拉式清單中選擇獨立節點配置

      2. 修改JAVA_OPTS的參數值。

        例如,JVM最大可用記憶體設定為1 GB,則參數值修改為-Xmx1g。

      3. 單擊儲存。

    3. 修改flume-conf.properties配置。

      1. 單擊flume-conf.properties頁簽。

        本文採用了全域配置方式。如果您希望按照節點配置,可以在FLUME服務配置頁面的下拉式清單中選擇獨立節點配置

      2. flume-conf.properties右側,輸入以下配置項。

        說明

        以下樣本中的default-agent的值需與FLUME服務配置頁面的agent_name參數值保持一致。

        default-agent.sources = source1
        default-agent.sinks = k1
        default-agent.channels = c1
        
        default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
        default-agent.sources.source1.channels = c1
        default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
        default-agent.sources.source1.kafka.topics = flume-test
        default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
        
        default-agent.sinks.k1.type = hdfs
        default-agent.sinks.k1.hdfs.path = oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>
        default-agent.sinks.k1.hdfs.fileType=DataStream
        
        # Use a channel which buffers events in memory
        default-agent.channels.c1.type = memory
        default-agent.channels.c1.capacity = 100
        default-agent.channels.c1.transactionCapacity = 100
        
        # Bind the source and sink to the channel
        default-agent.sources.source1.channels = c1
        default-agent.sinks.k1.channel = c1

        參數

        描述

        default-agent.sources.source1.kafka.bootstrap.servers

        Kafka叢集Broker的Host和連接埠號碼。

        default-agent.sinks.k1.hdfs.path

        OSS-HDFS的路徑。填寫格式為oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>。樣本值為oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result

        各參數說明如下:

        • <examplebucket>:填寫已開啟OSS-HDFS服務的Bucket名稱。

        • <exampleregion>:填寫Bucket所在的地區ID。

        • <exampledir>:填寫OSS-HDFS服務的目錄名稱。

        default-agent.channels.c1.capacity

        通道中儲存的最大事件數目。請根據實際環境修改該參數值。

        default-agent.channels.c1.transactionCapacity

        每個事務通道將從源接收或提供給接收器的最大事件數目。請根據實際環境修改該參數值。

      3. 單擊儲存。

  2. 測試資料同步情況。

    1. 通過SSH方式串連DataFlow叢集,詳情請參見登入叢集
    2. 建立名稱為flume-test的Topic。
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
    3. 產生測試資料。

      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      例如,輸入abc並斷行符號。

      oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result路徑下會以目前時間的時間戳記(毫秒)為尾碼產生格式為FlumeData.xxxx的檔案。