Managing Persons in Photo Collections – Building the Knowledge base

A knowledge base of known persons

Note: In this article, real people’s faces and names in illustrations have been intentionally obscured to protect privacy.

This post is part of our series on encoding and recognizing persons in photo collections. In the previous post, we built the basis of the application GUI. The next step is batch processing for encoding and recognition. Before we can recognize anyone, we need encodings, so we start by building a knowledge base (KB). In our setup, the KB is the combination of a dataset folder (one subfolder per known person, each containing photos) and an encodings.pkl file stored at the dataset root. In settings.json this root is called dataset_dir; it is updated when you initialize the KB.

Thus, there’s one main dataset (images + encodings) per collection. We’ll manage this KB with functions to add, remove, and rename persons. A special case, similar to the initial creation, is batch-adding new persons from a separate, temporary folder: a root directory with one subfolder per person, named after that person and containing roughly 15–20 images.
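To make the expected folder layout concrete, here is a minimal sketch that scans a root directory and counts the images in each person subfolder. The function name count_images_per_person is hypothetical and not part of kb.py; the extension list is assumed to match the default valid_exts used in the listing further down.

```python
from pathlib import Path
from typing import Dict, Iterable

# Assumption: same default extensions as valid_exts in kb.py.
VALID_EXTS = (".jpg", ".jpeg", ".png", ".webp")


def count_images_per_person(
    root: Path, valid_exts: Iterable[str] = VALID_EXTS
) -> Dict[str, int]:
    """Map each person subfolder name to the number of image files it holds."""
    valid = tuple(e.lower() for e in valid_exts)
    counts: Dict[str, int] = {}
    for person_dir in sorted(Path(root).iterdir()):
        if not person_dir.is_dir():
            continue  # skips files at the root, such as encodings.pkl
        counts[person_dir.name] = sum(
            1
            for f in person_dir.iterdir()
            if f.is_file() and f.suffix.lower() in valid
        )
    return counts
```

Running this on a temporary folder before extending the KB is a quick way to spot empty or sparsely filled person folders.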

We use a single worker method, create_or_extend_kb(), for both the initial creation and later extensions. For clarity, the GUI exposes two separate menu items that call the worker with different parameters to signal the task.

Worker behavior

The worker behaves as follows. In init mode (creating the KB) we call it with source_dir=None, because the main directory itself is the source. The worker walks main_dir and encodes the person folders it finds. If encodings.pkl already exists, this acts as a refresh and, by default, skips persons that are already encoded. In extend mode (batch-extending the KB) we call it with source_dir=Path(…). The worker discovers person folders in source_dir, copies or moves them into main_dir (skipping names that already exist unless you set add_only_new_persons=False), and then encodes only the newly added folders.
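The two modes can be illustrated with a small dry-run sketch that only reports which person folders would be encoded or skipped, without copying or encoding anything. The name plan_kb_update and the returned dictionary are assumptions made for illustration. It is a simplification of the real worker: it does not look at encodings.pkl, so the refresh case is not modeled.

```python
from pathlib import Path
from typing import Any, Dict, Optional, Set


def _person_names(root: Path) -> Set[str]:
    """Names of all immediate subfolders (one per person)."""
    return {p.name for p in root.iterdir() if p.is_dir()}


def plan_kb_update(
    main_dir: Path,
    source_dir: Optional[Path] = None,
    add_only_new_persons: bool = True,
) -> Dict[str, Any]:
    """Dry run: decide which person folders would be encoded or skipped."""
    main_dir = Path(main_dir)
    existing = _person_names(main_dir)
    if source_dir is None:
        # Init mode: main_dir itself is the source, so encode everything in it.
        return {"mode": "init", "encode": sorted(existing), "skip": []}
    source_dir = Path(source_dir)
    if source_dir.resolve() == main_dir.resolve():
        raise ValueError("source_dir must differ from main_dir for extend mode.")
    # Extend mode: only folders coming from source_dir are candidates.
    candidates = _person_names(source_dir)
    skip = (candidates & existing) if add_only_new_persons else set()
    return {
        "mode": "extend",
        "encode": sorted(candidates - skip),
        "skip": sorted(skip),
    }
```

With a main_dir containing Alice and Bob and a source_dir containing Bob and Carol, the plan for extend mode lists Carol for encoding and Bob as skipped.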

Helpers

The source file kb.py also contains several small helper functions.

_list_person_dirs(root) returns the set of person names and the sorted list of person subfolders.

_copy_new_persons(source_dir, main_dir, existing_names, copy_mode, log_fn) copies or moves the folders and returns the list of newly added Paths.

When kb_path is not provided, it defaults to main_dir / “encodings.pkl”.
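As the listing below shows, encodings.pkl stores a dictionary with four parallel lists: face_encodings, face_names, body_encodings, and body_names. The following sketch summarizes such a dictionary per person and checks that names and encodings stay aligned. The function names summarize_kb and kb_is_aligned are hypothetical helpers, not part of kb.py.

```python
from collections import Counter
from typing import Any, Dict


def summarize_kb(kb: Dict[str, Any]) -> Dict[str, Dict[str, int]]:
    """Count face and body encodings per person name."""
    faces = Counter(kb.get("face_names", []))
    bodies = Counter(kb.get("body_names", []))
    return {
        name: {"faces": faces.get(name, 0), "bodies": bodies.get(name, 0)}
        for name in sorted(set(faces) | set(bodies))
    }


def kb_is_aligned(kb: Dict[str, Any]) -> bool:
    """True if every encoding list has exactly one name per encoding."""
    return (
        len(kb.get("face_encodings", [])) == len(kb.get("face_names", []))
        and len(kb.get("body_encodings", [])) == len(kb.get("body_names", []))
    )
```

A person with very few face encodings compared to the number of photos in the folder may indicate images where no face was detected.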

Wiring it into the GUI

The worker class KBUpsertWorker connects the worker function to the main window of the GUI application.

We use two menu items.

  • Knowledgebase, Initialize KB… opens a folder picker to locate the main_dir that becomes the KB root. We then call KBUpsertWorker with main_dir set to the chosen folder and source_dir=None.
  • Knowledgebase, Extend KB from Folder… opens a folder picker to locate the source_dir, the temporary structure of <person_name>/ image folders to be added to the main_dir. We then call KBUpsertWorker with source_dir set to the chosen folder.

The central console shows, for example:

  • “Copied folder ‘Alice’ into KB”;
  • “Skip existing person: Bob” (if a name already exists);
  • per-image lines like “body✓ face+N”;
  • a final total summary.
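The final summary can be built from the statistics dictionary that the worker emits through its finished signal. The sketch below is one possible formatter. The function name format_kb_summary and the exact wording of the line are assumptions; only the dictionary keys, and the empty dictionary emitted on errors, come from kb.py.

```python
from typing import Any, Dict


def format_kb_summary(stats: Dict[str, Any]) -> str:
    """Turn the stats dict emitted by the worker into one console line."""
    if not stats:
        # The worker emits an empty dict when the run failed.
        return "[KB] Finished with errors (no statistics available)."
    return (
        f"[KB] Done ({stats.get('mode', '?')}): "
        f"+{stats.get('persons_added', 0)} persons, "
        f"+{stats.get('faces_added', 0)} faces, "
        f"+{stats.get('bodies_added', 0)} bodies; "
        f"totals: {stats.get('total_faces', 0)} faces, "
        f"{stats.get('total_bodies', 0)} bodies, "
        f"{stats.get('distinct_names', 0)} names"
    )
```

In the GUI, a slot connected to the finished signal could pass its argument to this function and append the result to the console.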

Knowledgebase

Below is the content of the source file kb.py.

# kb.py
from __future__ import annotations
from typing import Dict, Any, Iterable, Optional, List, Tuple
from pathlib import Path
import shutil, pickle
import numpy as np
from PIL import Image, ImageFile
from PyQt6 import QtCore
from PyQt6.QtCore import pyqtSignal

from face_encoder import detect_and_align_faces

# ----------------- KB layout -----------------
# KB = <main_dir>/ (one subfolder per person)
#    + <main_dir>/encodings.pkl

EMPTY_KB: Dict[str, Any] = {
    "face_encodings": [], "face_names": [],
    "body_encodings": [], "body_names": [],
}

def load_kb_compat(kb_path: Path) -> Dict[str, Any]:
    if not kb_path.exists():
        return {k: [] for k in EMPTY_KB}
    # use a context manager so the file handle is closed after loading
    with open(kb_path, "rb") as f:
        db = pickle.load(f)
    if all(k in db for k in EMPTY_KB):
        return db
    # TODO: backwards-compat conversions when needed
    return {k: db.get(k, []) for k in EMPTY_KB}

def save_kb(kb_path: Path, persons: Dict[str, Any]) -> None:
    clean = {
        "face_encodings": [np.asarray(v, np.float32) for v in persons.get("face_encodings", [])],
        "face_names":     [str(n) for n in persons.get("face_names", [])],
        "body_encodings": [np.asarray(v, np.float32) for v in persons.get("body_encodings", [])],
        "body_names":     [str(n) for n in persons.get("body_names", [])],
    }
    # use a context manager so the data is flushed and the file closed
    with open(kb_path, "wb") as f:
        pickle.dump(clean, f)

def _list_person_dirs(root: Path) -> Tuple[set[str], List[Path]]:
    names, dirs = set(), []
    for p in sorted(root.iterdir()):
        if p.is_dir():
            names.add(p.name)
            dirs.append(p)
    return names, dirs

def _copy_new_persons(
    source_dir: Path, main_dir: Path, existing_names: set[str],
    copy_mode: str, log_fn=None
) -> List[Path]:
    added_dirs: List[Path] = []
    for p in sorted(source_dir.iterdir()):
        if not p.is_dir():
            continue
        name = p.name
        dst = main_dir / name
        if name in existing_names and dst.exists():
            if log_fn:
                log_fn(f"[KB] Skip existing person: {name}")
            continue
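        if copy_mode == "move" and not dst.exists():
            shutil.move(str(p), str(dst))
            action = "moved"
        else:
            # dst can already exist when add_only_new_persons=False: merge into it
            shutil.copytree(str(p), str(dst), dirs_exist_ok=True)
            if copy_mode == "move":
                shutil.rmtree(p)  # complete the move after merging
                action = "moved"
            else:
                action = "copied"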
        if log_fn:
            log_fn(f"[KB] {action.capitalize()} folder '{name}' into KB")
        added_dirs.append(dst)
    return added_dirs

def _collect_targets(person_dirs: List[Path], valid_exts: Iterable[str]) -> List[tuple[str, Path]]:
    valid = tuple(e.lower() for e in valid_exts)
    targets: List[tuple[str, Path]] = []
    for person_dir in person_dirs:
        name = person_dir.name
        for q in person_dir.iterdir():
            if q.suffix.lower() in valid:
                targets.append((name, q))
    return targets

# If source_dir is None: (create/refresh) encode from main_dir.
# If source_dir is given: copy/move *new* person folders from source_dir -> main_dir, then encode *only those*.
def create_or_extend_kb(
    main_dir: Path,
    kb_path: Optional[Path] = None,
    reid_model: str = "osnet_ain_x1_0",
    source_dir: Optional[Path] = None,
    copy_mode: str = "copy",
    add_only_new_persons: bool = True,
    log_fn=None,
    progress_fn=None,
    valid_exts: Iterable[str] = (".jpg", ".jpeg", ".png", ".webp"),
    batch_size: int = 16,
) -> Dict[str, Any]:
    # lazy import to keep torchreid quiet
    from reid_wrapper import TorchreidBodyExtractor
    ImageFile.LOAD_TRUNCATED_IMAGES = True
    main_dir = Path(main_dir)
    if kb_path is None:
        kb_path = main_dir / "encodings.pkl"
    else:
        kb_path = Path(kb_path)
    if not main_dir.exists():
        raise FileNotFoundError(f"KB root (dataset) not found: {main_dir}")
    # ---- load current encodings (if any) ----
    is_new_kb = not kb_path.exists()
    persons = load_kb_compat(kb_path) if not is_new_kb else {k: [] for k in EMPTY_KB}
    # ---- decide which person folders to encode ----
    existing_names, existing_dirs = _list_person_dirs(main_dir)
    person_dirs_to_encode: List[Path]
    skipped_persons: List[str] = []

    if source_dir is None:
        # init/refresh mode: encode everything in main_dir, optionally skipping existing names based on persons db
        person_dirs_to_encode = existing_dirs
        if add_only_new_persons and (persons["face_names"] or persons["body_names"]):
            # when extending in place, skip names already encoded (lightweight heuristic)
            encoded_names = set(persons["face_names"]) | set(persons["body_names"])
            person_dirs_to_encode = [d for d in person_dirs_to_encode if d.name not in encoded_names]
            skipped_persons = sorted(list(existing_names & encoded_names))
            if log_fn and skipped_persons:
                log_fn(f"[KB] Skipping already-encoded: {', '.join(skipped_persons)}")
    else:
        source_dir = Path(source_dir)
        if source_dir.resolve() == main_dir.resolve():
            raise ValueError("source_dir must differ from main_dir for extend mode.")
        # copy new person folders into main_dir (respect add_only_new_persons)
        names_before = set(existing_names)
        added_dirs = _copy_new_persons(
            source_dir, main_dir, names_before if add_only_new_persons else set(),
            copy_mode=copy_mode, log_fn=log_fn
        )
        person_dirs_to_encode = added_dirs  # encode only what we just added
        if not person_dirs_to_encode and log_fn:
            log_fn("[KB] No new person folders to add from source.")

    # ---- collect image targets ----
    targets = _collect_targets(person_dirs_to_encode, valid_exts)
    total = len(targets)
    if total == 0:
        save_kb(kb_path, persons)
        return {
            "mode": ("create" if is_new_kb else ("extend" if source_dir else "refresh")),
            "kb_path": str(kb_path),
            "persons_added": 0, "faces_added": 0, "bodies_added": 0,
            "skipped_persons": skipped_persons,
            "total_faces": len(persons["face_encodings"]),
            "total_bodies": len(persons["body_encodings"]),
            "distinct_names": len(set(persons["face_names"]) | set(persons["body_names"])),
        }

    # ---- build body extractor once ----
    reid = TorchreidBodyExtractor(model_name=reid_model, device=None, log_fn=log_fn)

    faces_added = bodies_added = 0
    processed_names: set[str] = set()

    def emit_log(i, name, path, note):
        if log_fn:
            log_fn(f"[KB] {i}/{total} {name}/{path.name} {note}")

    def emit_progress(i):
        if progress_fn:
            progress_fn(int(i * 100 / total))

    i_global = 0
    for start in range(0, total, batch_size):
        chunk = targets[start:start + batch_size]   # [(name, Path)]
        # ---- body (batch) with safe file closing ----
        pil_imgs: List[Optional[Image.Image]] = []
        valid_mask: List[bool] = []
        for _, path in chunk:
            try:
                with Image.open(path) as im:
                    pil_imgs.append(im.convert("RGB").copy())
                valid_mask.append(True)
            except Exception:
                pil_imgs.append(None)
                valid_mask.append(False)
        E = None
        if any(valid_mask):
            try:
                E = reid.extract_batch([img for img in pil_imgs if img is not None], batch_size=batch_size)
            except Exception:
                E = None  # fall back per-image
        k = 0

        for (name, path), is_valid in zip(chunk, valid_mask):
            i_global += 1
            processed_names.add(name)
            # body
            try:
                if E is not None and is_valid:
                    e = E[k].astype("float32"); k += 1
                else:
                    with Image.open(path) as im:
                        e = reid(im.convert("RGB")).astype("float32")
                persons["body_encodings"].append(e)
                persons["body_names"].append(name)
                bodies_added += 1
                body_note = "body✓"
            except Exception as be:
                body_note = f"body×({be.__class__.__name__})"
            # faces
            face_cnt = 0
            try:
                with Image.open(path) as im:
                    chips = detect_and_align_faces(im, compute_embedding=True, embedding_model="small")
                for r in chips:
                    emb = r.get("embedding")
                    if emb is not None and emb.size:
                        persons["face_encodings"].append(np.asarray(emb, dtype=np.float32))
                        persons["face_names"].append(name)
                        face_cnt += 1
            except Exception as fe:
                body_note += f" face×({fe.__class__.__name__})"
            faces_added += face_cnt
            emit_log(i_global, name, path, f"— {body_note} face+{face_cnt}")
            emit_progress(i_global)
    save_kb(kb_path, persons)
    return {
        "mode": ("create" if is_new_kb else ("extend" if source_dir else "refresh")),
        "kb_path": str(kb_path),
        "persons_added": len(processed_names),
        "faces_added": faces_added,
        "bodies_added": bodies_added,
        "skipped_persons": skipped_persons,
        "total_faces": len(persons['face_encodings']),
        "total_bodies": len(persons['body_encodings']),
        "distinct_names": len(set(persons["face_names"]) | set(persons["body_names"])),
    }

# ----------------- Worker -----------------
class KBUpsertWorker(QtCore.QObject):
    progress = pyqtSignal(int)
    log = pyqtSignal(str)
    finished = pyqtSignal(dict)

    def __init__(
        self,
        main_dir: Path,
        kb_path: Path,
        reid_model: str,
        source_dir: Optional[Path] = None,
        copy_mode: str = "copy",
        add_only_new_persons: bool = True,
        valid_exts: Iterable[str] = (".jpg", ".jpeg", ".png", ".webp"),
        kb_batch_size: int = 16,
        parent=None,
    ):
        super().__init__(parent)
        self.main_dir = Path(main_dir)
        self.kb_path = Path(kb_path)
        self.reid_model = reid_model
        self.source_dir = Path(source_dir) if source_dir else None
        self.copy_mode = copy_mode
        self.add_only_new_persons = add_only_new_persons
        self.valid_exts = tuple(valid_exts)
        self.kb_batch_size = kb_batch_size

    @QtCore.pyqtSlot()
    def run(self):
        try:
            stats = create_or_extend_kb(
                main_dir=self.main_dir,
                kb_path=self.kb_path,
                reid_model=self.reid_model,
                source_dir=self.source_dir,
                copy_mode=self.copy_mode,
                add_only_new_persons=self.add_only_new_persons,
                log_fn=self.log.emit,
                progress_fn=self.progress.emit,
                valid_exts=self.valid_exts,
                batch_size=self.kb_batch_size,
            )
            self.finished.emit(stats)
        except Exception as e:
            self.log.emit(f"[KB] ERROR: {e}")
            self.finished.emit({})