本文通過編寫函數代碼檢測ECS執行個體中的CPU核心數為例,為您介紹基於Function Compute2.0建立自訂規則的完整操作流程。
前提條件
請確保您已開通Function Compute服務。具體操作,請參見開通服務。
關於Function Compute服務的收費標準,請參見計費概覽。
背景資訊
關於自訂函數規則的概念、應用情境、運行原理等,請參見自訂函數規則定義和運行原理。
操作步驟
本文通過編寫函數代碼檢測ECS執行個體中的CPU核心數為例,當CPU核心數小於或等於2時,ECS執行個體合規;反之,ECS執行個體不合規。
建立服務。
在左側導覽列,單擊服務及函數。
在頂部功能表列,選擇地區,例如:新加坡(新加坡)。
在服務列表頁面,單擊建立服務。
在建立服務面板,輸入服務的名稱,其他參數均保持預設值。
單擊確定。
建立函數。
在步驟 1建立的服務中,單擊建立函數。
在建立函數頁面,輸入函數名稱,請求處理常式類型選擇處理事件請求,運行環境選擇Python 3.10,其他參數保持預設值。
單擊建立。
配置函數代碼。
拷貝並粘貼如下代碼至檔案index.py。
# # !/usr/bin/env python # # -*- encoding: utf-8 -*- import json import logging from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest logger = logging.getLogger() # 合規類型 COMPLIANCE_TYPE_COMPLIANT = 'COMPLIANT' COMPLIANCE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT' COMPLIANCE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE' # 資源配置推送類型 CONFIGURATION_TYPE_COMMON = 'COMMON' CONFIGURATION_TYPE_OVERSIZE = 'OVERSIZE' CONFIGURATION_TYPE_NONE = 'NONE' # 入口函數,完成商務邏輯編排和處理。 def handler(event, context): """ 處理函數 :param event:事件 :param context:上下文 :return:評估結果 """ # 函數入參 logger.info(f'列印函數入參:{event}') # 校正Event,代碼可直接複製。 evt = validate_event(event) if not evt: return None creds = context.credentials rule_parameters = evt.get('ruleParameters') result_token = evt.get('resultToken') invoking_event = evt.get('invokingEvent') ordering_timestamp = evt.get('orderingTimestamp') # 資源配置資訊。當規則引發機制設定為配置變更時,該入參有值。當您建立規則或手動執行規則時,配置審計會逐個調用函數觸發對所有資源的評估。如果資源配置變更,配置審計根據變更的資源資訊自動調用函數觸發一次資源評估。 configuration_item = invoking_event.get('configurationItem') account_id = configuration_item.get('accountId') resource_id = configuration_item.get('resourceId') resource_type = configuration_item.get('resourceType') region_id = configuration_item.get('regionId') resource_name = configuration_item.get('resourceName') # 判斷當前推送的資源配置資訊是否大於配置(100 KB),如果是,則需要調用資源詳情API進行完整資料查詢。 configuration_type = invoking_event.get('configurationType') if configuration_type and configuration_type == CONFIGURATION_TYPE_OVERSIZE: resource_result = get_discovered_resource(creds, resource_id, resource_type, region_id) resource_json = json.loads(resource_result) configuration_item["configuration"] = resource_json["DiscoveredResourceDetail"]["Configuration"] # 對資源進行評估,需要根據實際業務自行實現評估邏輯,以下代碼僅供參考。 compliance_type, annotation = evaluate_configuration_item( rule_parameters, configuration_item) # 設定評估結果,格式需符合以下樣本要求。 evaluations = [ { 'accountId': account_id, 'complianceResourceId': resource_id, 'complianceResourceName': resource_name, 'complianceResourceType': resource_type, 'complianceRegionId': region_id, 'orderingTimestamp': ordering_timestamp, 'complianceType': compliance_type, 'annotation': annotation } ] # 將評估結果返回並寫入配置審計,代碼可直接複製。 put_evaluations(creds, result_token, evaluations) return evaluations # 評估ECS執行個體的CPU核心數。 def evaluate_configuration_item(rule_parameters, configuration_item): """ 評估邏輯 :param rule_parameters:規則參數 :param configuration_item:配置項 :return:評估類型 """ # 初始化傳回值 compliance_type = COMPLIANCE_TYPE_COMPLIANT annotation = None # 擷取資源配置完整資訊 full_configuration = configuration_item['configuration'] if not full_configuration: annotation = 'Configuration is empty.' return compliance_type, annotation # 轉換為JSON configuration = parse_json(full_configuration) cpu_count = configuration.get('Cpu') eq_count = rule_parameters.get('CpuCount') if cpu_count and cpu_count <= int(eq_count): annotation = json.dumps({"configuration":cpu_count,"desiredValue":eq_count,"operator":"Greater","property":"$.Cpu"}) compliance_type = COMPLIANCE_TYPE_NON_COMPLIANT return compliance_type, annotation return compliance_type, annotation def validate_event(event): """ 校正Event :param event:Event :return:JSON對象 """ if not event: logger.error('Event is empty.') evt = parse_json(event) logger.info('Loading event: %s .' % json.dumps(evt)) if 'resultToken' not in evt: logger.error('ResultToken is empty.') return None if 'ruleParameters' not in evt: logger.error('RuleParameters is empty.') return None if 'invokingEvent' not in evt: logger.error('InvokingEvent is empty.') return None return evt def parse_json(content): """ JSON類型轉換 :param content:JSON字串 :return:JSON對象 """ try: return json.loads(content) except Exception as e: logger.error('Parse content:{} to json error:{}.'.format(content, e)) return None # 評估結果返回,並寫入配置審計,代碼可直接複製。 def put_evaluations(creds, result_token, evaluations): """ 調用API返回並寫入評估結果 :param context:Function Compute上下文 :param result_token:回調令牌 :param evaluations:評估結果 :return: None """ # 需具備許可權AliyunConfigFullAccess的Function ComputeFC的服務角色。 client = AcsClient(creds.access_key_id, creds.access_key_secret, region_id='cn-shanghai') # 建立Request,並設定參數,Domain為config.cn-shanghai.aliyuncs.com。 request = CommonRequest() request.set_domain('config.cn-shanghai.aliyuncs.com') request.set_version('2019-01-08') request.set_action_name('PutEvaluations') request.add_body_params('ResultToken', result_token) request.add_body_params('Evaluations', evaluations) request.add_body_params('SecurityToken', creds.security_token) request.set_method('POST') try: response = client.do_action_with_exception(request) logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response)) except Exception as e: logger.error('PutEvaluations error: %s' % e)
單擊左上方的部署代碼。
基於Function Compute建立自訂規則。
登入配置審計控制台。
(可選)在左上方選擇目標帳號組。
僅資來源目錄下的管理帳號需要執行該操作。單個阿里雲帳號不涉及。
在左側導覽列,選擇
。在規則頁面,單擊建立規則。
在選擇建立方式頁面,先選擇基於Function Compute自訂,然後選擇函數ARN,再單擊下一步。
在設定基本屬性頁面,先輸入規則名稱,再單擊添加規則入參,規則入參名稱輸入CpuCount,期望值輸入2,然後觸發機制選擇配置變更,最後單擊下一步。
說明規則入參名稱需要與步驟 3代碼中的
rule_parameters
保持一致。當您在建立規則、修改規則和規則重新審計資源時,配置審計會將目標資源類型的所有資源配置資訊逐條推送至Function Compute,並觸發函數對其進行評估。
在設定生效範圍頁面,資源類型選擇ECS執行個體,單擊下一步。
說明請您選擇實際待評估的資源類型,避免選擇所有資源類型觸發無效評估。
選擇規則關聯的資源後,規則將檢測您帳號下該資源類型的所有資源。
在設定修正頁面,單擊提交。
說明您可以開啟設定修正開關,根據控制台提示,設定自訂修正。具體操作,請參見設定自訂修正。
查看規則對ECS執行個體中CPU核心數的檢測結果。
在規則列表中,您可以看到目標規則檢測出的不合規資源數。
說明單擊規則ID或操作列詳情,您可以查看最新檢測資料列表。
當目標規則的不合規資源列顯示無資料,但您確認當前帳號下有該規則中設定的資源時,說明函數調用失敗或Function Compute提交評估結果到配置審計失敗。您可以在目標函數的調用日誌頁簽,單擊操作列的請求日誌,定位失敗原因,並逐步修改。具體操作,請參見查看調用日誌。