分析XSSstrike源码

发布于:2025-06-02 ⋅ 阅读:(19) ⋅ 点赞:(0)

#用于学习web安全自动化工具#

我能收获什么?

1.XSS漏洞检测机制

  • 学习如何构造和发送XSS payload
  • 如何识别响应中的回显,WAF,过滤规则等
  • 如何使用词典,编码策略,上下文探测等绕过过滤器

2.Python安全工具开发技巧

  • 使用requests,urllib等模块构造请求
  • 多线程异步扫描
  • 数据结构组织方式
  • 命令行界面交互的设计思路

3.源码工程结构与模块划分

  • 如何将工具拆分为功能模块
  • 如何设计配置文件,命令参数,插件式架构

4.反WAF与XSS绕过技巧

  • 内置的绕过payload和算法
  • DOM与反射型XSS的区分探测方式

学习步骤

  1. 快速了解工具的功能和使用方式(阅读README文档,运行XSSstrike,观察常见命令行参数,输出内容,执行流程)
  2. 通过项目结构,理解各个模块的职责(梳理模块用途,建立模块依赖关系图)
  3. 选择一个典型流程深入分析
  4. 学习其中用到的关键技术

项目概述

理解XSSstrike运作原理,学习其中的技术

模块 功能说明 是否有独立文件/目录
智能 Payload 生成器 根据上下文动态构造有效 Payload,而非使用固定词典 core/payloads.py
HTML/JS 解析器 手工编写的解析器,能理解上下文 core/parsers/ 下可能包含
Fuzzing 引擎 自动注入参数 + 分析响应 + 变异构造 core/fuzzer.py
爬虫引擎 多线程爬虫(基于 Photon) core/crawler.py
参数发现工具 自动挖掘 URL 参数(可能集成 Arjun) core/parameter.py
WAF 探测与绕过 检测是否被 WAF 拦截并尝试绕过 core/waf.py
DOM XSS 扫描 分析 JavaScript 中触发的 XSS(可能用到 headless 浏览器) core/dom_scanner.py
盲 XSS 支持 支持回显不可见的注入测试 可能集成 external webhook
Payload 编码器 支持 URL、Unicode、HTML 编码等 utils/encoder.py
配置模块 支持高度定制,可能有配置文件或命令参数 config.pycore/config.py

学习流程

一.运行XSSstrike+跟踪主入口

进行调试:我使用的是vscode,launch.json是其中的调试配置文件,用于告诉vscode要调试哪个程序(program),使用什么语言(python),传入哪些命令参数(args),是否使用终端,是否附加已有进程等等

添加断点,进行调试,观察运行过程

二.分析主程序入口(xsstrike.py)

功能概述:这是主程序入口,所有的功能模块都是从这里开始被调用

内容:

1.基础引入和环境检测

2.参数解析:argparse

参数 长参数形式 说明
-h --help 显示帮助信息并退出
-u --url TARGET 目标 URL
--data PARAMDATA POST 数据
-e --encode ENCODE 对 payload 进行编码
--fuzzer 启动 fuzz 模式
--update 检查并安装更新
--timeout TIMEOUT 设置请求超时时间
--proxy 使用代理
--crawl 启用爬虫功能
--json 将 POST 数据作为 JSON 格式处理
--path 在 URL 路径中注入 payload
--seeds ARGS_SEEDS 从文件加载爬虫种子 URL
-f --file ARGS_FILE 从文件加载 payload 列表
-l --level LEVEL 设置爬虫的深度级别
--headers [ADD_HEADERS] 添加自定义请求头
-t --threads THREADCOUNT 设置线程数量
-d --delay DELAY 请求之间的延迟时间
--skip 不询问是否继续
--skip-dom 跳过 DOM XSS 检测
--blind 爬虫过程中注入 Blind XSS payload
--console-log-level {DEBUG,INFO,RUN,GOOD,WARNING,ERROR,CRITICAL,VULN} 设置控制台日志级别
--file-log-level {DEBUG,INFO,RUN,GOOD,WARNING,ERROR,CRITICAL,VULN} 设置日志文件的日志级别
--log-file LOG_FILE 指定日志文件名称

3.配置与全局变量初始化:将命令行参数作为全局配置传入globalVariables,方便各模块共享使用,处理headers和初始化关键模块

if type(args.add_headers) == bool:
    headers = extractHeaders(prompt())  # 手动输入
elif type(args.add_headers) == str:
    headers = extractHeaders(args.add_headers)  # 解析命令行输入
from core.config import blindPayload
from core.encoders import base64
from core.photon import photon
from core.prompt import prompt
from core.utils import extractHeaders, reader, converter
#这些是 XSStrike 自研的模块,用于数据处理、payload 编码、用户输入交互、爬虫等。

4.程序执行主逻辑(主控制流)

1)单次fuzz模式

if fuzz:
    singleFuzz(target, paramData, encoding, headers, delay, timeout)

2)非递归扫描模式

elif not recursive and not args_seeds:
    if args_file:
        bruteforcer(target, paramData, payloadList, encoding, headers, delay, timeout)
    else:
        scan(target, paramData, encoding, headers, delay, timeout, skipDOM, skip)

解释:scan()是常规XSS扫描的主函数,bruteforcer是自定义的payload扫描

3)爬虫+全站扫描模式

else:
    if target:
        seedList.append(target)
    for target in seedList:
        logger.run('Crawling the target')
        scheme = urlparse(target).scheme
        logger.debug('Target scheme: {}'.format(scheme))
        host = urlparse(target).netloc
        main_url = scheme + '://' + host
        crawlingResult = photon(target, headers, level,
                                threadCount, delay, timeout, skipDOM)
        forms = crawlingResult[0]
        domURLs = list(crawlingResult[1])
        difference = abs(len(domURLs) - len(forms))
        if len(domURLs) > len(forms):
            for i in range(difference):
                forms.append(0)
        elif len(forms) > len(domURLs):
            for i in range(difference):
                domURLs.append(0)
        threadpool = concurrent.futures.ThreadPoolExecutor(max_workers=threadCount)
        futures = (threadpool.submit(crawl, scheme, host, main_url, form,
                                     blindXSS, blindPayload, headers, delay, timeout, encoding) for form, domURL in zip(forms, domURLs))
        for i, _ in enumerate(concurrent.futures.as_completed(futures)):
            if i + 1 == len(forms) or (i + 1) % threadCount == 0:
                logger.info('Progress: %i/%i\r' % (i + 1, len(forms)))
        logger.no_format('')

解释:基于Phonton进行自动爬虫,并结合HTML/JS解析器对表单/链接发起扫描和fuzz

总模块结构图:

xsstrike.py
├── argparse 命令行处理
├── 核心配置加载 core/config
├── 日志模块初始化 core/log
├── 请求头处理 extractHeaders/prompt
├── 调用模式模块:
│   ├── modes/scan.py
│   ├── modes/singleFuzz.py
│   ├── modes/bruteforcer.py
│   └── core/photon.py + crawl.py (爬虫+分析)
└── payload/参数/headers/encoder 等辅助模块调用

三.modes/scan.py

功能概述:这个函数是XSStrike的核心扫描逻辑,其主要职责是对目标URL参数进行反射检查,判断是否存在XSS,分析过滤器绕过策略,并生成有效payload进行验证

内容:

1.URL构造和初始请求

GET, POST = (False, True) if paramData else (True, False)

解释:如果指定了 paramData(POST 数据),则使用 POST,否则默认 GET。

if not target.startswith('http'):
        try:
            response = requester('https://' + target, {},
                                 headers, GET, delay, timeout)
            target = 'https://' + target
        except:
            target = 'http://' + target

解释:自动补全协议头(如 example.comhttps://example.com

2.DOM XSS检测

if not skipDOM:
        logger.run('Checking for DOM vulnerabilities')
        highlighted = dom(response)
        if highlighted:
            logger.good('Potentially vulnerable objects found')
            logger.red_line(level='good')
            for line in highlighted:
                logger.no_format(line, level='good')
            logger.red_line(level='good')

解释:使用core/dom.py模块进行静态DOM XSS检查,如果HTML中出现可能被动态执行的eval()或innerHTML拼接位置,会被标记

3.参数提取和WAF检测

params = getParams(target, paramData, GET)

解释:从URL或POST data中提取参数

WAF = wafDetector(url, {list(params.keys())[0]: xsschecker}, headers, GET, delay, timeout)

解释:使用core/wafDetector.py模块,向目标发送特征性payload并分析相应内容,检测是否存在WAF检测

4.反射检测+payload构造主循环

对每一个参数逐一进行 fuzz 和检测:

for paramName in params.keys():
    paramsCopy = copy.deepcopy(params)
    ...

1)参数注入和反射检测

if encoding:
            paramsCopy[paramName] = encoding(xsschecker)
        else:
            paramsCopy[paramName] = xsschecker
        response = requester(url, paramsCopy, headers, GET, delay, timeout)
        occurences = htmlParser(response, encoding)

解释:注入payload,并分析响应中是否出现该值,htmlParser() 返回值如 {<位置>: <标签上下文>}

2)分析过滤器

efficiencies = filterChecker(
            url, paramsCopy, headers, GET, delay, occurences, timeout, encoding)

解释:使用多个轻量payload模拟不同类型的XSS注入,检测哪些被过滤,返回每种payload的注入效率指标。

3)生成绕过payload

vectors = generator(occurences, response.text)

解释:根据参数反射位置生成上下文对应的payload列表,含逃逸策略和双引号,尖括号编码等变体。

5.Payload验证循环

for confidence, vects in vectors.items():
    for vect in vects:
        ...
        efficiencies = checker(...)

1)判断有效性

bestEfficiency = max(efficiencies)

解释:如果payload达到效率100或>=95且以\开头,则认为是高危

2)是否继续扫描

if not skip:
    choice = input('...Continue?').lower()
    if choice != 'y':
        quit()

模块间调用逻辑图:

scan.py
├── requester()           → 发起请求
├── dom()                 → DOM XSS 静态检测
├── getParams()           → 参数提取
├── wafDetector()         → 检测是否有 WAF
├── htmlParser()          → 判断反射点(xsschecker 是否出现在响应中)
├── filterChecker()       → 判断输入被过滤的规则
├── generator()           → 根据反射点生成合适上下文 payload
└── checker()             → 逐个 payload 验证其注入效率

四.modes/singleFuzz.py(--fuzzer)

功能概述:这是XSStrike中一个用于快速fuzz单个参数的简化扫描模块,适用于手动分析或快速检测用途。相比主扫描逻辑scan.py,它不执行DOM分析,paylaod构造或效率计算,仅将一个标准payload插入目标参数并fuzz相应内容

(注释:什么是fuzzing?fuzzing指的是自动的或半自动的向程序输入大量随机,异常或畸形的数据,以观察程序是否会出现异常行为,比如崩溃,处理失败或产生漏洞等。)

内容:

1.函数定义

def singleFuzz(target, paramData, encoding, headers, delay, timeout):
参数名 含义
target 目标 URL
paramData POST 数据(若为 None 则默认 GET 请求)
encoding 可选,对 payload 进行编码
headers 自定义请求头
delay 每个请求之间的延迟(防止 WAF)
timeout 请求超时时间

2.请求方法和URL预处理

GET, POST = (False, True) if paramData else (True, False)
    # If the user hasn't supplied the root url with http(s), we will handle it
    if not target.startswith('http'):
        try:
            response = requester('https://' + target, {},
                                 headers, GET, delay, timeout)
            target = 'https://' + target
        except:
            target = 'http://' + target

3.日志输出和参数准备

host = urlparse(target).netloc
url = getUrl(target, GET)
params = getParams(target, paramData, GET)

解释:解析出主机名,最终构造的URL和参数键值对,使用core/utils.py中的工具函数

4.WAF检测

WAF = wafDetector(url, {list(params.keys())[0]: xsschecker}, headers, GET, delay, timeout)

5.Fuzz主题逻辑

for paramName in params.keys():
    logger.info('Fuzzing parameter: %s' % paramName)
    paramsCopy = copy.deepcopy(params)
    paramsCopy[paramName] = xsschecker
    fuzzer(url, paramsCopy, headers, GET, delay, timeout, WAF, encoding)

解释:逐个参数插入标准payload,调用core/fuzzer.py中的fuzzer()进行fuzz

与scan.py的区别:

功能 scan.py singlefuzz.py
DOM 检测
Payload 构造
自动反射分析
多参数扫描
标准 payload fuzz ✅(核心功能)
用途 自动扫描 + 精度分析 快速测试某个目标是否有反射型 XSS

五.modes/bruteforcer.py(-f ARGS_FILE)

功能概述:实现了一个基于payload列表的暴力测试器,用于向指定参数逐个注入Payload,并检测是否存在响应中,从而判断是否反射

内容:

1.主函数

def bruteforcer(target, paramData, payloadList, encoding, headers, delay, timeout):
  • target:目标 URL。

  • paramData:POST 参数,如果为空则默认使用 GET。

  • payloadList:需要测试的 Payload 字符串列表。

  • encoding:是否使用编码函数处理 Payload(如 HTML 实体编码等)。

  • headers:请求头字典。

  • delay:请求间隔延迟。

  • timeout:请求超时时间。

2.请求方法和URL预处理

GET, POST = (False, True) if paramData else (True, False)
    host = urlparse(target).netloc  # Extracts host out of the url
    logger.debug('Parsed host to bruteforce: {}'.format(host))
    url = getUrl(target, GET)
    logger.debug('Parsed url to bruteforce: {}'.format(url))
    params = getParams(target, paramData, GET)
    logger.debug_json('Bruteforcer params:', params)
    if not params:
        logger.error('No parameters to test.')
        quit()

3.外层循环:遍历参数名,内层循环:遍历payload列表

for paramName in params.keys():
        progress = 1
        paramsCopy = copy.deepcopy(params)
        for payload in payloadList:
            logger.run('Bruteforcing %s[%s%s%s]%s: %i/%i\r' %
                       (green, end, paramName, green, end, progress, len(payloadList)))
            if encoding:
                payload = encoding(unquote(payload))
            paramsCopy[paramName] = payload
            response = requester(url, paramsCopy, headers,
                                 GET, delay, timeout).text
            if encoding:
                payload = encoding(payload)
            if payload in response:
                logger.info('%s %s' % (good, payload))
            progress += 1

六.modes/craw.py(--craw)

功能概述:负责处理表单爬取,漏洞检测,Blind XSS注入

函数:

def crawl(scheme, host, main_url, form, blindXSS, blindPayload, headers, delay, timeout, encoding):
  • scheme: 协议名,如 "http""https"

  • host: 主机名,如 "example.com"

  • main_url: 起始页面的 URL,用于校验表单来源

  • form: 从页面中提取到的所有表单数据(是一个字典,通常由 htmlParser 提供)

  • blindXSS: 布尔值,是否启用 Blind XSS 模式

  • blindPayload: 如果启用 Blind XSS,这是注入的 payload

  • headers: 请求头

  • delay: 每次请求之间的延迟(秒)

  • timeout: 请求超时时间

  • encoding: payload 的编码方式(可选)

内容:

    if form:
        for each in form.values():
            url = each['action']

解释:遍历页面中提取到的所有的form,每个表单是一个dict,含有action,method,inputs等信息,从中获取<form action="...">中的URL

1)构造完整的表单提交URL

            if url:
                if url.startswith(main_url):
                    pass
                elif url.startswith('//') and url[2:].startswith(host):
                    url = scheme + '://' + url[2:]
                elif url.startswith('/'):
                    url = scheme + '://' + host + url
                elif re.match(r'\w', url[0]):
                    url = scheme + '://' + host + '/' + url

2)初始化记录,避免重复检测

                if url not in core.config.globalVariables['checkedForms']:
                    core.config.globalVariables['checkedForms'][url] = []

解释:全局变量 checkedForms 用来记录已经检测过的表单和参数,避免重复。

3)提交表单参数+构造参数

method = each['method']
GET = True if method == 'get' else False
inputs = each['inputs']
paramData = {}
for one in inputs:
    paramData[one['name']] = one['value']

解释:获取表单的提交方式(GET/POST),提取所有输入框的吗name/value作为参数字典paramData。

4)遍历每一个参数并开始测试

for paramName in paramData.keys():
    if paramName not in core.config.globalVariables['checkedForms'][url]:
        core.config.globalVariables['checkedForms'][url].append(paramName)

解释:只测试没有检测过的参数

5)注入XSS测试载荷,然后发送请求

paramsCopy = copy.deepcopy(paramData)
paramsCopy[paramName] = xsschecker
response = requester(url, paramsCopy, headers, GET, delay, timeout)

解释:拷贝参数,向当前参数注入默认XSS检测payload,并发送请求

6)相应解析和位置提取

occurences = htmlParser(response, encoding)
positions = occurences.keys()

解释:使用htmlParser查找payload是否出现在响应中,并提取位置

7)进一步过滤+生成payload

occurences = filterChecker(url, paramsCopy, headers, GET, delay, occurences, timeout, encoding)
vectors = generator(occurences, response.text)

解释:filterChecker用于检查是否有字符被过滤,generator根据上下文和过滤情况生成最适合的XSS payload

8)如果找到可利用的payload,输出结果

if vectors:
    for confidence, vects in vectors.items():
        try:
            payload = list(vects)[0]
            logger.vuln('Vulnerable webpage: %s%s%s' % (green, url, end))
            logger.vuln('Vector for %s%s%s: %s' % (green, paramName, end, payload))
            break
        except IndexError:
            pass

解释:成功生成 payload 则输出漏洞信息(URL + payload),break 表示只输出一个 payload 即可。

9)注入Blind XSS(如果启用)

if blindXSS and blindPayload:
    paramsCopy[paramName] = blindPayload
    requester(url, paramsCopy, headers, GET, delay, timeout)

解释:如果开启了Blind XSS模式,则用blindPayload再次注入,这种payload不在当前页面回显,但可能在后台系统触发。

七.core/dom.py

功能概述:在响应的<script>标签中寻找潜在的XSS"source→sink"流,从而判断是否存在DOM XSS漏洞。

关键术语:

概念 解释
DOM XSS 通过 JavaScript 在客户端执行中,攻击者控制的输入流向危险的函数(如 innerHTML, eval),造成 XSS
Source 攻击者可控的输入点(如 document.location, window.name
Sink 可能执行恶意代码的危险函数(如 eval, innerHTML, document.write
变量追踪 判断 source 内容是否被赋值给某个变量,再被传入 sink

内容:

1)识别source和sink正则

sources = r'''\b(?:document\.(URL|documentURI|...|referrer)|location\.(...))\b'''
sinks = r'''\b(?:eval|assign|innerHTML|...|document\.(write|writeln)|...)'''

2)用正则匹配<script>标签内容

scripts = re.findall(r'(?i)(?s)<script[^>]*>(.*?)</script>', response)

3)主循环分析逻辑

sinkFound, sourceFound = False, False
for script in scripts:
    script = script.split('\n')
    num = 1
    allControlledVariables = set()
  • sinkFoundsourceFound:用于标记是否发现漏洞路径。

  • 将每段 <script> 的内容按行分割,便于逐行分析与高亮。

  • allControlledVariables:记录由用户输入派生出的变量名。

4)逐行扫描JS内容

for newLine in script:
    line = newLine
    parts = line.split('var ')
    controlledVariables = set()

解释:每行JS代码存在于newline中,用split('var ') 试图提取变量定义

5)检查source派生变量

for part in parts:
    for controlledVariable in allControlledVariables:
        if controlledVariable in part:
            controlledVariables.add(re.search(r'[a-zA-Z$_][a-zA-Z0-9$_]+', part).group().replace('$', '\\$'))

解释:如果在某一行中,已有受控变量出现在某段变量定义中,就认为新的变量也被污染。(污点传播)

6)匹配source并高亮

pattern = re.finditer(sources, newLine)
for grp in pattern:
    source = newLine[grp.start():grp.end()].replace(' ', '')
    if len(parts) > 1:
        for part in parts:
            if source in part:
                controlledVariables.add(re.search(r'[a-zA-Z$_][a-zA-Z0-9$_]+', part).group().replace('$', '\\$'))
    line = line.replace(source, yellow + source + end)

解释:使用正则在该行中查找source,将发现的source替换为黄色高亮,如果source出现在变量定义中,也将该变量添加为受控变量。

7)受控变量使用点检查

for controlledVariable in allControlledVariables:
    matches = list(filter(None, re.findall(r'\b%s\b' % controlledVariable, line)))
    if matches:
        sourceFound = True
        line = re.sub(r'\b%s\b' % controlledVariable, yellow + controlledVariable + end, line)

解释:如果本行中使用了受控变量,也高亮显示,并标记sourceFound,说明受控输入正在传播

8)检测sink并高亮

pattern = re.finditer(sinks, newLine)
for grp in pattern:
    sink = newLine[grp.start():grp.end()].replace(' ', '')
    if sink:
        line = line.replace(sink, red + sink + end)
        sinkFound = True

解释:匹配sink函数入innerHTML,eval,将其红色高亮,只要存在sink就标记sinkFound

9)保存高亮结果

if line != newLine:
    highlighted.append('%-3s %s' % (str(num), line.lstrip(' ')))
num += 1
  • 如果当前行已被高亮处理(说明可能存在危险 source → sink 流),就保存。

  • 添加行号便于后续定位漏洞代码位置。

10)最终返回结果

if sinkFound or sourceFound:
    return highlighted
else:
    return []

解释:如果整个<script>中发现了sink或source,就返回高亮行,狗则返回空列表。

小结:

HTML Response
    ↓
提取 <script> 脚本
    ↓
逐行查找用户可控的 source(黄色高亮)
    ↓
变量污染分析:追踪 source → var a → var b
    ↓
查找危险 sink 函数(红色高亮)
    ↓
匹配成功 → 返回高亮源码(标记潜在 DOM XSS)

八.core/fuzzer.py

功能概述:

九.core/generator.py

功能概述:

十.core/photon.py

功能概述:

十一.core/utils.py

功能概述:

十二.core/wafDetector.py

功能概述: