# API Reference
This page provides detailed documentation for all public APIs in Sparrow.jl.
## Workflow Types and Macros
### `@workflow_type`

Create a new workflow type that inherits from `SparrowWorkflow`.

Usage:

```julia
@workflow_type MyWorkflow
```

Expands to:

```julia
struct MyWorkflow <: SparrowWorkflow
    params::Dict{String,Any}
end

MyWorkflow(; kwargs...) = MyWorkflow(Dict{String,Any}(string(k) => v for (k, v) in kwargs))
```

Example:
```julia
@workflow_type RadarProcessing

workflow = RadarProcessing(
    base_working_dir = "/tmp/work",
    base_archive_dir = "/data/archive",
    base_data_dir = "/data/raw",
    # Format: (step_name, step_type, input_directory, archive)
    steps = [
        ("qc", QCStep, "base_data", false),
        ("grid", GridStep, "qc", true)
    ]
)
```

### `@workflow_types`
Define multiple workflow types at once.
Usage:

```julia
@workflow_types WorkflowA WorkflowB WorkflowC
```

Equivalent to calling `@workflow_type` for each type individually.
### `@workflow_step`

Define a workflow step type for dispatch.

Usage:

```julia
@workflow_step MyStep
```

Expands to:

```julia
struct MyStep end
```

Step types are used for dispatch in `workflow_step` function implementations.
Example:

```julia
@workflow_step ConvertData

function Sparrow.workflow_step(workflow::MyWorkflow, ::Type{ConvertData},
                               input_dir::String, output_dir::String;
                               kwargs...)
    # Implementation
end
```

## Core Workflow Functions
### `run_workflow`

Execute a complete workflow from start to finish.

Signature:

```julia
run_workflow(workflow::SparrowWorkflow, parsed_args) → Bool
```

Arguments:
- `workflow`: A workflow instance
- `parsed_args`: Parsed command-line arguments (`Dict`)

Returns:
- `true` if the workflow completed successfully, `false` otherwise
Description:
This is the main entry point for executing workflows. It:
- Sets up workflow parameters from command-line arguments
- Assigns workers for distributed processing
- Processes the workflow across the specified time range
- Handles errors and cleanup
Called by: the `main` function in the Sparrow module
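The control flow described above can be sketched in plain Julia. This is a hypothetical simplification for illustration, not Sparrow's actual source; the name `run_workflow_sketch` is invented:

```julia
# Hypothetical sketch of run_workflow's control flow (not Sparrow's source).
function run_workflow_sketch(params::Dict{String,Any}, parsed_args::Dict{String,Any})
    merge!(params, parsed_args)  # command-line arguments override workflow parameters
    try
        if get(parsed_args, "num_workers", 0) > 0
            # assign_workers(workflow)    # distributed path
        else
            # process_workflow(workflow)  # sequential path
        end
        return true
    catch err
        # msg_error("Workflow failed: $(err)")
        return false
    end
end
```

The `Bool` return lets the caller (e.g., a `main` script) translate success or failure into a process exit code.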
### `assign_workers`

Distribute files across available workers for parallel processing.

Signature:

```julia
assign_workers(workflow::SparrowWorkflow) → Nothing
```

Arguments:
- `workflow`: Workflow instance with configured parameters
Description:
Creates a file queue and distributes processing tasks across all available workers. Files are organized by time windows and assigned to workers as they become available.
Prerequisites:
- Workers must be initialized (via `addprocs` or a cluster manager)
- The workflow must be loaded on all workers
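A typical setup satisfying these prerequisites might look like the following sketch; the `my_workflow.jl` filename is a placeholder, and the Sparrow-specific lines are shown as comments:

```julia
using Distributed

addprocs(2)                        # start two local workers
# The workflow must be loaded on every worker before files are distributed:
# @everywhere using Sparrow
# @everywhere include("my_workflow.jl")
# assign_workers(workflow)         # now safe to hand out the file queue
```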
### `process_workflow`

Process a workflow on the main process (non-distributed).

Signature:

```julia
process_workflow(workflow::SparrowWorkflow) → Bool
```

Arguments:
- `workflow`: Workflow instance

Returns:
- `true` if processing succeeded, `false` otherwise
Description:
Processes the entire workflow sequentially on the main process. Used when running without distributed workers.
### `workflow_step`

User-defined function that implements a workflow step.

Signature:

```julia
workflow_step(workflow::YourWorkflowType, ::Type{YourStepType},
              input_dir::String, output_dir::String;
              step_name::String="", step_num::Int=0, kwargs...) → Int
```

Arguments:
- `workflow`: Your workflow instance
- `::Type{YourStepType}`: Step type for dispatch
- `input_dir`: Directory containing input files
- `output_dir`: Directory for output files
- `step_name`: Name of the step (from the workflow definition)
- `step_num`: Step number in the workflow (1-indexed)
- `kwargs...`: Additional keyword arguments
Returns:
- Number of files processed (or 0 if step failed/skipped)
Description:
This is the function you implement for each workflow step. It receives input files from the previous step (or raw data for the first step) and produces output files for the next step.
Example:

```julia
function Sparrow.workflow_step(workflow::MyWorkflow, ::Type{ProcessData},
                               input_dir::String, output_dir::String;
                               step_name::String="", step_num::Int=0, kwargs...)
    msg_info("Processing data in step $(step_num): $(step_name)")
    mkpath(output_dir)
    # Find and process files
    files = readdir(input_dir; join=true)
    for file in files
        # Process file...
        output = joinpath(output_dir, basename(file))
        # ... save to output
    end
    return length(files)
end
```

## Parameter Access Functions
### `get_param`

Get a workflow parameter, with an optional default value or type check.

Signatures:

```julia
get_param(workflow::SparrowWorkflow, key::String, default) → Any
get_param(workflow::SparrowWorkflow, key::String, ::Type{T}) → T
```

Arguments:
- `workflow`: Workflow instance
- `key`: Parameter name
- `default`: Default value if the parameter is not found
- `T`: Expected type (with type assertion)

Returns:
- Parameter value, or the default if not found (first form)
- Parameter value with a type assertion (second form)
Examples:

```julia
# With default value
span = get_param(workflow, "minute_span", 10)

# With type checking
moments = get_param(workflow, "raw_moment_names", Vector{String})

# Direct access (throws an error if not found)
value = workflow["required_param"]
```

## Message System
### Message Functions

Output messages with severity levels.

Signatures:

```julia
message(msg::String, severity::Int=MSG_INFO)
msg_error(msg::String)    # severity = 0
msg_warning(msg::String)  # severity = 1
msg_info(msg::String)     # severity = 2
msg_debug(msg::String)    # severity = 3
msg_trace(msg::String)    # severity = 4
```

Arguments:
- `msg`: Message text to display
- `severity`: Message severity level (0-4)

Description:

Messages are only displayed if their severity is at or below the current message level (set via `set_message_level`).
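The gating rule can be modeled in a few lines of plain Julia; this is a simplified illustration of the behavior described here, not Sparrow's implementation:

```julia
# Simplified model of severity-gated messaging.
const MSG_ERROR, MSG_WARNING, MSG_INFO, MSG_DEBUG, MSG_TRACE = 0, 1, 2, 3, 4

const current_level = Ref(MSG_INFO)  # default level is 2 (MSG_INFO)

# A message is shown only when its severity is at or below the current level.
shown(severity::Int) = severity <= current_level[]
```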
Examples:

```julia
msg_error("Critical failure in processing")
msg_warning("Missing optional parameter, using default")
msg_info("Processing 100 files")
msg_debug("Intermediate value: $(value)")
msg_trace("Loop iteration $(i)")
```

### `set_message_level`
Set the global message verbosity level.
Signature:

```julia
set_message_level(level::Int) → Nothing
```

Arguments:
- `level`: Message level (0-4)

Message Levels:
- `0` (`MSG_ERROR`): Only errors
- `1` (`MSG_WARNING`): Errors and warnings
- `2` (`MSG_INFO`): Errors, warnings, and informational (default)
- `3` (`MSG_DEBUG`): Include debug messages
- `4` (`MSG_TRACE`): Include trace messages (very verbose)
Example:

```julia
set_message_level(MSG_DEBUG)  # Show debug messages
# Or use the integer directly
set_message_level(3)
```

### Message Level Constants

Predefined constants for message severity levels:
- `MSG_ERROR = 0`: Error messages (always shown)
- `MSG_WARNING = 1`: Warning messages
- `MSG_INFO = 2`: Informational messages (default level)
- `MSG_DEBUG = 3`: Debug messages
- `MSG_TRACE = 4`: Trace messages (very detailed)
## Abstract Types

### `SparrowWorkflow`

Abstract base type for all Sparrow workflows.

Definition:

```julia
abstract type SparrowWorkflow <: AbstractDict{String,Any} end
```

Description:

All workflow types created with `@workflow_type` inherit from `SparrowWorkflow`. This type:
- Subtypes `AbstractDict{String,Any}` to provide a dictionary interface
- Enables type-based dispatch for workflow-specific behavior
- Stores parameters in a `params::Dict{String,Any}` field
Dictionary Interface:

Workflows support dictionary operations:

```julia
workflow["key"]                 # Get parameter (error if not found)
workflow["key"] = value         # Set parameter
haskey(workflow.params, "key")  # Check if parameter exists
keys(workflow.params)           # Get all parameter names
length(workflow)                # Number of parameters
```

## Utility Functions
### `setup_workers`

Set up distributed workers for parallel processing.

Called automatically by `run_workflow` based on command-line arguments. Supports:
- Local workers (`-n N`)
- Slurm cluster workers (`--slurm -n N`)
### `process_volume`

Process a single time volume (time window) through all workflow steps.

Internal function, called by `assign_workers` and `process_workflow`.

### `run_workflow_step`

Execute a single workflow step for a given time range.

Internal function; calls the user-defined `workflow_step` function.

### `check_processed`

Check whether a file has already been processed (i.e., exists in the archive).

Internal function, used to skip already-processed files.
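The archive check can be sketched as follows; the function name and the one-file-per-archive-entry layout are assumptions for illustration, not Sparrow's actual logic:

```julia
# Hypothetical sketch: a file counts as processed if a file of the same
# name already exists in the archive directory.
already_processed(file::String, archive_dir::String) =
    isfile(joinpath(archive_dir, basename(file)))
```

Combined with `--force_reprocess` (below), a check like this lets realtime runs resume cheaply after an interruption.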
## Command-Line Interface

The `sparrow` script provides the command-line interface. Options:

```
workflow              Workflow file to execute (required, positional)
--datetime DATETIME   Process a specific time, YYYYmmdd_HHMMSS (default: "now")
--realtime            Process an incoming realtime data stream
--force_reprocess     Force reprocessing of previously processed data
--threads N           Number of threads
--num_workers N       Number of worker processes
-v, --verbose LEVEL   Message verbosity level (0-4, default: 2)
--slurm               Use the Slurm cluster manager
--sge                 Use Sun Grid Engine
--paths_file FILE     File overriding data paths
```

Example:

```sh
julia sparrow my_workflow.jl --datetime 20240101_000000 \
    --num_workers 4 --threads 2 -v 2
```

## Extended Example
Here's a complete example showing the API in use:

```julia
using Sparrow

# Define workflow type
@workflow_type DataPipeline

# Define steps
@workflow_step LoadData
@workflow_step ProcessData
@workflow_step SaveResults

# Create workflow instance
workflow = DataPipeline(
    base_working_dir = "/tmp/work",
    base_archive_dir = "/data/archive",
    base_data_dir = "/data/raw",
    # Format: (step_name, step_type, input_directory, archive)
    steps = [
        ("load", LoadData, "base_data", false),
        ("process", ProcessData, "load", false),
        ("save", SaveResults, "process", true)
    ],
    threshold = 10.0,
    message_level = 2
)

# Implement step 1
function Sparrow.workflow_step(workflow::DataPipeline, ::Type{LoadData},
                               input_dir::String, output_dir::String;
                               kwargs...)
    msg_info("Loading data from $(input_dir)")
    mkpath(output_dir)
    files = readdir(input_dir; join=true)
    for file in files
        # Load and prepare data
        output = joinpath(output_dir, basename(file))
        cp(file, output)
    end
    return length(files)
end

# Implement step 2
function Sparrow.workflow_step(workflow::DataPipeline, ::Type{ProcessData},
                               input_dir::String, output_dir::String;
                               kwargs...)
    msg_info("Processing data")
    mkpath(output_dir)
    threshold = get_param(workflow, "threshold", 5.0)
    files = readdir(input_dir; join=true)
    for file in files
        # Process with threshold
        msg_debug("Processing $(basename(file)) with threshold $(threshold)")
        output = joinpath(output_dir, basename(file))
        # ... processing logic ...
        cp(file, output)
    end
    return length(files)
end

# Implement step 3
function Sparrow.workflow_step(workflow::DataPipeline, ::Type{SaveResults},
                               input_dir::String, output_dir::String;
                               kwargs...)
    msg_info("Saving final results")
    mkpath(output_dir)
    files = readdir(input_dir; join=true)
    for file in files
        output = joinpath(output_dir, basename(file))
        cp(file, output)
    end
    msg_info("Workflow complete!")
    return length(files)
end
```

Run with:

```sh
julia sparrow pipeline.jl --datetime 20240101_000000
```