Realtime Compute for Apache Flink: StarRocks connector

Last Updated: Dec 03, 2024

This topic describes how to use the StarRocks connector.

Background information

StarRocks is a new-generation massively parallel processing (MPP) data warehouse that is designed to deliver fast query performance across analytics scenarios and a unified analytics experience. StarRocks provides the following benefits:

  • Is compatible with the MySQL protocol. You can use a MySQL client or a common business intelligence (BI) tool to access StarRocks for data analytics.

  • Uses a distributed architecture that provides the following capabilities:

    • Horizontally splits tables and stores data in multiple replicas.

    • Scales clusters in a flexible manner to support analytics of 10 PB of data.

    • Supports the MPP architecture to accelerate data computing.

    • Supports multiple replicas to ensure fault tolerance.

The StarRocks connector buffers data and imports it into StarRocks in batches by using Stream Load when it writes to result tables, and reads data in batches when it serves source tables. The following table describes the capabilities supported by the StarRocks connector.

| Item | Description |
| --- | --- |
| Table type | Source table, result table, and data ingestion sink |
| Running mode | Streaming mode and batch mode |
| Data format | CSV |
| Metric | N/A |
| API type | DataStream API, SQL API, and YAML API for data ingestion |
| Data update or deletion in a result table | Supported |

Prerequisites

A StarRocks cluster is created. The cluster can be a StarRocks cluster of E-MapReduce (EMR) or a self-managed StarRocks cluster that is hosted on Elastic Compute Service (ECS) instances.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.5 or later supports the StarRocks connector.

  • The StarRocks connector supports only the at-least-once and exactly-once semantics.

SQL statements

Features

StarRocks of E-MapReduce (EMR) supports the CREATE TABLE AS and CREATE DATABASE AS statements. The CREATE TABLE AS statement can be used to synchronize the schema and data of a single table. The CREATE DATABASE AS statement can be used to synchronize data of an entire database or the schema and data of multiple tables in the same database. For more information, see Use the CREATE TABLE AS and CREATE DATABASE AS statements of Realtime Compute for Apache Flink to synchronize data from an ApsaraDB RDS for MySQL instance to a StarRocks cluster.

Syntax

CREATE TABLE USER_RESULT(
 name VARCHAR,
 score BIGINT
 ) WITH (
 'connector' = 'starrocks',
 'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
 'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
 'database-name' = 'xxx',
 'table-name' = 'xxx',
 'username' = 'xxx',
 'password' = 'xxx'
 );

Parameters in the WITH clause

Common parameters

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The table type. | String | Yes | No default value | Set the value to starrocks. |
| jdbc-url | The Java Database Connectivity (JDBC) URL that is used to connect to the database. | String | Yes | No default value | Specify the IP address and JDBC port of a frontend (FE) node in the jdbc:mysql://ip:port format. |
| database-name | The name of the StarRocks database. | String | Yes | No default value | N/A. |
| table-name | The name of the StarRocks table. | String | Yes | No default value | N/A. |
| username | The username that is used to connect to the StarRocks database. | String | Yes | No default value | N/A. |
| password | The password that is used to connect to the StarRocks database. | String | Yes | No default value | N/A. |
| starrocks.create.table.properties | The properties of the StarRocks table. | String | No | No default value | The initial properties of the StarRocks table, such as the engine and the number of replicas. Examples: 'starrocks.create.table.properties' = 'buckets 8' and 'starrocks.create.table.properties' = 'replication_num=1'. |

Parameters only for source tables

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| scan-url | The URL for data scan. | String | No | No default value | Specify the IP address and HTTP port of each FE node in the fe_ip:http_port;fe_ip:http_port format. Separate multiple pairs of IP addresses and port numbers with semicolons (;). |
| scan.connect.timeout-ms | The timeout period for the StarRocks connector of Realtime Compute for Apache Flink to connect to the StarRocks database. If the connection duration exceeds the value of this parameter, an error is returned. | String | No | 1000 | Unit: milliseconds. |
| scan.params.keep-alive-min | The keep-alive period of the query task. | String | No | 10 | N/A. |
| scan.params.query-timeout-s | The timeout period of the query task. If no query result is returned within the period specified by this parameter, the query task is stopped. | String | No | 600 | Unit: seconds. |
| scan.params.mem-limit-byte | The maximum memory for a single query in a backend (BE) node. | String | No | 1073741824 (1 GB) | Unit: bytes. |
| scan.max-retries | The maximum number of retries when a query fails. If the number of retries reaches the value of this parameter, an error is returned. | String | No | 1 | N/A. |

Parameters only for sink tables

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| load-url | The URL for data import. | String | Yes | No default value | Specify the IP address and HTTP port of each FE node in the fe_ip:http_port;fe_ip:http_port format. Separate multiple pairs of IP addresses and port numbers with semicolons (;). |
| sink.semantic | The semantics for data writing. | String | No | at-least-once | Valid values: at-least-once (default) and exactly-once. |
| sink.buffer-flush.max-bytes | The maximum amount of data that is allowed in the buffer. | String | No | 94371840 (90 MB) | Valid values: 64 MB to 10 GB. |
| sink.buffer-flush.max-rows | The maximum number of rows that are allowed in the buffer. | String | No | 500000 | Valid values: 64000 to 5000000. |
| sink.buffer-flush.interval-ms | The interval at which the buffer is flushed. | String | No | 300000 | Valid values: 1000 to 3600000. Unit: milliseconds. |
| sink.max-retries | The maximum number of retries for writing data to the table. | String | No | 3 | Valid values: 0 to 10. |
| sink.connect.timeout-ms | The timeout period for connecting to the StarRocks database. | String | No | 1000 | Valid values: 100 to 60000. Unit: milliseconds. |
| sink.properties.* | The properties of the sink table. | String | No | No default value | The import properties of Stream Load. For example, the sink.properties.format property specifies the format of data that is imported in Stream Load mode. The data format can be CSV. For more information about parameters, see Stream Load. |
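
The sink parameters described above can be combined in a single WITH clause. The following is a minimal sketch, not a recommended configuration: the table name orders_sink, its columns, and every tuning value are hypothetical and were chosen only to fall within the valid ranges listed above. The connection placeholders follow the Syntax section.

```sql
-- Hypothetical result table; the name, columns, and tuning values are illustrative assumptions.
CREATE TEMPORARY TABLE orders_sink (
  order_id BIGINT NOT NULL,
  customer STRING,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe1_ip:query_port',
  'load-url' = 'fe1_ip:http_port;fe2_ip:http_port',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.semantic' = 'at-least-once',            -- default; 'exactly-once' is the other valid value
  'sink.buffer-flush.max-bytes' = '134217728',  -- 128 MB (valid: 64 MB to 10 GB)
  'sink.buffer-flush.max-rows' = '1000000',     -- valid: 64000 to 5000000
  'sink.buffer-flush.interval-ms' = '10000',    -- 10 seconds (valid: 1000 to 3600000)
  'sink.max-retries' = '5',                     -- valid: 0 to 10
  'sink.connect.timeout-ms' = '5000',           -- valid: 100 to 60000
  'sink.properties.format' = 'csv'              -- passed to Stream Load as an import property
);
```

Parameters that are omitted keep the default values listed in the table. The values shown here are placeholders; suitable values depend on the workload.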

Data type mappings

| Data type of StarRocks | Data type of Realtime Compute for Apache Flink | Remarks |
| --- | --- | --- |
| NULL | NULL | N/A. |
| BOOLEAN | BOOLEAN | N/A. |
| TINYINT | TINYINT | N/A. |
| SMALLINT | SMALLINT | N/A. |
| INT | INT | N/A. |
| BIGINT | BIGINT | N/A. |
| BIGINT UNSIGNED | DECIMAL(20,0) | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this data type mapping. |
| LARGEINT | DECIMAL(20,0) | N/A. |
| FLOAT | FLOAT | N/A. |
| DOUBLE | DOUBLE | N/A. |
| DATE | DATE | N/A. |
| DATETIME | TIMESTAMP | N/A. |
| DECIMAL | DECIMAL | N/A. |
| DECIMALV2 | DECIMAL | N/A. |
| DECIMAL32 | DECIMAL | N/A. |
| DECIMAL64 | DECIMAL | N/A. |
| DECIMAL128 | DECIMAL | N/A. |
| CHAR(n × 3) | CHAR(n) (n ≤ 85) | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later automatically extends the length of a CHAR-type column to three times the original length before mapping. This accommodates the differences in encoding between MySQL and StarRocks. The length of a CHAR-type column in StarRocks cannot exceed 255 bytes. Therefore, only CHAR-type columns whose length does not exceed 85 characters in Realtime Compute for Apache Flink can be mapped to CHAR-type columns in StarRocks. |
| VARCHAR(n × 3) | CHAR(n) (n > 85) | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later automatically extends the length of the column to three times the original length before mapping. This accommodates the differences in encoding between MySQL and StarRocks. The length of a CHAR-type column in StarRocks cannot exceed 255 bytes. Therefore, CHAR-type columns whose length exceeds 85 characters in Realtime Compute for Apache Flink are mapped to VARCHAR-type columns in StarRocks. |
| VARCHAR | STRING | N/A. |
| VARBINARY | VARBINARY | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this data type mapping. |
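
As an illustration of the mappings above, the following sketch declares a Flink source table over a StarRocks table. The StarRocks table events, its columns, and the Flink table name events_source are hypothetical. The ports follow the sample code in this topic, and the remaining values are placeholders.

```sql
-- Assumed (hypothetical) StarRocks table `events` with the columns:
--   id BIGINT, big_counter LARGEINT, created_at DATETIME,
--   price DECIMAL(10, 2), title VARCHAR(255), is_active BOOLEAN
CREATE TEMPORARY TABLE events_source (
  id BIGINT,                   -- StarRocks BIGINT
  big_counter DECIMAL(20, 0),  -- StarRocks LARGEINT
  created_at TIMESTAMP,        -- StarRocks DATETIME
  price DECIMAL(10, 2),        -- StarRocks DECIMAL
  title STRING,                -- StarRocks VARCHAR
  is_active BOOLEAN            -- StarRocks BOOLEAN
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'events',
  'username' = 'xxxxx',
  'password' = 'xxxxxxx'
);
```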

Sample code

CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL,
  PRIMARY KEY(`runoob_id`) NOT ENFORCED
) WITH (
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'connector' = 'starrocks',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);

INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;

Data ingestion

You can use the StarRocks pipeline connector to easily write data records and table schema changes from upstream data sources to external StarRocks databases. Both open source StarRocks and fully-managed EMR Serverless StarRocks are supported.

Features

  • Automatic database and table creation

    If an upstream database and table do not exist in the downstream StarRocks instance, the database and table are automatically created. You can configure the table.create.properties.* parameters to specify the options for automatic table creation.

  • Synchronization of table schema changes

    The StarRocks connector automatically applies the CreateTableEvent, AddColumnEvent, and DropColumnEvent events to downstream databases.

Usage notes

  • For data ingestion, the StarRocks connector supports only at-least-once semantics and uses primary key tables to ensure the idempotence of write operations.

  • The table from which data is synchronized must contain a primary key. For tables that do not contain a primary key, you must specify a primary key for each table in the TRANSFORM statement block before data in the tables can be written to downstream databases. Sample code:

    transform:
      - source-table: ...
        primary-keys: id, ...

  • The bucket key of an automatically created table must be the same as the primary key, and the table cannot contain partition keys.

  • During synchronization of table schema changes, new columns can only be appended to the end of existing columns. By default, the Lenient mode is used for schema evolution. In this mode, columns that are inserted at other positions of a table are automatically moved to the end of existing columns.

  • If you use a StarRocks version earlier than 2.5.7, you must explicitly specify the number of buckets by using the table.create.num-buckets parameter. If you use StarRocks 2.5.7 or later, the number of buckets is automatically specified. For more information, see Data distribution.

  • If you use StarRocks 3.2 or later, we recommend that you set the table.create.properties.fast_schema_evolution parameter to true to accelerate table schema changes.

Syntax

source:
  ...

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass

Parameters

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| type | The connector name. | String | Yes | No default value | Set the value to starrocks. |
| name | The display name of the sink. | String | No | No default value | N/A. |
| jdbc-url | The JDBC URL that is used to connect to the database. | String | Yes | No default value | You can specify multiple URLs. Separate the URLs with commas (,). Example: jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2,fe_host3:fe_query_port3. |
| load-url | The HTTP URL that is used to connect to the FE node. | String | Yes | No default value | You can specify multiple URLs. Separate the URLs with semicolons (;). Example: fe_host1:fe_http_port1;fe_host2:fe_http_port2. |
| username | The username that is used to connect to the StarRocks database. | String | Yes | No default value | The SELECT and INSERT permissions on the destination table must be granted to the user. You can grant the required permissions by using the GRANT command of StarRocks. |
| password | The password that is used to connect to the StarRocks database. | String | Yes | No default value | N/A. |
| sink.label-prefix | The label prefix that is used for Stream Load. | String | No | No default value | N/A. |
| sink.connect.timeout-ms | The timeout period for HTTP connections. | Integer | No | 30000 | Unit: milliseconds. Valid values: 100 to 60000. |
| sink.wait-for-continue.timeout-ms | The timeout period for the client to wait for a 100 Continue response from the server. | Integer | No | 30000 | Unit: milliseconds. Valid values: 3000 to 600000. |
| sink.buffer-flush.max-bytes | The size of data that can be cached in the memory before data is written to the StarRocks database. | Long | No | 157286400 | Unit: bytes. Valid values: 64 MB to 10 GB. The cache space is shared among all tables. When the cache size reaches the specified value, the connector performs a flush operation on multiple tables. To improve the throughput, you can increase the value of this parameter. However, this may increase the data import latency. |
| sink.buffer-flush.interval-ms | The interval between two consecutive flush operations for each table. | Long | No | 300000 | Unit: milliseconds. |
| sink.scan-frequency.ms | The interval between two consecutive checks to detect whether a flush operation needs to be performed. | Long | No | 50 | Unit: milliseconds. |
| sink.io.thread-count | The number of threads that are used for data import in Stream Load mode. | Integer | No | 2 | N/A. |
| sink.at-least-once.use-transaction-stream-load | Specifies whether to use the Stream Load transaction interface for data import. | Boolean | No | true | This parameter takes effect only if the StarRocks database supports the Stream Load transaction interface. |
| sink.properties.* | The additional parameters that are provided for the sink. | String | No | No default value | For the supported parameters, see Stream Load. |
| table.create.num-buckets | The number of buckets of an automatically created table. | Integer | No | No default value | StarRocks 2.5.7 and later: This parameter is optional. The number of buckets can be automatically specified. For more information, see Data distribution. StarRocks 2.5.6 and earlier: This parameter is required. |
| table.create.properties.* | The additional parameters to be specified when a table is automatically created. | String | No | No default value | For example, you can set table.create.properties.fast_schema_evolution to true to enable the fast schema evolution feature. For more information, see the StarRocks documentation. |
| table.schema-change.timeout | The timeout duration for a schema change operation. | Duration | No | 30 min | The value of this parameter must be set to an integer. Unit: seconds. If the duration of a schema change operation exceeds the value specified by this parameter, the deployment fails. |

Data type mappings

Note

StarRocks does not support all Change Data Capture (CDC) YAML data types. If you write data of an unsupported type to a downstream database, the deployment fails. You can use the built-in function CAST in the transform component to convert unsupported data types or use the projection statement to delete data of unsupported types from the result table. For more information, see Development guide for data ingestion.

| Data type of CDC | Data type of StarRocks | Remarks |
| --- | --- | --- |
| TINYINT | TINYINT | N/A. |
| SMALLINT | SMALLINT | N/A. |
| INT | INT | N/A. |
| BIGINT | BIGINT | N/A. |
| FLOAT | FLOAT | N/A. |
| DOUBLE | DOUBLE | N/A. |
| DECIMAL(p, s) | DECIMAL(p, s) | N/A. |
| BOOLEAN | BOOLEAN | N/A. |
| DATE | DATE | N/A. |
| TIMESTAMP | DATETIME | N/A. |
| TIMESTAMP_LTZ | DATETIME | N/A. |
| CHAR(n) (n ≤ 85) | CHAR(n × 3) | The length of a CHAR-type column in CDC specifies the number of characters that can be stored, whereas the length of a CHAR-type column in StarRocks specifies the number of UTF-8 encoded bytes that can be stored. In most cases, a UTF-8 encoded Chinese character occupies at most three bytes. Therefore, the mapped CHAR-type column in StarRocks is three times as long as the CDC column. The length of a CHAR-type column in StarRocks cannot exceed 255 bytes, so only CDC CHAR-type columns whose length does not exceed 85 characters can be mapped to CHAR-type columns in StarRocks. |
| CHAR(n) (n > 85) | VARCHAR(n × 3) | The length of a CHAR-type column in CDC specifies the number of characters that can be stored, whereas the length of a column in StarRocks specifies the number of UTF-8 encoded bytes that can be stored. In most cases, a UTF-8 encoded Chinese character occupies at most three bytes. Therefore, the mapped VARCHAR-type column in StarRocks is three times as long as the CDC column. The length of a CHAR-type column in StarRocks cannot exceed 255 bytes, so CDC CHAR-type columns whose length is greater than 85 characters are mapped to VARCHAR-type columns in StarRocks. |
| VARCHAR(n) | VARCHAR(n × 3) | The length of a VARCHAR-type column in CDC specifies the number of characters that can be stored, whereas the length of a VARCHAR-type column in StarRocks specifies the number of UTF-8 encoded bytes that can be stored. In most cases, a UTF-8 encoded Chinese character occupies at most three bytes. Therefore, the mapped VARCHAR-type column in StarRocks is three times as long as the CDC column. |