精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

使用 RAG、LangChain、FastAPI 與 Streamlit 構建 Text-to-SQL 聊天機器人

發布于 2025-11-5 07:37
瀏覽
0收藏

在這個項目中,我構建了一個由 AI 驅動的聊天機器人,它可以將自然語言問題轉換為 SQL 查詢,并直接從真實的 SQLite 數據庫中檢索答案。借助 LangChain、Hugging Face Embeddings 和 Chroma 向量存儲,這個應用展示了如何通過 Retrieval-Augmented Generation(RAG,檢索增強生成)工作流,把非結構化的用戶輸入與結構化數據連接起來——配有 FastAPI 后端與 Streamlit 前端界面。

引言:為什么是 Text-to-SQL?

想象一下: 你在會議上,經理突然問道:

“我們能看到上個月加入的所有客戶嗎?”

你看了一眼 SQL 編輯器……發現你得現寫查詢、確認表名,甚至可能還要調試一個缺失的 JOIN。與此同時,另一個人直接問了 AI 聊天機器人同樣的問題——立刻拿到整齊格式化的結果。

這就是 Text-to-SQL 的魔力——把自然語言變成數據庫查詢。

問題所在

SQL(Structured Query Language)是數據分析與數據工程的基石。它強大、靈活、精確——但對很多人來說并不“友好”。 大多數業務用戶、分析師,甚至一些開發者都會覺得 SQL 語法令人生畏,或者在快速獲取洞察時效率不高。

在多數團隊里,這會造成一個鴻溝:

  • 數據是可用的,但不容易“開口就問”。
  • 洞察是存在的,但被 SQL 專業技能“上鎖”。

解決方案:Text-to-SQL

Text-to-SQL 用來彌合這個鴻溝。 它讓任何人——無論技術背景如何——都能直接發問:

“上個季度我們最暢銷的 5 款產品是什么?”

并直接從數據庫拿到答案,AI 會在背后完成所有翻譯工作。

在幕后,一個 Text-to-SQL 系統通常要做四件關鍵的事:

  • 檢索相關的 schema 與上下文(知道有哪些表)。
  • 從自然語言問題生成有效的 SQL 查詢。
  • 安全地校驗和執行 SQL。
  • 以易讀的格式返回結果。

理解 RAG 方法

我們已經看到,Text-to-SQL 讓用戶能用自然語言提問并獲取數據庫結果——但 AI 到底是如何“知道”你的表、列和關系的呢?

答案是:RAG(Retrieval-Augmented Generation,檢索增強生成)。

什么是 RAG?

本質上,RAG 是一種結合“檢索(Retrieval)”與“生成(Generation)”的混合式 AI 方法:

  • 檢索——系統抓取相關信息(這里指數據庫 schema、表名、表間關系等)。
  • 生成——LLM 根據檢索到的上下文生成準確且有根據的輸出(即 SQL 查詢)。

你可以把 RAG 理解為:在 LLM 開始寫 SQL 之前,先給它一張“備考小抄”——它需要看到的確切 schema。

為什么不直接用普通的 LLM?

如果你直接問一個大語言模型(LLM),比如 GPT 或 Claude:

“列出上個月加入的所有客戶。”

它可能會生成這樣的 SQL:

SELECT * FROM users WHERE signup_date >= '2025-09-01';

看起來沒問題——直到你意識到你的數據庫里壓根沒有叫做 ??users??? 的表。也許你的是 ??customer_data??? 或 ??crm_clients??。

這就是問題所在:當缺少上下文時,LLM 容易“幻覺”。 它會猜表名、漏掉 JOIN 關系,或用錯列名——因為默認情況下,它并不知道你的數據庫 schema。

RAG 如何解決這個問題

RAG 通過以“真實、可檢索的知識”為依據,讓模型“落地”。

在一個 Text-to-SQL 的 RAG 回路中,會發生以下事情:

  • 檢索 Schema 上下文
    系統在你的 schema 索引(向量數據庫或內存索引)中搜索表名、列描述、關系等信息。
  • 生成 SQL
    LLM 同時接收你的自然語言問題和檢索到的 schema,生成有效的 SQL 查詢。
  • 執行并返回結果
    查詢被校驗后,在真實數據庫中執行,并把結果返回給用戶。

這能確保每個 SQL 查詢都基于“了解 schema 的推理”,而不是模型的“拍腦袋”。

RAG 回路的實戰流程

一個包含以下循環的簡單流程圖:

[User Question]
      ↓
[Retrieve Schema Info]
      ↓
[Generate SQL (LLM)]
      ↓
[Validate + Execute]
      ↓
[Return Results]
      ?

每一輪都會確保模型在生成查詢前,擁有“最新且準確”的 schema 知識——減少幻覺并提升可靠性。

系統架構

理解了為什么 Text-to-SQL 依賴 RAG 方法后,我們來看看系統的內部結構。

為了更具體,假設你在構建一個聊天機器人,能回答關于公司客戶數據庫的自然語言問題——從“上個月誰加入了?”到“本季度我們的平均訂單金額是多少?”都不在話下。

下面是系統的大致結構圖 ??

1. SQLite 數據庫——結構化數據源

每個 Text-to-SQL 系統都始于一個數據源。 為簡化,我們使用 SQLite——一種輕量的、基于文件的數據庫,適合原型開發與測試。

這里存放著你的真實數據:比如 ??customers???、??orders???、??products?? 等表。

當用戶提問時,我們的目標是把問題轉換成能在這個數據庫上執行的有效 SQL 查詢。

2. Embedding 層——把 schema 轉成向量

在模型生成 SQL 之前,它需要“理解”數據庫結構——表名、列名,以及它們的含義。

我們通過 Embeddings(向量嵌入)來實現:它是對文本的數字化表示,能夠捕捉語義。 借助 Hugging Face Embeddings(如 ??all-MiniLM-L6-v2??),我們把 schema 元數據轉為向量:

import os
import sqlite3
import hashlib
from tqdm import tqdm
from dotenv import load_dotenv
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma

# Load environment variables
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
CHROMA_DIR = os.getenv("CHROMA_DIR", "./chroma_persist")
EMBED_MODEL = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")

# ? Initialize HuggingFace embedding model
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)

# ? Create / Load Chroma vector store
vectorstore = Chroma(
    collection_name="sqlite_docs",
    persist_directory=CHROMA_DIR,
    embedding_function=embeddings
)

def row_hash(values):
    """Generate unique hash for a row."""
    return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()

def row_to_text(table, cols, row):
    """Convert SQLite row into a readable text chunk."""
    return f"Table: {table}\n" + "\n".join([f"{c}: {v}" for c, v in zip(cols, row)])

def index_table(conn, table):
    """Index a single table into the vector store."""
    cur = conn.cursor()
    cur.execute(f"PRAGMA table_info({table});")
    cols = [c[1] for c in cur.fetchall()]
    cur.execute(f"SELECT {', '.join(cols)} FROM {table}")
    rows = cur.fetchall()

    docs, ids, metas = [], [], []
    for r in rows:
        txt = row_to_text(table, cols, r)
        pk = str(r[0])
        hid = row_hash(r)
        ids.append(f"{table}:{pk}")
        docs.append(txt)
        metas.append({"table": table, "pk": pk, "hash": hid})

    # Add to Chroma vector store
    vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)

def main():
    """Main indexing pipeline."""
    conn = sqlite3.connect(SQLITE_PATH)
    cur = conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
    tables = [t[0] for t in cur.fetchall()]

    for t in tqdm(tables, desc="Indexing tables"):
        index_table(conn, t)

    conn.close()
    print("Indexing complete and persisted in Chroma.")

if __name__ == "__main__":
    main()

現在,每張表和每個字段都在高維空間中擁有一個向量表示——為后續的智能檢索打下基礎。

3. Vector Store(Chroma)——定位相關 schema

接著,我們把這些 embeddings 存進向量數據庫——這里使用 Chroma。

當用戶提問時,我們會:

  1. 使用同一個 embedding 模型對問題進行向量化。
  2. 在 Chroma 中檢索與之“最相近”的 schema 元素。

例如,當用戶問“最近有哪些用戶注冊?”時,檢索器可能會找到類似 ??customers.signup_date??? 和 ??customers.name?? 的 schema 項。

這樣能確保模型只看到相關的 schema 上下文——讓 SQL 生成“腳踏實地”。

4. LangChain 組件——系統“大腦”

LangChain 把所有環節串起來,提供檢索、推理與 SQL 生成的模塊化組件:

  • Retrievers:從 Chroma 拉取相關的 schema 片段。
  • Chains:定義步驟序列——檢索 → 生成 → 校驗 → 執行。
  • LLMs:基于用戶問題和檢索到的 schema 生成 SQL。

示例代碼:

from typing import TypedDict, List
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from dotenv import load_dotenv
import os

load_dotenv()

# state type
class RAGState(TypedDict, total=False):
    question: str
    retrieved_docs: List[str]
    generated_sql: str
    validated_sql: str
    sql_result: List[dict]
    messages: List[dict]


# init vectorstore & models
CHROMA_DIR = os.getenv("CHROMA_DIR", "./chroma_persist")
EMBED_MODEL = os.getenv("EMBED_MODEL", "text-embedding-3-small")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini")
TOP_K = int(os.getenv("TOP_K", "8"))

embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
vectordb = Chroma(persist_directory=CHROMA_DIR, embedding_function=embeddings, collection_name="sqlite_docs")
retriever = vectordb.as_retriever(search_kwargs={"k": TOP_K})

llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)

sql_prompt = PromptTemplate.from_template("""
        You are a SQL generator. Based on the following context, generate a SINGLE READ-ONLY SQLite SELECT query (no semicolons, no multiple statements).
        Context:
        {context}
        
        Question:
        {question}
        
        Return only the SQL SELECT statement.
        """)


async def retriever_node(state: RAGState) -> RAGState:
    docs = await retriever.ainvoke(state["question"])
    state["retrieved_docs"] = [d.page_content for d in docs]
    return state



import re
from typing import Any

async def sql_generator_node(state: RAGState) -> RAGState:
    """
    Generate SQL from the retrieved documents and user question.
    Cleans LLM output, removes markdown/code fences, and ensures only SELECT statements remain.
    """
    # 1. Combine retrieved documents
    context = "\n\n".join(state.get("retrieved_docs", []))

    # 2. Format the prompt
    prompt_text = sql_prompt.format(context=context, question=state["question"])

    # 3. Call the LLM asynchronously
    out = await llm.ainvoke(prompt_text)

    # 4. Extract text content if output is an AIMessage or ChatResult
    if hasattr(out, "content"):
        out = out.content
    out = str(out).strip()

    # 5. Remove code fences ``` or ```sql and any leading/trailing whitespace
    out = re.sub(r"```(?:sql)?\n?", "", out, flags=re.IGNORECASE).replace("```", "").strip()

    # 6. Ensure the SQL starts with SELECT (case-insensitive)
    match = re.search(r"(select\b.*)", out, flags=re.IGNORECASE | re.DOTALL)
    if match:
        out = match.group(1).strip()
    else:
        # fallback if no SELECT found
        out = ""

    # 7. Optional: remove trailing semicolon if present
    out = out.rstrip(";").strip()

    # 8. Save cleaned SQL back to state
    state["generated_sql"] = out
    return state

LangChain 作為“編排器”,確保每個問題都能順暢地走完完整的 RAG 回路。

5. FastAPI 后端——封裝管道

為了讓系統可交互,我們用 FastAPI 封裝整個流程。

FastAPI 能以簡潔且高性能的方式,把 RAG 管道暴露為 API。

當用戶用 POST 請求提交問題時,API 會:

  • 檢索 schema 信息。
  • 生成并校驗 SQL。
  • 執行查詢。
  • 返回格式化結果。

這一層是你的 Text-to-SQL 聊天機器人的“引擎室”。

import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
from server.utils import allowed_tables_from_db

load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
ALLOWED_TABLES = allowed_tables_from_db(SQLITE_PATH)

app = FastAPI(title="RAG Text->SQL API")

class QueryRequest(BaseModel):
    question: str
    show_sql: bool = True

@app.post("/query")
async def query(req: QueryRequest):
    state = {"question": req.question, "messages": []}
    # 1. retrieve
    state = await retriever_node(state)
    # 2. generate SQL
    state = await sql_generator_node(state)
    sql = state["generated_sql"]
    ok, reason = validate_sql(sql, ALLOWED_TABLES)
    if not ok:
        raise HTTPException(status_code=400, detail=f"SQL validation failed: {reason}. SQL: {sql}")
    # 3. execute
    cols, rows = execute_sql(sql)
    # format result rows
    result = [dict(zip(cols, r)) for r in rows]
    return {"sql": sql if req.show_sql else None, "cols": cols, "rows": result}

6. Streamlit 前端——聊天界面

最后,我們用 Streamlit 構建一個前端界面——一個輕量的 Web 界面供用戶交互。

用戶可以輸入問題、選擇是否查看生成的 SQL,并實時查看結果。

Streamlit 簡單易用,非常適合原型驗證 AI 工具: 幾行 Python 代碼,你就能得到一個能“對話數據”的交互式儀表盤。

使用 RAG、LangChain、FastAPI 與 Streamlit 構建 Text-to-SQL 聊天機器人-AI.x社區

全流程整合

端到端架構圖(示意):

User (Streamlit UI)
        ↓
FastAPI Backend
        ↓
LangChain RAG Pipeline
   ├── Retriever (Chroma Vector DB)
   ├── Embedding Layer (Hugging Face)
   ├── LLM (SQL Generator)
        ↓
SQLite Database (Execute SQL)
        ↓
Results → Back to UI

這個流程涵蓋了一條查詢的整個生命周期——從純英文到執行 SQL,再到可讀的結果。

文末有代碼倉庫


環境準備

  • 安裝依賴:

pip install -r requirements.txt

SQLite 數據庫準備

  • 說明 schema(例如 ??customers???、??orders??)。

os.makedirs("sample_db", exist_ok=True)
DB = "sample_db/sample.db"
conn = sqlite3.connect(DB)
cur = conn.cursor()

cur.execute("""
CREATE TABLE IF NOT EXISTS customers (
  id INTEGER PRIMARY KEY,
  name TEXT,
  email TEXT,
  created_at TEXT
);
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
  id INTEGER PRIMARY KEY,
  customer_id INTEGER,
  total_amount REAL,
  status TEXT,
  created_at TEXT,
  notes TEXT,
  FOREIGN KEY(customer_id) REFERENCES customers(id)
);
""")
# seed data
customers = [
    (1, "Alice Johnson", "alice@example.com", "2024-12-01"),
    (2, "Bob Lee", "bob@example.com", "2024-12-05"),
    (3, "Carol Singh", "carol@example.com", "2024-12-10"),
    (4, "David Kim", "david.kim@example.com", "2024-12-12"),
.......
]

orders = [
    (1, 1, 120.50, "completed", "2025-01-03", "First order"),
    (2, 1, 15.00, "pending", "2025-01-07", "Gift wrap"),
    (3, 2, 250.00, "completed", "2025-02-10", "Bulk order"),
..........
]

cur.executemany("INSERT OR REPLACE INTO customers VALUES (?,?,?,?)", customers)
cur.executemany("INSERT OR REPLACE INTO orders VALUES (?,?,?,?,?,?)", orders)
conn.commit()
conn.close()

運行以下命令創建并插入 SQLite 數據:

python sample_db/create_sample_db.py

Schema 的 Embedding 與索引

在 AI 能生成準確 SQL 之前,它需要“理解”數據庫結構——有哪些表、每個表包含哪些列,以及數據大概長什么樣。

這就是 Embeddings(向量嵌入)與語義索引發揮作用的地方。

什么是 Embeddings?

Embedding 是文本的數字表示(向量),位于高維空間,能捕捉語義。 例如:

“customer name” 和 “client full name” 這兩句話的向量會在該空間中彼此靠近,因為它們表達的含義大致相同。

通過為每個“表名”“列名”甚至“樣本數據”建立向量表示,模型就能在用戶提問時檢索到相關上下文——例如,“顯示最近注冊”會在語義上匹配 ??signup_date???、??created_at??? 或 ??join_date?? 這樣的列。

我們將用到的工具

  • Hugging Face Embeddings:把文本轉為向量。
  • Chroma Vector Store:高效存儲與檢索這些向量。
  • SQLite:作為被索引的真實數據庫。

以下是索引腳本代碼 ??

# ? Initialize HuggingFace embedding model
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)

# ? Create / Load Chroma vector store
vectorstore = Chroma(
    collection_name="sqlite_docs",
    persist_directory=CHROMA_DIR,
    embedding_functinotallow=embeddings
)

def row_hash(values):
    """Generate unique hash for a row."""
    return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()

def row_to_text(table, cols, row):
    """Convert SQLite row into a readable text chunk."""
    return f"Table: {table}\n" + "\n".join([f"{c}: {v}" for c, v in zip(cols, row)])

def index_table(conn, table):
    """Index a single table into the vector store."""
    cur = conn.cursor()
    cur.execute(f"PRAGMA table_info({table});")
    cols = [c[1] for c in cur.fetchall()]
    cur.execute(f"SELECT {', '.join(cols)} FROM {table}")
    rows = cur.fetchall()

    docs, ids, metas = [], [], []
    for r in rows:
        txt = row_to_text(table, cols, r)
        pk = str(r[0])
        hid = row_hash(r)
        ids.append(f"{table}:{pk}")
        docs.append(txt)
        metas.append({"table": table, "pk": pk, "hash": hid})

    # Add to Chroma vector store
    vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)

def main():
    """Main indexing pipeline."""
    conn = sqlite3.connect(SQLITE_PATH)
    cur = conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
    tables = [t[0] for t in cur.fetchall()]

    for t in tqdm(tables, desc="Indexing tables"):
        index_table(conn, t)

    conn.close()
    print("Indexing complete and persisted in Chroma.")

生成 embeddings 并保存到向量數據庫:

python ingestion/index_sqlite.py

步驟說明

  1. 提取 schema 和數據
    對每張表抓取列名和若干行樣本數據。每一行都會被組織成描述該表內容的小“文檔”。
  2. 文本轉向量
    使用 ???HuggingFaceEmbeddings???,將每個文本片段(如 ??"Table: customers\nname: John\nsignup_date: 2025-09-10"??)轉換為數值向量。
  3. 存入 Chroma
    把這些 embeddings 存入 Chroma 向量存儲,同時保存元數據(如表名、主鍵、hash ID)。
  4. 啟用語義搜索
    當用戶提問時,我們會把問題嵌入到同一向量空間,用 Chroma 檢索“最相關”的 schema 或數據片段。

檢索給 LLM 精準的上下文,幫助其寫出準確、可靠的 SQL。

為什么索引很重要

沒有 embeddings,模型就是“摸黑前行”——它不知道有哪些列、各自數據類型是什么。 通過對 schema 做 embedding 與索引,你為模型建立了一套“語義記憶”,它能在查詢時動態參考,從而始終生成有效且具備上下文意識的 SQL。

SQLite Database
   ↓
Extract Tables + Columns
   ↓
Convert to Embeddings (Hugging Face)
   ↓
Store in Chroma Vector DB
   ↓
Semantic Search During Query Time

RAG 查詢流程

我們已經構建了 schema 索引。現在開始讓它發揮作用。 RAG 查詢流程就是“魔法”發生的地方——系統接收用戶問題、檢索相關 schema 上下文、生成 SQL、在 SQLite 上執行,并返回結果。

逐步拆解如下:

第 1 步:檢索相關 schema

當用戶提問——比如:

“顯示上個月加入的所有客戶”

系統會把這個問題嵌入到與你的數據庫 schema 相同的向量空間(感謝之前創建的 embeddings)。利用 Chroma 的 retriever,它會找到最相關的 schema 片段(例如與 ??customers???、??signup_date?? 相關的表與列)。

async def retriever_node(state: RAGState) -> RAGState:
    docs = await retriever.ainvoke(state["question"])
    state["retrieved_docs"] = [d.page_content for d in docs]
    return state

由此,模型具備了對真實 schema 的“認知”——一張數據庫“備忘單”。

第 2 步:通過 LLM 生成 SQL

接下來,我們把問題和檢索到的 schema 一并交給 LLM(如 OpenAI、Mistral 或 Gemini)。模型使用專門的 prompt,只生成一個有效的只讀 ??SELECT?? 查詢。

sql_prompt = PromptTemplate.from_template("""
You are a SQL generator. Based on the following context, generate a SINGLE READ-ONLY SQLite SELECT query (no semicolons, no multiple statements).

Context:
{context}

Question:
{question}

Return only the SQL SELECT statement.
""")

LLM 的輸出會被清洗并校驗,確保它只輸出 SQL 語句本身:

async def sql_generator_node(state: RAGState) -> RAGState:
    context = "\n\n".join(state.get("retrieved_docs", []))
    prompt_text = sql_prompt.format(cnotallow=context, questinotallow=state["question"])
    out = await llm.ainvoke(prompt_text)

    out = str(getattr(out, "content", out)).strip()
    out = re.sub(r"```(?:sql)?\n?", "", out, flags=re.I).replace("```", "").strip()

    match = re.search(r"(select\b.*)", out, flags=re.I | re.DOTALL)
    if match:
        out = match.group(1).rstrip(";").strip()

    state["generated_sql"] = out
    return state

到此,你的聊天機器人已經能實現“英文進、SQL 出”。

第 3 步:校驗 SQL(安全第一)

在執行前我們會驗證查詢,保護數據庫并確保正確性。

我們使用 ??sqlglot?? 來:

  • 拒絕任何非 ??SELECT??? 語句(禁止 ??DELETE???、??UPDATE?? 等)。
  • 檢查只引用允許的表。
  • 安全解析語法。

def validate_sql(sql: str, allowed_tables: Set[str]) -> Tuple[bool, str]:
    if ";" in sql:
        return False, "semicolon or multiple statements not allowed"
    for kw in DISALLOWED:
        if f" {kw} " in f" {sql.lower()} ":
            return False, f"disallowed keyword: {kw}"

    try:
        parsed = sqlglot.parse_one(sql, read="sqlite")
    except Exception as e:
        return False, f"sql parse error: {e}"

    if parsed.key.lower() not in ALLOWED_STATEMENTS:
        return False, "only SELECT statements allowed"

    tables = extract_tables(sql)
    if not tables.issubset(allowed_tables):
        return False, f"disallowed tables used: {tables - allowed_tables}"
    return True, "ok"

這一層讓系統保持“只讀且安全”。

第 4 步:在 SQLite 上執行

通過校驗后,SQL 會在 SQLite 數據庫中執行。我們還加了行數限制等保護,避免重負載查詢。

def execute_sql(sql: str, row_limit: int = 1000, timeout=5.0) -> Tuple[List[str], List[Tuple[Any]]]:
    sql = enforce_limit(sql, row_limit)
    conn = open_ro_conn()
    conn.execute(f"PRAGMA busy_timeout = {int(timeout*1000)};")
    cur = conn.cursor()
    cur.execute(sql)
    cols = [c[0] for c in cur.description] if cur.description else []
    rows = cur.fetchmany(row_limit)
    conn.close()
    return cols, rows

然后把結果封裝成 JSON 返回給用戶。

FastAPI 后端

當 RAG 管道構建并通過測試后,下一步就是把它“暴露成 API”,讓用戶和前端能實時發送問題、獲取 SQL 結果并交互。

FastAPI 正適合此類需求——它快速、類型安全、原生支持異步,非常適合服務 AI 工作流。

為什么選 FastAPI?

FastAPI 提供:

  • 速度:異步 I/O + 自動文檔(Swagger / ReDoc)
  • 易集成:便于與 LangChain、Chroma 或本地模型對接
  • 校驗:Pydantic 確保入參規范
  • 可擴展:可部署到 Docker、Serverless 或本地私有化

簡言之:它是對你的 RAG Text-to-SQL 邏輯的理想封裝器。

API 設計概述

我們暴露一個端點:

POST /query

請求體:

{
  "question": "Show me all customers who joined last month",
  "show_sql": true
}

響應示例:

{
  "sql": "SELECT * FROM customers WHERE join_date >= '2025-09-01'",
  "cols": ["id", "name", "join_date"],
  "rows": [
    {"id": 1, "name": "Alice", "join_date": "2025-09-05"},
    {"id": 2, "name": "Bob", "join_date": "2025-09-12"}
  ]
}

完整代碼:

import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv

# import the core RAG components
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
from server.utils import allowed_tables_from_db

# Load environment variables
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
ALLOWED_TABLES = allowed_tables_from_db(SQLITE_PATH)

app = FastAPI(title="RAG Text->SQL API")

class QueryRequest(BaseModel):
    question: str
    show_sql: bool = True

@app.post("/query")
async def query(req: QueryRequest):
    # Initialize conversation state
    state = {"question": req.question, "messages": []}

    # 1?? Retrieve relevant schema info
    state = await retriever_node(state)

    # 2?? Generate SQL query using LLM
    state = await sql_generator_node(state)
    sql = state["generated_sql"]

    # 3?? Validate SQL
    ok, reason = validate_sql(sql, ALLOWED_TABLES)
    if not ok:
        raise HTTPException(status_code=400, detail=f"SQL validation failed: {reason}. SQL: {sql}")

    # 4?? Execute SQL safely
    cols, rows = execute_sql(sql)
    result = [dict(zip(cols, r)) for r in rows]

    # 5?? Return JSON response
    return {
        "sql": sql if req.show_sql else None,
        "cols": cols,
        "rows": result
    }

流程分解

  1. 接收問題
    用戶通過 POST 發送自然語言問題。
  2. 檢索 schema 上下文
    ???retriever_node?? 從 Chroma 拉取最相關的表與列。
  3. 生成 SQL
    ???sql_generator_node??? 使用檢索上下文和問題,生成 ??SELECT?? 語句。
  4. 安全校驗
    ???validate_sql()?? 檢查查詢格式,并確保只訪問允許的表。
  5. 執行并返回
    ???execute_sql()?? 在 SQLite(只讀模式)上執行,并返回格式化的結果。

為什么這種設計有效

  • 模塊化:檢索、生成、校驗、執行都可替換或擴展。
  • 安全:只允許 ??SELECT??,并對表做白名單控制。
  • 異步:整條管道可并發運行,易于隨用戶負載擴展。
  • 易部署:Docker 打包,上云(AWS/GCP)或 Hugging Face Spaces 都很快。

[User / Streamlit UI]
        ↓ (POST /query)
    FastAPI Backend
  ├── retriever_node()  →  Chroma Vector DB
  ├── sql_generator_node()  →  LLM (OpenAI / Gemini / Mistral)
  ├── validate_sql()  →  SQLGlot Parser
  └── execute_sql()  →  SQLite Database
        ↓
     JSON Response

這樣的模塊化架構讓你的后端健壯、透明、且具備生產就緒度。

Streamlit 前端

FastAPI 后端跑起來后,我們再把它做成“可交互”的應用。 目標:構建一個簡單的網頁界面,讓任何人都能輸入自然語言問題、查看生成的 SQL,并實時看到查詢結果——幾秒鐘之內完成閉環。

完整代碼:

import streamlit as st
import requests

API_URL = "http://localhost:8000/query"

st.set_page_config(page_title="RAG Text→SQL Demo", layout="centered")
st.title("RAG Text → SQL (SQLite replica)")

with st.form("query_form"):
    question = st.text_input(
        "Ask a natural language question (about the DB)",
        value="Show total orders per customer"
    )
    show_sql = st.checkbox("Show generated SQL", value=True)
    submitted = st.form_submit_button("Submit")

if submitted:
    # Ensure types are correct
    question = str(question).strip()
    show_sql = bool(show_sql)

    if not question:
        st.warning("Please enter a question.")
    else:
        payload = {"question": question, "show_sql": show_sql}

        with st.spinner("Querying..."):
            try:
                resp = requests.post(API_URL, jsnotallow=payload, timeout=60)
                resp.raise_for_status()
                data = resp.json()

                if show_sql:
                    st.subheader("?? Generated SQL")
                    st.code(data.get("sql", ""), language="sql")

                st.subheader("??  Query Results")
                rows = data.get("rows", [])
                if rows:
                    st.dataframe(rows)
                else:
                    st.info("No rows returned for this query.")

            except requests.exceptions.HTTPError as http_err:
                st.error(f"HTTP error occurred: {http_err} - {resp.text}")
            except requests.exceptions.ConnectionError:
                st.error("Could not connect to the API. Make sure FastAPI is running on localhost:8000.")
            except requests.exceptions.Timeout:
                st.error("Request timed out. Try again later.")
            except Exception as e:
                st.error(f"Unexpected error: {e}")

工作原理

  1. 用戶輸入
    用戶在 Streamlit 文本框中輸入問題。
  2. 表單提交
    Streamlit 把問題(以及 ???show_sql??? 標志)作為 JSON 發送到 FastAPI 的 ??/query?? 端點。
  3. 后端處理
    FastAPI 運行完整的 RAG 管道:
  • 檢索相關 schema
  • 用 LLM 生成 SQL
  • 在 SQLite 上校驗并執行
  1. 結果展示
    Streamlit 接收 JSON 響應,并:
  • 顯示生成的 SQL(當 ??show_sql=True??)
  • 用交互式表格展示結果行

+----------------------------------------------------------+
|  ?? RAG Text → SQL (SQLite Replica)                      |
|  ------------------------------------------------------  |
|  Ask a natural language question:                        |
|  [ Show total orders per customer             ] [Submit] |
|                                                          |
|  ?? Generated SQL                                         |
|  SELECT customer_id, COUNT(*) AS total_orders             |
|  FROM orders GROUP BY customer_id                         |
|                                                           |
|  ??  Query Results                                         |
|  ┌──────────────┬───────────────┐                         |
|  │ customer_id  │ total_orders  │                         |
|  ├──────────────┼───────────────┤                         |
|  │ 1            │ 12            │                         |
|  │ 2            │ 8             │                         |
|  └──────────────┴───────────────┘                         |
+----------------------------------------------------------+

這給用戶帶來一個“即時反饋”閉環——從自然語言到 SQL,再到數據可視化。

運行應用:

uvicorn main:app --reload
streamlit run app.py

演示:

問題:Show total orders per customer

使用 RAG、LangChain、FastAPI 與 Streamlit 構建 Text-to-SQL 聊天機器人-AI.x社區

問題:Total revenue from completed orders

使用 RAG、LangChain、FastAPI 與 Streamlit 構建 Text-to-SQL 聊天機器人-AI.x社區

下一步:增量 Embeddings 與可擴展更新

此時你已經構建了一個完整的 RAG 驅動的 Text-to-SQL 管道——從 schema 的 embedding 和檢索,到 SQL 生成、校驗和執行。 但在真實世界里,數據庫會持續演變。表會變化、新記錄會插入、列定義也會隨時間調整。

那么,如何保持 embedding 索引(從而保持 RAG 系統)是最新的呢?

下面是把項目推進到生產環境的一些方向:

1. 處理動態數據庫

在原型中,我們在啟動時只索引了一次數據庫。 這對靜態數據集很好用,但生產系統需要在以下情況進行“增量 embedding 更新”:

  • 表結構變化(新增/刪除列)。
  • 新增行并對語義有實質性影響。
  • schema 文檔或元數據發生變化。

與其全量重建索引,不如“只嵌入變化的部分”。

增量索引策略

  • 用時間戳或行 hash 跟蹤更新。
  • 與 Chroma 中已存的元數據(如 hash ID)比較。
  • 僅對修改的行或新增的表做重新 embedding。
  • 使用 Chroma 的增量 ??add_texts()?? API 持久化更新。

這種方式最小化成本、時間與冗余,面對千萬級數據時尤為關鍵。

2. 用后臺任務自動化

重新索引或更新 embeddings 不應阻塞用戶查詢。 你可以把這些操作交給后臺 worker:

  • Celery(配合 Redis/RabbitMQ):分布式任務調度。
  • FastAPI BackgroundTasks:輕量異步更新。

使用 FastAPI 內置后臺任務的示例:

from fastapi import BackgroundTasks

@app.post("/update_embeddings")
async def update_embeddings(background_tasks: BackgroundTasks):
    background_tasks.add_task(reindex_changed_tables)
    return {"status": "update scheduled"}

這樣,你的主 API(??/query??)可以保持響應靈敏,而索引在后臺更新。

3. 未來增強方向

  • 基于公司查詢日志微調 SQL 生成器。
  • 為常見問題添加緩存。
  • 在 Streamlit 中集成可視化圖表,動態展示結果。
  • 面向大規模數據集,切換到更大的向量數據庫(如 Pinecone 或 Qdrant)。
  • 添加反饋回路:允許用戶糾正 SQL,并據此改進模型。

這些改進將把你的系統推向“能自我學習”的數據助理,更懂你的演進中的 schema 與業務邏輯。

關鍵收獲

你已經構建了一個強大的系統——一個基于 RAG 的完整 Text-to-SQL 方案,把純英文轉化為可操作的數據庫洞察。回顧你完成的內容:

1. 掌握了用于 Text-to-SQL 的 RAG

你學會了如何用 RAG 彌合自然語言與結構化數據庫 schema 之間的差距,從而顯著提升準確性并降低 SQL 生成的幻覺。

  • 嵌入數據庫 schema 與樣本行,提供上下文。
  • 在生成查詢前先檢索最相關片段。
  • 把檢索與 LLM 推理結合,獲得可靠的 SQL 輸出。

2. 構建了模塊化、可擴展的架構

你使用 LangChain 與 FastAPI,把系統搭成了一個具備生產級結構的 AI 服務:

  • SQLite 作為結構化數據源。
  • Hugging Face Embeddings + Chroma 做語義檢索。
  • LangChain 編排 retriever、prompt 與模型。
  • FastAPI 提供輕量、異步的后端。
  • Streamlit 提供簡潔、友好的前端界面。

各層都模塊化——可替換模型或數據庫,而不破壞其他環節。

3. 部署了交互式聊天界面

你用 Streamlit 把所有能力封裝起來,讓用戶可以:

  • 直接輸入自然語言問題。
  • 查看生成的 SQL。
  • 即時在動態表格中查看結果。

你把原本屬于“開發者專屬”的工作(寫 SQL),變成了“與數據的自然對話”。

最后的想法

這個項目體現了我們與數據交互方式的一次強大轉變:AI 正在讓數據庫訪問民主化,讓任何人都能在不懂 SQL 的情況下提出復雜問題。

通過把檢索增強生成與現代 embeddings 和對話界面結合,我們正在打造把數據直接交到人們手中的工具——讓洞察更快、更容易、更觸手可及。

歡迎交流你的反饋、想法,以及你構建 Text-to-SQL 系統的經驗。

在 GitHub 查看完整代碼并開始動手嘗試:???https://github.com/dharampatel/ConvertQueryToSQL/tree/master??

本文轉載自??AI大模型觀察站??,作者:AI研究生

已于2025-11-5 07:37:22修改
收藏
回復
舉報
回復
相關推薦
欧美视频一二三| av日韩在线网站| 久久亚洲精品国产亚洲老地址| 国产传媒免费观看| av老司机免费在线| 国产亚洲一本大道中文在线| 91麻豆桃色免费看| 精品欧美一区二区三区免费观看| 久久一区二区三区喷水| 精品国产乱码久久久久久免费| 50路60路老熟妇啪啪| 在线中文字幕视频观看| 久久亚洲欧美国产精品乐播 | 亚洲精品日韩成人| www.国产视频| 蜜臀久久99精品久久久久宅男 | 中文字幕精品三级久久久 | 国自产拍偷拍精品啪啪一区二区| 视频免费一区| 91捆绑美女网站| 成人字幕网zmw| 日韩综合在线观看| 亚洲青涩在线| 久久综合久久美利坚合众国| 自拍偷拍亚洲天堂| 国产福利资源一区| 制服丝袜在线91| 亚欧在线免费观看| 男人天堂视频在线观看| 亚洲综合视频在线观看| 中文字幕日韩精品久久| 国产免费永久在线观看| 97se亚洲国产综合自在线不卡| 亚洲free性xxxx护士hd| 中文字幕第三页| 免费一区视频| 91精品国产精品| 深夜福利影院在线观看| 天天综合亚洲| 色777狠狠综合秋免鲁丝| 六月婷婷七月丁香| 任你弄精品视频免费观看| 日韩精品一区二区三区swag| 美女在线视频一区二区| 欧美国产日韩电影| 在线精品国精品国产尤物884a| 免费在线观看亚洲视频| 96av在线| 精品久久久久久亚洲国产300| 国产1区2区3区中文字幕| 超鹏97在线| 亚洲精品欧美在线| 佐佐木明希av| 色爱综合区网| 亚洲午夜精品久久久久久久久| 国产人妻互换一区二区| av在线免费网站| 亚洲欧美国产三级| 91免费国产精品| 丁香花高清在线观看完整版| 亚洲国产精品久久不卡毛片| 黄页网站大全在线观看| 性xxxxfreexxxxx欧美丶| 福利二区91精品bt7086| 欧美日韩激情视频在线观看| 午夜影院在线观看国产主播| 91久久久免费一区二区| 最新免费av网址| 亚洲日本va中文字幕| 亚洲国产成人精品久久久国产成人一区| 人妻激情偷乱频一区二区三区| 日韩高清一级| 亚洲性生活视频| 国产小视频你懂的| 国内精品嫩模av私拍在线观看| 97视频在线观看成人| 日本久久综合网| 久久99国产精品麻豆| 99精品在线直播| 香港三日本三级少妇66| 国产日韩欧美a| 国产高清免费在线| www.youjizz.com在线| 一本色道久久综合精品竹菊| 九九九九九九九九| 国产91精品入| 一本色道久久88亚洲综合88| 永久免费看mv网站入口| 一区在线免费| 国产精品久久久久久久久粉嫩av| 国产精品久久免费| 91亚洲国产成人精品一区二三| 欧美日韩三区四区| 国产秀色在线www免费观看| 五月综合激情网| 午夜久久福利视频| 老汉色老汉首页av亚洲| 中文字幕在线成人| 久久久久99精品| 免费在线欧美视频| 国语精品免费视频| 思思99re6国产在线播放| 亚洲.国产.中文慕字在线| 男女污污的视频| jizz性欧美23| 日韩中文视频免费在线观看| 国产精品6666| 国产一区二区日韩精品| 日本欧洲国产一区二区| 成人三级小说| 欧美日韩大陆在线| 亚洲国产无码精品| 欧美日本精品| 国产有码一区二区| 国产三级视频在线| 午夜精品久久久久久久| 亚洲精品在线视频播放| 九九久久精品| 97视频在线观看免费| www.黄色片| 国产精品激情偷乱一区二区∴| 午夜肉伦伦影院| 9国产精品午夜| 久热精品在线视频| 在线观看中文字幕码| 26uuuu精品一区二区| 拔插拔插海外华人免费| 日本精品在线一区| 日韩毛片在线看| 国产成人一区二区三区影院在线 | 日本久久精品电影| 亚洲视频在线播放免费| 欧美精品aa| 成人网在线视频| 日本高清中文字幕在线| 欧美视频中文一区二区三区在线观看 | 8av国产精品爽爽ⅴa在线观看 | 国产精品密蕾丝袜| 亚洲免费综合| 精品国产一区二区三区日日嗨| 亚洲区欧洲区| 日韩精品最新网址| 国产小视频在线观看免费| 国产成人精品影视| 欧美日韩dvd| 亚洲2区在线| 欧美另类在线观看| 性生活视频软件| 一区二区三区免费| 稀缺呦国内精品呦| 亚洲精品偷拍| 麻豆亚洲一区| 婷婷午夜社区一区| 在线视频中文亚洲| 在线观看中文字幕码| 亚洲日本护士毛茸茸| 宇都宫紫苑在线播放| 欧美精选在线| 国产精品一区二区三区免费| 成人一级福利| 亚洲午夜色婷婷在线| 这里只有精品999| 国产精品久久久久久久久果冻传媒 | 欧洲一区二区三区精品| 亚洲图片欧洲图片av| 在线观看中文字幕av| 伊人一区二区三区| 免费a v网站| 久久午夜影视| 一区二区在线观看网站| 18国产精品| 91精品国产一区| 国产玉足榨精视频在线观看| 欧美日韩亚洲不卡| 免费毛片在线播放免费| a级精品国产片在线观看| 国语对白做受xxxxx在线中国| 欧美伦理在线视频| 亚洲一区二区三区乱码aⅴ蜜桃女| 色呦呦视频在线观看| 亚洲美女视频网| 亚洲中文字幕在线一区| 亚洲一区欧美一区| 亚欧洲乱码视频| 国产一区999| 久久国产成人精品国产成人亚洲| av中文一区| 96久久精品| 三上悠亚国产精品一区二区三区| 日日噜噜噜夜夜爽亚洲精品| 亚洲精品97久久中文字幕| 色综合亚洲欧洲| 日韩福利小视频| 91麻豆精品一区二区三区| 中文字幕线观看| 亚洲免费中文| 粉嫩av一区二区三区天美传媒 | 色呦呦网站入口| 日本成人中文| 亚洲xxxxx性| 芒果视频成人app| 欧美精品制服第一页| 可以在线观看的av| 日韩精品一区二区三区在线观看 | 91在线超碰| 色噜噜狠狠狠综合曰曰曰88av| 欧美一级性视频| 欧美日本乱大交xxxxx| 国产a∨精品一区二区三区仙踪林| 国产精品二三区| 国产精品无码午夜福利| 国产不卡在线视频| 91精品999| 天堂久久一区二区三区| 国产九九九九九| 亚洲综合小说| 亚洲一区二区四区| 中文字幕中文字幕精品| 国产欧美韩日| 日韩中文字幕无砖| 国产日韩欧美日韩| 黄色精品视频| 日韩av电影中文字幕| a天堂资源在线| 欧美裸身视频免费观看| 麻豆传媒在线免费看| 在线中文字幕日韩| 欧洲一区av| 精品亚洲精品福利线在观看| 内射后入在线观看一区| 欧美变态凌虐bdsm| 国产999久久久| 884aa四虎影成人精品一区| 中文字幕 视频一区| 欧美性大战久久久久久久蜜臀| 亚洲国产精品无码久久久| 精品成人av一区| 日本熟妇乱子伦xxxx| 亚洲国产精品视频| 久久久精品视频免费| 亚洲国产日韩av| 免费观看一级视频| 亚洲成人一二三| 亚洲男人的天堂在线视频| 偷拍亚洲欧洲综合| 欧美一级片免费在线观看| 亚洲国产成人va在线观看天堂| 黄色小说在线观看视频| 一片黄亚洲嫩模| 久久久无码精品亚洲国产| 亚洲国产视频在线| 日韩欧美亚洲视频| 狠狠色狠狠色综合日日小说| 丁香六月婷婷综合| 色噜噜狠狠色综合中国| 国产九色91回来了| 欧美日韩精品系列| 国产剧情精品在线| 欧美成人vr18sexvr| 天堂国产一区二区三区| 亚洲欧美999| www.国产精品.com| 欧美成人中文字幕在线| xxx性欧美| 日本不卡视频在线播放| jvid一区二区三区| 成人免费福利在线| 亚洲一区二区三区中文字幕在线观看| 国产精品对白刺激久久久| 久久精品色播| 欧洲精品国产| 午夜精品一区二区三区国产| 国产肉体ⅹxxx137大胆| 先锋亚洲精品| 日本三级黄色网址| 懂色av中文一区二区三区 | 国产亚洲一区| 免费观看中文字幕| 中国女人久久久| 一区二区三区入口| 狠狠狠色丁香婷婷综合激情| 毛茸茸free性熟hd| 中文av一区二区| 精品人妻在线播放| 欧洲av一区二区嗯嗯嗯啊| www.四虎在线观看| 亚洲无线码在线一区观看| 二区在线播放| 91sao在线观看国产| 天天综合在线观看| 六十路精品视频| 亚洲成人精品| 久久久精品在线视频| 国产一区二区伦理| 亚洲av无码成人精品国产| 亚洲精品老司机| 国产午夜无码视频在线观看| 日韩丝袜情趣美女图片| 国产一二在线观看| 久久久久久国产精品三级玉女聊斋| 日韩成人动漫| 国产伦精品一区| 999久久久精品国产| 久久婷婷五月综合色国产香蕉| 国产在线精品免费| 免费看裸体网站| 精品久久久久久久久国产字幕| 99久久国产免费| 中文字幕九色91在线| 亚洲精品88| 99久久久精品免费观看国产| 日韩精品水蜜桃| 亚洲人成色77777| 不卡电影免费在线播放一区| 看免费黄色录像| 在线观看日产精品| 神马电影在线观看| 久久99热精品| 爱情电影网av一区二区| 色噜噜色狠狠狠狠狠综合色一| 亚洲视频播放| 国产人妖在线观看| 一区二区三区在线观看国产 | 亚洲国产精品久久91精品| www视频在线免费观看| 国产精品永久在线| 国内精品久久久久久久久电影网| 久久久久久久中文| 成人在线一区二区三区| 老湿机69福利| 91麻豆精品国产无毒不卡在线观看| 成人在线免费观看| 日韩av电影在线免费播放| 精品在线91| 国产精品亚洲αv天堂无码| 97久久超碰国产精品电影| 亚州国产精品视频| 日韩大陆毛片av| 擼擼色在线看观看免费| 国产一区二区无遮挡| 在线欧美视频| 性久久久久久久久久久| 精品成人久久av| 日本一区视频| 日本免费久久高清视频| 奇米色欧美一区二区三区| 免费av网址在线| 国产欧美精品日韩区二区麻豆天美| 免费在线不卡av| 日韩网站免费观看高清| 国产精品久一| 中文字幕日韩精品无码内射| 成人一道本在线| 91久久国产视频| 亚洲欧美中文字幕在线一区| 向日葵视频成人app网址| 日韩av高清| 久久精品国产一区二区三| 九九精品视频免费| 欧美精品一区在线观看| 黄色aa久久| 日韩色妇久久av| 激情五月婷婷综合网| 欧美精品乱码视频一二专区| 亚洲国产成人av在线| 国产精品高清乱码在线观看| 亚洲欧美日韩精品久久久| 国产一区二区看久久| 日韩三级一区二区三区| 国产亚洲xxx| 精品国产一区二| 国精产品一区一区三区视频| 国产三级欧美三级| 国产白浆在线观看| 欧美一区第一页| 久久免费大视频| 精品久久久久久无码人妻| 欧美性xxxx极品hd欧美风情| av福利精品| 岛国视频一区免费观看| 久久久久网站| 波多野结衣在线网址| 日韩av在线最新| 自拍偷拍亚洲| 18岁网站在线观看| 日韩美女久久久| 亚洲欧洲国产综合| 成人免费视频网址| 国产精品呻吟| 亚洲最大的黄色网址| 国产婷婷97碰碰久久人人蜜臀| 亚洲成人毛片| 5月婷婷6月丁香| 一区二区三区毛片| 成a人v在线播放| 国产欧美日韩伦理| 国产乱子轮精品视频| 日韩不卡高清视频| 97精品国产97久久久久久春色| 久久精品影视|