全部產品
Search
文件中心

:DataWorks OpenLake一站式智能化湖倉一體資料開發

更新時間:Aug 09, 2025

在本實驗中,您將體驗基於OpenLake House環境下的零售電子商務資料開發與分析情境,通過DataWorks實現面向多引擎協同開發、可視化工作流程編排和資料目錄管理等。同時實踐Python編程及調試,並使用Notebook進行與AI聯動的互動式資料探索與分析。

背景介紹

DataWorks簡介

DataWorks是智能湖倉一體資料開發治理平台,內建阿里巴巴15年巨量資料建設方法論,深度適配阿里雲MaxCompute、E-MapReduce、Hologres、Flink、PAI 等數十種巨量資料和AI計算服務,為資料倉儲、資料湖、OpenLake湖倉一體資料架構提供智能化ETL開發、資料分析與主動式資料資產治理服務,助力“Data+AI”全生命週期的資料管理。自2009年起,DataWorks不斷對阿里巴巴資料體系進行產品化沉澱,服務於政務、金融、零售、互連網、汽車、製造等行業,使數以萬計的客戶信賴並選擇DataWorks進行數字化升級和價值創造。

DataWorks Copilot簡介

DataWorks Copilot是您在DataWorks的智能助手,在DataWorks,您可以自由選擇用DataWorks預設模型、Qwen3-235B-A22B、DeepSeek-R1-0528或Qwen3-Coder大模型來完成相關Copilot產品操作。藉助DeepSeek-R1的深度推理能力,DataWorks Copilot可以協助您通過自然語言互動完成更為複雜的SQL代碼產生、最佳化、測試等操作,顯著提升ETL開發和資料分析效率。

DataWorks Notebook簡介

DataWorks Notebook是智能化互動式資料開發和分析工具,能夠面向多種資料引擎開展SQL或Python分析,即時運行或調試代碼,擷取可視化資料結果。同時,DataWorks Notebook能夠與其他任務節點混合編排為工作流程,提交至調度系統運行,助力複雜業務情境的靈活實現。

注意事項

  • 當前DataWorks Copilot產品存在公測地區限制和版本限制,詳情請參見公測說明

  • 當前Data Studio對Python、Notebook的支援,需要先切換至個人開發環境

使用限制

  • OpenLake只支援資料湖構建(DLF)2.0版本

  • 資料目錄只支援資料湖構建(DLF)2.0版本

  • Qwen3-235B-A22B/DeepSeek- R1模型支援地區:華東1(杭州)、華東2(上海)、華北2(北京)、華北3(張家口)、華南1(深圳)、西南1(成都)。

  • Qwen3-Coder模型支援地區為華東1(杭州)、華東2(上海)、華北2(北京)、華北3(張家口)、華北6(烏蘭察布)、華南1(深圳)、西南1(成都)。

環境準備

  1. 準備阿里雲帳號(主帳號)準備RAM使用者(子帳號)

  2. 建立工作空間

    說明

    請選擇參加資料開發(Data Studio)(新版)公測

  3. 綁定計算資源

實驗步驟

步驟一:資料目錄管理

湖倉一體的資料目錄管理能力,支援對DLF、MaxCompute、Hologres等進行資料目錄管理及建立。

  1. Data Studio頁面,單擊頁面左側一級菜單image,進入資料目錄功能。在資料目錄左側列表上找到您需要管理的中繼資料類型,單擊添加專案(不同中繼資料類型按鈕存在差異,本文以MaxCompute為例)。

    您可以按需添加DataWorks工作空間中已建立的資料來源,也可以在MaxCompute-專案頁簽下選擇當前帳號具有相關許可權的MaxCompute專案。

    image

  2. 添加專案後,即可在對應中繼資料類型下看到已添加的專案,您可以單擊專案名稱,進入資料目錄詳情頁。

  3. 在資料目錄詳情頁選擇Schema後,單擊任意表名可進入該表的詳情頁面查看錶詳情。

  4. 資料目錄支援您可視化建表。

    展開資料目錄至指定Schema的層級,單擊右側的image,進入建立表頁面。

    image

  5. 建立表頁面,您可以通過多種方式建表:

    • 在地區①中填入表名欄位資訊等。

    • 在地區②中直接填入建表DDL語句。

    image

  6. 單擊頁面頂部的發布,完成表建立。

步驟二:工作流程編排

工作流程(Workflow)支援以業務視角通過可視化拖拽的方式編排多種不同類型的資料開發節點,調度時間等通用參數無需單獨配置,可以協助您輕鬆管理複雜的任務工程。

  1. Data Studio頁面,單擊頁面左側一級菜單image,進入資料開發功能。在資料開發左側列表中找到專案目錄,單擊專案目錄右側的image,選擇建立工作流程

  2. 進入工作流程編輯功能介面前,請先輸入工作流程名稱,單擊確定,進入工作流程編輯介面。

  3. 進入工作流程編輯功能介面後,單擊畫布中央的拖拽或點擊添加節點,然後在添加節點對話方塊中,指定節點類型虛擬節點,自訂節點名稱,單擊確認

  4. 從工作流程編輯功能介面左側的節點類型列表中找到自己需要的節點類型,並將其拖至畫布中,在添加節點對話方塊中輸入節點名稱,單擊確認

    image

  5. 從工作流程編輯功能介面右側的畫布上,找到需要建立依賴關係的兩個節點,滑鼠移至上方到其中一個節點下邊緣的中間位置,當滑鼠變為+後,開始拖動滑鼠,將箭頭拖動至另一個節點後鬆開。設定依賴關係如下圖所示後,在頂部工具列單擊儲存

    image

  6. 儲存成功後,可按需對畫布進行布局調整。image

  7. 從工作流程畫布右側找到並單擊調度配置,在調度配置面板中,依次配置工作流程的調度參數及節點依賴。單擊調度參數中的添加參數,參數名輸入框中輸入bizdate,在參數值下拉式清單中選擇$[yyyymmdd-1]

    image

  8. 單擊使用工作空間根節點,將工作空間根節點作為工作流程的上遊依賴。

    image

  9. 單擊工作流程畫布上方的發布,頁面右下方會出現發佈動作介面,單擊上線發布內容操作介面中的開始發布生產後,依次進行檢查和確認即可。

    image

步驟三:多引擎協同開發

Data Studio支援Data Integration、MaxCompute、Hologres、EMR、Flink、Python、Notebook、ADB等數十種不同引擎類型的節點的數倉開發,支援複雜的調度依賴,提供開發環境與生產環境隔離的研發模式。本實驗以建立Flink SQL Streaming節點為例。

  1. Data Studio頁面,單擊頁面左側一級菜單image,進入資料開發功能介面。在資料開發功能介面左側列表中找到專案目錄,單擊專案目錄右側的image,單擊串聯功能表中的Flink SQL Streaming進入節點編輯功能介面。在進入節點編輯功能介面前,請先輸入節點名稱,敲擊斷行符號鍵,等待即可。

    預設節點名稱ads_ec_page_visit_log

    image

  2. 在節點編輯功能介面,將預設Flink SQL Stream代碼粘貼到代碼編輯器中。

    image

    預設Flink SQL Stream代碼

    CREATE TEMPORARY VIEW log_ri_base
    AS 
    SELECT 
      visit_time
      ,substring(visit_time,1,8) as stat_date
      ,substring(visit_time,9,2) as stat_hour
      ,visitor_id
      ,item_id
      ,cate_id
      ,ext_pv
    FROM vvp_ec_ads.dws_log_all_itm_ipv_ri
    WHERE
      bc_type IN ('b', 'z')
      AND coalesce(cate_id, '') <> ''
      AND visitor_type = 'uid'
      and coalesce(item_id, '') <> ''
      AND substring(visit_time,1,8) >= '${bizdate}'
    ;
    
    
    CREATE TEMPORARY VIEW itm_log_day
    AS
    SELECT
      sum(ext_pv) as pv
      ,count(DISTINCT visitor_id) FILTER (WHERE ext_pv>0) as uv
      ,stat_date
      ,cate_id
      ,item_id
    FROM log_ri_base
    GROUP BY stat_date
      ,cate_id
      ,item_id
    ;
    
    
    CREATE TEMPORARY VIEW itm_log_hh_00
    AS
    SELECT
      sum(ext_pv) as pv
      ,count(DISTINCT visitor_id) FILTER (WHERE ext_pv>0) as uv
      ,stat_date
      ,stat_hour
      ,item_id
      ,cate_id
    FROM log_ri_base
    GROUP BY stat_date
      ,stat_hour
      ,cate_id
      ,item_id
    ;
    
    BEGIN STATEMENT SET;
    
    INSERT INTO vvp_ec_ads.ads_ec_log
    SELECT
      a.stat_date
      ,cast(a.item_id as varchar) as item_id
      ,a.cate_id
      ,b.cate_name
      ,cast(b.industry_id as varchar) as industry_id
      ,cast(b.xcat1_id as varchar) as xcat1_id
      ,cast(b.xcat2_id as varchar) as xcat2_id
      ,cast(b.xcat3_id as varchar) as xcat3_id
      ,cast(b.cate_level1_id as varchar) as cate_level1_id
      ,cast(b.cate_level2_id as varchar) as cate_level2_id
      ,cast(b.is_sw as varchar) as is_sw
      ,a.pv as mbr_ipv_1d
      ,a.uv as mbr_ipv_uv_1d
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as log_gmt_modify
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as gmt_modify
    FROM itm_log_day a
    JOIN ec.dim_tm_cate_360_2pt_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
    ON vvp_dt_rtcdm.DateAddOrSub(a.stat_date, -2) = b.stat_date 
        AND a.cate_id = b.cate_id
    ;
    
    --寫入
    INSERT INTO vvp_ec_ads.ads_ec_log_hh
    
    SELECT
      a.stat_date
      ,a.stat_hour
      ,cast(a.item_id as varchar) as item_id
      ,a.cate_id
      ,b.cate_name
      ,cast(b.industry_id as varchar) as industry_id
      ,cast(b.xcat1_id as varchar) as xcat1_id
      ,cast(b.xcat2_id as varchar) as xcat2_id
      ,cast(b.xcat3_id as varchar) as xcat3_id
      ,cast(b.cate_level1_id as varchar) as cate_level1_id
      ,cast(b.cate_level2_id as varchar) as cate_level2_id
      ,cast(b.is_sw as varchar) as is_sw
      ,a.pv as mbr_ipv_1h
      ,a.uv as mbr_ipv_uv_1h
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as log_gmt_modify
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as gmt_modify
    FROM itm_log_hh_00 a
    JOIN ec.dim_tm_cate_360_2pt_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
    ON vvp_ec_ads.DateAddOrSub(a.stat_date, -2) = b.stat_date 
        AND a.cate_id = b.cate_id
    ;
    
    END;
  3. 在節點編輯功能介面,單擊代碼編輯器右側的即時配置,配置Flink資源資訊指令碼參數Flink運行參數相關參數資訊。

    image

    預設Flink SQL Stream即時配置-專家模式代碼

    {
      "nodes": [
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 1,
          "type": "StreamExecTableSourceScan",
          "desc": "Source: vvp_dt_rtcdm_dwd_tb_trd_ord_pay_nrt_ri[71980]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 2,
          "type": "StreamExecCalc",
          "desc": "Calc[71981]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 3,
          "type": "StreamExecLookupJoin",
          "desc": "LookupJoin[71982]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 4,
          "type": "StreamExecCalc",
          "desc": "Calc[71983]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 6,
          "state": [
            {
              "userDefined": false,
              "name": "groupAggregateState",
              "index": 0,
              "ttl": "36 h"
            }
          ],
          "type": "StreamExecGroupAggregate",
          "desc": "GroupAggregate[71985]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 7,
          "type": "StreamExecCalc",
          "desc": "Calc[71986]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 8,
          "type": "StreamExecSink",
          "desc": "ConstraintEnforcer[71987]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "2"
          },
          "id": 10,
          "state": [
            {
              "userDefined": false,
              "name": "sinkMaterializeState",
              "index": 0,
              "ttl": "36 h"
            }
          ],
          "type": "StreamExecSink",
          "desc": "SinkMaterializer[71987]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "2"
          },
          "id": 11,
          "type": "StreamExecSink",
          "desc": "Sink: vvp_dt_ads_tb_dev_ads_tb_idec_seckill_cate_bc_trd_flow_htr_000[71987]"
        }
      ],
      "vertices": {
        "2d95a2974e3b3137fd533ecfd3490bc5": [
          10,
          11
        ],
        "717c7b8afebbfb7137f6f0f99beb2a94": [
          1,
          2,
          3,
          4
        ],
        "44b79c13fdb45883c7f21ee510155f4d": [
          6,
          7,
          8
        ]
      },
      "edges": [
        {
          "mode": "PIPELINED",
          "source": 1,
          "strategy": "FORWARD",
          "target": 2
        },
        {
          "mode": "PIPELINED",
          "source": 2,
          "strategy": "FORWARD",
          "target": 3
        },
        {
          "mode": "PIPELINED",
          "source": 3,
          "strategy": "FORWARD",
          "target": 4
        },
        {
          "mode": "PIPELINED",
          "source": 4,
          "strategy": "HASH",
          "target": 6
        },
        {
          "mode": "PIPELINED",
          "source": 6,
          "strategy": "FORWARD",
          "target": 7
        },
        {
          "mode": "PIPELINED",
          "source": 7,
          "strategy": "FORWARD",
          "target": 8
        },
        {
          "mode": "PIPELINED",
          "source": 8,
          "strategy": "HASH",
          "target": 10
        },
        {
          "mode": "PIPELINED",
          "source": 10,
          "strategy": "FORWARD",
          "target": 11
        }
      ],
      "ssgProfiles": [
        {
          "managed": {},
          "name": "0",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "992 mb",
          "extended": {}
        },
        {
          "managed": {
            "STATE_BACKEND": "512 mb"
          },
          "name": "1",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "480 mb",
          "extended": {}
        },
        {
          "managed": {
            "STATE_BACKEND": "512 mb"
          },
          "name": "2",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "480 mb",
          "extended": {}
        }
      ]
    

    預設Flink SQL Stream即時配置-Flink運行參數-其他配置

    blob.fetch.backlog: 1000
    taskmanager.debug.memory.log-interval: 5000
  4. 完成即時配置後,單擊代碼編輯器上方的儲存,單擊代碼編輯器上方的發布,頁面右下方會出現發佈動作介面,單擊上線發布內容操作介面中的開始發布生產後,依次進行檢查和確認即可。

    image

步驟四:進入個人開發環境

個人開發環境,支援自訂容器鏡像,支援對接使用者NAS,支援對接Git,支援Python編程與Notebook。

Data Studio頁面,單擊頁面頂部image,在下拉式功能表中選中您需要進入的個人開發環境,等待頁面返回即可。

image

步驟五:Python編程與調試

DataWorks深度整合DSW,在進入個人開發環境後,Data Studio支援Python語言的編寫、調試、運行及調度。

重要

該步驟需要先完成步驟四:進入個人開發環境後方可開始。

  1. Data Studio頁面,且已進入個人開發環境,單擊workspace目錄,單擊個人目錄右側的image,在左側列表上會新增一個未命名的檔案,輸入預設檔案名稱,敲擊斷行符號鍵,等待檔案產生即可。

    預設檔案名稱ec_item_rec.py

    image

  2. 在Python檔案編輯頁面的代碼編輯器中,先輸入預設的Python代碼,再單擊代碼編輯器上方的運行Python檔案,在頁面下方的終端中查詢運行結果。

    image

    image

    預設Python代碼

    import pandas as pd
    from surprise import Dataset, Reader, SVD
    from surprise.model_selection import train_test_split
    from surprise import accuracy
    
    # 建立樣本資料
    data_dict = {
        'user_id': [1, 1, 1, 2, 2, 2, 3, 3, 4],
        'item_id': [101, 102, 103, 101, 104, 105, 102, 105, 101],
        'rating': [5, 3, 4, 2, 4, 5, 4, 5, 3]
    }
    
    # 將資料轉換為DataFrame
    df = pd.DataFrame(data_dict)
    
    # 使用Surprise庫準備資料集
    reader = Reader(rating_scale=(1, 5))
    data = Dataset.load_from_df(df[['user_id', 'item_id', 'rating']], reader)
    
    # 分割資料集為訓練集和測試集
    trainset, testset = train_test_split(data, test_size=0.2)
    
    # 使用SVD演算法進行推薦
    model = SVD()
    model.fit(trainset)
    
    # 進行預測
    predictions = model.test(testset)
    
    # 計算RMSE
    rmse = accuracy.rmse(predictions)
    print(f'RMSE: {rmse:.2f}')
    
    # 擷取某個使用者的推薦商品
    def get_recommendations(user_id, model, all_items, n=3):
        item_ids = all_items['item_id'].unique()
        user_item_col = all_items[(all_items['user_id'] == user_id)]['item_id']
        unseen_items = [item for item in item_ids if item not in user_item_col.values]
    
        # 預測未見商品的評分
        predictions = []
        for item in unseen_items:
            pred = model.predict(user_id, item)
            predictions.append((item, pred.est))
    
        # 按照預測評分進行排序
        predictions.sort(key=lambda x: x[1], reverse=True)
        return predictions[:n]
    
    # 擷取商品推薦
    all_items = df
    user_id = 1  # 需要擷取推薦的使用者ID
    recommendations = get_recommendations(user_id, model, all_items)
    
    print(f'推薦給使用者 {user_id} 的商品:')
    for item_id, score in recommendations:
        print(f'商品ID: {item_id}, 預測評分: {score:.2f}')

    Python環境安裝

    pip install pandas scikit-surprise
  3. 單擊Python檔案編輯頁面的代碼編輯器上方的調試Python檔案,在代碼編輯器中程式碼號左側可以單擊產生斷點,代碼編輯器左側面板上方單擊image進行代碼調試。

    image

步驟六:Notebook資料探索

在進行Notebook資料探索時,Notebook資料探索相關操作是基於個人開發環境的,您需要先完成前面的步驟四:進入個人開發環境後方可開始。

建立Notebook

  1. 進入Data Studio > 資料開發

  2. 個人目錄中,按右鍵目標檔案夾,選擇建立Notebook

  3. 輸入Notebook名稱,單擊斷行符號鍵 頁面空白位置,使Notebook名稱生效。

  4. 在個人目錄中單擊Notebook名稱,即可開啟並進入Notebook編輯頁面。

Notebook使用

說明

如下內容為獨立操作步驟,不分先後順序,可以按需體驗。

  • Notebook多引擎開發

    EMR Spark SQL

    1. 在DataWorks Notebook中單擊image按鈕,建立SQL Cell

    2. 在SQL Cell中,輸入以下語句,完成dim_ec_mbr_user_info 表的查詢。

      dim_ec_mbr_user_info

      -- 說明:基於「會員資訊源表」和「地區源表」,查詢某電商平台的會員基礎資訊。
      USE openlake_win.default;
      SELECT  user.user_id AS user_id
              ,user.nick AS nick
              ,user.gmt_create AS gmt_modified
              ,user.gmt_modified AS gmt_create
              ,user.reg_fullname AS reg_fullname
              ,user.reg_mobile_phone AS reg_mobile_phone
              ,user.reg_email AS reg_email
              ,user.reg_gender AS reg_gender
              ,user.reg_gender_name AS reg_gender_name
              ,user.reg_birthdate AS reg_birthdate
              ,user.reg_address AS reg_address
              ,user.reg_nation_id AS reg_nation_id
              ,user.reg_nation_name AS reg_nation_name
              ,user.reg_prov_id AS reg_prov_id
              ,area_prov.name AS reg_prov_name
              ,user.reg_city_id AS reg_city_id
              ,area_city.name AS reg_city_name
              ,user.user_regip AS user_regip
              ,user.id_card_type AS id_card_type
              ,user.id_card_type_name AS id_card_type_name
              ,user.id_card_number AS id_card_number
              ,null as id_gender
              ,null as id_bday
              ,(2024 - CAST(SUBSTR(user.id_card_number,7,4) AS INT)) AS id_age
              ,user.user_regdate AS user_regdate
              ,user.user_active_type AS user_active_type
              ,user.user_active_name AS user_active_name
              ,user.user_active_time AS user_active_time
              ,user.vip_level AS vip_level
              ,user.vip_level_name AS vip_level_name
              ,user.is_delete AS is_delete
      FROM    (
                  SELECT  id    -- 主鍵
                          ,gmt_create    -- 建立時間
                          ,gmt_modified    -- 修改時間
                          ,user_id    -- 會員數字ID
                          ,nick    -- 會員NICK。會員暱稱
                          ,reg_fullname    -- 個人認證表示真實姓名,企業認證表示企業名稱
                          ,reg_mobile_phone    -- 註冊時綁定手機號碼
                          ,reg_email    -- 註冊填寫EMAIL(使用者可以修改)
                          ,reg_gender    -- 註冊填寫性別(F女,M男,不是這兩個就是未知的,說明性別保密)
                          ,CASE    WHEN reg_gender='F' THEN '女'
                                   WHEN reg_gender='M' THEN '男' 
                                   ELSE '未知' 
                           END AS reg_gender_name    -- 註冊填寫性別(F女,M男,不是這兩個就是未知的,說明性別保密)
                          ,reg_birthdate    -- 註冊填寫生日(使用者可以修改)
                          ,reg_address    -- 註冊填寫地址(使用者可以修改)
                          ,reg_nation_id    -- 註冊填寫國家ID(暫時為空白)
                          ,CASE    WHEN reg_nation_id='cn' THEN '中國' 
                                   ELSE '海外' 
                           END AS reg_nation_name
                          ,reg_prov_id    -- 註冊填寫省ID
                          ,reg_city_id    -- 註冊填寫城市ID
                          ,user_regip    -- 註冊IP
                          ,id_card_type    -- 會員認證證件類型 0:未知 1:身份證 2:企業營業執照號
                          ,CASE    WHEN id_card_type=0 THEN '未知'
                                   WHEN id_card_type=1 THEN '身份證'
                                   WHEN id_card_type=2 THEN '企業營業執照號' 
                                   ELSE '異常' 
                           END AS id_card_type_name
                          ,id_card_number    -- 個人認證表示社會安全號碼,企業認證表示企業的營業執照號,沒有認證不保證準確性
                          ,user_regdate    -- 註冊時間
                          ,user_active_type    -- 使用者啟用方式
                          ,CASE    WHEN user_active_type='email' THEN '郵箱'
                                   WHEN user_active_type='mobile_phone' THEN '手機' 
                                   ELSE '異常' 
                           END AS user_active_name    -- 使用者啟用方式
                          ,user_active_time    -- 啟用時間
                          ,cast(vip_level AS BIGINT) AS vip_level    -- VIP等級
                          ,CASE    WHEN vip_level>0 AND vip_level<=3 THEN '初級'
                                   WHEN vip_level>3 AND vip_level<=6 THEN '中級'
                                   WHEN vip_level>6 AND vip_level<=10 THEN '進階' 
                                   WHEN vip_level>10  THEN '特級' 
                           ELSE '異常'
                           END  AS vip_level_name
                          ,is_delete    -- 是否刪除
                  FROM    ods_mbr_user_info
              ) AS USER
      LEFT JOIN (
                    SELECT  id,pid,name,shortname,longitude,latitude,level,sort
                    FROM    ods_t_area
                ) AS area_prov
      ON      user.reg_prov_id = area_prov.id 
      LEFT JOIN    (
                  SELECT  id,pid,name,shortname,longitude,latitude,level,sort
                  FROM    ods_t_area
              ) AS area_city
      ON      user.reg_city_id = area_city.id
      ;
    3. 在SQL Cell右下角,選擇SQL Cell類型為EMR Spark SQL,選擇計算資源為openlake_serverless_spark

      image

    4. 單擊運行按鈕,等待運行完成,查看資料結果。

    StarRocks SQL

    1. 在DataWorks Notebook中單擊image按鈕,建立SQL Cell,如下圖:

    2. 在SQL Cell中,輸入以下語句,完成dws_ec_trd_cate_commodity_gmv_kpi_fy 表的查詢。

      dws_ec_trd_cate_commodity_gmv_kpi_fy

      -- 說明:基於「交易下單事實明細表」和「商品基礎資訊維度資料表」查詢"財年_訂單支付成功金額、財年_成交金額完成度"等資料指標
      USE `openlake_win`.`default`;
      select   t1.cate_id, t1.cate_name, t1.commodity_id, t1.commodity_name, round(10*sum(t1.total_fee),4) as pay_ord_amt_fy, round((10*sum(t1.total_fee)/30000000),4) as kpi_gmv_rate_fy
      from    (
                  select  DATE_FORMAT(a.gmt_create,'yyyymmdd') as stat_date
                          ,a.sub_order_id, a.buyer_id, a.item_id, a.biz_type, a.pay_status, a.total_fee/100 as total_fee, b.cate_id, b.cate_name, b.commodity_id, b.commodity_name 
                  from    `openlake_win`.`default`.dwd_ec_trd_create_ord_di a
                  left outer join (
                                      select  distinct item_id, cate_id, cate_name, commodity_id, commodity_name, shop_id, shop_nick
                                      from    `openlake_win`.`default`.dim_ec_itm_item_info
                                  ) b
                  on      a.item_id = b.item_id
                  and     a.shop_id = b.shop_id
              ) t1
      where   t1.pay_status in ('2')
      and     t1.biz_type in ('2','3','4')
      group by   t1.cate_id, t1.cate_name, t1.commodity_id, t1.commodity_name
      ;
    3. 在SQL Cell右下角,選擇SQL Cell類型為StarRocks SQL,選擇計算資源為openlake_starrocks

      image

    4. 單擊運行按鈕,等待運行完成,查看資料結果。

    Hologres SQL

    1. 在DataWorks Notebook中單擊image按鈕,建立SQL Cell

    2. 在SQL Cell中,輸入以下語句,完成dws_ec_mbr_cnt_std 表的查詢。

      dws_ec_mbr_cnt_std

      -- 說明:「會員基礎資訊維度資料表」的資料進行計算轉換得到“存量會員數”等資料指標,獲得歷史截至當日_存量會員數_cube統計情況
      SELECT    IF(grouping(reg_prov_id) = 0, reg_prov_id, '-9999') as reg_prov_id
              , IF(grouping(reg_prov_name) = 0, reg_prov_name, '全部') as reg_prov_name
              , IF(grouping(reg_gender) = 0, reg_gender, '-9999') as reg_gender
              , IF(grouping(reg_gender_name) = 0, reg_gender_name, '全部') as reg_gender_name
              , IF(grouping(age_tag) = 0, age_tag, '-9999') as age_tag
              , IF(grouping(user_active_type) = 0, user_active_type, '-9999') as user_active_type
              , IF(grouping(user_active_name) = 0, user_active_name, '全部') as user_active_name
              , IF(grouping(vip_level) = 0, vip_level, '-9999') as vip_level
              , IF(grouping(vip_level_name) = 0, vip_level_name, '全部') as vip_level_name 
              , count(distinct user_id) as mbr_cnt
      from (
          select    reg_prov_id
                  , reg_prov_name
                  , reg_gender
                  , reg_gender_name
                  , case when cast(substr(reg_birthdate,1,4) as int)>=2010 and cast(substr(reg_birthdate,1,4) as int)<2020 then '10後' 
                          when cast(substr(reg_birthdate,1,4) as int)>=2000 and cast(substr(reg_birthdate,1,4) as int)<2010 then '00後' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1990 and cast(substr(reg_birthdate,1,4) as int)<2000 then '90後' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1980 and cast(substr(reg_birthdate,1,4) as int)<1990 then '80後' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1970 and cast(substr(reg_birthdate,1,4) as int)<1980 then '70後' 
                          else '其他' 
                    end as age_tag
                  , user_active_type
                  , user_active_name
                  , vip_level
                  , vip_level_name 
                  , user_id
          from    openlake_win.default.dim_ec_mbr_user_info
      ) _main       
      group by 
      grouping sets(
          (reg_prov_id, reg_prov_name)
         ,(reg_gender, reg_gender_name)
         ,(age_tag)
         ,(user_active_type, user_active_name)
         ,(vip_level, vip_level_name)
         ,()
      );
    3. 在SQL Cell右下角,選擇SQL Cell類型為Hologres SQL,選擇計算資源為openlake_hologres

      image

    4. 單擊運行按鈕,等待運行完成,查看資料結果。

    MaxCompute SQL

    1. 在DataWorks Notebook中單擊image按鈕,建立SQL Cell

    2. 在SQL Cell中,輸入以下語句,完成dws_ec_mbr_cnt_std 表的查詢。

      dws_ec_mbr_cnt_std

      -- 說明:查詢輕度匯總層「歷史截至當日_存量會員數_cube統計表」
      set odps.task.major.version=flighting;
      set odps.namespace.schema=true;
      set odps.sql.allow.namespace.schema=true;
      set odps.service.mode=only;
      set odps.sql.unstructured.data.split.size=1;
      
      SELECT * 
      FROM openlake_win.default.dws_ec_mbr_cnt_std 
      LIMIT 200;
    3. 在SQL Cell右下角,選擇SQL Cell類型為MaxCompute SQL,選擇計算資源為openlake_maxcompute

      image

    4. 單擊運行按鈕,等待運行完成,查看資料結果。

  • Notebook互動式資料
    1. 在DataWorks Notebook中單擊image按鈕,建立Python Cell

    2. 在Python Cell右上方,單擊image按鈕,呼出DataWorks Copilot智能編程助手。

    3. 在DataWorks Copilot輸入框中,輸入以下需求,用於產生一個查詢會員年齡的ipywidgets互動組件。

      說明

      需求描述:使用Python,產生一個會員年齡的滑動條組件,取值範圍從1到100,預設值為20,即時監測組件取值的變化,並將值儲存到全域變數query_age中。

    4. 查看DataWorks Copilot產生的Python代碼,單擊接受按鈕。

      image

    5. 單擊Python Cell的運行按鈕,等待運行完成,查看互動組件的產生(運行Copilot產生的程式碼,或預設代碼);同時,能夠在互動組件中滑動選擇目標年齡。

      產生ipywidgets互動組件範例程式碼

      import ipywidgets as widgets
      
      # 建立滑動條組件
      slider = widgets.IntSlider(
          min = 1,
          max = 100,
          value = 20,
          description = '年齡:',
      )
      
      # 定義全域變數query_age
      query_age = None
      
      
      # 定義函數來處理滑動條值的變化
      def on_slider_change(change):
          global query_age
          query_age = change.new
      
      # 將函數綁定到滑動條的值變化事件上
      slider.observe(on_slider_change,names='value')
      
      # 顯示滑動條
      display(slider)

      image

    6. 在DataWorks Notebook中單擊image按鈕,建立SQL Cell

    7. 在SQL Cell中,輸入以下查詢語句,包含Python中定義的會員年齡變數${query_age}

      SELECT * FROM openlake_win.default.dim_ec_mbr_user_info
      WHERE CAST(id_age AS INT) >= ${query_age};
    8. 在SQL Cell右下角,選擇SQL Cell類型為Hologres SQL,選擇計算資源為openlake_hologres

      image

    9. 單擊運行按鈕,等待運行完成,查看資料結果。

    10. 在運行結果中,單擊image按鈕,產生可視化圖表。

  • Notebook模型開發與訓練
    1. 在DataWorks Notebook中單擊image按鈕,建立SQL Cell

    2. 在SQL Cell中,輸入以下語句,完成ods_trade_order表的查詢。

      SELECT * FROM openlake_win.default.ods_trade_order;
    3. 將SQL查詢結果寫入DataFrame變數中,單擊df位置,自訂DataFrame變數名稱(例如:df_ml)。

      image

    4. 單擊SQL Cell的運行按鈕,等待運行完成,查看資料結果。

    5. 在DataWorks Notebook中單擊image按鈕,建立Python Cell

    6. 在Python Cell中,輸入以下語句,使用Pandas完成資料清洗和處理,並存入DataFrame的新變數df_ml_clean中。

      import pandas as pd
      
      def clean_data(df_ml):
          # 產生新的一列:預估訂單總額 = 商品單價 * 商品數量
          df_ml['predict_total_fee'] = df_ml['item_price'].astype(float).values * df_ml['buy_amount'].astype(float).values
          # 將列 'total_fee' 重新命名為 'actual_total_fee'
          df_ml = df_ml.rename(columns={'total_fee': 'actual_total_fee'})
          return df_ml
      
      df_ml_clean = clean_data(df_ml.copy())
      df_ml_clean.head()
    7. 單擊Python Cell的運行按鈕,等待運行完成,查看資料清理結果。

    8. 在DataWorks Notebook中單擊image按鈕,再次建立Python Cell

    9. 在Python Cell中,輸入以下語句,構建一個線性迴歸的機器學習模型,並進行訓練和測試。

      import pandas as pd  
      from sklearn.model_selection import train_test_split  
      from sklearn.linear_model import LinearRegression  
      from sklearn.metrics import mean_squared_error  
        
      # 擷取商品價格及總費用
      X = df_ml_clean[['predict_total_fee']].values  
      y = df_ml_clean['actual_total_fee'].astype(float).values  
      
      # 準備資料  
      X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)  
      
      # 建立並訓練模型  
      model = LinearRegression()  
      model.fit(X_train, y_train)  
        
      # 預測和評估  
      y_pred = model.predict(X_test)  
      for index, (x_t, y_pre, y_t) in enumerate(zip(X_test, y_pred, y_test)):
          print("[{:>2}] input: {:<10} prediction:{:<10} gt: {:<10}".format(str(index+1), f"{x_t[0]:.3f}", f"{y_pre:.3f}", f"{y_t:.3f}"))
      
      # 計算均方誤差MSE
      mse = mean_squared_error(y_test, y_pred)  
      print("均方誤差(MSE):", mse)
    10. 單擊運行按鈕,等待運行完成,查看模型訓練的測試結果。