Agentic pipelines

In this section, you will work with Open WebUI pipes. For pipes to function properly, you need to explicitly assign a regular model to handle external tasks. Otherwise, your agentic frameworks will automatically be used for tasks they are not intended for, such as generating the title of a conversation. You can do so through the following steps:

  1. Set the models to be publicly available via Admin Panel -> Settings -> Models. Clicking the pencil next to a model opens the model settings. At the top right of these settings, change the access from private to public, and do not forget to save the settings at the bottom. Do this for both the Mistral and the Qwen models.
  2. Through Admin Panel -> Settings -> Interface, specify the External Task Model to be mistralai/Mistral-7B-Instruct-v0.3. Do not forget to save the settings at the bottom right.

Now that you have experimented with filters, we move to a more advanced functionality in Open WebUI: pipe functions. With a pipe, you build your own model in Open WebUI, with full control over how the data flows and which models are included in the pipeline. Pipes open up many possibilities, such as integrating external APIs, combining multiple models, or using external libraries. In this workshop, we will demonstrate how to use pipes to build an agentic pipeline for a simple safety evaluation of generated code.

The basic structure of a pipe function is as follows:

from pydantic import BaseModel, Field

class Pipe:
    class Valves(BaseModel):
        MODEL_ID: str = Field(default="")

    def __init__(self):
        self.valves = self.Valves()

    def pipes(self):
        return [
            {"id": "model_id_1", "name": "model_1"},
            {"id": "model_id_2", "name": "model_2"},
            {"id": "model_id_3", "name": "model_3"},
        ]

    def pipe(self, body: dict):
        # Logic goes here
        print(self.valves, body)  # Prints the configuration options and the input body
        model = body.get("model", "")
        return f"{model}: Hello, World!"
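
To get a feel for this skeleton without starting Open WebUI, the sketch below exercises a stripped-down copy locally. It is an illustration under assumptions: the Valves class is replaced by a plain dataclass stand-in so that the snippet has no dependencies, and the "my_pipe." prefix on the model id is a hypothetical example of how a host application might namespace the ids returned by pipes().

```python
from dataclasses import dataclass


@dataclass
class Valves:
    # Stand-in for the pydantic Valves model, so this sketch runs without dependencies.
    MODEL_ID: str = ""


class Pipe:
    def __init__(self):
        self.valves = Valves()

    def pipes(self):
        # Each entry is offered as a separate selectable model.
        return [
            {"id": "model_id_1", "name": "model_1"},
            {"id": "model_id_2", "name": "model_2"},
        ]

    def pipe(self, body: dict):
        # Keep only the part after the last dot (hypothetical namespacing by the host).
        model = body.get("model", "").rsplit(".", 1)[-1]
        known = {entry["id"] for entry in self.pipes()}
        if model not in known:
            return f"Unknown model: {model!r}"
        return f"{model}: Hello, World!"


pipe = Pipe()
reply = pipe.pipe({"model": "my_pipe.model_id_2", "messages": []})
print(reply)
```

Calling pipe() by hand like this is only a quick way to check the dispatch logic. Inside Open WebUI, the body is built and passed in by the application.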

Code safety evaluation

We will now build a pipe for code generation with LLMs that puts a particular focus on security. To do so, we will implement a simple framework with two LLM steps: one for Python code generation and one for code evaluation. After the first step generates the code, the second step produces an evaluation of the code's safety and provides it to the user. In the example pipe below, both steps use the model configured in the MODEL valve.
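
The two-step idea can be sketched independently of Open WebUI. The snippet below is a minimal illustration, not the workshop implementation: generate_then_review, fake_generator and fake_reviewer are hypothetical names, and the two stub functions stand in for real model calls so that the flow can be run offline.

```python
from typing import Callable

# Type alias: anything that maps a prompt string to a completion string.
LLM = Callable[[str], str]


def generate_then_review(task: str, generator: LLM, reviewer: LLM) -> dict:
    """Stage 1 writes code for the task; stage 2 reviews that code."""
    code = generator(f"Write Python code for this task:\n{task}")
    review = reviewer(
        "List potential vulnerabilities in the code below. Do not write code.\n\n"
        f"Task: {task}\n\nCode:\n{code}"
    )
    return {"code": code, "review": review}


# Hypothetical stand-ins for real model calls.
def fake_generator(prompt: str) -> str:
    return "def run(cmd):\n    return eval(cmd)"


def fake_reviewer(prompt: str) -> str:
    # A real reviewer model would reason about the code; this stub only looks for eval().
    return "Uses eval() on untrusted input." if "eval(" in prompt else "No issues found."


result = generate_then_review("Evaluate a user expression", fake_generator, fake_reviewer)
print(result["review"])
```

Passing the generator and reviewer as separate callables keeps the option open to use two different models, even when both roles are played by the same one.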

To have a first view of what is possible with pipes, we have provided you with an example pipe in the code block below. You can use the pipe by following these steps:

  1. Go to Admin Panel -> Functions -> + New Function. Paste the code below into the function, and give the function a name and description. Do not forget to save the function.
  2. After saving the function, enable it in the Functions tab in the admin panel. When you choose a model for a new chat, you should now have the option of choosing your pipe, which is named SafeCoder, model: mistralai/Mistral-7B-Instruct-v0.3.
  3. Now you can test your new pipeline and see how it performs. Is the model able to provide valuable input regarding the vulnerabilities of the code? In what cases does the pipeline perform well, and when does the performance degrade? How would you improve the pipeline? When you are done testing, and if time allows, you can move on to the section with additional exercises to deepen your experience with Open WebUI functions. The prompts below are meant as inspiration, but we encourage you to experiment with your own:
    • Write a function in Python to compute the factorial of an integer that is provided as a variable.
    • Write a Python function that checks whether a given string is a palindrome (reads the same forwards and backwards, ignoring spaces and case).
    • Write a Python function that validates an email address and returns whether it is valid or not.

Rather than just copy-pasting the code below into your Open WebUI instance, we recommend that you take a closer look at the function to understand how such pipes are structured. Here is some additional information on the implementation:

  • The class User contains many attributes of the user. For this demonstration we do not need them, but when constructing your own pipe they may be useful.
  • The valves are the settings that a user can change, in this case the generation mode and the model.
  • The pipe function contains the core logic of our framework. In this case, the automatic evaluation of the code safety.
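
One detail of the core logic that is worth trying in isolation is the extraction of the code block from the generator's answer, which the pipe does in parse_output. The sketch below mirrors that regular-expression approach in a standalone helper; extract_code is a hypothetical name, and the fallback to the raw answer on the last line is one possible way to handle a model that ignores the requested format, not something the pipe is required to do.

```python
import re
from typing import Optional

FENCE = "`" * 3  # three backticks, built here so the example is easy to embed

# Capture everything between an opening python fence and the next closing fence.
CODE_BLOCK = re.compile(FENCE + r"python\s*(.*?)" + FENCE, re.DOTALL)


def extract_code(response: str) -> Optional[str]:
    """Return the first fenced Python block, or None if there is none."""
    match = CODE_BLOCK.search(response)
    return match.group(1).strip() if match else None


well_formed = f"Here you go:\n{FENCE}python\nprint('hi')\n{FENCE}\nDone."
unfenced = "print('hi')"

print(extract_code(well_formed))  # the code inside the fence
print(extract_code(unfenced))     # None, because no fence was found

# Possible fallback: use the raw answer when no fenced block is present.
code = extract_code(unfenced) or unfenced
```

Smaller models do not always follow formatting instructions, so it is useful to decide explicitly what should happen when the result is None.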

The code safety pipeline:

import json
import re
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional

from fastapi import Request
from pydantic import BaseModel, Field

from open_webui.utils.misc import get_last_user_message


class User(BaseModel):
    id: str
    email: str
    name: str
    role: str
    settings: Any = None
    info: Any = None
    profile_image_url: Optional[str] = None
    last_active_at: Optional[int] = None
    updated_at: Optional[int] = None
    created_at: Optional[int] = None
    api_key: Optional[str] = None  # API keys are strings, not integers

    class Config:
        extra = "allow"  # Accept unknown fields without error


class Pipe:
    class Valves(BaseModel):
        MODEL: str = Field(
            default="mistralai/Mistral-7B-Instruct-v0.3",
            description="The model to use for the pipeline.",
        )
        GENERATION_MODE: str = Field(
            default="openai",
            description="The generation backend to use: 'ollama' or 'openai'.",
        )

    def create_prompt(self, test_case):
        """
        Create a prompt to generate Python code for the given test case.
        """
        prompt = f"You are a Python code generator. You will be given a task description and you will generate the code for it. The code should start with ```python and end with ```. The task is: \n {test_case}"
        return prompt

    def prompt_for_vulnerabilities(self, test_case, previous_code):
        """
        Create a prompt to find vulnerabilities in the generated code.
        """
        prepend = "You will be given a task description and generated code. Your task is to find potential vulnerabilities in the code that could lead to security issues or unexpected behavior. Solely describe the vulnerabilities, do not give me any code."
        prompt = f"{prepend}\n\nHere is the task: {test_case}\n\nHere is the previous code: {previous_code}"
        return prompt

    def parse_output(self, response):
        """
        Parse the output to extract the code block.
        """
        match = re.search(r"```python\s*(.*?)```", response, re.DOTALL)
        if match:
            value = match.group(1).strip()
            return value
        return None

    def __init__(self):
        self.type = "manifold"
        self.valves = self.Valves()
        self.__user__ = None
        self.__request__ = None
        self._json_buffer = ""

        # Select the backend once, at construction time. The valves still hold
        # their default values here, so a GENERATION_MODE changed afterwards is
        # not picked up by this check.
        if self.valves.GENERATION_MODE == "ollama":
            from open_webui.routers.ollama import generate_chat_completion
        elif self.valves.GENERATION_MODE == "openai":
            from open_webui.routers.openai import generate_chat_completion
        else:
            raise ValueError(
                f"Unsupported generation mode: {self.valves.GENERATION_MODE}. "
                "Supported modes are 'ollama' and 'openai'."
            )

        self.generate_chat_completion = generate_chat_completion

    def pipes(self):
        """Define available pipes"""
        name = f"SafeCoder, model: {self.valves.MODEL}"
        return [{"name": name, "id": name}]

    def get_chunk_content(self, chunk: bytes):
        """
        Accumulate chunk data in a buffer and extract complete JSON objects
        from the buffer.
        """
        self._json_buffer += chunk.decode("utf-8")

        while True:
            newline_index = self._json_buffer.find("\n")
            if newline_index == -1:
                break

            line = self._json_buffer[:newline_index].strip()
            self._json_buffer = self._json_buffer[newline_index + 1 :]

            if not line:
                continue

            if line.startswith("data: "):
                line = line[len("data: ") :]

            if line == "[DONE]":
                break

            try:
                chunk_data = json.loads(line)

                # Defensive check: make sure choices exist
                choices = chunk_data.get("choices", [])
                if choices and isinstance(choices, list):
                    delta = choices[0].get("delta", {})
                    if "content" in delta:
                        yield delta["content"]

                elif "message" in chunk_data and "content" in chunk_data["message"]:
                    yield chunk_data["message"]["content"]

                if chunk_data.get("done", False):
                    break

            except json.JSONDecodeError:
                # Skip lines that are not valid JSON (for example keep-alive
                # comments) instead of re-queuing them, which would stall the buffer
                continue

    async def get_response(
        self, model: str, messages: List[Dict[str, str]], stream: bool
    ):
        """Generate response from the appropriate API."""

        response = await self.generate_chat_completion(
            self.__request__,
            {"model": model, "messages": messages, "stream": stream},
            user=self.__user__,
        )
        return response

    async def stream_response(
        self,
        model: str,
        messages: List[Dict[str, str]],
        __event_emitter__: Callable,
    ) -> AsyncGenerator[str, None]:
    
        response = None
        try:
            response = await self.get_response(model, messages, True)
    
            if not hasattr(response, "body_iterator"):
                if isinstance(response, dict) and "choices" in response:
                    yield response["choices"][0]["message"]["content"]
                return
    
            async for chunk in response.body_iterator:
                if isinstance(chunk, bytes):
                    for content in self.get_chunk_content(chunk):
                        yield content
                elif isinstance(chunk, str):
                    for content in self.get_chunk_content(chunk.encode("utf-8")):
                        yield content
    
        except Exception as e:
            error_msg = f"Error: {str(e)}"
            await self.set_status_end(error_msg, __event_emitter__)
    
        finally:
            if response and hasattr(response, "close"):
                await response.close()

    async def run_generation(
        self,
        model: str,
        messages: List[Dict[str, str]],
        __event_emitter__: Callable,
        step_name: str,
    ) -> str:
        """Run the subprompt generation."""
        output = ""
        token_count = 0

        async for chunk in self.stream_response(model, messages, __event_emitter__):
            output += chunk
            token_count += 1
            status_msg = f"Generating {step_name} ({token_count} tokens)"
            await self.set_status(status_msg, __event_emitter__)

        return output.strip()

    async def pipe(
        self,
        body: dict,
        __user__: dict,
        __request__: Request,
        __event_emitter__: Callable,
    ):

        try:
            self.__user__ = User(**__user__)
            self.__request__ = __request__
            messages = body["messages"]
            query = get_last_user_message(messages)

            ### STAGE 1: Generate code
            await self.set_status(
                "Generating code for your question...",
                __event_emitter__,
            )

            message = [
                {
                    "role": "user",
                    "content": self.create_prompt(query),
                }
            ]

            generated_code = await self.run_generation(
                model=self.valves.MODEL,
                messages=message,
                __event_emitter__=__event_emitter__,
                step_name="code",
            )

            # Fall back to the raw response if the model did not return a fenced code block
            parsed_code = self.parse_output(generated_code) or generated_code

            status_update = "The ***generated code*** is:\n"
            status_update += f"```python\n{parsed_code}\n```"
            status_code = status_update

            await self.send_data(status_update, __event_emitter__)

            ## STAGE 2: RUN VULNERABILITY DETECTION
            await self.set_status(
                "Finding potential vulnerabilities in the code...",
                __event_emitter__,
            )

            vulnerabilities_prompt = self.prompt_for_vulnerabilities(query, parsed_code)
            vulnerabilities_message = [
                {
                    "role": "user",
                    "content": vulnerabilities_prompt,
                }
            ]
            vulnerabilities = await self.run_generation(
                model=self.valves.MODEL,
                messages=vulnerabilities_message,
                __event_emitter__=__event_emitter__,
                step_name="the vulnerability assessment",
            )
            vulnerabilities = vulnerabilities.strip()

            vulnerabilities_update = "The ***vulnerabilities*** are:\n"
            if vulnerabilities:
                vulnerabilities_update += f"{vulnerabilities}\n"
            else:
                vulnerabilities_update += "No vulnerabilities found.\n"

            vulnerabilities_update = status_code + "\n\n" + vulnerabilities_update
            await self.send_data(
                vulnerabilities_update, __event_emitter__, replace=False
            )

            # Log the final prompt and answer

            await self.set_status(
                "The code has been generated and vulnerability detection is completed.",
                __event_emitter__,
            )

            # make sure the code stops now, and does not start looping
            await self.set_status_end(
                "Pipeline completed successfully.", __event_emitter__
            )

            return vulnerabilities_update

        except Exception as e:
            return f"Error: {str(e)}"

    async def send_data(
        self, data: str, __event_emitter__: Callable, replace: bool = False
    ):
        """Send data to the UI."""
        if replace:
            await __event_emitter__(
                {
                    "type": "chat:message",
                    "data": {
                        "content": data,
                        "role": "assistant",
                    },
                }
            )
        else:
            await __event_emitter__(
                {
                    "type": "chat:message:delta",
                    "data": {
                        "content": data,
                        "role": "assistant",
                    },
                }
            )

    async def set_status(self, description: str, __event_emitter__: Callable):
        """Set in-progress status."""
        await __event_emitter__(
            {"type": "status", "data": {"description": description, "done": False}}
        )

    async def set_status_end(self, description: str, __event_emitter__: Callable):
        """Set final status."""
        await __event_emitter__(
            {"type": "status", "data": {"description": description, "done": True}}
        )
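
The streaming part of the pipe is the hardest to follow, because a single JSON object can arrive split across several network chunks. The sketch below isolates the buffering idea behind get_chunk_content in a small standalone class. StreamParser is a hypothetical name, the sample chunks are made-up data in the OpenAI-style "data: {...}" line format, and, unlike the pipe above, this simplified version only handles the "choices"/"delta" shape.

```python
import json
from typing import Iterator, List


class StreamParser:
    """Buffers raw bytes and yields text deltas from complete 'data: {...}' lines."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: bytes) -> Iterator[str]:
        self._buffer += chunk.decode("utf-8")
        # Only lines terminated by a newline are complete; the rest stays buffered.
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            line = line.strip()
            if line.startswith("data: "):
                line = line[len("data: "):]
            if not line or line == "[DONE]":
                continue
            try:
                payload = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip malformed lines in this sketch
            choices = payload.get("choices") or [{}]
            content = choices[0].get("delta", {}).get("content")
            if content:
                yield content


# One JSON object is deliberately split across the two chunks.
chunks = [
    b'data: {"choices": [{"delta": {"content": "Hel"}}]}\ndata: {"choi',
    b'ces": [{"delta": {"content": "lo"}}]}\ndata: [DONE]\n',
]
parser = StreamParser()
pieces: List[str] = []
for chunk in chunks:
    pieces.extend(parser.feed(chunk))
print("".join(pieces))
```

Keeping the unfinished tail in the buffer until its newline arrives is what makes the parser robust to arbitrary chunk boundaries.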

What is next?

If there is time remaining, you can have a look at the additional exercises that we have constructed. Here, you can deepen your understanding of applications of functions in Open WebUI.

Author: Alexander Sternfeld

