📡 Level 3 – Advanced Communication

Project 3.10: "Hybrid Communication"

 

What you’ll learn

  • Goal 1: Implement a single communication protocol (choose IR or Bluetooth) cleanly.
  • Goal 2: Add redundancy: if one channel fails, automatically switch to another.
  • Goal 3: Build medium auto‑selection so commands pick the best available link.
  • Goal 4: Optimize bandwidth with compact messages and adaptive spacing.
  • Goal 5: Create a fault‑tolerant system with health checks and clear serial diagnostics.

Key ideas

  • Separation of concerns: Define sender and receiver helpers per medium.
  • Health monitoring: Track recent success/failure to choose the best route.
  • Adaptive cadence: Space messages to avoid flooding and reduce lag.
  • Clear logging: Serial prints confirm decisions without spamming.
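
The "clear logging" idea can be sketched as a small helper that prints a route decision only when it changes. The class name `RouteLogger` and its `lines` list are hypothetical, introduced here for illustration; the listings in this project print directly instead.

```python
class RouteLogger:
    """Print a route decision only when it differs from the last one logged."""

    def __init__(self):
        self.last_route = None   # Nothing logged yet
        self.lines = []          # Copy of every printed line (handy for checking)

    def log(self, route, score_ir, score_bt):
        if route == self.last_route:      # Same decision as last time
            return False                  # Stay quiet
        line = "[Controller] Route: %s IR=%d BT=%d" % (route, score_ir, score_bt)
        print(line)                       # Serial: one line per change
        self.lines.append(line)           # Remember what was printed
        self.last_route = route           # Track the latest decision
        return True


logger = RouteLogger()
for route in ["IR", "IR", "IR", "BT", "BT", "IR"]:
    logger.log(route, 3, 3)               # Six decisions, only three changes
```

Only the changes (IR, then BT, then IR again) reach the serial monitor, so the log stays short even in a fast loop.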

Blocks glossary

  • IR send/receive: One‑way simple remote control over infrared light (NEC).
  • Bluetooth central/peripheral: Two‑way text messages; central connects, peripheral advertises.
  • Serial print: Human‑readable logs for state, decisions, and errors.
  • Callback (receive): Function triggered automatically on Bluetooth RX.
  • Variable state: Track medium health, last success time, error counters.
  • def function: Reusable helpers for send, receive, select, and fallback.
  • Loop: Regular command cycle with health checks and medium selection.

What you need

Part | How many? | Pin connection (R32)
D1 R32 (Controller) | 1 | IR TX → Pin 26; Bluetooth central (internal)
D1 R32 (Robot) | 1 | IR RX → Pin 26; Bluetooth peripheral (internal); Motors optional
Optional motors (L298N) | 1 | ENA → 5 (PWM), ENB → 18 (PWM), IN1 → 23, IN2 → 19, IN3 → 13, IN4 → 21

  • Keep boards within 1–3 m for reliable Bluetooth.
  • Aim IR TX at IR RX (20–50 cm line of sight).
  • Open two serial monitors: Controller and Robot.

Before you start

  • Plug in USB for both boards and open serial monitors.
  • Quick test:
print("Ready!")  # Confirm serial is working

🎮 Microprojects (5 mini missions)

🎮 Microproject 3.10.1 – Single communication protocol (IR or Bluetooth)

Goal: Cleanly implement one protocol end‑to‑end. Choose IR here; Bluetooth appears next.

# Microproject 3.10.1 – Single protocol using IR (Controller + Robot minimal)

# ---------- CONTROLLER (IR TX) ----------
import irremote                                   # Load IR library
import time                                       # Load time library

ir_tx = irremote.NEC_TX(26, False, 100)           # IR transmitter on Pin 26
print("[Controller] IR TX ready")                 # Serial: confirm IR setup

CODE_FORWARD = 0x18                               # Forward code (canonical)

def send_ir(code):                                # Helper: transmit IR code
    ir_tx.transmit(0x00, code, 0x00)              # Send with address/control 0x00
    print("[Controller] IR TX:", hex(code))       # Serial: log code sent

while True:                                       # Simple demo loop
    send_ir(CODE_FORWARD)                         # Send forward command
    time.sleep_ms(1000)                           # Wait 1 second and repeat

# ---------- ROBOT (IR RX) ----------
import irremote                                   # Load IR library
import time                                       # Load time library

ir_rx = irremote.NEC_RX(26, 8)                    # IR receiver on Pin 26
print("[Robot] IR RX ready")                      # Serial: confirm IR RX

CODE_FORWARD = 0x18                               # Must match controller code

while True:                                       # Decode loop
    if ir_rx.any():                               # If any IR code in buffer
        code = ir_rx.code[0]                      # Read first code
        print("[Robot] IR RX:", hex(code))        # Serial: log receive
        if code == CODE_FORWARD:                  # If forward command
            print("[Robot] Action: FORWARD")      # Serial: pretend action
        else:                                     # Unknown code
            print("[Robot] Unknown code")         # Serial: warn
    time.sleep_ms(150)                            # Keep loop responsive

Reflection: One protocol is clear and easy, which makes it a solid baseline before mixing.
Challenge: Switch to Bluetooth-only by sending “CMD:FORWARD” text messages instead of IR codes.
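
For the Bluetooth-only challenge, the robot has to turn a text message back into an action. Here is a minimal sketch of that step, assuming messages arrive as text in the form `CMD:<ACTION>`. The helper names `parse_command` and `act` are hypothetical and not part of any Bluetooth library.

```python
def parse_command(text):
    """Return the action name from 'CMD:<ACTION>', or None if malformed."""
    s = str(text).strip()                 # Ensure string, drop stray whitespace
    if not s.startswith("CMD:"):          # Wrong prefix
        return None
    action = s[4:]                        # Everything after 'CMD:'
    return action if action else None     # Empty action counts as malformed


def act(text):
    """Map a received message to the log line the robot would print."""
    action = parse_command(text)
    if action == "FORWARD":
        return "[Robot] Action: FORWARD"
    return "[Robot] Unknown command"


print(act("CMD:FORWARD"))                 # Known command
print(act("hello"))                       # Anything else is rejected
```

Keeping the parsing in its own helper means the same function can be called from a Bluetooth receive callback later without changes.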


🎮 Microproject 3.10.2 – Communications redundancy (IR primary, Bluetooth fallback)

Goal: If IR fails (no ACK or stale), automatically send over Bluetooth.

# Microproject 3.10.2 – Controller redundancy: IR primary → Bluetooth fallback

import irremote                                   # Load IR library
import ble_central                                # Load Bluetooth central helper
import time                                       # Load time library

ir_tx = irremote.NEC_TX(26, False, 100)           # IR transmitter setup
print("[Controller] IR TX ready")                 # Serial: IR ready

central = ble_central.BLESimpleCentral()          # Bluetooth central object
print("[Controller] BT central ready")            # Serial: BT ready

def connect_robot():                              # Helper: connect to robot peripheral
    central.scan()                                # Scan peripherals
    time.sleep_ms(600)                            # Short scan wait
    central.connect('Robot-R32')                  # Connect by name
    print("[Controller] BT connected")            # Serial: connection status

connect_robot()                                   # Establish BT connection

CODE_FORWARD = 0x18                               # IR forward code
last_ir_ack_ms = time.ticks_ms()                  # Last time the robot ACKed an IR command
ir_timeout_ms = 2000                              # IR considered stale after 2 s without ACK

def now_ms():                                     # Helper: current ms
    return time.ticks_ms()                        # Millisecond ticks

def note_ir_ack(msg):                             # Helper: call with each BT message received
    global last_ir_ack_ms                         # Update last IR ACK time
    if str(msg).startswith("ACK:IR"):             # Robot confirms an IR command arrived
        last_ir_ack_ms = now_ms()                 # IR path is alive

# Assumption: wire note_ir_ack into your BT receive callback (the robot code in
# this project uses ble_handle.Handle().recv(...); check your central helper).

def send_ir(code):                                # Helper: try IR send
    ir_tx.transmit(0x00, code, 0x00)              # Transmit IR packet (one-way, no delivery proof)
    print("[Controller] IR TX:", hex(code))       # Serial: log IR send

def send_bt(text):                                # Helper: send Bluetooth text
    central.send(text)                            # Transmit BT message
    print("[Controller] BT TX:", text)            # Serial: log BT send

while True:                                       # Redundancy loop
    # Try IR first
    send_ir(CODE_FORWARD)                         # Send forward via IR
    time.sleep_ms(300)                            # Short spacing (gives the ACK time to arrive)
    # If no IR ACK arrived recently, use BT as fallback
    if time.ticks_diff(now_ms(), last_ir_ack_ms) > ir_timeout_ms:  # If IR stale
        send_bt("CMD:FORWARD")                    # Send same command via BT
    time.sleep_ms(700)                            # Pace total loop ~1 s

Reflection: Redundancy preserves control when one medium struggles.
Challenge: Reverse roles (Bluetooth primary, IR fallback) when the robot reports “ERR:BT”.
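
The fallback rule can be tried without any hardware by passing in the send functions and the clock. This is a sketch under stated assumptions: the class `FallbackLink` and its method names are hypothetical, and time is a plain millisecond number. On the board, replace the subtraction with `time.ticks_diff` so counter wrap-around is handled.

```python
class FallbackLink:
    """IR first; add Bluetooth when no IR ACK arrived within timeout_ms."""

    def __init__(self, send_ir, send_bt, timeout_ms=2000):
        self.send_ir = send_ir            # Function that transmits an IR code
        self.send_bt = send_bt            # Function that transmits BT text
        self.timeout_ms = timeout_ms      # How long an ACK stays "fresh"
        self.last_ack_ms = None           # No IR ACK seen yet

    def note_ir_ack(self, now_ms):
        self.last_ack_ms = now_ms         # Robot confirmed an IR command

    def ir_is_stale(self, now_ms):
        if self.last_ack_ms is None:      # Never confirmed
            return True
        return now_ms - self.last_ack_ms > self.timeout_ms

    def send(self, ir_code, bt_text, now_ms):
        self.send_ir(ir_code)             # Always try the primary route
        if self.ir_is_stale(now_ms):      # Primary not confirmed recently
            self.send_bt(bt_text)         # Repeat the command on the fallback
            return "IR+BT"
        return "IR"


sent = []                                 # Records every transmission
link = FallbackLink(lambda c: sent.append(("IR", c)),
                    lambda t: sent.append(("BT", t)))
r1 = link.send(0x18, "CMD:FORWARD", now_ms=0)     # No ACK yet: fallback used
link.note_ir_ack(now_ms=500)                      # Robot ACKs over Bluetooth
r2 = link.send(0x18, "CMD:FORWARD", now_ms=1000)  # ACK is 500 ms old: IR only
r3 = link.send(0x18, "CMD:FORWARD", now_ms=3000)  # ACK is 2500 ms old: fallback
print(r1, r2, r3)
```

Injecting the clock makes the staleness rule easy to check on a laptop before flashing the controller.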


🎮 Microproject 3.10.3 – Automatic medium selection (choose best link)

Goal: Pick the medium based on recent success, tracked as a health score (0–5) for each link.

# Microproject 3.10.3 – Controller: auto-select best medium (IR vs Bluetooth)

import irremote                                   # Load IR library
import ble_central                                # Load Bluetooth central helper
import time                                       # Load time library

ir_tx = irremote.NEC_TX(26, False, 100)           # IR transmitter
central = ble_central.BLESimpleCentral()          # Bluetooth central
print("[Controller] IR+BT ready")                 # Serial: init status

def connect_robot():                              # Helper: connect peripheral
    central.scan()                                # Scan for devices
    time.sleep_ms(600)                            # Short wait
    central.connect('Robot-R32')                  # Connect by name
    print("[Controller] BT connected")            # Serial: BT status

connect_robot()                                   # Connect BT

CODE_FORWARD = 0x18                               # Canonical IR code
score_ir = 3                                      # Health score for IR (0–5)
score_bt = 3                                      # Health score for BT (0–5)

def clamp(v, lo, hi):                             # Helper: clamp numeric value
    return max(lo, min(hi, v))                    # Clamp within bounds

def send_ir(code):                                # Helper: IR send + score
    global score_ir                               # Use global score
    try:                                          # Try IR
        ir_tx.transmit(0x00, code, 0x00)          # Transmit IR packet
        print("[Controller] IR TX:", hex(code))   # Serial: send log
        score_ir = clamp(score_ir + 1, 0, 5)      # Increase IR health
    except Exception as e:                        # If any IR error
        print("[Controller] IR error", e)         # Serial: error log
        score_ir = clamp(score_ir - 2, 0, 5)      # Penalize IR health

def send_bt(text):                                # Helper: BT send + score
    global score_bt                               # Use global score
    try:                                          # Try BT
        central.send(text)                        # Send BT text
        print("[Controller] BT TX:", text)        # Serial: send log
        score_bt = clamp(score_bt + 1, 0, 5)      # Increase BT health
    except Exception as e:                        # If any BT error
        print("[Controller] BT error", e)         # Serial: error log
        score_bt = clamp(score_bt - 2, 0, 5)      # Penalize BT health

def best_medium():                                # Helper: pick best channel
    if score_bt > score_ir:                       # If BT score higher
        return "BT"                                # Choose BT
    else:                                         # Else IR preferred or equal
        return "IR"                                # Choose IR

while True:                                       # Decision loop
    route = best_medium()                         # Pick best medium
    print("[Controller] Route:", route,           # Serial: show decision
          "IR=", score_ir, "BT=", score_bt)       # Serial: health scores
    if route == "IR":                             # If route is IR
        send_ir(CODE_FORWARD)                     # Send IR forward
    else:                                         # Else route is BT
        send_bt("CMD:FORWARD")                    # Send BT forward
    time.sleep_ms(800)                            # Update cadence

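Reflection: Auto‑selection adapts to changing conditions without manual toggling. Note that a transmit call returning normally only shows the send was attempted, so these scores reflect local send errors; a blocked IR beam goes unnoticed until robot ACKs feed the scores.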
Challenge: Decay scores by −1 every 2 seconds to reflect aging/uncertain links.
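
One possible shape for the decay challenge is sketched below. The class `DecayingScore` is hypothetical and takes the current time as a plain millisecond number, so it can be tested off the board. On the controller you would feed it `time.ticks_ms()` and compare with `time.ticks_diff`.

```python
class DecayingScore:
    """Health score that loses 1 point for every full period that passes."""

    def __init__(self, score=3, period_ms=2000):
        self.score = score                # Current health (0-5 in this project)
        self.period_ms = period_ms        # One point is lost per period
        self.last_decay_ms = 0            # Time of the last applied decay

    def tick(self, now_ms):
        steps = (now_ms - self.last_decay_ms) // self.period_ms
        if steps > 0:                     # At least one full period elapsed
            self.score = max(0, self.score - steps)
            self.last_decay_ms += steps * self.period_ms  # Keep the remainder
        return self.score


ir = DecayingScore(score=5)
print(ir.tick(1999))                      # Less than one period: unchanged
print(ir.tick(2000))                      # One period: minus 1
print(ir.tick(6500))                      # Two more full periods: minus 2
```

Advancing `last_decay_ms` by whole periods, rather than setting it to `now_ms`, keeps leftover milliseconds so the decay rate stays steady however often `tick` is called.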


🎮 Microproject 3.10.4 – Bandwidth optimization (compact messages, spacing)

Goal: Keep messages tiny and rate‑limited for smoother performance.

# Microproject 3.10.4 – Compact messages and adaptive spacing (Controller)

import ble_central                                # Load Bluetooth central helper
import time                                       # Load time library

central = ble_central.BLESimpleCentral()          # Bluetooth central object
print("[Controller] BT central ready")            # Serial: BT ready

def connect_robot():                              # Helper: connect to robot
    central.scan()                                # Scan peripherals
    time.sleep_ms(600)                            # Short scan delay
    central.connect('Robot-R32')                  # Connect by name
    print("[Controller] BT connected")            # Serial: connection OK

connect_robot()                                   # Establish BT link

last_ms = 0                                       # Last send timestamp (ms)
gap_ms  = 160                                     # Base gap between sends
extra   = 0                                       # Adaptive extra gap

def now_ms():                                     # Helper: current time in ms
    return time.ticks_ms()                        # Millisecond ticks

def can_send():                                   # Helper: spacing check
    elapsed = time.ticks_diff(now_ms(), last_ms)  # Time since last send
    return elapsed >= (gap_ms + extra)            # Compare against gap

def mark_sent():                                  # Helper: mark send time
    global last_ms                                # Use global timestamp
    last_ms = now_ms()                            # Update last send time

def bump():                                       # Helper: increase gap
    global extra                                  # Use global extra
    extra = min(extra + 40, 240)                  # Cap extra gap

def relax():                                      # Helper: reduce gap
    global extra                                  # Use global extra
    extra = max(extra - 15, 0)                    # Lower extra gap

def tx(cmd):                                      # Helper: compact transmit
    try:                                          # Try the send
        central.send(cmd)                         # Send short text (e.g., "F","B","L","R")
        print("[Controller] TX:", cmd,            # Serial: log TX and gap
              "| gap", gap_ms + extra, "ms")      # Show total gap
        relax()                                   # Success: reduce gap slightly
    except Exception as e:                        # Send failed (link busy or dropped)
        print("[Controller] TX error", e)         # Serial: log error
        bump()                                    # Failure: widen gap to avoid flood
    mark_sent()                                   # Mark attempt time either way

while True:                                       # Control loop (compact commands)
    if can_send():                                # If spacing allows a send
        tx("F")                                   # Send compact forward command
    time.sleep_ms(40)                             # Short poll delay (not a failure, so no bump)

Reflection: Short messages and adaptive spacing reduce congestion and lag.
Challenge: Map compact “F/B/L/R/S” on the robot side to actual actions.
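
A lookup table is one simple way to approach the mapping challenge. In this sketch the action names in `ACTIONS` and the helper `decode` are hypothetical placeholders; swap the strings for your own motor functions.

```python
# Compact one-letter commands and the actions they stand for
ACTIONS = {
    "F": "FORWARD",
    "B": "BACKWARD",
    "L": "LEFT",
    "R": "RIGHT",
    "S": "STOP",
}


def decode(cmd):
    """Return the action for a compact command, or None if unknown."""
    return ACTIONS.get(str(cmd).strip().upper())


# How much shorter is the compact form than the text form?
saved = len("CMD:FORWARD") - len("F")
print("Characters saved per forward command:", saved)
print(decode("F"), decode("s"), decode("X"))
```

A dictionary keeps the robot code short: adding a new command is one new entry instead of another `elif` branch.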


🎮 Microproject 3.10.5 – Fault‑tolerant system (health checks + failover)

Goal: Robot tracks medium health; controller tries an alternate route if errors occur.

# Microproject 3.10.5 – Robot: health checks and clear feedback (BT + IR)

import ble_peripheral                             # Load Bluetooth peripheral helper
import ble_handle                                 # Load Bluetooth callback helper
import irremote                                   # Load IR library
import time                                       # Load time library

ble_p = ble_peripheral.BLESimplePeripheral('Robot-R32')  # Peripheral named Robot-R32
handle = ble_handle.Handle()                              # Callback handle
print("[Robot] BT ready")                                 # Serial: BT init

ir_rx = irremote.NEC_RX(26, 8)                    # IR receiver setup
print("[Robot] IR RX ready")                      # Serial: IR init

health_bt = 3                                     # Health score for BT (0–5)
health_ir = 3                                     # Health score for IR (0–5)

def clamp(v, lo, hi):                             # Helper: clamp value
    return max(lo, min(hi, v))                    # Clamp within bounds

def ack(label):                                   # Helper: send ACK with health
    ble_p.send("ACK:" + str(label) +              # Compose ACK label
               " IR=" + str(health_ir) +          # Include IR health score
               " BT=" + str(health_bt))           # Include BT health score
    print("[Robot] ACK:", label)                  # Serial: log ACK

def handle_method(msg):                           # Callback: process BT text
    global health_bt                              # Use BT health
    s = str(msg)                                  # Ensure string
    print("[Robot] BT RX:", s)                    # Serial: log RX
    # Simple compact mapping for bandwidth optimization
    if s == "F":                                  # If compact forward
        ack("CMD:F")                              # Send ACK for forward
        health_bt = clamp(health_bt + 1, 0, 5)    # Improve BT health
    elif s == "S":                                # If compact stop
        ack("CMD:S")                              # Send ACK for stop
        health_bt = clamp(health_bt + 1, 0, 5)    # Improve BT health
    else:                                         # Unknown command
        ble_p.send("ERR:UNKNOWN")                 # Error feedback
        health_bt = clamp(health_bt - 2, 0, 5)    # Penalize BT health

handle.recv(handle_method)                        # Register callback
print("[Robot] BT callback registered")           # Serial: active

while True:                                       # Health + IR monitor loop
    if ir_rx.any():                               # If IR code available
        code = ir_rx.code[0]                      # Read IR code
        print("[Robot] IR RX:", hex(code))        # Serial: IR log
        health_ir = clamp(health_ir + 1, 0, 5)    # Improve IR health
        ack("IR:CMD")                              # Send ACK to confirm IR path
    time.sleep_ms(150)                            # Loop delay

Reflection: Health scores communicate which medium is strong and guide failover decisions.
Challenge: Have the controller switch to IR whenever “ERR:UNKNOWN” is received twice in a row.
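
The "twice in a row" rule from the challenge can be sketched as a small streak counter. The class `ErrorStreak` is hypothetical, and the messages are assumed to arrive as text exactly as the robot sends them.

```python
class ErrorStreak:
    """Count consecutive ERR:UNKNOWN replies; any other reply resets the count."""

    def __init__(self, limit=2):
        self.limit = limit                # Errors in a row needed to trigger
        self.count = 0                    # Current streak length

    def feed(self, message):
        """Return True once the streak reaches the limit."""
        if str(message).startswith("ERR:UNKNOWN"):
            self.count += 1               # Another error in a row
        else:
            self.count = 0                # A good reply breaks the streak
        return self.count >= self.limit


streak = ErrorStreak()
route = "BT"                              # Start on Bluetooth
for msg in ["ERR:UNKNOWN", "ACK:CMD:F IR=3 BT=4", "ERR:UNKNOWN", "ERR:UNKNOWN"]:
    if streak.feed(msg):                  # Two errors back to back
        route = "IR"                      # Fail over to IR
print(route)
```

The single error before the ACK does not trigger a switch; only the two back-to-back errors at the end do.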


✨ Main project – Hybrid communication (IR + Bluetooth + Serial)

System outline

  • Controller: Chooses route (IR or BT) using health scores and adaptive spacing; logs decisions via Serial.
  • Robot: Decodes IR and BT commands; updates health; sends ACK with health; logs to Serial.
  • Failover: If route degrades, controller auto‑switches medium; robot reports health for visibility.
# Project 3.10 – Hybrid Communication (Controller + Robot)
# Controller: IR primary with BT fallback, auto-selection by health, adaptive spacing.
# Robot: Decode IR and BT, report health via ACK, clear serial diagnostics.

# ---------- CONTROLLER ----------
import irremote                                   # Load IR library
import ble_central                                # Load Bluetooth central helper
import time                                       # Load time library

ir_tx = irremote.NEC_TX(26, False, 100)           # IR transmitter
central = ble_central.BLESimpleCentral()          # Bluetooth central
print("[Controller] IR+BT ready")                 # Serial: init status

def connect_robot():                              # Helper: connect to robot peripheral
    central.scan()                                # Scan nearby devices
    time.sleep_ms(600)                            # Short scan wait
    central.connect('Robot-R32')                  # Connect by name
    print("[Controller] BT connected")            # Serial: status

connect_robot()                                   # Connect BT

# Canonical IR codes and compact BT commands
CODE_FORWARD = 0x18                               # IR forward
CMD_FWD_TX   = "F"                                # BT compact forward
CMD_STOP_TX  = "S"                                # BT compact stop

# Health scores for medium selection
score_ir = 3                                      # Start medium health at 3
score_bt = 3                                      # Start medium health at 3

# Adaptive spacing for BT
last_ms  = 0                                      # Last BT send time (ms)
gap_ms   = 180                                    # Base BT gap
extra    = 0                                      # Adaptive extra gap

def now_ms():                                     # Helper: current ms
    return time.ticks_ms()                        # Millisecond ticks

def can_bt_send():                                # Helper: BT spacing check
    elapsed = time.ticks_diff(now_ms(), last_ms)  # Time since last BT send
    return elapsed >= (gap_ms + extra)            # Compare against gap

def mark_bt_sent():                               # Helper: mark BT send time
    global last_ms                                # Use global time
    last_ms = now_ms()                            # Update last send time

def bump_gap():                                   # Helper: increase BT gap
    global extra                                  # Use global gap
    extra = min(extra + 40, 240)                  # Cap extra gap

def relax_gap():                                  # Helper: reduce BT gap
    global extra                                  # Use global gap
    extra = max(extra - 15, 0)                    # Lower extra gap

def clamp(v, lo, hi):                             # Helper: clamp value
    return max(lo, min(hi, v))                    # Clamp within bounds

def send_ir(code):                                # Helper: IR send + health adjust
    global score_ir                               # Use IR score
    try:                                          # Try IR transmit
        ir_tx.transmit(0x00, code, 0x00)          # Send IR packet
        print("[Controller] IR TX:", hex(code))   # Serial: log IR send
        score_ir = clamp(score_ir + 1, 0, 5)      # Improve IR score
    except Exception as e:                        # On error
        print("[Controller] IR error", e)         # Serial: log error
        score_ir = clamp(score_ir - 2, 0, 5)      # Penalize IR score

def send_bt(text):                                # Helper: BT send + health adjust
    global score_bt                               # Use BT score
    try:                                          # Try BT send
        central.send(text)                        # Send BT text
        print("[Controller] BT TX:", text,        # Serial: log BT send
              "| gap", gap_ms + extra, "ms")      # Show total gap
        mark_bt_sent()                            # Update last BT time
        relax_gap()                               # Slightly reduce gap
        score_bt = clamp(score_bt + 1, 0, 5)      # Improve BT score
    except Exception as e:                        # On error
        print("[Controller] BT error", e)         # Serial: log error
        bump_gap()                                # Increase gap to avoid flood
        score_bt = clamp(score_bt - 2, 0, 5)      # Penalize BT score

def best_route():                                 # Helper: choose medium
    if score_bt > score_ir and can_bt_send():     # If BT healthier and spaced
        return "BT"                                # Use BT
    else:                                         # Otherwise
        return "IR"                                # Use IR

print("[Controller] Hybrid loop start")           # Serial: start loop

while True:                                       # Controller decision loop
    route = best_route()                          # Decide route
    print("[Controller] Route:", route,           # Serial: show decision
          "IR=", score_ir, "BT=", score_bt)       # Show health scores
    if route == "IR":                             # If IR chosen
        send_ir(CODE_FORWARD)                     # Send forward via IR
    else:                                         # If BT chosen
        send_bt(CMD_FWD_TX)                       # Send compact forward
    time.sleep_ms(800)                            # Pace loop

# ---------- ROBOT ----------
import ble_peripheral                             # Load Bluetooth peripheral helper
import ble_handle                                 # Load Bluetooth callback helper
import irremote                                   # Load IR library
import time                                       # Load time library

ble_p = ble_peripheral.BLESimplePeripheral('Robot-R32')  # Peripheral named Robot-R32
handle = ble_handle.Handle()                              # Callback handle
print("[Robot] BT ready")                                 # Serial: BT init

ir_rx = irremote.NEC_RX(26, 8)                    # IR receiver
print("[Robot] IR RX ready")                      # Serial: IR init

health_bt = 3                                     # BT health score
health_ir = 3                                     # IR health score

def clamp(v, lo, hi):                             # Helper: clamp values
    return max(lo, min(hi, v))                    # Clamp within bounds

def ack(label):                                   # Helper: ACK with health context
    ble_p.send("ACK:" + str(label) +              # Ack label
               " IR=" + str(health_ir) +          # IR health
               " BT=" + str(health_bt))           # BT health
    print("[Robot] ACK:", label)                  # Serial: log ACK

def handle_method(msg):                           # Callback: process BT commands
    global health_bt                              # Use BT health
    s = str(msg)                                  # Ensure string
    print("[Robot] BT RX:", s)                    # Serial: log RX
    if s == "F":                                  # Compact forward
        ack("CMD:F")                              # Send ACK for forward
        health_bt = clamp(health_bt + 1, 0, 5)    # Improve BT health
    elif s == "S":                                # Compact stop
        ack("CMD:S")                              # Send ACK for stop
        health_bt = clamp(health_bt + 1, 0, 5)    # Improve BT health
    else:                                         # Unknown command
        ble_p.send("ERR:UNKNOWN")                 # Error feedback
        health_bt = clamp(health_bt - 2, 0, 5)    # Penalize BT health

handle.recv(handle_method)                        # Register callback
print("[Robot] BT callback registered")           # Serial: active

CODE_FORWARD = 0x18                               # IR forward code

while True:                                       # Robot processing loop
    if ir_rx.any():                               # If IR code available
        code = ir_rx.code[0]                      # Read IR code
        print("[Robot] IR RX:", hex(code))        # Serial: log code
        if code == CODE_FORWARD:                  # If forward
            ack("IR:CMD:F")                       # ACK the forward command
            health_ir = clamp(health_ir + 1, 0, 5)  # Improve IR health
        else:                                     # Unknown IR
            ack("IR:CMD:?")                       # Unknown IR ACK
            health_ir = clamp(health_ir - 1, 0, 5)  # Slight penalty
    time.sleep_ms(150)                            # Loop pacing

External explanation

  • What it teaches: Designing systems that keep working when a single link fails by monitoring health, adapting spacing, and switching mediums.
  • Why it works: Each medium has clear helpers and health scores; the controller chooses routes; the robot reports status; serial logs reveal decisions.
  • Key concept: Fault tolerance emerges from small, consistent checks and simple, readable code.
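
The robot's ACK text carries both health scores, so the controller can read them back. The sketch below parses the format built by `ack()` in the robot code ("ACK:<label> IR=<n> BT=<n>") and scores a link by whether an ACK arrived. The helpers `parse_ack` and `update_score` are hypothetical; how the text reaches the controller depends on your Bluetooth receive callback.

```python
def parse_ack(text):
    """Parse 'ACK:<label> IR=<n> BT=<n>' into a dict, or return None."""
    parts = str(text).split(" ")          # Label, IR field, BT field
    if len(parts) != 3 or not parts[0].startswith("ACK:"):
        return None
    try:
        ir = int(parts[1].split("=")[1])  # Number after 'IR='
        bt = int(parts[2].split("=")[1])  # Number after 'BT='
    except (IndexError, ValueError):      # Missing '=' or non-numeric score
        return None
    return {"label": parts[0][4:], "ir": ir, "bt": bt}


def clamp(v, lo, hi):
    return max(lo, min(hi, v))            # Same helper as in the project code


def update_score(score, acked):
    """Reward a confirmed command (+1); penalize a missing ACK (-2)."""
    return clamp(score + 1, 0, 5) if acked else clamp(score - 2, 0, 5)


info = parse_ack("ACK:IR:CMD:F IR=3 BT=4")
print(info)
print(update_score(3, acked=False))       # No ACK: score drops
```

Scoring on ACKs rather than on a successful transmit call lets the controller notice a blocked IR beam, which a one-way IR send cannot detect by itself.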

Story time

Your rover speaks two languages. If the light beam gets blocked, it whispers over radio. If the radio gets noisy, it flashes light again. And it keeps you informed with clear, calm updates.


Debugging (2)

Debugging 3.10.A – Protocol conflicts

  • Symptom: Commands collide or duplicate actions when both mediums are active.
  • Fix: Space sends, avoid simultaneous IR+BT duplicates, and prioritize one route per cycle.
# Pick a single route per loop iteration
route = best_route()        # Decide once
# Do not send both; only send on the chosen route

Debugging 3.10.B – Unnecessary medium switching

  • Symptom: Rapid switching between IR and BT causes jitter.
  • Fix: Add hysteresis by requiring a 2‑point score advantage before switching.
# Hysteresis: require BT to be 2 points better before switching
def best_route_hysteresis():
    if score_bt >= score_ir + 2 and can_bt_send():
        return "BT"
    else:
        return "IR"
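
A two-way variant that remembers the current route is sketched below: it stays on whichever medium it is using until the other one leads by the margin. The function `next_route` and its `margin` parameter are hypothetical, and the score pairs in the demo are made-up values chosen only to show the behavior.

```python
def next_route(current, score_ir, score_bt, margin=2):
    """Keep the current route unless the other medium leads by `margin`."""
    if current == "IR" and score_bt >= score_ir + margin:
        return "BT"                       # BT is clearly better: switch
    if current == "BT" and score_ir >= score_bt + margin:
        return "IR"                       # IR is clearly better: switch back
    return current                        # Otherwise stay put


route = "IR"
history = []
for ir, bt in [(3, 3), (3, 4), (2, 4), (3, 4), (4, 4), (5, 3)]:
    route = next_route(route, ir, bt)     # Decide once per cycle
    history.append(route)
print(history)
```

A 1-point lead never causes a switch in either direction, which is what removes the jitter.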

Final checklist

  • One protocol (IR or BT) works cleanly by itself.
  • Redundancy sends via the fallback when the primary is stale.
  • Auto‑selection picks the best medium using health scores.
  • Compact messages and adaptive spacing reduce congestion.
  • Robot reports ACK with health; serial logs show clear decisions.

Extras

  • Student tip: Watch health scores in logs to understand why routes change.
  • Instructor tip: Demonstrate blocking IR (cover the sensor) and watch the system fail over to Bluetooth.
  • Glossary:
    • Medium: The communication path (IR or Bluetooth).
    • Health score: A simple number tracking a medium’s recent success.
    • Hysteresis: A margin that reduces rapid switching back and forth.
  • Mini tips:
    • Keep messages short (“F”, “S”) for BT when possible.
    • Use a cadence of ~800–1000 ms to avoid flooding and keep control smooth.
    • Keep logs concise; avoid printing inside very tight loops.