Analytical Methods

This section documents the analytical laboratory methods whose data is processed by the Monolith data pipeline. Each method has a dedicated Dagster asset pipeline that ingests raw data from SharePoint, normalizes sample IDs, and stores processed results in PostgreSQL.

Available Methods

| Method | Full Name | What It Measures | Primary Use |
| --- | --- | --- | --- |
| ICP | Inductively Coupled Plasma | Elemental composition (Ca, Mg, Al, Fe, etc.) | Rock/material elemental analysis |
| XRD | X-Ray Diffraction | Mineral phase composition | Crystalline mineral identification |
| XRF | X-Ray Fluorescence | Elemental oxide composition (CaO, MgO, SiO2, etc.) | Bulk oxide analysis |
| PSA | Particle Size Analysis | Grain size distribution | Material grinding/sizing |
| FTIR | Fourier Transform Infrared Spectroscopy | Molecular structure/bonds | Organic compound identification |
| BET | Brunauer-Emmett-Teller | Surface area & pore volume | Material reactivity |
| ICS | Ion Chromatography System | Dissolved ion concentrations | Solution chemistry |
| Titration | Acid-Base Titration | Free lime/limestone content | Chemical composition |

Common Architecture

All analytical method pipelines share a common architecture:

Data Flow

SharePoint (Raw Files)  →  Dagster Asset   →  PostgreSQL
          ↓                      ↓                  ↓
      Excel/CSV           Sample ID norm   analytical schema
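
The "Sample ID Norm" step in the diagram above can be illustrated with a hypothetical sketch. The real rules live in the SampleID class in data_infrastructure.sample_id and are not shown here; this only demonstrates the kind of cleanup such a step typically performs (trimming, case-folding, unifying separators, zero-padding):

```python
import re

def normalize_sample_id(raw: str) -> str:
    """Hypothetical normalizer: uppercase, trim, unify separators,
    and zero-pad the numeric suffix to four digits."""
    cleaned = raw.strip().upper().replace("_", "-")
    match = re.fullmatch(r"([A-Z]+)-?(\d+)", cleaned)
    if match is None:
        return cleaned  # leave unrecognized formats untouched
    prefix, number = match.groups()
    return f"{prefix}-{int(number):04d}"
```

Normalization like this is what lets results from differently formatted spreadsheets join cleanly on sample ID downstream.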

Key Features

  1. SharePoint Integration: Raw data files are ingested from the "Analytical Data" SharePoint site using the sharepoint_asset decorator
  2. Incremental Processing: Files are tracked by ID, enabling efficient upsert operations
  3. Sample ID Normalization: All pipelines use the SampleID class to normalize sample identifiers
  4. PostgreSQL Storage: Processed data is stored in the analytical schema
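
The incremental upsert semantics (feature 2 above) can be sketched in plain pandas. This is an illustration of the delete-then-insert behavior, not the real IncrementalDataFrame class from shared.db.sql, whose implementation may differ:

```python
import pandas as pd

def apply_incremental(table: pd.DataFrame, new_rows: pd.DataFrame,
                      file_ids: list) -> pd.DataFrame:
    """Simulate the upsert: drop all rows belonging to reprocessed or
    deleted files, then append the freshly parsed rows."""
    kept = table[~table["file_id"].isin(file_ids)]
    return pd.concat([kept, new_rows], ignore_index=True)

# File 1 was edited in SharePoint and reprocessed; file 2 is untouched.
existing = pd.DataFrame({"file_id": [1, 1, 2], "value": [10, 11, 20]})
update = pd.DataFrame({"file_id": [1], "value": [12]})
merged = apply_incremental(existing, update, file_ids=[1])
```

Passing the full file_ids list, including files that were deleted and contribute no new rows, is what makes removal work: their old rows are dropped and nothing replaces them.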

Common Imports

import pandas as pd
import dagster as dg
from data_infrastructure.sample_id import SampleID
from dagster_tools.sharepoint_asset import sharepoint_asset, Watch, Suffix, SharePointFile  # SharePointFile (used in asset signatures) assumed exported here
from shared.db.sql import IncrementalDataFrame, Eq

Typical Asset Pattern

@sharepoint_asset(
    watch=Watch("Analytical Data", "/Method/Data", filters=[Suffix(".xlsx")]),
    io_manager_key="postgres_io_manager",
    metadata={"schema": "analytical"},
)
async def method_data(files: list[SharePointFile], crosswalk, project_team_map):
    dfs, file_ids = [], []
    for file in files:
        # Track every file ID, including deleted files, so their old rows
        # are removed by the delete clause on upsert.
        file_ids.append(file.id)
        if not file.deleted:
            df = await file.read_excel()
            df["file_id"] = file.id
            dfs.append(df)

    if not dfs:
        # Nothing to insert; still delete rows for files that were removed.
        return IncrementalDataFrame(pd.DataFrame(), delete=Eq("file_id", file_ids))

    result = pd.concat(dfs)
    # Normalize sample identifiers before storage.
    result = SampleID.update_sample_id(
        result,
        is_material=True,
        crosswalk=crosswalk,
        project_team_map=project_team_map,
    )
    return IncrementalDataFrame(result, delete=Eq("file_id", file_ids))

Data Locations

Source Code

  • Dagster Assets: apps/datasmart/src/datasmart/assets/analytical/
  • Data Tools Dashboards: apps/data-tools/src/data_tools/routes/

Database

  • Schema: analytical (cleaned data), backend (raw/incremental)
  • Tables: Named after the method (e.g., analytical.icp, analytical.xrd_profex)
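
Because processed tables follow the analytical.&lt;method&gt; naming convention, a query for one method's results can be built generically. A minimal sketch: the sample_id column name is an assumption, and connection handling (e.g. pd.read_sql) is left to the caller:

```python
def results_query(method: str, n_samples: int) -> str:
    """Build a parameterized SELECT against analytical.<method>.

    Assumes a sample_id column; placeholders use psycopg's %s style.
    """
    table = f"analytical.{method.lower()}"
    placeholders = ", ".join(["%s"] * n_samples)
    return f"SELECT * FROM {table} WHERE sample_id IN ({placeholders})"

query = results_query("ICP", 2)
# e.g. pd.read_sql(query, conn, params=["ABC-0001", "ABC-0002"])
```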

SharePoint

  • Site: Analytical Data
  • Folders: Organized by method (e.g., /ICP/, /XRD/, /XRF/)