better ldap_auth, thanks Guyris

This commit is contained in:
Massimo Di Pierro
2012-03-19 15:41:31 -05:00
parent fd0eeda8dc
commit e5e70fc34a
2 changed files with 146 additions and 62 deletions
+1 -1
View File
@@ -1 +1 @@
Version 1.99.7 (2012-03-19 13:03:28) dev
Version 1.99.7 (2012-03-19 15:40:54) dev
+145 -61
View File
@@ -1,3 +1,8 @@
# -*- coding: utf-8 -*-
#
# last tinkered with by szimszon at oregpreshaz.eu on 2012-03-19
#
import sys
import logging
try:
@@ -12,6 +17,8 @@ def ldap_auth( server = 'ldap', port = None,
base_dn = 'ou=users,dc=domain,dc=com',
mode = 'uid', secure = False, cert_path = None, cert_file = None,
bind_dn = None, bind_pw = None, filterstr = 'objectClass=*',
username_attrib = 'uid',
custom_scope = 'subtree',
allowed_groups = None,
manage_user = False,
user_firstname_attrib = 'cn:1',
@@ -22,22 +29,23 @@ def ldap_auth( server = 'ldap', port = None,
group_dn = None,
group_name_attrib = 'cn',
group_member_attrib = 'memberUid',
group_filterstr = 'objectClass=*' ):
group_filterstr = 'objectClass=*',
logging_level = 'error' ):
"""
to use ldap login with MS Active Directory::
to use ldap login with MS Active Directory:
from gluon.contrib.login_methods.ldap_auth import ldap_auth
auth.settings.login_methods.append(ldap_auth(
mode='ad', server='my.domain.controller',
base_dn='ou=Users,dc=domain,dc=com'))
to use ldap login with Notes Domino::
to use ldap login with Notes Domino:
auth.settings.login_methods.append(ldap_auth(
mode='domino',server='my.domino.server'))
to use ldap login with OpenLDAP::
to use ldap login with OpenLDAP:
auth.settings.login_methods.append(ldap_auth(
server='my.ldap.server', base_dn='ou=Users,dc=domain,dc=com'))
@@ -48,12 +56,22 @@ def ldap_auth( server = 'ldap', port = None,
mode='uid_r', server='my.ldap.server',
base_dn=['ou=Users,dc=domain,dc=com','ou=Staff,dc=domain,dc=com']))
or (if using CN)::
or (if using CN):
auth.settings.login_methods.append(ldap_auth(
mode='cn', server='my.ldap.server',
base_dn='ou=Users,dc=domain,dc=com'))
or you can full customize the search for user:
auth.settings.login_methods.append(ldap_auth(
mode='custom', server='my.ldap.server',
base_dn='ou=Users,dc=domain,dc=com',
username_attrib='uid',
custom_scope='subtree'))
the custom_scope can be: base, onelevel, subtree.
If using secure ldaps:// pass secure=True and cert_path="..."
If ldap is using GnuTLS then you need cert_file="..." instead cert_path because
cert_path isn't implemented in GnuTLS :(
@@ -119,8 +137,20 @@ def ldap_auth( server = 'ldap', port = None,
group_name_attrib - the attribute where the group name is stored
group_member_attrib - the attibute containing the group members name
group_filterstr - as the filterstr but for group select
You can set the logging level with the "logging_level" parameter, default
is "error" and can be set to error, warning, info, debug.
"""
logger = logging.getLogger( 'web2py.auth.ldap_auth' )
if logging_level == 'error':
logger.setLevel( logging.ERROR )
elif logging_level == 'warning':
logger.setLevel( logging.WARNING )
elif logging_level == 'info':
logger.setLevel( logging.INFO )
elif logging_level == 'debug':
logger.setLevel( logging.DEBUG )
def ldap_auth_aux( username,
password,
ldap_server = server,
@@ -133,6 +163,8 @@ def ldap_auth( server = 'ldap', port = None,
cert_path = cert_path,
cert_file = cert_file,
filterstr = filterstr,
username_attrib = username_attrib,
custom_scope = custom_scope,
manage_user = manage_user,
user_firstname_attrib = user_firstname_attrib,
user_lastname_attrib = user_lastname_attrib,
@@ -140,6 +172,8 @@ def ldap_auth( server = 'ldap', port = None,
manage_groups = manage_groups,
allowed_groups = allowed_groups,
db = db ):
logger.debug( 'mode: [%s] manage_user: [%s] custom_scope: [%s] manage_groups: [%s]' % (
str( mode ), str( manage_user ), str( custom_scope ), str( manage_groups ) ) )
if manage_user:
if user_firstname_attrib.count( ':' ) > 0:
( user_firstname_attrib, user_firstname_part ) = user_firstname_attrib.split( ':', 1 )
@@ -192,6 +226,7 @@ def ldap_auth( server = 'ldap', port = None,
requested_attrs )[0][1]
if not isinstance( result, dict ):
# result should be a dict in the form {'sAMAccountName': [username_bare]}
logger.warning( 'User [%s] not found!' % username )
return False
if ldap_binddn:
# We know the user exists & is in the correct OU
@@ -278,12 +313,45 @@ def ldap_auth( server = 'ldap', port = None,
break
except ldap.LDAPError, detail:
( exc_type, exc_value ) = sys.exc_info()[:2]
sys.stderr.write( "ldap_auth: searching %s for %s resulted in %s: %s\n" %
logger.warning( "ldap_auth: searching %s for %s resulted in %s: %s\n" %
( basedn, filter, exc_type, exc_value ) )
if not finded:
logger.warning( 'User [%s] not found!' % username )
return False
result = result[0][1]
if ldap_mode == 'custom':
# OpenLDAP (username_attrs) with subtree search and multiple DNs
if type( ldap_basedn ) == type( [] ):
basedns = ldap_basedn
else:
basedns = [ldap_basedn]
filter = '(&(%s=%s)(%s))' % ( username_attrib, ldap.filter.escape_filter_chars( username ), filterstr )
if custom_scope == 'subtree':
ldap_scope = ldap.SCOPE_SUBTREE
elif custom_scope == 'base':
ldap_scope = ldap.SCOPE_BASE
elif custom_scope == 'onelevel':
ldap_scope = ldap.SCOPE_ONELEVEL
finded = False
for basedn in basedns:
try:
result = con.search_s( basedn, ldap_scope, filter )
if result:
user_dn = result[0][0]
# Check the password
con.simple_bind_s( user_dn, password )
finded = True
break
except ldap.LDAPError, detail:
( exc_type, exc_value ) = sys.exc_info()[:2]
logger.warning( "ldap_auth: searching %s for %s resulted in %s: %s\n" %
( basedn, filter, exc_type, exc_value ) )
if not finded:
logger.warning( 'User [%s] not found!' % username )
return False
result = result[0][1]
if manage_user:
logger.info( '[%s] Manage user data' % str( username ) )
try:
if not user_firstname_part == None:
store_user_firstname = result[user_firstname_attrib][0].split( ' ', 1 )[user_firstname_part]
@@ -333,12 +401,18 @@ def ldap_auth( server = 'ldap', port = None,
con.unbind()
if manage_groups:
do_manage_groups( username )
if not do_manage_groups( username ):
return False
return True
except ldap.LDAPError, e:
import traceback
logger.warning( '[%s] Error in ldap processing' % str( username ) )
logger.debug( traceback.format_exc() )
return False
except IndexError, ex: # for AD membership test
import traceback
logger.warning( '[%s] Ldap result indexing error' % str( username ) )
logger.debug( traceback.format_exc() )
return False
def is_user_in_allowed_groups( username,
@@ -372,65 +446,73 @@ def ldap_auth( server = 'ldap', port = None,
ones in web2py's application database or create new groups
according to ldap.
'''
#
# Get all group name where the user is in actually in ldap
# #########################################################
ldap_groups_of_the_user = get_user_groups_from_ldap( username )
#
# Get all group name where the user is in actually in local db
# #############################################################
logger.info( '[%s] Manage user groups' % str( username ) )
try:
db_user_id = db( db.auth_user.username == username ).select( db.auth_user.id ).first().id
except:
#
# Get all group name where the user is in actually in ldap
# #########################################################
ldap_groups_of_the_user = get_user_groups_from_ldap( username )
#
# Get all group name where the user is in actually in local db
# #############################################################
try:
db_user_id = db( db.auth_user.email == username ).select( db.auth_user.id ).first().id
except AttributeError, e:
#
# There is no user in local db
# We create one
# ##############################
db_user_id = db( db.auth_user.username == username ).select( db.auth_user.id ).first().id
except:
try:
db_user_id = db.auth_user.insert( username = username,
first_name = username )
db_user_id = db( db.auth_user.email == username ).select( db.auth_user.id ).first().id
except AttributeError, e:
db_user_id = db.auth_user.insert( email = username,
first_name = username )
if not db_user_id:
logging.error( 'There is no username or email for %s!' % username )
raise
db_group_search = db( ( db.auth_membership.user_id == db_user_id ) & \
( db.auth_user.id == db.auth_membership.user_id ) & \
( db.auth_group.id == db.auth_membership.group_id ) )
db_groups_of_the_user = list()
db_group_id = dict()
#
# There is no user in local db
# We create one
# ##############################
try:
db_user_id = db.auth_user.insert( username = username,
first_name = username )
except AttributeError, e:
db_user_id = db.auth_user.insert( email = username,
first_name = username )
if not db_user_id:
logging.error( 'There is no username or email for %s!' % username )
raise
db_group_search = db( ( db.auth_membership.user_id == db_user_id ) & \
( db.auth_user.id == db.auth_membership.user_id ) & \
( db.auth_group.id == db.auth_membership.group_id ) )
db_groups_of_the_user = list()
db_group_id = dict()
if db_group_search.count() > 0:
for group in db_group_search.select( db.auth_group.id, db.auth_group.role, distinct = True ):
db_group_id[group.role] = group.id
db_groups_of_the_user.append( group.role )
logging.debug( 'db groups of user %s: %s' % ( username, str( db_groups_of_the_user ) ) )
if db_group_search.count() > 0:
for group in db_group_search.select( db.auth_group.id, db.auth_group.role, distinct = True ):
db_group_id[group.role] = group.id
db_groups_of_the_user.append( group.role )
logging.debug( 'db groups of user %s: %s' % ( username, str( db_groups_of_the_user ) ) )
#
# Delete user membership from groups where user is not anymore
# #############################################################
for group_to_del in db_groups_of_the_user:
if ldap_groups_of_the_user.count( group_to_del ) == 0:
db( ( db.auth_membership.user_id == db_user_id ) & \
( db.auth_membership.group_id == db_group_id[group_to_del] ) ).delete()
#
# Delete user membership from groups where user is not anymore
# #############################################################
for group_to_del in db_groups_of_the_user:
if ldap_groups_of_the_user.count( group_to_del ) == 0:
db( ( db.auth_membership.user_id == db_user_id ) & \
( db.auth_membership.group_id == db_group_id[group_to_del] ) ).delete()
#
# Create user membership in groups where user is not in already
# ##############################################################
for group_to_add in ldap_groups_of_the_user:
if db_groups_of_the_user.count( group_to_add ) == 0:
if db( db.auth_group.role == group_to_add ).count() == 0:
gid = db.auth_group.insert( role = group_to_add,
description = 'Generated from LDAP' )
else:
gid = db( db.auth_group.role == group_to_add ).select( db.auth_group.id ).first().id
db.auth_membership.insert( user_id = db_user_id,
group_id = gid )
#
# Create user membership in groups where user is not in already
# ##############################################################
for group_to_add in ldap_groups_of_the_user:
if db_groups_of_the_user.count( group_to_add ) == 0:
if db( db.auth_group.role == group_to_add ).count() == 0:
gid = db.auth_group.insert( role = group_to_add,
description = 'Generated from LDAP' )
else:
gid = db( db.auth_group.role == group_to_add ).select( db.auth_group.id ).first().id
db.auth_membership.insert( user_id = db_user_id,
group_id = gid )
except:
logger.warning( "[%s] Groups are not managed successully!" % str( username ) )
import traceback
logger.debug( traceback.format_exc() )
return False
return True
def init_ldap(
ldap_server = server,
@@ -444,6 +526,7 @@ def ldap_auth( server = 'ldap', port = None,
'''
Inicialize ldap connection
'''
logger.info( '[%s] Inicialize ldap connection' % str( ldap_server ) )
if secure:
if not ldap_port:
ldap_port = 636
@@ -472,6 +555,7 @@ def ldap_auth( server = 'ldap', port = None,
'''
Get all group names from ldap where the user is in
'''
logger.info( '[%s] Get user groups from ldap' % str( username ) )
#
# Get all group name where the user is in actually in ldap
# #########################################################