Workflow Guide
This guide provides in-depth information on creating and managing workflows in Sparrow.jl.
Workflow Architecture
A Sparrow workflow consists of three main components:
- Workflow Type: A struct that holds configuration parameters
- Workflow Steps: Processing stages that transform data
- Step Functions: Implementations that perform the actual work
Creating Workflow Types
Using the @workflow_type Macro
The @workflow_type macro creates a new workflow type that automatically:
- Inherits from SparrowWorkflow
- Implements the dictionary interface
- Provides a keyword constructor
@workflow_type MyWorkflow
This expands to:
struct MyWorkflow <: SparrowWorkflow
params::Dict{String,Any}
end
MyWorkflow(; kwargs...) = MyWorkflow(Dict{String,Any}(string(k) => v for (k, v) in kwargs))
Multiple Workflow Types
You can define multiple workflow types in the same file:
@workflow_types RadarQC RadarGrid RadarMerge
Manual Workflow Definition
For more control, you can define workflows manually:
"""
    CustomWorkflow(; kwargs...)

Manually defined workflow type. Every keyword argument is stored in `params`
under its name converted to a `String`. Construction fails with an error when
no `required_param` keyword is supplied.
"""
struct CustomWorkflow <: SparrowWorkflow
    params::Dict{String,Any}

    function CustomWorkflow(; kwargs...)
        # Collect the keywords into a String-keyed dictionary.
        settings = Dict{String,Any}()
        for (key, value) in kwargs
            settings[string(key)] = value
        end

        # Add validation: refuse to build a workflow without the mandatory key.
        haskey(settings, "required_param") ||
            error("CustomWorkflow requires 'required_param'")

        return new(settings)
    end
end
Defining Workflow Steps
Using @workflow_step
Steps are typically empty structs used for dispatch:
@workflow_step ConvertStep
@workflow_step QCStep
@workflow_step GridStep
Step Ordering
Steps are defined as an ordered list in your workflow instance:
workflow = MyWorkflow(
# Format: (step_name, step_type, input_directory, archive)
steps = [
("convert", ConvertStep, "base_data", false),
("qc", QCStep, "convert", false),
("grid", GridStep, "qc", true)
],
# ... other params
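)
The order in the vector determines execution order. Each step reads its input from the source named in its input_directory entry — in the example above, "base_data" for the first step and the name of the preceding step for the others — so each step receives the output of the previous step as input.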
Implementing Step Functions
Function Signature
Step functions must follow this signature:
function Sparrow.workflow_step(
workflow::YourWorkflowType,
::Type{YourStepType},
input_dir::String,
output_dir::String;
step_name::String="",
step_num::Int=0,
kwargs...
)
# Your implementation
return num_files_processed
end
Parameters
- workflow: Your workflow instance (access parameters via workflow["key"])
- ::Type{YourStepType}: Step type for dispatch
- input_dir: Directory containing input files for this step
- output_dir: Directory where output files should be written
- step_name: Name of the step (from workflow definition)
- step_num: Step number in the workflow (1-indexed)
- kwargs...: Additional keyword arguments
Return Value
Step functions should return the number of files processed (or 0 if no files were processed).
Example Implementation
"""
    Sparrow.workflow_step(workflow::MyWorkflow, ::Type{ConvertStep},
                          input_dir, output_dir; step_name="", step_num=0, kwargs...)

Convert every entry of `input_dir` whose path matches the workflow's
`"file_pattern"` parameter, writing `<name>.nc` into `output_dir`.

`"file_pattern"` is a regular expression, given either as a `Regex` or as a
`String` holding regex source. It is not a shell glob.

Returns the number of files converted successfully. A file that fails to
convert is logged as a warning and skipped.
"""
function Sparrow.workflow_step(workflow::MyWorkflow, ::Type{ConvertStep},
                               input_dir::String, output_dir::String;
                               step_name::String="", step_num::Int=0, kwargs...)
    msg_info("Step $(step_num): $(step_name) - Converting files")

    # Create output directory
    mkpath(output_dir)

    # Get workflow parameters.
    # The default must be valid regex source: a glob such as "*.raw" makes
    # Regex() throw because `*` has nothing to repeat.
    file_pattern = get_param(workflow, "file_pattern", raw"\.raw$")
    # Compile once, outside the per-file predicate; accept a ready-made Regex too.
    name_regex = file_pattern isa Regex ? file_pattern : Regex(file_pattern)

    # Find input files
    input_files = readdir(input_dir; join=true)
    filter!(f -> occursin(name_regex, f), input_files)

    # Process each file
    processed_count = 0
    for input_file in input_files
        try
            output_file = joinpath(output_dir, basename(input_file) * ".nc")
            # Your processing logic
            convert_radar_file(input_file, output_file)
            processed_count += 1
            msg_debug("Converted $(basename(input_file))")
        catch e
            # Best effort: one bad file must not abort the whole step.
            msg_warning("Failed to convert $(basename(input_file)): $(e)")
        end
    end

    msg_info("Processed $(processed_count) files in step $(step_name)")
    return processed_count
end
Workflow Parameters
Required Parameters
These parameters are required for the workflow system to function:
workflow = MyWorkflow(
# Directory structure
base_working_dir = "/path/to/temp", # Temporary working directory
base_archive_dir = "/path/to/archive", # Archived/processed files
base_data_dir = "/path/to/raw", # Raw input data
# Workflow definition
# Format: (step_name, step_type, input_directory, archive)
steps = [
("step1", Step1Type, "base_data", false),
("step2", Step2Type, "step1", true)
]
)
Common Optional Parameters
workflow = MyWorkflow(
# ... required params ...
# Time handling
minute_span = 10, # Process data in N-minute chunks
reverse = false, # Process in reverse chronological order
# Directories
base_plot_dir = "/plots", # Output plots directory
# Radar-specific
raw_moment_names = ["DBZ", "VEL", "WIDTH"],
qc_moment_names = ["DBZ", "VEL"],
moment_grid_type = [:linear, :linear, :log],
# Logging
message_level = 2, # 0=error, 1=warning, 2=info, 3=debug, 4=trace
# Custom parameters
my_threshold = 10.0,
my_flag = true
)
Accessing Parameters
# Direct access (throws error if key not found)
value = workflow["parameter_name"]
# With default value
value = get_param(workflow, "parameter_name", default_value)
# With type checking
value = get_param(workflow, "parameter_name", ExpectedType)
Adding Parameters Dynamically
Since workflows behave like dictionaries:
# Add or update a parameter
workflow["new_parameter"] = "value"
# Check if parameter exists
if haskey(workflow.params, "optional_param")
# Use it
end
Data Flow and Directory Structure
Directory Hierarchy
Sparrow creates a structured directory hierarchy:
base_working_dir/
├── step1_convert/
│ ├── 20240101_0000/
│ ├── 20240101_0010/
│ └── 20240101_0020/
├── step2_qc/
│ ├── 20240101_0000/
│ └── ...
└── step3_grid/
└── ...
base_archive_dir/
├── converted/
├── qc/
└── gridded/
Step Input/Output
Each step receives:
- input_dir: Output directory from the previous step (or raw data for step 1)
- output_dir: A unique directory for this step's output
The workflow system automatically:
- Creates output directories
- Passes output of step N as input to step N+1
- Archives final outputs
Time-Based Processing
When processing time-series data:
workflow = MyWorkflow(
minute_span = 10,
# ...
)
The workflow system:
- Divides the time range (start to end) into chunks
- Processes each chunk sequentially or in parallel
- Finds files matching each time window
- Runs all steps for that time window
Distributed Processing
Worker Assignment
Sparrow automatically distributes time chunks across workers:
# Workers are assigned file batches
assign_workers(workflow)
Files are queued and distributed to available workers as they complete tasks.
Step Function on Workers
Your step functions run on worker processes. Important considerations:
- Module Loading: Workflow files are loaded on all workers automatically
- Message Level: Set message level on workers for proper logging
- Shared Data: Workers have separate memory; use files for communication
- Error Handling: Return 0 or throw to signal failure
Worker-Specific Code
"""
    Sparrow.workflow_step(workflow::MyWorkflow, ::Type{MyStep},
                          input_dir, output_dir; kwargs...)

Skeleton step showing worker-aware logging. Logs the id of the process it
runs on and, when more than one worker exists, the worker count.

Returns the number of files processed, which is `0` until the processing
placeholder is filled in.
"""
function Sparrow.workflow_step(workflow::MyWorkflow, ::Type{MyStep},
                               input_dir::String, output_dir::String;
                               kwargs...)
    # Get worker ID
    # NOTE(review): `myid` and `nworkers` are provided by the Distributed
    # standard library — confirm it is loaded where this step is defined.
    worker_id = myid()
    msg_debug("Running on worker $(worker_id)")

    # Worker-specific logic
    if nworkers() > 1
        msg_info("Distributed mode with $(nworkers()) workers")
    end

    # Process files...
    processed_count = 0  # increment once per file handled

    # Always return a count; without an explicit return the function would
    # yield the value of the `if` block above instead.
    return processed_count
end
Advanced Features
Conditional Steps
"""
    Sparrow.workflow_step(workflow::MyWorkflow, ::Type{ConditionalStep},
                          input_dir, output_dir; kwargs...)

Skeleton step that runs only when the `"enable_advanced_qc"` workflow
parameter is true (default `false`).

Returns `0` when the step is skipped, otherwise the number of files
processed, which is `0` until the processing placeholder is filled in.
"""
function Sparrow.workflow_step(workflow::MyWorkflow, ::Type{ConditionalStep},
                               input_dir::String, output_dir::String;
                               kwargs...)
    # Skip step based on condition
    if !get_param(workflow, "enable_advanced_qc", false)
        msg_info("Skipping advanced QC (not enabled)")
        return 0
    end

    # Proceed with processing...
    processed_count = 0  # increment once per file handled

    # Return a count on this path too, not only on the skip path.
    return processed_count
end
Chaining External Tools
"""
    Sparrow.workflow_step(workflow::MyWorkflow, ::Type{RadxStep},
                          input_dir, output_dir; kwargs...)

Run the external `RadxConvert` command on every entry of `input_dir`,
directing its output to `output_dir` in cfradial format.

Returns the number of successful conversions. The step aborts and returns
`0` as soon as one invocation fails.
"""
function Sparrow.workflow_step(workflow::MyWorkflow, ::Type{RadxStep},
                               input_dir::String, output_dir::String;
                               kwargs...)
    mkpath(output_dir)

    # Count conversions as they succeed instead of re-reading the input
    # directory afterwards, so the result reflects the work actually done.
    converted = 0
    for file in readdir(input_dir; join=true)
        # Call external tool
        cmd = `RadxConvert -f $(file) -outdir $(output_dir) -outformat cfradial`
        try
            run(cmd)
            converted += 1
            msg_debug("Converted $(basename(file))")
        catch e
            msg_error("RadxConvert failed on $(file): $(e)")
            return 0
        end
    end
    return converted
end
Custom File Discovery
"""
    Sparrow.workflow_step(workflow::MyWorkflow, ::Type{CustomDiscovery},
                          input_dir, output_dir; kwargs...)

Skeleton step that walks `input_dir` recursively and collects the full path
of every file whose name matches the `"file_pattern"` workflow parameter
(default `r".*\\.nc\$"`).

Returns the number of files processed, which is `0` until the processing
placeholder is filled in.
"""
function Sparrow.workflow_step(workflow::MyWorkflow, ::Type{CustomDiscovery},
                               input_dir::String, output_dir::String;
                               kwargs...)
    # Custom pattern matching
    pattern = get_param(workflow, "file_pattern", r".*\.nc$")

    # Typed container: the elements are always paths built by joinpath.
    files = String[]
    for (root, _, filenames) in walkdir(input_dir)
        for filename in filenames
            if occursin(pattern, filename)
                push!(files, joinpath(root, filename))
            end
        end
    end
    msg_info("Found $(length(files)) files matching pattern")

    # Process files...
    processed_count = 0  # increment once per entry of `files` handled

    return processed_count
end
Metadata Propagation
"""
    Sparrow.workflow_step(workflow::MyWorkflow, ::Type{MetadataStep},
                          input_dir, output_dir; kwargs...)

Carry a `.metadata.json` file from `input_dir` to `output_dir`. The file is
loaded if present, otherwise an empty dictionary is used. The `"step_name"`
and `"processed_at"` entries are then set and the result is written to
`output_dir`.

Returns `1`.
"""
function Sparrow.workflow_step(workflow::MyWorkflow, ::Type{MetadataStep},
                               input_dir::String, output_dir::String;
                               kwargs...)
    mkpath(output_dir)

    # Read metadata from previous step
    metadata_file = joinpath(input_dir, ".metadata.json")
    if isfile(metadata_file)
        metadata = JSON.parsefile(metadata_file)
        msg_debug("Loaded metadata: $(metadata)")
    else
        # String-keyed, matching the String keys written below, rather
        # than an untyped Dict{Any,Any}.
        metadata = Dict{String,Any}()
    end

    # Add metadata for this step
    metadata["step_name"] = get(kwargs, :step_name, "")
    # NOTE(review): `now` comes from the Dates standard library — confirm it
    # is loaded where this step is defined.
    metadata["processed_at"] = now()

    # Process files...

    # Save updated metadata
    output_metadata = joinpath(output_dir, ".metadata.json")
    open(output_metadata, "w") do io
        JSON.print(io, metadata, 2)
    end
    return 1
end
Error Handling
Step-Level Errors
"""
    Sparrow.workflow_step(workflow::MyWorkflow, ::Type{SafeStep},
                          input_dir, output_dir; kwargs...)

Process every entry of `input_dir` with `process_file`, tolerating
individual failures. Each failure is logged as a warning. Once more than 10
files have failed the step aborts.

Returns the number of files processed successfully, or `0` if the step was
aborted.
"""
function Sparrow.workflow_step(workflow::MyWorkflow, ::Type{SafeStep},
                               input_dir::String, output_dir::String;
                               kwargs...)
    # Make sure the destination exists before anything is written to it.
    mkpath(output_dir)

    max_errors = 10  # failures tolerated before the step gives up
    processed = 0
    errors = 0
    for file in readdir(input_dir; join=true)
        try
            # Process file
            process_file(file, output_dir)
            processed += 1
        catch e
            msg_warning("Failed to process $(basename(file)): $(e)")
            errors += 1
            # Continue or abort?
            if errors > max_errors
                msg_error("Too many errors, aborting step")
                return 0
            end
        end
    end
    msg_info("Processed $(processed) files, $(errors) errors")
    return processed
end
Validation
"""
    Sparrow.workflow_step(workflow::MyWorkflow, ::Type{ValidateStep},
                          input_dir, output_dir; kwargs...)

Skeleton step that validates its preconditions before doing any work: the
workflow must define `"threshold"`, `"method"` and `"output_format"`, and
`input_dir` must exist and be non-empty.

Returns `0` when a check fails, otherwise the number of files processed,
which is `0` until the processing placeholder is filled in.
"""
function Sparrow.workflow_step(workflow::MyWorkflow, ::Type{ValidateStep},
                               input_dir::String, output_dir::String;
                               kwargs...)
    # Validate required parameters
    required_params = ["threshold", "method", "output_format"]
    for param in required_params
        if !haskey(workflow.params, param)
            msg_error("Missing required parameter: $(param)")
            return 0
        end
    end

    # Validate input files exist. A missing directory is treated like an
    # empty one, because readdir throws on a path that does not exist.
    if !isdir(input_dir) || isempty(readdir(input_dir))
        msg_warning("No input files found in $(input_dir)")
        return 0
    end

    # Proceed with processing...
    processed_count = 0  # increment once per file handled

    return processed_count
end
Best Practices
- Keep Steps Focused: Each step should do one thing well
- Use Message Levels Appropriately: Error for failures, warning for issues, info for progress
- Return Accurate Counts: Return the actual number of files processed
- Create Output Directories: Always mkpath(output_dir) before writing
- Handle Missing Files Gracefully: Empty input is often valid (skip processing)
- Validate Parameters Early: Check required parameters at step start
- Use Type Dispatch: Define step types for clear separation of concerns
- Document Your Steps: Add comments explaining complex logic
- Test Incrementally: Test each step independently before chaining
- Log Progress: Use debug/trace messages for detailed progress tracking
Testing Workflows
For information on testing workflows, see:
- Unit testing individual steps
- Integration testing complete workflows
- Generating test fixtures
- Running tests locally and in CI