全部產品
Search
文件中心

DataWorks:Doris資料來源

更新時間:Dec 12, 2024

DataWorksData Integration支援使用Doris Writer匯入表資料至Doris。本文為您介紹DataWorks的Doris資料同步能力支援情況。

支援的Doris版本

Doris Writer使用的驅動版本是MySQL Driver 5.1.47,該驅動支援的核心版本如下。驅動能力詳情請參見Doris官網文檔

Doris 版本

是否支援

0.x.x

支援

1.1.x

支援

1.2.x

支援

2.x

支援

使用限制

Data Integration僅支援離線寫入Doris。

支援的欄位類型

不同Doris版本支援不同的資料類型和彙總模型。各版本Doris的全量欄位類型請參見Doris的官方文檔,下面為您介紹Doris當前主要欄位的支援情況。

類型

支援模型

Doris版本

離線寫入(Doris Writer)

SMALLINT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

INT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

BIGINT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

LARGEINT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

FLOAT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

DOUBLE

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

DECIMAL

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

DECIMALV3

Aggregate,Unique,Duplicate

1.2.1+、2.x

支援

DATE

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

DATETIME

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

DATEV2

Aggregate,Unique,Duplicate

1.2.x、2.x

支援

DATATIMEV2

Aggregate,Unique,Duplicate

1.2.x、2.x

支援

CHAR

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

VARCHAR

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

STRING

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支援

VARCHAR

Aggregate,Unique,Duplicate

1.1.x、1.2.x、2.x

支援

ARRAY

Duplicate

1.2.x、2.x

支援

JSONB

Aggregate,Unique,Duplicate

1.2.x、2.x

支援

HLL

Aggregate

0.x.x、1.1.x、1.2.x、2.x

支援

BITMAP

Aggregate

0.x.x、1.1.x、1.2.x、2.x

支援

QUANTILE_STATE

Aggregate

1.2.x、2.x

支援

實現原理

Doris Writer通過Doris原生支援的StreamLoad方式匯入資料,Doris Writer會將Reader端讀取到的資料緩衝在記憶體中,並拼接成文本,然後大量匯入至Doris資料庫。更多詳情請參見Doris官方文檔

資料同步前準備

在DataWorks上進行資料同步前,您需要參考本文提前在Doris側進行資料同步環境準備,以便在DataWorks上進行Doris資料同步任務配置與執行時服務正常。以下為您介紹Doris同步前的相關環境準備。

準備工作1:確認Doris的版本

Data Integration對Doris版本有要求,您可以參考上文支援的Doris版本章節,查看當前待同步的Doris是否符合版本要求。您可以在Doris的官方網站下載對應的版本,並且安裝。

準備工作2:建立帳號,並配置帳號許可權

您需要規劃一個資料倉儲的登入帳號用於後續操作,同時,您需要為該帳號設定密碼,以便後續串連到資料倉儲。如果您要使用Doris預設的root使用者進行登入,那麼您需要為root使用者佈建密碼。root使用者預設沒有密碼,您可以在Doris上執行SQL來設定密碼:

SET PASSWORD FOR 'root' = PASSWORD('密碼')

準備工作3:配置Doris的網路連接

使用StreamLoad匯入資料,需要訪問FE節點的私網地址。如果訪問FE的公網地址,則會被重新導向到BE節點的內網IP(資料操作問題)。因此,您需要將Doris的網路與Data Integration使用的Serverless資源群組或獨享Data Integration資源群組打通,使之通過內網地址進行訪問。網路打通的具體操作可以參考網路連通方案

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源詳細的配置參數解釋可在配置介面查看對應參數的文案提示

下面對Doris資料來源的幾個配置項進行說明:

  • JdbcUrl:請填寫JDBC串連串,包含IP、連接埠號碼、資料庫和串連參數。支援公網IP和私網IP,如果使用公網IP,請確保Data Integration資源群組能夠正常訪問Doris所在的主機。

  • FE endpoint:請填寫FE節點的IP和連接埠。如果您的叢集中有多個FE節點,可以配置多個FE節點的IP和連接埠,每個IP和連接埠以逗號分隔,例如ip1:port1,ip2:port2。在測試連通性時,會對所有的FE endpoint做連通性測試。

  • 使用者名稱:請填寫Doris資料庫的使用者名稱。

  • 密碼:請填寫Doris資料庫對應使用者的密碼。

資料同步任務開發

資料同步任務的配置入口和通用配置流程可參見下文的配置指導。

單表離線同步任務配置指導

附錄:指令碼Demo與參數說明

離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。

Reader指令碼Demo

{
  "type": "job",
  "version": "2.0",//版本號碼。
  "steps": [
    {
      "stepType": "doris",//外掛程式名。
      "parameter": {
        "column": [//列名。
          "id"
        ],
        "connection": [
          {
            "querySql": [
              "select a,b from join1 c join join2 d on c.id = d.id;"
            ],
            "datasource": ""//資料來源名稱。
          }
        ],
        "where": "",//過濾條件。
        "splitPk": "",//切分鍵。
        "encoding": "UTF-8"//編碼格式。
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"//錯誤記錄數。
    },
    "speed": {
      "throttle": true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
      "concurrent": 1,//作業並發數。
      "mbps": "12"//限流,此處1mbps = 1MB/s。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Reader指令碼參數

指令碼參數名

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須與添加的資料來源名稱保持一致。

table

選取的需要同步的表名稱。一個Data Integration任務只能從一張表中讀取資料。

table用於配置範圍的進階用法樣本如下:

  • 您可以通過配置區間讀取分庫分表,例如'table_[0-99]'表示讀取'table_0''table_1''table_2'直到'table_99'

  • 如果您的表數字尾碼的長度一致,例如'table_000''table_001''table_002'直到'table_999',您可以配置為'"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]'

說明

任務會讀取匹配到的所有表,具體讀取這些表中column配置項指定的列。如果表不存在,或者讀取的列不存在,會導致任務失敗。

column

所配置的表中需要同步的列名集合,使用JSON的數組描述欄位資訊 。預設使用所有列配置,例如[ * ]

  • 支援列裁剪:列可以挑選部分列進行匯出。

  • 支援列換序:列可以不按照表schema資訊順序進行匯出。

  • 支援常量配置:您需要按照Doris SQL文法格式,例如["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"]

    • id為普通列名。

    • table為包含保留字的列名。

    • 1為整型數字常量。

    • 'mingya.wmy'為字串常量(注意需要加上一對單引號)。

    • 關於null

      • " "表示空。

      • null表示null。

      • 'null'表示null這個字串。

    • to_char(a+1)為計算字串長度函數。

    • 2.3為浮點數。

    • true為布爾值。

  • column必須顯示指定同步的列集合,不允許為空白。

splitPk

Doris Reader進行資料幫浦時,如果指定splitPk,表示您希望使用splitPk代表的欄位進行資料分區,資料同步因此會啟動並發任務進行資料同步,提高資料同步的效能。

  • 推薦splitPk使用者使用表主鍵,因為表主鍵通常情況下比較均勻,因此切分出來的分區也不容易出現資料熱點。

  • 目前splitPk僅支援整型資料切分,不支援字串、浮點和日期等其他類型 。如果您指定其他非支援類型,忽略splitPk功能,使用單通道進行同步。

  • 如果不填寫splitPk,包括不提供splitPk或者splitPk值為空白,資料同步視作使用單通道同步該表資料 。

where

篩選條件,在實際業務情境中,往往會選擇當天的資料進行同步,將where條件指定為gmt_create>$bizdate

  • where條件可以有效地進行業務增量同步處理。如果不填寫where語句,包括不提供where的key或value,資料同步均視作同步全量資料。

  • 不可以將where條件指定為limit 10,這不符合Doris SQL WHERE子句約束。

querySql(進階模式,嚮導模式不支援此參數的配置)

在部分業務情境中,where配置項不足以描述所篩選的條件,您可以通過該配置項來自訂篩選SQL。配置該項後,資料同步系統會忽略tables、columns和splitPk配置項,直接使用該項配置的內容對資料進行篩選。例如,需要進行多表join後同步資料,使用select a,b from table_a join table_b on table_a.id = table_b.id。當您配置querySql時,Doris Reader直接忽略table、column、where和splitPk條件的配置,querySql優先順序大於tablecolumnwheresplitPk選項。datasource通過它解析出使用者名稱和密碼等資訊。

說明

querySql需要區分大小寫,例如,寫為querysql會不生效。

Writer指令碼Demo

{
  "stepType": "doris",//外掛程式名。
  "parameter":
  {
    "postSql"://執行資料同步任務之後率先執行的SQL語句。
    [],
    "preSql":
    [],//執行資料同步任務之前率先執行的SQL語句。
    "datasource":"doris_datasource",//資料來源名。
    "table": "doris_table_name",//表名。
    "column":
    [
      "id",
      "table_id",
      "table_no",
      "table_name",
      "table_status"
    ],
    "loadProps":{
      "column_separator": "\\x01",//指定CSV格式的資料行分隔符號
      "line_delimiter": "\\x02"//指定CSV格式的行分隔字元
    }
  },
  "name": "Writer",
  "category": "writer"
}

Writer指令碼參數

參數

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須與添加的資料來源名稱保持一致。

table

選取的需要同步的表名稱。

column

目標表需要寫入資料的欄位,欄位之間用英文逗號分隔。例如"column":["id","name","age"]。如果要依次寫入全部列,使用(*)表示,例如"column":["*"]

preSql

執行資料同步任務之前率先執行的SQL語句。目前嚮導模式僅允許執行一條SQL語句,指令碼模式可以支援多條SQL語句,例如,執行前清空表中的舊資料。

postSql

執行資料同步任務之後執行的SQL語句。目前嚮導模式僅允許執行一條SQL語句,指令碼模式可以支援多條SQL語句,例如,加上某個時間戳記。

maxBatchRows

每批次匯入資料的最大行數,與batchSize共同控制每批次的匯入數量。每批次資料達到兩個閾值之一,即開始匯入這一批次的資料。

500000

batchSize

每批次匯入資料的最巨量資料量,與maxBatchRows共同控制每批次的匯入數量。每批次資料達到兩個閾值之一,即開始匯入這一批次的資料。

104857600

maxRetries

每批次匯入資料失敗後的重試次數。

3

labelPrefix

每批次上傳檔案的 label 首碼。最終的label將有labelPrefix + UUID組成全域唯一的label,確保資料不會重複匯入。

datax_doris_writer_

loadProps

StreamLoad的請求參數,主要用於配置匯入的資料格式。預設以CSV格式匯入。如果loadProps沒有配置,則採用預設的CSV格式,以\t為資料行分隔符號,\n為行分隔字元,配置如下所示。

"loadProps": {
    "format":"csv",
    "column_separator": "\t",
    "line_delimiter": "\n"
}

如果您需要指定為JSON格式匯入,則配置如下所示。

"loadProps": {
    "format": "json"
}

彙總類型指令碼(Doris Writer寫入彙總類型)

Doris Writer支援寫入彙總類型,但是需要在指令碼模式中做額外的配置。如下所示。

例如,對於如下一張Doris的表,其中,uuidbitmap類型(彙總類型)、sexHLL類型(彙總類型)。

CREATE TABLE `example_table_1` (
  `user_id` int(11) NULL,
  `date` varchar(10) NULL DEFAULT "10.5",
  `city` varchar(10) NULL,
  `uuid` bitmap BITMAP_UNION NULL, -- 彙總類型
  `sex` HLL HLL_UNION  -- 彙總類型
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32

往表中插入未經處理資料:

user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'

通過Doris Writer寫入彙總類型時,不僅需要在writer.parameter.column中指定該列,還需要在writer.parameter.loadProps.columns中配置彙總函式。例如,為uuid使用彙總函式bitmap_hash,為sex使用彙總函式hll_hash

指令碼模式樣本:

{
    "stepType": "doris",//外掛程式名。
    "writer":
    {
        "parameter":
        {
            "column":
            [
                "user_id",
                "date",
                "city",
                "uuid",// 彙總類型bitmap
                "sex"// 彙總類型HLL
            ],
            "loadProps":
            {
                "format": "csv",
                "column_separator": "\\x01",
                "line_delimiter": "\\x02",
                "columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// 需要指定彙總函式
            },
            "postSql":
            [
                "select count(1) from example_tbl_3"
            ],
            "preSql":
            [],
            "datasource":"doris_datasource",//資料來源名。
                    "table": "doris_table_name",//表名。
        }
          "name": "Writer",
              "category": "writer"
    }
}