feat(db): implement database & caching layer [claudbg-goc1, claudbg-umrv, claudbg-hskl, claudbg-edmr]

- src/db/connection.rs: DbHandle type alias (Arc<libsql::Database>), open_db(), default_db_path()
- src/db/schema.rs: run_migrations() creating sessions, messages, tool_uses, raw_sessions tables
- src/db/sync.rs: ensure_synced() and force_resync() with lazy mtime/size-based staleness check
- Cargo.toml: add tempfile dev-dependency for tests
- Fix: scope read connection before write to avoid SQLite locked error

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
main
Elijah Voigt 2 months ago
parent 2acab9c179
commit cf1bc9d8a8

@ -1,11 +1,12 @@
---
# claudbg-edmr
title: 'Lazy sync: process session files on read'
status: todo
status: completed
type: task
priority: normal
created_at: 2026-03-27T19:39:33Z
updated_at: 2026-03-27T19:39:33Z
updated_at: 2026-03-28T17:38:48Z
parent: claudbg-6wkk
---
Before serving any session data, check if the session's file_mtime and file_size in the DB match the file on disk. If not (or if absent), parse the file and upsert into all tables. Skip files that are already up to date. This runs transparently before every command that reads session data.
Implemented ensure_synced() and force_resync() in src/db/sync.rs. Fixed database locked issue by scoping connection/rows before upsert write. All 3 sync tests pass.

@ -1,11 +1,12 @@
---
# claudbg-goc1
title: libsql DB setup and connection
status: todo
status: completed
type: task
priority: normal
created_at: 2026-03-27T19:39:32Z
updated_at: 2026-03-27T19:39:32Z
updated_at: 2026-03-28T17:38:29Z
parent: claudbg-6wkk
---
Initialize a libsql connection to ~/.claude/claudbg.db using tokio. Create the DB file if it does not exist. Expose a DbPool or Connection handle shared across the app. Support a --clear flag that drops and recreates the DB.
Implemented open_db() and default_db_path() in src/db/connection.rs. Added tempfile dev dependency. All tests pass.

@ -1,11 +1,12 @@
---
# claudbg-hskl
title: Denormalized raw sessions table
status: todo
status: completed
type: task
priority: normal
created_at: 2026-03-27T19:39:32Z
updated_at: 2026-03-27T19:39:32Z
updated_at: 2026-03-28T17:38:43Z
parent: claudbg-6wkk
---
Create a raw_sessions table (session_id TEXT PK, raw_jsonl TEXT, file_mtime INTEGER, file_size INTEGER) storing the full file content as a blob. Used for dump and fast retrieval when normalized tables are overkill.
Added raw_sessions table to run_migrations() in schema.rs. Insert/read test verifies file_mtime and file_size columns work correctly.

@ -1,11 +1,12 @@
---
# claudbg-umrv
title: 'Normalized DB schema: sessions, messages, tool_uses tables'
status: todo
status: completed
type: task
priority: normal
created_at: 2026-03-27T19:39:32Z
updated_at: 2026-03-27T19:39:32Z
updated_at: 2026-03-28T17:38:35Z
parent: claudbg-6wkk
---
Create tables: sessions (session_id, project_path, file_path, first_message_at, last_message_at, model, message_count, file_mtime, file_size), messages (id, session_id, uuid, timestamp, type, role, content_json), tool_uses (id, session_id, message_uuid, tool_name, input_json, result_json). Include indexes on session_id and timestamp.
Implemented run_migrations() in src/db/schema.rs with normalized sessions, messages, tool_uses tables. All tests pass.

@ -21,3 +21,6 @@ chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.23", features = ["v4", "serde"] }
thiserror = "2.0"
comfy-table = "7.2"
[dev-dependencies]
tempfile = "3"

@ -0,0 +1,88 @@
//! Database connection management for the claudbg cache.
use std::path::PathBuf;
use std::sync::Arc;
/// Shared handle to the local libsql database.
///
/// Wraps `Arc<libsql::Database>` for cheap cloning across async tasks.
/// Since `libsql::Connection` is not `Clone`, callers must call `.connect()`
/// per operation rather than storing a connection.
///
/// `open_db` in this module returns a handle whose schema has already been
/// migrated.
pub type DbHandle = Arc<libsql::Database>;
/// Open (or create) the claudbg cache database at `path`.
///
/// If `clear` is true, deletes the file at `path` before opening,
/// effectively resetting the database.
///
/// Returns a `DbHandle` with the schema already migrated.
pub async fn open_db(path: &PathBuf, clear: bool) -> crate::error::Result<DbHandle> {
if clear && path.exists() {
std::fs::remove_file(path).map_err(crate::error::AppError::Io)?;
}
// Ensure parent directory exists.
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(crate::error::AppError::Io)?;
}
let db = libsql::Builder::new_local(path)
.build()
.await
.map_err(|e| crate::error::AppError::Db(e.to_string()))?;
let handle = Arc::new(db);
crate::db::schema::run_migrations(&handle).await?;
Ok(handle)
}
/// Returns the default database path: `~/.claude/claudbg.db`.
///
/// The home directory is read from the `HOME` environment variable as a raw
/// OS string, so a value that is not valid UTF-8 is still honoured. When
/// `HOME` is unset the result is relative to the current directory
/// (`./.claude/claudbg.db`).
pub fn default_db_path() -> PathBuf {
    let home = std::env::var_os("HOME").map_or_else(|| PathBuf::from("."), PathBuf::from);
    home.join(".claude").join("claudbg.db")
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};

    /// Location of the scratch database inside a temporary directory.
    fn scratch_db_path(dir: &tempfile::TempDir) -> PathBuf {
        dir.path().join("test.db")
    }

    /// `open_db` creates a new database file and returns `Ok`.
    #[tokio::test]
    async fn open_db_creates_file() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = scratch_db_path(&dir);

        let result = open_db(&path, false).await;

        assert!(result.is_ok(), "open_db failed: {:?}", result.err());
        assert!(path.exists(), "db file should exist after open");
    }

    /// `open_db` is idempotent — calling it twice on the same path succeeds.
    #[tokio::test]
    async fn open_db_idempotent() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = scratch_db_path(&dir);

        open_db(&path, false).await.expect("first open");
        let result = open_db(&path, false).await;

        assert!(result.is_ok(), "second open failed: {:?}", result.err());
    }

    /// `open_db` with `clear=true` deletes an existing file and recreates it.
    #[tokio::test]
    async fn open_db_clear_recreates() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = scratch_db_path(&dir);

        // Create initially.
        open_db(&path, false).await.expect("initial open");
        assert!(path.exists());

        // Clear and reopen.
        let result = open_db(&path, true).await;
        assert!(result.is_ok(), "clear open failed: {:?}", result.err());
        assert!(path.exists(), "db file should exist after clear+open");
    }

    /// `default_db_path` returns a path whose last two components are
    /// `.claude` and `claudbg.db`.
    #[test]
    fn default_db_path_suffix() {
        let p = default_db_path();
        // Compare path components rather than a string with a hard-coded `/`,
        // so the check does not depend on the platform's separator.
        let expected_tail = Path::new(".claude").join("claudbg.db");
        assert!(
            p.ends_with(&expected_tail),
            "unexpected path: {}",
            p.display()
        );
    }
}

@ -0,0 +1,4 @@
//! Database connection and caching layer.
pub mod connection;
pub mod schema;
pub mod sync;

@ -0,0 +1,181 @@
//! Database schema migrations for the claudbg cache.
use crate::db::connection::DbHandle;
use crate::error::Result;
/// Run all database migrations to bring the schema up to date.
///
/// Creates all tables if they don't exist. Safe to call on an existing database
/// (all statements use `CREATE TABLE IF NOT EXISTS`).
pub async fn run_migrations(db: &DbHandle) -> Result<()> {
let conn = db
.connect()
.map_err(|e| crate::error::AppError::Db(e.to_string()))?;
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
project_path TEXT,
model TEXT,
first_msg_at TEXT,
last_msg_at TEXT,
message_count INTEGER NOT NULL DEFAULT 0,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
file_path TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sessions_last_msg ON sessions(last_msg_at DESC);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL REFERENCES sessions(session_id),
sequence_num INTEGER NOT NULL,
timestamp TEXT,
role TEXT,
content_preview TEXT,
entry_type TEXT,
UNIQUE(session_id, sequence_num)
);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, sequence_num);
CREATE TABLE IF NOT EXISTS tool_uses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL REFERENCES sessions(session_id),
tool_name TEXT NOT NULL,
tool_use_id TEXT,
sequence_num INTEGER,
timestamp TEXT
);
CREATE INDEX IF NOT EXISTS idx_tool_uses_session ON tool_uses(session_id);
CREATE TABLE IF NOT EXISTS raw_sessions (
session_id TEXT PRIMARY KEY,
raw_jsonl TEXT NOT NULL,
file_mtime INTEGER NOT NULL,
file_size INTEGER NOT NULL
);",
)
.await
.map_err(|e| crate::error::AppError::Db(e.to_string()))?;
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Create an empty libsql database called `file_name` inside a fresh
    /// temporary directory.
    ///
    /// The directory guard is returned alongside the handle so that the
    /// caller keeps it in scope for as long as the database is in use.
    async fn fresh_db(file_name: &str) -> (tempfile::TempDir, DbHandle) {
        let dir = tempfile::tempdir().expect("tempdir");
        let db = libsql::Builder::new_local(dir.path().join(file_name))
            .build()
            .await
            .expect("build db");
        (dir, std::sync::Arc::new(db))
    }

    /// `run_migrations` succeeds on a fresh database.
    #[tokio::test]
    async fn migrations_fresh_db() {
        let (_dir, handle) = fresh_db("schema_test.db").await;

        let outcome = run_migrations(&handle).await;

        assert!(outcome.is_ok(), "migration failed: {:?}", outcome.err());
    }

    /// `run_migrations` is idempotent — calling it twice does not error.
    #[tokio::test]
    async fn migrations_idempotent() {
        let (_dir, handle) = fresh_db("schema_idempotent.db").await;

        run_migrations(&handle).await.expect("first migration");
        let outcome = run_migrations(&handle).await;

        assert!(
            outcome.is_ok(),
            "second migration failed: {:?}",
            outcome.err()
        );
    }

    /// After migration, inserting and reading a `raw_sessions` row works.
    #[tokio::test]
    async fn raw_sessions_insert_and_read() {
        let (_dir, handle) = fresh_db("raw_sessions_test.db").await;
        run_migrations(&handle).await.expect("migration");
        let conn = handle.connect().expect("connect");

        // Write one row straight into `raw_sessions`; no `sessions` row is
        // created here.
        conn.execute(
            "INSERT INTO raw_sessions (session_id, raw_jsonl, file_mtime, file_size)
VALUES (?1, ?2, ?3, ?4)",
            libsql::params![
                "test-session-1",
                "{\"type\":\"user\"}",
                1700000000_i64,
                42_i64
            ],
        )
        .await
        .expect("insert raw_sessions");

        // Read the bookkeeping columns back and compare them field by field.
        let mut cursor = conn
            .query(
                "SELECT session_id, file_mtime, file_size FROM raw_sessions WHERE session_id = ?1",
                libsql::params!["test-session-1"],
            )
            .await
            .expect("query");
        let found = cursor.next().await.expect("next").expect("row exists");

        let stored_id: String = found.get(0).expect("session_id");
        let stored_mtime: i64 = found.get(1).expect("file_mtime");
        let stored_size: i64 = found.get(2).expect("file_size");
        assert_eq!(stored_id, "test-session-1");
        assert_eq!(stored_mtime, 1700000000);
        assert_eq!(stored_size, 42);
    }

    /// After migration, inserting and reading a `sessions` row works.
    #[tokio::test]
    async fn sessions_table_insert_and_read() {
        let (_dir, handle) = fresh_db("sessions_table_test.db").await;
        run_migrations(&handle).await.expect("migration");
        let conn = handle.connect().expect("connect");

        // Only the key and the NOT NULL `file_path` are supplied; every other
        // column is left to its default.
        conn.execute(
            "INSERT INTO sessions (session_id, file_path) VALUES (?1, ?2)",
            libsql::params!["sess-abc", "/tmp/sess-abc.jsonl"],
        )
        .await
        .expect("insert sessions");

        let mut cursor = conn
            .query(
                "SELECT session_id FROM sessions WHERE session_id = ?1",
                libsql::params!["sess-abc"],
            )
            .await
            .expect("query");
        let found = cursor.next().await.expect("next").expect("row exists");

        let stored_id: String = found.get(0).expect("session_id");
        assert_eq!(stored_id, "sess-abc");
    }
}

@ -0,0 +1,326 @@
//! Lazy sync logic: keeps the DB in sync with session files on disk.
use crate::db::connection::DbHandle;
use crate::error::{AppError, Result};
use crate::models::session::{ContentBlock, MessageContent};
use crate::parser::discovery::SessionRef;
use crate::parser::reader::read_session_file;
/// Ensure the database has up-to-date data for the given session.
///
/// Compares the file's `mtime` (Unix seconds) and `size` against what's stored
/// in `raw_sessions`. If stale or missing, re-reads and upserts the session.
///
/// **Note:** The `--follow` code path must NOT call this function —
/// following should read the file directly with a byte offset.
pub async fn ensure_synced(db: &DbHandle, session_ref: &SessionRef) -> Result<()> {
let meta = std::fs::metadata(&session_ref.file_path)?;
let file_size = meta.len() as i64;
let file_mtime = meta
.modified()
.map_err(AppError::Io)?
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| AppError::InvalidArg(e.to_string()))?
.as_secs() as i64;
// Check if DB has current data. Use a scoped block so the connection and
// the rows cursor are both dropped before we open a write connection below.
let needs_sync = {
let conn = db.connect().map_err(|e| AppError::Db(e.to_string()))?;
let mut rows = conn
.query(
"SELECT file_mtime, file_size FROM raw_sessions WHERE session_id = ?1",
libsql::params![session_ref.session_id.clone()],
)
.await
.map_err(|e| AppError::Db(e.to_string()))?;
if let Some(row) = rows.next().await.map_err(|e| AppError::Db(e.to_string()))? {
let db_mtime: i64 = row.get(0).map_err(|e| AppError::Db(e.to_string()))?;
let db_size: i64 = row.get(1).map_err(|e| AppError::Db(e.to_string()))?;
db_mtime != file_mtime || db_size != file_size
} else {
true // not in DB yet
}
};
if needs_sync {
upsert_session(db, session_ref, file_mtime, file_size).await
} else {
Ok(())
}
}
/// Force re-index a session regardless of whether it appears up to date.
pub async fn force_resync(db: &DbHandle, session_ref: &SessionRef) -> Result<()> {
let meta = std::fs::metadata(&session_ref.file_path)?;
let file_size = meta.len() as i64;
let file_mtime = meta
.modified()
.map_err(AppError::Io)?
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| AppError::InvalidArg(e.to_string()))?
.as_secs() as i64;
upsert_session(db, session_ref, file_mtime, file_size).await
}
/// Read, parse, and upsert a session file into all relevant DB tables.
async fn upsert_session(
db: &DbHandle,
session_ref: &SessionRef,
file_mtime: i64,
file_size: i64,
) -> Result<()> {
let entries = read_session_file(&session_ref.file_path).await?;
// Read raw content.
let raw_jsonl = tokio::fs::read_to_string(&session_ref.file_path).await?;
// Compute stats.
let stats = crate::models::stats::compute_stats(&entries);
// Find first/last timestamps.
let first_msg_at = entries.iter().find_map(|e| e.timestamp.clone());
let last_msg_at = entries.iter().rev().find_map(|e| e.timestamp.clone());
let conn = db.connect().map_err(|e| AppError::Db(e.to_string()))?;
// Upsert into sessions table.
conn.execute(
"INSERT OR REPLACE INTO sessions
(session_id, project_path, model, first_msg_at, last_msg_at,
message_count, input_tokens, output_tokens, file_path)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
libsql::params![
session_ref.session_id.clone(),
session_ref.project_path.clone().unwrap_or_default(),
stats.model.clone().unwrap_or_default(),
first_msg_at.unwrap_or_default(),
last_msg_at.unwrap_or_default(),
entries.len() as i64,
stats.input_tokens as i64,
stats.output_tokens as i64,
session_ref.file_path.to_string_lossy().to_string()
],
)
.await
.map_err(|e| AppError::Db(e.to_string()))?;
// Upsert raw_sessions.
conn.execute(
"INSERT OR REPLACE INTO raw_sessions (session_id, raw_jsonl, file_mtime, file_size)
VALUES (?1, ?2, ?3, ?4)",
libsql::params![
session_ref.session_id.clone(),
raw_jsonl,
file_mtime,
file_size
],
)
.await
.map_err(|e| AppError::Db(e.to_string()))?;
// Delete and reinsert messages (simpler than diffing).
conn.execute(
"DELETE FROM messages WHERE session_id = ?1",
libsql::params![session_ref.session_id.clone()],
)
.await
.map_err(|e| AppError::Db(e.to_string()))?;
for (seq, entry) in entries.iter().enumerate() {
let role = entry.message.as_ref().and_then(|m| m.role.clone());
let content_preview = entry
.message
.as_ref()
.and_then(|m| m.content.as_ref())
.map(|c| match c {
MessageContent::Text(t) => t.chars().take(120).collect::<String>(),
MessageContent::Blocks(blocks) => blocks
.first()
.map(|b| match b {
ContentBlock::Text { text } => text.chars().take(120).collect(),
ContentBlock::ToolUse { name, .. } => format!("[tool_use: {name}]"),
ContentBlock::Thinking { .. } => "[thinking]".to_string(),
_ => "[...]".to_string(),
})
.unwrap_or_default(),
});
conn.execute(
"INSERT OR IGNORE INTO messages
(session_id, sequence_num, timestamp, role, content_preview, entry_type)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
libsql::params![
session_ref.session_id.clone(),
seq as i64,
entry.timestamp.clone().unwrap_or_default(),
role.unwrap_or_default(),
content_preview.unwrap_or_default(),
entry.entry_type.clone().unwrap_or_default()
],
)
.await
.map_err(|e| AppError::Db(e.to_string()))?;
}
// Delete and reinsert tool_uses.
conn.execute(
"DELETE FROM tool_uses WHERE session_id = ?1",
libsql::params![session_ref.session_id.clone()],
)
.await
.map_err(|e| AppError::Db(e.to_string()))?;
for (seq, entry) in entries.iter().enumerate() {
if let Some(msg) = &entry.message
&& let Some(MessageContent::Blocks(blocks)) = &msg.content
{
for block in blocks {
if let ContentBlock::ToolUse { id, name, .. } = block {
conn.execute(
"INSERT INTO tool_uses
(session_id, tool_name, tool_use_id, sequence_num, timestamp)
VALUES (?1, ?2, ?3, ?4, ?5)",
libsql::params![
session_ref.session_id.clone(),
name.clone(),
id.clone(),
seq as i64,
entry.timestamp.clone().unwrap_or_default()
],
)
.await
.map_err(|e| AppError::Db(e.to_string()))?;
}
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::connection::open_db;
    use crate::parser::discovery::SessionRef;
    use chrono::{DateTime, Utc};
    use std::path::PathBuf;

    /// Build a `SessionRef` for `file_path`, using the file stem as the
    /// session id (or `test-session` when the stem is unavailable).
    fn make_session_ref(file_path: PathBuf) -> SessionRef {
        let stem = file_path.file_stem().and_then(|s| s.to_str());
        let session_id = stem.unwrap_or("test-session").to_string();
        SessionRef {
            session_id,
            project_path: Some("/tmp/test-project".to_string()),
            file_path,
            modified_at: DateTime::<Utc>::from(std::time::SystemTime::UNIX_EPOCH),
        }
    }

    /// Write a minimal JSONL session file: one user entry followed by one
    /// assistant entry.
    fn write_test_session(path: &std::path::Path) {
        let line1 = r#"{"type":"user","session_id":"test-session","cwd":"/tmp","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
        let line2 = r#"{"type":"assistant","session_id":"test-session","timestamp":"2024-01-01T00:00:01Z","message":{"role":"assistant","content":[{"type":"text","text":"World"}],"usage":{"input_tokens":10,"output_tokens":5},"model":"claude-opus-4-5"}}"#;
        let contents = format!("{line1}\n{line2}\n");
        std::fs::write(path, contents).expect("write test session");
    }

    /// Shared fixture: a temp dir containing `test-session.jsonl`, a database
    /// named `db_name` opened next to it, and a `SessionRef` for the file.
    ///
    /// The directory guard is returned first so callers keep it in scope
    /// while the database and session file are in use.
    async fn setup(db_name: &str) -> (tempfile::TempDir, DbHandle, SessionRef, PathBuf) {
        let dir = tempfile::tempdir().expect("tempdir");
        let db_path = dir.path().join(db_name);
        let jsonl_path = dir.path().join("test-session.jsonl");
        write_test_session(&jsonl_path);
        let db = open_db(&db_path, false).await.expect("open db");
        let session_ref = make_session_ref(jsonl_path.clone());
        (dir, db, session_ref, jsonl_path)
    }

    /// `ensure_synced` inserts a `raw_sessions` row for a new file.
    #[tokio::test]
    async fn ensure_synced_inserts_row() {
        let (_dir, db, session_ref, _jsonl_path) = setup("sync_test.db").await;

        ensure_synced(&db, &session_ref)
            .await
            .expect("ensure_synced");

        // Verify raw_sessions has a row.
        let conn = db.connect().expect("connect");
        let mut cursor = conn
            .query(
                "SELECT session_id FROM raw_sessions WHERE session_id = ?1",
                libsql::params![session_ref.session_id.clone()],
            )
            .await
            .expect("query");
        let found = cursor.next().await.expect("next").expect("row exists");
        let stored_id: String = found.get(0).expect("session_id");
        assert_eq!(stored_id, session_ref.session_id);
    }

    /// A second `ensure_synced` on an unchanged file returns `Ok`.
    #[tokio::test]
    async fn ensure_synced_skips_unchanged_file() {
        let (_dir, db, session_ref, _jsonl_path) = setup("sync_unchanged.db").await;

        // First sync.
        ensure_synced(&db, &session_ref).await.expect("first sync");

        // Second sync on unchanged file — should succeed quickly.
        let outcome = ensure_synced(&db, &session_ref).await;
        assert!(outcome.is_ok(), "second sync failed: {:?}", outcome.err());
    }

    /// `ensure_synced` re-syncs after the file is modified.
    #[tokio::test]
    async fn ensure_synced_resyncs_modified_file() {
        use std::io::Write;

        let (_dir, db, session_ref, jsonl_path) = setup("sync_modified.db").await;

        // First sync.
        ensure_synced(&db, &session_ref).await.expect("first sync");

        // Wait out a full second before touching the file so the recorded
        // mtime differs even where timestamps have 1-second resolution.
        std::thread::sleep(std::time::Duration::from_secs(1));

        // Append a third entry.
        let extra = r#"{"type":"user","session_id":"test-session","timestamp":"2024-01-01T00:00:02Z","message":{"role":"user","content":"More"}}"#;
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&jsonl_path)
            .expect("open for append");
        writeln!(file, "{extra}").expect("append line");

        // Second sync — should detect the change and re-sync.
        let outcome = ensure_synced(&db, &session_ref).await;
        assert!(outcome.is_ok(), "re-sync failed: {:?}", outcome.err());

        // Verify sessions.message_count was updated.
        let conn = db.connect().expect("connect");
        let mut cursor = conn
            .query(
                "SELECT message_count FROM sessions WHERE session_id = ?1",
                libsql::params![session_ref.session_id.clone()],
            )
            .await
            .expect("query");
        let found = cursor.next().await.expect("next").expect("row");
        let count: i64 = found.get(0).expect("message_count");
        assert_eq!(count, 3, "expected 3 messages after re-sync");
    }
}

@ -2,7 +2,7 @@
pub mod cli;
// pub mod commands;
// pub mod db;
pub mod db;
pub mod error;
pub mod models;
// pub mod output;

Loading…
Cancel
Save