
Dagster Tools

The dagster_tools library provides utilities for working with Dagster assets programmatically, including materializing assets and comparing dev vs prod data.

Installation

No separate installation is required: the library is part of the monorepo and can be imported directly in the datasmart and database code locations.

from dagster_tools import materialize_assets, compare_dev_and_prod_assets

Materializing Assets

The materialize_assets function materializes Dagster assets programmatically. This is useful for testing, debugging, or running assets outside the Dagster UI.

Basic Usage

from dagster_tools import materialize_assets

# Materialize a single asset
materialize_assets("xrf_simplified")

# Materialize multiple assets
materialize_assets(["xrf_simplified", "xrd_profex"])

Reading Inputs

By default, materialize_assets reads upstream inputs from dev. Use from_prod=True to read from production instead:

# Default: reads inputs from dev, writes to dev
materialize_assets("xrf_simplified")

# Read inputs from production
materialize_assets("xrf_simplified", from_prod=True)
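
The effect of from_prod can be summarized as a small decision rule: only the source of upstream inputs changes. The sketch below assumes, in line with the "writes to dev" comment above and the compare API's "materialize assets in dev", that outputs are written to dev in both cases. IOPlan and plan_io are hypothetical names used only to illustrate the rule.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IOPlan:
    """Hypothetical description of where a materialization reads and writes."""
    read_inputs_from: str
    write_outputs_to: str


def plan_io(from_prod: bool = False) -> IOPlan:
    # Only the input source depends on from_prod.
    # Assumption: outputs always land in dev, whichever inputs are used.
    return IOPlan(
        read_inputs_from="prod" if from_prod else "dev",
        write_outputs_to="dev",
    )


print(plan_io())
print(plan_io(from_prod=True))
```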

Silent Mode

Suppress output messages:

materialize_assets("xrf_simplified", silent=True)

API Reference

def materialize_assets(
assets: str | Sequence[str],
*,
from_prod: bool = False,
silent: bool = False,
) -> None:
"""
Materialize Dagster assets programmatically.

Args:
assets: Single asset name or list of asset names to materialize
from_prod: If True, read upstream inputs from production.
If False (default), use dev inputs.
silent: If True, suppress all output

Raises:
ValueError: If an asset is not found in any code location
Exception: If materialization fails
"""

Comparing Dev and Prod

Two functions are available for comparing data between dev and prod environments:

  1. compare_dev_and_prod_tables - Compare PostgreSQL tables directly
  2. compare_dev_and_prod_assets - Compare Dagster assets (with optional materialization)

Comparing Tables

Compare a table directly without Dagster involvement:

from dagster_tools import compare_dev_and_prod_tables

result = compare_dev_and_prod_tables("analytical.xrf_simplified")
print(result)

Output:

TableCompareResult(analytical.xrf_simplified): ✓ EQUAL
dev: 1,234 rows, 15 columns
prod: 1,234 rows, 15 columns
fingerprint_match: True
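
The fingerprint_match line reports whether a content hash of the dev and prod tables agrees. How dagster_tools computes that hash is not described here; the sketch below shows one common order-independent approach in plain Python, hashing each row and then hashing the sorted row digests, so two tables holding the same rows in a different physical order get the same fingerprint. table_fingerprint and the in-memory row dicts are assumptions for illustration.

```python
from __future__ import annotations

import hashlib
import json
from typing import Any


def table_fingerprint(rows: list[dict[str, Any]], exclude: set[str] | None = None) -> str:
    """Hypothetical order-independent content hash of a list of row dicts."""
    exclude = exclude or set()
    row_digests = []
    for row in rows:
        # Drop excluded columns and serialize with sorted keys, so column order
        # does not matter; default=str covers dates, decimals and similar types.
        kept = {k: v for k, v in row.items() if k not in exclude}
        payload = json.dumps(kept, sort_keys=True, default=str)
        row_digests.append(hashlib.sha256(payload.encode()).hexdigest())
    # Sorting the per-row digests makes the result independent of row order,
    # while duplicate rows still contribute one digest each.
    return hashlib.sha256("".join(sorted(row_digests)).encode()).hexdigest()


dev = [{"id": 1, "value": 0.5}, {"id": 2, "value": 0.7}]
prod = [{"value": 0.7, "id": 2}, {"id": 1, "value": 0.5}]
print(table_fingerprint(dev) == table_fingerprint(prod))  # True
```

A fingerprint only says whether the tables differ, not where; that is what the sampled differences are for.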

Options

result = compare_dev_and_prod_tables(
    "analytical.xrf_simplified",
    exclude_columns={"updated_at"},  # Columns to ignore
    sample_on_diff=True,  # Fetch sample rows when different
    sample_limit=5,  # Max sample rows
    silent=False,  # If True, suppress output
)
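
To make these options concrete, here is a plain-Python sketch of what exclude_columns and sample_limit control: excluded columns are dropped before rows are compared, and up to sample_limit rows found on only one side are collected. The function name, the in-memory row lists, and the only_in_dev/only_in_prod keys are assumptions for illustration; the real function compares PostgreSQL tables, and its sample_differences layout may differ.

```python
from __future__ import annotations

from collections import Counter
from typing import Any


def sample_row_differences(
    dev_rows: list[dict[str, Any]],
    prod_rows: list[dict[str, Any]],
    exclude_columns: set[str] | None = None,
    sample_limit: int = 5,
) -> dict[str, list[dict[str, Any]]]:
    """Hypothetical helper: rows present on only one side, capped at sample_limit."""
    exclude = exclude_columns or set()

    def key(row: dict[str, Any]) -> tuple:
        # Drop ignored columns; a sorted tuple of items is a hashable row key.
        return tuple(sorted((k, v) for k, v in row.items() if k not in exclude))

    dev_counts = Counter(key(r) for r in dev_rows)
    prod_counts = Counter(key(r) for r in prod_rows)
    # Counter subtraction keeps only positive counts, so duplicates are respected.
    only_dev = list((dev_counts - prod_counts).elements())
    only_prod = list((prod_counts - dev_counts).elements())
    return {
        "only_in_dev": [dict(k) for k in only_dev[:sample_limit]],
        "only_in_prod": [dict(k) for k in only_prod[:sample_limit]],
    }


dev = [{"id": 1, "v": "a", "updated_at": "t1"}, {"id": 2, "v": "b", "updated_at": "t1"}]
prod = [{"id": 1, "v": "a", "updated_at": "t0"}, {"id": 2, "v": "c", "updated_at": "t0"}]
print(sample_row_differences(dev, prod, exclude_columns={"updated_at"}))
```

Without exclude_columns={"updated_at"}, every row in this example would be reported as different, which is why volatile timestamp columns are usually excluded.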

Comparing Assets

Compare Dagster assets between dev and prod:

from dagster_tools import compare_dev_and_prod_assets

# Compare using existing dev data
result = compare_dev_and_prod_assets("xrf_simplified")

# Materialize first, then compare
result = compare_dev_and_prod_assets("xrf_simplified", materialize=True)

# Compare multiple assets
results = compare_dev_and_prod_assets(["xrf_simplified", "xrd_profex"])

Materialize with Production Inputs

When materialize=True, you can control where inputs are read from:

# Materialize using dev inputs (default), then compare
result = compare_dev_and_prod_assets(
"xrf_simplified",
materialize=True,
)

# Materialize using prod inputs, then compare
result = compare_dev_and_prod_assets(
"xrf_simplified",
materialize=True,
from_prod=True,
)

API Reference

def compare_dev_and_prod_assets(
    assets: str | Sequence[str],
    *,
    materialize: bool = False,
    from_prod: bool = False,
    exclude_columns: set[str] | None = None,
    sample_limit: int = 5,
    silent: bool = False,
) -> AssetCompareResult | list[AssetCompareResult]:
    """
    Compare asset values between dev and prod.

    Args:
        assets: Single asset name or list of asset names
        materialize: If True, materialize assets in dev before comparing
        from_prod: If True and materialize=True, read upstream inputs
            from production. If False (default), use dev inputs.
        exclude_columns: Columns to exclude from comparison
            (materialization_id is always excluded)
        sample_limit: Max rows to sample when showing differences
        silent: If True, suppress all output

    Returns:
        AssetCompareResult or list of AssetCompareResult
    """

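Because the return type is either a single AssetCompareResult or a list of them, code that accepts either form of input can normalize the result before iterating. as_result_list is a hypothetical helper, sketched here under the assumption (consistent with the docstring and the examples above) that a list is returned only when a list of names is passed.

```python
from __future__ import annotations

from typing import TypeVar

T = TypeVar("T")


def as_result_list(result: T | list[T]) -> list[T]:
    """Hypothetical helper: wrap a single result in a list, pass lists through."""
    # Assumption: a list comes back only when a list of asset names was passed,
    # so an isinstance check on the return value tells the two cases apart.
    return result if isinstance(result, list) else [result]


print(as_result_list("single result"))
print(as_result_list(["result_a", "result_b"]))
```

A caller can then write for r in as_result_list(compare_dev_and_prod_assets(names)) without caring whether names was one string or several.
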
Result Objects

TableCompareResult

from dataclasses import dataclass
from typing import Any


@dataclass
class TableCompareResult:
    table: str  # Table reference
    equal: bool  # True if tables match
    dev_row_count: int  # Row count in dev
    prod_row_count: int  # Row count in prod
    dev_columns: list[str]  # Column names in dev
    prod_columns: list[str]  # Column names in prod
    schema_diff: dict[str, Any]  # Schema differences
    fingerprint_match: bool | None  # Content hash match
    sample_differences: dict[str, Any]  # Sample differing rows
    error: str | None  # Error message if failed

AssetCompareResult

from dataclasses import dataclass
from typing import Any


@dataclass
class AssetCompareResult:
    asset: str  # Asset name
    equal: bool  # True if assets match
    dev_row_count: int | None  # Row count in dev
    prod_row_count: int | None  # Row count in prod
    dev_columns: list[str]  # Column names in dev
    prod_columns: list[str]  # Column names in prod
    schema_diff: dict[str, Any]  # Schema differences
    sample_differences: dict[str, Any]  # Sample differing rows
    error: str | None  # Error message if failed
    materialized: bool  # True if materialized before compare
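
Both result types carry dev_columns, prod_columns and a schema_diff dict. The keys of schema_diff are not documented here, so the sketch below only shows one plausible way to derive a column-level diff from the two column lists; column_schema_diff and its only_in_dev, only_in_prod and order_differs keys are assumptions.

```python
from typing import Any


def column_schema_diff(dev_columns: list[str], prod_columns: list[str]) -> dict[str, Any]:
    """Hypothetical column-level diff; returns {} when the column lists match exactly."""
    dev_set, prod_set = set(dev_columns), set(prod_columns)
    diff: dict[str, Any] = {}
    if dev_set - prod_set:
        diff["only_in_dev"] = sorted(dev_set - prod_set)
    if prod_set - dev_set:
        diff["only_in_prod"] = sorted(prod_set - dev_set)
    # Same columns in a different order: often harmless, but worth reporting.
    if not diff and dev_columns != prod_columns:
        diff["order_differs"] = True
    return diff


print(column_schema_diff(["id", "fe", "si"], ["id", "fe", "al"]))
# {'only_in_dev': ['si'], 'only_in_prod': ['al']}
```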

Common Workflows

Verify a Code Change Doesn't Affect Output

from dagster_tools import compare_dev_and_prod_assets

# After making code changes, materialize with prod inputs and compare
result = compare_dev_and_prod_assets("my_asset", materialize=True, from_prod=True)

if result.equal:
    print("✓ Output unchanged - safe to deploy")
else:
    print("✗ Output differs:")
    print(f"  Schema diff: {result.schema_diff}")
    print(f"  Sample differences: {result.sample_differences}")

Debug a Failing Asset

from dagster_tools import materialize_assets

# Materialize with dev inputs (default)
materialize_assets("failing_asset")

# Or materialize with prod inputs to reproduce production behavior
materialize_assets("failing_asset", from_prod=True)

Batch Compare Multiple Assets

from dagster_tools import compare_dev_and_prod_assets

assets = ["asset_a", "asset_b", "asset_c"]
results = compare_dev_and_prod_assets(assets, materialize=True)

for r in results:
    status = "✓" if r.equal else "✗"
    print(f"{status} {r.asset}: {r.dev_row_count} rows")

Finding Assets

The find_asset_location function locates which code location contains an asset:

from dagster_tools import find_asset_location

location, definitions, resources = find_asset_location("xrf_simplified")
print(f"Found in: {location}") # "datasmart" or "database"

This is used internally by materialize_assets and compare_dev_and_prod_assets, but you can also call it directly, for example to check which code location owns an asset or to inspect its definitions and resources.
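
Conceptually, the lookup scans each code location for the asset name and fails with a ValueError when none contains it, which is the error materialize_assets documents. The sketch below uses plain sets of names in place of Dagster Definitions objects; locate_asset and the registry contents are hypothetical, and unlike the real function it returns only the location name, not definitions or resources.

```python
def locate_asset(asset: str, registry: dict[str, set[str]]) -> str:
    """Hypothetical lookup: return the first code location whose assets include `asset`."""
    for location, asset_names in registry.items():
        if asset in asset_names:
            return location
    raise ValueError(f"Asset {asset!r} not found in any code location: {sorted(registry)}")


# Hypothetical assignment of assets to the two code locations.
registry = {
    "datasmart": {"asset_a", "asset_b"},
    "database": {"asset_c"},
}
print(locate_asset("asset_c", registry))
```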