亚马逊登录数据筛选

发布于:2024-04-16 ⋅ 阅读:(24) ⋅ 点赞:(0)

亚马逊登录接口逆向分析

前言

最近因为工作需要研究了下亚马逊的登录接口,找到了验证登录账户的使用方法。

方法策略

批量检验亚马逊账户是一件非常费事费力的事情,使用 selenium 在网页上操作,一分钟才能筛选两三个,所以接口是一个非常好的选择。

目前能够做到一台机器一秒钟筛选10个亚马逊账户,不过为了防止IP封禁,最后选择了使用了隧道代理进行,代理要经常变化,才能绕过亚马逊的检查。

代码示例

import requests
import threading
from queue import Queue
import time
import re
import os
import sys
import json

executable_path = os.path.realpath(sys.argv[0])
# 获取exe所在目录
directory = os.path.dirname(executable_path)
# 定义配置文件的路径
success_file = os.path.join(directory, 'success.txt')
failure_file = os.path.join(directory, 'failure.txt')
phone_file = os.path.join(directory, 'phone.txt')
config_file = os.path.join(directory, 'config.json')

f = open(config_file, 'r', encoding='utf-8')
data = json.load(f)
kdl_tunnel = data["kdl_tunnel"]
kdl_username = data["kdl_username"]
kdl_password = data["kdl_password"]
is_proxy = data["is_proxy"]
if is_proxy == "1":
    proxy = f"http://{kdl_username}:{kdl_password}@{kdl_tunnel}"
    proxies = {
        'http': proxy
    }
else:
    proxies = None


def get_info(phone):

    # 关键代码已隐藏

    if result is None:
        unregisted(phone)
        return

    result = result.group(1)
    if result in registed_list:
        registed(phone)
        return
    unregisted(phone)


def unregisted(phone):
    # 未注册
    print(f"{phone} 未注册")
    with open(success_file, 'a') as f:
        f.write(phone + "\n")


def registed(phone):
    # 已注册
    print(f"{phone} 已注册")
    with open(failure_file, 'a') as f:
        f.write(phone + "\n")


def get_urls():
    url_array = []
    with open(phone_file, 'r') as f:
        for phone in f:
            url_array.append(phone.strip())
    return url_array


# 使用线程池来并发执行请求
def worker():
    while not q.empty():
        url = q.get()
        get_info(url)
        q.task_done()


def start():
    time_now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    if time_now > "2026-04-15 00:00:00":
        print("已过期")
        return

    print("开始处理")
    start = time.time()
    # 将URLs加入队列
    for url in get_urls():
        q.put(url)
    # 创建线程池,这里假设有 5个线程
    threads = []
    for i in range(5):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)

    # 等待所有任务完成
    q.join()

    # 等待所有线程完成
    for t in threads:
        t.join()

    print("处理完毕")

    # 开始检查

    print('总耗时:', time.time() - start)
    print('程序执行完毕!')


if __name__ == '__main__':
    q = Queue()
    start()

网站公告

今日签到

点亮在社区的每一天
去签到