ML Architecture Builder
A machine learning architecture generator that creates end-to-end MLOps platforms with automated training pipelines, model serving infrastructure, and production-grade monitoring for ML systems.
Overview
This archetype generates a complete ML architecture following MLOps best practices, including data ingestion, feature engineering, model training, validation, deployment, and monitoring. It provides a scalable foundation for enterprise machine learning initiatives.
Technology Stack
Core ML Technologies
- Python: Primary ML development language
- PyTorch/TensorFlow: Deep learning frameworks
- Scikit-learn: Traditional ML algorithms
- MLflow: Experiment tracking and model registry
- Kubeflow: ML workflows on Kubernetes
- Ray: Distributed computing and hyperparameter tuning
Data & Processing
- Apache Airflow: Data pipeline orchestration
- Apache Spark: Large-scale data processing
- Delta Lake: Data versioning and ACID transactions
- Feast: Centralized feature store and feature management
- PostgreSQL: Metadata and operational data
Infrastructure & Deployment
- Kubernetes: Container orchestration
- Docker: Containerization
- Prometheus/Grafana: Monitoring and alerting
- MinIO: Object storage (S3-compatible)
- Redis: Caching and real-time features
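In this stack, MLflow typically uses the S3-compatible MinIO service as its artifact store, while Redis backs low-latency caching in the serving layer. The following is a minimal client-side sketch of that pairing; the endpoint URLs, credentials, and experiment name are assumptions for illustration, not values the archetype generates.
# Hypothetical client-side configuration for MLflow backed by MinIO
# (all endpoints, credentials, and names below are assumptions)
import os
import mlflow

os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://minio:9000"   # assumed MinIO S3 API endpoint
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"               # placeholder credentials
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"

mlflow.set_tracking_uri("http://mlflow:5000")                # assumed tracking server address
mlflow.set_experiment("stack-smoke-test")

with mlflow.start_run(run_name="stack_check"):
    mlflow.log_param("artifact_store", "minio")
    mlflow.log_metric("ok", 1.0)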
Generated Architecture
Core Components
1. Data Ingestion & Processing
# Data ingestion service
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
from pyspark.sql import SparkSession
from delta import DeltaTable, configure_spark_with_delta_pip
import great_expectations as ge
default_args = {
'owner': 'ml-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'data_ingestion_pipeline',
default_args=default_args,
description='ML data ingestion and validation pipeline',
schedule='@hourly',
catchup=False,
tags=['ml', 'data-ingestion']
)
def extract_raw_data(**context):
    """Extract data from various sources"""
    # Implementation for data extraction; return the extracted DataFrame so
    # Airflow pushes it to XCom for the downstream validation task
    pass
def validate_data_quality(**context):
"""Validate data quality using Great Expectations"""
data = context['task_instance'].xcom_pull(task_ids='extract_data')
# Create expectation suite
expectation_suite = ge.core.ExpectationSuite(
expectation_suite_name="data_quality_suite"
)
# Define expectations
df = ge.from_pandas(data, expectation_suite=expectation_suite)
# Data quality checks
df.expect_column_to_exist("user_id")
df.expect_column_values_to_not_be_null("user_id")
df.expect_column_values_to_be_unique("user_id")
df.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validation_result = df.validate()
if not validation_result["success"]:
raise ValueError("Data quality validation failed")
return data
def store_to_delta_lake(**context):
    """Store validated data to Delta Lake"""
    data = context['task_instance'].xcom_pull(task_ids='validate_data')
    # The Delta writer below uses the Spark DataFrame API, so convert the
    # validated pandas DataFrame to a Spark DataFrame first
    builder = (SparkSession.builder
        .appName("data_ingestion_pipeline")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"))
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    spark_df = spark.createDataFrame(data)
    # Write to Delta Lake with versioning
    (spark_df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save("/data/lake/raw/user_data"))
# Define DAG tasks
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_raw_data,
dag=dag
)
validate_task = PythonOperator(
task_id='validate_data',
python_callable=validate_data_quality,
dag=dag
)
store_task = PythonOperator(
task_id='store_to_delta',
python_callable=store_to_delta_lake,
dag=dag
)
extract_task >> validate_task >> store_task
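Note on the DAG above: returning a full DataFrame from extract_raw_data pushes it through Airflow's XCom backend, which is only practical for small payloads. A common alternative, sketched below under assumed staging paths (the bucket name and sample columns are placeholders, and writing s3:// paths from pandas requires s3fs), is to stage the data as Parquet and pass only the path between tasks.
# Alternative pattern (sketch): exchange staged file paths via XCom instead of raw rows
import pandas as pd

def extract_raw_data(**context):
    """Extract data, stage it as Parquet, and return the staging path via XCom"""
    df = pd.DataFrame({"user_id": [1, 2, 3], "age": [25, 31, 47]})  # stand-in for a real extract
    staging_path = f"s3://ml-staging/raw/{context['ds']}/user_data.parquet"  # assumed bucket
    df.to_parquet(staging_path, index=False)
    return staging_path

def validate_data_quality(**context):
    """Read the staged file back before running the expectation suite shown above"""
    staging_path = context['task_instance'].xcom_pull(task_ids='extract_data')
    data = pd.read_parquet(staging_path)
    # ...apply the same Great Expectations checks to `data`...
    return staging_path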
2. Feature Engineering Pipeline
# Feature engineering service
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.feature_selection import SelectKBest, f_classif
import mlflow
import feast
from typing import Dict, List, Any
class FeatureEngineer:
"""Feature engineering pipeline with MLflow tracking"""
def __init__(self, feature_store_config: Dict[str, Any]):
self.feature_store = feast.FeatureStore(repo_path=feature_store_config['repo_path'])
self.scalers = {}
self.encoders = {}
def create_temporal_features(self, df: pd.DataFrame, date_column: str) -> pd.DataFrame:
"""Create temporal features from date columns"""
df = df.copy()
df[date_column] = pd.to_datetime(df[date_column])
# Extract temporal features
df[f'{date_column}_year'] = df[date_column].dt.year
df[f'{date_column}_month'] = df[date_column].dt.month
df[f'{date_column}_day'] = df[date_column].dt.day
df[f'{date_column}_dayofweek'] = df[date_column].dt.dayofweek
df[f'{date_column}_quarter'] = df[date_column].dt.quarter
df[f'{date_column}_is_weekend'] = (df[date_column].dt.dayofweek >= 5).astype(int)
return df
def create_aggregation_features(self, df: pd.DataFrame, group_cols: List[str],
agg_cols: List[str], agg_funcs: List[str]) -> pd.DataFrame:
"""Create aggregation features"""
df = df.copy()
for agg_func in agg_funcs:
for agg_col in agg_cols:
feature_name = f"{agg_col}_{agg_func}_by_{'_'.join(group_cols)}"
if agg_func == 'mean':
df[feature_name] = df.groupby(group_cols)[agg_col].transform('mean')
elif agg_func == 'std':
df[feature_name] = df.groupby(group_cols)[agg_col].transform('std')
elif agg_func == 'count':
df[feature_name] = df.groupby(group_cols)[agg_col].transform('count')
elif agg_func == 'max':
df[feature_name] = df.groupby(group_cols)[agg_col].transform('max')
elif agg_func == 'min':
df[feature_name] = df.groupby(group_cols)[agg_col].transform('min')
return df
def encode_categorical_features(self, df: pd.DataFrame, categorical_cols: List[str]) -> pd.DataFrame:
"""Encode categorical features"""
df = df.copy()
for col in categorical_cols:
if col not in self.encoders:
self.encoders[col] = LabelEncoder()
df[f'{col}_encoded'] = self.encoders[col].fit_transform(df[col].fillna('missing'))
else:
df[f'{col}_encoded'] = self.encoders[col].transform(df[col].fillna('missing'))
return df
def scale_numerical_features(self, df: pd.DataFrame, numerical_cols: List[str]) -> pd.DataFrame:
"""Scale numerical features"""
df = df.copy()
for col in numerical_cols:
if col not in self.scalers:
self.scalers[col] = StandardScaler()
df[f'{col}_scaled'] = self.scalers[col].fit_transform(df[[col]])
else:
df[f'{col}_scaled'] = self.scalers[col].transform(df[[col]])
return df
def select_features(self, X: pd.DataFrame, y: pd.Series, k: int = 50) -> List[str]:
"""Select top k features using statistical tests"""
selector = SelectKBest(score_func=f_classif, k=k)
selector.fit(X, y)
selected_features = X.columns[selector.get_support()].tolist()
        # Log feature importance scores (cast numpy floats so the dict is JSON-serializable)
        feature_scores = {col: float(score) for col, score in zip(X.columns, selector.scores_)}
mlflow.log_dict(feature_scores, "feature_scores.json")
return selected_features
def create_feature_pipeline(self, df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
"""Execute complete feature engineering pipeline"""
with mlflow.start_run(run_name="feature_engineering"):
# Log input data statistics
mlflow.log_metrics({
"input_rows": len(df),
"input_features": len(df.columns),
"missing_values": df.isnull().sum().sum()
})
# Create temporal features
if config.get('temporal_features'):
df = self.create_temporal_features(df, config['date_column'])
# Create aggregation features
if config.get('aggregation_features'):
df = self.create_aggregation_features(
df,
config['group_columns'],
config['agg_columns'],
config['agg_functions']
)
# Encode categorical features
if config.get('categorical_columns'):
df = self.encode_categorical_features(df, config['categorical_columns'])
# Scale numerical features
if config.get('numerical_columns'):
df = self.scale_numerical_features(df, config['numerical_columns'])
# Log output statistics
mlflow.log_metrics({
"output_rows": len(df),
"output_features": len(df.columns),
"final_missing_values": df.isnull().sum().sum()
})
return df
    def store_features_to_feast(self, df: pd.DataFrame, push_source_name: str):
        """Store features to the Feast feature store"""
        # Feast's push() API expects the name of a configured push source and a
        # DataFrame that includes an event_timestamp column
        feature_df = df.copy()
        feature_df['event_timestamp'] = pd.Timestamp.now()
        # Push to feature store
        self.feature_store.push(push_source_name, feature_df)
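A minimal usage sketch of this pipeline might look like the following. The column names, sample values, and Feast repo path are assumptions (an initialized Feast repository must exist at the given path), but the config keys match those read by create_feature_pipeline.
# Illustrative invocation of the FeatureEngineer pipeline (values are assumptions)
import pandas as pd

raw_df = pd.DataFrame({
    "user_id": [1, 2, 3, 4],
    "signup_date": ["2024-01-05", "2024-02-10", "2024-02-11", "2024-03-01"],
    "country": ["US", "DE", "US", "FR"],
    "purchase_amount": [120.0, 35.5, 80.0, 210.0],
})

feature_config = {
    "temporal_features": True,
    "date_column": "signup_date",
    "aggregation_features": True,
    "group_columns": ["country"],
    "agg_columns": ["purchase_amount"],
    "agg_functions": ["mean", "max"],
    "categorical_columns": ["country"],
    "numerical_columns": ["purchase_amount"],
}

engineer = FeatureEngineer(feature_store_config={"repo_path": "feature-store/"})  # assumed Feast repo path
features_df = engineer.create_feature_pipeline(raw_df, feature_config)
print(features_df.columns.tolist())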
3. Model Training Pipeline
# Model training with MLflow and Kubeflow
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score
import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler
import numpy as np
from typing import Dict, Any, List, Tuple
class MLModelTrainer:
"""ML model training with experiment tracking"""
def __init__(self, model_config: Dict[str, Any]):
self.model_config = model_config
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def create_model(self, input_dim: int, hidden_dims: List[int], output_dim: int) -> nn.Module:
"""Create neural network model"""
layers = []
# Input layer
layers.append(nn.Linear(input_dim, hidden_dims[0]))
layers.append(nn.ReLU())
layers.append(nn.Dropout(0.2))
# Hidden layers
for i in range(len(hidden_dims) - 1):
layers.append(nn.Linear(hidden_dims[i], hidden_dims[i + 1]))
layers.append(nn.ReLU())
layers.append(nn.Dropout(0.2))
# Output layer
layers.append(nn.Linear(hidden_dims[-1], output_dim))
        if output_dim == 1:
            layers.append(nn.Sigmoid())  # Binary classification: probability output for BCELoss
        # For multi-class classification the model returns raw logits, because
        # nn.CrossEntropyLoss applies log-softmax internally; class probabilities
        # are recovered with softmax at inference time
        return nn.Sequential(*layers)
def train_model(self, train_loader: DataLoader, val_loader: DataLoader,
model: nn.Module, config: Dict[str, Any]) -> Tuple[nn.Module, Dict[str, float]]:
"""Train model with MLflow tracking"""
criterion = nn.BCELoss() if config['output_dim'] == 1 else nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=config['learning_rate'])
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)
best_val_loss = float('inf')
best_model_state = None
for epoch in range(config['epochs']):
# Training phase
model.train()
train_loss = 0.0
train_predictions = []
train_targets = []
for batch_x, batch_y in train_loader:
batch_x, batch_y = batch_x.to(self.device), batch_y.to(self.device)
optimizer.zero_grad()
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
train_loss += loss.item()
train_predictions.extend(outputs.cpu().detach().numpy())
train_targets.extend(batch_y.cpu().detach().numpy())
# Validation phase
model.eval()
val_loss = 0.0
val_predictions = []
val_targets = []
with torch.no_grad():
for batch_x, batch_y in val_loader:
batch_x, batch_y = batch_x.to(self.device), batch_y.to(self.device)
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
val_loss += loss.item()
val_predictions.extend(outputs.cpu().detach().numpy())
val_targets.extend(batch_y.cpu().detach().numpy())
# Calculate metrics
train_loss_avg = train_loss / len(train_loader)
val_loss_avg = val_loss / len(val_loader)
scheduler.step(val_loss_avg)
# Log metrics to MLflow
mlflow.log_metrics({
"train_loss": train_loss_avg,
"val_loss": val_loss_avg,
"learning_rate": optimizer.param_groups[0]['lr']
}, step=epoch)
# Save best model
if val_loss_avg < best_val_loss:
best_val_loss = val_loss_avg
best_model_state = model.state_dict().copy()
# Load best model
model.load_state_dict(best_model_state)
# Calculate final metrics
val_predictions = np.array(val_predictions)
val_targets = np.array(val_targets)
if config['output_dim'] == 1:
val_pred_binary = (val_predictions > 0.5).astype(int)
accuracy = accuracy_score(val_targets, val_pred_binary)
precision, recall, f1, _ = precision_recall_fscore_support(val_targets, val_pred_binary, average='binary')
auc = roc_auc_score(val_targets, val_predictions)
metrics = {
"accuracy": accuracy,
"precision": precision,
"recall": recall,
"f1_score": f1,
"auc": auc
}
else:
val_pred_classes = np.argmax(val_predictions, axis=1)
accuracy = accuracy_score(val_targets, val_pred_classes)
precision, recall, f1, _ = precision_recall_fscore_support(val_targets, val_pred_classes, average='weighted')
metrics = {
"accuracy": accuracy,
"precision": precision,
"recall": recall,
"f1_score": f1
}
return model, metrics
def hyperparameter_tuning(self, train_data: Tuple, val_data: Tuple,
search_space: Dict[str, Any]) -> Dict[str, Any]:
"""Hyperparameter tuning with Ray Tune"""
def objective(config):
model = self.create_model(
input_dim=train_data[0].shape[1],
hidden_dims=config['hidden_dims'],
output_dim=config['output_dim']
).to(self.device)
train_loader = DataLoader(
TensorDataset(torch.FloatTensor(train_data[0]), torch.FloatTensor(train_data[1])),
batch_size=config['batch_size'],
shuffle=True
)
val_loader = DataLoader(
TensorDataset(torch.FloatTensor(val_data[0]), torch.FloatTensor(val_data[1])),
batch_size=config['batch_size'],
shuffle=False
)
_, metrics = self.train_model(train_loader, val_loader, model, config)
# Report metrics to Ray Tune
tune.report(**metrics)
# Configure search algorithm
scheduler = ASHAScheduler(
max_t=search_space['epochs'],
grace_period=10,
reduction_factor=2
)
# Run hyperparameter search
        analysis = tune.run(
            objective,
            config=search_space,
            scheduler=scheduler,
            num_samples=20,
            metric="f1_score",
            mode="max",
            resources_per_trial={"cpu": 2, "gpu": 0.5}
        )
        return analysis.best_config
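To tie the trainer to the registry used by the serving layer, a typical run might look like the sketch below; the synthetic data, hyperparameters, and the registered model name "churn-classifier" are assumptions for illustration, not names produced by the archetype.
# Illustrative end-to-end training run with MLflow registration (assumed names and values)
import numpy as np
import torch
import mlflow
import mlflow.pytorch
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

X = np.random.rand(1000, 20).astype("float32")
y = (np.random.rand(1000) > 0.5).astype("float32")          # synthetic binary target
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

train_loader = DataLoader(TensorDataset(torch.tensor(X_train), torch.tensor(y_train).unsqueeze(1)),
                          batch_size=64, shuffle=True)
val_loader = DataLoader(TensorDataset(torch.tensor(X_val), torch.tensor(y_val).unsqueeze(1)),
                        batch_size=64, shuffle=False)

trainer = MLModelTrainer(model_config={})
model = trainer.create_model(input_dim=20, hidden_dims=[64, 32], output_dim=1).to(trainer.device)

with mlflow.start_run(run_name="baseline_training"):
    model, metrics = trainer.train_model(
        train_loader, val_loader, model,
        config={"output_dim": 1, "learning_rate": 1e-3, "epochs": 10},
    )
    mlflow.log_metrics(metrics)
    # Register the trained model so the serving API can load it by name and version
    mlflow.pytorch.log_model(model, artifact_path="model", registered_model_name="churn-classifier")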
4. Model Serving Infrastructure
# Model serving with FastAPI and MLflow
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow
import mlflow.pytorch
import torch
import numpy as np
import pandas as pd
from typing import Any, Dict, List, Optional
import redis
import hashlib
import json
from datetime import datetime, timedelta
import logging
app = FastAPI(title="ML Model Serving API", version="1.0.0")
# Redis client for caching
redis_client = redis.Redis(host='redis', port=6379, db=0)
# Model cache
model_cache = {}
class PredictionRequest(BaseModel):
"""Prediction request schema"""
features: Dict[str, Any]
model_name: str
model_version: str = "latest"
class PredictionResponse(BaseModel):
"""Prediction response schema"""
prediction: float
    prediction_proba: Optional[List[float]] = None
model_name: str
model_version: str
timestamp: datetime
class ModelManager:
"""Manage model loading and caching"""
def __init__(self):
self.models = {}
self.model_metadata = {}
def load_model(self, model_name: str, model_version: str = "latest"):
"""Load model from MLflow registry"""
model_key = f"{model_name}:{model_version}"
if model_key not in self.models:
try:
                # Resolve "latest" to a concrete version before hitting the registry
                client = mlflow.tracking.MlflowClient()
                if model_version == "latest":
                    model_version = client.get_latest_versions(model_name)[0].version
                # Load model from MLflow
                model_uri = f"models:/{model_name}/{model_version}"
                model = mlflow.pytorch.load_model(model_uri)
                # Load model metadata
                model_version_info = client.get_model_version(model_name, model_version)
self.models[model_key] = model
self.model_metadata[model_key] = {
"name": model_name,
"version": model_version,
"stage": model_version_info.current_stage,
"created_at": model_version_info.creation_timestamp
}
logging.info(f"Loaded model {model_key}")
except Exception as e:
logging.error(f"Failed to load model {model_key}: {str(e)}")
raise HTTPException(status_code=404, detail=f"Model {model_key} not found")
return self.models[model_key], self.model_metadata[model_key]
def predict(self, model_name: str, model_version: str, features: Dict[str, Any]) -> Dict[str, Any]:
"""Make prediction with caching"""
model_key = f"{model_name}:{model_version}"
        # Check cache first; hash features deterministically (built-in hash() is salted per process)
        feature_hash = hashlib.sha256(json.dumps(features, sort_keys=True, default=str).encode()).hexdigest()
        cache_key = f"prediction:{model_key}:{feature_hash}"
cached_result = redis_client.get(cache_key)
if cached_result:
return json.loads(cached_result)
# Load model and make prediction
model, metadata = self.load_model(model_name, model_version)
# Preprocess features
feature_vector = self.preprocess_features(features)
# Make prediction
model.eval()
with torch.no_grad():
inputs = torch.FloatTensor(feature_vector).unsqueeze(0)
outputs = model(inputs)
            if outputs.shape[1] == 1:  # Binary classification (sigmoid output)
                prediction = outputs.item()
                prediction_proba = [1 - prediction, prediction]
            else:  # Multi-class classification: the model outputs logits, so apply softmax
                prediction_proba = torch.softmax(outputs, dim=1).squeeze().tolist()
                prediction = int(np.argmax(prediction_proba))
result = {
"prediction": float(prediction),
"prediction_proba": prediction_proba,
"model_name": model_name,
"model_version": model_version,
"timestamp": datetime.now().isoformat()
}
# Cache result for 1 hour
redis_client.setex(cache_key, 3600, json.dumps(result))
return result
def preprocess_features(self, features: Dict[str, Any]) -> np.ndarray:
"""Preprocess features for model input"""
# This should match the preprocessing used during training
# In practice, this would load the preprocessing pipeline from MLflow
# Convert to DataFrame for easier manipulation
df = pd.DataFrame([features])
# Apply same preprocessing as training
# (This is a simplified example)
feature_vector = df.values.flatten()
return feature_vector
model_manager = ModelManager()
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Make prediction endpoint"""
try:
result = model_manager.predict(
request.model_name,
request.model_version,
request.features
)
return PredictionResponse(**result)
except Exception as e:
logging.error(f"Prediction failed: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/models")
async def list_models():
"""List available models"""
    client = mlflow.tracking.MlflowClient()
    models = client.search_registered_models()
return {
"models": [
{
"name": model.name,
"description": model.description,
"latest_version": model.latest_versions[0].version if model.latest_versions else None
}
for model in models
]
}
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "timestamp": datetime.now().isoformat()}
5. Model Monitoring
# Model monitoring and drift detection
import pandas as pd
import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import mlflow
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
import logging
from typing import Dict, Any, List
from dataclasses import dataclass
from datetime import datetime, timedelta
# Prometheus metrics
prediction_counter = Counter('ml_predictions_total', 'Total predictions made', ['model_name', 'model_version'])
prediction_latency = Histogram('ml_prediction_duration_seconds', 'Prediction latency', ['model_name'])
model_accuracy = Gauge('ml_model_accuracy', 'Model accuracy', ['model_name', 'model_version'])
data_drift_score = Gauge('ml_data_drift_score', 'Data drift score', ['model_name', 'feature_name'])
@dataclass
class DriftDetectionResult:
"""Data drift detection result"""
feature_name: str
drift_score: float
is_drift: bool
method: str
threshold: float
timestamp: datetime
class ModelMonitor:
"""Model monitoring and drift detection"""
def __init__(self, drift_threshold: float = 0.05):
self.drift_threshold = drift_threshold
self.reference_data = {}
self.prediction_history = []
def add_reference_data(self, model_name: str, data: pd.DataFrame):
"""Add reference data for drift detection"""
self.reference_data[model_name] = data
logging.info(f"Added reference data for model {model_name}: {len(data)} samples")
def detect_data_drift(self, model_name: str, current_data: pd.DataFrame) -> List[DriftDetectionResult]:
"""Detect data drift using statistical tests"""
if model_name not in self.reference_data:
raise ValueError(f"No reference data found for model {model_name}")
reference_data = self.reference_data[model_name]
drift_results = []
for feature in current_data.columns:
if feature in reference_data.columns:
# Kolmogorov-Smirnov test for numerical features
if pd.api.types.is_numeric_dtype(current_data[feature]):
statistic, p_value = stats.ks_2samp(
reference_data[feature].dropna(),
current_data[feature].dropna()
)
is_drift = p_value < self.drift_threshold
drift_score = 1 - p_value # Higher score means more drift
method = "KS_test"
                # Chi-square test for categorical features
                else:
                    ref_counts = reference_data[feature].value_counts(normalize=True)
                    curr_counts = current_data[feature].value_counts(normalize=True)
                    # Align categories; add a small epsilon so expected frequencies are
                    # never zero, then rescale so observed and expected sum to the same total
                    all_categories = sorted(set(ref_counts.index) | set(curr_counts.index))
                    eps = 1e-6
                    ref_aligned = np.array([ref_counts.get(cat, 0.0) + eps for cat in all_categories])
                    curr_aligned = np.array([curr_counts.get(cat, 0.0) + eps for cat in all_categories])
                    ref_aligned = ref_aligned / ref_aligned.sum() * curr_aligned.sum()
                    statistic, p_value = stats.chisquare(curr_aligned, ref_aligned)
                    is_drift = p_value < self.drift_threshold
                    drift_score = 1 - p_value
                    method = "Chi_square_test"
result = DriftDetectionResult(
feature_name=feature,
drift_score=drift_score,
is_drift=is_drift,
method=method,
threshold=self.drift_threshold,
timestamp=datetime.now()
)
drift_results.append(result)
# Update Prometheus metrics
data_drift_score.labels(model_name=model_name, feature_name=feature).set(drift_score)
if is_drift:
logging.warning(f"Data drift detected for feature {feature} in model {model_name}: {drift_score}")
return drift_results
def log_prediction(self, model_name: str, model_version: str, features: Dict[str, Any],
prediction: float, actual: float = None):
"""Log prediction for monitoring"""
# Update Prometheus metrics
prediction_counter.labels(model_name=model_name, model_version=model_version).inc()
# Store prediction history
prediction_record = {
"model_name": model_name,
"model_version": model_version,
"features": features,
"prediction": prediction,
"actual": actual,
"timestamp": datetime.now()
}
self.prediction_history.append(prediction_record)
# Keep only last 10000 predictions
if len(self.prediction_history) > 10000:
self.prediction_history = self.prediction_history[-10000:]
def calculate_model_performance(self, model_name: str, window_hours: int = 24) -> Dict[str, float]:
"""Calculate model performance metrics over a time window"""
cutoff_time = datetime.now() - timedelta(hours=window_hours)
# Filter predictions in time window
recent_predictions = [
p for p in self.prediction_history
if p["model_name"] == model_name and p["timestamp"] > cutoff_time and p["actual"] is not None
]
if not recent_predictions:
return {}
# Extract predictions and actuals
predictions = [p["prediction"] for p in recent_predictions]
actuals = [p["actual"] for p in recent_predictions]
# Calculate metrics
accuracy = accuracy_score(actuals, np.round(predictions))
precision, recall, f1, _ = precision_recall_fscore_support(actuals, np.round(predictions), average='binary')
metrics = {
"accuracy": accuracy,
"precision": precision,
"recall": recall,
"f1_score": f1,
"sample_count": len(recent_predictions)
}
# Update Prometheus metrics
model_accuracy.labels(model_name=model_name, model_version="current").set(accuracy)
# Log to MLflow
with mlflow.start_run(run_name=f"monitoring_{model_name}"):
mlflow.log_metrics(metrics)
return metrics
def generate_monitoring_report(self, model_name: str) -> Dict[str, Any]:
"""Generate comprehensive monitoring report"""
# Calculate performance metrics
performance_metrics = self.calculate_model_performance(model_name)
# Get recent drift results
recent_predictions = [
p for p in self.prediction_history
if p["model_name"] == model_name and p["timestamp"] > datetime.now() - timedelta(hours=24)
]
if recent_predictions:
current_data = pd.DataFrame([p["features"] for p in recent_predictions])
drift_results = self.detect_data_drift(model_name, current_data)
else:
drift_results = []
# Generate alerts
alerts = []
if performance_metrics.get("accuracy", 0) < 0.8:
alerts.append({
"type": "performance",
"severity": "high",
"message": f"Model accuracy dropped to {performance_metrics.get('accuracy', 0):.2f}"
})
for drift_result in drift_results:
if drift_result.is_drift:
alerts.append({
"type": "drift",
"severity": "medium",
"message": f"Data drift detected for feature {drift_result.feature_name}"
})
report = {
"model_name": model_name,
"timestamp": datetime.now().isoformat(),
"performance_metrics": performance_metrics,
"drift_results": [
{
"feature": dr.feature_name,
"drift_score": dr.drift_score,
"is_drift": dr.is_drift,
"method": dr.method
}
for dr in drift_results
],
"alerts": alerts,
"prediction_volume_24h": len(recent_predictions)
}
return report
# Initialize global monitor
model_monitor = ModelMonitor()
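A minimal wiring sketch for the monitor, under assumed names and paths (the reference snapshot location, model name, and Prometheus port are not prescribed by the archetype), looks like this:
# Illustrative wiring of ModelMonitor with a Prometheus scrape endpoint (assumed values)
import pandas as pd
from prometheus_client import start_http_server

# Expose the Counter/Histogram/Gauge metrics defined above on an HTTP endpoint
start_http_server(9100)

reference_df = pd.read_parquet("/data/lake/reference/user_data.parquet")  # assumed snapshot path
model_monitor.add_reference_data("churn-classifier", reference_df)

# Called from the serving path after each prediction; `actual` is filled in once ground truth arrives
model_monitor.log_prediction(
    model_name="churn-classifier",
    model_version="3",
    features={"age": 42, "purchase_amount": 120.0},
    prediction=0.81,
    actual=1.0,
)

report = model_monitor.generate_monitoring_report("churn-classifier")
print(report["alerts"])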
Project Structure
ml-architecture/
├── data-pipeline/ # Data ingestion and processing
│ ├── airflow/
│ │ ├── dags/ # Airflow DAGs
│ │ ├── plugins/ # Custom operators
│ │ └── config/
│ ├── feature-engineering/ # Feature engineering pipelines
│ ├── data-validation/ # Data quality checks
│ └── feature-store/ # Feast feature store
├── ml-platform/ # Training and experimentation
│ ├── training/ # Training pipelines
│ │ ├── pytorch/
│ │ ├── sklearn/
│ │ └── xgboost/
│ ├── experiments/ # Experiment tracking
│ ├── hyperparameter-tuning/ # HPO with Ray Tune
│ └── model-registry/ # MLflow model registry
├── model-serving/ # Model deployment
│ ├── api/ # FastAPI serving
│ ├── batch-inference/ # Batch prediction jobs
│ ├── streaming/ # Real-time inference
│ └── a-b-testing/ # A/B testing framework
├── monitoring/ # Model monitoring
│ ├── drift-detection/
│ ├── performance-monitoring/
│ ├── alerting/
│ └── dashboards/
├── infrastructure/ # Infrastructure as code
│ ├── kubernetes/ # K8s manifests
│ ├── docker/ # Docker configurations
│ ├── terraform/ # Cloud infrastructure
│ └── helm/ # Helm charts
├── notebooks/ # Jupyter notebooks
│ ├── exploration/
│ ├── experiments/
│ └── analysis/
├── shared/ # Shared utilities
│ ├── data-utils/
│ ├── ml-utils/
│ └── monitoring-utils/
├── tests/ # Test suites
│ ├── unit/
│ ├── integration/
│ └── e2e/
├── docs/ # Documentation
└── scripts/ # Deployment scripts
Quick Start
1. Generate the ML architecture:
   archetect render git@github.com:p6m-archetypes/ml-architecture-builder.archetype.git
2. Start the infrastructure:
   docker-compose up -d
   kubectl apply -f infrastructure/kubernetes/
3. Initialize feature store:
   cd feature-store
   feast apply
4. Run data pipeline:
   airflow dags trigger data_ingestion_pipeline
5. Train a model:
   python ml-platform/training/train_model.py --config config/model_config.yaml
6. Deploy model:
   kubectl apply -f model-serving/kubernetes/
7. Access services:
   - MLflow UI: http://localhost:5000
   - Airflow UI: http://localhost:8080
   - Model API: http://localhost:8000
   - Monitoring Dashboard: http://localhost:3000
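After the services are up, a quick way to confirm everything is reachable is a small health check; the ports match the list above, while the exact health paths for MLflow, Airflow, and Grafana are assumptions about their default endpoints.
# Optional smoke test for the locally exposed services (health paths are assumptions)
import requests

checks = {
    "model-api": "http://localhost:8000/health",
    "mlflow": "http://localhost:5000/health",
    "airflow": "http://localhost:8080/health",
    "grafana": "http://localhost:3000/api/health",
}

for name, url in checks.items():
    try:
        status = requests.get(url, timeout=5).status_code
        print(f"{name}: HTTP {status}")
    except requests.RequestException as exc:
        print(f"{name}: unreachable ({exc})")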
Best Practices
MLOps
- Version all data, code, and models
- Implement comprehensive testing
- Use feature stores for consistency
- Monitor model performance continuously
Data Management
- Implement data quality checks
- Use schema evolution strategies
- Track data lineage
- Implement proper data governance
Model Development
- Use experiment tracking consistently
- Implement proper cross-validation
- Document model assumptions
- Use automated hyperparameter tuning
Production Deployment
- Implement gradual rollouts
- Use A/B testing for model comparison
- Monitor for data drift
- Implement proper alerting
Security & Compliance
- Encrypt data at rest and in transit
- Implement access controls
- Audit model decisions
- Ensure regulatory compliance