Components and supplies
Portenta H7
Portenta Vision Shield - LoRa®
Portenta Vision Shield - Ethernet
Apps and platforms
OpenMV IDE
Project description
Code
Face Ban (Ethernet)
python
The Ethernet version of the face ban script.
"""
Automatic Face Ban (Ethernet Version)
Name: portenta_faceban_ethernet.py
Purpose:
    This script demonstrates how to automatically censor (ban) faces in
    a live video stream for privacy protection using an Ethernet connection.
    Using a TensorFlow Lite FOMO (Faster Objects, More Objects) model on a
    Portenta H7 + Portenta Vision Shield, faces are detected in real time
    and an overlay is used to obscure them before streaming via MJPEG over HTTP.

    By enlarging the overlay and keeping it on-screen for a short hold time
    (even if the face momentarily disappears), this solution helps ensure
    that no identifiable faces are exposed, which is crucial for compliance
    with privacy regulations in environments like schools, hospitals,
    or public buildings.

Author: Arduino Product Experience Team
Version: 1.0 (27/01/25)

Key Features:
    - MJPEG video streaming over HTTP for real-time viewing.
    - Automatic face detection and overlay (face ban/censorship).
    - Configurable hold time to keep obscuring the face after detection loss.
    - Ideal for IoT-edge deployments where on-device processing is required.
"""

import sensor
import time
import image
import network
import socket
import ml
from ml.utils import NMS
import math

# ------------------------------------------------------------------------------
# User Configuration
# ------------------------------------------------------------------------------
HOST = ""    # Binds to the first available interface
PORT = 8080

# ------------------------------------------------------------------------------
# Detection Parameters
# ------------------------------------------------------------------------------
MIN_CONFIDENCE = 0.4      # Minimum confidence threshold for faces
ENLARGEMENT_FACTOR = 3.5  # Scale factor to enlarge the overlay over the face
HOLD_TIME_MS = 1500       # How long (ms) to keep overlay after losing detection

# ------------------------------------------------------------------------------
# Camera Setup
# ------------------------------------------------------------------------------
sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QVGA)
sensor.set_windowing((240, 240))  # Typical FOMO model input size (240x240)
sensor.skip_frames(time=2000)     # Give the sensor time to stabilize

# ------------------------------------------------------------------------------
# Model and Overlay Loading
# ------------------------------------------------------------------------------
# Load the FOMO face detection model
model = ml.Model("fomo_face_detection")
print("Loaded FOMO Model:", model)

# Load a 1-bit PBM overlay image to obscure faces
face_image = image.Image("/face.pbm", copy_to_fb=False)
print("Loaded Overlay Image: face.pbm")

# Convert the minimum confidence to 0..255 for find_blobs()
threshold_list = [(math.ceil(MIN_CONFIDENCE * 255), 255)]

# ------------------------------------------------------------------------------
# FOMO Post-Processing Function
# ------------------------------------------------------------------------------
def fomo_post_process(model, inputs, outputs):
    """
    Callback function to post-process the FOMO model output.

    - Converts each output channel into a grayscale image (scaled from 0..1 to 0..255).
    - Finds blobs that exceed the threshold.
    - Applies Non-Maximum Suppression (NMS) to refine overlapping bounding boxes.

    Args:
        model: The loaded FOMO (TensorFlow Lite) model.
        inputs: The input image or ROI for detection.
        outputs: The raw outputs from the model inference.

    Returns:
        A list of bounding boxes after NMS in the form:
        [
            [ (x1, y1, x2, y2), score ],  # for each detection of a given class
            ...
        ]
        across all classes.
    """
    n, oh, ow, oc = model.output_shape[0]

    # NMS object to consolidate overlapping boxes
    nms = NMS(ow, oh, inputs[0].roi)

    # Each channel corresponds to a class. For a single face class, oc=1..2, etc.
    for i in range(oc):
        prob_map = image.Image(outputs[0][0, :, :, i] * 255)

        # Find blobs above our threshold
        blobs = prob_map.find_blobs(
            threshold_list,
            x_stride=1,
            area_threshold=1,
            pixels_threshold=1
        )

        # For each blob, compute a score and store in NMS
        for b in blobs:
            x, y, w, h = b.rect()
            score = prob_map.get_statistics(
                thresholds=threshold_list,
                roi=b.rect()
            ).l_mean() / 255.0
            nms.add_bounding_box(x, y, x + w, y + h, score, i)

    return nms.get_bounding_boxes()

# ------------------------------------------------------------------------------
# Ethernet Setup
# ------------------------------------------------------------------------------
print("- Initializing Ethernet LAN interface...")
eth = network.LAN()  # Create a LAN object
eth.active(True)     # Activate it

# By default, we request a DHCP lease
eth.ifconfig('dhcp')

# Wait until connected
while not eth.isconnected():
    print("- Waiting for Ethernet connection...")
    time.sleep_ms(1000)

print("- Ethernet connected! IP information:", eth.ifconfig())

# ------------------------------------------------------------------------------
# Create a TCP Server Socket
# ------------------------------------------------------------------------------
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
server_socket.bind([HOST, PORT])
server_socket.listen(5)
server_socket.setblocking(True)

print(f"Listening on port {PORT}...")

# ------------------------------------------------------------------------------
# Global Variables for Overlay Persistence
# ------------------------------------------------------------------------------
last_detections = []          # Stores the most recent bounding boxes
last_detection_timestamp = 0  # Records the time we last saw a face

# ------------------------------------------------------------------------------
# MJPEG Streaming Function
# ------------------------------------------------------------------------------
def start_streaming(sock):
    """
    Sets up MJPEG streaming over an HTTP connection.

    - Waits for a client to connect.
    - Sends the proper multipart/x-mixed-replace HTTP headers.
    - Continuously captures frames, performs face detection, and sends JPEG frames.

    The function also manages overlay hold time. If no new detection is found,
    but the hold time has not expired, the last detected faces remain visible.

    Args:
        sock: A listening socket that accepts incoming connections.
    """
    global last_detections, last_detection_timestamp

    print("Waiting for an incoming client...")
    client, addr = sock.accept()
    client.settimeout(5.0)
    print(f"Client connected from {addr[0]}:{addr[1]}")

    # Read the initial client request (not used), just to clear the buffer
    client.recv(1024)

    # Send HTTP headers for MJPEG
    client.sendall(
        "HTTP/1.1 200 OK\r\n"
        "Server: OpenMV\r\n"
        "Content-Type: multipart/x-mixed-replace;boundary=openmv\r\n"
        "Cache-Control: no-cache\r\n"
        "Pragma: no-cache\r\n\r\n"
    )

    clock = time.clock()

    while True:
        clock.tick()
        img = sensor.snapshot()

        # Run inference on the image
        detections_by_class = model.predict([img], callback=fomo_post_process)
        current_detections = []

        # Combine detections from relevant classes (skip class 0 if background)
        for class_idx, detection_list in enumerate(detections_by_class):
            if class_idx == 0:
                continue  # skip background if model includes it

            # For each detection, scale and center the overlay
            for (x, y, w, h), score in detection_list:
                new_w = w * ENLARGEMENT_FACTOR
                new_h = h * ENLARGEMENT_FACTOR

                scale_x = new_w / face_image.width()
                scale_y = new_h / face_image.height()

                x_new = x - int((new_w - w) / 2)
                y_new = y - int((new_h - h) / 2)

                current_detections.append((x_new, y_new, scale_x, scale_y))

        # If we found faces this frame, update the global info
        if current_detections:
            last_detections = current_detections
            last_detection_timestamp = time.ticks_ms()
        else:
            # No new detections; check if hold time has expired
            elapsed = time.ticks_ms() - last_detection_timestamp
            if elapsed < HOLD_TIME_MS:
                # Keep using old detections
                current_detections = last_detections
            else:
                # If it's been too long, clear them
                current_detections = []

        # Draw the overlay (either fresh or held)
        for (x_draw, y_draw, s_x, s_y) in current_detections:
            img.draw_image(face_image, x_draw, y_draw,
                           x_scale=s_x, y_scale=s_y)

        # Convert the processed frame to JPEG for streaming
        cframe = img.to_jpeg(quality=35, copy=True)
        header = (
            "\r\n--openmv\r\n"
            "Content-Type: image/jpeg\r\n"
            f"Content-Length:{cframe.size()}\r\n\r\n"
        )

        # Send the multipart image to the client
        client.sendall(header)
        client.sendall(cframe)

        # Print FPS to the console (optional)
        print(f"FPS: {clock.fps()}")

# ------------------------------------------------------------------------------
# Main Loop
# ------------------------------------------------------------------------------
while True:
    try:
        # Start streaming on our server socket
        start_streaming(server_socket)
    except OSError as e:
        # If client disconnects or other socket errors, go back and wait again
        print("Socket error:", e)
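The script above frames each JPEG with a `--openmv` boundary and a `Content-Length` header, which is how MJPEG over `multipart/x-mixed-replace` works. As a minimal sketch of the receiving side, the hypothetical helper below (not part of the original project; it would run on a host PC, not on the Portenta) splits a buffered stream of that shape back into individual JPEG payloads:

```python
# Hypothetical host-side helper: split a buffered multipart/x-mixed-replace
# MJPEG byte stream (as produced by the script above) into JPEG frames,
# using each part's Content-Length header. The function name is illustrative.

def split_mjpeg_stream(data: bytes, boundary: bytes = b"--openmv") -> list:
    """Return the JPEG payloads found in a buffered MJPEG stream."""
    frames = []
    pos = 0
    while True:
        # Locate the next part boundary
        start = data.find(boundary, pos)
        if start == -1:
            break
        # Part headers end with a blank line (CRLF CRLF)
        header_end = data.find(b"\r\n\r\n", start)
        if header_end == -1:
            break
        length = None
        for line in data[start:header_end].decode("ascii", "replace").split("\r\n"):
            if line.lower().startswith("content-length:"):
                length = int(line.split(":", 1)[1].strip())
        if length is None:
            break
        body_start = header_end + 4
        frames.append(data[body_start:body_start + length])
        pos = body_start + length  # skip past this frame's body
    return frames


if __name__ == "__main__":
    # Build a two-frame synthetic stream matching the script's header format
    jpeg_a = b"\xff\xd8frame-A\xff\xd9"
    jpeg_b = b"\xff\xd8frame-B\xff\xd9"
    stream = b""
    for frame in (jpeg_a, jpeg_b):
        stream += (b"\r\n--openmv\r\n"
                   b"Content-Type: image/jpeg\r\n"
                   b"Content-Length:" + str(len(frame)).encode() + b"\r\n\r\n"
                   + frame)
    assert split_mjpeg_stream(stream) == [jpeg_a, jpeg_b]
```

In practice a browser pointed at `http://<board-ip>:8080` does this parsing for you; the helper is only meant to make the wire format explicit.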
Face Ban (Wi-Fi)
python
The Wi-Fi version of the face ban script.
"""
Automatic Face Ban (Wi-Fi Version)
Name: portenta_faceban_wifi.py
Purpose:
    This script demonstrates how to automatically censor (ban) faces in
    a live video stream for privacy protection using a Wi-Fi connection.
    Using a TensorFlow Lite FOMO (Faster Objects, More Objects) model on a
    Portenta H7 + Portenta Vision Shield, faces are detected in real time
    and an overlay is used to obscure them before streaming via MJPEG over HTTP.

    By enlarging the overlay and keeping it on-screen for a short hold time
    (even if the face momentarily disappears), this solution helps ensure
    that no identifiable faces are exposed, which is crucial for compliance
    with privacy regulations in environments like schools, hospitals,
    or public buildings.

Author: Arduino Product Experience Team
Version: 1.0 (27/01/25)

Key Features:
    - MJPEG video streaming over HTTP for real-time viewing.
    - Automatic face detection and overlay (face ban/censorship).
    - Configurable hold time to keep obscuring the face after detection loss.
    - Ideal for IoT-edge deployments where on-device processing is required.
"""

import sensor
import time
import image
import network
import socket
import ml
from ml.utils import NMS
import math

# ------------------------------------------------------------------------------
# User Configuration
# ------------------------------------------------------------------------------
SSID = "YOUR_SSID"  # Wi-Fi SSID
KEY = "YOUR_KEY"    # Wi-Fi Password
HOST = ""           # Use the first available interface
PORT = 8080         # TCP Port for the HTTP server

# ------------------------------------------------------------------------------
# Detection Parameters
# ------------------------------------------------------------------------------
MIN_CONFIDENCE = 0.4      # Minimum confidence threshold for detections
ENLARGEMENT_FACTOR = 3.5  # Scale factor for the overlay relative to the detected face
HOLD_TIME_MS = 1500       # Time (ms) to keep overlay after last detection

# ------------------------------------------------------------------------------
# Camera Setup
# ------------------------------------------------------------------------------
sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QVGA)
sensor.set_windowing((240, 240))  # Common FOMO model input size
sensor.skip_frames(time=2000)     # Wait for the sensor to stabilize

# ------------------------------------------------------------------------------
# Model and Overlay Loading
# ------------------------------------------------------------------------------
# Load the FOMO face detection model
model = ml.Model("fomo_face_detection")
print("- Loaded FOMO model:", model)

# Load the overlay image (1-bit PBM)
face_image = image.Image("/face.pbm", copy_to_fb=False)
print("- Loaded overlay image: face.pbm")

# Prepare threshold based on minimum confidence
threshold_list = [(math.ceil(MIN_CONFIDENCE * 255), 255)]

# ------------------------------------------------------------------------------
# FOMO Post-processing Function
# ------------------------------------------------------------------------------
def fomo_post_process(model, inputs, outputs):
    """
    Callback function to post-process the FOMO model output.

    - Converts each output channel into a grayscale image (scaled from 0..1 to 0..255).
    - Finds blobs that exceed the threshold.
    - Applies Non-Maximum Suppression (NMS) to refine overlapping bounding boxes.

    Args:
        model: The loaded FOMO (TensorFlow Lite) model.
        inputs: The input image or ROI for detection.
        outputs: The raw outputs from the model inference.

    Returns:
        A list of bounding boxes after NMS in the form:
        [
            [ (x1, y1, x2, y2), score ],  # for each detection of a given class
            ...
        ]
        across all classes.
    """
    n, oh, ow, oc = model.output_shape[0]

    # Instantiate an NMS object with the output dims and ROI
    nms = NMS(ow, oh, inputs[0].roi)

    # For each class channel, detect blobs
    for i in range(oc):
        # Scale model output from [0.0..1.0] to [0..255]
        prob_map = image.Image(outputs[0][0, :, :, i] * 255)

        # Locate blobs above the threshold
        blobs = prob_map.find_blobs(
            threshold_list,
            x_stride=1,
            area_threshold=1,
            pixels_threshold=1
        )

        # For each blob, calculate a score and store the bounding box
        for b in blobs:
            x, y, w, h = b.rect()
            score = prob_map.get_statistics(
                thresholds=threshold_list,
                roi=b.rect()
            ).l_mean() / 255.0
            nms.add_bounding_box(x, y, x + w, y + h, score, i)

    return nms.get_bounding_boxes()

# ------------------------------------------------------------------------------
# Wi-Fi Setup
# ------------------------------------------------------------------------------
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, KEY)

while not wlan.isconnected():
    print(f"- Trying to connect to \"{SSID}\"...")
    time.sleep_ms(1000)

print("- Wi-Fi connected! IP information:", wlan.ifconfig())

# ------------------------------------------------------------------------------
# Socket Server Setup
# ------------------------------------------------------------------------------
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
server_socket.bind([HOST, PORT])
server_socket.listen(5)
server_socket.setblocking(True)

# ------------------------------------------------------------------------------
# Global Variables for Holding Detections
# ------------------------------------------------------------------------------
last_detections = []
last_detection_timestamp = 0

# ------------------------------------------------------------------------------
# Streaming Function
# ------------------------------------------------------------------------------
def start_streaming(sock):
    """
    Sets up MJPEG streaming over an HTTP connection.

    - Waits for a client to connect.
    - Sends the proper multipart/x-mixed-replace HTTP headers.
    - Continuously captures frames, performs face detection, and sends JPEG frames.

    The function also manages overlay hold time. If no new detection is found,
    but the hold time has not expired, the last detected faces remain visible.

    Args:
        sock: A listening socket that accepts incoming connections.
    """
    global last_detections, last_detection_timestamp

    print("- Waiting for connections...")
    client, addr = sock.accept()
    client.settimeout(5.0)
    print(f"- Connected to {addr[0]}:{addr[1]}")

    # Read initial client request (not used) to clear the buffer
    client.recv(1024)

    # Send HTTP headers for MJPEG streaming
    client.sendall(
        "HTTP/1.1 200 OK\r\n"
        "Server: OpenMV\r\n"
        "Content-Type: multipart/x-mixed-replace;boundary=openmv\r\n"
        "Cache-Control: no-cache\r\n"
        "Pragma: no-cache\r\n\r\n"
    )

    # Create a clock to measure FPS (optional)
    clock = time.clock()

    while True:
        clock.tick()
        img = sensor.snapshot()

        # Perform face detection via the FOMO model
        detections_by_class = model.predict([img], callback=fomo_post_process)
        current_detections = []

        # Gather detections from classes > 0 (assuming 0 is background)
        for class_idx, detection_list in enumerate(detections_by_class):
            if class_idx == 0:
                continue  # skip background
            for (x, y, w, h), score in detection_list:
                # Calculate enlarged overlay dimensions
                new_w = w * ENLARGEMENT_FACTOR
                new_h = h * ENLARGEMENT_FACTOR

                # Determine scaling factors for face_image
                scale_x = new_w / face_image.width()
                scale_y = new_h / face_image.height()

                # Shift x,y so the overlay remains centered on the face
                x_new = x - int((new_w - w) / 2)
                y_new = y - int((new_h - h) / 2)

                # Store detection info for drawing
                current_detections.append((x_new, y_new, scale_x, scale_y))

        # Update or retain previous detections
        if current_detections:
            last_detections = current_detections
            last_detection_timestamp = time.ticks_ms()
        else:
            # If no new detections, check if we should hold the previous ones
            elapsed = time.ticks_ms() - last_detection_timestamp
            if elapsed < HOLD_TIME_MS:
                current_detections = last_detections
            else:
                current_detections = []

        # Draw overlays (fresh or held) on the image
        for (x_draw, y_draw, s_x, s_y) in current_detections:
            img.draw_image(face_image, x_draw, y_draw,
                           x_scale=s_x,
                           y_scale=s_y)

        # Convert the final frame to JPEG
        cframe = img.to_jpeg(quality=35, copy=True)
        header = (
            "\r\n--openmv\r\n"
            "Content-Type: image/jpeg\r\n"
            f"Content-Length:{cframe.size()}\r\n\r\n"
        )

        # Send the multipart frame
        client.sendall(header)
        client.sendall(cframe)

        # Debug: Print FPS
        print(f"- FPS: {clock.fps()}")

# ------------------------------------------------------------------------------
# Main Loop
# ------------------------------------------------------------------------------
while True:
    try:
        start_streaming(server_socket)
    except OSError as e:
        # If the connection fails or is dropped, try again
        print("- Socket error: ", e)
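Two pieces of logic in both scripts are easy to miss inside the streaming loop: the overlay is enlarged around the detection's center (so a small FOMO box still covers the whole face), and the last detections are held on screen for `HOLD_TIME_MS` after a face disappears. As a minimal host-side sketch (function names are illustrative; the on-device code inlines this logic), the same math can be written as pure functions:

```python
# Hypothetical restatement of the scripts' overlay geometry and hold-time
# logic as pure functions, so the behavior can be checked off-device.

def enlarge_box(x, y, w, h, factor):
    """Grow a detection box by `factor` while keeping it centered,
    mirroring the x_new/y_new math in the scripts above."""
    new_w = w * factor
    new_h = h * factor
    x_new = x - int((new_w - w) / 2)
    y_new = y - int((new_h - h) / 2)
    return x_new, y_new, new_w, new_h


def detections_to_draw(current, last, now_ms, last_seen_ms, hold_ms):
    """Decide what to render this frame: fresh detections if any,
    otherwise the previous ones while the hold window is still open."""
    if current:
        return current
    if now_ms - last_seen_ms < hold_ms:
        return last
    return []


if __name__ == "__main__":
    # A 20x20 box at (100, 100) scaled by 3.5 grows to 70x70 around
    # its center (110, 110), so its origin moves to (75, 75).
    assert enlarge_box(100, 100, 20, 20, 3.5) == (75, 75, 70.0, 70.0)

    held = [(75, 75, 1.0, 1.0)]
    # 1.0 s after the last sighting: still within the 1.5 s hold window
    assert detections_to_draw([], held, 1000, 0, 1500) == held
    # 2.0 s after: the hold window expired, nothing is drawn
    assert detections_to_draw([], held, 2000, 0, 1500) == []
```

Keeping the hold-time decision separate from drawing is what lets the overlay survive brief detection dropouts without ever exposing a face for more than one frame.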
Downloadable files
face.pbm
The 1-bit PBM image used as the face overlay. Copy it to the root of the board's storage so the scripts can load it from `/face.pbm`.
Documentation
Portenta Vision Shield User Manual
Learn how to set up, configure and use the shield.
https://docs.arduino.cc/tutorials/portenta-vision-shield/user-manual