Project 6.5: "Two Cooperative Robots"
What you’ll learn
- ✅ Robot‑to‑robot communication: Send and receive small, labeled messages over WiFi to coordinate actions.
- ✅ Coordinated tasks: Plan simple teamwork like transporting an object with roles (Leader, Helper).
- ✅ Collision avoidance: Use a “lane reservation” rule so two robots don’t crash.
- ✅ Movement formations: Keep formation (line, V) with a leader and a follower using timed steps.
- ✅ Cooperative problem solving: Reassign tasks when one robot fails and confirm success.
Blocks glossary (used in this project)
- WiFi UDP send/recv: Lightweight datagrams for fast robot‑to‑robot messaging.
- Roles and states: Variables to track LEADER/HELPER, task state, and formation mode.
- Lane reservation: A shared message tag (RESERVE_LANE) that prevents both robots from moving into the same lane at once.
- Timed motion helpers: Forward/back/turn steps with clean stops.
- Serial println: Short “KEY:VALUE” lines like MSG:…, TASK:…, RESERVE:…, FORM:…, CONFIRM:….
What you need
| Part | How many? | Pin connection / Notes |
|---|---|---|
| D1 R32 (ESP32) robots | 2 | Each with USB, same WiFi network |
| L298N + TT motors | 2 sets | Robot A and Robot B wired to their motor pins |
| LED module (optional) | 2 | Pin 13 as status light |
| WiFi network (2.4 GHz) | 1 | SSID/PASSWORD known |
Notes
- Each robot needs its own static or known IP; set PEER_IP accordingly.
- Share a simple text protocol so both robots understand the same messages.
- Keep prints short and labeled for classroom debugging.
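The shared text protocol from the notes above can be sketched as a pair of small helpers. This is only a minimal illustration, assuming the `ROLE:TAG` message layout used in the microprojects; the names `encode_msg` and `decode_msg` are hypothetical and do not appear in the project code.

```python
def encode_msg(role, tag):
    """Build the bytes for one message, for example b"A:HELLO"."""
    return (role + ":" + tag).encode()


def decode_msg(data):
    """Split received bytes into (role, tag); return None if malformed."""
    try:
        text = data.decode()
    except UnicodeError:
        return None  # bytes were not valid text
    parts = text.split(":", 1)  # split only on the first colon
    if len(parts) != 2 or parts[0] not in ("A", "B") or not parts[1]:
        return None  # missing colon, unknown role, or empty tag
    return parts[0], parts[1]


print(encode_msg("A", "HELLO"))
print(decode_msg(b"B:TASK_ACK"))
print(decode_msg(b"garbage"))
```

Rejecting malformed messages in one place means the rest of the code only ever sees a known role and a non-empty tag.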
Before you start
- Robots are on the same WiFi network
- You know each robot’s IP address
- Serial monitor is open and shows:
print("Ready!") # Confirm serial is working so you can see messages
Microprojects 1–5
Microproject 6.5.1 – Communication between robots via WiFi
Goal: Send “Hello” and receive peer messages using UDP; print what you get.
Blocks used:
- UDP socket: sendto() and recvfrom().
- Serial println: MSG:SENT and MSG:RECV.
MicroPython code:
import network # Import network to check WiFi connection
import socket # Import socket to create UDP sockets
import time # Import time for pacing
SSID = "YourSSID" # Store your WiFi SSID
PASSWORD = "YourPassword" # Store your WiFi password
ROLE = "A" # Set robot role ("A" or "B")
PEER_IP = "192.168.1.51" if ROLE == "A" else "192.168.1.50" # Pick peer IP based on role
PORT = 6006 # Choose a shared UDP port
wlan = network.WLAN(network.STA_IF) # Create station interface
wlan.active(True) # Activate WiFi
wlan.connect(SSID, PASSWORD) # Connect with credentials
print("NET:CONNECTING", SSID) # Print which SSID we are connecting to
while not wlan.isconnected(): # Wait until connected
    time.sleep(0.2) # Short delay while waiting
print("NET:CONNECTED", wlan.ifconfig()[0]) # Print assigned IP
u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create UDP socket
u.bind(("0.0.0.0", PORT)) # Bind to all interfaces on PORT
u.settimeout(0.2) # Give recvfrom a short timeout so it cannot block forever
print("UDP:LISTEN", PORT) # Print listening port
msg = (ROLE + ":HELLO").encode() # Build hello message bytes
u.sendto(msg, (PEER_IP, PORT)) # Send hello to peer
print("MSG:SENT", PEER_IP) # Print send confirmation
try: # Try to receive a message
    data, addr = u.recvfrom(256) # Receive up to 256 bytes
    print("MSG:RECV", addr[0], data.decode()) # Print received message and sender IP
except OSError: # If timeout occurs
    print("MSG:NONE") # Report no message
u.close() # Close UDP socket
Reflection: Saying “HELLO” proves both robots can see each other—first handshake done.
Challenge:
- Easy: Add a second send “PING” and wait for “PONG”.
- Harder: Add a message counter and print “SEQ:n” for ordered debugging.
Microproject 6.5.2 – Coordinated task: transporting an object
Goal: Leader requests help; helper acknowledges; both step forward together.
Blocks used:
- Protocol tags: TASK_REQUEST, TASK_ACK, TASK_GO.
- Timed motion: Short forward pulses with a clean stop.
MicroPython code:
import socket # Import socket for UDP
import time # Import time for step timing
ROLE = "A" # Set role ("A"=Leader, "B"=Helper)
PEER_IP = "192.168.1.51" if ROLE == "A" else "192.168.1.50" # Peer IP
PORT = 6007 # Shared task UDP port
u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create UDP socket
u.bind(("0.0.0.0", PORT)) # Bind to local port for receiving
u.settimeout(0.2) # Short timeout for non-blocking receive
print("TASK:SOCKET_READY", PORT) # Print socket ready
def send(tag): # Helper to send a tag to peer
    u.sendto((ROLE + ":" + tag).encode(), (PEER_IP, PORT)) # Send encoded tag
    print("TASK:SEND", tag) # Print what was sent
def forward_step(ms=300): # Simulate a forward step
    print("MOVE:FWD_STEP", ms) # Print step duration
    time.sleep(ms / 1000.0) # Wait for the given milliseconds
    print("MOTORS:STOP") # Print stop (placeholder for real motor stop)
if ROLE == "A": # If Leader
    send("TASK_REQUEST") # Ask helper to assist
    acked = False # Track whether helper acknowledged
    start = time.ticks_ms() # Record start time
    while time.ticks_diff(time.ticks_ms(), start) < 2000: # Wait up to 2 s
        try: # Try receiving
            data, addr = u.recvfrom(128) # Receive bytes from socket
            text = data.decode() # Decode bytes to string
            print("TASK:RECV", text) # Print received text
            if "TASK_ACK" in text: # If acknowledgement
                acked = True # Mark acknowledged
                break # Exit wait
        except OSError: # If timeout
            pass # Keep waiting
    if acked: # If helper acknowledged
        send("TASK_GO") # Send go signal
        forward_step(400) # Leader forward step
    else: # If no acknowledgement
        print("TASK:HELPER_MISSING") # Report missing helper
else: # If Helper
    try: # Try to receive Leader request
        data, addr = u.recvfrom(128) # Receive bytes
        text = data.decode() # Decode to string
        print("TASK:RECV", text) # Print received text
        if "TASK_REQUEST" in text: # If leader asked
            send("TASK_ACK") # Send acknowledgement
            # Wait for GO signal
            go = False # Track go signal
            start = time.ticks_ms() # Record start time
            while time.ticks_diff(time.ticks_ms(), start) < 2000: # Wait up to 2 s
                try: # Try receiving
                    d2, a2 = u.recvfrom(128) # Receive next bytes
                    t2 = d2.decode() # Decode to string
                    print("TASK:RECV", t2) # Print received text
                    if "TASK_GO" in t2: # If leader sent go
                        go = True # Mark go received
                        break # Exit wait
                except OSError: # On timeout
                    pass # Keep waiting
            if go: # If go was received
                forward_step(400) # Helper forward step
            else: # No go
                print("TASK:NO_GO") # Report lack of go
    except OSError: # If no request received
        print("TASK:NONE") # Report none
u.close() # Close UDP socket
Reflection: Clear REQUEST → ACK → GO keeps team movement synchronized and predictable.
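One way to picture the REQUEST → ACK → GO rule is as a tiny state machine on the helper side. The sketch below is an illustration only, assuming the tag names from this microproject; the function names `helper_next` and `run_helper` and the state names are hypothetical, and no sockets or motors are involved.

```python
def helper_next(state, tag):
    """Return (new_state, action) for the helper after one received tag.

    States: "IDLE" -> "ACKED" -> "MOVING".
    Action is a tag to send back, "MOVE" to start the step, or None.
    """
    if state == "IDLE" and tag == "TASK_REQUEST":
        return "ACKED", "TASK_ACK"  # acknowledge, but do not move yet
    if state == "ACKED" and tag == "TASK_GO":
        return "MOVING", "MOVE"  # GO arrived after ACK, so moving is safe
    return state, None  # any other tag is ignored in this state


def run_helper(tags):
    """Feed a list of received tags through the state machine."""
    state, actions = "IDLE", []
    for tag in tags:
        state, action = helper_next(state, tag)
        if action is not None:
            actions.append(action)
    return state, actions


print(run_helper(["TASK_REQUEST", "TASK_GO"]))
print(run_helper(["TASK_GO"]))
```

A GO that arrives without a prior REQUEST leaves the helper in "IDLE", which is the behavior the handshake is meant to guarantee.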
Challenge:
- Easy: Add a second “GO2” step to simulate a two‑stage carry.
- Harder: Add “TASK:CANCEL” if ACK doesn’t arrive within 2 seconds.
Microproject 6.5.3 – Avoiding collisions between robots
Goal: Implement a simple lane reservation so only one robot moves in a shared lane.
Blocks used:
- Reserve message: RESERVE_LANE and RELEASE_LANE.
- Priority rule: A has priority; B yields if conflict.
MicroPython code:
import socket # Import socket for UDP reservations
import time # Import time for timeouts
ROLE = "A" # Set role ("A" or "B")
PEER_IP = "192.168.1.51" if ROLE == "A" else "192.168.1.50" # Peer IP
PORT = 6008 # Reservation UDP port
u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create UDP socket
u.bind(("0.0.0.0", PORT)) # Bind local port
u.settimeout(0.1) # Short timeout for responsiveness
print("RESERVE:SOCKET_READY", PORT) # Print socket ready
lane_reserved = False # Track local reservation state
def send(tag): # Helper to send reservation tags
    u.sendto((ROLE + ":" + tag).encode(), (PEER_IP, PORT)) # Send encoded tag
    print("RESERVE:SEND", tag) # Print tag sent
def try_reserve(): # Attempt to reserve the lane
    global lane_reserved # Use global state
    send("RESERVE_LANE") # Send reserve request
    start = time.ticks_ms() # Record start time
    while time.ticks_diff(time.ticks_ms(), start) < 400: # Wait up to 400 ms
        try: # Try receive
            data, addr = u.recvfrom(128) # Receive bytes
            text = data.decode() # Decode text
            print("RESERVE:RECV", text) # Print message
            if ("RESERVE_LANE" in text) and (ROLE == "B"): # If peer also reserves and I am B
                print("RESERVE:YIELD_B") # Yield policy for B
                lane_reserved = False # Do not reserve
                return False # Fail reservation
        except OSError: # Timeout
            pass # Continue waiting
    lane_reserved = True # Mark reserved if no conflict
    print("RESERVE:OK") # Print success
    return True # Return success
def release(): # Release lane reservation
    global lane_reserved # Use global
    lane_reserved = False # Clear reservation
    send("RELEASE_LANE") # Notify release
    print("RESERVE:RELEASED") # Print release
if try_reserve(): # Attempt reservation
    print("MOVE:FWD_SAFE") # Print safe forward (placeholder for real motion)
    time.sleep(0.4) # Simulate movement duration
    release() # Release after movement
else: # If reservation failed
    print("MOVE:WAIT") # Print wait
    time.sleep(0.4) # Wait before retry
u.close() # Close UDP socket
Reflection: A tiny rule prevents big crashes—decide who goes first, and stick to it.
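The priority rule can be checked on paper before any robot moves. The sketch below is a minimal illustration of "A has priority; B yields", assuming each robot only knows its own role and whether it heard the peer's RESERVE_LANE during the listening window; the function name `may_move` is hypothetical.

```python
def may_move(role, peer_requested):
    """Decide whether this robot may enter the lane after the listening window.

    role           -- "A" or "B"
    peer_requested -- True if the peer's RESERVE_LANE was heard in the window
    """
    # Only B ever yields, and only when A also asked for the lane.
    return not (role == "B" and peer_requested)


# Try every combination of "A asked" and "B asked".
for a_asked in (False, True):
    for b_asked in (False, True):
        a_moves = a_asked and may_move("A", b_asked)
        b_moves = b_asked and may_move("B", a_asked)
        print("A asked:", a_asked, "B asked:", b_asked,
              "-> A moves:", a_moves, "B moves:", b_moves)
```

In every row at most one robot moves, which is the whole point of a fixed priority. Note that the rule only holds if B actually hears A's request, so a lost UDP message can still cause a conflict.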
Challenge:
- Easy: Add a retry after “MOVE:WAIT”.
- Harder: Add a “RESERVE:TIMEOUT” that auto‑releases after 2 s if someone forgot.
Microproject 6.5.4 – Movement formations
Goal: Leader moves; follower mirrors steps with a small delay for formation (LINE, then V).
Blocks used:
- Formation tags: FORM_LINE and FORM_V.
- Follower delay: DELAY_MS to keep spacing.
MicroPython code:
import socket # Import socket for formation control
import time # Import time for delays
ROLE = "A" # Set role ("A" Leader, "B" Follower)
PEER_IP = "192.168.1.51" if ROLE == "A" else "192.168.1.50" # Peer IP
PORT = 6009 # Formation UDP port
DELAY_MS = 180 if ROLE == "B" else 0 # Follower delay to maintain spacing
u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create UDP socket
u.bind(("0.0.0.0", PORT)) # Bind local port
u.settimeout(0.1) # Short timeout
print("FORM:SOCKET_READY", PORT) # Print socket ready
def send_step(tag): # Helper to send a formation step
    u.sendto((ROLE + ":" + tag).encode(), (PEER_IP, PORT)) # Send encoded tag
    print("FORM:SEND", tag) # Print step tag
def do_step(tag): # Execute a local formation step
    if DELAY_MS > 0: # If we need to delay (follower)
        time.sleep(DELAY_MS / 1000.0) # Apply follower delay
    print("MOVE:", tag) # Print move tag (placeholder for motor action)
    time.sleep(0.25) # Simulate step duration
    print("MOTORS:STOP") # Print stop
if ROLE == "A": # Leader orchestrates
    send_step("FORM_LINE") # Announce line formation
    for tag in ["FWD", "FWD", "LEFT", "FWD"]: # Simple line sequence
        send_step(tag) # Send step to follower
        do_step(tag) # Perform step locally
    send_step("FORM_V") # Announce V formation
    for tag in ["RIGHT", "FWD", "FWD"]: # Simple V sequence
        send_step(tag) # Send step
        do_step(tag) # Perform step
else: # Follower mirrors steps
    try: # Try receiving formation announcements and steps
        for _ in range(10): # Read several steps
            data, addr = u.recvfrom(64) # Receive bytes
            t = data.decode() # Decode to string
            print("FORM:RECV", t) # Print received tag
            if "FORM_LINE" in t or "FORM_V" in t: # If a formation change
                print("FORM:MODE", t.split(":")[-1]) # Print mode
            elif any(x in t for x in ["FWD", "LEFT", "RIGHT"]): # If motion step
                do_step(t.split(":")[-1]) # Execute step based on tag
            time.sleep(0.05) # Small spacing
    except OSError: # On timeout
        print("FORM:NONE") # Print none
u.close() # Close UDP socket
Reflection: A tiny delay keeps shapes tight—leader sets the rhythm, follower echoes the dance.
Challenge:
- Easy: Add “BACK” steps and test spacing.
- Harder: Compute DELAY_MS from “SPEED:LOW/MID/HIGH” printed by the leader.
Microproject 6.5.5 – Cooperative problem solving
Goal: If Leader reports failure, Helper takes the task and confirms completion.
Blocks used:
- Tags: FAIL, TAKEOVER, DONE, CONFIRM.
- Branching: If FAIL → Helper acts; Leader waits for CONFIRM.
MicroPython code:
import socket # Import socket for cooperative problem solving
import time # Import time for waits
ROLE = "A" # Set role ("A" Leader, "B" Helper)
PEER_IP = "192.168.1.51" if ROLE == "A" else "192.168.1.50" # Peer IP
PORT = 6010 # Cooperation UDP port
u = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create UDP socket
u.bind(("0.0.0.0", PORT)) # Bind local port
u.settimeout(0.2) # Short timeout
print("COOP:SOCKET_READY", PORT) # Print socket ready
def send(tag): # Helper to send tags
    u.sendto((ROLE + ":" + tag).encode(), (PEER_IP, PORT)) # Send encoded tag
    print("COOP:SEND", tag) # Print sent tag
def do_task(): # Simulate doing a task
    print("TASK:RUN") # Print task start
    time.sleep(0.4) # Simulate duration
    print("TASK:DONE") # Print task done
if ROLE == "A": # Leader path
    send("FAIL") # Report failure to trigger helper takeover
    # Wait for helper to confirm
    confirmed = False # Track confirmation
    start = time.ticks_ms() # Record start time
    while time.ticks_diff(time.ticks_ms(), start) < 3000: # Wait up to 3 s
        try: # Try receiving
            data, addr = u.recvfrom(128) # Receive bytes
            text = data.decode() # Decode to string
            print("COOP:RECV", text) # Print received tag
            if "CONFIRM" in text: # If helper confirmed
                confirmed = True # Mark confirmed
                break # Exit wait
        except OSError: # Timeout
            pass # Continue waiting
    print("COOP:STATUS", "OK" if confirmed else "NO_CONFIRM") # Print status
else: # Helper path
    try: # Try receiving fail notice
        data, addr = u.recvfrom(128) # Receive bytes
        text = data.decode() # Decode to string
        print("COOP:RECV", text) # Print message
        if "FAIL" in text: # If leader failed
            send("TAKEOVER") # Announce takeover
            do_task() # Perform task
            send("CONFIRM") # Confirm completion
    except OSError: # Timeout on receive
        print("COOP:NONE") # Print none
u.close() # Close UDP socket
Reflection: Failure is part of teamwork—announce it fast, let your partner rescue the mission, and confirm the win.
Challenge:
- Easy: Add a “RETRY” before “FAIL” with a short timeout.
- Harder: Add “TAKEOVER_DENIED” if the helper is busy and escalate to teacher.
Main project – Two cooperative robots
Blocks steps (with glossary)
- WiFi UDP protocol: HELLO/PING/PONG + TASK/RESERVE/FORM/FAIL message tags.
- Roles and priorities: Leader (“A”), Helper (“B”), and a simple lane‑reservation rule.
- Coordinated tasks: REQUEST → ACK → GO movement steps, with clean stops.
- Formations: Leader broadcasts steps; follower delays and mirrors.
- Cooperation: On FAIL, helper takes over and confirms.
MicroPython code (mirroring blocks)
# Project 6.5 – Two Cooperative Robots
import network # Import network to use WiFi
import socket # Import socket to use UDP messaging
import time # Import time to pace loops
SSID = "YourSSID" # Put your WiFi SSID here
PASSWORD = "YourPassword" # Put your WiFi password here
ROLE = "A" # Choose "A" (Leader) or "B" (Helper)
PEER_IP = "192.168.1.51" if ROLE == "A" else "192.168.1.50" # Set peer IP by role
PORT_PROTO = 6100 # Shared UDP port for protocol messages
PORT_FORM = 6101 # UDP port for formation steps
PORT_RES = 6102 # UDP port for lane reservations
DELAY_MS = 180 if ROLE == "B" else 0 # Follower delay in formation
wlan = network.WLAN(network.STA_IF) # Create WiFi station interface
wlan.active(True) # Activate WiFi
wlan.connect(SSID, PASSWORD) # Connect using credentials
print("NET:CONNECTING", SSID) # Print SSID to verify connection attempt
while not wlan.isconnected(): # Wait for WiFi connection
    time.sleep(0.2) # Short delay
print("NET:CONNECTED", wlan.ifconfig()[0]) # Print local IP
u_proto = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create protocol UDP socket
u_proto.bind(("0.0.0.0", PORT_PROTO)) # Bind protocol socket
u_proto.settimeout(0.2) # Short timeout to keep loop responsive
print("SOCK:PROTO_READY", PORT_PROTO) # Print protocol socket ready
u_form = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create formation UDP socket
u_form.bind(("0.0.0.0", PORT_FORM)) # Bind formation socket
u_form.settimeout(0.2) # Short timeout
print("SOCK:FORM_READY", PORT_FORM) # Print formation socket ready
u_res = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Create reservation UDP socket
u_res.bind(("0.0.0.0", PORT_RES)) # Bind reservation socket
u_res.settimeout(0.2) # Short timeout
print("SOCK:RES_READY", PORT_RES) # Print reservation socket ready
lane_reserved = False # Track local lane reservation
formation_mode = "LINE" # Start formation mode as LINE
last_heartbeat = time.ticks_ms() # Track last heartbeat time
def send(sock, tag): # Helper to send a tag on a socket
    # MicroPython sockets do not provide getsockname(), so pick the port that matches the socket
    port = PORT_FORM if sock is u_form else PORT_RES if sock is u_res else PORT_PROTO # Port for this socket
    sock.sendto((ROLE + ":" + tag).encode(), (PEER_IP, port)) # Send encoded tag to peer on the same port
    print("SEND", port, tag) # Print send info
def do_step(tag): # Perform a motion step (placeholder)
    if DELAY_MS > 0: # If follower delay is needed
        time.sleep(DELAY_MS / 1000.0) # Apply follower delay
    print("MOVE:", tag) # Print movement tag
    time.sleep(0.25) # Simulate step duration
    print("MOTORS:STOP") # Print stop
def try_reserve(): # Attempt lane reservation
    global lane_reserved # Use global lane state
    send(u_res, "RESERVE_LANE") # Send reservation tag
    start = time.ticks_ms() # Record start time
    conflict = False # Track conflict
    while time.ticks_diff(time.ticks_ms(), start) < 300: # Wait up to 300 ms
        try: # Try receive conflict
            data, addr = u_res.recvfrom(128) # Receive bytes
            text = data.decode() # Decode text
            print("RES:RECV", text) # Print reservation message
            if ("RESERVE_LANE" in text) and (ROLE == "B"): # If both reserve and I am B
                conflict = True # Mark conflict
                break # Exit wait
        except OSError: # Timeout
            pass # Keep waiting
    if conflict: # If conflict detected
        print("RES:YIELD_B") # Print yield
        lane_reserved = False # Not reserved
        return False # Fail
    lane_reserved = True # Mark reserved
    print("RES:OK") # Print success
    return True # Success
def release_res(): # Release lane reservation
    global lane_reserved # Use global lane state
    lane_reserved = False # Clear reservation
    send(u_res, "RELEASE_LANE") # Notify release
    print("RES:RELEASED") # Print release
def leader_task(): # Leader runs coordinated carry task
    send(u_proto, "TASK_REQUEST") # Ask helper to assist
    acked = False # Track acknowledgement
    start = time.ticks_ms() # Record start time
    while time.ticks_diff(time.ticks_ms(), start) < 2000: # Wait up to 2 s
        try: # Try receive ACK
            data, addr = u_proto.recvfrom(128) # Receive bytes
            text = data.decode() # Decode to string
            print("RECV", text) # Print received tag
            if "TASK_ACK" in text: # If helper acknowledged
                acked = True # Mark ACK
                break # Exit wait
        except OSError: # Timeout
            pass # Keep waiting
    if not acked: # If no ACK
        print("TASK:HELPER_MISSING") # Report missing helper
        return # Quit task
    send(u_proto, "TASK_GO") # Send go signal
    if try_reserve(): # Try to reserve lane
        do_step("FWD") # Move forward step
        release_res() # Release reservation
    else: # Reservation failed
        print("TASK:WAIT_RES") # Print wait
def helper_task(): # Helper waits for request and cooperates
    try: # Try receive leader request
        data, addr = u_proto.recvfrom(128) # Receive bytes
        text = data.decode() # Decode text
        print("RECV", text) # Print received tag
        if "TASK_REQUEST" in text: # If leader requested
            send(u_proto, "TASK_ACK") # Send acknowledgement
            # Wait for GO
            start = time.ticks_ms() # Record start time
            go = False # Track GO
            while time.ticks_diff(time.ticks_ms(), start) < 2000: # Wait up to 2 s
                try: # Try receive GO
                    d2, a2 = u_proto.recvfrom(128) # Receive bytes
                    t2 = d2.decode() # Decode text
                    print("RECV", t2) # Print tag
                    if "TASK_GO" in t2: # If go received
                        go = True # Mark go
                        break # Exit wait
                except OSError: # Timeout
                    pass # Keep waiting
            if go: # If go received
                if try_reserve(): # Try to reserve lane
                    do_step("FWD") # Move forward step
                    release_res() # Release reservation
                else: # Reservation failed
                    print("TASK:HELPER_WAIT") # Print wait
    except OSError: # No request
        pass # Do nothing
def formation_leader(): # Leader broadcasts formation steps
    send(u_form, "FORM_LINE") # Announce line formation
    for tag in ["FWD", "FWD", "LEFT", "FWD"]: # Line sequence
        send(u_form, tag) # Send step
        do_step(tag) # Perform step
    send(u_form, "FORM_V") # Announce V formation
    for tag in ["RIGHT", "FWD", "FWD"]: # V sequence
        send(u_form, tag) # Send step
        do_step(tag) # Perform step
def formation_follower(): # Follower mirrors leader steps
    global formation_mode # Update the shared mode instead of a local copy
    try: # Try receive multiple steps
        for _ in range(12): # Read several messages
            data, addr = u_form.recvfrom(64) # Receive bytes
            t = data.decode() # Decode text
            print("FORM:RECV", t) # Print received tag
            if "FORM_LINE" in t: # If line formation
                formation_mode = "LINE" # Set mode
                print("FORM:MODE LINE") # Print mode
            elif "FORM_V" in t: # If V formation
                formation_mode = "V" # Set mode
                print("FORM:MODE V") # Print mode
            elif any(x in t for x in ["FWD", "LEFT", "RIGHT"]): # Motion step
                do_step(t.split(":")[-1]) # Perform step
            time.sleep(0.05) # Small spacing
    except OSError: # Timeout
        pass # Do nothing
def cooperate_on_fail(): # Handle failure and takeover
    if ROLE == "A": # Leader failing
        send(u_proto, "FAIL") # Report failure
        start = time.ticks_ms() # Record start
        confirmed = False # Track confirmation
        while time.ticks_diff(time.ticks_ms(), start) < 3000: # Wait up to 3 s
            try: # Try receive confirm
                data, addr = u_proto.recvfrom(128) # Receive bytes
                text = data.decode() # Decode text
                print("RECV", text) # Print tag
                if "CONFIRM" in text: # If helper confirmed
                    confirmed = True # Mark confirm
                    break # Exit wait
            except OSError: # Timeout
                pass # Keep waiting
        print("COOP:STATUS", "OK" if confirmed else "NO_CONFIRM") # Print status
    else: # Helper taking over
        try: # Try receive FAIL
            data, addr = u_proto.recvfrom(128) # Receive bytes
            text = data.decode() # Decode text
            print("RECV", text) # Print tag
            if "FAIL" in text: # If leader failed
                send(u_proto, "TAKEOVER") # Announce takeover
                do_step("FWD") # Perform a simple recovery step
                send(u_proto, "CONFIRM") # Confirm completion
        except OSError: # Timeout
            pass # Do nothing
# Initial hello exchange
u_proto.sendto((ROLE + ":HELLO").encode(), (PEER_IP, PORT_PROTO)) # Send HELLO
print("PROTO:HELLO_SENT") # Print sent
# Main loop: small demo cycle of task → formation → cooperation
while True: # Continuous coordination loop
    now = time.ticks_ms() # Current ms time
    if ROLE == "A": # Leader cycle
        leader_task() # Run coordinated task
        formation_leader() # Run formation broadcast
        cooperate_on_fail() # Test fail → helper takeover
    else: # Helper cycle
        helper_task() # Wait and assist leader task
        formation_follower() # Mirror formation
        cooperate_on_fail() # Respond to fail and confirm
    if time.ticks_diff(time.ticks_ms(), last_heartbeat) > 2000: # Every 2 s
        u_proto.sendto((ROLE + ":HB").encode(), (PEER_IP, PORT_PROTO)) # Send heartbeat
        print("HB:SENT") # Print heartbeat
        last_heartbeat = time.ticks_ms() # Update heartbeat
    time.sleep(0.1) # Small delay to keep loop responsive
External explanation
- What it teaches: You built a tiny team protocol: handshake, reserve a lane, coordinate steps, hold formations, and recover from failure with confirmations.
- Why it works: UDP is fast for short tags; a leader/follower model simplifies timing; a simple reservation rule avoids collisions; labeled prints let you see the teamwork happen.
- Key concept: “Talk → agree → move → avoid → confirm.”
Story time
Two bots roll out like a duo: one asks, the other nods. They glide in formation, wait their turn at the lane, and if something breaks, the partner steps in. Small messages; big teamwork.
Debugging (2)
Debugging 6.5.1 – Collisions between robots
Problem: Both robots move at once and meet in the shared lane.
Clues: “RESERVE_LANE” prints from both; no one yields.
Broken code:
send(u_res, "RESERVE_LANE") # Both robots send without priority
lane_reserved = True # Immediately set reserved (unsafe)
Fixed code:
if ("RESERVE_LANE" in text) and (ROLE == "B"): # Detect conflict
    conflict = True # Mark conflict
# Only set lane_reserved = True when no conflict after waiting window
Why it works: Waiting a short window and enforcing a yield rule prevents simultaneous movement.
Avoid next time: Always add a conflict check before granting reservation.
Debugging 6.5.2 – Lack of coordination in tasks
Problem: Helper moves before the leader; object transport fails.
Clues: No “TASK_GO” received, but the helper moved anyway.
Broken code:
send(u_proto, "TASK_ACK") # Helper ack
do_step("FWD") # Moves immediately without GO
Fixed code:
# Wait for TASK_GO in a timed loop
if go: # Only move when GO arrives
    do_step("FWD") # Perform synchronized step
Why it works: REQUEST → ACK → GO keeps both robots synchronized and reduces mistakes.
Avoid next time: Require GO before movement, and add a timeout so you don’t wait forever.
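The "wait for GO, but not forever" idea can be pulled into one reusable helper. The sketch below is a minimal illustration that runs on a desktop Python, so it takes the receive function and the clock as arguments; `wait_for_tag` and `FakeLink` are hypothetical names. It assumes `recv()` returns a text message or `None` on a socket timeout. On the robot you would wrap `recvfrom` that way and measure time with `time.ticks_ms()` and `time.ticks_diff()` rather than plain subtraction.

```python
def wait_for_tag(recv, now_ms, wanted, timeout_ms):
    """Poll recv() until a message with tag `wanted` arrives or time runs out.

    recv()   -- returns a text message such as "A:TASK_GO", or None if nothing came
    now_ms() -- returns the current time in milliseconds
    """
    start = now_ms()
    while now_ms() - start < timeout_ms:
        text = recv()
        if text is not None and text.split(":")[-1] == wanted:
            return True  # the tag we were waiting for arrived in time
    return False  # timed out without seeing the tag


class FakeLink:
    """Stand-in for a socket and a clock, used only to try the helper."""

    def __init__(self, script):
        self.script = list(script)  # messages to hand out, in order
        self.t = 0                  # fake clock in milliseconds

    def now_ms(self):
        return self.t

    def recv(self):
        self.t += 200  # pretend each poll takes one 200 ms socket timeout
        return self.script.pop(0) if self.script else None


link = FakeLink([None, "A:HB", "A:TASK_GO"])
print(wait_for_tag(link.recv, link.now_ms, "TASK_GO", 2000))

silent = FakeLink([])
print(wait_for_tag(silent.recv, silent.now_ms, "TASK_GO", 2000))
```

With the wait in one place, the helper only moves when this function returns `True`, and a missing GO costs a bounded amount of time.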
Final checklist
- UDP HELLO/PING/PONG works between robots
- REQUEST → ACK → GO steps move in sync
- Lane reservation prevents simultaneous movement in shared space
- Formation (LINE and V) runs with leader broadcast and follower delay
- FAIL triggers takeover and CONFIRM returns cleanly
Extras
- 🧠 Student tip: Write your own tag table on a card (TASK_*, RESERVE_*, FORM_*, COOP_*). Keep it consistent.
- 🧑🏫 Instructor tip: Pair students and let them swap IPs. Require a short “protocol design” before coding.
- 📖 Glossary:
- Reservation: A pre‑move claim that prevents conflicts.
- Formation: Coordinated movement pattern with leader steps.
- Takeover: Helper continues the task when the leader fails.
- 💡 Mini tips:
- Keep tags short and uppercase; parse with simple “in text” checks.
- Use small timeouts (200–400 ms) to keep loops responsive.
- Log heartbeats to prove the link stays alive during long demos.
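One caveat about simple “in text” checks: they also match longer tags that contain the shorter one. The sketch below shows the effect with the “GO2” tag from the Microproject 6.5.2 challenge and compares it with an exact match on the part after the colon; the helper name `tag_of` is hypothetical.

```python
def tag_of(text):
    """Return the tag, meaning the part after the last ':' in a message."""
    return text.split(":")[-1]


message = "A:TASK_GO2"

loose = "TASK_GO" in message          # substring check: also matches TASK_GO2
exact = tag_of(message) == "TASK_GO"  # exact check: only matches TASK_GO

print("loose match:", loose)
print("exact match:", exact)
```

Substring checks are fine while no tag is a prefix of another; once the tag table grows, comparing the whole tag avoids surprises.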