Scrapy2.12.0通过extension来实现定时调度

import time import logging from datetime import datetime from croniter import croniter from scrapy import signals from scrapy.exceptions import NotConfigured from scrapy.exceptions import DontCloseSpider from utils.redisdb import redis_cli from config import env logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger('Extension') logger.info(f'extensions env: {env}') # 原始配置参考https://scrapy-chs.readthedocs.io/zh_CN/0.24/topics/extensions.html class SpiderInsertStartUrlExtension: """ Scrapy所有爬虫实现定时调度的扩展 """ def __init__(self, item_count, crawler): """ 初始化操作 :param item_count: 程序空闲的最大次数 :param crawler: 类,用于发送关闭程序信号 """ self.crawler = crawler self.count = 0 # 统计空闲次数 self.conn = redis_cli() @classmethod def from_crawler(cls, crawler): """ 必须方法 """ # 判断是否启用扩展 if not crawler.settings.getbool('MYEXT_ENABLED'): raise NotConfigured # 每隔5个item 输出当前采集的数量 item_count = crawler.settings.getint('MYEXT_ITEMCOUNT', 5) ext = cls(item_count, crawler) crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened) crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed) crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle) # 加载空闲信号 return ext # def cron_judgement(self, spider): # """ # 定时调度,比如在每日的8:00整调度,支持在spider中定义crontab的语法给变量 # 比如;每日0点1分启动:cron_job = "1 0 * * *" # 比如:每天的 8:30、10:30、18:30、20:30 和 22:30 触发任务 cron_job = "30 8,10,18,20,22 * * *" # :return: True or False True则执行 # """ # # crontab的语法规则,该语法只约束启动时间,具体下次调度还得结合 schedule_time # # 如果需要定时调度,则执行if语句 # cron_job = hasattr(spider,'cron_job') # if cron_job: # cron_job = spider.cron_job.split(' ') # minute = cron_job[0] # hour = cron_job[1] # day = cron_job[2] # month = cron_job[3] # week = cron_job[4] # now_minute = str(datetime.now().minute) # now_hour = str(datetime.now().hour) # now_day = str(datetime.now().day) # now_month = str(datetime.now().month) # now_week = str(datetime.now().weekday() + 1) # if minute == '*': # minute = now_minute # if hour == '*': # hour = now_hour # if day == '*': # day = now_day # if month == '*': # month = now_month # if week == '*': # week = now_week # if (minute==now_minute)and(hour==now_hour)and(day==now_day)and(month==now_month)and(week==now_week): # # 当cronjob定义的最小小单位为分钟级别时;爬虫可能在5秒跑完,那么1分钟内爬虫将有12次满足条件的情况,爬虫将被重复拉起12次 # # 为避免次情况的发生,通过redis key 在短时间内做去重 # if not self.conn.get(f"{spider.name}:spider_opened"): # self.conn.setex(f"{spider.name}:spider_opened",5*60,1) # return True # else: # logger.info(f'{cron_job} 已经开始执行,由于爬虫速度过快,导致条件再次满足,故做去重处理') # return False # else: # logger.info(f'等待开始调度的时间 {cron_job}...') # return False # else: # return False def cron_judgement(self, spider): """ 定时调度,比如在每日的8:00整调度,支持在spider中定义crontab的语法给变量 比如:每日0点1分启动:cron_job = "1 0 * * *" 比如:每天的 8:30、10:30、18:30、20:30 和 22:30 触发任务 cron_job = "30 8,10,18,20,22 * * *" :return: True or False True则执行 目前最细颗粒度只支持分钟级别的 """ # crontab的语法规则,该语法只约束启动时间 # 不存在该类属性说明不需要cron定时调度 cron_job_exist = hasattr(spider,'cron_job') if not cron_job_exist: return # 拿到crontab的语法字符串 cron_expr = spider.cron_job base_time = datetime.now() # 解析成为datatime对象 cron = croniter(cron_expr, base_time) # 校验当前时间是否匹配上了定时的时间规则 now = datetime.now() # 获取最近的执行时间 # (当时间刚好到达预期的调度时间的时候, # previous_run值将会刷新为当前的调度时间; # 如果没有到达目标的执行时间的时候, # previous_run始终得到的是上一次调度的时间) # base_time 刚刚到达目标调度时间的时候,那么previous_run也会刷新为该时间,此时时间上吻合,即可激活调度 previous_run = cron.get_prev(datetime) # 只能处理分钟级别的精确度(忽略秒和微秒) if now.replace(second=0, microsecond=0) == previous_run.replace(second=0, microsecond=0): # 当cronjob定义的最小小单位为分钟级别时;爬虫可能在5秒跑完,那么1分钟内爬虫将有12次满足条件的情况,爬虫将被重复拉起12次 # 为避免次情况的发生,通过redis key 在短时间内做去重 if not self.conn.get(f"{spider.name}:spider_opened"): self.conn.setex(f"{spider.name}:spider_opened",5*60,1) return True else: logger.info(f'{cron_expr} 已经开始执行,由于爬虫速度过快,导致条件再次满足,故做去重处理') return False else: lo

Mar 20, 2025 - 04:05
 0
Scrapy2.12.0通过extension来实现定时调度
import time
import logging
from datetime import datetime
from croniter import croniter
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.exceptions import DontCloseSpider
from utils.redisdb import redis_cli
from config import env

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Extension')
logger.info(f'extensions env: {env}')

# 原始配置参考https://scrapy-chs.readthedocs.io/zh_CN/0.24/topics/extensions.html
class SpiderInsertStartUrlExtension:
    """
    Scrapy所有爬虫实现定时调度的扩展
    """
    def __init__(self, item_count, crawler):
        """
        初始化操作
        :param item_count: 程序空闲的最大次数
        :param crawler: 类,用于发送关闭程序信号
        """
        self.crawler = crawler
        self.count = 0      # 统计空闲次数
        self.conn = redis_cli()

    @classmethod
    def from_crawler(cls, crawler):
        """
        必须方法
        """
        # 判断是否启用扩展
        if not crawler.settings.getbool('MYEXT_ENABLED'):
            raise NotConfigured
        # 每隔5个item 输出当前采集的数量
        item_count = crawler.settings.getint('MYEXT_ITEMCOUNT', 5)
        ext = cls(item_count, crawler)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)  # 加载空闲信号
        return ext

    # def cron_judgement(self, spider):
    #     """
    #     定时调度,比如在每日的8:00整调度,支持在spider中定义crontab的语法给变量
    #     比如;每日0点1分启动:cron_job = "1 0 * * *"
    #     比如:每天的 8:30、10:30、18:30、20:30 和 22:30 触发任务 cron_job = "30 8,10,18,20,22 * * *"
    #     :return: True or False True则执行
    #     """
    #     # crontab的语法规则,该语法只约束启动时间,具体下次调度还得结合 schedule_time
    #     # 如果需要定时调度,则执行if语句
    #     cron_job = hasattr(spider,'cron_job')
    #     if cron_job:
    #         cron_job = spider.cron_job.split(' ')
    #         minute = cron_job[0]
    #         hour = cron_job[1]
    #         day = cron_job[2]
    #         month = cron_job[3]
    #         week = cron_job[4]
    #         now_minute = str(datetime.now().minute)
    #         now_hour = str(datetime.now().hour)
    #         now_day = str(datetime.now().day)
    #         now_month = str(datetime.now().month)
    #         now_week = str(datetime.now().weekday() + 1)
    #         if minute == '*':
    #             minute = now_minute
    #         if hour == '*':
    #             hour = now_hour
    #         if day == '*':
    #             day = now_day
    #         if month == '*':
    #             month = now_month
    #         if week == '*':
    #             week = now_week
    #         if (minute==now_minute)and(hour==now_hour)and(day==now_day)and(month==now_month)and(week==now_week):
    #             # 当cronjob定义的最小小单位为分钟级别时;爬虫可能在5秒跑完,那么1分钟内爬虫将有12次满足条件的情况,爬虫将被重复拉起12次
    #             # 为避免次情况的发生,通过redis key 在短时间内做去重
    #             if not self.conn.get(f"{spider.name}:spider_opened"):
    #                 self.conn.setex(f"{spider.name}:spider_opened",5*60,1)
    #                 return True
    #             else:
    #                 logger.info(f'{cron_job} 已经开始执行,由于爬虫速度过快,导致条件再次满足,故做去重处理')
    #                 return False
    #         else:
    #             logger.info(f'等待开始调度的时间 {cron_job}...')
    #             return False
    #     else:
    #         return False

    def cron_judgement(self, spider):
        """
        定时调度,比如在每日的8:00整调度,支持在spider中定义crontab的语法给变量
        比如:每日0点1分启动:cron_job = "1 0 * * *"
        比如:每天的 8:30、10:30、18:30、20:30 和 22:30 触发任务 cron_job = "30 8,10,18,20,22 * * *"
        :return: True or False True则执行
        目前最细颗粒度只支持分钟级别的
        """
        # crontab的语法规则,该语法只约束启动时间
        # 不存在该类属性说明不需要cron定时调度
        cron_job_exist = hasattr(spider,'cron_job')
        if not cron_job_exist:
            return
        # 拿到crontab的语法字符串
        cron_expr = spider.cron_job
        base_time = datetime.now()
        # 解析成为datatime对象
        cron = croniter(cron_expr, base_time)
        # 校验当前时间是否匹配上了定时的时间规则
        now = datetime.now()
        # 获取最近的执行时间
        # (当时间刚好到达预期的调度时间的时候,
        # previous_run值将会刷新为当前的调度时间;
        # 如果没有到达目标的执行时间的时候,
        # previous_run始终得到的是上一次调度的时间)
        # base_time 刚刚到达目标调度时间的时候,那么previous_run也会刷新为该时间,此时时间上吻合,即可激活调度
        previous_run = cron.get_prev(datetime)
        # 只能处理分钟级别的精确度(忽略秒和微秒)
        if now.replace(second=0, microsecond=0) == previous_run.replace(second=0, microsecond=0):
            # 当cronjob定义的最小小单位为分钟级别时;爬虫可能在5秒跑完,那么1分钟内爬虫将有12次满足条件的情况,爬虫将被重复拉起12次
            # 为避免次情况的发生,通过redis key 在短时间内做去重
            if not self.conn.get(f"{spider.name}:spider_opened"):
                self.conn.setex(f"{spider.name}:spider_opened",5*60,1)
                return True
            else:
                logger.info(f'{cron_expr} 已经开始执行,由于爬虫速度过快,导致条件再次满足,故做去重处理')
                return False
        else:
            logger.info(f'等待开始调度的时间 {cron_expr} 上一次调度:{previous_run}...')
            return False

    def interval_time(self,spider):
        """
        根据间隔时间调度 比如每30分钟调度一次: schedule_time = 12 * 30
        :return: True or False 真则执行
        """
        # 存在cron调度则不需要此间隔调度
        cron_job = hasattr(spider,'cron_job')
        schedule_time = hasattr(spider,'schedule_time')

        if cron_job:
            return False
        if not schedule_time:
            return False
        # 空闲超过指定时间
        if self.count > spider.schedule_time:
            # 每次开始执行任务后,都初始化计数器,重新统计下一轮的空闲时长
            self.count = 0
            return True
        else:
            return False

    def spider_opened(self, spider):
        """
        必须方法
        只执行一次,爬虫启动的时候执行
        支持指定时间启动
        """
        # 爬虫首次判断是否需要定时调度
        # 判断是否需要定时调度
        # 如果需要定时调度,则执行if语句
        cron_job = hasattr(spider,'cron_job')
        logger.info(f'环境: {env}')
        logger.info(f'cronjob: {cron_job}')
        # 线上环境才执行cron coinsadjust
        if env == 'local':
            if cron_job:
                while True:
                    run = self.cron_judgement(spider)
                    if run:
                        break
                    else:
                        time.sleep(59)
        logger.info("opened spider %s" % spider.name)
        # 该变量为了让程序idle的时候输出上次调度的时间,以此判断上次调度是否正常
        self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def insert_start_url(self, spider):
        """
        从start_requests类方法中 拿到爬虫任务,启动每一个任务
        """
        logger.info('从 start_request 类方法 直接生成任务...')
        for request in spider.start_requests():
            self.crawler.engine.crawl(request)

    def spider_closed(self, spider):
        """
        必须方法
        """
        logger.info("closed spider %s" % spider.name)
        self.crawler.engine.close_spider(spider, '爬虫已经采集完毕')

    def spider_idle(self, spider):
        """
        记录信息,作出关闭选择
        框架默认5秒执行一次spider_idle
        """
        logger.info(f'{spider.name} Idle 爬虫上次启动时间为 {self.started_time}')
        self.count += 1
        self.spider_run(spider)
        raise DontCloseSpider

    def spider_run(self,spider):
        """
        激活调度爬虫
        需要根据条件判断是否激活
        """
        # 常规来讲,调度方式二选一
        cron = self.cron_judgement(spider)
        interval = self.interval_time(spider)
        if cron or interval:
            # 启动爬虫
            self.insert_start_url(spider)
            # 启动时间计入redis  方便分布式协调任务
            started_time_stamp = datetime.strptime(self.started_time, "%Y-%m-%d %H:%M:%S").timestamp()
            self.conn.set(f'{spider.name}:starttime', started_time_stamp)
            self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")