恶意文件检测SDK功能依托云安全中心多引擎检测平台,可识别离线文件和阿里云OSS文件中存在的常见病毒,例如勒索病毒、挖矿程序等,防止恶意文件传播和执行。本文介绍如何使用恶意文件检测SDK功能。
功能说明
恶意文件检测SDK为云端检测方案,使用该功能会将您的文件上传到云端进行检测。
检测的病毒类型
支持检测的病毒类型(virus_type)中英文映射表
检测文件说明
支持对未加密的压缩包解压并进行检测。
在调用SDK检测离线文件场景下,默认不解压,需要您自行配置解压,扫描时可以设置是否识别压缩文件并解压、最大解压层级和最大解压文件数。
在OSS文件检测场景下,默认不解压,需要您自行配置解压,支持全局和单个Bucket粒度下配置压缩包的解压层级。
支持对已配置服务端加密的OSS数据进行解密后检测。OSS针对不同使用场景提供了以下服务器端加密方式,详细内容,请参见服务器端加密。
使用KMS托管密钥进行加解密(SSE-KMS):使用KMS托管的默认CMK(Customer Master Key)或指定CMK进行加解密操作。数据无需通过网络发送到KMS服务端进行加解密。
使用OSS完全托管密钥进行加解密(SSE-OSS):使用OSS完全托管的密钥加密每个Object。
检测恶意文件的方式
在业务服务器中调用SDK检测离线文件
在您的业务程序中接入云安全中心的恶意文件检测SDK,通过调用SDK的方式检测恶意文件,在返回结果中获取恶意文件信息,且支持在云安全中心控制台查看存在风险的文件检测结果。
支持通过Java或Python方式接入。
在云安全中心控制台检测OSS中存储的文件
如果您待检测的文件存储在阿里云对象存储OSS Bucket中,可以直接在云安全中心控制台执行对目标OSS Bucket内文件的检测,并支持查看存在风险的文件列表。
提供检测结果
云安全中心通过整合多个知名病毒检测引擎的结果,并结合安全专家的经验,基于文件潜在恶意程度和检测准确程度两个关键维度,评定检测出恶意文件事件的风险等级(高危、中危、低危),并给出相应恶意文件说明和处置建议。
日志分析
如果已开通云安全中心日志分析服务,会将恶意文件检测记录投递到云安全中心的专属Logstore(日志库)中。详细说明,请参见日志分析说明和恶意文件检测字段说明。
应用场景
应用场景 | 恶意文件检测说明 |
服务器应用场景 | 在服务器环境中,会常遇到各种大规模传播的恶意文件,例如蠕虫病毒、挖矿软件、DDoS木马以及恶意脚本等。 这些恶意文件通常具备自我复制和广泛传播的特性,旨在消耗系统资源、发起分布式拒绝服务攻击或控制服务器以实施非法活动。 |
服务器定向攻击场景 | 在服务器定向攻击场景中,为了确保系统的安全和稳定运行,需要特别关注黑客工具、代理工具及后门程序等恶意文件。 这类攻击突出隐蔽性和针对性,黑客通过植入此类恶意文件来窃取敏感信息、操控系统或作为跳板进一步渗透网络。 |
全系统环境检测 | 在各种环境中,都应密切关注破坏力强的勒索病毒和感染型病毒。 勒索病毒通过加密用户数据勒索赎金,感染型病毒能自我复制并扩散至其他文件,两者均能造成严重的数据损失和系统瘫痪。 |
办公网与文件存储环境检测 | 在办公网络和文件存储环境中,特别需要注意防范恶意文档类文件(例如含有宏病毒的Office文件、带有恶意负载的压缩包等)的潜在威胁。 这些文件通常通过伪装成日常工作交流中的正常文档,诱导用户打开从而实施攻击。例如窃取登录凭据、部署远程访问木马等。 |
使用限制
一次恶意文件检测仅可以检测一个不超过100 MB的文件。
试用版和企业版的默认接口请求频率不同。
试用版:10次/秒。
企业版:20次/秒。
支持检测的压缩包文件类型有
.7z
、.zip
、.tar
、.gz
、.rar
、.ar
、.bz2
、.xz
和.lzma
。一个压缩包支持解压的层级最多为5层,解压后的文件总个数最多为1,000个,解压后的所有文件总大小最多为1 GB。对于超出限制范围的文件,不会被检测。
恶意文件的检测速度会受到网速、计算机性能、云产品限制等方面的影响。恶意文件检测SDK内部采用队列的方式,缓解外部峰值时的请求来提高并发率。当内部的队列满时,拒绝外部继续提交请求,并使用等待接口,等待队列有空间可用时再处理外部请求。
您可以通过增加队列长度来提高并发量,该方法会影响部分样本的检测时长。
timeout_ms
参数为样本超时时间,单位为毫秒。为了减少超时,建议您将timeout_ms
参数设置为60,000毫秒(即60秒)。在OSS文件检测场景下,仅支持存储类型为标准存储和低频访问的文件检测,不支持归档存储类型的文件检测。存储类型详细内容,请参见存储类型。
在OSS文件检测场景下,支持检测的OSS Bucket的所属地域包括:华北1(青岛)、华北2(北京)、华北3(张家口)、华北5(呼和浩特)、华东1(杭州)、华东2(上海)、华南1(深圳)、华南2(河源)、华南3(广州)、西南1(成都)、中国香港、新加坡、印度尼西亚(雅加达)、泰国(曼谷)、菲律宾(马尼拉)、马来西亚(吉隆坡)、韩国(首尔)、日本(东京)、美国(硅谷)、英国(伦敦)、美国(弗吉尼亚)、德国(法兰克福)。
计费说明
使用恶意文件检测SDK功能会消耗文件检测次数。如果是压缩包文件,每个压缩包文件消耗的文件检测次数按照解压后文件个数计算。
经过企业实名认证的阿里云账号可以免费试用恶意文件检测SDK功能。试用版提供10,000次恶意文件检测次数。
购买企业版可以选择您所需的恶意文件检测次数。
计费项详细说明,请参见计费概述。
开通服务并检测恶意文件
前提条件
如果您使用的是RAM用户,请确保已为RAM用户授予AliyunYundunSASFullAccess权限。具体操作,请参见为RAM用户授权。
步骤一:开通服务
云安全中心支持通过免费试用和付费购买的方式开通服务。
免费试用
如果您的阿里云账号已通过企业认证,您可以通过免费试用开通恶意文件检测SDK服务,并获取1万次恶意文件检测次数。每个阿里云账号仅有一次免费试用机会。
登录云安全中心控制台。在控制台左上角,选择需防护资产所在的区域中国。
在左侧导航栏,选择 。
在恶意文件检测SDK页面,单击立即试用。
付费购买
如果免费试用无法满足需求,您可以通过付费购买的方式开通恶意文件检测SDK服务。
如果存在未使用的免费试用次数,付费购买次数后,剩余的免费试用次数将累计在您付费购买的次数中。
登录云安全中心控制台。在控制台左上角,选择需防护资产所在的区域中国。
在左侧导航栏,选择 。
在恶意文件检测SDK页面,单击立即购买。在购买面板,选择恶意文件检测SDK为是,并按照要检测文件的数量购买足够的恶意文件检测次数。
如果您已购买云安全中心付费版本,您可以直接选择所需的恶意文件检测次数。如未购买,请根据您的需求选择云安全中心版本:
无需使用云安全中心的其他安全防护功能时,版本选择仅采购增值服务。
如需使用云安全中心的其他功能,例如漏洞修复、容器威胁检测,请选择相应的云安全中心版本。各版本的功能差异详情,请参见功能特性。
仔细阅读并选中服务协议,单击立即购买并完成支付。
开通恶意文件检测SDK服务后,您可以在恶意文件检测SDK页面,查看您的恶意文件检测SDK的剩余检测次数。如果剩余检测次数不足以支撑后续业务需求,您可以单击升级配置,购买更多的资源。更多说明,请参见升级与降配。
步骤二:检测恶意文件
根据您的业务场景,选择以下方式检测目标环境内的恶意文件。
执行恶意文件检测前,请确保当前阿里云账号下有足够的剩余检测次数。如果剩余检测次数不足,您可以在恶意文件检测SDK页面的风险文件总览页签下,单击升级配置购买足够的检测次数。
在业务服务器中调用SDK检测离线文件
准备工作
已配置环境变量
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
。阿里云SDK支持通过定义
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
环境变量来创建默认的访问凭证。调用接口时,程序直接访问凭证,读取您的访问密钥(即AccessKey)并自动完成鉴权。更多信息,请参见配置环境变量。您需要选择接入方式并参考下表的说明获取SDK包。
接入方式
版本要求
获取SDK包
Java接入
必须使用1.8或更高版本的JDK。
您可以通过以下方式获取Java SDK。
离线导入安装:在联网环境下访问Java SDK代码库并下载Java SDK,将下载的Java SDK添加到项目工程中。
Python接入
Python 3.6及以上版本。
您可以通过以下方式获取部署SDK包。
使用pip快速安装(联网环境下推荐):
pip install -U alibabacloud_filedetect
离线安装(无互联网连接的环境):在联网环境下访问Python代码库并下载Python SDK。将Python SDK上传至项目工程环境,解压压缩包后,运行以下安装命令:
# 切换至python SDK根目录 cd alibabacloud-file-detect-python-sdk-master # 安装SDK,注意python的版本 python setup.py install
示例代码
使用Python接入时,请替换下面示例中的测试用例路径参数,即path
。
package com.aliyun.filedetect.sample;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import com.aliyun.filedetect.*;
public class Sample {
/**
* 同步检测文件接口
* @param detector 检测器对象
* @param path 待检测的文件路径
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @throws InterruptedException
*/
public static DetectResult detectFileSync(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == path) return null;
DetectResult result = null;
while(true) {
result = detector.detectSync(path, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 异步检测文件接口
* @param detector 检测器对象
* @param path 待检测的文件路径
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @param callback 结果回调函数
* @throws InterruptedException
*/
public static int detectFile(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == path || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detect(path, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 同步检测URL文件接口
* @param detector 检测器对象
* @param url 待检测的文件URL
* @param md5 待检测的文件md5
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @throws InterruptedException
*/
public static DetectResult detectUrlSync(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == url || null == md5) return null;
DetectResult result = null;
while(true) {
result = detector.detectUrlSync(url, md5, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 异步检测URL文件接口
* @param detector 检测器对象
* @param url 待检测的文件URL
* @param md5 待检测的文件md5
* @param timeout_ms 设置超时时间,单位为毫秒
* @param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
* @param callback 结果回调函数
* @throws InterruptedException
*/
public static int detectUrl(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == url || null == md5 || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detectUrl(url, md5, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 格式化检测结果
* @param result 检测结果对象
* @return 格式化后的字符串
*/
public static String formatDetectResult(DetectResult result) {
if (result.isSucc()) {
DetectResult.DetectResultInfo info = result.getDetectResultInfo();
String msg = String.format("[DETECT RESULT] [SUCCEED] %s", formatDetectResultInfo(info));
if (info.compresslist != null) {
int idx = 1;
for (DetectResult.CompressFileDetectResultInfo comp_res : info.compresslist) {
msg += String.format("\n\t\t\t [COMPRESS FILE] [IDX:%d] %s", idx++, formatCompressFileDetectResultInfo(comp_res));
}
}
return msg;
}
DetectResult.ErrorInfo info = result.getErrorInfo();
return String.format("[DETECT RESULT] [FAIL] md5: %s, time: %d, error_code: %s, error_message: %s"
, info.md5, info.time, info.error_code.name(), info.error_string);
}
private static String formatDetectResultInfo(DetectResult.DetectResultInfo info) {
String msg = String.format("MD5: %s, TIME: %d, RESULT: %s, SCORE: %d", info.md5, info.time, info.result.name(), info.score);
if (info.compresslist != null) {
msg += String.format(", COMPRESS_FILES: %d", info.compresslist.size());
}
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
private static String formatCompressFileDetectResultInfo(DetectResult.CompressFileDetectResultInfo info) {
String msg = String.format("PATH: %s, \t\t RESULT: %s, SCORE: %d", info.path, info.result.name(), info.score);
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
/**
* 同步检测目录或文件
* @param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
* @param is_sync 是否使用同步接口,推荐使用异步。 true是同步, false是异步
* @throws InterruptedException
*/
public static void detectDirOrFileSync(OpenAPIDetector detector, String path, int timeout_ms, Map<String, DetectResult> result_map) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFileSync(detector, subpath, timeout_ms, result_map);
}
return;
}
System.out.println(String.format("[detectFileSync] [BEGIN] queueSize: %d, path: %s, timeout: %d", detector.getQueueSize(), abs_path, timeout_ms));
DetectResult res = detectFileSync(detector, abs_path, timeout_ms, true);
System.err.println(String.format(" [ END ] %s", formatDetectResult(res)));
result_map.put(abs_path, res);
}
/**
* 异步检测目录或文件
* @param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
* @param is_sync 是否使用同步接口,推荐使用异步。 true是同步, false是异步
* @throws InterruptedException
*/
public static void detectDirOrFile(OpenAPIDetector detector, String path, int timeout_ms, IDetectResultCallback callback) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFile(detector, subpath, timeout_ms, callback);
}
return;
}
int seq = detectFile(detector, abs_path, timeout_ms, true, callback);
System.out.println(String.format("[detectFile] [BEGIN] seq: %d, queueSize: %d, path: %s, timeout: %d", seq, detector.getQueueSize(), abs_path, timeout_ms));
}
/**
* 开始对文件或目录进行
* @param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
* @param is_sync 是否使用同步接口,推荐使用异步。 true是同步, false是异步
* @throws InterruptedException
*/
public static void scan(final OpenAPIDetector detector, String path, int detect_timeout_ms, boolean is_sync) throws InterruptedException {
System.out.println(String.format("[SCAN] [START] path: %s, detect_timeout_ms: %d, is_sync: %b", path, detect_timeout_ms, is_sync));
long start_time = System.currentTimeMillis();
final Map<String, DetectResult> result_map = new HashMap<>();
if (is_sync) {
detectDirOrFileSync(detector, path, detect_timeout_ms, result_map);
} else {
detectDirOrFile(detector, path, detect_timeout_ms, new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
System.err.println(String.format("[detectFile] [ END ] seq: %d, queueSize: %d, %s", seq, detector.getQueueSize(), formatDetectResult(callback_res)));
result_map.put(file_path, callback_res);
}
});
// 等待任务执行完成
detector.waitQueueEmpty(-1);
}
long used_time = System.currentTimeMillis() - start_time;
System.out.println(String.format("[SCAN] [ END ] used_time: %d, files: %d", used_time, result_map.size()));
int fail_count = 0;
int white_count = 0;
int black_count = 0;
for (Map.Entry<String, DetectResult> entry : result_map.entrySet()) {
DetectResult res = entry.getValue();
if (res.isSucc()) {
if (res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK) {
black_count ++;
} else {
white_count ++;
}
} else {
fail_count ++;
}
}
System.out.println(String.format(" fail_count: %d, white_count: %d, black_count: %d"
, fail_count, white_count, black_count));
}
public static void main(String[] args_) throws Exception {
// 获取检测器实例
OpenAPIDetector detector = OpenAPIDetector.getInstance();
// 初始化
ERR_CODE init_ret = detector.init(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
System.out.println("INIT RET: " + init_ret.name());
// 设置解压缩参数(可选,默认不解压压缩包)
boolean decompress = true; // 是否识别压缩文件并解压,默认为false
int decompressMaxLayer = 5; // 最大解压层数,decompress参数为true时生效
int decompressMaxFileCount = 1000; // 最大解压文件数,decompress参数为true时生效
ERR_CODE initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount);
System.out.println("INIT_DECOMPRESS RET: " + initdec_ret.name());
if (true) {
// 示例用法1:扫描本地目录或文件
boolean is_sync_scan = false; // 是异步检测还是同步检测。异步检测性能更好。false表示异步检测
int timeout_ms = 500000; // 单个样本检测时间,单位为毫秒
String path = "test2.php"; // 待扫描的文件或目录
// 启动扫描,直到扫描结束
scan(detector, path, timeout_ms, is_sync_scan);
}
if (true) {
// 示例用法2:扫描URL文件
int timeout_ms = 500000; // 单个样本检测时间,单位为毫秒
String url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1*****25&OSSAccessKeyId=xxx"; // 待扫描的URL文件
String md5 = "a767f*************6e21d000000"; // 待扫描的文件MD5
// 同步扫描。如果需要异步扫描,调用detectUrl接口
System.out.println(String.format("[detectUrlSync] [BEGIN] URL: %s, MD5: %s, TIMEOUT: %d", url, md5, timeout_ms));
DetectResult result = detectUrlSync(detector, url, md5, timeout_ms, true);
System.err.println(String.format("[detectUrlSync] [ END ] %s", formatDetectResult(result)));
}
// 反初始化
System.out.println("Over.");
detector.uninit();
}
}
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
import threading
import time
import traceback
from alibabacloud_filedetect.OpenAPIDetector import OpenAPIDetector
from alibabacloud_filedetect.IDetectResultCallback import IDetectResultCallback
from alibabacloud_filedetect.ERR_CODE import ERR_CODE
from alibabacloud_filedetect.DetectResult import DetectResult
class Sample(object):
def __init__(self):
pass
"""
同步检测文件接口
@param detector 检测器对象
@param path 待检测的文件路径
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,False表示不等待直接返回错误,True表示一直等待直到队列不满时
"""
def detectFileSync(self, detector, path, timeout_ms, wait_if_queuefull):
if detector is None or path is None:
return None
result = None
while True:
result = detector.detectSync(path, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
异步检测文件接口
@param detector 检测器对象
@param path 待检测的文件路径
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,False表示不等待直接返回错误,True表示一直等待直到队列不满时
@param callback 结果回调函数
"""
def detectFile(self, detector, path, timeout_ms, wait_if_queuefull, callback):
if detector is None or path is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detect(path, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
同步检测URL文件接口
@param detector 检测器对象
@param url 待检测的文件URL
@param md5 待检测的文件md5
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
"""
def detectUrlSync(self, detector, url, md5, timeout_ms, wait_if_queuefull):
if detector is None or url is None or md5 is None:
return None
result = None
while True:
result = detector.detectUrlSync(url, md5, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
异步检测URL文件接口
@param detector 检测器对象
@param url 待检测的文件URL
@param md5 待检测的文件md5
@param timeout_ms 设置超时时间,单位为毫秒
@param wait_if_queuefull 如果检测队列满了,false表示不等待直接返回错误,true表示一直等待直到队列不满时
@param callback 结果回调函数
"""
def detectUrl(self, detector, url, md5, timeout_ms, wait_if_queuefull, callback):
if detector is None or url is None or md5 is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detectUrl(url, md5, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
格式化检测结果
@param result 检测结果对象
@return 格式化后的字符串
"""
@staticmethod
def formatDetectResult(result):
msg = ""
if result.isSucc():
info = result.getDetectResultInfo()
msg = "[DETECT RESULT] [SUCCEED] {}".format(Sample.formatDetectResultInfo(info))
if info.compresslist is not None:
idx = 1
for comp_res in info.compresslist:
msg += "\n\t\t\t [COMPRESS FILE] [IDX:{}] {}".format(idx, Sample.formatCompressFileDetectResultInfo(comp_res))
idx += 1
else:
info = result.getErrorInfo()
msg = "[DETECT RESULT] [FAIL] md5: {}, time: {}, error_code: {}, error_message: {}".format(info.md5,
info.time, info.error_code.name, info.error_string)
return msg
@staticmethod
def formatDetectResultInfo(info):
msg = "MD5: {}, TIME: {}, RESULT: {}, SCORE: {}".format(info.md5, info.time, info.result.name, info.score)
if info.compresslist is not None:
msg += ", COMPRESS_FILES: {}".format(len(info.compresslist))
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
@staticmethod
def formatCompressFileDetectResultInfo(info):
msg = "PATH: {}, \t\t RESULT: {}, SCORE: {}".format(info.path, info.result.name, info.score)
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
"""
同步检测目录或文件
@param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
@param is_sync 是否使用同步接口,推荐使用异步。 True是同步,False是异步
"""
def detectDirOrFileSync(self, detector, path, timeout_ms, result_map):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFileSync(detector, sub_path, timeout_ms, result_map)
return
print("[detectFileSync] [BEGIN] queueSize: {}, path: {}, timeout: {}".format(
detector.getQueueSize(), abs_path, timeout_ms))
res = self.detectFileSync(detector, abs_path, timeout_ms, True)
print(" [ END ] {}".format(Sample.formatDetectResult(res)))
result_map[abs_path] = res
return
"""
异步检测目录或文件
@param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
@param is_sync 是否使用同步接口,推荐使用异步。True是同步, False是异步
"""
def detectDirOrFile(self, detector, path, timeout_ms, callback):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFile(detector, sub_path, timeout_ms, callback)
return
seq = self.detectFile(detector, abs_path, timeout_ms, True, callback)
print("[detectFile] [BEGIN] seq: {}, queueSize: {}, path: {}, timeout: {}".format(
seq, detector.getQueueSize(), abs_path, timeout_ms))
return
"""
开始对文件或目录进行检测
@param path 指定路径,可以是文件或者目录。目录的话就会递归遍历
@param is_sync 是否使用同步接口,推荐使用异步。 True是同步,False是异步
"""
def scan(self, detector, path, detect_timeout_ms, is_sync):
try:
print("[SCAN] [START] path: {}, detect_timeout_ms: {}, is_sync: {}".format(path, detect_timeout_ms, is_sync))
start_time = time.time()
result_map = {}
if is_sync:
self.detectDirOrFileSync(detector, path, detect_timeout_ms, result_map)
else:
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
print("[detectFile] [ END ] seq: {}, queueSize: {}, {}".format(seq,
detector.getQueueSize(), Sample.formatDetectResult(callback_res)))
result_map[file_path] = callback_res
self.detectDirOrFile(detector, path, detect_timeout_ms, AsyncTaskCallback())
# 等待任务执行完成
detector.waitQueueEmpty(-1)
used_time_ms = (time.time() - start_time) * 1000
print("[SCAN] [ END ] used_time: {}, files: {}".format(int(used_time_ms), len(result_map)))
failed_count = 0
white_count = 0
black_count = 0
for file_path, res in result_map.items():
if res.isSucc():
if res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK:
black_count += 1
else:
white_count += 1
else:
failed_count += 1
print(" fail_count: {}, white_count: {}, black_count: {}".format(
failed_count, white_count, black_count))
except Exception as e:
print(traceback.format_exc(), file=sys.stderr)
def main(self):
# 获取检测器实例
detector = OpenAPIDetector.get_instance()
# 读取环境变量中的Access Key ID和Access Key Secret
access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
# 初始化
init_ret = detector.init(access_key_id, access_key_secret)
print("INIT RET: {}".format(init_ret.name))
# 设置解压缩参数(可选,默认不解压压缩包)
decompress = True # 是否识别压缩文件并解压,默认为false
decompressMaxLayer = 5 # 最大解压层数,decompress参数为true时生效
decompressMaxFileCount = 1000 # 最大解压文件数,decompress参数为true时生效
initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount)
print("INIT_DECOMPRESS RET: {}".format(initdec_ret.name))
if True:
# 示例用法1:扫描本地目录或文件
is_sync_scan = False # 是异步检测还是同步检测。异步检测性能更好。False表示异步检测
timeout_ms = 500000 # 单个样本检测时间,单位为毫秒
path = "test.bin" # 待扫描的文件或目录
# 启动扫描,直到扫描结束
self.scan(detector, path, timeout_ms, is_sync_scan)
if True:
# 示例用法2:扫描URL文件
timeout_ms = 500000
url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1671448125&OSSAccessKeyId=xxx" # 待扫描的URL文件
md5 = "a767ffc59d93125c7505b6e21d000000"
# 同步扫描。如果需要异步扫描,调用detectUrl接口
print("[detectUrlSync] [BEGIN] URL: {}, MD5: {}, TIMEOUT: {}".format(url, md5, timeout_ms))
result = self.detectUrlSync(detector, url, md5, timeout_ms, True)
print("[detectUrlSync] [ END ] {}".format(Sample.formatDetectResult(result)))
# 反初始化
print("Over.")
detector.uninit()
if __name__ == "__main__":
sample = Sample()
sample.main()
返回结果
调用SDK进行恶意文件检测后,只能在程序的运行结果中查看检测结果,检测结果不会同步到云安全中心控制台中。控制台中仅支持查看剩余检测次数。风险文件总览页面的统计数据,例如检测文件总数等,为OSS文件检测的结果。
struct DetectResult {
std::string md5; // 样本md5
long time = 0; // 用时,单位为毫秒
ERR_CODE error_code; // 错误码
std::string error_string; // 扩展错误信息
enum RESULT {
RES_WHITE = 0, //安全文件
RES_BLACK = 1, //可疑文件
RES_PENDING = 3 //检测中
};
RESULT result; //检测结果
int score; //检测分值,取值范围0~100
std::string virus_type; //病毒类型,如“WebShell/MalScript/Hacktool”
std::string ext_info; //扩展信息为JSON字符串
struct CompressFileDetectResultInfo {
std::string path; // 压缩文件路径
RESULT result; // 检测结果
int score; // 分值,取值范围0-100
std::string virus_type; //病毒类型,如“WebShell/MalScript/Hacktool”
std::string ext_info; //扩展信息为JSON字符串
};
std::list<struct CompressFileDetectResultInfo> compresslist = null; // 如果是压缩包,并且开启了压缩包解压参数,则此处会输出压缩包内文件检测结果
};
错误码说明如下:
enum ERR_CODE {
ERR_INIT = -100, // 需要初始化,或者重复初始化
ERR_FILE_NOT_FOUND = -99, // 文件未找到
ERR_DETECT_QUEUE_FULL = -98, // 检测队列满
ERR_CALL_API = -97, // 调用API错误
ERR_TIMEOUT = -96, // 超时
ERR_UPLOAD = -95, //文件上传失败;用户可重新发起检测,再次尝试
ERR_ABORT = -94, //程序退出,样本未得到检测
ERR_TIMEOUT_QUEUE = -93, //队列超时,用户发起检测频率过高或超时时间过短
ERR_MD5 = -92, // MD5格式不对
ERR_URL = -91, // URL格式不对
ERR_SUCC = 0 // 成功
};
检测分数越高,文件可能存在的风险越高。检测分数和危险等级对应表如下:
分数区间 | 危险等级 |
0~60 | 安全 |
61~70 | 风险(低危) |
71~80 | 可疑(中危) |
81~100 | 恶意(高危) |
在云安全中心控制台检测OSS中存储的文件
登录云安全中心控制台。在控制台左上角,选择需防护资产所在的区域中国。
在左侧导航栏,选择 。
单击OSS文件检测页签,选择合适的检测方式并执行检测。
如果您的Bucket没有在OSS文件检测的列表中,您可以单击同步Bucket,同步最新的Bucket列表。
检测方式
说明
操作步骤
手动全量检测
检测单个或多个Bucket内的所有文件。
在OSS文件检测页签,单击单个Bucke操作列的检测或选中多个Bucket后单击批量检测。
在检测对话框,指定需要检测文件的类型、检测文件范围、扫描路径等。
解压层级:如果选择检测的文件类型包含压缩类型,可以设置解压层级(最多5层,支持不解压)和单个压缩包解压文件数量限制(最多1000个)。
文件解密类型:默认不解密,如果OSS文件配置了服务端加密(SSE-KMS、SSE-OSS),可以设置解密后再检测。解密类型OSS对应SSE-OSS加密的OSS文件,解密类型KMS对应SSE-KMS加密的OSS文件。
检测文件范围:可选。设置检测范围的时间上限。设置后会检测文件更新时间晚于该时间的文件。
扫描路径:支持选择按前缀匹配(输入文件名前缀来匹配扫描指定文件)或配置到整个Bucket(扫描整个Bucket文件)。
单击确定。
手动增量检测
针对已检测且上次检测后有文件更新的Bucket,提供只检测未检测过的文件的功能。
在OSS文件检测页签,单击目标Bucke操作列的增量检测。
在增量检测对话框中,指定需要检测文件的类型、解压层级(压缩类型文件支持)、文件解密类型、检测文件范围、扫描路径(支持按前缀匹配和配置到整个Bucket)。
单击确定。
自动检测
通过配置扫描策略可为指定Bucket开启周期性自动检测。配置前建议了解以下信息:
一个Bucket只能在一个策略中生效。
自动检测仅会检测OSS新增的文件,不会重复检测同一个文件。
在OSS文件检测页签,单击策略管理区域的策略配置。
在策略管理面板,单击新增策略。
如果已配置策略的检测方案符合检测要求,您可以单击该策略操作列的编辑,将目标Bucket添加到该策略的生效范围内。
在策略创建面板,配置策略名称、策略启用状态(默认启用)、扫描路径(支持按前缀匹配和配置到整个Bucket)、检测文件范围、检测周期、文件检测时间、文件检测类型、解压层级(压缩类型文件支持)、文件解密类型和生效Bucket等。
单击确定。
可选:配置钉钉机器人通知
您可以在系统配置中添加钉钉机器人通知,通过钉钉群实时接收云安全中心检测的恶意文件告警信息。具体操作,请参见配置钉钉机器人通知。
查看和导出检测结果
风险文件总览
在风险文件总览页签:
查看风险文件统计信息
不同等级(高危、中危、低危)风险通过不同颜色显示:高危(红色)、中危(橙色)、低危(灰色)。
在风险文件列表中,可以根据检测场景列来选择查看调用SDK(API)和在控制台(OSS)检测的恶意文件信息,也可以单击列表左上方的搜索项下拉框,根据风险等级、文件名称、威胁标签、MD5或最新检测时间来查看目标恶意文件详情。
如果是压缩包文件,可单击文件前的展开图标,展示该压缩包下的风险文件列表,该压缩包文件的风险等级为其下风险文件中的最高风险等级。
查看目标风险文件详情
在风险文件列表中,单击目标风险文件操作列的详情,在该文件的详情面板,查看检测出的存在风险的文件详情、事件说明(包含恶意文件说明和处置建议)等信息。
在压缩包文件的风险详情面板,还可查看该压缩包中的风险文件数、解压后检测的文件总数,以及风险文件列表等信息。
导出所有风险文件列表
单击图标,等待文件导出完成后,在提示框中单击下载。
OSS文件检测
在OSS文件检测页签:
从OSS Bucket维度查看风险文件统计信息
不同等级(高危、中危、低危)风险通过不同颜色显示:高危(红色)、中危(橙色)、低危(灰色)。
查看目标OSS Bucket的检测详情
在Bucket列表中,单击目标Bucket操作列的详情,查看Bucket的基本信息和风险文件详情。
在压缩包的文件检测详情页面,还可以查看配置文件解压情况、解压后的文件总数、检测的文件数量以及已扫描、未扫描的风险文件列表。
按Bucket维度导出所有风险文件列表
在OSS文件检测页签,单击图标,等待文件导出完成后,在提示框中单击下载。
处理风险文件的告警事件
对于检测出的风险文件,需要您根据风险等级以及云安全中心提供的事件说明和处置建议,进行确认和手动修复。
高危风险事件:误报的概率较低,同时恶意性较强,建议及时处理。
中危和低危风险事件:该等级风险文件多出现在业务场景中,需要您核实对业务的影响后处理。
如果您确认是业务的正常文件或是误报,您可以忽略相关告警事件。如果您在业务程序中调用SDK检测文件,对于正常或误报的文件,您也可以在业务程序中添加忽略或加入白名单的处理逻辑。
相关API
您可以通过调用API接口方式使用恶意文件检测功能。具体使用方法如下: