本文為您介紹如何使用TensorFlow實現分布式DeepFM演算法。
公用雲GPU伺服器即將過保下線,您可以繼續提交CPU版本的TensorFlow任務。如需使用GPU進行模型訓練,請前往DLC提交任務,具體操作請參見建立訓練任務。
前提條件
開通OSS,並建立Bucket,詳情請參見開通OSS服務和控制台建立儲存空間。
重要建立Bucket時,不要開通版本控制,否則同名檔案無法覆蓋。
完成OSS訪問授權,詳情請參見雲產品依賴與授權:Designer。
背景資訊
DeepFM演算法對應Wide&Deep部分,且將LR替換為FM,從而避免人工特徵工程。
訓練資料來源為pai_online_project.dwd_avazu_ctr_deepmodel_train,測試資料來源為pai_online_project.dwd_avazu_ctr_deepmodel_test,都是公開資料來源,您可以直接使用。
操作步驟
下載模型檔案。
修改模型配置代碼。
修改特徵參數,每個特徵需要配置embedding_dim、hash_buckets及default_value。
self.fields_config_dict['hour'] = {'field_name': 'field1', 'embedding_dim': self.embedding_dim, 'hash_bucket': 50, 'default_value': '0'} self.fields_config_dict['c1'] = {'field_name': 'field2', 'embedding_dim': self.embedding_dim, 'hash_bucket': 10, 'default_value': '0'}
DeepFM模型中,需要將所有特徵的embedding_dim配置為相同值,而Wide&Deep模型無此限制。對於user_id和itemid,建議將hash_buckets配置為較大值,而其他取值較少的特徵建議將hash_buckets配置為較小值。
配置模型,推薦使用DeepFM(deepfm)和Wide&Deep(wdl)。
tf.app.flags.DEFINE_string("model", 'deepfm', "model {'wdl', 'deepfm'}")
配置分布式參數。
tf.app.flags.DEFINE_string("job_name", "", "job name") tf.app.flags.DEFINE_integer("task_index", None, "Worker or server index") tf.app.flags.DEFINE_string("ps_hosts", "", "ps hosts") tf.app.flags.DEFINE_string("worker_hosts", "", "worker hosts")
提交訓練任務時,只需要配置cluster(詳情請參見下述提交訓練任務步驟),系統自動產生分布式參數。
配置輸入資料。
def _parse_batch_for_tabledataset(self, *args): label = tf.reshape(args[0], [-1]) fields = [tf.reshape(v, [-1]) for v in args[1:]] return dict(zip(self.feas_name, fields)), label def train_input_fn_from_odps(self, data_path, epoch=10, batch_size=1024, slice_id=0, slice_count=1): with tf.device('/cpu:0'): dataset = tf.data.TableRecordDataset([data_path], record_defaults=self.record_defaults, slice_count=slice_count, slice_id=slice_id) dataset = dataset.batch(batch_size).repeat(epoch) dataset = dataset.map(self._parse_batch_for_tabledataset, num_parallel_calls=8).prefetch(100) return dataset def val_input_fn_from_odps(self, data_path, epoch=1, batch_size=1024, slice_id=0, slice_count=1): with tf.device('/cpu:0'): dataset = tf.data.TableRecordDataset([data_path], record_defaults=self.record_defaults, slice_count=slice_count, slice_id=slice_id) dataset = dataset.batch(batch_size).repeat(epoch) dataset = dataset.map(self._parse_batch_for_tabledataset, num_parallel_calls=8).prefetch(100) return dataset
如果需要進行特徵變換,建議在模型外通過MaxCompute實現,從而節約訓練開銷。如果在模型中進行特徵變換,建議在_parse_batch_for_tabledataset函數中實現。
上傳已修改的模型檔案至OSS。
可選:提交訓練任務。
說明如果沒有訓練完成的模型檔案,則必須執行該步驟。反之,可以跳過該步驟,直接提交離線推理任務。
根據配置的模型類型,選擇提交命令:
DeepFM
pai -name tensorflow1120_ext -project algo_public -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/' -Darn='' -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py' -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test' -DuserDefinedParameters="--task_type='train' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'" -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';
需要根據實際情況修改project_name、bucket_name、帳號arn(請參見角色ARN)及地區。
Wide&Deep
pai -name tensorflow1120_ext -project algo_public -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/' -Darn='' -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py' -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test' -DuserDefinedParameters="--task_type='train' --model='wdl' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'" -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';
需要根據實際情況修改project_name、bucket_name、帳號arn(請參見角色ARN)及地區。
因為該版本為PS分布式,所以需要配置cluster。樣本中的cluster參數表示申請兩個PS節點和8個Worker節點。同時,每個PS節點擁有12個CPU和10 GB記憶體,每個Worker節點擁有12個CPU及30 GB記憶體。各參數的具體介紹請參見PAI-TF任務參數介紹。
提交離線推理任務。
pai -name tensorflow1120_ext -project algo_public -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/' -Darn='' -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py' -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test' -DuserDefinedParameters="--task_type='predict' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'" -Dcluster='{\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}' -Doutputs='odps://project_name/tables/output_table_name';
需要根據實際情況修改project_name、bucket_name、帳號arn(請參見角色ARN)及地區。
離線推理需要一張已建立的outputs表,每次執行推理任務的結果會覆蓋該表。建立表的樣本命令如下。
drop table project_name.output_table_name; create table project_name.output_table_name ( probabilities STRING ,logits STRING )STORED AS ALIORC;
匯出訓練完成的模型檔案。
pai -name tensorflow1120_ext -project algo_public -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/' -Darn='' -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py' -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test' -DuserDefinedParameters="--task_type='savemodel' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'" -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';
需要根據實際情況修改project_name、bucket_name、帳號arn(請參見角色ARN)及地區。系統實際使用單Worker執行匯出模型任務。