
Agentic Retrieval-Augmented Generation (Agentic RAG)

In this tutorial, we will build a retrieval agent. Retrieval agents are useful when you want an LLM to decide whether to retrieve context from a vector store or respond to the user directly.

By the end of this tutorial, we will have done the following:

  1. Fetch and preprocess documents that will be used for retrieval.
  2. Index those documents for semantic search and create a retriever tool for the agent.
  3. Build an agentic RAG system that can decide when to use the retriever tool.


Setup

Let's download the required packages and set our API keys:

%%capture --no-stderr
%pip install -U --quiet langgraph "langchain[openai]" langchain-community langchain-text-splitters
import getpass
import os

def _set_env(key: str):
    if key not in os.environ:
        os.environ[key] = getpass.getpass(f"{key}:")

_set_env("OPENAI_API_KEY")

Tip

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM applications built with LangGraph.
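To turn tracing on for this tutorial, you can set the LangSmith environment variables before running the graph. Below is a minimal sketch, assuming the standard LANGSMITH_TRACING and LANGSMITH_API_KEY variable names and reusing the _set_env helper defined above:

# Optional: enable LangSmith tracing (assumes the standard LangSmith env var names)
os.environ["LANGSMITH_TRACING"] = "true"  # record traces for LangChain/LangGraph runs
_set_env("LANGSMITH_API_KEY")             # prompt for the LangSmith API key if not already set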

1. Preprocess documents

  1. Fetch the documents to use in our RAG system. We will use the three most recent pages from Lilian Weng's excellent blog. We'll start by fetching the content of the pages using the WebBaseLoader utility:

    from langchain_community.document_loaders import WebBaseLoader
    
    urls = [
        "https://lilianweng.github.io/posts/2024-11-28-reward-hacking/",
        "https://lilianweng.github.io/posts/2024-07-07-hallucination/",
        "https://lilianweng.github.io/posts/2024-04-12-diffusion-video/",
    ]
    
    docs = [WebBaseLoader(url).load() for url in urls]
    
    docs[0][0].page_content.strip()[:1000]
    
  2. Split the fetched documents into smaller chunks for indexing into our vector store:

    from langchain_text_splitters import RecursiveCharacterTextSplitter
    
    docs_list = [item for sublist in docs for item in sublist]
    
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=100, chunk_overlap=50
    )
    doc_splits = text_splitter.split_documents(docs_list)
    
    doc_splits[0].page_content.strip()
    

2. Create a retrieval tool

Now that we have our split documents, we can index them into a vector store that we'll use for semantic search.

  1. Use an in-memory vector store and OpenAI embeddings:

    from langchain_core.vectorstores import InMemoryVectorStore
    from langchain_openai import OpenAIEmbeddings
    
    vectorstore = InMemoryVectorStore.from_documents(
        documents=doc_splits, embedding=OpenAIEmbeddings()
    )
    retriever = vectorstore.as_retriever()
    
  2. Create a retriever tool using LangChain's prebuilt create_retriever_tool:

    from langchain.tools.retriever import create_retriever_tool
    
    retriever_tool = create_retriever_tool(
        retriever,
        "retrieve_blog_posts",
        "Search and return information about Lilian Weng blog posts.",
    )
    
  3. Test the tool:

    retriever_tool.invoke({"query": "types of reward hacking"})
    

3. Generate query

Now we will start building the components (nodes) of our agentic RAG graph.

Note that these components will operate on the MessagesState, the graph state that contains a messages key with a list of chat messages.
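For reference, MessagesState is prebuilt by LangGraph, so you don't need to define it yourself; the sketch below is only a rough approximation of its shape (a single messages key whose updates are appended via an add_messages reducer), and the class name here is purely illustrative:

from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

# Rough approximation of the prebuilt MessagesState:
# one `messages` key whose updates are appended rather than overwritten
class MessagesStateSketch(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]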

  1. Build the generate_query_or_respond node. It will call an LLM to generate a response based on the current graph state (the list of messages). Given the input messages, it will decide to retrieve using the retriever tool, or respond directly to the user. Note that we're giving the chat model access to the retriever_tool we created earlier via .bind_tools:

    from langgraph.graph import MessagesState
    from langchain.chat_models import init_chat_model
    
    response_model = init_chat_model("openai:gpt-4.1", temperature=0)
    
    def generate_query_or_respond(state: MessagesState):
        """Call the model to generate a response based on the current state. Given
        the question, it will decide to retrieve using the retriever tool, or simply
        respond to the user.
        """
        response = (
            response_model
            .bind_tools([retriever_tool]).invoke(state["messages"])
        )
        return {"messages": [response]}
    
  2. Try it on a random input:

    input = {"messages": [{"role": "user", "content": "hello!"}]}
    generate_query_or_respond(input)["messages"][-1].pretty_print()
    

    Output:

    ================================== Ai Message ==================================
    
    Hello! How can I help you today?
    

  3. Ask a question that requires semantic search:

    input = {
        "messages": [
            {
                "role": "user",
                "content": "What does Lilian Weng say about types of reward hacking?",
            }
        ]
    }
    generate_query_or_respond(input)["messages"][-1].pretty_print()
    

    Output:

    ================================== Ai Message ==================================
    Tool Calls:
    retrieve_blog_posts (call_tYQxgfIlnQUDMdtAhdbXNwIM)
    Call ID: call_tYQxgfIlnQUDMdtAhdbXNwIM
    Args:
        query: types of reward hacking
    

4. Grade documents

  1. Add a conditional edge, grade_documents, to determine whether the retrieved documents are relevant to the question. We will use a model with a structured output schema, GradeDocuments, for document grading. The grade_documents function will return the name of the node to go to based on the grading decision (generate_answer or rewrite_question):

    from pydantic import BaseModel, Field
    from typing import Literal
    
    GRADE_PROMPT = (
        "You are a grader assessing relevance of a retrieved document to a user question. \n "
        "Here is the retrieved document: \n\n {context} \n\n"
        "Here is the user question: {question} \n"
        "If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n"
        "Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."
    )
    
    class GradeDocuments(BaseModel):
        """Grade documents using a binary score for relevance check."""
    
        binary_score: str = Field(
            description="Relevance score: 'yes' if relevant, or 'no' if not relevant"
        )
    
    grader_model = init_chat_model("openai:gpt-4.1", temperature=0)
    
    def grade_documents(
        state: MessagesState,
    ) -> Literal["generate_answer", "rewrite_question"]:
        """Determine whether the retrieved documents are relevant to the question."""
        question = state["messages"][0].content
        context = state["messages"][-1].content
    
        prompt = GRADE_PROMPT.format(question=question, context=context)
        response = (
            grader_model
            .with_structured_output(GradeDocuments).invoke(
                [{"role": "user", "content": prompt}]
            )
        )
        score = response.binary_score
    
        if score == "yes":
            return "generate_answer"
        else:
            return "rewrite_question"
    
  2. Run this with irrelevant documents in the tool response:

    from langchain_core.messages import convert_to_messages
    
    input = {
        "messages": convert_to_messages(
            [
                {
                    "role": "user",
                    "content": "What does Lilian Weng say about types of reward hacking?",
                },
                {
                    "role": "assistant",
                    "content": "",
                    "tool_calls": [
                        {
                            "id": "1",
                            "name": "retrieve_blog_posts",
                            "args": {"query": "types of reward hacking"},
                        }
                    ],
                },
                {"role": "tool", "content": "meow", "tool_call_id": "1"},
            ]
        )
    }
    grade_documents(input)
    
  3. Confirm that relevant documents are classified as such:

    input = {
        "messages": convert_to_messages(
            [
                {
                    "role": "user",
                    "content": "What does Lilian Weng say about types of reward hacking?",
                },
                {
                    "role": "assistant",
                    "content": "",
                    "tool_calls": [
                        {
                            "id": "1",
                            "name": "retrieve_blog_posts",
                            "args": {"query": "types of reward hacking"},
                        }
                    ],
                },
                {
                    "role": "tool",
                    "content": "reward hacking can be categorized into two types: environment or goal misspecification, and reward tampering",
                    "tool_call_id": "1",
                },
            ]
        )
    }
    grade_documents(input)
    

5. Rewrite question

  1. Build the rewrite_question node. The retriever tool can return potentially irrelevant documents, which indicates a need to improve the original user question. To do so, we will call the rewrite_question node:

    REWRITE_PROMPT = (
        "Look at the input and try to reason about the underlying semantic intent / meaning.\n"
        "Here is the initial question:"
        "\n ------- \n"
        "{question}"
        "\n ------- \n"
        "Formulate an improved question:"
    )
    
    def rewrite_question(state: MessagesState):
        """Rewrite the original user question."""
        messages = state["messages"]
        question = messages[0].content
        prompt = REWRITE_PROMPT.format(question=question)
        response = response_model.invoke([{"role": "user", "content": prompt}])
        return {"messages": [{"role": "user", "content": response.content}]}
    
  2. Try it out:

    input = {
        "messages": convert_to_messages(
            [
                {
                    "role": "user",
                    "content": "What does Lilian Weng say about types of reward hacking?",
                },
                {
                    "role": "assistant",
                    "content": "",
                    "tool_calls": [
                        {
                            "id": "1",
                            "name": "retrieve_blog_posts",
                            "args": {"query": "types of reward hacking"},
                        }
                    ],
                },
                {"role": "tool", "content": "meow", "tool_call_id": "1"},
            ]
        )
    }
    
    response = rewrite_question(input)
    print(response["messages"][-1]["content"])
    

    Output:

    What are the different types of reward hacking described by Lilian Weng, and how does she explain them?
    

6. Generate an answer

  1. Build the generate_answer node: if we passed the grader checks, we can generate the final answer based on the original question and the retrieved context:

    GENERATE_PROMPT = (
        "You are an assistant for question-answering tasks. "
        "Use the following pieces of retrieved context to answer the question. "
        "If you don't know the answer, just say that you don't know. "
        "Use three sentences maximum and keep the answer concise.\n"
        "Question: {question} \n"
        "Context: {context}"
    )
    
    def generate_answer(state: MessagesState):
        """Generate an answer."""
        question = state["messages"][0].content
        context = state["messages"][-1].content
        prompt = GENERATE_PROMPT.format(question=question, context=context)
        response = response_model.invoke([{"role": "user", "content": prompt}])
        return {"messages": [response]}
    
  2. Try it out:

    input = {
        "messages": convert_to_messages(
            [
                {
                    "role": "user",
                    "content": "What does Lilian Weng say about types of reward hacking?",
                },
                {
                    "role": "assistant",
                    "content": "",
                    "tool_calls": [
                        {
                            "id": "1",
                            "name": "retrieve_blog_posts",
                            "args": {"query": "types of reward hacking"},
                        }
                    ],
                },
                {
                    "role": "tool",
                    "content": "reward hacking can be categorized into two types: environment or goal misspecification, and reward tampering",
                    "tool_call_id": "1",
                },
            ]
        )
    }
    
    response = generate_answer(input)
    response["messages"][-1].pretty_print()
    

    Output:

    ================================== Ai Message ==================================
    
    Lilian Weng categorizes reward hacking into two types: environment or goal misspecification, and reward tampering. She considers reward hacking as a broad concept that includes both of these categories. Reward hacking occurs when an agent exploits flaws or ambiguities in the reward function to achieve high rewards without performing the intended behaviors.
    

7. Assemble the graph

  • Start with generate_query_or_respond and determine whether we need to call the retriever_tool
  • Route to the next step using tools_condition:
    • If generate_query_or_respond returned tool_calls, call the retriever_tool to retrieve context
    • Otherwise, respond directly to the user
  • Grade the retrieved document content for relevance to the question (grade_documents) and route to the next step:
    • If not relevant, rewrite the question using rewrite_question and then call generate_query_or_respond again
    • If relevant, proceed to generate_answer and generate a final response using the ToolMessage with the retrieved document context

API Reference: StateGraph | START | END | ToolNode | tools_condition

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import tools_condition

workflow = StateGraph(MessagesState)

# Define the nodes we will cycle between
workflow.add_node(generate_query_or_respond)
workflow.add_node("retrieve", ToolNode([retriever_tool]))
workflow.add_node(rewrite_question)
workflow.add_node(generate_answer)

workflow.add_edge(START, "generate_query_or_respond")

# Decide whether to retrieve
workflow.add_conditional_edges(
    "generate_query_or_respond",
    # Assess the LLM decision (call the `retriever_tool` tool or respond to the user)
    tools_condition,
    {
        # Translate the condition outputs to nodes in our graph
        "tools": "retrieve",
        END: END,
    },
)

# Edges taken after the `retrieve` node is called
workflow.add_conditional_edges(
    "retrieve",
    # Assess the agent decision
    grade_documents,
)
workflow.add_edge("generate_answer", END)
workflow.add_edge("rewrite_question", "generate_query_or_respond")

# Compile
graph = workflow.compile()

Visualize the graph:

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
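If Mermaid PNG rendering isn't available in your environment, an ASCII rendering can serve as a fallback; this is a minimal sketch that assumes the optional grandalf package is installed:

# Fallback: print an ASCII rendering of the graph (requires `pip install grandalf`)
print(graph.get_graph().draw_ascii())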


8. Run the agentic RAG

for chunk in graph.stream(
    {
        "messages": [
            {
                "role": "user",
                "content": "What does Lilian Weng say about types of reward hacking?",
            }
        ]
    }
):
    for node, update in chunk.items():
        print("Update from node", node)
        update["messages"][-1].pretty_print()
        print("\n\n")

Output:

Update from node generate_query_or_respond
================================== Ai Message ==================================
Tool Calls:
  retrieve_blog_posts (call_NYu2vq4km9nNNEFqJwefWKu1)
 Call ID: call_NYu2vq4km9nNNEFqJwefWKu1
  Args:
    query: types of reward hacking

Update from node retrieve
================================= Tool Message =================================
Name: retrieve_blog_posts

(Note: Some work defines reward tampering as a distinct category of misaligned behavior rather than a form of reward hacking. But here I consider reward hacking as a broader concept.)
At a high level, reward hacking can be categorized into two types: environment or goal misspecification, and reward tampering.

Why does Reward Hacking Exist?#

Pan et al. (2022) investigated reward hacking as a function of agent capabilities, including (1) model size, (2) action space resolution, (3) observation space noise, and (4) training time. They also proposed a taxonomy of three types of misspecified proxy rewards:

Let's Define Reward Hacking#
Reward shaping in RL is challenging. Reward hacking occurs when an RL agent exploits flaws or ambiguities in the reward function to obtain high rewards without genuinely learning the intended behaviors or completing the task as designed. In recent years, several related concepts have been proposed, all referring to some form of reward hacking:

Update from node generate_answer
================================== Ai Message ==================================

Lilian Weng categorizes reward hacking into two types: environment or goal misspecification, and reward tampering. She considers reward hacking as a broad concept that includes both of these categories. Reward hacking occurs when an agent exploits flaws or ambiguities in the reward function to achieve high rewards without performing the intended behaviors.
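Streaming shows each node's update as it happens. If you only want the final answer, a minimal alternative sketch is to call graph.invoke with the same input and read the last message of the resulting state:

# Run the graph to completion and print only the final answer
final_state = graph.invoke(
    {
        "messages": [
            {
                "role": "user",
                "content": "What does Lilian Weng say about types of reward hacking?",
            }
        ]
    }
)
print(final_state["messages"][-1].content)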