Managing Persons in Photo Collections – The Application

In the first post we showed that the basics of person encoding and recognition are straightforward to implement. In the next post, we added full-body person recognition alongside a stronger face encoder, giving us the tools to build and maintain a knowledge base of named, identifiable people in our photo collections. We also promised to add useful features to get the most out of working with such a knowledge base. What is still missing is a real application with a graphical user interface (GUI) to tie everything together and, since this is photo management, some basic viewing and logging capabilities!

PyQt Application

Because we work in Python, PyQt is a natural fit: it is a Python binding for the cross-platform GUI toolkit Qt. The current version, PyQt6, supports Qt 6 and runs on Windows, macOS and Linux. It is easily installed with: pip install PyQt6 pyqt6-tools.

There are many good PyQt tutorials online; you might, for instance, take a look at Real Python.

Creating the GUI App

To keep things modular and presentable, we split the code into small, focused modules: loosely coupled, independently reusable components with clear responsibilities.

The application framework:

  • main.py, the entry point of the application, which sets warning filters and boots the app;
  • config.py, which manages settings.json and shared constants;
  • ui.py, the actual PyQt6 GUI (main window, settings dialog, logging bridge).

The encoder & recognition toolkit:

  • reid_wrapper.py, a trimmed-down version of our full-body encoder, and
  • face_encoder.py, our already demonstrated face encoder.

The application functions building on the toolkit:

  • kb.py, the knowledgebase data model and the worker method for batch encoding;
  • recognize.py, holds the worker method for batch recognition and searching.

We’ll extend this in future posts.

An Import Strategy

An effective import strategy is crucial for building a modular Python application. By avoiding noisy output and circular imports, a disciplined import strategy improves maintainability and keeps the console clean. Our setup supports this by ensuring that:

  • main.py is the only file that installs early warning filters; it imports ui only after the filters are set, so noisy legacy TorchReID warnings are silenced.
  • ui.py imports only the workers (KBUpsertWorker from kb, FolderWorker from recognize); TorchreidBodyExtractor is imported lazily inside the worker functions.
  • kb.py and recognize.py are GUI-free; they receive simple callables (log_fn, progress_fn) and import TorchreidBodyExtractor inside the worker or function right before use (lazy import).
  • reid_wrapper.py contains all TorchReID quirks (dual layout + quiet import). Nothing else imports torchreid directly.
  • config.py centralizes the AppConfig dataclass, defaults (valid_exts, encodings_filename, thresholds), and paths. Workers receive values, not the config object — to avoid hidden cross-module coupling.
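
The pattern behind these rules can be tried in isolation. The following is a minimal sketch, not the actual kb.py or recognize.py code: encode_folder is a hypothetical function, and hashlib merely stands in for a heavy dependency such as TorchreidBodyExtractor. It shows a GUI-free function that reports through plain callables and defers its import until the work starts.

```python
from __future__ import annotations
from typing import Callable, Iterable


def encode_folder(paths: Iterable[str],
                  log_fn: Callable[[str], None],
                  progress_fn: Callable[[int], None]) -> int:
    """Hypothetical GUI-free worker function: it only knows two callables."""
    # Lazy import: the dependency is loaded when the job starts, not when the
    # module is imported. hashlib is a stand-in (assumption) for something
    # heavy like `from reid_wrapper import TorchreidBodyExtractor`.
    import hashlib

    items = list(paths)
    for i, p in enumerate(items, start=1):
        digest = hashlib.sha1(p.encode("utf-8")).hexdigest()[:8]  # fake "encoding"
        log_fn(f"encoded {p} -> {digest}")
        progress_fn(int(i * 100 / len(items)))
    return len(items)


# The caller decides where messages and progress go (a list here, Qt signals in the app).
messages: list[str] = []
percents: list[int] = []
done = encode_folder(["a.jpg", "b.jpg", "c.jpg", "d.jpg"], messages.append, percents.append)
print(done, percents)
```

Because the function receives values and callables rather than the GUI or the config object, it can be exercised from a test or a command-line script without starting Qt.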

Below we take a closer look at the individual sources.

Main

A minimal main.py that filters noisy warnings and starts the GUI:

# main.py
from __future__ import annotations
# --- warning filters BEFORE any imports that could trigger torchreid/face_recognition noise
import warnings
warnings.filterwarnings("ignore", category=UserWarning,
    message=r"pkg_resources is deprecated as an API.*", module=r"face_recognition_models(\.|$)")
warnings.filterwarnings("ignore", category=DeprecationWarning,
    message=r"pkg_resources is deprecated as an API.*")
warnings.filterwarnings("ignore", category=UserWarning,
    message=r"Cython evaluation \(very fast so highly recommended\) is unavailable, now use python evaluation\.",
    module=r"torchreid\..*")

from PyQt6 import QtWidgets
from ui import MainWindow

def main():
    app = QtWidgets.QApplication([])
    app.setApplicationName("People Recognition GUI")
    app.setOrganizationName("YourOrg")
    win = MainWindow()
    win.show()
    return app.exec()

if __name__ == "__main__":
    raise SystemExit(main())

This filters out the clutter and launches the application by showing the main window.
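
To see how such a filter behaves, here is a small self-contained sketch using only the standard library. The warning texts are made up for the demonstration; the point is that warnings.filterwarnings matches on the category and on a regular expression anchored at the start of the message.

```python
import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # start from "show everything"
    # Same kind of filter as in main.py: category plus message regex.
    warnings.filterwarnings(
        "ignore",
        category=DeprecationWarning,
        message=r"pkg_resources is deprecated as an API.*",
    )
    # Demo warnings (invented text): the first matches the filter, the second does not.
    warnings.warn("pkg_resources is deprecated as an API (demo text)", DeprecationWarning)
    warnings.warn("some other deprecation", DeprecationWarning)

remaining = [str(w.message) for w in caught]
print(remaining)
```

Only the second warning is recorded. Filters added later are consulted first, which is why the specific "ignore" rule wins over the general "always" rule.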

Config

config.py manages settings.json and shared defaults:

#config.py
from __future__ import annotations
from   dataclasses import dataclass, asdict, field
from   pathlib import Path
import json

ROOT = Path(__file__).resolve().parent
SETTINGS_FILE = ROOT / "settings.json"   # or Path.cwd()/..., your pick

@dataclass
class AppConfig:
    dataset_dir: str = ""
    processed_dir: str = ""
    process_dir: str = ""
    face_tol: float = 0.45
    body_tol: float = 0.80
    reid_model: str = "osnet_ain_x1_0"
    valid_exts: list[str] = field(default_factory=lambda: [".jpg", ".jpeg", ".png", ".webp"])
    encodings_filename: str = "encodings.pkl"
    resize_max: int = 800
    lap_var_thresh: float = 80
    kb_batch_size: int = 16

    @classmethod
    def load(cls) -> "AppConfig":
        if SETTINGS_FILE.exists():
            return cls(**json.loads(SETTINGS_FILE.read_text(encoding="utf-8")))
        return cls()
    def save(self) -> None:
        SETTINGS_FILE.write_text(json.dumps(asdict(self), indent=2), encoding="utf-8")

This code manages the contents of our configuration file settings.json and the shared constants.
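
One thing to be aware of: building a dataclass with cls(**data) raises a TypeError when the JSON contains a key that is not a field, for example after an option has been renamed. A possible, more tolerant variant is sketched below. DemoConfig and load_tolerant are hypothetical names used only for illustration, not part of config.py.

```python
from __future__ import annotations
import json
from dataclasses import dataclass, fields


@dataclass
class DemoConfig:
    """Hypothetical, shortened stand-in for AppConfig."""
    dataset_dir: str = ""
    face_tol: float = 0.45
    body_tol: float = 0.80


def load_tolerant(text: str) -> DemoConfig:
    """Build the config from JSON text, ignoring keys the dataclass does not know."""
    raw = json.loads(text)
    known = {f.name for f in fields(DemoConfig)}
    return DemoConfig(**{k: v for k, v in raw.items() if k in known})


# "old_option" is silently dropped; missing keys fall back to their defaults.
cfg = load_tolerant('{"face_tol": 0.5, "old_option": true}')
print(cfg)
```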

For face recognition, the following threshold values could be used:

Threshold       Behavior
0.6             Standard, balanced
0.5             Strict
0.4             Very strict (almost identical faces only)
0.35 or lower   Extremely strict (only near-perfect matches)
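
What the threshold does can be shown with a few lines of plain Python. The face_recognition library compares encodings by Euclidean distance and accepts a match when the distance is at most the tolerance. The sketch below uses toy 3-dimensional vectors instead of real 128-dimensional encodings, and is_match is a hypothetical helper.

```python
import math


def is_match(known: list[float], candidate: list[float], tolerance: float) -> bool:
    """True when the Euclidean distance between two encodings is within the tolerance."""
    return math.dist(known, candidate) <= tolerance


# Toy vectors (assumption): they differ only in the last component, by about 0.5.
known = [0.10, 0.20, 0.30]
candidate = [0.10, 0.20, 0.80]

loose = is_match(known, candidate, 0.6)    # accepted with the standard tolerance
strict = is_match(known, candidate, 0.45)  # rejected with the default used in config.py
print(loose, strict)
```

A lower tolerance therefore trades missed matches for fewer false identifications.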

UI

ui.py builds the main window, status bar with progress, central log console, and menus (Settings, Manage Persons, Knowledge Base, Recognition, Help). The Settings dialog lets you browse folders and adjust thresholds and the ReID model. Worker threads (KBUpsertWorker, FolderWorker) handle long-running tasks; they communicate via thread-safe Qt signals to update progress and logs.

# ui.py
from   config import AppConfig
from   kb import KBUpsertWorker
from   recognize import FolderWorker
# Python utilities
from   dataclasses import replace 
from   datetime import datetime
from   pathlib import Path
# enable Python's built-in logging
import logging
# PyQt GUI for application
from   PyQt6 import QtCore, QtGui, QtWidgets
from   PyQt6.QtCore import Qt, pyqtSignal

# ---- Logging bridge: route Python logging -> GUI ----
class GuiLogEmitter(QtCore.QObject):
    message = pyqtSignal(str)
# Send logging records to the GUI via a Qt signal (thread-safe)
class GuiLogHandler(logging.Handler):
    def __init__(self, emitter: GuiLogEmitter):
        super().__init__()
        self.emitter = emitter
    def emit(self, record: logging.LogRecord) -> None:
        try:
            msg = self.format(record)
        except Exception:
            msg = record.getMessage()
        self.emitter.message.emit(msg)

class SettingsDialog(QtWidgets.QDialog):
    def __init__(self, cfg: AppConfig, parent=None):
        super().__init__(parent)
        self.setWindowTitle("Preferences")
        self.setModal(True)
        self.cfg = cfg
        # --- fields
        self.ed_dataset   = QtWidgets.QLineEdit(cfg.dataset_dir)
        self.ed_processed = QtWidgets.QLineEdit(cfg.processed_dir)
        self.ed_process   = QtWidgets.QLineEdit(cfg.process_dir)
        self.sp_face = QtWidgets.QDoubleSpinBox()
        self.sp_face.setRange(0.0, 2.0); self.sp_face.setSingleStep(0.01); self.sp_face.setValue(cfg.face_tol)
        self.sp_body = QtWidgets.QDoubleSpinBox()
        self.sp_body.setRange(0.0, 1.0); self.sp_body.setSingleStep(0.01); self.sp_body.setValue(cfg.body_tol)
        self.cb_model = QtWidgets.QComboBox()
        self.cb_model.addItems(["osnet_ain_x1_0", "osnet_x1_0"])
        i = self.cb_model.findText(cfg.reid_model)
        if i >= 0:
            self.cb_model.setCurrentIndex(i)
        # --- browse helpers
        def browse(line: QtWidgets.QLineEdit):
            d = QtWidgets.QFileDialog.getExistingDirectory(self, "Choose folder", line.text() or str(Path.cwd()))
            if d: line.setText(d)
        btn_browse_ds  = QtWidgets.QPushButton("Browse…"); btn_browse_ds.clicked.connect(lambda: browse(self.ed_dataset))
        btn_browse_out = QtWidgets.QPushButton("Browse…"); btn_browse_out.clicked.connect(lambda: browse(self.ed_processed))
        btn_browse_in  = QtWidgets.QPushButton("Browse…"); btn_browse_in.clicked.connect(lambda: browse(self.ed_process))
        
        # --- form
        form = QtWidgets.QFormLayout()
        form.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
        form.setContentsMargins(12, 12, 12, 0)
        row_ds  = QtWidgets.QHBoxLayout(); row_ds.addWidget(self.ed_dataset);  row_ds.addWidget(btn_browse_ds)
        row_out = QtWidgets.QHBoxLayout(); row_out.addWidget(self.ed_processed); row_out.addWidget(btn_browse_out)
        row_in  = QtWidgets.QHBoxLayout(); row_in.addWidget(self.ed_process); row_in.addWidget(btn_browse_in)        
        form.addRow("Persons dataset:",  row_ds)
        form.addRow("Processed output:", row_out)
        form.addRow("Process input:", row_in)
        form.addRow("Face tolerance (≤):", self.sp_face)
        form.addRow("Body similarity (≥):", self.sp_body)
        form.addRow("ReID model:", self.cb_model)
        # --- button box (Qt6 enum)
        bb = QtWidgets.QDialogButtonBox(
            QtWidgets.QDialogButtonBox.StandardButton.Ok
            | QtWidgets.QDialogButtonBox.StandardButton.Cancel
        )
        bb.accepted.connect(self.accept)
        bb.rejected.connect(self.reject)
        # --- outer layout
        main = QtWidgets.QVBoxLayout(self)
        main.addLayout(form)
        main.addStretch(1)                              # keep buttons at the bottom
        main.addWidget(bb, alignment=Qt.AlignmentFlag.AlignRight)

    def values(self) -> AppConfig:
        return replace(
            self.cfg,
            dataset_dir=self.ed_dataset.text().strip(),
            processed_dir=self.ed_processed.text().strip(),
            process_dir=self.ed_process.text().strip(),
            face_tol=float(self.sp_face.value()),
            body_tol=float(self.sp_body.value()),
            reid_model=self.cb_model.currentText().strip(),
        )
# ---- Main Window of the Application ----------------------------------------------------------------
class MainWindow(QtWidgets.QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("Managing Persons in Photo Collections")
        self.resize(1100, 700)
        self._make_central_console()
        self._make_statusbar()
        self._make_menubar()
        self._wire_logging()
        self.log("Application started. Ready.")
        # wire settings into the app
        self.cfg = AppConfig.load()        
        # Restore last window geometry/state
        self._settings = QtCore.QSettings("YourOrg", "PeopleRecognitionGUI")
        if (geo := self._settings.value("main/geometry")):
            self.restoreGeometry(geo)
        if (state := self._settings.value("main/windowState")):
            self.restoreState(state)

    # --- UI components
    def _make_central_console(self):                                            # canvas for logging output and displayed messages
        self.console = QtWidgets.QPlainTextEdit(readOnly=True)
        self.console.setWordWrapMode(QtGui.QTextOption.WrapMode.NoWrap)
        self.console.setFont(QtGui.QFont("Consolas", 10))
        self.console.setPlaceholderText("Log output will appear here…")
        self.setCentralWidget(self.console)

    def _make_statusbar(self):                                                  # status bar for visual progress feedback
        sb = QtWidgets.QStatusBar()
        self.setStatusBar(sb)
        self.progress = QtWidgets.QProgressBar()
        self.progress.setRange(0, 100)
        self.progress.setValue(0)
        self.progress.setTextVisible(True)
        self.progress.setFixedWidth(260)
        sb.addPermanentWidget(self.progress)
        self.statusBar().showMessage("Ready")

    def _make_menubar(self):                                                    # menu to manage application functions
        mb = self.menuBar()
        # Quit (own attribute, so it is not overwritten by the Settings menu below)
        self.menu_quit = mb.addMenu("&Quit")
        act_quit = QtGui.QAction("Quit", self)
        act_quit.triggered.connect(self.close)
        self.menu_quit.addAction(act_quit)
        # Settings
        self.menu_settings = mb.addMenu("&Settings")
        act_prefs = QtGui.QAction("Preferences…", self)
        act_prefs.triggered.connect(self._on_prefs)
        self.menu_settings.addAction(act_prefs)
        # Manage Persons
        self.menu_manage_persons = mb.addMenu("&Manage Persons")
        self.menu_manage_persons.addAction(self._ph("Add Person"))
        self.menu_manage_persons.addAction(self._ph("Remove Person"))
        self.menu_manage_persons.addAction(self._ph("Rename / Merge…"))
        # Query Persons
        self.menu_query = mb.addMenu("&Query Persons")
        self.menu_query.addAction(self._ph("Search in Folder…"))
        self.menu_query.addAction(self._ph("Interactive Identify…"))
        # Manage Knowledgebase
        self.menu_kb = mb.addMenu("&Knowledgebase")
        act_kb_init = QtGui.QAction("Initialize KB…", self)
        act_kb_init.triggered.connect(self._kb_init)
        self.menu_kb.addAction(act_kb_init)
        act_kb_extend = QtGui.QAction("Extend KB from Folder…", self)
        act_kb_extend.triggered.connect(self._kb_extend)
        self.menu_kb.addAction(act_kb_extend)
        self.menu_kb.addSeparator()
        self.menu_kb.addAction(self._ph("Show Stats…"))
        self.menu_kb.addAction(self._ph("Export Log…"))
        # Recognition
        self.menu_recognition = mb.addMenu("&Recognition")
        act_recognize = QtGui.QAction("Recognize Folder…", self)
        act_recognize.triggered.connect(self._recognize_folder)
        demo = QtGui.QAction("Demo Progress (5s)", self); 
        demo.triggered.connect(self._demo_progress)
        self.menu_recognition.addAction(act_recognize)
        self.menu_recognition.addSeparator()
        self.menu_recognition.addAction(demo)        
        # Help / About
        self.menu_help = mb.addMenu("&Help")
        act_about = QtGui.QAction("About…", self, triggered=self._on_about)
        act_clear = QtGui.QAction("Clear Log", self, shortcut="Ctrl+L", triggered=self.console.clear)
        self.menu_help.addActions([act_about, act_clear])

    def _sep(self) -> QtGui.QAction:                                            # menu separator
        sep = QtGui.QAction(self)
        sep.setSeparator(True)
        return sep

    def _ph(self, text: str) -> QtGui.QAction:                                  # menu placeholder
        """Create a placeholder action that just logs for now."""
        act = QtGui.QAction(text, self)
        act.triggered.connect(lambda _, t=text: self.log(f"[TODO] {t}"))
        return act

    # --- Logging
    def _wire_logging(self):
        self._log_emitter = GuiLogEmitter()
        self._log_emitter.message.connect(self._append_log_line)
        self._log_handler = GuiLogHandler(self._log_emitter)
        self._log_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logging.getLogger().addHandler(self._log_handler)
        logging.getLogger().setLevel(logging.INFO)

    def log(self, msg: str, level: int = logging.INFO):
        # Use Python logging so external modules can log into the GUI too.
        logging.getLogger().log(level, msg)

    @QtCore.pyqtSlot(str)                                                        # slot for receiving messages from workers and displaying them in the console
    def _append_log_line(self, text: str):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.console.appendPlainText(text if text.startswith("20") else f"{timestamp} {text}")
        # Auto-scroll
        self.console.verticalScrollBar().setValue(self.console.verticalScrollBar().maximum())

    def _on_prefs(self):
        dlg = SettingsDialog(self.cfg, self)
        if dlg.exec() == QtWidgets.QDialog.DialogCode.Accepted:
            self.cfg = dlg.values()
            self.cfg.save()
            self.log(f"Preferences saved: dataset='{self.cfg.dataset_dir}', "
                    f"output='{self.cfg.processed_dir}', process='{self.cfg.process_dir}', "
                    f"FACE_TOL={self.cfg.face_tol:.2f}, BODY_SIM_TOL={self.cfg.body_tol:.2f}, "
                    f"model={self.cfg.reid_model}")

    def _on_about(self):
        QtWidgets.QMessageBox.information(self, "About", 
            "People Recognition — Minimal GUI\n\n"
            "Part I/II foundation with face + body recognition.\n"
            "This app is the starting point for a full workflow."
        )

    def _demo_progress(self):                                                      # simulate a short task to show the status/progress bar         
        self.statusBar().showMessage("Running demo task…")
        self.progress.setValue(0)
        steps = 50
        self._demo_timer = QtCore.QTimer(self)
        self._demo_timer.setInterval(100)  # 0.1s * 50 = ~5s
        self._demo_i = 0

        def tick():
            self._demo_i += 1
            pct = int(self._demo_i * 100 / steps)
            self.progress.setValue(pct)
            if self._demo_i >= steps:
                self._demo_timer.stop()
                self.statusBar().showMessage("Ready")
                self.log("Demo task finished.")

        self._demo_timer.timeout.connect(tick)
        self._demo_timer.start()

    # --- Close/save state
    def closeEvent(self, event: QtGui.QCloseEvent) -> None:
        self._settings.setValue("main/geometry", self.saveGeometry())
        self._settings.setValue("main/windowState", self.saveState())
        super().closeEvent(event)

    # manage the thread to run the batch recognition processing of a folder 
    def _recognize_folder(self):
        folder = QtWidgets.QFileDialog.getExistingDirectory(self, "Choose folder to recognize", self.cfg.process_dir)
        if not folder:
            return
        kb_path = Path(self.cfg.dataset_dir) / self.cfg.encodings_filename
        # write results to the configured "Processed output" folder, else next to the input
        out_dir = Path(self.cfg.processed_dir) if self.cfg.processed_dir else (Path(folder) / "recognized_out")
        self.statusBar().showMessage("Recognizing…")
        self.progress.setValue(0)
        self.log(f"Starting recognition on: {folder}")
        self._thread = QtCore.QThread(self)
        self._worker = FolderWorker(kb_path=kb_path, input_folder=Path(folder), output_folder=out_dir, face_threshold=self.cfg.face_tol, 
                                    body_threshold=self.cfg.body_tol, reid_model=self.cfg.reid_model, valid_exts=tuple(self.cfg.valid_exts),)
        self._worker.moveToThread(self._thread)
        self._thread.started.connect(self._worker.run)
        self._worker.progress.connect(self.progress.setValue)
        self._worker.log.connect(self.log)

        def done(msg):
            self.statusBar().showMessage("Ready")
            self.log(msg)
            self.progress.setValue(0)
            self._thread.quit(); self._thread.wait()

        self._worker.finished.connect(done)
        self._thread.start()

    # manage the thread to run the batch creation or extending of the known-persons knowledgebase 
    def _kb_init(self):
        # Pick the KB root (dataset folder containing person subfolders)
        start = self.cfg.dataset_dir or str(Path.cwd())
        folder = QtWidgets.QFileDialog.getExistingDirectory(self, "Select KB Root (dataset)", start)
        if not folder:
            return
        main_dir = Path(folder)
        self.cfg.dataset_dir = str(main_dir)
        self.cfg.save()
        kb_path = main_dir / self.cfg.encodings_filename  # use setting, not hard-coded
        self.statusBar().showMessage("Initializing KB…")
        self.progress.setValue(0)
        self.log(f"KB Initialize: main={main_dir} kb={kb_path}")
        self._kbu_thread = QtCore.QThread(self)
        self._kbu_worker = KBUpsertWorker(main_dir=main_dir, kb_path=kb_path, reid_model=self.cfg.reid_model, source_dir=None, copy_mode="copy", add_only_new_persons=True, 
                                          valid_exts=tuple(self.cfg.valid_exts), kb_batch_size=int(self.cfg.kb_batch_size),)
        self._kbu_worker.moveToThread(self._kbu_thread)
        self._kbu_thread.started.connect(self._kbu_worker.run)
        self._kbu_worker.progress.connect(self.progress.setValue)
        self._kbu_worker.log.connect(self.log)
        def done(stats: dict):
            self.statusBar().showMessage("Ready")
            if not stats:
                self.log("[KB] Job failed. See log for details.")
            else:
                self.log(
                    f"[KB] Init done. persons_added={stats.get('persons_added',0)}, "
                    f"faces+={stats.get('faces_added',0)}, bodies+={stats.get('bodies_added',0)} | "
                    f"totals: faces={stats.get('total_faces',0)}, bodies={stats.get('total_bodies',0)}, "
                    f"names={stats.get('distinct_names',0)}"
                )
            self.progress.setValue(0)
            self._kbu_thread.quit(); self._kbu_thread.wait()
        self._kbu_worker.finished.connect(done)
        self._kbu_thread.start()

    def _kb_extend(self):
        # Ensure we know the KB root first
        if not self.cfg.dataset_dir:
            QtWidgets.QMessageBox.warning(self, "No KB", "Initialize the KB first (choose a KB root).")
            return
        main_dir = Path(self.cfg.dataset_dir)
        if not main_dir.exists():
            QtWidgets.QMessageBox.warning(self, "KB Missing", f"KB root not found:\n{main_dir}")
            return
        # Pick staging folder with person subfolders to add
        source = QtWidgets.QFileDialog.getExistingDirectory(self, "Select Folder to Extend From", str(Path.cwd()))
        if not source:
            return
        kb_path = main_dir / self.cfg.encodings_filename
        self.statusBar().showMessage("Extending KB…")
        self.progress.setValue(0)
        self.log(f"KB Extend: main={main_dir} kb={kb_path} from={source}")
        self._kbu_thread = QtCore.QThread(self)
        self._kbu_worker = KBUpsertWorker(main_dir=main_dir, kb_path=kb_path, reid_model=self.cfg.reid_model, source_dir=Path(source), copy_mode="copy", add_only_new_persons=True, 
                                          valid_exts=tuple(self.cfg.valid_exts), kb_batch_size=int(self.cfg.kb_batch_size),)
        self._kbu_worker.moveToThread(self._kbu_thread)
        self._kbu_thread.started.connect(self._kbu_worker.run)
        self._kbu_worker.progress.connect(self.progress.setValue)
        self._kbu_worker.log.connect(self.log)
        def done(stats: dict):
            self.statusBar().showMessage("Ready")
            if not stats:
                self.log("[KB] Job failed. See log for details.")
            else:
                self.log(
                    f"[KB] Extend done. persons_added={stats.get('persons_added',0)}, "
                    f"faces+={stats.get('faces_added',0)}, bodies+={stats.get('bodies_added',0)} | "
                    f"totals: faces={stats.get('total_faces',0)}, bodies={stats.get('total_bodies',0)}, "
                    f"names={stats.get('distinct_names',0)}"
                )
            self.progress.setValue(0)
            self._kbu_thread.quit(); self._kbu_thread.wait()
        self._kbu_worker.finished.connect(done)
        self._kbu_thread.start()

This is the code that creates and shows our application, with a menu to guide the operations and a progress bar for visual feedback. It also has a central console for messages and logging information. This looks like quite a lot of code, and it is, but if you read through it you will notice that it is rather easy to understand.
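
The logging bridge is the least obvious part, so here is the same idea without Qt. This is a simplified sketch: CallbackLogHandler is a hypothetical analogue of GuiLogHandler that forwards each formatted record to a plain callable, where the application emits a Qt signal instead.

```python
import logging
from typing import Callable, List


class CallbackLogHandler(logging.Handler):
    """Hypothetical Qt-free analogue of GuiLogHandler."""

    def __init__(self, sink: Callable[[str], None]):
        super().__init__()
        self.sink = sink

    def emit(self, record: logging.LogRecord) -> None:
        try:
            msg = self.format(record)
        except Exception:
            msg = record.getMessage()  # fall back to the unformatted message
        self.sink(msg)                 # in ui.py: self.emitter.message.emit(msg)


lines: List[str] = []
handler = CallbackLogHandler(lines.append)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

logger = logging.getLogger("demo.bridge")  # a private logger, so the demo leaves the root logger alone
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

logger.info("Application started.")
logger.debug("filtered out at INFO level")
print(lines)
```

Any module that logs through the standard logging package ends up in the sink, which is what lets GUI-free code write to the console widget without importing anything from the UI.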

Thread-safe Workers

You will notice that the main window contains menu items for batch processes: 'Initialize KB…' and 'Extend KB from Folder…' under Knowledgebase, and 'Recognize Folder…' under Recognition. These trigger the helper methods _kb_init(), _kb_extend() and _recognize_folder(). Each of these methods creates a separate thread beside the main application thread and supplies it with the means to communicate its progress to the main window, so that it can be shown there. We implement the actual workers, the classes FolderWorker and KBUpsertWorker, in the next post.
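
The division of labour between worker and main thread can be illustrated without Qt. The sketch below is an analogue built on the standard library (threading and queue.Queue), not the QThread mechanism used in ui.py, and run_job is a hypothetical worker body. The worker never touches the user interface; it only posts events, which the main thread picks up and handles.

```python
import queue
import threading


def run_job(n_items: int, events: queue.Queue) -> None:
    """Hypothetical worker body: posts progress/finished events instead of touching the UI."""
    for i in range(1, n_items + 1):
        events.put(("progress", int(i * 100 / n_items)))
    events.put(("finished", f"processed {n_items} items"))


events: queue.Queue = queue.Queue()
worker = threading.Thread(target=run_job, args=(4, events))
worker.start()

# The main thread drains the queue. In the application, Qt's signal/slot
# mechanism performs this cross-thread delivery.
received = []
while True:
    kind, payload = events.get()
    received.append((kind, payload))
    if kind == "finished":
        break
worker.join()
print(received)
```

In the PyQt version the "progress", "log" and "finished" events correspond to the worker's signals, and the slots connected to them play the role of the loop above.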

Both workers make intensive use of our toolbox for encoding and recognition. To complete the discussion on building the application, both sources are shown below.

Reid_wrapper

# reid_wrapper.py
# trimmed down version of torchreid_extractor.py
from __future__ import annotations
import importlib, importlib.util, io, contextlib
from   typing import Optional, Iterable, List, Callable
from   PIL import Image
import numpy as np
import torch
from   torchvision import transforms

# Return the torchreid.models module, supporting canonical and legacy layouts.
def _import_models(quiet: bool = True, log_fn: Optional[Callable[[str], None]] = None):
    def _cap(modname: str):
        if not quiet:
            return importlib.import_module(modname)
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
            mod = importlib.import_module(modname)
        msg = buf.getvalue().strip()
        if msg and log_fn:
            log_fn(f"[ReID] {msg.splitlines()[-1]}")
        return mod

    if importlib.util.find_spec("torchreid") is None:
        raise ImportError("TorchReID not found. Install torchreid (>=1.4.0) or the GitHub repo.")

    # Try canonical then legacy
    for name in ("torchreid.models", "torchreid.reid.models"):
        if importlib.util.find_spec(name) is not None:
            if name != "torchreid.models" and log_fn:
                log_fn(f"[ReID] Using fallback module path: {name}")
            return _cap(name)

    # Last resort: import base and walk attributes
    pkg = _cap("torchreid")
    for attr_path in ("models", "reid.models"):
        obj = pkg
        ok = True
        for part in attr_path.split("."):
            obj = getattr(obj, part, None)
            if obj is None:
                ok = False
                break
        if ok:
            if attr_path != "models" and log_fn:
                log_fn(f"[ReID] Using models via torchreid.{attr_path}")
            return obj
    # if all of that fails
    raise ImportError("Could not import TorchReID models (tried torchreid.models and torchreid.reid.models).")

# Quiet, minimal feature extractor around TorchReID backbones:
# __call__(PIL.Image)                                               -> (D,) float32 L2-normalized
# extract_batch(List[PIL.Image], batch_size=32) -> (N, D) float32 L2-normalized
# extract_paths(Iterable[str], batch_size=32)      -> (N, D) float32 L2-normalized
class TorchreidBodyExtractor:
    def __init__(self, model_name: str = "osnet_ain_x1_0", device: Optional[str] = None, height: int = 256, width: int = 128, quiet: bool = True, log_fn: Optional[Callable[[str], None]] = None,):
        self.model_name = model_name
        self.height, self.width = int(height), int(width)
        self.quiet = bool(quiet)
        self._log = log_fn
        if device is None:
            device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)
        # Import models lazily and build the backbone
        models = _import_models(self.quiet, self._log)
        # Build model class directly (no repo-specific builders)
        Model = getattr(models, model_name, None)
        if Model is None:
            # some installs keep classes in __dict__
            Model = models.__dict__.get(model_name)
        if Model is None:
            avail = sorted([k for k, v in models.__dict__.items() if not k.startswith("_")])
            raise ValueError(f"Unknown model '{model_name}'. Available: {avail[:20]}...")
        # Quiet stdout while constructing (pretrained download line, etc.)
        if self.quiet:
            buf = io.StringIO()
            with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
                self.model = Model(pretrained=True)   # downloads weights if needed
            msg = buf.getvalue().strip()
            if msg and self._log:
                self._log(f"[ReID] {msg.splitlines()[-1]}")
        else:
            self.model = Model(pretrained=True)
        self.model.eval().to(self.device)
        # Use plain torchvision transforms; keep consistent with TorchReID training stats
        self.transform = transforms.Compose([
            transforms.Resize((self.height, self.width), interpolation=transforms.InterpolationMode.BILINEAR),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    # ---- public API ----
    def __call__(self, image: Image.Image) -> np.ndarray:
        t = self.transform(image.convert("RGB")).unsqueeze(0).to(self.device, non_blocking=True)
        with torch.inference_mode():
            f = self.model(t)  # shape [1, D]
            f = torch.nn.functional.normalize(f, dim=1)
        return f.squeeze(0).detach().cpu().numpy().astype("float32")

    def extract_batch(self, images: List[Image.Image], batch_size: int = 32, num_workers: int = 0) -> np.ndarray:
        # turn PILs into tensors
        tensors = [self.transform(im.convert("RGB")) for im in images]
        if not tensors:
            return np.zeros((0, 0), dtype=np.float32)
        # simple mini-batching, no DataLoader workers to avoid PIL/thread issues on Windows
        feats = []
        with torch.inference_mode():
            for i in range(0, len(tensors), batch_size):
                batch = torch.stack(tensors[i:i+batch_size], dim=0).to(self.device, non_blocking=True)
                f = self.model(batch)
                f = torch.nn.functional.normalize(f, dim=1)
                feats.append(f.detach().cpu())
        return torch.cat(feats, dim=0).numpy().astype("float32", copy=False)

    def extract_paths(self, paths: Iterable[str], batch_size: int = 32, num_workers: int = 0) -> np.ndarray:
        imgs: List[Image.Image] = []
        for p in paths:
            try:
                with Image.open(p) as im:
                    imgs.append(im.convert("RGB").copy())  # detach from file handle
            except Exception:
                continue
        return self.extract_batch(imgs, batch_size=batch_size, num_workers=num_workers)

    @staticmethod
    def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
        return float(np.dot(a, b))
    @staticmethod
    def euclid_dist(a: np.ndarray, b: np.ndarray) -> float:
        return float(np.linalg.norm(a - b, ord=2))

This code contains our wrapper around TorchReID, a PyTorch library of models for full-body recognition.
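
Note that cosine_sim is a plain dot product. That is only valid because the extractor L2-normalizes every feature vector before returning it. The following sketch checks this with toy 2-dimensional vectors in plain Python (l2_normalize and dot are hypothetical helpers), and also shows how cosine similarity and Euclidean distance relate for unit vectors.

```python
import math


def l2_normalize(v: list[float]) -> list[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


# Toy vectors standing in for real ReID features.
a = l2_normalize([3.0, 4.0])   # approximately [0.6, 0.8]
b = l2_normalize([4.0, 3.0])   # approximately [0.8, 0.6]

cos_sim = dot(a, b)            # for unit vectors the dot product is the cosine similarity
dist = math.dist(a, b)         # Euclidean distance between the same vectors

# For unit vectors: dist**2 == 2 - 2 * cos_sim (up to rounding).
print(round(cos_sim, 4), round(dist ** 2, 4), round(2 - 2 * cos_sim, 4))
```

This is why a body similarity threshold such as 0.80 on the cosine scale corresponds to a fixed maximum Euclidean distance between the normalized features.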

Face_encoder

# face_encoder.py
# Face encoder / aligner with robust dlib input and HOG→CNN fallback:
# • normalizes images into a dlib-friendly RGB buffer (fixes Windows stride/ownership quirks)
# • does HOG→CNN fallbacks, upsample retries, alignment by eye landmarks
# • filters blurry faces via Laplacian variance
# • can directly compute a face embedding per aligned chip
from __future__ import annotations
import math
from io import BytesIO
from typing import Any, Dict, List, Tuple, Optional
import cv2
import numpy as np
import face_recognition
from PIL import Image
# silence deprecation noise from face_recognition_models
import warnings
warnings.filterwarnings(
    "ignore",
    category=UserWarning,
    message=r"pkg_resources is deprecated as an API.*",
    module=r"face_recognition_models(\.|$)"
)

# Produce a *fresh*, C-contiguous, writeable uint8 RGB array (H,W,3) by reloading via face_recognition.load_image_file. 
# This sidesteps all stride/ownership weirdness that can upset dlib on Windows.
def _fr_ready_rgb(img: Any) -> np.ndarray:
    if hasattr(img, "mode"):  # PIL.Image
        pil = img.convert("RGB")
    else:
        arr = np.asarray(img)
        if arr.ndim == 2:
            pil = Image.fromarray(arr, mode="L").convert("RGB")
        elif arr.ndim == 3 and arr.shape[2] == 3:
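            # Heuristic: assume BGR when channel 0 is clearly brighter than channel 2.
            # Warm-toned RGB images (red > blue) match too, so pass a PIL image when
            # the channel order is known.
            if float(arr[..., 0].mean() or 0) > 1.1 * float(arr[..., 2].mean() or 1e-6):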
                arr = cv2.cvtColor(arr, cv2.COLOR_BGR2RGB)
            pil = Image.fromarray(arr, mode="RGB")
        elif arr.ndim == 3 and arr.shape[2] == 4:
            try:
                arr = cv2.cvtColor(arr, cv2.COLOR_BGRA2RGB)
            except Exception:
                arr = arr[..., :3]
            pil = Image.fromarray(arr, mode="RGB")
        else:
            raise RuntimeError(f"Unsupported input for dlib: shape={arr.shape}")
    buf = BytesIO()
    pil.save(buf, format="PNG")  # lossless, fast
    buf.seek(0)
    arr = face_recognition.load_image_file(buf)  # -> uint8 RGB (H,W,3), contiguous
    return np.require(arr, dtype=np.uint8, requirements=["C", "O", "W"])

def _lap_var(img_rgb: np.ndarray) -> float:
    g = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
    return float(cv2.Laplacian(g, cv2.CV_64F).var())

def _mean_luma(img_rgb: np.ndarray) -> float:
    g = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
    return float(np.mean(g))

def detect_and_align_faces(
    image: Any,
    model: str = "hog",
    upsample: int = 0,
    desired_size: int = 160,
    min_box: int = 40,
    lap_var_thresh: Optional[float] = 80.0,
    eye_pos: Tuple[float, float] = (0.5, 0.4),
    eye_dist_ratio: float = 0.35,
    resize_max: Optional[int] = 800,
    adaptive_blur_factor: float = 0.5,
    retry_if_empty: bool = True,
    compute_embedding: bool = False,
    embedding_model: str = "small",
    num_jitters: int = 1,
) -> List[Dict[str, Any]]:
    # 1) Normalize to a dlib-friendly RGB buffer
    rgb_full = _fr_ready_rgb(image)
    H_full, W_full = rgb_full.shape[:2]
    # 2) Optional downscale for detection speed
    scale = 1.0
    max_side = max(H_full, W_full)
    if resize_max is not None and max_side > resize_max:
        scale = resize_max / float(max_side)
        new_w, new_h = int(W_full * scale), int(H_full * scale)
        rgb_det = cv2.resize(rgb_full, (new_w, new_h), interpolation=cv2.INTER_AREA)
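        # Known to be RGB here: hand it over as a PIL image so the BGR heuristic
        # is skipped, while still getting a fresh buffer after the resize
        rgb_det = _fr_ready_rgb(Image.fromarray(rgb_det))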
    else:
        rgb_det = rgb_full
    # 3) Detection with robust fallbacks
    try:
        locs = face_recognition.face_locations(rgb_det, number_of_times_to_upsample=upsample, model=model)
    except Exception:
        if model == "hog":
            # HOG also supports 8-bit gray; retry there
            gray = cv2.cvtColor(rgb_det, cv2.COLOR_RGB2GRAY)
            gray = np.require(gray, dtype=np.uint8, requirements=["C", "O", "W"])
            locs = face_recognition.face_locations(gray, number_of_times_to_upsample=upsample, model="hog")
        else:
            raise
    if not locs and model == "hog":
        # escalate to CNN on RGB
        try:
            locs = face_recognition.face_locations(rgb_det, number_of_times_to_upsample=max(upsample, 1), model="cnn")
        except Exception as e:
            raise RuntimeError(
                f"dlib CNN detector rejected image: dtype={rgb_det.dtype}, shape={rgb_det.shape}, "
                f"C={rgb_det.flags.c_contiguous}, strides={rgb_det.strides}"
            ) from e
    if not locs and retry_if_empty and upsample == 0:
        locs = face_recognition.face_locations(rgb_det, number_of_times_to_upsample=1, model=model)
    if not locs:
        return []
    # 4) Landmarks (always on RGB)
    all_landmarks = face_recognition.face_landmarks(rgb_det, face_locations=locs, model="large") or []
    # 5) Align chips on full-res image
    results: List[Dict[str, Any]] = []
    Wt = Ht = int(desired_size)
    dest_eye_x = Wt * eye_pos[0]
    dest_eye_y = Ht * eye_pos[1]
    desired_dist = eye_dist_ratio * Wt
    global_lap_var = _lap_var(rgb_full)
    effective_blur_thresh: Optional[float] = (adaptive_blur_factor * global_lap_var if lap_var_thresh is None else float(lap_var_thresh))
    for (top, right, bottom, left), lm in zip(locs, all_landmarks):
        # back-map bbox to original scale
        top_o = int(round(top / scale))
        right_o = int(round(right / scale))
        bottom_o = int(round(bottom / scale))
        left_o = int(round(left / scale))
        w_o, h_o = (right_o - left_o), (bottom_o - top_o)
        if w_o < min_box or h_o < min_box:
            continue
        if not lm or ("left_eye" not in lm or "right_eye" not in lm):
            continue
        # eye centers in detection scale
        left_eye = np.mean(np.array(lm["left_eye"]), axis=0)
        right_eye = np.mean(np.array(lm["right_eye"]), axis=0)
        dY = right_eye[1] - left_eye[1]
        dX = right_eye[0] - left_eye[0]
        angle = math.degrees(math.atan2(dY, dX))
        dist = (dX ** 2 + dY ** 2) ** 0.5
        if dist < 1e-6:
            continue
        scale_aff = desired_dist / dist
        eyes_center = ((left_eye[0] + right_eye[0]) * 0.5, (left_eye[1] + right_eye[1]) * 0.5)
        M = cv2.getRotationMatrix2D(eyes_center, angle, scale_aff)
        M[0, 2] += (dest_eye_x - eyes_center[0])
        M[1, 2] += (dest_eye_y - eyes_center[1])
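        # M maps detection-scale coordinates to the chip. Full-res points are
        # p_det / scale, so fold `scale` into the linear part and keep the
        # translation unchanged.
        M_full = M.copy()
        if scale != 1.0:
            M_full[:, :2] *= scale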
        aligned = cv2.warpAffine(rgb_full, M_full, (Wt, Ht), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
        blur_var = _lap_var(aligned)
        if effective_blur_thresh is not None and blur_var < effective_blur_thresh:
            continue
        out: Dict[str, Any] = {
            "aligned": aligned,
            "bbox": (top_o, right_o, bottom_o, left_o),
            "landmarks": {k: [(int(round(px / scale)), int(round(py / scale))) for (px, py) in v] for k, v in lm.items()},
            "transform": M_full.astype("float32"),
            "scale": float(scale),
            "blur_var": float(blur_var),
            "mean_luma": _mean_luma(aligned),
        }
        if compute_embedding:
            # correct bbox order: (top, right, bottom, left) = (0, Wt, Ht, 0)
            enc = face_recognition.face_encodings(aligned, known_face_locations=[(0, Wt, Ht, 0)], num_jitters=num_jitters, model=embedding_model)
            if enc:
                out["embedding"] = enc[0].astype("float32")
        results.append(out)
    # One more pass if everything got filtered
    if not results and retry_if_empty and upsample == 0:
        return detect_and_align_faces(
            image=image, model=model, upsample=1, desired_size=desired_size, min_box=min_box,
            lap_var_thresh=lap_var_thresh, eye_pos=eye_pos, eye_dist_ratio=eye_dist_ratio,
            resize_max=resize_max, adaptive_blur_factor=adaptive_blur_factor,
            retry_if_empty=False, compute_embedding=compute_embedding,
            embedding_model=embedding_model, num_jitters=num_jitters,
        )
    return results


This code contains the enhanced face encoder presented in Part 2 of this series, the other half of our toolkit for encoding & recognition.
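The alignment step is the least obvious part of the encoder. From the two eye centers it builds a similarity transform that rotates the eye line to horizontal, scales the eye distance to `eye_dist_ratio * desired_size`, and moves the eye midpoint to `eye_pos`. The sketch below rebuilds that matrix with NumPy only, following the formula documented for `cv2.getRotationMatrix2D`, and checks where the eyes land. The names `eye_alignment_matrix` and `apply_affine` and the sample eye coordinates are assumptions made for illustration:

```python
import math
import numpy as np

def eye_alignment_matrix(left_eye, right_eye, size=160,
                         eye_pos=(0.5, 0.4), eye_dist_ratio=0.35):
    """Return a 2x3 affine matrix mapping image points to a size x size chip."""
    (lx, ly), (rx, ry) = left_eye, right_eye
    dx, dy = rx - lx, ry - ly
    angle = math.atan2(dy, dx)                  # tilt of the eye line (radians)
    s = (eye_dist_ratio * size) / math.hypot(dx, dy)
    cx, cy = (lx + rx) / 2.0, (ly + ry) / 2.0   # midpoint between the eyes
    alpha, beta = s * math.cos(angle), s * math.sin(angle)
    # Same layout as cv2.getRotationMatrix2D(center, degrees(angle), s)
    M = np.array([[alpha, beta, (1 - alpha) * cx - beta * cy],
                  [-beta, alpha, beta * cx + (1 - alpha) * cy]])
    # Shift the eye midpoint to its target position inside the chip
    M[0, 2] += size * eye_pos[0] - cx
    M[1, 2] += size * eye_pos[1] - cy
    return M

def apply_affine(M, point):
    """Apply a 2x3 affine matrix to an (x, y) point."""
    x, y = point
    return M @ np.array([x, y, 1.0])

left, right = (100.0, 120.0), (180.0, 150.0)    # made-up, tilted eye centers
M = eye_alignment_matrix(left, right)
print(apply_affine(M, left), apply_affine(M, right))
```

With the default parameters both eyes end up on the row y = 64 of a 160×160 chip, 56 pixels apart, no matter how the face was tilted in the source image. That consistency is what makes the embeddings of different photos comparable.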

What comes next

In the next posts we’ll add the actual worker for creating the knowledgebase. The same worker can also extend an existing knowledgebase by adding several person folders at once. Once we have a knowledgebase of named, known persons, we can use it to identify those persons in a folder of photos. Batch recognition will be the subject of another post.
