首页 > 编程问答 >自动解码并检索 S/MIME 加密电子邮件的正文 (python)

自动解码并检索 S/MIME 加密电子邮件的正文 (python)

时间:2024-07-30 05:31:01浏览次数:11  
标签:python email encryption openssl smime

我如何:

  1. 用 python 代码连接我的邮件收件箱以自动获取未读电子邮件的加密内容;
  2. 解码 S/MIME 加密电子邮件(我有密钥);
  3. 检索电子邮件正文纯文本;
  4. 检查正文(或主题)是否与某个关键字(现在为“test”)匹配,并在匹配时打印一些内容;
  5. 在树莓派上使用此代码,无需手动执行任何操作。

我已经检查过 类似的帖子 ,但我无法设法将其实际连接到我的邮箱并使其全部工作,而无需手动执行任何操作。

我正在使用 iOS' /iCloud 的 IMAP (imap.mail.me.com),我现在正在使用 mail.login() ,但它似乎实际上没有从未读电子邮件中获取任何内容。

我现在拥有的代码:|| |目前最好的代码,在 Windows 上的 Visual Studio Code 中编程,而不是 Linux

import imaplib
import email
from email.header import decode_header
from OpenSSL import crypto
import time, datetime

# Email receiving configuration
imap_server = "imap.mail.me.com" # <-- iCloud's mail IMAP
imap_user = "[email protected]" # <- Email
imap_password = "example_password" # <-- Email App-Specific password

# Paths to your certificate and private key
cert_path = 'path\to\my\certificate.pem' #<-- Path to cert.pem
key_path = 'path\to\my\privatekey.pem' # <-- Path to privkey.pem

# Function to decrypt S/MIME email
def decrypt_smime(encrypted_message):
    try:
        # Load the certificate and private key
        with open(cert_path, 'rb') as f:
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
        with open(key_path, 'rb') as f:
            key = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())

        # Create a Crypto object and decrypt the email
        smime = crypto.SMIME()
        smime.load_cert(cert)
        smime.load_key(key)
        decrypted_message = smime.decrypt(encrypted_message)
        return decrypted_message.decode('utf-8')
    except Exception as e:
        print(f"Error decrypting S/MIME email: {e}")
        return None

# Function to process email payload
def process_email_payload(msg):
    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "application/pkcs7-mime":
                payload = part.get_payload(decode=True)
                payload = decrypt_smime(payload)
                if payload:
                    return payload.strip()
            elif content_type in ["text/plain", "text/html"]:
                payload = part.get_payload(decode=True).strip()
                return payload.decode('utf-8')
    else:
        payload = msg.get_payload(decode=True)
        if msg.get_content_type() == "application/pkcs7-mime":
            payload = decrypt_smime(payload)
        return payload.decode('utf-8')
    return None

# Function to check for new emails
def check_email():
    try:
        mail = imaplib.IMAP4_SSL(imap_server)
        mail.login(imap_user, imap_password)
        mail.select("inbox")

        result, data = mail.search(None, "UNSEEN")

        # Debugging information
        print(f"Search result {datetime.datetime.now():%H.%M.%S}: {result}") # <-- Also prints current time so I can differentiate the different outputs
        print(f"Search data: {data}")

        if result != "OK":
            print(f"Error searching Inbox: {result}")
            return

        if data[0] is None:
            print("No new emails.")
            return

        email_ids = data[0].split()
        if not email_ids:
            print("No new emails.")
            return

        print(f"Email IDs: {email_ids}")  # Debug email IDs

        for email_id in email_ids:
            result, msg_data = mail.fetch(email_id, "(RFC822)")
            print(f"Fetch result: {result}")  # Debug fetch result
            print(f"Fetch msg_data: {msg_data}")  # Debug msg_data

            if result != "OK":
                print(f"Error fetching email ID {email_id}: {result}")
                continue

            for response_part in msg_data:
                if isinstance(response_part, tuple):
                    raw_email = response_part[1]
                    print(f"Raw email: {raw_email}")  # Debug raw email data
                    try:
                        msg = email.message_from_bytes(raw_email)
                        subject, encoding = decode_header(msg["Subject"])[0]
                        if isinstance(subject, bytes):
                            subject = subject.decode(encoding if encoding else 'utf-8')
                        sender = email.utils.parseaddr(msg["From"])[1]

                        print(f"Subject: {subject}")
                        print(f"From: {sender}")

                        payload = process_email_payload(msg)
                        if payload:
                            print(f"Payload: {payload}")  # Debug payload content
                        else:
                            print(f"Could not decode payload for email ID {email_id}")
                            continue

                        if "test" in payload:
                            print("Test prompt received")

                    except Exception as e:
                        print(f"Error decoding email ID {email_id}: {e}")
                        continue

        mail.close()
        mail.logout()
    except Exception as e:
        print(f"Failed to check email: {e}")

# Actually try to check and decode emails (loop)
while True:
    check_email()
    time.sleep(10)

它确实会主动检查电子邮件,但不会获取任何内容:

终端输出

Fetch result: OK
Fetch msg_data: [b'0000 ()']

并且不打印主题,等等,因为它不是

我假设msg_data实际上是空的(基本上)。 isinstance(response_part, tuple) 我已经尝试了数十种其他可能的解决方案,但这似乎是目前最好的。

所以我实际上如何主动检查未读电子邮件并解码其正文?

请尽力描述解决方案,因为我真的有兴趣了解更多有关此的信息,并且我想提高我的整体编码技能。我也对我做错了什么感兴趣。

Please try your best with describing the solution, since I'm really interested in learning more information about this and I would like to improve my over-all coding skills. I'm also interested in what I did wrong.


代码的问题在于,没有正确检索电子邮件内容。 mail.fetch 方法返回电子邮件数据(包括其内容)的封装在元组中的列表。正在访问元组的错误部分,从而导致空数据。

以下是改进代码的步骤:

  1. 更正电子邮件内容检索:
  2. 需要正确解析 mail.fetch 返回的数据。它返回一个包含两个元素的元组列表:
    • 第一个元素是状态代码。
    • 第二个元素是包含电子邮件数据的元组,形式为 (flags, data)
  3. 需要提取 data 部分,然后使用 email.message_from_bytes 解析它。

  4. 优化电子邮件处理:

  5. 在遍历电子邮件部分时,如果找到匹配的 content_type application/pkcs7-mime text/plain / text/html ),可以使用 break 语句退出循环,因为已经找到了所需的有效负载。

以下是更新后的代码:

import imaplib
import email
from email.header import decode_header
from OpenSSL import crypto
import time, datetime

# ...(电子邮件配置、证书路径与之前相同)... 

# ...(decrypt_smime 函数与之前相同)... 

# 函数来处理电子邮件有效负载
def process_email_payload(msg):
    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "application/pkcs7-mime":
                payload = part.get_payload(decode=True)
                payload = decrypt_smime(payload)
                if payload:
                    return payload.strip()
                break  # 找到 S/MIME 部分,退出循环
            elif content_type in ["text/plain", "text/html"]:
                payload = part.get_payload(decode=True).strip()
                return payload.decode('utf-8')
                break  # 找到文本或 HTML 部分,退出循环
    else:
        payload = msg.get_payload(decode=True)
        if msg.get_content_type() == "application/pkcs7-mime":
            payload = decrypt_smime(payload)
        return payload.decode('utf-8')
    return None

# 函数来检查新电子邮件
def check_email():
    try:
        mail = imaplib.IMAP4_SSL(imap_server)
        mail.login(imap_user, imap_password)
        mail.select("inbox")

        result, data = mail.search(None, "UNSEEN")

        # ...(调试信息与之前相同)...

        if result != "OK":
            print(f"搜索收件箱时出错:{result}")
            return

        if data[0] is None:
            print("没有新邮件。")
            return

        email_ids = data[0].split()
        if not email_ids:
            print("没有新邮件。")
            return

        print(f"电子邮件 ID:{email_ids}")  # 调试电子邮件 ID

        for email_id in email_ids:
            result, msg_data = mail.fetch(email_id, "(RFC822)")

            # ...(调试信息与之前相同)...

            if result != "OK":
                print(f"提取电子邮件 ID {email_id} 时出错:{result}")
                continue

            for response_part in msg_data:
                if isinstance(response_part, tuple):
                    # 从元组中获取电子邮件数据
                    _, raw_email = response_part  # 忽略第一个元素(flags)
                    print(f"原始电子邮件:{raw_email}") # 调试原始电子邮件数据
                    try:
                        msg = email.message_from_bytes(raw_email)
                        subject, encoding = decode_header(msg["Subject"])[0]
                        if isinstance(subject, bytes):
                            subject = subject.decode(encoding if encoding else 'utf-8')
                        sender = email.utils.parseaddr(msg["From"])[1]

                        print(f"主题:{subject}")
                        print(f"发件人:{sender}")

                        payload = process_email_payload(msg)
                        if payload:
                            print(f"有效负载:{payload}")  # 调试有效负载内容
                        else:
                            print(f"无法解码电子邮件 ID {email_id} 的有效负载")
                            continue

                        if "test" in payload:
                            print("收到测试提示")

                    except Exception as e:
                        print(f"解码电子邮件 ID {email_id} 时出错:{e}")
                        continue

        mail.close()
        mail.logout()
    except Exception as e:
        print(f"检查电子邮件失败:{e}")

# 实际尝试检查和解码电子邮件(循环)
while True:
    check_email()
    time.sleep(10)

这些更正应该可以解决电子邮件内容检索问题,并允许正确解码和处理 S/MIME 加密的电子邮件。

标签:python,email,encryption,openssl,smime
From: 78809096

相关文章

  • Python 3 写入 DBF(带有 Memo 的 dBase IV)
    我需要在Python3中写入带有备注字段的dBaseIVdbf文件,但找不到合适的模块来执行此操作。我尝试过的包:Simpledbf-只读dbf-不支持dBaseIVdbfpy-不支持Python3dbfpy3-不支持dBaseIVYDbf-不支持备注字段pyshp-无法仅使用dbf文件......
  • Robin-Stocks Python 中的 order_buy_fractional_by_price 问题
    我在Robin-StocksPython包中的order_buy_fractional_by_price函数中遇到问题。在正常市场交易时间内下达以美元为基础的买入订单时,该订单被错误地设置为限价订单。对我来说看起来有问题的代码似乎是导致此问题的原因。我尝试在包管理器中本地修改或删除有问题的代码,但遇......
  • 在python中使用turtle绘制图案(带点)
    我正在尝试使用python中的海龟制作一幅赫斯特画(点图案)。我设法实现了它。Hirst_painting_dot_pattern但是我的for循环没有按照我预期的方式工作。它省略了最后一次迭代。在下面的代码中,我的for循环没有生成最后一个点。因此,我在循环末尾添加了一行来制作最后......
  • 使用 Python 进行 QuantLib Vanilla 掉期定价 - 错误
    我真的需要帮助...我有一个使用QuantLib构建自己的VanillaSwapPricer的项目。我想根据ois掉期的市场价格进行计算以进行贴现,并根据Euribor6M掉期+FRA进行预测固定。总而言之,我的目标是尽可能接近彭博社对标准Euribor6M掉期的定价(贴现ois)-fwdEuribor6M)。......
  • 我正在制作一个可以打开wav文件的python程序,我想知道wav文件的格式是什么
    因此,我已经通过此网站的研究编写了验证并读取wav标头的代码。但我想知道,data段中的数据是如何存储的?它们位于16位部分中,彼此相邻放置。我认为在Audacity中制作440hz正弦波,然后导出它,会显示一些结果,并且字节确实看起来更整齐,但仍然像废话一样接缝。相信我,我已经......
  • python - 面板库 - PasswordInput 不会对回车做出反应
    我试图避免需要提交按钮。以下代码当前正在远程jupyter实验室运行。仅当光标焦点从密码小部件中移除后,才会打印该消息。我想要回车来触发消息打印。有什么线索吗?frompanel.widgetsimportPasswordInput,TextInputpn.extension()defon_enter(event=None):message_p......
  • 即使安装了软件包,也找不到 python 模块句子转换器
    对于我的python脚本(如下所示),我使用句子转换器包,其中包含SBERT模型。即使在执行“piplist”时明确列出该软件包,并且还安装了torch,所有内容都更新到最新版本,但仍然找不到该模块。脚本:importnumpyasnpimportpandasaspdfromsentence_transformersimportSenten......
  • 有没有办法在 python 中返回类实例的布尔属性?
    我想组织我玩的游戏中的汽车数据。为此,我创建了一个“Car”类。我希望将此类实例的一些属性作为布尔值,这样我就可以将此类的所有实例放入列表中并过滤它们。我当前的解决方案不起作用,因为,我收到以下错误消息:TypeError:__repr__returnednon-string(typeCar)我使用......
  • python 正则表达式匹配一行中的多个单词而不转到下一行
    我正在编写一个解析器来解析以下输出:admin@str-s6000-on-5:~$showinterfacestatusEthernet4InterfaceLanesSpeedMTUAliasVlanOperAdminTypeAsymPFC------------------------------------------......
  • 使用 Python 平滑和对称不规则形状和曲线
    我需要完成三项任务:正则化曲线曲线的对称性完成不完整的曲线例如,这里是输入和预期的输出图像:输入输出|||在一般设置中,形状可以由任何SVG曲线基元(贝塞尔曲线、直线、弧线)表示。为了统一表示,示例包含曲线的折线近似。这些折线保存为......