
Create a Hudi result table

Last Updated: Jun 08, 2023

This topic provides the DDL syntax that is used to create a Hudi result table, describes the background information, limits, and parameters in the WITH clause, and provides examples.

Background information

  • Introduction to Hudi

    The following table describes the definition, features, and common usage scenarios of Hudi.

    Item

    Description

    Definition

    Apache Hudi is an open source framework that manages table data in data lakes. Hudi organizes file layouts based on Alibaba Cloud Object Storage Service (OSS) or Hadoop Distributed File System (HDFS) to ensure atomicity, consistency, isolation, and durability (ACID), and supports efficient row-level data updates and deletions. This simplifies extract, transform, load (ETL) development. Hudi can automatically manage small files and merge them into larger files of a specified size. This prevents an excessive number of small files from being created during data insertion and updates, and avoids both the impact of excessive small files on query performance and the O&M workloads that are required to monitor and rewrite small files. You can use Hudi with compute engines such as Apache Flink, Apache Presto, and Apache Spark to ingest data into data lakes and to compute and analyze the data. In most cases, Hudi is used to meet business requirements such as accelerated ingestion of data from databases into data lakes, real-time consumption of incremental data, and data backfilling. For more information, see Apache Hudi.

    Features

    • Supports ACID semantics and the serializable isolation level. The serializable isolation level is the strictest level of SQL transaction isolation.

    • Supports UPSERT semantics, which combines INSERT and UPDATE semantics. Fully managed Flink uses UPSERT semantics to write data to a destination table based on the following rules: if a data record that is read from the source table does not exist in the destination table, fully managed Flink inserts the record; if the record already exists, fully managed Flink updates it. This allows a single INSERT INTO statement to handle both inserts and updates, which significantly simplifies development and improves data processing efficiency.

    • Provides historical details of the data versions at a point in time based on the time travel feature. This helps you perform data O&M in an efficient manner and improves data quality.

    • Supports the schema evolution feature. This feature allows you to perform schema-related operations, such as dynamically adding columns and changing data types.

    Scenarios

    • Acceleration of ingesting data from databases into data lakes

      Compared with the traditional method that is used to load and merge a large amount of data, Hudi allows you to update and write streaming data to a super large dataset in real time in a more cost-effective manner. During the real-time ETL process, you can directly write change data capture (CDC) data to a data lake for downstream business. In most cases, you can use the MySQL CDC connector of fully managed Flink to write binary log data of the relational database management system (RDBMS) MySQL to a Hudi table.

    • Incremental ETL

      You can use the incremental extraction method of ETL to obtain the change data streams from Hudi in real time. This method provides better real-time performance and is more lightweight than offline ETL scheduling. In most cases, the online business data is incrementally extracted to an offline storage system. The Flink engine writes the extracted data to a Hudi table, and then the Apache Presto or Apache Spark engine is used to perform efficient online analytical processing (OLAP).

    • Message Queue service

      In scenarios where you need to process only a small amount of data, Hudi can also be used as a Message Queue service to replace Kafka. This simplifies the application development architecture.

    • Data backfilling

      If you want to update historical full data in specific rows and columns of a table, you can use data lakes. This greatly reduces the consumption of computing resources and improves end-to-end performance. In most cases, full data and incremental data are read from Hudi tables in a Hive metastore and the two tables are joined to generate a wide table.

  • Advantages of the Hudi connector integrated into fully managed Flink

    Compared with open source Hudi, the Hudi connector that is integrated into fully managed Flink provides additional advantages. The following table describes these advantages.

    Advantage

    Description

    Maintenance-free based on the integration between the platform and fully managed Flink

    Fully managed Flink provides the built-in Hudi connector to simplify O&M and provide a service level agreement (SLA) guarantee.

    Improved data connectivity

    The Hudi connector is connected to multiple Alibaba Cloud big data computing and analytics engines. This way, data is decoupled from computing engines and can be seamlessly migrated among Apache Flink, Apache Spark, Apache Presto, and Apache Hive.

    Optimized data ingestion from databases to data lakes

    The Hudi connector works with the Flink CDC connector to simplify data development.

    Enterprise-class features

    Enterprise-class features are supported, such as unified metadata views of Data Lake Formation (DLF) and automatic and lightweight table schema changes.

    Low-cost storage and high scalability by using Alibaba Cloud OSS

    Data is stored in the Apache Parquet or Apache Avro format in Alibaba Cloud OSS. Storage and computing are isolated and resources can be scaled in a flexible manner.

  • CDC data synchronization

    CDC data includes complete database changes. You can use one of the following methods to import data to Hudi:

    • Consume Kafka data that is in a specific CDC format and import the data to Hudi.

      Three CDC formats are supported: debezium-json, canal-json, and maxwell-json. This method provides high scalability and requires Kafka and Debezium data synchronization tools.

    • Access binary log data of a database by using the Flink CDC connector and import the data to Hudi.

      This method uses lightweight components to reduce the dependency on tools.

    Note
    • If the upstream data order cannot be ensured, you must specify the write.precombine.field field.

    • In CDC scenarios, you must set changelog.enabled to true to enable the changelog mode.
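
    For reference, the following sketch shows how these two settings might appear in a Hudi result table that receives CDC data. The table name, schema, and placeholders are illustrative only and follow the DDL syntax shown later in this topic.

    CREATE TEMPORARY TABLE hudi_cdc_sink (
      id   BIGINT,
      name STRING,
      ts   TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      -- Use ts as the version field because the upstream order is not guaranteed.
      'write.precombine.field' = 'ts',
      -- Enable the changelog mode in CDC scenarios.
      'changelog.enabled' = 'true'
    );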

Limits

  • Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.11-flink-1.13 or later supports the Hudi connector.

  • Only HDFS or Alibaba Cloud OSS can be used as a file system.

  • You cannot publish drafts in a session cluster.

DDL syntax

CREATE TEMPORARY TABLE hudi_sink (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'table.type' = 'COPY_ON_WRITE',
  'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'accessKeyId' = '<yourAccessKeyId>',
  'accessKeySecret' = '<yourAccessKeySecret>',
  'hive_sync.enable' = 'true',
  'hive_sync.db' = '<db name>',
  'hive_sync.table' = '<table name>',
  'hive_sync.mode' = 'hms',
  'dlf.catalog.region' = 'cn-hangzhou',
  'dlf.catalog.endpoint' = 'dlf-vpc.cn-hangzhou.aliyuncs.com'
);

Parameters in the WITH clause

  • Basic parameters

    Parameter

    Description

    Required

    Remarks

    connector

    The type of the result table.

    Yes

    Set the value to hudi.

    table.type

    The type of the table.

    Yes

    Valid values:

    • COPY_ON_WRITE: Parquet columnar storage is used. A base file is created each time data is updated.

    • MERGE_ON_READ: Parquet columnar storage and Avro row-based storage are used. Update operations are recorded in delta log files, and the delta log files are asynchronously merged with the Parquet columnar files to generate a new version of the files.

    path

    The path in which the table is stored.

    Yes

    The table can be stored in an OSS bucket or HDFS. For example, the path can be in the oss://<bucket name>/table or hdfs://<ip>:<port>/table format.

    oss.endpoint

    The endpoint of OSS.

    No

    If you store the table in OSS, you must configure this parameter. For more information about OSS endpoints, see Regions and endpoints.

    accessKeyId

    The AccessKey ID of your Alibaba Cloud account.

    No

    If you store the table in OSS, you must configure this parameter. For more information about how to obtain an AccessKey ID, see Obtain an AccessKey pair.

    accessKeySecret

    The AccessKey secret of your Alibaba Cloud account.

    No

    If you store the table in OSS, you must configure this parameter. For more information about how to obtain an AccessKey secret, see Obtain an AccessKey pair.

    hive_sync.enable

    Specifies whether to synchronize metadata to Hive.

    No

    Valid values:

    • true: Metadata is synchronized to Hive.

    • false: Metadata is not synchronized to Hive.

    hive_sync.mode

    The Hive data synchronization mode.

    No

    Valid values:

    • hms: If you use a Hive metastore or DLF catalog, set this parameter to hms. This is the recommended value.

    • jdbc: If you use a Java Database Connectivity (JDBC) catalog, set this parameter to jdbc. This is the default value.

    hive_sync.db

    The name of the Hive database to which data is synchronized.

    No

    N/A.

    hive_sync.table

    The name of the Hive table to which data is synchronized.

    No

    N/A.

    dlf.catalog.region

    The name of the region in which the DLF service is activated.

    No

    Note
    • The configuration of the dlf.catalog.region parameter takes effect only when you set hive_sync.mode to hms.

    • Make sure that the value of this parameter matches the endpoint that is specified by the dlf.catalog.endpoint parameter.

    dlf.catalog.endpoint

    The endpoint of the DLF service.

    No

    Note
    • The configuration of the dlf.catalog.endpoint parameter takes effect only when you set hive_sync.mode to hms.

    • We recommend that you set the dlf.catalog.endpoint parameter to a VPC endpoint of DLF. For example, if you select the China (Hangzhou) region, set the dlf.catalog.endpoint parameter to dlf-vpc.cn-hangzhou.aliyuncs.com.

    • If you want to access DLF across VPCs, follow the instructions that are described in How does fully managed Flink access a service across VPCs?

    write.operation

    The mode that is used for write operations.

    No

    Valid values:

    • insert: Data is written to the table in append mode.

    • upsert: Data is updated to the table. This is the default value.

    • bulk_insert: Multiple data records are written to the table at a time.

    write.precombine.field

    A version field. The system determines whether to update a message based on the value of this field.

    No

    The default value is the ts field. If the ts field is not used, the processing order is used.

    index.type

    The type of the index.

    No

    Valid values:

    • FLINK_STATE: The state storage backend of Flink is used as the index storage. In this case, records can be updated across partitions and dynamic scale-out of buckets is supported. This is the default value.

    • BUCKET: A hash algorithm is used to route messages to buckets. In this case, records cannot be updated across partitions and only a fixed number of buckets is supported.

      Flink allows you to configure the hoodie.bucket.index.hash.field parameter to specify the hash field. By default, the primary key is used as the hash field. You can also specify a subset of the primary key.

    Note
    • If the amount of data exceeds 20,000 queries per second (QPS), we recommend that you set this parameter to BUCKET.

    • Flink allows you to configure the hoodie.bucket.index.num.bucket parameter to specify the number of buckets. The default value of this parameter is 4.
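
    The following sketch shows how the index settings that are described above might be combined for a high-throughput result table. The schema, the path placeholders, and the choice of uuid as the hash field are illustrative assumptions.

    CREATE TEMPORARY TABLE hudi_bucket_sink (
      uuid BIGINT,
      data STRING,
      ts   TIMESTAMP(3),
      PRIMARY KEY (uuid) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      -- Use the BUCKET index for high write throughput.
      'index.type' = 'BUCKET',
      -- Optional: by default, the primary key is used as the hash field.
      'hoodie.bucket.index.hash.field' = 'uuid'
    );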

  • Advanced parameters

    • Memory parameters

      Note
      • The unit of all memory parameters is MB.

      • Three factors affect memory: the number and memory configuration of TaskManagers, the parallelism of write tasks, and the memory that can be allocated to each write task. Therefore, we recommend that you confirm the memory that can be allocated to each write task before you consider the settings of relevant memory parameters.

      Parameter

      Description

      Default value

      Remarks

      write.task.max.size

      The maximum amount of available memory for a write task.

      1024

      The size of the memory buffer that is reserved for each write task is the difference between the value of the write.task.max.size parameter and the value of the compaction.max_memory parameter.

      If the memory buffer of the write task reaches the threshold, data in the memory is stored on disks.

      Take note of the amount of memory that TaskManagers allocate to each write task to ensure that each write task can obtain the amount of memory that is specified by the write.task.max.size parameter. For example, if a TaskManager has 4 GB of memory and runs two StreamWriteFunction tasks, each StreamWriteFunction task can be allocated 2 GB of memory. In this case, reserve some memory as a buffer for the other tasks on the TaskManager, such as BucketAssignFunction tasks, which also consume memory.

      compaction.max_memory

      The maximum amount of available memory for file compaction.

      100

      If you want to compact files online and memory resources are sufficient, you can increase the value of this parameter, for example, to 1024 (1 GB).

      Take note of the changes in compaction memory. The compaction.max_memory parameter specifies the size of memory that can be used when each compaction task reads logs. If memory resources are sufficient, we recommend that you perform the following operation based on the table type:

      • For a Merge on Read (MoR) table, you can increase the value of the compaction.max_memory parameter.

      • For a Copy on Write (CoW) table, you can increase the values of the write.task.max.size and write.merge.max_memory parameters at the same time.

      write.merge.max_memory

      The maximum amount of heap memory that can be used to cache incremental data when a CoW operation merges incremental and full data files.

      100

      In most cases, you can retain the default value.
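
      As a rough sketch only, and under the assumption that each write task can be allocated about 2 GB of memory, the memory parameters might be combined as follows. The schema and the HDFS path are placeholders; the OSS-related parameters from the DDL syntax section apply in the same way if the table is stored in OSS.

      CREATE TEMPORARY TABLE hudi_sink_memory (
        uuid BIGINT,
        data STRING,
        ts   TIMESTAMP(3)
      ) WITH (
        'connector' = 'hudi',
        'table.type' = 'MERGE_ON_READ',
        'path' = 'hdfs://<ip>:<port>/table',
        -- Reserve up to 2048 MB of memory for each write task.
        'write.task.max.size' = '2048',
        -- Allow each compaction task to use up to 1024 MB when it reads logs.
        'compaction.max_memory' = '1024'
      );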

    • Parameters for parallelism

      Parameter

      Description

      Default value

      Remarks

      write.tasks

      The parallelism of write tasks. In each write task, data is written to one bucket or to a specified number of buckets in sequence.

      4

      If you increase the value of this parameter, the number of small files does not increase.

      write.bucket_assign.tasks

      The number of bucket assigners that can run in parallel.

      1

      If you increase the value of this parameter, the number of buckets increases and the number of small files increases.

      write.index_bootstrap.tasks

      The parallelism of index bootstrap operators. If you increase the parallelism of index bootstrap operators, the efficiency of the bootstrap phase can be improved. Checkpoints may be blocked in the bootstrap phase. To resolve this issue, you can set the number of failed checkpoints that is allowed to a large value.

      If this parameter is not explicitly specified, the parallelism of Flink operators is used by default.

      This parameter takes effect only when the index.bootstrap.enabled parameter is set to true.
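
      The following sketch shows how the parallelism parameters might be set together. The values are illustrative and not tuning recommendations; the schema and path are placeholders.

      CREATE TEMPORARY TABLE hudi_sink_parallel (
        uuid BIGINT,
        data STRING,
        ts   TIMESTAMP(3)
      ) WITH (
        'connector' = 'hudi',
        'table.type' = 'MERGE_ON_READ',
        'path' = 'hdfs://<ip>:<port>/table',
        -- Run eight write tasks in parallel.
        'write.tasks' = '8',
        -- Keep a single bucket assigner to limit the number of small files.
        'write.bucket_assign.tasks' = '1'
      );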

    • Parameters for online compaction

      Parameter

      Description

      Default value

      Remarks

      compaction.tasks

      The parallelism of online compaction operators.

      4

      Online compaction consumes computing resources.

      compaction.trigger.strategy

      The compaction policy.

      num_commits

      Valid values:

      • num_commits: Compaction is triggered when the specified number of delta commits is reached.

      • time_elapsed: Compaction is triggered when the specified period of time elapses.

      • num_and_time: Compaction is triggered when both num_commits and time_elapsed are met.

      • num_or_time: Compaction is triggered when num_commits or time_elapsed is met.

      compaction.delta_commits

      The maximum number of delta commits before compaction is triggered.

      5

      The value of this parameter is an integer. We recommend that you set this parameter to a value not greater than 20. The default value 5 indicates that compaction is triggered after five delta commits are generated.

      compaction.delta_seconds

      The maximum interval to trigger online compaction.

      3600

      Unit: seconds.

      compaction.target_io

      The maximum I/O throughput for each compaction task.

      500 (GB)

      N/A.
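
      As an illustration only, the online compaction parameters might be tuned as follows for a MoR table. The values and placeholders are assumptions, not recommendations.

      CREATE TEMPORARY TABLE hudi_sink_compaction (
        uuid BIGINT,
        data STRING,
        ts   TIMESTAMP(3)
      ) WITH (
        'connector' = 'hudi',
        'table.type' = 'MERGE_ON_READ',
        'path' = 'hdfs://<ip>:<port>/table',
        'compaction.tasks' = '4',
        -- Trigger compaction when 5 delta commits are generated or 1800 seconds elapse.
        'compaction.trigger.strategy' = 'num_or_time',
        'compaction.delta_commits' = '5',
        'compaction.delta_seconds' = '1800'
      );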

  • Parameter for enabling the changelog mode

    The Hudi connector allows you to retain all changes to messages. After the Hudi connector is connected to the Flink engine, end-to-end near-real-time data warehousing can be implemented. For a MoR table, the Hudi connector retains all changes to messages in the row-based storage format. This way, the reader can read all change records of the MoR table in streaming mode. In this case, you must set changelog.enabled to true to enable the changelog mode.

    After you set changelog.enabled to true, all changes can be consumed. However, the asynchronous compaction task compacts intermediate changes into one record. Therefore, if the data is not read and consumed in streaming mode in a timely manner, only the last record can be read after compaction. You can adjust the buffer time before compaction, for example by changing the compaction.delta_commits and compaction.delta_seconds parameters, to reserve sufficient time for the reader to read and consume data.

    Parameter

    Description

    Required

    Remarks

    changelog.enabled

    Specifies whether to enable the changelog mode.

    No

    Valid values:

    • true: The changelog mode is enabled.

    • false: The changelog mode is disabled. This is the default value. If the changelog mode is disabled, the UPSERT semantics is supported. Only the final record after compaction is available because the intermediate changes may be compacted.

    Note

    If data is read in streaming mode, each data change is returned. If data is read in batch mode, only the merged result after compaction is returned.
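
    For reference, a minimal sketch of a MoR result table with the changelog mode enabled is shown below. The schema and path are placeholders, and the increased compaction.delta_commits value is only an example of reserving more buffer time for the reader.

    CREATE TEMPORARY TABLE hudi_changelog_sink (
      id   BIGINT,
      name STRING,
      ts   TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      -- Retain intermediate changes so that the reader can consume every change in streaming mode.
      'changelog.enabled' = 'true',
      -- Delay compaction to give the reader more time to consume intermediate changes.
      'compaction.delta_commits' = '10'
    );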

  • Parameters for batch import

    If the existing data is obtained from a data source other than Hudi, you can use the batch import feature to quickly import the existing data to a Hudi table. When you use the batch import feature, take note of the following points:

    • During batch import, Avro-based data serialization and data compaction are not performed, and data is not deduplicated after it is imported. If you have high requirements on data uniqueness, you must ensure the uniqueness of the imported data yourself.

    • In batch execution mode, the bulk_insert value of the write.operation parameter is more efficient. By default, the system sorts the input data by partition and writes the data to the Hudi table. This way, the write operation does not frequently switch among different files and system performance does not decrease.

    • You can configure the write.tasks parameter to specify the parallelism of bulk_insert write tasks. The number of small files varies based on the parallelism.

    The following table describes the parameters in the WITH clause.

    Parameter

    Description

    Required

    Remarks

    write.tasks

    The parallelism of write tasks. In each write task, data is written to one bucket or to a specified number of buckets in sequence.

    No

    Default value: 4.

    write.bulk_insert.shuffle_by_partition

    Specifies whether to scatter data by partition field and then write the data to partitions in write tasks.

    No

    Default value: true.

    Warning

    If this parameter is set to true, the number of small files is reduced but data skew may occur.

    write.bulk_insert.sort_by_partition

    Specifies whether to sort data by partition field before data is written to the table.

    No

    Default value: true. If the system writes data to multiple partitions in a write task, the number of small files is reduced when this parameter is set to true.

    write.sort.memory

    The available managed memory for the sort operator.

    No

    Default value: 128. Unit: MB.
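
    A minimal sketch of a batch import with bulk_insert might look as follows. The Hudi table definition, the schema, and the source table named existing_table are assumptions for illustration; replace them with your own tables.

    CREATE TEMPORARY TABLE hudi_bulk_sink (
      uuid BIGINT,
      data STRING,
      ts   TIMESTAMP(3)
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'COPY_ON_WRITE',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      -- Write the existing data in batches without Avro-based serialization or compaction.
      'write.operation' = 'bulk_insert',
      'write.tasks' = '8',
      'write.bulk_insert.shuffle_by_partition' = 'true',
      'write.bulk_insert.sort_by_partition' = 'true'
    );

    -- Import the existing data from the hypothetical source table existing_table.
    INSERT INTO hudi_bulk_sink SELECT * FROM existing_table;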

  • Parameters for full incremental loading

    If you want to import full offline data into a Hudi result table and then write incremental data to the result table with deduplication, you can set the index.bootstrap.enabled parameter to true to enable the full incremental loading feature.

    Note

    If it takes a long period of time to write data to the table, you can increase the amount of resources when you write the full data. After the full data is written, you can decrease the amount of resources or configure the write.rate.limit parameter to throttle writes when you write the incremental data.

    Parameter

    Description

    Required

    Remarks

    index.bootstrap.enabled

    Specifies whether to enable the full incremental loading feature. After you enable this feature, the index data of the existing table is loaded into the state storage all at once.

    Yes

    Valid values:

    • true: The full incremental loading feature is enabled.

    • false: The full incremental loading feature is disabled. This is the default value.

    index.partition.regex

    Specifies a regular expression to filter partitions.

    No

    By default, all partitions are loaded.
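
    The following sketch shows how the full incremental loading parameters might be configured before incremental data is written. The schema, path placeholders, and the partition pattern are illustrative assumptions.

    CREATE TEMPORARY TABLE hudi_incremental_sink (
      id   BIGINT,
      name STRING,
      ts   TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      -- Load the index of the existing full data into the state storage before incremental writes.
      'index.bootstrap.enabled' = 'true',
      -- Optional: load only the partitions that match this regular expression.
      'index.partition.regex' = '2023-06-.*'
    );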

  • Parameter for write operation throttling

    High write throughput and out-of-order data that is randomly written to partitions can easily decrease write performance and cause throughput glitches. To keep write traffic smooth, you can enable the write throttling feature. For example, when tens of billions of full and incremental records from sharded databases and tables are synchronized to Kafka and then consumed by Flink to build Hudi tables, Kafka contains a large amount of historical data in addition to the incremental data, and the downstream deployment must read a large amount of data. In this case, you can configure the write.rate.limit parameter to throttle writes.

    Parameter

    Description

    Required

    Remarks

    write.rate.limit

    The maximum number of data records that can be processed per second.

    No

    Default value: 0. This indicates that the number of data records that are processed per second is not restricted.
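
    As an illustrative sketch, write throttling might be configured as follows. The limit of 10000 records per second is a placeholder value, not a recommendation.

    CREATE TEMPORARY TABLE hudi_throttled_sink (
      id   BIGINT,
      name STRING,
      ts   TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      -- Write at most 10000 records per second.
      'write.rate.limit' = '10000'
    );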

  • Parameter for writing data in append mode

    Parameter

    Description

    Required

    Remarks

    write.insert.cluster

    Specifies whether to merge small files during data writing.

    No

    Valid values:

    • true: Small files are merged during data writing.

      Important

      If you set this parameter to true, small files are merged without deduplication before data writing. The throughput is affected.

    • false: Small files are not merged during data writing. This is the default value.

    Note

    By default, data is written to a CoW table in append mode. In this mode, files are not merged. However, you can enable the small file merging feature to merge small files.
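
    A minimal sketch of writing to a CoW table in append mode with small file merging enabled is shown below. The schema and path are placeholders.

    CREATE TEMPORARY TABLE hudi_append_sink (
      uuid BIGINT,
      data STRING,
      ts   TIMESTAMP(3)
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'COPY_ON_WRITE',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      -- Append data without updates.
      'write.operation' = 'insert',
      -- Merge small files during data writing. This affects throughput.
      'write.insert.cluster' = 'true'
    );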

Examples

  • Example 1: Write data to a Hudi result table

    This example shows how to use the Datagen connector to generate random data in streaming mode and then write the data to a Hudi table.

    1. Make sure that an OSS bucket is created.

      For more information, see Create buckets.

    2. On the Draft Editor page, write code for an SQL streaming draft in the text editor of the draft that you want to edit.

      CREATE TEMPORARY TABLE datagen (
        uuid BIGINT,
        data STRING,
        ts   TIMESTAMP(3)
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE hudi_sink (
        uuid BIGINT,
        data STRING,
        ts TIMESTAMP(3)
      ) WITH (
        'connector' = 'hudi',
        'oss.endpoint' = '<yourOSSEndpoint>',
        'accessKeyId' = '<yourAccessKeyId>',
        'accessKeySecret' = '<yourAccessKeySecret>',
        'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
        'table.type' = 'COPY_ON_WRITE'
      );
      
      INSERT INTO hudi_sink SELECT * FROM datagen;
    3. On the right side of the Draft Editor page, click the Advanced tab and set Engine Version to vvr-4.0.15-flink-1.13.

    4. Click Validate.

    5. In the upper-right corner of the Draft Editor page, click Publish.

    6. On the Deployments page in the console of fully managed Flink, find the desired deployment and click Start in the Actions column.

    7. View the test data that is written in the OSS console.

      You can view the test data that has been written after the first checkpointing operation is complete.

  • Example 2: Ingest MySQL CDC data into data lakes

    This example shows how to read data from a MySQL CDC source table and then write data to a Hudi table.

    1. Make sure that an OSS bucket is created.

      For more information, see Create buckets.

    2. On the Draft Editor page, write code for an SQL streaming draft in the text editor of the draft that you want to edit.

      CREATE TEMPORARY TABLE mysql_src (
        id BIGINT,
        name STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '<yourRDSHostName>',
        'port' = '3306',
        'username' = '<yourRDSUserName>',
        'password' = '<yourRDSPassword>',
        'database-name' = 'user_db.*', -- Use a regular expression to match multiple database shards. 
        'table-name' = 'user.*'   -- Use a regular expression to match multiple tables in the sharded database. 
      );
      
      CREATE TEMPORARY TABLE hudi_sink (
        id BIGINT PRIMARY KEY NOT ENFORCED,
        name STRING
      ) WITH (
        'connector' = 'hudi',
        'oss.endpoint' = '<yourOSSEndpoint>',
        'accessKeyId' = '<yourAccessKeyId>',
        'accessKeySecret' = '<yourAccessKeySecret>',
        'path' = 'oss://<yourOSSBucket>/<Path to Table>/',
        'table.type' = 'MERGE_ON_READ'
      );
      
      INSERT INTO hudi_sink SELECT * FROM mysql_src;
    3. On the right side of the Draft Editor page, click the Advanced tab and set Engine Version to vvr-4.0.15-flink-1.13.

    4. Click Validate.

    5. In the upper-right corner of the Draft Editor page, click Publish.

    6. On the Deployments page in the console of fully managed Flink, find the desired deployment and click Start in the Actions column.

      After the deployment is published, you can view the vertex graph of the deployment on the Overview tab to learn about the running process of the deployment.

    7. View the test data that has been written in the OSS console.

      You can view the test data that has been written in the OSS console after the first checkpointing operation is complete. If a DLF catalog is configured for your Hudi table, you can use the data exploration and query feature of DLF to query the written data.