""" ``mail`` module.
"""
from mimetypes import guess_type
from os.path import split as path_split
from smtplib import SMTP
from time import time
try:
from email.charset import CHARSETS, QP, SHORTEST
from email.encoders import encode_base64
from email.header import Header
from email.message import Message
from email.utils import formataddr, formatdate, make_msgid
except ImportError: # pragma: nocover, python2.4
from email.Charset import CHARSETS # noqa
from email.Charset import QP, SHORTEST
from email.Encoders import encode_base64 # noqa
from email.Header import Header # noqa
from email.Message import Message # noqa
from email.Utils import formataddr # noqa
from email.Utils import formatdate # noqa
from email.Utils import make_msgid # noqa
# Do not apply Base64 encoding to utf-8 messages, use quoted-printable
# since it is less verbose
CHARSETS["utf-8"] = (SHORTEST, QP, "utf-8")
del CHARSETS, SHORTEST, QP
[docs]def mail_address(addr, name=None, charset="utf8"):
"""Returns mail address formatted string."""
try:
addr.encode("us-ascii")
except UnicodeEncodeError:
addr = "@".join(
[p.encode("idna").decode("us-ascii") for p in addr.split("@", 1)]
)
return name and formataddr((mime_header(name, charset), addr)) or addr
[docs]class MailMessage(object):
"""Mail message."""
def __init__(
self,
subject="",
content="",
from_addr=None,
to_addrs=None,
cc_addrs=None,
bcc_addrs=None,
reply_to_addrs=None,
content_type="text/plain",
charset="us-ascii",
):
self.subject = subject
self.content = content
self.from_addr = from_addr
self.to_addrs = to_addrs or []
self.cc_addrs = cc_addrs or []
self.bcc_addrs = bcc_addrs or []
self.reply_to_addrs = reply_to_addrs or []
self.content_type = content_type
self.charset = charset
self.date = time()
self.attachments = []
self.alternatives = []
def recipients(self):
return set(self.to_addrs + self.cc_addrs + self.bcc_addrs)
[docs]class Attachment(object):
"""An attachment to mail message."""
def __init__(
self,
name,
content,
content_type=None,
disposition=None,
name_charset=None,
content_charset=None,
):
self.name = name
self.content = content
self.content_type = content_type
self.disposition = disposition
self.name_charset = name_charset
self.content_charset = content_charset
[docs] @classmethod
def from_file(cls, path):
"""Creates an attachment from file."""
ignore, name = path_split(path)
f = open(path, "rb")
try:
return cls(name, f.read())
finally:
f.close()
[docs]class Alternative(object):
"""Represents alternative mail message."""
def __init__(self, content, content_type="text/html", charset=None):
self.content = content
self.content_type = content_type
self.charset = charset
self.related = []
[docs]class SMTPClient(object):
"""SMTP client that can be used to send mail."""
def __init__(
self,
host="127.0.0.1",
port=25,
use_tls=False,
username=None,
password=None,
):
self.host = host
self.port = port
self.use_tls = use_tls
self.username = username
self.password = password
[docs] def send(self, message):
"""Sends a single mail message."""
recepients = message.recipients()
content = mime(message).as_string().encode(message.charset)
# keep connection scope minimal
client = self.connect()
try:
client.sendmail(message.from_addr, recepients, content)
finally:
client.quit()
[docs] def send_multi(self, messages):
"""Sends multiple mail messages."""
args = [
(
message.from_addr,
message.recipients(),
mime(message).as_string().encode(message.charset),
)
for message in messages
]
# keep connection scope minimal
client = self.connect()
try:
for arg in args:
client.sendmail(*arg)
finally:
client.quit()
def connect(self):
smtp = SMTP()
# smtp.set_debuglevel(1)
smtp.connect(self.host, self.port)
if self.use_tls:
smtp.starttls()
if self.username:
smtp.login(self.username, self.password)
return smtp
# region: internal details
def mime(message):
charset = message.charset
m = mime_part(message.content, message.content_type, charset)
subparts = message.content and [m] or []
if message.alternatives:
subparts += [mime_alternative(a) for a in message.alternatives]
if len(subparts) > 1:
m = mime_multipart("multipart/alternative", subparts)
subparts = [m]
else:
m = subparts[0]
if message.attachments:
subparts += [mime_attachment(a) for a in message.attachments]
m = mime_multipart("multipart/mixed", subparts)
m["Message-ID"] = make_msgid()
m["Subject"] = message.subject
m["Date"] = formatdate(message.date, localtime=True)
m["From"] = message.from_addr
m["To"] = ", ".join(message.to_addrs)
if message.cc_addrs:
m["Cc"] = ", ".join(message.cc_addrs)
if message.bcc_addrs:
m["Bcc"] = ", ".join(message.bcc_addrs)
if message.reply_to_addrs:
m["Reply-To"] = ", ".join(message.reply_to_addrs)
return m
def mime_header(value, charset):
try:
value.encode("us-ascii")
except UnicodeEncodeError:
return Header(value, charset).encode()
else:
return value
def mime_part(content, content_type, content_charset=None):
m = Message()
m.add_header("Content-Type", content_type)
m.set_payload(content, content_charset)
if not content_type.startswith("text"):
encode_base64(m)
return m
def mime_multipart(content_type, subparts):
m = Message()
m.add_header("Content-Type", content_type)
m.set_payload(subparts)
return m
def mime_alternative(a):
m = mime_part(a.content, a.content_type, a.charset)
if a.related:
subparts = [m]
for r in a.related:
m = mime_part(r.content, r.content_type)
m.add_header("Content-ID", r.content_id)
subparts.append(m)
m = mime_multipart("multipart/related", subparts)
return m
def mime_attachment(attachment):
content_type = attachment.content_type
if not content_type:
content_type, ignore = guess_type(attachment.name)
if content_type is None:
content_type = "application/octet-stream"
m = mime_part(attachment.content, content_type, attachment.content_charset)
name = attachment.name
if attachment.name_charset:
name = (attachment.name_charset, "", name)
# see http://www.ietf.org/rfc/rfc2183.txt
m.add_header(
"Content-Disposition",
attachment.disposition or "attachment",
filename=name,
)
return m