In a first post we showed that the basics of person encoding and recognition are straightforward to implement. In the next post, we added full-body person recognition alongside a stronger face encoder. That gave us the tools to build and manage a knowledge base of named, identifiable people in our photo collections. We promised to add useful features to get the most out of working with such a knowledge base. What is still missing is a real application with a graphical user interface (GUI) that ties everything together and, since this is photo management, some basic viewing and logging capabilities.
PyQt Application
Because we work in Python, PyQt, the Python binding for the cross-platform GUI toolkit Qt, is a natural fit. The current version, PyQt6, supports Qt 6 and runs on Windows, macOS and Linux. It is easily installed with: pip install PyQt6 pyqt6-tools.
There are many good PyQt tutorials online; you might, for instance, take a look at Real Python.
Creating the GUI App
To keep things modular and presentable, we split the code into small, focused modules: loosely coupled, independently reusable components with clear responsibilities.
The application framework:
- main.py, the entry point of the application, which sets the warning filters and boots the app;
- config.py, which manages settings.json and the shared constants;
- ui.py, the actual PyQt6 GUI, holding the main window, the settings dialog and the logging bridge.
The encoder & recognition toolkit:
- reid_wrapper.py, a trimmed-down version of our full-body encoder, and
- face_encoder.py, our already demonstrated face encoder.
The application functions building on the toolkit:
- kb.py, the knowledgebase data model and the worker method for batch encoding;
- recognize.py, holds the worker method for batch recognition and searching.
We’ll extend this in future posts.
An Import strategy
A disciplined import strategy is crucial for a modular Python application: it avoids console noise and circular imports, improves maintainability and keeps the console clean. Our setup ensures that:
- main.py installs the warning filters before anything else and only then imports ui, so the noisy legacy warnings from TorchReID and face_recognition_models are silenced before those libraries load.
- ui.py imports only the workers (KBUpsertWorker from kb, FolderWorker from recognize); it never imports TorchreidBodyExtractor itself, which is imported lazily inside the worker functions.
- kb.py and recognize.py are GUI-free; they receive simple callables (log_fn, progress_fn) and import TorchreidBodyExtractor inside the worker or function right before use (lazy import).
- reid_wrapper.py contains all TorchReID quirks (support for both the torchreid.models and torchreid.reid.models layouts, plus a quiet import). Nothing else imports torchreid directly.
- config.py centralizes the AppConfig dataclass, its defaults (valid_exts, encodings_filename, thresholds) and paths. Workers receive plain values rather than the config object, which avoids hidden cross-module coupling.
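To make the lazy-import and plain-callables rules concrete, here is a minimal sketch of a GUI-free worker function in that style. The name `encode_folder` is hypothetical, and a stdlib module (`hashlib`) stands in for the heavy `TorchreidBodyExtractor` import so the sketch runs anywhere; what matters is the shape: a deferred import, plain values in, and `log_fn`/`progress_fn` callables out instead of Qt objects or the config object.

```python
from typing import Callable, Iterable, List


def encode_folder(paths: Iterable[str],
                  log_fn: Callable[[str], None] = print,
                  progress_fn: Callable[[int], None] = lambda pct: None) -> List[str]:
    """Hypothetical GUI-free batch job: plain values in, plain callbacks out."""
    # Lazy import: the dependency is only loaded when the job actually runs.
    # In the app this would be `from reid_wrapper import TorchreidBodyExtractor`;
    # hashlib is a lightweight stand-in so the sketch is self-contained.
    import hashlib

    paths = list(paths)
    results: List[str] = []
    for i, p in enumerate(paths, start=1):
        digest = hashlib.sha1(p.encode("utf-8")).hexdigest()[:8]  # placeholder "encoding"
        results.append(digest)
        log_fn(f"encoded {p}")
        progress_fn(int(i * 100 / len(paths)))  # percentage, as a QProgressBar expects
    return results


# Usage: any callables will do; in the GUI they would emit the worker's Qt signals.
logs: List[str] = []
steps: List[int] = []
codes = encode_folder(["a.jpg", "b.jpg", "c.jpg", "d.jpg"], log_fn=logs.append, progress_fn=steps.append)
```

Because the function only sees callables, the same code can be driven from a console script, a unit test or a Qt worker without change.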
Below we take a closer look at the individual sources.
Main
A minimal main.py that filters noisy warnings and starts the GUI:
# main.py
from __future__ import annotations

# --- warning filters BEFORE any imports that could trigger torchreid/face_recognition noise
import warnings

warnings.filterwarnings("ignore", category=UserWarning,
                        message=r"pkg_resources is deprecated as an API.*",
                        module=r"face_recognition_models(\.|$)")
warnings.filterwarnings("ignore", category=DeprecationWarning,
                        message=r"pkg_resources is deprecated as an API.*")
warnings.filterwarnings("ignore", category=UserWarning,
                        message=r"Cython evaluation \(very fast so highly recommended\) is unavailable, now use python evaluation\.",
                        module=r"torchreid\..*")

# import the GUI only after the filters are in place (intentionally late imports)
from PyQt6 import QtWidgets  # noqa: E402
from ui import MainWindow  # noqa: E402


def main() -> int:
    app = QtWidgets.QApplication([])
    app.setApplicationName("People Recognition GUI")
    app.setOrganizationName("YourOrg")
    win = MainWindow()
    win.show()
    return app.exec()


if __name__ == "__main__":
    raise SystemExit(main())
This filters out the clutter and launches the application by showing the main window.
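Why must the filters come first? A filter only affects warnings issued after it is installed, and import-time warnings fire once, when a module's top-level code first runs. The sketch below uses a hypothetical `noisy_import()` as a stand-in for a library that warns on import, and `warnings.catch_warnings(record=True)` to show that only the targeted message is dropped; `message` is a regular expression matched against the start of the warning text.

```python
import warnings


def noisy_import() -> None:
    # Hypothetical stand-in for a third-party module that warns at import time.
    warnings.warn("pkg_resources is deprecated as an API. See the setuptools docs.", UserWarning)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # record everything that is not explicitly ignored
    # Same kind of filter as in main.py; filterwarnings() inserts at the front, so it wins.
    warnings.filterwarnings("ignore", category=UserWarning,
                            message=r"pkg_resources is deprecated as an API.*")
    noisy_import()                                      # silenced by the filter
    warnings.warn("an unrelated warning", UserWarning)  # still recorded

print([str(w.message) for w in caught])  # ['an unrelated warning']
```

Had `noisy_import()` run before `filterwarnings()`, the warning would already have been emitted; that is exactly why main.py sets its filters before importing ui.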
Config
config.py manages settings.json and shared defaults:
# config.py
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field, fields
from pathlib import Path

ROOT = Path(__file__).resolve().parent
SETTINGS_FILE = ROOT / "settings.json"  # or Path.cwd()/..., your pick


@dataclass
class AppConfig:
    dataset_dir: str = ""      # KB root: dataset folder with one subfolder per person
    processed_dir: str = ""    # "Processed output" folder
    process_dir: str = ""      # "Process input" folder
    face_tol: float = 0.45     # max face distance (lower = stricter)
    body_tol: float = 0.80     # min body similarity (higher = stricter)
    reid_model: str = "osnet_ain_x1_0"
    valid_exts: list[str] = field(default_factory=lambda: [".jpg", ".jpeg", ".png", ".webp"])
    encodings_filename: str = "encodings.pkl"
    resize_max: int = 800
    lap_var_thresh: float = 80.0
    kb_batch_size: int = 16

    @classmethod
    def load(cls) -> "AppConfig":
        if SETTINGS_FILE.exists():
            data = json.loads(SETTINGS_FILE.read_text(encoding="utf-8"))
            # ignore keys this version does not know, so a stale settings.json cannot crash startup
            known = {f.name for f in fields(cls)}
            return cls(**{k: v for k, v in data.items() if k in known})
        return cls()

    def save(self) -> None:
        SETTINGS_FILE.write_text(json.dumps(asdict(self), indent=2), encoding="utf-8")
This code manages the contents of our configuration file settings.json and the shared constants.
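Loading a stale settings.json with a plain `cls(**data)` raises a `TypeError` for any key the dataclass no longer has, for example after a field was renamed. The sketch below shows a tolerant load/save round trip under assumed names: `MiniConfig` is a hypothetical two-field stand-in for `AppConfig`, and the file lives in a temporary directory rather than next to config.py.

```python
import json
import tempfile
from dataclasses import asdict, dataclass, fields
from pathlib import Path


@dataclass
class MiniConfig:  # hypothetical stand-in for AppConfig
    face_tol: float = 0.45
    body_tol: float = 0.80

    @classmethod
    def load(cls, path: Path) -> "MiniConfig":
        if not path.exists():
            return cls()
        data = json.loads(path.read_text(encoding="utf-8"))
        known = {f.name for f in fields(cls)}
        # drop keys this version does not know about instead of crashing
        return cls(**{k: v for k, v in data.items() if k in known})

    def save(self, path: Path) -> None:
        path.write_text(json.dumps(asdict(self), indent=2), encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "settings.json"
    MiniConfig(face_tol=0.5).save(path)
    roundtrip = MiniConfig.load(path)
    # simulate a file written by another version, with an extra unknown key
    path.write_text(json.dumps({"face_tol": 0.4, "legacy_key": 1}), encoding="utf-8")
    tolerant = MiniConfig.load(path)
```

Missing keys simply fall back to the dataclass defaults, so old and new settings files both load.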
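For face recognition, face_tol is the maximum allowed distance between two face encodings (a match needs distance ≤ face_tol), so lower values are stricter. The following values can serve as a guide; 0.6 is the default tolerance of face_recognition.compare_faces, and our default of 0.45 sits between strict and very strict.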
| Threshold (face_tol) | Behavior |
| --- | --- |
| 0.6 | Standard, balanced |
| 0.5 | Strict |
| 0.4 | Very strict (almost identical faces only) |
| 0.35 or lower | Extremely strict: only near-perfect matches |
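To see how `face_tol` acts on these numbers, here is a small numpy sketch of the comparison. The distance is the Euclidean norm between encodings, the same formula `face_recognition.face_distance` uses; the two-dimensional toy vectors are made up for readability (real face_recognition encodings have 128 dimensions), and `face_matches` is a hypothetical helper name.

```python
import numpy as np


def face_matches(known: np.ndarray, probe: np.ndarray, face_tol: float) -> np.ndarray:
    """Boolean mask of known encodings within face_tol (Euclidean distance) of the probe."""
    dists = np.linalg.norm(known - probe, axis=1)  # one distance per known encoding
    return dists <= face_tol                        # smaller distance = more similar


# toy 2-D "encodings" at distances 0.0, 0.5 and 1.0 from the probe
known = np.array([[0.0, 0.0], [0.3, 0.4], [0.6, 0.8]])
probe = np.array([0.0, 0.0])
for tol in (0.6, 0.45):
    print(tol, face_matches(known, probe, tol).tolist())
# 0.6 [True, True, False]
# 0.45 [True, False, False]
```

Tightening the tolerance from 0.6 to 0.45 drops the middle candidate: fewer false matches at the cost of more missed ones.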
UI
ui.py builds the main window with a status bar and progress bar, a central log console, and the menus (Quit, Settings, Manage Persons, Query Persons, Knowledgebase, Recognition, Help). The Settings dialog lets you browse for folders and adjust the thresholds and the ReID model. Long-running tasks run in worker objects (KBUpsertWorker, FolderWorker) that are moved onto their own QThread; they report progress and log messages back through Qt signals, which Qt delivers safely to the GUI thread.
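The Settings dialog in the listing never edits the live configuration: its `values()` method returns a modified copy built with `dataclasses.replace`, so pressing Cancel leaves everything untouched, and only an accepted dialog swaps in (and saves) the new object. A minimal sketch of that pattern, with a hypothetical two-field `Cfg` standing in for `AppConfig`:

```python
from dataclasses import dataclass, replace


@dataclass
class Cfg:  # hypothetical stand-in for AppConfig
    face_tol: float = 0.45
    reid_model: str = "osnet_ain_x1_0"


original = Cfg()
edited = replace(original, face_tol=0.5)  # what SettingsDialog.values() hands back
# Cancel: keep `original` (it was never mutated). OK: adopt `edited` and save it.
print(original.face_tol, edited.face_tol, edited.reid_model)  # 0.45 0.5 osnet_ain_x1_0
```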
# ui.py
from config import AppConfig
from kb import KBUpsertWorker
from recognize import FolderWorker

# Python utilities
from dataclasses import replace
from datetime import datetime
from pathlib import Path

# Python's built-in logging, routed into the GUI console below
import logging

# PyQt GUI for the application
from PyQt6 import QtCore, QtGui, QtWidgets
from PyQt6.QtCore import Qt, pyqtSignal


# ---- Logging bridge: route Python logging -> GUI ----
class GuiLogEmitter(QtCore.QObject):
    message = pyqtSignal(str)


class GuiLogHandler(logging.Handler):
    """Send logging records to the GUI via a Qt signal (thread-safe)."""

    def __init__(self, emitter: GuiLogEmitter):
        super().__init__()
        self.emitter = emitter

    def emit(self, record: logging.LogRecord) -> None:
        try:
            msg = self.format(record)
        except Exception:
            msg = record.getMessage()
        self.emitter.message.emit(msg)
class SettingsDialog(QtWidgets.QDialog):
    def __init__(self, cfg: AppConfig, parent=None):
        super().__init__(parent)
        self.setWindowTitle("Preferences")
        self.setModal(True)
        self.cfg = cfg
        # --- fields
        self.ed_dataset = QtWidgets.QLineEdit(cfg.dataset_dir)
        self.ed_processed = QtWidgets.QLineEdit(cfg.processed_dir)
        self.ed_process = QtWidgets.QLineEdit(cfg.process_dir)
        self.sp_face = QtWidgets.QDoubleSpinBox()
        self.sp_face.setRange(0.0, 2.0)
        self.sp_face.setSingleStep(0.01)
        self.sp_face.setValue(cfg.face_tol)
        self.sp_body = QtWidgets.QDoubleSpinBox()
        self.sp_body.setRange(0.0, 1.0)
        self.sp_body.setSingleStep(0.01)
        self.sp_body.setValue(cfg.body_tol)
        self.cb_model = QtWidgets.QComboBox()
        self.cb_model.addItems(["osnet_ain_x1_0", "osnet_x1_0"])
        i = self.cb_model.findText(cfg.reid_model)
        if i >= 0:
            self.cb_model.setCurrentIndex(i)

        # --- browse helpers
        def browse(line: QtWidgets.QLineEdit):
            d = QtWidgets.QFileDialog.getExistingDirectory(self, "Choose folder", line.text() or str(Path.cwd()))
            if d:
                line.setText(d)

        btn_browse_ds = QtWidgets.QPushButton("Browse…")
        btn_browse_ds.clicked.connect(lambda: browse(self.ed_dataset))
        btn_browse_out = QtWidgets.QPushButton("Browse…")
        btn_browse_out.clicked.connect(lambda: browse(self.ed_processed))
        btn_browse_in = QtWidgets.QPushButton("Browse…")
        btn_browse_in.clicked.connect(lambda: browse(self.ed_process))

        # --- form
        form = QtWidgets.QFormLayout()
        form.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
        form.setContentsMargins(12, 12, 12, 0)
        row_ds = QtWidgets.QHBoxLayout()
        row_ds.addWidget(self.ed_dataset)
        row_ds.addWidget(btn_browse_ds)
        row_out = QtWidgets.QHBoxLayout()
        row_out.addWidget(self.ed_processed)
        row_out.addWidget(btn_browse_out)
        row_in = QtWidgets.QHBoxLayout()
        row_in.addWidget(self.ed_process)
        row_in.addWidget(btn_browse_in)
        form.addRow("Persons dataset:", row_ds)
        form.addRow("Processed output:", row_out)
        form.addRow("Process input:", row_in)
        form.addRow("Face tolerance (≤):", self.sp_face)
        form.addRow("Body similarity (≥):", self.sp_body)
        form.addRow("ReID model:", self.cb_model)

        # --- button box (Qt6 enum)
        bb = QtWidgets.QDialogButtonBox(
            QtWidgets.QDialogButtonBox.StandardButton.Ok
            | QtWidgets.QDialogButtonBox.StandardButton.Cancel
        )
        bb.accepted.connect(self.accept)
        bb.rejected.connect(self.reject)

        # --- outer layout
        main = QtWidgets.QVBoxLayout(self)
        main.addLayout(form)
        main.addStretch(1)  # keep buttons at the bottom
        main.addWidget(bb, alignment=Qt.AlignmentFlag.AlignRight)

    def values(self) -> AppConfig:
        # return an updated copy; self.cfg itself stays untouched (e.g. when the dialog is cancelled)
        return replace(
            self.cfg,
            dataset_dir=self.ed_dataset.text().strip(),
            processed_dir=self.ed_processed.text().strip(),
            process_dir=self.ed_process.text().strip(),
            face_tol=float(self.sp_face.value()),
            body_tol=float(self.sp_body.value()),
            reid_model=self.cb_model.currentText().strip(),
        )
# ---- Main Window of the Application ----------------------------------------------------------------
class MainWindow(QtWidgets.QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("Managing Persons in Photo Collections")
        self.resize(1100, 700)
        self._make_central_console()
        self._make_statusbar()
        self._make_menubar()
        self._wire_logging()
        self.log("Application started. Ready.")
        # wire settings into the app
        self.cfg = AppConfig.load()
        # Restore last window geometry/state
        self._settings = QtCore.QSettings("YourOrg", "PeopleRecognitionGUI")
        if (geo := self._settings.value("main/geometry")):
            self.restoreGeometry(geo)
        if (state := self._settings.value("main/windowState")):
            self.restoreState(state)

    # --- UI components
    def _make_central_console(self):  # canvas for log output, messages etc.
        self.console = QtWidgets.QPlainTextEdit(readOnly=True)
        self.console.setWordWrapMode(QtGui.QTextOption.WrapMode.NoWrap)
        font = QtGui.QFont("Consolas", 10)
        font.setStyleHint(QtGui.QFont.StyleHint.Monospace)  # monospace fallback where Consolas is missing
        self.console.setFont(font)
        self.console.setPlaceholderText("Log output will appear here…")
        self.setCentralWidget(self.console)

    def _make_statusbar(self):  # status bar for visual progress feedback
        sb = QtWidgets.QStatusBar()
        self.setStatusBar(sb)
        self.progress = QtWidgets.QProgressBar()
        self.progress.setRange(0, 100)
        self.progress.setValue(0)
        self.progress.setTextVisible(True)
        self.progress.setFixedWidth(260)
        sb.addPermanentWidget(self.progress)
        self.statusBar().showMessage("Ready")

    def _make_menubar(self):  # menus that drive the application functions
        mb = self.menuBar()
        # Quit
        self.menu_quit = mb.addMenu("&Quit")
        act_quit = QtGui.QAction("Quit", self)
        act_quit.triggered.connect(self.close)
        self.menu_quit.addAction(act_quit)
        # Settings
        self.menu_settings = mb.addMenu("&Settings")
        act_prefs = QtGui.QAction("Preferences…", self)
        act_prefs.triggered.connect(self._on_prefs)
        self.menu_settings.addAction(act_prefs)
        # Manage Persons (placeholders for now)
        self.menu_manage_persons = mb.addMenu("&Manage Persons")
        self.menu_manage_persons.addAction(self._ph("Add Person"))
        self.menu_manage_persons.addAction(self._ph("Remove Person"))
        self.menu_manage_persons.addAction(self._ph("Rename / Merge…"))
        # Query Persons (placeholders for now)
        self.menu_query = mb.addMenu("&Query Persons")
        self.menu_query.addAction(self._ph("Search in Folder…"))
        self.menu_query.addAction(self._ph("Interactive Identify…"))
        # Manage Knowledgebase
        self.menu_kb = mb.addMenu("&Knowledgebase")
        act_kb_init = QtGui.QAction("Initialize KB…", self)
        act_kb_init.triggered.connect(self._kb_init)
        self.menu_kb.addAction(act_kb_init)
        act_kb_extend = QtGui.QAction("Extend KB from Folder…", self)
        act_kb_extend.triggered.connect(self._kb_extend)
        self.menu_kb.addAction(act_kb_extend)
        self.menu_kb.addSeparator()
        self.menu_kb.addAction(self._ph("Show Stats…"))
        self.menu_kb.addAction(self._ph("Export Log…"))
        # Recognition
        self.menu_recognition = mb.addMenu("&Recognition")
        act_recognize = QtGui.QAction("Recognize Folder…", self)
        act_recognize.triggered.connect(self._recognize_folder)
        demo = QtGui.QAction("Demo Progress (5s)", self)
        demo.triggered.connect(self._demo_progress)
        self.menu_recognition.addAction(act_recognize)
        self.menu_recognition.addSeparator()
        self.menu_recognition.addAction(demo)
        # Help / About
        self.menu_help = mb.addMenu("&Help")
        act_about = QtGui.QAction("About…", self, triggered=self._on_about)
        act_clear = QtGui.QAction("Clear Log", self, shortcut="Ctrl+L", triggered=self.console.clear)
        self.menu_help.addActions([act_about, act_clear])

    def _sep(self) -> QtGui.QAction:  # menu separator
        sep = QtGui.QAction(self)
        sep.setSeparator(True)
        return sep

    def _ph(self, text: str) -> QtGui.QAction:  # menu placeholder
        """Create a placeholder action that just logs for now."""
        act = QtGui.QAction(text, self)
        act.triggered.connect(lambda _, t=text: self.log(f"[TODO] {t}"))
        return act
    # --- Logging
    def _wire_logging(self):
        self._log_emitter = GuiLogEmitter()
        self._log_emitter.message.connect(self._append_log_line)
        self._log_handler = GuiLogHandler(self._log_emitter)
        self._log_handler.setFormatter(logging.Formatter("%(asctime)s — %(levelname)s — %(message)s"))
        logging.getLogger().addHandler(self._log_handler)
        logging.getLogger().setLevel(logging.INFO)

    def log(self, msg: str, level: int = logging.INFO):
        # Use Python logging so external modules can log into the GUI too.
        logging.getLogger().log(level, msg)

    @QtCore.pyqtSlot(str)  # slot receiving messages (also from workers) and showing them in the console
    def _append_log_line(self, text: str):
        # formatted records already start with their asctime ("20xx-..."); prefix a timestamp otherwise
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.console.appendPlainText(text if text.startswith("20") else f"{timestamp} — {text}")
        # Auto-scroll
        self.console.verticalScrollBar().setValue(self.console.verticalScrollBar().maximum())

    def _on_prefs(self):
        dlg = SettingsDialog(self.cfg, self)
        if dlg.exec() == QtWidgets.QDialog.DialogCode.Accepted:
            self.cfg = dlg.values()
            self.cfg.save()
            self.log(f"Preferences saved: dataset='{self.cfg.dataset_dir}', "
                     f"output='{self.cfg.processed_dir}', process='{self.cfg.process_dir}', "
                     f"FACE_TOL={self.cfg.face_tol:.2f}, BODY_SIM_TOL={self.cfg.body_tol:.2f}, "
                     f"model={self.cfg.reid_model}")

    def _on_about(self):
        QtWidgets.QMessageBox.information(
            self, "About",
            "People Recognition — Minimal GUI\n\n"
            "Part I/II foundation with face + body recognition.\n"
            "This app is the starting point for a full workflow."
        )

    def _demo_progress(self):  # simulate a short task to show the status/progress bar
        self.statusBar().showMessage("Running demo task…")
        self.progress.setValue(0)
        steps = 50
        self._demo_timer = QtCore.QTimer(self)
        self._demo_timer.setInterval(100)  # 0.1 s * 50 = ~5 s
        self._demo_i = 0

        def tick():
            self._demo_i += 1
            pct = int(self._demo_i * 100 / steps)
            self.progress.setValue(pct)
            if self._demo_i >= steps:
                self._demo_timer.stop()
                self.statusBar().showMessage("Ready")
                self.log("Demo task finished.")

        self._demo_timer.timeout.connect(tick)
        self._demo_timer.start()

    # --- Close/save state
    def closeEvent(self, event: QtGui.QCloseEvent) -> None:
        self._settings.setValue("main/geometry", self.saveGeometry())
        self._settings.setValue("main/windowState", self.saveState())
        # detach the GUI handler so late log calls do not reach a destroyed window
        logging.getLogger().removeHandler(self._log_handler)
        super().closeEvent(event)
    # manage the thread that runs batch recognition on a folder
    def _recognize_folder(self):
        folder = QtWidgets.QFileDialog.getExistingDirectory(self, "Choose folder to recognize", self.cfg.process_dir)
        if not folder:
            return
        kb_path = Path(self.cfg.dataset_dir) / self.cfg.encodings_filename
        # write results to the configured "Processed output" folder, else next to the input
        out_dir = Path(self.cfg.processed_dir) if self.cfg.processed_dir else (Path(folder) / "recognized_out")
        self.statusBar().showMessage("Recognizing…")
        self.progress.setValue(0)
        self.log(f"Starting recognition on: {folder}")
        self._thread = QtCore.QThread(self)
        self._worker = FolderWorker(kb_path=kb_path, input_folder=Path(folder), output_folder=out_dir,
                                    face_threshold=self.cfg.face_tol, body_threshold=self.cfg.body_tol,
                                    reid_model=self.cfg.reid_model, valid_exts=tuple(self.cfg.valid_exts))
        self._worker.moveToThread(self._thread)
        self._thread.started.connect(self._worker.run)
        self._worker.progress.connect(self.progress.setValue)
        self._worker.log.connect(self.log)
        # stop the thread's event loop when the job ends, then clean up both objects
        self._worker.finished.connect(self._thread.quit)
        self._worker.finished.connect(self._on_recognize_done)  # bound method: runs in the GUI thread
        self._thread.finished.connect(self._worker.deleteLater)
        self._thread.finished.connect(self._thread.deleteLater)
        self._thread.start()

    def _on_recognize_done(self, msg):
        self.statusBar().showMessage("Ready")
        self.log(msg)
        self.progress.setValue(0)
    # manage the thread that runs batch creation or extension of the known-persons knowledge base
    def _kb_init(self):
        # Pick the KB root (dataset folder containing person subfolders)
        start = self.cfg.dataset_dir or str(Path.cwd())
        folder = QtWidgets.QFileDialog.getExistingDirectory(self, "Select KB Root (dataset)", start)
        if not folder:
            return
        main_dir = Path(folder)
        self.cfg.dataset_dir = str(main_dir)
        self.cfg.save()
        self._start_kb_job(main_dir, source_dir=None, label="Init", status="Initializing KB…")

    def _kb_extend(self):
        # Ensure we know the KB root first
        if not self.cfg.dataset_dir:
            QtWidgets.QMessageBox.warning(self, "No KB", "Initialize the KB first (choose a KB root).")
            return
        main_dir = Path(self.cfg.dataset_dir)
        if not main_dir.exists():
            QtWidgets.QMessageBox.warning(self, "KB Missing", f"KB root not found:\n{main_dir}")
            return
        # Pick staging folder with person subfolders to add
        source = QtWidgets.QFileDialog.getExistingDirectory(self, "Select Folder to Extend From", str(Path.cwd()))
        if not source:
            return
        self._start_kb_job(main_dir, source_dir=Path(source), label="Extend", status="Extending KB…")

    def _start_kb_job(self, main_dir: Path, source_dir: "Path | None", label: str, status: str):
        # shared by Initialize (source_dir=None) and Extend (source_dir = staging folder)
        kb_path = main_dir / self.cfg.encodings_filename  # use setting, not hard-coded
        self.statusBar().showMessage(status)
        self.progress.setValue(0)
        self.log(f"KB {label}: main={main_dir} kb={kb_path}" + (f" from={source_dir}" if source_dir else ""))
        self._kb_label = label  # remembered for the completion message
        self._kbu_thread = QtCore.QThread(self)
        self._kbu_worker = KBUpsertWorker(main_dir=main_dir, kb_path=kb_path, reid_model=self.cfg.reid_model,
                                          source_dir=source_dir, copy_mode="copy", add_only_new_persons=True,
                                          valid_exts=tuple(self.cfg.valid_exts), kb_batch_size=int(self.cfg.kb_batch_size))
        self._kbu_worker.moveToThread(self._kbu_thread)
        self._kbu_thread.started.connect(self._kbu_worker.run)
        self._kbu_worker.progress.connect(self.progress.setValue)
        self._kbu_worker.log.connect(self.log)
        # stop the thread's event loop when the job ends, then clean up both objects
        self._kbu_worker.finished.connect(self._kbu_thread.quit)
        self._kbu_worker.finished.connect(self._on_kb_done)  # bound method: runs in the GUI thread
        self._kbu_thread.finished.connect(self._kbu_worker.deleteLater)
        self._kbu_thread.finished.connect(self._kbu_thread.deleteLater)
        self._kbu_thread.start()

    def _on_kb_done(self, stats: dict):
        self.statusBar().showMessage("Ready")
        if not stats:
            self.log("[KB] Job failed. See log for details.")
        else:
            self.log(
                f"[KB] {self._kb_label} done. persons_added={stats.get('persons_added', 0)}, "
                f"faces+={stats.get('faces_added', 0)}, bodies+={stats.get('bodies_added', 0)} | "
                f"totals: faces={stats.get('total_faces', 0)}, bodies={stats.get('total_bodies', 0)}, "
                f"names={stats.get('distinct_names', 0)}"
            )
        self.progress.setValue(0)
This code creates and shows our application: a menu bar to drive the operations, a progress bar for visual feedback and a central console for messages and log output. It is quite a lot of code, but if you read through it you will notice that it is rather easy to follow.
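The logging bridge at the top of ui.py is what lets GUI-free modules write into the console: every `logging` call ends up in a handler whose `emit()` forwards the formatted text. The sketch below keeps that shape but swaps the Qt signal for a plain callback (`CallbackHandler` is a hypothetical name), so it runs without PyQt6; in the GUI, the Qt signal additionally hands the text over to the GUI thread.

```python
import logging
from typing import Callable, List


class CallbackHandler(logging.Handler):
    """Forward formatted log records to a callable (stand-in for the Qt signal)."""

    def __init__(self, sink: Callable[[str], None]):
        super().__init__()
        self.sink = sink

    def emit(self, record: logging.LogRecord) -> None:
        try:
            msg = self.format(record)
        except Exception:
            msg = record.getMessage()
        self.sink(msg)


lines: List[str] = []
handler = CallbackHandler(lines.append)
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))

log = logging.getLogger("kb")  # a GUI-free module's logger
log.setLevel(logging.INFO)
log.addHandler(handler)
log.propagate = False           # keep the record away from the root logger's handlers
log.info("encoded 3 persons")
log.debug("not shown: below INFO")
print(lines)  # ['INFO - encoded 3 persons']
```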
Thread-safe Workers
You will notice that the main window offers three menu items for batch processes: ‘Initialize KB…’ and ‘Extend KB from Folder…’ in the Knowledgebase menu, and ‘Recognize Folder…’ in the Recognition menu. They trigger the methods _kb_init(), _kb_extend() and _recognize_folder(). Each of these starts a QThread next to the main (GUI) thread, moves a worker object onto it and connects the worker's progress, log and finished signals to the main window, so progress can be shown there while the GUI stays responsive. We implement the actual workers, the classes KBUpsertWorker and FolderWorker, in upcoming posts.
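Qt's worker-object pattern is easiest to see without Qt. The sketch below is a plain-Python analogy, not the PyQt code: a `threading.Thread` plays the QThread, a `queue.Queue` plays the queued signal connection, and the main thread drains the queue the way Qt's event loop delivers `progress`, `log` and `finished` to the main window. The names `worker` and `events` are hypothetical.

```python
import queue
import threading
from typing import List, Tuple


def worker(n_items: int, events: "queue.Queue[Tuple[str, object]]") -> None:
    """Runs in a background thread; never touches 'GUI' state directly."""
    for i in range(1, n_items + 1):
        # ... the heavy per-item work would happen here ...
        events.put(("log", f"processed item {i}"))
        events.put(("progress", int(i * 100 / n_items)))
    events.put(("finished", {"items": n_items}))


events: "queue.Queue[Tuple[str, object]]" = queue.Queue()
t = threading.Thread(target=worker, args=(4, events))
t.start()

# "Main thread": consume events in order, like slots connected to the worker's signals.
progress: List[int] = []
log_lines: List[str] = []
stats = None
while stats is None:
    kind, payload = events.get()  # blocks here; Qt's event loop keeps the GUI responsive instead
    if kind == "progress":
        progress.append(payload)
    elif kind == "log":
        log_lines.append(payload)
    elif kind == "finished":
        stats = payload
t.join()  # comparable to QThread.quit() + wait(): the worker's run() has returned
print(progress, stats)  # [25, 50, 75, 100] {'items': 4}
```

The key property carries over to Qt: only the main thread touches the UI state, and the worker communicates exclusively through a thread-safe channel.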
Both workers make intensive use of our encoding and recognition toolkit. To complete the picture of the application, both toolkit sources are shown below.
Reid_wrapper
# reid_wrapper.py
# trimmed-down version of torchreid_extractor.py
from __future__ import annotations

import contextlib
import importlib
import importlib.util
import io
from typing import Callable, Iterable, List, Optional

import numpy as np
import torch
from PIL import Image
from torchvision import transforms


def _locate_models():
    """Find the torchreid models module; return (module, note about a non-canonical layout or None)."""
    # Try the canonical layout, then the legacy one
    for name in ("torchreid.models", "torchreid.reid.models"):
        try:
            spec = importlib.util.find_spec(name)  # NB: imports the parent package as a side effect
        except ModuleNotFoundError:  # parent package (e.g. torchreid.reid) does not exist
            spec = None
        if spec is not None:
            note = None if name == "torchreid.models" else f"Using fallback module path: {name}"
            return importlib.import_module(name), note
    # Last resort: import the base package and walk its attributes
    pkg = importlib.import_module("torchreid")
    for attr_path in ("models", "reid.models"):
        obj = pkg
        for part in attr_path.split("."):
            obj = getattr(obj, part, None)
            if obj is None:
                break
        if obj is not None:
            note = None if attr_path == "models" else f"Using models via torchreid.{attr_path}"
            return obj, note
    # if it all fails
    raise ImportError("Could not import TorchReID models (tried torchreid.models and torchreid.reid.models).")


def _import_models(quiet: bool = True, log_fn: Optional[Callable[[str], None]] = None):
    """Return the torchreid models module, supporting canonical and legacy layouts."""
    if importlib.util.find_spec("torchreid") is None:
        raise ImportError("TorchReID not found. Install torchreid (>=1.4.0) or the GitHub repo.")
    if quiet:
        # capture the whole lookup, since anything torchreid prints happens while its package is imported
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
            models, note = _locate_models()
        msg = buf.getvalue().strip()
        if msg and log_fn:
            log_fn(f"[ReID] {msg.splitlines()[-1]}")
    else:
        models, note = _locate_models()
    if note and log_fn:
        log_fn(f"[ReID] {note}")
    return models
# Quiet, minimal feature extractor around TorchReID backbones:
#   __call__(PIL.Image)                       -> (D,)   float32, L2-normalized
#   extract_batch(List[PIL.Image], batch_size) -> (N, D) float32, L2-normalized
#   extract_paths(Iterable[str], batch_size)   -> (N, D) float32, L2-normalized
class TorchreidBodyExtractor:
    def __init__(self, model_name: str = "osnet_ain_x1_0", device: Optional[str] = None,
                 height: int = 256, width: int = 128, quiet: bool = True,
                 log_fn: Optional[Callable[[str], None]] = None):
        self.model_name = model_name
        self.height, self.width = int(height), int(width)
        self.quiet = bool(quiet)
        self._log = log_fn
        if device is None:
            device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)
        # Import models lazily and build the backbone
        models = _import_models(self.quiet, self._log)
        # Build the model directly via its factory (no repo-specific builders)
        Model = getattr(models, model_name, None)
        if Model is None:
            # some installs keep the factories only in __dict__
            Model = models.__dict__.get(model_name)
        if Model is None:
            avail = sorted(k for k in models.__dict__ if not k.startswith("_"))
            raise ValueError(f"Unknown model '{model_name}'. Available: {avail[:20]}...")
        # Quiet stdout/stderr while constructing (pretrained-weights download line, etc.)
        if self.quiet:
            buf = io.StringIO()
            with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
                self.model = Model(pretrained=True)  # downloads weights if needed
            msg = buf.getvalue().strip()
            if msg and self._log:
                self._log(f"[ReID] {msg.splitlines()[-1]}")
        else:
            self.model = Model(pretrained=True)
        self.model.eval().to(self.device)
        # Plain torchvision transforms; keep consistent with TorchReID's default normalization (ImageNet mean/std)
        self.transform = transforms.Compose([
            transforms.Resize((self.height, self.width), interpolation=transforms.InterpolationMode.BILINEAR),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
    # ---- public API ----
    def __call__(self, image: Image.Image) -> np.ndarray:
        t = self.transform(image.convert("RGB")).unsqueeze(0).to(self.device, non_blocking=True)
        with torch.inference_mode():
            f = self.model(t)  # shape [1, D]
            f = torch.nn.functional.normalize(f, dim=1)
        return f.squeeze(0).detach().cpu().numpy().astype("float32")

    def extract_batch(self, images: List[Image.Image], batch_size: int = 32, num_workers: int = 0) -> np.ndarray:
        # turn PIL images into tensors
        tensors = [self.transform(im.convert("RGB")) for im in images]
        if not tensors:
            return np.zeros((0, 0), dtype=np.float32)
        # simple mini-batching; num_workers is accepted for API compatibility but unused
        # (no DataLoader workers, to avoid PIL/thread issues on Windows)
        feats = []
        with torch.inference_mode():
            for i in range(0, len(tensors), batch_size):
                batch = torch.stack(tensors[i:i + batch_size], dim=0).to(self.device, non_blocking=True)
                f = self.model(batch)
                f = torch.nn.functional.normalize(f, dim=1)
                feats.append(f.detach().cpu())
        return torch.cat(feats, dim=0).numpy().astype("float32", copy=False)

    def extract_paths(self, paths: Iterable[str], batch_size: int = 32, num_workers: int = 0) -> np.ndarray:
        # NB: unreadable files are skipped, so rows align with the *readable* paths only
        imgs: List[Image.Image] = []
        for p in paths:
            try:
                with Image.open(p) as im:
                    imgs.append(im.convert("RGB").copy())  # detach from file handle
            except Exception as e:
                if self._log:
                    self._log(f"[ReID] skipped unreadable image {p}: {e}")
        return self.extract_batch(imgs, batch_size=batch_size, num_workers=num_workers)

    @staticmethod
    def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
        # a plain dot product equals cosine similarity only because features are L2-normalized
        return float(np.dot(a, b))

    @staticmethod
    def euclid_dist(a: np.ndarray, b: np.ndarray) -> float:
        return float(np.linalg.norm(a - b, ord=2))
This code is our wrapper around TorchReID, the PyTorch library of person re-identification models, which we use for full-body recognition.
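Because every feature returned by the extractor is L2-normalized, `cosine_sim` can be a bare dot product, and cosine similarity and Euclidean distance carry the same information: for unit vectors, ‖a − b‖² = 2 − 2·cos(a, b). The numpy sketch below checks that identity and applies a `body_tol`-style rule (similarity ≥ threshold, as in the Settings dialog). The random 512-dimensional vectors are illustrative stand-ins for body features, and `l2n` is a hypothetical helper.

```python
import numpy as np

rng = np.random.default_rng(0)


def l2n(v: np.ndarray) -> np.ndarray:
    """L2-normalize a vector, like torch.nn.functional.normalize(dim=1) does per row."""
    return v / np.linalg.norm(v)


a = l2n(rng.normal(size=512))
b = l2n(a + 0.3 * rng.normal(size=512) / np.sqrt(512))  # perturbed copy of a: "same person"
c = l2n(rng.normal(size=512))                              # unrelated vector: "other person"

cos_ab = float(np.dot(a, b))            # what cosine_sim computes
dist_ab = float(np.linalg.norm(a - b))  # what euclid_dist computes
assert np.isclose(dist_ab ** 2, 2 - 2 * cos_ab)  # identity for unit vectors

body_tol = 0.80  # AppConfig default: similarity must be >= body_tol
print(cos_ab >= body_tol, float(np.dot(a, c)) >= body_tol)  # True False
```

So a similarity threshold of 0.80 corresponds to a Euclidean distance threshold of √(2 − 1.6) ≈ 0.63 on the same features; either metric works as long as the threshold matches it.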
Face_encoder
# face_encoder.py
# Face encoder / aligner: robust dlib input, HOG→CNN fallback
#   • normalizes images into a dlib-friendly RGB buffer (fixes Windows stride/ownership quirks)
#   • does HOG→CNN fallbacks, upsample retries, alignment by eye landmarks
#   • filters blurry faces via Laplacian variance
#   • can directly compute a face embedding per aligned chip
from __future__ import annotations

import math
import warnings
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
from PIL import Image

# silence deprecation noise from face_recognition_models; the filter must be
# installed BEFORE face_recognition is imported, since the warning fires on import
warnings.filterwarnings(
    "ignore",
    category=UserWarning,
    message=r"pkg_resources is deprecated as an API.*",
    module=r"face_recognition_models(\.|$)",
)
import face_recognition  # noqa: E402


def _fr_ready_rgb(img: Any) -> np.ndarray:
    """Produce a *fresh*, C-contiguous, writeable uint8 RGB array (H, W, 3).

    Reloading via face_recognition.load_image_file sidesteps all the
    stride/ownership weirdness that can upset dlib on Windows.
    """
    if hasattr(img, "mode"):  # PIL.Image
        pil = img.convert("RGB")
    else:
        arr = np.asarray(img)
        if arr.ndim == 2:
            pil = Image.fromarray(arr, mode="L").convert("RGB")
        elif arr.ndim == 3 and arr.shape[2] == 3:
            # heuristic: flip to RGB when channel 0 clearly dominates channel 2
            # (can misfire on warm-toned images that are already RGB)
            if float(arr[..., 0].mean() or 0) > 1.1 * float(arr[..., 2].mean() or 1e-6):
                arr = cv2.cvtColor(arr, cv2.COLOR_BGR2RGB)
            pil = Image.fromarray(arr, mode="RGB")
        elif arr.ndim == 3 and arr.shape[2] == 4:
            try:
                arr = cv2.cvtColor(arr, cv2.COLOR_BGRA2RGB)
            except Exception:
                arr = arr[..., :3]
            pil = Image.fromarray(arr, mode="RGB")
        else:
            raise RuntimeError(f"Unsupported input for dlib: shape={arr.shape}")
    buf = BytesIO()
    pil.save(buf, format="PNG")  # lossless, fast
    buf.seek(0)
    arr = face_recognition.load_image_file(buf)  # -> uint8 RGB (H, W, 3), contiguous
    return np.require(arr, dtype=np.uint8, requirements=["C", "O", "W"])


def _lap_var(img_rgb: np.ndarray) -> float:
    g = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
    return float(cv2.Laplacian(g, cv2.CV_64F).var())


def _mean_luma(img_rgb: np.ndarray) -> float:
    g = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
    return float(np.mean(g))
def detect_and_align_faces(image: Any, model: str = "hog", upsample: int = 0, desired_size: int = 160,
                           min_box: int = 40, lap_var_thresh: Optional[float] = 80.0,
                           eye_pos: Tuple[float, float] = (0.5, 0.4), eye_dist_ratio: float = 0.35,
                           resize_max: Optional[int] = 800, adaptive_blur_factor: float = 0.5,
                           retry_if_empty: bool = True, compute_embedding: bool = False,
                           embedding_model: str = "small", num_jitters: int = 1) -> List[Dict[str, Any]]:
    # 1) Normalize to a dlib-friendly RGB buffer
    rgb_full = _fr_ready_rgb(image)
    H_full, W_full = rgb_full.shape[:2]
    # 2) Optional downscale for detection speed
    scale = 1.0
    max_side = max(H_full, W_full)
    if resize_max is not None and max_side > resize_max:
        scale = resize_max / float(max_side)
        new_w, new_h = int(W_full * scale), int(H_full * scale)
        rgb_det = cv2.resize(rgb_full, (new_w, new_h), interpolation=cv2.INTER_AREA)
        # cv2.resize returns a fresh array; only enforce the layout dlib wants
        # (re-running _fr_ready_rgb could trip its BGR heuristic on an image that is already RGB)
        rgb_det = np.require(rgb_det, dtype=np.uint8, requirements=["C", "O", "W"])
    else:
        rgb_det = rgb_full
    # 3) Detection with robust fallbacks
    try:
        locs = face_recognition.face_locations(rgb_det, number_of_times_to_upsample=upsample, model=model)
    except Exception:
        if model == "hog":
            # HOG also supports 8-bit gray; retry there
            gray = cv2.cvtColor(rgb_det, cv2.COLOR_RGB2GRAY)
            gray = np.require(gray, dtype=np.uint8, requirements=["C", "O", "W"])
            locs = face_recognition.face_locations(gray, number_of_times_to_upsample=upsample, model="hog")
        else:
            raise
    if not locs and model == "hog":
        # escalate to CNN on RGB
        try:
            locs = face_recognition.face_locations(rgb_det, number_of_times_to_upsample=max(upsample, 1), model="cnn")
        except Exception as e:
            raise RuntimeError(
                f"dlib CNN detector rejected image: dtype={rgb_det.dtype}, shape={rgb_det.shape}, "
                f"C={rgb_det.flags.c_contiguous}, strides={rgb_det.strides}"
            ) from e
    if not locs and retry_if_empty and upsample == 0:
        locs = face_recognition.face_locations(rgb_det, number_of_times_to_upsample=1, model=model)
    if not locs:
        return []
    # 4) Landmarks (always on RGB)
    all_landmarks = face_recognition.face_landmarks(rgb_det, face_locations=locs, model="large") or []
    # 5) Align chips on the full-resolution image
    results: List[Dict[str, Any]] = []
    Wt = Ht = int(desired_size)
    dest_eye_x = Wt * eye_pos[0]
    dest_eye_y = Ht * eye_pos[1]
    desired_dist = eye_dist_ratio * Wt
    global_lap_var = _lap_var(rgb_full)
    effective_blur_thresh: Optional[float] = (
        adaptive_blur_factor * global_lap_var if lap_var_thresh is None else float(lap_var_thresh)
    )
    for (top, right, bottom, left), lm in zip(locs, all_landmarks):
        # back-map bbox to original scale
        top_o = int(round(top / scale))
        right_o = int(round(right / scale))
        bottom_o = int(round(bottom / scale))
        left_o = int(round(left / scale))
        w_o, h_o = (right_o - left_o), (bottom_o - top_o)
        if w_o < min_box or h_o < min_box:
            continue
        if not lm or "left_eye" not in lm or "right_eye" not in lm:
            continue
        # eye centers in detection scale
        left_eye = np.mean(np.array(lm["left_eye"]), axis=0)
        right_eye = np.mean(np.array(lm["right_eye"]), axis=0)
        dY = right_eye[1] - left_eye[1]
        dX = right_eye[0] - left_eye[0]
        angle = math.degrees(math.atan2(dY, dX))
        dist = math.hypot(dX, dY)
        if dist < 1e-6:
            continue
        scale_aff = desired_dist / dist
        eyes_center = (float(left_eye[0] + right_eye[0]) * 0.5, float(left_eye[1] + right_eye[1]) * 0.5)
        M = cv2.getRotationMatrix2D(eyes_center, angle, scale_aff)
        M[0, 2] += (dest_eye_x - eyes_center[0])
        M[1, 2] += (dest_eye_y - eyes_center[1])
        # M maps detection-scale pixels to chip pixels. Full-res pixels are p_det / scale, so fold
        # the downscale into the 2x2 linear part: M_full @ [p_full, 1] == M @ [p_det, 1]
        M_full = M.copy()
        if scale != 1.0:
            M_full[:, :2] *= scale
        aligned = cv2.warpAffine(rgb_full, M_full, (Wt, Ht), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
        blur_var = _lap_var(aligned)
        if effective_blur_thresh is not None and blur_var < effective_blur_thresh:
            continue
        out: Dict[str, Any] = {
            "aligned": aligned,
            "bbox": (top_o, right_o, bottom_o, left_o),
            "landmarks": {k: [(int(round(px / scale)), int(round(py / scale))) for (px, py) in v] for k, v in lm.items()},
            "transform": M_full.astype("float32"),
            "scale": float(scale),
            "blur_var": float(blur_var),
            "mean_luma": _mean_luma(aligned),
        }
        if compute_embedding:
            # the whole chip is the face; bbox order is (top, right, bottom, left) = (0, Wt, Ht, 0)
            enc = face_recognition.face_encodings(aligned, known_face_locations=[(0, Wt, Ht, 0)],
                                                  num_jitters=num_jitters, model=embedding_model)
            if enc:
                out["embedding"] = enc[0].astype("float32")
        results.append(out)
    # One more pass if everything got filtered
    if not results and retry_if_empty and upsample == 0:
        return detect_and_align_faces(
            image=image, model=model, upsample=1, desired_size=desired_size, min_box=min_box,
            lap_var_thresh=lap_var_thresh, eye_pos=eye_pos, eye_dist_ratio=eye_dist_ratio,
            resize_max=resize_max, adaptive_blur_factor=adaptive_blur_factor,
            retry_if_empty=False, compute_embedding=compute_embedding,
            embedding_model=embedding_model, num_jitters=num_jitters,
        )
    return results
This code contains the enhanced face encoder presented in Part 2 of this series, the other half of our encoding and recognition toolkit.
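Three numeric steps of the face encoder are easy to check in isolation. The numpy-only sketch below (no OpenCV or dlib) reimplements (1) the blur score as the variance of a 4-neighbour Laplacian, the kernel cv2.Laplacian uses with its default ksize=1 (border handling differs, so edge values will not match exactly); (2) the eye-based similarity transform, built with the documented cv2.getRotationMatrix2D formula; and (3) how a matrix estimated on a downscaled detection image carries over to full resolution: multiply its 2×2 linear part by `scale` and keep the translation. All function names and coordinates are hypothetical.

```python
import math

import numpy as np


def lap_var(gray: np.ndarray) -> float:
    """Variance of the 4-neighbour Laplacian over interior pixels (low = blurry)."""
    g = gray.astype(np.float64)
    lap = g[:-2, 1:-1] + g[2:, 1:-1] + g[1:-1, :-2] + g[1:-1, 2:] - 4.0 * g[1:-1, 1:-1]
    return float(lap.var())


def eye_align_matrix(left_eye, right_eye, desired_size=160, eye_pos=(0.5, 0.4), eye_dist_ratio=0.35):
    """2x3 affine that levels the eyes, sets their distance and centres them at eye_pos."""
    dX, dY = right_eye[0] - left_eye[0], right_eye[1] - left_eye[1]
    theta = math.atan2(dY, dX)
    s = eye_dist_ratio * desired_size / math.hypot(dX, dY)
    cx, cy = (left_eye[0] + right_eye[0]) / 2, (left_eye[1] + right_eye[1]) / 2
    a, b = s * math.cos(theta), s * math.sin(theta)
    # same layout as cv2.getRotationMatrix2D((cx, cy), degrees(theta), s)
    M = np.array([[a, b, (1 - a) * cx - b * cy],
                  [-b, a, b * cx + (1 - a) * cy]])
    M[0, 2] += desired_size * eye_pos[0] - cx  # move the eye midpoint to its target spot
    M[1, 2] += desired_size * eye_pos[1] - cy
    return M


def apply(M: np.ndarray, p) -> np.ndarray:
    return M @ np.array([p[0], p[1], 1.0])


# (1) a flat patch has no edges; a checkerboard is all edges
flat = np.full((32, 32), 128, dtype=np.uint8)
checker = (np.indices((32, 32)).sum(axis=0) % 2 * 255).astype(np.uint8)
print(lap_var(flat), lap_var(checker) > 1000)  # 0.0 True

# (2) eyes end up level, 0.35 * 160 = 56 px apart, centred at (80, 64)
M = eye_align_matrix((40.0, 50.0), (80.0, 70.0))
le, re = apply(M, (40.0, 50.0)), apply(M, (80.0, 70.0))

# (3) detection ran on a half-size copy: fold the scale into the linear part
scale = 0.5
M_full = M.copy()
M_full[:, :2] *= scale
p_full, p_det = (160.0, 140.0), (80.0, 70.0)  # the same point at both resolutions
```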
What comes next
In the next posts we will add the actual worker for creating the knowledge base; it can also extend the knowledge base by adding several person folders at once. Once we have a knowledge base of named, known persons, we can use it to identify these persons in a folder full of photos. Batch recognition will be the subject of another post.