OpenSearch uses an in-house gateway service for signature verification and relies on the Alibaba Cloud TeaDSL SDK for signature construction. Before you use the OpenSearch SDK for Python, you must install the following dependencies:
pip install alibabacloud_tea_util
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials
Sample request: The sample code is contained in the BaseRequest.py file.
# -*- coding: utf-8 -*-
import time
from typing import Dict, Any
from Tea.core import TeaCore
from Tea.exceptions import TeaException, UnretryableException
from Tea.model import TeaModel
from Tea.request import TeaRequest
from alibabacloud_credentials import models as credential_models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_opensearch_util.opensearch_util import OpensearchUtil
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Config(TeaModel):
    """
    Config

    The environment-related configurations that a ``Client`` is built from.
    Every value is stored exactly as it is passed in; nothing is validated here.

    Attributes:
        endpoint: Host name of the OpenSearch API endpoint (sent as the ``host`` header).
        protocol: Request protocol, for example ``'HTTP'``.
        type: Credential type that is handed to the credentials client.
        security_token: Security token that accompanies temporary credentials.
        access_key_id: AccessKey ID used to sign requests.
        access_key_secret: AccessKey secret used to sign requests.
        user_agent: Extra text for the ``user-agent`` request header.
    """

    def __init__(
        self,
        endpoint: str = None,
        protocol: str = None,
        type: str = None,
        security_token: str = None,
        access_key_id: str = None,
        access_key_secret: str = None,
        user_agent: str = "",
    ):
        # Where and how to connect.
        self.endpoint = endpoint
        self.protocol = protocol

        # Who is calling: credential type plus the key material that goes with it.
        self.type = type
        self.security_token = security_token
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret

        # Client identification.
        self.user_agent = user_agent
class Client:
    """
    OpensearchClient

    The OpenSearch client that is used to specify request parameters and send the request.
    It keeps the endpoint, protocol, user agent and a credentials client taken from a
    ``Config`` and uses them to build, sign and send every request.
    """
    _endpoint: str = None
    _protocol: str = None
    _user_agent: str = None
    _credential: CredentialClient = None

    def __init__(
        self,
        config: Config,
    ):
        """
        Create a client from the given configuration.

        :param config: Config, the endpoint, protocol, user agent and credential settings.
            The object is only read; it is never modified.
        :raises TeaException: if ``config`` is unset.
        """
        if UtilClient.is_unset(config):
            raise TeaException({
                'name': 'ParameterMissing',
                'message': "'config' can not be unset"
            })
        # Default to AccessKey-pair credentials, but keep the default in a local
        # variable so that the caller's Config object is left untouched.
        credential_type = config.type
        if UtilClient.empty(credential_type):
            credential_type = 'access_key'
        credential_config = credential_models.Config(
            access_key_id=config.access_key_id,
            type=credential_type,
            access_key_secret=config.access_key_secret,
            security_token=config.security_token
        )
        self._credential = CredentialClient(credential_config)
        self._endpoint = config.endpoint
        self._protocol = config.protocol
        self._user_agent = config.user_agent

    def _request(
        self,
        method: str,
        pathname: str,
        query: Dict[str, Any],
        headers: Dict[str, str],
        body: Any,
        runtime: util_models.RuntimeOptions,
    ) -> Dict[str, Any]:
        """
        Build, sign and send one request, retrying according to ``runtime``.

        :param method: str, the HTTP method of the request.
        :param pathname: str, the request path.
        :param query: Dict[str, Any], the query parameters; skipped when unset.
        :param headers: Dict[str, str], extra headers that are merged over the default headers.
        :param body: Any, the request body; serialized to a JSON string when it is set.
        :param runtime: util_models.RuntimeOptions, the timeout, proxy, retry and backoff options.
        :return: Dict[str, Any] with the parsed JSON response under ``'body'`` and the
            response headers under ``'headers'``.
        :raises TeaException: if the response status is 4xx or 5xx, or if any other
            TeaException occurs that ``TeaCore.is_retryable`` does not report as retryable.
        :raises UnretryableException: if the retry loop ends without a successful response.
        """
        runtime.validate()
        _runtime = {
            'timeouted': 'retry',
            'readTimeout': runtime.read_timeout,
            'connectTimeout': runtime.connect_timeout,
            'httpProxy': runtime.http_proxy,
            'httpsProxy': runtime.https_proxy,
            'noProxy': runtime.no_proxy,
            'maxIdleConns': runtime.max_idle_conns,
            'retry': {
                'retryable': runtime.autoretry,
                'maxAttempts': UtilClient.default_number(runtime.max_attempts, 3)
            },
            'backoff': {
                'policy': UtilClient.default_string(runtime.backoff_policy, 'no'),
                'period': UtilClient.default_number(runtime.backoff_period, 1)
            },
            'ignoreSSL': runtime.ignore_ssl
        }
        _last_request = None
        _last_exception = None
        _now = time.time()
        _retry_times = 0
        while TeaCore.allow_retry(_runtime.get('retry'), _retry_times, _now):
            # Back off only before a retry, never before the first attempt.
            if _retry_times > 0:
                _backoff_time = TeaCore.get_backoff_time(_runtime.get('backoff'), _retry_times)
                if _backoff_time > 0:
                    TeaCore.sleep(_backoff_time)
            _retry_times += 1
            try:
                _request = TeaRequest()
                # Credentials are read on every attempt rather than cached on the client.
                access_key_id = self._credential.get_access_key_id()
                access_key_secret = self._credential.get_access_key_secret()
                security_token = self._credential.get_security_token()
                _request.protocol = UtilClient.default_string(self._protocol, 'HTTP')
                _request.method = method
                _request.pathname = pathname
                # Caller-supplied headers are merged last, so they override the defaults.
                _request.headers = TeaCore.merge({
                    'user-agent': UtilClient.get_user_agent(self._user_agent),
                    'Content-Type': 'application/json',
                    'Date': OpensearchUtil.get_date(),
                    'host': UtilClient.default_string(self._endpoint, 'opensearch-cn-hangzhou.aliyuncs.com'),
                    'X-Opensearch-Nonce': UtilClient.get_nonce()
                }, headers)
                if not UtilClient.is_unset(query):
                    _request.query = UtilClient.stringify_map_value(query)
                if not UtilClient.is_unset(body):
                    req_body = UtilClient.to_jsonstring(body)
                    _request.headers['Content-MD5'] = OpensearchUtil.get_content_md5(req_body)
                    _request.body = req_body
                if not UtilClient.is_unset(security_token):
                    _request.headers["X-Opensearch-Security-Token"] = security_token
                # Sign last: get_signature receives the fully populated request.
                _request.headers['Authorization'] = OpensearchUtil.get_signature(
                    _request, access_key_id, access_key_secret)
                _last_request = _request
                _response = TeaCore.do_action(_request, _runtime)
                obj_str = UtilClient.read_as_string(_response.body)
                if UtilClient.is_4xx(_response.status_code) or UtilClient.is_5xx(_response.status_code):
                    raise TeaException({
                        'message': _response.status_message,
                        'data': obj_str,
                        'code': _response.status_code
                    })
                obj = UtilClient.parse_json(obj_str)
                res = UtilClient.assert_as_map(obj)
                return {
                    'body': res,
                    'headers': _response.headers
                }
            except TeaException as e:
                if TeaCore.is_retryable(e):
                    # Remember the failure and let the loop condition decide on another attempt.
                    _last_exception = e
                    continue
                raise
        raise UnretryableException(_last_request, _last_exception)