このトピックでは、Flumeを使用してEMR KafkaクラスターからOSS-HDFSが有効になっているObject Storage Service (OSS) バケットにデータを同期する方法について説明します。
前提条件
バケットのOSS-HDFSが有効になり、OSS-HDFSにアクセスする権限が付与されます。 詳細については、「OSS-HDFSの有効化とアクセス許可の付与」をご参照ください。
データレイク分析用のクラスターが作成され、Flumeが選択されます。 詳細については、「クラスターの作成」をご参照ください。
DataFlowクラスターが作成され、Kafkaが選択されます。 詳細については、「クラスターの作成」をご参照ください。
手順
Flumeを設定します。
Flume設定ページに移動します。
EMRコンソールの [EMR on ECS] ページに移動します。
上部のナビゲーションバーで、クラスターが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します。
[ECS上のEMR] ページで、目的のクラスターを見つけ、[操作] 列の [サービス] をクリックします。
[サービス] タブで、[Flume] セクションの [設定] をクリックします。
Java仮想マシン (JVM) で使用可能な最大メモリを設定します。
FlumeからJVMにデータを書き込むと、大量のOSS-HDFSメモリが消費されます。 FlumeエージェントのXmxオプションの値を増やすことを推奨します。 最大JVMメモリを増やすには、次の手順を実行します。
flume-env.shタブをクリックします。
このトピックでは、グローバル設定を使用します。 特定のノードにのみ設定を適用する場合は、FLUMEサービスの [設定] タブの2番目のドロップダウンリストから [独立ノード設定] を選択します。
JAVA_OPTSの値を変更します。
たとえば、JVMで使用可能な最大メモリを1 GBに設定する場合は、値を-Xmx1gに設定します。
[保存] をクリックします。
flume-conf.propertiesを変更します。
[設定] タブで、[flume-conf.properties] タブをクリックします。
このトピックでは、グローバル設定を使用します。 特定のノードにのみ設定を適用する場合は、FLUMEサービスの [設定] タブの2番目のドロップダウンリストから [独立ノード設定] を選択します。
flume-conf.propertiesの横にあるエディターで、次の設定項目を入力します。
説明次の設定のdefault-agentの値は、FLUMEサービスの [設定] タブのagent_nameパラメーターの値と同じである必要があります。
default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource default-agent.sources.source1.channels = c1 default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1: ポート1,kafka-host2: ポート2 ..> default-agent.sources.source1.kafka.topics = flume-test default-agent.sources.source1.kafka.consumer.group.id = flume-test-group default-agent.sinks.k1.type = hdfs default-agent.sinks.k1.hdfs.path = oss://<examplebucket> 。<exampleregio n>.oss-dls.aliyuncs.com/<exampledir> default-agent.sinks.k1.hdfs.fileType=DataStream # メモリ内のイベントをバッファリングするチャネルを使用する default-agent.channels.c1.type=メモリ default-agent.channels.c1.capacity = 100 default-agent.channels.c1.transactionCapacity = 100 # ソースとシンクをチャネルにバインドする default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1
パラメーター
説明
default-agent.sources.source1.kafka.bootstrap.servers
Kafkaクラスター内のブローカーのホスト名とポート番号。
default-agent.sinks.k1.hdfs.path
OSS-HDFSへのアクセスに使用されるパス。 パスの形式は
oss://<examplebucket>.<exampleregio n>.oss-dls.aliyuncs.com/<exampledir>
です。 パスの例はoss:// flume-test.cn-hangzhou.oss-dls.aliyuncs.com/resultです。パスコンポーネント:
<examplebucket>: OSS-HDFSサービスが有効になっているバケットの名前。
<exampleregion>: バケットが配置されているリージョンのID。
<exampledir>: OSS-HDFSサービスのディレクトリ名。
default-agent.channels.c1.ca平和
チャネルに保存されるイベントの最大数。 ビジネス要件に基づいてこのパラメーターを変更します。
default-agent.channels.c1.transactionCapacity
各トランザクションチャネルがソースから受信する、または受信機に提供するイベントの最大数。 ビジネス要件に基づいてこのパラメーターを変更します。
[保存] をクリックします。
データ同期の結果を確認します。
- Secure Shell (SSH) を使用してDataflowクラスターにログインします。 詳細については、「クラスターへのログイン」をご参照ください。
- 次のコマンドを実行して、flume-testという名前のトピックを作成します。
kafka-topics.sh-パーティション10-レプリケーションファクター2-zookeeper master-1-1:2181/emr-kafka-topic flume-test-create
テストデータを生成します。
kafka-console-producer.sh -- topic flume-test -- broker-list master-1-1:9092
たとえば、
abc
と入力し、enterキーを押します。FlumeData.xxxxファイルはoss:// flume-test.cn-hangzhou.oss-dls.aliyuncs.com/resultで生成されます。xxxxは、ファイル生成のミリ秒単位のUNIXタイムスタンプです。