全部產品
Search
文件中心

DataWorks:RDS增量資料同步至MaxCompute

更新時間:Jun 19, 2024

本文以同步業務RDS資料庫的資料至MaxCompute為例,為您介紹如何對不同情境的資料進行增量同步處理。

背景資訊

根據需要同步的資料在寫入後是否發生變化,分為恒定的存量資料(通常是日誌資料)和持續更新的資料(例如人員表中,人員的狀態會發生變化)。

根據等冪性原則(一個任務多次啟動並執行結果一致,則該任務支援重跑調度。如果該任務出現錯誤,髒資料較容易清理),每次匯入資料都是匯入至一張單獨的表或分區中,或者覆蓋記錄。

本文定義任務測試時間是2016年11月14日,在14日進行增量同步處理,同步歷史資料至分區ds=20161113中。增量同步處理的情境配置了自動調度,把增量資料在15日淩晨同步至分區ds=20161114中。資料中的時間欄位optime用來表示該資料的修改時間,從而判斷這條資料是否為增量資料。

使用說明

  • 部分資料來源暫無增量同步處理方案,例如HBase、OTSStream資料來源等。具體資料來源是否支援增量同步處理可以看具體的Reader外掛程式文檔。

  • 每個外掛程式實現增量同步處理的所配置的參數可能不同,具體參數配置可以參考對應的Reader外掛程式文檔,詳情可參考:支援的資料來源與讀寫外掛程式

  • 增量同步處理配置相關說明詳情請參見:情境:配置增量資料離線同步任務

對恒定的存量資料進行增量同步處理

由於資料產生後不會發生變化,因此可以很方便地根據資料的產生規律進行分區。較常見的是根據日期進行分區,例如每天1個分區。

  1. 在RDS資料庫中,執行下述語句準備資料。

    drop table if exists oplog;
    create table if not exists oplog(
    optime DATETIME,
    uname varchar(50),
    action varchar(50),
    status varchar(10)
     );
    Insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
    Insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');

    上述的兩條資料作為歷史資料,需要先進行一次全量資料同步,將歷史資料同步至昨天的分區。

  2. 資料開發頁面,按右鍵商務程序下的,選擇建立表

  3. 建立表對話方塊中,輸入表名(ods_oplog),單擊提交

  4. 雙擊ods_oplog表,在右側的編輯頁面單擊DDL模式,輸入下述建表語句。

    --建立好MaxCompute表,按天進行分區。
    create table if not exists ods_oplog(
     optime datetime,
     uname string,
     action string,
     status string
    ) partitioned by (ds string);
  5. 配置同步歷史資料的任務,詳情請參見通過嚮導模式配置離線同步任務

    測試同步任務成功後,單擊節點編輯頁面右側的調度配置,勾選暫停調度並重新提交或發布,避免任務自動調度執行。

  6. 執行下述語句,向RDS源頭表中插入資料作為增量資料。

    insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
    insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed'); 
    insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');
  7. 配置同步增量資料的任務。

    資料來源中設定資料過濾為date_format(optime,'%Y%m%d')=${bdp.system.bizdate},在資料去向中設定分區資訊為${bdp.system.bizdate}

    說明

    通過配置資料過濾,在15日淩晨進行同步時,您可以查詢14日源頭表全天新增的資料,並同步至目標表的增量分區中。

  8. 查看同步結果。

    單擊節點編輯頁面右側的調度配置,設定任務的調度周期為天調度。提交或發布任務後,第2天任務將自動調度執行。執行成功後,即可查看MaxCompute目標表的資料。

對持續更新的資料進行增量同步處理

根據資料倉儲反映歷史變化的特點,建議每天對人員表、訂單表等會發生變化的資料進行全量同步,即每天儲存的都是全量資料,方便您擷取歷史資料和當前資料。

真實情境中因為某些特殊情況,需要每天只進行增量同步處理。但MaxCompute不支援Update語句修改資料,只能通過其它方式實現。下文將為您介紹兩種同步策略(全量同步、增量同步處理)的具體操作。

  1. 執行下述語句準備資料。

    drop table if exists user ;
    create table if not exists user(
        uid int,
        uname varchar(50),
        deptno int,
        gender VARCHAR(1),
        optime DATETIME
        );
    --歷史資料
    insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
    --增量資料
    update user set deptno=101,optime=CURRENT_TIME  where uid = 2; --null改成非null
    update user set deptno=104,optime=CURRENT_TIME  where uid = 3; --非null改成非null
    update user set deptno=null,optime=CURRENT_TIME  where uid = 4; --非null改成null
    delete from user where uid = 5;
    insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIME);
  2. 進行資料同步。

    • 每天全量同步

      1. 執行下述語句建立MaxCompute表,建立表的詳情請參見建立表

        --全量同步
        create table ods_user_full(
            uid bigint,
            uname string,
            deptno bigint,
            gender string,
            optime DATETIME 
        ) partitioned by (ds string);
      2. 配置全量同步任務。

        說明

        需要每天都全量同步,因此任務的調度周期需要配置為天調度。

      3. 運行任務,並查看同步後MaxCompute目標表的結果。

        因為每天都是全量同步,沒有全量和增量的區別,所以第2天任務自動調度執行成功後,即可看到資料結果。

    • 每天增量同步處理

      不推薦使用該方式,只有在不支援Delete語句、無法通過SQL語句查看被刪除的資料等情境才會考慮。雖然實際上大多使用邏輯刪除資料,將Delete轉化為Update進行處理。但仍會限制一些特殊的業務情境不能實現,導致資料不一致。並且同步後需要合并新增資料和歷史資料。

      準備資料

      需要建立兩張表,一張寫當前的最新資料,一張寫增量資料。

      --結果表
      create table dw_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      );
      --增量記錄表
      create table ods_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      )
      1. 配置同步任務,將全量資料直接寫入結果表。

        說明

        只需要執行一次,執行成功後需要單擊頁面右側的調度配置,勾選暫停調度

      2. 配置同步任務,將增量資料寫入到增量表。設定資料過濾,即where參數配置為date_format(optime,'%Y%m%d')=${bdp.system.bizdate}

      3. 執行下述語句合并資料。

        insert overwrite table dw_user_inc 
        select 
        --所有select操作,如果ODS表有資料,說明發生了變動,以ODS表為準。
        case when b.uid is not null then b.uid else a.uid end as uid,
        case when b.uid is not null then b.uname else a.uname end as uname,
        case when b.uid is not null then b.deptno else a.deptno end as deptno,
        case when b.uid is not null then b.gender else a.gender end as gender,
        case when b.uid is not null then b.optime else a.optime end as optime
        from 
        dw_user_inc a 
        full outer join ods_user_inc b
        on a.uid  = b.uid ;

        查看執行結果會發現Delete的記錄沒有同步成功。

    每天增量同步處理的優點是同步的增量資料量較小,但可能出現資料不一致的情況,並且需要通過額外的計算進行資料合併。

    如果不是必要情況,對持續更新的資料進行每天全量同步即可。如果希望歷史資料僅保留一定的時間,自動刪除超出保留時間的資料,您可以設定Lifecycle