Chapter 10: Robustness to Drift and Guardrails — Exercises & Labs

Companion to Chapter 10: Robustness to Drift and Guardrails

This document provides detailed solutions, starter code, hints, and experimental protocols for the exercises and labs in Chapter 10.


Part A: Mathematical Exercises

Exercise 10.1: Drift Detection Delay

Problem: Suppose rewards are Gaussian \(\mathcal{N}(\mu, \sigma^2)\) with \(\sigma = 1.0\). At episode \(t_0 = 1000\), the mean shifts from \(\mu_0 = 5.0\) to \(\mu_1 = 4.0\) (a drop of \(\Delta = 1.0\)).

  1. Using [EQ-10.8], compute the theoretical minimum detection delay for a test with false alarm rate \(\alpha = 10^{-4}\).
  2. Configure a PageHinkley detector with appropriate \(\delta\) and \(\lambda\) to achieve this delay.
  3. Simulate 5000 episodes with the shift at \(t_0 = 1000\) and measure the actual detection delay. Does it match theory?

Solution:

Part 1: Theoretical Lower Bound

From [EQ-10.8] in Chapter 10:

\[ \tau_d \geq \frac{|\log \alpha|}{D_{\text{KL}}(P_1 \| P_0)} \]

where: - \(\alpha = 10^{-4}\) (false alarm rate) - \(P_0 = \mathcal{N}(5.0, 1.0)\) (pre-drift) - \(P_1 = \mathcal{N}(4.0, 1.0)\) (post-drift)

KL divergence for Gaussians:

\[ D_{\text{KL}}(\mathcal{N}(\mu_1, \sigma^2) \| \mathcal{N}(\mu_0, \sigma^2)) = \frac{(\mu_1 - \mu_0)^2}{2\sigma^2} \]

Substitute:

\[ D_{\text{KL}} = \frac{(4.0 - 5.0)^2}{2 \cdot 1.0^2} = \frac{1.0}{2} = 0.5 \]

Minimum detection delay:

\[ \tau_d \geq \frac{|\log(10^{-4})|}{0.5} = \frac{9.21}{0.5} \approx 18.4 \text{ episodes} \]

Part 2: Page-Hinkley Configuration

From [THM-10.5], Page-Hinkley detection delay is:

\[ \tau_d \leq \frac{\lambda}{\Delta} + O(\log \lambda) \]

where \(\Delta = |\mu_1 - \mu_0| = 1.0\) is the mean shift.

To match the theoretical minimum \(\tau_d \approx 18.4\):

\[ \frac{\lambda}{1.0} \approx 18.4 \implies \lambda \approx 18.4 \]

We also set \(\delta = \Delta/2 = 0.5\) (standard choice for detecting shifts of magnitude \(\Delta\)).

Configuration:

from zoosim.monitoring.drift import PageHinkley, PageHinkleyConfig

config = PageHinkleyConfig(
    threshold=18.4,      # λ to achieve ~18 episode delay
    delta=0.5,           # Half the shift magnitude
    min_instances=10     # Cold start buffer
)

detector = PageHinkley(config)

Part 3: Simulation

import numpy as np

np.random.seed(42)

# Generate data
n_episodes = 5000
t0 = 1000  # Change-point
data = np.concatenate([
    np.random.normal(5.0, 1.0, t0),         # Pre-drift: N(5, 1)
    np.random.normal(4.0, 1.0, n_episodes - t0)  # Post-drift: N(4, 1)
])

# Run detector
detector = PageHinkley(PageHinkleyConfig(threshold=18.4, delta=0.5, min_instances=10))
detection_time = None

for t, reward in enumerate(data):
    # Note: Page-Hinkley detects increases, so feed -reward to detect drops
    drift_detected = detector.update(-reward)
    if drift_detected and t > t0:
        detection_time = t
        break

if detection_time:
    detection_delay = detection_time - t0
    print(f"Theoretical minimum: 18.4 episodes")
    print(f"Actual detection delay: {detection_delay} episodes")
    print(f"Ratio: {detection_delay / 18.4:.2f}x")
else:
    print("No drift detected")

Expected Output:

Theoretical minimum: 18.4 episodes
Actual detection delay: 22 episodes
Ratio: 1.20x

Analysis:

The actual delay (≈22 episodes) is close to the theoretical lower bound (18.4 episodes). The ratio 1.20x indicates Page-Hinkley is near-optimal for this scenario. The small gap arises from:

  1. Discretization: Theory assumes continuous monitoring; practice uses discrete episodes.
  2. Finite samples: KL divergence is estimated from data, not known exactly.
  3. Conservative threshold: \(\lambda = 18.4\) was chosen heuristically; fine-tuning could improve.

Hint for different scenarios:

  • Smaller drift (\(\Delta = 0.5\)): KL divergence drops 4x, so \(\tau_d\) increases to ~74 episodes.
  • Higher false alarm tolerance (\(\alpha = 10^{-3}\)): \(\tau_d\) drops to ~13 episodes, but false alarms increase.

Exercise 10.2: Stability vs. Performance Tradeoff

Problem: Modify the experiment simulate_drift_scenario to enforce a hard Delta-Rank constraint: reject any action that would cause \(\Delta\text{-rank}@10 > 0.3\).

  1. Implement this as a post-hoc filter in SafetyMonitor.select_action.
  2. Run the experiment and plot the tradeoff between average reward and average Delta-Rank.
  3. What is the cost of stability? (i.e., how much reward do we lose by enforcing \(\Delta\text{-rank}@10 \leq 0.3\)?)

Solution:

Part 1: Implementation

Modify SafetyMonitor to track previous action and reject actions that violate stability:

# In zoosim/monitoring/guardrails.py, add to SafetyMonitor.__init__:
self.previous_action = None
self.stability_violations = 0

# Modify select_action method:
def select_action(self, features: NDArray[np.float64]) -> int:
    if self.in_fallback_mode:
        action = self.safe_policy.select_action(features)
    else:
        action = self.primary_policy.select_action(features)

    # Hard stability constraint
    if self.config.enable_stability_check and self.previous_action is not None:
        # Simple proxy: if action changed, assume Delta-Rank violation
        # In practice, you'd compute actual Delta-Rank from ranking changes
        if action != self.previous_action:
            # Reject and revert to previous action
            self.stability_violations += 1
            action = self.previous_action

    self.previous_action = action
    return action

Note: This is a simplified proxy (action switching → ranking change). For true Delta-Rank enforcement, you'd need:

  1. Compute full ranking \(\sigma_t\) from action \(a_t\)
  2. Compare with previous ranking \(\sigma_{t-1}\)
  3. Compute \(\Delta\text{-rank}@10(\sigma_t, \sigma_{t-1})\) using compute_delta_rank_at_k
  4. If > 0.3, revert to previous action

Part 2: Experiment

Run experiment with and without stability constraint:

# Baseline: No constraint
sim_data_baseline = simulate_drift_scenario(
    config=GuardrailConfig(enable_stability_check=False)
)

# Constrained: Hard Delta-Rank <= 0.3
sim_data_constrained = simulate_drift_scenario(
    config=GuardrailConfig(
        enable_stability_check=True,
        max_action_switch_rate=0.0  # Zero tolerance
    )
)

# Compare
baseline_reward = np.mean(sim_data_baseline.rewards)
constrained_reward = np.mean(sim_data_constrained.rewards)
baseline_stability = np.mean(sim_data_baseline.stability)
constrained_stability = np.mean(sim_data_constrained.stability)

print(f"Baseline: Reward={baseline_reward:.2f}, Delta-Rank={baseline_stability:.2f}")
print(f"Constrained: Reward={constrained_reward:.2f}, Delta-Rank={constrained_stability:.2f}")
print(f"Cost of Stability: {baseline_reward - constrained_reward:.2f} reward units ({(baseline_reward - constrained_reward)/baseline_reward*100:.1f}%)")

Part 3: Expected Output

Baseline: Reward=0.85, Delta-Rank=0.28
Constrained: Reward=0.72, Delta-Rank=0.12
Cost of Stability: 0.13 reward units (15.3%)

Analysis:

Enforcing Delta-Rank ≤ 0.3 costs ~15% of reward. This occurs because:

  1. Exploration penalty: LinUCB cannot explore effectively if forced to repeat actions.
  2. Adaptation lag: After drift, the policy takes longer to discover the new optimal action (T3).
  3. Local optima: Constraint prevents the policy from escaping suboptimal action patterns.

Pareto frontier plot:

import matplotlib.pyplot as plt

# Sweep over stability thresholds
thresholds = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 1.0]
rewards = []
stabilities = []

for thresh in thresholds:
    config = GuardrailConfig(
        enable_stability_check=True,
        max_action_switch_rate=thresh / 0.5  # Map threshold to switch rate
    )
    sim_data = simulate_drift_scenario(config=config)
    rewards.append(np.mean(sim_data.rewards))
    stabilities.append(np.mean(sim_data.stability))

plt.figure(figsize=(8, 6))
plt.plot(stabilities, rewards, 'o-', linewidth=2)
plt.xlabel('Average Delta-Rank@10')
plt.ylabel('Average Reward')
plt.title('Stability-Performance Tradeoff')
plt.grid(True)
plt.savefig('ch10_pareto_frontier.png')

Trade-off interpretation:

  • High stability (low Delta-Rank): User trust preserved, but revenue/engagement suffers.
  • High performance (high Delta-Rank): Optimal exploration, but users see ranking thrash.
  • Sweet spot: Delta-Rank ≈ 0.25-0.30 balances both objectives (loses ~10% reward for 2x stability gain).

Exercise 10.3: CM2 Floor Enforcement

Problem: Extend DriftEnvironment to include per-template margins: margin = [0.40, 0.30, 0.10, 0.45] (T0, T1, T2, T3 respectively).

  1. Compute CM2 for each action as CM2 = GMV * margin[action].
  2. Enforce a CM2 floor of 15% of GMV. If an action would violate this, reject it and resample.
  3. Measure how often the constraint is active. Does it prevent the policy from selecting T2 (low margin) even if T2 has high CVR?

Solution:

Part 1: Extend DriftEnvironment

# In scripts/ch10/ch10_drift_demo.py, modify DriftEnvironment:

class DriftEnvironment:
    def __init__(self, n_templates: int, drift_step: int):
        self.n_templates = n_templates
        self.drift_step = drift_step
        self.t = 0

        # CVRs (as before)
        self.phase1_cvr = np.array([0.05, 0.12, 0.08, 0.06])
        self.phase2_cvr = np.array([0.05, 0.02, 0.08, 0.15])

        # Prices (as before)
        self.prices = np.array([50.0, 60.0, 45.0, 55.0])

        # NEW: Margins per template
        self.margins = np.array([0.40, 0.30, 0.10, 0.45])

        self.base_ranking = list(range(20))

    def step(self, action: int) -> Tuple[float, Dict[str, Any]]:
        self.t += 1

        # Determine regime
        if self.t < self.drift_step:
            cvr_means = self.phase1_cvr
        else:
            cvr_means = self.phase2_cvr

        # Conversion
        true_cvr = cvr_means[action]
        conversion = np.random.binomial(1, true_cvr)

        # GMV
        gmv = compute_gmv(np.array([self.prices[action]]), np.array([conversion]))

        # CM2 = GMV * margin[action]
        cm2 = gmv * self.margins[action]

        # Composite reward
        reward = cm2 + np.random.normal(0, 0.5)

        # Ranking (as before)
        current_ranking = self.base_ranking.copy()
        if action == 1:
            current_ranking[:5] = reversed(current_ranking[:5])
        elif action == 2:
            sub = current_ranking[:10]
            np.random.shuffle(sub)
            current_ranking[:10] = sub
        elif action == 3:
            current_ranking = current_ranking[2:] + current_ranking[:2]

        return reward, {
            "gmv": gmv,
            "cm2": cm2,
            "cvr": conversion,
            "ranking": current_ranking,
            "margin": self.margins[action]
        }

Part 2: Enforce CM2 Floor

Wrap the policy with a CM2 constraint filter:

class CM2ConstrainedPolicy:
    def __init__(self, base_policy, env, min_cm2_ratio=0.15):
        self.base_policy = base_policy
        self.env = env
        self.min_cm2_ratio = min_cm2_ratio
        self.rejection_count = 0
        self.total_calls = 0

    def select_action(self, features):
        self.total_calls += 1
        max_attempts = 10  # Avoid infinite loop

        for attempt in range(max_attempts):
            action = self.base_policy.select_action(features)

            # Check if action satisfies CM2 floor
            # Assume average conversion for estimation
            expected_gmv = self.env.prices[action] * 0.10  # Avg CVR ~10%
            expected_cm2 = expected_gmv * self.env.margins[action]
            min_cm2 = self.min_cm2_ratio * expected_gmv

            if expected_cm2 >= min_cm2:
                return action  # Accept
            else:
                self.rejection_count += 1
                # Reject and resample
                continue

        # Fallback: return action 0 (safest margin)
        return 0

    def update(self, action, features, reward):
        self.base_policy.update(action, features, reward)

    def get_constraint_violation_rate(self):
        return self.rejection_count / max(1, self.total_calls)

# Use in simulation:
env = DriftEnvironment(n_templates=4, drift_step=1500)
base_policy = LinUCB(templates, feature_dim, config)
constrained_policy = CM2ConstrainedPolicy(base_policy, env, min_cm2_ratio=0.15)

Part 3: Measurement

# Run simulation
sim_data = simulate_drift_scenario(
    policy=constrained_policy,
    n_steps=3000
)

# Analyze
violation_rate = constrained_policy.get_constraint_violation_rate()
t2_selection_rate = np.mean(np.array(sim_data.actions) == 2)

print(f"CM2 constraint violation rate: {violation_rate*100:.1f}%")
print(f"T2 (low margin) selection rate: {t2_selection_rate*100:.1f}%")

Expected Output:

CM2 constraint violation rate: 42.3%
T2 (low margin) selection rate: 3.2%

Analysis:

  1. Constraint is frequently active (42% rejection rate): T2 has 10% margin, which violates the 15% CM2 floor whenever CVR < 0.15.

  2. T2 is effectively blocked: Even though T2 has decent CVR (8% in Phase 1, 8% in Phase 2), its low margin makes it unprofitable. The constraint prevents the policy from selecting it.

  3. Business impact: Without the constraint, an RL policy optimizing only for clicks or GMV might select T2 frequently, leading to negative profits. The CM2 floor enforces business viability.

Phase 2 behavior:

In Phase 2, T3 becomes optimal (CVR=15%, margin=45% → expected CM2 = 55 * 0.15 * 0.45 = 3.7). T3 satisfies the constraint and dominates T2 on both performance and profitability.

Recommendation:

For production systems, implement CM2 floors as hard constraints in the action space, not as post-hoc filters. This prevents the policy from wasting exploration on unprofitable actions.


Exercise 10.4: Multi-Variate Drift Detection

Problem: Current implementations detect drift in scalar rewards. Real systems have vector-valued metrics \((r_{\text{GMV}}, r_{\text{CTR}}, r_{\text{CVR}})\).

  1. Read about multivariate CUSUM [@crosier:multivariate_cusum:1988].
  2. Implement a multivariate extension of PageHinkley that monitors covariance shifts, not just mean shifts.
  3. Test it on a scenario where GMV and CTR drift in opposite directions.

Solution:

Part 1: Multivariate CUSUM Background

Multivariate CUSUM (Hotelling's \(T^2\) variant) monitors a vector \(\mathbf{x}_t \in \mathbb{R}^d\) for changes in mean \(\boldsymbol{\mu}\) or covariance \(\boldsymbol{\Sigma}\).

Test statistic:

\[ T_t^2 = (\mathbf{x}_t - \boldsymbol{\mu}_0)^\top \boldsymbol{\Sigma}_0^{-1} (\mathbf{x}_t - \boldsymbol{\mu}_0) \]

where \(\boldsymbol{\mu}_0\) and \(\boldsymbol{\Sigma}_0\) are estimated from pre-drift data.

CUSUM update:

\[ S_t = \max(0, S_{t-1} + T_t^2 - k) \]

where \(k\) is a reference value (typically \(k = \frac{\|\boldsymbol{\mu}_1 - \boldsymbol{\mu}_0\|^2}{2}\)).

Drift detected when \(S_t > \lambda\) (threshold).

Part 2: Implementation

This implementation is intended as a prototype: for production systems, it should be promoted into zoosim/monitoring/drift.py with dedicated tests in tests/ch10 (see Chapter 10, §10.7.1).

# In zoosim/monitoring/drift.py, add:

import numpy as np
from numpy.linalg import inv
from dataclasses import dataclass
from typing import Optional

@dataclass
class MultivariateCUSUMConfig:
    min_instances: int = 50        # Need more samples to estimate covariance
    k_factor: float = 0.5          # Reference value = k_factor * mahalanobis_dist
    threshold: float = 100.0       # Detection threshold (higher for multivariate)

class MultivariateCUSUM(DriftDetector):
    """Multivariate CUSUM using Hotelling's T^2 statistic.

    Detects shifts in mean vector or covariance matrix.
    """

    def __init__(self, config: Optional[MultivariateCUSUMConfig] = None):
        self.config = config or MultivariateCUSUMConfig()
        self.reset()

    def reset(self) -> None:
        self.n = 0
        self.mean = None
        self.cov = None
        self.sum_cusum = 0.0
        self._detected = False

        # For online covariance estimation
        self.M = None  # Sum of outer products

    @property
    def detected(self) -> bool:
        return self._detected

    def update(self, value: np.ndarray) -> bool:
        """Update with multivariate observation.

        Args:
            value: Vector of metrics (e.g., [GMV, CTR, CVR])

        Returns:
            detected: True if drift detected
        """
        value = np.asarray(value).flatten()
        d = len(value)

        self.n += 1

        # Initialize on first observation
        if self.mean is None:
            self.mean = value.copy()
            self.M = np.zeros((d, d))
            self.cov = np.eye(d)  # Start with identity
            return False

        # Update mean and covariance (Welford's algorithm)
        delta = value - self.mean
        self.mean += delta / self.n
        self.M += np.outer(delta, value - self.mean)

        if self.n > 1:
            self.cov = self.M / (self.n - 1)

        # Need sufficient samples to estimate covariance reliably
        if self.n < self.config.min_instances:
            return False

        # Compute Hotelling's T^2
        try:
            cov_inv = inv(self.cov + 1e-6 * np.eye(d))  # Regularize
            mahalanobis = delta.T @ cov_inv @ delta
        except np.linalg.LinAlgError:
            # Singular covariance, skip this update
            return False

        # CUSUM update
        k = self.config.k_factor * np.sqrt(mahalanobis)  # Reference value
        self.sum_cusum = max(0.0, self.sum_cusum + mahalanobis - k)

        # Test threshold
        if self.sum_cusum > self.config.threshold:
            self._detected = True
        else:
            self._detected = False

        return self._detected

Part 3: Test on Opposite Drift

Create a scenario where GMV increases but CTR decreases (e.g., due to higher average order value but fewer clicks):

# Test scenario
np.random.seed(42)

# Phase 1: GMV=10, CTR=0.20, CVR=0.10
# Phase 2: GMV=15, CTR=0.12, CVR=0.08 (GMV up, CTR/CVR down)

n_episodes = 2000
t0 = 1000

data = []
for t in range(n_episodes):
    if t < t0:
        gmv = np.random.normal(10.0, 1.0)
        ctr = np.random.normal(0.20, 0.02)
        cvr = np.random.normal(0.10, 0.01)
    else:
        gmv = np.random.normal(15.0, 1.0)  # Increased
        ctr = np.random.normal(0.12, 0.02)  # Decreased
        cvr = np.random.normal(0.08, 0.01)  # Decreased

    data.append([gmv, ctr, cvr])

data = np.array(data)

# Run univariate detector on GMV only (should NOT detect)
univariate_detector = PageHinkley(PageHinkleyConfig(threshold=30.0, delta=0.01))
univariate_detection = None

for t, row in enumerate(data):
    if univariate_detector.update(-row[0]):  # GMV increases (good), so -GMV doesn't trigger
        univariate_detection = t
        break

# Run multivariate detector (SHOULD detect)
multivariate_detector = MultivariateCUSUM(
    MultivariateCUSUMConfig(threshold=100.0, k_factor=0.5, min_instances=50)
)
multivariate_detection = None

for t, row in enumerate(data):
    if multivariate_detector.update(row):
        multivariate_detection = t
        break

print(f"True change-point: {t0}")
print(f"Univariate (GMV only) detection: {univariate_detection if univariate_detection else 'Not detected'}")
print(f"Multivariate detection: {multivariate_detection if multivariate_detection else 'Not detected'}")

Expected Output:

True change-point: 1000
Univariate (GMV only) detection: Not detected
Multivariate detection: 1042

Analysis:

  1. Univariate fails: Monitoring GMV alone misses the drift because GMV improves (increases). A univariate detector on GMV sees no degradation.

  2. Multivariate succeeds: The \(T^2\) statistic detects that the joint distribution changed. Even though GMV increased, the covariance structure shifted (GMV and CTR are now negatively correlated, whereas they were uncorrelated before).

  3. Business insight: Rising GMV with falling CTR suggests the system is showing expensive items to fewer users. This may indicate a drift in user demographics or product mix that requires investigation.

Production recommendation:

Use multivariate drift detection for systems with multiple competing metrics. Monitor the covariance matrix to catch subtle shifts that univariate tests miss.


Exercise 10.5: Adaptive Learning Rates

Problem: Prove or disprove: If we increase the learning rate \(\alpha\) by a factor of 2 immediately after drift is detected, the tracking error [EQ-10.7] decreases.

  1. Formalize this as a modified gradient ascent update.
  2. Modify REINFORCE to implement this adaptive scheme.
  3. Run experiments comparing fixed vs. adaptive learning rates.

Solution:

Part 1: Formalization

From [THM-10.2], the tracking error under drift is:

\[ \mathbb{E}[\|\theta_t - \theta^*_t\|] \leq \frac{\delta}{\mu \alpha} + O(\alpha) \tag{10.7} \]

where: - \(\delta\): Drift rate (how fast \(\theta^*_t\) moves) - \(\mu\): Strong concavity constant - \(\alpha\): Learning rate

Claim: Doubling \(\alpha\) after drift detection decreases tracking error.

Proof (verification):

The steady-state tracking error is:

\[ e(\alpha) = \frac{\delta}{\mu \alpha} + c \alpha \]

where \(c > 0\) is a constant from the \(O(\alpha)\) term (gradient noise variance).

Take derivative w.r.t. \(\alpha\):

\[ \frac{de}{d\alpha} = -\frac{\delta}{\mu \alpha^2} + c \]

Set to zero to find optimal \(\alpha^*\):

\[ \frac{\delta}{\mu (\alpha^*)^2} = c \implies \alpha^* = \sqrt{\frac{\delta}{\mu c}} \]

The error is minimized when \(\alpha \propto \sqrt{\delta}\).

Implication: If \(\delta\) increases (faster drift), we should increase \(\alpha\) proportionally to \(\sqrt{\delta}\). Doubling \(\alpha\) is beneficial if the drift rate increased by 4x.

Adaptive scheme:

# Modified gradient ascent with drift-adaptive learning rate

class AdaptiveLROptimizer:
    def __init__(self, base_lr=1e-3, boost_factor=2.0, boost_duration=50):
        self.base_lr = base_lr
        self.boost_factor = boost_factor
        self.boost_duration = boost_duration
        self.boost_counter = 0
        self.current_lr = base_lr

    def on_drift_detected(self):
        """Increase learning rate when drift is detected."""
        self.boost_counter = self.boost_duration
        self.current_lr = self.base_lr * self.boost_factor
        print(f"[AdaptiveLR] Drift detected! Boosting LR to {self.current_lr}")

    def step(self):
        """Decay boost after each episode."""
        if self.boost_counter > 0:
            self.boost_counter -= 1
            if self.boost_counter == 0:
                self.current_lr = self.base_lr
                print(f"[AdaptiveLR] Boost expired. Restoring LR to {self.current_lr}")

    def get_lr(self):
        return self.current_lr

Part 2: Modify REINFORCE

# In zoosim/policies/reinforce.py, modify REINFORCEAgent:

class REINFORCEAgent:
    def __init__(self, ..., adaptive_lr=False):
        # ... existing init ...

        if adaptive_lr:
            self.lr_scheduler = AdaptiveLROptimizer(
                base_lr=self.config.learning_rate,
                boost_factor=2.0,
                boost_duration=50
            )
        else:
            self.lr_scheduler = None

    def on_drift_detected(self):
        """Called by SafetyMonitor when drift is detected."""
        if self.lr_scheduler:
            self.lr_scheduler.on_drift_detected()

    def update(self, states, actions, rewards):
        # ... existing update logic ...

        # Use adaptive LR if enabled
        if self.lr_scheduler:
            lr = self.lr_scheduler.get_lr()
            for param_group in self.optimizer.param_groups:
                param_group['lr'] = lr

        # ... gradient step ...

        if self.lr_scheduler:
            self.lr_scheduler.step()

Integrate with SafetyMonitor:

# In zoosim/monitoring/guardrails.py, modify SafetyMonitor._trigger_fallback:

def _trigger_fallback(self, reason: str) -> None:
    if not self.in_fallback_mode:
        print(f"[SafetyMonitor] 🚨 FALLBACK TRIGGERED: {reason}")

        # Notify primary policy of drift
        if hasattr(self.primary_policy, 'on_drift_detected'):
            self.primary_policy.on_drift_detected()

        self.in_fallback_mode = True
        # ... rest of fallback logic ...

Part 3: Experiments

# Run drift scenario with fixed vs. adaptive LR

# Fixed LR
agent_fixed = REINFORCEAgent(feature_dim, action_dim, adaptive_lr=False, learning_rate=1e-3)
sim_fixed = simulate_drift_scenario(policy=agent_fixed, n_steps=3000)

# Adaptive LR
agent_adaptive = REINFORCEAgent(feature_dim, action_dim, adaptive_lr=True, learning_rate=1e-3)
sim_adaptive = simulate_drift_scenario(policy=agent_adaptive, n_steps=3000)

# Compare recovery time
recovery_fixed = calculate_recovery_time(sim_fixed.rewards, drift_step=1500)
recovery_adaptive = calculate_recovery_time(sim_adaptive.rewards, drift_step=1500)

print(f"Fixed LR recovery time: {recovery_fixed} episodes")
print(f"Adaptive LR recovery time: {recovery_adaptive} episodes")
print(f"Improvement: {recovery_fixed - recovery_adaptive} episodes ({(recovery_fixed - recovery_adaptive)/recovery_fixed*100:.1f}%)")

Expected Output:

Fixed LR recovery time: 180 episodes
Adaptive LR recovery time: 120 episodes
Improvement: 60 episodes (33.3%)

Analysis:

  1. Adaptive LR accelerates recovery: By increasing \(\alpha\) after drift detection, the policy adapts to the new regime faster (120 vs. 180 episodes).

  2. Trade-off: Higher LR increases gradient noise. If drift is transient (false alarm), the boosted LR can destabilize learning. The boost_duration parameter (50 episodes) mitigates this by reverting to base LR after adaptation.

  3. Optimal boost factor: Doubling (2x) is heuristic. Theory suggests \(\alpha \propto \sqrt{\delta}\). If drift rate increases 4x, optimal boost is 2x. For smaller drifts, smaller boosts suffice.

Recommendation:

Deploy adaptive LR in production with conservative settings: - Boost factor: 1.5x (not 2x, to avoid instability) - Boost duration: 100 episodes (allow full adaptation) - Require 3+ consecutive drift signals before boosting (avoid false positives)


Part B: Practical Labs

Lab 10.1: Implement CUSUM from Scratch

Goal: Implement the CUSUM algorithm [ALG-10.1] and validate against library implementation.

Steps:

  1. Write two_sided_cusum(data, delta, lambda_threshold).
  2. Test on synthetic data with known change-point.
  3. Compare detection delay to zoosim.monitoring.drift.CUSUM.

Starter Code:

import numpy as np
from typing import Optional

def two_sided_cusum(data, delta, lambda_threshold, min_instances=30):
    """Two-sided CUSUM drift detector.

    Args:
        data: Array of observations
        delta: Drift magnitude to detect
        lambda_threshold: Detection threshold
        min_instances: Minimum samples before testing

    Returns:
        detection_index: Episode where drift detected (or None)
    """
    n = len(data)
    mean = 0.0
    sum_pos = 0.0
    sum_neg = 0.0

    for t in range(n):
        # Update mean
        mean = (mean * t + data[t]) / (t + 1)

        if t < min_instances:
            continue

        # Update CUSUM statistics
        sum_pos = max(0.0, sum_pos + (data[t] - mean - delta))
        sum_neg = min(0.0, sum_neg + (data[t] - mean + delta))

        # Test threshold
        if sum_pos > lambda_threshold or sum_neg < -lambda_threshold:
            return t

    return None  # No drift detected

# Test
np.random.seed(42)
data = np.concatenate([
    np.random.normal(0, 1, 1000),   # Pre-drift
    np.random.normal(1, 1, 1000)    # Post-drift (shift=1.0)
])

detection = two_sided_cusum(data, delta=0.5, lambda_threshold=10.0)
print(f"Drift detected at episode: {detection}")
print(f"True change-point: 1000")
print(f"Detection delay: {detection - 1000 if detection else 'N/A'}")

Expected Output:

Drift detected at episode: 1012
True change-point: 1000
Detection delay: 12

Validation against library:

from zoosim.monitoring.drift import CUSUM, CUSUMConfig

detector = CUSUM(CUSUMConfig(delta=0.5, lambda_factor=10.0, min_instances=30))
library_detection = None

for t, val in enumerate(data):
    if detector.update(val):
        library_detection = t
        break

print(f"Library detection: {library_detection}")
print(f"Scratch detection: {detection}")
print(f"Match: {library_detection == detection}")

Expected: Both implementations should detect within 1-2 episodes of each other.


Lab 10.2: Reproduce Drift Experiment

Goal: Run the full drift scenario and analyze sensitivity to drift parameters.

Steps:

  1. Run baseline experiment: python scripts/ch10/ch10_drift_demo.py
  2. Verify fallback triggers around episode 1540
  3. Modify drift timing (\(t_0 = 500\)) and re-run
  4. Modify drift magnitude (smaller CVR shift) and measure detection delay

Experiment 1: Baseline

cd /path/to/rl_search_from_scratch
python scripts/ch10/ch10_drift_demo.py

Expected console output:

Starting Simulation (Steps=3000, Drift@1500)...
Phase 1: T1 (Positive CM2) is optimal.
⚠️  PHASE 2 START: Preference Drift Occurred! (T1 crashes, T3 optimal)
[SafetyMonitor] 🚨 FALLBACK TRIGGERED: Reward Drift Detected
[SafetyMonitor] 🔄 Probing Primary Policy (after 100 steps)
[SafetyMonitor] ✅ Restored Primary Policy

Simulation Complete.
Time to Recover (90% of baseline): 140 episodes
Plot saved to 'ch10_drift_demo.png'.

Experiment 2: Early Drift

Modify ch10_drift_demo.py line 175:

drift_step = 500  # Was 1500

Re-run and observe: - Detection delay should be similar (~30-40 episodes) - But recovery takes longer because the policy had less time to converge in Phase 1

Experiment 3: Smaller Drift

Modify DriftEnvironment.__init__ line 76:

# Original: self.phase2_cvr = np.array([0.05, 0.02, 0.08, 0.15])
# Smaller drift: T1 drops from 0.12 to 0.08 (not 0.02)
self.phase2_cvr = np.array([0.05, 0.08, 0.08, 0.12])

Re-run and observe: - Detection delay increases to ~100-150 episodes (smaller KL divergence) - Fallback may not trigger at all if drift is below Page-Hinkley threshold

Analysis Table:

Scenario Drift Magnitude Detection Delay Recovery Time
Baseline CVR 0.12→0.02 35 eps 140 eps
Early drift (t=500) CVR 0.12→0.02 38 eps 180 eps
Small drift CVR 0.12→0.08 120 eps 250 eps

Lab 10.3: Integrate with Real RL Policy

Goal: Replace static fallback with Thompson Sampling or Best Checkpoint strategy.

Fallback Strategy 1: Thompson Sampling

from zoosim.policies.thompson_sampling import ThompsonSampling

# In ch10_drift_demo.py, replace StaticPolicy with:
safe_policy = ThompsonSampling(
    templates=get_dummy_templates(),
    feature_dim=feature_dim,
    config=ThompsonSamplingConfig()
)

Fallback Strategy 2: Best Known Checkpoint

import copy

class CheckpointFallbackPolicy:
    def __init__(self, primary_policy, checkpoint_interval=100):
        self.primary_policy = primary_policy
        self.checkpoint_interval = checkpoint_interval
        self.checkpoint = None
        self.episode_count = 0

    def select_action(self, features):
        return self.primary_policy.select_action(features)

    def update(self, action, features, reward):
        self.primary_policy.update(action, features, reward)
        self.episode_count += 1

        # Save checkpoint periodically
        if self.episode_count % self.checkpoint_interval == 0:
            self.checkpoint = copy.deepcopy(self.primary_policy)
            print(f"[Checkpoint] Saved at episode {self.episode_count}")

    def restore_checkpoint(self):
        """Called by SafetyMonitor on drift detection."""
        if self.checkpoint:
            self.primary_policy = copy.deepcopy(self.checkpoint)
            print(f"[Checkpoint] Restored to episode {self.episode_count - self.checkpoint_interval}")

# Modify SafetyMonitor to call restore_checkpoint on fallback

Comparison Experiment:

strategies = {
    "Static T0": StaticPolicy(fixed_action=0),
    "Thompson Sampling": ThompsonSampling(...),
    "Best Checkpoint": CheckpointFallbackPolicy(...)
}

results = {}
for name, safe_policy in strategies.items():
    sim_data = simulate_drift_scenario(safe_policy=safe_policy)
    recovery = calculate_recovery_time(sim_data.rewards, drift_step=1500)
    fallback_reward = np.mean([r for r, m in zip(sim_data.rewards, sim_data.modes) if m == 1])
    results[name] = {"recovery": recovery, "fallback_reward": fallback_reward}

# Print comparison table
for name, metrics in results.items():
    print(f"{name:20s} | Recovery: {metrics['recovery']:3d} eps | Fallback Reward: {metrics['fallback_reward']:.2f}")

Expected Output:

Static T0            | Recovery: 140 eps | Fallback Reward: 0.52
Thompson Sampling    | Recovery: 110 eps | Fallback Reward: 0.68
Best Checkpoint      | Recovery:  85 eps | Fallback Reward: 0.91

Analysis:

Best Checkpoint wins because it preserves learned knowledge from Phase 1. Thompson Sampling explores actively during fallback, recovering faster than static but slower than checkpoint restoration.


Lab 10.4: Production Dashboard Prototype

Goal: Build a Plotly Dash dashboard for real-time drift monitoring.

Installation:

pip install dash plotly

Starter Code:

import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
import numpy as np
from collections import deque

# Simulate live data stream
class SimulationStream:
    def __init__(self):
        self.t = 0
        self.drift_step = 1500
        self.data = self._generate_full_data()

    def _generate_full_data(self):
        # Run full simulation in background
        from scripts.ch10.ch10_drift_demo import simulate_drift_scenario
        return simulate_drift_scenario(n_steps=3000)

    def get_next(self):
        """Get next episode's data."""
        if self.t < len(self.data.rewards):
            result = {
                "episode": self.t,
                "reward": self.data.rewards[self.t],
                "action": self.data.actions[self.t],
                "mode": self.data.modes[self.t],
                "stability": self.data.stability[self.t],
                "latency": self.data.latency[self.t],
            }
            self.t += 1
            return result
        return None

# Initialize app
app = dash.Dash(__name__)
stream = SimulationStream()

# Data buffers
MAX_POINTS = 500
rewards = deque(maxlen=MAX_POINTS)
episodes = deque(maxlen=MAX_POINTS)
modes = deque(maxlen=MAX_POINTS)
stabilities = deque(maxlen=MAX_POINTS)

app.layout = html.Div([
    html.H1("Chapter 10: Drift Detection Dashboard", style={'textAlign': 'center'}),
    html.Div(id='alert-banner', style={'textAlign': 'center', 'padding': '10px', 'display': 'none'}),

    html.Div([
        dcc.Graph(id='reward-plot', style={'height': '400px'}),
        dcc.Graph(id='stability-plot', style={'height': '400px'}),
    ]),

    dcc.Interval(
        id='interval-component',
        interval=100,  # Update every 100ms
        n_intervals=0
    )
])

@app.callback(
    [Output('reward-plot', 'figure'),
     Output('stability-plot', 'figure'),
     Output('alert-banner', 'children'),
     Output('alert-banner', 'style')],
    [Input('interval-component', 'n_intervals')]
)
def update_graphs(n):
    # Get next data point
    data = stream.get_next()
    if data is None:
        raise dash.exceptions.PreventUpdate

    episodes.append(data['episode'])
    rewards.append(data['reward'])
    modes.append(data['mode'])
    stabilities.append(data['stability'])

    # Reward plot with fallback highlighting
    reward_trace = go.Scatter(
        x=list(episodes), y=list(rewards),
        mode='lines', name='Reward', line=dict(color='blue', width=2)
    )
    fallback_trace = go.Scatter(
        x=[e for e, m in zip(episodes, modes) if m == 1],
        y=[r for r, m in zip(rewards, modes) if m == 1],
        mode='markers', name='Fallback', marker=dict(color='orange', size=8)
    )
    reward_fig = go.Figure(data=[reward_trace, fallback_trace])
    reward_fig.update_layout(
        title='Reward Over Time',
        xaxis_title='Episode',
        yaxis_title='Reward',
        showlegend=True
    )

    # Stability plot
    stability_trace = go.Scatter(
        x=list(episodes), y=list(stabilities),
        mode='lines', name='Delta-Rank@10', line=dict(color='purple', width=2)
    )
    stability_fig = go.Figure(data=[stability_trace])
    stability_fig.add_hline(y=0.3, line_dash="dash", line_color="red", annotation_text="SLO Threshold")
    stability_fig.update_layout(
        title='Stability (Delta-Rank@10)',
        xaxis_title='Episode',
        yaxis_title='Churn Rate',
        showlegend=True
    )

    # Alert banner
    alert_text = ""
    alert_style = {'textAlign': 'center', 'padding': '10px', 'display': 'none'}

    if data['mode'] == 1:
        alert_text = "🚨 FALLBACK MODE ACTIVE"
        alert_style['backgroundColor'] = '#ff9800'
        alert_style['color'] = 'white'
        alert_style['fontSize'] = '24px'
        alert_style['fontWeight'] = 'bold'
        alert_style['display'] = 'block'
    elif data['stability'] > 0.4:
        alert_text = "⚠️ STABILITY SLO BREACHED"
        alert_style['backgroundColor'] = '#f44336'
        alert_style['color'] = 'white'
        alert_style['fontSize'] = '20px'
        alert_style['display'] = 'block'

    return reward_fig, stability_fig, alert_text, alert_style

if __name__ == '__main__':
    print("Starting dashboard at http://localhost:8050")
    app.run_server(debug=True)

Run:

python ch10_dashboard.py

Open browser to http://localhost:8050.

Expected Behavior:

  • Reward plot: Blue line with orange markers during fallback
  • Stability plot: Purple line with red threshold at 0.3
  • Alert banner: Orange banner appears during fallback mode
  • Real-time updates: Plots update every 100ms (simulating live production monitoring)

Production enhancements:

  1. Database backend: Replace SimulationStream with Prometheus/InfluxDB queries
  2. Multiple metrics: Add GMV, CVR, latency panels
  3. Time windows: Add controls to zoom to last 1hr/24hr/7days
  4. Drill-down: Click on drift event to show raw logs
  5. PagerDuty integration: Trigger on-call alerts when drift detected

Conclusion

These exercises and labs provide hands-on experience with:

  1. Mathematical foundations: Detection delay bounds, tracking error analysis
  2. Algorithm implementation: CUSUM from scratch, multivariate extensions
  3. Production constraints: CM2 floors, stability-performance tradeoffs
  4. Advanced techniques: Adaptive learning rates, checkpoint fallback, real-time dashboards

For further reading on production RL monitoring, see: - [@sculley:hidden_technical_debt:2015] — Technical debt in ML systems - [@breck:ml_test_score:2017] — ML testing best practices - Evidently AI documentation — Drift detection library

Next steps:

  • Extend multivariate drift detection to monitor covariance matrix changes
  • Implement adaptive thresholds that tune \(\lambda\) based on historical false alarm rates
  • Build A/B testing framework to evaluate multiple drift detectors simultaneously