全部產品
Search
文件中心

E-MapReduce:Routine Load

更新時間:Jul 01, 2024

Routine Load是一種例行匯入方式,StarRocks通過該方式支援從Kafka持續不斷的匯入資料,並且支援通過SQL控制匯入任務的暫停、重啟和停止。本文為您介紹Routine Load匯入的基本原理、匯入樣本以及常見問題。

基本概念

  • RoutineLoadJob:提交的一個例行匯入任務。
  • JobScheduler:例行匯入任務調度器,用於調度和拆分一個RoutineLoadJob為多個Task。
  • Task:RoutineLoadJob被JobScheduler根據規則拆分的子任務。
  • TaskScheduler:任務調度器,用於調度Task的執行。

基本原理

Routine Load的匯入流程如下圖。Routine Load
匯入流程如下:
  1. 使用者通過支援MySQL協議的用戶端向FE提交一個Kafka匯入任務。
  2. FE將一個匯入任務拆分成若干個Task,每個Task負責匯入指定的一部分資料。
  3. 每個Task被分配到指定的BE上執行。在BE上,一個Task被視為一個普通的匯入任務,通過Stream Load的匯入機制進行匯入。
  4. BE匯入完成後,向FE彙報。
  5. FE根據彙報結果,繼續產生後續新的Task,或者對失敗的Task進行重試。
  6. FE會不斷的產生新的Task,來完成資料不間斷的匯入。
說明 本文圖片和部分內容來源於開源StarRocks的Continuously load data from Apache Kafka

匯入樣本

環境要求

  • 支援訪問無認證或使用SSL方式認證的Kafka叢集。
  • 支援的訊息格式如下:
    • CSV文字格式設定,每一個message為一行,且行尾不包含分行符號。
    • JSON文字格式設定。
  • 不支援Array類型。
  • 僅支援Kafka 0.10.0.0及以上版本。

建立匯入任務

  • 文法
    CREATE ROUTINE LOAD [database.][job_name] ON [table_name]
        [COLUMNS TERMINATED BY "column_separator" ,]
        [COLUMNS (col1, col2, ...) ,]
        [WHERE where_condition ,]
        [PARTITION (part1, part2, ...)]
        [PROPERTIES ("key" = "value", ...)]
        FROM [DATA_SOURCE]
        [(data_source_properties1 = 'value1',
        data_source_properties2 = 'value2',
        ...)]
    相關參數描述如下表所示。
    參數是否必填描述
    job_name匯入任務的名稱,首碼可以攜帶匯入資料庫名稱,常見命名方式為時間戳記+表名。 一個DataBase內,任務名稱不可重複。
    table_name匯入的目標表的名稱。
    COLUMNS TERMINATED子句指定來源資料檔案中的資料行分隔符號,分隔字元預設為\t。
    COLUMNS子句指定來源資料中列和表中列的映射關係。
    • 映射列:例如,目標表有三列col1、col2和col3,來源資料有4列,其中第1、2、4列分別對應col2、col1和col3,則書寫為COLUMNS (col2, col1, temp, col3),其中temp列為不存在的一列,用於跳過來源資料中的第三列。
    • 衍生列:除了直接讀取來源資料的列內容之外,StarRocks還提供對資料列的加工操作。例如,目標表後加入了第四列col4,其結果由col1 + col2產生,則可以書寫為COLUMNS (col2, col1, temp, col3, col4 = col1 + col2)
    WHERE子句指定過濾條件,可以過濾掉不需要的行。過濾條件可以指定映射列或衍生列。

    例如,只匯入k1大於100並且k2等於1000的行,則書寫為WHERE k1 > 100 and k2 = 1000

    PARTITION子句指定匯入目標表的Partition。如果不指定,則會自動匯入到對應的Partition中。
    PROPERTIES子句指定匯入任務的通用參數。
    desired_concurrent_number匯入並發度,指定一個匯入任務最多會被分成多少個子任務執行。必須大於0,預設值為3。
    max_batch_interval每個子任務的最大執行時間。範圍為5~60,單位是秒。預設值為10。

    1.15版本後,該參數表示子任務的調度時間,即任務多久執行一次。任務的消費資料時間為fe.conf中的routine_load_task_consume_second,預設為3s。任務的執行逾時時間為fe.conf中的routine_load_task_timeout_second,預設為15s。

    max_batch_rows每個子任務最多讀取的行數。必須大於等於200000。預設值為200000。

    1.15版本後,該參數只用於定義錯誤偵測視窗範圍,視窗的範圍是10 * max-batch-rows

    max_batch_size每個子任務最多讀取的位元組數。單位為位元組,範圍是100 MB到1 GB。預設值為100 MB。

    1.15版本後,廢棄該參數,任務消費資料的時間為fe.conf中的routine_load_task_consume_second,預設為3s。

    max_error_number採樣視窗內,允許的最大錯誤行數。必須大於等於0。預設是0,即不允許有錯誤行。
    重要 被WHERE條件過濾掉的行不算錯誤行。
    strict_mode是否開啟strict 模式,預設為開啟。

    如果開啟後,非空未經處理資料的列類型變換為NULL,則會被過濾。關閉方式為設定該參數為false。

    timezone指定匯入任務所使用的時區。

    預設為使用Session的timezone參數。該參數會影響所有匯入涉及的和時區有關的函數結果。

    DATA_SOURCE指定資料來源,請使用KAFKA。
    data_source_properties指定資料來源相關的資訊。包括以下參數:
    • kafka_broker_list:Kafka的Broker串連資訊,格式為ip:host。多個Broker之間以逗號(,)分隔。
    • kafka_topic:指定待訂閱的Kafka的Topic。
      說明 如果指定資料來源相關的資訊,則kafka_broker_listkafka_topic必填。
    • kafka_partitionskafka_offsets:指定需要訂閱的Kafka Partition,以及對應的每個Partition的起始offset。
    • property:Kafka相關的屬性,功能等同於Kafka Shell中"--property"參數。建立匯入任務更詳細的文法可以通過執行HELP ROUTINE LOAD; 命令查看。
    說明 建立匯入任務更詳細的文法可以通過執行HELP ROUTINE LOAD;命令查看。
  • 樣本:從一個本地Kafka叢集匯入資料。
    CREATE ROUTINE LOAD routine_load_wikipedia ON routine_wiki_edit
    COLUMNS TERMINATED BY ",",
    COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted)
    PROPERTIES
    (
        "desired_concurrent_number"="1",
        "max_error_number"="1000"
    )
    FROM KAFKA
    (
        "kafka_broker_list"= "localhost:9092",
        "kafka_topic" = "starrocks-load"
    );

查看任務狀態

  • 顯示database下,所有的例行匯入任務(包括已停止或取消的任務)。結果為一行或多行。
    USE [database];
    SHOW ALL ROUTINE LOAD;
  • 顯示database下,名稱為job_name的當前正在啟動並執行例行匯入任務。
    SHOW ROUTINE LOAD FOR [database].[job_name];
重要 StarRocks只能查看當前正在運行中的任務,已結束和未開始的任務無法查看。

查看任務狀態的具體命令和樣本,都可以通過HELP SHOW ROUTINE LOAD命令查看。查看任務運行狀態(包括子任務)的具體命令和樣本,可以通過HELP SHOW ROUTINE LOAD TASK命令查看。

執行SHOW ALL ROUTINE LOAD命令,可以查看當前正在啟動並執行所有Routine Load任務,返回如下類似資訊。
*************************** 1. row ***************************

                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
            Progress: {"0":"13634667"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
            OtherMsg:
1 row in set (0.00 sec)
本樣本建立名為routine_load_wikipedia的匯入任務,相關參數描述如下表。
參數描述
State匯入任務狀態。RUNNING表示該匯入任務處於持續運行中。
Statistic進度資訊,記錄了從建立任務開始後的匯入資訊。
receivedBytes接收到的資料大小,單位是Byte。
errorRows匯入錯誤行數。
committedTaskNumFE提交的Task數。
loadedRows已匯入的行數。
loadRowsRate匯入資料速率,單位是行每秒(row/s)。
abortedTaskNumBE失敗的Task數。
totalRows接收的總行數。
unselectedRows被WHERE條件過濾的行數。
receivedBytesRate接收資料速率,單位是Bytes/s。
taskExecuteTimeMs匯入耗時,單位是ms。
ErrorLogUrls錯誤資訊日誌,可以通過URL看到匯入處理程序中的錯誤資訊。

暫停匯入任務

使用PAUSE語句後,此時匯入任務進入PAUSED狀態,資料暫停匯入,但任務未終止,可以通過RESUME語句重啟任務。

例如,暫停名稱為job_name的例行匯入任務。
PAUSE ROUTINE LOAD FOR [job_name];

可以通過HELP PAUSE ROUTINE LOAD命令查看協助和樣本。

暫停匯入任務後,任務的State變更為PAUSED,Statistic和Progress中的匯入資訊停止更新。此時,任務並未終止,通過SHOW ROUTINE LOAD語句可以看到已經暫停匯入任務。

恢複匯入任務

使用RESUME語句後,任務會短暫的進入NEED_SCHEDULE狀態,表示任務正在重新調度,一段時間後會重新恢複至RUNNING狀態,繼續匯入資料。

例如,恢複名稱為job_name的例行匯入任務。
RESUME ROUTINE LOAD FOR [job_name];

可以通過HELP RESUME ROUTINE LOAD命令查看協助和樣本。

執行SHOW ROUTINE LOAD命令,查看任務狀態。返回資訊如下。
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: NEED_SCHEDULE
      DataSourceType: KAFKA
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":162767220,"errorRows":132,"committedTaskNum":13,"loadedRows":2589972,"loadRowsRate":115000,"abortedTaskNum":7,"totalRows":2590104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":22359}
            Progress: {"0":"13824771"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.**.**:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391
            OtherMsg:
1 row in set (0.00 sec)
再一次執行SHOW ROUTINE LOAD命令,查看任務狀態。返回資訊如下。
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":175337712,"errorRows":142,"committedTaskNum":14,"loadedRows":2789962,"loadRowsRate":118000,"abortedTaskNum":7,"totalRows":2790104,"unselectedRows":0,"receivedBytesRate":7422000,"taskExecuteTimeMs":23623}
            Progress: {"0":"14024771"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.**.**:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391, http://172.26.**.**:9122/api/_load_error_log?file=__shard_57/error_log_insert_stmt_31304c87bb82431a-9f2baf7d5fd7f252_31304c87bb82431a_9f2baf7d5fd7f252
            OtherMsg:
1 row in set (0.00 sec)
ERROR: No query specified
說明 第一次查詢任務時,State變為NEED_SCHEDULE,表示任務正在重新調度。第二次查詢任務時,State變為RUNNING,同時Statistic和Progress中的匯入資訊開始更新,繼續匯入資料。

停止匯入任務

使用STOP語句讓匯入任務進入STOP狀態,資料停止匯入,任務終止,無法恢複資料匯入。

例如,停止名稱為job_name的例行匯入任務。
STOP ROUTINE LOAD FOR [job_name];

您可以通過HELP STOP ROUTINE LOAD命令查看協助和樣本。

執行SHOW ROUTINE LOAD命令,查看任務狀態。返回資訊如下。
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: 2020-05-16 16:08:25
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: STOPPED
      DataSourceType: KAFKA
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":325534440,"errorRows":264,"committedTaskNum":26,"loadedRows":5179944,"loadRowsRate":109000,"abortedTaskNum":18,"totalRows":5180208,"unselectedRows":0,"receivedBytesRate":6900000,"taskExecuteTimeMs":47173}
            Progress: {"0":"16414875"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_67/error_log_insert_stmt_79e9504cafee4fbd-b3981a65fb158cde_79e9504cafee4fbd_b3981a65fb158cde, http://172.26.**.**:9122/api/_load_error_log?file=__shard_68/error_log_insert_stmt_b6981319ce56421b-bf4486c2cd371353_b6981319ce56421b_bf4486c2cd371353, http://172.26.**.**:9122/api/_load_error_log?file=__shard_69/error_log_insert_stmt_1121400c1f6f4aed-866c381eb49c966e_1121400c1f6f4aed_866c381eb49c966e
            OtherMsg:

停止匯入任務後,任務的State變更為STOPPED,Statistic和Progress中的匯入資訊再也不會更新。此時,通過SHOW ROUTINE LOAD語句無法看到已經停止的匯入任務。