Deb8flow: Orchestrating Autonomous AI Debates with LangGraph and GPT-4o
https://towardsdatascience.com/deb8flow-orchestrating-autonomous-ai-debates-with-langgraph-and-gpt-4o/
Inside Deb8flow: Real-time AI debates with LangGraph and GPT-4o

Introduction

I’ve always been fascinated by debates—the strategic framing, the sharp retorts, and the carefully timed comebacks. Debates aren’t just entertaining; they’re structured battles of ideas, driven by logic and evidence. Recently, I started wondering: could we replicate that dynamic using AI agents—having them debate each other autonomously, complete with real-time fact-checking and moderation? The result was Deb8flow, an autonomous AI debating environment powered by LangGraph, OpenAI’s GPT-4o model, and the new integrated Web Search feature.

In Deb8flow, two agents—Pro and Con—square off on a given topic while a Moderator manages turn-taking. A dedicated Fact Checker reviews every claim in real time using GPT-4o’s new browsing capabilities, and a final Judge evaluates the arguments for quality and coherence. If an agent repeatedly makes factual errors, they’re automatically disqualified—ensuring the debate stays grounded in truth.

This article offers an in-depth look at the advanced architecture and dynamic workflows that power autonomous AI debates. I’ll walk you through how Deb8flow’s modular design leverages LangGraph’s state management and conditional routing, alongside GPT-4o’s capabilities.

Even if you’re new to AI agents or LangGraph (see resources [1] and [2] for primers), I’ll explain the key concepts clearly. And if you’d like to explore further, the full project is available on GitHub: iason-solomos/Deb8flow.

Ready to see how AI agents can debate autonomously in practice?

Let’s dive in.

High-Level Overview: Autonomous Debates with Multiple Agents

In Deb8flow, we orchestrate a formal debate between two AI agents – one arguing Pro and one Con – complete with a Moderator, a Fact Checker, and a final Judge. The debate unfolds autonomously, with each agent playing a role in a structured format.

At its core, Deb8flow is a LangGraph-powered agent system, built atop LangChain, using GPT-4o to power each role—Pro, Con, Judge, and beyond. We use GPT-4o’s preview model with browsing capabilities to enable real-time fact-checking. In essence, the Pro and Con agents debate; after each statement, a fact-checker agent uses GPT-4o’s web search to catch any hallucinations or inaccuracies in that statement in real time. The debate only continues once the statement is verified. The whole process is coordinated by a LangGraph-defined workflow that ensures proper turn-taking and conditional logic.


High-level debate flow graph. Each rectangle is an agent node (Pro/Con debaters, Fact Checker, Judge, etc.), and diamonds are control nodes (Moderator and a router after fact-checking). Solid arrows denote the normal progression, while dashed arrows indicate retries if a claim fails fact-check. The Judge node outputs the final verdict, then the workflow ends.
Image generated by the author with DALL-E

The debate workflow goes through these stages:

  • Topic Generation: A Topic Generator agent produces a nuanced, debatable topic for the session (e.g. “Should AI be used in classroom education?”).
  • Opening: The Pro Argument Agent makes an opening statement in favor of the topic, kicking off the debate.
  • Rebuttal: The Debate Moderator then gives the floor to the Con Argument agent, who rebuts the Pro’s opening statement.
  • Counter: The Moderator gives the floor back to the Pro agent, who counters the Con agent’s points.
  • Closing: The Moderator switches the floor to the Con agent one last time for a closing argument.
  • Judgment: Finally, the Judge agent reviews the full debate history and evaluates both sides based on argument quality, clarity, and persuasiveness. The most convincing side wins.

After every single speech, the Fact Checker agent steps in to verify the factual accuracy of that statement. If a debater’s claim doesn’t hold up (e.g. cites a wrong statistic or “hallucinates” a fact), the workflow triggers a retry: the speaker has to correct or modify their statement. (If either debater accumulates 3 fact-check failures, they are automatically disqualified for repeatedly spreading inaccuracies, and their opponent wins by default.) This mechanism keeps our AI debaters honest and grounded in reality!

Prerequisites and Setup

Before diving into the code, make sure you have the following in place:

  • Python 3.12+ installed.
  • An OpenAI API key with access to the GPT-4o model. You can create your own API key here: https://platform.openai.com/settings/organization/api-keys
  • Project Code: Clone the Deb8flow repository from GitHub (git clone https://github.com/iason-solomos/Deb8flow.git). The repo includes a requirements.txt for all required packages. Key dependencies include LangChain/LangGraph (for building the agent graph) and the OpenAI Python client.
  • Install Dependencies: In your project directory, run: pip install -r requirements.txt to install the necessary libraries.
  • Create a .env file in the project root to hold your OpenAI API credentials. It should be of the form: OPENAI_API_KEY_GPT4O = "sk-…"
  • You can also at any time check out the README file: https://github.com/iason-solomos/Deb8flow if you simply want to run the finished app.

Once dependencies are installed and the environment variable is set, you should be ready to run the app. The project structure is organized for clarity:

Deb8flow/
├── configurations/
│   ├── debate_constants.py
│   └── llm_config.py
├── nodes/
│   ├── base_component.py
│   ├── topic_generator_node.py
│   ├── pro_debater_node.py
│   ├── con_debater_node.py
│   ├── debate_moderator_node.py
│   ├── fact_checker_node.py
│   ├── fact_check_router_node.py
│   └── judge_node.py
├── prompts/
│   ├── topic_generator_prompts.py
│   ├── pro_debater_prompts.py
│   ├── con_debater_prompts.py
│   └── … (prompts for other agents)
├── tests/ (contains unit and whole workflow tests)
├── debate_state.py
└── debate_workflow.py

A quick tour of this structure:

configurations/ holds constant definitions and LLM configuration classes.

nodes/ contains the implementation of each agent or functional node in the debate (each of these is a module defining one agent’s behavior).

prompts/ stores the prompt templates for the language model (so each agent knows how to prompt GPT-4o for its specific task).

debate_workflow.py ties everything together by defining the LangGraph workflow (the graph of nodes and transitions).

debate_state.py defines the shared data structure that the agents will be using on each run.

tests/ includes some basic tests and example runs to help you verify everything is working.

Under the Hood: State Management and Workflow Setup

To coordinate a complex multi-turn debate, we need a shared state and a well-defined flow. We’ll start by looking at how Deb8flow defines the debate state and constants, and then see how the LangGraph workflow is constructed.

Defining the Debate State Schema (debate_state.py)

Deb8flow uses a shared state (https://langchain-ai.github.io/langgraph/concepts/low_level/#state) in the form of a Python TypedDict that all agents can read from and update. This state tracks the debate’s progress and context – things like the topic, the history of messages, whose turn it is, etc. By centralizing this information, each agent node can make decisions based on the current state of the debate.

Link: debate_state.py

from typing import TypedDict, List, Dict, Literal


DebateStage = Literal["opening", "rebuttal", "counter", "final_argument"]

class DebateMessage(TypedDict):
    speaker: str  # e.g. pro or con
    content: str  # The message each speaker produced
    validated: bool  # Whether the FactChecker ok’d this message
    stage: DebateStage # The stage of the debate when this message was produced

class DebateState(TypedDict):
    debate_topic: str
    positions: Dict[str, str]
    messages: List[DebateMessage]
    opening_statement_pro_agent: str
    stage: str  # "opening", "rebuttal", "counter", "final_argument"
    speaker: str  # "pro" or "con"
    times_pro_fact_checked: int # The number of times the pro agent has been fact-checked. If it reaches 3, the pro agent is disqualified.
    times_con_fact_checked: int # The number of times the con agent has been fact-checked. If it reaches 3, the con agent is disqualified.

Key fields that we need to have in the DebateState include:

  • debate_topic (str): The topic being debated.
  • messages (List[DebateMessage]): A list of all messages exchanged so far. Each message is a dictionary with fields for speaker (e.g. "pro" or "con" or "fact_checker"), the message content (text), a validated flag (whether it passed fact-check), and the stage of the debate when it was produced.
  • stage (str): The current debate stage (one of "opening", "rebuttal", "counter", "final_argument").
  • speaker (str): Whose turn it is currently ("pro" or "con").
  • times_pro_fact_checked / times_con_fact_checked (int): Counters for how many times each side has been caught with a false claim. (In our rules, if a debater fails fact-check 3 times, they could be disqualified or automatically lose.)
  • positions (Dict[str, str]): (Optional) A mapping of each side’s general stance (e.g., "pro": "In favor of the topic").

By structuring the debate’s state this way, each agent can easily access the conversation history or check the current stage, and the control logic can update the state between turns. The state is essentially the memory of the debate.
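To make the mechanics concrete, here is a minimal sketch (not taken from the repo) of how a node function reads the shared state and returns a partial update; LangGraph merges the returned keys back into the DebateState:

# Minimal illustrative node (hypothetical, not part of Deb8flow's codebase).
# It reads fields from the shared DebateState and returns only the keys it changes;
# LangGraph merges this partial dict into the global state.
def example_node(state: DebateState) -> dict:
    last_message = state["messages"][-1] if state["messages"] else None
    if state["stage"] == "opening" and state["speaker"] == "pro":
        # ...an agent would generate the opening statement here...
        pass
    # Hand the floor to the Con side for the rebuttal stage.
    return {"stage": "rebuttal", "speaker": "con"}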

Constants and Configuration

To avoid “magic strings” scattered in the code, we define some constants in debate_constants.py. For example, constants for stage names (STAGE_OPENING = "opening", etc.), speaker identifiers (SPEAKER_PRO = "pro", SPEAKER_CON = "con", etc.), and node names (NODE_PRO_DEBATER = "pro_debater_node", etc.). These make the code easier to maintain and read.

debate_constants.py:

# Stage names
STAGE_OPENING = "opening"
STAGE_REBUTTAL = "rebuttal"
STAGE_COUNTER = "counter"
STAGE_FINAL_ARGUMENT = "final_argument"
STAGE_END = "end"

# Speakers
SPEAKER_PRO = "pro"
SPEAKER_CON = "con"
SPEAKER_JUDGE = "judge"

# Node names
NODE_PRO_DEBATER = "pro_debater_node"
NODE_CON_DEBATER = "con_debater_node"
NODE_DEBATE_MODERATOR = "debate_moderator_node"
NODE_JUDGE = "judge_node"

We also set up LLM configuration in llm_config.py. Here, we define classes for OpenAI or Azure OpenAI configs and then create a dictionary llm_config_map mapping model names to their config. For instance, we map "gpt-4o" to an OpenAILLMConfig that holds the model name and API key. This way, whenever we need to initialize a GPT-4o agent, we can just do llm_config_map["gpt-4o"] to get the right config. All our main agents (debaters, topic generator, judge) use this same GPT-4o configuration.

import os
from dataclasses import dataclass
from typing import Union

@dataclass
class OpenAILLMConfig:
    """
    A data class to store configuration details for OpenAI models.

    Attributes:
        model_name (str): The name of the OpenAI model to use.
        openai_api_key (str): The API key for authenticating with the OpenAI service.
    """
    model_name: str
    openai_api_key: str


llm_config_map = {
    "gpt-4o": OpenAILLMConfig(
        model_name="gpt-4o",
        openai_api_key=os.getenv("OPENAI_API_KEY_GPT4O"),
    )
}

Building the LangGraph Workflow (debate_workflow.py)

With state and configs in place, we construct the debate workflow graph. LangGraph’s StateGraph is the backbone that connects all our agent nodes in the order they should execute. Here’s how we set it up:

from langgraph.graph import StateGraph, END

# The imports below are inferred from the project layout shown above.
from configurations.llm_config import llm_config_map
from nodes.topic_generator_node import GenerateTopicNode
from nodes.pro_debater_node import ProDebaterNode
from nodes.con_debater_node import ConDebaterNode
from nodes.debate_moderator_node import DebateModeratorNode
from nodes.fact_checker_node import FactCheckNode
from nodes.fact_check_router_node import FactCheckRouterNode
from nodes.judge_node import JudgeNode
from debate_state import DebateState


class DebateWorkflow:

    def _initialize_workflow(self) -> StateGraph:
        workflow = StateGraph(DebateState)
        # Nodes
        workflow.add_node("generate_topic_node", GenerateTopicNode(llm_config_map["gpt-4o"]))
        workflow.add_node("pro_debater_node", ProDebaterNode(llm_config_map["gpt-4o"]))
        workflow.add_node("con_debater_node", ConDebaterNode(llm_config_map["gpt-4o"]))
        workflow.add_node("fact_check_node", FactCheckNode())
        workflow.add_node("fact_check_router_node", FactCheckRouterNode())
        workflow.add_node("debate_moderator_node", DebateModeratorNode())
        workflow.add_node("judge_node", JudgeNode(llm_config_map["gpt-4o"]))

        # Entry point
        workflow.set_entry_point("generate_topic_node")

        # Flow
        workflow.add_edge("generate_topic_node", "pro_debater_node")
        workflow.add_edge("pro_debater_node", "fact_check_node")
        workflow.add_edge("con_debater_node", "fact_check_node")
        workflow.add_edge("fact_check_node", "fact_check_router_node")
        workflow.add_edge("judge_node", END)
        return workflow



    async def run(self):
        workflow = self._initialize_workflow()
        graph = workflow.compile()
        # graph.get_graph().draw_mermaid_png(output_file_path="workflow_graph.png")
        initial_state = {
            "topic": "",
            "positions": {}
        }
        final_state = await graph.ainvoke(initial_state, config={"recursion_limit": 50})
        return final_state

Let’s break down what’s happening:

  • We initialize a new StateGraph with our DebateState type as the state schema.
  • We add each node (agent) to the graph with a name. For nodes that need an LLM, we pass in the GPT-4o config. For example, "pro_debater_node" is added as ProDebaterNode(llm_config_map["gpt-4o"]), meaning the Pro debater agent will use GPT-4o as its underlying model.
  • We set the entry point of the graph to "generate_topic_node". This means the first step of the workflow is to generate a debate topic.
  • Then we add directed edges to connect nodes. The edges above encode the primary sequence: topic -> pro’s turn -> fact-check -> (then a routing decision) -> … eventually -> judge -> END. We don’t connect the Moderator or Fact Check Router with static edges, since these nodes use dynamic commands to redirect the flow. The final edge connects the judge to an END marker to terminate the graph.

When the workflow runs, control will pass along these edges in order, but whenever we hit a router or moderator node, that node will output a command telling the graph which node to go to next (overriding the default edge). This is how we create conditional loops: the fact_check_router_node might send us back to a debater node for a retry, instead of following a straight line. LangGraph supports this by allowing nodes to return a special Command object with goto instructions.
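Here is a minimal sketch (not from Deb8flow’s codebase) of that pattern, assuming a recent LangGraph version where Command is exposed via langgraph.types:

from langgraph.graph import END
from langgraph.types import Command

# Hypothetical router node: it picks the next node based on the current state
# instead of following a static edge, optionally updating the state as it goes.
def example_router(state: DebateState) -> Command:
    if state["messages"] and state["messages"][-1]["validated"]:
        return Command(goto="debate_moderator_node")
    return Command(update={"stage": "end"}, goto=END)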

In summary, at a high level we’ve defined an agentic workflow: a graph of autonomous agents where control can branch and loop based on the agents’ outputs. Now, let’s explore what each of these agent nodes actually does.

Agent Nodes Breakdown

Each stage or role in the debate is encapsulated in a node (agent). In LangGraph, nodes are often simple functions, but I wanted a more object-oriented approach for clarity and reusability. So in Deb8flow, every node is a class with a __call__ method. All the main agent classes inherit from a common BaseComponent for shared functionality. This design makes the system modular: we can easily swap out or extend agents by modifying their class definitions, and each agent class is responsible for its piece of the workflow.

Let’s go through the key agents one by one.

BaseComponent – A Reusable Agent Base Class

Most of our agent nodes (like the debaters and judge) share common needs: they use an LLM to generate output, they might need to retry on errors, and they should track token usage. The BaseComponent class (defined in nodes/base_component.py: https://github.com/iason-solomos/Deb8flow/blob/main/nodes/base_component.py) provides these common features so we don’t repeat code.

class BaseComponent:
    """
    A foundational class for managing LLM-based workflows with token tracking.
    Can handle both Azure OpenAI (AzureChatOpenAI) and OpenAI (ChatOpenAI).
    """

    def __init__(
        self,
        llm_config: Optional[LLMConfig] = None,
        temperature: float = 0.0,
        max_retries: int = 5,
    ):
        """
        Initializes the BaseComponent with optional LLM configuration and temperature.

        Args:
            llm_config (Optional[LLMConfig]): Configuration for either Azure or OpenAI.
            temperature (float): Controls the randomness of LLM outputs. Defaults to 0.0.
            max_retries (int): How many times to retry on 429 errors.
        """
        logger = logging.getLogger(self.__class__.__name__)
        tracer = trace.get_tracer(__name__, tracer_provider=get_tracer_provider())

        self.logger = logger
        self.tracer = tracer
        self.llm: Optional[ChatOpenAI] = None
        self.output_parser: Optional[StrOutputParser] = None
        self.state: Optional[DebateState] = None
        self.prompt_template: Optional[ChatPromptTemplate] = None
        self.chain: Optional[RunnableSequence] = None
        self.documents: Optional[List] = None
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.max_retries = max_retries

        if llm_config is not None:
            self.llm = self._init_llm(llm_config, temperature)
            self.output_parser = StrOutputParser()

    def _init_llm(self, config: LLMConfig, temperature: float):
        """
        Initializes an LLM instance for either Azure OpenAI or standard OpenAI.
        """
        if isinstance(config, AzureOpenAILLMConfig):
            # If it's Azure, use the AzureChatOpenAI class
            return AzureChatOpenAI(
                deployment_name=config.deployment_name,
                azure_endpoint=config.azure_endpoint,
                openai_api_version=config.openai_api_version,
                openai_api_key=config.openai_api_key,
                temperature=temperature,
            )
        elif isinstance(config, OpenAILLMConfig):
            # If it's standard OpenAI, use the ChatOpenAI class
            return ChatOpenAI(
                model_name=config.model_name,
                openai_api_key=config.openai_api_key,
                temperature=temperature,
            )
        else:
            raise ValueError("Unsupported LLMConfig type.")

    def validate_initialization(self) -> None:
        """
        Ensures we have an LLM and an output parser.
        """
        if not self.llm:
            raise ValueError("LLM is not initialized. Ensure `llm_config` is provided.")
        if not self.output_parser:
            raise ValueError("Output parser is not initialized.")

    def execute_chain(self, inputs: Any) -> Any:
        """
        Executes the LLM chain, tracks token usage, and retries on 429 errors.
        """
        if not self.chain:
            raise ValueError("No chain is initialized for execution.")

        retry_wait = 1  # Initial wait time in seconds

        for attempt in range(self.max_retries):
            try:
                with get_openai_callback() as cb:
                    result = self.chain.invoke(inputs)
                    self.logger.info("Prompt Token usage: %s", cb.prompt_tokens)
                    self.logger.info("Completion Token usage: %s", cb.completion_tokens)
                    self.prompt_tokens = cb.prompt_tokens
                    self.completion_tokens = cb.completion_tokens

                return result

            except Exception as e:
                # If the error mentions 429, do exponential backoff and retry
                if "429" in str(e):
                    self.logger.warning(
                        f"Rate limit reached. Retrying in {retry_wait} seconds... "
                        f"(Attempt {attempt + 1}/{self.max_retries})"
                    )
                    time.sleep(retry_wait)
                    retry_wait *= 2
                else:
                    self.logger.error(f"Unexpected error: {str(e)}")
                    raise e

        raise Exception("API request failed after maximum number of retries")

    def create_chain(
        self, system_template: str, human_template: str
    ) -> RunnableSequence:
        """
        Creates a chain for unstructured outputs.
        """
        self.validate_initialization()
        self.prompt_template = ChatPromptTemplate.from_messages(
            [
                ("system", system_template),
                ("human", human_template),
            ]
        )
        self.chain = self.prompt_template | self.llm | self.output_parser
        return self.chain

    def create_structured_output_chain(
        self, system_template: str, human_template: str, output_model: Type[BaseModel]
    ) -> RunnableSequence:
        """
        Creates a chain that yields structured outputs (parsed into a Pydantic model).
        """
        self.validate_initialization()
        self.prompt_template = ChatPromptTemplate.from_messages(
            [
                ("system", system_template),
                ("human", human_template),
            ]
        )
        self.chain = self.prompt_template | self.llm.with_structured_output(output_model)
        return self.chain

    def build_return_with_tokens(self, node_specific_data: dict) -> dict:
        """
        Convenience method to add token usage info into the return values.
        """
        return {
            **node_specific_data,
            "prompt_tokens": self.prompt_tokens,
            "completion_tokens": self.completion_tokens,
        }

    def __call__(self, state: DebateState) -> None:
        """
        Updates the node's local copy of the state.
        """
        self.state = state
        for key, value in state.items():
            setattr(self, key, value)

Key features of BaseComponent:

  • It stores an LLM client (e.g. an OpenAI ChatOpenAI instance) initialized with a given model and API key, as well as an output parser.
  • It provides a method create_chain(system_template, human_template) which sets up a LangChain prompt chain (a RunnableSequence) combining a system prompt and a human prompt. This chain is what actually generates outputs when run.
  • It has an execute_chain(inputs) method that invokes the chain and includes logic to retry if the OpenAI API returns a rate-limit error (HTTP 429). This is done with exponential backoff up to a max_retries count.
  • It keeps track of token usage (prompt tokens and completion tokens) for logging or analysis.
  • The __call__ method of BaseComponent (which each subclass will call via super().__call__(state)) can perform any setup needed before the node’s main logic runs (like ensuring the LLM is initialized).

By building on BaseComponent, each agent class can focus on its unique logic (like what prompt to use and how to handle the state), while inheriting the heavy lifting of interacting with GPT-4o reliably.

Topic Generator Agent (GenerateTopicNode)

The Topic Generator (topic_generator_node.py) is the first agent in the graph. Its job is to come up with a debatable topic for the session. We give it a prompt that instructs it to output a nuanced topic that could reasonably have a pro and con side.

This agent inherits from BaseComponent and uses a prompt chain (system + human prompt) to generate one item of text – the debate topic. When called, it executes the chain (with no special input, just using the prompt) and gets back a topic_text. It then updates the state with:

  • debate_topic: the generated topic (stripped of any extra whitespace),
  • positions: a dictionary assigning the pro and con stances (by default we use "In favor of the topic" and "Against the topic"),
  • stage: set to "opening",
  • speaker: set to "pro" (so the Pro side will speak first).

In code, the return might look like:

return {
    "debate_topic": debate_topic,
    "positions": positions,
    "stage": "opening",
    "speaker": first_speaker  # "pro"
}

Here are the prompts for the topic generator:

SYSTEM_PROMPT = """\
You are a brainstorming AI that suggests debate topics.
You will provide a single, interesting or timely topic that can have two opposing views.
"""

HUMAN_PROMPT = """\
Please suggest one debate topic for two AI agents to discuss.
For example, it could be about technology, politics, philosophy, or any interesting domain.
Just provide the topic in a concise sentence.
"""

Then we pass these prompts in the constructor of the class itself.

class GenerateTopicNode(BaseComponent):
    def __init__(self, llm_config, temperature: float = 0.7):
        super().__init__(llm_config, temperature)
        # Create the prompt chain.
        self.chain: RunnableSequence = self.create_chain(
            system_template=SYSTEM_PROMPT,
            human_template=HUMAN_PROMPT
        )

    def __call__(self, state: DebateState) -> Dict[str, str]:
        """
        Generates a debate topic and assigns positions to the two debaters.
        """
        super().__call__(state)

        topic_text = self.execute_chain({})

        # Store the topic and assign stances in the DebateState
        debate_topic = topic_text.strip()
        positions = {
            "pro": "In favor of the topic",
            "con": "Against the topic"
        }

        
        first_speaker = "pro"
        self.logger.info("Welcome to our debate panel! Today's debate topic is: %s", debate_topic)
        return {
            "debate_topic": debate_topic,
            "positions": positions,
            "stage": "opening",
            "speaker": first_speaker
        }

We’ll repeat this pattern for every agent class, except the control nodes that don’t use an LLM and the Fact Checker.

Now we can implement the two stars of the show: the Pro and Con argument agents!

Debater Agents (Pro and Con)

Link: pro_debater_node.py

The two debater agents are very similar in structure, but each uses different prompt templates tailored to their role (pro vs con) and the stage of the debate.

The Pro debater, for example, has to handle an opening statement and a counter-argument (countering the Con’s rebuttal). We also need logic for retries in case a statement fails fact-check. In code, the ProDebater class sets up multiple prompt chains:

  • opening_chain and an opening_retry_chain (using slightly different human prompts – the retry prompt might instruct it to try again without repeating any factually dubious claims).
  • counter_chain and counter_retry_chain for the counter-argument stage.

class ProDebaterNode(BaseComponent):
    def __init__(self, llm_config, temperature: float = 0.7):
        super().__init__(llm_config, temperature)
        self.opening_chain = self.create_chain(SYSTEM_PROMPT, OPENING_HUMAN_PROMPT)
        self.opening_retry_chain = self.create_chain(SYSTEM_PROMPT, OPENING_RETRY_HUMAN_PROMPT)
        self.counter_chain = self.create_chain(SYSTEM_PROMPT, COUNTER_HUMAN_PROMPT)
        self.counter_retry_chain = self.create_chain(SYSTEM_PROMPT, COUNTER_RETRY_HUMAN_PROMPT)

    def __call__(self, state: DebateState) -> Dict[str, Any]:
        super().__call__(state)

        debate_topic = state.get("debate_topic")
        messages = state.get("messages", [])
        stage = state.get("stage")
        speaker = state.get("speaker")

        # Check if retrying (last message was by pro and not validated)
        last_msg = messages[-1] if messages else None
        retrying = last_msg and last_msg["speaker"] == SPEAKER_PRO and not last_msg["validated"]

        if stage == STAGE_OPENING and speaker == SPEAKER_PRO:
            chain = self.opening_retry_chain if retrying else self.opening_chain  # select which chain to run: the normal one, or the retry chain after a failed fact-check
            result = chain.invoke({
                "debate_topic": debate_topic
            })
        elif stage == STAGE_COUNTER and speaker == SPEAKER_PRO:
            opponent_msg = self._get_last_message_by(SPEAKER_CON, messages)
            debate_history = get_debate_history(messages)
            chain = self.counter_retry_chain if retrying else self.counter_chain
            result = chain.invoke({
                "debate_topic": debate_topic,
                "opponent_statement": opponent_msg,
                "debate_history": debate_history
            })
        else:
            raise ValueError(f"Unknown turn for ProDebater: stage={stage}, speaker={speaker}")
        new_message = create_debate_message(speaker=SPEAKER_PRO, content=result, stage=stage)
        self.logger.info("Speaker: %s, Stage: %s, Retry: %s\nMessage:\n%s", speaker, stage, retrying, result)
        return {
            "messages": messages + [new_message]
        }

    def _get_last_message_by(self, speaker_prefix, messages):
        for m in reversed(messages):
            if m.get("speaker") == speaker_prefix:
                return m["content"]
        return ""

When the ProDebater’s __call__ runs, it looks at the current stage and speaker in the state to decide what to do:

  • If it’s the opening stage and the speaker is “pro”, it uses the opening_chain to generate an opening argument. If the last message from Pro was marked invalid (not validated), it knows this is a retry, so it would use the opening_retry_chain instead.
  • If it’s the counter stage and speaker is “pro”, it generates a counter-argument to whatever the opponent (Con) just said. It will fetch the last message by the Con from the messages history, and feed that into the prompt (so that the Pro can directly counter it). Again, if the last Pro message was invalid, it would switch to the retry chain.

After generating its argument, the Debater agent creates a new message entry (with speaker="pro", the content text, validated=False initially, and the stage) and appends it to the state’s message list. That becomes the output of the node (LangGraph will merge this partial state update into the global state).
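The create_debate_message and get_debate_history helpers referenced in the code are defined in the repo; simplified sketches of what they do might look like this (field names follow the DebateMessage schema above, and the repo’s versions may differ):

# Simplified sketches of the helpers referenced above (not the repo's exact code).
def create_debate_message(speaker: str, content: str, stage: str) -> DebateMessage:
    # New statements start unvalidated; the Fact Checker flips this flag later.
    return {"speaker": speaker, "content": content, "validated": False, "stage": stage}

def get_debate_history(messages: list) -> str:
    # Flatten prior turns into a transcript the prompt can consume.
    return "\n\n".join(f"{m['speaker'].upper()} ({m['stage']}): {m['content']}" for m in messages)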

The Con Debater agent mirrors this logic for its stages:

  • It has a rebuttal and a closing (final) argument stage, each with a normal and a retry chain.
  • It checks whether it’s the rebuttal stage or the final argument stage (speaker “con”) and invokes the appropriate chain, using the last Pro message for context when rebutting.
  • It similarly appends its message to the state.

Link: con_debater_node.py

By using class-based implementation, our debaters’ code is easier to maintain. We can clearly separate what the Pro does vs what the Con does, even if they share structure. Also, by encapsulating prompt chains inside the class, each debater can manage multiple possible outputs (regular vs retry) cleanly.

Prompt design: The actual prompts (in prompts/pro_debater_prompts.py and con_debater_prompts.py) guide the GPT-4o model to take on a persona (“You are a debater arguing for/against the topic…”) and produce the argument. They also instruct the model to keep statements factual and logical. If a fact check fails, the retry prompt may say something like: “Your previous statement had an unverified claim. Revise your argument to be factually correct while maintaining your position.” – encouraging the model to correct itself.
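For illustration, a retry prompt in that spirit might look like the following (a hypothetical template, not the exact wording from prompts/pro_debater_prompts.py):

# Hypothetical retry prompt template (illustrative only; the repo's wording differs).
OPENING_RETRY_HUMAN_PROMPT = """\
Debate topic: {debate_topic}

Your previous opening statement contained a claim that could not be verified.
Rewrite your opening argument for the PRO side, keeping your position and structure,
but remove or replace any unverified statistics, studies, or figures.
"""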

With this, our AI debaters can engage in a multi-turn duel, and even recover from factual missteps.

Fact Checker Agent (FactCheckNode)

After each debater speaks, the Fact Checker agent swoops in to verify their claims. This agent is implemented in fact_checker_node.py (https://github.com/iason-solomos/Deb8flow/blob/main/nodes/fact_checker_node.py), and interestingly, it uses GPT-4o’s browsing ability rather than our own custom prompt chains. Essentially, we delegate the fact-checking to GPT-4o with web search.

How does this work? The OpenAI Python client, used with the GPT-4o search-preview model, allows us to send a user message and get a structured response. In FactCheckNode.__call__, we do something like:

completion = self.client.beta.chat.completions.parse(
    model="gpt-4o-search-preview",
    web_search_options={},
    messages=[{
        "role": "user",
        "content": (
            f"Consider the following statement from a debate. "
            f"If the statement contains numbers, or figures from studies, fact-check it online.\n\n"
            f"Statement:\n\"{claim}\"\n\n"
            f"Reply clearly whether any numbers or studies might be inaccurate or hallucinated, and why."
            f"\n"
            f"If the statement doesn't contain references to studies or numbers cited, don't go online to fact-check, and just consider it successfully fact-checked, with a 'yes' score.\n\n"
        )
    }],
    response_format=FactCheck
)

If the result is “yes” (meaning the claim seems truthful or at least not factually wrong), the Fact Checker will mark the last message’s validated field as True in the state, and output {"validated": True} with no further changes. This signals that the debate can continue normally.

If the result is “no” (meaning it found the claim to be incorrect or dubious), the Fact Checker will append a new message to the state with speaker="fact_checker" describing the finding (or we could simply mark it, but providing a brief note like “(Fact Checker: The statistic cited could not be verified.)” can be useful). It will also set validated: False and increment a counter for whichever side made the claim. The output state from this node includes validated: False and an updated times_pro_fact_checked or times_con_fact_checked count.

We also use a Pydantic BaseModel to control the output of the LLM:

class FactCheck(BaseModel):
    """
    Pydantic model for the fact checking the claims made by debaters.

    Attributes:
        binary_score (str): 'yes' if the claim is verifiable and truthful, 'no' otherwise.
        justification (str): Explanation of the reasoning behind the score.
    """

    binary_score: str = Field(
        description="Indicates if the claim is verifiable and truthful. 'yes' or 'no'."
    )
    justification: str = Field(
        description="Explanation of the reasoning behind the score."
    )
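Putting these pieces together, here is a simplified sketch (not the repo’s exact code) of how the parsed FactCheck result could be turned into the node’s state update:

# Simplified sketch, continuing inside FactCheckNode.__call__ (details differ in the repo).
fact_check: FactCheck = completion.choices[0].message.parsed
last_message = state["messages"][-1]

if fact_check.binary_score.lower() == "yes":
    last_message["validated"] = True
    return {"messages": state["messages"], "validated": True}

# Failed fact-check: leave the message unvalidated and increment the offender's counter.
counter_key = (
    "times_pro_fact_checked" if last_message["speaker"] == "pro" else "times_con_fact_checked"
)
return {
    "messages": state["messages"],
    "validated": False,
    counter_key: state.get(counter_key, 0) + 1,
}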

Debate Moderator Agent (DebateModeratorNode)

The Debate Moderator is the conductor of the debate. Instead of producing lengthy text, this agent’s job is to manage turn-taking and stage progression. In the workflow, after a statement is validated by the Fact Checker, control passes to the Moderator node. The Moderator then issues a Command that updates the state for the next turn and directs the flow to the appropriate next agent.

The logic in DebateModeratorNode.__call__ (see nodes/debate_moderator_node.py: https://github.com/iason-solomos/Deb8flow/blob/main/nodes/debate_moderator_node.py) goes roughly like this:

if stage == STAGE_OPENING and speaker == SPEAKER_PRO:
    return Command(
        update={"stage": STAGE_REBUTTAL, "speaker": SPEAKER_CON},
        goto=NODE_CON_DEBATER
    )
elif stage == STAGE_REBUTTAL and speaker == SPEAKER_CON:
    return Command(
        update={"stage": STAGE_COUNTER, "speaker": SPEAKER_PRO},
        goto=NODE_PRO_DEBATER
    )
elif stage == STAGE_COUNTER and speaker == SPEAKER_PRO:
    return Command(
        update={"stage": STAGE_FINAL_ARGUMENT, "speaker": SPEAKER_CON},
        goto=NODE_CON_DEBATER
    )
elif stage == STAGE_FINAL_ARGUMENT and speaker == SPEAKER_CON:
    return Command(
        update={},
        goto=NODE_JUDGE
    )

raise ValueError(f"Unexpected stage/speaker combo: stage={stage}, speaker={speaker}")

Each conditional corresponds to a point in the debate where a turn just ended, and sets up the next turn. For example, after the opening (Pro just spoke), it sets stage to rebuttal, switches speaker to Con, and directs the workflow to the Con debater node. After the final_argument (Con’s closing), it directs to the Judge with no further update (the debate stage effectively ends).

Fact Check Router (FactCheckRouterNode)

This is another control node (like the Moderator) that introduces conditional logic. The Fact Check Router sits right after the Fact Checker agent in the flow. Its purpose is to branch the workflow depending on the fact-check result.

In nodes/fact_check_router_node.py (https://github.com/iason-solomos/Deb8flow/blob/main/nodes/fact_check_router_node.py), the logic is:

if pro_fact_checks >= 3 or con_fact_checks >= 3:
    disqualified = SPEAKER_PRO if pro_fact_checks >= 3 else SPEAKER_CON
    winner = SPEAKER_CON if disqualified == SPEAKER_PRO else SPEAKER_PRO

    verdict_msg = {
        "speaker": "moderator",
        "content": (
            f"Debate ended early due to excessive factual inaccuracies.\n\n"
            f"DISQUALIFIED: {disqualified.upper()} (exceeded fact check limit)\n"
            f"WINNER: {winner.upper()}"
        ),
        "validated": True,
        "stage": "verdict"
    }
    return Command(
        update={"messages": messages + [verdict_msg]},
        goto=END
    )
if last_message.get("validated"):
    return Command(goto=NODE_DEBATE_MODERATOR)
elif speaker == SPEAKER_PRO:
    return Command(goto=NODE_PRO_DEBATER)
elif speaker == SPEAKER_CON:
    return Command(goto=NODE_CON_DEBATER)
raise ValueError("Unable to determine routing in FactCheckRouterNode.")

First, the Fact Check Router checks if either side’s fact-check count has reached 3. If so, it creates a Moderator-style message announcing an early end: the offending side is disqualified and the other side is the winner. It appends this verdict to the messages and returns a Command that jumps to END, effectively terminating the debate without going to the Judge (because we already know the outcome).

If we’re not ending the debate early, it then looks at the Fact Checker’s result for the last message (which is stored as validated on that message). If validated is True, we go to the Debate Moderator: Command(goto=NODE_DEBATE_MODERATOR).

Else if the statement fails fact-check, the workflow goes back to the debater to produce a revised statement (with the state counters updated to reflect the failure). This loop can happen multiple times if needed (up to the disqualification limit).

This dynamic control is the heart of Deb8flow’s “agentic” nature – the ability to adapt the path of execution based on the content of the agents’ outputs. It showcases LangGraph’s strength: combining control flow with state. We’re essentially encoding debate rules (like allowing retries for false claims, or ending the debate if someone cheats too often) directly into the workflow graph.

Judge Agent (JudgeNode)

Last but not least, the Judge agent delivers the final verdict based on rhetorical skill, clarity, structure, and overall persuasiveness. Its system prompt and human prompt make this explicit:

  • System Prompt: “You are an impartial debate judge AI. … Evaluate which debater presented their case more clearly, persuasively, and logically. You must focus on communication skills, structure of argument, rhetorical strength, and overall coherence.”
  • Human Prompt: “Here is the full debate transcript. Please analyze the performance of both debaters—PRO and CON. Evaluate rhetorical performance—clarity, structure, persuasion, and relevance—and decide who presented their case more effectively.”

When the Judge node runs, it receives the entire debate transcript (all validated messages) alongside the original topic. It then uses GPT-4o to examine how each side framed their arguments, handled counterpoints, and supported (or failed to support) claims with examples or logic. Crucially, the Judge is forbidden to evaluate which position is objectively correct (or who it thinks might be correct)—only who argued more persuasively.

Below is an example final verdict from a Deb8flow run on the topic:
“Should governments implement a universal basic income in response to increasing automation in the workforce?”

WINNER: PRO

REASON: The PRO debater presented a more compelling and rhetorically effective case for universal basic income. Their arguments were well-structured, beginning with a clear statement of the issue and the necessity of UBI in response to automation. They effectively addressed potential counterarguments by highlighting the unprecedented speed and scope of current technological changes, which distinguishes the current situation from past technological shifts. The PRO also provided empirical evidence from UBI pilot programs to counter the CON's claims about work disincentives and economic inefficiencies, reinforcing their argument with real-world examples.

In contrast, the CON debater, while presenting valid concerns about UBI, relied heavily on historical analogies and assumptions about workforce adaptability without adequately addressing the unique challenges posed by modern automation. Their arguments about the fiscal burden and potential inefficiencies of UBI were less supported by specific evidence compared to the PRO's rebuttals.

Overall, the PRO's arguments were more coherent, persuasive, and backed by empirical evidence, making their case more convincing to a neutral observer.

LangSmith Tracing

Throughout Deb8flow’s development, I relied on LangSmith (LangChain’s tracing and observability toolkit) to ensure the entire debate pipeline was behaving correctly. Because we have multiple agents passing control between themselves, it’s easy for unexpected loops or misrouted states to occur. LangSmith provides a convenient way to:

  • Visualize Execution Flow: You can see each agent’s prompt, the tokens consumed (so you can also track costs), and any intermediate states. This makes it much simpler to confirm that, say, the Con Debater is properly referencing the Pro Debater’s last message, or that the Fact Checker is accurately receiving the claim to verify.
  • Debug State Updates: If the Moderator or Fact Check Router is sending the flow to the wrong node, the trace will highlight that mismatch. You can trace which agent was invoked at each step and why, helping you spot stage or speaker misalignments early.
  • Track Prompt and Completion Tokens: With multiple GPT-4o calls, it’s useful to see how many tokens each stage is using, which LangSmith logs automatically if you enable tracing.

Integrating LangSmith is surprisingly easy. You just need to provide these three keys in your .env file: LANGCHAIN_API_KEY, LANGCHAIN_TRACING_V2, and LANGCHAIN_PROJECT.
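For example, the relevant .env entries might look like this (the key and project name below are placeholders):

# Placeholders - substitute your own LangSmith API key and project name
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY="lsv2_..."
LANGCHAIN_PROJECT="deb8flow"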

Then you can open the LangSmith UI to see a structured trace of each run. This greatly reduces the guesswork involved in debugging multi-agent systems and is, in my experience, essential for more complex AI orchestration like ours. Example of a single run:

The trace in waterfall mode in LangSmith of one run, showing how the whole flow ran. Source: Generated by the author using LangSmith.

Reflections and Next Steps

Building Deb8flow was an eye-opening exercise in orchestrating autonomous agent workflows. We didn’t just chain a single model call – we created an entire debate simulation with AI agents, each with a specific role, and allowed them to interact according to a set of rules. LangGraph provided a clear framework to define how data and control flows between agents, making the complex sequence manageable in code. By using class-based agents and a shared state, we maintained modularity and clarity, which will pay off for any software engineering project in the long run.

An exciting aspect of this project was seeing emergent behavior. Even though each agent follows a script (a prompt), the unscripted combination – a debater trying to deceive, a fact-checker catching it, the debater rephrasing – felt surprisingly realistic! It’s a small step toward more agentic AI systems that can perform non-trivial multi-step tasks with oversight over each other.

There are plenty of ideas for improvement:

  • User Interaction: Currently it’s fully autonomous, but one could add a mode where a human provides the topic or even takes the role of one side against an AI opponent.
  • We can switch the order in which the Debaters talk.
  • We can experiment with different prompts, which to a large degree shape the agents’ behavior.
  • Make the debaters also perform web search before producing their statements, thus providing them with the latest information.

The broader implication of Deb8flow is how it showcases a pattern for composable AI agents. By defining clear boundaries and interactions (just like microservices in software), we can have complex AI-driven processes that remain interpretable and controllable. Each agent is like a cog in a machine, and LangGraph is the gear system making them work in unison.

I found this project energizing, and I hope it inspires you to explore multi-agent workflows. Whether it’s debating, collaborating on writing, or solving problems from different expert angles, the combination of GPT, tools, and structured agentic workflows opens up a new world of possibilities for AI development. Happy hacking!

References

[1] D. Bouchard, “From Basics to Advanced: Exploring LangGraph,” Medium, Nov. 22, 2023. [Online]. Available: https://medium.com/data-science/from-basics-to-advanced-exploring-langgraph-e8c1cf4db787. [Accessed: Apr. 1, 2025].

[2] A. W. T. Ng, “Building a Research Agent that Can Write to Google Docs: Part 1,” Towards Data Science, Jan. 11, 2024. [Online]. Available: https://towardsdatascience.com/building-a-research-agent-that-can-write-to-google-docs-part-1-4b49ea05a292/. [Accessed: Apr. 1, 2025].

4 Levels of GitHub Actions: A Guide to Data Workflow Automation
https://towardsdatascience.com/4-levels-of-github-actions-a-guide-to-data-workflow-automation/
From a simple Python workflow to scheduled data processing and security management

Automation has become an indispensable element for ensuring operational efficiency and reliability in modern software development. GitHub Actions, an integrated Continuous Integration and Continuous Deployment (CI/CD) tool within GitHub, has established its position in the software development industry by providing a comprehensive platform for automating development and deployment workflows. However, its functionalities extend beyond this … We will delve into the use of GitHub Actions within the data domain, demonstrating how it can streamline processes for developers and data professionals by automating data retrieval from external sources and data transformation operations.

GitHub Action Benefits

GitHub Actions is already well known for its capabilities in the software development domain, and in recent years it has also been found to offer compelling benefits for streamlining data workflows:

  • Automate the data science environments setup, such as installing dependencies and required packages (e.g. pandas, PyTorch).
  • Streamline the data integration and data transformation steps by connecting to databases to fetch or update records, and using scripting languages like Python to preprocess or transform the raw data.
  • Create an iterable data science lifecycle by automating the training of machine learning models whenever new data is available, and deploying models to production environments automatically after successful training.
  • GitHub Actions is free for unlimited usage on GitHub-hosted runners for public repositories, and it provides 2,000 free minutes of compute time per month for individual accounts using private repositories. It is easy to set up for building a proof of concept, requiring only a GitHub account and no sign-up with a cloud provider.
  • Numerous GitHub Actions templates and community resources are available online. Additionally, community and crowdsourced forums provide answers to common questions and troubleshooting support.

GitHub Action Building Blocks

GitHub Actions is a feature of GitHub that allows users to automate workflows directly within their repositories. These workflows are defined using YAML files and can be triggered by various events such as code pushes, pull requests, issue creation, or scheduled intervals. With its extensive library of pre-built actions and the ability to write custom scripts, GitHub Actions is a versatile tool for automating tasks.

  • Event: If you have ever used an automation on your devices, such as turning on dark mode after 8 pm, then you are familiar with the concept of using a trigger point or condition to initiate a workflow of actions. In GitHub Actions, this is referred to as an Event, which can be time-based, e.g. scheduled for the 1st day of the month or run every hour. Alternatively, Events can be triggered by certain behaviors, like every time changes are pushed from a local repository to a remote repository.
  • Workflow: A workflow is composed of a series of jobs, and GitHub gives you the flexibility to customize each individual step in a job to your needs. It is defined by a YAML file stored in the .github/workflows directory of a GitHub repository.
  • Runners: a hosted environment that runs the workflow. Instead of running a script on your laptop, you can borrow GitHub-hosted runners to do the job for you or, alternatively, specify a self-hosted machine.
  • Runs: each iteration of running the workflow creates a run, and we can see the logs of each run in the “Actions” tab. GitHub provides an interface for users to easily visualize and monitor Action run logs.

4 Levels of GitHub Actions

We will demonstrate the implementation of GitHub Actions through 4 levels of difficulty, starting with the “minimum viable product” and progressively introducing additional components and customization at each level.

1. “Simple Workflow” with Python Script Execution

Start by creating a GitHub repository where you want to store your workflow and the Python script. In your repository, create a .github/workflows directory (please note that workflow files must live inside this workflows folder for the action to be executed successfully). Inside this directory, create a YAML file (e.g., simple-workflow.yaml) that defines your workflow.

The following example shows a workflow file that executes the Python script hello_world.py based on a manual trigger.

name: simple-workflow

on: 
    workflow_dispatch:
    
jobs:
    run-hello-world:
      runs-on: ubuntu-latest
      steps:
          - name: Checkout repo content
            uses: actions/checkout@v4
          - name: run hello world
            run: python code/hello_world.py

It consists of three sections: First, name: simple-workflow defines the workflow name. Second, on: workflow_dispatch specifies the condition for running the workflow – in this case, a manual trigger. Last, the job jobs: run-hello-world breaks down into the following steps:

  • runs-on: ubuntu-latest: Specify the runner (i.e., a virtual machine) to run the workflow — ubuntu-latest is a standard GitHub hosted runner containing an environment of tools, packages, and settings available for GitHub Actions to use.
  • uses: actions/checkout@v4: Apply a pre-built GitHub Action checkout@v4 to pull the repository content into the runner’s environment. This ensures that the workflow has access to all necessary files and scripts stored in the repository.
  • run: python code/hello_world.py: Execute the Python script located in the code sub-directory by running shell commands directly in your YAML workflow file.
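The hello_world.py script itself isn’t shown above; for completeness, code/hello_world.py can be as trivial as the following placeholder:

# code/hello_world.py - a minimal placeholder script for the simple workflow
print("Hello world from GitHub Actions!")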

2. “Push Workflow” with Environment Setup

The first workflow demonstrated the minimum viable version of a GitHub Action, but it did not take full advantage of GitHub Actions. At the second level, we will add a bit more customization and functionality – automatically set up the environment with Python version 3.11, install required packages, and execute the script whenever changes are pushed to the main branch.

name: push-workflow

on: 
    push:
        branches:
            - main

jobs:
    run-hello-world:
      runs-on: ubuntu-latest
      steps:
          - name: Checkout repo content
            uses: actions/checkout@v4
          - name: Set up Python
            uses: actions/setup-python@v5
            with:
              python-version: '3.11' 
          - name: Install dependencies
            run: |
              python -m pip install --upgrade pip
              pip install -r requirements.txt
          - name: Run hello world
            run: python code/hello_world.py

  • on: push: Instead of being activated by a manual workflow dispatch, this allows the action to run whenever there is a push from the local repository to the remote repository. This condition is commonly used in software development for integration and deployment processes, and it is also adopted in MLOps workflows, ensuring that code changes are consistently tested and validated before being merged into a different branch. Additionally, it facilitates continuous deployment by automatically deploying updates to production or staging environments as soon as changes are pushed. Here we add an optional condition branches: -main to only trigger this action when changes are pushed to the main branch.
  • uses: actions/setup-python@v5: We added the “Set up Python” step using GitHub’s built-in action setup-python@v5. Using the setup-python action is the recommended way of using Python with GitHub Actions because it ensures consistent behavior across different runners and versions of Python.
  • pip install -r requirements.txt: Streamlines the installation of the required packages, which are listed in the requirements.txt file, speeding up the further building of the data pipeline and data science solution (see the example requirements.txt after this list).
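For this tutorial the dependency list can stay very small; given the fetch_data.py script shown later, a minimal, illustrative requirements.txt might contain just:

# requirements.txt (illustrative - pin versions as needed)
pandas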

If you are interested in the basics of setting up a development environment for your data science projects, my previous blog post “7 Tips to Future-Proof Machine Learning Projects” provides a bit more explanation.

3. “Scheduled Workflow” with Argument Parsing

At the third level, we add more dynamic behavior and complexity to make the workflow better suited to real-world applications. We introduce scheduled jobs, as they bring even more benefits to a data science project, enabling periodic fetching of recent data and reducing the need to manually run the script whenever a data refresh is required. Additionally, we use dynamic argument parsing to execute the script with different date-range parameters according to the schedule.

name: scheduled-workflow

on: 
    workflow_dispatch:
    schedule:
        - cron: "0 12 1 * *" # run 1st day of every month

jobs:
    run-data-pipeline:
        runs-on: ubuntu-latest
        steps:
            - name: Checkout repo content
              uses: actions/checkout@v4
            - name: Set up Python
              uses: actions/setup-python@v5
              with:
                python-version: '3.11'  # Specify your Python version here
            - name: Install dependencies
              run: |
                python -m pip install --upgrade pip
                python -m http.client
                pip install -r requirements.txt
            - name: Run data pipeline
              run: |
                  PREV_MONTH_START=$(date -d "`date +%Y%m01` -1 month" +%Y-%m-%d)
                  PREV_MONTH_END=$(date -d "`date +%Y%m01` -1 day" +%Y-%m-%d)
                  python code/fetch_data.py --start $PREV_MONTH_START --end $PREV_MONTH_END
            - name: Commit changes
              run: |
                  git config user.name '<github-actions>'
                  git config user.email '<github-actions@github.com>'
                  git add .
                  git commit -m "update data"
                  git push
  • on: schedule: - cron: "0 12 1 * *": Specifies a time-based trigger using the cron expression "0 12 1 * *" – run at 12:00 pm on the 1st day of every month. You can use crontab.guru to help create and validate cron expressions, which follow the format: "minute / hour / day of month / month / day of week".
  • python code/fetch_data.py --start $PREV_MONTH_START --end $PREV_MONTH_END: The "Run data pipeline" step runs a series of shell commands. It defines two variables, PREV_MONTH_START and PREV_MONTH_END, holding the first and last day of the previous month. These are passed to the Python script fetch_data.py to dynamically fetch data for the month preceding whenever the action runs. To let the script accept custom values via command-line arguments, we use the argparse library. This deserves a separate topic, but here is a quick preview of how the script might look, using argparse to handle the '--start' and '--end' parameters.
## fetch_data.py

import argparse
import json
import os
import urllib.parse

import pandas as pd

def main(args=None):
    # Parse the date range passed in from the GitHub Actions step
    parser = argparse.ArgumentParser()
    parser.add_argument('--start', type=str)
    parser.add_argument('--end', type=str)
    args = parser.parse_args(args=args)
    print("Start Date is: ", args.start)
    print("End Date is: ", args.end)

    date_range = pd.date_range(start=args.start, end=args.end)
    content_lst = []

    for date in date_range:
        date = date.strftime('%Y-%m-%d')

        # search_term is assumed to be defined elsewhere in the script
        params = urllib.parse.urlencode({
            'api_token': '<NEWS_API_TOKEN>',  # replaced by an environment variable in the secure workflow (level 4)
            'published_on': date,
            'search': search_term,
        })
        url = '/v1/news/all?{}'.format(params)

        # parse_news_json is a helper (not shown) that calls the API and parses the JSON response
        content_json = parse_news_json(url, date)
        content_lst.append(content_json)

    # Write one JSON object per line to data.jsonl
    with open('data.jsonl', 'w') as f:
        for item in content_lst:
            json.dump(item, f)
            f.write('\n')

    return content_lst

if __name__ == '__main__':
    main()

When the command python code/fetch_data.py --start $PREV_MONTH_START --end $PREV_MONTH_END executes, it creates a date range between $PREV_MONTH_START and $PREV_MONTH_END. For each day in the date range, it generates a URL, fetches the daily news through the API, parses the JSON response, and collects all the content into a JSON list. We then output this JSON list to the file “data.jsonl”.
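If you prefer to keep the date logic inside Python rather than in the shell step, the same previous-month window can be computed with the standard library. This is an optional sketch and not part of the workflow above:

from datetime import date, timedelta

def previous_month_range(today: date | None = None) -> tuple[str, str]:
    """Return (start, end) of the previous month as YYYY-MM-DD strings."""
    today = today or date.today()
    first_of_this_month = today.replace(day=1)
    prev_month_end = first_of_this_month - timedelta(days=1)  # last day of previous month
    prev_month_start = prev_month_end.replace(day=1)          # first day of previous month
    return prev_month_start.isoformat(), prev_month_end.isoformat()

start, end = previous_month_range()
print(start, end)  # e.g. 2025-03-01 2025-03-31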

- name: Commit changes
  run: |
      git config user.name '<github-actions>'
      git config user.email '<github-actions@github.com>'
      git add .
      git commit -m "update data"
      git push

As shown above, the last step “Commit changes” configures the git user name and email, stages the changes, commits them, and pushes to the remote GitHub repository. This step is necessary when a GitHub Action produces changes in the working directory (e.g., the output file “data.jsonl” is created). Otherwise, the output is only saved in the /temp folder within the runner environment, and it appears as if no changes were made once the action completes.

4. “Secure Workflow” with Secrets and Environment Variables Management

The final level focuses on improving the security and performance of the GitHub workflow by addressing non-functional requirements.

name: secure-workflow

on: 
    workflow_dispatch:
    schedule:
        - cron: "34 23 1 * *" # run 1st day of every month

jobs:
    run-data-pipeline:
        runs-on: ubuntu-latest
        steps:
            - name: Checkout repo content
              uses: actions/checkout@v4
            - name: Set up Python
              uses: actions/setup-python@v5
              with:
                python-version: '3.11'  # Specify your Python version here
            - name: Install dependencies
              run: |
                python -m pip install --upgrade pip
                python -m http.client
                pip install -r requirements.txt
            - name: Run data pipeline
              env:
                  NEWS_API_TOKEN: ${{ secrets.NEWS_API_TOKEN }} 
              run: |
                  PREV_MONTH_START=$(date -d "`date +%Y%m01` -1 month" +%Y-%m-%d)
                  PREV_MONTH_END=$(date -d "`date +%Y%m01` -1 day" +%Y-%m-%d)
                  python code/fetch_data.py --start $PREV_MONTH_START --end $PREV_MONTH_END
            - name: Check changes
              id: git-check
              run: |
                  git config user.name 'github-actions'
                  git config user.email 'github-actions@github.com'
                  git add .
                  git diff --staged --quiet || echo "changes=true" >> $GITHUB_ENV
            - name: Commit and push if changes
              if: env.changes == 'true'
              run: |
                  git commit -m "update data"
                  git push
                  

To improve workflow efficiency and reduce errors, we add a check before committing changes, ensuring that commits and pushes only occur when there are actual changes since the last commit. This is achieved through the command git diff --staged --quiet || echo "changes=true" >> $GITHUB_ENV.

  • git diff --staged checks the difference between the staging area and the last commit.
  • --quiet suppresses the output — the command exits with code 0 when there are no differences between the staging area and the last commit, and with exit code 1 (general error) when there are differences
  • This command is then connected to echo "changes=true" >> $GITHUB_ENV through the OR operator || which tells the shell to run the rest of the line if the first command failed. Therefore, if changes exist, “changes=true” is passed to the environment variable $GITHUB_ENV and accessed at the next step to trigger git commit and push conditioned on env.changes == 'true'.

Lastly, we introduce the environment secret, which enhances security and avoids exposing sensitive information (e.g., API token, personal access token) in the codebase. Additionally, environment secrets offer the benefit of separating the development environment. This means you can have different secrets for different stages of your development and deployment pipeline. For example, the testing environment (e.g., in the dev branch) can only access the test token, whereas the production environment (e.g. in the main branch) will be able to access the token linked to the production instance.

To set up environment secrets in GitHub:

  1. Go to your repository settings
  2. Navigate to Secrets and Variables > Actions
  3. Click “New repository secret”
  4. Add your secret name and value

After setting up the GitHub secret, we need to expose it to the workflow as an environment variable. For example, below we add ${{ secrets.NEWS_API_TOKEN }} to the env block of the “Run data pipeline” step.

- name: Run data pipeline
  env:
      NEWS_API_TOKEN: ${{ secrets.NEWS_API_TOKEN }} 
  run: |
      PREV_MONTH_START=$(date -d "`date +%Y%m01` -1 month" +%Y-%m-%d)
      PREV_MONTH_END=$(date -d "`date +%Y%m01` -1 day" +%Y-%m-%d)
      python code/fetch_data.py --start $PREV_MONTH_START --end $PREV_MONTH_END

We then update the Python script fetch_data.py to access the environment secret using os.environ.get().

import os

api_token = os.environ.get('NEWS_API_TOKEN')
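A slightly more defensive variant fails fast with a clear message when the secret is missing, which makes a misconfigured run easier to diagnose. This is an optional pattern, not something the workflow above requires:

import os

api_token = os.environ.get('NEWS_API_TOKEN')
if not api_token:
    raise RuntimeError("NEWS_API_TOKEN is not set; check the workflow's env block and the repository secret.")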

Take-Home Message

This guide explores the implementation of GitHub Actions for building dynamic data pipelines, progressing through four different levels of workflow implementations:

  • Level 1: Basic workflow setup with manual triggers and simple Python script execution.
  • Level 2: Push workflow with development environment setup.
  • Level 3: Scheduled workflow with dynamic date handling and data fetching with command-line arguments
  • Level 4: Secure pipeline workflow with secrets and environment variables management

Each level builds upon the previous one, demonstrating how GitHub Actions can be effectively utilized in the data domain to streamline data solutions and speed up the development lifecycle.

The post 4 Levels of GitHub Actions: A Guide to Data Workflow Automation appeared first on Towards Data Science.

Agentic AI: Single vs Multi-Agent Systems https://towardsdatascience.com/agentic-ai-single-vs-multi-agent-systems/ Wed, 02 Apr 2025 00:15:17 +0000 https://towardsdatascience.com/?p=605376 Demonstrated by building a tech news agent in LangGraph

The post Agentic AI: Single vs Multi-Agent Systems appeared first on Towards Data Science.

We’ve seen a shift over the last few years from building rigid programming systems to natural language-driven workflows, all made possible by more advanced large language models.

One of the interesting areas within these agentic AI systems is the difference between building a single-agent versus a multi-agent workflow, or perhaps the difference between working with more flexible vs. more controlled systems.

This article will help you understand what agentic AI is, how to build simple workflows with LangGraph, and the differences in results you can achieve with the different architectures. I’ll demonstrate this by building a tech news agent with various data sources.

As for the use case, I’m a bit obsessed with getting automatic news updates, based on my preferences, without me drowning in information overload every day.

Having AI summarize for us instead of scouting info on our own | Image by author

Working with summarizing and gathering research is one of those areas that agentic AI can really shine.

So follow along while I keep trying to make AI do the grunt work for me, and we’ll see how single-agent compares to multi-agent setups.

I always keep my work jargon-free, so if you’re new to agentic AI, this piece should help you understand what it is and how to work with it. If you’re not new to it, you can scroll past some of the sections.

Agentic AI (& LLMs)

Agentic AI is about programming with natural language. Instead of using rigid, explicit code, you instruct large language models (LLMs) to route data and perform actions through plain language, automating tasks in the process.

Using natural language in workflows isn’t new, we’ve used NLP for years to extract and process data. What’s new is the amount of freedom we can now give language models, allowing them to handle ambiguity and make decisions dynamically.

Traditional automation from programmatic to NLP to LLMs | Image by author

But just because LLMs can understand nuanced language doesn’t mean they inherently validate facts or maintain data integrity. I see them primarily as a communication layer that sits on top of structured systems and existing data sources.

LLMs are a communication layer, not the system itself | Image by author

I usually explain it like this to non-technical people: they work a bit like we do. If we don’t have access to clean, structured data, we start making things up. Same with LLMs. They generate responses based on patterns, not truth-checking.

So just like us, they do their best with what they’ve got. If we want better output, we need to build systems that give them reliable data to work with. So, with Agentic systems we integrate ways for them to interact with different data sources, tools and systems.

Now, just because we can use these larger models in more places, doesn’t mean we should. LLMs shine when interpreting nuanced natural language, think customer service, research, or human-in-the-loop collaboration.

But for structured tasks — like extracting numbers and sending them somewhere — you need to use traditional approaches. LLMs aren’t inherently better at math than a calculator. So, instead of having an LLM do calculations, you give an LLM access to a calculator.

So whenever you can build parts of a workflow programmatically, that will still be the better option.

Nevertheless, LLMs are great at adapting to messy real-world input and interpreting vague instructions so combining the two can be a great way to build systems.
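To make the calculator point concrete, here is a minimal sketch of wrapping a calculator in a tool that an agent framework can expose to the LLM. It uses LangChain's tool decorator; the guard around eval is deliberately simplistic and only meant for illustration:

from langchain_core.tools import tool

@tool
def calculator(expression: str) -> float:
    """Evaluate a basic arithmetic expression such as '12.5 * 3 + 7'."""
    allowed = set("0123456789+-*/(). ")  # toy guard, not production-grade
    if not set(expression) <= allowed:
        raise ValueError("Only basic arithmetic is allowed.")
    return float(eval(expression))

# The framework passes this tool's schema to the LLM, which then calls the tool
# instead of doing the arithmetic itself (e.g. llm.bind_tools([calculator])).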

Agentic Frameworks

I know a lot of people jump straight to CrewAI or AutoGen here, but I’d recommend checking out LangGraph, Agno, Mastra, and Smolagents. Based on my research, these frameworks have received some of the strongest feedback so far.

I collect resources in a Github repo here with the most popular frameworks | Image by author

LangGraph is more technical and can be complex, but it’s the preferred choice for many developers. Agno is easier to get started with but less technical. Mastra is a solid option for JavaScript developers, and Smolagents shows a lot of promise as a lightweight alternative.

In this case, I’ve gone with LangGraph — built on top of LangChain — not because it’s my favorite, but because it’s becoming a go-to framework that more devs are adopting.

So, it’s worth being familiar with.

It has a lot of abstractions, though, and you may want to rebuild some of them just to be able to control and understand the system better.

I won’t go into detail on LangGraph here, so I’ve put together a quick guide for those who need a refresher.

As for this use case, you’ll be able to run the workflow without coding anything, but if you’re here to learn you may also want to understand how it works.

Choosing an LLM

Now, you might jump into this and wonder why I’m choosing certain LLMs as the base for the agents.

You can’t just pick any model, especially when working within a framework. They need to be compatible. Key things to look for are tool calling support and the ability to generate structured outputs.

I’d recommend checking HuggingFace’s Agent Leaderboard to see which models actually perform well in real-world agentic systems.

For this workflow, you should be fine using models from Anthropic, OpenAI, or Google. If you’re considering another one, just make sure it’s compatible with LangChain.

Single vs. Multi-Agent Systems

If you build a system around one LLM and give it a bunch of tools you want it to use, you’re working with a single-agent workflow. It’s fast, and if you’re new to agentic AI, it might seem like the model should just figure things out on its own.

One agent has access to many tools | Image by author

But the thing is these workflows are just another form of system design. Like any software project, you need to plan the process, define the steps, structure the logic, and decide how each part should behave.

Think about how the logic should work for your use case | Image by author

This is where multi-agent workflows come in.

Not all of them are hierarchical or linear, though; some are collaborative. Collaborative workflows fall into the more flexible approach that I find harder to work with, at least with the capabilities that exist today.

However, collaborative workflows do also break apart different functions into their own modules.

Single-agent and collaborative workflows are great to start with when you’re just playing around, but they don’t always give you the precision needed for actual tasks.

For the workflow I will build here, I already know how the APIs should be used — so it’s my job to guide the system to use it the right way.

We’ll go through comparing a single-agent setup with a hierarchical multi-agent system, where a lead agent delegates tasks across a small team so you can see how they behave in practice.

Building a Single Agent Workflow

With a single thread — i.e., one agent — we give an LLM access to several tools. It’s up to the agent to decide which tool to use and when, based on the user’s question.

One LLM/Agent has access to many tools with many options | Image by author

The challenge with a single agent is control.

No matter how detailed the system prompt is, the model may not follow our requests (this can happen in more controlled environments too). If we give it too many tools or options, there’s a good chance it won’t use all of them or even use the right ones.

To illustrate this, we’ll build a tech news agent that has access to several API endpoints serving custom data, with several options as parameters in each tool. It’s up to the agent to decide how many of them to use and how to set up the final summary.

Remember, I build these workflows using LangGraph. I won’t go into LangGraph in depth here, so if you want to learn the basics to be able to tweak the code, go here.
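For orientation, a single-agent setup in LangGraph usually boils down to something like the sketch below. This is not the repo's actual code: the two tool functions are hypothetical stand-ins for the news API endpoints, and the model is swapped to ChatOpenAI for brevity even though the repo uses Gemini Flash 2.0.

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

@tool
def get_trending_keywords(category: str) -> list[str]:
    """Return this week's trending tech keywords for a category (stubbed data)."""
    return ["AI", "Google", "LLMs"]

@tool
def get_sources(keyword: str) -> list[str]:
    """Return source articles for a keyword (stubbed data)."""
    return [f"https://example.com/news/{keyword}"]

llm = ChatOpenAI(model="gpt-4o-mini")
agent = create_react_agent(llm, [get_trending_keywords, get_sources])

result = agent.invoke({"messages": [("user", "Summarize this week's tech news about AI.")]})
print(result["messages"][-1].content)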

You can find the single-agent workflow here. To run it, you’ll need LangGraph Studio and the latest version of Docker installed.

Once you’re set up, open the project folder on your computer, add your GOOGLE_API_KEY in a .env file, and save. You can get a key from Google here.

Gemini Flash 2.0 has a generous free tier, so running this shouldn’t cost anything (but you may run into errors if you use it too much).

If you want to switch to another LLM or tools, you can tweak the code directly. But, again, remember the LLM needs to be compatible.

After setup, launch LangGraph Studio and select the correct folder.

This will boot up our workflow so we can test it.

Opening LangGraph Studio | Image by author

If you run into issues booting this up, double-check that you’re using the latest version of Docker.

Once it’s loaded, you can test the workflow by entering a human message and hitting submit.

LangGraph Studio opening the single agent workflow | Image by author

You can see me run the workflow below.

LangGraph Studio running the single agent workflow | Image by author

You can see the final response below.

LangGraph Studio finishing the single agent workflow | Image by author

For this prompt, it decided to check weekly trending keywords filtered by the category ‘companies’ only, then fetched the sources for those keywords and summarized them for us.

It also had some issues producing a unified summary: it simply used the information it got last and failed to use all of the research.

In reality we want it to fetch both trending and top keywords within several categories (not just companies), check sources, track specific keywords, and reason and summarize it all nicely before returning a response.

We can of course keep probing it with follow-up questions, but as you can imagine, if we need something more complex it starts taking shortcuts in the workflow.

The key thing is, an agent system isn’t just gonna think the way we expect; we have to actually orchestrate it to do what we want.

So a single agent is great for something simple but as you can imagine it may not think or behave as we are expecting.

This is why going for a more complex system where each agent is responsible for one thing can be really useful.

Testing a Multi-Agent Workflow

Building multiagent workflows is a lot more difficult than building a single agent with access to some tools. To do this, you need to carefully think about the architecture beforehand and how data should flow between the agents.

The multi-agent workflow I’ll set up here uses two different teams — a research team and an editing team — with several agents under each.

Every agent has access to a specific set of tools.

The multiagent workflow logic with a hierarchical team | Image by author

We’re introducing some new tools, like a research pad that acts as a shared space — one team writes their findings, the other reads from it. The last LLM will read everything that has been researched and edited to make a summary.

An alternative to using a research pad is to store data in a scratchpad in state, isolating short-term memory for each team or agent. But that also means thinking carefully about what each agent’s memory should include.

I also decided to build out the tools a bit more to provide richer data upfront, so the agents don’t have to fetch sources for each keyword individually. Here I’m using normal programmatic logic because I can.

A key thing to remember: if you can use normal programming logic, do it.

Since we’re using multiple agents, you can lower costs by using cheaper models for most agents and reserving the more expensive ones for the important stuff.

Here, I’m using Gemini Flash 2.0 for all agents except the summarizer, which runs on OpenAI’s GPT-4o. If you want higher-quality summaries, you can use an even more advanced LLM with a larger context window.

The workflow is set up for you here. Before loading it, make sure to add both your OpenAI and Google API keys in a .env file.

In this workflow, the routes (edges) are set up dynamically instead of manually as we did with the single agent. It’ll look more complex if you peek into the code.
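As a rough illustration of what dynamic edges mean here (a sketch with made-up node names, not the repo's code): a supervisor node writes the name of the next worker into state, and a conditional edge reads that value at runtime to decide where to route.

from typing import TypedDict
from langgraph.graph import StateGraph, END

class TeamState(TypedDict):
    messages: list
    next: str

def supervisor(state: TeamState) -> dict:
    # In the real workflow an LLM decides; here we route on a simple flag.
    return {"next": "research_team" if not state["messages"] else "editing_team"}

def research_team(state: TeamState) -> dict:
    return {"messages": state["messages"] + ["research notes"]}

def editing_team(state: TeamState) -> dict:
    return {"messages": state["messages"] + ["edited summary"]}

builder = StateGraph(TeamState)
builder.add_node("supervisor", supervisor)
builder.add_node("research_team", research_team)
builder.add_node("editing_team", editing_team)
builder.set_entry_point("supervisor")
builder.add_conditional_edges(
    "supervisor",
    lambda s: s["next"],
    {"research_team": "research_team", "editing_team": "editing_team"},
)
builder.add_edge("research_team", "supervisor")
builder.add_edge("editing_team", END)
graph = builder.compile()
print(graph.invoke({"messages": [], "next": ""}))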

Once you boot up the workflow in LangGraph Studio — same process as before — you’ll see the graph with all these nodes ready.

Opening the multiagent workflow in LangGraph Studio | Image by author

LangGraph Studio lets us visualize how the system delegates work between agents when we run it—just like we saw in the simpler workflow above.

Since I understand the tools each agent is using, I can prompt the system in the right way. But regular users won’t know how to do this properly. So if you’re building something similar, I’d suggest introducing an agent that transforms the user’s query into something the other agents can actually work with.

We can test it out by setting a message.

“I’m an investor and I’m interested in getting an update for what has happened within the week in tech, and what people are talking about (this means categories like companies, people, websites and subjects are interesting). Please also track these specific keywords: AI, Google, Microsoft, and Large Language Models”

Then choosing “supervisor” as the Next parameter (we’d normally do this programmatically).

Running the multiagent workflow in LangGraph Studio — it will take several minutes | Image by author

This workflow will take several minutes to run, unlike the single-agent workflow we ran earlier which finished in under a minute.

So be patient while the tools are running.

In general, these systems take time to gather and process information and that’s just something we need to get used to.

The final summary will look something like this:

The result from the multiagent workflow in LangGraph Studio | Image by author

You can read the whole thing here instead if you want to check it out.

The news will obviously vary depending on when you run the workflow. I ran it on the 28th of March, so the example report is for that date.

It should save the summary to a text document, but if you’re running this inside a container, you likely won’t be able to access that file easily. It’s better to send the output somewhere else — like Google Docs or via email.

As for the results, I’ll let you decide for yourself the difference between using a more complex system versus a simple one, and how it gives us more control over the process.

Finishing Notes

I’m working with a good data source here. Without that, you’d need to add a lot more error handling, which would slow everything down even more.

Clean and structured data is key. Without it, the LLM won’t perform at its best.

Even with solid data, it’s not perfect. You still need to work on the agents to make sure they do what they’re supposed to.

You’ve probably already noticed the system works — but it’s not quite there yet.

There are still several things that need improvement: parsing the user’s query into a more structured format, adding guardrails so agents always use their tools, summarizing more effectively to keep the research doc concise, improving error handling, and introducing long-term memory to better understand what the user actually needs.

State (short-term memory) is especially important if you want to optimize for performance and cost.

Right now, we’re just pushing every message into state and giving all agents access to it, which isn’t ideal. We really want to separate state between the teams. In this case, it’s something I haven’t done, but you can try it by introducing a scratchpad in the state schema to isolate what each team knows.
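One way to sketch that separation (the names are illustrative, not from the repo): keep the shared messages, but give each team its own scratchpad key and only hand each team's agents their own slice of the state.

from typing import TypedDict

class WorkflowState(TypedDict):
    messages: list            # shared conversation history
    research_scratchpad: str  # short-term memory for the research team only
    editing_scratchpad: str   # short-term memory for the editing team only

def run_research_step(state: WorkflowState) -> dict:
    # The research team only sees (and updates) its own scratchpad.
    notes = state["research_scratchpad"] + "\n- new finding from the news API"
    return {"research_scratchpad": notes}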

Regardless, I hope it was a fun experience to understand the results we can get by building different Agentic Workflows.

If you want to see more of what I’m working on, you can follow me here, and also on Medium, GitHub, or LinkedIn (though I’m hoping to move over to X soon). I also have a Substack, where I hope to publish shorter pieces.

❤

The post Agentic AI: Single vs Multi-Agent Systems appeared first on Towards Data Science.

Data Science: From School to Work, Part III https://towardsdatascience.com/data-science-from-school-to-work-part-iii/ Fri, 28 Mar 2025 01:07:39 +0000 https://towardsdatascience.com/?p=605317 Good practices for Python error handling and logging

The post Data Science: From School to Work, Part III appeared first on Towards Data Science.

Introduction

Writing code is about solving problems, but not every problem is predictable. In the real world, your software will encounter unexpected situations: missing files, invalid user inputs, network timeouts, or even hardware failures. This is why handling errors isn’t just a nice-to-have; it’s a critical part of building robust and reliable applications for production.

Imagine an e-commerce website. A customer places an order, but during the checkout process, a database connection issue occurs. Without proper Error Handling, this issue could cause the application to crash, leaving the customer frustrated and the transaction incomplete. Worse, it might create inconsistent data, leading to even bigger problems down the line. Thus, error handling is a fundamental skill for any Python developer who wants to write code for production.

However, good error handling also goes hand in hand with a good logging system. It’s rare to have access to the console when the code is running in production. So there’s no chance of your print being seen by anyone. To ensure that you can monitor your application and investigate any incidents, you need to set up a logging system. This is where the loguru package comes into play, which I will introduce in this article.


I – How to handle Python errors?

In this part I present the best practices of error handling in Python, from try-except blocks and the use of raise to the finally statement. These concepts will help you write cleaner, more maintainable code that is suitable for a production environment.

The try-except blocks

The try-except block is the main tool for handling errors in Python. It allows you to catch potential errors during code execution and prevent the program from crashing.

def divide(a, b):
  try:
    return a / b
  except ZeroDivisionError:
    print(f"Only Chuck Norris can divide by 0!")

In this trivial function, the try-except block intercepts the error caused by a division by 0. The code in the try block is executed, and if an error occurs, the except block checks whether it is a ZeroDivisionError and prints a message. But only this type of error is caught. For example, if b is a string, a different error occurs. To handle it, you can add an except clause for TypeError. So, it is important to anticipate all the errors that can occur.

The function becomes:

def divide(a, b):
    try:
        return a / b
    except ZeroDivisionError:
        print(f"Only Chuck Norris can divide by 0!")
    except TypeError:
        print("Do not compare apples and orange!")

Raise an exception

You can use the raise statement to manually raise an exception. This is useful if you want to report a user-defined error or impose a specific restriction on your code.

def divide(a, b):
    if b == 0:
        raise ValueError("Only Chuck Norris can divide by 0!")
    return a / b


try:
    result = divide(10, 0)
except ValueError as e:
    print(f"Error: {e}")
except TypeError:
    print("Do not compare apples and orange!")

In this example, a ValueError exception is triggered if the divisor is zero. In this way, you can explicitly control the error conditions. In the print function, the message will be “Error: Only Chuck Norris can divide by 0!“.

Some of the most common exceptions

ValueError: The type of a value is correct but its value is invalid.

import math

try:
    number = math.sqrt(-10)
except ValueError:
    print("It's too complex to be real!")

KeyError: Trying to access a key that doesn’t exist in a dictionary.

data = {"name": "Alice"}
try:
    age = data["age"]
except KeyError:
    print("Never ask a lady her age!")

IndexError: Trying to access a non-existent index in a list.

items = [1, 2, 3]
try:
    print(items[3])
except IndexError:
    print("You forget that indexing starts at 0, don't you?")

TypeError: Performing an operation on incompatible types.

try:
    result = "text" + 5
except TypeError:
    print("Do not compare apples and orange!")

FileNotFoundError: Trying to open a non-existing file.

try:
    with open("notexisting_file.txt", "r") as file:
        content = file.read()
except FileNotFoundError:
    print("Are you sure of your path?")

Custom Error: You can trigger predefined exceptions or also define your own exception classes:

class CustomError(Exception):
    pass

try:
    raise CustomError("This is a custom error")
except CustomError as e:
    print(f"Catched error: {e}")

Clean with the finally statement

The finally block is executed in every case, regardless of whether the error has occurred or not. It is often used to perform cleanup actions, such as closing a connection to a database or releasing resources.

import sqlite3

try:
    conn = sqlite3.connect("users_db.db")  # Connect to a database
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")  # Execute a query
    results = cursor.fetchall()  # Get result of the query
    print(results)
except sqlite3.DatabaseError as e:
    print("Database error:", e)
finally:
    print("Closing the database connection.")
    if 'conn' in locals():
        conn.close()  # Ensures the connection is closed

Best practices for error handling

  1. Catch specific exceptions: Avoid using a generic except block without specifying an exception, as it may mask unexpected errors. Prefer specifying the exception:
# Bad practice
try:
    result = 10 / 0
except Exception as e:
    print(f"Error: {e}")

# Good practice
try:
    result = 10 / 0
except ZeroDivisionError as e: 
    print(f"Error: {e}")
  2. Provide explicit messages: Add clear and descriptive messages when raising or handling exceptions.
  3. Avoid silent failures: If you catch an exception, ensure it is logged or re-raised so it doesn’t go unnoticed (see the sketch after this list).
import logging

logging.basicConfig(level=logging.ERROR)

try:
    result = 10 / 0
except ZeroDivisionError:
    logging.error("Division by zero detected.")
  4. Use else and finally blocks: The else block runs only if no exception is raised in the try block.
try:
    result = 10 / 2
except ZeroDivisionError:
    logging.error("Division by zero detected.")
else:
    logging.info(f"Success: {result}")
finally:
    logging.info("End of processing.")

II – How to handle Python logs?

Good error handling is one thing, but if no one knows that an error has occurred, the whole point is lost. As explained in the introduction, the console is rarely consulted, or even visible, when a program is running in production. No one will see your print statements. Therefore, good error handling must be accompanied by a good logging system.

What are logs?

Logs are records of messages generated by a program to track the events that occur during its execution. These messages may contain information about errors, warnings, successful actions, process milestones or other relevant events. Logs are essential for debugging, tracking performance and monitoring the health of an application. They allow developers to understand what is going on in a program without having to interrupt its execution, making it easier to solve problems and continuously improve the software.

The loguru package

Python already has a native logging package: logging. But we prefer the loguru package, which is much simpler to use and easier to configure. In fact, complete output formatting is already preconfigured.

from loguru import logger
logger.debug("A pretty debug message!")
Image from author.

All the important elements are included directly in the message:

  • Time stamp
  • Log level, indicating the seriousness of the message.
  • File location, module and line number. In this example, the file location is __main__ because the script was executed directly from the command line. The module is <module> because the log call is not located in a class or function.
  • The message.

The different logging levels

There are several log levels to take into account the importance of the message displayed (which is more complicated in a print). Each level has a name and an associated number:

  • TRACE (5): used to record detailed information on the program’s execution path for diagnostic purposes.
  • DEBUG (10): used by developers to record messages for debugging purposes.
  • INFO (20): used to record information messages describing normal program operation.
  • SUCCESS (25): similar to INFO, but used to indicate the success of an operation.
  • WARNING (30): used to indicate an unusual event that may require further investigation.
  • ERROR (40): used to record error conditions that have affected a specific operation.
  • CRITICAL (50): used to record error conditions that prevent a main function from working.

The package naturally applies different formatting depending on the level used:

from loguru import logger

logger.trace("A trace message.")
logger.debug("A debug message.")
logger.info("An information message.")
logger.success("A success message.")
logger.warning("A warning message.")
logger.error("An error message.")
logger.critical("A critical message.")
Image from author.

The trace message was not displayed because the default minimum level used by loguru is debug. It therefore ignores all messages at lower levels.

It is possible to define new log levels with the level method and to use them with the log method:

logger.level("FATAL", no=60, color="<red>", icon="!!!")
logger.log("FATAL", "A FATAL event has just occurred.")
  • name : the name of the log level.
  • no : the corresponding severity value (must be an integer).
  • color : color markup.
  • icon : the level icon.

The logger configuration

It is possible to recreate a logger with a new configuration by deleting the old one with the remove method and adding a new sink with the add function. This function takes the following arguments:

  • sink [mandatory]: specifies a destination for each record created by the logger. By default, this value is set to sys.stderr (which corresponds to the standard error output). We can also store all output in a “.log” file (unless you have a log collector).
  • level: Sets the minimum logging level for the recorder.
  • format: is useful to define a custom format for your logs. To maintain the coloring of the logs in the terminal, this must be specified (see example below).
  • filter: is used to determine whether a log should be recorded or not.
  • colorize: takes a boolean value and determines whether the terminal coloring should be activated or not.
  • serialize: causes the log to be displayed in JSON format if it is set to True.
  • backtrace: determines whether the exception trace should go beyond the point at which the error was recorded in order to facilitate troubleshooting.
  • diagnose: Determines whether variable values should be displayed in the exception trace. This option must be set to False in production environments so that no sensitive information is leaked.
  • enqueue: If this option is activated, the log data records are placed in a queue to avoid conflicts if several processes connect to the same target.
  • catch: If an unexpected error occurs when logging to the specified sink, you can catch it by setting this option to True. The error will be printed to the standard error output.
import sys
from loguru import logger

logger_format = (
    "{time:YYYY-MM-DD HH:mm:ss.SSS} | "
    "{level: <8} | "
    "{name}:{function}:{line}"
)
logger.remove()
logger.add(sys.stderr, format=logger_format)
    

Note:
Colors disappear in a file. This is because colors in the terminal are produced by special characters (called ANSI codes), and this formatting does not exist in files.

Add context to logs

For complex applications, it can be useful to add further information to the logs to enable sorting and facilitate troubleshooting.

For example, if a user changes a database, it can be useful to have the user ID in addition to the change information.

Before you start recording context data, you need to make sure that the {extra} directive is included in your custom format. This variable is a Python dictionary that contains context data for each log entry (if applicable).

Here is an example of a customization where an extra user_id is added. In this format, color markup tags (such as <green> and <cyan>) keep the terminal output colorized.

import sys
from loguru import logger


logger_format = (
    "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
    "<level>{level: <8}</level> | "
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
    "User ID: {extra[user_id]} - <level>{message}</level>"
)
logger.configure(extra={"user_id": ""})  # Default value
logger.remove()
logger.add(sys.stderr, format=logger_format)

It is now possible to use the bind method to create a child logger inheriting all the data from the parent logger.

childLogger = logger.bind(user_id="001")
childLogger.info("Here a message from the child logger")

logger.info("Here a message from the parent logger")
Image from author.

Another way to do this is to use the contextualize method in a with block.

with logger.contextualize(user_id="001"):
    logger.info("Here a message from the logger with user_id 001")

logger.info("Here a message from the logger without user_id")
Image from author.

Instead of the with block, you can use a decorator. The preceding code then becomes

@logger.contextualize(user_id="001")
def child_logger():
    logger.info("Here a message from the logger with user_id 001")

child_logger()

logger.info("Here a message from the logger without user_id")

The catch method

Errors can be automatically logged when they occur using the catch method.

def test(x):
    50/x

with logger.catch():
    test(0)
Image from author.

But it’s simpler to use this method as a decorator. This results in the following code

@logger.catch()
def test(x):
    50/x

test(0)

The log file

A production application is designed to run continuously and without interruption. In some cases, it is important to control how the log file grows and rotates, otherwise you will end up combing through pages of logs in the event of an error.

Here are the different conditions under which a file can be modified:

  • rotation: specifies a condition under which the current log file is closed and a new file is created. This condition can be an int, a datetime or a str. Str is recommended as it is easier to read.
  • retention: specifies how long each log file should be kept before it is deleted from the file system.
  • compression: The log file is converted to the specified compression format if this option is activated.
  • delay: If this option is set to True, the creation of a new log file is delayed until the first log message has been pushed.
  • mode, buffering, encoding : Parameters that are passed to the Python function open and determine how Python opens log files. 
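Putting these options together, a file sink could look like the following sketch (the values are illustrative):

from loguru import logger

logger.add(
    "app.log",
    rotation="10 MB",      # start a new file once the current one reaches 10 MB
    retention="30 days",   # delete rotated files older than 30 days
    compression="zip",     # compress rotated files
    delay=True,            # create the file only when the first message is logged
    enqueue=True,          # queue writes so multiple processes can log safely
)
logger.info("This message goes to app.log")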

Note:
Usually, in the case of a production application, a log collector will be set up to retrieve the app’s outputs directly. It is therefore not necessary to create a log file.


Conclusion

Error handling in Python is an important step in writing professional and reliable code. By combining try-except blocks, the raise statement, and the finally block, you can handle errors predictably while maintaining readable and maintainable code.

Moreover, a good logging system improves the ability to monitor and debug your application. Loguru provides a simple and flexible package for logging messages and can therefore be easily integrated into your codebase.

In summary, combining effective error handling with a comprehensive logging system can significantly improve the reliability, maintainability, and debugging capability of your Python applications.


References

1 – Error handling in Python: official Python documentation on exceptions

2 – The loguru documentation: https://loguru.readthedocs.io/en/stable/

3 – Guide about loguru: https://betterstack.com/community/guides/logging/loguru/

The post Data Science: From School to Work, Part III appeared first on Towards Data Science.

A Clear Intro to MCP (Model Context Protocol) with Code Examples https://towardsdatascience.com/clear-intro-to-mcp/ Tue, 25 Mar 2025 18:03:19 +0000 https://towardsdatascience.com/?p=605246 MCP is a way to democratize access to tools for AI Agents. In this article we cover the fundamental components of MCP, how they work together, and a code example of how MCP works in practice.

The post A Clear Intro to MCP (Model Context Protocol) with Code Examples appeared first on Towards Data Science.

As the race to move AI agents from prototype to production heats up, the need for a standardized way for agents to call tools across different providers is pressing. This transition to a standardized approach to agent tool calling is similar to what we saw with REST APIs. Before they existed, developers had to deal with a mess of proprietary protocols just to pull data from different services. REST brought order to chaos, enabling systems to talk to each other in a consistent way. MCP (Model Context Protocol) is aiming to, as it sounds, provide context for AI models in a standard way. Without it, we’re headed towards tool-calling mayhem where multiple incompatible versions of “standardized” tool calls crop up simply because there’s no shared way for agents to organize, share, and invoke tools. MCP gives us a shared language and the democratization of tool calling.

One thing I’m personally excited about is how tool-calling standards like MCP can actually make AI systems safer. With easier access to well-tested tools, more companies can avoid reinventing the wheel, which reduces security risks and minimizes the chance of malicious code. As AI systems start scaling in 2025, these are valid concerns.

As I dove into MCP, I realized a huge gap in documentation. There’s plenty of high-level “what does it do” content, but when you actually want to understand how it works, the resources start to fall short—especially for those who aren’t native developers. It’s either high level explainers or deep in the source code.

In this piece, I’m going to break MCP down for a broader audience—making the concepts and functionality clear and digestible. If you’re able, follow along in the coding section, if not it will be well explained in natural language above the code snippets.

An Analogy to Understand MCP: The Restaurant

Let’s imagine the concept of MCP as a restaurant where we have:

The Host = The restaurant building (the environment where the agent runs)

The Server = The kitchen (where tools live)

The Client = The waiter (who sends tool requests)

The Agent = The customer (who decides what tool to use)

The Tools = The recipes (the code that gets executed)

The Components of MCP

Host
This is where the agent operates. In our analogy, it’s the restaurant building; in MCP, it’s wherever your agents or LLMs actually run. If you’re using Ollama locally, you’re the host. If you’re using Claude or GPT, then Anthropic or OpenAI are the hosts.

Client

This is the environment that sends tool call requests from the agent. Think of it as the waiter who takes your order and delivers it to the kitchen. In practical terms, it’s the application or interface where your agent runs. The client passes tool call requests to the Server using MCP.

Server

This is the kitchen where recipes, or tools, are housed. It centralizes tools so agents can access them easily. Servers can be local (spun up by users) or remote (hosted by companies offering tools). Tools on a server are typically either grouped by function or integration. For instance, all Slack-related tools can be on a “Slack server,” or all messaging tools can be grouped together on a “messaging server”. That decision is based on architectural and developer preferences.

Agent

The “brains” of the operation. Powered by an LLM, it decides which tools to call to complete a task. When it determines a tool is needed, it initiates a request to the server. The agent doesn’t need to natively understand MCP because it learns how to use it through the metadata associated with each of the tools. This metadata tells the agent the protocol for calling the tool and the execution method. But it is important to note that the platform or agent needs to support MCP so that it handles tool calls automatically. Otherwise it is up to the developer to write the complex translation logic: parsing the metadata from the schema, forming tool call requests in MCP format, mapping the requests to the correct function, executing the code, and returning the result in MCP-compliant format back to the agent.

Tools

These are the functions, such as calling APIs or custom code, that “does the work”. Tools live on servers and can be:

  • Custom tools you create and host on a local server.
  • Premade tools hosted by others on a remote server.
  • Premade code created by others but hosted by you on a local server.

How the components fit together

  1. Server Registers Tools
    Each tool is defined with a name, description, input/output schemas, a function handler (the code that runs) and registered to the server. This usually involves calling a method or API to tell the server “hey, here’s a new tool and this is how you use it”.
  2. Server Exposes Metadata
    When the server starts or an agent connects, it exposes the tool metadata (schemas, descriptions) via MCP.
  3. Agent Discovers Tools
    The agent queries the server (using MCP) to see what tools are available. It understands how to use each tool from the tool metadata. This typically happens on startup or when tools are added.
  4. Agent Plans Tool Use
    When the agent determines a tool is needed (based on user input or task context), it forms a tool call request in a standardized MCP JSON format which includes tool name, input parameters that match the tool’s input schema, and any other metadata. The client acts as the transport layer and sends the MCP formatted request to the server over HTTP.
  5. Translation Layer Executes
    The translation layer takes the agent’s standardized tool call (via MCP), maps the request to the corresponding function on the server, executes the function, formats the result back to MCP, and sends it back to the agent. A framework that abstracts MCP for you does all of this without the developer needing to write the translation layer logic (which sounds like a headache).
Image by Sandi Besen
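Before the client-side example below, it may help to see what step 1 (“Server Registers Tools”) can look like in code. The sketch below uses the FastMCP helper from the official MCP Python SDK, and the tool itself is a made-up placeholder that sticks with the restaurant analogy:

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("demo-kitchen")  # the "kitchen" where tools live

@mcp.tool()
def dish_of_the_day(cuisine: str) -> str:
    """Return today's special for a given cuisine (placeholder logic)."""
    return f"Today's {cuisine} special is a taco platter."

if __name__ == "__main__":
    # Serve over stdio so an MCP client (the "waiter") can connect,
    # discover this tool's name, description, and input schema, and call it.
    mcp.run(transport="stdio")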

Code Example of A Re-Act Agent Using MCP Brave Search Server

In order to understand what MCP looks like when applied, let’s use the beeAI framework from IBM, which natively supports MCP and handles the translation logic for us.

 If you plan on running this code you will need to:

  1. Clone the beeai framework repo to gain access to the helper classes used in this code 
  2. Create a free Brave developer account and get your API key. There are free subscriptions available (credit card required). 
  3. Create an OpenAI developer account and create an API Key
  4. Add your Brave API key and OpenAI key to the .env file at the python folder level of the repo.
  5. Ensure you have npm installed and have set your path correctly.

Sample .env file

BRAVE_API_KEY= "<Your API Key Here>"
BEEAI_LOG_LEVEL=INFO
OPENAI_API_KEY= "<Your API Key Here>"

Sample mcp_agent.ipynb

1. Import the necessary libraries

import asyncio
import logging
import os
import sys
import traceback
from typing import Any
from beeai_framework.agents.react.runners.default.prompts import SystemPromptTemplate
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from beeai_framework import Tool
from beeai_framework.agents.react.agent import ReActAgent
from beeai_framework.agents.types import AgentExecutionConfig
from beeai_framework.backend.chat import ChatModel, ChatModelParameters
from beeai_framework.emitter.emitter import Emitter, EventMeta
from beeai_framework.errors import FrameworkError
from beeai_framework.logger import Logger
from beeai_framework.memory.token_memory import TokenMemory
from beeai_framework.tools.mcp_tools import MCPTool
from pathlib import Path
from beeai_framework.adapters.openai.backend.chat import OpenAIChatModel
from beeai_framework.backend.message import SystemMessage

2. Load the environment variables and set the system path (if needed)

import os
from dotenv import load_dotenv

# Absolute path to your .env file
# sometimes the system can have trouble locating the .env file
env_path = <Your path to your .env file>
# Load it
load_dotenv(dotenv_path=env_path)

# Get current working directory
path = <Your path to your current python directory> #...beeai-framework/python'
# Append to sys.path
sys.path.append(path)

3. Configure the logger

# Configure logging - using DEBUG instead of trace
logger = Logger("app", level=logging.DEBUG)

4. Load helper functions like process_agent_events and observer, and create an instance of ConsoleReader

  • process_agent_events: Handles agent events and logs messages to the console based on the event type (e.g., error, retry, update). It ensures meaningful output for each event to help track agent activity.
  • observer: Listens for all events from an emitter and routes them to process_agent_events for processing and display.
  • ConsoleReader: Manages console input/output, allowing user interaction and formatted message display with color-coded roles.
#load console reader
from examples.helpers.io import ConsoleReader
#this is a helper function that makes the assistant chat easier to read
reader = ConsoleReader()

def process_agent_events(data: dict[str, Any], event: EventMeta) -> None:
  """Process agent events and log appropriately"""

  if event.name == "error":
      reader.write("Agent 🤖 : ", FrameworkError.ensure(data["error"]).explain())
  elif event.name == "retry":
      reader.write("Agent 🤖 : ", "retrying the action...")
  elif event.name == "update":
      reader.write(f"Agent({data['update']['key']}) 🤖 : ", data["update"]["parsedValue"])
  elif event.name == "start":
      reader.write("Agent 🤖 : ", "starting new iteration")
  elif event.name == "success":
      reader.write("Agent 🤖 : ", "success")
  else:
      print(event.path)

def observer(emitter: Emitter) -> None:
  emitter.on("*.*", process_agent_events)

5. Set the Brave API Key and server parameters.

Anthropic has a list of MCP servers here.

brave_api_key = os.environ["BRAVE_API_KEY"]

brave_server_params = StdioServerParameters(
  command="/opt/homebrew/bin/npx",  # Full path to be safe
  args=[
      "-y",
      "@modelcontextprotocol/server-brave-search"
  ],
  env={
      "BRAVE_API_KEY": brave_api_key,
        "x-subscription-token": brave_api_key
  },
)

6. Create the brave tool that initiates the connection to the MCP server, discovers the tools, and returns them to the agent so it can decide which tool is appropriate to call for a given task. 

In this case 2 tools are discoverable on the Brave MCP Server:

  • brave_web_search: Execute web searches with pagination and filtering
  • brave_local_search: Search for local businesses and services
async def brave_tool() -> MCPTool:
  brave_env = os.environ.copy()
  brave_server_params = StdioServerParameters(
      command="/opt/homebrew/bin/npx",
      args=["-y", "@modelcontextprotocol/server-brave-search"],
      env=brave_env
  )

  print("Starting MCP client...")
  try:
      async with stdio_client(brave_server_params) as (read, write), ClientSession(read, write) as session:
          print("Client connected, initializing...")

          await asyncio.wait_for(session.initialize(), timeout=10)
          print("Initialized! Discovering tools...")

          bravetools = await asyncio.wait_for(
              MCPTool.from_client(session, brave_server_params),
              timeout=10
          )
          print("Tools discovered!")
          return bravetools
  except asyncio.TimeoutError as e:
      print("❌ Timeout occurred during session initialization or tool discovery.")
  except Exception as e:
      print("❌ Exception occurred:", e)
      traceback.print_exc()

(Optional) Check the connection to the MCP server and ensure it returns all the available tools before providing it to the agent.

tools = await brave_tool()
print("Discovered tools:", tools)

for tool in tools:
  print(f"Tool Name: {tool.name}")
  print(f"Description: {getattr(tool, 'description', 'No description available')}")
  print("-" * 30)

OUTPUT:

Starting MCP client...

Client connected, initializing...

Initialized! Discovering tools...

Tools discovered!

Discovered tools: [<beeai_framework.tools.mcp_tools.MCPTool object at 0x119aa6c00>, <beeai_framework.tools.mcp_tools.MCPTool object at 0x10fee3e60>]

Tool Name: brave_web_search

Description: Performs a web search using the Brave Search API, ideal for general queries, news, articles, and online content. Use this for broad information gathering, recent events, or when you need diverse web sources. Supports pagination, content filtering, and freshness controls. Maximum 20 results per request, with offset for pagination. 

------------------------------

Tool Name: brave_local_search

Description: Searches for local businesses and places using Brave's Local Search API. Best for queries related to physical locations, businesses, restaurants, services, etc. Returns detailed information including:

- Business names and addresses

- Ratings and review counts

- Phone numbers and opening hours

Use this when the query implies 'near me' or mentions specific locations. Automatically falls back to web search if no local results are found.

7. Write the function that creates the agent:  

  • assign an LLM
  • create an instance of the brave_tool() function and assign it to a tools variable
  • create a re-act agent and assign it the chosen llm, tools, and memory (so it can have a continuous conversation)
  • Add a system prompt to the re-act agent.  

Note: You might notice that I added a sentence to the system prompt that reads “If you need to use the brave_tool you must use a count of 5.” This is a band-aid workaround because of a bug I found in the index.ts file of the Brave server. I will contribute to the repo to fix it.

async def create_agent() -> ReActAgent:
  """Create and configure the agent with tools and LLM"""
  #using openai api instead
  llm = OpenAIChatModel(model_id="gpt-4o")
 
  # Configure tools
  tools: list[Tool] = await brave_tool()
  #tools: list[Tool] = [await brave_tool()]

  # Create agent with memory and tools
  agent = ReActAgent(llm=llm, tools=tools, memory=TokenMemory(llm), )
 
  await agent.memory.add(SystemMessage(content="You are a helpful assistant. If you need to use the brave_tool you must use a count of 5."))

  return agent

8. Create the main function

  • Creates the agent
  • Enters a conversation loop with the user and runs the agent with the user prompt and some configuration settings. Finishes the conversation if the user types “exit” or “quit”.
import asyncio
import traceback
import sys

# Your async main function
async def main() -> None:
  """Main application loop"""

  # Create agent
  agent = await create_agent()

  # Main interaction loop with user input
  for prompt in reader:
      # Exit condition
      if prompt.strip().lower() in {"exit", "quit"}:
          reader.write("Session ended by user. Goodbye! 👋n")
          break

      # Run agent with the prompt
      try:
          response = await agent.run(
              prompt=prompt,
              execution=AgentExecutionConfig(max_retries_per_step=3, total_max_retries=10, max_iterations=20),
          ).observe(observer)

          reader.write("Agent 🤖 : ", response.result.text)
      except Exception as e:
          reader.write("An error occurred: ", str(e))
          traceback.print_exc()
# Run main() with error handling
try:
  await main()
except FrameworkError as e:
  traceback.print_exc()
  sys.exit(e.explain())

OUTPUT:

Starting MCP client...

Client connected, initializing...

Initialized! Discovering tools...

Tools discovered!

Interactive session has started. To escape, input 'q' and submit.

Agent 🤖 : starting new iteration

Agent(thought) 🤖 : I will use the brave_local_search function to find the open hours for La Taqueria on Mission St in San Francisco.

Agent(tool_name) 🤖 : brave_local_search

Agent(tool_input) 🤖 : {'query': 'La Taqueria Mission St San Francisco'}

Agent(tool_output) 🤖 : [{"annotations": null, "text": "Error: Brave API error: 422 Unprocessable Entityn{"type":"ErrorResponse","error":{"id":"ddab2628-c96e-478f-80ee-9b5f8b1fda26","status":422,"code":"VALIDATION","detail":"Unable to validate request parameter(s)","meta":{"errors":[{"type":"greater_than_equal","loc":["query","count"],"msg":"Input should be greater than or equal to 1","input":"0","ctx":{"ge":1}}]}},"time":1742589546}", "type": "text"}]

Agent 🤖 : starting new iteration

Agent(thought) 🤖 : The function call resulted in an error. I will try again with a different approach to find the open hours for La Taqueria on Mission St in San Francisco.

Agent(tool_name) 🤖 : brave_local_search

Agent(tool_input) 🤖 : {'query': 'La Taqueria Mission St San Francisco', 'count': 5}

Agent(tool_output) 🤖 : [{"annotations": null, "text": "Title: LA TAQUERIA - Updated May 2024 - 2795 Photos & 4678 Reviews - 2889 Mission St, San Francisco, California - Mexican - Restaurant Reviews - Phone Number - YelpnDescription: LA TAQUERIA, <strong>2889 Mission St, San Francisco, CA 94110</strong>, 2795 Photos, Mon - Closed, Tue - Closed, Wed - 11:00 am - 8:45 pm, Thu - 11:00 am - 8:45 pm, Fri - 11:00 am - 8:45 pm, Sat - 11:00 am - 8:45 pm, Sun - 11:00 am - 7:45 pmnURL: https://www.yelp.com/biz/la-taqueria-san-francisco-2nnTitle: La Taqueria: Authentic Mexican Cuisine for Every TastenDescription: La Taqueria - <strong>Mexican Food Restaurant</strong> welcomes you to enjoy our delicious. La Taqueria provides a full-service experience in a fun casual atmosphere and fresh flavors where the customer always comes first!nURL: https://lataqueria.gotoeat.net/nnTitle: r/sanfrancisco on Reddit: Whats so good about La Taqueria in The Mission?nDescription: 182 votes, 208 comments. Don't get me wrong its good but I failed to see the hype. I waited in a long line and once I got my food it just tastes like…nURL: https://www.reddit.com/r/sanfrancisco/comments/1d0sf5k/whats_so_good_about_la_taqueria_in_the_mission/nnTitle: LA TAQUERIA, San Francisco - Mission District - Menu, Prices & Restaurant Reviews - TripadvisornDescription: La Taqueria still going strong. <strong>Historically the most well known Burrito home in the city and Mission District</strong>. Everything is run like a clock. The fillings are just spiced and prepared just right. Carnitas, chicken, asada, etc have true home made flavors. The Tortillas both are super good ...nURL: https://www.tripadvisor.com/Restaurant_Review-g60713-d360056-Reviews-La_Taqueria-San_Francisco_California.htmlnnTitle: La Taqueria – San Francisco - a MICHELIN Guide RestaurantnDescription: San Francisco Restaurants · La Taqueria · 4 · <strong>2889 Mission St., San Francisco, 94110, USA</strong> · $ · Mexican, Regional Cuisine · Visited · Favorite · Find bookable restaurants near me · <strong>2889 Mission St., San Francisco, 94110, USA</strong> · $ · Mexican, Regional Cuisine ·nURL: https://guide.michelin.com/us/en/california/san-francisco/restaurant/la-taqueria", "type": "text"}]

Agent 🤖 : starting new iteration

Agent(thought) 🤖 : I found the open hours for La Taqueria on Mission St in San Francisco. I will provide this information to the user.

Agent(final_answer) 🤖 : La Taqueria, located at 2889 Mission St, San Francisco, CA 94110, has the following opening hours:

- Monday: Closed

- Tuesday: Closed

- Wednesday to Saturday: 11:00 AM - 8:45 PM

- Sunday: 11:00 AM - 7:45 PM

For more details, you can visit their [Yelp page](https://www.yelp.com/biz/la-taqueria-san-francisco-2).

Agent 🤖 : success

Agent 🤖 : success

run.agent.react.finish

Agent 🤖 : La Taqueria, located at 2889 Mission St, San Francisco, CA 94110, has the following opening hours:

- Monday: Closed

- Tuesday: Closed

- Wednesday to Saturday: 11:00 AM - 8:45 PM

- Sunday: 11:00 AM - 7:45 PM

For more details, you can visit their [Yelp page](https://www.yelp.com/biz/la-taqueria-san-francisco-2).

Conclusion, Challenges, and Where MCP is Headed

In this article you’ve seen how MCP can provide a standardized way for agents to discover tools on an MCP server and then interact with them without the developer needing to specify the implementation details of the tool call. The level of abstraction that MCP offers is powerful. It means developers can focus on creating valuable tools while agents can seamlessly discover and use them through standard protocols.

Our Restaurant example helped us visualize how MCP concepts like the host, client, server, agent, and tools work together, each with their own important role. The code example, in which we used a re-act agent in the BeeAI framework (which handles MCP tool calling natively) to call the Brave MCP server with access to two tools, gave a real-world sense of how MCP can be used in practice.

Without protocols like MCP, we face a fragmented landscape where every AI provider implements their own incompatible tool-calling mechanisms, creating complexity, security vulnerabilities, and wasted development effort.

In the coming months, we’ll likely see MCP gain significant traction for several reasons:

  • As more tool providers adopt MCP, the network effect will accelerate adoption across the industry.
  • Standardized protocols mean better testing, fewer vulnerabilities, and reduced risks as AI systems scale.
  • The ability to write a tool once and have it work across multiple agent frameworks will dramatically reduce development overhead.
  • Smaller players can compete by focusing on building excellent tools rather than reinventing complex agent architectures.
  • Organizations can integrate AI agents more confidently knowing they’re built on stable, interoperable standards.

That said, MCP faces important challenges that need addressing as adoption grows:

  • As demonstrated in our code example, agents can only discover tools once connected to a server.
  • The agent’s functionality becomes dependent on server uptime and performance, introducing additional points of failure.
  • As the protocol evolves, maintaining compatibility while adding new features will require governance.
  • Standardizing how agents access potentially sensitive tools across different servers introduces security considerations.
  • The client-server architecture introduces additional latency.

For developers, AI researchers, and organizations building agent-based systems, understanding and adopting MCP now—while being mindful of these challenges—will provide a significant advantage as more AI solutions begin to scale.


Note: The opinions expressed both in this article and paper are solely those of the authors and do not necessarily reflect the views or policies of their respective employers.

Interested in connecting? Drop me a DM on LinkedIn! I’m always eager to engage in food for thought and iterate on my work.

The post A Clear Intro to MCP (Model Context Protocol) with Code Examples appeared first on Towards Data Science.

]]>
What Germany Currently Is Up To, Debt-Wise https://towardsdatascience.com/what-germany-currently-is-up-to-debt-wise/ Fri, 21 Mar 2025 19:30:58 +0000 https://towardsdatascience.com/?p=605206 Billions, visualized to scale using python and HTML

The post What Germany Currently Is Up To, Debt-Wise appeared first on Towards Data Science.

]]>
€1,600 per second. That’s how much interest Germany has to pay on its debts. In total, the German state has debts ranging into the trillions — more than a thousand billion Euros. And the government is planning to take on even more: up to one trillion Euros of additional debt is rumored to follow over the next 10 years.

The numbers involved in governmental finances are so huge that one probably cannot realistically assess just how much even 1 billion Euros or Dollars really is.

In this article, I demonstrate that conventional lists and charts fail to convey a sense of just how much money is at stake when it comes to governmental spending. I then show how a little bit of programming can interactively visualize this money and how it relates to other numbers. I will use Germany as an example, as it currently receives a lot of media coverage and its debt statistics are freely available.

Plain enumeration

To start, we’ll use plain enumeration of the key facts as the first method to (not) put the information into relation. The figures exclude household debts. As we’ll see later, this simple method utterly fails compared to the visualization tools that a few simple scripts provide.

  • €1,600: interest paid per second
  • €25,503: debt per German citizen if state debt is split

And here’s already a large jump for us. We are directly jumping into the billions:

  • €49,5 billion: interest paid per year
  • €100 billion: Sondervermögen (euphemism for debt) for German Army
  • €500 billion: planned additional debt for infrastructure

Now, we are making another jump:

  • €2,11 trillion: total German governmental debt (as of March 2025)

After reading these numbers, we might know a bit more about Germany’s debt. But we hardly have an understanding of how they relate to each other. Yes, we know that €1 billion is a thousand times €1 million. But that’s just common sense. 

We would probably fare better if we could see the numbers visualized side by side. That’s what we will do next.

Linearly scaled charts

Using python and the Matplotlib plotting library, it is straightforward to create a simple chart. (Complete code is linked in this article’s Resource section at the end). 

I picked four numbers to visualize together: €1,600 (because most people know just how much that already is), €25,503 (because it nicely shows the hidden debt that every German carries), €1 billion (because that’s a very large sum, more than even many large companies make per year), and, finally, €49,5 billion (because that’s how much Germany currently needs to spend on interest alone per year, which is more than most countries’ GDP).

import matplotlib.pyplot as plt

# Data
amounts = [1600, 25503, 1e9, 49.5e9]
labels = ['Per-sec. interest', 'Per-person debt', '€1 billion', 'Yearly interest']

plt.figure(figsize=(10, 6))
plt.bar(labels, amounts, color=['orange', 'orange', '#03A9F4', '#ff0000'])
plt.show()

After running this code, we get the following plot:

What we see in an instant: we don’t see the small money. The huge amounts completely dwarf the €1,600. I’d wager to say that anybody reading this has more connection to just €1,000 than to, say, €1 million. We know what €1,000 could afford us. A couple of €1,000 is a good monthly income for most people.

But the chart does not even recognize it.

Is the mistake that I used linearly scaled axes? Let’s see next.

Logarithmically scaled charts

In visualizing the data logarithmically, we’ll stick to python and matplotlib. We merely need to add a single line of code and directly get an updated chart:
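
For reference, this is a minimal sketch of the kind of one-line change meant here, assuming the bar chart from the previous snippet is still the active figure:

# The single added line: switch the y-axis to a logarithmic scale
plt.yscale('log')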

Is it better? To some extent, yes! We can now begin to see the difference between everyday amounts (like the €1,600 interest per second) and the planned spending (i.e., debt).

Thanks to the logarithmic scaling, they appear on the same chart. In this visualization, the chart does not grow linearly, but logarithmically. This means that the spacing between two markers on the y-axis does not represent a fixed, equal increment (like before in the linearly scaled plot). Instead, each step represents a multiplication by a constant factor. In our plot, the spacing is determined by multiplying with 100 (or, adding two trailing zeros).

For our purpose: is such logarithmic scaling better than linear scaling? Yes, definitely.

But, is it sufficient? Can we not do better in trying to convey what Germany’s up to when it plans for €500 billion of additional debt? And, how does this debt relate to other, already existing debts?

Yes, of course we can. Using a little bit of HTML, JavaScript, and some CSS styling, we can quickly create a simple interactive webpage. For a beginner it’s easily doable over a weekend.

A static webpage is all it needs!

Data scientists and programmers wrangle with data day-in, day-out. Tools like Excel and python scripts help them with transforming the data to gain insights.

Sometimes, however, a simple webpage can convey the relationship between numbers better. Especially when we are talking about the huge sums involved in governmental debts.

We start our visualization in HTML, by stacking a few div-elements on top of each other:

...
<div class="debt-wrapper">
     <h2 class="debt-title">€25,503 (Debt per German citizen <em>if total governmental debt is split </em>)</h2>
     <div class="debt one-thousand" data-height="25503"></div>
</div>
<div class="debt-wrapper">
     <h2 class="debt-title">€1 billion</h2>
     <div class="debt billion" data-height="1000000000"></div>
</div>
<div class="debt-wrapper" id="interest-year">
     <div class="debt-header">
       <h2 class="debt-title">€49,5 billion (German interest per year)</h2>
     </div>
     <div class="debt ruler" data-height="49500000000"></div>
</div>
...

For each section, we indicate the amount in € in an HTML attribute.

Next, we will use JavaScript to transform the amounts into an easy-to-grasp-visualization.

For this, we define that each pixel represents €1,000. By using rectangular forms, we can thus represent any amount of money:

document.addEventListener("DOMContentLoaded", function() {
     const wealthBars = document.querySelectorAll(".debt");
     wealthBars.forEach(bar => {
       if (!bar.dataset.scaled) {
         const amount = parseInt(bar.dataset.height) / 1000;
         const width = Math.min(Math.sqrt(amount), 200); // Cap the width in pixels
         const height = amount / width;
         bar.style.width = width + "px";
         bar.style.height = height + "px";
         bar.dataset.scaled = "true";
       }
     });
});

Lastly, we add some CSS styling to make the rendered webpage look well:

.debt-wrapper {
 display: flex;
 flex-direction: column;
 align-items: center;
 margin: 20px 0;
}


.debt-title {
 font-size: 20px;
 margin-bottom: 10px;
}


/* Debt Bars */
.debt {
 position: relative;
 transition: height 0.3s ease-out, width 0.3s ease-out;
 background-color: #ffcc00;
 max-width: 200px; /* Maximum width for bars */
}

Putting all of this together (find the full source code in the Resources section below), we get the following (I added further key numbers that I considered relevant in putting the German debt into proportion):

Visualization by the author. Find it here: https://phrasenmaeher.github.io

Now, that is an easy to understand visualization! You can explore it yourself here: https://phrasenmaeher.github.io.

This simple webpage more accurately represents the huge amount of fresh debt that Germany plans to take on. Using basic programming skills, we show how the debt relates to everyday sums (like €1,600) and existing debt-related costs (like the €49,5 billion of interest per year). Just start scrolling down, and you get a sense of how much money it is. In the above GIF, we have not even scrolled 1% of the entire way down (look at the scroll bar to the right; it barely moves).

Recall that 1 pixel equals €1,000. Even if you are earning €10,000 per month, that’s merely 10 pixels, which is barely noticeable in the debt bars. If you scroll just 1 pixel down, you have uncovered €200,000 of new debt (with the default bar width of 200). Even if you make €1 million (per year), that’s just a mere scrolling of 5 pixels. However much money you make, the visualization demonstrates: it’s literally a drop in the debt ocean.
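
Spelled out (a quick check of the numbers above, not new data): one scrolled pixel row covers 200 px × 1 px × €1,000 per pixel = €200,000, so €1 million corresponds to 1,000,000 / 200,000 = 5 pixel rows of scrolling.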

If you are German, I don’t envy you, and especially not the upcoming generations: somebody has to pay this back, on top of the already existing debts.


Resources

The post What Germany Currently Is Up To, Debt-Wise appeared first on Towards Data Science.

]]>
How to Develop Complex DAX Expressions https://towardsdatascience.com/how-to-develop-complex-dax-expressions/ Tue, 11 Mar 2025 18:39:37 +0000 https://towardsdatascience.com/?p=599491 On the importance of taking some time to thoroughly understand the needs and the possible future use cases for the requested measures

The post How to Develop Complex DAX Expressions appeared first on Towards Data Science.

]]>
At some point or another, any Power BI developer must write complex DAX expressions to analyze data. But nobody tells you how to do it. What is the process for getting there? What is the best way to approach it, and how can a development process support you? These are the questions I will answer here.

Introduction 

Sometimes my clients ask me how I came up with the solution for a specific measure in DAX. My answer is always that I follow a specific process to find a solution. 

Sometimes, the process is not straightforward, and I must deviate or start from scratch when I  see that I have taken the wrong direction. 

But the development process is always the same: 

1. Understand the requirements. 

2. Define the math to calculate the result. 

3. Understand if the measure must work in any or one specific scenario.

4. Start with intermediary results and work my way step-by-step until I fully understand how it should work and can deliver the requested result. 

5. Calculate the final result. 

The third step is the most difficult. 

Sometimes my client asks me to calculate a specific result in a particular scenario. But after I ask again, the answer is: Yes, I will also use it in other scenarios. 

For example, some time ago, a client asked me to create some measures for a specific scenario in a report. I had to do it live during a workshop with the client’s team. 

Days after I delivered the requested results, he asked me to create another report based on the same semantic model and logic we elaborated on during the workshop, but for a more flexible scenario. 

The first set of measures was designed to work tightly with the first scenario, so I didn’t want to change them. Therefore, I created a new set of more generic measures. 

Yes, this is a worst-case scenario, but it is something that can happen. 

This was just an example of how important it is to take some time to thoroughly understand the needs and the possible future use cases for the requested measures. 

Step 1: The requirements 

For this piece, I take one measure from my previous article to calculate the linear extrapolation of my customer count. 

The requirements are:

  • Use the Customer Count Measure as the Basis Measure. 
  • The user can select the year to analyze. 
  • The user can select any other dimension in any Slicer. 
  • The User will analyze the result over time per month. 
  • The past Customer Count should be taken as the input values. 
  • The YTD growth rate must be used as the basis for the result. 
  • Based on the YTD growth rate, the Customer Count should be extrapolated to the end of  the year. 
  • The YTD Customer Count and the Extrapolation must be shown on the same Line-Chart.

The result should look like this for the year 2022: 

Figure 1 – Requested result for the linear extrapolation of the Customer Count (Figure by the Author) 

OK, let’s look at how I developed this measure.

But before doing so, we must understand what the filter context is. 

If you are already familiar with it, you can skip this section. Or you can read it anyway to ensure we are at the same level. 

Interlude: The filter context 

The filter context is the central concept of DAX. 

When writing measures in a semantic model, whether in Power BI, a Fabric semantic model, or an Analysis Services semantic model, you must always understand the current filter context.

The filter context is: 

The sum of all Filters which affect the result of a DAX expression. 

Look at the following picture:

Figure 2 – Ask yourself: What is the Filter Context of the marked cells? (Figure by the Author)

Can you explain the Filter Context of the marked cells?

Now, look at the following picture: 

Figure 3 – All the Filters that affect the Filter Context of the marked cells (Figure by the Author) 

There are six filters that affect the filter context of the marked cells for the two measures “Sum Retail Sales” and “Avg Retail Sales”:

  • The Store “Contoso Paris Store” 
  • The City “Paris” 
  • The ClassName “Economy” 
  • The Month of April 2024 
  • The Country “France” 
  • The Manufacturer “Proseware Inc.” 

The first three filters come from the visual. We can call them “Internal Filters”. They control how the Matrix-Visual can expand and how many details we can see. 

The other filters are “External Filters”, which come from the Slicers or the Filter Pane in Power BI  and are controlled by the user. 

The power of DAX measures lies in the possibility of extracting the value of the filter context and the capability of manipulating the filter context.

We do this when writing DAX expressions: We manipulate the filter context.

Step 2: Intermediary results 

OK, now we are good to go. 

First, I do not start with the Line-Visual, but with a Table or a Matrix Visual. 

This is because it’s easier to check the result as a number than as a line, even though a linear progression only becomes visible as a line.

However, the intermediary results are easier to read in a Matrix.

If you are not familiar with working with Variables in DAX, I recommend reading this piece, where  I explain the concepts for Variables: 

The next step is to define the Base Measure. This is the Measure we want to use to calculate the intended Result. 

As we want to calculate the YTD result, we can use a YTD Measure for the Customer Count: 

Online Customer Count YTD =
VAR YTDDates = DATESYTD('Date'[Date])
RETURN
    CALCULATE(
        DISTINCTCOUNT('Online Sales'[CustomerKey])
        ,YTDDates
    )

Now we must consider what to do with these intermediary results. 

This means that we must define the arithmetic of the Measure. 

For each month, I must calculate the last known Customer Count YTD. 

This means, I always want to calculate 2,091 for each month. This is the last YTD Customer  Count for the year 2022. 

Then, I want to divide this result by the last month with Sales, in this case 6, for June. Then multiply it by the current month number. 
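
To make the arithmetic concrete (a hand calculation using the numbers above, not output from the report): for September 2022 the extrapolated value would be 2,091 / 6 × 9 ≈ 3,137 customers, while for June the formula simply reproduces 2,091.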

Therefore, the first intermediary result is to know when the last Sale was made. We must get the latest date in the Online Sales table for this. 

According to the requirements, the User can select any year to analyze, and the result must be calculated monthly. 

Therefore, the correct definition is: I must first know the month when the last sale was made for the selected year. 

The Fact table contains a date and a Relationship to the Date table, which includes the month number (Column: [Month]).

So, the first variable will be something like this: 

Linear extrapolation Customer Count YTD trend =
// Get the number of months since the start of the year
VAR LastMonthWithData = MAXX('Online Sales'
    ,RELATED('Date'[Month])
)
RETURN
    LastMonthWithData

This is the result: 

Figure 4 – Get the last month with Sales (Figure by the Author) 

Hold on: We must always get the last month with sales. As it is now, we always get the same month as the Month of the current row. 

This is because each row has the Filter Context set to each month. 

Therefore, we must remove the Filter for the Month, while retaining the Year. We can do this with ALLEXCEPT():

Linear extrapolation Customer Count YTD trend =
// Get the number of months since the start of the year
VAR LastMonthWithData = CALCULATE(
    MAXX('Online Sales'
        ,RELATED('Date'[Month])
    )
    ,ALLEXCEPT('Date', 'Date'[Year])
)
RETURN
    LastMonthWithData

Now, the result looks much better:

Figure 5 – Last month with Sales calculated for all months (Figure by the Author) 

As we calculate the result for each month, we must know the month number of the current row (Month). We will reuse this as the factor by which we multiply the monthly average to get the linear extrapolation.

The next intermediary result is to get the Month number: 

Linear extrapolation Customer Count YTD trend =
// Get the number of months since the start of the year
VAR LastMonthWithData = CALCULATE(
    MAXX('Online Sales'
        ,RELATED('Date'[Month])
    )
    ,ALLEXCEPT('Date', 'Date'[Year])
)
// Get the last month
// Is needed if we are looking at the data at the year, semester, or quarter level
VAR MaxMonth = MAX('Date'[Month])
RETURN
    MaxMonth

I can leave the first Variable in place and only use the MaxMonth variable after the return. The result shows the month number per month:

Figure 6 – Get the current month number per row (Figure by the Author) 

According to the definition formulated before, we must get the last Customer Count YTD for the latest month with Sales. 

I can do this with the following Expression: 

Linear extrapolation Customer Count YTD trend =
// Get the number of months since the start of the year
VAR LastMonthWithData = CALCULATE(
    MAXX('Online Sales'
        ,RELATED('Date'[Month])
    )
    ,ALLEXCEPT('Date', 'Date'[Year])
)
// Get the last month
// Is needed if we are looking at the data at the year, semester, or quarter level
VAR MaxMonth = MAX('Date'[Month])
// Get the Customer Count YTD
VAR LastCustomerCountYTD = CALCULATE([Online Customer Count YTD]
    ,ALLEXCEPT('Date', 'Date'[Year])
    ,'Date'[Month] = LastMonthWithData
)
RETURN
    LastCustomerCountYTD

As expected, the result shows 2,091 for each month:

Figure 7 – Calculating the latest Customer Count YTD for each month (Figure by the Author) 

You can see why I start with a table or a Matrix when developing complex Measures. 

Now, imagine that one intermediary result is a date or a text. 

Showing such a result in a line visual will not be practical. 

We are ready to calculate the final result according to the mathematical definition above. 

Step 3: The final result 

We have two ways to calculate the result: 

1. Write the expression after the RETURN statement. 

2. Create a new Variable “Result” and use this Variable after the RETURN statement.

The final Expression is this:

(LastCustomerCountYTD / LastMonthWithData) * MaxMonth

The first Variant looks like this: 

Linear extrapolation Customer Count YTD trend =
// Get the number of months since the start of the year
VAR LastMonthWithData = CALCULATE(
    MAXX('Online Sales'
        ,RELATED('Date'[Month])
    )
    ,ALLEXCEPT('Date', 'Date'[Year])
)
// Get the last month
// Is needed if we are looking at the data at the year, semester, or quarter level
VAR MaxMonth = MAX('Date'[Month])
// Get the Customer Count YTD
VAR LastCustomerCountYTD = CALCULATE([Online Customer Count YTD]
    ,ALLEXCEPT('Date', 'Date'[Year])
    ,'Date'[Month] = LastMonthWithData
)
RETURN
// Calculating the extrapolation
(LastCustomerCountYTD / LastMonthWithData) * MaxMonth

This is the second Variant: 

Linear extrapolation Customer Count YTD trend =
// Get the number of months since the start of the year
VAR LastMonthWithData = CALCULATE(
    MAXX('Online Sales'
        ,RELATED('Date'[Month])
    )
    ,ALLEXCEPT('Date', 'Date'[Year])
)
// Get the last month
// Is needed if we are looking at the data at the year, semester, or quarter level
VAR MaxMonth = MAX('Date'[Month])
// Get the Customer Count YTD
VAR LastCustomerCountYTD = CALCULATE([Online Customer Count YTD]
    ,ALLEXCEPT('Date', 'Date'[Year])
    ,'Date'[Month] = LastMonthWithData
)
// Calculating the extrapolation
VAR Result =
    (LastCustomerCountYTD / LastMonthWithData) * MaxMonth
RETURN
    Result

The result is the same. 

The second variant allows us to quickly switch back to the Intermediary results if the final result  is incorrect without needing to set the expression after the RETURN statement as a comment. 

It simply makes life easier. 

But it’s up to you which variant you like more. 

The result is this:

Figure 8 – Final result in a table (Figure by the Author) 

When converting this table to a Line Visual, we get the same result as in the first figure. The last step will be to set the line as a Dashed line, to get the needed visualization.

Figure 9 – Set the line for the extrapolation as a dashed line (Figure by the Author) 

Complex calculated columns 

The process is the same when writing complex DAX expressions for calculated columns. The difference is that we can see the result in the Table View of Power BI Desktop. 

Be aware that calculated columns are evaluated as soon as you press Enter, and their results are physically stored in the table.

The results of Measures are not stored in the Model. They are calculated on the fly in the Visualizations. 

Another difference is that we can leverage Context Transition to get our result when we need it to depend on other rows in the table. 

Read this piece to learn more about this fascinating topic: 

Conclusion 

The development process for complex expressions always follows the same steps: 

1. Understand the requirements – Ask if something is unclear. 

2. Define the math for the results. 

3. Start with intermediary results and understand the results. 

4. Build on the intermediary results one by one – Do not try to write all in one step.

5. Decide where to write the expression for the final result. 

Following such a process can save your day, as you don’t need to write everything in one step.

Moreover, getting these intermediary results allows you to understand what’s happening and explore the Filter Context. 

This will help you learn DAX more efficiently and build even more complex stuff. 

But be aware: even though a certain level of complexity is sometimes needed, a good developer will keep the solution as simple as possible and add complexity only where it is truly required.

References 

Here is the article mentioned at the beginning of this piece, where I calculate the linear extrapolation.

Like in my previous articles, I use the Contoso sample dataset. You can download the  ContosoRetailDW Dataset for free from Microsoft here.

The Contoso Data can be freely used under the MIT License, as described here. I changed the dataset to shift the data to contemporary dates.

The post How to Develop Complex DAX Expressions appeared first on Towards Data Science.

]]>
Practical SQL Puzzles That Will Level Up Your Skill https://towardsdatascience.com/practical-sql-puzzles-that-will-level-up-your-skill/ Tue, 04 Mar 2025 19:46:10 +0000 https://towardsdatascience.com/?p=598673 Three real-world SQL patterns that can be applied to many problems

The post Practical SQL Puzzles That Will Level Up Your Skill appeared first on Towards Data Science.

]]>
There are some SQL patterns that, once you know them, you start seeing everywhere. The solutions to the puzzles that I will show you today are actually very simple SQL queries, but understanding the concept behind them will surely unlock new solutions to the queries you write on a day-to-day basis.

These challenges are all based on real-world scenarios, as over the past few months I made a point of writing down every puzzle-like query that I had to build. I also encourage you to try them for yourself, so that you can challenge yourself first, which will improve your learning!

All queries to generate the datasets will be provided in a PostgreSQL and DuckDB-friendly syntax, so that you can easily copy and play with them. At the end I will also provide you a link to a GitHub repo containing all the code, as well as the answer to the bonus challenge I will leave for you!

I organized these puzzles in order of increasing difficulty, so, if you find the first ones too easy, at least take a look at the last one, which uses a technique that I truly believe you won’t have seen before.

Okay, let’s get started.

Analyzing ticket moves

I love this puzzle because of how short and simple the final query is, even though it deals with many edge cases. The data for this challenge shows tickets moving between Kanban stages, and the objective is to find how long, on average, tickets stay in the Doing stage.

The data contains the ID of the ticket, the date the ticket was created, the date of the move, and the “from” and “to” stages of the move. The stages present are New, Doing, Review, and Done.

Some things you need to know (edge cases):

  • Tickets can move backwards, meaning tickets can go back to the Doing stage.
  • You should not include tickets that are still stuck in the Doing stage, as there is no way to know how long they will stay there for.
  • Tickets are not always created in the New stage.

CREATE TABLE ticket_moves (
    ticket_id INT NOT NULL,
    create_date DATE NOT NULL,
    move_date DATE NOT NULL,
    from_stage TEXT NOT NULL,
    to_stage TEXT NOT NULL
);
INSERT INTO ticket_moves (ticket_id, create_date, move_date, from_stage, to_stage)
    VALUES
        -- Ticket 1: Created in "New", then moves to Doing, Review, Done.
        (1, '2024-09-01', '2024-09-03', 'New', 'Doing'),
        (1, '2024-09-01', '2024-09-07', 'Doing', 'Review'),
        (1, '2024-09-01', '2024-09-10', 'Review', 'Done'),
        -- Ticket 2: Created in "New", then moves: New → Doing → Review → Doing again → Review.
        (2, '2024-09-05', '2024-09-08', 'New', 'Doing'),
        (2, '2024-09-05', '2024-09-12', 'Doing', 'Review'),
        (2, '2024-09-05', '2024-09-15', 'Review', 'Doing'),
        (2, '2024-09-05', '2024-09-20', 'Doing', 'Review'),
        -- Ticket 3: Created in "New", then moves to Doing. (Edge case: no subsequent move from Doing.)
        (3, '2024-09-10', '2024-09-16', 'New', 'Doing'),
        -- Ticket 4: Created already in "Doing", then moves to Review.
        (4, '2024-09-15', '2024-09-22', 'Doing', 'Review');

A summary of the data:

  • Ticket 1: Created in the New stage, moves normally to Doing, then Review, and then Done.
  • Ticket 2: Created in New, then moves: New → Doing → Review → Doing again → Review.
  • Ticket 3: Created in New, moves to Doing, but it is still stuck there.
  • Ticket 4: Created in the Doing stage, moves to Review afterward.

It might be a good idea to stop for a bit and think how you would deal with this. Can you find out how long a ticket stays on a single stage?

Honestly, this sounds intimidating at first, and it looks like it will be a nightmare to deal with all the edge cases. Let me show you the full solution to the problem, and then I will explain what is happening afterward.

WITH stage_intervals AS (
    SELECT
        ticket_id,
        from_stage,
        move_date 
        - COALESCE(
            LAG(move_date) OVER (
                PARTITION BY ticket_id 
                ORDER BY move_date
            ), 
            create_date
        ) AS days_in_stage
    FROM
        ticket_moves
)
SELECT
    SUM(days_in_stage) / COUNT(DISTINCT ticket_id) as avg_days_in_doing
FROM
    stage_intervals
WHERE
    from_stage = 'Doing';

The first CTE uses the LAG function to find the previous move of the ticket, which will be the time the ticket entered that stage. Calculating the duration is as simple as subtracting the previous date from the move date.

What you should notice is the use of COALESCE on the previous move date. If a ticket doesn’t have a previous move, the creation date of the ticket is used instead. This takes care of tickets that were created directly in the Doing stage, as the time it took to leave the stage is still calculated properly.

This is the result of the first CTE, showing the time spent in each stage. Notice how Ticket 2 has two entries, as it visited the Doing stage on two separate occasions.

With this done, it’s just a matter of getting the average as the SUM of total days spent in doing, divided by the distinct number of tickets that ever left the stage. Doing it this way, instead of simply using the AVG, makes sure that the two rows for Ticket 2 get properly accounted for as a single ticket.
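
If I trace the sample data by hand (treat this as an illustration, not query output), the Doing durations are 4 days for Ticket 1, 4 + 5 days for Ticket 2, and 7 days for Ticket 4, so the measure returns (4 + 4 + 5 + 7) / 3 ≈ 6.7 days, with Ticket 3 excluded because it never left the stage.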

Not so bad, right?

Finding contract sequences

The goal of this second challenge is to find the most recent contract sequence of every employee. A break of sequence happens when two contracts have a gap of more than one day between them. 

In this dataset, there are no contract overlaps, meaning that a contract for the same employee either has a gap or ends a day before the new one starts.

CREATE TABLE contracts (
    contract_id integer PRIMARY KEY,
    employee_id integer NOT NULL,
    start_date date NOT NULL,
    end_date date NOT NULL
);

INSERT INTO contracts (contract_id, employee_id, start_date, end_date)
VALUES 
    -- Employee 1: Two continuous contracts
    (1, 1, '2024-01-01', '2024-03-31'),
    (2, 1, '2024-04-01', '2024-06-30'),
    -- Employee 2: One contract, then a gap of three days, then two contracts
    (3, 2, '2024-01-01', '2024-02-15'),
    (4, 2, '2024-02-19', '2024-04-30'),
    (5, 2, '2024-05-01', '2024-07-31'),
    -- Employee 3: One contract
    (6, 3, '2024-03-01', '2024-08-31');

As a summary of the data:

  • Employee 1: Has two continuous contracts.
  • Employee 2: One contract, then a gap of three days, then two contracts.
  • Employee 3: One contract.

The expected result, given the dataset, is that all contracts should be included except for the first contract of Employee 2, which is the only one that has a gap.

Before explaining the logic behind the solution, I would like you to think about what operation can be used to join the contracts that belong to the same sequence. Focus only on the second row of data: what information do you need to decide whether this contract was a break or not?

I hope it’s clear that this is the perfect situation for window functions, again. They are incredibly useful for solving problems like this, and understanding when to use them helps a lot in finding clean solutions to problems.

The first thing to do, then, is to get the end date of the previous contract for the same employee with the LAG function. Having done that, it’s simple to compare both dates and check whether there was a break in the sequence.

WITH ordered_contracts AS (
    SELECT
        *,
        LAG(end_date) OVER (PARTITION BY employee_id ORDER BY start_date) AS previous_end_date
    FROM
        contracts
),
gapped_contracts AS (
    SELECT
        *,
        -- Deals with the case of the first contract, which won't have
        -- a previous end date. In this case, it's still the start of a new
        -- sequence.
        CASE WHEN previous_end_date IS NULL
            OR previous_end_date < start_date - INTERVAL '1 day' THEN
            1
        ELSE
            0
        END AS is_new_sequence
    FROM
        ordered_contracts
)
SELECT * FROM gapped_contracts ORDER BY employee_id ASC;

An intuitive way to continue the query is to number the sequences of each employee. For example, an employee who has no gap will always be on their first sequence, but an employee who had 5 breaks in contracts will be on their 5th sequence. Funnily enough, this is done by another window function.

--
-- Previous CTEs
--
sequences AS (
    SELECT
        *,
        SUM(is_new_sequence) OVER (PARTITION BY employee_id ORDER BY start_date) AS sequence_id
FROM
    gapped_contracts
)
SELECT * FROM sequences ORDER BY employee_id ASC;

Notice how, for Employee 2, he starts his sequence #2 after the first gapped value. To finish this query, I grouped the data by employee, got the value of their most recent sequence, and then did an inner join with the sequences to keep only the most recent one.

--
-- Previous CTEs
--
max_sequence AS (
    SELECT
        employee_id,
        MAX(sequence_id) AS max_sequence_id
FROM
    sequences
GROUP BY
    employee_id
),
latest_contract_sequence AS (
    SELECT
        c.contract_id,
        c.employee_id,
        c.start_date,
        c.end_date
    FROM
        sequences c
        JOIN max_sequence m ON c.sequence_id = m.max_sequence_id
            AND c.employee_id = m.employee_id
        ORDER BY
            c.employee_id,
            c.start_date
)
SELECT
    *
FROM
    latest_contract_sequence;

As expected, our final result is basically our starting dataset, just with the first contract of Employee 2 missing!

Tracking concurrent events

Finally, the last puzzle — I’m glad you made it this far. 

For me, this is the most mind-blowing one, as when I first encountered this problem I thought of a completely different solution that would be a mess to implement in SQL.

For this puzzle, I’ve changed the context from what I had to deal with for my job, as I think it will make it easier to explain. 

Imagine you’re a data analyst at an event venue, and you’re analyzing the talks scheduled for an upcoming event. You want to find the time of day where there will be the highest number of talks happening at the same time.

This is what you should know about the schedules:

  • Rooms are booked in increments of 30min, e.g. from 9h-10h30.
  • The data is clean, there are no overbookings of meeting rooms.
  • There can be back-to-back meetings in a single meeting room.

Meeting schedule visualized (this is the actual data). 

CREATE TABLE meetings (
    room TEXT NOT NULL,
    start_time TIMESTAMP NOT NULL,
    end_time TIMESTAMP NOT NULL
);

INSERT INTO meetings (room, start_time, end_time) VALUES
    -- Room A meetings
    ('Room A', '2024-10-01 09:00', '2024-10-01 10:00'),
    ('Room A', '2024-10-01 10:00', '2024-10-01 11:00'),
    ('Room A', '2024-10-01 11:00', '2024-10-01 12:00'),
    -- Room B meetings
    ('Room B', '2024-10-01 09:30', '2024-10-01 11:30'),
    -- Room C meetings
    ('Room C', '2024-10-01 09:00', '2024-10-01 10:00'),
    ('Room C', '2024-10-01 11:30', '2024-10-01 12:00');

The way to solve this is with what is called a Sweep Line Algorithm, also known as an event-based solution. This second name actually helps in understanding what will be done: instead of dealing with intervals, which is what we have in the original data, we deal with events.

To do this, we need to transform every row into two separate events. The first event will be the Start of the meeting, and the second event will be the End of the meeting.

WITH events AS (
  -- Create an event for the start of each meeting (+1)
  SELECT 
    start_time AS event_time, 
    1 AS delta
  FROM meetings
  UNION ALL
  -- Create an event for the end of each meeting (-1)
  SELECT 
   -- Small trick to work with the back-to-back meetings (explained later)
    end_time - interval '1 minute' as end_time,
    -1 AS delta
  FROM meetings
)
SELECT * FROM events;

Take the time to understand what is happening here. To create two events from a single row of data, we’re simply unioning the dataset on itself; the first half uses the start time as the timestamp, and the second part uses the end time.

You might already notice the delta column created and see where this is going. When an event starts, we count it as +1, when it ends, we count it as -1. You might even be already thinking of another window function to solve this, and you’re actually right!

But before that, let me just explain the trick I used in the end dates. As I don’t want back-to-back meetings to count as two concurrent meetings, I’m subtracting a single minute from every end date. This way, if one meeting ends and another starts at 10h30, it won’t be assumed that two meetings are concurrently happening at 10h30.

Okay, back to the query and yet another window function. This time, though, the function of choice is a rolling SUM.

--
-- Previous CTEs
--
ordered_events AS (
  SELECT
    event_time,
    delta,
    SUM(delta) OVER (ORDER BY event_time, delta DESC) AS concurrent_meetings
  FROM events
)
SELECT * FROM ordered_events ORDER BY event_time DESC;

The rolling SUM at the Delta column is essentially walking down every record and finding how many events are active at that time. For example, at 9 am sharp, it sees two events starting, so it marks the number of concurrent meetings as two!

When the third meeting starts, the count goes up to three. But when it gets to 9h59 (10 am), then two meetings end, bringing the counter back to one. With this data, the only thing missing is to find when the highest value of concurrent meetings happens.
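
As a sanity check (a hand trace of the sample schedule, not query output): the running total goes 1, then 2 at 09:00, reaches 3 at 09:30, drops back to 2 and then 1 at 09:59, and never exceeds 2 again for the rest of the morning, so the peak of three concurrent meetings sits between 09:30 and 10:00.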

--
-- Previous CTEs
--
max_events AS (
  -- Find the maximum concurrent meetings value
  SELECT 
    event_time, 
    concurrent_meetings,
    RANK() OVER (ORDER BY concurrent_meetings DESC) AS rnk
  FROM ordered_events
)
SELECT event_time, concurrent_meetings
FROM max_events
WHERE rnk = 1;

That’s it! The interval of 9h30–10h is the one with the largest number of concurrent meetings, which checks out with the schedule visualization above!

This solution looks incredibly simple in my opinion, and it works for so many situations. Every time you are dealing with intervals now, ask yourself whether the query would be simpler if you thought about it from the perspective of events.

But before you move on, and to really nail down this concept, I want to leave you with a bonus challenge, which is also a common application of the Sweep Line Algorithm. I hope you give it a try!

Bonus challenge

The context for this one is still the same as the last puzzle, but now, instead of trying to find the period with the most concurrent meetings, the objective is to find bad scheduling. It seems that there are overlaps in the meeting rooms, which need to be listed so they can be fixed ASAP.

How would you find out if the same meeting room has two or more meetings booked at the same time? Here are some tips on how to solve it:

  • It’s still the same algorithm.
  • This means you will still do the UNION, but it will look slightly different.
  • You should think from the perspective of each meeting room.

You can use this data for the challenge:

CREATE TABLE meetings_overlap (
    room TEXT NOT NULL,
    start_time TIMESTAMP NOT NULL,
    end_time TIMESTAMP NOT NULL
);

INSERT INTO meetings_overlap (room, start_time, end_time) VALUES
    -- Room A meetings
    ('Room A', '2024-10-01 09:00', '2024-10-01 10:00'),
    ('Room A', '2024-10-01 10:00', '2024-10-01 11:00'),
    ('Room A', '2024-10-01 11:00', '2024-10-01 12:00'),
    -- Room B meetings
    ('Room B', '2024-10-01 09:30', '2024-10-01 11:30'),
    -- Room C meetings
    ('Room C', '2024-10-01 09:00', '2024-10-01 10:00'),
    -- Overlaps with previous meeting.
    ('Room C', '2024-10-01 09:30', '2024-10-01 12:00');

If you’re interested in the solution to this puzzle, as well as the rest of the queries, check this GitHub repo.

Conclusion

The first takeaway from this blog post is that window functions are overpowered. Ever since I got more comfortable with using them, I feel that my queries have gotten so much simpler and easier to read, and I hope the same happens to you.

If you’re interested in learning more about them, you would probably enjoy reading this other blog post I’ve written, where I go over how you can understand and use them effectively.

The second takeaway is that these patterns used in the challenges really do happen in many other places. You might need to find sequences of subscriptions, analyze customer retention, or find overlaps between tasks. There are many situations when you will need to use window functions in a very similar fashion to what was done in the puzzles.

The third thing I want you to remember is this approach of working with events instead of dealing with intervals directly. I’ve looked at some problems I solved a long time ago that I could’ve used this pattern on to make my life easier, and unfortunately, I didn’t know about it at the time.


I really do hope you enjoyed this post and gave a shot to the puzzles yourself. And I’m sure that if you made it this far, you either learned something new about SQL or strengthened your knowledge of window functions! 

Thank you so much for reading. If you have questions or just want to get in touch with me, don’t hesitate to contact me at mtrentz.com.

All images by the author unless stated otherwise.

The post Practical SQL Puzzles That Will Level Up Your Skill appeared first on Towards Data Science.

]]>
Data Science: From School to Work, Part II https://towardsdatascience.com/data-science-from-school-to-work-part-ii/ Mon, 03 Mar 2025 14:00:00 +0000 https://towardsdatascience.com/?p=598609 How to write clean Python code

The post Data Science: From School to Work, Part II appeared first on Towards Data Science.

]]>
In my previous article, I highlighted the importance of effective project management in Python development. Now, let’s shift our focus to the code itself and explore how to write clean, maintainable code — an essential practice in professional and collaborative environments. 

  • Readability & Maintainability: Well-structured code is easier to read, understand, and modify. Other developers — or even your future self — can quickly grasp the logic without struggling to decipher messy code.
  • Debugging & Troubleshooting: Organized code with clear variable names and structured functions makes it easier to identify and fix bugs efficiently.
  • Scalability & Reusability: Modular, well-organized code can be reused across different projects, allowing for seamless scaling without disrupting existing functionality.

So, as you work on your next Python project, remember: 

Half of good code is Clean Code.


Introduction

Python is one of the most popular and versatile programming languages, appreciated for its simplicity, comprehensibility and large community. Whether for web development, data analysis, artificial intelligence or task automation, Python offers powerful and flexible tools that are suitable for a wide range of areas.

However, the efficiency and maintainability of a Python project depends heavily on the practices used by the developers. Poor structuring of the code, a lack of conventions or even a lack of documentation can quickly turn a promising project into a maintenance and development-intensive puzzle. It is precisely this point that makes the difference between student code and professional code.

This article is intended to present the most important best practices for writing high-quality Python code. By following these recommendations, developers can create scripts and applications that are not only functional, but also readable, performant and easily maintainable by third parties.

Adopting these best practices right from the start of a project not only ensures better collaboration within teams, but also prepares your code to evolve with future needs. Whether you’re a beginner or an experienced developer, this guide is designed to support you in all your Python developments.


The code structuration

Good code structuring in Python is essential. There are two main project layouts: flat layout and src layout.

The flat layout places the source code directly in the project root without an additional folder. This approach simplifies the structure and is well-suited for small scripts, quick prototypes, and projects that do not require complex packaging. However, it may lead to unintended import issues when running tests or scripts.

📂 my_project/
├── 📂 my_project/                  # Directly in the root
│   ├── 🐍 __init__.py
│   ├── 🐍 main.py                   # Main entry point (if needed)
│   ├── 🐍 module1.py             # Example module
│   └── 🐍 utils.py
├── 📂 tests/                            # Unit tests
│   ├── 🐍 test_module1.py
│   ├── 🐍 test_utils.py
│   └── ...
├── 📄 .gitignore                      # Git ignored files
├── 📄 pyproject.toml              # Project configuration (Poetry, setuptools)
├── 📄 uv.lock                         # UV file
├── 📄 README.md               # Main project documentation
├── 📄 LICENSE                     # Project license
├── 📄 Makefile                       # Automates common tasks
├── 📄 DockerFile                   # To create Docker image
├── 📂 .github/                        # GitHub Actions workflows (CI/CD)
│   ├── 📂 actions/               
│   └── 📂 workflows/

On the other hand, the src layout (src is the contraction of source) organizes the source code inside a dedicated src/ directory, preventing accidental imports from the working directory and ensuring a clear separation between source files and other project components like tests or configuration files. This layout is ideal for large projects, libraries, and production-ready applications as it enforces proper package installation and avoids import conflicts.

📂 my-project/
├── 📂 src/                              # Main source code
│   ├── 📂 my_project/            # Main package
│   │   ├── 🐍 __init__.py        # Makes the folder a package
│   │   ├── 🐍 main.py             # Main entry point (if needed)
│   │   ├── 🐍 module1.py       # Example module
│   │   └── ...
│   │   ├── 📂 utils/                  # Utility functions
│   │   │   ├── 🐍 __init__.py     
│   │   │   ├── 🐍 data_utils.py  # data functions
│   │   │   ├── 🐍 io_utils.py      # Input/output functions
│   │   │   └── ...
├── 📂 tests/                             # Unit tests
│   ├── 🐍 test_module1.py     
│   ├── 🐍 test_module2.py     
│   ├── 🐍 conftest.py              # Pytest configurations
│   └── ...
├── 📂 docs/                            # Documentation
│   ├── 📄 index.md                
│   ├── 📄 architecture.md         
│   ├── 📄 installation.md         
│   └── ...                     
├── 📂 notebooks/                   # Jupyter Notebooks for exploration
│   ├── 📄 exploration.ipynb       
│   └── ...                     
├── 📂 scripts/                         # Standalone scripts (ETL, data processing)
│   ├── 🐍 run_pipeline.py         
│   ├── 🐍 clean_data.py           
│   └── ...                     
├── 📂 data/                            # Raw or processed data (if applicable)
│   ├── 📂 raw/                    
│   ├── 📂 processed/
│   └── ....                                 
├── 📄 .gitignore                      # Git ignored files
├── 📄 pyproject.toml              # Project configuration (Poetry, setuptools)
├── 📄 uv.lock                         # UV file
├── 📄 README.md               # Main project documentation
├── 🐍 setup.py                       # Installation script (if applicable)
├── 📄 LICENSE                     # Project license
├── 📄 Makefile                       # Automates common tasks
├── 📄 DockerFile                   # To create Docker image
├── 📂 .github/                        # GitHub Actions workflows (CI/CD)
│   ├── 📂 actions/               
│   └── 📂 workflows/

Choosing between these layouts depends on the project’s complexity and long-term goals. For production-quality code, the src/ layout is often recommended, whereas the flat layout works well for simple or short-lived projects.

You can imagine different templates that are better adapted to your use case. It is important that you maintain the modularity of your project. Do not hesitate to create subdirectories and to group together scripts with similar functionalities and separate those with different uses. A good code structure ensures readability, maintainability, scalability and reusability and helps to identify and correct errors efficiently.

Cookiecutter is an open-source tool for generating preconfigured project structures from templates. It is particularly useful for ensuring the coherence and organization of projects, especially in Python, by applying good practices from the outset. Both the flat layout and the src layout can also be initiated with the uv tool.


The SOLID principles

SOLID programming is an essential approach to software development based on five basic principles for improving code quality, maintainability and scalability. These principles provide a clear framework for developing robust, flexible systems. By following the SOLID principles, you reduce the risk of complex dependencies, make testing easier and ensure that applications can evolve more easily in the face of change. Whether you are working on a single project or a large-scale application, mastering SOLID is an important step towards adopting object-oriented programming best practices.

S — Single Responsibility Principle (SRP)

The Single Responsibility Principle means that a class or function should manage only one thing, which means it has only one reason to change. This makes the code more maintainable and easier to read. A class or function with multiple responsibilities is difficult to understand and often a source of errors.

Example:

# Violates SRP
class MLPipeline:
    def __init__(self, df: pd.DataFrame, target_column: str):
        self.df = df
        self.target_column = target_column
        self.scaler = StandardScaler()
        self.model = RandomForestClassifier()
   
    def preprocess_data(self):
        self.df.fillna(self.df.mean(), inplace=True)  # Handle missing values
        X = self.df.drop(columns=[self.target_column])
        y = self.df[self.target_column]
        X_scaled = self.scaler.fit_transform(X)  # Feature scaling
        return X_scaled, y
        
    def train_model(self):
        X, y = self.preprocess_data()  # Data preprocessing inside model training
        self.model.fit(X, y)
        print("Model training complete.")

Here, the MLPipeline class has two responsibilities: preprocessing the data and training the model. To follow the SRP, we split it into two classes, each with a single responsibility:

# Follows SRP
class DataPreprocessor:
    def __init__(self):
        self.scaler = StandardScaler()
        
    def preprocess(self, df: pd.DataFrame, target_column: str):
        df = df.copy()
        df.fillna(df.mean(), inplace=True)  # Handle missing values
        X = df.drop(columns=[target_column])
        y = df[target_column]
        X_scaled = self.scaler.fit_transform(X)  # Feature scaling
        return X_scaled, y


class ModelTrainer:
    def __init__(self, model):
        self.model = model
        
    def train(self, X, y):
        self.model.fit(X, y)
        print("Model training complete.")

O — Open/Closed Principle (OCP)

The Open/Closed Principle means that a class or function should be open for extension but closed for modification. This makes it possible to add functionality without the risk of breaking existing code.

It is not easy to develop with this principle in mind, but a good indicator for the lead developer is to see more and more additions (+) and fewer and fewer deletions (-) in merge requests as the project evolves.
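
For illustration, here is a minimal sketch (the Metric classes below are purely illustrative): new behaviors are added by writing new subclasses, without touching the existing code.

from abc import ABC, abstractmethod


class Metric(ABC):
    @abstractmethod
    def compute(self, y_true, y_pred) -> float:
        ...


class Accuracy(Metric):
    def compute(self, y_true, y_pred) -> float:
        return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


class Recall(Metric):
    def compute(self, y_true, y_pred) -> float:
        true_positives = sum(t == 1 and p == 1 for t, p in zip(y_true, y_pred))
        actual_positives = sum(t == 1 for t in y_true)
        return true_positives / actual_positives if actual_positives else 0.0


def evaluate(metric: Metric, y_true, y_pred) -> float:
    # evaluate() is closed to modification: it works with any new Metric subclass
    return metric.compute(y_true, y_pred)

Adding an F1 score later simply means writing a new subclass; evaluate() and the existing metrics remain untouched.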

L — Liskov Substitution Principle (LSP)

The Liskov Substitution Principle states that a subclass must be able to replace its parent class without changing the behavior of the program, ensuring that the subclass meets the expectations defined by the base class. This limits the risk of unexpected errors.

Example :

# Violates LSP
class Rectangle:
    def __init__(self, width, height):
        self.width = width
        self.height = height

    def area(self):
        return self.width * self.height


class Square(Rectangle):
    def __init__(self, side):
        super().__init__(side, side)
# Changing the width of a square violates the idea of a square.
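
For instance (a small illustrative sketch reusing the classes above), code written against Rectangle silently breaks the Square's invariant:

def stretch(rectangle: Rectangle, new_width: int) -> int:
    rectangle.width = new_width
    return rectangle.area()


print(stretch(Rectangle(2, 3), 10))  # 30, as expected
print(stretch(Square(3), 10))        # 30, but width is now 10 while height stays 3: no longer a square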

To respect the LSP, it is better to avoid this hierarchy and use independent classes:

class Shape:
    def area(self):
        raise NotImplementedError


class Rectangle(Shape):
    def __init__(self, width, height):
        self.width = width
        self.height = height

    def area(self):
        return self.width * self.height


class Square(Shape):
    def __init__(self, side):
        self.side = side

    def area(self):
        return self.side * self.side

I — Interface Segregation Principle (ISP)

The Interface Segregation Principle states that clients should not be forced to depend on methods they do not use: it is better to build several small, specific classes than one large class with methods that cannot be used in certain cases. This reduces unnecessary dependencies.

Example:

# Violates ISP
class Animal:
    def fly(self):
        raise NotImplementedError

    def swim(self):
        raise NotImplementedError

It is better to split the class Animal into several classes:

# Follows ISP
class CanFly:
    def fly(self):
        raise NotImplementedError


class CanSwim:
    def swim(self):
        raise NotImplementedError


class Bird(CanFly):
    def fly(self):
        print("Flying")


class Fish(CanSwim):
    def swim(self):
        print("Swimming")

D — Dependency Inversion Principle (DIP)

The Dependency Inversion Principle means that a class must depend on abstractions rather than on concrete classes. This reduces the coupling between classes and makes the code more modular.

Example:

# Violates DIP
class Database:
    def connect(self):
        print("Connecting to database")


class UserService:
    def __init__(self):
        self.db = Database()

    def get_users(self):
        self.db.connect()
        print("Getting users")

Here, the attribute db of UserService depends on the class Database. To respect the DIP, db has to depend on an abstract class.

# Follows DIP
class DatabaseInterface:
    def connect(self):
        raise NotImplementedError


class MySQLDatabase(DatabaseInterface):
    def connect(self):
        print("Connecting to MySQL database")


class UserService:
    def __init__(self, db: DatabaseInterface):
        self.db = db

    def get_users(self):
        self.db.connect()
        print("Getting users")


# We can easily change the used database.
db = MySQLDatabase()
service = UserService(db)
service.get_users()

PEP standards

PEPs (Python Enhancement Proposals) are technical and informative documents that describe new features, language improvements or guidelines for the Python community. Among them, PEP 8, which defines style conventions for Python code, plays a fundamental role in promoting readability and consistency in projects.

Adopting the PEP standards, especially PEP 8, not only ensures that the code is understandable to other developers, but also that it conforms to the standards set by the community. This facilitates collaboration, code reviews and long-term maintenance.

In this article, I present the most important aspects of the PEP standards, including:

  • Style Conventions (PEP 8): Indentations, variable names and import organization.
  • Best practices for documenting code (PEP 257).
  • Recommendations for writing typed, maintainable code (PEP 484 and PEP 563).

Understanding and applying these standards is essential to take full advantage of the Python ecosystem and contribute to professional quality projects.


PEP 8

PEP 8 covers coding conventions that standardize the code, and there is plenty of documentation about it. I will not cover every recommendation in this post, only those I consider essential when reviewing code.

Naming conventions

Variable, function and module names should be in lower case, and use underscore to separate words. This typographical convention is called snake_case.


my_variable
my_new_function()
my_module

Constants are written in capital letters and defined at the beginning of the script (after the imports):


LIGHT_SPEED
MY_CONSTANT

Finally, class and exception names use the CamelCase format (a capital letter at the beginning of each word). Exception names should end with Error.


MyGreatClass
MyGreatError

Remember to give your variables names that make sense! Don’t use variable names like v1, v2, func1, i, toto…

Single-character variable names are permitted for loops and indexes:

my_list = [1, 3, 5, 7, 9, 11]
for i in range(len(my_list)):
    print(my_list[i])

A more “pythonic” way of writing, to be preferred to the previous example, gets rid of the i index:

my_list = [1, 3, 5, 7, 9, 11]
for element in my_list:
    print(element)

Spaces management

It is recommended to surround operators (+, -, *, /, //, %, ==, !=, >, not, in, and, or, …) with a space before AND after:

# recommended code:
my_variable = 3 + 7
my_text = "mouse"
my_text == my_variable

# not recommended code:
my_variable=3+7
my_text="mouse"
my_text== my_variable

Do not add multiple spaces around an operator either. On the other hand, there should be no spaces just inside square brackets, braces or parentheses:

# recommended code:
my_list[1]
my_dict["key"]
my_function(argument)

# not recommended code:
my_list[ 1 ]
my_dict[ "key" ]
my_function( argument )

A space is recommended after the characters “:” and “,”, but not before:

# recommended code:
my_list = [1, 2, 3]
my_dict = {"key1": "value1", "key2": "value2"}
my_function(argument1, argument2)

# not recommended code:
my_list = [1 , 2 , 3]
my_dict = {"key1":"value1", "key2":"value2"}
my_function(argument1 , argument2)

However, when slicing lists, we don’t put spaces around the “:”:

my_list = [1, 3, 5, 7, 9, 11]

# recommended code:
my_list[1:3]
my_list[1:4:2]
my_list[::2]

# not recommended code:
my_list[1 : 3]
my_list[1: 4:2 ]
my_list[ : :2]

Line length

For the sake of readability, it is recommended to write lines of code no longer than 80 characters. However, this rule can be broken in certain circumstances; for example, if you are working on a Dash project, it may be complicated to respect this recommendation.

The \ character can be used to cut lines that are too long.

For example:

my_variable = 3
if my_variable > 1 and my_variable < 10 \
    and my_variable % 2 == 1 and my_variable % 3 == 0:
    print(f"My variable is equal to {my_variable}")

Within a parenthesis, you can return to the line without using the \ character. This can be useful for specifying the arguments of a function or method when defining or using it:

def my_function(argument_1, argument_2,
                argument_3, argument_4):
    return argument_1 + argument_2

It is also possible to create multi-line lists or dictionaries by skipping a line after a comma:

my_list = [1, 2, 3,
           4, 5, 6,
           7, 8, 9]
my_dict = {"key1": 13,
           "key2": 42,
           "key3": -10}

Blank lines

In a script, blank lines are useful for visually separating different parts of the code. It is recommended to leave two blank lines before the definition of a function or class, and to leave a single blank line before the definition of a method (in a class). You can also leave a blank line in the body of a function to separate the logical sections of the function, but this should be used sparingly.
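
For illustration, a minimal sketch:

import math


def circle_area(radius: float) -> float:
    return math.pi * radius ** 2


class Circle:
    def __init__(self, radius: float):
        self.radius = radius

    def area(self) -> float:
        return circle_area(self.radius)

Two blank lines precede the function and class definitions, and a single blank line separates the two methods.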

Comments

Comments always begin with the # symbol followed by a space. They give clear explanations of the purpose of the code and must stay synchronized with it: if the code is modified, the comments must be updated too (if applicable). They are written at the same indentation level as the code they describe. Comments are complete sentences, with a capital letter at the beginning (unless the first word is a variable, which is written without a capital letter) and a period at the end. I strongly recommend writing comments in English, and it is important to be consistent between the language used for comments and the language used to name variables. Finally, comments that follow the code on the same line should be avoided wherever possible and, when used, should be separated from the code by at least two spaces.
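
For illustration, a short sketch of these conventions:

values = [1, 2, 3, 4]

# Compute the mean of the values.
mean_value = sum(values) / len(values)

threshold = mean_value * 1.5  # Inline comments are separated from the code by two spaces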

Tool to help you

Ruff is a linter (code analysis tool) and formatter for Python code, written in Rust. It combines the advantages of the flake8 linter with black and isort formatting, while being faster.

Ruff has an extension on the VS Code editor.

To check your code you can type:

ruff check my_module.py

It is also possible to format your code with the following command:

ruff format my_module.py
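
Note that ruff format only reformats the code; to let Ruff automatically fix certain lint violations as well (assuming a recent Ruff version), run:

ruff check --fix my_module.py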

PEP 20

PEP 20: The Zen of Python is a set of 19 principles written in poetic form. They are more a way of coding than actual guidelines.

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren’t special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one– and preferably only one –obvious way to do it.
Although that way may not be obvious at first unless you’re Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it’s a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea — let’s do more of those!

PEP 257

The aim of PEP 257 is to standardize the use of docstrings.

What is a docstring?

A docstring is a string that appears as the first instruction after the definition of a function, class or method. A docstring becomes the output of the __doc__ special attribute of this object.

def my_function():
    """This is a doctring."""
    pass

And we have:

>>> my_function.__doc__
>>> 'This is a docstring.'

We always write a docstring between triple double quote """.

Docstring on a line

Used for simple functions or methods, the docstring must fit on a single line. The closing quotes are on the same line as the opening quotes, and there are no blank lines before or after the docstring.

def add(a, b):
    """Return the sum of a and b."""
    return a + b

A single-line docstring MUST NOT restate the function/method signature. Do not do:

def my_function(a, b):
    """ my_function(a, b) -> list"""

Docstring on several lines

The first line should be a summary of the object being documented, followed by an empty line and then more detailed explanations or clarifications of the arguments.

def divide(a, b):
    """Divide a by b.

    Returns the result of the division. Raises a ValueError if b equals 0.
    """
    if b == 0:
        raise ValueError("Only Chuck Norris can divide by 0")
    return a / b

Complete Docstring

A complete docstring is made up of several parts (in this case, based on the numpydoc standard).

  1. Short description: Summarizes the main functionality.
  2. Parameters: Describes the arguments with their type, name and role.
  3. Returns: Specifies the type and role of the returned value.
  4. Raises: Documents exceptions raised by the function.
  5. Notes (optional): Provides additional explanations.
  6. Examples (optional): Contains illustrated usage examples with expected results or exceptions.

def calculate_mean(numbers: list[float]) -> float:
    """
    Calculate the mean of a list of numbers.

    Parameters
    ----------
    numbers : list of float
        A list of numerical values for which the mean is to be calculated.

    Returns
    -------
    float
        The mean of the input numbers.

    Raises
    ------
    ValueError
        If the input list is empty.

    Notes
    -----
    The mean is calculated as the sum of all elements divided by the number of elements.

    Examples
    --------
    Calculate the mean of a list of numbers:
    >>> calculate_mean([1.0, 2.0, 3.0, 4.0])
    2.5
    """

Tool to help you

VsCode’s autoDocstring extension lets you automatically create a docstring template.

PEP 484

In some programming languages, typing is mandatory when declaring a variable. In Python, typing is optional, but strongly recommended. PEP 484 introduces a typing system for Python, annotating the types of variables, function arguments and return values. This PEP provides a basis for improving code readability, facilitating static analysis and reducing errors.

What is typing?

Typing consists of explicitly declaring the type (float, string, etc.) of a variable. The typing module provides standard tools for defining generic types, such as Sequence, List, Union, Any, etc.

To type a function, we use “:” for its arguments and “->” for the return type.

Here is a list of untyped functions:

def show_message(message):
    print(f"Message : {message}")

def addition(a, b):
    return a + b

def is_even(n):
    return n % 2 == 0

def list_square(numbers):
    return [x**2 for x in numbers]

def reverse_dictionary(d):
    return {v: k for k, v in d.items()}

def add_element(ensemble, element):
    ensemble.add(element)
    return ensemble

Now here’s how they should look:

from typing import List, Tuple, Dict, Set, Any

def show_message(message: str) -> None:
    print(f"Message : {message}")

def addition(a: int, b: int) -> int:
    return a + b

def is_even(n: int) -> bool:
    return n % 2 == 0

def list_square(numbers: List[int]) -> List[int]:
    return [x**2 for x in numbers]

def reverse_dictionary(d: Dict[str, int]) -> Dict[int, str]:
    return {v: k for k, v in d.items()}

def add_element(ensemble: Set[int], element: int) -> Set[int]:
    ensemble.add(element)
    return ensemble
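
PEP 563, mentioned earlier, complements typing by postponing the evaluation of annotations. A minimal sketch of what it enables (assuming Python 3.7 or later; the Node class is purely illustrative):

from __future__ import annotations


class Node:
    def __init__(self, value: int) -> None:
        self.value = value
        self.children: list[Node] = []

    def add_child(self, value: int) -> Node:
        # The "Node" annotations above are not evaluated at runtime,
        # so the class can reference itself without quoting the name.
        child = Node(value)
        self.children.append(child)
        return child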

Tool to help you

The MyPy extension automatically checks whether the use of a variable corresponds to the declared type. For example, for the following function:

def my_function(x: float) -> float:
    return x.mean()

The editor will point out that a float has no “mean” attribute.

Image from author

The benefit is twofold: you’ll know whether the declared type is the right one and whether the use of this variable corresponds to its type.

In the above example, x must be of a type that has a mean() method (e.g. np.array).
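
Mypy can also be run directly from the command line (a quick sketch, assuming it is installed in your environment):

pip install mypy
mypy my_module.py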


Conclusion

In this article, we have looked at the most important principles for creating clean Python production code. A solid architecture, adherence to SOLID principles, and compliance with PEP recommendations (at least the four discussed here) are essential for ensuring code quality. The desire for beautiful code is not (just) coquetry. It standardizes development practices and makes teamwork and maintenance much easier. There’s nothing more frustrating than spending hours (or even days) reverse-engineering a program, deciphering poorly written code before you’re finally able to fix the bugs. By applying these best practices, you ensure that your code remains clear, scalable, and easy for any developer to work with in the future.


References

1. src layout vs flat layout

2. SOLID principles

3. Python Enhancement Proposals index

The post Data Science: From School to Work, Part II appeared first on Towards Data Science.

]]>
Efficient Data Handling in Python with Arrow https://towardsdatascience.com/efficient-data-handling-in-python-with-arrow/ Tue, 25 Feb 2025 20:56:16 +0000 https://towardsdatascience.com/?p=598426 Introducing Arrow to those who are still unaware of its power

The post Efficient Data Handling in Python with Arrow appeared first on Towards Data Science.

]]>
1. Introduction

We’re all used to working with CSVs, JSON files… With the traditional libraries and for large datasets, these can be extremely slow to read, write and operate on, leading to performance bottlenecks (been there). It’s precisely with large amounts of data that handling it efficiently becomes crucial for our data science/analytics workflow, and this is exactly where Apache Arrow comes into play. 

Why? The main reason resides in how the data is stored in memory. While JSON and CSVs, for example, are text-based formats, Arrow is a columnar in-memory data format (and that allows for fast data interchange between different data processing tools). Arrow is therefore designed to optimize performance by enabling zero-copy reads, reducing memory usage, and supporting efficient compression. 

Moreover, Apache Arrow is open-source and optimized for analytics. It is designed to accelerate big data processing while maintaining interoperability with various data tools, such as Pandas, Spark, and Dask. By storing data in a columnar format, Arrow enables faster read/write operations and efficient memory usage, making it ideal for analytical workloads.

Sounds great right? What’s best is that this is all the introduction to Arrow I’ll provide. Enough theory, we want to see it in action. So, in this post, we’ll explore how to use Arrow in Python and how to make the most out of it.

2. Arrow in Python

To get started, you need to install the necessary libraries: pandas and pyarrow.

pip install pyarrow pandas

Then, as always, import them in your Python script:

import pyarrow as pa
import pandas as pd

Nothing new yet, just necessary steps to do what follows. Let’s start by performing some simple operations.

2.1. Creating and Storing a Table

The simplest we can do is hardcode our table’s data. Let’s create a two-column table with football data:

teams = pa.array(['Barcelona', 'Real Madrid', 'Rayo Vallecano', 'Athletic Club', 'Real Betis'], type=pa.string())
goals = pa.array([30, 23, 9, 24, 12], type=pa.int8())

team_goals_table = pa.table([teams, goals], names=['Team', 'Goals'])

The format is pyarrow.table, but we can easily convert it to pandas if we want:

df = team_goals_table.to_pandas()

And restore it back to arrow using:

team_goals_table = pa.Table.from_pandas(df)

And we’ll finally store the table in a file. We could use different formats, like feather, parquet… I’ll use this last one because it’s fast and memory-optimized:

import pyarrow.parquet as pq
pq.write_table(team_goals_table, 'data.parquet')

Reading a parquet file would just consist of using pq.read_table('data.parquet').
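
For instance (a quick sketch):

table = pq.read_table('data.parquet')
df = table.to_pandas()  # back to pandas if needed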

2.2. Compute Functions

Arrow has its own compute module for the usual operations. Let’s start by comparing two arrays element-wise:

import pyarrow.compute as pc
>>> a = pa.array([1, 2, 3, 4, 5, 6])
>>> b = pa.array([2, 2, 4, 4, 6, 6])
>>> pc.equal(a,b)
[
  false,
  true,
  false,
  true,
  false,
  true
]

That was easy, we could sum all elements in an array with:

>>> pc.sum(a)
<pyarrow.Int64Scalar: 21>
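
Other element-wise and aggregation functions follow the same pattern. For instance (a quick sketch; the exact scalar repr may vary slightly between pyarrow versions):

>>> pc.mean(a)
<pyarrow.DoubleScalar: 3.5>
>>> pc.multiply(a, b)
[
  2,
  4,
  12,
  16,
  30,
  36
]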

And from this we could easily guess how we can compute a count, a floor, an exp, a max… No need to go over them all, then. So let’s move on to tabular operations.

We’ll start by showing how to sort it:

>>> table = pa.table({'i': ['a','b','a'], 'x': [1,2,3], 'y': [4,5,6]})
>>> pc.sort_indices(table, sort_keys=[('y', 'descending')])
<pyarrow.lib.UInt64Array object at 0x1291643a0>
[
  2,
  1,
  0
]

Just like in pandas, we can group values and aggregate the data. Let’s, for example, group by “i” and compute the sum on “x” and the mean on “y”:

>>> table.group_by('i').aggregate([('x', 'sum'), ('y', 'mean')])
pyarrow.Table
i: string
x_sum: int64
y_mean: double
----
i: [["a","b"]]
x_sum: [[4,2]]
y_mean: [[5,5]]

Or we can join two tables:

>>> t1 = pa.table({'i': ['a','b','c'], 'x': [1,2,3]})
>>> t2 = pa.table({'i': ['a','b','c'], 'y': [4,5,6]})
>>> t1.join(t2, keys="i")
pyarrow.Table
i: string
x: int64
y: int64
----
i: [["a","b","c"]]
x: [[1,2,3]]
y: [[4,5,6]]

By default, it is a left outer join, but we can change this behavior with the join_type parameter.
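
Other join types, such as "inner" or "full outer", can be requested explicitly. For example (a quick sketch; with these particular tables an inner join returns the same rows, since the keys match exactly):

>>> t1.join(t2, keys="i", join_type="inner")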

There are many more useful operations, but let’s see just one more to avoid making this too long: appending a new column to a table.

>>> t1.append_column("z", pa.array([22, 44, 99]))
pyarrow.Table
i: string
x: int64
z: int64
----
i: [["a","b","c"]]
x: [[1,2,3]]
z: [[22,44,99]]

Before ending this section, we must see how to filter a table or array:

>>> t1.filter((pc.field('x') > 0) & (pc.field('x') < 3))
pyarrow.Table
i: string
x: int64
----
i: [["a","b"]]
x: [[1,2]]

Easy, right? Especially if you’ve been using pandas and numpy for years!

3. Working with files

We’ve already seen how we can read and write Parquet files. But let’s check some other popular file types so that we have several options available.

3.1. Apache ORC

Informally, Apache ORC can be understood as the equivalent of Arrow in the realm of file formats (even though its origins have nothing to do with Arrow). More precisely, it’s an open-source, columnar storage format. 

Reading and writing it is as follows:

from pyarrow import orc
# Write table
orc.write_table(t1, 't1.orc')
# Read table
t1 = orc.read_table('t1.orc')

As a side note, we could decide to compress the file while writing by using the “compression” parameter.
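
For example (a quick sketch; the available codecs, such as "zstd" or "snappy", depend on how your pyarrow build was compiled):

orc.write_table(t1, 't1.orc', compression='zstd')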

3.2. CSV

No secret here, pyarrow has the CSV module:

from pyarrow import csv
# Write CSV
csv.write_csv(t1, "t1.csv")
# Read CSV
t1 = csv.read_csv("t1.csv")

# Write CSV compressed and without header
options = csv.WriteOptions(include_header=False)
with pa.CompressedOutputStream("t1.csv.gz", "gzip") as out:
    csv.write_csv(t1, out, options)

# Read compressed CSV and add custom header
t1 = csv.read_csv("t1.csv.gz", read_options=csv.ReadOptions(
    column_names=["i", "x"], skip_rows=1
))

3.3. JSON

Pyarrow allows JSON reading but not writing. It’s pretty straightforward, let’s see an example supposing we have our JSON data in “data.json”:

from pyarrow import json
# Read json
fn = "data.json"
table = json.read_json(fn)

# We can now convert it to pandas if we want to
df = table.to_pandas()

3.4. Feather

Feather is a portable file format for storing Arrow tables or data frames (from languages like Python or R) that utilizes the Arrow IPC format internally. So, contrary to Apache ORC, this one was indeed created early in the Arrow project.

from pyarrow import feather
# Write feather from pandas DF
feather.write_feather(df, "t1.feather")
# Write feather from table, and compressed
feather.write_feather(t1, "t1.feather.lz4", compression="lz4")

# Read feather into table
t1 = feather.read_table("t1.feather")
# Read feather into df
df = feather.read_feather("t1.feather")

4. Advanced Features

We have just touched upon the most basic features, which is what most people will need while working with Arrow. However, its amazingness doesn’t end here; this is just where it starts.

As this would be quite domain-specific and not useful for everyone (nor considered introductory), I’ll just mention some of these features without using any code:

  • We can handle memory management through the Buffer type (built on top of C++ Buffer object). Creating a buffer with our data does not allocate any memory; it is a zero-copy view on the memory exported from the data bytes object. Keeping up with this memory management, an instance of MemoryPool tracks all the allocations and deallocations (like malloc and free in C). This allows us to track the amount of memory being allocated.
  • Similarly, there are different ways to work with input/output streams in batches.
  • PyArrow comes with an abstract filesystem interface, as well as concrete implementations for various storage types. So, for example, we can write and read parquet files from an S3 bucket using the S3FileSystem (see the sketch just after this list). Google Cloud Storage and the Hadoop Distributed File System (HDFS) are also supported.
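
A minimal sketch of the S3 case (assuming valid AWS credentials are configured; the bucket name, keys and region below are placeholders):

from pyarrow import fs
import pyarrow.parquet as pq

s3 = fs.S3FileSystem(region="us-east-1")

# Read a parquet file directly from the bucket
table = pq.read_table("my-bucket/path/data.parquet", filesystem=s3)

# Write it back to another key in the same bucket
pq.write_table(table, "my-bucket/path/data_copy.parquet", filesystem=s3)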

5. Conclusion and Key Takeaways

Apache Arrow is a powerful tool for efficient Data Handling in Python. Its columnar storage format, zero-copy reads, and interoperability with popular data processing libraries make it ideal for data science workflows. By integrating Arrow into your pipeline, you can significantly boost performance and optimize memory usage.


The post Efficient Data Handling in Python with Arrow appeared first on Towards Data Science.

]]>