PostgresIOManager
The PostgresIOManager is a Dagster IO manager for reading and writing Pandas DataFrames to PostgreSQL. It provides automatic schema/table creation, type mapping, partitioning, incremental updates, and PostGIS geometry support.
Overview
When you return a DataFrame from a Dagster asset with io_manager_key="postgres_io_manager", the IO manager automatically:
- Creates the schema and table if they don't exist
- Maps Pandas/Arrow types to PostgreSQL types
- Tracks materializations for lineage and partition support
- Grants SELECT privileges to specified users
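The manager itself is registered as a resource under the io_manager_key your assets reference. A minimal sketch of that wiring is below; the constructor argument shown is an illustrative assumption, so check PostgresIOManager for its actual configuration fields:
import dagster as dg

from dagster_tools.postgres_io_manager import PostgresIOManager

defs = dg.Definitions(
    assets=[daily_metrics],  # e.g. the asset shown under "Basic Usage" below
    resources={
        # The connection argument here is an illustrative assumption;
        # see PostgresIOManager for the real configuration fields.
        "postgres_io_manager": PostgresIOManager(
            connection_string="postgresql://user:pass@host:5432/dev",
        ),
    },
)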
Basic Usage
import pandas as pd
import dagster as dg

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "analytics",
        "table": "daily_metrics",  # optional, defaults to asset key
    },
)
def daily_metrics():
    return pd.DataFrame({
        "date": ["2024-01-01", "2024-01-02"],
        "value": [100, 200],
    })
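When another asset declares daily_metrics as an input, the IO manager loads it back from analytics.daily_metrics as a DataFrame (by default only the rows tied to its tracked materializations; see Full Table Loading below). A minimal sketch, with a hypothetical downstream asset:
@dg.asset(io_manager_key="postgres_io_manager")
def high_value_days(daily_metrics: pd.DataFrame) -> pd.DataFrame:
    # daily_metrics is read back from the analytics.daily_metrics table
    return daily_metrics[daily_metrics["value"] > 150]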
Metadata Options
Control how data is written using metadata:
| Option | Type | Default | Description |
|---|---|---|---|
schema | str | "public" | Target PostgreSQL schema |
table | str | asset key | Custom table name |
primary_key | str | list[str] | None | Column(s) for primary key constraint |
incremental | bool | str | False | Enable upsert mode (requires primary_key) |
columns | dict | None | Column type definitions and constraints |
chunk_size | str | "300MB" | Data chunk size for large outputs |
users | list[str] | [] | Users to grant SELECT privileges |
environment | str | None | Set to "production" to load inputs from prod |
full_table | bool | str | False | Ignore materialization IDs, load entire table |
Example with All Options
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "transport",
        "table": "routes",
        "primary_key": ("start_id", "end_id"),
        "incremental": "true",
        "columns": {
            "start_id": ("int", "not null"),
            "end_id": ("int", "not null"),
            "distance": "float",
            "geometry": ("geometry", "not null"),
        },
        "chunk_size": "500MB",
        "users": ["felt", "readonly_user"],
    },
)
def truck_routes():
    ...
Column Type Definitions
The columns metadata lets you specify PostgreSQL types and constraints:
metadata={
    "columns": {
        # Simple type specification
        "id": "int",
        "name": "string",
        "price": "float",
        "active": "bool",
        "created": "date",
        "updated": "datetime",
        # With constraints (use tuple)
        "user_id": ("int", "not null"),
        "email": ("string", "unique"),
        # PostGIS geometry
        "location": ("geometry", "not null"),
    },
}
Available Types
| Type | PostgreSQL Mapping |
|---|---|
| bool | BOOLEAN |
| int, int32 | INTEGER |
| uint | INTEGER (unsigned) |
| float | DOUBLE PRECISION |
| string | TEXT |
| date | DATE |
| datetime | TIMESTAMP |
| geometry | GEOMETRY (PostGIS) |
Available Modifiers
- not null - Column cannot be NULL
- unique - Column values must be unique
Incremental Updates (Upserts)
Enable upsert mode for efficient incremental data loading. Rows with matching primary keys are updated; new rows are inserted.
Basic Incremental Asset
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "analytics",
        "primary_key": "id",
        "incremental": "true",
    },
)
def user_metrics():
    # Rows with existing IDs are updated, new IDs are inserted
    return pd.DataFrame({
        "id": [1, 2, 3],
        "score": [95, 87, 92],
    })
Composite Primary Key
metadata={
    "primary_key": ("user_id", "date"),  # Composite key
    "incremental": "true",
}
The generated SQL uses ON CONFLICT ... DO UPDATE:
INSERT INTO analytics.user_metrics (id, score)
VALUES (1, 95), (2, 87), (3, 92)
ON CONFLICT (id) DO UPDATE SET
score = EXCLUDED.score;
IncrementalDataFrame with Delete
For scenarios where you need to delete specific rows before inserting (common with SharePoint file updates), use IncrementalDataFrame:
import pandas as pd

from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Suffix
from shared.db.sql import Eq, IncrementalDataFrame
from shared.sharepoint.client_v2 import SharePointFile

@sharepoint_asset(
    watch=Watch("Lab Data", ["Results"], filters=[Suffix(".csv")]),
    io_manager_key="postgres_io_manager",
    metadata={"schema": "lab"},
)
async def lab_results(files: list[SharePointFile]):
    rows = []
    file_ids = []
    for file in files:
        if file.deleted:
            file_ids.append(file.id)
            continue
        df = await file.read_csv()
        df["file_id"] = file.id
        rows.append(df)
        file_ids.append(file.id)
    result = pd.concat(rows) if rows else pd.DataFrame()
    # Delete rows matching these file_ids before inserting
    delete = Eq("file_id", file_ids)
    return IncrementalDataFrame(result, delete=delete)
Delete Expression Types
Build WHERE clauses for the DELETE statement:
from shared.db.sql import Eq, And, Or

# Single value
Eq("file_id", "abc123")

# Multiple values (generates IN clause)
Eq("file_id", ["abc123", "def456", "ghi789"])

# Composite key
Eq(["user_id", "date"], [
    (1, "2024-01-01"),
    (2, "2024-01-02"),
])

# Logical AND
And(
    Eq("file_id", "abc123"),
    Eq("status", "active"),
)

# Logical OR
Or(
    Eq("source", "manual"),
    Eq("source", "api"),
)
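These expressions compose: inside an asset body, pass the combined expression as the delete argument of IncrementalDataFrame. A sketch, where result stands in for the DataFrame being written and the column names and values are illustrative:
# Remove the active rows belonging to the re-ingested files before inserting
delete = And(
    Eq("file_id", ["abc123", "def456"]),
    Eq("status", "active"),
)
IncrementalDataFrame(result, delete=delete)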
Loading from Production
Read upstream assets from the production database while materializing to dev.
Method 1: Asset-Level Environment
Set environment: "production" on the asset definition to load all inputs from prod:
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"environment": "production"},
)
def analysis(upstream_data):
    # upstream_data is loaded from the production database
    ...
Method 2: Per-Input Environment
Specify environment for individual inputs:
@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={
        "prod_data": dg.AssetIn(metadata={"environment": "production"}),
        "dev_data": dg.AssetIn(),  # loads from dev (default)
    },
)
def combined_analysis(prod_data, dev_data):
    # Mix production and development data
    ...
Full Table Loading
By default, the IO manager only loads rows matching the current asset's materialization. Use full_table: True to load all rows:
@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={
        "all_routes": dg.AssetIn(metadata={
            "environment": "production",
            "full_table": True,  # Load entire table, all partitions
        }),
    },
)
def route_analysis(all_routes):
    # all_routes contains every row in the table
    ...
This is useful when:
- Multiple assets write to the same table (see the sketch after this list)
- You need all partitions, not just specific ones
- Loading reference data that shouldn't be filtered
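For example, when two assets share the same schema and table metadata, each writes rows tagged with its own materialization ID, so a downstream reader needs full_table: True to see rows from both. A sketch with hypothetical assets:
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "transport", "table": "routes"},
)
def northern_routes():
    return pd.DataFrame({"start_id": [1], "end_id": [2], "distance": [12.5]})

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "transport", "table": "routes"},
)
def southern_routes():
    return pd.DataFrame({"start_id": [3], "end_id": [4], "distance": [7.0]})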
GeoDataFrame Support
Return a GeoDataFrame for automatic PostGIS geometry handling:
import geopandas as gpd
from shapely.geometry import Point

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "spatial",
        "columns": {
            "id": ("int", "not null"),
            "name": "string",
            "geometry": ("geometry", "not null"),
        },
    },
)
def facility_locations():
    return gpd.GeoDataFrame({
        "id": [1, 2],
        "name": ["Facility A", "Facility B"],
        "geometry": [Point(-122.4, 37.8), Point(-118.2, 34.0)],
    }, crs="EPSG:4326")
When loaded as an input:
- Geometry columns are automatically detected from the geometry_columns table
- Geometry values are converted to GeoSeries with the proper CRS
- The input type is gpd.GeoDataFrame
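For example, a downstream asset can take the output above directly as a GeoDataFrame input. A sketch; the asset name, buffer distance, and CRS choices are illustrative:
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "spatial"},
)
def facility_buffers(facility_locations: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    # facility_locations arrives as a GeoDataFrame with its CRS already set
    buffers = facility_locations.copy()
    # Reproject to a metric CRS, buffer by 500 m, then reproject back
    buffers["geometry"] = buffers.geometry.to_crs(epsg=3857).buffer(500).to_crs(epsg=4326)
    return buffers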
SQL Object Output
For raw DDL/DML operations, return a SQL or Query object:
from shared.db.sql import SQL

@dg.asset(
    io_manager_key="postgres_io_manager",
    non_argument_deps={"raw_data"},
)
def aggregated_view() -> SQL:
    return SQL("""
        DROP TABLE IF EXISTS analytics.aggregated;
        CREATE TABLE analytics.aggregated AS
        SELECT
            category,
            COUNT(*) as count,
            AVG(value) as avg_value
        FROM analytics.raw_data
        GROUP BY category;
    """)
This is useful for:
- Creating derived tables via SQL
- Running complex transformations
- Creating views or materialized views
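The same pattern works for a materialized view. A sketch, with analytics.raw_data standing in for whatever upstream table the view reads from:
@dg.asset(
    io_manager_key="postgres_io_manager",
    non_argument_deps={"raw_data"},
)
def category_summary_view() -> SQL:
    return SQL("""
        DROP MATERIALIZED VIEW IF EXISTS analytics.category_summary;
        CREATE MATERIALIZED VIEW analytics.category_summary AS
        SELECT category, COUNT(*) AS count
        FROM analytics.raw_data
        GROUP BY category;
    """)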
Granting Permissions
Automatically grant SELECT privileges after materialization:
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "public",
        "users": ["felt", "metabase_reader", "api_user"],
    },
)
def shared_metrics():
    ...
The IO manager runs:
GRANT USAGE ON SCHEMA "public" TO "felt";
GRANT SELECT ON "public"."shared_metrics" TO "felt";
-- repeated for each user
Assets in the datasmart code location automatically grant to datasmart_user.
Materialization Tracking
Every row written by PostgresIOManager includes a materialization_id column that links to tracking tables:
- meta.assets - Stores asset_key, partition_key, table, schema
- meta.materializations - Stores run_id, asset_id, timestamp
This enables:
- Partitioning: Different partitions have different materialization IDs
- Multi-asset tables: Multiple assets can write to the same table
- Data lineage: Track which Dagster run produced each row
- Selective loading: Load only rows for specific materializations
The materialization_id column is:
- Automatically added on output
- Automatically removed when loading inputs
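As an illustration, the tracking tables can be joined back to a data table to see which run produced each row. This is only a sketch: the key on meta.materializations that materialization_id references isn't documented above, so the m.id column below is an assumption, and the connection string is hypothetical.
import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("postgresql://user:password@host:5432/dev")  # hypothetical connection

# Assumes meta.materializations exposes an id column that materialization_id references
lineage = pd.read_sql(
    """
    SELECT t.*, m.run_id, m.timestamp
    FROM analytics.daily_metrics AS t
    JOIN meta.materializations AS m ON m.id = t.materialization_id
    """,
    engine,
)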
Common Patterns
SharePoint File Processing
The most common pattern for SharePoint assets:
import pandas as pd

from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Suffix
from shared.sharepoint.client_v2 import SharePointFile
from shared.db.sql import Eq, IncrementalDataFrame

@sharepoint_asset(
    watch=Watch("Lab Data", ["XRF Results"], filters=[Suffix(".csv")]),
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analytical"},
)
async def xrf_results(files: list[SharePointFile]):
    dfs = []
    file_ids = []
    for file in files:
        if file.deleted:
            file_ids.append(file.id)
            continue
        df = await file.read_csv()
        df["file_id"] = file.id
        df["file_url"] = file.url
        df["file_modified"] = file.modified
        dfs.append(df)
        file_ids.append(file.id)
    result = pd.concat(dfs) if dfs else pd.DataFrame()
    return IncrementalDataFrame(result, delete=Eq("file_id", file_ids))
Multi-Asset Output
Process one source into multiple tables:
from dagster_tools.sharepoint_asset import sharepoint_multi_asset

@sharepoint_multi_asset(
    watch=Watch("Data", ["Reports"]),
    outs={
        "samples": dg.AssetOut(metadata={"schema": "lab"}),
        "measurements": dg.AssetOut(metadata={"schema": "lab"}),
        "metadata": dg.AssetOut(metadata={"schema": "lab"}),
    },
)
async def process_reports(files):
    # ... process files into three DataFrames and collect file_ids ...
    delete = Eq("file_id", file_ids)
    return (
        IncrementalDataFrame(df_samples, delete=delete),
        IncrementalDataFrame(df_measurements, delete=delete),
        IncrementalDataFrame(df_metadata, delete=delete),
    )
Hybrid Dev/Prod Loading
Materialize in dev while reading specific inputs from production:
@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analysis"},
    ins={
        "reference_data": dg.AssetIn(metadata={"environment": "production"}),
        "verified_sites": dg.AssetIn(metadata={
            "environment": "production",
            "full_table": True,
        }),
    },
)
def site_analysis(reference_data, verified_sites, local_updates):
    # reference_data: from prod, filtered by materialization
    # verified_sites: from prod, full table
    # local_updates: from dev (default)
    ...
Imports Reference
# IO Manager (usually configured in resources, not imported directly)
from dagster_tools.postgres_io_manager import PostgresIOManager
# Incremental updates
from shared.db.sql import IncrementalDataFrame, Eq, And, Or
# SQL operations
from shared.db.sql import SQL, Query
# SharePoint integration
from dagster_tools.sharepoint_asset import sharepoint_asset, sharepoint_multi_asset, Watch, Suffix
# Type hints
from shared.sharepoint.client_v2 import SharePointFile
import geopandas as gpd
import pandas as pd
Troubleshooting
Empty DataFrame Warning
If you return an empty DataFrame in non-incremental mode, you'll see:
Empty DataFrame received for asset my_asset. The database table will not be created.
This is expected behavior - empty DataFrames don't create tables.
Partition Not Found
ValueError: No materialization found for partitions ['2024-01-01']
The upstream partition hasn't been materialized yet. Either:
- Materialize the upstream partition first
- Use full_table: True if you need all data regardless of partition
Permission Errors on GRANT
If you see "tuple concurrently updated" errors during GRANT, these are automatically retried (up to 5 times with exponential backoff). This happens when many assets run concurrently and all try to GRANT on the same schema.