背景
前段时间,接入了公司部分日志流量,大约每分钟30万的请求日志。计划做一个恶意URL检测的系统,分2个阶段:
第一阶段:基于黑规则检测
第二阶段:基于机器学习检测
期间参考过很多文章,大部分都在讨论怎么去发现恶意请求,这无可厚非。但是实际环境中恶意请求的占比又是多少呢?观察我们的日志发现90%的请求都是正常请求,其中一些请求非常明显(一看就是没问题那种),例如:
/1/api/new_games?v=0.06540659188776337
/5tBARviOXjSbscQH9AzRiw==/109951164132869215.jpg?param=36y36
/Active_Page/js/vue.min.js
/api.php?format=json&t=1
如果不做一定的过滤,明显正常的请求进入检测引擎,即影响处理效率又浪费资源,很容易造成KAFKA数据积压。所以本文算是一个预热吧,这里重点和大家探讨滤白的问题,希望能起到抛砖引玉的作用。后面我会陆续分享其他2个阶段,目前项目正在紧张开发中。
0×01. 数据处理
过滤策略如下:
1.静态请求,URL后缀为:
[".jpeg", ".gif", ".jpg", ".png", ".js", ".css", ".bmp", ".ico", ".woff", ".woff2", ".ttf", ".eot", ".apk", ".json", ".zip", ".rar", ".html", ".ico", ".swf", ".exe", ".dat", ".txt", ".cgi", ".ts”]
2.参数值为纯数字、纯字母、或者字母数字组合
3.参数值值包含:数字、字母、-、_
4.参数值长度小于5
5.参数为空的GET POST请求
6.http返回状态码为200
策略1:静态请求处理
获取URL后缀名,判断是否在后缀白名单中,部分代码如下:
#判断是否静态连接@staticmethoddef filter_static(url):suffix = Common.get_url_ext(url)if suffix.lower() in Config.STATIC_SUFFIXES:return Truereturn False#获取url文件后缀@staticmethoddef get_url_ext(url):try:path = urlparse.urlparse(url).pathreturn os.path.splitext(path)[1]except:return False
策略2:参数值为纯数字、纯字母、或者字母数字组合
#str.isdigit(): True 只包含数字if paramVal.isdigit():return True#str.isalpha():True 只包含字母if paramVal.isalpha():return True#str.isalnum():True 只包含字母或者数字if paramVal.isalnum():return True
策略2:参数值包含数字、字母、-、_
思路1:将字符串泛化:
-
[a-zA-Z]泛化为
A
-
[0-9]泛化为
N
-
[-_]泛化为
C
-
其他字符泛化为
T
-
字符串去重,判断去重后的结果中是否包含T
例如:test123
泛化后:AAAANNN
去重后:AN
思路2:词法分析
简单来说就是,定义5种合法状态,如下所示:
TK_STRING = 1
TK_INTEGER = 2
TK_UNDER = 3 # _
TK_STRAIGHT = 4 # –
TK_FLOAT = 5
然后扫描字符串,每个字符的前后字符都是合法状态,一旦出现非合法字符则标记为可疑,否则标白。
0×02. 遇到的问题
1.数据格式化
我们的日志都是使用Packetbeat抓取的,默认的格式比较复杂,其中有很多用不着的数据字段,故格式化后的 数据格式如下图:
{"@timestamp": "2019-06-14T15:04:40.864956011+08:00","cap_ip": "","cap_source": "D8:9D:67:13:EE:E2","cap_timens": 168524000,"connectIP": "109.70.282.173","dst": "123.126.104.7:80","geo_city": "","geo_country": "","geo_sla": "0.000","geo_slo": "0.000","kafka_pid": 1,"raw_time": "2019-06-14T15:04:40+08:00","realIP": "109.70.282.173","realUrl": "/apiV2/profile/newsListAjax","request.hc": 1,"request.host": "mp.xxxx.com","request.method": "GET","request.url": "/apiV2/profile/newsListAjax?xpt=cHBhZzkxNTMyNWUwZTJhMkBzb2h1LmNvbQ%3D%3D&pageNumber=2952&pageSize=25","request.user-agent": "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)","response.body": "{\"status\":1,\"msg\":\"error\"}","response.code": 200,"response.content-length": "26","response.content-type": "image/webp; charset=UTF-8","src": "109.70.282.173:29819","status.code": 1,"time": "2019-06-14 15:04:40","unix_time": 1560495880}
2.参数提取
GET请求相对简单些,废话不多说,代码如下:
def getQueryString(request):data = []try:url = request["request.url"]result = urlparse.urlparse(url)query = result.query#urlparse.parse_qsl解析url请求切割参数时,遇到";"会截断,导致获取的参数值缺失";"后面的内容if ";" in query:query = re.sub(r";", "@@@@", query)params = urlparse.parse_qsl(query, True)for k, v in params:if not v:continue#恢复分号if "@@@@" in v:v = re.sub(r"@@@@", ";", v)if paramValFilter(v):continuedata.append(v)except Exception, e:print "parse query error:", e, request
POST请求参数格式比较复杂,我们这里基本都是JSON格式,而且大多为深度JSON,需要递归提取:
def getDeepJsonVal(data, result=[]):if isinstance(data, dict):for key, value in data.items():getDeepJsonVal(value, result)elif isinstance(data, list):for value in data:getDeepJsonVal(value)else:if isinstance(data, unicode) and ("{" in data or "[" in data):try:getDeepJsonVal(simplejson.loads(data))except:if not paramValFilter(data):result.append(data)else:if not paramValFilter(data):result.append(data)return result
3.参数值中包含中文,直接跳过,不做处理
def filterChinese(check_str): # 过滤中文for ch in check_str.decode("utf-8"):if u"\u4e00" <= ch <= u"\u9fff":return Truereturn False
0×03.结果分析:
处理了10万条线上请求数据,标白的请求数为:44656,过滤掉了45%的请求,效果比较理想,甚至有点出乎我的意料,Very Nice!
代码传送门:https://github.com/skskevin/UrlDetect
声明:本文来自小豹讲安全,版权归作者所有。文章内容仅代表作者独立观点,不代表士冗科技立场,转载目的在于传递更多信息。如有侵权,请联系 service@expshell.com。