A minimal, end-to-end GPU observability & efficiency lakehouse built on public cluster traces.
This project simulates how large AI and hardware companies monitor their GPU fleets:
- Raw job logs + machine metrics land in a Medallion lakehouse (bronze / silver / gold)
- A Prefect flow orchestrates ingestion and transformations
- A small ML model (IsolationForest) flags anomalous cluster days
⚠️ Important:
This project is built on public sample datasets, not a real production GPU cluster.
The core architecture and patterns are realistic, but functionality is intentionally limited to what the datasets support.
- Data Sources
- Professional Architecture
- Data Transformations – Medallion Model
- Pipeline Orchestration (Prefect)
- ML: Daily Cluster Anomaly Detection
- How to Run This Project Locally
- Ideas for Future Work
- Lessons Learned / Gotchas
All datasets are public and downloaded via the Kaggle CLI into data_raw/.
Large data files (data_raw/, data_lake/) are not committed to git; instead, scripts and instructions make the project reproducible.
After downloading and unzipping, the key CSVs are:
Job-level information:
- `job_name` – used as `job_id`
- `inst_id` – instance id
- `user` – used as `user_id`
- `status` – job status (Running, Terminated, etc.)
- `start_time`, `end_time` – numeric timestamps (seconds from a cluster reference point)
Per-instance details for each job (kept in bronze for future extensions).
Machine-level metrics over time:
- `worker_name`, `machine`
- `start_time`, `end_time`
- `machine_gpu` – used as `gpu_util_pct`
- `machine_cpu` – used as `cpu_util_pct`
- `machine_cpu_iowait`, `machine_cpu_kernel`, `machine_cpu_usr`
- `machine_load_1`, `machine_net_receive`, `machine_num_worker`
Hardware specs for machines (CPU, memory, etc.); currently modeled in bronze and available for future joins.
Approximate data sizes (from ingestion logs):
- ~1,055,501 job rows
- ~7,522,002 instance rows
- ~2,009,423 machine metric rows
These provide realistic job timelines and telemetry similar to what a GPU cluster control plane would see.
Columns include:
- `Product_Name`
- `GPU_Chip`
- `Released`
- `Bus`
- `Memory`
- `GPU_clock`, `Memory_clock`
- `Shaders_TMUs_ROPs`
This becomes bronze_gpu_specs — a dimension-like table representing GPU models.
In this minimal version it's mostly a placeholder for future analysis (e.g., performance or thermals by GPU model).
High-level system design:
- Public CSVs from Kaggle:
- Alibaba job + machine metrics
- GPU specs
- These mimic:
- Cluster scheduler job tables
- Node-level metric exporters
- Reads CSVs with pandas
- Writes Parquet files into `data_lake/bronze/`
- This step is analogous to:
- Log shipper / ETL jobs
- Converting raw text/CSV into columnar storage suitable for analytics
Holds:
- `bronze_job_events.parquet`
- `bronze_instance_table.parquet`
- `bronze_machine_metrics.parquet`
- `bronze_machine_spec.parquet`
- `bronze_gpu_specs.parquet`
Structure is close to original source formats; minimal transformation.
- dbt models read Parquet via DuckDB's `read_parquet(..)`
- Bronze views → `silver_jobs`, `silver_gpu_timeseries`
- Aggregations → `gold_cluster_util_daily`
This is where:
- Types are cleaned
- Column names are normalized
- Derived features (like `run_time_sec`) are added
- Uses `gold_cluster_util_daily` as input
- Trains and scores an IsolationForest model
- Writes `gold_cluster_util_daily_scored` back into DuckDB
- Produces:
  - `anomaly_flag` (0 / 1)
  - `anomaly_score` (decision score)
flow_full_refresh orchestrates:
- Bronze ingestion
- dbt `run` and `test`
- ML training and scoring
One command rebuilds the entire lakehouse from raw data.
This project follows the Medallion architecture pattern:
- Bronze – Raw, minimally processed Parquet (close to source)
- Silver – Cleaned and standardized tables with derived columns
- Gold – Aggregated, business / ML-ready tables
Generated by pipelines/ingest_bronze.py from data_raw/*.csv into data_lake/bronze/:
| File | Source | Purpose |
|---|---|---|
| `bronze_job_events.parquet` | `pai_job_table.csv` | One row per job |
| `bronze_instance_table.parquet` | `pai_instance_table.csv` | One row per job instance |
| `bronze_machine_metrics.parquet` | `pai_machine_metric.csv` | Machine metrics by time window |
| `bronze_machine_spec.parquet` | `pai_machine_spec.csv` | Machine hardware specs |
| `bronze_gpu_specs.parquet` | `tpu_gpus.csv` | GPU model specs |
Goal: Schema persistence, not business logic. Just get data into a lake-friendly format.
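For orientation, a minimal sketch of what this step boils down to (the real `pipelines/ingest_bronze.py` may handle dtypes, paths, and logging differently):

```python
# Minimal sketch of the bronze ingestion step (simplified illustration only).
from pathlib import Path

import pandas as pd

RAW_DIR = Path("data_raw")
BRONZE_DIR = Path("data_lake/bronze")

# CSV → Parquet mapping, mirroring the table above
SOURCES = {
    "pai_job_table.csv": "bronze_job_events.parquet",
    "pai_instance_table.csv": "bronze_instance_table.parquet",
    "pai_machine_metric.csv": "bronze_machine_metrics.parquet",
    "pai_machine_spec.csv": "bronze_machine_spec.parquet",
    "tpu_gpus.csv": "bronze_gpu_specs.parquet",
}

def ingest_bronze() -> None:
    BRONZE_DIR.mkdir(parents=True, exist_ok=True)
    for csv_name, parquet_name in SOURCES.items():
        df = pd.read_csv(RAW_DIR / csv_name)
        df.to_parquet(BRONZE_DIR / parquet_name, index=False)
        print(f"{parquet_name}: {len(df):,} rows")  # row counts for the ingestion log

if __name__ == "__main__":
    ingest_bronze()
```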
Defined in dbt_project/gpu_telemetry/models/silver/.
Source: bronze_job_events
Transformations:
- `job_name` → `job_id`
- `inst_id` → `instance_id`
- `"user"` → `user_id`
- `status` → `job_status`
- `start_time`, `end_time` kept as numeric timestamps
- `run_time_sec` calculated as `case when end_time is not null then end_time - start_time else null end`
Semantics:
- One row per job
- `run_time_sec` represents how long a job ran (if it finished)
- Jobs still Running have `end_time` and `run_time_sec` as `NULL`
Tests in models/silver/silver.yml:
- `job_id` is `not_null` and `unique`
- `user_id` is `not_null`
This table behaves like a central job dimension you can join with metrics.
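To sanity-check the renames outside dbt, a roughly equivalent query can be run directly against the bronze Parquet with DuckDB (a sketch only; the actual dbt model may select additional columns):

```python
# Sketch: roughly what silver_jobs does, run ad hoc against the bronze Parquet.
# The real dbt model lives in dbt_project/gpu_telemetry/models/silver/ and may differ.
import duckdb

silver_jobs_preview = duckdb.query("""
    select
        job_name   as job_id,
        inst_id    as instance_id,
        "user"     as user_id,
        status     as job_status,
        start_time,
        end_time,
        case when end_time is not null then end_time - start_time else null end as run_time_sec
    from read_parquet('data_lake/bronze/bronze_job_events.parquet')
    limit 5
""").df()
print(silver_jobs_preview)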
Source: bronze_machine_metrics
Transformations:
- `machine` → `machine_id`
- `end_time` → `ts` (timestamp for metric)
- `machine_gpu` → `gpu_util_pct`
- `machine_cpu` → `cpu_util_pct`
Additional columns preserved:
- `machine_load_1`
- `machine_net_receive`
- `machine_cpu_iowait`, `machine_cpu_kernel`, `machine_cpu_usr`
- `machine_num_worker`
Filter: `where end_time is not null` to ensure valid timestamps
Tests:
- `machine_id` is `not_null`
- `ts` is `not_null`
This provides a machine-level time series for plotting utilization over time or aggregating to gold.
Defined in dbt_project/gpu_telemetry/models/gold/.
Source: silver_gpu_timeseries
Steps:
1. Convert numeric `ts` into a timestamp and truncate to day: `date_trunc('day', to_timestamp(ts)) as dt`

2. Aggregate per day (`group by dt`):

   - `avg_gpu_util = avg(gpu_util_pct)`
   - `p95_gpu_util = quantile_cont(gpu_util_pct, 0.95)`
   - `avg_cpu_util = avg(cpu_util_pct)`
Result:
- One row per day (`dt`)
- Daily summary of cluster load
Tests in models/gold/gold.yml:
- `dt` is `not_null`
- `dt` is `unique`
This table is the primary input for the anomaly detection ML model.
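Put together, the aggregation can be reproduced ad hoc with DuckDB (a sketch; the real dbt model in `models/gold/` may name or order columns slightly differently):

```python
# Sketch of the daily aggregation behind gold_cluster_util_daily.
import duckdb

con = duckdb.connect("telemetry.db")
daily = con.execute("""
    select
        date_trunc('day', to_timestamp(ts))  as dt,
        avg(gpu_util_pct)                    as avg_gpu_util,
        quantile_cont(gpu_util_pct, 0.95)    as p95_gpu_util,
        avg(cpu_util_pct)                    as avg_cpu_util
    from silver_gpu_timeseries
    group by dt
    order by dt
""").df()
print(daily.head())
con.close()
```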
- Created by `ml/score_cluster_anomalies.py` (Python), not dbt
- Extends `gold_cluster_util_daily` with:
  - `anomaly_flag` – 0 for normal days, 1 for anomalous days
  - `anomaly_score` – IsolationForest decision score (lower = more anomalous)
Sample data preview:
Summary statistics:
- `total_days` ≈ 49
- `num_anomalies` = 3
The whole pipeline is orchestrated with Prefect 2.x.
File: pipelines/flow_full_refresh.py
```python
@flow(name="gpu_telemetry_full_refresh")
def full_refresh():
    ingest_bronze()
    run_dbt()
    run_ml_scoring()
```

```python
@task
def ingest_bronze():
    subprocess.run(
        [sys.executable, "pipelines/ingest_bronze.py"],
        cwd=PROJECT_ROOT,
        check=True,
    )
```

- Uses `sys.executable` to guarantee the active venv is used
- Runs `pipelines/ingest_bronze.py`, which:
  - Reads CSVs from `data_raw/`
  - Writes Parquet files into `data_lake/bronze/`
  - Logs row counts for each bronze dataset
```python
@task
def run_dbt():
    subprocess.run(
        ["dbt", "run"],
        cwd=DBT_DIR,
        check=True,
        env=_dbt_env(),
    )
    subprocess.run(
        ["dbt", "test"],
        cwd=DBT_DIR,
        check=True,
        env=_dbt_env(),
    )
```

- `DBT_DIR` points to `dbt_project/gpu_telemetry`
- `_dbt_env()` sets `DUCKDB_PATH` so dbt knows where `telemetry.db` is
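The helper itself is not shown here; a minimal sketch of what a `_dbt_env()`-style function can look like (variable names and the project layout are assumptions, not necessarily what this repo uses):

```python
# Sketch of a _dbt_env-style helper: copy the current environment and point
# dbt-duckdb at the project's DuckDB file. Paths are illustrative.
import os
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parent.parent  # assumed repo layout

def _dbt_env() -> dict:
    env = os.environ.copy()
    env["DUCKDB_PATH"] = str(PROJECT_ROOT / "telemetry.db")
    return env
```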
dbt run:
- Builds bronze views
- Builds silver tables
- Builds gold table
dbt test:
- Runs schema tests for:
  - `silver_jobs`
  - `silver_gpu_timeseries`
  - `gold_cluster_util_daily`
```python
@task
def run_ml_scoring():
    subprocess.run(
        [sys.executable, "ml/train_cluster_anomaly_model.py"],
        cwd=PROJECT_ROOT,
        check=True,
    )
    subprocess.run(
        [sys.executable, "ml/score_cluster_anomalies.py"],
        cwd=PROJECT_ROOT,
        check=True,
    )
```

- Calls `ml/train_cluster_anomaly_model.py` – trains IsolationForest
- Calls `ml/score_cluster_anomalies.py` – writes `gold_cluster_util_daily_scored`
One command triggers the entire stack:

```bash
python pipelines/flow_full_refresh.py
```

The ML component focuses on detecting anomalous cluster days based on daily utilization patterns.
Steps:
1. Load `gold_cluster_util_daily` from `telemetry.db`:

   - `dt`
   - `avg_gpu_util`
   - `p95_gpu_util`
   - `avg_cpu_util`

2. Select feature columns:

   ```python
   feature_cols = ["avg_gpu_util", "p95_gpu_util", "avg_cpu_util"]
   ```

3. Standardize features with `StandardScaler`

4. Train an IsolationForest model:

   ```python
   model = IsolationForest(
       n_estimators=100,
       contamination=0.05,  # ≈5% of days treated as anomalies
       random_state=42,
   )
   ```

5. Save artifacts:

   - Model: `ml/cluster_anomaly_iforest.joblib`
   - Scaler: `ml/cluster_anomaly_scaler.joblib`
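Put together, the training step looks roughly like this (a condensed sketch; the actual `ml/train_cluster_anomaly_model.py` may differ in details such as column handling and paths):

```python
# Condensed sketch of the training step, assuming telemetry.db and the gold
# table/columns described above.
import duckdb
import joblib
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

feature_cols = ["avg_gpu_util", "p95_gpu_util", "avg_cpu_util"]

con = duckdb.connect("telemetry.db")
df = con.execute("select * from gold_cluster_util_daily order by dt").df()
con.close()

# Standardize features, then fit the IsolationForest
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df[feature_cols])

model = IsolationForest(n_estimators=100, contamination=0.05, random_state=42)
model.fit(X_scaled)

# Persist artifacts for the scoring step
joblib.dump(model, "ml/cluster_anomaly_iforest.joblib")
joblib.dump(scaler, "ml/cluster_anomaly_scaler.joblib")
```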
- Model:
Steps:
-
Reload
gold_cluster_util_dailysorted bydt -
Reload the saved model + scaler
-
Transform features and score:
preds = model.predict(X_scaled) # 1 = normal, -1 = anomaly scores = model.decision_function(X_scaled)
-
Add columns:
anomaly_flag = (preds == -1).astype(int) anomaly_score = scores
-
Replace table
gold_cluster_util_daily_scoredin DuckDB
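The write-back is the only part not spelled out above; a sketch of how the scored table can be replaced in DuckDB (the actual script may register the DataFrame or build the table differently):

```python
# Sketch of the scoring + write-back step, assuming the artifacts saved above.
import duckdb
import joblib

con = duckdb.connect("telemetry.db")
df = con.execute("select * from gold_cluster_util_daily order by dt").df()

model = joblib.load("ml/cluster_anomaly_iforest.joblib")
scaler = joblib.load("ml/cluster_anomaly_scaler.joblib")

feature_cols = ["avg_gpu_util", "p95_gpu_util", "avg_cpu_util"]
X_scaled = scaler.transform(df[feature_cols])

preds = model.predict(X_scaled)                      # 1 = normal, -1 = anomaly
df["anomaly_score"] = model.decision_function(X_scaled)
df["anomaly_flag"] = (preds == -1).astype(int)

# DuckDB can query the in-memory DataFrame once it is registered by name.
con.register("scored_df", df)
con.execute("create or replace table gold_cluster_util_daily_scored as select * from scored_df")
con.close()
```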
The following plot shows daily average GPU utilization, with anomalous days highlighted:
Interpretation:
- Most days have relatively stable GPU utilization
- A small fraction (3 out of 49) are flagged as anomalous — days where the utilization pattern differs significantly from the rest
- Python 3.11
- Git
- Kaggle account + API token
- (Optional) DuckDB CLI installed for interactive queries
```bash
git clone <your-repo-url>.git
cd gpu-telemetry-lakehouse
```

```bash
python3.11 -m venv .venv
source .venv/bin/activate
python -m pip install --upgrade pip
python -m pip install \
    "prefect<3" \
    "dbt-duckdb<2" \
    duckdb \
    pandas \
    pyarrow \
    scikit-learn \
    joblib \
    kaggle
```

Generate kaggle.json from your Kaggle account.
Move it to the standard location:

```bash
mkdir -p ~/.kaggle
mv ~/Downloads/kaggle.json ~/.kaggle/kaggle.json
chmod 600 ~/.kaggle/kaggle.json
```

From project root:

```bash
cd data_raw
# Example (you should use the exact datasets you used originally):
# kaggle datasets download -d derrickmwiti/cluster-trace-gpu-v2020 --unzip
# kaggle datasets download -d baraazaid/cpu-and-gpu-stats --unzip
cd ..
```

```bash
source .venv/bin/activate
python pipelines/flow_full_refresh.py
```

This will:
- Recreate `data_lake/bronze/*.parquet`
- Build dbt models (bronze views, silver, gold)
- Run dbt tests
- Train & score the anomaly model
```
duckdb telemetry.db
.tables
select * from silver_jobs limit 5;
select * from gold_cluster_util_daily limit 5;
select * from gold_cluster_util_daily_scored limit 10;
select sum(anomaly_flag) as num_anomalies, count(*) as total_days
from gold_cluster_util_daily_scored;
.quit
```

Limitation: Because this is a project built on sample public datasets, some functionalities that a real GPU observability platform would have are not possible (e.g., GPU temperature, power draw, DCGM-style error metrics, live streaming from exporters).
That said, the architecture is a solid starting point and can be extended in several directions:
- `gold_job_efficiency_daily`
  - GPU-hours allocated vs. GPU-hours actively used
  - Efficiency scores per job
- `gold_user_gpu_usage_daily`
  - Per-user GPU-hours, job counts, and failure rates
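Purely as an illustration (neither table exists yet), `gold_user_gpu_usage_daily` could be prototyped from columns already in `silver_jobs`; GPU-hours are approximated by job runtime since per-job GPU allocation is not in the sample data, and the `'Failed'` status value is an assumption:

```python
# Illustration only: a first cut at gold_user_gpu_usage_daily using existing columns.
import duckdb

con = duckdb.connect("telemetry.db")
con.execute("""
    create or replace table gold_user_gpu_usage_daily as
    select
        date_trunc('day', to_timestamp(start_time))            as dt,
        user_id,
        count(*)                                               as num_jobs,
        sum(run_time_sec) / 3600.0                             as approx_gpu_hours,
        avg(case when job_status = 'Failed' then 1 else 0 end) as failure_rate  -- status value assumed
    from silver_jobs
    where start_time is not null
    group by dt, user_id
""")
con.close()
```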
Simulate additional fields such as:
- GPU memory usage / utilization
- Power draw
- Temperature
- Error counts (ECC, throttling)
Build gold tables for:
- Hot / throttled GPUs
- Error spikes per day
Add Apache Superset or Metabase pointing at DuckDB.
Dashboards like:
- Daily utilization with anomaly overlay
- Top anomalous days, with drill-down into job lists
- Job queue and runtime distributions
Wrap pai_machine_metric data in a "replay" script that pushes events into Kafka or a simple in-memory queue.
Use a streaming framework (or periodic batches) to ingest into bronze, to mimic real-time telemetry.
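A toy version of the replay idea, using an in-memory queue (Kafka, pacing, and the event schema are out of scope; everything here is illustrative):

```python
# Toy replay: stream bronze machine-metric rows into a queue as if they were
# live telemetry events. A real setup would publish to Kafka instead.
import queue
import time

import pandas as pd

events: "queue.Queue[dict]" = queue.Queue()

df = pd.read_parquet("data_lake/bronze/bronze_machine_metrics.parquet")
df = df.sort_values("end_time")

for row in df.head(100).to_dict("records"):  # replay a small slice
    events.put(row)
    time.sleep(0.01)                          # crude pacing; a real replay would honor timestamps

print(f"replayed {events.qsize()} events")
```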
When new anomaly days are detected:
- Insert rows into an alerts table
- In a real deployment, push alerts to Slack / email / PagerDuty, etc.
This project surfaced several realistic issues that data engineers routinely face:
Prefect 2.x and dbt-duckdb work well on Python 3.11.
Attempting this on Python 3.14 led to dependency build failures (e.g., asyncpg), so downgrading to 3.11 was necessary.
Shell aliases may point python at a different interpreter than your virtualenv.
Using sys.executable ensures all subprocesses use the correct venv.
data_raw/ and data_lake/ can be large and change frequently.
Better to:
- Add them to `.gitignore`
- Provide reproducible download instructions (Kaggle) and ingestion scripts
dbt adds its own semicolons when compiling.
Leaving a trailing ; in the model SQL can create ;; and cause DuckDB parser errors.
Removing the trailing semicolon fixed the bronze view creation errors.
Upgrading Prefect with an old ~/.prefect/prefect.db in place can cause Alembic migration errors ("Can't locate revision…").
Deleting ~/.prefect/prefect.db and ~/.prefect/storage lets Prefect recreate a fresh, compatible database.
Building a clean Medallion model (bronze → silver → gold) before adding ML and orchestration made debugging easier.
Once the data model was stable, adding Prefect and IsolationForest on top was straightforward.


