Prefect Cloud


Disclaimer

Your use of this download is governed by Stonebranch's Terms of Use.

Overview

Prefect is an open-source workflow management system designed to orchestrate data pipelines in Python. This integration provides the capability to trigger from UAC a Prefect Flow Run and monitor its execution providing useful metadata.

Key Features

FeatureDescription
Launch Flow Runs

Launch Flow Runs on Prefect Cloud and monitor until the flow gets completed with success or failure. Flow Run metadata can be provided as part of Extension Output.

Version Information

Template NameExtension NameVersionStatus
Prefectue-prefect1.0.0

Fixes and new Features are introduced.

Refer to Changelog for version history information.

Software Requirements

This integration requires a Universal Agent and a Python runtime to execute the Universal Task.

AreaDetails

Python Version

It requires Python 3.11.

Universal Agent Compatibility

  • Compatible with Universal Agent for Windows x64 and version >= 7.6.0.0.

  • Compatible with Universal Agent for Linux and version >= 7.6.0.0.

Universal Controller Compatibility

Universal Controller Version >= 7.6.0.0.

Network and Connectivity

Network connectivity towards Prefect Cloud is required.

Prefect Cloud and Rest API versions

Compatible with Prefect Cloud version 2.18.1 with REST API version 0.8.4 . It should be compatible on later versions as long as backwards compatibility is guaranteed on the API level.

Supported Actions

Action: Prefect Cloud - Launch Flow Run

This action allows to launch a Flow Run on Prefect Cloud. The task author has to provide the required Credentials, the Workspace ID and the Deployment ID that relates to the Flow and some additional parameters related to the flow execution (more information can be found on the Fields Description section). Task author can decide whether the task will just launch the flow, or in addition wait until a final success or failure state is reached. The last execution Flow Run metadata can be published as part of the Extension Output, while the Flow Run ID, the Flow Run Name and the Flow Run State can be visible directly as output fields during task instance execution.

Configuration examples

Simple Launch Flow Execution using Workspace ID or Name and Deployment ID or Name as dynamic choice fields to retrieve the necessary names from the Prefect Cloud Platform.

Simple Launch Flow Execution using Workspace ID or Name and Deployment ID or Name as IDs that can be copy pasted from Prefect Console. For example purposes, dummy IDs are used.

Launch Flow by providing Flow Parameters and Tags. In this case it is assumed that the Flow expects a Boolean and an integer parameter.

Launch Flow and wait until the Flow ends in Success or in Failure. The latest Flow Run metadata is configured to be included as part of the Extension Output.

Action Output

Output TypeDescriptionExamples

EXTENSION

The extension output provides the following information:

  • exit_code, status_description: General info regarding the task execution. For more information users can refer to the exit code table.
  • invocation.fields: The task configuration used for this task execution.
  • result.flow_run_info: The latest Flow Run information provided by Prefect during the execution of the task. Populated in case task is configured to publish this information on Extension Output.
  • result.errors: List of errors that might have occurred during execution.
Successful Scenario with latest Flow Run metadata as part of the Extension Output
{
    "exit_code": 0,
    "status_description": "Task executed successfully.",
    "invocation": {
        "fields": {...}
    },
    "result": {
        "flow_run_info": {
            "id": "c5feefe0-e932-4f1e-9055-03155561938d",
            "created": "2024-04-30T09:41:40.633550+00:00",
            "updated": "2024-04-30T09:41:40.628595+00:00",
            "name": "mydummyname",
            "flow_id": "6342cf22-2baa-4b4f-8c7f-85d731055af8",
            "state_id": "70736b0e-75e2-4156-8bca-8a21f4c538e4",
            "deployment_id": "ef8baf82-a14b-472f-9f8d-5424b66f232d",
            "work_queue_id": null,
            "work_queue_name": null,
            "flow_version": null,
            "deployment_version": "49e24154c00009d471e26e915271e743",
            "parameters": {
                "fail": false,
                "sleeptime": 5
            },
            "idempotency_key": null,
            "context": {
                "context1": "value1",
                "context2": "value2"
            },
            "empirical_policy": {
                "max_retries": 0,
                "retry_delay_seconds": 0,
                "retries": null,
                "retry_delay": null,
                "pause_keys": [],
                "resuming": false
            },
            "tags": [
                "telis 1",
                "telis 2",
                "testing"
            ],
            "parent_task_run_id": null,
            "state_type": "SCHEDULED",
            "state_name": "Scheduled",
            "run_count": 0,
            "expected_start_time": "2024-04-30T09:41:40.633473+00:00",
            "next_scheduled_start_time": "2024-04-30T09:41:40.633473+00:00",
            "start_time": null,
            "end_time": null,
            "total_run_time": 0,
            "estimated_run_time": 0,
            "estimated_start_time_delta": 0.049332,
            "auto_scheduled": false,
            "infrastructure_document_id": null,
            "infrastructure_pid": null,
            "created_by": {
                "id": "34e9ed52-f547-408e-b53d-02968ba97d64",
                "type": "USER",
                "display_value": "theta457-edison"
            },
            "work_pool_id": null,
            "work_pool_name": null,
            "state": {
                "id": "70736b0e-75e2-4156-8bca-8a21f4c538e4",
                "type": "SCHEDULED",
                "name": "Scheduled",
                "timestamp": "2024-04-30T09:41:40.633516+00:00",
                "message": null,
                "data": null,
                "state_details": {
                    "flow_run_id": "c5feefe0-e932-4f1e-9055-03155561938d",
                    "task_run_id": null,
                    "child_flow_run_id": null,
                    "scheduled_time": "2024-04-30T09:41:40.633473+00:00",
                    "cache_key": null,
                    "cache_expiration": null,
                    "untrackable_result": false,
                    "pause_timeout": null,
                    "pause_reschedule": false,
                    "pause_key": null,
                    "run_input_keyset": null,
                    "refresh_cache": null,
                    "retriable": null,
                    "transition_id": null,
                    "task_parameters_id": null
                }
            },
            "job_variables": {
                "var1": "value1",
                "var2": "value2"
            }
        },
        "errors": []
    }
}
Flow Reached in a Failed State
{
    "exit_code": 0,
    "status_description": "Execution Failed: Flow run Reached CRASHED State",
    "invocation": {
        "fields": {...}
    },
    "result": {
        "flow_run_info": {
            "id": "4cbbeb73-2438-4155-8e20-196e5f53c558",
            "created": "2024-04-30T09:47:08.664604+00:00",
            "updated": "2024-04-30T09:47:22.588768+00:00",
            "name": "mydummyname",
            "flow_id": "6342cf22-2baa-4b4f-8c7f-85d731055af8",
            "state_id": "969d5d10-9e43-48f3-ba36-b81c9808687f",
            "deployment_id": "ef8baf82-a14b-472f-9f8d-5424b66f232d",
            "work_queue_id": null,
            "work_queue_name": null,
            "flow_version": "49e24154c00009d471e26e915271e743",
            "deployment_version": "49e24154c00009d471e26e915271e743",
            "parameters": {
                "fail": true,
                "sleeptime": 5
            },
            "idempotency_key": null,
            "context": {
                "context1": "value1",
                "context2": "value2"
            },
            "empirical_policy": {
                "max_retries": 0,
                "retry_delay_seconds": 0,
                "retries": 0,
                "retry_delay": 0,
                "pause_keys": [],
                "resuming": false
            },
            "tags": [
                "testing",
                "telis 1",
                "telis 2"
            ],
            "parent_task_run_id": null,
            "state_type": "CRASHED",
            "state_name": "Crashed",
            "run_count": 1,
            "expected_start_time": "2024-04-30T09:47:08.664475+00:00",
            "next_scheduled_start_time": null,
            "start_time": "2024-04-30T09:47:17.117950+00:00",
            "end_time": "2024-04-30T09:47:22.582148+00:00",
            "total_run_time": 5.464198,
            "estimated_run_time": 5.464198,
            "estimated_start_time_delta": 8.453475,
            "auto_scheduled": false,
            "infrastructure_document_id": null,
            "infrastructure_pid": null,
            "created_by": {
                "id": "34e9ed52-f547-408e-b53d-02968ba97d64",
                "type": "USER",
                "display_value": "theta457-edison"
            },
            "work_pool_id": null,
            "work_pool_name": null,
            "state": {
                "id": "969d5d10-9e43-48f3-ba36-b81c9808687f",
                "type": "CRASHED",
                "name": "Crashed",
                "timestamp": "2024-04-30T09:47:22.582148+00:00",
                "message": "Execution was aborted by Python system exit call.",
                "data": null,
                "state_details": {
                    "flow_run_id": "4cbbeb73-2438-4155-8e20-196e5f53c558",
                    "task_run_id": null,
                    "child_flow_run_id": null,
                    "scheduled_time": null,
                    "cache_key": null,
                    "cache_expiration": null,
                    "untrackable_result": false,
                    "pause_timeout": null,
                    "pause_reschedule": false,
                    "pause_key": null,
                    "run_input_keyset": null,
                    "refresh_cache": null,
                    "retriable": null,
                    "transition_id": "bcd4ea0a-f7e5-40ef-a492-5ce17bdd8651",
                    "task_parameters_id": null
                }
            },
            "job_variables": {
                "var1": "value1",
                "var2": "value2"
            }
        },
        "errors": [
            "Flow run Reached CRASHED State"
        ]
    }
}

Input Fields

NameTypeDescriptionVersion Information
ActionChoice

The action performed upon the task execution.

  • Prefect Cloud - Launch Flow Run (default)

Introduced in 1.0.0

CredentialsCredential

Credentials for Prefect Cloud. The Credentials definition should be as follows.

  • Account ID as "Runtime User".

  • Account KEY as "Runtime Password".

Introduced in 1.0.0

Workspace ID or NameDynamic Choice

Defines the Workspace for Flow Run execution. Task authors can use directly the Workspace ID by copy pasting it from Prefect UI, enter the name of the Workspace or use the Dynamic Choice functionality to list the available Workspaces names that are applicable under the specific Account.

Introduced in 1.0.0

Deployment ID or NameDynamic Choice

Defines the Deployment of Flow. Task authors can use directly the Deployment ID by copy pasting it from Prefect UI, enter the name of the Deployment or use the Dynamic Choice functionality to list the available Deployments.

Introduced in 1.0.0

Flow Run Name

Text

The name of the flow run. Defaults to a random slug if not specified.

Introduced in 1.0.0

Additional Optional Parameters

Checkbox

Used to show/hide additional optional parameters for the Flow Run.

Introduced in 1.0.0

Parameters (JSON)

Large Text

This is a JSON structure that provides additional optional parameters that can be passed. A default value is provided so users can easily edit the required parameters if required, and it is not intended to be used as-is. The default value provides the following elements.

parameters: it is an object that represents the parameters of the Flow Run. As an object it can contain JSON elements of any type.

context: it is an object that represents the context of the Flow Run. As an object it can contain JSON elements of any type.

job_variables: it is an object that represents the job variables of the Flow Run. As an object it can contain JSON elements of any type.

tags: it is a list of strings that represents a list of tags associated with this Flow Run.

Available if “Additional Optional Parameters” is checked.

Introduced in 1.0.0

Wait for Success or Failure

Checkbox

If selected, the task will continue running until Job reaches the "COMPLETED" or "FAILED" state. "CRASHED", "CANCELLED" are also considered "FAILED" states.

Introduced in 1.0.0

Polling Interval (sec)

Integer

The polling interval in seconds for the Flow Run State check.

Defaults to 60 seconds.

Required when Wait for Success or Failure is checked.

Introduced in 1.0.0

Max Number of Polls

Integer

Maximum number of polls. Can be used to control the approximate expected duration of the Flow Run (in relation also to Polling Interval (sec)).

If left empty the UAC Task will poll indefinitely checking whether the Flow Run is completed or resulted in failure. If the Maximum Number of Polls is reached the exit code of the task will be 40.

Available if Wait for Success or Failure is checked.

Introduced in 1.0.0

Extension Output Options

Choice

User can decide the content of Extension Output using the following options.

  • “--None--”.

  • “Include latest Flow Run information”. In this case the latest Flow Run information retrieved from Prefect is available on the Extension output.

Introduced in 1.0.0

Output Fields

FieldTypeDescriptionIntroduced in Version

Flow Run ID

Text

The Flow Run ID provided by Prefect.

1.0.0

Flow Run Name

Text

The Flow Run Name. If input field Flow Run Name is not provided, then a random slug is generated by Prefect.

1.0.0

Flow Run State

Text

The latest Flow Run execution state retrieved by Prefect.

1.0.0

Cancelation and Rerun

In case of cancellation, the Flow Run is not canceled on Prefect. In case of Re-Run, the Flow Run will be re-executed from scratch.


Exit Codes 

CodeStatusStatus DescriptionMeaning

0

Success

“Task executed successfully.“

Successful Execution.

  • In case Wait for Success or Failure input field is checked it means that the Flow Run ended up in a Success State.

  • In case Wait for Success or Failure input field is not checked it means that the Flow Run has been successfully triggered.

1Failure

“Execution Failed: <<Error Description>>”

Failed Execution.

  • In case Wait for Success or Failure input field is checked it means that the Flow Run ended up in a Failed State.

  • This error code also acts as a generic error code. Raised when error not falling into the other error scenarios.

20Failure

“Data Validation Error: <<Error Description>>“

Input fields validation error.

40Failure

“Polling Timeout: maximum polling timeout reached.“

The Flow Run is executed out of the normal duration boundaries.

STDOUT and STDERR

STDOUT is empty, while STDERR provides additional information to the user.

Backward compatibility is not guaranteed for the content of STDOUT/STDERR and can be changed in future versions without notice.

How To

Import Universal Template

To use the Universal Template, you first must perform the following steps.

  1. This Universal Task requires the Resolvable Credentials feature. Check that the Resolvable Credentials Permitted system property has been set to true.

  2. To import the Universal Template into your Controller, follow these instructions.

  3. When the files have been imported successfully, refresh the Universal Templates list; the Universal Template will appear on the list.

Modifications of this integration, applied by users or customers, before or after import, might affect the supportability of this integration. For more information refer to Integration Modifications.

Configure Universal Task

For a new Universal Task, create a new task, and enter the required input fields.

Integration Modifications 

Modifications applied by users or customers, before or after import, might affect the supportability of this integration. The following modifications are discouraged to retain the support level as applied for this integration.

  • Python code modifications should not be done.

  • Template Modifications

    • General Section

      • "Name", "Extension", "Variable Prefix", and "Icon" should not be changed.

    • Universal Template Details Section

      • "Template Type", "Agent Type", "Send Extension Variables", and "Always Cancel on Force Finish" should not be changed.

    • Result Processing Defaults Section

      • Success and Failure Exit codes should not be changed.

      • Success and Failure Output processing should not be changed.

    • Fields Restriction Section 

                     The setup of the template does not impose any restrictions. However, concerning the "Exit Code Processing Fields" section.

                      i. Success/Failure exit codes need to be respected.

                      ii. In principle, as STDERR and STDOUT outputs can change in follow-up releases of this integration, they should not be considered as a reliable source for determining the success or failure of a task.

Users and customers are encouraged to report defects, or feature requests at Stonebranch Support Desk.

Document References

This document references the following documents:

Document LinkDescription
Universal Templates

User documentation for creating, working with and understanding Universal Templates and Integrations.

Universal Tasks

User documentation for creating Universal Tasks in the Universal Controller user interface.

Changelog 

ue-prefect-1.0.0 (2024-07-19)

Initial Release supporting Flow Run executions on Prefect Cloud.