Converting the AI Stock AgentExecutor to LangGraph

Introduction

Welcome back to this ongoing series on building an AI-powered stock analysis tool! In the previous articles, I built the agent with LangChain, OpenBB, and Anthropic’s Claude 3 Opus, and added tools for sentiment analysis and chart-based technical analysis. Today, I’ll take a significant leap forward by converting the AgentExecutor setup to LangGraph. This transition should make the AI stock agent more modular and easier to scale and maintain.

Why LangGraph? It models a workflow as a state graph: nodes do the work, and edges decide what runs next. That is a good fit for an AI-powered stock analysis agent, which has to integrate multiple tools and handle a wide variety of user queries. Because each step is an explicit node, it is easier to scale the system as your requirements grow.

LangGraph provides precise control over the flow of execution. You can create more sophisticated and conditional workflows by defining state transitions and conditions. This can lead to more intelligent and context-aware AI behavior. If you have ever used the actor model or finite state machines in architectural design, you’ll feel right at home using LangGraph.

Here’s the updated code. Let’s get started.

AgentExecutor, It’s Been Real

Before diving into LangGraph, let’s briefly review the existing AgentExecutor setup. The current setup orchestrates the AI workflow, coordinates the execution of tools, and handles user queries using a predefined prompt template.

Here’s a snippet of the current setup:

Python
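from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_anthropic import ChatAnthropic


# get_tools and get_prompt are defined elsewhere in this module.
def create_anthropic_agent_executor():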
    llm = ChatAnthropic(
        temperature=0,
        model_name="claude-3-opus-20240229",
        max_tokens=4096,
    )

    tools = get_tools(llm)
    prompt = get_prompt()

    agent = create_tool_calling_agent(llm, tools, prompt)

    return AgentExecutor.from_agent_and_tools(
        agent=agent,
        tools=tools,
        verbose=True,
        return_intermediate_steps=True,
        handle_parsing_errors=True,
    )

While effective, this setup has limitations. With LangChain, I can easily build custom chains using the LangChain Expression Language (LCEL), but those chains are directed acyclic graphs (DAGs): data flows forward and never loops back. The AgentExecutor adds a level of orchestration by running a relatively simple loop around those chains, but in the current implementation every decision about what to reason over and which action to take next is delegated to the executor, which simply defers back to the LLM.

As an agent’s duties grow, one may want to force it to call a particular tool first, control the sequence of tools called next, reduce the number of calls to the LLM, or add a human-in-the-loop step so a person can review and correct the agent’s actions. One may even want to run multiple agents, each with its own workflow, its own prompts, and yes, even its own state. For this kind of heavy lifting, I need a state machine that supports cycles. I won’t dive into the theory (I truly want to, but I’ll spare ya), but suffice it to say, I need a graph for this.
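
To make the idea of a state machine with cycles concrete, here is a minimal sketch in plain Python. It does not use LangGraph at all: the node functions, the `append_messages` reducer, and the fake tool result are hypothetical stand-ins I made up for illustration. It only shows the shape of the loop (assistant, then tool, then back to the assistant) that a DAG cannot express.

```python
# Hypothetical stand-ins: plain dicts play the role of chat messages.

def append_messages(existing: list, update) -> list:
    """Reducer: merge a node's update into the running message list."""
    new = update if isinstance(update, list) else [update]
    return existing + new


def assistant(state: dict) -> dict:
    """Fake LLM: request one tool call, then answer using the tool result."""
    last = state["messages"][-1]
    if last["role"] == "tool":
        answer = f"Price is {last['content']}"
        return {"messages": {"role": "assistant", "content": answer, "tool_calls": []}}
    return {"messages": {"role": "assistant", "content": "", "tool_calls": ["get_price"]}}


def action(state: dict) -> dict:
    """Fake tool node: pretend to run the requested tool."""
    return {"messages": {"role": "tool", "content": "123.45"}}


def route(state: dict) -> str:
    """Conditional edge: visit the tool node only if the model asked for a tool."""
    return "action" if state["messages"][-1].get("tool_calls") else "end"


NODES = {"assistant": assistant, "action": action}


def run(question: str, max_steps: int = 10) -> dict:
    state = {"messages": [{"role": "user", "content": question}]}
    node = "assistant"  # entry point
    for _ in range(max_steps):
        update = NODES[node](state)
        state = {"messages": append_messages(state["messages"], update["messages"])}
        if node == "action":
            node = "assistant"  # fixed edge back to the assistant: this is the cycle
        else:
            node = route(state)  # conditional edge out of the assistant
            if node == "end":
                break
    return state


final = run("What is AAPL trading at?")
print([m["role"] for m in final["messages"]])
```

Each node returns only its own update and the reducer merges it into the shared state, which is roughly the division of labor a graph framework takes over for you.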

A Graph with an Edge

LangGraph offers a powerful way to define and manage complex workflows using state graphs. This approach provides better control over the execution flow and makes it easier to scale and maintain the system.

Let’s import all the new goodies from LangGraph and refactor the AI agent module:

Python
from typing import Annotated, TypedDict

from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import END, StateGraph
from langgraph.prebuilt.tool_node import tools_condition
from langchain_core.runnables import Runnable, RunnableConfig
from langgraph.graph.message import AnyMessage, add_messages
from dotenv import load_dotenv

from app.tools.stock_stats import (
    get_gainers,
    get_losers,
    get_stock_price_history,
    get_stock_quantstats,
    get_stock_ratios,
    get_key_metrics,
    get_stock_sector_info,
    get_valuation_multiples,
    get_stock_universe,
)
from app.tools.stock_sentiment import get_news_sentiment
from app.tools.stock_relative_strength import get_relative_strength
from app.tools.stock_charts import get_stock_chart_analysis
from app.tools.risk_management import (
    calculate_r_multiples,
    calculate_technical_stops,
    calculate_position_size,
)
from app.tools.utils import create_tool_node_with_fallback
from app.chains.templates import SYSTEM_TEMPLATE

load_dotenv()


class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]


class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: AgentState, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)

            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}


def get_prompt():
    return ChatPromptTemplate.from_messages(
        [
            ("system", SYSTEM_TEMPLATE),
            ("placeholder", "{messages}"),
        ]
    )


def get_tools():
    tavily = TavilySearchResults(max_results=1)

    tools = [
        get_stock_universe,
        get_stock_chart_analysis,
        get_relative_strength,
        get_valuation_multiples,
        get_stock_price_history,
        get_stock_quantstats,
        get_gainers,
        get_losers,
        get_stock_sector_info,
        get_news_sentiment,
        get_stock_ratios,
        get_key_metrics,
        tavily,
        calculate_r_multiples,
        calculate_technical_stops,
        calculate_position_size,
    ]
    return tools


def create_anthropic_agent_graph():
    llm = ChatAnthropic(temperature=0, model_name="claude-3-opus-20240229")

    tools = get_tools()
    prompt = get_prompt()

    llm_with_tools = prompt | llm.bind_tools(tools)

    builder = StateGraph(AgentState)

    builder.add_node("assistant", Assistant(llm_with_tools))
    builder.add_node("action", create_tool_node_with_fallback(tools))

    builder.set_entry_point("assistant")

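    # tools_condition returns the key of the next node to visit. The "action"
    # key matches the LangGraph release used here; newer releases return
    # "tools" instead, so adjust the mapping key to your installed version.
    builder.add_conditional_edges(
        "assistant",
        tools_condition,
        {"action": "action", END: END},
    )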
    builder.add_edge("action", "assistant")

    graph = builder.compile()
    return graph

Here’s a description of the code:

  1. The code starts by importing necessary modules and tools from various libraries.
  2. The AgentState class is defined using TypedDict to represent the state of the agent, which consists of a list of messages.
  3. The Assistant class wraps a Runnable. Its __call__ method receives the current state and configuration, invokes the runnable, and checks the result. If the model returns neither a tool call nor any text, it appends a user message asking for a real output and tries again.
  4. The get_prompt function returns a ChatPromptTemplate created from the SYSTEM_TEMPLATE and a placeholder for messages.
  5. The get_tools function returns a list of tools that the agent can use.
  6. The create_anthropic_agent_graph function is the main function that creates the LangGraph agent:
    • It initializes a ChatAnthropic LLM with specific parameters.
    • It retrieves the tools using the get_tools function and the prompt using the get_prompt function.
    • It binds the tools to the LLM using llm.bind_tools(tools) and pipes the prompt into the result, so the prompt is applied on every call.
    • It creates a StateGraph builder with the AgentState as the state type.
    • It adds an “assistant” node to the graph using the Assistant class and the LLM with tools.
    • It adds an “action” node to the graph using the create_tool_node_with_fallback function, which creates a node that can execute the available tools.
    • It sets the entry point of the graph to the “assistant” node.
    • It adds conditional edges from the “assistant” node to the “action” node or the end of the graph based on the tools_condition helper from the folks at LangChain.
    • It adds an edge from the “action” node back to the “assistant” node.
    • Finally, it compiles the graph and returns it.
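
The empty-response guard in the Assistant class (step 3) is easier to see in isolation. The sketch below reproduces the same check against a fake model. `FakeResult`, `FakeRunnable`, `is_empty`, and `call_with_retry` are hypothetical names of mine, not part of LangChain or LangGraph, and the `max_attempts` cap is my own addition: the original loop retries without a limit.

```python
from dataclasses import dataclass, field


@dataclass
class FakeResult:
    """Hypothetical stand-in for an AI message."""
    content: object = ""
    tool_calls: list = field(default_factory=list)


class FakeRunnable:
    """Hypothetical model that returns an empty reply first, then a real one."""

    def __init__(self):
        self.calls = 0

    def invoke(self, state):
        self.calls += 1
        if self.calls == 1:
            return FakeResult(content="")
        return FakeResult(content="AAPL looks extended.")


def is_empty(result) -> bool:
    """True when there is neither a tool call nor any usable text."""
    if result.tool_calls:
        return False
    if not result.content:
        return True
    # Content may be a list of blocks; check the text of the first block.
    return isinstance(result.content, list) and not result.content[0].get("text")


def call_with_retry(runnable, state, max_attempts: int = 3):
    """Invoke the model, nudging it with a user message after an empty reply."""
    for _ in range(max_attempts):
        result = runnable.invoke(state)
        if not is_empty(result):
            break
        nudge = ("user", "Respond with a real output.")
        state = {**state, "messages": state["messages"] + [nudge]}
    # If every attempt was empty, the last (empty) result is returned as-is.
    return {"messages": result}


runnable = FakeRunnable()
out = call_with_retry(runnable, {"messages": [("user", "Analyze AAPL")]})
print(out["messages"].content, runnable.calls)
```

Capping the attempts is a design choice worth considering for the real class too, since an unbounded loop keeps calling the LLM for as long as it returns empty replies.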

I need to make a few changes to the FastAPI server and the Streamlit UI to test the new setup.

FastAPI Server using LangServe:

Python
# server.py

from typing import List, Any, Union

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.messages import HumanMessage, AIMessage
from langserve import add_routes
from openbb import obb
from dotenv import load_dotenv

import os
import warnings
import pandas as pd

from app.chains.agent import create_anthropic_agent_graph

warnings.filterwarnings("ignore")

load_dotenv()

obb.account.login(pat=os.environ.get("OPENBB_TOKEN"), remember_me=True)
obb.user.credentials.tiingo_token = os.environ.get("TIINGO_API_KEY")
obb.user.credentials.fmp_api_key = os.environ.get("FMP_API_KEY")
obb.user.credentials.intrinio_api_key = os.environ.get("INTRINIO_API_KEY")
obb.user.credentials.fred_api_key = os.environ.get("FRED_API_KEY")

pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_colwidth", None)

app = FastAPI(
    title="Financial Chat",
    version="1.0",
    description="The Trading Dude Abides",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],
)

graph = create_anthropic_agent_graph()


class AgentInput(BaseModel):
    messages: List[Union[HumanMessage, AIMessage]] = Field(
        ...,
        description="The chat messages representing the current conversation.",
        extra={"widget": {"type": "chat", "input": "messages"}},
    )


class AgentOutput(BaseModel):
    output: Any


@app.get("/")
async def redirect_root_to_docs():
    return RedirectResponse("/docs")


add_routes(
    app,
    graph,
    path="/chat",
    input_type=AgentInput,
    output_type=AgentOutput,
)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8080)
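
To poke at the server without the UI, something like the following sketch should work, using only the standard library. It assumes the server above is running locally on port 8080, that LangServe exposes an invoke endpoint under the /chat path, and that a human message is serialized as a dict with "type" and "content" keys. `build_invoke_request` is a hypothetical helper of mine. The request is only built here, not sent.

```python
import json
import urllib.request


def build_invoke_request(
    question: str, base_url: str = "http://localhost:8080"
) -> urllib.request.Request:
    """Build a POST request for the /chat/invoke endpoint (assumed path)."""
    # Assumed payload shape: the graph input goes under the "input" key.
    payload = {"input": {"messages": [{"type": "human", "content": question}]}}
    return urllib.request.Request(
        url=f"{base_url}/chat/invoke",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_invoke_request("Please give me a full analysis of Apple")
print(req.get_method(), req.full_url)

# To actually send it once the server is up:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))
```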

Streamlit:

Python
# ui.py

from typing import Callable, TypeVar

from streamlit.runtime.scriptrunner import add_script_run_ctx, get_script_run_ctx
from streamlit.delta_generator import DeltaGenerator
from langchain_community.callbacks.streamlit import StreamlitCallbackHandler
from langchain_core.runnables import RunnableConfig
from openbb import obb
from dotenv import load_dotenv

from app.chains.clear_results import with_clear_container
from app.chains.agent import create_anthropic_agent_graph

import os
import warnings
import inspect
import uuid
import pandas as pd
import streamlit as st

warnings.filterwarnings("ignore")

load_dotenv()

obb.account.login(pat=os.environ.get("OPENBB_TOKEN"), remember_me=True)
obb.user.credentials.tiingo_token = os.environ.get("TIINGO_API_KEY")
obb.user.credentials.fmp_api_key = os.environ.get("FMP_API_KEY")
obb.user.credentials.intrinio_api_key = os.environ.get("INTRINIO_API_KEY")
obb.user.credentials.fred_api_key = os.environ.get("FRED_API_KEY")

pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_colwidth", None)

st.set_page_config(
    page_title="Financial Chat | The Trading Dude Abides",
    page_icon="🦜",
    layout="wide",
    initial_sidebar_state="collapsed",
)

T = TypeVar("T")


def get_streamlit_cb(parent_container: DeltaGenerator):
    def decor(fn: Callable[..., T]) -> Callable[..., T]:
        ctx = get_script_run_ctx()

        def wrapper(*args, **kwargs) -> T:
            add_script_run_ctx(ctx=ctx)
            return fn(*args, **kwargs)

        return wrapper

    st_cb = StreamlitCallbackHandler(parent_container=parent_container)

    for name, fn in inspect.getmembers(st_cb, predicate=inspect.ismethod):
        if name.startswith("on_"):
            setattr(st_cb, name, decor(fn))

    return st_cb


if "graph" not in st.session_state:
    st.session_state.graph = create_anthropic_agent_graph()

st.title("Financial Chat, your AI financial advisor 📈")

if "messages" not in st.session_state:
    st.session_state.messages = [
        {"role": "assistant", "content": "How can I help you?"}
    ]

for msg in st.session_state.messages:
    st.chat_message(msg["role"]).write(msg["content"])

with st.form(key="form"):
    user_input = st.text_input(
        "Please ask your question here:",
        placeholder="Please give me a full analysis of Apple",
    )
    submit_clicked = st.form_submit_button("Submit Question")

output_container = st.empty()
if with_clear_container(submit_clicked):
    output_container = output_container.container()

    output_container.markdown(f"**User:** {user_input}")
    st.session_state.messages.append({"role": "user", "content": user_input})

    answer_container = output_container.chat_message("assistant", avatar="💸")
    st_callback = get_streamlit_cb(answer_container)

    cfg = RunnableConfig()
    cfg["callbacks"] = [st_callback]
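    # thread_id only matters once the graph is compiled with a checkpointer;
    # a plain string is used so it stays serializable.
    cfg["configurable"] = {"thread_id": str(uuid.uuid4())}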

    question = {"messages": ("user", user_input)}

    response = st.session_state.graph.invoke(question, cfg)
    answer = response["messages"][-1].content

    st.session_state.messages.append({"role": "assistant", "content": answer})
    answer_container.write(answer)

Conclusion

In this article, I transitioned the AI-powered stock analysis agent from AgentExecutor to LangGraph. You might think, “Seth, this is cool, but it doesn’t do anything different from the AgentExecutor at the moment.” And you would be right. 😀 To fully leverage LangGraph, I need to use multiple workflows with multiple agents, each having their own instructions.

Every trader should have their own process. That process can be defined as nodes in a graph, with edges describing the next action they take when analyzing stocks. One trader’s process may be vastly different from another’s. Thus, the tools one defines must be flexible and divided into agentic operations for each phase in the process.

I hope it’s becoming clear why one doesn’t need countless buttons to push, scans to run, YouTube videos to watch, models to analyze, experts to consult, or services to keep track of. By leveraging AI agents to bring these features and much more together, one can generate actionable insights in natural human language, ultimately saving a mountain of time. A trader’s weekend and daily stock selection routine can be streamlined into focusing only on the best setups that match that particular trader’s process at a specific time. Follow-up questions can be asked based on the feasibility of the setups, further confirming or refuting the trader’s thesis. The power and flexibility of this approach should not be understated.

In the next article, I will continue to push the envelope of what’s possible by expanding LangGraph even further with multiple agents, while I continue swapping symbols.
