全部產品
Search
文件中心

Realtime Compute for Apache Flink:效能白皮書(Nexmark效能測試)

更新時間:Sep 27, 2024

本文為您介紹如何使用Nexmark測試Realtime ComputeFlink版效能。

效能表現

Realtime ComputeFlink版1 CU計算資源配置下,Nexmark 19個Queries語句的效能表現最小為5000RPS,最大55000RPS。

  • 簡單業務(例如,單流過濾、字串變換等操作)1 CU每秒可以處理40000~55000條資料。

  • 複雜業務(例如,JOIN、GROUP BY或視窗函數等操作)1 CU每秒可以處理5000~10000條資料。

前提條件

  • 已安裝Java JDK 1.8.x版本或更高版本。

  • 已安裝Maven 3.8.2版本。

  • 已安裝Git,下載請參見Git

  • 已安裝Nexmark,關於Nexmark資訊詳情請參見Nexmark

  • 已建立工作空間,詳情請參見開通Realtime ComputeFlink版

    說明

    Nexmark和Git屬於第三方網站,訪問時可能出現延遲或無法訪問的情況。

測試載入器

測試圖片

  • Nexmark源表:基於Nexmark源表按照測試TPS要求產生測試資料。

  • Transformations:Nexmark的19個Queries。

  • Blackhole結果表:排除上下遊儲存的效能影響,重點驗證Flink自身效能。

資料準備

  1. 您可以通過CMD控制台輸入以下命令,下載並編譯Nexmark源碼。

    cd <path>
    git clone https://github.com/nexmark/nexmark.git
    cd nexmark/nexmark-flink
    mvn clean package
    說明

    <path>是您自訂的路徑,用來存放下載的nexmark.git檔案。

  2. 建立Nexmark連接器。

    1. 登入Realtime Compute控制台

    2. 單擊目標工作空間操作列下的控制台

    3. 串連頁面右側,單擊建立自訂連接器

    4. 在彈出的對話方塊中,單擊選擇檔案,上傳作業如下圖所示。串連hh

      說明

      包的位置在您自訂的<path>/nexmark/nexmark-flink/target目錄下,nexmark-flink-0.2-SNAPSHOT.jar是初步編譯完成的包。

    5. 單擊下一步後,單擊完成。

Nexmark連接器參數資訊如下表所示。

參數

參數值

參數說明

first-event.rate

55000

資料產生速率。

next-event.rate

events.num

100000000

產生的事件數目。

bid.proportion

92%

Bid事件佔比。

auction.proportion

6%

Auction事件佔比。

person.proportion

2%

Person事件佔比。

測試步驟

  1. 分別建立以下Queries作業,建立作業詳情請參見SQL作業開發

q0

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        bid;

q1

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price DECIMAL(23, 3),
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        0.908 * price as price, -- convert dollar to euro
        channel,
        url,
        dateTime,
        extra 
    FROM
        bid;

q2

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        bid 
    WHERE
        MOD(auction, 123) = 0;

q3

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (name VARCHAR, city VARCHAR, state VARCHAR, id BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW person AS
    SELECT 
        person.id,
        person.name,
        person.emailAddress,
        person.creditCard,
        person.city,
        person.state,
        dateTime,
        person.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 0;

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        nexmark_table 
    WHERE
        event_type = 1;

INSERT INTO discard_sink
    SELECT 
        P.name, P.city, P.state, A.id 
    FROM
        auction AS A 
        INNER JOIN person AS P
            on A.seller = P.id 
    WHERE
        A.category = 10 
            and(P.state = 'OR' 
            OR P.state = 'ID' 
            OR P.state = 'CA');

q4

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (id BIGINT, final BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        Q.category, AVG(Q.final) 
    FROM
        (
            SELECT 
                MAX(B.price) AS final, A.category 
            FROM
                auction A, bid B 
            WHERE
                A.id = B.auction 
                    AND B.dateTime BETWEEN A.dateTime AND A.expires 
            GROUP BY
                A.id, A.category
        ) Q 
    GROUP BY
        Q.category;

q5

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (auction BIGINT, num BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        AuctionBids.auction, AuctionBids.num 
    FROM
        (
            SELECT 
                B1.auction,
                count(*) AS num,
                HOP_START(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
                HOP_END(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime 
            FROM
                bid B1 
            GROUP BY
                B1.auction,
                HOP(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
        ) AS AuctionBids 
        JOIN (
            SELECT 
                max(CountBids.num) AS maxn,
                CountBids.starttime,
                CountBids.endtime 
            FROM
                (
                    SELECT 
                        count(*) AS num,
                        HOP_START(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
                        HOP_END(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime 
                    FROM
                        bid B2 
                    GROUP BY
                        B2.auction,
                        HOP(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
                ) AS CountBids 
            GROUP BY
                CountBids.starttime, CountBids.endtime
        ) AS MaxBids
            ON AuctionBids.starttime = MaxBids.starttime 
                AND AuctionBids.endtime = MaxBids.endtime 
                AND AuctionBids.num >= MaxBids.maxn;

q7

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        B.auction, B.price, B.bidder, B.dateTime, B.extra 
    from
        bid B 
        JOIN (
            SELECT 
                MAX(B1.price) AS maxprice,
                TUMBLE_ROWTIME(B1.dateTime, INTERVAL '10' SECOND) as dateTime 
            FROM
                bid B1 
            GROUP BY
                TUMBLE(B1.dateTime, INTERVAL '10' SECOND)
        ) B1
            ON B.price = B1.maxprice 
    WHERE
        B.dateTime BETWEEN B1.dateTime - INTERVAL '10' SECOND AND B1.dateTime;

q8

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (id BIGINT, name VARCHAR, stime TIMESTAMP(3)) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW person AS
    SELECT 
        person.id,
        person.name,
        person.emailAddress,
        person.creditCard,
        person.city,
        person.state,
        dateTime,
        person.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 0;

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

INSERT INTO discard_sink
    SELECT 
        P.id, P.name, P.starttime 
    FROM
        (
            SELECT 
                P.id,
                P.name,
                TUMBLE_START(P.dateTime, INTERVAL '10' SECOND) AS starttime,
                TUMBLE_END(P.dateTime, INTERVAL '10' SECOND) AS endtime 
            FROM
                person P 
            GROUP BY
                P.id,
                P.name,
                TUMBLE(P.dateTime, INTERVAL '10' SECOND)
        ) P 
        JOIN (
            SELECT 
                A.seller,
                TUMBLE_START(A.dateTime, INTERVAL '10' SECOND) AS starttime,
                TUMBLE_END(A.dateTime, INTERVAL '10' SECOND) AS endtime 
            FROM
                auction A 
            GROUP BY
                A.seller,
                TUMBLE(A.dateTime, INTERVAL '10' SECOND)
        ) A
            ON P.id = A.seller 
                AND P.starttime = A.starttime 
                AND P.endtime = A.endtime;

q9

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    id BIGINT,
    itemName VARCHAR,
    description VARCHAR,
    initialBid BIGINT,
    reserve BIGINT,
    dateTime TIMESTAMP(3),
    expires TIMESTAMP(3),
    seller BIGINT,
    category BIGINT,
    extra VARCHAR,
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    bid_dateTime TIMESTAMP(3),
    bid_extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        id,
        itemName,
        description,
        initialBid,
        reserve,
        dateTime,
        expires,
        seller,
        category,
        extra,
        auction,
        bidder,
        price,
        bid_dateTime,
        bid_extra 
    FROM
        (
            SELECT 
                A.*,
                B.auction,
                B.bidder,
                B.price,
                B.dateTime AS bid_dateTime,
                B.extra AS bid_extra,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            A.id
                        ORDER BY
                            B.price DESC, B.dateTime ASC
                    ) AS rownum 
            FROM
                auction A, bid B 
            WHERE
                A.id = B.auction 
                    AND B.dateTime BETWEEN A.dateTime AND A.expires
        ) 
    WHERE
        rownum <= 1;

q11

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    bidder BIGINT,
    bid_count BIGINT,
    starttime TIMESTAMP(3),
    endtime TIMESTAMP(3)
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        B.bidder,
        count(*) as bid_count,
        SESSION_START(B.dateTime, INTERVAL '10' SECOND) as starttime,
        SESSION_END(B.dateTime, INTERVAL '10' SECOND) as endtime 
    FROM
        bid B 
    GROUP BY
        B.bidder, SESSION(B.dateTime, INTERVAL '10' SECOND);

q12

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    bidder BIGINT,
    bid_count BIGINT,
    starttime TIMESTAMP(3),
    endtime TIMESTAMP(3)
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        B.bidder,
        count(*) as bid_count,
        TUMBLE_START(B.p_time, INTERVAL '10' SECOND) as starttime,
        TUMBLE_END(B.p_time, INTERVAL '10' SECOND) as endtime 
    FROM
        (
            SELECT 
                *, PROCTIME() as p_time 
            FROM
                bid
        ) B 
    GROUP BY
        B.bidder, TUMBLE(B.p_time, INTERVAL '10' SECOND);

q15

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    `day` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    total_bidders BIGINT,
    rank1_bidders BIGINT,
    rank2_bidders BIGINT,
    rank3_bidders BIGINT,
    total_auctions BIGINT,
    rank1_auctions BIGINT,
    rank2_auctions BIGINT,
    rank3_auctions BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) 
            filter(where price < 10000) AS rank1_bidders,
        count(distinct bidder) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bidders,
        count(distinct bidder) 
            filter(where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) 
            filter(where price < 10000) AS rank1_auctions,
        count(distinct auction) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_auctions,
        count(distinct auction) 
            filter(where price >= 1000000) AS rank3_auctions 
    FROM
        bid 
    GROUP BY
        DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q16

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    channel VARCHAR,
    `day` VARCHAR,
    `minute` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    total_bidders BIGINT,
    rank1_bidders BIGINT,
    rank2_bidders BIGINT,
    rank3_bidders BIGINT,
    total_auctions BIGINT,
    rank1_auctions BIGINT,
    rank2_auctions BIGINT,
    rank3_auctions BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        channel,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        max(DATE_FORMAT(dateTime, 'HH:mm')) as `minute`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) 
            filter(where price < 10000) AS rank1_bidders,
        count(distinct bidder) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bidders,
        count(distinct bidder) 
            filter(where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) 
            filter(where price < 10000) AS rank1_auctions,
        count(distinct auction) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_auctions,
        count(distinct auction) 
            filter(where price >= 1000000) AS rank3_auctions 
    FROM
        bid 
    GROUP BY
        channel, DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q17

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    `day` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    min_price BIGINT,
    max_price BIGINT,
    avg_price BIGINT,
    sum_price BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        min(price) AS min_price,
        max(price) AS max_price,
        avg(price) AS avg_price,
        sum(price) AS sum_price 
    FROM
        bid 
    GROUP BY
        auction, DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q18

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        (
            SELECT 
                *,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            bidder, auction
                        ORDER BY
                            dateTime DESC
                    ) AS rank_number 
            FROM
                bid
        ) 
    WHERE
        rank_number <= 1;

q19

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR,
    rank_number BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        * 
    FROM
        (
            SELECT 
                *,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            auction
                        ORDER BY
                            price DESC
                    ) AS rank_number 
            FROM
                bid
        ) 
    WHERE
        rank_number <= 10;

q20

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    bid_dateTime TIMESTAMP(3),
    bid_extra VARCHAR,
    itemName VARCHAR,
    description VARCHAR,
    initialBid BIGINT,
    reserve BIGINT,
    auction_dateTime TIMESTAMP(3),
    expires TIMESTAMP(3),
    seller BIGINT,
    category BIGINT,
    auction_extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        url,
        B.dateTime,
        B.extra,
        itemName,
        description,
        initialBid,
        reserve,
        A.dateTime,
        expires,
        seller,
        category,
        A.extra 
    FROM
        bid AS B 
        INNER JOIN auction AS A
            on B.auction = A.id 
    WHERE
        A.category = 10;

q21

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    channel_id VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        CASE 
            WHEN lower(channel) = 'apple' 
            THEN '0' 
            WHEN lower(channel) = 'google' 
            THEN '1' 
            WHEN lower(channel) = 'facebook' 
            THEN '2' 
            WHEN lower(channel) = 'baidu' 
            THEN '3' 
            ELSE REGEXP_EXTRACT(url, '(&|^)channel_id=([^&]*)', 2) 
        END AS channel_id 
    FROM
        bid 
    where
        REGEXP_EXTRACT(url, '(&|^)channel_id=([^&]*)', 2) is not null 
            or lower(channel) in ('apple', 'google', 'facebook', 'baidu');

q22

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    dir1 VARCHAR,
    dir2 VARCHAR,
    dir3 VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        SPLIT_INDEX(url, '/', 3) as dir1,
        SPLIT_INDEX(url, '/', 4) as dir2,
        SPLIT_INDEX(url, '/', 5) as dir3 
    FROM
        bid;
  1. 單擊右側頁簽選擇更多配置中的引擎版本,版本號碼選擇為vvr-6.0.4-flink-1.15。

  2. 在SQL編輯地區右上方,單擊部署

  3. 營運中心 > 作業營運頁面中分別單擊目標作業,在部署詳情頁簽下資源配置部分,單擊右上方編輯。填寫配置項並發度為1、TaskManager Memory為4 GiB和TaskManager CPU Cores為1 Core,配置如下圖所示。中文阿巴阿巴

  4. 配置完成後,在資源配置右上方單擊儲存

  5. 分別單擊目標作業操作列下的啟動

  6. 待目標作業狀態為已完成時,單擊目標作業名稱,在運行事件頁簽下查看資訊,擷取作業開始時間結束時間,如下所示。

    • 開始時間:Job was successfully started.

    • 結束時間:Job has successfully finished.

預期結果

說明

Duration計算方式為結束時間減去開始時間的差值,RPS計算方式為events.numDuration的比值。

query

開始時間

結束時間

Duration(單位:秒)

RPS

q0

2023-02-07 11:55:13

2023-02-07 12:26:00

1847

54141

q1

2023-02-07 12:28:06

2023-02-07 12:58:53

1847

54141

q2

2023-02-07 12:29:13

2023-02-07 13:00:02

1849

54083

q3

2023-02-07 12:47:12

2023-02-07 13:18:01

1849

54083

q4

2023-02-07 13:30:08

2023-02-07 14:04:46

2078

48123

q5

2023-02-07 14:15:07

2023-02-07 14:45:45

1838

54406

q7

2023-02-07 14:24:13

2023-02-07 17:08:42

9869

10132

q8

2023-02-07 14:27:09

2023-02-07 14:57:47

1838

54406

q9

2023-02-07 14:35:53

2023-02-07 15:38:16

3743

26716

q11

2023-02-07 14:38:14

2023-02-07 15:08:53

1839

54377

q12

2023-02-07 14:40:36

2023-02-07 15:11:15

1839s

54377

q15

2023-02-07 14:42:43

2023-02-07 15:13:22

1839

54377

q16

2023-02-07 14:46:17

2023-02-07 16:03:09

4012

24925

q17

2023-02-07 16:02:19

2023-02-07 16:33:07

1848

54112

q18

2023-02-07 16:04:43

2023-02-07 16:41:08

2185

45766

q19

2023-02-07 16:07:13

2023-02-07 16:40:01

1968

50813

q20

2023-02-07 16:13:29

2023-02-07 16:59:13

2744

36443

q21

2023-02-07 16:16:32

2023-02-07 16:47:11

1849

54083

q22

2023-02-07 16:18:53

2023-02-07 16:49:41

1848

54112