原文地址:https://www.timescale.com/blog/rag-is-more-than-just-vector-search/
原文作者:Jason Liu 、Ivan Leo
许多开发人员错误地将 RAG 应用程序视为对矢量数据库进行简单的语义搜索,以增强大型语言模型 (LLM)。然而,现代 AI 应用程序需要一种更分层的方法来解决嵌入模型的限制。
例如,回答“上个月与性能优化相关的 GitHub 上讨论最多的五个问题是什么?”这个问题需要的不仅仅是相似性搜索,例如:
- 基于时间的过滤(上个月)
- 文本搜索(性能优化)
- 聚合和过滤器(讨论最多的五个问题)
- 仅靠嵌入搜索不足以建立良好的 RAG(检索增强生成)系统。
好消息是,使用 PostgreSQL 和Timescale,我们可以在单个数据库中获得向量搜索、时间序列功能以及 SQL 的所有灵活性。通过将 Timescale 与 LLM 相结合,我们可以:
对嵌入执行向量相似性搜索。
使用语言模型来提取和扩充我们现有的数据集。
编写可以跨多个属性连接的有效查询。
利用 Timescale 的现有功能和时间序列数据。
在这篇文章中,我们将做四件事:
- 超越嵌入:我们将讨论 AI 应用程序中各种功能的需求,以及为什么 Timescale 是许多需要数据检索的用例中嵌入和 SQL 的理想混合。
- 结构化提取:我们将探索使用语言模型将数据直接提取到 Timescale 中。
- 评估驱动开发:我们将重点介绍在创建 AI 工具过程的早期开始测试驱动开发是多么容易,并强调关注特定用例的重要性。
- 总而言之:我们将演示如何实现嵌入搜索和文本到 SQL,展示利用 Timescale 的嵌入和 SQL 工具完成任务是多么简单。
为此,我们将以一个示例应用程序为基础进行讨论,该应用程序允许我们回答有关 GitHub 问题的问题。为了开发 Github 问题问答应用程序,我们将利用 PostgreSQL 和 Timescale 并实现并行工具调用和文本到 SQL 功能,以说明 RAG 在矢量搜索领域可以走多远。
开始构建之前:思考用户需求并进行逆向思考
最具影响力的 AI 应用不一定是最复杂和最具代理性的,而是建立在对用户需求的深刻理解之上的应用。在构建新的 AI 应用时,我们的目标应该是先探索简单的工作流程和专用工具,然后再深入研究复杂的推理和规划系统。
通常,在产品开发中,对复杂系统的追求源于不愿深入了解用户需求。通过真正掌握这些需求,您可以创建简单、有效的解决方案,而不是依赖不可靠的复杂代理。这种方法可以避免最终出现令人印象深刻的演示但令客户失望的风险。
当处理新的数据集时,我们应该始终问自己这两个问题:
- 我们可以利用哪些额外的过滤器和索引?
- 我们能否提取任何额外的元数据来简化手头的任务?
数据提取与摄取
对于 GitHub 问题,我们可能会意识到,除了数据集中现有的功能外,我们还希望拥有其他功能。我们可能会关心:
- 标记不同的问题
- 查看问题是否已解决
- 区分功能请求和错误
这些可以是以用户为中心的特征和指标,我们可以利用它们来显著提高系统回答我们关心的问题的能力。
数据处理增加了标准数据集所不具备的功能。最终,这提高了我们回答用户可能提出的更复杂问题的能力。让我们逐步了解如何为 GitHub 问题构建自定义数据管道。
以下是我们在本节中将要进行的工作概述:
- 数据模型:我们将首先创建Pydantic模型来构建原始和处理后的 GitHub 问题数据。
- 使用生成器:我们将展示如何使用生成器来减少遍历整个数据集所需的时间。
- 数据处理:我们将异步地对这些问题进行分类和总结,然后将它们嵌入以供将来参考。
- 存储和索引增强数据:最后,我们将使用 Timescale 最新发布的pgvectorscale扩展pgvector来有效地存储我们处理过的数据,并设置适当的索引以便快速查询和分析。
数据模型
首先,让我们安装必要的依赖项:
pip install instructor openai tqdm pydantic datasets pgvector asyncpg Jinja2 fuzzywuzzy python-Levenshtei
复制
我们将使用Pydantic
模型来确保类型安全。为此,我们将定义两个Pydantic
类:ProcessedIssue
,表示生成的摘要,和GithubIssue
,表示我们将从数据集中提取的原始数据。
from pydantic import BaseModel from typing import Literal, Any, Optional from datetime import datetime class ClassifiedSummary(BaseModel): chain_of_thought: str label: Literal["OPEN", "CLOSED"] summary: str class ProcessedIssue(BaseModel): issue_id: int text: str label: Literal["OPEN", "CLOSED"] repo_name: str embedding: Optional[list[float]] class GithubIssue(BaseModel): issue_id: int metadata: dict[str, Any] text: str repo_name: str start_ts: datetime end_ts: Optional[datetime] embedding: Optional[list[float]]
复制
使用生成器
让我们获取一些GitHub问题来进行处理。我们将使用bigcode/the-stack-github-issues
数据集和datasets库来简化我们的生活。
我们要做的就是:
- 精选仓库:我们将过滤问题,只关注我们关心的仓库。这将允许我们对最终数据集进行更有针对性的数据分析。
- 获取一个可管理的块:我们将使用
take
函数来获取问题的一个子集。这让我们能够处理一个明显更小的数据集切片,从而能够更快地迭代并进行更多实验。
from datasets import load_dataset
def get_issues(n: int, repos: list[str]):
dataset = (
load_dataset("bigcode/the-stack-github-issues", split="train", streaming=True)
.filter(lambda x: x["repo"] in repos)
.take(n)
)
for row in dataset:
start_time = None
end_time = None
for event in row["events"]:
event_type = event["action"]
timestamp = event["datetime"]
timestamp = timestamp.replace("Z", "+00:00")
if event_type == "opened":
start_time = datetime.fromisoformat(timestamp)
elif event_type == "closed":
end_time = datetime.fromisoformat(timestamp)
# Small Fall Back here - Some issues have no Creation event
elif event_type == "created" and not start_time:
start_time = datetime.fromisoformat(timestamp)
elif event_type == "reopened" and not start_time:
start_time = datetime.fromisoformat(timestamp)
yield GithubIssue(
issue_id=row["issue_id"],
metadata={},
text=row["content"],
repo_name=row["repo"],
start_ts=start_time,
end_ts=end_time,
embedding=None,
)
复制
数据处理
我们可以使用 Python 的async
功能和instructor
库来快速并行处理问题。我们无需等待每个任务完成,而是可以同时处理多个问题。
更好的是,为了确保我们保持在合理的速率限制内,我们还可以使用Semaphore
来控制正在执行的并发任务的数量。
from asyncio import run, Semaphore
from tqdm.asyncio import tqdm_asyncio as asyncio
from textwrap import dedent
from instructor import from_openai
from openai import AsyncOpenAI
from jinja2 import Template
async def batch_classify_issue(
batch: list[GithubIssue], max_concurrent_requests: int = 20
) -> list[ProcessedIssue]:
async def classify_issue(issue: GithubIssue, semaphore: Semaphore):
client = from_openai(AsyncOpenAI())
async with semaphore:
classification = await client.chat.completions.create(
response_model=ClassifiedSummary,
messages=[
{
"role": "system",
"content": "You are a helpful assistant that classifies and summarizes GitHub issues. When summarizing the issues, make sure to expand on specific accronyms and add additional explanation where necessary.",
},
{
"role": "user",
"content": Template(
dedent(
"""
Repo Name: {{ repo_name }}
Issue Text: {{ issue_text}}
"""
)
).render(repo_name=issue.repo_name, issue_text=issue.text),
},
],
model="gpt-4o-mini",
)
return ProcessedIssue(
issue_id=issue.issue_id,
repo_name=issue.repo_name,
text=classification.summary,
label=classification.label,
embedding=None,
)
semaphore = Semaphore(max_concurrent_requests)
coros = [classify_issue(item, semaphore) for item in batch]
results = await asyncio.gather(*coros)
return results
复制
我们还将定义一个函数来同时处理我们的嵌入。这些将有助于在后面的部分中使用pgvector
和在不同问题上执行相似性搜索。pgvectorscale
from openai import AsyncOpenAI
async def batch_embeddings(
data: list[ProcessedIssue],
max_concurrent_calls: int = 20,
) -> list[ProcessedIssue]:
oai = AsyncOpenAI()
async def embed_row(
item: ProcessedIssue,
semaphore: Semaphore,
):
async with semaphore:
input_text = item.text if len(item.text) < 8000 else item.text[:6000]
embedding = (
(
await oai.embeddings.create(
input=input_text, model="text-embedding-3-small"
)
)
.data[0]
.embedding
)
item.embedding = embedding
return item
semaphore = Semaphore(max_concurrent_calls)
coros = [embed_row(item, semaphore) for item in data]
results = await asyncio.gather(*coros)
return results
复制
现在我们已经弄清楚了如何大规模处理和嵌入摘要,我们可以将其加载到 Timescale 中。我们正在使用asyncpg
,它将帮助我们使用该函数自动批量插入execute_many
。
存储和索引增强数据
pgvectorscale我们可以使用 Timescale 来本地管理所有嵌入。与我们最新的基准测试相比,pgvector这让我们能够享受高达 28 倍的 p95 延迟、16 倍的查询吞吐量以及 75% 的成本降低。
我们需要做的就是启用pgvectorscale扩展。这将帮助我们设置pgvector和pgvectorscale部署我们的 Timescale 项目。完成后,我们可以为我们的嵌入创建一个表并对其进行索引以获得最佳性能。
在下面的部分中让我们看看如何做到这一点。
import os
from pgvector.asyncpg import register_vector
import asyncpg
init_sql = """
CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;
DROP TABLE IF EXISTS github_issue_summaries CASCADE;
DROP TABLE IF EXISTS github_issues CASCADE;
CREATE TABLE IF NOT EXISTS github_issues (
issue_id INTEGER,
metadata JSONB,
text TEXT,
repo_name TEXT,
start_ts TIMESTAMPTZ NOT NULL,
end_ts TIMESTAMPTZ,
embedding VECTOR(1536) NOT NULL
);
CREATE INDEX github_issue_embedding_idx
ON github_issues
USING diskann (embedding);
-- Create a Hypertable that breaks it down by 1 month intervals
SELECT create_hypertable('github_issues', 'start_ts', chunk_time_interval => INTERVAL '1 month');
CREATE UNIQUE INDEX ON github_issues (issue_id, start_ts);
CREATE TABLE github_issue_summaries (
issue_id INTEGER,
text TEXT,
label issue_label NOT NULL,
repo_name TEXT,
embedding VECTOR(1536) NOT NULL
);
CREATE INDEX github_issue_summaries_embedding_idx
ON github_issue_summaries
USING diskann (embedding);
"""
async def get_conn():
conn = await asyncpg.connect(os.getenv("DB_URL"))
await register_vector(conn)
return conn
conn = await get_conn()
await conn.execute(init_sql)
复制
有了 GitHub 问题和问题摘要表,让我们创建两个函数来用相关信息填充我们的数据库。
import json
async def insert_github_issue_summaries(conn, issues: list[GithubIssue]):
insert_query = """
INSERT INTO github_issue_summaries (issue_id, text, label, embedding,repo_name)
VALUES ($1, $2, $3, $4, $5)
"""
summarized_issues = await batch_classify_issue(issues)
embedded_summaries = await batch_embeddings(summarized_issues)
await conn.executemany(
insert_query,
[
(item.issue_id, item.text, item.label, item.embedding, item.repo_name)
for item in embedded_summaries
],
)
print("GitHub issue summaries inserted successfully.")
async def insert_github_issues(conn, issues: list[GithubIssue]):
insert_query = """
INSERT INTO github_issues (issue_id, metadata, text, repo_name, start_ts, end_ts, embedding)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"""
embedded_issues = await batch_embeddings(issues)
await conn.executemany(
insert_query,
[
(
item.issue_id,
json.dumps(item.metadata),
item.text,
item.repo_name,
item.start_ts,
item.end_ts,
item.embedding,
)
for item in embedded_issues
],
)
print("GitHub issues inserted successfully.")
复制
我们可以将以前的函数合并为一个process_issues
函数,将 GitHub 问题数据导入我们的数据库:
async def process_issues(): repos = [ "rust-lang/rust", "kubernetes/kubernetes", "apache/spark", ] conn = await get_conn() issues = list(get_issues(100,repos)) await insert_github_issues(conn,issues) await insert_github_issue_summaries(conn,issues) await process_issues()
复制
我们现在已经创建了一个强大的管道,可以处理我们的 GitHub 问题数据以提取有价值的见解。考虑到这一点,让我们将重点转移到使用评估驱动开发来开发满足客户需求的专用工具。
评估驱动开发
在我们开发可能用于构建 RAG 应用程序的表和索引时,我们还可以参与评估驱动的开发,并在实现特定工具之前测试我们的语言模型选择正确工具的能力。
在这里,我们可以非常有创造力地表达我们想要赋予语言模型的工具。
在 Python 中,Pydantic模式非常适合原型代理工具,因为它们为代理的操作创建了明确的契约。此契约使得在实施之前评估更复杂工具的性能和影响变得容易。
让我们探索如何使用来实现这一点instructor,其中我们有一个带有三个工具的代理,如下所示。
- 原始 SQL 查询生成
- 获取原始采集的问题
- 获取问题摘要
以下是如何使用Pydantic模型定义这些工具:
from pydantic import BaseModel, Field
class SearchIssues(BaseModel):
"""
Use this when the user wants to get original issue information from the database
"""
query: Optional[str]
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
class RunSQLReturnPandas(BaseModel):
"""
Use this function when the user wants to do time-series analysis or data analysis and we don't have a tool that can supply the necessary information
"""
query: str = Field(description="Description of user's query")
repos: list[str] = Field(
description="the repos to run the query on, should be in the format of 'owner/repo'"
)
class SearchSummaries(BaseModel):
"""
This function retrieves summarized information about GitHub issues that match/are similar to a specific query, It's particularly useful for obtaining a quick snapshot of issue trends or patterns within a project.
"""
query: Optional[str] = Field(description="Relevant user query if any")
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
复制
我们可以使用下面的实现来测试模型为查询选择适当工具的能力instructor
。
from typing import Iterable, Union
def one_step_agent(question: str):
import instructor
import openai
client = instructor.from_openai(
openai.OpenAI(), mode=instructor.Mode.PARALLEL_TOOLS
)
return client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "You are an AI assistant that helps users query and analyze GitHub issues stored in a PostgreSQL database. Search for summaries when the user wants to understand the trends or patterns within a project. Otherwise just get the issues and return them. Only resort to SQL queries if the other tools are not able to answer the user's query.",
},
{"role": "user", "content": question},
],
response_model=Iterable[
Union[
RunSQLReturnPandas,
SearchIssues,
SearchSummaries,
]
],
)
复制
测试我们的代理
使用以下循环测试此代理很简单。
由于我们预计代理只会针对这些简单查询调用一个工具,因此我们可以验证其识别和选择适合每个任务的工具的能力。随着我们的测试套件扩展,我们可能需要过渡到异步客户端以提高效率。
tests = [
[
"What is the average time to first response for issues in the azure repository over the last 6 months? Has this metric improved or worsened?",
[RunSQLReturnPandas],
],
[
"How many issues mentioned issues with Cohere in the 'vercel/next.js' repository in the last 6 months?",
[SearchIssues],
],
[
"What were some of the big features that were implemented in the last 4 months for the scipy repo that addressed some previously open issues?",
[SearchSummaries],
],
]
for query, expected_result in tests:
response = one_step_agent(query)
for expected_call, agent_call in zip(expected_result, response):
assert isinstance(agent_call, expected_call)
复制
通过这样做,我们可以为模型选择构建类似单元的测试用例,推动基于评估的测试和单元测试之间的直接并行——所有这些都无需联合测试实现。
实现嵌入搜索
现在,让我们换个角度看看如何对我们导入数据库的 GitHub 问题执行嵌入搜索。
pgvector
一旦我们使用 PostgreSQL 中的和扩展获得了嵌入,它就只需编写一个 SQL 查询就很简单了pgvectorscale
。
我们将通过实现一种方法来实现这一点,execute
该方法在每个搜索工具上使用该asyncpg
库,当提供用户查询时,它将返回相关搜索条目的列表。
from pydantic import BaseModel, Field
from typing import Optional
from openai import OpenAI
from jinja2 import Template
from asyncpg import Connection
class SearchIssues(BaseModel):
"""
Use this when the user wants to get original issue information from the database
"""
query: Optional[str]
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
async def execute(self, conn: Connection, limit: int):
if self.query:
embedding = (
OpenAI()
.embeddings.create(input=self.query, model="text-embedding-3-small")
.data[0]
.embedding
)
args = [self.repo, limit, embedding]
else:
args = [self.repo, limit]
embedding = None
sql_query = Template(
"""
SELECT *
FROM {{ table_name }}
WHERE repo_name = $1
{%- if embedding is not none %}
ORDER BY embedding <=> $3
{%- endif %}
LIMIT $2
"""
).render(table_name="github_issues", embedding=embedding)
return await conn.fetch(sql_query, *args)
class RunSQLReturnPandas(BaseModel):
"""
Use this function when the user wants to do time-series analysis or data analysis and we don't have a tool that can supply the necessary information
"""
query: str = Field(description="Description of user's query")
repos: list[str] = Field(
description="the repos to run the query on, should be in the format of 'owner/repo'"
)
async def execute(self, conn: Connection, limit: int):
pass
class SearchSummaries(BaseModel):
"""
This function retrieves summarized information about GitHub issues that match/are similar to a specific query, It's particularly useful for obtaining a quick snapshot of issue trends or patterns within a project.
"""
query: Optional[str] = Field(description="Relevant user query if any")
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
async def execute(self, conn: Connection, limit: int):
if self.query:
embedding = (
OpenAI()
.embeddings.create(input=self.query, model="text-embedding-3-small")
.data[0]
.embedding
)
args = [self.repo, limit, embedding]
else:
args = [self.repo, limit]
embedding = None
sql_query = Template(
"""
SELECT *
FROM {{ table_name }}
WHERE repo_name = $1
{%- if embedding is not none %}
ORDER BY embedding <=> $3
{%- endif %}
LIMIT $2
"""
).render(table_name="github_issue_summaries", embedding=embedding)
return await conn.fetch(sql_query, *args)
复制
然后,我们可以通过运行以下代码片段来验证我们的嵌入搜索是否有效。
query = "What are the main problems people are facing with installation with Kubernetes"
conn = await get_conn()
limit = 10
resp = await SearchSummaries(query=query, repo="kubernetes/kubernetes").execute(
conn, limit
)
for row in resp[:3]:
print(row["text"])
复制
输出
Discussion on the need for better release processes and documentation within the Kubernetes project, with a strong emphasis on improving the user experience for setting up the software on Ubuntu. Users express frustrations over a confusing setup process, the necessity for synchronized documentation updates with code changes, and propose structured solutions for improving documentation efficacy. The issue involved failures in creating a Kubernetes pod sandbox due to the Calico Network Plugin not functioning correctly. The user described encountering multiple error messages regarding pod creation and provided environmental details, including Kubernetes version and CentOS configuration. Suggestions from other users included downgrading Calico or updating the network plugin. The issue was marked as resolved after the user confirmed that using Calico version 2.6 was successful. User reported an issue with the 'kubectl top' command failing due to an unmarshalling error after creating a Kubernetes cluster with kubeadm and deploying Heapster. The error was resolved by removing the HTTP proxy settings from the kube-apiserver configuration, leading to questions about the command's requirements.
复制
就像这样,我们将结果过滤到了特定的存储库,同时仍然利用了嵌入搜索的功能。
处理拼写错误
但是,如果用户在提示中出现拼写错误(例如,kubrntes
而不是)kubernetes/kubernetes
,或者没有拼出整个存储库,会发生什么情况?
fuzzywuzzy
我们可以通过使用库中的以下函数进行字符串匹配来解决这个问题。
from fuzzywuzzy import process def find_closest_repo(query: str, repos: list[str]) -> str | None: if not query: return None best_match = process.extractOne(query, repos) return best_match[0] if best_match[1] >= 80 else None
复制
我们可以通过下面的几个单元测试来验证其是否有效。
repos = [ "rust-lang/rust", "kubernetes/kubernetes", "apache/spark", "golang/go", "tensorflow/tensorflow", "MicrosoftDocs/azure-docs", "pytorch/pytorch", "Microsoft/TypeScript", "python/cpython", "facebook/react", "django/django", "rails/rails", "bitcoin/bitcoin", "nodejs/node", "ocaml/opam-repository", "apache/airflow", "scipy/scipy", "vercel/next.js", ] test = [ ["kuberntes", "kubernetes/kubernetes"], ["next.js", "vercel/next.js"], ["scipy", "scipy/scipy"], ["", None], ["fakerepo", None], ] for query, expected in test: assert find_closest_repo(query, repos) == expected
复制
然后我们可以修改原来的工具来使用这个新find_closest_repo
功能。
from pydantic import BaseModel, Field
from typing import Optional
from openai import OpenAI
from jinja2 import Template
from asyncpg import Connection
class SearchIssues(BaseModel):
"""
Use this when the user wants to get original issue information from the database
"""
query: Optional[str]
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
@field_validator("repo")
def validate_repo(cls, v: str, info: ValidationInfo):
matched_repo = find_closest_repo(v, info.context["repos"])
if matched_repo is None:
raise ValueError(
f"Unable to match repo {v} to a list of known repos of {info.context['repos']}"
)
return matched_repo
async def execute(self, conn: Connection, limit: int):
if self.query:
embedding = (
OpenAI()
.embeddings.create(input=self.query, model="text-embedding-3-small")
.data[0]
.embedding
)
args = [self.repo, limit, embedding]
else:
args = [self.repo, limit]
embedding = None
sql_query = Template(
"""
SELECT *
FROM {{ table_name }}
WHERE repo_name = $1
{%- if embedding is not none %}
ORDER BY embedding <=> $3
{%- endif %}
LIMIT $2
"""
).render(table_name="github_issues", embedding=embedding)
return await conn.fetch(sql_query, *args)
class RunSQLReturnPandas(BaseModel):
"""
Use this function when the user wants to do time-series analysis or data analysis and we don't have a tool that can supply the necessary information
"""
query: str = Field(description="Description of user's query")
repos: list[str] = Field(
description="the repos to run the query on, should be in the format of 'owner/repo'"
)
async def execute(self, conn: Connection, limit: int):
pass
class SearchSummaries(BaseModel):
"""
This function retrieves summarized information about GitHub issues that match/are similar to a specific query, It's particularly useful for obtaining a quick snapshot of issue trends or patterns within a project.
"""
query: Optional[str] = Field(description="Relevant user query if any")
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
@field_validator("repo")
def validate_repo(cls, v: str, info: ValidationInfo):
matched_repo = find_closest_repo(v, info.context["repos"])
if matched_repo is None:
raise ValueError(
f"Unable to match repo {v} to a list of known repos of {info.context['repos']}"
)
return matched_repo
async def execute(self, conn: Connection, limit: int):
if self.query:
embedding = (
OpenAI()
.embeddings.create(input=self.query, model="text-embedding-3-small")
.data[0]
.embedding
)
args = [self.repo, limit, embedding]
else:
args = [self.repo, limit]
embedding = None
sql_query = Template(
"""
SELECT *
FROM {{ table_name }}
WHERE repo_name = $1
{%- if embedding is not none %}
ORDER BY embedding <=> $3
{%- endif %}
LIMIT $2
"""
).render(table_name="github_issue_summaries", embedding=embedding)
return await conn.fetch(sql_query, *args)
复制
然后通过在execute
调用上运行我们的原始函数来验证其是否有效SearchSummary
,如下所示,我们传入了一个拼写错误的存储库名称kuberntes
。
repos = [
"rust-lang/rust",
"kubernetes/kubernetes",
"apache/spark",
"golang/go",
"tensorflow/tensorflow",
]
query = (
"What are the main problems people are facing with installation with Kubernetes"
)
conn = await get_conn()
limit = 10
resp = await SearchSummaries.model_validate_json(
json.dumps({"query": query, "repo": "kuberntes"}),
context={"repos": repos},
).execute(conn, limit)
for row in resp[:3]:
print(row["text"])
await conn.close()
复制
简而言之,通过利用 SQL 进行嵌入搜索,您可以轻松地将问题查询与元数据过滤器和复杂连接相结合,从而显著提高搜索的相关性和速度。
这种方法可以让您从大量 GitHub 数据中快速提取有意义的见解,从而简化问题管理和决策流程。
实现文本到 SQL
我们的应用程序的最后一步还涉及构建一个文本到 SQL(Text2SQL)工具,作为更复杂查询的综合工具。
开发有效的 Text2SQL 代理对于将自然语言查询转换为精确的数据库操作至关重要。
在本节中,我们将通过查看为 TimescaleDB 特定查询生成开发的提示来回顾开发这些代理的一些技巧。
使用详细提示
虽然简洁的提示看起来很有吸引力,但我们发现详细的提示是解锁准确 SQL 生成的关键。原因如下:
- 丰富的背景:详细的提示为您的 AI 模型提供了全面的背景,确保它掌握特定数据库模式和要求的细微差别。
- 领域特定知识:通过包含有关数据库结构、表关系和 TimescaleDB 特定函数(如 time_bucket)的详细信息,您可以为模型配备必要的领域知识。
- 明确的界限:明确的指示和约束为模型创建了一个框架,防止常见的陷阱并确保遵守最佳实践。
请考虑一下我们的文本到 SQL 提示中的这段摘录:
You are a SQL expert tasked with writing queries including a time attribute for the relevant table. The user wants to execute a query for the following repos: {self.repos} to answer the query of {self.user_query_summary}.
Key guidelines:
- Use the `repo_name` column for repository filtering.
- Employ the `time_bucket` function for time-based partitioning when specified.
- The `metadata` field is currently empty, so do not use it.
- Use the `issue_label` column in `github_issue_summaries` to determine issue status.
-- Database Information Below--
复制
这些指南有助于防止我们在使用此提示测试代理时看到的一些故障模式。
- 不存在的元数据字段:如果您有有用的元数据信息,则应指出。否则,请确保明确告知模型不要使用元数据进行过滤。
- 幻觉字段:通过提示模型查看github_issue_summaries表格以确定问题的状态,我们帮助防止了模型幻觉issue_label表格上字段的其他情况github_issue。当我们要求它计算诸如已关闭或未解决的问题数量之类的事情时,这种情况经常发生。
- 创建自定义函数:TimescaleDB 的time_bucket功能对于获取任意周期非常有用,应该用于自定义的手动 PostgreSQL 函数。明确提供time_bucket在指定间隔时使用该函数进行分区的指令有助于防止潜在的错误实现。
提供详细的架构
如果没有关于每列应该代表什么的详细而简洁的摘要,语言模型可能很难直观地了解每列或每表应该如何使用。
有了丰富的架构描述,模型可以在构建查询时做出更明智的决策。让我们从上面的提示中得出以下要点;知道github_issue_summaries表包含label类型的列issue_label允许模型将其用于基于状态的查询:
Use the `issue_label` column in `github_issue_summaries` to determine issue status.
复制
现在所有东西都已连接好,我们有无数的选项可供选择。
一个简单的第一步是生成一些 SQL 查询,执行它,然后使用返回的数据来回答用户的查询。
另一个简单的第一步是生成 SQL 查询,执行它,然后使用从我们的数据库返回的数据来生成对用户查询的响应。由于对象Pydantic
只是代码,因此测试单个函数变得非常容易。我们可以将这些原语用作 LLM 的一部分或作为另一个开发人员使用。
我们可能希望将这些 AI 工具用作数据分析师工作流程的一部分。我们可以将它们放在 Jupyter 笔记本中,返回 pandas 对象,然后继续进行数据分析。我们甚至可以考虑构建一个缓存层来解析查询并在以后重复使用它!这些现在是人类和 AI 系统可以互换使用的工具!
汇总整理
通过使用 OpenAI 的并行工具调用 API,我们可以公开一套文本到 SQL 工具及其语义搜索工具,以构建基于 GitHub 问题的问答系统。
通过很好地分离关注点,我们可以在原型设计过程中分别评估工具选择和实现。
让我们通过让模型快速总结用户在kubernetes/kubernetes安装过程中遇到的问题,看看这在实践中是如何实现的。为此,我们将定义一个函数,该函数可以在执行模型所选工具时总结检索到的结果。
我们将使用instructor并从数据库中检索到的相关问题的文本块来实现这一点。
import instructor from pydantic import BaseModel from asyncpg import Record from typing import Optional from jinja2 import Template from openai import OpenAI class Summary(BaseModel): chain_of_thought: str summary: str def summarize_content(issues: list[Record], query: Optional[str]): client = instructor.from_openai(OpenAI()) return client.chat.completions.create( messages=[ { "role": "system", "content": """You're a helpful assistant that summarizes information about issues from a github repository. Be sure to output your response in a single paragraph that is concise and to the point.""", }, { "role": "user", "content": Template( """ Here are the relevant issues: {% for issue in issues %} - {{ issue['text'] }} {% endfor %} {% if query %} My specific query is: {{ query }} {% else %} Please provide a broad summary and key insights from the issues above. {% endif %} """ ).render(issues=issues, query=query), }, ], response_model=Summary, model="gpt-4o-mini", )
复制
我们还需要修改我们的单步代理,以便它可以访问验证上下文来验证提取的存储库。
def one_step_agent(question: str, repos: list[str]):
client = instructor.from_openai(OpenAI(), mode=instructor.Mode.PARALLEL_TOOLS)
return client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "You are an AI assistant that helps users query and analyze GitHub issues stored in a PostgreSQL database. Search for summaries when the user wants to understand the trends or patterns within a project. Otherwise just get the issues and return them. Only resort to SQL queries if the other tools are not able to answer the user's query.",
},
{
"role": "user",
"content": Template(
"""
Here is the user's question: {{ question }}
Here is a list of repos that we have stored in our database. Choose the one that is most relevant to the user's query:
{% for repo in repos %}
- {{ repo }}
{% endfor %}
"""
).render(question=question, repos=repos),
},
],
validation_context={"repos": repos},
response_model=Iterable[
Union[
RunSQLReturnPandas,
SearchIssues,
SearchSummaries,
]
],
)
复制
现在让我们通过查看如何在数据库中汇总有关 Kubernetes 安装的信息来了解此功能的实际作用。
kubernetes/kubernetes
当你运行此代码时,你将获得人们在使用不同 pod 时所面临的 repo 挑战的摘要。
以下是输出的示例:
query = "What are the main issues people face with endpoint connectivity between different pods in kubernetes?"
repos = [
"rust-lang/rust",
"kubernetes/kubernetes",
"apache/spark",
"golang/go",
"tensorflow/tensorflow",
"MicrosoftDocs/azure-docs",
"pytorch/pytorch",
"Microsoft/TypeScript",
"python/cpython",
"facebook/react",
"django/django",
"rails/rails",
"bitcoin/bitcoin",
"nodejs/node",
"ocaml/opam-repository",
"apache/airflow",
"scipy/scipy",
"vercel/next.js",
]
resp = one_step_agent(query, repos)
conn = await get_conn()
limit = 10
tools = [tool for tool in resp]
print(tools)
#> [SearchSummaries(query='endpoint connectivity pods kubernetes', repo='kubernetes/kubernetes')]
result = await tools[0].execute(conn, limit)
summary = summarize_content(result, query)
print(summary.summary)
#> Users face endpoint connectivity issues in Kubernetes potentially due to networking setup errors with plugins like Calico, misconfigured network interfaces, and lack of clear documentation that hinders proper setup and troubleshooting. Proper handling of these aspects is essential for ensuring connectivity between different pods.
复制
不同 pod 之间的端点连接面临的主要问题包括网络延迟、通信故障、网络策略配置错误、DNS 解析问题、服务发现中的挑战、导致限制的资源限制以及多集群通信的复杂性。
这是可能的,因为我们专注于从下至上的方式开发整个应用程序,从强大的评估套件开始,以验证工具的使用情况,然后再进行实施。
RAG 应用程序的未来
在本文中,我们将 PostgreSQL 与 pgvectorscale 和 pgvector 结合起来,创建了一个可以回答复杂用户查询的 RAG 系统。然后,我们演示了如何使用 Instructor 测试函数调用功能,然后介绍了一些文本到 SQL 生成的最佳实践。
在 Timescale,我们致力于让 PostgreSQL 成为 AI 构建者更好的数据库,提供构建和改进 RAG 系统所需的所有功能。订阅我们的时事通讯,第一时间了解此类新教育内容和新功能,帮助您使用 PostgreSQL 构建 AI 应用程序。如果您对这项使命感兴趣,我们正在招聘。
您会注意到我们的代码中关于提取和嵌入创建的样板。Timescale AI 工程团队正在积极努力让这变得更容易,因此请关注我们在未来几周发布的激动人心的公告。
在后续文章中,我们将介绍如何利用高级技术,例如合成数据生成和自动元数据生成。使用 pgvectorscale 增强 pgvector 和 PostgreSQL 以适应这些用例,将使您能够构建更快、更具可扩展性的 AI 应用程序。
最后,如果您正在构建 RAG 应用程序,这里有我们为您构建的一些内容。
Pgvectorscale为 pgvector 带来了高性能搜索和可扩展性。它是 PostgreSQL 许可下的开源产品。
Pgai使 LLM 更贴近您的数据,从而能够在 PostgreSQL 中嵌入创建和 LLM 推理。(也是根据 PostgreSQL 许可证开源的)。
如果您想花更多时间改进 RAG 应用程序,而减少管理数据库的时间,请尝试Timescale Cloud。每个数据库都带有 pgvector、pgvectorscale 和 pgai,并支持我们在本文中讨论的所有 RAG 改进方法。