kegtap package¶
Module contents¶
Submodules¶
kegtap.plugin module¶
-
class
kegtap.plugin.Plugin(*_, **__)¶ Bases:
objectBase class for all Kegtap plugins. Subclasses may override if needed
on_activate()andon_deactivate()to initialize or destroy plugin-specific data.A plugin must be explicitly registered via
kegtap.plugin.register. Usage example:import kegtap.plugin @kegtap.plugin.register class MyPlugin(kegtap.plugin.Plugin): pass # Somewhere else in the code, an instance of MyPlugin can be obtained via instance = kegtap.plugin.find_active(MyPlugin)
See also
-
property
import_name¶ Returns the fully qualified name of the current plugin, including the module, e.g.
'kegtap_plugins.redis.RedisPlugin'.
-
on_activate(*_, **__)¶ This method is called only once when all the plugins have been constructed and Kegtap is being started. After a plugin is activated, it can be retrieved from the plugin pool via
find_active(). This method sinks its arguments.Note
Subclasses should always call
super().on_activate()before performing their own initialization. Note that the initialization order is user-defined (based on the specified configuration), therefore it is not possible to access other plugins while activating the current one (as they may not be in the pool yet). It is fine to subclasson_activatewith an async function, e.g.:class MyPlugin(Plugin): async def on_activate(self): super().on_activate() await self._async_resource.produce_result()
-
on_deactivate(*_, **__)¶ This method is called only once when Kegtap is being shut down and the plugins need to be destroyed. After a plugin is deactivated, it cannot be found in the plugin pool by
find_active()any longer. This method sinks its arguments.Note
Subclasses should always call
super().on_deactivate()after performing their own teardown. Note that the deactivation order is user-defined (based on the specified configuration), therefore it is not possible to access other plugins while activating the current one (as they may not be in the pool yet). It is fine to subclasson_deactivatewith an async function, e.g.:class MyPlugin(Plugin): async def on_deactivate(self): await self._async_resource.disconnect() super().on_deactivate()
-
property
-
exception
kegtap.plugin.PluginNotFoundException(criterion: Union[Type[Plugin], str], found=typing.Tuple[typing.Type[ForwardRef('Plugin')]])¶ Bases:
fastapi.exceptions.HTTPException
-
class
kegtap.plugin.PluginPool¶ Bases:
objectClass that holds the list of currently active plugins, e.g. plugins for which
Plugin.on_activate()was called andPlugin.on_deactivate()was not called yet. There is a unique module-level instance of aPluginPool,kegtap.plugin.ACTIVE_PLUGINS_POOL; you can query it directly or viafind_active(). Plugins are stored and identified uniquely byPlugin.import_name(). The main method of this class isfind().This class caches the queries that have been performed to reproduce quickly the same results.
This class only deals with active plugins; if you need to retrieve known (i.e. registered) plugins, refer to
kegtap.plugin.get_registered()andkegtap.plugin.find_registered().See also
find_active()for example usages.-
add_active_plugin(plugin: kegtap.plugin.Plugin)¶ Adds
pluginto the current list.- Raises
KeyError: if a plugin with the same
Plugin.import_name()already exists.
-
find(criterion: Union[str, Type[PluginT]], allow_partial_match=True, require_unique=True) → Union[PluginT, Tuple[PluginT]]¶ Tests the active plugins against
criterion, and returns those that match according tois_match(). Ifrequire_uniqueis True, this method will either return the unique plugin matchingcriterion, or will throw. See the documentation offind_active()for example usages.The results of this query are cached for faster lookup.
- criterionstr or subclass of Plugin
Consult
is_match()for the meaning of this parameter.- allow_partial_matchbool
Consult
is_match()for the meaning of this parameter.- require_uniquebool
If this is True, the method will return the one and only instance found matching
criterion. If no instance, or multiple instances are found, this method throws. If this parameter is False, the method returns a tuple of matching plugin instances.
- Raises
PluginNotFound: if
require_uniqueis True, and either no instance or multiple instances matchcriterion.- Returns
A tuple of
Plugininstances ifrequire_uniqueis False, a single instance otherwise.
-
classmethod
is_match(plugin: kegtap.plugin.Plugin, criterion: Union[str, Type[kegtap.plugin.Plugin]], allow_partial_match=True)¶ Tries to match
pluginagainst the matchcriterion.- pluginPlugin
Plugininstance to test againstcriterion.- criterionstr or subclass of Plugin
If this parameter is a string, it will be matched against the
Plugin.import_name()ofplugin. The matching behavior depends onallow_partial_match. If this parameter is atype,pluginwill match positively only if it’s a subclass ofcriterion.- allow_partial_matchbool
If this parameter is true and
criterionis astr, it sufficies forcriterionto be one ore more non-consecutive parts of the import path ofplugin, as inis_ordered_subsequence(); e.g.'b.d'is a partial match fora.b.c.d.EPlugin. Ifcriterionis atype, this parameter has no effect.
Examples
class MyPlugin(Plugin): pass instance = MyPlugin() PluginPool.is_match(instance, MyPlugin) # True PluginPool.is_match(instance, 'MyPlugin', allow_partial_match=True) # True PluginPool.is_match(instance, Plugin) # True instance.import_name # 'kegtap_plugins.my_module.MyPlugin' PluginPool.is_match(instance, 'my_module', allow_partial_match=True) # True
-
remove_active_plugin(plugin: kegtap.plugin.Plugin)¶ Removes
pluginfrom the current list.- Raises
KeyError: if a plugin with the same
Plugin.import_name()is not in the current list.
-
-
kegtap.plugin.find_active(criterion: Union[str, Type[PluginT]], allow_partial_match=True, require_unique=True) → Union[PluginT, Tuple[PluginT]]¶ Queries
PluginPool.find()on the unique instance ofPluginPoolthat contains all active plugins in the current process. This method is used to find active plugins; to find known (i.e. registered) plugins, refer tokegtap.plugin.get_registered()andkegtap.plugin.find_registered().- Raises
PluginNotFound: if
require_uniqueis True, and either no instance or multiple instances matchcriterion.
Examples
# Retrieve the unique instance of MyPlugin find_active(MyPlugin) # Will throw if not found # Search for all instances of subclasses of MyPlugin: find_active(MyPlugin, require_unique=False) # Returns a tuple of instances # Search for all plugins with "redis" in their names: find_active("redis", require_unique=False, allow_partial_match=True) # Tuple of instances # The unique subclasses of TaskBrokerPlugin that also has Redis in the name find_active(TaskBrokerPlugin) # Throws if not found
See also
-
kegtap.plugin.find_registered(plugin_name: str, allow_partial_match: bool = True) → Optional[Type[kegtap.plugin.Plugin]]¶ Searches the known rgistered plugins in
get_registered()for one matching the givenplugin_name.This method deals with registered plugins, i.e. known but not necessarily instantiated. If you want to retrieve an active instance of a certain plugin, refer to
kegtap.plugin.find_active()andkegtap.plugin.PluginPool.- plugin_namestr
The plugin name to find. It must be a fully qualified import name, e.g.
"kegtap_plugins.redis.RedisPlugin". If you setallow_partial_matchto True, it is allowed to be also part of the fully qualified import name.- allow_partial_matchbool
If set to True,
plugin_namecan also match partially the plugin name, using the rules ofis_ordered_subsequence(). This means that the dot-separated parts ofplugin_namematch any plugin that has the same parts in the same order, e.g.'b.d'matches'a.b.c.d.EPlugin'.
- Raises
KeyError: if more than one registered plugin class match
plugin_name.- Returns
A single
kegtap.plugin.Pluginclass if found, otherwiseNone.
-
kegtap.plugin.get_registered() → Iterable[Type[kegtap.plugin.Plugin]]¶ Yields a range of
kegtap.plugin.Pluginclasses that have been registered. This method deals with registered plugins, i.e. known but not necessarily instantiated. If you want to retrieve an active instance of a certain plugin, refer tokegtap.plugin.find_active()andkegtap.plugin.PluginPool.
-
kegtap.plugin.register(plugin_cls: Type[kegtap.plugin.Plugin])¶ Decorator for subclasses of
kegtap.plugin.Plugin. You must register a plugin for Kegtap to pick it up and run it. This is intended to prevent asbtract base classes from becoming active plugins at runtime.
kegtap.task module¶
-
class
kegtap.task.Task(fn: Callable[[…], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, uuid: Optional[uuid.UUID] = None, retries: int = 0)¶ Bases:
objectRepresents a task, that is, a callable function together with its arguments, uniquely identified by a
uuid. It can be used to wrap a normal function or a coroutine, but both are expected to return a boolean expressing whether the operation succeeded. A task has a certain number ofretriesassociated, which represents how many times the scheduler will attempt this task until it succeeds.A task is intended to be serializable using
pickle, so do not wrap functions that may not be picklable (e.g.classmethodworks, but bound methods do not).-
__init__(fn: Callable[[…], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, uuid: Optional[uuid.UUID] = None, retries: int = 0)¶ Constructs a new task.
- fnfunction or coroutine
The main function to call when this task is executed. Can be either a function or a coroutine, but it must return a boolean to express whether the task succeeded or not.
- argstuple
Positional arguments with which to call
fn, if any.- kwargsdict
Keyword arguments with which to call
fn, if any.- uuidUUID
UUID for this task; if omitted, a new one will be generated.
- retriesint
Number of attempts allowed for this task, as in
retries.
-
next_try() → kegtap.task.Task¶ Generates a task identical to the current one, but with one less
retries.
-
property
retries¶ The maximum number of times the task is allowed to fail before the scheduler gives up on it. This can be reduced by generating a new task with
next_try().
-
property
uuid¶ A unique identifier for this task. Same tasks with different retries may share the same UUID.
-
-
class
kegtap.task.TaskBrokerBase¶ Bases:
abc.ABCAbstract class describing a task broker, which interfaces to storage. A broker is responsible of saving and retrieving tasks from persistent storage. Every time a task is created, it is pushed trough a broker via
push(), so that it can be restored even after the program is restarted. Analogously, when a task is run in the current event loop, it is popped from storage viapop()(and then pushed again if it fails and has enoughTask.retries). A given task can be pushed in an arbitrary queue, identified by its name. The broker should be able to retrieve all active queues and their tasks.Subclasses of this this class may implement all the methods as coroutines.
-
abstract
pop(queue: str, task_uuid: uuid.UUID) → Union[kegtap.task.Task, None, Awaitable[Optional[kegtap.task.Task]]]¶ Looks up a task by
task_uuidin the givenqueue. If it finds any, it removes it from the queue and returns it.- queuestr
The name of the queue for the task. If the queue does not exist, the method returns
None.- task_uuidUUID
The UUID of the task to retrieve. This may be one of those returned by
queue_uuids(). If there is no task under such UUID, the method returnsNone.
- Returns
A task from
queuewith the giventask_uuid, if found, otherwiseNone. If it returns aTask, this is guaranteed to have been removed from the queue.
Note
Subclasses should take care when implementing this method: depending on the event loop implementation and application configuration,
pop()can be called simultaneously from multiple threads. If the sameTaskis returned from multiple threads, it will be executed multiple times. Hence, this method should synchronize access to the queue, unless for the specific application tasks are idempotent and thus it is fine to run a task several times.
-
abstract
push(queue: str, task: kegtap.task.Task)¶ Saves
taskintoqueue.- queuestr
The name of the queue for the task. If the queue does not exist, it is created.
- taskTask
The task to save into the queue. The task may be persisted via e.g.
pickle.
Note
Subclasses should take care when implementing this method as a coroutine: calls to other coroutines should be shielded using
asyncio.shield, to guarantee that theTaskreaches the storage even if the event loop is being shut down.
-
abstract
queue_uuids(queue: str) → Union[Iterable[uuid.UUID], Awaitable[Iterable[uuid.UUID]]]¶ Obtains the list of tasks in a given queue. Tasks here are identified exclusively by their
Task.uuid.- queuestr
The name of the queue. This will be one of those returned by
queues().
- Returns
A list of UUID corresponding to the tasks in the queue
queue.
-
abstract
queues() → Union[Iterable[str], Awaitable[Iterable[str]]]¶ Obtains the list of known queue names. Queue names are freely chosen when the task is pushed via
push(). TheTaskDispatcherwill call this only upon startup to repopulate the event loop with all tasks. Subsequently to retrieving a list of queues, it will inspect the queues usingqueue_uuids().- Returns
A list of strings, the names of the known queues.
-
abstract
-
class
kegtap.task.TaskBrokerPlugin(*args, **kwargs)¶ Bases:
abc.ABC,kegtap.plugin.PluginBase abstract class for a
kegtap.plugin.Pluginthat provides the functionality of aTaskBrokerBase. It is intended that other plugins use aTaskBrokerPluginto enqueue their tasks for execution. Sincekegtap.plugin.find_active()is compatible with subclasses, other plugins do not need to worry about which broker is currently active, but rather look for this abstract base class into the current active plugin pool.This class wraps around a
TaskDispatcher, which is started when the plugin is activated and stopped when it is deactivated. Subclasses should implement the single method_build_broker().Example usage
# Intended usage from the perspective of another plugin @register class MyPlugin(Plugin): @taskify def _long_operation(self, arg: int): pass def trigger_long_operation(self): # Plugins can be agnosic w.r.t. which TaskBrokerPlugin is really active: broker_plugin = find_active(TaskBrokerPlugin) # ^^^^^^^^^^^^^^^^^^^^^^^^^^^ # This will return any active subclass of TaskBrokerPlugin broker_plugin.push_task('my_queue', self._long_operation, 42, retries=2)
-
abstract
_build_broker() → kegtap.task.TaskBrokerBase¶ Construct the implementation of
TaskBrokerBase. This will be called upon constrution.
-
property
broker¶ The broker offered by this plugin.
-
property
num_awaiting_tasks¶ Returns the number of scheduled tasks awaiting to be executed, as in
TaskDispatcher.__len__().
-
async
on_activate(*args, **kwargs)¶ Calls
TaskDispatcher.start().
-
async
on_deactivate(*args, **kwargs)¶ Calls
TaskDispatcher.stop()and awaitsTaskDispatcher.complete().
-
push_task(queue: str, fn: Callable[[…], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, retries: int = 0) → _asyncio.Future¶ Enqueues a new task; this is dentical to
TaskDispatcher.append(); refer to its documentation.See also
-
abstract
-
class
kegtap.task.TaskDispatcher(broker: kegtap.task.TaskBrokerBase)¶ Bases:
objectMain class that is responsible of receiving tasks, scheduling them into the event loop, recovering tasks stored in the broker, and executing them.
This class needs a
TaskBrokerBaseinstance to work. When it is stared usingstart(), it will read all theTaskknown by the broker, and schedule them all in the event loop. When a task is about to be executed, it is removed from the broker viaTaskBrokerBase.pop(). New tasks are first stashed viaTaskBrokerBase.push()and, if the dispatcher isrunning, immediately scheduled. Tasks that fail and have retries left, will be rescheduled. Tasks are executed only as long as the dispatcher isrunning.The dispatcher shields some of its coroutines using
asyncio.shieldto guarantee that if a task is appended, it is pushed by theTaskBrokerBaseeven if the event loop is being shut down.-
__init__(broker: kegtap.task.TaskBrokerBase)¶ Constructs a new dispatcher around
broker. The newly constructed dispatcher will not be running and will not query thebrokeruntilstart()is called.- brokerTaskBrokerBase
The instance reponsible for permanent
Taskstorage.
-
__len__() → int¶ The number of
Taskthat are scheduled in the event loop and are waiting to be executed.
-
append(queue: str, fn: Callable[[…], Union[bool, Awaitable[bool]]], args: Optional[Tuple] = None, kwargs: Optional[OrderedDict] = None, retries: int = 0) → _asyncio.Future¶ Adds a new task to the
queue, and if the dispatcher isrunning, immediately schedules it for execution. This will callTaskBrokerBase.push(); the pushing operation is shielded usingasyncio.shieldto guarantee that the task enters the persistent storage even if the event loop is being shut down.- queuestr
The name of the queue where to push the task. This can be chosen arbitrarily by the user.
- fnfunction or coroutine
The function that actually perform this task. This can be either a function or a coroutine, but should return a boolean expressing whether the task succeeded.
- argstuple
Tuple of positional arguments to use in the invocation of
fn, if any.- kwargsdict
Dictionary of keyword arguments to use in the invocation of
fn, if any.- retriesint
Maximum number of times this task is allowed to fail.
- Returns
A
asyncio.Futurethat yields theTask::uuidof the newly createdTask, once theTaskBrokerBase.push()operation is completed.
Note
fn,args,kwargsmay need to be serialized by the broker, therefore they should be serializable. By default, bound methods are not serializable via e.g.pickle. If your task is an instance method of akegtap.plugin.Plugin, wrap it usingtaskify()to be able to use it directly inappend().
-
async
complete()¶ Waits until all currently scheduled
Taskare complete. This method can be used to wait until all scheduled tasks are removed from the event loop or executed, afterstop()is called. Note that if the dispatcher isrunning, awaiting completion will only await the tasks scheduled so far, and not any other task that may be scheduled in the future.
-
property
running¶ Determines whether this
TaskDispatcheris executing tasks (a stopped dispatcher will still accept tasks viaappend(), but will not execute any).
-
start() → Optional[_asyncio.Task]¶ Starts scheduling
Taskinto the event loop. Calling this method multiple times does nothing. The first performed operation is to fetch all persisted tasks from theTaskBrokerBase, and to schedule them in the event loop.- Returns
The asyncio.Task corresponding to the operation of fetching and scheduling all tasks in the broker. If the dispatcher is already running, it returns
None.
-
stop()¶ Stops executing tasks. Calling this method multiple times has not effect. Tasks that are already scheduled and will run after a call to
stop(), will simply be skipped. The user can check whether the dispatcher is fully halted by either checking wetherlen(this) == 0, or better, by awaitingcomplete().
-
-
kegtap.task.taskify(unbound_method)¶ Makes an instance method of a subclass of
kegtap.plugin.Pluginserializable, which can then be used as thefnargument ofTaskBrokerPlugin.push_task()orTaskDispatcher.append()(tasks must be serialized e.g. viapickle, and bound methods are not serializable by default).This works by simply rebinding the method to the current active instance of the plugin, therefore works only with subclasses of
kegtap.plugin.Plugin. Note that it will also not work if the plugin is not active when the method is called.Example usage
@register class MyPluginWithTasks(Plugin): @taskify def _long_operation(self, arg): pass # This does the same as `taskify` does: @classmethod def _equivalent_long_operation(cls, arg): find_active(cls)._long_operation(arg) def trigger_long_operation(self): broker_plugin = find_active(TaskBrokerPlugin) # The same result: broker_plugin.push_task('queue', MyPluginWithTasks._equivalent_long_operation, 22) broker_plugin.push_task('queue', self_long_operation, 22)
kegtap.config module¶
-
class
kegtap.config.Config(d: Dict = None, root_path: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)¶ Bases:
objectA dictionary-like object that contains configuration keys. A
Configinstance carries behaves like a top-level object, but carries information about its own path, therefore it is possible to obtain “subconfigs” that preserve the full path information for debugging.This class can cast values and offer default values for config attributes via
get(). It is also possible to get a read-only view of the data and validate it against a Pydantic model object withmodel_view().-
__init__(d: Dict = None, root_path: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)¶ Constructs a new
Configobject withdas data payload and withroot_pathas root path.
-
get(path: Union[str, int, Iterable[Union[int, str]]], cast: type = None, default: Any = <class 'starlette.config.undefined'>) → Any¶ Traverses the given
path, and returns the corresponding value. Attempts a cast if possible, and if no default was provided, throws.- pathConfigPath or (sequence of) str and int
The path of the value to retrieve, as in
ConfigPath.- castConfig, or type, or callable
The expected type of the value identified by
path. This can be any callable object or type. A cast will be performed ifisinstance(value, cast)is false andvalueis None. In the special case wherecastis a subclass ofConfig, the returnedConfigobject will keep track of the correct path, i.e. will havepathas itsroot_path(or rather,self.root_path + path).- defaultAny
The default value to return if the path could not be traversed because of missing keys or objects. Note that
castis applied afterdefault, thereforedefaultcould be casted into an object of typecast. This also implies thatdefaultis not returned in case of failed cast.
- Returns
The value found at
path, as an instance ofcast, if specified, or None.- Raises
KeyError: when there is no value at
pathanddefaultwas not provided. ValueError: whencastwas specified and the cast failed.
cfg = Config({'plugin': {'url': 'https://localhost'}}) cfg.get('plugin.url', str) # Gives 'https://localhost' # Extract a sub-cfg for 'plugin' sub_cfg = cfg.get('plugin', Config, {}) sub_cfg.get('url') # Gives 'https://localhost' # Define a model to conform to class ConfigModel(BaseModel): url: str view = sub_cfg.model_view(ConfigModel) view.url # 'https://localhost'
-
items()¶ As in
dict.items().
-
keys()¶ As in
dict.keys().
-
model_view(model_class: Type[ConfigModelT]) → ConfigModelT¶ Tries to conform the current
Configobject to the given Pydantic data model. This method will attempt to cast the internal data to the model. In case of validation errors, the error message is gracefully formatted to contain the path of the value where validation failed.- model_classsubclass of
pydantic.BaseModel This is the class that represents the Pydantic model for the configuration. If validation succeeds, the returned value is guaranteed to be an instance of
model_class.
- Returns
An instance of
model_classcontaining a shallow copy of the data in the currentConfig.- Raises
pydantic.error_wrappers.ValidationError: this method reformats the validation error from Pydantic to report the path where validation failed.
- model_classsubclass of
-
property
root_path¶ The path of this config. For a top-level
Config, this is empty. For a “subconfig”, this corresponds to the path of the instance in the top level object.
-
values()¶ As in
dict.values().
-
-
class
kegtap.config.ConfigPath(path_or_key_like: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)¶ Bases:
objectRepresents a path in a hierarchy of objects, e.g.
a.b[2].c. Corresponds to a sequence of__getitem__operations on objects that behave like the builtindictandlist.This class is a container of elements that are keys in the object hierarchy. These can be either
strorint. An empty path is a valid path with no elements.# Get a ConfigPath from a string p = ConfigPath('plugins.Plugin.entries[2]') len(p) # Gives 4 list(p) # Gives ['plugins', 'Plugin', 'entries', 2] p[1] # Gives 'Plugin' q = p + 'foo.bar' str(q) # 'plugins.Plugin.entries[2].foo.bar'
-
__init__(path_or_key_like: Optional[Union[str, int, Iterable[Union[int, str]]]] = None)¶ Constructs a new config path given the pieces
path_or_key_like.- path_or_key_likeConfigPath or (sequence of) str and int
- Currently this parameter supports the following values:
str, in this case it is interpreted as the string representation of a path, and it is parsed byparse().int, in this case it is interpreted as the 1-length path"[x]", which consist of getting an item at the given index.None, corresponds to the empty path that identifies the root object.An iterable sequence of
strandint, as returned byparse(). In this case, these are taken as the components of the config path.
-
__add__(other: Union[str, int, Iterable[Union[int, str]]])¶ Concatenate a path with another.
- otherConfigPath or (sequence of) str and int
Another config path, or any argument that can be used to construct a config path.
- Returns
A new config path given by chaining
selfwithother.
p = ConfigPath('a') + 'b[2]' + ['c', 'd', 3, 'e'] str(p) # 'a.b[2].c.d[3].e'
-
classmethod
parse(path: str) → Iterable[Union[int, str]]¶ Parse a path in a hiearchy of objects, where entries in
dictare preceded by.as inobj.prop_nameand indices inlistare wrapped in[], as inlst[4]. This function yields all the pieces of the path, where entries indictarestrand indices inlistareint.- pathstr
A string representation of the path in the object hierarchy.
- Returns
A generator that yiels
strorintfor each part of the path.
-
traverse(o: Union[Dict, List, Tuple], root_path: Optional[Union[str, int, Iterable[Union[int, str]]]] = None) → Iterable[Tuple[kegtap.config.ConfigPath, Any]]¶ Recursively traverses the object hierarchy
oalong the path represented by this object. Every time an object is traversed, this method yields a 2-tuple(path, object), wherepathis aConfigPathinstance representing the path up toobject. Optionally prefixesroot_pathto all paths. Note that the root objectois never yielded.- odict-, list- or tuple-like objects
This is the object that will be traversed. If the current path is e.g.
"a.b[2].c", this method will eventually queryo['a']['b'][2]['c']as last item.- root_pathConfigPath or (sequence of) str and int
This path, if specified, is prefixed to all the paths that this method yields, and also in the error messages produced if a key is not found.
- Returns
An iterable generator for as many elements as
len(self). Each element is a 2-tuple where the first element is theConfigPathtraversed so far, and the second element the corresponding object. Note that the root objectois never returned.- Raises
KeyError: raised when a
strorintkey is not found.
o = {'a': {'b': [None, None, {'c': 42}]}} for path, value in ConfigPath('a.b[2].c').traverse(o, root_path='foo'): print(f'{path}: {value}') # This will produce # foo.a: {'b': [None, None, {'c': 42}]} # foo.a.b: [None, None, {'c': 42}] # foo.a.b[2]: {'c': 42} # foo.a.b[2].c: 42
-
-
class
kegtap.config.ConfigPlugin(*args, **kwds)¶ Bases:
typing.Generic,kegtap.plugin.PluginMixin that adds a
configattribute to a plugin, that is automatically populated upon construction with plugin-specific configuration, extracted from Kegtap’s configuration file based on the plugin name. This Mixin also automatically validates and conforms to a Pydantic data model as specified in theConfigModelclass property.Example usage
import kegtap @kegtap.plugin.register class MailerPlugin(kegtap.config.ConfigPluginMixin['MailerPlugin.ConfigModel'], kegtap.plugin.Plugin): # Override the subclass model class ConfigModel(BaseModel): server: str port: Optional[conint(gt=0, le=65535)] = None # ...
-
ConfigModel: Type[ConfigT]¶
-
property
config¶ An instance of
ConfigModelbuilt from the plugin-specific configuration.
-
-
kegtap.config.OwnerPath¶ alias of
kegtap.config.ConfigPath
-
kegtap.config.last(sequence: Iterable)¶ Returns the last item of the given sequence.
sequence : any iterable object
- Returns
The last item of the sequence, if any, or None.