PostgresIOManager

The PostgresIOManager is a Dagster IO manager for reading and writing Pandas DataFrames to PostgreSQL. It provides automatic schema/table creation, type mapping, partitioning, incremental updates, and PostGIS geometry support.

Overview

When you return a DataFrame from a Dagster asset configured with io_manager_key="postgres_io_manager", the IO manager automatically:

  1. Creates the schema and table if they don't exist
  2. Maps Pandas/Arrow types to PostgreSQL types
  3. Tracks materializations for lineage and partition support
  4. Grants SELECT privileges to specified users

Basic Usage

import pandas as pd
import dagster as dg

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "analytics",
        "table": "daily_metrics",  # optional, defaults to asset key
    },
)
def daily_metrics():
    return pd.DataFrame({
        "date": ["2024-01-01", "2024-01-02"],
        "value": [100, 200],
    })

Metadata Options

Control how data is written using metadata:

Option           | Type              | Default    | Description
-----------------|-------------------|------------|-------------------------------------------------------
schema           | str               | "public"   | Target PostgreSQL schema
table            | str               | asset key  | Custom table name
primary_key      | str or list[str]  | None       | Column(s) for primary key constraint
incremental      | bool or str       | False      | Enable upsert mode (requires primary_key)
columns          | dict              | None       | Column type definitions and constraints
chunk_size       | str               | "300MB"    | Data chunk size for large outputs
users            | list[str]         | []         | Users to grant SELECT privileges
environment      | str               | None       | Set to "production" to load inputs from prod
full_table       | bool or str       | False      | Ignore materialization IDs, load entire table
columns (input)  | list[str]         | None       | Column names to load (reduces memory for wide tables)

Example with All Options

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "transport",
        "table": "routes",
        "primary_key": ("start_id", "end_id"),
        "incremental": "true",
        "columns": {
            "start_id": ("int", "not null"),
            "end_id": ("int", "not null"),
            "distance": "float",
            "geometry": ("geometry", "not null"),
        },
        "chunk_size": "500MB",
        "users": ["felt", "readonly_user"],
    },
)
def truck_routes():
    ...

Column Type Definitions

The columns metadata lets you specify PostgreSQL types and constraints:

metadata={
    "columns": {
        # Simple type specification
        "id": "int",
        "name": "string",
        "price": "float",
        "active": "bool",
        "created": "date",
        "updated": "datetime",

        # With constraints (use tuple)
        "user_id": ("int", "not null"),
        "email": ("string", "unique"),

        # PostGIS geometry
        "location": ("geometry", "not null"),
    },
}

Available Types

Type        | PostgreSQL Mapping
------------|--------------------
bool        | BOOLEAN
int, int32  | INTEGER
uint        | INTEGER (unsigned)
float       | DOUBLE PRECISION
string      | TEXT
date        | DATE
datetime    | TIMESTAMP
geometry    | GEOMETRY (PostGIS)

Available Modifiers

  • not null - Column cannot be NULL
  • unique - Column values must be unique

TableSchema Helpers

For assets that both write to PostgreSQL and shape DataFrames in Python, prefer defining one shared TableSchema and reusing it for both metadata and DataFrame conformance.

import pandas as pd
import dagster as dg

from shared.db import table_schema


DAILY_METRICS_SCHEMA = table_schema(
    {
        "run_date": "date",
        "plant": "string",
        "throughput_tpd": "float",
        "is_valid": "bool",
    },
    primary_key=["run_date", "plant"],
)


@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata=DAILY_METRICS_SCHEMA.metadata(schema="analytics"),
)
def daily_metrics() -> pd.DataFrame:
    rows = [
        {
            "run_date": "2024-01-01",
            "plant": "A",
            "throughput_tpd": "125.4",
            "is_valid": "true",
        }
    ]
    return DAILY_METRICS_SCHEMA.finalize(pd.DataFrame(rows))

This gives you one canonical schema definition for:

  • metadata["columns"] and metadata["primary_key"]
  • typed empty outputs with schema.empty_df()
  • column reordering, missing-column filling, and dtype conversion with schema.finalize() / schema.convert()

TableSchema.metadata(...) also marks primary-key columns as not null, so you do not need to repeat that constraint in both places.
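For example, an asset that may produce no rows can still return a correctly typed output by falling back to the shared schema. The sketch below reuses DAILY_METRICS_SCHEMA from above; the asset name and the fetch_rows helper are illustrative only:

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata=DAILY_METRICS_SCHEMA.metadata(schema="analytics"),
)
def daily_metrics_backfill() -> pd.DataFrame:
    rows = fetch_rows()  # hypothetical helper returning a list of dicts
    if not rows:
        # Typed empty frame: column order and dtypes still match the schema
        return DAILY_METRICS_SCHEMA.empty_df()
    return DAILY_METRICS_SCHEMA.finalize(pd.DataFrame(rows))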

Reusing Only The Columns Metadata

If you need to build the asset metadata dict manually, use schema.columns_metadata:

metadata = {
    "schema": "analytics",
    "columns": DAILY_METRICS_SCHEMA.columns_metadata,
    "primary_key": ["run_date", "plant"],
}

Incremental Updates (Upserts)

Enable upsert mode for efficient incremental data loading. Rows with matching primary keys are updated; new rows are inserted.

Basic Incremental Asset

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "analytics",
        "primary_key": "id",
        "incremental": "true",
    },
)
def user_metrics():
    # Rows with existing IDs are updated, new IDs are inserted
    return pd.DataFrame({
        "id": [1, 2, 3],
        "score": [95, 87, 92],
    })

Composite Primary Key

metadata={
    "primary_key": ("user_id", "date"),  # Composite key
    "incremental": "true",
}

The generated SQL uses ON CONFLICT ... DO UPDATE:

INSERT INTO analytics.user_metrics (id, score)
VALUES (1, 95), (2, 87), (3, 92)
ON CONFLICT (id) DO UPDATE SET
score = EXCLUDED.score;

IncrementalDataFrame with Delete

For scenarios where you need to delete specific rows before inserting (common with SharePoint file updates), use IncrementalDataFrame:

import pandas as pd

from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Suffix
from shared.sharepoint.client_v2 import SharePointFile
from shared.db.sql import Eq, IncrementalDataFrame

@sharepoint_asset(
    watch=Watch("Lab Data", ["Results"], filters=[Suffix(".csv")]),
    io_manager_key="postgres_io_manager",
    metadata={"schema": "lab"},
)
async def lab_results(files: list[SharePointFile]):
    rows = []
    file_ids = []

    for file in files:
        if file.deleted:
            file_ids.append(file.id)
            continue

        df = await file.read_csv()
        df["file_id"] = file.id
        rows.append(df)
        file_ids.append(file.id)

    result = pd.concat(rows) if rows else pd.DataFrame()

    # Delete rows matching these file_ids before inserting
    delete = Eq("file_id", file_ids)
    return IncrementalDataFrame(result, delete=delete)

Delete Expression Types

Build WHERE clauses for the DELETE statement:

from shared.db.sql import Eq, And, Or

# Single value
Eq("file_id", "abc123")

# Multiple values (generates IN clause)
Eq("file_id", ["abc123", "def456", "ghi789"])

# Composite key
Eq(["user_id", "date"], [
(1, "2024-01-01"),
(2, "2024-01-02"),
])

# Logical AND
And(
Eq("file_id", "abc123"),
Eq("status", "active")
)

# Logical OR
Or(
Eq("source", "manual"),
Eq("source", "api")
)

Loading from Production

Read upstream assets from the production database while materializing to dev.

Method 1: Asset-Level Environment

Set environment: "production" on the asset definition to load all inputs from prod:

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"environment": "production"},
)
def analysis(upstream_data):
    # upstream_data is loaded from the production database
    ...

Method 2: Per-Input Environment

Specify environment for individual inputs:

@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={
        "prod_data": dg.AssetIn(metadata={"environment": "production"}),
        "dev_data": dg.AssetIn(),  # loads from dev (default)
    },
)
def combined_analysis(prod_data, dev_data):
    # Mix production and development data
    ...

Full Table Loading

By default, the IO manager loads only the rows written by the tracked materializations of the upstream asset. Use full_table: True to load every row in the table:

@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={
        "all_routes": dg.AssetIn(metadata={
            "environment": "production",
            "full_table": True,  # Load entire table, all partitions
        }),
    },
)
def route_analysis(all_routes):
    # all_routes contains every row in the table
    ...

This is useful when:

  • Multiple assets write to the same table
  • You need all partitions, not just specific ones
  • Loading reference data that shouldn't be filtered

Column Filtering

For wide tables, load only the columns you need to reduce memory usage and improve load times:

@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={
        "basegrid": dg.AssetIn(metadata={
            "columns": ["geohash4_id", "pol_iso_2"],
        }),
    },
)
def country_analysis(basegrid):
    # basegrid only contains geohash4_id and pol_iso_2 columns
    ...

For external PostgreSQL tables that should be loaded through the IO manager as a normal asset input, prefer AssetSpec over the deprecated SourceAsset API:

source_asset = dg.AssetSpec(
    key="source_asset",
    metadata={
        "schema": "test",
        "table": "source_asset",  # optional, defaults to the asset key
    },
).with_io_manager_key("postgres_io_manager")

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "test"},
)
def downstream_asset(source_asset: pd.DataFrame) -> pd.DataFrame:
    source_asset["value_plus_one"] = source_asset["value"] + 1
    return source_asset

Column filtering also works with AssetSpec metadata:

pfis_basegrid_gh4 = dg.AssetSpec(
    key="pfis_basegrid_gh4",
    metadata={
        "schema": "gis",
        "columns": ["geohash4_id", "pol_iso_2"],
        "environment": "production",
    },
).with_io_manager_key("postgres_io_manager")

@dg.asset
def my_analysis(pfis_basegrid_gh4: pd.DataFrame):
    # DataFrame contains only the specified columns
    ...

Tip: When using column filtering, geometry columns excluded from the selection are automatically skipped during GeoDataFrame conversion.


GeoDataFrame Support

Return a GeoDataFrame for automatic PostGIS geometry handling:

import geopandas as gpd
from shapely.geometry import Point

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "spatial",
        "columns": {
            "id": ("int", "not null"),
            "name": "string",
            "geometry": ("geometry", "not null"),
        },
    },
)
def facility_locations():
    return gpd.GeoDataFrame({
        "id": [1, 2],
        "name": ["Facility A", "Facility B"],
        "geometry": [Point(-122.4, 37.8), Point(-118.2, 34.0)],
    }, crs="EPSG:4326")

When loaded as an input:

  • Geometry columns are automatically detected from the geometry_columns table
  • Geometry data is converted to a GeoSeries with the correct CRS
  • The input arrives as a gpd.GeoDataFrame
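
As a sketch of the input side, a downstream asset can consume the table above as a GeoDataFrame. The asset name and buffer distance below are illustrative only:

import geopandas as gpd
import dagster as dg

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "spatial"},
)
def facility_buffers(facility_locations: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    # facility_locations arrives with its geometry column and CRS restored
    buffered = facility_locations.copy()
    buffered["geometry"] = (
        buffered.geometry.to_crs(epsg=3857).buffer(500).to_crs(epsg=4326)
    )
    return buffered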

SQL Object Output

For raw DDL/DML operations, return a SQL or Query object:

from shared.db.sql import SQL

@dg.asset(
    io_manager_key="postgres_io_manager",
    non_argument_deps={"raw_data"},
)
def aggregated_view() -> SQL:
    return SQL("""
        DROP TABLE IF EXISTS analytics.aggregated;
        CREATE TABLE analytics.aggregated AS
        SELECT
            category,
            COUNT(*) as count,
            AVG(value) as avg_value
        FROM analytics.raw_data
        GROUP BY category;
    """)

This is useful for:

  • Creating derived tables via SQL
  • Running complex transformations
  • Creating views or materialized views

Granting Permissions

Automatically grant SELECT privileges after materialization:

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "public",
        "users": ["felt", "metabase_reader", "api_user"],
    },
)
def shared_metrics():
    ...

The IO manager runs:

GRANT USAGE ON SCHEMA "public" TO "felt";
GRANT SELECT ON "public"."shared_metrics" TO "felt";
-- repeated for each user

Note: Assets in the datasmart code location automatically grant to datasmart_user.


Materialization Tracking

Every row written by PostgresIOManager includes a materialization_id column that links to tracking tables:

  • meta.assets - Stores asset_key, partition_key, table, schema
  • meta.materializations - Stores run_id, asset_id, timestamp

This enables:

  • Partitioning: Different partitions have different materialization IDs
  • Multi-asset tables: Multiple assets can write to the same table
  • Data lineage: Track which Dagster run produced each row
  • Selective loading: Load only rows for specific materializations

The materialization_id column is:

  • Automatically added on output
  • Automatically removed when loading inputs
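
For example, with a daily-partitioned asset each partition's write is recorded as its own materialization, so partitioned downstream loads pull only the matching rows. A minimal sketch; the partition range, table, and asset names are illustrative:

import pandas as pd
import dagster as dg

@dg.asset(
    io_manager_key="postgres_io_manager",
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
    metadata={"schema": "analytics", "table": "daily_readings"},
)
def daily_readings(context: dg.AssetExecutionContext) -> pd.DataFrame:
    # Each run writes rows tagged with this partition's materialization_id
    return pd.DataFrame({
        "date": [context.partition_key],
        "reading": [42.0],
    })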

Common Patterns

SharePoint File Processing

The most common pattern for SharePoint assets:

import pandas as pd

from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Suffix
from shared.sharepoint.client_v2 import SharePointFile
from shared.db.sql import Eq, IncrementalDataFrame

@sharepoint_asset(
    watch=Watch("Lab Data", ["XRF Results"], filters=[Suffix(".csv")]),
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analytical"},
)
async def xrf_results(files: list[SharePointFile]):
    dfs = []
    file_ids = []

    for file in files:
        if file.deleted:
            file_ids.append(file.id)
            continue

        df = await file.read_csv()
        df["file_id"] = file.id
        df["file_url"] = file.url
        df["file_modified"] = file.modified
        dfs.append(df)
        file_ids.append(file.id)

    result = pd.concat(dfs) if dfs else pd.DataFrame()
    return IncrementalDataFrame(result, delete=Eq("file_id", file_ids))

Multi-Asset Output

Process one source into multiple tables:

from dagster_tools.sharepoint_asset import sharepoint_multi_asset

@sharepoint_multi_asset(
    watch=Watch("Data", ["Reports"]),
    outs={
        "samples": dg.AssetOut(metadata={"schema": "lab"}),
        "measurements": dg.AssetOut(metadata={"schema": "lab"}),
        "metadata": dg.AssetOut(metadata={"schema": "lab"}),
    },
)
async def process_reports(files):
    # ... process files into three DataFrames ...

    delete = Eq("file_id", file_ids)
    return (
        IncrementalDataFrame(df_samples, delete=delete),
        IncrementalDataFrame(df_measurements, delete=delete),
        IncrementalDataFrame(df_metadata, delete=delete),
    )

Hybrid Dev/Prod Loading

Materialize in dev while reading specific inputs from production:

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analysis"},
    ins={
        "reference_data": dg.AssetIn(metadata={"environment": "production"}),
        "verified_sites": dg.AssetIn(metadata={
            "environment": "production",
            "full_table": True,
        }),
    },
)
def site_analysis(reference_data, verified_sites, local_updates):
    # reference_data: from prod, filtered by materialization
    # verified_sites: from prod, full table
    # local_updates: from dev (default)
    ...

Imports Reference

# IO Manager (usually configured in resources, not imported directly)
from dagster_tools.postgres_io_manager import PostgresIOManager

# Incremental updates
from shared.db.sql import IncrementalDataFrame, Eq, And, Or

# SQL operations
from shared.db.sql import SQL, Query

# SharePoint integration
from dagster_tools.sharepoint_asset import sharepoint_asset, sharepoint_multi_asset, Watch, Suffix

# Type hints
from shared.sharepoint.client_v2 import SharePointFile
import geopandas as gpd
import pandas as pd

Troubleshooting

Empty DataFrame Warning

If you return an empty DataFrame in non-incremental mode, you'll see:

Empty DataFrame received for asset my_asset. The database table will not be created.

This is expected behavior: empty DataFrames do not create tables.

Partition Not Found

ValueError: No materialization found for partitions ['2024-01-01']

The upstream partition hasn't been materialized yet. Either:

  • Materialize the upstream partition first
  • Use full_table: True if you need all data regardless of partition

Permission Errors on GRANT

If you see "tuple concurrently updated" errors during GRANT, the statements are automatically retried (up to 5 times with exponential backoff). This happens when many assets run concurrently and all try to GRANT on the same schema.