全部產品
Search
文件中心

DataWorks:Lindorm資料來源

更新時間:Oct 24, 2024

DataWorksData Integration支援使用Lindorm Reader和Lindorm Writer外掛程式讀取和寫入Lindorm雙向通道的功能,本文為您介紹DataWorks的Lindorm資料讀取與寫入能力。

使用限制

  • Lindorm需要在DataWorks上配置資料來源,通過Lindorm Reader和Lindorm Writer外掛程式讀取與寫入Lindorm資料。

  • Lindorm Reader和LindormWriter支援使用Serverless資源群組(推薦)獨享Data Integration資源群組

  • Lindorm 時序引擎目前不支援作為DataWorksData Integration的資料來源。

  • LindormReader和LindormWriter的必填配置項configuration,可以通過Lindorm叢集控制台查看串連Lindorm的相關配置項進行擷取,並以JSON格式填寫相關資訊。

    說明

    Lindorm為多模資料庫,LindormReader和LindormWriter支援讀取table和widecolumn類型的資料,您也可以通過DingTalk諮詢Lindorm值班人員。

支援的欄位類型

Lindorm Reader和Lindorm Writer支援大部分Lindorm類型,但也存在個別沒有支援的情況,請注意檢查您的資料類型。

Lindorm Reader和Lindorm Writer針對Lindorm類型的轉換列表,如下所示。

類型分類

資料類型

整數類

INT、LONG、SHORT

浮點類

DOUBLE、FLOAT、DOUBLE

字串類

STRING

日期時間類

DATE

布爾類

BOOLEAN

二進位類

BINARYSTRING

資料同步任務開發

資料同步任務的配置入口和通用配置流程可參見下文的配置指導。

附錄:指令碼Demo與參數說明

離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。

Reader指令碼Demo

  • 配置一個Lindorm Table(對應SDK中的TableService模型)抽取資料到本地的作業。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                "caching": 128,
                    "configuration": {    //lindorm控制台中與串連相關的配置項,以JSON格式填寫
                        "lindorm.client.username": "",
                        "lindorm.client.seedserver": "seddserver.et2sqa.tbsite.net:30020",
                        "lindorm.client.namespace": "namespace",
                        "lindorm.client.password": ""
                    },
                    "columns": [
                        "id",
                        "name",
                        "age",
                        "birthday",
                        "gender"
                    ],
                    "envType": 1,
                    "datasource": "_LINDORM",
                    "namespace": "namespace",
                    "table": "lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "columns": "columns": [
                        "id",
                        "name",
                        "age",
                        "birthday",
                        "gender"
                    ],
             "selects": [
                        "where(compare(\"id\", LESS, 5))",
                        "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
                        "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它.
            "byte": 1048576
          }
          //出錯限制
          "errorLimit": {
            //出錯的record條數上限,當大於該值即報錯。
            "record": 0,
            //出錯的record百分比上限 1.0表示100%,0.02表示2%
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • 配置一個Lindorm wideColumn(對應SDK中的WideColumnService模型)抽取資料到本地的作業。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "configuration": {  //lindorm控制台中與串連相關的配置項,以JSON格式填寫
                        "lindorm.client.username": "",
                        "lindorm.client.seedserver": "seddserver.et2sqa.tbsite.net:30020",
                        "lindorm.client.namespace": "namespace",
                        "lindorm.client.password": ""
                    },
                    "columns":  [
                       "STRING|rowkey",
                          "INT|f:a",
                          "DOUBLE|f:b"
                    ],
                    "envType": 1,
                    "datasource": "_LINDORM",
                    "namespace": "namespace",
                    "tableMode": "wideColumn",
                    "table":"yourTableName"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它。
            "byte": 1048576
          }
            //出錯限制
            "errorLimit": {
            //出錯的record條數上限,當大於該值即報錯。
            "record": 0,
            //出錯的record百分比上限 1.0表示100%,0.02表示2%。
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }

Reader指令碼參數

參數

描述

是否必選

預設值

configuration

表示每個lindorm叢集提供給DataX用戶端串連的配置資訊,可以通過lindorm叢集控制台查詢,擷取到配置資訊後可以聯絡lindorm資料庫管理員將其轉換為如下JSON格式:{"key1":"value1","key2":"value2"}

例如:{"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"}

說明

如果是手工編寫的JSON代碼,則需要將JSON格式中value值的雙引號轉義為\"

mode

表示資料讀模數式,包括固定列模式FixedColumn和動態列模式DynamicColumn。預設選擇FixedColumn

FixedColumn

tableMode

包括普通表模式table和寬表模式wideColumn。預設為table,如果選擇table模式,可不填寫。

預設不填寫

table

表示所要讀取的lindorm表名。lindorm表名對大小寫敏感。

namespace

表示所要讀取的lindorm表的命名空間。lindorm表的命名空間對大小寫敏感。

encoding

編碼方式,取值為UTF-8或GBK。一般用於將二進位儲存的lindorm byte[]類型轉換為String類型。

UTF-8

caching

一次性批量擷取的記錄數大小,該值可以極大減少資料同步系統與Lindorm的網路互動次數,並提升整體輸送量。如果該值設定過大,會導致Lindorm服務端壓力過大或者資料同步運行進程OOM異常。

100

selects

當前讀取的Table類型資料不支援自動切割分區,預設單並發運行,因此需要手動設定selects參數進行資料切片,例如:

selects": [
                    "where(compare(\"id\", LESS, 5))",
                    "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
                    "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
                ],

columns

讀取欄位列表。讀取欄位列表支援列裁剪和列換序,列裁剪指可以選擇部分列進行匯出,列換序指可以不按照表schema資訊順序進行匯出。

  • table類型的表,只需要填寫列名即可,會自動從表的meta擷取schema資訊。樣本如下:

    table類型:
    [
        "id",
        "name",
        "age",
        "birthday",
        "gender"
    ]
  • widecolumn類型的表。樣本如下:

    Widecolumn類型:
    [
        "STRING|rowkey",
        "INT|f:a",
        "DOUBLE|f:b"
    ]

Writer指令碼Demo

  • 配置一個資料來源為MySQL,需要寫入資料到Lindorm Table(對應SDK中的TableService模型)的作業。

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "checkSlave": true,
            "datasource": " ",
            "envType": 1,
            "column": [
              "id",
              "value",
              "table"
            ],
            "socketTimeout": 3600000,
            "masterSlave": "slave",
            "connection": [
              {
                "datasource": " ",
                "table": []
              }
            ],
            "where": "",
            "splitPk": "",
            "encoding": "UTF-8",
            "print": true
          },
          "name": "mysqlReader",
          "category": "reader"
        },
        {
          "stepType": "lindorm",
          "parameter": {
            "configuration":  {
              "lindorm.client.seedserver": "xxxxxxx:30020",
              "lindorm.client.username": "xxxxxx",
              "lindorm.client.namespace": "default",
              "lindorm.client.password": "xxxxxx"
            },
            "nullMode": "skip",
            "datasource": "",
            "writeMode": "api",
            "envType": 1,
            "columns": [
              "id",
              "name",
              "age",
              "birthday",
              "gender"
            ],
            "dynamicColumn": "false",
            "table": "lindorm_table",
            "encoding": "utf8"
          },
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "jvmOption": "",
        "executeMode": null,
        "speed": {
          //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它。
          "byte": 1048576
        },
        //出錯限制
        "errorLimit": {
          //出錯的record條數上限,當大於該值即報錯。
          "record": 0,
          //出錯的record百分比上限 1.0表示100%,0.02表示2%。
          "percentage": 0.02
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • 配置一個資料來源為MySQL,需要寫入資料到Lindorm wideColumn(對應SDK中的WideColumnService模型)作業。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                        "name",
                        "age",
                        "birthday",
                        "gender"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                     "configuration":  {
                      "lindorm.client.seedserver": "xxxxxxx:30020",
                      "lindorm.client.username": "xxxxxx",
                      "lindorm.client.namespace": "default",
                      "lindorm.client.password": "xxxxxx"
                    },
                "writeMode": "api",
                "namespace": "default",
                "table": "xxxxxx",
                "encoding": "utf8",
                "nullMode": "skip",
                "dynamicColumn": "false",
                "caching": 128,
                "columns": [
                      "ROW|STRING",
                      "cf:id|STRING",
                      "cf:age|INT",
                      "cf:birthday|STRING"
                    ]
                  },
              "name":"Writer",
        "category":"writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }

Writer指令碼參數

參數

描述

是否必選

預設值

configuration

每個lindorm叢集提供給DataX用戶端串連的配置資訊,可以通過lindorm叢集控制台查詢,擷取到配置資訊後可以聯絡lindorm資料庫管理員將其轉換為如下JSON格式:{"key1":"value1","key2":"value2"}

例如:{"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"}

說明

如果是手寫的JSON代碼,則需要將雙引號轉義為\"

table

表示所要寫入的lindorm表名。lindorm表名對大小寫敏感。

namespace

表示所要寫入的lindorm表的命名空間。lindorm表的命名空間對大小寫敏感。

encoding

編碼方式,取值為UTF-8或GBK。一般用於將二進位儲存的lindorm byte[]類型轉換為String類型。

UTF-8

columns

寫入欄位列表。寫入欄位列表支援列裁剪和列換序,列裁剪指可以選擇部分列進行匯出,列換序指可以不按照表schema資訊順序進行匯出。

  • table類型的表,只需要填寫列名即可,會自動從表的meta擷取schema資訊。

  • widecolumn類型或table類型的表。

nullMode

表示在讀取源頭資料的值為null時,Lindorm Writer 中的nullMode參數可通過配置不同內容,實現不同的處理方式。

  • SKIP:表示不向Lindorm寫這列。

  • EMPTY_BYTES:表示遇到欄位值為空白時,寫入空位元組數組到Lindorm對應的欄位。

  • NULL:表示寫入null值。

  • DELETE:表示遇到欄位值為空白時刪除Lindorm中對應的欄位。

EMPTY_BYTES