Project 3.10: "Hybrid Communication"
What you’ll learn
- Goal 1: Implement a single communication protocol (choose IR or Bluetooth) cleanly.
- Goal 2: Add redundancy: if one channel fails, automatically switch to another.
- Goal 3: Build automatic medium selection so each command uses the best available link.
- Goal 4: Optimize bandwidth with compact messages and adaptive spacing.
- Goal 5: Create a fault‑tolerant system with health checks and clear serial diagnostics.
Key ideas
- Separation of concerns: Define sender and receiver helpers per medium.
- Health monitoring: Track recent success/failure to choose the best route.
- Adaptive cadence: Space messages to avoid flooding and reduce lag.
- Clear logging: Serial prints confirm decisions without spamming.
Blocks glossary
- IR send/receive: One‑way simple remote control over infrared light (NEC).
- Bluetooth central/peripheral: Two‑way text messages; central connects, peripheral advertises.
- Serial print: Human‑readable logs for state, decisions, and errors.
- Callback (receive): Function triggered automatically on Bluetooth RX.
- Variable state: Track medium health, last success time, error counters.
- def function: Reusable helpers for send, receive, select, and fallback.
- Loop: Regular command cycle with health checks and medium selection.
What you need
| Part | How many? | Pin connection (R32) |
|---|---|---|
| D1 R32 (Controller) | 1 | IR TX → Pin 26; Bluetooth central (internal) |
| D1 R32 (Robot) | 1 | IR RX → Pin 26; Bluetooth peripheral (internal); Motors optional |
| Optional motors (L298N) | 1 | ENA → 5 (PWM), ENB → 18 (PWM), IN1 → 23, IN2 → 19, IN3 → 13, IN4 → 21 |
- Keep boards within 1–3 m for reliable Bluetooth.
- Aim IR TX at IR RX (20–50 cm line of sight).
- Open two serial monitors: Controller and Robot.
Before you start
- Plug in USB for both boards and open serial monitors.
- Quick test:
print("Ready!") # Confirm serial is working
🎮 Microprojects (5 mini missions)
🎮 Microproject 3.10.1 – Single communication protocol (IR or Bluetooth)
Goal: Cleanly implement one protocol end‑to‑end. Choose IR here; Bluetooth appears next.
# Microproject 3.10.1 – Single protocol using IR (Controller + Robot minimal)
# ---------- CONTROLLER (IR TX) ----------
import irremote # Load IR library
import time # Load time library
ir_tx = irremote.NEC_TX(26, False, 100) # IR transmitter on Pin 26
print("[Controller] IR TX ready") # Serial: confirm IR setup
CODE_FORWARD = 0x18 # Forward code (canonical)
def send_ir(code): # Helper: transmit IR code
    ir_tx.transmit(0x00, code, 0x00) # Send with address/control 0x00
    print("[Controller] IR TX:", hex(code)) # Serial: log code sent
while True: # Simple demo loop
    send_ir(CODE_FORWARD) # Send forward command
    time.sleep_ms(1000) # Wait 1 second and repeat
# ---------- ROBOT (IR RX) ----------
import irremote # Load IR library
import time # Load time library
ir_rx = irremote.NEC_RX(26, 8) # IR receiver on Pin 26
print("[Robot] IR RX ready") # Serial: confirm IR RX
CODE_FORWARD = 0x18 # Must match controller code
while True: # Decode loop
    if ir_rx.any(): # If any IR code in buffer
        code = ir_rx.code[0] # Read first code
        print("[Robot] IR RX:", hex(code)) # Serial: log receive
        if code == CODE_FORWARD: # If forward command
            print("[Robot] Action: FORWARD") # Serial: pretend action
        else: # Unknown code
            print("[Robot] Unknown code") # Serial: warn
    time.sleep_ms(150) # Keep loop responsive
Reflection: One protocol is clear and easy, which makes it a solid baseline before mixing media.
Challenge: Switch to Bluetooth-only by sending the text message “CMD:FORWARD” instead of the IR code.
🎮 Microproject 3.10.2 – Communications redundancy (IR primary, Bluetooth fallback)
Goal: If IR goes stale (no successful send within the timeout), automatically send over Bluetooth.
# Microproject 3.10.2 – Controller redundancy: IR primary → Bluetooth fallback
import irremote # Load IR library
import ble_central # Load Bluetooth central helper
import time # Load time library
ir_tx = irremote.NEC_TX(26, False, 100) # IR transmitter setup
print("[Controller] IR TX ready") # Serial: IR ready
central = ble_central.BLESimpleCentral() # Bluetooth central object
print("[Controller] BT central ready") # Serial: BT ready
def connect_robot(): # Helper: connect to robot peripheral
    central.scan() # Scan peripherals
    time.sleep_ms(600) # Short scan wait
    central.connect('Robot-R32') # Connect by name
    print("[Controller] BT connected") # Serial: connection status
connect_robot() # Establish BT connection
CODE_FORWARD = 0x18 # IR forward code
last_ir_ms = 0 # Last successful IR send ms
ir_timeout_ms = 2000 # IR considered stale after 2 s
def now_ms(): # Helper: current ms
    return time.ticks_ms() # Millisecond ticks
def send_ir(code): # Helper: try IR send
    global last_ir_ms # Update last IR time
    try: # Count the send only if transmit() does not raise
        ir_tx.transmit(0x00, code, 0x00) # Transmit IR packet
        last_ir_ms = now_ms() # Record time of last successful send
        print("[Controller] IR TX:", hex(code)) # Serial: log IR send
    except Exception as e: # Transmit error: leave last_ir_ms unchanged
        print("[Controller] IR error", e) # Serial: log IR error
def send_bt(text): # Helper: send Bluetooth text
    central.send(text) # Transmit BT message
    print("[Controller] BT TX:", text) # Serial: log BT send
while True: # Redundancy loop
    # Try IR first
    send_ir(CODE_FORWARD) # Send forward via IR
    time.sleep_ms(300) # Short spacing
    # If IR is stale (no successful send within the timeout), use BT as fallback.
    # Note: IR is one-way, so a blocked beam does not raise; only transmit errors are detected here.
    if time.ticks_diff(now_ms(), last_ir_ms) > ir_timeout_ms: # If IR stale
        send_bt("CMD:FORWARD") # Send same command via BT
    time.sleep_ms(700) # Pace total loop ~1 s
Reflection: Redundancy preserves control when one medium struggles.
Challenge: Reverse roles (Bluetooth primary, IR fallback) when the robot reports “ERR:BT”.
🎮 Microproject 3.10.3 – Automatic medium selection (choose best link)
Goal: Pick the medium with the higher recent‑success score; ties go to IR.
# Microproject 3.10.3 – Controller: auto-select best medium (IR vs Bluetooth)
import irremote # Load IR library
import ble_central # Load Bluetooth central helper
import time # Load time library
ir_tx = irremote.NEC_TX(26, False, 100) # IR transmitter
central = ble_central.BLESimpleCentral() # Bluetooth central
print("[Controller] IR+BT ready") # Serial: init status
def connect_robot(): # Helper: connect peripheral
    central.scan() # Scan for devices
    time.sleep_ms(600) # Short wait
    central.connect('Robot-R32') # Connect by name
    print("[Controller] BT connected") # Serial: BT status
connect_robot() # Connect BT
CODE_FORWARD = 0x18 # Canonical IR code
score_ir = 3 # Health score for IR (0–5)
score_bt = 3 # Health score for BT (0–5)
def clamp(v, lo, hi): # Helper: clamp numeric value
    return max(lo, min(hi, v)) # Clamp within bounds
def send_ir(code): # Helper: IR send + score
    global score_ir # Use global score
    try: # Try IR
        ir_tx.transmit(0x00, code, 0x00) # Transmit IR packet
        print("[Controller] IR TX:", hex(code)) # Serial: send log
        score_ir = clamp(score_ir + 1, 0, 5) # Increase IR health
    except Exception as e: # If any IR error
        print("[Controller] IR error", e) # Serial: error log
        score_ir = clamp(score_ir - 2, 0, 5) # Penalize IR health
def send_bt(text): # Helper: BT send + score
    global score_bt # Use global score
    try: # Try BT
        central.send(text) # Send BT text
        print("[Controller] BT TX:", text) # Serial: send log
        score_bt = clamp(score_bt + 1, 0, 5) # Increase BT health
    except Exception as e: # If any BT error
        print("[Controller] BT error", e) # Serial: error log
        score_bt = clamp(score_bt - 2, 0, 5) # Penalize BT health
def best_medium(): # Helper: pick best channel
    if score_bt > score_ir: # If BT score higher
        return "BT" # Choose BT
    else: # Else IR preferred or equal
        return "IR" # Choose IR
while True: # Decision loop
    route = best_medium() # Pick best medium
    print("[Controller] Route:", route, # Serial: show decision
          "IR=", score_ir, "BT=", score_bt) # Serial: health scores
    if route == "IR": # If route is IR
        send_ir(CODE_FORWARD) # Send IR forward
    else: # Else route is BT
        send_bt("CMD:FORWARD") # Send BT forward
    time.sleep_ms(800) # Update cadence
Reflection: Auto‑selection adapts to changing conditions without manual toggling.
Challenge: Decay scores by −1 every 2 seconds to reflect aging/uncertain links.
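One possible starting point for the decay challenge, as a minimal sketch rather than a finished solution. The helper name `decay_scores`, the `DECAY_EVERY_MS` constant, and passing the clock in as `now_ms` are assumptions made for illustration. On the board you would feed it `time.ticks_ms()` and compute the elapsed time with `time.ticks_diff()` so tick wraparound is handled; the plain subtraction here keeps the sketch runnable on a desktop.

```python
DECAY_EVERY_MS = 2000  # Decay interval taken from the challenge text (2 s)

def clamp(v, lo, hi):
    """Keep v inside [lo, hi]."""
    return max(lo, min(hi, v))

def decay_scores(scores, last_decay_ms, now_ms):
    """Lower every score by 1 for each full interval that has elapsed.

    Returns the (possibly updated) scores dict and the new decay timestamp.
    """
    steps = (now_ms - last_decay_ms) // DECAY_EVERY_MS
    if steps <= 0:
        return scores, last_decay_ms  # Less than one interval: nothing to do
    decayed = {name: clamp(value - steps, 0, 5) for name, value in scores.items()}
    return decayed, last_decay_ms + steps * DECAY_EVERY_MS

scores = {"IR": 5, "BT": 3}
scores, stamp = decay_scores(scores, 0, 1500)      # Under 2 s: unchanged
scores, stamp = decay_scores(scores, stamp, 4500)  # Two full intervals elapsed
print(scores, stamp)
```

Because decay only lowers scores, a medium has to keep succeeding to stay preferred, which is the "aging" behaviour the challenge asks for.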
🎮 Microproject 3.10.4 – Bandwidth optimization (compact messages, spacing)
Goal: Keep messages tiny and rate‑limited for smoother performance.
# Microproject 3.10.4 – Compact messages and adaptive spacing (Controller)
import ble_central # Load Bluetooth central helper
import time # Load time library
central = ble_central.BLESimpleCentral() # Bluetooth central object
print("[Controller] BT central ready") # Serial: BT ready
def connect_robot(): # Helper: connect to robot
    central.scan() # Scan peripherals
    time.sleep_ms(600) # Short scan delay
    central.connect('Robot-R32') # Connect by name
    print("[Controller] BT connected") # Serial: connection OK
connect_robot() # Establish BT link
last_ms = 0 # Last send timestamp (ms)
gap_ms = 160 # Base gap between sends
extra = 0 # Adaptive extra gap
def now_ms(): # Helper: current time in ms
    return time.ticks_ms() # Millisecond ticks
def can_send(): # Helper: spacing check
    elapsed = time.ticks_diff(now_ms(), last_ms) # Time since last send
    return elapsed >= (gap_ms + extra) # Compare against gap
def mark_sent(): # Helper: mark send time
    global last_ms # Use global timestamp
    last_ms = now_ms() # Update last send time
def bump(): # Helper: increase gap
    global extra # Use global extra
    extra = min(extra + 40, 240) # Cap extra gap
def relax(): # Helper: reduce gap
    global extra # Use global extra
    extra = max(extra - 15, 0) # Lower extra gap
def tx(cmd): # Helper: compact transmit
    central.send(cmd) # Send short text (e.g., "F","B","L","R")
    print("[Controller] TX:", cmd, # Serial: log TX and gap
          "| gap", gap_ms + extra, "ms") # Show total gap
    mark_sent() # Mark send time
    relax() # Reduce gap slightly
while True: # Control loop (compact commands)
    if can_send(): # If spacing allows a send
        tx("F") # Send compact forward command
    else: # If spacing too tight
        bump() # Increase gap to avoid flood
    time.sleep_ms(40) # Short poll delay
Reflection: Short messages and adaptive spacing reduce congestion and lag.
Challenge: Map compact “F/B/L/R/S” on the robot side to actual actions.
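A dictionary lookup is one way to approach this mapping. The sketch below is hedged: the action functions are placeholders that only return a label, and the names `ACTIONS` and `handle_compact` are hypothetical. On the robot each placeholder would drive the motors instead.

```python
# Placeholder actions: each returns a label instead of driving motors.
def forward():
    return "FORWARD"

def backward():
    return "BACKWARD"

def left():
    return "LEFT"

def right():
    return "RIGHT"

def stop():
    return "STOP"

# One-letter command -> action function
ACTIONS = {"F": forward, "B": backward, "L": left, "R": right, "S": stop}

def handle_compact(cmd):
    """Run the action for a compact command, or report an unknown one."""
    action = ACTIONS.get(cmd)
    if action is None:
        return "ERR:UNKNOWN"
    return action()

for cmd in ["F", "L", "S", "X"]:
    print(cmd, "->", handle_compact(cmd))
```

Adding a new command then means adding one dictionary entry rather than another `elif` branch.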
🎮 Microproject 3.10.5 – Fault‑tolerant system (health checks + failover)
Goal: Robot tracks medium health; controller tries an alternate route if errors occur.
# Microproject 3.10.5 – Robot: health checks and clear feedback (BT + IR)
import ble_peripheral # Load Bluetooth peripheral helper
import ble_handle # Load Bluetooth callback helper
import irremote # Load IR library
import time # Load time library
ble_p = ble_peripheral.BLESimplePeripheral('Robot-R32') # Peripheral named Robot-R32
handle = ble_handle.Handle() # Callback handle
print("[Robot] BT ready") # Serial: BT init
ir_rx = irremote.NEC_RX(26, 8) # IR receiver setup
print("[Robot] IR RX ready") # Serial: IR init
health_bt = 3 # Health score for BT (0–5)
health_ir = 3 # Health score for IR (0–5)
def clamp(v, lo, hi): # Helper: clamp value
    return max(lo, min(hi, v)) # Clamp within bounds
def ack(label): # Helper: send ACK with health
    ble_p.send("ACK:" + str(label) + # Compose ACK label
               " IR=" + str(health_ir) + # Include IR health score
               " BT=" + str(health_bt)) # Include BT health score
    print("[Robot] ACK:", label) # Serial: log ACK
def handle_method(msg): # Callback: process BT text
    global health_bt # Use BT health
    s = str(msg) # Ensure string
    print("[Robot] BT RX:", s) # Serial: log RX
    # Simple compact mapping for bandwidth optimization
    if s == "F": # If compact forward
        ack("CMD:F") # Send ACK for forward
        health_bt = clamp(health_bt + 1, 0, 5) # Improve BT health
    elif s == "S": # If compact stop
        ack("CMD:S") # Send ACK for stop
        health_bt = clamp(health_bt + 1, 0, 5) # Improve BT health
    else: # Unknown command
        ble_p.send("ERR:UNKNOWN") # Error feedback
        health_bt = clamp(health_bt - 2, 0, 5) # Penalize BT health
handle.recv(handle_method) # Register callback
print("[Robot] BT callback registered") # Serial: active
while True: # Health + IR monitor loop
    if ir_rx.any(): # If IR code available
        code = ir_rx.code[0] # Read IR code
        print("[Robot] IR RX:", hex(code)) # Serial: IR log
        health_ir = clamp(health_ir + 1, 0, 5) # Improve IR health
        ack("IR:CMD") # Send ACK to confirm IR path
    time.sleep_ms(150) # Loop delay
Reflection: Health scores communicate which medium is strong and guide failover decisions.
Challenge: Have the controller switch to IR whenever “ERR:UNKNOWN” is received twice in a row.
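A small counter is enough to detect "twice in a row". This is a sketch under stated assumptions: the class name `ErrorStreak` is hypothetical, and how the controller receives the robot's reply text is not shown in this project, so the replies are passed in as plain strings.

```python
class ErrorStreak:
    """Counts back-to-back ERR:UNKNOWN replies from the robot."""

    def __init__(self, limit=2):
        self.limit = limit  # Consecutive errors that trigger a switch
        self.count = 0      # Current run of errors

    def update(self, reply):
        """Return True when `limit` ERR:UNKNOWN replies arrive in a row."""
        if reply == "ERR:UNKNOWN":
            self.count += 1
        else:
            self.count = 0  # Any other reply breaks the streak
        if self.count >= self.limit:
            self.count = 0  # Reset so the next switch needs a fresh streak
            return True
        return False

streak = ErrorStreak()
route = "BT"
replies = ["ERR:UNKNOWN", "ACK:CMD:F IR=3 BT=4", "ERR:UNKNOWN", "ERR:UNKNOWN"]
for reply in replies:
    if streak.update(reply):
        route = "IR"  # Fail over after two errors in a row
    print(reply, "->", route)
```

The single error at the start does not trigger a switch because the ACK that follows resets the count; only the final pair does.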
✨ Main project – Hybrid communication (IR + Bluetooth + Serial)
System outline
- Controller: Chooses route (IR or BT) using health scores and adaptive spacing; logs decisions via Serial.
- Robot: Decodes IR and BT commands; updates health; sends ACK with health; logs to Serial.
- Failover: If route degrades, controller auto‑switches medium; robot reports health for visibility.
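The robot's ACK text has the form `ACK:<label> IR=<n> BT=<n>`, so a controller that wants to use the reported health needs to split it back apart. The parser below is a minimal sketch; the function name `parse_ack` is an assumption, and the project code does not show how the controller receives the text, so the input here is a plain string.

```python
def parse_ack(text):
    """Parse 'ACK:<label> IR=<n> BT=<n>' into (label, ir, bt).

    Returns None for anything that does not match that shape.
    """
    if not text.startswith("ACK:"):
        return None
    parts = text[4:].split(" ")  # Expect: label, "IR=n", "BT=n"
    if len(parts) != 3:
        return None
    label, ir_part, bt_part = parts
    if not (ir_part.startswith("IR=") and bt_part.startswith("BT=")):
        return None
    try:
        return label, int(ir_part[3:]), int(bt_part[3:])
    except ValueError:
        return None  # Health fields were not integers

print(parse_ack("ACK:CMD:F IR=3 BT=4"))
print(parse_ack("ERR:UNKNOWN"))
```

Returning `None` for malformed input lets the caller treat a garbled reply the same way as a missing one.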
# Project 3.10 – Hybrid Communication (Controller + Robot)
# Controller: IR primary with BT fallback, auto-selection by health, adaptive spacing.
# Robot: Decode IR and BT, report health via ACK, clear serial diagnostics.
# ---------- CONTROLLER ----------
import irremote # Load IR library
import ble_central # Load Bluetooth central helper
import time # Load time library
ir_tx = irremote.NEC_TX(26, False, 100) # IR transmitter
central = ble_central.BLESimpleCentral() # Bluetooth central
print("[Controller] IR+BT ready") # Serial: init status
def connect_robot(): # Helper: connect to robot peripheral
    central.scan() # Scan nearby devices
    time.sleep_ms(600) # Short scan wait
    central.connect('Robot-R32') # Connect by name
    print("[Controller] BT connected") # Serial: status
connect_robot() # Connect BT
# Canonical IR codes and compact BT commands
CODE_FORWARD = 0x18 # IR forward
CMD_FWD_TX = "F" # BT compact forward
CMD_STOP_TX = "S" # BT compact stop
# Health scores for medium selection
score_ir = 3 # Start medium health at 3
score_bt = 3 # Start medium health at 3
# Adaptive spacing for BT
last_ms = 0 # Last BT send time (ms)
gap_ms = 180 # Base BT gap
extra = 0 # Adaptive extra gap
def now_ms(): # Helper: current ms
    return time.ticks_ms() # Millisecond ticks
def can_bt_send(): # Helper: BT spacing check
    elapsed = time.ticks_diff(now_ms(), last_ms) # Time since last BT send
    return elapsed >= (gap_ms + extra) # Compare against gap
def mark_bt_sent(): # Helper: mark BT send time
    global last_ms # Use global time
    last_ms = now_ms() # Update last send time
def bump_gap(): # Helper: increase BT gap
    global extra # Use global gap
    extra = min(extra + 40, 240) # Cap extra gap
def relax_gap(): # Helper: reduce BT gap
    global extra # Use global gap
    extra = max(extra - 15, 0) # Lower extra gap
def clamp(v, lo, hi): # Helper: clamp value
    return max(lo, min(hi, v)) # Clamp within bounds
def send_ir(code): # Helper: IR send + health adjust
    global score_ir # Use IR score
    try: # Try IR transmit
        ir_tx.transmit(0x00, code, 0x00) # Send IR packet
        print("[Controller] IR TX:", hex(code)) # Serial: log IR send
        score_ir = clamp(score_ir + 1, 0, 5) # Improve IR score
    except Exception as e: # On error
        print("[Controller] IR error", e) # Serial: log error
        score_ir = clamp(score_ir - 2, 0, 5) # Penalize IR score
def send_bt(text): # Helper: BT send + health adjust
    global score_bt # Use BT score
    try: # Try BT send
        central.send(text) # Send BT text
        print("[Controller] BT TX:", text, # Serial: log BT send
              "| gap", gap_ms + extra, "ms") # Show total gap
        mark_bt_sent() # Update last BT time
        relax_gap() # Slightly reduce gap
        score_bt = clamp(score_bt + 1, 0, 5) # Improve BT score
    except Exception as e: # On error
        print("[Controller] BT error", e) # Serial: log error
        bump_gap() # Increase gap to avoid flood
        score_bt = clamp(score_bt - 2, 0, 5) # Penalize BT score
def best_route(): # Helper: choose medium
    if score_bt > score_ir and can_bt_send(): # If BT healthier and spaced
        return "BT" # Use BT
    else: # Otherwise
        return "IR" # Use IR
print("[Controller] Hybrid loop start") # Serial: start loop
while True: # Controller decision loop
    route = best_route() # Decide route
    print("[Controller] Route:", route, # Serial: show decision
          "IR=", score_ir, "BT=", score_bt) # Show health scores
    if route == "IR": # If IR chosen
        send_ir(CODE_FORWARD) # Send forward via IR
    else: # If BT chosen
        send_bt(CMD_FWD_TX) # Send compact forward
    time.sleep_ms(800) # Pace loop
# ---------- ROBOT ----------
import ble_peripheral # Load Bluetooth peripheral helper
import ble_handle # Load Bluetooth callback helper
import irremote # Load IR library
import time # Load time library
ble_p = ble_peripheral.BLESimplePeripheral('Robot-R32') # Peripheral named Robot-R32
handle = ble_handle.Handle() # Callback handle
print("[Robot] BT ready") # Serial: BT init
ir_rx = irremote.NEC_RX(26, 8) # IR receiver
print("[Robot] IR RX ready") # Serial: IR init
health_bt = 3 # BT health score
health_ir = 3 # IR health score
def clamp(v, lo, hi): # Helper: clamp values
    return max(lo, min(hi, v)) # Clamp within bounds
def ack(label): # Helper: ACK with health context
    ble_p.send("ACK:" + str(label) + # Ack label
               " IR=" + str(health_ir) + # IR health
               " BT=" + str(health_bt)) # BT health
    print("[Robot] ACK:", label) # Serial: log ACK
def handle_method(msg): # Callback: process BT commands
    global health_bt # Use BT health
    s = str(msg) # Ensure string
    print("[Robot] BT RX:", s) # Serial: log RX
    if s == "F": # Compact forward
        ack("CMD:F") # Send ACK for forward
        health_bt = clamp(health_bt + 1, 0, 5) # Improve BT health
    elif s == "S": # Compact stop
        ack("CMD:S") # Send ACK for stop
        health_bt = clamp(health_bt + 1, 0, 5) # Improve BT health
    else: # Unknown command
        ble_p.send("ERR:UNKNOWN") # Error feedback
        health_bt = clamp(health_bt - 2, 0, 5) # Penalize BT health
handle.recv(handle_method) # Register callback
print("[Robot] BT callback registered") # Serial: active
CODE_FORWARD = 0x18 # IR forward code
while True: # Robot processing loop
    if ir_rx.any(): # If IR code available
        code = ir_rx.code[0] # Read IR code
        print("[Robot] IR RX:", hex(code)) # Serial: log code
        if code == CODE_FORWARD: # If forward
            ack("IR:CMD:F") # ACK and improve IR health
            health_ir = clamp(health_ir + 1, 0, 5) # Improve IR health
        else: # Unknown IR
            ack("IR:CMD:?") # Unknown IR ACK
            health_ir = clamp(health_ir - 1, 0, 5) # Slight penalty
    time.sleep_ms(150) # Loop pacing
External explanation
- What it teaches: Designing systems that keep working when a single link fails by monitoring health, adapting spacing, and switching mediums.
- Why it works: Each medium has clear helpers and health scores; the controller chooses routes; the robot reports status; serial logs reveal decisions.
- Key concept: Fault tolerance emerges from small, consistent checks and simple, readable code.
Story time
Your rover speaks two languages. If the light beam gets blocked, it whispers over radio. If the radio gets noisy, it flashes light again. And it keeps you informed with clear, calm updates.
Debugging (2)
Debugging 3.10.A – Protocol conflicts
- Symptom: Commands collide or duplicate actions when both mediums are active.
- Fix: Space sends, avoid simultaneous IR+BT duplicates, and prioritize one route per cycle.
# Pick a single route per loop iteration
route = best_route() # Decide once
# Do not send both; only send on the chosen route
Debugging 3.10.B – Unnecessary medium switching
- Symptom: Rapid switching between IR and BT causes jitter.
- Fix: Add hysteresis by requiring a 2‑point score advantage before switching.
# Hysteresis: require BT to be 2 points better before switching
def best_route_hysteresis():
    if score_bt >= score_ir + 2 and can_bt_send():
        return "BT"
    else:
        return "IR"
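The function above biases the choice toward IR, but it drops back to IR as soon as Bluetooth's lead falls below 2 points. A variation that also remembers the current route is sketched here for comparison. It is an illustration, not the project's method: `choose_route`, `count_switches`, and the sample scores are all made up for the demo.

```python
def choose_route(current, score_ir, score_bt, margin=2):
    """Keep `current` unless the other medium leads by at least `margin`."""
    if current == "IR" and score_bt >= score_ir + margin:
        return "BT"
    if current == "BT" and score_ir >= score_bt + margin:
        return "IR"
    return current

def count_switches(samples, margin):
    """Count route changes over a list of (score_ir, score_bt) samples."""
    route = "IR"
    switches = 0
    for score_ir, score_bt in samples:
        new_route = choose_route(route, score_ir, score_bt, margin)
        if new_route != route:
            switches += 1
        route = new_route
    return switches

# Scores that wobble by one point, then a clear Bluetooth lead
samples = [(3, 4), (4, 3), (3, 4), (4, 3), (2, 5), (3, 4)]
print(count_switches(samples, 1), count_switches(samples, 2))  # prints: 5 1
```

With a 1-point margin every wobble flips the route; with a 2-point margin only the clear lead does, and the route then stays on Bluetooth.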
Final checklist
- One protocol (IR or BT) works cleanly by itself.
- Redundancy sends via the fallback when the primary is stale.
- Auto‑selection picks the best medium using health scores.
- Compact messages and adaptive spacing reduce congestion.
- Robot reports ACK with health; serial logs show clear decisions.
Extras
- Student tip: Watch health scores in logs to understand why routes change.
- Instructor tip: Demonstrate blocking IR (cover the sensor) and watch the system fail over to Bluetooth.
- Glossary:
  - Medium: The communication path (IR or Bluetooth).
  - Health score: A simple number tracking a medium’s recent success.
  - Hysteresis: A margin that reduces rapid switching back and forth.
- Mini tips:
  - Keep messages short (“F”, “S”) for BT when possible.
  - Use ~800–1000 ms cadence to avoid flooding and to keep UI smooth.
  - Keep logs concise; avoid printing inside very tight loops.
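One way to keep logs concise is to print only when the message changes. The helper below is a hedged sketch: `ChangeLogger` is a hypothetical name, and the `emit` parameter exists only so the demo can capture lines in a list instead of printing them.

```python
class ChangeLogger:
    """Emits a message only when it differs from the previous one."""

    def __init__(self, emit=print):
        self.emit = emit  # Where lines go (print by default)
        self.last = None  # Most recently emitted message

    def log(self, message):
        """Emit `message` if it is new; return True when something was emitted."""
        if message == self.last:
            return False
        self.last = message
        self.emit(message)
        return True

lines = []
logger = ChangeLogger(emit=lines.append)
for route in ["IR", "IR", "IR", "BT", "BT", "IR"]:
    logger.log("[Controller] Route: " + route)
print(lines)
```

Six loop iterations produce three log lines here, one per actual route change.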