All Products
Search
Document Center

Realtime Compute for Apache Flink:Nexmark-based performance white paper

Last Updated:Oct 17, 2024

This topic describes how to use Nexmark to test the performance of Realtime Compute for Apache Flink.

Performance

When one compute unit (CU) is used for computing in Realtime Compute for Apache Flink, the Nexmark test result on 19 queries shows that the minimum records per second (RPS) is 5,000 and the maximum RPS is 55,000.

  • For simple operations such as single-stream filtering and string conversion, one CU can process 40,000 to 55,000 data records per second.

  • For complex operations such as JOIN, GROUP BY, or operations that contain window functions, one CU can process 5,000 to 10,000 data records per second.

Prerequisites

  • Java Development Kit (JDK) 1.8.X or later is installed.

  • Maven 3.8.2 is installed.

  • Git is installed. For more information about how to download Git, see Git.

  • Nexmark is installed. For more information about Nexmark, see Nexmark.

  • A workspace is created. For more information, see Activate Realtime Compute for Apache Flink.

    Note

    Nexmark and Git belong to third-party websites. Your access to the tools may be delayed or you may fail to access the tools.

Test tools

测试图片

  • Nexmark source table: Test data is generated based on a Nexmark source table and the test requirements for transactions per second (TPS).

  • Transformations: A total number of 19 queries of Nexmark are transformed.

  • Blackhole result table: A Blackhole result table instead of an external data store is used to prevent the impact of upstream and downstream data stores on the performance of Realtime Compute for Apache Flink. This helps focus on verifying the performance of Realtime Compute for Apache Flink.

Prepare data

  1. Run the following commands in the command line interface (CLI) to download and compile the Nexmark source code:

    cd <path>
    git clone https://github.com/nexmark/nexmark.git
    cd nexmark/nexmark-flink
    mvn clean package
    Note

    <path> indicates the path that you specified to store the downloaded nexmark.git file.

  2. Create a Nexmark connector.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Connectors. On the Connectors page, click Create Custom Connector in the upper-right corner.

    4. In the Create custom connector dialog box, click Click to select and select the package that you want to upload. The following figure shows an example.连接hh

      Note

      The package is stored in the <path>/nexmark/nexmark-flink/target directory that you specified. The nexmark-flink-0.2-SNAPSHOT.jar file is the initially compiled package.

    5. Click Next and then click Finish.

The following table describes the parameters of the Nexmark connector.

Parameter

Example

Description

first-event.rate

55000

The rate at which data is generated.

next-event.rate

events.num

100000000

The number of events that are generated.

bid.proportion

92%

The percentage of Bid events.

auction.proportion

6%

The percentage of Auction events.

person.proportion

2%

The percentage of Person events.

Procedure

  1. Create the following query drafts in the console of fully managed Flink. For more information, see Develop an SQL draft.

q0

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        bid;

q1

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price DECIMAL(23, 3),
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        0.908 * price as price, -- convert dollar to euro
        channel,
        url,
        dateTime,
        extra 
    FROM
        bid;

q2

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        bid 
    WHERE
        MOD(auction, 123) = 0;

q3

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (name VARCHAR, city VARCHAR, state VARCHAR, id BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW person AS
    SELECT 
        person.id,
        person.name,
        person.emailAddress,
        person.creditCard,
        person.city,
        person.state,
        dateTime,
        person.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 0;

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        nexmark_table 
    WHERE
        event_type = 1;

INSERT INTO discard_sink
    SELECT 
        P.name, P.city, P.state, A.id 
    FROM
        auction AS A 
        INNER JOIN person AS P
            on A.seller = P.id 
    WHERE
        A.category = 10 
            and(P.state = 'OR' 
            OR P.state = 'ID' 
            OR P.state = 'CA');

q4

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (id BIGINT, final BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        Q.category, AVG(Q.final) 
    FROM
        (
            SELECT 
                MAX(B.price) AS final, A.category 
            FROM
                auction A, bid B 
            WHERE
                A.id = B.auction 
                    AND B.dateTime BETWEEN A.dateTime AND A.expires 
            GROUP BY
                A.id, A.category
        ) Q 
    GROUP BY
        Q.category;

q5

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (auction BIGINT, num BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        AuctionBids.auction, AuctionBids.num 
    FROM
        (
            SELECT 
                B1.auction,
                count(*) AS num,
                HOP_START(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
                HOP_END(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime 
            FROM
                bid B1 
            GROUP BY
                B1.auction,
                HOP(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
        ) AS AuctionBids 
        JOIN (
            SELECT 
                max(CountBids.num) AS maxn,
                CountBids.starttime,
                CountBids.endtime 
            FROM
                (
                    SELECT 
                        count(*) AS num,
                        HOP_START(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
                        HOP_END(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime 
                    FROM
                        bid B2 
                    GROUP BY
                        B2.auction,
                        HOP(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
                ) AS CountBids 
            GROUP BY
                CountBids.starttime, CountBids.endtime
        ) AS MaxBids
            ON AuctionBids.starttime = MaxBids.starttime 
                AND AuctionBids.endtime = MaxBids.endtime 
                AND AuctionBids.num >= MaxBids.maxn;

q7

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        B.auction, B.price, B.bidder, B.dateTime, B.extra 
    from
        bid B 
        JOIN (
            SELECT 
                MAX(B1.price) AS maxprice,
                TUMBLE_ROWTIME(B1.dateTime, INTERVAL '10' SECOND) as dateTime 
            FROM
                bid B1 
            GROUP BY
                TUMBLE(B1.dateTime, INTERVAL '10' SECOND)
        ) B1
            ON B.price = B1.maxprice 
    WHERE
        B.dateTime BETWEEN B1.dateTime - INTERVAL '10' SECOND AND B1.dateTime;

q8

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (id BIGINT, name VARCHAR, stime TIMESTAMP(3)) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW person AS
    SELECT 
        person.id,
        person.name,
        person.emailAddress,
        person.creditCard,
        person.city,
        person.state,
        dateTime,
        person.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 0;

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

INSERT INTO discard_sink
    SELECT 
        P.id, P.name, P.starttime 
    FROM
        (
            SELECT 
                P.id,
                P.name,
                TUMBLE_START(P.dateTime, INTERVAL '10' SECOND) AS starttime,
                TUMBLE_END(P.dateTime, INTERVAL '10' SECOND) AS endtime 
            FROM
                person P 
            GROUP BY
                P.id,
                P.name,
                TUMBLE(P.dateTime, INTERVAL '10' SECOND)
        ) P 
        JOIN (
            SELECT 
                A.seller,
                TUMBLE_START(A.dateTime, INTERVAL '10' SECOND) AS starttime,
                TUMBLE_END(A.dateTime, INTERVAL '10' SECOND) AS endtime 
            FROM
                auction A 
            GROUP BY
                A.seller,
                TUMBLE(A.dateTime, INTERVAL '10' SECOND)
        ) A
            ON P.id = A.seller 
                AND P.starttime = A.starttime 
                AND P.endtime = A.endtime;

q9

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    id BIGINT,
    itemName VARCHAR,
    description VARCHAR,
    initialBid BIGINT,
    reserve BIGINT,
    dateTime TIMESTAMP(3),
    expires TIMESTAMP(3),
    seller BIGINT,
    category BIGINT,
    extra VARCHAR,
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    bid_dateTime TIMESTAMP(3),
    bid_extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        id,
        itemName,
        description,
        initialBid,
        reserve,
        dateTime,
        expires,
        seller,
        category,
        extra,
        auction,
        bidder,
        price,
        bid_dateTime,
        bid_extra 
    FROM
        (
            SELECT 
                A.*,
                B.auction,
                B.bidder,
                B.price,
                B.dateTime AS bid_dateTime,
                B.extra AS bid_extra,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            A.id
                        ORDER BY
                            B.price DESC, B.dateTime ASC
                    ) AS rownum 
            FROM
                auction A, bid B 
            WHERE
                A.id = B.auction 
                    AND B.dateTime BETWEEN A.dateTime AND A.expires
        ) 
    WHERE
        rownum <= 1;

q11

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    bidder BIGINT,
    bid_count BIGINT,
    starttime TIMESTAMP(3),
    endtime TIMESTAMP(3)
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        B.bidder,
        count(*) as bid_count,
        SESSION_START(B.dateTime, INTERVAL '10' SECOND) as starttime,
        SESSION_END(B.dateTime, INTERVAL '10' SECOND) as endtime 
    FROM
        bid B 
    GROUP BY
        B.bidder, SESSION(B.dateTime, INTERVAL '10' SECOND);

q12

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    bidder BIGINT,
    bid_count BIGINT,
    starttime TIMESTAMP(3),
    endtime TIMESTAMP(3)
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        B.bidder,
        count(*) as bid_count,
        TUMBLE_START(B.p_time, INTERVAL '10' SECOND) as starttime,
        TUMBLE_END(B.p_time, INTERVAL '10' SECOND) as endtime 
    FROM
        (
            SELECT 
                *, PROCTIME() as p_time 
            FROM
                bid
        ) B 
    GROUP BY
        B.bidder, TUMBLE(B.p_time, INTERVAL '10' SECOND);

q15

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    `day` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    total_bidders BIGINT,
    rank1_bidders BIGINT,
    rank2_bidders BIGINT,
    rank3_bidders BIGINT,
    total_auctions BIGINT,
    rank1_auctions BIGINT,
    rank2_auctions BIGINT,
    rank3_auctions BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) 
            filter(where price < 10000) AS rank1_bidders,
        count(distinct bidder) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bidders,
        count(distinct bidder) 
            filter(where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) 
            filter(where price < 10000) AS rank1_auctions,
        count(distinct auction) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_auctions,
        count(distinct auction) 
            filter(where price >= 1000000) AS rank3_auctions 
    FROM
        bid 
    GROUP BY
        DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q16

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    channel VARCHAR,
    `day` VARCHAR,
    `minute` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    total_bidders BIGINT,
    rank1_bidders BIGINT,
    rank2_bidders BIGINT,
    rank3_bidders BIGINT,
    total_auctions BIGINT,
    rank1_auctions BIGINT,
    rank2_auctions BIGINT,
    rank3_auctions BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        channel,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        max(DATE_FORMAT(dateTime, 'HH:mm')) as `minute`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) 
            filter(where price < 10000) AS rank1_bidders,
        count(distinct bidder) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bidders,
        count(distinct bidder) 
            filter(where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) 
            filter(where price < 10000) AS rank1_auctions,
        count(distinct auction) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_auctions,
        count(distinct auction) 
            filter(where price >= 1000000) AS rank3_auctions 
    FROM
        bid 
    GROUP BY
        channel, DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q17

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    `day` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    min_price BIGINT,
    max_price BIGINT,
    avg_price BIGINT,
    sum_price BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        min(price) AS min_price,
        max(price) AS max_price,
        avg(price) AS avg_price,
        sum(price) AS sum_price 
    FROM
        bid 
    GROUP BY
        auction, DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q18

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        (
            SELECT 
                *,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            bidder, auction
                        ORDER BY
                            dateTime DESC
                    ) AS rank_number 
            FROM
                bid
        ) 
    WHERE
        rank_number <= 1;

q19

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR,
    rank_number BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        * 
    FROM
        (
            SELECT 
                *,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            auction
                        ORDER BY
                            price DESC
                    ) AS rank_number 
            FROM
                bid
        ) 
    WHERE
        rank_number <= 10;

q20

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    bid_dateTime TIMESTAMP(3),
    bid_extra VARCHAR,
    itemName VARCHAR,
    description VARCHAR,
    initialBid BIGINT,
    reserve BIGINT,
    auction_dateTime TIMESTAMP(3),
    expires TIMESTAMP(3),
    seller BIGINT,
    category BIGINT,
    auction_extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        url,
        B.dateTime,
        B.extra,
        itemName,
        description,
        initialBid,
        reserve,
        A.dateTime,
        expires,
        seller,
        category,
        A.extra 
    FROM
        bid AS B 
        INNER JOIN auction AS A
            on B.auction = A.id 
    WHERE
        A.category = 10;

q21

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    channel_id VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        CASE 
            WHEN lower(channel) = 'apple' 
            THEN '0' 
            WHEN lower(channel) = 'google' 
            THEN '1' 
            WHEN lower(channel) = 'facebook' 
            THEN '2' 
            WHEN lower(channel) = 'baidu' 
            THEN '3' 
            ELSE REGEXP_EXTRACT(url, '(&|^)channel_id=([^&]*)', 2) 
        END AS channel_id 
    FROM
        bid 
    where
        REGEXP_EXTRACT(url, '(&|^)channel_id=([^&]*)', 2) is not null 
            or lower(channel) in ('apple', 'google', 'facebook', 'baidu');

q22

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    dir1 VARCHAR,
    dir2 VARCHAR,
    dir3 VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        SPLIT_INDEX(url, '/', 3) as dir1,
        SPLIT_INDEX(url, '/', 4) as dir2,
        SPLIT_INDEX(url, '/', 5) as dir3 
    FROM
        bid;
  1. In the left-side navigation pane, choose Development > ETL. On the right side of the SQL editor, click the Configurations tab. Then, select vvr-6.0.4-flink-1.15 from the Engine Version drop-down list.

  2. In the upper-right corner of the SQL editor, click Deploy.

  3. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find the deployment that you want to manage and click the name of the deployment. In the panel that appears, click the Configuration tab. On the Configuration tab, click Edit in the upper-right corner of the Resources section. Set the Parallelism parameter to 1, the Task Manager Memory parameter to 4 GiB, and Task Manager CPU parameter to 1 Core. The following figure shows an example.中文阿巴阿巴

  4. After the configuration is complete, click Save in the upper-right corner of the Resources section.

  5. Find the deployment and click Start in the Actions column.

  6. After the state of the deployment changes to FINISHED, click the name of the deployment. In the panel that appears, click the Events tab. On the Events tab, view the start time and end time of the deployment. You can obtain the start time and end time of the deployment based on the following information:

    • Information that corresponds to the start time: Job was successfully started.

    • Information that corresponds to the end time: Job has successfully finished.

Expected result

Note

Duration is the difference between the end time and the start time. RPS is the result of events.num divided by Duration.

Query

Start time

End time

Duration (Unit: seconds)

RPS

q0

2023-02-07 11:55:13

2023-02-07 12:26:00

1,847

54,141

q1

2023-02-07 12:28:06

2023-02-07 12:58:53

1,847

54,141

q2

2023-02-07 12:29:13

2023-02-07 13:00:02

1,849

54,083

q3

2023-02-07 12:47:12

2023-02-07 13:18:01

1,849

54,083

q4

2023-02-07 13:30:08

2023-02-07 14:04:46

2,078

48,123

q5

2023-02-07 14:15:07

2023-02-07 14:45:45

1,838

54,406

q7

2023-02-07 14:24:13

2023-02-07 17:08:42

9,869

10,132

q8

2023-02-07 14:27:09

2023-02-07 14:57:47

1,838

54,406

q9

2023-02-07 14:35:53

2023-02-07 15:38:16

3,743

26,716

q11

2023-02-07 14:38:14

2023-02-07 15:08:53

1,839

54,377

q12

2023-02-07 14:40:36

2023-02-07 15:11:15

1,839s

54,377

q15

2023-02-07 14:42:43

2023-02-07 15:13:22

1,839

54,377

q16

2023-02-07 14:46:17

2023-02-07 16:03:09

4,012

24,925

q17

2023-02-07 16:02:19

2023-02-07 16:33:07

1,848

54,112

q18

2023-02-07 16:04:43

2023-02-07 16:41:08

2,185

45,766

q19

2023-02-07 16:07:13

2023-02-07 16:40:01

1,968

50,813

q20

2023-02-07 16:13:29

2023-02-07 16:59:13

2,744

36,443

q21

2023-02-07 16:16:32

2023-02-07 16:47:11

1,849

54,083

q22

2023-02-07 16:18:53

2023-02-07 16:49:41

1,848

54,112