A stripped-down custom Scrapy-like framework:
from twisted.internet import reactor    # event loop (terminates once every socket has been removed)
from twisted.web.client import getPage  # socket object (removed from the event loop automatically once the download finishes)
from twisted.internet import defer      # defer.Deferred: a special "socket" (sends no request; removed from the loop manually)


class Request(object):

    def __init__(self, url, callback):
        self.url = url
        self.callback = callback


# custom response
class HttpResponse(object):

    def __init__(self, content, request):
        self.content = content
        self.request = request
        self.url = request.url
        self.text = str(content, encoding='utf-8')


class ChoutiSpider(object):
    name = 'chouti'

    def start_requests(self):
        start_url = ['http://www.baidu.com', 'http://www.bing.com']
        for url in start_url:
            yield Request(url, self.parse)

    def parse(self, response):
        print(response)  # response is the downloaded page
        # yield Request('http://www.cnblogs.com')
        # 1. remove the request from crawling
        # 2. collect the values yielded by parse
        # 3. fetch from the queue again
        yield Request(url='http://www.cnblogs.com', callback=self.parse)


# queue
import queue
Q = queue.Queue()


class Engine(object):

    def __init__(self):
        self._close = None
        self.max = 5        # maximum concurrency
        self.crawling = []  # requests that have been sent but not yet answered

    # wrapper around the user-defined parse callback
    def get_response_callback(self, content, request):
        """
        Call the parse method defined in the user's spider and enqueue any new requests it yields.
        :param content:
        :param request:
        :return:
        """
        self.crawling.remove(request)          # download finished, drop the request
        resp = HttpResponse(content, request)  # wrap content and request in a custom response
        result = request.callback(resp)        # run the callback (parse)
        print(result)
        import types
        if isinstance(result, types.GeneratorType):  # did the callback return a generator?
            for req in result:
                Q.put(req)  # put the requests yielded by parse into the queue

    def _next_request(self):
        """
        Take a Request object from the queue and send it.
        :return:
        """
        # terminate once both the queue and the in-flight list are empty
        if Q.qsize() == 0 and len(self.crawling) == 0:
            self._close.callback(None)  # fire the defer.Deferred() to terminate
            return

        if len(self.crawling) >= self.max:    # at capacity
            return
        while len(self.crawling) < self.max:  # below capacity: keep pulling from the queue
            try:
                req = Q.get(block=False)  # block=False: don't wait, raise immediately if the queue is empty
                self.crawling.append(req)
                d = getPage(req.url.encode('utf-8'))  # create the socket object
                # once the page is downloaded, call get_response_callback
                d.addCallback(self.get_response_callback, req)
                # d.addCallback(self._next_request)  # pull the next request and send it
                d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # same effect as the line above
            except Exception as e:
                return

    @defer.inlineCallbacks
    def crawl(self, spider):
        # put the initial Request objects into the scheduler
        start_requests = iter(spider.start_requests())
        while True:
            try:
                request = next(start_requests)
                Q.put(request)  # enqueue the request
            except StopIteration as e:
                break  # exhausted, leave the loop

        # keep pulling tasks from the scheduler and downloading them
        # self._next_request()
        reactor.callLater(0, self._next_request)  # same effect, but invoked by the event loop

        # hang the event loop
        self._close = defer.Deferred()
        yield self._close


_active = set()
engine = Engine()

spider = ChoutiSpider()
d = engine.crawl(spider)
_active.add(d)

dd = defer.DeferredList(_active)
dd.addBoth(lambda a: reactor.stop())

reactor.run()
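The shutdown logic above hinges on a single trick: the @defer.inlineCallbacks generator yields a bare defer.Deferred(), which keeps its outer Deferred pending until someone fires .callback(None). A minimal standalone sketch of just that pattern (the hold_open name and the 2-second timer are illustrative, not part of the code above):

from twisted.internet import defer, reactor


@defer.inlineCallbacks
def hold_open():
    d = defer.Deferred()                     # never fires on its own
    reactor.callLater(2, d.callback, None)   # someone has to fire it manually
    yield d                                  # the generator (and its outer Deferred) waits here
    print('released')


task = hold_open()
task.addBoth(lambda _: reactor.stop())  # once the inner Deferred fires, stop the reactor
reactor.run()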
A small custom Scrapy-like framework (split into engine.py and chouti.py):
engine.py:

from twisted.internet import reactor    # event loop (terminates once every socket has been removed)
from twisted.web.client import getPage  # socket object (removed from the event loop automatically once the download finishes)
from twisted.internet import defer      # defer.Deferred: a special "socket" (sends no request; removed manually)
from queue import Queue


class Request(object):
    """
    Wraps the information about a user request.
    """
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback


class HttpResponse(object):

    def __init__(self, content, request):
        self.content = content
        self.request = request


class Scheduler(object):
    """
    Task scheduler.
    """
    def __init__(self):
        self.q = Queue()

    def open(self):
        pass

    def next_request(self):  # take a request from the queue
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None
        return req

    def enqueue_request(self, req):  # put a request into the queue
        self.q.put(req)

    def size(self):  # number of requests currently queued
        return self.q.qsize()


class ExecutionEngine(object):
    """
    Engine: drives all the scheduling.
    """
    def __init__(self):
        self._close = None
        self.scheduler = None
        self.max = 5
        self.crawlling = []

    def get_response_callback(self, content, request):  # handle a finished request
        self.crawlling.remove(request)
        response = HttpResponse(content, request)
        result = request.callback(response)
        import types
        if isinstance(result, types.GeneratorType):
            for req in result:
                self.scheduler.enqueue_request(req)

    def _next_request(self):
        if self.scheduler.size() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return

        while len(self.crawlling) < self.max:
            req = self.scheduler.next_request()  # 6.1 the engine asks the scheduler for a request
            if not req:
                return
            self.crawlling.append(req)  # 6.2 track in-flight requests to cap the concurrency
            d = getPage(req.url.encode('utf-8'))
            d.addCallback(self.get_response_callback, req)  # 6.3 run the callback once the response arrives
            d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # 6.4 keep the request loop going

    @defer.inlineCallbacks
    def open_spider(self, start_requests):
        self.scheduler = Scheduler()
        yield self.scheduler.open()
        while True:
            try:
                req = next(start_requests)
            except StopIteration as e:
                break
            self.scheduler.enqueue_request(req)  # 5.1 the engine hands the request to the scheduler's queue
        reactor.callLater(0, self._next_request)  # 5.2 ask the scheduler for the next request

    @defer.inlineCallbacks
    def start(self):
        self._close = defer.Deferred()  # create a Deferred to hang the event loop
        yield self._close


class Crawler(object):
    """
    Wraps the engine and the spider for the user.
    """
    def _create_engine(self):  # 4.1
        return ExecutionEngine()

    def _create_spider(self, spider_cls_path):  # 4.2
        """
        :param spider_cls_path: spider.chouti.ChoutiSpider
        :return:
        """
        module_path, cls_name = spider_cls_path.rsplit('.', maxsplit=1)
        import importlib
        m = importlib.import_module(module_path)
        cls = getattr(m, cls_name)
        return cls()

    @defer.inlineCallbacks
    def crawl(self, spider_cls_path):
        engine = self._create_engine()                  # 4.1
        spider = self._create_spider(spider_cls_path)   # 4.2
        start_requests = iter(spider.start_requests())  # 4.3
        yield engine.open_spider(start_requests)  # hand the start requests to the scheduler and begin downloading
        yield engine.start()                      # create defer.Deferred() and hang on it


class CrawlerProcess(object):
    """
    Starts the event loop.
    """
    def __init__(self):
        self._active = set()

    def crawl(self, spider_cls_path):
        """
        :param spider_cls_path:
        :return:
        """
        crawler = Crawler()
        d = crawler.crawl(spider_cls_path)  # 3.1 create the engine and the spider, then start requesting
        self._active.add(d)                 # 3.2 collect the resulting Deferred

    def start(self):
        dd = defer.DeferredList(self._active)  # 3.2 watch for completion
        dd.addBoth(lambda _: reactor.stop())

        reactor.run()


class Commond(object):

    def run(self):
        crawl_process = CrawlerProcess()  # 2.1 instantiate CrawlerProcess
        spider_cls_path_list = ['spider.chouti.ChoutiSpider', 'spider.cnblogs.CnblogsSpider', ]
        for spider_cls_path in spider_cls_path_list:
            crawl_process.crawl(spider_cls_path)  # 2.2 call CrawlerProcess.crawl for each spider
        crawl_process.start()  # 2.3 start the event loop and stop it once everything is done


if __name__ == '__main__':
    # 1. entry point
    cmd = Commond()
    cmd.run()
chouti.py:

from engine import Request


class ChoutiSpider(object):

    name = 'chouti'

    def start_requests(self):
        start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
        for url in start_url:
            yield Request(url, self.parse)

    def parse(self, response):
        print(response)  # response is the downloaded page
        yield Request('http://www.cnblogs.com', callback=self.parse)
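For the importlib lookup in Crawler._create_spider to resolve 'spider.chouti.ChoutiSpider' and 'spider.cnblogs.CnblogsSpider', the spiders presumably sit in a spider package next to engine.py. A guess at the layout, plus a hypothetical cnblogs.py mirroring chouti.py (neither is shown in the original post):

# assumed layout (not shown above):
#   engine.py
#   spider/
#       __init__.py
#       chouti.py      # the ChoutiSpider above
#       cnblogs.py     # hypothetical second spider, sketched below

# spider/cnblogs.py
from engine import Request


class CnblogsSpider(object):

    name = 'cnblogs'

    def start_requests(self):
        for url in ['http://www.cnblogs.com', ]:
            yield Request(url, self.parse)

    def parse(self, response):
        print(response)  # response is the downloaded page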
Other examples:
Twisted example:

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor
import threading


def _next_request():
    _next_request_from_scheduler()


def _next_request_from_scheduler():
    ret = getPage(bytes('http://www.chouti.com', encoding='utf8'))
    ret.addCallback(callback)
    ret.addCallback(lambda _: reactor.callLater(0, _next_request))


_closewait = None


@defer.inlineCallbacks
def engine_start():
    global _closewait
    _closewait = defer.Deferred()
    yield _closewait


@defer.inlineCallbacks
def task(url):
    reactor.callLater(0, _next_request)
    yield engine_start()


counter = 0


def callback(arg):
    global counter
    counter += 1
    if counter == 10:
        _closewait.callback(None)
    print('one', len(arg))


def stop(arg):
    print('all done', arg)
    reactor.stop()


if __name__ == '__main__':
    url = 'http://www.cnblogs.com'

    defer_list = []
    deferObj = task(url)
    defer_list.append(deferObj)

    v = defer.DeferredList(defer_list)
    v.addBoth(stop)
    reactor.run()
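Note that getPage is deprecated in newer Twisted releases. A rough, hedged replacement for the download step using the separate treq package (assumes pip install treq); the fetch_body name is illustrative and not part of the example above:

import treq
from twisted.internet import defer, reactor


@defer.inlineCallbacks
def fetch_body(url):
    response = yield treq.get(url)       # Deferred that fires with a response object
    body = yield treq.content(response)  # Deferred that fires with the body bytes
    print(url, len(body))


d = fetch_body('http://www.cnblogs.com')
d.addBoth(lambda _: reactor.stop())  # stop the loop whether the fetch succeeded or failed
reactor.run()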
Scrapy framework simulation:

from twisted.web.client import getPage
from twisted.internet import defer
from twisted.internet import reactor
import queue


class Response(object):
    def __init__(self, body, request):
        self.body = body
        self.request = request
        self.url = request.url

    @property
    def text(self):
        return self.body.decode('utf-8')


class Request(object):
    def __init__(self, url, callback=None):
        self.url = url
        self.callback = callback


class Scheduler(object):
    def __init__(self, engine):
        self.q = queue.Queue()
        self.engine = engine

    def enqueue_request(self, request):
        self.q.put(request)

    def next_request(self):
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None

        return req

    def size(self):
        return self.q.qsize()


class ExecutionEngine(object):
    def __init__(self):
        self._closewait = None
        self.running = True
        self.start_requests = None
        self.scheduler = Scheduler(self)

        self.inprogress = set()

    def check_empty(self, response):
        if not self.running:
            self._closewait.callback('......')

    def _next_request(self):
        while self.start_requests:
            try:
                request = next(self.start_requests)
            except StopIteration:
                self.start_requests = None
            else:
                self.scheduler.enqueue_request(request)

        while len(self.inprogress) < 5 and self.scheduler.size() > 0:  # maximum concurrency of 5
            request = self.scheduler.next_request()
            if not request:
                break

            self.inprogress.add(request)
            d = getPage(bytes(request.url, encoding='utf-8'))
            d.addBoth(self._handle_downloader_output, request)
            d.addBoth(lambda x, req: self.inprogress.remove(req), request)
            d.addBoth(lambda x: self._next_request())

        if len(self.inprogress) == 0 and self.scheduler.size() == 0:
            self._closewait.callback(None)

    def _handle_downloader_output(self, body, request):
        """
        Build the response, run the callback, and enqueue any requests the callback yields.
        :param body:
        :param request:
        :return:
        """
        import types

        response = Response(body, request)
        func = request.callback or self.spider.parse
        gen = func(response)
        if isinstance(gen, types.GeneratorType):
            for req in gen:
                self.scheduler.enqueue_request(req)

    @defer.inlineCallbacks
    def start(self):
        self._closewait = defer.Deferred()
        yield self._closewait

    def open_spider(self, spider, start_requests):
        self.start_requests = start_requests
        self.spider = spider
        reactor.callLater(0, self._next_request)


class Crawler(object):
    def __init__(self, spidercls):
        self.spidercls = spidercls

        self.spider = None
        self.engine = None

    @defer.inlineCallbacks
    def crawl(self):
        self.engine = ExecutionEngine()
        self.spider = self.spidercls()
        start_requests = iter(self.spider.start_requests())
        self.engine.open_spider(self.spider, start_requests)
        yield self.engine.start()


class CrawlerProcess(object):
    def __init__(self):
        self._active = set()
        self.crawlers = set()

    def crawl(self, spidercls, *args, **kwargs):
        crawler = Crawler(spidercls)

        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)
        return d

    def start(self):
        dl = defer.DeferredList(self._active)
        dl.addBoth(self._stop_reactor)
        reactor.run()

    def _stop_reactor(self, _=None):
        reactor.stop()


class Spider(object):
    def start_requests(self):
        for url in self.start_urls:
            yield Request(url)


class ChoutiSpider(Spider):
    name = "chouti"
    start_urls = [
        'http://dig.chouti.com/',
    ]

    def parse(self, response):
        print(response.text)


class CnblogsSpider(Spider):
    name = "cnblogs"
    start_urls = [
        'http://www.cnblogs.com/',
    ]

    def parse(self, response):
        print(response.text)


if __name__ == '__main__':

    spider_cls_list = [ChoutiSpider, CnblogsSpider]

    crawler_process = CrawlerProcess()
    for spider_cls in spider_cls_list:
        crawler_process.crawl(spider_cls)

    crawler_process.start()
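The three addBoth calls above rely on how results flow through a Deferred's callback chain: each callback receives the previous one's return value, and addBoth runs on success and failure alike. A tiny illustration with made-up values:

from twisted.internet import defer

d = defer.Deferred()
d.addCallback(lambda body: len(body))           # receives the value the Deferred fires with
d.addBoth(lambda n: print('length:', n) or n)   # runs for success and failure alike, passes n along
d.callback(b'hello world')                      # prints "length: 11"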
Reference version:

import types
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor


class Request(object):
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback
        self.priority = 0


class HttpResponse(object):
    def __init__(self, content, request):
        self.content = content
        self.request = request


class ChouTiSpider(object):

    def start_requests(self):
        url_list = ['http://www.cnblogs.com/', 'http://www.bing.com']
        for url in url_list:
            yield Request(url=url, callback=self.parse)

    def parse(self, response):
        print(response.request.url)
        # yield Request(url="http://www.baidu.com", callback=self.parse)


from queue import Queue
Q = Queue()


class CallLaterOnce(object):
    def __init__(self, func, *a, **kw):
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        if self._call is None:
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        self._call = None
        return self._func(*self._a, **self._kw)


class Engine(object):
    def __init__(self):
        self.nextcall = None
        self.crawlling = []
        self.max = 5
        self._closewait = None

    def get_response(self, content, request):
        response = HttpResponse(content, request)
        gen = request.callback(response)
        if isinstance(gen, types.GeneratorType):
            for req in gen:
                req.priority = request.priority + 1
                Q.put(req)

    def rm_crawlling(self, response, d):
        self.crawlling.remove(d)

    def _next_request(self, spider):
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            self._closewait.callback(None)

        if len(self.crawlling) >= 5:
            return
        while len(self.crawlling) < 5:
            try:
                req = Q.get(block=False)
            except Exception as e:
                req = None
            if not req:
                return
            d = getPage(req.url.encode('utf-8'))
            self.crawlling.append(d)
            d.addCallback(self.get_response, req)
            d.addCallback(self.rm_crawlling, d)
            d.addCallback(lambda _: self.nextcall.schedule())

    @defer.inlineCallbacks
    def crawl(self):
        spider = ChouTiSpider()
        start_requests = iter(spider.start_requests())
        flag = True
        while flag:
            try:
                req = next(start_requests)
                Q.put(req)
            except StopIteration as e:
                flag = False

        self.nextcall = CallLaterOnce(self._next_request, spider)
        self.nextcall.schedule()

        self._closewait = defer.Deferred()
        yield self._closewait

    @defer.inlineCallbacks
    def pp(self):
        yield self.crawl()


_active = set()
obj = Engine()
d = obj.crawl()
_active.add(d)

li = defer.DeferredList(_active)
li.addBoth(lambda _, *a, **kw: reactor.stop())

reactor.run()
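CallLaterOnce is the interesting piece here: repeated schedule() calls collapse into a single pending reactor call, so _next_request only runs once per reactor tick no matter how many downloads finish. A small demonstration, assuming the CallLaterOnce class above and run on its own (not appended to the script above, which already starts the reactor); the tick counter is illustrative:

from twisted.internet import reactor

ticks = []


def tick():
    ticks.append(1)
    print('tick called', len(ticks), 'time(s)')
    reactor.callLater(0, reactor.stop)


once = CallLaterOnce(tick)
once.schedule()
once.schedule()  # ignored: a call is already pending
once.schedule()  # ignored as well
reactor.run()    # prints "tick called 1 time(s)"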
More Scrapy documentation:
http://scrapy-chs.readthedocs.io/zh_CN/latest/index.html
https://docs.scrapy.org/en/latest/
Related reference: https://www.cnblogs.com/wupeiqi/articles/6229292.html