
Partitioning and bucketing

Updated at: 2024-05-09 07:49

This topic describes the partitioning and bucketing features of ApsaraDB for SelectDB and how to use partitions and buckets.

Overview

To efficiently store and compute large amounts of data, ApsaraDB for SelectDB divides the data based on partitions and distributes the data to a distributed system for processing.

All data models supported by ApsaraDB for SelectDB allow data to be divided at one or two levels:

  • One level: Data is partitioned at only one level.

    • When you create a table, you do not need to write a statement to partition data. In this case, ApsaraDB for SelectDB creates a default partition that is transparent to you. If you partition data at one level, only bucketing is supported.

  • Two levels: Data is partitioned at two levels.

    • The first level is partition, which supports range partitioning and list partitioning.

    • The second level is bucket, which is also known as tablet. Buckets divide data based on hash values. The random distribution method is also supported, as described in the Bucketing section of this topic.

Partitioning

Partitions are used to divide data into different ranges. This is similar to dividing a table into multiple child tables. This way, you can manage data in partitions. When you use partitions, take note of the following items:

  • You can specify one or more columns as partition key columns. Partition key columns must be key columns.

  • You must enclose partition key values in double quotation marks (") regardless of the partition key column type.

  • The number of partitions that you can create is theoretically unlimited.

  • If you create a table without specifying partitions, the system automatically creates a partition whose name is the same as the table and that contains the full data of the table. This partition is invisible to you and cannot be deleted or modified.

  • When you create a partition, the range of the partition cannot overlap with that of another partition.

Range partitioning

In most cases, time columns are used as partition key columns during range partitioning to facilitate the management of new and historical data. Range partitioning allows you to specify only the upper limit by using the VALUES LESS THAN (...) syntax. In this case, the system uses the upper limit of the previous partition as the lower limit of the new partition and creates a partition whose range is left-closed and right-open. You can also specify both the lower and upper limits by using the VALUES [...) syntax to create a partition whose range is left-closed and right-open.
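
For example, the following statements show both syntaxes. This is a minimal sketch: the example_range_tbl table is hypothetical and is assumed to be range-partitioned by a DATE column named date.

-- Specify only the upper limit. The lower limit defaults to the upper limit of
-- the previous partition, or MIN_VALUE if no previous partition exists.
ALTER TABLE test_db.example_range_tbl
ADD PARTITION p201704 VALUES LESS THAN ("2017-05-01");

-- Specify both the lower and upper limits. The range is left-closed and
-- right-open.
ALTER TABLE test_db.example_range_tbl
ADD PARTITION p201706 VALUES [("2017-06-01"), ("2017-07-01"));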

Single-column partitioning

This section describes how the partition ranges of a table change if you execute the VALUES LESS THAN (...) statement to create or delete partitions for the table. The following sample code provides an example:

  1. Create a table named test_table.

    CREATE TABLE IF NOT EXISTS test_db.test_table
    (
      `user_id` LARGEINT NOT NULL COMMENT "The user ID", 
      `date` DATE NOT NULL COMMENT "The date on which data is imported to the table", 
      `timestamp` DATETIME NOT NULL COMMENT "The time when data is imported to the table", 
      `city` VARCHAR(20) COMMENT "The city in which the user resides", 
      `age` SMALLINT COMMENT "The age of the user", 
      `sex` TINYINT COMMENT "The gender of the user", 
      `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "The last time when the user paid a visit",
      `cost` BIGINT SUM DEFAULT "0" COMMENT "The amount of money that the user spends",
      `max_dwell_time` INT MAX DEFAULT "0" COMMENT "The maximum dwell time of the user",
      `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "The minimum dwell time of the user"
    )ENGINE=OLAP
    AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
    PARTITION BY RANGE(`date`)
    ( 
      PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
      PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
      PARTITION `p201703` VALUES LESS THAN ("2017-04-01")
    )
    DISTRIBUTED BY HASH(`user_id`) BUCKETS 16;

    After the test_table table is created, the following three partitions are automatically created:

    p201701: [MIN_VALUE, 2017-02-01)
    p201702: [2017-02-01, 2017-03-01)
    p201703: [2017-03-01, 2017-04-01)
  2. Execute the ALTER TABLE test_db.test_table ADD PARTITION p201705 VALUES LESS THAN ("2017-06-01"); statement to create a partition named p201705. The following sample code shows the partitioning results:

    p201701: [MIN_VALUE, 2017-02-01)
    p201702: [2017-02-01, 2017-03-01)
    p201703: [2017-03-01, 2017-04-01)
    p201705: [2017-04-01, 2017-06-01)
  3. Execute the ALTER TABLE test_db.test_table DROP PARTITION p201703; statement to delete the p201703 partition. The following sample code shows the partitioning results:

    p201701: [MIN_VALUE, 2017-02-01)
    p201702: [2017-02-01, 2017-03-01)
    p201705: [2017-04-01, 2017-06-01)
    Important

    In the preceding example, after the p201703 partition is deleted, the ranges of the p201702 and p201705 partitions remain unchanged. However, the range [2017-03-01,2017-04-01) between the two ranges is vacant. The existing data in this range is also deleted. In this case, if the data to be imported is within the vacant range, the data cannot be imported.

  4. Execute the ALTER TABLE test_db.test_table DROP PARTITION p201702; statement to delete the p201702 partition. The following sample code shows the partitioning results:

    p201701: [MIN_VALUE, 2017-02-01)
    p201705: [2017-04-01, 2017-06-01)

    The vacant range becomes [2017-02-01,2017-04-01).

  5. Execute the ALTER TABLE test_db.test_table ADD PARTITION `p201702new` VALUES LESS THAN ("2017-03-01"); statement to create a partition named p201702new. The following sample code shows the partitioning results:

    p201701: [MIN_VALUE, 2017-02-01)
    p201702new: [2017-02-01, 2017-03-01)
    p201705: [2017-04-01, 2017-06-01)

    The vacant range becomes [2017-03-01,2017-04-01).

  6. Execute the ALTER TABLE test_db.test_table DROP PARTITION p201701; statement to delete the p201701 partition. Then, execute the ALTER TABLE test_db.test_table ADD PARTITION `p201612` VALUES LESS THAN ("2017-01-01"); statement to create a partition named p201612. The following sample code shows the partitioning results:

    p201612: [MIN_VALUE, 2017-01-01)
    p201702new: [2017-02-01, 2017-03-01)
    p201705: [2017-04-01, 2017-06-01) 

    The vacant ranges become [2017-01-01,2017-02-01) and [2017-03-01,2017-04-01).

The preceding example shows that the ranges of existing partitions remain unchanged after you delete partitions. However, vacant ranges may occur. If you execute the VALUES LESS THAN (...) statement to create a partition, the lower limit of the new partition is always the upper limit of the closest preceding partition. To choose a different lower limit, use the VALUES [...) syntax, as shown in the following example.
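
For example, to fill the vacant range [2017-03-01, 2017-04-01) that remains after step 6, you can specify both limits explicitly. This is a minimal sketch that assumes the test_table table in the state shown above; the partition name is arbitrary:

-- Fill the vacant range with an explicit, left-closed and right-open range.
ALTER TABLE test_db.test_table
ADD PARTITION p201703new VALUES [("2017-03-01"), ("2017-04-01"));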

Multiple-column partitioning

When you create partitions for a table, you can partition data based on multiple columns. The following sample code provides an example:

PARTITION BY RANGE(`date`, `id`)
(
  PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
  PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
  PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01")
)

In this example, the date and id columns are specified as partition key columns. The date column is of the DATE type, and the id column is of the INT type. The following sample code shows the partitioning results:

* p201701_1000: [(MIN_VALUE, MIN_VALUE), ("2017-02-01", "1000") )
* p201702_2000: [("2017-02-01", "1000"), ("2017-03-01", "2000") )
* p201703_all: [("2017-03-01", "2000"), ("2017-04-01", MIN_VALUE)) 

In the last partition, only the value of the date column is specified. By default, MIN_VALUE is used as the value of the id column. When you insert data, the system compares the data with the specified partition key values in sequence to determine the partition into which data is inserted. The following sample code provides an example:

* Data --> Partition
* 2017-01-01, 200 --> p201701_1000
* 2017-01-01, 2000 --> p201701_1000
* 2017-02-01, 100 --> p201701_1000
* 2017-02-01, 2000 --> p201702_2000
* 2017-02-15, 5000 --> p201702_2000
* 2017-03-01, 2000 --> p201703_all
* 2017-03-10, 1 --> p201703_all
* 2017-04-01, 1000 --> Failed to be imported.
* 2017-05-01, 1000 --> Failed to be imported.
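
For context, the preceding PARTITION BY RANGE fragment could appear in a complete table definition such as the following sketch. The table name, the cost column, and the bucket settings are assumptions added for illustration:

CREATE TABLE IF NOT EXISTS test_db.example_multi_range_tbl
(
    `date` DATE NOT NULL COMMENT "The date on which data is imported to the table",
    `id` INT NOT NULL COMMENT "The record ID",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "The amount of money that the user spends"
)
ENGINE=OLAP
AGGREGATE KEY(`date`, `id`)
PARTITION BY RANGE(`date`, `id`)
(
    PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
    PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
    PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01")
)
DISTRIBUTED BY HASH(`id`) BUCKETS 16;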

List partitioning

List partitioning supports partition key columns of the following data types: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME, CHAR, and VARCHAR. Partition key values are enumeration values. The data to be imported hits a partition only if the data contains one of the enumeration values of the partition.

You can specify the enumeration values contained in each partition by executing the VALUES IN (...) statement.

Single-column partitioning

This section describes how the partitions of a table change if you execute the VALUES IN (...) statement to create or delete partitions for the table. The following sample code provides an example:

  1. Create a table named example_list_tbl1.

    CREATE TABLE IF NOT EXISTS test_db.example_list_tbl1
    (
        `user_id` LARGEINT NOT NULL COMMENT "The user ID",
        `date` DATE NOT NULL COMMENT "The date on which data is imported to the table",
        `timestamp` DATETIME NOT NULL COMMENT "The time when data is imported to the table",
        `city` VARCHAR(20) NOT NULL COMMENT "The city in which the user resides",
        `age` SMALLINT COMMENT "The age of the user",
        `sex` TINYINT COMMENT "The gender of the user",
        `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "The last time when the user paid a visit",
        `cost` BIGINT SUM DEFAULT "0" COMMENT "The amount of money that the user spends",
        `max_dwell_time` INT MAX DEFAULT "0" COMMENT "The maximum dwell time of the user",
        `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "The minimum dwell time of the user"
    )
    ENGINE=OLAP
    AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
    PARTITION BY LIST(`city`)
    (
        PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong"),
        PARTITION `p_usa` VALUES IN ("New York", "San Francisco"),
        PARTITION `p_jp` VALUES IN ("Tokyo")
    )
    DISTRIBUTED BY HASH(`user_id`) BUCKETS 16;

    After the example_list_tbl1 table is created, the following three partitions are automatically created:

    p_cn: ("Beijing", "Shanghai", "Hong Kong")
    p_usa: ("New York", "San Francisco")
    p_jp: ("Tokyo")
  2. Execute the ALTER TABLE test_db.example_list_tbl1 ADD PARTITION `p_uk` VALUES IN ("London"); statement to create a partition named p_uk. The following sample code shows the partitioning results:

    p_cn: ("Beijing", "Shanghai", "Hong Kong")
    p_usa: ("New York", "San Francisco")
    p_jp: ("Tokyo")
    p_uk: ("London")
  3. Execute the ALTER TABLE test_db.example_list_tbl1 DROP PARTITION p_jp; statement to delete the p_jp partition. The following sample code shows the partitioning results:

    p_cn: ("Beijing", "Shanghai", "Hong Kong")
    p_usa: ("New York", "San Francisco")
    p_uk: ("London")

Multiple-column partitioning

When you create partitions for a table, you can partition data based on multiple columns. The following sample code provides an example:

PARTITION BY LIST(`id`, `city`)
(
	PARTITION `p1_city` VALUES IN (("1", "Beijing"), ("1", "Shanghai")),
	PARTITION `p2_city` VALUES IN (("2", "Beijing"), ("2", "Shanghai")),
	PARTITION `p3_city` VALUES IN (("3", "Beijing"), ("3", "Shanghai"))
)

In this example, the id and city columns are specified as partition key columns. The id column is of the INT type, and the city column is of the VARCHAR type. The following sample code shows the partitioning results:

* p1_city: [("1", "Beijing"), ("1", "Shanghai")]
* p2_city: [("2", "Beijing"), ("2", "Shanghai")]
* p3_city: [("3", "Beijing"), ("3", "Shanghai")]

When you insert data, the system compares the data with the specified partition key values in sequence to determine the partition into which data is inserted. The following sample code provides an example:

* Data ---> Partition
* 1, Beijing ---> p1_city
* 1, Shanghai ---> p1_city
* 2, Shanghai ---> p2_city
* 3, Beijing ---> p3_city
* 1, Tianjin ---> Failed to be imported.
* 4, Beijing ---> Failed to be imported.
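
For context, the preceding PARTITION BY LIST fragment could appear in a complete table definition such as the following sketch. The table name, the cost column, and the bucket settings are assumptions added for illustration:

CREATE TABLE IF NOT EXISTS test_db.example_multi_list_tbl
(
    `id` INT NOT NULL COMMENT "The record ID",
    `city` VARCHAR(20) NOT NULL COMMENT "The city in which the user resides",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "The amount of money that the user spends"
)
ENGINE=OLAP
AGGREGATE KEY(`id`, `city`)
PARTITION BY LIST(`id`, `city`)
(
    PARTITION `p1_city` VALUES IN (("1", "Beijing"), ("1", "Shanghai")),
    PARTITION `p2_city` VALUES IN (("2", "Beijing"), ("2", "Shanghai")),
    PARTITION `p3_city` VALUES IN (("3", "Beijing"), ("3", "Shanghai"))
)
DISTRIBUTED BY HASH(`id`) BUCKETS 16;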

Bucketing

Data is divided into buckets and stored based on the hash values of the bucket columns. When you use buckets, take note of the following items:

  • If partitions are used, the DISTRIBUTED BY ... clause describes the rules for dividing data within each partition. If no partitions are used, the clause describes the rules for dividing the full data of the table.

  • You can specify multiple columns as bucket columns. For the Aggregate or Unique model, bucket columns must be key columns. For the Duplicate model, bucket columns can be key columns or value columns. Bucket columns can be the same as or different from partition key columns.

  • When you specify bucket columns, you must strike a balance between query throughput and query concurrency. A sketch that contrasts the two choices follows this list.

    • If you specify multiple bucket columns, data is distributed more evenly. However, if a query does not include equality conditions on all bucket columns, the query scans all buckets in parallel. This increases the query throughput and reduces the latency of a single query. This method is suitable for high-throughput, low-concurrency query scenarios.

    • If you specify only one or a small number of bucket columns, the system scans only one bucket for a point query. In this case, if multiple point queries are concurrently performed, the system may scan different buckets. The I/O operations of queries do not affect each other, especially when different buckets are distributed on different disks. Therefore, this method is suitable for high-concurrency point query scenarios.

  • The number of buckets that you can create is theoretically unlimited.
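
The following sketch contrasts the two choices on a hypothetical duplicate-model table. The table and column names are assumptions, not part of the original example:

-- A table tuned for high-concurrency point queries: a query with an
-- equality condition on user_id touches only one bucket.
CREATE TABLE IF NOT EXISTS test_db.example_point_query_tbl
(
    `user_id` LARGEINT NOT NULL COMMENT "The user ID",
    `date` DATE NOT NULL COMMENT "The date on which data is imported to the table",
    `city` VARCHAR(20) COMMENT "The city in which the user resides"
)
ENGINE=OLAP
DUPLICATE KEY(`user_id`, `date`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16;

-- For high-throughput aggregate scans, hashing on more columns spreads data
-- more evenly, for example:
-- DISTRIBUTED BY HASH(`user_id`, `date`, `city`) BUCKETS 16;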

Best practices

Suggestions on configuring partitions and buckets

  • The total number of buckets in a table is calculated based on the following formula: Total number of buckets = Number of partitions × Number of buckets in each partition.

  • If the cluster configuration is not expected to change, we recommend that the number of buckets in a partition of a table be slightly greater than the total number of disks in the cluster.

  • Theoretically, the amount of data that can be stored in a single bucket is unlimited. However, we recommend that you store 1 GB to 10 GB of data in a bucket. If a single bucket stores too little data, data aggregation may be ineffective and the pressure on metadata management increases. If a single bucket stores too much data, replica migration and repair become inefficient, and the cost of retrying failed operations such as schema changes or rollups increases because these operations are performed at the bucket level.

  • If you cannot find a balance between the amount of data stored in a single bucket and the number of buckets, we recommend that you prioritize the amount of data stored.

  • When you create a table, the same number of buckets is specified for each partition. However, when you execute the ADD PARTITION statement to dynamically create a partition, you can separately specify the number of buckets in the new partition. You can use this feature to handle data expansion or reduction, as shown in the sketch after this list.

  • You cannot change the number of buckets in a partition after the partition is created. Therefore, when you determine the number of buckets, you must consider the scale-out of your cluster in advance. For example, your cluster consists of three machines and each machine has a disk. If the number of buckets is set to 3 or a smaller value, the concurrency cannot be increased even if you increase the number of machines in the cluster.
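
For example, the following sketch adds a partition whose bucket count differs from the rest of the table. It assumes the test_table table from the range partitioning example; the partition name and date are arbitrary:

-- The distribution columns must match those of the table; only the number of
-- buckets can differ from partition to partition.
ALTER TABLE test_db.test_table
ADD PARTITION p201707 VALUES LESS THAN ("2017-08-01")
DISTRIBUTED BY HASH(`user_id`) BUCKETS 32;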

The following table describes the suggestions on configuring partitions and buckets for a cluster that consists of 10 backends and has a disk on each backend.

| Table size | Partitions | Buckets |
| --- | --- | --- |
| 500 MB | No partition is required. | The table contains 4 to 8 buckets. |
| 5 GB | No partition is required. | The table contains 8 to 16 buckets. |
| 50 GB | No partition is required. | The table contains 32 buckets. |
| 500 GB | Each partition is 50 GB in size. | Each partition contains 16 to 32 buckets. |
| 5 TB | Each partition is 50 GB in size. | Each partition contains 16 to 32 buckets. |

Note

You can execute the SHOW DATA; statement to query the size of a table.
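
For example, the following sketch assumes the test_db.test_table table from the earlier examples; SHOW DATA without a table name returns the sizes of all tables in the current database:

-- Query the data sizes of all tables in the current database.
SHOW DATA;

-- Query the data size of a specific table.
SHOW DATA FROM test_db.test_table;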

Configuration and use of the random distribution method

For detail data that does not need to be aggregated or updated, you can create a table that uses the Duplicate model and the random distribution method. The following sample code provides an example:

CREATE TABLE IF NOT EXISTS test.example_tbl
(
    `timestamp` DATETIME NOT NULL COMMENT "The time when the log was generated",
    `type` INT NOT NULL COMMENT "The type of the log",
    `error_code` INT COMMENT "The error code",
    `error_msg` VARCHAR(1024) COMMENT "The error message",
    `op_id` BIGINT COMMENT "The owner ID",
    `op_time` DATETIME COMMENT "The time when the error was handled"
)
DUPLICATE KEY(`timestamp`, `type`, `error_code`)
DISTRIBUTED BY RANDOM BUCKETS 16;

  • A table that uses the Duplicate model contains no columns whose aggregation type is REPLACE. For such a table, you can set the data bucketing mode to RANDOM to prevent severe data skew. When you import data into the table, the data of a single import job is written to a random bucket of a partition.

  • If you set the data bucketing mode of the table to RANDOM, no bucket column is specified, so you cannot restrict a query to specific buckets based on bucket column values. A query scans all buckets of each partition that it hits. This setting is suitable for aggregate queries and analysis over the full data of the table, not for high-concurrency point queries.

  • If a table that uses the Duplicate model uses the random distribution method, you can enable the single-bucket import mode when you import data. To enable this mode, set the load_to_single_tablet parameter to true. By default, this parameter is set to false. When the mode is enabled, each import task writes data to only one bucket of a partition. When you import a large amount of data, this improves the concurrency and throughput of data import, reduces the write amplification caused by data import and compaction, and helps ensure the stability of your cluster. A hedged example follows this list.
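
As a sketch, the parameter can be supplied in the PROPERTIES clause of a load job. The label, source path, endpoint, and credentials below are placeholders, and the exact WITH clause depends on your data source:

LOAD LABEL test.example_label
(
    DATA INFILE("s3://example-bucket/logs/example.csv")
    INTO TABLE example_tbl
    COLUMNS TERMINATED BY ","
)
WITH S3
(
    "s3.endpoint" = "<endpoint>",
    "s3.region" = "<region>",
    "s3.access_key" = "<access_key>",
    "s3.secret_key" = "<secret_key>"
)
PROPERTIES
(
    "load_to_single_tablet" = "true"
);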

Scenarios in which partitions and buckets are used together

  • If a table contains time dimension columns or other dimension columns with ordered values, you can use such columns as partition key columns. Choose the partition granularity based on the import frequency and the amount of data to be stored in each partition.

  • If you want to delete historical data to retain only the data within the previous N days, you can use compound partitioning and drop historical partitions. Alternatively, you can execute the DELETE statement to delete the data in a specific partition. See the sketch after this list.

  • To prevent data skew, you can separately specify the number of buckets for each partition. For example, if data is partitioned by day and the daily amount of data varies greatly, you can customize the number of buckets for each partition. We recommend that you specify bucket columns with high cardinality so that data can be evenly distributed.
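
For example, the following sketch deletes historical data, assuming the test_table table and partitions from the range partitioning example:

-- Drop an entire historical partition.
ALTER TABLE test_db.test_table DROP PARTITION p201701;

-- Alternatively, delete the rows that match a condition within one partition.
DELETE FROM test_db.test_table PARTITION p201705 WHERE `date` < "2017-05-01";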
