1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# -*- coding: utf-8 -*-
"""
Misc helper functions.
Copyright (C) 2012 Aurelien Bompard
Author: Aurelien Bompard <abompard@fedoraproject.org>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or (at
your option) any later version.
See http://www.gnu.org/copyleft/gpl.html for the full text of the
license.
"""
import email.utils
import time
import re
from email.header import decode_header
from datetime import datetime, tzinfo, timedelta
from base64 import b32encode
from hashlib import sha1
import dateutil.parser, dateutil.tz
# Names exported by a star-import of this module.
# NOTE(review): "payload_to_unicode" is listed here, but no definition with
# that name is visible in this part of the file -- confirm it exists, since a
# name missing from the module makes a star-import fail with AttributeError.
__all__ = (
    "get_message_id_hash",
    "parseaddr",
    "parsedate",
    "header_to_unicode",
    "payload_to_unicode",
    "get_ref_and_thread_id",
)
# Captures the text between the first "<" of a value and the next ">" (for
# instance the bare ID out of "<id@host>"). match() returns None when the
# value holds no non-empty bracketed part.
IN_BRACKETS_RE = re.compile(r"[^<]*<([^>]+)>.*")
def get_message_id_hash(msg_id):
    """
    Returns the X-Message-ID-Hash header for the provided Message-ID header.

    Leading and trailing angle brackets are stripped from the Message-ID, the
    SHA-1 digest of the remaining value is computed, and that digest is
    encoded in base32.

    See <http://wiki.list.org/display/DEV/Stable+URLs#StableURLs-Headers> for
    details. Example:

    >>> get_message_id_hash('<87myycy5eh.fsf@uwakimon.sk.tsukuba.ac.jp>')
    'AGDWSNXXKCWEILKKNYTBOHRDQGOX3Y35'

    :param msg_id: the Message-ID header value, as text or bytes, with or
        without the surrounding angle brackets.
    :returns: the base32-encoded hash as a native string.
    """
    if not isinstance(msg_id, bytes):
        # hashlib only hashes bytes: encode text explicitly instead of
        # relying on an implicit (ASCII-only) conversion.
        msg_id = msg_id.encode("utf-8")
    encoded = b32encode(sha1(msg_id.strip(b"<>")).digest())
    if not isinstance(encoded, str):
        # b32encode() returns bytes on Python 3; the base32 alphabet is ASCII.
        encoded = encoded.decode("ascii")
    return encoded
def parseaddr(address):
    """
    Wrapper around email.utils.parseaddr to also handle Mailman's generated
    mbox archives.

    Every " at " in the value is replaced by "@" before parsing, so that an
    obfuscated "user at example.com" is read as "user@example.com".

    :param address: the raw address header value, or None when the header is
        missing.
    :returns: a (name, email) tuple. The name falls back to the email address
        when no display name is found; both are empty strings when the
        address is None or email.utils.parseaddr cannot parse it.
    """
    if address is None:
        # Missing header: report "nothing parsed" the same way
        # email.utils.parseaddr does, rather than failing on None.replace().
        return "", ""
    address = address.replace(" at ", "@")
    from_name, from_email = email.utils.parseaddr(address)
    if not from_name:
        from_name = from_email
    return from_name, from_email
def header_to_unicode(header):
    """
    Decode a header value, possibly containing encoded words, into text.

    The header is split with email.header.decode_header(). Each byte-string
    part is decoded with its declared charset, or as ASCII when no charset is
    declared. When the charset is unknown or the bytes are not valid in it,
    the part is decoded as ASCII with undecodable bytes replaced instead of
    raising.

    :param header: the raw header value.
    :returns: the decoded header as a text string.
    """
    parts = []
    for decoded, charset in decode_header(header):
        if isinstance(decoded, bytes):
            try:
                text = decoded.decode(charset or "ascii")
            except (LookupError, UnicodeDecodeError):
                # Unknown encoding, or bytes that are invalid in the declared
                # one: keep what is readable as ASCII.
                text = decoded.decode("ascii", "replace")
        else:
            # Already text (decode_header() returns such parts untouched).
            text = decoded
        if charset is not None and parts:
            # A charset-encoded part that is not the first one is separated
            # from the previous part by a single space.
            # NOTE(review): presumably because decode_header() drops the
            # whitespace between parts on Python 2 -- confirm.
            parts.append(u" ")
        parts.append(text)
    return u"".join(parts)
def parsedate(datestring):
    """
    Parse a date string into a datetime with dateutil.

    :param datestring: the date text to parse, or None.
    :returns: the parsed datetime, or None when datestring is None or
        dateutil cannot parse it. A date whose UTC offset exceeds 13 hours in
        either direction is converted to UTC. A date carrying no timezone
        information is returned unchanged, as a naive datetime.
    """
    if datestring is None:
        return None
    try:
        parsed = dateutil.parser.parse(datestring)
    except (ValueError, OverflowError):
        # Unparseable text, or numbers too large to build a date from.
        return None
    offset = parsed.utcoffset()
    # utcoffset() is None for naive datetimes; only compare real offsets,
    # since abs(None) would raise TypeError.
    if offset is not None and abs(offset) > timedelta(hours=13):
        parsed = parsed.astimezone(dateutil.tz.tzutc())
    return parsed
def get_ref_and_thread_id(message, list_name, store):
    """
    Returns the message-id of the reference email and the thread ID for a
    given message.

    The reference is the first entry of the References header or, when that
    header is missing or blank, the In-Reply-To header. Only the part between
    angle brackets is kept.

    :param message: the email message; header values are read with its
        ``get()`` method.
    :param list_name: name of the list, passed on to the store lookup.
    :param store: object providing
        ``get_message_by_id_from_list(list_name, message_id)``.
    :returns: a (ref_id, thread_id) tuple of text strings. Both are None when
        the message has no usable reference header or the reference cannot
        be parsed; thread_id alone is None when the store does not return
        the referenced message.
    """
    text_type = type(u"")  # unicode on Python 2, str on Python 3

    reference = message.get("References")
    if reference is None or not reference.strip():
        # No usable References header: fall back to In-Reply-To.
        reference = message.get("In-Reply-To")
    else:
        # There can be multiple references, use the first one.
        reference = reference.split()[0]
    if reference is None:
        # Not a reply.
        return None, None

    match = IN_BRACKETS_RE.match(reference)
    if match is None:
        # Can't parse the reference.
        return None, None
    ref_id = match.group(1)

    # It's a reply: re-use the thread_id of the parent email when the store
    # knows about it.
    ref_msg = store.get_message_by_id_from_list(list_name, ref_id)
    if ref_msg is None:
        thread_id = None
    else:
        thread_id = text_type(ref_msg.thread_id)
    return text_type(ref_id), thread_id
|