diff --git a/.beans/claudbg-edmr--lazy-sync-process-session-files-on-read.md b/.beans/claudbg-edmr--lazy-sync-process-session-files-on-read.md index ce3edf4..2630b63 100644 --- a/.beans/claudbg-edmr--lazy-sync-process-session-files-on-read.md +++ b/.beans/claudbg-edmr--lazy-sync-process-session-files-on-read.md @@ -1,11 +1,12 @@ --- # claudbg-edmr title: 'Lazy sync: process session files on read' -status: todo +status: completed type: task +priority: normal created_at: 2026-03-27T19:39:33Z -updated_at: 2026-03-27T19:39:33Z +updated_at: 2026-03-28T17:38:48Z parent: claudbg-6wkk --- -Before serving any session data, check if the session's file_mtime and file_size in the DB match the file on disk. If not (or if absent), parse the file and upsert into all tables. Skip files that are already up to date. This runs transparently before every command that reads session data. +Implemented ensure_synced() and force_resync() in src/db/sync.rs. Fixed database locked issue by scoping connection/rows before upsert write. All 3 sync tests pass. diff --git a/.beans/claudbg-goc1--libsql-db-setup-and-connection.md b/.beans/claudbg-goc1--libsql-db-setup-and-connection.md index f85685f..5f3c1b7 100644 --- a/.beans/claudbg-goc1--libsql-db-setup-and-connection.md +++ b/.beans/claudbg-goc1--libsql-db-setup-and-connection.md @@ -1,11 +1,12 @@ --- # claudbg-goc1 title: libsql DB setup and connection -status: todo +status: completed type: task +priority: normal created_at: 2026-03-27T19:39:32Z -updated_at: 2026-03-27T19:39:32Z +updated_at: 2026-03-28T17:38:29Z parent: claudbg-6wkk --- -Initialize a libsql connection to ~/.claude/claudbg.db using tokio. Create the DB file if it does not exist. Expose a DbPool or Connection handle shared across the app. Support a --clear flag that drops and recreates the DB. +Implemented open_db() and default_db_path() in src/db/connection.rs. Added tempfile dev dependency. All tests pass. diff --git a/.beans/claudbg-hskl--denormalized-raw-sessions-table.md b/.beans/claudbg-hskl--denormalized-raw-sessions-table.md index 331e326..9a8d524 100644 --- a/.beans/claudbg-hskl--denormalized-raw-sessions-table.md +++ b/.beans/claudbg-hskl--denormalized-raw-sessions-table.md @@ -1,11 +1,12 @@ --- # claudbg-hskl title: Denormalized raw sessions table -status: todo +status: completed type: task +priority: normal created_at: 2026-03-27T19:39:32Z -updated_at: 2026-03-27T19:39:32Z +updated_at: 2026-03-28T17:38:43Z parent: claudbg-6wkk --- -Create a raw_sessions table (session_id TEXT PK, raw_jsonl TEXT, file_mtime INTEGER, file_size INTEGER) storing the full file content as a blob. Used for dump and fast retrieval when normalized tables are overkill. +Added raw_sessions table to run_migrations() in schema.rs. Insert/read test verifies file_mtime and file_size columns work correctly. diff --git a/.beans/claudbg-umrv--normalized-db-schema-sessions-messages-tool-uses-t.md b/.beans/claudbg-umrv--normalized-db-schema-sessions-messages-tool-uses-t.md index 511c35f..da5619f 100644 --- a/.beans/claudbg-umrv--normalized-db-schema-sessions-messages-tool-uses-t.md +++ b/.beans/claudbg-umrv--normalized-db-schema-sessions-messages-tool-uses-t.md @@ -1,11 +1,12 @@ --- # claudbg-umrv title: 'Normalized DB schema: sessions, messages, tool_uses tables' -status: todo +status: completed type: task +priority: normal created_at: 2026-03-27T19:39:32Z -updated_at: 2026-03-27T19:39:32Z +updated_at: 2026-03-28T17:38:35Z parent: claudbg-6wkk --- -Create tables: sessions (session_id, project_path, file_path, first_message_at, last_message_at, model, message_count, file_mtime, file_size), messages (id, session_id, uuid, timestamp, type, role, content_json), tool_uses (id, session_id, message_uuid, tool_name, input_json, result_json). Include indexes on session_id and timestamp. +Implemented run_migrations() in src/db/schema.rs with normalized sessions, messages, tool_uses tables. All tests pass. diff --git a/Cargo.toml b/Cargo.toml index aa4377d..56e7899 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,6 @@ chrono = { version = "0.4", features = ["serde"] } uuid = { version = "1.23", features = ["v4", "serde"] } thiserror = "2.0" comfy-table = "7.2" + +[dev-dependencies] +tempfile = "3" diff --git a/src/db/connection.rs b/src/db/connection.rs new file mode 100644 index 0000000..d1959c2 --- /dev/null +++ b/src/db/connection.rs @@ -0,0 +1,88 @@ +//! Database connection management for the claudbg cache. + +use std::path::PathBuf; +use std::sync::Arc; + +/// Shared handle to the local libsql database. +/// +/// Wraps `Arc` for cheap cloning across async tasks. +/// Since `libsql::Connection` is not `Clone`, callers must call `.connect()` +/// per operation rather than storing a connection. +pub type DbHandle = Arc; + +/// Open (or create) the claudbg cache database at `path`. +/// +/// If `clear` is true, deletes the file at `path` before opening, +/// effectively resetting the database. +/// +/// Returns a `DbHandle` with the schema already migrated. +pub async fn open_db(path: &PathBuf, clear: bool) -> crate::error::Result { + if clear && path.exists() { + std::fs::remove_file(path).map_err(crate::error::AppError::Io)?; + } + // Ensure parent directory exists. + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).map_err(crate::error::AppError::Io)?; + } + let db = libsql::Builder::new_local(path) + .build() + .await + .map_err(|e| crate::error::AppError::Db(e.to_string()))?; + let handle = Arc::new(db); + crate::db::schema::run_migrations(&handle).await?; + Ok(handle) +} + +/// Returns the default database path: `~/.claude/claudbg.db`. +pub fn default_db_path() -> PathBuf { + let home = std::env::var("HOME").unwrap_or_else(|_| ".".to_string()); + PathBuf::from(home).join(".claude").join("claudbg.db") +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + + /// `open_db` creates a new database file and returns `Ok`. + #[tokio::test] + async fn open_db_creates_file() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = PathBuf::from(dir.path()).join("test.db"); + let result = open_db(&path, false).await; + assert!(result.is_ok(), "open_db failed: {:?}", result.err()); + assert!(path.exists(), "db file should exist after open"); + } + + /// `open_db` is idempotent — calling it twice on the same path succeeds. + #[tokio::test] + async fn open_db_idempotent() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = PathBuf::from(dir.path()).join("test.db"); + open_db(&path, false).await.expect("first open"); + let result = open_db(&path, false).await; + assert!(result.is_ok(), "second open failed: {:?}", result.err()); + } + + /// `open_db` with `clear=true` deletes an existing file and recreates it. + #[tokio::test] + async fn open_db_clear_recreates() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = PathBuf::from(dir.path()).join("test.db"); + // Create initially. + open_db(&path, false).await.expect("initial open"); + assert!(path.exists()); + // Clear and reopen. + let result = open_db(&path, true).await; + assert!(result.is_ok(), "clear open failed: {:?}", result.err()); + assert!(path.exists(), "db file should exist after clear+open"); + } + + /// `default_db_path` returns a path ending in `.claude/claudbg.db`. + #[test] + fn default_db_path_suffix() { + let p = default_db_path(); + let s = p.to_string_lossy(); + assert!(s.ends_with(".claude/claudbg.db"), "unexpected path: {s}"); + } +} diff --git a/src/db/mod.rs b/src/db/mod.rs new file mode 100644 index 0000000..b8ec173 --- /dev/null +++ b/src/db/mod.rs @@ -0,0 +1,4 @@ +//! Database connection and caching layer. +pub mod connection; +pub mod schema; +pub mod sync; diff --git a/src/db/schema.rs b/src/db/schema.rs new file mode 100644 index 0000000..da180e4 --- /dev/null +++ b/src/db/schema.rs @@ -0,0 +1,181 @@ +//! Database schema migrations for the claudbg cache. + +use crate::db::connection::DbHandle; +use crate::error::Result; + +/// Run all database migrations to bring the schema up to date. +/// +/// Creates all tables if they don't exist. Safe to call on an existing database +/// (all statements use `CREATE TABLE IF NOT EXISTS`). +pub async fn run_migrations(db: &DbHandle) -> Result<()> { + let conn = db + .connect() + .map_err(|e| crate::error::AppError::Db(e.to_string()))?; + + conn.execute_batch( + "CREATE TABLE IF NOT EXISTS sessions ( + session_id TEXT PRIMARY KEY, + project_path TEXT, + model TEXT, + first_msg_at TEXT, + last_msg_at TEXT, + message_count INTEGER NOT NULL DEFAULT 0, + input_tokens INTEGER NOT NULL DEFAULT 0, + output_tokens INTEGER NOT NULL DEFAULT 0, + file_path TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_sessions_last_msg ON sessions(last_msg_at DESC); + + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL REFERENCES sessions(session_id), + sequence_num INTEGER NOT NULL, + timestamp TEXT, + role TEXT, + content_preview TEXT, + entry_type TEXT, + UNIQUE(session_id, sequence_num) + ); + CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, sequence_num); + + CREATE TABLE IF NOT EXISTS tool_uses ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL REFERENCES sessions(session_id), + tool_name TEXT NOT NULL, + tool_use_id TEXT, + sequence_num INTEGER, + timestamp TEXT + ); + CREATE INDEX IF NOT EXISTS idx_tool_uses_session ON tool_uses(session_id); + + CREATE TABLE IF NOT EXISTS raw_sessions ( + session_id TEXT PRIMARY KEY, + raw_jsonl TEXT NOT NULL, + file_mtime INTEGER NOT NULL, + file_size INTEGER NOT NULL + );", + ) + .await + .map_err(|e| crate::error::AppError::Db(e.to_string()))?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + + /// `run_migrations` succeeds on a fresh database. + #[tokio::test] + async fn migrations_fresh_db() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = PathBuf::from(dir.path()).join("schema_test.db"); + let db = libsql::Builder::new_local(&path) + .build() + .await + .expect("build db"); + let handle = std::sync::Arc::new(db); + let result = run_migrations(&handle).await; + assert!(result.is_ok(), "migration failed: {:?}", result.err()); + } + + /// `run_migrations` is idempotent — calling it twice does not error. + #[tokio::test] + async fn migrations_idempotent() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = PathBuf::from(dir.path()).join("schema_idempotent.db"); + let db = libsql::Builder::new_local(&path) + .build() + .await + .expect("build db"); + let handle = std::sync::Arc::new(db); + run_migrations(&handle).await.expect("first migration"); + let result = run_migrations(&handle).await; + assert!( + result.is_ok(), + "second migration failed: {:?}", + result.err() + ); + } + + /// After migration, inserting and reading a `raw_sessions` row works. + #[tokio::test] + async fn raw_sessions_insert_and_read() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = PathBuf::from(dir.path()).join("raw_sessions_test.db"); + let db = libsql::Builder::new_local(&path) + .build() + .await + .expect("build db"); + let handle = std::sync::Arc::new(db); + run_migrations(&handle).await.expect("migration"); + + let conn = handle.connect().expect("connect"); + + // Insert a sessions row first (raw_sessions has no FK to sessions, but + // let's also ensure the sessions table is accessible). + conn.execute( + "INSERT INTO raw_sessions (session_id, raw_jsonl, file_mtime, file_size) + VALUES (?1, ?2, ?3, ?4)", + libsql::params![ + "test-session-1", + "{\"type\":\"user\"}", + 1700000000_i64, + 42_i64 + ], + ) + .await + .expect("insert raw_sessions"); + + let mut rows = conn + .query( + "SELECT session_id, file_mtime, file_size FROM raw_sessions WHERE session_id = ?1", + libsql::params!["test-session-1"], + ) + .await + .expect("query"); + + let row = rows.next().await.expect("next").expect("row exists"); + let sid: String = row.get(0).expect("session_id"); + let mtime: i64 = row.get(1).expect("file_mtime"); + let size: i64 = row.get(2).expect("file_size"); + + assert_eq!(sid, "test-session-1"); + assert_eq!(mtime, 1700000000); + assert_eq!(size, 42); + } + + /// After migration, inserting and reading a `sessions` row works. + #[tokio::test] + async fn sessions_table_insert_and_read() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = PathBuf::from(dir.path()).join("sessions_table_test.db"); + let db = libsql::Builder::new_local(&path) + .build() + .await + .expect("build db"); + let handle = std::sync::Arc::new(db); + run_migrations(&handle).await.expect("migration"); + + let conn = handle.connect().expect("connect"); + conn.execute( + "INSERT INTO sessions (session_id, file_path) VALUES (?1, ?2)", + libsql::params!["sess-abc", "/tmp/sess-abc.jsonl"], + ) + .await + .expect("insert sessions"); + + let mut rows = conn + .query( + "SELECT session_id FROM sessions WHERE session_id = ?1", + libsql::params!["sess-abc"], + ) + .await + .expect("query"); + + let row = rows.next().await.expect("next").expect("row exists"); + let sid: String = row.get(0).expect("session_id"); + assert_eq!(sid, "sess-abc"); + } +} diff --git a/src/db/sync.rs b/src/db/sync.rs new file mode 100644 index 0000000..b61ed86 --- /dev/null +++ b/src/db/sync.rs @@ -0,0 +1,326 @@ +//! Lazy sync logic: keeps the DB in sync with session files on disk. + +use crate::db::connection::DbHandle; +use crate::error::{AppError, Result}; +use crate::models::session::{ContentBlock, MessageContent}; +use crate::parser::discovery::SessionRef; +use crate::parser::reader::read_session_file; + +/// Ensure the database has up-to-date data for the given session. +/// +/// Compares the file's `mtime` (Unix seconds) and `size` against what's stored +/// in `raw_sessions`. If stale or missing, re-reads and upserts the session. +/// +/// **Note:** The `--follow` code path must NOT call this function — +/// following should read the file directly with a byte offset. +pub async fn ensure_synced(db: &DbHandle, session_ref: &SessionRef) -> Result<()> { + let meta = std::fs::metadata(&session_ref.file_path)?; + let file_size = meta.len() as i64; + let file_mtime = meta + .modified() + .map_err(AppError::Io)? + .duration_since(std::time::UNIX_EPOCH) + .map_err(|e| AppError::InvalidArg(e.to_string()))? + .as_secs() as i64; + + // Check if DB has current data. Use a scoped block so the connection and + // the rows cursor are both dropped before we open a write connection below. + let needs_sync = { + let conn = db.connect().map_err(|e| AppError::Db(e.to_string()))?; + let mut rows = conn + .query( + "SELECT file_mtime, file_size FROM raw_sessions WHERE session_id = ?1", + libsql::params![session_ref.session_id.clone()], + ) + .await + .map_err(|e| AppError::Db(e.to_string()))?; + + if let Some(row) = rows.next().await.map_err(|e| AppError::Db(e.to_string()))? { + let db_mtime: i64 = row.get(0).map_err(|e| AppError::Db(e.to_string()))?; + let db_size: i64 = row.get(1).map_err(|e| AppError::Db(e.to_string()))?; + db_mtime != file_mtime || db_size != file_size + } else { + true // not in DB yet + } + }; + + if needs_sync { + upsert_session(db, session_ref, file_mtime, file_size).await + } else { + Ok(()) + } +} + +/// Force re-index a session regardless of whether it appears up to date. +pub async fn force_resync(db: &DbHandle, session_ref: &SessionRef) -> Result<()> { + let meta = std::fs::metadata(&session_ref.file_path)?; + let file_size = meta.len() as i64; + let file_mtime = meta + .modified() + .map_err(AppError::Io)? + .duration_since(std::time::UNIX_EPOCH) + .map_err(|e| AppError::InvalidArg(e.to_string()))? + .as_secs() as i64; + upsert_session(db, session_ref, file_mtime, file_size).await +} + +/// Read, parse, and upsert a session file into all relevant DB tables. +async fn upsert_session( + db: &DbHandle, + session_ref: &SessionRef, + file_mtime: i64, + file_size: i64, +) -> Result<()> { + let entries = read_session_file(&session_ref.file_path).await?; + + // Read raw content. + let raw_jsonl = tokio::fs::read_to_string(&session_ref.file_path).await?; + + // Compute stats. + let stats = crate::models::stats::compute_stats(&entries); + + // Find first/last timestamps. + let first_msg_at = entries.iter().find_map(|e| e.timestamp.clone()); + let last_msg_at = entries.iter().rev().find_map(|e| e.timestamp.clone()); + + let conn = db.connect().map_err(|e| AppError::Db(e.to_string()))?; + + // Upsert into sessions table. + conn.execute( + "INSERT OR REPLACE INTO sessions + (session_id, project_path, model, first_msg_at, last_msg_at, + message_count, input_tokens, output_tokens, file_path) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + libsql::params![ + session_ref.session_id.clone(), + session_ref.project_path.clone().unwrap_or_default(), + stats.model.clone().unwrap_or_default(), + first_msg_at.unwrap_or_default(), + last_msg_at.unwrap_or_default(), + entries.len() as i64, + stats.input_tokens as i64, + stats.output_tokens as i64, + session_ref.file_path.to_string_lossy().to_string() + ], + ) + .await + .map_err(|e| AppError::Db(e.to_string()))?; + + // Upsert raw_sessions. + conn.execute( + "INSERT OR REPLACE INTO raw_sessions (session_id, raw_jsonl, file_mtime, file_size) + VALUES (?1, ?2, ?3, ?4)", + libsql::params![ + session_ref.session_id.clone(), + raw_jsonl, + file_mtime, + file_size + ], + ) + .await + .map_err(|e| AppError::Db(e.to_string()))?; + + // Delete and reinsert messages (simpler than diffing). + conn.execute( + "DELETE FROM messages WHERE session_id = ?1", + libsql::params![session_ref.session_id.clone()], + ) + .await + .map_err(|e| AppError::Db(e.to_string()))?; + + for (seq, entry) in entries.iter().enumerate() { + let role = entry.message.as_ref().and_then(|m| m.role.clone()); + let content_preview = entry + .message + .as_ref() + .and_then(|m| m.content.as_ref()) + .map(|c| match c { + MessageContent::Text(t) => t.chars().take(120).collect::(), + MessageContent::Blocks(blocks) => blocks + .first() + .map(|b| match b { + ContentBlock::Text { text } => text.chars().take(120).collect(), + ContentBlock::ToolUse { name, .. } => format!("[tool_use: {name}]"), + ContentBlock::Thinking { .. } => "[thinking]".to_string(), + _ => "[...]".to_string(), + }) + .unwrap_or_default(), + }); + + conn.execute( + "INSERT OR IGNORE INTO messages + (session_id, sequence_num, timestamp, role, content_preview, entry_type) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + libsql::params![ + session_ref.session_id.clone(), + seq as i64, + entry.timestamp.clone().unwrap_or_default(), + role.unwrap_or_default(), + content_preview.unwrap_or_default(), + entry.entry_type.clone().unwrap_or_default() + ], + ) + .await + .map_err(|e| AppError::Db(e.to_string()))?; + } + + // Delete and reinsert tool_uses. + conn.execute( + "DELETE FROM tool_uses WHERE session_id = ?1", + libsql::params![session_ref.session_id.clone()], + ) + .await + .map_err(|e| AppError::Db(e.to_string()))?; + + for (seq, entry) in entries.iter().enumerate() { + if let Some(msg) = &entry.message + && let Some(MessageContent::Blocks(blocks)) = &msg.content + { + for block in blocks { + if let ContentBlock::ToolUse { id, name, .. } = block { + conn.execute( + "INSERT INTO tool_uses + (session_id, tool_name, tool_use_id, sequence_num, timestamp) + VALUES (?1, ?2, ?3, ?4, ?5)", + libsql::params![ + session_ref.session_id.clone(), + name.clone(), + id.clone(), + seq as i64, + entry.timestamp.clone().unwrap_or_default() + ], + ) + .await + .map_err(|e| AppError::Db(e.to_string()))?; + } + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::connection::open_db; + use crate::parser::discovery::SessionRef; + use chrono::{DateTime, Utc}; + use std::path::PathBuf; + + /// Build a `SessionRef` pointing at the given file. + fn make_session_ref(file_path: PathBuf) -> SessionRef { + let session_id = file_path + .file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("test-session") + .to_string(); + SessionRef { + session_id, + project_path: Some("/tmp/test-project".to_string()), + file_path, + modified_at: DateTime::::from(std::time::SystemTime::UNIX_EPOCH), + } + } + + /// Write a minimal JSONL session file with two entries. + fn write_test_session(path: &std::path::Path) { + let line1 = r#"{"type":"user","session_id":"test-session","cwd":"/tmp","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#; + let line2 = r#"{"type":"assistant","session_id":"test-session","timestamp":"2024-01-01T00:00:01Z","message":{"role":"assistant","content":[{"type":"text","text":"World"}],"usage":{"input_tokens":10,"output_tokens":5},"model":"claude-opus-4-5"}}"#; + std::fs::write(path, format!("{line1}\n{line2}\n")).expect("write test session"); + } + + /// `ensure_synced` inserts a `raw_sessions` row for a new file. + #[tokio::test] + async fn ensure_synced_inserts_row() { + let dir = tempfile::tempdir().expect("tempdir"); + let db_path = dir.path().join("sync_test.db"); + let jsonl_path = dir.path().join("test-session.jsonl"); + + write_test_session(&jsonl_path); + + let db = open_db(&db_path, false).await.expect("open db"); + let session_ref = make_session_ref(jsonl_path); + + ensure_synced(&db, &session_ref) + .await + .expect("ensure_synced"); + + // Verify raw_sessions has a row. + let conn = db.connect().expect("connect"); + let mut rows = conn + .query( + "SELECT session_id FROM raw_sessions WHERE session_id = ?1", + libsql::params![session_ref.session_id.clone()], + ) + .await + .expect("query"); + let row = rows.next().await.expect("next").expect("row exists"); + let sid: String = row.get(0).expect("session_id"); + assert_eq!(sid, session_ref.session_id); + } + + /// `ensure_synced` returns `Ok` without re-syncing when file is unchanged. + #[tokio::test] + async fn ensure_synced_skips_unchanged_file() { + let dir = tempfile::tempdir().expect("tempdir"); + let db_path = dir.path().join("sync_unchanged.db"); + let jsonl_path = dir.path().join("test-session.jsonl"); + + write_test_session(&jsonl_path); + + let db = open_db(&db_path, false).await.expect("open db"); + let session_ref = make_session_ref(jsonl_path); + + // First sync. + ensure_synced(&db, &session_ref).await.expect("first sync"); + + // Second sync on unchanged file — should succeed quickly. + let result = ensure_synced(&db, &session_ref).await; + assert!(result.is_ok(), "second sync failed: {:?}", result.err()); + } + + /// `ensure_synced` re-syncs after the file is modified. + #[tokio::test] + async fn ensure_synced_resyncs_modified_file() { + let dir = tempfile::tempdir().expect("tempdir"); + let db_path = dir.path().join("sync_modified.db"); + let jsonl_path = dir.path().join("test-session.jsonl"); + + write_test_session(&jsonl_path); + + let db = open_db(&db_path, false).await.expect("open db"); + let session_ref = make_session_ref(jsonl_path.clone()); + + // First sync. + ensure_synced(&db, &session_ref).await.expect("first sync"); + + // Modify the file (append a line) and ensure the mtime changes. + // Sleep briefly to ensure mtime differs on systems with 1-second resolution. + std::thread::sleep(std::time::Duration::from_secs(1)); + let extra = r#"{"type":"user","session_id":"test-session","timestamp":"2024-01-01T00:00:02Z","message":{"role":"user","content":"More"}}"#; + let mut file = std::fs::OpenOptions::new() + .append(true) + .open(&jsonl_path) + .expect("open for append"); + use std::io::Write; + writeln!(file, "{extra}").expect("append line"); + + // Second sync — should detect the change and re-sync. + let result = ensure_synced(&db, &session_ref).await; + assert!(result.is_ok(), "re-sync failed: {:?}", result.err()); + + // Verify sessions.message_count was updated. + let conn = db.connect().expect("connect"); + let mut rows = conn + .query( + "SELECT message_count FROM sessions WHERE session_id = ?1", + libsql::params![session_ref.session_id.clone()], + ) + .await + .expect("query"); + let row = rows.next().await.expect("next").expect("row"); + let count: i64 = row.get(0).expect("message_count"); + assert_eq!(count, 3, "expected 3 messages after re-sync"); + } +} diff --git a/src/lib.rs b/src/lib.rs index 78e6d57..2a26e2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ pub mod cli; // pub mod commands; -// pub mod db; +pub mod db; pub mod error; pub mod models; // pub mod output;