全部產品
Search
文件中心

Elasticsearch:通過reindex將自建ES資料移轉至阿里雲

更新時間:Jun 30, 2024

本文介紹通過reindex方式,將ECS上自建Elasticsearch(簡稱ES)叢集中的資料移轉至阿里雲ES中,包括建立索引和遷移資料。

背景資訊

通過reindex遷移資料,僅支援單可用性區域執行個體。如果您使用的是多可用性區域執行個體,建議採用如下方案將自建ES資料移轉至阿里雲:

前提條件

您已完成以下操作:

  • 建立單可用性區域的阿里雲ES執行個體。

    具體操作請參見建立Elasticsearch執行個體

  • 準備自建ES叢集和待遷移的資料。

    如果您還沒有自建ES叢集,建議您使用阿里雲ECS進行搭建,具體操作步驟請參見安裝並運行Elasticsearch。自建ES叢集需要滿足以下條件:

    • 所在的ECS的網路類型必須是專用網路(不支援ClassicLink方式打通的ECS),且必須與阿里雲ES在同一個專用網路下。

    • 所在的ECS的安全性群組不能限制阿里雲ES執行個體的各節點IP(Kibana控制台可查看各節點的IP),且要開啟9200連接埠。

    • 能夠與阿里雲ES執行個體連通。可在執行指令碼的機器上,使用curl -XGET http://<host>:9200命令驗證。

      說明

      您可以通過任意一台機器執行文檔中的指令碼,前提是該機器可以同時訪問自建ES和阿里雲ES叢集的9200連接埠。

使用限制

2020年10月阿里雲ES進行了網路架構調整,新網路架構下的叢集跨叢集reindex需依賴Privatelink打通阿里雲ES叢集私網。您可以參見下表,依據您的業務情境選擇解決方案進行處理。

說明

2020年10月之前建立的ES叢集屬於舊網路架構,2020年10月及之後建立的ES叢集屬於新網路架構。

使用情境

ES叢集所處網路架構

解決方案

阿里雲ES叢集間的資料移轉

兩個ES叢集均建立於舊網路架構下。

reindex方式:阿里雲ES間跨叢集reindex

其中一個ES叢集建立於新網路架構下。

說明

另一個ES叢集可以建立於新網路架構,也可以建立於舊網路架構。

將ECS上自建的ES叢集中的資料移轉至阿里雲ES叢集中

阿里雲ES叢集建立於舊網路架構下。

reindex方式:通過reindex將自建ES資料移轉至阿里雲

阿里雲ES叢集建立於新網路架構下。

reindex方式:通過執行個體私網打通將自建Elasticsearch資料移轉至阿里雲

注意事項

  • 2020年10月,Elasticsearch對網路架構進行了調整。2020年10月之前為舊網路架構,2020年10月及之後為新網路架構。新網路架構下的執行個體不支援與舊網路架構下的執行個體進行跨叢集reindex、跨叢集搜尋、跨叢集複製等執行個體互連操作。如果需要進行互連,需要確保執行個體建立在同一網路架構下。對於華北3(張家口)和海外地區,由於網路架構調整時間不確定,因此需要提交工單,聯絡Elasticsearch支援人員,校正網路是否可以互連。

  • 新網路架構下,阿里雲ES執行個體部署在阿里雲服務帳號下的VPC中,不支援訪問其他網路環境下的資源;舊網路架構下,阿里雲ES部署在使用者VPC中,網路訪問不受影響。

  • 為保證資料移轉前後一致,建議上遊業務停止自建ES叢集的資料寫入更新操作,確保讀操作正常進行。遷移完成後,直接切換到阿里雲ES叢集進行讀寫操作。如果不停止寫操作,建議通過指令碼設定迴圈任務減少停寫服務時間,具體請參見步驟四:遷移資料中的《資料量大、無刪除操作、有更新時間》章節。

  • 當使用網域名稱訪問自建ES或阿里雲ES叢集時,不允許通過http://host:port/path這種帶path的形式訪問。

操作流程

  1. 步驟一:擷取終端網域名稱(可選)

  2. 步驟二:建立目標端索引

  3. 步驟三:配置reindex白名單

  4. 步驟四:遷移資料

步驟一:擷取終端網域名稱(可選)

如果您建立的阿里雲ES處於新網路架構下,需要藉助PrivateLink,打通ECS上自建的ES叢集所處的網路與阿里雲服務帳號的網路,擷取終端網域名稱,為後續配置做準備。具體操作,請參見配置執行個體私網串連

步驟二:建立目標端索引

參考自建ES叢集中需要遷移的索引配置,提前在阿里雲ES叢集中建立索引。或者為阿里雲ES叢集開啟自動建立索引功能(不建議)。

以Python2為例,使用如下指令碼,在阿里雲ES叢集中大量建立自建ES叢集中需要遷移的索引。預設新建立的索引複本數為0。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 檔案名稱:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## 自建Elasticsearch叢集host。
oldClusterHost = "old-cluster.com"
## 自建Elasticsearch叢集使用者名稱,可為空白。
oldClusterUserName = "old-username"
## 自建Elasticsearch叢集密碼,可為空白。
oldClusterPassword = "old-password"
## Elasticsearch叢集host,可在Elasticsearch執行個體的基本資料頁面擷取。
newClusterHost = "new-cluster.com"
## Elasticsearch叢集使用者名稱。
newClusterUser = "elastic"
## Elasticsearch叢集密碼。
newClusterPassword = "new-password"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = httplib.HTTPConnection(host)
    headers = {}
    if (username != "") :
        'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
        base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
        headers["Authorization"] = "Basic %s" % base64string;
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else :
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params, headers=headers)
    response = conn.getresponse()
    res = response.read()
    return res
def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
    endpoint = "/_cat/indices"
    indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
    indicesList = indicesResult.split("\n")
    indexList = []
    for indices in indicesList:
        if (indices.find("open") > 0):
            indexList.append(indices.split()[2])
    return indexList
def getSettings(index, host, username="", password=""):
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print index + "  原始settings如下:\n" + indexSettings
    settingsDict = json.loads(indexSettings)
    ## 分區數預設和自建Elasticsearch叢集索引保持一致。
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## 副本數預設為0。
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
    return newSetting
def getMapping(index, host, username="", password=""):
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print index + " 原始mapping如下:\n" + indexMapping
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping
def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement
def createIndex(oldIndexName, newIndexName=""):
    if (newIndexName == "") :
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print "新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print "新索引 " + newIndexName + " 建立結果:" + createResult
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if (index.startswith(".")):
        systemIndex.append(index)
    else :
        createIndex(index, index)
if (len(systemIndex) > 0) :
    for index in systemIndex:
        print index + " 或許是系統索引,不會重新建立,如有需要,請單獨處理~"

步驟三:配置reindex白名單

  1. 登入Elasticsearch控制台
  2. 在左側導覽列,單擊Elasticsearch執行個體
  3. 進入目標執行個體。
    1. 在頂部功能表列處,選擇資源群組和地區。
    2. Elasticsearch執行個體中單擊目標執行個體ID。
  4. 在左側導覽列,選擇配置與管理 > ES叢集配置

  5. YML檔案配置地區,單擊右側的修改配置

  6. YML檔案配置面板,修改其他Configure配置,配置reindex白名單。

    配置樣本如下。

    reindex.remote.whitelist: ["10.0.xx.xx:9200","10.0.xx.xx:9200","10.0.xx.xx:9200","10.15.xx.xx:9200","10.15.xx.xx:9200","10.15.xx.xx:9200"]

    多可用性區域reindex白名單配置

    在配置reindex白名單時,需要通過reindex.remote.whitelist參數,設定自建ES叢集的訪問地址,將其添加到阿里雲ES叢集的遠端存取白名單中。阿里雲ES叢集的網路架構不同,配置規則也不同,具體如下:

    • 舊網路架構下:需要配置為hostport的組合,並使用逗號分隔多個主機配置。例如:otherhost:9200,another:9200,127.0.10.**:9200,localhost:**,不識別協議資訊。

    • 新網路架構下:需要配置為執行個體對應的終端節點網域名稱port的組合。例如:ep-bp1hfkx7coy8lvu4****-cn-hangzhou-i.epsrv-bp1zczi0fgoc5qtv****.cn-hangzhou.privatelink.aliyuncs.com:9200

    說明

    更多參數說明請參見配置YML參數

  7. 選中該操作會重啟執行個體,請確認後操作,單擊確定
    確定後,Elasticsearch執行個體會重啟。重啟過程中,可在工作清單查看進度。重啟成功後,即可完成配置。

步驟四:遷移資料

本文以舊網路架構下的執行個體為例,提供了以下三種資料移轉的方式,請根據遷移的資料量大小以及實際業務情況,選擇合適的方式遷移資料。

資料量小

使用如下指令碼。

#!/bin/bash
# file:reindex.sh
indexName="您的索引名"
newClusterUser="Elasticsearch叢集使用者名稱"
newClusterPass="Elasticsearch叢集密碼"
newClusterHost="Elasticsearch叢集host"
oldClusterUser="自建Elasticsearch叢集使用者名稱"
oldClusterPass="自建Elasticsearch叢集密碼"
# 自建Elasticsearch叢集host必須是[scheme]://[host]:[port],例如http://10.37.*.*:9200。
oldClusterHost="自建Elasticsearch叢集host"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d'{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "match_all": {}
        }
    },
    "dest": {
       "index": "'${indexName}'"
    }
}'

資料量大、無刪除操作、有更新時間

資料量較大且無刪除操作時,可以使用滾動遷移的方式,減少停止寫服務的時間。滾動遷移需要有一個類似於更新時間的欄位代表新資料的寫時序。在資料移轉完成後,先停止業務寫操作,待reindex使用最近一次更新時間快速執行一次更新後,將讀寫業務切換到阿里雲ES叢集。

#!/bin/bash
# file: circleReindex.sh
# CONTROLLING STARTUP:
# 這是通過reindex操作遠程重建索引的指令碼,要求:
# 1. Elasticsearch叢集已經建立完索引,或者支援自動建立和動態映射。
# 2. Elasticsearch叢集必須在yml裡配置IP白名單,例如reindex.remote.whitelist: 172.16.**.**:9200。
# 3. host必須是[scheme]://[host]:[port]。
USAGE="Usage: sh circleReindex.sh <count>
       count: 執行次數,多次(負數為迴圈)增量執行或者單次執行
Example:
        sh circleReindex.sh 1
        sh circleReindex.sh 5
        sh circleReindex.sh -1"
indexName="您的索引名"
newClusterUser="Elasticsearch叢集使用者名稱"
newClusterPass="Elasticsearch叢集密碼"
oldClusterUser="自建Elasticsearch叢集使用者名稱"
oldClusterPass="自建Elasticsearch叢集密碼"
## http://myescluster.com
newClusterHost="Elasticsearch叢集host"
# 自建Elasticsearch叢集host必須是[scheme]://[host]:[port],例如http://10.37.*.*:9200。
oldClusterHost="自建Elasticsearch叢集host"
timeField="更新時間欄位"
reindexTimes=0
lastTimestamp=0
curTimestamp=`date +%s`
hasError=false
function reIndexOP() {
    reindexTimes=$[${reindexTimes} + 1]
    curTimestamp=`date +%s`
    ret=`curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
        "source": {
            "remote": {
                "host": "'${oldClusterHost}'",
                "username": "'${oldClusterUser}'",
                "password": "'${oldClusterPass}'"
            },
            "index": "'${indexName}'",
            "query": {
                "range" : {
                    "'${timeField}'" : {
                        "gte" : '${lastTimestamp}',
                        "lt" : '${curTimestamp}'
                    }
                }
            }
        },
        "dest": {
            "index": "'${indexName}'"
        }
    }'`
    lastTimestamp=${curTimestamp}
    echo "第${reindexTimes}次reIndex,本次更新截止時間 ${lastTimestamp} 結果:${ret}"
    if [[ ${ret} == *error* ]]; then
        hasError=true
        echo "本次執行異常,中斷後續執行操作~~,請檢查"
    fi
}
function start() {
    ## 負數就不停迴圈執行
    if [[ $1 -lt 0 ]]; then
        while :
        do
            reIndexOP
        done
    elif [[ $1 -gt 0 ]]; then
        k=0
        while [[ k -lt $1 ]] && [[ ${hasError} == false ]]; do
            reIndexOP
            let ++k
        done
    fi
}
## main 
if [ $# -lt 1 ]; then
    echo "$USAGE"
    exit 1
fi
echo "開始執行索引 ${indexName} 的 ReIndex操作"
start $1
echo "總共執行 ${reindexTimes} 次 reIndex 操作"

資料量大、無刪除操作、無更新時間

當資料量較大,且索引的Mapping中沒有定義更新時間的欄位時,需要由上遊業務修改代碼添加更新時間的欄位。添加完成後可以先將歷史資料移轉完,然後再使用上述第二種方案操作。

#!/bin/bash
# file:miss.sh
indexName="您的索引名"
newClusterUser="Elasticsearch叢集使用者名稱"
newClusterPass="Elasticsearch叢集密碼"
newClusterHost="Elasticsearch叢集host"
oldClusterUser="自建Elasticsearch叢集使用者名稱"
oldClusterPass="自建Elasticsearch叢集密碼"
# 自建Elasticsearch叢集host必須是[scheme]://[host]:[port],例如http://10.37.*.*:9200
oldClusterHost="自建Elasticsearch叢集host"
timeField="updatetime"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "bool": {
                "must_not": {
                    "exists": {
                        "field": "'${timeField}'"
                    }
                }
            }
        }
    },
    "dest": {
       "index": "'${indexName}'"
    }
}'

常見問題

  • 問題:執行curl命令時,提示{"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}

    解決方案:在curl命令中,添加-H "Content-Type: application/json"指令碼重試。

      // 擷取自建Elasticsearch叢集中所有索引資訊,如果沒有許可權可去掉"-u user:pass"參數,oldClusterHost為自建Elasticsearch叢集的host,注意替換。
      curl -u user:pass -XGET http://oldClusterHost/_cat/indices | awk '{print $3}'
      // 參考上面返回的索引列表,擷取需要遷移的指定使用者索引的setting和mapping,注意替換indexName為要查詢的使用者索引名。
      curl -u user:pass -XGET http://oldClusterHost/indexName/_settings,_mapping?pretty=true
      // 參考上面擷取到的對應索引的_settings和_mapping資訊,在Elasticsearch叢集中建立對應索引,索引複本數可以先設定為0,用於加快資料同步速度,資料移轉完成後再重設副本數為1。
      //其中newClusterHost是Elasticsearch叢集的host,testindex是已經建立的索引名,testtype是對應索引的type。
      curl -u user:pass -XPUT http://<newClusterHost>/<testindex> -d '{
        "testindex" : {
            "settings" : {
                "number_of_shards" : "5", //假設自建Elasticsearch叢集中對應索引的shard數是5個。
                "number_of_replicas" : "0" //設定索引複本為0。
              }
            },
            "mappings" : { //假設自建Elasticsearch叢集中對應索引的mappings配置如下。
                "testtype" : {
                    "properties" : {
                        "uid" : {
                            "type" : "long"
                        },
                        "name" : {
                            "type" : "text"
                        },
                        "create_time" : {
                          "type" : "long"
                        }
                    }
               }
           }
       }
    }'
  • 問題:單索引資料量比較大,資料同步速度比較慢時,如何處理?

    解決方案:

    • 由於reindex功能的底層實現原理是通過scroll方式實現的,所以您可以適當調大scroll size的大小或配置scroll slice,藉助scroll並行化機制提升效率。詳情請參見reindex API

    • 如果源端資料量較大,建議採用OSS快照方式。詳情請參見通過OSS將自建Elasticsearch資料移轉至阿里雲

    • 如果單索引資料量比較大,可以在遷移前將目標索引的副本數設定為0,重新整理時間設定為-1,以加快資料同步速度。待資料移轉完成後,再修改回來。

      // 遷移索引資料前可以先將索引複本數設為0,不重新整理,用於加快資料移轉速度。
      curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
              "number_of_replicas" : 0,
              "refresh_interval" : "-1"
      }'
      // 索引資料移轉完成後,可以重設索引複本數為1,重新整理時間1s(1s是預設值)。
      curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
              "number_of_replicas" : 1,
              "refresh_interval" : "1s"
      }'