Deploy a self-managed Logstash instance on an Elastic Compute Service (ECS) instance to migrate full or incremental data from a self-managed Elasticsearch cluster to an Alibaba Cloud Elasticsearch cluster.
Choose the right migration method
| Method | Best for | Trade-offs |
|---|---|---|
| Logstash | Incremental sync, continuous replication, selective index migration | Requires a running Logstash instance; pipeline processing overhead |
| Snapshots | One-time full migration of large datasets | Requires shared storage; restores complete indices (no document-level incremental sync) |
| Reindex API | Small datasets or cross-version migrations | Slower for large volumes; network-dependent |
Prerequisites
Before you begin, make sure that you have:

- A self-managed Elasticsearch cluster (source) with data to migrate
- An Alibaba Cloud Elasticsearch cluster (destination) created and running. For more information, see Create an Alibaba Cloud Elasticsearch cluster.
- An ECS instance in the same Virtual Private Cloud (VPC) as the destination cluster, with a public IPv4 address assigned
- Self-managed Logstash installed on the ECS instance
- Network connectivity verified between Logstash and both Elasticsearch clusters
- A security group inbound rule that allows traffic on port 5601 (for Kibana access)
The ECS instance must reside in the same VPC as the Alibaba Cloud Elasticsearch cluster. Logstash must be able to connect to both the source and destination clusters.
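Network reachability from the ECS instance to both clusters can be pre-checked before deploying Logstash. The following is a minimal sketch; the two commented-out hostnames are placeholders that must be replaced with your actual endpoints.

```python
import socket

def can_connect(host, port, timeout=3):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        return sock.connect_ex((host, port)) == 0
    finally:
        sock.close()

# Replace the placeholders with your real hosts before running:
# print(can_connect("<source-es-host>", 9200))
# print(can_connect("<destination-es-endpoint>", 9200))
```

Run this once from the ECS instance for the source host and once for the destination endpoint; both calls must return `True` before Logstash can migrate data.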
Step 1: Set up the environment
This section walks through deploying a self-managed Elasticsearch cluster, Kibana, and Logstash on a single ECS instance. If you already have these components running, skip to Step 2.
Example environment specifications
The following tables describe the example configurations used in this document.
Alibaba Cloud Elasticsearch cluster
| Configuration | Value |
|---|---|
| Region | China (Hangzhou) |
| Edition and version | V7.10.0, Standard Edition |
| Zones and nodes | Three zones, three data nodes |
| Single node specs | 4 vCPUs, 16 GiB memory, Enhanced SSD (ESSD) with 100 GiB storage |
ECS instance
| Configuration | Value |
|---|---|
| Region | China (Hangzhou) |
| Specs | 4 vCPUs, 16 GiB memory |
| Image | CentOS 7.9 64-bit (public image) |
| System disk | ESSD, 100 GiB |
| Network | Same VPC as the Alibaba Cloud Elasticsearch cluster. Assign Public IPv4 Address selected. Pay-by-traffic billing, 100 Mbit/s peak bandwidth |
| Security group | Inbound rule allowing traffic on port 5601 (Kibana). Add your client IP address as the authorization object |
If your client is on a home or office LAN, add the public egress IP address, not the local IP address. Visit cip.cc to find your public IP.
Adding `0.0.0.0/0` as an authorization object allows access from all public IPv4 addresses. Avoid this in production environments due to security risks.
For more information about creating an ECS instance, see Create an instance on the Custom Launch tab.
Create an Alibaba Cloud Elasticsearch cluster
Create a cluster with the specifications listed above or adjust them to your requirements. For detailed instructions, see Create an Alibaba Cloud Elasticsearch cluster.
Deploy self-managed Elasticsearch
This example deploys a single-node Elasticsearch 7.6.2 cluster on the ECS instance.
1. Connect to the ECS instance. For more information, see Connect to a Linux instance by using a password or key.

2. Create a user named `elastic` as the root user:

   ```
   useradd elastic
   ```

3. Set the password for the `elastic` user. Enter and confirm the password when prompted:

   ```
   passwd elastic
   ```

4. Switch to the `elastic` user:

   ```
   su -l elastic
   ```

5. Download and extract the Elasticsearch package:

   ```
   wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz
   tar -zvxf elasticsearch-7.6.2-linux-x86_64.tar.gz
   ```

6. Start Elasticsearch:

   ```
   cd elasticsearch-7.6.2
   ./bin/elasticsearch -d
   ```

7. Verify that Elasticsearch is running:

   ```
   cd ~
   curl localhost:9200
   ```

   A successful response contains the version number and the message `"You Know, for Search"`.
Deploy Kibana and add test data
This example deploys Kibana 7.6.2 on the same ECS instance.
Run Kibana as a regular user, not root.
1. Download and extract the Kibana package:

   ```
   wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz
   tar -zvxf kibana-7.6.2-linux-x86_64.tar.gz
   ```

2. Configure Kibana to accept connections from all IP addresses. Open the `kibana.yml` file in the `config/` directory:

   ```
   cd kibana-7.6.2-linux-x86_64
   vi config/kibana.yml
   ```

   Add the following line:

   ```
   server.host: "0.0.0.0"
   ```

3. Start Kibana:

   ```
   sudo nohup ./bin/kibana &
   ```

4. Open the Kibana console in your browser:

   ```
   http://<ECS-public-IP>:5601/app/kibana#/home
   ```

5. On the Kibana home page, click Try our sample data. On the Sample data tab, click Add data in the Sample web logs card to load test data.
Deploy Logstash
This example deploys Logstash 7.10.0 on the same ECS instance.
Run Logstash as a regular user, not root.
Download and extract the Logstash package:
cd ~ wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz tar -zvxf logstash-7.10.0-linux-x86_64.tar.gzIncrease the JVM heap memory. The default heap size is 1 GiB. Set it to a value appropriate for your ECS instance specs to speed up data migration: Set the heap size (for example, 8 GiB for a 16 GiB instance):
cd logstash-7.10.0 sudo vi config/jvm.options-Xms8g -Xmx8gIncrease the batch size. Edit the
pipelines.ymlconfiguration file and changepipeline.batch.sizefrom the default125to5000. This allows Logstash to write 5 MiB to 15 MiB of data per batch, which speeds up migration:vi config/pipelines.ymlVerify that Logstash runs correctly: Type
Hello world!and press Enter. If Logstash is working, it outputsHello world!back to the terminal. PressCtrl+Cto stop.bin/logstash -e 'input { stdin { } } output { stdout {} }'
Step 2: (Optional) Migrate index metadata
When Logstash migrates data, the Auto Indexing feature on the Alibaba Cloud Elasticsearch cluster automatically creates indices. However, auto-created indices may have different settings and mappings than the source indices. To preserve the original index structure, create the indices manually on the destination cluster before migrating data.
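The core transformation performed in this step — keep the source index's primary shard count, reset replicas to 0, and copy the mappings over — can be sketched in isolation as follows. The index name and field names here are illustrative only:

```python
import json

DEFAULT_REPLICAS = 0  # start with no replicas; raise after migration

def build_create_body(index, settings_json, mappings_json):
    """Build the JSON body for PUT /<index> on the destination cluster
    from the raw GET /<index>/_settings and GET /<index>/_mapping responses."""
    settings = json.loads(settings_json)
    mappings = json.loads(mappings_json)
    shards = settings[index]["settings"]["index"]["number_of_shards"]
    return json.dumps({
        "settings": {
            "number_of_shards": shards,
            "number_of_replicas": DEFAULT_REPLICAS,
        },
        "mappings": mappings[index]["mappings"],
    })

# Illustrative inputs shaped like real _settings/_mapping responses:
body = build_create_body(
    "logs",
    '{"logs": {"settings": {"index": {"number_of_shards": "3"}}}}',
    '{"logs": {"mappings": {"properties": {"msg": {"type": "text"}}}}}',
)
```

The resulting `body` is exactly what gets sent as `PUT /<new-index>` against the destination cluster.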
The following Python script reads index settings and mappings from the source cluster and creates matching indices on the destination cluster.
1. Connect to the ECS instance as a regular user.

2. Create a Python script file:

   ```
   sudo vi indiceCreate.py
   ```

3. Paste the following script. Replace the host, username, and password values for both the source and destination clusters. The script targets Python 2 (the default `/usr/bin/python` on CentOS 7.9).

   Key behaviors of this script:

   - Preserves the number of primary shards from the source index.
   - Sets the number of replica shards to 0 (`DEFAULT_REPLICAS = 0`) for faster initial indexing. Increase replicas after migration.
   - Skips system indices (names starting with `.`). Recreate these manually if needed.

   ```python
   #!/usr/bin/python
   # -*- coding: UTF-8 -*-
   # File name: indiceCreate.py
   import base64
   import httplib
   import json

   ## Source Elasticsearch cluster
   oldClusterHost = "<source-es-host>:9200"
   ## Source cluster username (leave blank if not required)
   oldClusterUserName = "elastic"
   ## Source cluster password (leave blank if not required)
   oldClusterPassword = "<source-es-password>"
   ## Destination Elasticsearch cluster (find on the Basic Information page)
   newClusterHost = "<destination-es-endpoint>:9200"
   ## Destination cluster username
   newClusterUser = "elastic"
   ## Destination cluster password
   newClusterPassword = "<destination-es-password>"
   DEFAULT_REPLICAS = 0

   def httpRequest(method, host, endpoint, params="", username="", password=""):
       conn = httplib.HTTPConnection(host)
       headers = {}
       if username != "":
           base64string = base64.encodestring(
               '{username}:{password}'.format(username=username, password=password)
           ).replace('\n', '')
           headers["Authorization"] = "Basic %s" % base64string
       if "GET" == method:
           headers["Content-Type"] = "application/x-www-form-urlencoded"
           conn.request(method=method, url=endpoint, headers=headers)
       else:
           headers["Content-Type"] = "application/json"
           conn.request(method=method, url=endpoint, body=params, headers=headers)
       response = conn.getresponse()
       return response.read()

   def httpGet(host, endpoint, username="", password=""):
       return httpRequest("GET", host, endpoint, "", username, password)

   def httpPost(host, endpoint, params, username="", password=""):
       return httpRequest("POST", host, endpoint, params, username, password)

   def httpPut(host, endpoint, params, username="", password=""):
       return httpRequest("PUT", host, endpoint, params, username, password)

   def getIndices(host, username="", password=""):
       endpoint = "/_cat/indices"
       indicesResult = httpGet(host, endpoint, username, password)
       indexList = []
       for indices in indicesResult.split("\n"):
           if indices.find("open") > 0:
               indexList.append(indices.split()[2])
       return indexList

   def getSettings(index, host, username="", password=""):
       endpoint = "/" + index + "/_settings"
       indexSettings = httpGet(host, endpoint, username, password)
       print (index + " original settings: \n" + indexSettings)
       settingsDict = json.loads(indexSettings)
       ## Number of primary shards matches the source index
       number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
       ## Default replica count is 0
       number_of_replicas = DEFAULT_REPLICAS
       return "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (
           number_of_shards, number_of_replicas)

   def getMapping(index, host, username="", password=""):
       endpoint = "/" + index + "/_mapping"
       indexMapping = httpGet(host, endpoint, username, password)
       print (index + " original mappings: \n" + indexMapping)
       mappingDict = json.loads(indexMapping)
       mappings = json.dumps(mappingDict[index]["mappings"])
       return "\"mappings\" : " + mappings

   def createIndexStatement(oldIndexName):
       settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
       mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
       return "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"

   def createIndex(oldIndexName, newIndexName=""):
       if newIndexName == "":
           newIndexName = oldIndexName
       createstatement = createIndexStatement(oldIndexName)
       print ("New index " + newIndexName + " settings and mappings: \n" + createstatement)
       endpoint = "/" + newIndexName
       createResult = httpPut(newClusterHost, endpoint, createstatement,
                              newClusterUser, newClusterPassword)
       print ("New index " + newIndexName + " creation result: " + createResult)

   ## main
   indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
   systemIndex = []
   for index in indexList:
       if index.startswith("."):
           systemIndex.append(index)
       else:
           createIndex(index, index)
   for index in systemIndex:
       print (index + " may be a system index and was not recreated. "
              "Manually recreate it based on your business requirements.")
   ```

4. Run the script:

   ```
   sudo /usr/bin/python indiceCreate.py
   ```

5. Verify the created indices. Log on to the Kibana console of the Alibaba Cloud Elasticsearch cluster (see Log on to the Kibana console) and run:

   ```
   GET /_cat/indices?v
   ```
Step 3: Migrate full data
Perform full data migration first if data writes or updates occur continuously on the source cluster. This establishes a complete baseline on the destination cluster before starting incremental synchronization.
To improve data accuracy, create multiple Logstash pipelines and migrate different indices separately.
The Logstash configuration format changed in version 8.5. This section provides configurations for both 7.x and 8.x.
1. Connect to the ECS instance.

2. Create a Logstash configuration file:

   ```
   cd logstash-7.10.0/config
   vi es2es_all.conf
   ```

3. Add the configuration for your Logstash version.

   Logstash 7.x configuration:

   ```
   input {
     elasticsearch {
       # Source Elasticsearch host
       hosts => ["http://<source-es-host>:9200"]
       # Source cluster credentials
       user => "<source-es-username>"
       password => "<source-es-password>"
       # Indices to migrate (supports wildcards)
       index => "kibana_sample_data_*"
       # Performance settings
       docinfo => true
       slices => 5
       size => 5000
     }
   }

   filter {
     # Remove fields added by Logstash
     mutate {
       remove_field => ["@timestamp", "@version"]
     }
   }

   output {
     elasticsearch {
       # Destination cluster endpoint (find on the Basic Information page)
       hosts => ["http://<destination-es-endpoint>:9200"]
       # Destination cluster credentials
       user => "elastic"
       password => "<destination-es-password>"
       # Preserve original index names
       index => "%{[@metadata][_index]}"
       # Preserve original document types (7.x only)
       document_type => "%{[@metadata][_type]}"
       # Preserve original document IDs (remove for better performance)
       document_id => "%{[@metadata][_id]}"
       ilm_enabled => false
       manage_template => false
     }
   }
   ```

   Logstash 8.x configuration:

   ```
   input {
     elasticsearch {
       # Source Elasticsearch host
       hosts => ["http://<source-es-host>:9200"]
       # Source cluster credentials
       user => "elastic"
       password => "<source-es-password>"
       # Indices to migrate
       index => "<index-name>"
       # Performance settings
       docinfo => true
       size => 10000
       docinfo_target => "[@metadata]"
     }
   }

   filter {
     # Remove fields added by Logstash
     mutate {
       remove_field => ["@timestamp", "@version"]
     }
   }

   output {
     elasticsearch {
       # Destination cluster endpoint (find on the Basic Information page)
       hosts => ["http://<destination-es-endpoint>:9200"]
       # Destination cluster credentials
       user => "elastic"
       password => "<destination-es-password>"
       # Preserve original index names
       index => "%{[@metadata][_index]}"
       # Preserve original document IDs (remove for better performance)
       document_id => "%{[@metadata][_id]}"
       ilm_enabled => false
       manage_template => false
     }
   }
   ```

   In Logstash 8.x, the `document_type` setting is removed and `docinfo_target => "[@metadata]"` is required.

4. Prevent duplicate data. The Elasticsearch input plugin reads all data and then stops the Logstash process. Logstash automatically restarts the process, which can cause duplicate writes with a single pipeline. To prevent this, use the `schedule` parameter with a cron expression to run the pipeline at a specific time. For example, to run the pipeline once at 13:20 on March 5:

   ```
   schedule => "20 13 5 3 *"
   ```

   For more information about cron syntax, see Scheduling in the Logstash documentation.

5. Navigate to the Logstash installation directory:

   ```
   cd ~/logstash-7.10.0
   ```

6. Start the migration:

   ```
   nohup bin/logstash -f config/es2es_all.conf >/dev/null 2>&1 &
   ```
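The one-shot schedule above can be sanity-checked with a simplified cron matcher. This sketch handles only plain numbers and `*` (Logstash's `schedule` option follows the richer rufus-scheduler cron syntax), but it is enough to confirm what `"20 13 5 3 *"` matches:

```python
from datetime import datetime

def cron_matches(expr, dt):
    """Check a 5-field cron expression (minute hour day month weekday)
    against a datetime. Supports only '*' and plain numbers."""
    fields = expr.split()
    values = [dt.minute, dt.hour, dt.day, dt.month, dt.isoweekday() % 7]
    return all(f == "*" or int(f) == v for f, v in zip(fields, values))

# "20 13 5 3 *" fires only at 13:20 on March 5:
print(cron_matches("20 13 5 3 *", datetime(2024, 3, 5, 13, 20)))  # True
print(cron_matches("20 13 5 3 *", datetime(2024, 3, 5, 13, 21)))  # False
```

Note that without a year field, the expression would fire again every March 5, so stop the pipeline once the one-time run completes.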
Step 4: Migrate incremental data
After full data migration, set up incremental synchronization to capture ongoing changes. The incremental pipeline uses a time-range query and a cron schedule to periodically pull new data from the source cluster.
Run this step as a regular user.
Create the incremental migration configuration file:
cd ~/logstash-7.10.0/config vi es2es_kibana_sample_data_logs.confAdd the following configuration. This example queries data updated in the last 5 minutes and runs every minute. Logstash 7.x configuration:
For Logstash 8.x, remove the
document_type => "%{[@metadata][_type]}"setting.Important- Logstash timestamps are in UTC. If your local timezone is UTC+8, convert timestamps accordingly. The
@timestamprange filter in thequeryparameter uses UTC. - If the source index does not contain a time field, use an ingest pipeline with the_ingest.timestampparameter to add one.input{ elasticsearch{ # Source Elasticsearch host hosts => ["http://<source-es-host>:9200"] # Source cluster credentials user => "<source-es-username>" password => "<source-es-password>" # Index to sync index => "kibana_sample_data_logs" # Query for incremental data (last 5 minutes) query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}' # Run every minute schedule => "* * * * *" scroll => "5m" docinfo=>true size => 5000 } } filter { # Remove fields added by Logstash mutate { remove_field => ["@timestamp", "@version"] } } output{ elasticsearch{ # Destination cluster endpoint (find on the Basic Information page) hosts => ["http://<destination-es-endpoint>:9200"] # Destination cluster credentials user => "elastic" password => "<destination-es-password>" # Preserve original index names index => "%{[@metadata][_index]}" # Preserve original document types (remove for 8.x) document_type => "%{[@metadata][_type]}" # Preserve original document IDs document_id => "%{[@metadata][_id]}" ilm_enabled => false manage_template => false } }Navigate to the Logstash installation directory:
cd ~/logstash-7.10.0Start the incremental migration:
sudo nohup bin/logstash -f config/es2es_kibana_sample_data_logs.conf >/dev/null 2>&1 &Verify incremental data in the Alibaba Cloud Elasticsearch cluster. In the Kibana console, query recently updated records:
GET kibana_sample_data_logs/_search { "query": { "range": { "@timestamp": { "gte": "now-5m", "lte": "now/m" } } }, "sort": [ { "@timestamp": { "order": "desc" } } ] }
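Because the `@timestamp` range in the incremental query is evaluated in UTC, it is worth double-checking the offset math for a UTC+8 source. A minimal conversion sketch:

```python
from datetime import datetime, timedelta, timezone

def to_utc(naive_local, offset_hours):
    """Attach a fixed UTC offset to a naive local timestamp and convert to UTC."""
    aware = naive_local.replace(tzinfo=timezone(timedelta(hours=offset_hours)))
    return aware.astimezone(timezone.utc)

# 21:20 local time in UTC+8 is 13:20 UTC, which is what the
# @timestamp range filter actually compares against:
utc = to_utc(datetime(2024, 3, 5, 21, 20), 8)
print(utc.isoformat())  # 2024-03-05T13:20:00+00:00
```

If documents written at a local evening time seem to be "missing" from the incremental sync, this offset is the first thing to check.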
Step 5: Verify migration results
Verify full data migration
1. In the source cluster Kibana console, check the index document counts:

   ```
   GET _cat/indices?v
   ```

2. In the Alibaba Cloud Elasticsearch cluster Kibana console, run the same query:

   ```
   GET _cat/indices?v
   ```

3. Compare the `docs.count` values for each index. If full data migration succeeded, the document counts match between the source and destination clusters.
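This comparison can be automated. The sketch below parses the text output of `GET _cat/indices?v` (assuming the default column set, which includes `index` and `docs.count`) and reports mismatched counts:

```python
def parse_cat_indices(text):
    """Parse `GET _cat/indices?v` output into {index: docs.count}."""
    lines = text.strip().split("\n")
    header = lines[0].split()
    idx_col = header.index("index")
    count_col = header.index("docs.count")
    result = {}
    for row in lines[1:]:
        cols = row.split()
        result[cols[idx_col]] = int(cols[count_col])
    return result

def diff_counts(source, dest):
    """Return {index: (source_count, dest_count)} for every mismatch
    or index missing on the destination."""
    return {i: (c, dest.get(i)) for i, c in source.items() if dest.get(i) != c}
```

Feed it the saved responses from both clusters; an empty dict from `diff_counts` means every index matched.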
Verify incremental data migration
In the source cluster Kibana console, query the most recently updated records:
GET kibana_sample_data_logs/_search { "query": { "range": { "@timestamp": { "gte": "now-5m", "lte": "now/m" } } }, "sort": [ { "@timestamp": { "order": "desc" } } ] }Run the same query in the Alibaba Cloud Elasticsearch cluster Kibana console. If incremental data migration succeeded, the recent records match between the source and destination clusters.
Performance tuning
Adjust the following parameters to optimize migration throughput:
| Parameter | Location | Default | Recommended | Effect |
|---|---|---|---|---|
| `pipeline.batch.size` | `config/pipelines.yml` | 125 | 5000 | Number of events per batch. Larger values increase throughput but use more memory |
| `-Xms` / `-Xmx` | `config/jvm.options` | 1g | 50% of available memory (for example, 8g for a 16 GiB instance) | JVM heap size. Larger heaps support bigger batches |
| `size` | Pipeline config (input) | 1000 | 5000 to 10000 | Documents per scroll request from the source cluster |
| `slices` | Pipeline config (input) | none (disabled) | 5 | Parallel scroll slices. Set to enable sliced scrolling for higher read parallelism |
| `scroll` | Pipeline config (input) | 1m | 5m | Scroll context timeout. Increase for slow networks or large indices |

Set `-Xms` and `-Xmx` to the same value to avoid pauses caused by heap resizing. Do not exceed 50% of total system memory.
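As a rough sanity check before raising `pipeline.batch.size`, you can estimate in-flight batch memory. This is a back-of-envelope sketch under stated assumptions: `avg_event_kib` is an average event size you supply, and `workers` corresponds to `pipeline.workers` (which defaults to the number of CPU cores):

```python
def batch_memory_mib(batch_size, avg_event_kib, workers):
    """Rough upper bound on memory held by in-flight batches, in MiB:
    each worker processes one batch of batch_size events at a time."""
    return batch_size * avg_event_kib * workers / 1024.0

# 5000 events/batch at ~1 KiB each across 4 workers is on the order of 20 MiB:
print(batch_memory_mib(5000, 1, 4))  # 19.53125
```

This estimate ignores plugin buffers and JVM overhead, so treat it only as a lower bound when sizing the heap.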
Credential security
Avoid embedding credentials directly in Logstash configuration files. Use one of these alternatives:
- Logstash keystore: Store sensitive values in the Logstash keystore and reference them in configuration files with `${KEY_NAME}` syntax.
- Environment variables: Export credentials as environment variables and reference them in configuration files.
For more information, see Secrets keystore in the Logstash documentation.
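As an illustration of the environment-variable approach, a wrapper script can fail fast when credentials are missing. The variable names `ES_DEST_USER` and `ES_DEST_PWD` are hypothetical examples; inside a Logstash pipeline file itself, an exported variable can be referenced as `${ES_DEST_PWD}`.

```python
import os

def get_credentials(env=None):
    """Read destination credentials from the environment and fail fast
    when the password is missing. ES_DEST_USER/ES_DEST_PWD are example names."""
    env = os.environ if env is None else env
    user = env.get("ES_DEST_USER", "elastic")
    pwd = env.get("ES_DEST_PWD")
    if pwd is None:
        raise RuntimeError("ES_DEST_PWD is not set; refusing to start the pipeline")
    return user, pwd
```

Failing fast here is preferable to letting Logstash start and then flood its logs with authentication errors.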
Troubleshooting
Logstash cannot connect to the source cluster
Symptom: Connection refused or timeout errors in Logstash logs.
Solution:
- Verify that the source Elasticsearch host and port are correct.
- Confirm that the ECS security group allows outbound traffic to the source cluster.
- Test connectivity with `curl <source-es-host>:9200` from the ECS instance.
Logstash cannot connect to the destination cluster
Symptom: Authentication failure or connection timeout when writing to the Alibaba Cloud Elasticsearch cluster.
Solution:
- Verify the destination cluster endpoint on the Basic Information page of the Alibaba Cloud Elasticsearch console.
- Confirm that the ECS instance is in the same VPC as the destination cluster.
- Check that the username and password are correct.
Document counts do not match after migration
Symptom: The destination cluster has fewer documents than the source cluster.
Solution:
- Check Logstash logs for errors: `tail -f logs/logstash-plain.log`.
- Verify that index names in the pipeline configuration match the source indices.
- If using wildcard patterns, confirm that they match all intended indices.
- Re-run the full migration pipeline. Logstash uses document IDs to prevent duplicates when `document_id` is set.
Duplicate data after full migration
Symptom: The destination cluster has more documents than the source cluster.
Solution: This happens when Logstash automatically restarts the pipeline after it completes. Use the `schedule` parameter with a cron expression to run the pipeline only once (for example, `schedule => "20 13 5 3 *"`).
Timezone mismatch in incremental migration
Symptom: Incremental queries return unexpected results or miss recent data.
Solution: Logstash timestamps use UTC. If your source data uses a different timezone (for example, UTC+8), adjust the `@timestamp` range in the `query` parameter accordingly.
Index mapping conflicts
Symptom: Errors about field type conflicts when indexing data in the destination cluster.
Solution:
- Run the index metadata migration script (Step 2) before migrating data.
- Verify that mappings on the destination cluster match the source: `GET /<index-name>/_mapping`.
- If conflicts persist, delete the auto-created index on the destination cluster and re-create it with the correct mappings.
Post-migration checklist
After migration completes, verify the following:
- [ ] Document counts match between source and destination for all migrated indices
- [ ] Sample document content is identical (spot-check a few documents)
- [ ] Index mappings are correct on the destination cluster
- [ ] No errors in Logstash logs (`logs/logstash-plain.log`)
- [ ] Application endpoints updated to point to the Alibaba Cloud Elasticsearch cluster
- [ ] Application connectivity to the new cluster verified
- [ ] Incremental sync pipeline stopped after cutover
- [ ] Logstash pipelines cleaned up on the ECS instance
- [ ] Destination cluster monitored for errors and performance