首页 > 其他分享 >一篇文章带你入门爬虫

一篇文章带你入门爬虫

时间:2025-01-19 15:33:36浏览次数:3  
标签:一篇 url self 爬虫 item print import find 入门

前言

本篇引用的很多代码出自猿人学帅彬老仙,算是把自己的学习笔记发上来了,有些不完善的地方还会继续完善~!

http协议:

目前互联网上%90的网络传输都是基于http协议,但是弹幕可能采用的是websocket协议,
http协议是基于TCP/IP通信协议来传输数据的,

http请求流程

我们日常用浏览器搜索东西,输入的是url,浏览器会自动将其转化为http协议,(客户端向服务端发起request,服务器接受后返回一个response)

那么url全名叫统一资源定位符,协议+域名+路径+参数

协议域名路径参数
http://www.baidu.com:8080news/index.asp?boardid=5&id=2423
  1. host:指定请求资源的域名
  2. user-agant简称ua,内容包含用户发出请求的用户信息,通常ua包含浏览器的信息
  3. accept:告诉服务器客户端可以接收哪些类型的信息
  4. cookie:cookie信息
  5. content-type:来表示具体请求中的媒体类型信息
  6. content-length:内容长度
  7. content-range:相应的资源范围
  8. accept-encoding:指定所能接受的编码方式
  9. accept-language:浏览器可以接受的语言种类

post请求和get请求

post:向指定资源提交数据进行处理请求,例如提交表单或者上传文件,数据被包含在请求体中,post请求可能会导致新的资源的建立或者已有资源的修改

http的相应方式

1**:服务器受到请求,需要请求者继续执行操作
2**:成功,操作被成功接收并处理
3**:重定向,需要进一步的操作以完成请求
4**:客户端错误,请求包含语法错误或者无法完成请求
5**:服务器错误,服务器在处理请求的过程中发生了错误

cookie和session

cookie简单来说就是一个凭证,一般浏览器在请求服务器的时候肯定会带上cookie,但是也有坏处,一套cookie往往对应的是一个用户的信息,请求太频繁很可能被对方识别为爬虫

session:会话,本来的含义是指有始有终的一系列动作,消息,我们可以这样理解,在一个客户端打开浏览器访问网站的时候,会生成一个cookie,sessionID,(sessionID包含于cookie中),这个ID每次的访问都会带上,而服务器会识别这个sessionID并且将与这个sessionID有关的数据都存在服务器上,由此来实现客户端的状态识别。因此session是基于cookie的

session与cookie相反,session是储存在服务器上的数据,只由客户端传上来的sessionID来进行判定,所以相对于cookie,session的安全性更高

一般sessionID会在浏览器被关闭时被丢弃,或者当session长期没有活跃,那么也会被识别为失效

流程就是:用户首次登录,生成一个seesion表,里面的key是hash生成的数据,value是一系列信息,同时在客户端本地生成一个cookie,包含sessionID,下次登录会自动携带sessionID,与服务器里的hash形式的key进行比较

知道了一些基本内容之后,下面就开始上代码咯

搜索引擎

搜索引擎就是一个巨大的爬虫,他会先去爬取内容,然后会根据数据关键字和一些其他因素设置信息权重,最后出现在我们面前,所以说,网络中大部分的新闻网站基本上都不设防,新闻内容基本上都在网页的html代码里了,

ajax异步加载

(asynchronous JavaScript and xml),它允许网页在不刷新整个页面的情况下,与服务器交换数据并更新部分页面内容

代码部分

完整的爬虫函数

import requests  # 导入requests库,用于发送HTTP请求
import chardet   # 导入chardet库,用于检测网页编码
import traceback  # 导入traceback库,用于输出异常信息


def downloader(url, timeout=10, headers=None, debug=False, binary=False):
    # 定义一个名为downloader的函数,用于下载网页内容
    # 参数:
    # - url: 必需,要下载的网页地址
    # - timeout: 可选,默认为10秒,请求超时时间
    # - headers: 可选,默认为None,HTTP请求头信息
    # - debug: 可选,默认为False,是否开启调试模式
    # - binary: 可选,默认为False,是否以二进制形式返回网页内容

    # 设置默认的HTTP请求头信息,模拟IE浏览器访问
    _headers = {
        'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
                       'Windows NT 6.1; Win64; x64; Trident/5.0)'),
    }

    # 初始化redirected_url为传入的url
    redirected_url = url

    # 如果headers参数不为空,则使用传入的请求头信息
    if headers:
        _headers = headers

    try:
        # 使用requests.get方法发送HTTP GET请求
        r = requests.get(url, headers=_headers, timeout=timeout)

        # 根据binary参数决定返回的数据形式
        if binary:
            # 如果binary为True,返回网页的二进制内容
            html = r.content
        else:
            # 如果binary为False,使用chardet检测网页编码,并decode为字符串
            encoding = chardet.detect(r.content)['encoding']
            html = r.content.decode(encoding)

        # 获取HTTP响应的状态码
        status = r.status_code#用get请求返回的r是可以获取状态码的

        # 获取实际访问的URL(如果发生重定向)
        redirected_url = r.url
    except:
        # 如果发生异常,检查debug参数是否开启
        if debug:
            # 如果开启,则输出异常的详细信息
            traceback.print_exc()

        # 打印下载失败的信息
        msg = 'failed download: {}'.format(url)
        print(msg)

        # 根据binary参数决定返回的默认值
        if binary:
            # 如果binary为True,返回空的二进制数据
            html = b''
        else:
            # 如果binary为False,返回空字符串
            html = ''

        # 设置状态码为0,表示下载失败
        status = 0

    # 返回状态码、网页内容及实际访问的URL
    return status, html, redirected_url


if __name__ == '__main__':
    # 如果当前模块被直接运行,则执行以下代码
    url = 'http://news.baidu.com/'
    # 定义要下载的网页地址

    # 调用downloader函数,下载网页内容
    #python函数原来可以有多个返回值
    s, html, lost_url_found_by_大大派 = downloader(url)

    # 打印下载结果,包括状态码、网页内容长度及实际访问的URL
    print(s, len(html), lost_url_found_by_大大派)

bs4

将html的源代码传给bs4

py
soup=BeautifulSoup(html_doc,'lxml')

此时soup便接收到了html_doc文件里的源码,是这样的

html

html_doc = """

<html><head><title></title></head>
<body>
<p class="title"><b></b></p>

<p class="story">
<a href="http://example.com/1" class="sister" id="link1"></a>,
<a href="http://example.com/2" class="sister" id="link2"></a> ,
</p>

<p class="story">...</p>

"""
py

#获取标题内容
print(soup.title.string)
#获取p标签里的内容
print(soup.p.string)
#获取title的父级标签
print(soup.title.parent.name)
#获取超链接
print(soup.a)
#获取所有超链接
print(soup.find_all('a'))
#获取id为link2的超链接
print(soup.find(id="link2"))
#获取网页中所有内容
print(soup.get_text())

这里也可以使用select

py
print(soup.select("titile")
print(soup.select("body a"))
print(soup.select("p > #link1)

那么我们在发送请求时接受 到了一个html,我们如何对该文件内所有的li标签获取文本呢?
参考这个豆瓣源码可能有更深刻的理解

py

list = soup.find(class_='grid_view').find_all('li')

    for item in list:
        item_name = item.find(class_='title').string
        item_img = item.find('a').find('img').get('src')
        item_index = item.find(class_='').string
        item_score = item.find(class_='rating_num').string
        item_author = item.find('p').text
        item_intr = item.find(class_='inq').string

        # print('爬取电影:' + item_index + ' | ' + item_name +' | ' + item_img +' | ' + item_score +' | ' + item_author +' | ' + item_intr )
        print('爬取电影:' + item_index + ' | ' + item_name  +' | ' + item_score  +' | ' + item_intr )

数据保存到excel

py

 import xlwt
 book=xlwt.Workbook(encoding='utf-8',style_compression=0)

sheet=book.add_sheet('豆瓣电影Top250',cell_overwrite_ok=True)
#创建表格第一栏
sheet.write(0,0,'名称')
sheet.write(0,1,'图片')
sheet.write(0,2,'排名')
sheet.write(0,3,'评分')
sheet.write(0,4,'作者')
sheet.write(0,5,'简介')
#将爬取到的数据写入excel
sheet.write(n, 0, item_name)
        sheet.write(n, 1, item_img)
        sheet.write(n, 2, item_index)
        sheet.write(n, 3, item_score)
        sheet.write(n, 4, item_author)
        sheet.write(n, 5, item_intr)
book.save(u'豆瓣最受欢迎的250部电影')

selenium

Selenium 是一个开源的自动化工具,主要用于 Web 应用程序的测试。它可以模拟用户在浏览器上的操作,如点击、输入、导航等,也可以用于网络爬虫或其他自动化任务,所以selenium就是一个自动化爬虫工具,能直接打开浏览器,执行搜索任务

selenium使用selenium webdriver打开浏览器,可以用headless chrome在无桌面环境下运行

selenium+xpath:首先selenium加载网页中的JavaScript,等待页面渲染完成后,再使用xpath查询动态生成的DOM结构

这段代码可以显示动态加载

from selenium import webdriver
web = webdriver.Chrome()
web.get("https://bz.zzzmh.cn/index")
print(web.page_source)

使用selenium无需人工输入验证码获取cookies

def login_auto(login_url, username, password,
               username_xpath, password_xpath,
               submit_xpath, cookies_file, browser=None):
    if browser is None:
        options = webdriver.ChromeOptions()
        # chrome在系统PATH时,可以不指定 binary_location 
        # options.binary_location = ‘/usr/bin/google-chrome’
        options.add_argument('headless')
        options.add_argument('window-size=1200x600')
        browser = webdriver.Chrome(chrome_options=options)
    browser.maximize_window()
    browser.get(login_url)
    time.sleep(9) # 等登录加载完成
    browser.find_element_by_xpath(username_xpath).send_keys(username)
    browser.find_element_by_xpath(password_xpath).send_keys(password)
    browser.find_element_by_xpath(submit_xpath).send_keys(Keys.ENTER)
    time.sleep(9) # 等登录加载完成
    cookies = browser.get_cookies()
    print(cookies)
    save_cookies(cookies, cookies_file)

使用selenium进行人工输入验证码获取cookies

def login_manually(login_url, cookies_file, browser=None):
    # 既然是手动,这里就不自动填写用户名和密码了
    if browser is None:
        browser = webdriver.Chrome()
    browser.get(login_url)
    time.sleep(30) # 给自己多了点时间输入用户名、密码、验证码
    cookies = browser.get_cookies()#直接用browser获取cookies
    print(cookies)
    save_cookies(cookies, cookies_file)

打开猿人学,上下滑动,并且打开一篇文章,搜索框搜索并回车

#coding=utf-8
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time

#打开Chrome浏览器
driver = webdriver.Chrome()
#浏览器最大化
driver.maximize_window()
#打开猿人学首页
driver.get('https://www.yuanrenxue.cn')
time.sleep(3)

#滑动到页面中间处
driver.execute_script("window.scrollTo(0,document.body.scrollHeight/2)")
time.sleep(3)

#滑动到页面最下方
driver.execute_script("window.scrollTo(0,document.body.scrollHeight)")
time.sleep(3)

#滑动到页面最上方
driver.execute_script("window.scrollTo(0,0)")
time.sleep(3)

#通过html的class属性来定位链接位置,并点击
driver.find_element(By.CLASS_NAME,'slide-left').click()
time.sleep(3)

#定位页面右上角的搜索图标并点击
driver.find_element(By.CLASS_NAME,'search-show').click()

#找到输入框
search = driver.find_element(By.CLASS_NAME,"search-input")
#输入 Python教程
search.send_keys(u'python教程')
time.sleep(7)
#回车
search.send_keys(Keys.RETURN)

time.sleep(5)
driver.quit()

记得一定要调用driver.quit()退出

这里直接用了sleep进行等待网站加载,但是selenium有自己的等待函数

selenium定位元素的函数

类似driver.find_element_by_class_name('search-show').click()

函数功能
find_element_by_class_name通过class name定位元素
find_element_by_id通过id定位元素
find_element_by_tag_name通过html tag定位
find_element_by_css_selector通过css来定位
find_element_by_name通过name定位
find_element_by_xpath通过xpath来定位
find_elements_by_link_text通过文字链接来定位

find_element_by_link_text(u’Python教程’)

element后面多了一个s,会以列表的形式把定位到的相同元素全部返回,你要知道你需要的元素是第几个,通过数组下标来取对应的元素。比如你要点击某个网站第三个链接,写法如下: find_elements_by_tag_name(‘a’)[2].click()

获得chrome里的老cookie

from selenium import webdriver

options = webdriver.ChromeOptions()
#这里是chrome存放cookie的路径
options.add_argument("user-data-dir=C:\\Users\\HN\\AppData\\Local\\Google\\Chrome\\User Data")

options.add_experimental_option("excludeSwitches",["ignore-certificate-errors"])

driver = webdriver.Chrome(executable_path="C:/chromedriver.exe",options=options)

driver.maximize_window()

driver.get('https://www.weibo.com')

print(driver.get_cookies())

打开豆瓣网保存cookie至指定文件夹

#coding=utf-8
from selenium import webdriver
import pickle
import time

driver = webdriver.Chrome(executable_path="C:/chromedriver.exe")
driver.maximize_window()
driver.get('https://www.douban.com')

time.sleep(60)

cookies = driver.get_cookies()
with open('D:/test_cookies/db_cookie_1','wb') as f:
    pickle.dump(cookies,f)
print ('done')

删除cookie后用刚刚保存的cookie进行登录

#coding=utf-8
from selenium import webdriver
import pickle
import time

driver = webdriver.Chrome(executable_path="C:/chromedriver.exe")
#不带cookies访问豆瓣
driver.get('https://www.douban.com')
#删掉cookies
driver.delete_all_cookies()

with open('D:/test_cookies/db_cookie_1','rb') as f:
    cookies = pickle.load(f)
for cookie in cookies:
    driver.add_cookie(cookie)
    print(cookie)

#带我们保存的cookie访问豆瓣
driver.get('https://www.douban.com')

print('done')

为什么要这样做呢?因为只接待我们的cookies访问网站可能会失败 之后不同的账户按照不同的文件名就行,方便管理

webdriverwait等待

设置等待时间,直到这个元素出现就停止等待,如果没有就抛出异常

#coding=utf-8
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

driver = webdriver.Chrome(executable_path="C:/chromedriver.exe")
driver.get('https://www.yuanrenxue.cn')

try:
    element = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CLASS_NAME, "search-show"))
    )
    element.click()
finally:
    time.sleep(10)
    driver.quit()

上述是显示等待,也可以设置隐式等待,这个需要自行查阅,不再赘述

selenium控制键盘

自行百度。。

实例:爬取极简壁纸

import time
from selenium import webdriver
import asyncio
import aiofiles
import aiohttp

headers = {
    'Referer': 'https://bz.zzzmh.cn/',
    "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36"
}
async def download(href,count):
    print(f"第{count}图片开始缓存")
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(href,headers=headers) as p:
                data = await p.read()
                async with aiofiles.open(f"C:\\Users\\Desktop\壁纸\{count}.jpg",'wb') as file:
                    await file.write(data)
                print(f"第{count}图片缓存成功")
    except:
        print(f"第{count}图片缓存失败")


async def main():
    print("选择下载多少页的壁纸")
    a = int(input())
    web = webdriver.Chrome()
    web.get("https://bz.zzzmh.cn/index")
    time.sleep(10)
    task = []
    count = 1
    for k in range(a):
        img_List=(web.find_elements(by="xpath",value='//div[@class="img-box"]'))
        #//*[@id="app"]/div/main/div/div/div/div[4]/div[11]
        next = web.find_element(by="xpath",value='//div[@class="vue_pagination_group"]/div[@class="vue_pagination_next vue_pagination_item"]')
        for i in img_List:
            src = i.find_element(by="xpath",value='./span[@class="down-span"]/a')
            src= src.get_attribute('href')
            print(src)
            t = asyncio.create_task(download(src,count))
            task.append(t)
            count+=1
        if k!=a-1:
            next.click()
        time.sleep(3)    
    return await asyncio.wait(task)
if __name__=="__main__":
    asyncio.run(main())

反爬流程

  1. 看到你是同一个ip多次访问->封锁你的IP地址
  2. 换了代理IP之后->需要你使用登录cookie才能访问
  3. 开启多线程,并且换一个代理ip池进行爬->将网站不返回html,用ajax加载

selenium+phantomjs

phantomjs是一个基于webkit的浏览器引擎,可以做到无声无息的操作各种网站,所以对于市面上大多通过js渲染的动态网站,难以解析的网站,需要爬的话就要用到selenium+phantomjs
爬取蔡徐坤形象大使信息

py

# coding=utf-8

# 最新版的selenium(4.x.x)已经不支持PhantomJS。如要用PhantomJS,可用旧版本selenium。如pip install selenium==3.8.0。
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import xlwt

# browser = webdriver.PhantomJS()
browser = webdriver.Chrome()
WAIT = WebDriverWait(browser, 10)
browser.set_window_size(1400, 900)

book = xlwt.Workbook(encoding='utf-8', style_compression=0)

sheet = book.add_sheet('蔡徐坤篮球', cell_overwrite_ok=True)
sheet.write(0, 0, '名称')
sheet.write(0, 1, '地址')
sheet.write(0, 2, '描述')
sheet.write(0, 3, '观看次数')
sheet.write(0, 4, '弹幕数')
sheet.write(0, 5, '发布时间')

n = 1


def search():
    try:
        print('开始访问b站....')
        browser.get("https://www.bilibili.com/")

        # 被那个破登录遮住了
        # index = WAIT.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "#primary_menu > ul > li.home > a")))
        # index.click()

        input = WAIT.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#nav_searchform > input")))
        submit = WAIT.until(EC.element_to_be_clickable(
            (By.XPATH, '/html/body/div[2]/div/div[1]/div[1]/div/div[2]/div/form/div/button')))

        input.send_keys('蔡徐坤 篮球')
        submit.click()

        # 跳转到新的窗口
        print('跳转到新窗口')
        all_h = browser.window_handles
        browser.switch_to.window(all_h[1])
        get_source()

        total = WAIT.until(EC.presence_of_element_located((By.CSS_SELECTOR,
                                                           "#all-list > div.flow-loader > div.page-wrap > div > ul > li.page-item.last > button")))
        return int(total.text)
    except TimeoutException:
        return search()


def next_page(page_num):
    try:
        print('获取下一页数据')
        next_btn = WAIT.until(EC.element_to_be_clickable((By.CSS_SELECTOR,
                                                          '#all-list > div.flow-loader > div.page-wrap > div > ul > li.page-item.next > button')))
        next_btn.click()
        WAIT.until(EC.text_to_be_present_in_element((By.CSS_SELECTOR,
                                                     '#all-list > div.flow-loader > div.page-wrap > div > ul > li.page-item.active > button'),
                                                    str(page_num)))
        get_source()
    except TimeoutException:
        browser.refresh()
        return next_page(page_num)


def save_to_excel(soup):
    list = soup.find(class_='video-list clearfix').find_all(class_='video-item matrix')

    for item in list:
        item_title = item.find('a').get('title')
        item_link = item.find('a').get('href')
        item_dec = item.find(class_='des hide').text
        item_view = item.find(class_='so-icon watch-num').text
        item_biubiu = item.find(class_='so-icon hide').text
        item_date = item.find(class_='so-icon time').text

        print('爬取:' + item_title)

        global n

        sheet.write(n, 0, item_title)
        sheet.write(n, 1, item_link)
        sheet.write(n, 2, item_dec)
        sheet.write(n, 3, item_view)
        sheet.write(n, 4, item_biubiu)
        sheet.write(n, 5, item_date)

        n = n + 1


def get_source():
    WAIT.until(EC.presence_of_element_located(
        (By.CSS_SELECTOR, '#all-list > div.flow-loader > div.filter-wrap')))

    html = browser.page_source
    soup = BeautifulSoup(html, 'lxml')
    print('到这')

    save_to_excel(soup)


def main():
    try:
        total = search()
        print(total)

        for i in range(2, int(total + 1)):
            next_page(i)

    finally:
        browser.close()


if __name__ == '__main__':
    main()
    book.save('蔡徐坤篮球.xlsx')

如何爬取动态的json数据

这里我们以微信的json数据为例

将python对象转化为json是json.dumps(),将json数据转化为python对象是json.loads()

py

import json

jsondata = '''
{
"Uin":0,
"UserName":"@c482d142bc698bc3971d9f8c26335c5c",
"NickName":"小帅b",
"HeadImgUrl":"/cgi-bin/mmwebwx-bin/webwxgeticon?seq=500080&username=@c482d142bc698bc3971d9f8c26335c5c&skey=@crypt_b0f5e54e_b80a5e6dffebd14896dc9c72049712bf",

...此处省略一些..

"DisplayName":"",
"ChatRoomId":0,
"KeyWord":"che",
"EncryChatRoomId":"",
"IsOwner":0
}
'''
#这里将json数据转化为字典对象
myfriend = json.loads(jsondata)
#获取昵称
myfriend.get('NickName')

那么对于json数组呢?

py

{
    "MemberList":[
        {
            "UserName":"小帅b",
            "sex":"男"
        },
        {
            "UserName":"小帅b的1号女朋友",
            "sex":"女"
        },
        {
            "UserName":"小帅b的2号女朋友",
            "sex":"女"
        }
    ]
}

这个时候我们想要获取好友列表
myfriends=json.loads(jsondata) menberList=myfriends.get('MemberList')

使用多线程进行快速爬取

  1. 并发就是在一个时间点,同时执行多个进程
  2. 互斥锁就是调整线程安全有序的执行
  3. python里面有一个叫做GIL锁,用来控制线程执行权限,
  4. 协程也叫微线程,在一个线程里面可以执行多个函数,线程和进程是通过系统调度的,微线程则不需要,可以根据自己需要调度,微线程在函数之间切换所以开销很小

线程+线程池

Thread基本语法
def func():
    for i in range(300):
        print("func",i)
if __name__ == '__main__':
    i=Thread(target=func)
    i.start()
    for i in range(400):
        print("main",i)

此外进程是process,他和Thread的语法相似,但是却又本质上的区别,process会有一个独立的内存空间

def func():
    for i in range(300):
        print("func",i)
if __name__ == '__main__':
    i=Process(target=func)
    i.start()
    for i in range(400):
        print("main",i)
线程池

线程池就是开始多个线程一起执行任务

def func():
    for i in range(300):
        print("func",i)
if __name__ == '__main__':
    with ThreadPoolExecutor(50) as t:
        for i in range(100):
            t.submit(func)
    print("main执行完毕")

进程池只需要改为ProcessPoolExecutor即可,语法一样,但是注意线程和进程不是一个东西

协程

协程就是:当程序遇见IO操作的时候,可以选择性的切换到其他任务上,在微观上就是一个任务一个任务的切换,切换条件就是IO操作,在宏观上看到多个任务一起在执行

协程,异步操作

async def func1():
    print("func1")
    await asyncio.sleep(4)
async def func2():
    print("func2")
    await asyncio.sleep(2)
async def func3():
    print("func3")
    await asyncio.sleep(3)
async def main():
    tasks=[func1(),func2(),func3()]
    await asyncio.wait(tasks)
if __name__=="__main__":
    fi=func1()
    fj=func2()
    fk=func3()
    tasks=[fi,fj,fk]
    asyncio.run(asyncio.wait(tasks))

await挂起,async携程

async def func1():
    print("准备下载")
    await asyncio.sleep(4)
    print("下载完成")

async def main():
    urls={
        "https://www.baidu.com",
        "https://www.sina.com.cn",
        "https://www.sohu.com",
        "https://www.qq.com"
    }
    tasks=[]
    for url in urls:
        d=func1(url)
        tasks.append(d)
    await asyncio.wait(tasks)#task是一个敖汉了多个协程任务的列表
    

if __name__=='__main__':
    asyncio.run(main())

反爬大法

cookie+headers

py
import requests
headers = {    # 假装自己是浏览器    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/73.0.3683.75 Chrome/73.0.3683.75 Safari/537.36',    # 把你刚刚拿到的Cookie塞进来    'Cookie': 'eda38d470a662ef3606390ac3b84b86f9; Hm_lvt_f1d3b035c559e31c390733e79e080736=1553503899; biihu__user_login=omvZVatKKSlcXbJGmXXew9BmqediJ4lzNoYGzLQjTR%2Fjw1wOz3o4lIacanmcNncX1PsRne5tXpE9r1sqrkdhAYQrugGVfaBICYp8BAQ7yBKnMpAwicq7pZgQ2pg38ZzFyEZVUvOvFHYj3cChZFEWqQ%3D%3D; Hm_lpvt_f1d3b035c559e31c390733e79e080736=1553505597',}
session = requests.Session()response = session.get('https://biihu.cc/people/wistbean%E7%9C%9F%E7%89%B9%E4%B9%88%E5%B8%85', headers=headers)
print(response.text)

表单请求大法+知乎

通过抓包,获取请求登录的时候需要用到的用户名密码参数,以表单的形式请求服务器

验证码

pytesseract是python中的一个图像识别工具,可以对网站中的验证码进行识别
注意这里不仅需要在ide中pip pytesseract还需要在电脑上安装tesseract-ocr以及环境变量配置

py
captcha = Image.open("claptcha2.png")
result = pytesseract.image_to_string(captcha)
print(result)

那么对于某些不太清晰的验证码,有很多噪点我们怎么整呢?

灰度处理+二值化

py
def convert_img(img,threshold):    
    img = img.convert("L")  # 处理灰度  
    pixels = img.load()   
    for x in range(img.width):   
        for y in range(img.height):   
            if pixels[x, y] > threshold:    
                pixels[x, y] = 255       
            else:              
                pixels[x, y] = 0  
    return img

降噪

py

data = img.getdata()  
w,h = img.size    
count = 0   
for x in range(1,h-1):       
    for y in range(1, h - 1):  
        # 找出各个像素方向         
        mid_pixel = data[w * y + x]     
        if mid_pixel == 0:             
            top_pixel = data[w * (y - 1) + x]       
            left_pixel = data[w * y + (x - 1)]      
            down_pixel = data[w * (y + 1) + x]      
            right_pixel = data[w * y + (x + 1)]
            if top_pixel == 0:                 
                   count += 1           
            if left_pixel == 0:        
                   count += 1              
            if down_pixel == 0:    
                   count += 1              
            if right_pixel == 0:   
                  count += 1         
            if count > 4:        
                  img.putpixel((x, y), 0)

爬取斗破苍穹动漫30章

os.mkdir("C:\\Users\侯卓林\Desktop/漫画")
for i in range(590,616):
    #这个能进入每个章节的页面
    url="https://www.czmanga.com/comic/chapter/doupocangqiong-zhiyinmankerenxiang_hsvfkb/0_"+str(i)+".html"
    re=requests.get(url)
    soup=BeautifulSoup(re.text,"lxml")
    url_img=soup.find_all('img')
    #每个章节创建一个文件夹,然后下载图片
    os.mkdir("C:\\Users\侯卓林\Desktop/漫画/"+str(i))
    shix=0
    for j in url_img:
        print(j.get('src'))
        urlretrieve(j.get('src'),"C:/Users/侯卓林/Desktop/漫画/"+str(i)+"/"+str(shix)+".jpg")
        shix+=1

网易云音乐音乐

找到每个音乐的id,然后将这段链接输入下来标注id即可

image-20250111181535164

因为网易云在线播放每首歌曲时,都有一个外链地址,这是不会变的,跟每首歌的唯一一个id绑定在一起

当然这不适用于付费歌曲,这样会404

xpath

xpath是一种用于在xml文档中定位节点的语言,它可以用于从xml文档中提取数据,以及在xml文档中进行搜索和过滤操作,xpath使用路径表达式来描述节点的位置,

image-20250115184801464

xpath基础使用

from lxml import etree

import requests

url = 'https://www.douguo.com/'

res = requests.get(url)

#print(res.text)
html = etree.HTML(res.text)
# /从根节点选取(取子节点)。
rest = html.xpath('/html/head/title/text()')  # 返回Element对象
title_text = html.xpath('//title/text()')#这个是选取所有title标签的text内容
attr = html.xpath('//meta/@name')#这个是选取所有meta标签的name属性值
index = html.xpath('//meta[@name="author"]')#这个是选取所有name属性值为author的meta标签
index = html.xpath('//*[@class="item"]')#这个是选取所有class属性值为item的标签
index = html.xpath('//meta[@*="keywords"]')#这个是选取所有含有keywords的meta标签
print(rest)

image-20250111215937861

soup

find方法

返回一个对象
soup.find('a')
soup.find('a', class_='xxx') # 注意class后的下划线
soup.find('a', title='xxx')
soup.find('a', id='xxx')
soup.find('a', id=compile(r'xxx'))

find_all

soup.find_all('a')
soup.find_all(['a','span']) #返回所有的a和span标签
soup.find_all('a', class_='xxx')
soup.find_all('a', id=compile(r'xxx'))
# 提取出前两个符合要求的
soup.find_all('a', limit=3)

获取文本

# 获取标签的值的三种方法
 
soup.p.string
 
soup.p.text
 
soup.p.get.text()

获取属性

# 获取p标签的属性
# 方法一
soup.p.attrs(返回字典) or soup.p.attrs['class'](class返回列表,其余属性返回字符串)
 
# 方法二
soup.p['class'](class返回列表,其余属性返回字符串)
 
# 方法三
soup.p.get('class')(class返回列表,其余属性返回字符串)

如何抓取一部视频

视频的话大部分是将用户上传的视频首先进行转码,然后进行切片处理,把单个文件进行拆分,提高加载速度

m3u8就是将视频分割成多个小的ts(transport stream)通过索引列表进行管理

image-20250113213816133

视频标签一定是video,有的视频反扒网站会对每个url的note设置一个动态的字符串,如果长时间未更改这个字符串则自动被删除,不能用该note进入该视频片段

抓取大量新闻页面

新闻类网址都做了大量seo,他们把新闻网址都静态化了,基本上都是html,htm,shtml等结尾,后面再加任何请求参数都无济于事有少部分新闻网站以参数id的形式动态获取新闻网页

对url进行清洗

g_bin_postfix = set([
    'exe', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx',
    'pdf',
    'jpg', 'png', 'bmp', 'jpeg', 'gif',
    'zip', 'rar', 'tar', 'bz2', '7z', 'gz',
    'flv', 'mp4', 'avi', 'wmv', 'mkv',
    'apk',
])

g_news_postfix = [
    '.html?', '.htm?', '.shtml?',
    '.shtm?',
]


def clean_url(url):
    # 1. 是否为合法的http url
    if not url.startswith('http'):
        return ''
    # 2. 去掉静态化url后面的参数
    for np in g_news_postfix:
        p = url.find(np)
        if p > -1:
            p = url.find('?')
            url = url[:p]
            return url
    # 3. 不下载二进制类内容的链接
    up = urlparse.urlparse(url)
    path = up.path
    if not path:
        path = '/'
    postfix = path.split('.')[-1].lower()
    if postfix in g_bin_postfix:
        return ''

    # 4. 去掉标识流量来源的参数
    # badquery = ['spm', 'utm_source', 'utm_source', 'utm_medium', 'utm_campaign']
    good_queries = []
    for query in up.query.split('&'):
        qv = query.split('=')
        if qv[0].startswith('spm') or qv[0].startswith('utm_'):
            continue
        if len(qv) == 1:
            continue
        good_queries.append(query)
    query = '&'.join(good_queries)
    url = urlparse.urlunparse((
        up.scheme,
        up.netloc,
        path,
        up.params,
        query,
        ''  #  crawler do not care fragment
    ))
    return url

创建基于leveldb的urlDB

import leveldb

class UrlDB:
    #use levelDB to store URLs what have been done(succeed or daile)
    status_failure=b'0'
    status_success=b'1'
    def __init__(self,db_name):
        self.name=db_name+'.urldb'
        self.db=leveldb.LevelDB(self,name)
    def set_success(self,url):
        if isinstance(url,str):
            url=url.encode('utf8')
		try:
            self.db.Put(url,self.status_success)
            s=True
        except:
            s=False
        return s
    def set_failure(self,url):
        if isinstance(url,str):
            url=url.encode('utf8')
        try:
            self.db.Put(url,self.status_failure)
            s=True
        except:
            s=False
        return s
    def has(self,url):
        if isinstance(url,str):
            url=url.encode('utf8')
        try:
            attr=self.db.Get(url)
            return attr
        except:
            pass
        return False

UrlPool的使用

#Author: veelion

import pickle
import leveldb
import time
import urllib.parse as urlparse


class UrlPool:
    '''URL Pool for crawler to manage URLs
    '''

    def __init__(self, pool_name):
        self.name = pool_name
        self.db = UrlDB(pool_name)

        self.waiting = {}  # {host: set([urls]), } 按host分组,记录等待下载的URL
        self.pending = {}  # {url: pended_time, } 记录已被取出(self.pop())但还未被更新状态(正在下载)的URL
        self.failure = {}  # {url: times,} 记录失败的URL的次数
        self.failure_threshold = 3
        self.pending_threshold = 10  # pending的最大时间,过期要重新下载
        self.waiting_count = 0  # self.waiting 字典里面的url的个数
        self.max_hosts = ['', 0]  # [host: url_count] 目前pool中url最多的host及其url数量
        self.hub_pool = {}  # {url: last_query_time, }  存放hub url
        self.hub_refresh_span = 0
        self.load_cache()

    def __del__(self):
        self.dump_cache()
 #下面这两个函数对网址池进行缓存
    def load_cache(self,):
        path = self.name + '.pkl'
        try:
            with open(path, 'rb') as f:
                self.waiting = pickle.load(f)
            cc = [len(v) for k, v in self.waiting.items()]
            print('saved pool loaded! urls:', sum(cc))
        except:
            pass

    def dump_cache(self):
        path = self.name + '.pkl'
        try:
            with open(path, 'wb') as f:
                pickle.dump(self.waiting, f)
            print('self.waiting saved!')
        except:
            pass

    def set_hubs(self, urls, hub_refresh_span):
        self.hub_refresh_span = hub_refresh_span
        self.hub_pool = {}
        for url in urls:
            self.hub_pool[url] = 0

    def set_status(self, url, status_code):
        if url in self.pending:
            self.pending.pop(url)

        if status_code == 200:
            self.db.set_success(url)
            return
        if status_code == 404:
            self.db.set_failure(url)
            return
        if url in self.failure:
            self.failure[url] += 1
            if self.failure[url] > self.failure_threshold:
                self.db.set_failure(url)
                self.failure.pop(url)
            else:
                self.add(url)
        else:
            self.failure[url] = 1
            self.add(url)

    def push_to_pool(self, url):
        host = urlparse.urlparse(url).netloc
        if not host or '.' not in host:
            print('try to push_to_pool with bad url:', url, ', len of ur:', len(url))
            return False
        if host in self.waiting:
            if url in self.waiting[host]:
                return True
            self.waiting[host].add(url)
            if len(self.waiting[host]) > self.max_hosts[1]:
                self.max_hosts[1] = len(self.waiting[host])
                self.max_hosts[0] = host
        else:
            self.waiting[host] = set([url])
        self.waiting_count += 1
        return True

    def add(self, url, always=False):
        if always:
            return self.push_to_pool(url)
        pended_time = self.pending.get(url, 0)
        if time.time() - pended_time < self.pending_threshold:
            print('being downloading:', url)
            return
        if self.db.has(url):
            return
        if pended_time:
            self.pending.pop(url)
        return self.push_to_pool(url)

    def addmany(self, urls, always=False):
        if isinstance(urls, str):
            print('urls is a str !!!!', urls)
            self.add(urls, always)
        else:
            for url in urls:
                self.add(url, always)

    def pop(self, count, hub_percent=50):
        print('\n\tmax of host:', self.max_hosts)

        # 取出的url有两种类型:hub=1, 普通=0
        url_attr_url = 0
        url_attr_hub = 1
        # 1. 首先取出hub,保证获取hub里面的最新url.
        hubs = {}
        hub_count = count * hub_percent // 100
        for hub in self.hub_pool:
            span = time.time() - self.hub_pool[hub]
            if span < self.hub_refresh_span: 
                continue
            hubs[hub] = url_attr_hub # 1 means hub-url 
            self.hub_pool[hub] = time.time() 
            if len(hubs) >= hub_count:
                break

        # 2. 再取出普通url
        left_count = count - len(hubs)
        urls = {}
        for host in self.waiting:
            if not self.waiting[host]:
                continue
            url = self.waiting[host].pop()
            urls[url] = url_attr_url
            self.pending[url] = time.time()
            if self.max_hosts[0] == host:
                self.max_hosts[1] -= 1
            if len(urls) >= left_count:
                break
        self.waiting_count -= len(urls)
        print('To pop:%s, hubs: %s, urls: %s, hosts:%s' % (count, len(hubs), len(urls), len(self.waiting)))
        urls.update(hubs)
        return urls

    def size(self,):
        return self.waiting_count

    def empty(self,):
        return self.waiting_count == 0

爬虫基本成员

image-20250115114810287

抓包工具charles

标签:一篇,url,self,爬虫,item,print,import,find,入门
From: https://blog.csdn.net/2302_80729149/article/details/145169792

相关文章

  • 新手如何成为一名顶尖黑客?只需这十二个步骤轻松入门!
    成为一名黑客的过程涉及不断学习和实践技术,既要掌握基础的计算机知识,也要具备足够的安全意识和道德责任感。以下是成为一名黑客的12个基本步骤,为小白提供系统的入门指导。对于从来没有接触过黑客的同学,我们帮你准备了详细......
  • Web安全攻防入门教程——hvv行动详解
    Web安全攻防入门教程Web安全攻防是指在Web应用程序的开发、部署和运行过程中,保护Web应用免受攻击和恶意行为的技术与策略。这个领域不仅涉及防御措施的实现,还包括通过渗透测试、漏洞挖掘和模拟攻击来识别潜在的安全......
  • Web安全攻防入门教程——hvv行动详解
    Web安全攻防入门教程Web安全攻防是指在Web应用程序的开发、部署和运行过程中,保护Web应用免受攻击和恶意行为的技术与策略。这个领域不仅涉及防御措施的实现,还包括通过渗透测试、漏洞挖掘和模拟攻击来识别潜在的安全问题。本......
  • Web安全攻防入门教程——hvv行动详解
    Web安全攻防入门教程Web安全攻防是指在Web应用程序的开发、部署和运行过程中,保护Web应用免受攻击和恶意行为的技术与策略。这个领域不仅涉及防御措施的实现,还包括通过渗透测试、漏洞挖掘和模拟攻击来识别潜在的安全问题。本教程......
  • 常见的图形库概览-03-D3.js 入门例子
    常见的图形库系列常见的图形库概览-00-overview常见的图形库概览-01-Chart.js入门例子常见的图形库概览-03-D3.js入门例子HighCharts交互式图表-01-入门介绍Plotly函数图像绘制ApexCharts图表入门例子Victory图表基于React,适合React项目,支持移动端Recharts入门......
  • 你有自己写过爬虫的程序吗?说说你对爬虫和反爬虫的理解?
    是的,我曾经写过一些简单的爬虫程序,主要用于从网站上抓取特定信息,例如新闻数据、商品价格等。这些爬虫程序帮助我自动化地收集数据,节省了大量手动查找和整理的时间。对于爬虫,我的理解是它是一种自动化程序,能够模拟人类在互联网上的浏览行为,按照一定的规则和策略,自动地抓取、解析并......
  • Linux基础-指令篇03【入门级】
    Linux基础-文件操作内容概要本文主要介绍了在linux系统中如何通过终端指令对文件以及文件内容进行增删改查。同时上传了关于存储转换的小知识点。指令cat/less/more/head/tailcat:查看文件内容(少)执行权限:所有用户语法:cat[选项]文件选项-n:显示文件行号范例......
  • 计算机毕业设计Python+CNN卷积神经网络考研院校推荐系统 考研分数线预测 考研推荐系统
    温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO......
  • 计算机毕业设计Python+Django农产品推荐系统 农产品爬虫 农产品商城 农产品大数据 农
    温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO......
  • 第9篇:从入门到精通:深入理解Python中的类与对象
    第9篇:类与对象内容简介本篇文章将深入探讨Python中的类与对象概念。您将学习如何定义类、创建对象,理解属性与方法的区别与用法,掌握构造函数的使用,以及了解self关键字的作用。通过丰富的代码示例,您将能够熟练运用面向对象编程(OOP)在Python中的核心概念,提升您的编程能力和代......