kegtap package¶
Module contents¶
Submodules¶
kegtap.plugin module¶
-
class
kegtap.plugin.
Plugin
(*_, **__)¶ Bases:
object
Base class for all Kegtap plugins. Subclasses may override if needed
on_activate()
andon_deactivate()
to initialize or destroy plugin-specific data.A plugin must be explicitly registered via
kegtap.plugin.register
. Usage example:import kegtap.plugin @kegtap.plugin.register class MyPlugin(kegtap.plugin.Plugin): pass # Somewhere else in the code, an instance of MyPlugin can be obtained via instance = kegtap.plugin.find_active(MyPlugin)
See also
-
property
import_name
¶ Returns the fully qualified name of the current plugin, including the module, e.g.
'kegtap_plugins.redis.RedisPlugin'
.
-
on_activate
(*_, **__)¶ This method is called only once when all the plugins have been constructed and Kegtap is being started. After a plugin is activated, it can be retrieved from the plugin pool via
find_active()
. This method sinks its arguments.Note
Subclasses should always call
super().on_activate()
before performing their own initialization. Note that the initialization order is user-defined (based on the specified configuration), therefore it is not possible to access other plugins while activating the current one (as they may not be in the pool yet). It is fine to subclasson_activate
with an async function, e.g.:class MyPlugin(Plugin): async def on_activate(self): super().on_activate() await self._async_resource.produce_result()
-
on_deactivate
(*_, **__)¶ This method is called only once when Kegtap is being shut down and the plugins need to be destroyed. After a plugin is deactivated, it cannot be found in the plugin pool by
find_active()
any longer. This method sinks its arguments.Note
Subclasses should always call
super().on_deactivate()
after performing their own teardown. Note that the deactivation order is user-defined (based on the specified configuration), therefore it is not possible to access other plugins while activating the current one (as they may not be in the pool yet). It is fine to subclasson_deactivate
with an async function, e.g.:class MyPlugin(Plugin): async def on_deactivate(self): await self._async_resource.disconnect() super().on_deactivate()
-
property
-
exception
kegtap.plugin.
PluginNotFoundException
(criterion: Union[Type[Plugin], str], found=typing.Tuple[typing.Type[ForwardRef('Plugin')]])¶ Bases:
fastapi.exceptions.HTTPException
-
class
kegtap.plugin.
PluginPool
¶ Bases:
object
Class that holds the list of currently active plugins, e.g. plugins for which
Plugin.on_activate()
was called andPlugin.on_deactivate()
was not called yet. There is a unique module-level instance of aPluginPool
,kegtap.plugin.ACTIVE_PLUGINS_POOL
; you can query it directly or viafind_active()
. Plugins are stored and identified uniquely byPlugin.import_name()
. The main method of this class isfind()
.This class caches the queries that have been performed to reproduce quickly the same results.
This class only deals with active plugins; if you need to retrieve known (i.e. registered) plugins, refer to
kegtap.plugin.get_registered()
andkegtap.plugin.find_registered()
.See also
find_active()
for example usages.-
add_active_plugin
(plugin: kegtap.plugin.Plugin)¶ Adds
plugin
to the current list.- Raises
KeyError: if a plugin with the same
Plugin.import_name()
already exists.
-
find
(criterion: Union[str, Type[PluginT]], allow_partial_match=True, require_unique=True) → Union[PluginT, Tuple[PluginT]]¶ Tests the active plugins against
criterion
, and returns those that match according tois_match()
. Ifrequire_unique
is True, this method will either return the unique plugin matchingcriterion
, or will throw. See the documentation offind_active()
for example usages.The results of this query are cached for faster lookup.
- criterionstr or subclass of Plugin
Consult
is_match()
for the meaning of this parameter.- allow_partial_matchbool
Consult
is_match()
for the meaning of this parameter.- require_uniquebool
If this is True, the method will return the one and only instance found matching
criterion
. If no instance, or multiple instances are found, this method throws. If this parameter is False, the method returns a tuple of matching plugin instances.
- Raises
PluginNotFound: if
require_unique
is True, and either no instance or multiple instances matchcriterion
.- Returns
A tuple of
Plugin
instances ifrequire_unique
is False, a single instance otherwise.
-
classmethod
is_match
(plugin: kegtap.plugin.Plugin, criterion: Union[str, Type[kegtap.plugin.Plugin]], allow_partial_match=True)¶ Tries to match
plugin
against the matchcriterion
.- pluginPlugin
Plugin
instance to test againstcriterion
.- criterionstr or subclass of Plugin
If this parameter is a string, it will be matched against the
Plugin.import_name()
ofplugin
. The matching behavior depends onallow_partial_match
. If this parameter is atype
,plugin
will match positively only if it’s a subclass ofcriterion
.- allow_partial_matchbool
If this parameter is true and
criterion
is astr
, it sufficies forcriterion
to be one ore more non-consecutive parts of the import path ofplugin
, as inis_ordered_subsequence()
; e.g.'b.d'
is a partial match fora.b.c.d.EPlugin
. Ifcriterion
is atype
, this parameter has no effect.
Examples
class MyPlugin(Plugin): pass instance = MyPlugin() PluginPool.is_match(instance, MyPlugin) # True PluginPool.is_match(instance, 'MyPlugin', allow_partial_match=True) # True PluginPool.is_match(instance, Plugin) # True instance.import_name # 'kegtap_plugins.my_module.MyPlugin' PluginPool.is_match(instance, 'my_module', allow_partial_match=True) # True
-
remove_active_plugin
(plugin: kegtap.plugin.Plugin)¶ Removes
plugin
from the current list.- Raises
KeyError: if a plugin with the same
Plugin.import_name()
is not in the current list.
-
-
kegtap.plugin.
find_active
(criterion: Union[str, Type[PluginT]], allow_partial_match=True, require_unique=True) → Union[PluginT, Tuple[PluginT]]¶ Queries
PluginPool.find()
on the unique instance ofPluginPool
that contains all active plugins in the current process. This method is used to find active plugins; to find known (i.e. registered) plugins, refer tokegtap.plugin.get_registered()
andkegtap.plugin.find_registered()
.- Raises
PluginNotFound: if
require_unique
is True, and either no instance or multiple instances matchcriterion
.
Examples
# Retrieve the unique instance of MyPlugin find_active(MyPlugin) # Will throw if not found # Search for all instances of subclasses of MyPlugin: find_active(MyPlugin, require_unique=False) # Returns a tuple of instances # Search for all plugins with "redis" in their names: find_active("redis", require_unique=False, allow_partial_match=True) # Tuple of instances # The unique subclasses of TaskBrokerPlugin that also has Redis in the name find_active(TaskBrokerPlugin) # Throws if not found
See also
-
kegtap.plugin.
find_registered
(plugin_name: str, allow_partial_match: bool = True) → Optional[Type[kegtap.plugin.Plugin]]¶ Searches the known rgistered plugins in
get_registered()
for one matching the givenplugin_name
.This method deals with registered plugins, i.e. known but not necessarily instantiated. If you want to retrieve an active instance of a certain plugin, refer to
kegtap.plugin.find_active()
andkegtap.plugin.PluginPool
.- plugin_namestr
The plugin name to find. It must be a fully qualified import name, e.g.
"kegtap_plugins.redis.RedisPlugin"
. If you setallow_partial_match
to True, it is allowed to be also part of the fully qualified import name.- allow_partial_matchbool
If set to True,
plugin_name
can also match partially the plugin name, using the rules ofis_ordered_subsequence()
. This means that the dot-separated parts ofplugin_name
match any plugin that has the same parts in the same order, e.g.'b.d'
matches'a.b.c.d.EPlugin'
.
- Raises
KeyError: if more than one registered plugin class match
plugin_name
.- Returns
A single
kegtap.plugin.Plugin
class if found, otherwiseNone
.
-
kegtap.plugin.
get_registered
() → Iterable[Type[kegtap.plugin.Plugin]]¶ Yields a range of
kegtap.plugin.Plugin
classes that have been registered. This method deals with registered plugins, i.e. known but not necessarily instantiated. If you want to retrieve an active instance of a certain plugin, refer tokegtap.plugin.find_active()
andkegtap.plugin.PluginPool
.
-
kegtap.plugin.
register
(plugin_cls: Type[kegtap.plugin.Plugin])¶ Decorator for subclasses of
kegtap.plugin.Plugin
. You must register a plugin for Kegtap to pick it up and run it. This is intended to prevent asbtract base classes from becoming active plugins at runtime.
kegtap.task module¶
-
class
kegtap.task.
Task
(fn: Callable[[…], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, uuid: Optional[uuid.UUID] = None, retries: int = 0)¶ Bases:
object
Represents a task, that is, a callable function together with its arguments, uniquely identified by a
uuid
. It can be used to wrap a normal function or a coroutine, but both are expected to return a boolean expressing whether the operation succeeded. A task has a certain number ofretries
associated, which represents how many times the scheduler will attempt this task until it succeeds.A task is intended to be serializable using
pickle
, so do not wrap functions that may not be picklable (e.g.classmethod
works, but bound methods do not).-
__init__
(fn: Callable[[…], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, uuid: Optional[uuid.UUID] = None, retries: int = 0)¶ Constructs a new task.
- fnfunction or coroutine
The main function to call when this task is executed. Can be either a function or a coroutine, but it must return a boolean to express whether the task succeeded or not.
- argstuple
Positional arguments with which to call
fn
, if any.- kwargsdict
Keyword arguments with which to call
fn
, if any.- uuidUUID
UUID for this task; if omitted, a new one will be generated.
- retriesint
Number of attempts allowed for this task, as in
retries
.
-
next_try
() → kegtap.task.Task¶ Generates a task identical to the current one, but with one less
retries
.
-
property
retries
¶ The maximum number of times the task is allowed to fail before the scheduler gives up on it. This can be reduced by generating a new task with
next_try()
.
-
property
uuid
¶ A unique identifier for this task. Same tasks with different retries may share the same UUID.
-
-
class
kegtap.task.
TaskBrokerBase
¶ Bases:
abc.ABC
Abstract class describing a task broker, which interfaces to storage. A broker is responsible of saving and retrieving tasks from persistent storage. Every time a task is created, it is pushed trough a broker via
push()
, so that it can be restored even after the program is restarted. Analogously, when a task is run in the current event loop, it is popped from storage viapop()
(and then pushed again if it fails and has enoughTask.retries
). A given task can be pushed in an arbitrary queue, identified by its name. The broker should be able to retrieve all active queues and their tasks.Subclasses of this this class may implement all the methods as coroutines.
-
abstract
pop
(queue: str, task_uuid: uuid.UUID) → Union[kegtap.task.Task, None, Awaitable[Optional[kegtap.task.Task]]]¶ Looks up a task by
task_uuid
in the givenqueue
. If it finds any, it removes it from the queue and returns it.- queuestr
The name of the queue for the task. If the queue does not exist, the method returns
None
.- task_uuidUUID
The UUID of the task to retrieve. This may be one of those returned by
queue_uuids()
. If there is no task under such UUID, the method returnsNone
.
- Returns
A task from
queue
with the giventask_uuid
, if found, otherwiseNone
. If it returns aTask
, this is guaranteed to have been removed from the queue.
Note
Subclasses should take care when implementing this method: depending on the event loop implementation and application configuration,
pop()
can be called simultaneously from multiple threads. If the sameTask
is returned from multiple threads, it will be executed multiple times. Hence, this method should synchronize access to the queue, unless for the specific application tasks are idempotent and thus it is fine to run a task several times.
-
abstract
push
(queue: str, task: kegtap.task.Task)¶ Saves
task
intoqueue
.- queuestr
The name of the queue for the task. If the queue does not exist, it is created.
- taskTask
The task to save into the queue. The task may be persisted via e.g.
pickle
.
Note
Subclasses should take care when implementing this method as a coroutine: calls to other coroutines should be shielded using
asyncio.shield
, to guarantee that theTask
reaches the storage even if the event loop is being shut down.
-
abstract
queue_uuids
(queue: str) → Union[Iterable[uuid.UUID], Awaitable[Iterable[uuid.UUID]]]¶ Obtains the list of tasks in a given queue. Tasks here are identified exclusively by their
Task.uuid
.- queuestr
The name of the queue. This will be one of those returned by
queues()
.
- Returns
A list of UUID corresponding to the tasks in the queue
queue
.
-
abstract
queues
() → Union[Iterable[str], Awaitable[Iterable[str]]]¶ Obtains the list of known queue names. Queue names are freely chosen when the task is pushed via
push()
. TheTaskDispatcher
will call this only upon startup to repopulate the event loop with all tasks. Subsequently to retrieving a list of queues, it will inspect the queues usingqueue_uuids()
.- Returns
A list of strings, the names of the known queues.
-
abstract
-
class
kegtap.task.
TaskBrokerPlugin
(*args, **kwargs)¶ Bases:
abc.ABC
,kegtap.plugin.Plugin
Base abstract class for a
kegtap.plugin.Plugin
that provides the functionality of aTaskBrokerBase
. It is intended that other plugins use aTaskBrokerPlugin
to enqueue their tasks for execution. Sincekegtap.plugin.find_active()
is compatible with subclasses, other plugins do not need to worry about which broker is currently active, but rather look for this abstract base class into the current active plugin pool.This class wraps around a
TaskDispatcher
, which is started when the plugin is activated and stopped when it is deactivated. Subclasses should implement the single method_build_broker()
.Example usage
# Intended usage from the perspective of another plugin @register class MyPlugin(Plugin): @taskify def _long_operation(self, arg: int): pass def trigger_long_operation(self): # Plugins can be agnosic w.r.t. which TaskBrokerPlugin is really active: broker_plugin = find_active(TaskBrokerPlugin) # ^^^^^^^^^^^^^^^^^^^^^^^^^^^ # This will return any active subclass of TaskBrokerPlugin broker_plugin.push_task('my_queue', self._long_operation, 42, retries=2)
-
abstract
_build_broker
() → kegtap.task.TaskBrokerBase¶ Construct the implementation of
TaskBrokerBase
. This will be called upon constrution.
-
property
broker
¶ The broker offered by this plugin.
-
property
num_awaiting_tasks
¶ Returns the number of scheduled tasks awaiting to be executed, as in
TaskDispatcher.__len__()
.
-
async
on_activate
(*args, **kwargs)¶ Calls
TaskDispatcher.start()
.
-
async
on_deactivate
(*args, **kwargs)¶ Calls
TaskDispatcher.stop()
and awaitsTaskDispatcher.complete()
.
-
push_task
(queue: str, fn: Callable[[…], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, retries: int = 0) → _asyncio.Future¶ Enqueues a new task; this is dentical to
TaskDispatcher.append()
; refer to its documentation.See also
-
abstract
-
class
kegtap.task.
TaskDispatcher
(broker: kegtap.task.TaskBrokerBase)¶ Bases:
object
Main class that is responsible of receiving tasks, scheduling them into the event loop, recovering tasks stored in the broker, and executing them.
This class needs a
TaskBrokerBase
instance to work. When it is stared usingstart()
, it will read all theTask
known by the broker, and schedule them all in the event loop. When a task is about to be executed, it is removed from the broker viaTaskBrokerBase.pop()
. New tasks are first stashed viaTaskBrokerBase.push()
and, if the dispatcher isrunning
, immediately scheduled. Tasks that fail and have retries left, will be rescheduled. Tasks are executed only as long as the dispatcher isrunning
.The dispatcher shields some of its coroutines using
asyncio.shield
to guarantee that if a task is appended, it is pushed by theTaskBrokerBase
even if the event loop is being shut down.-
__init__
(broker: kegtap.task.TaskBrokerBase)¶ Constructs a new dispatcher around
broker
. The newly constructed dispatcher will not be running and will not query thebroker
untilstart()
is called.- brokerTaskBrokerBase
The instance reponsible for permanent
Task
storage.
-
__len__
() → int¶ The number of
Task
that are scheduled in the event loop and are waiting to be executed.
-
append
(queue: str, fn: Callable[[…], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, retries: int = 0) → _asyncio.Future¶ Adds a new task to the
queue
, and if the dispatcher isrunning
, immediately schedules it for execution. This will callTaskBrokerBase.push()
; the pushing operation is shielded usingasyncio.shield
to guarantee that the task enters the persistent storage even if the event loop is being shut down.- queuestr
The name of the queue where to push the task. This can be chosen arbitrarily by the user.
- fnfunction or coroutine
The function that actually perform this task. This can be either a function or a coroutine, but should return a boolean expressing whether the task succeeded.
- argstuple
Tuple of positional arguments to use in the invocation of
fn
, if any.- kwargsdict
Dictionary of keyword arguments to use in the invocation of
fn
, if any.- retriesint
Maximum number of times this task is allowed to fail.
- Returns
A
asyncio.Future
that yields theTask::uuid
of the newly createdTask
, once theTaskBrokerBase.push()
operation is completed.
Note
fn
,args
,kwargs
may need to be serialized by the broker, therefore they should be serializable. By default, bound methods are not serializable via e.g.pickle
. If your task is an instance method of akegtap.plugin.Plugin
, wrap it usingtaskify()
to be able to use it directly inappend()
.
-
async
complete
()¶ Waits until all currently scheduled
Task
are complete. This method can be used to wait until all scheduled tasks are removed from the event loop or executed, afterstop()
is called. Note that if the dispatcher isrunning
, awaiting completion will only await the tasks scheduled so far, and not any other task that may be scheduled in the future.
-
property
running
¶ Determines whether this
TaskDispatcher
is executing tasks (a stopped dispatcher will still accept tasks viaappend()
, but will not execute any).
-
start
() → Optional[_asyncio.Task]¶ Starts scheduling
Task
into the event loop. Calling this method multiple times does nothing. The first performed operation is to fetch all persisted tasks from theTaskBrokerBase
, and to schedule them in the event loop.- Returns
The asyncio.Task corresponding to the operation of fetching and scheduling all tasks in the broker. If the dispatcher is already running, it returns
None
.
-
stop
()¶ Stops executing tasks. Calling this method multiple times has not effect. Tasks that are already scheduled and will run after a call to
stop()
, will simply be skipped. The user can check whether the dispatcher is fully halted by either checking wetherlen(this) == 0
, or better, by awaitingcomplete()
.
-
-
kegtap.task.
taskify
(unbound_method)¶ Makes an instance method of a subclass of
kegtap.plugin.Plugin
serializable, which can then be used as thefn
argument ofTaskBrokerPlugin.push_task()
orTaskDispatcher.append()
(tasks must be serialized e.g. viapickle
, and bound methods are not serializable by default).This works by simply rebinding the method to the current active instance of the plugin, therefore works only with subclasses of
kegtap.plugin.Plugin
. Note that it will also not work if the plugin is not active when the method is called.Example usage
@register class MyPluginWithTasks(Plugin): @taskify def _long_operation(self, arg): pass # This does the same as `taskify` does: @classmethod def _equivalent_long_operation(cls, arg): find_active(cls)._long_operation(arg) def trigger_long_operation(self): broker_plugin = find_active(TaskBrokerPlugin) # The same result: broker_plugin.push_task('queue', MyPluginWithTasks._equivalent_long_operation, 22) broker_plugin.push_task('queue', self_long_operation, 22)
kegtap.config module¶
-
class
kegtap.config.
Config
(d: Dict = None, root_path: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)¶ Bases:
object
A dictionary-like object that contains configuration keys. A
Config
instance carries behaves like a top-level object, but carries information about its own path, therefore it is possible to obtain “subconfigs” that preserve the full path information for debugging.This class can cast values and offer default values for config attributes via
get()
. It is also possible to get a read-only view of the data and validate it against a Pydantic model object withmodel_view()
.-
__init__
(d: Dict = None, root_path: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)¶ Constructs a new
Config
object withd
as data payload and withroot_path
as root path.
-
get
(path: Union[str, int, Iterable[Union[int, str]]], cast: type = None, default: Any = <class 'starlette.config.undefined'>) → Any¶ Traverses the given
path
, and returns the corresponding value. Attempts a cast if possible, and if no default was provided, throws.- pathConfigPath or (sequence of) str and int
The path of the value to retrieve, as in
ConfigPath
.- castConfig, or type, or callable
The expected type of the value identified by
path
. This can be any callable object or type. A cast will be performed ifisinstance(value, cast)
is false andvalue
is None. In the special case wherecast
is a subclass ofConfig
, the returnedConfig
object will keep track of the correct path, i.e. will havepath
as itsroot_path
(or rather,self.root_path + path
).- defaultAny
The default value to return if the path could not be traversed because of missing keys or objects. Note that
cast
is applied afterdefault
, thereforedefault
could be casted into an object of typecast
. This also implies thatdefault
is not returned in case of failed cast.
- Returns
The value found at
path
, as an instance ofcast
, if specified, or None.- Raises
KeyError: when there is no value at
path
anddefault
was not provided. ValueError: whencast
was specified and the cast failed.
cfg = Config({'plugin': {'url': 'https://localhost'}}) cfg.get('plugin.url', str) # Gives 'https://localhost' # Extract a sub-cfg for 'plugin' sub_cfg = cfg.get('plugin', Config, {}) sub_cfg.get('url') # Gives 'https://localhost' # Define a model to conform to class ConfigModel(BaseModel): url: str view = sub_cfg.model_view(ConfigModel) view.url # 'https://localhost'
-
items
()¶ As in
dict.items()
.
-
keys
()¶ As in
dict.keys()
.
-
model_view
(model_class: Type[ConfigModelT]) → ConfigModelT¶ Tries to conform the current
Config
object to the given Pydantic data model. This method will attempt to cast the internal data to the model. In case of validation errors, the error message is gracefully formatted to contain the path of the value where validation failed.- model_classsubclass of
pydantic.BaseModel
This is the class that represents the Pydantic model for the configuration. If validation succeeds, the returned value is guaranteed to be an instance of
model_class
.
- Returns
An instance of
model_class
containing a shallow copy of the data in the currentConfig
.- Raises
pydantic.error_wrappers.ValidationError: this method reformats the validation error from Pydantic to report the path where validation failed.
- model_classsubclass of
-
property
root_path
¶ The path of this config. For a top-level
Config
, this is empty. For a “subconfig”, this corresponds to the path of the instance in the top level object.
-
values
()¶ As in
dict.values()
.
-
-
class
kegtap.config.
ConfigPath
(path_or_key_like: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)¶ Bases:
object
Represents a path in a hierarchy of objects, e.g.
a.b[2].c
. Corresponds to a sequence of__getitem__
operations on objects that behave like the builtindict
andlist
.This class is a container of elements that are keys in the object hierarchy. These can be either
str
orint
. An empty path is a valid path with no elements.# Get a ConfigPath from a string p = ConfigPath('plugins.Plugin.entries[2]') len(p) # Gives 4 list(p) # Gives ['plugins', 'Plugin', 'entries', 2] p[1] # Gives 'Plugin' q = p + 'foo.bar' str(q) # 'plugins.Plugin.entries[2].foo.bar'
-
__init__
(path_or_key_like: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)¶ Constructs a new config path given the pieces
path_or_key_like
.- path_or_key_likeConfigPath or (sequence of) str and int
- Currently this parameter supports the following values:
str
, in this case it is interpreted as the string representation of a path, and it is parsed byparse()
.int
, in this case it is interpreted as the 1-length path"[x]"
, which consist of getting an item at the given index.None
, corresponds to the empty path that identifies the root object.An iterable sequence of
str
andint
, as returned byparse()
. In this case, these are taken as the components of the config path.
-
__add__
(other: Union[str, int, Iterable[Union[int, str]]])¶ Concatenate a path with another.
- otherConfigPath or (sequence of) str and int
Another config path, or any argument that can be used to construct a config path.
- Returns
A new config path given by chaining
self
withother
.
p = ConfigPath('a') + 'b[2]' + ['c', 'd', 3, 'e'] str(p) # 'a.b[2].c.d[3].e'
-
classmethod
parse
(path: str) → Iterable[Union[int, str]]¶ Parse a path in a hiearchy of objects, where entries in
dict
are preceded by.
as inobj.prop_name
and indices inlist
are wrapped in[]
, as inlst[4]
. This function yields all the pieces of the path, where entries indict
arestr
and indices inlist
areint
.- pathstr
A string representation of the path in the object hierarchy.
- Returns
A generator that yiels
str
orint
for each part of the path.
-
traverse
(o: Union[Dict, List, Tuple], root_path: Optional[Union[str, int, Iterable[Union[int, str]]]] = None) → Iterable[Tuple[kegtap.config.ConfigPath, Any]]¶ Recursively traverses the object hierarchy
o
along the path represented by this object. Every time an object is traversed, this method yields a 2-tuple(path, object)
, wherepath
is aConfigPath
instance representing the path up toobject
. Optionally prefixesroot_path
to all paths. Note that the root objecto
is never yielded.- odict-, list- or tuple-like objects
This is the object that will be traversed. If the current path is e.g.
"a.b[2].c"
, this method will eventually queryo['a']['b'][2]['c']
as last item.- root_pathConfigPath or (sequence of) str and int
This path, if specified, is prefixed to all the paths that this method yields, and also in the error messages produced if a key is not found.
- Returns
An iterable generator for as many elements as
len(self)
. Each element is a 2-tuple where the first element is theConfigPath
traversed so far, and the second element the corresponding object. Note that the root objecto
is never returned.- Raises
KeyError: raised when a
str
orint
key is not found.
o = {'a': {'b': [None, None, {'c': 42}]}} for path, value in ConfigPath('a.b[2].c').traverse(o, root_path='foo'): print(f'{path}: {value}') # This will produce # foo.a: {'b': [None, None, {'c': 42}]} # foo.a.b: [None, None, {'c': 42}] # foo.a.b[2]: {'c': 42} # foo.a.b[2].c: 42
-
-
class
kegtap.config.
ConfigPlugin
(*args, **kwds)¶ Bases:
typing.Generic
,kegtap.plugin.Plugin
Mixin that adds a
config
attribute to a plugin, that is automatically populated upon construction with plugin-specific configuration, extracted from Kegtap’s configuration file based on the plugin name. This Mixin also automatically validates and conforms to a Pydantic data model as specified in theConfigModel
class property.Example usage
import kegtap @kegtap.plugin.register class MailerPlugin(kegtap.config.ConfigPluginMixin['MailerPlugin.ConfigModel'], kegtap.plugin.Plugin): # Override the subclass model class ConfigModel(BaseModel): server: str port: Optional[conint(gt=0, le=65535)] = None # ...
-
ConfigModel
: Type[ConfigT]¶
-
property
config
¶ An instance of
ConfigModel
built from the plugin-specific configuration.
-
-
kegtap.config.
OwnerPath
¶ alias of
kegtap.config.ConfigPath
-
kegtap.config.
last
(sequence: Iterable)¶ Returns the last item of the given sequence.
sequence : any iterable object
- Returns
The last item of the sequence, if any, or None.