
Access column-oriented data

Updated at: 2025-01-09 08:53

Column-oriented storage is a data management method that stores and processes data by column. The Lindorm compute engine provides column-oriented storage to store semi-structured and structured data. Compared with row-oriented storage, column-oriented storage reduces query response time and saves I/O resources. This topic describes how to access Lindorm column-oriented data by using the compute engine.

Background information

Lindorm column-oriented storage is a distributed columnar storage service. It is suitable for storing large amounts of semi-structured and structured data in scenarios such as Internet of Vehicles (IoV), Internet of Things (IoT), orders, and logs. Column-oriented storage provides the following core capabilities:

  • Computing and analytics

    The Lindorm compute engine can access column-oriented data to perform interactive analytics and online computing. Column-oriented storage provides data distribution and rich indexing capabilities that accelerate locating and organizing data during computation. You can add, delete, modify, and query large amounts of primary key data by using SQL statements.

  • High throughput

    Column-oriented storage supports horizontal scaling and provides the capability to read and write terabytes of data per minute. This makes it suitable for high-throughput data scenarios such as quick import of IoV data, model training dataset access, and large-scale report analysis and production.

  • Cost-effectiveness

    Lindorm implements technologies such as columnar high compression ratio algorithms, high-density low-cost media, hot and cold data separation, compression encoding, and cold data archive storage. Compared with self-managed storage, Lindorm column-oriented storage significantly reduces storage costs, allowing you to archive and store large amounts of data at reduced costs.

  • High availability

    By using technologies such as erasure coding, Lindorm column-oriented storage ensures the high availability of distributed datasets and eliminates the risk of single points of failure (SPOFs).

  • Compatibility with open source services

    Column-oriented storage is compatible with multiple open-source standard APIs, such as the Iceberg API. Column-oriented storage can also connect to compute engines such as Spark and Flink. This way, column-oriented storage can be seamlessly integrated into mainstream data ecosystems.

  • Hot and cold data separation

    You can store hot and cold data on different storage media based on your business requirements. This helps reduce the performance overhead caused by access to cold data, while effectively lowering storage costs.

Prerequisites

Feature description

DDL


Create a namespace (database).

USE lindorm_columnar;
CREATE NAMESPACE mydb;

Delete a namespace (database).

USE lindorm_columnar;
DROP NAMESPACE mydb;

Create a table.

USE lindorm_columnar;
CREATE TABLE mydb.mytable (
  id INT NOT NULL,
  city STRING NOT NULL,
  name STRING,
  score INT)
PARTITIONED BY (city, bucket(128,id))
TBLPROPERTIES(
  'primary-key' = 'id,city');

The following section describes primary keys and data partitioning methods.

Primary key

When you create a table, you can create a primary key table by specifying a primary key, or a non-primary key table by omitting it. The following list describes both options and the rules that you must follow when you create a table:

  • Create a primary key table. When you create a table, specify the primary-key parameter in the TBLPROPERTIES clause. You only need to specify the primary key fields of the table. The primary key table must meet the following requirements:

    • Multiple fields are separated by commas (,). The following data types are supported: BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING, and BINARY.

    • The primary key of the column-oriented table is unique.

    • If the primary key is written multiple times, the new data overwrites the old data.

    • You must specify a partition. The table partition expression field must be a primary key field. The last-level partition must be a bucket partition.

  • Create a non-primary key table. When you create a table, do not specify the primary-key parameter in the TBLPROPERTIES clause. A non-primary key table does not require partitioning and does not ensure data uniqueness. In this case, duplicate data may exist.
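The write semantics above can be sketched conceptually: a primary key table behaves like a map keyed on the primary key fields, while a non-primary key table behaves like an append-only list. The following is an illustrative Python sketch, not Lindorm code; the field names come from the example table in this topic.

```python
# Illustrative sketch only: this is not Lindorm code. A primary key table
# behaves like a map keyed on the primary key fields (id, city); a
# non-primary key table behaves like an append-only list, so duplicates
# can accumulate.

primary_key_table = {}      # (id, city) -> row
non_primary_key_table = []  # rows, possibly duplicated

def write_pk(row):
    # For the same primary key, new data overwrites old data.
    primary_key_table[(row["id"], row["city"])] = row

def write_no_pk(row):
    # Uniqueness is not ensured; the same row can be stored twice.
    non_primary_key_table.append(row)

row = {"id": 0, "city": "beijing", "name": "zhang3", "score": 99}
write_pk(row)
write_pk({**row, "score": 100})  # same primary key: overwrites the first write
write_no_pk(row)
write_no_pk(row)                 # same row again: stored twice

print(len(primary_key_table))      # prints 1
print(len(non_primary_key_table))  # prints 2
```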

Data partitioning methods

When you create a table, use the PARTITIONED BY([regular partition expression],{bucket(bucketNum,bucketCol)}) clause to specify the data partitioning method.

  • Bucket partition expression

    • The bucketNum parameter specifies the number of shards, which directly affects the concurrency of data write and scanning.

      Note

      Different bucket partitions have different partition IDs, which are specified by the bucket_index parameter. The bucketNum parameter determines the number of bucket partitions in a regular partition.

      • To obtain the value of the bucket_index parameter, apply the hash function to the bucket partition field, and then take the remainder of the hash value divided by the value of the bucketNum parameter. For example, in the mydb.mytable table, the bucket index is calculated as bucket_index = hash(id) % 128.

      • The underlying storage is partitioned based on the value of the bucket_index parameter. We recommend that you evaluate the total data volume before you create a table and specify the bucketNum parameter properly to ensure that the amount of data in a single bucket partition is 50 to 512 MB.

    • The bucketCol parameter specifies the bucket partition fields.

      Important

      To avoid data skew, make sure that the values of the bucketCol parameter are discrete.

    Examples

    Specify only the bucket partition when you create a table.

    • Example 1:

      USE lindorm_columnar;
      CREATE TABLE mydb.mytable (
        id INT NOT NULL,
        city STRING,
        name STRING,
        score DOUBLE)
      PARTITIONED BY (bucket(1024,id))
      TBLPROPERTIES(
        'primary-key' = 'id');
    • Example 2:

      USE lindorm_columnar;
      CREATE TABLE mydb.mytable (
        id INT NOT NULL,
        timestamp LONG NOT NULL,
        city STRING,
        name STRING,
        score DOUBLE)
      PARTITIONED BY (bucket(512,timestamp))
      TBLPROPERTIES(
        'primary-key' = 'id,timestamp');
  • Regular partition expression

    For a regular partitioning expression, each distinct value results in a physical partition in the underlying storage. This partitioning mechanism helps optimize query performance through data pruning.

    Important

    Make sure that the regular partition expression produces a limited number of distinct values. Common partition fields include date, city, and gender. If the regular partition expression involves highly discrete values such as timestamps, a large amount of column-oriented metadata may be generated.

    Examples

    Specify a regular partition and a bucket partition at the same time when you create a table.

    • Example 1:

      USE lindorm_columnar;
      CREATE TABLE mydb.mytable (
        id INT NOT NULL,
        year STRING NOT NULL,
        month STRING NOT NULL,
        day STRING NOT NULL,
        city STRING,
        name STRING,
        score DOUBLE)
      PARTITIONED BY (year, month, day, bucket(1024,id))
      TBLPROPERTIES(
        'primary-key' = 'id, year, month, day');
    • Example 2:

      USE lindorm_columnar;
      CREATE TABLE mydb.mytable (
        id INT NOT NULL,
        date STRING NOT NULL,
        city STRING NOT NULL,
        name STRING,
        score DOUBLE)
      PARTITIONED BY (date, city, bucket(1024,id))
      TBLPROPERTIES(
        'primary-key' = 'id,date,city');
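The bucket routing and sizing guidance above can be sketched as follows. This is an illustrative Python sketch: Lindorm's internal hash function is not documented here, so zlib.crc32 is used as a stand-in, and suggest_bucket_num is a hypothetical helper based on the 50 to 512 MB per-bucket recommendation.

```python
import zlib

def bucket_index(value, bucket_num):
    # Stand-in for Lindorm's internal hash function (the real one may
    # differ): bucket_index = hash(value) % bucketNum, as described above.
    return zlib.crc32(str(value).encode()) % bucket_num

# For a table partitioned by bucket(128, id), a row routes to hash(id) % 128.
assert 0 <= bucket_index(42, 128) < 128

# Hypothetical sizing helper: pick bucketNum so that each bucket holds
# roughly 50 to 512 MB of data (a 256 MB target is used here).
def suggest_bucket_num(total_bytes, target_bucket_bytes=256 * 1024**2):
    return max(1, round(total_bytes / target_bucket_bytes))

print(suggest_bucket_num(1024**4))  # ~1 TB of data -> prints 4096
```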

View tables in the current namespace.

USE lindorm_columnar;
USE mydb;
SHOW TABLES;

View the schema of a table.

You can execute the following SQL statements to view the table schema.

USE lindorm_columnar;
SHOW CREATE TABLE mydb.mytable;
DESC mydb.mytable;

Delete a specific table.

USE lindorm_columnar;
-- Delete a table and retain data files.
DROP TABLE mydb.mytable;
-- Delete a table and data files.
DROP TABLE mydb.mytable PURGE;

Clear the data in a table.

USE lindorm_columnar;
TRUNCATE TABLE mydb.mytable;

Delete a partition.

You can execute the DELETE FROM statement to delete a partition by specifying the WHERE clause. Example:

USE lindorm_columnar;
DELETE FROM mydb.mytable WHERE city = 'beijing';

DML


Insert data into a table.

  • Example 1:

    USE lindorm_columnar;
    INSERT INTO mydb.mytable VALUES (0, 'beijing', 'zhang3', 99);
  • Example 2:

    USE lindorm_columnar;
    INSERT INTO mydb.mytable SELECT id, city, name, score FROM another_table;

Query data in a table.

  • Example 1:

    USE lindorm_columnar;
    SELECT * from mydb.mytable where id=0;
  • Example 2:

    USE lindorm_columnar;
    SELECT count(1), sum(score) FROM mydb.mytable WHERE city = 'beijing';

After you write data to a column-oriented partition for a period of time, you can run the rewrite_data_files or rewrite_manifest command to rewrite data. For example, you can combine small files into larger files to reduce data or metadata redundancy and improve data query performance. For more information, see rewrite_data_files and rewrite_manifest.

Note

When you run the rewrite_data_files or rewrite_manifest command, database resources are consumed. We recommend that you run the command during off-peak hours.

  • Example 1:

    USE lindorm_columnar;
    CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable');
  • Example 2:

    USE lindorm_columnar;
    CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable', where => 'city = "beijing"');
  • Example 3:

    USE lindorm_columnar;
    CALL lindorm_columnar.system.rewrite_manifest('mydb.mytable'); 

Hot and cold data storage

In most cases, frequently accessed data is stored in high-performance storage and infrequently accessed historical data is migrated to low-cost storage. This helps reduce the storage costs and improve performance. Lindorm supports hot and cold data separation at three levels: L1, L2, and L3. You can develop policies for conversion between hot and cold data based on your business requirements. You can use the data lake service to automatically dump data between hot and cold data storage. This helps you effectively manage storage costs without extending the response time of key services.

Important

To enable the automatic hot-cold conversion feature for column-oriented data in Lindorm, contact Lindorm technical support (DingTalk ID: s0s3eg3).

Parameters

When you create a column-oriented table, you can define the cold or hot storage (CHS) attributes to specify how to dump data between cold and hot storage based on the time partition of the column-oriented table. The following list describes the CHS parameters.
CHS

Specifies whether to enable the cold and hot data separation feature. The following list describes how to specify this parameter:

  • One long integer. Unit: seconds.

    • If the gap between the partition data time and the current time is less than or equal to the value, the data is stored in L1.

    • If the gap between the partition data time and the current time is greater than the value, the data is automatically dumped to L2.

    Note

    For example, if this parameter is set to 259200, hot and cold data is stored in two layers:

    • If the gap between the partition data time and the current time is less than or equal to 259,200 seconds, the data is stored in L1.

    • If the gap between the partition data time and the current time is greater than 259,200 seconds, the data is automatically dumped to L2.

  • Two long integers. Unit: seconds. Separate two long integers with a comma. Example: num0, num1, where num0 must be less than num1.

    • If the gap between the partition data time and the current time is less than or equal to the first value, the data is stored in L1.

    • If the gap between the partition data time and the current time is greater than the first value but less than or equal to the second value, the data is automatically dumped to L2.

    • If the gap between the partition data time and the current time is greater than the second value, the data is automatically dumped to L3.

    Note

    For example, if this parameter is set to 259200,864000, hot and cold data is stored in three layers:

    • If the gap between the partition data time and the current time is less than or equal to 259,200 seconds, the data is stored in L1.

    • If the gap between the partition data time and the current time is greater than 259,200 seconds but less than or equal to 864,000 seconds, the data is automatically dumped to L2.

    • If the gap between the partition data time and the current time is greater than 864,000 seconds, the data is automatically dumped to L3.

CHS_L1

Specifies the storage type for L1. Format: 'CHS_L1'='storagetype=<desired storage type>'.

Note

If you do not specify this parameter when you create a table, the default storage type is Capacity storage.

If you store data in the cloud, you can set the desired storage type to one of the following values:

  • CAPACITY_CLOUD_STORAGE (default): stores data in Capacity storage.

  • STANDARD_CLOUD_STORAGE: stores data in standard storage.

  • PERFORMANCE_CLOUD_STORAGE: stores data in performance storage.

  • CLOUD_ARCHIVE_STORAGE: stores data in archive storage.

    Note

    Archive storage is in internal preview. To use archive storage, contact Lindorm technical support (DingTalk ID: s0s3eg3).

If you store data in local disks, you can set the desired storage type to one of the following values:

  • CAPACITY_CLOUD_STORAGE (default): stores data in Capacity storage.

  • LOCAL_SSD_STORAGE: stores data in local SSDs.

  • LOCAL_HDD_STORAGE: stores data in local HDDs.

  • LOCAL_EBS_STORAGE: stores data in local ESSDs.

CHS_L2

Specifies the storage type for L2. CHS_L2 has the same format and valid values as CHS_L1.

Note

You must specify the CHS_L2 parameter.

CHS_L3

Specifies the storage type for L3. CHS_L3 has the same format and valid values as CHS_L1.

Note

If the CHS parameter is set to two long integers, you must specify the CHS_L3 parameter.

CHS_EXP

Specifies how to obtain the partition data time. Format: toSec(${column0},${pattern0},${column1},${pattern1},...${columnN},${patternN}).

Where:

  • columnN: the time partition field. Supported data types are INTEGER, LONG, STRING, and DATE.

  • patternN: the format of the corresponding time partition field. Valid values:

    • yyyy: year

    • MM: month

    • dd: day

    • HH: hour

    • mm: minute

  • toSec: the system function that calculates the maximum data time for the corresponding time partition. Examples:

    • Assume that the time partition fields are year, month, and day. For the year=2023, month=10, day=2 partition, toSec(year, yyyy) returns 2023-12-31 23:59:59, toSec(year, yyyy, month, MM) returns 2023-10-31 23:59:59, and toSec(year, yyyy, month, MM, day, dd) returns 2023-10-02 23:59:59.

    • Assume that the time partition field is date. For the date=2023-10-02 partition, toSec(date, yyyy-MM-dd) returns 2023-10-02 23:59:59.

The Lindorm compute engine obtains the maximum partition data time returned from the CHS_EXP parameter and dumps the partition data to the corresponding storage based on the value specified by the CHS parameter.
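The tiering decision described above can be sketched as follows. This is an illustrative Python sketch, not Lindorm code: partition_max_time mimics toSec for year/month/day partitions, and chs_tier applies the one- or two-threshold CHS rule.

```python
from datetime import datetime, timezone

def partition_max_time(year, month, day):
    # Mimics toSec(year, yyyy, month, MM, day, dd): the last second covered
    # by the partition, e.g. 2023-10-02 23:59:59 for year=2023, month=10, day=2.
    return datetime(year, month, day, 23, 59, 59, tzinfo=timezone.utc)

def chs_tier(partition_time, now, chs):
    # chs holds one or two thresholds in seconds, as in 'CHS'='259200,864000'.
    gap = (now - partition_time).total_seconds()
    if gap <= chs[0]:
        return "L1"
    if len(chs) == 1 or gap <= chs[1]:
        return "L2"
    return "L3"

now = datetime(2023, 10, 20, tzinfo=timezone.utc)
t = partition_max_time(2023, 10, 2)        # 2023-10-02 23:59:59
print(chs_tier(t, now, [259200, 864000]))  # gap ~ 17 days -> prints L3
```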

Examples

  • Example 1:

    Create a table named table0. The partition field names are year, month, and day. Based on the hot and cold data separation policy that you specify, data that was stored one month (2,592,000 seconds) ago is automatically dumped to Capacity storage. You can execute the following statement to create a table:

    CREATE TABLE table0 (col0 INT,year STRING,month STRING,day STRING)
    PARTITIONED BY (year,month,day)
    TBLPROPERTIES (
    'CHS'='2592000',
    'CHS_L2'='storagetype=CAPACITY_CLOUD_STORAGE',
    'CHS_EXP'='toSec(year,yyyy,month,MM,day,dd)'
    );
  • Example 2:

    Change the hot and cold data separation policy of table0. After the change, data that was stored one month (2,592,000 seconds) ago is automatically dumped to Capacity storage, and data that was stored three months (5,184,000 seconds) ago is automatically dumped to archive storage. You can execute the following statement to update a table:

    ALTER TABLE table0
    SET TBLPROPERTIES (
    'CHS'='2592000,5184000',
    'CHS_L2'='storagetype=CAPACITY_CLOUD_STORAGE',
    'CHS_L3'='storagetype=CLOUD_ARCHIVE_STORAGE',
    'CHS_EXP'='toSec(year,yyyy,month,MM,day,dd)'
    );
  • Example 3:

    Create a table named table1. The partition field name is dt. Example: 2020/12/01. Based on the hot and cold data separation policy that you specify, data that was stored one month (2,592,000 seconds) ago is automatically dumped to Capacity storage, and data that was stored three months (5,184,000 seconds) ago is automatically dumped to archive storage. You can execute the following statement to create a table:

    CREATE TABLE table1 (col0 INT,dt STRING)
    PARTITIONED BY (dt)
    TBLPROPERTIES (
    'CHS'='2592000,5184000',
    'CHS_L2'='storagetype=CAPACITY_CLOUD_STORAGE',
    'CHS_L3'='storagetype=CLOUD_ARCHIVE_STORAGE',
    'CHS_EXP'='toSec(dt,yyyy/MM/dd)'
    );
Precautions
  • You can specify the CHS parameter when you create a table. To change the hot and cold data separation policy, execute the ALTER TABLE ... SET TBLPROPERTIES ... statement.

  • If you specify the CHS parameter incorrectly, the table can be created and updated as expected, but data cannot be automatically dumped between cold and hot storage.

  • Hot and cold data dumping is triggered in asynchronous mode. During and after data dumping, data access is not affected, but access performance may vary based on different storage media.

  • You can only implement the hot and cold data separation policy based on the time partition in a column-oriented table.

Best practices

You can use the following methods to accelerate data queries or computing.

Specify a primary key

If a table stores a large amount of data, you can filter on the primary key to accelerate queries. The narrower the primary key range, the better the acceleration.

In this example, a sample table is created by executing the following statements:

USE lindorm_columnar;
CREATE TABLE orders (
o_orderkey       INT NOT NULL,
o_custkey        INT,
o_orderstatus    STRING,
o_totalprice     DOUBLE,
o_orderdate      STRING,
o_orderpriority  STRING,
o_clerk          STRING,
o_shippriority   INT,
o_comment        STRING)
PARTITIONED BY (bucket(1024,o_orderkey))
TBLPROPERTIES(
'primary-key' = 'o_orderkey');           
  • Example 1:

    USE lindorm_columnar;
    SELECT * FROM orders WHERE o_orderkey=18394;
  • Example 2:

    USE lindorm_columnar;
    SELECT count(*) FROM orders WHERE o_orderkey>100000 AND o_orderkey<200000;
  • Example 3:

    USE lindorm_columnar;
    SELECT count(*) FROM orders WHERE o_orderkey>100000;

Add a partition

In the Lindorm column-oriented storage engine, partitions are physically isolated from each other. You can add partitions to accelerate queries that filter on the partition fields.

In this example, a sample table is created by executing the following statements:

USE lindorm_columnar;
CREATE TABLE orders (
o_orderkey       INT NOT NULL,
o_custkey        INT,
o_orderstatus    STRING,
o_totalprice     DOUBLE,
o_orderdate      STRING NOT NULL,
o_orderpriority  STRING,
o_clerk          STRING,
o_shippriority   INT,
o_comment        STRING)
PARTITIONED BY (o_orderdate, bucket(1024,o_orderkey))
TBLPROPERTIES(
'primary-key' = 'o_orderdate,o_orderkey');
  • Example 1:

    USE lindorm_columnar;
    SELECT o_orderdate, count(*) FROM orders WHERE o_orderdate='2022-01-01' GROUP BY o_orderdate;
  • Example 2:

    USE lindorm_columnar;
    SELECT o_orderdate, count(*) FROM orders WHERE o_orderdate>='2022-01-01' AND o_orderdate<='2022-01-07' GROUP BY o_orderdate;

Query acceleration

You can rewrite data in a specific table or a specific partition of a table. This helps enhance data orderliness and compactness and improve data scanning performance.

In this example, a sample table is created by executing the following statements:

CREATE TABLE mydb.mytable (
  id INT NOT NULL, 
  city STRING NOT NULL, 
  name STRING, 
  score INT)
PARTITIONED BY (city, bucket(4, id))
TBLPROPERTIES('primary-key' = 'id,city');
  • Example 1: Rewrite data in the mydb.mytable table.

    CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable');
  • Example 2: Rewrite data in a specific partition.

    CALL lindorm_columnar.system.rewrite_data_files(table => 'mydb.mytable', where => 'city = "beijing"');

If you want to further improve the efficiency of queries after you rewrite the data, you can execute the following statement to specify the table parameters:

ALTER TABLE mydb.mytable SET TBLPROPERTIES ('read.scan-major-rewritten-files-only' = 'true');

Parameter description

read.scan-major-rewritten-files-only: specifies the data range for query. The data type is BOOLEAN. Valid values:

  • true: queries only the rewritten data. Incremental data that has not been rewritten is not queried.

  • false (default): queries all data.

Non-primary key queries

When you rewrite data in a partition, data is sorted by primary key in a column-oriented table. After you create a table, you can specify a sort key based on your requirements to accelerate non-primary key queries.

Important

After you specify the sort key, you must rewrite the data in a partition for the acceleration to take effect. Only the rewritten data can be queried; incremental data that has not been rewritten cannot be queried.

In this example, a sample table is created by executing the following statement:

USE lindorm_columnar;
CREATE TABLE orders (
o_orderkey       INT NOT NULL,
o_custkey        INT,
o_orderstatus    STRING,
o_totalprice     DOUBLE ,
o_orderdate      STRING ,
o_orderpriority  STRING,
o_clerk          STRING,
o_shippriority   INT,
o_comment        STRING)
PARTITIONED BY (bucket(1024,o_orderkey))
TBLPROPERTIES(
'primary-key' = 'o_orderkey',
'read.scan-major-rewritten-files-only' = 'true');

Execute the following statement to specify the sort key:

ALTER TABLE orders WRITE ORDERED BY o_shippriority,o_totalprice;

Execute the following statement to rewrite data:

CALL lindorm_columnar.system.rewrite_data_files(table => 'orders');

You can execute the following SQL statements to query rewritten data.

  • Example 1:

    USE lindorm_columnar;
    SELECT count(*) FROM orders WHERE o_shippriority=0;
  • Example 2:

    USE lindorm_columnar;
    SELECT count(*) FROM orders WHERE o_shippriority=0 AND o_totalprice>999.9;