Server Testing

Patterns and practices for testing the Python Server.

Framework

  • pytest for test execution
  • pytest-asyncio for async test support
  • pytest-cov for coverage reporting
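
Assuming the server's dev dependencies are managed with uv (as the commands below suggest), they can be added in one step:

uv add --dev pytest pytest-asyncio pytest-cov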

Running Tests

cd server

# All tests
uv run pytest

# Verbose output
uv run pytest -v

# Specific file
uv run pytest tests/test_filter.py

# Specific test
uv run pytest tests/test_filter.py::test_filter_basic -v

# With coverage
uv run pytest --cov=app --cov-report=html

# Stop on first failure
uv run pytest -x

# Show print output
uv run pytest -s

Test Organization

server/tests/
├── conftest.py # Shared fixtures
├── fixtures/
│ └── sample.csv # Test data files
├── test_execution.py # Execution engine tests
├── test_planner.py # ExecutionPlanner tests
├── test_cache_manager.py # CacheManager tests
├── test_parallel_execution.py # Parallel execution tests
├── test_type_contracts.py # Studio/Server contract tests
├── test_cache.py # BlockingToolCache tests
├── test_formula.py # Formula tool tests
├── test_filter.py # Filter tool tests
├── test_select.py # Select tool tests
├── test_join.py # Join tool tests
├── test_output.py # Output tool tests
├── test_parquet.py # Parquet datasource tests
├── test_datasources.py # Datasource factory tests
├── test_database.py # Database datasource tests
├── test_connections.py # Connection store tests
├── test_integration.py # End-to-end workflow tests
└── test_models.py # Pydantic model tests

Fixtures

Common fixtures live in conftest.py. The schema-model import below is illustrative; adjust the path to wherever the app defines DataSchema and ColumnInfo:

# conftest.py
import pytest
import polars as pl
from pathlib import Path

# Illustrative import path; point this at the app's actual schema models.
from app.api.models.schema import ColumnInfo, DataSchema


@pytest.fixture
def sample_csv(tmp_path: Path) -> Path:
    """Create a temporary CSV file for testing."""
    csv_path = tmp_path / "sample.csv"
    csv_path.write_text("name,age,city\nAlice,30,NYC\nBob,25,LA\n")
    return csv_path


@pytest.fixture
def sample_lazyframe() -> pl.LazyFrame:
    """Create a sample LazyFrame for testing."""
    return pl.LazyFrame({
        "name": ["Alice", "Bob", "Charlie"],
        "age": [30, 25, 35],
        "city": ["NYC", "LA", "Chicago"],
    })


@pytest.fixture
def sample_schema() -> DataSchema:
    """Create a sample schema for testing."""
    return DataSchema(columns=[
        ColumnInfo(name="name", dtype="String", nullable=True),
        ColumnInfo(name="age", dtype="Int64", nullable=False),
        ColumnInfo(name="city", dtype="String", nullable=True),
    ])
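
For checked-in data files under tests/fixtures/ (such as sample.csv in the tree above), a small path-resolving fixture keeps tests independent of the working directory. A minimal sketch that could live alongside the fixtures above:

@pytest.fixture
def fixtures_dir() -> Path:
    """Directory containing checked-in test data files."""
    return Path(__file__).parent / "fixtures"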

Tool Testing Patterns

Basic Execution Test

import pytest
import polars as pl
from app.domain.tools.implementations.filter import FilterTool


@pytest.mark.asyncio
async def test_filter_basic(sample_lazyframe):
    """Test that filter excludes non-matching rows."""
    tool = FilterTool()
    config = {"expression": "age > 28"}

    result = await tool.execute(config, {"input": sample_lazyframe})

    df = result["output-true"].collect()
    assert len(df) == 2  # Alice (30) and Charlie (35)
    assert "Bob" not in df["name"].to_list()

Schema Propagation Test

@pytest.mark.asyncio
async def test_filter_schema_passthrough(sample_schema):
    """Test that filter preserves input schema."""
    tool = FilterTool()
    config = {"expression": "age > 28"}

    result = await tool.get_output_schema(
        config,
        {"input": sample_schema},
    )

    # Filter doesn't change column structure
    assert result["output-true"] == sample_schema
    assert result["output-false"] == sample_schema

Validation Test

@pytest.mark.asyncio
async def test_filter_validates_empty_expression():
    """Test that empty expression is rejected."""
    tool = FilterTool()

    errors = await tool.validate_config({"expression": ""})

    assert len(errors) > 0
    assert "expression" in errors[0].lower()

Edge Case Tests

@pytest.mark.asyncio
async def test_filter_empty_input():
    """Test filter with no matching rows."""
    tool = FilterTool()
    lf = pl.LazyFrame({"age": [10, 15, 20]})
    config = {"expression": "age > 100"}

    result = await tool.execute(config, {"input": lf})

    df = result["output-true"].collect()
    assert len(df) == 0


@pytest.mark.asyncio
async def test_filter_all_matching():
    """Test filter where all rows match."""
    tool = FilterTool()
    lf = pl.LazyFrame({"age": [30, 40, 50]})
    config = {"expression": "age > 20"}

    result = await tool.execute(config, {"input": lf})

    df = result["output-true"].collect()
    assert len(df) == 3

Error Handling Test

@pytest.mark.asyncio
async def test_filter_invalid_column():
    """Test filter with non-existent column."""
    # ToolError is the app's tool exception type; import it from wherever
    # the project defines it.
    tool = FilterTool()
    lf = pl.LazyFrame({"age": [30]})
    config = {"expression": "nonexistent > 0"}

    with pytest.raises(ToolError, match="column"):
        await tool.execute(config, {"input": lf})

Execution Engine Testing

The execution engine has been decomposed into several components, each with dedicated tests:

ExecutionPlanner Tests

import pytest
from app.domain.execution.planner import ExecutionPlanner, ExecutionPlan
from app.api.models.workflow import Workflow


def test_execution_plan_immutable():
    """Test that ExecutionPlan is immutable."""
    workflow = create_workflow([...])
    planner = ExecutionPlanner(workflow)
    plan = planner.create_plan()

    # execution_order is a tuple, not a list
    assert isinstance(plan.execution_order, tuple)


def test_partial_plan():
    """Test creating a plan for a subset of the workflow."""
    workflow = create_workflow([
        tool("A", "Input"),
        tool("B", "Filter"),
        tool("C", "Output"),
    ], wires=[...])

    planner = ExecutionPlanner(workflow)
    plan = planner.create_plan(target_tool_id="B")

    # Only includes A and B, not C
    assert "A" in plan.execution_order
    assert "B" in plan.execution_order
    assert "C" not in plan.execution_order


def test_execution_levels():
    """Test parallel execution level computation."""
    # A -> B -> D
    # A -> C -> D
    workflow = create_workflow([
        tool("A", "Input"),
        tool("B", "Filter"),
        tool("C", "Select"),
        tool("D", "Output"),
    ], wires=[
        wire("A", "output", "B", "input"),
        wire("A", "output", "C", "input"),
        wire("B", "output", "D", "input"),
        wire("C", "output", "D", "input"),
    ])

    planner = ExecutionPlanner(workflow)
    plan = planner.create_plan()
    levels = plan.get_execution_levels()

    assert levels[0] == ["A"]            # Level 0
    assert set(levels[1]) == {"B", "C"}  # Level 1 (parallel)
    assert levels[2] == ["D"]            # Level 2

CacheManager Tests

import pytest
from app.domain.execution.cache_manager import CacheManager


def test_lineage_hash_deterministic():
    """Test that same inputs produce same hash."""
    cache1 = CacheManager("workflow-1")
    cache2 = CacheManager("workflow-1")

    # Same tool, same plan -> same hash
    hash1 = cache1.compute_lineage_hash(tool, plan)
    hash2 = cache2.compute_lineage_hash(tool, plan)

    assert hash1 == hash2


def test_lineage_hash_chain():
    """Test that upstream changes propagate to downstream hashes."""
    cache = CacheManager("workflow-1")

    # Execute upstream tool, record hash
    upstream_hash = cache.compute_lineage_hash(upstream_tool, plan)
    cache.record_lineage_hash(upstream_tool.id, upstream_hash)

    # Downstream hash includes upstream
    downstream_hash = cache.compute_lineage_hash(downstream_tool, plan)

    # Change upstream config
    upstream_tool.config["expression"] = "different"
    new_upstream_hash = cache.compute_lineage_hash(upstream_tool, plan)
    cache.record_lineage_hash(upstream_tool.id, new_upstream_hash)

    # Downstream hash should change
    new_downstream_hash = cache.compute_lineage_hash(downstream_tool, plan)
    assert new_downstream_hash != downstream_hash

Parallel Execution Tests

@pytest.mark.asyncio
async def test_parallel_execution_respects_dependencies():
    """Test that parallel execution never runs a tool before its deps."""
    workflow = create_parallel_workflow()
    executor = WorkflowExecutor()

    # Track execution order. The log must be populated by the executor,
    # e.g. via a progress hook or a patched method (see the sketch below).
    execution_log = []

    result = await executor.execute_workflow(
        workflow,
        execution_mode="parallel",
    )

    # Verify dependencies ran first
    for tool_id, deps in get_dependencies(workflow).items():
        tool_index = execution_log.index(tool_id)
        for dep in deps:
            dep_index = execution_log.index(dep)
            assert dep_index < tool_index
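
The test above assumes execution_log actually gets filled in. One way to do that is a fixture that wraps the executor's per-tool entry point with pytest's monkeypatch; a sketch, where _execute_tool is a hypothetical method name (wrap whichever method really runs a single tool):

@pytest.fixture
def execution_log(monkeypatch):
    """Record tool IDs in the order the executor runs them."""
    log = []
    original = WorkflowExecutor._execute_tool  # hypothetical per-tool method

    async def recording(self, tool, *args, **kwargs):
        log.append(tool.id)
        return await original(self, tool, *args, **kwargs)

    monkeypatch.setattr(WorkflowExecutor, "_execute_tool", recording)
    return log

A test then accepts execution_log as a fixture argument instead of creating the list inline.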

Execution Graph Tests

import pytest
from app.domain.execution.executor import WorkflowExecutor
from app.domain.execution.graph import ExecutionGraph
from app.api.models.workflow import Workflow


def test_execution_order():
    """Test that tools execute in dependency order."""
    workflow = create_workflow([
        tool("A", "Input"),
        tool("B", "Filter"),   # depends on A
        tool("C", "Output"),   # depends on B
    ], wires=[
        wire("A", "output", "B", "input"),
        wire("B", "output", "C", "input"),
    ])

    graph = ExecutionGraph(workflow)
    order = graph.get_execution_order()

    assert order.index("A") < order.index("B")
    assert order.index("B") < order.index("C")


def test_cycle_detection():
    """Test that cycles are detected."""
    workflow = create_workflow([
        tool("A", "Filter"),
        tool("B", "Filter"),
        tool("C", "Filter"),
    ], wires=[
        wire("A", "output", "B", "input"),
        wire("B", "output", "C", "input"),
        wire("C", "output", "A", "input"),  # Creates cycle
    ])

    graph = ExecutionGraph(workflow)
    errors = graph.validate()

    assert any("cycle" in e.lower() for e in errors)

Integration Testing

@pytest.mark.asyncio
async def test_full_workflow_execution(tmp_path):
    """Test complete workflow from input to output."""
    # Create input file
    input_path = tmp_path / "input.csv"
    input_path.write_text("name,age\nAlice,30\nBob,25\n")

    output_path = tmp_path / "output.csv"

    workflow = create_workflow([
        tool("input", "Input", config={"path": str(input_path)}),
        tool("filter", "Filter", config={"expression": "age > 28"}),
        tool("output", "Output", config={"path": str(output_path)}),
    ], wires=[
        wire("input", "output", "filter", "input"),
        wire("filter", "output-true", "output", "input"),
    ])

    executor = WorkflowExecutor()
    result = await executor.execute_workflow(workflow)

    assert result["status"] == "success"
    assert output_path.exists()

    output_df = pl.read_csv(output_path)
    assert len(output_df) == 1
    assert output_df["name"][0] == "Alice"

Test Data Helpers

Keep test data creation DRY:

# tests/helpers.py

def create_tool(
    id: str,
    type: str,
    config: dict | None = None,
    x: float = 0,
    y: float = 0,
) -> dict:
    return {
        "id": id,
        "type": type,
        "config": config or {},
        "x": x,
        "y": y,
        "sockets": get_default_sockets(type),
    }


def create_wire(
    from_tool: str,
    from_socket: str,
    to_tool: str,
    to_socket: str,
) -> dict:
    return {
        "from": {"tool": from_tool, "socket": from_socket},
        "to": {"tool": to_tool, "socket": to_socket},
    }


def create_workflow(tools: list, wires: list) -> Workflow:
    return Workflow(
        version="2.0",
        meta={"name": "Test", "author": "Test", ...},
        tools=tools,
        wires=wires,
    )
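
The earlier examples call these helpers through the short names tool(...) and wire(...); assuming those are plain aliases, they can be defined once in helpers.py and imported into test modules:

# Short aliases used by the tests above.
tool = create_tool
wire = create_wire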

Async Testing Notes

All tool methods are async. Use @pytest.mark.asyncio:

# Correct
@pytest.mark.asyncio
async def test_something():
    result = await tool.execute(...)


# Wrong - won't await properly
def test_something():
    result = tool.execute(...)  # Returns coroutine, not result
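
When every test in a module is async, a module-level pytestmark (standard pytest behaviour) avoids repeating the decorator:

import pytest

# Applies the asyncio marker to every test function in this module.
pytestmark = pytest.mark.asyncio


async def test_something():
    result = await tool.execute(...)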

Polars Testing Tips

Comparing DataFrames

# Check equality
assert df1.equals(df2)

# Check specific columns
assert df["name"].to_list() == ["Alice", "Bob"]

# Check schema
assert df.schema == {"name": pl.String, "age": pl.Int64}

# Check row count
assert len(df) == 5
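
Polars also ships dedicated assertion helpers in polars.testing, which produce far more readable mismatch reports than a bare assert:

from polars.testing import assert_frame_equal

# Raises AssertionError with a detailed report of any mismatch
assert_frame_equal(df1, df2)

# Ignore row order when the ordering isn't part of the contract
assert_frame_equal(df1, df2, check_row_order=False)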

Working with LazyFrames

# Must collect before comparing (and await the async execute call)
lf = (await tool.execute(...))["output"]
df = lf.collect()
assert len(df) == 5

# Don't compare LazyFrames directly
assert lf1 == lf2  # Wrong - compares object identity
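
To assert on a LazyFrame's schema without materialising any rows, recent Polars versions expose collect_schema() (older versions use the .schema property instead):

# Schema checks don't require collecting the data
assert lf.collect_schema() == {"name": pl.String, "age": pl.Int64}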

Next: Frontend Testing for TypeScript/React testing patterns.