首页 > 数据库 >一文读懂分布式爬虫利器Scrapy-Redis:源码解析、队列管理与去重策略

一文读懂分布式爬虫利器Scrapy-Redis:源码解析、队列管理与去重策略

时间:2024-08-12 16:27:37浏览次数:12  
标签:Redis self request spider server Scrapy 源码 key def

分布式利器 Scrapy-Redis 原理

Scrapy-Redis 库已经为我们提供了 Scrapy 分布式的队列、调度器、去重等功能,其 GitHub 地址为: https://github.com/rmax/scrapy-redis。
本节课我们深入掌握利用 Redis 实现 Scrapy 分布式的方法,并深入了解 Scrapy-Redis 的原理。

1. 获取源码

可以把源码克隆下来,执行如下命令:
git clone https://github.com/rmax/scrapy-redis.git
核心源码在 scrapy-redis/src/scrapy_redis 目录下。

2. 爬取队列

我们从爬取队列入手,来看看它的具体实现。源码文件为 queue.py,它包含了三个队列的实现,首先它实现了一个父类 Base,提供一些基本方法和属性,如下所示:

class Base(object): 
    """Per-spider base queue class""" 
    def __init__(self, server, spider, key, serializer=None): 
        if serializer is None: 
            serializer = picklecompat 
        if not hasattr(serializer, 'loads'): 
            raise TypeError("serializer does not implement 'loads' function: % r" 
                            % serializer) 
        if not hasattr(serializer, 'dumps'): 
            raise TypeError("serializer '% s' does not implement 'dumps' function: % r" 
                            % serializer) 
        self.server = server 
        self.spider = spider 
        self.key = key % {'spider': spider.name} 
        self.serializer = serializer 
​ 
    def _encode_request(self, request): 
        obj = request_to_dict(request, self.spider) 
        return self.serializer.dumps(obj) 
​ 
    def _decode_request(self, encoded_request): 
        obj = self.serializer.loads(encoded_request) 
        return request_from_dict(obj, self.spider) 
​ 
    def __len__(self): 
        """Return the length of the queue""" 
        raise NotImplementedError 
​ 
    def push(self, request): 
        """Push a request""" 
        raise NotImplementedError 
​ 
    def pop(self, timeout=0): 
        """Pop a request""" 
        raise NotImplementedError 
​ 
    def clear(self): 
        """Clear queue/stack""" 
        self.server.delete(self.key) 

首先看一下 _encode_request 和 _decode_request 方法,因为我们需要把一个 Request 对象存储到数据库中,但数据库无法直接存储对象,所以需要将 Request 序列转化成字符串再存储。

而这两个方法分别是序列化和反序列化的操作,利用 pickle 库来实现,一般在调用 push 将 Request 存入数据库时会调用 _encode_request 方法进行序列化,在调用 pop 取出 Request 的时候会调用 _decode_request 进行反序列化。

在父类中 len、push 和 pop 方法都是未实现的,会直接抛出 NotImplementedError,因此是不能直接使用这个类的,必须实现一个子类来重写这三个方法,而不同的子类就会有不同的实现,也就有着不同的功能。

接下来我们就需要定义一些子类来继承 Base 类,并重写这几个方法,那在源码中就有三个子类的实现,它们分别是 FifoQueue、PriorityQueue、LifoQueue,我们分别来看下它们的实现原理。

首先是 FifoQueue:

class FifoQueue(Base): 
    """Per-spider FIFO queue""" 
​ 
    def __len__(self): 
        """Return the length of the queue""" 
        return self.server.llen(self.key) 
​ 
    def push(self, request): 
        """Push a request""" 
        self.server.lpush(self.key, self._encode_request(request)) 
​ 
    def pop(self, timeout=0): 
        """Pop a request""" 
        if timeout > 0: 
            data = self.server.brpop(self.key, timeout) 
            if isinstance(data, tuple): 
                data = data[1] 
        else: 
            data = self.server.rpop(self.key) 
        if data: 
            return self._decode_request(data) 

可以看到这个类继承了 Base 类,并重写了 len、push、pop 这三个方法,在这三个方法中都是对 server 对象的操作,而 server 对象就是一个 Redis 连接对象,我们可以直接调用其操作 Redis 的方法对数据库进行操作,可以看到这里的操作方法有 llen、lpush、rpop 等,这就代表此爬取队列是使用的 Redis 的列表。

序列化后的 Request 会被存入列表中,就是列表的其中一个元素,len 方法是获取列表的长度,push 方法中调用了 lpush 操作,这代表从列表左侧存入数据,pop 方法中调用了 rpop 操作,这代表从列表右侧取出数据。

所以 Request 在列表中的存取顺序是左侧进、右侧出,所以这是有序的进出,即先进先出,英文叫作 First Input First Output,也被简称为 FIFO,而此类的名称就叫作 FifoQueue。

另外还有一个与之相反的实现类,叫作 LifoQueue,实现如下

class LifoQueue(Base): 
    """Per-spider LIFO queue.""" 
​ 
    def __len__(self): 
        """Return the length of the stack""" 
        return self.server.llen(self.key) 
​ 
    def push(self, request): 
        """Push a request""" 
        self.server.lpush(self.key, self._encode_request(request)) 
​ 
    def pop(self, timeout=0): 
        """Pop a request""" 
        if timeout > 0: 
            data = self.server.blpop(self.key, timeout) 
            if isinstance(data, tuple): 
                data = data[1] 
        else: 
            data = self.server.lpop(self.key) 
​ 
        if data: 
            return self._decode_request(data) 

与 FifoQueue 不同的就是它的 pop 方法,在这里使用的是 lpop 操作,也就是从左侧出,而 push 方法依然是使用的 lpush 操作,是从左侧入。

那么这样达到的效果就是先进后出、后进先出,英文叫作 Last In First Out,简称为 LIFO,而此类名称就叫作 LifoQueue。同时这个存取方式类似栈的操作,所以其实也可以称作 StackQueue。

另外在源码中还有一个子类实现,叫作 PriorityQueue,顾名思义,它叫作优先级队列,实现如下:

class PriorityQueue(Base): 
    """Per-spider priority queue abstraction using redis' sorted set""" 
​ 
    def __len__(self): 
        """Return the length of the queue""" 
        return self.server.zcard(self.key) 
​ 
    def push(self, request): 
        """Push a request""" 
        data = self._encode_request(request) 
        score = -request.priority 
        self.server.execute_command('ZADD', self.key, score, data) 
​ 
    def pop(self, timeout=0): 
        """ 
        Pop a request 
        timeout not support in this queue class 
        """ 
        pipe = self.server.pipeline() 
        pipe.multi() 
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) 
        results, count = pipe.execute() 
        if results: 
            return self._decode_request(results[0]) 

在这里我们可以看到 len、push、pop 方法中使用了 server 对象的 zcard、zadd、zrange 操作,可以知道这里使用的存储结果是有序集合 Sorted Set,在这个集合中每个元素都可以设置一个分数,那么这个分数就代表优先级。

在 len 方法里调用了 zcard 操作,返回的就是有序集合的大小,也就是爬取队列的长度,在 push 方法中调用了 zadd 操作,就是向集合中添加元素,这里的分数指定成 Request 的优先级的相反数,因为分数低的会排在集合的前面,所以这里高优先级的 Request 就会存在集合的最前面。

pop 方法是首先调用了 zrange 操作取出了集合的第一个元素,因为最高优先级的 Request 会存在集合最前面,所以第一个元素就是最高优先级的 Request,然后再调用 zremrangebyrank 操作将这个元素删除,这样就完成了取出并删除的操作。

此队列是默认使用的队列,也就是爬取队列默认是使用有序集合来存储的。

3.去重过滤

前面说过 Scrapy 的去重是利用集合来实现的,而在 Scrapy 分布式中的去重就需要利用共享的集合,那么这里使用的就是 Redis 中的集合数据结构。我们来看看去重类是怎样实现的,源码文件是 dupefilter.py,其内实现了一个 RFPDupeFilter 类,如下所示:

class RFPDupeFilter(BaseDupeFilter): 
    """Redis-based request duplicates filter. 
    This class can also be used with default Scrapy's scheduler. 
    """ 
    logger = logger 
    def __init__(self, server, key, debug=False): 
        """Initialize the duplicates filter. 
        Parameters 
        ---------- 
        server : redis.StrictRedis 
            The redis server instance. 
        key : str 
            Redis key Where to store fingerprints. 
        debug : bool, optional 
            Whether to log filtered requests. 
        """ 
        self.server = server 
        self.key = key 
        self.debug = debug 
        self.logdupes = True 
​ 
    @classmethod 
    def from_settings(cls, settings): 
        """Returns an instance from given settings. 
        This uses by default the key ``dupefilter:<timestamp>``. When using the 
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as 
        it needs to pass the spider name in the key. 
        Parameters 
        ---------- 
        settings : scrapy.settings.Settings 
        Returns 
        ------- 
        RFPDupeFilter 
            A RFPDupeFilter instance. 
        """ 
        server = get_redis_from_settings(settings) 
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} 
        debug = settings.getbool('DUPEFILTER_DEBUG') 
        return cls(server, key=key, debug=debug) 
​ 
    @classmethod 
    def from_crawler(cls, crawler): 
        """Returns instance from crawler. 
        Parameters 
        ---------- 
        crawler : scrapy.crawler.Crawler 
        Returns 
        ------- 
        RFPDupeFilter 
            Instance of RFPDupeFilter. 
        """ 
        return cls.from_settings(crawler.settings) 
​ 
    def request_seen(self, request): 
        """Returns True if request was already seen. 
        Parameters 
        ---------- 
        request : scrapy.http.Request 
        Returns 
        ------- 
        bool 
        """ 
        fp = self.request_fingerprint(request) 
        added = self.server.sadd(self.key, fp) 
        return added == 0 
​ 
    def request_fingerprint(self, request): 
        """Returns a fingerprint for a given request. 
        Parameters 
        ---------- 
        request : scrapy.http.Request 
​ 
        Returns 
        ------- 
        str 
​ 
        """ 
        return request_fingerprint(request) 
​ 
    def close(self, reason=''): 
        """Delete data on close. Called by Scrapy's scheduler. 
        Parameters 
        ---------- 
        reason : str, optional 
        """ 
        self.clear() 
​ 
    def clear(self): 
        """Clears fingerprints data.""" 
        self.server.delete(self.key) 
​ 
    def log(self, request, spider): 
        """Logs given request. 
        Parameters 
        ---------- 
        request : scrapy.http.Request 
        spider : scrapy.spiders.Spider 
        """ 
        if self.debug: 
            msg = "Filtered duplicate request: %(request) s" 
            self.logger.debug(msg, {'request': request}, extra={'spider': spider}) 
        elif self.logdupes: 
            msg = ("Filtered duplicate request %(request) s" 
                   "- no more duplicates will be shown" 
                   "(see DUPEFILTER_DEBUG to show all duplicates)") 
            self.logger.debug(msg, {'request': request}, extra={'spider': spider}) 
            self.logdupes = False 

这里同样实现了一个 request_seen 方法,和 Scrapy 中的 request_seen 方法实现极其类似。不过这里集合使用的是 server 对象的 sadd 操作,也就是集合不再是一个简单数据结构了,而是直接换成了数据库的存储方式。

鉴别重复的方式还是使用指纹,指纹同样是依靠 request_fingerprint 方法来获取的。获取指纹之后就直接向集合添加指纹,如果添加成功,说明这个指纹原本不存在于集合中,返回值 1。代码中最后的返回结果是判定添加结果是否为 0,如果刚才的返回值为 1,那这个判定结果就是 False,也就是不重复,否则判定为重复。

这样我们就成功利用 Redis 的集合完成了指纹的记录和重复的验证。

4.调度器

Scrapy-Redis 还帮我们实现了配合 Queue、DupeFilter 使用的调度器 Scheduler,源文件名称是 scheduler.py。我们可以指定一些配置,如 SCHEDULER_FLUSH_ON_START 即是否在爬取开始的时候清空爬取队列,SCHEDULER_PERSIST 即是否在爬取结束后保持爬取队列不清除。我们可以在 settings.py 里自由配置,而此调度器很好地实现了对接。

接下来我们看看两个核心的存取方法,实现如下所示:

def enqueue_request(self, request): 
    if not request.dont_filter and self.df.request_seen(request): 
        self.df.log(request, self.spider) 
        return False 
    if self.stats: 
        self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) 
    self.queue.push(request) 
    return True 
​ 
def next_request(self): 
    block_pop_timeout = self.idle_before_close 
    request = self.queue.pop(block_pop_timeout) 
    if request and self.stats: 
        self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider) 
    return request 

enqueue_request 可以向队列中添加 Request,核心操作就是调用 Queue 的 push 操作,还有一些统计和日志操作。next_request 就是从队列中取 Request,核心操作就是调用 Queue 的 pop 操作,此时如果队列中还有 Request,则 Request 会直接取出来,爬取继续,否则如果队列为空,爬取则会重新开始。

5.总结

那么到现在为止我们就把之前所说的三个分布式的问题解决了,总结如下:

  • 爬取队列的实现,在这里提供了三种队列,使用了 队列、栈或Redis的有序集合来维护。

  • 去重的实现,使用了 Redis 的集合来保存 Request 的指纹以提供重复过滤。

  • 中断后重新爬取的实现,中断后 Redis 的队列没有清空,再次启动时调度器的 next_request 会从队列中取到下一个 Request,继续爬取。

更多精致内容

标签:Redis,self,request,spider,server,Scrapy,源码,key,def
From: https://www.cnblogs.com/CodeRealm/p/18355229

相关文章

  • 手把手教你实现Scrapy-Redis分布式爬虫:从配置到最终运行的实战指南
    1.scrapy-redis的环境准备pipinstallscrapy-redis安装完毕之后确保其可以正常导入使用即可。2.实现接下来我们只需要简单的几步操作就可以实现分布式爬虫的配置了。2.1修改Scheduler在前面的课时中我们讲解了Scheduler的概念,它是用来处理Request、Item等对象的调度......
  • Redis 实现简单排行榜功能 | 实战案例
    一、业务场景口算小程序,用户完成口算并获得满分,根据耗时长短进行rank排名,耗时越短,排名越高。主要有以下功能:1.用户数据Mysql与Redis同步:使用一个redishash用来保存用户基本信息,field为userId,value为用户基础数据(本案例为昵称);用户修改昵称时,同步更新hash中对应userId的nickn......
  • Java Reentrantlock可重入锁原理 | 源码探究
    一、基本概念ReentrantLock是Java中提供的一个可重入互斥锁,它是java.util.concurrent.locks包中的一个接口Lock的实现类。ReentrantLock提供了比使用synchronized关键字更强大的锁定机制,例如 公平锁 和 非公平锁 选择、尝试锁定、可中断锁定等。ReentrantLock......
  • Python Redis Stream【生产者=》消费者模式】
    1importredis2importtime3fromtypingimportDict,List,Tuple,Any,Optional45fromconfig.modelimportsettings6frompydanticimportBaseModel789classStreamMessage(BaseModel):10message_id:str11message_da......
  • JSP广州中小学学校信息管理系统_j3o8r(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表系统功能:用户,区域分类,招生信息,机构活动,成绩排名,获奖排名,社团排名,特色排名开题报告内容一、项目背景与意义随着教育信息化步伐的加快,广州作为教育强市,对......
  • SpringSecurity+前端项目+redis完成认证授权的代码
    1.前端准备工作--都在全局main.js页面中设置的1.1.创建Vue工程后,并导入elementui和axios,添加连接后端项目的路径,把axios挂载到Vue1.2.前置路由守卫(所有路由跳转前核实一下身份)//前置路由守卫--所有的路由跳转都先经过这里//to:即将要访问的路径from:从哪里来......
  • Centos7下安装redis
    一、安装redis第一步:下载redis安装包wgethttp://download.redis.io/releases/redis-4.0.6.tar.gz[root@iZwz991stxdwj560bfmadtZlocal]#wgethttp://download.redis.io/releases/redis-4.0.6.tar.gz--2017-12-1312:35:12--http://download.redis.io/releases/redis-4......
  • Springboot计算机毕业设计广金考研助力系统(程序+源码+数据库)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表学生,学长,学校百科,考研全知道,学长学姐说,身份认证,分类,寻研友,每日打卡开题报告内容一、研究背景与意义1.1研究背景随着社会的快速发展和高等教育的普及......
  • Springboot计算机毕业设计古典文学阅读网站(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表用户,作品信息,作品类型,心得分享开题报告内容一、研究背景与意义1.研究背景随着互联网技术的迅猛发展,人们获取信息的方式发生了巨大变化。传统的纸质书籍逐......
  • 抖音矩阵系统源码搭建,矩阵系统贴牌,矩阵工具开源
    在当今的社交媒体时代,抖音的影响力日益增强。对于许多开发者和企业来说,搭建一个抖音矩阵系统源码具有重要的战略意义。本文将为您详细介绍抖音矩阵系统源码搭建的全过程。今天,抖去推矩阵系统通过为商家提供矩阵管理、内容创作、视频生产、数据统计、等一站式SaaS解决方案。解......