全部產品
Search
文件中心

Platform For AI:使用TensorFlow實現分布式DeepFM演算法

更新時間:Jul 13, 2024

本文為您介紹如何使用TensorFlow實現分布式DeepFM演算法。

警告

公用雲GPU伺服器即將過保下線,您可以繼續提交CPU版本的TensorFlow任務。如需使用GPU進行模型訓練,請前往DLC提交任務,具體操作請參見建立訓練任務

前提條件

背景資訊

DeepFM演算法對應Wide&Deep部分,且將LR替換為FM,從而避免人工特徵工程。

訓練資料來源為pai_online_project.dwd_avazu_ctr_deepmodel_train,測試資料來源為pai_online_project.dwd_avazu_ctr_deepmodel_test,都是公開資料來源,您可以直接使用。

操作步驟

  1. 下載模型檔案

  2. 修改模型配置代碼。

    1. 修改特徵參數,每個特徵需要配置embedding_dimhash_bucketsdefault_value

      self.fields_config_dict['hour'] = {'field_name': 'field1', 'embedding_dim': self.embedding_dim, 'hash_bucket': 50, 'default_value': '0'}
      self.fields_config_dict['c1'] = {'field_name': 'field2', 'embedding_dim': self.embedding_dim, 'hash_bucket': 10, 'default_value': '0'}

      DeepFM模型中,需要將所有特徵的embedding_dim配置為相同值,而Wide&Deep模型無此限制。對於user_iditemid,建議將hash_buckets配置為較大值,而其他取值較少的特徵建議將hash_buckets配置為較小值。

    2. 配置模型,推薦使用DeepFM(deepfm)和Wide&Deepwdl)。

      tf.app.flags.DEFINE_string("model", 'deepfm', "model {'wdl', 'deepfm'}")
    3. 配置分布式參數。

      tf.app.flags.DEFINE_string("job_name", "", "job name")
      tf.app.flags.DEFINE_integer("task_index", None, "Worker or server index")
      tf.app.flags.DEFINE_string("ps_hosts", "", "ps hosts")
      tf.app.flags.DEFINE_string("worker_hosts", "", "worker hosts")

      提交訓練任務時,只需要配置cluster(詳情請參見下述提交訓練任務步驟),系統自動產生分布式參數。

    4. 配置輸入資料。

        def _parse_batch_for_tabledataset(self, *args):
          label = tf.reshape(args[0], [-1])
          fields = [tf.reshape(v, [-1]) for v in args[1:]]
          return dict(zip(self.feas_name, fields)), label
      
        def train_input_fn_from_odps(self, data_path, epoch=10, batch_size=1024, slice_id=0, slice_count=1):
          with tf.device('/cpu:0'):
            dataset = tf.data.TableRecordDataset([data_path], record_defaults=self.record_defaults,
                               slice_count=slice_count, slice_id=slice_id)
            dataset = dataset.batch(batch_size).repeat(epoch)
            dataset = dataset.map(self._parse_batch_for_tabledataset, num_parallel_calls=8).prefetch(100)
            return dataset
      
        def val_input_fn_from_odps(self, data_path, epoch=1, batch_size=1024, slice_id=0, slice_count=1):
          with tf.device('/cpu:0'):
            dataset = tf.data.TableRecordDataset([data_path], record_defaults=self.record_defaults,
                               slice_count=slice_count, slice_id=slice_id)
            dataset = dataset.batch(batch_size).repeat(epoch)
            dataset = dataset.map(self._parse_batch_for_tabledataset, num_parallel_calls=8).prefetch(100)
            return dataset

      如果需要進行特徵變換,建議在模型外通過MaxCompute實現,從而節約訓練開銷。如果在模型中進行特徵變換,建議在_parse_batch_for_tabledataset函數中實現。

  3. 上傳已修改的模型檔案至OSS。

  4. 可選:提交訓練任務。

    說明

    如果沒有訓練完成的模型檔案,則必須執行該步驟。反之,可以跳過該步驟,直接提交離線推理任務。

    根據配置的模型類型,選擇提交命令:

    • DeepFM

      pai -name tensorflow1120_ext
      -project algo_public
      -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
      -Darn=''
      -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
      -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
      -DuserDefinedParameters="--task_type='train' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
      -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';

      需要根據實際情況修改project_namebucket_name、帳號arn(請參見角色ARN)及地區。

    • Wide&Deep

      pai -name tensorflow1120_ext
      -project algo_public
      -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
      -Darn=''
      -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
      -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
      -DuserDefinedParameters="--task_type='train' --model='wdl' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
      -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';

      需要根據實際情況修改project_namebucket_name、帳號arn(請參見角色ARN)及地區。

    因為該版本為PS分布式,所以需要配置cluster。樣本中的cluster參數表示申請兩個PS節點和8個Worker節點。同時,每個PS節點擁有12個CPU和10 GB記憶體,每個Worker節點擁有12個CPU及30 GB記憶體。各參數的具體介紹請參見PAI-TF任務參數介紹

  5. 提交離線推理任務。

    pai -name tensorflow1120_ext
    -project algo_public
    -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
    -Darn=''
    -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
    -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
    -DuserDefinedParameters="--task_type='predict' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
    -Dcluster='{\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}'
    -Doutputs='odps://project_name/tables/output_table_name';

    需要根據實際情況修改project_namebucket_name、帳號arn(請參見角色ARN)及地區。

    離線推理需要一張已建立的outputs表,每次執行推理任務的結果會覆蓋該表。建立表的樣本命令如下。

    drop table project_name.output_table_name;
    create table project_name.output_table_name
    (
       probabilities STRING
       ,logits STRING
    )STORED AS ALIORC;
  6. 匯出訓練完成的模型檔案。

    pai -name tensorflow1120_ext
    -project algo_public
    -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
    -Darn=''
    -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
    -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
    -DuserDefinedParameters="--task_type='savemodel' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
    -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';

    需要根據實際情況修改project_namebucket_name、帳號arn(請參見角色ARN)及地區。系統實際使用單Worker執行匯出模型任務。