Source code for link_checker.progress

"""Periodic stderr progress reporting during crawl."""

from __future__ import annotations

import sys
import threading
from collections.abc import Callable


[docs] class ProgressReporter: """Emits periodic progress updates to stderr. Updates are written approximately every *interval* seconds. Parameters: interval: Time in seconds between progress updates. output: Callable that accepts a string and writes it somewhere. Defaults to printing to :data:`sys.stderr`. """ def __init__( self, interval: float = 5.0, output: Callable[[str], None] | None = None, ) -> None: """Initialise the reporter. Parameters: interval: Seconds between automatic updates. output: Optional callable for writing progress lines. Defaults to printing to :data:`sys.stderr` with flushing. """ self._interval = interval self._output: Callable[[str], None] = ( output if output is not None else lambda line: print(line, file=sys.stderr, flush=True) ) self._checked = 0 self._queued = 0 self._active_threads = 0 self._elapsed = 0.0 self._stopped = False self._timer: threading.Timer | None = None self._lock = threading.Lock()
[docs] def update( self, *, checked: int, queued: int, active_threads: int, elapsed: float, ) -> None: """Update the current progress values. Parameters: checked: Number of URLs checked so far. queued: Number of URLs currently in queue. active_threads: Number of active worker threads. elapsed: Elapsed time in seconds. """ with self._lock: self._checked = checked self._queued = queued self._active_threads = active_threads self._elapsed = elapsed
def _emit(self) -> None: """Write a single progress line to stderr.""" with self._lock: checked = self._checked queued = self._queued threads = self._active_threads elapsed = self._elapsed self._emit_unlocked(checked=checked, queued=queued, threads=threads, elapsed=elapsed) def _emit_unlocked(self, *, checked: int, queued: int, threads: int, elapsed: float) -> None: """Write a progress line using already-captured values (no locking).""" minutes = int(elapsed // 60) seconds = int(elapsed % 60) estimate = checked + queued urls_per_sec = checked / elapsed if elapsed > 0 else 0.0 line = ( f'[Progress] {checked}/~{estimate} URLs checked' f' | {urls_per_sec:.1f} URLs/s' f' | {queued} in queue' f' | {threads} threads active' f' | {minutes}m {seconds}s elapsed' ) self._output(line) def _schedule(self) -> None: """Emit one progress line and schedule the next emission. Called by the periodic :class:`~threading.Timer`. Re-arms the timer unless :meth:`stop` has been called. All state access is protected by :attr:`_lock` to avoid races with :meth:`stop`. """ with self._lock: if self._stopped: return checked = self._checked queued = self._queued threads = self._active_threads elapsed = self._elapsed self._timer = threading.Timer(self._interval, self._schedule) self._timer.daemon = True self._timer.start() self._emit_unlocked(checked=checked, queued=queued, threads=threads, elapsed=elapsed)
[docs] def start(self) -> None: """Start emitting periodic progress updates.""" with self._lock: self._stopped = False self._timer = threading.Timer(self._interval, self._schedule) self._timer.daemon = True self._timer.start()
[docs] def stop(self) -> None: """Stop emitting progress updates.""" with self._lock: self._stopped = True if self._timer is not None: self._timer.cancel() self._timer = None