diff --git a/grok.py b/grok.py index d81fb64..be94a1d 100644 --- a/grok.py +++ b/grok.py @@ -102,7 +102,7 @@ def verify_email_code_grpc(session, email, code): def register_single_thread(): # 错峰启动,防止瞬时并发过高 time.sleep(random.uniform(0, 5)) - + try: email_service = EmailService() turnstile_service = TurnstileService() @@ -111,16 +111,21 @@ def register_single_thread(): except Exception as e: print(f"[-] 服务初始化失败: {e}") return - + # 修正:直接从 config 获取 final_action_id = config["action_id"] if not final_action_id: print("[-] 线程退出:缺少 Action ID") return - + + current_email = None # 追踪当前邮箱,确保异常时能删除 + while True: try: if stop_event.is_set(): + if current_email: + try: email_service.delete_email(current_email) + except: pass return impersonate_fingerprint, account_user_agent = get_random_chrome_profile() with requests.Session(impersonate=impersonate_fingerprint, proxies=PROXIES) as session: @@ -129,49 +134,52 @@ def register_single_thread(): except: pass password = generate_random_string() - - # print(f"[debug] 线程-{threading.get_ident()} 正在请求创建邮箱...") + try: jwt, email = email_service.create_email() + current_email = email except Exception as e: print(f"[-] 邮箱服务抛出异常: {e}") - jwt, email = None, None + jwt, email, current_email = None, None, None if not email: time.sleep(5); continue if stop_event.is_set(): - if email: - email_service.delete_email(email) + email_service.delete_email(email) + current_email = None return - + print(f"[*] 开始注册: {email}") # Step 1: 发送验证码 if not send_email_code_grpc(session, email): email_service.delete_email(email) + current_email = None time.sleep(5); continue # Step 2: 获取验证码 verify_code = email_service.fetch_verification_code(email) if not verify_code: email_service.delete_email(email) + current_email = None continue # Step 3: 验证验证码 if not verify_email_code_grpc(session, email, verify_code): email_service.delete_email(email) + current_email = None continue - + # Step 4: 注册重试循环 for attempt in range(3): if stop_event.is_set(): email_service.delete_email(email) + current_email = None return task_id = turnstile_service.create_task(site_url, config["site_key"]) - # 这里不再打印获取 Token 的过程,只在失败时报错 token = turnstile_service.get_response(task_id) - + if not token or token == "CAPTCHA_FAIL": continue @@ -188,14 +196,15 @@ def register_single_thread(): }, "turnstileToken": token, "promptOnDuplicateEmail": True }] - + with post_lock: res = session.post(f"{site_url}/sign-up", json=payload, headers=headers) - + if res.status_code == 200: match = re.search(r'(https://[^" \s]+set-cookie\?q=[^:" \s]+)1:', res.text) if not match: email_service.delete_email(email) + current_email = None break if match: verify_url = match.group(1) @@ -204,6 +213,7 @@ def register_single_thread(): sso_rw = session.cookies.get("sso-rw") if not sso: email_service.delete_email(email) + current_email = None break tos_result = user_agreement_service.accept_tos_version( @@ -215,6 +225,7 @@ def register_single_thread(): tos_hex = tos_result.get("hex_reply") or "" if not tos_result.get("ok") or not tos_hex: email_service.delete_email(email) + current_email = None break nsfw_result = nsfw_service.enable_nsfw( @@ -226,32 +237,56 @@ def register_single_thread(): nsfw_hex = nsfw_result.get("hex_reply") or "" if not nsfw_result.get("ok") or not nsfw_hex: email_service.delete_email(email) + current_email = None break + # 立即进行二次验证 (enable_unhinged) + unhinged_result = nsfw_service.enable_unhinged(sso) + unhinged_ok = unhinged_result.get("ok", False) + with file_lock: global success_count if success_count >= target_count: if not stop_event.is_set(): stop_event.set() + print(f"[*] 已达到目标数量,删除邮箱: {email}") email_service.delete_email(email) + current_email = None + break + try: + with open(output_file, "a") as f: f.write(sso + "\n") + except Exception as write_err: + print(f"[-] 写入文件失败: {write_err}") + email_service.delete_email(email) + current_email = None break - with open(output_file, "a") as f: f.write(sso + "\n") success_count += 1 avg = (time.time() - start_time) / success_count - print(f"[+] {success_count}/{target_count} {email} | {avg:.1f}s/个") + nsfw_tag = "✓" if unhinged_ok else "✗" + print(f"[✓] 注册成功: {success_count}/{target_count} | {email} | SSO: {sso[:15]}... | 平均: {avg:.1f}s | NSFW: {nsfw_tag}") email_service.delete_email(email) + current_email = None if success_count >= target_count and not stop_event.is_set(): stop_event.set() + print(f"[*] 已达到目标数量: {success_count}/{target_count},停止新注册") break # 跳出 for 循环,继续 while True 注册下一个 time.sleep(3) else: # 如果重试 3 次都失败 (for 循环没有被 break) email_service.delete_email(email) + current_email = None time.sleep(5) except Exception as e: print(f"[-] 异常: {str(e)[:50]}") + # 异常时确保删除邮箱 + if current_email: + try: + email_service.delete_email(current_email) + except: + pass + current_email = None time.sleep(5) def main(): @@ -310,18 +345,5 @@ def main(): futures = [executor.submit(register_single_thread) for _ in range(t)] concurrent.futures.wait(futures) - # 二次验证 NSFW - if os.path.exists(output_file): - print(f"\n[*] 开始二次验证 NSFW...") - nsfw_service = NsfwSettingsService() - with open(output_file, "r") as f: - tokens = [line.strip() for line in f if line.strip()] - ok_count = 0 - for sso in tokens: - result = nsfw_service.enable_unhinged(sso) - if result.get("ok"): - ok_count += 1 - print(f"[*] 二次验证完成: {ok_count}/{len(tokens)}") - if __name__ == "__main__": main() \ No newline at end of file