E-MapReduce:Kudu connector

Last Updated:Jul 09, 2024

You can use a Kudu connector to query data in, insert data into, and delete data from Kudu tables.

Background information

This topic describes how to configure a Kudu connector, query data, and create and manage Kudu tables. It also lists the supported data type mappings and SQL statements.

Prerequisites

A Hadoop cluster that contains the Kudu service and a Trino cluster are created. For more information, see Create a cluster.

Limits

  • You can use a Kudu connector to connect only to Kudu 1.10 or later.

  • You must establish a network connection between the Trino cluster and the Hadoop cluster.

  • Kudu table names and column names that contain uppercase letters are not supported.

Modify the configurations of a Kudu connector

You can modify the configurations of a Kudu connector. For more information, see Configure a connector.

Log on to the E-MapReduce (EMR) console and go to the Configure tab of the Trino service page. On the Configure tab, click kudu.properties. Modify or add the configuration items that are described in the following table based on your business requirements.

Configuration item

Description

kudu.client.master-addresses

The Kudu master address. If you want to configure multiple Kudu master addresses, separate the addresses with commas (,).

The following address formats are supported: example.com, example.com:7051, 192.0.2.1, 192.0.2.1:7051, [2001:db8::1], [2001:db8::1]:7051, and 2001:db8::1.

Default value: localhost.

Note

To ensure that you can write data to and query data from a Kudu table, change the default value localhost to the IP address or hostname of the master node of the Kudu cluster. Example: master-1-1.

kudu.schema-emulation.enabled

Specifies whether to enable the schema emulation feature. Valid values:

  • false (default)

  • true

Important

You can click Add Configuration Item on the kudu.properties tab to add this configuration item. For more information about how to add a configuration item, see Add configuration items.

kudu.schema-emulation.prefix

The prefix for the schema emulation feature.

Important

You must configure this parameter if you set the kudu.schema-emulation.enabled configuration item to true.

The standard prefix is 'presto::'. You can also leave the prefix empty.

kudu.client.default-admin-operation-timeout

The default timeout period for administrative operations, such as the operations to create and delete tables.

Default value: 30. Unit: seconds.

kudu.client.default-operation-timeout

The default timeout period for user operations.

Default value: 30. Unit: seconds.

kudu.client.default-socket-read-timeout

The default timeout period for waiting for data from a socket.

Default value: 10. Unit: seconds.

kudu.client.disable-statistics

Specifies whether to disable the collection of statistics by the Kudu client. Valid values:

  • false (default): The Kudu client collects statistics.

  • true: The Kudu client does not collect statistics.
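As a rough illustration of how the items in the preceding table fit together, the following Python sketch renders a kudu.properties fragment. The function name render_kudu_properties and the sample hostname are hypothetical. Only the master address and schema emulation items are emitted, because the exact value format of the timeout items is not described here.

```python
def render_kudu_properties(master_addresses, schema_emulation=False, prefix=""):
    """Render a kudu.properties fragment (illustrative helper, not an EMR API).

    master_addresses: list of host or host:port strings.
    schema_emulation: whether to emit the schema emulation items.
    prefix: value for kudu.schema-emulation.prefix; may be empty.
    """
    if not master_addresses:
        raise ValueError("at least one Kudu master address is required")
    # Multiple master addresses are separated with commas.
    lines = ["kudu.client.master-addresses=" + ",".join(master_addresses)]
    if schema_emulation:
        lines.append("kudu.schema-emulation.enabled=true")
        # The prefix item is written even when it is empty.
        lines.append("kudu.schema-emulation.prefix=" + prefix)
    return "\n".join(lines)


if __name__ == "__main__":
    print(render_kudu_properties(["master-1-1:7051"], schema_emulation=True, prefix="presto::"))
```

In the EMR console, you would enter the same keys and values on the kudu.properties tab instead of editing a file.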

Query data

Apache Kudu does not support schemas. However, you can configure a Kudu connector to emulate schemas.

Schema emulation disabled (default)

By default, the schema emulation feature is disabled. In this case, all Kudu tables reside in the default schema.

For example, you can execute the SELECT * FROM kudu.default.orders statement to query data from the orders table. If you specify kudu as the catalog and default as the schema, you can execute the SELECT * FROM orders statement to query data from the orders table.

The name of a Kudu table can contain any characters. If a table name contains a special character, you must enclose the name in a pair of double quotation marks ("). For example, to query data from the special.table! table, execute the SELECT * FROM kudu.default."special.table!" statement.

Examples:

  1. Create a table named users in the default schema.

    CREATE TABLE kudu.default.users (
      user_id int WITH (primary_key = true),
      first_name varchar,
      last_name varchar
    ) WITH (
      partition_by_hash_columns = ARRAY['user_id'],
      partition_by_hash_buckets = 2
    );
    Note

    When you create a table, you must specify the required table information, such as the primary key columns and the hash partitions or range partitions. You can also specify the encoding format or compression format of columns.

  2. View information about the table.

    DESCRIBE kudu.default.users;

    Information similar to the following output is returned:

       Column   |  Type   |                      Extra                      | Comment
    ------------+---------+-------------------------------------------------+---------
     user_id    | integer | primary_key, encoding=auto, compression=default |
     first_name | varchar | nullable, encoding=auto, compression=default    |
     last_name  | varchar | nullable, encoding=auto, compression=default    |
    (3 rows)
  3. Insert data into the table.

    INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');
  4. Query data from the table.

    SELECT * FROM kudu.default.users;
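The quoting rule for table names that contain special characters can be sketched in Python as follows. The helper names quote_identifier and qualified_name are hypothetical, and the sketch assumes that a name made up only of lowercase letters, digits, and underscores needs no quotation marks. It does not check for SQL reserved words, which also need to be quoted.

```python
import re

# Names that match this pattern are emitted without quotation marks (assumption).
_PLAIN_NAME = re.compile(r"[a-z_][a-z0-9_]*")


def quote_identifier(name):
    """Wrap a name in double quotation marks if it contains special characters."""
    if _PLAIN_NAME.fullmatch(name):
        return name
    # Inside a quoted identifier, a double quotation mark is written twice.
    return '"' + name.replace('"', '""') + '"'


def qualified_name(catalog, schema, table):
    """Build catalog.schema.table with each part quoted only when required."""
    return ".".join(quote_identifier(part) for part in (catalog, schema, table))


if __name__ == "__main__":
    print("SELECT * FROM " + qualified_name("kudu", "default", "special.table!"))
```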

Schema emulation enabled

If you enable the schema emulation feature in the kudu.properties configuration file for the Kudu connector in the etc/catalog/ directory, Kudu tables are mapped to schemas based on the naming conventions.

  • If you configure kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix= (empty prefix), the mappings listed in the following table apply.

     Kudu table name | Presto table name
    -----------------+---------------------
     orders          | kudu.default.orders
     part1.part2     | kudu.part1.part2
     x.y.z           | kudu.x."y.z"

    Note

    Kudu does not support schemas. Presto creates a special table named $schemas to manage schemas.

  • If you configure kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix=presto::, the mappings listed in the following table apply.

     Kudu table name     | Presto table name
    ---------------------+----------------------------
     orders              | kudu.default.orders
     part1.part2         | kudu.default."part1.part2"
     x.y.z               | kudu.default."x.y.z"
     presto::part1.part2 | kudu.part1.part2
     presto::x.y.z       | kudu.x."y.z"

    Note

    Kudu does not support schemas. Presto creates a special table named presto::$schemas to manage schemas.
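The naming conventions in the two mapping tables can be summarized with a small Python sketch. The function map_kudu_table is hypothetical and only reproduces the rows shown in the tables. It is not the connector's implementation, and it does not handle the special $schemas table or a prefixed name that has no schema part.

```python
def map_kudu_table(raw_name, prefix=""):
    """Return (schema, table) for a Kudu table name under schema emulation.

    prefix is the value of kudu.schema-emulation.prefix ("" or e.g. "presto::").
    """
    if prefix:
        if not raw_name.startswith(prefix):
            # Names without the prefix stay in the default schema, dots included.
            return ("default", raw_name)
        raw_name = raw_name[len(prefix):]
        if "." not in raw_name:
            # This case is not covered by the mapping tables.
            raise ValueError("prefixed name has no schema part: " + raw_name)
    # The part before the first dot is the schema; the rest is the table name.
    schema, dot, table = raw_name.partition(".")
    if not dot:
        return ("default", raw_name)
    return (schema, table)


if __name__ == "__main__":
    for name in ("orders", "part1.part2", "presto::x.y.z"):
        print(name, "->", map_kudu_table(name, prefix="presto::"))
```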

Data type mappings

The following table describes the mappings between Presto data types and Kudu data types.

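 Presto data type         | Kudu data type  | Remarks
--------------------------+-----------------+---------
 BOOLEAN                  | BOOL            | None.
 TINYINT                  | INT8            | None.
 SMALLINT                 | INT16           | None.
 INTEGER                  | INT32           | None.
 BIGINT                   | INT64           | None.
 REAL                     | FLOAT           | None.
 DOUBLE                   | DOUBLE          | None.
 VARCHAR                  | STRING          | When you execute the CREATE TABLE ... AS ... statement to create a Kudu table from an existing Presto table, the maximum length for VARCHAR is lost.
 VARBINARY                | BINARY          | None.
 TIMESTAMP                | UNIXTIME_MICROS | The precision of a Kudu column of this data type is reduced from µs to ms.
 DECIMAL                  | DECIMAL         | This data type is supported only by Kudu servers of 1.7.0 or a later version.
 DATE                     | N/A             | Kudu does not have a data type that matches this data type. When you execute the CREATE TABLE ... AS ... statement to create a Kudu table from an existing Presto table, the DATE type for a column is converted to the STRING type.
 CHAR                     | N/A             | Kudu does not have a data type that matches this data type.
 TIME                     | N/A             | Kudu does not have a data type that matches this data type.
 JSON                     | N/A             | Kudu does not have a data type that matches this data type.
 TIME WITH TIME ZONE      | N/A             | Kudu does not have a data type that matches this data type.
 TIMESTAMP WITH TIME ZONE | N/A             | Kudu does not have a data type that matches this data type.
 INTERVAL YEAR TO MONTH   | N/A             | Kudu does not have a data type that matches this data type.
 INTERVAL DAY TO SECOND   | N/A             | Kudu does not have a data type that matches this data type.
 ARRAY                    | N/A             | Kudu does not have a data type that matches this data type.
 MAP                      | N/A             | Kudu does not have a data type that matches this data type.
 IPADDRESS                | N/A             | Kudu does not have a data type that matches this data type.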

Supported Presto SQL statements

Note

The ALTER SCHEMA ... RENAME TO ... statement is not supported.

 Statement                             | Remarks
---------------------------------------+---------
 SELECT                                | None.
 INSERT INTO ... VALUES                | None.
 INSERT INTO ... SELECT ...            | None.
 DELETE                                | None.
 DROP SCHEMA                           | This statement is supported only if schema emulation is enabled.
 CREATE SCHEMA                         | This statement is supported only if schema emulation is enabled.
 CREATE TABLE                          | For information about how to create a table, see Create a table.
 CREATE TABLE ... AS                   | None.
 DROP TABLE                            | None.
 ALTER TABLE ... RENAME TO ...         | None.
 ALTER TABLE ... ADD COLUMN ...        | For information about how to add a column, see Add a column.
 ALTER TABLE ... RENAME COLUMN ...     | This statement is supported only if the column that you want to rename is not a primary key column.
 ALTER TABLE ... DROP COLUMN ...       | This statement is supported only if the column that you want to drop is not a primary key column.
 SHOW SCHEMAS                          | None.
 SHOW TABLES                           | None.
 SHOW CREATE TABLE                     | None.
 SHOW COLUMNS FROM                     | None.
 DESCRIBE                              | This statement is equivalent to SHOW COLUMNS FROM.
 CALL kudu.system.add_range_partition  | This statement is used to add a range partition. For more information, see Range partitions.
 CALL kudu.system.drop_range_partition | This statement is used to drop a range partition. For more information, see Range partitions.

Create a table

When you create a table, you must specify columns, data types, and partition information. You can also specify the column encoding format or compression format based on your business requirements. Example:

CREATE TABLE user_events (
  user_id int WITH (primary_key = true),
  event_name varchar WITH (primary_key = true),
  message varchar,
  details varchar WITH (nullable = true, encoding = 'plain')
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 5,
  number_of_replicas = 3
);

In this example, user_id and event_name are primary key columns. The table is divided into five partitions based on the hash values in the user_id column. The value of number_of_replicas is 3.

Take note of the following items when you configure parameters for the CREATE TABLE statement:

  • Primary key columns must be specified before other columns, and only a primary key column can be configured as a partition key column.

  • The number_of_replicas parameter is optional. This parameter specifies the number of tablet replicas and must be set to an odd number. If you do not configure this parameter, the default replication factor from the Kudu master configuration is used.

  • Kudu supports hash partitions and range partitions. A hash partition distributes rows by hash value to one of many buckets. A range partition distributes rows by using an ordered range partition key. Range partitions must be explicitly created. Kudu also supports multi-level partitioning: a table must define at least one hash or range partitioning, and can combine at most one range partitioning with multiple hash partition groups.
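To get a feel for how multi-level partitioning multiplies, the following Python sketch computes the total number of partitions of a table. It assumes that every hash partition group and the range partitioning each contribute one independent factor, so two hash groups with 2 and 3 buckets yield 6 partitions. The function name total_partitions is hypothetical, and treating the number of range partitions as an additional factor is an assumption based on general Kudu multi-level partitioning behavior.

```python
from math import prod


def total_partitions(hash_bucket_counts, range_partition_count=1):
    """Multiply the partition counts of all partitioning levels.

    hash_bucket_counts: bucket count of each hash partition group.
    range_partition_count: number of range partitions; 1 if no range
    partitioning is defined (assumption: it then contributes no factor).
    """
    counts = list(hash_bucket_counts)
    if any(n < 1 for n in counts) or range_partition_count < 1:
        raise ValueError("partition counts must be positive")
    return prod(counts) * range_partition_count


if __name__ == "__main__":
    print(total_partitions([2, 3]))     # two hash partition groups
    print(total_partitions([2, 3], 2))  # plus two range partitions
```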

Column properties

In addition to column names and data types, you can also specify other column properties.

Column property

Data type

Description

primary_key

BOOLEAN

If this parameter is set to true, the column is used as a primary key column.

A Kudu primary key enforces the uniqueness constraint. If you insert a row that has the same primary key value as an existing row, the existing row is updated. For more information, see Primary Key Design.

nullable

BOOLEAN

If you set this property to true, the column can contain null values.

Important

A primary key column cannot contain null values.

encoding

VARCHAR

Specifies the column encoding format to save storage space and improve query performance.

If you do not configure this property, Kudu encodes data in the column based on the column data type. Valid values: auto, plain, bitshuffle, runlength, prefix, dictionary, and group_varint. For more information, see Column Encoding.

compression

VARCHAR

Specifies the column compression format.

If you do not configure this parameter, Kudu uses the default compression format. Valid values: default, no, lz4, snappy, and zlib. For more information, see Column compression.

Example:

CREATE TABLE mytable (
  name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
  index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
  comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
   ...
) WITH (...);
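The column properties described above can be checked before a statement is sent to the cluster. The following Python sketch renders one column definition and rejects values that are not in the lists of valid encoding and compression formats. The helper column_definition is hypothetical and is only a convenience for generating DDL text.

```python
VALID_ENCODINGS = {"auto", "plain", "bitshuffle", "runlength", "prefix", "dictionary", "group_varint"}
VALID_COMPRESSIONS = {"default", "no", "lz4", "snappy", "zlib"}


def column_definition(name, data_type, primary_key=False, nullable=False,
                      encoding=None, compression=None):
    """Render 'name type WITH (...)' for a CREATE TABLE or ADD COLUMN statement."""
    if primary_key and nullable:
        # A primary key column cannot contain null values.
        raise ValueError("a primary key column cannot be nullable")
    if encoding is not None and encoding not in VALID_ENCODINGS:
        raise ValueError("unsupported encoding: " + encoding)
    if compression is not None and compression not in VALID_COMPRESSIONS:
        raise ValueError("unsupported compression: " + compression)
    props = []
    if primary_key:
        props.append("primary_key = true")
    if nullable:
        props.append("nullable = true")
    if encoding is not None:
        props.append("encoding = '" + encoding + "'")
    if compression is not None:
        props.append("compression = '" + compression + "'")
    definition = name + " " + data_type
    if props:
        definition += " WITH (" + ", ".join(props) + ")"
    return definition


if __name__ == "__main__":
    print(column_definition("name", "varchar", primary_key=True,
                            encoding="dictionary", compression="snappy"))
```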

Partition design

A table must define at least one partitioning, either hash or range. A table can have at most one range partitioning but multiple hash partition groups.

  • Define hash partitions

    • Define one partition group

      You can use the table property partition_by_hash_columns to specify partition key columns and use the table property partition_by_hash_buckets to specify the number of partitions. The partition key columns must be a subset of primary key columns. Example:

      CREATE TABLE mytable (
        col1 varchar WITH (primary_key=true),
        col2 varchar WITH (primary_key=true),
        ...
      ) WITH (
        partition_by_hash_columns = ARRAY['col1', 'col2'],
        partition_by_hash_buckets = 4
      )
      Note

      In this example, col1 and col2 columns are defined as hash partition key columns, and data is distributed to four partitions.

    • Define two partition groups

      If you want to define two independent hash partition groups, in addition to the table properties specified in the preceding example, you must specify the table properties partition_by_second_hash_columns and partition_by_second_hash_buckets. Example:

      CREATE TABLE mytable (
        col1 varchar WITH (primary_key=true),
        col2 varchar WITH (primary_key=true),
        ...
      ) WITH (
        partition_by_hash_columns = ARRAY['col1'],
        partition_by_hash_buckets = 2,
        partition_by_second_hash_columns = ARRAY['col2'],
        partition_by_second_hash_buckets = 3
      )
      Note

      In this example, two hash partition groups are defined. In the first hash partition group, rows are distributed to two partitions based on the col1 column. In the second hash partition group, rows are distributed to three partitions based on the col2 column. In this case, the total number of partitions in the table is 6 (2 × 3).

  • Define range partitions

    A Kudu table can have at most one range partitioning, which you define by using the table property partition_by_range_columns. When you create a table, you can use the table property range_partitions to define the ranges of the partitions. To manage the range partitions of existing tables, call the kudu.system.add_range_partition and kudu.system.drop_range_partition procedures. Example:

    CREATE TABLE events (
      rack varchar WITH (primary_key=true),
      machine varchar WITH (primary_key=true),
      event_time timestamp WITH (primary_key=true),
      ...
    ) WITH (
      partition_by_hash_columns = ARRAY['rack'],
      partition_by_hash_buckets = 2,
      partition_by_second_hash_columns = ARRAY['machine'],
      partition_by_second_hash_buckets = 3,
      partition_by_range_columns = ARRAY['event_time'],
      range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"},
                           {"lower": "2018-01-01T00:00:00", "upper": null}]'
    )
    Note

    In this example, two hash partition groups and a range partitioning are defined. The table is range partitioned on the event_time column, and two range partitions are created that split the data at 2018-01-01T00:00:00.

  • Manage range partitions

    You can use a stored procedure to add a range partition to or drop a range partition from an existing table.

    Examples:

    • Add a range partition

      CALL kudu.system.add_range_partition(<YOUR_SCHEMA_NAME>, <YOUR_TABLE_NAME>, <range_partition_as_json_string>)
    • Drop a range partition

      CALL kudu.system.drop_range_partition(<YOUR_SCHEMA_NAME>, <YOUR_TABLE_NAME>, <range_partition_as_json_string>)

    Parameter

    Description

    <YOUR_SCHEMA_NAME>

    The schema to which the table belongs.

    <YOUR_TABLE_NAME>

    The name of the table.

    <range_partition_as_json_string>

    The upper and lower bounds of the range partition. You must configure this parameter in the '{"lower": <value>, "upper": <value>}' JSON format. If the partition has multiple columns, you must configure this parameter in the '{"lower": [<value_col1>,...], "upper": [<value_col1>,...]}' format. The specific value formats of the upper and lower bounds depend on the data types of columns. Mappings between data types and JSON string formats:

    • BIGINT: '{"lower": 0, "upper": 1000000}'

    • SMALLINT: '{"lower": 10, "upper": null}'

    • VARCHAR: '{"lower": "A", "upper": "M"}'

    • TIMESTAMP: '{"lower": "2018-02-01T00:00:00.000", "upper": "2018-02-01T12:00:00.000"}'

    • BOOLEAN: '{"lower": false, "upper": true}'

    • VARBINARY: Base64-encoded strings

    Note

    If you set this parameter to null, the partition is unbounded.

    Example:

    CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')
    Note

    In this example, a range partition is added to the events table in the myschema schema. The lower bound of the partition is 2018-01-01, which is interpreted as 2018-01-01T00:00:00.000. The upper bound of the partition is 2018-06-01.

    You can execute the SHOW CREATE TABLE statement to query the existing range partition of the table. In the returned results, the table property range_partitions indicates the partition information of the table.
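Writing the range partition JSON string by hand is error-prone, so the following Python sketch builds it with the standard json module and wraps it in a CALL statement. The helper names range_partition_json and add_range_partition_call are hypothetical. The sketch assumes a single-column range partition key, encodes bytes values as Base64 for VARBINARY columns, and maps None to null for an unbounded side.

```python
import base64
import json


def range_partition_json(lower, upper):
    """Build the '{"lower": ..., "upper": ...}' string for one range partition."""
    def encode(value):
        # VARBINARY bounds are passed as Base64-encoded strings.
        if isinstance(value, (bytes, bytearray)):
            return base64.b64encode(bytes(value)).decode("ascii")
        return value  # None becomes null, which means unbounded

    return json.dumps({"lower": encode(lower), "upper": encode(upper)})


def add_range_partition_call(schema, table, lower, upper):
    """Render a CALL kudu.system.add_range_partition statement as text."""
    # Single quotation marks inside a SQL string literal are written twice.
    payload = range_partition_json(lower, upper).replace("'", "''")
    return "CALL kudu.system.add_range_partition('{}', '{}', '{}')".format(schema, table, payload)


if __name__ == "__main__":
    print(add_range_partition_call("myschema", "events", "2018-01-01", "2018-06-01"))
```

The same JSON string can be passed to kudu.system.drop_range_partition to drop the partition again.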

Add a column

You can execute the ALTER TABLE ... ADD COLUMN ... statement to add a column to an existing table. You can also specify column properties for the new column. For more information about the column properties, see Create a table.

ALTER TABLE mytable ADD COLUMN extraInfo varchar WITH (nullable = true, encoding = 'plain')