Code Examples
Hello from plugin
"""Plugin to log a message when entering the 'wait' state."""
import pibooth
from pibooth.utils import LOGGER
__version__ = "1.0.0"
@pibooth.hookimpl
def state_wait_enter():
    """Log an informational greeting that names this plugin module."""
    plugin_name = __name__
    LOGGER.info("Hello from '%s' plugin", plugin_name)
Upload to FTP
"""Plugin to upload pictures to an FTP server."""
import os
from ftplib import FTP
import pibooth
__version__ = "0.0.2"
@pibooth.hookimpl
def pibooth_startup(app):
    """Create an FTP client on ``app.ftp``, then connect and log in.

    The client is attached to ``app`` before any network call is made, so
    the attribute exists even if connecting or logging in raises.
    """
    app.ftp = FTP()
    client = app.ftp
    client.set_debuglevel(0)  # 0 = no debugging output from ftplib
    client.connect("ftp.pibooth.org", 21)
    client.login("pibooth", "1h!gR4/opK")
@pibooth.hookimpl
def state_processing_exit(app):
    """Upload the file at ``app.previous_picture_file`` through ``app.ftp``.

    The remote name is the base name of the local path; the file is sent
    in binary mode using 1024-byte blocks.
    """
    local_path = app.previous_picture_file
    remote_name = os.path.basename(local_path)
    with open(local_path, 'rb') as stream:
        app.ftp.storbinary(f'STOR {remote_name}', stream, 1024)
@pibooth.hookimpl
def pibooth_cleanup(app):
    """Close the FTP connection held on ``app.ftp``."""
    connection = app.ftp
    connection.close()
Control a RGB LED
"""Plugin to manage the RGB lights via GPIO."""
import pibooth
from gpiozero import RGBLED
from colorzero import Color
__version__ = "1.1.0"
@pibooth.hookimpl
def pibooth_startup(app):
    """Create the RGB LED driver and store it on ``app.rgbled``."""
    # gpiozero is configured with BCM numbering; a "BOARD<pin>" string
    # converts from the BOARD pin number instead.
    red_pin, green_pin, blue_pin = "BOARD36", "BOARD38", "BOARD40"
    app.rgbled = RGBLED(red_pin, green_pin, blue_pin)
@pibooth.hookimpl
def state_wait_enter(app):
    """Set the LED colour to green."""
    green = Color('green')
    app.rgbled.color = green
@pibooth.hookimpl
def state_choose_enter(app):
    """Start blinking the LED with its default blink settings."""
    led = app.rgbled
    led.blink()
@pibooth.hookimpl
def state_preview_enter(app):
    """Set the LED colour to white, then start blinking it."""
    led = app.rgbled
    led.color = Color('white')
    led.blink()
@pibooth.hookimpl
def state_capture_exit(app):
    """Set the LED colour to red."""
    red = Color('red')
    app.rgbled.color = red
Add ‘Get Ready’ text before captures sequence
pibooth_getready_text.py
Setup a custom camera
"""Plugin to handle retry in case of exception with DSLR/gPhoto2 camera."""
import pibooth
from pibooth.utils import LOGGER
from pibooth import camera
__version__ = "4.0.3"
class GpCameraRetry(camera.GpCamera):
    """gPhoto2 camera whose capture is attempted again when it raises."""

    def get_capture_image(self, effect=None):
        """Capture a new picture, retrying when the parent capture raises.

        ``effect`` is forwarded unchanged to the parent implementation and
        the parent's return value is returned as-is on the first success.

        Raises ``EnvironmentError`` once every attempt has failed. The last
        underlying exception is attached as ``__cause__`` so the real
        failure reason stays visible in the traceback.
        """
        max_retry = 2
        last_error = None
        for _ in range(max_retry):
            try:
                return super().get_capture_image(effect)
            except Exception as ex:
                # Deliberately broad: any capture failure triggers a retry.
                last_error = ex
                LOGGER.warning("Gphoto2 fails to capture, trying again...")
        raise EnvironmentError(f"Gphoto2 fails to capture {max_retry} times") from last_error
class HybridRpiCameraRetry(camera.HybridRpiCamera):
    """Hybrid camera whose gPhoto2 side is replaced by a ``GpCameraRetry``."""

    def __init__(self, rpi_camera_proxy, gp_camera_proxy):
        """Initialise the parent, then install the retrying gPhoto2 camera."""
        super().__init__(rpi_camera_proxy, gp_camera_proxy)
        retrying_cam = GpCameraRetry(gp_camera_proxy)
        # Both cameras must share the very same captures dict.
        retrying_cam._captures = self._captures
        self._gp_cam = retrying_cam
@pibooth.hookimpl
def pibooth_setup_camera():
    """Pick the camera implementation based on which proxies are available.

    Returns a ``HybridRpiCameraRetry`` when both proxies are truthy, a
    ``GpCameraRetry`` when only the gPhoto2 proxy is, a plain
    ``camera.RpiCamera`` when only the Picamera proxy is, and ``None``
    when neither is.
    """
    picam_proxy = camera.get_rpi_camera_proxy()
    gphoto_proxy = camera.get_gp_camera_proxy()

    if picam_proxy and gphoto_proxy:
        LOGGER.info("Configuring hybrid camera with retry (Picamera + gPhoto2) ...")
        return HybridRpiCameraRetry(picam_proxy, gphoto_proxy)

    if gphoto_proxy:
        LOGGER.info("Configuring gPhoto2 camera with retry ...")
        return GpCameraRetry(gphoto_proxy)

    if picam_proxy:
        LOGGER.info("Configuring Picamera camera ...")
        return camera.RpiCamera(picam_proxy)

    return None
Setup a custom printer
"""Pibooth plugin which customizes the printer to print several pictures per page."""
import tempfile
import os.path as osp
from PIL import Image
import pibooth
from pibooth.printer import Printer
from pibooth.pictures import get_picture_factory
__version__ = "1.0.0"
class CustomPrinter(Printer):
    """Printer able to place several copies of a picture on one page."""

    def __init__(self, name='default', max_pages=-1, options=None, pictures_per_page=1):
        """Forward the standard arguments to ``Printer`` and store the copy count."""
        super().__init__(name, max_pages, options)
        # Number of copies of the picture to lay out on each printed page.
        self.pictures_per_page = pictures_per_page

    def print_file(self, filename):
        """Print ``filename``, duplicated ``pictures_per_page`` times per page.

        With one picture per page the file is handed to the parent
        ``print_file`` untouched. Otherwise a temporary picture is built
        from repeated copies of the source image and printed instead.
        """
        if self.pictures_per_page > 1:
            with tempfile.NamedTemporaryFile(suffix=osp.basename(filename)) as fp:
                # Open the source image in a context manager so its file
                # handle is released as soon as the page has been saved.
                with Image.open(filename) as picture:
                    factory = get_picture_factory((picture,) * self.pictures_per_page)
                    factory.set_margin(2)
                    factory.save(fp.name)
                # Print while the temporary file still exists; it is
                # removed when the enclosing ``with`` block exits.
                super().print_file(fp.name)
        else:
            super().print_file(filename)
@pibooth.hookimpl
def pibooth_configure(cfg):
    """Register the ``pictures_per_page`` option in the PRINTER section.

    The default is ``1`` and the offered choices are the strings "1" to "4".
    """
    allowed_values = list(map(str, range(1, 5)))
    cfg.add_option(
        'PRINTER',
        'pictures_per_page',
        1,
        "Print 1, 2, 3 or 4 picture copies per page",
        "Number of copies per page",
        allowed_values,
    )
@pibooth.hookimpl
def pibooth_setup_printer(cfg):
    """Build a ``CustomPrinter`` from the PRINTER configuration section."""
    section = 'PRINTER'
    printer_name = cfg.get(section, 'printer_name')
    page_limit = cfg.getint(section, 'max_pages')
    extra_options = cfg.gettyped(section, 'printer_options')
    copies_per_page = cfg.getint(section, 'pictures_per_page')
    return CustomPrinter(printer_name, page_limit, extra_options, copies_per_page)
Setup a custom picture factory
"""Pibooth plugin which returns the first raw capture as the final picture."""
import pibooth
from pibooth.pictures.factory import PictureFactory
__version__ = "1.0.0"
class IdlePictureFactory(PictureFactory):
    """Picture factory that does no composition of its own."""

    def build(self, rebuild=False):
        """Return the first stored image as-is; ``rebuild`` is ignored."""
        first_capture = self._images[0]
        return first_capture
@pibooth.hookimpl
def pibooth_setup_picture_factory(factory):
    """Return an ``IdlePictureFactory`` with the given factory's size and images."""
    width = factory.width
    height = factory.height
    captures = factory._images
    return IdlePictureFactory(width, height, *captures)
Setup a custom state
"""Pibooth plugin to add a new state which displays a 'Get Ready' text after each capture."""
import pibooth
from pibooth.utils import PollingTimer
from pibooth.view.pygame.sprites import BasePygameScene, TextSprite
__version__ = "0.0.2"
class MyScene(BasePygameScene):
    """Visual scene holding a single "Get Ready!" text sprite."""

    def __init__(self):
        """Initialise the base scene and create the text sprite."""
        super().__init__()
        self.text = TextSprite(self, "Get Ready!")

    def resize(self, size):
        """Re-fit the text whenever the window is resized.

        The text rectangle is the scene rectangle inflated by (-100, -100).
        ``size`` itself is not used.
        """
        text_area = self.rect.inflate(-100, -100)
        self.text.set_rect(*text_area)
@pibooth.hookimpl
def pibooth_setup_states(machine):
    """Register the 'ready' state on the machine with a new ``MyScene``."""
    ready_scene = MyScene()
    machine.add_state('ready', ready_scene)
@pibooth.hookimpl(hookwrapper=True)
def state_capture_validate(win):
    """Redirect the post-capture transition from 'preview' to 'ready'.

    Runs as a hook wrapper: the other implementations execute at the
    ``yield``, and their result is overridden only when it is 'preview'
    and the window type is 'pygame'.
    """
    # Let the other implementations decide the next state first.
    outcome = yield
    proposed_state = outcome.get_result()

    # Any transition other than 'preview' is left untouched.
    if proposed_state != 'preview':
        return
    if win.type != 'pygame':
        return

    outcome.force_result('ready')
@pibooth.hookimpl(optionalhook=True)
def state_ready_enter(app):
    """Store a fresh ``PollingTimer(5)`` on ``app.readystate_timer``.

    The text stays displayed for as long as this timer runs.
    """
    # NOTE(review): the 5 is presumably a duration in seconds; confirm
    # against PollingTimer.
    countdown = PollingTimer(5)
    app.readystate_timer = countdown
@pibooth.hookimpl(optionalhook=True)
def state_ready_validate(app):
    """Return 'preview' once the ready timer has timed out, else ``None``."""
    timed_out = app.readystate_timer.is_timeout()
    return 'preview' if timed_out else None