# -*- coding: utf-8 -*-
# @Author : 107
# @File : emaildriver.py
# @explain : receive e-mail
import email
import imaplib
import json
import os
import re
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr


class CheckEmail:
    """Download every message of an IMAP inbox and store each one as a raw file.

    ``run()`` logs in, lists the UIDs of all messages in ``INBOX``, fetches
    each message with ``BODY.PEEK[]`` and writes the raw bytes into
    ``emails_path``.
    """

    def __init__(self, username: str, token: str, server: str, emails_path: str):
        """
        :param username: mailbox account
        :param token: mailbox token (the credential issued when SMTP access
            was requested)
        :param server: address of the mail server
        :param emails_path: directory in which the original files are stored
            (original file: the unprocessed message content, ``.txt`` format)
        """
        self.username = username
        self.token = token
        self.server = server
        self.emails_path = emails_path
        # UIDs (bytes) found by the most recent successful run().
        self.email_uids = []

    def login(self):
        """Connect to the server over SSL and log in.

        :return: the authenticated ``imaplib.IMAP4_SSL`` connection
        :raises imaplib.IMAP4.error: if the server rejects the credentials
        """
        imap = imaplib.IMAP4_SSL(host=self.server)
        try:
            imap.login(self.username, self.token)
        except Exception:
            # Do not leave the socket open when authentication fails.
            imap.shutdown()
            raise
        return imap

    def select(self, imap):
        """Send the client ID, open ``INBOX`` read-only and search all UIDs.

        :param imap: authenticated connection as returned by :meth:`login`
        :return: ``(state, result)`` exactly as returned by
            ``imap.uid("search", None, "ALL")``
        """
        # imaplib does not know the ID command, so it is registered here.
        # The value must be a tuple of connection states: ('AUTH') without
        # the trailing comma is only a parenthesised string.
        imaplib.Commands['ID'] = ('AUTH',)
        args = ("name", "zhang3", "contact", f"{self.username}", "version", "2.58", "vendor", "myclient")
        # NOTE(review): _simple_command is a private imaplib API; presumably
        # the server requires an ID before SELECT -- confirm before removing.
        imap._simple_command('ID', '("' + '" "'.join(args) + '")')
        imap.select('INBOX', readonly=True)
        # UID SEARCH ALL: the result holds the UIDs of all messages.
        return imap.uid("search", None, 'ALL')

    def _open_new_file(self):
        """Create a not yet existing ``<timestamp>.txt`` and return ``(name, handle)``."""
        while True:
            file_name = f"{time.time()}.txt"
            path = os.path.join(self.emails_path, file_name)
            try:
                # Mode "x" refuses to reuse an existing name, so two messages
                # saved within the same clock tick never overwrite each other.
                return file_name, open(path, "xb")
            except FileExistsError:
                continue

    def save(self, single):
        """Store the message content of one FETCH response.

        :param single: ``(state, data)`` as returned by
            ``imap.uid("fetch", ...)``; every tuple item of ``data`` has its
            second element written to a new file, other items are skipped
        """
        if self.emails_path:
            # The target directory may not exist yet on the first run.
            os.makedirs(self.emails_path, exist_ok=True)
        for response in single[-1]:
            if not isinstance(response, tuple):
                continue
            file_name, handle = self._open_new_file()
            with handle:
                print(f"邮件名:{file_name}")
                handle.write(response[1])

    def run(self) -> list:
        """Fetch and save every message of the inbox.

        :return: the UIDs of all messages (bytes); when the search does not
            answer ``OK`` the previously stored list is returned unchanged
        """
        # The connection is a context manager that logs out on exit, so the
        # session is released even when a fetch or a file write fails.
        with self.login() as imap:
            print("登陆成功")
            select_state, select_result = self.select(imap)
            print(select_state, select_result)
            if select_state == "OK":
                # The search result is one space separated bytes string; it
                # can be empty or None when the mailbox holds no messages.
                raw_uids = (select_result[0] if select_result else None) or b""
                self.email_uids = raw_uids.split()
                for uid in self.email_uids:
                    print(f"邮件uid:{uid}")
                    single = imap.uid("fetch", uid, "(BODY.PEEK[] FLAGS)")
                    self.save(single)
        return self.email_uids
# Tags: uids, self, imap, mail, email, select
# From: https://www.cnblogs.com/hudieren/p/16792045.html