# _service_accessor.py
#
# Project: AutoArchive
# License: GNU GPLv3
#
# Copyright (C) 2003 - 2023 Róbert Čerňanský
from typing import Union, TypeVar, Iterator
from AutoArchive._infrastructure.service import IServiceIdentification, IService
# SMELL: Application layer can register. Factor the registration out to a separate class?.
[docs]
class ServiceAccessor:
"""Access to services.
A service can be made available by registering via this interface. Registered classes has to implement
:class:`.IService`.
:param workDir: Path to a writable directory which shall be used as a persistent storage."""
def __init__(self, workDir: str):
self.__workDir = workDir
# key is service identification, value is list of tuples (service provider class,
# service provider identification)
self.__servicesToProvidersMap: dict[type[IServiceIdentification], list[tuple[type[IService], Union[str, None]]]] = {}
# key is service provider class, value is its instance
self.__serviceInstances: dict[type[IService], IService] = {}
[docs]
def getOrCreateService(self, serviceIdentification: type[IServiceIdentification],
providerIdentification: Union[str, None]) -> IService:
"""Provides access to registered services interfaces.
See also: :meth:`registerInterface`.
:param serviceIdentification: Identification of the service that shall be created.
:type serviceIdentification: :class:`.IServiceIdentification`
:param providerIdentification: Service provider identification under which it was registered.
For services with single provider which were registered with service provider identification ``None`` it
shall be ``None``.
:type providerIdentification: ``str``
:return: Instance of a service provider of the particular service.
:rtype: ``serviceIdentification.interface``
:raise KeyError: If ``serviceIdentification`` is not registered."""
providers = self.__findProviders(serviceIdentification)
if len(self.__filterProvidersByIdentification(providers, providerIdentification)) == 0:
raise KeyError(
str.format("Service {} is not registered with service provider {}.", serviceIdentification,
providerIdentification))
return self.__getOrCreateServiceForClass(self.__findProviderClass(providers, providerIdentification))
[docs]
def getOrCreateServices(self, serviceIdentification: type[IServiceIdentification]) -> Iterator[IService]:
for providerClass in (p[0] for p in self.__findProviders(serviceIdentification)):
yield self.__getOrCreateServiceForClass(providerClass)
__ISI = TypeVar("__ISI", bound = IServiceIdentification)
[docs]
def registerService(self, serviceIdentification: type[__ISI], providerClass: type[IService],
providerIdentification: str = None):
"""Registers a service.
See also: :meth:`unregisterService`, :meth:`getOrCreateService`.
:param serviceIdentification: Identification of the service that shall be registered.
:type serviceIdentification: :class:`.IServiceIdentification`
:param providerClass: Provider of the service.
:type providerClass: ``type{`` ``serviceIdentification.interface``, :class:`IService`\ ``}``
:param providerIdentification: Corresponding service provider identification object.
:type providerIdentification: ``serviceIdentification.providerIdentificationInterface``
:raise TypeError: If ``providerClass`` does not implement ``serviceIdentification.interface``. If
``providerIdentification`` does not implement ``serviceIdentification.providerIdentificationInterface``.
:raise KeyError: If ``serviceIdentification`` is already registered with ``providerClass`` or
``providerIdentification``."""
if not issubclass(providerClass, serviceIdentification.interface):
raise TypeError("providerClass")
providers = self.__tryFindProviders(serviceIdentification)
if providers is None:
providers = []
self.__servicesToProvidersMap[serviceIdentification] = providers
if len(self.__filterProvidersByIdentification(providers, providerIdentification)) > 0:
raise KeyError(
str.format("Service {} is already registered with service provider info {}.", serviceIdentification,
providerIdentification))
if len(self.__filterProvidersByClass(providers, providerClass)) > 0:
raise KeyError(
str.format("Service {} is already registered with service provider class {}.", serviceIdentification,
providerClass))
self.__servicesToProvidersMap[serviceIdentification].append((providerClass, providerIdentification))
[docs]
def unregisterService(self, serviceIdentification: type[IServiceIdentification]):
"""Unregisters a service with all its providers.
All serviceType instances all destroyed first.
See also: :meth:`registerService`, :meth:`getOrCreateService`.
:param serviceIdentification: Identification of the service that shall be registered.
:type serviceIdentification: :class:`.IServiceIdentification`
:raise KeyError: If ``serviceIdentification`` is not registered."""
self.__destroyServiceInstances(serviceIdentification)
self.__assertIsRegistered(serviceIdentification)
del self.__servicesToProvidersMap[serviceIdentification]
def __getOrCreateServiceForClass(self, providerClass: type[IService]) -> IService:
if providerClass in self.__serviceInstances:
serviceInstance = self.__serviceInstances[providerClass]
else:
serviceInstance = providerClass(self.__workDir)
self.__serviceInstances[providerClass] = serviceInstance
return serviceInstance
def __destroyServiceInstances(self, serviceIdentification):
"Destroys service instances if any."
providers = self.__findProviders(serviceIdentification)
for provider in providers:
if provider[0] in self.__serviceInstances:
del self.__serviceInstances[provider[0]]
def __findProviders(self, serviceIdentification) -> list[tuple[type[IService], str]]:
self.__assertIsRegistered(serviceIdentification)
return self.__servicesToProvidersMap[serviceIdentification]
def __tryFindProviders(self, serviceIdentification) -> Union[list[tuple[type[IService], str]], None]:
if serviceIdentification not in self.__servicesToProvidersMap:
return None
return self.__servicesToProvidersMap[serviceIdentification]
def __assertIsRegistered(self, serviceIdentification):
if serviceIdentification not in self.__servicesToProvidersMap:
raise KeyError(str.format("Service {} is not registered", serviceIdentification))
@classmethod
def __findProviderClass(
cls, providers: list[tuple[type[IService], str]], providerIdentification: str) -> type[IService]:
provider = cls.__filterProvidersByIdentification(providers, providerIdentification)[0]
return provider[0]
@staticmethod
def __filterProvidersByIdentification(providers: list[tuple[type[IService], str]],
providerIdentification: str) -> list[tuple[type[IService], str]]:
return [p for p in providers if p[1] == providerIdentification]
@staticmethod
def __filterProvidersByClass(providers, providerClass):
return [p for p in providers if p[0] == providerClass]