improved ldap_auth.py, thanks Kory
This commit is contained in:
@@ -1 +1 @@
|
||||
Version 2.00.0 (2012-07-11 09:17:27) dev
|
||||
Version 2.00.0 (2012-07-11 18:10:46) dev
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# last tinkered with by korylprince at gmail.com on 2012-04-5
|
||||
# last tinkered with by korylprince at gmail.com on 2012-07-11
|
||||
#
|
||||
|
||||
import sys
|
||||
@@ -8,12 +8,12 @@ import logging
|
||||
try:
|
||||
import ldap
|
||||
import ldap.filter
|
||||
ldap.set_option( ldap.OPT_REFERRALS, 0 )
|
||||
ldap.set_option(ldap.OPT_REFERRALS, 0)
|
||||
except Exception, e:
|
||||
logging.error( 'missing ldap, try "easy_install python-ldap"' )
|
||||
logging.error('missing ldap, try "easy_install python-ldap"')
|
||||
raise e
|
||||
|
||||
def ldap_auth( server = 'ldap', port = None,
|
||||
def ldap_auth(server = 'ldap', port = None,
|
||||
base_dn = 'ou=users,dc=domain,dc=com',
|
||||
mode = 'uid', secure = False, cert_path = None, cert_file = None,
|
||||
bind_dn = None, bind_pw = None, filterstr = 'objectClass=*',
|
||||
@@ -30,7 +30,7 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
group_name_attrib = 'cn',
|
||||
group_member_attrib = 'memberUid',
|
||||
group_filterstr = 'objectClass=*',
|
||||
logging_level = 'error' ):
|
||||
logging_level = 'error'):
|
||||
|
||||
"""
|
||||
to use ldap login with MS Active Directory:
|
||||
@@ -88,7 +88,7 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
user_firstname_attrib = 'cn:1',
|
||||
user_lastname_attrib = 'cn:2',
|
||||
user_mail_attrib = 'mail'
|
||||
))
|
||||
))
|
||||
|
||||
Where:
|
||||
manage_user - let web2py handle user data from ldap
|
||||
@@ -110,7 +110,7 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
group_name_attrib = 'cn',
|
||||
group_member_attrib = 'memberUid',
|
||||
group_filterstr = 'objectClass=*'
|
||||
))
|
||||
))
|
||||
|
||||
Where:
|
||||
manage_group - let web2py handle the groups from ldap
|
||||
@@ -128,7 +128,7 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
group_name_attrib = 'cn',
|
||||
group_member_attrib = 'memberUid', # use 'member' for Active Directory
|
||||
group_filterstr = 'objectClass=*'
|
||||
))
|
||||
))
|
||||
|
||||
Where:
|
||||
allowed_groups - a list with allowed ldap group names
|
||||
@@ -141,16 +141,16 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
You can set the logging level with the "logging_level" parameter, default
|
||||
is "error" and can be set to error, warning, info, debug.
|
||||
"""
|
||||
logger = logging.getLogger( 'web2py.auth.ldap_auth' )
|
||||
logger = logging.getLogger('web2py.auth.ldap_auth')
|
||||
if logging_level == 'error':
|
||||
logger.setLevel( logging.ERROR )
|
||||
logger.setLevel(logging.ERROR)
|
||||
elif logging_level == 'warning':
|
||||
logger.setLevel( logging.WARNING )
|
||||
logger.setLevel(logging.WARNING)
|
||||
elif logging_level == 'info':
|
||||
logger.setLevel( logging.INFO )
|
||||
logger.setLevel(logging.INFO)
|
||||
elif logging_level == 'debug':
|
||||
logger.setLevel( logging.DEBUG )
|
||||
def ldap_auth_aux( username,
|
||||
logger.setLevel(logging.DEBUG)
|
||||
def ldap_auth_aux(username,
|
||||
password,
|
||||
ldap_server = server,
|
||||
ldap_port = port,
|
||||
@@ -170,74 +170,77 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
user_mail_attrib = user_mail_attrib,
|
||||
manage_groups = manage_groups,
|
||||
allowed_groups = allowed_groups,
|
||||
db = db ):
|
||||
logger.debug( 'mode: [%s] manage_user: [%s] custom_scope: [%s] manage_groups: [%s]' % (
|
||||
str( mode ), str( manage_user ), str( custom_scope ), str( manage_groups ) ) )
|
||||
db = db):
|
||||
if password == '':
|
||||
logger.warning('blank password not allowed')
|
||||
return False
|
||||
logger.debug('mode: [%s] manage_user: [%s] custom_scope: [%s] manage_groups: [%s]' % (
|
||||
str(mode), str(manage_user), str(custom_scope), str(manage_groups)))
|
||||
if manage_user:
|
||||
if user_firstname_attrib.count( ':' ) > 0:
|
||||
( user_firstname_attrib, user_firstname_part ) = user_firstname_attrib.split( ':', 1 )
|
||||
user_firstname_part = ( int( user_firstname_part ) - 1 )
|
||||
if user_firstname_attrib.count(':') > 0:
|
||||
(user_firstname_attrib, user_firstname_part) = user_firstname_attrib.split(':', 1)
|
||||
user_firstname_part = (int(user_firstname_part) - 1)
|
||||
else:
|
||||
user_firstname_part = None
|
||||
if user_lastname_attrib.count( ':' ) > 0:
|
||||
( user_lastname_attrib, user_lastname_part ) = user_lastname_attrib.split( ':', 1 )
|
||||
user_lastname_part = ( int( user_lastname_part ) - 1 )
|
||||
if user_lastname_attrib.count(':') > 0:
|
||||
(user_lastname_attrib, user_lastname_part) = user_lastname_attrib.split(':', 1)
|
||||
user_lastname_part = (int(user_lastname_part) - 1)
|
||||
else:
|
||||
user_lastname_part = None
|
||||
user_firstname_attrib = ldap.filter.escape_filter_chars( user_firstname_attrib )
|
||||
user_lastname_attrib = ldap.filter.escape_filter_chars( user_lastname_attrib )
|
||||
user_mail_attrib = ldap.filter.escape_filter_chars( user_mail_attrib )
|
||||
user_firstname_attrib = ldap.filter.escape_filter_chars(user_firstname_attrib)
|
||||
user_lastname_attrib = ldap.filter.escape_filter_chars(user_lastname_attrib)
|
||||
user_mail_attrib = ldap.filter.escape_filter_chars(user_mail_attrib)
|
||||
try:
|
||||
if allowed_groups:
|
||||
if not is_user_in_allowed_groups( username, password ):
|
||||
if not is_user_in_allowed_groups(username, password):
|
||||
return False
|
||||
con = init_ldap()
|
||||
if ldap_mode == 'ad':
|
||||
# Microsoft Active Directory
|
||||
if '@' not in username:
|
||||
domain = []
|
||||
for x in ldap_basedn.split( ',' ):
|
||||
for x in ldap_basedn.split(','):
|
||||
if "DC=" in x.upper():
|
||||
domain.append( x.split( '=' )[-1] )
|
||||
username = "%s@%s" % ( username, '.'.join( domain ) )
|
||||
username_bare = username.split( "@" )[0]
|
||||
con.set_option( ldap.OPT_PROTOCOL_VERSION, 3 )
|
||||
domain.append(x.split('=')[-1])
|
||||
username = "%s@%s" % (username, '.'.join(domain))
|
||||
username_bare = username.split("@")[0]
|
||||
con.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
||||
# In cases where ForestDnsZones and DomainDnsZones are found,
|
||||
# result will look like the following:
|
||||
# ['ldap://ForestDnsZones.domain.com/DC=ForestDnsZones,DC=domain,DC=com']
|
||||
if ldap_binddn:
|
||||
# need to search directory with an admin account 1st
|
||||
con.simple_bind_s( ldap_binddn, ldap_bindpw )
|
||||
con.simple_bind_s(ldap_binddn, ldap_bindpw)
|
||||
else:
|
||||
# credentials should be in the form of username@domain.tld
|
||||
con.simple_bind_s( username, password )
|
||||
con.simple_bind_s(username, password)
|
||||
# this will throw an index error if the account is not found
|
||||
# in the ldap_basedn
|
||||
requested_attrs = ['sAMAccountName']
|
||||
if manage_user:
|
||||
requested_attrs.extend( [user_firstname_attrib,
|
||||
requested_attrs.extend([user_firstname_attrib,
|
||||
user_lastname_attrib,
|
||||
user_mail_attrib] )
|
||||
result = con.search_ext_s(
|
||||
user_mail_attrib])
|
||||
result = con.search_ext_s(
|
||||
ldap_basedn, ldap.SCOPE_SUBTREE,
|
||||
"(&(sAMAccountName=%s)(%s))" % ( ldap.filter.escape_filter_chars( username_bare ),
|
||||
filterstr ),
|
||||
requested_attrs )[0][1]
|
||||
if not isinstance( result, dict ):
|
||||
"(&(sAMAccountName=%s)(%s))" % (ldap.filter.escape_filter_chars(username_bare),
|
||||
filterstr),
|
||||
requested_attrs)[0][1]
|
||||
if not isinstance(result, dict):
|
||||
# result should be a dict in the form {'sAMAccountName': [username_bare]}
|
||||
logger.warning( 'User [%s] not found!' % username )
|
||||
logger.warning('User [%s] not found!' % username)
|
||||
return False
|
||||
if ldap_binddn:
|
||||
# We know the user exists & is in the correct OU
|
||||
# so now we just check the password
|
||||
con.simple_bind_s( username, password )
|
||||
con.simple_bind_s(username, password)
|
||||
username=username_bare
|
||||
|
||||
if ldap_mode == 'domino':
|
||||
# Notes Domino
|
||||
if "@" in username:
|
||||
username = username.split( "@" )[0]
|
||||
con.simple_bind_s( username, password )
|
||||
username = username.split("@")[0]
|
||||
con.simple_bind_s(username, password)
|
||||
if manage_user:
|
||||
# TODO: sorry I have no clue how to query attrs in domino
|
||||
result = {user_firstname_attrib: username,
|
||||
@@ -247,85 +250,85 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
if ldap_mode == 'cn':
|
||||
# OpenLDAP (CN)
|
||||
dn = "cn=" + username + "," + ldap_basedn
|
||||
con.simple_bind_s( dn, password )
|
||||
con.simple_bind_s(dn, password)
|
||||
if manage_user:
|
||||
result = con.search_s(
|
||||
result = con.search_s(
|
||||
dn, ldap.SCOPE_BASE,
|
||||
"(objectClass=*)",
|
||||
[user_firstname_attrib,
|
||||
user_lastname_attrib,
|
||||
user_mail_attrib]
|
||||
)[0][1]
|
||||
)[0][1]
|
||||
|
||||
if ldap_mode == 'uid':
|
||||
# OpenLDAP (UID)
|
||||
dn = "uid=" + username + "," + ldap_basedn
|
||||
con.simple_bind_s( dn, password )
|
||||
con.simple_bind_s(dn, password)
|
||||
if manage_user:
|
||||
result = con.search_s(
|
||||
result = con.search_s(
|
||||
dn, ldap.SCOPE_BASE,
|
||||
"(objectClass=*)",
|
||||
[user_firstname_attrib,
|
||||
user_lastname_attrib,
|
||||
user_mail_attrib]
|
||||
)[0][1]
|
||||
)[0][1]
|
||||
|
||||
if ldap_mode == 'company':
|
||||
# no DNs or password needed to search directory
|
||||
dn = ""
|
||||
pw = ""
|
||||
# bind anonymously
|
||||
con.simple_bind_s( dn, pw )
|
||||
con.simple_bind_s(dn, pw)
|
||||
# search by e-mail address
|
||||
filter = '(&(mail=' + ldap.filter.escape_filter_chars( username ) + \
|
||||
filter = '(&(mail=' + ldap.filter.escape_filter_chars(username) + \
|
||||
')(' + filterstr + '))'
|
||||
# find the uid
|
||||
attrs = ['uid']
|
||||
if manage_user:
|
||||
attrs.extend( [user_firstname_attrib,
|
||||
attrs.extend([user_firstname_attrib,
|
||||
user_lastname_attrib,
|
||||
user_mail_attrib] )
|
||||
user_mail_attrib])
|
||||
# perform the actual search
|
||||
company_search_result = con.search_s( ldap_basedn,
|
||||
company_search_result = con.search_s(ldap_basedn,
|
||||
ldap.SCOPE_SUBTREE,
|
||||
filter, attrs )
|
||||
filter, attrs)
|
||||
dn = company_search_result[0][0]
|
||||
result = company_search_result[0][1]
|
||||
# perform the real authentication test
|
||||
con.simple_bind_s( dn, password )
|
||||
con.simple_bind_s(dn, password)
|
||||
|
||||
if ldap_mode == 'uid_r':
|
||||
# OpenLDAP (UID) with subtree search and multiple DNs
|
||||
if type( ldap_basedn ) == type( [] ):
|
||||
if type(ldap_basedn) == type([]):
|
||||
basedns = ldap_basedn
|
||||
else:
|
||||
basedns = [ldap_basedn]
|
||||
filter = '(&(uid=%s)(%s))' % ( ldap.filter.escape_filter_chars( username ), filterstr )
|
||||
filter = '(&(uid=%s)(%s))' % (ldap.filter.escape_filter_chars(username), filterstr)
|
||||
finded = False
|
||||
for basedn in basedns:
|
||||
try:
|
||||
result = con.search_s( basedn, ldap.SCOPE_SUBTREE, filter )
|
||||
result = con.search_s(basedn, ldap.SCOPE_SUBTREE, filter)
|
||||
if result:
|
||||
user_dn = result[0][0]
|
||||
# Check the password
|
||||
con.simple_bind_s( user_dn, password )
|
||||
con.simple_bind_s(user_dn, password)
|
||||
finded = True
|
||||
break
|
||||
except ldap.LDAPError, detail:
|
||||
( exc_type, exc_value ) = sys.exc_info()[:2]
|
||||
logger.warning( "ldap_auth: searching %s for %s resulted in %s: %s\n" %
|
||||
( basedn, filter, exc_type, exc_value ) )
|
||||
(exc_type, exc_value) = sys.exc_info()[:2]
|
||||
logger.warning("ldap_auth: searching %s for %s resulted in %s: %s\n" %
|
||||
(basedn, filter, exc_type, exc_value))
|
||||
if not finded:
|
||||
logger.warning( 'User [%s] not found!' % username )
|
||||
logger.warning('User [%s] not found!' % username)
|
||||
return False
|
||||
result = result[0][1]
|
||||
if ldap_mode == 'custom':
|
||||
# OpenLDAP (username_attrs) with subtree search and multiple DNs
|
||||
if type( ldap_basedn ) == type( [] ):
|
||||
if type(ldap_basedn) == type([]):
|
||||
basedns = ldap_basedn
|
||||
else:
|
||||
basedns = [ldap_basedn]
|
||||
filter = '(&(%s=%s)(%s))' % ( username_attrib, ldap.filter.escape_filter_chars( username ), filterstr )
|
||||
filter = '(&(%s=%s)(%s))' % (username_attrib, ldap.filter.escape_filter_chars(username), filterstr)
|
||||
if custom_scope == 'subtree':
|
||||
ldap_scope = ldap.SCOPE_SUBTREE
|
||||
elif custom_scope == 'base':
|
||||
@@ -335,33 +338,33 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
finded = False
|
||||
for basedn in basedns:
|
||||
try:
|
||||
result = con.search_s( basedn, ldap_scope, filter )
|
||||
result = con.search_s(basedn, ldap_scope, filter)
|
||||
if result:
|
||||
user_dn = result[0][0]
|
||||
# Check the password
|
||||
con.simple_bind_s( user_dn, password )
|
||||
con.simple_bind_s(user_dn, password)
|
||||
finded = True
|
||||
break
|
||||
except ldap.LDAPError, detail:
|
||||
( exc_type, exc_value ) = sys.exc_info()[:2]
|
||||
logger.warning( "ldap_auth: searching %s for %s resulted in %s: %s\n" %
|
||||
( basedn, filter, exc_type, exc_value ) )
|
||||
(exc_type, exc_value) = sys.exc_info()[:2]
|
||||
logger.warning("ldap_auth: searching %s for %s resulted in %s: %s\n" %
|
||||
(basedn, filter, exc_type, exc_value))
|
||||
if not finded:
|
||||
logger.warning( 'User [%s] not found!' % username )
|
||||
logger.warning('User [%s] not found!' % username)
|
||||
return False
|
||||
result = result[0][1]
|
||||
if manage_user:
|
||||
logger.info( '[%s] Manage user data' % str( username ) )
|
||||
logger.info('[%s] Manage user data' % str(username))
|
||||
try:
|
||||
if not user_firstname_part == None:
|
||||
store_user_firstname = result[user_firstname_attrib][0].split( ' ', 1 )[user_firstname_part]
|
||||
store_user_firstname = result[user_firstname_attrib][0].split(' ', 1)[user_firstname_part]
|
||||
else:
|
||||
store_user_firstname = result[user_firstname_attrib][0]
|
||||
except KeyError, e:
|
||||
store_user_firstname = None
|
||||
try:
|
||||
if not user_lastname_part == None:
|
||||
store_user_lastname = result[user_lastname_attrib][0].split( ' ', 1 )[user_lastname_part]
|
||||
store_user_lastname = result[user_lastname_attrib][0].split(' ', 1)[user_lastname_part]
|
||||
else:
|
||||
store_user_lastname = result[user_lastname_attrib][0]
|
||||
except KeyError, e:
|
||||
@@ -374,149 +377,149 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
#
|
||||
# user as username
|
||||
# #################
|
||||
user_in_db = db( db.auth_user.username == username )
|
||||
user_in_db = db(db.auth_user.username == username)
|
||||
if user_in_db.count() > 0:
|
||||
user_in_db.update( first_name = store_user_firstname,
|
||||
user_in_db.update(first_name = store_user_firstname,
|
||||
last_name = store_user_lastname,
|
||||
email = store_user_mail )
|
||||
email = store_user_mail)
|
||||
else:
|
||||
db.auth_user.insert( first_name = store_user_firstname,
|
||||
db.auth_user.insert(first_name = store_user_firstname,
|
||||
last_name = store_user_lastname,
|
||||
email = store_user_mail,
|
||||
username = username )
|
||||
username = username)
|
||||
except:
|
||||
#
|
||||
# user as email
|
||||
# ##############
|
||||
user_in_db = db( db.auth_user.email == username )
|
||||
user_in_db = db(db.auth_user.email == username)
|
||||
if user_in_db.count() > 0:
|
||||
user_in_db.update( first_name = store_user_firstname,
|
||||
user_in_db.update(first_name = store_user_firstname,
|
||||
last_name = store_user_lastname,
|
||||
)
|
||||
)
|
||||
else:
|
||||
db.auth_user.insert( first_name = store_user_firstname,
|
||||
db.auth_user.insert(first_name = store_user_firstname,
|
||||
last_name = store_user_lastname,
|
||||
email = username
|
||||
)
|
||||
)
|
||||
con.unbind()
|
||||
|
||||
if manage_groups:
|
||||
if not do_manage_groups( username,password ):
|
||||
if not do_manage_groups(username,password):
|
||||
return False
|
||||
return True
|
||||
except ldap.LDAPError, e:
|
||||
import traceback
|
||||
logger.warning( '[%s] Error in ldap processing' % str( username ) )
|
||||
logger.debug( traceback.format_exc() )
|
||||
logger.warning('[%s] Error in ldap processing' % str(username))
|
||||
logger.debug(traceback.format_exc())
|
||||
return False
|
||||
except IndexError, ex: # for AD membership test
|
||||
import traceback
|
||||
logger.warning( '[%s] Ldap result indexing error' % str( username ) )
|
||||
logger.debug( traceback.format_exc() )
|
||||
logger.warning('[%s] Ldap result indexing error' % str(username))
|
||||
logger.debug(traceback.format_exc())
|
||||
return False
|
||||
|
||||
def is_user_in_allowed_groups( username,
|
||||
def is_user_in_allowed_groups(username,
|
||||
password = None,
|
||||
allowed_groups = allowed_groups
|
||||
):
|
||||
'''
|
||||
Figure out if the username is a member of an allowed group in ldap or not
|
||||
'''
|
||||
):
|
||||
"""
|
||||
Figure out if the username is a member of an allowed group in ldap or not
|
||||
"""
|
||||
#
|
||||
# Get all group name where the user is in actually in ldap
|
||||
# #########################################################
|
||||
ldap_groups_of_the_user = get_user_groups_from_ldap( username, password )
|
||||
ldap_groups_of_the_user = get_user_groups_from_ldap(username, password)
|
||||
|
||||
# search for allowed group names
|
||||
if type( allowed_groups ) != type( list() ):
|
||||
if type(allowed_groups) != type(list()):
|
||||
allowed_groups = [allowed_groups]
|
||||
for group in allowed_groups:
|
||||
if ldap_groups_of_the_user.count( group ) > 0:
|
||||
if ldap_groups_of_the_user.count(group) > 0:
|
||||
# Match
|
||||
return True
|
||||
# No match
|
||||
return False
|
||||
|
||||
def do_manage_groups( username,
|
||||
def do_manage_groups(username,
|
||||
password = None,
|
||||
db = db,
|
||||
):
|
||||
'''
|
||||
Manage user groups
|
||||
|
||||
Get all user's group from ldap and refresh the already stored
|
||||
ones in web2py's application database or create new groups
|
||||
according to ldap.
|
||||
'''
|
||||
logger.info( '[%s] Manage user groups' % str( username ) )
|
||||
):
|
||||
"""
|
||||
Manage user groups
|
||||
|
||||
Get all user's group from ldap and refresh the already stored
|
||||
ones in web2py's application database or create new groups
|
||||
according to ldap.
|
||||
"""
|
||||
logger.info('[%s] Manage user groups' % str(username))
|
||||
try:
|
||||
#
|
||||
# Get all group name where the user is in actually in ldap
|
||||
# #########################################################
|
||||
ldap_groups_of_the_user = get_user_groups_from_ldap( username, password )
|
||||
ldap_groups_of_the_user = get_user_groups_from_ldap(username, password)
|
||||
|
||||
#
|
||||
# Get all group name where the user is in actually in local db
|
||||
# #############################################################
|
||||
try:
|
||||
db_user_id = db( db.auth_user.username == username ).select( db.auth_user.id ).first().id
|
||||
db_user_id = db(db.auth_user.username == username).select(db.auth_user.id).first().id
|
||||
except:
|
||||
try:
|
||||
db_user_id = db( db.auth_user.email == username ).select( db.auth_user.id ).first().id
|
||||
db_user_id = db(db.auth_user.email == username).select(db.auth_user.id).first().id
|
||||
except AttributeError, e:
|
||||
#
|
||||
# There is no user in local db
|
||||
# We create one
|
||||
# ##############################
|
||||
try:
|
||||
db_user_id = db.auth_user.insert( username = username,
|
||||
first_name = username )
|
||||
db_user_id = db.auth_user.insert(username = username,
|
||||
first_name = username)
|
||||
except AttributeError, e:
|
||||
db_user_id = db.auth_user.insert( email = username,
|
||||
first_name = username )
|
||||
db_user_id = db.auth_user.insert(email = username,
|
||||
first_name = username)
|
||||
if not db_user_id:
|
||||
logging.error( 'There is no username or email for %s!' % username )
|
||||
logging.error('There is no username or email for %s!' % username)
|
||||
raise
|
||||
db_group_search = db( ( db.auth_membership.user_id == db_user_id ) & \
|
||||
( db.auth_user.id == db.auth_membership.user_id ) & \
|
||||
( db.auth_group.id == db.auth_membership.group_id ) )
|
||||
db_group_search = db((db.auth_membership.user_id == db_user_id) & \
|
||||
(db.auth_user.id == db.auth_membership.user_id) & \
|
||||
(db.auth_group.id == db.auth_membership.group_id))
|
||||
db_groups_of_the_user = list()
|
||||
db_group_id = dict()
|
||||
|
||||
if db_group_search.count() > 0:
|
||||
for group in db_group_search.select( db.auth_group.id, db.auth_group.role, distinct = True ):
|
||||
for group in db_group_search.select(db.auth_group.id, db.auth_group.role, distinct = True):
|
||||
db_group_id[group.role] = group.id
|
||||
db_groups_of_the_user.append( group.role )
|
||||
logging.debug( 'db groups of user %s: %s' % ( username, str( db_groups_of_the_user ) ) )
|
||||
db_groups_of_the_user.append(group.role)
|
||||
logging.debug('db groups of user %s: %s' % (username, str(db_groups_of_the_user)))
|
||||
|
||||
#
|
||||
# Delete user membership from groups where user is not anymore
|
||||
# #############################################################
|
||||
for group_to_del in db_groups_of_the_user:
|
||||
if ldap_groups_of_the_user.count( group_to_del ) == 0:
|
||||
db( ( db.auth_membership.user_id == db_user_id ) & \
|
||||
( db.auth_membership.group_id == db_group_id[group_to_del] ) ).delete()
|
||||
if ldap_groups_of_the_user.count(group_to_del) == 0:
|
||||
db((db.auth_membership.user_id == db_user_id) & \
|
||||
(db.auth_membership.group_id == db_group_id[group_to_del])).delete()
|
||||
|
||||
#
|
||||
# Create user membership in groups where user is not in already
|
||||
# ##############################################################
|
||||
for group_to_add in ldap_groups_of_the_user:
|
||||
if db_groups_of_the_user.count( group_to_add ) == 0:
|
||||
if db( db.auth_group.role == group_to_add ).count() == 0:
|
||||
gid = db.auth_group.insert( role = group_to_add,
|
||||
description = 'Generated from LDAP' )
|
||||
if db_groups_of_the_user.count(group_to_add) == 0:
|
||||
if db(db.auth_group.role == group_to_add).count() == 0:
|
||||
gid = db.auth_group.insert(role = group_to_add,
|
||||
description = 'Generated from LDAP')
|
||||
else:
|
||||
gid = db( db.auth_group.role == group_to_add ).select( db.auth_group.id ).first().id
|
||||
db.auth_membership.insert( user_id = db_user_id,
|
||||
group_id = gid )
|
||||
gid = db(db.auth_group.role == group_to_add).select(db.auth_group.id).first().id
|
||||
db.auth_membership.insert(user_id = db_user_id,
|
||||
group_id = gid)
|
||||
except:
|
||||
logger.warning( "[%s] Groups are not managed successully!" % str( username ) )
|
||||
logger.warning("[%s] Groups are not managed successully!" % str(username))
|
||||
import traceback
|
||||
logger.debug( traceback.format_exc() )
|
||||
logger.debug(traceback.format_exc())
|
||||
return False
|
||||
return True
|
||||
|
||||
def init_ldap(
|
||||
def init_ldap(
|
||||
ldap_server = server,
|
||||
ldap_port = port,
|
||||
ldap_basedn = base_dn,
|
||||
@@ -524,28 +527,28 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
secure = secure,
|
||||
cert_path = cert_path,
|
||||
cert_file = cert_file
|
||||
):
|
||||
'''
|
||||
Inicialize ldap connection
|
||||
'''
|
||||
logger.info( '[%s] Inicialize ldap connection' % str( ldap_server ) )
|
||||
):
|
||||
"""
|
||||
Inicialize ldap connection
|
||||
"""
|
||||
logger.info('[%s] Inicialize ldap connection' % str(ldap_server))
|
||||
if secure:
|
||||
if not ldap_port:
|
||||
ldap_port = 636
|
||||
con = ldap.initialize(
|
||||
"ldaps://" + ldap_server + ":" + str( ldap_port ) )
|
||||
con = ldap.initialize(
|
||||
"ldaps://" + ldap_server + ":" + str(ldap_port))
|
||||
if cert_path:
|
||||
con.set_option( ldap.OPT_X_TLS_CACERTDIR, cert_path )
|
||||
con.set_option(ldap.OPT_X_TLS_CACERTDIR, cert_path)
|
||||
if cert_file:
|
||||
con.set_option( ldap.OPT_X_TLS_CACERTFILE, cert_file )
|
||||
con.set_option(ldap.OPT_X_TLS_CACERTFILE, cert_file)
|
||||
else:
|
||||
if not ldap_port:
|
||||
ldap_port = 389
|
||||
con = ldap.initialize(
|
||||
"ldap://" + ldap_server + ":" + str( ldap_port ) )
|
||||
con = ldap.initialize(
|
||||
"ldap://" + ldap_server + ":" + str(ldap_port))
|
||||
return con
|
||||
|
||||
def get_user_groups_from_ldap( username,
|
||||
def get_user_groups_from_ldap(username,
|
||||
password = None,
|
||||
base_dn = base_dn,
|
||||
ldap_binddn = bind_dn,
|
||||
@@ -555,11 +558,11 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
group_member_attrib = group_member_attrib,
|
||||
group_filterstr = group_filterstr,
|
||||
ldap_mode = mode
|
||||
):
|
||||
'''
|
||||
Get all group names from ldap where the user is in
|
||||
'''
|
||||
logger.info( '[%s] Get user groups from ldap' % str( username ) )
|
||||
):
|
||||
"""
|
||||
Get all group names from ldap where the user is in
|
||||
"""
|
||||
logger.info('[%s] Get user groups from ldap' % str(username))
|
||||
#
|
||||
# Get all group name where the user is in actually in ldap
|
||||
# #########################################################
|
||||
@@ -574,50 +577,50 @@ def ldap_auth( server = 'ldap', port = None,
|
||||
# ####################
|
||||
if '@' not in username:
|
||||
domain = []
|
||||
for x in base_dn.split( ',' ):
|
||||
for x in base_dn.split(','):
|
||||
if "DC=" in x.upper():
|
||||
domain.append( x.split( '=' )[-1] )
|
||||
username = "%s@%s" % ( username, '.'.join( domain ) )
|
||||
username_bare = username.split( "@" )[0]
|
||||
con.set_option( ldap.OPT_PROTOCOL_VERSION, 3 )
|
||||
domain.append(x.split('=')[-1])
|
||||
username = "%s@%s" % (username, '.'.join(domain))
|
||||
username_bare = username.split("@")[0]
|
||||
con.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
||||
# In cases where ForestDnsZones and DomainDnsZones are found,
|
||||
# result will look like the following:
|
||||
# ['ldap://ForestDnsZones.domain.com/DC=ForestDnsZones,DC=domain,DC=com']
|
||||
if ldap_binddn:
|
||||
# need to search directory with an admin account 1st
|
||||
con.simple_bind_s( ldap_binddn, ldap_bindpw )
|
||||
con.simple_bind_s(ldap_binddn, ldap_bindpw)
|
||||
logger.debug('Ldap bind connect...')
|
||||
else:
|
||||
# credentials should be in the form of username@domain.tld
|
||||
con.simple_bind_s( username, password )
|
||||
con.simple_bind_s(username, password)
|
||||
logger.debug('Ldap username connect...')
|
||||
# We have to use the full string
|
||||
username = con.search_ext_s(
|
||||
username = con.search_ext_s(
|
||||
base_dn, ldap.SCOPE_SUBTREE,
|
||||
"(&(sAMAccountName=%s)(%s))" % ( ldap.filter.escape_filter_chars( username_bare ), filterstr ), ["cn"] )[0][0]
|
||||
"(&(sAMAccountName=%s)(%s))" % (ldap.filter.escape_filter_chars(username_bare), filterstr), ["cn"])[0][0]
|
||||
else:
|
||||
if ldap_binddn:
|
||||
# need to search directory with an bind_dn account 1st
|
||||
con.simple_bind_s( ldap_binddn, ldap_bindpw )
|
||||
con.simple_bind_s(ldap_binddn, ldap_bindpw)
|
||||
else:
|
||||
# bind as anonymous
|
||||
con.simple_bind_s( '', '' )
|
||||
con.simple_bind_s('', '')
|
||||
|
||||
# search for groups where user is in
|
||||
filter = '(&(%s=%s)(%s))' % ( ldap.filter.escape_filter_chars( group_member_attrib ),
|
||||
ldap.filter.escape_filter_chars( username ),
|
||||
group_filterstr )
|
||||
group_search_result = con.search_s( group_dn,
|
||||
filter = '(&(%s=%s)(%s))' % (ldap.filter.escape_filter_chars(group_member_attrib),
|
||||
ldap.filter.escape_filter_chars(username),
|
||||
group_filterstr)
|
||||
group_search_result = con.search_s(group_dn,
|
||||
ldap.SCOPE_SUBTREE,
|
||||
filter, [group_name_attrib] )
|
||||
filter, [group_name_attrib])
|
||||
ldap_groups_of_the_user = list()
|
||||
for group_row in group_search_result:
|
||||
group = group_row[1]
|
||||
ldap_groups_of_the_user.extend( group[group_name_attrib] )
|
||||
ldap_groups_of_the_user.extend(group[group_name_attrib])
|
||||
|
||||
con.unbind()
|
||||
logger.debug('User groups: %s' % ldap_groups_of_the_user )
|
||||
return list( ldap_groups_of_the_user )
|
||||
logger.debug('User groups: %s' % ldap_groups_of_the_user)
|
||||
return list(ldap_groups_of_the_user)
|
||||
|
||||
|
||||
if filterstr[0] == '(' and filterstr[-1] == ')': # rfc4515 syntax
|
||||
|
||||
Reference in New Issue
Block a user