Topics We'll Cover
📊 Datadog Monitoring
APM setup, metrics types, and implementation
🛡️ Defensive Programming
High availability & security best practices
Building Resilient Python Services on Kubernetes
APM setup, metrics types, and implementation
High availability & security best practices
APM = Application Performance Monitoring
See the entire request journey:
API → Auth → PostgreSQL → Celery → Redis → LLM API
Visual graph of all services and their dependencies
P50, P95, P99 latencies per endpoint. Find the slowest 1%!
# requirements.txt or pyproject.toml
ddtrace>=2.0.0
DD_SERVICE=api-backend # Your service name
DD_ENV=production # Environment
DD_TRACE_AGENT_PORT=8126 # APM port (not 8125!)
# Run with ddtrace-run
ddtrace-run uvicorn main:app
# In Dockerfile
CMD ["ddtrace-run", "uvicorn", \
"main:app", "--host", "0.0.0.0"]
# Run Celery with ddtrace
ddtrace-run celery -A src.worker worker
# In Kubernetes
command: ["ddtrace-run"]
# YAML has no backslash line continuation here; a stray "\" would end up
# inside the argument list, so the list is written on a single line.
args: ["celery", "-A", "src.worker", "worker"]
from ddtrace import tracer
@tracer.wrap(service="workflow", resource="generate_report")
def generate_medical_report(patient_id: str):
    """Fetch a patient's biomarkers and analyse them, tracing each step.

    The whole call is wrapped in a span by ``tracer.wrap``; the database
    fetch and the LLM call each get their own child span.

    Args:
        patient_id: Identifier passed to ``fetch_from_db``.

    Returns:
        Whatever ``analyze_with_llm`` returns for the fetched biomarkers.
    """
    with tracer.trace("fetch_biomarkers") as fetch_span:
        # Privacy: tag only the first 8 characters, never the full ID.
        fetch_span.set_tag("patient_id", patient_id[:8])
        biomarkers = fetch_from_db(patient_id)

    with tracer.trace("llm_analysis") as llm_span:
        llm_span.set_tag("model", "gpt-4")
        llm_span.set_metric("token_count", 1500)
        result = analyze_with_llm(biomarkers)

    return result
What: Cumulative count
When: Events, requests, errors
Example:
# Counter example: bump the 'api.requests' count, tagged by endpoint.
# NOTE(review): `datadog` is presumably a DogStatsD-style client object
# (not shown in this excerpt) — confirm.
datadog.increment(
'api.requests',
tags=['endpoint:/users']
)
What: Point-in-time value
When: CPU, memory, queue size
Example:
# Gauge example: report the current value (42) of 'queue.size' for one queue.
datadog.gauge(
'queue.size',
value=42,
tags=['queue:parser']
)
What: Statistical distribution
When: Latencies, sizes, durations
Example:
# Histogram example: submit one sample of 'request.duration'.
# `duration_ms` is defined outside this snippet.
datadog.histogram(
'request.duration',
duration_ms,
tags=['service:api']
)
What: Duration measurement
When: Operation timing
Example:
# Timing example: the statement inside the `with` body is what gets timed.
# NOTE(review): the official DogStatsD client exposes this context manager
# as `statsd.timed('db.query')`; confirm `timer` exists on the object in use.
with datadog.timer('db.query'):
    result = db.execute(query)
| Metric Type | Use Case | Example |
|---|---|---|
| Counter | Count occurrences (rate matters) | Requests, errors, tasks completed |
| Gauge | Current state (absolute value) | CPU %, memory usage, active connections |
| Histogram | Distribution analysis (percentiles) | Response times, payload sizes |
| Timing | Measure durations easily | Function execution time |
class DataDogMetrics:
def __init__(self, enabled: bool = True, service_name: str = "workflow-functional"):
self.enabled = enabled
self.service_name = service_name
if self.enabled:
try:
from datadog import initialize, statsd
options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
initialize(**options)
self.datadog_client = statsd
except ImportError:
logger.warning("DataDog not installed, metrics disabled")
self.enabled = False
except Exception as e:
logger.warning(f"Failed to initialize DogStatsD: {e}")
self.enabled = False # Graceful degradation!
def track_workflow_execution(
self,
workflow_name: str,
duration_ms: float,
sections_generated: int,
biomarkers_processed: int,
success: bool = True,
error_type: str | None = None,
):
tags = [f"workflow:{workflow_name}", f"success:{success}"]
if error_type:
tags.append(f"error_type:{error_type}")
self.increment(f"{self.service_name}.workflow_executions", tags=tags)
self.histogram(f"{self.service_name}.workflow_execution_time", duration_ms, tags=tags)
self.histogram(f"{self.service_name}.sections_generated", sections_generated, tags=tags)
self.histogram(f"{self.service_name}.biomarkers_processed", biomarkers_processed, tags=tags)
def track_llm_cost(self, cost: float, model: str, operation: str, token_count: int):
tags = [f"model:{model}", f"operation:{operation}"]
self.histogram(f"{self.service_name}.llm_cost", cost, tags=tags)
if token_count and token_count > 0:
cost_per_token = cost / token_count
self.histogram(f"{self.service_name}.llm_cost_per_token", cost_per_token, tags=tags)
def track_workflow_cost(self, total_cost: float, session_id: str, sections_generated: int):
tags = [f"session_id:{session_id[:8]}"] # Privacy: truncate IDs!
self.histogram(f"{self.service_name}.workflow_total_cost", total_cost, tags=tags)
if sections_generated > 0:
cost_per_section = total_cost / sections_generated
self.histogram(f"{self.service_name}.cost_per_section", cost_per_section, tags=tags)
def _is_retryable_error(self, error: Exception) -> tuple[bool, float | None]:
    """Classify an error as retryable and suggest a delay before retrying.

    Args:
        error: The exception raised by the failed operation.

    Returns:
        ``(True, delay_seconds)`` for rate limits, 5xx API errors and
        timeouts; ``(False, None)`` for everything else.
    """
    # Rate limit errors (429) - respect server's retry-after header
    if isinstance(error, RateLimitError):
        retry_after = getattr(error, "retry_after", None)
        if retry_after:
            try:
                return True, float(retry_after)  # Use server's delay!
            except (TypeError, ValueError):
                # Non-numeric hint: fall back to the default below rather
                # than raising from inside the error handler.
                pass
        return True, self.config.retry_initial_delay * 2

    # Server errors (5xx) - retryable. A missing or non-integer status is
    # treated as "not a 5xx" instead of raising while handling an error.
    status_code = getattr(error, "status_code", None)
    if isinstance(error, APIError) and isinstance(status_code, int) and status_code >= 500:
        return True, self.config.retry_initial_delay

    # Timeout errors - retryable
    if isinstance(error, (asyncio.TimeoutError, TimeoutError)):
        return True, self.config.retry_initial_delay

    return False, None  # Client errors (4xx) are NOT retryable
def _calculate_backoff_delay(self, attempt: int, base_delay: float = None) -> float:
if base_delay is None:
base_delay = self.config.retry_initial_delay
return base_delay * (self.config.retry_backoff_base ** attempt)
# Example: base_delay=1s, backoff_base=2
# Attempt 0: 1s, Attempt 1: 2s, Attempt 2: 4s, Attempt 3: 8s
async def execute_with_retry(self, operation, operation_name: str, max_retries: int = 3):
    """Await ``operation()`` and retry it on retryable errors.

    Args:
        operation: Zero-argument callable returning an awaitable.
        operation_name: Key into ``self.retry_stats``; also used in logs.
        max_retries: Retries allowed after the first attempt.

    Returns:
        The result of the first successful ``operation()`` call.

    Raises:
        The original exception, when it is not retryable or the retries
        are exhausted.
    """
    stats = self.retry_stats[operation_name]
    for attempt in range(max_retries + 1):
        try:
            logger.debug(f"🔄 ATTEMPT {attempt + 1}/{max_retries + 1}: {operation_name}")
            result = await operation()
            stats["success_count"] += 1
            return result
        except Exception as error:
            is_retryable, retry_after = self._is_retryable_error(error)
            if attempt < max_retries and is_retryable:
                # `_is_retryable_error` returns a non-zero delay for every
                # retryable error, so `retry_after or backoff` never reached
                # the exponential backoff. Taking the larger of the two keeps
                # a server-provided delay as a floor and lets the wait grow.
                backoff = self._calculate_backoff_delay(attempt)
                delay = max(retry_after or 0.0, backoff)
                logger.warning(f"⚠️ RETRY in {delay:.2f}s")
                await asyncio.sleep(delay)
                continue
            raise
try:
result = pipeline.update_batch()
return result
except Exception as e: # ⚠️ Too broad!
raise HTTPException(
status_code=500,
detail=f"Error: {str(e)}"
)
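Why This is Dangerous: one broad `except Exception` turns bad input, rate limits and genuine bugs into the same HTTP 500, and `detail=str(e)` sends internal error text straight to the client.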
try:
result = pipeline.update_batch()
return result
except ValidationError as e:
raise HTTPException(400, "Invalid input")
except RateLimitError as e:
raise HTTPException(429, "Rate limited")
except ProcessingException as e:
if e.is_retryable():
raise HTTPException(503, e.get_user_message())
raise HTTPException(400, e.get_user_message())
except Exception as e:
logger.error(f"Unexpected: {e}", exc_info=True)
raise HTTPException(500, "Internal error")
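Why This is Better: each failure mode maps to its own status code (400, 429, 503), so clients can tell bad input from a retryable outage; unexpected errors are logged with a stack trace while the client sees only a generic message.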
# Registry mapping each ErrorCode to an ErrorInfo that carries the
# user-facing message, the retry policy and an internal-only description.
ERROR_DEFINITIONS = {
# NOTE(review): 401/403 are listed among the retryable causes here —
# presumably because a signed URL can be re-issued; confirm.
ErrorCode.FILE_DOWNLOAD_ERROR: ErrorInfo(
code=ErrorCode.FILE_DOWNLOAD_ERROR,
user_message="Failed to download file due to server or network issues.",
is_retryable=True, # Built-in retryability!
internal_description="Failed (5xx, 401, 403, 429, network error)"
),
ErrorCode.FILE_NOT_FOUND_ERROR: ErrorInfo(
code=ErrorCode.FILE_NOT_FOUND_ERROR,
user_message="The requested file was not found.",
is_retryable=False, # Client error - don't retry!
internal_description="Terminal client error (400, 404, 410)"
),
}
def verify_password(input_password, stored_password):
    """INSECURE ON PURPOSE: demonstrates a timing side channel.

    Returns True when both values are equal, using a plain ``==``
    comparison. Do not use this for real credential checks; use
    ``secrets.compare_digest`` instead.
    """
    if input_password == stored_password:  # ⚠️ Vulnerable!
        return True
    return False
# Why This is Vulnerable:
# Comparison stops at first mismatch
# "aaa" vs "zzz" → fast (1 comparison)
# "zza" vs "zzz" → slow (3 comparisons)
#
# Attacker measures response time:
# - Try "a" → 50ms → wrong
# - Try "z" → 52ms → correct! (took longer)
# - Repeat to discover full password
#
# Can break 8-char password in hours!
import secrets
def verify_credentials(credentials):
    """Check a username/password pair in constant time.

    Args:
        credentials: Object exposing ``username`` and ``password`` strings.

    Returns:
        The username, when both values match ``USERNAME`` and ``PASSWORD``.

    Raises:
        HTTPException: 401 when either value does not match.
    """
    # compare_digest() accepts str only when it is pure ASCII; comparing the
    # UTF-8 bytes makes non-ASCII input a normal 401 instead of a TypeError.
    correct_username = secrets.compare_digest(
        credentials.username.encode("utf-8"), USERNAME.encode("utf-8")
    )
    correct_password = secrets.compare_digest(
        credentials.password.encode("utf-8"), PASSWORD.encode("utf-8")
    )
    # Both comparisons always run, so timing does not reveal which one failed.
    if not (correct_username and correct_password):
        raise HTTPException(401, "Invalid credentials")
    return credentials.username
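Why secrets.compare_digest() is Secure: its running time does not depend on where the two values first differ, so response timing reveals nothing about how many leading characters were correct.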
# ⚠️ REAL EXAMPLE - DON'T DO THIS!
# The literal fallbacks are redacted here: credential-looking values should
# not be republished, and the shape of the anti-pattern is what matters.
N1_API_KEY = os.getenv(
    "N1_API_KEY",
    "REDACTED-dev-key"  # ⚠️ Hardcoded!
)
PASSWORD = os.getenv(
    "N1_PASSWORD",
    "REDACTED-password"  # ⚠️ In git forever!
)
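Why This is a Security Disaster: the fallback values are committed to the repository, stay in git history even after they are removed, and are used silently wherever the environment variable is unset.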
# No default value: a missing variable stops the process at startup instead
# of running with a known credential.
N1_API_KEY = os.getenv("N1_API_KEY")
if not N1_API_KEY:
    raise ValueError(
        "N1_API_KEY environment variable required"
    )
PASSWORD = os.getenv("N1_PASSWORD")
if not PASSWORD:
    raise RuntimeError(
        "N1_PASSWORD must be set"
    )
# In Kubernetes:
# - Use Secrets
# - Mount as environment variables
# - Rotate via kubectl (no code changes!)
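Why This is Secure: no credential lives in the source tree or in git history, and a missing variable fails loudly at startup instead of silently falling back to a known default.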
# Connection pool configuration. With these values at most
# pool_size + max_overflow = 40 connections can be open at once.
engine = create_engine(
DATABASE_URL,
pool_size=20, # Connections kept open in the pool
max_overflow=20, # Extra connections allowed beyond pool_size
pool_timeout=120, # Wait max 2 min for a free connection
pool_recycle=1800, # Recycle connections older than 30 min
pool_pre_ping=True # Health check on checkout, before using!
)
@contextmanager
def get_db_session(commit=True):
    """Yield a database session and always clean it up.

    Args:
        commit: When True, commit after the ``with`` body finishes
            without an exception.

    Yields:
        A new session created by ``SessionLocal()``.

    Raises:
        Re-raises any exception from the ``with`` body or from the commit,
        after rolling the session back.
    """
    session = SessionLocal()
    try:
        yield session
        if commit:
            session.commit()
    except Exception:
        session.rollback()  # Always rollback on error!
        raise
    finally:
        session.close()  # Always cleanup!
from pydantic import BaseModel, Field
class ProcessRecordRequest(BaseModel):
    """Request body for the record-processing endpoint.

    ``record_id``, ``user_id`` and ``signed_url`` are required;
    ``bucket_name`` falls back to a placeholder when omitted.
    """

    record_id: str = Field(..., description="Unique identifier")
    user_id: str = Field(..., description="User identifier")
    bucket_name: str = Field(default="no-bucket-provided")
    signed_url: str = Field(..., description="Signed URL for file")
@router.post("/process")
async def process_record(request: ProcessRecordRequest):
    """Queue a ``process_record`` Celery task for a validated request.

    Returns:
        ``{"task_id": <id of the queued task>}``.
    """
    # Pydantic already validated:
    # - All required fields present
    # - Correct types
    # Unknown extra fields are ignored by default, not rejected; set
    # extra="forbid" on the model if they must be refused.
    # NOTE(review): `signed_url` is validated but not forwarded to the
    # task — confirm this is intentional.
    async_result = celery_app.send_task(
        'process_record',
        kwargs={
            'record_id': request.record_id,
            'user_id': request.user_id,
            'bucket_name': request.bucket_name,
        },
    )
    # send_task() returns an AsyncResult; only its id belongs in the response.
    return {"task_id": async_result.id}
def allowed_file(filename: str) -> bool:
"""Whitelist approach - only allow specific extensions."""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in {'pdf'}
@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
    """Validate an uploaded file and build a safe storage path for it.

    Raises:
        HTTPException: 400 when no filename is given or the extension is
            not allowed.

    The excerpt ends once the target path is built; storing the file is
    not shown.
    """
    if not file.filename:
        raise HTTPException(400, "No file selected")
    if not allowed_file(file.filename):
        raise HTTPException(400, "Invalid file type. Only PDF allowed.")

    # Sanitize filename - prevent directory traversal.
    # Spaces and both kinds of path separator become underscores in one pass.
    unsafe_to_underscore = str.maketrans({" ": "_", "/": "_", "\\": "_"})
    task_id = str(uuid.uuid4())
    filename = file.filename.translate(unsafe_to_underscore)
    # The random prefix keeps stored names unique.
    file_path = os.path.join(UPLOAD_FOLDER, f"{task_id}_{filename}")
import aiohttp
async def _make_async_request(self, endpoint: str):
    """GET ``endpoint`` with a total timeout and return the decoded JSON.

    The timeout (``self.timeout`` seconds) covers the whole request, so a
    hung upstream cannot block the caller indefinitely.
    """
    # The excerpt used `url` without defining it (NameError at runtime).
    # NOTE(review): assumes the client keeps its prefix in `self.base_url`;
    # without that attribute `endpoint` is used as the full URL — confirm.
    url = f"{getattr(self, 'base_url', '')}{endpoint}"
    timeout = aiohttp.ClientTimeout(total=self.timeout)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url, headers=self.headers) as response:
            return await response.json()
# Database timeouts
engine = create_engine(DATABASE_URL, pool_timeout=120)
# Celery task timeouts
@celery_app.task(soft_time_limit=300, time_limit=330)
def long_running_task():
    """Placeholder task showing Celery time limits.

    The soft limit (300 s) is set 30 s below the hard limit (330 s),
    leaving the task time to clean up before it is terminated.
    """
class CustomHttpVlmClient(VlmClient):
    """VLM client that caps the number of concurrent API calls."""

    def __init__(self, max_concurrency: int = 10):
        """Create the client.

        Args:
            max_concurrency: Maximum number of documents processed at once.

        Raises:
            ValueError: If ``max_concurrency`` is less than 1.
        """
        # A semaphore created with 0 can never be acquired, so every call
        # would hang forever; reject it up front.
        if max_concurrency < 1:
            raise ValueError("max_concurrency must be at least 1")
        # NOTE(review): the base-class initialiser is not called here, as in
        # the original — confirm VlmClient needs no setup.
        self.max_concurrency = max_concurrency
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def process_document(self, doc):
        """Process one document, waiting for a free slot if necessary."""
        async with self.semaphore:  # Acquire slot
            # At most `max_concurrency` documents are processed simultaneously.
            return await self._call_vlm_api(doc)
# Decompress a zstd-encoded response body, trying three strategies in turn.
# `response` and `content` are defined before this excerpt.
content_encoding = response.headers.get("Content-Encoding")
if content_encoding == "zstd":
    import zstandard as zstd

    dctx = zstd.ZstdDecompressor()
    try:
        # Method 1: Standard decompression
        content = dctx.decompress(content)
    except Exception as e1:
        logger.warning(f"Standard failed: {e1}")
        try:
            # Method 2: Streaming decompression
            # NOTE(review): read() has no output cap; confirm the source is
            # trusted or bound the read size.
            reader = dctx.stream_reader(io.BytesIO(content))
            content = reader.read()
            logger.info("✅ Streaming succeeded")
        except Exception as e2:
            # Log the second failure too instead of swallowing it.
            logger.warning(f"Streaming failed: {e2}")
            # Method 3: With max output size (100 MiB). A failure here
            # propagates to the caller: there is nothing left to try.
            content = dctx.decompress(content, max_output_size=100*1024*1024)
            logger.info("✅ Max size succeeded")
async def _safe_error_context(request: Request, err: Exception) -> dict:
    """Extract safe context for logging - avoid PII/secrets.

    Args:
        request: The incoming request being handled.
        err: The exception that occurred, or None.

    Returns:
        A dict of request metadata, plus a small JSON body preview and the
        error type/message when available.
    """
    max_body_bytes = 4096
    sensitive_markers = (
        "token", "secret", "password", "passwd", "signature", "api_key", "apikey", "auth",
    )

    def _redact(params) -> dict:
        # Query strings often carry tokens or signed-URL signatures.
        return {
            key: "[REDACTED]" if any(m in key.lower() for m in sensitive_markers) else value
            for key, value in dict(params).items()
        }

    ctx = {
        "method": request.method,
        "path": request.url.path,
        "query": _redact(request.query_params),
        # `client` can be None, e.g. behind some servers or test clients.
        "client_ip": request.client.host if request.client else None,
        "user_agent": request.headers.get("user-agent"),
        "request_id": request_id_ctx.get(),
        "user_id": user_id_ctx.get(),
    }
    # ⚠️ NEVER log these:
    # - Authorization headers
    # - Cookies
    # - Passwords
    # - Full session IDs (truncate to first 8 chars)

    # Only include small JSON bodies
    ctype = request.headers.get("content-type", "")
    try:
        clen = int(request.headers.get("content-length") or 0)
    except ValueError:
        clen = -1  # Malformed header: skip the preview rather than crash
    # Require a declared, small length. A missing header used to count as 0
    # and let a body of any size into the log.
    if "application/json" in ctype and 0 < clen <= max_body_bytes:
        body = await request.body()
        # Truncate as well, in case the declared length is wrong.
        # NOTE(review): JSON bodies can still contain credentials — confirm
        # that no endpoint covered by this handler accepts them.
        ctx["json_body_preview"] = body[:max_body_bytes].decode(errors="ignore")

    if err is not None:
        ctx["error_type"] = err.__class__.__name__
        ctx["error_msg"] = str(err)[:200]  # Truncate!
    return ctx
@app.get("/health")
async def health():
    """ANTI-PATTERN, kept on purpose: reports "ok" without checking anything."""
    return {"status": "ok"}  # ⚠️ Lies!
# Meanwhile:
# - PostgreSQL connection pool exhausted
# - Redis connection dead
# - Out of memory
#
# Traffic sent to broken pod
# Users get 500 errors
@app.get("/ready")  # Readiness: the probe path used by the manifest
@app.get("/health")  # Kept so existing probes keep working
async def health_check():
    """Verify that PostgreSQL and Redis are reachable.

    Returns:
        ``{"status": "healthy"}`` when both checks pass.

    Raises:
        HTTPException: 503 when either dependency check fails.

    NOTE(review): if the liveness probe also uses this check, a database or
    Redis outage makes every pod restart; consider keeping liveness shallow
    and using this check for readiness only.
    """
    import inspect

    try:
        # Check PostgreSQL
        async with get_db() as db:
            await db.execute(text("SELECT 1"))
        # Check Redis. An async client returns an awaitable that must be
        # awaited, otherwise the check never actually runs.
        pong = redis_client.ping()
        if inspect.isawaitable(pong):
            await pong
        return {"status": "healthy"}
    except Exception as e:
        logger.error(f"Health failed: {e}")
        raise HTTPException(503, {"status": "unhealthy"}) from e
livenessProbe:  # Is pod alive? Fails → RESTART
  httpGet:
    path: /health
    # `port` is required for httpGet probes.
    # NOTE(review): 8000 assumes uvicorn's default port — confirm.
    port: 8000
  periodSeconds: 10
  failureThreshold: 3
readinessProbe:  # Ready for traffic? Fails → REMOVE
  httpGet:
    path: /ready
    port: 8000
  periodSeconds: 5
  failureThreshold: 2
# Stage 1: Builder (includes build tools)
FROM python:3.12-slim AS builder
WORKDIR /app
RUN apt-get update && apt-get install -y build-essential python3-dev
# COPY with a directory source copies the directory's *contents*, so src
# needs its own destination to keep the same layout as the runtime stage.
COPY pyproject.toml ./
COPY ./src ./src
RUN pip install --no-cache-dir -e .

# Stage 2: Runtime (minimal, no build tools)
FROM python:3.12-slim
WORKDIR /app
# Install runtime libraries and clean up the apt lists in the same layer.
RUN apt-get update && apt-get install -y \
    poppler-utils libgl1 libglib2.0-0 \
    && rm -rf /var/lib/apt/lists/*
# Copy ONLY runtime packages
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
# Console scripts such as `celery` are installed into /usr/local/bin;
# without this copy the ENTRYPOINT executable does not exist.
COPY --from=builder /usr/local/bin /usr/local/bin
COPY ./src ./src
ENTRYPOINT ["celery", "-A", "src.worker", "worker"]
Assume everything will break. Networks fail, databases go down, APIs timeout.
You can't fix what you can't see. Metrics, logs, traces.
Don't leak secrets, don't expose internals, don't crash the whole service.
🐕 Let's discuss Datadog dashboards
🛡️ Share your defensive programming challenges
💬 Ask about specific implementation patterns
Thank you! Now let's build bulletproof services! 🚀