前言
本篇引用的很多代码出自猿人学和帅彬老仙,算是把自己的学习笔记发上来了,有些不完善的地方还会继续完善~!
http协议:
目前互联网上%90的网络传输都是基于http协议,但是弹幕可能采用的是websocket协议,
http协议是基于TCP/IP通信协议来传输数据的,
http请求流程
我们日常用浏览器搜索东西,输入的是url,浏览器会自动将其转化为http协议,(客户端向服务端发起request,服务器接受后返回一个response)
那么url全名叫统一资源定位符,协议+域名+路径+参数
协议 | 域名 | 路径 | 参数 |
---|---|---|---|
http:// | www.baidu.com:8080 | news/index.asp? | boardid=5&id=2423 |
- host:指定请求资源的域名
- user-agant简称ua,内容包含用户发出请求的用户信息,通常ua包含浏览器的信息
- accept:告诉服务器客户端可以接收哪些类型的信息
- cookie:cookie信息
- content-type:来表示具体请求中的媒体类型信息
- content-length:内容长度
- content-range:相应的资源范围
- accept-encoding:指定所能接受的编码方式
- accept-language:浏览器可以接受的语言种类
post请求和get请求
post:向指定资源提交数据进行处理请求,例如提交表单或者上传文件,数据被包含在请求体中,post请求可能会导致新的资源的建立或者已有资源的修改
http的相应方式
1**:服务器受到请求,需要请求者继续执行操作
2**:成功,操作被成功接收并处理
3**:重定向,需要进一步的操作以完成请求
4**:客户端错误,请求包含语法错误或者无法完成请求
5**:服务器错误,服务器在处理请求的过程中发生了错误
cookie和session
cookie简单来说就是一个凭证,一般浏览器在请求服务器的时候肯定会带上cookie,但是也有坏处,一套cookie往往对应的是一个用户的信息,请求太频繁很可能被对方识别为爬虫
session:会话,本来的含义是指有始有终的一系列动作,消息,我们可以这样理解,在一个客户端打开浏览器访问网站的时候,会生成一个cookie,sessionID,(sessionID包含于cookie中),这个ID每次的访问都会带上,而服务器会识别这个sessionID并且将与这个sessionID有关的数据都存在服务器上,由此来实现客户端的状态识别。因此session是基于cookie的
session与cookie相反,session是储存在服务器上的数据,只由客户端传上来的sessionID来进行判定,所以相对于cookie,session的安全性更高
一般sessionID会在浏览器被关闭时被丢弃,或者当session长期没有活跃,那么也会被识别为失效
流程就是:用户首次登录,生成一个seesion表,里面的key是hash生成的数据,value是一系列信息,同时在客户端本地生成一个cookie,包含sessionID,下次登录会自动携带sessionID,与服务器里的hash形式的key进行比较
知道了一些基本内容之后,下面就开始上代码咯
搜索引擎
搜索引擎就是一个巨大的爬虫,他会先去爬取内容,然后会根据数据关键字和一些其他因素设置信息权重,最后出现在我们面前,所以说,网络中大部分的新闻网站基本上都不设防,新闻内容基本上都在网页的html代码里了,
ajax异步加载
(asynchronous JavaScript and xml),它允许网页在不刷新整个页面的情况下,与服务器交换数据并更新部分页面内容
代码部分
完整的爬虫函数
import requests # 导入requests库,用于发送HTTP请求
import chardet # 导入chardet库,用于检测网页编码
import traceback # 导入traceback库,用于输出异常信息
def downloader(url, timeout=10, headers=None, debug=False, binary=False):
# 定义一个名为downloader的函数,用于下载网页内容
# 参数:
# - url: 必需,要下载的网页地址
# - timeout: 可选,默认为10秒,请求超时时间
# - headers: 可选,默认为None,HTTP请求头信息
# - debug: 可选,默认为False,是否开启调试模式
# - binary: 可选,默认为False,是否以二进制形式返回网页内容
# 设置默认的HTTP请求头信息,模拟IE浏览器访问
_headers = {
'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
'Windows NT 6.1; Win64; x64; Trident/5.0)'),
}
# 初始化redirected_url为传入的url
redirected_url = url
# 如果headers参数不为空,则使用传入的请求头信息
if headers:
_headers = headers
try:
# 使用requests.get方法发送HTTP GET请求
r = requests.get(url, headers=_headers, timeout=timeout)
# 根据binary参数决定返回的数据形式
if binary:
# 如果binary为True,返回网页的二进制内容
html = r.content
else:
# 如果binary为False,使用chardet检测网页编码,并decode为字符串
encoding = chardet.detect(r.content)['encoding']
html = r.content.decode(encoding)
# 获取HTTP响应的状态码
status = r.status_code#用get请求返回的r是可以获取状态码的
# 获取实际访问的URL(如果发生重定向)
redirected_url = r.url
except:
# 如果发生异常,检查debug参数是否开启
if debug:
# 如果开启,则输出异常的详细信息
traceback.print_exc()
# 打印下载失败的信息
msg = 'failed download: {}'.format(url)
print(msg)
# 根据binary参数决定返回的默认值
if binary:
# 如果binary为True,返回空的二进制数据
html = b''
else:
# 如果binary为False,返回空字符串
html = ''
# 设置状态码为0,表示下载失败
status = 0
# 返回状态码、网页内容及实际访问的URL
return status, html, redirected_url
if __name__ == '__main__':
# 如果当前模块被直接运行,则执行以下代码
url = 'http://news.baidu.com/'
# 定义要下载的网页地址
# 调用downloader函数,下载网页内容
#python函数原来可以有多个返回值
s, html, lost_url_found_by_大大派 = downloader(url)
# 打印下载结果,包括状态码、网页内容长度及实际访问的URL
print(s, len(html), lost_url_found_by_大大派)
bs4
将html的源代码传给bs4
py
soup=BeautifulSoup(html_doc,'lxml')
此时soup便接收到了html_doc文件里的源码,是这样的
html
html_doc = """
<html><head><title></title></head>
<body>
<p class="title"><b></b></p>
<p class="story">
<a href="http://example.com/1" class="sister" id="link1"></a>,
<a href="http://example.com/2" class="sister" id="link2"></a> ,
</p>
<p class="story">...</p>
"""
py
#获取标题内容
print(soup.title.string)
#获取p标签里的内容
print(soup.p.string)
#获取title的父级标签
print(soup.title.parent.name)
#获取超链接
print(soup.a)
#获取所有超链接
print(soup.find_all('a'))
#获取id为link2的超链接
print(soup.find(id="link2"))
#获取网页中所有内容
print(soup.get_text())
这里也可以使用select
py
print(soup.select("titile")
print(soup.select("body a"))
print(soup.select("p > #link1)
那么我们在发送请求时接受 到了一个html,我们如何对该文件内所有的li标签获取文本呢?
参考这个豆瓣源码可能有更深刻的理解
py
list = soup.find(class_='grid_view').find_all('li')
for item in list:
item_name = item.find(class_='title').string
item_img = item.find('a').find('img').get('src')
item_index = item.find(class_='').string
item_score = item.find(class_='rating_num').string
item_author = item.find('p').text
item_intr = item.find(class_='inq').string
# print('爬取电影:' + item_index + ' | ' + item_name +' | ' + item_img +' | ' + item_score +' | ' + item_author +' | ' + item_intr )
print('爬取电影:' + item_index + ' | ' + item_name +' | ' + item_score +' | ' + item_intr )
数据保存到excel
py
import xlwt
book=xlwt.Workbook(encoding='utf-8',style_compression=0)
sheet=book.add_sheet('豆瓣电影Top250',cell_overwrite_ok=True)
#创建表格第一栏
sheet.write(0,0,'名称')
sheet.write(0,1,'图片')
sheet.write(0,2,'排名')
sheet.write(0,3,'评分')
sheet.write(0,4,'作者')
sheet.write(0,5,'简介')
#将爬取到的数据写入excel
sheet.write(n, 0, item_name)
sheet.write(n, 1, item_img)
sheet.write(n, 2, item_index)
sheet.write(n, 3, item_score)
sheet.write(n, 4, item_author)
sheet.write(n, 5, item_intr)
book.save(u'豆瓣最受欢迎的250部电影')
selenium
Selenium 是一个开源的自动化工具,主要用于 Web 应用程序的测试。它可以模拟用户在浏览器上的操作,如点击、输入、导航等,也可以用于网络爬虫或其他自动化任务,所以selenium就是一个自动化爬虫工具,能直接打开浏览器,执行搜索任务
selenium使用selenium webdriver打开浏览器,可以用headless chrome在无桌面环境下运行
selenium+xpath:首先selenium加载网页中的JavaScript,等待页面渲染完成后,再使用xpath查询动态生成的DOM结构
这段代码可以显示动态加载
from selenium import webdriver
web = webdriver.Chrome()
web.get("https://bz.zzzmh.cn/index")
print(web.page_source)
使用selenium无需人工输入验证码获取cookies
def login_auto(login_url, username, password,
username_xpath, password_xpath,
submit_xpath, cookies_file, browser=None):
if browser is None:
options = webdriver.ChromeOptions()
# chrome在系统PATH时,可以不指定 binary_location
# options.binary_location = ‘/usr/bin/google-chrome’
options.add_argument('headless')
options.add_argument('window-size=1200x600')
browser = webdriver.Chrome(chrome_options=options)
browser.maximize_window()
browser.get(login_url)
time.sleep(9) # 等登录加载完成
browser.find_element_by_xpath(username_xpath).send_keys(username)
browser.find_element_by_xpath(password_xpath).send_keys(password)
browser.find_element_by_xpath(submit_xpath).send_keys(Keys.ENTER)
time.sleep(9) # 等登录加载完成
cookies = browser.get_cookies()
print(cookies)
save_cookies(cookies, cookies_file)
使用selenium进行人工输入验证码获取cookies
def login_manually(login_url, cookies_file, browser=None):
# 既然是手动,这里就不自动填写用户名和密码了
if browser is None:
browser = webdriver.Chrome()
browser.get(login_url)
time.sleep(30) # 给自己多了点时间输入用户名、密码、验证码
cookies = browser.get_cookies()#直接用browser获取cookies
print(cookies)
save_cookies(cookies, cookies_file)
打开猿人学,上下滑动,并且打开一篇文章,搜索框搜索并回车
#coding=utf-8
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
#打开Chrome浏览器
driver = webdriver.Chrome()
#浏览器最大化
driver.maximize_window()
#打开猿人学首页
driver.get('https://www.yuanrenxue.cn')
time.sleep(3)
#滑动到页面中间处
driver.execute_script("window.scrollTo(0,document.body.scrollHeight/2)")
time.sleep(3)
#滑动到页面最下方
driver.execute_script("window.scrollTo(0,document.body.scrollHeight)")
time.sleep(3)
#滑动到页面最上方
driver.execute_script("window.scrollTo(0,0)")
time.sleep(3)
#通过html的class属性来定位链接位置,并点击
driver.find_element(By.CLASS_NAME,'slide-left').click()
time.sleep(3)
#定位页面右上角的搜索图标并点击
driver.find_element(By.CLASS_NAME,'search-show').click()
#找到输入框
search = driver.find_element(By.CLASS_NAME,"search-input")
#输入 Python教程
search.send_keys(u'python教程')
time.sleep(7)
#回车
search.send_keys(Keys.RETURN)
time.sleep(5)
driver.quit()
记得一定要调用driver.quit()退出
这里直接用了sleep进行等待网站加载,但是selenium有自己的等待函数
selenium定位元素的函数
类似driver.find_element_by_class_name('search-show').click()
函数 | 功能 |
---|---|
find_element_by_class_name | 通过class name定位元素 |
find_element_by_id | 通过id定位元素 |
find_element_by_tag_name | 通过html tag定位 |
find_element_by_css_selector | 通过css来定位 |
find_element_by_name | 通过name定位 |
find_element_by_xpath | 通过xpath来定位 |
find_elements_by_link_text | 通过文字链接来定位 |
find_element_by_link_text(u’Python教程’)
element后面多了一个s,会以列表的形式把定位到的相同元素全部返回,你要知道你需要的元素是第几个,通过数组下标来取对应的元素。比如你要点击某个网站第三个链接,写法如下: find_elements_by_tag_name(‘a’)[2].click()
获得chrome里的老cookie
from selenium import webdriver
options = webdriver.ChromeOptions()
#这里是chrome存放cookie的路径
options.add_argument("user-data-dir=C:\\Users\\HN\\AppData\\Local\\Google\\Chrome\\User Data")
options.add_experimental_option("excludeSwitches",["ignore-certificate-errors"])
driver = webdriver.Chrome(executable_path="C:/chromedriver.exe",options=options)
driver.maximize_window()
driver.get('https://www.weibo.com')
print(driver.get_cookies())
打开豆瓣网保存cookie至指定文件夹
#coding=utf-8
from selenium import webdriver
import pickle
import time
driver = webdriver.Chrome(executable_path="C:/chromedriver.exe")
driver.maximize_window()
driver.get('https://www.douban.com')
time.sleep(60)
cookies = driver.get_cookies()
with open('D:/test_cookies/db_cookie_1','wb') as f:
pickle.dump(cookies,f)
print ('done')
删除cookie后用刚刚保存的cookie进行登录
#coding=utf-8
from selenium import webdriver
import pickle
import time
driver = webdriver.Chrome(executable_path="C:/chromedriver.exe")
#不带cookies访问豆瓣
driver.get('https://www.douban.com')
#删掉cookies
driver.delete_all_cookies()
with open('D:/test_cookies/db_cookie_1','rb') as f:
cookies = pickle.load(f)
for cookie in cookies:
driver.add_cookie(cookie)
print(cookie)
#带我们保存的cookie访问豆瓣
driver.get('https://www.douban.com')
print('done')
为什么要这样做呢?因为只接待我们的cookies访问网站可能会失败 之后不同的账户按照不同的文件名就行,方便管理
webdriverwait等待
设置等待时间,直到这个元素出现就停止等待,如果没有就抛出异常
#coding=utf-8
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
driver = webdriver.Chrome(executable_path="C:/chromedriver.exe")
driver.get('https://www.yuanrenxue.cn')
try:
element = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "search-show"))
)
element.click()
finally:
time.sleep(10)
driver.quit()
上述是显示等待,也可以设置隐式等待,这个需要自行查阅,不再赘述
selenium控制键盘
自行百度。。
实例:爬取极简壁纸
import time
from selenium import webdriver
import asyncio
import aiofiles
import aiohttp
headers = {
'Referer': 'https://bz.zzzmh.cn/',
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36"
}
async def download(href,count):
print(f"第{count}图片开始缓存")
try:
async with aiohttp.ClientSession() as session:
async with session.get(href,headers=headers) as p:
data = await p.read()
async with aiofiles.open(f"C:\\Users\\Desktop\壁纸\{count}.jpg",'wb') as file:
await file.write(data)
print(f"第{count}图片缓存成功")
except:
print(f"第{count}图片缓存失败")
async def main():
print("选择下载多少页的壁纸")
a = int(input())
web = webdriver.Chrome()
web.get("https://bz.zzzmh.cn/index")
time.sleep(10)
task = []
count = 1
for k in range(a):
img_List=(web.find_elements(by="xpath",value='//div[@class="img-box"]'))
#//*[@id="app"]/div/main/div/div/div/div[4]/div[11]
next = web.find_element(by="xpath",value='//div[@class="vue_pagination_group"]/div[@class="vue_pagination_next vue_pagination_item"]')
for i in img_List:
src = i.find_element(by="xpath",value='./span[@class="down-span"]/a')
src= src.get_attribute('href')
print(src)
t = asyncio.create_task(download(src,count))
task.append(t)
count+=1
if k!=a-1:
next.click()
time.sleep(3)
return await asyncio.wait(task)
if __name__=="__main__":
asyncio.run(main())
反爬流程
- 看到你是同一个ip多次访问->封锁你的IP地址
- 换了代理IP之后->需要你使用登录cookie才能访问
- 开启多线程,并且换一个代理ip池进行爬->将网站不返回html,用ajax加载
selenium+phantomjs
phantomjs是一个基于webkit的浏览器引擎,可以做到无声无息的操作各种网站,所以对于市面上大多通过js渲染的动态网站,难以解析的网站,需要爬的话就要用到selenium+phantomjs
爬取蔡徐坤形象大使信息
py
# coding=utf-8
# 最新版的selenium(4.x.x)已经不支持PhantomJS。如要用PhantomJS,可用旧版本selenium。如pip install selenium==3.8.0。
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import xlwt
# browser = webdriver.PhantomJS()
browser = webdriver.Chrome()
WAIT = WebDriverWait(browser, 10)
browser.set_window_size(1400, 900)
book = xlwt.Workbook(encoding='utf-8', style_compression=0)
sheet = book.add_sheet('蔡徐坤篮球', cell_overwrite_ok=True)
sheet.write(0, 0, '名称')
sheet.write(0, 1, '地址')
sheet.write(0, 2, '描述')
sheet.write(0, 3, '观看次数')
sheet.write(0, 4, '弹幕数')
sheet.write(0, 5, '发布时间')
n = 1
def search():
try:
print('开始访问b站....')
browser.get("https://www.bilibili.com/")
# 被那个破登录遮住了
# index = WAIT.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "#primary_menu > ul > li.home > a")))
# index.click()
input = WAIT.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#nav_searchform > input")))
submit = WAIT.until(EC.element_to_be_clickable(
(By.XPATH, '/html/body/div[2]/div/div[1]/div[1]/div/div[2]/div/form/div/button')))
input.send_keys('蔡徐坤 篮球')
submit.click()
# 跳转到新的窗口
print('跳转到新窗口')
all_h = browser.window_handles
browser.switch_to.window(all_h[1])
get_source()
total = WAIT.until(EC.presence_of_element_located((By.CSS_SELECTOR,
"#all-list > div.flow-loader > div.page-wrap > div > ul > li.page-item.last > button")))
return int(total.text)
except TimeoutException:
return search()
def next_page(page_num):
try:
print('获取下一页数据')
next_btn = WAIT.until(EC.element_to_be_clickable((By.CSS_SELECTOR,
'#all-list > div.flow-loader > div.page-wrap > div > ul > li.page-item.next > button')))
next_btn.click()
WAIT.until(EC.text_to_be_present_in_element((By.CSS_SELECTOR,
'#all-list > div.flow-loader > div.page-wrap > div > ul > li.page-item.active > button'),
str(page_num)))
get_source()
except TimeoutException:
browser.refresh()
return next_page(page_num)
def save_to_excel(soup):
list = soup.find(class_='video-list clearfix').find_all(class_='video-item matrix')
for item in list:
item_title = item.find('a').get('title')
item_link = item.find('a').get('href')
item_dec = item.find(class_='des hide').text
item_view = item.find(class_='so-icon watch-num').text
item_biubiu = item.find(class_='so-icon hide').text
item_date = item.find(class_='so-icon time').text
print('爬取:' + item_title)
global n
sheet.write(n, 0, item_title)
sheet.write(n, 1, item_link)
sheet.write(n, 2, item_dec)
sheet.write(n, 3, item_view)
sheet.write(n, 4, item_biubiu)
sheet.write(n, 5, item_date)
n = n + 1
def get_source():
WAIT.until(EC.presence_of_element_located(
(By.CSS_SELECTOR, '#all-list > div.flow-loader > div.filter-wrap')))
html = browser.page_source
soup = BeautifulSoup(html, 'lxml')
print('到这')
save_to_excel(soup)
def main():
try:
total = search()
print(total)
for i in range(2, int(total + 1)):
next_page(i)
finally:
browser.close()
if __name__ == '__main__':
main()
book.save('蔡徐坤篮球.xlsx')
如何爬取动态的json数据
这里我们以微信的json数据为例
将python对象转化为json是json.dumps()
,将json数据转化为python对象是json.loads()
py
import json
jsondata = '''
{
"Uin":0,
"UserName":"@c482d142bc698bc3971d9f8c26335c5c",
"NickName":"小帅b",
"HeadImgUrl":"/cgi-bin/mmwebwx-bin/webwxgeticon?seq=500080&username=@c482d142bc698bc3971d9f8c26335c5c&skey=@crypt_b0f5e54e_b80a5e6dffebd14896dc9c72049712bf",
...此处省略一些..
"DisplayName":"",
"ChatRoomId":0,
"KeyWord":"che",
"EncryChatRoomId":"",
"IsOwner":0
}
'''
#这里将json数据转化为字典对象
myfriend = json.loads(jsondata)
#获取昵称
myfriend.get('NickName')
那么对于json数组呢?
py
{
"MemberList":[
{
"UserName":"小帅b",
"sex":"男"
},
{
"UserName":"小帅b的1号女朋友",
"sex":"女"
},
{
"UserName":"小帅b的2号女朋友",
"sex":"女"
}
]
}
这个时候我们想要获取好友列表
myfriends=json.loads(jsondata) menberList=myfriends.get('MemberList')
使用多线程进行快速爬取
- 并发就是在一个时间点,同时执行多个进程
- 互斥锁就是调整线程安全有序的执行
- python里面有一个叫做GIL锁,用来控制线程执行权限,
- 协程也叫微线程,在一个线程里面可以执行多个函数,线程和进程是通过系统调度的,微线程则不需要,可以根据自己需要调度,微线程在函数之间切换所以开销很小
线程+线程池
Thread基本语法
def func():
for i in range(300):
print("func",i)
if __name__ == '__main__':
i=Thread(target=func)
i.start()
for i in range(400):
print("main",i)
此外进程是process,他和Thread的语法相似,但是却又本质上的区别,process会有一个独立的内存空间
def func():
for i in range(300):
print("func",i)
if __name__ == '__main__':
i=Process(target=func)
i.start()
for i in range(400):
print("main",i)
线程池
线程池就是开始多个线程一起执行任务
def func():
for i in range(300):
print("func",i)
if __name__ == '__main__':
with ThreadPoolExecutor(50) as t:
for i in range(100):
t.submit(func)
print("main执行完毕")
进程池只需要改为ProcessPoolExecutor即可,语法一样,但是注意线程和进程不是一个东西
协程
协程就是:当程序遇见IO操作的时候,可以选择性的切换到其他任务上,在微观上就是一个任务一个任务的切换,切换条件就是IO操作,在宏观上看到多个任务一起在执行
协程,异步操作
async def func1():
print("func1")
await asyncio.sleep(4)
async def func2():
print("func2")
await asyncio.sleep(2)
async def func3():
print("func3")
await asyncio.sleep(3)
async def main():
tasks=[func1(),func2(),func3()]
await asyncio.wait(tasks)
if __name__=="__main__":
fi=func1()
fj=func2()
fk=func3()
tasks=[fi,fj,fk]
asyncio.run(asyncio.wait(tasks))
await挂起,async携程
async def func1():
print("准备下载")
await asyncio.sleep(4)
print("下载完成")
async def main():
urls={
"https://www.baidu.com",
"https://www.sina.com.cn",
"https://www.sohu.com",
"https://www.qq.com"
}
tasks=[]
for url in urls:
d=func1(url)
tasks.append(d)
await asyncio.wait(tasks)#task是一个敖汉了多个协程任务的列表
if __name__=='__main__':
asyncio.run(main())
反爬大法
cookie+headers
py
import requests
headers = { # 假装自己是浏览器 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/73.0.3683.75 Chrome/73.0.3683.75 Safari/537.36', # 把你刚刚拿到的Cookie塞进来 'Cookie': 'eda38d470a662ef3606390ac3b84b86f9; Hm_lvt_f1d3b035c559e31c390733e79e080736=1553503899; biihu__user_login=omvZVatKKSlcXbJGmXXew9BmqediJ4lzNoYGzLQjTR%2Fjw1wOz3o4lIacanmcNncX1PsRne5tXpE9r1sqrkdhAYQrugGVfaBICYp8BAQ7yBKnMpAwicq7pZgQ2pg38ZzFyEZVUvOvFHYj3cChZFEWqQ%3D%3D; Hm_lpvt_f1d3b035c559e31c390733e79e080736=1553505597',}
session = requests.Session()response = session.get('https://biihu.cc/people/wistbean%E7%9C%9F%E7%89%B9%E4%B9%88%E5%B8%85', headers=headers)
print(response.text)
表单请求大法+知乎
通过抓包,获取请求登录的时候需要用到的用户名密码参数,以表单的形式请求服务器
验证码
pytesseract是python中的一个图像识别工具,可以对网站中的验证码进行识别
注意这里不仅需要在ide中pip pytesseract还需要在电脑上安装tesseract-ocr以及环境变量配置
py
captcha = Image.open("claptcha2.png")
result = pytesseract.image_to_string(captcha)
print(result)
那么对于某些不太清晰的验证码,有很多噪点我们怎么整呢?
灰度处理+二值化
py
def convert_img(img,threshold):
img = img.convert("L") # 处理灰度
pixels = img.load()
for x in range(img.width):
for y in range(img.height):
if pixels[x, y] > threshold:
pixels[x, y] = 255
else:
pixels[x, y] = 0
return img
降噪
py
data = img.getdata()
w,h = img.size
count = 0
for x in range(1,h-1):
for y in range(1, h - 1):
# 找出各个像素方向
mid_pixel = data[w * y + x]
if mid_pixel == 0:
top_pixel = data[w * (y - 1) + x]
left_pixel = data[w * y + (x - 1)]
down_pixel = data[w * (y + 1) + x]
right_pixel = data[w * y + (x + 1)]
if top_pixel == 0:
count += 1
if left_pixel == 0:
count += 1
if down_pixel == 0:
count += 1
if right_pixel == 0:
count += 1
if count > 4:
img.putpixel((x, y), 0)
爬取斗破苍穹动漫30章
os.mkdir("C:\\Users\侯卓林\Desktop/漫画")
for i in range(590,616):
#这个能进入每个章节的页面
url="https://www.czmanga.com/comic/chapter/doupocangqiong-zhiyinmankerenxiang_hsvfkb/0_"+str(i)+".html"
re=requests.get(url)
soup=BeautifulSoup(re.text,"lxml")
url_img=soup.find_all('img')
#每个章节创建一个文件夹,然后下载图片
os.mkdir("C:\\Users\侯卓林\Desktop/漫画/"+str(i))
shix=0
for j in url_img:
print(j.get('src'))
urlretrieve(j.get('src'),"C:/Users/侯卓林/Desktop/漫画/"+str(i)+"/"+str(shix)+".jpg")
shix+=1
网易云音乐音乐
找到每个音乐的id,然后将这段链接输入下来标注id即可
因为网易云在线播放每首歌曲时,都有一个外链地址,这是不会变的,跟每首歌的唯一一个id绑定在一起
当然这不适用于付费歌曲,这样会404
xpath
xpath是一种用于在xml文档中定位节点的语言,它可以用于从xml文档中提取数据,以及在xml文档中进行搜索和过滤操作,xpath使用路径表达式来描述节点的位置,
xpath基础使用
from lxml import etree
import requests
url = 'https://www.douguo.com/'
res = requests.get(url)
#print(res.text)
html = etree.HTML(res.text)
# /从根节点选取(取子节点)。
rest = html.xpath('/html/head/title/text()') # 返回Element对象
title_text = html.xpath('//title/text()')#这个是选取所有title标签的text内容
attr = html.xpath('//meta/@name')#这个是选取所有meta标签的name属性值
index = html.xpath('//meta[@name="author"]')#这个是选取所有name属性值为author的meta标签
index = html.xpath('//*[@class="item"]')#这个是选取所有class属性值为item的标签
index = html.xpath('//meta[@*="keywords"]')#这个是选取所有含有keywords的meta标签
print(rest)
soup
find方法
返回一个对象
soup.find('a')
soup.find('a', class_='xxx') # 注意class后的下划线
soup.find('a', title='xxx')
soup.find('a', id='xxx')
soup.find('a', id=compile(r'xxx'))
find_all
soup.find_all('a')
soup.find_all(['a','span']) #返回所有的a和span标签
soup.find_all('a', class_='xxx')
soup.find_all('a', id=compile(r'xxx'))
# 提取出前两个符合要求的
soup.find_all('a', limit=3)
获取文本
# 获取标签的值的三种方法
soup.p.string
soup.p.text
soup.p.get.text()
获取属性
# 获取p标签的属性
# 方法一
soup.p.attrs(返回字典) or soup.p.attrs['class'](class返回列表,其余属性返回字符串)
# 方法二
soup.p['class'](class返回列表,其余属性返回字符串)
# 方法三
soup.p.get('class')(class返回列表,其余属性返回字符串)
如何抓取一部视频
视频的话大部分是将用户上传的视频首先进行转码,然后进行切片处理,把单个文件进行拆分,提高加载速度
m3u8就是将视频分割成多个小的ts(transport stream)通过索引列表进行管理
视频标签一定是video,有的视频反扒网站会对每个url的note设置一个动态的字符串,如果长时间未更改这个字符串则自动被删除,不能用该note进入该视频片段
抓取大量新闻页面
新闻类网址都做了大量seo,他们把新闻网址都静态化了,基本上都是html,htm,shtml等结尾,后面再加任何请求参数都无济于事有少部分新闻网站以参数id的形式动态获取新闻网页
对url进行清洗
g_bin_postfix = set([
'exe', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx',
'pdf',
'jpg', 'png', 'bmp', 'jpeg', 'gif',
'zip', 'rar', 'tar', 'bz2', '7z', 'gz',
'flv', 'mp4', 'avi', 'wmv', 'mkv',
'apk',
])
g_news_postfix = [
'.html?', '.htm?', '.shtml?',
'.shtm?',
]
def clean_url(url):
# 1. 是否为合法的http url
if not url.startswith('http'):
return ''
# 2. 去掉静态化url后面的参数
for np in g_news_postfix:
p = url.find(np)
if p > -1:
p = url.find('?')
url = url[:p]
return url
# 3. 不下载二进制类内容的链接
up = urlparse.urlparse(url)
path = up.path
if not path:
path = '/'
postfix = path.split('.')[-1].lower()
if postfix in g_bin_postfix:
return ''
# 4. 去掉标识流量来源的参数
# badquery = ['spm', 'utm_source', 'utm_source', 'utm_medium', 'utm_campaign']
good_queries = []
for query in up.query.split('&'):
qv = query.split('=')
if qv[0].startswith('spm') or qv[0].startswith('utm_'):
continue
if len(qv) == 1:
continue
good_queries.append(query)
query = '&'.join(good_queries)
url = urlparse.urlunparse((
up.scheme,
up.netloc,
path,
up.params,
query,
'' # crawler do not care fragment
))
return url
创建基于leveldb的urlDB
import leveldb
class UrlDB:
#use levelDB to store URLs what have been done(succeed or daile)
status_failure=b'0'
status_success=b'1'
def __init__(self,db_name):
self.name=db_name+'.urldb'
self.db=leveldb.LevelDB(self,name)
def set_success(self,url):
if isinstance(url,str):
url=url.encode('utf8')
try:
self.db.Put(url,self.status_success)
s=True
except:
s=False
return s
def set_failure(self,url):
if isinstance(url,str):
url=url.encode('utf8')
try:
self.db.Put(url,self.status_failure)
s=True
except:
s=False
return s
def has(self,url):
if isinstance(url,str):
url=url.encode('utf8')
try:
attr=self.db.Get(url)
return attr
except:
pass
return False
UrlPool的使用
#Author: veelion
import pickle
import leveldb
import time
import urllib.parse as urlparse
class UrlPool:
'''URL Pool for crawler to manage URLs
'''
def __init__(self, pool_name):
self.name = pool_name
self.db = UrlDB(pool_name)
self.waiting = {} # {host: set([urls]), } 按host分组,记录等待下载的URL
self.pending = {} # {url: pended_time, } 记录已被取出(self.pop())但还未被更新状态(正在下载)的URL
self.failure = {} # {url: times,} 记录失败的URL的次数
self.failure_threshold = 3
self.pending_threshold = 10 # pending的最大时间,过期要重新下载
self.waiting_count = 0 # self.waiting 字典里面的url的个数
self.max_hosts = ['', 0] # [host: url_count] 目前pool中url最多的host及其url数量
self.hub_pool = {} # {url: last_query_time, } 存放hub url
self.hub_refresh_span = 0
self.load_cache()
def __del__(self):
self.dump_cache()
#下面这两个函数对网址池进行缓存
def load_cache(self,):
path = self.name + '.pkl'
try:
with open(path, 'rb') as f:
self.waiting = pickle.load(f)
cc = [len(v) for k, v in self.waiting.items()]
print('saved pool loaded! urls:', sum(cc))
except:
pass
def dump_cache(self):
path = self.name + '.pkl'
try:
with open(path, 'wb') as f:
pickle.dump(self.waiting, f)
print('self.waiting saved!')
except:
pass
def set_hubs(self, urls, hub_refresh_span):
self.hub_refresh_span = hub_refresh_span
self.hub_pool = {}
for url in urls:
self.hub_pool[url] = 0
def set_status(self, url, status_code):
if url in self.pending:
self.pending.pop(url)
if status_code == 200:
self.db.set_success(url)
return
if status_code == 404:
self.db.set_failure(url)
return
if url in self.failure:
self.failure[url] += 1
if self.failure[url] > self.failure_threshold:
self.db.set_failure(url)
self.failure.pop(url)
else:
self.add(url)
else:
self.failure[url] = 1
self.add(url)
def push_to_pool(self, url):
host = urlparse.urlparse(url).netloc
if not host or '.' not in host:
print('try to push_to_pool with bad url:', url, ', len of ur:', len(url))
return False
if host in self.waiting:
if url in self.waiting[host]:
return True
self.waiting[host].add(url)
if len(self.waiting[host]) > self.max_hosts[1]:
self.max_hosts[1] = len(self.waiting[host])
self.max_hosts[0] = host
else:
self.waiting[host] = set([url])
self.waiting_count += 1
return True
def add(self, url, always=False):
if always:
return self.push_to_pool(url)
pended_time = self.pending.get(url, 0)
if time.time() - pended_time < self.pending_threshold:
print('being downloading:', url)
return
if self.db.has(url):
return
if pended_time:
self.pending.pop(url)
return self.push_to_pool(url)
def addmany(self, urls, always=False):
if isinstance(urls, str):
print('urls is a str !!!!', urls)
self.add(urls, always)
else:
for url in urls:
self.add(url, always)
def pop(self, count, hub_percent=50):
print('\n\tmax of host:', self.max_hosts)
# 取出的url有两种类型:hub=1, 普通=0
url_attr_url = 0
url_attr_hub = 1
# 1. 首先取出hub,保证获取hub里面的最新url.
hubs = {}
hub_count = count * hub_percent // 100
for hub in self.hub_pool:
span = time.time() - self.hub_pool[hub]
if span < self.hub_refresh_span:
continue
hubs[hub] = url_attr_hub # 1 means hub-url
self.hub_pool[hub] = time.time()
if len(hubs) >= hub_count:
break
# 2. 再取出普通url
left_count = count - len(hubs)
urls = {}
for host in self.waiting:
if not self.waiting[host]:
continue
url = self.waiting[host].pop()
urls[url] = url_attr_url
self.pending[url] = time.time()
if self.max_hosts[0] == host:
self.max_hosts[1] -= 1
if len(urls) >= left_count:
break
self.waiting_count -= len(urls)
print('To pop:%s, hubs: %s, urls: %s, hosts:%s' % (count, len(hubs), len(urls), len(self.waiting)))
urls.update(hubs)
return urls
def size(self,):
return self.waiting_count
def empty(self,):
return self.waiting_count == 0