1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
# -*- coding: utf-8 -*-
# Copyright (C) 2011-2012 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
"""
Misc helper functions.
Author: Aurelien Bompard <abompard@fedoraproject.org>
"""
import email.utils
import re
from email.header import decode_header
from datetime import timedelta
from base64 import b32encode
from hashlib import sha1 # pylint: disable-msg=E0611
import dateutil.parser, dateutil.tz
__all__ = ("get_message_id_hash", "parseaddr", "parsedate",
"header_to_unicode", "payload_to_unicode",
"get_ref", "get_ref_and_thread_id",
)
IN_BRACKETS_RE = re.compile("[^<]*<([^>]+)>.*")
def get_message_id_hash(msg_id):
"""
Returns the X-Message-ID-Hash header for the provided Message-ID header.
See <http://wiki.list.org/display/DEV/Stable+URLs#StableURLs-Headers> for
details. Example:
>>> get_message_id_hash('<87myycy5eh.fsf@uwakimon.sk.tsukuba.ac.jp>')
'JJIGKPKB6CVDX6B2CUG4IHAJRIQIOUTP'
"""
msg_id = email.utils.unquote(msg_id)
return b32encode(sha1(msg_id).digest())
def parseaddr(address):
"""
Wrapper around email.utils.parseaddr to also handle Mailman's generated
mbox archives.
"""
address = address.replace(" at ", "@")
from_name, from_email = email.utils.parseaddr(address)
if not from_name:
from_name = from_email
return from_name, from_email
def header_to_unicode(header):
"""
See also: http://ginstrom.com/scribbles/2007/11/19/parsing-multilingual-email-with-python/
"""
h_decoded = []
for text, charset in decode_header(header):
if charset is None:
h_decoded.append(unicode(text))
else:
try:
h_decoded.append(text.decode(charset))
except LookupError:
# Unknown encoding
h_decoded.append(text.decode("ascii", "replace"))
return u" ".join(h_decoded)
def parsedate(datestring):
if datestring is None:
return None
try:
parsed = dateutil.parser.parse(datestring)
except ValueError:
return None
if parsed.utcoffset() is not None and \
abs(parsed.utcoffset()) > timedelta(hours=13):
parsed = parsed.astimezone(dateutil.tz.tzutc())
return parsed
#date_tuple = email.utils.parsedate_tz(datestring)
#timestamp = email.utils.mktime_tz(date_tuple)
#return datetime.fromtimestamp(timestamp)
def get_ref(message):
"""
Returns the message-id of the reference email for a given message.
"""
if (not message.has_key("References")
and not message.has_key("In-Reply-To")):
return None
ref_id = message.get("In-Reply-To")
if ref_id is None or not ref_id.strip():
ref_id = message.get("References")
if ref_id is not None and ref_id.strip():
# There can be multiple references, use the last one
ref_id = ref_id.split()[-1].strip()
if ref_id is not None:
ref_id = IN_BRACKETS_RE.match(ref_id)
if ref_id is None:
# Can't parse the reference
return None
ref_id = ref_id.group(1)
return unicode(ref_id)
def get_ref_and_thread_id(message, list_name, store):
"""
Returns the thread ID and the message-id of the reference email for a given
message.
"""
ref_id = get_ref(message)
if ref_id is None:
return None, None
# It's a reply, use the thread_id from the parent email
ref_msg = store.get_message_by_id_from_list(list_name, ref_id)
if ref_msg is None:
thread_id = None
else:
# re-use parent's thread-id
thread_id = unicode(ref_msg.thread_id)
return ref_id, thread_id
|