If you want to build a log retrieval system, you can use Alibaba Cloud Realtime Compute for Apache Flink to compute log data and import the processed data into Alibaba Cloud Elasticsearch for searches. This topic uses log data in Simple Log Service to describe the detailed procedure.
Prerequisites
The following operations are performed:
Activate Realtime Compute for Apache Flink and create a project.
Create an Alibaba Cloud Elasticsearch cluster.
For more information, see Create an Alibaba Cloud Elasticsearch cluster.
Activate Simple Log Service, and create a project and a Logstore.
For more information, see Activate Simple Log Service, Create a project, and Create a Logstore.
Background information
Realtime Compute for Apache Flink is a Flink-based service provided by Alibaba Cloud. It supports various input and output systems, such as Kafka and Elasticsearch. You can use Realtime Compute for Apache Flink and Elasticsearch to retrieve logs.
Realtime Compute for Apache Flink processes logs in Kafka or Simple Log Service by using simple or complex Flink SQL statements. Then, it imports the processed logs into an Elasticsearch cluster as source data for searches. The computing capabilities of Realtime Compute for Apache Flink and the search capabilities of Elasticsearch allow you to process and search for data in real time. This helps you transform your business into real-time services.
Realtime Compute for Apache Flink provides a simple way to interact with Elasticsearch. For example, logs or data records are imported into Simple Log Service and must be processed before they are imported into an Elasticsearch cluster. The following figure shows the data consumption pipeline.
Procedure
Log on to the Realtime Compute for Apache Flink console.
Create a Realtime Compute job.
For more information, see Develop a job in Job development of Blink SQL Development Guide in the Blink Exclusive Mode (Phased-Out for Alibaba Cloud) file.
Write Flink SQL statements.
Create a source table for Simple Log Service.
CREATE TABLE sls_stream (
  a INT,
  b INT,
  c VARCHAR
) WITH (
  type = 'sls',
  endPoint = '<yourEndpoint>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessKey>',
  startTime = '<yourStartTime>',
  project = '<yourProjectName>',
  logStore = '<yourLogStoreName>',
  consumerGroup = '<yourConsumerGroupName>'
);
The following table describes the parameters in the WITH part.
Parameter | Description
endPoint | The URL that is used to access projects and logs in Simple Log Service. For more information, see Endpoints. For example, the URL that is used to access Simple Log Service in the China (Hangzhou) region is http://cn-hangzhou.log.aliyuncs.com. Make sure that the URL starts with http://.
accessId | The AccessKey ID of your Alibaba Cloud account.
accessKey | The AccessKey secret of your Alibaba Cloud account.
startTime | The time when logs start to be consumed. When you run a Realtime Compute for Apache Flink job, specify a time point that is later than the start time specified by this parameter.
project | The name of the Simple Log Service project.
logStore | The name of the Logstore in the project.
consumerGroup | The name of the Simple Log Service consumer group.
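Before you paste values into the WITH part, it can help to sanity-check them. The following Python sketch is one hypothetical way to do that. The function name check_sls_source_params is illustrative only, and treating every parameter in the table as required is an assumption made for this sketch, not a documented rule. The only check taken directly from the table is that endPoint starts with http://.

```python
# Parameter names as listed in the table above. Treating all of them as
# required is an assumption made for this sketch.
SLS_PARAMS = (
    "endPoint", "accessId", "accessKey", "startTime",
    "project", "logStore", "consumerGroup",
)


def check_sls_source_params(params):
    """Return a list of human-readable problems found in the WITH values."""
    problems = []
    for name in SLS_PARAMS:
        if not params.get(name):
            problems.append(f"missing value for {name}")
    endpoint = params.get("endPoint", "")
    # The table states that the Simple Log Service URL must start with http://.
    if endpoint and not endpoint.startswith("http://"):
        problems.append("endPoint must start with http://")
    return problems


example = {
    "endPoint": "cn-hangzhou.log.aliyuncs.com",  # scheme omitted on purpose
    "accessId": "<yourAccessId>",
    "accessKey": "<yourAccessKey>",
    "startTime": "<yourStartTime>",
    "project": "<yourProjectName>",
    "logStore": "<yourLogStoreName>",
    "consumerGroup": "<yourConsumerGroupName>",
}
print(check_sls_source_params(example))
```

In this example, the only reported problem is the endpoint that lacks the http:// prefix.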
Create an Elasticsearch result table.
Important: Elasticsearch result tables are supported in Realtime Compute V3.2.2 and later. When you create a Realtime Compute for Apache Flink job, select a valid version.
Elasticsearch result tables are based on RESTful APIs and are compatible with all Elasticsearch versions.
CREATE TABLE es_stream_sink (
  a INT,
  cnt BIGINT,
  PRIMARY KEY (a)
) WITH (
  type = 'elasticsearch-7',
  endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>'
);
The following table describes the parameters in the WITH part.
Parameter | Description
endPoint | The URL that is used to access the Elasticsearch cluster over the Internet. Specify the URL in the format of http://<instanceid>.public.elasticsearch.aliyuncs.com:9200. You can obtain the public endpoint of the cluster from the Basic Information page of the cluster. For more information, see View the basic information of a cluster.
accessId | The username that is used to access the Elasticsearch cluster. The default username is elastic.
accessKey | The password that is used to access the Elasticsearch cluster. The password of the elastic account is specified when you create the cluster. If you forget the password, you can reset it. For more information about the procedure and precautions for resetting a password, see Reset the access password for an Elasticsearch cluster.
index | The name of the index in the Elasticsearch cluster. If no indexes are created in the Elasticsearch cluster, create one first. For more information, see Step 3: Create an index. You can also enable the Auto Indexing feature for the Elasticsearch cluster. This way, the system can automatically create indexes in the cluster. For more information, see Configure the YML file.
typeName | The type of the index. The index type of Elasticsearch clusters of V7.0 or later must be _doc.
Note: Elasticsearch allows you to update documents based on the PRIMARY KEY field. Only one field can be specified as the PRIMARY KEY field. If you specify the PRIMARY KEY field, values of the PRIMARY KEY field are used as document IDs. If the PRIMARY KEY field is not specified, the system generates random IDs for documents. For more information, see Index API.
Elasticsearch supports multiple update modes. You can configure the updateMode parameter to specify the update mode.
If updateMode is set to full, new documents overwrite existing documents.
If updateMode is set to inc, new values overwrite existing values of the related fields.
All updates in Elasticsearch are performed by using INSERT or UPDATE statements that follow the UPSERT syntax.
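The difference between the two update modes in the note above is easiest to see on a small example. The following Python sketch simulates the described behavior with an in-memory dictionary that stands in for an Elasticsearch index. It is not the connector's implementation: the upsert function, the sample field names, and the use of a UUID as a stand-in for a system-generated document ID are all assumptions made for illustration.

```python
import uuid


def upsert(index, doc_id, new_doc, update_mode="full"):
    """Apply one upsert to a dict that stands in for an Elasticsearch index.

    doc_id plays the role of the PRIMARY KEY value. If it is None, a random
    ID is generated, which mimics the behavior described for tables that do
    not declare a PRIMARY KEY.
    """
    if doc_id is None:
        doc_id = uuid.uuid4().hex  # stand-in for a system-generated ID
    if update_mode == "full":
        # full: the new document replaces the existing document entirely.
        index[doc_id] = dict(new_doc)
    elif update_mode == "inc":
        # inc: only the fields present in the new document are overwritten.
        merged = dict(index.get(doc_id, {}))
        merged.update(new_doc)
        index[doc_id] = merged
    else:
        raise ValueError(f"unsupported update mode: {update_mode}")
    return doc_id


index = {}
upsert(index, 1, {"a": 1, "cnt": 5, "note": "first"})
upsert(index, 1, {"cnt": 6}, update_mode="inc")
print(index[1])  # the note field is kept, cnt is updated
upsert(index, 1, {"a": 1, "cnt": 7}, update_mode="full")
print(index[1])  # the note field is gone
```

With inc, fields that are absent from the new document keep their previous values. With full, they are dropped because the whole document is replaced.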
Create data consumption logic and synchronize data.
INSERT INTO es_stream_sink
SELECT a, COUNT(*) AS cnt
FROM sls_stream
GROUP BY a;
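To see what this statement writes, the following Python sketch replays a few made-up log records and tracks the documents that would result. It assumes that each incoming record updates the running count for its value of a and that the row is upserted with a as the document ID, as described for the PRIMARY KEY field. The replay function and the sample records are hypothetical, and a plain dictionary stands in for the Elasticsearch index.

```python
from collections import Counter


def replay(records):
    """Simulate GROUP BY a with COUNT(*) over a stream of log records."""
    counts = Counter()
    index = {}  # document ID (value of a) -> latest document
    for record in records:
        key = record["a"]
        counts[key] += 1
        # Each new record overwrites the document for its key with the
        # latest count, so the index holds one document per distinct a.
        index[key] = {"a": key, "cnt": counts[key]}
    return index


sample_logs = [
    {"a": 1, "b": 10, "c": "login"},
    {"a": 2, "b": 20, "c": "logout"},
    {"a": 1, "b": 30, "c": "login"},
]
print(replay(sample_logs))
```

The result contains one document per distinct value of a, and each document carries the most recent count for that value.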
Publish and start the job.
After you publish and start the job, data stored in Simple Log Service is aggregated and imported into the Elasticsearch cluster.
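To check that documents arrive, you can query the index through the Elasticsearch REST API. The following Python sketch only builds a request for the standard _search endpoint with a term query on the field a; it does not send it. The hostname es-cn-example, the index name my_index, the password value, and the function name build_search_request are placeholders made up for this example. It also assumes that the cluster accepts HTTP basic authentication with the elastic username and password described above.

```python
import base64
import json
import urllib.request


def build_search_request(endpoint, index, username, password, a_value):
    """Build (but do not send) a _search request for documents where a == a_value."""
    body = json.dumps({"query": {"term": {"a": a_value}}}).encode("utf-8")
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return urllib.request.Request(
        url=f"{endpoint.rstrip('/')}/{index}/_search",
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


request = build_search_request(
    "http://es-cn-example.public.elasticsearch.aliyuncs.com:9200",  # placeholder
    "my_index",        # placeholder index name
    "elastic",
    "<yourPassword>",  # placeholder password
    1,
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
# To send the request against a real cluster:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

Building the request separately from sending it lets you inspect the URL and the query body before anything reaches the cluster.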
Additional information
Realtime Compute for Apache Flink and Elasticsearch allow you to quickly create your own real-time search services. If more complex logic is required to import data into an Elasticsearch cluster, use the user-defined sinks of Realtime Compute for Apache Flink.