This topic describes how to use Flume to synchronize data from an EMR Kafka cluster to an Object Storage Service (OSS) bucket for which OSS-HDFS is enabled.
Prerequisites
OSS-HDFS is enabled for a bucket and permissions are granted to access OSS-HDFS. For more information, see Enable OSS-HDFS and grant access permissions.
A cluster for data lake analytics is created, with Flume selected. For more information, see Create a cluster.
A DataFlow cluster is created, with Kafka selected. For more information, see Create a cluster.
Procedure
Configure Flume.
Go to the Flume configuration page.
Go to the EMR on ECS page of the EMR console.
In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
On the EMR on ECS page, find the desired cluster and click Services in the Actions column.
On the Services tab, click Configure in the Flume section.
Set the maximum memory available to the Java Virtual Machine (JVM).
A large amount of JVM memory is consumed when data is written from Flume to OSS-HDFS. We recommend that you increase the value of the -Xmx option for the Flume agent. To increase the maximum JVM memory, perform the following steps:
Click the flume-env.sh tab.
This topic uses global configuration. If you want to apply the configuration only to a specific node, select Independent Node Configuration from the second drop-down list on the Configure tab of the FLUME service.
Change the value of JAVA_OPTS.
For example, to set the maximum memory available to the JVM to 1 GB, set the value to -Xmx1g.
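For reference, in a standalone flume-env.sh file this typically corresponds to a line such as the following. The exact syntax is an assumption; in the EMR console you only need to set the value of the JAVA_OPTS configuration item.
export JAVA_OPTS="-Xmx1g"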
Click Save.
Modify flume-conf.properties.
On the Configure tab, click the flume-conf.properties tab.
This topic uses global configuration. If you want to apply the configuration only to a specific node, select Independent Node Configuration from the second drop-down list on the Configure tab of the FLUME service.
In the editor next to flume-conf.properties, enter the following configuration items.
Note: The agent name default-agent in the following configuration must be the same as the value of the agent_name parameter on the Configure tab of the FLUME service.
default-agent.sources = source1
default-agent.sinks = k1
default-agent.channels = c1
default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
default-agent.sources.source1.channels = c1
default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
default-agent.sources.source1.kafka.topics = flume-test
default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
default-agent.sinks.k1.type = hdfs
default-agent.sinks.k1.hdfs.path = oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>
default-agent.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
default-agent.channels.c1.type = memory
default-agent.channels.c1.capacity = 100
default-agent.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
default-agent.sources.source1.channels = c1
default-agent.sinks.k1.channel = c1
The following list describes the parameters:

default-agent.sources.source1.kafka.bootstrap.servers
The hostnames and port numbers of the brokers in the Kafka cluster.

default-agent.sinks.k1.hdfs.path
The path that is used to access OSS-HDFS, in the format oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>. Example: oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result. Path components:
<examplebucket>: the name of the bucket for which OSS-HDFS is enabled.
<exampleregion>: the ID of the region in which the bucket resides.
<exampledir>: the name of the directory in OSS-HDFS.

default-agent.channels.c1.capacity
The maximum number of events that can be stored in the channel. Modify this parameter based on your business requirements.

default-agent.channels.c1.transactionCapacity
The maximum number of events that the channel takes from a source or gives to a sink in each transaction. Modify this parameter based on your business requirements.
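As an illustrative sketch only (the values below are assumptions, not recommendations), a higher-throughput memory channel could be sized as follows. Note that transactionCapacity must not exceed capacity.
default-agent.channels.c1.capacity = 10000
default-agent.channels.c1.transactionCapacity = 1000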
Click Save.
Test data synchronization.
Use Secure Shell (SSH) to log on to the DataFlow cluster. For more information, see Log on to a cluster.
Run the following command to create a topic named flume-test:
kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
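Optionally, confirm that the topic was created. A sketch, assuming the same ZooKeeper address as in the preceding command:
kafka-topics.sh --list --zookeeper master-1-1:2181/emr-kafka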
Generate test data.
kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092
For example, enter abc and press Enter. A file named FlumeData.xxxx is generated in the oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result directory, where xxxx is the UNIX timestamp, in milliseconds, at which the file was generated.
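To check the result from the command line instead of the console, you can list the destination directory. A sketch, assuming the Hadoop client on the cluster is configured to access OSS-HDFS as described in the prerequisites:
hadoop fs -ls oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result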