请选择 进入手机版 | 继续访问电脑版
查看: 14270|回复: 7

[源码专区] 自己写的爆破工具(Python)

[复制链接]
发表于 2025-9-21 21:21:36 | 显示全部楼层 |阅读模式
import requests, threading, time, random, string, re
from queue import Queue
from datetime import datetime

# 配置
TARGET_FILE = "目标.txt"
OUTPUT_DICT = "自动密码字典.txt"
THREAD_NUM = 5
TIMEOUT = 10

# 初始化
tgt_q, dict_q = Queue(), Queue()
print_lock = threading.Lock()
stats = {"vuln":0, "safe":0, "error":0, "no_match":0}

def gen_dict():
    """生成账号密码字典"""
    chars = list(string.ascii_letters + string.digits + "!@#$%^&*()_+{}|:\"<>?`-=[]\\;',./")
    accounts = []
    print("输入账号(空行结束):")
    while True:
        acc = input("账号:").strip()
        if not acc: break
        if acc not in accounts: accounts.append(acc)
    if not accounts: print("[-] 无账号,退出"); return False
   
    try:
        cnt = int(input("每个账号密码数:").strip())
        min_l, max_l = int(input("最小长度:").strip()), int(input("最大长度:").strip())
    except: cnt, min_l, max_l = 10, 6, 12
    if min_l > max_l: min_l, max_l = max_l, min_l

    with open(OUTPUT_DICT, "w", encoding="utf-8") as f:
        for acc in accounts:
            for _ in range(cnt):
                pwd = "".join(random.choice(chars) for _ in range(random.randint(min_l, max_l)))
                f.write(f"{acc}:{pwd}\n")
    print(f"[+] 字典生成:{OUTPUT_DICT}({len(accounts)}账号×{cnt}密码)")
    return True

def load_items(file, q, split=False):
    """加载目标/字典"""
    try:
        with open(file, "r", encoding="utf-8") as f:
            items = [l.strip() for l in f if l.strip()]
            if split: items = [tuple(l.split(":",1)) for l in items if ":" in l]
            for i in items: q.put(i)
        print(f"[+] 加载 {q.qsize()} 个{file.split('.')[0]}")
    except: print(f"[-] {file}不存在"); raise

def verify(tgt, acc, pwd):
    """验证认证"""
    res = {"tgt":tgt, "acc":acc, "pwd":pwd, "stat":"unknown", "msg":"", "time":datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    try:
        resp = requests.get(tgt, auth=(acc.encode(), pwd.encode()), timeout=TIMEOUT, verify=False)
        if resp.status_code == 200: res["stat"], res["msg"] = "vuln", "弱口令存在"
        elif resp.status_code == 401: res["stat"], res["msg"] = "safe", "账号密码不匹配"
        else: res["msg"] = f"状态码{resp.status_code}"
    except requests.exceptions.ConnectTimeout: res["stat"], res["msg"] = "error", "超时"
    except requests.exceptions.ConnectionError: res["stat"], res["msg"] = "error", "连接失败"
    except Exception as e: res["stat"], res["msg"] = "error", f"错误:{str(e)[:20]}"
    return res

def worker():
    """工作线程"""
    while not (tgt_q.empty() or dict_q.empty()):
        tgt, (acc, pwd) = tgt_q.get(), dict_q.get()
        res = verify(tgt, acc, pwd)
        with print_lock:
            # 日志与输出
            log = f"{res['time']}\t{res['stat']}\t{res['tgt']}\t{res['acc']}\t{res['pwd']}\t{res['msg']}\n"
            with open(f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}.txt", "a") as f: f.write(log)
            
            # 状态打印
            print({
                "vuln":f"[!] 漏洞:{res['tgt']} | {acc}:{pwd}",
                "safe":f"[+] 安全:{res['tgt']} | {acc}:{pwd}",
                "error":f"[-] 失败:{res['tgt']} | {acc}:{pwd} | {res['msg']}",
                "no_match":f"[-] 无匹配:{res['tgt']} | {acc}:{pwd}"
            }[res["stat"]])
            stats[res["stat"]] += 1
            print(f"[*] 剩余:{tgt_q.qsize() + dict_q.qsize()//THREAD_NUM} | 漏洞:{stats['vuln']}")
        tgt_q.task_done()
        dict_q.task_done()

def main():
    """主函数"""
    print("="*60 + "\n  警告:仅授权学习使用!\n" + "="*60)
    if input("已授权?(y/n):").lower() != "y": return
    if not gen_dict(): return
   
    try: load_items(TARGET_FILE, tgt_q); load_items(OUTPUT_DICT, dict_q, split=True)
    except: return
    if tgt_q.empty() or dict_q.empty(): return

    start = time.time()
    for _ in range(THREAD_NUM): threading.Thread(target=worker, daemon=True).start()
    tgt_q.join(); dict_q.join()
   
    print(f"\n[*] 完成 | 耗时{round(time.time()-start,2)}s")
    print(f"[*] 统计:漏洞{stats['vuln']} | 安全{stats['safe']} | 失败{stats['error']} | 无匹配{stats['no_match']}")

if __name__ == "__main__":
    main()
回复

使用道具 举报

 楼主| 发表于 2025-9-21 21:48:14 来自手机 | 显示全部楼层
MIT 许可证

版权所有 (c) 2025 [ZeroOne]

特此授予任何获得本软件及相关文档文件(以下简称“软件”)副本的人免费许可,在不受限制的情况下处理本软件,

包括但不限于使用、复制、修改、合并、出版、发行、再许可和/或出售本软件副本的权利,

并需遵守以下条件:

上述版权声明和本许可声明应包含在本软件的所有副本或实质部分中。

本软件按“现状”提供,不提供任何形式的保证,无论是明示的还是暗示的,

包括但不限于适销性、特定用途适用性和非侵权性的保证。在任何情况下,

作者或版权持有人均不对因本软件或本软件的使用或其他交易引起的任何索赔、损害或其他责任承担责任,

无论是在合同诉讼、侵权行为或其他方面。
回复 支持 反对

使用道具 举报

发表于 2025-9-23 10:44:05 | 显示全部楼层
做了一些优化

[AppleScript] 查看源码 复制代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
弱口令检测工具 - 优化版本
警告:仅用于授权的安全测试!
"""

import requests
import threading
import time
import random
import string
import logging
from queue import Queue, Empty
from datetime import datetime
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import urllib3

# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


@dataclass
class Config:
    """配置类"""
    target_file: str = "目标.txt"
    output_dict: str = "自动密码字典.txt"
    thread_num: int = 5
    timeout: int = 10
    max_retries: int = 3
    retry_delay: float = 1.0
    log_level: str = "INFO"


class PasswordGenerator:
    """密码生成器"""
    
    def __init__(self):
        self.chars = string.ascii_letters + string.digits + "!@#$%^&*()_+{}|:\"<>?`-=[]\\;',./"
        
    def generate_common_passwords(self, count: int = 50) -> List[str]:
        """生成常见弱密码"""
        common_passwords = [
            "123456", "password", "123456789", "12345678", "12345",
            "1234567", "admin", "123123", "qwerty", "abc123",
            "Password", "password123", "admin123", "root", "guest",
            "test", "user", "123", "1234", "12345678910"
        ]
        
        # 添加随机生成的密码
        for _ in range(count - len(common_passwords)):
            length = random.randint(6, 12)
            pwd = ''.join(random.choice(self.chars) for _ in range(length))
            common_passwords.append(pwd)
            
        return common_passwords[:count]
    
    def generate_dict(self, accounts: List[str], passwords_per_account: int, 
                     min_length: int, max_length: int) -> List[Tuple[str, str]]:
        """生成账号密码字典"""
        credentials = []
        
        for account in accounts:
            # 添加常见弱密码
            common_pwds = self.generate_common_passwords(passwords_per_account // 2)
            for pwd in common_pwds:
                credentials.append((account, pwd))
            
            # 添加随机密码
            remaining = passwords_per_account - len(common_pwds)
            for _ in range(remaining):
                length = random.randint(min_length, max_length)
                pwd = ''.join(random.choice(self.chars) for _ in range(length))
                credentials.append((account, pwd))
                
        return credentials


class WeakPasswordDetector:
    """弱口令检测器"""
    
    def __init__(self, config: Config):
        self.config = config
        self.stats = {"vuln": 0, "safe": 0, "error": 0, "timeout": 0}
        self.stats_lock = threading.Lock()
        self.setup_logging()
        
    def setup_logging(self):
        """设置日志"""
        log_filename = f"weak_password_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        
        logging.basicConfig(
            level=getattr(logging, self.config.log_level),
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_filename, encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
    def load_targets(self) -> List[str]:
        """加载目标列表"""
        try:
            with open(self.config.target_file, 'r', encoding='utf-8') as f:
                targets = [line.strip() for line in f if line.strip()]
            self.logger.info(f"加载了 {len(targets)} 个目标")
            return targets
        except FileNotFoundError:
            self.logger.error(f"目标文件 {self.config.target_file} 不存在")
            raise
        except Exception as e:
            self.logger.error(f"加载目标文件时出错: {e}")
            raise
            
    def load_credentials(self) -> List[Tuple[str, str]]:
        """加载凭据字典"""
        try:
            with open(self.config.output_dict, 'r', encoding='utf-8') as f:
                credentials = []
                for line in f:
                    line = line.strip()
                    if ':' in line:
                        account, password = line.split(':', 1)
                        credentials.append((account, password))
            self.logger.info(f"加载了 {len(credentials)} 个凭据")
            return credentials
        except FileNotFoundError:
            self.logger.error(f"字典文件 {self.config.output_dict} 不存在")
            raise
        except Exception as e:
            self.logger.error(f"加载字典文件时出错: {e}")
            raise
            
    def verify_credential(self, target: str, account: str, password: str) -> Dict:
        """验证单个凭据"""
        result = {
            "target": target,
            "account": account,
            "password": password,
            "status": "unknown",
            "message": "",
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        
        for attempt in range(self.config.max_retries):
            try:
                # 确保URL格式正确
                if not target.startswith(('http://', 'https://')):
                    target = f"http://{target}"
                    
                response = requests.get(
                    target,
                    auth=(account, password),
                    timeout=self.config.timeout,
                    verify=False,
                    allow_redirects=False
                )
                
                if response.status_code == 200:
                    result["status"] = "vuln"
                    result["message"] = "弱口令验证成功"
                    break
                elif response.status_code == 401:
                    result["status"] = "safe"
                    result["message"] = "认证失败"
                    break
                else:
                    result["status"] = "error"
                    result["message"] = f"HTTP状态码: {response.status_code}"
                    
            except requests.exceptions.Timeout:
                result["status"] = "timeout"
                result["message"] = "请求超时"
                if attempt < self.config.max_retries - 1:
                    time.sleep(self.config.retry_delay)
                    continue
                break
                
            except requests.exceptions.ConnectionError:
                result["status"] = "error"
                result["message"] = "连接失败"
                break
                
            except Exception as e:
                result["status"] = "error"
                result["message"] = f"未知错误: {str(e)[:50]}"
                break
                
        return result
        
    def update_stats(self, status: str):
        """更新统计信息"""
        with self.stats_lock:
            if status in self.stats:
                self.stats[status] += 1
                
    def log_result(self, result: Dict):
        """记录结果"""
        status = result["status"]
        target = result["target"]
        account = result["account"]
        password = result["password"]
        message = result["message"]
        
        if status == "vuln":
            self.logger.warning(f"[漏洞] {target} | {account}:{password}")
            print(f"? [漏洞发现] {target} | {account}:{password}")
        elif status == "safe":
            self.logger.info(f"[安全] {target} | {account}:{password}")
        elif status == "timeout":
            self.logger.warning(f"[超时] {target} | {account}:{password}")
        else:
            self.logger.error(f"[错误] {target} | {account}:{password} | {message}")
            
        self.update_stats(status)
        
    def worker(self, task_queue: Queue):
        """工作线程函数"""
        while True:
            try:
                target, account, password = task_queue.get(timeout=1)
                result = self.verify_credential(target, account, password)
                self.log_result(result)
                task_queue.task_done()
                
                # 显示进度
                with self.stats_lock:
                    total_processed = sum(self.stats.values())
                    if total_processed % 10 == 0:  # 每10个显示一次进度
                        remaining = task_queue.qsize()
                        print(f"? 进度: 已处理{total_processed}, 剩余{remaining}, 漏洞{self.stats['vuln']}")
                        
            except Empty:
                break
            except Exception as e:
                self.logger.error(f"工作线程错误: {e}")
                
    def run_scan(self, targets: List[str], credentials: List[Tuple[str, str]]):
        """运行扫描"""
        # 创建任务队列
        task_queue = Queue()
        
        # 填充任务队列
        for target in targets:
            for account, password in credentials:
                task_queue.put((target, account, password))
                
        total_tasks = task_queue.qsize()
        self.logger.info(f"开始扫描,总任务数: {total_tasks}")
        print(f"? 开始扫描,总任务数: {total_tasks}")
        
        start_time = time.time()
        
        # 使用线程池执行任务
        with ThreadPoolExecutor(max_workers=self.config.thread_num) as executor:
            futures = [executor.submit(self.worker, task_queue) for _ in range(self.config.thread_num)]
            
            # 等待所有任务完成
            task_queue.join()
            
        end_time = time.time()
        duration = round(end_time - start_time, 2)
        
        # 输出最终统计
        print(f"\n✅ 扫描完成!")
        print(f"⏱️  耗时: {duration}秒")
        print(f"? 统计结果:")
        print(f"   ? 发现漏洞: {self.stats['vuln']}")
        print(f"   ✅ 安全目标: {self.stats['safe']}")
        print(f"   ❌ 错误数量: {self.stats['error']}")
        print(f"   ⏰ 超时数量: {self.stats['timeout']}")
        
        self.logger.info(f"扫描完成,耗时{duration}秒,统计: {self.stats}")


def get_user_input() -> Tuple[List[str], int, int, int]:
    """获取用户输入"""
    print("? 请输入要测试的账号(输入空行结束):")
    accounts = []
    while True:
        account = input("账号: ").strip()
        if not account:
            break
        if account not in accounts:
            accounts.append(account)
            
    if not accounts:
        print("❌ 没有输入任何账号,程序退出")
        return [], 0, 0, 0
        
    try:
        passwords_per_account = int(input("每个账号生成密码数量 [默认20]: ").strip() or "20")
        min_length = int(input("密码最小长度 [默认6]: ").strip() or "6")
        max_length = int(input("密码最大长度 [默认12]: ").strip() or "12")
    except ValueError:
        print("⚠️  输入无效,使用默认值")
        passwords_per_account, min_length, max_length = 20, 6, 12
        
    if min_length > max_length:
        min_length, max_length = max_length, min_length
        
    return accounts, passwords_per_account, min_length, max_length


def main():
    """主函数"""
    print("=" * 80)
    print("? 弱口令检测工具 - 优化版本")
    print("⚠️  警告:仅用于授权的安全测试!")
    print("=" * 80)
    
    # 授权确认
    if input("✋ 您是否已获得目标系统的授权进行安全测试?(y/n): ").lower() != 'y':
        print("❌ 未获得授权,程序退出")
        return
        
    config = Config()
    
    # 获取用户输入
    accounts, passwords_per_account, min_length, max_length = get_user_input()
    if not accounts:
        return
        
    # 生成密码字典
    print(f"? 正在生成密码字典...")
    generator = PasswordGenerator()
    credentials = generator.generate_dict(accounts, passwords_per_account, min_length, max_length)
    
    # 保存字典到文件
    with open(config.output_dict, 'w', encoding='utf-8') as f:
        for account, password in credentials:
            f.write(f"{account}:{password}\n")
            
    print(f"✅ 密码字典已生成: {config.output_dict}")
    print(f"? 包含 {len(accounts)} 个账号,每个账号 {passwords_per_account} 个密码")
    
    # 初始化检测器
    detector = WeakPasswordDetector(config)
    
    try:
        # 加载目标和凭据
        targets = detector.load_targets()
        credentials = detector.load_credentials()
        
        if not targets:
            print("❌ 没有找到任何目标")
            return
            
        if not credentials:
            print("❌ 没有找到任何凭据")
            return
            
        # 开始扫描
        detector.run_scan(targets, credentials)
        
    except Exception as e:
        print(f"❌ 程序执行出错: {e}")
        detector.logger.error(f"程序执行出错: {e}")


if __name__ == "__main__":
    main()
回复 支持 反对

使用道具 举报

发表于 2026-5-19 12:50:00 | 显示全部楼层

Re: 自己写的爆破工具(Python)

楼主写的这个爆破工具挺有想法的,从字典生成到多线程爆破很完整,看得出来你下了不少功夫。Python 实现清晰,配置也方便改动。不过有几个点想和你聊聊: - `requests.get` 默认不验证 SSL,注释里写 `verify=False` 实际代码没写全?建议加上避免证书报错。 - 字典生成用随机字符串,实际爆破效率可能不高,或许可以加个自定义字典选项更灵活。 - 目前只支持 HTTP Basic Auth,如果能扩展成支持表单 POST 或不同接口格式就更实用了。 整体很棒,继续优化下去可以做成小型安全测试工具。记得别用于未授权系统哦~
回复 支持 反对

使用道具 举报

发表于 2026-5-19 13:05:00 | 显示全部楼层

Re: 自己写的爆破工具(Python)

楼主这个工具思路挺清晰的,从字典生成到多线程验证一条龙,自己写轮子很有动手精神。提几个小建议供参考:`requests` 的 `get` 默认不会关闭连接,高并发时可能卡住,可以在 `verify` 函数里加个 `with requests.Session() as s:` 并用 `s.get`,或者加个 `stream=False` 确保连接释放。另外 `gen_dict` 里随机密码生成用 `random.choice` 在大量生成时效率一般,可以考虑 `secrets` 模块更安全。还有 `load_items` 里面文件不存在的异常直接 `raise` 可能导致工具直接退出,改成 `sys.exit` 或提示后重试会更友好。总体值得继续完善,期待后续优化版本。
回复 支持 反对

使用道具 举报

发表于 2026-5-27 11:03:58 | 显示全部楼层

Re: 自己写的爆破工具(Python)

楼主你好,感谢分享自己写的爆破工具!代码结构清晰,功能也比较完整:从字典生成、目标加载到多线程验证,逻辑挺清楚的。用了Queue来做任务队列和线程安全锁,避免了资源竞争,这个设计值得新人学习。 几个小建议供参考: 1. **错误处理**:`load_items` 里如果文件不存在,直接 `raise` 可能会让程序崩溃,可以考虑加上 `try-except` 并返回False,或者在主函数里统一处理。 2. **验证延迟**:爆破场景下最好加一个随机延时(比如 `time.sleep(random.uniform(0.5, 1.5))`),否则容易被目标服务器封IP或触发WAF。 3. **代理支持**:可以加一个可选代理参数,方便在渗透测试中切换IP。 4. **输出结果**:可以把爆破成功的条目单独保存到一个文件(如 `success.txt`),方便查看。 5. **HTTPS警告**:`verify=False` 可能会忽略证书错误,建议在注释里注明仅用于测试环境,正式场景建议关闭或使用证书。 另外提醒一下,这类工具请务必只用于自己授权的测试或学习环境,不要用于非法攻击,保护好自己。 整体来说是个很实用的练手项目,继续完善的话可以加入更多认证方式(如Basic、Digest、表单登录等)。期待你后续的优化版本!
回复 支持 反对

使用道具 举报

发表于 2026-5-27 11:04:14 | 显示全部楼层

Re: 自己写的爆破工具(Python)

感谢楼主的分享!代码结构挺清晰的,字典生成、多线程验证和状态统计都考虑到了,作为自学练习或授权测试的小工具完全够用。几个小观察:`verify=False` 跳过了 SSL 验证,实际使用中如果目标证书有问题可以理解,但建议加上说明;随机密码生成虽然效率高,但针对真实环境可能不如常见弱口令字典有效;线程数 5 在测试少量目标时够用,大规模时可以适当调高。总体很实用,适合学习 `requests` 和 `threading` 的配合。提醒一下,这种工具请务必只用于自己搭建的测试环境或获得明确授权的渗透测试,切勿用于非法用途。再次谢谢分享!
回复 支持 反对

使用道具 举报

发表于 10 小时前 | 显示全部楼层

Re: 自己写的爆破工具(Python)

感谢分享!这个工具的功能设计挺完整的,从自动生成字典到多线程爆破一条龙,还有状态统计和时间记录,很适合做安全测试的练习。代码结构也比较清晰,用了队列和锁来处理并发,思路不错。有个小提醒:`verify=False` 关掉 SSL 验证在某些场景下会有安全隐患,建议实际用的时候留意。另外,记得只在自己有权限的环境下使用,避免踩线。想问一下,你主要用它测试过哪些类型的登录页?有没有遇到反爬或其他限制?
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

Hacking Group 021A

旗下站点

态势感知中心

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

关注微信公众号

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2026-6-18 19:14 , Processed in 0.033314 second(s), 18 queries , Gzip On, Redis On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部