#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Weak-password detection tool - optimized version.

WARNING: for authorized security testing only!
"""
import requests
import threading
import time
import random
import string
import logging
from queue import Queue, Empty
from datetime import datetime
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import urllib3
# Silence urllib3's InsecureRequestWarning (requests in this file are sent with verify=False).
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@dataclass
class Config:
    """Runtime settings shared by the dictionary generator and the scanner."""

    # Text file listing the scan targets, one per line.
    target_file: str = "目标.txt"
    # File the generated "account:password" dictionary is written to and read from.
    output_dict: str = "自动密码字典.txt"
    # Number of worker threads used for a scan.
    thread_num: int = 5
    # Per-request timeout, in seconds.
    timeout: int = 10
    # Maximum number of attempts for a single credential.
    max_retries: int = 3
    # Seconds to sleep before a retry.
    retry_delay: float = 1.0
    # Name of a `logging` level constant (e.g. "INFO", "DEBUG").
    log_level: str = "INFO"
class PasswordGenerator:
    """Builds candidate (account, password) pairs for a weak-password audit."""

    def __init__(self):
        # Alphabet for random candidates: ASCII letters, digits and punctuation.
        self.chars = string.ascii_letters + string.digits + "!@#$%^&*()_+{}|:\"<>?`-=[]\\;',./"

    def _random_password(self, min_length: int, max_length: int) -> str:
        """Return a random string whose length is drawn from [min_length, max_length]."""
        length = random.randint(min_length, max_length)
        return ''.join(random.choice(self.chars) for _ in range(length))

    def generate_common_passwords(self, count: int = 50) -> List[str]:
        """Return ``count`` candidate passwords, well-known weak ones first.

        The first entries come from a fixed list of 20 common passwords. When
        ``count`` exceeds that, the list is padded with random strings of
        length 6-12. A ``count`` of zero or less yields an empty list (a
        negative value used to slice from the end and return most of the list).
        """
        if count <= 0:
            return []
        common_passwords = [
            "123456", "password", "123456789", "12345678", "12345",
            "1234567", "admin", "123123", "qwerty", "abc123",
            "Password", "password123", "admin123", "root", "guest",
            "test", "user", "123", "1234", "12345678910",
        ]
        # Pad with random candidates when more are requested than are built in.
        for _ in range(count - len(common_passwords)):
            common_passwords.append(self._random_password(6, 12))
        return common_passwords[:count]

    def generate_dict(self, accounts: List[str], passwords_per_account: int,
                      min_length: int, max_length: int) -> List[Tuple[str, str]]:
        """Build the (account, password) list for every account.

        For each account, the first ``passwords_per_account // 2`` entries come
        from :meth:`generate_common_passwords`; the rest are random strings
        with a length between ``min_length`` and ``max_length``.

        Args:
            accounts: Account names to pair with passwords.
            passwords_per_account: Pairs to produce per account; values below
                zero are treated as zero.
            min_length: Lower length bound for the random passwords.
            max_length: Upper length bound for the random passwords. If the
                bounds are reversed they are swapped rather than letting
                ``random.randint`` raise.

        Returns:
            Pairs grouped by account, in the order the accounts were given.
        """
        if min_length > max_length:
            min_length, max_length = max_length, min_length
        per_account = max(0, passwords_per_account)

        credentials: List[Tuple[str, str]] = []
        for account in accounts:
            # Well-known weak passwords make up the first half.
            common_pwds = self.generate_common_passwords(per_account // 2)
            credentials.extend((account, pwd) for pwd in common_pwds)
            # Random passwords fill the remainder.
            for _ in range(per_account - len(common_pwds)):
                credentials.append((account, self._random_password(min_length, max_length)))
        return credentials
class WeakPasswordDetector:
    """Tries account/password pairs against HTTP targets and tallies the outcomes.

    Every (target, account, password) combination is sent as a GET request
    carrying the pair as HTTP authentication credentials. Results are written
    to the log and counted in ``self.stats``.
    """

    def __init__(self, config: "Config"):
        """Store the configuration, zero the counters and configure logging."""
        self.config = config
        # Outcome counters; guarded by stats_lock because worker threads update them.
        self.stats = {"vuln": 0, "safe": 0, "error": 0, "timeout": 0}
        self.stats_lock = threading.Lock()
        self.setup_logging()

    def setup_logging(self):
        """Send log records to a timestamped log file and to the console.

        The configured level name is matched case-insensitively; a name that
        is not a logging level falls back to INFO instead of raising.
        """
        log_filename = f"weak_password_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        level = getattr(logging, str(self.config.log_level).upper(), None)
        if not isinstance(level, int):
            level = logging.INFO
        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_filename, encoding='utf-8'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

    def load_targets(self) -> List[str]:
        """Read the target file and return its non-blank lines, stripped.

        Raises:
            FileNotFoundError: The target file does not exist.
            Exception: Any other read error; it is logged and re-raised.
        """
        try:
            with open(self.config.target_file, 'r', encoding='utf-8') as f:
                targets = [line.strip() for line in f if line.strip()]
            self.logger.info(f"加载了 {len(targets)} 个目标")
            return targets
        except FileNotFoundError:
            self.logger.error(f"目标文件 {self.config.target_file} 不存在")
            raise
        except Exception as e:
            self.logger.error(f"加载目标文件时出错: {e}")
            raise

    def load_credentials(self) -> List[Tuple[str, str]]:
        """Read "account:password" lines from the dictionary file.

        Each line is split at its first colon, so a password may contain
        colons. Lines without a colon are skipped.

        Raises:
            FileNotFoundError: The dictionary file does not exist.
            Exception: Any other read error; it is logged and re-raised.
        """
        try:
            credentials = []
            with open(self.config.output_dict, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if ':' in line:
                        account, password = line.split(':', 1)
                        credentials.append((account, password))
            self.logger.info(f"加载了 {len(credentials)} 个凭据")
            return credentials
        except FileNotFoundError:
            self.logger.error(f"字典文件 {self.config.output_dict} 不存在")
            raise
        except Exception as e:
            self.logger.error(f"加载字典文件时出错: {e}")
            raise

    def verify_credential(self, target: str, account: str, password: str) -> Dict:
        """Try one account/password pair against one target.

        Status mapping: HTTP 200 -> "vuln", HTTP 401 -> "safe", any other
        HTTP status or a connection/unknown failure -> "error", request
        timeout -> "timeout".

        Timeouts and 5xx responses are retried after ``retry_delay`` seconds,
        up to ``max_retries`` attempts (at least one attempt is always made).
        Every other outcome is final: a definitive answer such as 403 or 404
        is no longer re-requested.

        NOTE(review): a target that answers 200 without requiring any
        authentication is reported as "vuln" for every pair; confirming that
        an unauthenticated request is rejected would rule this out.

        Returns:
            A dict with keys target, account, password, status, message and
            timestamp. ``target`` is the value passed in, not the normalised URL.
        """
        result = {
            "target": target,
            "account": account,
            "password": password,
            "status": "unknown",
            "message": "",
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        # Normalise once, outside the retry loop: bare hosts default to http.
        url = target if target.startswith(('http://', 'https://')) else f"http://{target}"
        # A non-positive max_retries would otherwise skip the request entirely.
        attempts = max(1, self.config.max_retries)

        for attempt in range(attempts):
            is_last_attempt = attempt == attempts - 1
            try:
                response = requests.get(
                    url,
                    auth=(account, password),
                    timeout=self.config.timeout,
                    verify=False,
                    allow_redirects=False
                )
            except requests.exceptions.Timeout:
                result["status"] = "timeout"
                result["message"] = "请求超时"
                if is_last_attempt:
                    break
                time.sleep(self.config.retry_delay)
                continue
            except requests.exceptions.ConnectionError:
                result["status"] = "error"
                result["message"] = "连接失败"
                break
            except Exception as e:
                result["status"] = "error"
                result["message"] = f"未知错误: {str(e)[:50]}"
                break

            if response.status_code == 200:
                result["status"] = "vuln"
                result["message"] = "弱口令验证成功"
                break
            if response.status_code == 401:
                result["status"] = "safe"
                result["message"] = "认证失败"
                break

            result["status"] = "error"
            result["message"] = f"HTTP状态码: {response.status_code}"
            # Only server-side (5xx) failures are plausibly transient.
            if response.status_code < 500 or is_last_attempt:
                break
            time.sleep(self.config.retry_delay)
        return result

    def update_stats(self, status: str):
        """Increment the counter for ``status``; unknown statuses are ignored."""
        with self.stats_lock:
            if status in self.stats:
                self.stats[status] += 1

    def log_result(self, result: Dict):
        """Log one verification result at a level matching its status, then count it."""
        status = result["status"]
        target = result["target"]
        account = result["account"]
        password = result["password"]
        message = result["message"]
        if status == "vuln":
            self.logger.warning(f"[漏洞] {target} | {account}:{password}")
            print(f"🚨 [漏洞发现] {target} | {account}:{password}")
        elif status == "safe":
            self.logger.info(f"[安全] {target} | {account}:{password}")
        elif status == "timeout":
            self.logger.warning(f"[超时] {target} | {account}:{password}")
        else:
            self.logger.error(f"[错误] {target} | {account}:{password} | {message}")
        self.update_stats(status)

    def _report_progress(self, task_queue: Queue):
        """Print a progress line whenever the processed count is a multiple of 10."""
        with self.stats_lock:
            total_processed = sum(self.stats.values())
            vuln_count = self.stats['vuln']
        if total_processed % 10 == 0:
            remaining = task_queue.qsize()
            print(f"📊 进度: 已处理{total_processed}, 剩余{remaining}, 漏洞{vuln_count}")

    def worker(self, task_queue: Queue, poll_timeout: float = 1.0):
        """Process (target, account, password) tasks until the queue stays empty.

        Args:
            task_queue: Queue of 3-tuples to verify.
            poll_timeout: Seconds to wait for a task before concluding the
                queue is exhausted and returning.

        ``task_done()`` is called for every task taken, including ones whose
        processing raised; otherwise a single failure would leave
        ``task_queue.join()`` blocked forever.
        """
        while True:
            try:
                task = task_queue.get(timeout=poll_timeout)
            except Empty:
                break
            try:
                target, account, password = task
                result = self.verify_credential(target, account, password)
                self.log_result(result)
                self._report_progress(task_queue)
            except Exception as e:
                self.logger.error(f"工作线程错误: {e}")
            finally:
                task_queue.task_done()

    def run_scan(self, targets: List[str], credentials: List[Tuple[str, str]]):
        """Verify every credential against every target and print a summary.

        The queue is filled completely before any worker starts, so a worker
        only returns once no tasks are left.
        """
        task_queue = Queue()
        for target in targets:
            for account, password in credentials:
                task_queue.put((target, account, password))
        total_tasks = task_queue.qsize()
        self.logger.info(f"开始扫描,总任务数: {total_tasks}")
        print(f"🚀 开始扫描,总任务数: {total_tasks}")
        start_time = time.time()

        # Leaving the `with` block waits for all workers to drain the queue.
        with ThreadPoolExecutor(max_workers=self.config.thread_num) as executor:
            futures = [executor.submit(self.worker, task_queue)
                       for _ in range(self.config.thread_num)]
        # Re-raise anything that escaped a worker instead of dropping it.
        for future in futures:
            future.result()

        end_time = time.time()
        duration = round(end_time - start_time, 2)

        print("\n✅ 扫描完成!")
        print(f"⏱️ 耗时: {duration}秒")
        print("📈 统计结果:")
        print(f" 🚨 发现漏洞: {self.stats['vuln']}")
        print(f" ✅ 安全目标: {self.stats['safe']}")
        print(f" ❌ 错误数量: {self.stats['error']}")
        print(f" ⏰ 超时数量: {self.stats['timeout']}")
        self.logger.info(f"扫描完成,耗时{duration}秒,统计: {self.stats}")
def get_user_input() -> Tuple[List[str], int, int, int]:
    """Prompt for the accounts to test and the password-generation settings.

    Accounts are read one per line until a blank line; duplicates are dropped
    while keeping first-seen order. The three numeric settings each accept an
    empty answer for their default. If any of them is not an integer, or is
    smaller than 1, all three fall back to the defaults (20, 6, 12).

    Returns:
        ``(accounts, passwords_per_account, min_length, max_length)`` with
        ``min_length <= max_length``, or ``([], 0, 0, 0)`` when no account
        was entered.
    """
    print("📝 请输入要测试的账号(输入空行结束):")
    accounts = []
    while True:
        account = input("账号: ").strip()
        if not account:
            break
        if account not in accounts:
            accounts.append(account)
    if not accounts:
        print("❌ 没有输入任何账号,程序退出")
        return [], 0, 0, 0

    try:
        passwords_per_account = int(input("每个账号生成密码数量 [默认20]: ").strip() or "20")
        min_length = int(input("密码最小长度 [默认6]: ").strip() or "6")
        max_length = int(input("密码最大长度 [默认12]: ").strip() or "12")
        # Zero or negative values are as unusable as non-numeric ones.
        if min(passwords_per_account, min_length, max_length) < 1:
            raise ValueError("count and lengths must be positive")
    except ValueError:
        print("⚠️ 输入无效,使用默认值")
        passwords_per_account, min_length, max_length = 20, 6, 12

    # Tolerate the two bounds being entered in the wrong order.
    if min_length > max_length:
        min_length, max_length = max_length, min_length
    return accounts, passwords_per_account, min_length, max_length
def main():
    """Interactive entry point: confirm authorization, build the dictionary, scan.

    Steps: ask the operator to confirm authorization (anything other than
    "y", ignoring case and surrounding whitespace, aborts), collect accounts
    and generation settings, write the "account:password" dictionary to
    ``Config.output_dict``, then load the targets and the dictionary and run
    the scan. Failures are reported on the console rather than propagated.
    """
    print("=" * 80)
    print("🔐 弱口令检测工具 - 优化版本")
    print("⚠️ 警告:仅用于授权的安全测试!")
    print("=" * 80)

    # Authorization gate: nothing else runs without an explicit "y".
    answer = input("✋ 您是否已获得目标系统的授权进行安全测试?(y/n): ")
    if answer.strip().lower() != 'y':
        print("❌ 未获得授权,程序退出")
        return

    config = Config()
    accounts, passwords_per_account, min_length, max_length = get_user_input()
    if not accounts:
        return

    print("🔧 正在生成密码字典...")
    generator = PasswordGenerator()
    credentials = generator.generate_dict(accounts, passwords_per_account, min_length, max_length)

    # NOTE(review): an account containing ':' is split at the wrong place when
    # this file is read back, because lines are split at their first colon.
    try:
        with open(config.output_dict, 'w', encoding='utf-8') as f:
            f.writelines(f"{account}:{password}\n" for account, password in credentials)
    except OSError as e:
        print(f"❌ 程序执行出错: {e}")
        return
    print(f"✅ 密码字典已生成: {config.output_dict}")
    print(f"📊 包含 {len(accounts)} 个账号,每个账号 {passwords_per_account} 个密码")

    detector = WeakPasswordDetector(config)
    try:
        targets = detector.load_targets()
        credentials = detector.load_credentials()
        if not targets:
            print("❌ 没有找到任何目标")
            return
        if not credentials:
            print("❌ 没有找到任何凭据")
            return
        detector.run_scan(targets, credentials)
    except Exception as e:
        # Top-level boundary: report and log instead of showing a traceback.
        print(f"❌ 程序执行出错: {e}")
        detector.logger.error(f"程序执行出错: {e}")
if __name__ == "__main__":
    # Start the interactive scan only when run as a script, not on import.
    main()