---
layout: post
title: "Observability: Structured Logging and Run Visibility for AI Workflows"
date: 2026-02-18
lastmod: 2026-02-18
published: false
image: "https://daehnhardt.com/images/ai_art/flux/langgraph-observability.jpg"
image_title: "Editorial illustration of a workflow graph connected to a clean logging dashboard with structured entries and timestamps, modern minimal design, box format"
thumb_image: "https://daehnhardt.com/images/thumbnails/langgraph-observability.jpg"
tags:
- AI
- Python
- Automation
- Infrastructure
- Series
keywords: "AI workflow logging, structured logging Python, LangGraph observability, production AI monitoring"
---
# Observability: Structured Logging and Run Visibility for AI Workflows
Production systems fail.
They time out. They receive malformed model output. They get the same Slack callback twice. They leave half-written files behind.
If you cannot see what happened, you cannot fix it.
Observability is not a luxury.
It is infrastructure.
## What We Add Today
For your newsletter workflow, we will add:

- Structured JSON logs
- Correlation by `thread_id`
- Node-level timing
- Error classification
- Run duration tracking
- Clear lifecycle events

Then we generalise the pattern to any AI workflow.
## Step 1: Structured Logger Setup
Create a new file, `app/logging_config.py`:
```python
import json
import logging
import sys
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render every log record as a single JSON line."""

    def format(self, record):
        log_record = {
            # UTC timestamp with millisecond precision and a trailing "Z"
            "ts": datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z"),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Correlation fields are optional; they arrive via the `extra` kwarg on log calls
        if hasattr(record, "thread_id"):
            log_record["thread_id"] = record.thread_id
        if hasattr(record, "node"):
            log_record["node"] = record.node
        if hasattr(record, "extra_data"):
            log_record["extra"] = record.extra_data
        return json.dumps(log_record)


def setup_logging():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JsonFormatter())
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.handlers = [handler]  # replace any handlers other libraries installed
```
Then, near the top of `app/server.py`:

```python
from .logging_config import setup_logging

setup_logging()
```
Now logs are structured JSON.
Example output:

```json
{
  "ts": "2026-02-18T21:44:10.232Z",
  "level": "INFO",
  "message": "Supervisor approved draft",
  "thread_id": "newsletter-9f3a2c1b",
  "node": "supervisor"
}
```
That is machine-readable and human-readable.
## Step 2: Add Logging to Nodes
Inside your nodes:

```python
import logging

logger = logging.getLogger(__name__)
```
Example inside `node_supervisor_check`:

```python
logger.info(
    "Supervisor check complete",
    extra={
        "thread_id": state.get("thread_id"),
        "node": "supervisor",
        "extra_data": {
            "approved": verdict["approved"],
            "issue_count": len(verdict["issues"]),
        },
    },
)
```
Now each step emits structured events.
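With the formatter from Step 1, that call produces a line like this (the timestamp and values are illustrative):

```json
{
  "ts": "2026-02-18T21:45:02.917Z",
  "level": "INFO",
  "message": "Supervisor check complete",
  "thread_id": "newsletter-9f3a2c1b",
  "node": "supervisor",
  "extra": {"approved": true, "issue_count": 0}
}
```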
## Step 3: Measure Node Duration
Add a timing wrapper:

```python
import time


def timed_node(node_name, fn):
    def wrapper(state):
        start = time.perf_counter()  # monotonic clock, better suited to durations
        result = fn(state)
        duration = round((time.perf_counter() - start) * 1000, 2)
        logger.info(
            f"{node_name} completed",
            extra={
                "thread_id": state.get("thread_id"),
                "node": node_name,
                "extra_data": {"duration_ms": duration},
            },
        )
        return result

    return wrapper
```
Then wrap nodes when adding them to the graph:

```python
g.add_node("draft", timed_node("draft", node_draft))
```
Now you get performance metrics per node.
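A draft run then yields timing events shaped like this (the duration is illustrative):

```json
{
  "ts": "2026-02-18T21:45:04.108Z",
  "level": "INFO",
  "message": "draft completed",
  "thread_id": "newsletter-9f3a2c1b",
  "node": "draft",
  "extra": {"duration_ms": 812.44}
}
```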
## Step 4: Track Full Run Duration
In the `/run` endpoint:

```python
import time

start_time = time.perf_counter()
result = graph.invoke(...)
duration = round((time.perf_counter() - start_time) * 1000, 2)

logger.info(
    "Run completed",
    extra={
        "thread_id": thread_id,
        "node": "run",
        "extra_data": {"duration_ms": duration},
    },
)
```
Now you can answer questions like these (see the sketch after this list):

- How long do drafts take?
- How long does the supervisor check take?
- How many revisions does a run need on average?
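Those answers fall straight out of the logs. A quick offline sketch, assuming the JSON lines have been captured to a file such as `run.log` (the filename is illustrative):

```python
import json
from collections import defaultdict
from statistics import mean

durations = defaultdict(list)

with open("run.log") as f:
    for line in f:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore non-JSON lines such as tracebacks
        duration = event.get("extra", {}).get("duration_ms")
        if duration is not None:
            durations[event.get("node", "unknown")].append(duration)

for node, values in durations.items():
    print(f"{node}: mean {mean(values):.1f} ms over {len(values)} events")
```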
## Step 5: Error Classification
Wrap the graph invocation:

```python
try:
    result = graph.invoke(...)
except Exception as e:
    logger.error(
        "Run failed",
        extra={
            "thread_id": thread_id,
            "node": "run",
            "extra_data": {"error_type": type(e).__name__},
        },
    )
    raise  # re-raise so the caller still sees the failure
```
Now errors are structured and searchable.
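A timeout, for instance, becomes a searchable event like this (values are illustrative):

```json
{
  "ts": "2026-02-18T21:46:30.551Z",
  "level": "ERROR",
  "message": "Run failed",
  "thread_id": "newsletter-9f3a2c1b",
  "node": "run",
  "extra": {"error_type": "TimeoutError"}
}
```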
## What Your Logs Now Provide
You can filter by:

- `thread_id`
- `node`
- `approved`
- `duration_ms`
- `error_type`
If you pipe the Docker logs into any of:

- ELK
- Loki
- Datadog
- Cloud logging

the JSON lines are ingested as-is, with no custom parsing rules.
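Even without a log stack, JSON lines are easy to slice locally. A throwaway filter script, as a sketch (the script name, container name, and usage are illustrative):

```python
# filter_logs.py: print every event for one thread_id
# Usage: docker logs <container> 2>&1 | python filter_logs.py newsletter-9f3a2c1b
import json
import sys

wanted = sys.argv[1]

for line in sys.stdin:
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        continue  # skip non-JSON lines such as tracebacks
    if event.get("thread_id") == wanted:
        print(json.dumps(event, indent=2))
```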
## Newsletter Example: What Changed
Before:
You had files.
After:
You have:
- Audit logs
- Timing metrics
- Approval events
- Revision history
- A structured trace of the entire run
You can debug real failures.
## Generalising This Pattern
Any AI workflow should log the following (a helper sketch follows this list):
- Run start
- Node start
- Node end
- Duration
- Decision outcomes
- Errors
- Final status
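A minimal helper that standardises these lifecycle events might look like this. It is a sketch: the `log_event` name and the event strings are illustrative, not part of the code above.

```python
import logging

logger = logging.getLogger(__name__)


def log_event(event, thread_id, node="run", **fields):
    """Emit one structured lifecycle event: run_started, node_finished, run_failed, ..."""
    logger.info(
        event,
        extra={
            "thread_id": thread_id,
            "node": node,
            "extra_data": fields,
        },
    )

# Usage:
# log_event("run_started", thread_id)
# log_event("node_finished", thread_id, node="draft", duration_ms=812.44)
# log_event("run_failed", thread_id, error_type="TimeoutError")
```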
Never log:
- User secrets
- API keys
- Raw personal data
Observability should increase visibility, not risk.
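One cheap guard is to redact known-sensitive keys before they ever reach `extra_data`. A sketch, assuming a flat payload dict (the key list is illustrative; extend it for your own data):

```python
SENSITIVE_KEYS = {"api_key", "token", "password", "email"}


def redact(data):
    """Return a copy of `data` with sensitive values masked."""
    return {
        k: "[REDACTED]" if k.lower() in SENSITIVE_KEYS else v
        for k, v in data.items()
    }

# e.g. "extra_data": redact(payload)
```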
## What You Have Built So Far
Your system now includes:
- Worker model
- Supervisor model
- Retry loop
- Max revisions
- Human approval gate
- Interrupt & resume
- MCP tool isolation
- Per-run isolation
- Idempotency
- Crash safety
- Structured logs
- Performance metrics
That is not a toy project.
That is orchestration engineering.
## Final Post Coming
Next we close the series properly:

**Architectural Recap and System Blueprint**
We will:
- Draw the final refined architecture
- Clarify responsibilities
- Show extension points
- Explain scaling paths
- Show what to productionise next
This final post will tie everything together.
You've built something serious.
Ready to complete it?