#1 asyncio http
#asyncio itself does not provide an http-protocol interface; aiohttp does.
#Here http is spoken by hand over asyncio streams.
import asyncio
from urllib.parse import urlparse


async def get_url(url):
    #request the html through an asyncio stream connection
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    #open the connection (a reader/writer stream pair)
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    html = "\n".join(all_lines)
    return html


async def main():
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    #as_completed yields the tasks in the order they finish
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)


if __name__ == "__main__":
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print(time.time() - start_time)
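#The html returned by get_url() above still contains the status line and the
#response headers. A minimal sketch of a variant that reads the whole response
#at once and strips the headers; it assumes the server honours Connection:close,
#so read() runs until EOF.
async def get_html_body(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path or "/"
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    data = await reader.read()   #read until the server closes the connection
    writer.close()
    #headers and body are separated by a blank line
    return data.split(b"\r\n\r\n", 1)[1].decode("utf8")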
#2 asyncio lock
#if a coroutine only does pure computation, with no await in between, its
#statements run atomically and no lock is needed:
'''
total = 0

async def add():
    #1. do something
    #2. an io operation would go here
    #3. do something else
    global total
    for i in range(1000):
        total += 1

async def desc():
    global total
    for i in range(1000):
        total -= 1

if __name__ == "__main__":
    import asyncio
    tasks = [add(), desc()]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print(total)   #always 0: nothing yields control inside the loops
'''
import asyncio
import aiohttp
from asyncio import Lock

cache = {}
lock = Lock()


async def get_stuff(url):
    #the lock ensures only one coroutine fetches a given url;
    #everyone else waits and is then served from the cache
    async with lock:
        if url in cache:
            return cache[url]
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                stuff = await resp.text()
        cache[url] = stuff
        return stuff


async def parse_stuff(url):
    stuff = await get_stuff(url)
    #do some parsing


async def use_stuff(url):
    stuff = await get_stuff(url)
    #do something with the stuff
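#A minimal driver sketch for the cache + lock pattern above; the url is just a
#placeholder. Both coroutines ask for the same page, but only one actually
#performs the http request -- the second caller is served from the cache.
if __name__ == "__main__":
    url = "http://www.baidu.com/"   #placeholder url for the demo
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(parse_stuff(url), use_stuff(url)))
    print(list(cache.keys()))       #the url appears once, fetched once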
#3 asyncio scrapy
#asyncio crawler: fetch pages, de-duplicate urls, store results into mysql
import asyncio
import re
import aiohttp
import aiomysql
from pyquery import PyQuery

stopping = False
start_url = "http://www.jobbole.com/"
waiting_urls = []
seen_urls = set()

#cap the number of concurrent http requests
sem = asyncio.Semaphore(3)


async def fetch(url, session):
    async with sem:
        try:
            async with session.get(url) as resp:
                print("url status: {}".format(resp.status))
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            print(e)


def extract_urls(html):
    urls = []
    pq = PyQuery(html)
    for link in pq.items("a"):
        url = link.attr("href")
        if url and url.startswith("http") and url not in seen_urls:
            urls.append(url)
            waiting_urls.append(url)
    return urls


async def init_urls(url, session):
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)


async def article_handler(url, session, pool):
    #fetch the article detail page, parse it and insert it into mysql
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)
    pq = PyQuery(html)
    title = pq("title").text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")   #sanity check on the connection
            #parameterized query, so quotes in the title cannot break the sql
            await cur.execute("insert into article_test(title) values(%s)", (title,))


async def consumer(pool):
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if len(waiting_urls) == 0:
                await asyncio.sleep(0.5)
                continue
            url = waiting_urls.pop()
            print("start get url: {}".format(url))
            if re.match(r'http://.*?jobbole.com/\d+/', url):
                if url not in seen_urls:
                    asyncio.ensure_future(article_handler(url, session, pool))
            else:
                if url not in seen_urls:
                    asyncio.ensure_future(init_urls(url, session))


async def main(loop):
    #wait for the mysql connection pool to be ready
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='root',
                                      db='aiomysql_test', loop=loop,
                                      charset="utf8", autocommit=True)
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)
    asyncio.ensure_future(consumer(pool))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()
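#The `stopping` flag above is defined but never flipped, so the crawler runs
#until killed. A minimal shutdown sketch under that assumption: on ctrl-c set
#the flag so consumer() exits its while loop, cancel what is still pending,
#and stop the loop. (on python 3.7+ use asyncio.all_tasks() instead of
#asyncio.Task.all_tasks())
'''
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        stopping = True
        for task in asyncio.Task.all_tasks():
            task.cancel()
        loop.stop()
'''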
#4 call_test
""" import asyncio def callback(sleep_times): print("sleep{} success".format(sleep_times)) def stoploop(loop): loop.stop() ''' #call_later if __name__=="__main__": loop=asyncio.get_event_loop() # loop.call_soon(callback,2) # loop.call_soon(stoploop,loop) loop.call_later(2,callback,2) loop.call_later(1,callback,1) loop.call_later(3,callback,3) loop.call_soon(callback,2) loop.run_forever() ''' if __name__=="__main__": loop=asyncio.get_event_loop() now=loop.time() # loop.call_soon(callback,2) # loop.call_soon(stoploop,loop) loop.call_at(now+2,callback,2) loop.call_at(now+1,callback,1) loop.call_at(now+3,callback,3) loop.call_soon(callback,4) loop.run_forever() """ import asyncio def callback(sleep_times,loop): print("success {}".format(loop.time())) def stoploop(loop): loop.stop() ''' #call_later if __name__=="__main__": loop=asyncio.get_event_loop() # loop.call_soon(callback,2) # loop.call_soon(stoploop,loop) loop.call_later(2,callback,2) loop.call_later(1,callback,1) loop.call_later(3,callback,3) loop.call_soon(callback,2) loop.run_forever() ''' if __name__=="__main__": loop=asyncio.get_event_loop() now=loop.time() # loop.call_soon(callback,2) # loop.call_soon(stoploop,loop) loop.call_at(now+2,callback,2,loop) loop.call_at(now+1,callback,1,loop) loop.call_at(now+3,callback,3,loop) loop.call_soon(callback,4,loop) loop.run_forever()
#coroutine_nest
#1. run_until_complete wraps the coroutine in a future and runs the loop
#   until that future is done (unlike run_forever, it stops by itself)
#2. cancelling futures (tasks)
import asyncio


async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after {}s".format(sleep_times))


if __name__ == "__main__":
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(3)

    tasks = [task1, task2, task3]

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        #on ctrl-c, cancel everything that is still pending, then run the
        #loop once more so the cancellations are actually delivered
        all_tasks = asyncio.Task.all_tasks()
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()
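#What cancel() looks like from inside the coroutine: the pending await raises
#asyncio.CancelledError, which gives a chance to clean up. A standalone sketch
#(the names are illustrative, not from the code above):
'''
import asyncio

async def cancellable():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("cleaning up after cancel")
        raise                      #re-raise so the task ends up cancelled

loop = asyncio.new_event_loop()
task = loop.create_task(cancellable())
loop.call_later(1, task.cancel)    #cancel the task after one second
try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    print("task was cancelled")
finally:
    loop.close()
'''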
#loop_test
#event loop + callbacks (driving generators) + epoll (io multiplexing)
#asyncio is python's all-in-one solution for asynchronous io programming
#tornado, gevent, twisted (scrapy, django channels)
#tornado (implements its own web server), django+flask (uwsgi, gunicorn+nginx)
#tornado can be deployed directly, or behind nginx

#basic usage of asyncio
'''
import asyncio
import time

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)   #time.sleep(2) here would block the whole loop
    print("end get url")

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html("http://www.baidu.com") for i in range(5)]
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)   #about 2s for all five coroutines
'''

#getting a coroutine's return value
'''
import asyncio
import time

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    return "bobby"

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    #get_future = asyncio.ensure_future(get_html("http://www.baidu.com"))
    task = loop.create_task(get_html("http://www.baidu.com"))
    loop.run_until_complete(task)
    print(task.result())
'''

#python 3.7+ style: asyncio.run
'''
import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

asyncio.run(main())
'''

#awaiting sequentially: takes 1s + 2s = 3s
'''
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f'started at {time.strftime("%X")}')
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print(f'finished at {time.strftime("%X")}')

asyncio.run(main())
'''

#running concurrently with create_task: takes about 2s
'''
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    print(f'started at {time.strftime("%X")}')
    await task1
    await task2
    print(f'finished at {time.strftime("%X")}')

asyncio.run(main())
'''

#done callbacks, with functools.partial to pass extra arguments
'''
import asyncio
import time
from functools import partial

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    return "bobby"

def callback(url, future):
    print(url)
    print("send email to bobby")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html("http://www.imooc.com"))
    task.add_done_callback(partial(callback, "http://www.imooc.com"))
    loop.run_until_complete(task)
    print(task.result())
'''

#await and gather
import asyncio
import time

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    #tasks = [get_html("http://www.imooc.com") for i in range(10)]
    #loop.run_until_complete(asyncio.gather(*tasks))
    #print(time.time() - start_time)

    #difference between gather and wait: gather is more high-level; it can
    #group awaitables and cancel a whole group at once
    group1 = [get_html("http://projectsedu.com") for i in range(2)]
    group2 = [get_html("http://www.imooc.com") for i in range(3)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    group2.cancel()
    #return_exceptions=True keeps the cancelled group from aborting the run
    loop.run_until_complete(asyncio.gather(group1, group2, return_exceptions=True))
    print(time.time() - start_time)
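#The difference in return shape, as a minimal sketch: gather gives results in
#the order the awaitables were passed, while wait gives (done, pending) sets
#whose iteration order is not guaranteed.
'''
import asyncio

async def square(x):
    await asyncio.sleep(0.1)
    return x * x

async def compare():
    ordered = await asyncio.gather(square(2), square(3))          #[4, 9]
    done, pending = await asyncio.wait([asyncio.ensure_future(square(2)),
                                        asyncio.ensure_future(square(3))])
    unordered = [t.result() for t in done]                        #set order
    print(ordered, unordered)

asyncio.get_event_loop().run_until_complete(compare())
'''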
#thread_asyncio
#using a thread pool: integrating blocking io into coroutine code
import asyncio
import socket
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse


def get_url(url):
    #request html over a plain blocking socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    #open the socket connection
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #client.setblocking(False)  #non-blocking mode would instead need a while
    #loop that keeps polling whether the connection is ready, so that we can
    #do computation or issue other requests in the meantime
    client.connect((host, 80))  #blocking, but does not burn cpu while waiting
    client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break
    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()


if __name__ == "__main__":
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        #run the blocking function in a worker thread; run_in_executor
        #returns a future the event loop can wait on
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)
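#run_in_executor returns an asyncio future, so the blocking function's return
#value can be awaited like a coroutine. A minimal sketch, assuming get_url()
#were changed to return html_data instead of printing it:
'''
async def crawl(urls):
    loop = asyncio.get_event_loop()
    #passing None uses the loop's default thread pool executor
    futures = [loop.run_in_executor(None, get_url, u) for u in urls]
    return await asyncio.gather(*futures)
'''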