From d3be20ce8580caa40cfcf226266fade0ee31ca99 Mon Sep 17 00:00:00 2001 From: muqing-kg Date: Wed, 4 Feb 2026 19:32:37 +0800 Subject: [PATCH] Initial commit: Grok batch registration tool - Multi-threaded account registration - Auto email verification via freemail API - Auto NSFW/Unhinged mode activation - Temporary email cleanup after registration --- .env.example | 7 + .gitignore | 27 +++ README.md | 75 +++++++++ g/__init__.py | 9 + g/email_service.py | 66 ++++++++ g/nsfw_service.py | 164 ++++++++++++++++++ g/turnstile_service.py | 105 ++++++++++++ g/user_agreement_service.py | 115 +++++++++++++ grok.py | 327 ++++++++++++++++++++++++++++++++++++ requirements.txt | 4 + 10 files changed, 899 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 README.md create mode 100644 g/__init__.py create mode 100644 g/email_service.py create mode 100644 g/nsfw_service.py create mode 100644 g/turnstile_service.py create mode 100644 g/user_agreement_service.py create mode 100644 grok.py create mode 100644 requirements.txt diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..0dbd412 --- /dev/null +++ b/.env.example @@ -0,0 +1,7 @@ +# freemail API 配置 +WORKER_DOMAIN=your-freemail-domain.com +FREEMAIL_TOKEN=your-jwt-token + +# Turnstile 验证配置 +# 如果不填则使用本地 Turnstile Solver(http://127.0.0.1:5072) +YESCAPTCHA_KEY= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4123096 --- /dev/null +++ b/.gitignore @@ -0,0 +1,27 @@ +# 环境配置 +.env + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# 项目特定 +keys/ +.ace-tool/ +.claude/ +.playwright-mcp/ + +# 本地测试文件 +5线程运行TurnstileSolver.bat +TurnstileSolver.bat +api_solver.py +browser_configs.py +db_results.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..164064f --- /dev/null +++ b/README.md @@ -0,0 +1,75 @@ +# Grok 批量注册工具 + +批量注册 Grok 账号并自动开启 NSFW 功能。 + +## 功能 + +- 自动创建临时邮箱 +- 自动获取验证码 +- 自动完成注册流程 +- 自动开启 NSFW/Unhinged 模式 +- 注册完成后自动清理临时邮箱 +- 支持多线程并发注册 + +## 依赖 + +- [freemail](https://github.com/user/freemail) - 临时邮箱服务(需自行部署) +- Turnstile Solver - 验证码解决方案 + +## 安装 + +```bash +pip install -r requirements.txt +``` + +## 配置 + +复制 `.env.example` 为 `.env` 并填写配置: + +```bash +cp .env.example .env +``` + +配置项说明: + +| 配置项 | 说明 | +|--------|------| +| WORKER_DOMAIN | freemail 服务域名 | +| FREEMAIL_TOKEN | freemail JWT Token | +| YESCAPTCHA_KEY | YesCaptcha API Key(可选,不填使用本地 Solver) | + +## 使用 + +```bash +python grok.py +``` + +按提示输入: +- 并发数(默认 8) +- 注册数量(默认 100) + +注册成功的 SSO Token 保存在 `keys/grok_时间戳_数量.txt` + +## 输出示例 + +``` +============================================================ +Grok 注册机 +============================================================ +[*] 正在初始化... +[+] Action ID: 7f67aa61adfb0655899002808e1d443935b057c25b +[*] 启动 8 个线程,目标 10 个 +[*] 输出: keys/grok_20260204_190000_10.txt +[*] 开始注册: abc123@example.com +[+] 1/10 abc123@example.com | 5.2s/个 +[+] 2/10 def456@example.com | 4.8s/个 +... +[*] 开始二次验证 NSFW... +[*] 二次验证完成: 10/10 +``` + +## 注意事项 + +- 需要自行部署 freemail 临时邮箱服务 +- 需要 Turnstile Solver(本地或 YesCaptcha) +- 仅供学习研究使用 diff --git a/g/__init__.py b/g/__init__.py new file mode 100644 index 0000000..d56e403 --- /dev/null +++ b/g/__init__.py @@ -0,0 +1,9 @@ +""" +注册机配件 +""" +from .email_service import EmailService +from .turnstile_service import TurnstileService +from .user_agreement_service import UserAgreementService +from .nsfw_service import NsfwSettingsService + +__all__ = ['EmailService', 'TurnstileService', 'UserAgreementService', 'NsfwSettingsService'] diff --git a/g/email_service.py b/g/email_service.py new file mode 100644 index 0000000..71c72fd --- /dev/null +++ b/g/email_service.py @@ -0,0 +1,66 @@ +"""邮箱服务类 - 适配 freemail API""" +import os +import time +import requests +from dotenv import load_dotenv + + +class EmailService: + def __init__(self): + load_dotenv() + self.worker_domain = os.getenv("WORKER_DOMAIN") + self.freemail_token = os.getenv("FREEMAIL_TOKEN") + if not all([self.worker_domain, self.freemail_token]): + raise ValueError("Missing: WORKER_DOMAIN or FREEMAIL_TOKEN") + self.base_url = f"https://{self.worker_domain}" + self.headers = {"Authorization": f"Bearer {self.freemail_token}"} + + def create_email(self): + """创建临时邮箱 GET /api/generate""" + try: + res = requests.get( + f"{self.base_url}/api/generate", + headers=self.headers, + timeout=10 + ) + if res.status_code == 200: + email = res.json().get("email") + return email, email # 兼容原接口 (jwt, email) + print(f"[-] 创建邮箱失败: {res.status_code} - {res.text}") + return None, None + except Exception as e: + print(f"[-] 创建邮箱失败: {e}") + return None, None + + def fetch_verification_code(self, email, max_attempts=30): + """轮询获取验证码 GET /api/emails?mailbox=xxx""" + for _ in range(max_attempts): + try: + res = requests.get( + f"{self.base_url}/api/emails", + params={"mailbox": email}, + headers=self.headers, + timeout=10 + ) + if res.status_code == 200: + emails = res.json() + if emails and emails[0].get("verification_code"): + code = emails[0]["verification_code"] + return code.replace("-", "") + except: + pass + time.sleep(1) + return None + + def delete_email(self, address): + """删除邮箱 DELETE /api/mailboxes?address=xxx""" + try: + res = requests.delete( + f"{self.base_url}/api/mailboxes", + params={"address": address}, + headers=self.headers, + timeout=10 + ) + return res.status_code == 200 and res.json().get("success") + except: + return False diff --git a/g/nsfw_service.py b/g/nsfw_service.py new file mode 100644 index 0000000..b161502 --- /dev/null +++ b/g/nsfw_service.py @@ -0,0 +1,164 @@ +from __future__ import annotations + +from typing import Optional, Dict, Any + +from curl_cffi import requests + +DEFAULT_USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/120.0.0.0 Safari/537.36" +) + + +class NsfwSettingsService: + """开启 NSFW 相关设置(线程安全,无全局状态)。""" + + def __init__(self, cf_clearance: str = ""): + self.cf_clearance = (cf_clearance or "").strip() + + def enable_nsfw( + self, + sso: str, + sso_rw: str, + impersonate: str, + user_agent: Optional[str] = None, + cf_clearance: Optional[str] = None, + timeout: int = 15, + ) -> Dict[str, Any]: + """ + 启用 always_show_nsfw_content。 + 返回: { + ok: bool, + hex_reply: str, + status_code: int | None, + grpc_status: str | None, + error: str | None + } + """ + if not sso: + return { + "ok": False, + "hex_reply": "", + "status_code": None, + "grpc_status": None, + "error": "缺少 sso", + } + if not sso_rw: + return { + "ok": False, + "hex_reply": "", + "status_code": None, + "grpc_status": None, + "error": "缺少 sso-rw", + } + + url = "https://grok.com/auth_mgmt.AuthManagement/UpdateUserFeatureControls" + + cookies = { + "sso": sso, + "sso-rw": sso_rw, + } + clearance = (cf_clearance if cf_clearance is not None else self.cf_clearance).strip() + if clearance: + cookies["cf_clearance"] = clearance + + headers = { + "content-type": "application/grpc-web+proto", + "origin": "https://grok.com", + "referer": "https://grok.com/?_s=data", + "x-grpc-web": "1", + "user-agent": user_agent or DEFAULT_USER_AGENT, + } + + data = ( + b"\x00\x00\x00\x00" + b"\x20" + b"\x0a\x02\x10\x01" + b"\x12\x1a" + b"\x0a\x18" + b"always_show_nsfw_content" + ) + + try: + response = requests.post( + url, + headers=headers, + cookies=cookies, + data=data, + impersonate=impersonate or "chrome120", + timeout=timeout, + ) + hex_reply = response.content.hex() + grpc_status = response.headers.get("grpc-status") + + error = None + ok = response.status_code == 200 and (grpc_status in (None, "0")) + if response.status_code == 403: + error = "403 Forbidden" + elif response.status_code != 200: + error = f"HTTP {response.status_code}" + elif grpc_status not in (None, "0"): + error = f"gRPC {grpc_status}" + + return { + "ok": ok, + "hex_reply": hex_reply, + "status_code": response.status_code, + "grpc_status": grpc_status, + "error": error, + } + except Exception as e: + return { + "ok": False, + "hex_reply": "", + "status_code": None, + "grpc_status": None, + "error": str(e), + } + + def enable_unhinged( + self, + sso: str, + impersonate: str = "chrome120", + user_agent: Optional[str] = None, + timeout: int = 30, + ) -> Dict[str, Any]: + """ + 使用帖子方法开启 Unhinged 模式(二次验证)。 + """ + import struct + + url = "https://grok.com/auth_mgmt.AuthManagement/UpdateUserFeatureControls" + + headers = { + "accept": "*/*", + "content-type": "application/grpc-web+proto", + "origin": "https://grok.com", + "referer": "https://grok.com/", + "user-agent": user_agent or DEFAULT_USER_AGENT, + "x-grpc-web": "1", + "x-user-agent": "connect-es/2.1.1", + "cookie": f"sso={sso}; sso-rw={sso}" + } + + payload = bytes([0x08, 0x01, 0x10, 0x01]) + data = b'\x00' + struct.pack('>I', len(payload)) + payload + + try: + response = requests.post( + url, + headers=headers, + data=data, + impersonate=impersonate, + timeout=timeout, + ) + return { + "ok": response.status_code == 200, + "status_code": response.status_code, + } + except Exception as e: + return { + "ok": False, + "error": str(e), + } diff --git a/g/turnstile_service.py b/g/turnstile_service.py new file mode 100644 index 0000000..d03eccc --- /dev/null +++ b/g/turnstile_service.py @@ -0,0 +1,105 @@ +""" +Turnstile验证服务类 +""" +import os +import time +import requests +from dotenv import load_dotenv + +load_dotenv() + + +class TurnstileService: + """Turnstile验证服务类""" + + def __init__(self, solver_url="http://127.0.0.1:5072"): + """ + 初始化Turnstile服务 + """ + self.yescaptcha_key = os.getenv('YESCAPTCHA_KEY', '').strip() + self.solver_url = solver_url + self.yescaptcha_api = "https://api.yescaptcha.com" + + def create_task(self, siteurl, sitekey): + """ + 创建Turnstile验证任务 + """ + if self.yescaptcha_key: + # 使用 YesCaptcha API + url = f"{self.yescaptcha_api}/createTask" + payload = { + "clientKey": self.yescaptcha_key, + "task": { + "type": "TurnstileTaskProxyless", + "websiteURL": siteurl, + "websiteKey": sitekey + } + } + response = requests.post(url, json=payload) + response.raise_for_status() + data = response.json() + if data.get('errorId') != 0: + raise Exception(f"YesCaptcha创建任务失败: {data.get('errorDescription')}") + return data['taskId'] + else: + # 使用本地 Turnstile Solver + url = f"{self.solver_url}/turnstile?url={siteurl}&sitekey={sitekey}" + response = requests.get(url) + response.raise_for_status() + return response.json()['taskId'] + + def get_response(self, task_id, max_retries=30, initial_delay=5, retry_delay=2): + """ + 获取Turnstile验证响应 + """ + time.sleep(initial_delay) + + for _ in range(max_retries): + try: + if self.yescaptcha_key: + # 使用 YesCaptcha API + url = f"{self.yescaptcha_api}/getTaskResult" + payload = { + "clientKey": self.yescaptcha_key, + "taskId": task_id + } + response = requests.post(url, json=payload) + response.raise_for_status() + data = response.json() + + if data.get('errorId') != 0: + print(f"YesCaptcha获取结果失败: {data.get('errorDescription')}") + return None + + if data.get('status') == 'ready': + token = data.get('solution', {}).get('token') + if token: + return token + else: + print("YesCaptcha返回结果中没有token") + return None + elif data.get('status') == 'processing': + time.sleep(retry_delay) + else: + print(f"YesCaptcha未知状态: {data.get('status')}") + time.sleep(retry_delay) + else: + # 使用本地 Turnstile Solver + url = f"{self.solver_url}/result?id={task_id}" + response = requests.get(url) + response.raise_for_status() + data = response.json() + captcha = data.get('solution', {}).get('token', None) + + if captcha: + if captcha != "CAPTCHA_FAIL": + return captcha + else: + return None + else: + time.sleep(retry_delay) + except Exception as e: + print(f"获取Turnstile响应异常: {e}") + time.sleep(retry_delay) + + return None diff --git a/g/user_agreement_service.py b/g/user_agreement_service.py new file mode 100644 index 0000000..94a1c19 --- /dev/null +++ b/g/user_agreement_service.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +from typing import Optional, Dict, Any + +from curl_cffi import requests + +DEFAULT_USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/120.0.0.0 Safari/537.36" +) + + +class UserAgreementService: + """处理账号协议同意流程(线程安全,无全局状态)。""" + + def __init__(self, cf_clearance: str = ""): + self.cf_clearance = (cf_clearance or "").strip() + + def accept_tos_version( + self, + sso: str, + sso_rw: str, + impersonate: str, + user_agent: Optional[str] = None, + cf_clearance: Optional[str] = None, + timeout: int = 15, + ) -> Dict[str, Any]: + """ + 同意 TOS 版本。 + 返回: { + ok: bool, + hex_reply: str, + status_code: int | None, + grpc_status: str | None, + error: str | None + } + """ + if not sso: + return { + "ok": False, + "hex_reply": "", + "status_code": None, + "grpc_status": None, + "error": "缺少 sso", + } + if not sso_rw: + return { + "ok": False, + "hex_reply": "", + "status_code": None, + "grpc_status": None, + "error": "缺少 sso-rw", + } + + url = "https://accounts.x.ai/auth_mgmt.AuthManagement/SetTosAcceptedVersion" + + cookies = { + "sso": sso, + "sso-rw": sso_rw, + } + clearance = (cf_clearance if cf_clearance is not None else self.cf_clearance).strip() + if clearance: + cookies["cf_clearance"] = clearance + + headers = { + "content-type": "application/grpc-web+proto", + "origin": "https://accounts.x.ai", + "referer": "https://accounts.x.ai/accept-tos", + "x-grpc-web": "1", + "user-agent": user_agent or DEFAULT_USER_AGENT, + } + + data = ( + b"\x00\x00\x00\x00" # 头部 + b"\x02" # 长度 + b"\x10\x01" # Field 2 = 1 + ) + + try: + response = requests.post( + url, + headers=headers, + cookies=cookies, + data=data, + impersonate=impersonate or "chrome120", + timeout=timeout, + ) + hex_reply = response.content.hex() + grpc_status = response.headers.get("grpc-status") + + error = None + ok = response.status_code == 200 and (grpc_status in (None, "0")) + if response.status_code == 403: + error = "403 Forbidden" + elif response.status_code != 200: + error = f"HTTP {response.status_code}" + elif grpc_status not in (None, "0"): + error = f"gRPC {grpc_status}" + + return { + "ok": ok, + "hex_reply": hex_reply, + "status_code": response.status_code, + "grpc_status": grpc_status, + "error": error, + } + except Exception as e: + return { + "ok": False, + "hex_reply": "", + "status_code": None, + "grpc_status": None, + "error": str(e), + } diff --git a/grok.py b/grok.py new file mode 100644 index 0000000..d81fb64 --- /dev/null +++ b/grok.py @@ -0,0 +1,327 @@ +import os, json, random, string, time, re, struct +import threading +import concurrent.futures +from urllib.parse import urljoin, urlparse +from curl_cffi import requests +from bs4 import BeautifulSoup + +from g import EmailService, TurnstileService, UserAgreementService, NsfwSettingsService + +# 基础配置 +site_url = "https://accounts.x.ai" +DEFAULT_IMPERSONATE = "chrome120" +CHROME_PROFILES = [ + {"impersonate": "chrome110", "version": "110.0.0.0", "brand": "chrome"}, + {"impersonate": "chrome119", "version": "119.0.0.0", "brand": "chrome"}, + {"impersonate": "chrome120", "version": "120.0.0.0", "brand": "chrome"}, + {"impersonate": "edge99", "version": "99.0.1150.36", "brand": "edge"}, + {"impersonate": "edge101", "version": "101.0.1210.47", "brand": "edge"}, +] +def get_random_chrome_profile(): + profile = random.choice(CHROME_PROFILES) + if profile.get("brand") == "edge": + chrome_major = profile["version"].split(".")[0] + chrome_version = f"{chrome_major}.0.0.0" + ua = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + f"Chrome/{chrome_version} Safari/537.36 Edg/{profile['version']}" + ) + else: + ua = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + f"Chrome/{profile['version']} Safari/537.36" + ) + return profile["impersonate"], ua +PROXIES = { + # "http": "http://127.0.0.1:10808", + # "https": "http://127.0.0.1:10808" +} + +# 动态获取的全局变量 +config = { + "site_key": "0x4AAAAAAAhr9JGVDZbrZOo0", + "action_id": None, + "state_tree": "%5B%22%22%2C%7B%22children%22%3A%5B%22(app)%22%2C%7B%22children%22%3A%5B%22(auth)%22%2C%7B%22children%22%3A%5B%22sign-up%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2C%22%2Fsign-up%22%2C%22refresh%22%5D%7D%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D" +} + +post_lock = threading.Lock() +file_lock = threading.Lock() +success_count = 0 +start_time = time.time() +target_count = 100 +stop_event = threading.Event() +output_file = None + +def generate_random_name() -> str: + length = random.randint(4, 6) + return random.choice(string.ascii_uppercase) + ''.join(random.choice(string.ascii_lowercase) for _ in range(length - 1)) + +def generate_random_string(length: int = 15) -> str: + return "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(length)) + +def encode_grpc_message(field_id, string_value): + key = (field_id << 3) | 2 + value_bytes = string_value.encode('utf-8') + length = len(value_bytes) + payload = struct.pack('B', key) + struct.pack('B', length) + value_bytes + return b'\x00' + struct.pack('>I', len(payload)) + payload + +def encode_grpc_message_verify(email, code): + p1 = struct.pack('B', (1 << 3) | 2) + struct.pack('B', len(email)) + email.encode('utf-8') + p2 = struct.pack('B', (2 << 3) | 2) + struct.pack('B', len(code)) + code.encode('utf-8') + payload = p1 + p2 + return b'\x00' + struct.pack('>I', len(payload)) + payload + +def send_email_code_grpc(session, email): + url = f"{site_url}/auth_mgmt.AuthManagement/CreateEmailValidationCode" + data = encode_grpc_message(1, email) + headers = {"content-type": "application/grpc-web+proto", "x-grpc-web": "1", "x-user-agent": "connect-es/2.1.1", "origin": site_url, "referer": f"{site_url}/sign-up?redirect=grok-com"} + try: + # print(f"[debug] {email} 正在发送验证码请求...") + res = session.post(url, data=data, headers=headers, timeout=15) + # print(f"[debug] {email} 请求结束,状态码: {res.status_code}") + return res.status_code == 200 + except Exception as e: + print(f"[-] {email} 发送验证码异常: {e}") + return False + +def verify_email_code_grpc(session, email, code): + url = f"{site_url}/auth_mgmt.AuthManagement/VerifyEmailValidationCode" + data = encode_grpc_message_verify(email, code) + headers = {"content-type": "application/grpc-web+proto", "x-grpc-web": "1", "x-user-agent": "connect-es/2.1.1", "origin": site_url, "referer": f"{site_url}/sign-up?redirect=grok-com"} + try: + res = session.post(url, data=data, headers=headers, timeout=15) + # print(f"[debug] {email} 验证响应状态: {res.status_code}, 内容长度: {len(res.content)}") + return res.status_code == 200 + except Exception as e: + print(f"[-] {email} 验证验证码异常: {e}") + return False + +def register_single_thread(): + # 错峰启动,防止瞬时并发过高 + time.sleep(random.uniform(0, 5)) + + try: + email_service = EmailService() + turnstile_service = TurnstileService() + user_agreement_service = UserAgreementService() + nsfw_service = NsfwSettingsService() + except Exception as e: + print(f"[-] 服务初始化失败: {e}") + return + + # 修正:直接从 config 获取 + final_action_id = config["action_id"] + if not final_action_id: + print("[-] 线程退出:缺少 Action ID") + return + + while True: + try: + if stop_event.is_set(): + return + impersonate_fingerprint, account_user_agent = get_random_chrome_profile() + with requests.Session(impersonate=impersonate_fingerprint, proxies=PROXIES) as session: + # 预热连接 + try: session.get(site_url, timeout=10) + except: pass + + password = generate_random_string() + + # print(f"[debug] 线程-{threading.get_ident()} 正在请求创建邮箱...") + try: + jwt, email = email_service.create_email() + except Exception as e: + print(f"[-] 邮箱服务抛出异常: {e}") + jwt, email = None, None + + if not email: + time.sleep(5); continue + + if stop_event.is_set(): + if email: + email_service.delete_email(email) + return + + print(f"[*] 开始注册: {email}") + + # Step 1: 发送验证码 + if not send_email_code_grpc(session, email): + email_service.delete_email(email) + time.sleep(5); continue + + # Step 2: 获取验证码 + verify_code = email_service.fetch_verification_code(email) + if not verify_code: + email_service.delete_email(email) + continue + + # Step 3: 验证验证码 + if not verify_email_code_grpc(session, email, verify_code): + email_service.delete_email(email) + continue + + # Step 4: 注册重试循环 + for attempt in range(3): + if stop_event.is_set(): + email_service.delete_email(email) + return + task_id = turnstile_service.create_task(site_url, config["site_key"]) + # 这里不再打印获取 Token 的过程,只在失败时报错 + token = turnstile_service.get_response(task_id) + + if not token or token == "CAPTCHA_FAIL": + continue + + headers = { + "user-agent": account_user_agent, "accept": "text/x-component", "content-type": "text/plain;charset=UTF-8", + "origin": site_url, "referer": f"{site_url}/sign-up", "cookie": f"__cf_bm={session.cookies.get('__cf_bm','')}", + "next-router-state-tree": config["state_tree"], "next-action": final_action_id + } + payload = [{ + "emailValidationCode": verify_code, + "createUserAndSessionRequest": { + "email": email, "givenName": generate_random_name(), "familyName": generate_random_name(), + "clearTextPassword": password, "tosAcceptedVersion": "$undefined" + }, + "turnstileToken": token, "promptOnDuplicateEmail": True + }] + + with post_lock: + res = session.post(f"{site_url}/sign-up", json=payload, headers=headers) + + if res.status_code == 200: + match = re.search(r'(https://[^" \s]+set-cookie\?q=[^:" \s]+)1:', res.text) + if not match: + email_service.delete_email(email) + break + if match: + verify_url = match.group(1) + session.get(verify_url, allow_redirects=True) + sso = session.cookies.get("sso") + sso_rw = session.cookies.get("sso-rw") + if not sso: + email_service.delete_email(email) + break + + tos_result = user_agreement_service.accept_tos_version( + sso=sso, + sso_rw=sso_rw or "", + impersonate=impersonate_fingerprint, + user_agent=account_user_agent, + ) + tos_hex = tos_result.get("hex_reply") or "" + if not tos_result.get("ok") or not tos_hex: + email_service.delete_email(email) + break + + nsfw_result = nsfw_service.enable_nsfw( + sso=sso, + sso_rw=sso_rw or "", + impersonate=impersonate_fingerprint, + user_agent=account_user_agent, + ) + nsfw_hex = nsfw_result.get("hex_reply") or "" + if not nsfw_result.get("ok") or not nsfw_hex: + email_service.delete_email(email) + break + + with file_lock: + global success_count + if success_count >= target_count: + if not stop_event.is_set(): + stop_event.set() + email_service.delete_email(email) + break + with open(output_file, "a") as f: f.write(sso + "\n") + success_count += 1 + avg = (time.time() - start_time) / success_count + print(f"[+] {success_count}/{target_count} {email} | {avg:.1f}s/个") + email_service.delete_email(email) + if success_count >= target_count and not stop_event.is_set(): + stop_event.set() + break # 跳出 for 循环,继续 while True 注册下一个 + + time.sleep(3) + else: + # 如果重试 3 次都失败 (for 循环没有被 break) + email_service.delete_email(email) + time.sleep(5) + + except Exception as e: + print(f"[-] 异常: {str(e)[:50]}") + time.sleep(5) + +def main(): + print("=" * 60 + "\nGrok 注册机\n" + "=" * 60) + + # 1. 扫描参数 + print("[*] 正在初始化...") + start_url = f"{site_url}/sign-up" + with requests.Session(impersonate=DEFAULT_IMPERSONATE) as s: + try: + html = s.get(start_url).text + # Key + key_match = re.search(r'sitekey":"(0x4[a-zA-Z0-9_-]+)"', html) + if key_match: config["site_key"] = key_match.group(1) + # Tree + tree_match = re.search(r'next-router-state-tree":"([^"]+)"', html) + if tree_match: config["state_tree"] = tree_match.group(1) + # Action ID + soup = BeautifulSoup(html, 'html.parser') + js_urls = [urljoin(start_url, script['src']) for script in soup.find_all('script', src=True) if '_next/static' in script['src']] + for js_url in js_urls: + js_content = s.get(js_url).text + match = re.search(r'7f[a-fA-F0-9]{40}', js_content) + if match: + config["action_id"] = match.group(0) + print(f"[+] Action ID: {config['action_id']}") + break + except Exception as e: + print(f"[-] 初始化扫描失败: {e}") + return + + if not config["action_id"]: + print("[-] 错误: 未找到 Action ID") + return + + # 2. 启动 + try: + t = int(input("\n并发数 (默认8): ").strip() or 8) + except: t = 8 + + try: + total = int(input("注册数量 (默认100): ").strip() or 100) + except: total = 100 + + global target_count, output_file + target_count = max(1, total) + + from datetime import datetime + os.makedirs("keys", exist_ok=True) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + output_file = f"keys/grok_{timestamp}_{target_count}.txt" + + print(f"[*] 启动 {t} 个线程,目标 {target_count} 个") + print(f"[*] 输出: {output_file}") + with concurrent.futures.ThreadPoolExecutor(max_workers=t) as executor: + futures = [executor.submit(register_single_thread) for _ in range(t)] + concurrent.futures.wait(futures) + + # 二次验证 NSFW + if os.path.exists(output_file): + print(f"\n[*] 开始二次验证 NSFW...") + nsfw_service = NsfwSettingsService() + with open(output_file, "r") as f: + tokens = [line.strip() for line in f if line.strip()] + ok_count = 0 + for sso in tokens: + result = nsfw_service.enable_unhinged(sso) + if result.get("ok"): + ok_count += 1 + print(f"[*] 二次验证完成: {ok_count}/{len(tokens)}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ddb61cb --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +curl_cffi +beautifulsoup4 +python-dotenv +requests