summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tests/unit/db/sqlalchemy/test_sqlalchemy.py59
1 files changed, 58 insertions, 1 deletions
diff --git a/tests/unit/db/sqlalchemy/test_sqlalchemy.py b/tests/unit/db/sqlalchemy/test_sqlalchemy.py
index 90a048e..efe0264 100644
--- a/tests/unit/db/sqlalchemy/test_sqlalchemy.py
+++ b/tests/unit/db/sqlalchemy/test_sqlalchemy.py
@@ -1,4 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# encoding=UTF8
+
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
@@ -18,7 +20,7 @@
from sqlalchemy import Column, MetaData, Table, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy import DateTime, Integer
+from sqlalchemy import DateTime, Integer, String
from openstack.common.db import exception as db_exc
from openstack.common.db.sqlalchemy import models
@@ -57,6 +59,7 @@ class SessionErrorWrapperTestCase(test_utils.BaseTestCase):
meta.bind = session.get_engine()
test_table = Table(_TABLE_NAME, meta, autoload=True)
test_table.drop()
+ session.cleanup()
def test_flush_wrapper(self):
tbl = TmpTable()
@@ -80,3 +83,57 @@ class SessionErrorWrapperTestCase(test_utils.BaseTestCase):
update
self.assertRaises(db_exc.DBDuplicateEntry,
method, {'foo': 20})
+
+
+_REGEXP_TABLE_NAME = _TABLE_NAME + "regexp"
+
+
+class RegexpTable(BASE, models.ModelBase):
+ __tablename__ = _REGEXP_TABLE_NAME
+ id = Column(Integer, primary_key=True)
+ bar = Column(String(255))
+
+
+class RegexpFilterTestCase(test_utils.BaseTestCase):
+
+ def setUp(self):
+ super(RegexpFilterTestCase, self).setUp()
+ meta = MetaData()
+ meta.bind = session.get_engine()
+ test_table = Table(_REGEXP_TABLE_NAME, meta,
+ Column('id', Integer, primary_key=True,
+ nullable=False),
+ Column('bar', String(255)))
+ test_table.create()
+
+ def tearDown(self):
+ super(RegexpFilterTestCase, self).tearDown()
+ meta = MetaData()
+ meta.bind = session.get_engine()
+ test_table = Table(_REGEXP_TABLE_NAME, meta, autoload=True)
+ test_table.drop()
+ session.cleanup()
+
+ def _test_regexp_filter(self, regexp, expected):
+ _session = session.get_session()
+ with _session.begin():
+ for i in ['10', '20', u'♥']:
+ tbl = RegexpTable()
+ tbl.update({'bar': i})
+ tbl.save(session=_session)
+
+ regexp_op = RegexpTable.bar.op('REGEXP')(regexp)
+ result = _session.query(RegexpTable).filter(regexp_op).all()
+ self.assertEqual([r.bar for r in result], expected)
+
+ def test_regexp_filter(self):
+ self._test_regexp_filter('10', ['10'])
+
+ def test_regexp_filter_nomatch(self):
+ self._test_regexp_filter('11', [])
+
+ def test_regexp_filter_unicode(self):
+ self._test_regexp_filter(u'♥', [u'♥'])
+
+ def test_regexp_filter_unicode_nomatch(self):
+ self._test_regexp_filter(u'♦', [])