This topic describes how to submit a Flink job.
Prerequisites
A Flink cluster is created on the EMR on ACK page of the new Alibaba Cloud E-MapReduce (EMR) console. For more information, see Getting started.
Method 1: Use the ACK console
Log on to the EMR on ACK console.
On the EMR on ACK page, find the specified EMR cluster and click the link in the ACK Cluster column.
In the upper-right corner of the Pods page, click Create from YAML.
On the Create page, select Custom from the Sample Template drop-down list, add the following code to the editor, and then click Create.
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-emr-example spec: flinkVersion: v1_13 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.savepoints.dir: file:///flink-data/flink-savepoints state.checkpoints.dir: file:///flink-data/flink-checkpoints serviceAccount: flink podTemplate: spec: serviceAccount: flink containers: - name: flink-main-container volumeMounts: - mountPath: /flink-data name: flink-volume volumes: - name: flink-volume emptyDir: {} jobManager: replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 upgradeMode: stateless
NoteIn this example, Flink 1.13 is used. If you use Flink of another version, configure the flinkVersion parameter based on the version information in the EMR console.
Method 2: Use kubectl
Connect to an Alibaba Cloud Container Service for Kubernetes (ACK) cluster by using kubectl. For more information, see Obtain the kubeconfig file of a cluster and use kubectl to connect to the cluster.
You can also connect to the ACK cluster by calling an API operation. For more information, see Use the Kubernetes API.
Create a file named basic-emr-example.yaml. The file contains the following information:
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-emr-example spec: flinkVersion: v1_13 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.savepoints.dir: file:///flink-data/flink-savepoints state.checkpoints.dir: file:///flink-data/flink-checkpoints serviceAccount: flink podTemplate: spec: serviceAccount: flink containers: - name: flink-main-container volumeMounts: - mountPath: /flink-data name: flink-volume volumes: - name: flink-volume emptyDir: {} jobManager: replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 upgradeMode: stateless
NoteYou can change the name of the file. In this example, basic-emr-example.yaml is used.
In this example, Flink 1.13 is used. If you use another version of Flink, configure the flinkVersion parameter based on your business requirements.
Run the following command to submit the Flink job:
kubectl apply -f basic-emr-example.yaml -namespace <Namespace in which the cluster resides>
NoteReplace
<Namespace in which the cluster resides>
with the namespace based on your business requirements. To view the namespace, log on to the EMR console and go to the Cluster Details tab.