X 开发者账号自动利用土狗Meme KOL账号生成list页

import json import requests import time from utils.redisdb import redis_cli from retry import retry from utils.spider_failed_alert import ErrorMonitor class AddMemberToList(object): def __init__(self): # 缓存Twittertoken的生成时间,用于判断其是否超过2个小时 # self.token_time_map = {} self.rc = redis_cli() # 根据user_id去重,任务去重 (set结构) self.filter_key = 'twitter:list:has_added_members' # 建立list页的ID和该list页成员数量的映射关系(hash结构) self.listID_amount_mapping = 'twitter:list:target_listID:amount' # 建立user_name => user_id的映射关系(hash结构) self.user_name_to_id_mapping = 'twitter:list:user_name_to_user_id:mapping' @retry(tries=10, delay=3) def gen_token(self): """ 获取X的access token 首先从内存缓存获取没有过期的token,如果获取不到那么就从接口获取最新token,并且刷新内存缓存信息 """ url = 'https://example.com/getAccessToken' res = requests.get(url) # print('更新token ==>', res.text) token_res = json.loads(res.json().get('data')) token = token_res.get('access_token') print('更新token成功!') if not token: raise Exception(f'shane接口返回的token为空:{token_res}') return token @retry(tries=10, delay=60) def sent_request(self, method, url, data=None): token = self.gen_token() if not token: raise Exception('无法获取到token') """发送请求返回响应""" if method == 'POST': headers = { "Authorization": "Bearer {}".format(token), "Content-Type": "application/json" } else: headers = {"Authorization": "Bearer {}".format(token)} response = requests.request(method=method, url=url, headers=headers, json=data) if response.json().get('status') == 401: # 激活重试 raise Exception(f'token 过期 ,进行更新 {response.text} {headers} {method} {url} {data}') # twitter所有接口频率限制为5 time.sleep(5) return response def find_user_id(self, user_name): """通过用户名查找用户,返回用户id""" url = "https://api.twitter.com/2/users/by/username/{username}".format(username=user_name) response = self.sent_request('GET', url) res_data = response.json() print('find_user_id response ===>', res_data) if res_data.get('data',{}).get('id') and res_data.get('data',{}).get('username')==user_name: return res_data.get('data').get('id') else: # 很多账号没多久就会销号,需要将这样的账号拉黑 self.rc.sadd(self.filter_key, user_name) print(f'没有找到用户 {user_name} {res_data}') return None @retry(tries=3, delay=10) def create_list(self, name, description): """创建list""" url = "https://api.twitter.com/2/lists" payload = { "description": description, "name": name, #name: may only be 25 characters long "private": False } response = self.sent_request("POST", url, data=payload) res_data = response.json() print('create_list response ===> {}'.format(response.text)) id = res_data.get('data',{}).get('id') if id: print(f'创建新的列表页成功,ID:{id}') # 创建完列表页里面将该ID记录,以防止后续程序崩溃,导致该列表页的ID被丢弃 self.rc.hset(self.listID_amount_mapping, id, 0) # 返回新创建的列表页ID return id else: raise Exception('创建列表页失败') def add_member_to_list(self, list_id, user_id): """ 添加成员到list(单个添加) return: listID or None """ url = "https://api.twitter.com/2/lists/{id}/members".format(id=list_id) payload = {"user_id": user_id} response = self.sent_request("POST", url, data=payload) print('add_member response ===>', response.text) # response = self.add_members(list_id, user_id) if response.json().get('data',{}).get('is_member'): # 加过list页的就去重(set结构) self.rc.sadd(self.filter_key, user_id) # 更新该list页的成员数量 total_amount = self.rc.hincrby(self.listID_amount_mapping, list_id, 1) print(f'当前list页 {list_id} 成员总数:{total_amount}') return True else: print(f'****{user_id} 用户添加失败 {response.json()}*****') return False def add_members_to_list(self, list_id, member_id_list): """ 批量添加成员到list页 list_id: list页的ID member_id_list:[123456, 890400] user_id 列表清单 """ member_id_list_lenght = len(member_id_list) print(f'共有{member_id_list_lenght}个KOL账号要添加') task_done_status = False success_amount = 0 for member_id in member_id_list: # 如果添加成功,那么则返回 list_id 否则返回None added_status = self.add_member_to_list(list_id, member_id) # 添加失败则休息60秒 if added_status: task_done_status = True success_amount += 1 else: print('等待60s') time.sleep(60) if

May 2, 2025 - 12:36
 0
X 开发者账号自动利用土狗Meme KOL账号生成list页

import json
import requests
import time
from utils.redisdb import redis_cli
from retry import retry
from utils.spider_failed_alert import ErrorMonitor

class AddMemberToList(object):
    def __init__(self):
        # 缓存Twittertoken的生成时间,用于判断其是否超过2个小时
        # self.token_time_map = {}
        self.rc = redis_cli()
        # 根据user_id去重,任务去重 (set结构)
        self.filter_key = 'twitter:list:has_added_members'
        # 建立list页的ID和该list页成员数量的映射关系(hash结构)
        self.listID_amount_mapping = 'twitter:list:target_listID:amount'
        # 建立user_name => user_id的映射关系(hash结构)
        self.user_name_to_id_mapping = 'twitter:list:user_name_to_user_id:mapping'

    @retry(tries=10, delay=3)
    def gen_token(self):
        """
        获取X的access token
        首先从内存缓存获取没有过期的token,如果获取不到那么就从接口获取最新token,并且刷新内存缓存信息
        """
        url =  'https://example.com/getAccessToken'
        res = requests.get(url)
        # print('更新token ==>', res.text)
        token_res = json.loads(res.json().get('data'))
        token = token_res.get('access_token')
        print('更新token成功!')
        if not token:
            raise Exception(f'shane接口返回的token为空:{token_res}')
        return token

    @retry(tries=10, delay=60)
    def sent_request(self, method, url, data=None):
        token = self.gen_token()
        if not token:
            raise Exception('无法获取到token')
        """发送请求返回响应"""
        if method == 'POST':
            headers = {
                "Authorization": "Bearer {}".format(token),
                "Content-Type": "application/json"
            }
        else:
            headers = {"Authorization": "Bearer {}".format(token)}
        response = requests.request(method=method, url=url, headers=headers, json=data)
        if response.json().get('status') == 401:
            # 激活重试
            raise Exception(f'token 过期 ,进行更新 {response.text} {headers} {method} {url} {data}')
        # twitter所有接口频率限制为5
        time.sleep(5)
        return response

    def find_user_id(self, user_name):
        """通过用户名查找用户,返回用户id"""
        url = "https://api.twitter.com/2/users/by/username/{username}".format(username=user_name)
        response = self.sent_request('GET', url)
        res_data = response.json()
        print('find_user_id response ===>', res_data)
        if res_data.get('data',{}).get('id') and res_data.get('data',{}).get('username')==user_name:
            return res_data.get('data').get('id')
        else:
            # 很多账号没多久就会销号,需要将这样的账号拉黑
            self.rc.sadd(self.filter_key, user_name)
            print(f'没有找到用户 {user_name} {res_data}')
            return None

    @retry(tries=3, delay=10)
    def create_list(self, name, description):
        """创建list"""
        url = "https://api.twitter.com/2/lists"
        payload = {
            "description": description,
            "name": name, #name: may only be 25 characters long
            "private": False
        }
        response = self.sent_request("POST", url, data=payload)
        res_data = response.json()
        print('create_list response ===> {}'.format(response.text))
        id = res_data.get('data',{}).get('id')
        if id:
            print(f'创建新的列表页成功,ID:{id}')
            # 创建完列表页里面将该ID记录,以防止后续程序崩溃,导致该列表页的ID被丢弃
            self.rc.hset(self.listID_amount_mapping, id, 0)
            # 返回新创建的列表页ID
            return id
        else:
            raise Exception('创建列表页失败')

    def add_member_to_list(self, list_id, user_id):
        """
        添加成员到list(单个添加)
        return: listID or None
        """
        url = "https://api.twitter.com/2/lists/{id}/members".format(id=list_id)
        payload = {"user_id": user_id}
        response = self.sent_request("POST", url, data=payload)
        print('add_member response ===>', response.text)
        # response = self.add_members(list_id, user_id)
        if response.json().get('data',{}).get('is_member'):
            # 加过list页的就去重(set结构)
            self.rc.sadd(self.filter_key, user_id)
            # 更新该list页的成员数量
            total_amount = self.rc.hincrby(self.listID_amount_mapping, list_id, 1)
            print(f'当前list页 {list_id} 成员总数:{total_amount}')
            return True
        else:
            print(f'****{user_id} 用户添加失败 {response.json()}*****')
            return False

    def add_members_to_list(self, list_id, member_id_list):
        """
        批量添加成员到list页
        list_id: list页的ID
        member_id_list:[123456, 890400] user_id 列表清单
        """
        member_id_list_lenght = len(member_id_list)
        print(f'共有{member_id_list_lenght}个KOL账号要添加')
        task_done_status = False
        success_amount = 0
        for member_id in member_id_list:
            # 如果添加成功,那么则返回 list_id 否则返回None
            added_status = self.add_member_to_list(list_id, member_id)
            # 添加失败则休息60秒
            if added_status:
                task_done_status = True
                success_amount += 1
            else:
                print('等待60s')
                time.sleep(60)
        if task_done_status:
            print(f'全部任务 {member_id_list_lenght} 执行完毕,{success_amount} 个账号成功被添加到list页:{list_id}')
        else:
            print(f'任务执行失败,{list_id} list页无法完成任何一个KOL的添加:{member_id_list}')
        return task_done_status

    def split_user_name_list_job(self, user_name_list, threshold, jobs, list_id):
        """
        如果KOL账号清单数量太多,将其切分成多个小任务,使其能够被特定的list页容纳消耗,不至于超过5000的上限
        user_name_list:待切割的KOL user_name列表
        threshold:目标list页,还能够容纳的members上限
        jobs: 各个被切割的小任务的列表
        list_id:目标list页的ID
        """
        # while True:
        # 如果需要添加的KOL账号清单的数量超过了当前list也得能够承载的上限,那么就做截断切割操作
        job = {
            'user_name_list': user_name_list[:threshold],
            'list_id': list_id
        }
        jobs.append(job)
        # 切割过后剩余的user_name列表,让其余的list页来消耗
        user_name_list_new = user_name_list[threshold:]
        return user_name_list_new

    def ditribute_task_to_different_list_page(self, user_name_list):
        """
        根据user_name列表创建list页
        将user_name_list 任务切割分配到不同的list页去完成添加任务
        return: [{
            'user_name_list': user_name_list[:threshold],
            'list_id': list_id
        }]  任务清单 jobs
        """
        jobs = []
        # 判断这批任务user_name_list 是否能被已经存在的list页完全消化
        finished_by_existed_listpage = False
        # 从redis中获取全部的list页信息
        all_list_page_id_info = self.rc.hgetall(self.listID_amount_mapping)
        if all_list_page_id_info:
            for list_id in all_list_page_id_info:
                list_memebers_amount = all_list_page_id_info[list_id]
                # list页成员数量上限是5000
                if list_memebers_amount < 5000:
                    # 还可以添加的数量
                    added_avalible_in_future_amount = 5000 - list_memebers_amount -1
                    # 创建任务
                    if len(user_name_list) <= added_avalible_in_future_amount:
                        # 如果需要添加的KOL账号清单的数量没有超过当前获取到的list页ID的能力上线
                        job = {
                            'user_name_list': user_name_list,
                            'list_id': list_id
                        }
                        jobs.append(job)
                        finished_by_existed_listpage = True
                        # 当前已存在的list页就完全能够容纳这一批的任务量,所以跳出
                        break
                    else:
                        # 如果需要添加的KOL账号清单的数量超过了当前list也得能够承载的上限,那么就做截断切割操作
                        # 切割user_name列表将塞满目标list页的容量上限,然后返回剩余的没法被处理的user_name列表
                        # 返回值会进入下一次循环来处理
                        user_name_list = self.split_user_name_list_job(
                            user_name_list,
                            added_avalible_in_future_amount,
                            jobs,
                            list_id
                        )
        # 当已经存在的list页无法完全消耗当前的 user_name_list 时,新建新的list页
        if not finished_by_existed_listpage:
            while True:
                start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
                # 土狗memeKOL name: may only be 25 characters long
                list_name = f'土狗memeKOL({start_time}'
                list_id = self.create_list(name='土狗memeKOL', description=list_name)
                if len(user_name_list) > 4999:
                    user_name_list = self.split_user_name_list_job(
                        user_name_list,
                        4999,
                        jobs,
                        list_id
                    )
                else:
                    # 如果需要添加的KOL账号清单的数量没有超过当前list页的 能力上线
                    job = {
                        'user_name_list': user_name_list,
                        'list_id': list_id
                    }
                    jobs.append(job)
                    break
        return jobs

    def gen_user_id(self, user_name):
        """
        根据user_name获取user_id
        """
        # 试图从redis查询user_id
        user_id = self.rc.hget(self.user_name_to_id_mapping, user_name)
        # 查询不到,则从官方接口拿
        if not user_id:
            user_id = self.find_user_id(user_name) # 查询username对应的user_id
        if user_id:
            # 缓存映射关系
            self.rc.hset(self.user_name_to_id_mapping, user_name, user_id)
        else:
            print(f'****{user_name} user_id查找失败****')
            return None
        return user_id

    def all_task_scheduler(self, user_name_list):
        """
        执行每一个切割后的任务
        """
        # 获取所有被切割分配过得任务list
        jobs = self.ditribute_task_to_different_list_page(user_name_list)
        print(f'jobs 清单:{jobs}')
        list_id_list = []
        # 遍历每一个任务,让对应的list页添加对应的KOL清单
        for job in jobs:
            user_id_list = []
            user_name_list = job['user_name_list']
            for user_name in user_name_list:
                if self.rc.sismember(self.filter_key, user_name):
                    print(f'****{user_name} 账号已被注销,跳过...****')
                    continue
                user_id = self.gen_user_id(user_name)
                if not user_id:
                    continue
                if self.rc.sismember(self.filter_key, user_id):
                    print(f'****{user_name} 账号已被覆盖监控,无需重复操作****')
                    continue
                user_id_list.append(user_id)
            list_id = job['job']
            # 批量添加KOL清单到指定的list页
            task_done_status = self.add_members_to_list(list_id, user_id_list)
            if task_done_status:
                list_id_list.append(list_id)
        print(f'所有任务执行完,得到最终 list 页的ID清单:{list_id_list}')
        return list_id_list


    def run(self, user_name_list):
        """
        用user_name的清单作为任务输入,自动查询已经创建过的list页且尚未达到5000阈值的,继续添加成员,若达到阈值则创建新的list页来完成目标任务
        直到所有user_name被全部添加完成,返回最终的list ID清单
        """

        # 对历史数据进行处理
        history_ids = [
            "1918225637397143707",
            "1918221677894525275",
            "1918246506534191436"
        ]
        all_list_page_id_info = self.rc.hgetall(self.listID_amount_mapping)
        print(f'all_list_page_id_info: {all_list_page_id_info}')
        for id in history_ids:
            if id not in all_list_page_id_info:
                self.rc.hset(self.listID_amount_mapping, id, 0)
                print(f'帖子ID:{id} 刷入redis')

        # 批量添加user_name 到list页
        self.all_task_scheduler(user_name_list)