Skip to content

Wide Events

Вместо десятков строк логов на каждый запрос — один JSON-объект с полным контекстом. Идея из loggingsucks.com.

❌ Традиционно:
INFO JWT validated user_id=abc123
DEBUG DB query duration_ms=47
WARN Subscription expires in 3 days user_id=abc123
INFO Request completed status=200 duration_ms=89
✅ Wide Event:
{
"request_id": "req_8f7a",
"path": "/api/v1/me",
"status": 200,
"duration_ms": 89,
"user": { "id": "abc123", "plan": "pro", "subscription_expires_in_days": 3 },
"db": { "queries": 2, "duration_ms": 47 },
"outcome": "success"
}

Одно событие. Всё что нужно для дебага — уже внутри.


Структура Wide Event для этого проекта

Section titled “Структура Wide Event для этого проекта”
internal/observability/event.go
type WideEvent struct {
// --- Запрос ---
RequestID string `json:"request_id"`
TraceID string `json:"trace_id,omitempty"`
Timestamp time.Time `json:"timestamp"`
Service string `json:"service"`
Version string `json:"version"`
// --- HTTP ---
Method string `json:"method,omitempty"`
Path string `json:"path,omitempty"`
Status int `json:"status,omitempty"`
DurationMS int64 `json:"duration_ms,omitempty"`
IP string `json:"ip,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
// --- Пользователь (бизнес-контекст) ---
User *UserContext `json:"user,omitempty"`
// --- База данных ---
DB *DBContext `json:"db,omitempty"`
// --- Xray / Нода ---
Node *NodeContext `json:"node,omitempty"`
// --- Воркер / Фоновая задача ---
Job *JobContext `json:"job,omitempty"`
// --- Ошибка ---
Error *ErrorContext `json:"error,omitempty"`
// --- Итог ---
Outcome string `json:"outcome"` // "success" | "error" | "partial"
// --- Произвольные поля (для обогащения в хендлерах) ---
Extra map[string]any `json:"extra,omitempty"`
}
type UserContext struct {
ID string `json:"id"`
Plan string `json:"plan"` // "free", "pro", "unlimited"
SubscriptionStatus string `json:"subscription_status"` // "active", "expired", ...
SubscriptionExpiresIn int `json:"subscription_expires_in_days,omitempty"`
TrafficUsedGB float64 `json:"traffic_used_gb,omitempty"`
TrafficLimitGB float64 `json:"traffic_limit_gb,omitempty"`
}
type DBContext struct {
Queries int `json:"queries"`
DurationMS int64 `json:"duration_ms"`
SlowQuery bool `json:"slow_query,omitempty"` // > 200ms
}
type NodeContext struct {
ID string `json:"id"`
Name string `json:"name"`
Country string `json:"country"`
Action string `json:"action"` // "add_client", "remove_client", "get_stats"
DurationMS int64 `json:"duration_ms"`
}
type JobContext struct {
ID string `json:"id"`
Type string `json:"type"` // "sync_user", "collect_traffic", ...
Attempt int `json:"attempt"`
DurationMS int64 `json:"duration_ms"`
}
type ErrorContext struct {
Type string `json:"type"`
Code string `json:"code"`
Message string `json:"message"`
}

internal/observability/middleware.go
func WideEventMiddleware(cfg *config.Config) fiber.Handler {
return func(c fiber.Ctx) error {
start := time.Now()
event := &WideEvent{
RequestID: c.Get("X-Request-ID", uuid.New().String()),
Timestamp: start,
Service: "astral-api",
Version: cfg.Version,
Method: c.Method(),
Path: c.Path(),
IP: c.IP(),
UserAgent: c.Get("User-Agent"),
}
// Кладём в контекст Fiber — хендлеры будут обогащать событие
c.Locals("wide_event", event)
err := c.Next()
// Заполняем после обработки запроса
event.DurationMS = time.Since(start).Milliseconds()
event.Status = c.Response().StatusCode()
if err != nil {
event.Outcome = "error"
event.Error = &ErrorContext{
Type: reflect.TypeOf(err).Name(),
Message: err.Error(),
}
} else {
event.Outcome = "success"
}
if shouldSample(event) {
emit(event)
}
return err
}
}
// Достать событие из контекста в хендлере
func EventFromCtx(c fiber.Ctx) *WideEvent {
e, _ := c.Locals("wide_event").(*WideEvent)
return e
}

Обогащение в хендлерах

Section titled “Обогащение в хендлерах”
internal/handler/subscription.go
func (h *Handler) GetSubscription(c fiber.Ctx) error {
event := observability.EventFromCtx(c)
user := c.Locals("user").(*domain.User)
sub, err := h.subService.GetActive(user.ID)
if err != nil {
return err
}
// Обогащаем событие бизнес-контекстом
event.User = &observability.UserContext{
ID: user.ID,
Plan: sub.Plan.Name,
SubscriptionStatus: string(sub.Status),
SubscriptionExpiresIn: int(time.Until(sub.ExpiresAt).Hours() / 24),
TrafficUsedGB: float64(sub.TrafficUsedBytes) / 1e9,
TrafficLimitGB: float64(sub.TrafficLimitBytes) / 1e9,
}
return c.JSON(sub)
}
internal/worker/sync_user.go
func (w *Worker) SyncUser(ctx context.Context, task *asynq.Task) error {
start := time.Now()
var payload SyncUserPayload
json.Unmarshal(task.Payload(), &payload)
event := &observability.WideEvent{
RequestID: uuid.New().String(),
Timestamp: start,
Service: "astral-worker",
Job: &observability.JobContext{
ID: task.ResultWriter().TaskID(),
Type: "sync_user",
Attempt: task.Retried() + 1,
},
}
defer func() {
event.DurationMS = time.Since(start).Milliseconds()
observability.Emit(event)
}()
// ... логика синхронизации ...
event.User = &observability.UserContext{ID: payload.UserID}
event.Node = &observability.NodeContext{
ID: node.ID,
Name: node.Name,
Country: node.Country,
Action: "add_client",
}
event.Outcome = "success"
return nil
}

Не сохраняем всё подряд — только то, что имеет ценность:

internal/observability/sampling.go
func shouldSample(e *WideEvent) bool {
// Всегда сохраняем ошибки
if e.Outcome == "error" || e.Status >= 500 {
return true
}
// Всегда сохраняем медленные запросы (> 1s)
if e.DurationMS > 1000 {
return true
}
// Всегда сохраняем события синхронизации с нодами
if e.Node != nil {
return true
}
// Всегда сохраняем фоновые задачи
if e.Job != nil {
return true
}
// Пользователи у которых подписка истекает скоро
if e.User != nil && e.User.SubscriptionExpiresIn <= 3 {
return true
}
// Медленные DB-запросы
if e.DB != nil && e.DB.SlowQuery {
return true
}
// Остальное — 10% случайно
return rand.Float32() < 0.10
}

Куда отправлять события

Section titled “Куда отправлять события”

Вариант A — stdout → файл → Vector → ClickHouse (рекомендуется)

Section titled “Вариант A — stdout → файл → Vector → ClickHouse (рекомендуется)”
internal/observability/emit.go
var logger = zap.New(zapcore.NewCore(
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
os.Stdout, // Docker читает stdout
zap.InfoLevel,
))
func Emit(e *WideEvent) {
logger.Info("wide_event", zap.Any("event", e))
}

Vector (легковесный агент) читает stdout контейнера и отправляет в ClickHouse. ClickHouse идеален для high-cardinality данных — запросы по user_id, plan, node_id работают мгновенно даже на сотнях миллионов событий.

# vector.toml (на сервере)
[sources.docker_logs]
type = "docker_logs"
include_containers = ["astral-api", "astral-worker"]
[sinks.clickhouse]
type = "clickhouse"
inputs = ["docker_logs"]
endpoint = "http://clickhouse:8123"
table = "events"

Вариант B — только stdout (на старте)

Section titled “Вариант B — только stdout (на старте)”

Если ClickHouse избыточно на начальном этапе — просто пишем JSON в stdout. Потом можно подключить любой бэкенд без изменения кода.


Запросы которые становятся возможными

Section titled “Запросы которые становятся возможными”
-- Все ошибки у пользователей с планом "pro" за последний час
SELECT * FROM events
WHERE user.plan = 'pro'
AND outcome = 'error'
AND timestamp > now() - INTERVAL 1 HOUR
-- Медленные синхронизации с конкретной нодой
SELECT * FROM events
WHERE node.id = 'uuid'
AND node.action = 'add_client'
AND duration_ms > 500
-- Пользователи которые исчерпали > 90% трафика
SELECT user.id, user.traffic_used_gb, user.traffic_limit_gb
FROM events
WHERE user.traffic_used_gb / user.traffic_limit_gb > 0.9
AND outcome = 'success'

Что заменяет, что остаётся

Section titled “Что заменяет, что остаётся”
До После
zap.Info("Request started ...") Убираем
zap.Debug("DB query ...") Убираем
zap.Warn("Slow query ...") Поле db.slow_query = true в событии
zap.Error("Sync failed ...") Поле error + outcome = "error" в событии
Prometheus метрики Можно агрегировать из событий в ClickHouse
zap.Fatal(...) Оставляем — для критических ошибок старта (DB недоступна и т.д.)