[Python] Crawler Systems and Data Processing in Practice, Part 3: Multithreading and Distributed Crawling

Date: 2023-01-31 21:45:58

Why use them

  • To cope with anti-crawler measures

 

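Multithreading

  • Complexity
    • Safety of shared resources and data: protect them with locks
    • Atomicity: an atomic operation on data is inherently mutually exclusive
    • Synchronized waiting: wait(), notify(), notify_all() (threading.Condition in Python)
    • Deadlock: several threads each hold a resource that another one is waiting for
    • Fault tolerance: an unhandled error in one thread can bring down the whole program
  • Multithreading in Python
    • Threads are supported
    • Each Python thread maps directly to a native thread (by contrast, the green threads of early JVMs were all multiplexed onto one native thread)
    • GIL (Global Interpreter Lock): only one thread executes Python bytecode at any moment, so multiple cores are poorly used (CPU-bound code runs roughly as if single-threaded)
    • Suited to I/O-bound workloads, not CPU-bound ones
    • Python is mainly used for offline data processing rather than for serving concurrent online requests (where C++ and Java are used)
  • Implementation
    • Create a thread pool: threads = []
    • Make sure the URL queue is thread-safe: Queue, deque
    • Take a URL from the queue with pop()/get() and start a threading.Thread to crawl it
    • If the thread pool is full, wait in a loop until a thread finishes
    • Remove threads that have finished downloading from the pool
    • When the URLs of the current level are exhausted, call t.join() to wait for all threads, then start crawling the next level
  • Advantages
    • Memory space is shared, so exchanging data is efficient
    • Efficient CPU use: threads can in principle run on several CPUs (in CPython this is limited by the GIL)
    • Easy to develop
    • Low cost of creating and destroying threads
    • A failed or blocked download no longer stalls the whole crawl, so overall download speed improves
    • Noticeably faster downloads on sites without anti-crawler limits
  • Drawbacks
    • Limited speed-up on sites with anti-crawler measures
    • More complexity and higher demands on the programmer
    • The more threads there are, the less CPU time each one gets, and more frequent context switches add overhead
    • Fiercer competition for resources among threads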
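
The implementation steps above (bounded pool, wait for the whole level, then move one level deeper) can be tried without touching the network. The following is only a minimal sketch: it swaps the hand-managed threads = [] list for the standard library's ThreadPoolExecutor, and FAKE_SITE and fetch_links are hypothetical stand-ins for a real download plus link extraction.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory "site": each URL maps to the links found on its page.
FAKE_SITE = {
    "/": ["/a", "/b"],
    "/a": ["/b", "/c"],
    "/b": ["/"],
    "/c": [],
}


def fetch_links(url):
    """Stand-in for an HTTP download followed by link extraction."""
    return FAKE_SITE.get(url, [])


def crawl_by_level(root, max_level=5, max_threads=4):
    seen = {root}   # only touched by the main thread, so no lock is needed
    level = [root]
    order = []      # URLs in the order their level was processed
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        for _ in range(max_level):
            if not level:
                break
            # map() returns only when every URL of this level is done,
            # which plays the role of t.join() on all threads.
            results = list(pool.map(fetch_links, level))
            order.extend(level)
            next_level = []
            for links in results:
                for link in links:
                    if link not in seen:
                        seen.add(link)
                        next_level.append(link)
            level = next_level
    return order


print(crawl_by_level("/"))
```

Keeping the seen set in the main thread avoids locking altogether; the worker threads only download and return links.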

 multi_thread_mfw

# -*- coding: utf-8 -*-

import hashlib
import os
import threading
import time
from collections import deque

import urllib3
from bloom_filter import BloomFilter
from lxml import etree


class CrawlBSF:
    request_headers = {
        'host': "www.mafengwo.cn",
        'connection': "keep-alive",
        'cache-control': "no-cache",
        'upgrade-insecure-requests': "1",
        'user-agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36",
        'accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        'accept-language': "zh-CN,en-US;q=0.8,en;q=0.6"
    }

    cur_level = 0
    max_level = 5
    iter_width = 50
    downloaded_urls = []

    def __init__(self, url, dir_name):
        self.dir_name = dir_name
        self.du_md5_file_name = dir_name + '/download.txt'
        self.du_url_file_name = dir_name + '/urls.txt'

        self.bloom_downloaded_urls = BloomFilter(1024 * 1024 * 16, 0.01)
        self.bloom_url_queue = BloomFilter(1024 * 1024 * 16, 0.01)

        self.cur_queue = deque()
        self.child_queue = deque()

        # protects the queue, bloom filters and log files shared by worker threads
        self.lock = threading.Lock()

        self.root_url = url
        self.cur_queue.append(url)
        self.du_file = open(self.du_url_file_name, 'a+')
        # reload the md5 digests of pages downloaded by earlier runs
        try:
            with open(self.du_md5_file_name, 'r') as md5_file:
                self.downloaded_urls = [line.strip() for line in md5_file if line.strip()]
            for urlmd5 in self.downloaded_urls:
                self.bloom_downloaded_urls.add(urlmd5)
        except IOError:
            print("File not found")
        finally:
            self.dumd5_file = open(self.du_md5_file_name, 'a+')

    def enqueueUrl(self, url):
        url_md5 = hashlib.md5(url.encode('utf8')).hexdigest()
        with self.lock:
            if url not in self.bloom_url_queue and url_md5 not in self.bloom_downloaded_urls:
                self.child_queue.append(url)
                self.bloom_url_queue.add(url)

    def dequeueUrl(self):
        try:
            return self.cur_queue.popleft()
        except IndexError:
            return None

    def close(self):
        self.dumd5_file.close()
        self.du_file.close()


# Global variables
num_downloaded_pages = 0


# download the page content
def get_page_content(cur_url):
    global num_downloaded_pages
    print("downloading %s at level %d" % (cur_url, crawler.cur_level))
    try:
        http = urllib3.PoolManager()
        r = http.request('GET', cur_url, headers=CrawlBSF.request_headers)
        html_page = r.data
        filename = cur_url[7:].replace('/', '_')  # drop the leading 'http://'
        with open("%s/%s.html" % (crawler.dir_name, filename), 'wb+') as fo:
            fo.write(html_page)
    except Exception as err:  # also covers IOError
        print(err)
        return

    # record the page as downloaded; shared state is updated under the lock
    dumd5 = hashlib.md5(cur_url.encode('utf8')).hexdigest()
    with crawler.lock:
        crawler.downloaded_urls.append(dumd5)
        crawler.dumd5_file.write(dumd5 + '\n')
        crawler.du_file.write(cur_url + '\n')
        crawler.bloom_downloaded_urls.add(dumd5)
        num_downloaded_pages += 1

    html = etree.HTML(html_page.lower().decode('utf-8'))
    hrefs = html.xpath(u"//a")

    for href in hrefs:
        try:
            if 'href' in href.attrib:
                val = href.attrib['href']
                if val.find('javascript:') != -1:
                    continue
                if not val.startswith('http://'):
                    if val.startswith('/'):
                        val = 'http://www.mafengwo.cn' + val
                    else:
                        continue
                if val.endswith('/'):
                    val = val[:-1]
                crawler.enqueueUrl(val)
        except ValueError:
            continue


def start_crawl():
    # The first page (the start URL) is crawled synchronously (blocking) in the
    # main thread; later pages are crawled asynchronously in worker threads.
    is_root_page = True
    threads = []
    max_threads = 10

    CRAWL_DELAY = 0.6

    while True:
        url = crawler.dequeueUrl()
        # Current level exhausted: wait for its threads, then go on to the next level
        if url is None:
            for t in threads:
                t.join()
            threads = []
            crawler.cur_level += 1
            if crawler.cur_level == crawler.max_level:
                break
            if len(crawler.child_queue) == 0:
                break
            crawler.cur_queue = crawler.child_queue
            crawler.child_queue = deque()
            continue

        # look for a free slot in the pool to crawl this URL
        if is_root_page:
            get_page_content(url)
            is_root_page = False
        else:
            while True:
                # first drop all threads that have finished
                threads = [t for t in threads if t.is_alive()]
                if len(threads) >= max_threads:
                    time.sleep(CRAWL_DELAY)
                    continue
                try:
                    # daemon threads let the main thread exit on Ctrl-C
                    t = threading.Thread(target=get_page_content, name=None, args=(url,), daemon=True)
                    threads.append(t)
                    t.start()
                    time.sleep(CRAWL_DELAY)
                    break
                except Exception as err:
                    print("Error: unable to start thread", err)
                    raise


if __name__ == '__main__':
    start_time = time.time()
    dir_name = 'htmls'
    # create the folder for the downloaded pages if it does not exist yet
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)

    crawler = CrawlBSF("http://www.mafengwo.cn", dir_name)
    start_crawl()
    crawler.close()
    print('%d pages downloaded, time cost %0.2f seconds' % (num_downloaded_pages, time.time() - start_time))
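
The listing updates a shared counter (num_downloaded_pages) from several worker threads, which is the kind of shared data the "protect them with locks" point refers to. A minimal sketch of that protection follows; the names counter, counter_lock, record_downloads and run are made up for the illustration and are not part of the crawler.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def record_downloads(n):
    """Increment the shared counter n times, holding the lock for each update."""
    global counter
    for _ in range(n):
        with counter_lock:   # read-modify-write happens as one protected step
            counter += 1


def run(num_threads=8, per_thread=10000):
    """Start num_threads workers and return the final counter value."""
    global counter
    counter = 0
    threads = [threading.Thread(target=record_downloads, args=(per_thread,))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


print(run())
```

With the lock in place the result always equals num_threads * per_thread; without it, updates from different threads may overwrite each other.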

 

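Multiprocessing

  • Purpose
    • Limit the number of threads in each process
    • Isolate threads from one another to reduce resource contention
    • In some environments, present several IPs from a single machine
  • Limitations
    • Cannot get past the network bottleneck
    • Pointless on a single machine with a single IP (it is mostly used in distributed setups)
    • Exchanging data costs more
  • Inter-process communication (IPC)
    • Pipes
    • Signals: complex
    • Message queues: POSIX and System V
    • Shared memory: the fastest, but must be combined with semaphores for synchronization and mutual exclusion between processes
    • Semaphores: used to synchronize access to data
    • Sockets: can be standardized and work across machines
  • Approaches
    • Client/server (C/S) mode
      • One server process enqueues and dequeues URLs; on enqueue it checks whether the URL has already been downloaded
      • It also monitors the current crawl status and progress
      • Several crawler processes fetch URLs from the server process and send newly found URLs back to it
      • Sockets are used for IPC
      • Advantages: fast, easy to scale
    • Database mode
      • Multiple threads read the URL list from the database, which is used as a queue
      • Several crawler processes get and add URLs through database operations
      • Advantage: easy to develop, since only one crawler program has to be written
    • Implementation
      • MySQLConnectionPool manages the MySQL connections used by multiple threads
      • __init__ checks for the database and table when the class is instantiated and creates them if they are missing
      • The Cursor class
      • SELECT ... FOR UPDATE locks the selected rows (an exclusive row lock in InnoDB, not a read lock) so that several processes do not take out the same URL
      • con.commit(): transactions are supported and autocommit is off by default, so changes must be committed explicitly
    • Table columns
      • status: download status
      • md5: MD5 digest of the URL
      • depth
      • queue_time
      • done_time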
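
The "database as a queue" idea can be tried without a MySQL server. The sketch below is an illustration under stated assumptions, not the MySQL implementation: it uses the standard library's sqlite3, and because SQLite has no SELECT ... FOR UPDATE it takes the write lock up front with BEGIN IMMEDIATE instead. The function names open_queue, enqueue and claim_next, and the simplified table layout, are hypothetical.

```python
import sqlite3


def open_queue(path=":memory:"):
    # isolation_level=None switches off implicit transactions, so they are
    # controlled explicitly with BEGIN/COMMIT below.
    con = sqlite3.connect(path, isolation_level=None)
    con.execute(
        "CREATE TABLE IF NOT EXISTS urls ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " url TEXT NOT NULL UNIQUE,"
        " status TEXT NOT NULL DEFAULT 'new',"
        " depth INTEGER NOT NULL)"
    )
    return con


def enqueue(con, url, depth):
    # INSERT OR IGNORE silently skips URLs that are already in the table.
    con.execute("INSERT OR IGNORE INTO urls (url, depth) VALUES (?, ?)", (url, depth))


def claim_next(con):
    """Take the oldest 'new' URL and mark it 'downloading' in one transaction."""
    con.execute("BEGIN IMMEDIATE")  # acquire the write lock before reading
    try:
        row = con.execute(
            "SELECT id, url, depth FROM urls WHERE status='new' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is not None:
            con.execute("UPDATE urls SET status='downloading' WHERE id=?", (row[0],))
        con.execute("COMMIT")
        return row
    except Exception:
        con.execute("ROLLBACK")
        raise


con = open_queue()
enqueue(con, "http://example.com/a", 0)
enqueue(con, "http://example.com/a", 0)  # duplicate, ignored
enqueue(con, "http://example.com/b", 1)
print(claim_next(con), claim_next(con), claim_next(con))
```

Because the select and the update happen inside one locked transaction, two workers sharing the same database file cannot claim the same row.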

 

dbmanager

import hashlib
import sys

import mysql.connector
from mysql.connector import errorcode


class CrawlDatabaseManager:

    DB_NAME = 'mfw_pro_crawl'

    SERVER_IP = 'localhost'

    TABLES = {}
    # DDL for the table that serves as the URL queue
    TABLES['urls'] = (
        "CREATE TABLE `urls` ("
        "  `index` int(11) NOT NULL AUTO_INCREMENT,"  # position in the queue
        "  `url` varchar(512) NOT NULL,"
        "  `md5` varchar(32) NOT NULL,"
        "  `status` varchar(11) NOT NULL DEFAULT 'new',"  # 'new', 'downloading' or 'done'
        "  `depth` int(11) NOT NULL,"
        "  `queue_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,"
        # NULL until the row is first updated; a zero default is rejected
        # when the server's sql_mode includes NO_ZERO_DATE
        "  `done_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,"
        "  PRIMARY KEY (`index`),"
        "  UNIQUE KEY `md5` (`md5`)"
        ") ENGINE=InnoDB")

    def __init__(self, max_num_thread):
        self.max_num_thread = max_num_thread
        # connect to the MySQL server
        try:
            cnx = mysql.connector.connect(host=self.SERVER_IP, user='root', password='amei')
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print('Create Error ' + err.msg)
            sys.exit(1)

        cursor = cnx.cursor()

        # select the database; create it and its tables if it does not exist
        try:
            cnx.database = self.DB_NAME
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_BAD_DB_ERROR:
                self.create_database(cursor)
                cnx.database = self.DB_NAME
                self.create_tables(cursor)
            else:
                print(err)
                sys.exit(1)
        finally:
            cursor.close()
            cnx.close()

        self.dbconfig = {
            "database": self.DB_NAME,
            "user":     "root",
            "host":     self.SERVER_IP,
            "password": "amei"
        }

    def _get_connection(self):
        # connect() with pool_name hands out a connection from the named pool,
        # creating the pool on first use
        return mysql.connector.connect(pool_name="mypool",
                                       pool_size=self.max_num_thread,
                                       **self.dbconfig)

    # create the database
    def create_database(self, cursor):
        try:
            cursor.execute(
                "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(self.DB_NAME))
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            sys.exit(1)

    def create_tables(self, cursor):
        for name, ddl in self.TABLES.items():
            try:
                cursor.execute(ddl)
            except mysql.connector.Error as err:
                if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                    print('create tables error ALREADY EXISTS')
                else:
                    print('create tables error ' + err.msg)
            else:
                print('Tables created')

    # put a URL into the queue
    def enqueueUrl(self, url, depth):
        con = self._get_connection()
        cursor = con.cursor()
        try:
            add_url = ("INSERT INTO urls (url, md5, depth) VALUES (%s, %s, %s)")
            data_url = (url, hashlib.md5(url.encode('utf8')).hexdigest(), depth)
            cursor.execute(add_url, data_url)
            # commit this transaction, please refer to "mysql transaction" for more info
            con.commit()
        except mysql.connector.Error:
            # a duplicate md5 (URL already queued) ends up here and is ignored
            return
        finally:
            cursor.close()
            con.close()

    # get a URL from the queue
    def dequeueUrl(self):
        con = self._get_connection()
        cursor = con.cursor(dictionary=True)
        try:
            # SELECT ... FOR UPDATE locks the row until commit, so that no
            # other process can take the same URL
            query = ("SELECT `index`, `url`, `depth` FROM urls "
                     "WHERE status='new' ORDER BY `index` ASC LIMIT 1 FOR UPDATE")
            cursor.execute(query)
            row = cursor.fetchone()
            if row is None:
                con.commit()
                return None
            cursor.execute("UPDATE urls SET `status`='downloading' WHERE `index`=%s", (row['index'],))
            con.commit()
            return row
        except mysql.connector.Error as err:
            print('dequeueUrl() ' + err.msg)
            return None
        finally:
            cursor.close()
            con.close()

    def finishUrl(self, index):
        con = self._get_connection()
        cursor = con.cursor()
        try:
            # done_time needs no explicit value: ON UPDATE CURRENT_TIMESTAMP sets it
            cursor.execute("UPDATE urls SET `status`='done' WHERE `index`=%s", (index,))
            con.commit()
        except mysql.connector.Error as err:
            print('finishUrl() ' + err.msg)
            return
        finally:
            cursor.close()
            con.close()

process_crawl

import os
import threading
import time

import urllib3
from lxml import etree

from dbmanager import CrawlDatabaseManager

request_headers = {
    'host': "www.mafengwo.cn",
    'connection': "keep-alive",
    'cache-control': "no-cache",
    'upgrade-insecure-requests': "1",
    'user-agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36",
    'accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    'accept-language': "zh-CN,en-US;q=0.8,en;q=0.6"
}


def get_page_content(cur_url, index, depth):
    print("downloading %s at level %d" % (cur_url, depth))
    try:
        http = urllib3.PoolManager()
        r = http.request('GET', cur_url, headers=request_headers)
        html_page = r.data
        filename = cur_url[7:].replace('/', '_')  # drop the leading 'http://'
        with open("%s%s.html" % (dir_name, filename), 'wb+') as fo:
            fo.write(html_page)
        dbmanager.finishUrl(index)
    except urllib3.exceptions.HTTPError as err:
        print('HttpError:', err)
        return
    except IOError as err:
        print('IOError:', err)
        return
    except Exception as err:
        print('Exception:', err)
        return

    html = etree.HTML(html_page.lower().decode('utf-8'))
    hrefs = html.xpath(u"//a")

    for href in hrefs:
        try:
            if 'href' in href.attrib:
                val = href.attrib['href']
                if val.find('javascript:') != -1:
                    continue
                if not val.startswith('http://'):
                    if val.startswith('/'):
                        val = 'http://www.mafengwo.cn' + val
                    else:
                        continue
                if val.endswith('/'):
                    val = val[:-1]
                dbmanager.enqueueUrl(val, depth + 1)

        except ValueError:
            continue


max_num_thread = 5

# create instance of Mysql database manager, which is used as a queue for crawling
dbmanager = CrawlDatabaseManager(max_num_thread)

# dir for saving HTML files
dir_name = 'dir_process/'

if not os.path.exists(dir_name):
    os.mkdir(dir_name)

# put first page into queue
dbmanager.enqueueUrl("http://www.mafengwo.cn", 0)
start_time = time.time()
is_root_page = True
threads = []

# delay before a new crawling thread is created; it throttles the crawl rate
# so that the target website is not visited too frequently
CRAWL_DELAY = 0.6


while True:
    curtask = dbmanager.dequeueUrl()
    print("dequeue")
    # Queue looks empty: running threads may still add URLs, so wait for them
    # and check again; stop only when nothing is running and nothing is queued
    if curtask is None:
        print("no task")
        if not threads:
            break
        for t in threads:
            t.join()
        threads = []
        continue

    # look for a free slot in the pool to crawl this URL

    if is_root_page:
        get_page_content(curtask['url'], curtask['index'], curtask['depth'])
        is_root_page = False
    else:
        while True:
            # first drop all threads that have finished
            threads = [t for t in threads if t.is_alive()]
            if len(threads) >= max_num_thread:
                time.sleep(CRAWL_DELAY)
                continue
            try:
                # daemon threads let the main thread exit on Ctrl-C
                t = threading.Thread(target=get_page_content, name=None,
                                     args=(curtask['url'], curtask['index'], curtask['depth']),
                                     daemon=True)
                threads.append(t)
                t.start()
                time.sleep(CRAWL_DELAY)
                break
            except Exception as err:
                print("Error: unable to start thread", err)
                raise

 

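Distributed crawling (multiple machines)

  • If QPS < 2, a single machine is enough and there is no need for a cluster
  • Evolution
    • A program->A process->A message->A packet->
    • A protocol->A network->A component->A distributed system

  • Advantages
    • Highly fault tolerant / highly available / recoverable / durable / scalable / predictable
  • What a distributed crawler is for
    • Getting around the target site's limit on request frequency per IP
    • Using more bandwidth to download faster
    • Distributed storage and backup for large-scale systems
    • Room for the data to grow
  • Characteristics of raw crawler data
    • Small files, on the order of kilobytes
    • A large number of files
    • Written once, incrementally, and rarely modified afterwards
    • Read sequentially
    • Files are read and written concurrently
    • Scalable
  • Master-Slave structure
    • One master machine manages all the servers (cloud service)
    • When there are many crawler servers, a central node manages the slave nodes
    • Controls the crawl as a whole
    • Shares information among the crawlers
    • Load control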

 

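  • RPC

  • Socket
    • A three-way handshake establishes the TCP connection
    • Once established, the connection is kept alive
    • The server binds a port (FTP 21 / HTTP 80 / MySQL 3306)
    • The client sends requests to that port on the server
    • The server handles the request and returns the result to the client
    • Non-blocking mode: send and recv both return immediately
    • IPC data is converted to a stream (binary or string) for transmission
    • With socket communication the clients can run on one computer (as multiple processes) or on several computers
  • Approach
    • master + client: manages the distributed crawler; several clients can be started on one computer and connect to the master
    • master is the main server program, client is the crawler on the client side
    • The heartbeat in client talks to the low-level communication layer
    • socket.server and socket.client sit on the TCP layer and handle the low-level communication
    • mongo is deployed on the server
    • client accesses mongo to get tasks
    • client <-> socket.client <-> socket.server <-> master
  • Structure
    • socket.server: the server side of the communication
    • socket.client: the client side of the communication
    • master: registers the server, passes in a callback function, handles the different message types, manages the clients
    • client: registers the client, sends heartbeats to the master, takes tasks from the database and crawls them
    • protocol_constants: the communication protocol
    • mongo_mgr: MongoDB stores the task queue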
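
The request/response pattern described under Socket (the server binds a port, the client sends a request, the server replies) can be sketched with the standard library alone. This is an assumption-laden illustration rather than the socket_server / socket_client modules used below: the one-JSON-message-per-line framing, the field names and the helpers JsonLineHandler, start_server and send_request are all invented for the example.

```python
import json
import socket
import socketserver
import threading


class JsonLineHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # One JSON message per line; answer each one until the client disconnects.
        for line in self.rfile:
            request = json.loads(line)
            response = {"echo_type": request.get("type"), "server_status": "running"}
            self.wfile.write((json.dumps(response) + "\n").encode("utf-8"))


def start_server(host="127.0.0.1", port=0):
    # port=0 asks the operating system for any free port
    server = socketserver.ThreadingTCPServer((host, port), JsonLineHandler)
    server.daemon_threads = True
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def send_request(address, message):
    """Open a TCP connection, send one JSON message and return the JSON reply."""
    with socket.create_connection(address, timeout=5) as sock:
        sock.sendall((json.dumps(message) + "\n").encode("utf-8"))
        with sock.makefile("r", encoding="utf-8") as reader:
            return json.loads(reader.readline())


if __name__ == "__main__":
    server = start_server()
    print(send_request(server.server_address, {"type": "heartbeat", "client_id": "0"}))
    server.shutdown()
    server.server_close()
```

The client only needs the server's address, so the same code works whether the clients run as several processes on one computer or on several machines.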

master

import hashlib
import json
import signal
import sys
import time
import _thread

import networkx as nx

import protocol_constants as pc
from mongo_mgr import MongoManager
from socket_server import ServerSocket

constants = {
    'reorder_period': 1200,        # 20 min
    'connection_lost_period': 30,  # 30 s
    'status_check_interval': 5,    # 5 s
}


class CrawlMaster:
    clients = {}

    server_status = pc.STATUS_RUNNING

    last_reorder_time = time.time()
    is_reordering = False

    mongo_mgr = MongoManager()

    def __init__(self, mongo_client=None, mongo_host='localhost'):
        self.server = ServerSocket(self.on_message)
        self.server.start()

    def on_message(self, msg):
        print('Heart Beat request', msg)
        request = json.loads(msg)
        msg_type = request[pc.MSG_TYPE]
        client_state = {}
        response = {}
        response[pc.SERVER_STATUS] = self.server_status
        if msg_type == pc.REGISTER:
            client_id = self.get_free_id()
            client_state['status'] = pc.STATUS_RUNNING
            client_state['time'] = time.time()
            self.clients[client_id] = client_state
            return client_id
        elif msg_type == pc.UNREGISTER:
            client_id = request.get(pc.CLIENT_ID)
            self.clients.pop(client_id, None)
            return json.dumps(response)
        elif msg_type == pc.LOCATIONS:
            items = self.mongo_mgr.dequeueItems(request[pc.REQUEST_SIZE])
            response[pc.MSG_TYPE] = pc.LOCATIONS
            response[pc.CRAWL_DELAY] = 2
            response[pc.DATA] = json.dumps(items)
            return json.dumps(response)
        elif msg_type == pc.TRIPLES:
            items = self.mongo_mgr.dequeueItems(request[pc.REQUEST_SIZE])
            response[pc.MSG_TYPE] = pc.TRIPLES
            response[pc.DATA] = json.dumps(items)
            return json.dumps(response)

        # every other message must come from a registered client
        client_id = request.get(pc.CLIENT_ID)
        if client_id is None or client_id not in self.clients:
            response[pc.ERROR] = pc.ERR_NOT_FOUND
            return json.dumps(response)
        if msg_type == pc.HEARTBEAT:
            # refresh the timestamp so periodical_check() sees the client as alive
            self.clients[client_id]['time'] = time.time()
            if self.server_status != self.clients[client_id]['status']:
                if self.server_status == pc.STATUS_RUNNING:
                    response[pc.ACTION_REQUIRED] = pc.RESUME_REQUIRED
                elif self.server_status == pc.STATUS_PAUSED:
                    response[pc.ACTION_REQUIRED] = pc.PAUSE_REQUIRED
                elif self.server_status == pc.STATUS_SHUTDOWN:
                    response[pc.ACTION_REQUIRED] = pc.SHUTDOWN_REQUIRED
                return json.dumps(response)
        else:
            # the client reports its new state (paused, resumed, ...)
            client_state['status'] = msg_type
            client_state['time'] = time.time()
            self.clients[client_id] = client_state

        return json.dumps(response)

    def get_free_id(self):
        # smallest non-negative integer not yet used as a client id
        i = 0
        while str(i) in self.clients:
            i += 1
        return str(i)

    def reorder_queue(self):
        # NOTE: self.db is not set anywhere in this listing; it is assumed to be
        # a MongoDB database handle holding the urlpr and mfw collections.
        g = nx.DiGraph()
        cursor = self.db.urlpr.find()
        for site in cursor:
            url = site['url']
            links = site['links']
            for link in links:
                g.add_edge(url, link)
        pageranks = nx.pagerank(g, alpha=0.9)
        for url, pr in pageranks.items():
            print('updating %s pr: %f' % (url, pr))
            record = {'pr': pr}
            self.db.mfw.update_one({'_id': hashlib.md5(url.encode('utf8')).hexdigest()}, {'$set': record}, upsert=False)

    def periodical_check(self):
        while True:
            clients_status_ok = True

            if self.is_reordering is False and time.time() - self.last_reorder_time > constants['reorder_period']:
                self.server_status = pc.STATUS_PAUSED
                self.is_reordering = True

            # iterate over a snapshot: on_message() may change the dict meanwhile
            for cid, state in list(self.clients.items()):
                # no heartbeat within connection_lost_period: mark the client as lost
                if time.time() - state['time'] > constants['connection_lost_period']:
                    state['status'] = pc.STATUS_CONNECTION_LOST
                    continue

                if state['status'] != self.server_status:
                    clients_status_ok = False
                    break

            if clients_status_ok and self.server_status == pc.STATUS_PAUSED and self.is_reordering:
                self.reorder_queue()
                self.last_reorder_time = time.time()
                self.is_reordering = False
                self.server_status = pc.STATUS_RUNNING

            time.sleep(constants['status_check_interval'])


def exit_signal_handler(signal, frame):
    crawl_master.server.close()
    sys.exit(1)


crawl_master = CrawlMaster()

_thread.start_new_thread(crawl_master.periodical_check, ())

signal.signal(signal.SIGINT, exit_signal_handler)
signal.pause()
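
The heartbeat bookkeeping in the master (register a client, refresh its timestamp on every heartbeat, flag it as lost after a timeout) can be isolated and tested without sockets. The class below is a hypothetical sketch, not part of the listing; the name ClientRegistry and the injectable clock parameter are assumptions made so that the timeout can be exercised deterministically.

```python
import time


class ClientRegistry:
    """Tracks when each client was last heard from."""

    def __init__(self, timeout=30.0, clock=time.monotonic):
        self.timeout = timeout
        self.clock = clock      # injectable so tests can control time
        self.last_seen = {}     # client id -> time of last message

    def register(self):
        # hand out the smallest unused id, as a string
        client_id = 0
        while str(client_id) in self.last_seen:
            client_id += 1
        self.last_seen[str(client_id)] = self.clock()
        return str(client_id)

    def heartbeat(self, client_id):
        """Refresh a client's timestamp; False means it must register again."""
        if client_id not in self.last_seen:
            return False
        self.last_seen[client_id] = self.clock()
        return True

    def lost_clients(self):
        """Ids of clients whose last message is older than the timeout."""
        now = self.clock()
        return sorted(cid for cid, seen in self.last_seen.items()
                      if now - seen > self.timeout)


# Usage with a fake clock that is advanced by hand
fake_now = [0.0]
registry = ClientRegistry(timeout=30, clock=lambda: fake_now[0])
first = registry.register()
second = registry.register()
fake_now[0] = 20.0
registry.heartbeat(first)
fake_now[0] = 40.0
print(registry.lost_clients())
```

Only the second client is reported as lost here, because the first one sent a heartbeat 20 seconds before the check.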

client_crawler

  1 from lxml import etree
  2 import threading
  3 import time
  4 from mongo_redis_mgr import MongoRedisUrlManager
  5 import argparse
  6 import socket
  7 
  8 import urllib3
  9 
 10 import os
 11 
 12 # from hdfs import *
 13 # from hdfs.util import HdfsError
 14 from socket_client import SocketClient
 15 import protocol_constants as pc
 16 import json
 17 
 18 import argparse
 19 
 20 class arguments:
 21     pass
 22 
 23 def parse_app_arguments():
 24     parser = argparse.ArgumentParser(prog='CrawlerClient', description='Start a crawler client')
 25     parser.add_argument('-h', '--host', type=str, nargs=1, help='Crawler host server address, default is localhost')
 26     parser.add_argument('-p', '--host-port', type=int, nargs=1, help='Crawler host server port number, default is 20100')
 27     parser.add_argument('-m', '--mongo', type=str, nargs=1, help='Mongo Server address, default is localhost')
 28     parser.add_argument('-n', '--mongo-port', type=int, nargs=1, help='Mongo port number, default is 27017')
 29     parser.add_argument('-r', '--redis', type=str, nargs=1, help='Redis server address, default is localhost')
 30     parser.add_argument('-x', '--redis-port', type=int, nargs=1, help='Redis port number, default is 6379')
 31     parser.add_argument('-s', '--server', type=str, nargs=1, help='Server address for all services, including mongo, redis and spider')
 32 
 33     args = arguments()
 34 
 35     parser.parse_args(namespace=args)
 36 
 37     if args.server is not None:
 38         args.host = args.mongo = args.redis = args.server
 39 
 40     if args.host is None:
 41         args.host = 'localhost'
 42 
 43     if args.mongo is None:
 44         args.mongo = 'localhost'
 45 
 46     if args.redis is None:
 47         args.redis = 'localhost'
 48 
 49     if args.host_port is None:
 50         args.host_port = 9999
 51 
 52     if args.mongo_port is None:
 53         args.mongo_port = 27017
 54 
 55     if args.redis_port is None:
 56         args.redis_port = 6379 
 57 
 58 parse_app_arguments()
 59 
 60 
 61 def get_page_content(cur_url, depth):
 62     global dir_name, dbmanager
 63 
 64     print( "downloading %s at level %d" % (cur_url, depth))
 65     links = []
 66     try:
 67         http = urllib3.PoolManager()
 68         r = http.request('GET', cur_url, headers = request_headers)
 69         filename = cur_url[7:].replace('/', '_')
 70 
 71         #Write page to local files system
 72         fo = open("%s%s.html" % (dir_name, filename), 'wb+')
 73         fo.write(r.data)
 74         fo.close()
 75         dbmanager.finishUrl(cur_url)
 76     except IOError as err:
 77         print( "get_page_content()", err )
 78         raise
 79     except Exception as err :
 80         print( "get_page_content()", err )
 81         raise
 82 
 83     html = etree.HTML(r.data.lower().decode('utf-8'))
 84     hrefs = html.xpath(u"//a")
 85 
 86     for href in hrefs:
 87         try:
 88             if 'href' in href.attrib:
 89                 val = href.attrib['href']
 90                 if val.find('javascript:') != -1:
 91                     continue
 92                 if val.startswith('http://') is False:
 93                     if val.startswith('/'):
 94                         val = 'http://www.mafengwo.cn' + val
 95                     else:
 96                         continue
 97                 if val[-1] == '/':
 98                     val = val[0:-1]
 99                 links.append(val)
100                 dbmanager.enqueueUrl(val, 'new', depth+1)
101         except ValueError:
102             continue
103 
104     dbmanager.set_url_links(cur_url, links)
105 
def heartbeat():
    global server_status, run_heartbeat, client_id, hb_period
    skip_wait = False
    while run_heartbeat:
        if skip_wait is False:
            time.sleep(hb_period)
        else:
            skip_wait = False
        try:
            hb_request = {}
            hb_request[pc.MSG_TYPE] = pc.HEARTBEAT
            hb_request[pc.CLIENT_ID] = client_id
            print("sending a heartbeat! ", str(hb_request))
            hb_response_data = socket_client.send(json.dumps(hb_request))

            # send() returns None on failure, most likely a network error
            if hb_response_data is None:
                continue

            # print('Heart Beat response', json.dumps(hb_response_data))
            response = json.loads(hb_response_data)

            err = response.get(pc.ERROR)
            if err is not None:
                if err == pc.ERR_NOT_FOUND:
                    # the server does not know this client id: register again
                    register_request = {}
                    register_request[pc.MSG_TYPE] = pc.REGISTER
                    client_id = socket_client.send(json.dumps(register_request))

                    # skip heartbeat period and send next heartbeat immediately
                    skip_wait = True
                    continue
                # any other error stops the heartbeat thread
                return

            action = response.get(pc.ACTION_REQUIRED)
            if action is not None:
                action_request = {}
                if action == pc.PAUSE_REQUIRED:
                    # crawl() compares against STATUS_PAUSED
                    server_status = pc.STATUS_PAUSED
                    action_request[pc.MSG_TYPE] = pc.PAUSED
                elif action == pc.RESUME_REQUIRED:
                    server_status = pc.STATUS_RUNNING
                    action_request[pc.MSG_TYPE] = pc.RESUMED
                elif action == pc.SHUTDOWN_REQUIRED:
                    server_status = pc.SHUTDOWN
                    # stop heartbeat thread
                    return
                action_request[pc.CLIENT_ID] = client_id
                socket_client.send(json.dumps(action_request))
            else:
                server_status = response[pc.SERVER_STATUS]

        except socket.error as msg:
            print("heartbeat error: ", msg)
            server_status = pc.STATUS_CONNECTION_LOST
            raise

def start_heart_beat_thread():
    try:
        t = threading.Thread(target=heartbeat, name=None)
        # daemon thread, so the main thread can exit when it receives ctrl-c
        t.daemon = True
        t.start()
    except Exception as err:
        print("Error: unable to start thread", err)
        raise

def crawl():
    # run_heartbeat must be declared global, otherwise the assignment in the
    # shutdown branch only creates a local and the heartbeat thread keeps running
    global dbmanager, is_root_page, threads, hb_period, run_heartbeat
    # thread pool size
    max_num_thread = 5
    CRAWL_DELAY = 2

    while True:
        if server_status == pc.STATUS_PAUSED:
            time.sleep(hb_period)
            continue
        if server_status in (pc.SHUTDOWN, pc.STATUS_SHUTDOWN):
            run_heartbeat = False
            for t in threads:
                t.join()
            break
        try:
            curtask = dbmanager.dequeueUrl()
        except Exception:
            time.sleep(hb_period)
            continue

        # nothing queued right now: wait one heartbeat period and poll again
        if curtask is None:
            time.sleep(hb_period)
            continue
        else:
            print('current task is: ', curtask['url'], "at depth: ", curtask['depth'])

        # looking for an empty thread from pool to crawl

        if is_root_page is True:
            get_page_content(curtask['url'], curtask['depth'])
            is_root_page = False
        else:
            while True:
                # first drop all finished threads; build a new list instead of
                # removing items from the list while iterating over it
                threads = [t for t in threads if t.is_alive()]
                if len(threads) >= max_num_thread:
                    time.sleep(CRAWL_DELAY)
                    continue
                try:
                    t = threading.Thread(target=get_page_content, name=None, args=(curtask['url'], curtask['depth']))
                    threads.append(t)
                    # daemon thread, so the main thread can exit when it receives ctrl-c
                    t.daemon = True
                    t.start()
                    time.sleep(CRAWL_DELAY)
                    break
                except Exception as err:
                    print("Error: unable to start thread", err)
                    raise

def finish():
    global client_id
    shutdown_request = {}
    shutdown_request[pc.MSG_TYPE] = pc.SHUTDOWN
    shutdown_request[pc.CLIENT_ID] = client_id
    socket_client.send(json.dumps(shutdown_request))


def init():
    global client_id

    if os.path.exists(dir_name) is False:
        os.mkdir(dir_name)
    dbmanager.clear()
    dbmanager.enqueueUrl('http://www.mafengwo.cn', 'new', 0)

    register_request = {}
    register_request[pc.MSG_TYPE] = pc.REGISTER
    client_id = socket_client.send(json.dumps(register_request))


# initialize global variables
request_headers = {
    'host': "www.mafengwo.cn",
    'connection': "keep-alive",
    'cache-control': "no-cache",
    'upgrade-insecure-requests': "1",
    'user-agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36",
    'accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    'accept-language': "zh-CN,en-US;q=0.8,en;q=0.6"
}


# Initialize system variables
dir_name = 'mfw/'

# db manager
dbmanager = MongoRedisUrlManager()

is_root_page = True
threads = []

# use hdfs to save pages
# hdfs_client = InsecureClient('http://54.223.92.169:50070', user='ec2-user')

socket_client = SocketClient('localhost', 20010)
client_id = 0

hb_period = 5
run_heartbeat = True
server_status = pc.STATUS_RUNNING

init()
start_heart_beat_thread()
crawl()
finish()
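
The status changes that heartbeat() applies after each server response can also be written as a small lookup table, separate from the network code. The following is only a sketch under assumptions: the constants are copied from the protocol constants listing, `next_status` and `ACTION_TO_STATUS` are hypothetical names that do not appear in the original client, and using the `STATUS_*` values for every state is a guess at the intended design.

```python
# Constants copied from the protocol constants listing.
ACTION_REQUIRED = 'ACTION_REQUIRED'
PAUSE_REQUIRED = 'PAUSE_REQUIRED'
RESUME_REQUIRED = 'RESUME_REQUIRED'
SHUTDOWN_REQUIRED = 'SHUTDOWN_REQUIRED'
SERVER_STATUS = 'SERVER_STATUS'
STATUS_RUNNING = 'STATUS_RUNNING'
STATUS_PAUSED = 'STATUS_PAUSED'
STATUS_SHUTDOWN = 'STATUS_SHUTDOWN'

# Hypothetical table: which local status each requested action leads to.
ACTION_TO_STATUS = {
    PAUSE_REQUIRED: STATUS_PAUSED,
    RESUME_REQUIRED: STATUS_RUNNING,
    SHUTDOWN_REQUIRED: STATUS_SHUTDOWN,
}


def next_status(response, current_status):
    """Return the client status implied by one decoded heartbeat response."""
    action = response.get(ACTION_REQUIRED)
    if action is not None:
        # Unknown actions leave the status unchanged.
        return ACTION_TO_STATUS.get(action, current_status)
    # No action requested: mirror the status reported by the server, if any.
    return response.get(SERVER_STATUS, current_status)


if __name__ == '__main__':
    print(next_status({ACTION_REQUIRED: PAUSE_REQUIRED}, STATUS_RUNNING))
    print(next_status({SERVER_STATUS: STATUS_RUNNING}, STATUS_PAUSED))
```

Keeping the transition logic in a pure function makes each branch easy to check without a running server.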

socket.server

import socket
import sys
import _thread

import signal

class ServerSocket:

    # @param callback callback function for handling received data
    # @param host interface to bind to; 'localhost' accepts local clients only ('' would mean all interfaces)
    # @param port Arbitrary non-privileged port
    def __init__(self, callback, host='localhost', port=20010):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.callback = callback
        # print( 'Socket created')

        #Bind socket to local host and port
        try:
            print("bind ", port)
            self.s.bind((host, port))
        except socket.error as msg:
            print(msg)
            sys.exit()

        # print( 'Socket bind complete')

        #Start listening on socket
        self.s.listen(10)
        # print( 'Socket now listening')

    def startlistening(self):
        #now keep talking with the client
        while True:
            print('Waiting for new connection ... ')
            # wait to accept a connection - blocking call
            conn, addr = self.s.accept()

            # print( 'Connected with ' + addr[0] + ':' + str(addr[1]))

            #start new thread takes 1st argument as a function name to be run, second is the tuple of arguments to the function.
            _thread.start_new_thread(self.clientthread, (conn,))

    #Function for handling connections. This will be used to create threads
    def clientthread(self, conn):
        #Sending message to connected client
        # conn.send('Welcome to the server. Type something and hit enter\n') #send only takes string

        # each connection carries a single request: read it, reply, then close

        #Receiving from client
        data = conn.recv(1024)
        reply = self.callback(data.decode('utf8'))

        # print( 'server sends ' + reply)

        conn.sendall(reply.encode('utf8'))

        conn.close()


    def start(self):
        _thread.start_new_thread(self.startlistening, ())

    def close(self):
        # self.s.shutdown(socket.SHUT_WR)
        self.s.close()

def msg_received(data):
    return 'Ack'

# the first parameter is named signum so it does not shadow the signal module
def exit_signal_handler(signum, frame):
    pass

if __name__ == '__main__':
    server = ServerSocket(msg_received)
    server.start()
    signal.signal(signal.SIGINT, exit_signal_handler)
    # block until a signal arrives (signal.pause() is not available on Windows)
    signal.pause()
    server.close()
    sys.exit(1)
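
The msg_received callback above only returns 'Ack', so the master side of the register and heartbeat exchange is not shown. Below is a minimal sketch of what such a callback might look like, inferred from how the client uses the replies (the raw REGISTER reply becomes the client id, heartbeat replies are JSON). `make_callback` and the id scheme are hypothetical and not part of the original code; the constants are copied from the protocol constants listing.

```python
import json
import threading

# Key and value names copied from the protocol constants listing.
MSG_TYPE = 'TYPE'
REGISTER = 'REGISTER'
HEARTBEAT = 'HEARTBEAT'
CLIENT_ID = 'CLIENT_ID'
SERVER_STATUS = 'SERVER_STATUS'
STATUS_RUNNING = 'STATUS_RUNNING'
ERROR = 'ERROR'
ERR_NOT_FOUND = 'ERR_NOT_FOUND'


def make_callback():
    """Build a callback(data) -> reply function (hypothetical master logic)."""
    known_clients = set()
    # Each connection is served in its own thread, so guard the shared set.
    lock = threading.Lock()

    def callback(data):
        request = json.loads(data)
        msg_type = request.get(MSG_TYPE)
        if msg_type == REGISTER:
            with lock:
                # Assumed id scheme: a simple counter rendered as a string.
                client_id = str(len(known_clients) + 1)
                known_clients.add(client_id)
            # The client stores the raw reply as its id, so send the id only.
            return client_id
        if msg_type == HEARTBEAT:
            with lock:
                known = request.get(CLIENT_ID) in known_clients
            if not known:
                # Tells the client to register again.
                return json.dumps({ERROR: ERR_NOT_FOUND})
            return json.dumps({SERVER_STATUS: STATUS_RUNNING})
        return 'Ack'

    return callback


if __name__ == '__main__':
    cb = make_callback()
    cid = cb(json.dumps({MSG_TYPE: REGISTER}))
    print(cid, cb(json.dumps({MSG_TYPE: HEARTBEAT, CLIENT_ID: cid})))
```

A callback built this way could be passed to ServerSocket in place of msg_received.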

socket.client

import socket
import sys

class SocketClient:
    def __init__(self, server_ip, server_port):
        self.server_ip = server_ip
        self.server_port = server_port

        self.families = self.get_constants('AF_')
        self.types = self.get_constants('SOCK_')
        self.protocols = self.get_constants('IPPROTO_')

        # print( >>sys.stderr, 'Family  :', families[sock.family])
        # print( >>sys.stderr, 'Type    :', types[sock.type])
        # print( >>sys.stderr, 'Protocol:', protocols[sock.proto])
        # print( >>sys.stderr)

    def get_constants(self, prefix):
        """Create a dictionary mapping socket module constants to their names."""
        return dict( (getattr(socket, n), n)
                     for n in dir(socket)
                     if n.startswith(prefix)
                     )

    def send(self, message):
        try:
            # Create a TCP/IP socket
            print("connecting to ", self.server_port)
            self.sock = socket.create_connection((self.server_ip, self.server_port))
            # Send data
            print('connected! client sends ', message)
            self.sock.sendall(message.encode('utf8'))

            data = self.sock.recv(1024)

            return data.decode('utf8')
        except Exception as err:
            print('Get Error Message: ', err) #Error Code : ' + str(msg[0]) + ' Message ' + msg[1])
            return None
        finally:
            if hasattr(self, 'sock'):
                self.sock.close()
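
Both the server and the client call recv(1024) exactly once. A single recv() returns at most that many bytes and TCP does not preserve message boundaries, so a JSON message longer than 1024 bytes could arrive truncated. One common remedy, which the original code does not use, is to prefix every message with its length. The helpers below (`send_msg`, `recv_exact`, `recv_msg`, `demo`) are hypothetical names in a minimal sketch of that idea; the demo uses `socket.socketpair()` so it runs without a server.

```python
import socket
import struct


def send_msg(sock, text):
    """Send one UTF-8 message prefixed with its 4-byte big-endian length."""
    payload = text.encode('utf8')
    sock.sendall(struct.pack('>I', len(payload)) + payload)


def recv_exact(sock, size):
    """Read exactly `size` bytes, looping because recv() may return fewer."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError('socket closed before the message was complete')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def recv_msg(sock):
    """Receive one message written by send_msg()."""
    (length,) = struct.unpack('>I', recv_exact(sock, 4))
    return recv_exact(sock, length).decode('utf8')


def demo(length=3000):
    """Round-trip a message longer than 1024 bytes over a connected socket pair."""
    a, b = socket.socketpair()
    try:
        send_msg(a, 'x' * length)
        return recv_msg(b)
    finally:
        a.close()
        b.close()


if __name__ == '__main__':
    print(len(demo()))
```

With this framing both sides would have to agree on the prefix, so the server's clientthread and the client's send would need to change together.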

protocol_constants

# msg type key; values include REGISTER, UNREGISTER and HEARTBEAT
MSG_TYPE    = 'TYPE'

# send register
REGISTER     = 'REGISTER'

# unregister client with id assigned by master
UNREGISTER     = 'UNREGISTER'

# send heart beat to server with id
HEARTBEAT    = 'HEARTBEAT'

# notify master paused with id
PAUSED         = 'PAUSED'

# notify master resumed with id
RESUMED        = 'RESUMED'

# notify master shutdown with id
SHUTDOWN        = 'SHUTDOWN'

# get a new location list to crawl
LOCATIONS        = 'REQUIRE_LOCATION_LIST'

# get a new triple list to crawl
TRIPLES      = 'TRIPLES'

DATA = 'DATA'

CRAWL_DELAY = 'CRAWL_DELAY'

# finished list of item
FININSHED_ITEMS = 'FINISHED_ITEMS'

# client id key word
CLIENT_ID     = 'CLIENT_ID'

# action required by server key word
ACTION_REQUIRED    = 'ACTION_REQUIRED'

# server requires pause
PAUSE_REQUIRED    = 'PAUSE_REQUIRED'

# server requires resume
RESUME_REQUIRED    = 'RESUME_REQUIRED'

# server requires shutdown
SHUTDOWN_REQUIRED    = 'SHUTDOWN_REQUIRED'

# server status key word
SERVER_STATUS    = 'SERVER_STATUS'

# server status values
STATUS_RUNNING    = 'STATUS_RUNNING'

STATUS_PAUSED     = 'STATUS_PAUSED'

STATUS_SHUTDOWN    = 'STATUS_SHUTDOWN'

STATUS_CONNECTION_LOST    = 'STATUS_CONNECTION_LOST'

ERROR     = 'ERROR'

# client id not found, then it needs to register itself
ERR_NOT_FOUND    = 'ERR_NOT_FOUND'

REQUEST_SIZE    = 50

mongo_redis_mgr

import hashlib
from datetime import datetime
from datetime import timedelta

import redis
from pymongo import MongoClient
from pymongo import ReturnDocument


class MongoRedisUrlManager:

    def __init__(self, server_ip='localhost', client=None, expires=timedelta(days=30)):
        """
        client: mongo database client
        expires: timedelta of amount of time before a cache entry is considered expired
        """
        # if a client object is not passed
        # then try connecting to mongodb at the default localhost port
        self.client = MongoClient(server_ip, 27017) if client is None else client
        self.redis_client = redis.StrictRedis(host=server_ip, port=6379, db=0)
        # select the database; its collections store the queued urls,
        # a collection being the equivalent of a table in a relational database
        self.db = self.client.spider

        # create index if db is empty
        # (compare with ==, not "is"; count() was removed in recent pymongo versions)
        if self.db.mfw.count_documents({}) == 0:
            self.db.mfw.create_index('status')

    def dequeueUrl(self):
        # atomically claim one 'new' url and mark it as 'downloading';
        # returns the record as it was before the update, or None if nothing is queued
        return self.db.mfw.find_one_and_update(
            {'status': 'new'},
            {'$set': {'status': 'downloading'}},
            upsert=False,
            return_document=ReturnDocument.BEFORE
        )

    def enqueueUrl(self, url, status, depth):
        # redis records which urls were already seen; a known url only gets its counter increased
        num = self.redis_client.get(url)
        if num is not None:
            self.redis_client.set(url, int(num) + 1)
            return
        self.redis_client.set(url, 1)
        self.db.mfw.insert_one({
            '_id': hashlib.md5(url.encode('utf8')).hexdigest(),
            'url': url,
            'status': status,
            'queue_time': datetime.utcnow(),
            'depth': depth
        })

    def finishUrl(self, url):
        record = {'status': 'done', 'done_time': datetime.utcnow()}
        self.db.mfw.update_one({'_id': hashlib.md5(url.encode('utf8')).hexdigest()}, {'$set': record}, upsert=False)

    def clear(self):
        self.redis_client.flushall()
        self.db.mfw.drop()


    def set_url_links(self, url, links):
        try:
            self.db.urlpr.insert_one({
                '_id': hashlib.md5(url.encode('utf8')).hexdigest(),
                'url': url,
                'links': links
            })
        except Exception:
            # errors (for example a duplicate _id for an already stored page) are ignored
            pass
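
The manager above combines a "seen" counter (Redis) with a status field that moves from new to downloading to done (MongoDB). To make that life cycle easier to follow without either service, here is a dictionary-backed sketch of the same idea. `InMemoryUrlManager` and its method names are hypothetical and for illustration only; unlike find_one_and_update, which claims a record atomically on the database server, this version is not safe to share between threads or processes.

```python
import hashlib
from datetime import datetime, timezone


class InMemoryUrlManager:
    """Dictionary-backed stand-in for MongoRedisUrlManager (illustration only)."""

    def __init__(self):
        self.seen = {}      # url -> times seen (the role Redis plays)
        self.records = {}   # md5(url) -> record (the role MongoDB plays)

    @staticmethod
    def _key(url):
        return hashlib.md5(url.encode('utf8')).hexdigest()

    def enqueue_url(self, url, status, depth):
        """Queue a url once; repeated urls only increase the seen counter."""
        if url in self.seen:
            self.seen[url] += 1
            return False
        self.seen[url] = 1
        self.records[self._key(url)] = {
            'url': url,
            'status': status,
            'depth': depth,
            'queue_time': datetime.now(timezone.utc),
        }
        return True

    def dequeue_url(self):
        """Claim the oldest 'new' record (dicts keep insertion order) or return None."""
        for record in self.records.values():
            if record['status'] == 'new':
                record['status'] = 'downloading'
                return record
        return None

    def finish_url(self, url):
        """Mark a url as done if it is known."""
        record = self.records.get(self._key(url))
        if record is not None:
            record['status'] = 'done'
            record['done_time'] = datetime.now(timezone.utc)


if __name__ == '__main__':
    manager = InMemoryUrlManager()
    manager.enqueue_url('http://www.mafengwo.cn', 'new', 0)
    task = manager.dequeue_url()
    manager.finish_url(task['url'])
    print(task['status'])
```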

mongo_mgr

from datetime import timedelta

from pymongo import MongoClient
from pymongo import ASCENDING


class MongoManager:

    def __init__(self, server_ip='localhost', client=None, expires=timedelta(days=30)):
        """
        client: mongo database client
        expires: timedelta of amount of time before a cache entry is considered expired
        """
        # if a client object is not passed
        # then try connecting to mongodb at the default localhost port
        self.client = MongoClient(server_ip, 27017) if client is None else client
        # select the database; its collections store the queued items,
        # a collection being the equivalent of a table in a relational database
        self.db = self.client.spider

        # create index if db is empty
        # (compare with ==, not "is"; count() was removed in recent pymongo versions)
        if self.db.locations.count_documents({}) == 0:
            self.db.mfw.create_index([("status", ASCENDING)])

    def dequeueItems(self, size):
        # fetch at most `size` queued records; list() materializes the cursor
        # so the records can still be returned after their ids were collected
        records = list(self.db.mfw.find({'status': 'new'}).limit(size))
        if not records:
            return None

        ids = [record['_id'] for record in records]

        # update_many marks every selected record, not only the first match
        self.db.mfw.update_many(
            {
                '_id': { '$in': ids }
            },
            {
                '$set': {'status': 'downloading'}
            }
        )

        return records

    def finishItems(self, ids):
        self.db.mfw.update_many(
            {
                '_id': { '$in': ids }
            },
            {
                '$set': {'status': 'finish'}
            }
        )

    def clear(self):
        self.db.mfw.drop()

if __name__ == '__main__':
    mongo_mgr = MongoManager()
    records = mongo_mgr.dequeueItems(5)

 

 

From: https://www.cnblogs.com/cxc1357/p/12543325.html
