API

Execute

datajet.execute

Execute the resolvers in a data_map to return values for fields requested.

Parameters:
  • data_map (Union[dict, datajet._datamap.DataJetMap]) – A data_map

  • fields (list) – A list of fields to return from the data map.

  • context (dict) – A dict of values to send to the data map as context.

Source code in datajet/datajet.py
def execute(data_map: Union[dict, DataJetMap], fields: list, context: dict = None) -> dict:
    """Resolve the requested `fields` by running the resolvers in `data_map`.

    Args:
        data_map: Either a plain dict data map or a `DataJetMap`, whose
            `data_map` property is used.
        fields: The fields whose values should be returned.
        context: Optional dict merged into a shallow copy of the data map
            (via `dict.update`) before the map is normalized.

    Returns:
        A dict holding only the entries whose keys appear in `fields`.

    Raises:
        ValueError: If the normalized data map fails validation.
        RuntimeResolutionException: If none of the candidate dependency paths
            for a requested field could be fully resolved.

    """
    if isinstance(data_map, DataJetMap):
        data_map = data_map.data_map

    if context is not None:
        # Work on a shallow copy so the caller's mapping is not updated in place.
        data_map = copy.copy(data_map)
        data_map.update(context)

    data_map = _normalize_data_map(data_map)
    if not _is_valid_normalized_data_map(data_map):
        raise ValueError(_normalized_data_map_validation_error(data_map))

    candidate_paths_per_field = [_get_dependencies(data_map, field) for field in fields]

    resolved = {}
    for candidate_paths in candidate_paths_per_field:
        # Shortest candidate paths are attempted first.
        for path in sorted(candidate_paths, key=len):
            # Walk the path from its last element back to its first.
            for node in reversed(path):
                if node in resolved:
                    continue
                for resolver in data_map[node]:
                    input_names = resolver[IN]
                    if not all(name in resolved for name in input_names):
                        continue
                    resolve = resolver[F]
                    try:
                        value = resolve(*[resolved[name] for name in input_names])
                    except RuntimeResolutionException:
                        # This resolver declined; try the next one registered for the node.
                        continue
                    resolved[node] = value
                    break
                else:
                    # No resolver for this node had all inputs available and succeeded,
                    # so abandon this path and move on to the next candidate path.
                    break
            else:
                # Every node on this path is resolved; no further paths are needed.
                break
        else:
            # Every candidate path for this field was abandoned.
            raise RuntimeResolutionException

    # Keep only the requested fields; intermediate values are dropped.
    wanted = set(fields)
    return {name: value for name, value in resolved.items() if name in wanted}



DataJetMap

datajet.DataJetMap

Source code in datajet/_datamap.py
class DataJetMap(object):
    """Registry of resolver functions that is filled in with the `register` decorator.

    Resolvers are kept in `_map`, keyed by output identifier. Each key maps to a
    list of resolver dicts, so more than one resolver can be registered for the
    same output.
    """

    def __init__(self):
        # output identifier -> list of resolver dicts ({IN: [...], F: f} or {F: f})
        self._map: dict = {}

    def register(self, output: Optional[str] = None, inputs: Optional[List[str]] = None):
        """Decorator function to register the decorated function as a part of this datamap.

        Args:
            output: The output DataPoint identifier for the decorated function.
                    Defaults to the function name.

            inputs: The DataPoint identifiers for the inputs into the resolver.
                    Defaults to the names of the arguments of the decorated function.
                    When omitted, no inputs entry is stored by this method; the
                    default is presumably filled in when the map is normalized
                    (confirm against `_normalize_data_map`).

        Returns:
            A decorator that records the function in this datamap and returns the
            function unchanged, so the decorated name stays bound to the callable.

        Example:
            from datajet import DataJetMap

            data_map = DataJetMap()

            @data_map.register()
            def sales():
                return 4

            @data_map.register()
            def units():
                return 2

            @data_map.register()
            def price(sales, units):
                return sales/units

            data_map.data_map
            {'sales': [{'in': [], 'f': <function __main__.sales()>}],
            'units': [{'in': [], 'f': <function __main__.units()>}],
            'price': [{'in': ['sales', 'units'], 'f': <function __main__.price(sales, units)>}]}

        """

        def func(f: Callable):
            key = output if output is not None else f.__name__
            resolver_list = self._map.setdefault(key, [])
            if inputs is not None:
                to_append = {IN: inputs, F: f}
            else:
                to_append = {F: f}

            resolver_list.append(to_append)
            # A decorator's return value replaces the decorated name; without this
            # the decorated function would be rebound to None in the caller's scope.
            return f

        return func

    @property
    def data_map(self):
        """The registered resolvers, passed through `_normalize_data_map`."""
        return _normalize_data_map(self._map)

register(self, output=None, inputs=None)

Decorator function to register the decorated function as a part of this datamap.

Parameters:
  • output (Optional[str]) – The output DataPoint identifier for the decorated function. Defaults to the function name.

  • inputs (Optional[List[str]]) – The DataPoint identifiers for the inputs into the resolver. Defaults to the names of the arguments of the decorated function.

Examples:

from datajet import DataJetMap

data_map = DataJetMap()

@data_map.register()
def sales():
    return 4

@data_map.register()
def units():
    return 2

@data_map.register()
def price(sales, units):
    return sales/units

data_map.data_map
{'sales': [{'in': [], 'f': <function __main__.sales()>}],
'units': [{'in': [], 'f': <function __main__.units()>}],
'price': [{'in': ['sales', 'units'], 'f': <function __main__.price(sales, units)>}]}

Source code in datajet/_datamap.py
def register(self, output: Optional[str] = None, inputs: Optional[List[str]] = None):
    """Decorator function to register the decorated function as a part of this datamap.

    Args:
        output: The output DataPoint identifier for the decorated function.
                Defaults to the function name.

        inputs: The DataPoint identifiers for the inputs into the resolver.
                Defaults to the names of the arguments of the decorated function.
                When omitted, no inputs entry is stored by this method; the
                default is presumably filled in when the map is normalized
                (confirm against `_normalize_data_map`).

    Returns:
        A decorator that records the function in this datamap and returns the
        function unchanged, so the decorated name stays bound to the callable.

    Example:
        from datajet import DataJetMap

        data_map = DataJetMap()

        @data_map.register()
        def sales():
            return 4

        @data_map.register()
        def units():
            return 2

        @data_map.register()
        def price(sales, units):
            return sales/units

        data_map.data_map
        {'sales': [{'in': [], 'f': <function __main__.sales()>}],
        'units': [{'in': [], 'f': <function __main__.units()>}],
        'price': [{'in': ['sales', 'units'], 'f': <function __main__.price(sales, units)>}]}

    """

    def func(f: Callable):
        key = output if output is not None else f.__name__
        resolver_list = self._map.setdefault(key, [])
        if inputs is not None:
            to_append = {IN: inputs, F: f}
        else:
            to_append = {F: f}

        resolver_list.append(to_append)
        # A decorator's return value replaces the decorated name; without this
        # the decorated function would be rebound to None in the caller's scope.
        return f

    return func



Common resolvers

datajet.common_resolvers.required_from_context

Returns a resolver function that anticipates and requires input from the context.

Use this as a "placeholder" in your datamap for context.

Notes

Prefer to use this function over raising a RuntimeResolutionException, as the engine that powers datajet.execute will optimize the search for a valid data-path (to increase performance) if a context_input is not given in the context.

Source code in datajet/common_resolvers.py
def required_from_context():
    """Return the placeholder resolver for a value that must come from the context.

    Put the returned resolver in a datamap wherever a datapoint is expected to be
    supplied through the context rather than computed.

    Notes:
        This is preferred over raising a `RuntimeResolutionException`: the engine
        behind `datajet.execute` will optimize its search for a valid data-path
        (to increase performance) when the expected input is not given in the context.

    """
    # The same module-level placeholder object is returned on every call.
    return _REQUIRED_FROM_CONTEXT


datajet.common_resolvers.alias

Returns a resolver function that acts as an alias to the node.

Parameters:
  • datapoint (Hashable) – The datapoint to alias.

Notes

Use the resolver output from this function to pass through the data from one node directly to another.

Source code in datajet/common_resolvers.py
def alias(datapoint: Hashable) -> List[dict]:
    """Build a resolver list that forwards the value of `datapoint` unchanged.

    Args:
        datapoint: The datapoint to alias.

    Returns:
        A single-element list holding one resolver dict whose only input is
        `datapoint` and whose function returns its argument as-is.

    Notes:
        Use the resolver output from this function to pass through the data from one node directly to another.

    """
    passthrough_resolver = {IN: [datapoint], F: lambda x: x}
    return [passthrough_resolver]


datajet.common_resolvers.dict_resolver

Returns a resolver function that looks up the resulting value from d corresponding with the key output from input_datapoint.

Parameters:
  • input_datapoint (Hashable) – The datapoint that will be looked up in d to find the value returned from this resolver.

  • d (dict) – The dict to lookup input_datapoint in.

Notes

The resolver will raise RuntimeResolutionException if the key is not found in the dict at "resolution time."

Source code in datajet/common_resolvers.py
def dict_resolver(input_datapoint: Hashable, d: dict) -> List[dict]:
    """Build a resolver list that maps the value of `input_datapoint` through `d`.

    Args:
        input_datapoint: The datapoint whose value is used as the lookup key in `d`.
        d: The dict in which the lookup is performed.

    Returns:
        A single-element list holding one resolver dict whose only input is
        `input_datapoint` and whose function returns `d[key]`.

    Notes:
        The lookup happens at "resolution time"; a key missing from `d` makes the
        resolver raise RuntimeResolutionException.
    """

    def _lookup(key):
        try:
            value = d[key]
        except KeyError:
            # Signal that this resolver cannot produce a value for the given key.
            raise RuntimeResolutionException
        return value

    lookup_resolver = {IN: [input_datapoint], F: _lookup}
    return [lookup_resolver]



Exceptions

datajet.exceptions.RuntimeResolutionException

An exception was raised during execution of the datamap for fields.

Usage

Raise this exception inside a resolver to indicate to datajet that the resolution of the resolver is not possible with the given inputs. DataJet will catch and ignore this exception and proceed with other valid execution paths that return the requested fields in the DataMap if they exist.

Source code in datajet/exceptions.py
class RuntimeResolutionException(Exception):
    """An exception raised while executing the datamap for the requested fields.

    Usage:
        Raise it from inside a resolver to tell datajet that the resolver cannot
        produce a value from the inputs it was given. DataJet catches and ignores
        the exception, then proceeds with any other valid execution paths in the
        DataMap that return the requested fields, if they exist.

    """


datajet.exceptions.PlanNotFoundError

A valid plan was not found in the datamap to return the fields requested.

Notes

This is typically raised by DataJet when no viable paths (paths that do not raise RuntimeResolutionException) exist in the datamap to return the requested fields.

Source code in datajet/exceptions.py
class PlanNotFoundError(ValueError):
    """No valid plan exists in the datamap that returns the requested fields.

    Notes:
        DataJet typically raises this when the datamap contains no viable path
        (a path that does not raise `RuntimeResolutionException`) to the
        requested fields.
    """

Keywords