本文由簡體中文內容自動轉碼而成。阿里雲不保證此自動轉碼的準確性、完整性及時效性。本文内容請以簡體中文版本為準。

通過指令碼實現索引結構、模板或ILM的同步

更新時間:2024-08-07 19:29

在進行Elasticsearch(ES)叢集間資料移轉時,例如從自建ES遷移到阿里雲ES,必須確保源端和目標端的索引結構保持一致,避免自動對應可能引入的資料丟失、格式錯誤及查詢效能下降等問題,因此一般要求在資料移轉前手動建立目標索引,預先定義索引的映射和設定。本文介紹運用Python指令碼預定義ES叢集索引的映射和設定,並實現索引結構、索引模板和索引生命週期管理(ILM)在目的地組群的寫入。

前提條件

  1. 已建立兩個阿里雲ES執行個體。具體操作,請參見建立Elasticsearch執行個體

    本文以源端和目標端的ES版本均為7.10為例。

    說明

    本文以ES 7.10版本為例提供相關的Python指令碼樣本。高版本ES在mapping構造上可能存在差異,其他版本需結合情境進行調整,如低版本多type結構在高版本已不支援,無法通過文檔樣本進行建立。

  2. 已建立ECS執行個體,並配置python環境,具體操作,請參考Linux系統執行個體快速入門

    本文以Python 3.6.8為例,其他版本結合對應版本介面的Requests模組進行調整。

  3. 已打通源端ES和目標端ES與ECS的網路,將ECS公網或私網IP地址,分別配置到源端ES和目標端ES的公網地址訪問白名單或VPC私網訪問白名單中。

    說明

    生產環境注重資料安全性,建議您通過私網連通ECS與源端ES和目標端ES。

同步索引結構

同步索引結構(mappings)和設定(settings)下的主副本配置。

  1. 準備測試資料。

    在源端ES中執行以下命令,建立索引。

    PUT /product_info
    {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
          "properties": {
            "productName": {
              "type": "text"
            },
            "annual_rate":{
              "type":"keyword"
            },
            "describe": {
              "type": "text"
            }
        }
      }
    }
  2. 在ECS中執行以下指令碼,同步索引結構和設定。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 配置資訊,按照實際環境調整
    config = {
        # 源叢集host
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # 源叢集使用者名稱
        'old_cluster_user': 'yourusername',
        # 源叢集密碼
        'old_cluster_password': 'yourpassward',
        # 源叢集http協議,可選 http/https
        'old_cluster_protocol': 'http',
        # 目的地組群host,可在Elasticsearch執行個體的基本資料頁面擷取。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # 目的地組群使用者名稱
        'new_cluster_user': 'yourusername',
        # 目的地組群密碼
        'new_cluster_password': 'yourpassward',
        # 目的地組群http協議,可選 http/https
        'new_cluster_protocol': 'http',
        # 目的地組群預設副本數
        'default_replicas': 1,
    }
    
    # 通用的 HTTP 要求函數
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # 列印錯誤資訊
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # 如果響應不是 JSON 格式,列印錯誤並返回原始內容
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # 擷取所有索引列表
    def get_indices():
        endpoint = "/_cat/indices?format=json"
        indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        index_list = [index['index'] for index in indices_result if index['status'] == 'open']
        return index_list
    
    # 擷取索引的設定
    def get_index_settings(index):
        endpoint = f"/{index}/_settings"
        index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        settings = index_settings[index]['settings']['index']
        shards_replicas_settings = {
            'number_of_shards': settings.get('number_of_shards'),
            'number_of_replicas': config['default_replicas']
        }
        return {'settings': shards_replicas_settings}
    
    # 擷取索引的映射
    def get_index_mapping(index):
        endpoint = f"/{index}/_mapping"
        index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return {'mappings': index_mapping[index]['mappings']}
    
    # 建立新索引
    def create_index(old_index_name, new_index_name=""):
        if not new_index_name:
            new_index_name = old_index_name
    
        settings = get_index_settings(old_index_name)
        mappings = get_index_mapping(old_index_name)
        body = {**settings, **mappings}
    
        endpoint = f"/{new_index_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
    
        print(f"Index {new_index_name} created with result: {create_result}")
    
    # 主函數
    def main():
        index_list = get_indices()
        for index in index_list:
            if not index.startswith("."):  # 忽略系統索引
                create_index(index)
    
    
    if __name__ == '__main__':
        main()
    
  3. 驗證結果。

    在目標端ES中執行以下命令,查看同步成功的索引。

    GET _cat/indices/product_info

同步索引模板

  1. 準備測試資料。

    在源端ES中執行以下命令,建立索引模板。

    PUT _template/product
    {
      "index_patterns": ["product_*"],
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
            "productName": {
              "type": "text"
            },
            "annual_rate":{
              "type":"keyword"
            },
            "describe": {
              "type": "text"
            }
        }
      }
    }
  2. 在ECS中執行以下指令碼,同步索引模板。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 配置資訊,按照實際環境調整
    config = {
        # 源叢集host
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # 源叢集使用者名稱
        'old_cluster_user': 'yourusername',
        # 源叢集密碼
        'old_cluster_password': 'yourpassward',
        # 源叢集http協議,可選 http/https
        'old_cluster_protocol': 'http',
        # 目的地組群host,可在Elasticsearch執行個體的基本資料頁面擷取。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # 目的地組群使用者名稱
        'new_cluster_user': 'yourusername',
        # 目的地組群密碼
        'new_cluster_password': 'yourpassward',
        # 目的地組群http協議,可選 http/https
        'new_cluster_protocol': 'http',
        # 目的地組群預設副本數
        'default_replicas': 1,
    }
    
    # 通用的 HTTP 要求函數
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # 列印錯誤資訊
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # 如果響應不是 JSON 格式,列印錯誤並返回原始內容
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # 擷取源叢集的所有索引模板
    def get_index_templates():
        endpoint = "/_template"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result
    
    # 建立目的地組群的索引模板
    def create_index_template(template_name, template_body):
        endpoint = f"/_template/{template_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
        print(f"Template {template_name} created with result: {create_result}")
    
    # 主函數
    def main():
    
        # 同步索引模板
        templates = get_index_templates()
        for template_name, template_body in templates.items():
            create_index_template(template_name, template_body)
    
    if __name__ == '__main__':
        main()
  3. 驗證結果。

    在目標端ES中執行以下命令,查詢目標模板的資訊。

    GET _template/product

同步索引生命週期管理(ILM)

  1. 準備測試資料。

    在源端ES中執行以下命令,建立索引生命週期管理(ILM)。

    PUT _ilm/policy/product
    {
      "policy": {
        "phases": {
          "hot": {
            "actions": {
              "rollover": {
                "max_size": "1GB",
                "max_age": "1d",
                "max_docs": 1000
              }
            }
          },
          "delete": {
            "min_age": "2h",
            "actions": {
              "delete": {}
            }
          }
        }
      }
    }
  1. 在ECS中執行以下指令碼,同步索引生命週期管理(ILM)。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 配置資訊,按照實際環境調整
    config = {
        # 源叢集host
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # 源叢集使用者名稱
        'old_cluster_user': 'yourusername',
        # 源叢集密碼
        'old_cluster_password': 'yourpassward',
        # 源叢集http協議,可選 http/https
        'old_cluster_protocol': 'http',
        # 目的地組群host,可在Elasticsearch執行個體的基本資料頁面擷取。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # 目的地組群使用者名稱
        'new_cluster_user': 'yourusername',
        # 目的地組群密碼
        'new_cluster_password': 'yourpassward',
        # 目的地組群http協議,可選 http/https
        'new_cluster_protocol': 'http',
        # 目的地組群預設副本數
        'default_replicas': 1,
    }
    
    # 通用的 HTTP 要求函數
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # 列印錯誤資訊
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # 如果響應不是 JSON 格式,列印錯誤並返回原始內容
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # 擷取源叢集的所有索引ILM
    def get_ilm_polices():
        endpoint = "/_ilm/policy"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result
    
    # 建立目的地組群的索引ILM
    def create_ilm_policy(policy_name, policy_body):
        policy_body.pop('version', None)
        policy_body.pop('modified_date', None)
        policy_body.pop('modified_date_string', None)
    
        endpoint = f"/_ilm/policy/{policy_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body)
        print(f"Policy {policy_name} created with result: {create_result}")
    
    # 主函數
    def main():
    
        # 同步索引ILM
        policies = get_ilm_polices()
        for policy_name, policy_body in policies.items():
            create_ilm_policy(policy_name, policy_body)
    if __name__ == '__main__':
        main()
    
  2. 驗證結果。

    在目標端ES中執行以下命令,查詢索引生命週期管理(ILM)的資訊。

    GET _ilm/policy/product

  • 本頁導讀 (1, M)
  • 前提條件
  • 同步索引結構
  • 同步索引模板
  • 同步索引生命週期管理(ILM)
文檔反饋