① getPage
1 1 # socket对象(如果下载完成..自动从事件循环中移除) 2 2 from twisted.web.client import getPage
详解:
def getPage(url, contextFactory=None, *args, **kwargs):
    """
    Download a web page as a string.

    Download a page. Return a deferred, which will callback with a
    page (as a string) or errback with a description of the error.

    See L{HTTPClientFactory} to see what extra arguments can be passed.

    @param url: The URL to fetch; handed unchanged to L{_makeGetterFactory}.

    @param contextFactory: Context factory forwarded to
        L{_makeGetterFactory}, defaulting to L{None}.

    @param args: Extra positional arguments forwarded to
        L{HTTPClientFactory} through L{_makeGetterFactory}.

    @param kwargs: Extra keyword arguments forwarded the same way.

    @return: The C{deferred} attribute of the factory that
        L{_makeGetterFactory} returns.
    """
    # contextFactory is passed positionally on purpose.  Writing
    # ``contextFactory=contextFactory, *args`` would place the first extra
    # positional argument in the contextFactory slot as well, and the call
    # would fail with "got multiple values for argument 'contextFactory'".
    factory = _makeGetterFactory(
        url,
        HTTPClientFactory,
        contextFactory,
        *args, **kwargs)
    return factory.deferred
def _makeGetterFactory(url, factoryFactory, contextFactory=None,
                       *args, **kwargs):
    """
    Build an HTTP page getting factory and connect it through the reactor.

    Extra positional and keyword arguments are handed to C{factoryFactory}
    together with C{url}.

    @param factoryFactory: Callable invoked as
        C{factoryFactory(url, *args, **kwargs)} to produce the getter.

    @param contextFactory: Context factory used for a secure connection,
        defaulting to L{None}.  It is only consulted for C{https} URLs.

    @return: The factory created by C{factoryFactory}.
    """
    parsed = URI.fromBytes(url)
    getter = factoryFactory(url, *args, **kwargs)

    if uri_is_plain := (parsed.scheme != b'https'):
        reactor.connectTCP(nativeString(parsed.host), parsed.port, getter)

    if not uri_is_plain:
        # Imported lazily, only on the https path.
        from twisted.internet import ssl
        # NOTE(review): the default context factory presumably performs no
        # certificate verification -- confirm before relying on it.
        secureContext = contextFactory
        if secureContext is None:
            secureContext = ssl.ClientContextFactory()
        reactor.connectSSL(
            nativeString(parsed.host), parsed.port, getter, secureContext)

    return getter
class HTTPClientFactory(protocol.ClientFactory):
    """Client factory that downloads a single URL.

    Only the class attributes and C{__init__} appear in this excerpt;
    C{setURL} and C{_waitForDisconnect}, which C{__init__} uses, are defined
    elsewhere.

    @type deferred: Deferred
    @ivar deferred: Fires once the content has been retrieved.  After it
        has fired, the ivars `status', `version', and `message' are set.

    @type status: bytes
    @ivar status: Status of the response.

    @type version: bytes
    @ivar version: Version of the response.

    @type message: bytes
    @ivar message: Text message returned alongside the status.

    @type response_headers: dict
    @ivar response_headers: Headers specified in the server's response.

    @type method: bytes
    @ivar method: HTTP method used for the request.  It should be one of
        OPTIONS, GET, HEAD, POST, PUT, DELETE, TRACE, or CONNECT (case
        matters).  Other values may be given if the contacted server
        supports them.

    @type redirectLimit: int
    @ivar redirectLimit: Maximum number of HTTP redirects allowed before
        the redirection is assumed to be endless.

    @type afterFoundGet: C{bool}
    @ivar afterFoundGet: Deviate from the HTTP 1.1 RFC by treating
        redirects the way most web browsers do; when the request method is
        POST and a 302 status is encountered, the redirect is followed
        with a GET method.

    @type _redirectCount: int
    @ivar _redirectCount: Number of HTTP redirects encountered so far.

    @ivar _disconnectedDeferred: A L{Deferred} which only fires after the
        last connection associated with the request (redirects may require
        several connections) has closed.  The result Deferred only fires
        after this one, so callers can be sure no event sources remain in
        the reactor once they receive the result.
    """

    protocol = HTTPPageGetter

    url = None
    scheme = None
    host = b''
    port = None
    path = None

    def __init__(self, url, method=b'GET', postdata=None, headers=None,
                 agent=b"Twisted PageGetter", timeout=0, cookies=None,
                 followRedirect=True, redirectLimit=20,
                 afterFoundGet=False):
        # Plain request settings, stored as given.
        self.method = method
        self.postdata = postdata
        self.agent = agent
        self.timeout = timeout
        self.afterFoundGet = afterFoundGet

        # Redirect handling state.
        self.followRedirect = followRedirect
        self.redirectLimit = redirectLimit
        self._redirectCount = 0

        # A fresh dict per instance when the caller supplies no cookies.
        self.cookies = {} if cookies is None else cookies

        requestHeaders = (
            InsensitiveDict() if headers is None else InsensitiveDict(headers))
        if postdata is not None:
            requestHeaders.setdefault(
                b'Content-Length', intToBytes(len(postdata)))
        # just in case a broken http/1.1 decides to keep connection alive
        requestHeaders.setdefault(b"connection", b"close")
        self.headers = requestHeaders

        self.setURL(url)

        self.waiting = 1
        self._disconnectedDeferred = defer.Deferred()
        resultDeferred = defer.Deferred()
        # The first callback on the result Deferred pauses the callback
        # chain until the request connection has been closed.
        resultDeferred.addBoth(self._waitForDisconnect)
        self.deferred = resultDeferred
        self.response_headers = None
② defer
1 #defer.Deferred 特殊的socket对象(不发送请求,手动从事件循环移除) 2 from twisted.internet import defer
详细:
class Deferred:
    """
    This is a callback which will be put off until later.

    Why do we want this? Well, in cases where a function in a threaded
    program would block until it gets a result, for Twisted it should
    not block. Instead, it should return a L{Deferred}.

    This can be implemented for protocols that run over the network by
    writing an asynchronous protocol for L{twisted.internet}. For methods
    that come from outside packages that are not under our control, we use
    threads (see for example L{twisted.enterprise.adbapi}).

    For more information about Deferreds, see doc/core/howto/defer.html or
    U{http://twistedmatrix.com/documents/current/core/howto/defer.html}

    When creating a Deferred, you may provide a canceller function, which
    will be called by d.cancel() to let you do any clean-up necessary if the
    user decides not to wait for the deferred to complete.

    @ivar called: A flag which is C{False} until either C{callback} or
        C{errback} is called and afterwards always C{True}.
    @type called: L{bool}

    @ivar paused: A counter of how many unmatched C{pause} calls have been made
        on this instance.
    @type paused: L{int}

    @ivar _suppressAlreadyCalled: A flag used by the cancellation mechanism
        which is C{True} if the Deferred has no canceller and has been
        cancelled, C{False} otherwise.  If C{True}, it can be expected that
        C{callback} or C{errback} will eventually be called and the result
        should be silently discarded.
    @type _suppressAlreadyCalled: L{bool}

    @ivar _runningCallbacks: A flag which is C{True} while this instance is
        executing its callback chain, used to stop recursive execution of
        L{_runCallbacks}
    @type _runningCallbacks: L{bool}

    @ivar _chainedTo: If this L{Deferred} is waiting for the result of another
        L{Deferred}, this is a reference to the other Deferred.  Otherwise,
        L{None}.
    """

    called = False
    # Documented above as an int counter and decremented with ``-= 1`` when a
    # chained Deferred is resumed, so the default is the integer 0 rather
    # than the bool False (which compares equal and is equally falsy).
    paused = 0
    _debugInfo = None
    _suppressAlreadyCalled = False

    # Are we currently running a user-installed callback?  Meant to prevent
    # recursive running of callbacks when a reentrant call to add a callback is
    # used.
    _runningCallbacks = False

    # Keep this class attribute for now, for compatibility with code that
    # sets it directly.
    debug = False

    _chainedTo = None

    def __init__(self, canceller=None):
        """
        Initialize a L{Deferred}.

        @param canceller: a callable used to stop the pending operation
            scheduled by this L{Deferred} when L{Deferred.cancel} is
            invoked. The canceller will be passed the deferred whose
            cancelation is requested (i.e., self).

            If a canceller is not given, or does not invoke its argument's
            C{callback} or C{errback} method, L{Deferred.cancel} will
            invoke L{Deferred.errback} with a L{CancelledError}.

            Note that if a canceller is not given, C{callback} or
            C{errback} may still be invoked exactly once, even though
            defer.py will have already invoked C{errback}, as described
            above.  This allows clients of code which returns a L{Deferred}
            to cancel it without requiring the L{Deferred} instantiator to
            provide any specific implementation support for cancellation.
            New in 10.1.

        @type canceller: a 1-argument callable which takes a L{Deferred}. The
            return result is ignored.
        """
        # Pending callback entries; each instance gets its own list.
        self.callbacks = []
        self._canceller = canceller
        if self.debug:
            # In debug mode, remember where this Deferred was created
            # (the current frame itself is sliced off the stack).
            self._debugInfo = DebugInfo()
            self._debugInfo.creator = traceback.format_stack()[:-1]
def callback(self, result):
    """
    Fire this L{Deferred} with a success value.

    The success callbacks added to this L{Deferred} are run as a
    'processing chain': each one receives the previous one's result as its
    first argument.  When a success-callback returns a L{Failure} or raises
    an L{Exception}, processing carries on along the *error* callback
    chain.  When a callback (or errback) returns another L{Deferred}, this
    L{Deferred} is chained to it and no further callbacks run until that
    L{Deferred} has a result.

    Only one of L{callback} and L{errback} may be called on a given
    L{Deferred}, and only once.

    @param result: The object handed to the first callback added to this
        L{Deferred} (via L{addCallback}).  It must not itself be a
        L{Deferred}.

    @raise AlreadyCalledError: If L{callback} or L{errback} has already
        been called on this L{Deferred}.
    """
    resultIsDeferred = isinstance(result, Deferred)
    assert not resultIsDeferred
    self._startRunCallbacks(result)
def _startRunCallbacks(self, result):
    """
    Record C{result} as this Deferred's result and start the callback chain.

    @param result: The value stored in C{self.result} before
        L{_runCallbacks} is invoked.

    @raise AlreadyCalledError: If this Deferred was already called and the
        call is not being suppressed via C{_suppressAlreadyCalled}.
    """
    if self.called:
        if self._suppressAlreadyCalled:
            # One repeated call is swallowed silently; reset the flag.
            self._suppressAlreadyCalled = False
            return
        if not self.debug:
            raise AlreadyCalledError
        # Debug mode: attach the recorded tracebacks to the error.
        if self._debugInfo is None:
            self._debugInfo = DebugInfo()
        raise AlreadyCalledError(
            "\n" + self._debugInfo._getDebugTracebacks())

    if self.debug:
        if self._debugInfo is None:
            self._debugInfo = DebugInfo()
        # Must stay in this frame: the slice drops the two innermost frames.
        self._debugInfo.invoker = traceback.format_stack()[:-2]

    self.called = True
    self.result = result
    self._runCallbacks()
def _runCallbacks(self):
    """
    Run the chain of callbacks once a result is available.

    This consists of a simple loop over all of the callbacks, calling each
    with the current result and making the current result equal to the
    return value (or raised exception) of that call.

    If L{_runningCallbacks} is true, this loop won't run at all, since
    it is already running above us on the call stack.  If C{self.paused} is
    true, the loop also won't run, because that's what it means to be
    paused.

    The loop will terminate before processing all of the callbacks if a
    L{Deferred} without a result is encountered.

    If a L{Deferred} I{with} a result is encountered, that result is taken
    and the loop proceeds.

    @note: The implementation is complicated slightly by the fact that
        chaining (associating two L{Deferred}s with each other such that one
        will wait for the result of the other, as happens when a Deferred is
        returned from a callback on another L{Deferred}) is supported
        iteratively rather than recursively, to avoid running out of stack
        frames when processing long chains.
    """
    if self._runningCallbacks:
        # Don't recursively run callbacks
        return

    # Keep track of all the Deferreds encountered while propagating results
    # up a chain.  The way a Deferred gets onto this stack is by having
    # added its _continuation() to the callbacks list of a second Deferred
    # and then that second Deferred being fired.  ie, if ever had _chainedTo
    # set to something other than None, you might end up on this stack.
    chain = [self]

    while chain:
        # Always work on the most recently pushed Deferred.
        current = chain[-1]

        if current.paused:
            # This Deferred isn't going to produce a result at all.  All the
            # Deferreds up the chain waiting on it will just have to...
            # wait.
            return

        finished = True
        current._chainedTo = None
        while current.callbacks:
            item = current.callbacks.pop(0)
            # The isinstance() result (False/True) indexes the entry:
            # item[0] is used for a normal result, item[1] for a Failure.
            callback, args, kw = item[
                isinstance(current.result, failure.Failure)]
            args = args or ()
            kw = kw or {}

            # Avoid recursion if we can.
            if callback is _CONTINUE:
                # Give the waiting Deferred our current result and then
                # forget about that result ourselves.
                chainee = args[0]
                chainee.result = current.result
                current.result = None
                # Making sure to update _debugInfo
                if current._debugInfo is not None:
                    current._debugInfo.failResult = None
                chainee.paused -= 1
                chain.append(chainee)
                # Delay cleaning this Deferred and popping it from the chain
                # until after we've dealt with chainee.
                finished = False
                break

            try:
                current._runningCallbacks = True
                try:
                    current.result = callback(current.result, *args, **kw)
                    if current.result is current:
                        warnAboutFunction(
                            callback,
                            "Callback returned the Deferred "
                            "it was attached to; this breaks the "
                            "callback chain and will raise an "
                            "exception in the future.")
                finally:
                    current._runningCallbacks = False
            except:
                # Including full frame information in the Failure is quite
                # expensive, so we avoid it unless self.debug is set.
                # NOTE(review): this reads self.debug, not current.debug.
                current.result = failure.Failure(captureVars=self.debug)
            else:
                if isinstance(current.result, Deferred):
                    # The result is another Deferred.  If it has a result,
                    # we can take it and keep going.
                    resultResult = getattr(current.result, 'result', _NO_RESULT)
                    if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused:
                        # Nope, it didn't.  Pause and chain.
                        current.pause()
                        current._chainedTo = current.result
                        # Note: current.result has no result, so it's not
                        # running its callbacks right now.  Therefore we can
                        # append to the callbacks list directly instead of
                        # using addCallbacks.
                        current.result.callbacks.append(current._continuation())
                        break
                    else:
                        # Yep, it did.  Steal it.
                        current.result.result = None
                        # Make sure _debugInfo's failure state is updated.
                        if current.result._debugInfo is not None:
                            current.result._debugInfo.failResult = None
                        current.result = resultResult

        if finished:
            # As much of the callback chain - perhaps all of it - as can be
            # processed right now has been.  The current Deferred is waiting on
            # another Deferred or for more callbacks.  Before finishing with it,
            # make sure its _debugInfo is in the proper state.
            if isinstance(current.result, failure.Failure):
                # Stash the Failure in the _debugInfo for unhandled error
                # reporting.
                current.result.cleanFailure()
                if current._debugInfo is None:
                    current._debugInfo = DebugInfo()
                current._debugInfo.failResult = current.result
            else:
                # Clear out any Failure in the _debugInfo, since the result
                # is no longer a Failure.
                if current._debugInfo is not None:
                    current._debugInfo.failResult = None

            # This Deferred is done, pop it from the chain and move back up
            # to the Deferred which supplied us with our result.
            chain.pop()
③ reactor
1 # 事件循环(终止条件,所有的socket都已经移除) 2 from twisted.internet import reactor
1 # Copyright (c) Twisted Matrix Laboratories. 2 # See LICENSE for details. 3 4 """ 5 The reactor is the Twisted event loop within Twisted, the loop which drives 6 applications using Twisted. The reactor provides APIs for networking, 7 threading, dispatching events, and more. 8 9 The default reactor depends on the platform and will be installed if this 10 module is imported without another reactor being explicitly installed 11 beforehand. Regardless of which reactor is installed, importing this module is 12 the correct way to get a reference to it. 13 14 New application code should prefer to pass and accept the reactor as a 15 parameter where it is needed, rather than relying on being able to import this 16 module to get a reference. This simplifies unit testing and may make it easier 17 to one day support multiple reactors (as a performance enhancement), though 18 this is not currently possible. 19 20 @see: L{IReactorCore<twisted.internet.interfaces.IReactorCore>} 21 @see: L{IReactorTime<twisted.internet.interfaces.IReactorTime>} 22 @see: L{IReactorProcess<twisted.internet.interfaces.IReactorProcess>} 23 @see: L{IReactorTCP<twisted.internet.interfaces.IReactorTCP>} 24 @see: L{IReactorSSL<twisted.internet.interfaces.IReactorSSL>} 25 @see: L{IReactorUDP<twisted.internet.interfaces.IReactorUDP>} 26 @see: L{IReactorMulticast<twisted.internet.interfaces.IReactorMulticast>} 27 @see: L{IReactorUNIX<twisted.internet.interfaces.IReactorUNIX>} 28 @see: L{IReactorUNIXDatagram<twisted.internet.interfaces.IReactorUNIXDatagram>} 29 @see: L{IReactorFDSet<twisted.internet.interfaces.IReactorFDSet>} 30 @see: L{IReactorThreads<twisted.internet.interfaces.IReactorThreads>} 31 @see: L{IReactorPluggableResolver<twisted.internet.interfaces.IReactorPluggableResolver>} 32 """ 33 34 from __future__ import division, absolute_import 35 36 import sys 37 del sys.modules['twisted.internet.reactor'] 38 from twisted.internet import default 39 default.install() #######################这里就不深挖了
实例:
# Event loop (it terminates once all sockets have been removed from it).
from twisted.internet import reactor
# Socket object (removed from the event loop automatically once the
# download completes).
from twisted.web.client import getPage
# defer.Deferred: a special socket object (it sends no request and has to
# be removed from the event loop manually).
from twisted.internet import defer

# Roughly three steps:
# 1. Create the socket with getPage.
# 2. Add the socket to the event loop.
# 3. Start the event loop (it sends the requests and receives the
#    responses; once every socket request has finished, the loop is stopped).

# 1. Create the socket with getPage.
# def response(content):
#     print(content)
#
# def task():
#     url = "http://www.baidu.com"
#     d = getPage(url)
#     d.addCallback(response)  # runs after the response arrives

# 2. Add the socket to the event loop.
# def response(content):
#     print(content)
#
# @defer.inlineCallbacks  # see the source analysis
# def task():
#     url = "http://www.baidu.com"
#     d = getPage(url.encode('utf-8'))  # a defer.Deferred() object
#     d.addCallback(response)  # runs after the response arrives
#     yield d

# 3. Start the event loop (it sends the requests and receives the
#    responses; once every socket request has finished, the loop is stopped).
# def response(content):
#     print(content)
#
# @defer.inlineCallbacks  # see the source analysis
# def task():
#     url = "http://www.baidu.com"
#     d = getPage(url.encode('utf-8'))  # a defer.Deferred() object
#     d.addCallback(response)  # runs after the response arrives
#     yield d
#
# task()
# reactor.run()

# 4. Add event-loop termination.
# def response(content):
#     print(content)
#
# def done(*args,**kwargs):
#     reactor.stop()
#
# @defer.inlineCallbacks  # see the source analysis
# def task():
#     url = "http://www.baidu.com"
#     d = getPage(url.encode('utf-8'))  # a defer.Deferred() object
#     d.addCallback(response)  # runs after the response arrives
#     yield d
#
# d = task()
# dd = defer.DeferredList([d,])
# dd.addBoth(done)  # watch for d to finish, then run done
#
# reactor.run()

# 5. Concurrency with a for loop.
# def response(content):
#     print(content)
#
# def done(*args,**kwargs):
#     reactor.stop()  # stop the event loop
#
# @defer.inlineCallbacks  # see the source analysis
# def task():
#     url = "http://www.baidu.com"
#     d = getPage(url.encode('utf-8'))  # a defer.Deferred() object
#     d.addCallback(response)  # runs after the response arrives
#     yield d
#     # url = "http://www.baidu.com"
#     # d = getPage(url.encode('utf-8'))  # a defer.Deferred() object
#     # d.addCallback(response)  # runs after the response arrives
#     # yield d
#
# li = []
# for i in range(10):
#     d = task()  # only returns the socket object, which gives concurrency
#     li.append(d)
#
# dd = defer.DeferredList(li)
# dd.addBoth(done)  # watch for d to finish, then run done
#
# reactor.run()  # start the event loop


# 6. Add a defer.Deferred() that keeps each spider suspended until it is
#    fired by hand.
#
# Each spider owns its closing Deferred.  A single module-level Deferred
# would be overwritten by the second task() call, the first spider could
# then never finish, and reactor.run() would never return.
URLS = (
    "http://www.baidu.com",
    "http://www.cnblogs.com",
    "http://www.bing.com",
)


def response(content):
    """Print the downloaded page body."""
    print(content)


def failed(failure):
    """Print why a download failed and treat the error as handled."""
    print(failure.getErrorMessage())


@defer.inlineCallbacks
def task():
    """
    Entry point of each spider (its start_requests).

    Starts one download per entry in URLS and then stays suspended on a
    Deferred of its own, which is fired by hand once every download has
    either succeeded or failed.

    :return: via inlineCallbacks, a Deferred for the whole spider.
    """
    downloads = []
    for url in URLS:
        d = getPage(url.encode('utf-8'))  # a Deferred object
        # response runs on success, failed on error, so one bad download
        # cannot keep the spider suspended forever.
        d.addCallbacks(response, failed)
        downloads.append(d)

    # Not tied to any socket: it stays pending until callback() is called.
    closed = defer.Deferred()
    # Fire it manually once all downloads of this spider have finished.
    defer.DeferredList(downloads).addBoth(lambda _: closed.callback(None))
    yield closed


def done(*args, **kwargs):
    """Stop the event loop; extra callback arguments are ignored."""
    reactor.stop()


# Two spiders
spider1 = task()
spider2 = task()

dd = defer.DeferredList([spider1, spider2])
dd.addBoth(done)  # watch for both spiders to finish, then run done

reactor.run()
总结:
# 1. 特殊对象
#    - d = getPage(url.encode('utf-8'))    下载完成后自动终止
#    - d.addCallback(response)
#    - defer.Deferred()                    挂起,需手动终止,例如 _close.callback(None)
#
# 2. defer.inlineCallbacks
#
# 3. reactor.callLater(0, 函数名)
#
# 4. reactor.run()
#    reactor.stop()
#
# 5. dd = defer.DeferredList([d1, d2])
#    dd.addBoth(lambda _: reactor.stop())
标签:defer,None,框架,self,Twisted,current,Deferred,scrapy,result From: https://www.cnblogs.com/huangm1314/p/10453398.html