Raven (compact) usage

How it works

Standard Raven workflow looks like this:

  1. You send AI model logs to Raven over HTTP
  2. If one or more individual logs are suspicious or unusual, alerts for them are triggered immediately
  3. Logs are aggregated into time-based batches (default is 1 minute), keyed by model ID
  4. If one or more aggregated batches are suspicious or unusual, alerts for them are triggered (including short-term data drift alerts)
  5. Batches are saved into ClickHouse and are available for further analysis
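Step 3 above can be sketched in a few lines of Python. This is not Raven's actual implementation, just an illustration of time-based batching keyed by model ID, with a 1-minute window:

```python
from collections import defaultdict
from datetime import datetime, timezone

def batch_key(log, window_seconds=60):
    # Truncate the log timestamp to the start of its window,
    # and key the batch by (modelId, window start).
    epoch = int(log["timestamp"].timestamp())
    window_start = epoch // window_seconds * window_seconds
    return (log["modelId"], window_start)

def aggregate(logs, window_seconds=60):
    batches = defaultdict(list)
    for log in logs:
        batches[batch_key(log, window_seconds)].append(log)
    return batches

# Two logs fall into the 10:00-10:01 window, one into 10:01-10:02.
logs = [
    {"modelId": "model1", "timestamp": datetime(2025, 9, 1, 10, 0, 10, tzinfo=timezone.utc)},
    {"modelId": "model1", "timestamp": datetime(2025, 9, 1, 10, 0, 50, tzinfo=timezone.utc)},
    {"modelId": "model1", "timestamp": datetime(2025, 9, 1, 10, 1, 5, tzinfo=timezone.utc)},
]
batches = aggregate(logs)
```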

Sending AI model logs

Logs are sent to Raven (compact) over HTTP. This is the endpoint you need: http://{RAVEN_HOST}/input/ai/log/insert. Normally you would send logs from inside your cluster, in which case your RAVEN_HOST will be raven-compact-service:8080. You can also send them from outside the cluster with a simple port-forward:

    kubectl port-forward svc/raven-compact-service 8080:8080

After that you can use localhost:8080 as your RAVEN_HOST. Swagger API docs are available at http://{RAVEN_HOST}/input/docs; they describe the required request format. Here is a simple curl for reference (PLEASE NOTE that 'timestamp' must not be older than the current UTC time minus 2 minutes; older logs will be ignored):
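The 2-minute freshness rule means you should generate timestamps from the UTC clock just before sending. A small sketch of the acceptance rule and of producing an ISO-8601 UTC timestamp like the one in the curl example below:

```python
from datetime import datetime, timedelta, timezone

def is_accepted(ts: datetime, now: datetime) -> bool:
    # Logs older than (current UTC time - 2 minutes) are ignored by Raven.
    return ts >= now - timedelta(minutes=2)

now = datetime.now(timezone.utc)
fresh = now - timedelta(seconds=30)   # within the 2-minute window
stale = now - timedelta(minutes=5)    # too old, would be ignored

# Millisecond-precision ISO-8601 with a trailing "Z", as in the curl example.
iso = fresh.isoformat(timespec="milliseconds").replace("+00:00", "Z")
```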

curl -X 'POST' \
  'http://{RAVEN_HOST}/input/ai/log/insert' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '[
  {
    "modelId": "model1",
    "confidenceScore": 0.69,
    "responseTimeMs": 50,
    "timestamp": "2025-09-01T09:50:00.413Z",
    "numericFeatures": {
      "additionalProp1": 0.21
    },
    "categoricalFeatures": {
      "additionalProp2": "value1"
    }
  }
]'
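The same request can be made from Python with only the standard library, if you prefer not to use curl or the SDKs. A minimal sketch mirroring the curl example above (field values are the same illustrative ones):

```python
import json
import urllib.request
from datetime import datetime, timezone

def build_payload(model_id: str, confidence: float, response_time_ms: int) -> bytes:
    # Timestamp is generated from the current UTC clock so it passes
    # the 2-minute freshness check.
    ts = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
    logs = [{
        "modelId": model_id,
        "confidenceScore": confidence,
        "responseTimeMs": response_time_ms,
        "timestamp": ts,
        "numericFeatures": {"additionalProp1": 0.21},
        "categoricalFeatures": {"additionalProp2": "value1"},
    }]
    return json.dumps(logs).encode("utf-8")

def send_logs(raven_host: str, payload: bytes) -> int:
    # POST the JSON array to the insert endpoint; returns the HTTP status.
    req = urllib.request.Request(
        f"http://{raven_host}/input/ai/log/insert",
        data=payload,
        headers={"Content-Type": "application/json", "accept": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status

payload = build_payload("model1", 0.69, 50)
# send_logs("localhost:8080", payload)  # requires a reachable Raven instance
```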

There are simple SDKs for inserting Raven AI logs, available for Scala and Python:

Scala

To add raven-sdk to your project, add the following to your build.sbt (with latest version):

libraryDependencies += "com.raven" %% "raven-sdk" % "1.0.0"

Usage:

import cats.effect.{IO, IOApp}
import com.raven.client.AiLogHttpProducer
import com.raven.domain.AiLog
import java.time.Instant
import org.http4s.Uri
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory

object Example extends IOApp.Simple {

  private implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

  val sampleLog = AiLog(
    modelId = "model1",
    confidenceScore = 0.95,
    responseTimeMs = 150,
    timestamp = Instant.now(),
    numericFeatures = Map("temperature" -> 0.7, "tokens" -> 100.0),
    categoricalFeatures = Map("task" -> "completion", "version" -> "v1")
  )

  def run: IO[Unit] =
    AiLogHttpProducer.load(Uri.unsafeFromString("http://localhost:8080")).use { producer =>
      producer.produceAiLogs(Seq(sampleLog))
    }
}

Python

To install: pip install raven-sdk. Example usage:

import asyncio
from datetime import datetime, timezone
from raven import AiLog, AiLogHttpProducer


async def main():
    # Create producer
    producer = AiLogHttpProducer("http://localhost:8080")

    # Create AI log (use a timezone-aware UTC timestamp;
    # datetime.utcnow() is deprecated and returns a naive datetime)
    log = AiLog(
        modelId="model1",
        confidenceScore=0.95,
        responseTimeMs=150,
        timestamp=datetime.now(timezone.utc),
        numericFeatures={"temperature": 0.7},
        categoricalFeatures={"type": "completion"}
    )

    # Send logs (no start/stop needed)
    await producer.produce_ai_logs([log])


asyncio.run(main())

Managing alerts

Alerts are triggered based on the configuration you set up during Helm installation (see configuration), sent to external outputs like email or Slack (if configured), and stored in a Postgres table. You can see them on the dashboard and mark them as 'Handled' from there (for more detail, see dashboard).

Logs aggregation

Logs are aggregated into time-based batches, keyed by model ID. You can configure the aggregation interval in the Helm values.yaml (windowTimeRange) together with the batchMaxRetention parameter, which defines how long batches are kept in cache before they are processed. This is very important: a batch will not be processed and saved until the batchMaxRetention time has elapsed. For example, with a 1-minute aggregation interval and a 3-minute batchMaxRetention, logs sent between 10:00 and 10:01 will only be processed and saved after 10:03, because until then Raven waits for potential late-arriving logs. So if you set this parameter too high, you lose real-time responsiveness.
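The timing rule above can be expressed as a one-line calculation, using the example values from this section (1-minute windowTimeRange, 3-minute batchMaxRetention):

```python
from datetime import datetime, timedelta

def processed_at(window_start: datetime, batch_max_retention: timedelta) -> datetime:
    # A batch covering [window_start, window_start + windowTimeRange) is only
    # processed once batchMaxRetention has elapsed since the window opened,
    # to allow for late-arriving logs.
    return window_start + batch_max_retention

window_start = datetime(2025, 9, 1, 10, 0)           # batch covers 10:00-10:01
ready = processed_at(window_start, timedelta(minutes=3))  # processed at 10:03
```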

There is also one other aggregation parameter: dataDriftReferencePeriod. It is required for real-time data drift alerts and defines the reference period for data drift calculation. For example, with a 1-hour dataDriftReferencePeriod, an alert is sent if the current batch's input features differ significantly from the features observed over the last hour.
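The exact drift metric Raven uses is not specified here; as an illustration only, the idea of comparing a current batch against the reference period can be sketched as a simple mean-shift check on one numeric feature (the threshold value is hypothetical):

```python
def mean(values):
    return sum(values) / len(values)

def drifted(current_batch, reference_window, threshold=0.5):
    # Flag drift if the current batch mean deviates from the reference-period
    # mean by more than `threshold` (a hypothetical absolute threshold).
    return abs(mean(current_batch) - mean(reference_window)) > threshold

reference = [0.1, 0.2, 0.15, 0.18]  # feature values over the last hour
stable = [0.12, 0.17]               # similar distribution: no alert
shifted = [0.9, 1.1]                # large shift: alert
```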

All processed batches are stored in ClickHouse (table ai_logs_averages) and used for long-term data drift analytics on the dashboard. You can also use them for your own analytics if needed.