From 6701dcf4672e9d642185a547c3e193568ae98103 Mon Sep 17 00:00:00 2001 From: Sourcery AI Date: Wed, 16 Mar 2022 12:15:24 +0000 Subject: [PATCH] 'Refactored by Sourcery' --- windows_auth/decorators.py | 39 ++++++++----------- windows_auth/ldap.py | 2 +- windows_auth/ldap_metrics/admin.py | 10 +++-- windows_auth/ldap_metrics/utils.py | 2 +- .../management/commands/createtask.py | 10 ++--- windows_auth/middleware.py | 23 +++++------ windows_auth/models.py | 13 +++---- windows_auth/panels.py | 10 +---- windows_auth/utils.py | 7 ++-- 9 files changed, 50 insertions(+), 66 deletions(-) diff --git a/windows_auth/decorators.py b/windows_auth/decorators.py index a78069d..cd99274 100644 --- a/windows_auth/decorators.py +++ b/windows_auth/decorators.py @@ -18,12 +18,7 @@ def check_domain(user): return True if user.is_authenticated and LDAPUser.objects.filter(user=user).exists(): - if domain: - # for specific domain - return user.ldap.domain == domain - else: - # for all domains - return True + return user.ldap.domain == domain if domain else True else: return False @@ -48,23 +43,23 @@ def ldap_sync_required(function=None, timedelta=None, login_url=None, allow_non_ :param raise_exception: raise sync exception and cause status code 500 """ def check_sync(user): - if user.is_authenticated and LDAPUser.objects.filter(user=user).exists(): - try: - if WAUTH_USE_CACHE: - user.ldap.sync() - else: - # check via database query - if not timedelta or not user.ldap.last_sync or user.ldap.last_sync < timezone.now() - timedelta: - user.ldap.sync() - - return True - except Exception as e: - if raise_exception: - raise e - else: - return False - else: + if ( + not user.is_authenticated + or not LDAPUser.objects.filter(user=user).exists() + ): return allow_non_ldap + try: + if WAUTH_USE_CACHE: + user.ldap.sync() + elif not timedelta or not user.ldap.last_sync or user.ldap.last_sync < timezone.now() - timedelta: + user.ldap.sync() + + return True + except Exception as e: + if raise_exception: + raise e + else: + return False actual_decorator = user_passes_test(check_sync, login_url=login_url) if function: diff --git a/windows_auth/ldap.py b/windows_auth/ldap.py index e2980dd..d389bf3 100644 --- a/windows_auth/ldap.py +++ b/windows_auth/ldap.py @@ -12,7 +12,7 @@ class LDAPManager: def __init__(self, domain: str, settings: Optional[LDAPSettings] = None): self.domain = domain - self.settings = settings if settings else LDAPSettings.for_domain(domain) + self.settings = settings or LDAPSettings.for_domain(domain) # create server self.server = self._create_server() # bind connection diff --git a/windows_auth/ldap_metrics/admin.py b/windows_auth/ldap_metrics/admin.py index 11a2a57..cd7a264 100644 --- a/windows_auth/ldap_metrics/admin.py +++ b/windows_auth/ldap_metrics/admin.py @@ -62,13 +62,17 @@ def elapsed_time(self, obj): return obj.elapsed_time def bytes(self, obj): - return format_bytes(obj.bytes_received) + " / " + format_bytes(obj.bytes_transmitted) + return f'{format_bytes(obj.bytes_received)} / ' + format_bytes( + obj.bytes_transmitted + ) def messages(self, obj): - return str(obj.messages_received) + " / " + str(obj.messages_transmitted) + return f'{str(obj.messages_received)} / {str(obj.messages_transmitted)}' def sockets(self, obj): - return str(obj.open_sockets) + " / " + str(obj.closed_sockets) + " / " + str(obj.wrapped_sockets) + return f'{str(obj.open_sockets)} / {str(obj.closed_sockets)} / ' + str( + obj.wrapped_sockets + ) def has_add_permission(self, request): return False diff --git a/windows_auth/ldap_metrics/utils.py b/windows_auth/ldap_metrics/utils.py index 921658f..d6b5e25 100644 --- a/windows_auth/ldap_metrics/utils.py +++ b/windows_auth/ldap_metrics/utils.py @@ -70,7 +70,7 @@ def create_usage(domain: str, usage: ConnectionUsage) -> LDAPUsage: def collect_metrics(unbind: bool = True): if unbind: - logger.info(f"Unbinding all LDAP Connections for collecting metrics") + logger.info("Unbinding all LDAP Connections for collecting metrics") with LogExecutionTime("Collecting LDAP Connection metrics"): try: diff --git a/windows_auth/management/commands/createtask.py b/windows_auth/management/commands/createtask.py index b3899b0..5eb81ad 100644 --- a/windows_auth/management/commands/createtask.py +++ b/windows_auth/management/commands/createtask.py @@ -66,13 +66,11 @@ def handle(self, command="", predefined=False, name=None, desc="", identity=LOCA interval=None, random=None, timeout=None, priority=None, **options): try: if predefined: - # predefined tasks - if command in PREDEFINED_TASKS: - create_task = PREDEFINED_TASKS[command] - create_task(command=command, name=name, desc=desc, identity=identity, folder=folder, - interval=interval, random=random, timeout=timeout, priority=priority) - else: + if command not in PREDEFINED_TASKS: raise CommandError(f"Predefined task for \"{command}\" does not exist.") + create_task = PREDEFINED_TASKS[command] + create_task(command=command, name=name, desc=desc, identity=identity, folder=folder, + interval=interval, random=random, timeout=timeout, priority=priority) else: # create task definition task_def = create_task_definition(command, description=desc, priority=priority, timeout=timeout) diff --git a/windows_auth/middleware.py b/windows_auth/middleware.py index 7373704..3ec0c91 100644 --- a/windows_auth/middleware.py +++ b/windows_auth/middleware.py @@ -40,25 +40,22 @@ def __call__(self, request): # create new cache key cache.set(cache_key, True, timeout) - else: - # check via database query - if not ldap_user.last_sync or ldap_user.last_sync < timezone.now() - timezone.timedelta(seconds=timeout): - ldap_user.sync() + elif not ldap_user.last_sync or ldap_user.last_sync < timezone.now() - timezone.timedelta(seconds=timeout): + ldap_user.sync() except LDAPUser.DoesNotExist: # user is getting created the first time pass except Exception as e: logger.exception(f"Failed to synchronize user {request.user} against LDAP") - # return error response - if WAUTH_REQUIRE_RESYNC: - if isinstance(WAUTH_ERROR_RESPONSE, int): - return HttpResponse(f"Authorization Failed.", status=WAUTH_ERROR_RESPONSE) - elif callable(WAUTH_ERROR_RESPONSE): + if isinstance(WAUTH_ERROR_RESPONSE, int): + if WAUTH_REQUIRE_RESYNC: + return HttpResponse("Authorization Failed.", status=WAUTH_ERROR_RESPONSE) + elif callable(WAUTH_ERROR_RESPONSE): + if WAUTH_REQUIRE_RESYNC: return WAUTH_ERROR_RESPONSE(request, e) - else: - raise e - response = self.get_response(request) - return response + elif WAUTH_REQUIRE_RESYNC: + raise e + return self.get_response(request) class SimulateWindowsAuthMiddleware: diff --git a/windows_auth/models.py b/windows_auth/models.py index 40bf340..b7bc34e 100644 --- a/windows_auth/models.py +++ b/windows_auth/models.py @@ -45,12 +45,12 @@ def create_user(self, username: str) -> User: raise ValueError("Username must be in username@domain.com format.") sam_account_name, domain = username.split("@", 2) - else: - if "\\" not in username: - raise ValueError("Username must be in DOMAIN\\username format.") - + elif "\\" in username: domain, sam_account_name = username.split("\\", 2) + else: + raise ValueError("Username must be in DOMAIN\\username format.") + if WAUTH_LOWERCASE_USERNAME: sam_account_name = sam_account_name.lower() @@ -92,10 +92,7 @@ def get_ldap_attr(self, attribute: str, as_list: bool = False): raise AttributeError(f"User {self.user} does not have LDAP Attribute {attribute}") attribute_obj: Attribute = getattr(ldap_user, attribute) - if as_list: - return attribute_obj.values - else: - return attribute_obj.value + return attribute_obj.values if as_list else attribute_obj.value @lru_cache() def get_ldap_user(self, attributes: Optional[Iterable[str]] = None) -> Entry: diff --git a/windows_auth/panels.py b/windows_auth/panels.py index efe818c..a68d056 100644 --- a/windows_auth/panels.py +++ b/windows_auth/panels.py @@ -119,10 +119,7 @@ def type_icon(self) -> str: @cached_property def status_icon(self) -> str: - if self.result.get("result") == 0: - return "✅" - else: - return "❌" + return "✅" if self.result.get("result") == 0 else "❌" @cached_property def scope_label(self) -> str: @@ -204,10 +201,7 @@ def wrapper(message_id, timeout=None, get_request=False): )) # mimic the original logic about get_request - if get_request: - return response, result, request - else: - return response, result + return (response, result, request) if get_request else (response, result) wrapper.original = func return wrapper diff --git a/windows_auth/utils.py b/windows_auth/utils.py index 9375085..cd612c3 100644 --- a/windows_auth/utils.py +++ b/windows_auth/utils.py @@ -31,11 +31,10 @@ def __init__(self, self.context = context def _get_message(self) -> str: - if callable(self.msg): - args, kwargs = self.context or ([], {}) - return f"{self.msg(*args, **kwargs)}: {timezone.now() - self.start_time}" - else: + if not callable(self.msg): return f"{self.msg}: {timezone.now() - self.start_time}" + args, kwargs = self.context or ([], {}) + return f"{self.msg(*args, **kwargs)}: {timezone.now() - self.start_time}" def __enter__(self): self.start_time = timezone.now()