在進行Elasticsearch(ES)叢集間資料移轉時,例如從自建ES遷移到阿里雲ES,必須確保源端和目標端的索引結構保持一致,避免自動對應可能引入的資料丟失、格式錯誤及查詢效能下降等問題,因此一般要求在資料移轉前手動建立目標索引,預先定義索引的映射和設定。本文介紹運用Python指令碼預定義ES叢集索引的映射和設定,並實現索引結構、索引模板和索引生命週期管理(ILM)在目的地組群的寫入。
前提條件
已建立兩個阿里雲ES執行個體。具體操作,請參見建立Elasticsearch執行個體。
本文以源端和目標端的ES版本均為7.10為例。
本文以ES 7.10版本為例提供相關的Python指令碼樣本。高版本ES在mapping構造上可能存在差異,其他版本需結合情境進行調整,如低版本多type結構在高版本已不支援,無法通過文檔樣本進行建立。
已建立ECS執行個體,並配置python環境,具體操作,請參考Linux系統執行個體快速入門。
本文以Python 3.6.8為例,其他版本結合對應版本介面的Requests模組進行調整。
已打通源端ES和目標端ES與ECS的網路,將ECS公網或私網IP地址,分別配置到源端ES和目標端ES的公網地址訪問白名單或VPC私網訪問白名單中。
生產環境注重資料安全性,建議您通過私網連通ECS與源端ES和目標端ES。
同步索引結構
同步索引結構(mappings)和設定(settings)下的主副本配置。
準備測試資料。
在源端ES中執行以下命令,建立索引。
PUT /product_info { "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "productName": { "type": "text" }, "annual_rate":{ "type":"keyword" }, "describe": { "type": "text" } } } }
在ECS中執行以下指令碼,同步索引結構和設定。
import requests from requests.auth import HTTPBasicAuth # 配置資訊,按照實際環境調整 config = { # 源叢集host 'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200', # 源叢集使用者名稱 'old_cluster_user': 'yourusername', # 源叢集密碼 'old_cluster_password': 'yourpassward', # 源叢集http協議,可選 http/https 'old_cluster_protocol': 'http', # 目的地組群host,可在Elasticsearch執行個體的基本資料頁面擷取。 'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200', # 目的地組群使用者名稱 'new_cluster_user': 'yourusername', # 目的地組群密碼 'new_cluster_password': 'yourpassward', # 目的地組群http協議,可選 http/https 'new_cluster_protocol': 'http', # 目的地組群預設副本數 'default_replicas': 1, } # 通用的 HTTP 要求函數 def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'): url = f"{protocol}://{host}{endpoint}" auth = (username, password) if username and password else None headers = {'Content-Type': 'application/json'} if method != 'GET' else None try: response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers) response.raise_for_status() return response.json() except requests.HTTPError as e: # 列印錯誤資訊 print(f"HTTP Error: {e.response.status_code} for {url}") print(e.response.text) except ValueError as e: # 如果響應不是 JSON 格式,列印錯誤並返回原始內容 print("Invalid JSON response:") print(response.text) raise # 擷取所有索引列表 def get_indices(): endpoint = "/_cat/indices?format=json" indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) index_list = [index['index'] for index in indices_result if index['status'] == 'open'] return index_list # 擷取索引的設定 def get_index_settings(index): endpoint = f"/{index}/_settings" index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) settings = index_settings[index]['settings']['index'] shards_replicas_settings = { 'number_of_shards': settings.get('number_of_shards'), 'number_of_replicas': config['default_replicas'] } return {'settings': shards_replicas_settings} # 擷取索引的映射 def get_index_mapping(index): endpoint = f"/{index}/_mapping" index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) return {'mappings': index_mapping[index]['mappings']} # 建立新索引 def create_index(old_index_name, new_index_name=""): if not new_index_name: new_index_name = old_index_name settings = get_index_settings(old_index_name) mappings = get_index_mapping(old_index_name) body = {**settings, **mappings} endpoint = f"/{new_index_name}" create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body) print(f"Index {new_index_name} created with result: {create_result}") # 主函數 def main(): index_list = get_indices() for index in index_list: if not index.startswith("."): # 忽略系統索引 create_index(index) if __name__ == '__main__': main()
驗證結果。
在目標端ES中執行以下命令,查看同步成功的索引。
GET _cat/indices/product_info
同步索引模板
準備測試資料。
在源端ES中執行以下命令,建立索引模板。
PUT _template/product { "index_patterns": ["product_*"], "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "productName": { "type": "text" }, "annual_rate":{ "type":"keyword" }, "describe": { "type": "text" } } } }
在ECS中執行以下指令碼,同步索引模板。
import requests from requests.auth import HTTPBasicAuth # 配置資訊,按照實際環境調整 config = { # 源叢集host 'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200', # 源叢集使用者名稱 'old_cluster_user': 'yourusername', # 源叢集密碼 'old_cluster_password': 'yourpassward', # 源叢集http協議,可選 http/https 'old_cluster_protocol': 'http', # 目的地組群host,可在Elasticsearch執行個體的基本資料頁面擷取。 'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200', # 目的地組群使用者名稱 'new_cluster_user': 'yourusername', # 目的地組群密碼 'new_cluster_password': 'yourpassward', # 目的地組群http協議,可選 http/https 'new_cluster_protocol': 'http', # 目的地組群預設副本數 'default_replicas': 1, } # 通用的 HTTP 要求函數 def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'): url = f"{protocol}://{host}{endpoint}" auth = (username, password) if username and password else None headers = {'Content-Type': 'application/json'} if method != 'GET' else None try: response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers) response.raise_for_status() return response.json() except requests.HTTPError as e: # 列印錯誤資訊 print(f"HTTP Error: {e.response.status_code} for {url}") print(e.response.text) except ValueError as e: # 如果響應不是 JSON 格式,列印錯誤並返回原始內容 print("Invalid JSON response:") print(response.text) raise # 擷取源叢集的所有索引模板 def get_index_templates(): endpoint = "/_template" templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) return templates_result # 建立目的地組群的索引模板 def create_index_template(template_name, template_body): endpoint = f"/_template/{template_name}" create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body) print(f"Template {template_name} created with result: {create_result}") # 主函數 def main(): # 同步索引模板 templates = get_index_templates() for template_name, template_body in templates.items(): create_index_template(template_name, template_body) if __name__ == '__main__': main()
驗證結果。
在目標端ES中執行以下命令,查詢目標模板的資訊。
GET _template/product
同步索引生命週期管理(ILM)
準備測試資料。
在源端ES中執行以下命令,建立索引生命週期管理(ILM)。
PUT _ilm/policy/product { "policy": { "phases": { "hot": { "actions": { "rollover": { "max_size": "1GB", "max_age": "1d", "max_docs": 1000 } } }, "delete": { "min_age": "2h", "actions": { "delete": {} } } } } }
在ECS中執行以下指令碼,同步索引生命週期管理(ILM)。
import requests from requests.auth import HTTPBasicAuth # 配置資訊,按照實際環境調整 config = { # 源叢集host 'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200', # 源叢集使用者名稱 'old_cluster_user': 'yourusername', # 源叢集密碼 'old_cluster_password': 'yourpassward', # 源叢集http協議,可選 http/https 'old_cluster_protocol': 'http', # 目的地組群host,可在Elasticsearch執行個體的基本資料頁面擷取。 'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200', # 目的地組群使用者名稱 'new_cluster_user': 'yourusername', # 目的地組群密碼 'new_cluster_password': 'yourpassward', # 目的地組群http協議,可選 http/https 'new_cluster_protocol': 'http', # 目的地組群預設副本數 'default_replicas': 1, } # 通用的 HTTP 要求函數 def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'): url = f"{protocol}://{host}{endpoint}" auth = (username, password) if username and password else None headers = {'Content-Type': 'application/json'} if method != 'GET' else None try: response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers) response.raise_for_status() return response.json() except requests.HTTPError as e: # 列印錯誤資訊 print(f"HTTP Error: {e.response.status_code} for {url}") print(e.response.text) except ValueError as e: # 如果響應不是 JSON 格式,列印錯誤並返回原始內容 print("Invalid JSON response:") print(response.text) raise # 擷取源叢集的所有索引ILM def get_ilm_polices(): endpoint = "/_ilm/policy" templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) return templates_result # 建立目的地組群的索引ILM def create_ilm_policy(policy_name, policy_body): policy_body.pop('version', None) policy_body.pop('modified_date', None) policy_body.pop('modified_date_string', None) endpoint = f"/_ilm/policy/{policy_name}" create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body) print(f"Policy {policy_name} created with result: {create_result}") # 主函數 def main(): # 同步索引ILM policies = get_ilm_polices() for policy_name, policy_body in policies.items(): create_ilm_policy(policy_name, policy_body) if __name__ == '__main__': main()
驗證結果。
在目標端ES中執行以下命令,查詢索引生命週期管理(ILM)的資訊。
GET _ilm/policy/product