diff options
author | Aurélien Bompard <aurelien@bompard.org> | 2012-08-28 18:06:08 +0200 |
---|---|---|
committer | Aurélien Bompard <aurelien@bompard.org> | 2012-09-07 10:40:55 +0200 |
commit | e07116df51f093ba21b5e07c9545b89bf4c192db (patch) | |
tree | fd6f54cbdb2edc03218ce2ca3ef575d288116f37 | |
parent | 29bcaf005efd58021a60e109f4f6e70a055560f4 (diff) | |
download | kittystore-e07116df51f093ba21b5e07c9545b89bf4c192db.tar.gz kittystore-e07116df51f093ba21b5e07c9545b89bf4c192db.tar.xz kittystore-e07116df51f093ba21b5e07c9545b89bf4c192db.zip |
Add a Storm-based backend
This backend will be the primary SQL backend, SQLAlchemy is deprecated.
-rw-r--r-- | kittystore/__init__.py | 8 | ||||
-rw-r--r-- | kittystore/sa/store.py | 6 | ||||
-rw-r--r-- | kittystore/storm/__init__.py | 24 | ||||
-rw-r--r-- | kittystore/storm/hack_datetime.py | 88 | ||||
-rw-r--r-- | kittystore/storm/model.py | 56 | ||||
-rw-r--r-- | kittystore/storm/schema/__init__.py | 67 | ||||
-rw-r--r-- | kittystore/storm/schema/patch_1.py | 6 | ||||
-rw-r--r-- | kittystore/storm/store.py | 376 | ||||
-rw-r--r-- | kittystore/utils.py | 6 | ||||
-rw-r--r-- | setup.py | 1 | ||||
-rw-r--r-- | to_sqldb.py | 6 |
11 files changed, 635 insertions, 9 deletions
diff --git a/kittystore/__init__.py b/kittystore/__init__.py index 7b57ce0..db00b1b 100644 --- a/kittystore/__init__.py +++ b/kittystore/__init__.py @@ -22,9 +22,13 @@ def get_store(url, debug=False): """Factory for a KittyStore subclass""" if url.startswith("mongo://"): raise NotImplementedError + #else: + # from kittystore.sa import KittySAStore + # return KittySAStore(url, debug) else: - from kittystore.sa import KittySAStore - return KittySAStore(url, debug) + from kittystore.storm import get_storm_store + return get_storm_store(url, debug) + class MessageNotFound(Exception): pass diff --git a/kittystore/sa/store.py b/kittystore/sa/store.py index 019bc0e..03632ae 100644 --- a/kittystore/sa/store.py +++ b/kittystore/sa/store.py @@ -401,3 +401,9 @@ class KittySAStore(object): self.metadata) return self.session.query(distinct(email.sender)).filter( email.thread_id == thread_id).all() + + def flush(self): + self.session.flush() + + def commit(self): + self.session.commit() diff --git a/kittystore/storm/__init__.py b/kittystore/storm/__init__.py new file mode 100644 index 0000000..e4d716d --- /dev/null +++ b/kittystore/storm/__init__.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, with_statement + +import sys + +import storm.tracer +from storm.locals import create_database, Store +from storm.schema.schema import Schema + +from .model import List, Email +from . import schema +from .store import StormStore + + +def get_storm_store(url, debug=False): + if debug: + storm.tracer.debug(True, stream=sys.stdout) + database = create_database(url) + store = Store(database) + dbtype = url.partition(":")[0] + dbschema = Schema(schema.CREATES[dbtype], [], [], schema) + dbschema.upgrade(store) + return StormStore(store, debug) diff --git a/kittystore/storm/hack_datetime.py b/kittystore/storm/hack_datetime.py new file mode 100644 index 0000000..78df935 --- /dev/null +++ b/kittystore/storm/hack_datetime.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# +# hack around a bug in storm: support for timezones is missing +# https://bugs.launchpad.net/storm/+bug/280708 +# + +import datetime +import re +from storm.locals import * +from storm.variables import _parse_date, _parse_time + + +RE_TIME = re.compile(r"""^ + (?P<year>\d{4})\-(?P<month>\d{2})\-(?P<day>\d{2}) # pattern matching date + T # seperator + (?P<hour>\d{2})\:(?P<minutes>\d{2})\:(?P<seconds>\d{2}) # pattern matching time + (\.(?P<microseconds>\d{6}))? # pattern matching optional microseconds + (?P<tz_offset>[\-\+]\d{2}\:\d{2})? # pattern matching optional timezone offset + $""", re.VERBOSE) + +def parse_time(time_str): + x = RE_TIME.match(time_str) + if not x: + raise ValueError + d = datetime.datetime(int(x.group("year")), int(x.group("month")), + int(x.group("day")), int(x.group("hour")), int(x.group("minutes")), + int(x.group("seconds"))) + if x.group("microseconds"): + d = d.replace(microsecond=int(x.group("microseconds"))) + if x.group("tz_offset"): + d = d.replace(tzinfo=TimeZone(x.group("tz_offset"))) + return d + +class DateTimeVariableHack(DateTime.variable_class): + + def parse_set(self, value, from_db): + if from_db: + if isinstance(value, datetime.datetime): + pass + elif isinstance(value, (str, unicode)): + if value.count(":") == 3: #additional timezone info + value = value.replace(" ", "T") + value = parse_time(value) + else: + if " " not in value: + raise ValueError("Unknown date/time format: %r" % value) + date_str, time_str = value.split(" ") + value = datetime.datetime(*(_parse_date(date_str) + + _parse_time(time_str))) + else: + raise TypeError("Expected datetime, found %s" % repr(value)) + if self._tzinfo is not None: + if value.tzinfo is None: + value = value.replace(tzinfo=self._tzinfo) + else: + value = value.astimezone(self._tzinfo) + else: + if type(value) in (int, long, float): + value = datetime.datetime.utcfromtimestamp(value) + elif not isinstance(value, datetime.datetime): + raise TypeError("Expected datetime, found %s" % repr(value)) + if self._tzinfo is not None: + value = value.astimezone(self._tzinfo) + return value + + + +class DateTime(DateTime): + variable_class = DateTimeVariableHack + + + +class TimeZone(datetime.tzinfo): + + def __init__(self, tz_string): + hours, minutes = tz_string.lstrip("-+").split(":") + self.stdoffset = datetime.timedelta(hours=int(hours), minutes=int(minutes)) + if tz_string.startswith("-"): + self.stdoffset *= -1 + + def __repr__(self): + return "TimeZone(%s)" %(self.stdoffset.days*24*60*60 + self.stdoffset.seconds) + + def utcoffset(self, dt): + return self.stdoffset + + def dst(self, dt): + return datetime.timedelta(0) diff --git a/kittystore/storm/model.py b/kittystore/storm/model.py new file mode 100644 index 0000000..469c5fa --- /dev/null +++ b/kittystore/storm/model.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- + +""" +Copyright (C) 2012 Aurelien Bompard <abompard@fedoraproject.org> +Author: Aurelien Bompard <abompard@fedoraproject.org> + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 2 of the License, or (at +your option) any later version. +See http://www.gnu.org/copyleft/gpl.html for the full text of the +license. +""" + +import datetime + +from storm.locals import * +from .hack_datetime import DateTime + +from kittystore.utils import get_message_id_hash + + +__all__ = ("List", "Email",) + +class List(object): + + __storm_table__ = "list" + + name = Unicode(primary=True) + + def __init__(self, name): + self.name = unicode(name) + + +class Email(object): + + __storm_table__ = "email" + __storm_primary__ = "list_name", "message_id" + + list_name = Unicode() + message_id = Unicode() + sender_name = Unicode() + sender_email = Unicode() + subject = Unicode() + content = Unicode() + date = DateTime() + in_reply_to = Unicode() + hash_id = Unicode() + thread_id = Unicode() + full = RawStr() + archived_date = DateTime(default_factory=datetime.datetime.now) + + def __init__(self, list_name, message_id): + self.list_name = unicode(list_name) + self.message_id = unicode(message_id) + self.hash_id = unicode(get_message_id_hash(self.message_id)) diff --git a/kittystore/storm/schema/__init__.py b/kittystore/storm/schema/__init__.py new file mode 100644 index 0000000..ece8df8 --- /dev/null +++ b/kittystore/storm/schema/__init__.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- + + +CREATES = { + + "sqlite": [ """ + CREATE TABLE "list" ( + name VARCHAR(255) NOT NULL, + PRIMARY KEY (name) + );""", """ + CREATE TABLE "email" ( + list_name VARCHAR(255) NOT NULL, + message_id VARCHAR(255) NOT NULL, + sender_name VARCHAR(255) NOT NULL, + sender_email VARCHAR(255) NOT NULL, + subject TEXT NOT NULL, + content TEXT NOT NULL, + date DATETIME NOT NULL, + in_reply_to VARCHAR(255), -- How about replies from another list ? + hash_id VARCHAR(255) NOT NULL, + thread_id VARCHAR(255) NOT NULL, + "full" BLOB NOT NULL, + archived_date DATETIME DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (list_name, message_id) + );""", + 'CREATE INDEX "ix_email_list_name" ON "email" (list_name);', + 'CREATE UNIQUE INDEX "ix_email_message_id" ON "email" (message_id);', + 'CREATE INDEX "ix_email_date" ON "email" (date);', + 'CREATE UNIQUE INDEX "ix_email_hash_id" ON "email" (hash_id);', + 'CREATE INDEX "ix_email_subject" ON "email" (subject);', + 'CREATE INDEX "ix_email_thread_id" ON "email" (thread_id);', + ], + + "postgres": [ """ + CREATE TABLE "list" ( + name VARCHAR(255) NOT NULL, + PRIMARY KEY (name) + );""", """ + CREATE TABLE "email" ( + list_name VARCHAR(255) NOT NULL, + message_id VARCHAR(255) NOT NULL, + sender_name VARCHAR(255) NOT NULL, + sender_email VARCHAR(255) NOT NULL, + subject TEXT NOT NULL, + content TEXT NOT NULL, + date TIMESTAMP WITH TIME ZONE NOT NULL, + in_reply_to VARCHAR(255), -- How about replies from another list ? + hash_id VARCHAR(255) NOT NULL, + thread_id VARCHAR(255) NOT NULL, + "full" BYTEA NOT NULL, + archived_date TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (list_name, message_id) + );""", + 'CREATE INDEX "ix_email_list_name" ON "email" USING btree (list_name);', + 'CREATE UNIQUE INDEX "ix_email_message_id" ON "email" USING btree (message_id);', + 'CREATE INDEX "ix_email_date" ON "email" USING btree (date);', + 'CREATE UNIQUE INDEX "ix_email_hash_id" ON "email" USING btree (hash_id);', + 'CREATE INDEX "ix_email_subject" ON "email" USING btree (subject);', + 'CREATE INDEX "ix_email_thread_id" ON "email" USING btree (thread_id);', + ], + +} + + +def get_db_type(store): + database = store.get_database() + return database.__class__.__module__.split(".")[-1] diff --git a/kittystore/storm/schema/patch_1.py b/kittystore/storm/schema/patch_1.py new file mode 100644 index 0000000..8497968 --- /dev/null +++ b/kittystore/storm/schema/patch_1.py @@ -0,0 +1,6 @@ +# -*- coding: utf-8 -*- + + +def apply(store): + """Store '1' in the patch version table""" + pass diff --git a/kittystore/storm/store.py b/kittystore/storm/store.py new file mode 100644 index 0000000..167ac1e --- /dev/null +++ b/kittystore/storm/store.py @@ -0,0 +1,376 @@ +# -*- coding: utf-8 -*- + +""" +Copyright (C) 2012 Aurélien Bompard <abompard@fedoraproject.org> +Author: Aurélien Bompard <abompard@fedoraproject.org> + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 2 of the License, or (at +your option) any later version. +See http://www.gnu.org/copyleft/gpl.html for the full text of the +license. +""" + +from __future__ import absolute_import + +import datetime + +from kittystore import MessageNotFound +from kittystore.utils import get_message_id_hash, parseaddr, parsedate +from kittystore.utils import header_to_unicode, payload_to_unicode +from kittystore.utils import get_ref_and_thread_id + +from zope.interface import implements +from mailman.interfaces.messages import IMessageStore +from storm.locals import * + +from .model import List, Email + +#from kittystore.sa.kittysamodel import get_class_object +#from sqlalchemy import create_engine, distinct, MetaData, and_, desc, or_ +#from sqlalchemy.ext.declarative import declarative_base +#from sqlalchemy.orm import sessionmaker +#from sqlalchemy.orm.exc import NoResultFound + + +class StormStore(object): + """ + Storm-powered interface to query emails from the database. + """ + + implements(IMessageStore) + + def __init__(self, db, debug=False): + """ Constructor. + Create the session using the engine defined in the url. + + :param db: the Storm store object + :param debug: a boolean to set the debug mode on or off. + """ + self.db = db + + def add(self, message): + """Add the message to the store. + + :param message: An email.message.Message instance containing at + least a unique Message-ID header. The message will be given + an X-Message-ID-Hash header, overriding any existing such + header. + :returns: The calculated X-Message-ID-Hash header. + :raises ValueError: if the message is missing a Message-ID + header. + The storage service is also allowed to raise this exception + if it find, but disallows collisions. + """ + # Not sure this is useful: a message should always be in a list + raise NotImplementedError + + def add_to_list(self, list_name, message): + """Add the message to a specific list of the store. + + :param list_name: The fully qualified list name to which the + message should be added. + :param message: An email.message.Message instance containing at + least a unique Message-ID header. The message will be given + an X-Message-ID-Hash header, overriding any existing such + header. + :returns: The calculated X-Message-ID-Hash header. + :raises ValueError: if the message is missing a Message-ID + header. + The storage service is also allowed to raise this exception + if it find, but disallows collisions. + """ + # Create the list if it does not exist + list_is_in_db = self.db.find(List, + List.name == unicode(list_name)).count() + if not list_is_in_db: + self.db.add(List(list_name)) + if not message.has_key("Message-Id"): + raise ValueError("No 'Message-Id' header in email", message) + msg_id = message['Message-Id'].strip("<>") + email = Email(list_name, msg_id) + if self.is_message_in_list(list_name, email.message_id): + print ("Duplicate email from %s: %s" % + (message['From'], message.get('Subject', '""'))) + return email.hash_id + + # Find thread id + ref, thread_id = get_ref_and_thread_id(message, list_name, self) + if thread_id is None: + # make up the thread_id if not found + thread_id = email.hash_id + email.thread_id = thread_id + email.in_reply_to = ref + + from_name, from_email = parseaddr(message['From']) + from_name = header_to_unicode(from_name) + email.sender_name = from_name + email.sender_email = unicode(from_email) + email.subject = header_to_unicode(message.get('Subject')) + payload = payload_to_unicode(message) + email.content = payload + email.date = parsedate(message.get("Date")) + email.full = message.as_string() + + #category = 'Question' # TODO: enum + i18n ? + #if ('agenda' in message.get('Subject', '').lower() or + # 'reminder' in message.get('Subject', '').lower()): + # # i18n! + # category = 'Agenda' + + self.db.add(email) + self.flush() + return email.hash_id + + def delete_message(self, message_id): + """Remove the given message from the store. + + :param message: The Message-ID of the mesage to delete from the + store. + :raises LookupError: if there is no such message. + """ + # Not sure this is useful: a message should always be in a list + raise NotImplementedError + + def delete_message_from_list(self, list_name, message_id): + """Remove the given message for a specific list from the store. + + :param list_name: The fully qualified list name to which the + message should be added. + :param message: The Message-ID of the mesage to delete from the + store. + :raises LookupError: if there is no such message. + """ + msg = self.get_message_by_id_from_list(list_name, message_id) + if msg is None: + raise MessageNotFound(list_name, message_id) + self.db.delete(msg) + self.flush() + + def get_list_size(self, list_name): + """ Return the number of emails stored for a given mailing list. + + :arg list_name, name of the mailing list in which this email + should be searched. + """ + return self.db.find(Email, + Email.list_name == unicode(list_name)).count() + + + def get_message_by_hash(self, message_id_hash): + """Return the message with the matching X-Message-ID-Hash. + + :param message_id_hash: The X-Message-ID-Hash header contents to + search for. + :returns: The message, or None if no matching message was found. + """ + # Not sure this is useful: a message should always be in a list + raise NotImplementedError + + def get_message_by_hash_from_list(self, list_name, message_id_hash): + """Return the message with the matching X-Message-ID-Hash. + + :param message_id_hash: The X-Message-ID-Hash header contents to + search for. + :returns: The message, or None if no matching message was found. + """ + return self.db.find(Email, + Email.hash_id == unicode(message_id_hash)).one() + + def get_message_by_id(self, message_id): + """Return the message with a matching Message-ID. + + :param message_id: The Message-ID header contents to search for. + :returns: The message, or None if no matching message was found. + """ + # Not sure this is useful: a message should always be in a list + raise NotImplementedError + + def get_message_by_id_from_list(self, list_name, message_id): + """Return the message with a matching Message-ID. + + :param list_name: The fully qualified list name to which the + message should be added. + :param message_id: The Message-ID header contents to search for. + :returns: The message, or None if no matching message was found. + """ + msg = self.db.find(Email, + Email.message_id == unicode(message_id)).one() + return msg + + def is_message_in_list(self, list_name, message_id): + """Return the number of messages with a matching Message-ID in the list. + + :param list_name: The fully qualified list name to which the + message should be added. + :param message_id: The Message-ID header contents to search for. + :returns: The message, or None if no matching message was found. + """ + return self.db.find(Email.message_id, + Email.message_id == unicode(message_id)).count() + + def search_list_for_content(self, list_name, keyword): + """ Returns a list of email containing the specified keyword in + their content. + + :param list_name: name of the mailing list in which this email + should be searched. + :param keyword: keyword to search in the content of the emails. + """ + emails = self.db.find(Email, + Email.content.ilike(u'%{0}%'.format(keyword)) + ).order_by(Desc(Email.date)) + return emails + + def search_list_for_content_subject(self, list_name, keyword): + """ Returns a list of email containing the specified keyword in + their content or their subject. + + :param list_name: name of the mailing list in which this email + should be searched. + :param keyword: keyword to search in the content or subject of + the emails. + """ + emails = self.db.find(Email, Or( + Email.content.ilike(u'%{0}%'.format(keyword)), + Email.subject.ilike(u'%{0}%'.format(keyword)), + )).order_by(Desc(Email.date)) + return emails + + def search_list_for_sender(self, list_name, keyword): + """ Returns a list of email containing the specified keyword in + the name or email address of the sender of the email. + + :param list_name: name of the mailing list in which this email + should be searched. + :param keyword: keyword to search in the database. + """ + emails = self.db.find(Email, Or( + Email.sender_name.ilike(u'%{0}%'.format(keyword)), + Email.sender_email.ilike(u'%{0}%'.format(keyword)), + )).order_by(Desc(Email.date)) + return emails + + def search_list_for_subject(self, list_name, keyword): + """ Returns a list of email containing the specified keyword in + their subject. + + :param list_name: name of the mailing list in which this email + should be searched. + :param keyword: keyword to search in the subject of the emails. + """ + emails = self.db.find(Email, + Email.subject.ilike(u'%{0}%'.format(keyword)), + ).order_by(Desc(Email.date)) + return emails + + @property + def messages(self): + """An iterator over all messages in this message store.""" + raise NotImplementedError + + + + + def get_list_names(self): + """Return the names of the archived lists. + + :returns: A list containing the names of the archived mailing-lists. + """ + return list(self.db.find(List.name).order_by(List.name)) + + def get_archives(self, list_name, start, end): + """ Return all the thread-starting emails between two given dates. + + :arg list_name, name of the mailing list in which this email + should be searched. + :arg start, a datetime object representing the starting date of + the interval to query. + :arg end, a datetime object representing the ending date of + the interval to query. + """ + # Beginning of thread == No 'References' header + emails = self.db.find(Email, And( + Email.list_name == unicode(list_name), + Email.in_reply_to == None, + Email.date >= start, + Email.date <= end, + )).order_by(Desc(Email.date)) + return list(emails) + + def get_archives_length(self, list_name): + """ Return a dictionnary of years, months for which there are + potentially archives available for a given list (based on the + oldest post on the list). + + :arg list_name, name of the mailing list in which this email + should be searched. + """ + archives = {} + first = self.db.find(Email.date, + Email.list_name == unicode(list_name) + ).order_by(Email.date)[:1] + if not list(first): + return archives + else: + first = first.one() + now = datetime.datetime.now() + year = first.year + month = first.month + while year < now.year: + archives[year] = range(1, 13)[(month -1):] + year = year + 1 + month = 1 + archives[now.year] = range(1, 13)[:now.month] + return archives + + def get_thread(self, list_name, thread_id): + """ Return all the emails present in a thread. This thread + is uniquely identified by its thread_id. + + :arg list_name, name of the mailing list in which this email + should be searched. + :arg thread_id, thread_id as used in the web-pages. + Used here to uniquely identify the thread in the database. + """ + emails = self.db.find(Email, And( + Email.list_name == unicode(list_name), + Email.thread_id == unicode(thread_id), + )).order_by(Desc(Email.date)) + return list(emails) + + def get_thread_length(self, list_name, thread_id): + """ Return the number of email present in a thread. This thread + is uniquely identified by its thread_id. + + :arg list_name, name of the mailing list in which this email + should be searched. + :arg thread_id, unique identifier of the thread as specified in + the database. + """ + return self.db.find(Email, And( + Email.list_name == unicode(list_name), + Email.thread_id == unicode(thread_id), + )).count() + + def get_thread_participants(self, list_name, thread_id): + """ Return the list of participant in a thread. This thread + is uniquely identified by its thread_id. + + :arg list_name, name of the mailing list in which this email + should be searched. + :arg thread_id, unique identifier of the thread as specified in + the database. + """ + participants = self.db.find(Email.sender_name, And( + Email.list_name == unicode(list_name), + Email.thread_id == unicode(thread_id), + )).config(distinct=True) + return list(participants) + + def flush(self): + self.db.flush() + + def commit(self): + self.db.commit() diff --git a/kittystore/utils.py b/kittystore/utils.py index f14ea24..28a3e15 100644 --- a/kittystore/utils.py +++ b/kittystore/utils.py @@ -81,7 +81,7 @@ def payload_to_unicode(message): except UnicodeDecodeError: continue else: - print encoding, payload + #print encoding, payload break # Try UTF-8 #part.set_charset("utf-8") @@ -125,6 +125,6 @@ def get_ref_and_thread_id(message, list_name, store): thread_id = None else: # re-use parent's thread-id - thread_id = ref_msg.thread_id - return ref_id, thread_id + thread_id = unicode(ref_msg.thread_id) + return unicode(ref_id), thread_id @@ -18,6 +18,7 @@ setup( 'mailman', 'zope.interface', 'SQLAlchemy==0.7.8', + 'storm', 'python-dateutil < 2.0' # 2.0+ is for Python 3 'mock', ], diff --git a/to_sqldb.py b/to_sqldb.py index 2f709bb..73ce615 100644 --- a/to_sqldb.py +++ b/to_sqldb.py @@ -56,9 +56,9 @@ def to_db(mbfile, list_name, store): # Database is locked time.sleep(1) msg_id_hash = store.add_to_list(list_name, message) - store.session.flush() + store.flush() cnt = cnt + 1 - store.session.commit() + store.commit() print ' %s email read' % cnt_read print ' %s email added to the database' % cnt @@ -77,5 +77,3 @@ python to_sqldb.py list_name mbox_file [mbox_file]''' if os.path.exists(mbfile): to_db(mbfile, sys.argv[1], store) print ' %s emails are stored into the database' % store.get_list_size(sys.argv[1]) - store.session.close() - |