Agentic pipelines
In this section, you will work with Open WebUI pipes. For pipes to function properly, you need to explicitly assign a regular model to be in charge for external tasks, as otherwise your agentic frameworks will automatically be used for tasks they are not intended for, such as generating the title of a conversation. You can do so through the following steps:
- Set the models to be publicly available, via
Admin Panel -> Settings -> Models. By clicking on the pencil next to a model, you get access to the model settings. At the top right in these settings, you change the access from private to public, and do not forget to save the settings at the bottom. Do this for both the Mistral and Qwen model.- Through
Admin Panel -> Settings -> Interface, specify the External Task Model to bemistralai/Mistral-7B-Instruct-v0.3. Do not forget to save the settings at the bottom right.
Now that you have experimented with filters, we move to a more advanced functionality in Open WebUI: pipe functions. With a pipe, you build your own model in Open WebUI, with full control over how the data flows and which models are included in the pipeline. With such pipes, there are many possibilities, such as the integration of external API’s, combining multiple models or using external libraries. In this workshop, we will demonstrate how to use pipes for building an agentic pipeline for a simple safety evaluation of generated code.
The basic structure of a pipe function is as follows:
from pydantic import BaseModel, Field
class Pipe:
class Valves(BaseModel):
MODEL_ID: str = Field(default="")
def __init__(self):
self.valves = self.Valves()
def pipes(self):
return [
{"id": "model_id_1", "name": "model_1"},
{"id": "model_id_2", "name": "model_2"},
{"id": "model_id_3", "name": "model_3"},
]
def pipe(self, body: dict):
# Logic goes here
print(self.valves, body) # Prints the configuration options and the input body
model = body.get("model", "")
return f"{model}: Hello, World!"
Code safety evaluation
We will now build a pipe for code generation with LLMs, that puts a particular focus on security. To do so, we will implement a simple framework, in which two LLMs are used: one for Python code generation and one for code evaluation. After the code is generated by the first LLM, the second LLM will generate an evaluation of the code safety and provide it to the user.
To have a first view of what is possible with pipes, we have provided you with an example pipe in the code block below. You can use the pipe by following these steps:
- Go to
Admin Panel -> Functions -> + New Function. Paste the code below into the function, and give the function a name and description. Do not forget to save the function.- After composing the function, enable it in the
Functionstab in the admin panel. Now, when choosing a model when starting a new chat, you should have the option of choosing your pipe, which is named SafeCoder, model: mistralai/Mistral-7B-Instruct-v0.3.- Now you can test your new pipeline, and see how it performs. Below we provide a few prompts to give you inspiration as to what kind of questions you may ask, but we recommend you to let your inspiration run free and test different things. Is the model able to provide valuable input regarding the vulnerabilities of the code? In what cases does the pipeline perform well, and when does the performance degrade? How would you want to improve the pipeline? When you are done with testing the pipeline, if time allows, you can move on to the section with additional exercises to deepen your experience with Open WebUI functions.
- Write a function in Python to compute the factorial of an integer that is provided as a variable.
- Write a Python function that checks whether a given string is a palindrome (reads the same forwards and backwards, ignoring spaces and case).
- Write a Python function that validates an email address and returns whether it is valid or not.
Rather than just copy-pasting the code below into your Open WebUI instance, we recommend you to take a closer look at the function to gain an understanding of how such pipes are structured. Here is some additional information on the implementation:
- The class
Usercontains many attributes of the user. For this demonstration we do not need them, but when constructing your own pipe they may be useful.- The valves are the settings that a user can change, in this case the generation mode and the model.
- The
pipefunction contains the core logic of our framework. In this case, the automatic evaluation of the code safety.
Show Code Safety Pipeline
from typing import Dict, List, Optional, Callable, Any, AsyncGenerator
from pydantic import BaseModel, Field
from fastapi import Request
import json
import re
from open_webui.utils.misc import get_last_user_message
from pydantic import BaseModel
from typing import Any, Optional
class User(BaseModel):
id: str
email: str
name: str
role: str
settings: Any = None
info: Any = None
profile_image_url: Optional[str] = None
last_active_at: Optional[int] = None
updated_at: Optional[int] = None
created_at: Optional[int] = None
api_key: Optional[int] = None
class Config:
extra = "allow" # Accept unknown fields without error
class Pipe:
class Valves(BaseModel):
MODEL: str = Field(
default="mistralai/Mistral-7B-Instruct-v0.3",
description="The model to use for the pipeline.",
)
GENERATION_MODE: str = Field(
default="openai",
description="The generation",
)
def create_prompt(self, test_case):
"""
Create a prompt to generate Python code for the given test case.
"""
prompt = f"You are a Python code generator. You will be given a task description and you will generate the code for it. The code should start with ```python and end with ```. The task is: \n {test_case}"
return prompt
def prompt_for_vulnerabilities(self, test_case, previous_code):
"""
Create a prompt to find vulnerabilities in the generated code.
"""
prepend = "You will be given a task description and generated code. Your task is to find potential vulnerabilities in the code that could lead to security issues or unexpected behavior. Solely describe the vulnerabilities, do not give me any code."
prompt = f"{prepend}\n\nHere is the task: {test_case}\n\nHere is the previous code: {previous_code}"
return prompt
def parse_output(self, response):
"""
Parse the output to extract the code block.
"""
match = re.search(r"```python\s*(.*?)```", response, re.DOTALL)
if match:
value = match.group(1).strip()
return value
return None
def __init__(self):
self.type = "manifold"
self.valves = self.Valves()
self.__user__ = None
self._json_buffer = ""
if self.valves.GENERATION_MODE == "ollama":
from open_webui.routers.ollama import generate_chat_completion
self.generate_chat_completion = generate_chat_completion
if self.valves.GENERATION_MODE == "openai":
from open_webui.routers.openai import generate_chat_completion
self.generate_chat_completion = generate_chat_completion
if self.valves.GENERATION_MODE not in ["ollama", "openai"]:
raise ValueError(
f"Unsupported generation mode: {self.valves.GENERATION_MODE}. "
"Supported modes are 'ollama' and 'openai'."
)
def pipes(self):
"""Define available pipes"""
name = f"SafeCoder, model: {self.valves.MODEL}"
return [{"name": name, "id": name}]
def get_chunk_content(self, chunk: bytes):
"""
Accumulate chunk data in a buffer and extract complete JSON objects
from the buffer.
"""
self._json_buffer += chunk.decode("utf-8")
while True:
newline_index = self._json_buffer.find("\n")
if newline_index == -1:
break
line = self._json_buffer[:newline_index].strip()
self._json_buffer = self._json_buffer[newline_index + 1 :]
if not line:
continue
if line.startswith("data: "):
line = line[len("data: ") :]
if line == "[DONE]":
break
try:
chunk_data = json.loads(line)
# Defensive check: make sure choices exist
choices = chunk_data.get("choices", [])
if choices and isinstance(choices, list):
delta = choices[0].get("delta", {})
if "content" in delta:
yield delta["content"]
elif "message" in chunk_data and "content" in chunk_data["message"]:
yield chunk_data["message"]["content"]
if chunk_data.get("done", False):
break
except json.JSONDecodeError as e:
# Re-append for future re-parse
self._json_buffer = line + "\n" + self._json_buffer
break
async def get_response(
self, model: str, messages: List[Dict[str, str]], stream: bool
):
"""Generate response from the appropriate API."""
response = await self.generate_chat_completion(
self.__request__,
{"model": model, "messages": messages, "stream": stream},
user=self.__user__,
)
return response
async def stream_response(
self,
model: str,
messages: List[Dict[str, str]],
__event_emitter__: Callable,
) -> AsyncGenerator[str, None]:
response = None
try:
response = await self.get_response(model, messages, True)
if not hasattr(response, "body_iterator"):
if isinstance(response, dict) and "choices" in response:
yield response["choices"][0]["message"]["content"]
return
async for chunk in response.body_iterator:
if isinstance(chunk, bytes):
for content in self.get_chunk_content(chunk):
yield content
elif isinstance(chunk, str):
for content in self.get_chunk_content(chunk.encode("utf-8")):
yield content
except Exception as e:
error_msg = f"Error: {str(e)}"
await self.set_status_end(error_msg, __event_emitter__)
finally:
if response and hasattr(response, "close"):
await response.close()
async def run_generation(
self,
model: str,
messages: List[Dict[str, str]],
__event_emitter__: Callable,
step_name: str,
) -> str:
"""Run the subprompt generation."""
output = ""
token_count = 0
async for chunk in self.stream_response(model, messages, __event_emitter__):
output += chunk
token_count += 1
status_msg = f"Generating {step_name} ({token_count} tokens)"
await self.set_status(status_msg, __event_emitter__)
return output.strip()
async def pipe(
self,
body: dict,
__user__: dict,
__request__: Request,
__event_emitter__: Callable,
):
try:
self.__user__ = User(**__user__)
self.__request__ = __request__
messages = body["messages"]
query = get_last_user_message(messages)
### STAGE 1: Generate code
await self.set_status(
"Generating code for your question...",
__event_emitter__,
)
message = [
{
"role": "user",
"content": self.create_prompt(query),
}
]
generated_code = await self.run_generation(
model=self.valves.MODEL,
messages=message,
__event_emitter__=__event_emitter__,
step_name="code",
)
parsed_code = self.parse_output(generated_code)
status_update = "The ***generated code*** is:\n"
status_update += f"```python\n{parsed_code}\n```"
status_code = status_update
await self.send_data(status_update, __event_emitter__)
## STAGE 2: RUN VULNERABILITY DETECTION
await self.set_status(
"Finding potential vulnerabilities in the code...",
__event_emitter__,
)
vulnerabilities_prompt = self.prompt_for_vulnerabilities(query, parsed_code)
vulnerabilities_message = [
{
"role": "user",
"content": vulnerabilities_prompt,
}
]
vulnerabilities = await self.run_generation(
model=self.valves.MODEL,
messages=vulnerabilities_message,
__event_emitter__=__event_emitter__,
step_name="the vulnerabilities in the code.",
)
vulnerabilities = vulnerabilities.strip()
vulnerabilities_update = "The ***vulnerabilities*** are:\n"
if vulnerabilities:
vulnerabilities_update += f"{vulnerabilities}\n"
else:
vulnerabilities_update += "No vulnerabilities found.\n"
vulnerabilities_update = status_code + "\n\n" + vulnerabilities_update
await self.send_data(
vulnerabilities_update, __event_emitter__, replace=False
)
# Log the final prompt and answer
await self.set_status(
"The code has been generated and vulnerability detection is completed.",
__event_emitter__,
)
# make sure the code stops now, and does not start looping
await self.set_status_end(
"Pipeline completed successfully.", __event_emitter__
)
return vulnerabilities_update
except Exception as e:
return f"Error: {str(e)}"
async def send_data(
self, data: str, __event_emitter__: Callable, replace: bool = False
):
"""Send data to the UI."""
if replace:
await __event_emitter__(
{
"type": "chat:message",
"data": {
"content": data,
"role": "assistant",
},
}
)
else:
await __event_emitter__(
{
"type": "chat:message:delta",
"data": {
"content": data,
"role": "assistant",
},
}
)
async def set_status(self, description: str, __event_emitter__: Callable):
"""Set in-progress status."""
await __event_emitter__(
{"type": "status", "data": {"description": description, "done": False}}
)
async def set_status_end(self, description: str, __event_emitter__: Callable):
"""Set final status."""
await __event_emitter__(
{"type": "status", "data": {"description": description, "done": True}}
)
What is next?
If there is time remaining, you can have a look at the additional exercises that we have constructed. Here, you can deepen your understanding of applications of functions in Open WebUI.
Author: Alexander Sternfeld