Incremental assets
Incremental assets are a custom setup that allows our assets to run only for updated data, whether that's new files on SharePoint or new rows in the database.
How to enable incremental mode
Incremental mode is enabled by tagging an output with "incremental": "true" in the metadata. This can be done either in the asset definition or when returning the output value.
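For a quick preview (both variants are covered in detail in Advanced: manual usage below):
import pandas as pd
from dagster import asset, Output

# In the asset definition:
@asset(io_manager_key="postgres_io_manager", metadata={"incremental": "true"})
def tagged_in_definition():
    return pd.DataFrame({"id": [1], "content": ["a"]})

# Or on the returned output value:
@asset(io_manager_key="postgres_io_manager")
def tagged_on_output():
    df = pd.DataFrame({"id": [1], "content": ["a"]})
    return Output(df, metadata={"incremental": "true"})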
SharePoint assets
Defining a SharePoint asset
Incremental SharePoint assets can be defined using the sharepoint_asset and sharepoint_multi_asset decorators. They take the same arguments as the regular Dagster decorators, as well as a first argument, watch, that specifies which SharePoint locations should be monitored for updates.
Here's an example:
from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Suffix
from shared.sharepoint.client_v2 import SharePointFile

@sharepoint_asset(
    watch=[
        Watch("PSA Data", "/", filters=[Suffix(".csv")]),
        Watch("Analytical Data", ["PSA", "PSA Raw Data"], filters=[Suffix(".csv")]),
    ]
)
async def example_sharepoint_asset(files: list[SharePointFile]):
    ...
The watched locations are given as a list of Watch(site, folder path, [filters]) entries, with the filters being optional. The available filters are:
Suffix("...")
= the file name ends with this substringPrefix("...")
= the file name starts with this substringContains("...")
= the file name contains this substringFilter(filter_fn)
= applies a custom(file: SharePointFile) -> boolean
function to filter the files. This function has to be synchronous, so you can't read the file (otherwise it would be super slow), but you can do things like parse a sample ID from the file name.
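For instance, a minimal sketch of a custom filter, assuming Filter is importable alongside Watch and Suffix (the SAMPLE- naming convention here is made up for illustration):
from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Filter
from shared.sharepoint.client_v2 import SharePointFile

def is_sample_export(file: SharePointFile) -> bool:
    # Synchronous check on the file name only -- no file reads here.
    return file.name.startswith("SAMPLE-") and file.name.endswith(".csv")

@sharepoint_asset(watch=[Watch("PSA Data", "/", filters=[Filter(is_sample_export)])])
async def sample_exports(files: list[SharePointFile]):
    ...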
Automated incremental runs
Once an asset is defined like this, it will be run automatically when files that pass the watch rules are created, updated, or deleted.
This is done through a combination of the automation server continuously synchronizing SharePoint updates and checking whether a run should be scheduled, and a sharepoint_sensor in each code location that launches runs based on the run requests created by the automation server.
You will notice that the asset receives a special argument files: list[SharePointFile]. This argument has to come first, or second if you also need to receive a context argument from Dagster, since the context has to come first.
The sharepoint_asset wrapper takes care of listing all the files in the specified locations that pass the filters. Note that the files use V2 of our SharePoint automation, so you will need to write asynchronous code (see the SharePoint V2 documentation). However, this also enables you to read files concurrently, often vastly decreasing the run time of your assets.
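As an illustration of that concurrency benefit, here's a sketch of reading all files at once. file.read() is a placeholder here, not a confirmed method name; check the SharePoint V2 documentation for the actual read API on SharePointFile:
import asyncio

from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Suffix
from shared.sharepoint.client_v2 import SharePointFile

@sharepoint_asset(watch=[Watch("PSA Data", "/", filters=[Suffix(".csv")])])
async def concurrent_example(files: list[SharePointFile]):
    # Kick off all reads at once instead of awaiting them one by one.
    # `file.read()` stands in for whatever async read method V2 exposes.
    contents = await asyncio.gather(*(file.read() for file in files))
    ...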
If you manually materialize the asset, it will run in regular mode by default, listing all the files and recreating the table from scratch. If the run is triggered by the sharepoint_sensor, it will only receive updates since its last successful run.
Some additional things to know about the automated runs:
- Whenever a file that matches a SharePoint asset is created, updated, or deleted, a new run should be launched in at most one minute. (The automation server checks every 30 seconds, same thing for the sharepoint_sensor.)
- If an automatic run fails on an asset, the sharepoint_sensor will not try again until the asset is manually rerun and it succeeds. This avoids spamming failed runs. If you want retries, you can use the built-in Dagster feature.
- The sharepoint_sensor will never launch a run of an asset if it's already running, whether it's an automated or manual run.
- You can pass a keyword argument max_update_rate_minutes to the decorator to limit the frequency at which new runs are launched, as shown below.
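For example, to launch automated runs at most once per hour (a sketch reusing the earlier watch rule):
from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Suffix
from shared.sharepoint.client_v2 import SharePointFile

@sharepoint_asset(
    watch=[Watch("PSA Data", "/", filters=[Suffix(".csv")])],
    max_update_rate_minutes=60,
)
async def rate_limited_asset(files: list[SharePointFile]):
    ...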
Utilities
There is a helper to list the SharePoint files associated with a SharePoint asset or a list of watch rules:
from datetime import datetime

from datasmart.assets.psa import psa
from dagster_tools.sharepoint_asset import Suffix, Watch, list_files_for_sharepoint_asset
from shared.sharepoint.client_v2 import SharePointClientV2

sp = SharePointClientV2()

# You can pass in the asset directly
await list_files_for_sharepoint_asset(sp, psa)

# Or watch rules if you want to test them
await list_files_for_sharepoint_asset(sp, Watch("PSA Data", "/", filters=[Suffix(".csv")]))
await list_files_for_sharepoint_asset(sp, [
    Watch("Analytical Data", ["PSA", "PSA Raw Data"], filters=[Suffix(".csv")]),
    Watch("PSA Data", "/", filters=[Suffix(".csv")]),
])

# There is also an optional `since` argument
await list_files_for_sharepoint_asset(sp, psa, since=datetime(2025, 2, 14))
Important caveat: this helper uses the advanced SharePointDB client to very quickly list files. For example, it currently lists 27150 PSA files across two SharePoint sites in under 4 seconds. To achieve this feat, it relies on database tables that are updated every 30 seconds by the automation server. As such, if you create a file, it may not appear immediately. More information in the SharePointDB docs.
Database incremental assets
This is a planned mode where an asset would rerun only for new or updated database rows in its dependencies.
It's not currently implemented, as SharePoint assets cover most of our needs so far! Our data volumes are still very reasonable, so you can write downstream assets that work with full tables with roughly the same performance and way less headaches.
If you have a use case where this mode would be useful, let me (Erwin) know. One example I could see is generating a lot of SharePoint files - one per input file for instance. Generally those needs for repetitive outputs are solved by providing standardized tools like the Lab Data Plotter, but you never know!
Incremental outputs
When running in incremental mode, the goal is to insert, update, or delete database rows based on the latest changes. Since we're not recreating the full table from scratch, there needs to be a way to signal which rows need to change.
The Postgres IO Manager provides two ways of doing so.
Merge on primary key
Any data inserted in incremental mode will be automatically merged based on the primary key of the table, if there is one.
For example, let's say we have this table:
CREATE TABLE example (
    id serial primary key,
    name text not null,
    description text not null
);
The SQL command generated in incremental mode will be an upsert (insert or update):
INSERT INTO example (id, name, description)
VALUES (...), (...), (...)
ON CONFLICT (id) DO UPDATE SET
    name = EXCLUDED.name,
    description = EXCLUDED.description;
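Concretely, if an incremental run returns rows whose id already exists in the table, those rows are updated in place rather than duplicated. A sketch against the example table above:
import pandas as pd

# Assuming the table already contains a row with id=1, returning this
# DataFrame from an incremental run updates row 1 and inserts row 4.
df = pd.DataFrame({
    "id": [1, 4],
    "name": ["updated name", "new row"],
    "description": ["updated description", "freshly inserted"],
})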
This functionality can be used in conjunction with IncrementalDataFrame, described below. In fact, you will generally have to use IncrementalDataFrame anyway, as this merge mechanism doesn't provide a way to remove data from the table.
IncrementalDataFrame
This output provides a way to specify a DELETE command that will run before the data gets inserted.
This is useful for multiple reasons:
- Deleting data from the table, for example for deleted SharePoint files
- Deleting data in tables without primary keys
- Overall flexibility
Its usage looks like this:
import pandas as pd

from dagster_tools.postgres_io_manager import IncrementalDataFrame, Eq
from dagster_tools.sharepoint_asset import sharepoint_asset, Watch
from shared.sharepoint.client_v2 import SharePointFile

@sharepoint_asset(
    watch=[Watch("Stoneware", "/")],
    io_manager_key="postgres_io_manager",
)
async def example_asset(files: list[SharePointFile]):
    rows = []
    deleted_ids = []
    for file in files:
        if file.deleted:
            deleted_ids.append(file.id)
            continue
        rows.append({
            "id": file.id,
            "name": file.name,
        })
    df = pd.DataFrame.from_records(rows)
    return IncrementalDataFrame(df, delete=Eq("id", deleted_ids))
The delete argument is used to build up the WHERE condition of a DELETE SQL query. In the case above, it would generate the following SQL, with $1 representing the list of IDs:
DELETE FROM example_asset
WHERE id = ANY($1)
You can build logical expressions using the Eq, And, and Or objects:
from dagster_tools.postgres_io_manager import Eq, And, Or
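For instance, a sketch of a composite delete condition. This assumes And and Or simply nest other conditions; check the Postgres IO Manager code for the exact semantics:
import pandas as pd
from dagster_tools.postgres_io_manager import IncrementalDataFrame, Eq, And, Or

df = pd.DataFrame({"id": [5], "site": ["PSA Data"], "name": ["new.csv"]})
deleted_ids = [1, 2]
deleted_names = ["old.csv", "stale.csv"]

# Hypothetical composite condition: delete rows whose id is in the deleted
# list, or rows matching both a site and a list of file names.
condition = Or(
    Eq("id", deleted_ids),
    And(Eq("site", ["PSA Data"]), Eq("name", deleted_names)),
)
result = IncrementalDataFrame(df, delete=condition)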
Advanced: manual usage
Most of the time, you should use incremental mode through an existing wrapper like sharepoint_asset. However, you can enable it manually by specifying "incremental": "true" in the metadata of an output. You can also pass in a boolean for this metadata entry.
import pandas as pd
from dagster import asset, Output

# Option A
@asset(
    io_manager_key="postgres_io_manager",
    metadata={"incremental": "true"},
)
def example_asset():
    df = pd.DataFrame({
        'id': [1, 2, 3],
        'content': ['a', 'b', 'c']
    })
    return df

# Option B
@asset(
    io_manager_key="postgres_io_manager",
)
def example_asset():
    df = pd.DataFrame({
        'id': [1, 2, 3],
        'content': ['a', 'b', 'c']
    })
    return Output(df, metadata={"incremental": "true"})
Option A can be useful for manual testing, but will essentially force an asset to always run in incremental mode, which is often not practical.
Option B is the one that's used in wrappers like sharepoint_asset, in conjunction with an asset configuration that specifies whether to run in incremental mode. This configuration generally defaults to a full run, so that if a user clicks "Materialize", the asset runs and recreates the table from scratch.
For example, sharepoint_asset does the following under the hood:
from typing import Optional

import pandas as pd
from dagster import Config, asset, Output
from pydantic import Field

class SharePointAssetConfig(Config):
    last_update: Optional[str] = Field(
        default=None,
        description="The most recent SharePoint update that has been processed. Used to filter for new files.",
    )

@asset(io_manager_key="postgres_io_manager")
async def example_sharepoint_asset(config: SharePointAssetConfig):
    # If last_update is specified, we filter the files and run in incremental mode
    incremental_mode = config.last_update is not None
    # do work...
    result = pd.DataFrame({
        'id': [1, 2, 3],
        'content': ['a', 'b', 'c']
    })
    return Output(result, metadata={"incremental": incremental_mode})