
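Realtime Compute for Apache Flink: Performance white paper (Nexmark performance test)

Last updated: Sep 26, 2024

This topic describes how to use Nexmark to test the performance of Realtime Compute for Apache Flink.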

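Performance

With 1 CU of compute resources, the throughput of Realtime Compute for Apache Flink across the 19 Nexmark queries ranges from a minimum of 5,000 records per second (RPS) to a maximum of 55,000 RPS.

  • Simple workloads (for example, single-stream filtering or string transformation): 1 CU can process 40,000 to 55,000 records per second.

  • Complex workloads (for example, JOIN, GROUP BY, or window functions): 1 CU can process 5,000 to 10,000 records per second.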
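The per-CU ranges above can be turned into a rough capacity estimate. The following Python sketch is illustrative only: the function name `estimate_cus` is hypothetical, and it assumes that throughput scales linearly with the number of CUs, which this white paper does not state.

```python
import math

# Per-CU throughput ranges (records per second) quoted in this white paper.
SIMPLE_RPS = (40_000, 55_000)
COMPLEX_RPS = (5_000, 10_000)


def estimate_cus(target_rps, per_cu_range):
    """Return (best_case, worst_case) CU counts for a target throughput.

    Assumption: throughput scales linearly with CU count.
    """
    low, high = per_cu_range
    best_case = math.ceil(target_rps / high)   # every CU runs at the top of the range
    worst_case = math.ceil(target_rps / low)   # every CU runs at the bottom of the range
    return best_case, worst_case


if __name__ == "__main__":
    print(estimate_cus(200_000, SIMPLE_RPS))
    print(estimate_cus(200_000, COMPLEX_RPS))
```

Treat the result as a starting point for a test run, not as a sizing guarantee.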

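Prerequisites

  • Java JDK 1.8.x or later is installed.

  • Maven 3.8.2 is installed.

  • Git is installed. To download it, see Git.

  • Nexmark is installed. For details about Nexmark, see Nexmark.

  • A workspace is created. For details, see Activate Realtime Compute for Apache Flink.

    Note

    Nexmark and Git are hosted on third-party websites. Access to them may be slow or unavailable.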

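Test tools

[Figure: test diagram]

  • Nexmark source table: generates test data at the TPS required by the test.

  • Transformations: the 19 Nexmark queries.

  • Blackhole result table: excludes the performance impact of upstream and downstream storage so that the test focuses on the performance of Flink itself.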

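Data preparation

  1. Run the following commands in a command-line (CMD) console to download and compile the Nexmark source code.

    cd <path>
    git clone https://github.com/nexmark/nexmark.git
    cd nexmark/nexmark-flink
    mvn clean package
    Note

    <path> is a directory of your choice in which the cloned nexmark repository is stored.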

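  2. Create the Nexmark connector.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Click Console in the Actions column of the target workspace.

    3. On the right side of the Connectors page, click Create Custom Connector.

    4. In the dialog box that appears, click Select File and upload the compiled JAR package, as shown in the following figure. [Figure: connector upload]

      Note

      The package is located in the <path>/nexmark/nexmark-flink/target directory. nexmark-flink-0.2-SNAPSHOT.jar is the package produced by the build.

    5. Click Next, and then click Finish.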

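The following table describes the Nexmark connector parameters.

Parameter            Value        Description
first-event.rate     55000        Data generation rate.
next-event.rate      55000        Data generation rate.
events.num           100000000    Number of events to generate.
bid.proportion       92%          Proportion of Bid events.
auction.proportion   6%           Proportion of Auction events.
person.proportion    2%           Proportion of Person events.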
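To see what these parameter values imply, the following Python sketch works out the expected number of events per type and the time needed to generate them. It is a back-of-the-envelope calculation, not connector code: it assumes the three proportions act as relative weights that sum to 100 and that the generator sustains the configured rate for the whole run. The function names are hypothetical.

```python
EVENTS_NUM = 100_000_000   # events.num
EVENT_RATE = 55_000        # first-event.rate / next-event.rate, events per second
PROPORTIONS = {"bid": 92, "auction": 6, "person": 2}


def events_per_type(total, proportions):
    """Split the total event count according to the relative weights."""
    weight_sum = sum(proportions.values())
    return {name: total * weight // weight_sum for name, weight in proportions.items()}


def generation_seconds(total, rate):
    """Time to emit all events, assuming the rate is sustained (an assumption)."""
    return total / rate


if __name__ == "__main__":
    print(events_per_type(EVENTS_NUM, PROPORTIONS))
    print(round(generation_seconds(EVENTS_NUM, EVENT_RATE)), "seconds")
```

Under these assumptions a full run at 55,000 events per second lasts roughly half an hour. A job that cannot keep up with the source takes longer.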

Test procedure

  1. Create a job for each of the following queries. For details on how to create a job, see SQL job development.

q0

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

-- Pass every bid through unchanged (no filtering or transformation).
INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        bid;

q1

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price DECIMAL(23, 3),
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

-- Multiply each bid price by a fixed factor of 0.908 and pass the other columns through.
INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        0.908 * price as price, -- convert dollar to euro
        channel,
        url,
        dateTime,
        extra 
    FROM
        bid;

q2

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

-- Keep only the bids whose auction ID is a multiple of 123.
INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        bid 
    WHERE
        MOD(auction, 123) = 0;

q3

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (name VARCHAR, city VARCHAR, state VARCHAR, id BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW person AS
    SELECT 
        person.id,
        person.name,
        person.emailAddress,
        person.creditCard,
        person.city,
        person.state,
        dateTime,
        person.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 0;

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        nexmark_table 
    WHERE
        event_type = 1;

-- Join auctions to their sellers and keep category 10 auctions whose seller is in OR, ID, or CA.
INSERT INTO discard_sink
    SELECT 
        P.name, P.city, P.state, A.id 
    FROM
        auction AS A 
        INNER JOIN person AS P
            ON A.seller = P.id 
    WHERE
        A.category = 10 
            AND (P.state = 'OR' 
            OR P.state = 'ID' 
            OR P.state = 'CA');

q4

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (id BIGINT, final BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

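-- For each category, average the highest bid placed on each auction between its dateTime and expires.
INSERT INTO discard_sink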
    SELECT 
        Q.category, AVG(Q.final) 
    FROM
        (
            SELECT 
                MAX(B.price) AS final, A.category 
            FROM
                auction A, bid B 
            WHERE
                A.id = B.auction 
                    AND B.dateTime BETWEEN A.dateTime AND A.expires 
            GROUP BY
                A.id, A.category
        ) Q 
    GROUP BY
        Q.category;

q5

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (auction BIGINT, num BIGINT) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

-- Find the auctions with the most bids in each 10-second window that slides every 2 seconds.
INSERT INTO discard_sink
    SELECT 
        AuctionBids.auction, AuctionBids.num 
    FROM
        (
            SELECT 
                B1.auction,
                count(*) AS num,
                HOP_START(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
                HOP_END(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime 
            FROM
                bid B1 
            GROUP BY
                B1.auction,
                HOP(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
        ) AS AuctionBids 
        JOIN (
            SELECT 
                max(CountBids.num) AS maxn,
                CountBids.starttime,
                CountBids.endtime 
            FROM
                (
                    SELECT 
                        count(*) AS num,
                        HOP_START(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
                        HOP_END(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime 
                    FROM
                        bid B2 
                    GROUP BY
                        B2.auction,
                        HOP(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
                ) AS CountBids 
            GROUP BY
                CountBids.starttime, CountBids.endtime
        ) AS MaxBids
            ON AuctionBids.starttime = MaxBids.starttime 
                AND AuctionBids.endtime = MaxBids.endtime 
                AND AuctionBids.num >= MaxBids.maxn;
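The HOP windows in q5 use a 2-second slide and a 10-second size, so each bid is counted in 10 / 2 = 5 overlapping windows. The following Python sketch illustrates that assignment. It is not Flink code: the function name `hop_windows` is hypothetical, and it assumes windows aligned to the epoch with no offset and half-open intervals [start, end).

```python
def hop_windows(ts_ms, slide_ms=2_000, size_ms=10_000):
    """Return the (start, end) pairs of every sliding window that contains ts_ms.

    Assumptions: windows are aligned to the epoch and are half-open [start, end).
    """
    start = ts_ms - ts_ms % slide_ms   # latest window start that is <= ts_ms
    windows = []
    while start > ts_ms - size_ms:     # window still covers ts_ms
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


if __name__ == "__main__":
    for window in hop_windows(13_500):
        print(window)
```

Because every bid lands in five windows, q5 does about five times the aggregation work per event of a tumbling window of the same size, which is in line with the lower per-CU throughput quoted for window workloads.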

q7

CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

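-- Return the bids whose price equals the highest price in their 10-second tumbling window.
-- The column order matches discard_sink (auction, bidder, price, dateTime, extra).
INSERT INTO discard_sink
    SELECT 
        B.auction, B.bidder, B.price, B.dateTime, B.extra 
    FROM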
        bid B 
        JOIN (
            SELECT 
                MAX(B1.price) AS maxprice,
                TUMBLE_ROWTIME(B1.dateTime, INTERVAL '10' SECOND) as dateTime 
            FROM
                bid B1 
            GROUP BY
                TUMBLE(B1.dateTime, INTERVAL '10' SECOND)
        ) B1
            ON B.price = B1.maxprice 
    WHERE
        B.dateTime BETWEEN B1.dateTime - INTERVAL '10' SECOND AND B1.dateTime;
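The logic of q7 can be approximated outside Flink as follows. This Python sketch groups bids into 10-second tumbling windows, finds the maximum price per window, and returns the bids that match it. It is a simplification: it ignores watermarks and late data, and it assigns each bid to exactly one window rather than reproducing the BETWEEN bounds of the join condition. The function name and the tuple layout are hypothetical.

```python
from collections import defaultdict


def highest_bids(bids, size_ms=10_000):
    """bids: iterable of (auction, bidder, price, ts_ms) tuples.

    Returns the bids whose price equals the maximum price of their
    tumbling window, ordered by window start.
    """
    by_window = defaultdict(list)
    for bid in bids:
        window_start = bid[3] - bid[3] % size_ms   # epoch-aligned window (assumption)
        by_window[window_start].append(bid)

    result = []
    for window_start in sorted(by_window):
        window_bids = by_window[window_start]
        max_price = max(b[2] for b in window_bids)
        result.extend(b for b in window_bids if b[2] == max_price)
    return result


if __name__ == "__main__":
    sample = [(1, 10, 100, 1_000), (2, 11, 300, 5_000), (3, 12, 200, 12_000)]
    print(highest_bids(sample))
```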

q8

CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (id BIGINT, name VARCHAR, stime TIMESTAMP(3)) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW person AS
    SELECT 
        person.id,
        person.name,
        person.emailAddress,
        person.creditCard,
        person.city,
        person.state,
        dateTime,
        person.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 0;

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

-- Find persons who also appear as an auction seller within the same 10-second tumbling window.
INSERT INTO discard_sink
    SELECT 
        P.id, P.name, P.starttime 
    FROM
        (
            SELECT 
                P.id,
                P.name,
                TUMBLE_START(P.dateTime, INTERVAL '10' SECOND) AS starttime,
                TUMBLE_END(P.dateTime, INTERVAL '10' SECOND) AS endtime 
            FROM
                person P 
            GROUP BY
                P.id,
                P.name,
                TUMBLE(P.dateTime, INTERVAL '10' SECOND)
        ) P 
        JOIN (
            SELECT 
                A.seller,
                TUMBLE_START(A.dateTime, INTERVAL '10' SECOND) AS starttime,
                TUMBLE_END(A.dateTime, INTERVAL '10' SECOND) AS endtime 
            FROM
                auction A 
            GROUP BY
                A.seller,
                TUMBLE(A.dateTime, INTERVAL '10' SECOND)
        ) A
            ON P.id = A.seller 
                AND P.starttime = A.starttime 
                AND P.endtime = A.endtime;

q9

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    id BIGINT,
    itemName VARCHAR,
    description VARCHAR,
    initialBid BIGINT,
    reserve BIGINT,
    dateTime TIMESTAMP(3),
    expires TIMESTAMP(3),
    seller BIGINT,
    category BIGINT,
    extra VARCHAR,
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    bid_dateTime TIMESTAMP(3),
    bid_extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

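-- For each auction, keep the winning bid: highest price, earliest on ties, placed between the auction's dateTime and expires.
INSERT INTO discard_sink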
    SELECT 
        id,
        itemName,
        description,
        initialBid,
        reserve,
        dateTime,
        expires,
        seller,
        category,
        extra,
        auction,
        bidder,
        price,
        bid_dateTime,
        bid_extra 
    FROM
        (
            SELECT 
                A.*,
                B.auction,
                B.bidder,
                B.price,
                B.dateTime AS bid_dateTime,
                B.extra AS bid_extra,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            A.id
                        ORDER BY
                            B.price DESC, B.dateTime ASC
                    ) AS rownum 
            FROM
                auction A, bid B 
            WHERE
                A.id = B.auction 
                    AND B.dateTime BETWEEN A.dateTime AND A.expires
        ) 
    WHERE
        rownum <= 1;
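The ROW_NUMBER() pattern in q9 keeps one row per auction: the bid with the highest price, and the earliest one when prices tie. The following Python sketch expresses the same selection rule over in-memory data. It is an illustration only. The function name and data layout are hypothetical, and it does not model how Flink maintains this result incrementally in state.

```python
def winning_bids(auctions, bids):
    """auctions: {auction_id: (start_ms, expires_ms)}
    bids: list of (auction_id, bidder, price, ts_ms)

    Returns {auction_id: winning_bid}.
    """
    best = {}
    for bid in bids:
        auction_id, _bidder, price, ts = bid
        if auction_id not in auctions:
            continue                      # no matching auction (inner join)
        start, expires = auctions[auction_id]
        if not (start <= ts <= expires):
            continue                      # BETWEEN is inclusive on both ends
        current = best.get(auction_id)
        # ORDER BY price DESC, dateTime ASC: compare (-price, ts) ascending.
        if current is None or (-price, ts) < (-current[2], current[3]):
            best[auction_id] = bid
    return best


if __name__ == "__main__":
    auctions = {1: (0, 10_000)}
    bids = [(1, 7, 50, 1_000), (1, 8, 80, 2_000), (1, 9, 80, 1_500), (1, 5, 999, 20_000)]
    print(winning_bids(auctions, bids))
```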

q11

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    bidder BIGINT,
    bid_count BIGINT,
    starttime TIMESTAMP(3),
    endtime TIMESTAMP(3)
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

-- Count bids per bidder in session windows that close after 10 seconds of inactivity.
INSERT INTO discard_sink
    SELECT 
        B.bidder,
        count(*) as bid_count,
        SESSION_START(B.dateTime, INTERVAL '10' SECOND) as starttime,
        SESSION_END(B.dateTime, INTERVAL '10' SECOND) as endtime 
    FROM
        bid B 
    GROUP BY
        B.bidder, SESSION(B.dateTime, INTERVAL '10' SECOND);
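A session window has no fixed length: it grows while events keep arriving and closes once the gap (10 seconds in q11) passes without a new event. The following Python sketch shows that grouping for the timestamps of a single bidder. It is an illustration, not Flink code. The function name is hypothetical, and the treatment of an event that arrives exactly at the session end (merged here) is an assumption of this sketch.

```python
def session_counts(timestamps_ms, gap_ms=10_000):
    """Return a list of (start, end, count) sessions.

    A session ends gap_ms after its last event. An event at or before
    the current session end extends that session (boundary handling is
    an assumption of this sketch).
    """
    sessions = []
    for ts in sorted(timestamps_ms):
        if sessions and ts <= sessions[-1][1]:
            start, _end, count = sessions[-1]
            sessions[-1] = (start, ts + gap_ms, count + 1)
        else:
            sessions.append((ts, ts + gap_ms, 1))
    return sessions


if __name__ == "__main__":
    print(session_counts([0, 3_000, 9_000, 25_000, 26_000]))
```

In q11 this grouping is applied separately for each bidder, because the query groups by B.bidder as well as by the session window.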

q12

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    bidder BIGINT,
    bid_count BIGINT,
    starttime TIMESTAMP(3),
    endtime TIMESTAMP(3)
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

-- Count bids per bidder in 10-second tumbling windows based on processing time.
INSERT INTO discard_sink
    SELECT 
        B.bidder,
        count(*) as bid_count,
        TUMBLE_START(B.p_time, INTERVAL '10' SECOND) as starttime,
        TUMBLE_END(B.p_time, INTERVAL '10' SECOND) as endtime 
    FROM
        (
            SELECT 
                *, PROCTIME() as p_time 
            FROM
                bid
        ) B 
    GROUP BY
        B.bidder, TUMBLE(B.p_time, INTERVAL '10' SECOND);

q15

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    `day` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    total_bidders BIGINT,
    rank1_bidders BIGINT,
    rank2_bidders BIGINT,
    rank3_bidders BIGINT,
    total_auctions BIGINT,
    rank1_auctions BIGINT,
    rank2_auctions BIGINT,
    rank3_auctions BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

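-- Per day: total bids, distinct bidders, and distinct auctions, each also split into three price ranks.
INSERT INTO discard_sink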
    SELECT 
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) 
            filter(where price < 10000) AS rank1_bidders,
        count(distinct bidder) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bidders,
        count(distinct bidder) 
            filter(where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) 
            filter(where price < 10000) AS rank1_auctions,
        count(distinct auction) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_auctions,
        count(distinct auction) 
            filter(where price >= 1000000) AS rank3_auctions 
    FROM
        bid 
    GROUP BY
        DATE_FORMAT(dateTime, 'yyyy-MM-dd');
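The FILTER clauses in q15 split every aggregate into three price ranks: below 10,000, from 10,000 up to but not including 1,000,000, and 1,000,000 or above. The following Python sketch reproduces the bid and bidder columns for one day of data. It is illustrative: the function name and the (bidder, price) tuple layout are hypothetical, and the auction columns are omitted for brevity.

```python
def bid_stats(bids):
    """bids: list of (bidder, price) tuples for a single day."""

    def rank(price):
        if price < 10_000:
            return "rank1"
        if price < 1_000_000:
            return "rank2"
        return "rank3"

    stats = {
        "total_bids": len(bids),
        "total_bidders": len({bidder for bidder, _ in bids}),
    }
    for name in ("rank1", "rank2", "rank3"):
        in_rank = [(bidder, price) for bidder, price in bids if rank(price) == name]
        stats[f"{name}_bids"] = len(in_rank)                            # count(*) filter(...)
        stats[f"{name}_bidders"] = len({bidder for bidder, _ in in_rank})  # count(distinct ...) filter(...)
    return stats


if __name__ == "__main__":
    print(bid_stats([(1, 500), (1, 20_000), (2, 9_999), (3, 1_000_000)]))
```

Note that a bidder can appear in more than one rank, so the per-rank distinct counts can add up to more than total_bidders.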

q16

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    channel VARCHAR,
    `day` VARCHAR,
    `minute` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    total_bidders BIGINT,
    rank1_bidders BIGINT,
    rank2_bidders BIGINT,
    rank3_bidders BIGINT,
    total_auctions BIGINT,
    rank1_auctions BIGINT,
    rank2_auctions BIGINT,
    rank3_auctions BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

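-- Per channel and day: bid, bidder, and auction statistics by price rank, plus the latest minute seen.
INSERT INTO discard_sink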
    SELECT 
        channel,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        max(DATE_FORMAT(dateTime, 'HH:mm')) as `minute`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) 
            filter(where price < 10000) AS rank1_bidders,
        count(distinct bidder) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bidders,
        count(distinct bidder) 
            filter(where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) 
            filter(where price < 10000) AS rank1_auctions,
        count(distinct auction) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_auctions,
        count(distinct auction) 
            filter(where price >= 1000000) AS rank3_auctions 
    FROM
        bid 
    GROUP BY
        channel, DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q17

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    `day` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    min_price BIGINT,
    max_price BIGINT,
    avg_price BIGINT,
    sum_price BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        min(price) AS min_price,
        max(price) AS max_price,
        avg(price) AS avg_price,
        sum(price) AS sum_price 
    FROM
        bid 
    GROUP BY
        auction, DATE_FORMAT(dateTime, 'yyyy-MM-dd');

q18

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        (
            SELECT 
                *,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            bidder, auction
                        ORDER BY
                            dateTime DESC
                    ) AS rank_number 
            FROM
                bid
        ) 
    WHERE
        rank_number <= 1;

q19

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR,
    rank_number BIGINT
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        * 
    FROM
        (
            SELECT 
                *,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            auction
                        ORDER BY
                            price DESC
                    ) AS rank_number 
            FROM
                bid
        ) 
    WHERE
        rank_number <= 10;

q20

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    bid_dateTime TIMESTAMP(3),
    bid_extra VARCHAR,
    itemName VARCHAR,
    description VARCHAR,
    initialBid BIGINT,
    reserve BIGINT,
    auction_dateTime TIMESTAMP(3),
    expires TIMESTAMP(3),
    seller BIGINT,
    category BIGINT,
    auction_extra VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        url,
        B.dateTime,
        B.extra,
        itemName,
        description,
        initialBid,
        reserve,
        A.dateTime,
        expires,
        seller,
        category,
        A.extra 
    FROM
        bid AS B 
        INNER JOIN auction AS A
            ON B.auction = A.id 
    WHERE
        A.category = 10;

q21

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    channel_id VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        CASE 
            WHEN lower(channel) = 'apple' 
            THEN '0' 
            WHEN lower(channel) = 'google' 
            THEN '1' 
            WHEN lower(channel) = 'facebook' 
            THEN '2' 
            WHEN lower(channel) = 'baidu' 
            THEN '3' 
            ELSE REGEXP_EXTRACT(url, '(&|^)channel_id=([^&]*)', 2) 
        END AS channel_id 
    FROM
        bid 
    WHERE
        REGEXP_EXTRACT(url, '(&|^)channel_id=([^&]*)', 2) is not null 
            or lower(channel) in ('apple', 'google', 'facebook', 'baidu');

q22

CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);

CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    dir1 VARCHAR,
    dir2 VARCHAR,
    dir3 VARCHAR
) 
WITH ('connector' = 'blackhole');

CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;

INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        SPLIT_INDEX(url, '/', 3) as dir1,
        SPLIT_INDEX(url, '/', 4) as dir2,
        SPLIT_INDEX(url, '/', 5) as dir3 
    FROM
        bid;
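
  2. Click the More Configurations tab on the right side and set Engine Version to vvr-6.0.4-flink-1.15.

  3. In the upper-right corner of the SQL editor, click Deploy.

  4. On the O&M > Deployments page, click each target deployment. On the Deployment Details tab, in the Resources section, click Edit in the upper-right corner. Set Parallelism to 1, TaskManager Memory to 4 GiB, and TaskManager CPU Cores to 1 Core. The following figure shows the configuration.

  5. After the configuration is complete, click Save in the upper-right corner of the Resources section.

  6. Click Start in the Actions column of each target deployment.

  7. When a deployment reaches the Finished state, click the deployment name. On the Events tab, find the following messages to obtain the start time and end time of the deployment.

    • Start time: Job was successfully started.

    • End time: Job has successfully finished.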

Expected results

Note

Duration is the end time minus the start time, in seconds. RPS is events.num divided by Duration.

| Query | Start time | End time | Duration (seconds) | RPS |
| --- | --- | --- | --- | --- |
| q0 | 2023-02-07 11:55:13 | 2023-02-07 12:26:00 | 1847 | 54141 |
| q1 | 2023-02-07 12:28:06 | 2023-02-07 12:58:53 | 1847 | 54141 |
| q2 | 2023-02-07 12:29:13 | 2023-02-07 13:00:02 | 1849 | 54083 |
| q3 | 2023-02-07 12:47:12 | 2023-02-07 13:18:01 | 1849 | 54083 |
| q4 | 2023-02-07 13:30:08 | 2023-02-07 14:04:46 | 2078 | 48123 |
| q5 | 2023-02-07 14:15:07 | 2023-02-07 14:45:45 | 1838 | 54406 |
| q7 | 2023-02-07 14:24:13 | 2023-02-07 17:08:42 | 9869 | 10132 |
| q8 | 2023-02-07 14:27:09 | 2023-02-07 14:57:47 | 1838 | 54406 |
| q9 | 2023-02-07 14:35:53 | 2023-02-07 15:38:16 | 3743 | 26716 |
| q11 | 2023-02-07 14:38:14 | 2023-02-07 15:08:53 | 1839 | 54377 |
| q12 | 2023-02-07 14:40:36 | 2023-02-07 15:11:15 | 1839 | 54377 |
| q15 | 2023-02-07 14:42:43 | 2023-02-07 15:13:22 | 1839 | 54377 |
| q16 | 2023-02-07 14:46:17 | 2023-02-07 16:03:09 | 4012 | 24925 |
| q17 | 2023-02-07 16:02:19 | 2023-02-07 16:33:07 | 1848 | 54112 |
| q18 | 2023-02-07 16:04:43 | 2023-02-07 16:41:08 | 2185 | 45766 |
| q19 | 2023-02-07 16:07:13 | 2023-02-07 16:40:01 | 1968 | 50813 |
| q20 | 2023-02-07 16:13:29 | 2023-02-07 16:59:13 | 2744 | 36443 |
| q21 | 2023-02-07 16:16:32 | 2023-02-07 16:47:11 | 1849 | 54083 |
| q22 | 2023-02-07 16:18:53 | 2023-02-07 16:49:41 | 1848 | 54112 |
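
The Duration and RPS values can be derived from the start and end times recorded for each deployment. The following is a minimal Python sketch of that arithmetic, not part of Nexmark or Flink. It assumes timestamps in the format used in the results above, and it assumes that RPS is truncated to an integer, which matches the q0 result. The names `duration_seconds` and `rps` are illustrative.

```python
from datetime import datetime

# Value of 'events.num' configured on the Nexmark source table.
EVENTS_NUM = 100_000_000
# Assumed timestamp layout, matching the start and end times listed above.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def duration_seconds(start: str, end: str) -> int:
    """Return the whole seconds elapsed between the start and end timestamps."""
    delta = datetime.strptime(end, TIME_FORMAT) - datetime.strptime(start, TIME_FORMAT)
    return int(delta.total_seconds())


def rps(events_num: int, duration: int) -> int:
    """Return records per second, truncated to an integer (an assumption)."""
    return events_num // duration


if __name__ == "__main__":
    # Start and end times reported for q0.
    d = duration_seconds("2023-02-07 11:55:13", "2023-02-07 12:26:00")
    print(d, rps(EVENTS_NUM, d))  # 1847 54141
```

To evaluate another query, pass its start and end times from the Events tab to `duration_seconds` and reuse the configured `events.num` value.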