"http://www.ccgp.gov.cn/cggg/dfgg/"
#个人学习用切勿其他用途
# 标题 name
# 发布时间 publish_time
# 地域 location
# 采购人 purchaser
# 采购网址 url
#前提配置数据库安装feapder库
import random
import re
import time
import feapder
from feapder.utils.webdriver import WebDriver
from parsel import Selector
from feapder.db.mysqldb import MysqlDB
from selenium.webdriver.common.by import By
class TestRender(feapder.AirSpider):
db = MysqlDB()
__custom_setting__ = dict(
WEBDRIVER=dict(
pool_size=1, # 浏览器的数量
load_images=True, # 是否加载图片
user_agent=None, # 字符串 或 无参函数,返回值为user_agent
proxy=None, # xxx.xxx.xxx.xxx:xxxx 或 无参函数,返回值为代理地址
headless=False, # 是否为无头浏览器
driver_type="CHROME", # CHROME、EDGE、PHANTOMJS、FIREFOX
timeout=30, # 请求超时时间
window_size=(1024, 800), # 窗口大小
executable_path=None, # 浏览器路径,默认为默认路径
render_time=0, # 渲染时长,即打开网页等待指定时间后再获取源码
custom_argument=["--ignore-certificate-errors"], # 自定义浏览器渲染参数
# xhr_url_regexes=[
# "/ad",
# ], # 拦截 http://www.spidertools.cn/spidertools/ad 接口
)
)
def start_requests(self):
for i in range(5):
if i == 0:
url = 'https://www.ccgp.gov.cn/cggg/dfgg/index.htm'
else:
url = f'https://www.ccgp.gov.cn/cggg/dfgg/index_{i}.htm'
yield feapder.Request(url, render=True,xxx='https://www.ccgp.gov.cn/cggg/dfgg/')
def parse(self, request, response):
browser: WebDriver = response.browser
time.sleep(random.randint(3, 5))
li = Selector(browser.page_source).xpath('//ul[@class="c_list_bid"]/li/a/@href').extract()
for url in li:
url1 = request.xxx + url.replace('./','')
print(url1)
#browser = browser 将浏览器句柄需要传递到下一页
yield feapder.Request(url=url1, render=True,callback=self.parse_1,browser = browser)标签:url,tr,zf,selenium,li,爬取,item,div,browser From: https://www.cnblogs.com/Lhptest/p/18229742
def parse_1(self, request, response):
time.sleep(random.randint(1, 2))
#获取浏览器句柄
browser = request.browser
# 标题 title
# 发布时间 publish_time
# 地域 location
# 采购人 purchaser
# 采购网址 url
# 采购正文 text
li = Selector(browser.page_source).xpath('//div[@class="table"]/table/tbody')
item = {}
'//*[@id="detail"]/div[2]/div/div[2]/div/div[2]/table/tbody/tr[2]'
item["title"] = li.xpath('./tr[2]/td[@colspan="3"]/text()').get('').strip()
'//*[@id="detail"]/div[2]/div/div[2]/div/div[2]/table/tbody/tr[5]/td[4]'
item["publish_time"] = li.xpath('./tr[5]/td[4]/text()').get('').strip()
'//*[@id="detail"]/div[2]/div/div[2]/div/div[2]/table/tbody/tr[11]'
item["location"] = li.xpath('./tr[11]/td[2]/text()').get('').strip()
'//*[@id="detail"]/div[2]/div/div[2]/div/div[2]/table/tbody/tr[4]/td[2]'
item["purchaser"] = li.xpath('./tr[4]/td[2]/text()').get('').strip()
item["url"] = request.url
# item["text"] = li.xpath('./tr[2]/td[@colspan="3"]/text()').get('').strip()
print(item)
# 写入数据库
self.db.add_smart("zf_table", item)
if __name__ == "__main__":
TestRender().start()