Semi-structured analysis based on SPL in Flink SQL

Updated at: 2025-03-03 09:32

This topic uses an example to explain how to implement semi-structured analysis in Flink SQL based on the Simple Log Service Processing Language (SPL).

Background information

Simple Log Service is a cloud-native platform for observability and analytics, enabling cost-effective, real-time processing of logs, metrics, and traces. It simplifies data access, facilitating the ingestion of system and business logs for storage and analysis.

Realtime Compute for Apache Flink, built on Apache Flink, is a big data analytics platform suited for real-time data analysis and risk monitoring. It natively supports the Simple Log Service connector, allowing the service to be used as a source or result table.

The connector streamlines the handling of structured logs, allowing direct mapping of Simple Log Service log fields to Flink SQL table fields. For semi-structured logs that contain all content in a single field, methods like regular expressions and delimiters are necessary to extract structured data. This topic discusses a solution using SPL to configure the connector for data structuring, focusing on log cleansing and format normalization.

Semi-structured log data

Consider a log example with a complex format that includes JSON strings and mixed content. The log contains the following elements:

  • Payload: A JSON string, with the schedule field also in JSON format.

  • requestURL: A standard URL path.

  • error: Begins with the string CouldNotExecuteQuery, followed by a JSON structure.

  • __tag__:__path__: Represents the log file path, where service_a may indicate the service name.

  • caller: Contains the file name and line number.

{
  "Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
  "TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
  "TaskType": "ALERT",
  "__source__": "11.199.XX.XXX",
  "__tag__:__hostname__": "iabcde12345.cloud.abc121",
  "__tag__:__path__": "/var/log/service_a.LOG",
  "caller": "executor/pool.go:64",
  "error": "CouldNotExecuteQuery : {\n    \"httpCode\": 404,\n    \"errorCode\": \"LogStoreNotExist\",\n    \"errorMessage\": \"logstore k8s-event does not exist\",\n    \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
  "requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
  "ts": "2024-01-29 22:57:13"
}

Requirements for structured data processing

To derive valuable insights from these logs, data cleansing is essential. Key fields must first be extracted for analysis, which is conducted in Flink. The specific requirements for field extraction are as follows:

  • Extract httpCode, errorCode, errorMessage, and requestID from the error field.

  • Extract service_a from __tag__:__path__ as serviceName.

  • Extract pool.go from caller as fileName, and 64 as fileNo.

  • Extract project from the Payload field, and extract type from the schedule within Payload as scheduleType.

  • Rename __source__ to serviceIP.

The final list of required fields is as follows:

httpCode, errorCode, errorMessage, requestID, fileName, fileNo, serviceHost, scheduleType, and project.

Solutions

There are several solutions to data cleansing, each suited to specific scenarios.

  1. Data transformation solution: Create a target logstore in the Simple Log Service console, and a data transformation task for cleansing.

  2. Flink solution: Define error and Payload as source table fields. Use SQL regular expression functions and JSON functions to parse these fields, insert the parsed data into a temporary table, and perform analysis on that table.

  3. SPL solution: Configure SPL statements for the Simple Log Service connector in Realtime Compute for Apache Flink to cleanse data. Define the source table fields in Flink according to the cleansed data structure.

Among these options, the SPL solution provides a more efficient approach to data cleansing. It removes the need for intermediate logstores or temporary tables, particularly for semi-structured log data. By performing cleansing closer to the source, the computing platform can focus on business logic, resulting in a clearer separation of responsibilities.
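
For comparison, the following is a minimal sketch of the Flink-only approach (solution 2), in which the raw Payload, error, and caller strings are declared as source table fields and parsed with Flink SQL JSON and regular expression functions. The table name sls_input_raw and the view name parsed_logs are illustrative, the connector options are abbreviated to the same placeholders used later in this topic, and only a subset of the required fields is shown.

    -- Illustrative sketch of solution 2: parse the raw fields inside Flink SQL.
    -- Table and view names (sls_input_raw, parsed_logs) are illustrative.
    CREATE TEMPORARY TABLE sls_input_raw (
      Payload STRING,
      `error` STRING,
      caller  STRING
    ) WITH (
      'connector' = 'sls',
      'endpoint' = 'ap-southeast-1-intranet.log.aliyuncs.com',
      'accessId' = '${ak}',
      'accessKey' = '${sk}',
      'project' = '${project}',
      'logstore' = '${logstore}'
    );

    CREATE TEMPORARY VIEW parsed_logs AS
    SELECT
      -- Cut the JSON part out of the error field, then read individual keys from it
      -- (errorMessage and requestID can be extracted the same way).
      JSON_VALUE(REGEXP_EXTRACT(`error`, 'CouldNotExecuteQuery : ([\s\S]+)', 1), '$.httpCode')  AS httpCode,
      JSON_VALUE(REGEXP_EXTRACT(`error`, 'CouldNotExecuteQuery : ([\s\S]+)', 1), '$.errorCode') AS errorCode,
      -- Read fields from the Payload JSON string, including the nested schedule object.
      JSON_VALUE(Payload, '$.project')       AS project,
      JSON_VALUE(Payload, '$.schedule.type') AS scheduleType,
      -- Split a caller value such as executor/pool.go:64 into file name and line number.
      REGEXP_EXTRACT(caller, '(\w+)/([\w.]+):(\d+)', 2) AS fileName,
      REGEXP_EXTRACT(caller, '(\w+)/([\w.]+):(\d+)', 3) AS fileNo
    FROM sls_input_raw;

Every downstream query then has to go through parsed_logs, whereas the SPL solution below pushes the same parsing down to Simple Log Service so that the Flink table can declare the already-cleansed fields directly.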

SPL solution

1. Prepare data in Simple Log Service

  1. Activate the Simple Log Service, and create a project and logstore.

  2. Use the Simple Log Service Java SDK to write the preceding log example into the target logstore as simulated sample data. For SDKs in other languages, refer to the corresponding SDK references.


  3. In the logstore, write the SPL pipeline syntax and preview the effect.


    The query statement is as follows. The SPL pipeline syntax separates instructions with the vertical bar (|). You can view the result immediately after entering each instruction and then add pipelines step by step until you reach the final result. For more information, see Syntax of scan-based query.

    * | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller 
     | parse-json Payload 
     | project-away Payload 
     | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson 
     | parse-json errorJson 
     | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName 
     | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo 
     | project-rename serviceHost="__tag__:__hostname__" 
     | extend scheduleType = json_extract_scalar(schedule, '$.type') 
     | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project

    The syntax is explained as follows:

    • project: Retain the Payload, error, __tag__:__path__, __tag__:__hostname__, and caller fields from the original result, discarding others to facilitate subsequent parsing.

    • parse-json: Convert the Payload string into JSON, producing first-level fields such as lastNotified, serverUri, and jobID.

    • project-away: Remove the original Payload field.

    • parse-regexp: Extract a portion of the JSON content from the error field and store it in errorJson.

    • parse-json: Expand the errorJson field to retrieve fields such as httpCode, errorCode, and errorMessage.

    • parse-regexp: Use a regular expression to extract the service name (for example, service_a) from __tag__:__path__ and assign it to serviceName.

    • parse-regexp: Extract the file name and line number from caller, placing them in the fileName and fileNo fields, respectively.

    • project-rename: Rename the __tag__:__hostname__ field to serviceHost.

    • extend: Use the json_extract_scalar function to extract the type field from the schedule, naming it scheduleType.

    • project: Retain the required field list, including the project field from Payload.

2. Create a SQL job

  1. Log on to the Realtime Compute for Apache Flink console and click the target workspace.

    Important

    The target workspace and the Simple Log Service project must be in the same region.

  2. In the left-side navigation pane, choose Development > ETL.

  3. Click New. In the New Draft dialog box, select SQL Scripts > Blank Stream Draft and click Next.


  4. Enter a name and click Create. Copy the following SQL to create a temporary table in the draft.

    CREATE TEMPORARY TABLE sls_input_complex (
      errorCode STRING,
      errorMessage STRING,
      fileName STRING,
      fileNo STRING,
      httpCode STRING,
      requestID STRING,
      scheduleType STRING,
      serviceHost STRING,
      project STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' = 'ap-southeast-1-intranet.log.aliyuncs.com',
      'accessId' = '${ak}',
      'accessKey' = '${sk}',
      'starttime' = '2024-02-01 10:30:00',
      'project' = '${project}',
      'logstore' = '${logstore}',
      'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project'
    );

    The parameters in the SQL statement are described below. Replace them as needed.

    • connector: The type of the connector. See Supported connectors. Example: sls.

    • endpoint: The internal endpoint used to access your Simple Log Service project. For information about how to obtain it, see View endpoints. Example: ap-southeast-1-intranet.log.aliyuncs.com.

    • accessId: The AccessKey ID used to identify a user. For more information, see Create an AccessKey pair. Example: LTAI5tK*******.

    • accessKey: The AccessKey secret used to verify the identity of the user. For more information, see Create an AccessKey pair. Example: 3k5PJm*******.

    • starttime: The start time for querying logs. Example: 2025-02-19 00:00:00.

    • project: The name of the Simple Log Service project. Example: test-project.

    • logstore: The name of the Simple Log Service logstore. Example: clb-access-log.

    • query: The SPL statement. Single quotation marks in the statement must be escaped by writing them twice (''), because the option value is itself enclosed in single quotation marks. Example: * | where slbid = ''slb-01''.
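
    To see this escaping rule in context, the following hypothetical minimal table definition (table and field names are illustrative) shows the example filter written inside the query option:

    -- Hypothetical minimal example; only the 'query' option matters here.
    CREATE TEMPORARY TABLE sls_input_filtered (
      slbid STRING,
      request_uri STRING
    ) WITH (
      'connector' = 'sls',
      'endpoint' = 'ap-southeast-1-intranet.log.aliyuncs.com',
      'accessId' = '${ak}',
      'accessKey' = '${sk}',
      'project' = '${project}',
      'logstore' = '${logstore}',
      -- The SPL statement is * | where slbid = 'slb-01'; each inner single quotation mark
      -- is doubled because the whole option value is itself a single-quoted string.
      'query' = '* | where slbid = ''slb-01'''
    );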

  5. Select the SQL, right-click it, and choose Run to connect to Simple Log Service.


3. Perform query and view the result

  1. Copy the following statement into the draft to query the data from the source table.

    SELECT * FROM sls_input_complex;

  2. Click Debug in the upper-right corner. In the debug dialog box, select Create new session cluster from the Session Cluster drop-down list and create a new debug cluster.


  3. In the debug dialog box, select the created debug cluster, and then click OK.


  4. In the Results area, view the column values of the table. They reflect the data processed by SPL, and the final list of fields produced by SPL matches the fields defined in the table.

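
With the source table verified, further analysis can run directly on the SPL-parsed columns. As an illustrative example, the following query counts log entries per errorCode; any of the other fields defined in sls_input_complex can be aggregated in the same way.

    -- Illustrative aggregation over the fields parsed by SPL.
    SELECT errorCode, COUNT(*) AS errorCount
    FROM sls_input_complex
    GROUP BY errorCode;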
