#!/usr/bin/env python3
# Stereo Camera based on the Picamera2 library.
# The Raspberry Pi 5 has two CSI ports for cameras.
# A shutter button needs to be connected to a configurable GPIO pin.
# Preview streams are accessible via http.
# Picamera2 Manual:
# https://datasheets.raspberrypi.com/camera/picamera2-manual.pdf
# Picamera2 Source:
# https://github.com/raspberrypi/picamera2
# Picamera2 Example:
# https://github.com/raspberrypi/picamera2/blob/main/examples/mjpeg_server_2.py
# Raspberry Pi GPIO Pinout:
# https://pinout.xyz/
# RPi.GPIO Python Module
# https://sourceforge.net/p/raspberry-gpio-python/wiki/Home/
# RPi GPIO binding for Lua
# https://tieske.github.io/rpi-gpio/modules/GPIO.html
# To automatically start this script on boot, define a systemd service:
# /etc/systemd/system/picam2_stereo.service
#
# [Unit]
# Description=Picamera2 Stereo Camera
#
# [Service]
# ExecStart=/usr/local/bin/picam2_stereo.py
#
# [Install]
# WantedBy=multi-user.target
__author__ = "Gernot Walzl"
__date__ = "2025-12-30"
import sys
import time
import os
from datetime import datetime
import base64
import signal
import shutil
import tarfile
import io
from threading import Thread, Lock, Condition
from configparser import ConfigParser
import logging
from logging.handlers import TimedRotatingFileHandler
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
import libcamera
from picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder
from picamera2.outputs import FileOutput
from RPi import GPIO
def load_config():
config = ConfigParser()
config.add_section('output')
config.set('output', 'path_rec', os.path.expanduser('~/cam/rec/'))
config.set('output', 'path_log', os.path.expanduser('~/cam/log/'))
config.add_section('gpio')
config.set('gpio', 'shutter_button', '22')
config.set('gpio', 'recording_led', '12')
config.add_section('camera')
config.set('camera', 'width', '4808')
config.set('camera', 'height', '2592')
config.set('camera', 'flip', '0')
config.set('camera', 'preview_downscale', '4')
config.set('camera', 'initial_delay', '5.0')
config.set('camera', 'delay', '0.5')
config.add_section('server')
config.set('server', 'port', '8000')
config.set('server', 'user', 'user')
config.set('server', 'pass', 'pass')
config.read(os.path.expanduser('~/.config/picam2_stereo.ini'))
return config
def init_logging(config):
logger = logging.getLogger()
logger.setLevel(logging.INFO)
loghandler = TimedRotatingFileHandler(
config.get('output', 'path_log') + 'picam2_stereo.log',
'midnight')
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
loghandler.setFormatter(formatter)
logger.addHandler(loghandler)
class StreamingOutput(io.BufferedIOBase):
def __init__(self):
super().__init__()
self.frame = None
self.condition = Condition()
def write(self, buf):
with self.condition:
self.frame = buf
self.condition.notify_all()
class StereoCamera:
def __init__(self, config):
self._logger = logging.getLogger(self.__class__.__name__)
self._path_output = config.get('output', 'path_rec')
self._width = config.getint('camera', 'width')
self._height = config.getint('camera', 'height')
self._flip = config.getint('camera', 'flip')
self._preview_downscale = config.getint('camera', 'preview_downscale')
self._initial_delay = config.getfloat('camera', 'initial_delay')
self._delay = config.getfloat('camera', 'delay')
self._capturing = False
self._lock_recording = Lock()
self._recording_to = None
self._counter_recorded = 0
self._picams = []
self._encoders_serving = []
self._outputs = []
self._locks_serving = []
self._counters_serving = []
for cam_info in Picamera2.global_camera_info():
self._picams.append(Picamera2(cam_info['Num']))
self._encoders_serving.append(None)
self._outputs.append(None)
self._locks_serving.append(Lock())
self._counters_serving.append(0)
def num_cams(self):
return len(self._picams)
def autofocus(self):
success = True
af_jobs = []
for picam2 in self._picams:
af_jobs.append(picam2.autofocus_cycle(wait=False))
for idx, picam2 in enumerate(self._picams):
success &= picam2.wait(af_jobs[idx])
self._logger.info("Autofocus success=%s", success)
return success
def record_files(self, filename="img_{:01d}.jpg"):
cf_jobs = []
for idx, picam2 in enumerate(self._picams):
cf_jobs.append(picam2.capture_file(
filename.format(idx), wait=False))
for idx, picam2 in enumerate(self._picams):
picam2.wait(cf_jobs[idx])
def start_capturing(self):
for idx, picam2 in enumerate(self._picams):
flip = (self._flip + idx) % 2
capture_config = picam2.create_still_configuration(
main={
'size': (self._width, self._height)
},
lores={
'size': (
int(self._width/self._preview_downscale),
int(self._height/self._preview_downscale)
)
},
transform=libcamera.Transform(hflip=flip, vflip=flip)
)
picam2.configure(capture_config)
self._logger.info("Start capturing")
self._capturing = True
for picam2 in self._picams:
picam2.start()
self.autofocus()
while self._capturing:
time_started = time.time()
if self._recording_to:
path_file = (
self._path_output + self._recording_to + "/img_" +
datetime.now().strftime("%Y-%m-%d_%H%M%S") +
"_{:03d}".format(self._counter_recorded) +
"_{:01d}.jpg")
self.record_files(path_file)
self._counter_recorded += 1
sleep_secs = self._delay - (time.time() - time_started)
if sleep_secs > 0.0:
time.sleep(sleep_secs)
if self._recording_to:
self.stop_recording()
for picam2 in self._picams:
picam2.stop()
self._logger.info("Capturing stopped")
def stop_capturing(self):
self._capturing = False
def is_recording(self):
return bool(self._recording_to)
def start_recording(self):
success = False
with self._lock_recording:
if not self._recording_to:
dir_recording = datetime.now().strftime("%Y-%m-%d_%H%M%S")
self._logger.info("Start recording to %s", dir_recording)
if not os.path.exists(self._path_output + dir_recording):
os.makedirs(self._path_output + dir_recording)
if os.path.isdir(self._path_output + dir_recording):
time.sleep(self._initial_delay)
self.autofocus()
self._counter_recorded = 0
self._recording_to = dir_recording
success = True
return success
def stop_recording(self):
success = False
with self._lock_recording:
if self._recording_to:
dir_recorded = self._recording_to
self._recording_to = None
self._logger.info(
"Recording stopped (recorded=%d)",
self._counter_recorded)
self._logger.info("Creating %s.tar", dir_recorded)
path_recorded = self._path_output + dir_recorded
with tarfile.open(path_recorded + ".tar", "w") as tar:
tar.add(path_recorded, arcname=dir_recorded)
shutil.rmtree(path_recorded)
success = True
return success
def start_serving(self, idx):
self._logger.info("Start serving (idx=%d)", idx)
with self._locks_serving[idx]:
if self._counters_serving[idx] == 0:
self._logger.info("Start MJPEG encoder (idx=%d)", idx)
self._encoders_serving[idx] = MJPEGEncoder()
self._outputs[idx] = StreamingOutput()
self._picams[idx].start_encoder(
self._encoders_serving[idx],
FileOutput(self._outputs[idx]),
name="lores")
self._counters_serving[idx] += 1
return self._outputs[idx]
def stop_serving(self, idx):
with self._locks_serving[idx]:
self._counters_serving[idx] -= 1
if self._counters_serving[idx] == 0:
self._picams[idx].stop_encoder(self._encoders_serving[idx])
self._encoders_serving[idx] = None
self._outputs[idx] = None
self._logger.info("MJPEG encoder (idx=%d) stopped", idx)
self._logger.info("Serving (idx=%d) stopped", idx)
class GPIOController:
def __init__(self, config, stereocam):
self._logger = logging.getLogger(self.__class__.__name__)
self._shutter_button = config.getint('gpio', 'shutter_button')
self._recording_led = config.getint('gpio', 'recording_led')
self._stereocam = stereocam
def __del__(self):
GPIO.cleanup()
def _set_recording_led(self, value):
GPIO.output(self._recording_led, bool(value))
def _on_shutter_button_pressed(self, channel):
self._logger.info("Shutter button pressed")
pwm = GPIO.PWM(self._recording_led, 1.0)
pwm.start(50.0)
if self._stereocam.is_recording():
self._stereocam.stop_recording()
else:
self._stereocam.start_recording()
pwm.stop()
self._set_recording_led(self._stereocam.is_recording())
def setup_gpio(self):
GPIO.setmode(GPIO.BCM)
GPIO.setup(self._shutter_button, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(self._recording_led, GPIO.OUT)
GPIO.add_event_detect(
self._shutter_button, GPIO.RISING,
callback=self._on_shutter_button_pressed)
class CameraHTTPRequestHandler(BaseHTTPRequestHandler):
def logger(self):
return logging.getLogger('HTTPRequestHandler')
def check_auth(self):
result = False
if (self.server.auth is None or
self.server.auth == self.headers.get('authorization')):
result = True
else:
self.send_response(401)
self.send_header('WWW-Authenticate', 'Basic')
self.end_headers()
return result
def send_index(self):
html = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>Stereo Camera</title>
</head>
<body>
"""
for idx in range(self.server.stereocam.num_cams()):
if idx > 0:
html += '<br />\n'
html += '<img src="/cam{:d}.mjpg" alt="cam{:d}" />'.format(idx, idx)
html += """
</body>
"""
content = html.encode('utf-8')
self.send_header('Content-Type', 'text/html')
self.send_header('Content-Length', len(content))
self.end_headers()
self.wfile.write(content)
def send_jpeg(self, output):
with output.condition:
output.condition.wait()
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', len(output.frame))
self.end_headers()
self.wfile.write(output.frame)
def is_valid_idx(self, chr_idx):
success = False
try:
idx = int(chr_idx)
if 0 <= idx and idx < self.server.stereocam.num_cams():
success = True
except ValueError:
success = False
return success
def do_GET(self):
if self.path == '/index.html':
if self.check_auth():
self.send_response(200)
self.send_index()
elif self.path[:4] == "/cam" and \
self.is_valid_idx(self.path[4]) and \
self.path[5:] == ".jpg":
if self.check_auth():
idx = int(self.path[4])
output = self.server.stereocam.start_serving(idx)
try:
self.send_response(200)
self.send_jpeg(output)
except Exception as err:
self.logger().warning(
"Exception while serving client %s: %s",
self.client_address, str(err))
finally:
self.server.stereocam.stop_serving(idx)
output = None
elif self.path[:4] == "/cam" and \
self.is_valid_idx(self.path[4]) and \
self.path[5:] == ".mjpg":
if self.check_auth():
idx = int(self.path[4])
output = self.server.stereocam.start_serving(idx)
try:
self.send_response(200)
self.send_header(
'Content-Type',
'multipart/x-mixed-replace; boundary=FRAME')
self.end_headers()
while not self.wfile.closed:
self.wfile.write(b"--FRAME\r\n")
self.send_jpeg(output)
self.wfile.write(b"\r\n")
self.wfile.flush()
except Exception as err:
self.logger().warning(
"Exception while serving client %s: %s",
self.client_address, str(err))
finally:
self.server.stereocam.stop_serving(idx)
output = None
else:
self.send_error(404)
def log_error(self, format, *args):
self.logger().error("%s - %s" % (self.address_string(), format % args))
def log_message(self, format, *args):
self.logger().info("%s - %s" % (self.address_string(), format % args))
class HTTPServerThread(Thread):
def __init__(self, config, stereocam):
super().__init__()
self.logger = logging.getLogger(self.__class__.__name__)
self.server = ThreadingHTTPServer(
('', config.getint('server', 'port')),
CameraHTTPRequestHandler)
self.server.stereocam = stereocam
self.server.auth = None
if config.get('server', 'user') and config.get('server', 'pass'):
str_auth = config.get('server', 'user') + \
':' + config.get('server', 'pass')
self.server.auth = 'Basic ' + \
base64.b64encode(str_auth.encode()).decode()
def run(self):
self.logger.info(
"Starting HTTP server on port %s", self.server.server_port)
self.server.serve_forever()
def stop_serving(self):
self.logger.info("Stopping HTTP server")
self.server.shutdown()
class SignalHandler:
def __init__(self, stereocam):
self._logger = logging.getLogger(self.__class__.__name__)
self._stereocam = stereocam
def _handle_signal(self, signum, frame):
self._logger.info("Handling signal %s", signum)
self._stereocam.stop_capturing()
def register_signal_handlers(self):
signal.signal(signal.SIGINT, self._handle_signal) # Ctrl-C
signal.signal(signal.SIGTERM, self._handle_signal) # kill
def main():
config = load_config()
if not os.path.exists(config.get('output', 'path_rec')):
os.makedirs(config.get('output', 'path_rec'))
if not os.path.exists(config.get('output', 'path_log')):
os.makedirs(config.get('output', 'path_log'))
init_logging(config)
logging.info("Logging initialized")
stereocam = StereoCamera(config)
httpthread = None
if config.getint('server', 'port') > 0:
httpthread = HTTPServerThread(config, stereocam)
httpthread.start()
gpiocontroller = GPIOController(config, stereocam)
gpiocontroller.setup_gpio()
sighandler = SignalHandler(stereocam)
sighandler.register_signal_handlers()
stereocam.start_capturing()
if httpthread:
httpthread.stop_serving()
if __name__ == '__main__':
sys.exit(main())