
Build an AI-Powered Multimodal MCP Chatbot#

This step-by-step guide will walk you through building a modern chatbot that can chat with your documents, images, and videos. By the end, you'll have a working multimodal AI assistant and understand how to use Jac's unique programming features to build intelligent applications.

What You'll Build#

You'll create a chatbot that can:

  • Upload and chat with PDFs, text files, images, and videos
  • Search your documents and provide context-aware answers
  • Answer general questions using web search
  • Understand and discuss images and videos using AI vision
  • Route different types of questions to specialized AI handlers

What You'll Learn#

  • Object Spatial Programming: Use Jac's node-walker architecture to organize your application
  • Meaning Typed Programming (MTP): Let AI classify and route user queries automatically from simple type and docstring definitions
  • Model Context Protocol (MCP): Build modular, reusable AI tools
  • Multimodal AI: Work with text, images, and videos in one application

Technologies We'll Use#

  • Jac Language: For the main application logic
  • Jac Cloud: Backend server infrastructure
  • Streamlit: User-friendly web interface
  • ChromaDB: Document search and storage
  • OpenAI GPT: AI chat and vision capabilities
  • Serper API: Real-time web search

Project Structure#

We'll create five main files:

  • client.jac: The web interface for chat and file uploads
  • server.jac: The main application using Object Spatial Programming
  • mcp_server.jac: Tool server for document search and web search
  • mcp_client.jac: Interface to communicate with tools
  • tools.jac: Document processing and search logic

Step 1: Set Up Your Environment#

First, install the required packages. We recommend Python 3.12 or newer:

pip install jaclang jac-cloud jac-streamlit mtllm langchain langchain-community langchain-openai langchain-chroma chromadb openai pypdf tiktoken requests "mcp[cli]" anyio

Next, get your API keys. You'll need an OpenAI API key for the AI features. For web search, get a free API key from Serper.

Set your environment variables:

export OPENAI_API_KEY=<your-openai-key>
export SERPER_API_KEY=<your-serper-key>
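
To confirm the keys are visible to your programs, run a quick check (a minimal sketch; save it as, say, check_env.jac and run it with jac run check_env.jac):

import os;

with entry {
    # Fail fast if either key is missing from the environment
    assert os.getenv('OPENAI_API_KEY'), "OPENAI_API_KEY is not set";
    assert os.getenv('SERPER_API_KEY'), "SERPER_API_KEY is not set";
    print("Environment looks good!");
}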

If you see no errors, you're ready to start building!

Step 2: Build the Document Processing Engine#

We'll start by creating the core engine that processes and searches your documents. Create tools.jac:

import os;
import requests;
import from langchain_community.document_loaders {PyPDFDirectoryLoader, PyPDFLoader}
import from langchain_text_splitters {RecursiveCharacterTextSplitter}
import from langchain.schema.document {Document}
import from langchain_openai {OpenAIEmbeddings}
import from langchain_chroma {Chroma}

glob SERPER_API_KEY: str = os.getenv('SERPER_API_KEY', '');

obj RagEngine {
    has file_path: str = "docs";
    has chroma_path: str = "chroma";

    def postinit {
        if not os.path.exists(self.file_path) {
            os.makedirs(self.file_path);
        }
        documents: list = self.load_documents();
        chunks: list = self.split_documents(documents);
        self.add_to_chroma(chunks);
    }

    def load_documents {
        document_loader = PyPDFDirectoryLoader(self.file_path);
        return document_loader.load();
    }

    def load_document(file_path: str) {
        loader = PyPDFLoader(file_path);
        return loader.load();
    }

    def add_file(file_path: str) {
        documents = self.load_document(file_path);
        chunks = self.split_documents(documents);
        self.add_to_chroma(chunks);
    }

    def split_documents(documents: list[Document]) {
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=800,
        chunk_overlap=80,
        length_function=len,
        is_separator_regex=False);
        return text_splitter.split_documents(documents);
    }

    def get_embedding_function {
        embeddings = OpenAIEmbeddings();
        return embeddings;
    }

    def add_chunk_id(chunks: list) {
        last_page_id = None;
        current_chunk_index = 0;

        for chunk in chunks {
            source = chunk.metadata.get('source');
            page = chunk.metadata.get('page');
            current_page_id = f'{source}:{page}';

            if current_page_id == last_page_id {
                current_chunk_index += 1;
            } else {
                current_chunk_index = 0;
            }

            chunk_id = f'{current_page_id}:{current_chunk_index}';
            last_page_id = current_page_id;

            chunk.metadata['id'] = chunk_id;
        }

        return chunks;
    }

    def add_to_chroma(chunks: list[Document]) {
        db = Chroma(persist_directory=self.chroma_path, embedding_function=self.get_embedding_function());
        chunks_with_ids = self.add_chunk_id(chunks);

        existing_items = db.get(include=[]);
        existing_ids = set(existing_items['ids']);

        new_chunks = [];
        for chunk in chunks_with_ids {
            if chunk.metadata['id'] not in existing_ids {
                new_chunks.append(chunk);
            }
        }

        if len(new_chunks) {
            print('adding new documents');
            new_chunk_ids = [chunk.metadata['id'] for chunk in new_chunks];
            db.add_documents(new_chunks, ids=new_chunk_ids);
        } else {
            print('no new documents to add');
        }
    }

    def get_from_chroma(query: str, chunk_nos: int=5) {
        db = Chroma(
            persist_directory=self.chroma_path,
            embedding_function=self.get_embedding_function()
        );
        results = db.similarity_search_with_score(query, k=chunk_nos);
        return results;
    }

    def search(query: str, chunk_nos: int=5) {
        results = self.get_from_chroma(query=query, chunk_nos=chunk_nos);
        summary = "";
        for (doc, score) in results {
            page = doc.metadata.get('page');
            source = doc.metadata.get('source');
            chunk_txt = doc.page_content[:400];
            summary += f"{source} page {page}: {chunk_txt}\n";
        }
        return summary;
    }
}


obj WebSearch {
    has api_key: str = SERPER_API_KEY;
    has base_url: str = "https://google.serper.dev/search";

    def search(query: str) {
        headers = {"X-API-KEY": self.api_key, "Content-Type": "application/json"};
        payload = {"q": query};
        resp = requests.post(self.base_url, headers=headers, json=payload);
        if resp.status_code == 200 {
            data = resp.json();
            summary = "";
            results = data.get("organic", []) if isinstance(data, dict) else [];
            for r in results[:3] {
                summary += f"{r.get('title', '')}: {r.get('link', '')}\n";
                if r.get('snippet') {
                    summary += f"{r['snippet']}\n";
                }
            }
            return summary;
        }
        return f"Serper request failed: {resp.status_code}";
    }
}

This engine is the foundation of your chatbot. It processes your uploaded documents, splits them into chunks, creates embeddings, and stores them for efficient search. Let's break down what it does:

  • Document Processing: Reads PDFs and text files, extracting their content
  • Text Chunking: Splits large documents into smaller, searchable pieces
  • Vector Embeddings: Converts text into numerical representations for semantic search
  • Storage: Uses ChromaDB to store and index your documents
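
With those pieces in place, you can exercise the engine directly from a small entry block (a sketch; it assumes a few PDFs already sit in the docs folder and that your API keys are set):

import from tools {RagEngine, WebSearch}

with entry {
    # Indexing happens automatically in postinit, using everything under "docs"
    engine = RagEngine();
    print(engine.search("What is the main topic of the documents?"));

    # Web search is independent of the local index
    print(WebSearch().search("latest Jac language release"));
}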

Step 3: Build the Tool Server#

Now create the MCP server that exposes document search and web search as tools. Create mcp_server.jac:

import sys;
import os;
import from tools {RagEngine, WebSearch}
import from mcp.server.fastmcp.tools {Tool}
import from mcp.server.fastmcp {FastMCP}
import typing;

glob rag_engine: RagEngine = RagEngine();
glob web_search: WebSearch = WebSearch();


with entry {
    mcp = FastMCP(name="RAG-MCP", port=8899);
}

def resolve_hints(fn: typing.Callable) -> typing.Callable {
    fn.__annotations__ = typing.get_type_hints(fn, include_extras=True);
    return fn;
}

@mcp.tool(name="search_docs")
@resolve_hints
async def tool_search_docs(query: str) -> str {
    return rag_engine.search(query);
}

@mcp.tool(name="search_web")
@resolve_hints
async def tool_search_web(query: str) -> str {
    web_search_results = web_search.search(query);
    if not web_search_results {
        return "Mention No results found for the web search";
    }
    return web_search_results;
}


with entry {
    mcp.run("streamable-http");
}

This server exposes two tools: one for searching your uploaded documents and another for web search. The FastMCP framework makes it easy to create these modular tools that your main application can use.

Step 4: Create the Tool Interface#

Next, create mcp_client.jac to communicate with your tool server:

import anyio;
import logging;
import mcp;
import os;
import from mcp.client { streamable_http }

with entry {
    logger = logging.getLogger(__name__);
    logger.setLevel(logging.INFO);
    logging.basicConfig(level=logging.INFO);
}
glob MCP_SERVER_URL = os.getenv('MCP_SERVER_URL', 'http://localhost:8899/mcp');


def list_mcp_tools() -> list[str] {
    async def _list() -> list {
        async with streamable_http.streamablehttp_client(MCP_SERVER_URL) as (read, write, _) {
            async with mcp.ClientSession(read, write) as sess {
                await sess.initialize();
                tools = await sess.list_tools();
                tool_names = [tool.name for tool in tools.tools];
                logger.info(f"available tools: {tool_names}");
                return tool_names;
            }
        }
    }
    return anyio.run(_list);
}


def call_mcp_tool(name: str, arguments: dict) -> str {
    async def _call() -> str {
        async with streamable_http.streamablehttp_client(MCP_SERVER_URL) as (read, write, _) {
            async with mcp.ClientSession(read, write) as sess {
                await sess.initialize();
                result = await sess.call_tool(name=name, arguments=arguments);
                if result.isError {
                    # Error details arrive as content blocks
                    err_txt = result.content[0].text if result.content else "unknown error";
                    return f"MCP error: {err_txt}";
                }
                if result.structuredContent and ('result' in result.structuredContent) {
                    return result.structuredContent['result'];
                }
                if result.content and (len(result.content) > 0) {
                    return result.content[0].text;
                }
                return "";
            }
        }
    }
    return anyio.run(_call);
}

This client handles the communication between your main application and the tools.
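
As a quick sanity check (a sketch, assuming the tool server from Step 3 is already running on port 8899), you can call the client directly:

import mcp_client;

with entry {
    # Should print ['search_docs', 'search_web']
    print(mcp_client.list_mcp_tools());
    print(mcp_client.call_mcp_tool(name="search_docs", arguments={"query": "hello"}));
}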

Step 5: Create the Main Application with Object Spatial Programming#

Now for the core application logic. Create server.jac:

import sys;
import from mtllm.llm {Model}
import from mtllm.types {Image, Video, Text}
import from tools {RagEngine}
import os;
import base64;
import requests;
import anyio;
import mcp_client;

glob rag_engine:RagEngine = RagEngine();
glob llm = Model(model_name='gpt-4o-mini', verbose=True);
glob MCP_SERVER_URL: str = os.getenv('MCP_SERVER_URL', 'http://localhost:8899/mcp');


"""ChatType enum defines the types of chat interactions. ChatType must be one of:
- RAG: For interactions that require document retrieval.
- QA: For interactions that does not require document retrieval, or image-video-related questions.
- IMAGE: For interactions involving image analysis or anything related to images, and follow up questions.
- VIDEO: For interactions involving video analysis or video-related questions.
"""
enum ChatType {
    RAG = "RAG",
    QA = "QA",
    IMAGE = "IMAGE",
    VIDEO = "VIDEO"
}

node Router {
    "Classify the message as RAG, QA, or VIDEO. If classification fails, default to QA."
    def classify(message: str) -> ChatType by llm(method="Reason", temperature=0.8);
}

node Chat {
    has chat_type: ChatType;
    has chat_history: list[dict] = [];
}

"""Get available MCP tool names."""
def list_mcp_tools() -> list[str] {
    return mcp_client.list_mcp_tools();
}

"""Use MCP tool to perform actions.
name must be one of available tools from list_mcp_tools(), do not make up any tool names.

Example input for `use_mcp_tool`:
{"name": "tool_name", "arguments": {"query": "your query"}}
"""
def use_mcp_tool(name: str, arguments: dict[str, str]) -> str {
    return mcp_client.call_mcp_tool(name=name, arguments=arguments);
}

walker infer {
    has message: str;
    has chat_history: list[dict];
    has file_path: str = "";
    has response: str = "";

    can init_router with `root entry {
        visit [-->](`?Router) else {
            router_node = here ++> Router();
            router_node ++> RagChat();
            router_node ++> QAChat();
            router_node ++> ImageChat();
            router_node ++> VideoChat();
            visit router_node;
        }
    }
    can route with Router entry {
        classification = here.classify(message = self.message);
        print("Routing message:", self.message, "to chat type:", classification);
        visit [-->](`?Chat)(?chat_type==classification);
    }
}


node ImageChat(Chat) {
    has chat_type: ChatType = ChatType.IMAGE;

    """Answer the user's message(text) by referring to the provided image. Always refer to the given image, answer relevant to the given image."""
    def respond_with_image(img: Image, text: Text, chat_history: list[dict]) -> str by llm(tools=([use_mcp_tool, list_mcp_tools]));

    can chat with infer entry {
        img_path = visitor.file_path;
        response = self.respond_with_image(
            img=Image(img_path),
            text=visitor.message,
            chat_history=visitor.chat_history
        );

        visitor.chat_history.append({"role": "assistant", "content": response});
        self.chat_history = visitor.chat_history;
        visitor.response = response;
        report {"response": response, "chat_history": visitor.chat_history};
    }
}



node VideoChat(Chat) {
    has chat_type: ChatType = ChatType.VIDEO;

    """Answer the user's message using the provided video and text. Always refer to the given video, answer relevant to the given video."""
    def respond_with_video(video: Video, text: Text, chat_history: list[dict]) -> str by llm(
            method="Chain-of-Thoughts"
        );

    can chat with infer entry {
        video_path = visitor.file_path;
        response = self.respond_with_video(
            video=Video(video_path),
            text=visitor.message,
            chat_history=visitor.chat_history
        );

        visitor.chat_history.append({"role": "assistant", "content": response});
        self.chat_history = visitor.chat_history;
        visitor.response = response;
        report {"response": response, "chat_history": visitor.chat_history};
    }
}

node RagChat(Chat) {
    has chat_type: ChatType = ChatType.RAG;

    """Generate a helpful response to the user's message. Use available mcp tool when needed.Use list_mcp_tools to find out what are the available tools. Always pass arguments as a flat dictionary (e.g., {\"query\": \"Your search query\"}), never as a list or schema_dict_wrapper. """
    def respond(message:str, chat_history:list[dict]) -> str by llm(
            method="ReAct",
            tools=([list_mcp_tools, use_mcp_tool]),
            messages=chat_history,
            max_react_iterations=6
        );

    can chat with infer entry {
        response = self.respond(
            message=visitor.message,
            chat_history=visitor.chat_history,
        );
        visitor.chat_history.append({"role": "assistant", "content": response});
        self.chat_history = visitor.chat_history;
        visitor.response = response;
        report {"response": response, "chat_history": visitor.chat_history};
    }
}

node QAChat(Chat) {
    has chat_type: ChatType = ChatType.QA;

    """Generate a helpful response to the user's message. Use available mcp tool when needed. Always pass arguments as a flat dictionary (e.g., {\"query\": \"Your search query\"}), never as a list or schema_dict_wrapper. """
    def respond(message:str, chat_history:list[dict]) -> str
        by llm(
            method="ReAct",
            tools=([use_mcp_tool, list_mcp_tools]),
            messages=chat_history,
            max_react_iterations=6
        );

    can chat with infer entry {
        response = self.respond(
            message=visitor.message,
            chat_history=visitor.chat_history,
        );
        visitor.chat_history.append({"role": "assistant", "content": response});
        self.chat_history = visitor.chat_history;
        visitor.response = response;
        report {"response": response, "chat_history": visitor.chat_history};
    }
}

walker interact {
    has message: str;
    has session_id: str;
    has chat_history: list[dict] = [];
    has file_path: str = "";

    can init_session with `root entry {
        visit [-->](`?Session)(?id == self.session_id) else {
            session_node = here ++> Session(id=self.session_id, chat_history=[], file_path=self.file_path, status=1);
            print("Session Node Created");
            visit session_node;
        }
    }
}



node Session {
    has id: str;
    has chat_history: list[dict];
    has status: int = 1;
    has file_path: str = "";

    can chat with interact entry {
        visitor.chat_history = self.chat_history;
        visitor.chat_history.append({"role": "user", "content": visitor.message});
        response = infer(message=visitor.message, chat_history=self.chat_history, file_path=visitor.file_path) spawn root;
        # The chat nodes already append the assistant reply to the shared history list
        self.chat_history = visitor.chat_history;
        report {"response": response.response};
    }
}


walker upload_file {
    has file_name: str;
    has file_data: str;
    has session_id: str;

    can save_doc with `root entry {
        upload_dir = os.path.join("uploads", self.session_id);
        if not os.path.exists(upload_dir) {
            os.makedirs(upload_dir);
        }

        file_path = os.path.join(upload_dir, self.file_name);
        data = base64.b64decode(self.file_data.encode('utf-8'));

        with open(file_path, 'wb') as f {
            f.write(data);
        }

        # Only add text-based documents to rag_engine
        lower_name = self.file_name.lower();
        if lower_name.endswith(".pdf") or lower_name.endswith(".txt") {
            rag_engine.add_file(file_path);
        }

        report {
            "status": "uploaded",
            "file_path": file_path,
            "added_to_rag": lower_name.endswith(".pdf") or lower_name.endswith(".txt")
        };
    }
}

Let's break down what we just built:

Router Node: This is the brain of your application. It uses Meaning Typed Programming (MTP) to automatically classify user questions and route them to the right specialist.

Specialized Chat Nodes: Each type of question gets its own expert:

  • RagChat: Handles document-based questions
  • QAChat: Manages general questions and web search
  • ImageChat: Processes image-related conversations
  • VideoChat: Handles video discussions

Session Management: The Session node keeps track of each user's conversation history and uploaded files.

Walkers: These handle the flow of your application:

  • infer: Routes questions to the right chat node
  • interact: Manages conversations and maintains session state (see the smoke-test sketch after this list)
  • upload_file: Processes file uploads
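
You can exercise these walkers without the web interface. A minimal smoke test (a sketch, assuming it is appended to server.jac and run locally with jac run server.jac) might spawn interact directly:

with entry {
    result = interact(message="Hello!", session_id="smoke_test") spawn root;
    print(result.chat_history);
}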

Step 6: Build the Web Interface#

Finally, create the user-friendly interface with client.jac:

import streamlit as st;
import requests;
import base64;


def bootstrap_frontend(token: str) {
    st.set_page_config(layout="wide");
    st.title("Welcome to your Jac MCP Chatbot!");

    # Initialize session state
    if "messages" not in st.session_state {
        st.session_state.messages = [];
    }
    if "session_id" not in st.session_state {
        st.session_state.session_id = "user_session_123";
    }
    uploaded_file = st.file_uploader('Upload File (PDF, TXT, Image, or Video)');
    if uploaded_file {
        file_b64 = base64.b64encode(uploaded_file.read()).decode('utf-8');
        file_extension = uploaded_file.name.lower().split('.')[-1];
        file_type = uploaded_file.type or '';
        supported_types = ['pdf', 'txt', 'png', 'jpg', 'jpeg', 'webp', 'mp4', 'avi', 'mov'];
        if file_extension not in supported_types and not (file_type.startswith('image') or file_type.startswith('video')) {
            st.error(f"Unsupported file type: {file_type or 'unknown'}. Please upload PDF, TXT, Image, or Video files.");
            return;
        }
        # Use the upload_file walker endpoint for all uploads, saving under uploads/{session_id}
        payload = {
            "file_name": uploaded_file.name,
            "file_data": file_b64,
            "session_id": st.session_state.session_id
        };
        response = requests.post(
            "http://localhost:8000/walker/upload_file",
            json=payload,
            headers={"Authorization": f"Bearer {token}"}
        );
        if response.status_code == 200 {
            st.success(f"File '{uploaded_file.name}' uploaded and saved to uploads/{st.session_state.session_id}.");
            # Track last uploaded file path in session state
            st.session_state.last_uploaded_file_path = f"uploads/{st.session_state.session_id}/{uploaded_file.name}";
        } else {
            st.error(f"Failed to process {uploaded_file.name}: {response.text}");
        }
    }

    # Display chat messages from history on app rerun
    for message in st.session_state.messages {
        with st.chat_message(message["role"]) {
            st.markdown(message["content"]);
        }
    }

    if prompt := st.chat_input("What is up?") {
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": prompt});

        # Display user message in chat message container
        with st.chat_message("user") {
            st.markdown(prompt);
        }
        # Display assistant response in chat message container
        with st.chat_message("assistant") {
            with st.spinner("Thinking...") {
                # Call walker API
                payload = {
                    "message": prompt,
                    "session_id": st.session_state.session_id
                };
                # If a file was uploaded, include its path
                if "last_uploaded_file_path" in st.session_state {
                    payload["file_path"] = st.session_state.last_uploaded_file_path;
                }
                response = requests.post(
                    "http://localhost:8000/walker/interact",
                    json=payload,
                    headers={"Authorization": f"Bearer {token}"}
                );

                if response.status_code == 200 {
                    response = response.json();
                    st.write(response["reports"][0]["response"]);

                    # Add assistant response to chat history
                    st.session_state.messages.append({"role": "assistant", "content": response["reports"][0]["response"]});
                } else {
                    st.error(f"Request failed with status {response.status_code}: {response.text}");
                }
            }
        }
    }
}

with entry {

    INSTANCE_URL = "http://localhost:8000";
    TEST_USER_EMAIL = "test@mail.com";
    TEST_USER_PASSWORD = "password";

    response = requests.post(
        f"{INSTANCE_URL}/user/login",
        json={"email": TEST_USER_EMAIL, "password": TEST_USER_PASSWORD}
    );

    if response.status_code != 200 {
        # Try registering the user if login fails
        response = requests.post(
            f"{INSTANCE_URL}/user/register",
            json={
                "email": TEST_USER_EMAIL,
                "password": TEST_USER_PASSWORD
            }
        );
        assert response.status_code == 201;

        response = requests.post(
            f"{INSTANCE_URL}/user/login",
            json={"email": TEST_USER_EMAIL, "password": TEST_USER_PASSWORD}
        );
        assert response.status_code == 200;
    }

    token = response.json()["token"];

    print("Token:", token);

    bootstrap_frontend(token);
}

This creates a clean, intuitive interface where users can register, log in, upload files, and chat with the AI.

Step 7: Run Your Application#

Now let's see your creation in action! You'll need three terminal windows:

Terminal 1 - Start the tool server:

jac run mcp_server.jac

Terminal 2 - Start the main application:

jac serve server.jac

Terminal 3 - Launch the web interface:

jac streamlit client.jac

If everything starts successfully, open your browser and go to the Streamlit URL (typically http://localhost:8501).

Step 8: Test Your Chatbot#

  1. Register and log in using the web interface
  2. Upload some files: Try PDFs, text files, images, or videos
  3. Start chatting: Ask questions about your uploaded content or general questions

The system will automatically route your questions:

  • Document questions go to the RAG system
  • General questions use web search
  • Image questions use vision AI
  • Video questions analyze video content

Understanding the Architecture#

Your application uses Jac's Object Spatial Programming to create a clean, modular design:

Nodes represent different parts of your system (Router, Chat types, Sessions). Each node has specific responsibilities and capabilities.

Walkers move through your node network, carrying information and executing logic. They represent the actions your system can perform.
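
Here is the pattern in miniature (a standalone sketch, separate from the chatbot code, runnable with jac run):

node Greeting {
    has text: str = "Hello from a node!";
}

walker greeter {
    can start with `root entry {
        # Create a Greeting node attached to root, then walk to it
        here ++> Greeting();
        visit [-->](`?Greeting);
    }
    can speak with Greeting entry {
        print(here.text);
    }
}

with entry {
    greeter() spawn root;
}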

Meaning Typed Programming (MTP) lets AI automatically classify and route requests, making your application intelligent without complex rule-based logic.
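
In miniature, MTP looks like this (a sketch, assuming OPENAI_API_KEY is set; the function body is supplied by the model from the signature and docstring alone):

import from mtllm.llm {Model}

glob llm = Model(model_name='gpt-4o-mini');

enum Sentiment {
    POSITIVE = "POSITIVE",
    NEGATIVE = "NEGATIVE"
}

"""Classify the sentiment of the given text."""
def classify(text: str) -> Sentiment by llm();

with entry {
    print(classify("I love building with Jac!"));
}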

What You've Accomplished#

Congratulations! You've built a sophisticated AI application that demonstrates several advanced concepts:

  • Multimodal AI capabilities that work with text, images, and videos
  • Intelligent routing using AI-based classification
  • Modular architecture with reusable tools via MCP
  • Clean separation of concerns using Object Spatial Programming
  • Real-time web search integration
  • Efficient document search with vector embeddings

Extending Your Chatbot#

Your chatbot is designed to be extensible. You could add:

  • New file types: Support for audio files, spreadsheets, or presentations
  • Additional tools: Weather APIs, database connections, or custom business logic (see the sketch after this list)
  • Enhanced AI models: Different LLMs for specialized tasks
  • Advanced search: Hybrid search combining keyword and semantic search
  • Custom chat nodes: Specialized handlers for domain-specific questions
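
Registering an extra tool in mcp_server.jac follows the same decorator pattern as the existing ones. Here is a hypothetical clock tool (a sketch; the get_server_time name is made up):

import from datetime {datetime}

@mcp.tool(name="get_server_time")
@resolve_hints
async def tool_get_server_time() -> str {
    # A trivial tool with no external dependencies: report the server's current time
    return datetime.now().isoformat();
}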

Troubleshooting#

If you run into issues:

  • Dependencies: Make sure all packages are installed and compatible with your Python version
  • Server startup: Start the MCP server before the main server
  • File uploads: Check server logs if uploads fail, and verify supported file types
  • API keys: Verify your OpenAI and Serper API keys are set correctly
  • Ports: Ensure all three services are running on their respective ports

API Reference#

Your application exposes these main endpoints:

  • POST /user/register — Create a new user account
  • POST /user/login — Login and get an access token
  • POST /walker/upload_file — Upload files (requires authentication)
  • POST /walker/interact — Chat with the AI (requires authentication)

Visit http://localhost:8000/docs to see the full API documentation.
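
For example, chatting with the AI from any HTTP client looks like this (a sketch; the token comes from the /user/login response, as in client.jac):

import requests;

with entry {
    token = "<your-access-token>";
    resp = requests.post(
        "http://localhost:8000/walker/interact",
        json={"message": "What documents do I have?", "session_id": "demo"},
        headers={"Authorization": f"Bearer {token}"}
    );
    print(resp.json()["reports"][0]["response"]);
}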


You now have the foundation to build sophisticated AI applications using Jac's unique programming paradigms. The combination of Object Spatial Programming, Meaning Typed Programming, and modular tool architecture gives you a solid base for creating intelligent, scalable applications.