Agentic Infrastructure: Como Adaptar seus Logs e Dados para Agentes de IA

A infraestrutura moderna está passando por uma transformação silenciosa mas profunda. Enquanto tradicionalmente logs, métricas e dados operacionais foram projetados para consumo humano através de dashboards e ferramentas de análise, estamos entrando em uma era onde agentes de inteligência artificial são os principais consumidores dessas informações. Esta mudança fundamental exige repensar completamente como estruturamos, armazenamos e organizamos dados de infraestrutura para maximizar a eficácia de sistemas autônomos que monitoram, diagnosticam e otimizam operações em tempo real.

Neste artigo do Portal do Linux, exploraremos profundamente como adaptar sua infraestrutura para a era agentic, transformando logs caóticos em dados estruturados e semânticos que agentes de IA podem processar eficientemente para tomar decisões autônomas, identificar problemas antes que afetem usuários e otimizar continuamente o desempenho dos sistemas.

O Que É Agentic Infrastructure

Agentic infrastructure refere-se ao design e implementação de sistemas de infraestrutura especificamente otimizados para interação com agentes de inteligência artificial. Diferentemente da infraestrutura tradicional, onde logs e métricas servem principalmente para análise humana pós-incidente ou visualização em dashboards, agentic infrastructure é projetada desde o início para permitir que agentes autônomos compreendam, interpretem e ajam sobre dados operacionais em tempo real.

A característica fundamental de agentic infrastructure é a estruturação semântica completa de todos os dados operacionais. Em vez de logs em texto livre destinados a leitura humana, cada evento, métrica e estado do sistema é representado de forma estruturada com contexto rico, relacionamentos explícitos e metadados que facilitam raciocínio automatizado. Essa abordagem permite que agentes de IA não apenas reajam a eventos, mas compreendam o contexto completo, prevejam problemas futuros e tomem decisões informadas sem intervenção humana.

Por Que Logs Tradicionais São Inadequados para Agentes de IA

O Problema dos Logs Não Estruturados

Examine um conjunto típico de logs de um servidor Linux em produção:

# /var/log/syslog - Logs tradicionais
Feb 7 03:26:14 webserver01 nginx[1234]: 192.168.1.100 - - [07/Feb/2026:03:26:14 +0000] "GET /api/users HTTP/1.1" 502 157 "-" "Mozilla/5.0"

Feb 7 03:26:14 webserver01 nginx[1234]: 2026/02/07 03:26:14 [error] 1234#0: *5678 connect() failed (111: Connection refused) while connecting to upstream

Feb 7 03:26:15 webserver01 systemd[1]: mysql.service: Main process exited, code=killed, status=9/KILL

Feb 7 03:26:15 webserver01 kernel: [123456.789] Out of memory: Kill process 9876 (mysqld) score 850 or sacrifice child

Feb 7 03:26:16 webserver01 prometheus: alert=HighMemoryUsage severity=critical instance=webserver01:9100

Para um engenheiro experiente, esses logs contam uma história clara: o servidor ficou sem memória, o kernel terminou o processo MySQL via OOM killer, e consequentemente o nginx não consegue conectar ao backend, resultando em erros 502 para usuários. No entanto, para um agente de IA processar essa narrativa a partir de texto não estruturado apresenta múltiplos desafios técnicos e operacionais.

Desafios Específicos para Processamento por IA

Variabilidade sintática extrema: Diferentes aplicações usam formatos completamente distintos, desde syslog RFC3164 até formatos proprietários, exigindo dezenas de parsers específicos.

Perda de relacionamentos causais: A conexão entre o OOM event, terminação do MySQL e falhas do nginx está apenas implícita através de timestamps próximos, não explicitamente declarada.

Ambiguidade semântica: Termos como “failed”, “error” e “killed” têm significados contextuais que variam entre aplicações, dificultando classificação automática de severidade.

Ausência de metadados críticos: Informações como impacto no usuário, custos financeiros de downtime, ou se o problema é auto-remediável estão completamente ausentes.

Ineficiência computacional: Parsing de texto livre, extração de entidades e correlação temporal são operações custosas que não escalam bem para milhões de eventos por segundo.

Impossibilidade de raciocínio estruturado: Agentes de IA baseados em grafos de conhecimento ou raciocínio lógico não podem operar eficientemente sobre dados não estruturados.

Princípios de Design para Logs Agentic

1. Estruturação Semântica Completa

Todo evento de infraestrutura deve ser representado como JSON estruturado com semântica explícita:

{
  "event_id": "evt_2026-02-07T03:26:14.123456Z_web01_oom_kill",
  "event_version": "2.0",
  "timestamp": "2026-02-07T03:26:14.123456Z",
  "timestamp_unix": 1738899974.123456,

  "event_classification": {
    "type": "system.resource.oom_killer.process_terminated",
    "category": "system_critical",
    "severity": "critical",
    "severity_score": 9.5,
    "is_user_facing": true,
    "is_slo_impacting": true,
    "requires_immediate_action": true
  },

  "source": {
    "host": "webserver01.us-east-1.prod.example.com",
    "ip_address": "10.0.1.15",
    "region": "us-east-1",
    "availability_zone": "us-east-1a",
    "cluster": "web-cluster-prod",
    "environment": "production",
    "component": "kernel",
    "subsystem": "memory_management"
  },

  "process": {
    "name": "mysqld",
    "pid": 9876,
    "user": "mysql",
    "command": "/usr/sbin/mysqld --defaults-file=/etc/mysql/my.cnf",
    "oom_score": 850,
    "memory_used_bytes": 8589934592,
    "uptime_seconds": 345600
  },

  "system_state": {
    "memory": {
      "total_bytes": 8589934592,
      "available_bytes": 104857600,
      "used_bytes": 8485077092,
      "usage_percent": 98.78,
      "swap_used_bytes": 4294967296,
      "swap_total_bytes": 4294967296
    },
    "cpu": {
      "load_average_1min": 12.45,
      "load_average_5min": 10.23,
      "load_average_15min": 8.91,
      "usage_percent": 87.3
    },
    "disk": {
      "root_usage_percent": 67.8,
      "data_usage_percent": 92.4
    }
  },

  "impact_analysis": {
    "services_affected": ["api-gateway", "user-service", "checkout-service"],
    "estimated_affected_users": 1247,
    "estimated_failed_requests_per_second": 45,
    "estimated_revenue_impact_usd_per_hour": 15000,
    "slo_violations": [
      {
        "slo_name": "api_availability_99_95",
        "current_value": 98.23,
        "threshold": 99.95,
        "breach_magnitude": 1.72
      }
    ]
  },

  "causal_chain": {
    "root_cause": {
      "type": "resource_exhaustion",
      "resource": "memory",
      "probable_triggers": [
        "memory_leak_in_application",
        "unexpected_traffic_spike",
        "inefficient_query_execution"
      ]
    },
    "contributing_factors": [
      {
        "factor": "high_disk_usage",
        "confidence": 0.65,
        "description": "Data partition at 92.4% may have triggered additional caching"
      },
      {
        "factor": "cpu_load_elevated",
        "confidence": 0.45,
        "description": "High CPU load suggests intensive operations"
      }
    ],
    "downstream_effects": [
      {
        "event_type": "database.connection.failed",
        "estimated_occurrences": 156,
        "time_window_seconds": 60
      },
      {
        "event_type": "http.response.502",
        "estimated_occurrences": 2678,
        "time_window_seconds": 60
      }
    ]
  },

  "relationships": {
    "caused_by_events": [],
    "causes_events": [
      "evt_2026-02-07T03:26:14.234567Z_web01_mysql_terminated",
      "evt_2026-02-07T03:26:14.345678Z_web01_nginx_upstream_failed"
    ],
    "related_incidents": ["inc_2026-02-07_03:25_memory_exhaustion"],
    "similar_historical_events": [
      {
        "event_id": "evt_2026-01-15T14:32:11.123456Z_web02_oom_kill",
        "similarity_score": 0.89,
        "resolution_actions": ["increased_memory_allocation", "query_optimization"]
      }
    ]
  },

  "ai_guidance": {
    "automated_diagnosis": "OOM killer terminated MySQL process due to memory exhaustion. Root cause likely memory leak or inefficient queries. Immediate impact: database unavailable, cascading failures in dependent services.",

    "recommended_immediate_actions": [
      {
        "action": "restart_mysql_service",
        "priority": 1,
        "automation_safe": true,
        "estimated_duration_seconds": 30,
        "risk_level": "low",
        "expected_outcome": "Restore database availability"
      },
      {
        "action": "enable_connection_pooling_fallback",
        "priority": 2,
        "automation_safe": true,
        "estimated_duration_seconds": 5,
        "risk_level": "minimal",
        "expected_outcome": "Reduce connection overhead"
      },
      {
        "action": "scale_memory_allocation",
        "priority": 3,
        "automation_safe": false,
        "estimated_duration_seconds": 300,
        "risk_level": "medium",
        "expected_outcome": "Prevent recurrence",
        "requires_approval": true
      }
    ],

    "recommended_investigation_steps": [
      "Analyze MySQL slow query log for memory-intensive operations",
      "Review application code changes in last 48 hours",
      "Check for unexpected traffic patterns",
      "Examine memory allocation trends over past week"
    ],

    "preventive_measures": [
      "Implement memory usage alerts at 80% threshold",
      "Configure query timeout limits",
      "Enable automatic query caching",
      "Schedule regular memory usage audits"
    ],

    "runbook_reference": "https://runbooks.example.com/oom-mysql-recovery",
    "oncall_escalation_required": true,
    "estimated_time_to_resolution_minutes": 15
  },

  "observability": {
    "traces": [
      {
        "trace_id": "trace_abc123def456",
        "span_id": "span_789ghi012jkl",
        "service": "user-service"
      }
    ],
    "metrics_snapshots": {
      "mysql_memory_usage_bytes": 8589934592,
      "mysql_connections_active": 487,
      "mysql_queries_per_second": 1247
    },
    "related_logs": [
      "/var/log/mysql/error.log:line:45678",
      "/var/log/syslog:line:123456"
    ]
  },

  "compliance": {
    "should_be_retained": true,
    "retention_days": 395,
    "contains_pii": false,
    "regulatory_requirements": ["SOC2", "ISO27001"]
  }
}

Este formato estruturado permite que agentes de IA:

  • Compreendam instantaneamente o tipo, severidade e impacto do evento sem parsing de texto
  • Identifiquem relacionamentos causais explícitos entre eventos através de IDs referenciados
  • Avaliem impacto no negócio através de métricas financeiras e de SLO
  • Tomem decisões informadas sobre automação segura versus escalonamento humano
  • Aprendam com eventos passados através de referências a eventos históricos similares

2. Sistema de Logging Agentic Implementado

Vamos implementar um sistema completo de logging otimizado para agentes de IA:

# agentic_logger.py - Sistema completo de logging estruturado
import json
import logging
import time
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field, asdict
from enum import Enum
from contextvars import ContextVar
import uuid
import psutil
import socket

# Context vars para rastreamento distribuído
request_id_ctx: ContextVar[str] = ContextVar('request_id', default='')
trace_id_ctx: ContextVar[str] = ContextVar('trace_id', default='')

class EventSeverity(Enum):
    DEBUG = ("debug", 1.0)
    INFO = ("info", 3.0)
    WARNING = ("warning", 5.0)
    ERROR = ("error", 7.0)
    CRITICAL = ("critical", 9.0)

    def __init__(self, name: str, score: float):
        self.severity_name = name
        self.score = score

class EventCategory(Enum):
    SYSTEM = "system"
    APPLICATION = "application"
    SECURITY = "security"
    NETWORK = "network"
    DATABASE = "database"
    PERFORMANCE = "performance"
    BUSINESS = "business"

@dataclass
class EventClassification:
    type: str
    category: EventCategory
    severity: EventSeverity
    is_user_facing: bool = False
    is_slo_impacting: bool = False
    requires_immediate_action: bool = False

@dataclass
class SystemSnapshot:
    timestamp: float
    memory_usage_percent: float
    cpu_usage_percent: float
    disk_usage_percent: float
    network_connections: int
    load_average: List[float]

@dataclass
class ImpactAnalysis:
    services_affected: List[str] = field(default_factory=list)
    estimated_affected_users: int = 0
    estimated_revenue_impact_usd_per_hour: float = 0.0
    slo_violations: List[Dict] = field(default_factory=list)

@dataclass
class AIGuidance:
    automated_diagnosis: str = ""
    recommended_immediate_actions: List[Dict] = field(default_factory=list)
    recommended_investigation_steps: List[str] = field(default_factory=list)
    preventive_measures: List[str] = field(default_factory=list)
    automation_safe: bool = False
    estimated_time_to_resolution_minutes: int = 0

class AgenticLogger:
    """Logger otimizado para consumo por agentes de IA"""

    def __init__(self, 
                 service_name: str,
                 environment: str = "production",
                 output_file: Optional[str] = None):
        self.service_name = service_name
        self.environment = environment
        self.hostname = socket.gethostname()
        self.output_file = output_file

        # Sistema de correlação de eventos
        self.event_correlation = EventCorrelationEngine()

        # Cache de contexto do sistema
        self._last_system_snapshot = None
        self._snapshot_cache_seconds = 5

    def log_event(self,
                  event_type: str,
                  classification: EventClassification,
                  message: str,
                  context: Optional[Dict] = None,
                  impact: Optional[ImpactAnalysis] = None,
                  ai_guidance: Optional[AIGuidance] = None) -> str:
        """
        Loga evento estruturado otimizado para agentes de IA

        Args:
            event_type: Tipo hierárquico do evento (ex: system.memory.oom_killer)
            classification: Classificação do evento
            message: Descrição human-readable
            context: Contexto adicional específico do evento
            impact: Análise de impacto
            ai_guidance: Orientações para agentes de IA

        Returns:
            event_id: ID único do evento
        """

        # Gera ID único do evento
        event_id = self._generate_event_id(event_type)
        timestamp = datetime.now(timezone.utc)

        # Captura snapshot do sistema
        system_state = self._capture_system_state()

        # Obtém contexto distribuído
        request_id = request_id_ctx.get()
        trace_id = trace_id_ctx.get()

        # Constrói evento estruturado
        event = {
            "event_id": event_id,
            "event_version": "2.0",
            "timestamp": timestamp.isoformat(),
            "timestamp_unix": timestamp.timestamp(),

            "event_classification": {
                "type": event_type,
                "category": classification.category.value,
                "severity": classification.severity.severity_name,
                "severity_score": classification.severity.score,
                "is_user_facing": classification.is_user_facing,
                "is_slo_impacting": classification.is_slo_impacting,
                "requires_immediate_action": classification.requires_immediate_action
            },

            "source": {
                "service": self.service_name,
                "host": self.hostname,
                "environment": self.environment,
                "region": self._get_region(),
                "cluster": self._get_cluster()
            },

            "message": message,

            "context": context or {},

            "system_state": system_state,

            "distributed_tracing": {
                "request_id": request_id,
                "trace_id": trace_id
            },

            "impact_analysis": asdict(impact) if impact else None,

            "ai_guidance": asdict(ai_guidance) if ai_guidance else None,

            "relationships": self.event_correlation.get_relationships(event_id, event_type)
        }

        # Registra evento no correlation engine
        self.event_correlation.register_event(event_id, event_type, timestamp)

        # Persiste evento
        self._persist_event(event)

        return event_id

    def _generate_event_id(self, event_type: str) -> str:
        """Gera ID único e descritivo para o evento"""
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        unique_id = str(uuid.uuid4())[:8]
        safe_type = event_type.replace(".", "_")
        return f"evt_{timestamp}_{self.hostname}_{safe_type}_{unique_id}"

    def _capture_system_state(self) -> Dict:
        """Captura snapshot do estado do sistema"""

        # Usa cache se recente
        current_time = time.time()
        if (self._last_system_snapshot and 
            current_time - self._last_system_snapshot['timestamp'] < self._snapshot_cache_seconds):
            return self._last_system_snapshot

        # Captura novo snapshot
        memory = psutil.virtual_memory()
        cpu_percent = psutil.cpu_percent(interval=0.1)
        disk = psutil.disk_usage('/')
        load_avg = psutil.getloadavg()
        net_connections = len(psutil.net_connections())

        snapshot = {
            "timestamp": current_time,
            "memory": {
                "total_bytes": memory.total,
                "available_bytes": memory.available,
                "used_bytes": memory.used,
                "usage_percent": memory.percent
            },
            "cpu": {
                "usage_percent": cpu_percent,
                "load_average_1min": load_avg[0],
                "load_average_5min": load_avg[1],
                "load_average_15min": load_avg[2],
                "core_count": psutil.cpu_count()
            },
            "disk": {
                "total_bytes": disk.total,
                "used_bytes": disk.used,
                "free_bytes": disk.free,
                "usage_percent": disk.percent
            },
            "network": {
                "active_connections": net_connections
            }
        }

        self._last_system_snapshot = snapshot
        return snapshot

    def _get_region(self) -> str:
        """Detecta região do cloud provider"""
        # Implementação simplificada - em produção consultaria metadata service
        return "us-east-1"

    def _get_cluster(self) -> str:
        """Detecta cluster/deployment group"""
        # Implementação simplificada
        return f"{self.service_name}-{self.environment}"

    def _persist_event(self, event: Dict):
        """Persiste evento estruturado"""

        # Serializa para JSON compacto
        event_json = json.dumps(event, separators=(',', ':'))

        # Escreve em arquivo se configurado
        if self.output_file:
            with open(self.output_file, 'a') as f:
                f.write(event_json + '\n')

        # Em produção, também enviaria para:
        # - Sistema de ingestão (Kafka, Kinesis)
        # - Storage de longo prazo (S3, GCS)
        # - Sistema de indexação (Elasticsearch, OpenSearch)
        # - Plataforma de observabilidade (Datadog, New Relic)

    # Métodos de conveniência para diferentes severidades

    def debug(self, event_type: str, message: str, **kwargs):
        """Log evento de debug"""
        classification = EventClassification(
            type=event_type,
            category=EventCategory.APPLICATION,
            severity=EventSeverity.DEBUG
        )
        return self.log_event(event_type, classification, message, **kwargs)

    def info(self, event_type: str, message: str, **kwargs):
        """Log evento informativo"""
        classification = EventClassification(
            type=event_type,
            category=EventCategory.APPLICATION,
            severity=EventSeverity.INFO
        )
        return self.log_event(event_type, classification, message, **kwargs)

    def warning(self, event_type: str, message: str, **kwargs):
        """Log evento de warning"""
        classification = EventClassification(
            type=event_type,
            category=EventCategory.APPLICATION,
            severity=EventSeverity.WARNING
        )
        return self.log_event(event_type, classification, message, **kwargs)

    def error(self, event_type: str, message: str, **kwargs):
        """Log evento de erro"""
        classification = EventClassification(
            type=event_type,
            category=EventCategory.APPLICATION,
            severity=EventSeverity.ERROR,
            is_user_facing=kwargs.pop('is_user_facing', True)
        )
        return self.log_event(event_type, classification, message, **kwargs)

    def critical(self, event_type: str, message: str, **kwargs):
        """Log evento crítico"""
        classification = EventClassification(
            type=event_type,
            category=EventCategory.SYSTEM,
            severity=EventSeverity.CRITICAL,
            is_user_facing=kwargs.pop('is_user_facing', True),
            is_slo_impacting=kwargs.pop('is_slo_impacting', True),
            requires_immediate_action=True
        )
        return self.log_event(event_type, classification, message, **kwargs)


class EventCorrelationEngine:
    """Motor de correlação de eventos para estabelecer relacionamentos"""

    def __init__(self):
        self.recent_events = []
        self.max_recent_events = 1000
        self.correlation_window_seconds = 300  # 5 minutos

        # Padrões de causalidade conhecidos
        self.causal_patterns = self._load_causal_patterns()

    def register_event(self, event_id: str, event_type: str, timestamp: datetime):
        """Registra evento para correlação"""
        self.recent_events.append({
            'event_id': event_id,
            'event_type': event_type,
            'timestamp': timestamp
        })

        # Limpa eventos antigos
        cutoff_time = datetime.now(timezone.utc).timestamp() - self.correlation_window_seconds
        self.recent_events = [
            e for e in self.recent_events 
            if e['timestamp'].timestamp() > cutoff_time
        ][-self.max_recent_events:]

    def get_relationships(self, event_id: str, event_type: str) -> Dict:
        """Determina relacionamentos do evento com outros eventos"""
        relationships = {
            "caused_by_events": [],
            "causes_events": [],
            "correlated_events": [],
            "similar_historical_events": []
        }

        # Procura causas conhecidas
        for pattern in self.causal_patterns:
            if pattern['effect'] == event_type:
                # Procura eventos causais recentes
                for recent in self.recent_events:
                    if recent['event_type'] == pattern['cause']:
                        time_diff = (
                            datetime.now(timezone.utc).timestamp() - 
                            recent['timestamp'].timestamp()
                        )
                        if time_diff <= pattern['max_delay_seconds']:
                            relationships['caused_by_events'].append({
                                'event_id': recent['event_id'],
                                'confidence': pattern['confidence'],
                                'time_difference_seconds': time_diff
                            })

        return relationships

    def _load_causal_patterns(self) -> List[Dict]:
        """Carrega padrões de causalidade conhecidos"""
        return [
            {
                'cause': 'system.memory.oom_killer.triggered',
                'effect': 'database.process.terminated',
                'max_delay_seconds': 2,
                'confidence': 0.98
            },
            {
                'cause': 'database.process.terminated',
                'effect': 'application.database.connection_failed',
                'max_delay_seconds': 5,
                'confidence': 0.95
            },
            {
                'cause': 'application.database.connection_failed',
                'effect': 'http.response.error_502',
                'max_delay_seconds': 1,
                'confidence': 0.92
            },
            {
                'cause': 'disk.space.critical',
                'effect': 'database.write.failed',
                'max_delay_seconds': 60,
                'confidence': 0.88
            },
            {
                'cause': 'network.interface.down',
                'effect': 'service.health_check.failed',
                'max_delay_seconds': 10,
                'confidence': 0.96
            }
        ]

3. Exemplo de Uso do Sistema Agentic

# exemplo_uso_agentic_logger.py
from agentic_logger import (
    AgenticLogger, 
    EventClassification, 
    EventCategory, 
    EventSeverity,
    ImpactAnalysis,
    AIGuidance
)

# Inicializa logger
logger = AgenticLogger(
    service_name="api-gateway",
    environment="production",
    output_file="/var/log/agentic/events.jsonl"
)

# Exemplo 1: Log de evento crítico com análise completa de impacto
def handle_oom_event():
    impact = ImpactAnalysis(
        services_affected=["api-gateway", "user-service", "checkout-service"],
        estimated_affected_users=1247,
        estimated_revenue_impact_usd_per_hour=15000.0,
        slo_violations=[
            {
                "slo_name": "api_availability_99_95",
                "current_value": 98.23,
                "threshold": 99.95,
                "breach_magnitude": 1.72
            }
        ]
    )

    ai_guidance = AIGuidance(
        automated_diagnosis="MySQL process terminated by OOM killer due to memory exhaustion. Cascading failures in dependent services.",
        recommended_immediate_actions=[
            {
                "action": "restart_mysql_service",
                "priority": 1,
                "automation_safe": True,
                "estimated_duration_seconds": 30,
                "command": "systemctl restart mysql"
            },
            {
                "action": "scale_memory_allocation",
                "priority": 2,
                "automation_safe": False,
                "requires_approval": True
            }
        ],
        recommended_investigation_steps=[
            "Check MySQL slow query log",
            "Review recent application deployments",
            "Analyze memory usage trends"
        ],
        preventive_measures=[
            "Implement memory alerts at 80% threshold",
            "Enable query result caching",
            "Configure connection pooling limits"
        ],
        automation_safe=False,
        estimated_time_to_resolution_minutes=15
    )

    event_id = logger.critical(
        event_type="system.memory.oom_killer.process_terminated",
        message="OOM killer terminated MySQL process (PID 9876) due to memory exhaustion",
        context={
            "process": {
                "name": "mysqld",
                "pid": 9876,
                "memory_used_bytes": 8589934592,
                "oom_score": 850
            }
        },
        impact=impact,
        ai_guidance=ai_guidance,
        is_user_facing=True,
        is_slo_impacting=True
    )

    print(f"Event logged: {event_id}")
    return event_id

# Exemplo 2: Log de requisição HTTP com contexto rico
def log_http_request(request_data):
    from contextvars import copy_context

    # Define request_id para rastreamento distribuído
    request_id_ctx.set(request_data['request_id'])
    trace_id_ctx.set(request_data['trace_id'])

    classification = EventClassification(
        type="http.request.completed",
        category=EventCategory.APPLICATION,
        severity=EventSeverity.INFO if request_data['status_code'] < 400 else EventSeverity.ERROR,
        is_user_facing=True,
        is_slo_impacting=request_data['status_code'] >= 500
    )

    context = {
        "http": {
            "method": request_data['method'],
            "path": request_data['path'],
            "status_code": request_data['status_code'],
            "response_time_ms": request_data['response_time_ms'],
            "user_agent": request_data['user_agent'],
            "client_ip": request_data['client_ip']
        },
        "performance": {
            "database_query_time_ms": request_data.get('db_time_ms', 0),
            "external_api_time_ms": request_data.get('api_time_ms', 0),
            "cache_hit": request_data.get('cache_hit', False)
        }
    }

    # Adiciona orientação AI para requisições lentas
    ai_guidance = None
    if request_data['response_time_ms'] > 1000:
        ai_guidance = AIGuidance(
            automated_diagnosis=f"Request exceeded latency SLO (1000ms). Actual: {request_data['response_time_ms']}ms",
            recommended_investigation_steps=[
                "Check database query performance",
                "Review external API response times",
                "Analyze cache hit rate"
            ],
            automation_safe=True
        )

    logger.log_event(
        event_type="http.request.completed",
        classification=classification,
        message=f"{request_data['method']} {request_data['path']} - {request_data['status_code']}",
        context=context,
        ai_guidance=ai_guidance
    )

# Exemplo 3: Log de deploy com métricas de sucesso
def log_deployment_event(deployment_info):
    classification = EventClassification(
        type="deployment.completed",
        category=EventCategory.APPLICATION,
        severity=EventSeverity.INFO,
        is_slo_impacting=False
    )

    ai_guidance = AIGuidance(
        automated_diagnosis="Deployment completed successfully. Monitoring for anomalies.",
        recommended_investigation_steps=[
            "Monitor error rates for next 30 minutes",
            "Compare performance metrics with baseline",
            "Verify canary deployment health"
        ],
        automation_safe=True
    )

    logger.log_event(
        event_type="deployment.completed",
        classification=classification,
        message=f"Deployed version {deployment_info['version']} to {deployment_info['environment']}",
        context={
            "deployment": {
                "version": deployment_info['version'],
                "environment": deployment_info['environment'],
                "strategy": deployment_info['strategy'],
                "duration_seconds": deployment_info['duration'],
                "deployed_by": deployment_info['user']
            }
        },
        ai_guidance=ai_guidance
    )

# Executa exemplos
if __name__ == "__main__":
    # Simula evento OOM
    handle_oom_event()

    # Simula requisição HTTP
    log_http_request({
        'request_id': 'req_abc123',
        'trace_id': 'trace_xyz789',
        'method': 'GET',
        'path': '/api/users/1234',
        'status_code': 200,
        'response_time_ms': 145,
        'user_agent': 'Mozilla/5.0',
        'client_ip': '192.168.1.100',
        'db_time_ms': 78,
        'api_time_ms': 32,
        'cache_hit': True
    })

    # Simula deployment
    log_deployment_event({
        'version': 'v2.5.3',
        'environment': 'production',
        'strategy': 'blue-green',
        'duration': 180,
        'user': 'deploy-bot'
    })

Integração com Agentes de IA

Agente de Análise de Eventos

# event_analysis_agent.py - Agente de IA para análise de eventos
import json
from typing import List, Dict
from datetime import datetime, timedelta
import openai

class EventAnalysisAgent:
    """Agente de IA para análise inteligente de eventos estruturados"""

    def __init__(self, events_file: str, openai_api_key: str):
        self.events_file = events_file
        openai.api_key = openai_api_key
        self.event_buffer = []

    def load_recent_events(self, minutes: int = 30) -> List[Dict]:
        """Carrega eventos recentes do arquivo de log"""
        cutoff_time = datetime.now().timestamp() - (minutes * 60)
        events = []

        with open(self.events_file, 'r') as f:
            for line in f:
                try:
                    event = json.loads(line)
                    if event['timestamp_unix'] > cutoff_time:
                        events.append(event)
                except json.JSONDecodeError:
                    continue

        return events

    def analyze_incident_pattern(self, events: List[Dict]) -> Dict:
        """Analisa padrões de incidente em eventos recentes"""

        # Agrupa eventos por tipo e severidade
        event_summary = {}
        critical_events = []
        error_cascade = []

        for event in events:
            event_type = event['event_classification']['type']
            severity = event['event_classification']['severity']

            if event_type not in event_summary:
                event_summary[event_type] = {'count': 0, 'severities': []}

            event_summary[event_type]['count'] += 1
            event_summary[event_type]['severities'].append(severity)

            if severity == 'critical':
                critical_events.append(event)

            # Detecta cascata de erros
            if event.get('relationships', {}).get('caused_by_events'):
                error_cascade.append(event)

        analysis = {
            'total_events': len(events),
            'event_types': len(event_summary),
            'critical_count': len(critical_events),
            'cascade_detected': len(error_cascade) > 0,
            'top_event_types': sorted(
                event_summary.items(),
                key=lambda x: x[1]['count'],
                reverse=True
            )[:5]
        }

        # Se há eventos críticos, analisa com LLM
        if critical_events:
            analysis['llm_diagnosis'] = self._get_llm_diagnosis(critical_events)

        return analysis

    def _get_llm_diagnosis(self, critical_events: List[Dict]) -> str:
        """Usa LLM para diagnóstico avançado"""

        # Prepara contexto para o LLM
        events_context = json.dumps([
            {
                'type': e['event_classification']['type'],
                'message': e['message'],
                'impact': e.get('impact_analysis'),
                'ai_guidance': e.get('ai_guidance')
            }
            for e in critical_events[:5]  # Limita a 5 eventos mais recentes
        ], indent=2)

        prompt = f"""Analise os seguintes eventos críticos de infraestrutura e forneça:
1. Diagnóstico da causa raiz
2. Impacto potencial no sistema
3. Ações recomendadas priorizadas
4. Previsão de recorrência

Eventos:
{events_context}

Forneça análise técnica detalhada focada em ações práticas para SRE/DevOps."""

        try:
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": "Você é um especialista SRE analisando eventos de infraestrutura."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=1000
            )

            return response.choices[0].message.content
        except Exception as e:
            return f"Erro ao obter diagnóstico LLM: {str(e)}"

    def detect_anomalies(self, events: List[Dict]) -> List[Dict]:
        """Detecta anomalias em padrões de eventos"""
        anomalies = []

        # Analisa frequência de eventos por tipo
        event_counts = {}
        time_buckets = {}

        for event in events:
            event_type = event['event_classification']['type']
            timestamp = event['timestamp_unix']

            # Conta por tipo
            event_counts[event_type] = event_counts.get(event_type, 0) + 1

            # Agrupa em buckets de 5 minutos
            bucket = int(timestamp // 300) * 300
            if bucket not in time_buckets:
                time_buckets[bucket] = {}
            time_buckets[bucket][event_type] = time_buckets[bucket].get(event_type, 0) + 1

        # Detecta picos anormais
        for event_type, total_count in event_counts.items():
            avg_per_bucket = total_count / max(len(time_buckets), 1)

            for bucket, counts in time_buckets.items():
                count = counts.get(event_type, 0)
                if count > avg_per_bucket * 3:  # 3x acima da média
                    anomalies.append({
                        'type': 'event_spike',
                        'event_type': event_type,
                        'timestamp': bucket,
                        'count': count,
                        'baseline': avg_per_bucket,
                        'severity': 'high' if count > avg_per_bucket * 5 else 'medium'
                    })

        return anomalies

    def generate_incident_report(self, events: List[Dict]) -> str:
        """Gera relatório de incidente detalhado"""

        analysis = self.analyze_incident_pattern(events)
        anomalies = self.detect_anomalies(events)

        report = f"""
# Incident Analysis Report
Generated: {datetime.now().isoformat()}

## Summary
- Total Events: {analysis['total_events']}
- Critical Events: {analysis['critical_count']}
- Unique Event Types: {analysis['event_types']}
- Cascade Detected: {analysis['cascade_detected']}

## Top Event Types
"""

        for event_type, data in analysis['top_event_types']:
            report += f"\n- {event_type}: {data['count']} occurrences\n"

        if anomalies:
            report += "\n## Detected Anomalies\n"
            for anomaly in anomalies:
                report += f"\n- {anomaly['type']}: {anomaly['event_type']}\n"
                report += f"  Count: {anomaly['count']} (baseline: {anomaly['baseline']:.1f})\n"
                report += f"  Severity: {anomaly['severity']}\n"

        if 'llm_diagnosis' in analysis:
            report += f"\n## AI Diagnosis\n{analysis['llm_diagnosis']}\n"

        return report

Implementação de Storage Otimizado para Queries de IA

# agentic_event_storage.py - Storage otimizado para queries de agentes
import sqlite3
import json
from typing import List, Dict, Optional
from datetime import datetime, timedelta

class AgenticEventStorage:
    """Storage otimizado para queries eficientes por agentes de IA"""

    def __init__(self, db_path: str = "/var/lib/agentic/events.db"):
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        """Inicializa schema otimizado para queries de IA"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Tabela principal de eventos
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS events (
                event_id TEXT PRIMARY KEY,
                timestamp REAL NOT NULL,
                event_type TEXT NOT NULL,
                category TEXT NOT NULL,
                severity TEXT NOT NULL,
                severity_score REAL NOT NULL,
                is_user_facing BOOLEAN,
                is_slo_impacting BOOLEAN,
                service TEXT NOT NULL,
                host TEXT NOT NULL,
                environment TEXT NOT NULL,
                message TEXT,
                full_event JSON NOT NULL,
                indexed_at REAL DEFAULT (strftime('%s', 'now'))
            )
        """)

        # Índices otimizados para queries comuns de agentes
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON events(timestamp DESC)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_type ON events(event_type)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_severity ON events(severity_score DESC)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_service ON events(service, timestamp DESC)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_slo_impact ON events(is_slo_impacting, timestamp DESC)")

        # Tabela de relacionamentos para grafos
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS event_relationships (
                from_event_id TEXT NOT NULL,
                to_event_id TEXT NOT NULL,
                relationship_type TEXT NOT NULL,
                confidence REAL,
                PRIMARY KEY (from_event_id, to_event_id, relationship_type),
                FOREIGN KEY (from_event_id) REFERENCES events(event_id),
                FOREIGN KEY (to_event_id) REFERENCES events(event_id)
            )
        """)

        cursor.execute("CREATE INDEX IF NOT EXISTS idx_from_event ON event_relationships(from_event_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_to_event ON event_relationships(to_event_id)")

        # Tabela de métricas agregadas para análise temporal
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS metrics_timeseries (
                timestamp REAL NOT NULL,
                service TEXT NOT NULL,
                metric_name TEXT NOT NULL,
                metric_value REAL NOT NULL,
                metadata JSON,
                PRIMARY KEY (timestamp, service, metric_name)
            )
        """)

        cursor.execute("CREATE INDEX IF NOT EXISTS idx_metrics_ts ON metrics_timeseries(service, metric_name, timestamp DESC)")

        conn.commit()
        conn.close()

    def store_event(self, event: Dict):
        """Armazena evento estruturado"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            INSERT OR REPLACE INTO events (
                event_id, timestamp, event_type, category, severity,
                severity_score, is_user_facing, is_slo_impacting,
                service, host, environment, message, full_event
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            event['event_id'],
            event['timestamp_unix'],
            event['event_classification']['type'],
            event['event_classification']['category'],
            event['event_classification']['severity'],
            event['event_classification']['severity_score'],
            event['event_classification']['is_user_facing'],
            event['event_classification']['is_slo_impacting'],
            event['source']['service'],
            event['source']['host'],
            event['source']['environment'],
            event.get('message', ''),
            json.dumps(event)
        ))

        # Armazena relacionamentos
        if 'relationships' in event:
            for caused_by in event['relationships'].get('caused_by_events', []):
                cursor.execute("""
                    INSERT OR IGNORE INTO event_relationships
                    (from_event_id, to_event_id, relationship_type, confidence)
                    VALUES (?, ?, 'caused_by', ?)
                """, (caused_by['event_id'], event['event_id'], caused_by.get('confidence', 1.0)))

        conn.commit()
        conn.close()

    def query_by_timerange(self,
                          start_time: datetime,
                          end_time: Optional[datetime] = None,
                          min_severity_score: float = 0.0) -> List[Dict]:
        """Query otimizada por intervalo de tempo"""
        if end_time is None:
            end_time = datetime.now()

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT full_event FROM events
            WHERE timestamp >= ? AND timestamp <= ?
            AND severity_score >= ?
            ORDER BY timestamp DESC
        """, (start_time.timestamp(), end_time.timestamp(), min_severity_score))

        events = [json.loads(row[0]) for row in cursor.fetchall()]
        conn.close()

        return events

    def query_by_service(self,
                        service: str,
                        hours: int = 24,
                        include_related: bool = False) -> List[Dict]:
        """Query eventos de um serviço específico"""
        cutoff_time = datetime.now().timestamp() - (hours * 3600)

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT full_event FROM events
            WHERE service = ? AND timestamp >= ?
            ORDER BY timestamp DESC
        """, (service, cutoff_time))

        events = [json.loads(row[0]) for row in cursor.fetchall()]

        if include_related and events:
            # Busca eventos relacionados
            event_ids = [e['event_id'] for e in events]
            placeholders = ','.join(['?' for _ in event_ids])

            cursor.execute(f"""
                SELECT DISTINCT e.full_event
                FROM events e
                JOIN event_relationships r ON e.event_id = r.from_event_id
                WHERE r.to_event_id IN ({placeholders})
            """, event_ids)

            related = [json.loads(row[0]) for row in cursor.fetchall()]
            events.extend(related)

        conn.close()
        return events

    def get_event_graph(self, event_id: str, depth: int = 3) -> Dict:
        """Obtém grafo de relacionamentos de um evento"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        nodes = {}
        edges = []
        visited = set()
        to_explore = [(event_id, 0)]

        while to_explore:
            current_id, current_depth = to_explore.pop(0)

            if current_id in visited or current_depth > depth:
                continue

            visited.add(current_id)

            # Busca evento
            cursor.execute("SELECT full_event FROM events WHERE event_id = ?", (current_id,))
            row = cursor.fetchone()

            if row:
                event = json.loads(row[0])
                nodes[current_id] = {
                    'event_type': event['event_classification']['type'],
                    'severity': event['event_classification']['severity'],
                    'timestamp': event['timestamp'],
                    'message': event.get('message', '')
                }

                # Busca relacionamentos
                cursor.execute("""
                    SELECT to_event_id, relationship_type, confidence
                    FROM event_relationships
                    WHERE from_event_id = ?
                """, (current_id,))

                for to_id, rel_type, confidence in cursor.fetchall():
                    edges.append({
                        'from': current_id,
                        'to': to_id,
                        'type': rel_type,
                        'confidence': confidence
                    })

                    if current_depth < depth:
                        to_explore.append((to_id, current_depth + 1))

        conn.close()

        return {
            'nodes': nodes,
            'edges': edges,
            'root_event': event_id
        }

    def aggregate_metrics(self,
                         service: str,
                         metric_name: str,
                         start_time: datetime,
                         end_time: datetime,
                         granularity_seconds: int = 300) -> List[Dict]:
        """Agrega métricas para análise temporal"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT 
                CAST(timestamp / ? AS INTEGER) * ? as bucket,
                AVG(metric_value) as avg_value,
                MIN(metric_value) as min_value,
                MAX(metric_value) as max_value,
                COUNT(*) as sample_count
            FROM metrics_timeseries
            WHERE service = ? 
            AND metric_name = ?
            AND timestamp >= ?
            AND timestamp <= ?
            GROUP BY bucket
            ORDER BY bucket
        """, (
            granularity_seconds, 
            granularity_seconds,
            service,
            metric_name,
            start_time.timestamp(),
            end_time.timestamp()
        ))

        results = []
        for row in cursor.fetchall():
            results.append({
                'timestamp': row[0],
                'avg': row[1],
                'min': row[2],
                'max': row[3],
                'samples': row[4]
            })

        conn.close()
        return results

Melhores Práticas para Logs Agentic

1. Taxonomia Consistente de Eventos

Estabeleça uma taxonomia hierárquica clara:

# event_taxonomy.yaml - Taxonomia de eventos padronizada
system:
  memory:
    - oom_killer.triggered
    - oom_killer.process_terminated
    - allocation.failed
    - swap.activated

  cpu:
    - load.high
    - load.critical
    - throttling.detected

  disk:
    - space.warning
    - space.critical
    - io.slow
    - io.failed

application:
  database:
    - connection.failed
    - connection.timeout
    - query.slow
    - query.failed
    - replication.lag

  http:
    - request.completed
    - request.failed
    - response.slow

  cache:
    - hit
    - miss
    - eviction
    - expired

network:
  connectivity:
    - interface.down
    - packet.loss
    - latency.high

  security:
    - firewall.blocked
    - ddos.detected

business:
  transaction:
    - completed
    - failed
    - refunded

  user:
    - signup
    - login
    - logout
    - subscription.changed

2. Níveis de Severidade Padronizados

# Mapeamento de severidade para ações
SEVERITY_ACTIONS = {
    'debug': {
        'alert': False,
        'page': False,
        'auto_remediate': False,
        'retention_days': 7
    },
    'info': {
        'alert': False,
        'page': False,
        'auto_remediate': False,
        'retention_days': 30
    },
    'warning': {
        'alert': True,
        'page': False,
        'auto_remediate': True,
        'retention_days': 90
    },
    'error': {
        'alert': True,
        'page': False,
        'auto_remediate': True,
        'retention_days': 180
    },
    'critical': {
        'alert': True,
        'page': True,
        'auto_remediate': False,  # Requer aprovação humana
        'retention_days': 365
    }
}

3. Contexto de Negócio Sempre Presente

# Enriquecimento com contexto de negócio
BUSINESS_CONTEXT = {
    'api-gateway': {
        'revenue_per_hour': 10000,
        'sla_tier': 'tier1',
        'oncall_team': 'platform-team'
    },
    'checkout-service': {
        'revenue_per_hour': 50000,
        'sla_tier': 'tier0',
        'oncall_team': 'checkout-team'
    }
}

def enrich_with_business_context(event: Dict, service: str) -> Dict:
    """Adiciona contexto de negócio ao evento"""
    context = BUSINESS_CONTEXT.get(service, {})

    if 'impact_analysis' not in event:
        event['impact_analysis'] = {}

    event['impact_analysis']['revenue_per_hour'] = context.get('revenue_per_hour', 0)
    event['impact_analysis']['sla_tier'] = context.get('sla_tier', 'tier3')
    event['oncall_team'] = context.get('oncall_team', 'platform-team')

    return event

A transição para agentic infrastructure representa uma mudança paradigmática na forma como projetamos e operamos sistemas de infraestrutura modernos. Ao estruturar logs, métricas e eventos especificamente para consumo por agentes de inteligência artificial, desbloqueamos capacidades completamente novas de automação, análise preditiva e auto-remediação que eram impraticáveis com abordagens tradicionais baseadas em texto livre.

Os benefícios dessa transformação são substanciais e mensuráveis: agentes de IA podem processar e correlacionar milhões de eventos em milissegundos, identificar padrões causais complexos que escapariam à análise humana, prever falhas antes que ocorram e tomar ações corretivas automaticamente quando seguro. Mais importante, essa abordagem libera engenheiros de infraestrutura para focarem em trabalho estratégico e criativo, enquanto sistemas autônomos gerenciam operações rotineiras.

No entanto, a implementação bem-sucedida de agentic infrastructure exige disciplina arquitetural rigorosa: taxonomias de eventos consistentes, estruturação semântica completa, enriquecimento com contexto de negócio e storage otimizado para queries complexas. Organizações que investirem nessa transformação agora estarão posicionadas para aproveitar a próxima geração de ferramentas de observabilidade e automação baseadas em IA que dominarão a infraestrutura em 2026 e além.

Aqui no Portal do Linux, continuaremos explorando e compartilhando experiências práticas sobre como implementar e operar esses sistemas agentic em ambientes Linux, Kubernetes e infraestrutura cloud moderna. O futuro da infraestrutura não está apenas em coletar mais dados, mas em estruturá-los de forma que máquinas inteligentes possam compreender, raciocinar e agir autonomamente.