Server Architecture
The Server is a Python service that handles all data processing: it loads files, executes tool transformations, and returns results to the Studio. The Studio never touches actual data; it works only with schemas and previews.
Technology Stack
| Layer | Technology | Why |
|---|---|---|
| Framework | FastAPI | Async, fast, great DX, OpenAPI docs |
| Data | Polars | Lazy evaluation, blazing performance |
| Validation | Pydantic | Type-safe request/response models |
| Packaging | PyInstaller | Single-file executable distribution |
Directory Structure
server/app/
├── main.py # FastAPI entrypoint
├── config.py # Settings and configuration
│
├── api/ # HTTP layer
│ ├── router.py # Route aggregation
│ ├── routes/ # Endpoint handlers
│ │ ├── health.py # Health check
│ │ ├── schema.py # Schema inference
│ │ ├── preview.py # Data preview
│ │ ├── execute.py # Workflow execution
│ │ └── connections.py # Connection CRUD
│ └── models/ # Pydantic schemas
│ ├── workflow.py # Workflow, Tool, Wire
│ ├── common.py # DataSchema, DataPreview
│ └── connections.py # Connection request/response
│
├── domain/ # Business logic
│ ├── tools/ # Tool implementations
│ │ ├── base.py # BaseTool ABC
│ │ ├── registry.py # Tool registry + decorator
│ │ ├── register.py # Imports all tools
│ │ ├── sockets.py # Socket ID constants (shared with Studio)
│ │ └── implementations/
│ │ ├── input.py
│ │ ├── filter.py
│ │ ├── select.py
│ │ └── ...
│ ├── execution/ # Workflow execution
│ │ ├── executor.py # Main executor (orchestration)
│ │ ├── planner.py # ExecutionPlanner + ExecutionPlan
│ │ ├── cache_manager.py # CacheManager (lineage + caching)
│ │ ├── graph.py # Dependency resolution
│ │ └── cache.py # BlockingToolCache (disk storage)
│ ├── workflow/ # Workflow utilities
│ │ └── archive.py # Audit archiving
│ ├── datasources/ # File type handlers
│ │ ├── factory.py # Datasource factory
│ │ ├── csv.py # CSV loader
│ │ ├── database.py # Database query loader
│ │ └── parquet.py # Parquet loader
│ └── connections/ # Credential management
│ ├── models.py # Connection data models
│ └── store.py # Encrypted credential store
│
├── middleware/ # Request middleware
│ └── audit.py # Audit logging
│
└── utils/ # Shared utilities
├── errors.py # Custom exceptions
└── type_mapping.py # Polars ↔ JSON type conversion
Tool System
The heart of the Server. Every transformation (Filter, Select, Join, etc.) is a Tool.
Socket ID Constants
Tool inputs and outputs use named "sockets" to connect together. To prevent mismatches between Studio and Server, socket IDs are defined as constants in both layers:
Server (app/domain/tools/sockets.py):
class CommonSockets:
    INPUT = "input"
    OUTPUT = "output"

class FilterSockets:
    INPUT = "input"
    OUTPUT_TRUE = "output-true"
    OUTPUT_FALSE = "output-false"

class JoinSockets:
    INPUT_LEFT = "input-left"
    INPUT_RIGHT = "input-right"
    OUTPUT_LEFT = "output-left"
    OUTPUT_MATCH = "output-match"
    OUTPUT_RIGHT = "output-right"
Studio (src/domain/workflow/constants/socketIds.ts):
export const CommonSockets = {
  INPUT: "input",
  OUTPUT: "output",
} as const;

export const FilterSockets = {
  INPUT: "input",
  OUTPUT_TRUE: "output-true",
  OUTPUT_FALSE: "output-false",
} as const;
These constants are validated by type contract tests that ensure Studio and Server stay in sync.
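A minimal sketch of what the Server side of such a test might look like, assuming (hypothetically) that the socket IDs are also checked in as a shared JSON snapshot at contracts/socket_ids.json that the Studio suite verifies against socketIds.ts:

import json
from pathlib import Path

from app.domain.tools.sockets import CommonSockets, FilterSockets

def test_socket_ids_match_contract_snapshot():
    # contracts/socket_ids.json is a hypothetical shared fixture; the Studio
    # test suite would assert the same values against socketIds.ts
    snapshot = json.loads(Path("contracts/socket_ids.json").read_text())
    assert snapshot["CommonSockets"]["INPUT"] == CommonSockets.INPUT
    assert snapshot["FilterSockets"]["OUTPUT_TRUE"] == FilterSockets.OUTPUT_TRUE
    assert snapshot["FilterSockets"]["OUTPUT_FALSE"] == FilterSockets.OUTPUT_FALSE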
BaseTool Interface
All tools inherit from BaseTool:
from abc import ABC, abstractmethod
from typing import Any

import polars as pl

from app.api.models.common import DataSchema

class BaseTool(ABC):
    @abstractmethod
    async def execute(
        self,
        config: dict[str, Any],
        inputs: dict[str, pl.LazyFrame | list[pl.LazyFrame]],
    ) -> dict[str, pl.LazyFrame]:
        """Execute tool logic. Returns LazyFrames keyed by output socket ID."""

    @abstractmethod
    async def get_output_schema(
        self,
        config: dict[str, Any],
        input_schemas: dict[str, DataSchema | list[DataSchema]],
    ) -> dict[str, DataSchema]:
        """Return schema without execution (for UI dropdowns)."""

    async def validate_config(self, config: dict[str, Any]) -> list[str]:
        """Validate configuration. Return error messages."""
        return []
Key design decisions:
- Async by default: All methods are async, even if they don't need it. Consistency matters.
- LazyFrame in, LazyFrame out: Keep execution lazy. Never call `.collect()` unless you must.
- Socket-based I/O: Tools can have multiple inputs and outputs. The dict key is the socket ID.
- Schema without execution: `get_output_schema` lets the UI know what columns will exist without running the pipeline (see the sketch below).
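To make that last point concrete, here is a hedged sketch of how a Select-style tool might implement `get_output_schema`. The DataSchema shape (a `columns` list whose entries carry a `name` attribute) is an assumption; the real model lives in api/models/common.py.

from app.api.models.common import DataSchema
from app.domain.tools.base import BaseTool

class SelectTool(BaseTool):
    async def get_output_schema(self, config, input_schemas):
        schema = input_schemas["input"]         # upstream DataSchema; no data is read
        keep = set(config.get("columns", []))   # columns chosen in the UI
        # Assumed DataSchema shape: .columns entries carry a .name attribute
        kept = [col for col in schema.columns if col.name in keep]
        return {"output": DataSchema(columns=kept)}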
Tool Registry
Tools self-register using a decorator:
# server/app/domain/tools/implementations/filter.py
from app.domain.tools.registry import register_tool

@register_tool("Filter")  # This string must match the Studio tool type
class FilterTool(BaseTool):
    async def execute(self, config, inputs):
        lf = inputs["input"]
        expression = config.get("expression", "")
        # ... filter logic
        return {"output-true": true_result, "output-false": false_result}
The registry is a simple class-level dict:
class ToolRegistry:
    _tools: dict[str, Type[BaseTool]] = {}

    @classmethod
    def register(cls, tool_type: str, tool_class: Type[BaseTool]) -> None:
        cls._tools[tool_type] = tool_class

    @classmethod
    def get(cls, tool_type: str) -> BaseTool:
        return cls._tools[tool_type]()
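The register_tool decorator itself is a thin wrapper around this registry. A minimal sketch consistent with the code above, as it would appear in registry.py:

from typing import Type

from app.domain.tools.base import BaseTool

def register_tool(tool_type: str):
    """Class decorator: register a tool under its Studio type string."""
    def decorator(tool_class: Type[BaseTool]) -> Type[BaseTool]:
        ToolRegistry.register(tool_type, tool_class)
        return tool_class  # registration is a side effect; the class is unchanged
    return decorator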
All tool implementations are imported in register.py, which triggers the decorators:
# server/app/domain/tools/register.py
from app.domain.tools.implementations import (
    input,
    output,
    filter,
    select,
    sort,
    formula,
    join,
    union,
    summarize,
)
And register.py is imported in main.py:
# server/app/main.py
import app.domain.tools.register # noqa: F401
This ensures all tools are registered before any API request.
Execution Engine
The execution engine orchestrates workflow execution. It has been decomposed into three focused components:
| Component | File | Responsibility |
|---|---|---|
| ExecutionPlanner | planner.py | Builds execution plans, topological sort |
| CacheManager | cache_manager.py | Lineage hashes, cache decisions |
| WorkflowExecutor | executor.py | Orchestrates execution, tool invocation |
ExecutionPlanner
The ExecutionPlanner creates an immutable ExecutionPlan that determines what needs to execute and in what order:
from app.domain.execution.planner import ExecutionPlanner
planner = ExecutionPlanner(workflow)
plan = planner.create_plan(target_tool_id="output-1")
# Iterate over tool IDs in execution order
for tool_id in plan:
    tool = plan.get_tool(tool_id)
    inputs = plan.get_inputs(tool_id)
    dependencies = plan.get_dependencies(tool_id)
The ExecutionPlan is immutable (uses a frozen dataclass with tuples, as sketched below) and provides:
- `execution_order`: Topological sort of tool IDs
- `get_tool(tool_id)`: Access a tool by ID
- `get_inputs(tool_id)`: Input socket mappings
- `get_dependencies(tool_id)`: Upstream tool IDs
- `get_execution_levels()`: Groups of tools for parallel execution
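A sketch of that immutability with illustrative field names (the real class lives in planner.py); Tool comes from api/models/workflow.py:

from dataclasses import dataclass

from app.api.models.workflow import Tool

@dataclass(frozen=True)
class ExecutionPlan:
    execution_order: tuple[str, ...]            # topological order of tool IDs
    tools: dict[str, Tool]                      # tool_id -> Tool definition
    dependencies: dict[str, tuple[str, ...]]    # tool_id -> upstream tool IDs

    def __iter__(self):
        return iter(self.execution_order)       # enables `for tool_id in plan`

    def get_tool(self, tool_id: str) -> Tool:
        return self.tools[tool_id]

    def get_dependencies(self, tool_id: str) -> tuple[str, ...]:
        return self.dependencies.get(tool_id, ())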
CacheManager
The CacheManager handles all caching decisions and lineage tracking:
from app.domain.execution.cache_manager import CacheManager
cache_mgr = CacheManager(workflow_id, force_refresh=False)
# Compute lineage hash (includes upstream state)
lineage_hash = cache_mgr.compute_lineage_hash(tool, plan)
# Check for cached outputs
cached = cache_mgr.check_cache(tool, lineage_hash)
if cached:
    # Use cached outputs
    pass
else:
    # Execute tool, then cache
    outputs = await tool_impl.execute(...)
    outputs = cache_mgr.store_in_cache(tool, lineage_hash, outputs)

# Always record the hash (even for non-cached tools)
cache_mgr.record_lineage_hash(tool.id, lineage_hash)
Key responsibilities:
- Lineage hash computation: Deterministic hash over the tool config and all upstream hashes (sketched below)
- Cache decisions: Determines whether a tool should be cached, based on `should_cache()`
- Cache operations: Check hits, store results, clear cache
- Hash tracking: Records hashes for all tools (required for downstream hash computation)
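A sketch of how such a deterministic hash can be computed (the function shape is illustrative; the real logic lives in cache_manager.py):

import hashlib
import json

def compute_lineage_hash(tool_type: str, config: dict, upstream_hashes: list[str]) -> str:
    # sort_keys gives a canonical serialization, so equal inputs always
    # produce equal hashes; config must be JSON-serializable
    payload = json.dumps(
        {"type": tool_type, "config": config, "upstream": sorted(upstream_hashes)},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()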
Parallel Execution
The executor supports optional parallel execution of independent tools:
# In config.py
execution_mode: str = "sequential" # or "parallel"
max_parallel_tools: int = 4
When `execution_mode="parallel"`, the executor uses `get_execution_levels()` to identify tools that can run concurrently. Tools within a level have no dependencies on each other and execute concurrently via `asyncio.gather()` with semaphore limiting, as sketched below.
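A sketch of one execution level under that scheme (helper names are illustrative):

import asyncio

async def run_level(tool_ids: list[str], execute_one, max_parallel_tools: int = 4):
    # Tools within one level are mutually independent, so they can run
    # concurrently; the semaphore caps in-flight tools at max_parallel_tools
    sem = asyncio.Semaphore(max_parallel_tools)

    async def bounded(tool_id: str):
        async with sem:
            return await execute_one(tool_id)

    return await asyncio.gather(*(bounded(t) for t in tool_ids))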
Execution Graph
Before executing, we build a dependency graph:
class ExecutionGraph:
    def __init__(self, workflow: Workflow):
        self.workflow = workflow
        self.dependencies = self._build_dependencies()

    def get_execution_order(self, target_tool_id: str | None = None) -> list[str]:
        """Topological sort of dependencies."""
        # Returns tool IDs in the order they should execute
        ...
This handles (the sort itself is sketched after the list):
- Determining which tools need to run for a given target
- Detecting cycles (which should be caught by Studio, but we verify)
- Computing dependency depth for parallel execution
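One way the sort can be implemented is Kahn's algorithm over the dependency map, assuming every tool appears as a key (sources map to an empty set):

from collections import deque

def topological_order(dependencies: dict[str, set[str]]) -> list[str]:
    indegree = {t: len(deps) for t, deps in dependencies.items()}
    downstream: dict[str, list[str]] = {t: [] for t in dependencies}
    for t, deps in dependencies.items():
        for dep in deps:
            downstream[dep].append(t)

    queue = deque(t for t, n in indegree.items() if n == 0)
    order: list[str] = []
    while queue:
        t = queue.popleft()
        order.append(t)
        for nxt in downstream[t]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)

    if len(order) != len(dependencies):
        raise ValueError("Cycle detected in workflow graph")
    return order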
Execution Flow
- Create `ExecutionPlanner` and build an `ExecutionPlan`
- Initialize `CacheManager` for the workflow
- Determine execution mode (sequential or parallel)
- For each tool (or level of tools):
  - Compute lineage hash via `CacheManager`
  - Check cache for blocking tools
  - If cache hit: use cached output, skip execution
  - If cache miss: execute tool, cache if blocking
  - Record lineage hash (always)
  - Store outputs for downstream tools
- Collect results for the target tool (preview or output)
See Caching System for details on how blocking tool outputs are cached.
async def execute_workflow(
    self,
    workflow: Workflow,
    target_tool_id: str | None = None,
    preview_limit: int | None = None,
) -> dict[str, Any]:
    planner = ExecutionPlanner(workflow)
    plan = planner.create_plan(target_tool_id)
    cache_mgr = CacheManager(workflow.meta.workflow_id)

    for tool_id in plan:
        tool = plan.get_tool(tool_id)
        inputs = self._gather_inputs(tool_id)
        outputs = await self._execute_tool_with_cache(tool, inputs, cache_mgr, plan)
        self.results[tool_id] = outputs

    # Collect and return target results
    return self._collect_results(target_tool_id, preview_limit)
API Endpoints
Health Check
GET /api/health/
Returns service status. Used by Studio to verify Server is running.
Schema
POST /api/workflow/tool/schema
{
  "tool_id": "abc123",
  "workflow": { ... }
}
Returns output schema for a tool without executing. Used to populate column dropdowns in tool configuration.
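A hedged sketch of what the route handler might look like; the request model and workflow payload shape here are assumptions standing in for whatever schema.py actually does:

from fastapi import APIRouter
from pydantic import BaseModel

from app.domain.tools.registry import ToolRegistry

router = APIRouter()

class ToolSchemaRequest(BaseModel):  # hypothetical; real models live in api/models
    tool_id: str
    workflow: dict

@router.post("/api/workflow/tool/schema")
async def tool_schema(request: ToolSchemaRequest):
    # Locate the target tool in the workflow payload (shape is an assumption)
    tool = next(t for t in request.workflow["tools"] if t["id"] == request.tool_id)
    impl = ToolRegistry.get(tool["type"])
    input_schemas = {}  # upstream schemas would be resolved recursively here
    return await impl.get_output_schema(tool.get("config", {}), input_schemas)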
Preview / Execute Tool
POST /api/workflow/tool/execute
{
  "tool_id": "abc123",
  "workflow": { ... },
  "preview_limit": 100
}
Executes the workflow up to the target tool and returns sample rows. This is where `.collect()` happens.
Execute Workflow
POST /api/workflow/execute
{
  "workflow": { ... }
}
Full workflow execution. Output tools write their files, and execution metrics are returned.
Polars Lazy Evaluation
This is why Sigilweaver can handle large datasets efficiently.
Bad (eager evaluation):
df = pl.read_csv("huge.csv") # Loads entire file into memory
df = df.filter(pl.col("age") > 30) # Creates new DataFrame
df = df.select(["name", "age"]) # Creates another DataFrame
Good (lazy evaluation):
lf = pl.scan_csv("huge.csv") # Returns immediately, nothing loaded
lf = lf.filter(pl.col("age") > 30) # Builds query plan
lf = lf.select(["name", "age"]) # Adds to query plan
result = lf.head(100).collect() # NOW it reads file, only what's needed
Tools operate on LazyFrames. We only `.collect()` when:
- Returning preview data
- Writing output files
- Schema inference requires it (some cases)
Datasources
The datasources/ module handles file loading:
from pathlib import Path

class DatasourceFactory:
    @staticmethod
    def create(path: str) -> Datasource:
        ext = Path(path).suffix.lower()
        if ext == ".csv":
            return CSVDatasource(path)
        elif ext in (".parquet", ".pq"):
            return ParquetDatasource(path)
        raise ValueError(f"Unsupported file type: {ext}")
Each datasource implements two methods (a sketch follows):
- `scan()` → LazyFrame (lazy load)
- `infer_schema()` → DataSchema (column names and types)
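For example, the CSV datasource might look roughly like this; the DataSchema construction is simplified to a name → type mapping for illustration:

import polars as pl

class CSVDatasource:
    def __init__(self, path: str):
        self.path = path

    def scan(self) -> pl.LazyFrame:
        return pl.scan_csv(self.path)  # lazy: nothing is read yet

    def infer_schema(self) -> dict[str, str]:
        # collect_schema() resolves column names and types without
        # materializing any rows (Polars >= 1.0)
        schema = self.scan().collect_schema()
        return {name: str(dtype) for name, dtype in schema.items()}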
Connections
The connections/ module manages secure database credential storage.
Connection Ownership
Connections have two ownership modes:
| Owner | Storage | Access |
|---|---|---|
client | Electron safeStorage (OS keychain) | Single user, single machine |
server | Fernet-encrypted JSON on server | All users with server access |
The Studio handles client connections; the Server handles server connections.
ConnectionStore
Server connections are managed by ConnectionStore:
class ConnectionStore:
    def __init__(self, storage_path: Path, encryption_key: str):
        self._fernet = Fernet(encryption_key.encode())
        self._connections: dict[str, StoredConnection] = {}

    def create_connection(self, input: CreateConnectionInput) -> ConnectionInfo:
        # Encrypt credentials before storage
        encrypted = self._encrypt_credentials(input.username, input.password)
        # ... store connection
        ...

    def resolve_connection(self, connection_id: str) -> ResolvedConnection:
        # Decrypt credentials for execution only
        # WARNING: returns sensitive data; internal use only
        ...
Key security properties:
- Encryption at rest: Credentials encrypted with Fernet (AES-128-CBC + HMAC), as sketched below
- Opaque to users: The API never returns credentials, only connection metadata
- Decryption only during execution: `resolve_connection()` is called only when building connection URIs
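A sketch of the encryption step using the cryptography library's Fernet (the credential layout is illustrative; the real helpers live in connections/store.py):

import json
from cryptography.fernet import Fernet

def encrypt_credentials(fernet: Fernet, username: str, password: str) -> bytes:
    blob = json.dumps({"username": username, "password": password}).encode()
    return fernet.encrypt(blob)  # AES-128-CBC + HMAC, per the Fernet spec

def decrypt_credentials(fernet: Fernet, token: bytes) -> dict[str, str]:
    # Raises cryptography.fernet.InvalidToken if the payload was tampered with
    return json.loads(fernet.decrypt(token))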
Executor Integration
During workflow execution, the executor resolves server connections:
async def execute_workflow(self, workflow: Workflow, ...):
    # For each database Input tool with owner="server"
    if tool.config.get("owner") == "server":
        connection_id = tool.config.get("connectionId")
        resolved = connection_store.resolve_connection(connection_id)
        uri = build_uri(resolved)
        # Pass the URI to DatabaseDataSource
This ensures credentials never leave the server: they are resolved just-in-time during execution, as in the sketch below.
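A sketch of the build_uri step (the field names mirror ResolvedConnection loosely and are assumptions):

from urllib.parse import quote_plus

def build_uri(resolved) -> str:
    # quote_plus guards against special characters in usernames/passwords;
    # the credential fields never appear in API responses or logs
    return (
        f"{resolved.driver}://{quote_plus(resolved.username)}:{quote_plus(resolved.password)}"
        f"@{resolved.host}:{resolved.port}/{resolved.database}"
    )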
Error Handling
Custom exceptions in utils/errors.py:
class ToolError(Exception):
    """Error in tool execution."""

class ExecutionError(Exception):
    """Error in workflow execution."""

class ValidationError(Exception):
    """Invalid configuration or workflow."""
These bubble up through the API layer and get converted to appropriate HTTP responses.
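A sketch of that translation using FastAPI's exception_handler hook (the status-code mapping is an assumption):

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from app.utils.errors import ToolError, ValidationError

app = FastAPI()

@app.exception_handler(ValidationError)
async def validation_error_handler(request: Request, exc: ValidationError):
    return JSONResponse(status_code=422, content={"detail": str(exc)})

@app.exception_handler(ToolError)
async def tool_error_handler(request: Request, exc: ToolError):
    return JSONResponse(status_code=500, content={"detail": str(exc)})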
Testing
Server tests focus on:
- Tool execution: Each tool has tests verifying transformations
- Schema propagation: Verify output schemas are computed correctly
- Execution order: Graph building and topological sort
- Edge cases: Missing inputs, invalid config, type mismatches (an example test follows)
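An example of what such a tool test might look like, assuming pytest-asyncio and that the Filter tool evaluates the configured expression to split rows across its two output sockets:

import polars as pl
import pytest

from app.domain.tools.implementations.filter import FilterTool

@pytest.mark.asyncio
async def test_filter_splits_rows():
    lf = pl.LazyFrame({"age": [25, 40]})
    outputs = await FilterTool().execute(
        config={"expression": "age > 30"},
        inputs={"input": lf},
    )
    assert outputs["output-true"].collect().height == 1
    assert outputs["output-false"].collect().height == 1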
See Server Testing for details.
Next: Architectural Decisions or Adding Tools.