#!/usr/bin/env python
"""
@Author: Castiel
@Email: ca3tie1@gmail.com
@Blog: https://ca3tie1.github.io
@Git: https://github.com/ca3tie1
@Wechat: Ca5tie1
@Date: 2021/3/13 11:21
"""
import os
import random
import string
import sys
import argparse
import time
import chardet
import pyfiglet
from functools import wraps
from ExpDepos.config.config import Config
from ExpDepos.libs.core.AttribDict import AttribDict
from ExpDepos.libs.core.common.Console import Console, formatString, formatPgString
from ExpDepos.libs.core.TencentCloudFunction import TencentScf
from ExpDepos.libs.core.base.ExceptionBase import ExpDeposException
# 终端信息输出统一使用rich的Logging Handler
console = Console()
env = AttribDict()
def Root_Path():
"""
获取程序根目录路径
:return: string 根目录路径值
"""
return os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/../../../../".replace('/', os.sep))
def show_banner():
"""
显示banner信息
:return:
"""
console.print("")
fonts = ["big", "roman", "slant", "standard", "starwars",
"univers", "banner4", "epic"]
banner = pyfiglet.Figlet(font=fonts[random.randrange(len(fonts))], width=200)
console.print(banner.renderText("ExpDepos"))
console.print(f"\t\t\t\t\t=[ [bold yellow]ExploitDepository[/] [green]{Config['VERSION']}[/]\t]")
console.print(f"\t\t\t\t\t=[ [bold yellow]By[/] [bold red]Castiel[/] at {Config['RELEASE_DATE']}\t\t]")
console.print("")
def show_version():
"""
显示系统版本号并退出
:return:
"""
show_banner()
console.print(f"ExpDepos Version: \"{Config['VERSION']}\"")
exit()
def format_module(module_spec):
"""
格式化模块显示名
将符号"."替换为文件系统分隔符
:param module_spec: 模块SPEC或模块路径
:return: 格式化后的模块显示名
"""
return module_spec.replace(".", os.sep)
def bool2mark(require):
"""
将bool类型转换为标记字符串
:param require:
:return:
"""
if require is True:
return "[bold dark_red]Yes[/bold dark_red]"
return ""
def none2str(obj):
"""
将None类型转换为字符串
:param obj:
:return:
"""
if obj is None or obj == "":
return "[bold orange1]None[/bold orange1]"
return str(obj)
def get_encoding(file):
"""
获取文件编码信息
:param file: 文件路径
:return: 文件编码信息
"""
with open(file, 'rb') as f:
tmp = chardet.detect(f.read())
return tmp['encoding']
def check_update():
"""
检查更新并退出
:return:
"""
console.info("update")
exit()
def envConfig_init():
"""
将系统配置信息加载到env.Config中
:return:
"""
console.debug("将系统配置信息设置到env中")
env.Config = AttribDict()
for key, value in Config.items():
setattr(env.Config, key, value)
def set_verbose(verbose):
"""
设置信息输出等级
:param verbose: int 需要设置的信息输出等级值
:return:
"""
console.msgLevel = verbose
console.debug(f"设置verbose为: {verbose}")
def envArgs_init(args):
"""
将解析到命令行参数设置到环境变量env.CliOptions中
:param args: argparse 解析后的Namespace对象
:return:
"""
env.CliOptions = AttribDict()
# 解析命令函参数
# 存储命令行参数值到env中
console.debug("解析到命令行参数: ", args)
console.debug("将命令行参数信息设置到env中")
for key, value in vars(args).items():
setattr(env.CliOptions, key, value)
def check_env():
"""
环境检查
:return:
"""
if sys.version_info < (3, 0):
console.error("当前Python版本不支持运行该程序,请使用[yellow]Python3.7+[/yellow]版本重新运行.")
exit()
if sys.version_info < (3, 6):
console.warning("当前Python版本过低,建议使用[yellow]Python3.6+[/yellow]版本运行.")
def deepListdir(path, exts=None, exclude=None, Files=None):
if not path.endswith(os.sep):
path += os.sep
if Files is None:
Files = []
if exclude is None:
exclude = []
if exts is None:
exts = []
listFiles = os.listdir(path)
for file in listFiles:
if file not in exclude:
ext = None
absFile = path + file
if os.path.isdir(absFile):
deepListdir(absFile, exts=exts, exclude=exclude, Files=Files)
continue
ext = file[file.rindex("."):]
if ext in exts:
Files.append(absFile)
return Files
def env_init():
"""
初始化环境信息
:return:
"""
check_env() # 环境检查
# 初始化命令行参数
args = parse_cliArgs()
# 设置verbose
set_verbose(args.verbose)
console.info("初始化环境...")
# 设置环境变量
envArgs_init(args)
envConfig_init()
def scf_init():
console.info("正在初始化云函数...")
result = scfInit(env.Config.CloudFunctions)
if result:
console.info("云函数初始化完成.")
exit()
def scfInit(CloudFunctions, user="defaultUser", configs: dict = None):
"""
初始化云函数
配置默认采用[{"cloudType": {"SecretId": "*****", "SecretKey": "*****", "Provisioned": {"Number": 10}}}, ...]单用户配置方式
为最大限度利用云函数资源可以使用多用户配置模式,例如:
{
"user1":[{"cloudType":{"SecretId":"*****", "SecretKey": "*****"}},...],
"user2":[{"cloudType":{"SecretId":"*****", "SecretKey": "*****"}},...],
}
:param configs:
:param CloudFunctions: dict 云函数配置信息
:param user: 用户信息
:return:
"""
if configs is None:
configs = dict()
else:
configs = configs
if isinstance(CloudFunctions, list):
for CloudFunction in CloudFunctions:
scfInit(CloudFunction, user=user, configs=configs)
return configs
useGlobal = False
for cloudType, cfg in CloudFunctions.items():
if "SecretId" not in cfg:
scfInit(cfg, user=cloudType, configs=configs)
else:
if "SecretId" not in cfg:
raise ExpDeposException("云函数参数配置错误!请提供'SecretId'及'SecretKey'配置")
if "useGlobal" in cfg and cfg["useGlobal"] is True:
useGlobal = True
if cloudType.lower() == "tencent":
console.info("正在为用户 [blue]{user}[/blue] 部署 "
"[magenta3]{0} Serverless Cloud Functions[/magenta3],该过程可能需要"
"[magenta3] 1 [/magenta3]至[magenta3] 5 [/magenta3]分钟...".format(cloudType, user=user))
scf = TencentScf(cfg, user=user, useGlobal=useGlobal)
else:
scf = None # 预留给其他云
api_gateways = scf.getApis()
if api_gateways:
configs.update({user: {cloudType: api_gateways}})
if configs:
console.info("用户 [blue]{user}[/blue] [magenta3]{0} Serverless Cloud Functions[/magenta3]"
" 部署完成.".format(cloudType, user=user))
return configs
def scf_clear(init=False):
console.info("正在清除已部署的函数...")
if scfclear(env.Config.CloudFunctions):
console.info("函数清除完成.")
if init:
result = scfInit(env.Config.CloudFunctions)
if result:
console.info("云函数重置完成.")
exit()
def scfclear(CloudFunctions, user="defaultUser"):
"""
清除已部署的云函数
:param CloudFunctions: 云函数配置信息
:param user: 用户信息
:return:
"""
try:
if isinstance(CloudFunctions, list):
for CloudFunction in CloudFunctions:
scfclear(CloudFunction, user=user)
return
for cloudType, cfg in CloudFunctions.items():
if "SecretId" not in cfg:
scfclear(cfg, user=cloudType)
else:
if "SecretId" not in cfg:
raise ExpDeposException("云函数参数配置错误!请提供'SecretId'及'SecretKey'配置")
if cloudType.lower() == "tencent":
console.info("正在为用户 [blue]{user}[/blue] 部署 "
"[magenta3]{0} Serverless Cloud Functions[/magenta3],该过程可能需要"
"[magenta3] 1 [/magenta3]至[magenta3] 5 [/magenta3]分钟...".format(cloudType, user=user))
scf = TencentScf(cfg, user=user)
else:
scf = None # 预留给其他云
scf.clearAll()
except ExpDeposException as e:
console.exception(repr(e))
return True
def timeit(function):
"""
用于统计对象执行时间的装饰器
:param function: 被装饰的目标对象
:return:
"""
@wraps(function)
def wrapper(*args, **kwargs):
start = time.time()
console.info("Start {0} at: {1}".format(function, start))
result = function(*args, **kwargs)
end = time.time()
console.info("Finished {0} at: {1}, Total: {2}".format(function, end, end - start))
return result
return wrapper
def asyncTimeit(function):
"""
用于统计async对象执行时间的装饰器
:param function: 被装饰的目标对象
:return:
"""
@wraps(function)
async def wrapper(*args, **kwargs):
start = time.time()
console.info("Async start {0} at: {1}".format(function, start))
result = await function(*args, **kwargs)
end = time.time()
console.info("Async finished {0} at: {1}, Total: {2}".format(function, end, end - start))
return result
return wrapper
def cli2args(args: list) -> list:
"""
解析带空格的参数为正确的命令行参数列表
:param args: list 参数列表
:return: list 正确的命令行参数列表
"""
i = 0
parseArgs = []
while i < len(args):
arg = str(args[i])
if arg.startswith("'") or arg.startswith("\""):
quote = arg[0:1]
tmpList = args[i:]
tmpArgs = []
j = 0
while j < len(tmpList):
tmpArg = str(tmpList[j])
tmpArgs.append(tmpArg.strip(quote))
j += 1
i += 1
if tmpArg.endswith(quote):
break
parseArgs.append(" ".join(tmpArgs))
continue
i += 1
parseArgs.append(arg)
return parseArgs
def add_quote(argValue: str) -> str:
"""
为含有空格的命令行参数值添加引号包裹
:param argValue: 参数值
:return:
"""
if " " in argValue:
argValue = "\"" + argValue + "\""
return argValue
[文档]def randstr(length: int = 5, number: bool = False, letter: bool = False) -> str:
"""
随机字符生成器
:param length: 生成长度
:param number: 是否纯数字
:@param letter: 是否纯字母
:return: str
"""
if number:
return "".join(random.sample(string.digits, length))
if letter:
return "".join(random.sample(string.ascii_letters, length))
return "".join(random.sample(string.ascii_letters + string.digits, length))
def parse_cliArgs(source=None):
"""
解析命令行参数
:param source: 参数解析源,默认命令行不含文件名的参数值
:return: argparse 解析后的Namespace对象
"""
if source is None:
source = sys.argv[1:]
args = None
try:
parse = argparse.ArgumentParser(prog="ExpDepos", conflict_handler='resolve')
parse.add_argument("-V", "--version", help="显示版本信息并退出", dest="show_version",
action="store_true")
parse.add_argument("--update", help="更新ExpDepos", dest="check_update",
action="store_true")
parse.add_argument("-v", "--verbose", help="控制台信息输等级,该值越大输出信息越详细",
default=Config['Console']['msgLevel'], choices=[0, 1, 2, 3, 4], type=int, dest="verbose")
parse.add_argument("--console", help="启动交互式Shell控制台", dest="console", action="store_true")
# scf group
scf_group = parse.add_argument_group("SCF arguments")
scf_opt = scf_group.add_mutually_exclusive_group()
scf_opt.add_argument("--scfinit", help="初始化云函数", action="store_true")
scf_opt.add_argument("--scfclear", help="删除全部云函数", action="store_true")
scf_opt.add_argument("--scfreset", help="重置云函数", action="store_true")
# target group
target_group = parse.add_argument_group("target arguments")
target_group.add_argument("-H", "--host", help="目标地址 (可选IP端口或URL地址)",
dest="host", type=str)
# module group
mode_group = parse.add_argument_group("module arguments")
mode_group.add_argument("-M", "--module", help="需要运行的Exploit模块",
dest="module", type=str)
mode_group.add_argument("-P", "--options", help="模块的用户自定义参数", dest="options", type=str)
mode_group.add_argument("-p", "--payload", help="为Exploit模块设置有效攻击载荷", dest="payload", type=str)
mode_group.add_argument("-Po", help="攻击载荷自定义参数", dest="payload_opts", type=str)
mode_group.add_argument("-e", "--encode", help="为攻击载荷设置编码器", type=str)
mode_group.add_argument("-Eo", help="编码器自定义参数", dest="encode_opts", type=str)
mode_group.add_argument("-L", "--list", help="显示给定类型模块列表", dest="list", type=str)
# module mode group
module_mode = mode_group.add_mutually_exclusive_group()
module_mode.add_argument("--info", help="显示模块详细信息", dest="show_info", action="store_true")
module_mode.add_argument("--verify", help="以验证模式执行Exploit模块", action="store_true")
module_mode.add_argument("--exploit", help="以利用模式执行Exploit模块", action="store_true")
module_mode.add_argument("-h", "--help", help="显示Exploit模块帮助 与-M搭配使用",
dest="module_help", action="store_true")
# request group
request_group = parse.add_argument_group("request arguments")
request_group.add_argument("--auth", help="HTTP基础认证信息(username:password)")
request_group.add_argument("--cookie", help="自定义Cookie信息,可选完整cookie字符串或键值对应值", dest="cookie")
request_group.add_argument("--headers", help="自定义Header信息(key1:value1,key2:value2)", dest="header")
request_group.add_argument("--user-agent", help="自定义HTTP Header的User-Agent值(默认值:随机生成)", dest="agent")
request_group.add_argument("--referer", help="自定义HTTP Header的Referer值")
request_group.add_argument("--timeout", help="自定义超时时间(默认值:30)", default=30, dest="timeout", type=int)
request_group.add_argument("--retries", help="当请求失败后重试次数(默认值:3)", default=3, dest="retries", type=int)
request_group.add_argument("--rdelay", help="当请求失败后重试间隔时间(默认值:3)", default=3, dest="rdelay", type=int)
request_group.add_argument("--delay", help="两次请求之间的间隔时间(单位:秒)", default=0, dest="delay", type=int)
request_group.add_argument("--proxy", help="使用代理发起REQUEST请求")
request_group.add_argument("--thread", help="协程并发请求量(默认:200)", default=200, type=int)
request_group.add_argument("--scf", help="使用云函数(Serverless Cloud Functions)代理", action="store_true",
dest="useScf")
args = parse.parse_args(source)
# 校验必要参数
if not any((args.show_version, args.host, args.module, args.check_update,
args.scfinit, args.scfclear, args.scfreset, args.console, args.list)):
parse.print_help()
exit()
# 设置了host参数,则module参数是必须的
if args.host and args.module is None:
console.error("缺少必要参数([bold cyan]-M, --module[/bold cyan]). "
"请使用[bold cyan]-M[/bold cyan]或者[bold cyan]--module[/bold cyan]选项选择要运行的模块.")
exit()
except Exception as e:
console.exception(repr(e))
return args
def parse_scfOptions():
if not env.Config.CloudFunctions:
raise ExpDeposException("未找到云函数相关配置信息, 请在config中配置.")
if env.CliOptions.scfinit:
scf_init()
if env.CliOptions.scfclear:
scf_clear()
if env.CliOptions.scfreset:
scf_clear(init=True)
def parse_options():
"""
解析命令行参数
:return:
"""
# 解析 optional 参数
if env.CliOptions.show_version:
show_version()
if env.CliOptions.check_update:
check_update()
if any((env.CliOptions.scfinit, env.CliOptions.scfclear, env.CliOptions.scfreset)):
parse_scfOptions()