本文匯總了DataFlow叢集使用時的常見問題。
- 叢集使用與營運:
- 作業問題:
- 如何處理上下遊儲存(Connector)問題?
- 通過DataFlow叢集運行Flink作業免密讀寫OSS時報錯,該如何處理?
- Flink UI上作業只有一個Operator,並且顯示Records Received為0,該如何處理?
- 用戶端日誌在哪裡?如何查看?
- 通過flink run命令運行作業時,作業的參數沒有生效
- 如何使用開源的StateBackend?
- 叢集中的日誌在哪裡?如何查看?
- 作業JAR包和叢集內Flink的JAR包存在衝突
- Flink作業如何開啟火焰圖?
- 報錯Multiple factories for identifier '...' that implement '...' found in the classpath,該如何處理?
- 報錯java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id timed out,該如何處理?
- 報錯java.lang.OutOfMemoryError: GC overhead limit exceeded,該如何處理?
- 報錯Exception in thread "main" java.lang.NoSuchFieldError: DEPLOYMENT_MODE,該如何處理?
叢集中的日誌在哪裡?如何查看?
- 如果Flink叢集的JobManager已退出,可以在叢集機器中通過命令yarn logs -applicationId application_xxxx_yy拉取到本地進行查看,也可以在YARN的Web UI中訪問結束的作業的日誌連結在網頁端進行查看。
- 如果Flink叢集的JobManager仍在運行。
- 可以通過訪問對應的Flink Web UI進行查看。
- 使用命令列命令,通過yarn logs -applicationId application_xxxx_yy -am ALL -logFiles jobmanager.log查看JobManager日誌,通過yarn logs -applicationId application_xxxx_yy -containerId container_xxxx_yy_aa_bb -logFiles taskmanager.log來查看TaskManager日誌。
作業JAR包和叢集內Flink的JAR包存在衝突
NoSuchFieldError/NoSuchMethodError/ClassNotFoundException
等。您可以通過以下步驟排查和解決:- 定位引起衝突的依賴類。根據報錯中的異常類,您可以找到該類所在的依賴JAR,然後在作業JAR的pom.xml所在目錄運行mvn dependency:tree查看依賴樹,判斷該類是如何被引入的。
- 排除引起衝突的依賴類。
- 如果是在pom.xml中錯誤設定了JAR包的Scope,則可以修改Scope為Provided來將對應JAR包排除。
- 如果確實需要使用該異常類所在的JAR,則可通過添加Exclude來排除特定類。
- 如果確實需要使用該異常類,無法更換為叢集內對應版本的類,可以通過Maven Shade Plugin對該類進行Shade。
此外,如果Classpath中存在多個版本的JAR包,作業實際使用的Class版本和類的載入順序有關,為了確認某個類具體是從哪個JAR載入而來,可以在flink-conf.yaml中設定JVM參數env.java.opts: -verbose:class或者通過指定動態參數-Denv.java.opts="-verbose:class"來列印載入的類及其來源。說明 對於JobManager或TaskManager來說,上述資訊會列印到jobmanager.out
或taskmanager.out
中。
DataFlow叢集外的機器,如何提交作業到DataFlow叢集?
- 確保DataFlow叢集和DataFlow叢集外的機器網路互連。
- 配置提交Flink作業的用戶端的Hadoop YARN環境。
DataFlow叢集中的Hadoop YARN的軟體安裝目錄是/opt/apps/YARN/yarn-current,設定檔的目錄是/etc/taihao-apps/hadoop-conf/,您需要將yarn-current目錄及hadoop-conf目錄下載到提交Flink作業的用戶端上。
然後,在提交Flink作業的用戶端上,配置如下環境變數。
export HADOOP_HOME=/path/to/yarn-current && \ export PATH=${HADOOP_HOME}/bin/:$PATH && \ export HADOOP_CLASSPATH=$(hadoop classpath) && \ export HADOOP_CONF_DIR=/path/to/hadoop-conf
重要 Hadoop的設定檔中(例如yarn-site.xml等)配置的服務地址(例如ResourceManager等),使用的是全網域名稱(FQDN,Fully Qualified Domain Name)。例如,master-1-1.c-xxxxxxxxxx.cn-hangzhou.emr.aliyuncs.com。因此,如果您通過叢集外的機器提交作業,需要能夠解析這些FQDN或者將設定檔中的FQDN修改成對應的IP地址。 - 完成以上配置後,您在叢集外的機器上啟動Flink作業(例如,運行命令
flink run -d -t yarn-per-job -ynm flink-test $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar
)後,應當能在DataFlow叢集的YARN Web UI中看到相應的Flink作業。
在DataFlow叢集外機器上,如何解析DataFlow叢集中的hostname?
- 修改提交Flink作業的用戶端上的/etc/hosts檔案,添加相應的hostname到IP的映射。
- 通過PrivateZone提供的DNS解析服務。如果您有自己的網域名稱解析服務,也可以通過如下方式,配置JVM的運行參數,使用自己的網域名稱解析服務。
env.java.opts.client: "-Dsun.net.spi.nameservice.nameservers=xxx -Dsun.net.spi.nameservice.provider.1=dns,sun -Dsun.net.spi.nameservice.domain=yyy"
如何查看Flink作業的運行狀態?
- 通過EMR控制台查看。
EMR支援Knox,可以通過公網方式訪問YARN、Flink等的Web UI介面,Flink的Web UI可以通過YARN進行查看,詳細資料請參見通過Web UI查看作業狀態。
- 通過SSH隧道的方式查看,詳情資訊請參見通過SSH隧道方式訪問開源組件Web UI。
- 直接存取YARN REST介面。
curl --compressed -v -H "Accept: application/json" -X GET "http://master-1-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***"
說明 需確保安全性群組開放了8443和8088連接埠,可以訪問到YARN的REST介面或者DataFlow叢集和訪問的節點處於同一內網中。
如何訪問Flink作業的日誌?
- 對於運行中的作業,可以通過Flink Web UI,訪問Flink作業的日誌。
- 對於已經運行結束的作業,可以通過Flink History Server查看作業的統計資訊或者通過命令
yarn logs -applicationId application_xxxx_yyyy
訪問作業的日誌,已經運行結束的作業的日誌預設儲存在HDFS叢集的hdfs:///tmp/logs/$USERNAME/logs/目錄下。
如何訪問DataFlow叢集中的Flink HistoryServer?
- 配置安全性群組規則,開放master-1-1節點的18082連接埠的存取權限。
- 直接存取http://$master-1-1-ip:18082。
如何使用DataFlow叢集中所支援的商業化Connector?
DataFlow叢集提供了很多商業化Connector,例如Hologres、SLS、MaxCompute、DataHub、Elasticsearch和ClickHouse等,您在Flink作業中除了可以使用開源的Connector之外,還可以使用這些商業化Connector。下面以Hologres Connector為例,介紹如何在Flink作業中使用DataFlow叢集所攜帶的商業化Connector。
- 作業開發
- 下載DataFlow叢集所攜帶的商業化Connector的JAR包(位於DataFlow叢集的/opt/apps/FLINK/flink-current/opt/connectors目錄下),並通過如下方式將商業化Connector安裝在本地Maven環境中。
mvn install:install-file -Dfile=/path/to/ververica-connector-hologres-1.13-vvr-4.0.7.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.13-vvr-4.0.7 -Dpackaging=jar
- 在專案的pom.xml檔案中添加以下依賴。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-hologres</artifactId> <version>1.13-vvr-4.0.7</version> <scope>provided</scope> </dependency>
- 下載DataFlow叢集所攜帶的商業化Connector的JAR包(位於DataFlow叢集的/opt/apps/FLINK/flink-current/opt/connectors目錄下),並通過如下方式將商業化Connector安裝在本地Maven環境中。
- 運行作業
- 方式一:
- 拷貝Hologres Connector到一個獨立的目錄。
hdfs mkdir hdfs:///flink-current/opt/connectors/hologres/ hdfs cp hdfs:///flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar hdfs:///flink-current/opt/connectors/hologres/ververica-connector-hologres-1.13-vvr-4.0.7.jar
- 提交作業時,命令中添加以下參數。
-D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/
- 拷貝Hologres Connector到一個獨立的目錄。
- 方式二:
- 拷貝Hologres Connector到提交Flink作業的用戶端的/opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar目錄下,與DataFlow叢集中的目錄結構保持一致。
- 提交作業時,命令中添加以下參數。
-C file:///opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar
- 方式三:將Hologres Connector打包到作業的JAR包中。
- 方式一:
如何使用GeminiStateBackend?
DataFlow叢集提供了企業版狀態後端(即GeminiStateBackend),效能是開源版本的3~5倍。DataFlow叢集在設定檔中預設使用GeminiStateBackend,關於GeminiStateBackend的更多進階配置,詳情請參見企業級狀態後端儲存配置。
如何使用開源的StateBackend?
flink run-application -t yarn-application -D state.backend=rocksdb /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar
或者如果您想讓上述修改對後續作業生效,在EMR控制台,修改state.backend參數的值為您想使用的狀態的後端(例如rocksdb)。單擊儲存,然後單擊部署用戶端配置:用戶端日誌在哪裡?如何查看?
在EMR的叢集環境中我們配置了FLINK_LOG_DIR環境變數來指明Flink用戶端的日誌存放位置。它的預設值是/var/log/taihao-apps/flink(在3.43.0之前的版本中預設是/mnt/disk1/log/flink)。您如果需要查看用戶端的完整日誌(如SQL-Client的日誌)可以在該目錄下查看對應檔案。
通過flink run命令運行作業時,作業的參數沒有生效
在通過命令列命令運行Flink作業時,Flink作業的參數需要放在Flink作業JAR包的後面,例如flink run -d -t yarn-per-job test.jar arg1 arg2。
報錯Multiple factories for identifier '...' that implement '...' found in the classpath,該如何處理?
- 報錯原因
表明在Classpath中找到了某個Connector的多個實現。原因通常為在作業JAR添加了相關Connector依賴,同時又手動在$FLINK_HOME/ib目錄下放入了相同的Connector依賴,導致了依賴衝突。
- 解決方案
解決思路為去除重複的依賴,詳細步驟可以參考作業JAR包和叢集內Flink的JAR包存在衝突問題排查。
如何開啟Flink作業JobManager的HA?
high-availability: zookeeper
high-availability.zookeeper.quorum: 192.168.**.**:2181,192.168.**.**:2181,192.168.**.**:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery
如何查看Flink作業的監控指標?
- 您可以在EMR控制台目的地組群的叢集監控頁面,單擊指標監控。
- 在Dashboard下拉框中選擇FLINK。
- 選擇待查看作業對應的Application ID和Job ID,即可展示Flink作業的各項監控指標。說明
- 僅當叢集中已有啟動並執行Flink作業時,才會有可供選擇的Application ID和Job ID。
- 部分指標只有配置了相應上下遊的Source和Sink才會有輸出資訊。例如,sourceIdleTime。
如何處理上下遊儲存(Connector)問題?
關於上下遊儲存方面的常見問題,請參見上下遊儲存。
通過DataFlow叢集運行Flink作業免密讀寫OSS時報錯,該如何處理?
您需要根據具體的報錯資訊,進行相應的處理:
- 報錯提示
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
。- 問題原因:DataFlow叢集目前是通過內建的JindoSDK來支援免密讀寫OSS,並支援StreamingFileSink等API的,不需要再按照社區文檔進行額外的配置,否則會由於依賴衝突導致此報錯。
- 處理方法:檢查您叢集內提交作業的機器的$FLINK_HOME/plugins目錄,查看是否放置了oss-fs-hadoop目錄。如果放置了該目錄,請刪除該目錄後重新提交作業。
- 報錯提示
Could not find a file system implementation for scheme 'oss'. The scheme is directly supported by Flink through the following plugin: flink-oss-fs-hadoop. ....
。- 問題原因:EMR-3.40及之前的版本的EMR叢集中,master機器組內非master-1-1機器上可能缺少Jindo相關的JAR包。
- 處理方法:
- EMR-3.40及之前的版本:檢查您的叢集內提交作業的機器的$FLINK_HOME/lib目錄下是否有Jindo相關的JAR包,例如jindo-flink-4.0.0.jar。如果沒有Jindo相關的JAR包,您可以在叢集中運行以下命令將Jindo相關的JAR包,拷貝到$FLINK_HOME/lib目錄後重新提交作業。
cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/lib
- EMR-3.40之後版本:最佳化了支援的方式,即使$FLINK_HOME/lib目錄下沒有Jindo相關的JAR包,讀寫OSS的作業也是可以正常啟動並執行。
- EMR-3.40及之前的版本:檢查您的叢集內提交作業的機器的$FLINK_HOME/lib目錄下是否有Jindo相關的JAR包,例如jindo-flink-4.0.0.jar。如果沒有Jindo相關的JAR包,您可以在叢集中運行以下命令將Jindo相關的JAR包,拷貝到$FLINK_HOME/lib目錄後重新提交作業。
報錯java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id timed out,該如何處理?
- 報錯原因
直接原因是TaskManager心跳逾時,具體原因可以在TaskManager日誌中查看報錯資訊進行定位。除此之外,還存在TaskManager堆記憶體大小有限或者作業代碼存在記憶體泄露導致的記憶體溢出錯誤,例如報錯java.lang.OutOfMemoryError: GC overhead limit exceeded,該如何處理?。
- 解決方案
上述報錯遇到該類報錯時需要您調大記憶體或者分析作業記憶體使用量情況來進一步定位原因。
報錯java.lang.OutOfMemoryError: GC overhead limit exceeded,該如何處理?
- 報錯原因
該報錯代表為作業設定的記憶體不夠,導致GC逾時。常見原因為代碼(如UDF)發生記憶體泄露或者記憶體大小確實不能滿足業務需求。
- 解決方案
- 您可在重新運行問題作業前通過-D方式指定JVM參數,儲存OutOfMemoryError發生時的現場
-D env.java.opts="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof"
。 - 在flink-conf.yaml中添加參數
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof
來配置OutOfMemoryError發生時進行heap dump。
待作業再次報錯之後,您可針對HeapDumpPath指定的heap dump檔案進行分析(例如使用MAT工具或者jvisualvm工具),確定問題根因。
- 您可在重新運行問題作業前通過-D方式指定JVM參數,儲存OutOfMemoryError發生時的現場
Flink UI上作業只有一個Operator,並且顯示Records Received為0,該如何處理?
這是正常現象,Flink的Records Received相關指標用於描述不同Operator之間的資料通訊,當作業被最佳化為一個Operator時,該指標值恒為0。
Flink作業如何開啟火焰圖?
火焰圖(Flame Graph)用於可視化進程中各個方法的CPU消耗,協助使用者解決效能瓶頸。Flink 1.13版本開始支援火焰圖功能,但為了避免火焰圖對生產環境中作業的影響,預設關閉該功能。如果您需要藉助火焰圖功能對作業效能進行分析,可以在EMR控制台Flink服務配置頁簽的flink-conf.yaml中新增參數為rest.flamegraph.enabled,參數值為true的配置項。新增配置項的具體操作,請參見管理配置項。
關於火焰圖的更多介紹,請參見Flame Graphs。
報錯Exception in thread "main" java.lang.NoSuchFieldError: DEPLOYMENT_MODE,該如何處理?
- 報錯原因
您的作業的JAR中直接或者間接引入了與叢集中Flink版本不相容的flink-core依賴,造成了依賴衝突。
- 解決方案在pom.xml中添加以下資訊,將flink-core依賴的
scope
設定為provided
來修複該問題。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <!-- Change to your own flink version --> <version>1.16.1</version> <scope>provided</scope> </dependency>
說明version
需要修改為您實際使用的Flink版本。如果您想進一步定位引入該依賴的原因,可以參見作業JAR包和叢集內Flink的JAR包存在衝突。