PostgresIOManager
The PostgresIOManager is a Dagster IO manager for reading and writing Pandas DataFrames to PostgreSQL. It provides automatic schema/table creation, type mapping, partitioning, incremental updates, and PostGIS geometry support.
Overview
When you return a DataFrame from a Dagster asset configured with io_manager_key="postgres_io_manager", the IO manager automatically:
- Creates the schema and table if they don't exist
- Maps Pandas/Arrow types to PostgreSQL types
- Tracks materializations for lineage and partition support
- Grants SELECT privileges to specified users
Basic Usage
import pandas as pd
import dagster as dg

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "analytics",
        "table": "daily_metrics",  # optional, defaults to asset key
    },
)
def daily_metrics():
    return pd.DataFrame({
        "date": ["2024-01-01", "2024-01-02"],
        "value": [100, 200],
    })
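The asset above assumes a resource named postgres_io_manager is registered on your Definitions object. The sketch below shows one plausible wiring; the PostgresIOManager constructor arguments are deliberately elided because the real resource is usually configured centrally (see Imports Reference), and its exact signature is not documented here.

```python
import dagster as dg
from dagster_tools.postgres_io_manager import PostgresIOManager  # internal module

@dg.asset(io_manager_key="postgres_io_manager", metadata={"schema": "analytics"})
def daily_metrics():
    ...

# Hypothetical wiring sketch: the resource key must match io_manager_key on the
# assets. Connection configuration goes where the ellipsis is.
defs = dg.Definitions(
    assets=[daily_metrics],
    resources={"postgres_io_manager": PostgresIOManager(...)},
)
```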
Metadata Options
Control how data is written using metadata:
| Option | Type | Default | Description |
|---|---|---|---|
| schema | str | "public" | Target PostgreSQL schema |
| table | str | asset key | Custom table name |
| primary_key | str \| list[str] | None | Column(s) for primary key constraint |
| incremental | bool \| str | False | Enable upsert mode (requires primary_key) |
| columns | dict | None | Column type definitions and constraints |
| chunk_size | str | "300MB" | Data chunk size for large outputs |
| users | list[str] | [] | Users to grant SELECT privileges |
| environment | str | None | Set to "production" to load inputs from prod |
| full_table | bool \| str | False | Ignore materialization IDs, load entire table |
| columns (input) | list[str] | None | Column names to load (reduces memory for wide tables) |
Example with All Options
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "transport",
        "table": "routes",
        "primary_key": ("start_id", "end_id"),
        "incremental": "true",
        "columns": {
            "start_id": ("int", "not null"),
            "end_id": ("int", "not null"),
            "distance": "float",
            "geometry": ("geometry", "not null"),
        },
        "chunk_size": "500MB",
        "users": ["felt", "readonly_user"],
    },
)
def truck_routes():
    ...
Column Type Definitions
The columns metadata lets you specify PostgreSQL types and constraints:
metadata={
    "columns": {
        # Simple type specification
        "id": "int",
        "name": "string",
        "price": "float",
        "active": "bool",
        "created": "date",
        "updated": "datetime",
        # With constraints (use tuple)
        "user_id": ("int", "not null"),
        "email": ("string", "unique"),
        # PostGIS geometry
        "location": ("geometry", "not null"),
    },
}
Available Types
| Type | PostgreSQL Mapping |
|---|---|
| bool | BOOLEAN |
| int, int32 | INTEGER |
| uint | INTEGER (unsigned) |
| float | DOUBLE PRECISION |
| string | TEXT |
| date | DATE |
| datetime | TIMESTAMP |
| geometry | GEOMETRY (PostGIS) |
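The mapping above can be previewed with plain pandas to see which PostgreSQL types a DataFrame's columns would get. The PG_TYPES lookup and column_pg_types helper below are illustrative only, not the library's actual type-mapping code:

```python
import pandas as pd

# Illustrative pandas-dtype -> PostgreSQL type lookup (not the library's table).
PG_TYPES = {
    "bool": "BOOLEAN",
    "int32": "INTEGER",
    "int64": "INTEGER",
    "float64": "DOUBLE PRECISION",
    "object": "TEXT",
    "datetime64[ns]": "TIMESTAMP",
}

def column_pg_types(df: pd.DataFrame) -> dict:
    """Map each column's pandas dtype name to a PostgreSQL type name."""
    return {name: PG_TYPES[str(dtype)] for name, dtype in df.dtypes.items()}

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"], "score": [1.5, 2.5]})
print(column_pg_types(df))
# → {'id': 'INTEGER', 'name': 'TEXT', 'score': 'DOUBLE PRECISION'}
```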
Available Modifiers
- not null - Column cannot be NULL
- unique - Column values must be unique
TableSchema Helpers
For assets that both write to PostgreSQL and shape DataFrames in Python, prefer
defining one shared TableSchema and reusing it for both metadata and DataFrame
conformance.
import pandas as pd
import dagster as dg
from shared.db import table_schema

DAILY_METRICS_SCHEMA = table_schema(
    {
        "run_date": "date",
        "plant": "string",
        "throughput_tpd": "float",
        "is_valid": "bool",
    },
    primary_key=["run_date", "plant"],
)

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata=DAILY_METRICS_SCHEMA.metadata(schema="analytics"),
)
def daily_metrics() -> pd.DataFrame:
    rows = [
        {
            "run_date": "2024-01-01",
            "plant": "A",
            "throughput_tpd": "125.4",
            "is_valid": "true",
        }
    ]
    return DAILY_METRICS_SCHEMA.finalize(pd.DataFrame(rows))
This gives you one canonical schema definition for:
- metadata["columns"] and metadata["primary_key"]
- typed empty outputs with schema.empty_df()
- column reordering, missing-column filling, and dtype conversion with schema.finalize() / schema.convert()
TableSchema.metadata(...) also marks primary-key columns as not null, so you
do not need to repeat that constraint in both places.
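What finalize() does can be approximated with plain pandas: reorder columns to match the schema, drop extras, and coerce dtypes. The sketch below is a stand-in for intuition, not the actual TableSchema implementation; the DTYPES dict and finalize function are assumptions:

```python
import pandas as pd

# Illustrative stand-in for TableSchema.finalize(); column order and dtypes
# come from a plain dict rather than the real schema object.
DTYPES = {
    "run_date": "datetime64[ns]",
    "plant": "string",
    "throughput_tpd": "float64",
    "is_valid": "bool",
}

def finalize(df: pd.DataFrame) -> pd.DataFrame:
    # Add missing columns, drop extras, and fix the column order.
    df = df.reindex(columns=list(DTYPES))
    # Coerce each column to the schema dtype.
    return df.astype(DTYPES)

raw = pd.DataFrame(
    [{"plant": "A", "run_date": "2024-01-01", "throughput_tpd": "125.4", "is_valid": True}]
)
out = finalize(raw)
```

Note the real finalize() also handles string-to-bool conversion ("true" in the example above), which a bare astype would not.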
Reusing Only The Columns Metadata
If you need to build the asset metadata dict manually, use
schema.columns_metadata:
metadata = {
    "schema": "analytics",
    "columns": DAILY_METRICS_SCHEMA.columns_metadata,
    "primary_key": ["run_date", "plant"],
}
Incremental Updates (Upserts)
Enable upsert mode for efficient incremental data loading. Rows with matching primary keys are updated; new rows are inserted.
Basic Incremental Asset
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "analytics",
        "primary_key": "id",
        "incremental": "true",
    },
)
def user_metrics():
    # Rows with existing IDs are updated, new IDs are inserted
    return pd.DataFrame({
        "id": [1, 2, 3],
        "score": [95, 87, 92],
    })
Composite Primary Key
metadata={
    "primary_key": ("user_id", "date"),  # Composite key
    "incremental": "true",
}
The generated SQL uses ON CONFLICT ... DO UPDATE:
INSERT INTO analytics.user_metrics (id, score)
VALUES (1, 95), (2, 87), (3, 92)
ON CONFLICT (id) DO UPDATE SET
    score = EXCLUDED.score;
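For intuition, the effect of that upsert can be reproduced in pandas: on a key conflict the incoming row wins, and new keys are appended. This is a rough analogy, not what the IO manager actually executes:

```python
import pandas as pd

# Existing table contents, keyed by the primary key.
existing = pd.DataFrame({"id": [1, 2], "score": [90, 80]}).set_index("id")
# Incoming materialization: id 2 conflicts, id 3 is new.
incoming = pd.DataFrame({"id": [2, 3], "score": [87, 92]}).set_index("id")

# Incoming rows win on conflicting keys; new keys are appended.
upserted = incoming.combine_first(existing).astype(int)
print(upserted)
# id 1 keeps score 90, id 2 is updated to 87, id 3 is inserted with 92
```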
IncrementalDataFrame with Delete
For scenarios where you need to delete specific rows before inserting (common with SharePoint file updates), use IncrementalDataFrame:
from shared.db.sql import Eq, IncrementalDataFrame

@sharepoint_asset(
    watch=Watch("Lab Data", ["Results"], filters=[Suffix(".csv")]),
    io_manager_key="postgres_io_manager",
    metadata={"schema": "lab"},
)
async def lab_results(files: list[SharePointFile]):
    rows = []
    file_ids = []
    for file in files:
        if file.deleted:
            file_ids.append(file.id)
            continue
        df = await file.read_csv()
        df["file_id"] = file.id
        rows.append(df)
        file_ids.append(file.id)
    result = pd.concat(rows) if rows else pd.DataFrame()
    # Delete rows matching these file_ids before inserting
    delete = Eq("file_id", file_ids)
    return IncrementalDataFrame(result, delete=delete)
Delete Expression Types
Build WHERE clauses for the DELETE statement:
from shared.db.sql import Eq, And, Or

# Single value
Eq("file_id", "abc123")

# Multiple values (generates IN clause)
Eq("file_id", ["abc123", "def456", "ghi789"])

# Composite key
Eq(["user_id", "date"], [
    (1, "2024-01-01"),
    (2, "2024-01-02"),
])

# Logical AND
And(
    Eq("file_id", "abc123"),
    Eq("status", "active"),
)

# Logical OR
Or(
    Eq("source", "manual"),
    Eq("source", "api"),
)
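How such expressions might render into a WHERE clause can be sketched with plain string building. The eq/and_/or_ functions below are hypothetical; the real SQL generation (identifier quoting, parameter binding) lives in shared.db.sql and is not shown here:

```python
# Hypothetical renderer for Eq/And/Or-style delete expressions. The real
# library handles quoting and parameter binding, which this sketch skips.
def eq(column, value):
    if isinstance(value, list):
        vals = ", ".join(repr(v) for v in value)
        return f"{column} IN ({vals})"
    return f"{column} = {value!r}"

def and_(*parts):
    return "(" + " AND ".join(parts) + ")"

def or_(*parts):
    return "(" + " OR ".join(parts) + ")"

print(eq("file_id", ["abc123", "def456"]))
# → file_id IN ('abc123', 'def456')
# e.g. used as: DELETE FROM lab.lab_results WHERE file_id IN ('abc123', 'def456')
```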
Loading from Production
Read upstream assets from the production database while materializing to dev.
Method 1: Asset-Level Environment
Set environment: "production" on the asset definition to load all inputs from prod:
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"environment": "production"},
)
def analysis(upstream_data):
    # upstream_data is loaded from production database
    ...
Method 2: Per-Input Environment
Specify environment for individual inputs:
@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={
        "prod_data": dg.AssetIn(metadata={"environment": "production"}),
        "dev_data": dg.AssetIn(),  # loads from dev (default)
    },
)
def combined_analysis(prod_data, dev_data):
    # Mix production and development data
    ...
Full Table Loading
By default, the IO manager only loads rows matching the current asset's materialization. Use full_table: True to load all rows:
@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={
        "all_routes": dg.AssetIn(metadata={
            "environment": "production",
            "full_table": True,  # Load entire table, all partitions
        }),
    },
)
def route_analysis(all_routes):
    # all_routes contains every row in the table
    ...
This is useful when:
- Multiple assets write to the same table
- You need all partitions, not just specific ones
- Loading reference data that shouldn't be filtered
Column Filtering
For wide tables, load only the columns you need to reduce memory usage and improve load times:
@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={
        "basegrid": dg.AssetIn(metadata={
            "columns": ["geohash4_id", "pol_iso_2"],
        }),
    },
)
def country_analysis(basegrid):
    # basegrid only contains geohash4_id and pol_iso_2 columns
    ...
For external PostgreSQL tables that should load through the IO manager as a normal
asset input, prefer AssetSpec over the deprecated SourceAsset API:
source_asset = dg.AssetSpec(
    key="source_asset",
    metadata={
        "schema": "test",
        "table": "source_asset",  # optional, defaults to the asset key
    },
).with_io_manager_key("postgres_io_manager")

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "test"},
)
def downstream_asset(source_asset: pd.DataFrame) -> pd.DataFrame:
    source_asset["value_plus_one"] = source_asset["value"] + 1
    return source_asset
Column filtering also works with AssetSpec metadata:
pfis_basegrid_gh4 = dg.AssetSpec(
    key="pfis_basegrid_gh4",
    metadata={
        "schema": "gis",
        "columns": ["geohash4_id", "pol_iso_2"],
        "environment": "production",
    },
).with_io_manager_key("postgres_io_manager")

@dg.asset
def my_analysis(pfis_basegrid_gh4: pd.DataFrame):
    # DataFrame contains only the specified columns
    ...
When using column filtering, geometry columns excluded from the selection are automatically skipped during GeoDataFrame conversion.
GeoDataFrame Support
Return a GeoDataFrame for automatic PostGIS geometry handling:
import geopandas as gpd
from shapely.geometry import Point

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "spatial",
        "columns": {
            "id": ("int", "not null"),
            "name": "string",
            "geometry": ("geometry", "not null"),
        },
    },
)
def facility_locations():
    return gpd.GeoDataFrame({
        "id": [1, 2],
        "name": ["Facility A", "Facility B"],
        "geometry": [Point(-122.4, 37.8), Point(-118.2, 34.0)],
    }, crs="EPSG:4326")
When loaded as an input:
- Geometry columns are automatically detected from the geometry_columns table
- Geometry values are converted to GeoSeries with the proper CRS
- The input type is gpd.GeoDataFrame
SQL Object Output
For raw DDL/DML operations, return a SQL or Query object:
from shared.db.sql import SQL

@dg.asset(
    io_manager_key="postgres_io_manager",
    non_argument_deps={"raw_data"},
)
def aggregated_view() -> SQL:
    return SQL("""
        DROP TABLE IF EXISTS analytics.aggregated;
        CREATE TABLE analytics.aggregated AS
        SELECT
            category,
            COUNT(*) AS count,
            AVG(value) AS avg_value
        FROM analytics.raw_data
        GROUP BY category;
    """)
This is useful for:
- Creating derived tables via SQL
- Running complex transformations
- Creating views or materialized views
Granting Permissions
Automatically grant SELECT privileges after materialization:
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "public",
        "users": ["felt", "metabase_reader", "api_user"],
    },
)
def shared_metrics():
    ...
The IO manager runs:
GRANT USAGE ON SCHEMA "public" TO "felt";
GRANT SELECT ON "public"."shared_metrics" TO "felt";
-- repeated for each user
Assets in the datasmart code location automatically grant to datasmart_user.
Materialization Tracking
Every row written by PostgresIOManager includes a materialization_id column that links to tracking tables:
- meta.assets - Stores asset_key, partition_key, table, schema
- meta.materializations - Stores run_id, asset_id, timestamp
This enables:
- Partitioning: Different partitions have different materialization IDs
- Multi-asset tables: Multiple assets can write to the same table
- Data lineage: Track which Dagster run produced each row
- Selective loading: Load only rows for specific materializations
The materialization_id column is:
- Automatically added on output
- Automatically removed when loading inputs
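Conceptually, this bookkeeping amounts to stamping one extra column on every output and stripping it from every input. The pandas sketch below is purely illustrative; the stamp/unstamp helpers are not part of the manager:

```python
import pandas as pd

# Illustrative only: stamp rows on output, strip the stamp when loading.
def stamp(df: pd.DataFrame, materialization_id: int) -> pd.DataFrame:
    out = df.copy()
    out["materialization_id"] = materialization_id
    return out

def unstamp(df: pd.DataFrame) -> pd.DataFrame:
    return df.drop(columns=["materialization_id"])

df = pd.DataFrame({"id": [1, 2]})
written = stamp(df, 42)   # every row carries materialization_id 42
loaded = unstamp(written)  # inputs never see the tracking column
```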
Common Patterns
SharePoint File Processing
The most common pattern for SharePoint assets:
from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Suffix
from shared.sharepoint.client_v2 import SharePointFile
from shared.db.sql import Eq, IncrementalDataFrame

@sharepoint_asset(
    watch=Watch("Lab Data", ["XRF Results"], filters=[Suffix(".csv")]),
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analytical"},
)
async def xrf_results(files: list[SharePointFile]):
    dfs = []
    file_ids = []
    for file in files:
        if file.deleted:
            file_ids.append(file.id)
            continue
        df = await file.read_csv()
        df["file_id"] = file.id
        df["file_url"] = file.url
        df["file_modified"] = file.modified
        dfs.append(df)
        file_ids.append(file.id)
    result = pd.concat(dfs) if dfs else pd.DataFrame()
    return IncrementalDataFrame(result, delete=Eq("file_id", file_ids))
Multi-Asset Output
Process one source into multiple tables:
from dagster_tools.sharepoint_asset import sharepoint_multi_asset

@sharepoint_multi_asset(
    watch=Watch("Data", ["Reports"]),
    outs={
        "samples": dg.AssetOut(metadata={"schema": "lab"}),
        "measurements": dg.AssetOut(metadata={"schema": "lab"}),
        "metadata": dg.AssetOut(metadata={"schema": "lab"}),
    },
)
async def process_reports(files):
    # ... process files into three DataFrames ...
    delete = Eq("file_id", file_ids)
    return (
        IncrementalDataFrame(df_samples, delete=delete),
        IncrementalDataFrame(df_measurements, delete=delete),
        IncrementalDataFrame(df_metadata, delete=delete),
    )
Hybrid Dev/Prod Loading
Materialize in dev while reading specific inputs from production:
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analysis"},
    ins={
        "reference_data": dg.AssetIn(metadata={"environment": "production"}),
        "verified_sites": dg.AssetIn(metadata={
            "environment": "production",
            "full_table": True,
        }),
    },
)
def site_analysis(reference_data, verified_sites, local_updates):
    # reference_data: from prod, filtered by materialization
    # verified_sites: from prod, full table
    # local_updates: from dev (default)
    ...
Imports Reference
# IO Manager (usually configured in resources, not imported directly)
from dagster_tools.postgres_io_manager import PostgresIOManager
# Incremental updates
from shared.db.sql import IncrementalDataFrame, Eq, And, Or
# SQL operations
from shared.db.sql import SQL, Query
# SharePoint integration
from dagster_tools.sharepoint_asset import sharepoint_asset, sharepoint_multi_asset, Watch, Suffix
# Type hints
from shared.sharepoint.client_v2 import SharePointFile
import geopandas as gpd
import pandas as pd
Troubleshooting
Empty DataFrame Warning
If you return an empty DataFrame in non-incremental mode, you'll see:
Empty DataFrame received for asset my_asset. The database table will not be created.
This is expected behavior - empty DataFrames don't create tables.
Partition Not Found
ValueError: No materialization found for partitions ['2024-01-01']
The upstream partition hasn't been materialized yet. Either:
- Materialize the upstream partition first
- Use full_table: True if you need all data regardless of partition
Permission Errors on GRANT
If you see "tuple concurrently updated" errors during GRANT, these are automatically retried (up to 5 times with exponential backoff). This happens when many assets run concurrently and all try to GRANT on the same schema.