Raven (compact) usage
How it works
The standard Raven workflow looks like this:
- You send AI model logs to Raven over HTTP
- If one or more individual logs are suspicious/unusual, alert(s) for them are triggered immediately
- Logs are aggregated into time-based batches (default is 1 minute), keyed by model id
- If one or more aggregated batches are suspicious/unusual, alert(s) for them are triggered (including short-term data drift alerts)
- Batches are saved into ClickHouse and available for further analysis
Sending AI model logs
Logs are sent to Raven (compact) over HTTP.
The endpoint you need is: http://{RAVEN_HOST}/input/ai/log/insert
Normally you would send logs from inside your cluster, in which case your RAVEN_HOST will be raven-compact-service:8080.
You can also do it from outside the cluster, with a simple port-forward like this:
kubectl port-forward svc/raven-compact-service 8080:8080
After that you can use localhost:8080 as your RAVEN_HOST.
You can find Swagger API docs at http://{RAVEN_HOST}/input/docs, which describe the required request format.
Here is a simple curl for reference (PLEASE NOTE that 'timestamp' must not be older than the current UTC time minus 2 minutes; if it is older, the log will be ignored):
curl -X 'POST' \
  'http://{RAVEN_HOST}/input/ai/log/insert' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '[
    {
      "modelId": "model1",
      "confidenceScore": 0.69,
      "responseTimeMs": 50,
      "timestamp": "2025-09-01T09:50:00.413Z",
      "numericFeatures": {
        "additionalProp1": 0.21
      },
      "categoricalFeatures": {
        "additionalProp2": "value1"
      }
    }
  ]'
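If you prefer to send logs programmatically without the SDK, here is a minimal sketch using Python's requests library (our choice of HTTP client is an assumption; any client works). It builds the timestamp from the current UTC time so the log always stays inside the 2-minute window:

import requests
from datetime import datetime, timezone

RAVEN_HOST = "localhost:8080"  # assumes the port-forward shown above

# Build the timestamp from the current UTC time so it is never older than
# the 2-minute cutoff that would make Raven ignore the log.
log = {
    "modelId": "model1",
    "confidenceScore": 0.69,
    "responseTimeMs": 50,
    "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    "numericFeatures": {"additionalProp1": 0.21},
    "categoricalFeatures": {"additionalProp2": "value1"},
}

# The endpoint accepts a JSON array, so several logs can be batched per request.
response = requests.post(f"http://{RAVEN_HOST}/input/ai/log/insert", json=[log])
response.raise_for_status()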
There is a simple SDK for inserting Raven AI logs, available for Scala and Python:
Scala
To add raven-sdk to your project, add the following to your build.sbt (with the latest version):
libraryDependencies += "com.raven" %% "raven-sdk" % "1.0.0"
Usage:
import cats.effect.{IO, IOApp}
import com.raven.client.AiLogHttpProducer
import com.raven.domain.AiLog
import java.time.Instant
import org.http4s.Uri
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory

object Example extends IOApp.Simple {
  private implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

  val sampleLog = AiLog(
    modelId = "model1",
    confidenceScore = 0.95,
    responseTimeMs = 150,
    timestamp = Instant.now(),
    numericFeatures = Map("temperature" -> 0.7, "tokens" -> 100.0),
    categoricalFeatures = Map("task" -> "completion", "version" -> "v1")
  )

  def run: IO[Unit] =
    AiLogHttpProducer.load(Uri.unsafeFromString("http://localhost:8080")).use { producer =>
      producer.produceAiLogs(Seq(sampleLog))
    }
}
Python
To install: pip install raven-sdk
Example usage:
import asyncio
from datetime import datetime
from raven import AiLog, AiLogHttpProducer

async def main():
    # Create producer
    producer = AiLogHttpProducer("http://localhost:8080")

    # Create AI log
    log = AiLog(
        modelId="model1",
        confidenceScore=0.95,
        responseTimeMs=150,
        timestamp=datetime.utcnow(),
        numericFeatures={"temperature": 0.7},
        categoricalFeatures={"type": "completion"}
    )

    # Send logs (no start/stop needed)
    await producer.produce_ai_logs([log])

asyncio.run(main())
Managing alerts
Alert(s) will be triggered based on the configuration you set up during Helm installation (see configuration), sent to external outputs like email or Slack (if configured), and stored in a Postgres table. You can see them on the dashboard and mark them as 'Handled' from there (for more detail, see dashboard).
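Since alerts are persisted in Postgres, you can also inspect them directly. Below is a minimal sketch using psycopg2; the connection settings, the table name alerts, and the columns handled/created_at are all assumptions on our part, so check the actual schema in your deployment first:

import psycopg2

# All names below (database, table, columns) are hypothetical -- adjust
# them to the schema used by your Raven installation.
conn = psycopg2.connect(host="localhost", dbname="raven", user="raven", password="secret")
with conn, conn.cursor() as cur:
    cur.execute("SELECT * FROM alerts WHERE handled = false ORDER BY created_at DESC")
    for row in cur.fetchall():
        print(row)
conn.close()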
Logs aggregation
Logs are aggregated into time-based batches, keyed by model id. You can configure the aggregation interval in the Helm values.yaml (windowTimeRange), together with the batchMaxRetention parameter, which defines how long batches are stored in cache before they get processed.
This is very important! A batch will not be processed and saved until the batchMaxRetention time has elapsed.
For example, if you have a 1-minute aggregation interval and a 3-minute batchMaxRetention, and you send some logs between 10:00 and 10:01, this batch will only be processed and saved after 10:03, because until then Raven waits for potential late-arriving logs.
So if you set this parameter too high, you will lose actual real-time responsiveness.
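To make the timing concrete, here is a small Python sketch that reproduces the arithmetic from the example above (the parameter names come from values.yaml; counting retention from the window start is an assumption based on the 10:00/10:03 example in the text):

from datetime import datetime, timedelta

# windowTimeRange: the aggregation interval; batchMaxRetention: how long
# Raven waits for late-arriving logs before processing a batch.
window_time_range = timedelta(minutes=1)
batch_max_retention = timedelta(minutes=3)

window_start = datetime.fromisoformat("2025-09-01T10:00:00")
window_end = window_start + window_time_range
processed_after = window_start + batch_max_retention

print(f"Logs sent between {window_start:%H:%M} and {window_end:%H:%M} "
      f"are processed after {processed_after:%H:%M}")
# -> Logs sent between 10:00 and 10:01 are processed after 10:03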
There is also one other aggregation parameter - dataDriftReferencePeriod. It is required for real-time data drift alerts and defines the reference period for the data drift calculation: if you have a 1-hour dataDriftReferencePeriod, an alert will be sent if the current batch's input features are very different from the features of the last 1 hour.
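As a toy illustration of the reference-period idea (this is not Raven's actual drift algorithm, just a sketch of the concept with made-up numbers and a hypothetical threshold), imagine comparing a batch's mean feature value against the mean over the trailing reference period:

from statistics import mean

# Hypothetical numbers: batch averages of one numeric feature over the
# trailing 1-hour dataDriftReferencePeriod vs. the current batch.
reference_values = [0.70, 0.68, 0.72, 0.69]  # last hour of batches
current_batch = [0.95, 0.97, 0.94]           # the batch being evaluated

THRESHOLD = 0.2  # hypothetical alerting threshold
drift = abs(mean(current_batch) - mean(reference_values))
if drift > THRESHOLD:
    print(f"Data drift alert: drift of {drift:.2f} exceeds {THRESHOLD}")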
All processed batches are stored in ClickHouse (table ai_logs_averages) and used for long-term data drift analytics on the dashboard.
You can also use them for your own analytics if needed.
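For example, you can query the table directly. Here is a minimal sketch using the clickhouse-connect driver (the driver choice, host, and port are assumptions; only the table name ai_logs_averages comes from above, so inspect its schema in your deployment first):

import clickhouse_connect

# Host/port are assumptions for a local or port-forwarded ClickHouse;
# only the table name ai_logs_averages is documented above.
client = clickhouse_connect.get_client(host="localhost", port=8123)
result = client.query("SELECT * FROM ai_logs_averages LIMIT 10")
for row in result.result_rows:
    print(row)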