You can use Function Compute to rotate generic secrets to improve the security of sensitive information. This topic describes how to use Function Compute to rotate generic secrets.
Background information
Function Compute is a fully managed, event-driven computing service. Function Compute allows you to focus on coding without the need to obtain or manage infrastructure resources such as servers. You only need to write and upload your code. Function Compute allocates computing resources, runs tasks in an elastic and reliable manner, and provides features such as log query, performance monitoring, and alerting. For more information, see What is Function Compute?.
Billing
If you use Function Compute to rotate generic secrets, you are not charged for secret rotation that is provided by Key Management Service (KMS). You are charged for Function Compute and Serverless Workflow that is integrated into Function Compute. For more information about billing, see Billing of Function Compute and Billing of Serverless Workflow.
Secrets Manager provides dynamic secrets that are automatically rotated. If dynamic secrets meet your security requirements, we recommend that you use dynamic secrets to reduce costs. For more information, see Overview of Dynamic ApsaraDB RDS secrets, Overview of Dynamic RAM secrets, or Overview of Dynamic ECS secrets.
Secret rotation process
This topic describes how to rotate a generic secret in the following scenarios:
Rotate a generic secret of an ApsaraDB RDS database in dual-account mode by using ApsaraDB RDS API
Rotate generic secrets of a self-managed MySQL database in dual-account mode by using a privileged account
Rotate a generic secret of an ApsaraDB RDS database in dual-account mode by using ApsaraDB RDS API
You can use Serverless Workflow and Function Compute to rotate a generic secret.
1. Prepare for secret rotation
Activate Function Compute. For more information, see Activate Function Compute.
Create an account that is used to log on to your ApsaraDB RDS database. Then, create a generic secret for the account in the KMS console. If the generic secret is rotated for the first time, a new account is generated.
The secret value is in the JSON format. The secret name is used as the scheduling parameter that is passed to Serverless Workflow. Example:
{ "AccountName": "", "AccountPassword": "" }
Note: AccountName specifies the username of the account that is used to log on to your ApsaraDB RDS database, and AccountPassword specifies the password of that account.
Create a normal service role for Function Compute and grant permissions to allow Function Compute to access KMS. For more information, see Grant Function Compute permissions to access other Alibaba Cloud services.
Attach the AliyunFCDefaultRolePolicy policy to the normal service role of Function Compute.
Attach the AliyunSTSAssumeRoleAccess system policy to the normal service role.
Attach a custom policy that allows access to KMS to the normal service role. The following script shows the content of the policy:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kms:GetSecretValue",
        "kms:GetRandomPassword",
        "kms:PutSecretValue",
        "kms:UpdateSecretVersionStage"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "rds:GrantAccountPrivilege",
        "rds:DescribeAccounts",
        "rds:ResetAccountPassword",
        "rds:CreateAccount"
      ],
      "Resource": "*"
    }
  ]
}
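The generic secret created in this step stores the account as JSON, and a function can check that format before it uses the secret. The following is a minimal sketch that mirrors the required-field check in the sample rotation function; the helper name parse_rds_secret and the example values are hypothetical.

```python
import json

# Keys that the generic secret for an ApsaraDB RDS account is expected to contain.
REQUIRED_FIELDS = ("AccountName", "AccountPassword")


def parse_rds_secret(secret_data):
    """Parse a secret value and verify that the required keys exist.

    parse_rds_secret is a hypothetical helper name used for illustration only.
    """
    secret = json.loads(secret_data)
    for field in REQUIRED_FIELDS:
        if field not in secret:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret


# Placeholder values; never hard-code a real password.
example = parse_rds_secret('{"AccountName": "app_user", "AccountPassword": "example-only"}')
print(example["AccountName"])
```

Running the check early makes a malformed secret fail with a clear message instead of failing later inside an API call.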
2. Create secret rotation functions
You must create a Function Compute service and secret rotation functions in the Function Compute console. For more information, see Quickly create a function.
Create a Function Compute service.
When you create a Function Compute service, you must click Show Advanced Options and set Access to VPC to Yes. Then, configure VPC, vSwitch, and Security Group. Make sure that the Function Compute service can access ApsaraDB RDS API and KMS by using the specified security group and vSwitch in the specified VPC.
Create functions.
In this example, Python is used. When you create the functions, select Python 3.9 for Runtime Environments. The following code provides an example. You can modify the code based on your business requirements.
# -*- coding: utf-8 -*-
import json
import logging
import os

from aliyunsdkrds.request.v20140815.CreateAccountRequest import CreateAccountRequest
from aliyunsdkrds.request.v20140815.DescribeAccountsRequest import DescribeAccountsRequest
from aliyunsdkrds.request.v20140815.GrantAccountPrivilegeRequest import GrantAccountPrivilegeRequest
from aliyunsdkrds.request.v20140815.ResetAccountPasswordRequest import ResetAccountPasswordRequest
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    evt = json.loads(event)
    secret_name = evt['SecretName']
    region_id = evt['RegionId']
    step = evt['Step']
    instance_id = evt['InstanceId']
    # The first step has no VersionId; the request ID becomes the new version ID.
    version_id = evt.get('VersionId')
    if not version_id:
        version_id = context.requestId
    credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                     context.credentials.securityToken)
    client = AcsClient(region_id=region_id, credential=credentials)
    endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
    client.add_endpoint(region_id, 'kms', endpoint)
    resp = get_secret_value(client, secret_name)
    if "Generic" != resp['SecretType']:
        logger.error("Secret %s is not enabled for rotation" % secret_name)
        raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    if step == "new":
        new_phase(client, secret_name, version_id)
    elif step == "set":
        set_phase(client, instance_id, secret_name, version_id)
    elif step == "end":
        end_phase(client, secret_name, version_id)
    else:
        logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
    return {"VersionId": version_id}


def new_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    try:
        get_secret_dict(client, secret_name, "ACSPending", version_id)
        logger.info("new: Successfully retrieved secret for %s." % secret_name)
    except ServerException as e:
        if e.error_code != 'Forbidden.ResourceNotFound':
            raise ValueError("Unable to retrieve secret %s" % secret_name)
        # No pending version exists yet: create one for the alternate account.
        current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
        exclude_characters = os.environ[
            'EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else "\\\"\',./:;<>?[]{|}~`"
        passwd = get_random_password(client, exclude_characters)
        current_dict['AccountPassword'] = passwd['RandomPassword']
        put_secret_value(client, secret_name, version_id, json.dumps(current_dict), json.dumps(['ACSPending']))
        logger.info(
            "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))


def set_phase(client, instance_id, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    pending_resp = describe_accounts(client, instance_id, pending_dict["AccountName"])
    pending_accounts = pending_resp["Accounts"]["DBInstanceAccount"]
    if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
        logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
        raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
    current_resp = describe_accounts(client, instance_id, current_dict["AccountName"])
    current_accounts = current_resp["Accounts"]["DBInstanceAccount"]
    if len(current_accounts) == 0:
        logger.error("set: Unable to log into database using current credentials for secret %s" % secret_name)
        raise ValueError("Unable to log into database using current credentials for secret %s" % secret_name)
    if len(pending_accounts) == 0:
        create_rds_account(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"],
                           current_accounts[0]["AccountType"])
        pending_accounts = describe_accounts(client, instance_id, pending_dict["AccountName"])["Accounts"][
            "DBInstanceAccount"]
    else:
        # The alternate account already exists: reset its password.
        reset_account_password(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"])
    # Copy every privilege of the current account that the pending account lacks.
    current_privileges = current_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
    pending_privileges = pending_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
    if len(current_privileges) > 0:
        for current_privilege in current_privileges:
            is_contains = False
            for pending_privilege in pending_privileges:
                if current_privilege["DBName"] == pending_privilege["DBName"] and current_privilege[
                        "AccountPrivilege"] == pending_privilege["AccountPrivilege"]:
                    is_contains = True
                    break
            if not is_contains:
                grant_account_privilege(client, instance_id, pending_dict["AccountName"],
                                        current_privilege["DBName"], current_privilege["AccountPrivilege"])


def end_phase(client, secret_name, version_id):
    update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
    update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
    logger.info(
        "end: Successfully set ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))


def get_secret_dict(client, secret_name, stage, version_id=None):
    required_fields = ['AccountName', 'AccountPassword']
    if version_id:
        secret = get_secret_value(client, secret_name, version_id, stage)
    else:
        secret = get_secret_value(client, secret_name, stage=stage)
    plaintext = secret['SecretData']
    secret_dict = json.loads(plaintext)
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret_dict


def get_alt_account_name(current_account_name):
    # Dual-account mode alternates between <name> and <name>_rt.
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:(len(rotation_suffix) * -1)]
    else:
        new_account_name = current_account_name + rotation_suffix
        if len(new_account_name) > 16:
            raise ValueError(
                "Unable to rotate user: account name with _rt appended would exceed 16 characters")
        return new_account_name


def get_secret_value(client, secret_name, version_id=None, stage=None):
    request = GetSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    if version_id:
        request.set_VersionId(version_id)
    if stage:
        request.set_VersionStage(stage)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
    request = PutSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    request.set_VersionId(version_id)
    if version_stages:
        request.set_VersionStages(version_stages)
    request.set_SecretData(secret_data)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def get_random_password(client, exclude_characters=None):
    request = GetRandomPasswordRequest()
    request.set_accept_format('json')
    if exclude_characters:
        request.set_ExcludeCharacters(exclude_characters)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
    request = UpdateSecretVersionStageRequest()
    request.set_accept_format('json')
    request.set_VersionStage(version_stage)
    request.set_SecretName(secret_name)
    if remove_from_version:
        request.set_RemoveFromVersion(remove_from_version)
    if move_to_version:
        request.set_MoveToVersion(move_to_version)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def create_rds_account(client, db_instance_id, account_name, account_password, account_type):
    request = CreateAccountRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_AccountPassword(account_password)
    request.set_AccountType(account_type)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def grant_account_privilege(client, db_instance_id, account_name, db_name, account_privilege):
    request = GrantAccountPrivilegeRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_DBName(db_name)
    request.set_AccountPrivilege(account_privilege)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def describe_accounts(client, db_instance_id, account_name):
    request = DescribeAccountsRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def reset_account_password(client, db_instance_id, account_name, account_password):
    request = ResetAccountPasswordRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_AccountPassword(account_password)
    response = client.do_action_with_exception(request)
    return json.loads(response)
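In dual-account mode, the sample function alternates between two account names on each rotation. The following standalone sketch isolates that naming rule so that it can be tried without any Alibaba Cloud SDK; the starting name app_user is a placeholder.

```python
ROTATION_SUFFIX = "_rt"
MAX_ACCOUNT_NAME_LENGTH = 16  # Limit enforced by the sample function.


def get_alt_account_name(current_account_name):
    """Return the other account of the pair: <name> <-> <name>_rt."""
    if current_account_name.endswith(ROTATION_SUFFIX):
        return current_account_name[:-len(ROTATION_SUFFIX)]
    new_account_name = current_account_name + ROTATION_SUFFIX
    if len(new_account_name) > MAX_ACCOUNT_NAME_LENGTH:
        raise ValueError("account name with %s appended would exceed %d characters"
                         % (ROTATION_SUFFIX, MAX_ACCOUNT_NAME_LENGTH))
    return new_account_name


name = "app_user"  # Placeholder account name.
for rotation in range(1, 4):
    name = get_alt_account_name(name)
    print(rotation, name)
# 1 app_user_rt
# 2 app_user
# 3 app_user_rt
```

Because the suffix adds three characters, the original account name must be at most 13 characters long for the rotation to succeed.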
3. Create a Serverless Workflow flow for secret rotation
Create a secret rotation flow. For more information, see Create a flow.
Log on to the Serverless Workflow console.
In the top navigation bar, select the region where you want to create a flow.
Important: You must select the region where the functions are created.
On the Flows page, click Create flow.
On the Create Flow page, click Create Flow with Code, configure the parameters, and then click Next Step.
You must change the content in the YAML file of Definition to the following code:
version: v1
type: flow
steps:
  - type: task
    name: RotateSecretNew
    resourceArn: the Alibaba Cloud Resource Name (ARN) of the function that you created
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: new
  - type: task
    name: RotateSecretSet
    resourceArn: the ARN of the function that you created
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: set
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretEnd
    resourceArn: the ARN of the function that you created
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: end
      - target: VersionId
        source: $local.VersionId
Configure a flow role.
Click Create Flow.
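The flow definition in this step runs the same function three times and hands the version ID from the first step to the later ones. The following is a rough local stand-in for that sequencing, not the Serverless Workflow runtime: fake_handler and run_flow are hypothetical names, and the sketch assumes that $local.VersionId refers to the VersionId returned by an earlier step.

```python
import json
import uuid


def fake_handler(event):
    """Stand-in for the rotation function; like the sample handler, it returns the version ID."""
    evt = json.loads(event)
    # The sample handler falls back to the request ID; a random UUID plays that role here.
    version_id = evt.get("VersionId") or str(uuid.uuid4())
    return {"VersionId": version_id}


def run_flow(payload, steps=("new", "set", "end"), handler=fake_handler):
    """Call the handler once per step and carry VersionId forward between steps."""
    local = {}
    history = []
    for step in steps:
        event = dict(payload, Step=step)
        if "VersionId" in local:
            event["VersionId"] = local["VersionId"]
        result = handler(json.dumps(event))
        local["VersionId"] = result["VersionId"]
        history.append((step, result["VersionId"]))
    return history


# Placeholder payload values.
for step, version_id in run_flow({"SecretName": "my-secret", "RegionId": "cn-hangzhou", "InstanceId": "rm-example"}):
    print(step, version_id)
```

All three steps report the same version ID, which is what lets the set and end steps act on the version that the new step created.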
Configure settings to rotate the generic secret at a scheduled time. For more information, see Create a time-based schedule.
If you want to use Serverless Workflow to configure scheduled rotation of the generic secret, set Payload to the following content:
{ "SecretName": "", "RegionId": "", "InstanceId":"" }
Note: SecretName specifies the secret name, RegionId specifies the region, and InstanceId specifies the ID of your ApsaraDB RDS instance.
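If the payload is generated by a script rather than typed by hand, a small helper can reject empty values before the schedule is saved. This is a sketch only; build_rotation_payload is a hypothetical name and the values shown are placeholders.

```python
import json


def build_rotation_payload(secret_name, region_id, instance_id):
    """Return the schedule payload as a JSON string, rejecting empty values."""
    values = {
        "SecretName": secret_name,
        "RegionId": region_id,
        "InstanceId": instance_id,
    }
    missing = [key for key, value in values.items() if not value]
    if missing:
        raise ValueError("Missing payload values: %s" % ", ".join(missing))
    return json.dumps(values)


# Placeholder values for illustration.
print(build_rotation_payload("my-rds-secret", "cn-hangzhou", "rm-example"))
```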
4. (Optional) Connect applications to Secrets Manager
You can use Secrets Manager JDBC to connect applications to Secrets Manager. For more information, see Secrets Manager JDBC.
Rotate generic secrets of a self-managed MySQL database in dual-account mode by using a privileged account
You can use Serverless Workflow and Function Compute to rotate generic secrets.
1. Prepare for secret rotation
Activate Function Compute. For more information, see Activate Function Compute.
Create a privileged account that is used to log on to your self-managed MySQL database and create a generic secret for the privileged account in the KMS console. The privileged account is also used to create an account or modify the password of an account that is used to log on to your self-managed MySQL database.
The secret value is in the JSON format. Example:
{ "Endpoint": "", "AccountName": "", "AccountPassword": "", "SSL":false }
Note: Endpoint specifies the domain name or address of your self-managed MySQL database. AccountName specifies the username of the privileged account. AccountPassword specifies the password of the privileged account. SSL specifies whether to use a certificate. Valid values for SSL are true and false. Default value: false. By default, the certificate is stored at /opt/python/certs/cert.pem.
Create an account that is used to log on to your self-managed MySQL database and create a generic secret for the account in the KMS console. If the generic secret is rotated for the first time, a new account is created.
The secret value is in the JSON format. The secret name is used as the scheduling parameter that is passed to Serverless Workflow. Example:
{ "Endpoint": "", "AccountName": "", "AccountPassword": "", "MasterSecret":"", "SSL":false }
Note: Endpoint specifies the domain name or address of your self-managed MySQL database. AccountName specifies the username of the account that is used to log on to the database. AccountPassword specifies the password of that account. MasterSecret specifies the name of the generic secret that you created for the privileged account. SSL specifies whether to use a certificate. Valid values for SSL are true and false. Default value: false. By default, the certificate is stored at /opt/python/certs/cert.pem.
Create a normal service role for Function Compute and grant permissions to allow Function Compute to access KMS. For more information, see Grant Function Compute permissions to access other Alibaba Cloud services.
Attach the AliyunFCDefaultRolePolicy policy to the normal service role of Function Compute.
Attach the AliyunSTSAssumeRoleAccess system policy to the normal service role.
Attach a custom policy that allows access to KMS to the normal service role. The following script shows the content of the policy:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kms:GetSecretValue",
        "kms:GetRandomPassword",
        "kms:Decrypt",
        "kms:GenerateDataKey",
        "kms:PutSecretValue",
        "kms:UpdateSecretVersionStage"
      ],
      "Resource": "*"
    }
  ]
}
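The SSL field of the secret values in this step can be a JSON boolean or a string. The sample function later in this topic interprets the field roughly as sketched below, returning whether to use SSL and whether to fall back to a non-SSL connection if the SSL attempt fails. In that sample code, a missing or unrecognized SSL value leads to an SSL attempt with fallback.

```python
def get_ssl_config(secret_dict):
    """Return (use_ssl, fall_back) for a secret value, following the sample function's logic."""
    if "SSL" not in secret_dict:
        return True, True  # Try SSL first, then retry without SSL.
    value = secret_dict["SSL"]
    if isinstance(value, bool):
        return value, False
    if isinstance(value, str):
        lowered = value.lower()
        if lowered == "true":
            return True, False
        if lowered == "false":
            return False, False
    return True, True  # Unrecognized value: same behavior as a missing key.


print(get_ssl_config({"SSL": False}))   # (False, False)
print(get_ssl_config({"SSL": "true"}))  # (True, False)
print(get_ssl_config({}))               # (True, True)
```

Setting SSL explicitly avoids the extra connection attempt that the fallback path makes.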
2. Create secret rotation functions
Create a Function Compute service. For more information, see Quickly create a function.
When you create a Function Compute service, you must click Show Advanced Options and set Access to VPC to Yes. Then, configure VPC, vSwitch, and Security Group. Make sure that the Function Compute service can access ApsaraDB RDS API and KMS by using the specified security group and vSwitch in the specified VPC.
Create functions. For more information, see Quickly create a function.
In this example, Python is used. When you create the functions, select Python 3.9 for Runtime Environments. The following code provides an example. You can modify the code based on your business requirements.
# -*- coding: utf-8 -*-
import json
import logging
import os

try:
    import pymysql
except ImportError:
    # Fallback when no custom layer provides PyMySQL.
    os.system('pip install pymysql -t ./')
    import pymysql
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
from aliyunsdkrds.request.v20140815.DescribeDBInstancesRequest import DescribeDBInstancesRequest

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    evt = json.loads(event)
    secret_name = evt['SecretName']
    region_id = evt['RegionId']
    step = evt['Step']
    # The first step has no VersionId; the request ID becomes the new version ID.
    version_id = evt.get('VersionId')
    if not version_id:
        version_id = context.requestId
    credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                     context.credentials.securityToken)
    client = AcsClient(region_id=region_id, credential=credentials)
    endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
    client.add_endpoint(region_id, 'kms', endpoint)
    resp = get_secret_value(client, secret_name)
    if "Generic" != resp['SecretType']:
        logger.error("Secret %s is not enabled for rotation" % secret_name)
        raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    if step == "new":
        new_phase(client, secret_name, version_id)
    elif step == "set":
        set_phase(client, secret_name, version_id)
    elif step == "test":
        test_phase(client, secret_name, version_id)
    elif step == "end":
        end_phase(client, secret_name, version_id)
    else:
        logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
    return {"VersionId": version_id}


def new_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    try:
        get_secret_dict(client, secret_name, "ACSPending", version_id)
        logger.info("new: Successfully retrieved secret for %s." % secret_name)
    except ServerException as e:
        if e.error_code != 'Forbidden.ResourceNotFound':
            raise
        # No pending version exists yet: create one for the alternate account.
        current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
        exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\'
        passwd = get_random_password(client, exclude_characters)
        current_dict['AccountPassword'] = passwd['RandomPassword']
        put_secret_value(client, secret_name, version_id, json.dumps(current_dict), json.dumps(['ACSPending']))
        logger.info(
            "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))


def set_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    # If the pending credentials already work, there is nothing to set.
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info(
            "set: ACSPending secret is already set as password in MySQL DB for secret secret_name %s." % secret_name)
        return
    if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
        logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
        raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
    if current_dict['Endpoint'] != pending_dict['Endpoint']:
        logger.error("set: Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
            pending_dict['Endpoint'], current_dict['Endpoint']))
        raise ValueError("Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
            pending_dict['Endpoint'], current_dict['Endpoint']))
    conn = get_connection(current_dict)
    if not conn:
        logger.error("set: Unable to access the given database using current credentials for secret %s" % secret_name)
        raise ValueError("Unable to access the given database using current credentials for secret %s" % secret_name)
    conn.close()
    # Load the privileged account from the secret named in MasterSecret.
    master_secret = current_dict['MasterSecret']
    master_dict = get_secret_dict(client, master_secret, "ACSCurrent")
    if current_dict['Endpoint'] != master_dict['Endpoint'] and not is_rds_replica_database(
            client, current_dict, master_dict):
        logger.error("set: Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
            current_dict['Endpoint'], master_dict['Endpoint']))
        raise ValueError("Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
            current_dict['Endpoint'], master_dict['Endpoint']))
    conn = get_connection(master_dict)
    if not conn:
        logger.error(
            "set: Unable to access the given database using credentials in master secret secret %s" % master_secret)
        raise ValueError("Unable to access the given database using credentials in master secret secret %s"
                         % master_secret)
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT User FROM mysql.user WHERE User = %s", pending_dict['AccountName'])
            if cur.rowcount == 0:
                cur.execute("CREATE USER %s IDENTIFIED BY %s",
                            (pending_dict['AccountName'], pending_dict['AccountPassword']))
            # Copy the grants of the current account to the pending account.
            cur.execute("SHOW GRANTS FOR %s", current_dict['AccountName'])
            for row in cur.fetchall():
                if 'XA_RECOVER_ADMIN' in row[0]:
                    continue
                grant = row[0].split(' TO ')
                new_grant_escaped = grant[0].replace('%', '%%')  # % is a special character in Python format strings.
                cur.execute(new_grant_escaped + " TO %s ", (pending_dict['AccountName'],))
            cur.execute("SELECT VERSION()")
            ver = cur.fetchone()[0]
            # Copy the TLS requirements of the current account to the pending account.
            escaped_encryption_statement = get_escaped_encryption_statement(ver)
            cur.execute("SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject FROM mysql.user WHERE User = %s",
                        current_dict['AccountName'])
            tls_options = cur.fetchone()
            ssl_type = tls_options[0]
            if not ssl_type:
                cur.execute(escaped_encryption_statement + " NONE", pending_dict['AccountName'])
            elif ssl_type == "ANY":
                cur.execute(escaped_encryption_statement + " SSL", pending_dict['AccountName'])
            elif ssl_type == "X509":
                cur.execute(escaped_encryption_statement + " X509", pending_dict['AccountName'])
            else:
                cur.execute(escaped_encryption_statement + " CIPHER %s AND ISSUER %s AND SUBJECT %s",
                            (pending_dict['AccountName'], tls_options[1], tls_options[2], tls_options[3]))
            password_option = get_password_option(ver)
            cur.execute("SET PASSWORD FOR %s = " + password_option,
                        (pending_dict['AccountName'], pending_dict['AccountPassword']))
            conn.commit()
            logger.info("set: Successfully changed password for %s in MySQL DB for secret secret_name %s." % (
                pending_dict['AccountName'], secret_name))
    finally:
        conn.close()


def test_phase(client, secret_name, version_id):
    conn = get_connection(get_secret_dict(client, secret_name, "ACSPending", version_id))
    if conn:
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT NOW()")
                conn.commit()
        finally:
            conn.close()
        logger.info("test: Successfully accessed into MySQL DB with ACSPending secret in %s." % secret_name)
        return
    else:
        logger.error(
            "test: Unable to access the given database with pending secret of secret secret_name %s" % secret_name)
        raise ValueError("Unable to access the given database with pending secret of secret secret_name %s"
                         % secret_name)


def end_phase(client, secret_name, version_id):
    update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
    update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
    logger.info(
        "end: Successfully update ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))


def get_connection(secret_dict):
    port = int(secret_dict['Port']) if 'Port' in secret_dict else 3306
    dbname = secret_dict['DBName'] if 'DBName' in secret_dict else None
    use_ssl, fall_back = get_ssl_config(secret_dict)
    conn = connect_and_authenticate(secret_dict, port, dbname, use_ssl)
    if conn or not fall_back:
        return conn
    else:
        return connect_and_authenticate(secret_dict, port, dbname, False)


def get_ssl_config(secret_dict):
    # Returns (use_ssl, fall_back).
    if 'SSL' not in secret_dict:
        return True, True
    if isinstance(secret_dict['SSL'], bool):
        return secret_dict['SSL'], False
    if isinstance(secret_dict['SSL'], str):
        ssl = secret_dict['SSL'].lower()
        if ssl == "true":
            return True, False
        elif ssl == "false":
            return False, False
        else:
            return True, True
    return True, True


def connect_and_authenticate(secret_dict, port, dbname, use_ssl):
    ssl = {'ca': '/opt/python/certs/cert.pem'} if use_ssl else None
    try:
        conn = pymysql.connect(host=secret_dict['Endpoint'], user=secret_dict['AccountName'],
                               password=secret_dict['AccountPassword'], port=port, database=dbname,
                               connect_timeout=5, ssl=ssl)
        logger.info("Successfully established %s connection as user '%s' with Endpoint: '%s'" % (
            "SSL/TLS" if use_ssl else "non SSL/TLS", secret_dict['AccountName'], secret_dict['Endpoint']))
        return conn
    except pymysql.OperationalError as e:
        if 'certificate verify failed: IP address mismatch' in e.args[1]:
            logger.error(
                "Hostname verification failed when establishing SSL/TLS Handshake with Endpoint: %s" % secret_dict[
                    'Endpoint'])
        return None


def get_secret_dict(client, secret_name, stage, version_id=None):
    required_fields = ['Endpoint', 'AccountName', 'AccountPassword']
    if version_id:
        secret = get_secret_value(client, secret_name, version_id, stage)
    else:
        secret = get_secret_value(client, secret_name, stage=stage)
    plaintext = secret['SecretData']
    secret_dict = json.loads(plaintext)
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret_dict


def get_alt_account_name(current_account_name):
    # Dual-account mode alternates between <name> and <name>_rt.
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:(len(rotation_suffix) * -1)]
    else:
        new_account_name = current_account_name + rotation_suffix
        if len(new_account_name) > 16:
            raise ValueError(
                "Unable to rotate user: account name with _rt appended would exceed 16 characters")
        return new_account_name


def get_password_option(version):
    if version.startswith("8"):
        return "%s"
    else:
        return "PASSWORD(%s)"


def get_escaped_encryption_statement(version):
    if version.startswith("5.6"):
        return "GRANT USAGE ON *.* TO %s@'%%' REQUIRE"
    else:
        return "ALTER USER %s@'%%' REQUIRE"


def is_rds_replica_database(client, replica_dict, master_dict):
    replica_instance_id = replica_dict['Endpoint'].split(".")[0].replace('io', '')
    master_instance_id = master_dict['Endpoint'].split(".")[0].replace('io', '')
    try:
        describe_response = describe_db_instances(client, replica_instance_id)
    except Exception as err:
        logger.warning("Encountered error while verifying rds replica status: %s" % err)
        return False
    items = describe_response['Items']
    instances = items.get("DBInstance")
    if not instances:
        logger.info("Cannot verify replica status - no RDS instance found with identifier: %s" % replica_instance_id)
        return False
    current_instance = instances[0]
    return master_instance_id == current_instance.get('DBInstanceId')


def get_secret_value(client, secret_name, version_id=None, stage=None):
    request = GetSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    if version_id:
        request.set_VersionId(version_id)
    if stage:
        request.set_VersionStage(stage)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
    request = PutSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    request.set_VersionId(version_id)
    if version_stages:
        request.set_VersionStages(version_stages)
    request.set_SecretData(secret_data)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def get_random_password(client, exclude_characters=None):
    request = GetRandomPasswordRequest()
    request.set_accept_format('json')
    if exclude_characters:
        request.set_ExcludeCharacters(exclude_characters)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
    request = UpdateSecretVersionStageRequest()
    request.set_accept_format('json')
    request.set_VersionStage(version_stage)
    request.set_SecretName(secret_name)
    if remove_from_version:
        request.set_RemoveFromVersion(remove_from_version)
    if move_to_version:
        request.set_MoveToVersion(move_to_version)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def describe_db_instances(client, db_instance_id):
    request = DescribeDBInstancesRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    response = client.do_action_with_exception(request)
    return json.loads(response)
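The set step in the sample function copies the grants of the current account to the alternate account by rewriting each SHOW GRANTS row. The following standalone sketch isolates that string handling so that it can be checked without a database; build_grant_for_new_user is a hypothetical name, and the grant rows are made-up examples.

```python
def build_grant_for_new_user(show_grants_row):
    """Turn one SHOW GRANTS row into a parameterized statement for another account."""
    # Keep everything before " TO ", which is the privilege list and the object.
    privilege_part = show_grants_row.split(" TO ")[0]
    # PyMySQL applies %-formatting to the query when arguments are passed,
    # so a literal % in the statement must be doubled.
    escaped = privilege_part.replace("%", "%%")
    return escaped + " TO %s"


def get_password_option(version):
    """MySQL 8 dropped the PASSWORD() function, so the plain value is used there."""
    return "%s" if version.startswith("8") else "PASSWORD(%s)"


row = "GRANT SELECT, INSERT ON `shop`.* TO `app_user`@`%`"  # Made-up example row.
print(build_grant_for_new_user(row))
# GRANT SELECT, INSERT ON `shop`.* TO %s
print("SET PASSWORD FOR %s = " + get_password_option("5.7.30"))
# SET PASSWORD FOR %s = PASSWORD(%s)
```

The account name is still passed as a query argument rather than concatenated into the statement, so the driver handles its quoting.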
Create a custom layer. For more information, see Create a custom layer.
You can use custom layers so that dependencies do not need to be installed each time a function is deployed. This example uses Python, and the sample code requires the PyMySQL dependency. If the MySQL database uses an SSL certificate, you can also add the certificate to the layer.
Build the ZIP package of the layer.
Run the mkdir my-secret-rotate command to create a working directory.
Run the cd my-secret-rotate command to go to the working directory.
Run the pip install --target ./python pymysql command to install the dependencies in the my-secret-rotate/python directory.
If an SSL certificate is used for the MySQL database, create the certs directory in the python directory and save the cert.pem file to the certs directory.
Go to the my-secret-rotate directory and run the zip -r my-secret-rotate.zip python command to package the dependencies.
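If the zip command is not available, the packaging step can also be scripted. The following is a minimal sketch, not part of the official procedure, that uses the Python standard library to produce an archive in which every file is stored under the python/ prefix. The function names build_layer_zip and demo are hypothetical, and the demo writes a placeholder file instead of a real certificate. Unlike zip -r, this sketch stores file entries only and does not add separate directory entries.

```python
import os
import tempfile
import zipfile


def build_layer_zip(source_dir, zip_path):
    """Zip source_dir so that entries keep the directory name as a prefix (python/...)."""
    base = os.path.dirname(os.path.abspath(source_dir))
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(source_dir):
            for name in files:
                full_path = os.path.join(root, name)
                # Store the path relative to the parent of source_dir.
                archive.write(full_path, os.path.relpath(full_path, base))
    return zip_path


def demo():
    """Build a layer from a throwaway directory and return the archive entries."""
    with tempfile.TemporaryDirectory() as work_dir:
        python_dir = os.path.join(work_dir, "python")
        os.makedirs(os.path.join(python_dir, "certs"))
        # Placeholder content; a real layer would contain the actual cert.pem file.
        with open(os.path.join(python_dir, "certs", "cert.pem"), "w") as handle:
            handle.write("placeholder certificate\n")
        zip_path = build_layer_zip(python_dir, os.path.join(work_dir, "my-secret-rotate.zip"))
        with zipfile.ZipFile(zip_path) as archive:
            return sorted(archive.namelist())


if __name__ == "__main__":
    print(demo())
```

In the working directory from the steps above, the equivalent call would be build_layer_zip("python", "my-secret-rotate.zip").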
Create a custom layer in the Function Compute console.
Configure the custom layer for the functions. For more information, see Manage layers.
On the Services page, find the service whose function you want to manage and click Functions in the Actions column.
Click Functions, find the function that you want to manage, and then click Configure in the Actions column. In the Layers section, add the custom layer to the function.
3. Create a Serverless Workflow flow for secret rotation
Create a secret rotation flow. For more information, see Create a flow.
Log on to the Serverless Workflow console.
In the top navigation bar, select the region where you want to create a flow.
Important: You must select the region where the functions are created.
On the Flows page, click Create flow.
On the Create Flow page, click Create Flow with Code, configure the parameters, and then click Next Step.
Replace the YAML content of Definition with the following code:
version: v1
type: flow
steps:
  - type: task
    name: RotateSecretNew
    resourceArn: the ARN of the function that you created
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: new
  - type: task
    name: RotateSecretSet
    resourceArn: the ARN of the function that you created
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: set
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretTest
    resourceArn: the ARN of the function that you created
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: test
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretEnd
    resourceArn: the ARN of the function that you created
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: end
      - target: VersionId
        source: $local.VersionId
Configure a flow role.
Click Create Flow.
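The input mappings in the flow definition determine which event each of the four tasks passes to the function. The following sketch reproduces that mapping in plain Python to make the sequence easier to follow. It assumes that the output of the RotateSecretNew task contains a VersionId field that the later tasks read through $local.VersionId. The function name build_step_events and the sample values are hypothetical.

```python
ROTATION_STEPS = ["new", "set", "test", "end"]


def build_step_events(payload, version_id):
    """Return the event that each task in the flow would receive, in order."""
    events = []
    for step in ROTATION_STEPS:
        event = {
            "SecretName": payload["SecretName"],
            "RegionId": payload["RegionId"],
            "Step": step,
        }
        # Only the tasks after "new" map VersionId, because "new" is assumed to produce it.
        if step != "new":
            event["VersionId"] = version_id
        events.append(event)
    return events


# Hypothetical sample values used only for illustration.
sample_payload = {"SecretName": "my-secret", "RegionId": "cn-hangzhou"}
events = build_step_events(sample_payload, "v-0001")
for event in events:
    print(event)
```

The same function is invoked four times, and the Step field tells it which rotation phase to run.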
Configure settings to rotate the generic secrets at a scheduled time. For more information, see Create a time-based schedule.
If you want to use Serverless Workflow to configure scheduled rotation of the generic secrets, set Payload to the following content:
{ "SecretName": "", "RegionId": "" }
Note: SecretName specifies the secret name and RegionId specifies the region.
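Because the template above leaves both values empty, it can help to generate the payload and reject empty fields before saving the schedule. The following is a minimal sketch; the function name build_schedule_payload and the sample values are hypothetical.

```python
import json


def build_schedule_payload(secret_name, region_id):
    """Return the JSON payload for the schedule, rejecting empty values."""
    if not secret_name or not region_id:
        raise ValueError("SecretName and RegionId must both be non-empty")
    return json.dumps({"SecretName": secret_name, "RegionId": region_id})


# Hypothetical sample values used only for illustration.
payload_text = build_schedule_payload("my-secret", "cn-hangzhou")
print(payload_text)
```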
4. (Optional) Connect applications to Secrets Manager
You can use Secrets Manager JDBC to connect applications to Secrets Manager. For more information, see Secrets Manager JDBC.