惡意檔案檢測SDK功能依託Security Center多引擎檢測平台,可識別離線檔案和阿里雲OSS檔案中存在的常見病毒,例如勒索病毒、挖礦程式等,防止惡意檔案傳播和執行。本文介紹如何使用惡意檔案檢測SDK功能。
功能說明
惡意檔案檢測SDK為雲端檢測方案,使用該功能會將您的檔案上傳到雲端進行檢測。
檢測的病毒類型
支援檢測的病毒類型(virus_type)中英文映射表
檢測檔案說明
支援對未加密的壓縮包解壓並進行檢測。
在調用SDK檢測離線檔案情境下,預設不解壓,需要您自行配置解壓,掃描時可以設定是否識別壓縮檔並解壓、最大解壓層級和最大解壓檔案數。
在OSS檔案檢測情境下,預設不解壓,需要您自行配置解壓,支援全域和單個Bucket粒度下配置壓縮包的解壓層級。
支援對已佈建服務端加密的OSS資料進行解密後檢測。OSS針對不同使用情境提供了以下伺服器端加密方式,詳細內容,請參見伺服器端加密。
使用KMS託管密鑰進行加解密(SSE-KMS):使用KMS託管的預設CMK(Customer Master Key)或指定CMK進行加解密操作。資料無需通過網路發送到KMS服務端進行加解密。
使用OSS完全託管密鑰進行加解密(SSE-OSS):使用OSS完全託管的祕密金鑰加密每個Object。
檢測惡意檔案的方式
在商務服務器中調用SDK檢測離線檔案
在您的業務程式中接入Security Center的惡意檔案檢測SDK,通過調用SDK的方式檢測惡意檔案,在返回結果中擷取惡意檔案資訊,且支援在Security Center控制台查看存在風險的檔案檢測結果。
支援通過Java或Python方式接入。
在Security Center控制台檢測OSS中儲存的檔案
如果您待檢測的檔案儲存體在阿里雲Object Storage Service Bucket中,可以直接在Security Center控制台執行對目標OSS Bucket內檔案的檢測,並支援查看存在風險的檔案清單。
提供檢測結果
Security Center通過整合多個知名病毒檢測引擎的結果,並結合安全專家的經驗,基於檔案潛在惡意程度和檢測準確程度兩個關鍵維度,評定檢測出惡意檔案事件的風險等級(高危、中危、低危),並給出相應惡意檔案說明和處置建議。
日誌分析
如果已開通Security Center日誌分析服務,會將惡意檔案檢測記錄投遞到Security Center的專屬Logstore(日誌庫)中。詳細說明,請參見日誌分析說明和惡意檔案檢測欄位說明。
應用情境
應用情境 | 惡意檔案檢測說明 |
伺服器應用情境 | 在伺服器環境中,會常遇到各種大規模傳播的惡意檔案,例如蠕蟲、挖礦軟體、DDoS木馬以及惡意指令碼等。 這些惡意檔案通常具備自我複製和廣泛傳播的特性,旨在消耗系統資源、發起分散式阻斷服務攻擊或控制伺服器以實施非法活動。 |
伺服器定向攻擊情境 | 在伺服器定向攻擊情境中,為了確保系統的安全和穩定運行,需要特別關注駭客工具、代理工具及後門程式等惡意檔案。 這類攻擊突出隱蔽性和針對性,駭客通過植入此類惡意檔案來竊取敏感資訊、操控系統或作為跳板進一步滲透網路。 |
全系統內容檢測 | 在各種環境中,都應密切關注破壞力強的勒索病毒和感染型病毒。 勒索病毒通過加密使用者資料勒索贖金,感染型病毒能自我複製並擴散至其他檔案,兩者均能造成嚴重的資料損失和系統癱瘓。 |
辦公網與檔案儲存體環境檢測 | 在辦公網路和檔案儲存體環境中,特別需要注意防範惡意文檔類檔案(例如含有巨集病毒的Office檔案、帶有惡意負載的壓縮包等)的潛在威脅。 這些檔案通常通過偽裝成日常工作交流中的正常文檔,誘導使用者開啟從而實施攻擊。例如竊取登入憑據、部署遠端存取木馬等。 |
使用限制
在OSS檔案檢測情境下,一次惡意檔案檢測僅可以檢測一個不超過300 MB的檔案。
在調用SDK檢測情境下,一次惡意檔案檢測僅可以檢測一個不超過100 MB的檔案。
試用版和企業版的預設介面請求頻率不同。
試用版:10次/秒。
企業版:20次/秒。
支援檢測的壓縮包檔案類型有
.7z
、.zip
、.tar
、.gz
、.rar
、.ar
、.bz2
、.xz
和.lzma
。一個壓縮包支援解壓的層級最多為5層,解壓後的檔案總個數最多為1,000個,解壓後的所有檔案總大小最多為1 GB。對於超出限制範圍的檔案,不會被檢測。
惡意檔案的檢測速度會受到網速、電腦效能、雲產品限制等方面的影響。惡意檔案檢測SDK內部採用隊列的方式,緩解外部峰值時的請求來提高並發率。當內部的隊列滿時,拒絕外部繼續提交請求,並使用等待介面,等待隊列有空間可用時再處理外部請求。
您可以通過增加隊列長度來提高並發量,該方法會影響部分樣本的檢測時間長度。
timeout_ms
參數為樣本逾時時間,單位為毫秒。為了減少逾時,建議您將timeout_ms
參數設定為60,000毫秒(即60秒)。在OSS檔案檢測情境下,僅支援儲存類型為標準儲存和低頻訪問的檔案檢測,不支援Archive Storage類型的檔案檢測。儲存類型詳細內容,請參見儲存類型。
在OSS檔案檢測情境下,支援檢測的OSS Bucket的所屬地區包括:華北1(青島)、華北2(北京)、華北3(張家口)、華北5(呼和浩特)、華東1(杭州)、華東2(上海)、華南1(深圳)、華南2(河源)、華南3(廣州)、西南1(成都)、中國香港、新加坡、印尼(雅加達)、泰國(曼穀)、菲律賓(馬尼拉)、馬來西亞(吉隆坡)、韓國(首爾)、日本(東京)、美國(矽谷)、英國(倫敦)、美國(維吉尼亞)、德國(法蘭克福)。
計費說明
使用惡意檔案檢測SDK功能會消耗檔案檢測次數。如果是壓縮包檔案,每個壓縮包檔案消耗的檔案檢測次數按照解壓後檔案個數計算。
經過企業實名認證的阿里雲帳號可以免費試用惡意檔案檢測SDK功能。試用版提供10,000次惡意檔案檢測次數。
購買企業版可以選擇您所需的惡意檔案檢測次數。
計費項目詳細說明,請參見計費概述。
開通服務並檢測惡意檔案
前提條件
如果您使用的是RAM使用者,請確保已為RAM使用者授予AliyunYundunSASFullAccess許可權。具體操作,請參見為RAM使用者授權。
步驟一:開通服務
Security Center支援通過免費試用和付費購買的方式開通服務。
免費試用
如果您的阿里雲帳號已通過企業認證,您可以通過免費試用開通惡意檔案檢測SDK服務,並擷取1萬次惡意檔案檢測次數。每個阿里雲帳號僅有一次免費試用機會。
登入Security Center控制台。在控制台左上方,選擇需防護資產所在的地區中國。
在左側導覽列,選擇 。
在惡意檔案檢測SDK頁面,單擊立即試用。
付費購買
如果免費試用無法滿足需求,您可以通過付費購買的方式開通惡意檔案檢測SDK服務。
如果存在未使用的免費試用次數,付費購買次數後,剩餘的免費試用次數將累計在您付費購買的次數中。
登入Security Center控制台。在控制台左上方,選擇需防護資產所在的地區中國。
在左側導覽列,選擇 。
在惡意檔案檢測SDK頁面,單擊立即購買。在購買面板,選擇惡意檔案檢測SDK為是,並按照要檢測檔案的數量購買足夠的惡意檔案檢測次數。
如果您已購買Security Center付費版本,您可以直接選擇所需的惡意檔案檢測次數。如未購買,請根據您的需求選擇Security Center版本:
無需使用Security Center的其他安全防護功能時,版本選擇僅採購增值服務。
如需使用Security Center的其他功能,例如漏洞修複、容器威脅檢測,請選擇相應的Security Center版本。各版本的功能差異詳情,請參見功能特性。
仔細閱讀並選中服務合約,單擊立即購買並完成支付。
開通惡意檔案檢測SDK服務後,您可以在惡意檔案檢測SDK頁面,查看您的惡意檔案檢測SDK的剩餘檢測次數。如果剩餘檢測次數不足以支撐後續業務需求,您可以單擊升級配置,購買更多的資源。更多說明,請參見升級與降配。
步驟二:檢測惡意檔案
根據您的業務情境,選擇以下方式檢測目標環境內的惡意檔案。
執行惡意檔案檢測前,請確保當前阿里雲帳號下有足夠的剩餘檢測次數。如果剩餘檢測次數不足,您可以在惡意檔案檢測SDK頁面的風險檔案總覽頁簽下,單擊升級配置購買足夠的檢測次數。
在商務服務器中調用SDK檢測離線檔案
準備工作
已配置環境變數
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
。阿里雲SDK支援通過定義
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
環境變數來建立預設的訪問憑證。調用介面時,程式直接存取憑證,讀取您的存取金鑰(即AccessKey)並自動完成鑒權。更多資訊,請參見配置環境變數。您需要選擇接入方式並參考下表的說明擷取SDK包。
接入方式
版本要求
擷取SDK包
Java接入
必須使用1.8或更高版本的JDK。
您可以通過以下方式擷取Java SDK。
離線匯入安裝:在連網環境下訪問Java SDK程式碼程式庫並下載Java SDK,將下載的Java SDK添加到專案工程中。
Python接入
Python 3.6及以上版本。
您可以通過以下方式擷取部署SDK包。
使用pip快速安裝(連網環境下推薦):
pip install -U alibabacloud_filedetect
離線安裝(無互連網串連的環境):在連網環境下訪問Python程式碼程式庫並下載Python SDK。將Python SDK上傳至專案工程環境,解壓壓縮包後,運行以下安裝命令:
# 切換至python SDK根目錄 cd alibabacloud-file-detect-python-sdk-master # 安裝SDK,注意python的版本 python setup.py install
範例程式碼
使用Python接入時,請替換下面樣本中的測試案例路徑參數,即path
。
package com.aliyun.filedetect.sample;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import com.aliyun.filedetect.*;
public class Sample {
/**
* 同步檢測檔案介面
* @param detector 檢測器對象
* @param path 待檢測的檔案路徑
* @param timeout_ms 設定逾時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @throws InterruptedException
*/
public static DetectResult detectFileSync(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == path) return null;
DetectResult result = null;
while(true) {
result = detector.detectSync(path, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 非同步檢測檔案介面
* @param detector 檢測器對象
* @param path 待檢測的檔案路徑
* @param timeout_ms 設定逾時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @param callback 結果回呼函數
* @throws InterruptedException
*/
public static int detectFile(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == path || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detect(path, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 同步檢測URL檔案介面
* @param detector 檢測器對象
* @param url 待檢測的檔案URL
* @param md5 待檢測的檔案md5
* @param timeout_ms 設定逾時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @throws InterruptedException
*/
public static DetectResult detectUrlSync(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == url || null == md5) return null;
DetectResult result = null;
while(true) {
result = detector.detectUrlSync(url, md5, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 非同步檢測URL檔案介面
* @param detector 檢測器對象
* @param url 待檢測的檔案URL
* @param md5 待檢測的檔案md5
* @param timeout_ms 設定逾時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @param callback 結果回呼函數
* @throws InterruptedException
*/
public static int detectUrl(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == url || null == md5 || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detectUrl(url, md5, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 格式化檢測結果
* @param result 檢測結果對象
* @return 格式化後的字串
*/
public static String formatDetectResult(DetectResult result) {
if (result.isSucc()) {
DetectResult.DetectResultInfo info = result.getDetectResultInfo();
String msg = String.format("[DETECT RESULT] [SUCCEED] %s", formatDetectResultInfo(info));
if (info.compresslist != null) {
int idx = 1;
for (DetectResult.CompressFileDetectResultInfo comp_res : info.compresslist) {
msg += String.format("\n\t\t\t [COMPRESS FILE] [IDX:%d] %s", idx++, formatCompressFileDetectResultInfo(comp_res));
}
}
return msg;
}
DetectResult.ErrorInfo info = result.getErrorInfo();
return String.format("[DETECT RESULT] [FAIL] md5: %s, time: %d, error_code: %s, error_message: %s"
, info.md5, info.time, info.error_code.name(), info.error_string);
}
private static String formatDetectResultInfo(DetectResult.DetectResultInfo info) {
String msg = String.format("MD5: %s, TIME: %d, RESULT: %s, SCORE: %d", info.md5, info.time, info.result.name(), info.score);
if (info.compresslist != null) {
msg += String.format(", COMPRESS_FILES: %d", info.compresslist.size());
}
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
private static String formatCompressFileDetectResultInfo(DetectResult.CompressFileDetectResultInfo info) {
String msg = String.format("PATH: %s, \t\t RESULT: %s, SCORE: %d", info.path, info.result.name(), info.score);
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
/**
* 同步檢測目錄或檔案
* @param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
* @param is_sync 是否使用同步介面,推薦使用非同步。 true是同步, false是非同步
* @throws InterruptedException
*/
public static void detectDirOrFileSync(OpenAPIDetector detector, String path, int timeout_ms, Map<String, DetectResult> result_map) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFileSync(detector, subpath, timeout_ms, result_map);
}
return;
}
System.out.println(String.format("[detectFileSync] [BEGIN] queueSize: %d, path: %s, timeout: %d", detector.getQueueSize(), abs_path, timeout_ms));
DetectResult res = detectFileSync(detector, abs_path, timeout_ms, true);
System.err.println(String.format(" [ END ] %s", formatDetectResult(res)));
result_map.put(abs_path, res);
}
/**
* 非同步檢測目錄或檔案
* @param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
* @param is_sync 是否使用同步介面,推薦使用非同步。 true是同步, false是非同步
* @throws InterruptedException
*/
public static void detectDirOrFile(OpenAPIDetector detector, String path, int timeout_ms, IDetectResultCallback callback) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFile(detector, subpath, timeout_ms, callback);
}
return;
}
int seq = detectFile(detector, abs_path, timeout_ms, true, callback);
System.out.println(String.format("[detectFile] [BEGIN] seq: %d, queueSize: %d, path: %s, timeout: %d", seq, detector.getQueueSize(), abs_path, timeout_ms));
}
/**
* 開始對檔案或目錄進行
* @param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
* @param is_sync 是否使用同步介面,推薦使用非同步。 true是同步, false是非同步
* @throws InterruptedException
*/
public static void scan(final OpenAPIDetector detector, String path, int detect_timeout_ms, boolean is_sync) throws InterruptedException {
System.out.println(String.format("[SCAN] [START] path: %s, detect_timeout_ms: %d, is_sync: %b", path, detect_timeout_ms, is_sync));
long start_time = System.currentTimeMillis();
final Map<String, DetectResult> result_map = new HashMap<>();
if (is_sync) {
detectDirOrFileSync(detector, path, detect_timeout_ms, result_map);
} else {
detectDirOrFile(detector, path, detect_timeout_ms, new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
System.err.println(String.format("[detectFile] [ END ] seq: %d, queueSize: %d, %s", seq, detector.getQueueSize(), formatDetectResult(callback_res)));
result_map.put(file_path, callback_res);
}
});
// 等待任務執行完成
detector.waitQueueEmpty(-1);
}
long used_time = System.currentTimeMillis() - start_time;
System.out.println(String.format("[SCAN] [ END ] used_time: %d, files: %d", used_time, result_map.size()));
int fail_count = 0;
int white_count = 0;
int black_count = 0;
for (Map.Entry<String, DetectResult> entry : result_map.entrySet()) {
DetectResult res = entry.getValue();
if (res.isSucc()) {
if (res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK) {
black_count ++;
} else {
white_count ++;
}
} else {
fail_count ++;
}
}
System.out.println(String.format(" fail_count: %d, white_count: %d, black_count: %d"
, fail_count, white_count, black_count));
}
public static void main(String[] args_) throws Exception {
// 擷取檢測器執行個體
OpenAPIDetector detector = OpenAPIDetector.getInstance();
// 初始化
ERR_CODE init_ret = detector.init(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
System.out.println("INIT RET: " + init_ret.name());
// 設定解壓縮參數(可選,預設不解壓壓縮包)
boolean decompress = true; // 是否識別壓縮檔並解壓,預設為false
int decompressMaxLayer = 5; // 最大解壓層數,decompress參數為true時生效
int decompressMaxFileCount = 1000; // 最大解壓檔案數,decompress參數為true時生效
ERR_CODE initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount);
System.out.println("INIT_DECOMPRESS RET: " + initdec_ret.name());
if (true) {
// 樣本用法1:掃描本地目錄或檔案
boolean is_sync_scan = false; // 是非同步檢測還是同步檢測。非同步檢測效能更好。false表示非同步檢測
int timeout_ms = 500000; // 單個樣本檢測時間,單位為毫秒
String path = "test2.php"; // 待掃描的檔案或目錄
// 啟動掃描,直到掃描結束
scan(detector, path, timeout_ms, is_sync_scan);
}
if (true) {
// 樣本用法2:掃描URL檔案
int timeout_ms = 500000; // 單個樣本檢測時間,單位為毫秒
String url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1*****25&OSSAccessKeyId=xxx"; // 待掃描的URL檔案
String md5 = "a767f*************6e21d000000"; // 待掃描的檔案MD5
// 同步掃描。如果需要非同步掃描,調用detectUrl介面
System.out.println(String.format("[detectUrlSync] [BEGIN] URL: %s, MD5: %s, TIMEOUT: %d", url, md5, timeout_ms));
DetectResult result = detectUrlSync(detector, url, md5, timeout_ms, true);
System.err.println(String.format("[detectUrlSync] [ END ] %s", formatDetectResult(result)));
}
// 反初始化
System.out.println("Over.");
detector.uninit();
}
}
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
import threading
import time
import traceback
from alibabacloud_filedetect.OpenAPIDetector import OpenAPIDetector
from alibabacloud_filedetect.IDetectResultCallback import IDetectResultCallback
from alibabacloud_filedetect.ERR_CODE import ERR_CODE
from alibabacloud_filedetect.DetectResult import DetectResult
class Sample(object):
def __init__(self):
pass
"""
同步檢測檔案介面
@param detector 檢測器對象
@param path 待檢測的檔案路徑
@param timeout_ms 設定逾時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,False表示不等待直接返回錯誤,True表示一直等待直到隊列不滿時
"""
def detectFileSync(self, detector, path, timeout_ms, wait_if_queuefull):
if detector is None or path is None:
return None
result = None
while True:
result = detector.detectSync(path, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
非同步檢測檔案介面
@param detector 檢測器對象
@param path 待檢測的檔案路徑
@param timeout_ms 設定逾時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,False表示不等待直接返回錯誤,True表示一直等待直到隊列不滿時
@param callback 結果回呼函數
"""
def detectFile(self, detector, path, timeout_ms, wait_if_queuefull, callback):
if detector is None or path is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detect(path, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
同步檢測URL檔案介面
@param detector 檢測器對象
@param url 待檢測的檔案URL
@param md5 待檢測的檔案md5
@param timeout_ms 設定逾時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
"""
def detectUrlSync(self, detector, url, md5, timeout_ms, wait_if_queuefull):
if detector is None or url is None or md5 is None:
return None
result = None
while True:
result = detector.detectUrlSync(url, md5, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
非同步檢測URL檔案介面
@param detector 檢測器對象
@param url 待檢測的檔案URL
@param md5 待檢測的檔案md5
@param timeout_ms 設定逾時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
@param callback 結果回呼函數
"""
def detectUrl(self, detector, url, md5, timeout_ms, wait_if_queuefull, callback):
if detector is None or url is None or md5 is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detectUrl(url, md5, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
格式化檢測結果
@param result 檢測結果對象
@return 格式化後的字串
"""
@staticmethod
def formatDetectResult(result):
msg = ""
if result.isSucc():
info = result.getDetectResultInfo()
msg = "[DETECT RESULT] [SUCCEED] {}".format(Sample.formatDetectResultInfo(info))
if info.compresslist is not None:
idx = 1
for comp_res in info.compresslist:
msg += "\n\t\t\t [COMPRESS FILE] [IDX:{}] {}".format(idx, Sample.formatCompressFileDetectResultInfo(comp_res))
idx += 1
else:
info = result.getErrorInfo()
msg = "[DETECT RESULT] [FAIL] md5: {}, time: {}, error_code: {}, error_message: {}".format(info.md5,
info.time, info.error_code.name, info.error_string)
return msg
@staticmethod
def formatDetectResultInfo(info):
msg = "MD5: {}, TIME: {}, RESULT: {}, SCORE: {}".format(info.md5, info.time, info.result.name, info.score)
if info.compresslist is not None:
msg += ", COMPRESS_FILES: {}".format(len(info.compresslist))
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
@staticmethod
def formatCompressFileDetectResultInfo(info):
msg = "PATH: {}, \t\t RESULT: {}, SCORE: {}".format(info.path, info.result.name, info.score)
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
"""
同步檢測目錄或檔案
@param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
@param is_sync 是否使用同步介面,推薦使用非同步。 True是同步,False是非同步
"""
def detectDirOrFileSync(self, detector, path, timeout_ms, result_map):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFileSync(detector, sub_path, timeout_ms, result_map)
return
print("[detectFileSync] [BEGIN] queueSize: {}, path: {}, timeout: {}".format(
detector.getQueueSize(), abs_path, timeout_ms))
res = self.detectFileSync(detector, abs_path, timeout_ms, True)
print(" [ END ] {}".format(Sample.formatDetectResult(res)))
result_map[abs_path] = res
return
"""
非同步檢測目錄或檔案
@param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
@param is_sync 是否使用同步介面,推薦使用非同步。True是同步, False是非同步
"""
def detectDirOrFile(self, detector, path, timeout_ms, callback):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFile(detector, sub_path, timeout_ms, callback)
return
seq = self.detectFile(detector, abs_path, timeout_ms, True, callback)
print("[detectFile] [BEGIN] seq: {}, queueSize: {}, path: {}, timeout: {}".format(
seq, detector.getQueueSize(), abs_path, timeout_ms))
return
"""
開始對檔案或目錄進行檢測
@param path 指定路徑,可以是檔案或者目錄。目錄的話就會遞迴遍曆
@param is_sync 是否使用同步介面,推薦使用非同步。 True是同步,False是非同步
"""
def scan(self, detector, path, detect_timeout_ms, is_sync):
try:
print("[SCAN] [START] path: {}, detect_timeout_ms: {}, is_sync: {}".format(path, detect_timeout_ms, is_sync))
start_time = time.time()
result_map = {}
if is_sync:
self.detectDirOrFileSync(detector, path, detect_timeout_ms, result_map)
else:
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
print("[detectFile] [ END ] seq: {}, queueSize: {}, {}".format(seq,
detector.getQueueSize(), Sample.formatDetectResult(callback_res)))
result_map[file_path] = callback_res
self.detectDirOrFile(detector, path, detect_timeout_ms, AsyncTaskCallback())
# 等待任務執行完成
detector.waitQueueEmpty(-1)
used_time_ms = (time.time() - start_time) * 1000
print("[SCAN] [ END ] used_time: {}, files: {}".format(int(used_time_ms), len(result_map)))
failed_count = 0
white_count = 0
black_count = 0
for file_path, res in result_map.items():
if res.isSucc():
if res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK:
black_count += 1
else:
white_count += 1
else:
failed_count += 1
print(" fail_count: {}, white_count: {}, black_count: {}".format(
failed_count, white_count, black_count))
except Exception as e:
print(traceback.format_exc(), file=sys.stderr)
def main(self):
# 擷取檢測器執行個體
detector = OpenAPIDetector.get_instance()
# 讀取環境變數中的Access Key ID和Access Key Secret
access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
# 初始化
init_ret = detector.init(access_key_id, access_key_secret)
print("INIT RET: {}".format(init_ret.name))
# 設定解壓縮參數(可選,預設不解壓壓縮包)
decompress = True # 是否識別壓縮檔並解壓,預設為false
decompressMaxLayer = 5 # 最大解壓層數,decompress參數為true時生效
decompressMaxFileCount = 1000 # 最大解壓檔案數,decompress參數為true時生效
initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount)
print("INIT_DECOMPRESS RET: {}".format(initdec_ret.name))
if True:
# 樣本用法1:掃描本地目錄或檔案
is_sync_scan = False # 是非同步檢測還是同步檢測。非同步檢測效能更好。False表示非同步檢測
timeout_ms = 500000 # 單個樣本檢測時間,單位為毫秒
path = "test.bin" # 待掃描的檔案或目錄
# 啟動掃描,直到掃描結束
self.scan(detector, path, timeout_ms, is_sync_scan)
if True:
# 樣本用法2:掃描URL檔案
timeout_ms = 500000
url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1671448125&OSSAccessKeyId=xxx" # 待掃描的URL檔案
md5 = "a767ffc59d93125c7505b6e21d000000"
# 同步掃描。如果需要非同步掃描,調用detectUrl介面
print("[detectUrlSync] [BEGIN] URL: {}, MD5: {}, TIMEOUT: {}".format(url, md5, timeout_ms))
result = self.detectUrlSync(detector, url, md5, timeout_ms, True)
print("[detectUrlSync] [ END ] {}".format(Sample.formatDetectResult(result)))
# 反初始化
print("Over.")
detector.uninit()
if __name__ == "__main__":
sample = Sample()
sample.main()
返回結果
調用SDK進行惡意檔案檢測後,只能在程式的運行結果中查看檢測結果,檢測結果不會同步到Security Center控制台中。控制台中僅支援查看剩餘檢測次數。風險檔案總覽頁面的統計資料,例如檢測檔案總數等,為OSS檔案檢測的結果。
struct DetectResult {
std::string md5; // 樣本md5
long time = 0; // 用時,單位為毫秒
ERR_CODE error_code; // 錯誤碼
std::string error_string; // 擴充錯誤資訊
enum RESULT {
RES_WHITE = 0, //安全檔案
RES_BLACK = 1, //可疑檔案
RES_PENDING = 3 //檢測中
};
RESULT result; //檢測結果
int score; //檢測分值,取值範圍0~100
std::string virus_type; //病毒類型,如“WebShell/MalScript/Hacktool”
std::string ext_info; //擴充資訊為JSON字串
struct CompressFileDetectResultInfo {
std::string path; // 壓縮檔路徑
RESULT result; // 檢測結果
int score; // 分值,取值範圍0-100
std::string virus_type; //病毒類型,如“WebShell/MalScript/Hacktool”
std::string ext_info; //擴充資訊為JSON字串
};
std::list<struct CompressFileDetectResultInfo> compresslist = null; // 如果是壓縮包,並且開啟了壓縮包解壓參數,則此處會輸出壓縮包內檔案檢測結果
};
錯誤碼說明如下:
enum ERR_CODE {
ERR_INIT = -100, // 需要初始化,或者重複初始化
ERR_FILE_NOT_FOUND = -99, // 檔案未找到
ERR_DETECT_QUEUE_FULL = -98, // 檢測隊列滿
ERR_CALL_API = -97, // 調用API錯誤
ERR_TIMEOUT = -96, // 逾時
ERR_UPLOAD = -95, //檔案上傳失敗;使用者可重新發起檢測,再次嘗試
ERR_ABORT = -94, //程式退出,樣本未得到檢測
ERR_TIMEOUT_QUEUE = -93, //隊列逾時,使用者發起檢測頻率過高或逾時時間過短
ERR_MD5 = -92, // MD5格式不對
ERR_URL = -91, // URL格式不對
ERR_SUCC = 0 // 成功
};
檢測分數越高,檔案可能存在的風險越高。檢測分數和危險等級對應表如下:
分數區間 | 危險等級 |
0~60 | 安全 |
61~70 | 風險(低危) |
71~80 | 可疑(中危) |
81~100 | 惡意(高危) |
在Security Center控制台檢測OSS中儲存的檔案
登入Security Center控制台。在控制台左上方,選擇需防護資產所在的地區中國。
在左側導覽列,選擇 。
單擊OSS檔案檢測頁簽,選擇合適的檢測方式並執行檢測。
如果您的Bucket沒有在OSS檔案檢測的列表中,您可以單擊同步Bucket,同步最新的Bucket列表。
檢測方式
說明
操作步驟
手動全量檢測
檢測單個或多個Bucket內的所有檔案。
在OSS檔案檢測頁簽,單擊單個Bucke操作列的檢測或選中多個Bucket後單擊批量檢測。
在檢測對話方塊,指定需要檢測檔案的類型、檢測檔案範圍、掃描路徑等。
解壓層級:如果選擇檢測的檔案類型包含壓縮類型,可以設定解壓層級(最多5層,支援不解壓)和單個壓縮包解壓檔案數量限制(最多1000個)。
檔案解密類型:預設不解密,如果OSS檔案配置了服務端加密(SSE-KMS、SSE-OSS),可以設定解密後再檢測。解密類型OSS對應SSE-OSS加密的OSS檔案,解密類型KMS對應SSE-KMS加密的OSS檔案。
檢測檔案範圍:可選。設定檢測範圍的時間上限。設定後會檢測檔案更新時間晚於該時間的檔案。
掃描路徑:支援選擇按首碼匹配(輸入檔案名稱首碼來匹配掃描指定檔案)或配置到整個Bucket(掃描整個Bucket檔案)。
單擊確定。
手動增量檢測
針對已檢測且上次檢測後有檔案更新的Bucket,提供只檢測未檢測過的檔案的功能。
在OSS檔案檢測頁簽,單擊目標Bucke操作列的增量檢測。
在增量檢測對話方塊中,指定需要檢測檔案的類型、解壓層級(壓縮類型檔案支援)、檔案解密類型、檢測檔案範圍、掃描路徑(支援按首碼匹配和配置到整個Bucket)。
單擊確定。
自動檢測
通過配置掃描策略可為指定Bucket開啟周期性自動檢測。配置前建議瞭解以下資訊:
一個Bucket只能在一個策略中生效。
自動檢測僅會檢測OSS新增的檔案,不會重複資料偵測同一個檔案。
在OSS檔案檢測頁簽,單擊策略管理地區的策略配置。
在策略管理面板,單擊新增策略。
如果已配置策略的檢測方案符合檢測要求,您可以單擊該策略操作列的編輯,將目標Bucket添加到該策略的生效範圍內。
在策略建立面板,配置策略名稱、策略啟用狀態(預設啟用)、掃描路徑(支援按首碼匹配和配置到整個Bucket)、檢測檔案範圍、附表、檔案檢測時間、檔案檢測類型、解壓層級(壓縮類型檔案支援)、檔案解密類型和生效Bucket等。
單擊確定。
可選:配置DingTalk機器人通知
您可以在系統配置中添加DingTalk機器人通知,通過DingTalk群即時接收Security Center檢測的惡意檔案警示資訊。具體操作,請參見配置DingTalk機器人通知。
查看和匯出檢測結果
風險檔案總覽
在風險檔案總覽頁簽:
查看風險檔案統計資訊
不同等級(高危、中危、低危)風險通過不同顏色顯示:高危(紅色)、中危(橙色)、低危(灰色)。
在風險檔案清單中,可以根據檢測情境列來選擇查看調用SDK(API)和在控制台(OSS)檢測的惡意檔案資訊,也可以單擊列表左上方的搜尋項的下拉框,根據風險等級、檔案名稱、威脅標籤、MD5或最新檢測時間來查看目標惡意檔案詳情。
如果是壓縮包檔案,可單擊檔案前的展開表徵圖,展示該壓縮包下的風險檔案清單,該壓縮包檔案的風險等級為其下風險檔案中的最高風險等級。
查看目標風險檔案詳情
在風險檔案清單中,單擊目標風險檔案操作列的詳情,在該檔案的詳情面板,查看檢測出的存在風險的檔案詳情、事件說明(包含惡意檔案說明和處置建議)等資訊。
在壓縮包檔案的風險詳情面板,還可查看該壓縮包中的風險檔案數、解壓後檢測的檔案總數,以及風險檔案清單等資訊。
匯出所有風險檔案清單
單擊表徵圖,等待檔案匯出完成後,在提示框中單擊下載。
OSS檔案檢測
在OSS檔案檢測頁簽:
從OSS Bucket維度查看風險檔案統計資訊
不同等級(高危、中危、低危)風險通過不同顏色顯示:高危(紅色)、中危(橙色)、低危(灰色)。
查看目標OSS Bucket的檢測詳情
在Bucket列表中,單擊目標Bucket操作列的詳情,查看Bucket的基本資料和風險檔案詳情。
在壓縮包的檔案檢測詳情頁面,還可以查看設定檔解壓情況、解壓後的檔案總數、檢測的檔案數量以及已掃描、未掃描的風險檔案清單。
按Bucket維度匯出所有風險檔案清單
在OSS檔案檢測頁簽,單擊表徵圖,等待檔案匯出完成後,在提示框中單擊下載。
處理風險檔案的警示事件
對於檢測出的風險檔案,需要您根據風險等級以及Security Center提供的事件說明和處置建議,進行確認和手動修複。
高危風險事件:誤判的機率較低,同時惡意性較強,建議及時處理。
中危和低危風險事件:該等級風險檔案多出現在業務情境中,需要您核實對業務的影響後處理。
如果您確認是業務的正常檔案或是誤判,您可以忽略相關警示事件。如果您在業務程式中調用SDK檢測檔案,對於正常或誤判的檔案,您也可以在業務程式中添加忽略或加入白名單的處理邏輯。
相關API
您可以通過調用API介面方式使用惡意檔案檢測功能。具體使用方法如下: