OSS は、Object Storage Service (OSS) が complete_multipart_upload を使用してマルチパートアップロードタスクを完了した場合、および put_object または put_object_from_file を使用して単純なアップロードタスクを完了した場合に、アプリケーションサーバーにコールバックを送信できます。アップロードコールバックを設定するには、OSS に送信されるアップロードリクエストに関連するコールバックパラメーターを指定するだけです。
使用上の注意
このトピックでは、中国 (杭州) リージョンのパブリックエンドポイントが使用されています。OSS と同じリージョン内の他の Alibaba Cloud サービスから OSS にアクセスする場合は、内部エンドポイントを使用します。OSS のリージョンとエンドポイントの詳細については、「OSS のリージョンとエンドポイント」をご参照ください。
このトピックでは、OSS エンドポイントを使用して OSSClient インスタンスが作成されます。カスタムドメイン名または Security Token Service (STS) を使用して OSSClient インスタンスを作成する場合は、「初期化」をご参照ください。
例
単純なアップロードリクエストのアップロードコールバックを設定する
# -*- coding: utf-8 -*-
import json
import base64
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
# 環境変数からアクセス認証情報を取得します。サンプルコードを実行する前に、OSS_ACCESS_KEY_ID および OSS_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
# バケットが配置されているリージョンのエンドポイントを指定します。たとえば、バケットが中国 (杭州) リージョンにある場合は、エンドポイントを https://oss-cn-hangzhou.aliyuncs.com に設定します。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
# バケットが配置されているリージョンを指定します。例: cn-hangzhou。このパラメーターは、V4 署名アルゴリズムを使用する場合に必須です。
region = "cn-hangzhou"
# バケットの名前を指定します。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)
# コールバックパラメーターを Base64 でエンコードするために使用される関数を指定します。
def encode_callback(callback_params):
cb_str = json.dumps(callback_params).strip()
return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))
# アップロードコールバックパラメーターを指定します。
callback_params = {}
# コールバックリクエストを受信するコールバックサーバーのアドレスを指定します。例: http://oss-demo.aliyuncs.com:23450。
callback_params['callbackUrl'] = 'http://oss-demo.aliyuncs.com:23450'
# (オプション) コールバックリクエストヘッダーに含まれる Host フィールドを指定します。
#callback_params['callbackHost'] = 'yourCallbackHost'
# コールバックリクエストに含まれる body フィールドを指定します。プレースホルダーを使用してオブジェクト情報を渡します。
callback_params['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# コールバックリクエストのコンテンツタイプを指定します。
callback_params['callbackBodyType'] = 'application/x-www-form-urlencoded'
encoded_callback = encode_callback(callback_params)
# コールバックリクエストのカスタムパラメーターを指定します。各カスタムパラメーターは、キーと値で構成されます。キーは x: で始まる必要があります。
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
encoded_callback_var = encode_callback(callback_var_params)
# アップロードコールバックパラメーターを指定します。
params = {'x-oss-callback': encoded_callback, 'x-oss-callback-var': encoded_callback_var}
# オブジェクトの完全なパスとアップロードする文字列を指定します。完全なパスにバケット名を含めないでください。
result = bucket.put_object('examplefiles/exampleobject.txt', 'a'*1024*1024, params)マルチパートアップロードリクエストのアップロードコールバックを設定する
# -*- coding: utf-8 -*-
import json
from oss2.credentials import EnvironmentVariableCredentialsProvider
import oss2
key = 'exampleobject.txt'
content = "Anything you're good at contributes to happiness."
# 環境変数からアクセス認証情報を取得します。サンプルコードを実行する前に、OSS_ACCESS_KEY_ID および OSS_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
# バケットが配置されているリージョンのエンドポイントを指定します。たとえば、バケットが中国 (杭州) リージョンにある場合は、エンドポイントを https://oss-cn-hangzhou.aliyuncs.com に設定します。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
# バケットが配置されているリージョンを指定します。例: cn-hangzhou。このパラメーターは、V4 署名アルゴリズムを使用する場合に必須です。
region = "cn-hangzhou"
# コールバックリクエストを受信するコールバックサーバーのアドレスを指定します。例: http://oss-demo.aliyuncs.com:23450。
callback_url = 'http://oss-demo.aliyuncs.com:23450'
# バケットの名前を指定します。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)
# コールバックパラメーターを準備します。
callback_dict = {}
callback_dict['callbackUrl'] = callback_url
# (オプション) コールバックリクエストヘッダーに含まれる Host フィールドを指定します。
# callback_dict['callbackHost'] = 'oss-cn-hangzhou.aliyuncs.com'
## コールバックリクエストに含まれる body フィールドを指定します。プレースホルダーを使用してオブジェクト情報を渡します。
callback_dict['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# コールバックリクエストのコンテンツタイプを指定します。
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
callback_var_param_json = json.dumps(callback_var_params).strip()
encoded_callback_var = oss2.utils.b64encode_as_string(callback_var_param_json)
# コールバックパラメーターを JSON 形式で指定し、コールバックパラメーターを Base64 でエンコードします。
callback_param = json.dumps(callback_dict).strip()
base64_callback_body = oss2.utils.b64encode_as_string(callback_param)
# エンコードされたコールバックパラメーターをヘッダーの最後に追加し、コールバックパラメーターを OSS に渡します。
headers = {'x-oss-callback': base64_callback_body, 'x-oss-callback-var': encoded_callback_var}
"""
マルチパートアップロードリクエストのアップロードコールバックを設定する
"""
# マルチパートアップロードリクエストのアップロードコールバックを設定します。
# マルチパートアップロードタスクを初期化します。
parts = []
upload_id = bucket.init_multipart_upload(key).upload_id
# パートをアップロードします。
result = bucket.upload_part(key, upload_id, 1, content)
parts.append(oss2.models.PartInfo(1, result.etag, size = len(content), part_crc = result.crc))
# マルチパートアップロードタスクを完了し、アップロードコールバックを送信します。
result = bucket.complete_multipart_upload(key, upload_id, parts, headers)
# マルチパートアップロードタスクとアップロードコールバックリクエストが成功した場合、HTTP ステータスコード 200 が返されます。マルチパートアップロードタスクは完了したが、アップロードコールバックリクエストが失敗した場合、HTTP ステータスコード 203 が返されます。
if result.status == 200:
print("オブジェクトがアップロードされ、コールバックリクエストは成功しました (HTTP 200)")
elif result.status == 203:
print("オブジェクトはアップロードされましたが、コールバックリクエストは失敗しました (HTTP 203)")
else:
print(f "アップロード例外、HTTP ステータスコード: {result.status}")
# オブジェクトがアップロードされているかどうかを確認します。
result = bucket.head_object(key)
assert 'x-oss-hash-crc64ecma' in result.headersフォームアップロードリクエストのアップロードコールバックを設定する
詳細については、「PostObject」をご参照ください。
# -*- coding: utf-8 -*-
import os
import time
import datetime
import json
import base64
import hmac
import hashlib
import crcmod
import requests
# 次のサンプルコードは、PostObject リクエストのアップロードコールバックを設定する方法の例を示しています。実装は OSS SDK for Python から独立しています。
# AccessKey ID、AccessKey シークレット、エンドポイントなどの OSS 情報を初期化します。
# 環境変数から情報を取得するか、<yourAccessKeyId> などの変数を実際の値に置き換えます。
access_key_id=os.getenv ('OSS_TEST_ACCESS_KEY_ID', '<yourAccessKeyId>')
access_key_secret=os.getenv ('OSS_TEST_ACCESS_KEY_SECRET, '<yourAccessKeySecret>')
bucket_name=os.getenv ('OSS_TEST_BUCKET, '<you bucket>')
endpoint=os.getenv ('OSS_TEST_ENDPOINT', '<yourendpoint>')
# この例では、oss-demo.aliyuncs.com:23450 が使用されています。
call_back_url = "http://oss-demo.aliyuncs.com:23450"
# バケットが配置されているリージョンを指定します。この例では、cn-hangzhou が使用されています。
region = "<yourregion>"
# 前述のパラメーターが正しく指定されていることを確認します。
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<'not in param, ' パラメーターを指定してください:' + param
def convert_base64(input):
return base64.b64encode(input.encode(encoding='utf-8')).decode('utf-8')
def calculate_crc64(data):
""" ローカルファイルの MD5 ハッシュを計算します。
: param data: データ
: ローカルファイルの MD5 ハッシュが返されます。
"""
_POLY = 0x142F0E1EBA9EA3693
_XOROUT = 0XFFFFFFFFFFFFFFFF
crc64 = crcmod.Crc(_POLY, initCrc=0, xorOut=_XOROUT)
crc64.update(data.encode())
return crc64.crcValue
def build_gmt_expired_time(expire_time):
""" リクエストの有効期限を指定する GMT 時間文字列を生成します。
: param int expire_time: リクエストのタイムアウト期間。単位: 秒。
:return str: リクエストの有効期限を指定する GMT 時間文字列。
"""
now = int(time.time())
expire_syncpoint = now + expire_time
expire_gmt = datetime.datetime.fromtimestamp(expire_syncpoint).isoformat()
expire_gmt += 'Z'
return expire_gmt
def build_encode_policy(expired_time, condition_list):
""" ポリシーを生成します。
:param int expired_time: ポリシーのタイムアウト期間。単位: 秒。
:param list condition_list: ポリシーの条件。
"""
policy_dict = {}
policy_dict['expiration'] = build_gmt_expired_time(expired_time)
policy_dict['conditions'] = condition_list
policy = json.dumps(policy_dict).strip()
policy_encode = base64.b64encode(policy.encode())
return policy_encode
def build_signature(access_key_secret, date):
""" 署名を生成します。
:param str access_key_secret: アクセスキーシークレット
:return str: 署名のリクエスト。
"""
signing_key = "aliyun_v4" + access_key_secret
h1 = hmac.new(signing_key.encode(), date.encode(), hashlib.sha256)
h1_key = h1.digest()
h2 = hmac.new(h1_key, region.encode(), hashlib.sha256)
h2_key = h2.digest()
h3 = hmac.new(h2_key, product.encode(), hashlib.sha256)
h3_key = h3.digest()
h4 = hmac.new(h3_key, "aliyun_v4_request".encode(), hashlib.sha256)
h4_key = h4.digest()
h = hmac.new(h4_key, string_to_sign.encode(), hashlib.sha256)
signature = h.hexdigest()
return signature
def bulid_callback(cb_url, cb_body, cb_body_type=None, cb_host=None):
""" コールバック文字列を生成します。
:param str cb_url: コールバックサーバーの URL を指定します。OSS は、オブジェクトがアップロードされた後にコールバックリクエストを URL に送信します。
:param str cb_body: コールバックリクエストのコンテンツタイプを指定します。デフォルト値: application/x-www-form-urlencoded。
:param str cb_body_type: コールバックリクエストのリクエスト本文を指定します。
:param str cb_host: コールバックリクエストの Host ヘッダーの値を指定します。
:return str: エンコードされたコールバック文字列。
"""
callback_dict = {}
callback_dict['callbackUrl'] = cb_url
callback_dict['callbackBody'] = cb_body
if cb_body_type is None:
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
else:
callback_dict['callbackBodyType'] = cb_body_type
if cb_host is not None:
callback_dict['callbackHost'] = cb_host
callback_param = json.dumps(callback_dict).strip()
base64_callback = base64.b64encode(callback_param.encode());
return base64_callback.decode()
def build_post_url(endpoint, bucket_name):
""" PostObject リクエストの URL を生成します。
:param str endpoint: エンドポイント
:param str bucket_name: バケット名
:return str: PostObject リクエストの URL。
"""
if endpoint.startswith('http://'):
return endpoint.replace('http://', 'http://{0}.'.format(bucket_name))
elif endpoint.startswith('https://'):
return endpoint.replace('https://', 'https://{0}.'.format(bucket_name))
else:
return 'http://{0}.{1}'.format(bucket_name, endpoint)
def build_post_body(field_dict, boundary):
""" PostObject リクエストの本文を指定します。
:param dict field_dict: PostObject リクエストのフォームフィールドを指定します。
:param str boundary: フォームフィールドの境界文字列を指定します。
:return str: PostObject リクエストのリクエスト本文。
"""
post_body = ''
# フォームフィールドをエンコードします。
for k,v in field_dict.items():
if k != 'content' and k != 'content-type':
post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"{1}\"\r\n\r\n{2}\r\n'''.format(boundary, k, v)
# アップロードするオブジェクトのコンテンツを指定します。フォームの最後のフィールドである必要があります。
post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"{1}\"\r\nContent-Type: {2}\r\n\r\n{3}'''.format(
boundary, field_dict['key'], field_dict['content-type'], field_dict['content'])
# フォームフィールドにターミネーターを追加します。
post_body += '\r\n--{0}--\r\n'.format(boundary)
return post_body.encode('utf-8')
def build_post_headers(body_len, boundary, headers=None):
""" PostObject リクエストのヘッダーを指定します。
:param str body_len: PostObject リクエスト本文の長さを指定します。
:param str boundary: フォームフィールドの境界文字列を指定します。
# headers=dict(), # リクエストヘッダーを指定します。
"""
headers = headers if headers else {}
headers['Content-Length'] = str(body_len)
headers['Content-Type'] = 'multipart/form-data; boundary={0}'.format(boundary)
return headers
def encode_callback(callback_params):
cb_str = json.dumps(callback_params).strip()
return base64.b64encode(cb_str.encode()).decode()
# PostObject リクエストのフォームフィールドを指定します。フォームフィールドは大文字と小文字が区別されます。
field_dict = {}
# オブジェクトの名前を指定します。
field_dict['key'] = '0303/post.txt'
# アクセスキー ID
field_dict['OSSAccessKeyId'] = access_key_id
product = "oss"
utc_time = datetime.datetime.utcnow()
# 有効期間を 3600 秒に設定します。
expiration = '2120-01-01T12:00:00.000Z'
date = utc_time.strftime("%Y%m%d")
policy_map = {
"expiration": expiration,
"conditions": [
{"bucket": bucket_name},
{"x-oss-signature-version": "OSS4-HMAC-SHA256"},
{"x-oss-credential": f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"},
{"x-oss-date": utc_time.strftime("%Y%m%dT%H%M%SZ")},
["content-length-range", 1, 1024]
]
}
policy = json.dumps(policy_map)
print(policy)
string_to_sign = base64.b64encode(policy.encode()).decode()
field_dict['policy'] = string_to_sign
field_dict['x-oss-signature-version'] = "OSS4-HMAC-SHA256"
field_dict['x-oss-credential'] = f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"
field_dict['x-oss-date'] = f"{utc_time.strftime('%Y%m%dT%H%M%SZ')}"
# 署名をリクエストします。
field_dict['x-oss-signature'] = build_signature(access_key_secret, date)
# 一時ユーザーのセキュリティトークンを指定します。このパラメーターは、一時ユーザーが一時認証情報を使用する場合に必須です。一時ユーザー以外の場合、このパラメーターを空のままにするか、このパラメーターを指定する必要はありません。
# field_dict['x-oss-security-token'] = ''
# Content-Disposition
field_dict['Content-Disposition'] = 'attachment;filename=download.txt'
# ユーザーメタデータを指定します。
field_dict['x-oss-meta-uuid'] = 'uuid-xxx'
# コールバックパラメーターを指定します。コールバックが不要な場合は、パラメーターを指定しないでください。
field_dict['callback'] = bulid_callback(call_back_url,
'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}',
'application/x-www-form-urlencoded')
# コールバックのカスタム変数を指定します。コールバックが不要な場合は、パラメーターを指定しないでください。
field_dict['x:my_var1'] = 'value1'
field_dict['x:my_var2'] = 'value2'
# オブジェクトをアップロードします。
# with open("", r) as f:
# content = f.read()
# field_dict['content'] = content
# オブジェクトコンテンツをアップロードします。
field_dict['content'] = 'a'*64
# アップロードされたオブジェクトのタイプを指定します。
field_dict['content-type'] = 'text/plain'
# フォームによってランダムに生成される境界文字列を指定します。
boundary = '9431149156168'
# PostObject リクエストを送信します。
body = build_post_body(field_dict, boundary)
headers = build_post_headers(len(body), boundary)
resp = requests.post(build_post_url(endpoint, bucket_name),
data=body,
headers=headers)
# リクエスト結果を確認します。
print(resp.status_code)
assert resp.status_code == 200
assert resp.headers['x-oss-hash-crc64ecma'] == str(calculate_crc64(field_dict['content']))