This topic uses an example to explain how to implement semi-structured log analysis in Flink SQL based on the Simple Log Service Processing Language (SPL).
Background information
Simple Log Service is a cloud-native platform for observability and analytics, enabling cost-effective, real-time processing of logs, metrics, and traces. It simplifies data access, facilitating the ingestion of system and business logs for storage and analysis.
Realtime Compute for Apache Flink, built on Apache Flink, is a big data analytics platform suited for real-time data analysis and risk monitoring. It natively supports the Simple Log Service connector, allowing the service to be used as a source or result table.
The connector streamlines the handling of structured logs, allowing direct mapping of Simple Log Service log fields to Flink SQL table fields. For semi-structured logs that contain all content in a single field, methods like regular expressions and delimiters are necessary to extract structured data. This topic discusses a solution using SPL to configure the connector for data structuring, focusing on log cleansing and format normalization.
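For fully structured logs, this direct mapping means each log field becomes a table column of the same name. The following DDL is a minimal sketch of such a mapping; the field names and ${...} placeholders are illustrative, and the WITH options are the same ones used later in this topic:
CREATE TEMPORARY TABLE sls_input_structured (
  -- Each column maps 1:1 to a Simple Log Service log field of the same name.
  client_ip STRING,
  request_method STRING,
  status STRING
) WITH (
  'connector' = 'sls',
  'endpoint' = '${endpoint}',
  'accessId' = '${ak}',
  'accessKey' = '${sk}',
  'project' = '${project}',
  'logstore' = '${logstore}'
);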
Semi-structured log data
Consider a log example with a complex format that includes JSON strings and mixed content. The log contains the following elements:
Payload: A JSON string, in which the schedule field is itself in JSON format.
requestURL: A standard URL path.
error: Begins with the string CouldNotExecuteQuery, followed by a JSON structure.
__tag__:__path__: The log file path, where service_a may indicate the service name.
caller: Contains the file name and line number.
{
"Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
"TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
"TaskType": "ALERT",
"__source__": "11.199.XX.XXX",
"__tag__:__hostname__": "iabcde12345.cloud.abc121",
"__tag__:__path__": "/var/log/service_a.LOG",
"caller": "executor/pool.go:64",
"error": "CouldNotExecuteQuery : {\n \"httpCode\": 404,\n \"errorCode\": \"LogStoreNotExist\",\n \"errorMessage\": \"logstore k8s-event does not exist\",\n \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
"requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
"ts": "2024-01-29 22:57:13"
}
Requirements for structured data processing
To derive valuable insights from these logs, data cleansing is essential. Key fields must first be extracted for analysis, which is conducted in Flink. The specific requirements for field extraction are as follows:
Extract httpCode, errorCode, errorMessage, and requestID from the error field.
Extract service_a from __tag__:__path__ as serviceName.
Extract pool.go from caller as fileName, and 64 as fileNo.
Extract project from the Payload field, and extract type from the schedule within Payload as scheduleType.
Rename __tag__:__hostname__ to serviceHost.
The final list of required fields is as follows: httpCode, errorCode, errorMessage, requestID, fileName, fileNo, serviceHost, scheduleType, and project.
Solutions
There are several solutions to data cleansing, each suited to specific scenarios.
Data transformation solution: In the Simple Log Service console, create a target logstore and a data transformation task for cleansing.
Flink solution: Define error and Payload as source table fields. Use SQL regular expression functions and JSON functions to parse these fields, insert the parsed data into a temporary table, and perform analysis on that table.
SPL solution: Configure SPL statements for the Simple Log Service connector in Realtime Compute for Apache Flink to cleanse data. Define the source table fields in Flink according to the cleansed data structure.
Among these options, the SPL solution provides a more efficient approach to data cleansing. It removes the need for intermediate logstores or temporary tables, particularly for semi-structured log data. By performing cleansing closer to the source, the computing platform can focus on business logic, resulting in a clearer separation of responsibilities.
SPL solution
1. Prepare data in Simple Log Service
Activate the Simple Log Service, and create a project and logstore.
Use the Simple Log Service Java SDK to write the example log to the target logstore as simulated sample data. For SDKs in other languages, see the corresponding SDK reference.

In the logstore query console, enter the SPL pipeline statement and preview the result.

The SPL pipeline syntax uses the | separator to chain instructions. You can see the result immediately after entering each instruction, and progressively append pipelines to iteratively reach the final result. For more information, see Syntax of scan-based query.
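For example, a first iteration might keep only the fields of interest, with parse-json Payload appended once the projection looks right (a sketch of the incremental workflow):
* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller
* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload
The complete query statement is as follows: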
* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller
| parse-json Payload
| project-away Payload
| parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson
| parse-json errorJson
| parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName
| parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo
| project-rename serviceHost="__tag__:__hostname__"
| extend scheduleType = json_extract_scalar(schedule, '$.type')
| project httpCode, errorCode, errorMessage, requestID, fileName, fileNo, serviceHost, scheduleType, project
The syntax is explained as follows:
project: Retain the Payload, error, __tag__:__path__, __tag__:__hostname__, and caller fields from the original result, discarding others to facilitate subsequent parsing.
parse-json: Convert the Payload string into JSON, producing first-level fields such as lastNotified, serverUri, and jobID.
project-away: Remove the original Payload field.
parse-regexp: Extract the JSON portion of the error field and store it in errorJson.
parse-json: Expand the errorJson field to retrieve fields such as httpCode, errorCode, and errorMessage.
parse-regexp: Use a regular expression to extract the service name from __tag__:__path__ and assign it to serviceName.
parse-regexp: Extract the file name and line number from caller, placing them in the fileName and fileNo fields, respectively.
project-rename: Rename the __tag__:__hostname__ field to serviceHost.
extend: Use the json_extract_scalar function to extract the type field from schedule, naming it scheduleType.
project: Retain the required field list, including the project field from Payload.
2. Create a SQL job
Log on to the Realtime Compute for Apache Flink console and click the target workspace.
Important
The target workspace and the Simple Log Service project must be in the same region.
In the left-side navigation pane, choose .
Click New. In the New Draft dialog box, select and click Next.

Enter a name and click Create. Copy the following SQL to create a temporary table in the draft.
CREATE TEMPORARY TABLE sls_input_complex (
  errorCode STRING,
  errorMessage STRING,
  fileName STRING,
  fileNo STRING,
  httpCode STRING,
  requestID STRING,
  scheduleType STRING,
  serviceHost STRING,
  project STRING,
  -- Processing-time attribute for time-based operations.
  proctime AS PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' = 'ap-southeast-1-intranet.log.aliyuncs.com',
  'accessId' = '${ak}',
  'accessKey' = '${sk}',
  'starttime' = '2024-02-01 10:30:00',
  'project' = '${project}',
  'logstore' = '${logstore}',
  'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode, errorMessage, requestID, fileName, fileNo, serviceHost, scheduleType, project'
);
The parameters in the SQL statement are described below. Replace them as needed.
Parameter | Description | Example
--- | --- | ---
connector | The connector type. See Supported connectors. | sls
endpoint | The internal endpoint used to access your Simple Log Service project. For information about how to obtain it, see View endpoints. | ap-southeast-1-intranet.log.aliyuncs.com
accessId | The AccessKey ID used to identify a user. For more information, see Create an AccessKey pair. | LTAI5tK*******
accessKey | The AccessKey secret used to verify the identity of the user. For more information, see Create an AccessKey pair. | 3k5PJm*******
starttime | The start time for querying logs. | 2025-02-19 00:00:00
project | The name of the Simple Log Service project. | test-project
logstore | The name of the Simple Log Service logstore. | clb-access-log
query | The SPL statement. Single quotes inside the statement must be escaped as two single quotes (''). | * \| where slbid = ''slb-01''
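For example, the parse-regexp instruction from step 1 is written with plain single quotes in the Simple Log Service console; when the same statement is embedded in the query option, each single quote is doubled:
In the console:
| parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson
In the query option of the Flink SQL statement:
'query' = '... | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson ...'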
Select the SQL, right-click it, and choose Run to connect to Simple Log Service.

3. Run the query and view the result
Copy the following statement into the draft to query the cleansed data in the table.
SELECT * FROM sls_input_complex;
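Because SPL has already structured the data at the source, further analysis is plain Flink SQL. For example, the following is a minimal sketch of an aggregate query that counts errors per service host and error code; the query itself is illustrative, based on the table defined above:
SELECT serviceHost, errorCode, COUNT(*) AS errorCount
FROM sls_input_complex
GROUP BY serviceHost, errorCode;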
Click Debug in the upper-right corner. In the debug dialog box, select Create new session cluster from the Session Cluster drop-down list to create a debug cluster.

In the debug dialog box, select the created debug cluster, and then click OK.

In the Results area, view the column values, which reflect the data processed by SPL. The fields generated by SPL match the fields defined in the temporary table.
