前言
最近因为工作需要研究了下亚马逊的登录接口,找到了验证登录账户的使用方法。
方法策略
批量检验亚马逊账户是一件非常费事费力的事情,使用 selenium 在网页上操作,一分钟才能筛选两三个,所以接口是一个非常好的选择。
目前能够做到一台机器一秒钟筛选10个亚马逊账户,不过为了防止IP封禁,最后选择了使用了隧道代理进行,代理要经常变化,才能绕过亚马逊的检查。
代码示例
import requests
import threading
from queue import Queue
import time
import re
import os
import sys
import json
executable_path = os.path.realpath(sys.argv[0])
# 获取exe所在目录
directory = os.path.dirname(executable_path)
# 定义配置文件的路径
success_file = os.path.join(directory, 'success.txt')
failure_file = os.path.join(directory, 'failure.txt')
phone_file = os.path.join(directory, 'phone.txt')
config_file = os.path.join(directory, 'config.json')
f = open(config_file, 'r', encoding='utf-8')
data = json.load(f)
kdl_tunnel = data["kdl_tunnel"]
kdl_username = data["kdl_username"]
kdl_password = data["kdl_password"]
is_proxy = data["is_proxy"]
if is_proxy == "1":
proxy = f"http://{kdl_username}:{kdl_password}@{kdl_tunnel}"
proxies = {
'http': proxy
}
else:
proxies = None
def get_info(phone):
# 关键代码已隐藏
if result is None:
unregisted(phone)
return
result = result.group(1)
if result in registed_list:
registed(phone)
return
unregisted(phone)
def unregisted(phone):
# 未注册
print(f"{phone} 未注册")
with open(success_file, 'a') as f:
f.write(phone + "\n")
def registed(phone):
# 已注册
print(f"{phone} 已注册")
with open(failure_file, 'a') as f:
f.write(phone + "\n")
def get_urls():
url_array = []
with open(phone_file, 'r') as f:
for phone in f:
url_array.append(phone.strip())
return url_array
# 使用线程池来并发执行请求
def worker():
while not q.empty():
url = q.get()
get_info(url)
q.task_done()
def start():
time_now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
if time_now > "2026-04-15 00:00:00":
print("已过期")
return
print("开始处理")
start = time.time()
# 将URLs加入队列
for url in get_urls():
q.put(url)
# 创建线程池,这里假设有 5个线程
threads = []
for i in range(5):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
# 等待所有任务完成
q.join()
# 等待所有线程完成
for t in threads:
t.join()
print("处理完毕")
# 开始检查
print('总耗时:', time.time() - start)
print('程序执行完毕!')
if __name__ == '__main__':
q = Queue()
start()