OSS在完成简单上传(put_object和put_object_from_file)以及分片上传(complete_multipart_upload)时可以提供回调(Callback)给应用服务器。您只需要在发送给OSS的请求中携带相应的Callback参数,即可实现回调。
注意事项
本文以华东1(杭州)外网Endpoint为例。如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的Region与Endpoint的对应关系,请参见OSS地域和访问域名。
本文以OSS域名新建OSSClient为例。如果您希望通过自定义域名、STS等方式新建OSSClient,请参见初始化。
示例代码
简单上传回调
# -*- coding: utf-8 -*-
import json
import base64
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
# 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
# 填写Endpoint对应的Region信息,例如cn-hangzhou。注意,v4签名下,必须填写该参数
region = "cn-hangzhou"
# yourBucketName填写存储空间名称。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)
# 定义回调参数Base64编码函数。
def encode_callback(callback_params):
cb_str = json.dumps(callback_params).strip()
return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))
# 设置上传回调参数。
callback_params = {}
# 设置回调请求的服务器地址,例如http://oss-demo.aliyuncs.com:23450。
callback_params['callbackUrl'] = 'http://oss-demo.aliyuncs.com:23450'
#(可选)设置回调请求消息头中Host的值,即您的服务器配置Host的值。
#callback_params['callbackHost'] = 'yourCallbackHost'
# 指定回调请求的 Body 内容,使用占位符动态传递对象信息
callback_params['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# 指定回调请求的 Content-Type
callback_params['callbackBodyType'] = 'application/x-www-form-urlencoded'
encoded_callback = encode_callback(callback_params)
# 设置发起回调请求的自定义参数,由Key和Value组成,Key必须以x:开始。
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
encoded_callback_var = encode_callback(callback_var_params)
# 上传回调。
params = {'x-oss-callback': encoded_callback, 'x-oss-callback-var': encoded_callback_var}
# 填写Object完整路径和字符串。Object完整路径中不能包含Bucket名称。
result = bucket.put_object('examplefiles/exampleobject.txt', 'a'*1024*1024, params)
分片上传回调
# -*- coding: utf-8 -*-
import json
from oss2.credentials import EnvironmentVariableCredentialsProvider
import oss2
key = 'exampleobject.txt'
content = "Anything you're good at contributes to happiness."
# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
# 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
# 填写Endpoint对应的Region信息,例如cn-hangzhou。注意,v4签名下,必须填写该参数
region = "cn-hangzhou"
# 设置回调请求的服务器地址,例如http://oss-demo.aliyuncs.com:23450。
callback_url = 'http://oss-demo.aliyuncs.com:23450'
# yourBucketName填写存储空间名称。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)
# 准备回调参数,更详细的信息请参考 https://help.aliyun.com/document_detail/31989.html
callback_dict = {}
callback_dict['callbackUrl'] = callback_url
#(可选)设置回调请求消息头中Host的值,即您的服务器配置Host的值。
# callback_dict['callbackHost'] = 'oss-cn-hangzhou.aliyuncs.com'
## 指定回调请求的 Body 内容,使用占位符动态传递对象信息
callback_dict['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# 指定回调请求的 Content-Type
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
callback_var_param_json = json.dumps(callback_var_params).strip()
encoded_callback_var = oss2.utils.b64encode_as_string(callback_var_param_json)
# 回调参数是json格式,并且base64编码
callback_param = json.dumps(callback_dict).strip()
base64_callback_body = oss2.utils.b64encode_as_string(callback_param)
# 回调参数编码后放在header中传给oss
headers = {'x-oss-callback': base64_callback_body, 'x-oss-callback-var': encoded_callback_var}
"""
分片上传回调
"""
# 分片上传回调
# 初始化上传任务
parts = []
upload_id = bucket.init_multipart_upload(key).upload_id
# 上传分片
result = bucket.upload_part(key, upload_id, 1, content)
parts.append(oss2.models.PartInfo(1, result.etag, size = len(content), part_crc = result.crc))
# 完成上传并回调
result = bucket.complete_multipart_upload(key, upload_id, parts, headers)
# 上传并回调成功status为200,上传成功回调失败status为203
if result.status == 200:
print("文件上传成功,回调成功 (HTTP 200)")
elif result.status == 203:
print("文件上传成功,但回调失败 (HTTP 203)")
else:
print(f"上传异常,状态码: {result.status}")
# 确认文件上传成功
result = bucket.head_object(key)
assert 'x-oss-hash-crc64ecma' in result.headers
表单上传回调
# -*- coding: utf-8 -*-
import os
import time
import datetime
import json
import base64
import hmac
import hashlib
import crcmod
import requests
# 以下代码展示了PostObject的用法。PostObject不依赖于OSS Python SDK。
# PostObject的官网 https://help.aliyun.com/document_detail/31988.html
# PostObject错误及排查 https://yq.aliyun.com/articles/58524
# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。
# 通过环境变量获取,或者把诸如“<你的AccessKeyId>”替换成真实的AccessKeyId等。
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')
# 这里以oss-demo.aliyuncs.com:23450举例
call_back_url = "http://oss-demo.aliyuncs.com:23450"
# 例如cn-hangzhou
region = "<你的region>"
# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, '请设置参数:' + param
def convert_base64(input):
return base64.b64encode(input.encode(encoding='utf-8')).decode('utf-8')
def calculate_crc64(data):
"""计算文件的MD5
:param data: 数据
:return 数据的MD5值
"""
_POLY = 0x142F0E1EBA9EA3693
_XOROUT = 0XFFFFFFFFFFFFFFFF
crc64 = crcmod.Crc(_POLY, initCrc=0, xorOut=_XOROUT)
crc64.update(data.encode())
return crc64.crcValue
def build_gmt_expired_time(expire_time):
"""生成GMT格式的请求超时时间
:param int expire_time: 超时时间,单位秒
:return str GMT格式的超时时间
"""
now = int(time.time())
expire_syncpoint = now + expire_time
expire_gmt = datetime.datetime.fromtimestamp(expire_syncpoint).isoformat()
expire_gmt += 'Z'
return expire_gmt
def build_encode_policy(expired_time, condition_list):
"""生成policy
:param int expired_time: 超时时间,单位秒
:param list condition_list: 限制条件列表
"""
policy_dict = {}
policy_dict['expiration'] = build_gmt_expired_time(expired_time)
policy_dict['conditions'] = condition_list
policy = json.dumps(policy_dict).strip()
policy_encode = base64.b64encode(policy.encode())
return policy_encode
def build_signature(access_key_secret, date):
"""生成签名
:param str access_key_secret: access key secret
:return str 请求签名
"""
signing_key = "aliyun_v4" + access_key_secret
h1 = hmac.new(signing_key.encode(), date.encode(), hashlib.sha256)
h1_key = h1.digest()
h2 = hmac.new(h1_key, region.encode(), hashlib.sha256)
h2_key = h2.digest()
h3 = hmac.new(h2_key, product.encode(), hashlib.sha256)
h3_key = h3.digest()
h4 = hmac.new(h3_key, "aliyun_v4_request".encode(), hashlib.sha256)
h4_key = h4.digest()
h = hmac.new(h4_key, string_to_sign.encode(), hashlib.sha256)
signature = h.hexdigest()
return signature
def bulid_callback(cb_url, cb_body, cb_body_type=None, cb_host=None):
"""生成callback字符串
:param str cb_url: 回调服务器地址,文件上传成功后OSS向此url发送回调请求
:param str cb_body: 发起回调请求的Content-Type,默认application/x-www-form-urlencoded
:param str cb_body_type: 发起回调时请求body
:param str cb_host: 发起回调请求时Host头的值
:return str 编码后的Callback
"""
callback_dict = {}
callback_dict['callbackUrl'] = cb_url
callback_dict['callbackBody'] = cb_body
if cb_body_type is None:
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
else:
callback_dict['callbackBodyType'] = cb_body_type
if cb_host is not None:
callback_dict['callbackHost'] = cb_host
callback_param = json.dumps(callback_dict).strip()
base64_callback = base64.b64encode(callback_param.encode());
return base64_callback.decode()
def build_post_url(endpoint, bucket_name):
"""生成POST请求URL
:param str endpoint: endpoint
:param str bucket_name: bucket name
:return str POST请求URL
"""
if endpoint.startswith('http://'):
return endpoint.replace('http://', 'http://{0}.'.format(bucket_name))
elif endpoint.startswith('https://'):
return endpoint.replace('https://', 'https://{0}.'.format(bucket_name))
else:
return 'http://{0}.{1}'.format(bucket_name, endpoint)
def build_post_body(field_dict, boundary):
"""生成POST请求Body
:param dict field_dict: POST请求表单域
:param str boundary: 表单域的边界字符串
:return str POST请求Body
"""
post_body = ''
# 编码表单域
for k,v in field_dict.items():
if k != 'content' and k != 'content-type':
post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"{1}\"\r\n\r\n{2}\r\n'''.format(boundary, k, v)
# 上传文件的内容,必须作为最后一个表单域
post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"{1}\"\r\nContent-Type: {2}\r\n\r\n{3}'''.format(
boundary, field_dict['key'], field_dict['content-type'], field_dict['content'])
# 加上表单域结束符
post_body += '\r\n--{0}--\r\n'.format(boundary)
return post_body.encode('utf-8')
def build_post_headers(body_len, boundary, headers=None):
"""生气POST请求Header
:param str body_len: POST请求Body长度
:param str boundary: 表单域的边界字符串
:param dict 请求Header
"""
headers = headers if headers else {}
headers['Content-Length'] = str(body_len)
headers['Content-Type'] = 'multipart/form-data; boundary={0}'.format(boundary)
return headers
def encode_callback(callback_params):
cb_str = json.dumps(callback_params).strip()
return base64.b64encode(cb_str.encode()).decode()
# POST请求表单域,注意大小写
field_dict = {}
# object名称
field_dict['key'] = '0303/post.txt'
# access key id
field_dict['OSSAccessKeyId'] = access_key_id
product = "oss"
utc_time = datetime.datetime.utcnow()
# 过期时间设置3600秒
expiration = '2120-01-01T12:00:00.000Z'
date = utc_time.strftime("%Y%m%d")
policy_map = {
"expiration": expiration,
"conditions": [
{"bucket": bucket_name},
{"x-oss-signature-version": "OSS4-HMAC-SHA256"},
{"x-oss-credential": f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"},
{"x-oss-date": utc_time.strftime("%Y%m%dT%H%M%SZ")},
["content-length-range", 1, 1024]
]
}
policy = json.dumps(policy_map)
print(policy)
string_to_sign = base64.b64encode(policy.encode()).decode()
field_dict['policy'] = string_to_sign
field_dict['x-oss-signature-version'] = "OSS4-HMAC-SHA256"
field_dict['x-oss-credential'] = f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"
field_dict['x-oss-date'] = f"{utc_time.strftime('%Y%m%dT%H%M%SZ')}"
# 请求签名
field_dict['x-oss-signature'] = build_signature(access_key_secret, date)
# 临时用户Token,当使用临时用户密钥时Token必填;非临时用户填空或不填
# field_dict['x-oss-security-token'] = ''
# Content-Disposition
field_dict['Content-Disposition'] = 'attachment;filename=download.txt'
# 用户自定义meta
field_dict['x-oss-meta-uuid'] = 'uuid-xxx'
# callback,没有回调需求不填该域
field_dict['callback'] = bulid_callback(call_back_url,
'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}',
'application/x-www-form-urlencoded')
# callback中的自定义变量,没有回调不填该域
field_dict['x:my_var1'] = 'value1'
field_dict['x:my_var2'] = 'value2'
# 如果上传文件
# with open("", r) as f:
# content = f.read()
# field_dict['content'] = content
# 上传文件内容
field_dict['content'] = 'a'*64
# 上传文件类型
field_dict['content-type'] = 'text/plain'
# 表单域的边界字符串,一般为随机字符串
boundary = '9431149156168'
# 发送POST请求
body = build_post_body(field_dict, boundary)
headers = build_post_headers(len(body), boundary)
resp = requests.post(build_post_url(endpoint, bucket_name),
data=body,
headers=headers)
# 确认请求结果
print(resp.status_code)
assert resp.status_code == 200
assert resp.headers['x-oss-hash-crc64ecma'] == str(calculate_crc64(field_dict['content']))