Dagster Tools
The dagster_tools library provides utilities for working with Dagster assets programmatically, including materializing assets and comparing dev vs prod data.
Installation
The library is available as part of the monorepo. It's automatically available in the datasmart and database code locations.
from dagster_tools import materialize_assets, compare_dev_and_prod_assets
Materializing Assets
The materialize_assets function materializes Dagster assets programmatically, which is useful for testing, debugging, or running assets outside of the Dagster UI.
Basic Usage
from dagster_tools import materialize_assets
# Materialize a single asset
materialize_assets("xrf_simplified")
# Materialize multiple assets
materialize_assets(["xrf_simplified", "xrd_profex"])
Reading Inputs
By default, materialize_assets reads upstream inputs from dev. Use from_prod=True to read from production instead:
# Default: reads inputs from dev, writes to dev
materialize_assets("xrf_simplified")
# Read inputs from production
materialize_assets("xrf_simplified", from_prod=True)
Silent Mode
Suppress output messages:
materialize_assets("xrf_simplified", silent=True)
API Reference
def materialize_assets(
    assets: str | Sequence[str],
    *,
    from_prod: bool = False,
    silent: bool = False,
) -> None:
    """
    Materialize Dagster assets programmatically.

    Args:
        assets: Single asset name or list of asset names to materialize
        from_prod: If True, read upstream inputs from production.
            If False (default), use dev inputs.
        silent: If True, suppress all output

    Raises:
        ValueError: If an asset is not found in any code location
        Exception: If materialization fails
    """
Comparing Dev and Prod
Two functions are available for comparing data between dev and prod environments:
compare_dev_and_prod_tables - Compare PostgreSQL tables directly
compare_dev_and_prod_assets - Compare Dagster assets (with optional materialization)
Comparing Tables
Compare a table directly without Dagster involvement:
from dagster_tools import compare_dev_and_prod_tables
result = compare_dev_and_prod_tables("analytical.xrf_simplified")
print(result)
Output:
TableCompareResult(analytical.xrf_simplified): ✓ EQUAL
dev: 1,234 rows, 15 columns
prod: 1,234 rows, 15 columns
fingerprint_match: True
Options
result = compare_dev_and_prod_tables(
    "analytical.xrf_simplified",
    exclude_columns={"updated_at"},  # Columns to ignore
    sample_on_diff=True,             # Fetch sample rows when the tables differ
    sample_limit=5,                  # Max sample rows
    silent=False,                    # Set to True to suppress output
)
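To illustrate what a content fingerprint with excluded columns can mean, here is a minimal sketch. It is not the library's implementation, which is not described here; the function name table_fingerprint and the choice of SHA-256 over JSON-encoded rows are assumptions made for illustration only.

```python
import hashlib
import json


def table_fingerprint(rows, exclude_columns=frozenset()):
    """Return an order-independent SHA-256 digest of a list of row dicts.

    Hypothetical helper: it only illustrates the idea of a content hash
    that ignores selected columns.
    """
    row_digests = []
    for row in rows:
        # Drop the columns that should not take part in the comparison.
        kept = {k: v for k, v in row.items() if k not in exclude_columns}
        # sort_keys gives a stable encoding; default=str handles dates etc.
        encoded = json.dumps(kept, sort_keys=True, default=str).encode("utf-8")
        row_digests.append(hashlib.sha256(encoded).hexdigest())
    # Sorting the per-row digests makes the result independent of row order.
    combined = "\n".join(sorted(row_digests)).encode("utf-8")
    return hashlib.sha256(combined).hexdigest()
```

Two tables with the same rows in a different order produce the same digest, and a column such as updated_at stops affecting the digest once it is excluded.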
Comparing Assets
Compare Dagster assets between dev and prod:
from dagster_tools import compare_dev_and_prod_assets
# Compare using existing dev data
result = compare_dev_and_prod_assets("xrf_simplified")
# Materialize first, then compare
result = compare_dev_and_prod_assets("xrf_simplified", materialize=True)
# Compare multiple assets
results = compare_dev_and_prod_assets(["xrf_simplified", "xrd_profex"])
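Because the documented return type is either a single AssetCompareResult or a list of them, calling code that accepts both forms may want to normalize the value first. The helper below is a small sketch; the name as_result_list is hypothetical and not part of dagster_tools.

```python
def as_result_list(result):
    """Return a list, whether given one result object or a list of them."""
    if isinstance(result, list):
        # Copy so that callers can modify the returned list freely.
        return list(result)
    return [result]
```

With the library available, this could wrap either call form, for example as_result_list(compare_dev_and_prod_assets("xrf_simplified")).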
Materialize with Production Inputs
When materialize=True, you can control where inputs are read from:
# Materialize using dev inputs (default), then compare
result = compare_dev_and_prod_assets(
    "xrf_simplified",
    materialize=True,
)
# Materialize using prod inputs, then compare
result = compare_dev_and_prod_assets(
    "xrf_simplified",
    materialize=True,
    from_prod=True,
)
API Reference
def compare_dev_and_prod_assets(
    assets: str | Sequence[str],
    *,
    materialize: bool = False,
    from_prod: bool = False,
    exclude_columns: set[str] | None = None,
    sample_limit: int = 5,
    silent: bool = False,
) -> AssetCompareResult | list[AssetCompareResult]:
    """
    Compare asset values between dev and prod.

    Args:
        assets: Single asset name or list of asset names
        materialize: If True, materialize assets in dev before comparing
        from_prod: If True and materialize=True, read upstream inputs
            from production. If False (default), use dev inputs.
        exclude_columns: Columns to exclude from comparison
            (materialization_id is always excluded)
        sample_limit: Max rows to sample when showing differences
        silent: If True, suppress all output

    Returns:
        AssetCompareResult or list of AssetCompareResult
    """
Result Objects
TableCompareResult
@dataclass
class TableCompareResult:
    table: str                          # Table reference
    equal: bool                         # True if tables match
    dev_row_count: int                  # Row count in dev
    prod_row_count: int                 # Row count in prod
    dev_columns: list[str]              # Column names in dev
    prod_columns: list[str]             # Column names in prod
    schema_diff: dict[str, Any]         # Schema differences
    fingerprint_match: bool | None      # Content hash match
    sample_differences: dict[str, Any]  # Sample differing rows
    error: str | None                   # Error message if failed
AssetCompareResult
@dataclass
class AssetCompareResult:
    asset: str                          # Asset name
    equal: bool                         # True if assets match
    dev_row_count: int | None           # Row count in dev
    prod_row_count: int | None          # Row count in prod
    dev_columns: list[str]              # Column names in dev
    prod_columns: list[str]             # Column names in prod
    schema_diff: dict[str, Any]         # Schema differences
    sample_differences: dict[str, Any]  # Sample differing rows
    error: str | None                   # Error message if failed
    materialized: bool                  # True if materialized before compare
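The layout of schema_diff is not specified here, so code that only needs to know which columns exist on one side can derive that from the documented dev_columns and prod_columns fields. The following is a sketch; column_diff and its output keys are hypothetical names, not part of the library.

```python
def column_diff(dev_columns, prod_columns):
    """Return the columns present on only one side, in their original order."""
    dev_set, prod_set = set(dev_columns), set(prod_columns)
    return {
        "only_in_dev": [c for c in dev_columns if c not in prod_set],
        "only_in_prod": [c for c in prod_columns if c not in dev_set],
    }
```

Given a result object, this would be called as column_diff(result.dev_columns, result.prod_columns).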
Common Workflows
Verify a Code Change Doesn't Affect Output
from dagster_tools import compare_dev_and_prod_assets
# After making code changes, materialize with prod inputs and compare
result = compare_dev_and_prod_assets("my_asset", materialize=True, from_prod=True)
if result.equal:
    print("✓ Output unchanged - safe to deploy")
else:
    print("✗ Output differs:")
    print(f"  Schema diff: {result.schema_diff}")
    print(f"  Sample differences: {result.sample_differences}")
Debug a Failing Asset
from dagster_tools import materialize_assets
# Materialize with dev inputs (default)
materialize_assets("failing_asset")
# Or materialize with prod inputs to reproduce production behavior
materialize_assets("failing_asset", from_prod=True)
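When several assets are suspect, it can help to materialize them one at a time and record which ones fail, separating the documented ValueError (asset not found in any code location) from other failures. The sketch below takes the materialize function as an argument so that it runs without the monorepo; materialize_each is a hypothetical helper name.

```python
from collections.abc import Callable, Sequence


def materialize_each(
    assets: Sequence[str],
    materialize: Callable[..., None],
    *,
    from_prod: bool = False,
) -> dict[str, str]:
    """Materialize assets one by one and return failures keyed by asset name.

    `materialize` is assumed to behave like the documented materialize_assets:
    it accepts an asset name plus from_prod and silent keyword arguments.
    """
    failures: dict[str, str] = {}
    for name in assets:
        try:
            materialize(name, from_prod=from_prod, silent=True)
        except ValueError as exc:
            # Documented: raised when the asset is not in any code location.
            failures[name] = f"not found: {exc}"
        except Exception as exc:
            # Documented: raised when the materialization itself fails.
            failures[name] = f"failed: {exc}"
    return failures
```

With the library imported, a call might look like materialize_each(["asset_a", "asset_b"], materialize_assets, from_prod=True); an empty dict means every asset materialized.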
Batch Compare Multiple Assets
from dagster_tools import compare_dev_and_prod_assets
assets = ["asset_a", "asset_b", "asset_c"]
results = compare_dev_and_prod_assets(assets, materialize=True)
for r in results:
    status = "✓" if r.equal else "✗"
    print(f"{status} {r.asset}: {r.dev_row_count} rows")
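For use in a script or CI job, the loop above can be extended to also report the error field and produce an exit code. This is a sketch that relies only on the documented asset, equal, error, dev_row_count, and prod_row_count fields; the summarize name and the OK/DIFF/ERROR labels are illustrative choices.

```python
def summarize(results):
    """Return (report_lines, exit_code) for a list of compare results."""
    lines = []
    exit_code = 0
    for r in results:
        if r.error is not None:
            # A failed comparison is reported first, whatever `equal` says.
            lines.append(f"ERROR {r.asset}: {r.error}")
            exit_code = 1
        elif r.equal:
            lines.append(f"OK {r.asset}: {r.dev_row_count} rows")
        else:
            lines.append(
                f"DIFF {r.asset}: dev={r.dev_row_count} prod={r.prod_row_count} rows"
            )
            exit_code = 1
    return lines, exit_code
```

A caller could print the lines and pass the exit code to sys.exit so that any difference or error fails the job.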
Finding Assets
The find_asset_location function locates which code location contains an asset:
from dagster_tools import find_asset_location
location, definitions, resources = find_asset_location("xrf_simplified")
print(f"Found in: {location}") # "datasmart" or "database"
This is used internally by materialize_assets and compare_dev_and_prod_assets, but can be useful for advanced use cases.
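One such use case is grouping a list of assets by the code location that defines them. The sketch below takes the lookup function as a parameter so that it runs on its own; it assumes only the documented three-element return value, and group_by_location is a hypothetical name. How find_asset_location reports an unknown asset is not stated here, so the sketch does not handle that case.

```python
from collections import defaultdict


def group_by_location(assets, find_location):
    """Group asset names by the code location returned by `find_location`.

    `find_location` is assumed to return (location, definitions, resources),
    as documented for find_asset_location.
    """
    grouped = defaultdict(list)
    for name in assets:
        location, _definitions, _resources = find_location(name)
        grouped[location].append(name)
    return dict(grouped)
```

With the library imported, this could be called as group_by_location(["xrf_simplified", "xrd_profex"], find_asset_location).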