全部產品
Search
文件中心

Tablestore:寫入資料

更新時間:Sep 03, 2024

Tablestore提供了單行插入、單行更新和批量寫入的寫入方式用於寫入資料到資料表。當要寫入資料到資料表時,您需要指定完整主鍵以及要增刪改的屬性列。在高並發應用中寫入資料時,您可以配置行存在性條件或者列條件實現按照指定條件更新資料。

寫入方式

Table Store提供的資料寫入介面包括PutRow、UpdateRow和BatchWriteRow。寫入資料時,請根據實際情境選擇相應的寫入方式。

寫入方式

說明

適用情境

插入單行資料

調用PutRow介面新寫入一行資料。如果該行已存在,則Table Store會先刪除原行資料(原行的所有列以及所有版本的資料),再寫入新行資料。

適用於要寫入資料較少的情境。

更新單行資料

調用UpdateRow介面更新一行資料,支援增加和刪除一行中的屬性列,刪除屬性列指定版本的資料,或者更新已存在的屬性列值。如果更新的行不存在,則新增一行資料。

適用於更新已寫入資料的情境,例如刪除屬性列、刪除某個資料版本、修改屬性列值等。

批量寫入資料

調用BatchWriteRow介面在一次請求中進行批量寫入操作或者一次對多張表進行寫入。

BatchWriteRow操作由多個PutRow、UpdateRow、DeleteRow子操作組成,構造子操作的過程與使用PutRow介面、UpdateRow介面和DeleteRow介面時相同。

適用於要寫入、刪除或者更新大量資料以及要同時進行增刪改資料的情境。

前提條件

  • 已初始化Client。具體操作,請參見初始化OTSClient

  • 已建立資料表並寫入資料。

插入單行資料

調用PutRow介面新寫入一行資料。如果該行已存在,則先刪除原行資料(原行的所有列以及所有版本的資料),再寫入新行資料。

介面

"""
說明:寫入一行資料。返回本次操作消耗的CapacityUnit。
``table_name``是對應的表名。
``row``是行資料,包括主鍵和屬性列。
``condition``表示執行操作前做條件檢查,滿足條件才執行,是tablestore.metadata.Condition類的執行個體。
目前支援兩種條件檢測,一是對行的存在性進行檢查,檢查條件包括:'IGNORE','EXPECT_EXIST'和'EXPECT_NOT_EXIST';二是對屬性列值的條件檢測。
``return_type``表示傳回型別,是tablestore.metadata.ReturnType類的執行個體,目前僅支援返回PrimaryKey,一般用於主鍵列自增中。
返回:本次操作消耗的CapacityUnit和需要返回的行資料。
consumed表示消耗的CapacityUnit,是tablestore.metadata.CapacityUnit類的執行個體。
return_row表示返回的行資料,可能包括主鍵、屬性列。
樣本:
    primary_key = [('gid',1), ('uid',101)]
    attribute_columns = [('name','張三'), ('mobile',111111111), ('address','中國A地'), ('age',20)]
    row = Row(primary_key, attribute_columns)
    condition = Condition('EXPECT_NOT_EXIST')
    consumed, return_row = client.put_row('myTable', row, condition)
"""
def put_row(self, table_name, row, condition = None, return_type = None, transaction_id = None):

參數

參數

說明

table_name

資料表名稱。

row

行資料,包括以下配置:

  • primary_key:行的主鍵。主鍵包括主鍵列名、主鍵類型和主索引值。

    重要
    • 設定的主鍵個數和類型必須和資料表的主鍵個數和類型一致。

    • 當主鍵為自增列時,只需將自增列的值設定為預留位置。更多資訊,請參見主鍵列自增

  • attribute_columns:行的屬性列。每一項的順序是屬性名稱、屬性值ColumnValue、屬性類型ColumnType(可選)、時間戳記(可選)。

    • 屬性名稱即屬性列的名稱,屬性類型即屬性列的資料類型。更多資訊,請參見命名規則和資料類型

      屬性類型可以是INTEGER、STRING(UTF-8編碼字串)、BINARY、BOOLEAN、DOUBLE五種,分別用ColumnType.INTEGER、ColumnType.STRING、ColumnType.BINARY、ColumnType.BOOLEAN、ColumnType.DOUBLE表示,其中BINARY不可省略,其他類型都可以省略。

    • 時間戳記即資料的版本號碼。更多資訊,請參見資料版本和生命週期

      資料的版本號碼可以由系統自動產生或者自訂,如果不設定此參數,則預設由系統自動產生。

      • 當由系統自動產生資料的版本號碼時,系統預設將目前時間的毫秒單位時間戳記(從1970-01-01 00:00:00 UTC計算起的毫秒數)作為資料的版本號碼。

      • 當自訂資料的版本號碼時,版本號碼需要為64位的毫秒單位時間戳記且在有效版本範圍內。

condition

使用條件更新,可以設定原行的存在性條件或者原行中某列的列值條件。更多資訊,請參見條件更新

說明
  • RowExistenceExpectation.IGNORE表示無論此行是否存在均會插入新資料,如果之前行已存在,則寫入資料時會覆蓋原有資料。

  • RowExistenceExpectation.EXPECT_EXIST表示只有此行存在時才會插入新資料,寫入資料時會覆蓋原有資料。

  • RowExistenceExpectation.EXPECT_NOT_EXIST表示只有此行不存在時才會插入資料。

return_type

表示返回資料的類型,是tablestore.metadata.ReturnType類的執行個體,目前僅支援返回PrimaryKey,一般用於主鍵列自增中。

transaction_id

局部事務ID。使用局部事務功能刪除資料時必須設定此參數。

樣本

插入一行資料。

說明

如下樣本中屬性列age的版本為1498184687000,此值是2017年06月23日,如果目前時間-max_time_deviation(max_time_deviation由建立資料表時指定)大於1498184687000時,則PutRow時會被禁止。

# 設定資料表名稱。
table_name = '<TABLE_NAME>'

# 主鍵的第一個主鍵列是gid,值是整數1,第二個主鍵列是uid,值是整數101。
primary_key = [('gid',1), ('uid',101)]
# 屬性列包括五個:
# 第一個屬性列的名字是name,值是字串John,版本號碼沒有指定,使用系統目前時間作為版本號碼。
# 第二個屬性列的名字是mobile,值是整數1390000****,版本號碼沒有指定,使用系統目前時間作為版本號碼。
# 第三個屬性列的名字是address,值是二進位的China,版本號碼沒有指定,使用系統目前時間作為版本號碼。
# 第四個屬性列的名字是female,值是布爾值False,版本號碼沒有指定,使用系統目前時間作為版本號碼。
# 第五個屬性列的名字是age,值是29.7,指定版本號碼為1498184687000。
attribute_columns = [('name','John'), ('mobile',1390000****),('address', bytearray('China', encoding='utf8')),('female', False), ('age', 29.7, 1498184687000)]
# 通過primary_key和attribute_columns構造Row。
row = Row(primary_key, attribute_columns)

# 設定條件更新,行條件檢查為期望行不存在。如果行存在會出現Condition Update Failed錯誤。
condition = Condition(RowExistenceExpectation.EXPECT_NOT_EXIST)
try:
    # 調用put_row方法,如果沒有指定ReturnType,則return_row為None。
    consumed, return_row = client.put_row(table_name, row, condition)

    # 列印此次請求消耗的寫CU。
    print('put row succeed, consume %s write cu.' % consumed.write)
# 用戶端異常,一般為參數錯誤或者網路異常。
except OTSClientError as e:
    print("put row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
# 服務端異常,一般為參數錯誤或者流控錯誤。
except OTSServiceError as e:
    print("put row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))                    

詳細代碼請參見PutRow@GitHub

更新單行資料

調用UpdateRow介面更新一行資料,可以增加和刪除一行中的屬性列,刪除屬性列指定版本的資料,或者更新已存在的屬性列的值。如果更新的行不存在,則新增一行資料。

說明

當UpdateRow請求中只包含刪除指定的列且該行不存在時,則該請求不會新增一行資料。

介面

"""
說明:更新一行資料。
``table_name``是對應的表名。
``row``表示更新的行資料,包括主鍵列和屬性列,主鍵列是list;屬性列是dict。
``condition``表示執行操作前做條件檢查,滿足條件才執行,是tablestore.metadata.Condition類的執行個體。
目前支援兩種條件檢測,一是對行的存在性進行檢查,檢查條件包括:'IGNORE','EXPECT_EXIST'和'EXPECT_NOT_EXIST';二是對屬性列值的條件檢測。
``return_type``表示傳回型別,是tablestore.metadata.ReturnType類的執行個體,目前僅支援返回PrimaryKey,一般用於主鍵列自增中。
返回:本次操作消耗的CapacityUnit和需要返回的行資料return_row
consumed表示消耗的CapacityUnit,是tablestore.metadata.CapacityUnit類的執行個體。
return_row表示需要返回的行資料。
樣本:
    primary_key = [('gid',1), ('uid',101)]
    update_of_attribute_columns = {
        'put' : [('name','張三丰'), ('address','中國B地')],
        'delete' : [('mobile', 1493725896147)],
        'delete_all' : [('age')],
        'increment' : [('counter', 1)]
    }
    row = Row(primary_key, update_of_attribute_columns)
    condition = Condition('EXPECT_EXIST')
    consumed = client.update_row('myTable', row, condition)
"""
def update_row(self, table_name, row, condition, return_type = None, transaction_id = None):

參數

參數

說明

table_name

資料表名稱。

row

行資料,包括以下配置:

  • primary_key:行的主鍵。主鍵包括主鍵列名、主鍵類型和主索引值。

    重要
    • 設定的主鍵個數和類型必須和資料表的主鍵個數和類型一致。

    • 當主鍵為自增列時,只需將自增列的值設定為預留位置。更多資訊,請參見主鍵列自增

  • update_of_attribute_columns:更新的屬性列。

    • 增加或更新資料時,需要設定屬性名稱、屬性值、屬性類型(可選)、時間戳記(可選)。

      屬性名稱即屬性列的名稱,屬性類型即屬性列的資料類型。更多資訊,請參見命名規則和資料類型

      時間戳記即資料的版本號碼,可以由系統自動產生或者自訂,如果不設定此參數,則預設由系統自動產生。更多資訊,請參見資料版本和生命週期

      • 當由系統自動產生資料的版本號碼時,系統預設將目前時間的毫秒單位時間戳記(從1970-01-01 00:00:00 UTC計算起的毫秒數)作為資料的版本號碼。

      • 當自訂資料的版本號碼時,版本號碼需要為64位的毫秒單位時間戳記且在有效版本範圍內。

    • 刪除屬性列特定版本的資料時,只需要設定屬性名稱和時間戳記。

      時間戳記是64位整數,單位為毫秒,表示某個特定版本的資料。

    • 刪除屬性列時,只需要設定屬性名稱。

      說明

      刪除一行的全部屬性列不等同於刪除該行,如果需要刪除該行,請使用DeleteRow操作。

condition

使用條件更新,可以設定原行的存在性條件或者原行中某列的列值條件。更多資訊,請參見條件更新

return_type

表示返回資料的類型,是tablestore.metadata.ReturnType類的執行個體,目前僅支援返回PrimaryKey,一般用於主鍵列自增中。

transaction_id

局部事務ID。使用局部事務功能刪除資料時必須設定此參數。

樣本

更新一行資料。

# 設定資料表名稱。
table_name = '<TABLE_NAME>'

# 主鍵的第一列是gid,值是整數1,第二列是uid,值是整數101。
primary_key = [('gid',1), ('uid',101)]
# 更新包括PUT,DELETE和DELETE_ALL三部分。
# PUT:新增或者更新列值。樣本中是新增兩列,第一列名字是name,值是David,第二列名字是address,值是Hongkong。
# DELETE: 刪除指定版本號碼(時間戳記)的值。樣本中是刪除版本為1488436949003的address列的值。
# DELETE_ALL:刪除列。樣本中是刪除mobile和age兩列的所有版本的值。
update_of_attribute_columns = {
    'PUT' : [('name','David'), ('address','Hongkong')],
    'DELETE' : [('address', None, 1488436949003)],
    'DELETE_ALL' : [('mobile'), ('age')],
}
row = Row(primary_key, update_of_attribute_columns)

# 行條件檢查為忽略,無論行是否存在,均會更新。
condition = Condition(RowExistenceExpectation.IGNORE, SingleColumnCondition("age", 20, ComparatorType.EQUAL)) # update row only when this row is exist
try:
    consumed, return_row = client.update_row(table_name, row, condition)
    
    # 列印此次請求消耗的寫CU。
    print('put row succeed, consume %s write cu.' % consumed.write)
# 用戶端異常,一般為參數錯誤或者網路異常。
except OTSClientError as e:
    print("update row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
# 服務端異常,一般為參數錯誤或者流控錯誤。
except OTSServiceError as e:
    print("update row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))

詳細代碼請參見UpdateRow@GitHub

批量寫入資料

調用BatchWriteRow介面在一次請求中進行批量寫入操作或者一次對多張表進行寫入。

BatchWriteRow的各個子操作獨立執行,Tablestore會分別返回各個子操作的執行結果。

注意事項

由於批量寫入可能存在部分行失敗的情況,失敗行的Index及錯誤資訊在返回的BatchWriteRowResponse中,但並不拋出異常。因此調用BatchWriteRow介面時,需要檢查傳回值,判斷每行的狀態是否成功;如果不檢查傳回值,則可能會忽略掉部分操作的失敗。

當服務端檢查到某些操作出現參數錯誤時,BatchWriteRow介面可能會拋出參數錯誤的異常,此時該請求中所有的操作都未執行。

介面

"""
說明:批量修改多行資料。
request = MiltiTableInBatchWriteRowItem()
request.add(TableInBatchWriteRowItem(table0, row_items))
request.add(TableInBatchWriteRowItem(table1, row_items))
response = client.batch_write_row(request)
``response``為返回的結果,類型為tablestore.metadata.BatchWriteRowResponse。
"""
def batch_write_row(self, request):                   

樣本

批量寫資料。

put_row_items = []
# 增加PutRow的行。
for i in range(0, 20):
    primary_key = [('gid',i), ('uid',i+1)]
    attribute_columns = [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]
    row = Row(primary_key, attribute_columns)
    condition = Condition(RowExistenceExpectation.IGNORE)
    item = PutRowItem(row, condition)
    put_row_items.append(item)

# 增加UpdateRow的行。
for i in range(0, 10):
    primary_key = [('gid',i), ('uid',i+1)]
    attribute_columns = {'put': [('name','somebody'+str(i)), ('address','somewhere'+str(i)), ('age',i)]}
    row = Row(primary_key, attribute_columns)
    condition = Condition(RowExistenceExpectation.IGNORE, SingleColumnCondition("age", i, ComparatorType.EQUAL))
    item = UpdateRowItem(row, condition)
    put_row_items.append(item)

# 增加DeleteRow的行。
delete_row_items = []
for i in range(10, 20):
    primary_key = [('gid',i), ('uid',i+1)]
    row = Row(primary_key)
    condition = Condition(RowExistenceExpectation.IGNORE)
    item = DeleteRowItem(row, condition)
    delete_row_items.append(item)

# 構造批量寫請求。
request = BatchWriteRowRequest()
request.add(TableInBatchWriteRowItem('<TABLE_NAME>', put_row_items))
request.add(TableInBatchWriteRowItem('<DELETE_TABLE_NAME>', delete_row_items))

# 調用batch_write_row方法執行批量寫,如果請求參數等錯誤會拋異常;如果部分行失敗,則不會拋異常,但是內部的Item會失敗。
try:
    result = client.batch_write_row(request)
    print('Result status: %s'%(result.is_all_succeed()))

    # 檢查Put行的結果。
    print('check first table\'s put results:')
    succ, fail = result.get_put()
    for item in succ:
        print('Put succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))

    # 檢查Update行的結果。
    print('check first table\'s update results:')
    succ, fail = result.get_update()
    for item in succ:
        print('Update succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print('Update failed, error code: %s, error message: %s' % (item.error_code, item.error_message))

    # 檢查Delete行的結果。
    print('check second table\'s delete results:')
    succ, fail = result.get_delete()
    for item in succ:
        print('Delete succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print('Delete failed, error code: %s, error message: %s' % (item.error_code, item.error_message)) 
# 用戶端異常,一般為參數錯誤或者網路異常。
except OTSClientError as e:
    print("get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
# 服務端異常,一般為參數錯誤或者流控錯誤。
except OTSServiceError as e:
    print("get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))

詳細代碼請參見BatchWriteRow@GitHub

常見問題

相關文檔

  • 當要在高並發應用中實現按照指定條件更新資料時,您可以通過條件更新實現。更多資訊,請參見條件更新

  • 當要為線上應用提供即時統計功能時,例如統計文章的PV(即時瀏覽量)等,您可以通過原子計數器實現。更多資訊,請參見原子計數器

  • 當要進行單行寫或多行寫的原子操作,您可以通過局部事務實現。更多資訊,請參見局部事務

  • 寫入資料後,即可根據需要讀取或者刪除表中資料。更多資訊,請參見讀取資料刪除資料