
unblu_mcp

unblu-mcp package.

A Model Context Protocol (MCP) server for interacting with Unblu deployments.

Modules:

  • cli


ConnectionConfig dataclass

ConnectionConfig(
    base_url: str,
    headers: dict[str, str] = dict(),
    auth: Auth | None = None,
    timeout: float = 30.0,
)

Configuration returned by a connection provider.

Attributes:

auth class-attribute instance-attribute

auth: Auth | None = None

Optional httpx auth handler for basic auth, etc.

base_url instance-attribute

base_url: str

The base URL for API requests (e.g., http://localhost:8084/app/rest/v4).

headers class-attribute instance-attribute

headers: dict[str, str] = field(default_factory=dict)

Additional headers to include in all requests (e.g., trusted headers).

timeout class-attribute instance-attribute

timeout: float = 30.0

Request timeout in seconds.
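The `headers` attribute is declared with `field(default_factory=dict)`, so each instance gets its own dictionary rather than sharing one mutable default. The sketch below illustrates that behavior with a stand-in dataclass; `ConnectionConfigSketch` is a hypothetical name, and `auth` is typed loosely here so the example does not depend on httpx.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ConnectionConfigSketch:
    """Stand-in mirroring the documented fields; not the library class."""

    base_url: str
    headers: dict[str, str] = field(default_factory=dict)
    auth: object | None = None  # the real class expects an httpx auth handler
    timeout: float = 30.0


a = ConnectionConfigSketch(base_url="http://localhost:8084/app/rest/v4")
b = ConnectionConfigSketch(base_url="http://localhost:8084/app/rest/v4")

# Mutating one instance's headers does not leak into the other.
a.headers["x-unblu-trusted-user-id"] = "superadmin"
```

Only `base_url` is required; the other fields fall back to the documented defaults.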

ConnectionProvider

Bases: ABC

Abstract base class for connection providers.

Connection providers handle the complexity of connecting to Unblu deployments in various environments. They can:

  • Start/stop port-forwards or tunnels
  • Manage authentication (API keys, basic auth, trusted headers)
  • Handle environment switching
  • Refresh credentials when needed

Example implementation for Kubernetes:

class K8sConnectionProvider(ConnectionProvider):
    def __init__(self, environment: str = "dev", port: int = 8084):
        self.environment = environment
        self.port = port  # local port the port-forward listens on (used by get_config)
        self._port_forward_process = None

    async def setup(self) -> None:
        # Start kubectl port-forward
        ...

    async def teardown(self) -> None:
        # Stop port-forward
        ...

    def get_config(self) -> ConnectionConfig:
        return ConnectionConfig(
            base_url=f"http://localhost:{self.port}/app/rest/v4",
            headers={
                "x-unblu-trusted-user-id": "superadmin",
                "x-unblu-trusted-user-role": "SUPER_ADMIN",
            },
        )

Methods:

  • get_config

    Return the current connection configuration.

  • health_check

    Check if the connection is healthy.

  • setup

    Initialize the connection (start port-forward, refresh auth, etc.).

  • teardown

    Clean up resources (stop port-forward, close connections, etc.).

get_config abstractmethod

get_config() -> ConnectionConfig

Return the current connection configuration.

This is called for each API request, allowing dynamic configuration (e.g., refreshing tokens, switching environments).

Returns:

  • ConnectionConfig

    ConnectionConfig with base_url, headers, auth, and timeout.
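As one possible illustration of dynamic configuration, the sketch below renews a bearer token whenever it has expired at the time `get_config()` is called. All names here (`ConfigSketch`, `ProviderSketch`, `RotatingTokenProvider`, `_fetch_token`) are hypothetical stand-ins, and the token source is a placeholder counter rather than a real credential service.

```python
from __future__ import annotations

import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class ConfigSketch:
    """Stand-in for ConnectionConfig (hypothetical, reduced to two fields)."""

    base_url: str
    headers: dict[str, str] = field(default_factory=dict)


class ProviderSketch(ABC):
    """Stand-in for the get_config part of the ConnectionProvider interface."""

    @abstractmethod
    def get_config(self) -> ConfigSketch: ...


class RotatingTokenProvider(ProviderSketch):
    """Hypothetical provider that renews its bearer token once it expires."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._token: str | None = None
        self._expires_at = 0.0
        self._issued = 0

    def _fetch_token(self) -> str:
        # Placeholder for a real token request; returns a counter-based value.
        self._issued += 1
        return f"token-{self._issued}"

    def get_config(self) -> ConfigSketch:
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            self._token = self._fetch_token()
            self._expires_at = now + self._ttl
        return ConfigSketch(
            base_url="http://localhost:8084/app/rest/v4",
            headers={"Authorization": f"Bearer {self._token}"},
        )
```

Injecting the clock keeps the expiry logic testable without sleeping.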

Source code in src/unblu_mcp/_internal/providers.py
@abstractmethod
def get_config(self) -> ConnectionConfig:
    """Return the current connection configuration.

    This is called for each API request, allowing dynamic configuration
    (e.g., refreshing tokens, switching environments).

    Returns:
        ConnectionConfig with base_url, headers, auth, and timeout.
    """

health_check async

health_check() -> bool

Check if the connection is healthy.

Override this to implement custom health checks (e.g., ping the API).

Returns:

  • bool

    True if the connection is healthy, False otherwise.

Source code in src/unblu_mcp/_internal/providers.py
async def health_check(self) -> bool:
    """Check if the connection is healthy.

    Override this to implement custom health checks (e.g., ping the API).

    Returns:
        True if the connection is healthy, False otherwise.
    """
    return True

setup abstractmethod async

setup() -> None

Initialize the connection (start port-forward, refresh auth, etc.).

Called once when the MCP server starts, before any API requests. Should be idempotent - safe to call multiple times.

Source code in src/unblu_mcp/_internal/providers.py
@abstractmethod
async def setup(self) -> None:
    """Initialize the connection (start port-forward, refresh auth, etc.).

    Called once when the MCP server starts, before any API requests.
    Should be idempotent - safe to call multiple times.
    """

teardown abstractmethod async

teardown() -> None

Clean up resources (stop port-forward, close connections, etc.).

Called when the MCP server shuts down. Should be safe to call even if setup() was never called.
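The two lifecycle requirements (idempotent `setup()`, and `teardown()` that is safe without a prior `setup()`) can be met with a simple state flag. The following is a minimal sketch under that assumption; `IdempotentResourceSketch` and its counters are hypothetical and exist only to make the behavior observable.

```python
import asyncio


class IdempotentResourceSketch:
    """Hypothetical provider-like class with guarded setup and teardown."""

    def __init__(self) -> None:
        self._started = False
        self.start_count = 0
        self.stop_count = 0

    async def setup(self) -> None:
        if self._started:
            return  # Already initialized: repeated calls are no-ops.
        self.start_count += 1
        self._started = True

    async def teardown(self) -> None:
        if not self._started:
            return  # Nothing to clean up, e.g. setup() never ran.
        self.stop_count += 1
        self._started = False


async def demo() -> IdempotentResourceSketch:
    resource = IdempotentResourceSketch()
    await resource.teardown()  # Safe before setup()
    await resource.setup()
    await resource.setup()  # Second call does nothing
    await resource.teardown()
    return resource


resource = asyncio.run(demo())
```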

Source code in src/unblu_mcp/_internal/providers.py
@abstractmethod
async def teardown(self) -> None:
    """Clean up resources (stop port-forward, close connections, etc.).

    Called when the MCP server shuts down.
    Should be safe to call even if setup() was never called.
    """

DefaultConnectionProvider

DefaultConnectionProvider(
    base_url: str | None = None,
    api_key: str | None = None,
    username: str | None = None,
    password: str | None = None,
    trusted_headers: dict[str, str] | None = None,
)

Bases: ConnectionProvider

Default provider using environment variables.

This is the standard provider for direct connections to Unblu Cloud or self-hosted deployments with direct network access.

Environment variables

  • UNBLU_BASE_URL: API base URL (default: https://unblu.cloud/app/rest/v4)
  • UNBLU_API_KEY: Bearer token for API key auth
  • UNBLU_USERNAME: Username for basic auth
  • UNBLU_PASSWORD: Password for basic auth
  • UNBLU_TRUSTED_HEADERS: Trusted headers (format: "key:value,key:value")
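The `"key:value,key:value"` format for `UNBLU_TRUSTED_HEADERS` could be parsed roughly as follows. The library's own `_parse_trusted_headers` helper is not shown on this page, so `parse_trusted_headers_sketch` is a hypothetical reimplementation; its handling of whitespace, malformed pairs, and empty input is an assumption.

```python
from __future__ import annotations


def parse_trusted_headers_sketch(raw: str | None) -> dict[str, str]:
    """Parse "key:value,key:value" into a dict; empty or None yields {}."""
    headers: dict[str, str] = {}
    if not raw:
        return headers
    for pair in raw.split(","):
        # Split on the first colon only, so values may contain colons.
        key, sep, value = pair.partition(":")
        if sep and key.strip():
            headers[key.strip()] = value.strip()
    return headers


parsed = parse_trusted_headers_sketch(
    "x-unblu-trusted-user-id:superadmin,x-unblu-trusted-user-role:SUPER_ADMIN"
)
```

Pairs without a colon are skipped in this sketch rather than raising an error.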

Methods:

  • get_config

    Build config from environment variables and constructor args.

  • health_check

    Check if the connection is healthy.

  • setup

    No setup needed for direct connections.

  • teardown

    No teardown needed for direct connections.

Source code in src/unblu_mcp/_internal/providers.py
def __init__(
    self,
    base_url: str | None = None,
    api_key: str | None = None,
    username: str | None = None,
    password: str | None = None,
    trusted_headers: dict[str, str] | None = None,
) -> None:
    self._base_url = base_url
    self._api_key = api_key
    self._username = username
    self._password = password
    self._trusted_headers = trusted_headers

get_config

get_config() -> ConnectionConfig

Build config from environment variables and constructor args.

Source code in src/unblu_mcp/_internal/providers.py
def get_config(self) -> ConnectionConfig:
    """Build config from environment variables and constructor args."""
    # Load from environment if not provided
    base_url = self._base_url or os.environ.get("UNBLU_BASE_URL", "https://unblu.cloud/app/rest/v4")
    api_key = self._api_key or os.environ.get("UNBLU_API_KEY")
    username = self._username or os.environ.get("UNBLU_USERNAME")
    password = self._password or os.environ.get("UNBLU_PASSWORD")
    trusted_headers = self._trusted_headers
    if trusted_headers is None:
        trusted_headers = _parse_trusted_headers(os.environ.get("UNBLU_TRUSTED_HEADERS"))

    # Build headers and auth
    headers: dict[str, str] = {}
    auth: httpx.Auth | None = None

    if trusted_headers:
        headers.update(trusted_headers)
    elif api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    elif username and password:
        auth = httpx.BasicAuth(username, password)

    return ConnectionConfig(
        base_url=base_url,
        headers=headers,
        auth=auth,
    )
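The `if`/`elif` chain above means only one authentication mechanism is applied per config, in the order trusted headers, then API key, then basic auth. The sketch below isolates that precedence; `select_auth_sketch` is a hypothetical helper that returns a label instead of building headers.

```python
from __future__ import annotations


def select_auth_sketch(
    trusted_headers: dict[str, str] | None = None,
    api_key: str | None = None,
    username: str | None = None,
    password: str | None = None,
) -> str:
    """Return which mechanism the if/elif chain would pick."""
    if trusted_headers:
        return "trusted_headers"
    if api_key:
        return "api_key"
    if username and password:
        return "basic_auth"
    return "none"


# Trusted headers win even when an API key is also present.
choice = select_auth_sketch(trusted_headers={"x-unblu-trusted-user-id": "superadmin"}, api_key="k")
```

Basic auth requires both a username and a password; supplying only one leaves the request unauthenticated.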

health_check async

health_check() -> bool

Check if the connection is healthy.

Override this to implement custom health checks (e.g., ping the API).

Returns:

  • bool

    True if the connection is healthy, False otherwise.

Source code in src/unblu_mcp/_internal/providers.py
async def health_check(self) -> bool:
    """Check if the connection is healthy.

    Override this to implement custom health checks (e.g., ping the API).

    Returns:
        True if the connection is healthy, False otherwise.
    """
    return True

setup async

setup() -> None

No setup needed for direct connections.

Source code in src/unblu_mcp/_internal/providers.py
async def setup(self) -> None:
    """No setup needed for direct connections."""

teardown async

teardown() -> None

No teardown needed for direct connections.

Source code in src/unblu_mcp/_internal/providers.py
async def teardown(self) -> None:
    """No teardown needed for direct connections."""

K8sConnectionProvider

K8sConnectionProvider(
    environment: str | K8sEnvironmentConfig = "dev",
    trusted_user_id: str = "superadmin",
    trusted_user_role: str = "SUPER_ADMIN",
    environments: dict[str, K8sEnvironmentConfig]
    | None = None,
)

Bases: ConnectionProvider

Connection provider for Kubernetes deployments using port-forwarding.

This provider:

  • Automatically starts kubectl port-forward on setup
  • Cleans up the port-forward process on teardown
  • Configures trusted headers authentication
  • Supports multiple environments with different ports

Parameters:

  • environment (str | K8sEnvironmentConfig, default: 'dev' ) –

    Environment name (dev, staging, prod) or custom K8sEnvironmentConfig.

  • trusted_user_id (str, default: 'superadmin' ) –

    User ID for trusted headers auth (default: superadmin).

  • trusted_user_role (str, default: 'SUPER_ADMIN' ) –

    User role for trusted headers auth (default: SUPER_ADMIN).

  • environments (dict[str, K8sEnvironmentConfig] | None, default: None ) –

    Custom environment configurations (overrides defaults).

Example

provider = K8sConnectionProvider(environment="dev")
await provider.setup()  # Starts port-forward
config = provider.get_config()  # Returns connection config
await provider.teardown()  # Stops port-forward

Methods:

  • get_config

    Return connection config with trusted headers.

  • health_check

    Check if the port-forward is healthy.

  • setup

    Start kubectl port-forward if not already running.

  • teardown

    Stop the port-forward process only if we own it.

Attributes:

Source code in src/unblu_mcp/_internal/providers_k8s.py
def __init__(
    self,
    environment: str | K8sEnvironmentConfig = "dev",
    trusted_user_id: str = "superadmin",
    trusted_user_role: str = "SUPER_ADMIN",
    environments: dict[str, K8sEnvironmentConfig] | None = None,
) -> None:
    self._environments = environments or _get_default_environments()

    if isinstance(environment, str):
        if environment not in self._environments:
            valid = ", ".join(self._environments.keys())
            msg = f"Unknown environment '{environment}'. Valid: {valid}"
            raise ValueError(msg)
        self._env_config = self._environments[environment]
    else:
        self._env_config = environment

    self._trusted_user_id = trusted_user_id
    self._trusted_user_role = trusted_user_role
    self._port_forward_process: subprocess.Popen[bytes] | None = None
    self._file_lock: FileLock | None = None  # Cross-platform file lock
    self._owns_port_forward = False  # Whether we started the port-forward

environment property

environment: str

Return the current environment name.

local_port property

local_port: int

Return the local port for this environment.

get_config

get_config() -> ConnectionConfig

Return connection config with trusted headers.

Source code in src/unblu_mcp/_internal/providers_k8s.py
def get_config(self) -> ConnectionConfig:
    """Return connection config with trusted headers."""
    return ConnectionConfig(
        base_url=f"http://localhost:{self._env_config.local_port}{self._env_config.api_path}",
        headers={
            "x-unblu-trusted-user-id": self._trusted_user_id,
            "x-unblu-trusted-user-role": self._trusted_user_role,
        },
    )

health_check async

health_check() -> bool

Check if the port-forward is healthy.

Source code in src/unblu_mcp/_internal/providers_k8s.py
async def health_check(self) -> bool:
    """Check if the port-forward is healthy."""
    return self._is_port_in_use()
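The body of `_is_port_in_use()` is not shown on this page. One common way to implement such a check is a short TCP connection attempt to the local port, sketched below; `is_port_in_use_sketch`, its `host` default, and its `timeout` are assumptions, not the library's code.

```python
import socket


def is_port_in_use_sketch(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising.
        return sock.connect_ex((host, port)) == 0
```

A check like this only confirms that some process is listening on the port; it does not verify that the listener is the expected port-forward or that the API behind it responds.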

setup async

setup() -> None

Start kubectl port-forward if not already running.

Uses a lock file to coordinate between multiple MCP server instances. Only one instance will start the port-forward; others will reuse it.

Source code in src/unblu_mcp/_internal/providers_k8s.py
async def setup(self) -> None:
    """Start kubectl port-forward if not already running.

    Uses a lock file to coordinate between multiple MCP server instances.
    Only one instance will start the port-forward; others will reuse it.
    """
    # Try to acquire exclusive lock for this environment's port-forward
    if self._try_acquire_lock():
        # We got the lock - we're responsible for the port-forward
        self._owns_port_forward = True
        _logger.debug("Acquired port-forward lock for %s", self._env_config.name)

        if not self._is_port_in_use():
            # Port not in use, start port-forward
            await self._start_port_forward()
        else:
            # Port already in use (orphaned from previous run?), reuse it
            _logger.debug("Port %d already in use, reusing", self._env_config.local_port)
    else:
        # Another instance owns the port-forward, just wait for port
        self._owns_port_forward = False
        _logger.debug(
            "Another instance owns port-forward for %s, waiting...",
            self._env_config.name,
        )
        await self._wait_for_port()
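The owner/follower split in `setup()` depends on a non-blocking attempt to take an exclusive lock. The source uses a `FileLock` object whose setup is not shown here; the sketch below substitutes plain exclusive file creation from the standard library to illustrate the same decision. The function names and the lock file name are hypothetical.

```python
import os
import tempfile
from pathlib import Path


def try_acquire_lock_sketch(lock_path: Path) -> bool:
    """Atomically create the lock file; return False if it already exists."""
    try:
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as handle:
        handle.write(str(os.getpid()))  # Record the owner for debugging.
    return True


def release_lock_sketch(lock_path: Path) -> None:
    """Remove the lock file; ignore the case where it is already gone."""
    lock_path.unlink(missing_ok=True)


lock_file = Path(tempfile.mkdtemp()) / "unblu-dev.lock"  # hypothetical lock name
first = try_acquire_lock_sketch(lock_file)   # owner: would start the port-forward
second = try_acquire_lock_sketch(lock_file)  # follower: would wait for the port
release_lock_sketch(lock_file)
third = try_acquire_lock_sketch(lock_file)   # lock is free again after release
release_lock_sketch(lock_file)
```

Unlike an OS-level file lock, a lock file created this way is left behind if the owning process crashes, so this substitute is suitable only as an illustration of the control flow.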

teardown async

teardown() -> None

Stop the port-forward process only if we own it.

Only the instance that acquired the lock and started the port-forward will terminate it. Other instances just release their reference.

Source code in src/unblu_mcp/_internal/providers_k8s.py
async def teardown(self) -> None:
    """Stop the port-forward process only if we own it.

    Only the instance that acquired the lock and started the port-forward
    will terminate it. Other instances just release their reference.
    """
    if self._owns_port_forward:
        # We own the port-forward, clean it up
        if self._port_forward_process is not None:
            _logger.debug("Stopping port-forward for %s", self._env_config.name)
            self._port_forward_process.terminate()
            try:
                self._port_forward_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self._port_forward_process.kill()
            self._port_forward_process = None

        # Release the lock
        self._release_lock()
    else:
        _logger.debug("Not owner, skipping port-forward cleanup for %s", self._env_config.name)
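The terminate, wait, then kill sequence used for the port-forward process can be tried in isolation. In the sketch below a sleeping Python child stands in for `kubectl port-forward`; `stop_process_sketch` is a hypothetical helper, and unlike the source it also waits after `kill()` so the child is reaped.

```python
import subprocess
import sys


def stop_process_sketch(process: subprocess.Popen, grace_seconds: float = 5.0) -> int:
    """Terminate, wait up to grace_seconds, then kill; return the exit code."""
    process.terminate()
    try:
        return process.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        process.kill()
        return process.wait()  # Reap the process so it does not linger.


# A sleeping Python child stands in for the port-forward process.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = stop_process_sketch(child)
```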

K8sEnvironmentConfig dataclass

K8sEnvironmentConfig(
    name: str,
    local_port: int,
    namespace: str,
    service: str = "haproxy",
    service_port: int = 8080,
    api_path: str = "/app/rest/v4",
)

Configuration for a Kubernetes environment.

Attributes:

api_path class-attribute instance-attribute

api_path: str = '/app/rest/v4'

API path prefix.

local_port instance-attribute

local_port: int

Local port for port-forwarding.

name instance-attribute

name: str

Environment name (e.g., dev, staging, prod).

namespace instance-attribute

namespace: str

Kubernetes namespace.

service class-attribute instance-attribute

service: str = 'haproxy'

Kubernetes service name.

service_port class-attribute instance-attribute

service_port: int = 8080

Service port to forward.
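To show how these fields fit together, the sketch below builds a `kubectl port-forward` argument list and the resulting base URL from a stand-in config. `EnvConfigSketch` copies the documented fields; the exact command the provider runs is not shown on this page, so `port_forward_command_sketch` is an assumed shape, and the `qa` values are made up.

```python
from dataclasses import dataclass


@dataclass
class EnvConfigSketch:
    """Stand-in with the same fields as K8sEnvironmentConfig."""

    name: str
    local_port: int
    namespace: str
    service: str = "haproxy"
    service_port: int = 8080
    api_path: str = "/app/rest/v4"


def port_forward_command_sketch(env: EnvConfigSketch) -> list[str]:
    """Build a kubectl port-forward argument list (assumed shape)."""
    return [
        "kubectl", "port-forward",
        "-n", env.namespace,
        f"svc/{env.service}",
        f"{env.local_port}:{env.service_port}",
    ]


def base_url_sketch(env: EnvConfigSketch) -> str:
    """Mirror the URL format used by K8sConnectionProvider.get_config()."""
    return f"http://localhost:{env.local_port}{env.api_path}"


qa = EnvConfigSketch(name="qa", local_port=8085, namespace="unblu-qa")  # hypothetical values
command = port_forward_command_sketch(qa)
url = base_url_sketch(qa)
```

Giving each environment a distinct `local_port` lets several port-forwards run side by side without colliding.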

OperationInfo

Bases: BaseModel

Brief information about an API operation.

OperationSchema

Bases: BaseModel

Full schema for an API operation.

ServiceInfo

Bases: BaseModel

Information about an API service category.

UnbluAPIRegistry

UnbluAPIRegistry(spec: dict[str, Any])

Registry for Unblu API operations parsed from OpenAPI spec.

Methods:

Source code in src/unblu_mcp/_internal/server.py
def __init__(self, spec: dict[str, Any]) -> None:
    self.spec = spec
    self.services: dict[str, ServiceInfo] = {}
    self.operations: dict[str, dict[str, Any]] = {}
    self.operations_by_service: dict[str, list[str]] = {}
    self._schema_cache: dict[str, dict[str, Any]] = {}
    self._parse_spec()

get_operation_schema

get_operation_schema(
    operation_id: str,
) -> OperationSchema | None

Get full schema for an operation.

Source code in src/unblu_mcp/_internal/server.py
def get_operation_schema(self, operation_id: str) -> OperationSchema | None:
    """Get full schema for an operation."""
    op = self.operations.get(operation_id)
    if not op:
        return None

    # Check cache first
    if operation_id in self._schema_cache:
        cached = self._schema_cache[operation_id]
        return OperationSchema(**cached)

    # Resolve $ref references in parameters and request body
    parameters = self._resolve_refs(op["parameters"])
    request_body = self._resolve_refs(op["request_body"]) if op["request_body"] else None

    schema = OperationSchema(
        operation_id=op["operation_id"],
        method=op["method"],
        path=op["path"],
        summary=op["summary"],
        description=op.get("description"),
        parameters=parameters,
        request_body=request_body,
        responses=op["responses"],
    )

    # Cache the resolved schema
    self._schema_cache[operation_id] = schema.model_dump()

    return schema
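The `_resolve_refs` helper is not shown on this page. As a rough idea of what resolving local `$ref` pointers involves, the sketch below walks a structure and replaces each `{"$ref": "#/..."}` object with its target from the spec. `resolve_refs_sketch` is hypothetical; it ignores JSON Pointer escaping and leaves circular references unresolved, which may differ from the real behavior.

```python
from typing import Any


def resolve_refs_sketch(node: Any, spec: dict[str, Any], _seen: frozenset = frozenset()) -> Any:
    """Recursively replace local {"$ref": "#/..."} objects with their targets."""
    if isinstance(node, list):
        return [resolve_refs_sketch(item, spec, _seen) for item in node]
    if not isinstance(node, dict):
        return node
    ref = node.get("$ref")
    if isinstance(ref, str) and ref.startswith("#/"):
        if ref in _seen:
            return node  # Leave circular references unresolved.
        target: Any = spec
        for part in ref[2:].split("/"):
            target = target[part]
        return resolve_refs_sketch(target, spec, _seen | {ref})
    return {key: resolve_refs_sketch(value, spec, _seen) for key, value in node.items()}


# Hypothetical spec fragment for illustration.
spec = {
    "components": {
        "schemas": {
            "Person": {"type": "object", "properties": {"id": {"type": "string"}}},
        }
    }
}
parameter = {"name": "body", "schema": {"$ref": "#/components/schemas/Person"}}
resolved = resolve_refs_sketch(parameter, spec)
```

Caching the resolved result, as `get_operation_schema` does, avoids repeating this walk for operations that are requested more than once.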

list_operations

list_operations(service: str) -> list[OperationInfo]

List operations for a specific service (case-insensitive).

Source code in src/unblu_mcp/_internal/server.py
def list_operations(self, service: str) -> list[OperationInfo]:
    """List operations for a specific service (case-insensitive)."""
    # Case-insensitive service lookup
    service_key = self._find_service_key(service)
    if service_key is None:
        return []
    op_ids = self.operations_by_service.get(service_key, [])
    result = []
    for op_id in op_ids:
        op = self.operations[op_id]
        result.append(
            OperationInfo(
                operation_id=op["operation_id"],
                method=op["method"],
                path=op["path"],
                summary=op["summary"],
            ),
        )
    return result
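The case-insensitive lookup relies on `_find_service_key`, which is not shown here. A minimal sketch of such a lookup, assuming it compares case-folded names and returns the stored key, might look like this; `find_service_key_sketch` and the sample data are hypothetical.

```python
from __future__ import annotations


def find_service_key_sketch(services: dict[str, object], name: str) -> str | None:
    """Return the stored key matching name regardless of case, else None."""
    wanted = name.casefold()
    for key in services:
        if key.casefold() == wanted:
            return key
    return None


operations_by_service = {"Conversations": ["conversationsGetById"], "Users": []}
key = find_service_key_sketch(operations_by_service, "conversations")
```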

list_services

list_services() -> list[ServiceInfo]

List all available API services.

Source code in src/unblu_mcp/_internal/server.py
def list_services(self) -> list[ServiceInfo]:
    """List all available API services."""
    return sorted(self.services.values(), key=lambda s: s.name)

search_operations

search_operations(
    query: str, limit: int = 20
) -> list[OperationInfo]

Search operations by name, path, or description.

Source code in src/unblu_mcp/_internal/server.py
def search_operations(self, query: str, limit: int = 20) -> list[OperationInfo]:
    """Search operations by name, path, or description."""
    query_lower = query.lower()
    results = []
    for op_id, op in self.operations.items():
        score = 0
        if query_lower in op_id.lower():
            score += 3
        if query_lower in op["path"].lower():
            score += 2
        if query_lower in op["summary"].lower():
            score += 1
        if query_lower in (op.get("description") or "").lower():
            score += 1
        if score > 0:
            results.append((score, op))

    results.sort(key=lambda x: -x[0])
    return [
        OperationInfo(
            operation_id=op["operation_id"],
            method=op["method"],
            path=op["path"],
            summary=op["summary"],
        )
        for _, op in results[:limit]
    ]
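The ranking weights a match in the operation ID at 3, in the path at 2, and in the summary or description at 1 each. The worked example below applies the same weights to a few made-up operations; `score_operation_sketch` is a hypothetical extraction of the loop body, and the operation IDs and paths are invented for illustration.

```python
def score_operation_sketch(query: str, op_id: str, op: dict) -> int:
    """Score one operation with the weights from search_operations."""
    q = query.lower()
    score = 0
    if q in op_id.lower():
        score += 3
    if q in op["path"].lower():
        score += 2
    if q in op["summary"].lower():
        score += 1
    if q in (op.get("description") or "").lower():
        score += 1
    return score


# Hypothetical operations; the IDs and paths are made up for illustration.
operations = {
    "usersGetById": {"path": "/users/{userId}", "summary": "Get a user", "description": None},
    "botsList": {"path": "/bots", "summary": "List bots", "description": "Bots visible to a user"},
    "teamsList": {"path": "/teams", "summary": "List teams", "description": None},
}
scores = {op_id: score_operation_sketch("user", op_id, op) for op_id, op in operations.items()}
```

Here `usersGetById` scores 6 (ID, path, and summary all match), `botsList` scores 1 (description only), and `teamsList` scores 0 and would be dropped. Because matching is by substring, a multi-word query matches only where that exact phrase appears, and operations with equal scores keep their original order since Python's sort is stable.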

create_server

create_server(
    spec_path: str | Path | None = None,
    base_url: str | None = None,
    api_key: str | None = None,
    username: str | None = None,
    password: str | None = None,
    provider: ConnectionProvider | None = None,
    policy_file: str | Path | None = None,
) -> FastMCP

Create the Unblu MCP server with progressive disclosure tools.

Parameters:

  • spec_path (str | Path | None, default: None ) –

    Path to swagger.json. Defaults to swagger.json in package root.

  • base_url (str | None, default: None ) –

    Unblu API base URL. Defaults to UNBLU_BASE_URL env var.

  • api_key (str | None, default: None ) –

    API key for authentication. Defaults to UNBLU_API_KEY env var.

  • username (str | None, default: None ) –

    Username for basic auth. Defaults to UNBLU_USERNAME env var.

  • password (str | None, default: None ) –

    Password for basic auth. Defaults to UNBLU_PASSWORD env var.

  • provider (ConnectionProvider | None, default: None ) –

    Optional connection provider for complex connectivity (e.g., K8s port-forward). If provided, overrides base_url/api_key/username/password.

  • policy_file (str | Path | None, default: None ) –

    Optional path to Eunomia policy JSON file for authorization. Requires the 'safety' extra: pip install unblu-mcp[safety]

Returns:

  • FastMCP

    Configured FastMCP server instance.
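The server ties the provider's lifecycle to its own through an async context manager: `setup()` runs before serving and `teardown()` runs afterwards, even on failure. The sketch below reproduces that pattern without FastMCP; `RecordingProviderSketch` and `make_lifespan_sketch` are hypothetical names, and the simulated crash stands in for an error while the server is running.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class RecordingProviderSketch:
    """Hypothetical provider that records the order of lifecycle calls."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def setup(self) -> None:
        self.events.append("setup")

    async def teardown(self) -> None:
        self.events.append("teardown")


def make_lifespan_sketch(provider: RecordingProviderSketch):
    @asynccontextmanager
    async def lifespan(_app: object) -> AsyncIterator[None]:
        await provider.setup()
        try:
            yield
        finally:
            await provider.teardown()  # Runs even if the server body raises.

    return lifespan


async def demo() -> list[str]:
    provider = RecordingProviderSketch()
    lifespan = make_lifespan_sketch(provider)
    try:
        async with lifespan(None):
            provider.events.append("serving")
            raise RuntimeError("simulated crash")
    except RuntimeError:
        pass
    return provider.events


events = asyncio.run(demo())
```

The `try`/`finally` around `yield` is what guarantees cleanup, which matters most for providers that own external processes such as a port-forward.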

Source code in src/unblu_mcp/_internal/server.py
def create_server(
    spec_path: str | Path | None = None,
    base_url: str | None = None,
    api_key: str | None = None,
    username: str | None = None,
    password: str | None = None,
    provider: ConnectionProvider | None = None,
    policy_file: str | Path | None = None,
) -> FastMCP:
    """Create the Unblu MCP server with progressive disclosure tools.

    Args:
        spec_path: Path to swagger.json. Defaults to swagger.json in package root.
        base_url: Unblu API base URL. Defaults to UNBLU_BASE_URL env var.
        api_key: API key for authentication. Defaults to UNBLU_API_KEY env var.
        username: Username for basic auth. Defaults to UNBLU_USERNAME env var.
        password: Password for basic auth. Defaults to UNBLU_PASSWORD env var.
        provider: Optional connection provider for complex connectivity (e.g., K8s port-forward).
                  If provided, overrides base_url/api_key/username/password.
        policy_file: Optional path to Eunomia policy JSON file for authorization.
                     Requires the 'safety' extra: pip install unblu-mcp[safety]

    Returns:
        Configured FastMCP server instance.
    """
    from unblu_mcp._internal.providers import DefaultConnectionProvider  # noqa: PLC0415

    # Use provider if given, otherwise create default provider from args/env
    if provider is None:
        provider = DefaultConnectionProvider(
            base_url=base_url,
            api_key=api_key,
            username=username,
            password=password,
        )

    # Create lifespan context manager to handle provider setup/teardown
    # This is critical for K8s provider which needs to start port-forward on startup
    @asynccontextmanager
    async def lifespan(_mcp: FastMCP) -> AsyncIterator[None]:
        await provider.setup()
        try:
            yield
        finally:
            await provider.teardown()

    # Get connection config from provider
    # Note: For K8s provider, the port may not be available yet until setup() is called
    # but get_config() just returns the expected URL, it doesn't connect
    config = provider.get_config()

    # Load OpenAPI spec
    if spec_path is None:
        # Try to load from package resources first (works when installed from PyPI)
        try:
            spec_file = importlib.resources.files("unblu_mcp").joinpath("swagger.json")
            spec_content = spec_file.read_text(encoding="utf-8")
            spec = json.loads(spec_content)
        except (FileNotFoundError, TypeError):
            # Fall back to file system for development
            candidates = [
                Path.cwd() / "swagger.json",
            ]
            for candidate in candidates:
                if candidate.exists():
                    with open(candidate, encoding="utf-8") as f:
                        spec = json.load(f)
                    break
            else:
                raise FileNotFoundError("swagger.json not found. Please provide spec_path.")
    else:
        with open(spec_path, encoding="utf-8") as f:
            spec = json.load(f)

    registry = UnbluAPIRegistry(spec)

    # Create HTTP client from provider config
    client = httpx.AsyncClient(
        base_url=config.base_url,
        headers=config.headers,
        auth=config.auth,
        timeout=config.timeout,
    )

    # Create FastMCP server with lifespan for provider setup/teardown
    # mask_error_details=True hides internal errors from clients (they still get logged server-side)
    mcp = FastMCP(
        name="unblu-mcp",
        lifespan=lifespan,
        mask_error_details=True,
        instructions="""Unblu MCP Server - Token-Efficient API Access

This server provides access to 300+ Unblu API endpoints using progressive disclosure
to minimize token usage. Instead of loading all tool definitions upfront, use these
discovery tools to find and execute the operations you need:

1. list_services() - See available API service categories
2. list_operations(service) - See operations in a specific service
3. search_operations(query) - Find operations by keyword
4. get_operation_schema(operation_id) - Get full details for an operation
5. call_api(operation_id, ...) - Execute any API operation

Example workflow:
1. list_services() to see categories like "Conversations", "Users", "Bots"
2. list_operations("Conversations") to see available conversation operations
3. get_operation_schema("conversationsGetById") to see required parameters
4. call_api("conversationsGetById", path_params={"conversationId": "abc123"})
""",
    )

    # Add response caching for discovery tools (static spec data)
    # Cache list_services, list_operations, search_operations, get_operation_schema
    # but NOT call_api (live API data)
    mcp.add_middleware(
        ResponseCachingMiddleware(
            call_tool_settings=CallToolSettings(
                included_tools=[
                    "list_services",
                    "list_operations",
                    "search_operations",
                    "get_operation_schema",
                ],
            ),
        )
    )

    # Add error handling middleware for consistent error logging and tracking
    # This catches exceptions, logs them, and converts to proper MCP error responses
    mcp.add_middleware(ErrorHandlingMiddleware())

    # Add logging middleware for observability
    # Logs tool calls with payloads to help identify usage patterns and errors
    mcp.add_middleware(
        LoggingMiddleware(
            include_payloads=True,
            max_payload_length=1000,
        )
    )

    # Add Eunomia authorization middleware if policy file is provided
    if policy_file is not None:
        try:
            from eunomia_mcp import create_eunomia_middleware  # noqa: PLC0415

            policy_path = Path(policy_file) if isinstance(policy_file, str) else policy_file
            if not policy_path.exists():
                raise FileNotFoundError(f"Policy file not found: {policy_path}")
            middleware = create_eunomia_middleware(policy_file=str(policy_path))
            mcp.add_middleware(middleware)
        except ImportError as e:
            raise ImportError(
                "eunomia-mcp is required for policy-based authorization. Install with: pip install unblu-mcp[safety]"
            ) from e

    @mcp.tool(
        annotations={
            "title": "List API Services",
            "readOnlyHint": True,
            "openWorldHint": False,
        },
    )
    async def list_services() -> list[dict[str, Any]]:
        """List all available Unblu API service categories.

        Returns a list of services (API tags) with their descriptions and
        operation counts. Use this to discover what API capabilities are available.

        Returns:
            List of services with name, description, and operation_count.
        """
        try:
            services = registry.list_services()
            return [s.model_dump() for s in services]
        except Exception as e:
            raise ToolError(f"Failed to list services: {e}") from e

    @mcp.tool(
        annotations={
            "title": "List Service Operations",
            "readOnlyHint": True,
            "openWorldHint": False,
        },
    )
    async def list_operations(service: str) -> list[dict[str, Any]]:
        """List all operations available in a specific service.

        Args:
            service: Service name (e.g., "Conversations", "Users", "Bots").
                    Use list_services() to see available services.

        Returns:
            List of operations with operation_id, method, path, and summary.
        """
        operations = registry.list_operations(service)
        if not operations:
            available = [s.name for s in registry.list_services()][:5]
            raise ToolError(f"Service '{service}' not found. Available services include: {available}")
        try:
            return [op.model_dump() for op in operations]
        except Exception as e:
            raise ToolError(f"Failed to list operations for '{service}': {e}") from e

    @mcp.tool(
        annotations={
            "title": "Search Operations",
            "readOnlyHint": True,
            "openWorldHint": False,
        },
    )
    async def search_operations(query: str, limit: int = 20) -> list[dict[str, Any]]:
        """Search for API operations by keyword.

        Searches operation IDs, paths, summaries, and descriptions.

        Args:
            query: Search term (e.g., "conversation", "create user", "bot").
            limit: Maximum number of results to return (default 20).

        Returns:
            List of matching operations with operation_id, method, path, and summary.
        """
        try:
            operations = registry.search_operations(query, limit)
            return [op.model_dump() for op in operations]
        except Exception as e:
            raise ToolError(f"Failed to search operations: {e}") from e

    @mcp.tool(
        annotations={
            "title": "Get Operation Schema",
            "readOnlyHint": True,
            "openWorldHint": False,
        },
    )
    async def get_operation_schema(operation_id: str) -> dict[str, Any]:
        """Get the full schema for a specific API operation.

        Use this to understand the required parameters and request body
        before calling an operation.

        Args:
            operation_id: The operation ID (e.g., "conversationsGetById").
                         Use list_operations() or search_operations() to find IDs.

        Returns:
            Full operation schema including parameters, request body, and responses.
        """
        schema = registry.get_operation_schema(operation_id)
        if not schema:
            raise ToolError(
                f"Operation '{operation_id}' not found. Use search_operations() to find valid operation IDs."
            )
        try:
            return schema.model_dump()
        except Exception as e:
            raise ToolError(f"Failed to get schema for '{operation_id}': {e}") from e

    @mcp.tool(
        annotations={
            "title": "Execute API Call",
            "readOnlyHint": False,
            "destructiveHint": True,
            "idempotentHint": False,
            "openWorldHint": True,
        },
    )
    async def call_api(
        operation_id: str,
        path_params: dict[str, str] | None = None,
        query_params: dict[str, Any] | None = None,
        body: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        fields: list[str] | None = None,
        max_response_size: int | None = None,
    ) -> dict[str, Any]:
        """Execute an Unblu API operation.

        Args:
            operation_id: The operation ID to execute.
            path_params: Path parameters (e.g., {"conversationId": "abc123"}).
            query_params: Query string parameters.
            body: Request body for POST/PUT/PATCH operations.
            headers: Additional headers to include.
            fields: Optional list of field paths to include in response (e.g., ["id", "name", "items.id"]).
            max_response_size: Maximum response size in bytes (approximate). Responses exceeding this
                             will be truncated and marked as truncated.

        Returns:
            API response as JSON, or error details if the request failed.
        """
        op = registry.operations.get(operation_id)
        if not op:
            raise ToolError(
                f"Operation '{operation_id}' not found. Use search_operations() to find valid operation IDs."
            )

        def _filter_fields(data: Any, fields: list[str]) -> Any:
            """Filter response data to include only specified field paths.

            Args:
                data: The response data to filter
                fields: List of field paths (e.g., ["id", "name", "items.id"])

            Returns:
                Filtered data with only the requested fields
            """
            if not fields or not isinstance(data, dict):
                return data

            result = {}
            for field_path in fields:
                parts = field_path.split(".")
                current = data
                current_result = result

                # Navigate through the path
                for i, part in enumerate(parts):
                    if isinstance(current, dict) and part in current:
                        if i == len(parts) - 1:
                            # Last part - include the value
                            current_result[part] = current[part]
                        else:
                            # Intermediate part - ensure dict exists
                            if part not in current_result:
                                current_result[part] = {}
                            current_result = current_result[part]
                            current = current[part]
                    else:
                        # Path doesn't exist - skip
                        break

            return result

        # Build URL with path parameters
        path = op["path"]
        if path_params:
            for key, value in path_params.items():
                path = path.replace(f"{{{key}}}", str(value))

        # Check for unresolved path parameters
        if "{" in path:
            missing = re.findall(r"\{(\w+)\}", path)[:3]
            raise ToolError(
                f"Missing required path parameters: {missing}. Use get_operation_schema() to see required parameters."
            )

        # Build request
        method = op["method"].lower()
        request_headers = dict(headers or {})

        try:
            response = await client.request(
                method=method,
                url=path,
                params=query_params,
                json=body if body else None,
                headers=request_headers,
            )

            # Parse response
            if response.status_code == _HTTP_NO_CONTENT:
                return {"status": "success", "status_code": _HTTP_NO_CONTENT}

            try:
                raw_text = response.text
                data = json.loads(raw_text)
            except json.JSONDecodeError:
                data = {"raw": response.text[:200]}

            # Apply field filtering if requested
            if fields and response.is_success and isinstance(data, dict):
                data = _filter_fields(data, fields)

            # Check response size and truncate if necessary
            if max_response_size and response.is_success:
                response_str = json.dumps(data, separators=(",", ":"))
                if len(response_str.encode("utf-8")) > max_response_size:
                    # Truncate to fit within limit
                    truncated_data = {
                        "_truncated": True,
                        "_size": len(response_str.encode("utf-8")),
                        "_limit": max_response_size,
                        "data": None,
                    }

                    # Try to include a summary or first few items
                    if isinstance(data, dict):
                        truncated_data["data"] = {"keys": list(data.keys())[:10]}
                    elif isinstance(data, list):
                        truncated_data["data"] = {"count": len(data), "first_items": data[:3] if data else []}

                    data = truncated_data

            if response.is_success:
                return {"status": "success", "status_code": response.status_code, "data": data}
            return {
                "status": "error",
                "code": response.status_code,
                "error": str(data)[:300] if data else "Unknown error",
            }

        except httpx.RequestError as e:
            raise ToolError(f"API request failed: {e!s}") from e

    return mcp
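
The field filtering applied by call_api can be tried in isolation. The following is a minimal sketch that copies the logic of the nested _filter_fields helper into a standalone function; the name filter_fields and the sample payload are hypothetical and only serve the illustration. Read this way, the helper walks nested dicts only, so a dotted path that passes through a list (or any other non-dict value) stops early and leaves an empty placeholder dict behind.

```python
from typing import Any


def filter_fields(data: Any, fields: list[str]) -> Any:
    """Keep only the requested dotted field paths (standalone copy for illustration)."""
    if not fields or not isinstance(data, dict):
        return data

    result: dict[str, Any] = {}
    for field_path in fields:
        parts = field_path.split(".")
        current = data
        current_result = result
        for i, part in enumerate(parts):
            if isinstance(current, dict) and part in current:
                if i == len(parts) - 1:
                    # Last segment: copy the value over.
                    current_result[part] = current[part]
                else:
                    # Intermediate segment: descend, creating the dict if needed.
                    if part not in current_result:
                        current_result[part] = {}
                    current_result = current_result[part]
                    current = current[part]
            else:
                # Path does not exist (or crosses a non-dict value): skip it.
                break
    return result


# Hypothetical response payload, not taken from the Unblu API.
payload = {
    "id": "c1",
    "name": "Support",
    "owner": {"id": "u1", "email": "someone@example.com"},
    "items": [{"id": 1}, {"id": 2}],
}

selected = filter_fields(payload, ["id", "owner.id", "missing.x"])
through_list = filter_fields(payload, ["items.id"])
```

Here selected contains only id and the nested owner.id, while through_list ends up as {"items": {}} because the path crosses a list. Callers who need list contents would likely request the parent field ("items") instead.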

detect_environment_from_context

detect_environment_from_context() -> str | None

Detect the environment from the current kubectl context.

Matches environment names from the loaded configuration against patterns in the current kubectl context name.

Returns:

  • str | None

    Environment name or None if not detected.

Source code in src/unblu_mcp/_internal/providers_k8s.py
def detect_environment_from_context() -> str | None:
    """Detect the environment from the current kubectl context.

    Matches environment names from the loaded configuration against
    patterns in the current kubectl context name.

    Returns:
        Environment name or None if not detected.
    """
    try:
        result = subprocess.run(
            ["kubectl", "config", "current-context"],  # noqa: S607
            capture_output=True,
            text=True,
            check=True,
        )
        context = result.stdout.strip()

        # Match against configured environment names
        environments = _get_default_environments()
        for env in environments:
            if f"-{env}-" in context or context.endswith(f"-{env}"):
                return env
        return None  # noqa: TRY300
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None
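
The matching rule used above can be shown without calling kubectl. This is a small sketch, assuming the environment names are passed in as a plain list; the function name match_environment and the context names are hypothetical. An environment matches only when it appears as a hyphen-delimited segment in the middle of the context name or as its final segment.

```python
from __future__ import annotations


def match_environment(context: str, environments: list[str]) -> str | None:
    """Return the first environment that appears as a '-env-' or trailing '-env' segment."""
    for env in environments:
        if f"-{env}-" in context or context.endswith(f"-{env}"):
            return env
    return None


# Hypothetical environment names and kubectl context names.
envs = ["dev", "staging", "prod"]

middle = match_environment("acme-dev-cluster", envs)   # segment in the middle
suffix = match_environment("acme-cluster-prod", envs)  # segment at the end
prefix_only = match_environment("dev-cluster", envs)   # no leading hyphen, so no match
substring = match_environment("acme-devbox", envs)     # "dev" is only a substring
```

Note that a context that starts with the environment name (such as "dev-cluster") is not matched by this rule, since both patterns require a leading hyphen.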

get_parser

get_parser() -> ArgumentParser

Return the CLI argument parser.

Returns:

  • ArgumentParser

    An argparse parser.

Source code in src/unblu_mcp/_internal/cli.py
def get_parser() -> argparse.ArgumentParser:
    """Return the CLI argument parser.

    Returns:
        An argparse parser.
    """
    parser = argparse.ArgumentParser(
        prog="unblu-mcp",
        description="Unblu MCP Server - Token-efficient access to Unblu API",
    )
    parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {debug._get_version()}")
    parser.add_argument("--debug-info", action=_DebugInfo, help="Print debug information.")
    parser.add_argument(
        "--spec",
        type=str,
        default=None,
        help="Path to swagger.json OpenAPI spec file.",
    )
    parser.add_argument(
        "--policy",
        type=str,
        default=None,
        help="Path to Eunomia policy JSON file for authorization. Requires unblu-mcp[safety].",
    )
    parser.add_argument(
        "--provider",
        type=str,
        choices=["default", "k8s"],
        default="default",
        help="Connection provider to use (default: default).",
    )
    parser.add_argument(
        "--environment",
        type=str,
        default="dev",
        help="K8s environment name (only with --provider k8s). Default: dev.",
    )
    parser.add_argument(
        "--k8s-config",
        type=str,
        default=None,
        help="Path to K8s environments YAML config file (only with --provider k8s).",
    )
    return parser
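
To see how the provider-related options interact, the sketch below rebuilds a reduced parser with only --provider, --environment, and --k8s-config, using the same types, choices, and defaults as the listing above. It does not import unblu_mcp; the function name build_parser is hypothetical, and the version, debug, spec, and policy options are left out.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Reduced stand-in for the CLI parser (illustration only)."""
    parser = argparse.ArgumentParser(prog="unblu-mcp")
    parser.add_argument("--provider", type=str, choices=["default", "k8s"], default="default")
    parser.add_argument("--environment", type=str, default="dev")
    parser.add_argument("--k8s-config", type=str, default=None)
    return parser


# Explicit K8s provider with a non-default environment.
opts = build_parser().parse_args(["--provider", "k8s", "--environment", "staging"])

# No options at all: every value falls back to its default.
defaults = build_parser().parse_args([])

# argparse exposes "--k8s-config" as the attribute "k8s_config".
print(opts.provider, opts.environment, opts.k8s_config)
```

Because --provider declares choices, any other value makes argparse exit with a usage error before a provider is created.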

get_server

get_server() -> FastMCP

Get or create the global server instance.

Source code in src/unblu_mcp/_internal/server.py
def get_server() -> FastMCP:
    """Get or create the global server instance."""
    if _ServerHolder.instance is None:
        _ServerHolder.instance = create_server()
    return _ServerHolder.instance

main

main(args: list[str] | None = None) -> int

Run the main program.

This function is executed when you type unblu-mcp or python -m unblu_mcp.

Parameters:

  • args (list[str] | None, default: None ) –

    Arguments passed from the command line.

Returns:

  • int

    An exit code.

Source code in src/unblu_mcp/_internal/cli.py
def main(args: list[str] | None = None) -> int:
    """Run the main program.

    This function is executed when you type `unblu-mcp` or `python -m unblu_mcp`.

    Parameters:
        args: Arguments passed from the command line.

    Returns:
        An exit code.
    """
    parser = get_parser()
    if args == []:
        parser.print_help()
        return 0
    opts = parser.parse_args(args=args)

    provider = _get_provider(opts.provider, opts.environment, opts.k8s_config)
    server = _create_server(spec_path=opts.spec, policy_file=opts.policy, provider=provider)
    server.run()
    return 0