Why parallelize
- Speed: crawl more pages per unit of time
- Anti-crawler measures: multiple workers (and, later, multiple IPs) make it easier to work around per-IP rate limits
Multithreading
- Complexity
  - Safety of shared resources and data: protect them with locks
  - Atomicity: atomic operations are indivisible, so they are naturally mutually exclusive
  - Synchronization and waiting: wait(), notify(), notify_all()
  - Deadlock: multiple threads each holding a resource the others are waiting for
  - Fault tolerance: an unhandled error in any thread can bring the whole program down
- Python multithreading
  - Multithreading is supported
  - Threads map directly onto native threads (Java threads are likewise mapped by the JVM onto native threads)
  - GIL (Global Interpreter Lock): only one thread executes Python bytecode at a time, so multi-core utilization is limited (effectively single-threaded for CPU-bound work)
  - Best suited to IO-bound workloads rather than CPU-bound ones (a short sketch follows this list)
  - Python is mostly used for offline data processing rather than serving online concurrent requests (the domain of C++ and Java)
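A minimal sketch, not from the original post, of why threads still help for IO-bound downloads despite the GIL: the GIL is released while a thread waits on the network. The URL list and worker count are arbitrary placeholders.

```python
from concurrent.futures import ThreadPoolExecutor
import time
import urllib3

urls = ["http://www.mafengwo.cn"] * 8   # placeholder work list, repeated only for illustration
http = urllib3.PoolManager()

def fetch(url):
    # each thread blocks on network IO here; the GIL is released while waiting
    return len(http.request('GET', url).data)

start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    sizes = list(pool.map(fetch, urls))
print('fetched %d pages in %.2fs' % (len(sizes), time.time() - start))
```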
- Implementation
  - Create a thread pool: threads = []
  - Make sure the URL queue is thread-safe: Queue / deque
  - Pop a URL from the queue and hand it to a new thread to crawl: pop()/get(), threading.Thread
  - If the thread pool is full, loop and wait until a thread finishes
  - Remove threads that have finished downloading from the pool
  - When the current level of URLs is exhausted, t.join() to wait for all threads, then start crawling the next level (a condensed sketch of this loop follows; the full program is further below)
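A condensed, hedged sketch of the loop just described, simplified from the full multi_thread_mfw program below; `download` and `cur_queue` stand in for the real page fetcher and URL queue.

```python
import threading
import time

threads, MAX_THREADS, CRAWL_DELAY = [], 10, 0.6

def crawl_level(cur_queue, download):
    # download(url) is assumed to fetch and store one page (see get_page_content below)
    while cur_queue:
        url = cur_queue.popleft()
        while True:
            # drop finished threads, then wait if the pool is still full
            threads[:] = [t for t in threads if t.is_alive()]
            if len(threads) < MAX_THREADS:
                break
            time.sleep(CRAWL_DELAY)
        t = threading.Thread(target=download, args=(url,), daemon=True)
        threads.append(t)
        t.start()
        time.sleep(CRAWL_DELAY)
    for t in threads:   # level finished: wait for every worker before starting the next level
        t.join()
```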
- Advantages
  - Threads share the same memory space, so data exchange is cheap
  - Higher CPU utilization; multiple CPUs can be kept busy
  - Convenient to develop
  - Low cost to create and destroy threads
  - Errors or blocking on one download affect overall crawl speed less, so download speed improves
  - For sites without anti-crawler limits, the speed-up is significant
- Drawbacks
  - For sites with anti-crawler measures, the speed-up is limited
  - Higher complexity and higher demands on the code
  - The more threads there are, the less time each gets, and more frequent context switches add overhead
  - Resource contention between threads becomes fiercer
multi_thread_mfw
# -*- coding: utf-8 -*-

import urllib3
import os
from collections import deque
import json
from lxml import etree
import hashlib
from bloom_filter import BloomFilter

import threading
import time

class CrawlBSF:
    request_headers = {
        'host': "www.mafengwo.cn",
        'connection': "keep-alive",
        'cache-control': "no-cache",
        'upgrade-insecure-requests': "1",
        'user-agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36",
        'accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        'accept-language': "zh-CN,en-US;q=0.8,en;q=0.6"
    }

    cur_level = 0
    max_level = 5
    iter_width = 50
    downloaded_urls = []

    def __init__(self, url, dir_name):
        self.dir_name = dir_name
        self.du_md5_file_name = dir_name + '/download.txt'
        self.du_url_file_name = dir_name + '/urls.txt'

        self.bloom_downloaded_urls = BloomFilter(1024 * 1024 * 16, 0.01)
        self.bloom_url_queue = BloomFilter(1024 * 1024 * 16, 0.01)

        self.cur_queue = deque()
        self.child_queue = deque()

        self.root_url = url
        self.cur_queue.append(url)
        self.du_file = open(self.du_url_file_name, 'a+')
        try:
            self.dumd5_file = open(self.du_md5_file_name, 'r')
            self.downloaded_urls = self.dumd5_file.readlines()
            self.dumd5_file.close()
            for urlmd5 in self.downloaded_urls:
                self.bloom_downloaded_urls.add(urlmd5[:-2])
        except IOError:
            print("File not found")
        finally:
            self.dumd5_file = open(self.du_md5_file_name, 'a+')

    def enqueueUrl(self, url):
        if url not in self.bloom_url_queue and hashlib.md5(url.encode('utf8')).hexdigest() not in crawler.bloom_downloaded_urls:
            self.child_queue.append(url)
            self.bloom_url_queue.add(url)

    def dequeuUrl(self):
        try:
            url = self.cur_queue.popleft()
            return url
        except IndexError:
            return None

    def close(self):
        self.dumd5_file.close()
        self.du_file.close()


# Global variables
num_downloaded_pages = 0

# download the page content
def get_page_content(cur_url):
    global num_downloaded_pages
    print("downloading %s at level %d" % (cur_url, crawler.cur_level))
    try:
        http = urllib3.PoolManager()
        r = http.request('GET', cur_url, headers=CrawlBSF.request_headers)
        html_page = r.data
        filename = cur_url[7:].replace('/', '_')
        fo = open("%s/%s.html" % (crawler.dir_name, filename), 'wb+')
        fo.write(html_page)
        fo.close()
    except IOError as err:
        print(err)
        return
    except Exception as err:
        print(err)
        return
    # print('add ' + hashlib.md5(cur_url).hexdigest() + ' to list')

    # save page and set bloomfilter
    dumd5 = hashlib.md5(cur_url.encode('utf8')).hexdigest()
    crawler.downloaded_urls.append(dumd5)
    crawler.dumd5_file.write(dumd5 + '\r\n')
    crawler.du_file.write(cur_url + '\r\n')
    crawler.bloom_downloaded_urls.add(dumd5)
    num_downloaded_pages += 1

    html = etree.HTML(html_page.lower().decode('utf-8'))
    hrefs = html.xpath(u"//a")

    for href in hrefs:
        try:
            if 'href' in href.attrib:
                val = href.attrib['href']
                if val.find('javascript:') != -1:
                    continue
                if val.startswith('http://') is False:
                    if val.startswith('/'):
                        val = 'http://www.mafengwo.cn' + val
                    else:
                        continue
                if val[-1] == '/':
                    val = val[0:-1]
                # if hashlib.md5(val).hexdigest() not in self.downloaded_urls:
                crawler.enqueueUrl(val)
                # else:
                #     print('Skip %s' % (val))
        except ValueError:
            continue

def start_crawl():
    # the first page (the start url) is crawled in the main thread in sync (blocking) mode;
    # subsequent pages are crawled asynchronously in child threads
    is_root_page = True
    threads = []
    max_threads = 10

    CRAWL_DELAY = 0.6

    while True:
        url = crawler.dequeuUrl()
        # go on to the next level; before that, wait for all current-level crawling to finish
        if url is None:
            crawler.cur_level += 1
            for t in threads:
                t.join()
            if crawler.cur_level == crawler.max_level:
                break
            if len(crawler.child_queue) == 0:
                break
            crawler.cur_queue = crawler.child_queue
            crawler.child_queue = deque()
            continue

        # looking for an empty thread from the pool to crawl
        if is_root_page is True:
            get_page_content(url)
            is_root_page = False
        else:
            while True:
                # first remove all finished threads
                for t in threads:
                    if not t.is_alive():
                        threads.remove(t)
                if len(threads) >= max_threads:
                    time.sleep(CRAWL_DELAY)
                    continue
                try:
                    t = threading.Thread(target=get_page_content, name=None, args=(url,))
                    threads.append(t)
                    # set daemon so the main thread can exit when it receives ctrl-c
                    t.setDaemon(True)
                    t.start()
                    time.sleep(CRAWL_DELAY)
                    break
                except Exception as err:
                    print("Error: unable to start thread", err)
                    raise

if __name__ == '__main__':
    start_time = time.time()
    dir_name = 'htmls'
    # create the folder used to store pages if it does not exist
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)

    crawler = CrawlBSF("http://www.mafengwo.cn", dir_name)
    start_crawl()
    print('%d pages downloaded, time cost %0.2f seconds' % (num_downloaded_pages, time.time() - start_time))
Multiprocessing
- Goals
  - Keep the number of threads per process under control
  - Isolate groups of threads from each other to reduce resource contention
  - In some environments, disguise a single machine behind multiple IPs
- Limitations
  - Cannot break through the network bandwidth bottleneck
  - Of little benefit on a single machine with a single IP (more commonly used as part of a distributed setup)
  - Data exchange between processes is more expensive
- Inter-process communication (IPC)
  - Pipes (PIPE)
  - Signals: complicated to use
  - Message queues: POSIX and System V flavours
  - Shared memory: fastest, but needs semaphores for inter-process synchronization and mutual exclusion
  - Semaphores: used for synchronization
  - Sockets: standardizable, and usable across machines (a minimal multiprocessing sketch follows this list)
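A minimal sketch, not taken from the original code, showing two of the mechanisms above through Python's standard multiprocessing module: a Queue as a shared URL queue and a Pipe for simple status messages. The URLs are placeholders.

```python
from multiprocessing import Process, Queue, Pipe

def worker(url_queue, conn):
    # pretend to crawl: drain URLs from the shared queue
    while True:
        url = url_queue.get()
        if url is None:            # sentinel: no more work
            break
        conn.send('done ' + url)   # report back over the pipe
    conn.close()

if __name__ == '__main__':
    url_queue = Queue()
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(url_queue, child_conn))
    p.start()
    for u in ['http://www.mafengwo.cn', 'http://www.mafengwo.cn']:  # placeholder URLs
        url_queue.put(u)
    url_queue.put(None)
    for _ in range(2):             # one 'done ...' message per URL
        print(parent_conn.recv())
    p.join()
```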
- Approaches
  - Client/server (C/S) mode
    - One service process enqueues and dequeues URLs; enqueueing checks whether a URL has already been downloaded
    - It also monitors the current crawl status and progress
    - Multiple crawler processes fetch URLs from the service process and send newly discovered URLs back to it
    - Sockets are used for IPC
    - Advantage: fast, and easy to scale out
  - Database mode
    - Crawler processes read the URL list from a database, using the database as the queue
    - Multiple crawler processes both fetch and add URLs through database operations
    - Advantage: convenient to develop; you only need to write a single crawler program
- Implementation
  - MySQLConnectionPool manages the MySQL connections used by multiple threads/processes
  - __init__ checks for and creates the database and table automatically when the class is instantiated
  - Cursor usage
    - SELECT ... FOR UPDATE locks the selected row so that multiple processes cannot dequeue the same URL
    - connection.commit(): mysql.connector supports transactions and autocommit is off by default, so every change must be committed explicitly (a minimal sketch of this claim-a-URL transaction appears after this list)
  - Table columns
    - status: download status (new / downloading / done)
    - md5: MD5 hash of the URL, used as a unique key
    - depth: crawl depth of the URL
    - queue_time: when the URL was enqueued
    - done_time: when the download finished
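A hedged, minimal sketch of the dequeue transaction described above. Table and column names follow the `urls` table in the dbmanager code below; the connection parameters in `dbconfig` are placeholders.

```python
import mysql.connector

def claim_next_url(dbconfig):
    # dbconfig is assumed to look like {'host': ..., 'user': ..., 'password': ..., 'database': ...}
    con = mysql.connector.connect(**dbconfig)
    cursor = con.cursor(dictionary=True)
    try:
        # FOR UPDATE locks the selected row until COMMIT, so two workers cannot claim the same URL
        cursor.execute("SELECT `index`, `url`, `depth` FROM urls "
                       "WHERE status='new' ORDER BY `index` ASC LIMIT 1 FOR UPDATE")
        row = cursor.fetchone()
        if row is None:
            con.rollback()
            return None
        cursor.execute("UPDATE urls SET status='downloading' WHERE `index`=%s", (row['index'],))
        con.commit()   # autocommit is off by default; the row lock is held until this commit
        return row
    finally:
        cursor.close()
        con.close()
```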
- Code below: database mode (dbmanager + process_crawl); the socket-based C/S mode is implemented in the distributed section further down
dbmanager
import mysql.connector
import hashlib
from mysql.connector import errorcode


class CrawlDatabaseManager:

    DB_NAME = 'mfw_pro_crawl'

    SERVER_IP = 'localhost'

    TABLES = {}
    # create new table, using sql
    TABLES['urls'] = (
        "CREATE TABLE `urls` ("
        "  `index` int(11) NOT NULL AUTO_INCREMENT,"       # index of queue
        "  `url` varchar(512) NOT NULL,"
        "  `md5` varchar(32) NOT NULL,"
        "  `status` varchar(11) NOT NULL DEFAULT 'new',"   # could be new, downloading and done
        "  `depth` int(11) NOT NULL,"
        "  `queue_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,"
        "  `done_time` timestamp NOT NULL DEFAULT 0 ON UPDATE CURRENT_TIMESTAMP,"
        "  PRIMARY KEY (`index`),"
        "  UNIQUE KEY `md5` (`md5`)"
        ") ENGINE=InnoDB")

    def __init__(self, max_num_thread):
        # connect mysql server
        try:
            self.max_num_thread = max_num_thread
            cnx = mysql.connector.connect(host=self.SERVER_IP, user='root', password='amei')
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print('Create Error ' + err.msg)
            exit(1)

        cursor = cnx.cursor()

        # use database, create it if it does not exist
        try:
            cnx.database = self.DB_NAME
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_BAD_DB_ERROR:
                # create database and table
                self.create_database(cursor)
                cnx.database = self.DB_NAME
                self.create_tables(cursor)
            else:
                print(err)
                exit(1)
        finally:
            cursor.close()
            cnx.close()

        self.dbconfig = {
            "database": self.DB_NAME,
            "user": "root",
            "host": self.SERVER_IP,
            "password": "amei"
        }

        # self.cnxpool = mysql.connector.connect(pool_name="mypool",
        #                                        pool_size=max_num_thread,
        #                                        **dbconfig)

    # create database
    def create_database(self, cursor):
        try:
            cursor.execute(
                "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(self.DB_NAME))
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            exit(1)

    def create_tables(self, cursor):
        for name, ddl in self.TABLES.items():
            try:
                cursor.execute(ddl)
            except mysql.connector.Error as err:
                if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                    print('create tables error ALREADY EXISTS')
                else:
                    print('create tables error ' + err.msg)
            else:
                print('Tables created')

    # put an url into the queue
    def enqueueUrl(self, url, depth):
        con = mysql.connector.connect(pool_name="mypool",
                                      pool_size=self.max_num_thread,
                                      **self.dbconfig)
        cursor = con.cursor()
        try:
            add_url = ("INSERT INTO urls (url, md5, depth) VALUES (%s, %s, %s)")
            data_url = (url, hashlib.md5(url.encode('utf8')).hexdigest(), depth)
            cursor.execute(add_url, data_url)
            # commit this transaction, please refer to "mysql transaction" for more info
            con.commit()
        except mysql.connector.Error as err:
            # print('enqueueUrl() ' + err.msg)
            return
        finally:
            cursor.close()
            con.close()

    # get an url from the queue
    def dequeueUrl(self):
        con = mysql.connector.connect(pool_name="mypool",
                                      pool_size=self.max_num_thread,
                                      **self.dbconfig)
        cursor = con.cursor(dictionary=True)
        try:
            # use SELECT ... FOR UPDATE to lock the selected row
            query = ("SELECT `index`, `url`, `depth` FROM urls WHERE status='new' ORDER BY `index` ASC LIMIT 1 FOR UPDATE")
            cursor.execute(query)
            if cursor.rowcount == 0:
                return None
            row = cursor.fetchone()
            update_query = ("UPDATE urls SET `status`='downloading' WHERE `index`=%d") % (row['index'])
            cursor.execute(update_query)
            con.commit()
            return row
        except mysql.connector.Error as err:
            print('dequeueUrl() ' + err.msg)
            return None
        finally:
            cursor.close()
            con.close()

    def finishUrl(self, index):
        con = mysql.connector.connect(pool_name="mypool",
                                      pool_size=self.max_num_thread,
                                      **self.dbconfig)
        cursor = con.cursor()
        try:
            # done_time is updated automatically (ON UPDATE CURRENT_TIMESTAMP), so it is not set here
            update_query = ("UPDATE urls SET `status`='done' WHERE `index`=%d") % (index)
            cursor.execute(update_query)
            con.commit()
        except mysql.connector.Error as err:
            print('finishUrl() ' + err.msg)
            return
        finally:
            cursor.close()
            con.close()
process_crawl
import urllib3
from collections import deque
import json
from lxml import etree
from bloom_filter import BloomFilter
import threading
import time
from dbmanager import CrawlDatabaseManager

from mysql.connector import errorcode
import mysql.connector

import os

request_headers = {
    'host': "www.mafengwo.cn",
    'connection': "keep-alive",
    'cache-control': "no-cache",
    'upgrade-insecure-requests': "1",
    'user-agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36",
    'accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    'accept-language': "zh-CN,en-US;q=0.8,en;q=0.6"
}

def get_page_content(cur_url, index, depth):
    print("downloading %s at level %d" % (cur_url, depth))
    try:
        http = urllib3.PoolManager()
        r = http.request('GET', cur_url, headers=request_headers)
        html_page = r.data
        filename = cur_url[7:].replace('/', '_')
        fo = open("%s%s.html" % (dir_name, filename), 'wb+')
        fo.write(html_page)
        fo.close()
        dbmanager.finishUrl(index)
    except urllib3.exceptions.HTTPError as err:   # fixed: catch the exception class, not the module
        print('HttpError: ', err)
        return
    except IOError as err:
        print('IOError: ', err)
        return
    except Exception as err:
        print('Exception: ', err)
        return
    # print('add ' + hashlib.md5(cur_url.encode('utf8')).hexdigest() + ' to list')

    html = etree.HTML(html_page.lower().decode('utf-8'))
    hrefs = html.xpath(u"//a")

    for href in hrefs:
        try:
            if 'href' in href.attrib:
                val = href.attrib['href']
                if val.find('javascript:') != -1:
                    continue
                if val.startswith('http://') is False:
                    if val.startswith('/'):
                        val = 'http://www.mafengwo.cn' + val
                    else:
                        continue
                if val[-1] == '/':
                    val = val[0:-1]
                dbmanager.enqueueUrl(val, depth + 1)

        except ValueError:
            continue


max_num_thread = 5

# create an instance of the MySQL database manager, which is used as a queue for crawling
dbmanager = CrawlDatabaseManager(max_num_thread)

# dir for saving HTML files
dir_name = 'dir_process/'

if os.path.exists(dir_name) is False:
    os.mkdir(dir_name)

# put the first page into the queue
dbmanager.enqueueUrl("http://www.mafengwo.cn", 0)
start_time = time.time()
is_root_page = True
threads = []

# time delay before a new crawling thread is created;
# the delay controls the crawling rate and avoids visiting the target website too frequently
CRAWL_DELAY = 0.6


while True:
    curtask = dbmanager.dequeueUrl()
    print("dequeue")
    # go on to the next level; before that, wait for all current-level crawling to finish
    if curtask is None:
        print("no task")
        for t in threads:
            t.join()
        break

    # looking for an empty thread from the pool to crawl

    if is_root_page is True:
        get_page_content(curtask['url'], curtask['index'], curtask['depth'])
        is_root_page = False
    else:
        while True:
            # first remove all finished threads
            for t in threads:
                if not t.is_alive():
                    threads.remove(t)
            if len(threads) >= max_num_thread:
                time.sleep(CRAWL_DELAY)
                continue
            try:
                t = threading.Thread(target=get_page_content, name=None, args=(curtask['url'], curtask['index'], curtask['depth']))
                threads.append(t)
                # set daemon so the main thread can exit when it receives ctrl-c
                t.setDaemon(True)
                t.start()
                time.sleep(CRAWL_DELAY)
                break
            except Exception as err:
                print("Error: unable to start thread", err)
                raise
Distributed crawling (multiple machines)
- If the required QPS is below 2, a single machine is enough; there is no need for a cluster
- Evolution
  - A program -> a process -> a message -> a packet ->
  - a protocol -> a network -> a component -> a distributed system
- Benefits
  - High fault tolerance / high availability / recoverability / durability / scalability / predictability
- What a distributed crawler provides
  - Works around the target site's per-IP access-frequency limits
  - Uses more aggregate bandwidth, improving download speed
  - Distributed storage and backup for large-scale systems
  - Room to scale the data volume
- Characteristics of raw crawl data
  - Small files, on the order of kilobytes
  - Very large number of files
  - Written once, incrementally; rarely modified
  - Read sequentially
  - Concurrent file reads and writes
  - Must be scalable
- Master-Slave structure
  - One master host manages all crawl servers (e.g., cloud instances)
  - When there are many crawler servers, a central node manages the slave nodes
  - Controls the overall crawl
  - Shares information between crawlers
  - Balances the load
- RPC
  - Sockets
    - A three-way handshake establishes the TCP connection
    - Once established, the connection can be kept alive (keep-alive)
    - The server binds to a port (FTP 21 / HTTP 80 / MySQL 3306)
    - The client sends requests to the server's port
    - The server processes each request and returns the result to the client
    - Non-blocking mode: send and recv return immediately
    - IPC payloads are serialized into a byte or string stream for transmission
- With socket-based communication, clients can run on one machine (as multiple processes) or on many machines
- Design
  - master + client: the master manages the distributed crawl; several clients can be started on one machine and connect to the master
  - master is the server-side main program, client is the crawler client
  - The heartbeat thread in the client handles the control-channel communication (an example heartbeat exchange is sketched after this list)
  - socket.server and socket.client sit at the TCP layer and handle the low-level communication
  - MongoDB is deployed on the server
  - Clients fetch crawl tasks from MongoDB
  - client <-> socket.client <-> socket.server <-> master
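A hedged sketch, not part of the original code, of what one register + heartbeat round trip looks like using the SocketClient and protocol_constants modules listed below; the host/port match the defaults in the code, and the payload values are illustrative.

```python
import json
import protocol_constants as pc          # the constants module listed below
from socket_client import SocketClient   # the TCP client listed below

client = SocketClient('localhost', 20010)

# 1. register once to obtain a client id from the master
client_id = client.send(json.dumps({pc.MSG_TYPE: pc.REGISTER}))

# 2. periodic heartbeat: report liveness, receive server status / required actions
response = json.loads(client.send(json.dumps({
    pc.MSG_TYPE: pc.HEARTBEAT,
    pc.CLIENT_ID: client_id,
})))
print(response.get(pc.SERVER_STATUS), response.get(pc.ACTION_REQUIRED))
```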
- Components
  - socket.server: the communication server side
  - socket.client: the communication client side
  - master: creates the server socket, passes in a callback that handles the different message types, and manages the clients
  - client: registers itself with the master, sends heartbeats, and pulls crawl tasks from the database
  - protocol_constants: the communication protocol (message keys and values)
  - mongo_mgr: MongoDB-backed task queue
master
import hashlib

from socket_server import ServerSocket
import protocol_constants as pc
import json
import time
import _thread

import networkx as nx           # needed by reorder_queue() for PageRank
from mongo_mgr import MongoManager

import signal
import sys

constants = {
    'reorder_period': 1200,           # 20 mins
    'connection_lost_period': 30,     # 30s
    'status_check_intervel': 5,       # 5 sec
}

class CrawlMaster:
    clients = {}

    server_status = pc.STATUS_RUNNING

    last_rereoder_time = time.time()

    is_reordering = False

    mongo_mgr = MongoManager()

    def __init__(self, mongo_client=None, mongo_host='localhost'):
        self.server = ServerSocket(self.on_message)
        self.server.start()

    def on_message(self, msg):
        print('Heart Beat request', msg)
        request = json.loads(msg)
        type = request[pc.MSG_TYPE]
        client_state = {}
        response = {}
        response[pc.SERVER_STATUS] = self.server_status
        if type == pc.REGISTER:
            client_id = self.get_free_id()
            client_state['status'] = pc.STATUS_RUNNING
            client_state['time'] = time.time()
            self.clients[client_id] = client_state
            return client_id
        elif type == pc.UNREGISTER:
            client_id = request.get(pc.CLIENT_ID)
            del self.clients[client_id]
            return json.dumps(response)
        elif type == pc.LOCATIONS:
            items = self.mongo_mgr.dequeueItems(request[pc.REQUEST_SIZE])
            response[pc.MSG_TYPE] = pc.LOCATIONS
            response[pc.CRAWL_DELAY] = 2
            response[pc.DATA] = json.dumps(items)
            return json.dumps(response)
        elif type == pc.TRIPLES:
            items = self.mongo_mgr.dequeueItems(request[pc.REQUEST_SIZE])
            response[pc.MSG_TYPE] = pc.LOCATIONS
            response[pc.DATA] = json.dumps(items)
            return json.dumps(response)

        client_id = request.get(pc.CLIENT_ID)
        if client_id is None:
            response[pc.ERROR] = pc.ERR_NOT_FOUND
            return json.dumps(response)
        if type == pc.HEARTBEAT:
            if self.server_status != self.clients[client_id]['status']:
                if self.server_status == pc.STATUS_RUNNING:
                    response[pc.ACTION_REQUIRED] = pc.RESUME_REQUIRED
                elif self.server_status == pc.STATUS_PAUSED:
                    response[pc.ACTION_REQUIRED] = pc.PAUSE_REQUIRED
                elif self.server_status == pc.STATUS_SHUTDOWN:
                    response[pc.ACTION_REQUIRED] = pc.SHUTDOWN_REQUIRED
            return json.dumps(response)
        else:
            client_state['status'] = type
            client_state['time'] = time.time()
            self.clients[client_id] = client_state

        return json.dumps(response)

    def get_free_id(self):
        i = 0
        for key in self.clients:
            if i < int(key):
                break
            i += 1
        return str(i)

    def reorder_queue(self):
        g = nx.DiGraph()
        cursor = self.mongo_mgr.db.urlpr.find()   # fixed: access mongo through the manager's db handle
        for site in cursor:
            url = site['url']
            links = site['links']
            for link in links:
                g.add_edge(url, link)
        pageranks = nx.pagerank(g, 0.9)
        for url, pr in pageranks.items():          # fixed: Python 3 dict iteration
            print('updating %s pr: %f' % (url, pr))
            record = {'pr': pr}
            self.mongo_mgr.db.mfw.update_one({'_id': hashlib.md5(url.encode('utf8')).hexdigest()}, {'$set': record}, upsert=False)

    def periodical_check(self):
        while True:
            clients_status_ok = True

            if self.is_reordering is False and time.time() - self.last_rereoder_time > constants['reorder_period']:
                self.server_status = pc.STATUS_PAUSED
                self.is_reordering = True

            for cid, state in self.clients.items():   # fixed: Python 3 dict iteration
                # no heartbeat for too long: mark the client as lost
                if time.time() - state['time'] > constants['connection_lost_period']:
                    # remove it from client list
                    # del client[cid]
                    # set client status to be CONNECTION_LOST
                    self.clients[cid]['status'] = pc.STATUS_CONNECTION_LOST
                    continue

                if state['status'] != self.server_status:
                    clients_status_ok = False
                    break

            if clients_status_ok and self.server_status == pc.STATUS_PAUSED and self.is_reordering:
                self.reorder_queue()
                self.last_rereoder_time = time.time()
                self.is_reordering = False            # fixed: update the instance flag
                self.server_status = pc.STATUS_RUNNING

            time.sleep(constants['status_check_intervel'])

def exit_signal_handler(signal, frame):
    crawl_master.server.close()
    sys.exit(1)

crawl_master = CrawlMaster()

_thread.start_new_thread(crawl_master.periodical_check, ())

signal.signal(signal.SIGINT, exit_signal_handler)
signal.pause()
client_crawler
from lxml import etree
import threading
import time
from mongo_redis_mgr import MongoRedisUrlManager
import argparse
import socket

import urllib3

import os

# from hdfs import *
# from hdfs.util import HdfsError
from socket_client import SocketClient
import protocol_constants as pc
import json


class arguments:
    pass

def parse_app_arguments():
    # add_help=False so that '-h' can be reused for the crawler host address below
    parser = argparse.ArgumentParser(prog='CrawlerClient', description='Start a crawler client', add_help=False)
    parser.add_argument('-h', '--host', type=str, nargs=1, help='Crawler host server address, default is localhost')
    parser.add_argument('-p', '--host-port', type=int, nargs=1, help='Crawler host server port number, default is 20100')
    parser.add_argument('-m', '--mongo', type=str, nargs=1, help='Mongo Server address, default is localhost')
    parser.add_argument('-n', '--mongo-port', type=int, nargs=1, help='Mongo port number, default is 27017')
    parser.add_argument('-r', '--redis', type=str, nargs=1, help='Redis server address, default is localhost')
    parser.add_argument('-x', '--redis-port', type=int, nargs=1, help='Redis port number, default is 6379')
    parser.add_argument('-s', '--server', type=str, nargs=1, help='Server address for all services, including mongo, redis and spider')

    args = arguments()

    parser.parse_args(namespace=args)

    if args.server is not None:
        args.host = args.mongo = args.redis = args.server

    if args.host is None:
        args.host = 'localhost'

    if args.mongo is None:
        args.mongo = 'localhost'

    if args.redis is None:
        args.redis = 'localhost'

    if args.host_port is None:
        args.host_port = 9999

    if args.mongo_port is None:
        args.mongo_port = 27017

    if args.redis_port is None:
        args.redis_port = 6379

parse_app_arguments()


def get_page_content(cur_url, depth):
    global dir_name, dbmanager

    print("downloading %s at level %d" % (cur_url, depth))
    links = []
    try:
        http = urllib3.PoolManager()
        r = http.request('GET', cur_url, headers=request_headers)
        filename = cur_url[7:].replace('/', '_')

        # write the page to the local file system
        fo = open("%s%s.html" % (dir_name, filename), 'wb+')
        fo.write(r.data)
        fo.close()
        dbmanager.finishUrl(cur_url)
    except IOError as err:
        print("get_page_content()", err)
        raise
    except Exception as err:
        print("get_page_content()", err)
        raise

    html = etree.HTML(r.data.lower().decode('utf-8'))
    hrefs = html.xpath(u"//a")

    for href in hrefs:
        try:
            if 'href' in href.attrib:
                val = href.attrib['href']
                if val.find('javascript:') != -1:
                    continue
                if val.startswith('http://') is False:
                    if val.startswith('/'):
                        val = 'http://www.mafengwo.cn' + val
                    else:
                        continue
                if val[-1] == '/':
                    val = val[0:-1]
                links.append(val)
                dbmanager.enqueueUrl(val, 'new', depth + 1)
        except ValueError:
            continue

    dbmanager.set_url_links(cur_url, links)

def heartbeat():
    global server_status, run_heartbeat, client_id, hb_period
    skip_wait = False
    while run_heartbeat:
        if skip_wait is False:
            time.sleep(hb_period)
        else:
            skip_wait = False
        try:
            hb_request = {}
            hb_request[pc.MSG_TYPE] = pc.HEARTBEAT
            hb_request[pc.CLIENT_ID] = client_id
            print("sending a heartbeat! ", str(hb_request))
            hb_response_data = socket_client.send(json.dumps(hb_request))

            # should be a network error
            if hb_response_data is None:
                continue

            # print('Heart Beat response', json.dumps(hb_response_data))
            response = json.loads(hb_response_data)

            err = response.get(pc.ERROR)
            if err is not None:
                if err == pc.ERR_NOT_FOUND:
                    register_request = {}
                    register_request[pc.MSG_TYPE] = pc.REGISTER
                    client_id = socket_client.send(json.dumps(register_request))

                    # skip the heartbeat period and send the next heartbeat immediately
                    skip_wait = True
                    heartbeat()
                    return
                return

            action = response.get(pc.ACTION_REQUIRED)
            if action is not None:
                action_request = {}
                if action == pc.PAUSE_REQUIRED:
                    server_status = pc.STATUS_PAUSED      # fixed: use the status constant checked in crawl()
                    action_request[pc.MSG_TYPE] = pc.PAUSED
                elif action == pc.RESUME_REQUIRED:        # fixed: was a duplicated PAUSE_REQUIRED branch
                    server_status = pc.STATUS_RUNNING
                    action_request[pc.MSG_TYPE] = pc.RESUMED
                elif action == pc.SHUTDOWN_REQUIRED:
                    server_status = pc.SHUTDOWN
                    # stop heartbeat thread
                    return
                action_request[pc.CLIENT_ID] = client_id
                socket_client.send(json.dumps(action_request))
            else:
                server_status = response[pc.SERVER_STATUS]

        except socket.error as msg:
            print("heartbeat error: ", msg)
            server_status = pc.STATUS_CONNECTION_LOST
            raise

def start_heart_beat_thread():
    try:
        t = threading.Thread(target=heartbeat, name=None)
        # set daemon so the main thread can exit when it receives ctrl-c
        t.setDaemon(True)
        t.start()
    except Exception as err:
        print("Error: unable to start thread", err)
        raise

def crawl():
    # thread pool size
    max_num_thread = 5
    CRAWL_DELAY = 2
    global dbmanager, is_root_page, threads, hb_period, run_heartbeat

    while True:
        if server_status == pc.STATUS_PAUSED:
            time.sleep(hb_period)
            continue
        if server_status == pc.SHUTDOWN:
            run_heartbeat = False
            for t in threads:
                t.join()
            break
        try:
            curtask = dbmanager.dequeueUrl()
        except Exception:
            time.sleep(hb_period)
            continue

        # go on to the next level; before that, wait for all current-level crawling to finish
        if curtask is None:
            time.sleep(hb_period)
            continue
        else:
            print('current task is: ', curtask['url'], "at depth: ", curtask['depth'])

        # looking for an empty thread from the pool to crawl

        if is_root_page is True:
            get_page_content(curtask['url'], curtask['depth'])
            is_root_page = False
        else:
            while True:
                # first remove all finished threads
                for t in threads:
                    if not t.is_alive():
                        threads.remove(t)
                if len(threads) >= max_num_thread:
                    time.sleep(CRAWL_DELAY)
                    continue
                try:
                    t = threading.Thread(target=get_page_content, name=None, args=(curtask['url'], curtask['depth']))
                    threads.append(t)
                    # set daemon so the main thread can exit when it receives ctrl-c
                    t.setDaemon(True)
                    t.start()
                    time.sleep(CRAWL_DELAY)
                    break
                except Exception as err:
                    print("Error: unable to start thread", err)
                    raise

def finish():
    global client_id
    shutdown_request = {}
    shutdown_request[pc.MSG_TYPE] = pc.SHUTDOWN
    shutdown_request[pc.CLIENT_ID] = client_id
    socket_client.send(json.dumps(shutdown_request))


def init():
    global client_id

    if os.path.exists(dir_name) is False:
        os.mkdir(dir_name)
    dbmanager.clear()
    dbmanager.enqueueUrl('http://www.mafengwo.cn', 'new', 0)

    register_request = {}
    register_request[pc.MSG_TYPE] = pc.REGISTER
    client_id = socket_client.send(json.dumps(register_request))


# initialize global variables
request_headers = {
    'host': "www.mafengwo.cn",
    'connection': "keep-alive",
    'cache-control': "no-cache",
    'upgrade-insecure-requests': "1",
    'user-agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36",
    'accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    'accept-language': "zh-CN,en-US;q=0.8,en;q=0.6"
}


# Initialize system variables
dir_name = 'mfw/'

# db manager
dbmanager = MongoRedisUrlManager()

is_root_page = True
threads = []

# use hdfs to save pages
# hdfs_client = InsecureClient('http://54.223.92.169:50070', user='ec2-user')

socket_client = SocketClient('localhost', 20010)
client_id = 0

hb_period = 5
run_heartbeat = True
server_status = pc.STATUS_RUNNING

init()
start_heart_beat_thread()
crawl()
finish()
socket.server
import socket
import sys
import _thread

import signal

class ServerSocket:

    # @param callback callback function for handling received data
    # @param host Symbolic name meaning all available interfaces
    # @param port Arbitrary non-privileged port
    def __init__(self, callback, host='localhost', port=20010):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.callback = callback
        # print('Socket created')

        # Bind socket to local host and port
        try:
            print("bind ", port)
            self.s.bind((host, port))
        except socket.error as msg:
            print(msg)
            sys.exit()

        # print('Socket bind complete')

        # Start listening on socket
        self.s.listen(10)
        # print('Socket now listening')

    def startlistening(self):
        # now keep talking with the clients
        while True:
            print('Waiting for new connection ... ')
            # wait to accept a connection - blocking call
            conn, addr = self.s.accept()

            # print('Connected with ' + addr[0] + ':' + str(addr[1]))

            # start a new thread: 1st argument is the function to run, 2nd is the tuple of arguments to it
            _thread.start_new_thread(self.clientthread, (conn,))

    # Function for handling connections. This will be used to create threads
    def clientthread(self, conn):
        # Sending a welcome message to the connected client
        # conn.send('Welcome to the server. Type something and hit enter\n')  # send only takes bytes

        # Receiving from client
        data = conn.recv(1024)
        reply = self.callback(data.decode('utf8'))

        # print('server sends ' + reply)

        conn.sendall(reply.encode('utf8'))

        conn.close()

    def start(self):
        _thread.start_new_thread(self.startlistening, ())

    def close(self):
        # self.s.shutdown(socket.SHUT_WR)
        self.s.close()

def msg_received(data):
    return 'Ack'

def exit_signal_handler(signal, frame):
    pass

if __name__ == '__main__':
    server = ServerSocket(msg_received)
    server.start()
    signal.signal(signal.SIGINT, exit_signal_handler)
    signal.pause()
    server.close()
    sys.exit(1)
socket.client
import socket
import sys

class SocketClient:
    def __init__(self, server_ip, server_port):
        self.server_ip = server_ip
        self.server_port = server_port

        self.families = self.get_constants('AF_')
        self.types = self.get_constants('SOCK_')
        self.protocols = self.get_constants('IPPROTO_')

        # print(>>sys.stderr, 'Family  :', families[sock.family])
        # print(>>sys.stderr, 'Type    :', types[sock.type])
        # print(>>sys.stderr, 'Protocol:', protocols[sock.proto])

    def get_constants(self, prefix):
        """Create a dictionary mapping socket module constants to their names."""
        return dict((getattr(socket, n), n)
                    for n in dir(socket)
                    if n.startswith(prefix))

    def send(self, message):
        try:
            # Create a TCP/IP socket
            print("connecting to ", self.server_port)
            self.sock = socket.create_connection((self.server_ip, self.server_port))
            # Send data
            print('connected! client sends ', message)
            self.sock.sendall(message.encode('utf8'))

            data = self.sock.recv(1024)

            return data.decode('utf8')
        except Exception as err:
            print('Get Error Message: ', err)
            return None
        finally:
            if hasattr(self, 'sock'):
                self.sock.close()
protocol_constants
# msg type, could be REGISTER, UNREGISTER and HEARTBEAT
MSG_TYPE = 'TYPE'

# send register
REGISTER = 'REGISTER'

# unregister client with id assigned by master
UNREGISTER = 'UNREGISTER'

# send heart beat to server with id
HEARTBEAT = 'HEARTBEAT'

# notify master paused with id
PAUSED = 'PAUSED'

# notify master resumed with id
RESUMED = 'RESUMED'

# notify master shut down with id
SHUTDOWN = 'SHUTDOWN'

# get a new location list to crawl
LOCATIONS = 'REQUIRE_LOCATION_LIST'

# get a new triple list to crawl
TRIPLES = 'TRIPLES'

DATA = 'DATA'

CRAWL_DELAY = 'CRAWL_DELAY'

# finished list of items
FININSHED_ITEMS = 'FINISHED_ITEMS'

# client id key word
CLIENT_ID = 'CLIENT_ID'

# required-action key word
ACTION_REQUIRED = 'ACTION_REQUIRED'

# server requires pause
PAUSE_REQUIRED = 'PAUSE_REQUIRED'

# server requires resume
RESUME_REQUIRED = 'RESUME_REQUIRED'

# server requires shutdown
SHUTDOWN_REQUIRED = 'SHUTDOWN_REQUIRED'

# server status key word
SERVER_STATUS = 'SERVER_STATUS'

# server status values
STATUS_RUNNING = 'STATUS_RUNNING'

STATUS_PAUSED = 'STATUS_PAUSED'

STATUS_SHUTDOWN = 'STATUS_SHUTDOWN'

STATUS_CONNECTION_LOST = 'STATUS_CONNECTION_LOST'

ERROR = 'ERROR'

# client id not found, the client needs to register itself again
ERR_NOT_FOUND = 'ERR_NOT_FOUND'

REQUEST_SIZE = 50
mongo_redis_mgr
import hashlib
import time
from datetime import datetime
from datetime import timedelta

import redis
from pymongo import MongoClient
from pymongo import IndexModel, ASCENDING, DESCENDING


class MongoRedisUrlManager:

    def __init__(self, server_ip='localhost', client=None, expires=timedelta(days=30)):
        """
        client: mongo database client
        expires: timedelta of amount of time before a cache entry is considered expired
        """
        # if a client object is not passed,
        # then try connecting to mongodb at the default localhost port
        self.client = MongoClient(server_ip, 27017) if client is None else client
        self.redis_client = redis.StrictRedis(host=server_ip, port=6379, db=0)
        # create a collection to store cached webpages,
        # which is the equivalent of a table in a relational database
        self.db = self.client.spider

        # create index if db is empty
        if self.db.mfw.count() == 0:
            self.db.mfw.create_index('status')

    def dequeueUrl(self):
        record = self.db.mfw.find_one_and_update(
            {'status': 'new'},
            {'$set': {'status': 'downloading'}},
            {'upsert': False, 'returnNewDocument': False}
        )
        if record:
            return record
        else:
            return None

    def enqueueUrl(self, url, status, depth):
        num = self.redis_client.get(url)
        if num is not None:
            self.redis_client.set(url, int(num) + 1)
            return
        self.redis_client.set(url, 1)
        self.db.mfw.insert({
            '_id': hashlib.md5(url.encode('utf8')).hexdigest(),
            'url': url,
            'status': status,
            'queue_time': datetime.utcnow(),
            'depth': depth
        })

    def finishUrl(self, url):
        record = {'status': 'done', 'done_time': datetime.utcnow()}
        self.db.mfw.update({'_id': hashlib.md5(url.encode('utf8')).hexdigest()}, {'$set': record}, upsert=False)

    def clear(self):
        self.redis_client.flushall()
        self.db.mfw.drop()

    def set_url_links(self, url, links):
        try:
            self.db.urlpr.insert({
                '_id': hashlib.md5(url.encode('utf8')).hexdigest(),
                'url': url,
                'links': links
            })
        except Exception as err:
            pass
mongo_mgr
import hashlib
import time
from datetime import datetime
from datetime import timedelta

from pymongo import MongoClient
from pymongo import IndexModel, ASCENDING, DESCENDING


class MongoManager:

    def __init__(self, server_ip='localhost', client=None, expires=timedelta(days=30)):
        """
        client: mongo database client
        expires: timedelta of amount of time before a cache entry is considered expired
        """
        # if a client object is not passed,
        # then try connecting to mongodb at the default localhost port
        self.client = MongoClient(server_ip, 27017) if client is None else client
        # create a collection to store cached webpages,
        # which is the equivalent of a table in a relational database
        self.db = self.client.spider

        # create index if db is empty
        if self.db.locations.count() == 0:
            self.db.mfw.create_index([("status", ASCENDING)])

    def dequeueItems(self, size):
        records = self.db.mfw.find({'status': 'new'}).batch_size(50)

        ids = []
        for record in records:
            ids.append(record['_id'])

        self.db.mfw.update(
            {
                '_id': {'$in': ids}
            },
            {
                '$set': {'status': 'downloading'}
            }
        )

        if records:
            return records
        else:
            return None

    def finishItems(self, ids):
        self.db.mfw.update(
            {
                '_id': {'$in': ids}
            },
            {
                '$set': {'status': 'finish'}
            }
        )

    def clear(self):
        self.db.mfw.drop()

if __name__ == '__main__':
    mongo_mgr = MongoManager()
    records = mongo_mgr.dequeueItems(5)