暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

万字长文讲透 RAG 在实际落地场景中的优化

EosphorosAI 2024-12-31
110


在过去两年中,检索增强生成(RAG,Retrieval-Augmented Generation)技术逐渐成为提升智能体的核心组成部分。通过结合检索与生成的双重能力,RAG能够引入外部知识,从而为大模型在复杂场景中的应用提供更多可能性。但是在实际落地场景中,往往会存在检索准确率低,噪音干扰多,召回完整性,专业性不够,导致LLM幻觉严重的问题。本文将聚焦RAG在实际落地场景中的知识加工和检索细节,如何去优化RAG Pineline链路,最终提升召回准确率。


快速搭建一个RAG智能问答应用很简单,但是在实际业务场景落地还需要做大量的准备工作。

 RAG关键流程源码解读

主要分为知识加工RAG部分关键流程:

1. 知识加工

知识加载 -> 知识切片 -> 信息抽取 -> 知识加工(embedding/graph/keywords) -> 知识存储

  • 知识加载

# 知识工厂进行实例化
KnowledgeFactory -> create() -> load() -> Document
- knowledge
- markdown
- pdf
- docx
- txt
- html
- pptx
- url
- ...
复制

如何扩展:

    class Knowledge(ABC):
        def load(self) -> List[Document]:
            """Load knowledge from data loader."""
        @classmethod
        def document_type(cls) -> Any:
            """Get document type."""
        def support_chunk_strategy(cls) -> List[ChunkStrategy]:
            """Return supported chunk strategy."""
            return [
                ChunkStrategy.CHUNK_BY_SIZE,
                ChunkStrategy.CHUNK_BY_PAGE,
                ChunkStrategy.CHUNK_BY_PARAGRAPH,
                ChunkStrategy.CHUNK_BY_MARKDOWN_HEADER,
                ChunkStrategy.CHUNK_BY_SEPARATOR,
            ]
        @classmethod
        def default_chunk_strategy(cls) -> ChunkStrategy:
            """Return default chunk strategy.


            Returns:
                ChunkStrategy: default chunk strategy
            """
            return ChunkStrategy.CHUNK_BY_SIZE
    复制
    • 知识切片


    • ChunkManager: 通过加载后的知识数据,根据用户指定的分片策略和分片参数路由到对应的分片处理器进行分配。

      class ChunkManager:
          """Manager for chunks."""


          def __init__(
              self,
              knowledge: Knowledge,
              chunk_parameter: Optional[ChunkParameters] = None,
              extractor: Optional[Extractor] = None,
          ):
              """Create a new ChunkManager with the given knowledge.


              Args:
                  knowledge: (Knowledge) Knowledge datasource.
                  chunk_parameter: (Optional[ChunkParameter]) Chunk parameter.
                  extractor: (Optional[Extractor]) Extractor to use for summarization.
              """
              self._knowledge = knowledge


              self._extractor = extractor
              self._chunk_parameters = chunk_parameter or ChunkParameters()
              self._chunk_strategy = (
                  chunk_parameter.chunk_strategy
                  if chunk_parameter and chunk_parameter.chunk_strategy
                  else self._knowledge.default_chunk_strategy().name
              )
              self._text_splitter = self._chunk_parameters.text_splitter
              self._splitter_type = self._chunk_parameters.splitter_type
      复制

      如何扩展:如果你想在界面上自定义一个新的分片策略

      • 新增切片策略

      • 新增Splitter实现逻辑

        class ChunkStrategy(Enum):
            """Chunk Strategy Enum."""


            CHUNK_BY_SIZE: _STRATEGY_ENUM_TYPE = (
                RecursiveCharacterTextSplitter,
                [
                    {
                        "param_name""chunk_size",
                        "param_type""int",
                        "default_value"512,
                        "description""The size of the data chunks used in processing.",
                    },
                    {
                        "param_name""chunk_overlap",
                        "param_type""int",
                        "default_value"50,
                        "description""The amount of overlap between adjacent data chunks.",
                    },
                ],
                "chunk size",
                "split document by chunk size",
            )
            CHUNK_BY_PAGE: _STRATEGY_ENUM_TYPE = (
                PageTextSplitter,
                [],
                "page",
                "split document by page",
            )
            CHUNK_BY_PARAGRAPH: _STRATEGY_ENUM_TYPE = (
                ParagraphTextSplitter,
                [
                    {
                        "param_name""separator",
                        "param_type""string",
                        "default_value""\\n",
                        "description""paragraph separator",
                    }
                ],
                "paragraph",
                "split document by paragraph",
            )
            CHUNK_BY_SEPARATOR: _STRATEGY_ENUM_TYPE = (
                SeparatorTextSplitter,
                [
                    {
                        "param_name""separator",
                        "param_type""string",
                        "default_value""\\n",
                        "description""chunk separator",
                    },
                    {
                        "param_name""enable_merge",
                        "param_type""boolean",
                        "default_value"False,
                        "description""Whether to merge according to the chunk_size after "
                        "splitting by the separator.",
                    },
                ],
                "separator",
                "split document by separator",
            )
            CHUNK_BY_MARKDOWN_HEADER: _STRATEGY_ENUM_TYPE = (
                MarkdownHeaderTextSplitter,
                [],
                "markdown header",
                "split document by markdown header",
            )


        复制
        • 知识抽取

        •   向量抽取 -> embedding, 实现Embeddings
          接口

          @abstractmethod
              def embed_documents(self, texts: List[str]) -> List[List[float]]:
                  """Embed search docs."""


              @abstractmethod
              def embed_query(self, text: str) -> List[float]:
                  """Embed query text."""


              async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
                  """Asynchronous Embed search docs."""
                  return await asyncio.get_running_loop().run_in_executor(
                      None, self.embed_documents, texts
                  )


              async def aembed_query(self, text: str) -> List[float]:
                  """Asynchronous Embed query text."""
                  return await asyncio.get_running_loop().run_in_executor(
                      None, self.embed_query, text
                  )
          复制
                # EMBEDDING_MODEL=proxy_openai
            # proxy_openai_proxy_server_url=https://api.openai.com/v1
            # proxy_openai_proxy_api_key={your-openai-sk}
            # proxy_openai_proxy_backend=text-embedding-ada-002




            ## qwen embedding model, See dbgpt/model/parameter.py
            # EMBEDDING_MODEL=proxy_tongyi
            # proxy_tongyi_proxy_backend=text-embedding-v1
            # proxy_tongyi_proxy_api_key={your-api-key}


            ## qianfan embedding model, See dbgpt/model/parameter.py
            #EMBEDDING_MODEL=proxy_qianfan
            #proxy_qianfan_proxy_backend=bge-large-zh
            #proxy_qianfan_proxy_api_key={your-api-key}
            #proxy_qianfan_proxy_api_secret={your-secret-key}
            复制
            • 知识图谱抽取 -> knowledge graph
              class TripletExtractor(LLMExtractor):
                  """TripletExtractor class."""


                  def __init__(self, llm_client: LLMClient, model_name: str):
                      """Initialize the TripletExtractor."""
                      super().__init__(llm_client, model_name, TRIPLET_EXTRACT_PT)


              TRIPLET_EXTRACT_PT = (
                  "Some text is provided below. Given the text, "
                  "extract up to knowledge triplets as more as possible "
                  "in the form of (subject, predicate, object).\n"
                  "Avoid stopwords. The subject, predicate, object can not be none.\n"
                  "---------------------\n"
                  "Example:\n"
                  "Text: Alice is Bob's mother.\n"
                  "Triplets:\n(Alice, is mother of, Bob)\n"
                  "Text: Alice has 2 apples.\n"
                  "Triplets:\n(Alice, has 2, apple)\n"
                  "Text: Alice was given 1 apple by Bob.\n"
                  "Triplets:(Bob, gives 1 apple, Bob)\n"
                  "Text: Alice was pushed by Bob.\n"
                  "Triplets:(Bob, pushes, Alice)\n"
                  "Text: Bob's mother Alice has 2 apples.\n"
                  "Triplets:\n(Alice, is mother of, Bob)\n(Alice, has 2, apple)\n"
                  "Text: A Big monkey climbed up the tall fruit tree and picked 3 peaches.\n"
                  "Triplets:\n(monkey, climbed up, fruit tree)\n(monkey, picked 3, peach)\n"
                  "Text: Alice has 2 apples, she gives 1 to Bob.\n"
                  "Triplets:\n"
                  "(Alice, has 2, apple)\n(Alice, gives 1 apple, Bob)\n"
                  "Text: Philz is a coffee shop founded in Berkeley in 1982.\n"
                  "Triplets:\n"
                  "(Philz, is, coffee shop)\n(Philz, founded in, Berkeley)\n"
                  "(Philz, founded in, 1982)\n"
                  "---------------------\n"
                  "Text: {text}\n"
                  "Triplets:\n"
              )
              复制
                • 排索引抽取 -> keywords分词
                  • 可以用es默认的分词库,也可以使用es的插件模式自定义分词
                  • 知识存储
                  整个知识持久化统一实现了IndexStoreBase
                  接口,目前提供了向量数据库、图数据库、全文索引三类实现
                  • VectorStore,向量数据库主要逻辑都在load_document(),包括索引schema创建,向量数据分批写入等等。

                    - VectorStoreBase
                        - ChromaStore
                        - MilvusStore
                        - OceanbaseStore
                        - ElasticsearchStore
                        - PGVectorStore


                    class VectorStoreBase(IndexStoreBase, ABC):
                        """Vector store base class."""


                        @abstractmethod
                        def load_document(self, chunks: List[Chunk]) -> List[str]:
                            """Load document in index database."""
                        @abstractmethod
                        async def aload_document(self, chunks: List[Chunk]) -> List[str]:
                            """Load document in index database."""


                        @abstractmethod
                        def similar_search_with_scores(
                            self,
                            text,
                            topk,
                            score_threshold: float,
                            filters: Optional[MetadataFilters] = None,
                        ) -> List[Chunk]:
                            """Similar search with scores in index database."""
                        def similar_search(
                            self, text: str, topk: int, filters: Optional[MetadataFilters] = None
                        ) -> List[Chunk]:
                            return self.similar_search_with_scores(text, topk, 1.0, filters)
                    复制
                      • GraphStore ,具体的图存储提供了三元组写入的实现,一般会调用具体的图数据库的查询语言来完成。例如TuGraphStore
                        会根据三元组生成具体的Cypher语句并执行。

                        • 图存储接口GraphStoreBase提供统一的图存储抽象,目前内置了MemoryGraphStore
                          TuGraphStore
                          的实现,我们也提供Neo4j接口给开发者进行接入。

                        - GraphStoreBase
                            - TuGraphStore
                            - Neo4jStore


                        def insert_triplet(self, subj: str, rel: str, obj: str) -> None:
                            """Add triplet."""
                            ...TL;DR...
                            subj_query = f"MERGE (n1:{self._node_label} {{id:'{subj}'}})"
                            obj_query = f"MERGE (n1:{self._node_label} {{id:'{obj}'}})"
                            rel_query = (
                                f"MERGE (n1:{self._node_label} {{id:'{subj}'}})"
                                f"-[r:{self._edge_label} {{id:'{rel}'}}]->"
                                f"(n2:{self._node_label} {{id:'{obj}'}})"
                            )
                            self.conn.run(query=subj_query)
                            self.conn.run(query=obj_query)
                            self.conn.run(query=rel_query)
                        复制
                          • FullTextStore: 通过构建es索引,通过es内置分词算法进行分词,然后由es构建keyword->doc_id的倒排索引。

                          {
                                      "analysis": {"analyzer": {"default": {"type""standard"}}},
                                      "similarity": {
                                          "custom_bm25": {
                                              "type""BM25",
                                              "k1"self._k1,
                                              "b"self._b,
                                          }
                                      },
                                  }
                                  self._es_mappings = {
                                      "properties": {
                                          "content": {
                                              "type""text",
                                              "similarity""custom_bm25",
                                          },
                                          "metadata": {
                                              "type""keyword",
                                          },
                                      }
                                  }
                              - FullTextStoreBase
                              - ElasticDocumentStore
                              - OpenSearchStore   
                          复制

                          2.知识检索

                          question -> rewrite -> similarity_search -> rerank -> context_candidates

                          接下来是知识检索,目前社区的检索逻辑主要分为这几步,如果你设置了查询改写参数,目前会通过大模型给你进行一轮问题改写,然后会根据你的知识加工方式路由到对应的检索器,如果你是通过向量进行加工的,那就会通过EmbeddingRetriever进行检索,如果你构建方式是通过知识图谱构建的,就会按照知识图谱方式进行检索,如果你设置了rerank模型,会给粗筛后的候选值进行精筛,让候选值和用户问题更有关联。

                          •  EmbeddingRetriever

                            class EmbeddingRetriever(BaseRetriever):
                                """Embedding retriever."""


                                def __init__(
                                    self,
                                    index_store: IndexStoreBase,
                                    top_k: int = 4,
                                    query_rewrite: Optional[QueryRewrite] = None,
                                    rerank: Optional[Ranker] = None,
                                    retrieve_strategy: Optional[RetrieverStrategy] = RetrieverStrategy.EMBEDDING,
                                ):




                                async def _aretrieve_with_score(
                                    self,
                                    query: str,
                                    score_threshold: float,
                                    filters: Optional[MetadataFilters] = None,
                                ) -> List[Chunk]:
                                    """Retrieve knowledge chunks with score.


                                    Args:
                                        query (str): query text
                                        score_threshold (float): score threshold
                                        filters: metadata filters.
                                    Return:
                                        List[Chunk]: list of chunks with score
                                    """
                                    queries = [query]
                                    new_queries = await self._query_rewrite.rewrite(
                                                origin_query=query, context=context, nums=1
                                            )
                                            queries.extend(new_queries)
                                    candidates_with_score = [
                                            self._similarity_search_with_score(
                                                query, score_threshold, filters, root_tracer.get_current_span_id()
                                            )
                                            for query in queries
                                        ]
                                        ...


                                    new_candidates_with_score = await self._rerank.arank(
                                            new_candidates_with_score, query
                                        )
                                        return new_candidates_with_score
                            复制
                              • index_store: 具体的向量数据库

                              • top_k: 返回的具体候选chunk个数

                              • query_rewrite:查询改写函数

                              • rerank:重排序函数

                              • query:原始查询

                              • score_threshold:得分,我们默认会把相似度得分小于阈值的上下文信息给过滤掉

                              • filters:Optional[MetadataFilters]
                                , 元数据信息过滤器,主要是可以用来前置通过属性信息筛掉一些不匹配的候选信息。

                            class FilterCondition(str, Enum):
                               """Vector Store Meta data filter conditions."""
                               AND ="and"
                               OR ="or"
                            class MetadataFilter(BaseModel):
                               """Meta data filter."""

                               key: str = Field(
                                   ...,
                                   description="The key of metadata to filter.",
                               )
                               operator: FilterOperator = Field(
                                   default=FilterOperator.EQ,
                                   description="The operator of metadata filter.",
                               )
                               value: Union[str, int,float, List[str], List[int], List[float]] = Field(
                                   ...,
                                   description="The value of metadata to filter.",
                               )
                            复制
                            • Graph RAG       

                            首先通过模型进行关键词抽取,这里可以通过传统的nlp技术进行分词,也可以通过大模型进行分词,然后进行关键词按照同义词做扩充,找到关键词的候选列表,最好根据关键词候选列表调用explore方法召回局部子图。

                              KEYWORD_EXTRACT_PT = (
                                  "A question is provided below. Given the question, extract up to "
                                  "keywords from the text. Focus on extracting the keywords that we can use "
                                  "to best lookup answers to the question.\n"
                                  "Generate as more as possible synonyms or alias of the keywords "
                                  "considering possible cases of capitalization, pluralization, "
                                  "common expressions, etc.\n"
                                  "Avoid stopwords.\n"
                                  "Provide the keywords and synonyms in comma-separated format."
                                  "Formatted keywords and synonyms text should be separated by a semicolon.\n"
                                  "---------------------\n"
                                  "Example:\n"
                                  "Text: Alice is Bob's mother.\n"
                                  "Keywords:\nAlice,mother,Bob;mummy\n"
                                  "Text: Philz is a coffee shop founded in Berkeley in 1982.\n"
                                  "Keywords:\nPhilz,coffee shop,Berkeley,1982;coffee bar,coffee house\n"
                                  "---------------------\n"
                                  "Text: {text}\n"
                                  "Keywords:\n"
                              )


                              def explore(
                                  self,
                                  subs: List[str],
                                  direct: Direction = Direction.BOTH,
                                  depth: Optional[int] = None,
                                  fan: Optional[int] = None,
                                  limit: Optional[int] = None,
                              ) -> Graph:
                                  """Explore on graph."""
                              复制
                              • DBSchemaRetriever
                                这部分是ChatData场景的schema-linking检索

                                主要是通过schema-linking方式通过二阶段相似度检索,首先先找到最相关的表,然后再最相关的字段信息。

                                优点:这种二阶段检索也是为了解决社区反馈的大宽表体验的问题。

                                    def _similarity_search(
                                        self, query, filters: Optional[MetadataFilters] = None
                                    ) -> List[Chunk]:
                                        """Similar search."""
                                        table_chunks = self._table_vector_store_connector.similar_search_with_scores(
                                            query, self._top_k, 0, filters
                                        )


                                        not_sep_chunks = [
                                            chunk for chunk in table_chunks if not chunk.metadata.get("separated")
                                        ]
                                        separated_chunks = [
                                            chunk for chunk in table_chunks if chunk.metadata.get("separated")
                                        ]
                                        if not separated_chunks:
                                            return not_sep_chunks


                                        # Create tasks list
                                        tasks = [
                                            lambda c=chunk: self._retrieve_field(c, query) for chunk in separated_chunks
                                        ]
                                        # Run tasks concurrently
                                        separated_result = run_tasks(tasks, concurrency_limit=3)


                                        # Combine and return results
                                        return not_sep_chunks + separated_result
                                复制

                                  • table_vector_store_connector: 负责检索最相关的表。

                                  • field_vector_store_connector: 负责检索最相关的字段。

                                 知识加工,知识检索优化思路

                                目前RAG智能问答应用几个痛点:

                                • 知识库文档越来越多以后,检索噪音大,召回准确率不高

                                • 召回不全,完整性不够

                                • 召回和用户问题意图相关性不大

                                • 只能回答静态数据,无法动态获取知识,导致答疑应用比较呆,比较笨。

                                1. 知识处理优化

                                非结构化/半结构化/结构化数据的处理,准备决定着RAG应用的上限,因此首先需要在知识处理,索引阶段做大量的细粒度的ETL工作,主要优化的思路方向:

                                • 非结构化 -> 结构化:有条理地组织知识信息。

                                • 提取更加丰富的, 多元化的语义信息。

                                 1.1 知识加载 

                                目的:需要对文档进行精确的解析,更多元化的识别到不同类型的数据。

                                优化建议:

                                • 建议将docx、txt或者其他文本事先处理为pdf或者markdown格式,这样可以利用一些识别工具更好地提取文本中的各项内容。

                                • 提取文本中的表格信息。

                                • 保留markdown和pdf的标题层级信息,为接下来的层级关系树等索引方式准备。

                                • 保留图片链接,公式等信息,也统一处理成markdown的格式。

                                 1.2 切片Chunk尽量保持完整 

                                目的:保存上下文完整性和相关性,这直接关乎回复准确率。

                                保持在大模型的上下文限制内,分块保证输入到LLMs的文本不会超过其token限制。

                                优化建议:

                                • 图片 + 表格 单独抽取成Chunk,将表格和图片标题保留到metadata元数据里

                                • 文档内容尽量按照标题层级或者Markdown Header进行拆分,尽可能保留chunk的完整性。

                                • 如果有自定义分隔符可以按照自定义分割符切分

                                 1.3 多元化的信息抽取 

                                除了对文档进行Embedding向量抽取外,其他多元化的信息抽取能够对文档进行数据增强,显著提升RAG召回效果。

                                • 知识图谱

                                      • 优点:1. 解决NativeRAG的完整性缺失,依然存在幻觉问题,知识的准确性,包括知识边界的完整性、知识结构和语义的清晰性,是对相似度检索的能力的一种语义补充。

                                      • 适用场景:适用于严谨的专业领域(医疗,运维等),知识的准备需要受到约束的并且知识之间能够明显建立层级关系的。

                                      • 如何实现:

                                           1.依赖大模型提取(实体,关系,实体)三元组关系。

                                           2. 依赖前期高质量,结构化的知识准备,清洗,抽取,通过业务规则通过手动或者自定义SOP流程构建知识图谱。

                                  • Doc Tree

                                      • 适用场景:解决了上下文完整性不足的问题,也能匹配时完全依据语义和关键词,能够减少噪音

                                      • 如何实现:以标题层级构建chunk的树形节点,形成一个多叉树结构,每一层级节点只需要存储文档标题,叶子节点存储具体的文本内容。这样利用树的遍历算法,如果用户问题命中相关非叶子标题节点,就可以将相关的子节点数据进行召回。这样就不会存在chunk完整性缺失的问题。

                                  这部分的Feature我们也会在明年年初放到社区里面。

                                  • 提取QA对,需要前置通过预定义或者模型抽取的方式提取QA对信息
                                    • 适用场景:
                                      • 能够在检索中命中问题并直接进行召回,直接检索到用户想要的答案,适用于一些FAQ场景,召回完整性不够的场景。
                                    • 如何实现:
                                      • 预定义:预先为每个chunk添加一些问题
                                      • 模型抽取:通过给定一下上下文,让模型进行QA对抽取
                                  • 元数据抽取
                                    • 如何实现:根据自身业务数据特点,提取数据的特征进行保留,比如标签,类别,时间,版本等元数据属性。
                                    • 适用场景:检索时候能够预先根据元数据属性进行过滤掉大部分噪音。
                                  • 总结提取
                                    • 适用场景:解决这篇文章讲了个啥
                                      总结一下
                                      等全局问题场景。
                                    • 如何实现:通过mapreduce等方式分段抽取,通过模型为每段chunk提取摘要信息。

                                   1.4 知识处理工作流 

                                  目前DB-GPT知识库提供了文档上传 -> 解析 -> 切片 -> Embedding -> 知识图谱三元组抽取 -> 向量数据库存储 -> 图数据库存储等知识加工的能力,但是不具备对文档进行复杂的个性化的信息抽取能力,因此希望通过构建知识加工工作流模版来完成复杂的,可视化的,用户可自定义的知识抽取,转换,加工流程。

                                  知识加工工作流:

                                  https://www.yuque.com/eosphoros/dbgpt-docs/vg2gsfyf3x9fuglf
                                  2. RAG流程优化
                                  RAG流程的优化我们又分为了静态文档的RAG和动态数据获取的RAG,目前大部分涉及到的RAG只覆盖了非结构化的文档静态资产,但是实际业务很多场景的问答是通过工具获取动态数据 + 静态知识数据共同回答的场景,不仅需要检索到静态的知识,同时需要RAG检索到工具资产库里面工具信息并执行获取动态数据。

                                   2.1 静态知识RAG优化 

                                  (1)原始问题处理

                                  目的:澄清用户语义,将用户的原始问题从模糊的,意图不清晰的查询优化为含义更丰富的一个可检索的Query

                                  • 原始问题分类,通过问题分类可以
                                    • LLM分类(LLMExtractor
                                      )
                                    • 构建embedding+逻辑回归实现双塔模型,text2nlu DB-GPT-Hub/src/dbgpt-hub-nlu/README.zh.md at main · eosphoros-ai/DB-GPT-Hub
                                      • tip:需要高质量的Embedding模型,推荐bge-v1.5-large
                                  • 反问用户,如果语义不清晰将问题再抛给用户进行问题澄清,通过多轮交互
                                    • 通过热搜词库根据语义相关性给用户推荐他想要的问题候选列表
                                  • 槽位提取,目的是获取用户问题中的关键slot信息,比如意图,业务属性等等
                                    • LLM提取(LLMExtractor
                                      )
                                  • 问题改写
                                    • 热搜词库进行改写
                                    • 多轮交互

                                  (2)元数据过滤

                                  当我们把索引分成许多chunks并且都存储在相同的知识空间里面,检索效率会成为问题。比如用户问"浙江我武科技公司"相关信息时,并不想召回其他公司的信息。因此,如果可以通过公司名称元数据属性先进行过滤,就会大大提升效率和相关度。

                                    async def aretrieve(
                                        self, query: str, filters: Optional[MetadataFilters] = None
                                    ) -> List[Chunk]:
                                        """Retrieve knowledge chunks.


                                            Args:
                                                query (str): async query text.
                                                filters: (Optional[MetadataFilters]) metadata filters.


                                            Returns:
                                                List[Chunk]: list of chunks
                                            """
                                        return await self._aretrieve(query, filters)
                                    复制

                                    (3) 多策略混合召回

                                    • 按照优先级召回,分别为不同的检索器定义优先级,检索到内容后立即返回

                                        • 定义不同检索,比如qa_retriever, doc_tree_retriever写入到队列里面, 通过队列的先进先出的特性实现优先级召回。

                                      class RetrieverChain(BaseRetriever):
                                          """Retriever chain class."""


                                          def __init__(
                                              self,
                                              retrievers: Optional[List[BaseRetriever]] = None,
                                              executor: Optional[Executor] = None,
                                          ):
                                              """Create retriever chain instance."""
                                              self._retrievers = retrievers or []
                                              self._executor = executor or ThreadPoolExecutor()
                                                  for retriever in self._retrievers:
                                                  candidates_with_scores = await retriever.aretrieve_with_scores(
                                                      query=query, score_threshold=score_threshold, filters=filters
                                                  )
                                                  if candidates_with_scores:
                                                      return candidates_with_scores


                                      复制
                                      • 多知识索引/空间并行召回

                                          • 通过知识的不同索引形式,通过并行召回方式获取候选列表,保证召回完整性

                                      (4) 后置过滤

                                      经过粗筛候选列表后,怎么通过精筛过滤噪音呢

                                      • 无关的候选分片剔除

                                          • 时效性剔除

                                          • 业务属性不满足剔除

                                      • topk去重

                                      • 重排序 仅仅靠粗筛的召回还不够,这时候我们需要有一些策略来对检索的结果做重排序,比如把组合相关度、匹配度等因素做一些重新调整,得到更符合我们业务场景的排序。因为在这一步之后,我们就会把结果送给LLM进行最终处理了,所以这一部分的结果很重要。

                                        • 使用相关重排序模型进行精筛,可以使用开源的模型,也可以使用带业务语义微调的模型。
                                          ## Rerank model
                                          #RERANK_MODEL=bce-reranker-base
                                          #### If you not set RERANK_MODEL_PATH, DB-GPT will read the model path from EMBEDDING_MODEL_CONFIG based on the RERANK_MODEL.
                                          #RERANK_MODEL_PATH=/Users/chenketing/Desktop/project/DB-GPT-NEW/DB-GPT/models/bce-reranker-base_v1
                                          #### The number of rerank results to return
                                          #RERANK_TOP_K=5
                                          复制
                                            • 根据不同索引召回的内容进行业务RRF加权综合打分剔除
                                            score = 0.0
                                            for q in queries:
                                                if d in result(q):
                                                    score += 1.0 / ( k + rankresult(q), d ) )
                                            return score


                                            where
                                            # k is a ranking constant
                                            # q is a query in the set of queries
                                            # d is a document in the result set of q
                                            result(q) is the result set of q
                                            rankresult(q), d ) is d's rank within the result(q) starting from 1
                                            复制

                                            (5) 显示优化+兜底话术/话题引导

                                            • 让模型使用markdown的格式进行输出

                                              基于以下给出的已知信息, 准守规范约束,专业、简要回答用户的问题.
                                              规范约束:
                                                  1.如果已知信息包含的图片、链接、表格、代码块等特殊markdown标签格式的信息,确保在答案中包含原文这些图片、链接、表格和代码标签,不要丢弃不要修改,
                                              :图片格式:![image.png](xxx), 链接格式:[xxx](xxx), 表格格式:|xxx|xxx|xxx|, 代码格式:```xxx```.
                                                  2.如果无法从提供的内容中获取答案, 请说: "知识库中提供的内容不足以回答此问题" 禁止胡乱编造.
                                                  3.回答的时候最好按照1.2.3.点进行总结, 并以markdwon格式显示.
                                              复制

                                               2.2 动态知识RAG优化 

                                              文档类知识是相对静态的,无法回答个性化以及动态的信息, 需要依赖一些第三方平台工具才可以回答,基于这种情况,我们需要一些动态RAG的方法,通过工具资产定义 -> 工具选择 -> 工具校验 -> 工具执行获取动态数据。

                                              (1) 工具资产库

                                              构建企业领域工具资产库,将散落到各个平台的工具API,工具脚本进行整合,进而提供智能体端到端的使用能力。比如,除了静态知识库以外,我们可以通过导入工具库的方式进行工具的处理

                                              (2) 工具召回

                                              工具召回沿用静态知识的RAG召回的思路,再通过完整的工具执行生命周期来获取工具执行结果。

                                              • 槽位提取:通过传统nlp获取LLM将用户问题进行解析,包括常用的业务类型,环境标,领域模型参数等等

                                              • 工具选择:沿用静态RAG的思路召回,主要有两层,工具名召回和工具参数召回。

                                                  • 工具参数召回,和TableRAG思路类似,先召回表名,再召回字段名。

                                              • 参数填充:需要根据召回的工具参数定义,和槽位提取出来的参数进行match

                                                  • 可以代码进行填充,也可以让模型进行填充。

                                                  • 优化思路:由于各个平台工具的同样的参数的参数名没有统一,也不方便去治理,建议可以先进行一轮领域模型数据扩充,拿到整个领域模型后,需要的参数都会存在。

                                              • 参数校验

                                                  • 完整性校验:进行参数个数完整性校验

                                                  • 参数规则校验:进行参数名类型,参数值,枚举等等规则校验。

                                              • 参数纠正/对齐,这部分主要是为了减少和用户的交互次数,自动化完成用户参数错误纠正,包括大小写规则,枚举规则等等。eg:

                                               2.3 RAG评测 

                                              在评估智能问答流程时,需要单独对召回相关性准确率以及模型问答的相关性进行评估,然后再综合考虑,以判断RAG流程在哪些方面仍需改进。

                                              评价指标:

                                                EvaluationMetric
                                                ├── LLMEvaluationMetric
                                                │   ├── AnswerRelevancyMetric
                                                ├── RetrieverEvaluationMetric
                                                │   ├── RetrieverSimilarityMetric
                                                │   ├── RetrieverMRRMetric
                                                │   └── RetrieverHitRateMetric
                                                复制
                                                • RAG
                                                  召回指标(RetrieverEvaluationMetric):

                                                    • RetrieverHitRateMetric
                                                      :命中率衡量的是RAGretriever
                                                      召回出现在检索结果前top-k个文档中的比例。

                                                    • RetrieverMRRMetric
                                                      :Mean Reciprocal Rank
                                                      通过分析最相关文档在检索结果里的排名来计算每个查询的准确性。更具体地说,它是所有查询的相关文档排名倒数的平均值。例如,若最相关的文档排在第一位,其倒数排名为 1;排在第二位时,为 1/2;以此类推。

                                                    • RetrieverSimilarityMetric
                                                      : 相似度指标计算,计算召回内容与预测内容的相似度。


                                                模型生成
                                                答案指标:


                                                • AnswerRelevancyMetric
                                                  :智能体答案相关性指标,通过智能体答案与用户提问的匹配程度。高相关性的答案不仅要求模型能够理解用户的问题,还要求其能够生成与问题密切相关的答案。这直接影响到用户的满意度和模型的实用性。

                                                 RAG落地案例分享

                                                1. 数据基础设施领域的RAG

                                                 1.1 运维智能体背景 

                                                在数据基础设施领域,有很多运维SRE,每天会接收到大量的告警,因此很多时间来需要响应应急事件,进而进行故障诊断,然后故障复盘,进而进行经验沉淀。另外一部分时间又需要响应用户咨询,需要他们用他们的知识以及工具使用经验进行答疑。


                                                因此我们希望通过打造一个数据基础设施的通用智能体来解决告警诊断,答疑的这些问题。

                                                 1.2 严谨专业的RAG 

                                                传统的 RAG + Agent 技术可以解决通用的,确定性没那么高的,单步任务场景。但是面对数据基础设施领域的专业场景,整个检索过程必须是确定,专业和真实的,并且是需要一步一步推理的。

                                                右边是一个通过NativeRAG的一个泛泛而谈的总结,可能对于一个C端的用户,对专业的领域知识没那么了解时,可能是有用的信息,然后对于专业的人员来说,这部分解答就没有什么意义了。


                                                因此我们比较了通用的智能体和数据基础设施智能体在RAG上面的区别:


                                                • 通用的智能体:传统的RAG对知识的严谨和专业性要求没那么高,适用于客服,旅游,平台答疑机器人这样的一些业务场景。

                                                • 数据基础设施智能体:RAG流程是严谨和专业的,需要专属的RAG工作流程,上下文包括(告警->定位->止血->恢复),并且需要对专家沉淀的问答和应急经验,进行结构化的抽取,建立层次关系。因此我们选择知识图谱来作为数据承载。

                                                 1.3 知识处理 

                                                基于数据基础设施的确定性和特殊性,我们选择通过结合知识图谱来作为诊断应急经验的知识承载。我们通过SRE沉淀下来的应急排查事件知识经验 结合应急复盘流程,建立了DB应急事件驱动的知识图谱,我们以DB抖动为例,影响DB抖动的几个事件,包括慢SQL问题,容量问题,我们在各个应急事件间建立了关系。


                                                最后通过我们通过规范化应急事件规则,一步一步地建立了多源的知识 -> 知识结构化抽取 ->应急关系抽取 -> 专家审核 -> 知识存储的一套标准化的知识加工体系。


                                                 1.4 知识检索 

                                                在智能体检索阶段,我们使用GraphRAG作为静态知识检索的承载,因此识别到DB抖动异常后,找到了与DB抖动异常节点相关的节点作为我们分析依据,由于在知识抽取阶段每一个节点还保留了每个事件的一些元数据信息,包括事件名,事件描述,相关工具,工具参数等等。


                                                因此我们可以通过执行工具的执行生命周期链路来获取返回结果拿到动态数据来作为应急诊断的排查依据。通过这种动静结合的混合召回的方式比纯朴素的RAG召回,保障了数据基础设施智能体执行的确定性,专业性和严谨性。

                                                 1.5 AWEL + Agent 

                                                最后通过社区AWEL+AGENT技术,通过AGENT编排的范式,打造了从意图专家-> 应急诊断专家 -> 诊断根因分析专家。


                                                每个Agent的职能都是不一样的,意图专家负责识别解析用户的意图和识别告警信息诊断专家需要通过GraphRAG 定位到需要分析的根因节点,以及获取具体的根因信息。分析专家需要结合各个根因节点的数据 + 历史分析复盘报告生成诊断分析报告

                                                2. 金融财报分析领域的RAG

                                                最新实践!如何基于 DB-GPT 搭建财报分析助手?

                                                可以围绕各自领域构建属于自己的领域资产库包括,知识资产,工具资产以及知识图谱资产

                                                • 领域资产:领域资产包括了知识库,API,工具脚本。

                                                • 资产处理,整个资产数据链路涉及了领域资产加工,领域资产检索和领域资产评估。

                                                    • 非结构化 -> 结构化:有条理地归类,正确地组织知识信息。

                                                    • 提取更加丰富的语义信息。

                                                • 资产检索:

                                                    • 希望是有层级,优先级的检索而并非单一的检索

                                                    • 后置过滤很重要,最好能通过业务语义一些规则进行过滤。


                                                文章转载自EosphorosAI,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                评论