全部產品
Search
文件中心

Security Center:惡意檔案檢測SDK

更新時間:Nov 30, 2024

惡意檔案檢測SDK功能依託Security Center多引擎檢測平台,可識別離線檔案和阿里雲OSS檔案中存在的常見病毒,例如勒索病毒、挖礦程式等,防止惡意檔案傳播和執行。本文介紹如何使用惡意檔案檢測SDK功能。

功能說明

惡意檔案檢測SDK為雲端檢測方案,使用該功能會將您的檔案上傳到雲端進行檢測。

檢測的病毒類型

支援檢測的病毒類型(virus_type)中英文映射表

病毒類型(virus_type

病毒名稱

Backdoor

反彈Shell後門

DDoS

DDoS木馬

Downloader

下載器木馬

Engtest

引擎測試程式

Hacktool

駭客工具

Trojan

高危程式

Malbaseware

被汙染的基礎軟體

MalScript

惡意指令碼

Malware

惡意程式

Miner

挖礦程式

Proxytool

代理工具

RansomWare

勒索病毒

RiskWare

風險軟體

Rootkit

Rootkit

Stealer

竊密工具

Scanner

掃描器

Suspicious

可疑程式

Virus

感染型病毒

WebShell

網站後門

Worm

蠕蟲

AdWare

廣告軟體

Patcher

破解程式

Gametool

私服工具

檢測檔案說明

  • 支援對未加密的壓縮包解壓並進行檢測。

    • 在調用SDK檢測離線檔案情境下,預設不解壓,需要您自行配置解壓,掃描時可以設定是否識別壓縮檔並解壓、最大解壓層級和最大解壓檔案數。

    • 在OSS檔案檢測情境下,預設不解壓,需要您自行配置解壓,支援全域和單個Bucket粒度下配置壓縮包的解壓層級。

  • 支援對已佈建服務端加密的OSS資料進行解密後檢測。OSS針對不同使用情境提供了以下伺服器端加密方式,詳細內容,請參見伺服器端加密

    • 使用KMS託管密鑰進行加解密(SSE-KMS):使用KMS託管的預設CMK(Customer Master Key)或指定CMK進行加解密操作。資料無需通過網路發送到KMS服務端進行加解密。

    • 使用OSS完全託管密鑰進行加解密(SSE-OSS):使用OSS完全託管的祕密金鑰加密每個Object。

檢測惡意檔案的方式

  • 在商務服務器中調用SDK檢測離線檔案

    在您的業務程式中接入Security Center的惡意檔案檢測SDK,通過調用SDK的方式檢測惡意檔案,在返回結果中擷取惡意檔案資訊,且支援在Security Center控制台查看存在風險的檔案檢測結果。

    支援通過Java或Python方式接入。

  • 在Security Center控制台檢測OSS中儲存的檔案

    如果您待檢測的檔案儲存體在阿里雲Object Storage Service Bucket中,可以直接在Security Center控制台執行對目標OSS Bucket內檔案的檢測,並支援查看存在風險的檔案清單。

提供檢測結果

Security Center通過整合多個知名病毒檢測引擎的結果,並結合安全專家的經驗,基於檔案潛在惡意程度和檢測準確程度兩個關鍵維度,評定檢測出惡意檔案事件的風險等級(高危中危低危),並給出相應惡意檔案說明和處置建議。

日誌分析

如果已開通Security Center日誌分析服務,會將惡意檔案檢測記錄投遞到Security Center的專屬Logstore(日誌庫)中。詳細說明,請參見日誌分析說明惡意檔案檢測欄位說明

應用情境

應用情境

惡意檔案檢測說明

伺服器應用情境

在伺服器環境中,會常遇到各種大規模傳播的惡意檔案,例如蠕蟲、挖礦軟體、DDoS木馬以及惡意指令碼等。

這些惡意檔案通常具備自我複製和廣泛傳播的特性,旨在消耗系統資源、發起分散式阻斷服務攻擊或控制伺服器以實施非法活動。

伺服器定向攻擊情境

在伺服器定向攻擊情境中,為了確保系統的安全和穩定運行,需要特別關注駭客工具、代理工具及後門程式等惡意檔案。

這類攻擊突出隱蔽性和針對性,駭客通過植入此類惡意檔案來竊取敏感資訊、操控系統或作為跳板進一步滲透網路。

全系統內容檢測

在各種環境中,都應密切關注破壞力強的勒索病毒和感染型病毒。

勒索病毒通過加密使用者資料勒索贖金,感染型病毒能自我複製並擴散至其他檔案,兩者均能造成嚴重的資料損失和系統癱瘓。

辦公網與檔案儲存體環境檢測

在辦公網路和檔案儲存體環境中,特別需要注意防範惡意文檔類檔案(例如含有巨集病毒的Office檔案、帶有惡意負載的壓縮包等)的潛在威脅。

這些檔案通常通過偽裝成日常工作交流中的正常文檔,誘導使用者開啟從而實施攻擊。例如竊取登入憑據、部署遠端存取木馬等。

使用限制

  • 在OSS檔案檢測情境下,一次惡意檔案檢測僅可以檢測一個不超過300 MB的檔案。

  • 在調用SDK檢測情境下,一次惡意檔案檢測僅可以檢測一個不超過100 MB的檔案。

  • 試用版和企業版的預設介面請求頻率不同。

    • 試用版:10次/秒。

    • 企業版:20次/秒。

  • 支援檢測的壓縮包檔案類型有.7z.zip.tar.gz.rar.ar.bz2.xz.lzma

  • 一個壓縮包支援解壓的層級最多為5層,解壓後的檔案總個數最多為1,000個,解壓後的所有檔案總大小最多為1 GB。對於超出限制範圍的檔案,不會被檢測。

  • 惡意檔案的檢測速度會受到網速、電腦效能、雲產品限制等方面的影響。惡意檔案檢測SDK內部採用隊列的方式,緩解外部峰值時的請求來提高並發率。當內部的隊列滿時,拒絕外部繼續提交請求,並使用等待介面,等待隊列有空間可用時再處理外部請求。

    您可以通過增加隊列長度來提高並發量,該方法會影響部分樣本的檢測時間長度。timeout_ms參數為樣本逾時時間,單位為毫秒。為了減少逾時,建議您將timeout_ms參數設定為60,000毫秒(即60秒)。

  • 在OSS檔案檢測情境下,僅支援儲存類型為標準儲存低頻訪問的檔案檢測,不支援Archive Storage類型的檔案檢測。儲存類型詳細內容,請參見儲存類型

  • 在OSS檔案檢測情境下,支援檢測的OSS Bucket的所屬地區包括:華北1(青島)、華北2(北京)、華北3(張家口)、華北5(呼和浩特)、華東1(杭州)、華東2(上海)、華南1(深圳)、華南2(河源)、華南3(廣州)、西南1(成都)、中國香港、新加坡、印尼(雅加達)、泰國(曼穀)、菲律賓(馬尼拉)、馬來西亞(吉隆坡)、韓國(首爾)、日本(東京)、美國(矽谷)、英國(倫敦)、美國(維吉尼亞)、德國(法蘭克福)。

計費說明

使用惡意檔案檢測SDK功能會消耗檔案檢測次數。如果是壓縮包檔案,每個壓縮包檔案消耗的檔案檢測次數按照解壓後檔案個數計算。

  • 經過企業實名認證的阿里雲帳號可以免費試用惡意檔案檢測SDK功能。試用版提供10,000次惡意檔案檢測次數。

  • 購買企業版可以選擇您所需的惡意檔案檢測次數。

計費項目詳細說明,請參見計費概述

開通服務並檢測惡意檔案

前提條件

如果您使用的是RAM使用者,請確保已為RAM使用者授予AliyunYundunSASFullAccess許可權。具體操作,請參見為RAM使用者授權

步驟一:開通服務

Security Center支援通過免費試用和付費購買的方式開通服務。

免費試用

如果您的阿里雲帳號已通過企業認證,您可以通過免費試用開通惡意檔案檢測SDK服務,並擷取1萬次惡意檔案檢測次數。每個阿里雲帳號僅有一次免費試用機會。

  1. 登入Security Center控制台。在控制台左上方,選擇需防護資產所在的地區中國

  2. 在左側導覽列,選擇風險治理 > 惡意檔案檢測SDK

  3. 惡意檔案檢測SDK頁面,單擊立即試用

付費購買

如果免費試用無法滿足需求,您可以通過付費購買的方式開通惡意檔案檢測SDK服務。

重要

如果存在未使用的免費試用次數,付費購買次數後,剩餘的免費試用次數將累計在您付費購買的次數中。

  1. 登入Security Center控制台。在控制台左上方,選擇需防護資產所在的地區中國

  2. 在左側導覽列,選擇風險治理 > 惡意檔案檢測SDK

  3. 惡意檔案檢測SDK頁面,單擊立即購買。在購買面板,選擇惡意檔案檢測SDK,並按照要檢測檔案的數量購買足夠的惡意檔案檢測次數

    如果您已購買Security Center付費版本,您可以直接選擇所需的惡意檔案檢測次數。如未購買,請根據您的需求選擇Security Center版本:

    • 無需使用Security Center的其他安全防護功能時,版本選擇僅採購增值服務

    • 如需使用Security Center的其他功能,例如漏洞修複、容器威脅檢測,請選擇相應的Security Center版本。各版本的功能差異詳情,請參見功能特性

  4. 仔細閱讀並選中服務合約,單擊立即購買並完成支付。

開通惡意檔案檢測SDK服務後,您可以在惡意檔案檢測SDK頁面,查看您的惡意檔案檢測SDK的剩餘檢測次數。如果剩餘檢測次數不足以支撐後續業務需求,您可以單擊升級配置,購買更多的資源。更多說明,請參見升級與降配

image.png

步驟二:檢測惡意檔案

根據您的業務情境,選擇以下方式檢測目標環境內的惡意檔案。

重要

執行惡意檔案檢測前,請確保當前阿里雲帳號下有足夠的剩餘檢測次數。如果剩餘檢測次數不足,您可以在惡意檔案檢測SDK頁面的風險檔案總覽頁簽下,單擊升級配置購買足夠的檢測次數。

在商務服務器中調用SDK檢測離線檔案

準備工作

  • 已配置環境變數ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

    阿里雲SDK支援通過定義ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET環境變數來建立預設的訪問憑證。調用介面時,程式直接存取憑證,讀取您的存取金鑰(即AccessKey)並自動完成鑒權。更多資訊,請參見配置環境變數

  • 您需要選擇接入方式並參考下表的說明擷取SDK包。

    接入方式

    版本要求

    擷取SDK包

    Java接入

    必須使用1.8或更高版本的JDK。

    您可以通過以下方式擷取Java SDK。

    離線匯入安裝:在連網環境下訪問Java SDK程式碼程式庫並下載Java SDK,將下載的Java SDK添加到專案工程中。

    Python接入

    Python 3.6及以上版本。

    您可以通過以下方式擷取部署SDK包。

    • 使用pip快速安裝(連網環境下推薦):

      pip install -U alibabacloud_filedetect
    • 離線安裝(無互連網串連的環境):在連網環境下訪問Python程式碼程式庫並下載Python SDK。將Python SDK上傳至專案工程環境,解壓壓縮包後,運行以下安裝命令:

      # 切換至python SDK根目錄
      cd alibabacloud-file-detect-python-sdk-master
      # 安裝SDK,注意python的版本
      python setup.py install

範例程式碼

重要

使用Python接入時,請替換下面樣本中的測試案例路徑參數,即path

package com.aliyun.filedetect.sample;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import com.aliyun.filedetect.*;

public class Sample {

	/**
	 * 同步檢測檔案介面
	 * @param detector 檢測器對象
	 * @param path 待檢測的檔案路徑
	 * @param timeout_ms 設定逾時時間,單位為毫秒
	 * @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
	 * @throws InterruptedException 
	 */
	public static DetectResult detectFileSync(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
		if (null == detector || null == path) return null;
		DetectResult result = null;
		while(true) {
			result = detector.detectSync(path, timeout_ms);
			if (null == result) break;
			if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
			if (!wait_if_queuefull) break;
			detector.waitQueueAvailable(-1);
		}
		return result;
	}
	
	/**
	 * 非同步檢測檔案介面
	 * @param detector 檢測器對象
	 * @param path 待檢測的檔案路徑
	 * @param timeout_ms 設定逾時時間,單位為毫秒
	 * @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
	 * @param callback 結果回呼函數
	 * @throws InterruptedException 
	 */
	public static int detectFile(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
		if (null == detector || null == path || null == callback) return ERR_CODE.ERR_INIT.value();
		int result = ERR_CODE.ERR_INIT.value();
		if (wait_if_queuefull) {
			final IDetectResultCallback real_callback = callback;
			callback = new IDetectResultCallback() {
				public void onScanResult(int seq, String file_path, DetectResult callback_res) {
					if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
					real_callback.onScanResult(seq, file_path, callback_res);
				}
			};
		}
		while(true) {
			result = detector.detect(path, timeout_ms, callback);
			if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
			if (!wait_if_queuefull) break;
			detector.waitQueueAvailable(-1);
		}
		return result;
	}
	
	/**
	 * 同步檢測URL檔案介面
	 * @param detector 檢測器對象
	 * @param url 待檢測的檔案URL
	 * @param md5 待檢測的檔案md5
	 * @param timeout_ms 設定逾時時間,單位為毫秒
	 * @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
	 * @throws InterruptedException 
	 */
	public static DetectResult detectUrlSync(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
		if (null == detector || null == url || null == md5) return null;
		DetectResult result = null;
		while(true) {
			result = detector.detectUrlSync(url, md5, timeout_ms);
			if (null == result) break;
			if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
			if (!wait_if_queuefull) break;
			detector.waitQueueAvailable(-1);
		}
		return result;
	}
	
	/**
	 * 非同步檢測URL檔案介面
	 * @param detector 檢測器對象
	 * @param url 待檢測的檔案URL
	 * @param md5 待檢測的檔案md5
	 * @param timeout_ms 設定逾時時間,單位為毫秒
	 * @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
	 * @param callback 結果回呼函數
	 * @throws InterruptedException 
	 */
	public static int detectUrl(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
		if (null == detector || null == url || null == md5 || null == callback) return ERR_CODE.ERR_INIT.value();
		int result = ERR_CODE.ERR_INIT.value();
		if (wait_if_queuefull) {
			final IDetectResultCallback real_callback = callback;
			callback = new IDetectResultCallback() {
				public void onScanResult(int seq, String file_path, DetectResult callback_res) {
					if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
					real_callback.onScanResult(seq, file_path, callback_res);
				}
			};
		}
		while(true) {
			result = detector.detectUrl(url, md5, timeout_ms, callback);
			if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
			if (!wait_if_queuefull) break;
			detector.waitQueueAvailable(-1);
		}
		return result;
	}
	
	/**
	 * 格式化檢測結果
	 * @param result 檢測結果對象
	 * @return 格式化後的字串
	 */
	public static String formatDetectResult(DetectResult result) {
		if (result.isSucc()) {
			DetectResult.DetectResultInfo info = result.getDetectResultInfo();
			String msg = String.format("[DETECT RESULT] [SUCCEED] %s", formatDetectResultInfo(info));
			if (info.compresslist != null) {
				int idx = 1;
				for (DetectResult.CompressFileDetectResultInfo comp_res : info.compresslist) {
					msg += String.format("\n\t\t\t [COMPRESS FILE] [IDX:%d] %s", idx++, formatCompressFileDetectResultInfo(comp_res));
				}
			}
			return msg;
		} 
		DetectResult.ErrorInfo info = result.getErrorInfo();
		return String.format("[DETECT RESULT] [FAIL] md5: %s, time: %d, error_code: %s, error_message: %s"
				, info.md5, info.time, info.error_code.name(), info.error_string);
	}
	
	private static String formatDetectResultInfo(DetectResult.DetectResultInfo info) {
		String msg = String.format("MD5: %s, TIME: %d, RESULT: %s, SCORE: %d", info.md5, info.time, info.result.name(), info.score);
		if (info.compresslist != null) {
			msg += String.format(", COMPRESS_FILES: %d", info.compresslist.size());
		}
		DetectResult.VirusInfo vinfo = info.getVirusInfo();
		if (vinfo != null) {
			msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
		}
		return msg;
	}
	private static String formatCompressFileDetectResultInfo(DetectResult.CompressFileDetectResultInfo info) {
		String msg = String.format("PATH: %s, \t\t RESULT: %s, SCORE: %d", info.path, info.result.name(), info.score);
		DetectResult.VirusInfo vinfo = info.getVirusInfo();
		if (vinfo != null) {
			msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
		}
		return msg;
	}
	
	/**
	 * 同步檢測目錄或檔案
	 * @param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
	 * @param is_sync 是否使用同步介面,推薦使用非同步。 true是同步, false是非同步
	 * @throws InterruptedException 
	 */
	public static void detectDirOrFileSync(OpenAPIDetector detector, String path, int timeout_ms, Map<String, DetectResult> result_map) throws InterruptedException {
		File file = new File(path);
		String abs_path = file.getAbsolutePath();
		if (file.isDirectory()) {
			String[] ss = file.list();
	        if (ss == null) return;
	        for (String s : ss) {
	        	String subpath = abs_path + File.separator + s;
	        	detectDirOrFileSync(detector, subpath, timeout_ms, result_map);
	        }
			return;
		}

    	System.out.println(String.format("[detectFileSync] [BEGIN] queueSize: %d, path: %s, timeout: %d", detector.getQueueSize(), abs_path, timeout_ms));
		DetectResult res = detectFileSync(detector, abs_path, timeout_ms, true);
    	System.err.println(String.format("                 [ END ] %s", formatDetectResult(res)));
		result_map.put(abs_path, res);
	}
	
	/**
	 * 非同步檢測目錄或檔案
	 * @param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
	 * @param is_sync 是否使用同步介面,推薦使用非同步。 true是同步, false是非同步
	 * @throws InterruptedException 
	 */
	public static void detectDirOrFile(OpenAPIDetector detector, String path, int timeout_ms, IDetectResultCallback callback) throws InterruptedException {
		File file = new File(path);
		String abs_path = file.getAbsolutePath();
		if (file.isDirectory()) {
			String[] ss = file.list();
	        if (ss == null) return;
	        for (String s : ss) {
	        	String subpath = abs_path + File.separator + s;
	        	detectDirOrFile(detector, subpath, timeout_ms, callback);
	        }
	        return;
		}

    	
		int seq = detectFile(detector, abs_path, timeout_ms, true, callback);
		System.out.println(String.format("[detectFile] [BEGIN] seq: %d, queueSize: %d, path: %s, timeout: %d", seq, detector.getQueueSize(), abs_path, timeout_ms));
	}
	
	/**
	 * 開始對檔案或目錄進行
	 * @param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
	 * @param is_sync 是否使用同步介面,推薦使用非同步。 true是同步, false是非同步
	 * @throws InterruptedException 
	 */
	public static void scan(final OpenAPIDetector detector, String path, int detect_timeout_ms, boolean is_sync) throws InterruptedException {
		System.out.println(String.format("[SCAN] [START] path: %s, detect_timeout_ms: %d, is_sync: %b", path, detect_timeout_ms, is_sync));
		long start_time = System.currentTimeMillis();
		final Map<String, DetectResult> result_map = new HashMap<>();
		if (is_sync) {
			detectDirOrFileSync(detector, path, detect_timeout_ms, result_map);
		} else {
			detectDirOrFile(detector, path, detect_timeout_ms, new IDetectResultCallback() {
				public void onScanResult(int seq, String file_path, DetectResult callback_res) {
			    	System.err.println(String.format("[detectFile] [ END ] seq: %d, queueSize: %d, %s", seq, detector.getQueueSize(), formatDetectResult(callback_res)));
					result_map.put(file_path, callback_res);
				}
			});
			// 等待任務執行完成
			detector.waitQueueEmpty(-1);
		}
		long used_time = System.currentTimeMillis() - start_time;
		System.out.println(String.format("[SCAN] [ END ] used_time: %d, files: %d", used_time, result_map.size()));
		
		int fail_count = 0;
		int white_count = 0;
		int black_count = 0;
		for (Map.Entry<String, DetectResult> entry : result_map.entrySet()) {
			DetectResult res = entry.getValue();
			if (res.isSucc()) {
				if (res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK) {
					black_count ++;
				} else {
					white_count ++;
				}
			} else {
				fail_count ++;
			}
		}
		System.out.println(String.format("             fail_count: %d, white_count: %d, black_count: %d"
				, fail_count, white_count, black_count));
	}
	
    public static void main(String[] args_) throws Exception {
    	// 擷取檢測器執行個體
    	OpenAPIDetector detector = OpenAPIDetector.getInstance();
    	
    	// 初始化
    	ERR_CODE init_ret = detector.init(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    	System.out.println("INIT RET: " + init_ret.name());
    	
    	// 設定解壓縮參數(可選,預設不解壓壓縮包)
    	boolean decompress = true; // 是否識別壓縮檔並解壓,預設為false
    	int decompressMaxLayer = 5; // 最大解壓層數,decompress參數為true時生效
    	int decompressMaxFileCount = 1000; // 最大解壓檔案數,decompress參數為true時生效
    	ERR_CODE initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount);
    	System.out.println("INIT_DECOMPRESS RET: " + initdec_ret.name());
    	
    	if (true) {
    		// 樣本用法1:掃描本地目錄或檔案
    		boolean is_sync_scan = false; // 是非同步檢測還是同步檢測。非同步檢測效能更好。false表示非同步檢測
        	int timeout_ms = 500000;  // 單個樣本檢測時間,單位為毫秒
        	String path = "test2.php"; // 待掃描的檔案或目錄
        	// 啟動掃描,直到掃描結束
        	scan(detector, path, timeout_ms, is_sync_scan);
    	}
    	
    	if (true) {
    		// 樣本用法2:掃描URL檔案
        	int timeout_ms = 500000;  // 單個樣本檢測時間,單位為毫秒
        	String url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1*****25&OSSAccessKeyId=xxx"; // 待掃描的URL檔案
        	String md5 = "a767f*************6e21d000000"; // 待掃描的檔案MD5
        	// 同步掃描。如果需要非同步掃描,調用detectUrl介面
        	System.out.println(String.format("[detectUrlSync] [BEGIN] URL: %s, MD5: %s, TIMEOUT: %d", url, md5, timeout_ms));
        	DetectResult result = detectUrlSync(detector, url, md5, timeout_ms, true);
        	System.err.println(String.format("[detectUrlSync] [ END ] %s", formatDetectResult(result)));
    	}
    	
    	
		// 反初始化
		System.out.println("Over.");
    	detector.uninit();
    }
}
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
import threading
import time
import traceback

from alibabacloud_filedetect.OpenAPIDetector import OpenAPIDetector
from alibabacloud_filedetect.IDetectResultCallback import IDetectResultCallback
from alibabacloud_filedetect.ERR_CODE import ERR_CODE
from alibabacloud_filedetect.DetectResult import DetectResult

class Sample(object):
    
    def __init__(self):
        pass


    """
    同步檢測檔案介面
    @param detector 檢測器對象
    @param path 待檢測的檔案路徑
    @param timeout_ms 設定逾時時間,單位為毫秒
    @param wait_if_queuefull 如果檢測隊列滿了,False表示不等待直接返回錯誤,True表示一直等待直到隊列不滿時
    """
    def detectFileSync(self, detector, path, timeout_ms, wait_if_queuefull):
        if detector is None or path is None:
            return None
        result = None
        while True:
            result = detector.detectSync(path, timeout_ms)
            if result is None:
                break
            if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result


    """
    非同步檢測檔案介面
    @param detector 檢測器對象
    @param path 待檢測的檔案路徑
    @param timeout_ms 設定逾時時間,單位為毫秒
    @param wait_if_queuefull 如果檢測隊列滿了,False表示不等待直接返回錯誤,True表示一直等待直到隊列不滿時
    @param callback 結果回呼函數
    """
    def detectFile(self, detector, path, timeout_ms, wait_if_queuefull, callback):
        if detector is None or path is None or callback is None:
            return ERR_CODE.ERR_INIT.value
        result = ERR_CODE.ERR_INIT.value
        if wait_if_queuefull:
            real_callback = callback
            class AsyncTaskCallback(IDetectResultCallback):
                def onScanResult(self, seq, file_path, callback_res):
                    if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
                        return
                    real_callback.onScanResult(seq, file_path, callback_res)
            callback = AsyncTaskCallback()
        while True:
            result = detector.detect(path, timeout_ms, callback)
            if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result
    

    """
    同步檢測URL檔案介面
    @param detector 檢測器對象
	@param url 待檢測的檔案URL
	@param md5 待檢測的檔案md5
	@param timeout_ms 設定逾時時間,單位為毫秒
	@param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
    """
    def detectUrlSync(self, detector, url, md5, timeout_ms, wait_if_queuefull):
        if detector is None or url is None or md5 is None:
            return None
        result = None
        while True:
            result = detector.detectUrlSync(url, md5, timeout_ms)
            if result is None:
                break
            if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result


    """
    非同步檢測URL檔案介面
	@param detector 檢測器對象
	@param url 待檢測的檔案URL
	@param md5 待檢測的檔案md5
	@param timeout_ms 設定逾時時間,單位為毫秒
	@param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
	@param callback 結果回呼函數
    """
    def detectUrl(self, detector, url, md5, timeout_ms, wait_if_queuefull, callback):
        if detector is None or url is None or md5 is None or callback is None:
            return ERR_CODE.ERR_INIT.value
        result = ERR_CODE.ERR_INIT.value
        if wait_if_queuefull:
            real_callback = callback
            class AsyncTaskCallback(IDetectResultCallback):
                def onScanResult(self, seq, file_path, callback_res):
                    if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
                        return
                    real_callback.onScanResult(seq, file_path, callback_res)
            callback = AsyncTaskCallback()
        while True:
            result = detector.detectUrl(url, md5, timeout_ms, callback)
            if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result


    """
    格式化檢測結果
    @param result 檢測結果對象
    @return 格式化後的字串
    """
    @staticmethod
    def formatDetectResult(result):
        msg = ""
        if result.isSucc():
            info = result.getDetectResultInfo()
            msg = "[DETECT RESULT] [SUCCEED] {}".format(Sample.formatDetectResultInfo(info))
            if info.compresslist is not None:
                idx = 1
                for comp_res in info.compresslist:
                    msg += "\n\t\t\t [COMPRESS FILE] [IDX:{}] {}".format(idx, Sample.formatCompressFileDetectResultInfo(comp_res))
                    idx += 1
        else:
            info = result.getErrorInfo()
            msg = "[DETECT RESULT] [FAIL] md5: {}, time: {}, error_code: {}, error_message: {}".format(info.md5,
                info.time, info.error_code.name, info.error_string)
        return msg


    @staticmethod
    def formatDetectResultInfo(info):
        msg = "MD5: {}, TIME: {}, RESULT: {}, SCORE: {}".format(info.md5, info.time, info.result.name, info.score)
        if info.compresslist is not None:
            msg += ", COMPRESS_FILES: {}".format(len(info.compresslist))
        vinfo = info.getVirusInfo()
        if vinfo is not None:
            msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
        return msg


    @staticmethod
    def formatCompressFileDetectResultInfo(info):
        msg = "PATH: {}, \t\t RESULT: {}, SCORE: {}".format(info.path, info.result.name, info.score)
        vinfo = info.getVirusInfo()
        if vinfo is not None:
            msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
        return msg


    """
    同步檢測目錄或檔案
    @param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
    @param is_sync 是否使用同步介面,推薦使用非同步。 True是同步,False是非同步
    """
    def detectDirOrFileSync(self, detector, path, timeout_ms, result_map):
        abs_path = os.path.abspath(path)
        if os.path.isdir(abs_path):
            sub_files = os.listdir(abs_path)
            if len(sub_files) == 0:
                return
            for sub_file in sub_files:
                sub_path = os.path.join(abs_path, sub_file)
                self.detectDirOrFileSync(detector, sub_path, timeout_ms, result_map)
            return
        
        print("[detectFileSync] [BEGIN] queueSize: {}, path: {}, timeout: {}".format(
            detector.getQueueSize(), abs_path, timeout_ms))
        res = self.detectFileSync(detector, abs_path, timeout_ms, True)
        print("                 [ END ] {}".format(Sample.formatDetectResult(res)))
        result_map[abs_path] = res
        return


    """
    非同步檢測目錄或檔案
    @param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
    @param is_sync 是否使用同步介面,推薦使用非同步。True是同步, False是非同步
    """
    def detectDirOrFile(self, detector, path, timeout_ms, callback):
        abs_path = os.path.abspath(path)
        if os.path.isdir(abs_path):
            sub_files = os.listdir(abs_path)
            if len(sub_files) == 0:
                return
            for sub_file in sub_files:
                sub_path = os.path.join(abs_path, sub_file)
                self.detectDirOrFile(detector, sub_path, timeout_ms, callback)
            return
        
        seq = self.detectFile(detector, abs_path, timeout_ms, True, callback)
        print("[detectFile] [BEGIN] seq: {}, queueSize: {}, path: {}, timeout: {}".format(
            seq, detector.getQueueSize(), abs_path, timeout_ms))   
        return

    
    """
    開始對檔案或目錄進行檢測
    @param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
    @param is_sync 是否使用同步介面,推薦使用非同步。 True是同步,False是非同步
    """
    def scan(self, detector, path, detect_timeout_ms, is_sync):
        try:
            print("[SCAN] [START] path: {}, detect_timeout_ms: {}, is_sync: {}".format(path, detect_timeout_ms, is_sync))
            start_time = time.time()
            result_map = {}
            if is_sync:
                self.detectDirOrFileSync(detector, path, detect_timeout_ms, result_map)
            else:
                class AsyncTaskCallback(IDetectResultCallback):
                    def onScanResult(self, seq, file_path, callback_res):
                        print("[detectFile] [ END ] seq: {}, queueSize: {}, {}".format(seq,
                            detector.getQueueSize(), Sample.formatDetectResult(callback_res)))
                        result_map[file_path] = callback_res
                self.detectDirOrFile(detector, path, detect_timeout_ms, AsyncTaskCallback())
                # 等待任務執行完成
                detector.waitQueueEmpty(-1)
            
            used_time_ms = (time.time() - start_time) * 1000 
            print("[SCAN] [ END ] used_time: {}, files: {}".format(int(used_time_ms), len(result_map)))
            
            failed_count = 0
            white_count = 0
            black_count = 0
            for file_path, res in result_map.items():
                if res.isSucc():
                    if res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK:
                        black_count += 1
                    else:
                        white_count += 1
                else:
                    failed_count += 1
            
            print("               fail_count: {}, white_count: {}, black_count: {}".format(
                failed_count, white_count, black_count))

        except Exception as e:
            print(traceback.format_exc(), file=sys.stderr)


    def main(self):

        # 擷取檢測器執行個體
        detector = OpenAPIDetector.get_instance()

        # 讀取環境變數中的Access Key ID和Access Key Secret
        access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
        access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')

        # 初始化
        init_ret = detector.init(access_key_id, access_key_secret)
        print("INIT RET: {}".format(init_ret.name))

        # 設定解壓縮參數(可選,預設不解壓壓縮包)
        decompress = True # 是否識別壓縮檔並解壓,預設為false
        decompressMaxLayer = 5 # 最大解壓層數,decompress參數為true時生效
        decompressMaxFileCount = 1000 # 最大解壓檔案數,decompress參數為true時生效
        initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount)
        print("INIT_DECOMPRESS RET: {}".format(initdec_ret.name))

        if True:
            # 樣本用法1:掃描本地目錄或檔案
            is_sync_scan = False # 是非同步檢測還是同步檢測。非同步檢測效能更好。False表示非同步檢測
            timeout_ms = 500000 # 單個樣本檢測時間,單位為毫秒
            path = "test.bin" # 待掃描的檔案或目錄
            # 啟動掃描,直到掃描結束
            self.scan(detector, path, timeout_ms, is_sync_scan)

        if True:
            # 樣本用法2:掃描URL檔案
            timeout_ms = 500000
            url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1671448125&OSSAccessKeyId=xxx" # 待掃描的URL檔案
            md5 = "a767ffc59d93125c7505b6e21d000000"
            # 同步掃描。如果需要非同步掃描,調用detectUrl介面
            print("[detectUrlSync] [BEGIN] URL: {}, MD5: {}, TIMEOUT: {}".format(url, md5, timeout_ms))
            result = self.detectUrlSync(detector, url, md5, timeout_ms, True)
            print("[detectUrlSync] [ END ] {}".format(Sample.formatDetectResult(result)))

        # 反初始化
        print("Over.")
        detector.uninit()
        

if __name__ == "__main__":
    sample = Sample()
    sample.main()

返回結果

調用SDK進行惡意檔案檢測後,只能在程式的運行結果中查看檢測結果,檢測結果不會同步到Security Center控制台中。控制台中僅支援查看剩餘檢測次數。風險檔案總覽頁面的統計資料,例如檢測檔案總數等,為OSS檔案檢測的結果。

struct DetectResult {
    std::string md5; // 樣本md5
    long time = 0; // 用時,單位為毫秒

    ERR_CODE error_code;    // 錯誤碼
    std::string error_string;     // 擴充錯誤資訊

    enum RESULT {
        RES_WHITE = 0,       //安全檔案
        RES_BLACK = 1,       //可疑檔案
        RES_PENDING = 3           //檢測中
     };
    RESULT result;          //檢測結果

    int score;                //檢測分值,取值範圍0~100
    std::string virus_type;    //病毒類型,如“WebShell/MalScript/Hacktool”
    std::string ext_info;    //擴充資訊為JSON字串

    struct CompressFileDetectResultInfo {
        std::string path; // 壓縮檔路徑
        RESULT result; // 檢測結果
        int score;	// 分值,取值範圍0-100
        std::string virus_type;    //病毒類型,如“WebShell/MalScript/Hacktool”
        std::string ext_info;    //擴充資訊為JSON字串
	};
    std::list<struct CompressFileDetectResultInfo> compresslist = null; // 如果是壓縮包,並且開啟了壓縮包解壓參數,則此處會輸出壓縮包內檔案檢測結果
    
};

錯誤碼說明如下:

enum ERR_CODE {
    ERR_INIT = -100,            	// 需要初始化,或者重複初始化
    ERR_FILE_NOT_FOUND = -99,  		// 檔案未找到
    ERR_DETECT_QUEUE_FULL = -98, 	// 檢測隊列滿
    ERR_CALL_API = -97, 			// 調用API錯誤
    ERR_TIMEOUT = -96, 				// 逾時
    ERR_UPLOAD = -95, 				//檔案上傳失敗;使用者可重新發起檢測,再次嘗試
    ERR_ABORT = -94, 				//程式退出,樣本未得到檢測
    ERR_TIMEOUT_QUEUE = -93, 		//隊列逾時,使用者發起檢測頻率過高或逾時時間過短
	ERR_MD5 = -92, 					// MD5格式不對
	ERR_URL = -91, 					// URL格式不對
    
    ERR_SUCC = 0              		// 成功
};

檢測分數越高,檔案可能存在的風險越高。檢測分數和危險等級對應表如下:

分數區間

危險等級

0~60

安全

61~70

風險(低危)

71~80

可疑(中危)

81~100

惡意(高危)

在Security Center控制台檢測OSS中儲存的檔案

  1. 登入Security Center控制台。在控制台左上方,選擇需防護資產所在的地區中國

  2. 在左側導覽列,選擇風險治理 > 惡意檔案檢測SDK

  3. 單擊OSS檔案檢測頁簽,選擇合適的檢測方式並執行檢測。

    如果您的Bucket沒有在OSS檔案檢測的列表中,您可以單擊同步Bucket,同步最新的Bucket列表。

    檢測方式

    說明

    操作步驟

    手動全量檢測

    檢測單個或多個Bucket內的所有檔案。

    1. OSS檔案檢測頁簽,單擊單個Bucke操作列的檢測或選中多個Bucket後單擊批量檢測

    2. 檢測對話方塊,指定需要檢測檔案的類型、檢測檔案範圍、掃描路徑等。

      • 解壓層級:如果選擇檢測的檔案類型包含壓縮類型,可以設定解壓層級(最多5層,支援不解壓)和單個壓縮包解壓檔案數量限制(最多1000個)。

      • 檔案解密類型:預設不解密,如果OSS檔案配置了服務端加密(SSE-KMS、SSE-OSS),可以設定解密後再檢測。解密類型OSS對應SSE-OSS加密的OSS檔案,解密類型KMS對應SSE-KMS加密的OSS檔案。

      • 檢測檔案範圍:可選。設定檢測範圍的時間上限。設定後會檢測檔案更新時間晚於該時間的檔案。

      • 掃描路徑:支援選擇按首碼匹配(輸入檔案名稱首碼來匹配掃描指定檔案)或配置到整個Bucket(掃描整個Bucket檔案)。

    3. 單擊確定

    手動增量檢測

    針對已檢測且上次檢測後有檔案更新的Bucket,提供只檢測未檢測過的檔案的功能。

    1. OSS檔案檢測頁簽,單擊目標Bucke操作列的增量檢測

    2. 增量檢測對話方塊中,指定需要檢測檔案的類型、解壓層級(壓縮類型檔案支援)、檔案解密類型、檢測檔案範圍、掃描路徑(支援按首碼匹配配置到整個Bucket)。

    3. 單擊確定

    自動檢測

    通過配置掃描策略可為指定Bucket開啟周期性自動檢測。配置前建議瞭解以下資訊:

    • 一個Bucket只能在一個策略中生效。

    • 自動檢測僅會檢測OSS新增的檔案,不會重複資料偵測同一個檔案。

    1. OSS檔案檢測頁簽,單擊策略管理地區的策略配置

    2. 策略管理面板,單擊新增策略

      如果已配置策略的檢測方案符合檢測要求,您可以單擊該策略操作列的編輯,將目標Bucket添加到該策略的生效範圍內。

    3. 策略建立面板配置策略名稱策略啟用狀態(預設啟用)、掃描路徑(支援按首碼匹配配置到整個Bucket)、檢測檔案範圍附表檔案檢測時間檔案檢測類型解壓層級(壓縮類型檔案支援)、檔案解密類型生效Bucket等。

    4. 單擊確定。

可選:配置DingTalk機器人通知

您可以在系統配置中添加DingTalk機器人通知,通過DingTalk群即時接收Security Center檢測的惡意檔案警示資訊。具體操作,請參見配置DingTalk機器人通知

image

查看和匯出檢測結果

風險檔案總覽

風險檔案總覽頁簽:

  • 查看風險檔案統計資訊

    image

    • 不同等級(高危中危低危)風險通過不同顏色顯示:高危(紅色)中危(橙色)低危(灰色)

    • 在風險檔案清單中,可以根據檢測情境列來選擇查看調用SDK(API)和在控制台(OSS)檢測的惡意檔案資訊,也可以單擊列表左上方的搜尋項的下拉框,根據風險等級檔案名稱威脅標籤MD5最新檢測時間來查看目標惡意檔案詳情。

      如果是壓縮包檔案,可單擊檔案前的展開image表徵圖,展示該壓縮包下的風險檔案清單,該壓縮包檔案的風險等級為其下風險檔案中的最高風險等級。

  • 查看目標風險檔案詳情

    • 在風險檔案清單中,單擊目標風險檔案操作列的詳情,在該檔案的詳情面板,查看檢測出的存在風險的檔案詳情事件說明(包含惡意檔案說明和處置建議)等資訊

      在壓縮包檔案的風險詳情面板,還可查看該壓縮包中的風險檔案數、解壓後檢測的檔案總數,以及風險檔案清單等資訊。

      image

  • 匯出所有風險檔案清單

    單擊image..png表徵圖,等待檔案匯出完成後,在提示框中單擊下載

OSS檔案檢測

OSS檔案檢測頁簽:

  • 從OSS Bucket維度查看風險檔案統計資訊

    不同等級(高危中危低危)風險通過不同顏色顯示:高危(紅色)中危(橙色)低危(灰色)

    image

  • 查看目標OSS Bucket的檢測詳情

    在Bucket列表中,單擊目標Bucket操作列的詳情,查看Bucket的基本資料和風險檔案詳情。

    在壓縮包的檔案檢測詳情頁面,還可以查看設定檔解壓情況、解壓後的檔案總數、檢測的檔案數量以及已掃描、未掃描的風險檔案清單。

    image

  • 按Bucket維度匯出所有風險檔案清單

    OSS檔案檢測頁簽,單擊image..png表徵圖,等待檔案匯出完成後,在提示框中單擊下載

處理風險檔案的警示事件

對於檢測出的風險檔案,需要您根據風險等級以及Security Center提供的事件說明和處置建議,進行確認和手動修複。

  • 高危風險事件:誤判的機率較低,同時惡意性較強,建議及時處理。

  • 中危和低危風險事件:該等級風險檔案多出現在業務情境中,需要您核實對業務的影響後處理。

    如果您確認是業務的正常檔案或是誤判,您可以忽略相關警示事件。如果您在業務程式中調用SDK檢測檔案,對於正常或誤判的檔案,您也可以在業務程式中添加忽略或加入白名單的處理邏輯。

相關API

您可以通過調用API介面方式使用惡意檔案檢測功能。具體使用方法如下: