Passing Payloads

This section shows how to pass outputs of one FuncX Function to another using Globus Flows.

Simple Payloads

Sometimes you may need a state that depends on the output of another state. This can be useful for splitting up complex functions, or for having FuncX parallelize a payload into multiple simultaneous tasks.

By default, all Gladier Tools take input from the main input source, defined as $.input. However, using modifiers (see Flow Generation for details), this default can be changed to use the output of another FuncX function.

@generate_flow_definition(modifiers={
    my_second_function: {'payload': '$.MyFirstFunction.details.result[0]'},
})
class MyTool(GladierBaseTool):
    funcx_functions = [
        my_first_function,
        my_second_function,
    ]

In the example above, the first function is given the full input to work with on $.input. The output of my_first_function will be stored under MyFirstFunction.details.result as a list of FuncX task results. By default, only one FuncX task is run per function, so typically this will be a list with a single entry. The path $.MyFirstFunction.details.result[0] references the exact output returned by a single invocation of my_first_function.

Note

Gladier automatically creates flow state names by translating function names from snake case to upper camel case. For example, my_first_function results in the state name MyFirstFunction.
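As an illustrative sketch (this helper is not part of Gladier's API), the snake case to upper camel case translation works like this:

```python
def to_state_name(func_name):
    """Translate a snake_case function name to an UpperCamelCase state name."""
    return ''.join(part.capitalize() for part in func_name.split('_'))
```

For example, `to_state_name('my_first_function')` yields `'MyFirstFunction'`.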

When my_first_function finishes and my_second_function begins, my_second_function will be given the input stored in $.MyFirstFunction.details.result[0]. This value MUST be a dictionary containing the parameters expected by my_second_function; otherwise a flow exception will be raised and the flow will be marked as a failure.
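To illustrate that contract, a minimal pair of functions might look like the following sketch (the function bodies and parameter names here are hypothetical, not from the Gladier source):

```python
def my_first_function(**data):
    # Return a dictionary whose keys match the keyword parameters
    # expected by my_second_function below.
    return {'filename': 'results.txt', 'threshold': 0.5}


def my_second_function(filename, threshold, **data):
    # Receives the dictionary returned by my_first_function,
    # unpacked into keyword arguments by the flow.
    return f'Processing {filename} with threshold {threshold}'
```

If my_first_function instead returned, say, a string or a list, the second state would fail because its payload could not be mapped onto the function signature.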

Warning

When using function outputs as payloads with ExceptionOnActionFailure: false, failures can cascade: the stringified exception from one function becomes the input to the next. It’s recommended you either set ExceptionOnActionFailure: true or pass payloads as $.MyFirstFunction.details.

Multiple FuncX Tasks

FuncX is built to run many tasks in parallel. You can instruct Gladier to pass multiple task payloads with the tasks modifier. However, at this level FuncX also needs an explicit FuncX endpoint and function ID for each task it will process. It’s common to use one FuncX function to build the list of payloads to be run in parallel.

def parallel_workload_input_builder(funcx_endpoint_compute, parallel_workload_funcx_id, parallel_workloads, **data):
    return [{
        'endpoint': funcx_endpoint_compute,
        'function': parallel_workload_funcx_id,
        'payload': payload,
    } for payload in parallel_workloads]


def parallel_workload(name, **data):
    import time
    return f'{name} finished at {time.time()}!'


@generate_flow_definition(modifiers={
    parallel_workload: {'tasks': '$.ParallelWorkloadInputBuilder.details.result[0]'},
})
class ParallelWorkloadsTool(GladierBaseTool):
    funcx_functions = [
        parallel_workload_input_builder,
        parallel_workload,
    ]
    required_input = [
        'funcx_endpoint_compute',
        'parallel_workloads',
        'parallel_workload_funcx_id'
    ]

Above, the parallel_workload_input_builder function is run first and generates the list of FuncX tasks. This can be an arbitrarily long list determined at runtime. Each task in the list must contain three elements: endpoint, function, and payload.
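To make the generated structure concrete, here is the builder from above called directly with placeholder values (the UUIDs are hypothetical stand-ins; at runtime these come from $.input):

```python
def parallel_workload_input_builder(funcx_endpoint_compute, parallel_workload_funcx_id, parallel_workloads, **data):
    # Build one task entry per payload; each entry carries the
    # endpoint, function ID, and payload required by FuncX.
    return [{
        'endpoint': funcx_endpoint_compute,
        'function': parallel_workload_funcx_id,
        'payload': payload,
    } for payload in parallel_workloads]


# Placeholder IDs, shown only to illustrate the resulting task list.
tasks = parallel_workload_input_builder(
    funcx_endpoint_compute='endpoint-uuid',
    parallel_workload_funcx_id='function-uuid',
    parallel_workloads=[{'name': 'foo'}, {'name': 'bar'}],
)
```

Here, tasks is a two-entry list, and each entry is a dictionary with exactly the endpoint, function, and payload keys described above.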

endpoint above is typically specified by the user at input time, and defaults to funcx_endpoint_compute. The FuncX function, however, is re-registered by Gladier whenever it changes, and its ID is determined automatically. By default, Gladier appends _funcx_id to the name of each of the funcx_functions and automatically adds the resulting IDs to $.input. parallel_workload_funcx_id above can be determined using this convention, or verified via the flow output.
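The naming convention can be sketched as follows (this helper is illustrative only, not Gladier's internal implementation):

```python
def funcx_id_input_key(func):
    # Derive the $.input key for a function's FuncX ID by appending
    # '_funcx_id' to the function's name, per the convention above.
    return f'{func.__name__}_funcx_id'


def parallel_workload(name, **data):
    # Dummy function, used here only to demonstrate the key derivation.
    return name
```

Applying `funcx_id_input_key` to parallel_workload yields the input key 'parallel_workload_funcx_id'.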

payload must be a dictionary containing keyword parameters matching the function’s signature. This is similar to all other FuncX functions used in Gladier, which are called with all input data specified on $.input.

When parallel_workload runs, it will execute all tasks in parallel, or according to whatever rules your particular FuncX endpoint defines. Once all tasks finish, each output will be listed in $.ParallelWorkload.details.result. If a task fails, a stack trace is returned as a string in place of its result. If all tasks fail, the flow will be marked as “FAILED”.

A full example of the Flow Definition as JSON output is below:

{
    "Comment": "Flow with states: ParallelWorkloadInputBuilder, ParallelWorkload",
    "StartAt": "ParallelWorkloadInputBuilder",
    "States": {
        "ParallelWorkloadInputBuilder": {
            "Comment": null,
            "Type": "Action",
            "ActionUrl": "https://automate.funcx.org",
            "ActionScope": "https://auth.globus.org/scopes/b3db7e59-a6f1-4947-95c2-59d6b7a70f8c/action_all",
            "ExceptionOnActionFailure": false,
            "Parameters": {
                "tasks": [
                    {
                        "endpoint.$": "$.input.funcx_endpoint_compute",
                        "function.$": "$.input.parallel_workload_input_builder_funcx_id",
                        "payload.$": "$.input"
                    }
                ]
            },
            "ResultPath": "$.ParallelWorkloadInputBuilder",
            "WaitTime": 300,
            "Next": "ParallelWorkload"
        },
        "ParallelWorkload": {
            "Comment": null,
            "Type": "Action",
            "ActionUrl": "https://automate.funcx.org",
            "ActionScope": "https://auth.globus.org/scopes/b3db7e59-a6f1-4947-95c2-59d6b7a70f8c/action_all",
            "Parameters": {
                "tasks.$": "$.ParallelWorkloadInputBuilder.details.result[0]"
            },
            "ResultPath": "$.ParallelWorkload",
            "WaitTime": 300,
            "ExceptionOnActionFailure": true,
            "End": true
        }
    }
}

Parallel Processing Example

Below is a full, runnable example using the FuncX tutorial endpoint.

from gladier import GladierBaseClient, GladierBaseTool, generate_flow_definition
from pprint import pprint


def parallel_workload_input_builder(funcx_endpoint_compute, parallel_workload_funcx_id, parallel_workloads, **data):
    return [{
        'endpoint': funcx_endpoint_compute,
        'function': parallel_workload_funcx_id,
        'payload': payload,
    } for payload in parallel_workloads]


def parallel_workload(name, **data):
    import time
    return f'{name} finished at {time.time()}!'


@generate_flow_definition(modifiers={
    parallel_workload: {'tasks': '$.ParallelWorkloadInputBuilder.details.result[0]'},
})
class ParallelWorkloadsTool(GladierBaseTool):
    funcx_functions = [
        parallel_workload_input_builder,
        parallel_workload,
    ]
    required_input = [
        'funcx_endpoint_compute',
        'parallel_workloads',
        'parallel_workload_funcx_id'
    ]


@generate_flow_definition
class ParallelWorkloadsClient(GladierBaseClient):
    gladier_tools = [
        ParallelWorkloadsTool,
    ]


if __name__ == '__main__':
    flow_input = {
        'input': {
            'parallel_workloads': [
                {'name': 'foo'},
                {'name': 'bar'},
                {'name': 'baz'},
            ],
            'funcx_endpoint_compute': '553e7b64-0480-473c-beef-be762ba979a9',
        }
    }
    work_flow = ParallelWorkloadsClient()
    pprint(work_flow.flow_definition)

    flow = work_flow.run_flow(flow_input=flow_input)
    run_id = flow['run_id']
    work_flow.progress(run_id)
    pprint(work_flow.get_status(run_id))