summaryrefslogtreecommitdiffstats
path: root/nova/auth
diff options
context:
space:
mode:
authorVishvananda Ishaya <vishvananda@yahoo.com>2010-08-23 14:36:14 -0700
committerVishvananda Ishaya <vishvananda@yahoo.com>2010-08-23 14:36:14 -0700
commit157ef10b3048f0bb26ce0909d77698ffb37e45df (patch)
tree9e9c608678d7828526b93a0ac8a6b9d9ee0eea8a /nova/auth
parent78c2175898a468ae734e27dfbc8f5b70f90fd477 (diff)
parentcfe3b2a6dd73e56652f99a573c1bb0abe5a648d4 (diff)
merged trunk and fixed merge errors
Diffstat (limited to 'nova/auth')
-rw-r--r--nova/auth/fakeldap.py37
-rw-r--r--nova/auth/ldapdriver.py62
-rw-r--r--nova/auth/manager.py90
-rw-r--r--nova/auth/rbac.py38
-rw-r--r--nova/auth/signer.py51
5 files changed, 154 insertions, 124 deletions
diff --git a/nova/auth/fakeldap.py b/nova/auth/fakeldap.py
index bc744fa01..bfc3433c5 100644
--- a/nova/auth/fakeldap.py
+++ b/nova/auth/fakeldap.py
@@ -30,20 +30,23 @@ from nova import datastore
SCOPE_BASE = 0
SCOPE_ONELEVEL = 1 # not implemented
-SCOPE_SUBTREE = 2
+SCOPE_SUBTREE = 2
MOD_ADD = 0
MOD_DELETE = 1
-class NO_SUCH_OBJECT(Exception):
+class NO_SUCH_OBJECT(Exception): # pylint: disable-msg=C0103
+ """Duplicate exception class from real LDAP module."""
pass
-class OBJECT_CLASS_VIOLATION(Exception):
+class OBJECT_CLASS_VIOLATION(Exception): # pylint: disable-msg=C0103
+ """Duplicate exception class from real LDAP module."""
pass
-def initialize(uri):
+def initialize(_uri):
+ """Opens a fake connection with an LDAP server."""
return FakeLDAP()
@@ -68,7 +71,7 @@ def _match_query(query, attrs):
# cut off the ! and the nested parentheses
return not _match_query(query[2:-1], attrs)
- (k, sep, v) = inner.partition('=')
+ (k, _sep, v) = inner.partition('=')
return _match(k, v, attrs)
@@ -85,20 +88,20 @@ def _paren_groups(source):
if source[pos] == ')':
count -= 1
if count == 0:
- result.append(source[start:pos+1])
+ result.append(source[start:pos + 1])
return result
-def _match(k, v, attrs):
+def _match(key, value, attrs):
"""Match a given key and value against an attribute list."""
- if k not in attrs:
+ if key not in attrs:
return False
- if k != "objectclass":
- return v in attrs[k]
+ if key != "objectclass":
+ return value in attrs[key]
# it is an objectclass check, so check subclasses
- values = _subs(v)
- for value in values:
- if value in attrs[k]:
+ values = _subs(value)
+ for v in values:
+ if v in attrs[key]:
return True
return False
@@ -145,6 +148,7 @@ def _to_json(unencoded):
class FakeLDAP(object):
#TODO(vish): refactor this class to use a wrapper instead of accessing
# redis directly
+ """Fake LDAP connection."""
def simple_bind_s(self, dn, password):
"""This method is ignored, but provided for compatibility."""
@@ -207,6 +211,7 @@ class FakeLDAP(object):
# get the attributes from redis
attrs = redis.hgetall(key)
# turn the values from redis into lists
+ # pylint: disable-msg=E1103
attrs = dict([(k, _from_json(v))
for k, v in attrs.iteritems()])
# filter the objects by query
@@ -215,12 +220,12 @@ class FakeLDAP(object):
attrs = dict([(k, v) for k, v in attrs.iteritems()
if not fields or k in fields])
objects.append((key[len(self.__redis_prefix):], attrs))
+ # pylint: enable-msg=E1103
if objects == []:
raise NO_SUCH_OBJECT()
return objects
@property
- def __redis_prefix(self):
+ def __redis_prefix(self): # pylint: disable-msg=R0201
+ """Get the prefix to use for all redis keys."""
return 'ldap:'
-
-
diff --git a/nova/auth/ldapdriver.py b/nova/auth/ldapdriver.py
index 6bf7fcd1e..74ba011b5 100644
--- a/nova/auth/ldapdriver.py
+++ b/nova/auth/ldapdriver.py
@@ -34,7 +34,7 @@ from nova import flags
FLAGS = flags.FLAGS
flags.DEFINE_string('ldap_url', 'ldap://localhost',
'Point this at your ldap server')
-flags.DEFINE_string('ldap_password', 'changeme', 'LDAP password')
+flags.DEFINE_string('ldap_password', 'changeme', 'LDAP password')
flags.DEFINE_string('ldap_user_dn', 'cn=Manager,dc=example,dc=com',
'DN of admin user')
flags.DEFINE_string('ldap_user_unit', 'Users', 'OID for Users')
@@ -63,14 +63,18 @@ flags.DEFINE_string('ldap_developer',
# to define a set interface for AuthDrivers. I'm delaying
# creating this now because I'm expecting an auth refactor
# in which we may want to change the interface a bit more.
+
+
class LdapDriver(object):
"""Ldap Auth driver
Defines enter and exit and therefore supports the with/as syntax.
"""
+
def __init__(self):
"""Imports the LDAP module"""
self.ldap = __import__('ldap')
+ self.conn = None
def __enter__(self):
"""Creates the connection to LDAP"""
@@ -78,7 +82,7 @@ class LdapDriver(object):
self.conn.simple_bind_s(FLAGS.ldap_user_dn, FLAGS.ldap_password)
return self
- def __exit__(self, type, value, traceback):
+ def __exit__(self, exc_type, exc_value, traceback):
"""Destroys the connection to LDAP"""
self.conn.unbind_s()
return False
@@ -123,11 +127,11 @@ class LdapDriver(object):
def get_projects(self, uid=None):
"""Retrieve list of projects"""
- filter = '(objectclass=novaProject)'
+ pattern = '(objectclass=novaProject)'
if uid:
- filter = "(&%s(member=%s))" % (filter, self.__uid_to_dn(uid))
+ pattern = "(&%s(member=%s))" % (pattern, self.__uid_to_dn(uid))
attrs = self.__find_objects(FLAGS.ldap_project_subtree,
- filter)
+ pattern)
return [self.__to_project(attr) for attr in attrs]
def create_user(self, name, access_key, secret_key, is_admin):
@@ -194,8 +198,7 @@ class LdapDriver(object):
('cn', [name]),
('description', [description]),
('projectManager', [manager_dn]),
- ('member', members)
- ]
+ ('member', members)]
self.conn.add_s('cn=%s,%s' % (name, FLAGS.ldap_project_subtree), attr)
return self.__to_project(dict(attr))
@@ -287,7 +290,6 @@ class LdapDriver(object):
def __key_pair_exists(self, uid, key_name):
"""Check if key pair exists"""
- return self.get_user(uid) != None
return self.get_key_pair(uid, key_name) != None
def __project_exists(self, project_id):
@@ -310,7 +312,7 @@ class LdapDriver(object):
except self.ldap.NO_SUCH_OBJECT:
return []
# just return the DNs
- return [dn for dn, attributes in res]
+ return [dn for dn, _attributes in res]
def __find_objects(self, dn, query=None, scope=None):
"""Find objects by query"""
@@ -346,7 +348,8 @@ class LdapDriver(object):
for key in keys:
self.delete_key_pair(uid, key['name'])
- def __role_to_dn(self, role, project_id=None):
+ @staticmethod
+ def __role_to_dn(role, project_id=None):
"""Convert role to corresponding dn"""
if project_id == None:
return FLAGS.__getitem__("ldap_%s" % role).value
@@ -356,7 +359,7 @@ class LdapDriver(object):
FLAGS.ldap_project_subtree)
def __create_group(self, group_dn, name, uid,
- description, member_uids = None):
+ description, member_uids=None):
"""Create a group"""
if self.__group_exists(group_dn):
raise exception.Duplicate("Group can't be created because "
@@ -375,8 +378,7 @@ class LdapDriver(object):
('objectclass', ['groupOfNames']),
('cn', [name]),
('description', [description]),
- ('member', members)
- ]
+ ('member', members)]
self.conn.add_s(group_dn, attr)
def __is_in_group(self, uid, group_dn):
@@ -402,9 +404,7 @@ class LdapDriver(object):
if self.__is_in_group(uid, group_dn):
raise exception.Duplicate("User %s is already a member of "
"the group %s" % (uid, group_dn))
- attr = [
- (self.ldap.MOD_ADD, 'member', self.__uid_to_dn(uid))
- ]
+ attr = [(self.ldap.MOD_ADD, 'member', self.__uid_to_dn(uid))]
self.conn.modify_s(group_dn, attr)
def __remove_from_group(self, uid, group_dn):
@@ -432,7 +432,7 @@ class LdapDriver(object):
self.conn.modify_s(group_dn, attr)
except self.ldap.OBJECT_CLASS_VIOLATION:
logging.debug("Attempted to remove the last member of a group. "
- "Deleting the group at %s instead." % group_dn )
+ "Deleting the group at %s instead.", group_dn)
self.__delete_group(group_dn)
def __remove_from_all(self, uid):
@@ -440,7 +440,6 @@ class LdapDriver(object):
if not self.__user_exists(uid):
raise exception.NotFound("User %s can't be removed from all "
"because the user doesn't exist" % (uid,))
- dn = self.__uid_to_dn(uid)
role_dns = self.__find_group_dns_with_member(
FLAGS.role_project_subtree, uid)
for role_dn in role_dns:
@@ -448,7 +447,7 @@ class LdapDriver(object):
project_dns = self.__find_group_dns_with_member(
FLAGS.ldap_project_subtree, uid)
for project_dn in project_dns:
- self.__safe_remove_from_group(uid, role_dn)
+ self.__safe_remove_from_group(uid, project_dn)
def __delete_group(self, group_dn):
"""Delete Group"""
@@ -461,7 +460,8 @@ class LdapDriver(object):
for role_dn in self.__find_role_dns(project_dn):
self.__delete_group(role_dn)
- def __to_user(self, attr):
+ @staticmethod
+ def __to_user(attr):
"""Convert ldap attributes to User object"""
if attr == None:
return None
@@ -470,10 +470,10 @@ class LdapDriver(object):
'name': attr['cn'][0],
'access': attr['accessKey'][0],
'secret': attr['secretKey'][0],
- 'admin': (attr['isAdmin'][0] == 'TRUE')
- }
+ 'admin': (attr['isAdmin'][0] == 'TRUE')}
- def __to_key_pair(self, owner, attr):
+ @staticmethod
+ def __to_key_pair(owner, attr):
"""Convert ldap attributes to KeyPair object"""
if attr == None:
return None
@@ -482,8 +482,7 @@ class LdapDriver(object):
'name': attr['cn'][0],
'owner_id': owner,
'public_key': attr['sshPublicKey'][0],
- 'fingerprint': attr['keyFingerprint'][0],
- }
+ 'fingerprint': attr['keyFingerprint'][0]}
def __to_project(self, attr):
"""Convert ldap attributes to Project object"""
@@ -495,21 +494,22 @@ class LdapDriver(object):
'name': attr['cn'][0],
'project_manager_id': self.__dn_to_uid(attr['projectManager'][0]),
'description': attr.get('description', [None])[0],
- 'member_ids': [self.__dn_to_uid(x) for x in member_dns]
- }
+ 'member_ids': [self.__dn_to_uid(x) for x in member_dns]}
- def __dn_to_uid(self, dn):
+ @staticmethod
+ def __dn_to_uid(dn):
"""Convert user dn to uid"""
return dn.split(',')[0].split('=')[1]
- def __uid_to_dn(self, dn):
+ @staticmethod
+ def __uid_to_dn(dn):
"""Convert uid to dn"""
return 'uid=%s,%s' % (dn, FLAGS.ldap_user_subtree)
class FakeLdapDriver(LdapDriver):
"""Fake Ldap Auth driver"""
- def __init__(self):
+
+ def __init__(self): # pylint: disable-msg=W0231
__import__('nova.auth.fakeldap')
self.ldap = sys.modules['nova.auth.fakeldap']
-
diff --git a/nova/auth/manager.py b/nova/auth/manager.py
index 070c5508a..fc9aec071 100644
--- a/nova/auth/manager.py
+++ b/nova/auth/manager.py
@@ -23,7 +23,7 @@ Nova authentication management
import logging
import os
import shutil
-import string
+import string # pylint: disable-msg=W0402
import tempfile
import uuid
import zipfile
@@ -32,7 +32,6 @@ from nova import crypto
from nova import db
from nova import exception
from nova import flags
-from nova import models
from nova import utils
from nova.auth import signer
@@ -195,12 +194,12 @@ class Project(AuthBase):
@property
def vpn_ip(self):
- ip, port = AuthManager().get_project_vpn_data(self)
+ ip, _port = AuthManager().get_project_vpn_data(self)
return ip
@property
def vpn_port(self):
- ip, port = AuthManager().get_project_vpn_data(self)
+ _ip, port = AuthManager().get_project_vpn_data(self)
return port
def has_manager(self, user):
@@ -222,11 +221,9 @@ class Project(AuthBase):
return AuthManager().get_credentials(user, self)
def __repr__(self):
- return "Project('%s', '%s', '%s', '%s', %s)" % (self.id,
- self.name,
- self.project_manager_id,
- self.description,
- self.member_ids)
+ return "Project('%s', '%s', '%s', '%s', %s)" % \
+ (self.id, self.name, self.project_manager_id, self.description,
+ self.member_ids)
class AuthManager(object):
@@ -298,7 +295,7 @@ class AuthManager(object):
@return: User and project that the request represents.
"""
# TODO(vish): check for valid timestamp
- (access_key, sep, project_id) = access.partition(':')
+ (access_key, _sep, project_id) = access.partition(':')
logging.info('Looking up user: %r', access_key)
user = self.get_user_from_access_key(access_key)
@@ -321,7 +318,8 @@ class AuthManager(object):
raise exception.NotFound('User %s is not a member of project %s' %
(user.id, project.id))
if check_type == 's3':
- expected_signature = signer.Signer(user.secret.encode()).s3_authorization(headers, verb, path)
+ sign = signer.Signer(user.secret.encode())
+ expected_signature = sign.s3_authorization(headers, verb, path)
logging.debug('user.secret: %s', user.secret)
logging.debug('expected_signature: %s', expected_signature)
logging.debug('signature: %s', signature)
@@ -466,7 +464,8 @@ class AuthManager(object):
with self.driver() as drv:
drv.remove_role(User.safe_id(user), role, Project.safe_id(project))
- def get_roles(self, project_roles=True):
+ @staticmethod
+ def get_roles(project_roles=True):
"""Get list of allowed roles"""
if project_roles:
return list(set(FLAGS.allowed_roles) - set(FLAGS.global_roles))
@@ -519,10 +518,10 @@ class AuthManager(object):
if member_users:
member_users = [User.safe_id(u) for u in member_users]
with self.driver() as drv:
- project_dict = drv.create_project(name,
- User.safe_id(manager_user),
- description,
- member_users)
+ project_dict = drv.create_project(name,
+ User.safe_id(manager_user),
+ description,
+ member_users)
if project_dict:
project = Project(**project_dict)
# FIXME(ja): EVIL HACK
@@ -553,7 +552,8 @@ class AuthManager(object):
return drv.remove_from_project(User.safe_id(user),
Project.safe_id(project))
- def get_project_vpn_data(self, project, context=None):
+ @staticmethod
+ def get_project_vpn_data(project, context=None):
"""Gets vpn ip and port for project
@type project: Project or project_id
@@ -563,11 +563,9 @@ class AuthManager(object):
@return: A tuple containing (ip, port) or None, None if vpn has
not been allocated for user.
"""
- # FIXME(vish): this shouldn't be messing with the datamodel directly
- if not isinstance(project, Project):
- project = self.get_project(project)
-
- network_ref = db.project_get_network(context, project.id)
+
+ network_ref = db.project_get_network(context,
+ Project.safe_id(project))
if not network_ref['vpn_public_port']:
raise exception.NotFound('project network data has not been set')
@@ -577,9 +575,8 @@ class AuthManager(object):
def delete_project(self, project, context=None):
"""Deletes a project"""
# FIXME(ja): EVIL HACK
- if not isinstance(project, Project):
- project = self.get_project(project)
- network_ref = db.project_get_network(context, project.id)
+ network_ref = db.project_get_network(context,
+ Project.safe_id(project))
try:
db.network_destroy(context, network_ref['id'])
except:
@@ -632,8 +629,10 @@ class AuthManager(object):
@rtype: User
@return: The new user.
"""
- if access == None: access = str(uuid.uuid4())
- if secret == None: secret = str(uuid.uuid4())
+ if access == None:
+ access = str(uuid.uuid4())
+ if secret == None:
+ secret = str(uuid.uuid4())
with self.driver() as drv:
user_dict = drv.create_user(name, access, secret, admin)
if user_dict:
@@ -675,10 +674,10 @@ class AuthManager(object):
def create_key_pair(self, user, key_name, public_key, fingerprint):
"""Creates a key pair for user"""
with self.driver() as drv:
- kp_dict = drv.create_key_pair(User.safe_id(user),
- key_name,
- public_key,
- fingerprint)
+ kp_dict = drv.create_key_pair(User.safe_id(user),
+ key_name,
+ public_key,
+ fingerprint)
if kp_dict:
return KeyPair(**kp_dict)
@@ -721,7 +720,7 @@ class AuthManager(object):
(vpn_ip, vpn_port) = self.get_project_vpn_data(project)
if vpn_ip:
- configfile = open(FLAGS.vpn_client_template,"r")
+ configfile = open(FLAGS.vpn_client_template, "r")
s = string.Template(configfile.read())
configfile.close()
config = s.substitute(keyfile=FLAGS.credential_key_file,
@@ -736,10 +735,10 @@ class AuthManager(object):
zippy.writestr(FLAGS.ca_file, crypto.fetch_ca(user.id))
zippy.close()
with open(zf, 'rb') as f:
- buffer = f.read()
+ read_buffer = f.read()
shutil.rmtree(tmpdir)
- return buffer
+ return read_buffer
def get_environment_rc(self, user, project=None):
"""Get credential zip for user in project"""
@@ -750,18 +749,18 @@ class AuthManager(object):
pid = Project.safe_id(project)
return self.__generate_rc(user.access, user.secret, pid)
- def __generate_rc(self, access, secret, pid):
+ @staticmethod
+ def __generate_rc(access, secret, pid):
"""Generate rc file for user"""
rc = open(FLAGS.credentials_template).read()
- rc = rc % { 'access': access,
- 'project': pid,
- 'secret': secret,
- 'ec2': FLAGS.ec2_url,
- 's3': 'http://%s:%s' % (FLAGS.s3_host, FLAGS.s3_port),
- 'nova': FLAGS.ca_file,
- 'cert': FLAGS.credential_cert_file,
- 'key': FLAGS.credential_key_file,
- }
+ rc = rc % {'access': access,
+ 'project': pid,
+ 'secret': secret,
+ 'ec2': FLAGS.ec2_url,
+ 's3': 'http://%s:%s' % (FLAGS.s3_host, FLAGS.s3_port),
+ 'nova': FLAGS.ca_file,
+ 'cert': FLAGS.credential_cert_file,
+ 'key': FLAGS.credential_key_file}
return rc
def _generate_x509_cert(self, uid, pid):
@@ -772,6 +771,7 @@ class AuthManager(object):
signed_cert = crypto.sign_csr(csr, pid)
return (private_key, signed_cert)
- def __cert_subject(self, uid):
+ @staticmethod
+ def __cert_subject(uid):
"""Helper to generate cert subject"""
return FLAGS.credential_cert_subject % (uid, utils.isotime())
diff --git a/nova/auth/rbac.py b/nova/auth/rbac.py
index 1446e4e27..d157f44b3 100644
--- a/nova/auth/rbac.py
+++ b/nova/auth/rbac.py
@@ -16,40 +16,54 @@
# License for the specific language governing permissions and limitations
# under the License.
+"""Role-based access control decorators to use fpr wrapping other
+methods with."""
+
from nova import exception
-from nova.auth import manager
def allow(*roles):
- def wrap(f):
- def wrapped_f(self, context, *args, **kwargs):
+ """Allow the given roles access the wrapped function."""
+
+ def wrap(func): # pylint: disable-msg=C0111
+
+ def wrapped_func(self, context, *args,
+ **kwargs): # pylint: disable-msg=C0111
if context.user.is_superuser():
- return f(self, context, *args, **kwargs)
+ return func(self, context, *args, **kwargs)
for role in roles:
if __matches_role(context, role):
- return f(self, context, *args, **kwargs)
+ return func(self, context, *args, **kwargs)
raise exception.NotAuthorized()
- return wrapped_f
+
+ return wrapped_func
+
return wrap
def deny(*roles):
- def wrap(f):
- def wrapped_f(self, context, *args, **kwargs):
+ """Deny the given roles access the wrapped function."""
+
+ def wrap(func): # pylint: disable-msg=C0111
+
+ def wrapped_func(self, context, *args,
+ **kwargs): # pylint: disable-msg=C0111
if context.user.is_superuser():
- return f(self, context, *args, **kwargs)
+ return func(self, context, *args, **kwargs)
for role in roles:
if __matches_role(context, role):
raise exception.NotAuthorized()
- return f(self, context, *args, **kwargs)
- return wrapped_f
+ return func(self, context, *args, **kwargs)
+
+ return wrapped_func
+
return wrap
def __matches_role(context, role):
+ """Check if a role is allowed."""
if role == 'all':
return True
if role == 'none':
return False
return context.project.has_role(context.user.id, role)
-
diff --git a/nova/auth/signer.py b/nova/auth/signer.py
index 8334806d2..f7d29f534 100644
--- a/nova/auth/signer.py
+++ b/nova/auth/signer.py
@@ -50,15 +50,15 @@ import logging
import urllib
# NOTE(vish): for new boto
-import boto
+import boto
# NOTE(vish): for old boto
-import boto.utils
+import boto.utils
from nova.exception import Error
class Signer(object):
- """ hacked up code from boto/connection.py """
+ """Hacked up code from boto/connection.py"""
def __init__(self, secret_key):
self.hmac = hmac.new(secret_key, digestmod=hashlib.sha1)
@@ -66,22 +66,27 @@ class Signer(object):
self.hmac_256 = hmac.new(secret_key, digestmod=hashlib.sha256)
def s3_authorization(self, headers, verb, path):
+ """Generate S3 authorization string."""
c_string = boto.utils.canonical_string(verb, path, headers)
- hmac = self.hmac.copy()
- hmac.update(c_string)
- b64_hmac = base64.encodestring(hmac.digest()).strip()
+ hmac_copy = self.hmac.copy()
+ hmac_copy.update(c_string)
+ b64_hmac = base64.encodestring(hmac_copy.digest()).strip()
return b64_hmac
def generate(self, params, verb, server_string, path):
+ """Generate auth string according to what SignatureVersion is given."""
if params['SignatureVersion'] == '0':
return self._calc_signature_0(params)
if params['SignatureVersion'] == '1':
return self._calc_signature_1(params)
if params['SignatureVersion'] == '2':
return self._calc_signature_2(params, verb, server_string, path)
- raise Error('Unknown Signature Version: %s' % self.SignatureVersion)
+ raise Error('Unknown Signature Version: %s' %
+ params['SignatureVersion'])
- def _get_utf8_value(self, value):
+ @staticmethod
+ def _get_utf8_value(value):
+ """Get the UTF8-encoded version of a value."""
if not isinstance(value, str) and not isinstance(value, unicode):
value = str(value)
if isinstance(value, unicode):
@@ -90,10 +95,11 @@ class Signer(object):
return value
def _calc_signature_0(self, params):
+ """Generate AWS signature version 0 string."""
s = params['Action'] + params['Timestamp']
self.hmac.update(s)
keys = params.keys()
- keys.sort(cmp = lambda x, y: cmp(x.lower(), y.lower()))
+ keys.sort(cmp=lambda x, y: cmp(x.lower(), y.lower()))
pairs = []
for key in keys:
val = self._get_utf8_value(params[key])
@@ -101,8 +107,9 @@ class Signer(object):
return base64.b64encode(self.hmac.digest())
def _calc_signature_1(self, params):
+ """Generate AWS signature version 1 string."""
keys = params.keys()
- keys.sort(cmp = lambda x, y: cmp(x.lower(), y.lower()))
+ keys.sort(cmp=lambda x, y: cmp(x.lower(), y.lower()))
pairs = []
for key in keys:
self.hmac.update(key)
@@ -112,30 +119,34 @@ class Signer(object):
return base64.b64encode(self.hmac.digest())
def _calc_signature_2(self, params, verb, server_string, path):
+ """Generate AWS signature version 2 string."""
logging.debug('using _calc_signature_2')
string_to_sign = '%s\n%s\n%s\n' % (verb, server_string, path)
if self.hmac_256:
- hmac = self.hmac_256
+ current_hmac = self.hmac_256
params['SignatureMethod'] = 'HmacSHA256'
else:
- hmac = self.hmac
+ current_hmac = self.hmac
params['SignatureMethod'] = 'HmacSHA1'
keys = params.keys()
keys.sort()
pairs = []
for key in keys:
val = self._get_utf8_value(params[key])
- pairs.append(urllib.quote(key, safe='') + '=' + urllib.quote(val, safe='-_~'))
+ val = urllib.quote(val, safe='-_~')
+ pairs.append(urllib.quote(key, safe='') + '=' + val)
qs = '&'.join(pairs)
- logging.debug('query string: %s' % qs)
+ logging.debug('query string: %s', qs)
string_to_sign += qs
- logging.debug('string_to_sign: %s' % string_to_sign)
- hmac.update(string_to_sign)
- b64 = base64.b64encode(hmac.digest())
- logging.debug('len(b64)=%d' % len(b64))
- logging.debug('base64 encoded digest: %s' % b64)
+ logging.debug('string_to_sign: %s', string_to_sign)
+ current_hmac.update(string_to_sign)
+ b64 = base64.b64encode(current_hmac.digest())
+ logging.debug('len(b64)=%d', len(b64))
+ logging.debug('base64 encoded digest: %s', b64)
return b64
if __name__ == '__main__':
- print Signer('foo').generate({"SignatureMethod": 'HmacSHA256', 'SignatureVersion': '2'}, "get", "server", "/foo")
+ print Signer('foo').generate({'SignatureMethod': 'HmacSHA256',
+ 'SignatureVersion': '2'},
+ 'get', 'server', '/foo')