
Build an AI-Powered Multimodal MCP Chatbot#

This step-by-step guide will walk you through building a modern chatbot that can chat with your documents, images, and videos. By the end, you'll have a working multimodal AI assistant and understand how to use Jac's unique programming features to build intelligent applications.

What You'll Build#

You'll create a chatbot that can:

  • Upload and chat with PDFs, text files, images, and videos
  • Search your documents and provide context-aware answers
  • Answer general questions using web search
  • Understand and discuss images and videos using AI vision
  • Route different types of questions to specialized AI handlers

What You'll Learn#

  • Object Spatial Programming: Use Jac's node-walker architecture to organize your application
  • Meaning Typed Programming (MTP): Let AI classify and route user queries automatically from simple type and semantic definitions
  • Model Context Protocol (MCP): Build modular, reusable AI tools
  • Multimodal AI: Work with text, images, and videos in one application

Technologies We'll Use#

  • Jac Language: For the main application logic
  • Jac Cloud: Backend server infrastructure
  • Streamlit: User-friendly web interface
  • ChromaDB: Document search and storage
  • OpenAI GPT: AI chat and vision capabilities
  • Serper API: Real-time web search

Project Structure#

We'll create six main files:

  • client.jac: The web interface for chat and file uploads
  • server.jac: The main application using Object Spatial Programming
  • server.impl.jac: Implementation details and function bodies for server.jac (automatically imported by Jac)
  • mcp_server.jac: Tool server for document search and web search
  • mcp_client.jac: Interface to communicate with tools
  • tools.jac: Document processing and search logic

Complete Code Preview#

Here's what you'll build: a full AI-powered multimodal MCP chatbot. The listing below walks through all six files, in the order given above.

Chatbot Workflow

client.jac

import streamlit as st;
import requests;
import base64;


def bootstrap_frontend(token: str) {
    st.set_page_config(layout="wide");
    st.title("Welcome to your Jac MCP Chatbot!");

    # Initialize session state
    if "messages" not in st.session_state {
        st.session_state.messages = [];
    }
    if "session_id" not in st.session_state {
        st.session_state.session_id = "user_session_123";
    }
    uploaded_file = st.file_uploader('Upload File (PDF, TXT, Image, or Video)');
    if uploaded_file {
        file_b64 = base64.b64encode(uploaded_file.read()).decode('utf-8');
        file_extension = uploaded_file.name.lower().split('.')[-1];
        file_type = uploaded_file.type or '';
        supported_types = ['pdf', 'txt', 'png', 'jpg', 'jpeg', 'webp', 'mp4', 'avi', 'mov'];
        if file_extension not in supported_types and not (file_type.startswith('image') or file_type.startswith('video')) {
            st.error(f"Unsupported file type: {file_type or 'unknown'}. Please upload PDF, TXT, Image, or Video files.");
            return;
        }
        # Use the upload_file walker endpoint for all uploads, saving in uploads/{session_id}
        payload = {
            "file_name": uploaded_file.name,
            "file_data": file_b64,
            "session_id": st.session_state.session_id
        };
        response = requests.post(
            "http://localhost:8000/walker/upload_file",
            json=payload,
            headers={"Authorization": f"Bearer {token}"}
        );
        if response.status_code == 200 {
            st.success(f"File '{uploaded_file.name}' uploaded and saved to uploads/{st.session_state.session_id}.");
            # Track last uploaded file path in session state
            st.session_state.last_uploaded_file_path = f"uploads/{st.session_state.session_id}/{uploaded_file.name}";
        } else {
            st.error(f"Failed to process {uploaded_file.name}: {response.text}");
        }
    }

    # Display chat messages from history on app rerun
    for message in st.session_state.messages {
        with st.chat_message(message["role"]) {
            st.markdown(message["content"]);
        }
    }

    if prompt := st.chat_input("What is up?") {
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": prompt});

        # Display user message in chat message container
        with st.chat_message("user") {
            st.markdown(prompt);
        }
        # Display assistant response in chat message container
        with st.chat_message("assistant") {
            with st.spinner("Thinking...") {
                # Call walker API
                payload = {
                    "message": prompt,
                    "session_id": st.session_state.session_id
                };
                # If a file was uploaded, include its path
                if "last_uploaded_file_path" in st.session_state {
                    payload["file_path"] = st.session_state.last_uploaded_file_path;
                }
                response = requests.post(
                    "http://localhost:8000/walker/interact",
                    json=payload,
                    headers={"Authorization": f"Bearer {token}"}
                );

                if response.status_code == 200 {
                    response = response.json();
                    print("response is",response);
                    st.write(response["reports"][0]["response"]);

                    # Add assistant response to chat history
                    st.session_state.messages.append({"role": "assistant", "content": response["reports"][0]["response"]});
                }
            }
        }
    }
}

with entry {

    INSTANCE_URL = "http://localhost:8000";
    TEST_USER_EMAIL = "test@mail.com";
    TEST_USER_PASSWORD = "password";

    response = requests.post(
        f"{INSTANCE_URL}/user/login",
        json={"email": TEST_USER_EMAIL, "password": TEST_USER_PASSWORD}
    );

    if response.status_code != 200 {
        # Try registering the user if login fails
        response = requests.post(
            f"{INSTANCE_URL}/user/register",
            json={
                "email": TEST_USER_EMAIL,
                "password": TEST_USER_PASSWORD
            }
        );
        assert response.status_code == 201;

        response = requests.post(
            f"{INSTANCE_URL}/user/login",
            json={"email": TEST_USER_EMAIL, "password": TEST_USER_PASSWORD}
        );
        assert response.status_code == 200;
    }

    token = response.json()["token"];

    print("Token:", token);

    bootstrap_frontend(token);
}

server.jac

import from byllm.llm {Model}
import from byllm.types {Image, Video, Text}
import from tools {RagEngine}
import os;
import base64;
import mcp_client;

glob rag_engine:RagEngine = RagEngine();
glob llm = Model(model_name='gpt-4o-mini', verbose=True);
glob MCP_SERVER_URL: str = os.getenv('MCP_SERVER_URL', 'http://localhost:8899/mcp');


enum ChatType {
    RAG,
    QA,
    IMAGE,
    VIDEO
}

node Router {
    def classify(message: str) -> ChatType by llm(method="Reason", temperature=0.8);
}

node Chat {
    has chat_type: ChatType;
    has chat_history: list[dict] = [];
}

walker infer {
    has message: str;
    has chat_history: list[dict];
    has file_path: str = "";
    has response: str = "";

    can init_router with `root entry {
        visit [-->](`?Router) else {
            router_node = here ++> Router();
            router_node ++> RagChat();
            router_node ++> QAChat();
            router_node ++> ImageChat();
            router_node ++> VideoChat();
            visit router_node;
        }
    }
    can route with Router entry {
        classification = here.classify(message = self.message);
        print("Routing message:", self.message, "to chat type:", classification);
        visit [-->](`?Chat)(?chat_type==classification);
    }
}


node ImageChat(Chat) {
    has chat_type: ChatType = ChatType.IMAGE;

    def respond_with_image(img: Image, text: Text, chat_history: list[dict]) -> str by llm(tools=([use_mcp_tool, list_mcp_tools]));

    can chat with infer entry;
}



node VideoChat(Chat) {
    has chat_type: ChatType = ChatType.VIDEO;

    def respond_with_video(video: Video, text: Text, chat_history: list[dict]) -> str by llm(
        method="Chain-of-Thoughts");

    can chat with infer entry;
}

node RagChat(Chat) {
    has chat_type: ChatType = ChatType.RAG;

    def respond(message:str, chat_history:list[dict]) -> str by llm(
            method="ReAct",
            tools=([list_mcp_tools, use_mcp_tool]),
            messages=chat_history,
            max_react_iterations=6
        );

    can chat with infer entry;
}

node QAChat(Chat) {
    has chat_type: ChatType = ChatType.QA;

    def respond(message:str, chat_history:list[dict]) -> str
        by llm(
            messages=chat_history,
            max_react_iterations=6
        );

    can chat with infer entry;
}

walker interact {
    has message: str;
    has session_id: str;
    has chat_history: list[dict] = [];
    has file_path: str = "";

    can init_session with `root entry {
        visit [-->](`?Session)(?id == self.session_id) else {
            session_node = here ++> Session(id=self.session_id, chat_history=[], file_path=self.file_path, status=1);
            visit session_node;
        }
    }
}



node Session {
    has id: str;
    has chat_history: list[dict];
    has status: int = 1;
    has file_path: str = "";

    can chat with interact entry {
        visitor.chat_history = self.chat_history;
        visitor.chat_history.append({"role": "user", "content": visitor.message});
        response = infer(message=visitor.message, chat_history=self.chat_history, file_path=visitor.file_path) spawn root;
        visitor.chat_history.append({"role": "assistant", "content": response.response});
        self.chat_history = visitor.chat_history;
        report {"response": response.response};
    }
}


walker upload_file {
    has file_name: str;
    has file_data: str;
    has session_id: str;

    can save_doc with `root entry;
}

server.impl.jac

sem ChatType = """ChatType enum defines the types of chat interactions. ChatType must be one of:
- RAG: For interactions that require document retrieval.
- QA: For interactions that do not require document retrieval, or general image/video questions.
- IMAGE: For interactions involving image analysis or anything related to images, including follow-up questions.
- VIDEO: For interactions involving video analysis or video-related questions.
""";

sem Router.classify = "Classify the message as RAG, QA, IMAGE, or VIDEO. If classification fails, default to QA.";

sem ImageChat.respond_with_image = """Answer the user's message(text) by referring to the provided image. Always refer to the given image, answer relevant to the given image.""";

sem VideoChat.respond_with_video = """Answer the user's message using the provided video and text. Always refer to the given video, answer relevant to the given video.""";

sem RagChat.respond = """Generate a helpful response to the user's message. Use the available MCP tools when needed. Use list_mcp_tools to find out which tools are available. Always pass arguments as a flat dictionary (e.g., {\"query\": \"Your search query\"}), never as a list or schema_dict_wrapper.""";

sem QAChat.respond = """Generate a helpful response to the user's message.""";


impl ImageChat.chat {
    img_path = visitor.file_path;
    response = self.respond_with_image(
        img=Image(img_path),
        text=visitor.message,
        chat_history=visitor.chat_history
    );

    visitor.chat_history.append({"role": "assistant", "content": response});
    self.chat_history = visitor.chat_history;
    visitor.response = response;
    report {"response": response, "chat_history": visitor.chat_history};
}

impl VideoChat.chat {
    video_path = visitor.file_path;
    response = self.respond_with_video(
        video=Video(video_path),
        text=visitor.message,
        chat_history=visitor.chat_history
    );

    visitor.chat_history.append({"role": "assistant", "content": response});
    self.chat_history = visitor.chat_history;
    visitor.response = response;
    report {"response": response, "chat_history": visitor.chat_history};
}


impl RagChat.chat {
    response = self.respond(
        message=visitor.message,
        chat_history=visitor.chat_history,
    );
    visitor.chat_history.append({"role": "assistant", "content": response});
    self.chat_history = visitor.chat_history;
    visitor.response = response;
    report {"response": response, "chat_history": visitor.chat_history};
}

impl QAChat.chat {
    response = self.respond(
        message=visitor.message,
        chat_history=visitor.chat_history,
    );
    visitor.chat_history.append({"role": "assistant", "content": response});
    self.chat_history = visitor.chat_history;
    visitor.response = response;
    report {"response": response, "chat_history": visitor.chat_history};
}

impl upload_file.save_doc {
    upload_dir = os.path.join("uploads", self.session_id);
    if not os.path.exists(upload_dir) {
        os.makedirs(upload_dir);
    }

    file_path = os.path.join(upload_dir, self.file_name);
    data = base64.b64decode(self.file_data.encode('utf-8'));

    with open(file_path, 'wb') as f {
        f.write(data);
    }

    # Only add text-based documents to rag_engine
    lower_name = self.file_name.lower();
    if lower_name.endswith(".pdf") or lower_name.endswith(".txt") {
        rag_engine.add_file(file_path);
    }

    report {
        "status": "uploaded",
        "file_path": file_path,
        "added_to_rag": lower_name.endswith(".pdf") or lower_name.endswith(".txt")
    };
}

"""Get available MCP tool names."""
def list_mcp_tools() -> list[str] {
    return mcp_client.list_mcp_tools();
}

"""Use MCP tool to perform actions.
name must be one of available tools from list_mcp_tools(), do not make up any tool names.

Example input for `use_mcp_tool`:
{"name": "tool_name", "arguments": {"query": "your query"}}
"""
def use_mcp_tool(name: str, arguments: dict[str, str]) -> str {
    return mcp_client.call_mcp_tool(name=name, arguments=arguments);
}

mcp_server.jac

import os;
import from tools {RagEngine, WebSearch}
import from mcp.server.fastmcp.tools {Tool}
import from mcp.server.fastmcp {FastMCP}
import typing;

glob rag_engine: RagEngine = RagEngine();
glob web_search: WebSearch = WebSearch();


with entry {
    mcp = FastMCP(name="RAG-MCP", port=8899);
}

def resolve_hints(fn: typing.Callable) -> typing.Callable {
    fn.__annotations__ = typing.get_type_hints(fn, include_extras=True);
    return fn;
}

@mcp.tool(name="search_docs")
@resolve_hints
async def tool_search_docs(query: str) -> str {
    return rag_engine.search(query);
}

@mcp.tool(name="search_web")
@resolve_hints
async def tool_search_web(query: str) -> str {
    web_search_results = web_search.search(query);
    if not web_search_results {
        return "Mention No results found for the web search";
    }
    return web_search_results;
}


with entry {
    mcp.run("streamable-http");
}

mcp_client.jac

import anyio;
import logging;
import mcp;
import os;
import from mcp.client { streamable_http }

with entry {
    logger = logging.getLogger(__name__);
    logger.setLevel(logging.INFO);
    logging.basicConfig(level=logging.INFO);
}
glob MCP_SERVER_URL = os.getenv('MCP_SERVER_URL', 'http://localhost:8899/mcp');


def list_mcp_tools() -> list[str] {
    async def _list() -> list {
        async with streamable_http.streamablehttp_client(MCP_SERVER_URL) as (read, write, _) {
            async with mcp.ClientSession(read, write) as sess {
                await sess.initialize();
                tools = await sess.list_tools();
                logger.info(f"available tools: {tools.tools}");
                tool_names = [tool.name for tool in tools.tools];
                logger.info(f"tool names: {tool_names}");
                return tool_names;
            }
        }
    }
    return anyio.run(_list);
}


def call_mcp_tool(name: str, arguments: dict) -> str {
    async def _call() -> str {
        async with streamable_http.streamablehttp_client(MCP_SERVER_URL) as (read, write, _) {
            async with mcp.ClientSession(read, write) as sess {
                await sess.initialize();
                result = await sess.call_tool(name=name, arguments=arguments);
                if result.isError {
                    return f"MCP error: {result.error.message}";
                }
                if result.structuredContent and ('result' in result.structuredContent) {
                    return result.structuredContent['result'];
                }
                if result.content and (len(result.content) > 0) {
                    return result.content[0].text;
                }
            }
        }
    }
    return anyio.run(_call);
}

tools.jac

import os;
import requests;
import from langchain_community.document_loaders {PyPDFDirectoryLoader, PyPDFLoader}
import from langchain_text_splitters {RecursiveCharacterTextSplitter}
import from langchain.schema.document {Document}
import from langchain_openai {OpenAIEmbeddings}
import from langchain_chroma {Chroma}

glob SERPER_API_KEY: str = os.getenv('SERPER_API_KEY', '');

obj RagEngine {
    has file_path: str = "uploads/user_session_123";
    has chroma_path: str = "chroma";

    def postinit {
        if not os.path.exists(self.file_path) {
            os.makedirs(self.file_path);
        }
        documents: list = self.load_documents();
        chunks: list = self.split_documents(documents);
        self.add_to_chroma(chunks);
    }

    def load_documents {
        document_loader = PyPDFDirectoryLoader(self.file_path);
        return document_loader.load();
    }

    def load_document(file_path: str) {
        loader = PyPDFLoader(file_path);
        return loader.load();
    }

    def add_file(file_path: str) {
        documents = self.load_document(file_path);
        chunks = self.split_documents(documents);
        self.add_to_chroma(chunks);
    }

    def split_documents(documents: list[Document]) {
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=80,
            length_function=len,
            is_separator_regex=False
        );
        return text_splitter.split_documents(documents);
    }

    def get_embedding_function {
        embeddings = OpenAIEmbeddings();
        return embeddings;
    }

    def add_chunk_id(chunks: list[Document]) {
        last_page_id = None;
        current_chunk_index = 0;

        for chunk in chunks {
            source = chunk.metadata.get('source');
            page = chunk.metadata.get('page');
            current_page_id = f'{source}:{page}';

            if current_page_id == last_page_id {
                current_chunk_index +=1;
            } else {
                current_chunk_index = 0;
            }

            chunk_id = f'{current_page_id}:{current_chunk_index}';
            last_page_id = current_page_id;

            chunk.metadata['id'] = chunk_id;
        }

        return chunks;
    }

    def add_to_chroma(chunks: list[Document]) {
        db = Chroma(persist_directory=self.chroma_path, embedding_function=self.get_embedding_function());
        chunks_with_ids = self.add_chunk_id(chunks);

        existing_items = db.get(include=[]);
        existing_ids = set(existing_items['ids']);

        new_chunks = [];
        for chunk in chunks_with_ids {
            if chunk.metadata['id'] not in existing_ids {
                new_chunks.append(chunk);
            }
        }

        if len(new_chunks) {
            print('adding new documents');
            new_chunk_ids = [chunk.metadata['id'] for chunk in new_chunks];
            db.add_documents(new_chunks, ids=new_chunk_ids);
        } else {
            print('no new documents to add');
        }
    }

    def get_from_chroma(query: str, chunk_nos: int=5) {
        db = Chroma(
            persist_directory=self.chroma_path,
            embedding_function=self.get_embedding_function()
        );
        results = db.similarity_search_with_score(query, k=chunk_nos);
        return results;
    }

    def search(query: str, chunk_nos: int=5) {
        results = self.get_from_chroma(query=query, chunk_nos=chunk_nos);
        summary = "";
        for i in range(len(results)) {
            doc = results[i][0];
            page = doc.metadata.get('page');
            source = doc.metadata.get('source');
            # Truncate each chunk to 400 characters for the summary
            chunk_txt = doc.page_content[0:400];
            summary += f"{source} page {page}: {chunk_txt}\n";
        }
        return summary;
    }
}


obj WebSearch {
    has api_key: str = SERPER_API_KEY;
    has base_url: str = "https://google.serper.dev/search";

    def search(query: str) {
        headers = {"X-API-KEY": self.api_key, "Content-Type": "application/json"};
        payload = {"q": query};
        resp = requests.post(self.base_url, headers=headers, json=payload);
        if resp.status_code == 200 {
            data = resp.json();
            summary = "";
            results = data.get("organic", []) if isinstance(data, dict) else [];
            for r in results[:3] {
                summary += f"{r.get('title', '')}: {r.get('link', '')}\n";
                if r.get('snippet') {
                    summary += f"{r['snippet']}\n";
                }
            }
            return summary;
        }
        return f"Serper request failed: {resp.status_code}";
    }
}

The full source code for this project is also available at: https://github.com/jaseci-labs/Agentic-AI/tree/main/jac-mcp-chatbot


Step 1: Set Up Your Environment#

First, install the required packages. We recommend Python 3.12 or newer:

pip install jaclang jac-cloud jac-streamlit byllm langchain langchain-community langchain-openai langchain-chroma chromadb openai pypdf tiktoken requests "mcp[cli]" anyio

Next, get your API keys. You'll need an OpenAI API key for the AI features. For web search, get a free API key from Serper.

Set your environment variables:

export OPENAI_API_KEY=<your-openai-key>
export SERPER_API_KEY=<your-serper-key>

If you see no errors, you're ready to start building!

Step 2: Understanding the Architecture#

Your application uses Jac's Object Spatial Programming to create a clean, modular design:

Nodes represent different parts of your system (Router, Chat types, Sessions). Each node has specific responsibilities and capabilities.

Walkers move through your node network, carrying information and executing logic. They represent the actions your system can perform.
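
To make these two ideas concrete, here is a minimal, self-contained sketch (the Greeter node and greet walker are illustrative names, not part of the chatbot code):

node Greeter {
    has name: str = "world";
}

walker greet {
    can start with `root entry {
        # Create a Greeter under root if none exists yet, then visit it
        visit [-->](`?Greeter) else {
            greeter = here ++> Greeter();
            visit greeter;
        }
    }
    can say_hello with Greeter entry {
        print(f"Hello, {here.name}!");
    }
}

with entry {
    greet() spawn root;
}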

Meaning Typed Programming (MTP) lets AI automatically classify and route requests, making your application intelligent without complex rule-based logic.
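
In this app, that is the Router.classify ability: a function signature with no body, implemented by the LLM. A stripped-down sketch of the same pattern (the Intent enum and classify function are illustrative):

import from byllm.llm {Model}

glob llm = Model(model_name="gpt-4o-mini");

enum Intent {
    GREETING,
    QUESTION
}

# No function body: the LLM infers the result from the signature
# and the sem string attached to the enum.
def classify(message: str) -> Intent by llm();

sem Intent = """Intent of a user message: GREETING for salutations, QUESTION for anything that asks for information.""";

with entry {
    print(classify(message="hello there"));
}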

Implementation Separation: The server.jac file contains the high-level structure and logic, while server.impl.jac provides the detailed function implementations. Jac seamlessly imports the implementation file, allowing for clean separation of concerns.
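
For example, an ability declared with a trailing semicolon in one file gets its body from a matching impl block in the companion .impl.jac file. A minimal sketch (the greet walker is hypothetical):

# hello.jac: declaration only
walker greet {
    has name: str;
    can say_hello with `root entry;
}

# hello.impl.jac: the body, picked up automatically by `jac run hello.jac`
impl greet.say_hello {
    print(f"Hello, {self.name}!");
}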

The application consists of:

  • Document Processing Engine (tools.jac): Processes and searches documents using vector embeddings
  • Tool Server (mcp_server.jac): Exposes document and web search as MCP tools
  • Tool Client (mcp_client.jac): Interfaces with the tool server
  • Main Application (server.jac + server.impl.jac): Routes queries and manages conversations
  • Web Interface (client.jac): User-friendly Streamlit interface

Step 3: Run Your Application#

Now let's see your creation in action! You'll need three terminal windows:

Terminal 1 - Start the tool server:

jac run mcp_server.jac

Terminal 2 - Start the main application:

jac serve server.jac

Terminal 3 - Launch the web interface:

jac streamlit client.jac

If everything starts successfully, open your browser and go to the Streamlit URL (typically http://localhost:8501).

Step 4: Test Your Chatbot#

  1. Register and log in using the web interface
  2. Upload some files: Try PDFs, text files, images, or videos
  3. Start chatting: Ask questions about your uploaded content or general questions

The system will automatically route your questions:

  • Document questions go to the RAG system
  • General questions use web search
  • Image questions use vision AI
  • Video questions analyze video content

What You've Accomplished#

Congratulations! You've built a sophisticated AI application that demonstrates several advanced concepts:

  • Multimodal AI capabilities that work with text, images, and videos
  • Intelligent routing using AI-based classification
  • Modular architecture with reusable tools via MCP
  • Clean separation of concerns using Object Spatial Programming
  • Real-time web search integration
  • Efficient document search with vector embeddings

Extending Your Chatbot#

Your chatbot is designed to be extensible. You could add:

  • New file types: Support for audio files, spreadsheets, or presentations
  • Additional tools: Weather APIs, database connections, or custom business logic
  • Enhanced AI models: Different LLMs for specialized tasks
  • Advanced search: Hybrid search combining keyword and semantic search
  • Custom chat nodes: Specialized handlers for domain-specific questions (see the sketch below)
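
For instance, a new chat node needs only three things: a new ChatType variant, a Chat subtype with a respond ability, and an edge from the Router. A hedged sketch (the FINANCE variant and FinanceChat node are illustrative, not part of the tutorial code):

enum ChatType {
    RAG,
    QA,
    IMAGE,
    VIDEO,
    FINANCE  # new variant for the Router to classify into
}

node FinanceChat(Chat) {
    has chat_type: ChatType = ChatType.FINANCE;

    def respond(message: str, chat_history: list[dict]) -> str by llm(
        messages=chat_history
    );

    can chat with infer entry;
}

Then connect it in infer.init_router alongside the other chat nodes (router_node ++> FinanceChat();) and extend the sem ChatType string so the classifier knows when to pick it.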

Troubleshooting#

If you run into issues:

  • Dependencies: Make sure all packages are installed and compatible with your Python version
  • Server startup: Start the MCP server before the main server (a quick connectivity check is sketched after this list)
  • File uploads: Check server logs if uploads fail, and verify supported file types
  • API keys: Verify your OpenAI and Serper API keys are set correctly
  • Ports: Ensure all three services are running on their respective ports
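
If tool calls fail, a quick way to verify the MCP server is reachable is to list its tools through the project's own client (run this while mcp_server.jac is up; it should print ['search_docs', 'search_web']):

import mcp_client;

with entry {
    print(mcp_client.list_mcp_tools());
}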

API Reference#

Your application exposes these main endpoints:

  • POST /user/register — Create a new user account
  • POST /user/login — Login and get an access token
  • POST /walker/upload_file — Upload files (requires authentication)
  • POST /walker/interact — Chat with the AI (requires authentication)

Visit http://localhost:8000/docs to see the full API documentation.
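
As a quick smoke test you can call these endpoints directly. A minimal sketch in Jac (it assumes the server from Step 3 is running and that the test@mail.com user created by client.jac already exists):

import requests;

with entry {
    base = "http://localhost:8000";

    # Log in with the credentials client.jac registers on first run
    login = requests.post(
        f"{base}/user/login",
        json={"email": "test@mail.com", "password": "password"}
    );
    token = login.json()["token"];

    # Ask the chatbot a question via the interact walker
    resp = requests.post(
        f"{base}/walker/interact",
        json={"message": "What documents do I have?", "session_id": "user_session_123"},
        headers={"Authorization": f"Bearer {token}"}
    );
    print(resp.json()["reports"][0]["response"]);
}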


You now have the foundation to build sophisticated AI applications using Jac's unique programming paradigms. The combination of Object Spatial Programming, Meaning Typed Programming, and modular tool architecture gives you a solid base for creating intelligent, scalable applications.