Commit 55724118 authored by Julien Tayon's avatar Julien Tayon
Browse files

[ldapfeed] make code compatible with ldap3>=2

* Some constants have been renamed.
* Directly bind when data-cnx-dn/data-cnx-password are provided, some servers,
  including ours require this.
* Use raise_exceptions=True to avoid ignored ldap errors
* raise in case of failed anonymous bind
* do not search for "dn" attribute because this raise an "invalid attribute"
  with new ldap3 versions
* Password is now returned as bytes, so no longer need to encode them before crypt.
* modification_date is now returned as a datetime object
Co-Authored-By: Philippe Pepiot's avatarPhilippe Pepiot <>

Closes #16073071

branch : 3.27
parent ba528f08ddfa
......@@ -17,8 +17,6 @@
# with CubicWeb. If not, see <>.
"""cubicweb ldap feed source"""
from datetime import datetime
import ldap3
from logilab.common.configuration import merge_options
......@@ -30,10 +28,11 @@ from cubicweb.server.sources import datafeed
from cubicweb import _
# search scopes
'BASE': ldap3.BASE,
# map ldap protocol to their standard port
PROTO_PORT = {'ldap': 389,
......@@ -250,7 +249,7 @@ You can set multiple groups by separating them by a comma.',
# check password by establishing a (unused) connection
self._connect(user, password)
except ldap3.LDAPException as ex:
except ldap3.core.exceptions.LDAPException as ex:
# Something went wrong, most likely bad credentials'while trying to authenticate %s: %s', user, ex)
raise AuthenticationError()
......@@ -266,18 +265,27 @@ You can set multiple groups by separating them by a comma.',
def _connect(self, user=None, userpwd=None):
protocol, host, port = self.connection_info()
kwargs = {}
if user:
kwargs['user'] = user['dn']
elif self.cnx_dn:
kwargs['user'] = self.cnx_dn
if self.cnx_pwd:
kwargs['password'] = self.cnx_pwd'connecting %s://%s:%s as %s', protocol, host, port,
user and user['dn'] or 'anonymous')
kwargs.get('user', 'anonymous'))
server = ldap3.Server(host, port=int(port))
conn = ldap3.Connection(
server, user=user and user['dn'],
server, client_strategy=ldap3.RESTARTABLE, auto_referrals=False,
# Now bind with the credentials given. Let exceptions propagate out.
if user is None:
# XXX always use simple bind for data connection
# anonymous bind
if not self.cnx_dn:
if not conn.bind():
raise AuthenticationError(conn.result["message"])
self._authenticate(conn, {'dn': self.cnx_dn}, self.cnx_pwd)
......@@ -288,7 +296,6 @@ You can set multiple groups by separating them by a comma.',
return conn
def _auth_simple(self, conn, user, userpwd):
conn.authentication = ldap3.AUTH_SIMPLE
conn.user = user['dn']
conn.password = userpwd
return conn.bind()
......@@ -313,7 +320,7 @@ You can set multiple groups by separating them by a comma.',
if self._conn is None:
self._conn = self._connect()
ldapcnx = self._conn
if not, searchstr, search_scope=scope, attributes=attrs):
if not, searchstr, search_scope=scope, attributes=set(attrs) - {'dn'}):
return []
result = []
for rec in ldapcnx.response:
......@@ -330,13 +337,13 @@ You can set multiple groups by separating them by a comma.',
itemdict = {'dn': dn}
for key, value in iterator:
if self.user_attrs.get(key) == 'upassword': # XXx better password detection
value = value[0].encode('utf-8')
value = value[0]
# we only support ldap_salted_sha1 for ldap sources, see: server/
if not value.startswith(b'{SSHA}'):
value = utils.crypt_password(value)
itemdict[key] = Binary(value)
elif self.user_attrs.get(key) == 'modification_date':
itemdict[key] = datetime.strptime(value[0], '%Y%m%d%H%M%SZ')
itemdict[key] = value
if len(value) == 1:
itemdict[key] = value = value[0]
ldap3 < 2
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment