import contextlib
import hashlib
import json
import os
import re
import sqlite3
import threading
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional, Tuple

import gradio as gr

BASE_DIR = os.environ.get("RFT_MEM_BASE", "var/rftmem")
os.makedirs(BASE_DIR, exist_ok=True)

# Word tokens used for FTS queries, novelty scoring and the LIKE fallback.
_TOKEN_RE = re.compile(r"[A-Za-z0-9_]+")

# Session IDs become directory names under BASE_DIR, so anything that could
# escape that directory ("/", "\", "..") is rejected. The first character must
# be alphanumeric, which also rules out "." and "..".
_SESSION_ID_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9 _.\-]{0,127}")


def sha256_str(s: str) -> str:
    """Return the hex SHA-256 of *s* encoded as UTF-8."""
    return hashlib.sha256(s.encode("utf-8")).hexdigest()


def now_ms() -> int:
    """Return the current wall-clock time in integer milliseconds since the epoch."""
    return int(time.time() * 1000)


def is_safe_session_id(session_id: Optional[str]) -> bool:
    """Return True if *session_id* may be used as a directory name under BASE_DIR."""
    return isinstance(session_id, str) and _SESSION_ID_RE.fullmatch(session_id) is not None


def event_digest(payload: Dict[str, Any]) -> str:
    """Return the canonical digest of an event payload.

    The encoding (sorted keys, default separators, ensure_ascii=False) is part
    of the ledger format: changing it would invalidate every stored digest.
    """
    return sha256_str(json.dumps(payload, sort_keys=True, ensure_ascii=False))


def atomic_write(path: str, data: bytes) -> None:
    """Write *data* to *path* via a fsync'd temp file renamed over the target.

    The temp file is removed if anything fails before the rename.
    """
    tmp = path + ".tmp"
    try:
        with open(tmp, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
    except BaseException:
        with contextlib.suppress(OSError):
            os.remove(tmp)
        raise


def safe_fts_match(user_query: str) -> str:
    """Build a conservative FTS5 query: ``token1 OR token2 OR ...``.

    Tokens are lower-cased ASCII words, so FTS5 operators (which are
    upper-case) can never be injected. Returns the sentinel "___NO_HITS___"
    when the query has no tokens.
    """
    words = _TOKEN_RE.findall((user_query or "").lower())
    if not words:
        return "___NO_HITS___"
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return " OR ".join(dict.fromkeys(words))


@dataclass
class RetrievalHit:
    event_id: str
    seq: int
    role: str
    text: str
    ts_ms: int
    digest: str
    chain_hash: str
    score: float


class RFTMemoryStore:
    """
    Append-only session ledger + SQLite FTS retrieval + receipt verification.

    Every event is written to ``<base_dir>/sessions/<session_id>/events.jsonl``
    (the append-only source of truth) and indexed in the ``events`` and
    ``events_fts`` tables of ``<base_dir>/index.sqlite``. Events of a session
    are hash-chained: ``chain_hash = sha256(prev_hash + digest)``.
    """

    def __init__(self, base_dir: str):
        self.base_dir = base_dir
        self.db_path = os.path.join(base_dir, "index.sqlite")
        # Serialises appends within this process so two concurrent handlers
        # cannot read the same "last seq" and fork the hash chain.
        self._append_lock = threading.Lock()
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        return sqlite3.connect(self.db_path)

    @contextlib.contextmanager
    def _db(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection; commit on success, roll back on error, always close."""
        con = self._connect()
        try:
            yield con
            con.commit()
        except BaseException:
            con.rollback()
            raise
        finally:
            con.close()

    def _init_db(self) -> None:
        """Create tables/indexes and make sure the FTS index mirrors ``events``."""
        os.makedirs(self.base_dir, exist_ok=True)
        with self._db() as con:
            cur = con.cursor()
            cur.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    session_id TEXT,
                    event_id TEXT PRIMARY KEY,
                    seq INTEGER,
                    ts_ms INTEGER,
                    role TEXT,
                    text TEXT,
                    digest TEXT,
                    prev_hash TEXT,
                    chain_hash TEXT,
                    collapse REAL
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS receipts (
                    receipt_id TEXT PRIMARY KEY,
                    session_id TEXT,
                    ts_ms INTEGER,
                    prompt_hash TEXT,
                    response_hash TEXT,
                    receipt_path TEXT
                )
            """)
            # Speeds up the per-session "latest seq" and ORDER BY seq queries.
            # Deliberately not UNIQUE: older databases may already contain
            # duplicate seq values, and a UNIQUE index would then fail to build.
            cur.execute(
                "CREATE INDEX IF NOT EXISTS idx_events_session_seq "
                "ON events(session_id, seq)"
            )

            # --- Ensure FTS exists in a join-safe form ---
            row = cur.execute(
                "SELECT sql FROM sqlite_master WHERE type='table' AND name='events_fts'"
            ).fetchone()
            if row is None:
                needs_rebuild = True
            else:
                sql = (row[0] or "").lower()
                # A contentless table (content='') cannot return event_id for the
                # join; a table without UNINDEXED ids needlessly indexes hex IDs.
                needs_rebuild = (
                    "content=''" in sql or 'content=""' in sql or "unindexed" not in sql
                )
            if needs_rebuild:
                cur.execute("DROP TABLE IF EXISTS events_fts")
                # Stored-content FTS table; only `text` is tokenised.
                cur.execute("""
                    CREATE VIRTUAL TABLE events_fts USING fts5(
                        event_id UNINDEXED,
                        session_id UNINDEXED,
                        text
                    )
                """)

            n_events = cur.execute("SELECT COUNT(*) FROM events").fetchone()[0]
            n_fts = cur.execute("SELECT COUNT(*) FROM events_fts").fetchone()[0]
            if needs_rebuild or n_events != n_fts:
                # Reindex from events (the index is derived data).
                cur.execute("DELETE FROM events_fts")
                cur.execute("""
                    INSERT INTO events_fts(event_id, session_id, text)
                    SELECT event_id, session_id, text FROM events
                """)

    # ----------------
    # Filesystem layout
    # ----------------
    def session_dir(self, session_id: str) -> str:
        """Return (creating if needed) the directory of *session_id*.

        Raises:
            ValueError: if *session_id* could escape ``base_dir``.
        """
        if not is_safe_session_id(session_id):
            raise ValueError(f"Unsafe session_id: {session_id!r}")
        d = os.path.join(self.base_dir, "sessions", session_id)
        os.makedirs(d, exist_ok=True)
        return d

    def session_log_path(self, session_id: str) -> str:
        return os.path.join(self.session_dir(session_id), "events.jsonl")

    def receipts_dir(self, session_id: str) -> str:
        d = os.path.join(self.session_dir(session_id), "receipts")
        os.makedirs(d, exist_ok=True)
        return d

    # -------------------
    # RFT collapse scoring
    # -------------------
    def get_events(
        self, session_id: str, limit: int = 400, from_end: bool = False
    ) -> List[Dict[str, Any]]:
        """Return up to *limit* events of a session in ascending ``seq`` order.

        With ``from_end=False`` (default) these are the oldest events; with
        ``from_end=True`` they are the most recent ones.
        """
        if from_end:
            sql = """
                SELECT event_id, seq, ts_ms, role, text, digest, prev_hash, chain_hash, collapse
                FROM (
                    SELECT event_id, seq, ts_ms, role, text, digest, prev_hash, chain_hash, collapse
                    FROM events
                    WHERE session_id=?
                    ORDER BY seq DESC
                    LIMIT ?
                )
                ORDER BY seq ASC
            """
        else:
            sql = """
                SELECT event_id, seq, ts_ms, role, text, digest, prev_hash, chain_hash, collapse
                FROM events
                WHERE session_id=?
                ORDER BY seq ASC
                LIMIT ?
            """
        with contextlib.closing(self._connect()) as con:
            rows = con.execute(sql, (session_id, int(limit))).fetchall()
        return [
            {
                "event_id": r[0],
                "seq": int(r[1] or 0),
                "ts_ms": r[2],
                "role": r[3],
                "text": r[4],
                "digest": r[5],
                "prev_hash": r[6],
                "chain_hash": r[7],
                "collapse": float(r[8] or 0.0),
            }
            for r in rows
        ]

    def collapse_score(self, session_id: str, role: str, text: str) -> float:
        """Score in [0, 1] combining role weight, token novelty and length.

        Novelty is the share of the text's tokens absent from the session's 20
        most recent events.
        """
        role_w = {"user": 1.0, "tool": 0.9, "assistant": 0.6}.get(role, 0.7)
        tokens = {t.lower() for t in _TOKEN_RE.findall(text or "")}
        if not tokens:
            return 0.0

        recent_tokens = set()
        for e in self.get_events(session_id, limit=20, from_end=True):
            recent_tokens.update(t.lower() for t in _TOKEN_RE.findall(e["text"] or ""))

        novelty = len(tokens - recent_tokens) / max(1, len(tokens))
        length_factor = min(1.0, len(tokens) / 30.0)
        score = role_w * (0.65 * novelty + 0.35 * length_factor)
        return float(max(0.0, min(1.0, score)))

    # ----------------
    # Append-only write
    # ----------------
    def _get_last_seq_and_chain(self, session_id: str) -> Tuple[int, str]:
        """Return (highest seq, its chain_hash); (0, 64 zeros) for an empty session."""
        with contextlib.closing(self._connect()) as con:
            row = con.execute(
                "SELECT seq, chain_hash FROM events WHERE session_id=? ORDER BY seq DESC LIMIT 1",
                (session_id,),
            ).fetchone()
        if row is None:
            return 0, "0" * 64
        return int(row[0] or 0), (row[1] or "0" * 64)

    def append_event(self, session_id: str, role: str, text: str) -> Dict[str, Any]:
        """Append one event to the session's JSONL ledger and the SQLite index.

        Returns the stored record (payload + digest, prev_hash, chain_hash, collapse).

        Raises:
            ValueError: if *session_id* is not a safe directory name.
        """
        # Resolve (and validate) the ledger path before doing any work.
        log_path = self.session_log_path(session_id)

        with self._append_lock:
            event_id = uuid.uuid4().hex
            ts = now_ms()
            last_seq, prev_chain = self._get_last_seq_and_chain(session_id)
            seq = last_seq + 1

            payload = {
                "session_id": session_id,
                "event_id": event_id,
                "seq": seq,
                "ts_ms": ts,
                "role": role,
                "text": text,
            }
            digest = event_digest(payload)
            chain_hash = sha256_str(prev_chain + digest)
            collapse = self.collapse_score(session_id, role, text)

            rec = {
                **payload,
                "digest": digest,
                "prev_hash": prev_chain,
                "chain_hash": chain_hash,
                "collapse": collapse,
            }

            # Append-only JSONL source of truth
            line = (json.dumps(rec, ensure_ascii=False) + "\n").encode("utf-8")
            with open(log_path, "ab") as f:
                f.write(line)
                f.flush()
                os.fsync(f.fileno())

            # Index: both rows in one transaction so events and events_fts stay in sync.
            with self._db() as con:
                con.execute(
                    """
                    INSERT INTO events(session_id,event_id,seq,ts_ms,role,text,digest,prev_hash,chain_hash,collapse)
                    VALUES(?,?,?,?,?,?,?,?,?,?)
                    """,
                    (session_id, event_id, seq, ts, role, text, digest, prev_chain, chain_hash, collapse),
                )
                con.execute(
                    "INSERT INTO events_fts(event_id, session_id, text) VALUES(?,?,?)",
                    (event_id, session_id, text),
                )
        return rec

    # -------------------------
    # Retrieval (FTS + fallback)
    # -------------------------
    def search_lexical(self, session_id: str, query: str, k: int = 8) -> List[RetrievalHit]:
        """Return up to *k* events of the session matching *query*.

        Uses FTS5 BM25 ranking; if that finds nothing, falls back to a
        substring match on the query's last token (score 0.001).
        """
        k = int(k)
        if k <= 0:  # SQLite treats a negative LIMIT as "no limit"
            return []
        match = safe_fts_match(query)
        if match == "___NO_HITS___":
            return []

        with contextlib.closing(self._connect()) as con:
            rows = con.execute(
                """
                SELECT e.event_id, e.seq, e.role, e.text, e.ts_ms, e.digest, e.chain_hash,
                       bm25(events_fts) as rank
                FROM events_fts
                JOIN events e ON e.event_id = events_fts.event_id
                WHERE events_fts.text MATCH ? AND e.session_id=?
                ORDER BY rank ASC
                LIMIT ?
                """,
                (match, session_id, k),
            ).fetchall()

        hits: List[RetrievalHit] = []
        for (eid, seq, role, text, ts, digest, chain_hash, rank) in rows:
            # FTS5 bm25() is <= 0 and more negative means more relevant; map
            # relevance = -rank monotonically into [0, 1).
            relevance = max(0.0, -float(rank if rank is not None else 0.0))
            score = relevance / (1.0 + relevance)
            hits.append(RetrievalHit(eid, int(seq or 0), role, text, ts, digest, chain_hash, score))

        # Fallback if FTS returns nothing (protects demo UX)
        if not hits:
            hits = self._search_like_fallback(session_id, query, k)
        return hits

    def _search_like_fallback(self, session_id: str, query: str, k: int) -> List[RetrievalHit]:
        """Substring-match the query's last token; newest events first, score 0.001."""
        tokens = _TOKEN_RE.findall((query or "").lower())
        if not tokens:
            return []
        # Escape LIKE wildcards so "_" in a token matches a literal underscore.
        escaped = tokens[-1].replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        needle = f"%{escaped}%"
        with contextlib.closing(self._connect()) as con:
            rows = con.execute(
                """
                SELECT event_id, seq, role, text, ts_ms, digest, chain_hash
                FROM events
                WHERE session_id=? AND LOWER(text) LIKE ? ESCAPE '\\'
                ORDER BY seq DESC
                LIMIT ?
                """,
                (session_id, needle, int(k)),
            ).fetchall()
        return [
            RetrievalHit(eid, int(seq or 0), role, text, ts, digest, chain_hash, 0.001)
            for (eid, seq, role, text, ts, digest, chain_hash) in rows
        ]

    # -------------------------
    # Receipts + verification
    # -------------------------
    def write_receipt(self, session_id: str, user_text: str, retrieved: List[RetrievalHit],
                      prompt: str, response: str) -> str:
        """Persist a JSON receipt of one turn's retrieval and return its file path."""
        receipt_id = uuid.uuid4().hex
        ts = now_ms()
        receipt = {
            "receipt_id": receipt_id,
            "session_id": session_id,
            "ts_ms": ts,
            "query": user_text,
            "retrieval": [{
                "event_id": h.event_id,
                "seq": h.seq,
                "role": h.role,
                "content": h.text,
                "score": h.score,
                "digest": h.digest,
                "chain_hash": h.chain_hash,
            } for h in retrieved],
            "prompt_hash": sha256_str(prompt),
            "response_hash": sha256_str(response),
            "engine": {
                "name": "RFT Memory Receipt Engine",
                "version": "0.3",
                "method": "append-only ledger + FTS retrieval + hash-chain receipts",
            },
        }

        path = os.path.join(self.receipts_dir(session_id), f"{receipt_id}.json")
        atomic_write(path, json.dumps(receipt, indent=2, ensure_ascii=False).encode("utf-8"))

        with self._db() as con:
            con.execute(
                """
                INSERT INTO receipts(receipt_id, session_id, ts_ms, prompt_hash, response_hash, receipt_path)
                VALUES(?,?,?,?,?,?)
                """,
                (receipt_id, session_id, ts, receipt["prompt_hash"], receipt["response_hash"], path),
            )
        return path

    def verify_receipt(self, receipt_json: Dict[str, Any]) -> Tuple[bool, str]:
        """Check every event referenced by a receipt against the stored ledger.

        For each retrieval entry: the event must exist in the receipt's session;
        its digest, chain hash and (if present) content must equal the receipt's;
        and the stored row must still hash to its own digest and chain link.
        """
        if not isinstance(receipt_json, dict):
            return False, "Receipt must be a JSON object."
        session_id = receipt_json.get("session_id")
        if not session_id:
            return False, "Missing session_id."
        retrieval = receipt_json.get("retrieval", [])
        if not isinstance(retrieval, list):
            return False, "Malformed receipt: 'retrieval' must be a list."

        with contextlib.closing(self._connect()) as con:
            cur = con.cursor()
            for item in retrieval:
                if not isinstance(item, dict):
                    return False, "Malformed receipt: retrieval entries must be objects."
                eid = item.get("event_id")
                row = cur.execute(
                    """
                    SELECT session_id, event_id, seq, ts_ms, role, text, digest, prev_hash, chain_hash
                    FROM events WHERE event_id=? AND session_id=?
                    """,
                    (eid, session_id),
                ).fetchone()
                if not row:
                    return False, f"Event not found: {eid}"
                db_sid, db_eid, seq, ts, role, text, digest, prev_hash, chain_hash = row

                if digest != item.get("digest"):
                    return False, f"Digest mismatch for {eid}"
                if chain_hash != item.get("chain_hash"):
                    return False, f"Chain hash mismatch for {eid}"
                if "content" in item and item["content"] != text:
                    return False, f"Content mismatch for {eid}"

                # Re-derive the hashes from the stored row so edits to the index
                # (text, seq, ...) are detected, not just edits to the hash columns.
                recomputed = event_digest({
                    "session_id": db_sid,
                    "event_id": db_eid,
                    "seq": seq,
                    "ts_ms": ts,
                    "role": role,
                    "text": text,
                })
                if recomputed != digest:
                    return False, f"Stored event {eid} no longer matches its digest"
                if sha256_str((prev_hash or "") + digest) != chain_hash:
                    return False, f"Chain link broken for {eid}"

        return True, "Receipt verified: all referenced events exist and hashes match."


store = RFTMemoryStore(BASE_DIR)

EXAMPLE_PROMPTS = [
    "Store this: Dog=Nova, City=Manchester, Drink=Pepsi Max.",
    "What is my dog's name?",
    "What city did I say?",
    "My drink is Coke Zero now. This overrides earlier.",
    "What is my favourite drink?",
    "Search for: Nova",
    "Search for: Manchester",
]

GUIDED_DEMO_STEPS = [
    "My name is Liam. Remember that.",
    "Store these exactly: Dog = Nova. City = Manchester. Favourite drink = Pepsi Max.",
    "What’s my dog’s name?",
    "What city did I say?",
    "My favourite drink is Coke Zero now. This overrides earlier.",
    "What’s my favourite drink?",
    "Search for Nova and show the matching memory line.",
    "Search for Coke and show the matching memory line.",
]

HOW_TO_MD = """
# How to use this Space

## What to write
- Store facts: “My name is Liam.” / “Dog=Nova, City=Manchester.”
- Ask for recall: “What’s my dog’s name?”
- Override old facts: “My drink is Coke Zero now. This overrides earlier.”
- Search deliberately: “Nova”, “Manchester”, “Coke”.

## What to expect
- Every message is stored in an append-only ledger.
- Each turn retrieves a fixed number (K) of memory slices.
- Each turn generates a receipt listing exactly what was retrieved.
- Receipt verification proves the referenced events exist and hashes match.

## Why it matters
Storing logs is easy. Proving influence is the hard part.
Receipts turn memory into something inspectable and defensible.
"""


def new_session_id() -> str:
    return uuid.uuid4().hex


def events_to_messages(events: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Convert ledger events to role/content dicts, keeping only user/assistant turns."""
    return [{"role": e["role"], "content": e["text"]} for e in events if e["role"] in ("user", "assistant")]


def format_ledger(events: List[Dict[str, Any]]) -> str:
    """Render the last 250 events as text blocks (timestamps shown in UTC)."""
    lines = []
    for e in events[-250:]:
        t = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(e["ts_ms"] / 1000))
        lines.append(
            f"{t} | seq={e['seq']} | {e['role']}\n"
            f"{e['text']}\n"
            f"event_id={e['event_id']} collapse={e['collapse']:.2f}\n"
            f"digest={e['digest']}\n"
            f"chain={e['chain_hash']}\n"
            f"{'-'*72}"
        )
    return "\n".join(lines)


def build_prompt(user_msg: str, hits: List[RetrievalHit]) -> str:
    """Build the (hashed, not executed) prompt text from the message and retrieved memories."""
    memories = "\n".join([f"- ({h.role}) {h.text}" for h in hits]) if hits else "(none)"
    return (
        "SYSTEM: Use retrieved memory slices if relevant. Prefer exact stored facts.\n"
        f"RETRIEVED MEMORIES:\n{memories}\n\n"
        f"USER:\n{user_msg}\n"
    )


_FACT_VALUE = r"([A-Za-z0-9 _\-']+)"
# Filler words that trail an override ("... is Coke Zero now.") and are not part of the value.
_TRAILING_FILLER_RE = re.compile(r"\s+(?:now|instead)$", re.IGNORECASE)


def _clean_fact_value(raw: str) -> str:
    """Trim whitespace, trailing punctuation and trailing filler words from a captured value."""
    value = raw.strip().strip(",.").strip()
    return _TRAILING_FILLER_RE.sub("", value).strip()


def extract_fact_from_hits(hits: List[RetrievalHit], key: str) -> Optional[str]:
    """
    Extract simple facts from retrieved memory only.

    Supports:
      - "Key = Value"
      - "Key: Value"
      - "My [adjective] key is Value"   (e.g. "My favourite drink is Coke Zero now")
      - "My name is Value"              (key == "name"; result is title-cased)

    Non-assistant hits are scanned first, newest (highest seq) first, so a later
    override wins over an older statement and assistant echoes of old answers
    do not shadow what the user said. The value keeps its original casing.
    """
    key_l = key.lower()
    key_re = re.escape(key_l)
    patterns = [
        re.compile(rf"\b{key_re}\b\s*=\s*{_FACT_VALUE}", re.IGNORECASE),
        re.compile(rf"\b{key_re}\b\s*:\s*{_FACT_VALUE}", re.IGNORECASE),
        re.compile(rf"\bmy\s+(?:[a-z]+\s+)?{key_re}\s+is\s+{_FACT_VALUE}", re.IGNORECASE),
    ]
    name_re = re.compile(rf"\bmy name is\b\s+{_FACT_VALUE}", re.IGNORECASE)

    ordered = sorted(hits, key=lambda h: (h.role == "assistant", -h.seq))
    for h in ordered:
        t = (h.text or "").strip()
        if key_l == "name":
            m = name_re.search(t)
            if m:
                value = _clean_fact_value(m.group(1))
                if value:
                    return value.title()
        for p in patterns:
            m = p.search(t)
            if m:
                value = _clean_fact_value(m.group(1))
                if value:
                    return value
    return None


def _format_hit_lines(header: str, hits: List[RetrievalHit]) -> str:
    """Render *header* followed by one "- score | role | text" line per hit."""
    return "\n".join([header] + [f"- {h.score:.4f} | {h.role} | {h.text}" for h in hits])


def answer_from_memory(user_msg: str, hits: List[RetrievalHit]) -> str:
    """Answer deterministically from retrieved memory slices only (no LLM call)."""
    q = (user_msg or "").strip().lower()

    # Search-style queries
    if q.startswith(("search for", "search:")):
        if not hits:
            return "No matching memory slices were retrieved for this search."
        return _format_hit_lines("Search hits:", hits)

    # Simple Q&A based on retrieved memory only
    if "dog" in q and "name" in q:
        v = extract_fact_from_hits(hits, "dog")
        return f"Your dog’s name (from stored memory) is: {v}" if v else "I didn’t retrieve a stored dog name for this query."

    if "city" in q:
        v = extract_fact_from_hits(hits, "city")
        return f"Your city (from stored memory) is: {v}" if v else "I didn’t retrieve a stored city for this query."

    if "drink" in q:
        # Any "favourite/favorite drink ..." statement also matches the plain "drink" key.
        v = extract_fact_from_hits(hits, "drink")
        return f"Your drink (from stored memory) is: {v}" if v else "I didn’t retrieve a stored drink for this query."

    if "my name" in q:
        v = extract_fact_from_hits(hits, "name")
        return f"Your name (from stored memory) is: {v}" if v else "I didn’t retrieve a stored name for this query."

    # Default: show what was retrieved (transparent)
    if not hits:
        return "No matching memory slices were retrieved for this query."
    return _format_hit_lines("Retrieved memory slices:", hits)


def chat_turn(session_id: str, user_msg: str, retrieval_k: int):
    """Run one turn: store message, retrieve, answer, store answer, write a receipt.

    Returns (session_id, chat messages, retrieved view, ledger text,
    receipt path, receipt path) for the Gradio outputs.
    """
    session_id = (session_id or "").strip() or new_session_id()
    if not is_safe_session_id(session_id):
        raise gr.Error(
            "Invalid Session ID: use letters, digits, spaces, '.', '_' or '-' "
            "(max 128 chars), starting with a letter or digit."
        )
    if not (user_msg or "").strip():
        raise gr.Error("Type a message before sending.")

    # The user message is stored before retrieval, so it can appear among its own hits.
    store.append_event(session_id, "user", user_msg)
    hits = store.search_lexical(session_id, user_msg, k=int(retrieval_k))
    prompt = build_prompt(user_msg, hits)
    response = answer_from_memory(user_msg, hits)
    store.append_event(session_id, "assistant", response)

    receipt_path = store.write_receipt(session_id, user_msg, hits, prompt, response)

    events = store.get_events(session_id, limit=900, from_end=True)
    ledger = format_ledger(events)
    retrieved_view = "\n".join([f"{h.score:.4f} | {h.role} | {h.text}" for h in hits]) if hits else "(none)"
    messages = events_to_messages(events)

    # Return receipt path twice: textbox + downloadable file
    return session_id, messages, retrieved_view, ledger, receipt_path, receipt_path


def run_guided_demo(session_id: str, retrieval_k: int):
    """Feed GUIDED_DEMO_STEPS through chat_turn and return the last turn's outputs."""
    if not session_id:
        session_id = new_session_id()
    last = (session_id, [], "", "", "", None)
    for step in GUIDED_DEMO_STEPS:
        last = chat_turn(session_id, step, retrieval_k)
        session_id = last[0]
    return last


def manual_search(session_id: str, query: str, k: int) -> str:
    """Search the session's memory and render hits with their hashes."""
    session_id = (session_id or "").strip()
    if not session_id:
        return "(no session)"
    hits = store.search_lexical(session_id, query, k=int(k))
    if not hits:
        return "(no hits)"
    out = []
    for h in hits:
        out.append(
            f"{h.score:.4f} | seq={h.seq} | {h.role}\n"
            f"{h.text}\n"
            f"event_id={h.event_id}\n"
            f"digest={h.digest}\n"
            f"chain={h.chain_hash}\n"
            f"{'-'*60}"
        )
    return "\n".join(out)


def verify_uploaded_receipt(file_obj) -> str:
    """Verify an uploaded receipt JSON file and return a ✅/❌ message."""
    if file_obj is None:
        return "Upload a receipt JSON file."
    # Newer Gradio versions pass a file path string; older ones a tempfile-like object.
    path = file_obj if isinstance(file_obj, (str, os.PathLike)) else getattr(file_obj, "name", None)
    if not path:
        return "❌ Could not read the uploaded file."
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSON and UTF-8 decode errors
        return f"❌ Could not read receipt JSON: {e}"
    ok, msg = store.verify_receipt(data)
    return f"{'✅' if ok else '❌'} {msg}"


def reset_session():
    """Start a fresh session and clear the Chat tab outputs."""
    return new_session_id(), [], "", "", "", None


def fill_example(selected: str) -> str:
    return selected or ""


with gr.Blocks(title="RFT Memory Receipt Engine") as demo:
    gr.Markdown("# RFT Memory Receipt Engine\nLocal persistence + retrieval + verifiable receipts.")

    with gr.Row():
        # Passing the function (not its result) makes Gradio generate a fresh
        # ID on each page load instead of sharing one ID across all visitors.
        session_id = gr.Textbox(label="Session ID", value=new_session_id)
        new_sess_btn = gr.Button("New Session", variant="secondary")
        retrieval_k = gr.Slider(1, 20, value=8, step=1, label="Retrieval K")

    with gr.Tabs():
        with gr.Tab("Chat"):
            # NOTE(review): the handlers return role/content dicts; depending on
            # the installed Gradio version the Chatbot may need type="messages".
            chatbot = gr.Chatbot(label="Conversation", height=320)
            with gr.Row():
                example_pick = gr.Dropdown(label="Example prompts", choices=EXAMPLE_PROMPTS, value=EXAMPLE_PROMPTS[0])
                use_example = gr.Button("Use Example", variant="secondary")
            user_msg = gr.Textbox(label="Message", placeholder="Type a message…")
            send = gr.Button("Send", variant="primary")

            retrieved_out = gr.Textbox(label="Retrieved memory slices", lines=8)
            ledger_out = gr.Textbox(label="Session ledger (hash-chained)", lines=14)
            receipt_path = gr.Textbox(label="Last receipt path (server)", lines=1)
            receipt_file = gr.File(label="Download last receipt JSON")

            use_example.click(fill_example, inputs=[example_pick], outputs=[user_msg])
            send.click(
                chat_turn,
                inputs=[session_id, user_msg, retrieval_k],
                outputs=[session_id, chatbot, retrieved_out, ledger_out, receipt_path, receipt_file],
            )

        with gr.Tab("Guided Demo"):
            gr.Markdown("Runs a scripted demo to show storage, recall, overrides, search, and receipts.")
            run_demo_btn = gr.Button("Run Guided Demo", variant="primary")
            demo_chatbot = gr.Chatbot(label="Demo conversation", height=320)
            demo_retrieved = gr.Textbox(label="Last retrieved memory slices", lines=8)
            demo_ledger = gr.Textbox(label="Ledger after demo", lines=14)
            demo_receipt_path = gr.Textbox(label="Last demo receipt path (server)", lines=1)
            demo_receipt_file = gr.File(label="Download last demo receipt JSON")

            run_demo_btn.click(
                run_guided_demo,
                inputs=[session_id, retrieval_k],
                outputs=[session_id, demo_chatbot, demo_retrieved, demo_ledger, demo_receipt_path, demo_receipt_file],
            )

        with gr.Tab("Manual Search"):
            q = gr.Textbox(label="Search query", placeholder="Type keywords…")
            do_search = gr.Button("Search", variant="primary")
            results = gr.Textbox(label="Results", lines=14)
            do_search.click(manual_search, inputs=[session_id, q, retrieval_k], outputs=[results])

        with gr.Tab("Verify Receipt"):
            receipt_upload = gr.File(label="Upload a receipt JSON")
            verify_btn = gr.Button("Verify", variant="primary")
            verify_out = gr.Textbox(label="Verification result")
            verify_btn.click(verify_uploaded_receipt, inputs=[receipt_upload], outputs=[verify_out])

        with gr.Tab("How to Use"):
            gr.Markdown(HOW_TO_MD)

    new_sess_btn.click(
        reset_session,
        inputs=[],
        outputs=[session_id, chatbot, retrieved_out, ledger_out, receipt_path, receipt_file],
    )

demo.launch()