Administrator
发布于 2024-10-04 / 21 阅读
0
0

使用长亭雷池waf社区版api进行简单的每日统计推送

由于一些简单的需求,所以写了这个东西,原本想着搞很多功能的,最后实践的时候就剩下统计每24小时的请求量、拦截量和拦截事件top6。

代码大概的内容是使用雷池waf社区版的api(官方不列,需要自己抓包),将各个waf节点的24小时访问量、拦截量相加并使用webhook推送到企业微信群,将拦截事件进行排序,按照拦截数量进行排序,并推送top6。另外拦截这部分具体需要看个人的waf的配置规则。

直接上py代码吧

import requests
import time
from datetime import datetime, timedelta

# 定义常量
BASE_URLS = [
    "https://10.120.20.13x:9443",
    "https://10.120.20.12x:9443",
    "https://10.120.20.13x:9443",
    "https://10.120.18.10x:9443",
    "https://10.120.18.10x:9443",
]
# 雷池waf后台生成的api token
TOKENS = [
    "6xx",
    "Cxx",
    "Mxx",
    "Hxx",
    "Rxx",
]

# key不能泄露哈
WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=f7xx"

# 国家代码到中文名称的映射
COUNTRY_CODE_TO_NAME = {
    "AF": "阿富汗",
    "AL": "阿尔巴尼亚",
    "DZ": "阿尔及利亚",
    "AS": "美属萨摩亚",
    "AD": "安道尔",
    "AO": "安哥拉",
    "AI": "安圭拉",
    "AQ": "南极洲",
    "AG": "安提瓜和巴布达",
    "AR": "阿根廷",
    "AM": "亚美尼亚",
    "AW": "阿鲁巴",
    "AU": "澳大利亚",
    "AT": "奥地利",
    "AZ": "阿塞拜疆",
    "BS": "巴哈马",
    "BH": "巴林",
    "BD": "孟加拉国",
    "BB": "巴巴多斯",
    "BY": "白俄罗斯",
    "BE": "比利时",
    "BZ": "伯利兹",
    "BJ": "贝宁",
    "BM": "百慕大",
    "BT": "不丹",
    "BO": "玻利维亚",
    "BA": "波斯尼亚和黑塞哥维那",
    "BW": "博茨瓦纳",
    "BR": "巴西",
    "IO": "英属印度洋领地",
    "BN": "文莱",
    "BG": "保加利亚",
    "BF": "布基纳法索",
    "BI": "布隆迪",
    "KH": "柬埔寨",
    "CM": "喀麦隆",
    "CA": "加拿大",
    "CV": "佛得角",
    "KY": "开曼群岛",
    "CF": "中非共和国",
    "TD": "乍得",
    "CL": "智利",
    "CN": "中国",
    "CO": "哥伦比亚",
    "KM": "科摩罗",
    "CG": "刚果(布)",
    "CD": "刚果(金)",
    "CK": "库克群岛",
    "CR": "哥斯达黎加",
    "CI": "科特迪瓦",
    "HR": "克罗地亚",
    "CU": "古巴",
    "CY": "塞浦路斯",
    "CZ": "捷克",
    "DK": "丹麦",
    "DJ": "吉布提",
    "DM": "多米尼克",
    "DO": "多米尼加共和国",
    "EC": "厄瓜多尔",
    "EG": "埃及",
    "SV": "萨尔瓦多",
    "GQ": "赤道几内亚",
    "ER": "厄立特里亚",
    "EE": "爱沙尼亚",
    "ET": "埃塞俄比亚",
    "FJ": "斐济",
    "FI": "芬兰",
    "FR": "法国",
    "GA": "加蓬",
    "GM": "冈比亚",
    "GE": "格鲁吉亚",
    "DE": "德国",
    "GH": "加纳",
    "GR": "希腊",
    "GD": "格林纳达",
    "GU": "关岛",
    "GT": "危地马拉",
    "GN": "几内亚",
    "GW": "几内亚比绍",
    "GY": "圭亚那",
    "HT": "海地",
    "HN": "洪都拉斯",
    "HU": "匈牙利",
    "IS": "冰岛",
    "IN": "印度",
    "ID": "印度尼西亚",
    "IR": "伊朗",
    "IQ": "伊拉克",
    "IE": "爱尔兰",
    "IL": "以色列",
    "IT": "意大利",
    "JM": "牙买加",
    "JP": "日本",
    "JO": "约旦",
    "KZ": "哈萨克斯坦",
    "KE": "肯尼亚",
    "KI": "基里巴斯",
    "KP": "朝鲜",
    "KR": "韩国",
    "KW": "科威特",
    "KG": "吉尔吉斯斯坦",
    "LA": "老挝",
    "LV": "拉脱维亚",
    "LB": "黎巴嫩",
    "LS": "莱索托",
    "LR": "利比里亚",
    "LY": "利比亚",
    "LI": "列支敦士登",
    "LT": "立陶宛",
    "LU": "卢森堡",
    "MG": "马达加斯加",
    "MW": "马拉维",
    "MY": "马来西亚",
    "MV": "马尔代夫",
    "ML": "马里",
    "MT": "马耳他",
    "MH": "马绍尔群岛",
    "MR": "毛里塔尼亚",
    "MU": "毛里求斯",
    "MX": "墨西哥",
    "FM": "密克罗尼西亚",
    "MD": "摩尔多瓦",
    "MC": "摩纳哥",
    "MN": "蒙古",
    "ME": "黑山",
    "MA": "摩洛哥",
    "MZ": "莫桑比克",
    "MM": "缅甸",
    "NA": "纳米比亚",
    "NR": "瑙鲁",
    "NP": "尼泊尔",
    "NL": "荷兰",
    "NZ": "新西兰",
    "NI": "尼加拉瓜",
    "NE": "尼日尔",
    "NG": "尼日利亚",
    "NO": "挪威",
    "OM": "阿曼",
    "PK": "巴基斯坦",
    "PW": "帕劳",
    "PA": "巴拿马",
    "PG": "巴布亚新几内亚",
    "PY": "巴拉圭",
    "PE": "秘鲁",
    "PH": "菲律宾",
    "PL": "波兰",
    "PT": "葡萄牙",
    "QA": "卡塔尔",
    "RO": "罗马尼亚",
    "RU": "俄罗斯",
    "RW": "卢旺达",
    "WS": "萨摩亚",
    "SM": "圣马力诺",
    "SA": "沙特阿拉伯",
    "SN": "塞内加尔",
    "RS": "塞尔维亚",
    "SC": "塞舌尔",
    "SL": "塞拉利昂",
    "SG": "新加坡",
    "SK": "斯洛伐克",
    "SI": "斯洛文尼亚",
    "SB": "所罗门群岛",
    "SO": "索马里",
    "ZA": "南非",
    "ES": "西班牙",
    "LK": "斯里兰卡",
    "SD": "苏丹",
    "SR": "苏里南",
    "SZ": "斯威士兰",
    "SE": "瑞典",
    "CH": "瑞士",
    "SY": "叙利亚",
    "TW": "中国台湾",
    "TJ": "塔吉克斯坦",
    "TZ": "坦桑尼亚",
    "TH": "泰国",
    "TL": "东帝汶",
    "TG": "多哥",
    "TO": "汤加",
    "TT": "特立尼达和多巴哥",
    "TN": "突尼斯",
    "TR": "土耳其",
    "TM": "土库曼斯坦",
    "UG": "乌干达",
    "UA": "乌克兰",
    "AE": "阿联酋",
    "GB": "英国",
    "US": "美国",
    "UY": "乌拉圭",
    "UZ": "乌兹别克斯坦",
    "VU": "瓦努阿图",
    "VA": "梵蒂冈",
    "VE": "委内瑞拉",
    "VN": "越南",
    "YE": "也门",
    "ZM": "赞比亚",
    "ZW": "津巴布韦"
}

# 获取时间戳和格式化时间
now = int(time.time())
yesterday = int((datetime.now() - timedelta(days=1)).timestamp())

now_str = datetime.now().strftime("%Y年%m月%d日-%H:%M")
yesterday_str = (datetime.now() - timedelta(days=1)).strftime("%Y年%m月%d日-%H:%M")

# 定义请求数据的函数
def fetch_data(url, headers):
    try:
        response = requests.get(url, headers=headers, verify=False)  # 忽略SSL证书验证(仅用于开发环境)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        return None

# 请求攻击和访问接口数据
def fetch_and_write_data(url_suffix, tokens, output_file, data_key, sub_key):
    with open(output_file, "w", encoding="utf-8") as file:
        for base_url, token in zip(BASE_URLS, tokens):
            url = f"{base_url}/api/stat/advance/{url_suffix}?begin_time={yesterday}&end_time={now}"
            headers = {"X-Slce-Api-Token": token}
            data = fetch_data(url, headers)
            
            if data:
                value = data.get('data', {}).get(data_key, {}).get(sub_key, 0) if isinstance(sub_key, str) else data.get('data', {}).get(data_key, 0)
                file.write(f"{value}\n")

# 写入攻击数据到 attack.txt(提取 'block' 和 'rate_limit')
with open("attack.txt", "w", encoding="utf-8") as file:
    for base_url, token in zip(BASE_URLS, TOKENS):
        url = f"{base_url}/api/stat/advance/attack?begin_time={yesterday}&end_time={now}"
        headers = {"X-Slce-Api-Token": token}
        data = fetch_data(url, headers)
        
        if data:
            block = data.get('data', {}).get('intercept', {}).get('block', 0)
            rate_limit = data.get('data', {}).get('intercept', {}).get('rate_limit', 0)
            file.write(f"{block}\n{rate_limit}\n")

print("攻击数据已写入文件 attack.txt")

# 写入访问数据到 access.txt(提取 'access')
fetch_and_write_data("access", TOKENS, "access.txt", "access", None)
print("访问数据已写入文件 access.txt")

# 新API请求及排序处理
all_records = []

for base_url, token in zip(BASE_URLS, TOKENS):
    url = f"{base_url}/api/open/records/acl?page=1&page_size=100&created_at_begin={yesterday}&created_at_end={now}"
    headers = {"X-Slce-Api-Token": token}
    data = fetch_data(url, headers)
    if data and 'data' in data and 'data' in data['data']:
        all_records.extend(data['data']['data'])

# 对所有返回的数据按照 denied_count 从大到小排序
sorted_records = sorted(all_records, key=lambda x: x.get('denied_count', 0), reverse=True)

# 提取前6个记录
top_6_records = sorted_records[:6]

# 构建防护事件TOP6的消息内容
top_6_message = "\n".join([
    f"{i+1}. 源IP {record['ip']} ({COUNTRY_CODE_TO_NAME.get(record['country'], record['country'])}-{record.get('province', '')}-{record.get('city', '')}),攻击目标为{record['site_comment']},拦截数量为{record['denied_count']};"
    for i, record in enumerate(top_6_records)
])

# 读取文件并计算总和
def sum_file(filename):
    try:
        with open(filename, "r", encoding="utf-8") as file:
            return sum(int(line.strip()) for line in file if line.strip().isdigit())
    except Exception as e:
        print(f"读取文件 {filename} 失败: {e}")
        return 0

acc_counts = sum_file("access.txt")
att_counts = sum_file("attack.txt")

# 推送消息到企业微信
message_content = (
    f"{yesterday_str} 至 {now_str} 的WAF数据统计,互联网总请求量为 {acc_counts},总拦截数量为 {att_counts}。\n"
    f"防护事件TOP6为:\n{top_6_message}"
)

message = {
    "msgtype": "text",
    "text": {
        "content": message_content
    }
}

try:
    response = requests.post(WEBHOOK_URL, json=message)
    response.raise_for_status()
    print("消息推送成功")
except requests.exceptions.RequestException as e:
    print(f"消息推送失败: {e}")

保存到文件里面原本是想着复盘或者做每月统计之类的用,后面懒得动了,就这样吧。

效果图大概如这样:


评论