Dagster, from zero to hero

info

I highly recommend doing the official Dagster tutorial in addition to reading this page. We will quickly go over Dagster concepts, but the main focus will be on the specific development patterns used in our codebase.

Dagster is our data orchestration platform. What does that mean? Think of it as the central place where our data pipelines are defined and managed.

Amongst other things, Dagster can:

  • Launch virtual machines on demand to run data pipelines
  • Organize assets (= data pipelines) into a graph based on their dependencies
  • Keep track of when an asset was last run
  • Keep track of which assets may be out-of-sync due to upstream changes
  • Set up automations that run at regular intervals or rerun data pipelines if one of their dependencies changes

Dagster concepts

Here's a quick overview of the Dagster concepts we will discuss in this guide:

  • Assets: the basic building blocks in Dagster. An asset is an object in persistent storage, such as a table or a file. An asset definition is a Python function that produces the object in question. It can receive the results from other assets as input, creating dependency relationships (see the sketch after this list). When an asset is created, we say it is materialized.
  • Multi-assets: the more complex version of an asset, where a single Python function has multiple outputs.
  • Asset graph: the dependency relationships between assets create a graph. This enables powerful automation, like knowing when something upstream of an asset has changed and ensuring that data assets, like database tables, are always up-to-date.
  • IO managers: assets are saved & loaded using IO managers. The two we use provide the ability to save outputs as PostgreSQL database tables or S3 objects.
  • Asset partitions: a way to slice an asset into smaller parts, which can be run independently using the same definition. For example, a dataset may be organized into yearly partitions.
  • Jobs: the unit of execution in Dagster. A job materializes one or multiple assets. A run is a single execution of a job. By default, there's a job for each asset. When you click "Materialize" on an asset, it launches a run of that job. Defining jobs explicitly can be useful to customize the CPU/RAM/disk they are allocated or to integrate with schedules and sensors.
  • Schedules and sensors: automations that materialize assets or run code, either at regular intervals (schedules) or when a condition is detected (sensors).
  • Automated materializations: rematerialize assets whenever a certain condition is met (often, when something upstream changed).
  • Code locations: a collection of Dagster assets, jobs, schedules, and sensors, which constitute a Dagster project. Each code location is deployed independently, with its own set of dependencies.
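
To make these concepts concrete, here is a minimal sketch of two assets with a dependency. The asset names and data are purely illustrative, not taken from our codebase:

import pandas as pd
from dagster import asset

@asset
def raw_data():
    # Materializing this asset produces an object in persistent storage
    # (the returned DataFrame is handed to the configured IO manager).
    return pd.DataFrame({"id": [1, 2], "value": [10.0, 20.0]})

@asset
def cleaned_data(raw_data):
    # Taking `raw_data` as a parameter declares a dependency: Dagster loads the
    # stored upstream result and links the two assets in the asset graph.
    return raw_data.dropna()

In the web UI, cleaned_data appears downstream of raw_data in the asset graph.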

Code structure

Our code locations follow this structure:

database
├── datasmart
│   ├── __init__.py
│   ├── definitions.py
│   ├── register.py
│   ├── assets
│   │   ├── __init__.py
│   │   ├── alp.py
│   │   ├── leach.py
│   │   └── ...
│   └── dagster_cloud.yaml
└── database
    ├── __init__.py
    ├── definitions.py
    ├── register.py
    ├── assets
    │   ├── energy
    │   │   ├── __init__.py
    │   │   ├── electricity.py
    │   │   ├── heat.py
    │   │   └── ...
    │   └── ...
    └── dagster_cloud.yaml

Here's a breakdown:

  • definitions.py contains the code location, which gathers all the assets, jobs, sensors, and schedules for that project (a sketch follows this list).
  • register.py contains utilities to register jobs, schedules, and sensors. This makes it easy to gather them all in a single place and import them into definitions.py.
  • assets/ is the folder that contains all asset definitions. Any asset defined in a Python file within assets/ or one of its subfolders will be added to the code location.
    • Within database, we use nested folders, like assets/energy, to organize our assets into asset groups
  • dagster_cloud.yaml contains configuration for our production deployment (see "From development to production" below)
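
As a rough illustration, a definitions.py might look like the sketch below. The import path and the exact way jobs, schedules, and sensors are gathered are simplified assumptions; check the actual files in the repository for the real pattern:

from dagster import Definitions, load_assets_from_package_module

from database import assets  # the assets/ package described above (path is illustrative)

# Collect every asset defined in assets/ and its subfolders.
all_assets = load_assets_from_package_module(assets)

defs = Definitions(
    assets=all_assets,
    # The jobs, schedules, and sensors gathered via register.py would be passed here:
    # jobs=[...], schedules=[...], sensors=[...],
)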

From development to production

There are two places we use Dagster:

  • The production instance available at https://brimstone.dagster.cloud/
  • The development instance launched by running dagster dev in our development Python environment.

The production instance is where finalized data gets produced and stored in the database that end users and applications connect to. It maintains a persistent history of all asset materializations and runs, allowing us to track when things last ran or detect when assets are out of sync.

The development instance, on the other hand, should be considered ephemeral - it's just for testing and development, and its history gets wiped whenever you restart it. The materialized assets are persisted, for ease of collaboration and development, but you should not rely on them too much. A coworker could easily change an asset with new code without you noticing.

Writing a new asset

Let's start by writing a new asset.

First, let's get Dagster running. From within the virtual environment, run dagster dev at the root of the repository. If this doesn't work or you don't know what the virtual environment is, read "Setting up a development environment".

This will start the Dagster development server. After a few seconds, it should show a URL like http://localhost:3000/. Ctrl/Cmd+Click on it to open the Dagster development web page.

For a complete walkthrough of the Dagster web interface, reach out to Erwin for a session (a video tutorial will follow at some point).

Now, we can write some code. Create a new Python file anywhere within datasmart/assets or database/assets. If you create a new subfolder, make sure to add an empty __init__.py file to the folder. This ensures Python recognizes the folder as a package and can import the code it contains.

Copy/paste this code into the file:

import pandas as pd
from dagster import asset

@asset
def my_new_asset():
    # some data processing...
    return pd.DataFrame({
        'id': [1, 2, 3, 4, 5],
        'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
        'value': [10.5, 20.1, 15.7, 30.2, 25.9],
        'category': ['A', 'B', 'A', 'C', 'B']
    })
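
To see the asset graph in action, you could add a second asset to the same file. Naming the function parameter after the upstream asset is what creates the dependency (this downstream asset is just an illustrative example, not part of our codebase):

from dagster import asset  # already imported at the top of the file

@asset
def my_filtered_asset(my_new_asset):
    # Dagster loads the stored result of my_new_asset via the IO manager
    # and passes it in as a DataFrame.
    return my_new_asset[my_new_asset['category'] == 'A']

After reloading the code location, my_filtered_asset shows up in the asset graph with my_new_asset as its upstream dependency.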
  • Options

Pushing to production

Saving & loading data (IO managers)

PostgreSQL

S3

Important utilities

Loading an asset result

Defining jobs / schedules / sensors

Advanced features

Partitions

Multi-assets

Auto-materialization

Sensors & schedules

Tips and tricks

Launching runs on bigger machines on AWS

Defining an asset group

  • Adding an __init__.py to a folder