Improve PEP8 and readability
This commit is contained in:
@@ -14,12 +14,19 @@ except Exception, e:
|
||||
raise e
|
||||
|
||||
|
||||
def ldap_auth(server='ldap', port=None,
|
||||
def ldap_auth(server='ldap',
|
||||
port=None,
|
||||
base_dn='ou=users,dc=domain,dc=com',
|
||||
mode='uid', secure=False,
|
||||
cert_path=None, cert_file=None,
|
||||
cacert_path=None, cacert_file=None, key_file=None,
|
||||
bind_dn=None, bind_pw=None, filterstr='objectClass=*',
|
||||
mode='uid',
|
||||
secure=False,
|
||||
cert_path=None,
|
||||
cert_file=None,
|
||||
cacert_path=None,
|
||||
cacert_file=None,
|
||||
key_file=None,
|
||||
bind_dn=None,
|
||||
bind_pw=None,
|
||||
filterstr='objectClass=*',
|
||||
username_attrib='uid',
|
||||
custom_scope='subtree',
|
||||
allowed_groups=None,
|
||||
@@ -196,8 +203,7 @@ def ldap_auth(server='ldap', port=None,
|
||||
logger.warning('blank password not allowed')
|
||||
return False
|
||||
logger.debug('mode: [%s] manage_user: [%s] custom_scope: [%s]'
|
||||
' manage_groups: [%s]' % (str(mode), str(manage_user),
|
||||
str(custom_scope), str(manage_groups)))
|
||||
' manage_groups: [%s]' % (str(mode), str(manage_user), str(custom_scope), str(manage_groups)))
|
||||
if manage_user:
|
||||
if user_firstname_attrib.count(':') > 0:
|
||||
(user_firstname_attrib,
|
||||
@@ -246,14 +252,10 @@ def ldap_auth(server='ldap', port=None,
|
||||
# in the ldap_basedn
|
||||
requested_attrs = ['sAMAccountName']
|
||||
if manage_user:
|
||||
requested_attrs.extend([user_firstname_attrib,
|
||||
user_lastname_attrib,
|
||||
user_mail_attrib])
|
||||
requested_attrs.extend([user_firstname_attrib, user_lastname_attrib, user_mail_attrib])
|
||||
result = con.search_ext_s(
|
||||
ldap_basedn, ldap.SCOPE_SUBTREE,
|
||||
"(&(sAMAccountName=%s)(%s))" % (
|
||||
ldap.filter.escape_filter_chars(username_bare),
|
||||
filterstr),
|
||||
"(&(sAMAccountName=%s)(%s))" % (ldap.filter.escape_filter_chars(username_bare), filterstr),
|
||||
requested_attrs)[0][1]
|
||||
if not isinstance(result, dict):
|
||||
# result should be a dict in the form
|
||||
@@ -286,25 +288,21 @@ def ldap_auth(server='ldap', port=None,
|
||||
if manage_user:
|
||||
result = con.search_s(dn, ldap.SCOPE_BASE,
|
||||
"(objectClass=*)",
|
||||
[user_firstname_attrib,
|
||||
user_lastname_attrib,
|
||||
user_mail_attrib])[0][1]
|
||||
[user_firstname_attrib, user_lastname_attrib, user_mail_attrib])[0][1]
|
||||
|
||||
if ldap_mode == 'uid':
|
||||
# OpenLDAP (UID)
|
||||
if ldap_binddn and ldap_bindpw:
|
||||
con.simple_bind_s(ldap_binddn, ldap_bindpw)
|
||||
dn = "uid=" + username + "," + ldap_basedn
|
||||
dn = con.search_s(ldap_basedn, ldap.SCOPE_SUBTREE, "(uid=%s)"%username, [''])[0][0]
|
||||
dn = con.search_s(ldap_basedn, ldap.SCOPE_SUBTREE, "(uid=%s)" % username, [''])[0][0]
|
||||
else:
|
||||
dn = "uid=" + username + "," + ldap_basedn
|
||||
con.simple_bind_s(dn, password)
|
||||
if manage_user:
|
||||
result = con.search_s(dn, ldap.SCOPE_BASE,
|
||||
"(objectClass=*)",
|
||||
[user_firstname_attrib,
|
||||
user_lastname_attrib,
|
||||
user_mail_attrib])[0][1]
|
||||
[user_firstname_attrib, user_lastname_attrib, user_mail_attrib])[0][1]
|
||||
|
||||
if ldap_mode == 'company':
|
||||
# no DNs or password needed to search directory
|
||||
@@ -319,9 +317,7 @@ def ldap_auth(server='ldap', port=None,
|
||||
# find the uid
|
||||
attrs = ['uid']
|
||||
if manage_user:
|
||||
attrs.extend([user_firstname_attrib,
|
||||
user_lastname_attrib,
|
||||
user_mail_attrib])
|
||||
attrs.extend([user_firstname_attrib, user_lastname_attrib, user_mail_attrib])
|
||||
# perform the actual search
|
||||
company_search_result = con.search_s(ldap_basedn,
|
||||
ldap.SCOPE_SUBTREE,
|
||||
@@ -337,13 +333,11 @@ def ldap_auth(server='ldap', port=None,
|
||||
basedns = ldap_basedn
|
||||
else:
|
||||
basedns = [ldap_basedn]
|
||||
filter = '(&(uid=%s)(%s))' % (
|
||||
ldap.filter.escape_filter_chars(username), filterstr)
|
||||
filter = '(&(uid=%s)(%s))' % (ldap.filter.escape_filter_chars(username), filterstr)
|
||||
found = False
|
||||
for basedn in basedns:
|
||||
try:
|
||||
result = con.search_s(basedn, ldap.SCOPE_SUBTREE,
|
||||
filter)
|
||||
result = con.search_s(basedn, ldap.SCOPE_SUBTREE, filter)
|
||||
if result:
|
||||
user_dn = result[0][0]
|
||||
# Check the password
|
||||
@@ -352,9 +346,10 @@ def ldap_auth(server='ldap', port=None,
|
||||
break
|
||||
except ldap.LDAPError, detail:
|
||||
(exc_type, exc_value) = sys.exc_info()[:2]
|
||||
logger.warning(
|
||||
"ldap_auth: searching %s for %s resulted in %s: %s\n" %
|
||||
(basedn, filter, exc_type, exc_value)
|
||||
logger.warning("ldap_auth: searching %s for %s resulted in %s: %s\n" % (basedn,
|
||||
filter,
|
||||
exc_type,
|
||||
exc_value)
|
||||
)
|
||||
if not found:
|
||||
logger.warning('User [%s] not found!' % username)
|
||||
@@ -367,10 +362,7 @@ def ldap_auth(server='ldap', port=None,
|
||||
basedns = ldap_basedn
|
||||
else:
|
||||
basedns = [ldap_basedn]
|
||||
filter = '(&(%s=%s)(%s))' % (username_attrib,
|
||||
ldap.filter.escape_filter_chars(
|
||||
username),
|
||||
filterstr)
|
||||
filter = '(&(%s=%s)(%s))' % (username_attrib, ldap.filter.escape_filter_chars(username), filterstr)
|
||||
if custom_scope == 'subtree':
|
||||
ldap_scope = ldap.SCOPE_SUBTREE
|
||||
elif custom_scope == 'base':
|
||||
@@ -389,9 +381,10 @@ def ldap_auth(server='ldap', port=None,
|
||||
break
|
||||
except ldap.LDAPError, detail:
|
||||
(exc_type, exc_value) = sys.exc_info()[:2]
|
||||
logger.warning(
|
||||
"ldap_auth: searching %s for %s resulted in %s: %s\n" %
|
||||
(basedn, filter, exc_type, exc_value)
|
||||
logger.warning("ldap_auth: searching %s for %s resulted in %s: %s\n" % (basedn,
|
||||
filter,
|
||||
exc_type,
|
||||
exc_value)
|
||||
)
|
||||
if not found:
|
||||
logger.warning('User [%s] not found!' % username)
|
||||
@@ -401,16 +394,14 @@ def ldap_auth(server='ldap', port=None,
|
||||
logger.info('[%s] Manage user data' % str(username))
|
||||
try:
|
||||
if user_firstname_part is not None:
|
||||
store_user_firstname = result[user_firstname_attrib][
|
||||
0].split(' ', 1)[user_firstname_part]
|
||||
store_user_firstname = result[user_firstname_attrib][0].split(' ', 1)[user_firstname_part]
|
||||
else:
|
||||
store_user_firstname = result[user_firstname_attrib][0]
|
||||
except KeyError, e:
|
||||
store_user_firstname = None
|
||||
try:
|
||||
if user_lastname_part is not None:
|
||||
store_user_lastname = result[user_lastname_attrib][
|
||||
0].split(' ', 1)[user_lastname_part]
|
||||
store_user_lastname = result[user_lastname_attrib][0].split(' ', 1)[user_lastname_part]
|
||||
else:
|
||||
store_user_lastname = result[user_lastname_attrib][0]
|
||||
except KeyError, e:
|
||||
@@ -480,9 +471,7 @@ def ldap_auth(server='ldap', port=None,
|
||||
# No match
|
||||
return False
|
||||
|
||||
def do_manage_groups(username,
|
||||
password=None,
|
||||
db=db):
|
||||
def do_manage_groups(username, password=None, db=db):
|
||||
"""
|
||||
Manage user groups
|
||||
|
||||
@@ -502,23 +491,19 @@ def ldap_auth(server='ldap', port=None,
|
||||
# Get all group name where the user is in actually in local db
|
||||
# #############################################################
|
||||
try:
|
||||
db_user_id = db(db.auth_user.username == username).select(
|
||||
db.auth_user.id).first().id
|
||||
db_user_id = db(db.auth_user.username == username).select(db.auth_user.id).first().id
|
||||
except:
|
||||
try:
|
||||
db_user_id = db(db.auth_user.email == username).select(
|
||||
db.auth_user.id).first().id
|
||||
db_user_id = db(db.auth_user.email == username).select(db.auth_user.id).first().id
|
||||
except AttributeError, e:
|
||||
#
|
||||
# There is no user in local db
|
||||
# We create one
|
||||
# ##############################
|
||||
try:
|
||||
db_user_id = db.auth_user.insert(username=username,
|
||||
first_name=username)
|
||||
db_user_id = db.auth_user.insert(username=username, first_name=username)
|
||||
except AttributeError, e:
|
||||
db_user_id = db.auth_user.insert(email=username,
|
||||
first_name=username)
|
||||
db_user_id = db.auth_user.insert(email=username, first_name=username)
|
||||
if not db_user_id:
|
||||
logging.error(
|
||||
'There is no username or email for %s!' % username)
|
||||
@@ -526,27 +511,23 @@ def ldap_auth(server='ldap', port=None,
|
||||
# if old pydal version, assume this is a relational database which can do joins
|
||||
db_can_join = db.can_join() if hasattr(db, 'can_join') else True
|
||||
if db_can_join:
|
||||
db_group_search = db(
|
||||
(db.auth_membership.user_id == db_user_id) &
|
||||
(db.auth_user.id == db.auth_membership.user_id) &
|
||||
(db.auth_group.id == db.auth_membership.group_id))
|
||||
db_group_search = \
|
||||
db((db.auth_membership.user_id == db_user_id) &
|
||||
(db.auth_user.id == db.auth_membership.user_id) &
|
||||
(db.auth_group.id == db.auth_membership.group_id))
|
||||
else:
|
||||
# no joins on NoSQL databases, perform two queries
|
||||
db_group_search = db(db.auth_membership.user_id == db_user_id)
|
||||
group_ids = [x.group_id for x in db_group_search.select(
|
||||
db.auth_membership.group_id, distinct=True)]
|
||||
group_ids = [x.group_id for x in db_group_search.select(db.auth_membership.group_id, distinct=True)]
|
||||
db_group_search = db(db.auth_group.id.belongs(group_ids))
|
||||
db_groups_of_the_user = list()
|
||||
db_group_id = dict()
|
||||
|
||||
if db_group_search.count() > 0:
|
||||
for group in db_group_search.select(db.auth_group.id,
|
||||
db.auth_group.role,
|
||||
distinct=True):
|
||||
for group in db_group_search.select(db.auth_group.id, db.auth_group.role, distinct=True):
|
||||
db_group_id[group.role] = group.id
|
||||
db_groups_of_the_user.append(group.role)
|
||||
logging.debug('db groups of user %s: %s' %
|
||||
(username, str(db_groups_of_the_user)))
|
||||
logging.debug('db groups of user %s: %s' % (username, str(db_groups_of_the_user)))
|
||||
|
||||
#
|
||||
# Delete user membership from groups where user is not anymore
|
||||
@@ -554,8 +535,7 @@ def ldap_auth(server='ldap', port=None,
|
||||
for group_to_del in db_groups_of_the_user:
|
||||
if ldap_groups_of_the_user.count(group_to_del) == 0:
|
||||
db((db.auth_membership.user_id == db_user_id) &
|
||||
(db.auth_membership.group_id == \
|
||||
db_group_id[group_to_del])).delete()
|
||||
(db.auth_membership.group_id == db_group_id[group_to_del])).delete()
|
||||
|
||||
#
|
||||
# Create user membership in groups where user is not in already
|
||||
@@ -563,16 +543,12 @@ def ldap_auth(server='ldap', port=None,
|
||||
for group_to_add in ldap_groups_of_the_user:
|
||||
if db_groups_of_the_user.count(group_to_add) == 0:
|
||||
if db(db.auth_group.role == group_to_add).count() == 0:
|
||||
gid = db.auth_group.insert(role=group_to_add,
|
||||
description='Generated from LDAP')
|
||||
gid = db.auth_group.insert(role=group_to_add, description='Generated from LDAP')
|
||||
else:
|
||||
gid = db(db.auth_group.role == group_to_add).select(
|
||||
db.auth_group.id).first().id
|
||||
db.auth_membership.insert(user_id=db_user_id,
|
||||
group_id=gid)
|
||||
gid = db(db.auth_group.role == group_to_add).select(db.auth_group.id).first().id
|
||||
db.auth_membership.insert(user_id=db_user_id, group_id=gid)
|
||||
except:
|
||||
logger.warning("[%s] Groups are not managed successfully!" %
|
||||
str(username))
|
||||
logger.warning("[%s] Groups are not managed successfully!" % str(username))
|
||||
import traceback
|
||||
logger.debug(traceback.format_exc())
|
||||
return False
|
||||
@@ -663,10 +639,12 @@ def ldap_auth(server='ldap', port=None,
|
||||
con.simple_bind_s(username, password)
|
||||
logger.debug('Ldap username connect...')
|
||||
# We have to use the full string
|
||||
username = con.search_ext_s(base_dn, ldap.SCOPE_SUBTREE,
|
||||
"(&(sAMAccountName=%s)(%s))" %
|
||||
(ldap.filter.escape_filter_chars(username_bare),
|
||||
filterstr), ["cn"])[0][0]
|
||||
username = \
|
||||
con.search_ext_s(base_dn,
|
||||
ldap.SCOPE_SUBTREE,
|
||||
"(&(sAMAccountName=%s)(%s))" % (ldap.filter.escape_filter_chars(username_bare),
|
||||
filterstr),
|
||||
["cn"])[0][0]
|
||||
else:
|
||||
if ldap_binddn:
|
||||
# need to search directory with an bind_dn account 1st
|
||||
@@ -679,14 +657,10 @@ def ldap_auth(server='ldap', port=None,
|
||||
if username is None:
|
||||
return list()
|
||||
# search for groups where user is in
|
||||
filter = '(&(%s=%s)(%s))' % (ldap.filter.escape_filter_chars(
|
||||
group_member_attrib
|
||||
),
|
||||
filter = '(&(%s=%s)(%s))' % (ldap.filter.escape_filter_chars(group_member_attrib),
|
||||
ldap.filter.escape_filter_chars(username),
|
||||
group_filterstr)
|
||||
group_search_result = con.search_s(group_dn,
|
||||
ldap.SCOPE_SUBTREE,
|
||||
filter, [group_name_attrib])
|
||||
group_search_result = con.search_s(group_dn, ldap.SCOPE_SUBTREE, filter, [group_name_attrib])
|
||||
ldap_groups_of_the_user = list()
|
||||
for group_row in group_search_result:
|
||||
group = group_row[1]
|
||||
|
||||
Reference in New Issue
Block a user