Agentic RAG for PDFs with mixed data ↗
noOriginal Documentation
Here, we connect all of the pieces discussed above into one class object, which is then used as a tool for a Cohere ReAct agent. This class definition consolidates and clarify the key parameters used to define the RAG pipeline.
co = cohere.Client()class Element(BaseModel):
type: str
text: Any
class RAG_pipeline():
def __init__(self,paths):
self.embedding_model="embed-v4.0"
self.generation_model="command-a-03-2025"
self.summary_model="command-a-03-2025"
self.rerank_model="rerank-multilingual-v3.0"
self.num_docs_to_retrieve = 10
self.top_k_rerank=3
self.temperature=0.2
self.preamble="""
## Task & Context
You help people answer their questions and other requests interactively. You will be asked a very wide array of requests on all kinds of topics. You will be equipped with a wide range of search engines or similar tools to help you, which you use to research your answer. You should focus on serving the user's needs as best you can, which will be wide-ranging.
## Style Guide
Unless the user asks for a different style of answer, you should answer in full sentences, using proper grammar and spelling.
"""
self.n_jobs=10 #number of parallel processes to run summarization of chunks
self.extract_images_in_pdf=False
self.infer_table_structure=True
self.chunking_strategy="by_title"
self.max_characters=4000
self.new_after_n_chars=3800
self.combine_text_under_n_chars=2000
self.image_output_dir_path='.'
self.paths = paths
self.parse_and_build_retriever()
def parse_and_build_retriever(self,):
#step1, parse pdfs
# if condition just for debugging since perf_audit.pdf is parsed in the prev step, no need to rerun
parsed_pdf_list=self.parse_pdfs(self.paths)
#separate tables and text
extracted_tables, extracted_text = self.extract_text_and_tables(parsed_pdf_list)
#generate summaries for everything
tables, table_summaries, texts, text_summaries=self.generate_summaries(extracted_tables,extracted_text)
self.tables = tables
self.table_summaries = table_summaries
self.texts = texts
self.text_summaries=text_summaries
#setup the multivector retriever
self.make_retriever(tables, table_summaries, texts, text_summaries)
def extract_text_and_tables(self,parsed_pdf_list):
# extract table and textual objects from parser
# Categorize by type
all_table_elements = []
all_text_elements = []
for raw_pdf_elements in parsed_pdf_list:
categorized_elements = []
for element in raw_pdf_elements:
if "unstructured.documents.elements.Table" in str(type(element)):
categorized_elements.append(Element(type="table", text=str(element)))
elif "unstructured.documents.elements.CompositeElement" in str(type(element)):
categorized_elements.append(Element(type="text", text=str(element)))
# Tables
table_elements = [e for e in categorized_elements if e.type == "table"]
print(len(table_elements))
# Text
text_elements = [e for e in categorized_elements if e.type == "text"]
print(len(text_elements))
all_table_elements.extend(table_elements)
all_text_elements.extend(text_elements)
return all_table_elements, all_text_elements
def parse_pdfs(self, paths):
path_raw_elements = []
for path in paths:
raw_pdf_elements = partition_pdf(
filename=path,
# Unstructured first finds embedded image blocks
extract_images_in_pdf=self.extract_images_in_pdf,
# Use layout model (YOLOX) to get bounding boxes (for tables) and find titles
# Titles are any sub-section of the document
infer_table_structure=self.infer_table_structure,
# Post processing to aggregate text once we have the title
chunking_strategy=self.chunking_strategy,
# Chunking params to aggregate text blocks
# Attempt to create a new chunk 3800 chars
# Attempt to keep chunks > 2000 chars
max_characters=self.max_characters,
new_after_n_chars=self.new_after_n_chars,
combine_text_under_n_chars=self.combine_text_under_n_chars,
image_output_dir_path=self.image_output_dir_path,
)
path_raw_elements.append(raw_pdf_elements)
print('PDFs parsed')
return path_raw_elements
def get_chat_output(self,message, preamble, model, temp):
# print("**message")
# print(message)
response=co.chat(
message=message,
preamble=preamble,
model=model,
temperature=temp
).text
# print("**output")
# print(response)
return response
def parallel_proc_chat(self,prompts,preamble,model,temp,n_jobs):
"""Parallel processing of chat endpoint calls."""
responses = Parallel(n_jobs=n_jobs, prefer="threads")(delayed(self.get_chat_output)(prompt,preamble,model,temp) for prompt in prompts)
return responses
def rerank_cohere(self,query, returned_documents,model, top_n):
response = co.rerank(
query=query,
documents=returned_documents,
top_n=top_n,
model=model,
return_documents=True
)
top_chunks_after_rerank = [results.document.text for results in response.results]
return top_chunks_after_rerank
def generate_summaries(self,table_elements,text_elements):
# generate table and text summaries
summarize_prompt = """You are an assistant tasked with summarizing tables and text. \
Give a concise summary of the table or text. Table or text chunk: {element}. Only provide the summary and no other text."""
table_prompts = [summarize_prompt.format(element=i.text) for i in table_elements]
table_summaries = self.parallel_proc_chat(table_prompts,self.preamble,self.summary_model,self.temperature,self.n_jobs)
text_prompts = [summarize_prompt.format(element=i.text) for i in text_elements]
text_summaries = self.parallel_proc_chat(text_prompts,self.preamble,self.summary_model,self.temperature,self.n_jobs)
tables = [i.text for i in table_elements]
texts = [i.text for i in text_elements]
print('summaries generated')
return tables, table_summaries, texts, text_summaries
def make_retriever(self,tables, table_summaries, texts, text_summaries):
# The vectorstore to use to index the child chunks
vectorstore = Chroma(collection_name="summaries", embedding_function=CohereEmbeddings())
# The storage layer for the parent documents
store = InMemoryStore()
id_key = "doc_id"
# The retriever (empty to start)
retriever = MultiVectorRetriever(
vectorstore=vectorstore,
docstore=store,
id_key=id_key,
search_kwargs={"k": self.num_docs_to_retrieve}
)
# Add texts
doc_ids = [f'text_{i}' for i in range(len(texts))]#[str(uuid.uuid4()) for _ in texts]
summary_texts = [
Document(page_content=s, metadata={id_key: doc_ids[i]})
for i, s in enumerate(text_summaries)
]
retriever.vectorstore.add_documents(summary_texts,ids=doc_ids)
retriever.docstore.mset(list(zip(doc_ids, texts)))
# Add tables
table_ids = [f'table_{i}' for i in range(len(texts))]#[str(uuid.uuid4()) for _ in tables]
summary_tables = [
Document(page_content=s, metadata={id_key: table_ids[i]})
for i, s in enumerate(table_summaries)
]
retriever.vectorstore.add_documents(summary_tables,ids=table_ids)
retriever.docstore.mset(list(zip(table_ids, tables)))
self.retriever = retriever
print('retriever built')
def process_query(self,query):
"""Runs query augmentation, retrieval, rerank and generation in one call."""
augmented_queries=co.chat(message=query,model=self.generation_model,temperature=self.temperature, search_queries_only=True)
#augment queries
if augmented_queries.search_queries:
reranked_docs=[]
for itm in augmented_queries.search_queries:
docs=self.retriever.invoke(itm.text)
temp_rerank = self.rerank_cohere(itm.text,docs,model=self.rerank_model,top_n=self.top_k_rerank)
reranked_docs.extend(temp_rerank)
documents = [{"title": f"chunk {i}", "snippet": reranked_docs[i]} for i in range(len(reranked_docs))]
else:
documents = None
response = co.chat(
message=query,
documents=documents,
preamble=self.preamble,
model=self.generation_model,
temperature=self.temperature
)
final_answer_docs="""The final answer is from the documents below:
{docs}""".format(docs=str(response.documents))
final_answer = response.text
return final_answer, final_answer_docsrag_object=RAG_pipeline(paths=["city_ny_popular_fin_report.pdf"])This function will be deprecated in a future release and unstructured will simply use the DEFAULT_MODEL from unstructured_inference.model.base to set default model name
PDFs parsed
14
24
summaries generated
retriever builtCohere ReAct Agent with RAG Tool [#sec_step5]#
Finally, we build a simple agent that utilizes the RAG pipeline defined above. We do this by granting the agent access to two tools:
- the end-to-end RAG pipeline
- a Python interpreter
The intention behind coupling these tools is to enable the model to perform mathematical and other postprocessing operations on RAG outputs using Python.
from langchain.agents import Tool
from langchain_experimental.utilities import PythonREPL
from langchain.agents import AgentExecutor
from langchain_cohere.react_multi_hop.agent import create_cohere_react_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_cohere.chat_models import ChatCohere
from langchain.tools.retriever import create_retriever_tool
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.tools import tool
class react_agent():
def __init__(self,rag_retriever,model="command-a-03-2025",temperature=0.2):
self.llm = ChatCohere(model=model, temperature=temperature)
self.preamble="""
## Task & Context
You help people answer their questions and other requests interactively. You will be asked a very wide array of requests on all kinds of topics. You will be equipped with a wide range of search engines or similar tools to help you, which you use to research your answer. You should focus on serving the user's needs as best you can, which will be wide-ranging.
## Style Guide
Unless the user asks for a different style of answer, you should answer in full sentences, using proper grammar and spelling.
## Guidelines
You are an expert who answers the user's question.
You have access to a vectorsearch tool that will use your query to search through documents and find the relevant answer.
You also have access to a python interpreter tool which you can use to run code for mathematical operations.
"""
self.get_tools(rag_retriever)
self.build_agent()
def get_tools(self,rag_retriever):
@tool
def vectorsearch(query: str):
"""Uses the query to search through a list of documents and return the most relevant documents as well as the answer."""
final_answer, final_answer_docs=rag_retriever.process_query(query)
return final_answer + final_answer_docs
vectorsearch.name = "vectorsearch" # use python case
vectorsearch.description = "Uses the query to search through a list of documents and return the most relevant documents as well as the answer."
class vectorsearch_inputs(BaseModel):
query: str = Field(description="the users query")
vectorsearch.args_schema = vectorsearch_inputs
python_repl = PythonREPL()
python_tool = Tool(
name="python_repl",
description="Executes python code and returns the result. The code runs in a static sandbox without interactive mode, so print output or save output to a file.",
func=python_repl.run,
)
python_tool.name = "python_interpreter"
class ToolInput(BaseModel):
code: str = Field(description="Python code to execute.")
python_tool.args_schema = ToolInput
self.alltools = [vectorsearch,python_tool]
def build_agent(self):
# Prompt template
prompt = ChatPromptTemplate.from_template("{input}")
# Create the ReAct agent
agent = create_cohere_react_agent(
llm=self.llm,
tools=self.alltools,
prompt=prompt,
)
self.agent_executor = AgentExecutor(agent=agent, tools=self.alltools, verbose=True,return_intermediate_steps=True)
def run_agent(self,query,history=None):
if history:
response=self.agent_executor.invoke({
"input": query,
"preamble": self.preamble,
"chat_history": history
})
else:
response=self.agent_executor.invoke({
"input": query,
"preamble": self.preamble,
})
return responseagent_object=react_agent(rag_retriever=rag_object)step1_response=agent_object.run_agent("what are the charges for services in 2022 and 2023")[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
I will search for the charges for services in 2022 and 2023.
{'tool_name': 'vectorsearch', 'parameters': {'query': 'charges for services in 2022 and 2023'}}
[0m[36;1m[1;3mThe charges for services in 2022 were $5,266 million and in 2023 were $5,769 million.The final answer is from the documents below:
[{'id': 'doc_0', 'snippet': 'Program and General Revenues FY 2023 FY 2022 FY 2021 Category (in millions) Charges for Services (CS) $5,769 $5,266 $5,669 Operating Grants and Contributions (OGC) 27,935 31,757 28,109 Capital Grants and Contributions (CGC) 657 656 675 Real Estate Taxes (RET) 31,502 29,507 31,421 Sales and Use Taxes (SUT) 10,577 10,106 7,614 Personal Income Taxes (PIT) 15,313 15,520 15,795 Income Taxes, Other (ITO) 13,181 9,521 9,499 Other Taxes* (OT) 3,680 3,777 2,755 Investment Income* (II) 694 151 226 Unrestricted Federal and State Aid (UFSA) 234 549 108 Other* (O) Total Program and General Revenues - Primary Government 2,305 $110,250 $107,535 $104,176 708 725', 'title': 'chunk 0'}][0m[32;1m[1;3mRelevant Documents: 0
Cited Documents: 0
Answer: The charges for services in 2022 were $5,266 million and in 2023 were $5,769 million.
Grounded answer: The charges for services in <co: 0="">2022</co:> were <co: 0="">$5,266 million</co:> and in <co: 0="">2023</co:> were <co: 0="">$5,769 million</co:>.[0m
[1m> Finished chain.[0mJust like earlier, we can also pass chat history to the LangChain agent to refer to for any other queries.
from langchain_core.messages import HumanMessage, AIMessagechat_history=[
HumanMessage(content=step1_response['input']),
AIMessage(content=step1_response['output'])
]agent_object.run_agent("what is the mean of the two values",history=chat_history)[1m> Entering new AgentExecutor chain...[0m
Python REPL can execute arbitrary code. Use with caution.
[32;1m[1;3m
I will use the Python Interpreter tool to calculate the mean of the two values.
{'tool_name': 'python_interpreter', 'parameters': {'code': 'import numpy as np\n\n# Data\nvalues = [5266, 5769]\n\n# Calculate the mean\nmean_value = np.mean(values)\n\nprint(f"The mean of the two values is: {mean_value:.0f} million")'}}
[0m[33;1m[1;3mThe mean of the two values is: 5518 million
[0m[32;1m[1;3mRelevant Documents: 0
Cited Documents: 0
Answer: The mean of the two values is 5518 million.
Grounded answer: The mean of the two values is <co: 0="">5518 million</co:>.[0m
[1m> Finished chain.[0m{'input': 'what is the mean of the two values',
'preamble': "\n## Task & Context\nYou help people answer their questions and other requests interactively. You will be asked a very wide array of requests on all kinds of topics. You will be equipped with a wide range of search engines or similar tools to help you, which you use to research your answer. You should focus on serving the user's needs as best you can, which will be wide-ranging.\n\n## Style Guide\nUnless the user asks for a different style of answer, you should answer in full sentences, using proper grammar and spelling.\n\n## Guidelines\nYou are an expert who answers the user's question. \nYou have access to a vectorsearch tool that will use your query to search through documents and find the relevant answer.\nYou also have access to a python interpreter tool which you can use to run code for mathematical operations.\n",
'chat_history': [HumanMessage(content='what are the charges for services in 2022 and 2023'),
AIMessage(content='The charges for services in 2022 were $5,266 million and in 2023 were $5,769 million.')],
'output': 'The mean of the two values is 5518 million.',
'citations': [CohereCitation(start=30, end=42, text='5518 million', documents=[{'output': 'The mean of the two values is: 5518 million\n'}])],
'intermediate_steps': [(AgentActionMessageLog(tool='python_interpreter', tool_input={'code': 'import numpy as np\n\n# Data\nvalues = [5266, 5769]\n\n# Calculate the mean\nmean_value = np.mean(values)\n\nprint(f"The mean of the two values is: {mean_value:.0f} million")'}, log='\nI will use the Python Interpreter tool to calculate the mean of the two values.\n{\'tool_name\': \'python_interpreter\', \'parameters\': {\'code\': \'import numpy as np\\n\\n# Data\\nvalues = [5266, 5769]\\n\\n# Calculate the mean\\nmean_value = np.mean(values)\\n\\nprint(f"The mean of the two values is: {mean_value:.0f} million")\'}}\n', message_log=[AIMessage(content='\nPlan: I will use the Python Interpreter tool to calculate the mean of the two values.\nAction: ```json\n[\n {\n "tool_name": "python_interpreter",\n "parameters": {\n "code": "import numpy as np\\n\\n# Data\\nvalues = [5266, 5769]\\n\\n# Calculate the mean\\nmean_value = np.mean(values)\\n\\nprint(f\\"The mean of the two values is: {mean_value:.0f} million\\")"\n }\n }\n]\n```')]),
'The mean of the two values is: 5518 million\n')]}Conclusion#
As you can see, the RAG pipeline can be used as a tool for a Cohere ReAct agent. This allows the agent to access the RAG pipeline for document retrieval and generation, as well as a Python interpreter for postprocessing mathematical operations to improve accuracy. This setup can be used to improve the accuracy of grounded answers to questions about documents that contain both tables and text.