import json
import requests
import time
from utils.redisdb import redis_cli
from retry import retry
from utils.spider_failed_alert import ErrorMonitor
class AddMemberToList(object):
    """Add Twitter/X users (by user_name) to Twitter lists, spreading them over several lists.

    A list is treated as holding at most ``LIST_MEMBER_LIMIT`` members, so the
    incoming user names are split into jobs, each targeting one list with enough
    free capacity; new lists are created when the existing ones cannot absorb
    the whole batch. State is kept in Redis:

    * ``filter_key`` (set): user_ids already added to a list, plus user_names
      whose lookup failed (treated as deleted accounts).
    * ``listID_amount_mapping`` (hash): list_id -> current member count.
    * ``user_name_to_id_mapping`` (hash): user_name -> user_id cache.
    """

    # Member cap of a single Twitter list, as assumed throughout this class.
    LIST_MEMBER_LIMIT = 5000
    # Seconds to wait for an HTTP response; on timeout the @retry decorators retry.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        # Was meant to cache the token creation time to detect tokens older than 2 hours (unused).
        # self.token_time_map = {}
        # NOTE(review): assumes redis_cli() returns a redis client, presumably with
        # decode_responses=True (str keys/values) — TODO confirm.
        self.rc = redis_cli()
        # De-duplication set: user_ids already added, and user_names that could not be resolved.
        self.filter_key = 'twitter:list:has_added_members'
        # Hash mapping list ID -> number of members in that list.
        self.listID_amount_mapping = 'twitter:list:target_listID:amount'
        # Hash mapping user_name -> user_id.
        self.user_name_to_id_mapping = 'twitter:list:user_name_to_user_id:mapping'

    @retry(tries=10, delay=3)
    def gen_token(self):
        """Fetch a fresh X (Twitter) access token from the token service.

        A new token is requested on every call; nothing is cached.

        Returns:
            The ``access_token`` string.

        Raises:
            Exception: if the service returns an empty token (retried by ``@retry``).
        """
        url = 'https://example.com/getAccessToken'
        res = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        # The service returns the token payload as a JSON string inside "data".
        token_res = json.loads(res.json().get('data'))
        token = token_res.get('access_token')
        if not token:
            raise Exception(f'shane接口返回的token为空:{token_res}')
        # Only report success once we know the token is usable.
        print('更新token成功!')
        return token

    @retry(tries=10, delay=60)
    def sent_request(self, method, url, data=None):
        """Send an authenticated request to the Twitter API and return the response.

        Args:
            method: HTTP method, e.g. 'GET' or 'POST'.
            url: Full endpoint URL.
            data: Optional body, sent as JSON.

        Returns:
            The ``requests.Response`` object.

        Raises:
            Exception: when no token is available or the API reports status 401;
            ``@retry`` then re-runs the call, which fetches a new token.
        """
        token = self.gen_token()
        if not token:
            raise Exception('无法获取到token')
        headers = {"Authorization": "Bearer {}".format(token)}
        if method == 'POST':
            headers["Content-Type"] = "application/json"
        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            json=data,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.json().get('status') == 401:
            # Raise to trigger a retry with a fresh token. The headers are deliberately
            # left out of the message because they contain the bearer token.
            raise Exception(f'token 过期 ,进行更新 {response.text} {method} {url} {data}')
        # Throttle: the original author notes every Twitter endpoint is rate limited.
        time.sleep(5)
        return response

    def find_user_id(self, user_name):
        """Look up a user by user_name and return its user_id.

        If the lookup fails, the user_name is added to ``filter_key`` so that it
        is skipped in later runs (accounts are often deleted shortly after creation).

        Returns:
            The user_id string, or None if the user was not found.
        """
        url = "https://api.twitter.com/2/users/by/username/{username}".format(username=user_name)
        response = self.sent_request('GET', url)
        res_data = response.json()
        print('find_user_id response ===>', res_data)
        user = res_data.get('data') or {}
        user_id = user.get('id')
        returned_name = user.get('username') or ''
        # Twitter handles are case-insensitive and the API returns the canonical casing,
        # so compare without regard to case.
        if user_id and returned_name.lower() == user_name.lower():
            return user_id
        self.rc.sadd(self.filter_key, user_name)
        print(f'没有找到用户 {user_name} {res_data}')
        return None

    @retry(tries=3, delay=10)
    def create_list(self, name, description):
        """Create a public Twitter list and register it in Redis with 0 members.

        Args:
            name: List name (the API allows at most 25 characters).
            description: List description.

        Returns:
            The new list ID.

        Raises:
            Exception: if the response has no list ID (retried by ``@retry``).
        """
        url = "https://api.twitter.com/2/lists"
        payload = {
            "description": description,
            "name": name,  # name: may only be 25 characters long
            "private": False,
        }
        response = self.sent_request("POST", url, data=payload)
        res_data = response.json()
        print('create_list response ===> {}'.format(response.text))
        list_id = res_data.get('data', {}).get('id')
        if not list_id:
            raise Exception('创建列表页失败')
        print(f'创建新的列表页成功,ID:{list_id}')
        # Record the ID immediately so it is not lost if the program crashes later.
        self.rc.hset(self.listID_amount_mapping, list_id, 0)
        return list_id

    def add_member_to_list(self, list_id, user_id):
        """Add a single user to a list.

        On success, the user_id is added to ``filter_key`` and the list's member
        count in Redis is incremented.

        Returns:
            True if the API reports the user as a member, otherwise False.
        """
        url = "https://api.twitter.com/2/lists/{id}/members".format(id=list_id)
        payload = {"user_id": user_id}
        response = self.sent_request("POST", url, data=payload)
        print('add_member response ===>', response.text)
        res_data = response.json()
        if res_data.get('data', {}).get('is_member'):
            self.rc.sadd(self.filter_key, user_id)
            total_amount = self.rc.hincrby(self.listID_amount_mapping, list_id, 1)
            print(f'当前list页 {list_id} 成员总数:{total_amount}')
            return True
        print(f'****{user_id} 用户添加失败 {res_data}*****')
        return False

    def add_members_to_list(self, list_id, member_id_list):
        """Add users to a list one at a time, pausing 60s after each failure.

        Args:
            list_id: Target list ID.
            member_id_list: user_ids to add, e.g. [123456, 890400].

        Returns:
            True if at least one user was added, otherwise False.
        """
        member_id_list_lenght = len(member_id_list)
        print(f'共有{member_id_list_lenght}个KOL账号要添加')
        success_amount = 0
        for member_id in member_id_list:
            if self.add_member_to_list(list_id, member_id):
                success_amount += 1
            else:
                print('等待60s')
                time.sleep(60)
        task_done_status = success_amount > 0
        if task_done_status:
            print(f'全部任务 {member_id_list_lenght} 执行完毕,{success_amount} 个账号成功被添加到list页:{list_id}')
        else:
            print(f'任务执行失败,{list_id} list页无法完成任何一个KOL的添加:{member_id_list}')
        return task_done_status

    def split_user_name_list_job(self, user_name_list, threshold, jobs, list_id):
        """Assign the first ``threshold`` user names to ``list_id`` and return the rest.

        Args:
            user_name_list: User names still waiting for a list.
            threshold: How many more members the target list can take.
            jobs: Job list; a job dict for ``list_id`` is appended to it in place.
            list_id: Target list ID.

        Returns:
            The user names that did not fit, to be handled by other lists.
        """
        jobs.append({
            'user_name_list': user_name_list[:threshold],
            'list_id': list_id,
        })
        return user_name_list[threshold:]

    def ditribute_task_to_different_list_page(self, user_name_list):
        """Split user names into jobs, each targeting one list.

        Lists already recorded in Redis are filled first. New lists are then
        created for whatever does not fit.

        Returns:
            A list of ``{'user_name_list': [...], 'list_id': ...}`` jobs.
            The result is empty if ``user_name_list`` is empty.
        """
        jobs = []
        if not user_name_list:
            # Nothing to distribute; avoid creating an empty Twitter list.
            return jobs
        # Keep one slot in reserve in every list (the original margin: 5000 - n - 1 / 4999).
        max_new_members = self.LIST_MEMBER_LIMIT - 1
        finished_by_existed_listpage = False
        all_list_page_id_info = self.rc.hgetall(self.listID_amount_mapping)
        for list_id, raw_amount in all_list_page_id_info.items():
            # Redis hash values come back as str/bytes, not int.
            list_members_amount = int(raw_amount)
            available_amount = max_new_members - list_members_amount
            if available_amount <= 0:
                # This list is full; a zero-size job would be useless.
                continue
            if len(user_name_list) <= available_amount:
                # This list can absorb everything that is left.
                jobs.append({
                    'user_name_list': user_name_list,
                    'list_id': list_id,
                })
                finished_by_existed_listpage = True
                break
            # Fill this list to capacity and carry the remainder on to the next list.
            user_name_list = self.split_user_name_list_job(
                user_name_list,
                available_amount,
                jobs,
                list_id,
            )
        if not finished_by_existed_listpage:
            # Existing lists were not enough: create new lists until everything is assigned.
            while True:
                start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
                # List names may be at most 25 characters, so the timestamp goes into the description.
                list_name = f'土狗memeKOL({start_time})'
                list_id = self.create_list(name='土狗memeKOL', description=list_name)
                if len(user_name_list) > max_new_members:
                    user_name_list = self.split_user_name_list_job(
                        user_name_list,
                        max_new_members,
                        jobs,
                        list_id,
                    )
                else:
                    jobs.append({
                        'user_name_list': user_name_list,
                        'list_id': list_id,
                    })
                    break
        return jobs

    def gen_user_id(self, user_name):
        """Resolve a user_name to a user_id.

        The Redis cache is checked first. On a miss, the Twitter API is queried
        and the result is cached.

        Returns:
            The user_id, or None if the user could not be found.
        """
        user_id = self.rc.hget(self.user_name_to_id_mapping, user_name)
        if user_id:
            return user_id
        user_id = self.find_user_id(user_name)
        if not user_id:
            print(f'****{user_name} user_id查找失败****')
            return None
        self.rc.hset(self.user_name_to_id_mapping, user_name, user_id)
        return user_id

    def all_task_scheduler(self, user_name_list):
        """Run every job: resolve the user names and add them to the job's list.

        User names or user_ids already in ``filter_key`` are skipped.

        Returns:
            IDs of the lists to which at least one user was added.
        """
        jobs = self.ditribute_task_to_different_list_page(user_name_list)
        print(f'jobs 清单:{jobs}')
        list_id_list = []
        for job in jobs:
            user_id_list = []
            for user_name in job['user_name_list']:
                if self.rc.sismember(self.filter_key, user_name):
                    print(f'****{user_name} 账号已被注销,跳过...****')
                    continue
                user_id = self.gen_user_id(user_name)
                if not user_id:
                    continue
                if self.rc.sismember(self.filter_key, user_id):
                    print(f'****{user_name} 账号已被覆盖监控,无需重复操作****')
                    continue
                user_id_list.append(user_id)
            # Jobs store the target list under 'list_id' (see ditribute_task_to_different_list_page).
            list_id = job['list_id']
            if self.add_members_to_list(list_id, user_id_list):
                list_id_list.append(list_id)
        print(f'所有任务执行完,得到最终 list 页的ID清单:{list_id_list}')
        return list_id_list

    def run(self, user_name_list):
        """Add all given user names to lists.

        Lists already recorded in Redis that are below the cap are reused; new
        lists are created when needed.

        Returns:
            IDs of the lists to which at least one user was added.
        """
        # Seed Redis with lists that were created before tracking existed.
        history_ids = [
            "1918225637397143707",
            "1918221677894525275",
            "1918246506534191436",
        ]
        all_list_page_id_info = self.rc.hgetall(self.listID_amount_mapping)
        print(f'all_list_page_id_info: {all_list_page_id_info}')
        for history_id in history_ids:
            if history_id not in all_list_page_id_info:
                self.rc.hset(self.listID_amount_mapping, history_id, 0)
                print(f'帖子ID:{history_id} 刷入redis')
        return self.all_task_scheduler(user_name_list)