
Elasticsearch: Migrate data from self-managed Elasticsearch to Alibaba Cloud Elasticsearch using Logstash

Last Updated: Feb 28, 2026

Deploy a self-managed Logstash instance on an Elastic Compute Service (ECS) instance to migrate full or incremental data from a self-managed Elasticsearch cluster to an Alibaba Cloud Elasticsearch cluster.

Choose the right migration method

Method | Best for | Trade-offs
Logstash | Incremental sync, continuous replication, selective index migration | Requires a running Logstash instance; pipeline processing overhead
Snapshots | One-time full migration of large datasets | Requires shared storage; restores complete indices (no document-level incremental sync)
Reindex API | Small datasets or cross-version migrations | Slower for large volumes; network-dependent

Prerequisites

Before you begin, make sure that you have:

  • A self-managed Elasticsearch cluster (source) with data to migrate

  • An Alibaba Cloud Elasticsearch cluster (destination) created and running. For more information, see Create an Alibaba Cloud Elasticsearch cluster

  • An ECS instance in the same Virtual Private Cloud (VPC) as the destination cluster, with a public IPv4 address assigned

  • Self-managed Logstash installed on the ECS instance

  • Network connectivity verified between Logstash and both Elasticsearch clusters

  • A security group inbound rule that allows traffic on port 5601 (for Kibana access)

Important

The ECS instance must reside in the same VPC as the Alibaba Cloud Elasticsearch cluster. Logstash must be able to connect to both the source and destination clusters.
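Before deploying anything, you can verify that both clusters are reachable from the ECS instance. The following is a minimal sketch (a hypothetical helper, assuming Python 3 is available on the ECS instance) that probes a host and port over TCP:

```python
import socket

def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Probe both clusters from the ECS instance (placeholder hosts from this document):
# print(can_reach("<source-es-host>", 9200))
# print(can_reach("<destination-es-endpoint>", 9200))
```

A `curl <host>:9200` from the ECS instance accomplishes the same check interactively.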

Step 1: Set up the environment

This section walks through deploying a self-managed Elasticsearch cluster, Kibana, and Logstash on a single ECS instance. If you already have these components running, skip to Step 2.

Example environment specifications

The following tables describe the example configurations used in this document.

Alibaba Cloud Elasticsearch cluster

Configuration | Value
Region | China (Hangzhou)
Edition and version | V7.10.0, Standard Edition
Zones and nodes | Three zones, three data nodes
Single node specs | 4 vCPUs, 16 GiB memory, Enhanced SSD (ESSD) with 100 GiB storage

ECS instance

Configuration | Value
Region | China (Hangzhou)
Specs | 4 vCPUs, 16 GiB memory
Image | CentOS 7.9 64-bit (public image)
System disk | ESSD, 100 GiB
Network | Same VPC as the Alibaba Cloud Elasticsearch cluster. Assign Public IPv4 Address selected. Pay-by-traffic billing, 100 Mbit/s peak bandwidth
Security group | Inbound rule allowing traffic on port 5601 (Kibana). Add your client IP address as the authorization object
Important
  • If your client is on a home or office LAN, add the public egress IP address, not the local IP address. Visit cip.cc to find your public IP.

  • Adding 0.0.0.0/0 as an authorization object allows access from all public IPv4 addresses. Avoid this in production environments due to security risks.

For more information about creating an ECS instance, see Create an instance on the Custom Launch tab.

Create an Alibaba Cloud Elasticsearch cluster

Create a cluster with the specifications listed above or adjust them to your requirements. For detailed instructions, see Create an Alibaba Cloud Elasticsearch cluster.

Deploy self-managed Elasticsearch

This example deploys a single-node Elasticsearch 7.6.2 cluster on the ECS instance.

  1. Connect to the ECS instance. For more information, see Connect to a Linux instance by using a password or key.

  2. Create a user named elastic as the root user:

       useradd elastic
  3. Set the password for the elastic user. Enter and confirm the password when prompted:

       passwd elastic
  4. Switch to the elastic user:

       su -l elastic
  5. Download and extract the Elasticsearch package:

       wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz
       tar -zvxf elasticsearch-7.6.2-linux-x86_64.tar.gz
  6. Start Elasticsearch:

       cd elasticsearch-7.6.2
       ./bin/elasticsearch -d
  7. Verify that Elasticsearch is running. A successful response contains the version number and the message "You Know, for Search":

       cd ~
       curl localhost:9200

Deploy Kibana and add test data

This example deploys Kibana 7.6.2 on the same ECS instance.

Run Kibana as a regular user, not root.
  1. Download and extract the Kibana package:

       wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz
       tar -zvxf kibana-7.6.2-linux-x86_64.tar.gz
  2. Configure Kibana to accept connections from all IP addresses. Open the kibana.yml file in the config/ directory and add the following line:

       cd kibana-7.6.2-linux-x86_64
       vi config/kibana.yml
       server.host: "0.0.0.0"
  3. Start Kibana:

       nohup ./bin/kibana &
  4. Open the Kibana console in your browser:

       http://<ECS-public-IP>:5601/app/kibana#/home
  5. On the Kibana home page, click Try our sample data. On the Sample data tab, click Add data in the Sample web logs card to load test data.

Deploy Logstash

This example deploys Logstash 7.10.0 on the same ECS instance.

Run Logstash as a regular user, not root.
  1. Download and extract the Logstash package:

       cd ~
       wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz
       tar -zvxf logstash-7.10.0-linux-x86_64.tar.gz
  2. Increase the JVM heap memory. The default heap size is 1 GiB. Set it to about half of the ECS instance memory to speed up data migration (for example, 8 GiB for a 16 GiB instance):

       cd logstash-7.10.0
       sudo vi config/jvm.options
       -Xms8g
       -Xmx8g
  3. Increase the batch size. Edit the pipelines.yml configuration file and change pipeline.batch.size from the default 125 to 5000. This allows Logstash to write 5 MiB to 15 MiB of data per batch, which speeds up migration:

       vi config/pipelines.yml
  4. Verify that Logstash runs correctly. Type Hello world! and press Enter; if Logstash is working, it echoes Hello world! back to the terminal. Press Ctrl+C to stop:

       bin/logstash -e 'input { stdin { } } output { stdout {} }'
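Step 3 above edits pipelines.yml without showing its contents; a minimal per-pipeline entry (the pipeline.id and path.config values below are assumed examples) might look like:

```yaml
# config/pipelines.yml -- per-pipeline settings
- pipeline.id: main
  path.config: "config/es2es_all.conf"
  pipeline.batch.size: 5000
```

Note that when Logstash is started with bin/logstash -f, as in the later steps, pipelines.yml is bypassed; in that case the equivalent setting can be placed in config/logstash.yml (pipeline.batch.size: 5000) or passed on the command line with -b 5000.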

Step 2: (Optional) Migrate index metadata

When Logstash migrates data, the Auto Indexing feature on the Alibaba Cloud Elasticsearch cluster automatically creates indices. However, auto-created indices may have different settings and mappings than the source indices. To preserve the original index structure, create the indices manually on the destination cluster before migrating data.

The following Python script reads index settings and mappings from the source cluster and creates matching indices on the destination cluster.

  1. Connect to the ECS instance as a regular user.

  2. Create a Python script file:

       sudo vi indiceCreate.py
  3. Paste the following script, replacing the host, username, and password values for both the source and destination clusters. Key behaviors of this script:

    • Preserves the number of primary shards from the source index.

    • Sets the number of replica shards to 0 (DEFAULT_REPLICAS = 0) for faster initial indexing. Increase replicas after migration.

    • Skips system indices (names starting with .). Recreate these manually if needed.

       #!/usr/bin/python
       # -*- coding: UTF-8 -*-
       # File name: indiceCreate.py
       import sys
       import base64
       import time
       import httplib
       import json
       ## Source Elasticsearch cluster
       oldClusterHost = "<source-es-host>:9200"
       ## Source cluster username (leave blank if not required)
       oldClusterUserName = "elastic"
       ## Source cluster password (leave blank if not required)
       oldClusterPassword = "<source-es-password>"
       ## Destination Elasticsearch cluster (find on the Basic Information page)
       newClusterHost = "<destination-es-endpoint>:9200"
       ## Destination cluster username
       newClusterUser = "elastic"
       ## Destination cluster password
       newClusterPassword = "<destination-es-password>"
       DEFAULT_REPLICAS = 0
       def httpRequest(method, host, endpoint, params="", username="", password=""):
           conn = httplib.HTTPConnection(host)
           headers = {}
            if (username != "") :
                base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
                headers["Authorization"] = "Basic %s" % base64string
           if "GET" == method:
               headers["Content-Type"] = "application/x-www-form-urlencoded"
               conn.request(method=method, url=endpoint, headers=headers)
           else :
               headers["Content-Type"] = "application/json"
               conn.request(method=method, url=endpoint, body=params, headers=headers)
           response = conn.getresponse()
           res = response.read()
           return res
       def httpGet(host, endpoint, username="", password=""):
           return httpRequest("GET", host, endpoint, "", username, password)
       def httpPost(host, endpoint, params, username="", password=""):
           return httpRequest("POST", host, endpoint, params, username, password)
       def httpPut(host, endpoint, params, username="", password=""):
           return httpRequest("PUT", host, endpoint, params, username, password)
       def getIndices(host, username="", password=""):
           endpoint = "/_cat/indices"
            indicesResult = httpGet(host, endpoint, username, password)
           indicesList = indicesResult.split("\n")
           indexList = []
           for indices in indicesList:
               if (indices.find("open") > 0):
                   indexList.append(indices.split()[2])
           return indexList
       def getSettings(index, host, username="", password=""):
           endpoint = "/" + index + "/_settings"
           indexSettings = httpGet(host, endpoint, username, password)
           print (index + "  Original settings: \n" + indexSettings)
           settingsDict = json.loads(indexSettings)
           ## Number of primary shards matches the source index
           number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
           ## Default replica count is 0
           number_of_replicas = DEFAULT_REPLICAS
           newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
           return newSetting
       def getMapping(index, host, username="", password=""):
           endpoint = "/" + index + "/_mapping"
           indexMapping = httpGet(host, endpoint, username, password)
           print (index + " Original mappings: \n" + indexMapping)
           mappingDict = json.loads(indexMapping)
           mappings = json.dumps(mappingDict[index]["mappings"])
           newMapping = "\"mappings\" : " + mappings
           return newMapping
       def createIndexStatement(oldIndexName):
           settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
           mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
           createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
           return createstatement
       def createIndex(oldIndexName, newIndexName=""):
           if (newIndexName == "") :
               newIndexName = oldIndexName
           createstatement = createIndexStatement(oldIndexName)
           print ("New index " + newIndexName + " Index settings and mappings: \n" + createstatement)
           endpoint = "/" + newIndexName
           createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
           print ("New index " + newIndexName + " Creation result: " + createResult)
       ## main
       indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
       systemIndex = []
       for index in indexList:
           if (index.startswith(".")):
               systemIndex.append(index)
           else :
               createIndex(index, index)
       if (len(systemIndex) > 0) :
           for index in systemIndex:
                print (index + " may be a system index and was not recreated. Manually recreate it based on your business requirements.")
  4. Run the script:

       sudo /usr/bin/python indiceCreate.py
  5. Verify the created indices. Log on to the Kibana console of the Alibaba Cloud Elasticsearch cluster (see Log on to the Kibana console) and run:

       GET /_cat/indices?v

Step 3: Migrate full data

Perform full data migration first if data writes or updates occur continuously on the source cluster. This establishes a complete baseline on the destination cluster before starting incremental synchronization.

Note
  • To improve data accuracy, create multiple Logstash pipelines and migrate different indices separately.

  • The Logstash configuration format changed in version 8.5. This section provides configurations for both 7.x and 8.x.
  1. Connect to the ECS instance.

  2. Create a Logstash configuration file:

       cd logstash-7.10.0/config
       vi es2es_all.conf
  3. Add the configuration for your Logstash version.


    Logstash 7.x configuration

       input{
           elasticsearch{
               # Source Elasticsearch host
               hosts =>  ["http://<source-es-host>:9200"]
               # Source cluster credentials
               user => "<source-es-username>"
               password => "<source-es-password>"
               # Indices to migrate (supports wildcards)
               index => "kibana_sample_data_*"
               # Performance settings
               docinfo=>true
               slices => 5
               size => 5000
           }
       }
    
       filter {
         # Remove fields added by Logstash
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
    
       output{
           elasticsearch{
               # Destination cluster endpoint (find on the Basic Information page)
               hosts => ["http://<destination-es-endpoint>:9200"]
               # Destination cluster credentials
               user => "elastic"
               password => "<destination-es-password>"
               # Preserve original index names
               index => "%{[@metadata][_index]}"
               # Preserve original document types (7.x only)
               document_type => "%{[@metadata][_type]}"
               # Preserve original document IDs (remove for better performance)
               document_id => "%{[@metadata][_id]}"
               ilm_enabled => false
               manage_template => false
           }
       }

    Logstash 8.x configuration

       input{
           elasticsearch{
               # Source Elasticsearch host
               hosts =>  ["http://<source-es-host>:9200"]
               # Source cluster credentials
               user => "elastic"
               password => "<source-es-password>"
               # Indices to migrate
               index => "<index-name>"
               # Performance settings
               docinfo => true
               size => 10000
               docinfo_target => "[@metadata]"
           }
       }
    
       filter {
         # Remove fields added by Logstash
         mutate {
           remove_field => ["@timestamp","@version"]
         }
       }
    
       output{
           elasticsearch{
               # Destination cluster endpoint (find on the Basic Information page)
               hosts => ["http://<destination-es-endpoint>:9200"]
               # Destination cluster credentials
               user => "elastic"
               password => "<destination-es-password>"
               # Preserve original index names
               index => "%{[@metadata][_index]}"
               # Preserve original document IDs (remove for better performance)
               document_id => "%{[@metadata][_id]}"
               ilm_enabled => false
               manage_template => false
           }
       }

     Prevent duplicate data: The Elasticsearch input plugin reads all data from the source and then stops the Logstash process, and Logstash automatically restarts it. With an unscheduled single pipeline, this restart loop can cause duplicate writes. To prevent this, use the schedule parameter with a cron expression so that the pipeline runs only once. For example, to run the pipeline once at 13:20 on March 5:

        schedule => "20 13 5 3 *"

     For more information about cron syntax, see Scheduling in the Logstash documentation.

     In Logstash 8.x, the document_type setting is removed and docinfo_target => "[@metadata]" is required.
  4. Navigate to the Logstash installation directory:

       cd ~/logstash-7.10.0
  5. Start the migration:

       nohup bin/logstash -f config/es2es_all.conf >/dev/null 2>&1 &
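The one-shot schedule value shown above encodes minute, hour, day of month, and month, in that order. A small helper (hypothetical, not part of Logstash) makes the mapping explicit:

```python
from datetime import datetime

def one_shot_cron(dt):
    """Build a Logstash schedule cron string from a datetime:
    minute, hour, day of month, month (weekday left as *)."""
    return "{m} {h} {d} {mo} *".format(m=dt.minute, h=dt.hour, d=dt.day, mo=dt.month)

print(one_shot_cron(datetime(2026, 3, 5, 13, 20)))  # -> 20 13 5 3 *
```

Because the month and day are fixed, a cron expression like this technically fires once per year; stop the Logstash process after the run completes.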

Step 4: Migrate incremental data

After full data migration, set up incremental synchronization to capture ongoing changes. The incremental pipeline uses a time-range query and a cron schedule to periodically pull new data from the source cluster.

Run this step as a regular user.
  1. Create the incremental migration configuration file:

       cd ~/logstash-7.10.0/config
       vi es2es_kibana_sample_data_logs.conf
  2. Add the following configuration (shown for Logstash 7.x). This example queries data updated in the last 5 minutes and runs every minute.

    For Logstash 8.x, remove the document_type => "%{[@metadata][_type]}" setting.
    Important

     • Logstash timestamps are in UTC. If your local timezone is UTC+8, convert timestamps accordingly; the @timestamp range filter in the query parameter uses UTC.

     • If the source index does not contain a time field, use an ingest pipeline with the _ingest.timestamp parameter to add one.

       input{
           elasticsearch{
               # Source Elasticsearch host
               hosts =>  ["http://<source-es-host>:9200"]
               # Source cluster credentials
               user => "<source-es-username>"
               password => "<source-es-password>"
               # Index to sync
               index => "kibana_sample_data_logs"
               # Query for incremental data (last 5 minutes)
               query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
               # Run every minute
               schedule => "* * * * *"
               scroll => "5m"
               docinfo=>true
               size => 5000
           }
       }
    
       filter {
         # Remove fields added by Logstash
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
    
       output{
           elasticsearch{
               # Destination cluster endpoint (find on the Basic Information page)
               hosts => ["http://<destination-es-endpoint>:9200"]
               # Destination cluster credentials
               user => "elastic"
               password => "<destination-es-password>"
               # Preserve original index names
               index => "%{[@metadata][_index]}"
               # Preserve original document types (remove for 8.x)
               document_type => "%{[@metadata][_type]}"
               # Preserve original document IDs
               document_id => "%{[@metadata][_id]}"
               ilm_enabled => false
               manage_template => false
           }
       }
  3. Navigate to the Logstash installation directory:

       cd ~/logstash-7.10.0
  4. Start the incremental migration:

       nohup bin/logstash -f config/es2es_kibana_sample_data_logs.conf >/dev/null 2>&1 &
  5. Verify incremental data in the Alibaba Cloud Elasticsearch cluster. In the Kibana console, query recently updated records:

       GET kibana_sample_data_logs/_search
       {
         "query": {
           "range": {
             "@timestamp": {
               "gte": "now-5m",
               "lte": "now/m"
             }
           }
         },
         "sort": [
           {
             "@timestamp": {
               "order": "desc"
             }
           }
         ]
       }
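The "now-5m" to "now/m" window in the query above is resolved by Elasticsearch in UTC, regardless of the client's timezone. If you need to reason about the window from a UTC+8 client, a rough Python equivalent (an illustrative approximation; Elasticsearch applies its own rounding rules to now/m) is:

```python
from datetime import datetime, timedelta

def utc_window(local_dt, utc_offset_hours, minutes=5):
    """Convert a local timestamp to UTC and return the (gte, lte)
    pair that a "now-5m" .. "now/m" style filter would cover."""
    utc_now = local_dt - timedelta(hours=utc_offset_hours)
    lte = utc_now.replace(second=0, microsecond=0)  # rough "now/m": minute boundary
    gte = utc_now - timedelta(minutes=minutes)      # "now-5m"
    return gte, lte

# A UTC+8 client at 16:30:45 local time corresponds to 08:30:45 UTC.
gte, lte = utc_window(datetime(2026, 2, 28, 16, 30, 45), 8)
print(gte, lte)  # gte = 08:25:45 UTC, lte = 08:30:00 UTC
```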

Step 5: Verify migration results

Verify full data migration

  1. In the source cluster Kibana console, check the index document counts:

       GET _cat/indices?v
  2. In the Alibaba Cloud Elasticsearch cluster Kibana console, run the same query:

       GET _cat/indices?v
  3. Compare the docs.count values for each index. If full data migration succeeded, the document counts match between the source and destination clusters.
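Comparing docs.count by hand is error-prone when many indices are involved. A throwaway parser (hypothetical; it assumes the default _cat/indices?v column layout, where the index name is the third column and docs.count the seventh) can diff the two outputs:

```python
def parse_cat_indices(text):
    """Map index name -> docs.count from `GET _cat/indices?v` output."""
    counts = {}
    for line in text.strip().splitlines()[1:]:  # skip the header row
        cols = line.split()
        if len(cols) >= 7:
            counts[cols[2]] = int(cols[6])      # index name, docs.count
    return counts

def diff_counts(source_text, dest_text):
    """Return {index: (source_count, dest_count)} for mismatched indices."""
    src, dst = parse_cat_indices(source_text), parse_cat_indices(dest_text)
    return {i: (n, dst.get(i)) for i, n in src.items() if dst.get(i) != n}

source = """health status index                    uuid pri rep docs.count docs.deleted store.size pri.store.size
green  open   kibana_sample_data_logs  abc  1   1   14074      0            11mb       5.5mb"""
dest = """health status index                    uuid pri rep docs.count docs.deleted store.size pri.store.size
green  open   kibana_sample_data_logs  xyz  1   0   14074      0            5.5mb      5.5mb"""
print(diff_counts(source, dest))  # {} when all counts match
```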

Verify incremental data migration

  1. In the source cluster Kibana console, query the most recently updated records:

       GET kibana_sample_data_logs/_search
       {
         "query": {
           "range": {
             "@timestamp": {
               "gte": "now-5m",
               "lte": "now/m"
             }
           }
         },
         "sort": [
           {
             "@timestamp": {
               "order": "desc"
             }
           }
         ]
       }
  2. Run the same query in the Alibaba Cloud Elasticsearch cluster Kibana console. If incremental data migration succeeded, the recent records match between the source and destination clusters.

Performance tuning

Adjust the following parameters to optimize migration throughput:

Parameter | Location | Default | Recommended | Effect
pipeline.batch.size | config/pipelines.yml | 125 | 5000 | Number of events per batch. Larger values increase throughput but use more memory
-Xms / -Xmx | config/jvm.options | 1g | 50% of available memory (for example, 8g for a 16 GiB instance) | JVM heap size. Larger heaps support bigger batches
size | Pipeline config (input) | 1000 | 5000-10000 | Documents per scroll request from the source cluster
slices | Pipeline config (input) | none (disabled) | 5 | Parallel scroll slices. Set to enable sliced scrolling for higher read parallelism
scroll | Pipeline config (input) | 1m | 5m | Scroll context timeout. Increase for slow networks or large indices
Set -Xms and -Xmx to the same value to avoid JVM garbage collection pauses. Do not exceed 50% of total system memory.
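The memory cost of a larger batch size can be estimated with back-of-envelope arithmetic. The sketch below is a hypothetical helper; Logstash defaults pipeline.workers to the number of CPU cores, and the average event size here is an assumed figure:

```python
def inflight_mib(batch_size, workers, avg_event_kib):
    """Rough upper bound on in-flight event memory across pipeline workers, in MiB."""
    return batch_size * workers * avg_event_kib / 1024

# 5000 events per batch, 4 workers (one per vCPU), ~2 KiB events:
print(inflight_mib(5000, 4, 2))  # -> 39.0625
```

Even with a batch size of 5000, in-flight events are a small fraction of an 8 GiB heap; the heap mostly matters for buffering and GC headroom.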

Credential security

Avoid embedding credentials directly in Logstash configuration files. Use one of these alternatives:

  • Logstash keystore: Store sensitive values in the Logstash keystore and reference them in configuration files with ${KEY_NAME} syntax.

  • Environment variables: Export credentials as environment variables and reference them in configuration files.

For more information, see Secrets keystore in the Logstash documentation.
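With the keystore approach, a secret added with bin/logstash-keystore add DEST_ES_PW (DEST_ES_PW is an example key name, not from this document) replaces the literal password in the pipeline output, for example:

```
output {
    elasticsearch {
        hosts => ["http://<destination-es-endpoint>:9200"]
        user => "elastic"
        # Resolved from the Logstash keystore or an environment variable
        password => "${DEST_ES_PW}"
    }
}
```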

Troubleshooting

Logstash cannot connect to the source cluster

Symptom: Connection refused or timeout errors in Logstash logs.

Solution:

  1. Verify the source Elasticsearch host and port are correct.

  2. Confirm the ECS security group allows outbound traffic to the source cluster.

  3. Test connectivity with curl <source-es-host>:9200 from the ECS instance.

Logstash cannot connect to the destination cluster

Symptom: Authentication failure or connection timeout when writing to the Alibaba Cloud Elasticsearch cluster.

Solution:

  1. Verify the destination cluster endpoint on the Basic Information page of the Alibaba Cloud Elasticsearch console.

  2. Confirm the ECS instance is in the same VPC as the destination cluster.

  3. Check that the username and password are correct.

Document counts do not match after migration

Symptom: The destination cluster has fewer documents than the source cluster.

Solution:

  1. Check Logstash logs for errors: tail -f logs/logstash-plain.log.

  2. Verify that index names in the pipeline configuration match the source indices.

  3. If using wildcard patterns, confirm they match all intended indices.

  4. Re-run the full migration pipeline. Logstash uses document IDs to prevent duplicates when document_id is set.

Duplicate data after full migration

Symptom: The destination cluster has more documents than the source cluster.

Solution: This happens when Logstash restarts the pipeline automatically after completion. Use the schedule parameter with a cron expression to run the pipeline only once (for example, schedule => "20 13 5 3 *").

Timezone mismatch in incremental migration

Symptom: Incremental queries return unexpected results or miss recent data.

Solution: Logstash timestamps use UTC. If your source data uses a different timezone (for example, UTC+8), adjust the @timestamp range in the query parameter accordingly.

Index mapping conflicts

Symptom: Errors about field type conflicts when indexing data in the destination cluster.

Solution:

  1. Run the index metadata migration script (Step 2) before migrating data.

  2. Verify that mappings on the destination cluster match the source: GET /<index-name>/_mapping.

  3. If conflicts persist, delete the auto-created index on the destination cluster and re-create it with the correct mappings.
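For step 2 above, comparing mappings visually is tedious. A small sketch (hypothetical) flattens the properties of two _mapping responses and reports fields whose types differ:

```python
def field_types(mapping_props, prefix=""):
    """Flatten a mappings 'properties' dict to {dotted.field: type}."""
    out = {}
    for name, spec in mapping_props.items():
        path = prefix + name
        if "type" in spec:
            out[path] = spec["type"]
        if "properties" in spec:
            out.update(field_types(spec["properties"], path + "."))
    return out

def type_conflicts(source_props, dest_props):
    """Return {field: (source_type, dest_type)} for conflicting fields."""
    src, dst = field_types(source_props), field_types(dest_props)
    return {f: (t, dst[f]) for f, t in src.items() if f in dst and dst[f] != t}

# Example: a destination index auto-created with 'ip' mapped as text.
src = {"ip": {"type": "ip"}, "geo": {"properties": {"coordinates": {"type": "geo_point"}}}}
dst = {"ip": {"type": "text"}, "geo": {"properties": {"coordinates": {"type": "geo_point"}}}}
print(type_conflicts(src, dst))  # -> {'ip': ('ip', 'text')}
```

Feed it the "properties" objects from each cluster's GET /<index-name>/_mapping response.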

Post-migration checklist

After migration completes, verify the following:

  • [ ] Document counts match between source and destination for all migrated indices

  • [ ] Sample document content is identical (spot-check a few documents)

  • [ ] Index mappings are correct on the destination cluster

  • [ ] No errors in Logstash logs (logs/logstash-plain.log)

  • [ ] Application endpoints updated to point to the Alibaba Cloud Elasticsearch cluster

  • [ ] Application connectivity to the new cluster verified

  • [ ] Incremental sync pipeline stopped after cutover

  • [ ] Logstash pipelines cleaned up on the ECS instance

  • [ ] Destination cluster monitored for errors and performance
