🏆 Level 7 – Final Integration

Project 7.2: "Heart Rate Monitor"

 

What you’ll learn

  • ✅ Pulse detection: Read a KY‑039 finger heartbeat sensor and detect peaks safely.
  • ✅ Heart rate calculation: Convert the time between peaks into BPM and steady it with a rolling average.
  • ✅ OLED and web display: Show live BPM on OLED and serve a tiny web page for classroom viewing.
  • ✅ Abnormal rhythm alerts: Flag low/high BPM ranges and irregular intervals.
  • ✅ Historical record: Log BPM over time to a file and reload summaries.

Blocks glossary (used in this project)

  • ADC sampling: Read analog levels from KY‑039 on a high‑impedance input pin.
  • Smoothing + threshold: Moving average and dynamic threshold for robust peak detection.
  • Peak timing: Use time.ticks_ms and time.ticks_diff to measure the gap between peaks, then compute BPM.
  • Rolling average: Keep 5–10 recent BPM values to stabilize the display.
  • OLED print: Show BPM, status, and last interval.
  • Web server: Minimal HTTP server printing the current BPM.
  • Serial println: Short tags “SENS:…”, “PEAK:…”, “BPM:…”, “OLED:…”, “WEB:…”, “LOG:…”, “ALERT:…”.

What you need

Part                        How many?   Pin connection / Notes
D1 R32 (ESP32)              1           USB cable (30 cm)
KY‑039 heartbeat sensor     1           Signal OUT → Pin 34 (ADC), VCC, GND
OLED SSD1306 128×64         1           I2C: SCL→22, SDA→21, VCC, GND
LED (optional alert)        1           Pin 13 (with resistor), VCC, GND

Notes

  • Rest your finger lightly on the KY‑039; pressing hard restricts blood flow.
  • Reduce ambient light with a small cover around the sensor for stable readings.
  • Share ground across all modules; keep sensor wires short.

Before you start

  • KY‑039 wired: OUT→34, VCC, GND; OLED on 22/21
  • Room lighting steady; finger ready for testing
  • Serial monitor open and shows:
print("Ready!")  # Confirm serial is working so you can see messages

Microprojects 1–5

Microproject 7.2.1 – Pulse detection basics

Goal: Sample ADC, smooth the signal, and detect peaks with a dynamic threshold.
Blocks used:

  • ADC read: 12‑bit.
  • Moving average + threshold: Simple smoothing.

MicroPython code:

import machine  # Import machine to access ADC pins
import time  # Import time for delays and timestamps

adc = machine.ADC(machine.Pin(34))  # Create ADC on Pin 34 for KY-039
adc.atten(machine.ADC.ATTN_11DB)  # Set attenuation for wider voltage range
adc.width(machine.ADC.WIDTH_12BIT)  # Set resolution to 12 bits (0–4095)
print("SENS:ADC READY PIN=34")  # Print sensor readiness

avg = adc.read()  # Seed the moving average with a real reading so the first sample is not a false peak
alpha = 0.2  # Set smoothing factor (0–1), higher reacts faster
thr_gain = 0.15  # Set threshold gain relative to average
debounce_ms = 280  # Set minimum time between peaks to avoid double counts
last_peak_ms = 0  # Initialize last peak timestamp
print("SENS:FILTER alpha", alpha, "thr_gain", thr_gain)  # Print filter and threshold config

def sample_and_detect():  # Define function to sample and detect peaks
    global avg, last_peak_ms  # Use global variables for state
    raw = adc.read()  # Read raw ADC value
    avg = int((1 - alpha) * avg + alpha * raw)  # Update moving average
    thr = avg + int(thr_gain * avg)  # Compute dynamic threshold
    now = time.ticks_ms()  # Get current time in ms
    is_peak = False  # Initialize peak flag to False
    if raw > thr:  # If raw exceeds threshold
        if time.ticks_diff(now, last_peak_ms) > debounce_ms:  # If beyond debounce window
            last_peak_ms = now  # Update last peak timestamp
            is_peak = True  # Set peak flag
            print("PEAK:DET raw", raw, "avg", avg, "thr", thr)  # Print peak detection line
    return raw, avg, thr, is_peak  # Return values and peak flag

# Demo: sample for 2 seconds
start = time.ticks_ms()  # Record start time
while time.ticks_diff(time.ticks_ms(), start) < 2000:  # Loop for ~2 seconds
    raw, avg_val, thr_val, peak = sample_and_detect()  # Sample and detect
    time.sleep(0.02)  # Small delay to control sampling rate

Reflection: A moving average plus a flexible threshold is enough to see real heartbeats.
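To see why the detector behaves this way without a sensor attached, the same idea can be tried on a made-up signal. This is a minimal sketch, not the project code: the function name detect_peaks, the synthetic pulse shape (a +600 bump lasting 60 ms every 800 ms on a baseline of 1000), and the use of a float average instead of the int() rounding above are all assumptions chosen for illustration.

```python
def detect_peaks(samples, dt_ms, alpha=0.2, thr_gain=0.15, debounce_ms=280):
    """Return the times (ms) at which a sample crosses the dynamic threshold."""
    avg = samples[0]             # Seed the average with the first reading
    last_peak = -debounce_ms     # Lets a peak be accepted right at the start
    peaks = []
    for i, raw in enumerate(samples):
        now = i * dt_ms                          # Simulated timestamp
        avg = (1 - alpha) * avg + alpha * raw    # Moving (exponential) average
        thr = avg * (1 + thr_gain)               # Threshold floats above the average
        if raw > thr and now - last_peak > debounce_ms:
            peaks.append(now)                    # Accept this peak
            last_peak = now                      # Start the debounce window
    return peaks

# Synthetic 5-second signal sampled every 20 ms (assumed values, not real sensor data)
dt_ms = 20
samples = []
for i in range(250):
    phase = (i * dt_ms) % 800                    # Position inside each 800 ms beat
    samples.append(1000 + (600 if 400 <= phase < 460 else 0))

peaks = detect_peaks(samples, dt_ms)
print("PEAK times:", peaks)   # [400, 1200, 2000, 2800, 3600, 4400]
```

Each bump spans three samples, yet only the first one is counted because the other two fall inside the 280 ms debounce window. The peaks land 800 ms apart, which corresponds to 75 BPM.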
Challenge:

  • Easy: Print a tiny bar using raw and avg for visualization.
  • Harder: Add ambient light shielding and compare results.

Microproject 7.2.2 – Heart rate calculation in BPM

Goal: Measure time between peaks (RR interval) and compute BPM with rolling average.
Blocks used:

  • RR interval: ms between peaks.
  • Rolling average: 5‑sample mean.

MicroPython code:

bpm = 0  # Initialize BPM value
rr_list = []  # Initialize list to store recent RR intervals (ms)
max_rr = 5  # Set number of intervals to average
print("BPM:INIT")  # Print BPM init line

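prev_peak_ms = None  # Time of the previous peak; None until the first peak arrives

def update_bpm(is_peak):  # Define function to update BPM when a peak occurs
    global bpm, prev_peak_ms  # Use global bpm and previous-peak timestamp
    if not is_peak:  # If no peak detected
        return bpm  # Return current BPM unchanged
    now = time.ticks_ms()  # Read current time
    if prev_peak_ms is None:  # First peak: nothing to measure against yet
        prev_peak_ms = now  # Remember it and wait for the next peak
        return bpm  # Return current BPM unchanged
    # sample_and_detect() has already set last_peak_ms to this peak, so measure from our own copy
    rr = time.ticks_diff(now, prev_peak_ms)  # Compute RR interval in ms
    prev_peak_ms = now  # This peak becomes the reference for the next interval
    if rr > 300 and rr < 2000:  # If interval is plausible (30–200 BPM)
        rr_list.append(rr)  # Append interval
        if len(rr_list) > max_rr:  # If too many intervals stored
            rr_list.pop(0)  # Remove oldest to keep list size
        mean_rr = sum(rr_list) / len(rr_list)  # Compute average RR
        bpm = int(60000 / mean_rr)  # Convert average interval to BPM
        print("BPM:UPDATE rr", rr, "mean_rr", int(mean_rr), "bpm", bpm)  # Print update line
    return bpm  # Return current BPM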

# Demo: compute bpm inside sampling loop
start = time.ticks_ms()  # Record start time
while time.ticks_diff(time.ticks_ms(), start) < 2000:  # Loop 2 seconds
    raw, avg_val, thr_val, peak = sample_and_detect()  # Sample and detect
    bpm_now = update_bpm(peak)  # Update BPM if peak
    time.sleep(0.02)  # Pacing delay

Reflection: Averaging a few RR intervals makes BPM calm and believable.
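The arithmetic behind that calm reading is small enough to check by hand. The sketch below uses a hypothetical helper, bpm_from_rr, and made-up interval lists; it only illustrates the formula BPM = 60000 / mean RR in ms.

```python
def bpm_from_rr(rr_list_ms):
    """Convert RR intervals (ms) to BPM using their mean."""
    mean_rr = sum(rr_list_ms) / len(rr_list_ms)  # Average gap between beats
    return int(60000 / mean_rr)                  # 60,000 ms in one minute

steady = [820, 840, 830, 850, 810]       # Made-up intervals, mean 830 ms
one_short = [830, 830, 830, 830, 600]    # Same rhythm with one short interval

print(bpm_from_rr(steady))      # 72
print(bpm_from_rr([600]))       # 100 (a single short interval on its own)
print(bpm_from_rr(one_short))   # 76 (the average absorbs most of the jump)
```

A single 600 ms interval would read as 100 BPM by itself, but averaged with four normal intervals it moves the display only from 72 to 76.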
Challenge:

  • Easy: Print BPM only when you have 3+ intervals.
  • Harder: Add a timeout to display “BPM:—” if no peaks for 3 seconds.

Microproject 7.2.3 – OLED live display

Goal: Show BPM, RR, and signal level on an SSD1306 OLED.
Blocks used:

  • Title + lines: Simple text layout.
  • Refresh cadence: ~5–10 Hz.

MicroPython code:

import oled128x64  # Import OLED driver for SSD1306 128x64

i2c = machine.SoftI2C(scl=machine.Pin(22), sda=machine.Pin(21), freq=100000)  # Create I2C bus on pins 22/21
oled = oled128x64.OLED(i2c, address=0x3c, font_address=0x3A0000, types=0)  # Initialize OLED driver
print("OLED:READY 0x3C")  # Print OLED readiness

last_rr = 0  # Initialize last RR interval
def show_oled(bpm_value, raw_value, avg_value, rr_value):  # Define OLED display function
    oled.clear()  # Clear the screen
    oled.shows('Heart Monitor', x=0, y=0, size=1, space=0, center=False)  # Title line
    oled.shows('BPM:'+str(bpm_value), x=0, y=12, size=1, space=0, center=False)  # BPM line
    oled.shows('RR:'+str(rr_value)+' ms', x=0, y=24, size=1, space=0, center=False)  # RR interval line
    oled.shows('RAW:'+str(raw_value), x=0, y=36, size=1, space=0, center=False)  # Raw signal line
    oled.shows('AVG:'+str(avg_value), x=0, y=48, size=1, space=0, center=False)  # Average signal line
    oled.show()  # Refresh OLED
    print("OLED:UPDATE bpm", bpm_value, "rr", rr_value)  # Print update line

# Demo: short OLED loop
start = time.ticks_ms()  # Record start time
while time.ticks_diff(time.ticks_ms(), start) < 1500:  # Loop 1.5 seconds
    raw, avg_val, thr_val, peak = sample_and_detect()  # Sample and detect
    bpm_now = update_bpm(peak)  # Update BPM
    last_rr = rr_list[-1] if rr_list else 0  # Most recent accepted RR interval (0 until one is measured)
    show_oled(bpm_now, raw, avg_val, last_rr)  # Display values
    time.sleep(0.1)  # Pacing delay for readability

Reflection: A tiny dashboard makes the rhythm visible—students recognize “calm” vs “jitter.”
Challenge:

  • Easy: Add a bar graph for RAW.
  • Harder: Blink an LED on each peak.

Microproject 7.2.4 – Abnormal rhythm alerts

Goal: Flag low/high BPM and irregular RR spacing; print alerts.
Blocks used:

  • Ranges: Low < 50, High > 120 (demo).
  • Spread: Max minus min of the last RR intervals (a simple stand-in for variance).

MicroPython code:

LOW_BPM = 50  # Set low BPM threshold for alert
HIGH_BPM = 120  # Set high BPM threshold for alert
print("ALERT:THR low", LOW_BPM, "high", HIGH_BPM)  # Print alert thresholds

def check_alerts(bpm_value):  # Define function to check BPM alerts
    if bpm_value == 0:  # If BPM not available yet
        return "WAIT"  # Return wait status
    if bpm_value < LOW_BPM:  # If BPM below low threshold
        print("ALERT:LOW", bpm_value)  # Print low alert
        return "LOW"  # Return low status
    if bpm_value > HIGH_BPM:  # If BPM above high threshold
        print("ALERT:HIGH", bpm_value)  # Print high alert
        return "HIGH"  # Return high status
    return "OK"  # Return normal status

def rr_variance_status():  # Define function to check RR irregularity
    if len(rr_list) < 3:  # If not enough intervals
        return "VAR:WAIT"  # Return waiting status
    spread = max(rr_list) - min(rr_list)  # Compute spread (max minus min) among RR values
    if spread > 250:  # If spread exceeds 250 ms
        print("ALERT:IRREG spread", spread)  # Print irregular alert
        return "IRREG"  # Return irregular status
    return "REG"  # Return regular status

# Demo: run checks for ~1.5 s
start = time.ticks_ms()  # Record start time
while time.ticks_diff(time.ticks_ms(), start) < 1500:  # Loop 1.5 seconds
    raw, avg_val, thr_val, peak = sample_and_detect()  # Sample and detect
    bpm_now = update_bpm(peak)  # Update BPM
    status = check_alerts(bpm_now)  # Check BPM range
    rrs = rr_variance_status()  # Check RR variance
    time.sleep(0.1)  # Small delay

Reflection: Simple ranges and variance flags help students interpret rhythm quality at a glance.
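The irregularity check above uses the spread (max minus min) rather than a true variance. For comparison, here is a small sketch that computes both the spread and the standard deviation on two made-up interval lists. The helper names rr_spread and rr_std and the sample data are assumptions for illustration, and the standard deviation is an alternative measure, not what the project code uses.

```python
def rr_spread(rr):
    """Largest minus smallest interval, in ms."""
    return max(rr) - min(rr)

def rr_std(rr):
    """Population standard deviation of the intervals, in ms."""
    mean = sum(rr) / len(rr)
    variance = sum((x - mean) ** 2 for x in rr) / len(rr)
    return variance ** 0.5

steady = [820, 840, 830, 850, 810]    # Made-up calm rhythm
jumpy = [820, 600, 900, 1100, 700]    # Made-up uneven rhythm

for name, rr in (("steady", steady), ("jumpy", jumpy)):
    flag = "IRREG" if rr_spread(rr) > 250 else "REG"   # Same 250 ms limit as above
    print(name, "spread", rr_spread(rr), "std", round(rr_std(rr), 1), flag)
```

The spread needs only two comparisons per list, which suits a small board. The standard deviation is less sensitive to a single odd interval but costs more arithmetic.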
Challenge:

  • Easy: Show status text on OLED.
  • Harder: Add a quiet LED pulse on LOW/HIGH alerts.

Microproject 7.2.5 – Historical rhythm record (file)

Goal: Log BPM with timestamps to a file and read back a summary.
Blocks used:

  • File I/O: Append lines, read all.
  • Summary: Min/Max/Avg BPM.

MicroPython code:

log_name = "bpm_log.txt"  # Define log filename
print("LOG:FILE", log_name)  # Print log filename

def ts():  # Define function to get seconds since boot
    return str(time.ticks_ms() // 1000)  # Return seconds as string

def log_bpm(bpm_value):  # Define function to append BPM to log
    try:  # Try to write file
        with open(log_name, "a") as f:  # Open file in append mode
            f.write(ts() + "," + str(bpm_value) + "\n")  # Write "time,bpm" line
        print("LOG:WRITE bpm", bpm_value)  # Print write confirmation
    except OSError:  # If write fails
        print("LOG:ERR_WRITE")  # Print error

def read_summary():  # Define function to read file and compute summary
    try:  # Try to read file
        with open(log_name, "r") as f:  # Open file in read mode
            lines = f.readlines()  # Read all lines into list
        vals = [int(line.strip().split(",")[1]) for line in lines if "," in line]  # Extract BPM values
        if not vals:  # If no values found
            print("LOG:EMPTY")  # Print empty log
            return 0,0,0  # Return zeros
        mn, mx = min(vals), max(vals)  # Compute min and max
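        avg_bpm = int(sum(vals) / len(vals))  # Compute average (named avg_bpm to avoid confusion with the sensor's avg)
        print("LOG:SUMMARY min", mn, "max", mx, "avg", avg_bpm)  # Print summary
        return mn, mx, avg_bpm  # Return summary tuple
    except (OSError, ValueError):  # If read fails or a line holds a non-numeric BPM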
        print("LOG:ERR_READ")  # Print error
        return 0,0,0  # Return zeros

# Demo: log a few BPM samples
for i in range(3):  # Iterate three samples
    raw, avg_val, thr_val, peak = sample_and_detect()  # Sample and detect
    bpm_now = update_bpm(peak)  # Update BPM
    log_bpm(bpm_now)  # Write to log
    time.sleep(0.2)  # Small delay
read_summary()  # Read back summary

Reflection: Logs turn a minute of data into a story—min, max, and average tell how steady you were.
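A log that has been through a few power cuts may contain half-written lines, and it will contain zeros from moments when no BPM was available. The sketch below shows one way to summarize such a log from a list of lines. The function name summarize, the choice to skip zero readings, and the sample lines are assumptions for illustration.

```python
def summarize(lines):
    """Return (min, max, avg) BPM from 'time,bpm' lines, or None if nothing usable."""
    vals = []
    for line in lines:
        parts = line.strip().split(",")
        if len(parts) < 2:           # No comma: not a log line
            continue
        try:
            value = int(parts[1])    # Second column holds the BPM
        except ValueError:           # Non-numeric BPM: skip the line
            continue
        if value > 0:                # Assumption: 0 means "no reading yet"
            vals.append(value)
    if not vals:
        return None
    return min(vals), max(vals), sum(vals) // len(vals)

# Made-up log content, including a zero, a broken line, and a non-numeric value
sample = ["12,71\n", "17,74\n", "22,0\n", "garbage\n", "27,abc\n", "32,77\n"]
print(summarize(sample))   # (71, 77, 74)
```

To use it with the file, the lines can come from f.readlines() in the same way read_summary() reads them.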
Challenge:

  • Easy: Add “status” column (OK/LOW/HIGH).
  • Harder: Roll a new file each session with a timestamped name.

Main project – Heart rate monitor with OLED and web

Blocks steps (with glossary)

  • Sampling + smoothing: KY‑039 ADC read with moving average and dynamic threshold.
  • Peak → BPM: RR intervals converted to BPM with rolling average.
  • OLED dashboard: BPM/RR/raw/avg shown clearly.
  • Alerts: Low/high BPM and irregular RR variance flags.
  • Logging: Append BPM to a file; compute summary.
  • Web server: Serve a tiny page with current BPM and status.
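The web server step only helps the class if the board is on a Wi‑Fi network, and the main listing does not join one itself. Unless an earlier project or boot.py already does this, something like the following sketch could run first. It uses MicroPython's network module; the function name connect_wifi and the SSID and password shown are placeholders, not values from this project. The import guard is only there so the sketch also loads on a PC, where it returns None.

```python
import time

try:
    import network                 # Present on MicroPython boards with Wi-Fi
except ImportError:
    network = None                 # Lets the sketch load on a PC without crashing

def connect_wifi(ssid, password, tries=20):
    """Join a Wi-Fi network; return the board's IP address, or None on failure."""
    if network is None or not hasattr(network, "WLAN"):
        return None                            # Not running on a Wi-Fi capable board
    wlan = network.WLAN(network.STA_IF)        # Station (client) interface
    wlan.active(True)                          # Switch the radio on
    if not wlan.isconnected():
        wlan.connect(ssid, password)           # Start joining the network
        for _ in range(tries):                 # Wait up to about tries * 0.5 s
            if wlan.isconnected():
                break
            time.sleep(0.5)
    if not wlan.isconnected():
        return None
    return wlan.ifconfig()[0]                  # First item is the IP address

ip = connect_wifi("ClassroomWiFi", "change-me")   # Placeholder credentials
print("WEB:IP", ip)
```

The printed address is what classmates would type into a browser to reach the page.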

MicroPython code (mirroring blocks)

# Project 7.2 – Heart Rate Monitor (KY-039 + BPM + OLED + Web + Logging)

import machine  # Import machine for ADC and pins
import time  # Import time for timestamps and delays
import oled128x64  # Import OLED driver for SSD1306 display
import socket  # Import socket for simple web server

# ===== Sensor and smoothing =====
adc = machine.ADC(machine.Pin(34))  # Create ADC on Pin 34
adc.atten(machine.ADC.ATTN_11DB)  # Set attenuation
adc.width(machine.ADC.WIDTH_12BIT)  # Set resolution
print("INIT:SENS ADC=34")  # Print sensor init

avg = adc.read()  # Seed moving average with a real reading to avoid a false first peak
alpha = 0.2  # Smoothing factor
thr_gain = 0.15  # Threshold gain
debounce_ms = 280  # Minimum ms between peaks
last_peak_ms = 0  # Last peak timestamp
rr_list = []  # Recent RR intervals list
bpm = 0  # Current BPM
print("INIT:FILTER alpha", alpha, "thr_gain", thr_gain, "debounce", debounce_ms)  # Print filter settings

def sample_and_detect():  # Sample ADC and detect peaks
    global avg, last_peak_ms  # Use globals for state
    raw = adc.read()  # Read raw ADC
    avg = int((1 - alpha) * avg + alpha * raw)  # Update moving average
    thr = avg + int(thr_gain * avg)  # Dynamic threshold
    now = time.ticks_ms()  # Current ms
    is_peak = False  # Peak flag
    if raw > thr:  # If raw exceeds threshold
        if time.ticks_diff(now, last_peak_ms) > debounce_ms:  # If beyond debounce
            last_peak_ms = now  # Update last peak time
            is_peak = True  # Mark peak
            print("PEAK:DET raw", raw, "avg", avg, "thr", thr)  # Print detection
    return raw, avg, thr, is_peak  # Return values

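prev_peak_ms = None  # Previous peak time; None until the first peak

def update_bpm(is_peak):  # Update BPM from peak
    global bpm, prev_peak_ms  # Use global bpm and previous-peak time
    if not is_peak:  # If no peak
        return bpm  # Keep BPM
    now = time.ticks_ms()  # Current time
    if prev_peak_ms is None:  # First peak has no earlier peak to compare with
        prev_peak_ms = now  # Remember it for the next interval
        return bpm  # Keep BPM
    rr = time.ticks_diff(now, prev_peak_ms)  # RR interval ms (last_peak_ms already points at this peak)
    prev_peak_ms = now  # Current peak becomes the reference
    if rr > 300 and rr < 2000:  # If plausible 30–200 BPM
        rr_list.append(rr)  # Store interval
        if len(rr_list) > 5:  # Limit list size
            rr_list.pop(0)  # Remove oldest
        mean_rr = sum(rr_list) / len(rr_list)  # Average interval
        bpm = int(60000 / mean_rr)  # Convert to BPM
        print("BPM:UPDATE rr", rr, "mean_rr", int(mean_rr), "bpm", bpm)  # Print update
    return bpm  # Return BPM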

# ===== OLED dashboard =====
i2c = machine.SoftI2C(scl=machine.Pin(22), sda=machine.Pin(21), freq=100000)  # Create I2C bus
oled = oled128x64.OLED(i2c, address=0x3c, font_address=0x3A0000, types=0)  # Initialize OLED
print("INIT:OLED 0x3C")  # Print OLED ready

def show_oled(bpm_value, raw_value, avg_value, rr_value, status_text):  # Display on OLED
    oled.clear()  # Clear screen
    oled.shows('Heart Monitor', x=0, y=0, size=1, space=0, center=False)  # Title
    oled.shows('BPM:'+str(bpm_value), x=0, y=12, size=1, space=0, center=False)  # BPM line
    oled.shows('RR:'+str(rr_value)+' ms', x=0, y=24, size=1, space=0, center=False)  # RR line
    oled.shows('RAW:'+str(raw_value), x=0, y=36, size=1, space=0, center=False)  # RAW line
    oled.shows('AVG:'+str(avg_value), x=0, y=48, size=1, space=0, center=False)  # AVG line
    oled.show()  # Refresh OLED
    print("OLED:UPDATE", status_text)  # Print status text

# ===== Alerts =====
LOW_BPM = 50  # Low BPM threshold
HIGH_BPM = 120  # High BPM threshold
print("INIT:ALERT low", LOW_BPM, "high", HIGH_BPM)  # Print thresholds

def check_alerts(bpm_value):  # Return status string
    if bpm_value == 0:  # If BPM not computed yet
        return "WAIT"  # Return wait status
    if bpm_value < LOW_BPM:  # If below low threshold
        print("ALERT:LOW", bpm_value)  # Print alert
        return "LOW"  # Return low status
    if bpm_value > HIGH_BPM:  # If above high threshold
        print("ALERT:HIGH", bpm_value)  # Print alert
        return "HIGH"  # Return high status
    # Check RR irregularity
    if len(rr_list) >= 3:  # If enough intervals
        spread = max(rr_list) - min(rr_list)  # Compute spread ms
        if spread > 250:  # If irregular
            print("ALERT:IRREG spread", spread)  # Print irregular
            return "IRREG"  # Return irregular status
    return "OK"  # Return normal

# ===== Logging =====
log_name = "bpm_log.txt"  # Log file name
print("INIT:LOG", log_name)  # Print log name

def ts():  # Seconds since boot
    return str(time.ticks_ms() // 1000)  # Return seconds

def log_bpm(bpm_value):  # Append BPM to file
    try:  # Try write
        with open(log_name, "a") as f:  # Open append
            f.write(ts() + "," + str(bpm_value) + "\n")  # Write time,bpm
        print("LOG:WRITE", bpm_value)  # Print write
    except OSError:  # On error
        print("LOG:ERR_WRITE")  # Print error

# ===== Web server (tiny) =====
def start_web():  # Start minimal HTTP server
    addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]  # Resolve address
    srv = socket.socket()  # Create TCP socket
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # Allow re-binding after a soft reset
    srv.bind(addr)  # Bind to port 80
    srv.listen(1)  # Listen for one client
    srv.settimeout(0.05)  # Short timeout to stay responsive
    print("WEB:LISTEN", addr)  # Print web listen
    return srv  # Return server socket

def page_html(bpm_value, status_text):  # Build HTML page
    body = "<html><body><h1>Heart Monitor</h1>"  # Start HTML body
    body += "<p>BPM: " + str(bpm_value) + "</p>"  # Add BPM line
    body += "<p>Status: " + status_text + "</p>"  # Add status line
    body += "</body></html>"  # Close HTML body
    return "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n" + body  # Build response

srv = start_web()  # Start web server

# ===== Main loop =====
print("RUN:Heart Monitor")  # Announce start
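last_rr = 0  # Initialize last RR value
last_log = time.ticks_ms()  # Time of the last log write
LOG_EVERY_MS = 5000  # Log every 5 s to limit flash wear (tune as needed)

while True:  # Continuous monitoring loop
    raw, avg_val, thr_val, peak = sample_and_detect()  # Sample and detect peak
    bpm_now = update_bpm(peak)  # Update BPM from peaks
    last_rr = rr_list[-1] if rr_list else 0  # Most recent accepted RR interval
    status = check_alerts(bpm_now)  # Evaluate alerts and status
    show_oled(bpm_now, raw, avg_val, last_rr, status)  # Update OLED display
    now = time.ticks_ms()  # Current time for the log timer
    if bpm_now > 0 and time.ticks_diff(now, last_log) >= LOG_EVERY_MS:  # Only valid BPM, at most every 5 s
        log_bpm(bpm_now)  # Append BPM to log file
        last_log = now  # Restart the log timer
    # Serve a single web client if present
    try:  # Try accept client
        cl, remote = srv.accept()  # Accept a client connection
    except OSError:  # On timeout (no client waiting)
        cl = None  # Nothing to serve this round
    if cl:  # If a client connected
        try:  # Try to answer the request
            cl.settimeout(0.5)  # Do not let a slow client stall sampling
            req = cl.recv(256)  # Read request bytes
            line = req.split(b'\r\n')[0]  # First request line, kept as bytes (no decode needed)
            print("WEB:REQ", line)  # Print request line
            cl.send(page_html(bpm_now, status).encode())  # Send HTML response
        except OSError:  # On client timeout or disconnect
            pass  # Continue monitoring
        finally:  # Always release the socket
            cl.close()  # Close client socket
    time.sleep(0.08)  # Small delay to set sampling cadence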

External explanation

  • What it teaches: You sampled a heartbeat sensor, stabilized the signal, detected peaks, turned intervals into BPM, displayed the data on OLED, served a tiny web page, raised simple alerts, and logged history.
  • Why it works: Smoothing plus a dynamic threshold capture real peaks; RR intervals translate directly to BPM; rolling averages keep readings steady; visual feedback and logs make the rhythm understandable.
  • Key concept: “Sense → smooth → detect → time → display → share.”

Story time

You place a finger on the sensor, and the numbers begin to breathe. Each tiny rise becomes a “PEAK:DET”, then BPM settles into a calm 72. On the OLED, the rhythm looks steady; on the web page, your classmates see it too. Simple signal, honest heartbeat.


Debugging (2)

Debugging 7.2.1 – Pulse not detected

Problem: No PEAK:DET lines; BPM stays 0.
Clues: RAW barely moves; AVG equals RAW; threshold too high.
Broken code:

thr_gain = 0.4  # Threshold too aggressive for small signals

Fixed code:

thr_gain = 0.15  # Softer threshold tracks finger pulses better
alpha = 0.25  # Slightly faster average to reflect pulses
print("DEBUG:THR alpha", alpha, "thr_gain", thr_gain)  # Verify settings

Why it works: Lower threshold and a quicker average give peaks room to rise above the baseline.
Avoid next time: Shield the sensor from ambient light and avoid pressing too hard.
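A quick calculation shows how much room each setting leaves. This sketch is a simplification: it treats the baseline as fixed during the beat, whereas the real moving average rises a little with each pulse. The helper name crosses and the numbers (baseline 1800, pulse height 400) are made up for illustration.

```python
def crosses(baseline, pulse_height, thr_gain):
    """True if a pulse sitting on the baseline would exceed the threshold."""
    thr = baseline + int(thr_gain * baseline)   # Same threshold formula as the project
    return baseline + pulse_height > thr

baseline, pulse = 1800, 400            # Made-up ADC values
print(crosses(baseline, pulse, 0.4))   # False: threshold near 2520, pulse tops out at 2200
print(crosses(baseline, pulse, 0.15))  # True: threshold near 2070, pulse clears it
```

With thr_gain at 0.4 the pulse would need to rise more than 720 counts above this baseline, which a finger signal of this size does not reach.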

Debugging 7.2.2 – Incorrect rhythm calculation

Problem: BPM jumps wildly; RR includes duplicates.
Clues: Debounce too short; double peaks per beat.
Broken code:

debounce_ms = 120  # Allows double counts on noisy signals

Fixed code:

debounce_ms = 280  # Reject re-peaks within 280 ms
rr_list = rr_list[-5:]  # Keep last 5 intervals only
print("DEBUG:DEBOUNCE", debounce_ms)  # Confirm new debounce

Why it works: A longer debounce prevents double counting; limiting RR window stabilizes BPM.
Avoid next time: Tune debounce with prints while watching RAW and PEAK:DET events.
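The effect of the debounce window can be tried on a list of timestamps. In this sketch the helper name apply_debounce and the candidate times are assumptions: real beats every 800 ms, each followed by an echo 150 ms later.

```python
def apply_debounce(candidates_ms, debounce_ms):
    """Keep a candidate only if it is more than debounce_ms after the last kept one."""
    accepted = []
    for t in candidates_ms:
        if not accepted or t - accepted[-1] > debounce_ms:
            accepted.append(t)
    return accepted

# Made-up threshold crossings: a beat every 800 ms plus an echo 150 ms after each
candidates = [0, 150, 800, 950, 1600, 1750, 2400, 2550]

print(apply_debounce(candidates, 120))   # All 8 kept: every beat is counted twice
print(apply_debounce(candidates, 280))   # [0, 800, 1600, 2400]
```

With the 120 ms window the gaps alternate between 150 ms and 650 ms. The 150 ms gaps fail the 300 ms plausibility check, so only the 650 ms gaps are averaged and the reading drifts to about 92 BPM instead of 75.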


Final checklist

  • ADC sampling prints SENS:ADC READY and runs smoothly
  • PEAK:DET lines appear with threshold and averages shown
  • BPM updates via RR intervals and stays stable with rolling average
  • OLED shows BPM, RR, RAW, AVG clearly and updates regularly
  • Alerts print LOW/HIGH/IRREG when conditions occur
  • Web page serves current BPM and status; log file records samples

Extras

  • 🧠 Student tip: Sit still, relax your hand, and lightly cover the sensor—clean signals make everything easier.
  • 🧑‍🏫 Instructor tip: Compare alpha and thr_gain across teams; discuss how smoothing and thresholds affect detection.
  • 📖 Glossary:
    • RR interval: Time between consecutive heartbeats.
    • BPM: Beats per minute, computed from RR intervals.
    • Debounce: Minimum time that must pass between detections so the same beat is not counted twice.
  • 💡 Mini tips:
    • If your class uses colored covers over the sensor, average 3–5 readings and keep ambient light consistent.
    • Start with LOW_BPM=50 and HIGH_BPM=120; adjust only after observing class signals.
    • Keep loops responsive with short sleeps; avoid blocking calls in the web server.