kegtap package

Module contents

Submodules

kegtap.plugin module

class kegtap.plugin.Plugin(*_, **__)

Bases: object

Base class for all Kegtap plugins. Subclasses may override on_activate() and on_deactivate(), if needed, to initialize or destroy plugin-specific data.

A plugin must be explicitly registered via kegtap.plugin.register. Usage example:

import kegtap.plugin

@kegtap.plugin.register
class MyPlugin(kegtap.plugin.Plugin):
    pass

# Somewhere else in the code, an instance of MyPlugin can be obtained via
instance = kegtap.plugin.find_active(MyPlugin)
property import_name

Returns the fully qualified name of the current plugin, including the module, e.g. 'kegtap_plugins.redis.RedisPlugin'.

on_activate(*_, **__)

This method is called only once when all the plugins have been constructed and Kegtap is being started. After a plugin is activated, it can be retrieved from the plugin pool via find_active(). This method sinks its arguments.

Note

Subclasses should always call super().on_activate() before performing their own initialization. Note that the initialization order is user-defined (based on the specified configuration), so it is not possible to access other plugins while activating the current one (as they may not be in the pool yet). It is fine to override on_activate with an async function, e.g.:

class MyPlugin(Plugin):
    async def on_activate(self):
        super().on_activate()
        await self._async_resource.produce_result()
on_deactivate(*_, **__)

This method is called only once when Kegtap is being shut down and the plugins need to be destroyed. After a plugin is deactivated, it cannot be found in the plugin pool by find_active() any longer. This method sinks its arguments.

Note

Subclasses should always call super().on_deactivate() after performing their own teardown. Note that the deactivation order is user-defined (based on the specified configuration), so it is not possible to access other plugins while deactivating the current one (as they may already have been removed from the pool). It is fine to override on_deactivate with an async function, e.g.:

class MyPlugin(Plugin):
    async def on_deactivate(self):
        await self._async_resource.disconnect()
        super().on_deactivate()
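
The ordering rules in the two notes above (base hook first on activation, last on deactivation) can be sketched with stand-in classes. BasePlugin, AsyncResourcePlugin, call_hook and run_lifecycle are hypothetical names, not part of Kegtap; the sketch only assumes that the host awaits a hook when it returns an awaitable.

```python
import asyncio
import inspect


class BasePlugin:
    """Hypothetical stand-in for kegtap.plugin.Plugin; records hook calls."""

    def __init__(self):
        self.events = []

    def on_activate(self, *_, **__):
        self.events.append('base activated')

    def on_deactivate(self, *_, **__):
        self.events.append('base deactivated')


class AsyncResourcePlugin(BasePlugin):
    async def on_activate(self):
        super().on_activate()        # base first, then own setup
        await asyncio.sleep(0)       # placeholder for real async setup
        self.events.append('resource ready')

    async def on_deactivate(self):
        await asyncio.sleep(0)       # placeholder for real async teardown
        self.events.append('resource released')
        super().on_deactivate()      # own teardown first, then base


async def call_hook(hook, *args, **kwargs):
    """Call a hook and await its result only if it is awaitable."""
    result = hook(*args, **kwargs)
    if inspect.isawaitable(result):
        result = await result
    return result


async def run_lifecycle(plugin):
    """Assumed host behavior: activate, then deactivate, one plugin."""
    await call_hook(plugin.on_activate)
    await call_hook(plugin.on_deactivate)
    return plugin.events
```

Calling asyncio.run(run_lifecycle(AsyncResourcePlugin())) runs both hooks in order; the same helper also works for plugins whose hooks are plain functions.
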
exception kegtap.plugin.PluginNotFoundException(criterion: Union[Type[Plugin], str], found=typing.Tuple[typing.Type[ForwardRef('Plugin')]])

Bases: fastapi.exceptions.HTTPException

class kegtap.plugin.PluginPool

Bases: object

Class that holds the list of currently active plugins, i.e. plugins for which Plugin.on_activate() was called and Plugin.on_deactivate() was not called yet. There is a unique module-level instance of a PluginPool, kegtap.plugin.ACTIVE_PLUGINS_POOL; you can query it directly or via find_active(). Plugins are stored and identified uniquely by Plugin.import_name(). The main method of this class is find().

This class caches the queries that have been performed, so that repeating a query returns the same results quickly.

This class only deals with active plugins; if you need to retrieve known (i.e. registered) plugins, refer to kegtap.plugin.get_registered() and kegtap.plugin.find_registered().

See also

find_active() for example usages.

add_active_plugin(plugin: kegtap.plugin.Plugin)

Adds plugin to the current list.

Raises

KeyError: if a plugin with the same Plugin.import_name() already exists.

find(criterion: Union[str, Type[PluginT]], allow_partial_match=True, require_unique=True) → Union[PluginT, Tuple[PluginT]]

Tests the active plugins against criterion, and returns those that match according to is_match(). If require_unique is True, this method will either return the unique plugin matching criterion, or will throw. See the documentation of find_active() for example usages.

The results of this query are cached for faster lookup.

criterion : str or subclass of Plugin

Consult is_match() for the meaning of this parameter.

allow_partial_match : bool

Consult is_match() for the meaning of this parameter.

require_unique : bool

If this is True, the method will return the one and only instance found matching criterion. If no instance, or multiple instances are found, this method throws. If this parameter is False, the method returns a tuple of matching plugin instances.

Raises

PluginNotFoundException: if require_unique is True, and either no instance or multiple instances match criterion.

Returns

A tuple of Plugin instances if require_unique is False, a single instance otherwise.
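
The require_unique contract described above can be illustrated with a small stand-in pool. This is a minimal sketch under assumptions, not the Kegtap implementation: SketchPool, SketchPluginNotFound and the broker classes are hypothetical, only type criteria are handled, and the cache is simply cleared whenever the set of plugins changes.

```python
class SketchPluginNotFound(LookupError):
    """Hypothetical error; the real pool raises its own exception type."""


class SketchPool:
    def __init__(self):
        self._active = {}   # import name -> plugin instance
        self._cache = {}    # criterion -> tuple of matching instances

    @staticmethod
    def import_name(plugin):
        cls = type(plugin)
        return f'{cls.__module__}.{cls.__qualname__}'

    def add(self, plugin):
        name = self.import_name(plugin)
        if name in self._active:
            raise KeyError(name)
        self._active[name] = plugin
        self._cache.clear()   # cached results may now be stale

    def find(self, criterion, require_unique=True):
        if criterion not in self._cache:
            self._cache[criterion] = tuple(
                p for p in self._active.values() if isinstance(p, criterion))
        matches = self._cache[criterion]
        if not require_unique:
            return matches            # always a tuple, possibly empty
        if len(matches) != 1:
            raise SketchPluginNotFound(
                f'{len(matches)} matches for {criterion!r}')
        return matches[0]             # the one and only match


class Broker:
    pass


class RedisBroker(Broker):
    pass


class MemoryBroker(Broker):
    pass
```

With one RedisBroker and one MemoryBroker in the pool, find(RedisBroker) returns the single instance, find(Broker, require_unique=False) returns both, and find(Broker) raises because the match is ambiguous.
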

classmethod is_match(plugin: kegtap.plugin.Plugin, criterion: Union[str, Type[kegtap.plugin.Plugin]], allow_partial_match=True)

Tries to match plugin against the match criterion.

plugin : Plugin

Plugin instance to test against criterion.

criterion : str or subclass of Plugin

If this parameter is a string, it will be matched against the Plugin.import_name() of plugin. The matching behavior depends on allow_partial_match. If this parameter is a type, plugin will match positively only if it is an instance of criterion (or of one of its subclasses).

allow_partial_match : bool

If this parameter is true and criterion is a str, it suffices for criterion to be one or more, possibly non-consecutive, parts of the import path of plugin, as in is_ordered_subsequence(); e.g. 'b.d' is a partial match for a.b.c.d.EPlugin. If criterion is a type, this parameter has no effect.

Examples

class MyPlugin(Plugin):
    pass

instance = MyPlugin()

PluginPool.is_match(instance, MyPlugin)  # True
PluginPool.is_match(instance, 'MyPlugin', allow_partial_match=True)  # True
PluginPool.is_match(instance, Plugin)  # True

instance.import_name  # 'kegtap_plugins.my_module.MyPlugin'

PluginPool.is_match(instance, 'my_module', allow_partial_match=True)  # True
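
The partial-match rule for string criteria can be sketched as an ordered-subsequence test over the dot-separated parts of the import name. The helper names below are hypothetical and the exact behavior of Kegtap's is_ordered_subsequence() is an assumption; the sketch only reproduces the rule as stated in this documentation.

```python
def is_ordered_subsequence_sketch(needle, haystack):
    """True if all items of needle appear in haystack in the same order."""
    remaining = iter(haystack)
    # `part in remaining` advances the iterator until it finds `part`,
    # so later parts can only match further to the right.
    return all(part in remaining for part in needle)


def name_matches(criterion, import_name, allow_partial_match=True):
    """Match a string criterion against a fully qualified import name."""
    if criterion == import_name:
        return True
    if not allow_partial_match:
        return False
    return is_ordered_subsequence_sketch(
        criterion.split('.'), import_name.split('.'))
```

Here name_matches('b.d', 'a.b.c.d.EPlugin') holds, while 'd.b' does not match because the parts are in the wrong order.
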
remove_active_plugin(plugin: kegtap.plugin.Plugin)

Removes plugin from the current list.

Raises

KeyError: if a plugin with the same Plugin.import_name() is not in the current list.

kegtap.plugin.find_active(criterion: Union[str, Type[PluginT]], allow_partial_match=True, require_unique=True) → Union[PluginT, Tuple[PluginT]]

Queries PluginPool.find() on the unique instance of PluginPool that contains all active plugins in the current process. This method is used to find active plugins; to find known (i.e. registered) plugins, refer to kegtap.plugin.get_registered() and kegtap.plugin.find_registered().

Raises

PluginNotFoundException: if require_unique is True, and either no instance or multiple instances match criterion.

Examples

# Retrieve the unique instance of MyPlugin
find_active(MyPlugin)  # Will throw if not found

# Search for all instances of subclasses of MyPlugin:
find_active(MyPlugin, require_unique=False)  # Returns a tuple of instances

# Search for all plugins with "redis" in their names:
find_active("redis", require_unique=False, allow_partial_match=True)  # Tuple of instances

# The unique active instance of any subclass of TaskBrokerPlugin
find_active(TaskBrokerPlugin)  # Throws if not found
kegtap.plugin.find_registered(plugin_name: str, allow_partial_match: bool = True) → Optional[Type[kegtap.plugin.Plugin]]

Searches the known registered plugins in get_registered() for one matching the given plugin_name.

This method deals with registered plugins, i.e. known but not necessarily instantiated. If you want to retrieve an active instance of a certain plugin, refer to kegtap.plugin.find_active() and kegtap.plugin.PluginPool.

plugin_name : str

The plugin name to find. By default it must be a fully qualified import name, e.g. "kegtap_plugins.redis.RedisPlugin". If allow_partial_match is True, it may also be just a part of the fully qualified import name.

allow_partial_match : bool

If set to True, plugin_name can also match partially the plugin name, using the rules of is_ordered_subsequence(). This means that the dot-separated parts of plugin_name match any plugin that has the same parts in the same order, e.g. 'b.d' matches 'a.b.c.d.EPlugin'.

Raises

KeyError: if more than one registered plugin class matches plugin_name.

Returns

A single kegtap.plugin.Plugin class if found, otherwise None.

kegtap.plugin.get_registered() → Iterable[Type[kegtap.plugin.Plugin]]

Yields a range of kegtap.plugin.Plugin classes that have been registered. This method deals with registered plugins, i.e. known but not necessarily instantiated. If you want to retrieve an active instance of a certain plugin, refer to kegtap.plugin.find_active() and kegtap.plugin.PluginPool.

kegtap.plugin.register(plugin_cls: Type[kegtap.plugin.Plugin])

Decorator for subclasses of kegtap.plugin.Plugin. You must register a plugin for Kegtap to pick it up and run it. This is intended to prevent abstract base classes from becoming active plugins at runtime.

kegtap.task module

class kegtap.task.Task(fn: Callable[[], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, uuid: Optional[uuid.UUID] = None, retries: int = 0)

Bases: object

Represents a task, that is, a callable function together with its arguments, uniquely identified by a uuid. It can be used to wrap a normal function or a coroutine, but both are expected to return a boolean expressing whether the operation succeeded. A task has a certain number of retries associated, which represents how many times the scheduler will attempt this task until it succeeds.

A task is intended to be serializable using pickle, so do not wrap functions that may not be picklable (e.g. classmethod works, but bound methods do not).

__init__(fn: Callable[[], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, uuid: Optional[uuid.UUID] = None, retries: int = 0)

Constructs a new task.

fn : function or coroutine

The main function to call when this task is executed. Can be either a function or a coroutine, but it must return a boolean to express whether the task succeeded or not.

args : tuple

Positional arguments with which to call fn, if any.

kwargs : dict

Keyword arguments with which to call fn, if any.

uuid : UUID

UUID for this task; if omitted, a new one will be generated.

retries : int

Number of attempts allowed for this task, as in retries.

next_try() → kegtap.task.Task

Generates a task identical to the current one, but with one fewer retry.

property retries

The maximum number of times the task is allowed to fail before the scheduler gives up on it. This can be reduced by generating a new task with next_try().

property uuid

A unique identifier for this task. Same tasks with different retries may share the same UUID.
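
The relationship between uuid, retries and next_try() can be sketched with a stand-in class. SketchTask is hypothetical and only mirrors the constructor arguments documented above; it does not attempt serialization.

```python
import uuid as uuid_module


class SketchTask:
    """Hypothetical stand-in for kegtap.task.Task."""

    def __init__(self, fn, args=None, kwargs=None, uuid=None, retries=0):
        self.fn = fn
        self.args = tuple(args or ())
        self.kwargs = dict(kwargs or {})
        # Generate a fresh UUID only when none was supplied.
        self.uuid = uuid if uuid is not None else uuid_module.uuid4()
        self.retries = retries

    def next_try(self):
        """Same function, arguments and UUID, but one fewer retry."""
        return SketchTask(self.fn, self.args, self.kwargs,
                          self.uuid, self.retries - 1)

    def __call__(self):
        return self.fn(*self.args, **self.kwargs)


def is_positive(value):
    """Example task function: reports success as a boolean."""
    return value > 0
```

A task built as SketchTask(is_positive, args=(1,), retries=2) and its next_try() share one UUID, which is why tasks that differ only in their retries may be stored under the same identifier.
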

class kegtap.task.TaskBrokerBase

Bases: abc.ABC

Abstract class describing a task broker, which interfaces to storage. A broker is responsible for saving tasks to, and retrieving them from, persistent storage. Every time a task is created, it is pushed through a broker via push(), so that it can be restored even after the program is restarted. Analogously, when a task is run in the current event loop, it is popped from storage via pop() (and then pushed again if it fails and has enough Task.retries). A given task can be pushed into an arbitrary queue, identified by its name. The broker should be able to retrieve all active queues and their tasks.

Subclasses of this class may implement all the methods as coroutines.

abstract pop(queue: str, task_uuid: uuid.UUID) → Union[kegtap.task.Task, None, Awaitable[Optional[kegtap.task.Task]]]

Looks up a task by task_uuid in the given queue. If it finds any, it removes it from the queue and returns it.

queue : str

The name of the queue for the task. If the queue does not exist, the method returns None.

task_uuid : UUID

The UUID of the task to retrieve. This may be one of those returned by queue_uuids(). If there is no task under such UUID, the method returns None.

Returns

A task from queue with the given task_uuid, if found, otherwise None. If it returns a Task, this is guaranteed to have been removed from the queue.

Note

Subclasses should take care when implementing this method: depending on the event loop implementation and application configuration, pop() can be called simultaneously from multiple threads. If the same Task is returned from multiple threads, it will be executed multiple times. Hence, this method should synchronize access to the queue, unless for the specific application tasks are idempotent and thus it is fine to run a task several times.

abstract push(queue: str, task: kegtap.task.Task)

Saves task into queue.

queue : str

The name of the queue for the task. If the queue does not exist, it is created.

task : Task

The task to save into the queue. The task may be persisted via e.g. pickle.

Note

Subclasses should take care when implementing this method as a coroutine: calls to other coroutines should be shielded using asyncio.shield, to guarantee that the Task reaches the storage even if the event loop is being shut down.
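
The effect of asyncio.shield mentioned in the note above can be sketched as follows. write_to_storage, push_shielded and cancel_during_push are hypothetical names, and a plain list stands in for persistent storage. Note that shielding only protects the inner write from cancellation of the caller; the event loop must still keep running long enough for the write to finish.

```python
import asyncio


async def write_to_storage(storage, item):
    """Placeholder for a slow write to persistent storage."""
    await asyncio.sleep(0.01)
    storage.append(item)


async def push_shielded(storage, item):
    # Cancelling push_shielded() cancels only the outer await;
    # the shielded write keeps running in its own task.
    await asyncio.shield(write_to_storage(storage, item))


async def cancel_during_push():
    storage = []
    pushing = asyncio.ensure_future(push_shielded(storage, 'task-1'))
    await asyncio.sleep(0)        # let the push start
    pushing.cancel()              # simulate cancellation of the caller
    try:
        await pushing
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.05)     # give the shielded write time to finish
    return storage
```

Running asyncio.run(cancel_during_push()) shows that the item still reaches the list even though the push itself was cancelled.
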

abstract queue_uuids(queue: str) → Union[Iterable[uuid.UUID], Awaitable[Iterable[uuid.UUID]]]

Obtains the list of tasks in a given queue. Tasks here are identified exclusively by their Task.uuid.

queue : str

The name of the queue. This will be one of those returned by queues().

Returns

A list of UUIDs corresponding to the tasks in queue.

abstract queues() → Union[Iterable[str], Awaitable[Iterable[str]]]

Obtains the list of known queue names. Queue names are freely chosen when a task is pushed via push(). The TaskDispatcher calls this only upon startup to repopulate the event loop with all tasks. After retrieving the list of queues, it inspects each queue using queue_uuids().

Returns

A list of strings, the names of the known queues.
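
The four broker methods can be sketched with an in-memory stand-in. InMemoryBroker and DemoTask are hypothetical; the sketch does not subclass TaskBrokerBase and is not persistent, it only illustrates the documented contract, including a lock so that a task is handed out by pop() at most once.

```python
import threading
import uuid


class DemoTask:
    """Hypothetical minimal task: only the uuid attribute is needed here."""

    def __init__(self):
        self.uuid = uuid.uuid4()


class InMemoryBroker:
    """Hypothetical broker keeping tasks in one dict per queue."""

    def __init__(self):
        self._queues = {}               # queue name -> {uuid: task}
        self._lock = threading.Lock()   # serializes access across threads

    def push(self, queue, task):
        with self._lock:
            # The queue is created on first use.
            self._queues.setdefault(queue, {})[task.uuid] = task

    def pop(self, queue, task_uuid):
        with self._lock:
            # Returns None for an unknown queue or an unknown UUID.
            return self._queues.get(queue, {}).pop(task_uuid, None)

    def queue_uuids(self, queue):
        with self._lock:
            return list(self._queues.get(queue, {}))

    def queues(self):
        with self._lock:
            return list(self._queues)
```

Because pop() removes the task while holding the lock, a second pop() of the same UUID returns None instead of handing out the task twice.
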

class kegtap.task.TaskBrokerPlugin(*args, **kwargs)

Bases: abc.ABC, kegtap.plugin.Plugin

Base abstract class for a kegtap.plugin.Plugin that provides the functionality of a TaskBrokerBase. It is intended that other plugins use a TaskBrokerPlugin to enqueue their tasks for execution. Since kegtap.plugin.find_active() is compatible with subclasses, other plugins do not need to worry about which broker is currently active; they can simply look up this abstract base class in the active plugin pool.

This class wraps around a TaskDispatcher, which is started when the plugin is activated and stopped when it is deactivated. Subclasses should implement the single method _build_broker().

Example usage

# Intended usage from the perspective of another plugin
@register
class MyPlugin(Plugin):
    @taskify
    def _long_operation(self, arg: int):
        pass

    def trigger_long_operation(self):
        # Plugins can be agnostic w.r.t. which TaskBrokerPlugin is really active:
        broker_plugin = find_active(TaskBrokerPlugin)
        #               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
        # This will return any active subclass of TaskBrokerPlugin
        broker_plugin.push_task('my_queue', self._long_operation, 42, retries=2)
abstract _build_broker() → kegtap.task.TaskBrokerBase

Constructs the implementation of TaskBrokerBase. This will be called upon construction.

property broker

The broker offered by this plugin.

property num_awaiting_tasks

Returns the number of scheduled tasks waiting to be executed, as in TaskDispatcher.__len__().

async on_activate(*args, **kwargs)

Calls TaskDispatcher.start().

async on_deactivate(*args, **kwargs)

Calls TaskDispatcher.stop() and awaits TaskDispatcher.complete().

push_task(queue: str, fn: Callable[[], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, retries: int = 0) → _asyncio.Future

Enqueues a new task; this is identical to TaskDispatcher.append(); refer to its documentation.

class kegtap.task.TaskDispatcher(broker: kegtap.task.TaskBrokerBase)

Bases: object

Main class responsible for receiving tasks, scheduling them into the event loop, recovering tasks stored in the broker, and executing them.

This class needs a TaskBrokerBase instance to work. When it is started using start(), it reads all the Task objects known by the broker and schedules them in the event loop. When a task is about to be executed, it is removed from the broker via TaskBrokerBase.pop(). New tasks are first stashed via TaskBrokerBase.push() and, if the dispatcher is running, immediately scheduled. Tasks that fail and have retries left are rescheduled. Tasks are executed only as long as the dispatcher is running.

The dispatcher shields some of its coroutines using asyncio.shield to guarantee that if a task is appended, it is pushed by the TaskBrokerBase even if the event loop is being shut down.
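
The retry behavior described above can be sketched in isolation, without a broker or scheduling. run_until_done and make_flaky are hypothetical helpers. The sketch assumes that retries counts the additional attempts made after a failure, so at most retries + 1 attempts are made in total; the documentation does not pin this down exactly.

```python
import asyncio
import inspect


async def run_until_done(fn, retries):
    """Run fn, retrying on failure while retries remain.

    Returns a (succeeded, attempts) pair. Assumption: at most
    retries + 1 attempts are made in total.
    """
    attempts = 0
    while True:
        attempts += 1
        result = fn()
        if inspect.isawaitable(result):   # fn may be a coroutine function
            result = await result
        if result:
            return True, attempts
        if retries == 0:
            return False, attempts        # out of retries: give up
        retries -= 1


def make_flaky(failures):
    """Build a task function that fails `failures` times, then succeeds."""
    state = {'left': failures}

    def flaky():
        if state['left'] > 0:
            state['left'] -= 1
            return False
        return True

    return flaky
```

A function that fails twice succeeds on its third attempt when given retries=2, and is abandoned after two attempts when given retries=1.
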

__init__(broker: kegtap.task.TaskBrokerBase)

Constructs a new dispatcher around broker. The newly constructed dispatcher will not be running and will not query the broker until start() is called.

broker : TaskBrokerBase

The instance responsible for permanent Task storage.

__len__() → int

The number of Task that are scheduled in the event loop and are waiting to be executed.

append(queue: str, fn: Callable[[], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, retries: int = 0) → _asyncio.Future

Adds a new task to the queue, and if the dispatcher is running, immediately schedules it for execution. This will call TaskBrokerBase.push(); the pushing operation is shielded using asyncio.shield to guarantee that the task enters the persistent storage even if the event loop is being shut down.

queue : str

The name of the queue where to push the task. This can be chosen arbitrarily by the user.

fn : function or coroutine

The function that actually performs this task. This can be either a function or a coroutine, but should return a boolean expressing whether the task succeeded.

args : tuple

Tuple of positional arguments to use in the invocation of fn, if any.

kwargs : dict

Dictionary of keyword arguments to use in the invocation of fn, if any.

retries : int

Maximum number of times this task is allowed to fail.

Returns

An asyncio.Future that resolves to the Task.uuid of the newly created Task, once the TaskBrokerBase.push() operation is completed.

Note

fn, args, kwargs may need to be serialized by the broker, therefore they should be serializable. By default, bound methods are not serializable via e.g. pickle. If your task is an instance method of a kegtap.plugin.Plugin, wrap it using taskify() to be able to use it directly in append().

async complete()

Waits until all currently scheduled Task are complete. This method can be used to wait until all scheduled tasks are removed from the event loop or executed, after stop() is called. Note that if the dispatcher is running, awaiting completion will only await the tasks scheduled so far, and not any other task that may be scheduled in the future.

property running

Determines whether this TaskDispatcher is executing tasks (a stopped dispatcher will still accept tasks via append(), but will not execute any).

start() → Optional[_asyncio.Task]

Starts scheduling Task into the event loop. Calling this method multiple times does nothing. The first performed operation is to fetch all persisted tasks from the TaskBrokerBase, and to schedule them in the event loop.

Returns

The asyncio.Task corresponding to the operation of fetching and scheduling all tasks in the broker. If the dispatcher is already running, it returns None.

stop()

Stops executing tasks. Calling this method multiple times has no effect. Tasks that are already scheduled and would run after a call to stop() are simply skipped. The user can check whether the dispatcher is fully halted either by checking whether len(dispatcher) == 0 or, better, by awaiting complete().

kegtap.task.taskify(unbound_method)

Makes an instance method of a subclass of kegtap.plugin.Plugin serializable, which can then be used as the fn argument of TaskBrokerPlugin.push_task() or TaskDispatcher.append() (tasks must be serialized e.g. via pickle, and bound methods are not serializable by default).

This works by simply rebinding the method to the current active instance of the plugin, therefore works only with subclasses of kegtap.plugin.Plugin. Note that it will also not work if the plugin is not active when the method is called.

Example usage

@register
class MyPluginWithTasks(Plugin):
    @taskify
    def _long_operation(self, arg):
        pass

    # This does the same as `taskify` does:
    @classmethod
    def _equivalent_long_operation(cls, arg):
        find_active(cls)._long_operation(arg)

    def trigger_long_operation(self):
        broker_plugin = find_active(TaskBrokerPlugin)
        # The same result:
        broker_plugin.push_task('queue', MyPluginWithTasks._equivalent_long_operation, 22)
        broker_plugin.push_task('queue', self._long_operation, 22)
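
The rebinding idea behind taskify() can be sketched with a small descriptor. This is an illustration under assumptions, not Kegtap's implementation: rebinding_task, ACTIVE_SKETCH and Doubler are hypothetical, a plain dict stands in for the active plugin pool, and the sketch demonstrates only the rebinding, not the serialization.

```python
ACTIVE_SKETCH = {}   # class -> active instance (stand-in for the plugin pool)


class rebinding_task:
    """Wrap a method so it is always called on the active instance."""

    def __init__(self, fn):
        self._fn = fn
        self._owner = None

    def __set_name__(self, owner, name):
        # Called once when the owning class is created.
        self._owner = owner

    def __get__(self, instance, owner=None):
        # Access via the class or via an instance yields the same
        # instance-independent callable.
        return self

    def __call__(self, *args, **kwargs):
        # Raises KeyError if no instance of the owner is active.
        instance = ACTIVE_SKETCH[self._owner]
        return self._fn(instance, *args, **kwargs)


class Doubler:
    factor = 2

    @rebinding_task
    def double(self, value):
        return self.factor * value
```

Once an instance is stored in ACTIVE_SKETCH[Doubler], both Doubler.double(21) and instance.double(21) dispatch to that active instance; with no active instance the call fails, which mirrors the restriction stated above.
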

kegtap.config module

class kegtap.config.Config(d: Dict = None, root_path: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)

Bases: object

A dictionary-like object that contains configuration keys. A Config instance behaves like a top-level object, but carries information about its own path, so it is possible to obtain “subconfigs” that preserve the full path information for debugging.

This class can cast values and offer default values for config attributes via get(). It is also possible to get a read-only view of the data and validate it against a Pydantic model object with model_view().

__init__(d: Dict = None, root_path: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)

Constructs a new Config object with d as data payload and with root_path as root path.

d : dict

The data contained in this Config object.

root_path : ConfigPath or (sequence of) str and int

If this is a “subconfig” of a top-level object, this is the path of this Config object.

get(path: Union[str, int, Iterable[Union[int, str]]], cast: type = None, default: Any = <class 'starlette.config.undefined'>) → Any

Traverses the given path and returns the corresponding value, casting it if requested. If the path cannot be traversed and no default was provided, throws.

path : ConfigPath or (sequence of) str and int

The path of the value to retrieve, as in ConfigPath.

cast : Config, or type, or callable

The expected type of the value identified by path. This can be any callable object or type. A cast will be performed if isinstance(value, cast) is false and value is not None. In the special case where cast is a subclass of Config, the returned Config object will keep track of the correct path, i.e. will have path as its root_path (or rather, self.root_path + path).

default : Any

The default value to return if the path could not be traversed because of missing keys or objects. Note that cast is applied after default, so default may itself be cast into an object of type cast. This also implies that default is not returned if the cast fails.

Returns

The value found at path, as an instance of cast, if specified, or None.

Raises

KeyError: when there is no value at path and default was not provided.

ValueError: when cast was specified and the cast failed.

cfg = Config({'plugin': {'url': 'https://localhost'}})
cfg.get('plugin.url', str)  # Gives 'https://localhost'

# Extract a sub-cfg for 'plugin'
sub_cfg = cfg.get('plugin', Config, {})
sub_cfg.get('url')  # Gives 'https://localhost'

# Define a model to conform to
class ConfigModel(BaseModel):
    url: str

view = sub_cfg.model_view(ConfigModel)
view.url  # 'https://localhost'
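
The interplay of path traversal, default and cast can be sketched on plain dictionaries. get_value is a hypothetical helper, not Config.get(); it takes the path as a list of keys, and it assumes that None values are returned without casting and that the default is substituted before the cast is applied.

```python
_MISSING = object()   # sentinel meaning "no default was provided"


def get_value(data, parts, cast=None, default=_MISSING):
    """Walk data along parts, then optionally cast the result."""
    value = data
    try:
        for part in parts:
            value = value[part]
    except (KeyError, IndexError, TypeError):
        if default is _MISSING:
            raise KeyError('.'.join(map(str, parts))) from None
        value = default              # the default is applied before the cast
    if cast is None or value is None:
        return value
    if isinstance(cast, type) and isinstance(value, cast):
        return value                 # already of the requested type
    try:
        return cast(value)
    except (TypeError, ValueError) as exc:
        raise ValueError(f'cannot cast {value!r} with {cast!r}') from exc
```

For example, get_value({'plugin': {'port': '8080'}}, ['plugin', 'port'], cast=int) converts the stored string to an integer, and a string default such as '2.5' is also passed through cast=float.
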
items()

As in dict.items().

keys()

As in dict.keys().

model_view(model_class: Type[ConfigModelT]) → ConfigModelT

Tries to conform the current Config object to the given Pydantic data model. This method will attempt to cast the internal data to the model. In case of validation errors, the error message is gracefully formatted to contain the path of the value where validation failed.

model_class : subclass of pydantic.BaseModel

This is the class that represents the Pydantic model for the configuration. If validation succeeds, the returned value is guaranteed to be an instance of model_class.

Returns

An instance of model_class containing a shallow copy of the data in the current Config.

Raises

pydantic.error_wrappers.ValidationError: this method reformats the validation error from Pydantic to report the path where validation failed.

property root_path

The path of this config. For a top-level Config, this is empty. For a “subconfig”, this corresponds to the path of the instance in the top level object.

values()

As in dict.values().

class kegtap.config.ConfigPath(path_or_key_like: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)

Bases: object

Represents a path in a hierarchy of objects, e.g. a.b[2].c. Corresponds to a sequence of __getitem__ operations on objects that behave like the builtin dict and list.

This class is a container of elements that are keys in the object hierarchy. These can be either str or int. An empty path is a valid path with no elements.

# Get a ConfigPath from a string
p = ConfigPath('plugins.Plugin.entries[2]')

len(p)  # Gives 4

list(p)  # Gives ['plugins', 'Plugin', 'entries', 2]

p[1]  # Gives 'Plugin'

q = p + 'foo.bar'

str(q)  # 'plugins.Plugin.entries[2].foo.bar'
__init__(path_or_key_like: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)

Constructs a new config path given the pieces path_or_key_like.

path_or_key_like : ConfigPath or (sequence of) str and int

Currently this parameter supports the following values:
  • str, in this case it is interpreted as the string representation of a path, and it is parsed by parse().

  • int, in this case it is interpreted as the 1-length path "[x]", which consists of getting an item at the given index.

  • None, corresponds to the empty path that identifies the root object.

  • An iterable sequence of str and int, as returned by parse(). In this case, these are taken as the components of the config path.

__add__(other: Union[str, int, Iterable[Union[int, str]]])

Concatenate a path with another.

other : ConfigPath or (sequence of) str and int

Another config path, or any argument that can be used to construct a config path.

Returns

A new config path given by chaining self with other.

p = ConfigPath('a') + 'b[2]' + ['c', 'd', 3, 'e']
str(p)  # 'a.b[2].c.d[3].e'
classmethod parse(path: str) → Iterable[Union[int, str]]

Parse a path in a hierarchy of objects, where entries in dict are preceded by . as in obj.prop_name and indices in list are wrapped in [], as in lst[4]. This function yields all the pieces of the path, where entries in dict are str and indices in list are int.

path : str

A string representation of the path in the object hierarchy.

Returns

A generator that yields str or int for each part of the path.
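
The path syntax described above can be sketched with a small regular-expression tokenizer. parse_path and format_path are hypothetical helpers that reproduce the documented syntax; how ConfigPath.parse() treats malformed input is not documented, so raising ValueError here is an assumption.

```python
import re

# Either an optional dot followed by a key, or an integer index in brackets.
_TOKEN = re.compile(r'\.?([^.\[\]]+)|\[(\d+)\]')


def parse_path(path):
    """Yield str keys and int indices for a path such as 'a.b[2].c'."""
    pos = 0
    while pos < len(path):
        match = _TOKEN.match(path, pos)
        if match is None:
            raise ValueError(f'malformed path at position {pos}: {path!r}')
        key, index = match.groups()
        yield key if key is not None else int(index)
        pos = match.end()


def format_path(parts):
    """Inverse of parse_path: build the string form of a path."""
    text = ''
    for part in parts:
        if isinstance(part, int):
            text += f'[{part}]'
        else:
            text += part if not text else f'.{part}'
    return text
```

With these helpers, list(parse_path('plugins.Plugin.entries[2]')) gives the same four components as the ConfigPath example above, and format_path() turns a list of components back into the dotted form.
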

traverse(o: Union[Dict, List, Tuple], root_path: Optional[Union[str, int, Iterable[Union[int, str]]]] = None) → Iterable[Tuple[kegtap.config.ConfigPath, Any]]

Recursively traverses the object hierarchy o along the path represented by this object. Every time an object is traversed, this method yields a 2-tuple (path, object), where path is a ConfigPath instance representing the path up to object. Optionally prefixes root_path to all paths. Note that the root object o is never yielded.

o : dict-, list- or tuple-like objects

This is the object that will be traversed. If the current path is e.g. "a.b[2].c", this method will eventually query o['a']['b'][2]['c'] as last item.

root_path : ConfigPath or (sequence of) str and int

This path, if specified, is prefixed to all the paths that this method yields, and also in the error messages produced if a key is not found.

Returns

An iterable generator for as many elements as len(self). Each element is a 2-tuple where the first element is the ConfigPath traversed so far, and the second element the corresponding object. Note that the root object o is never returned.

Raises

KeyError: raised when a str or int key is not found.

o = {'a': {'b': [None, None, {'c': 42}]}}
for path, value in ConfigPath('a.b[2].c').traverse(o, root_path='foo'):
    print(f'{path}: {value}')

# This will produce
# foo.a: {'b': [None, None, {'c': 42}]}
# foo.a.b: [None, None, {'c': 42}]
# foo.a.b[2]: {'c': 42}
# foo.a.b[2].c: 42
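
The traversal itself can be sketched as a generator over already-parsed path components. traverse_sketch is a hypothetical helper; it represents paths as plain tuples instead of ConfigPath instances and converts any lookup failure into a KeyError.

```python
def traverse_sketch(obj, parts, root=()):
    """Yield (path_so_far, value) for each step along parts."""
    walked = tuple(root)
    for part in parts:
        try:
            obj = obj[part]
        except (KeyError, IndexError, TypeError) as exc:
            raise KeyError(f'{part!r} not found after {walked!r}') from exc
        walked += (part,)
        yield walked, obj   # the root object itself is never yielded
```

Since this is a generator, a missing key is reported only when iteration reaches it, so the steps before the failure are still produced.
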
class kegtap.config.ConfigPlugin(*args, **kwds)

Bases: typing.Generic, kegtap.plugin.Plugin

Mixin that adds a config attribute to a plugin, that is automatically populated upon construction with plugin-specific configuration, extracted from Kegtap’s configuration file based on the plugin name. This Mixin also automatically validates and conforms to a Pydantic data model as specified in the ConfigModel class property.

Example usage

from typing import Optional

from pydantic import BaseModel, conint

import kegtap

@kegtap.plugin.register
class MailerPlugin(kegtap.config.ConfigPlugin['MailerPlugin.ConfigModel'], kegtap.plugin.Plugin):
    # Override the subclass model
    class ConfigModel(BaseModel):
        server: str
        port: Optional[conint(gt=0, le=65535)] = None
    # ...
ConfigModel: Type[ConfigT]
property config

An instance of ConfigModel built from the plugin-specific configuration.

kegtap.config.OwnerPath

alias of kegtap.config.ConfigPath

kegtap.config.last(sequence: Iterable)

Returns the last item of the given sequence.

sequence : any iterable object

Returns

The last item of the sequence, if any, or None.