Skip to content

API reference

This page is generated from the source docstrings with mkdocstrings. It documents the most useful public entry points; browse the source for the full surface.

Configuration

pgcarter.config

Runtime configuration for pgcarter.

Config dataclass

Resolved configuration derived from CLI arguments and defaults.

Source code in pgcarter/config.py
@dataclass
class Config:
    """Resolved configuration derived from CLI arguments and defaults."""

    host: str
    port: int
    database: str
    user: str
    password: str | None
    output_dir: Path
    templates_dir: Path
    schemas: list[str] = field(default_factory=lambda: ["public"])
    log_level: str = "INFO"

    @property
    def conninfo(self) -> str:
        """Build a libpq connection string (psycopg-compatible)."""
        parts = [
            f"host={self.host}",
            f"port={self.port}",
            f"dbname={self.database}",
            f"user={self.user}",
        ]
        if self.password:
            parts.append(f"password={self.password}")
        return " ".join(parts)

    @property
    def sql_dir(self) -> Path:
        return self.output_dir / "sql"

    @property
    def json_dir(self) -> Path:
        return self.output_dir / "json"

    @property
    def docs_dir(self) -> Path:
        return self.output_dir / "docs"

    @property
    def report_path(self) -> Path:
        return self.output_dir / "report.json"

conninfo property

conninfo: str

Build a libpq connection string (psycopg-compatible).

resolve_config

resolve_config(*, host: str, port: int, database: str, user: str, password: str | None, output_dir: str | None, templates_dir: str | None, schemas: list[str] | None = None, log_level: str = 'INFO') -> Config

Apply documented defaults and return a Config.

Per spec
  • output_dir defaults to the database name.
  • templates_dir defaults to ./templates.
  • password may also be supplied via the PGPASSWORD environment variable.
Source code in pgcarter/config.py
def resolve_config(
    *,
    host: str,
    port: int,
    database: str,
    user: str,
    password: str | None,
    output_dir: str | None,
    templates_dir: str | None,
    schemas: list[str] | None = None,
    log_level: str = "INFO",
) -> Config:
    """Fill in the documented defaults and build a :class:`Config`.

    Defaults applied:
      * a missing/empty ``output_dir`` becomes a path named after the database;
      * a missing/empty ``templates_dir`` becomes ``./templates``;
      * a ``None`` password is looked up in the ``PGPASSWORD`` environment
        variable (an explicit empty string is kept as given);
      * a missing/empty ``schemas`` becomes ``["public"]``.
    """
    if password is None:
        effective_password = os.environ.get("PGPASSWORD")
    else:
        effective_password = password

    return Config(
        host=host,
        port=port,
        database=database,
        user=user,
        password=effective_password,
        output_dir=Path(output_dir or database),
        templates_dir=Path(templates_dir or "./templates"),
        schemas=schemas if schemas else ["public"],
        log_level=log_level,
    )

Metadata models

The dataclasses below are the single source of truth shared by the SQL generation, JSON, and documentation layers.

pgcarter.models

Metadata models for extracted PostgreSQL assets.

These dataclasses are the single source of truth that both the SQL generation layer and the documentation (Jinja2) rendering layer consume. They are plain dataclasses (no behaviour beyond serialisation) so they are trivially serialisable to JSON and passed verbatim into templates.

Grant dataclass

Bases: _Serialisable

A single privilege grant on an object.

Source code in pgcarter/models/__init__.py
@dataclass
class Grant(_Serialisable):
    """A single privilege grant on an object.

    Plain data record: only ``object_type``, ``object_name``, ``grantee`` and
    ``privilege`` are required; ``grantable`` and ``column`` have defaults.
    """

    # Kind of object the privilege applies to; one of:
    # database | schema | table | column | sequence | function
    object_type: str
    # Name of the object the privilege applies to.
    object_name: str
    # Who holds the privilege.
    grantee: str
    # The privilege itself.
    privilege: str
    # NOTE(review): presumably "granted WITH GRANT OPTION" - confirm against the extractor.
    grantable: bool = False
    # NOTE(review): presumably only set for column-level grants - confirm against the extractor.
    column: str | None = None

Relationship dataclass

Bases: _Serialisable

An edge in the object dependency / relationship graph.

Source code in pgcarter/models/__init__.py
@dataclass
class Relationship(_Serialisable):
    """An edge in the object dependency / relationship graph.

    Plain data record connecting ``source`` to ``target``; ``label`` is
    optional and defaults to ``None``.
    """

    # Endpoints of the edge.
    # NOTE(review): presumably object names - the exact naming scheme is not visible here.
    source: str
    target: str
    # Kind of edge; one of:
    # foreign_key | view_dependency | function_dependency | trigger | sequence
    type: str
    # Optional free-text label for the edge.
    label: str | None = None

Inventory dataclass

Bases: _Serialisable

Aggregate of everything extracted from a single database.

Source code in pgcarter/models/__init__.py
@dataclass
class Inventory(_Serialisable):
    """Aggregate of everything extracted from a single database.

    Only ``database`` is required. Every collection below defaults to its own
    fresh empty list, so an inventory can be built incrementally.
    """

    # Description of the database this inventory belongs to.
    database: DatabaseInfo
    # One list per extracted asset kind.
    schemas: list[Schema] = field(default_factory=list)
    tables: list[Table] = field(default_factory=list)
    indexes: list[Index] = field(default_factory=list)
    views: list[View] = field(default_factory=list)
    functions: list[Function] = field(default_factory=list)
    triggers: list[Trigger] = field(default_factory=list)
    sequences: list[Sequence] = field(default_factory=list)
    extensions: list[Extension] = field(default_factory=list)
    roles: list[Role] = field(default_factory=list)
    # Privilege grants and dependency/relationship edges.
    grants: list[Grant] = field(default_factory=list)
    relationships: list[Relationship] = field(default_factory=list)

Run report

pgcarter.report

Run report collection: extracted/skipped objects, warnings, and errors.

Report dataclass

Accumulates the outcome of an extraction + generation run.

Source code in pgcarter/report.py
@dataclass
class Report:
    """Accumulates the outcome of an extraction + generation run."""

    database: str = ""
    started_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
    finished_at: str | None = None
    extracted: dict[str, int] = field(default_factory=lambda: defaultdict(int))
    skipped: list[SkippedObject] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    generated_files: list[str] = field(default_factory=list)

    def record_extracted(self, object_type: str, count: int = 1) -> None:
        self.extracted[object_type] += count

    def record_skipped(self, object_type: str, object_name: str, reason: str) -> None:
        self.skipped.append(SkippedObject(object_type, object_name, reason))

    def record_warning(self, message: str) -> None:
        self.warnings.append(message)

    def record_error(self, message: str) -> None:
        self.errors.append(message)

    def record_file(self, path: Path | str) -> None:
        self.generated_files.append(str(path))

    def finish(self) -> None:
        self.finished_at = datetime.now(UTC).isoformat()

    def to_dict(self) -> dict[str, Any]:
        return {
            "database": self.database,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
            "summary": {
                "extracted": dict(self.extracted),
                "skipped_count": len(self.skipped),
                "warning_count": len(self.warnings),
                "error_count": len(self.errors),
                "generated_file_count": len(self.generated_files),
            },
            "extracted": dict(self.extracted),
            "skipped": [vars(s) for s in self.skipped],
            "warnings": self.warnings,
            "errors": self.errors,
            "generated_files": sorted(self.generated_files),
        }

    def write(self, path: Path) -> None:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(self.to_dict(), indent=2, sort_keys=False))

Logging

pgcarter.logging_config

Structured logging configuration for pgcarter, built on structlog.

Production default: line-delimited JSON to stdout, ready for ingestion by log aggregators (Datadog, ELK/OpenSearch, CloudWatch, Loki, …). Enable colourised developer console output with pretty_logs=True or LOG_PRETTY=true.

Both stdlib logging.getLogger and structlog.get_logger are supported and render through the same pipeline, so existing %-style calls keep working while new code can emit structured key/value events. Context bound via structlog.contextvars.bind_contextvars (e.g. request_id) is automatically attached to every event.

configure_logging

configure_logging(pretty_logs: bool | None = None, level: str | None = None) -> None

Configure structlog and stdlib logging for the whole application.

Parameters:

Name Type Description Default
pretty_logs bool | None

True → colourised developer console; False → JSON. None (default) falls back to the LOG_PRETTY environment variable, then to JSON.

None
level str | None

Logging level name ("DEBUG"/"INFO"/…). None falls back to the LOG_LEVEL environment variable, then to "INFO".

None

Logs are written to stdout. Calling this more than once reconfigures cleanly (the root handler is replaced).

Source code in pgcarter/logging_config.py
def configure_logging(
    pretty_logs: bool | None = None,
    level: str | None = None,
) -> None:
    """Configure structlog and stdlib logging for the whole application.

    Args:
        pretty_logs: ``True`` → colourised developer console; ``False`` → JSON.
            ``None`` (default) falls back to the ``LOG_PRETTY`` environment
            variable, then to JSON.
        level: Logging level name (``"DEBUG"``/``"INFO"``/…). ``None`` falls back
            to the ``LOG_LEVEL`` environment variable, then to ``"INFO"``.
            An unrecognised name also falls back to ``INFO``.

    Logs are written to **stdout**. Calling this more than once reconfigures
    cleanly (the root handler is replaced).
    """
    if pretty_logs is None:
        pretty_logs = _env_bool("LOG_PRETTY", False)
    if level is None:
        level = os.environ.get("LOG_LEVEL", "INFO")

    # Look the name up on the logging module, but accept only real numeric
    # level constants. Some upper-case attributes are not levels at all (e.g.
    # ``BASIC_FORMAT`` is a format string) and would make setLevel() raise.
    candidate = getattr(logging, str(level).upper(), None)
    numeric_level = candidate if isinstance(candidate, int) else logging.INFO

    shared = _shared_processors()

    if pretty_logs:
        # ConsoleRenderer renders exc_info itself (pretty, coloured tracebacks).
        final_processors: list[Any] = [
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            structlog.dev.ConsoleRenderer(),
        ]
    else:
        # JSON mode: serialise exceptions into a string ``exception`` field.
        final_processors = [
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ]

    # structlog-originated records run the shared chain then hand off to the
    # stdlib ProcessorFormatter for final rendering.
    structlog.configure(
        processors=[*shared, structlog.stdlib.ProcessorFormatter.wrap_for_formatter],
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # The single handler renders BOTH structlog and plain-stdlib records.
    formatter = structlog.stdlib.ProcessorFormatter(
        foreign_pre_chain=shared,
        processors=final_processors,
    )
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)

    # Replace (rather than add to) the root handlers so repeated calls do not
    # produce duplicate output.
    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(handler)
    root.setLevel(numeric_level)

get_logger

get_logger(name: str | None = None) -> structlog.stdlib.BoundLogger

Return a structlog logger (stdlib-compatible).

Accepts %-style positional args and .exception() like the stdlib logger it replaces, while also accepting structured keyword fields.

Source code in pgcarter/logging_config.py
def get_logger(name: str | None = None) -> structlog.stdlib.BoundLogger:
    """Hand back a structlog logger that can stand in for a stdlib logger.

    The returned logger takes ``%``-style positional args and supports
    ``.exception()`` just like the stdlib logger it replaces, and it also
    takes structured keyword fields. ``name`` is forwarded to
    ``structlog.get_logger`` unchanged.
    """
    bound_logger = structlog.get_logger(name)
    return bound_logger

Analysis engine

pgcarter.analyzer.config

Analysis configuration: enabled checks, thresholds, and sampling.

Loaded from a YAML file (--config analysis.yml) shaped as:

analysis:
  enabled_checks:
    - null_analysis
    - cardinality
    - table_size
  thresholds:
    high_null_percentage: 80
    low_cardinality_limit: 10

Every field has a documented default, so a partial (or absent) config is valid. When enabled_checks is omitted, all registered checks run.

Thresholds dataclass

Numeric limits that turn a measurement into a warning.

Source code in pgcarter/analyzer/config.py
@dataclass
class Thresholds:
    """Cut-off values that decide when a measurement is reported as a warning."""

    #: Percentage of NULLs at or above which a column is flagged.
    high_null_percentage: float = 80.0
    #: Number of distinct values at or below which a column counts as "low cardinality".
    low_cardinality_limit: int = 10
    #: Estimated number of rows at or above which a table counts as "extremely large".
    large_table_rows: int = 10_000_000
    #: Distinct/total ratio at or above which a column looks like an identifier.
    unique_ratio: float = 0.99
    #: Average text length (chars) above which a text column is flagged "wide".
    long_text_length: int = 10_000

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Thresholds:
        """Build an instance from ``data``, silently ignoring unknown keys.

        Keys absent from ``data`` keep their declared defaults.
        """
        recognised: dict[str, Any] = {}
        for spec in fields(cls):
            if spec.name in data:
                recognised[spec.name] = data[spec.name]
        return cls(**recognised)

AnalysisConfig dataclass

Resolved analysis configuration.

Source code in pgcarter/analyzer/config.py
@dataclass
class AnalysisConfig:
    """The analysis settings after defaults have been applied."""

    #: Names of the checks to run; ``None`` places no restriction.
    enabled_checks: list[str] | None = None
    #: Numeric limits used by the checks.
    thresholds: Thresholds = field(default_factory=Thresholds)
    #: Row cap for expensive per-column scans; ``None`` means scan the table.
    sample_size: int | None = None

    def is_enabled(self, check_name: str) -> bool:
        """Tell whether ``check_name`` is allowed to run.

        With no ``enabled_checks`` list every check is enabled; otherwise only
        the listed names are.
        """
        if self.enabled_checks is None:
            return True
        return check_name in self.enabled_checks

    def to_dict(self) -> dict[str, Any]:
        """Return the configuration as a plain dict (thresholds expanded)."""
        thresholds_as_dict = asdict(self.thresholds)
        return {
            "enabled_checks": self.enabled_checks,
            "thresholds": thresholds_as_dict,
            "sample_size": self.sample_size,
        }

is_enabled

is_enabled(check_name: str) -> bool

Whether check_name should run under this configuration.

Source code in pgcarter/analyzer/config.py
def is_enabled(self, check_name: str) -> bool:
    """Tell whether ``check_name`` is allowed to run.

    With no ``enabled_checks`` list every check is enabled; otherwise only
    the listed names are.
    """
    if self.enabled_checks is None:
        return True
    return check_name in self.enabled_checks

load_analysis_config

load_analysis_config(path: str | Path | None, *, sample_size: int | None = None) -> AnalysisConfig

Load an AnalysisConfig from YAML, applying defaults.

A missing path yields the default configuration. A CLI sample_size overrides any value present in the file.

Source code in pgcarter/analyzer/config.py
def load_analysis_config(
    path: str | Path | None,
    *,
    sample_size: int | None = None,
) -> AnalysisConfig:
    """Load an :class:`AnalysisConfig` from YAML, applying defaults.

    A missing path yields the default configuration. A CLI ``sample_size``
    overrides any value present in the file. Settings are read from the
    top-level ``analysis`` key when present, otherwise from the document root.

    Args:
        path: Location of the YAML file, or ``None`` to use defaults only.
        sample_size: Row cap supplied on the command line; when not ``None``
            it takes precedence over the file's ``sample_size``.

    Returns:
        The resolved configuration.

    Raises:
        ValueError: If the document, its ``analysis`` section or its
            ``thresholds`` section is not a mapping, or if
            ``enabled_checks`` is not a list.
    """
    data: dict[str, Any] = {}
    if path is not None:
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # locale encoding, which can mis-read non-ASCII content.
        raw = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {}
        if not isinstance(raw, dict):
            raise ValueError(f"Config file '{path}' must be a YAML mapping")
        data = raw.get("analysis", raw) or {}
        if not isinstance(data, dict):
            raise ValueError(f"Config file '{path}' 'analysis' section must be a mapping")

    enabled = data.get("enabled_checks")
    if enabled is not None and not isinstance(enabled, list):
        raise ValueError("'enabled_checks' must be a list of check names")

    # Validate the shape here so a malformed section is reported as a
    # ValueError like the other config errors, not as an AttributeError
    # raised from inside Thresholds.from_dict.
    thresholds_data = data.get("thresholds") or {}
    if not isinstance(thresholds_data, dict):
        raise ValueError("'thresholds' must be a mapping of threshold names to values")
    thresholds = Thresholds.from_dict(thresholds_data)

    resolved_sample = sample_size if sample_size is not None else data.get("sample_size")

    return AnalysisConfig(
        enabled_checks=list(enabled) if enabled is not None else None,
        thresholds=thresholds,
        sample_size=resolved_sample,
    )

pgcarter.analyzer.engine

The analysis engine: run enabled checks and assemble the report.

The engine feeds each asset to the checks whose scope matches it (tables to table checks, (table, column) pairs to column checks, the whole inventory to database checks), then merges results into TableAnalysis / ColumnAnalysis objects. Every non-informational result also becomes a Warning. A failure in one check is captured and never aborts the run, mirroring the extractor's resilience model.

AnalysisEngine

Run all enabled checks against an inventory (optionally a live DB).

Source code in pgcarter/analyzer/engine.py
class AnalysisEngine:
    """Run all enabled checks against an inventory (optionally a live DB).

    Checks are obtained from ``instantiate_checks(config)`` and grouped by
    their ``scope`` (``"table"``, ``"column"`` or ``"database"``). Each check
    runs through :meth:`_safe_execute`, so a failing check is logged and
    recorded but never aborts the run.
    """

    def __init__(
        self,
        inventory: Inventory,
        config: AnalysisConfig,
        db: Database | None = None,
        report: Report | None = None,
    ) -> None:
        """Store the inputs and build the shared analysis context.

        Args:
            inventory: The extracted metadata to analyse.
            config: Analysis configuration (enabled checks, thresholds, sampling).
            db: Optional database handle, passed through to the context.
            report: Optional run report that receives errors and counters.
        """
        self.inventory = inventory
        self.config = config
        # One context object is shared by every check invocation.
        self.ctx = AnalysisContext(inventory=inventory, config=config, db=db, report=report)
        self.report = report

    # -- check execution with per-check resilience --------------------------
    def _safe_execute(self, check: Check, asset: Any) -> list[CheckResult]:
        """Run ``check`` on ``asset``, returning ``[]`` if it is skipped or fails.

        The check is skipped when ``check.applies`` returns a falsy value.
        Any exception (from ``applies`` or ``execute``) is logged with its
        traceback and, when a run report is attached, recorded as an error.
        """
        try:
            if not check.applies(asset, self.ctx):
                return []
            return check.execute(asset, self.ctx)
        except Exception as exc:  # noqa: BLE001 - resilience is the point
            log.exception("Check '%s' failed", check.name)
            if self.report is not None:
                self.report.record_error(f"check {check.name}: {exc}")
            return []

    def analyze(self) -> AnalysisReport:
        """Run every enabled check and assemble the :class:`AnalysisReport`.

        Table checks receive each table, column checks receive a
        ``(table, column)`` tuple, and database checks receive the whole
        inventory. Results whose severity is not ``INFO`` are additionally
        turned into warnings. When a run report is attached, the number of
        results and the number of warnings are recorded on it.
        """
        checks = instantiate_checks(self.config)
        # Partition once so each loop below only visits the relevant checks.
        table_checks = [c for c in checks if c.scope == "table"]
        column_checks = [c for c in checks if c.scope == "column"]
        database_checks = [c for c in checks if c.scope == "database"]
        log.info(
            "Running %d checks (%d table, %d column, %d database) over %d tables",
            len(checks),
            len(table_checks),
            len(column_checks),
            len(database_checks),
            len(self.inventory.tables),
        )

        # Per-table analyses keyed by qualified name; all_results keeps every
        # result in execution order for the warning list and the summary.
        analyses: dict[str, TableAnalysis] = {}
        all_results: list[CheckResult] = []

        total_tables = len(self.inventory.tables)
        for index, table in enumerate(self.inventory.tables, start=1):
            # The running query count is only logged when the context is online.
            queried = (
                f"; {len(self.ctx.generated_queries)} queries so far" if self.ctx.online else ""
            )
            log.info(
                "[%d/%d] analyzing %s (%d columns)%s",
                index,
                total_tables,
                table.qualified_name,
                len(table.columns),
                queried,
            )
            ta = TableAnalysis(schema=table.schema, name=table.name)
            for check in table_checks:
                for r in self._safe_execute(check, table):
                    ta.checks.append(r)
                    # Fold the result's details into the table-level metrics.
                    _merge(ta.metrics, r.details)
                    all_results.append(r)
            for column in table.columns:
                ca = ColumnAnalysis(
                    name=column.name,
                    data_type=column.data_type,
                    nullable=column.nullable,
                    semantics=detect_semantics(column),
                )
                for check in column_checks:
                    # Column checks get the owning table alongside the column.
                    for r in self._safe_execute(check, (table, column)):
                        ca.checks.append(r)
                        _merge_column(ca, r.details)
                        all_results.append(r)
                ta.columns.append(ca)
            analyses[ta.qualified_name] = ta

        # Database-scope checks (relationships, indexes, cross-table quality).
        for index, check in enumerate(database_checks, start=1):
            log.info("[db %d/%d] running %s", index, len(database_checks), check.name)
            for r in self._safe_execute(check, self.inventory):
                all_results.append(r)
                # Attach the result to its table only when it names an analysed one.
                if r.table and r.table in analyses:
                    analyses[r.table].checks.append(r)

        # Every non-INFO result is surfaced as a warning.
        warnings = [
            Warning(
                severity=r.severity,
                category=r.category,
                check=r.check,
                message=r.message,
                table=r.table,
                column=r.column,
                details=r.details,
            )
            for r in all_results
            if r.severity != INFO
        ]
        # Highest severity_rank first, then by table name, then by check name.
        # NOTE(review): assumes a larger severity_rank means more severe - confirm.
        warnings.sort(key=lambda w: (-severity_rank(w.severity), w.table or "", w.check))

        tables = sorted(analyses.values(), key=lambda t: t.qualified_name)
        report = AnalysisReport(
            database=self.inventory.database.name,
            mode="online" if self.ctx.online else "offline",
            schemas=sorted({t.schema for t in self.inventory.tables}),
            generated_at=datetime.now(UTC).isoformat(),
            sample_size=self.config.sample_size,
            tables=tables,
            warnings=warnings,
        )
        report.summary = _summarise(report, all_results)
        if self.report is not None:
            # NOTE(review): "checks_run" is the number of results produced,
            # not the number of checks executed - confirm this is intended.
            self.report.record_extracted("checks_run", len(all_results))
            self.report.record_extracted("warnings", len(warnings))
        return report