由于一些简单的需求,所以写了这个东西,原本想着搞很多功能的,最后实践的时候就剩下统计每24小时的请求量、拦截量和拦截事件top6。
代码大概的内容是使用雷池waf社区版的api(官方不列,需要自己抓包),将各个waf节点的24小时访问量、拦截量相加并使用webhook推送到企业微信群,将拦截事件进行排序,按照拦截数量进行排序,并推送top6。另外拦截这部分具体需要看个人的waf的配置规则。
直接上py代码吧
import requests
import time
from datetime import datetime, timedelta
# 定义常量
BASE_URLS = [
"https://10.120.20.13x:9443",
"https://10.120.20.12x:9443",
"https://10.120.20.13x:9443",
"https://10.120.18.10x:9443",
"https://10.120.18.10x:9443",
]
# 雷池waf后台生成的api token
TOKENS = [
"6xx",
"Cxx",
"Mxx",
"Hxx",
"Rxx",
]
# key不能泄露哈
WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=f7xx"
# 国家代码到中文名称的映射
COUNTRY_CODE_TO_NAME = {
"AF": "阿富汗",
"AL": "阿尔巴尼亚",
"DZ": "阿尔及利亚",
"AS": "美属萨摩亚",
"AD": "安道尔",
"AO": "安哥拉",
"AI": "安圭拉",
"AQ": "南极洲",
"AG": "安提瓜和巴布达",
"AR": "阿根廷",
"AM": "亚美尼亚",
"AW": "阿鲁巴",
"AU": "澳大利亚",
"AT": "奥地利",
"AZ": "阿塞拜疆",
"BS": "巴哈马",
"BH": "巴林",
"BD": "孟加拉国",
"BB": "巴巴多斯",
"BY": "白俄罗斯",
"BE": "比利时",
"BZ": "伯利兹",
"BJ": "贝宁",
"BM": "百慕大",
"BT": "不丹",
"BO": "玻利维亚",
"BA": "波斯尼亚和黑塞哥维那",
"BW": "博茨瓦纳",
"BR": "巴西",
"IO": "英属印度洋领地",
"BN": "文莱",
"BG": "保加利亚",
"BF": "布基纳法索",
"BI": "布隆迪",
"KH": "柬埔寨",
"CM": "喀麦隆",
"CA": "加拿大",
"CV": "佛得角",
"KY": "开曼群岛",
"CF": "中非共和国",
"TD": "乍得",
"CL": "智利",
"CN": "中国",
"CO": "哥伦比亚",
"KM": "科摩罗",
"CG": "刚果(布)",
"CD": "刚果(金)",
"CK": "库克群岛",
"CR": "哥斯达黎加",
"CI": "科特迪瓦",
"HR": "克罗地亚",
"CU": "古巴",
"CY": "塞浦路斯",
"CZ": "捷克",
"DK": "丹麦",
"DJ": "吉布提",
"DM": "多米尼克",
"DO": "多米尼加共和国",
"EC": "厄瓜多尔",
"EG": "埃及",
"SV": "萨尔瓦多",
"GQ": "赤道几内亚",
"ER": "厄立特里亚",
"EE": "爱沙尼亚",
"ET": "埃塞俄比亚",
"FJ": "斐济",
"FI": "芬兰",
"FR": "法国",
"GA": "加蓬",
"GM": "冈比亚",
"GE": "格鲁吉亚",
"DE": "德国",
"GH": "加纳",
"GR": "希腊",
"GD": "格林纳达",
"GU": "关岛",
"GT": "危地马拉",
"GN": "几内亚",
"GW": "几内亚比绍",
"GY": "圭亚那",
"HT": "海地",
"HN": "洪都拉斯",
"HU": "匈牙利",
"IS": "冰岛",
"IN": "印度",
"ID": "印度尼西亚",
"IR": "伊朗",
"IQ": "伊拉克",
"IE": "爱尔兰",
"IL": "以色列",
"IT": "意大利",
"JM": "牙买加",
"JP": "日本",
"JO": "约旦",
"KZ": "哈萨克斯坦",
"KE": "肯尼亚",
"KI": "基里巴斯",
"KP": "朝鲜",
"KR": "韩国",
"KW": "科威特",
"KG": "吉尔吉斯斯坦",
"LA": "老挝",
"LV": "拉脱维亚",
"LB": "黎巴嫩",
"LS": "莱索托",
"LR": "利比里亚",
"LY": "利比亚",
"LI": "列支敦士登",
"LT": "立陶宛",
"LU": "卢森堡",
"MG": "马达加斯加",
"MW": "马拉维",
"MY": "马来西亚",
"MV": "马尔代夫",
"ML": "马里",
"MT": "马耳他",
"MH": "马绍尔群岛",
"MR": "毛里塔尼亚",
"MU": "毛里求斯",
"MX": "墨西哥",
"FM": "密克罗尼西亚",
"MD": "摩尔多瓦",
"MC": "摩纳哥",
"MN": "蒙古",
"ME": "黑山",
"MA": "摩洛哥",
"MZ": "莫桑比克",
"MM": "缅甸",
"NA": "纳米比亚",
"NR": "瑙鲁",
"NP": "尼泊尔",
"NL": "荷兰",
"NZ": "新西兰",
"NI": "尼加拉瓜",
"NE": "尼日尔",
"NG": "尼日利亚",
"NO": "挪威",
"OM": "阿曼",
"PK": "巴基斯坦",
"PW": "帕劳",
"PA": "巴拿马",
"PG": "巴布亚新几内亚",
"PY": "巴拉圭",
"PE": "秘鲁",
"PH": "菲律宾",
"PL": "波兰",
"PT": "葡萄牙",
"QA": "卡塔尔",
"RO": "罗马尼亚",
"RU": "俄罗斯",
"RW": "卢旺达",
"WS": "萨摩亚",
"SM": "圣马力诺",
"SA": "沙特阿拉伯",
"SN": "塞内加尔",
"RS": "塞尔维亚",
"SC": "塞舌尔",
"SL": "塞拉利昂",
"SG": "新加坡",
"SK": "斯洛伐克",
"SI": "斯洛文尼亚",
"SB": "所罗门群岛",
"SO": "索马里",
"ZA": "南非",
"ES": "西班牙",
"LK": "斯里兰卡",
"SD": "苏丹",
"SR": "苏里南",
"SZ": "斯威士兰",
"SE": "瑞典",
"CH": "瑞士",
"SY": "叙利亚",
"TW": "中国台湾",
"TJ": "塔吉克斯坦",
"TZ": "坦桑尼亚",
"TH": "泰国",
"TL": "东帝汶",
"TG": "多哥",
"TO": "汤加",
"TT": "特立尼达和多巴哥",
"TN": "突尼斯",
"TR": "土耳其",
"TM": "土库曼斯坦",
"UG": "乌干达",
"UA": "乌克兰",
"AE": "阿联酋",
"GB": "英国",
"US": "美国",
"UY": "乌拉圭",
"UZ": "乌兹别克斯坦",
"VU": "瓦努阿图",
"VA": "梵蒂冈",
"VE": "委内瑞拉",
"VN": "越南",
"YE": "也门",
"ZM": "赞比亚",
"ZW": "津巴布韦"
}
# 获取时间戳和格式化时间
now = int(time.time())
yesterday = int((datetime.now() - timedelta(days=1)).timestamp())
now_str = datetime.now().strftime("%Y年%m月%d日-%H:%M")
yesterday_str = (datetime.now() - timedelta(days=1)).strftime("%Y年%m月%d日-%H:%M")
# 定义请求数据的函数
def fetch_data(url, headers):
try:
response = requests.get(url, headers=headers, verify=False) # 忽略SSL证书验证(仅用于开发环境)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None
# 请求攻击和访问接口数据
def fetch_and_write_data(url_suffix, tokens, output_file, data_key, sub_key):
with open(output_file, "w", encoding="utf-8") as file:
for base_url, token in zip(BASE_URLS, tokens):
url = f"{base_url}/api/stat/advance/{url_suffix}?begin_time={yesterday}&end_time={now}"
headers = {"X-Slce-Api-Token": token}
data = fetch_data(url, headers)
if data:
value = data.get('data', {}).get(data_key, {}).get(sub_key, 0) if isinstance(sub_key, str) else data.get('data', {}).get(data_key, 0)
file.write(f"{value}\n")
# 写入攻击数据到 attack.txt(提取 'block' 和 'rate_limit')
with open("attack.txt", "w", encoding="utf-8") as file:
for base_url, token in zip(BASE_URLS, TOKENS):
url = f"{base_url}/api/stat/advance/attack?begin_time={yesterday}&end_time={now}"
headers = {"X-Slce-Api-Token": token}
data = fetch_data(url, headers)
if data:
block = data.get('data', {}).get('intercept', {}).get('block', 0)
rate_limit = data.get('data', {}).get('intercept', {}).get('rate_limit', 0)
file.write(f"{block}\n{rate_limit}\n")
print("攻击数据已写入文件 attack.txt")
# 写入访问数据到 access.txt(提取 'access')
fetch_and_write_data("access", TOKENS, "access.txt", "access", None)
print("访问数据已写入文件 access.txt")
# 新API请求及排序处理
all_records = []
for base_url, token in zip(BASE_URLS, TOKENS):
url = f"{base_url}/api/open/records/acl?page=1&page_size=100&created_at_begin={yesterday}&created_at_end={now}"
headers = {"X-Slce-Api-Token": token}
data = fetch_data(url, headers)
if data and 'data' in data and 'data' in data['data']:
all_records.extend(data['data']['data'])
# 对所有返回的数据按照 denied_count 从大到小排序
sorted_records = sorted(all_records, key=lambda x: x.get('denied_count', 0), reverse=True)
# 提取前6个记录
top_6_records = sorted_records[:6]
# 构建防护事件TOP6的消息内容
top_6_message = "\n".join([
f"{i+1}. 源IP {record['ip']} ({COUNTRY_CODE_TO_NAME.get(record['country'], record['country'])}-{record.get('province', '')}-{record.get('city', '')}),攻击目标为{record['site_comment']},拦截数量为{record['denied_count']};"
for i, record in enumerate(top_6_records)
])
# 读取文件并计算总和
def sum_file(filename):
try:
with open(filename, "r", encoding="utf-8") as file:
return sum(int(line.strip()) for line in file if line.strip().isdigit())
except Exception as e:
print(f"读取文件 {filename} 失败: {e}")
return 0
acc_counts = sum_file("access.txt")
att_counts = sum_file("attack.txt")
# 推送消息到企业微信
message_content = (
f"{yesterday_str} 至 {now_str} 的WAF数据统计,互联网总请求量为 {acc_counts},总拦截数量为 {att_counts}。\n"
f"防护事件TOP6为:\n{top_6_message}"
)
message = {
"msgtype": "text",
"text": {
"content": message_content
}
}
try:
response = requests.post(WEBHOOK_URL, json=message)
response.raise_for_status()
print("消息推送成功")
except requests.exceptions.RequestException as e:
print(f"消息推送失败: {e}")
保存到文件里面原本是想着复盘或者做每月统计之类的用,后面懒得动了,就这样吧。
效果图大概如这样: