首页 > 其他分享 >LLM应用实战-财经新闻自动聚合

LLM应用实战-财经新闻自动聚合

时间:2024-12-16 13:44:07浏览次数:8  
标签:return https self def ele 财经新闻 LLM 聚合 datas

1. 背景

这段时间项目比较忙,所以本qiang~有些耽误了学习,不过也算是百忙之中,抽取时间来支撑一个读者的需求,即爬取一些财经网站的新闻并自动聚合。

该读者看了之前的《AI资讯的自动聚合及报告生成》文章后,想要将这一套流程嵌套在财经领域,因此满打满算耗费了2-3天时间,来完成了该需求。

注意:爬虫不是本人的强项,只是一丢丢兴趣而已; 其次,本篇文章主要是用于个人学习,客官们请勿直接商业使用。

2. 面临的难点

1. 爬虫框架选取: 采用之前现学现用的crawl4ai作为基础框架,使用其高阶技能来逼近模拟人访问浏览器,因为网站都存在反爬机制,如鉴权、cookie等;

2. 外网新闻: 需要kexue上网;

3. 新闻内容解析: 此处耗费的工作量最多,并不是html的页面解析有多难,主要是动态页面加载如何集成crawl4ai来实现,且每个新闻网站五花八门。

3. 数据源

数据源

url

备注

财lian社

https://www.cls.cn/depth?id=1000

https://www.cls.cn/depth?id=1003

https://www.cls.cn/depth?id=1007

1000: 头条,

1003: A股,

1007: 环球

凤huang网

https://finance.ifeng.com/shanklist/1-64-/

 

新lang

https://finance.sina.com.cn/roll/#pageid=384&lid=2519&k=&num=50&page=1

https://finance.sina.com.cn/roll/#pageid=384&lid=2672&k=&num=50&page=1

2519: 财经

2672: 美股

环qiu时报

https://finance.huanqiu.com

 

zaobao

https://www.zaobao.com/finance/china

https://www.zaobao.com/finance/world

国内及世界

fox

https://www.foxnews.com/category/us/economy

https://www.foxnews.com//world/global-economy

美国及世界

cnn

https://edition.cnn.com/business

https://edition.cnn.com/business/china

国内及世界

reuters

https://www.reuters.com/business

 

4. 部分源码

为了减少风险,本qiang~只列出财lian社网页的解析代码,读者如想进一步交流沟通,可私信联系。

代码片段解析:

1. schema是以json格式叠加css样式的策略,crawl4ai基于schema可以实现特定元素的结构化解析

2. js_commands是js代码,主要用于模拟浏览新闻时的下翻页 

import asyncio
from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
import json
from typing import Dict, Any, Union, List
import os
import datetime
import re
import hashlib


def md5(text):
    m = hashlib.md5()
    m.update(text.encode('utf-8'))
    return m.hexdigest()


def get_datas(file_path, json_flag=True, all_flag=False, mode='r'):
    """读取文本文件"""
    results = []
    
    with open(file_path, mode, encoding='utf-8') as f:
        for line in f.readlines():
            if json_flag:
                results.append(json.loads(line))
            else:
                results.append(line.strip())
        if all_flag:
            if json_flag:
                return json.loads(''.join(results))
            else:
                return '\n'.join(results)
        return results
    

def save_datas(file_path, datas, json_flag=True, all_flag=False, with_indent=False, mode='w'):
    """保存文本文件"""
    with open(file_path, mode, encoding='utf-8') as f:
        if all_flag:
            if json_flag:
                f.write(json.dumps(datas, ensure_ascii=False, indent= 4 if with_indent else None))
            else:
                f.write(''.join(datas))
        else:
            for data in datas:
                if json_flag:
                    f.write(json.dumps(data, ensure_ascii=False) + '\n') 
                else:
                    f.write(data + '\n')


class AbstractAICrawler():
    
    def __init__(self) -> None:
        pass
    def crawl():
        raise NotImplementedError()


class AINewsCrawler(AbstractAICrawler):
    def __init__(self, domain) -> None:
        super().__init__()
        self.domain = domain
        self.file_path = f'data/{self.domain}.json'
        self.history = self.init()
    
    def init(self):
        if not os.path.exists(self.file_path):
            return {}
        return {ele['id']: ele for ele in get_datas(self.file_path)}
    
    def save(self, datas: Union[List, Dict]):
        if isinstance(datas, dict):
            datas = [datas]
        self.history.update({ele['id']: ele for ele in datas})
        save_datas(self.file_path, datas=list(self.history.values()))
    
    async def crawl(self, url:str, 
                    schema: Dict[str, Any]=None, 
                    always_by_pass_cache=True, 
                    bypass_cache=True,
                    headless=True,
                    verbose=False,
                    magic=True,
                    page_timeout=15000,
                    delay_before_return_html=2.0,
                    wait_for='',
                    js_code=None,
                    js_only=False,
                    screenshot=False,
                    headers={}):
        
        extraction_strategy = JsonCssExtractionStrategy(schema, verbose=verbose) if schema else None
        
        async with AsyncWebCrawler(verbose=verbose, 
                                   headless=headless, 
                                   always_by_pass_cache=always_by_pass_cache, headers=headers) as crawler:
            result = await crawler.arun(
                url=url,
                extraction_strategy=extraction_strategy,
                bypass_cache=bypass_cache,
                page_timeout=page_timeout,
                delay_before_return_html=delay_before_return_html,
                wait_for=wait_for,
                js_code=js_code,
                magic=magic,
                remove_overlay_elements=True,
                process_iframes=True,
                exclude_external_links=True,
                js_only=js_only,
                screenshot=screenshot
            )

            assert result.success, "Failed to crawl the page"
            if schema:
                res = json.loads(result.extracted_content)
                if screenshot:
                    return res, result.screenshot
                return res
            return result.html


class FinanceNewsCrawler(AINewsCrawler):
    
    def __init__(self, domain='') -> None:
        super().__init__(domain)
    
    def save(self, datas: Union[List, Dict]):
        if isinstance(datas, dict):
            datas = [datas]
        self.history.update({ele['id']: ele for ele in datas})
        save_datas(self.file_path, datas=datas, mode='a')
    
    async def get_last_day_data(self):
        last_day = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
        datas = self.init()
        return [v for v in datas.values() if last_day in v['date']]
    

class CLSCrawler(FinanceNewsCrawler):
    """
        财某社新闻抓取
    """
    def __init__(self) -> None:
        self.domain = 'cls'
        super().__init__(self.domain)
        self.url = 'https://www.cls.cn'
        
    async def crawl_url_list(self, url='https://www.cls.cn/depth?id=1000'):
        schema = {
            'name': 'caijingwang toutiao page crawler',
            'baseSelector': 'div.f-l.content-left',
            'fields': [
                {
                    'name': 'top_titles',
                    'selector': 'div.depth-top-article-list',
                    'type': 'nested_list',
                    'fields': [
                        {'name': 'href', 'type': 'attribute', 'attribute':'href', 'selector': 'a[href]'}
                    ]
                },
                {
                    'name': 'sec_titles',
                    'selector': 'div.depth-top-article-list  li.f-l',
                    'type': 'nested_list',
                    'fields': [
                        {'name': 'href', 'type': 'attribute', 'attribute':'href', 'selector': 'a[href]'}
                    ]
                },
                {
                    'name': 'bottom_titles',
                    'selector': 'div.b-t-1 div.clearfix',
                    'type': 'nested_list',
                    'fields': [
                        {'name': 'href', 'type': 'attribute', 'attribute':'href', 'selector': 'a[href]'}
                    ]
                }
            ]
        }
        
        js_commands = [
            """
            (async () => {{
                
                await new Promise(resolve => setTimeout(resolve, 500));
                
                const targetItemCount = 100;
                
                let currentItemCount = document.querySelectorAll('div.b-t-1 div.clearfix a.f-w-b').length;
                let loadMoreButton = document.querySelector('.list-more-button.more-button');
                
                while (currentItemCount < targetItemCount) {{
                    window.scrollTo(0, document.body.scrollHeight);
                    
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    
                    if (loadMoreButton) {
                        loadMoreButton.click();
                    } else {
                        console.log('没有找到加载更多按钮');
                        break;
                    }
                    
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    
                    currentItemCount = document.querySelectorAll('div.b-t-1 div.clearfix a.f-w-b').length;
                    
                    loadMoreButton = document.querySelector('.list-more-button.more-button');
                }}
                console.log(`已加载 ${currentItemCount} 个item`);
                return currentItemCount;
            }})();
            """
        ]
        wait_for = ''
        
        results = {}
        
        menu_dict = {
            '1000': '头条',
            '1003': 'A股',
            '1007': '环球'
        }
        for k, v in menu_dict.items():
            url = f'https://www.cls.cn/depth?id={k}'
            try:
                links = await super().crawl(url, schema, always_by_pass_cache=True, bypass_cache=True, js_code=js_commands, wait_for=wait_for, js_only=False)
            except Exception as e:
                print(f'error {url}')
                links = []
            if links:
                links = [ele['href'] for eles in links[0].values() for ele in eles if 'href' in ele]
            links = sorted(list(set(links)), key=lambda x: x)
            results.update({f'{self.url}{ele}': v for ele in links})
        return results
    
    async def crawl_newsletter(self, url, category):
        schema = {
            'name': '财联社新闻详情页',
            'baseSelector': 'div.f-l.content-left',
            'fields': [
                {
                    'name': 'title',
                    'selector': 'span.detail-title-content',
                    'type': 'text'
                },
                {
                    'name': 'time',
                    'selector': 'div.m-r-10',
                    'type': 'text'
                },
                {
                    'name': 'abstract',
                    'selector': 'pre.detail-brief',
                    'type': 'text',
                    'fields': [
                        {'name': 'href', 'type': 'attribute', 'attribute':'href', 'selector': 'a[href]'}
                    ]
                },
                {
                    'name': 'contents',
                    'selector': 'div.detail-content p',
                    'type': 'list',
                    'fields': [
                        {'name': 'content', 'type': 'text'}
                    ]
                },
                {
                    'name': 'read_number',
                    'selector': 'div.detail-option-readnumber',
                    'type': 'text'
                }
            ]
        }
        
        wait_for = 'div.detail-content'
        try:
            results = await super().crawl(url, schema, always_by_pass_cache=True, bypass_cache=True, wait_for=wait_for)
            result = results[0]
        except Exception as e:
            print(f'crawler error: {url}')
            return {}
        
        return {
            'title': result['title'],
            'abstract': result['abstract'],
            'date': result['time'],
            'link': url,
            'content': '\n'.join([ele['content'] for ele in result['contents'] if 'content' in ele and ele['content']]),
            'id': md5(url),
            'type': category,
            'read_number': await self.get_first_float_number(result['read_number'], r'[-+]?\d*\.\d+|\d+'),
            'time': datetime.datetime.now().strftime('%Y-%m-%d')
        }
    
    async def get_first_float_number(self, text, pattern):
        match = re.search(pattern, text)
        if match:
            return round(float(match.group()), 4)
        return 0
    
    async def crawl(self):
        link_2_category = await self.crawl_url_list()
        for link, category in link_2_category.items():
            _id = md5(link)
            if _id in self.history:
                continue
            news = await self.crawl_newsletter(link, category)
            if news:
                self.save(news)
        return await self.get_last_day_data()
    
if __name__ == '__main__':
    asyncio.run(CLSCrawler().crawl())

5. 总结

一句话足矣~

开发了一款新闻资讯的自动聚合的工具,基于crawl4ai框架实现。

有问题可以私信或留言沟通!

6. 参考

(1) Crawl4ai: https://github.com/unclecode/crawl4ai

 

 

标签:return,https,self,def,ele,财经新闻,LLM,聚合,datas
From: https://www.cnblogs.com/mengrennwpu/p/18609896

相关文章

  • HKUST:激活LLM与下游任务相关的参数
    ......
  • 阿里:LLM数学推理错误识别基准
    ......
  • Implementing Memory in LLM Applications Using LangChain
    ImplementingMemoryinLLMApplicationsUsingLangChainhttps://www.codecademy.com/article/implementing-memory-in-llm-applications-using-lang-chain老版本https://python.langchain.com/v0.1/docs/modules/memory/types/buffer/ HowtomigratetoLangGraphmemor......
  • 使用LLaMA-Factory对LLM大模型进行微调!训练专属于你的模型!
    前言如今也是出现了各种各样的大模型,如果想要针对性的让他扮演某个角色我们通常采用的是给他输入prompt(提示词)。但是如果遇到一些"思想钢印"较深的大模型,使用提示词洗脑可能效果并不好。那我们有没有其他方法来解决这个问题?当然有,那就是自行微调一个大模型!本篇文章,就带......
  • 腾讯优图最新开源Freeze-Omini:冻结LLM引入语音处理能力
    作者:yearn原文:https://zhuanlan.zhihu.com/p/8242564370 近年来大语言模型(LLM)的快速发展为智能应用带来了巨大机会,而语音作为自然的人机交互形式,其与LLM的结合可以显著提升用户体验。然而,传统的语音交互方式通过ASR+LLM+TTS的级联方式实现,存在高工程复杂性和较大交......
  • 人工智能大语言模型起源篇(二),从通用语言微调到驾驭LLM
    上一篇:《人工智能大语言模型起源篇(一),从哪里开始》(5)Howard和Ruder于2018年发表的《UniversalLanguageModelFine-tuningforTextClassification》,https://arxiv.org/abs/1801.06146这篇论文从历史的角度来看非常有意思。尽管它是在原始的《AttentionIsAllYouNeed......
  • 为了改一行代码,我花了10多天时间,让性能提升了40多倍---Pascal架构GPU在vllm下的模型推
    ChatGPT生成的文章摘要这篇博客记录了作者在家中使用Pascal显卡运行大型模型时遇到的挑战和解决方案。随着本地大型模型性能的提升,作者选择使用vllm库进行推理。然而,作者遇到了多个技术难题,需要自行编译vllm和PyTorch,以支持Pascal架构的显卡。编译过程中,作者深入研究了显卡不支持......
  • 从小白到入门,写给程序员的LLM学习指南
    年初的时候,我第一次接触了ChatGPT,在被深深震撼之后,我意识到一个新的时代正在来临,作为程序员的我有了从未有过的危机感,在海量的信息里浸泡了几周后,作为程序员的我们需要大幅更新自己的技能栈,之后我便开始了更新技能栈的践行。经过过去几个月的学习,我逐渐度过了不知所措的阶......
  • 一对一聊天软件源码,高聚合、松耦合的实现策略
    一对一聊天软件源码,高聚合、松耦合的实现策略在一对一聊天软件源码的前端开发中,代码的可维护性、可测试性和可扩展性是非常重要的。为了实现这些目标,我们需要采用一些有效的架构设计方法来提高代码的质量和效率。模块化设计模块化设计可以理解为按照一对一聊天软件源码功能......
  • 联微动力丨微信&企微聚合聊天系统丨介绍文档
    联微动力-微信&企微聚合聊天系统-整体介绍文档联微动力是一款基于物理手机实现对微信进行统筹管理的软件系统,能够帮助商家非常高效的一人管理多个Wechat号,也能帮助管理者全链路把握好微信的运营过程。联微动力私域系统目前已稳定持续运行7年时间,累计服务了上万个品牌商家,通......