
Fetching Huawei Cloud CDN Logs with Python


1. Install the required modules

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple huaweicloudsdkcdn

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple requests

The scripts below also use datetime, os, shutil, gzip, glob and json, which are all part of the Python standard library and do not need to be installed separately.
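
The scripts below leave the ak/sk placeholders empty. A common alternative is to read the credentials from environment variables instead of hard-coding them; here is a minimal sketch (the environment variable names are an assumption, not from the original post):

import os
from huaweicloudsdkcore.auth.credentials import GlobalCredentials

# read the access key ID / secret access key from the environment (variable names assumed)
ak = os.environ.get("HUAWEICLOUD_SDK_AK", "")
sk = os.environ.get("HUAWEICLOUD_SDK_SK", "")
credentials = GlobalCredentials(ak, sk)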

2. Fetch the previous day's log files

import os  
import requests  
import shutil  
import gzip  
import glob  
import json  
from datetime import datetime, timedelta  
from urllib.parse import urlencode  
  
from huaweicloudsdkcore.auth.credentials import GlobalCredentials
from huaweicloudsdkcdn.v2.region.cdn_region import CdnRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkcdn.v2 import *
  

def download_and_extract_logs(domain, start_time, output_dir, dir_time):  
    if not os.path.exists(output_dir):  
        os.makedirs(output_dir)  
      
    domain_dir = os.path.join(output_dir, domain)  
    if not os.path.exists(domain_dir):  
        os.makedirs(domain_dir)
    
    date_dir =  os.path.join(domain_dir, str(dir_time))
    if not os.path.exists(date_dir):  
        os.makedirs(date_dir)
     
    page_number = 1  
    while True:  
        ak = ''   # Huawei Cloud access key ID
        sk = ''   # Huawei Cloud secret access key

        credentials = GlobalCredentials(ak, sk)

        client = CdnClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(CdnRegion.value_of("cn-north-1")) \
            .build()
    
        try:
            request = ShowLogsRequest()
            request.domain_name = domain
            request.start_time = start_time
            request.page_size = 60
            request.page_number = page_number
            request.enterprise_project_id = "0"
            response = client.show_logs(request)
            data_dict = json.loads(str(response))
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
            break   # stop paging on an API error; data_dict would otherwise be undefined below
          

        logs = data_dict.get('logs', [])  
        if not logs:  
            break
          
        for log in logs:  
            log_url = log.get('link')
            if log_url:
                # extract the log file name from the download link: take the path
                # segment at index 6 of the '/'-split URL and strip the signed query string
                fields_1 = log_url.split('/')  
                seventh_field = fields_1[6]
                fields_2 = seventh_field.split('?')
                filename = fields_2[0]
                log_filename = os.path.basename(filename)  
                log_path = os.path.join(date_dir, log_filename)  
                # stream the gzipped log file to disk
                with requests.get(log_url, stream=True) as r:  
                    with open(log_path, 'wb') as f:  
                        shutil.copyfileobj(r.raw, f)  
        
        page_number += 1  
      
    # Merge and decompress all of this domain's files for the day; storage layout: domain/date/file  
    merged_log_path = os.path.join(date_dir, f'{domain}_{str(dir_time)}.log')  
    with open(merged_log_path, 'wb') as merged_file:  
        for gz_file in sorted(glob.glob(os.path.join(date_dir, '*.gz'))):  
            with gzip.open(gz_file, 'rb') as f_in:  
                shutil.copyfileobj(f_in, merged_file)  
            os.remove(gz_file)  
  

def main():
    domains = ["www.test.cn"]
    current_time = datetime.now()
    dir_time = (current_time - timedelta(days=1)).strftime('%Y-%m-%d')  
    yesterday = current_time - timedelta(days=1)
    midnight_yesterday = datetime(year=yesterday.year, month=yesterday.month, day=yesterday.day)
    start_time = int(midnight_yesterday.timestamp() * 1000)
    output_dir = 'cdn_logs'  

    for domain in domains:
        download_and_extract_logs(domain, start_time, output_dir, dir_time)

if __name__ == "__main__":
    main()
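
After a run, a quick line count of the merged file is an easy sanity check. This is a minimal sketch; the domain name and the cdn_logs path layout simply follow the script above:

import os
from datetime import datetime, timedelta

domain = "www.test.cn"
dir_time = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
# path layout produced by the script above: cdn_logs/<domain>/<date>/<domain>_<date>.log
merged_log = os.path.join('cdn_logs', domain, dir_time, f'{domain}_{dir_time}.log')

with open(merged_log, 'rb') as f:
    line_count = sum(1 for _ in f)
print(f'{merged_log}: {line_count} lines')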

3. Separate domestic and overseas access IPs

import os  
import requests  
import shutil  
import gzip  
import glob  
import json  
from datetime import datetime, timedelta  
from urllib.parse import urlencode  
  
from huaweicloudsdkcore.auth.credentials import GlobalCredentials
from huaweicloudsdkcdn.v2.region.cdn_region import CdnRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkcdn.v2 import *
  

def download_and_extract_logs(domain, start_time, output_dir, dir_time):  
    if not os.path.exists(output_dir):  
        os.makedirs(output_dir)  
      
    domain_dir = os.path.join(output_dir, domain)  
    if not os.path.exists(domain_dir):  
        os.makedirs(domain_dir)
    
#    date_dir =  os.path.join(domain_dir, str(dir_time))
#    if not os.path.exists(date_dir):  
#        os.makedirs(date_dir)
     
    page_number = 1  
    while True:  
        ak = ''   # Huawei Cloud access key ID
        sk = ''   # Huawei Cloud secret access key

        credentials = GlobalCredentials(ak, sk)

        client = CdnClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(CdnRegion.value_of("cn-north-1")) \
            .build()
    
        try:
            request = ShowLogsRequest()
            request.domain_name = domain
            request.start_time = start_time
            request.page_size = 60
            request.page_number = page_number
            request.enterprise_project_id = "0"
            response = client.show_logs(request)
            data_dict = json.loads(str(response))
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
            break   # stop paging on an API error; data_dict would otherwise be undefined below
          

        logs = data_dict.get('logs', [])  
        if not logs:  
            break
          
        for log in logs:  
            log_url = log.get('link')
            if log_url:
                # extract the log file name from the download link: take the path
                # segment at index 6 of the '/'-split URL and strip the signed query string
                fields_1 = log_url.split('/')  
                seventh_field = fields_1[6]
                fields_2 = seventh_field.split('?')
                filename = fields_2[0]
                log_filename = os.path.basename(filename)  
                log_path = os.path.join(domain_dir, log_filename)  
                # stream the gzipped log file to disk
                with requests.get(log_url, stream=True) as r:  
                    with open(log_path, 'wb') as f:  
                        shutil.copyfileobj(r.raw, f)  
        
        page_number += 1  
      
    # Decompress and merge all of this domain's log files for the day; storage layout: domain/log file  
    merged_log_path = os.path.join(domain_dir, f'{domain}_all_{str(dir_time)}.log')  
    with open(merged_log_path, 'wb') as merged_file:  
        for gz_file in sorted(glob.glob(os.path.join(domain_dir, '*.gz'))):  
            with gzip.open(gz_file, 'rb') as f_in:  
                shutil.copyfileobj(f_in, merged_file) 
                
    # Decompress and merge the day's domestic-access (*cn.gz) files for this domain  
    merged_cn_log_path = os.path.join(domain_dir, f'{domain}_cn_{str(dir_time)}.log')  
    with open(merged_cn_log_path, 'wb') as merged_file:  
        for gz_file in sorted(glob.glob(os.path.join(domain_dir, '*cn.gz'))):  
            with gzip.open(gz_file, 'rb') as f_in:  
                shutil.copyfileobj(f_in, merged_file)  
            os.remove(gz_file)  
            
    # Decompress and merge the day's overseas-access (*ov.gz) files for this domain  
    merged_ov_log_path = os.path.join(domain_dir, f'{domain}_ov_{str(dir_time)}.log')  
    with open(merged_ov_log_path, 'wb') as merged_file:  
        for gz_file in sorted(glob.glob(os.path.join(domain_dir, '*ov.gz'))):  
            with gzip.open(gz_file, 'rb') as f_in:  
                shutil.copyfileobj(f_in, merged_file)  
            os.remove(gz_file)   

def main():
    domains = ["www.test.cn"]
    current_time = datetime.now()
    dir_time = (current_time - timedelta(days=1)).strftime('%Y-%m-%d')  
    yesterday = current_time - timedelta(days=1)
    midnight_yesterday = datetime(year=yesterday.year, month=yesterday.month, day=yesterday.day)
    start_time = int(midnight_yesterday.timestamp() * 1000)
    output_dir = 'cdn_logs'  

    for domain in domains:
        download_and_extract_logs(domain, start_time, output_dir, dir_time)

if __name__ == "__main__":
    main()
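
The split relies entirely on the file names returned by the CDN: files ending in cn.gz are treated as domestic (Chinese mainland) access logs and files ending in ov.gz as overseas access logs; a file matching neither pattern is merged into the _all_ file but never deleted. A minimal illustration of how the two glob patterns classify names (the file names below are hypothetical, not actual Huawei CDN names):

import fnmatch

names = [
    'www.test.cn_2024-04-08-00_cn.gz',   # hypothetical domestic log file
    'www.test.cn_2024-04-08-00_ov.gz',   # hypothetical overseas log file
]
print([n for n in names if fnmatch.fnmatch(n, '*cn.gz')])   # domestic files
print([n for n in names if fnmatch.fnmatch(n, '*ov.gz')])   # overseas files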

4. Fetch logs for a date range within the past month in a loop

import os  
import requests  
import shutil  
import gzip  
import glob  
import json  
from datetime import datetime, timedelta  
from urllib.parse import urlencode  
  
from huaweicloudsdkcore.auth.credentials import GlobalCredentials
from huaweicloudsdkcdn.v2.region.cdn_region import CdnRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkcdn.v2 import *
  

def download_and_extract_logs(domain, start_time, output_dir, dir_time):  
    if not os.path.exists(output_dir):  
        os.makedirs(output_dir)  
      
    domain_dir = os.path.join(output_dir, domain)  
    if not os.path.exists(domain_dir):  
        os.makedirs(domain_dir)
    
#    date_dir =  os.path.join(domain_dir, str(dir_time))
#    if not os.path.exists(date_dir):  
#        os.makedirs(date_dir)
     
    page_number = 1  
    while True:  
        ak = ''   # Huawei Cloud access key ID
        sk = ''   # Huawei Cloud secret access key

        credentials = GlobalCredentials(ak, sk)

        client = CdnClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(CdnRegion.value_of("cn-north-1")) \
            .build()
    
        try:
            request = ShowLogsRequest()
            request.domain_name = domain
            request.start_time = start_time
            request.page_size = 60
            request.page_number = page_number
            request.enterprise_project_id = "0"
            response = client.show_logs(request)
            data_dict = json.loads(str(response))
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
            break   # stop paging on an API error; data_dict would otherwise be undefined below
          

        logs = data_dict.get('logs', [])  
        if not logs:  
            break
          
        for log in logs:  
            log_url = log.get('link')
            if log_url:
                # extract the log file name from the download link: take the path
                # segment at index 6 of the '/'-split URL and strip the signed query string
                fields_1 = log_url.split('/')  
                seventh_field = fields_1[6]
                fields_2 = seventh_field.split('?')
                filename = fields_2[0]
                log_filename = os.path.basename(filename)  
                log_path = os.path.join(domain_dir, log_filename)  
                # stream the gzipped log file to disk
                with requests.get(log_url, stream=True) as r:  
                    with open(log_path, 'wb') as f:  
                        shutil.copyfileobj(r.raw, f)  
        
        page_number += 1  
      
    # Decompress and merge all of this domain's log files for the day; storage layout: domain/log file  
    merged_all_log_path = os.path.join(domain_dir, f'{domain}_all_{str(dir_time)}.log')  
    with open(merged_all_log_path, 'wb') as merged_file:  
        for gz_file in sorted(glob.glob(os.path.join(domain_dir, '*.gz'))):  
            with gzip.open(gz_file, 'rb') as f_in:  
                shutil.copyfileobj(f_in, merged_file) 
                
    # Decompress and merge the day's domestic-access (*cn.gz) files for this domain  
    merged_cn_log_path = os.path.join(domain_dir, f'{domain}_cn_{str(dir_time)}.log')  
    with open(merged_cn_log_path, 'wb') as merged_file:  
        for gz_file in sorted(glob.glob(os.path.join(domain_dir, '*cn.gz'))):  
            with gzip.open(gz_file, 'rb') as f_in:  
                shutil.copyfileobj(f_in, merged_file)  
            os.remove(gz_file)  
            
    # Decompress and merge the day's overseas-access (*ov.gz) files for this domain  
    merged_ov_log_path = os.path.join(domain_dir, f'{domain}_ov_{str(dir_time)}.log')  
    with open(merged_ov_log_path, 'wb') as merged_file:  
        for gz_file in sorted(glob.glob(os.path.join(domain_dir, '*ov.gz'))):  
            with gzip.open(gz_file, 'rb') as f_in:  
                shutil.copyfileobj(f_in, merged_file)  
            os.remove(gz_file)   

def main():
    domains = ["www.test.cn", "www.ceshi.com"]
#    current_time = datetime.now()
#    dir_time = (current_time - timedelta(days=1)).strftime('%Y-%m-%d')  
#    yesterday = current_time - timedelta(days=1)
#    midnight_yesterday = datetime(year=yesterday.year, month=yesterday.month, day=yesterday.day)
#    start_time = int(midnight_yesterday.timestamp() * 1000)
    output_dir = 'cdn_logs'  
    # start date of the range
    start_date = datetime(2024, 4, 8)  
    # number of days to iterate over
    num_days = 10  
  
    # iterate over each date in the range  
    for i in range(num_days):  
        current_date = start_date + timedelta(days=i)  
        dir_time = current_date.strftime("%Y-%m-%d")
        start_time = int(current_date.timestamp() * 1000)
        for domain in domains:
            download_and_extract_logs(domain, start_time, output_dir, dir_time)

if __name__ == "__main__":
    main()
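
Instead of hard-coding start_date = datetime(2024, 4, 8), the range can be derived backwards from today; a minimal sketch (not from the original post):

from datetime import datetime, timedelta

days_back = 10                                            # how many past days to fetch
today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
start_date = today_midnight - timedelta(days=days_back)   # midnight, matching the start_time semantics
num_days = days_back                                      # the last iteration is yesterday

for i in range(num_days):
    current_date = start_date + timedelta(days=i)
    print(current_date.strftime("%Y-%m-%d"))              # dates that would be passed to download_and_extract_logs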

5. References

https://support.huaweicloud.com/usermanual-cdn/zh-cn_topic_0073337424.html
https://console.huaweicloud.com/apiexplorer/#/openapi/CDN/sdk?api=ShowLogs&version=v2

From: https://www.cnblogs.com/sswind/p/18206195
