Realtime Compute for Apache Flink: SelectDB connector (public preview)

Last Updated: Dec 12, 2024

This topic describes how to use the custom SelectDB connector to write data to ApsaraDB for SelectDB.

Background

ApsaraDB for SelectDB is a next-generation, fully managed real-time data warehouse service hosted on Alibaba Cloud. It is 100% compatible with Apache Doris and is designed to analyze large volumes of data. For information about its benefits and use cases, see What is ApsaraDB for SelectDB.

The following table describes the capabilities of the custom SelectDB connector.

Item                              Description
Supported type                    Sink
Running mode                      Streaming and batch
Data format                       JSON and CSV
Metrics                           N/A
API                               DataStream API and SQL API
Data update/deletion in the sink  Supported
Features                          • Database synchronization
                                  • Exactly-once semantics, which prevent duplicate or lost data
                                  • Compatibility with Apache Doris 1.0 or later, so the custom SelectDB connector can also write data to Apache Doris
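The capabilities above include the JSON and CSV data formats and exactly-once semantics. The following Flink SQL sketch shows how these are typically configured, assuming that the custom SelectDB connector accepts the sink options of the Apache Doris Flink connector (sink.enable-2pc, sink.label-prefix, and sink.properties.* for Stream Load properties). The table name selectdb_json_sink, the target table selectdb.orders_json, the label prefix, the endpoint, and the checkpoint interval are hypothetical placeholders.

```sql
-- Exactly-once writes are committed when a Flink checkpoint completes,
-- so checkpointing must be enabled. The 10s interval is only an example.
SET 'execution.checkpointing.interval' = '10s';

CREATE TEMPORARY TABLE selectdb_json_sink (         -- hypothetical table name
  order_id BIGINT,
  user_id  STRING,
  state    INT
) WITH (
  'connector' = 'doris',
  'fenodes' = '<selectdb-endpoint>:8080',           -- placeholder endpoint
  'table.identifier' = 'selectdb.orders_json',      -- hypothetical target table
  'username' = 'admin',
  'password' = '****',
  -- Two-phase commit for exactly-once; the label prefix must be unique per job.
  'sink.enable-2pc' = 'true',
  'sink.label-prefix' = 'orders_json_job',          -- hypothetical prefix
  -- Stream Load properties: send rows as newline-delimited JSON instead of CSV.
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true'
);
```

With two-phase commit, written data becomes visible in SelectDB only after the corresponding checkpoint completes, so a longer checkpoint interval increases end-to-end latency.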

Usage notes and prerequisites

  • The custom SelectDB connector is supported only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.10 or later.

  • If you have questions about the custom SelectDB connector, submit a ticket and select ApsaraDB for SelectDB as the product.

  • Before you synchronize data to ApsaraDB for SelectDB, make sure that the following prerequisites are met:

    • An ApsaraDB for SelectDB instance is created. For more information, see Create an instance.

    • The CIDR block of the vSwitch in your Flink workspace is added to the IP address whitelist of your ApsaraDB for SelectDB instance. For more information, see Configure an IP address whitelist.

Procedure

  1. Download the JAR file of the custom SelectDB connector (version 1.15, 1.16, or 1.17).

  2. Upload the custom SelectDB connector in the Realtime Compute for Apache Flink console. For more information, see Manage custom connectors.

  3. Create an SQL draft and use the custom SelectDB connector. For more information, see Develop an SQL draft.

    Syntax:

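    CREATE TABLE selectdb_sink (
      emp_no       INT,
      birth_date   DATE,
      first_name   STRING,
      last_name    STRING,
      gender       STRING,
      hire_date    DATE
    ) WITH (
      -- The connector name is fixed to doris.
      'connector' = 'doris',
      -- Endpoint (host and HTTP port) of the SelectDB instance.
      'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
      -- Target table in <database>.<table> format.
      'table.identifier' = 'test.employees',
      'username' = 'admin',
      'password' = '****',
      -- Apply delete records to the target table (Unique Key tables only).
      'sink.enable-delete' = 'true'
    );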

    The connector option is fixed to doris. For information about how to configure other sink options, see Configuration items of Doris sink.
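To check connectivity and permissions before connecting a real source, a single row can be written to the selectdb_sink table defined in the syntax example. This is a sketch that assumes the test.employees table already exists in ApsaraDB for SelectDB with matching columns; the row values are made-up sample data.

```sql
-- Smoke test: write one made-up row through the SelectDB sink.
-- Assumes test.employees already exists in ApsaraDB for SelectDB.
INSERT INTO selectdb_sink VALUES
  (10001, DATE '1990-01-15', 'Alice', 'Smith', 'F', DATE '2015-03-01');
```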

Data type mappings

See the "Doris & Flink column type mapping" section of the Flink Doris connector topic in the Doris documentation.
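The syntax example in the Procedure section writes to test.employees. A target table whose column types follow the Doris and Flink type mapping (Flink INT to Doris INT, DATE to DATE, STRING to VARCHAR or STRING) could look like the following sketch, which runs in ApsaraDB for SelectDB rather than in Flink. The VARCHAR lengths and the bucket count are assumptions. The Unique Key model is used because the Doris documentation states that 'sink.enable-delete' supports only Unique Key tables.

```sql
-- Run in ApsaraDB for SelectDB (for example, through DMS), not in Flink.
CREATE DATABASE IF NOT EXISTS test;

CREATE TABLE IF NOT EXISTS test.employees (
  emp_no      INT,           -- Flink INT    -> Doris INT
  birth_date  DATE,          -- Flink DATE   -> Doris DATE
  first_name  VARCHAR(64),   -- Flink STRING -> Doris VARCHAR (length is an assumption)
  last_name   VARCHAR(64),
  gender      VARCHAR(8),
  hire_date   DATE
)
-- Unique Key model so that delete records from Flink can remove rows.
UNIQUE KEY(emp_no)
DISTRIBUTED BY HASH(emp_no) BUCKETS 10;
```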

Example

This section illustrates how to synchronize data from ApsaraDB RDS for MySQL to ApsaraDB for SelectDB by using the custom SelectDB connector.

  1. Prepare for data synchronization.

    1. Create a Realtime Compute for Apache Flink workspace, an ApsaraDB RDS for MySQL instance, and an ApsaraDB for SelectDB instance. For more information, see Activate Realtime Compute for Apache Flink, Step 1: Create an ApsaraDB RDS for MySQL instance and configure databases, and Create an instance.

    2. In the ApsaraDB RDS for MySQL console, create a database named order_dw_mysql and a table named orders, and import test data into the table.

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee decimal(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    3. Connect to the ApsaraDB for SelectDB instance by using DMS. Then, create a database named selectdb and a table named selecttable.

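      CREATE DATABASE selectdb;
      
      -- Create the table in the selectdb database referenced by the Flink sink.
      USE selectdb;
      
      CREATE TABLE `selecttable` (
        order_id bigint,
        user_id varchar(50),
        shop_id bigint,
        product_id bigint,
        buy_fee DECIMAL(20,2),   -- Same precision and scale as the MySQL source column.
        create_time DATETIME,
        update_time DATETIME,
        state int
      )
      -- Unique Key model so that updates and deletes replace rows by order_id.
      UNIQUE KEY(order_id)
      DISTRIBUTED BY HASH(order_id) BUCKETS 10;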
    4. Add the CIDR block of the vSwitch in your Flink workspace to the IP address whitelist of your ApsaraDB for SelectDB instance. For more information, see Console operations and Configure an IP address whitelist.

  2. In the Realtime Compute for Apache Flink console, create an SQL deployment and start the deployment.

    1. Create a MySQL catalog named mysqlcatalog. For more information, see Manage MySQL catalogs.

    2. Download the JAR file of the custom SelectDB connector (version 1.15, 1.16, or 1.17) and upload it. For more information, see Manage custom connectors.

    3. Go to Development > ETL, click New to create a blank stream draft, and copy the following code to the draft:

      CREATE TEMPORARY TABLE selectdb_sink (
        order_id BIGINT,
        user_id STRING,
        shop_id BIGINT,
        product_id BIGINT,
        -- Keep the source precision; a bare DECIMAL in Flink means DECIMAL(10, 0).
        buy_fee DECIMAL(20, 2),
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        state INT
      ) WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'selectdb.selecttable',
        'username' = 'admin',
        -- The password is read from the variable named selectdb instead of plain text.
        'password' = '${secret_values.selectdb}',
        'sink.enable-delete' = 'true'
      );
      
      INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
    4. Deploy the draft and start the deployment in the initial mode. For more information, see Create a deployment and Start a deployment.

  3. Connect to the ApsaraDB for SelectDB instance by using DMS and query the data in the selecttable table.

    SELECT * FROM `selectdb`.`selecttable`;
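To confirm that the initial snapshot was synchronized, the row counts in SelectDB can be compared with the seven test rows inserted into MySQL. This sketch assumes that the orders table was not modified after the test data was imported and that the deployment has committed at least one checkpoint.

```sql
-- Total number of synchronized rows; 7 test rows were inserted into MySQL.
SELECT COUNT(*) AS row_cnt FROM `selectdb`.`selecttable`;

-- Rows per user; for the unchanged test data this returns
-- user_001: 3, user_002: 2, user_003: 2.
SELECT user_id, COUNT(*) AS order_cnt
FROM `selectdb`.`selecttable`
GROUP BY user_id
ORDER BY user_id;
```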