Use KMS to build MaxCompute UDFs to encrypt and decrypt data

Updated at: 2025-04-07 07:03

If you want to encrypt and decrypt user data in MaxCompute SQL, you can use user-defined functions (UDFs) together with Key Management Service (KMS) instances. This topic describes how to use KMS instances to encrypt and decrypt data in MaxCompute UDFs.

Overview

  1. In the encryption process, a KMS instance client is initialized and a data key is generated during UDF initialization, and the data key is used for envelope encryption during data processing. If you want to encrypt a large amount of data, we recommend that you use a secret to obtain a data key. This prevents excessive data keys from being generated.

  2. In the decryption process, a KMS instance client, a thread pool, and the cache are initialized during UDF initialization, and the data key ciphertext is decrypted during data processing. Then, data is decrypted by using the plaintext data key.

  3. The fields that store the encrypted data are of the string type. The data format is [Length of the data key ciphertext|Fixed length 4][Data key ciphertext][Base64-encoded IV|Fixed length 16][Base64-encoded ciphertext of the data in the field].
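The layout of a stored field can be made concrete with a small parser. The following is a minimal sketch and not part of the sample project: the class name StoredFieldLayout and its methods are hypothetical. It assumes, as in the sample code later in this topic, that the length prefix is four decimal digits and that the 12-byte IV is Base64-encoded to 16 characters and stored between the data key ciphertext and the data ciphertext.

```java
import java.util.Arrays;

final class StoredFieldLayout {
    static final int LENGTH_PREFIX = 4;     // four decimal digits, for example "0008"
    static final int IV_BASE64_LENGTH = 16; // a 12-byte IV encodes to 16 Base64 characters

    /** Builds a stored value from its three parts. */
    static String join(String dataKeyCiphertext, String ivBase64, String dataCiphertextBase64) {
        return String.format("%04d", dataKeyCiphertext.length())
                + dataKeyCiphertext + ivBase64 + dataCiphertextBase64;
    }

    /** Splits a stored value into {dataKeyCiphertext, ivBase64, dataCiphertextBase64}. */
    static String[] split(String stored) {
        int keyLength = Integer.parseInt(stored.substring(0, LENGTH_PREFIX));
        int keyEnd = LENGTH_PREFIX + keyLength;
        int ivEnd = keyEnd + IV_BASE64_LENGTH;
        if (stored.length() < ivEnd) {
            throw new IllegalArgumentException("value is too short to be an encrypted field");
        }
        return new String[] {
            stored.substring(LENGTH_PREFIX, keyEnd),
            stored.substring(keyEnd, ivEnd),
            stored.substring(ivEnd)
        };
    }

    public static void main(String[] args) {
        // Placeholder values, not real ciphertext.
        String stored = join("KEYBLOB=", "AAAAAAAAAAAAAAAA", "Y2lwaGVy");
        System.out.println(stored);
        System.out.println(Arrays.toString(split(stored)));
    }
}
```

Because the data key ciphertext travels with every value, the decryption side needs no lookup table to find the right key.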

Important

The concurrency of MaxCompute UDFs is high. We recommend that you implement envelope encryption: generate a data key from a KMS instance and use the data key to encrypt data locally, instead of calling KMS to encrypt each piece of data directly in UDFs. For more information about envelope encryption, see Use envelope encryption.
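The local half of envelope encryption can be sketched without any KMS call. The following example is an illustration under stated assumptions, not the sample implementation: the class EnvelopeSketch is hypothetical, and newLocalDataKey() stands in for the plaintext data key that the GenerateDataKey operation would return. The sketch encrypts with AES-GCM by using a fresh 12-byte IV for each value and stores the IV in front of the ciphertext.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;

final class EnvelopeSketch {
    private static final int IV_LENGTH = 12; // bytes
    private static final int TAG_BITS = 128; // 16-byte authentication tag
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Stand-in for the plaintext data key returned by KMS (assumption: AES-256). */
    static SecretKey newLocalDataKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(256);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns IV followed by ciphertext and authentication tag. */
    static byte[] encrypt(SecretKey dataKey, String plaintext) {
        try {
            byte[] iv = new byte[IV_LENGTH];
            RANDOM.nextBytes(iv); // never reuse an IV with the same key
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, dataKey, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            byte[] out = new byte[iv.length + ciphertext.length];
            System.arraycopy(iv, 0, out, 0, iv.length);
            System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
            return out;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reads the IV from the first 12 bytes and decrypts the rest. */
    static String decrypt(SecretKey dataKey, byte[] ivAndCiphertext) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, dataKey,
                    new GCMParameterSpec(TAG_BITS, ivAndCiphertext, 0, IV_LENGTH));
            byte[] plaintext = cipher.doFinal(
                    ivAndCiphertext, IV_LENGTH, ivAndCiphertext.length - IV_LENGTH);
            return new String(plaintext, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SecretKey dataKey = newLocalDataKey();
        byte[] sealed = encrypt(dataKey, "hello");
        System.out.println(decrypt(dataKey, sealed));
    }
}
```

In the real flow, only the encrypted copy of the data key is stored with the data, and KMS is called once per data key rather than once per row.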

Prerequisites

  1. A KMS instance of the software key management type or a KMS instance of the hardware key management type is purchased. For more information, see Purchase and enable a KMS instance.

  2. A customer master key (CMK) is created. Envelope encryption supports only Galois/Counter Mode (GCM). Therefore, you must select Symmetric Key for the Key Type parameter when you create a CMK. For more information about key specifications and encryption modes, see Key types and specifications.

  3. An access point is created, a client key is saved, and the certification authority (CA) certificate of a KMS instance is obtained. For more information, see Create an AAP and Obtain the instance CA certificate.

Sample code

Note

If you require high performance, we recommend that you use SynchronousKeyEncryptUDF in Java.

Sample code in Java

1. Install dependencies

<dependencies>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-sdk-udf</artifactId>
            <version>0.48.6-public</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>alibabacloud-kms-java-sdk</artifactId>
            <version>1.2.5</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun</groupId>
                    <artifactId>tea</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.2.3</version>
        </dependency>
</dependencies>

2. Define the UDFUtils.java class

package com.aliyun.kms.sample;

import com.aliyun.tea.utils.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class UDFUtils {

    private UDFUtils() {
        // do nothing
    }

    public static boolean isNull(String value) {
        return StringUtils.isEmpty(value) || "\\N".equals(value);
    }

    public static String getSynchronousKeyVersion(String udfIdentify, String keyIdentify) {
        UUID uuid = null;
        byte[] bytes = null;
        if (StringUtils.isEmpty(keyIdentify)) {
            bytes = udfIdentify.getBytes(StandardCharsets.UTF_8);
            uuid = UUID.nameUUIDFromBytes(bytes);
            return uuid.toString();
        }
        bytes = (udfIdentify + "/" + keyIdentify).getBytes(StandardCharsets.UTF_8);
        uuid = UUID.nameUUIDFromBytes(bytes);
        return uuid.toString();
    }

    public static <K, V> Map<K, V> getLRUMap(int capacity) {
        return new LRUMap<K, V>(capacity);
    }

    private static class LRUMap<K, V> extends LinkedHashMap<K, V> {
        private final int capacity;
        private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        private final Lock readLock = readWriteLock.readLock();
        private final Lock writeLock = readWriteLock.writeLock();

        public LRUMap(int capacity) {
            super(capacity, 0.75f, true);
            this.capacity = capacity;
        }

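        @Override
        public V get(Object key) {
            // With access order enabled, get() reorders entries, so it needs the write lock.
            writeLock.lock();
            try {
                return super.get(key);
            } finally {
                writeLock.unlock();
            }
        }

        @Override
        public boolean containsKey(Object key) {
            // containsKey() does not reorder entries, so the read lock is sufficient.
            readLock.lock();
            try {
                return super.containsKey(key);
            } finally {
                readLock.unlock();
            }
        }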

        @Override
        public V put(K key, V value) {
            writeLock.lock();
            try {
                return super.put(key, value);
            } finally {
                writeLock.unlock();
            }
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > capacity;
        }
    }

}

Method description

Method | Description
isNull | Checks whether a string is null, an empty string, or the \N placeholder.
getSynchronousKeyVersion | Obtains the version number by combining the UDF ID and the Version input parameter of the UDF.
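The version number is deterministic: the same UDF ID and Version input always produce the same value, which is what allows concurrent UDF instances to agree on a secret version. The following sketch reproduces the logic of getSynchronousKeyVersion with the JDK only. The class name KeyVersionSketch and the sample inputs are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class KeyVersionSketch {
    /** Derives a name-based (version 3) UUID from the UDF ID and an optional key identifier. */
    static String keyVersion(String udfIdentify, String keyIdentify) {
        String name = (keyIdentify == null || keyIdentify.isEmpty())
                ? udfIdentify
                : udfIdentify + "/" + keyIdentify;
        return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        // The same inputs print the same UUID on every run and on every worker.
        System.out.println(keyVersion("my_project", "2024"));
        System.out.println(keyVersion("my_project", "2024"));
        // A different key identifier yields a different version and therefore a different data key.
        System.out.println(keyVersion("my_project", "2025"));
    }
}
```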

3. Define the UDFConstant.java class

Note

The UDFConstant.java class defines the parameter names in configuration files. For more information about parameter values, see Configure variables.

package com.aliyun.kms.sample;

public class UDFConstant {

    private UDFConstant() {
        // do nothing
    }
    public static final String KMS_UDF_CONF_NAME = "kms_udf_conf.properties";
    public static final String UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId";
    public static final String KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file";
    public static final String KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password";
    public static final String KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file";
    public static final String KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint";
    public static final String KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name";
    public static final String KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name";
    public static final String KMS_ENDPOINT_KEY = "udf.kms.endpoint";
    public static final String ENCRYPT_DATA_KEY_FORMAT = "%04d";
    public static final int GCM_IV_LENGTH = 12;
    public static final int GCM_IV_BASE64_LENGTH = 16;
    public static final int GCM_TAG_LENGTH = 16;
}

Parameter description

Parameter | Description
KMS_UDF_CONF_NAME | The name of the configuration file.
UDF_ENCRYPT_KEY_ID_KEY | The ID of the key that is managed by KMS.
KMS_CLIENT_KEY_FILE_KEY | The name of the client key file.
KMS_CLIENT_KEY_PASSWORD_KEY | The password of the client key.
KMS_INSTANCE_CA_FILE_KEY | The name of the CA certificate file of the KMS instance.
KMS_INSTANCE_ENDPOINT_KEY | The endpoint of the KMS instance.
KMS_SYNCHRONOUS_SECRET_NAME_KEY | The name of the secret that is synchronized.
KMS_RAM_SECRET_NAME_KEY | The name of the Resource Access Management (RAM) secret.
KMS_ENDPOINT_KEY | The endpoint of the shared gateway.
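A missing parameter otherwise surfaces only when the UDF runs, so it can help to check the configuration file before you upload it. The following is a minimal sketch: the class ConfCheckSketch is hypothetical, and treating these five parameters as required is an assumption based on the properties that EncryptUDF reads in its setup method.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class ConfCheckSketch {
    // Assumption: the parameters that EncryptUDF reads are treated as required.
    static final String[] REQUIRED_KEYS = {
        "udf.kms.keyId",
        "udf.kms.clientkey.file",
        "udf.kms.clientkey.password",
        "udf.kms.instance.ca.file",
        "udf.kms.instance.endpoint"
    };

    /** Returns the required parameter names that are absent or blank in the given file content. */
    static List<String> missingKeys(String confText) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = properties.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder content: only two of the five parameters are set.
        String conf = "udf.kms.keyId=key-xxxxxxxxxxxx\n"
                + "udf.kms.clientkey.file=clientkey_xxxx.json\n";
        System.out.println("Missing parameters: " + missingKeys(conf));
    }
}
```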

4. Create an encryption UDF

Note
  • We recommend that you use the com.aliyun.kms.sample.SynchronousKeyEncryptUDF class as a high-performance encryption UDF for tables that contain a large amount of data.

  • If your tables contain a small amount of data and the UDF concurrency is low, you can use the com.aliyun.kms.sample.EncryptUDF class as an encryption UDF.

SynchronousKeyEncryptUDF example
EncryptUDF example

SynchronousKeyEncryptUDF is suitable for scenarios that involve a large amount of data and high concurrency. SynchronousKeyEncryptUDF uses the secret management capabilities of KMS to store data keys and their version information as secrets in KMS. This prevents multiple data keys from being generated for the same version, reduces the number of data keys that are generated, and improves performance. The following figure shows the flowchart for generating a data key.

[Figure: Flowchart for generating a data key]
  1. Create a secret named xxxxSynchronousSecret

    Create a generic secret in the KMS console or by calling KMS API operations. The secret name is xxxxSynchronousSecret, and the secret value is a random value. The generic secret is used to store your data key. You can specify the udf.synchronous.secret.name parameter in the UDF to specify the secret. For more information about how to create a secret, see Manage and use generic secrets.

  2. Create a RAM secret named xxxxRamSecret

    Create a RAM secret in the KMS console or by calling KMS API operations. The value of the RAM secret is the AccessKey pair of the KMS account that is used to call the shared gateway. The RAM secret is retrieved for identity authentication when the UDF calls API operations. You can specify the udf.ram.secret.name parameter in the UDF to specify the secret. For more information about how to create a RAM secret and create an AccessKey pair for a KMS account, see Manage and use RAM secrets and Create an AccessKey pair.

    Example of the secret value:

    {
    "AccessKeyId":"LTAI****************",
    "AccessKeySecret":"yourAccessKeySecret"
    }
  3. Define the SynchronousKeyEncryptUDF.java class

    package com.aliyun.kms.sample;
    
    import com.aliyun.kms.kms20160120.TransferClient;
    import com.aliyun.kms.kms20160120.model.KmsConfig;
    import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
    import com.aliyun.kms20160120.Client;
    import com.aliyun.kms20160120.models.*;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.UDF;
    import com.aliyun.odps.udf.UDFException;
    import com.aliyun.tea.TeaException;
    import com.aliyun.tea.utils.StringUtils;
    import com.google.gson.Gson;
    import org.apache.commons.codec.binary.Base64;
    
    import javax.crypto.Cipher;
    import javax.crypto.spec.GCMParameterSpec;
    import javax.crypto.spec.SecretKeySpec;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.Serializable;
    import java.nio.charset.StandardCharsets;
    import java.security.SecureRandom;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class SynchronousKeyEncryptUDF extends UDF {
    
        private static Gson gson = new Gson();
        private static Client instanceClient;
        private static com.aliyun.kms.kms20160120.Client shareClient;
    
        private static Map<String, DataKeyObject> dataKeyObjectMap;
        private static Base64 base64 = new Base64();
    
        private String runTaskIdentify;
        private String keySecretName;
        private String keyId;
        private Properties properties = new Properties();
    
        @Override
        public void setup(ExecutionContext ctx) throws UDFException, IOException {
            super.setup(ctx);
            try {
                InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
                properties.load(in);
                // The dimension for generating a data key. If a data key is generated by project, you must use the getRunningProject method. If a data key is generated by table, you must use the getTableInfo method. 
                runTaskIdentify = ctx.getRunningProject();
                dataKeyObjectMap = new ConcurrentHashMap<>();
                keySecretName = properties.getProperty(UDFConstant.KMS_SYNCHRONOUS_SECRET_NAME_KEY);
                keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
    
                if (StringUtils.isEmpty(keySecretName)) {
                    throw new UDFException("the secret name is null");
                }
                buildInstanceClient(ctx);
                checkSecretExist();
                buildShareClient();
            } catch (Exception e) {
                throw new UDFException(e);
            }
        }
    
        public String evaluate(String keyIdentify, String data) {
            if (UDFUtils.isNull(data)) {
                return data;
            }
            byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
            byte[] iv = null;
            byte[] cipherTextBytes = null;
            try {
                String keyVersion = UDFUtils.getSynchronousKeyVersion(runTaskIdentify, keyIdentify);
                if (!dataKeyObjectMap.containsKey(keyVersion)) {
                    dataKeyObjectMap.put(keyVersion, getDataKeyObject(keyVersion));
                }
                DataKeyObject dataKeyObject = dataKeyObjectMap.get(keyVersion);
                iv = new byte[UDFConstant.GCM_IV_LENGTH];
                SecureRandom random = new SecureRandom();
                random.nextBytes(iv);
                Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
                SecretKeySpec keySpec = new SecretKeySpec(dataKeyObject.plaintextBytes, "AES");
                GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
                cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
                cipherTextBytes = cipher.doFinal(dataBytes);
                return dataKeyObject.encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
    
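        private DataKeyObject getDataKeyObject(String keyVersion) throws Exception {
            try {
                return getDataKeyObjectByGetSecretValue(keyVersion);
            } catch (TeaException e) {
                // The secret version does not exist yet: generate a data key and store it.
                if ("Forbidden.ResourceNotFound".equals(e.code)) {
                    return buildDataKeyObject(keyVersion);
                }
                // Rethrow other errors instead of returning null, which ConcurrentHashMap cannot store.
                throw e;
            }
        }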
    
        private DataKeyObject getDataKeyObjectByGetSecretValue(String keyVersion) throws Exception {
            String secretData = getSecretValue(keyVersion);
            DataKeyPersistentObject dataKeyPersistentObject = gson.fromJson(secretData, DataKeyPersistentObject.class);
            byte[] plaintextBytes = base64.decode(dataKeyPersistentObject.plaintext);
            return new DataKeyObject(plaintextBytes, dataKeyPersistentObject.encryptedDataKeyPart);
        }
    
        private DataKeyObject buildDataKeyObject(String keyVersion) throws Exception {
            GenerateDataKeyRequest request = new GenerateDataKeyRequest();
            request.setKeyId(keyId);
            KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
            GenerateDataKeyResponse response = instanceClient.generateDataKeyWithOptions(request, runtimeOptions);
            String plaintext = response.getBody().plaintext;
            byte[] plaintextBytes = base64.decode(plaintext);
            String encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
    
            DataKeyObject dataKeyObject = new DataKeyObject(plaintextBytes, encryptedDataKeyPart);
            DataKeyPersistentObject dataKeyPersistentObject = new DataKeyPersistentObject(plaintext, encryptedDataKeyPart);
            PutSecretValueRequest putSecretValueRequest = new PutSecretValueRequest();
            putSecretValueRequest.secretData = dataKeyPersistentObject.toJsonString();
            putSecretValueRequest.secretName = keySecretName;
            putSecretValueRequest.versionId = keyVersion;
    
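            try {
                shareClient.putSecretValue(putSecretValueRequest);
            } catch (TeaException e) {
                // Another UDF instance already stored a data key for this version: use that key.
                if ("Rejected.ResourceExist".equals(e.code)) {
                    return getDataKeyObjectByGetSecretValue(keyVersion);
                }
                // Rethrow other errors so that a data key that was not stored is never used.
                throw e;
            }
            return dataKeyObject;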
        }
    
        @Override
        public void close() throws UDFException, IOException {
            super.close();
        }
    
        private void buildInstanceClient(ExecutionContext ctx) throws Exception {
            try {
                KmsConfig config = new KmsConfig();
                String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
                byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
                config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
                String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
                byte[] caFileContent = ctx.readResourceFile(caFileName);
                config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
                config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
                config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
                instanceClient = new TransferClient(config);
            } catch (Exception e) {
                throw new UDFException(e);
            }
        }
    
        private void checkSecretExist() throws Exception {
            getSecretValue(null);
        }
    
        private String getSecretValue(String versionId) throws Exception {
            GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
            getSecretValueRequest.setSecretName(keySecretName);
            if (!StringUtils.isEmpty(versionId)) {
                getSecretValueRequest.setVersionId(versionId);
            }
            KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
            GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
            return getSecretValueResponse.body.secretData;
        }
    
        private void buildShareClient() throws Exception {
            String ramSecretName = properties.getProperty(UDFConstant.KMS_RAM_SECRET_NAME_KEY);
            GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
            getSecretValueRequest.setSecretName(ramSecretName);
            KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
            GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
            AccessKeyObject accessKeyObject = gson.fromJson(getSecretValueResponse.body.secretData, AccessKeyObject.class);
            com.aliyun.teaopenapi.models.Config shareConfig = new com.aliyun.teaopenapi.models.Config()
                    .setAccessKeyId(accessKeyObject.AccessKeyId)
                    .setAccessKeySecret(accessKeyObject.AccessKeySecret)
                    .setEndpoint(properties.getProperty(UDFConstant.KMS_ENDPOINT_KEY));
            shareClient = new com.aliyun.kms.kms20160120.Client(shareConfig);
        }
    
    
        private static class AccessKeyObject implements Serializable {
            private String AccessKeyId;
            private String AccessKeySecret;
        }
    
        private final static class DataKeyObject {
            private byte[] plaintextBytes;
            private String encryptedDataKeyPart;
    
            public DataKeyObject() {
            }
    
            public DataKeyObject(byte[] plaintextBytes, String encryptedDataKeyPart) {
                this.plaintextBytes = plaintextBytes;
                this.encryptedDataKeyPart = encryptedDataKeyPart;
            }
        }
    
        private final static class DataKeyPersistentObject implements Serializable {
            private String plaintext;
            private String encryptedDataKeyPart;
    
            public DataKeyPersistentObject() {
            }
    
            public DataKeyPersistentObject(String plaintext, String encryptedDataKeyPart) {
                this.plaintext = plaintext;
                this.encryptedDataKeyPart = encryptedDataKeyPart;
            }
    
            public String toJsonString() {
                return gson.toJson(this);
            }
        }
    
    }
    

    Main method description

    Method | Description
    setup | Reads the configuration file, initializes KMS Instance SDK and Alibaba Cloud SDK, and checks whether the initial secret exists.
    buildInstanceClient | Initializes KMS Instance SDK.
    buildShareClient | Obtains the AccessKey pair that is stored in the RAM secret and initializes Alibaba Cloud SDK.
    getDataKeyObjectByGetSecretValue | Obtains the key that is stored in the secret.
    buildDataKeyObject | Generates a new key and stores the key in the secret.
    evaluate | Encrypts data and returns the encrypted data.

    Note

    The following API operations of KMS are used.

    • KMS Instance API operations: The GenerateDataKey operation is called to generate a data key and the GetSecretValue operation to retrieve a secret value.

    • KMS API operations: The PutSecretValue operation is called to store a secret value.

    For more information about API operations of KMS and how to call the API operations, see API references and SDK references.
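The way that getDataKeyObject and buildDataKeyObject converge on one data key per version can be sketched with an in-memory map in place of KMS. This is an illustration only: the class SharedKeySketch is hypothetical, the map stands in for the versions of the generic secret, and the string values stand in for data keys. The comments name the KMS operation that each step corresponds to in SynchronousKeyEncryptUDF.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedKeySketch {
    // Stand-in for the versions of the generic secret, keyed by version ID.
    private final Map<String, String> secretVersions = new ConcurrentHashMap<>();
    private final AtomicInteger generated = new AtomicInteger();

    /** Stand-in for GenerateDataKey: returns a new placeholder key on every call. */
    private String generateDataKey() {
        return "data-key-" + generated.incrementAndGet();
    }

    /** Returns the data key for a version, creating and storing it if no one has yet. */
    String getOrCreate(String keyVersion) {
        String existing = secretVersions.get(keyVersion); // GetSecretValue
        if (existing != null) {
            return existing;
        }
        String candidate = generateDataKey(); // GenerateDataKey
        // PutSecretValue: only the first writer for a version succeeds.
        String winner = secretVersions.putIfAbsent(keyVersion, candidate);
        // A writer that loses the race discards its key and reads back the stored one.
        return winner != null ? winner : candidate;
    }

    int generatedCount() {
        return generated.get();
    }

    public static void main(String[] args) {
        SharedKeySketch store = new SharedKeySketch();
        System.out.println(store.getOrCreate("v1"));
        System.out.println(store.getOrCreate("v1")); // same key as the first call
        System.out.println(store.generatedCount());
    }
}
```

Under concurrency, a few extra keys may be generated and discarded, but every caller ends up encrypting with the key that was stored first.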

Define the EncryptUDF.java class

package com.aliyun.kms.sample;

import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.GenerateDataKeyRequest;
import com.aliyun.kms20160120.models.GenerateDataKeyResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;

import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Properties;

public class EncryptUDF extends UDF {

    private static Client client;
    private static String plaintext;
    private static byte[] plaintextBytes;
    private static String encryptedDataKeyPart;

    private static Base64 base64 = new Base64();

    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            Properties properties = new Properties();
            properties.load(in);
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            String keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
            client = new TransferClient(config);
            GenerateDataKeyRequest request = new GenerateDataKeyRequest();
            request.setKeyId(keyId);
            KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
            GenerateDataKeyResponse response = client.generateDataKeyWithOptions(request, runtimeOptions);
            plaintext = response.getBody().plaintext;
            plaintextBytes = base64.decode(plaintext);
            encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    public String evaluate(String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
        byte[] iv = null;
        byte[] cipherTextBytes = null;
        try {
            iv = new byte[UDFConstant.GCM_IV_LENGTH];
            SecureRandom random = new SecureRandom();
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(plaintextBytes, "AES");
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
            cipherTextBytes = cipher.doFinal(dataBytes);
            return encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }

}

Main method description

Method | Description
setup | Reads the configuration file, initializes KMS Instance SDK, and generates the data key ciphertext.
evaluate | Encrypts data and returns the encrypted data.

Note

The following API operations of KMS are used.

  • KMS Instance API: The GenerateDataKey operation is called to generate a data key.

For more information about API operations of KMS and how to call the API operations, see API references and SDK references.

5. Create a decryption UDF

Define the DecryptUDF.java class

package com.aliyun.kms.sample;

import com.aliyun.dkms.gcs.openapi.models.Config;
import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.DecryptRequest;
import com.aliyun.kms20160120.models.DecryptResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;

import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

public class DecryptUDF extends UDF {

    private static Client client;
    private static Map<String, byte[]> dataKeyMap;
    private static final int BASE_STEP=4;

    private static Base64 base64 = new Base64();

    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            Properties properties = new Properties();
            properties.load(in);
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent,StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            dataKeyMap = UDFUtils.getLRUMap(1000);
            client = new TransferClient(config);
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }

    public String evaluate(String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        if (data.length() <= BASE_STEP) {
            return data;
        }
        int dataLength = Integer.valueOf(data.substring(0, 4));
        if (data.length() <= BASE_STEP + dataLength) {
            return data;
        }
        String dataKeyCiphertext = data.substring(4, 4 + dataLength);
        try {

            byte[] dataKeyBytes = getDataKeyPlaintext(dataKeyCiphertext);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(dataKeyBytes, "AES");
            byte[] iv = base64.decode(data.substring(4 + dataLength, 4 + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH));
            byte[] cipherText = base64.decode(data.substring(4 + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH));
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.DECRYPT_MODE, keySpec, gcmParameterSpec);
            byte[] decryptedData = cipher.doFinal(cipherText);
            return new String(decryptedData, StandardCharsets.UTF_8);

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    private byte[] getDataKeyPlaintext(String dataKeyCiphertext) throws Exception {
        byte[] cached = dataKeyMap.get(dataKeyCiphertext);
        if (cached != null) {
            return cached;
        }
        synchronized (dataKeyCiphertext.intern()) {
            // Check again inside the lock so that only one thread calls Decrypt for a data key.
            cached = dataKeyMap.get(dataKeyCiphertext);
            if (cached == null) {
                DecryptRequest request = new DecryptRequest();
                request.ciphertextBlob = dataKeyCiphertext;
                KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
                DecryptResponse response = client.decryptWithOptions(request, runtimeOptions);
                cached = base64.decode(response.getBody().plaintext);
                dataKeyMap.put(dataKeyCiphertext, cached);
            }
            // Return the local reference so that a concurrent eviction cannot yield null.
            return cached;
        }
    }


    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }
}

Main method description

Method | Description
setup | Reads the configuration file and initializes KMS Instance SDK.
getDataKeyPlaintext | Obtains the plaintext data key.
evaluate | Uses the plaintext data key to decrypt data and returns the decrypted data.

Note

The following API operations of KMS are used.

  • KMS Instance API: The Decrypt operation is called to obtain the plaintext data key.

For more information about API operations of KMS and how to call the API operations, see API references and SDK references.
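DecryptUDF keeps up to 1,000 plaintext data keys in the LRU map that UDFUtils.getLRUMap returns, so that the Decrypt operation is called only for data keys that are not cached. The eviction behavior can be sketched as follows. The class LruSketch is hypothetical and, unlike the LRUMap class in UDFUtils, it has no locking and is not thread-safe.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LruSketch<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    private final int capacity;

    LruSketch(int capacity) {
        // accessOrder = true: iteration order runs from least to most recently used.
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each put: drop the least recently used entry once over capacity.
        return size() > capacity;
    }

    public static void main(String[] args) {
        LruSketch<String, Integer> cache = new LruSketch<>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");    // "a" becomes the most recently used entry
        cache.put("c", 3); // evicts "b", the least recently used entry
        System.out.println(cache.keySet());
    }
}
```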

6. Create an SQL UDF

Important
  • Before you create an SQL UDF, you must add the JAR package of the UDF implementation class to your MaxCompute project.

  • In this example, the encrypt_udf, sync_key_encrypt_udf, and decrypt_udf functions are created. If you do not need one of the UDFs, remove the ADD JAR and CREATE FUNCTION statements for that UDF.

The following statements are used to create a UDF:

# Add the JAR package of the UDF implementation class to the MaxCompute project.
ADD JAR encrypt_udf.jar;
ADD JAR sync_key_encrypt_udf.jar;
ADD JAR decrypt_udf.jar;
# Add the configuration file to the MaxCompute project.
ADD FILE kms_udf_conf.properties;
# Add the client key file of the KMS instance to the MaxCompute project.
ADD FILE clientkey_xxxx.json;
# Add the CA certificate file of the KMS instance to the MaxCompute project.
ADD FILE PrivateKmsCA_xxxx.pem;
# Create an encryption UDF named encrypt_udf.
CREATE FUNCTION encrypt_udf AS 'com.aliyun.kms.sample.EncryptUDF'
USING 'encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
# Create a high-performance encryption UDF named sync_key_encrypt_udf.
CREATE FUNCTION sync_key_encrypt_udf AS 'com.aliyun.kms.sample.SynchronousKeyEncryptUDF'
USING 'sync_key_encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
# Create a decryption UDF named decrypt_udf. 
CREATE FUNCTION decrypt_udf AS 'com.aliyun.kms.sample.DecryptUDF'
USING 'decrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';

Parameter description

Implementation class           JAR package               UDF                   Description
EncryptUDF.java                encrypt_udf.jar           encrypt_udf           Encryption UDF
SynchronousKeyEncryptUDF.java  sync_key_encrypt_udf.jar  sync_key_encrypt_udf  High-performance encryption UDF
DecryptUDF.java                decrypt_udf.jar           decrypt_udf           Decryption UDF

7. Configure variables

The variables required by the UDF implementation class are stored in the kms_udf_conf.properties configuration file. Secret-related configurations are added to the configuration file for the high-performance UDF.

Configuration file for the high-performance UDF

# Configure variables.
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
# Specify the name of the secret that is synchronized.
udf.synchronous.secret.name=xxxxSynchronousSecretName
# Specify the name of the RAM secret.
udf.ram.secret.name=xxxxRamSecret
# Specify the endpoint of the shared gateway.
udf.kms.endpoint=kms-vpc.cn-xxx.aliyuncs.com

Note

The secret-related configurations, such as the name of the secret that is synchronized, the name of the RAM secret, and the endpoint of the shared gateway, are added.

Regular configuration file

# Configure variables.
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com

8. Use UDFs

-- Use a UDF to encrypt data.
SELECT encrypt_udf(column_name) FROM my_table;
-- Use the high-performance UDF to encrypt data. The version parameter specifies the secret version. Default value: null.
SELECT sync_key_encrypt_udf(version,column_name) FROM my_table;
-- Use a UDF to decrypt data.
SELECT decrypt_udf(column_name) FROM my_table;

1. Install the dependencies listed in requirements.txt

pyodps==0.11.6.2
alibabacloud-kms-python-sdk==1.1.3

2. Define the kms_udf_utils.py file

# coding=utf-8
import uuid
from collections import OrderedDict


def is_null(value):
    # A SQL NULL reaches the UDF as None; '\\N' is the literal string \N used as a NULL marker.
    return value is None or len(value) == 0 or value == '\\N'


def generate_uuid(key_version):
    return uuid.uuid5(uuid.NAMESPACE_DNS, key_version)


def get_config_value(config_dict, dict_key):
    if dict_key not in config_dict:
        raise ValueError("can not find key[{}]".format(dict_key))
    return config_dict[dict_key]


def properties_to_dict(cache_file):
    config_dict = {}
    for line in cache_file:
        line = line.strip()
        # Skip blank lines and comment lines such as "# Specify the key ID."
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only so that values may contain "=".
        k, v = line.split("=", 1)
        config_dict[k.strip()] = v.strip()
    return config_dict


def object_to_dict(obj):
    return obj.__dict__


class LRUCache(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        # Returns -1 on a miss; call contains_key first if -1 can be a valid value.
        if key not in self.cache:
            return -1
        value = self.cache.pop(key)  # Remove the item to update the order
        self.cache[key] = value  # Re-insert it to the end
        return value

    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)  # Remove the existing entry to update the order
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Remove the least recently used item
        self.cache[key] = value

    def contains_key(self, key):
        return key in self.cache
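
The cache evicts the least recently used entry once it reaches capacity, and a successful get counts as a use. The following is a minimal sketch of that behavior. It assumes the LRUCache class above, reproduced here only so that the snippet runs on its own; the key names and byte values are made up for illustration.

```python
from collections import OrderedDict


class LRUCache(object):
    """Same logic as the LRUCache in kms_udf_utils.py, copied to keep this snippet self-contained."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        value = self.cache.pop(key)
        self.cache[key] = value  # Re-insert at the end to mark the entry as recently used
        return value

    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Drop the least recently used entry
        self.cache[key] = value

    def contains_key(self, key):
        return key in self.cache


cache = LRUCache(2)
cache.put("key-a", b"A")
cache.put("key-b", b"B")
cache.get("key-a")        # Touch key-a, so key-b becomes the least recently used entry
cache.put("key-c", b"C")  # The cache is full, so key-b is evicted
remaining = list(cache.cache.keys())
```

Because a miss returns -1 rather than raising an error, callers are better off checking contains_key before get, which is what the decryption UDF does.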

3. Define the kms_udf_constant.py file

# coding=utf-8
UDF_CONFIG_CACHE_FILE = "kms_udf_conf.properties"
KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file"
KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password"
KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint"
KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file"
KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name"
KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name"
KMS_REGION_KEY = "udf.kms.regionId"
UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId"
ENCRYPT_DATA_KEY_FORMAT = "%04d"
GCM_IV_LENGTH = 12
GCM_TAG_LENGTH = 16
DEFAULT_NUMBER_OF_BYTES = 32

Parameter description

Parameter                        Description
UDF_CONFIG_CACHE_FILE            The name of the configuration file.
KMS_CLIENT_KEY_FILE_KEY          The name of the client key file.
KMS_CLIENT_KEY_PASSWORD_KEY      The password of the client key.
KMS_INSTANCE_ENDPOINT_KEY        The endpoint of the KMS instance.
KMS_INSTANCE_CA_FILE_KEY         The CA certificate of the KMS instance.
KMS_SYNCHRONOUS_SECRET_NAME_KEY  The name of the secret that is synchronized.
KMS_RAM_SECRET_NAME_KEY          The name of the RAM secret.
KMS_ENDPOINT_KEY                 The endpoint of the shared gateway.
UDF_ENCRYPT_KEY_ID_KEY           The ID of the key that is managed by KMS.
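
ENCRYPT_DATA_KEY_FORMAT is what produces the fixed 4-character length prefix in the stored field format [Data key length][Data key ciphertext][Ciphertext of the data in the field]. The sketch below shows how the prefix is written and read back. The helper names pack_envelope and unpack_envelope are hypothetical and do not appear in the sample code, and the strings are placeholders rather than real KMS output.

```python
ENCRYPT_DATA_KEY_FORMAT = "%04d"  # Same value as in kms_udf_constant.py


def pack_envelope(data_key_ciphertext, payload_b64):
    """Hypothetical helper: prefix the data key ciphertext with its zero-padded 4-digit length."""
    prefix = ENCRYPT_DATA_KEY_FORMAT % len(data_key_ciphertext)
    return prefix + data_key_ciphertext + payload_b64


def unpack_envelope(field):
    """Hypothetical helper: split a stored field into (data key ciphertext, payload)."""
    key_len = int(field[0:4])
    return field[4:4 + key_len], field[4 + key_len:]


# Placeholder strings that only mimic the layout.
field = pack_envelope("ZmFrZS1kYXRhLWtleQ==", "cGF5bG9hZA==")
data_key_ciphertext, payload = unpack_envelope(field)
```

With a 4-digit prefix, this layout can describe a data key ciphertext of at most 9,999 characters; a longer value would produce a 5-digit prefix and break parsing.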

4. Create an encryption UDF

Define the encrypt_udf.py class

# coding=utf-8
import base64
import os

from alibabacloud_kms20160120.models import GenerateDataKeyRequest
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file

import kms_udf_constant
import kms_udf_utils


class EncryptUDF(object):

    def __init__(self):
        # Read the configuration file that is attached to the UDF as a resource.
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        # Generate one data key during initialization and reuse it for every row.
        resp = self.generate_data_key()
        # Prefix: 4-digit length of the data key ciphertext, followed by the data key ciphertext.
        self.encrypted_data_key_part = (kms_udf_constant.ENCRYPT_DATA_KEY_FORMAT %
                                        len(resp.body.ciphertext_blob) + resp.body.ciphertext_blob)
        self.plaintext = resp.body.plaintext
        self.ciphertext_blob = resp.body.ciphertext_blob

    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        # Use a fresh random IV from the operating system for every value.
        # GCM requires that an IV is never reused with the same data key.
        iv = os.urandom(kms_udf_constant.GCM_IV_LENGTH)
        data_bytes = data.encode("utf-8")
        encryptor = Cipher(
            algorithms.AES(base64.b64decode(self.plaintext)),
            modes.GCM(iv),
        ).encryptor()
        ciphertext = encryptor.update(data_bytes) + encryptor.finalize()
        # Output: data key prefix + Base64(IV) + Base64(ciphertext + GCM tag).
        return self.encrypted_data_key_part + str(base64.b64encode(iv), "utf-8") + str(
            base64.b64encode(ciphertext + encryptor.tag), "utf-8")

    def generate_data_key(self):
        request = GenerateDataKeyRequest()
        request.key_id = self.keyId
        request.number_of_bytes = kms_udf_constant.DEFAULT_NUMBER_OF_BYTES
        runtime = KmsRuntimeOptions(
            ca=self.ca_file_path
        )
        return self.instance_client.generate_data_key_with_options(request, runtime)

Note

The following API operations of KMS are used.

  • KMS Instance API: The GenerateDataKey operation is called to generate a data key.

For more information about API operations of KMS and how to call the API operations, see API references and SDK references.
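
In evaluate, the IV and the ciphertext plus tag are Base64-encoded separately and then concatenated, while the decryption UDF decodes both parts in a single call. This works because a 12-byte IV encodes to exactly 16 Base64 characters with no padding. The sketch below checks that property by using only the standard library. The ciphertext and tag are random placeholder bytes, not real AES-GCM output, and os.urandom is shown as one way to obtain an unpredictable IV.

```python
import base64
import os

GCM_IV_LENGTH = 12   # Same values as in kms_udf_constant.py
GCM_TAG_LENGTH = 16

iv = os.urandom(GCM_IV_LENGTH)
fake_ciphertext = os.urandom(37)       # Placeholder for the AES-GCM ciphertext
fake_tag = os.urandom(GCM_TAG_LENGTH)  # Placeholder for the GCM tag

# Encode the two parts separately, as the encryption UDF does.
separately = (base64.b64encode(iv) + base64.b64encode(fake_ciphertext + fake_tag)).decode("utf-8")
# Encode everything in one call.
together = base64.b64encode(iv + fake_ciphertext + fake_tag).decode("utf-8")

# Decode in one call, as the decryption UDF does, and recover the IV.
recovered_iv = base64.b64decode(separately)[:GCM_IV_LENGTH]
```

The two encodings are identical only because the IV length is a multiple of 3. If the IV length ever changes to a value that is not, encoding the concatenated bytes in one call is the safer choice.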

5. Create a decryption UDF

Define the decrypt_udf.py class

# coding=utf-8
import base64

from alibabacloud_kms20160120 import models as kms_20160120_models
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file

import kms_udf_constant
import kms_udf_utils
from kms_udf_utils import LRUCache


class DecryptUDF(object):

    def __init__(self):
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        self.data_key_cache = LRUCache(1000)

    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        # Layout: [4-digit data key length][data key ciphertext][Base64(IV + ciphertext + tag)].
        # Values that are too short to follow this layout are returned unchanged.
        if len(data) < 4:
            return data
        data_key_len = int(data[0:4])
        if len(data) < 4 + data_key_len:
            return data
        data_key_ciphertext = data[4:4 + data_key_len]
        # Call KMS only on a cache miss; rows that share a data key reuse the cached plaintext key.
        if not self.data_key_cache.contains_key(data_key_ciphertext):
            decrypt_request = kms_20160120_models.DecryptRequest(
                ciphertext_blob=data_key_ciphertext
            )
            decrypt_runtime = KmsRuntimeOptions(
                ca=self.ca_file_path
            )
            decrypt_response = self.instance_client.decrypt_with_options(decrypt_request, decrypt_runtime)
            self.data_key_cache.put(data_key_ciphertext, base64.b64decode(decrypt_response.body.plaintext))
        data_key = self.data_key_cache.get(data_key_ciphertext)
        envelope_cipher_bytes = base64.b64decode(data[4 + data_key_len:])
        iv = envelope_cipher_bytes[0:kms_udf_constant.GCM_IV_LENGTH]
        cipher_bytes = envelope_cipher_bytes[
                   kms_udf_constant.GCM_IV_LENGTH:len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH]
        tag = envelope_cipher_bytes[len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH:]
        decryptor = Cipher(algorithms.AES(data_key), modes.GCM(iv, tag)).decryptor()
        # Decode the plaintext bytes as UTF-8. str(bytes) would return the "b'...'" representation.
        return (decryptor.update(cipher_bytes) + decryptor.finalize()).decode("utf-8")
Note

The following API operations of KMS are used.

  • KMS Instance API: The Decrypt operation is called to obtain the plaintext data key.

For more information about API operations of KMS and how to call the API operations, see API references and SDK references.
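
After the data key prefix is removed, the rest of the field decodes to the IV, the ciphertext, and the GCM tag, in that order. The sketch below isolates that slicing step and adds a length check. The helper name split_gcm_payload is hypothetical, and the sample bytes only mimic the layout; they are not real encrypted data.

```python
import base64

GCM_IV_LENGTH = 12   # Same values as in kms_udf_constant.py
GCM_TAG_LENGTH = 16


def split_gcm_payload(payload_b64):
    """Hypothetical helper: decode the payload and return (iv, ciphertext, tag)."""
    raw = base64.b64decode(payload_b64)
    if len(raw) < GCM_IV_LENGTH + GCM_TAG_LENGTH:
        raise ValueError("payload is too short to hold an IV and a tag")
    iv = raw[:GCM_IV_LENGTH]
    ciphertext = raw[GCM_IV_LENGTH:len(raw) - GCM_TAG_LENGTH]
    tag = raw[len(raw) - GCM_TAG_LENGTH:]
    return iv, ciphertext, tag


# Placeholder bytes: 12-byte "IV", 5-byte "ciphertext", 16-byte "tag".
sample = base64.b64encode(b"I" * 12 + b"hello" + b"T" * 16).decode("utf-8")
iv, ciphertext, tag = split_gcm_payload(sample)
```

Rejecting short payloads up front gives a clearer error than letting the cipher fail on an empty IV or a truncated tag.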

6. Create an SQL UDF

Important

In this example, the encrypt_udf and decrypt_udf functions are created. You can omit a UDF that you do not need. To omit a UDF, remove its ADD PY statement and its CREATE FUNCTION statement.

# Add the preceding Python files and the dependencies listed in requirements.txt to the MaxCompute project as resources.
ADD PY kms_udf_utils.py;
ADD PY kms_udf_constant.py;
ADD PY decrypt_udf.py;
ADD PY encrypt_udf.py;
# Add the configuration file to the MaxCompute project.
ADD FILE kms_udf_conf.properties;
# Add the client key file of the KMS instance to the MaxCompute project.
ADD FILE clientkey_xxxx.json;
# Add the CA certificate file of the KMS instance to the MaxCompute project.
ADD FILE PrivateKmsCA_xxxx.pem;
# Create an encryption UDF. For a Python UDF, the class is referenced as <script name>.<class name>.
CREATE FUNCTION encrypt_udf AS 'encrypt_udf.EncryptUDF'
USING 'encrypt_udf.py,kms_udf_utils.py,kms_udf_constant.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
# Create a decryption UDF.
CREATE FUNCTION decrypt_udf AS 'decrypt_udf.DecryptUDF'
USING 'decrypt_udf.py,kms_udf_utils.py,kms_udf_constant.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';

Parameter description

Dependency name  UDF          Description
encrypt_udf.py   encrypt_udf  Encryption UDF
decrypt_udf.py   decrypt_udf  Decryption UDF

7. Configure variables

The variables required by the UDF implementation class are stored in the kms_udf_conf.properties configuration file.

# Configure variables.
# Specify the key ID.
udf.kms.keyId=key-xxxxxxxxxxxx
# Specify the name of the client key file.
udf.kms.clientkey.file=clientkey_xxxx.json
# Specify the password of the client key.
udf.kms.clientkey.password=xxxxxxxx
# Specify the CA certificate of the KMS instance.
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Specify the endpoint of the KMS instance.
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
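
Because a missing key stops the UDF during initialization, it can help to check the configuration file before you upload it. The following is a minimal sketch of such a pre-flight check, run locally. The function names load_properties and missing_keys are hypothetical; the required keys are the five entries shown above, and the sample content deliberately leaves one out.

```python
import io

REQUIRED_KEYS = (
    "udf.kms.keyId",
    "udf.kms.clientkey.file",
    "udf.kms.clientkey.password",
    "udf.kms.instance.ca.file",
    "udf.kms.instance.endpoint",
)


def load_properties(lines):
    """Parse key=value lines, skipping blank lines and # comment lines."""
    config = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so a value such as a password may contain "=".
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def missing_keys(config):
    """Hypothetical pre-flight check: return the required keys that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not config.get(key)]


# Sample content with the CA certificate entry left out on purpose.
sample = io.StringIO(
    "# Specify the key ID.\n"
    "udf.kms.keyId=key-xxxxxxxxxxxx\n"
    "udf.kms.clientkey.file=clientkey_xxxx.json\n"
    "udf.kms.clientkey.password=pass=word\n"
    "udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com\n"
)
config = load_properties(sample)
missing = missing_keys(config)
```

To check a real file, pass an open file object instead of the StringIO sample, for example load_properties(open("kms_udf_conf.properties")).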

8. Use UDFs

-- Use a UDF to encrypt data.
SELECT encrypt_udf(column_name) FROM my_table;
-- Use a UDF to decrypt data.
SELECT decrypt_udf(column_name) FROM my_table;

References
