全部产品
Search
文档中心

对象存储 OSS:Python上传回调

更新时间:Mar 12, 2025

OSS在完成简单上传(put_object和put_object_from_file)以及分片上传(complete_multipart_upload)时可以提供回调(Callback)给应用服务器。您只需要在发送给OSS的请求中携带相应的Callback参数,即可实现回调。

注意事项

  • 本文以华东1(杭州)外网Endpoint为例。如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的Region与Endpoint的对应关系,请参见OSS地域和访问域名

  • 本文以OSS域名新建OSSClient为例。如果您希望通过自定义域名、STS等方式新建OSSClient,请参见初始化

示例代码

简单上传回调

# -*- coding: utf-8 -*-
import json
import base64
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())

# 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"

# 填写Endpoint对应的Region信息,例如cn-hangzhou。注意,v4签名下,必须填写该参数
region = "cn-hangzhou"

# yourBucketName填写存储空间名称。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)

# 定义回调参数Base64编码函数。
def encode_callback(callback_params):
    cb_str = json.dumps(callback_params).strip()
    return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))

# 设置上传回调参数。
callback_params = {}
# 设置回调请求的服务器地址,例如http://oss-demo.aliyuncs.com:23450。
callback_params['callbackUrl'] = 'http://oss-demo.aliyuncs.com:23450'
#(可选)设置回调请求消息头中Host的值,即您的服务器配置Host的值。
#callback_params['callbackHost'] = 'yourCallbackHost'
# 指定回调请求的 Body 内容,使用占位符动态传递对象信息
callback_params['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# 指定回调请求的 Content-Type
callback_params['callbackBodyType'] = 'application/x-www-form-urlencoded'
encoded_callback = encode_callback(callback_params)
# 设置发起回调请求的自定义参数,由Key和Value组成,Key必须以x:开始。
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
encoded_callback_var = encode_callback(callback_var_params)

# 上传回调。
params = {'x-oss-callback': encoded_callback, 'x-oss-callback-var': encoded_callback_var}
# 填写Object完整路径和字符串。Object完整路径中不能包含Bucket名称。
result = bucket.put_object('examplefiles/exampleobject.txt', 'a'*1024*1024, params)

分片上传回调

# -*- coding: utf-8 -*-

import json
from oss2.credentials import EnvironmentVariableCredentialsProvider
import oss2

key = 'exampleobject.txt'
content = "Anything you're good at contributes to happiness."

# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())

# 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"

# 填写Endpoint对应的Region信息,例如cn-hangzhou。注意,v4签名下,必须填写该参数
region = "cn-hangzhou"

# 设置回调请求的服务器地址,例如http://oss-demo.aliyuncs.com:23450。
callback_url = 'http://oss-demo.aliyuncs.com:23450'

# yourBucketName填写存储空间名称。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)


# 准备回调参数,更详细的信息请参考 https://help.aliyun.com/document_detail/31989.html
callback_dict = {}
callback_dict['callbackUrl'] = callback_url

#(可选)设置回调请求消息头中Host的值,即您的服务器配置Host的值。
# callback_dict['callbackHost'] = 'oss-cn-hangzhou.aliyuncs.com'

## 指定回调请求的 Body 内容,使用占位符动态传递对象信息
callback_dict['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# 指定回调请求的 Content-Type
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'

callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
callback_var_param_json = json.dumps(callback_var_params).strip()
encoded_callback_var = oss2.utils.b64encode_as_string(callback_var_param_json)

# 回调参数是json格式,并且base64编码
callback_param = json.dumps(callback_dict).strip()
base64_callback_body = oss2.utils.b64encode_as_string(callback_param)
# 回调参数编码后放在header中传给oss
headers = {'x-oss-callback': base64_callback_body, 'x-oss-callback-var': encoded_callback_var}


"""
分片上传回调
"""

# 分片上传回调
# 初始化上传任务
parts = []
upload_id = bucket.init_multipart_upload(key).upload_id
# 上传分片
result = bucket.upload_part(key, upload_id, 1, content)
parts.append(oss2.models.PartInfo(1, result.etag, size = len(content), part_crc = result.crc))
# 完成上传并回调
result = bucket.complete_multipart_upload(key, upload_id, parts, headers)

# 上传并回调成功status为200,上传成功回调失败status为203
if result.status == 200:
    print("文件上传成功,回调成功 (HTTP 200)")
elif result.status == 203:
    print("文件上传成功,但回调失败 (HTTP 203)")
else:
    print(f"上传异常,状态码: {result.status}")


# 确认文件上传成功
result = bucket.head_object(key)
assert 'x-oss-hash-crc64ecma' in result.headers

表单上传回调

# -*- coding: utf-8 -*-
import os
import time
import datetime
import json
import base64
import hmac
import hashlib
import crcmod
import requests


# 以下代码展示了PostObject的用法。PostObject不依赖于OSS Python SDK。

# PostObject的官网 https://help.aliyun.com/document_detail/31988.html
# PostObject错误及排查 https://yq.aliyun.com/articles/58524

# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。
# 通过环境变量获取,或者把诸如“<你的AccessKeyId>”替换成真实的AccessKeyId等。
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')
# 这里以oss-demo.aliyuncs.com:23450举例
call_back_url = "http://oss-demo.aliyuncs.com:23450"
# 例如cn-hangzhou
region = "<你的region>"


# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
    assert '<' not in param, '请设置参数:' + param

def convert_base64(input):
    return base64.b64encode(input.encode(encoding='utf-8')).decode('utf-8')

def calculate_crc64(data):
    """计算文件的MD5
    :param data: 数据
    :return 数据的MD5值
    """
    _POLY = 0x142F0E1EBA9EA3693
    _XOROUT = 0XFFFFFFFFFFFFFFFF

    crc64 = crcmod.Crc(_POLY, initCrc=0, xorOut=_XOROUT)
    crc64.update(data.encode())

    return crc64.crcValue

def build_gmt_expired_time(expire_time):
    """生成GMT格式的请求超时时间
    :param int expire_time: 超时时间,单位秒
    :return str GMT格式的超时时间
    """
    now = int(time.time())
    expire_syncpoint  = now + expire_time

    expire_gmt = datetime.datetime.fromtimestamp(expire_syncpoint).isoformat()
    expire_gmt += 'Z'

    return expire_gmt

def build_encode_policy(expired_time, condition_list):
    """生成policy
    :param int expired_time: 超时时间,单位秒
    :param list condition_list: 限制条件列表
    """
    policy_dict = {}
    policy_dict['expiration'] = build_gmt_expired_time(expired_time)
    policy_dict['conditions'] = condition_list

    policy = json.dumps(policy_dict).strip()
    policy_encode = base64.b64encode(policy.encode())

    return policy_encode

def build_signature(access_key_secret, date):
    """生成签名
    :param str access_key_secret: access key secret
    :return str 请求签名
    """

    signing_key = "aliyun_v4" + access_key_secret
    h1 = hmac.new(signing_key.encode(), date.encode(), hashlib.sha256)
    h1_key = h1.digest()
    h2 = hmac.new(h1_key, region.encode(), hashlib.sha256)
    h2_key = h2.digest()
    h3 = hmac.new(h2_key, product.encode(), hashlib.sha256)
    h3_key = h3.digest()
    h4 = hmac.new(h3_key, "aliyun_v4_request".encode(), hashlib.sha256)
    h4_key = h4.digest()

    h = hmac.new(h4_key, string_to_sign.encode(), hashlib.sha256)
    signature = h.hexdigest()

    return signature

def bulid_callback(cb_url, cb_body, cb_body_type=None, cb_host=None):
    """生成callback字符串
    :param str cb_url: 回调服务器地址,文件上传成功后OSS向此url发送回调请求
    :param str cb_body: 发起回调请求的Content-Type,默认application/x-www-form-urlencoded
    :param str cb_body_type: 发起回调时请求body
    :param str cb_host: 发起回调请求时Host头的值
    :return str 编码后的Callback
    """
    callback_dict = {}

    callback_dict['callbackUrl'] = cb_url

    callback_dict['callbackBody'] = cb_body
    if cb_body_type is None:
        callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
    else:
        callback_dict['callbackBodyType'] = cb_body_type

    if cb_host is not None:
        callback_dict['callbackHost'] = cb_host

    callback_param = json.dumps(callback_dict).strip()
    base64_callback = base64.b64encode(callback_param.encode());

    return base64_callback.decode()

def build_post_url(endpoint, bucket_name):
    """生成POST请求URL
    :param str endpoint: endpoint
    :param str bucket_name: bucket name
    :return str POST请求URL
    """
    if endpoint.startswith('http://'):
        return endpoint.replace('http://', 'http://{0}.'.format(bucket_name))
    elif endpoint.startswith('https://'):
        return endpoint.replace('https://', 'https://{0}.'.format(bucket_name))
    else:
        return 'http://{0}.{1}'.format(bucket_name, endpoint)

def build_post_body(field_dict, boundary):
    """生成POST请求Body
    :param dict field_dict: POST请求表单域
    :param str boundary: 表单域的边界字符串
    :return str POST请求Body
    """
    post_body = ''

    # 编码表单域
    for k,v in field_dict.items():
        if k != 'content' and k != 'content-type':
            post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"{1}\"\r\n\r\n{2}\r\n'''.format(boundary, k, v)

    # 上传文件的内容,必须作为最后一个表单域
    post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"{1}\"\r\nContent-Type: {2}\r\n\r\n{3}'''.format(
        boundary, field_dict['key'], field_dict['content-type'], field_dict['content'])

    # 加上表单域结束符
    post_body += '\r\n--{0}--\r\n'.format(boundary)

    return post_body.encode('utf-8')

def build_post_headers(body_len, boundary, headers=None):
    """生气POST请求Header
    :param str body_len: POST请求Body长度
    :param str boundary: 表单域的边界字符串
    :param dict 请求Header
    """
    headers = headers if headers else {}
    headers['Content-Length'] = str(body_len)
    headers['Content-Type'] = 'multipart/form-data; boundary={0}'.format(boundary)

    return headers

def encode_callback(callback_params):
    cb_str = json.dumps(callback_params).strip()
    return base64.b64encode(cb_str.encode()).decode()

# POST请求表单域,注意大小写
field_dict = {}
# object名称
field_dict['key'] = '0303/post.txt'
# access key id
field_dict['OSSAccessKeyId'] = access_key_id

product = "oss"


utc_time = datetime.datetime.utcnow()
# 过期时间设置3600秒
expiration = '2120-01-01T12:00:00.000Z'
date = utc_time.strftime("%Y%m%d")
policy_map = {
    "expiration": expiration,
    "conditions": [
        {"bucket": bucket_name},
        {"x-oss-signature-version": "OSS4-HMAC-SHA256"},
        {"x-oss-credential": f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"},
        {"x-oss-date": utc_time.strftime("%Y%m%dT%H%M%SZ")},
        ["content-length-range", 1, 1024]
    ]
}
policy = json.dumps(policy_map)
print(policy)
string_to_sign = base64.b64encode(policy.encode()).decode()

field_dict['policy'] = string_to_sign

field_dict['x-oss-signature-version'] = "OSS4-HMAC-SHA256"
field_dict['x-oss-credential'] = f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"
field_dict['x-oss-date'] = f"{utc_time.strftime('%Y%m%dT%H%M%SZ')}"
# 请求签名
field_dict['x-oss-signature'] = build_signature(access_key_secret, date)



# 临时用户Token,当使用临时用户密钥时Token必填;非临时用户填空或不填
# field_dict['x-oss-security-token'] = ''
# Content-Disposition
field_dict['Content-Disposition'] = 'attachment;filename=download.txt'
# 用户自定义meta
field_dict['x-oss-meta-uuid'] = 'uuid-xxx'
# callback,没有回调需求不填该域
field_dict['callback'] = bulid_callback(call_back_url,
                                        'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}',
                                        'application/x-www-form-urlencoded')
# callback中的自定义变量,没有回调不填该域
field_dict['x:my_var1'] = 'value1'
field_dict['x:my_var2'] = 'value2'

# 如果上传文件
# with open("", r) as f:
#     content = f.read()
# field_dict['content'] = content

# 上传文件内容
field_dict['content'] = 'a'*64
# 上传文件类型
field_dict['content-type'] = 'text/plain'

# 表单域的边界字符串,一般为随机字符串
boundary = '9431149156168'

# 发送POST请求
body = build_post_body(field_dict, boundary)
headers = build_post_headers(len(body), boundary)

resp = requests.post(build_post_url(endpoint, bucket_name),
                     data=body,
                     headers=headers)

# 确认请求结果
print(resp.status_code)
assert resp.status_code == 200
assert resp.headers['x-oss-hash-crc64ecma'] == str(calculate_crc64(field_dict['content']))

相关文档

  • 关于上传回调的完整示例代码,请参见GitHub示例

  • 关于上传回调的API接口说明,请参见Callback