Dagster, from zero to hero
I highly recommend doing the official Dagster tutorial in addition to reading this page. We will quickly go over Dagster concepts, but the main focus will be on the specific development patterns used in our codebase.
Dagster is our data orchestration platform. What does that mean? Think of it as the central place where data pipelines are defined and managed.
Amongst other things, Dagster can:
- Launch virtual machines on demand to run data pipelines
- Organize assets (the outputs of data pipelines) into a graph based on their dependencies
- Keep track of when an asset was last run
- Keep track of which assets may be out-of-sync due to upstream changes
- Set up automations that run at regular intervals or rerun data pipelines if one of their dependencies changes
Dagster concepts
Here's a quick overview of the Dagster concepts we will discuss in this guide:
- Assets: the basic building blocks in Dagster. An asset is an object in persistent storage, such as a table or a file. An asset definition is a Python function that produces the object in question. It can receive the results from other assets as input, creating dependency relationships. When an asset is created, we say it is materialized.
- Multi-assets: a more complex version of an asset, where a single Python function produces multiple outputs.
- Asset graph: the dependency relationships between assets create a graph. This enables powerful automation, like knowing when something upstream of an asset has changed and ensuring that data assets, like database tables, are always up-to-date.
- IO managers: assets are saved & loaded using IO managers. The two we use provide the ability to save outputs as PostgreSQL database tables or S3 objects.
- Asset partitions: a way to slice an asset into smaller parts, which can be run independently using the same definition. For example, a dataset may be organized into yearly partitions.
- Jobs: the unit of execution in Dagster. A job materializes one or multiple assets. A run is a single execution of a job. By default, there's a job for each asset. When you click "Materialize" on an asset, it launches a run of that job. Defining jobs explicitly can be useful to customize the CPU/RAM/disk they are allocated or to integrate with schedules and sensors.
- Schedules and sensors: automations that materialize assets or run code, either at regular intervals (schedules) or in reaction to events (sensors).
- Automated materializations: rematerialize assets whenever a certain condition is met (often, when something upstream has changed).
- Code locations: a collection of Dagster assets, jobs, schedules, and sensors, which constitute a Dagster project. Each code location is deployed independently, with its own set of dependencies.
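To make these concepts concrete, here is a minimal, self-contained sketch; the asset names and data are invented for illustration and do not come from our codebase:

import pandas as pd
from dagster import ScheduleDefinition, asset, define_asset_job

@asset
def raw_orders():
    # An asset definition: a Python function that produces the stored object
    return pd.DataFrame({'order_id': [1, 2, 3], 'amount': [9.99, 24.50, 5.00]})

@asset
def order_totals(raw_orders):
    # The parameter name matches the upstream asset, which is how Dagster
    # infers the dependency edge in the asset graph
    return pd.DataFrame({'total': [raw_orders['amount'].sum()]})

# An explicit job that materializes both assets, plus a schedule that runs it daily
orders_job = define_asset_job('orders_job', selection=['raw_orders', 'order_totals'])
orders_schedule = ScheduleDefinition(job=orders_job, cron_schedule='0 6 * * *')

Even without orders_job, each asset gets a default job; defining the job explicitly is what lets us attach the schedule to it.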
Code structure
Our code locations follow this structure:
database
├── datasmart
│   ├── __init__.py
│   ├── definitions.py
│   ├── register.py
│   ├── assets
│   │   ├── __init__.py
│   │   ├── alp.py
│   │   ├── leach.py
│   │   └── ...
│   └── dagster_cloud.yaml
└── database
    ├── __init__.py
    ├── definitions.py
    ├── register.py
    ├── assets
    │   ├── energy
    │   │   ├── __init__.py
    │   │   ├── electricity.py
    │   │   ├── heat.py
    │   │   └── ...
    │   └── ...
    └── dagster_cloud.yaml
Here's a breakdown:
- definitions.py contains the code location, which gathers all the assets, jobs, sensors, and schedules for that project.
- register.py contains utilities to register jobs, schedules, and sensors. This makes it easy to gather them all in a single place and import them into definitions.py.
- assets/ is the folder that contains all asset definitions. Any asset defined in a Python file within assets/ or one of its subfolders will be added to the code location.
  - Within database, we use nested folders, like assets/energy, to organize our assets into asset groups.
- dagster_cloud.yaml contains configuration for our production deployment (see From development to production below).
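For reference, a stripped-down definitions.py for the database project might look roughly like this; the names imported from register.py are assumptions for illustration, so check the actual file for the real exports:

from dagster import Definitions, load_assets_from_package_module

from database import assets  # the assets/ package shown in the tree above
from database.register import all_jobs, all_schedules, all_sensors  # hypothetical names

defs = Definitions(
    # load_assets_from_package_module picks up every asset defined under assets/
    assets=load_assets_from_package_module(assets),
    jobs=all_jobs,
    schedules=all_schedules,
    sensors=all_sensors,
)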
From development to production
There are two places we use Dagster:
- The production instance available at https://brimstone.dagster.cloud/
- The development instance launched by running dagster dev in our development Python environment.
The production instance is where finalized data gets produced and stored in the database that end users and applications connect to. It maintains a persistent history of all asset materializations and runs, allowing us to track when things last ran or detect when assets are out of sync.
The development instance, on the other hand, should be considered ephemeral - it's just for testing and development, and its history gets wiped whenever you restart it. The materialized assets are persisted for ease of collaboration and development, but you should not rely on them too much: a coworker could easily rematerialize an asset with new code without you noticing.
Writing a new asset
Let's start by writing a new asset.
First, let's get Dagster running. From within the virtual environment, run dagster dev at the root of the repository. If this doesn't work or you don't know what the virtual environment is, read "Setting up a development environment".
This will start the Dagster development server. After a few seconds, it should show a URL like http://localhost:3000/. Ctrl/Cmd+Click on it to open the Dagster development web page.
For a complete walkthrough of the Dagster web interface, reach out to Erwin for a session (a video tutorial will follow at some point).
Now, we can write some code. Create a new Python file anywhere within datasmart/assets or database/assets. If you create a new subfolder, make sure to add an empty __init__.py file to the folder. This ensures Python recognizes the folder as a package and can import the code it contains.
Copy/paste this code into the file:
import pandas as pd

from dagster import asset

@asset
def my_new_asset():
    # some data processing...
    return pd.DataFrame({
        'id': [1, 2, 3, 4, 5],
        'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
        'value': [10.5, 20.1, 15.7, 30.2, 25.9],
        'category': ['A', 'B', 'A', 'C', 'B']
    })
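If you want to see the asset graph in action, here is a minimal sketch: add a second asset in the same file that takes my_new_asset as an argument, and Dagster will infer the dependency.

@asset
def my_filtered_asset(my_new_asset):
    # Receives the materialized output of my_new_asset as input
    return my_new_asset[my_new_asset['category'] == 'A']

After saving the file, click "Reload definitions" in the Dagster UI and both assets will appear, connected by an edge.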
Recommended dev workflow
- Options
Pushing to production
Saving & loading data (IO managers)
PostgreSQL
S3
Important utilities
Loading an asset result
Defining jobs / schedules / sensors
Advanced features
Partitions
Multi-assets
Auto-materialization
Sensors & schedules
Tips and tricks
Launching runs on bigger machines on AWS
Defining an asset group
- Adding an __init__.py to a folder