Project 7.2: "Heart Rate Monitor"
What you’ll learn
- ✅ Pulse detection: Read a KY‑039 finger heartbeat sensor and detect peaks safely.
- ✅ Heart rate calculation: Convert the time between peaks (RR interval) into BPM and steady it with a rolling average.
- ✅ OLED and web display: Show live BPM on OLED and serve a tiny web page for classroom viewing.
- ✅ Abnormal rhythm alerts: Flag low/high BPM ranges and irregular intervals.
- ✅ Historical record: Log BPM over time to a file and reload summaries.
Blocks glossary (used in this project)
- ADC sampling: Read analog levels from KY‑039 on a high‑impedance input pin.
- Smoothing + threshold: Moving average and dynamic threshold for robust peak detection.
- Peak timing: Use time.ticks_ms between peaks to compute BPM.
- Rolling average: Keep the last 5 RR intervals and average them to stabilize the displayed BPM.
- OLED print: Show BPM, status, and last interval.
- Web server: Minimal HTTP server printing the current BPM.
- Serial println: Short tags “SENS:…”, “PEAK:…”, “BPM:…”, “OLED:…”, “WEB:…”, “LOG:…”, “ALERT:…”.
What you need
| Part | How many? | Pin connection / Notes |
|---|---|---|
| D1 R32 (ESP32) | 1 | USB cable (30 cm) |
| KY‑039 heartbeat sensor | 1 | Signal OUT → Pin 34 (ADC), VCC, GND |
| OLED SSD1306 128×64 | 1 | I2C: SCL→22, SDA→21, VCC, GND |
| LED (optional alert) | 1 | Pin 13 → resistor → LED → GND |
Notes
- Keep finger lightly on KY‑039; avoid pressing hard (it blocks blood flow).
- Reduce ambient light with a small cover around the sensor for stable readings.
- Share ground across all modules; keep sensor wires short.
Before you start
- KY‑039 wired: OUT→34, VCC, GND; OLED on 22/21
- Room lighting steady; finger ready for testing
- Serial monitor open and shows:
print("Ready!") # Confirm serial is working so you can see messages
Microprojects 1–5
Microproject 7.2.1 – Pulse detection basics
Goal: Sample ADC, smooth the signal, and detect peaks with a dynamic threshold.
Blocks used:
- ADC read: 12‑bit.
- Moving average + threshold: Simple smoothing.
MicroPython code:
import machine # Import machine to access ADC pins
import time # Import time for delays and timestamps
adc = machine.ADC(machine.Pin(34)) # Create ADC on Pin 34 for KY-039
adc.atten(machine.ADC.ATTN_11DB) # Set attenuation for wider voltage range
adc.width(machine.ADC.WIDTH_12BIT) # Set resolution to 12 bits (0–4095)
print("SENS:ADC READY PIN=34") # Print sensor readiness
avg = adc.read() # Seed the moving average with a first reading so start-up does not flag a false peak
alpha = 0.2 # Set smoothing factor (0–1), higher reacts faster
thr_gain = 0.15 # Set threshold gain relative to average
debounce_ms = 280 # Set minimum time between peaks to avoid double counts
last_peak_ms = 0 # Initialize last peak timestamp
print("SENS:FILTER alpha", alpha, "thr_gain", thr_gain) # Print filter and threshold config
def sample_and_detect(): # Define function to sample and detect peaks
    global avg, last_peak_ms # Use global variables for state
    raw = adc.read() # Read raw ADC value
    avg = int((1 - alpha) * avg + alpha * raw) # Update moving average
    thr = avg + int(thr_gain * avg) # Compute dynamic threshold
    now = time.ticks_ms() # Get current time in ms
    is_peak = False # Initialize peak flag to False
    if raw > thr: # If raw exceeds threshold
        if time.ticks_diff(now, last_peak_ms) > debounce_ms: # If beyond debounce window
            last_peak_ms = now # Update last peak timestamp
            is_peak = True # Set peak flag
            print("PEAK:DET raw", raw, "avg", avg, "thr", thr) # Print peak detection line
    return raw, avg, thr, is_peak # Return values and peak flag
# Demo: sample for 2 seconds
start = time.ticks_ms() # Record start time
while time.ticks_diff(time.ticks_ms(), start) < 2000: # Loop for ~2 seconds
    raw, avg_val, thr_val, peak = sample_and_detect() # Sample and detect
    time.sleep(0.02) # Small delay to control sampling rate
Reflection: A moving average plus a flexible threshold is enough to see real heartbeats.
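To try the detection rule away from the board, the sketch below replays the same smoothing, threshold, and debounce steps over a made-up signal in plain Python. The `detect_peaks` helper, the square-ish pulse shape, and the 20 ms sample spacing are assumptions for illustration only; a real KY‑039 trace is noisier, and this version keeps the average as a float rather than an integer.

```python
def detect_peaks(samples, dt_ms=20, alpha=0.2, thr_gain=0.15, debounce_ms=280):
    """Return the sample times (ms) at which a peak is flagged."""
    avg = samples[0]              # seed the average with the first sample
    last_peak = -debounce_ms      # lets a peak be accepted right away
    peaks = []
    for i, raw in enumerate(samples):
        avg = (1 - alpha) * avg + alpha * raw   # exponential moving average
        thr = avg * (1 + thr_gain)              # threshold rides above the average
        t = i * dt_ms
        if raw > thr and t - last_peak > debounce_ms:
            peaks.append(t)
            last_peak = t
    return peaks

# Hypothetical signal: baseline 1000 with a 3-sample bump to 1600 every 800 ms
signal = [1600 if i % 40 in (10, 11, 12) else 1000 for i in range(200)]

peaks = detect_peaks(signal)
intervals = [b - a for a, b in zip(peaks, peaks[1:])]
print("peaks at", peaks)
print("intervals", intervals, "->", 60000 // intervals[0], "BPM")
```

Each bump spans three samples, yet only one peak per bump is kept because the second and third samples fall inside the debounce window.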
Challenge:
- Easy: Print a tiny bar using raw and avg for visualization.
- Harder: Add ambient light shielding and compare results.
Microproject 7.2.2 – Heart rate calculation in BPM
Goal: Measure time between peaks (RR interval) and compute BPM with rolling average.
Blocks used:
- RR interval: ms between peaks.
- Rolling average: 5‑sample mean.
MicroPython code:
bpm = 0 # Initialize BPM value
rr_list = [] # Initialize list to store recent RR intervals (ms)
max_rr = 5 # Set number of intervals to average
prev_peak_ms = None # Timestamp of the previous peak (None until the first beat)
print("BPM:INIT") # Print BPM init line
def update_bpm(is_peak): # Define function to update BPM when a peak occurs
    global bpm, prev_peak_ms # Use global bpm and previous-peak timestamp
    if not is_peak: # If no peak detected
        return bpm # Return current BPM unchanged
    # sample_and_detect() has already stored this peak in last_peak_ms,
    # so the RR interval is measured back to the peak before it
    if prev_peak_ms is not None: # If an earlier peak exists
        rr = time.ticks_diff(last_peak_ms, prev_peak_ms) # Compute RR interval in ms
        if rr > 300 and rr < 2000: # If interval is plausible (30–200 BPM)
            rr_list.append(rr) # Append interval
            if len(rr_list) > max_rr: # If too many intervals stored
                rr_list.pop(0) # Remove oldest to keep list size
            mean_rr = sum(rr_list) / len(rr_list) # Compute average RR
            bpm = int(60000 / mean_rr) # Convert average interval to BPM
            print("BPM:UPDATE rr", rr, "mean_rr", int(mean_rr), "bpm", bpm) # Print update line
    prev_peak_ms = last_peak_ms # Remember this peak for the next interval
    return bpm # Return current BPM
# Demo: compute bpm inside sampling loop
start = time.ticks_ms() # Record start time
while time.ticks_diff(time.ticks_ms(), start) < 2000: # Loop 2 seconds
    raw, avg_val, thr_val, peak = sample_and_detect() # Sample and detect
    bpm_now = update_bpm(peak) # Update BPM if peak
    time.sleep(0.02) # Pacing delay
Reflection: Averaging a few RR intervals makes BPM calm and believable.
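A small off-board example may help show what the averaging buys. The sketch below compares BPM computed from each single interval with BPM computed from a rolling mean of up to five intervals. The `rolling_bpm` helper and the interval values are invented for illustration, with one late beat (1200 ms) and one implausible interval (150 ms) thrown in.

```python
def rolling_bpm(rr_values, window=5, lo_ms=300, hi_ms=2000):
    """Return the BPM after each accepted RR interval (ms)."""
    recent = []
    out = []
    for rr in rr_values:
        if not (lo_ms < rr < hi_ms):
            continue                      # implausible interval: ignore it
        recent.append(rr)
        if len(recent) > window:
            recent.pop(0)                 # keep only the newest intervals
        mean_rr = sum(recent) / len(recent)
        out.append(int(60000 / mean_rr))
    return out

# Hypothetical RR intervals in ms
rr_stream = [800, 810, 790, 1200, 800, 805, 150]

smoothed = rolling_bpm(rr_stream)
single = [int(60000 / rr) for rr in rr_stream if 300 < rr < 2000]
print("single  ", single)
print("smoothed", smoothed)
```

With these numbers the single-interval value dips to 50 on the late beat, while the rolling value only falls to 66 and then recovers gradually.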
Challenge:
- Easy: Print BPM only when you have 3+ intervals.
- Harder: Add a timeout to display “BPM:—” if no peaks for 3 seconds.
Microproject 7.2.3 – OLED live display
Goal: Show BPM, RR, and signal level on an SSD1306 OLED.
Blocks used:
- Title + lines: Simple text layout.
- Refresh cadence: ~5–10 Hz.
MicroPython code:
import oled128x64 # Import OLED driver for SSD1306 128x64
i2c = machine.SoftI2C(scl=machine.Pin(22), sda=machine.Pin(21), freq=100000) # Create I2C bus on pins 22/21
oled = oled128x64.OLED(i2c, address=0x3c, font_address=0x3A0000, types=0) # Initialize OLED driver
print("OLED:READY 0x3C") # Print OLED readiness
last_rr = 0 # Initialize last RR interval
def show_oled(bpm_value, raw_value, avg_value, rr_value): # Define OLED display function
    oled.clear() # Clear the screen
    oled.shows('Heart Monitor', x=0, y=0, size=1, space=0, center=False) # Title line
    oled.shows('BPM:'+str(bpm_value), x=0, y=12, size=1, space=0, center=False) # BPM line
    oled.shows('RR:'+str(rr_value)+' ms', x=0, y=24, size=1, space=0, center=False) # RR interval line
    oled.shows('RAW:'+str(raw_value), x=0, y=36, size=1, space=0, center=False) # Raw signal line
    oled.shows('AVG:'+str(avg_value), x=0, y=48, size=1, space=0, center=False) # Average signal line
    oled.show() # Refresh OLED
    print("OLED:UPDATE bpm", bpm_value, "rr", rr_value) # Print update line
# Demo: short OLED loop
start = time.ticks_ms() # Record start time
while time.ticks_diff(time.ticks_ms(), start) < 1500: # Loop 1.5 seconds
    raw, avg_val, thr_val, peak = sample_and_detect() # Sample and detect
    bpm_now = update_bpm(peak) # Update BPM
    if rr_list: # Once at least one interval has been accepted
        last_rr = rr_list[-1] # Use the most recent RR interval, not the time since the last peak
    show_oled(bpm_now, raw, avg_val, last_rr) # Display values
    time.sleep(0.1) # Pacing delay for readability
Reflection: A tiny dashboard makes the rhythm visible—students recognize “calm” vs “jitter.”
Challenge:
- Easy: Add a bar graph for RAW.
- Harder: Blink an LED on each peak.
Microproject 7.2.4 – Abnormal rhythm alerts
Goal: Flag low/high BPM and irregular RR variance; print alerts.
Blocks used:
- Ranges: Low < 50, High > 120 (demo).
- Spread: Largest minus smallest of the last RR intervals, used as a simple stand-in for variance.
MicroPython code:
LOW_BPM = 50 # Set low BPM threshold for alert
HIGH_BPM = 120 # Set high BPM threshold for alert
print("ALERT:THR low", LOW_BPM, "high", HIGH_BPM) # Print alert thresholds
def check_alerts(bpm_value): # Define function to check BPM alerts
    if bpm_value == 0: # If BPM not available yet
        return "WAIT" # Return wait status
    if bpm_value < LOW_BPM: # If BPM below low threshold
        print("ALERT:LOW", bpm_value) # Print low alert
        return "LOW" # Return low status
    if bpm_value > HIGH_BPM: # If BPM above high threshold
        print("ALERT:HIGH", bpm_value) # Print high alert
        return "HIGH" # Return high status
    return "OK" # Return normal status
def rr_variance_status(): # Define function to check RR irregularity
    if len(rr_list) < 3: # If not enough intervals
        return "VAR:WAIT" # Return waiting status
    spread = max(rr_list) - min(rr_list) # Compute spread among RR values
    if spread > 250: # If spread exceeds 250 ms
        print("ALERT:IRREG spread", spread) # Print irregular alert
        return "IRREG" # Return irregular status
    return "REG" # Return regular status
# Demo: run checks for ~1.5 s
start = time.ticks_ms() # Record start time
while time.ticks_diff(time.ticks_ms(), start) < 1500: # Loop 1.5 seconds
    raw, avg_val, thr_val, peak = sample_and_detect() # Sample and detect
    bpm_now = update_bpm(peak) # Update BPM
    status = check_alerts(bpm_now) # Check BPM range
    rrs = rr_variance_status() # Check RR variance
    time.sleep(0.1) # Small delay
Reflection: Simple ranges and variance flags help students interpret rhythm quality at a glance.
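One way to check the alert rules without a sensor is to fold them into a pure function and feed it hand-picked values. The sketch below does that; the `classify` name and the sample numbers are hypothetical, and the thresholds simply reuse the demo values of 50, 120, and 250 ms.

```python
def classify(bpm, rr_recent, low=50, high=120, max_spread_ms=250):
    """Return a status string from a BPM value and recent RR intervals (ms)."""
    if bpm == 0:
        return "WAIT"                    # no BPM computed yet
    if bpm < low:
        return "LOW"
    if bpm > high:
        return "HIGH"
    # Spread check only makes sense with at least three intervals
    if len(rr_recent) >= 3 and max(rr_recent) - min(rr_recent) > max_spread_ms:
        return "IRREG"
    return "OK"

# Hypothetical test cases: (bpm, recent RR intervals)
cases = [
    (0, []),
    (45, [1300, 1320, 1310]),
    (130, [460, 465, 455]),
    (72, [800, 820, 810]),
    (72, [700, 1000, 820]),
]
for bpm, rrs in cases:
    print(bpm, rrs, "->", classify(bpm, rrs))
```

Keeping the rules free of prints and hardware calls makes it easy for teams to try different thresholds and compare results.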
Challenge:
- Easy: Show status text on OLED.
- Harder: Add a quiet LED pulse on LOW/HIGH alerts.
Microproject 7.2.5 – Historical rhythm record (file)
Goal: Log BPM with timestamps to a file and read back a summary.
Blocks used:
- File I/O: Append lines, read all.
- Summary: Min/Max/Avg BPM.
MicroPython code:
log_name = "bpm_log.txt" # Define log filename
print("LOG:FILE", log_name) # Print log filename
def ts(): # Define function to get seconds since boot
    return str(time.ticks_ms() // 1000) # Return seconds as string
def log_bpm(bpm_value): # Define function to append BPM to log
    try: # Try to write file
        with open(log_name, "a") as f: # Open file in append mode
            f.write(ts() + "," + str(bpm_value) + "\n") # Write "time,bpm" line
        print("LOG:WRITE bpm", bpm_value) # Print write confirmation
    except OSError: # If write fails
        print("LOG:ERR_WRITE") # Print error
def read_summary(): # Define function to read file and compute summary
    try: # Try to read file
        with open(log_name, "r") as f: # Open file in read mode
            lines = f.readlines() # Read all lines into list
        vals = [int(line.strip().split(",")[1]) for line in lines if "," in line] # Extract BPM values
        if not vals: # If no values found
            print("LOG:EMPTY") # Print empty log
            return 0,0,0 # Return zeros
        mn, mx = min(vals), max(vals) # Compute min and max
        avg = int(sum(vals) / len(vals)) # Compute average
        print("LOG:SUMMARY min", mn, "max", mx, "avg", avg) # Print summary
        return mn, mx, avg # Return summary tuple
    except OSError: # If read fails
        print("LOG:ERR_READ") # Print error
        return 0,0,0 # Return zeros
# Demo: log a few BPM samples
for i in range(3): # Iterate three samples
    raw, avg_val, thr_val, peak = sample_and_detect() # Sample and detect
    bpm_now = update_bpm(peak) # Update BPM
    log_bpm(bpm_now) # Write to log
    time.sleep(0.2) # Small delay
read_summary() # Read back summary
Reflection: Logs turn a minute of data into a story—min, max, and average tell how steady you were.
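A log file on a small board can end up with a half-written or stray line, for example after power is pulled mid-write. The sketch below shows one possible way to compute the same min/max/avg summary while skipping such lines. The `summarize` helper and the sample lines are assumptions for illustration; it works on a list of strings so it can be tried without a file.

```python
def summarize(lines):
    """Return (min, max, avg) BPM from "time,bpm" lines, skipping bad ones."""
    vals = []
    for line in lines:
        parts = line.strip().split(",")
        if len(parts) < 2:
            continue                     # no comma: not a log record
        try:
            vals.append(int(parts[1]))
        except ValueError:
            continue                     # BPM field missing or not a number
    if not vals:
        return 0, 0, 0
    return min(vals), max(vals), sum(vals) // len(vals)

# Hypothetical log content, including two damaged lines
sample = ["12,71\n", "13,74\n", "garbage\n", "14,\n", "15,78\n"]
print(summarize(sample))
```

On the board, the list could come from `f.readlines()` exactly as in the code above.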
Challenge:
- Easy: Add “status” column (OK/LOW/HIGH).
- Harder: Roll a new file each session with a timestamped name.
Main project – Heart rate monitor with OLED and web
Blocks steps (with glossary)
- Sampling + smoothing: KY‑039 ADC read with moving average and dynamic threshold.
- Peak → BPM: RR intervals converted to BPM with rolling average.
- OLED dashboard: BPM/RR/raw/avg shown clearly.
- Alerts: Low/high BPM and irregular RR variance flags.
- Logging: Append BPM to a file; compute summary.
- Web server: Serve a tiny page with current BPM and status.
MicroPython code (mirroring blocks)
# Project 7.2 – Heart Rate Monitor (KY-039 + BPM + OLED + Web + Logging)
import machine # Import machine for ADC and pins
import time # Import time for timestamps and delays
import oled128x64 # Import OLED driver for SSD1306 display
import socket # Import socket for simple web server
# ===== Sensor and smoothing =====
adc = machine.ADC(machine.Pin(34)) # Create ADC on Pin 34
adc.atten(machine.ADC.ATTN_11DB) # Set attenuation
adc.width(machine.ADC.WIDTH_12BIT) # Set resolution
print("INIT:SENS ADC=34") # Print sensor init
avg = adc.read() # Seed the moving average with a first reading to avoid a false start-up peak
alpha = 0.2 # Smoothing factor
thr_gain = 0.15 # Threshold gain
debounce_ms = 280 # Minimum ms between peaks
last_peak_ms = 0 # Last peak timestamp
rr_list = [] # Recent RR intervals list
bpm = 0 # Current BPM
print("INIT:FILTER alpha", alpha, "thr_gain", thr_gain, "debounce", debounce_ms) # Print filter settings
def sample_and_detect(): # Sample ADC and detect peaks
    global avg, last_peak_ms # Use globals for state
    raw = adc.read() # Read raw ADC
    avg = int((1 - alpha) * avg + alpha * raw) # Update moving average
    thr = avg + int(thr_gain * avg) # Dynamic threshold
    now = time.ticks_ms() # Current ms
    is_peak = False # Peak flag
    if raw > thr: # If raw exceeds threshold
        if time.ticks_diff(now, last_peak_ms) > debounce_ms: # If beyond debounce
            last_peak_ms = now # Update last peak time
            is_peak = True # Mark peak
            print("PEAK:DET raw", raw, "avg", avg, "thr", thr) # Print detection
    return raw, avg, thr, is_peak # Return values
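prev_peak_ms = None # Timestamp of the previous peak (None until the first beat)
def update_bpm(is_peak): # Update BPM from peak
    global bpm, prev_peak_ms # Use global bpm and previous-peak timestamp
    if not is_peak: # If no peak
        return bpm # Keep BPM
    # sample_and_detect() has already stored this peak in last_peak_ms,
    # so the RR interval is measured back to the peak before it
    if prev_peak_ms is not None: # If an earlier peak exists
        rr = time.ticks_diff(last_peak_ms, prev_peak_ms) # RR interval ms
        if rr > 300 and rr < 2000: # If plausible 30–200 BPM
            rr_list.append(rr) # Store interval
            if len(rr_list) > 5: # Limit list size
                rr_list.pop(0) # Remove oldest
            mean_rr = sum(rr_list) / len(rr_list) # Average interval
            bpm = int(60000 / mean_rr) # Convert to BPM
            print("BPM:UPDATE rr", rr, "mean_rr", int(mean_rr), "bpm", bpm) # Print update
    prev_peak_ms = last_peak_ms # Remember this peak for the next interval
    return bpm # Return BPM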
# ===== OLED dashboard =====
i2c = machine.SoftI2C(scl=machine.Pin(22), sda=machine.Pin(21), freq=100000) # Create I2C bus
oled = oled128x64.OLED(i2c, address=0x3c, font_address=0x3A0000, types=0) # Initialize OLED
print("INIT:OLED 0x3C") # Print OLED ready
def show_oled(bpm_value, raw_value, avg_value, rr_value, status_text): # Display on OLED
    oled.clear() # Clear screen
    oled.shows('Heart Monitor', x=0, y=0, size=1, space=0, center=False) # Title
    oled.shows('BPM:'+str(bpm_value), x=0, y=12, size=1, space=0, center=False) # BPM line
    oled.shows('RR:'+str(rr_value)+' ms', x=0, y=24, size=1, space=0, center=False) # RR line
    oled.shows('RAW:'+str(raw_value), x=0, y=36, size=1, space=0, center=False) # RAW line
    oled.shows('AVG:'+str(avg_value), x=0, y=48, size=1, space=0, center=False) # AVG line
    oled.show() # Refresh OLED
    print("OLED:UPDATE", status_text) # Print status text
# ===== Alerts =====
LOW_BPM = 50 # Low BPM threshold
HIGH_BPM = 120 # High BPM threshold
print("INIT:ALERT low", LOW_BPM, "high", HIGH_BPM) # Print thresholds
def check_alerts(bpm_value): # Return status string
    if bpm_value == 0: # If BPM not computed yet
        return "WAIT" # Return wait status
    if bpm_value < LOW_BPM: # If below low threshold
        print("ALERT:LOW", bpm_value) # Print alert
        return "LOW" # Return low status
    if bpm_value > HIGH_BPM: # If above high threshold
        print("ALERT:HIGH", bpm_value) # Print alert
        return "HIGH" # Return high status
    # Check RR irregularity
    if len(rr_list) >= 3: # If enough intervals
        spread = max(rr_list) - min(rr_list) # Compute spread ms
        if spread > 250: # If irregular
            print("ALERT:IRREG spread", spread) # Print irregular
            return "IRREG" # Return irregular status
    return "OK" # Return normal
# ===== Logging =====
log_name = "bpm_log.txt" # Log file name
print("INIT:LOG", log_name) # Print log name
def ts(): # Seconds since boot
    return str(time.ticks_ms() // 1000) # Return seconds
def log_bpm(bpm_value): # Append BPM to file
    try: # Try write
        with open(log_name, "a") as f: # Open append
            f.write(ts() + "," + str(bpm_value) + "\n") # Write time,bpm
        print("LOG:WRITE", bpm_value) # Print write
    except OSError: # On error
        print("LOG:ERR_WRITE") # Print error
# ===== Web server (tiny) =====
# Note: this section assumes the board is already connected to Wi-Fi; otherwise the page cannot be reached
def start_web(): # Start minimal HTTP server
    addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1] # Resolve address
    srv = socket.socket() # Create TCP socket
    srv.bind(addr) # Bind to port 80
    srv.listen(1) # Listen for one client
    srv.settimeout(0.05) # Short timeout to stay responsive
    print("WEB:LISTEN", addr) # Print web listen
    return srv # Return server socket
def page_html(bpm_value, status_text): # Build HTML page
    body = "<html><body><h1>Heart Monitor</h1>" # Start HTML body
    body += "<p>BPM: " + str(bpm_value) + "</p>" # Add BPM line
    body += "<p>Status: " + status_text + "</p>" # Add status line
    body += "</body></html>" # Close HTML body
    return "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n" + body # Build response
srv = start_web() # Start web server
# ===== Main loop =====
print("RUN:Heart Monitor") # Announce start
last_rr = 0 # Most recent accepted RR interval
while True: # Continuous monitoring loop
    raw, avg_val, thr_val, peak = sample_and_detect() # Sample and detect peak
    bpm_now = update_bpm(peak) # Update BPM from peaks
    if rr_list: # Once at least one interval has been accepted
        last_rr = rr_list[-1] # Show the latest RR interval, not the time since the last peak
    status = check_alerts(bpm_now) # Evaluate alerts and status
    show_oled(bpm_now, raw, avg_val, last_rr, status) # Update OLED display
    if peak and bpm_now > 0: # Log once per beat rather than every loop, to spare the flash
        log_bpm(bpm_now) # Append BPM to log file
    # Serve a single web client if present
    try: # Try accept client
        cl, remote = srv.accept() # Accept a client connection
    except OSError: # No client within the timeout
        cl = None # Nothing to serve this pass
    if cl: # If a client connected
        try: # Try to answer the request
            req = cl.recv(256) # Read request bytes
            line = req.split(b'\r\n')[0] # Take the request line, kept as bytes
            print("WEB:REQ", line) # Print request line
            cl.send(page_html(bpm_now, status).encode()) # Send HTML response
        except OSError: # On a client read/write error
            pass # Continue monitoring
        finally: # Always release the client socket
            cl.close() # Close client socket
    time.sleep(0.08) # Small delay to set sampling cadence
External explanation
- What it teaches: You sampled a heartbeat sensor, stabilized the signal, detected peaks, turned intervals into BPM, displayed the data on OLED, served a tiny web page, raised simple alerts, and logged history.
- Why it works: Smoothing plus a dynamic threshold capture real peaks; RR intervals translate directly to BPM; rolling averages keep readings steady; visual feedback and logs make the rhythm understandable.
- Key concept: “Sense → smooth → detect → time → display → share.”
Story time
You place a finger on the sensor, and the numbers begin to breathe. Each tiny rise becomes a “PEAK:DET”, then BPM settles into a calm 72. On the OLED, the rhythm looks steady; on the web page, your classmates see it too. Simple signal, honest heartbeat.
Debugging (2)
Debugging 7.2.1 – Pulse not detected
Problem: No PEAK:DET lines; BPM stays 0.
Clues: RAW barely moves; AVG equals RAW; threshold too high.
Broken code:
thr_gain = 0.4 # Threshold too aggressive for small signals
Fixed code:
thr_gain = 0.15 # Softer threshold tracks finger pulses better
alpha = 0.25 # Slightly faster average to reflect pulses
print("DEBUG:THR alpha", alpha, "thr_gain", thr_gain) # Verify settings
Why it works: Lower threshold and a quicker average give peaks room to rise above the baseline.
Avoid next time: Shield the sensor from ambient light and avoid pressing too hard.
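The arithmetic behind this fix can be checked with a couple of made-up readings. The sketch below assumes a resting level of 2000 and a beat that tops out at 2400 (a 20% swing), and it ignores the small rise in the average during the beat; the `threshold` helper and both numbers are hypothetical.

```python
def threshold(avg, thr_gain):
    """Same rule as the project code: average plus a fraction of itself."""
    return avg + int(thr_gain * avg)

baseline = 2000      # hypothetical resting ADC level
pulse_top = 2400     # hypothetical reading at the top of a beat

results = {}
for gain in (0.4, 0.15):
    thr = threshold(baseline, gain)
    results[gain] = (thr, pulse_top > thr)
    print("thr_gain", gain, "thr", thr, "detected", pulse_top > thr)
```

With `thr_gain = 0.4` the beat would need to rise 40% above the baseline, which a weak finger signal is unlikely to do; at 0.15 the same beat clears the threshold.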
Debugging 7.2.2 – Incorrect rhythm calculation
Problem: BPM jumps wildly; RR includes duplicates.
Clues: Debounce too short; double peaks per beat.
Broken code:
debounce_ms = 120 # Allows double counts on noisy signals
Fixed code:
debounce_ms = 280 # Reject re-peaks within 280 ms
rr_list = rr_list[-5:] # Keep last 5 intervals only
print("DEBUG:DEBOUNCE", debounce_ms) # Confirm new debounce
Why it works: A longer debounce prevents double counting; limiting RR window stabilizes BPM.
Avoid next time: Tune debounce with prints while watching RAW and PEAK:DET events.
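To see the effect of the debounce window on paper, the sketch below filters a list of threshold-crossing times the same way the project does: a crossing counts only if enough time has passed since the last accepted one. The `accepted_peaks` helper and the timestamps are invented; they model beats 800 ms apart that each "bounce" once 150 ms later.

```python
def accepted_peaks(crossings_ms, debounce_ms):
    """Keep crossings that fall outside the debounce window of the last kept one."""
    kept = []
    for t in crossings_ms:
        if not kept or t - kept[-1] > debounce_ms:
            kept.append(t)
    return kept

# Hypothetical crossings: true beats every 800 ms, each with an echo 150 ms later
crossings = [0, 150, 800, 950, 1600, 1750, 2400, 2550]

short = accepted_peaks(crossings, 120)
longer = accepted_peaks(crossings, 280)
print("debounce 120:", short)
print("debounce 280:", longer)
```

With 120 ms every echo is counted, so the intervals alternate between 150 ms and 650 ms and BPM jumps around; with 280 ms only the true beats remain, 800 ms apart.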
Final checklist
- ADC sampling prints SENS:ADC READY and runs smoothly
- PEAK:DET lines appear with threshold and averages shown
- BPM updates via RR intervals and stays stable with rolling average
- OLED shows BPM, RR, RAW, AVG clearly and updates regularly
- Alerts print LOW/HIGH/IRREG when conditions occur
- Web page serves current BPM and status; log file records samples
Extras
- 🧠 Student tip: Sit still, relax your hand, and lightly cover the sensor—clean signals make everything easier.
- 🧑‍🏫 Instructor tip: Compare alpha and thr_gain across teams; discuss how smoothing and thresholds affect detection.
- 📖 Glossary:
- RR interval: Time between consecutive heartbeats.
- BPM: Beats per minute, computed from RR intervals.
- Debounce: Minimum time that must pass after one detected event before the next can be counted, so a single beat is not counted twice.
- 💡 Mini tips:
- Average 3–5 raw readings per sample if your class uses colored covers; keep ambient light consistent.
- Start with LOW_BPM=50 and HIGH_BPM=120; adjust only after observing class signals.
- Keep loops responsive with short sleeps; avoid blocking the web server.
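One common way to keep a loop responsive is to sample on every pass and run slower jobs only when their own interval has elapsed. The sketch below simulates that idea with a fake clock in plain Python; the `run_schedule` helper and the 20 ms / 200 ms / 1000 ms periods are assumptions, not values taken from the project. On the board, the elapsed-time checks would use `time.ticks_diff` instead of plain subtraction.

```python
def run_schedule(total_ms, tick_ms=20, oled_ms=200, log_ms=1000):
    """Count how often each job would run over total_ms of simulated time."""
    counts = {"sample": 0, "oled": 0, "log": 0}
    last_oled = 0
    last_log = 0
    for now in range(0, total_ms, tick_ms):
        counts["sample"] += 1            # sampling happens on every pass
        if now - last_oled >= oled_ms:   # slower job: refresh the display
            counts["oled"] += 1
            last_oled = now
        if now - last_log >= log_ms:     # slowest job: write to the log
            counts["log"] += 1
            last_log = now
    return counts

counts = run_schedule(2000)
print(counts)
```

In two simulated seconds the sensor is read 100 times while the display and log run far less often, which keeps the sampling cadence steady.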