Python3邮件发送全指南:文本、HTML与附件

发布于:2025-07-12 ⋅ 阅读:(13) ⋅ 点赞:(0)

在 Python3 中,使用内置的 smtplib 库和 email 模块发送邮件是一个常见的需求。以下是更详细的实现指南,包含各种场景的解决方案和技术细节:

一、发送纯文本邮件的完整实现

  • 准备工作:
    • 确保已开通 SMTP 服务(各邮箱开启方式不同)
    • 获取 SMTP 授权码(非登录密码)
    • 确认服务器地址和端口(常见配置见下表)
邮箱服务 SMTP服务器 SSL端口 TLS端口
QQ邮箱 smtp.qq.com 465 587
163邮箱 smtp.163.com 465 994
Gmail smtp.gmail.com 465 587
  • 核心代码详解:
import smtplib
import ssl
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr, formatdate, make_msgid
from datetime import datetime

# 增强的邮件配置
config = {
    "smtp_server": "smtp.examples.com",
    "smtp_port": 465,
    "sender_email": "your_email@example.com",
    "sender_name": "系统管理员",  # 发件人显示名称
    "password": "your_smtp_password",
    "receivers": [
        {"email": "user1@example.com", "name": "张经理"},
        {"email": "user2@example.com", "name": "李主管"}
    ]
}

# 构建邮件内容(支持多行模板)
email_template = """尊敬的{recipient_name}:

这是来自{system_name}的系统通知邮件。

当前时间:{current_time}
系统状态:正常运行
最近事件:
{events}

请及时处理相关事务。
"""

events = "\n".join([
    "1. 用户登录异常(3次)",
    "2. 数据库备份完成",
    "3. 新版本发布通知"
])

# 填充模板内容
email_body = email_template.format(
    recipient_name="各位",  # 群发时的通用称呼
    system_name="OA系统",
    current_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    events=events
)

# 创建MIME对象
msg = MIMEText(email_body, "plain", "utf-8")

# 设置邮件头(规范格式)
msg["From"] = formataddr((config["sender_name"], config["sender_email"]))
msg["To"] = ", ".join(
    [formataddr((r["name"], r["email"])) for r in config["receivers"]]
)
msg["Subject"] = Header("【重要】系统状态通知", "utf-8")

# 高级邮件头设置
msg["Date"] = formatdate(localtime=True)
msg["Message-ID"] = make_msgid()
msg["X-Priority"] = "1"  # 邮件优先级(1-5, 1最高)
msg["X-Mailer"] = "Python SMTP"  # 邮件客户端标识

# 安全发送流程
context = ssl.create_default_context()
try:
    with smtplib.SMTP_SSL(
        config["smtp_server"],
        config["smtp_port"],
        context=context,
        timeout=10  # 设置超时时间
    ) as server:
        server.login(config["sender_email"], config["password"])
        
        # 实际发送时区分密送和抄送
        to_addresses = [r["email"] for r in config["receivers"]]
        server.sendmail(
            config["sender_email"],
            to_addresses,
            msg.as_string()
        )
        print(f"成功发送邮件至 {len(to_addresses)} 位收件人")
except smtplib.SMTPException as e:
    print(f"邮件发送失败,SMTP错误: {str(e)}")
except Exception as e:
    print(f"发生未知错误: {str(e)}")

二、HTML邮件的专业实现

  • 高级功能支持:
    • 响应式设计(适应移动端)
    • 嵌入式CSS和JavaScript
    • 动态内容渲染
    • 邮件跟踪(通过嵌入图片)
  • 完整示例:
from email.mime.multipart import MIMEMultipart

# 创建多部分邮件
msg = MIMEMultipart("alternative")
msg["From"] = formataddr(("市场部", "marketing@company.com"))
msg["To"] = "customer@example.com"

# 纯文本备用内容
text_part = MIMEText("这是纯文本备用内容", "plain", "utf-8")

# HTML主要内容
html_content = """
<!DOCTYPE html>
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>产品推广邮件</title>
    <style type="text/css">
        /* 响应式设计 */
        body { font-family: 'Helvetica Neue', Arial, sans-serif; margin: 0; padding: 0; }
        .container { max-width: 600px; margin: 0 auto; padding: 20px; }
        .header { background-color: #3498db; color: white; padding: 20px; text-align: center; }
        .content { padding: 20px; line-height: 1.6; }
        .button { display: inline-block; background: #2ecc71; color: white; 
                 padding: 10px 20px; text-decoration: none; border-radius: 5px; }
        .footer { color: #7f8c8d; font-size: 12px; text-align: center; padding: 20px; }
        @media screen and (max-width: 480px) {
            .container { width: 100% !important; }
        }
    </style>
</head>
<body>
    <div class="container">
        <div class="header">
            <h1>新产品发布</h1>
        </div>
        <div class="content">
            <p>尊敬的客户:</p>
            <p>我们很高兴向您介绍我们的最新产品...</p>
            <p><a href="https://example.com/product" class="button">立即查看</a></p>
            
            <!-- 产品特性表格 -->
            <table border="0" cellpadding="0" cellspacing="0" width="100%">
                <tr>
                    <td width="50%" valign="top">
                        <h3>核心功能</h3>
                        <ul>
                            <li>高性能处理</li>
                            <li>简单易用</li>
                        </ul>
                    </td>
                    <td width="50%" valign="top">
                        <h3>技术优势</h3>
                        <ul>
                            <li>先进算法</li>
                            <li>稳定可靠</li>
                        </ul>
                    </td>
                </tr>
            </table>
            
            <!-- 邮件跟踪像素 -->
            <img src="https://example.com/track?email=customer@example.com" width="1" height="1">
        </div>
        <div class="footer">
            <p>© 2023 公司名称. 保留所有权利.</p>
            <p><a href="%unsubscribe_url%">退订邮件</a></p>
        </div>
    </div>
</body>
</html>
"""

html_part = MIMEText(html_content, "html", "utf-8")

# 添加邮件部分
msg.attach(text_part)  # 注意顺序:先添加纯文本版本
msg.attach(html_part)

# 发送逻辑同上...

三、带附件邮件的专业实现

邮件附件的处理是邮件系统开发中的关键环节,需要注意以下技术要点:

  • 自动检测MIME类型

    • 通过文件扩展名识别(如.jpg对应image/jpeg)
    • 使用文件头信息检测(如PDF文件的%PDF标识)
    • 可集成第三方库如Apache Tika进行精确识别
    • 示例:检测到.txt文件自动设置Content-Type为text/plain
  • 支持大文件分块处理

    • 采用BASE64编码时注意76字符换行规则
    • 实施断点续传机制
    • 使用HTTP/1.1的chunked传输编码
    • 典型场景:10MB以上的视频文件传输
  • 处理中文文件名

    • 必须进行RFC 2231编码(如=?UTF-8?B?5Lit5paH?=)
    • 文件名长度限制为75个字符
    • 避免使用非ASCII字符的扩展名
    • 示例处理流程:UTF-8编码 → BASE64编码 → MIME头格式化
  • 添加多个附件

    • 每个附件作为独立的MIME part
    • 使用multipart/mixed作为顶层Content-Type
    • 注意附件顺序对邮件客户端显示的影响
    • 典型实现:附件1(合同.pdf)+附件2(报价单.xlsx)
  • 其他注意事项

    • 设置正确的Content-Disposition(attachment/inline)
    • 处理Windows/Linux路径差异
    • 添加附件描述信息(Content-Description头字段)
    • 安全考虑:病毒扫描和文件类型限制
  • 完整示例:
import os
import mimetypes
from email.mime.base import MIMEBase
from email import encoders

def add_attachment(msg, filepath):
    """专业添加附件方法"""
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"附件文件不存在: {filepath}")
    
    # 猜测MIME类型
    ctype, encoding = mimetypes.guess_type(filepath)
    if ctype is None or encoding is not None:
        ctype = "application/octet-stream"
    
    maintype, subtype = ctype.split("/", 1)
    
    with open(filepath, "rb") as fp:
        part = MIMEBase(maintype, subtype)
        part.set_payload(fp.read())
    
    # 编码和设置头信息
    encoders.encode_base64(part)
    
    # 处理中文文件名
    filename = os.path.basename(filepath)
    part.add_header(
        "Content-Disposition",
        "attachment",
        filename=Header(filename, "utf-8").encode()
    )
    
    msg.attach(part)

# 创建带附件的邮件
msg = MIMEMultipart()
msg["Subject"] = "季度报告和数据分析"

# 添加正文
msg.attach(MIMEText("请查收附件中的季度报告", "plain"))

# 添加多个附件
attachments = [
    "/reports/Q3_Report.pdf",
    "/data/sales_data.xlsx",
    "/images/performance_chart.png"
]

for attachment in attachments:
    try:
        add_attachment(msg, attachment)
        print(f"已添加附件: {os.path.basename(attachment)}")
    except Exception as e:
        print(f"添加附件失败: {str(e)}")

# 发送逻辑...

四、企业级最佳实践

连接池管理增强版

from smtplib import SMTP_SSL
from queue import Queue
import threading

class SMTPConnectionPool:
    def __init__(self, host, port, username, password, pool_size=5):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.pool = Queue(pool_size)
        self.lock = threading.Lock()
        
        # 初始化连接池
        for _ in range(pool_size):
            conn = SMTP_SSL(host, port)
            conn.login(username, password)
            self.pool.put(conn)
    
    def get_connection(self):
        return self.pool.get()
    
    def release_connection(self, conn):
        self.pool.put(conn)
    
    def send_email(self, msg, recipients):
        conn = None
        try:
            conn = self.get_connection()
            conn.sendmail(self.username, recipients, msg.as_string())
        finally:
            if conn:
                self.release_connection(conn)

# 使用示例
pool = SMTPConnectionPool(
    host="smtp.example.com",
    port=465,
    username="user@example.com",
    password="password",
    pool_size=10
)

# 在多线程环境中使用
def worker(email_list):
    for email in email_list:
        msg = build_email_message(email)
        pool.send_email(msg, [email["address"]])

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(email_chunks[i],))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

完整的邮件服务类

import logging
from logging.handlers import RotatingFileHandler

class EmailService:
    def __init__(self, config):
        self.config = config
        self.logger = self._setup_logger()
        self.connection_pool = self._init_connection_pool()
    
    def _setup_logger(self):
        logger = logging.getLogger("EmailService")
        logger.setLevel(logging.INFO)
        
        handler = RotatingFileHandler(
            "email_service.log",
            maxBytes=10*1024*1024,  # 10MB
            backupCount=5
        )
        formatter = logging.Formatter(
            "%(asctime)s - %(levelname)s - %(message)s"
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        
        return logger
    
    def _init_connection_pool(self):
        return SMTPConnectionPool(
            host=self.config["smtp_host"],
            port=self.config["smtp_port"],
            username=self.config["smtp_user"],
            password=self.config["smtp_password"],
            pool_size=self.config.get("pool_size", 5)
        )
    
    def send_email(self, template_name, recipient, context, attachments=None):
        """发送模板邮件"""
        try:
            # 1. 加载模板
            template = self._load_template(template_name)
            
            # 2. 构建邮件内容
            msg = self._build_message(template, recipient, context)
            
            # 3. 添加附件
            if attachments:
                for attachment in attachments:
                    self._add_attachment(msg, attachment)
            
            # 4. 发送邮件
            self.connection_pool.send_email(
                msg,
                [recipient["email"]]
            )
            
            self.logger.info(
                f"邮件发送成功: {template_name} -> {recipient['email']}"
            )
            return True
        except Exception as e:
            self.logger.error(
                f"邮件发送失败: {template_name} -> {recipient['email']}: {str(e)}",
                exc_info=True
            )
            return False
    
    # 其他辅助方法...
    def _load_template(self, name):
        """加载邮件模板"""
        pass
    
    def _build_message(self, template, recipient, context):
        """构建邮件消息"""
        pass
    
    def _add_attachment(self, msg, filepath):
        """添加附件"""
        pass

监控与统计集成

from prometheus_client import CollectorRegistry, push_to_gateway
from datetime import datetime

class EmailMetrics:
    def __init__(self):
        self.registry = CollectorRegistry()
        self.emails_sent = Counter(
            "emails_sent_total",
            "Total emails sent",
            ["template"],
            registry=self.registry
        )
        self.send_time = Summary(
            "email_send_time_seconds",
            "Time spent sending emails",
            registry=self.registry
        )
        self.errors = Counter(
            "email_errors_total",
            "Total email sending errors",
            ["type"],
            registry=self.registry
        )
    
    def record_success(self, template, duration):
        self.emails_sent.labels(template).inc()
        self.send_time.observe(duration)
    
    def record_error(self, error_type):
        self.errors.labels(error_type).inc()
    
    def push_metrics(self):
        push_to_gateway(
            "metrics.example.com:9091",
            job="email_service",
            registry=self.registry
        )

# 使用示例
metrics = EmailMetrics()

@metrics.send_time.time()
def send_email_with_metrics(email):
    try:
        start_time = datetime.now()
        
        # 发送邮件逻辑...
        
        duration = (datetime.now() - start_time).total_seconds()
        metrics.record_success(email["template"], duration)
        return True
    except smtplib.SMTPException as e:
        metrics.record_error("smtp")
        raise
    except Exception as e:
        metrics.record_error("other")
        raise
    finally:
        metrics.push_metrics()

五、高级主题扩展

DKIM签名支持

import dkim

def add_dkim_signature(msg, domain, selector, private_key):
    """添加DKIM签名"""
    headers = ["From", "To", "Subject"]
    sig = dkim.sign(
        message=msg.as_bytes(),
        selector=selector.encode(),
        domain=domain.encode(),
        privkey=private_key.encode(),
        include_headers=headers
    )
    msg["DKIM-Signature"] = sig[len("DKIM-Signature: "):].decode()
    return msg

# 使用示例
private_key = """-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----"""

msg = add_dkim_signature(
    msg,
    domain="example.com",
    selector="selector1",
    private_key=private_key
)

邮件队列系统集成

import redis
import json
import pickle

class EmailQueue:
    def __init__(self, redis_host="localhost", redis_port=6379):
        self.redis = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=0,
            decode_responses=False
        )
        self.queue_name = "email_queue"
    
    def enqueue(self, email_task):
        """将邮件任务加入队列"""
        serialized = pickle.dumps(email_task)
        self.redis.rpush(self.queue_name, serialized)
    
    def dequeue(self):
        """从队列取出邮件任务"""
        serialized = self.redis.lpop(self.queue_name)
        if serialized:
            return pickle.loads(serialized)
        return None
    
    def process_queue(self, worker_count=4):
        """处理队列中的邮件"""
        def worker():
            while True:
                task = self.dequeue()
                if not task:
                    break
                try:
                    send_email(**task)
                except Exception as e:
                    self._handle_failure(task, e)
        
        threads = []
        for _ in range(worker_count):
            t = threading.Thread(target=worker)
            threads.append(t)
            t.start()
        
        for t in threads:
            t.join()

# 使用示例
queue = EmailQueue()

# 生产者添加任务
queue.enqueue({
    "to": "user@example.com",
    "subject": "测试邮件",
    "body": "这是一封测试邮件",
    "template": "welcome"
})

# 消费者处理队列
queue.process_queue()

通过这些扩展实现,可以构建出适应不同场景的完整邮件解决方案,从简单的通知邮件到复杂的企业级邮件服务。关键是根据实际需求选择合适的技术方案,并注意处理各种边界情况和异常状态。


网站公告

今日签到

点亮在社区的每一天
去签到