すべてのプロダクト
Search
ドキュメントセンター

Object Storage Service:OSS SDK for Python 1.0 を使用してアップロードコールバックを設定する

最終更新日:Aug 16, 2025

OSS は、Object Storage Service (OSS) が complete_multipart_upload を使用してマルチパートアップロードタスクを完了した場合、および put_object または put_object_from_file を使用して単純なアップロードタスクを完了した場合に、アプリケーションサーバーにコールバックを送信できます。アップロードコールバックを設定するには、OSS に送信されるアップロードリクエストに関連するコールバックパラメーターを指定するだけです。

使用上の注意

  • このトピックでは、中国 (杭州) リージョンのパブリックエンドポイントが使用されています。OSS と同じリージョン内の他の Alibaba Cloud サービスから OSS にアクセスする場合は、内部エンドポイントを使用します。OSS のリージョンとエンドポイントの詳細については、「OSS のリージョンとエンドポイント」をご参照ください。

  • このトピックでは、OSS エンドポイントを使用して OSSClient インスタンスが作成されます。カスタムドメイン名または Security Token Service (STS) を使用して OSSClient インスタンスを作成する場合は、「初期化」をご参照ください。

単純なアップロードリクエストのアップロードコールバックを設定する

# -*- coding: utf-8 -*-
import json
import base64
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# 環境変数からアクセス認証情報を取得します。サンプルコードを実行する前に、OSS_ACCESS_KEY_ID および OSS_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())

# バケットが配置されているリージョンのエンドポイントを指定します。たとえば、バケットが中国 (杭州) リージョンにある場合は、エンドポイントを https://oss-cn-hangzhou.aliyuncs.com に設定します。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"

# バケットが配置されているリージョンを指定します。例: cn-hangzhou。このパラメーターは、V4 署名アルゴリズムを使用する場合に必須です。
region = "cn-hangzhou"

# バケットの名前を指定します。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)

# コールバックパラメーターを Base64 でエンコードするために使用される関数を指定します。
def encode_callback(callback_params):
    cb_str = json.dumps(callback_params).strip()
    return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))

# アップロードコールバックパラメーターを指定します。
callback_params = {}
# コールバックリクエストを受信するコールバックサーバーのアドレスを指定します。例: http://oss-demo.aliyuncs.com:23450。
callback_params['callbackUrl'] = 'http://oss-demo.aliyuncs.com:23450'
# (オプション) コールバックリクエストヘッダーに含まれる Host フィールドを指定します。
#callback_params['callbackHost'] = 'yourCallbackHost'
# コールバックリクエストに含まれる body フィールドを指定します。プレースホルダーを使用してオブジェクト情報を渡します。
callback_params['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# コールバックリクエストのコンテンツタイプを指定します。
callback_params['callbackBodyType'] = 'application/x-www-form-urlencoded'
encoded_callback = encode_callback(callback_params)
# コールバックリクエストのカスタムパラメーターを指定します。各カスタムパラメーターは、キーと値で構成されます。キーは x: で始まる必要があります。
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
encoded_callback_var = encode_callback(callback_var_params)

# アップロードコールバックパラメーターを指定します。
params = {'x-oss-callback': encoded_callback, 'x-oss-callback-var': encoded_callback_var}
# オブジェクトの完全なパスとアップロードする文字列を指定します。完全なパスにバケット名を含めないでください。
result = bucket.put_object('examplefiles/exampleobject.txt', 'a'*1024*1024, params)

マルチパートアップロードリクエストのアップロードコールバックを設定する

# -*- coding: utf-8 -*-

import json
from oss2.credentials import EnvironmentVariableCredentialsProvider
import oss2

key = 'exampleobject.txt'
content = "Anything you're good at contributes to happiness."

# 環境変数からアクセス認証情報を取得します。サンプルコードを実行する前に、OSS_ACCESS_KEY_ID および OSS_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())

# バケットが配置されているリージョンのエンドポイントを指定します。たとえば、バケットが中国 (杭州) リージョンにある場合は、エンドポイントを https://oss-cn-hangzhou.aliyuncs.com に設定します。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"

# バケットが配置されているリージョンを指定します。例: cn-hangzhou。このパラメーターは、V4 署名アルゴリズムを使用する場合に必須です。
region = "cn-hangzhou"

# コールバックリクエストを受信するコールバックサーバーのアドレスを指定します。例: http://oss-demo.aliyuncs.com:23450。
callback_url = 'http://oss-demo.aliyuncs.com:23450'

# バケットの名前を指定します。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)


# コールバックパラメーターを準備します。
callback_dict = {}
callback_dict['callbackUrl'] = callback_url

# (オプション) コールバックリクエストヘッダーに含まれる Host フィールドを指定します。
# callback_dict['callbackHost'] = 'oss-cn-hangzhou.aliyuncs.com'

## コールバックリクエストに含まれる body フィールドを指定します。プレースホルダーを使用してオブジェクト情報を渡します。
callback_dict['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# コールバックリクエストのコンテンツタイプを指定します。
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'

callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
callback_var_param_json = json.dumps(callback_var_params).strip()
encoded_callback_var = oss2.utils.b64encode_as_string(callback_var_param_json)

# コールバックパラメーターを JSON 形式で指定し、コールバックパラメーターを Base64 でエンコードします。
callback_param = json.dumps(callback_dict).strip()
base64_callback_body = oss2.utils.b64encode_as_string(callback_param)
# エンコードされたコールバックパラメーターをヘッダーの最後に追加し、コールバックパラメーターを OSS に渡します。
headers = {'x-oss-callback': base64_callback_body, 'x-oss-callback-var': encoded_callback_var}


"""
マルチパートアップロードリクエストのアップロードコールバックを設定する
"""

# マルチパートアップロードリクエストのアップロードコールバックを設定します。
# マルチパートアップロードタスクを初期化します。
parts = []
upload_id = bucket.init_multipart_upload(key).upload_id
# パートをアップロードします。
result = bucket.upload_part(key, upload_id, 1, content)
parts.append(oss2.models.PartInfo(1, result.etag, size = len(content), part_crc = result.crc))
# マルチパートアップロードタスクを完了し、アップロードコールバックを送信します。
result = bucket.complete_multipart_upload(key, upload_id, parts, headers)

# マルチパートアップロードタスクとアップロードコールバックリクエストが成功した場合、HTTP ステータスコード 200 が返されます。マルチパートアップロードタスクは完了したが、アップロードコールバックリクエストが失敗した場合、HTTP ステータスコード 203 が返されます。
if result.status == 200:
    print("オブジェクトがアップロードされ、コールバックリクエストは成功しました (HTTP 200)")
elif result.status == 203:
    print("オブジェクトはアップロードされましたが、コールバックリクエストは失敗しました (HTTP 203)")
else:
    print(f "アップロード例外、HTTP ステータスコード: {result.status}")


# オブジェクトがアップロードされているかどうかを確認します。
result = bucket.head_object(key)
assert 'x-oss-hash-crc64ecma' in result.headers

フォームアップロードリクエストのアップロードコールバックを設定する

詳細については、「PostObject」をご参照ください。

# -*- coding: utf-8 -*-
import os
import time
import datetime
import json
import base64
import hmac
import hashlib
import crcmod
import requests


# 次のサンプルコードは、PostObject リクエストのアップロードコールバックを設定する方法の例を示しています。実装は OSS SDK for Python から独立しています。

# AccessKey ID、AccessKey シークレット、エンドポイントなどの OSS 情報を初期化します。
# 環境変数から情報を取得するか、<yourAccessKeyId> などの変数を実際の値に置き換えます。
access_key_id=os.getenv ('OSS_TEST_ACCESS_KEY_ID', '<yourAccessKeyId>')
access_key_secret=os.getenv ('OSS_TEST_ACCESS_KEY_SECRET, '<yourAccessKeySecret>')
bucket_name=os.getenv ('OSS_TEST_BUCKET, '<you bucket>')
endpoint=os.getenv ('OSS_TEST_ENDPOINT', '<yourendpoint>')
# この例では、oss-demo.aliyuncs.com:23450 が使用されています。
call_back_url = "http://oss-demo.aliyuncs.com:23450"
# バケットが配置されているリージョンを指定します。この例では、cn-hangzhou が使用されています。
region = "<yourregion>"


# 前述のパラメーターが正しく指定されていることを確認します。
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
    assert '<'not in param, ' パラメーターを指定してください:' + param

def convert_base64(input):
    return base64.b64encode(input.encode(encoding='utf-8')).decode('utf-8')

def calculate_crc64(data):
    """ ローカルファイルの MD5 ハッシュを計算します。
    : param data: データ
    : ローカルファイルの MD5 ハッシュが返されます。
    """
    _POLY = 0x142F0E1EBA9EA3693
    _XOROUT = 0XFFFFFFFFFFFFFFFF

    crc64 = crcmod.Crc(_POLY, initCrc=0, xorOut=_XOROUT)
    crc64.update(data.encode())

    return crc64.crcValue

def build_gmt_expired_time(expire_time):
    """ リクエストの有効期限を指定する GMT 時間文字列を生成します。
    : param int expire_time: リクエストのタイムアウト期間。単位: 秒。
    :return str: リクエストの有効期限を指定する GMT 時間文字列。
    """
    now = int(time.time())
    expire_syncpoint  = now + expire_time

    expire_gmt = datetime.datetime.fromtimestamp(expire_syncpoint).isoformat()
    expire_gmt += 'Z'

    return expire_gmt

def build_encode_policy(expired_time, condition_list):
    """ ポリシーを生成します。
    :param int expired_time: ポリシーのタイムアウト期間。単位: 秒。
    :param list condition_list: ポリシーの条件。
    """
    policy_dict = {}
    policy_dict['expiration'] = build_gmt_expired_time(expired_time)
    policy_dict['conditions'] = condition_list

    policy = json.dumps(policy_dict).strip()
    policy_encode = base64.b64encode(policy.encode())

    return policy_encode

def build_signature(access_key_secret, date):
    """ 署名を生成します。
    :param str access_key_secret: アクセスキーシークレット
    :return str: 署名のリクエスト。
    """

    signing_key = "aliyun_v4" + access_key_secret
    h1 = hmac.new(signing_key.encode(), date.encode(), hashlib.sha256)
    h1_key = h1.digest()
    h2 = hmac.new(h1_key, region.encode(), hashlib.sha256)
    h2_key = h2.digest()
    h3 = hmac.new(h2_key, product.encode(), hashlib.sha256)
    h3_key = h3.digest()
    h4 = hmac.new(h3_key, "aliyun_v4_request".encode(), hashlib.sha256)
    h4_key = h4.digest()

    h = hmac.new(h4_key, string_to_sign.encode(), hashlib.sha256)
    signature = h.hexdigest()

    return signature

def bulid_callback(cb_url, cb_body, cb_body_type=None, cb_host=None):
    """ コールバック文字列を生成します。
    :param str cb_url: コールバックサーバーの URL を指定します。OSS は、オブジェクトがアップロードされた後にコールバックリクエストを URL に送信します。
    :param str cb_body: コールバックリクエストのコンテンツタイプを指定します。デフォルト値: application/x-www-form-urlencoded。
    :param str cb_body_type: コールバックリクエストのリクエスト本文を指定します。
    :param str cb_host: コールバックリクエストの Host ヘッダーの値を指定します。
    :return str: エンコードされたコールバック文字列。
    """
    callback_dict = {}

    callback_dict['callbackUrl'] = cb_url

    callback_dict['callbackBody'] = cb_body
    if cb_body_type is None:
        callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
    else:
        callback_dict['callbackBodyType'] = cb_body_type

    if cb_host is not None:
        callback_dict['callbackHost'] = cb_host

    callback_param = json.dumps(callback_dict).strip()
    base64_callback = base64.b64encode(callback_param.encode());

    return base64_callback.decode()

def build_post_url(endpoint, bucket_name):
    """ PostObject リクエストの URL を生成します。
    :param str endpoint: エンドポイント
    :param str bucket_name: バケット名
    :return str: PostObject リクエストの URL。
    """
    if endpoint.startswith('http://'):
        return endpoint.replace('http://', 'http://{0}.'.format(bucket_name))
    elif endpoint.startswith('https://'):
        return endpoint.replace('https://', 'https://{0}.'.format(bucket_name))
    else:
        return 'http://{0}.{1}'.format(bucket_name, endpoint)

def build_post_body(field_dict, boundary):
    """ PostObject リクエストの本文を指定します。
    :param dict field_dict: PostObject リクエストのフォームフィールドを指定します。
    :param str boundary: フォームフィールドの境界文字列を指定します。
    :return str: PostObject リクエストのリクエスト本文。
    """
    post_body = ''

    # フォームフィールドをエンコードします。
    for k,v in field_dict.items():
        if k != 'content' and k != 'content-type':
            post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"{1}\"\r\n\r\n{2}\r\n'''.format(boundary, k, v)

    # アップロードするオブジェクトのコンテンツを指定します。フォームの最後のフィールドである必要があります。
    post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"{1}\"\r\nContent-Type: {2}\r\n\r\n{3}'''.format(
        boundary, field_dict['key'], field_dict['content-type'], field_dict['content'])

    # フォームフィールドにターミネーターを追加します。
    post_body += '\r\n--{0}--\r\n'.format(boundary)

    return post_body.encode('utf-8')

def build_post_headers(body_len, boundary, headers=None):
    """ PostObject リクエストのヘッダーを指定します。
    :param str body_len: PostObject リクエスト本文の長さを指定します。
    :param str boundary: フォームフィールドの境界文字列を指定します。
    # headers=dict(), # リクエストヘッダーを指定します。
    """
    headers = headers if headers else {}
    headers['Content-Length'] = str(body_len)
    headers['Content-Type'] = 'multipart/form-data; boundary={0}'.format(boundary)

    return headers

def encode_callback(callback_params):
    cb_str = json.dumps(callback_params).strip()
    return base64.b64encode(cb_str.encode()).decode()

# PostObject リクエストのフォームフィールドを指定します。フォームフィールドは大文字と小文字が区別されます。
field_dict = {}
# オブジェクトの名前を指定します。
field_dict['key'] = '0303/post.txt'
# アクセスキー ID
field_dict['OSSAccessKeyId'] = access_key_id

product = "oss"


utc_time = datetime.datetime.utcnow()
# 有効期間を 3600 秒に設定します。
expiration = '2120-01-01T12:00:00.000Z'
date = utc_time.strftime("%Y%m%d")
policy_map = {
    "expiration": expiration,
    "conditions": [
        {"bucket": bucket_name},
        {"x-oss-signature-version": "OSS4-HMAC-SHA256"},
        {"x-oss-credential": f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"},
        {"x-oss-date": utc_time.strftime("%Y%m%dT%H%M%SZ")},
        ["content-length-range", 1, 1024]
    ]
}
policy = json.dumps(policy_map)
print(policy)
string_to_sign = base64.b64encode(policy.encode()).decode()

field_dict['policy'] = string_to_sign

field_dict['x-oss-signature-version'] = "OSS4-HMAC-SHA256"
field_dict['x-oss-credential'] = f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"
field_dict['x-oss-date'] = f"{utc_time.strftime('%Y%m%dT%H%M%SZ')}"
# 署名をリクエストします。
field_dict['x-oss-signature'] = build_signature(access_key_secret, date)



# 一時ユーザーのセキュリティトークンを指定します。このパラメーターは、一時ユーザーが一時認証情報を使用する場合に必須です。一時ユーザー以外の場合、このパラメーターを空のままにするか、このパラメーターを指定する必要はありません。
# field_dict['x-oss-security-token'] = ''
# Content-Disposition
field_dict['Content-Disposition'] = 'attachment;filename=download.txt'
# ユーザーメタデータを指定します。
field_dict['x-oss-meta-uuid'] = 'uuid-xxx'
# コールバックパラメーターを指定します。コールバックが不要な場合は、パラメーターを指定しないでください。
field_dict['callback'] = bulid_callback(call_back_url,
                                        'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}',
                                        'application/x-www-form-urlencoded')
# コールバックのカスタム変数を指定します。コールバックが不要な場合は、パラメーターを指定しないでください。
field_dict['x:my_var1'] = 'value1'
field_dict['x:my_var2'] = 'value2'

# オブジェクトをアップロードします。
# with open("", r) as f:
#     content = f.read()
# field_dict['content'] = content

# オブジェクトコンテンツをアップロードします。
field_dict['content'] = 'a'*64
# アップロードされたオブジェクトのタイプを指定します。
field_dict['content-type'] = 'text/plain'

# フォームによってランダムに生成される境界文字列を指定します。
boundary = '9431149156168'

# PostObject リクエストを送信します。
body = build_post_body(field_dict, boundary)
headers = build_post_headers(len(body), boundary)

resp = requests.post(build_post_url(endpoint, bucket_name),
                     data=body,
                     headers=headers)

# リクエスト結果を確認します。
print(resp.status_code)
assert resp.status_code == 200
assert resp.headers['x-oss-hash-crc64ecma'] == str(calculate_crc64(field_dict['content']))

関連情報

  • アップロードコールバックの完全なサンプルコードについては、GitHub をご覧ください。

  • アップロードコールバックを設定するために呼び出すことができる API 操作の詳細については、「コールバック」をご参照ください。