Skip to content

MCP Utils API

hatchling.mcp_utils

All the MCP utilities for Hatchling. Including Client-Server communication, tool execution, and event handling.

Classes

MCPClient

Client for MCP servers that manages connections and tool execution.

Source code in hatchling/mcp_utils/client.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
class MCPClient:
    """Client for MCP servers that manages connections and tool execution."""
    def __init__(self, settings: AppSettings = None, python_executable_resolver: Optional[Callable[[], str]] = None):
        """Initialize the MCP client.

        Args:
            settings (AppSettings, optional): The application settings instance.
                                            If None, uses the singleton instance.
            python_executable_resolver (Callable[[], str], optional): Function to resolve the Python executable
                for the current environment. If None, defaults to "python".
        """
        self.client_id = str(uuid.uuid4())
        self.session: Optional[ClientSession] = None
        self.exit_stack = None  # Created in connection manager task
        self.connected = False
        self.tools = {}
        self.server_path = None
        self.read = None
        self.write = None

        # Store settings
        self.settings = settings or AppSettings.get_instance()

        # Python executable resolution
        self._python_executable_resolver = python_executable_resolver

        # Connection manager task and queue
        self._operation_queue = asyncio.Queue()
        self._manager_task = None
        self._manager_lock = asyncio.Lock()
        self._connection_task_id = None  # Track the ID of the task that manages connections

        # Add connection monitoring
        self._heartbeat_task = None
        self._reconnection_attempts = 0
        self.MAX_RECONNECTION_ATTEMPTS = 3
        self.RECONNECTION_DELAY = 2  # seconds

        # Get a debug log session from the LoggingManager
        self.logger = logging_manager.get_session(self.__class__.__name__,
                                  formatter=logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    async def connect(self, server_path: str) -> bool:
        """Connect to an MCP server via stdio.

        Args:
            server_path (str): Path to the server script (.py file).

        Returns:
            bool: True if connection was successful, False otherwise.
        """
        # Start the connection manager if not running
        try:
            await self._start_connection_manager()

            # Create a future to get the result
            future = asyncio.Future()
            await self._operation_queue.put(("connect", [server_path], future))

            # Wait for the operation to complete
            return await asyncio.wait_for(future, timeout=30000)
        except asyncio.TimeoutError:
            self.logger.error(f"Connection attempt to {server_path} timed out after 30000 seconds")
            return False
        except Exception as e:
            self.logger.error(f"Error in connect operation: {e}")
            return False

    def _start_heartbeat(self):
        """Start a background task to periodically check connection health."""
        if self._heartbeat_task is None:
            self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
    async def _heartbeat_loop(self):
        """Periodically check if the connection is still alive."""
        try:
            while self.connected:
                await asyncio.sleep(30)  # Check every 30 seconds
                if not self.connected:
                    break

                try:
                    # Try a lightweight operation to check connection
                    if self.session:
                        await self.session.send_ping()
                        self.logger.debug("Connection heartbeat: OK")
                except Exception as e:
                    self.logger.warning(f"Connection heartbeat failed: {e}")
                    # Mark as disconnected without trying to reconnect
                    # This avoids task/context issues
                    self.logger.warning("Connection marked as failed - client needs to be reconnected")
                    self.connected = False
                    break

        except asyncio.CancelledError:
            self.logger.debug("Heartbeat task cancelled")
        except Exception as e:
            self.logger.error(f"Error in heartbeat task: {e}")
        finally:
            self.logger.debug("Heartbeat task stopped")
            self._heartbeat_task = None # _cleanup_connection has been replaced by _internal_cleanup in the connection manager

    async def disconnect(self):
        """Disconnect from the MCP server and clean up resources."""
        if not self.connected:
            return

        if not self._manager_task or self._manager_task.done():
            # If there's no manager task running, mark as disconnected directly
            self.connected = False
            self.session = None
            self.exit_stack = None
            self.tools = {}
            return

        try:
            # Create a future to get the result
            future = asyncio.Future()
            await self._operation_queue.put(("disconnect", [], future))

            # Wait for the operation to complete with timeout
            await asyncio.wait_for(future, timeout=10)
        except asyncio.TimeoutError:
            self.logger.error("Disconnect operation timed out after 10 seconds")
            # Even if disconnect fails, mark as disconnected
            self.connected = False
        except Exception as e:
            self.logger.error(f"Error in disconnect operation: {e}")
            # Even if disconnect fails, mark as disconnected
            self.connected = False

    async def get_citations(self) -> Dict[str, str]:
        """Get citations from the MCP server.

        Returns:
            Dict[str, str]: Dictionary with origin and MCP citations.
        """
        if not self.connected:
            raise ConnectionError("Not connected to MCP server")

        # Create a future to get the result
        future = asyncio.Future()
        await self._operation_queue.put(("get_citations", [], future))

        # Wait for the operation to complete
        try:
            return await future
        except Exception as e:
            self.logger.error(f"Error in get_citations operation: {e}")
            raise

    async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Execute an MCP tool by name with given arguments.

        Args:
            tool_name (str): Name of the tool to execute.
            arguments (Dict[str, Any): Arguments to pass to the tool.

        Returns:
            Any: Result of the tool execution.

        Raises:
            ConnectionError: If not connected to MCP server.
            ValueError: If the tool is not found.
            TimeoutError: If the tool execution times out.
            Exception: For any other errors during execution.
        """
        if not self.connected or not self.session:
            raise ConnectionError("Not connected to MCP server")

        if tool_name not in self.tools:
            raise ValueError(f"Tool '{tool_name}' not found")

        try:            
            # Execute the tool through the connection manager task
            future = asyncio.Future()
            await self._operation_queue.put(("execute_tool", [tool_name, arguments], future))

            # Wait for the result
            return await future

        except Exception as e:
            self.logger.error(f"Error executing tool {tool_name}: {str(e)}")
            raise

    async def _start_connection_manager(self):
        """Start the dedicated connection manager task if not already running."""
        async with self._manager_lock:
            # If there's a task but it's done, clean it up first
            if self._manager_task and self._manager_task.done():
                # Check if the task had an exception
                try:
                    exc = self._manager_task.exception()
                    if exc:
                        self.logger.error(f"Previous connection manager failed with: {exc}")
                except (asyncio.InvalidStateError, asyncio.CancelledError):
                    # Task was cancelled or is in an invalid state
                    pass

                self._manager_task = None

            # Create a new task if needed
            if self._manager_task is None:
                self.logger.debug("Starting connection manager task")
                self._manager_task = asyncio.create_task(self._connection_manager_loop())
                # Name the task for better debugging
                self._manager_task.set_name(f"mcp_connection_manager_{self.client_id[:8]}")

    async def _connection_manager_loop(self):
        """A dedicated task that handles all connection and disconnection operations.

        This ensures all operations with exit stacks and cancel scopes happen in the same task context.
        """
        # Store the task ID to track task context
        self._connection_task_id = id(asyncio.current_task())
        self.logger.debug(f"Connection manager task started with ID: {self._connection_task_id}")

        try:
            while True:
                # Get the next operation from the queue
                operation, args, future = await self._operation_queue.get()

                try:
                    self.logger.debug(f"Processing operation: {operation}")
                    if operation == "connect":
                        server_path = args[0]
                        result = await self._internal_connect(server_path)
                        future.set_result(result)
                    elif operation == "disconnect":
                        await self._internal_disconnect()
                        future.set_result(None)
                    elif operation == "execute_tool":
                        tool_name, arguments = args
                        result = await self._internal_execute_tool(tool_name, arguments)
                        future.set_result(result)
                    elif operation == "get_citations":
                        result = await self._internal_get_citations()
                        future.set_result(result)
                    else:
                        self.logger.warning(f"Unknown operation: {operation}")
                        future.set_exception(ValueError(f"Unknown operation: {operation}"))
                except Exception as e:
                    self.logger.error(f"Error processing operation {operation}: {e}")
                    future.set_exception(e)
                finally:
                    self._operation_queue.task_done()
        except asyncio.CancelledError:
            self.logger.debug(f"Connection manager task {self._connection_task_id} cancelled")
        except Exception as e:
            self.logger.error(f"Error in connection manager loop: {e}")
        finally:
            self.logger.debug(f"Connection manager task {self._connection_task_id} stopped")
            self._connection_task_id = None

    async def _internal_connect(self, server_path: str) -> bool:
        """Internal connect method that runs in the connection manager task.

        Args:
            server_path (str): Path to the server script.

        Returns:
            bool: True if connection was successful.
        """
        # Clean up any existing connection first
        await self._internal_cleanup()

        self.server_path = server_path
        # Create a fresh exit stack for this connection
        self.exit_stack = AsyncExitStack()

        # Create server parameters for stdio connection
        python_executable = self._get_python_executable()
        #env_vars = self._get_environment_vars()

        self.logger.debug(f"Using Python executable: {python_executable}")

        server_params = StdioServerParameters(
            command=python_executable,
            args=[server_path],
            #env=env_vars,
        )

        self.logger.debug(f"Connecting to MCP server: {server_path} using Python: {python_executable}")
        try:
            # Follow the exact sequence from the working example
            stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
            self.read, self.write = stdio_transport
            session = await self.exit_stack.enter_async_context(ClientSession(self.read, self.write))
            await session.initialize()
            self.session = session
            self.connected = True

            # List available tools
            response = await self.session.list_tools()
            tools = response.tools

            # Store tools in a dictionary for easy access
            for tool in tools:
                self.tools[tool.name] = tool

            self.logger.info(f"Connected to MCP server: {server_path}")
            self.logger.info(f"Discovered {len(self.tools)} tools: {', '.join(self.tools.keys())}")

            # List available citations
            citations = await self._internal_get_citations()
            self.logger.info("Tool Origin Citation: " + citations["origin"])
            self.logger.info("MCP Implementation Citation: " + citations["mcp"])

            # Start heartbeat task to monitor connection
            self._start_heartbeat()

            return True

        except asyncio.CancelledError:
            self.logger.warning("Connection attempt cancelled")
            await self._internal_cleanup()
            raise
        except Exception as e:
            self.logger.error(f"Failed to connect to MCP server at {server_path}: {str(e)}")
            await self._internal_cleanup()
            return False

    async def _internal_disconnect(self):
        """Internal disconnect method that runs in the connection manager task."""
        if not self.connected:
            return

        self.logger.debug(f"Disconnecting from MCP server: {self.server_path} in task {self._connection_task_id}")

        # Cancel heartbeat task first
        if self._heartbeat_task:
            self._heartbeat_task.cancel()
            try:
                # Wait for task to fully cancel (but don't wait indefinitely)
                await asyncio.wait_for(asyncio.shield(self._heartbeat_task), timeout=2)
            except (asyncio.TimeoutError, asyncio.CancelledError):
                # It's okay if the task times out or is cancelled
                pass
            finally:
                self._heartbeat_task = None

        # Clean up resources
        try:
            await self._internal_cleanup()
        except Exception as e:
            self.logger.error(f"Error during disconnection cleanup: {e}")
            # Even if cleanup fails, continue with state reset

        # Reset additional state
        self.tools = {}
        self._reconnection_attempts = 0

        self.logger.info(f"Disconnected from MCP server: {self.server_path}")

    async def _internal_cleanup(self):
        """Internal cleanup method that runs in the connection manager task."""
        self.logger.debug(f"Starting connection cleanup in task {self._connection_task_id}")

        try:
            if self.exit_stack:
                self.logger.debug("Closing exit stack from the same task that created it")
                await self.exit_stack.aclose()
        except asyncio.CancelledError:
            self.logger.warning("Cleanup interrupted by cancellation")
            raise  # Re-raise to allow proper handling
        except Exception as e:
            self.logger.error(f"Error during connection cleanup: {e}")
        finally:
            # Always reset these regardless of success/failure
            self.exit_stack = None
            self.session = None
            self.read = None
            self.write = None
            self.connected = False
            self.logger.debug("Connection state reset completed")

    async def _internal_execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Internal execute tool method that runs in the connection manager task."""
        if not self.connected or not self.session:
            raise ConnectionError("Not connected to MCP server")

        if tool_name not in self.tools:
            raise ValueError(f"Tool '{tool_name}' not found")

        try:
            # Execute the tool with timeout
            result = await asyncio.wait_for(
                self.session.call_tool(name=tool_name, arguments=arguments),
                timeout=self.settings.tool_calling.max_tool_working_time
            )

            # Extract the result value from the response object if needed
            if hasattr(result, 'result'):
                return result.result
            else:
                return result

        except asyncio.TimeoutError:
            self.logger.error(f"Tool execution timed out: {tool_name}")
            raise TimeoutError(f"Execution of tool {tool_name} timed out after {self.settings.tool_calling.max_tool_working_time} seconds")

        except Exception as e:
            self.logger.error(f"Error executing tool {tool_name}: {str(e)}")
            raise

    async def _internal_get_citations(self) -> Dict[str, str]:
        """Internal get citations method that runs in the connection manager task."""
        if not self.connected or not self.session:
            raise ConnectionError("Not connected to MCP server")

        citations = {
            "server_name": "None",
            "origin": "Citation not available",
            "mcp": "Citation not available"
        }

        try:
            # Extract server name from the path server uri
            try:
                server_name_uri = f"name://{self.server_path[1:]}"
                server_name_response = await self.session.read_resource(uri=server_name_uri)
                if server_name_response and server_name_response.contents:
                    citations["server_name"] = server_name_response.contents[0].text
                    self.logger.debug(f"Retrieved server name from {server_name_uri}: {citations['server_name']}")
            except Exception as e:
                self.logger.error(f"Failed to get server name: {e}")

            # Try to read origin citation
            try:
                origin_uri = f"citation://origin/{citations['server_name']}"
                origin_response = await self.session.read_resource(uri=origin_uri)
                if origin_response and origin_response.contents:
                    citations["origin"] = origin_response.contents[0].text
                    self.logger.debug(f"Retrieved origin citation from {origin_uri}")
            except Exception as e:
                self.logger.error(f"Failed to get origin citation: {e}")

            # Try to read MCP citation
            try:
                mcp_uri = f"citation://mcp/{citations['server_name']}"
                mcp_response = await self.session.read_resource(uri=mcp_uri)
                if mcp_response and mcp_response.contents:
                    citations["mcp"] = mcp_response.contents[0].text
                    self.logger.debug(f"Retrieved MCP citation from {mcp_uri}")
            except Exception as e:
                self.logger.error(f"Failed to get MCP citation: {e}")

        except Exception as e:
            self.logger.error(f"Error retrieving citations: {e}")

        return citations

    async def _stop_connection_manager(self):
        """Stop the connection manager task gracefully."""
        if self._manager_task and not self._manager_task.done():
            self.logger.debug("Stopping connection manager task")

            # Cancel the task
            self._manager_task.cancel()

            try:
                # Wait for the task to finish with timeout
                await asyncio.wait_for(self._manager_task, timeout=5)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self.logger.warning("Connection manager task cancellation timed out or was cancelled")
            except Exception as e:
                self.logger.error(f"Error stopping connection manager task: {e}")

            self._manager_task = None

    def _get_python_executable(self) -> str:
        """Get the appropriate Python executable for the current environment.

        Returns:
            str: Path to Python executable, falls back to "python" if no resolver provided.
        """
        if self._python_executable_resolver:
            try:
                python_executable = self._python_executable_resolver()
                self.logger.debug(f"Resolved Python executable: {python_executable}")
                return python_executable
            except Exception as e:
                self.logger.warning(f"Failed to resolve Python executable: {e}, falling back to 'python'")

        # Fallback to default Python command
        self.logger.debug("Using default Python executable: python")
        return "python"

    def _get_environment_vars(self) -> Optional[Dict[str, str]]:
        """Get environment variables for the MCP server process.

        Returns:
            Dict[str, str]: Environment variables to use, or None to use default environment.
        """
        # For now, use the current environment
        # Future enhancement: could get environment-specific variables from resolver
        return os.environ.copy()
Functions
__init__(settings=None, python_executable_resolver=None)

Initialize the MCP client.

Parameters:

Name Type Description Default
settings AppSettings

The application settings instance. If None, uses the singleton instance.

None
python_executable_resolver Callable[[], str]

Function to resolve the Python executable for the current environment. If None, defaults to "python".

None
Source code in hatchling/mcp_utils/client.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(self, settings: AppSettings = None, python_executable_resolver: Optional[Callable[[], str]] = None):
    """Initialize the MCP client.

    Args:
        settings (AppSettings, optional): The application settings instance.
                                        If None, uses the singleton instance.
        python_executable_resolver (Callable[[], str], optional): Function to resolve the Python executable
            for the current environment. If None, defaults to "python".
    """
    self.client_id = str(uuid.uuid4())
    self.session: Optional[ClientSession] = None
    self.exit_stack = None  # Created in connection manager task
    self.connected = False
    self.tools = {}
    self.server_path = None
    self.read = None
    self.write = None

    # Store settings
    self.settings = settings or AppSettings.get_instance()

    # Python executable resolution
    self._python_executable_resolver = python_executable_resolver

    # Connection manager task and queue
    self._operation_queue = asyncio.Queue()
    self._manager_task = None
    self._manager_lock = asyncio.Lock()
    self._connection_task_id = None  # Track the ID of the task that manages connections

    # Add connection monitoring
    self._heartbeat_task = None
    self._reconnection_attempts = 0
    self.MAX_RECONNECTION_ATTEMPTS = 3
    self.RECONNECTION_DELAY = 2  # seconds

    # Get a debug log session from the LoggingManager
    self.logger = logging_manager.get_session(self.__class__.__name__,
                              formatter=logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
connect(server_path) async

Connect to an MCP server via stdio.

Parameters:

Name Type Description Default
server_path str

Path to the server script (.py file).

required

Returns:

Name Type Description
bool bool

True if connection was successful, False otherwise.

Source code in hatchling/mcp_utils/client.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def connect(self, server_path: str) -> bool:
    """Connect to an MCP server via stdio.

    Args:
        server_path (str): Path to the server script (.py file).

    Returns:
        bool: True if connection was successful, False otherwise.
    """
    # Start the connection manager if not running
    try:
        await self._start_connection_manager()

        # Create a future to get the result
        future = asyncio.Future()
        await self._operation_queue.put(("connect", [server_path], future))

        # Wait for the operation to complete
        return await asyncio.wait_for(future, timeout=30000)
    except asyncio.TimeoutError:
        self.logger.error(f"Connection attempt to {server_path} timed out after 30000 seconds")
        return False
    except Exception as e:
        self.logger.error(f"Error in connect operation: {e}")
        return False
disconnect() async

Disconnect from the MCP server and clean up resources.

Source code in hatchling/mcp_utils/client.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def disconnect(self):
    """Disconnect from the MCP server and clean up resources."""
    if not self.connected:
        return

    if not self._manager_task or self._manager_task.done():
        # If there's no manager task running, mark as disconnected directly
        self.connected = False
        self.session = None
        self.exit_stack = None
        self.tools = {}
        return

    try:
        # Create a future to get the result
        future = asyncio.Future()
        await self._operation_queue.put(("disconnect", [], future))

        # Wait for the operation to complete with timeout
        await asyncio.wait_for(future, timeout=10)
    except asyncio.TimeoutError:
        self.logger.error("Disconnect operation timed out after 10 seconds")
        # Even if disconnect fails, mark as disconnected
        self.connected = False
    except Exception as e:
        self.logger.error(f"Error in disconnect operation: {e}")
        # Even if disconnect fails, mark as disconnected
        self.connected = False
execute_tool(tool_name, arguments) async

Execute an MCP tool by name with given arguments.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to execute.

required
arguments Dict[str, Any

Arguments to pass to the tool.

required

Returns:

Name Type Description
Any Any

Result of the tool execution.

Raises:

Type Description
ConnectionError

If not connected to MCP server.

ValueError

If the tool is not found.

TimeoutError

If the tool execution times out.

Exception

For any other errors during execution.

Source code in hatchling/mcp_utils/client.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
    """Execute an MCP tool by name with given arguments.

    Args:
        tool_name (str): Name of the tool to execute.
        arguments (Dict[str, Any): Arguments to pass to the tool.

    Returns:
        Any: Result of the tool execution.

    Raises:
        ConnectionError: If not connected to MCP server.
        ValueError: If the tool is not found.
        TimeoutError: If the tool execution times out.
        Exception: For any other errors during execution.
    """
    if not self.connected or not self.session:
        raise ConnectionError("Not connected to MCP server")

    if tool_name not in self.tools:
        raise ValueError(f"Tool '{tool_name}' not found")

    try:            
        # Execute the tool through the connection manager task
        future = asyncio.Future()
        await self._operation_queue.put(("execute_tool", [tool_name, arguments], future))

        # Wait for the result
        return await future

    except Exception as e:
        self.logger.error(f"Error executing tool {tool_name}: {str(e)}")
        raise
get_citations() async

Get citations from the MCP server.

Returns:

Type Description
Dict[str, str]

Dict[str, str]: Dictionary with origin and MCP citations.

Source code in hatchling/mcp_utils/client.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
async def get_citations(self) -> Dict[str, str]:
    """Get citations from the MCP server.

    Returns:
        Dict[str, str]: Dictionary with origin and MCP citations.
    """
    if not self.connected:
        raise ConnectionError("Not connected to MCP server")

    # Create a future to get the result
    future = asyncio.Future()
    await self._operation_queue.put(("get_citations", [], future))

    # Wait for the operation to complete
    try:
        return await future
    except Exception as e:
        self.logger.error(f"Error in get_citations operation: {e}")
        raise

MCPManager

Centralized manager for everything MCP-related: servers, clients, and adapters.

Source code in hatchling/mcp_utils/manager.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
class MCPManager:
    """Centralized manager for everything MCP-related: servers, clients, and adapters."""

    _instance = None

    def __new__(cls):
        """Ensure singleton pattern implementation.

        Returns:
            MCPManager: The singleton instance of the MCPManager.
        """
        if cls._instance is None:
            cls._instance = super(MCPManager, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, settings: Optional[AppSettings] = None):
        """Initialize the MCP manager if not already initialized."""
        if self._initialized:
            return

        # Initialize only once
        self._initialized = True

        # Connection tracking
        self.mcp_clients: Dict[str, MCPClient] = {}
        self.server_processes: Dict[str, subprocess.Popen] = {}
        self._connection_lock = asyncio.Lock()

        # Hatchling settings registry
        self.settings = settings or AppSettings.get_instance()

        # Tool tracking
        self._tool_client_map: Dict[str, MCPClient] = {}  # Map of tool names to clients that provide them

        # Hatch server usage
        self._used_servers_in_session = set()

        # Environment context for Python executable
        self.hatch_env_manager = HatchEnvironmentManager(
            environments_dir=self.settings.paths.envs_dir,
            cache_ttl=86400,  # 1 day default
        )

        # Event publishing capabilities
        self._event_publisher = EventPublisher()

        # Tool management for lifecycle events
        self._managed_tools: Dict[str, MCPToolInfo] = {}  # Tool name -> MCPToolInfo

        # Get a debug log session
        self.logger = logging_manager.get_session(self.__class__.__name__,
                                  formatter=logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

    @property
    def publisher(self) -> EventPublisher:
        """Access to the EventPublisher for MCP lifecycle events.

        Returns:
            EventPublisher: The publisher for MCP-related events.
        """
        return self._event_publisher

    @property
    def is_connected(self) -> bool:
        """Check if the manager has any active connections to MCP servers.

        Returns:
            bool: True if connected to at least one MCP server, False otherwise.
        """
        return bool(self.mcp_clients)

    def _publish_server_event(self, event_type: EventType, server_path: str, **additional_data) -> None:
        """Publish a server lifecycle event.

        Args:
            event_type (EventType): Type of server event to publish.
            server_path (str): Path to the server that triggered the event.
            **additional_data: Additional data to include in the event.
        """
        event_data = {
            "server_path": server_path,
            **additional_data
        }
        self._event_publisher.publish(event_type, event_data)
        self.logger.debug(f"Published {event_type.value} event for server: {server_path}")

    def _publish_tool_event(self, event_type: EventType, tool_name: str, 
                           tool_info: MCPToolInfo, **additional_data) -> None:
        """Publish a tool lifecycle event.

        Args:
            event_type (EventType): Type of tool event to publish.
            tool_name (str): Name of the tool that triggered the event.
            tool_info (MCPToolInfo): MCP Tool information.
            **additional_data: Additional data to include in the event.
        """
        event_data = {
            "tool_name": tool_name,
            "tool_info": tool_info,
            **additional_data
        }

        self._event_publisher.publish(event_type, event_data)
        self.logger.debug(f"Published {event_type.value} event for tool: {tool_name}")

    def validate_server_paths(self, server_paths: List[str]) -> List[str]:
        """Validate server paths and return the list of valid absolute paths.

        Args:
            server_paths (List[str]): List of server paths to validate.

        Returns:
            List[str]: List of valid absolute paths.
        """
        valid_paths = []
        for path in server_paths:
            # Convert to absolute path if relative
            abs_path = os.path.abspath(path)

            # Check if file exists
            if not os.path.isfile(abs_path):
                self.logger.error(f"MCP server script not found: {abs_path}")
                continue

            valid_paths.append(abs_path)

        return valid_paths

    async def connect_to_servers(self, server_paths: Optional[List[str]] = None) -> bool:
        """Connect to all configured MCP servers.

        Args:
            server_paths (Optional[List[str]]): List of paths to MCP server scripts.
            If None, connects to all configured servers.

        Returns:
            bool: False if not valid server paths are provided
        """

        if server_paths is None:
            all_server_paths = self.hatch_env_manager.get_servers_entry_points()
            # filter out server paths that are already connected
            server_paths = []
            for path in all_server_paths:
                already_connected = False
                for client in self.mcp_clients.values():
                    if client.server_path == path:
                        self.logger.debug(f"Skipping already connected server: {path}")
                        already_connected = True
                        break
                if not already_connected:
                    server_paths.append(path)

        if not server_paths:
            self.logger.info("No new MCP servers to connect to. All configured servers are already connected.")
            return True

        async with self._connection_lock:
            # Validate server paths
            valid_paths = self.validate_server_paths(server_paths)
            if not valid_paths:
                self.logger.error(". Check the paths provided.")
                return False

            # Connect to each valid server path
            for path in valid_paths:
                # Create client with environment resolver
                client = MCPClient(settings=self.settings, python_executable_resolver=self._get_python_executable)
                is_connected = await client.connect(path)
                if is_connected:
                    self.mcp_clients[path] = client

                    # Publish server up event
                    self._publish_server_event(EventType.MCP_SERVER_UP, path, 
                                             tool_count=len(client.tools))

                    # Cache tool mappings and create MCPToolInfo for each tool
                    for tool_name, tool_obj in client.tools.items():
                        self._tool_client_map[tool_name] = client

                        # Create MCPToolInfo for lifecycle management
                        tool_info = MCPToolInfo(
                            name=tool_name,
                            description=getattr(tool_obj, 'description', f'No description available'),
                            schema=getattr(tool_obj, 'No argument schema available', {}),
                            server_path=path,
                            status=MCPToolStatus.ENABLED,
                            reason=MCPToolStatusReason.FROM_SERVER_UP
                        )
                        self._managed_tools[tool_name] = tool_info

                        # Publish tool enabled event
                        self._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)
                else:
                    # Publish server unreachable event
                    self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                             error="Failed to connect")

            if len(self.mcp_clients) > 0:
                # Log the available tools across all clients
                total_tools = sum(len(client.tools) for client in self.mcp_clients.values())
                self.logger.info(f"Connected to {len(self.mcp_clients)} MCP servers with {total_tools} total tools")
            else:
                self.logger.warning("Failed to connect to any MCP server")

            return True

    async def disconnect_all(self) -> None:
        """Disconnect from all MCP servers."""

        async with self._connection_lock:
            # Store the current task for debugging
            current_task_id = id(asyncio.current_task())
            self.logger.debug(f"Disconnecting all clients from task: {current_task_id}")

            disconnection_errors = False

            # First try the graceful disconnect approach
            for path, client in list(self.mcp_clients.items()):
                try:
                    # Disable all tools from this server before disconnecting
                    for tool_name, tool_info in self._managed_tools.items():
                        if tool_info.server_path == path and tool_info.status == MCPToolStatus.ENABLED:
                            tool_info.status = MCPToolStatus.DISABLED
                            tool_info.reason = MCPToolStatusReason.FROM_SERVER_DOWN

                            # Publish tool disabled event
                            self._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

                    # Log task context for debugging
                    if hasattr(client, '_connection_task_id') and client._connection_task_id:
                        self.logger.debug(f"Client for {path} was created in task: {client._connection_task_id}")

                    # Try graceful disconnect first with timeout
                    disconnect_task = asyncio.create_task(client.disconnect())
                    try:
                        await asyncio.wait_for(disconnect_task, timeout=10)
                        # Publish server down event on successful disconnect
                        self._publish_server_event(EventType.MCP_SERVER_DOWN, path)
                    except asyncio.TimeoutError:
                        self.logger.warning(f"Disconnect timeout for {path}")
                        disconnection_errors = True
                        # Publish server unreachable event
                        self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                                 error="Disconnect timeout")
                    except Exception as e:
                        self.logger.error(f"Error during graceful disconnect for {path}: {e}")
                        disconnection_errors = True
                        # Publish server unreachable event
                        self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                                 error=str(e))
                except Exception as e:
                    self.logger.error(f"Error setting up disconnect for {path}: {e}")
                    disconnection_errors = True

            # If any disconnections failed with errors, use forceful termination
            if disconnection_errors:
                self.logger.warning("Some disconnections failed. Using forceful termination as fallback.")
                self._terminate_server_processes()

            # Clear all client tracking regardless of disconnection success
            self.mcp_clients = {}
            self._tool_client_map = {}

            # Clear managed tools
            self._managed_tools = {}

            self.logger.info("Disconnected from all MCP servers")

    def _terminate_server_processes(self) -> None:
        """Terminate all server processes directly.
        This is a fallback mechanism when graceful disconnection fails.
        """
        terminated_count = 0

        # Kill all server processes
        for path, process in list(self.server_processes.items()):
            if process.poll() is None:  # Process is still running
                try:
                    # Send SIGTERM first for cleaner shutdown
                    process.terminate()

                    # Give it a moment to terminate
                    try:
                        process.wait(timeout=2)
                    except subprocess.TimeoutExpired:
                        # If it doesn't terminate in time, force kill
                        process.kill()
                        self.logger.warning(f"Force killed MCP server process: {path}")

                    terminated_count += 1
                    self.logger.debug(f"Terminated MCP server process: {path}")
                except Exception as e:
                    self.logger.error(f"Failed to terminate process for {path}: {e}")

            # Remove from tracking regardless of kill success
            del self.server_processes[path]

        if terminated_count > 0:
            self.logger.info(f"Forcefully terminated {terminated_count} server processes")

    def get_enabled_tools(self) -> Dict[str, MCPToolInfo]:
        """Get all enabled tools with their information.

        Returns:
            Dict[str, MCPToolInfo]: Dictionary mapping tool names to enabled MCPToolInfo objects.
        """
        return {
            name: info for name, info in self._managed_tools.items()
            if info.status == MCPToolStatus.ENABLED
        }

    def get_all_managed_tools(self) -> Dict[str, MCPToolInfo]:
        """Get all managed tools (both enabled and disabled).

        Returns:
            Dict[str, MCPToolInfo]: Dictionary mapping tool names to all MCPToolInfo objects.
        """
        return self._managed_tools.copy()

    def enable_tool(self, tool_name: str) -> bool:
        """Enable a specific tool if it exists and is disabled.

        Args:
            tool_name (str): Name of the tool to enable.

        Returns:
            bool: True if the tool was enabled, False if it was already enabled or doesn't exist.
        """
        if tool_name not in self._managed_tools:
            self.logger.warning(f"Tool '{tool_name}' not found in managed tools")
            return False

        tool_info = self._managed_tools[tool_name]

        if tool_info.status == MCPToolStatus.ENABLED:
            self.logger.debug(f"Tool '{tool_name}' is already enabled")
            return False

        # Check if the server is still available
        if tool_info.server_path not in self.mcp_clients:
            self.logger.warning(f"Cannot enable tool '{tool_name}' - server is not connected")
            return False

        # Enable the tool
        tool_info.status = MCPToolStatus.ENABLED
        tool_info.reason = MCPToolStatusReason.FROM_USER_ENABLED

        # Update timestamp
        import time
        tool_info.last_updated = time.time()

        # Publish event
        self._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)

        self.logger.info(f"Enabled tool: {tool_name}")
        return True

    def disable_tool(self, tool_name: str) -> bool:
        """Disable a specific tool if it exists and is enabled.

        Args:
            tool_name (str): Name of the tool to disable.

        Returns:
            bool: True if the tool was disabled, False if it was already disabled or doesn't exist.
        """
        if tool_name not in self._managed_tools:
            self.logger.warning(f"Tool '{tool_name}' not found in managed tools")
            return False

        tool_info = self._managed_tools[tool_name]

        if tool_info.status == MCPToolStatus.DISABLED:
            self.logger.debug(f"Tool '{tool_name}' is already disabled")
            return False

        # Disable the tool
        tool_info.status = MCPToolStatus.DISABLED
        tool_info.reason = MCPToolStatusReason.FROM_USER_DISABLED

        # Update timestamp
        import time
        tool_info.last_updated = time.time()

        # Publish event
        self._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

        self.logger.info(f"Disabled tool: {tool_name}")
        return True

    def get_tool_status(self, tool_name: str) -> Optional[MCPToolInfo]:
        """Get the current status and information for a tool.

        Args:
            tool_name (str): Name of the tool to get status for.

        Returns:
            Optional[MCPToolInfo]: Tool information if found, None otherwise.
        """
        return self._managed_tools.get(tool_name)

    async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Execute a tool by name with the given arguments.

        Args:
            tool_name (str): Name of the tool to execute.
            arguments (Dict[str, Any]): Arguments to pass to the tool.

        Returns:
            Any: Result of the tool execution.

        Raises:
            ConnectionError: If not connected to any MCP server.
            ValueError: If the tool is not found in any connected MCP server.
        """
        if not self.mcp_clients:
            raise ConnectionError("Not connected to any MCP server")

        if tool_name not in self._tool_client_map:
            raise ValueError(f"Tool '{tool_name}' not found in any connected MCP server")

        client = self._tool_client_map[tool_name]
        self._used_servers_in_session.add(client.server_path)

        try:
            return await client.execute_tool(tool_name, arguments)

        except ConnectionError:
            # mark the server as unreachable
            self.logger.error(f"Failed to execute tool '{tool_name}' - server {client.server_path} is unreachable")
            self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, client.server_path,
                                      error="Failed to execute tool")

    async def get_citations_for_session(self) -> Dict[str, Dict[str, str]]:
        """Get citations for all servers used in the current session.

        Returns:
            Dict[str, Dict[str, str]]: Dictionary of citations for each server.
        """
        citations = {}

        for path in self._used_servers_in_session:
            if path in self.mcp_clients:
                client = self.mcp_clients[path]
                try:
                    server_citations = await client.get_citations()
                    citations[path] = server_citations
                except Exception as e:
                    self.logger.error(f"Error getting citations for {path}: {e}")

        return citations

    def reset_session_tracking(self):
        """Reset the tracking of which servers were used."""
        self._used_servers_in_session.clear()

    def _get_python_executable(self) -> str:
        """Get the appropriate Python executable for the current environment.

        Returns:
            str: Path to Python executable, falls back to system Python.
        """
        if self.hatch_env_manager:
            current_env = self.hatch_env_manager.get_current_environment()
            if current_env:
                python_env_info = self.hatch_env_manager.get_python_environment_info(current_env)
                if python_env_info:
                    python_executable = python_env_info.get("python_executable")
                    if python_executable:
                        self.logger.debug(f"Using environment Python for {current_env}: {python_executable}")
                        return python_executable

        # Fallback to system Python
        system_python = sys.executable
        self.logger.debug(f"Using system Python: {system_python}")
        return system_python
Attributes
is_connected property

Check if the manager has any active connections to MCP servers.

Returns:

Name Type Description
bool bool

True if connected to at least one MCP server, False otherwise.

publisher property

Access to the EventPublisher for MCP lifecycle events.

Returns:

Name Type Description
EventPublisher EventPublisher

The publisher for MCP-related events.

Functions
__init__(settings=None)

Initialize the MCP manager if not already initialized.

Source code in hatchling/mcp_utils/manager.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(self, settings: Optional[AppSettings] = None):
    """Initialize the MCP manager if not already initialized."""
    if self._initialized:
        return

    # Initialize only once
    self._initialized = True

    # Connection tracking
    self.mcp_clients: Dict[str, MCPClient] = {}
    self.server_processes: Dict[str, subprocess.Popen] = {}
    self._connection_lock = asyncio.Lock()

    # Hatchling settings registry
    self.settings = settings or AppSettings.get_instance()

    # Tool tracking
    self._tool_client_map: Dict[str, MCPClient] = {}  # Map of tool names to clients that provide them

    # Hatch server usage
    self._used_servers_in_session = set()

    # Environment context for Python executable
    self.hatch_env_manager = HatchEnvironmentManager(
        environments_dir=self.settings.paths.envs_dir,
        cache_ttl=86400,  # 1 day default
    )

    # Event publishing capabilities
    self._event_publisher = EventPublisher()

    # Tool management for lifecycle events
    self._managed_tools: Dict[str, MCPToolInfo] = {}  # Tool name -> MCPToolInfo

    # Get a debug log session
    self.logger = logging_manager.get_session(self.__class__.__name__,
                              formatter=logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
__new__()

Ensure singleton pattern implementation.

Returns:

Name Type Description
MCPManager

The singleton instance of the MCPManager.

Source code in hatchling/mcp_utils/manager.py
26
27
28
29
30
31
32
33
34
35
def __new__(cls):
    """Ensure singleton pattern implementation.

    Returns:
        MCPManager: The singleton instance of the MCPManager.
    """
    if cls._instance is None:
        cls._instance = super(MCPManager, cls).__new__(cls)
        cls._instance._initialized = False
    return cls._instance
connect_to_servers(server_paths=None) async

Connect to all configured MCP servers.

Parameters:

Name Type Description Default
server_paths Optional[List[str]]

List of paths to MCP server scripts.

None

Returns:

Name Type Description
bool bool

False if not valid server paths are provided

Source code in hatchling/mcp_utils/manager.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
async def connect_to_servers(self, server_paths: Optional[List[str]] = None) -> bool:
    """Connect to all configured MCP servers.

    Args:
        server_paths (Optional[List[str]]): List of paths to MCP server scripts.
        If None, connects to all configured servers.

    Returns:
        bool: False if not valid server paths are provided
    """

    if server_paths is None:
        all_server_paths = self.hatch_env_manager.get_servers_entry_points()
        # filter out server paths that are already connected
        server_paths = []
        for path in all_server_paths:
            already_connected = False
            for client in self.mcp_clients.values():
                if client.server_path == path:
                    self.logger.debug(f"Skipping already connected server: {path}")
                    already_connected = True
                    break
            if not already_connected:
                server_paths.append(path)

    if not server_paths:
        self.logger.info("No new MCP servers to connect to. All configured servers are already connected.")
        return True

    async with self._connection_lock:
        # Validate server paths
        valid_paths = self.validate_server_paths(server_paths)
        if not valid_paths:
            self.logger.error(". Check the paths provided.")
            return False

        # Connect to each valid server path
        for path in valid_paths:
            # Create client with environment resolver
            client = MCPClient(settings=self.settings, python_executable_resolver=self._get_python_executable)
            is_connected = await client.connect(path)
            if is_connected:
                self.mcp_clients[path] = client

                # Publish server up event
                self._publish_server_event(EventType.MCP_SERVER_UP, path, 
                                         tool_count=len(client.tools))

                # Cache tool mappings and create MCPToolInfo for each tool
                for tool_name, tool_obj in client.tools.items():
                    self._tool_client_map[tool_name] = client

                    # Create MCPToolInfo for lifecycle management
                    tool_info = MCPToolInfo(
                        name=tool_name,
                        description=getattr(tool_obj, 'description', f'No description available'),
                        schema=getattr(tool_obj, 'No argument schema available', {}),
                        server_path=path,
                        status=MCPToolStatus.ENABLED,
                        reason=MCPToolStatusReason.FROM_SERVER_UP
                    )
                    self._managed_tools[tool_name] = tool_info

                    # Publish tool enabled event
                    self._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)
            else:
                # Publish server unreachable event
                self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                         error="Failed to connect")

        if len(self.mcp_clients) > 0:
            # Log the available tools across all clients
            total_tools = sum(len(client.tools) for client in self.mcp_clients.values())
            self.logger.info(f"Connected to {len(self.mcp_clients)} MCP servers with {total_tools} total tools")
        else:
            self.logger.warning("Failed to connect to any MCP server")

        return True
disable_tool(tool_name)

Disable a specific tool if it exists and is enabled.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to disable.

required

Returns:

Name Type Description
bool bool

True if the tool was disabled, False if it was already disabled or doesn't exist.

Source code in hatchling/mcp_utils/manager.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
def disable_tool(self, tool_name: str) -> bool:
    """Disable a specific tool if it exists and is enabled.

    Args:
        tool_name (str): Name of the tool to disable.

    Returns:
        bool: True if the tool was disabled, False if it was already disabled or doesn't exist.
    """
    if tool_name not in self._managed_tools:
        self.logger.warning(f"Tool '{tool_name}' not found in managed tools")
        return False

    tool_info = self._managed_tools[tool_name]

    if tool_info.status == MCPToolStatus.DISABLED:
        self.logger.debug(f"Tool '{tool_name}' is already disabled")
        return False

    # Disable the tool
    tool_info.status = MCPToolStatus.DISABLED
    tool_info.reason = MCPToolStatusReason.FROM_USER_DISABLED

    # Update timestamp
    import time
    tool_info.last_updated = time.time()

    # Publish event
    self._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

    self.logger.info(f"Disabled tool: {tool_name}")
    return True
disconnect_all() async

Disconnect from all MCP servers.

Source code in hatchling/mcp_utils/manager.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
async def disconnect_all(self) -> None:
    """Disconnect from all MCP servers."""

    async with self._connection_lock:
        # Store the current task for debugging
        current_task_id = id(asyncio.current_task())
        self.logger.debug(f"Disconnecting all clients from task: {current_task_id}")

        disconnection_errors = False

        # First try the graceful disconnect approach
        for path, client in list(self.mcp_clients.items()):
            try:
                # Disable all tools from this server before disconnecting
                for tool_name, tool_info in self._managed_tools.items():
                    if tool_info.server_path == path and tool_info.status == MCPToolStatus.ENABLED:
                        tool_info.status = MCPToolStatus.DISABLED
                        tool_info.reason = MCPToolStatusReason.FROM_SERVER_DOWN

                        # Publish tool disabled event
                        self._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

                # Log task context for debugging
                if hasattr(client, '_connection_task_id') and client._connection_task_id:
                    self.logger.debug(f"Client for {path} was created in task: {client._connection_task_id}")

                # Try graceful disconnect first with timeout
                disconnect_task = asyncio.create_task(client.disconnect())
                try:
                    await asyncio.wait_for(disconnect_task, timeout=10)
                    # Publish server down event on successful disconnect
                    self._publish_server_event(EventType.MCP_SERVER_DOWN, path)
                except asyncio.TimeoutError:
                    self.logger.warning(f"Disconnect timeout for {path}")
                    disconnection_errors = True
                    # Publish server unreachable event
                    self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                             error="Disconnect timeout")
                except Exception as e:
                    self.logger.error(f"Error during graceful disconnect for {path}: {e}")
                    disconnection_errors = True
                    # Publish server unreachable event
                    self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                             error=str(e))
            except Exception as e:
                self.logger.error(f"Error setting up disconnect for {path}: {e}")
                disconnection_errors = True

        # If any disconnections failed with errors, use forceful termination
        if disconnection_errors:
            self.logger.warning("Some disconnections failed. Using forceful termination as fallback.")
            self._terminate_server_processes()

        # Clear all client tracking regardless of disconnection success
        self.mcp_clients = {}
        self._tool_client_map = {}

        # Clear managed tools
        self._managed_tools = {}

        self.logger.info("Disconnected from all MCP servers")
enable_tool(tool_name)

Enable a specific tool if it exists and is disabled.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to enable.

required

Returns:

Name Type Description
bool bool

True if the tool was enabled, False if it was already enabled or doesn't exist.

Source code in hatchling/mcp_utils/manager.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def enable_tool(self, tool_name: str) -> bool:
    """Enable a specific tool if it exists and is disabled.

    Args:
        tool_name (str): Name of the tool to enable.

    Returns:
        bool: True if the tool was enabled, False if it was already enabled or doesn't exist.
    """
    if tool_name not in self._managed_tools:
        self.logger.warning(f"Tool '{tool_name}' not found in managed tools")
        return False

    tool_info = self._managed_tools[tool_name]

    if tool_info.status == MCPToolStatus.ENABLED:
        self.logger.debug(f"Tool '{tool_name}' is already enabled")
        return False

    # Check if the server is still available
    if tool_info.server_path not in self.mcp_clients:
        self.logger.warning(f"Cannot enable tool '{tool_name}' - server is not connected")
        return False

    # Enable the tool
    tool_info.status = MCPToolStatus.ENABLED
    tool_info.reason = MCPToolStatusReason.FROM_USER_ENABLED

    # Update timestamp
    import time
    tool_info.last_updated = time.time()

    # Publish event
    self._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)

    self.logger.info(f"Enabled tool: {tool_name}")
    return True
execute_tool(tool_name, arguments) async

Execute a tool by name with the given arguments.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to execute.

required
arguments Dict[str, Any]

Arguments to pass to the tool.

required

Returns:

Name Type Description
Any Any

Result of the tool execution.

Raises:

Type Description
ConnectionError

If not connected to any MCP server.

ValueError

If the tool is not found in any connected MCP server.

Source code in hatchling/mcp_utils/manager.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
    """Execute a tool by name with the given arguments.

    Args:
        tool_name (str): Name of the tool to execute.
        arguments (Dict[str, Any]): Arguments to pass to the tool.

    Returns:
        Any: Result of the tool execution.

    Raises:
        ConnectionError: If not connected to any MCP server.
        ValueError: If the tool is not found in any connected MCP server.
    """
    if not self.mcp_clients:
        raise ConnectionError("Not connected to any MCP server")

    if tool_name not in self._tool_client_map:
        raise ValueError(f"Tool '{tool_name}' not found in any connected MCP server")

    client = self._tool_client_map[tool_name]
    self._used_servers_in_session.add(client.server_path)

    try:
        return await client.execute_tool(tool_name, arguments)

    except ConnectionError:
        # mark the server as unreachable
        self.logger.error(f"Failed to execute tool '{tool_name}' - server {client.server_path} is unreachable")
        self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, client.server_path,
                                  error="Failed to execute tool")
get_all_managed_tools()

Get all managed tools (both enabled and disabled).

Returns:

Type Description
Dict[str, MCPToolInfo]

Dict[str, MCPToolInfo]: Dictionary mapping tool names to all MCPToolInfo objects.

Source code in hatchling/mcp_utils/manager.py
334
335
336
337
338
339
340
def get_all_managed_tools(self) -> Dict[str, MCPToolInfo]:
    """Get all managed tools (both enabled and disabled).

    Returns:
        Dict[str, MCPToolInfo]: Dictionary mapping tool names to all MCPToolInfo objects.
    """
    return self._managed_tools.copy()
get_citations_for_session() async

Get citations for all servers used in the current session.

Returns:

Type Description
Dict[str, Dict[str, str]]

Dict[str, Dict[str, str]]: Dictionary of citations for each server.

Source code in hatchling/mcp_utils/manager.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
async def get_citations_for_session(self) -> Dict[str, Dict[str, str]]:
    """Get citations for all servers used in the current session.

    Returns:
        Dict[str, Dict[str, str]]: Dictionary of citations for each server.
    """
    citations = {}

    for path in self._used_servers_in_session:
        if path in self.mcp_clients:
            client = self.mcp_clients[path]
            try:
                server_citations = await client.get_citations()
                citations[path] = server_citations
            except Exception as e:
                self.logger.error(f"Error getting citations for {path}: {e}")

    return citations
get_enabled_tools()

Get all enabled tools with their information.

Returns:

Type Description
Dict[str, MCPToolInfo]

Dict[str, MCPToolInfo]: Dictionary mapping tool names to enabled MCPToolInfo objects.

Source code in hatchling/mcp_utils/manager.py
323
324
325
326
327
328
329
330
331
332
def get_enabled_tools(self) -> Dict[str, MCPToolInfo]:
    """Get all enabled tools with their information.

    Returns:
        Dict[str, MCPToolInfo]: Dictionary mapping tool names to enabled MCPToolInfo objects.
    """
    return {
        name: info for name, info in self._managed_tools.items()
        if info.status == MCPToolStatus.ENABLED
    }
get_tool_status(tool_name)

Get the current status and information for a tool.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to get status for.

required

Returns:

Type Description
Optional[MCPToolInfo]

Optional[MCPToolInfo]: Tool information if found, None otherwise.

Source code in hatchling/mcp_utils/manager.py
413
414
415
416
417
418
419
420
421
422
def get_tool_status(self, tool_name: str) -> Optional[MCPToolInfo]:
    """Get the current status and information for a tool.

    Args:
        tool_name (str): Name of the tool to get status for.

    Returns:
        Optional[MCPToolInfo]: Tool information if found, None otherwise.
    """
    return self._managed_tools.get(tool_name)
reset_session_tracking()

Reset the tracking of which servers were used.

Source code in hatchling/mcp_utils/manager.py
475
476
477
def reset_session_tracking(self):
    """Reset the tracking of which servers were used."""
    self._used_servers_in_session.clear()
validate_server_paths(server_paths)

Validate server paths and return the list of valid absolute paths.

Parameters:

Name Type Description Default
server_paths List[str]

List of server paths to validate.

required

Returns:

Type Description
List[str]

List[str]: List of valid absolute paths.

Source code in hatchling/mcp_utils/manager.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def validate_server_paths(self, server_paths: List[str]) -> List[str]:
    """Validate server paths and return the list of valid absolute paths.

    Args:
        server_paths (List[str]): List of server paths to validate.

    Returns:
        List[str]: List of valid absolute paths.
    """
    valid_paths = []
    for path in server_paths:
        # Convert to absolute path if relative
        abs_path = os.path.abspath(path)

        # Check if file exists
        if not os.path.isfile(abs_path):
            self.logger.error(f"MCP server script not found: {abs_path}")
            continue

        valid_paths.append(abs_path)

    return valid_paths

MCPServerAPI

Clean API for MCP server management and debugging.

This class provides a simplified interface for: - Server connection management - Tool discovery and management - Manual tool execution for debugging - Server health monitoring

Source code in hatchling/mcp_utils/mcp_server_api.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
class MCPServerAPI:
    """Clean API for MCP server management and debugging.

    This class provides a simplified interface for:
    - Server connection management
    - Tool discovery and management
    - Manual tool execution for debugging
    - Server health monitoring
    """

    # =============================================================================
    # Server Management
    # =============================================================================

    @staticmethod
    async def connect_servers(server_paths: Optional[List[str]] = None) -> bool:
        """Connect to MCP servers.

        Args:
            server_paths (List[str]): List of paths to MCP server scripts.
            If None, connects to all configured servers.

        Returns:
            bool: True if at least one server connected successfully.
        """
        return await mcp_manager.connect_to_servers(server_paths)

    @staticmethod
    async def disconnect_all_servers() -> None:
        """Disconnect from all MCP servers."""
        await mcp_manager.disconnect_all()

    @staticmethod
    def get_server_list() -> List[MCPServerInfo]:
        """Get list of all configured MCP servers with their status.

        Returns:
            List[MCPServerInfo]: List of server information.
        """
        servers = []

        # Get connected servers
        for path, client in mcp_manager.mcp_clients.items():
            tool_count = len(client.tools)
            enabled_count = len([
                tool for tool in mcp_manager._managed_tools.values()
                if tool.server_path == path and tool.status == MCPToolStatus.ENABLED
            ])

            servers.append(MCPServerInfo(
                path=path,
                status=MCPServerStatus.CONNECTED,
                tool_count=tool_count,
                enabled_tool_count=enabled_count,
                last_connected=time.time()  # Approximate
            ))

        return servers

    @staticmethod
    def get_server_status(server_path: str) -> MCPServerInfo:
        """Get detailed status for a specific server.

        Args:
            server_path (str): Path to the MCP server script.

        Returns:
            MCPServerInfo: Server status information.
        """
        if server_path in mcp_manager.mcp_clients:
            client = mcp_manager.mcp_clients[server_path]
            tool_count = len(client.tools)
            enabled_count = len([
                tool for tool in mcp_manager._managed_tools.values()
                if tool.server_path == server_path and tool.status == MCPToolStatus.ENABLED
            ])

            return MCPServerInfo(
                path=server_path,
                status=MCPServerStatus.CONNECTED,
                tool_count=tool_count,
                enabled_tool_count=enabled_count,
                last_connected=time.time()
            )
        else:
            return MCPServerInfo(
                path=server_path,
                status=MCPServerStatus.DISCONNECTED,
                tool_count=0,
                enabled_tool_count=0,
                error_message="Server not connected"
            )

    # =============================================================================
    # Tool Management  
    # =============================================================================

    @staticmethod
    def get_all_tools() -> List[MCPToolSummary]:
        """Get list of all available MCP tools.

        Returns:
            List[MCPToolSummary]: List of tool summaries.
        """
        tools = []

        for tool_name, tool_info in mcp_manager.get_all_managed_tools().items():
            # Try to get tool description from the actual tool object
            description = None
            if tool_info.server_path in mcp_manager.mcp_clients:
                client = mcp_manager.mcp_clients[tool_info.server_path]
                tool_obj = client.tools.get(tool_name)
                if tool_obj and hasattr(tool_obj, 'description'):
                    description = tool_obj.description

            tools.append(MCPToolSummary(
                name=tool_name,
                server_path=tool_info.server_path,
                status=tool_info.status,
                description=description,
                last_updated=tool_info.last_updated
            ))

        return tools

    @staticmethod
    def get_enabled_tools() -> List[MCPToolSummary]:
        """Get list of enabled MCP tools.

        Returns:
            List[MCPToolSummary]: List of enabled tool summaries.
        """
        return [tool for tool in MCPServerAPI.get_all_tools() if tool.status == MCPToolStatus.ENABLED]

    @staticmethod
    def get_tools_by_server(server_path: str) -> List[MCPToolSummary]:
        """Get all tools provided by a specific server.

        Args:
            server_path (str): Path to the MCP server script.

        Returns:
            List[MCPToolSummary]: List of tools from the server.
        """
        return [tool for tool in MCPServerAPI.get_all_tools() if tool.server_path == server_path]

    @staticmethod
    def enable_tool(tool_name: str) -> bool:
        """Enable a specific tool.

        Args:
            tool_name (str): Name of the tool to enable.

        Returns:
            bool: True if tool was enabled successfully.
        """
        # Get managed tools directly instead of using manager method
        managed_tools = mcp_manager._managed_tools

        if tool_name not in managed_tools:
            logger.warning(f"Tool '{tool_name}' not found in managed tools")
            return False

        tool_info = managed_tools[tool_name]

        if tool_info.status == MCPToolStatus.ENABLED:
            logger.debug(f"Tool '{tool_name}' is already enabled")
            return True

        # Check if the server is still available
        if tool_info.server_path not in mcp_manager.mcp_clients:
            logger.warning(f"Cannot enable tool '{tool_name}' - server is not connected")
            return False

        # Enable the tool
        tool_info.status = MCPToolStatus.ENABLED
        tool_info.reason = MCPToolStatusReason.FROM_USER_ENABLED

        # Update timestamp
        import time
        tool_info.last_updated = time.time()

        # Publish event through manager's publisher
        mcp_manager._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)

        logger.info(f"Enabled tool: {tool_name}")
        return True

    @staticmethod
    def disable_tool(tool_name: str) -> bool:
        """Disable a specific tool.

        Args:
            tool_name (str): Name of the tool to disable.

        Returns:
            bool: True if tool was disabled successfully.
        """
        # Get managed tools directly instead of using manager method
        managed_tools = mcp_manager._managed_tools

        if tool_name not in managed_tools:
            logger.warning(f"Tool '{tool_name}' not found in managed tools")
            return False

        tool_info = managed_tools[tool_name]

        if tool_info.status == MCPToolStatus.DISABLED:
            logger.debug(f"Tool '{tool_name}' is already disabled")
            return False

        # Disable the tool
        tool_info.status = MCPToolStatus.DISABLED
        tool_info.reason = MCPToolStatusReason.FROM_USER_DISABLED

        # Update timestamp
        import time
        tool_info.last_updated = time.time()

        # Publish event through manager's publisher
        mcp_manager._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

        logger.info(f"Disabled tool: {tool_name}")
        return True

    @staticmethod
    def get_tool_info(tool_name: str) -> Optional[MCPToolSummary]:
        """Get detailed information about a specific tool.

        Args:
            tool_name (str): Name of the tool.

        Returns:
            Optional[MCPToolSummary]: Tool information if found.
        """
        tool_info = mcp_manager.get_tool_status(tool_name)
        if not tool_info:
            return None

        # Get description from tool object
        description = None
        if tool_info.server_path in mcp_manager.mcp_clients:
            client = mcp_manager.mcp_clients[tool_info.server_path]
            tool_obj = client.tools.get(tool_name)
            if tool_obj and hasattr(tool_obj, 'description'):
                description = tool_obj.description

        return MCPToolSummary(
            name=tool_name,
            server_path=tool_info.server_path,
            status=tool_info.status,
            description=description,
            last_updated=tool_info.last_updated
        )

    # =============================================================================
    # Manual Tool Execution (Debugging)
    # =============================================================================

    @staticmethod
    async def execute_tool_manually(tool_name: str, arguments: Dict[str, Any]) -> Tuple[bool, Any, Optional[str]]:
        """Execute an MCP tool manually for debugging purposes.

        Args:
            tool_name (str): Name of the tool to execute.
            arguments (Dict[str, Any]): Arguments to pass to the tool.

        Returns:
            Tuple[bool, Any, Optional[str]]: Success flag, result, and error message if any.
        """
        try:
            result = await mcp_manager.execute_tool(tool_name, arguments)
            return True, result, None
        except ConnectionError as e:
            return False, None, f"Connection error: {e}"
        except ValueError as e:
            return False, None, f"Tool not found: {e}"
        except Exception as e:
            return False, None, f"Execution error: {e}"

    @staticmethod
    def get_tool_schema(tool_name: str) -> Optional[Dict[str, Any]]:
        """Get the JSON schema for a tool's arguments.

        Args:
            tool_name (str): Name of the tool.

        Returns:
            Optional[Dict[str, Any]]: Tool schema if available.
        """
        tool_info = mcp_manager.get_tool_status(tool_name)
        if not tool_info or tool_info.server_path not in mcp_manager.mcp_clients:
            return None

        return tool_info.schema

    # =============================================================================
    # Health and Diagnostics
    # =============================================================================

    @staticmethod
    def get_health_summary() -> Dict[str, Any]:
        """Get overall health summary of MCP system.

        Returns:
            Dict[str, Any]: Health summary including server and tool counts.
        """
        servers = MCPServerAPI.get_server_list()
        all_tools = MCPServerAPI.get_all_tools()

        return {
            "connected_servers": len(servers),
            "total_tools": len(all_tools),
            "enabled_tools": len([t for t in all_tools if t.status == MCPToolStatus.ENABLED]),
            "disabled_tools": len([t for t in all_tools if t.status == MCPToolStatus.DISABLED]),
            "server_details": [
                {
                    "path": server.path,
                    "status": server.status.value,
                    "tools": server.tool_count,
                    "enabled_tools": server.enabled_tool_count
                }
                for server in servers
            ]
        }

    @staticmethod
    async def get_session_citations() -> Dict[str, Dict[str, str]]:
        """Get citations for all servers used in the current session.

        Returns:
            Dict[str, Dict[str, str]]: Server citations.
        """
        return await mcp_manager.get_citations_for_session()

    @staticmethod
    def reset_session_tracking() -> None:
        """Reset session tracking for citations."""
        mcp_manager.reset_session_tracking()
Functions
connect_servers(server_paths=None) async staticmethod

Connect to MCP servers.

Parameters:

Name Type Description Default
server_paths List[str]

List of paths to MCP server scripts.

None

Returns:

Name Type Description
bool bool

True if at least one server connected successfully.

Source code in hatchling/mcp_utils/mcp_server_api.py
64
65
66
67
68
69
70
71
72
73
74
75
@staticmethod
async def connect_servers(server_paths: Optional[List[str]] = None) -> bool:
    """Connect to MCP servers.

    Args:
        server_paths (List[str]): List of paths to MCP server scripts.
        If None, connects to all configured servers.

    Returns:
        bool: True if at least one server connected successfully.
    """
    return await mcp_manager.connect_to_servers(server_paths)
disable_tool(tool_name) staticmethod

Disable a specific tool.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to disable.

required

Returns:

Name Type Description
bool bool

True if tool was disabled successfully.

Source code in hatchling/mcp_utils/mcp_server_api.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
@staticmethod
def disable_tool(tool_name: str) -> bool:
    """Disable a specific tool.

    Args:
        tool_name (str): Name of the tool to disable.

    Returns:
        bool: True if tool was disabled successfully.
    """
    # Get managed tools directly instead of using manager method
    managed_tools = mcp_manager._managed_tools

    if tool_name not in managed_tools:
        logger.warning(f"Tool '{tool_name}' not found in managed tools")
        return False

    tool_info = managed_tools[tool_name]

    if tool_info.status == MCPToolStatus.DISABLED:
        logger.debug(f"Tool '{tool_name}' is already disabled")
        return False

    # Disable the tool
    tool_info.status = MCPToolStatus.DISABLED
    tool_info.reason = MCPToolStatusReason.FROM_USER_DISABLED

    # Update timestamp
    import time
    tool_info.last_updated = time.time()

    # Publish event through manager's publisher
    mcp_manager._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

    logger.info(f"Disabled tool: {tool_name}")
    return True
disconnect_all_servers() async staticmethod

Disconnect from all MCP servers.

Source code in hatchling/mcp_utils/mcp_server_api.py
77
78
79
80
@staticmethod
async def disconnect_all_servers() -> None:
    """Disconnect from all MCP servers."""
    await mcp_manager.disconnect_all()
enable_tool(tool_name) staticmethod

Enable a specific tool.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to enable.

required

Returns:

Name Type Description
bool bool

True if tool was enabled successfully.

Source code in hatchling/mcp_utils/mcp_server_api.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@staticmethod
def enable_tool(tool_name: str) -> bool:
    """Enable a specific tool.

    Args:
        tool_name (str): Name of the tool to enable.

    Returns:
        bool: True if tool was enabled successfully.
    """
    # Get managed tools directly instead of using manager method
    managed_tools = mcp_manager._managed_tools

    if tool_name not in managed_tools:
        logger.warning(f"Tool '{tool_name}' not found in managed tools")
        return False

    tool_info = managed_tools[tool_name]

    if tool_info.status == MCPToolStatus.ENABLED:
        logger.debug(f"Tool '{tool_name}' is already enabled")
        return True

    # Check if the server is still available
    if tool_info.server_path not in mcp_manager.mcp_clients:
        logger.warning(f"Cannot enable tool '{tool_name}' - server is not connected")
        return False

    # Enable the tool
    tool_info.status = MCPToolStatus.ENABLED
    tool_info.reason = MCPToolStatusReason.FROM_USER_ENABLED

    # Update timestamp
    import time
    tool_info.last_updated = time.time()

    # Publish event through manager's publisher
    mcp_manager._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)

    logger.info(f"Enabled tool: {tool_name}")
    return True
execute_tool_manually(tool_name, arguments) async staticmethod

Execute an MCP tool manually for debugging purposes.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to execute.

required
arguments Dict[str, Any]

Arguments to pass to the tool.

required

Returns:

Type Description
Tuple[bool, Any, Optional[str]]

Tuple[bool, Any, Optional[str]]: Success flag, result, and error message if any.

Source code in hatchling/mcp_utils/mcp_server_api.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
@staticmethod
async def execute_tool_manually(tool_name: str, arguments: Dict[str, Any]) -> Tuple[bool, Any, Optional[str]]:
    """Execute an MCP tool manually for debugging purposes.

    Args:
        tool_name (str): Name of the tool to execute.
        arguments (Dict[str, Any]): Arguments to pass to the tool.

    Returns:
        Tuple[bool, Any, Optional[str]]: Success flag, result, and error message if any.
    """
    try:
        result = await mcp_manager.execute_tool(tool_name, arguments)
        return True, result, None
    except ConnectionError as e:
        return False, None, f"Connection error: {e}"
    except ValueError as e:
        return False, None, f"Tool not found: {e}"
    except Exception as e:
        return False, None, f"Execution error: {e}"
get_all_tools() staticmethod

Get list of all available MCP tools.

Returns:

Type Description
List[MCPToolSummary]

List[MCPToolSummary]: List of tool summaries.

Source code in hatchling/mcp_utils/mcp_server_api.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@staticmethod
def get_all_tools() -> List[MCPToolSummary]:
    """Get list of all available MCP tools.

    Returns:
        List[MCPToolSummary]: List of tool summaries.
    """
    tools = []

    for tool_name, tool_info in mcp_manager.get_all_managed_tools().items():
        # Try to get tool description from the actual tool object
        description = None
        if tool_info.server_path in mcp_manager.mcp_clients:
            client = mcp_manager.mcp_clients[tool_info.server_path]
            tool_obj = client.tools.get(tool_name)
            if tool_obj and hasattr(tool_obj, 'description'):
                description = tool_obj.description

        tools.append(MCPToolSummary(
            name=tool_name,
            server_path=tool_info.server_path,
            status=tool_info.status,
            description=description,
            last_updated=tool_info.last_updated
        ))

    return tools
get_enabled_tools() staticmethod

Get list of enabled MCP tools.

Returns:

Type Description
List[MCPToolSummary]

List[MCPToolSummary]: List of enabled tool summaries.

Source code in hatchling/mcp_utils/mcp_server_api.py
175
176
177
178
179
180
181
182
@staticmethod
def get_enabled_tools() -> List[MCPToolSummary]:
    """Get list of enabled MCP tools.

    Returns:
        List[MCPToolSummary]: List of enabled tool summaries.
    """
    return [tool for tool in MCPServerAPI.get_all_tools() if tool.status == MCPToolStatus.ENABLED]
get_health_summary() staticmethod

Get overall health summary of MCP system.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Health summary including server and tool counts.

Source code in hatchling/mcp_utils/mcp_server_api.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
@staticmethod
def get_health_summary() -> Dict[str, Any]:
    """Get overall health summary of MCP system.

    Returns:
        Dict[str, Any]: Health summary including server and tool counts.
    """
    servers = MCPServerAPI.get_server_list()
    all_tools = MCPServerAPI.get_all_tools()

    return {
        "connected_servers": len(servers),
        "total_tools": len(all_tools),
        "enabled_tools": len([t for t in all_tools if t.status == MCPToolStatus.ENABLED]),
        "disabled_tools": len([t for t in all_tools if t.status == MCPToolStatus.DISABLED]),
        "server_details": [
            {
                "path": server.path,
                "status": server.status.value,
                "tools": server.tool_count,
                "enabled_tools": server.enabled_tool_count
            }
            for server in servers
        ]
    }
get_server_list() staticmethod

Get list of all configured MCP servers with their status.

Returns:

Type Description
List[MCPServerInfo]

List[MCPServerInfo]: List of server information.

Source code in hatchling/mcp_utils/mcp_server_api.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@staticmethod
def get_server_list() -> List[MCPServerInfo]:
    """Get list of all configured MCP servers with their status.

    Returns:
        List[MCPServerInfo]: List of server information.
    """
    servers = []

    # Get connected servers
    for path, client in mcp_manager.mcp_clients.items():
        tool_count = len(client.tools)
        enabled_count = len([
            tool for tool in mcp_manager._managed_tools.values()
            if tool.server_path == path and tool.status == MCPToolStatus.ENABLED
        ])

        servers.append(MCPServerInfo(
            path=path,
            status=MCPServerStatus.CONNECTED,
            tool_count=tool_count,
            enabled_tool_count=enabled_count,
            last_connected=time.time()  # Approximate
        ))

    return servers
get_server_status(server_path) staticmethod

Get detailed status for a specific server.

Parameters:

Name Type Description Default
server_path str

Path to the MCP server script.

required

Returns:

Name Type Description
MCPServerInfo MCPServerInfo

Server status information.

Source code in hatchling/mcp_utils/mcp_server_api.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@staticmethod
def get_server_status(server_path: str) -> MCPServerInfo:
    """Get detailed status for a specific server.

    Args:
        server_path (str): Path to the MCP server script.

    Returns:
        MCPServerInfo: Server status information.
    """
    if server_path in mcp_manager.mcp_clients:
        client = mcp_manager.mcp_clients[server_path]
        tool_count = len(client.tools)
        enabled_count = len([
            tool for tool in mcp_manager._managed_tools.values()
            if tool.server_path == server_path and tool.status == MCPToolStatus.ENABLED
        ])

        return MCPServerInfo(
            path=server_path,
            status=MCPServerStatus.CONNECTED,
            tool_count=tool_count,
            enabled_tool_count=enabled_count,
            last_connected=time.time()
        )
    else:
        return MCPServerInfo(
            path=server_path,
            status=MCPServerStatus.DISCONNECTED,
            tool_count=0,
            enabled_tool_count=0,
            error_message="Server not connected"
        )
get_session_citations() async staticmethod

Get citations for all servers used in the current session.

Returns:

Type Description
Dict[str, Dict[str, str]]

Dict[str, Dict[str, str]]: Server citations.

Source code in hatchling/mcp_utils/mcp_server_api.py
376
377
378
379
380
381
382
383
@staticmethod
async def get_session_citations() -> Dict[str, Dict[str, str]]:
    """Get citations for all servers used in the current session.

    Returns:
        Dict[str, Dict[str, str]]: Server citations.
    """
    return await mcp_manager.get_citations_for_session()
get_tool_info(tool_name) staticmethod

Get detailed information about a specific tool.

Parameters:

Name Type Description Default
tool_name str

Name of the tool.

required

Returns:

Type Description
Optional[MCPToolSummary]

Optional[MCPToolSummary]: Tool information if found.

Source code in hatchling/mcp_utils/mcp_server_api.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
@staticmethod
def get_tool_info(tool_name: str) -> Optional[MCPToolSummary]:
    """Get detailed information about a specific tool.

    Args:
        tool_name (str): Name of the tool.

    Returns:
        Optional[MCPToolSummary]: Tool information if found.
    """
    tool_info = mcp_manager.get_tool_status(tool_name)
    if not tool_info:
        return None

    # Get description from tool object
    description = None
    if tool_info.server_path in mcp_manager.mcp_clients:
        client = mcp_manager.mcp_clients[tool_info.server_path]
        tool_obj = client.tools.get(tool_name)
        if tool_obj and hasattr(tool_obj, 'description'):
            description = tool_obj.description

    return MCPToolSummary(
        name=tool_name,
        server_path=tool_info.server_path,
        status=tool_info.status,
        description=description,
        last_updated=tool_info.last_updated
    )
get_tool_schema(tool_name) staticmethod

Get the JSON schema for a tool's arguments.

Parameters:

Name Type Description Default
tool_name str

Name of the tool.

required

Returns:

Type Description
Optional[Dict[str, Any]]

Optional[Dict[str, Any]]: Tool schema if available.

Source code in hatchling/mcp_utils/mcp_server_api.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
@staticmethod
def get_tool_schema(tool_name: str) -> Optional[Dict[str, Any]]:
    """Get the JSON schema for a tool's arguments.

    Args:
        tool_name (str): Name of the tool.

    Returns:
        Optional[Dict[str, Any]]: Tool schema if available.
    """
    tool_info = mcp_manager.get_tool_status(tool_name)
    if not tool_info or tool_info.server_path not in mcp_manager.mcp_clients:
        return None

    return tool_info.schema
get_tools_by_server(server_path) staticmethod

Get all tools provided by a specific server.

Parameters:

Name Type Description Default
server_path str

Path to the MCP server script.

required

Returns:

Type Description
List[MCPToolSummary]

List[MCPToolSummary]: List of tools from the server.

Source code in hatchling/mcp_utils/mcp_server_api.py
184
185
186
187
188
189
190
191
192
193
194
@staticmethod
def get_tools_by_server(server_path: str) -> List[MCPToolSummary]:
    """Get all tools provided by a specific server.

    Args:
        server_path (str): Path to the MCP server script.

    Returns:
        List[MCPToolSummary]: List of tools from the server.
    """
    return [tool for tool in MCPServerAPI.get_all_tools() if tool.server_path == server_path]
reset_session_tracking() staticmethod

Reset session tracking for citations.

Source code in hatchling/mcp_utils/mcp_server_api.py
385
386
387
388
@staticmethod
def reset_session_tracking() -> None:
    """Reset session tracking for citations."""
    mcp_manager.reset_session_tracking()

MCPToolInfo dataclass

Information about an MCP tool in the lifecycle management system.

Source code in hatchling/mcp_utils/mcp_tool_data.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@dataclass
class MCPToolInfo:
    """Information about an MCP tool in the lifecycle management system."""

    name: str                           # Tool name
    description: str                    # Tool description  
    schema: Dict[str, Any]             # Tool schema/definition
    server_path: str                   # Path to the MCP server providing this tool
    status: MCPToolStatus              # Current status (enabled/disabled)
    reason: MCPToolStatusReason        # Reason for current status
    provider_format: Optional[Dict[str, Any]] = None  # Cached provider-specific format
    last_updated: Optional[float] = None              # Timestamp of last status update

    def __post_init__(self):
        """Set last_updated timestamp if not provided."""
        if self.last_updated is None:
            import time
            self.last_updated = time.time()
Functions
__post_init__()

Set last_updated timestamp if not provided.

Source code in hatchling/mcp_utils/mcp_tool_data.py
41
42
43
44
45
def __post_init__(self):
    """Set last_updated timestamp if not provided."""
    if self.last_updated is None:
        import time
        self.last_updated = time.time()

MCPToolStatus

Bases: Enum

Status of an MCP tool in the lifecycle management system.

Source code in hatchling/mcp_utils/mcp_tool_data.py
 6
 7
 8
 9
10
class MCPToolStatus(Enum):
    """Status of an MCP tool in the lifecycle management system."""

    ENABLED = "enabled"      # Tool is available for use
    DISABLED = "disabled"    # Tool is not available for use

MCPToolStatusReason

Bases: Enum

Reasons why an MCP tool has a particular status.

Source code in hatchling/mcp_utils/mcp_tool_data.py
13
14
15
16
17
18
19
20
21
22
23
24
25
class MCPToolStatusReason(Enum):
    """Reasons why an MCP tool has a particular status."""

    # Enabled reasons
    FROM_SERVER_UP = "server_up"              # Tool enabled because server came online
    FROM_USER_ENABLED = "user_enabled"        # Tool explicitly enabled by user
    FROM_SERVER_REACHABLE = "server_reachable"  # Tool enabled because server became reachable

    # Disabled reasons  
    FROM_SERVER_DOWN = "server_down"          # Tool disabled because server went down
    FROM_SERVER_UNREACHABLE = "unreachable"  # Tool disabled because server is unreachable
    FROM_USER_DISABLED = "user_disabled"     # Tool explicitly disabled by user
    FROM_SYSTEM_ERROR = "system_error"       # Tool disabled due to system error

Core MCP Components

hatchling.mcp_utils.client

Classes

MCPClient

Client for MCP servers that manages connections and tool execution.

Source code in hatchling/mcp_utils/client.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
class MCPClient:
    """Client for MCP servers that manages connections and tool execution."""
    def __init__(self, settings: AppSettings = None, python_executable_resolver: Optional[Callable[[], str]] = None):
        """Initialize the MCP client.

        Args:
            settings (AppSettings, optional): The application settings instance.
                                            If None, uses the singleton instance.
            python_executable_resolver (Callable[[], str], optional): Function to resolve the Python executable
                for the current environment. If None, defaults to "python".
        """
        self.client_id = str(uuid.uuid4())
        self.session: Optional[ClientSession] = None
        self.exit_stack = None  # Created in connection manager task
        self.connected = False
        self.tools = {}
        self.server_path = None
        self.read = None
        self.write = None

        # Store settings
        self.settings = settings or AppSettings.get_instance()

        # Python executable resolution
        self._python_executable_resolver = python_executable_resolver

        # Connection manager task and queue
        self._operation_queue = asyncio.Queue()
        self._manager_task = None
        self._manager_lock = asyncio.Lock()
        self._connection_task_id = None  # Track the ID of the task that manages connections

        # Add connection monitoring
        self._heartbeat_task = None
        self._reconnection_attempts = 0
        self.MAX_RECONNECTION_ATTEMPTS = 3
        self.RECONNECTION_DELAY = 2  # seconds

        # Get a debug log session from the LoggingManager
        self.logger = logging_manager.get_session(self.__class__.__name__,
                                  formatter=logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    async def connect(self, server_path: str) -> bool:
        """Connect to an MCP server via stdio.

        Args:
            server_path (str): Path to the server script (.py file).

        Returns:
            bool: True if connection was successful, False otherwise.
        """
        # Start the connection manager if not running
        try:
            await self._start_connection_manager()

            # Create a future to get the result
            future = asyncio.Future()
            await self._operation_queue.put(("connect", [server_path], future))

            # Wait for the operation to complete
            return await asyncio.wait_for(future, timeout=30000)
        except asyncio.TimeoutError:
            self.logger.error(f"Connection attempt to {server_path} timed out after 30000 seconds")
            return False
        except Exception as e:
            self.logger.error(f"Error in connect operation: {e}")
            return False

    def _start_heartbeat(self):
        """Start a background task to periodically check connection health."""
        if self._heartbeat_task is None:
            self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
    async def _heartbeat_loop(self):
        """Periodically check if the connection is still alive."""
        try:
            while self.connected:
                await asyncio.sleep(30)  # Check every 30 seconds
                if not self.connected:
                    break

                try:
                    # Try a lightweight operation to check connection
                    if self.session:
                        await self.session.send_ping()
                        self.logger.debug("Connection heartbeat: OK")
                except Exception as e:
                    self.logger.warning(f"Connection heartbeat failed: {e}")
                    # Mark as disconnected without trying to reconnect
                    # This avoids task/context issues
                    self.logger.warning("Connection marked as failed - client needs to be reconnected")
                    self.connected = False
                    break

        except asyncio.CancelledError:
            self.logger.debug("Heartbeat task cancelled")
        except Exception as e:
            self.logger.error(f"Error in heartbeat task: {e}")
        finally:
            self.logger.debug("Heartbeat task stopped")
            self._heartbeat_task = None # _cleanup_connection has been replaced by _internal_cleanup in the connection manager

    async def disconnect(self):
        """Disconnect from the MCP server and clean up resources."""
        if not self.connected:
            return

        if not self._manager_task or self._manager_task.done():
            # If there's no manager task running, mark as disconnected directly
            self.connected = False
            self.session = None
            self.exit_stack = None
            self.tools = {}
            return

        try:
            # Create a future to get the result
            future = asyncio.Future()
            await self._operation_queue.put(("disconnect", [], future))

            # Wait for the operation to complete with timeout
            await asyncio.wait_for(future, timeout=10)
        except asyncio.TimeoutError:
            self.logger.error("Disconnect operation timed out after 10 seconds")
            # Even if disconnect fails, mark as disconnected
            self.connected = False
        except Exception as e:
            self.logger.error(f"Error in disconnect operation: {e}")
            # Even if disconnect fails, mark as disconnected
            self.connected = False

    async def get_citations(self) -> Dict[str, str]:
        """Get citations from the MCP server.

        Returns:
            Dict[str, str]: Dictionary with origin and MCP citations.
        """
        if not self.connected:
            raise ConnectionError("Not connected to MCP server")

        # Create a future to get the result
        future = asyncio.Future()
        await self._operation_queue.put(("get_citations", [], future))

        # Wait for the operation to complete
        try:
            return await future
        except Exception as e:
            self.logger.error(f"Error in get_citations operation: {e}")
            raise

    async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Execute an MCP tool by name with given arguments.

        Args:
            tool_name (str): Name of the tool to execute.
            arguments (Dict[str, Any): Arguments to pass to the tool.

        Returns:
            Any: Result of the tool execution.

        Raises:
            ConnectionError: If not connected to MCP server.
            ValueError: If the tool is not found.
            TimeoutError: If the tool execution times out.
            Exception: For any other errors during execution.
        """
        if not self.connected or not self.session:
            raise ConnectionError("Not connected to MCP server")

        if tool_name not in self.tools:
            raise ValueError(f"Tool '{tool_name}' not found")

        try:            
            # Execute the tool through the connection manager task
            future = asyncio.Future()
            await self._operation_queue.put(("execute_tool", [tool_name, arguments], future))

            # Wait for the result
            return await future

        except Exception as e:
            self.logger.error(f"Error executing tool {tool_name}: {str(e)}")
            raise

    async def _start_connection_manager(self):
        """Start the dedicated connection manager task if not already running."""
        async with self._manager_lock:
            # If there's a task but it's done, clean it up first
            if self._manager_task and self._manager_task.done():
                # Check if the task had an exception
                try:
                    exc = self._manager_task.exception()
                    if exc:
                        self.logger.error(f"Previous connection manager failed with: {exc}")
                except (asyncio.InvalidStateError, asyncio.CancelledError):
                    # Task was cancelled or is in an invalid state
                    pass

                self._manager_task = None

            # Create a new task if needed
            if self._manager_task is None:
                self.logger.debug("Starting connection manager task")
                self._manager_task = asyncio.create_task(self._connection_manager_loop())
                # Name the task for better debugging
                self._manager_task.set_name(f"mcp_connection_manager_{self.client_id[:8]}")

    async def _connection_manager_loop(self):
        """A dedicated task that handles all connection and disconnection operations.

        This ensures all operations with exit stacks and cancel scopes happen in the same task context.
        """
        # Store the task ID to track task context
        self._connection_task_id = id(asyncio.current_task())
        self.logger.debug(f"Connection manager task started with ID: {self._connection_task_id}")

        try:
            while True:
                # Get the next operation from the queue
                operation, args, future = await self._operation_queue.get()

                try:
                    self.logger.debug(f"Processing operation: {operation}")
                    if operation == "connect":
                        server_path = args[0]
                        result = await self._internal_connect(server_path)
                        future.set_result(result)
                    elif operation == "disconnect":
                        await self._internal_disconnect()
                        future.set_result(None)
                    elif operation == "execute_tool":
                        tool_name, arguments = args
                        result = await self._internal_execute_tool(tool_name, arguments)
                        future.set_result(result)
                    elif operation == "get_citations":
                        result = await self._internal_get_citations()
                        future.set_result(result)
                    else:
                        self.logger.warning(f"Unknown operation: {operation}")
                        future.set_exception(ValueError(f"Unknown operation: {operation}"))
                except Exception as e:
                    self.logger.error(f"Error processing operation {operation}: {e}")
                    future.set_exception(e)
                finally:
                    self._operation_queue.task_done()
        except asyncio.CancelledError:
            self.logger.debug(f"Connection manager task {self._connection_task_id} cancelled")
        except Exception as e:
            self.logger.error(f"Error in connection manager loop: {e}")
        finally:
            self.logger.debug(f"Connection manager task {self._connection_task_id} stopped")
            self._connection_task_id = None

    async def _internal_connect(self, server_path: str) -> bool:
        """Internal connect method that runs in the connection manager task.

        Args:
            server_path (str): Path to the server script.

        Returns:
            bool: True if connection was successful.
        """
        # Clean up any existing connection first
        await self._internal_cleanup()

        self.server_path = server_path
        # Create a fresh exit stack for this connection
        self.exit_stack = AsyncExitStack()

        # Create server parameters for stdio connection
        python_executable = self._get_python_executable()
        #env_vars = self._get_environment_vars()

        self.logger.debug(f"Using Python executable: {python_executable}")

        server_params = StdioServerParameters(
            command=python_executable,
            args=[server_path],
            #env=env_vars,
        )

        self.logger.debug(f"Connecting to MCP server: {server_path} using Python: {python_executable}")
        try:
            # Follow the exact sequence from the working example
            stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
            self.read, self.write = stdio_transport
            session = await self.exit_stack.enter_async_context(ClientSession(self.read, self.write))
            await session.initialize()
            self.session = session
            self.connected = True

            # List available tools
            response = await self.session.list_tools()
            tools = response.tools

            # Store tools in a dictionary for easy access
            for tool in tools:
                self.tools[tool.name] = tool

            self.logger.info(f"Connected to MCP server: {server_path}")
            self.logger.info(f"Discovered {len(self.tools)} tools: {', '.join(self.tools.keys())}")

            # List available citations
            citations = await self._internal_get_citations()
            self.logger.info("Tool Origin Citation: " + citations["origin"])
            self.logger.info("MCP Implementation Citation: " + citations["mcp"])

            # Start heartbeat task to monitor connection
            self._start_heartbeat()

            return True

        except asyncio.CancelledError:
            self.logger.warning("Connection attempt cancelled")
            await self._internal_cleanup()
            raise
        except Exception as e:
            self.logger.error(f"Failed to connect to MCP server at {server_path}: {str(e)}")
            await self._internal_cleanup()
            return False

    async def _internal_disconnect(self):
        """Internal disconnect method that runs in the connection manager task."""
        if not self.connected:
            return

        self.logger.debug(f"Disconnecting from MCP server: {self.server_path} in task {self._connection_task_id}")

        # Cancel heartbeat task first
        if self._heartbeat_task:
            self._heartbeat_task.cancel()
            try:
                # Wait for task to fully cancel (but don't wait indefinitely)
                await asyncio.wait_for(asyncio.shield(self._heartbeat_task), timeout=2)
            except (asyncio.TimeoutError, asyncio.CancelledError):
                # It's okay if the task times out or is cancelled
                pass
            finally:
                self._heartbeat_task = None

        # Clean up resources
        try:
            await self._internal_cleanup()
        except Exception as e:
            self.logger.error(f"Error during disconnection cleanup: {e}")
            # Even if cleanup fails, continue with state reset

        # Reset additional state
        self.tools = {}
        self._reconnection_attempts = 0

        self.logger.info(f"Disconnected from MCP server: {self.server_path}")

    async def _internal_cleanup(self):
        """Internal cleanup method that runs in the connection manager task."""
        self.logger.debug(f"Starting connection cleanup in task {self._connection_task_id}")

        try:
            if self.exit_stack:
                self.logger.debug("Closing exit stack from the same task that created it")
                await self.exit_stack.aclose()
        except asyncio.CancelledError:
            self.logger.warning("Cleanup interrupted by cancellation")
            raise  # Re-raise to allow proper handling
        except Exception as e:
            self.logger.error(f"Error during connection cleanup: {e}")
        finally:
            # Always reset these regardless of success/failure
            self.exit_stack = None
            self.session = None
            self.read = None
            self.write = None
            self.connected = False
            self.logger.debug("Connection state reset completed")

    async def _internal_execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Internal execute tool method that runs in the connection manager task."""
        if not self.connected or not self.session:
            raise ConnectionError("Not connected to MCP server")

        if tool_name not in self.tools:
            raise ValueError(f"Tool '{tool_name}' not found")

        try:
            # Execute the tool with timeout
            result = await asyncio.wait_for(
                self.session.call_tool(name=tool_name, arguments=arguments),
                timeout=self.settings.tool_calling.max_tool_working_time
            )

            # Extract the result value from the response object if needed
            if hasattr(result, 'result'):
                return result.result
            else:
                return result

        except asyncio.TimeoutError:
            self.logger.error(f"Tool execution timed out: {tool_name}")
            raise TimeoutError(f"Execution of tool {tool_name} timed out after {self.settings.tool_calling.max_tool_working_time} seconds")

        except Exception as e:
            self.logger.error(f"Error executing tool {tool_name}: {str(e)}")
            raise

    async def _internal_get_citations(self) -> Dict[str, str]:
        """Internal get citations method that runs in the connection manager task."""
        if not self.connected or not self.session:
            raise ConnectionError("Not connected to MCP server")

        citations = {
            "server_name": "None",
            "origin": "Citation not available",
            "mcp": "Citation not available"
        }

        try:
            # Extract server name from the path server uri
            try:
                server_name_uri = f"name://{self.server_path[1:]}"
                server_name_response = await self.session.read_resource(uri=server_name_uri)
                if server_name_response and server_name_response.contents:
                    citations["server_name"] = server_name_response.contents[0].text
                    self.logger.debug(f"Retrieved server name from {server_name_uri}: {citations['server_name']}")
            except Exception as e:
                self.logger.error(f"Failed to get server name: {e}")

            # Try to read origin citation
            try:
                origin_uri = f"citation://origin/{citations['server_name']}"
                origin_response = await self.session.read_resource(uri=origin_uri)
                if origin_response and origin_response.contents:
                    citations["origin"] = origin_response.contents[0].text
                    self.logger.debug(f"Retrieved origin citation from {origin_uri}")
            except Exception as e:
                self.logger.error(f"Failed to get origin citation: {e}")

            # Try to read MCP citation
            try:
                mcp_uri = f"citation://mcp/{citations['server_name']}"
                mcp_response = await self.session.read_resource(uri=mcp_uri)
                if mcp_response and mcp_response.contents:
                    citations["mcp"] = mcp_response.contents[0].text
                    self.logger.debug(f"Retrieved MCP citation from {mcp_uri}")
            except Exception as e:
                self.logger.error(f"Failed to get MCP citation: {e}")

        except Exception as e:
            self.logger.error(f"Error retrieving citations: {e}")

        return citations

    async def _stop_connection_manager(self):
        """Stop the connection manager task gracefully."""
        if self._manager_task and not self._manager_task.done():
            self.logger.debug("Stopping connection manager task")

            # Cancel the task
            self._manager_task.cancel()

            try:
                # Wait for the task to finish with timeout
                await asyncio.wait_for(self._manager_task, timeout=5)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self.logger.warning("Connection manager task cancellation timed out or was cancelled")
            except Exception as e:
                self.logger.error(f"Error stopping connection manager task: {e}")

            self._manager_task = None

    def _get_python_executable(self) -> str:
        """Get the appropriate Python executable for the current environment.

        Returns:
            str: Path to Python executable, falls back to "python" if no resolver provided.
        """
        if self._python_executable_resolver:
            try:
                python_executable = self._python_executable_resolver()
                self.logger.debug(f"Resolved Python executable: {python_executable}")
                return python_executable
            except Exception as e:
                self.logger.warning(f"Failed to resolve Python executable: {e}, falling back to 'python'")

        # Fallback to default Python command
        self.logger.debug("Using default Python executable: python")
        return "python"

    def _get_environment_vars(self) -> Optional[Dict[str, str]]:
        """Get environment variables for the MCP server process.

        Returns:
            Dict[str, str]: Environment variables to use, or None to use default environment.
        """
        # For now, use the current environment
        # Future enhancement: could get environment-specific variables from resolver
        return os.environ.copy()
Functions
__init__(settings=None, python_executable_resolver=None)

Initialize the MCP client.

Parameters:

Name Type Description Default
settings AppSettings

The application settings instance. If None, uses the singleton instance.

None
python_executable_resolver Callable[[], str]

Function to resolve the Python executable for the current environment. If None, defaults to "python".

None
Source code in hatchling/mcp_utils/client.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(self, settings: AppSettings = None, python_executable_resolver: Optional[Callable[[], str]] = None):
    """Initialize the MCP client.

    Args:
        settings (AppSettings, optional): The application settings instance.
                                        If None, uses the singleton instance.
        python_executable_resolver (Callable[[], str], optional): Function to resolve the Python executable
            for the current environment. If None, defaults to "python".
    """
    self.client_id = str(uuid.uuid4())
    self.session: Optional[ClientSession] = None
    self.exit_stack = None  # Created in connection manager task
    self.connected = False
    self.tools = {}
    self.server_path = None
    self.read = None
    self.write = None

    # Store settings
    self.settings = settings or AppSettings.get_instance()

    # Python executable resolution
    self._python_executable_resolver = python_executable_resolver

    # Connection manager task and queue
    self._operation_queue = asyncio.Queue()
    self._manager_task = None
    self._manager_lock = asyncio.Lock()
    self._connection_task_id = None  # Track the ID of the task that manages connections

    # Add connection monitoring
    self._heartbeat_task = None
    self._reconnection_attempts = 0
    self.MAX_RECONNECTION_ATTEMPTS = 3
    self.RECONNECTION_DELAY = 2  # seconds

    # Get a debug log session from the LoggingManager
    self.logger = logging_manager.get_session(self.__class__.__name__,
                              formatter=logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
connect(server_path) async

Connect to an MCP server via stdio.

Parameters:

Name Type Description Default
server_path str

Path to the server script (.py file).

required

Returns:

Name Type Description
bool bool

True if connection was successful, False otherwise.

Source code in hatchling/mcp_utils/client.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def connect(self, server_path: str) -> bool:
    """Connect to an MCP server via stdio.

    Args:
        server_path (str): Path to the server script (.py file).

    Returns:
        bool: True if connection was successful, False otherwise.
    """
    # Start the connection manager if not running
    try:
        await self._start_connection_manager()

        # Create a future to get the result
        future = asyncio.Future()
        await self._operation_queue.put(("connect", [server_path], future))

        # Wait for the operation to complete
        return await asyncio.wait_for(future, timeout=30000)
    except asyncio.TimeoutError:
        self.logger.error(f"Connection attempt to {server_path} timed out after 30000 seconds")
        return False
    except Exception as e:
        self.logger.error(f"Error in connect operation: {e}")
        return False
disconnect() async

Disconnect from the MCP server and clean up resources.

Source code in hatchling/mcp_utils/client.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def disconnect(self):
    """Disconnect from the MCP server and clean up resources."""
    if not self.connected:
        return

    if not self._manager_task or self._manager_task.done():
        # If there's no manager task running, mark as disconnected directly
        self.connected = False
        self.session = None
        self.exit_stack = None
        self.tools = {}
        return

    try:
        # Create a future to get the result
        future = asyncio.Future()
        await self._operation_queue.put(("disconnect", [], future))

        # Wait for the operation to complete with timeout
        await asyncio.wait_for(future, timeout=10)
    except asyncio.TimeoutError:
        self.logger.error("Disconnect operation timed out after 10 seconds")
        # Even if disconnect fails, mark as disconnected
        self.connected = False
    except Exception as e:
        self.logger.error(f"Error in disconnect operation: {e}")
        # Even if disconnect fails, mark as disconnected
        self.connected = False
execute_tool(tool_name, arguments) async

Execute an MCP tool by name with given arguments.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to execute.

required
arguments Dict[str, Any

Arguments to pass to the tool.

required

Returns:

Name Type Description
Any Any

Result of the tool execution.

Raises:

Type Description
ConnectionError

If not connected to MCP server.

ValueError

If the tool is not found.

TimeoutError

If the tool execution times out.

Exception

For any other errors during execution.

Source code in hatchling/mcp_utils/client.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
    """Execute an MCP tool by name with given arguments.

    Args:
        tool_name (str): Name of the tool to execute.
        arguments (Dict[str, Any): Arguments to pass to the tool.

    Returns:
        Any: Result of the tool execution.

    Raises:
        ConnectionError: If not connected to MCP server.
        ValueError: If the tool is not found.
        TimeoutError: If the tool execution times out.
        Exception: For any other errors during execution.
    """
    if not self.connected or not self.session:
        raise ConnectionError("Not connected to MCP server")

    if tool_name not in self.tools:
        raise ValueError(f"Tool '{tool_name}' not found")

    try:            
        # Execute the tool through the connection manager task
        future = asyncio.Future()
        await self._operation_queue.put(("execute_tool", [tool_name, arguments], future))

        # Wait for the result
        return await future

    except Exception as e:
        self.logger.error(f"Error executing tool {tool_name}: {str(e)}")
        raise
get_citations() async

Get citations from the MCP server.

Returns:

Type Description
Dict[str, str]

Dict[str, str]: Dictionary with origin and MCP citations.

Source code in hatchling/mcp_utils/client.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
async def get_citations(self) -> Dict[str, str]:
    """Get citations from the MCP server.

    Returns:
        Dict[str, str]: Dictionary with origin and MCP citations.
    """
    if not self.connected:
        raise ConnectionError("Not connected to MCP server")

    # Create a future to get the result
    future = asyncio.Future()
    await self._operation_queue.put(("get_citations", [], future))

    # Wait for the operation to complete
    try:
        return await future
    except Exception as e:
        self.logger.error(f"Error in get_citations operation: {e}")
        raise

hatchling.mcp_utils.manager

Classes

MCPManager

Centralized manager for everything MCP-related: servers, clients, and adapters.

Source code in hatchling/mcp_utils/manager.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
class MCPManager:
    """Centralized manager for everything MCP-related: servers, clients, and adapters."""

    _instance = None

    def __new__(cls):
        """Ensure singleton pattern implementation.

        Returns:
            MCPManager: The singleton instance of the MCPManager.
        """
        if cls._instance is None:
            cls._instance = super(MCPManager, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, settings: Optional[AppSettings] = None):
        """Initialize the MCP manager if not already initialized."""
        if self._initialized:
            return

        # Initialize only once
        self._initialized = True

        # Connection tracking
        self.mcp_clients: Dict[str, MCPClient] = {}
        self.server_processes: Dict[str, subprocess.Popen] = {}
        self._connection_lock = asyncio.Lock()

        # Hatchling settings registry
        self.settings = settings or AppSettings.get_instance()

        # Tool tracking
        self._tool_client_map: Dict[str, MCPClient] = {}  # Map of tool names to clients that provide them

        # Hatch server usage
        self._used_servers_in_session = set()

        # Environment context for Python executable
        self.hatch_env_manager = HatchEnvironmentManager(
            environments_dir=self.settings.paths.envs_dir,
            cache_ttl=86400,  # 1 day default
        )

        # Event publishing capabilities
        self._event_publisher = EventPublisher()

        # Tool management for lifecycle events
        self._managed_tools: Dict[str, MCPToolInfo] = {}  # Tool name -> MCPToolInfo

        # Get a debug log session
        self.logger = logging_manager.get_session(self.__class__.__name__,
                                  formatter=logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

    @property
    def publisher(self) -> EventPublisher:
        """Access to the EventPublisher for MCP lifecycle events.

        Returns:
            EventPublisher: The publisher for MCP-related events.
        """
        return self._event_publisher

    @property
    def is_connected(self) -> bool:
        """Check if the manager has any active connections to MCP servers.

        Returns:
            bool: True if connected to at least one MCP server, False otherwise.
        """
        return bool(self.mcp_clients)

    def _publish_server_event(self, event_type: EventType, server_path: str, **additional_data) -> None:
        """Publish a server lifecycle event.

        Args:
            event_type (EventType): Type of server event to publish.
            server_path (str): Path to the server that triggered the event.
            **additional_data: Additional data to include in the event.
        """
        event_data = {
            "server_path": server_path,
            **additional_data
        }
        self._event_publisher.publish(event_type, event_data)
        self.logger.debug(f"Published {event_type.value} event for server: {server_path}")

    def _publish_tool_event(self, event_type: EventType, tool_name: str, 
                           tool_info: MCPToolInfo, **additional_data) -> None:
        """Publish a tool lifecycle event.

        Args:
            event_type (EventType): Type of tool event to publish.
            tool_name (str): Name of the tool that triggered the event.
            tool_info (MCPToolInfo): MCP Tool information.
            **additional_data: Additional data to include in the event.
        """
        event_data = {
            "tool_name": tool_name,
            "tool_info": tool_info,
            **additional_data
        }

        self._event_publisher.publish(event_type, event_data)
        self.logger.debug(f"Published {event_type.value} event for tool: {tool_name}")

    def validate_server_paths(self, server_paths: List[str]) -> List[str]:
        """Validate server paths and return the list of valid absolute paths.

        Args:
            server_paths (List[str]): List of server paths to validate.

        Returns:
            List[str]: List of valid absolute paths.
        """
        valid_paths = []
        for path in server_paths:
            # Convert to absolute path if relative
            abs_path = os.path.abspath(path)

            # Check if file exists
            if not os.path.isfile(abs_path):
                self.logger.error(f"MCP server script not found: {abs_path}")
                continue

            valid_paths.append(abs_path)

        return valid_paths

    async def connect_to_servers(self, server_paths: Optional[List[str]] = None) -> bool:
        """Connect to all configured MCP servers.

        Args:
            server_paths (Optional[List[str]]): List of paths to MCP server scripts.
            If None, connects to all configured servers.

        Returns:
            bool: False if not valid server paths are provided
        """

        if server_paths is None:
            all_server_paths = self.hatch_env_manager.get_servers_entry_points()
            # filter out server paths that are already connected
            server_paths = []
            for path in all_server_paths:
                already_connected = False
                for client in self.mcp_clients.values():
                    if client.server_path == path:
                        self.logger.debug(f"Skipping already connected server: {path}")
                        already_connected = True
                        break
                if not already_connected:
                    server_paths.append(path)

        if not server_paths:
            self.logger.info("No new MCP servers to connect to. All configured servers are already connected.")
            return True

        async with self._connection_lock:
            # Validate server paths
            valid_paths = self.validate_server_paths(server_paths)
            if not valid_paths:
                self.logger.error(". Check the paths provided.")
                return False

            # Connect to each valid server path
            for path in valid_paths:
                # Create client with environment resolver
                client = MCPClient(settings=self.settings, python_executable_resolver=self._get_python_executable)
                is_connected = await client.connect(path)
                if is_connected:
                    self.mcp_clients[path] = client

                    # Publish server up event
                    self._publish_server_event(EventType.MCP_SERVER_UP, path, 
                                             tool_count=len(client.tools))

                    # Cache tool mappings and create MCPToolInfo for each tool
                    for tool_name, tool_obj in client.tools.items():
                        self._tool_client_map[tool_name] = client

                        # Create MCPToolInfo for lifecycle management
                        tool_info = MCPToolInfo(
                            name=tool_name,
                            description=getattr(tool_obj, 'description', f'No description available'),
                            schema=getattr(tool_obj, 'No argument schema available', {}),
                            server_path=path,
                            status=MCPToolStatus.ENABLED,
                            reason=MCPToolStatusReason.FROM_SERVER_UP
                        )
                        self._managed_tools[tool_name] = tool_info

                        # Publish tool enabled event
                        self._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)
                else:
                    # Publish server unreachable event
                    self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                             error="Failed to connect")

            if len(self.mcp_clients) > 0:
                # Log the available tools across all clients
                total_tools = sum(len(client.tools) for client in self.mcp_clients.values())
                self.logger.info(f"Connected to {len(self.mcp_clients)} MCP servers with {total_tools} total tools")
            else:
                self.logger.warning("Failed to connect to any MCP server")

            return True

    async def disconnect_all(self) -> None:
        """Disconnect from all MCP servers."""

        async with self._connection_lock:
            # Store the current task for debugging
            current_task_id = id(asyncio.current_task())
            self.logger.debug(f"Disconnecting all clients from task: {current_task_id}")

            disconnection_errors = False

            # First try the graceful disconnect approach
            for path, client in list(self.mcp_clients.items()):
                try:
                    # Disable all tools from this server before disconnecting
                    for tool_name, tool_info in self._managed_tools.items():
                        if tool_info.server_path == path and tool_info.status == MCPToolStatus.ENABLED:
                            tool_info.status = MCPToolStatus.DISABLED
                            tool_info.reason = MCPToolStatusReason.FROM_SERVER_DOWN

                            # Publish tool disabled event
                            self._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

                    # Log task context for debugging
                    if hasattr(client, '_connection_task_id') and client._connection_task_id:
                        self.logger.debug(f"Client for {path} was created in task: {client._connection_task_id}")

                    # Try graceful disconnect first with timeout
                    disconnect_task = asyncio.create_task(client.disconnect())
                    try:
                        await asyncio.wait_for(disconnect_task, timeout=10)
                        # Publish server down event on successful disconnect
                        self._publish_server_event(EventType.MCP_SERVER_DOWN, path)
                    except asyncio.TimeoutError:
                        self.logger.warning(f"Disconnect timeout for {path}")
                        disconnection_errors = True
                        # Publish server unreachable event
                        self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                                 error="Disconnect timeout")
                    except Exception as e:
                        self.logger.error(f"Error during graceful disconnect for {path}: {e}")
                        disconnection_errors = True
                        # Publish server unreachable event
                        self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                                 error=str(e))
                except Exception as e:
                    self.logger.error(f"Error setting up disconnect for {path}: {e}")
                    disconnection_errors = True

            # If any disconnections failed with errors, use forceful termination
            if disconnection_errors:
                self.logger.warning("Some disconnections failed. Using forceful termination as fallback.")
                self._terminate_server_processes()

            # Clear all client tracking regardless of disconnection success
            self.mcp_clients = {}
            self._tool_client_map = {}

            # Clear managed tools
            self._managed_tools = {}

            self.logger.info("Disconnected from all MCP servers")

    def _terminate_server_processes(self) -> None:
        """Terminate all server processes directly.
        This is a fallback mechanism when graceful disconnection fails.
        """
        terminated_count = 0

        # Kill all server processes
        for path, process in list(self.server_processes.items()):
            if process.poll() is None:  # Process is still running
                try:
                    # Send SIGTERM first for cleaner shutdown
                    process.terminate()

                    # Give it a moment to terminate
                    try:
                        process.wait(timeout=2)
                    except subprocess.TimeoutExpired:
                        # If it doesn't terminate in time, force kill
                        process.kill()
                        self.logger.warning(f"Force killed MCP server process: {path}")

                    terminated_count += 1
                    self.logger.debug(f"Terminated MCP server process: {path}")
                except Exception as e:
                    self.logger.error(f"Failed to terminate process for {path}: {e}")

            # Remove from tracking regardless of kill success
            del self.server_processes[path]

        if terminated_count > 0:
            self.logger.info(f"Forcefully terminated {terminated_count} server processes")

    def get_enabled_tools(self) -> Dict[str, MCPToolInfo]:
        """Get all enabled tools with their information.

        Returns:
            Dict[str, MCPToolInfo]: Dictionary mapping tool names to enabled MCPToolInfo objects.
        """
        return {
            name: info for name, info in self._managed_tools.items()
            if info.status == MCPToolStatus.ENABLED
        }

    def get_all_managed_tools(self) -> Dict[str, MCPToolInfo]:
        """Get all managed tools (both enabled and disabled).

        Returns:
            Dict[str, MCPToolInfo]: Dictionary mapping tool names to all MCPToolInfo objects.
        """
        return self._managed_tools.copy()

    def enable_tool(self, tool_name: str) -> bool:
        """Enable a specific tool if it exists and is disabled.

        Args:
            tool_name (str): Name of the tool to enable.

        Returns:
            bool: True if the tool was enabled, False if it was already enabled or doesn't exist.
        """
        if tool_name not in self._managed_tools:
            self.logger.warning(f"Tool '{tool_name}' not found in managed tools")
            return False

        tool_info = self._managed_tools[tool_name]

        if tool_info.status == MCPToolStatus.ENABLED:
            self.logger.debug(f"Tool '{tool_name}' is already enabled")
            return False

        # Check if the server is still available
        if tool_info.server_path not in self.mcp_clients:
            self.logger.warning(f"Cannot enable tool '{tool_name}' - server is not connected")
            return False

        # Enable the tool
        tool_info.status = MCPToolStatus.ENABLED
        tool_info.reason = MCPToolStatusReason.FROM_USER_ENABLED

        # Update timestamp
        import time
        tool_info.last_updated = time.time()

        # Publish event
        self._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)

        self.logger.info(f"Enabled tool: {tool_name}")
        return True

    def disable_tool(self, tool_name: str) -> bool:
        """Disable a specific tool if it exists and is enabled.

        Args:
            tool_name (str): Name of the tool to disable.

        Returns:
            bool: True if the tool was disabled, False if it was already disabled or doesn't exist.
        """
        if tool_name not in self._managed_tools:
            self.logger.warning(f"Tool '{tool_name}' not found in managed tools")
            return False

        tool_info = self._managed_tools[tool_name]

        if tool_info.status == MCPToolStatus.DISABLED:
            self.logger.debug(f"Tool '{tool_name}' is already disabled")
            return False

        # Disable the tool
        tool_info.status = MCPToolStatus.DISABLED
        tool_info.reason = MCPToolStatusReason.FROM_USER_DISABLED

        # Update timestamp
        import time
        tool_info.last_updated = time.time()

        # Publish event
        self._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

        self.logger.info(f"Disabled tool: {tool_name}")
        return True

    def get_tool_status(self, tool_name: str) -> Optional[MCPToolInfo]:
        """Get the current status and information for a tool.

        Args:
            tool_name (str): Name of the tool to get status for.

        Returns:
            Optional[MCPToolInfo]: Tool information if found, None otherwise.
        """
        return self._managed_tools.get(tool_name)

    async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Execute a tool by name with the given arguments.

        Args:
            tool_name (str): Name of the tool to execute.
            arguments (Dict[str, Any]): Arguments to pass to the tool.

        Returns:
            Any: Result of the tool execution.

        Raises:
            ConnectionError: If not connected to any MCP server.
            ValueError: If the tool is not found in any connected MCP server.
        """
        if not self.mcp_clients:
            raise ConnectionError("Not connected to any MCP server")

        if tool_name not in self._tool_client_map:
            raise ValueError(f"Tool '{tool_name}' not found in any connected MCP server")

        client = self._tool_client_map[tool_name]
        self._used_servers_in_session.add(client.server_path)

        try:
            return await client.execute_tool(tool_name, arguments)

        except ConnectionError:
            # mark the server as unreachable
            self.logger.error(f"Failed to execute tool '{tool_name}' - server {client.server_path} is unreachable")
            self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, client.server_path,
                                      error="Failed to execute tool")

    async def get_citations_for_session(self) -> Dict[str, Dict[str, str]]:
        """Get citations for all servers used in the current session.

        Returns:
            Dict[str, Dict[str, str]]: Dictionary of citations for each server.
        """
        citations = {}

        for path in self._used_servers_in_session:
            if path in self.mcp_clients:
                client = self.mcp_clients[path]
                try:
                    server_citations = await client.get_citations()
                    citations[path] = server_citations
                except Exception as e:
                    self.logger.error(f"Error getting citations for {path}: {e}")

        return citations

    def reset_session_tracking(self):
        """Reset the tracking of which servers were used."""
        self._used_servers_in_session.clear()

    def _get_python_executable(self) -> str:
        """Get the appropriate Python executable for the current environment.

        Returns:
            str: Path to Python executable, falls back to system Python.
        """
        if self.hatch_env_manager:
            current_env = self.hatch_env_manager.get_current_environment()
            if current_env:
                python_env_info = self.hatch_env_manager.get_python_environment_info(current_env)
                if python_env_info:
                    python_executable = python_env_info.get("python_executable")
                    if python_executable:
                        self.logger.debug(f"Using environment Python for {current_env}: {python_executable}")
                        return python_executable

        # Fallback to system Python
        system_python = sys.executable
        self.logger.debug(f"Using system Python: {system_python}")
        return system_python
Attributes
is_connected property

Check if the manager has any active connections to MCP servers.

Returns:

Name Type Description
bool bool

True if connected to at least one MCP server, False otherwise.

publisher property

Access to the EventPublisher for MCP lifecycle events.

Returns:

Name Type Description
EventPublisher EventPublisher

The publisher for MCP-related events.

Functions
__init__(settings=None)

Initialize the MCP manager if not already initialized.

Source code in hatchling/mcp_utils/manager.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(self, settings: Optional[AppSettings] = None):
    """Initialize the MCP manager if not already initialized."""
    if self._initialized:
        return

    # Initialize only once
    self._initialized = True

    # Connection tracking
    self.mcp_clients: Dict[str, MCPClient] = {}
    self.server_processes: Dict[str, subprocess.Popen] = {}
    self._connection_lock = asyncio.Lock()

    # Hatchling settings registry
    self.settings = settings or AppSettings.get_instance()

    # Tool tracking
    self._tool_client_map: Dict[str, MCPClient] = {}  # Map of tool names to clients that provide them

    # Hatch server usage
    self._used_servers_in_session = set()

    # Environment context for Python executable
    self.hatch_env_manager = HatchEnvironmentManager(
        environments_dir=self.settings.paths.envs_dir,
        cache_ttl=86400,  # 1 day default
    )

    # Event publishing capabilities
    self._event_publisher = EventPublisher()

    # Tool management for lifecycle events
    self._managed_tools: Dict[str, MCPToolInfo] = {}  # Tool name -> MCPToolInfo

    # Get a debug log session
    self.logger = logging_manager.get_session(self.__class__.__name__,
                              formatter=logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
__new__()

Ensure singleton pattern implementation.

Returns:

Name Type Description
MCPManager

The singleton instance of the MCPManager.

Source code in hatchling/mcp_utils/manager.py
26
27
28
29
30
31
32
33
34
35
def __new__(cls):
    """Ensure singleton pattern implementation.

    Returns:
        MCPManager: The singleton instance of the MCPManager.
    """
    if cls._instance is None:
        cls._instance = super(MCPManager, cls).__new__(cls)
        cls._instance._initialized = False
    return cls._instance
connect_to_servers(server_paths=None) async

Connect to all configured MCP servers.

Parameters:

Name Type Description Default
server_paths Optional[List[str]]

List of paths to MCP server scripts.

None

Returns:

Name Type Description
bool bool

False if not valid server paths are provided

Source code in hatchling/mcp_utils/manager.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
async def connect_to_servers(self, server_paths: Optional[List[str]] = None) -> bool:
    """Connect to all configured MCP servers.

    Args:
        server_paths (Optional[List[str]]): List of paths to MCP server scripts.
        If None, connects to all configured servers.

    Returns:
        bool: False if not valid server paths are provided
    """

    if server_paths is None:
        all_server_paths = self.hatch_env_manager.get_servers_entry_points()
        # filter out server paths that are already connected
        server_paths = []
        for path in all_server_paths:
            already_connected = False
            for client in self.mcp_clients.values():
                if client.server_path == path:
                    self.logger.debug(f"Skipping already connected server: {path}")
                    already_connected = True
                    break
            if not already_connected:
                server_paths.append(path)

    if not server_paths:
        self.logger.info("No new MCP servers to connect to. All configured servers are already connected.")
        return True

    async with self._connection_lock:
        # Validate server paths
        valid_paths = self.validate_server_paths(server_paths)
        if not valid_paths:
            self.logger.error(". Check the paths provided.")
            return False

        # Connect to each valid server path
        for path in valid_paths:
            # Create client with environment resolver
            client = MCPClient(settings=self.settings, python_executable_resolver=self._get_python_executable)
            is_connected = await client.connect(path)
            if is_connected:
                self.mcp_clients[path] = client

                # Publish server up event
                self._publish_server_event(EventType.MCP_SERVER_UP, path, 
                                         tool_count=len(client.tools))

                # Cache tool mappings and create MCPToolInfo for each tool
                for tool_name, tool_obj in client.tools.items():
                    self._tool_client_map[tool_name] = client

                    # Create MCPToolInfo for lifecycle management
                    tool_info = MCPToolInfo(
                        name=tool_name,
                        description=getattr(tool_obj, 'description', f'No description available'),
                        schema=getattr(tool_obj, 'No argument schema available', {}),
                        server_path=path,
                        status=MCPToolStatus.ENABLED,
                        reason=MCPToolStatusReason.FROM_SERVER_UP
                    )
                    self._managed_tools[tool_name] = tool_info

                    # Publish tool enabled event
                    self._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)
            else:
                # Publish server unreachable event
                self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                         error="Failed to connect")

        if len(self.mcp_clients) > 0:
            # Log the available tools across all clients
            total_tools = sum(len(client.tools) for client in self.mcp_clients.values())
            self.logger.info(f"Connected to {len(self.mcp_clients)} MCP servers with {total_tools} total tools")
        else:
            self.logger.warning("Failed to connect to any MCP server")

        return True
disable_tool(tool_name)

Disable a specific tool if it exists and is enabled.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to disable.

required

Returns:

Name Type Description
bool bool

True if the tool was disabled, False if it was already disabled or doesn't exist.

Source code in hatchling/mcp_utils/manager.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
def disable_tool(self, tool_name: str) -> bool:
    """Disable a specific tool if it exists and is enabled.

    Args:
        tool_name (str): Name of the tool to disable.

    Returns:
        bool: True if the tool was disabled, False if it was already disabled or doesn't exist.
    """
    if tool_name not in self._managed_tools:
        self.logger.warning(f"Tool '{tool_name}' not found in managed tools")
        return False

    tool_info = self._managed_tools[tool_name]

    if tool_info.status == MCPToolStatus.DISABLED:
        self.logger.debug(f"Tool '{tool_name}' is already disabled")
        return False

    # Disable the tool
    tool_info.status = MCPToolStatus.DISABLED
    tool_info.reason = MCPToolStatusReason.FROM_USER_DISABLED

    # Update timestamp
    import time
    tool_info.last_updated = time.time()

    # Publish event
    self._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

    self.logger.info(f"Disabled tool: {tool_name}")
    return True
disconnect_all() async

Disconnect from all MCP servers.

Source code in hatchling/mcp_utils/manager.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
async def disconnect_all(self) -> None:
    """Disconnect from all MCP servers."""

    async with self._connection_lock:
        # Store the current task for debugging
        current_task_id = id(asyncio.current_task())
        self.logger.debug(f"Disconnecting all clients from task: {current_task_id}")

        disconnection_errors = False

        # First try the graceful disconnect approach
        for path, client in list(self.mcp_clients.items()):
            try:
                # Disable all tools from this server before disconnecting
                for tool_name, tool_info in self._managed_tools.items():
                    if tool_info.server_path == path and tool_info.status == MCPToolStatus.ENABLED:
                        tool_info.status = MCPToolStatus.DISABLED
                        tool_info.reason = MCPToolStatusReason.FROM_SERVER_DOWN

                        # Publish tool disabled event
                        self._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

                # Log task context for debugging
                if hasattr(client, '_connection_task_id') and client._connection_task_id:
                    self.logger.debug(f"Client for {path} was created in task: {client._connection_task_id}")

                # Try graceful disconnect first with timeout
                disconnect_task = asyncio.create_task(client.disconnect())
                try:
                    await asyncio.wait_for(disconnect_task, timeout=10)
                    # Publish server down event on successful disconnect
                    self._publish_server_event(EventType.MCP_SERVER_DOWN, path)
                except asyncio.TimeoutError:
                    self.logger.warning(f"Disconnect timeout for {path}")
                    disconnection_errors = True
                    # Publish server unreachable event
                    self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                             error="Disconnect timeout")
                except Exception as e:
                    self.logger.error(f"Error during graceful disconnect for {path}: {e}")
                    disconnection_errors = True
                    # Publish server unreachable event
                    self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, path,
                                             error=str(e))
            except Exception as e:
                self.logger.error(f"Error setting up disconnect for {path}: {e}")
                disconnection_errors = True

        # If any disconnections failed with errors, use forceful termination
        if disconnection_errors:
            self.logger.warning("Some disconnections failed. Using forceful termination as fallback.")
            self._terminate_server_processes()

        # Clear all client tracking regardless of disconnection success
        self.mcp_clients = {}
        self._tool_client_map = {}

        # Clear managed tools
        self._managed_tools = {}

        self.logger.info("Disconnected from all MCP servers")
enable_tool(tool_name)

Enable a specific tool if it exists and is disabled.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to enable.

required

Returns:

Name Type Description
bool bool

True if the tool was enabled, False if it was already enabled or doesn't exist.

Source code in hatchling/mcp_utils/manager.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def enable_tool(self, tool_name: str) -> bool:
    """Enable a specific tool if it exists and is disabled.

    Args:
        tool_name (str): Name of the tool to enable.

    Returns:
        bool: True if the tool was enabled, False if it was already enabled or doesn't exist.
    """
    if tool_name not in self._managed_tools:
        self.logger.warning(f"Tool '{tool_name}' not found in managed tools")
        return False

    tool_info = self._managed_tools[tool_name]

    if tool_info.status == MCPToolStatus.ENABLED:
        self.logger.debug(f"Tool '{tool_name}' is already enabled")
        return False

    # Check if the server is still available
    if tool_info.server_path not in self.mcp_clients:
        self.logger.warning(f"Cannot enable tool '{tool_name}' - server is not connected")
        return False

    # Enable the tool
    tool_info.status = MCPToolStatus.ENABLED
    tool_info.reason = MCPToolStatusReason.FROM_USER_ENABLED

    # Update timestamp
    import time
    tool_info.last_updated = time.time()

    # Publish event
    self._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)

    self.logger.info(f"Enabled tool: {tool_name}")
    return True
execute_tool(tool_name, arguments) async

Execute a tool by name with the given arguments.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to execute.

required
arguments Dict[str, Any]

Arguments to pass to the tool.

required

Returns:

Name Type Description
Any Any

Result of the tool execution.

Raises:

Type Description
ConnectionError

If not connected to any MCP server.

ValueError

If the tool is not found in any connected MCP server.

Source code in hatchling/mcp_utils/manager.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
    """Execute a tool by name with the given arguments.

    Args:
        tool_name (str): Name of the tool to execute.
        arguments (Dict[str, Any]): Arguments to pass to the tool.

    Returns:
        Any: Result of the tool execution.

    Raises:
        ConnectionError: If not connected to any MCP server.
        ValueError: If the tool is not found in any connected MCP server.
    """
    if not self.mcp_clients:
        raise ConnectionError("Not connected to any MCP server")

    if tool_name not in self._tool_client_map:
        raise ValueError(f"Tool '{tool_name}' not found in any connected MCP server")

    client = self._tool_client_map[tool_name]
    self._used_servers_in_session.add(client.server_path)

    try:
        return await client.execute_tool(tool_name, arguments)

    except ConnectionError:
        # mark the server as unreachable
        self.logger.error(f"Failed to execute tool '{tool_name}' - server {client.server_path} is unreachable")
        self._publish_server_event(EventType.MCP_SERVER_UNREACHABLE, client.server_path,
                                  error="Failed to execute tool")
get_all_managed_tools()

Get all managed tools (both enabled and disabled).

Returns:

Type Description
Dict[str, MCPToolInfo]

Dict[str, MCPToolInfo]: Dictionary mapping tool names to all MCPToolInfo objects.

Source code in hatchling/mcp_utils/manager.py
334
335
336
337
338
339
340
def get_all_managed_tools(self) -> Dict[str, MCPToolInfo]:
    """Get all managed tools (both enabled and disabled).

    Returns:
        Dict[str, MCPToolInfo]: Dictionary mapping tool names to all MCPToolInfo objects.
    """
    return self._managed_tools.copy()
get_citations_for_session() async

Get citations for all servers used in the current session.

Returns:

Type Description
Dict[str, Dict[str, str]]

Dict[str, Dict[str, str]]: Dictionary of citations for each server.

Source code in hatchling/mcp_utils/manager.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
async def get_citations_for_session(self) -> Dict[str, Dict[str, str]]:
    """Get citations for all servers used in the current session.

    Returns:
        Dict[str, Dict[str, str]]: Dictionary of citations for each server.
    """
    citations = {}

    for path in self._used_servers_in_session:
        if path in self.mcp_clients:
            client = self.mcp_clients[path]
            try:
                server_citations = await client.get_citations()
                citations[path] = server_citations
            except Exception as e:
                self.logger.error(f"Error getting citations for {path}: {e}")

    return citations
get_enabled_tools()

Get all enabled tools with their information.

Returns:

Type Description
Dict[str, MCPToolInfo]

Dict[str, MCPToolInfo]: Dictionary mapping tool names to enabled MCPToolInfo objects.

Source code in hatchling/mcp_utils/manager.py
323
324
325
326
327
328
329
330
331
332
def get_enabled_tools(self) -> Dict[str, MCPToolInfo]:
    """Get all enabled tools with their information.

    Returns:
        Dict[str, MCPToolInfo]: Dictionary mapping tool names to enabled MCPToolInfo objects.
    """
    return {
        name: info for name, info in self._managed_tools.items()
        if info.status == MCPToolStatus.ENABLED
    }
get_tool_status(tool_name)

Get the current status and information for a tool.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to get status for.

required

Returns:

Type Description
Optional[MCPToolInfo]

Optional[MCPToolInfo]: Tool information if found, None otherwise.

Source code in hatchling/mcp_utils/manager.py
413
414
415
416
417
418
419
420
421
422
def get_tool_status(self, tool_name: str) -> Optional[MCPToolInfo]:
    """Get the current status and information for a tool.

    Args:
        tool_name (str): Name of the tool to get status for.

    Returns:
        Optional[MCPToolInfo]: Tool information if found, None otherwise.
    """
    return self._managed_tools.get(tool_name)
reset_session_tracking()

Reset the tracking of which servers were used.

Source code in hatchling/mcp_utils/manager.py
475
476
477
def reset_session_tracking(self):
    """Reset the tracking of which servers were used."""
    self._used_servers_in_session.clear()
validate_server_paths(server_paths)

Validate server paths and return the list of valid absolute paths.

Parameters:

Name Type Description Default
server_paths List[str]

List of server paths to validate.

required

Returns:

Type Description
List[str]

List[str]: List of valid absolute paths.

Source code in hatchling/mcp_utils/manager.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def validate_server_paths(self, server_paths: List[str]) -> List[str]:
    """Validate server paths and return the list of valid absolute paths.

    Args:
        server_paths (List[str]): List of server paths to validate.

    Returns:
        List[str]: List of valid absolute paths.
    """
    valid_paths = []
    for path in server_paths:
        # Convert to absolute path if relative
        abs_path = os.path.abspath(path)

        # Check if file exists
        if not os.path.isfile(abs_path):
            self.logger.error(f"MCP server script not found: {abs_path}")
            continue

        valid_paths.append(abs_path)

    return valid_paths

hatchling.mcp_utils.mcp_server_api

MCP Server Management API.

This module provides a clean, command-friendly API for managing MCP servers, tools, and debugging operations. It wraps the MCPManager functionality with a simplified interface suitable for CLI commands and user interaction.

Classes

MCPServerAPI

Clean API for MCP server management and debugging.

This class provides a simplified interface for: - Server connection management - Tool discovery and management - Manual tool execution for debugging - Server health monitoring

Source code in hatchling/mcp_utils/mcp_server_api.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
class MCPServerAPI:
    """Clean API for MCP server management and debugging.

    This class provides a simplified interface for:
    - Server connection management
    - Tool discovery and management
    - Manual tool execution for debugging
    - Server health monitoring
    """

    # =============================================================================
    # Server Management
    # =============================================================================

    @staticmethod
    async def connect_servers(server_paths: Optional[List[str]] = None) -> bool:
        """Connect to MCP servers.

        Args:
            server_paths (List[str]): List of paths to MCP server scripts.
            If None, connects to all configured servers.

        Returns:
            bool: True if at least one server connected successfully.
        """
        return await mcp_manager.connect_to_servers(server_paths)

    @staticmethod
    async def disconnect_all_servers() -> None:
        """Disconnect from all MCP servers."""
        await mcp_manager.disconnect_all()

    @staticmethod
    def get_server_list() -> List[MCPServerInfo]:
        """Get list of all configured MCP servers with their status.

        Returns:
            List[MCPServerInfo]: List of server information.
        """
        servers = []

        # Get connected servers
        for path, client in mcp_manager.mcp_clients.items():
            tool_count = len(client.tools)
            enabled_count = len([
                tool for tool in mcp_manager._managed_tools.values()
                if tool.server_path == path and tool.status == MCPToolStatus.ENABLED
            ])

            servers.append(MCPServerInfo(
                path=path,
                status=MCPServerStatus.CONNECTED,
                tool_count=tool_count,
                enabled_tool_count=enabled_count,
                last_connected=time.time()  # Approximate
            ))

        return servers

    @staticmethod
    def get_server_status(server_path: str) -> MCPServerInfo:
        """Get detailed status for a specific server.

        Args:
            server_path (str): Path to the MCP server script.

        Returns:
            MCPServerInfo: Server status information.
        """
        if server_path in mcp_manager.mcp_clients:
            client = mcp_manager.mcp_clients[server_path]
            tool_count = len(client.tools)
            enabled_count = len([
                tool for tool in mcp_manager._managed_tools.values()
                if tool.server_path == server_path and tool.status == MCPToolStatus.ENABLED
            ])

            return MCPServerInfo(
                path=server_path,
                status=MCPServerStatus.CONNECTED,
                tool_count=tool_count,
                enabled_tool_count=enabled_count,
                last_connected=time.time()
            )
        else:
            return MCPServerInfo(
                path=server_path,
                status=MCPServerStatus.DISCONNECTED,
                tool_count=0,
                enabled_tool_count=0,
                error_message="Server not connected"
            )

    # =============================================================================
    # Tool Management  
    # =============================================================================

    @staticmethod
    def get_all_tools() -> List[MCPToolSummary]:
        """Get list of all available MCP tools.

        Returns:
            List[MCPToolSummary]: List of tool summaries.
        """
        tools = []

        for tool_name, tool_info in mcp_manager.get_all_managed_tools().items():
            # Try to get tool description from the actual tool object
            description = None
            if tool_info.server_path in mcp_manager.mcp_clients:
                client = mcp_manager.mcp_clients[tool_info.server_path]
                tool_obj = client.tools.get(tool_name)
                if tool_obj and hasattr(tool_obj, 'description'):
                    description = tool_obj.description

            tools.append(MCPToolSummary(
                name=tool_name,
                server_path=tool_info.server_path,
                status=tool_info.status,
                description=description,
                last_updated=tool_info.last_updated
            ))

        return tools

    @staticmethod
    def get_enabled_tools() -> List[MCPToolSummary]:
        """Get list of enabled MCP tools.

        Returns:
            List[MCPToolSummary]: List of enabled tool summaries.
        """
        return [tool for tool in MCPServerAPI.get_all_tools() if tool.status == MCPToolStatus.ENABLED]

    @staticmethod
    def get_tools_by_server(server_path: str) -> List[MCPToolSummary]:
        """Get all tools provided by a specific server.

        Args:
            server_path (str): Path to the MCP server script.

        Returns:
            List[MCPToolSummary]: List of tools from the server.
        """
        return [tool for tool in MCPServerAPI.get_all_tools() if tool.server_path == server_path]

    @staticmethod
    def enable_tool(tool_name: str) -> bool:
        """Enable a specific tool.

        Args:
            tool_name (str): Name of the tool to enable.

        Returns:
            bool: True if tool was enabled successfully.
        """
        # Get managed tools directly instead of using manager method
        managed_tools = mcp_manager._managed_tools

        if tool_name not in managed_tools:
            logger.warning(f"Tool '{tool_name}' not found in managed tools")
            return False

        tool_info = managed_tools[tool_name]

        if tool_info.status == MCPToolStatus.ENABLED:
            logger.debug(f"Tool '{tool_name}' is already enabled")
            return True

        # Check if the server is still available
        if tool_info.server_path not in mcp_manager.mcp_clients:
            logger.warning(f"Cannot enable tool '{tool_name}' - server is not connected")
            return False

        # Enable the tool
        tool_info.status = MCPToolStatus.ENABLED
        tool_info.reason = MCPToolStatusReason.FROM_USER_ENABLED

        # Update timestamp
        import time
        tool_info.last_updated = time.time()

        # Publish event through manager's publisher
        mcp_manager._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)

        logger.info(f"Enabled tool: {tool_name}")
        return True

    @staticmethod
    def disable_tool(tool_name: str) -> bool:
        """Disable a specific tool.

        Args:
            tool_name (str): Name of the tool to disable.

        Returns:
            bool: True if tool was disabled successfully.
        """
        # Get managed tools directly instead of using manager method
        managed_tools = mcp_manager._managed_tools

        if tool_name not in managed_tools:
            logger.warning(f"Tool '{tool_name}' not found in managed tools")
            return False

        tool_info = managed_tools[tool_name]

        if tool_info.status == MCPToolStatus.DISABLED:
            logger.debug(f"Tool '{tool_name}' is already disabled")
            return False

        # Disable the tool
        tool_info.status = MCPToolStatus.DISABLED
        tool_info.reason = MCPToolStatusReason.FROM_USER_DISABLED

        # Update timestamp
        import time
        tool_info.last_updated = time.time()

        # Publish event through manager's publisher
        mcp_manager._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

        logger.info(f"Disabled tool: {tool_name}")
        return True

    @staticmethod
    def get_tool_info(tool_name: str) -> Optional[MCPToolSummary]:
        """Get detailed information about a specific tool.

        Args:
            tool_name (str): Name of the tool.

        Returns:
            Optional[MCPToolSummary]: Tool information if found.
        """
        tool_info = mcp_manager.get_tool_status(tool_name)
        if not tool_info:
            return None

        # Get description from tool object
        description = None
        if tool_info.server_path in mcp_manager.mcp_clients:
            client = mcp_manager.mcp_clients[tool_info.server_path]
            tool_obj = client.tools.get(tool_name)
            if tool_obj and hasattr(tool_obj, 'description'):
                description = tool_obj.description

        return MCPToolSummary(
            name=tool_name,
            server_path=tool_info.server_path,
            status=tool_info.status,
            description=description,
            last_updated=tool_info.last_updated
        )

    # =============================================================================
    # Manual Tool Execution (Debugging)
    # =============================================================================

    @staticmethod
    async def execute_tool_manually(tool_name: str, arguments: Dict[str, Any]) -> Tuple[bool, Any, Optional[str]]:
        """Execute an MCP tool manually for debugging purposes.

        Args:
            tool_name (str): Name of the tool to execute.
            arguments (Dict[str, Any]): Arguments to pass to the tool.

        Returns:
            Tuple[bool, Any, Optional[str]]: Success flag, result, and error message if any.
        """
        try:
            result = await mcp_manager.execute_tool(tool_name, arguments)
            return True, result, None
        except ConnectionError as e:
            return False, None, f"Connection error: {e}"
        except ValueError as e:
            return False, None, f"Tool not found: {e}"
        except Exception as e:
            return False, None, f"Execution error: {e}"

    @staticmethod
    def get_tool_schema(tool_name: str) -> Optional[Dict[str, Any]]:
        """Get the JSON schema for a tool's arguments.

        Args:
            tool_name (str): Name of the tool.

        Returns:
            Optional[Dict[str, Any]]: Tool schema if available.
        """
        tool_info = mcp_manager.get_tool_status(tool_name)
        if not tool_info or tool_info.server_path not in mcp_manager.mcp_clients:
            return None

        return tool_info.schema

    # =============================================================================
    # Health and Diagnostics
    # =============================================================================

    @staticmethod
    def get_health_summary() -> Dict[str, Any]:
        """Get overall health summary of MCP system.

        Returns:
            Dict[str, Any]: Health summary including server and tool counts.
        """
        servers = MCPServerAPI.get_server_list()
        all_tools = MCPServerAPI.get_all_tools()

        return {
            "connected_servers": len(servers),
            "total_tools": len(all_tools),
            "enabled_tools": len([t for t in all_tools if t.status == MCPToolStatus.ENABLED]),
            "disabled_tools": len([t for t in all_tools if t.status == MCPToolStatus.DISABLED]),
            "server_details": [
                {
                    "path": server.path,
                    "status": server.status.value,
                    "tools": server.tool_count,
                    "enabled_tools": server.enabled_tool_count
                }
                for server in servers
            ]
        }

    @staticmethod
    async def get_session_citations() -> Dict[str, Dict[str, str]]:
        """Get citations for all servers used in the current session.

        Returns:
            Dict[str, Dict[str, str]]: Server citations.
        """
        return await mcp_manager.get_citations_for_session()

    @staticmethod
    def reset_session_tracking() -> None:
        """Reset session tracking for citations."""
        mcp_manager.reset_session_tracking()
Functions
connect_servers(server_paths=None) async staticmethod

Connect to MCP servers.

Parameters:

Name Type Description Default
server_paths List[str]

List of paths to MCP server scripts.

None

Returns:

Name Type Description
bool bool

True if at least one server connected successfully.

Source code in hatchling/mcp_utils/mcp_server_api.py
64
65
66
67
68
69
70
71
72
73
74
75
@staticmethod
async def connect_servers(server_paths: Optional[List[str]] = None) -> bool:
    """Connect to MCP servers.

    Args:
        server_paths (List[str]): List of paths to MCP server scripts.
        If None, connects to all configured servers.

    Returns:
        bool: True if at least one server connected successfully.
    """
    return await mcp_manager.connect_to_servers(server_paths)
disable_tool(tool_name) staticmethod

Disable a specific tool.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to disable.

required

Returns:

Name Type Description
bool bool

True if tool was disabled successfully.

Source code in hatchling/mcp_utils/mcp_server_api.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
@staticmethod
def disable_tool(tool_name: str) -> bool:
    """Disable a specific tool.

    Args:
        tool_name (str): Name of the tool to disable.

    Returns:
        bool: True if tool was disabled successfully.
    """
    # Get managed tools directly instead of using manager method
    managed_tools = mcp_manager._managed_tools

    if tool_name not in managed_tools:
        logger.warning(f"Tool '{tool_name}' not found in managed tools")
        return False

    tool_info = managed_tools[tool_name]

    if tool_info.status == MCPToolStatus.DISABLED:
        logger.debug(f"Tool '{tool_name}' is already disabled")
        return False

    # Disable the tool
    tool_info.status = MCPToolStatus.DISABLED
    tool_info.reason = MCPToolStatusReason.FROM_USER_DISABLED

    # Update timestamp
    import time
    tool_info.last_updated = time.time()

    # Publish event through manager's publisher
    mcp_manager._publish_tool_event(EventType.MCP_TOOL_DISABLED, tool_name, tool_info)

    logger.info(f"Disabled tool: {tool_name}")
    return True
disconnect_all_servers() async staticmethod

Disconnect from all MCP servers.

Source code in hatchling/mcp_utils/mcp_server_api.py
77
78
79
80
@staticmethod
async def disconnect_all_servers() -> None:
    """Disconnect from all MCP servers."""
    await mcp_manager.disconnect_all()
enable_tool(tool_name) staticmethod

Enable a specific tool.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to enable.

required

Returns:

Name Type Description
bool bool

True if tool was enabled successfully.

Source code in hatchling/mcp_utils/mcp_server_api.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@staticmethod
def enable_tool(tool_name: str) -> bool:
    """Enable a specific tool.

    Args:
        tool_name (str): Name of the tool to enable.

    Returns:
        bool: True if tool was enabled successfully.
    """
    # Get managed tools directly instead of using manager method
    managed_tools = mcp_manager._managed_tools

    if tool_name not in managed_tools:
        logger.warning(f"Tool '{tool_name}' not found in managed tools")
        return False

    tool_info = managed_tools[tool_name]

    if tool_info.status == MCPToolStatus.ENABLED:
        logger.debug(f"Tool '{tool_name}' is already enabled")
        return True

    # Check if the server is still available
    if tool_info.server_path not in mcp_manager.mcp_clients:
        logger.warning(f"Cannot enable tool '{tool_name}' - server is not connected")
        return False

    # Enable the tool
    tool_info.status = MCPToolStatus.ENABLED
    tool_info.reason = MCPToolStatusReason.FROM_USER_ENABLED

    # Update timestamp
    import time
    tool_info.last_updated = time.time()

    # Publish event through manager's publisher
    mcp_manager._publish_tool_event(EventType.MCP_TOOL_ENABLED, tool_name, tool_info)

    logger.info(f"Enabled tool: {tool_name}")
    return True
execute_tool_manually(tool_name, arguments) async staticmethod

Execute an MCP tool manually for debugging purposes.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to execute.

required
arguments Dict[str, Any]

Arguments to pass to the tool.

required

Returns:

Type Description
Tuple[bool, Any, Optional[str]]

Tuple[bool, Any, Optional[str]]: Success flag, result, and error message if any.

Source code in hatchling/mcp_utils/mcp_server_api.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
@staticmethod
async def execute_tool_manually(tool_name: str, arguments: Dict[str, Any]) -> Tuple[bool, Any, Optional[str]]:
    """Execute an MCP tool manually for debugging purposes.

    Args:
        tool_name (str): Name of the tool to execute.
        arguments (Dict[str, Any]): Arguments to pass to the tool.

    Returns:
        Tuple[bool, Any, Optional[str]]: Success flag, result, and error message if any.
    """
    try:
        result = await mcp_manager.execute_tool(tool_name, arguments)
        return True, result, None
    except ConnectionError as e:
        return False, None, f"Connection error: {e}"
    except ValueError as e:
        return False, None, f"Tool not found: {e}"
    except Exception as e:
        return False, None, f"Execution error: {e}"
get_all_tools() staticmethod

Get list of all available MCP tools.

Returns:

Type Description
List[MCPToolSummary]

List[MCPToolSummary]: List of tool summaries.

Source code in hatchling/mcp_utils/mcp_server_api.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@staticmethod
def get_all_tools() -> List[MCPToolSummary]:
    """Get list of all available MCP tools.

    Returns:
        List[MCPToolSummary]: List of tool summaries.
    """
    tools = []

    for tool_name, tool_info in mcp_manager.get_all_managed_tools().items():
        # Try to get tool description from the actual tool object
        description = None
        if tool_info.server_path in mcp_manager.mcp_clients:
            client = mcp_manager.mcp_clients[tool_info.server_path]
            tool_obj = client.tools.get(tool_name)
            if tool_obj and hasattr(tool_obj, 'description'):
                description = tool_obj.description

        tools.append(MCPToolSummary(
            name=tool_name,
            server_path=tool_info.server_path,
            status=tool_info.status,
            description=description,
            last_updated=tool_info.last_updated
        ))

    return tools
get_enabled_tools() staticmethod

Get list of enabled MCP tools.

Returns:

Type Description
List[MCPToolSummary]

List[MCPToolSummary]: List of enabled tool summaries.

Source code in hatchling/mcp_utils/mcp_server_api.py
175
176
177
178
179
180
181
182
@staticmethod
def get_enabled_tools() -> List[MCPToolSummary]:
    """Get list of enabled MCP tools.

    Returns:
        List[MCPToolSummary]: List of enabled tool summaries.
    """
    return [tool for tool in MCPServerAPI.get_all_tools() if tool.status == MCPToolStatus.ENABLED]
get_health_summary() staticmethod

Get overall health summary of MCP system.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Health summary including server and tool counts.

Source code in hatchling/mcp_utils/mcp_server_api.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
@staticmethod
def get_health_summary() -> Dict[str, Any]:
    """Get overall health summary of MCP system.

    Returns:
        Dict[str, Any]: Health summary including server and tool counts.
    """
    servers = MCPServerAPI.get_server_list()
    all_tools = MCPServerAPI.get_all_tools()

    return {
        "connected_servers": len(servers),
        "total_tools": len(all_tools),
        "enabled_tools": len([t for t in all_tools if t.status == MCPToolStatus.ENABLED]),
        "disabled_tools": len([t for t in all_tools if t.status == MCPToolStatus.DISABLED]),
        "server_details": [
            {
                "path": server.path,
                "status": server.status.value,
                "tools": server.tool_count,
                "enabled_tools": server.enabled_tool_count
            }
            for server in servers
        ]
    }
get_server_list() staticmethod

Get list of all configured MCP servers with their status.

Returns:

Type Description
List[MCPServerInfo]

List[MCPServerInfo]: List of server information.

Source code in hatchling/mcp_utils/mcp_server_api.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@staticmethod
def get_server_list() -> List[MCPServerInfo]:
    """Get list of all configured MCP servers with their status.

    Returns:
        List[MCPServerInfo]: List of server information.
    """
    servers = []

    # Get connected servers
    for path, client in mcp_manager.mcp_clients.items():
        tool_count = len(client.tools)
        enabled_count = len([
            tool for tool in mcp_manager._managed_tools.values()
            if tool.server_path == path and tool.status == MCPToolStatus.ENABLED
        ])

        servers.append(MCPServerInfo(
            path=path,
            status=MCPServerStatus.CONNECTED,
            tool_count=tool_count,
            enabled_tool_count=enabled_count,
            last_connected=time.time()  # Approximate
        ))

    return servers
get_server_status(server_path) staticmethod

Get detailed status for a specific server.

Parameters:

Name Type Description Default
server_path str

Path to the MCP server script.

required

Returns:

Name Type Description
MCPServerInfo MCPServerInfo

Server status information.

Source code in hatchling/mcp_utils/mcp_server_api.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@staticmethod
def get_server_status(server_path: str) -> MCPServerInfo:
    """Get detailed status for a specific server.

    Args:
        server_path (str): Path to the MCP server script.

    Returns:
        MCPServerInfo: Server status information.
    """
    if server_path in mcp_manager.mcp_clients:
        client = mcp_manager.mcp_clients[server_path]
        tool_count = len(client.tools)
        enabled_count = len([
            tool for tool in mcp_manager._managed_tools.values()
            if tool.server_path == server_path and tool.status == MCPToolStatus.ENABLED
        ])

        return MCPServerInfo(
            path=server_path,
            status=MCPServerStatus.CONNECTED,
            tool_count=tool_count,
            enabled_tool_count=enabled_count,
            last_connected=time.time()
        )
    else:
        return MCPServerInfo(
            path=server_path,
            status=MCPServerStatus.DISCONNECTED,
            tool_count=0,
            enabled_tool_count=0,
            error_message="Server not connected"
        )
get_session_citations() async staticmethod

Get citations for all servers used in the current session.

Returns:

Type Description
Dict[str, Dict[str, str]]

Dict[str, Dict[str, str]]: Server citations.

Source code in hatchling/mcp_utils/mcp_server_api.py
376
377
378
379
380
381
382
383
@staticmethod
async def get_session_citations() -> Dict[str, Dict[str, str]]:
    """Get citations for all servers used in the current session.

    Returns:
        Dict[str, Dict[str, str]]: Server citations.
    """
    return await mcp_manager.get_citations_for_session()
get_tool_info(tool_name) staticmethod

Get detailed information about a specific tool.

Parameters:

Name Type Description Default
tool_name str

Name of the tool.

required

Returns:

Type Description
Optional[MCPToolSummary]

Optional[MCPToolSummary]: Tool information if found.

Source code in hatchling/mcp_utils/mcp_server_api.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
@staticmethod
def get_tool_info(tool_name: str) -> Optional[MCPToolSummary]:
    """Get detailed information about a specific tool.

    Args:
        tool_name (str): Name of the tool.

    Returns:
        Optional[MCPToolSummary]: Tool information if found.
    """
    tool_info = mcp_manager.get_tool_status(tool_name)
    if not tool_info:
        return None

    # Get description from tool object
    description = None
    if tool_info.server_path in mcp_manager.mcp_clients:
        client = mcp_manager.mcp_clients[tool_info.server_path]
        tool_obj = client.tools.get(tool_name)
        if tool_obj and hasattr(tool_obj, 'description'):
            description = tool_obj.description

    return MCPToolSummary(
        name=tool_name,
        server_path=tool_info.server_path,
        status=tool_info.status,
        description=description,
        last_updated=tool_info.last_updated
    )
get_tool_schema(tool_name) staticmethod

Get the JSON schema for a tool's arguments.

Parameters:

Name Type Description Default
tool_name str

Name of the tool.

required

Returns:

Type Description
Optional[Dict[str, Any]]

Optional[Dict[str, Any]]: Tool schema if available.

Source code in hatchling/mcp_utils/mcp_server_api.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
@staticmethod
def get_tool_schema(tool_name: str) -> Optional[Dict[str, Any]]:
    """Get the JSON schema for a tool's arguments.

    Args:
        tool_name (str): Name of the tool.

    Returns:
        Optional[Dict[str, Any]]: Tool schema if available.
    """
    tool_info = mcp_manager.get_tool_status(tool_name)
    if not tool_info or tool_info.server_path not in mcp_manager.mcp_clients:
        return None

    return tool_info.schema
get_tools_by_server(server_path) staticmethod

Get all tools provided by a specific server.

Parameters:

Name Type Description Default
server_path str

Path to the MCP server script.

required

Returns:

Type Description
List[MCPToolSummary]

List[MCPToolSummary]: List of tools from the server.

Source code in hatchling/mcp_utils/mcp_server_api.py
184
185
186
187
188
189
190
191
192
193
194
@staticmethod
def get_tools_by_server(server_path: str) -> List[MCPToolSummary]:
    """Get all tools provided by a specific server.

    Args:
        server_path (str): Path to the MCP server script.

    Returns:
        List[MCPToolSummary]: List of tools from the server.
    """
    return [tool for tool in MCPServerAPI.get_all_tools() if tool.server_path == server_path]
reset_session_tracking() staticmethod

Reset session tracking for citations.

Source code in hatchling/mcp_utils/mcp_server_api.py
385
386
387
388
@staticmethod
def reset_session_tracking() -> None:
    """Reset session tracking for citations."""
    mcp_manager.reset_session_tracking()

MCPServerInfo dataclass

Information about an MCP server.

Source code in hatchling/mcp_utils/mcp_server_api.py
27
28
29
30
31
32
33
34
35
@dataclass
class MCPServerInfo:
    """Information about an MCP server."""
    path: str
    status: MCPServerStatus
    tool_count: int
    enabled_tool_count: int
    last_connected: Optional[float] = None
    error_message: Optional[str] = None

MCPServerStatus

Bases: Enum

Status of an MCP server connection.

Source code in hatchling/mcp_utils/mcp_server_api.py
19
20
21
22
23
24
class MCPServerStatus(Enum):
    """Status of an MCP server connection."""
    CONNECTED = "connected"
    DISCONNECTED = "disconnected"
    ERROR = "error"
    UNKNOWN = "unknown"

MCPToolSummary dataclass

Summary information about an MCP tool.

Source code in hatchling/mcp_utils/mcp_server_api.py
38
39
40
41
42
43
44
45
46
@dataclass
class MCPToolSummary:
    """Summary information about an MCP tool."""
    name: str
    server_path: str
    status: MCPToolStatus
    description: Optional[str] = None
    last_updated: Optional[float] = None
    error_message: Optional[str] = None

Tool Management

hatchling.mcp_utils.mcp_tool_call_subscriber

MCP Tool Call Subscriber for handling LLM_TOOL_CALL_REQUEST events from LLM streams.

This module provides a subscriber that listens for LLM_TOOL_CALL_REQUEST events from LLM providers and dispatches them to MCPToolExecution for processing.

Classes

MCPToolCallSubscriber

Bases: EventSubscriber

Subscriber that handles LLM_TOOL_CALL_REQUEST events and dispatches them for execution.

This subscriber only processes one tool call per request ID, using a rolling buffer to avoid memory growth and ensure out-of-order tool calls are ignored.

Source code in hatchling/mcp_utils/mcp_tool_call_subscriber.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class MCPToolCallSubscriber(EventSubscriber):
    """Subscriber that handles LLM_TOOL_CALL_REQUEST events and dispatches them for execution.

    This subscriber only processes one tool call per request ID, using a rolling buffer
    to avoid memory growth and ensure out-of-order tool calls are ignored.
    """

    def __init__(self, tool_execution: MCPToolExecution):
        """Initialize the MCP tool call subscriber.

        Args:
            tool_execution (MCPToolExecution): The tool execution manager to dispatch calls to.
        """
        self.tool_execution = tool_execution
        self.logger = logging_manager.get_session(f"MCPToolCallSubscriber")
        self._recent_request_ids = deque(maxlen=2)  # Rolling buffer for request IDs

    def get_subscribed_events(self):
        """Get the list of event types this subscriber handles.

        Returns:
            List[EventType]: List of event types this subscriber handles.
        """
        return [EventType.LLM_TOOL_CALL_REQUEST]

    def on_event(self, event: Event) -> None:
        """Handle incoming events.

        Only the first tool call for each request ID is processed. Subsequent tool calls
        for the same request ID are ignored. The buffer ensures only the last two request IDs
        are tracked, preventing memory growth.

        Args:
            event (Event): The event to handle.
        """
        if event.type == EventType.LLM_TOOL_CALL_REQUEST:
            if event.request_id in self._recent_request_ids:
                self.logger.warning(f"LLM requested several tool calls from a unique prompt ({event.request_id}) " +\
                                    f"We currently only process the first one.\nSkipping tool call {event.data}")
                return

            try:
                self.logger.debug(f"Received LLM_TOOL_CALL_REQUEST event: {event}")
                provider = ProviderRegistry.get_provider(event.provider)
                parsed_tool_call = provider.llm_to_hatchling_tool_call(event)
            except Exception as e:
                self.logger.error(f"Error parsing tool call event: {e}")
                return

            if parsed_tool_call:
                self.logger.info(f"\nParsed tool call: {json_dumps(parsed_tool_call.to_dict(), indent=2)}\n")
                self.tool_execution.event_publisher.set_request_id(event.request_id)
                self._recent_request_ids.append(event.request_id)  # Add to rolling buffer
                self._handle_tool_call_event(parsed_tool_call)
        else:
            self.logger.warning(f"Received unexpected event type: {event.type}")

    def _handle_tool_call_event(self, parsed_tool_call: ToolCallParsedResult) -> None:
        """Handle LLM_TOOL_CALL_REQUEST events by dispatching to tool execution.

        Args:
            event (Event): The LLM_TOOL_CALL_REQUEST event to handle.
        """
        try:
            # Dispatch the tool call for execution
            self.tool_execution.execute_tool_sync(parsed_tool_call)

        except Exception as e:
            self.logger.error(f"Error handling tool call event: {e}")

            self.tool_execution.event_publisher.publish(
                EventType.MCP_TOOL_CALL_ERROR,
                {
                    "parsed_tool_call": parsed_tool_call.to_dict(),
                    "error": str(e)
                }
            )
Functions
__init__(tool_execution)

Initialize the MCP tool call subscriber.

Parameters:

Name Type Description Default
tool_execution MCPToolExecution

The tool execution manager to dispatch calls to.

required
Source code in hatchling/mcp_utils/mcp_tool_call_subscriber.py
24
25
26
27
28
29
30
31
32
def __init__(self, tool_execution: MCPToolExecution):
    """Initialize the MCP tool call subscriber.

    Args:
        tool_execution (MCPToolExecution): The tool execution manager to dispatch calls to.
    """
    self.tool_execution = tool_execution
    self.logger = logging_manager.get_session(f"MCPToolCallSubscriber")
    self._recent_request_ids = deque(maxlen=2)  # Rolling buffer for request IDs
get_subscribed_events()

Get the list of event types this subscriber handles.

Returns:

Type Description

List[EventType]: List of event types this subscriber handles.

Source code in hatchling/mcp_utils/mcp_tool_call_subscriber.py
34
35
36
37
38
39
40
def get_subscribed_events(self):
    """Get the list of event types this subscriber handles.

    Returns:
        List[EventType]: List of event types this subscriber handles.
    """
    return [EventType.LLM_TOOL_CALL_REQUEST]
on_event(event)

Handle incoming events.

Only the first tool call for each request ID is processed. Subsequent tool calls for the same request ID are ignored. The buffer ensures only the last two request IDs are tracked, preventing memory growth.

Parameters:

Name Type Description Default
event Event

The event to handle.

required
Source code in hatchling/mcp_utils/mcp_tool_call_subscriber.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def on_event(self, event: Event) -> None:
    """Handle incoming events.

    Only the first tool call for each request ID is processed. Subsequent tool calls
    for the same request ID are ignored. The buffer ensures only the last two request IDs
    are tracked, preventing memory growth.

    Args:
        event (Event): The event to handle.
    """
    if event.type == EventType.LLM_TOOL_CALL_REQUEST:
        if event.request_id in self._recent_request_ids:
            self.logger.warning(f"LLM requested several tool calls from a unique prompt ({event.request_id}) " +\
                                f"We currently only process the first one.\nSkipping tool call {event.data}")
            return

        try:
            self.logger.debug(f"Received LLM_TOOL_CALL_REQUEST event: {event}")
            provider = ProviderRegistry.get_provider(event.provider)
            parsed_tool_call = provider.llm_to_hatchling_tool_call(event)
        except Exception as e:
            self.logger.error(f"Error parsing tool call event: {e}")
            return

        if parsed_tool_call:
            self.logger.info(f"\nParsed tool call: {json_dumps(parsed_tool_call.to_dict(), indent=2)}\n")
            self.tool_execution.event_publisher.set_request_id(event.request_id)
            self._recent_request_ids.append(event.request_id)  # Add to rolling buffer
            self._handle_tool_call_event(parsed_tool_call)
    else:
        self.logger.warning(f"Received unexpected event type: {event.type}")

hatchling.mcp_utils.mcp_tool_data

Classes

MCPToolInfo dataclass

Information about an MCP tool in the lifecycle management system.

Source code in hatchling/mcp_utils/mcp_tool_data.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@dataclass
class MCPToolInfo:
    """Information about an MCP tool in the lifecycle management system."""

    name: str                           # Tool name
    description: str                    # Tool description  
    schema: Dict[str, Any]             # Tool schema/definition
    server_path: str                   # Path to the MCP server providing this tool
    status: MCPToolStatus              # Current status (enabled/disabled)
    reason: MCPToolStatusReason        # Reason for current status
    provider_format: Optional[Dict[str, Any]] = None  # Cached provider-specific format
    last_updated: Optional[float] = None              # Timestamp of last status update

    def __post_init__(self):
        """Set last_updated timestamp if not provided."""
        if self.last_updated is None:
            import time
            self.last_updated = time.time()
Functions
__post_init__()

Set last_updated timestamp if not provided.

Source code in hatchling/mcp_utils/mcp_tool_data.py
41
42
43
44
45
def __post_init__(self):
    """Set last_updated timestamp if not provided."""
    if self.last_updated is None:
        import time
        self.last_updated = time.time()

MCPToolStatus

Bases: Enum

Status of an MCP tool in the lifecycle management system.

Source code in hatchling/mcp_utils/mcp_tool_data.py
 6
 7
 8
 9
10
class MCPToolStatus(Enum):
    """Status of an MCP tool in the lifecycle management system."""

    ENABLED = "enabled"      # Tool is available for use
    DISABLED = "disabled"    # Tool is not available for use

MCPToolStatusReason

Bases: Enum

Reasons why an MCP tool has a particular status.

Source code in hatchling/mcp_utils/mcp_tool_data.py
13
14
15
16
17
18
19
20
21
22
23
24
25
class MCPToolStatusReason(Enum):
    """Reasons why an MCP tool has a particular status."""

    # Enabled reasons
    FROM_SERVER_UP = "server_up"              # Tool enabled because server came online
    FROM_USER_ENABLED = "user_enabled"        # Tool explicitly enabled by user
    FROM_SERVER_REACHABLE = "server_reachable"  # Tool enabled because server became reachable

    # Disabled reasons  
    FROM_SERVER_DOWN = "server_down"          # Tool disabled because server went down
    FROM_SERVER_UNREACHABLE = "unreachable"  # Tool disabled because server is unreachable
    FROM_USER_DISABLED = "user_disabled"     # Tool explicitly disabled by user
    FROM_SYSTEM_ERROR = "system_error"       # Tool disabled due to system error

hatchling.mcp_utils.mcp_tool_execution

MCP tool execution management with event publishing.

This module provides enhanced functionality for handling tool execution requests from LLMs, managing tool calling chains, and processing tool results with event-driven architecture.

Classes

MCPToolExecution

Manages tool execution and tool calling chains with event publishing.

Source code in hatchling/mcp_utils/mcp_tool_execution.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class MCPToolExecution:
    """Manages tool execution and tool calling chains with event publishing."""

    def __init__(self, settings: AppSettings = None):
        """Initialize the MCP tool execution manager.

        Args:
            settings (AppSettings, optional): The application settings.
                                            If None, uses the singleton instance.
        """
        self.settings = settings or AppSettings.get_instance()
        self.logger = logging_manager.get_session("MCPToolExecution")
        logging_manager.set_log_level(logging.INFO)

        # Initialize event publisher
        self._event_publisher = EventPublisher()

        # Tool calling control properties
        self.current_tool_call_iteration = 0
        self.tool_call_start_time = None
        self.root_tool_query = None  # Track the original user query that started the tool sequence

    @property
    def event_publisher(self) -> EventPublisher:
        """Get the stream publisher for this tool execution manager.

        Returns:
            EventPublisher: The stream publisher instance.
        """
        return self._event_publisher

    def reset_for_new_query(self, query: str) -> None:
        """Reset tool execution state for a new user query.

        Args:
            query (str): The user's query that's starting a new conversation.
        """
        self.current_tool_call_iteration = 0
        self.tool_call_start_time = time.time()
        self.root_tool_query = query

    async def execute_tool(self, parsed_tool_call: ToolCallParsedResult) -> None:
        """Execute a tool and return its result.

        Sends the tool call to the MCPManager for execution and publishes events
        for tool call dispatched, progress, result, and error handling.
        You can subscribe to `event_publisher` of this class to receive
        MCP_TOOL_CALL_DISPATCHED, MCP_TOOL_CALL_PROGRESS, MCP_TOOL_CALL_RESULT, and MCP_TOOL_CALL_ERROR events.
        That will allow you to react to tool calls in real-time and handle them accordingly.

        Args:
            parsed_tool_call (ToolCallParsedResult): The parsed tool call containing
                tool_id, function_name, and arguments.
        """
        self.logger.debug(
            f"Redirecting to tool execution for (tool_call_id: {parsed_tool_call.tool_call_id}; "
            f"function: {parsed_tool_call.function_name}; arguments: {parsed_tool_call.arguments})"
        )

        self.current_tool_call_iteration += 1

        # Publish tool call dispatched event
        self._event_publisher.publish(EventType.MCP_TOOL_CALL_DISPATCHED, parsed_tool_call.to_dict())

        try:
            # Process the tool call using MCPManager
            tool_response = await mcp_manager.execute_tool(
                tool_name=parsed_tool_call.function_name,
                arguments=parsed_tool_call.arguments
            )
            self.logger.debug(f"Tool {parsed_tool_call.function_name} executed with responses: {tool_response}")

            if tool_response and not tool_response.isError:
                result_obj = ToolCallExecutionResult(
                    **parsed_tool_call.to_dict(),
                    result=tool_response,
                    error=None
                )
                self._event_publisher.publish(EventType.MCP_TOOL_CALL_RESULT, result_obj.to_dict())
            else:
                result_obj = ToolCallExecutionResult(
                    **parsed_tool_call.to_dict(),
                    result=tool_response,
                    error="Tool execution failed or returned no valid response"
                )
                self._event_publisher.publish(EventType.MCP_TOOL_CALL_ERROR, result_obj.to_dict())

        except Exception as e:
            self.logger.error(f"Error executing tool: {e}")
            result_obj = ToolCallExecutionResult(
                **parsed_tool_call.to_dict(),
                result=CallToolResult(
                    content=[{"type": "text", "text": f"{e}"}],
                    isError=True,
                ),
                error=str(e)
            )
            self._event_publisher.publish(EventType.MCP_TOOL_CALL_ERROR, result_obj.to_dict())

    def execute_tool_sync(self, parsed_tool_call: ToolCallParsedResult) -> None:
        """Synchronous wrapper for execute_tool that handles async execution internally.

        This method creates a task to execute the tool asynchronously without blocking
        the caller. It's designed for use in synchronous contexts where you want to
        dispatch tool execution but don't need to wait for the result.

        Args:
            parsed_tool_call (ToolCallParsedResult): The parsed tool call containing
                tool_id, function_name, and arguments.
        """
        try:
            # Try to create a task in the current event loop
            asyncio.create_task(self.execute_tool(parsed_tool_call))
        except RuntimeError:
            # No event loop running, create one for this execution
            try:
                asyncio.run(self.execute_tool(parsed_tool_call))
            except Exception as e:
                self.logger.warning(f"Failed to execute tool synchronously: {e}")
Attributes
event_publisher property

Get the stream publisher for this tool execution manager.

Returns:

Name Type Description
EventPublisher EventPublisher

The stream publisher instance.

Functions
__init__(settings=None)

Initialize the MCP tool execution manager.

Parameters:

Name Type Description Default
settings AppSettings

The application settings. If None, uses the singleton instance.

None
Source code in hatchling/mcp_utils/mcp_tool_execution.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(self, settings: AppSettings = None):
    """Initialize the MCP tool execution manager.

    Args:
        settings (AppSettings, optional): The application settings.
                                        If None, uses the singleton instance.
    """
    self.settings = settings or AppSettings.get_instance()
    self.logger = logging_manager.get_session("MCPToolExecution")
    logging_manager.set_log_level(logging.INFO)

    # Initialize event publisher
    self._event_publisher = EventPublisher()

    # Tool calling control properties
    self.current_tool_call_iteration = 0
    self.tool_call_start_time = None
    self.root_tool_query = None  # Track the original user query that started the tool sequence
execute_tool(parsed_tool_call) async

Execute a tool and return its result.

Sends the tool call to the MCPManager for execution and publishes events for tool call dispatched, progress, result, and error handling. You can subscribe to event_publisher of this class to receive MCP_TOOL_CALL_DISPATCHED, MCP_TOOL_CALL_PROGRESS, MCP_TOOL_CALL_RESULT, and MCP_TOOL_CALL_ERROR events. That will allow you to react to tool calls in real-time and handle them accordingly.

Parameters:

Name Type Description Default
parsed_tool_call ToolCallParsedResult

The parsed tool call containing tool_id, function_name, and arguments.

required
Source code in hatchling/mcp_utils/mcp_tool_execution.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
async def execute_tool(self, parsed_tool_call: ToolCallParsedResult) -> None:
    """Execute a tool and return its result.

    Sends the tool call to the MCPManager for execution and publishes events
    for tool call dispatched, progress, result, and error handling.
    You can subscribe to `event_publisher` of this class to receive
    MCP_TOOL_CALL_DISPATCHED, MCP_TOOL_CALL_PROGRESS, MCP_TOOL_CALL_RESULT, and MCP_TOOL_CALL_ERROR events.
    That will allow you to react to tool calls in real-time and handle them accordingly.

    Args:
        parsed_tool_call (ToolCallParsedResult): The parsed tool call containing
            tool_id, function_name, and arguments.
    """
    self.logger.debug(
        f"Redirecting to tool execution for (tool_call_id: {parsed_tool_call.tool_call_id}; "
        f"function: {parsed_tool_call.function_name}; arguments: {parsed_tool_call.arguments})"
    )

    self.current_tool_call_iteration += 1

    # Publish tool call dispatched event
    self._event_publisher.publish(EventType.MCP_TOOL_CALL_DISPATCHED, parsed_tool_call.to_dict())

    try:
        # Process the tool call using MCPManager
        tool_response = await mcp_manager.execute_tool(
            tool_name=parsed_tool_call.function_name,
            arguments=parsed_tool_call.arguments
        )
        self.logger.debug(f"Tool {parsed_tool_call.function_name} executed with responses: {tool_response}")

        if tool_response and not tool_response.isError:
            result_obj = ToolCallExecutionResult(
                **parsed_tool_call.to_dict(),
                result=tool_response,
                error=None
            )
            self._event_publisher.publish(EventType.MCP_TOOL_CALL_RESULT, result_obj.to_dict())
        else:
            result_obj = ToolCallExecutionResult(
                **parsed_tool_call.to_dict(),
                result=tool_response,
                error="Tool execution failed or returned no valid response"
            )
            self._event_publisher.publish(EventType.MCP_TOOL_CALL_ERROR, result_obj.to_dict())

    except Exception as e:
        self.logger.error(f"Error executing tool: {e}")
        result_obj = ToolCallExecutionResult(
            **parsed_tool_call.to_dict(),
            result=CallToolResult(
                content=[{"type": "text", "text": f"{e}"}],
                isError=True,
            ),
            error=str(e)
        )
        self._event_publisher.publish(EventType.MCP_TOOL_CALL_ERROR, result_obj.to_dict())
execute_tool_sync(parsed_tool_call)

Synchronous wrapper for execute_tool that handles async execution internally.

This method creates a task to execute the tool asynchronously without blocking the caller. It's designed for use in synchronous contexts where you want to dispatch tool execution but don't need to wait for the result.

Parameters:

Name Type Description Default
parsed_tool_call ToolCallParsedResult

The parsed tool call containing tool_id, function_name, and arguments.

required
Source code in hatchling/mcp_utils/mcp_tool_execution.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def execute_tool_sync(self, parsed_tool_call: ToolCallParsedResult) -> None:
    """Synchronous wrapper for execute_tool that handles async execution internally.

    This method creates a task to execute the tool asynchronously without blocking
    the caller. It's designed for use in synchronous contexts where you want to
    dispatch tool execution but don't need to wait for the result.

    Args:
        parsed_tool_call (ToolCallParsedResult): The parsed tool call containing
            tool_id, function_name, and arguments.
    """
    try:
        # Try to create a task in the current event loop
        asyncio.create_task(self.execute_tool(parsed_tool_call))
    except RuntimeError:
        # No event loop running, create one for this execution
        try:
            asyncio.run(self.execute_tool(parsed_tool_call))
        except Exception as e:
            self.logger.warning(f"Failed to execute tool synchronously: {e}")
reset_for_new_query(query)

Reset tool execution state for a new user query.

Parameters:

Name Type Description Default
query str

The user's query that's starting a new conversation.

required
Source code in hatchling/mcp_utils/mcp_tool_execution.py
49
50
51
52
53
54
55
56
57
def reset_for_new_query(self, query: str) -> None:
    """Reset tool execution state for a new user query.

    Args:
        query (str): The user's query that's starting a new conversation.
    """
    self.current_tool_call_iteration = 0
    self.tool_call_start_time = time.time()
    self.root_tool_query = query

hatchling.mcp_utils.mcp_tool_lifecycle_subscriber

Classes

ToolLifecycleSubscriber

Bases: EventSubscriber

Subscriber that manages MCP tool lifecycle and maintains tool cache in the format required by the LLM provider.

This subscriber listens for MCP server and tool lifecycle events and maintains a cache of all discovered tools with their current status. It provides methods to get enabled tools for use in LLM payloads.

Source code in hatchling/mcp_utils/mcp_tool_lifecycle_subscriber.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
class ToolLifecycleSubscriber(EventSubscriber):
    """Subscriber that manages MCP tool lifecycle and maintains tool cache
    in the format required by the LLM provider.

    This subscriber listens for MCP server and tool lifecycle events and maintains
    a cache of all discovered tools with their current status. It provides methods
    to get enabled tools for use in LLM payloads.
    """

    def __init__(self, provider_name: str, mcp_to_provider_tool_func: Callable[[MCPToolInfo], Dict[str, Any]]):
        """Initialize the tool lifecycle subscriber.

        Args:
            provider_name (str): Name of the LLM provider using this subscriber.
            mcp_to_provider_tool_func (Callable): Function to convert MCP tools to provider-specific format.
        """
        self.provider_name = provider_name
        self._tool_cache: Dict[str, MCPToolInfo] = {}
        self._mcp_to_provider_tool_func = mcp_to_provider_tool_func
        self.logger = logging.getLogger(f"{self.__class__.__name__}[{provider_name}]")

    def on_event(self, event: Event) -> None:
        """Handle MCP lifecycle events and update tool cache.

        Args:
            event (Event): The event to handle.
        """
        try:
            if event.type == EventType.MCP_SERVER_UP:
                self._handle_server_up_event(event)
            elif event.type == EventType.MCP_SERVER_DOWN:
                self._handle_server_down_event(event)
            elif event.type == EventType.MCP_SERVER_UNREACHABLE:
                self._handle_server_unreachable_event(event)
            elif event.type == EventType.MCP_SERVER_REACHABLE:
                self._handle_server_reachable_event(event)
            elif event.type == EventType.MCP_TOOL_ENABLED:
                self._handle_tool_enabled_event(event)
            elif event.type == EventType.MCP_TOOL_DISABLED:
                self._handle_tool_disabled_event(event)

        except Exception as e:
            self.logger.error(f"Error handling event {event.type}: {e}")

    def get_subscribed_events(self) -> List[EventType]:
        """Return MCP lifecycle event types this subscriber is interested in.

        Returns:
            List[EventType]: MCP lifecycle event types.
        """
        return [
            EventType.MCP_SERVER_UP,
            EventType.MCP_SERVER_DOWN,
            EventType.MCP_SERVER_UNREACHABLE,
            EventType.MCP_SERVER_REACHABLE,
            EventType.MCP_TOOL_ENABLED,
            EventType.MCP_TOOL_DISABLED
        ]

    def _handle_server_up_event(self, event: Event) -> None:
        """Handle server up event."""
        server_path = event.data.get("server_path", "")
        tool_count = event.data.get("tool_count", 0)

        self.logger.info(f"Server up: {server_path} with {tool_count} tools")

    def _handle_server_down_event(self, event: Event) -> None:
        """Handle server down event."""
        server_path = event.data.get("server_path", "")

        # Disable all tools from this server
        tools_disabled = 0
        for tool_info in self._tool_cache.values():
            if tool_info.server_path == server_path and tool_info.status == MCPToolStatus.ENABLED:
                tool_info.status = MCPToolStatus.DISABLED
                tool_info.reason = MCPToolStatusReason.FROM_SERVER_DOWN
                tools_disabled += 1

        self.logger.info(f"Server down: {server_path} - disabled {tools_disabled} tools")

    def _handle_server_unreachable_event(self, event: Event) -> None:
        """Handle server unreachable event."""
        server_path = event.data.get("server_path", "")
        error = event.data.get("error", "Unknown error")

        # Disable all tools from this server
        tools_disabled = 0
        for tool_info in self._tool_cache.values():
            if tool_info.server_path == server_path and tool_info.status == MCPToolStatus.ENABLED:
                tool_info.status = MCPToolStatus.DISABLED
                tool_info.reason = MCPToolStatusReason.FROM_SERVER_UNREACHABLE
                tools_disabled += 1

        self.logger.warning(f"Server unreachable: {server_path} ({error}) - disabled {tools_disabled} tools")

    def _handle_server_reachable_event(self, event: Event) -> None:
        """Handle server reachable event."""
        server_path = event.data.get("server_path", "")

        # Re-enable tools from this server that were disabled due to unreachability
        tools_enabled = 0
        for tool_info in self._tool_cache.values():
            if (tool_info.server_path == server_path and 
                tool_info.status == MCPToolStatus.DISABLED and
                tool_info.reason == MCPToolStatusReason.FROM_SERVER_UNREACHABLE):

                tool_info.status = MCPToolStatus.ENABLED
                tool_info.reason = MCPToolStatusReason.FROM_SERVER_REACHABLE
                tools_enabled += 1

        self.logger.info(f"Server reachable: {server_path} - re-enabled {tools_enabled} tools")

    def _handle_tool_enabled_event(self, event: Event) -> None:
        """Handle tool enabled event."""
        tool_name = event.data.get("tool_name", "")
        tool_info = event.data.get("tool_info", {})

        if not tool_info:
            self.logger.error(f"'Tool enabled event' missing 'tool_info' for tool '{tool_name}'")
            return

        # Always convert tool to provider-specific format
        # This ensures that tools are properly formatted even during reconnection
        # when they already exist in the cache but need their provider_format refreshed
        self._mcp_to_provider_tool_func(tool_info)

        # Update cache with the new tool info (whether new or existing)
        self._tool_cache[tool_name] = tool_info
        self.logger.debug(f"Tool enabled: {tool_name}")

    def _handle_tool_disabled_event(self, event: Event) -> None:
        """Handle tool disabled event."""
        tool_name = event.data.get("tool_name", "")
        tool_info = event.data.get("tool_info", {})

        if not tool_info:
            self.logger.error(f"'Tool disabled event' missing 'tool_info' for tool '{tool_name}'")
            return

        # Update cache with the fresh tool info from the event
        # This ensures consistency with _handle_tool_enabled_event() and prevents
        # stale cache entries that could cause disabled tools to remain in payloads
        self._tool_cache[tool_name] = tool_info
        self.logger.debug(f"Tool disabled: {tool_name}")

    def get_enabled_tools(self) -> Dict[str, MCPToolInfo]:
        """Get all currently enabled tools.

        Returns:
            Dict[str, MCPToolInfo]: Dictionary mapping tool names to enabled tool info.
        """
        return {
            name: info for name, info in self._tool_cache.items()
            if info.status == MCPToolStatus.ENABLED
        }

    def get_all_tools(self) -> Dict[str, MCPToolInfo]:
        """Get all tools (enabled and disabled).

        Returns:
            Dict[str, MCPToolInfo]: Dictionary mapping tool names to all tool info.
        """
        return self._tool_cache.copy()

    def get_tool_count(self) -> Dict[str, int]:
        """Get count of enabled and disabled tools.

        Returns:
            Dict[str, int]: Dictionary with 'enabled' and 'disabled' counts.
        """
        enabled = sum(1 for info in self._tool_cache.values() 
                     if info.status == MCPToolStatus.ENABLED)
        disabled = len(self._tool_cache) - enabled

        return {"enabled": enabled, "disabled": disabled}

    def clear_cache(self) -> None:
        """Clear the tool cache."""
        self._tool_cache.clear()
        self.logger.debug("Tool cache cleared")

    def prettied_reason(self, reason: MCPToolStatusReason) -> str:
        """Get a prettified string representation of the tool status reason.

        Args:
            reason (MCPToolStatusReason): The reason to prettify.

        Returns:
            str: Prettified reason string.
        """
        if reason == MCPToolStatusReason.FROM_SERVER_UP:
            return "Enabled at server startup"
        elif reason == MCPToolStatusReason.FROM_USER_ENABLED:
            return "Enabled by user (while server is up and reachable)"
        elif reason == MCPToolStatusReason.FROM_SERVER_REACHABLE:
            return "Server is reachable again after being down or unreachable"
        elif reason == MCPToolStatusReason.FROM_SERVER_DOWN:
            return "Server is down"
        elif reason == MCPToolStatusReason.FROM_SERVER_UNREACHABLE:
            return "Server is unreachable"
        elif reason == MCPToolStatusReason.FROM_USER_DISABLED:
            return "Tool disabled by user"
        elif reason == MCPToolStatusReason.FROM_SYSTEM_ERROR:
            return "System error occurred"
        else:
            return str(reason.value)
Functions
__init__(provider_name, mcp_to_provider_tool_func)

Initialize the tool lifecycle subscriber.

Parameters:

Name Type Description Default
provider_name str

Name of the LLM provider using this subscriber.

required
mcp_to_provider_tool_func Callable

Function to convert MCP tools to provider-specific format.

required
Source code in hatchling/mcp_utils/mcp_tool_lifecycle_subscriber.py
18
19
20
21
22
23
24
25
26
27
28
def __init__(self, provider_name: str, mcp_to_provider_tool_func: Callable[[MCPToolInfo], Dict[str, Any]]):
    """Initialize the tool lifecycle subscriber.

    Args:
        provider_name (str): Name of the LLM provider using this subscriber.
        mcp_to_provider_tool_func (Callable): Function to convert MCP tools to provider-specific format.
    """
    self.provider_name = provider_name
    self._tool_cache: Dict[str, MCPToolInfo] = {}
    self._mcp_to_provider_tool_func = mcp_to_provider_tool_func
    self.logger = logging.getLogger(f"{self.__class__.__name__}[{provider_name}]")
clear_cache()

Clear the tool cache.

Source code in hatchling/mcp_utils/mcp_tool_lifecycle_subscriber.py
185
186
187
188
def clear_cache(self) -> None:
    """Clear the tool cache."""
    self._tool_cache.clear()
    self.logger.debug("Tool cache cleared")
get_all_tools()

Get all tools (enabled and disabled).

Returns:

Type Description
Dict[str, MCPToolInfo]

Dict[str, MCPToolInfo]: Dictionary mapping tool names to all tool info.

Source code in hatchling/mcp_utils/mcp_tool_lifecycle_subscriber.py
165
166
167
168
169
170
171
def get_all_tools(self) -> Dict[str, MCPToolInfo]:
    """Get all tools (enabled and disabled).

    Returns:
        Dict[str, MCPToolInfo]: Dictionary mapping tool names to all tool info.
    """
    return self._tool_cache.copy()
get_enabled_tools()

Get all currently enabled tools.

Returns:

Type Description
Dict[str, MCPToolInfo]

Dict[str, MCPToolInfo]: Dictionary mapping tool names to enabled tool info.

Source code in hatchling/mcp_utils/mcp_tool_lifecycle_subscriber.py
154
155
156
157
158
159
160
161
162
163
def get_enabled_tools(self) -> Dict[str, MCPToolInfo]:
    """Get all currently enabled tools.

    Returns:
        Dict[str, MCPToolInfo]: Dictionary mapping tool names to enabled tool info.
    """
    return {
        name: info for name, info in self._tool_cache.items()
        if info.status == MCPToolStatus.ENABLED
    }
get_subscribed_events()

Return MCP lifecycle event types this subscriber is interested in.

Returns:

Type Description
List[EventType]

List[EventType]: MCP lifecycle event types.

Source code in hatchling/mcp_utils/mcp_tool_lifecycle_subscriber.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def get_subscribed_events(self) -> List[EventType]:
    """Return MCP lifecycle event types this subscriber is interested in.

    Returns:
        List[EventType]: MCP lifecycle event types.
    """
    return [
        EventType.MCP_SERVER_UP,
        EventType.MCP_SERVER_DOWN,
        EventType.MCP_SERVER_UNREACHABLE,
        EventType.MCP_SERVER_REACHABLE,
        EventType.MCP_TOOL_ENABLED,
        EventType.MCP_TOOL_DISABLED
    ]
get_tool_count()

Get count of enabled and disabled tools.

Returns:

Type Description
Dict[str, int]

Dict[str, int]: Dictionary with 'enabled' and 'disabled' counts.

Source code in hatchling/mcp_utils/mcp_tool_lifecycle_subscriber.py
173
174
175
176
177
178
179
180
181
182
183
def get_tool_count(self) -> Dict[str, int]:
    """Get count of enabled and disabled tools.

    Returns:
        Dict[str, int]: Dictionary with 'enabled' and 'disabled' counts.
    """
    enabled = sum(1 for info in self._tool_cache.values() 
                 if info.status == MCPToolStatus.ENABLED)
    disabled = len(self._tool_cache) - enabled

    return {"enabled": enabled, "disabled": disabled}
on_event(event)

Handle MCP lifecycle events and update tool cache.

Parameters:

Name Type Description Default
event Event

The event to handle.

required
Source code in hatchling/mcp_utils/mcp_tool_lifecycle_subscriber.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def on_event(self, event: Event) -> None:
    """Handle MCP lifecycle events and update tool cache.

    Args:
        event (Event): The event to handle.
    """
    try:
        if event.type == EventType.MCP_SERVER_UP:
            self._handle_server_up_event(event)
        elif event.type == EventType.MCP_SERVER_DOWN:
            self._handle_server_down_event(event)
        elif event.type == EventType.MCP_SERVER_UNREACHABLE:
            self._handle_server_unreachable_event(event)
        elif event.type == EventType.MCP_SERVER_REACHABLE:
            self._handle_server_reachable_event(event)
        elif event.type == EventType.MCP_TOOL_ENABLED:
            self._handle_tool_enabled_event(event)
        elif event.type == EventType.MCP_TOOL_DISABLED:
            self._handle_tool_disabled_event(event)

    except Exception as e:
        self.logger.error(f"Error handling event {event.type}: {e}")
prettied_reason(reason)

Get a prettified string representation of the tool status reason.

Parameters:

Name Type Description Default
reason MCPToolStatusReason

The reason to prettify.

required

Returns:

Name Type Description
str str

Prettified reason string.

Source code in hatchling/mcp_utils/mcp_tool_lifecycle_subscriber.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def prettied_reason(self, reason: MCPToolStatusReason) -> str:
    """Get a prettified string representation of the tool status reason.

    Args:
        reason (MCPToolStatusReason): The reason to prettify.

    Returns:
        str: Prettified reason string.
    """
    if reason == MCPToolStatusReason.FROM_SERVER_UP:
        return "Enabled at server startup"
    elif reason == MCPToolStatusReason.FROM_USER_ENABLED:
        return "Enabled by user (while server is up and reachable)"
    elif reason == MCPToolStatusReason.FROM_SERVER_REACHABLE:
        return "Server is reachable again after being down or unreachable"
    elif reason == MCPToolStatusReason.FROM_SERVER_DOWN:
        return "Server is down"
    elif reason == MCPToolStatusReason.FROM_SERVER_UNREACHABLE:
        return "Server is unreachable"
    elif reason == MCPToolStatusReason.FROM_USER_DISABLED:
        return "Tool disabled by user"
    elif reason == MCPToolStatusReason.FROM_SYSTEM_ERROR:
        return "System error occurred"
    else:
        return str(reason.value)