# HTTP Basic-auth weak-credential checker (authorized testing only).
import requests, threading, time, random, string, re
from queue import Queue
from datetime import datetime
# Configuration
TARGET_FILE = "目标.txt"  # file main() loads target URLs from, one per line
OUTPUT_DICT = "自动密码字典.txt"  # dictionary file written by gen_dict()
THREAD_NUM = 5  # number of worker threads started by main()
TIMEOUT = 10  # timeout (seconds) passed to each HTTP request

# Shared state
tgt_q = Queue()  # pending targets
dict_q = Queue()  # pending (account, password) pairs
print_lock = threading.Lock()  # held by workers while logging, printing and counting
stats = {"vuln": 0, "safe": 0, "error": 0, "no_match": 0}  # result counters by status
def gen_dict():
    """Interactively build the ``account:password`` dictionary file.

    Prompts for account names (a blank line ends the list), then for the
    number of passwords per account and the minimum/maximum password
    length. Writes one ``account:password`` line per generated password to
    ``OUTPUT_DICT``.

    Returns:
        True when the dictionary was written, False when no account was
        entered.
    """
    chars = string.ascii_letters + string.digits + "!@#$%^&*()_+{}|:\"<>?`-=[]\\;',./"

    accounts = []
    print("输入账号(空行结束):")
    while True:
        acc = input("账号:").strip()
        if not acc:
            break
        if ":" in acc:
            # The dictionary is later split on the FIRST colon, so a colon in
            # the account name would shift part of it into the password.
            print("[-] 账号不能包含冒号,已跳过")
            continue
        if acc not in accounts:
            accounts.append(acc)

    if not accounts:
        print("[-] 无账号,退出")
        return False

    # Only a non-numeric answer falls back to the defaults; Ctrl-C / EOF are
    # no longer swallowed by a bare ``except``.
    try:
        cnt = int(input("每个账号密码数:").strip())
        min_l = int(input("最小长度:").strip())
        max_l = int(input("最大长度:").strip())
    except ValueError:
        cnt, min_l, max_l = 10, 6, 12

    # Zero or negative values would produce no lines or empty passwords.
    if min(cnt, min_l, max_l) < 1:
        cnt, min_l, max_l = 10, 6, 12
    if min_l > max_l:
        min_l, max_l = max_l, min_l

    with open(OUTPUT_DICT, "w", encoding="utf-8") as f:
        for acc in accounts:
            for _ in range(cnt):
                length = random.randint(min_l, max_l)
                pwd = "".join(random.choice(chars) for _ in range(length))
                f.write(f"{acc}:{pwd}\n")

    print(f"[+] 字典生成:{OUTPUT_DICT}({len(accounts)}账号×{cnt}密码)")
    return True
def load_items(file, q, split=False):
    """Load the non-blank lines of *file* into queue *q*.

    Args:
        file: Path of a UTF-8 text file, one item per line.
        q: Queue that receives the items.
        split: When True, each line is split on its first ``:`` into an
            ``(account, password)`` tuple; lines without ``:`` are skipped.

    Raises:
        FileNotFoundError: *file* does not exist.
        OSError, UnicodeError: *file* could not be read or decoded.
        A message is printed before any of these is re-raised.
    """
    try:
        with open(file, "r", encoding="utf-8") as f:
            items = [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        print(f"[-] {file}不存在")
        raise
    except (OSError, UnicodeError) as e:
        # Previously every failure was reported as "file does not exist".
        print(f"[-] {file}读取失败:{e}")
        raise

    if split:
        items = [tuple(line.split(":", 1)) for line in items if ":" in line]
    for item in items:
        q.put(item)
    print(f"[+] 加载 {q.qsize()} 个{file.split('.')[0]}")
def verify(tgt, acc, pwd):
    """Send one GET request to *tgt* with HTTP auth credentials and classify it.

    Args:
        tgt: Target URL.
        acc: Account name.
        pwd: Password.

    Returns:
        A dict with keys ``tgt``, ``acc``, ``pwd``, ``stat``, ``msg`` and
        ``time``. ``stat`` is one of ``"vuln"`` (HTTP 200), ``"safe"``
        (HTTP 401), ``"no_match"`` (any other status code) or ``"error"``
        (the request failed). Never raises: every exception is reported
        through ``stat``/``msg``.
    """
    res = {
        "tgt": tgt,
        "acc": acc,
        "pwd": pwd,
        # Default for status codes other than 200/401. It must be a key of the
        # module's ``stats`` counters; the former "unknown" was not, which
        # raised KeyError in the worker.
        "stat": "no_match",
        "msg": "",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    try:
        # Credentials are passed as UTF-8 bytes; TLS certificate verification
        # is disabled (verify=False).
        resp = requests.get(
            tgt,
            auth=(acc.encode(), pwd.encode()),
            timeout=TIMEOUT,
            verify=False,
        )
        if resp.status_code == 200:
            res["stat"], res["msg"] = "vuln", "弱口令存在"
        elif resp.status_code == 401:
            res["stat"], res["msg"] = "safe", "账号密码不匹配"
        else:
            res["msg"] = f"状态码{resp.status_code}"
    except requests.exceptions.Timeout:
        # Timeout covers both connect and read timeouts. It is listed before
        # ConnectionError because ConnectTimeout derives from both.
        res["stat"], res["msg"] = "error", "超时"
    except requests.exceptions.ConnectionError:
        res["stat"], res["msg"] = "error", "连接失败"
    except Exception as e:
        res["stat"], res["msg"] = "error", f"错误:{str(e)[:20]}"
    return res
# Computed once so that every result of a run lands in the same log file
# (the name used to be rebuilt on each write, i.e. a new file every second).
_LOG_FILE = f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}.txt"


def worker():
    """Worker thread: pair one target with one credential and verify it.

    Each iteration takes one item from ``tgt_q`` and one from ``dict_q``,
    calls ``verify`` on the pair, appends a tab-separated line to the log
    file, prints the outcome and updates ``stats``. The thread returns as
    soon as either queue is exhausted.
    """
    # Local import: the file's top-level import only brings in Queue.
    from queue import Empty

    while True:
        # get_nowait() instead of empty()+get(): another thread may take the
        # last item between the check and the blocking get.
        try:
            tgt = tgt_q.get_nowait()
        except Empty:
            return
        try:
            acc, pwd = dict_q.get_nowait()
        except Empty:
            # No credential left to pair with this target.
            tgt_q.task_done()
            return

        try:
            res = verify(tgt, acc, pwd)
            # Guard against a status that has no counter / message.
            stat = res["stat"] if res["stat"] in stats else "no_match"
            messages = {
                "vuln": f"[!] 漏洞:{res['tgt']} | {acc}:{pwd}",
                "safe": f"[+] 安全:{res['tgt']} | {acc}:{pwd}",
                "error": f"[-] 失败:{res['tgt']} | {acc}:{pwd} | {res['msg']}",
                "no_match": f"[-] 无匹配:{res['tgt']} | {acc}:{pwd}",
            }
            log = (
                f"{res['time']}\t{stat}\t{res['tgt']}\t"
                f"{res['acc']}\t{res['pwd']}\t{res['msg']}\n"
            )
            with print_lock:
                # Log and console output
                with open(_LOG_FILE, "a", encoding="utf-8") as f:
                    f.write(log)
                print(messages[stat])
                stats[stat] += 1
                # Items are consumed in pairs, so the shorter queue bounds
                # the number of checks still to run.
                remaining = min(tgt_q.qsize(), dict_q.qsize())
                print(f"[*] 剩余:{remaining} | 漏洞:{stats['vuln']}")
        finally:
            # Always mark both items done so Queue.join() cannot hang.
            tgt_q.task_done()
            dict_q.task_done()
def main():
    """Entry point: confirm authorization, build the dictionary, run the workers.

    Returns early when the user does not answer ``y``, when no dictionary
    was generated, when a file could not be loaded, or when either queue
    is empty after loading.
    """
    print("=" * 60 + "\n 警告:仅授权学习使用!\n" + "=" * 60)
    if input("已授权?(y/n):").strip().lower() != "y":
        return
    if not gen_dict():
        return
    try:
        load_items(TARGET_FILE, tgt_q)
        load_items(OUTPUT_DICT, dict_q, split=True)
    except Exception:
        # load_items has already printed the reason before re-raising.
        return
    if tgt_q.empty() or dict_q.empty():
        return

    start = time.time()
    threads = [
        threading.Thread(target=worker, daemon=True) for _ in range(THREAD_NUM)
    ]
    for t in threads:
        t.start()
    # Wait for the threads rather than the queues: workers consume targets
    # and credentials in pairs and stop when either queue runs out, so the
    # longer queue keeps unfinished items and Queue.join() would block forever.
    for t in threads:
        t.join()

    print(f"\n[*] 完成 | 耗时{round(time.time()-start,2)}s")
    print(f"[*] 统计:漏洞{stats['vuln']} | 安全{stats['safe']} | 失败{stats['error']} | 无匹配{stats['no_match']}")
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()