
PostgresIOManager

The PostgresIOManager is a Dagster IO manager for reading and writing Pandas DataFrames to PostgreSQL. It provides automatic schema/table creation, type mapping, partitioning, incremental updates, and PostGIS geometry support.

Overview

When you return a DataFrame from a Dagster asset that uses io_manager_key="postgres_io_manager", the IO manager automatically:

  1. Creates the schema and table if they don't exist
  2. Maps Pandas/Arrow types to PostgreSQL types
  3. Tracks materializations for lineage and partition support
  4. Grants SELECT privileges to specified users

Basic Usage

import pandas as pd
import dagster as dg

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "analytics",
        "table": "daily_metrics",  # optional, defaults to asset key
    },
)
def daily_metrics():
    return pd.DataFrame({
        "date": ["2024-01-01", "2024-01-02"],
        "value": [100, 200],
    })

Metadata Options

Control how data is written using metadata:

Option       | Type             | Default   | Description
------------ | ---------------- | --------- | ---------------------------------------------
schema       | str              | "public"  | Target PostgreSQL schema
table        | str              | asset key | Custom table name
primary_key  | str or list[str] | None      | Column(s) for primary key constraint
incremental  | bool or str      | False     | Enable upsert mode (requires primary_key)
columns      | dict             | None      | Column type definitions and constraints
chunk_size   | str              | "300MB"   | Data chunk size for large outputs
users        | list[str]        | []        | Users to grant SELECT privileges
environment  | str              | None      | Set to "production" to load inputs from prod
full_table   | bool or str      | False     | Ignore materialization IDs, load entire table

Example with All Options

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "transport",
        "table": "routes",
        "primary_key": ("start_id", "end_id"),
        "incremental": "true",
        "columns": {
            "start_id": ("int", "not null"),
            "end_id": ("int", "not null"),
            "distance": "float",
            "geometry": ("geometry", "not null"),
        },
        "chunk_size": "500MB",
        "users": ["felt", "readonly_user"],
    },
)
def truck_routes():
    ...

Column Type Definitions

The columns metadata lets you specify PostgreSQL types and constraints:

metadata={
    "columns": {
        # Simple type specification
        "id": "int",
        "name": "string",
        "price": "float",
        "active": "bool",
        "created": "date",
        "updated": "datetime",

        # With constraints (use tuple)
        "user_id": ("int", "not null"),
        "email": ("string", "unique"),

        # PostGIS geometry
        "location": ("geometry", "not null"),
    },
}

Available Types

Type       | PostgreSQL Mapping
---------- | -------------------
bool       | BOOLEAN
int, int32 | INTEGER
uint       | INTEGER (unsigned)
float      | DOUBLE PRECISION
string     | TEXT
date       | DATE
datetime   | TIMESTAMP
geometry   | GEOMETRY (PostGIS)

Available Modifiers

  • not null - Column cannot be NULL
  • unique - Column values must be unique

Incremental Updates (Upserts)

Enable upsert mode for efficient incremental data loading. Rows with matching primary keys are updated; new rows are inserted.

Basic Incremental Asset

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "analytics",
        "primary_key": "id",
        "incremental": "true",
    },
)
def user_metrics():
    # Rows with existing IDs are updated, new IDs are inserted
    return pd.DataFrame({
        "id": [1, 2, 3],
        "score": [95, 87, 92],
    })

Composite Primary Key

metadata={
    "primary_key": ("user_id", "date"),  # Composite key
    "incremental": "true",
}

The generated SQL uses ON CONFLICT ... DO UPDATE:

INSERT INTO analytics.user_metrics (id, score)
VALUES (1, 95), (2, 87), (3, 92)
ON CONFLICT (id) DO UPDATE SET
    score = EXCLUDED.score;

IncrementalDataFrame with Delete

For scenarios where you need to delete specific rows before inserting (common with SharePoint file updates), use IncrementalDataFrame:

from shared.db.sql import Eq, IncrementalDataFrame

@sharepoint_asset(
    watch=Watch("Lab Data", ["Results"], filters=[Suffix(".csv")]),
    io_manager_key="postgres_io_manager",
    metadata={"schema": "lab"},
)
async def lab_results(files: list[SharePointFile]):
    rows = []
    file_ids = []

    for file in files:
        if file.deleted:
            file_ids.append(file.id)
            continue

        df = await file.read_csv()
        df["file_id"] = file.id
        rows.append(df)
        file_ids.append(file.id)

    result = pd.concat(rows) if rows else pd.DataFrame()

    # Delete rows matching these file_ids before inserting
    delete = Eq("file_id", file_ids)
    return IncrementalDataFrame(result, delete=delete)

Delete Expression Types

Build WHERE clauses for the DELETE statement:

from shared.db.sql import Eq, And, Or

# Single value
Eq("file_id", "abc123")

# Multiple values (generates IN clause)
Eq("file_id", ["abc123", "def456", "ghi789"])

# Composite key
Eq(["user_id", "date"], [
    (1, "2024-01-01"),
    (2, "2024-01-02"),
])

# Logical AND
And(
    Eq("file_id", "abc123"),
    Eq("status", "active"),
)

# Logical OR
Or(
    Eq("source", "manual"),
    Eq("source", "api"),
)

Loading from Production

Read upstream assets from the production database while materializing to dev.

Method 1: Asset-Level Environment

Set environment: "production" on the asset definition to load all inputs from prod:

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"environment": "production"},
)
def analysis(upstream_data):
    # upstream_data is loaded from the production database
    ...

Method 2: Per-Input Environment

Specify environment for individual inputs:

@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={
        "prod_data": dg.AssetIn(metadata={"environment": "production"}),
        "dev_data": dg.AssetIn(),  # loads from dev (default)
    },
)
def combined_analysis(prod_data, dev_data):
    # Mix production and development data
    ...

Full Table Loading

By default, the IO manager only loads the input rows that belong to the upstream asset's tracked materializations (and, for partitioned runs, the requested partitions). Use full_table: True to load all rows instead:

@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={
        "all_routes": dg.AssetIn(metadata={
            "environment": "production",
            "full_table": True,  # Load entire table, all partitions
        }),
    },
)
def route_analysis(all_routes):
    # all_routes contains every row in the table
    ...

This is useful when:

  • Multiple assets write to the same table (see the sketch after this list)
  • You need all partitions, not just specific ones
  • Loading reference data that shouldn't be filtered
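
As a rough sketch of the shared-table case (the asset names, columns, and exact write behaviour for shared tables here are assumptions, not part of the documented API), two assets can target the same schema and table, and a downstream asset can read the combined rows with full_table: True:

import dagster as dg
import pandas as pd

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analytics", "table": "regional_metrics"},
)
def northern_metrics():
    return pd.DataFrame({"region": ["north"], "value": [10]})

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analytics", "table": "regional_metrics"},
)
def southern_metrics():
    return pd.DataFrame({"region": ["south"], "value": [20]})

@dg.asset(
    io_manager_key="postgres_io_manager",
    ins={"northern_metrics": dg.AssetIn(metadata={"full_table": True})},
)
def regional_totals(northern_metrics):
    # With full_table, the input contains rows written by both upstream assets,
    # not just those from northern_metrics' own materializations.
    return northern_metrics.groupby("region", as_index=False)["value"].sum()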

GeoDataFrame Support

Return a GeoDataFrame for automatic PostGIS geometry handling:

import geopandas as gpd
from shapely.geometry import Point

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "spatial",
        "columns": {
            "id": ("int", "not null"),
            "name": "string",
            "geometry": ("geometry", "not null"),
        },
    },
)
def facility_locations():
    return gpd.GeoDataFrame({
        "id": [1, 2],
        "name": ["Facility A", "Facility B"],
        "geometry": [Point(-122.4, 37.8), Point(-118.2, 34.0)],
    }, crs="EPSG:4326")

When loaded as an input:

  • Geometry columns are automatically detected from the geometry_columns table
  • Geometry values are converted to a GeoSeries with the proper CRS
  • The input arrives as a gpd.GeoDataFrame (see the sketch below)
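
As a minimal downstream sketch (the buffer step and the asset below are illustrative, not part of the IO manager), the consuming asset can treat its input as a ready-to-use GeoDataFrame:

import dagster as dg
import geopandas as gpd

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "spatial"},
)
def facility_buffers(facility_locations: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    # The input already carries its CRS, so reproject to a metric CRS,
    # buffer each point by 500 m, and convert back to WGS 84.
    projected = facility_locations.to_crs(epsg=3857)
    projected["geometry"] = projected.geometry.buffer(500)
    return projected.to_crs(epsg=4326)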

SQL Object Output

For raw DDL/DML operations, return a SQL or Query object:

from shared.db.sql import SQL

@dg.asset(
    io_manager_key="postgres_io_manager",
    non_argument_deps={"raw_data"},
)
def aggregated_view() -> SQL:
    return SQL("""
        DROP TABLE IF EXISTS analytics.aggregated;
        CREATE TABLE analytics.aggregated AS
        SELECT
            category,
            COUNT(*) as count,
            AVG(value) as avg_value
        FROM analytics.raw_data
        GROUP BY category;
    """)

This is useful for:

  • Creating derived tables via SQL
  • Running complex transformations
  • Creating views or materialized views (see the sketch after this list)
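
For the view case, a sketch in the same style (the view name and filter are hypothetical; it assumes the analytics.aggregated table created by the previous example):

from shared.db.sql import SQL

@dg.asset(
    io_manager_key="postgres_io_manager",
    non_argument_deps={"aggregated_view"},
)
def category_summary() -> SQL:
    # Recreate a plain view on top of the aggregated table.
    return SQL("""
        CREATE OR REPLACE VIEW analytics.category_summary AS
        SELECT category, count, avg_value
        FROM analytics.aggregated
        WHERE count > 10;
    """)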

Granting Permissions

Automatically grant SELECT privileges after materialization:

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={
        "schema": "public",
        "users": ["felt", "metabase_reader", "api_user"],
    },
)
def shared_metrics():
    ...

The IO manager runs:

GRANT USAGE ON SCHEMA "public" TO "felt";
GRANT SELECT ON "public"."shared_metrics" TO "felt";
-- repeated for each user
Note: Assets in the datasmart code location automatically grant to datasmart_user.


Materialization Tracking

Every row written by PostgresIOManager includes a materialization_id column that links to tracking tables:

  • meta.assets - Stores asset_key, partition_key, table, schema
  • meta.materializations - Stores run_id, asset_id, timestamp

This enables:

  • Partitioning: Different partitions have different materialization IDs (see the sketch below)
  • Multi-asset tables: Multiple assets can write to the same table
  • Data lineage: Track which Dagster run produced each row
  • Selective loading: Load only rows for specific materializations

The materialization_id column is:

  • Automatically added on output
  • Automatically removed when loading inputs
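
For example, a partitioned asset needs nothing beyond a partitions_def; the sketch below is illustrative (the partition range, schema, and columns are assumptions). Each partition's rows are written with their own materialization_id, so downstream loads can select individual partitions:

import dagster as dg
import pandas as pd

@dg.asset(
    io_manager_key="postgres_io_manager",
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
    metadata={"schema": "analytics"},
)
def daily_visits(context: dg.AssetExecutionContext) -> pd.DataFrame:
    # One partition per day; the IO manager tags these rows with the
    # materialization_id recorded for this partition's run.
    day = context.partition_key
    return pd.DataFrame({"date": [day], "visits": [100]})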

Common Patterns

SharePoint File Processing

The most common pattern for SharePoint assets:

import pandas as pd

from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Suffix
from shared.sharepoint.client_v2 import SharePointFile
from shared.db.sql import Eq, IncrementalDataFrame

@sharepoint_asset(
    watch=Watch("Lab Data", ["XRF Results"], filters=[Suffix(".csv")]),
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analytical"},
)
async def xrf_results(files: list[SharePointFile]):
    dfs = []
    file_ids = []

    for file in files:
        if file.deleted:
            file_ids.append(file.id)
            continue

        df = await file.read_csv()
        df["file_id"] = file.id
        df["file_url"] = file.url
        df["file_modified"] = file.modified
        dfs.append(df)
        file_ids.append(file.id)

    result = pd.concat(dfs) if dfs else pd.DataFrame()
    return IncrementalDataFrame(result, delete=Eq("file_id", file_ids))

Multi-Asset Output

Process one source into multiple tables:

from dagster_tools.sharepoint_asset import sharepoint_multi_asset

@sharepoint_multi_asset(
    watch=Watch("Data", ["Reports"]),
    outs={
        "samples": dg.AssetOut(metadata={"schema": "lab"}),
        "measurements": dg.AssetOut(metadata={"schema": "lab"}),
        "metadata": dg.AssetOut(metadata={"schema": "lab"}),
    },
)
async def process_reports(files):
    # ... process files into three DataFrames ...

    delete = Eq("file_id", file_ids)
    return (
        IncrementalDataFrame(df_samples, delete=delete),
        IncrementalDataFrame(df_measurements, delete=delete),
        IncrementalDataFrame(df_metadata, delete=delete),
    )

Hybrid Dev/Prod Loading

Materialize in dev while reading specific inputs from production:

@dg.asset(
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analysis"},
    ins={
        "reference_data": dg.AssetIn(metadata={"environment": "production"}),
        "verified_sites": dg.AssetIn(metadata={
            "environment": "production",
            "full_table": True,
        }),
    },
)
def site_analysis(reference_data, verified_sites, local_updates):
    # reference_data: from prod, filtered by materialization
    # verified_sites: from prod, full table
    # local_updates: from dev (default)
    ...

Imports Reference

# IO Manager (usually configured in resources, not imported directly)
from dagster_tools.postgres_io_manager import PostgresIOManager

# Incremental updates
from shared.db.sql import IncrementalDataFrame, Eq, And, Or

# SQL operations
from shared.db.sql import SQL, Query

# SharePoint integration
from dagster_tools.sharepoint_asset import sharepoint_asset, sharepoint_multi_asset, Watch, Suffix

# Type hints
from shared.sharepoint.client_v2 import SharePointFile
import geopandas as gpd
import pandas as pd

Troubleshooting

Empty DataFrame Warning

If you return an empty DataFrame in non-incremental mode, you'll see:

Empty DataFrame received for asset my_asset. The database table will not be created.

This is expected behavior - empty DataFrames don't create tables.

Partition Not Found

ValueError: No materialization found for partitions ['2024-01-01']

The upstream partition hasn't been materialized yet. Either:

  • Materialize the upstream partition first
  • Use full_table: True if you need all data regardless of partition

Permission Errors on GRANT

If you see "tuple concurrently updated" errors during GRANT, these are automatically retried (up to 5 times with exponential backoff). This happens when many assets run concurrently and all try to GRANT on the same schema.