配置环境变量
配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。
重要
阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维,具体操作,请参见创建RAM用户。
创建AccessKey ID和AccessKey Secret,请参考创建AccessKey。
如果您使用的是RAM用户的AccessKey,请确保主账号已授权AliyunServiceRoleForOpenSearch服务关联角色,请参考OpenSearch-行业算法版服务关联角色,相关文档参考访问鉴权规则。
请不要将AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
Linux和macOS系统配置方法:
执行以下命令,其中,
<access_key_id>
需替换为您RAM用户的AccessKey ID,<access_key_secret>
替换为您RAM用户的AccessKey Secret。export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id> export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
Windows系统配置方法
新建环境变量文件,添加环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并写入已准备好的AccessKey ID和AccessKey Secret。
重启Windows系统生效。
引入依赖
pip install alibabacloud_tea_util
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials
行为数据推送示例
# -*- coding: utf-8 -*-
import time, os
from typing import Dict, Any
from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models
from BaseRequest import Config, Client
class opensearch:
def __init__(self, config: Config):
self.Clients = Client(config=config)
self.runtime = util_models.RuntimeOptions(
connect_timeout=10000,
read_timeout=10000,
autoretry=False,
ignore_ssl=False,
max_idle_conns=50,
max_attempts=3
)
self.header = {}
def behaviorBulk(self, app_name: str, collections_name: str, doc_content: list) -> Dict[str, Any]:
try:
response = self.Clients._request(method="POST",
pathname=f'/v3/openapi/app-groups/{app_name}/data-collections/{collections_name}/data-collection-type/BEHAVIOR/actions/bulk',query={},headers = self.header,
body=doc_content, runtime=self.runtime)
return response
except Exception as e:
print(e)
if __name__ == "__main__":
# 配置统一的请求入口和
endpoint = "<endpoint>"
# 支持 protocol 配置 HTTPS/HTTP
endpoint_protocol = "HTTP"
# 用户识别信息
# 从环境变量读取配置的AccessKey ID和AccessKey Secret,
# 运行代码示例前必须先配置环境变量,参考文档上面“配置环境变量”步骤
access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
# 支持 type 配置 sts/access_key 鉴权. 其中 type 默认为 access_key 鉴权. 使用 sts 可配置 RAM-STS 鉴权.
# 备选参数为: sts 或者 access_key
auth_type = "sts"
# 如果使用 RAM-STS 鉴权, 请配置 security_token, 可使用 阿里云 AssumeRole 获取 相关 STS 鉴权结构.
security_token = "<security_token>"
# 配置请求使用的通用信息.
Configs = Config(endpoint=endpoint, access_key_id=access_key_id, access_key_secret=access_key_secret,
security_token=security_token, type=auth_type, protocol=endpoint_protocol)
# 创建 opensearch 实例
ops = opensearch(Configs)
app_name = "app_name"
# --------------- 行为日志 ---------------
# item_id 信息 为 搜索结果返回的主键信息. 特定 为主键 id.
item_id = "358713"
# ops_request_misc 为搜索请求返回的 ops_request_misc 信息.
ops_request_misc = "%7B%22request%5Fid%22%3A%22161777635816780357273903%22%2C%22scm%22%3A%2220140713.130149759..%22%7D"
# bhv_type 为 行为事件的特征信息, 备选信息如下为类别:
# expose(曝光)
# cart(加购物车)
# collect(收藏)
# like(点赞)
# comment(评论)
# buy(购买)
# click(点击/查看)
bhv_type = "click"
# request_id 为搜索请求返回的 request_id 信息.
request_id = "161777635816780357273903"
# 该数据到达服务端的时间,格式:时间戳,单位:秒
reach_time = "1709708439"
# 用于唯一标识终端应用上的用户的ID。
# *一般为登录用户ID。
# *对于PC端,如果是未登录用户,也可以设置为cookieid
user_id = "a7a0d37c824b659f36a5b9e3b819fcdd"
behavior_fields1 = behavior_fields2 = {
"item_id": item_id,
"sdk_type": "opensearch_sdk",
"sdk_version": "<sdk_version>", # 当前使用的 opensearch sdk 的版本号.(当前pythonsdk版本号:3.2.0)
"trace_id": "ALIBABA", # 用于区分调用了哪个服务商的服务
"trace_info": ops_request_misc,
"bhv_type": bhv_type,
"item_type": "item",
"rn": request_id,
"biz_id": "<biz_id>", # 手机或终端应用用于区分业务的一个数值ID,该字段可用来和OpenSearch上的应用或和AIRec上的实例做关联
"reach_time": reach_time,
"user_id": user_id,
}
behavior_documents = [{"cmd": "add", "fields": behavior_fields1}, {"cmd": "add", "fields": behavior_fields2}]
res6 = ops.behaviorBulk(app_name=app_name, collections_name=app_name, doc_content=behavior_documents)
print(res6)
说明
相关参考:推送数据采集