API¶
Execute¶
datajet.execute
¶
Execute the resolvers in a data_map to return values for fields
requested.
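Parameters:
| Name | Type | Description |
|---|---|---|
| data_map | Union[dict, DataJetMap] | A data_map. |
| fields | list | A list of fields to return from the data map. |
| context | dict | A dict of values to send to the data map as context. Defaults to None. |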
Source code in datajet/datajet.py
def _try_resolve_datapoint(data_map: dict, datapoint, results: dict) -> bool:
    """Try each resolver registered for `datapoint`, storing the first successful value in `results`.

    Returns True when a value was stored, False when no resolver could produce one.
    """
    for resolver in data_map[datapoint]:
        inputs = resolver[IN]
        if not all(input_ in results for input_ in inputs):
            # Some inputs of this resolver have not been computed; try the next resolver.
            continue
        f = resolver[F]
        try:
            value = f(*[results[input_] for input_ in inputs])
        except RuntimeResolutionException:
            # The resolver signalled it cannot resolve with these inputs; try the next resolver.
            continue
        results[datapoint] = value
        return True
    return False


def _try_resolve_path(data_map: dict, dependency_path, results: dict) -> bool:
    """Resolve every datapoint on `dependency_path` (walked in reverse), stopping at the first failure.

    Values resolved before a failure are left in `results`.
    """
    for dependency in reversed(dependency_path):
        if dependency in results:
            continue
        if not _try_resolve_datapoint(data_map, dependency, results):
            return False
    return True


def execute(data_map: Union[dict, DataJetMap], fields: list, context: dict = None) -> dict:
    """Execute the resolvers in a data_map to return values for `fields` requested.

    Args:
        data_map: A data_map, either a plain dict or a `DataJetMap`
            (whose `data_map` property is used).
        fields: A list of fields to return from the data map.
        context: A dict of values to send to the data map as context. Its entries
            are merged over a shallow copy of the data map before normalization.

    Returns:
        A dict containing only the requested `fields` and their resolved values;
        intermediate values computed along the way are dropped.

    Raises:
        ValueError: If the normalized data map fails validation.
        RuntimeResolutionException: If none of the candidate dependency paths
            for a requested field could be resolved.
    """
    if isinstance(data_map, DataJetMap):
        data_map = data_map.data_map
    if context is not None:
        # Shallow-copy first so the update does not mutate the mapping we were handed.
        data_map = copy.copy(data_map)
        data_map.update(context)
    data_map = _normalize_data_map(data_map)
    if not _is_valid_normalized_data_map(data_map):
        raise ValueError(_normalized_data_map_validation_error(data_map))

    # Compute the candidate paths for every field up front, before any resolver runs.
    paths_for_each_field = [(field, _get_dependencies(data_map, field)) for field in fields]
    results = {}
    for field, candidate_paths in paths_for_each_field:
        # Candidate paths are tried shortest first; `any` stops at the first path that resolves fully.
        resolved = any(
            _try_resolve_path(data_map, dependency_path, results)
            for dependency_path in sorted(candidate_paths, key=len)
        )
        if not resolved:
            # NOTE(review): exception type kept unchanged for callers that already catch it.
            raise RuntimeResolutionException(f"No viable resolution path found for field {field!r}.")

    # Drop intermediate values so only the requested fields are returned.
    for to_delete in set(results).difference(fields):
        results.pop(to_delete)
    return results
DataJetMap¶
datajet.DataJetMap
¶
Source code in datajet/_datamap.py
class DataJetMap:
    """Collects resolver functions, registered through `register`, into a data map."""

    def __init__(self):
        # Maps each output identifier to the list of resolver dicts registered for it.
        self._map: dict = {}

    def register(self, output: Optional[str] = None, inputs: Optional[List[str]] = None):
        """Decorator function to register the decorated function as a part of this datamap.

        Args:
            output: The output DataPoint identifier for the decorated function.
                Defaults to the function name.
            inputs: The DataPoint identifiers for the inputs into the resolver.
                Defaults to the names of the arguments of the decorated function.
                When omitted, only the function is stored here; the default is
                presumably filled in when the map is normalized.

        Returns:
            A decorator that records the function in this map and returns the
            function unchanged, so the decorated name stays bound to it.

        Example:
            from datajet import DataJetMap
            data_map = DataJetMap()

            @data_map.register()
            def sales():
                return 4

            @data_map.register()
            def units():
                return 2

            @data_map.register()
            def price(sales, units):
                return sales/units

            data_map.data_map
            {'sales': [{'in': [], 'f': <function __main__.sales()>}],
             'units': [{'in': [], 'f': <function __main__.units()>}],
             'price': [{'in': ['sales', 'units'], 'f': <function __main__.price(sales, units)>}]}
        """

        def func(f: Callable):
            key = output if output is not None else f.__name__
            to_append = {F: f} if inputs is None else {IN: inputs, F: f}
            # Several resolvers may be registered for the same output; they accumulate in a list.
            self._map.setdefault(key, []).append(to_append)
            # Return the function: a decorator's return value is what the decorated name is
            # bound to, so returning nothing would rebind the name to None.
            return f

        return func

    @property
    def data_map(self):
        """Return the registered resolvers after passing them through `_normalize_data_map`."""
        return _normalize_data_map(self._map)
register(self, output=None, inputs=None)
¶
Decorator function to register the decorated function as a part of this datamap.
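Parameters:
| Name | Type | Description |
|---|---|---|
| output | Optional[str] | The output DataPoint identifier for the decorated function. Defaults to the function name. |
| inputs | Optional[List[str]] | The DataPoint identifiers for the inputs into the resolver. Defaults to the names of the arguments of the decorated function. |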
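Examples:
from datajet import DataJetMap
data_map = DataJetMap()

@data_map.register()
def sales():
    return 4

@data_map.register()
def units():
    return 2

@data_map.register()
def price(sales, units):
    return sales/units

data_map.data_map
{'sales': [{'in': [], 'f': <function __main__.sales()>}],
 'units': [{'in': [], 'f': <function __main__.units()>}],
 'price': [{'in': ['sales', 'units'], 'f': <function __main__.price(sales, units)>}]}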
Source code in datajet/_datamap.py
def register(self, output: Optional[str] = None, inputs: Optional[List[str]] = None):
    """Decorator function to register the decorated function as a part of this datamap.

    Args:
        output: The output DataPoint identifier for the decorated function.
            Defaults to the function name.
        inputs: The DataPoint identifiers for the inputs into the resolver.
            Defaults to the names of the arguments of the decorated function.
            When omitted, only the function is stored here; the default is
            presumably filled in when the map is normalized.

    Returns:
        A decorator that records the function in this map and returns the
        function unchanged, so the decorated name stays bound to it.

    Example:
        from datajet import DataJetMap
        data_map = DataJetMap()

        @data_map.register()
        def sales():
            return 4

        @data_map.register()
        def units():
            return 2

        @data_map.register()
        def price(sales, units):
            return sales/units

        data_map.data_map
        {'sales': [{'in': [], 'f': <function __main__.sales()>}],
         'units': [{'in': [], 'f': <function __main__.units()>}],
         'price': [{'in': ['sales', 'units'], 'f': <function __main__.price(sales, units)>}]}
    """

    def func(f: Callable):
        key = output if output is not None else f.__name__
        to_append = {F: f} if inputs is None else {IN: inputs, F: f}
        # Several resolvers may be registered for the same output; they accumulate in a list.
        self._map.setdefault(key, []).append(to_append)
        # Return the function: a decorator's return value is what the decorated name is
        # bound to, so returning nothing would rebind the name to None.
        return f

    return func
Common resolvers¶
datajet.common_resolvers.required_from_context
¶
Returns a resolver function that anticipates and requires input from the context.
Use this as a "placeholder" in your datamap for context.
Notes
Prefer to use this function over raising a `RuntimeResolutionException`, as the engine that powers `datajet.execute` will optimize the search for a valid data-path (to increase performance) if a `context_input` is not given in the context.
Source code in datajet/common_resolvers.py
def required_from_context():
    """Return the placeholder resolver for a datapoint that must be supplied through the context.

    Put the returned value in your datamap wherever a value is expected to
    arrive via the context.

    Notes:
        Prefer this function over raising a `RuntimeResolutionException`, as
        the engine that powers `datajet.execute` will optimize the search for
        a valid data-path (to increase performance) if a `context_input` is not given in the context.
    """
    placeholder = _REQUIRED_FROM_CONTEXT
    return placeholder
datajet.common_resolvers.alias
¶
Returns a resolver function that acts as an alias to the `datapoint` node.
Parameters:
| Name | Type | Description |
|---|---|---|
| datapoint | Hashable | The datapoint to alias. |
Notes
Use the resolver output from this function to pass through the data from one node directly to another.
Source code in datajet/common_resolvers.py
def alias(datapoint: Hashable) -> List[dict]:
    """Build a resolver list that makes a node an alias of `datapoint`.

    Args:
        datapoint: The datapoint to alias.

    Returns:
        A one-element list holding a resolver whose only input is `datapoint`
        and whose function returns that input unchanged.

    Notes:
        Use the resolver output from this function to pass through the data from one node directly to another.
    """
    passthrough_resolver = {IN: [datapoint], F: lambda x: x}
    return [passthrough_resolver]
datajet.common_resolvers.dict_resolver
¶
Returns a resolver function that looks up the resulting value from `d` corresponding with the key output from `input_datapoint`.
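Parameters:
| Name | Type | Description |
|---|---|---|
| input_datapoint | Hashable | The datapoint that will be looked up in `d` to find the value returned from this resolver. |
| d | dict | The dict to lookup `input_datapoint` in. |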
Notes
The resolver will raise RuntimeResolutionException if the key is not found in the dict at "resolution time."
Source code in datajet/common_resolvers.py
def dict_resolver(input_datapoint: Hashable, d: dict) -> List[dict]:
    """Returns a resolver function that looks up the resulting value from `d` corresponding with the key output from `input_datapoint`.

    Args:
        input_datapoint: The datapoint that will be looked up in `d` to find the value returned from this resolver.
        d: The dict to lookup `input_datapoint` in.

    Returns:
        A one-element list holding a resolver whose only input is
        `input_datapoint` and whose function returns `d[key]`.

    Notes:
        The resolver will raise RuntimeResolutionException if the key is not found in the dict at "resolution time."
    """

    def _f(key):
        try:
            return d[key]
        except KeyError as err:
            # Chain the KeyError explicitly and name the key so the failure can be traced.
            raise RuntimeResolutionException(f"Key {key!r} was not found in the dict_resolver mapping.") from err

    return [{IN: [input_datapoint], F: _f}]
Exceptions¶
datajet.exceptions.RuntimeResolutionException
¶
An exception was raised during execution of the datamap for fields.
Usage
Raise this exception inside a resolver to indicate to datajet that the resolution of the resolver is not possible with the given inputs. DataJet will catch and ignore this exception and proceed with other valid execution paths that return the requested fields in the DataMap if they exist.
Source code in datajet/exceptions.py
class RuntimeResolutionException(Exception):
    """An exception was raised during execution of the datamap for fields.

    Usage:
        Raise this exception inside a resolver to indicate to datajet that the resolution
        of the resolver is not possible with the given inputs. DataJet will catch and ignore this exception
        and proceed with other valid execution paths that return the requested fields in the DataMap if they exist.
    """
datajet.exceptions.PlanNotFoundError
¶
A valid plan was not found in the datamap to return the fields requested.
Notes
This is typically raised by DataJet when no viable paths (paths that do not raise `RuntimeResolutionException`) exist in the datamap to return the requested fields.
Source code in datajet/exceptions.py
class PlanNotFoundError(ValueError):
    """A valid plan was not found in the datamap to return the fields requested.

    Notes:
        This is typically raised by DataJet when no viable paths (paths that do not raise
        `RuntimeResolutionException`) exist in the datamap to return the requested fields.
    """