treewide: reformat python code

This commit is contained in:
Martin Weinelt 2025-05-08 01:40:37 +02:00
parent f9fcbe9430
commit a7d580b934
No known key found for this signature in database
GPG key ID: 87C1E9888F856759
3 changed files with 185 additions and 129 deletions

View file

@ -1,33 +1,37 @@
import smtplib, sys
import argparse
import os
import uuid
import imaplib
from datetime import datetime, timedelta
import email
import email.utils
import imaplib
import smtplib
import time
import uuid
from datetime import datetime, timedelta
from typing import cast
RETRY = 100
def _send_mail(smtp_host, smtp_port, smtp_username, from_addr, from_pwd, to_addr, subject, starttls):
print("Sending mail with subject '{}'".format(subject))
message = "\n".join([
"From: {from_addr}",
"To: {to_addr}",
"Subject: {subject}",
"Message-ID: {random}@mail-check.py",
"Date: {date}",
"",
"This validates our mail server can send to Gmail :/"]).format(
from_addr=from_addr,
to_addr=to_addr,
subject=subject,
random=str(uuid.uuid4()),
date=email.utils.formatdate(),
)
def _send_mail(
smtp_host, smtp_port, smtp_username, from_addr, from_pwd, to_addr, subject, starttls
):
print("Sending mail with subject '{}'".format(subject))
message = "\n".join(
[
"From: {from_addr}",
"To: {to_addr}",
"Subject: {subject}",
"Message-ID: {random}@mail-check.py",
"Date: {date}",
"",
"This validates our mail server can send to Gmail :/",
]
).format(
from_addr=from_addr,
to_addr=to_addr,
subject=subject,
random=str(uuid.uuid4()),
date=email.utils.formatdate(),
)
retry = RETRY
while True:
@ -44,7 +48,9 @@ def _send_mail(smtp_host, smtp_port, smtp_username, from_addr, from_pwd, to_addr
except smtplib.SMTPResponseException as e:
if e.smtp_code == 451: # service unavailable error
print(e)
elif e.smtp_code == 454: # smtplib.SMTPResponseException: (454, b'4.3.0 Try again later')
elif (
e.smtp_code == 454
): # smtplib.SMTPResponseException: (454, b'4.3.0 Try again later')
print(e)
else:
raise
@ -62,15 +68,17 @@ def _send_mail(smtp_host, smtp_port, smtp_username, from_addr, from_pwd, to_addr
print("Retry attempts exhausted")
exit(5)
def _read_mail(
imap_host,
imap_port,
imap_username,
to_pwd,
subject,
ignore_dkim_spf,
show_body=False,
delete=True):
imap_host,
imap_port,
imap_username,
to_pwd,
subject,
ignore_dkim_spf,
show_body=False,
delete=True,
):
print("Reading mail from %s" % imap_username)
message = None
@ -81,28 +89,31 @@ def _read_mail(
today = datetime.today()
cutoff = today - timedelta(days=1)
dt = cutoff.strftime('%d-%b-%Y')
dt = cutoff.strftime("%d-%b-%Y")
for _ in range(0, RETRY):
print("Retrying")
obj.select()
_, data = obj.search(None, '(SINCE %s) (SUBJECT "%s")' % (dt, subject))
if data == [b'']:
if data == [b""]:
time.sleep(1)
continue
uids = data[0].decode("utf-8").split(" ")
if len(uids) != 1:
print("Warning: %d messages have been found with subject containing %s " % (len(uids), subject))
print(
"Warning: %d messages have been found with subject containing %s "
% (len(uids), subject)
)
# FIXME: we only consider the first matching message...
uid = uids[0]
_, raw = obj.fetch(uid, '(RFC822)')
_, raw = obj.fetch(uid, "(RFC822)")
if delete:
obj.store(uid, '+FLAGS', '\\Deleted')
obj.store(uid, "+FLAGS", "\\Deleted")
obj.expunge()
assert raw[0] and raw[0][1]
message = email.message_from_bytes(cast(bytes, raw[0][1]))
print("Message with subject '%s' has been found" % message['subject'])
print("Message with subject '%s' has been found" % message["subject"])
if show_body:
if message.is_multipart():
for part in message.walk():
@ -118,21 +129,24 @@ def _read_mail(
break
if message is None:
print("Error: no message with subject '%s' has been found in INBOX of %s" % (subject, imap_username))
print(
"Error: no message with subject '%s' has been found in INBOX of %s"
% (subject, imap_username)
)
exit(1)
if ignore_dkim_spf:
return
# gmail set this standardized header
if 'ARC-Authentication-Results' in message:
if "dkim=pass" in message['ARC-Authentication-Results']:
if "ARC-Authentication-Results" in message:
if "dkim=pass" in message["ARC-Authentication-Results"]:
print("DKIM ok")
else:
print("Error: no DKIM validation found in message:")
print(message.as_string())
exit(2)
if "spf=pass" in message['ARC-Authentication-Results']:
if "spf=pass" in message["ARC-Authentication-Results"]:
print("SPF ok")
else:
print("Error: no SPF validation found in message:")
@ -142,71 +156,108 @@ def _read_mail(
print("DKIM and SPF verification failed")
exit(4)
def send_and_read(args):
src_pwd = None
if args.src_password_file is not None:
src_pwd = args.src_password_file.readline().rstrip()
dst_pwd = args.dst_password_file.readline().rstrip()
if args.imap_username != '':
if args.imap_username != "":
imap_username = args.imap_username
else:
imap_username = args.to_addr
subject = "{}".format(uuid.uuid4())
_send_mail(smtp_host=args.smtp_host,
smtp_port=args.smtp_port,
smtp_username=args.smtp_username,
from_addr=args.from_addr,
from_pwd=src_pwd,
to_addr=args.to_addr,
subject=subject,
starttls=args.smtp_starttls)
_send_mail(
smtp_host=args.smtp_host,
smtp_port=args.smtp_port,
smtp_username=args.smtp_username,
from_addr=args.from_addr,
from_pwd=src_pwd,
to_addr=args.to_addr,
subject=subject,
starttls=args.smtp_starttls,
)
_read_mail(
imap_host=args.imap_host,
imap_port=args.imap_port,
imap_username=imap_username,
to_pwd=dst_pwd,
subject=subject,
ignore_dkim_spf=args.ignore_dkim_spf,
)
_read_mail(imap_host=args.imap_host,
imap_port=args.imap_port,
imap_username=imap_username,
to_pwd=dst_pwd,
subject=subject,
ignore_dkim_spf=args.ignore_dkim_spf)
def read(args):
_read_mail(imap_host=args.imap_host,
imap_port=args.imap_port,
imap_username=args.imap_username,
to_pwd=args.imap_password,
subject=args.subject,
ignore_dkim_spf=args.ignore_dkim_spf,
show_body=args.show_body,
delete=False)
_read_mail(
imap_host=args.imap_host,
imap_port=args.imap_port,
imap_username=args.imap_username,
to_pwd=args.imap_password,
subject=args.subject,
ignore_dkim_spf=args.ignore_dkim_spf,
show_body=args.show_body,
delete=False,
)
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
parser_send_and_read = subparsers.add_parser('send-and-read', description="Send a email with a subject containing a random UUID and then try to read this email from the recipient INBOX.")
parser_send_and_read.add_argument('--smtp-host', type=str)
parser_send_and_read.add_argument('--smtp-port', type=str, default=25)
parser_send_and_read.add_argument('--smtp-starttls', action='store_true')
parser_send_and_read.add_argument('--smtp-username', type=str, default='', help="username used for smtp login. If not specified, the from-addr value is used")
parser_send_and_read.add_argument('--from-addr', type=str)
parser_send_and_read.add_argument('--imap-host', required=True, type=str)
parser_send_and_read.add_argument('--imap-port', type=str, default=993)
parser_send_and_read.add_argument('--to-addr', type=str, required=True)
parser_send_and_read.add_argument('--imap-username', type=str, default='', help="username used for imap login. If not specified, the to-addr value is used")
parser_send_and_read.add_argument('--src-password-file', type=argparse.FileType('r'))
parser_send_and_read.add_argument('--dst-password-file', required=True, type=argparse.FileType('r'))
parser_send_and_read.add_argument('--ignore-dkim-spf', action='store_true', help="to ignore the dkim and spf verification on the read mail")
parser_send_and_read = subparsers.add_parser(
"send-and-read",
description="Send a email with a subject containing a random UUID and then try to read this email from the recipient INBOX.",
)
parser_send_and_read.add_argument("--smtp-host", type=str)
parser_send_and_read.add_argument("--smtp-port", type=str, default=25)
parser_send_and_read.add_argument("--smtp-starttls", action="store_true")
parser_send_and_read.add_argument(
"--smtp-username",
type=str,
default="",
help="username used for smtp login. If not specified, the from-addr value is used",
)
parser_send_and_read.add_argument("--from-addr", type=str)
parser_send_and_read.add_argument("--imap-host", required=True, type=str)
parser_send_and_read.add_argument("--imap-port", type=str, default=993)
parser_send_and_read.add_argument("--to-addr", type=str, required=True)
parser_send_and_read.add_argument(
"--imap-username",
type=str,
default="",
help="username used for imap login. If not specified, the to-addr value is used",
)
parser_send_and_read.add_argument("--src-password-file", type=argparse.FileType("r"))
parser_send_and_read.add_argument(
"--dst-password-file", required=True, type=argparse.FileType("r")
)
parser_send_and_read.add_argument(
"--ignore-dkim-spf",
action="store_true",
help="to ignore the dkim and spf verification on the read mail",
)
parser_send_and_read.set_defaults(func=send_and_read)
parser_read = subparsers.add_parser('read', description="Search for an email with a subject containing 'subject' in the INBOX.")
parser_read.add_argument('--imap-host', type=str, default="localhost")
parser_read.add_argument('--imap-port', type=str, default=993)
parser_read.add_argument('--imap-username', required=True, type=str)
parser_read.add_argument('--imap-password', required=True, type=str)
parser_read.add_argument('--ignore-dkim-spf', action='store_true', help="to ignore the dkim and spf verification on the read mail")
parser_read.add_argument('--show-body', action='store_true', help="print mail text/plain payload")
parser_read.add_argument('subject', type=str)
parser_read = subparsers.add_parser(
"read",
description="Search for an email with a subject containing 'subject' in the INBOX.",
)
parser_read.add_argument("--imap-host", type=str, default="localhost")
parser_read.add_argument("--imap-port", type=str, default=993)
parser_read.add_argument("--imap-username", required=True, type=str)
parser_read.add_argument("--imap-password", required=True, type=str)
parser_read.add_argument(
"--ignore-dkim-spf",
action="store_true",
help="to ignore the dkim and spf verification on the read mail",
)
parser_read.add_argument(
"--show-body", action="store_true", help="print mail text/plain payload"
)
parser_read.add_argument("subject", type=str)
parser_read.set_defaults(func=read)
args = parser.parse_args()