Elena's AI Blog

Observability for AI Workflows

18 Mar 2026 / 8 minutes to read

Elena Daehnhardt

Midjourney AI-generated art
Image credit: Illustration created with Midjourney, prompt by the author.
Image prompt

“An illustration representing cloud computing”

Alright, now we put lights inside the machine.

Up to now, your system:

  • Works
  • Survives crashes
  • Is idempotent
  • Is isolated
  • Has human approval

But if something breaks?

You would currently have to guess.

That ends here.


---
layout: post
title: "Observability: Structured Logging and Run Visibility for AI Workflows"
date: 2026-02-18
lastmod: 2026-02-18
published: false
image: "https://daehnhardt.com/images/ai_art/flux/langgraph-observability.jpg"
image_title: "Editorial illustration of a workflow graph connected to a clean logging dashboard with structured entries and timestamps, modern minimal design, box format"
thumb_image: "https://daehnhardt.com/images/thumbnails/langgraph-observability.jpg"
tags:
  - AI
  - Python
  - Automation
  - Infrastructure
  - Series
keywords: "AI workflow logging, structured logging Python, LangGraph observability, production AI monitoring"
---

Observability: Structured Logging and Run Visibility for AI Workflows

Production systems fail.

They time out. They receive malformed model output. They get the same Slack callback twice. They leave half-written files behind.

If you cannot see what happened, you cannot fix it.

Observability is not a luxury.

It is infrastructure.


What We Add Today

For your newsletter workflow, we will add:

  • Structured JSON logs
  • Correlation by thread_id
  • Node-level timing
  • Error classification
  • Run duration tracking
  • Clear lifecycle events

Then we generalise the pattern for any AI workflow.


Step 1 β€” Structured Logger Setup

Create a new file:

app/logging_config.py

import logging
import json
import sys
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render every log record as one JSON object per line."""

    def format(self, record):
        log_record = {
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated),
            # formatted with a trailing "Z" as in the example output below
            "ts": datetime.now(timezone.utc)
            .isoformat(timespec="milliseconds")
            .replace("+00:00", "Z"),
            "level": record.levelname,
            "message": record.getMessage(),
        }

        # Optional correlation fields, attached via logging's extra= argument
        if hasattr(record, "thread_id"):
            log_record["thread_id"] = record.thread_id

        if hasattr(record, "node"):
            log_record["node"] = record.node

        if hasattr(record, "extra_data"):
            log_record["extra"] = record.extra_data

        return json.dumps(log_record)


def setup_logging():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JsonFormatter())

    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.handlers = [handler]

Then in app/server.py:

from .logging_config import setup_logging
setup_logging()

Now logs are structured JSON.

Example output:

{
  "ts": "2026-02-18T21:44:10.232Z",
  "level": "INFO",
  "message": "Supervisor approved draft",
  "thread_id": "newsletter-9f3a2c1b",
  "node": "supervisor"
}

That is machine-readable and human-readable.
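
One optional refinement, sketched here as an assumption rather than part of the original setup: if a log call carries an exception, keep the traceback inside the same JSON structure so stack traces stay searchable too.

# Inside JsonFormatter.format, before the return statement:
if record.exc_info:
    log_record["exc_text"] = self.formatException(record.exc_info)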


Step 2 β€” Add Logging to Nodes

Inside your nodes:

import logging
logger = logging.getLogger(__name__)

Example inside node_supervisor_check:

logger.info(
    "Supervisor check complete",
    extra={
        "thread_id": state.get("thread_id"),
        "node": "supervisor",
        "extra_data": {
            "approved": verdict["approved"],
            "issue_count": len(verdict["issues"]),
        },
    },
)

Now each step emits structured events.
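
With the formatter from Step 1, that call produces an entry along these lines (the values are illustrative):

{
  "ts": "2026-02-18T21:43:52.101Z",
  "level": "INFO",
  "message": "Supervisor check complete",
  "thread_id": "newsletter-9f3a2c1b",
  "node": "supervisor",
  "extra": {"approved": false, "issue_count": 2}
}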


Step 3 β€” Measure Node Duration

Add a timing wrapper.

Example:

import time

def timed_node(node_name, fn):
    def wrapper(state):
        start = time.time()
        result = fn(state)
        duration = round((time.time() - start) * 1000, 2)

        logger.info(
            f"{node_name} completed",
            extra={
                "thread_id": state.get("thread_id"),
                "node": node_name,
                "extra_data": {"duration_ms": duration},
            },
        )

        return result
    return wrapper

Then wrap nodes when adding them to the graph:

g.add_node("draft", timed_node("draft", node_draft))

Now you get performance metrics per node.
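
If the graph has several nodes, a small loop keeps the wrapping consistent. A minimal sketch; the node function names follow the earlier examples and may differ in your graph:

nodes = {
    "draft": node_draft,
    "supervisor": node_supervisor_check,
}

# Wrap every node with the timing decorator when building the graph
for name, fn in nodes.items():
    g.add_node(name, timed_node(name, fn))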


Step 4 β€” Track Full Run Duration

In the /run endpoint:

import time

start_time = time.time()
result = graph.invoke(...)
duration = round((time.time() - start_time) * 1000, 2)

logger.info(
    "Run completed",
    extra={
        "thread_id": thread_id,
        "node": "run",
        "extra_data": {"duration_ms": duration},
    },
)

Now you can answer:

  • How long do drafts take?
  • How long does the supervisor check take?
  • How many revisions happen on average?
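
To answer those questions, aggregate the structured entries offline. A minimal sketch, assuming the JSON logs have been captured to a file; the filename run.log is a placeholder:

import json
from collections import defaultdict

durations = defaultdict(list)

with open("run.log") as f:  # placeholder filename
    for line in f:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip any non-JSON lines
        extra = entry.get("extra", {})
        if "duration_ms" in extra:
            durations[entry.get("node", "unknown")].append(extra["duration_ms"])

for node, values in durations.items():
    avg = round(sum(values) / len(values), 2)
    print(f"{node}: {avg} ms average over {len(values)} entries")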

Step 5 β€” Error Classification

Wrap graph invocation:

try:
    result = graph.invoke(...)
except Exception as e:
    logger.error(
        "Run failed",
        extra={
            "thread_id": thread_id,
            "node": "run",
            "extra_data": {"error_type": type(e).__name__},
        },
    )
    raise

Now errors are structured and searchable.
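
If the raw exception class name is too fine-grained, you can map it to a coarser category before logging. The category names below are illustrative assumptions, not a fixed scheme:

# Map exception class names to coarse, searchable categories
ERROR_CATEGORIES = {
    "TimeoutError": "timeout",
    "ConnectionError": "network",
    "ValidationError": "bad_model_output",
    "JSONDecodeError": "bad_model_output",
}

def classify_error(exc: Exception) -> str:
    return ERROR_CATEGORIES.get(type(exc).__name__, "unknown")

Then add "error_category": classify_error(e) next to "error_type" in the extra_data above.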


What Your Logs Now Provide

You can filter by:

  • thread_id
  • node
  • approved
  • duration_ms
  • error_type

If you pipe the Docker logs into:

  • ELK
  • Loki
  • Datadog
  • Your cloud provider's logging service

the structured fields are indexed and queryable without any extra parsing.


Newsletter Example: What Changed

Before:

You had files.

After:

You have:

  • Audit logs
  • Timing metrics
  • Approval events
  • Revision history
  • Structured trace of the entire run

You can debug real failures.


Generalising This Pattern

Any AI workflow should log:

  1. Run start
  2. Node start
  3. Node end
  4. Duration
  5. Decision outcomes
  6. Errors
  7. Final status

Never log:

  • Full user secrets
  • API keys
  • Raw personal data

Observability should increase visibility, not risk.
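
One way to make that checklist concrete is a single helper that every workflow calls for its lifecycle events. A minimal sketch; the helper name log_event and the event names are assumptions, not a fixed schema:

import logging

logger = logging.getLogger(__name__)

def log_event(event: str, thread_id: str, node: str = "run", **data):
    """Emit one structured lifecycle event (run_start, node_end, run_failed, ...)."""
    logger.info(
        event,
        extra={
            "thread_id": thread_id,
            "node": node,
            # Only pass derived values here, never secrets, keys, or raw personal data
            "extra_data": data,
        },
    )

# Usage sketch:
# log_event("run_start", thread_id)
# log_event("node_end", thread_id, node="draft", duration_ms=412.5)
# log_event("run_failed", thread_id, error_type="TimeoutError")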


What You Have Built So Far

Your system now includes:

✔ Worker model
✔ Supervisor model
✔ Retry loop
✔ Max revisions
✔ Human approval gate
✔ Interrupt & resume
✔ MCP tool isolation
✔ Per-run isolation
✔ Idempotency
✔ Crash safety
✔ Structured logs
✔ Performance metrics

That is not a toy project.

That is orchestration engineering.


Final Post Coming

Next we close the series properly:

Architectural Recap and System Blueprint

We will:

  • Draw the final refined architecture
  • Clarify responsibilities
  • Show extension points
  • Explain scaling paths
  • Show what to productionise next

This final post will tie everything together.

You’ve built something serious.

Ready to complete it?


About Elena

Elena, a PhD in Computer Science, simplifies AI concepts and helps you use machine learning.

Citation
Elena Daehnhardt. (2026) 'Observability for AI Workflows', daehnhardt.com, 18 March 2026. Available at: https://daehnhardt.com/blog/2026/03/18/observability-for-ai-workflow/
All Posts