RFTSystems's picture
Update app.py
efb382e verified
import contextlib
import hashlib
import json
import os
import re
import sqlite3
import time
import uuid
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Tuple

import gradio as gr
# Root directory for the SQLite index, per-session JSONL logs and receipts.
# Overridable through the RFT_MEM_BASE environment variable.
BASE_DIR = os.getenv("RFT_MEM_BASE", "var/rftmem")
os.makedirs(BASE_DIR, exist_ok=True)
def sha256_str(s: str) -> str:
    """Return the lowercase hex SHA-256 digest of ``s`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(s.encode("utf-8"))
    return hasher.hexdigest()
def now_ms() -> int:
    """Return the current wall-clock time as whole milliseconds since the Unix epoch."""
    seconds = time.time()
    return int(seconds * 1000)
def atomic_write(path: str, data: bytes) -> None:
    """Replace the file at ``path`` with ``data`` via write-then-rename.

    The bytes are written and fsync'd to a uniquely named temp file next to
    ``path``. That file is then moved over ``path`` with ``os.replace``
    (atomic on POSIX), so readers see either the old file or the complete new
    one, never a partial write.

    Args:
        path: Destination file path; its directory must already exist.
        data: Raw bytes to store.

    Raises:
        Whatever ``open``/``write``/``os.replace`` raise. The temp file is
        removed before the exception propagates.
    """
    # A unique suffix keeps concurrent writers of the same path from sharing
    # (and clobbering) one temp file.
    tmp = f"{path}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup so failed writes don't leave stray temp files;
        # the original error is what the caller needs to see.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
def safe_fts_match(user_query: str) -> str:
    """Build a conservative FTS5 MATCH expression from free-form text.

    The query is lower-cased and split into ASCII alphanumeric/underscore
    tokens. Duplicates are dropped (first occurrence kept) and the rest are
    OR-ed together: ``"token1 OR token2 OR ..."``. Lower-casing keeps user
    words from being read as FTS5's upper-case AND/OR/NOT operators.

    Returns the sentinel ``"___NO_HITS___"`` when the text has no tokens.
    """
    tokens = re.findall(r"[A-Za-z0-9_]+", (user_query or "").lower())
    # dict preserves insertion order, so this de-duplicates stably.
    ordered_unique = list(dict.fromkeys(tokens))
    return " OR ".join(ordered_unique) if ordered_unique else "___NO_HITS___"
@dataclass
class RetrievalHit:
    """One memory slice returned by ``RFTMemoryStore.search_lexical``.

    Instances are constructed positionally, so the field order is part of
    the interface.
    """
    event_id: str  # events.event_id of the matched ledger entry
    seq: int  # per-session sequence number of the event
    role: str  # role stored with the event ("user", "assistant", ...)
    text: str  # stored event text
    ts_ms: int  # event timestamp, milliseconds since the epoch
    digest: str  # SHA-256 digest stored for the event payload
    chain_hash: str  # hash-chain value stored for the event
    score: float  # retrieval score set by search_lexical (LIKE-fallback hits get 0.001)
class RFTMemoryStore:
    """
    Append-only session ledger + SQLite FTS retrieval + receipt verification.

    Storage under ``base_dir``:
    - ``index.sqlite``: the ``events`` table, its ``events_fts`` FTS5 index
      and the ``receipts`` table.
    - ``sessions/<session_id>/events.jsonl``: append-only log, one JSON
      record per event.
    - ``sessions/<session_id>/receipts/<receipt_id>.json``: written receipts.

    Each event stores ``digest`` = sha256 of its canonical JSON payload and
    ``chain_hash`` = sha256(prev_hash + digest). The first event of a
    session links to ``GENESIS_HASH``.
    """

    # "Previous" chain hash used for the first event of every session.
    GENESIS_HASH = "0" * 64

    def __init__(self, base_dir: str):
        self.base_dir = base_dir
        self.db_path = os.path.join(base_dir, "index.sqlite")
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to the index database."""
        return sqlite3.connect(self.db_path)

    @contextlib.contextmanager
    def _db(self):
        """Yield a connection; commit on success, roll back on error, always close.

        Guarantees the connection is released even when a query raises.
        """
        con = self._connect()
        try:
            yield con
            con.commit()
        except BaseException:
            con.rollback()
            raise
        finally:
            con.close()

    def _init_db(self):
        """Create the tables if missing and (re)build the FTS index from ``events``."""
        os.makedirs(self.base_dir, exist_ok=True)
        with self._db() as con:
            cur = con.cursor()
            cur.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    session_id TEXT,
                    event_id TEXT PRIMARY KEY,
                    seq INTEGER,
                    ts_ms INTEGER,
                    role TEXT,
                    text TEXT,
                    digest TEXT,
                    prev_hash TEXT,
                    chain_hash TEXT,
                    collapse REAL
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS receipts (
                    receipt_id TEXT PRIMARY KEY,
                    session_id TEXT,
                    ts_ms INTEGER,
                    prompt_hash TEXT,
                    response_hash TEXT,
                    receipt_path TEXT
                )
            """)
            # --- Ensure FTS exists in a join-safe form ---
            cur.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='events_fts'")
            row = cur.fetchone()
            fts_sql = (row[0] or "").lower() if row is not None else ""
            # A contentless table (content='' / content="") does not return
            # column values, so joining it to events on event_id is unreliable.
            needs_rebuild = row is None or "content=''" in fts_sql or 'content=""' in fts_sql
            if needs_rebuild:
                cur.execute("DROP TABLE IF EXISTS events_fts")
                # Stored-content FTS table (simple and reliable)
                cur.execute("""
                    CREATE VIRTUAL TABLE events_fts USING fts5(
                        event_id,
                        session_id,
                        text
                    )
                """)
            # Reindex from events on every start-up so the FTS table always
            # mirrors the events table.
            cur.execute("DELETE FROM events_fts")
            cur.execute("""
                INSERT INTO events_fts(event_id, session_id, text)
                SELECT event_id, session_id, text FROM events
            """)

    # ----------------
    # Filesystem layout
    # ----------------
    def session_dir(self, session_id: str) -> str:
        """Return (creating if needed) the directory holding one session's files."""
        d = os.path.join(self.base_dir, "sessions", session_id)
        os.makedirs(d, exist_ok=True)
        return d

    def session_log_path(self, session_id: str) -> str:
        """Return the path of the session's append-only JSONL log."""
        return os.path.join(self.session_dir(session_id), "events.jsonl")

    def receipts_dir(self, session_id: str) -> str:
        """Return (creating if needed) the directory holding the session's receipts."""
        d = os.path.join(self.session_dir(session_id), "receipts")
        os.makedirs(d, exist_ok=True)
        return d

    # ----------------
    # Event reads
    # ----------------
    def get_events(self, session_id: str, limit: int = 400) -> List[Dict[str, Any]]:
        """Return the most recent ``limit`` events of a session, oldest first.

        The inner query takes the newest ``limit`` rows. The outer query
        restores chronological order, so callers that slice the tail
        (``events[-n:]``) or want "recent" events get the latest ones even
        for sessions longer than ``limit``.
        """
        with self._db() as con:
            rows = con.execute("""
                SELECT event_id, seq, ts_ms, role, text, digest, prev_hash, chain_hash, collapse
                FROM (
                    SELECT event_id, seq, ts_ms, role, text, digest, prev_hash, chain_hash, collapse
                    FROM events
                    WHERE session_id=?
                    ORDER BY seq DESC
                    LIMIT ?
                )
                ORDER BY seq ASC
            """, (session_id, int(limit))).fetchall()
        return [
            {
                "event_id": r[0],
                "seq": int(r[1] or 0),
                "ts_ms": r[2],
                "role": r[3],
                "text": r[4],
                "digest": r[5],
                "prev_hash": r[6],
                "chain_hash": r[7],
                "collapse": float(r[8] or 0.0),
            }
            for r in rows
        ]

    # -------------------
    # RFT collapse scoring
    # -------------------
    def collapse_score(self, session_id: str, role: str, text: str) -> float:
        """Score a new message in [0, 1] by role weight, novelty and length.

        Novelty is the share of the message's tokens that do not appear in the
        session's 20 most recent events. Length saturates at 30 distinct
        tokens.
        """
        role_w = {"user": 1.0, "tool": 0.9, "assistant": 0.6}.get(role, 0.7)
        tokens = set(t.lower() for t in re.findall(r"[A-Za-z0-9_]+", text or ""))
        if not tokens:
            return 0.0
        recent = self.get_events(session_id, limit=20)
        recent_tokens = set()
        for e in recent:
            recent_tokens |= set(t.lower() for t in re.findall(r"[A-Za-z0-9_]+", e["text"] or ""))
        unseen = len(tokens - recent_tokens)
        novelty = unseen / max(1, len(tokens))
        length_factor = min(1.0, len(tokens) / 30.0)
        score = role_w * (0.65 * novelty + 0.35 * length_factor)
        return float(max(0.0, min(1.0, score)))

    # ----------------
    # Append-only write
    # ----------------
    def _get_last_seq_and_chain(
        self, session_id: str, con: Optional[sqlite3.Connection] = None
    ) -> Tuple[int, str]:
        """Return ``(last seq, last chain_hash)`` for a session.

        Returns ``(0, GENESIS_HASH)`` for an empty session. Pass ``con`` to
        read inside the caller's transaction.
        """
        sql = """
            SELECT seq, chain_hash FROM events
            WHERE session_id=?
            ORDER BY seq DESC
            LIMIT 1
        """
        if con is None:
            with self._db() as own:
                row = own.execute(sql, (session_id,)).fetchone()
        else:
            row = con.execute(sql, (session_id,)).fetchone()
        if row is None:
            return 0, self.GENESIS_HASH
        return int(row[0] or 0), (row[1] or self.GENESIS_HASH)

    def append_event(self, session_id: str, role: str, text: str) -> Dict[str, Any]:
        """Append one event to the session's JSONL log and SQLite index.

        Returns the stored record: payload fields plus ``digest``,
        ``prev_hash``, ``chain_hash`` and ``collapse``.
        """
        event_id = uuid.uuid4().hex
        ts = now_ms()
        # Scored against the events that exist before this one is inserted.
        collapse = self.collapse_score(session_id, role, text)
        with self._db() as con:
            # Take SQLite's write lock before reading the chain tip. Concurrent
            # appends to the same session then cannot reuse a seq or fork the
            # hash chain.
            con.execute("BEGIN IMMEDIATE")
            last_seq, prev_chain = self._get_last_seq_and_chain(session_id, con)
            seq = last_seq + 1
            payload = {
                "session_id": session_id,
                "event_id": event_id,
                "seq": seq,
                "ts_ms": ts,
                "role": role,
                "text": text,
            }
            digest = sha256_str(json.dumps(payload, sort_keys=True, ensure_ascii=False))
            chain_hash = sha256_str(prev_chain + digest)
            rec = {
                **payload,
                "digest": digest,
                "prev_hash": prev_chain,
                "chain_hash": chain_hash,
                "collapse": collapse,
            }
            # Append-only JSONL source of truth
            line = (json.dumps(rec, ensure_ascii=False) + "\n").encode("utf-8")
            with open(self.session_log_path(session_id), "ab") as f:
                f.write(line)
                f.flush()
                os.fsync(f.fileno())
            # Index (committed together by _db)
            con.execute("""
                INSERT INTO events(session_id,event_id,seq,ts_ms,role,text,digest,prev_hash,chain_hash,collapse)
                VALUES(?,?,?,?,?,?,?,?,?,?)
            """, (session_id, event_id, seq, ts, role, text, digest, prev_chain, chain_hash, collapse))
            con.execute(
                "INSERT INTO events_fts(event_id, session_id, text) VALUES(?,?,?)",
                (event_id, session_id, text),
            )
        return rec

    # -------------------------
    # Retrieval (FTS + fallback)
    # -------------------------
    def search_lexical(self, session_id: str, query: str, k: int = 8) -> List[RetrievalHit]:
        """Return up to ``k`` hits for ``query`` within one session.

        FTS5 hits are ordered best-first by bm25 and scored in [0, 1), higher
        meaning more relevant. If FTS finds nothing, the newest events
        containing the query's last token (case-insensitive substring) are
        returned with score 0.001.
        """
        match = safe_fts_match(query)
        if match == "___NO_HITS___":
            return []
        with self._db() as con:
            rows = con.execute("""
                SELECT e.event_id, e.seq, e.role, e.text, e.ts_ms, e.digest, e.chain_hash,
                       bm25(events_fts) as rank
                FROM events_fts
                JOIN events e ON e.event_id = events_fts.event_id
                WHERE events_fts.text MATCH ? AND e.session_id=?
                ORDER BY rank ASC
                LIMIT ?
            """, (match, session_id, int(k))).fetchall()

        hits: List[RetrievalHit] = []
        for (eid, seq, role, text, ts, digest, chain_hash, rank) in rows:
            # FTS5 bm25() is numerically smaller (negative) for better matches.
            # Flip the sign to get a relevance >= 0, then squash into [0, 1).
            relevance = max(0.0, -float(rank if rank is not None else 0.0))
            score = relevance / (1.0 + relevance)
            hits.append(RetrievalHit(eid, int(seq or 0), role, text, ts, digest, chain_hash, score))
        if hits:
            return hits

        # Fallback if FTS returns nothing (protects demo UX)
        tokens = re.findall(r"[A-Za-z0-9_]+", (query or "").lower())
        if not tokens:
            return hits
        # "_" is a LIKE wildcard; escape it so it matches a literal underscore.
        needle = "%" + tokens[-1].replace("_", "\\_") + "%"
        with self._db() as con:
            rows2 = con.execute("""
                SELECT event_id, seq, role, text, ts_ms, digest, chain_hash
                FROM events
                WHERE session_id=? AND LOWER(text) LIKE ? ESCAPE '\\'
                ORDER BY seq DESC
                LIMIT ?
            """, (session_id, needle, int(k))).fetchall()
        for (eid, seq, role, text, ts, digest, chain_hash) in rows2:
            hits.append(RetrievalHit(eid, int(seq or 0), role, text, ts, digest, chain_hash, 0.001))
        return hits

    # -------------------------
    # Receipts + verification
    # -------------------------
    def write_receipt(self, session_id: str, user_text: str, retrieved: List[RetrievalHit], prompt: str, response: str) -> str:
        """Write a JSON receipt of one turn's retrieval and register it.

        Returns the receipt's file path.
        """
        receipt_id = uuid.uuid4().hex
        ts = now_ms()
        receipt = {
            "receipt_id": receipt_id,
            "session_id": session_id,
            "ts_ms": ts,
            "query": user_text,
            "retrieval": [{
                "event_id": h.event_id,
                "seq": h.seq,
                "role": h.role,
                "content": h.text,
                "score": h.score,
                "digest": h.digest,
                "chain_hash": h.chain_hash
            } for h in retrieved],
            "prompt_hash": sha256_str(prompt),
            "response_hash": sha256_str(response),
            "engine": {
                "name": "RFT Memory Receipt Engine",
                "version": "0.3",
                "method": "append-only ledger + FTS retrieval + hash-chain receipts"
            }
        }
        path = os.path.join(self.receipts_dir(session_id), f"{receipt_id}.json")
        atomic_write(path, json.dumps(receipt, indent=2, ensure_ascii=False).encode("utf-8"))
        with self._db() as con:
            con.execute("""
                INSERT INTO receipts(receipt_id, session_id, ts_ms, prompt_hash, response_hash, receipt_path)
                VALUES(?,?,?,?,?,?)
            """, (receipt_id, session_id, ts, receipt["prompt_hash"], receipt["response_hash"], path))
        return path

    def verify_receipt(self, receipt_json: Dict[str, Any]) -> Tuple[bool, str]:
        """Check every retrieval entry of a receipt against the ledger.

        Returns ``(ok, message)``. An entry passes when its event exists in
        the session and the following all hold:
        - its digest and chain hash equal the stored values;
        - any ``content``/``seq``/``role`` it carries equal the stored event;
        - the stored event still hashes to its digest;
        - the stored event's digest still links from its ``prev_hash`` to its
          ``chain_hash``.
        Malformed input yields ``(False, reason)`` instead of raising.
        """
        if not isinstance(receipt_json, dict):
            return False, "Receipt must be a JSON object."
        session_id = receipt_json.get("session_id")
        if not session_id:
            return False, "Missing session_id."
        retrieval = receipt_json.get("retrieval", [])
        if not isinstance(retrieval, list):
            return False, "Malformed retrieval list."
        with self._db() as con:
            for item in retrieval:
                ok, msg = self._verify_retrieval_item(con, session_id, item)
                if not ok:
                    return False, msg
        return True, "Receipt verified: all referenced events exist and hashes match."

    def _verify_retrieval_item(self, con: sqlite3.Connection, session_id: str, item: Any) -> Tuple[bool, str]:
        """Verify one receipt retrieval entry; returns ``(ok, failure message)``."""
        if not isinstance(item, dict):
            return False, "Malformed retrieval entry."
        eid = item.get("event_id")
        row = con.execute("""
            SELECT session_id, event_id, seq, ts_ms, role, text, digest, prev_hash, chain_hash
            FROM events
            WHERE event_id=? AND session_id=?
        """, (eid, session_id)).fetchone()
        if not row:
            return False, f"Event not found: {eid}"
        db_sid, db_eid, seq, ts_ms, role, text, digest, prev_hash, chain_hash = row
        if digest != item.get("digest"):
            return False, f"Digest mismatch for {eid}"
        if chain_hash != item.get("chain_hash"):
            return False, f"Chain hash mismatch for {eid}"
        # The receipt's copy of the event must agree with the ledger. Otherwise
        # edited "content" would still verify because only hashes were compared.
        if "content" in item and item["content"] != text:
            return False, f"Content mismatch for {eid}"
        if "seq" in item and item["seq"] != seq:
            return False, f"Seq mismatch for {eid}"
        if "role" in item and item["role"] != role:
            return False, f"Role mismatch for {eid}"
        # Re-derive the stored hashes (same payload as append_event) so that
        # tampering with the indexed event itself is detected as well.
        payload = {
            "session_id": db_sid,
            "event_id": db_eid,
            "seq": seq,
            "ts_ms": ts_ms,
            "role": role,
            "text": text,
        }
        if sha256_str(json.dumps(payload, sort_keys=True, ensure_ascii=False)) != digest:
            return False, f"Stored event does not match its digest: {eid}"
        if sha256_str((prev_hash or "") + (digest or "")) != chain_hash:
            return False, f"Stored event breaks the hash chain: {eid}"
        return True, ""
# Module-level store used by the Gradio handlers defined further down.
store = RFTMemoryStore(BASE_DIR)
# Choices for the Chat tab's "Example prompts" dropdown (first one preselected).
EXAMPLE_PROMPTS = [
    "Store this: Dog=Nova, City=Manchester, Drink=Pepsi Max.",
    "What is my dog's name?",
    "What city did I say?",
    "My drink is Coke Zero now. This overrides earlier.",
    "What is my favourite drink?",
    "Search for: Nova",
    "Search for: Manchester",
]
# Messages that run_guided_demo sends, in order, as consecutive chat turns.
GUIDED_DEMO_STEPS = [
    "My name is Liam. Remember that.",
    "Store these exactly: Dog = Nova. City = Manchester. Favourite drink = Pepsi Max.",
    "What’s my dog’s name?",
    "What city did I say?",
    "My favourite drink is Coke Zero now. This overrides earlier.",
    "What’s my favourite drink?",
    "Search for Nova and show the matching memory line.",
    "Search for Coke and show the matching memory line.",
]
# Markdown rendered in the "How to Use" tab.
HOW_TO_MD = """
# How to use this Space
## What to write
- Store facts: “My name is Liam.” / “Dog=Nova, City=Manchester.”
- Ask for recall: “What’s my dog’s name?”
- Override old facts: “My drink is Coke Zero now. This overrides earlier.”
- Search deliberately: “Nova”, “Manchester”, “Coke”.
## What to expect
- Every message is stored in an append-only ledger.
- Each turn retrieves a fixed number (K) of memory slices.
- Each turn generates a receipt listing exactly what was retrieved.
- Receipt verification proves the referenced events exist and hashes match.
## Why it matters
Storing logs is easy. Proving influence is the hard part.
Receipts turn memory into something inspectable and defensible.
"""
def new_session_id() -> str:
    """Return a fresh random session identifier (32 lowercase hex characters)."""
    session_uuid = uuid.uuid4()
    return session_uuid.hex
def events_to_messages(events: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Convert ledger events into chat messages (``role``/``content`` dicts).

    Only ``user`` and ``assistant`` events are kept; other roles are skipped.
    """
    messages: List[Dict[str, str]] = []
    for event in events:
        if event["role"] not in ("user", "assistant"):
            continue
        messages.append({"role": event["role"], "content": event["text"]})
    return messages
def format_ledger(events: List[Dict[str, Any]]) -> str:
    """Render the last 250 events as a human-readable ledger.

    Each event becomes a multi-line entry: a UTC timestamp, seq and role line,
    the text, id/collapse, digest, chain, and a 72-dash rule. Entries are
    joined with newlines.
    """
    rule = "-" * 72
    rendered = []
    for ev in events[-250:]:
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(ev["ts_ms"] / 1000))
        parts = (
            f"{stamp} | seq={ev['seq']} | {ev['role']}",
            f"{ev['text']}",
            f"event_id={ev['event_id']} collapse={ev['collapse']:.2f}",
            f"digest={ev['digest']}",
            f"chain={ev['chain_hash']}",
            rule,
        )
        rendered.append("\n".join(parts))
    return "\n".join(rendered)
def build_prompt(user_msg: str, hits: List[RetrievalHit]) -> str:
    """Assemble the prompt text: system line, retrieved memories, then the user message.

    Each hit is listed as ``- (role) text``; ``(none)`` is used when nothing
    was retrieved.
    """
    if hits:
        memories = "\n".join(f"- ({hit.role}) {hit.text}" for hit in hits)
    else:
        memories = "(none)"
    sections = [
        "SYSTEM: Use retrieved memory slices if relevant. Prefer exact stored facts.\n",
        f"RETRIEVED MEMORIES:\n{memories}\n\n",
        f"USER:\n{user_msg}\n",
    ]
    return "".join(sections)
def extract_fact_from_hits(hits: List["RetrievalHit"], key: str) -> Optional[str]:
    """
    Extract a simple fact from retrieved memory only.

    Supports:
    - "Key = Value"
    - "Key: Value"
    - "My name is Value" (only when ``key`` is "name")

    Matching is case-insensitive, but the value keeps the casing it was stored
    with ("Dog = Nova" -> "Nova"). Name values are title-cased.

    Hits are checked in this order:
    - non-assistant hits before assistant hits (assistant replies can echo
      older facts);
    - within each group, newest (highest ``seq``) first, so a later
      restatement of a fact overrides an earlier one.

    Returns None when no hit matches.
    """
    key_l = key.lower()
    value_re = r"([A-Za-z0-9 _\-']+)"
    patterns = [
        re.compile(rf"\b{re.escape(key_l)}\b\s*=\s*" + value_re, re.IGNORECASE),
        re.compile(rf"\b{re.escape(key_l)}\b\s*:\s*" + value_re, re.IGNORECASE),
    ]
    name_re = re.compile(r"\bmy name is\b\s+" + value_re, re.IGNORECASE)
    # sorted() is stable, so equally ranked hits keep their relevance order.
    ranked = sorted(hits, key=lambda h: (h.role != "assistant", h.seq or 0), reverse=True)
    for h in ranked:
        t = (h.text or "").strip()
        if key_l == "name":
            m = name_re.search(t)
            if m:
                return m.group(1).strip().title()
        for p in patterns:
            m = p.search(t)
            if m:
                return m.group(1).strip().strip(",.")
    return None
def answer_from_memory(user_msg: str, hits: List[RetrievalHit]) -> str:
    """Produce a deterministic reply using only the retrieved memory slices.

    - Messages starting with "search for" / "search:" list the hits.
    - Otherwise, the first matching fact rule is tried, in this order: dog
      name, city, drink, name. Each rule answers from extracted memory or
      says nothing was retrieved.
    - Anything else lists the retrieved slices.
    """
    q = (user_msg or "").lower()

    def _listing(header: str) -> str:
        rows = [f"- {h.score:.4f} | {h.role} | {h.text}" for h in hits]
        return "\n".join([header, *rows])

    def _lookup(*keys: str) -> Optional[str]:
        # First truthy extraction wins, trying keys left to right.
        for key in keys:
            found = extract_fact_from_hits(hits, key)
            if found:
                return found
        return None

    # Search-style queries
    if q.startswith(("search for", "search:")):
        return _listing("Search hits:") if hits else "No matching memory slices were retrieved for this search."

    # (predicate on query, keys to try, answer template, fallback message)
    fact_rules = (
        (lambda s: "dog" in s and "name" in s, ("dog",),
         "Your dog’s name (from stored memory) is: {}",
         "I didn’t retrieve a stored dog name for this query."),
        (lambda s: "city" in s, ("city",),
         "Your city (from stored memory) is: {}",
         "I didn’t retrieve a stored city for this query."),
        # "favourite drink"/"favorite drink" both contain "drink".
        (lambda s: "drink" in s, ("drink", "favourite drink", "favorite drink"),
         "Your drink (from stored memory) is: {}",
         "I didn’t retrieve a stored drink for this query."),
        (lambda s: "my name" in s, ("name",),
         "Your name (from stored memory) is: {}",
         "I didn’t retrieve a stored name for this query."),
    )
    for applies, keys, found_tpl, missing_msg in fact_rules:
        if applies(q):
            value = _lookup(*keys)
            return found_tpl.format(value) if value else missing_msg

    # Default: show what was retrieved (transparent)
    if not hits:
        return "No matching memory slices were retrieved for this query."
    return _listing("Retrieved memory slices:")
def chat_turn(session_id: str, user_msg: str, retrieval_k: int):
    """Run one chat turn: log the message, retrieve, answer, log the answer, write a receipt.

    A new session id is generated when ``session_id`` is empty. Returns
    ``(session_id, chat messages, retrieved view, ledger text, receipt path,
    receipt path)``. The path appears twice so it can feed both the path
    textbox and the download component.
    """
    sid = session_id if session_id else uuid.uuid4().hex

    store.append_event(sid, "user", user_msg)
    hits = store.search_lexical(sid, user_msg, k=int(retrieval_k))

    prompt = build_prompt(user_msg, hits)
    reply = answer_from_memory(user_msg, hits)
    store.append_event(sid, "assistant", reply)
    receipt_file_path = store.write_receipt(sid, user_msg, hits, prompt, reply)

    history = store.get_events(sid, limit=900)
    if hits:
        retrieved_view = "\n".join(f"{h.score:.4f} | {h.role} | {h.text}" for h in hits)
    else:
        retrieved_view = "(none)"
    return (
        sid,
        events_to_messages(history),
        retrieved_view,
        format_ledger(history),
        receipt_file_path,
        receipt_file_path,
    )
def run_guided_demo(session_id: str, retrieval_k: int):
    """Send every GUIDED_DEMO_STEPS message through chat_turn in one session.

    Returns the outputs of the final turn, in the same 6-tuple shape as
    chat_turn.
    """
    sid = session_id if session_id else uuid.uuid4().hex
    result = (sid, [], "", "", "", None)
    for prompt_text in GUIDED_DEMO_STEPS:
        result = chat_turn(sid, prompt_text, retrieval_k)
        sid = result[0]
    return result
def manual_search(session_id: str, query: str, k: int) -> str:
    """Run a lexical search in the session and render each hit with its hashes.

    Returns "(no session)" for an empty session id and "(no hits)" when
    nothing matches.
    """
    if not session_id:
        return "(no session)"
    hits = store.search_lexical(session_id, query, k=int(k))
    if not hits:
        return "(no hits)"
    divider = "-" * 60
    return "\n".join(
        "\n".join((
            f"{h.score:.4f} | seq={h.seq} | {h.role}",
            f"{h.text}",
            f"event_id={h.event_id}",
            f"digest={h.digest}",
            f"chain={h.chain_hash}",
            divider,
        ))
        for h in hits
    )
def verify_uploaded_receipt(file_obj) -> str:
    """Load an uploaded receipt JSON file and verify it against the ledger.

    Args:
        file_obj: Value from ``gr.File``. Gradio 4+ passes a filepath string;
            older versions pass a tempfile-like object with ``.name``.
            None means no upload.

    Returns:
        A user-facing status line: a prompt to upload, or a ✅/❌ result.
        Unreadable or non-object JSON gives a ❌ message instead of an
        exception.
    """
    if file_obj is None:
        return "Upload a receipt JSON file."
    path = file_obj if isinstance(file_obj, (str, os.PathLike)) else getattr(file_obj, "name", None)
    if not path:
        return "Upload a receipt JSON file."
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:  # ValueError covers JSONDecodeError / UnicodeDecodeError
        return f"❌ Could not read receipt JSON: {exc}"
    if not isinstance(data, dict):
        return "❌ Receipt JSON must be an object."
    ok, msg = store.verify_receipt(data)
    return f"{'✅' if ok else '❌'} {msg}"
def reset_session():
    """Start a new session and clear the Chat tab outputs.

    Returns ``(session_id, chat history, retrieved view, ledger, receipt
    path, receipt file)``.
    """
    blank_text = ""
    return uuid.uuid4().hex, [], blank_text, blank_text, blank_text, None
def fill_example(selected: str) -> str:
    """Copy the chosen example prompt into the message box ("" when nothing is selected)."""
    if selected:
        return selected
    return ""
with gr.Blocks(title="RFT Memory Receipt Engine") as demo:
    gr.Markdown("# RFT Memory Receipt Engine\nLocal persistence + retrieval + verifiable receipts.")
    with gr.Row():
        # Pass the factory itself rather than calling it. Gradio calls a
        # callable `value` on every page load, so each visitor gets a fresh
        # session ID. Calling it here would run once at import time, and all
        # visitors would share one session (and each other's memories).
        session_id = gr.Textbox(label="Session ID", value=new_session_id)
        new_sess_btn = gr.Button("New Session", variant="secondary")
    retrieval_k = gr.Slider(1, 20, value=8, step=1, label="Retrieval K")

    with gr.Tabs():
        with gr.Tab("Chat"):
            # NOTE(review): handlers return role/content dicts; depending on the
            # installed Gradio version the Chatbot may need type="messages" — confirm.
            chatbot = gr.Chatbot(label="Conversation", height=320)
            with gr.Row():
                example_pick = gr.Dropdown(label="Example prompts", choices=EXAMPLE_PROMPTS, value=EXAMPLE_PROMPTS[0])
                use_example = gr.Button("Use Example", variant="secondary")
            user_msg = gr.Textbox(label="Message", placeholder="Type a message…")
            send = gr.Button("Send", variant="primary")
            retrieved_out = gr.Textbox(label="Retrieved memory slices", lines=8)
            ledger_out = gr.Textbox(label="Session ledger (hash-chained)", lines=14)
            receipt_path = gr.Textbox(label="Last receipt path (server)", lines=1)
            receipt_file = gr.File(label="Download last receipt JSON")
            use_example.click(fill_example, inputs=[example_pick], outputs=[user_msg])
            send.click(
                chat_turn,
                inputs=[session_id, user_msg, retrieval_k],
                outputs=[session_id, chatbot, retrieved_out, ledger_out, receipt_path, receipt_file],
            )

        with gr.Tab("Guided Demo"):
            gr.Markdown("Runs a scripted demo to show storage, recall, overrides, search, and receipts.")
            run_demo_btn = gr.Button("Run Guided Demo", variant="primary")
            demo_chatbot = gr.Chatbot(label="Demo conversation", height=320)
            demo_retrieved = gr.Textbox(label="Last retrieved memory slices", lines=8)
            demo_ledger = gr.Textbox(label="Ledger after demo", lines=14)
            demo_receipt_path = gr.Textbox(label="Last demo receipt path (server)", lines=1)
            demo_receipt_file = gr.File(label="Download last demo receipt JSON")
            run_demo_btn.click(
                run_guided_demo,
                inputs=[session_id, retrieval_k],
                outputs=[session_id, demo_chatbot, demo_retrieved, demo_ledger, demo_receipt_path, demo_receipt_file],
            )

        with gr.Tab("Manual Search"):
            q = gr.Textbox(label="Search query", placeholder="Type keywords…")
            do_search = gr.Button("Search", variant="primary")
            results = gr.Textbox(label="Results", lines=14)
            do_search.click(manual_search, inputs=[session_id, q, retrieval_k], outputs=[results])

        with gr.Tab("Verify Receipt"):
            receipt_upload = gr.File(label="Upload a receipt JSON")
            verify_btn = gr.Button("Verify", variant="primary")
            verify_out = gr.Textbox(label="Verification result")
            verify_btn.click(verify_uploaded_receipt, inputs=[receipt_upload], outputs=[verify_out])

        with gr.Tab("How to Use"):
            gr.Markdown(HOW_TO_MD)

    # Resets the session ID and the Chat tab outputs.
    new_sess_btn.click(
        reset_session,
        inputs=[],
        outputs=[session_id, chatbot, retrieved_out, ledger_out, receipt_path, receipt_file],
    )

demo.launch()