本文通过编写函数代码检测ECS实例中的CPU核数为例,为您介绍基于函数计算2.0创建自定义规则的完整操作流程。
前提条件
请确保您已开通函数计算服务。具体操作,请参见开通服务。
关于函数计算服务的收费标准,请参见计费概览。
背景信息
关于自定义函数规则的概念、应用场景、运行原理等,请参见自定义函数规则定义和运行原理。
操作步骤
本文通过编写函数代码检测ECS实例中的CPU核数为例,当CPU核数小于或等于2时,ECS实例合规;反之,ECS实例不合规。
创建服务。
登录函数计算控制台。
在左侧导航栏,单击服务及函数。
在顶部菜单栏,选择地域,例如:新加坡(新加坡)。
在服务列表页面,单击创建服务。
在创建服务面板,输入服务的名称,其他参数均保持默认值。
单击确定。
创建函数。
在步骤 1创建的服务中,单击创建函数。
在创建函数页面,输入函数名称,请求处理程序类型选择处理事件请求,运行环境选择Python 3.10,其他参数保持默认值。
单击创建。
配置函数代码。
拷贝并粘贴如下代码至文件index.py。
# # !/usr/bin/env python # # -*- encoding: utf-8 -*- import json import logging from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest logger = logging.getLogger() # 合规类型 COMPLIANCE_TYPE_COMPLIANT = 'COMPLIANT' COMPLIANCE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT' COMPLIANCE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE' # 资源配置推送类型 CONFIGURATION_TYPE_COMMON = 'COMMON' CONFIGURATION_TYPE_OVERSIZE = 'OVERSIZE' CONFIGURATION_TYPE_NONE = 'NONE' # 入口函数,完成业务逻辑编排和处理。 def handler(event, context): """ 处理函数 :param event:事件 :param context:上下文 :return:评估结果 """ # 函数入参 logger.info(f'打印函数入参:{event}') # 校验Event,代码可直接复制。 evt = validate_event(event) if not evt: return None creds = context.credentials rule_parameters = evt.get('ruleParameters') result_token = evt.get('resultToken') invoking_event = evt.get('invokingEvent') ordering_timestamp = evt.get('orderingTimestamp') # 资源配置信息。当规则触发机制设置为配置变更时,该入参有值。当您新建规则或手动执行规则时,配置审计会逐个调用函数触发对所有资源的评估。如果资源配置变更,配置审计根据变更的资源信息自动调用函数触发一次资源评估。 configuration_item = invoking_event.get('configurationItem') account_id = configuration_item.get('accountId') resource_id = configuration_item.get('resourceId') resource_type = configuration_item.get('resourceType') region_id = configuration_item.get('regionId') resource_name = configuration_item.get('resourceName') # 判断当前推送的资源配置信息是否大于配置(100 KB),如果是,则需要调用资源详情API进行完整数据查询。 configuration_type = invoking_event.get('configurationType') if configuration_type and configuration_type == CONFIGURATION_TYPE_OVERSIZE: resource_result = get_discovered_resource(creds, resource_id, resource_type, region_id) resource_json = json.loads(resource_result) configuration_item["configuration"] = resource_json["DiscoveredResourceDetail"]["Configuration"] # 对资源进行评估,需要根据实际业务自行实现评估逻辑,以下代码仅供参考。 compliance_type, annotation = evaluate_configuration_item( rule_parameters, configuration_item) # 设置评估结果,格式需符合以下示例要求。 evaluations = [ { 'accountId': account_id, 'complianceResourceId': resource_id, 'complianceResourceName': resource_name, 'complianceResourceType': resource_type, 'complianceRegionId': region_id, 'orderingTimestamp': ordering_timestamp, 'complianceType': compliance_type, 'annotation': annotation } ] # 将评估结果返回并写入配置审计,代码可直接复制。 put_evaluations(creds, result_token, evaluations) return evaluations # 评估ECS实例的CPU核数。 def evaluate_configuration_item(rule_parameters, configuration_item): """ 评估逻辑 :param rule_parameters:规则参数 :param configuration_item:配置项 :return:评估类型 """ # 初始化返回值 compliance_type = COMPLIANCE_TYPE_COMPLIANT annotation = None # 获取资源配置完整信息 full_configuration = configuration_item['configuration'] if not full_configuration: annotation = 'Configuration is empty.' return compliance_type, annotation # 转换为JSON configuration = parse_json(full_configuration) cpu_count = configuration.get('Cpu') eq_count = rule_parameters.get('CpuCount') if cpu_count and cpu_count <= int(eq_count): annotation = json.dumps({"configuration":cpu_count,"desiredValue":eq_count,"operator":"Greater","property":"$.Cpu"}) compliance_type = COMPLIANCE_TYPE_NON_COMPLIANT return compliance_type, annotation return compliance_type, annotation def validate_event(event): """ 校验Event :param event:Event :return:JSON对象 """ if not event: logger.error('Event is empty.') evt = parse_json(event) logger.info('Loading event: %s .' % json.dumps(evt)) if 'resultToken' not in evt: logger.error('ResultToken is empty.') return None if 'ruleParameters' not in evt: logger.error('RuleParameters is empty.') return None if 'invokingEvent' not in evt: logger.error('InvokingEvent is empty.') return None return evt def parse_json(content): """ JSON类型转换 :param content:JSON字符串 :return:JSON对象 """ try: return json.loads(content) except Exception as e: logger.error('Parse content:{} to json error:{}.'.format(content, e)) return None # 评估结果返回,并写入配置审计,代码可直接复制。 def put_evaluations(creds, result_token, evaluations): """ 调用API返回并写入评估结果 :param context:函数计算上下文 :param result_token:回调令牌 :param evaluations:评估结果 :return: None """ # 需具备权限AliyunConfigFullAccess的函数计算FC的服务角色。 client = AcsClient(creds.access_key_id, creds.access_key_secret, region_id='cn-shanghai') # 新建Request,并设置参数,Domain为config.cn-shanghai.aliyuncs.com。 request = CommonRequest() request.set_domain('config.cn-shanghai.aliyuncs.com') request.set_version('2019-01-08') request.set_action_name('PutEvaluations') request.add_body_params('ResultToken', result_token) request.add_body_params('Evaluations', evaluations) request.add_body_params('SecurityToken', creds.security_token) request.set_method('POST') try: response = client.do_action_with_exception(request) logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response)) except Exception as e: logger.error('PutEvaluations error: %s' % e)
单击左上角的部署代码。
基于函数计算创建自定义规则。
登录配置审计控制台。
(可选)在左上角选择目标账号组。
仅资源目录下的管理账号需要执行该操作。单个阿里云账号不涉及。
在左侧导航栏,选择
。在规则页面,单击新建规则。
在选择创建方式页面,先选择基于函数计算自定义,然后选择函数ARN,再单击下一步。
在设置基本属性页面,先输入规则名称,再单击添加规则入参,规则入参名称输入CpuCount,期望值输入2,然后触发机制选择配置变更,最后单击下一步。
说明规则入参名称需要与步骤 3代码中的
rule_parameters
保持一致。当您在创建规则、修改规则和规则重新审计资源时,配置审计会将目标资源类型的所有资源配置信息逐条推送至函数计算,并触发函数对其进行评估。
在设置生效范围页面,资源类型选择ECS实例,单击下一步。
说明请您选择实际待评估的资源类型,避免选择所有资源类型触发无效评估。
选择规则关联的资源后,规则将检测您账号下该资源类型的所有资源。
在设置修正页面,单击提交。
说明您可以打开设置修正开关,根据控制台提示,设置自定义修正。具体操作,请参见设置自定义修正。
查看规则对ECS实例中CPU核数的检测结果。
在规则列表中,您可以看到目标规则检测出的不合规资源数。
说明单击规则ID或操作列详情,您可以查看最新检测数据列表。
当目标规则的不合规资源列显示无数据,但您确认当前账号下有该规则中设置的资源时,说明函数调用失败或函数计算提交评估结果到配置审计失败。您可以在目标函数的调用日志页签,单击操作列的请求日志,定位失败原因,并逐步修改。具体操作,请参见查看调用日志。