summaryrefslogtreecommitdiffstats
path: root/kittystore/storm/store.py
blob: dd70bc02999e3d453d908bf96e7d19eea7bbdde7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
# -*- coding: utf-8 -*-

"""
Copyright (C) 2012 Aurélien Bompard <abompard@fedoraproject.org>
Author: Aurélien Bompard <abompard@fedoraproject.org>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or (at
your option) any later version.
See http://www.gnu.org/copyleft/gpl.html  for the full text of the
license.
"""

from __future__ import absolute_import

import datetime
from email.utils import unquote
from urllib2 import HTTPError

from zope.interface import implements
from mailman.interfaces.messages import IMessageStore
from storm.locals import Desc
from storm.expr import And, Or, Count, Alias
from dateutil.tz import tzutc
import mailmanclient

from kittystore import MessageNotFound
from kittystore.utils import parseaddr, parsedate
from kittystore.utils import header_to_unicode
from kittystore.scrub import Scrubber
from kittystore.utils import get_ref_and_thread_id
from kittystore.analysis import compute_thread_order_and_depth

from .model import List, Email, Attachment, Thread, EmailFull, Category


class StormStore(object):
    """
    Storm-powered interface to query emails from the database.
    """

    implements(IMessageStore)

    def __init__(self, db, search_index, settings, debug=False):
        """ Constructor.
        Create the session using the engine defined in the url.

        :param db: the Storm store object
        :param debug: a boolean to set the debug mode on or off.
        """
        self.db = db
        self.debug = debug
        self.search_index = search_index
        self.settings = settings


    # IMessageStore methods

    def add(self, message):
        """Add the message to the store.

        :param message: An email.message.Message instance containing at
            least a unique Message-ID header.  The message will be given
            an X-Message-ID-Hash header, overriding any existing such
            header.
        :returns: The calculated X-Message-ID-Hash header.
        :raises ValueError: if the message is missing a Message-ID 
            header.
            The storage service is also allowed to raise this exception
            if it find, but disallows collisions.
        """
        # Not sure this is useful: a message should always be in a list
        raise NotImplementedError

    def add_to_list(self, mlist, message):
        """Add the message to a specific list of the store.

        The list record is created if it does not exist yet, and its
        display name and subject prefix are refreshed from ``mlist``. If
        the Message-ID is already archived for this list, a notice is
        printed and nothing else is stored.

        :param mlist: The mailing-list object, implementing
            mailman.interfaces.mailinglist.IMailingList.
        :param message: An email.message.Message instance containing at
            least a unique Message-ID header. It is scrubbed in place.
        :returns: The ``message_id_hash`` of the stored (or already
            stored) email.
        :raises ValueError: if the message is missing a Message-ID
            header.
        """
        # NOTE(review): the IMessageStore contract mentions setting an
        # X-Message-ID-Hash header on the message; nothing in this method
        # visibly does so -- confirm whether callers rely on it.
        list_name = unicode(mlist.fqdn_listname)
        # Create the list if it does not exist
        mailing_list = self.db.find(List, List.name == list_name).one()
        if mailing_list is None:
            mailing_list = List(list_name)
            self.db.add(mailing_list)
        mailing_list.display_name = mlist.display_name
        mailing_list.subject_prefix = mlist.subject_prefix
        # "in" replaces the deprecated Message.has_key()
        if "Message-Id" not in message:
            raise ValueError("No 'Message-Id' header in email", message)
        msg_id = unicode(unquote(message['Message-Id']))
        email = Email(list_name, msg_id)
        if self.is_message_in_list(list_name, email.message_id):
            print("Duplicate email from %s: %s" %
                  (message['From'], message.get('Subject', '""')))
            return email.message_id_hash

        # the message.as_string() call must be done before scrubbing
        email_full = EmailFull(list_name, msg_id, message.as_string())
        # Find thread id
        new_thread = False
        ref, thread_id = get_ref_and_thread_id(message, list_name, self)
        if thread_id is None:
            new_thread = True
            # make up the thread_id if not found
            thread_id = email.message_id_hash
        email.thread_id = thread_id
        email.in_reply_to = ref

        from_name, from_email = parseaddr(message['From'])
        from_name = header_to_unicode(from_name)
        email.sender_name = from_name.strip()
        email.sender_email = unicode(from_email).strip()
        email.subject = header_to_unicode(message.get('Subject'))
        msg_date = parsedate(message.get("Date"))
        if msg_date is None:
            # Absent or unparseable date
            msg_date = datetime.datetime.utcnow()
        # Remember the sender's offset before normalising to naive UTC
        utcoffset = msg_date.utcoffset()
        if msg_date.tzinfo is not None:
            msg_date = msg_date.astimezone(tzutc()).replace(tzinfo=None)
        email.date = msg_date
        if utcoffset is None:
            email.timezone = 0
        else:
            # in minutes; explicit floor division keeps it an integer
            email.timezone = ((utcoffset.days * 24 * 60 * 60)
                              + utcoffset.seconds) // 60

        scrubber = Scrubber(list_name, message)
        # warning: scrubbing modifies the msg in-place
        email.content, attachments = scrubber.scrub()

        # store the Mailman user
        email.user_id = self._store_mailman_user(email.sender_email)

        # TODO: categorise the email (e.g. 'Question', or 'Agenda' when
        # the subject contains "agenda"/"reminder"): enum + i18n ?

        thread = None
        if not new_thread:
            thread = self.db.find(Thread, And(
                            Thread.list_name == list_name,
                            Thread.thread_id == thread_id,
                            )).one()
        if thread is None:
            # Either a brand new thread, or the referenced thread's record
            # is missing from the database: (re)create it instead of
            # failing on None below.
            thread = Thread(list_name, thread_id, email.date)
        # Only move the activity date forward, as attach_to_thread() does:
        # an older reply arriving late must not make the thread look stale.
        if thread.date_active is None or email.date > thread.date_active:
            thread.date_active = email.date
        self.db.add(thread)

        self.db.add(email)
        self.db.add(email_full)
        compute_thread_order_and_depth(thread)
        for attachment in attachments:
            self.add_attachment(list_name, msg_id, *attachment)
        self.flush()
        # search indexing
        if self.search_index is not None:
            self.search_index.add(email)
        return email.message_id_hash

    def _store_mailman_user(self, address):
        """Look up the Mailman user id matching an email address.

        A REST client is built from the ``MAILMAN_REST_SERVER``,
        ``MAILMAN_API_USER`` and ``MAILMAN_API_PASS`` settings and asked
        for the user owning ``address``.

        :param address: the email address to look up.
        :returns: the user id as a unicode string, or None when the
            lookup failed with an HTTP or connection error (the error is
            printed in debug mode only).
        """
        try:
            mm_client = mailmanclient.Client('%s/3.0' %
                            self.settings.MAILMAN_REST_SERVER,
                            self.settings.MAILMAN_API_USER,
                            self.settings.MAILMAN_API_PASS)
            mm_user = mm_client.get_user(address)
        except (HTTPError, mailmanclient.MailmanConnectionError) as e:
            # Best effort: an unreachable Mailman or an unknown address
            # must not prevent the email from being archived.
            if self.debug:
                print("Can't get the user from Mailman: %s" % e)
            return None
        return unicode(mm_user.user_id)


    def attach_to_thread(self, email, thread):
        """Re-parent an email under an existing thread.

        The email becomes a reply to the thread's starting email, the
        thread's activity date is advanced if the email is more recent,
        and the thread order and depth are recomputed.

        :param email: the email to move.
        :param thread: the thread receiving the email.
        :raises ValueError: if the email is not strictly more recent than
            the thread's starting email.
        """
        too_old = email.date <= thread.starting_email.date
        if too_old:
            raise ValueError(
                "Can't attach emails older than the first email in a thread")
        email.thread_id = thread.thread_id
        email.in_reply_to = thread.starting_email.message_id
        is_latest = email.date > thread.date_active
        if is_latest:
            thread.date_active = email.date
        compute_thread_order_and_depth(thread)
        self.flush()

    def delete_message(self, message_id):
        """Remove the given message from the store.

        :param message: The Message-ID of the mesage to delete from the
            store.
        :raises LookupError: if there is no such message.
        """
        # Not sure this is useful: a message should always be in a list
        raise NotImplementedError

    def delete_message_from_list(self, list_name, message_id):
        """Remove the given message for a specific list from the store.

        The thread the message belonged to is removed as well once it
        has no email left.

        :param list_name: The fully qualified list name from which the
            message should be removed.
        :param message_id: The Message-ID of the message to delete from
            the store.
        :raises MessageNotFound: if there is no such message in the list.
        """
        msg = self.get_message_by_id_from_list(list_name, message_id)
        if msg is None:
            raise MessageNotFound(list_name, message_id)
        self.db.remove(msg)
        # Remove the thread if necessary
        thread = self.db.find(Thread, And(
                        Thread.list_name == msg.list_name,
                        Thread.thread_id == msg.thread_id,
                        )).one()
        # The thread record may already be gone; in that case there is
        # nothing to clean up, but the message removal must still be
        # flushed instead of failing on None.
        if thread is not None and len(thread.emails) == 0:
            self.db.remove(thread)
        self.flush()

    def get_list_size(self, list_name):
        """Count the emails stored for a given mailing list.

        :param list_name: name of the mailing list whose emails are
            counted.
        :returns: the number of emails in that list.
        """
        in_list = Email.list_name == unicode(list_name)
        return self.db.find(Email, in_list).count()


    def get_message_by_hash(self, message_id_hash):
        """Return the message with the matching X-Message-ID-Hash.

        :param message_id_hash: The X-Message-ID-Hash header contents to
            search for.
        :returns: The message, or None if no matching message was found.
        """
        # Not sure this is useful: a message should always be in a list
        raise NotImplementedError

    def get_message_by_hash_from_list(self, list_name, message_id_hash):
        """Return the message of a list with the matching X-Message-ID-Hash.

        :param list_name: The fully qualified name of the list to search.
        :param message_id_hash: The X-Message-ID-Hash header contents to
            search for.
        :returns: The message, or None if no matching message was found.
        """
        criteria = And(
            Email.list_name == unicode(list_name),
            Email.message_id_hash == unicode(message_id_hash),
        )
        return self.db.find(Email, criteria).one()

    def get_message_by_id(self, message_id):
        """Return the message with a matching Message-ID.

        :param message_id: The Message-ID header contents to search for.
        :returns: The message, or None if no matching message was found.
        """
        # Not sure this is useful: a message should always be in a list
        raise NotImplementedError

    def get_message_by_id_from_list(self, list_name, message_id):
        """Return the message of a list with a matching Message-ID.

        :param list_name: The fully qualified name of the list to search.
        :param message_id: The Message-ID header contents to search for.
        :returns: The message, or None if no matching message was found.
        """
        criteria = And(
            Email.list_name == unicode(list_name),
            Email.message_id == unicode(message_id),
        )
        return self.db.find(Email, criteria).one()

    def search(self, query, list_name=None, page=None, limit=10,
               sortedby=None, reverse=False):
        """ Returns a list of email containing the specified keyword in
        their content.

        :param query: the query string to execute.
        :param list_name: name of the mailing list in which this email
            should be searched. If None or not specified, all lists are
            searched.
        :param page: the page number to return. If None, don't paginate.
        :param limit: the number of results per page.
        :param sortedby: the field to sort by. If None or not specified, sort
            by match score.
        :param reverse: reverse the order of the results.
        """
        if list_name is not None:
            query += " list_name:%s" % list_name
        results = self.search_index.search(
                query, page, limit, sortedby=sortedby, reverse=reverse)
        results["results"] = [ self.get_message_by_id_from_list(
                                    r["list_name"], r["message_id"])
                               for r in results["results"] ]
        return results

    def search_list_for_content(self, list_name, keyword):
        """Find the emails of a list whose content contains a keyword.

        The match is a case-insensitive SQL LIKE on ``%keyword%``.

        :param list_name: name of the mailing list in which the emails
            should be searched.
        :param keyword: keyword to search in the content of the emails.
        :returns: the matching emails as a result set, most recent first.
        """
        in_list = Email.list_name == unicode(list_name)
        pattern = u'%{0}%'.format(keyword)
        matches = Email.content.like(pattern, case_sensitive=False)
        found = self.db.find(Email, And(in_list, matches))
        return found.order_by(Desc(Email.date))

    def search_list_for_content_subject(self, list_name, keyword):
        """Find the emails of a list whose content or subject contains a
        keyword.

        The match is a case-insensitive SQL LIKE on ``%keyword%``.

        :param list_name: name of the mailing list in which the emails
            should be searched.
        :param keyword: keyword to search in the content or subject of
            the emails.
        :returns: the matching emails as a result set, most recent first.
        """
        in_list = Email.list_name == unicode(list_name)
        pattern = u'%{0}%'.format(keyword)
        in_content = Email.content.like(pattern, case_sensitive=False)
        in_subject = Email.subject.like(pattern, case_sensitive=False)
        found = self.db.find(Email, And(in_list, Or(in_content, in_subject)))
        return found.order_by(Desc(Email.date))

    def search_list_for_sender(self, list_name, keyword):
        """Find the emails of a list whose sender name or sender address
        contains a keyword.

        The match is a case-insensitive SQL LIKE on ``%keyword%``.

        :param list_name: name of the mailing list in which the emails
            should be searched.
        :param keyword: keyword to search in the sender name and address.
        :returns: the matching emails as a result set, most recent first.
        """
        in_list = Email.list_name == unicode(list_name)
        pattern = u'%{0}%'.format(keyword)
        in_name = Email.sender_name.like(pattern, case_sensitive=False)
        in_address = Email.sender_email.like(pattern, case_sensitive=False)
        found = self.db.find(Email, And(in_list, Or(in_name, in_address)))
        return found.order_by(Desc(Email.date))

    def search_list_for_subject(self, list_name, keyword):
        """Find the emails of a list whose subject contains a keyword.

        The match is a case-insensitive SQL LIKE on ``%keyword%``.

        :param list_name: name of the mailing list in which the emails
            should be searched.
        :param keyword: keyword to search in the subject of the emails.
        :returns: the matching emails as a result set, most recent first.
        """
        in_list = Email.list_name == unicode(list_name)
        pattern = u'%{0}%'.format(keyword)
        matches = Email.subject.like(pattern, case_sensitive=False)
        found = self.db.find(Email, And(in_list, matches))
        return found.order_by(Desc(Email.date))

    @property
    def messages(self):
        """An iterator over all messages in this message store."""
        raise NotImplementedError

    # Other methods (not in IMessageStore)

    def is_message_in_list(self, list_name, message_id):
        """Count the messages with a matching Message-ID in the list.

        :param list_name: The fully qualified name of the list to search.
        :param message_id: The Message-ID header contents to search for.
        :returns: The number of matching messages, which is 0 (falsy)
            when the message is not in the list.
        """
        criteria = And(
            Email.list_name == unicode(list_name),
            Email.message_id == unicode(message_id),
        )
        return self.db.find(Email.message_id, criteria).count()


    def get_list_names(self):
        """Return the names of the archived lists.

        :returns: A list containing the names of the archived
            mailing-lists, sorted by name.
        """
        names = self.db.find(List.name).order_by(List.name)
        return list(names)

    def get_lists(self):
        """Return the archived lists.

        :returns: A list containing the archived mailing-lists, sorted
            by name.
        """
        mailing_lists = self.db.find(List).order_by(List.name)
        return list(mailing_lists)

    def get_messages(self, list_name, start, end):
        """Return all emails of a list dated within an interval.

        :param list_name: The name of the mailing list in which these emails
            should be searched.
        :param start: A datetime object representing the starting date of
            the interval to query (inclusive).
        :param end: A datetime object representing the ending date of
            the interval to query (exclusive).
        :returns: The list of messages, most recent first.
        """
        in_interval = And(
            Email.list_name == unicode(list_name),
            Email.date >= start,
            Email.date < end,
        )
        found = self.db.find(Email, in_interval)
        return list(found.order_by(Desc(Email.date)))

    def get_thread(self, list_name, thread_id):
        """Return the specified thread.

        :param list_name: The name of the mailing list in which the thread
            should be searched.
        :param thread_id: The thread_id as used in the web-pages. Used here to
            uniquely identify the thread in the database.
        :returns: The thread object, as returned by the query's ``one()``.
        """
        criteria = And(
            Thread.list_name == unicode(list_name),
            Thread.thread_id == unicode(thread_id),
        )
        return self.db.find(Thread, criteria).one()

    def get_threads(self, list_name, start, end):
        """Return all the threads of a list active within an interval.

        :param list_name: The name of the mailing list in which the threads
            should be searched.
        :param start: A datetime object representing the starting date of
            the interval to query (inclusive).
        :param end: A datetime object representing the ending date of
            the interval to query (exclusive).
        :returns: The list of threads, most recently active first.
        """
        active_in_interval = And(
            Thread.list_name == unicode(list_name),
            Thread.date_active >= start,
            Thread.date_active < end,
        )
        found = self.db.find(Thread, active_in_interval)
        return list(found.order_by(Desc(Thread.date_active)))

    def get_start_date(self, list_name):
        """Get the date of the first archived email in a list.

        :param list_name: The fully qualified list name to search
        :returns: The datetime of the first message, or None if no message
            has been archived yet.
        """
        # Oldest date first, limited to a single row
        earliest = self.db.find(
            Email.date, Email.list_name == unicode(list_name)
        ).order_by(Email.date)[:1]
        return earliest.one() if earliest else None

    def get_thread_neighbors(self, list_name, thread_id):
        """Return the threads surrounding the specified thread, ordered by
        activity date.

        Neighbors are the closest threads of the same list whose
        ``date_active`` is strictly lower, respectively strictly greater,
        than the one of the given thread.

        :param list_name: The name of the mailing list to query.
        :param thread_id: The unique identifier of the thread as specified in
            the database.
        :returns: A couple formed of the older thread and the newer thread, in
            this order. Either is None when there is no such thread.
        :rtype: tuple
        """
        current = self.get_thread(list_name, thread_id)

        # Closest thread that was active after the current one
        newer = self.db.find(Thread, And(
            Thread.list_name == unicode(list_name),
            Thread.date_active > current.date_active,
        )).order_by(Thread.date_active)
        try:
            following = newer[0]
        except IndexError:
            following = None

        # Closest thread that was active before the current one
        older = self.db.find(Thread, And(
            Thread.list_name == unicode(list_name),
            Thread.date_active < current.date_active,
        )).order_by(Desc(Thread.date_active))
        try:
            preceding = older[0]
        except IndexError:
            preceding = None

        return (preceding, following)

    def delete_thread(self, list_name, thread_id):
        """Delete the specified thread.

        :param list_name: The name of the mailing list containing this thread
        :param thread_id: The thread_id as used in the web-pages. Used here to
            uniquely identify the thread in the database.
        """
        doomed = self.db.find(Thread, And(
            Thread.list_name == unicode(list_name),
            Thread.thread_id == unicode(thread_id),
        ))
        doomed.remove()

    def get_list(self, list_name):
        """Return the list object for a mailing list name.

        :param list_name: name of the mailing list to retrieve.
        :returns: the list object, as returned by the query's ``one()``.
        """
        by_name = List.name == unicode(list_name)
        return self.db.find(List, by_name).one()

    def get_message_by_number(self, list_name, num):
        """Return the n-th email for the specified list.

        Emails are numbered from 0, in the order of their
        ``archived_date``.

        :param list_name: The name of the mailing list in which this email
            should be searched.
        :param num: The email number in order received.
        :returns: The email message.
        """
        in_archival_order = self.db.find(
            Email, Email.list_name == unicode(list_name)
        ).order_by(Email.archived_date)
        return in_archival_order[num:num + 1].one()

    def get_top_participants(self, list_name, start, end, limit=None):
        """Return the participants of a list within an interval, ranked by
        number of emails sent.

        :param list_name: The name of the mailing list in which the emails
            should be searched.
        :param start: A datetime object representing the starting date of
            the interval to query (inclusive).
        :param end: A datetime object representing the ending date of
            the interval to query (exclusive).
        :param limit: Limit the number of participants to return. If None or
            not supplied, return them all.
        :returns: A list of (sender name, sender email, number of emails)
            tuples, the most prolific sender first.
        """
        number = Alias(Count(Email.sender_email), "number")
        in_interval = And(
            Email.list_name == unicode(list_name),
            Email.date >= start,
            Email.date < end,
        )
        ranking = self.db.find(
            (Email.sender_name, Email.sender_email, number), in_interval)
        ranking = ranking.group_by(Email.sender_email, Email.sender_name)
        ranking = ranking.order_by(Desc(number))
        if limit is not None:
            ranking = ranking.config(limit=limit)
        return list(ranking)


    def get_categories(self):
        """Return the list of available category names, sorted by name."""
        names = self.db.find(Category.name).order_by(Category.name)
        return list(names)


    def get_first_post(self, list_name, user_id):
        """Return a user's first post on a list.

        "First" is by ``archived_date``.

        :param list_name: The name of the mailing list to search.
        :param user_id: The user id stored with the emails.
        :returns: The email, or None if the user has no post on the list.
        """
        by_user_in_list = And(
            Email.list_name == unicode(list_name),
            Email.user_id == unicode(user_id),
        )
        posts = self.db.find(Email, by_user_in_list)
        return posts.order_by(Email.archived_date).config(limit=1).one()

    def get_sender_name(self, user_id):
        """Return a user's full name when given their user_id.

        The name is the sender name of one email stored with that user
        id; no particular ordering is applied to pick it.

        :param user_id: The user id stored with the emails.
        :returns: The sender name, or None if no email matches.
        """
        by_user = Email.user_id == unicode(user_id)
        names = self.db.find(Email.sender_name, by_user)
        return names.config(limit=1).one()

    def get_message_hashes_by_user_id(self, user_id, list_name=None):
        """Return the message-id hashes of a user's emails.

        :param user_id: The user id stored with the emails.
        :param list_name: Restrict the search to this mailing list. If
            None or not supplied, all lists are searched.
        :returns: The list of ``message_id_hash`` values.
        """
        clause = Email.user_id == unicode(user_id)
        if list_name is not None:
            clause = And(clause, Email.list_name == unicode(list_name))
        return list(self.db.find(Email.message_id_hash, clause))


    # Attachments

    def add_attachment(self, mlist, msg_id, counter, name, content_type,
                       encoding, content):
        """Store an attachment of an email, unless it is already stored.

        An attachment is identified by its list name, message id and
        counter; when such a record exists nothing is changed.

        :param mlist: The name of the list the email belongs to.
        :param msg_id: The Message-ID of the email.
        :param counter: The counter identifying the attachment within the
            email.
        :param name: The attachment's name.
        :param content_type: The attachment's content type.
        :param encoding: The attachment's encoding, or None.
        :param content: The attachment's content; its length is stored
            as the attachment size.
        """
        list_name = unicode(mlist)
        message_id = unicode(msg_id)
        already_stored = self.db.find(Attachment.message_id, And(
            Attachment.list_name == list_name,
            Attachment.message_id == message_id,
            Attachment.counter == counter,
        )).count()
        if already_stored:
            return

        record = Attachment()
        record.list_name = list_name
        record.message_id = message_id
        record.counter = counter
        record.name = unicode(name)
        record.content_type = unicode(content_type)
        if encoding is None:
            record.encoding = None
        else:
            record.encoding = unicode(encoding)
        record.content = content
        record.size = len(content)
        self.db.add(record)
        self.flush()

    def get_attachments(self, list_name, message_id):
        """Return the message's attachments.

        :param list_name: The fully qualified name of the list the
            message belongs to.
        :param message_id: The Message-ID header contents to search for.
        :returns: A list of attachments, ordered by counter.
        """
        of_message = And(
            Attachment.list_name == unicode(list_name),
            Attachment.message_id == unicode(message_id),
        )
        found = self.db.find(Attachment, of_message)
        return list(found.order_by(Attachment.counter))

    def get_attachment_by_counter(self, list_name, message_id, counter):
        """Return the message's attachment at 'counter' position.

        :param list_name: The fully qualified name of the list the
            message belongs to.
        :param message_id: The Message-ID header contents to search for.
        :param counter: The position in the MIME-multipart email.
        :returns: The corresponding attachment
        """
        criteria = And(
            Attachment.list_name == unicode(list_name),
            Attachment.message_id == unicode(message_id),
            Attachment.counter == counter,
        )
        return self.db.find(Attachment, criteria).one()

    # Generic database operations

    def flush(self):
        """Flush pending database operations."""
        self.db.flush()

    def commit(self):
        """Commit transaction to the database."""
        self.db.commit()

    def close(self):
        """Close the connection."""
        self.db.close()

    def rollback(self):
        self.db.rollback()