全部產品
Search
文件中心

OpenSearch:行為資料推送Demo

更新時間:Jul 13, 2024

配置環境變數

配置環境變數ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

重要
  • 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運,具體操作,請參見建立RAM使用者

  • 建立AccessKey ID和AccessKey Secret,請參考建立AccessKey

  • 如果您使用的是RAM使用者的AccessKey,請確保主帳號已授權AliyunServiceRoleForOpenSearch服務關聯角色,請參考OpenSearch-行業演算法版服務關聯角色,相關文檔參考訪問鑒權規則

  • 請不要將AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。

  • LinuxmacOS系統配置方法:

    執行以下命令,其中, <access_key_id>需替換為您RAM使用者的AccessKey ID,<access_key_secret>替換為您RAM使用者的AccessKey Secret。

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id> 
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
  • Windows系統配置方法

    1. 建立環境變數檔案,添加環境變數ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,並寫入已準備好的AccessKey ID和AccessKey Secret。

    2. 重啟Windows系統生效。

引入依賴

pip install alibabacloud_tea_util 
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials

行為資料推送樣本

# -*- coding: utf-8 -*-

import time, os
from typing import Dict, Any

from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models

from BaseRequest import Config, Client


class opensearch:
    def __init__(self, config: Config):
        self.Clients = Client(config=config)
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=10000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
            max_attempts=3
        )
        self.header = {}

    def behaviorBulk(self, app_name: str, collections_name: str, doc_content: list) -> Dict[str, Any]:
        try:
            response = self.Clients._request(method="POST",
                                             pathname=f'/v3/openapi/app-groups/{app_name}/data-collections/{collections_name}/data-collection-type/BEHAVIOR/actions/bulk',query={},headers = self.header,
                                             body=doc_content, runtime=self.runtime)
            return response
        except Exception as e:
            print(e)

if __name__ == "__main__":

    # 配置統一的請求入口和
    endpoint = "<endpoint>"

    # 支援 protocol 配置 HTTPS/HTTP
    endpoint_protocol = "HTTP"

    # 使用者識別資訊
    # 從環境變數讀取配置的AccessKey ID和AccessKey Secret,
    # 運行程式碼範例前必須先配置環境變數,參考文檔上面“配置環境變數”步驟
    access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    # 支援 type 配置 sts/access_key 鑒權. 其中 type 預設為 access_key 鑒權. 使用 sts 可配置 RAM-STS 鑒權.
    # 備選參數為:  sts 或者 access_key
    auth_type = "sts"

    # 如果使用 RAM-STS 鑒權, 請配置 security_token, 可使用 阿里雲 AssumeRole 擷取 相關 STS 鑒權結構.
    security_token = "<security_token>"

 # 配置請求使用的通用資訊.
    Configs = Config(endpoint=endpoint, access_key_id=access_key_id, access_key_secret=access_key_secret,
                     security_token=security_token, type=auth_type, protocol=endpoint_protocol)

    # 建立 opensearch 執行個體
    ops = opensearch(Configs)
    app_name = "app_name"
 
  # ---------------  行為日誌 ---------------

    # item_id  資訊 為 搜尋結果返回的主鍵資訊. 特定 為主鍵 id.
    item_id = "358713"

    # ops_request_misc 為搜尋請求返回的 ops_request_misc 資訊.
    ops_request_misc = "%7B%22request%5Fid%22%3A%22161777635816780357273903%22%2C%22scm%22%3A%2220140713.130149759..%22%7D"

    # bhv_type 為 行為事件的特徵資訊, 備選資訊如下為類別:
    #       expose(曝光)
    #       cart(加購物車)
    #       collect(收藏)
    #       like(點贊)
    #       comment(評論)
    #       buy(購買)
    #       click(點擊/查看)
    bhv_type = "click"

    # request_id 為搜尋請求返回的 request_id 資訊.
    request_id = "161777635816780357273903"

    # 該資料到達服務端的時間,格式:時間戳記,單位:秒
    reach_time = "1709708439"

    # 用於唯一標識終端應用上的使用者的ID。
    # *一般為登入使用者ID。
    # *對於PC端,如果是未登入使用者,也可以設定為cookieid
    user_id = "a7a0d37c824b659f36a5b9e3b819fcdd"
    behavior_fields1 = behavior_fields2 = {
        "item_id": item_id,
        "sdk_type": "opensearch_sdk",
        "sdk_version": "<sdk_version>",  # 當前使用的 opensearch sdk 的版本號碼.(當前pythonsdk版本號碼:3.2.0)
        "trace_id": "ALIBABA",  # 用於區分調用了哪個服務商的服務
        "trace_info": ops_request_misc,
        "bhv_type": bhv_type,
        "item_type": "item",
        "rn": request_id,
        "biz_id": "<biz_id>",  # 手機或終端應用用於區分業務的一個數值ID,該欄位可用來和OpenSearch上的應用或和AIRec上的執行個體做關聯
        "reach_time": reach_time,
        "user_id": user_id,
    }

    behavior_documents = [{"cmd": "add", "fields": behavior_fields1}, {"cmd": "add", "fields": behavior_fields2}]
    res6 = ops.behaviorBulk(app_name=app_name, collections_name=app_name, doc_content=behavior_documents)
    print(res6)
說明

相關參考:推送資料擷取