Simple Log Service: Implement row filtering and column pruning in Flink SQL by using SPL statements

Last Updated: Apr 08, 2025

This topic describes how Alibaba Cloud Flink SQL implements row filtering and column pruning by using Simple Log Service Processing Language (SPL) statements.

Background

When you configure Simple Log Service as the source table in Realtime Compute for Apache Flink, data in the Logstores of Simple Log Service is automatically consumed to build a dynamic table. You can specify the start time for the data consumption. Data that is generated after the start time is consumed. Two issues may exist:

  1. The Simple Log Service connector pulls many unnecessary data rows or columns, which causes unnecessary network overhead.

  2. The unnecessary data must be cleansed in Realtime Compute for Apache Flink, but data cleansing is not the focus of data analysis and consumes additional computational resources. Data cleansing involves data filtering and data projection.

SPL provides the Simple Log Service connector with filter pushdown and projection pushdown capabilities. Both are configured in the query parameter of the connector: a where instruction pushes down filter conditions, and a project instruction pushes down projection fields. Because filtering and projection are applied before data is sent to Realtime Compute for Apache Flink, the full data is neither transmitted nor computed, which improves data transmission and computing efficiency.

Solution

  • No SPL statement is specified: Realtime Compute for Apache Flink pulls full logs of Simple Log Service, including all rows and columns.

    image
  • An SPL statement is specified: If row filtering or column pruning settings are included in an SPL statement, Realtime Compute for Apache Flink pulls only partial data that is obtained after row filtering or column pruning for subsequent computing.

    image

Preparations

  • Simple Log Service is activated. A project and a Logstore are created. For more information, see Activate Simple Log Service and Create a project and a Logstore.

  • In this topic, simulated Layer 7 access logs of Server Load Balancer (SLB) are collected to the Logstore of Simple Log Service. The logs contain more than 10 fields. Random simulated logs are continuously generated. Sample log:

    {
      "__source__": "127.0.0.1",
      "__tag__:__receive_time__": "1706531737",
      "__time__": "1706531727",
      "__topic__": "slb_layer7",
      "body_bytes_sent": "3577",
      "client_ip": "114.137.XXX.XXX",
      "host": "www.pi.mock.com",
      "http_host": "www.cwj.mock.com",
      "http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0",
      "request_length": "1662",
      "request_method": "GET",
      "request_time": "31",
      "request_uri": "/request/path-0/file-3",
      "scheme": "https",
      "slbid": "slb-02",
      "status": "200",
      "upstream_addr": "42.63.XXX.XXX",
      "upstream_response_time": "32",
      "upstream_status": "200",
      "vip_addr": "223.18.XX.XXX"
    }
  • The slbid field in the Logstore has three types of values. The following statement is used to query the page views (PVs) for the values of the slbid field in the logs that are generated within 15 minutes. The results show that the PVs of the slb-01 value are equivalent to the PVs of the slb-02 value.

    image

Procedure

Row filtering: SPL provides the Simple Log Service connector with the filter pushdown capability. Add a where instruction to the query parameter of the connector to push down filter conditions. This avoids transmitting the full data to Realtime Compute for Apache Flink and filtering it there.

Column pruning: SPL provides the Simple Log Service connector with the projection pushdown capability. Add a project instruction to the query parameter of the connector to push down projection fields. This avoids transmitting fields that the job does not use.
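
For comparison, the case without pushdown can be sketched as follows. This is only an illustration under assumptions: sls_input_full is a hypothetical table name, the connection values are the same placeholders used elsewhere in this topic, and the table deliberately omits the query parameter so that filtering and column selection happen in Flink after the logs are pulled.

```sql
-- Baseline without SPL: no 'query' option, so the connector pulls full logs.
-- sls_input_full is a hypothetical name; connection values are placeholders.
CREATE TEMPORARY TABLE sls_input_full (
  request_uri STRING,
  scheme STRING,
  slbid STRING,
  status STRING
) WITH (
  'connector' = 'sls',
  'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = 'yourAccessKeyID',
  'accessKey' = 'yourAccessKeySecret',
  'starttime' = '2025-02-19 00:00:00',
  'project' = 'test-project',
  'logstore' = 'clb-access-log'
);

-- Rows are filtered and columns are selected inside Flink,
-- after every log has already been transferred.
SELECT request_uri, scheme, slbid, status
FROM sls_input_full
WHERE slbid = 'slb-01';
```

The Row filtering and Column pruning sections move the WHERE condition and the column list of this query into the query parameter as SPL where and project instructions.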

Row filtering

Step 1: Create an SQL job

  1. Log on to the Realtime Compute for Apache Flink console. Find the workspace that you want to manage and click Console in the Actions column.

  2. In the left-side navigation pane, choose Development > ETL.

  3. Click New. In the New Draft dialog box, choose SQL Scripts > Blank Stream Draft and click Next.

    image

  4. Copy the following SQL job settings used to create a temporary table to the SQL editor:

    CREATE TEMPORARY TABLE sls_input (
      -- Log fields read from the Logstore
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      -- Metadata columns
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
      `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = 'yourAccessKeyID',
      'accessKey' = 'yourAccessKeySecret',
      'starttime' = '2025-02-19 00:00:00',
      'project' = 'test-project',
      'logstore' = 'clb-access-log',
      -- SPL statement: pull only the rows whose slbid is slb-01
      'query' = '* | where slbid = ''slb-01'''
    );

    The following table describes the parameters of the SQL job.

    Parameter  Description                                                                         Example
    connector  The Simple Log Service connector. For more information, see Supported connectors.   sls
    endpoint   The internal Simple Log Service endpoint. For more information, see Endpoints.      cn-hangzhou-intranet.log.aliyuncs.com
    accessId   The AccessKey ID. For more information, see Create an AccessKey pair.               LTAI****************
    accessKey  The AccessKey secret. For more information, see Create an AccessKey pair.           yourAccessKeySecret
    starttime  The point in time at which data consumption starts.                                 2025-02-19 00:00:00
    project    The name of the Simple Log Service project.                                         test-project
    logstore   The name of the Simple Log Service Logstore.                                        clb-access-log
    query      The SPL statement. In Flink SQL, double each single quotation mark inside it ('').  * | where slbid = ''slb-01''

  5. Select and right-click the SQL job settings. In the popover that appears, click Run to connect to Simple Log Service.

    image
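
The quoting rule for the query parameter in step 4 can be checked in isolation. The following statement is a minimal sketch that relies only on standard SQL string literal escaping; the column alias spl_statement and the second filter condition in the comment are illustrative assumptions, not part of the original job.

```sql
-- Inside a SQL string literal, '' stands for one single quotation mark.
-- The value of this literal is therefore:  * | where slbid = 'slb-01'
SELECT '* | where slbid = ''slb-01''' AS spl_statement;

-- Hypothetical variant with a second condition (WITH clause fragment only).
-- The sample log stores status as a string, so it is compared with '200'.
--   'query' = '* | where slbid = ''slb-01'' and status = ''200'''
```

The SPL statement that Simple Log Service receives contains ordinary single quotation marks; the doubling exists only in the Flink SQL source text.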

Step 2: Run a continuous query and view the results

  1. Enter the following analytic statement in the SQL job and perform an aggregate query by slbid:

    SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid;
  2. In the upper-right corner of the page, click the Debug icon. In the debug dialog box, select Create new session cluster from the Session Cluster drop-down list and configure the parameters based on the following figure.

    image

  3. In the debug drop-down list, select the created session cluster and click OK.

    image

  4. In the Results section, the value slb-01 is always displayed for the slbid field. The results show that after you specify an SPL statement, sls_input includes only data that matches slbid='slb-01'.

    image

Column pruning

Step 1: Create an SQL job

  1. Log on to the Realtime Compute for Apache Flink console. Find the workspace that you want to manage and click Console in the Actions column.

  2. In the left-side navigation pane, choose Development > ETL.

  3. Click New. In the New Draft dialog box, choose SQL Scripts > Blank Stream Draft and click Next.

    image

  4. Copy the following SQL job settings, which create a temporary table, to the SQL editor. Compared with the row filtering job, a project instruction is appended to the query parameter after the where instruction. Vertical bars (|) separate instructions: the output of each instruction is the input of the next, and the output of the last instruction is the output of the entire pipeline. As a result, only the specified fields are pulled from Simple Log Service.

    CREATE TEMPORARY TABLE sls_input_project (
      -- Log fields read from the Logstore
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      -- Metadata columns
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
      `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = 'yourAccessKeyID',
      'accessKey' = 'yourAccessKeySecret',
      'starttime' = '2025-02-19 00:00:00',
      'project' = 'test-project',
      'logstore' = 'clb-access-log',
      -- SPL statement: filter rows with where, then keep only the listed fields with project
      'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"'
    );

    The following table describes the parameters of the SQL job.

    Parameter  Description                                                                         Example
    connector  The Simple Log Service connector. For more information, see Supported connectors.   sls
    endpoint   The internal Simple Log Service endpoint. For more information, see Endpoints.      cn-hangzhou-intranet.log.aliyuncs.com
    accessId   The AccessKey ID. For more information, see Create an AccessKey pair.               LTAI****************
    accessKey  The AccessKey secret. For more information, see Create an AccessKey pair.           yourAccessKeySecret
    starttime  The point in time at which data consumption starts.                                 2025-02-19 00:00:00
    project    The name of the Simple Log Service project.                                         test-project
    logstore   The name of the Simple Log Service Logstore.                                        clb-access-log
    query      The SPL statement. In Flink SQL, double each single quotation mark inside it ('').  * | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"

  5. Select and right-click the SQL job settings. In the popover that appears, click Run to connect to Simple Log Service.

    image
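
Column pruning does not have to be combined with row filtering. The following sketch assumes that a pipeline consisting only of a project instruction is accepted in the query parameter, which follows from the pipeline description in step 4 but is not shown in the original job. The table name sls_input_columns is hypothetical, and the connection values are the same placeholders as above.

```sql
-- Column pruning only: every row is kept, but only two fields are pulled.
-- sls_input_columns is a hypothetical name; connection values are placeholders.
CREATE TEMPORARY TABLE sls_input_columns (
  request_uri STRING,
  status STRING
) WITH (
  'connector' = 'sls',
  'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = 'yourAccessKeyID',
  'accessKey' = 'yourAccessKeySecret',
  'starttime' = '2025-02-19 00:00:00',
  'project' = 'test-project',
  'logstore' = 'clb-access-log',
  -- Stage 1 (*) selects all logs; stage 2 (project) keeps the listed fields.
  'query' = '* | project request_uri, status'
);

-- Count requests per status code using only the pruned fields.
SELECT status, count(1) AS status_cnt
FROM sls_input_columns
GROUP BY status;
```

Declaring only the fields that the project instruction keeps makes the table schema match what the connector actually receives.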

Step 2: Run a continuous query and view the results

  1. Enter the following analytic statement in the SQL job and perform an aggregate query by slbid:

    SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid;
  2. In the upper-right corner of the page, click the Debug icon. In the debug dialog box, select Create new session cluster from the Session Cluster drop-down list and configure the parameters based on the following figure.

    image

  3. In the debug drop-down list, select the created session cluster and click OK.

    image

  4. The Results section displays the results, similar to those of the SQL job in which row filtering is performed.

    Note: The row filtering job returns all fields of each matching log. With the current SPL statement, the Simple Log Service connector returns only the specified fields, which further reduces the amount of data transmitted over the network.

    image