# -*- coding: utf-8 -*-
#
# Copyright 2018-2023 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention.
#
# This program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
"""
Base classes for (simplified) UDM modules and objects.
"""
import collections
import copy
import pprint
from typing import Any, Dict, Iterable, Iterator, Tuple, Union
# Pair of LDAP <-> UDM mappings, accessible as ``.ldap2udm`` and ``.udm2ldap``.
LdapMapping = collections.namedtuple("LdapMapping", ["ldap2udm", "udm2ldap"])
class BaseObjectProperties(collections.abc.Mapping, collections.abc.Iterable):
    """
    Container for UDM properties.

    Properties are stored as plain instance attributes. Attributes whose name
    starts with an underscore (e.g. ``_udm_obj``) are internal: they are not
    exposed by iteration, :py:meth:`keys`, :py:meth:`items`,
    :py:meth:`values` or :py:func:`len`.

    Note that ``in``, ``props[key]`` and ``props[key] = value`` are based on
    :py:func:`hasattr` / :py:func:`getattr` / :py:func:`setattr`, so they also
    see internal attributes and methods, not only the names yielded by
    iteration.
    """

    def __init__(self, udm_obj: "BaseObject") -> None:
        """
        :param udm_obj: the UDM object these properties belong to
        :type udm_obj: BaseObject
        """
        self._udm_obj = udm_obj

    def __contains__(self, item) -> bool:
        # hasattr() raises TypeError for non-string names. A non-string can
        # never name an attribute, so report it as "not contained" instead.
        return isinstance(item, str) and hasattr(self, item)

    def __deepcopy__(self, memo: Dict[int, Any]) -> "BaseObjectProperties":
        """
        Deep-copy all attributes except the back reference ``_udm_obj``,
        which is shared between the original and the copy.

        :param memo: memo dictionary handed in by :py:func:`copy.deepcopy`
        :return: the copy registered in `memo` for this instance
        :rtype: BaseObjectProperties
        """
        id_self = id(self)
        # Test for membership, not truthiness: an instance without public
        # properties has len() == 0 and would be considered "missing".
        if id_self not in memo:
            clone = memo[id_self] = self.__class__(self._udm_obj)
            for k, v in self.__dict__.items():
                if k == "_udm_obj":
                    setattr(clone, k, v)
                else:
                    # Hand the memo down so that values referenced by more
                    # than one property stay shared in the copy.
                    setattr(clone, k, copy.deepcopy(v, memo))
        return memo[id_self]

    def __eq__(self, other: object) -> bool:
        """
        Two property containers are equal if they expose the same property
        names and every value compares equal.
        """
        if not isinstance(other, BaseObjectProperties):
            # Let Python try the reflected comparison / fall back to identity
            # instead of failing with AttributeError on foreign types.
            return NotImplemented
        # compare keys first (fast and low memory)
        if set(self.keys()) != set(other.keys()):
            return False
        # compare values one at a time to reduce memory usage
        return not any(v != getattr(other, k) for k, v in self.items())

    def __getitem__(self, key):
        """
        :raises KeyError: if the instance has no attribute named `key`
        """
        try:
            return getattr(self, key)
        except AttributeError as exc:
            raise KeyError(f"{self.__class__.__name__} does not have key {key!r}.") from exc

    def __iter__(self) -> Iterator[str]:
        # Only public instance attributes are UDM properties.
        return (k for k in self.__dict__.keys() if not str(k).startswith("_"))

    def __len__(self):
        # Count the public property names without building a temporary list.
        return sum(1 for _ in self)

    def __repr__(self) -> str:
        return "{}({})".format(
            self.__class__.__name__,
            pprint.pformat(dict(self.items()), indent=2),
        )

    def __setitem__(self, key, value):
        """
        Assign to an already existing attribute.

        :raises TypeError: if `key` is not contained in this instance
        """
        if key not in self:
            raise TypeError(f"Assignment to non existent attribute {key!r} forbidden.")
        setattr(self, key, value)

    def items(self) -> Iterable[Tuple[str, Any]]:
        """
        :return: iterator over ``(name, value)`` pairs of all public properties
        """
        return ((k, v) for k, v in self.__dict__.items() if not str(k).startswith("_"))

    def keys(self) -> Iterable[str]:
        """
        :return: iterator over the names of all public properties
        """
        return iter(self)

    def update(self, other: Union["BaseObjectProperties", Dict[str, Any], None] = None, **kwargs) -> None:
        """
        Set existing properties from a mapping and/or keyword arguments.

        :param other: mapping of property names to new values (optional)
        :param kwargs: property names and their new values
        :raises TypeError: when trying to set a name this instance does not
            contain (see :py:meth:`__setitem__`)
        """
        for k in other or []:
            self[k] = other[k]
        for k in kwargs:
            self[k] = kwargs[k]

    def values(self) -> Iterable[Any]:
        """
        :return: iterator over the values of all public properties
        """
        return (v for k, v in self.__dict__.items() if not str(k).startswith("_"))
class BaseObject:
    """
    Base class for UDM object classes.

    Usage:

    *   Creation of instances is always done through
        :py:meth:`BaseModule.new`, :py:meth:`BaseModule.get` or :py:meth:`BaseModule.search`.

    *   Modify an object::

            user.props.firstname = 'Peter'
            user.props.lastname = 'Pan'
            user.save()

    *   Move an object::

            user.position = 'cn=users,ou=Company,dc=example,dc=com'
            user.save()

    *   Delete an object::

            obj.delete()

    After saving a :py:class:`BaseObject`, it is :py:meth:`.reload` 'ed
    automatically because UDM hooks and listener modules often add, modify or
    remove properties when saving to LDAP. As this involves LDAP, it can be
    disabled if the object is not used afterwards and performance is an issue::

        user_mod.meta.auto_reload = False
    """

    udm_prop_class = BaseObjectProperties

    def __init__(self):
        """
        Don't instantiate a :py:class:`BaseObject` directly. Use
        :py:meth:`BaseModule.get()`, :py:meth:`BaseModule.new()` or
        :py:meth:`BaseModule.search()`.
        """
        self.dn: str = ""
        self.props: BaseObjectProperties = None
        self.options: Dict[str, bool] = {}
        self.policies: Dict[str, str] = {}
        self.position: str = ""
        self.superordinate: str = None
        # Stays None until a module has initialized this object.
        self._udm_module: BaseModule = None

    def __eq__(self, other: object) -> bool:
        """
        Two objects are equal if they belong to a module of the same name and
        their ``dn``, ``options``, ``props``, ``policies``, ``position`` and
        ``superordinate`` attributes all compare equal.
        """
        if not isinstance(other, BaseObject):
            # Let Python try the reflected comparison / fall back to identity
            # instead of failing with AttributeError on foreign types.
            return NotImplemented
        # ``_udm_module`` is None on not yet initialized objects (__repr__
        # handles that case as well), so don't access ``.name`` blindly.
        self_module_name = getattr(self._udm_module, "name", None)
        other_module_name = getattr(other._udm_module, "name", None)
        if self_module_name != other_module_name:
            return False
        compared_attrs = ("dn", "options", "props", "policies", "position", "superordinate")
        return not any(getattr(self, attr) != getattr(other, attr) for attr in compared_attrs)

    def __repr__(self) -> str:
        return "{}({!r}, {!r})".format(
            self.__class__.__name__,
            self._udm_module.name if self._udm_module else "<not initialized>",
            self.dn,
        )

    def reload(self) -> "BaseObject":
        """
        Refresh object from LDAP.

        Not implemented in this base class.

        :return: self
        :rtype: BaseObject
        """
        raise NotImplementedError()

    def save(self) -> "BaseObject":
        """
        Save object to LDAP.

        Not implemented in this base class.

        :return: self
        :rtype: BaseObject
        :raises univention.udm.exceptions.MoveError: when a move operation fails
        """
        raise NotImplementedError()

    def delete(self) -> None:
        """
        Remove the object from the LDAP database.

        Not implemented in this base class.

        :return: None
        """
        raise NotImplementedError()
class BaseModule(metaclass=BaseModuleMeta):
    """
    Base class for UDM module classes. UDM modules are basically UDM object
    factories.

    Usage:

    0.  Get module using::

            user_mod = UDM().get('users/user')

    1.  Create fresh, not yet saved BaseObject::

            new_user = user_mod.new()

    2.  Load an existing object::

            group = group_mod.get('cn=test,cn=groups,dc=example,dc=com')
            group = group_mod.get_by_id('Domain Users')

    3.  Search and load existing objects::

            dc_slaves = dc_slave_mod.search(filter_s='cn=s10*')
            campus_groups = group_mod.search(base='ou=campus,dc=example,dc=com')

    4.  Load existing object(s) without ``open()`` 'ing them::

            user_mod.meta.auto_open = False
            user = user_mod.get(dn)
            user.props.groups == []
    """

    # Classes used for the objects and the metadata of this module.
    _udm_object_class = BaseObject
    _udm_module_meta_class = BaseModuleMetadata

    class Meta:
        supported_api_versions: Iterable[int] = ()
        suitable_for: Iterable[str] = []

    def __init__(self, name: str, connection: Any, api_version: int) -> None:
        """
        Store connection and module name, then replace ``self.meta`` with the
        result of ``self.meta.instance(self, api_version)``.

        :param str name: name of the UDM module, e.g. ``users/user``
        :param connection: connection object, stored as ``self.connection``
        :param int api_version: API version passed on to ``self.meta.instance()``
        """
        self.connection = connection
        self.name: str = name
        # NOTE(review): ``self.meta`` is not set in this class; presumably it
        # is provided by the metaclass -- confirm against BaseModuleMeta.
        self.meta: BaseModuleMetadata = self.meta.instance(self, api_version)

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return "{}({!r})".format(class_name, self.name)

    def new(self, superordinate: Union[str, BaseObject] = None) -> BaseObject:
        """
        Create a new, unsaved :py:class:`BaseObject` object.

        Not implemented in this base class.

        :param superordinate: DN or UDM object this one references as its
            superordinate (required by some modules)
        :type superordinate: str or GenericObject
        :return: a new, unsaved BaseObject object
        :rtype: BaseObject
        """
        raise NotImplementedError()

    def get(self, dn: str) -> BaseObject:
        """
        Load UDM object from LDAP.

        Not implemented in this base class.

        :param str dn: DN of the object to load.
        :return: an existing :py:class:`BaseObject` instance.
        :rtype: BaseObject
        :raises univention.udm.exceptions.NoObject: if no object is found at `dn`
        :raises univention.udm.exceptions.WrongObjectType: if the object found at `dn` is not of type :py:attr:`self.name`
        """
        raise NotImplementedError()

    def search(self, filter_s: str = "", base: str = "", scope: str = "sub") -> Iterator[BaseObject]:
        """
        Get all UDM objects from LDAP that match the given filter.

        Not implemented in this base class.

        :param str filter_s: LDAP filter (only object selector like `uid=foo`
            required, `objectClasses` will be set by the UDM module)
        :param str base: LDAP search base.
        :param str scope: LDAP search scope, e.g. `base` or `sub` or `one`.
        :return: iterator of :py:class:`BaseObject` objects
        :rtype: Iterator(BaseObject)
        """
        raise NotImplementedError()