Prisma Cloud Docs

Provides search access to Prisma Cloud documentation. The server crawls and indexes pages from the main docs and the API documentation, caches them with TTL expiration, and uses relevance scoring to return structured results with snippets and URLs.

database · Python
0 Tools
10 Findings
3 Stars
Last Scanned: Mar 22, 2026
2 critical · 6 high · 1 medium · 1 low findings detected

Security Category Deep Dive

Prompt Injection
Prompt & context manipulation attacks
Maturity: 69
Rules: 14
Sub-Categories: 5
Gaps: 1
Implemented: 64%
Tests: 56
Stories: 1
PI-DIR: Direct Input Injection
100% · 3 rules
Injection via tool descriptions and parameter fields
GAP-001: Prompt Injection Coverage Gap. Missing detection coverage for emerging prompt injection attack variants not addressed by current rules
PI-IND: Indirect / Gateway Injection
100% · 4 rules
Hidden instructions via external content and tool responses
PI-CTX: Context Manipulation
100% · 2 rules
Context window saturation and prior-approval exploitation
PI-ENC: Encoding & Obfuscation
100% · 3 rules
Payload hiding via invisible chars, base64, schema fields
PI-TPL: Template & Output Poisoning
100% · 2 rules
Injection via prompt templates and runtime tool output
Framework Coverage
OWASP MCP Top 10: 14/14
MITRE ATLAS: 14/14
CoSAI MCP: 2/14
OWASP Agentic Top 10: 12/14
Kill Chain Phases
Initial Access: 0
Defense Evasion: 0
Execution: 0
Persistence: 0

Findings (10)

2 critical · 6 high · 1 medium · 1 low

Critical (2)

critical · Q13 · MCP Bridge Package Supply Chain Attack · MCP10-supply-chain · AML.T0054
Pattern "(?:mcp|fastmcp|langchain-mcp|llama-index-mcp)(?:>=|~=|==)?(?!\d)" matched in source_code: "mcp" (at position 90)
MCP bridge packages (mcp-remote, mcp-proxy, @modelcontextprotocol/sdk, fastmcp) are high-value supply chain targets — CVE-2025-6514 (CVSS 9.6) in mcp-remote affected 437,000+ installs. Always pin exact versions (no ^ or ~ ranges). Use lockfiles (package-lock.json, pnpm-lock.yaml, uv.lock). Never run `npx mcp-remote` without version pinning. Verify package integrity with `npm audit` or `pip-audit` before deployment. Reference: CVE-2025-6514, OWASP ASI04.
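Because the scanned server is Python, the pinning advice can be checked mechanically. The sketch below is one possible check, assuming dependencies live in a simple `requirements.txt`; the function name `unpinned_requirements` is hypothetical and the version numbers in the usage example are placeholders, not recommended releases.

```python
import re
from typing import List

# An exact pin is: a package name, optional extras, "==", and a concrete
# version with no wildcard. Anything else (bare name, >=, ~=, ==1.*) fails.
_EXACT_PIN = re.compile(r"^[A-Za-z0-9._-]+(\[[^\]]+\])?==[0-9][A-Za-z0-9.!+_-]*$")


def unpinned_requirements(requirements_text: str) -> List[str]:
    """Return requirement lines that are not pinned to one exact version."""
    unpinned = []
    for raw_line in requirements_text.splitlines():
        # Drop inline comments, surrounding whitespace, and line continuations.
        line = raw_line.split("#", 1)[0].strip().rstrip("\\").strip()
        if not line or line.startswith("-"):
            continue  # blank lines and pip options such as --hash=...
        # Ignore environment markers and inline --hash options.
        spec = line.split(";", 1)[0].split(" --hash", 1)[0].replace(" ", "")
        if not _EXACT_PIN.match(spec):
            unpinned.append(line)
    return unpinned


if __name__ == "__main__":
    sample = "mcp\naiohttp>=3.9\nbeautifulsoup4==4.12.3\n"  # placeholder versions
    print(unpinned_requirements(sample))
```

A check like this could run in CI next to `pip-audit`; it does not replace a lockfile such as `uv.lock`, which also pins transitive dependencies.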
critical · L7 · Transitive MCP Server Delegation · MCP06-excessive-permissions · AML.T0054
Pattern "(?:connect|initialize).*(?:mcp|modelcontextprotocol).*(?:server|endpoint|url)" matched in source_code: "Initialize MCP server" (at position 326)
MCP servers MUST NOT create client connections to other MCP servers without explicit user disclosure. If delegation is required, declare all downstream servers in the server's capabilities and tool descriptions. Never forward user credentials to sub-servers. Implement a trust boundary between the approved server and any delegated servers. Log all transitive delegations for audit.

High (6)

high · D1 · Known CVEs in Dependencies · MCP08-dependency-vuln
Dependency "mcp@null" has known CVEs:
Update dependencies to versions that patch known CVEs. Run 'npm audit fix' or 'pip-audit' to identify and resolve vulnerable dependencies.
high · K11 · Missing Server Integrity Verification · MCP10-supply-chain · AML.T0054
Pattern "(connect|load|register|add)[_\s-]?(mcp|server|tool)(?!.*(?:verify|validate|checksum|hash|sign|cert|fingerprint|pin))" matched in source_code: "Register tool" (at position 7503)
Implement cryptographic verification for MCP server connections: (1) Pin server TLS certificates or public keys, (2) Verify server tool definition checksums against a known-good manifest, (3) Use package manager integrity checks (npm integrity, pip --require-hashes). The MCP spec recommends but doesn't yet mandate server signing — implement it proactively. Required by ISO 27001 A.8.24 and CoSAI MCP-T6.
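Item (2) above, checking tool definitions against a known-good manifest, might look roughly like the following. This is a sketch only: the manifest format (tool name mapped to a SHA-256 hex digest) and the function names are assumptions, and the MCP specification does not define them.

```python
import hashlib
import hmac
import json
from typing import Dict, List


def tool_definition_digest(tool_definition: dict) -> str:
    """SHA-256 over a canonical JSON encoding (sorted keys, compact separators)."""
    canonical = json.dumps(tool_definition, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def verify_tools(tools: List[dict], manifest: Dict[str, str]) -> List[str]:
    """Return names of tools missing from the manifest or whose digest differs."""
    failures = []
    for tool in tools:
        name = tool.get("name", "")
        expected = manifest.get(name)
        actual = tool_definition_digest(tool)
        # compare_digest avoids leaking how many leading characters matched.
        if expected is None or not hmac.compare_digest(expected, actual):
            failures.append(name)
    return failures
```

A client would build the manifest once from a reviewed server version and refuse to use any tool returned by `verify_tools`. Canonical JSON matters here: without sorted keys, two equivalent definitions could hash differently.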
high · C3 · Server-Side Request Forgery (SSRF) · MCP04-data-exfiltration · AML.T0057
Pattern "session\.(?:get|post|put|delete|patch|request)\s*\([^)]*(?:req|request|input|param|params|args|url|uri|href|target|endpoint|host)" matched in source_code: "session.get(url" (at position 1819)
Validate ALL user-supplied URLs before making HTTP requests:
1. Parse the URL and check the hostname against an explicit allowlist of permitted domains.
2. Block requests to RFC 1918 private ranges: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16.
3. Block loopback (127.0.0.0/8), link-local (169.254.0.0/16), and IPv6 equivalents.
4. Block file:// and other non-http(s) protocols explicitly.
5. Disable automatic redirect following, or re-validate each redirect destination.
6. In cloud environments: block requests to IMDS endpoints (169.254.169.254, metadata.google.internal) at both the application AND network layer.
Example (Node.js): Use the `ssrf-req-filter` package or implement URL validation against an allowlist before calling fetch/axios/got.
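Since the scanned server is written in Python, a Python counterpart to the Node.js suggestion may be more directly useful. The sketch below covers steps 1 to 4 and 6 using only the standard library. The allowlist is taken from the two base URLs in the scanned source; the names `ALLOWED_HOSTS`, `is_public_address`, and `validate_url` are hypothetical.

```python
import ipaddress
import socket
from urllib.parse import urlparse

# Hosts from the base URLs configured in the scanned server.
ALLOWED_HOSTS = {"docs.prismacloud.io", "pan.dev"}


def is_public_address(address: str) -> bool:
    """False for private, loopback, link-local (incl. IMDS) and similar ranges."""
    ip = ipaddress.ip_address(address)
    return not (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    )


def validate_url(url: str, resolve: bool = True) -> str:
    """Return the URL if it passes the checks, otherwise raise ValueError."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"scheme not allowed: {parsed.scheme!r}")
    host = (parsed.hostname or "").lower()
    if host not in ALLOWED_HOSTS:
        raise ValueError(f"host not in allowlist: {host!r}")
    if resolve:
        # Check every address the name resolves to, not only the first.
        for info in socket.getaddrinfo(host, parsed.port or 443):
            address = info[4][0]
            if not is_public_address(address):
                raise ValueError(f"host resolves to a non-public address: {address}")
    return url
```

For step 5, aiohttp accepts `allow_redirects=False` on `session.get(...)`, so each redirect target could be passed through `validate_url` before it is followed. Note that resolving the name here and again inside the HTTP client leaves a small DNS rebinding window, which is one reason the advice above also calls for network-layer blocking.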
high · K13 · Unsanitized Tool Output · MCP02-tool-poisoning · AML.T0054
Pattern "(?:query|execute|select|find).*(?:return|respond|result|rows|data)(?!.*(?:sanitize|escape|encode|map|filter|select|pick))" matched in source_code: "query -> (result" (at position 800)
Sanitize all external data before including in tool responses. Implement output encoding that neutralizes prompt injection patterns. Truncate excessively long content. Validate structure before passing database results. Apply the principle: treat all external data as untrusted, even in tool outputs. Required by CoSAI MCP-T4.
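A minimal sketch of the truncation and structure-validation parts of this advice, applied to the search result fields the scanned server returns (`title`, `url`, `site`, `snippet`, `score`). The function names and the 120-character title limit are assumptions. It strips invisible and control characters but does not attempt to detect natural-language instructions, which would need a separate control.

```python
import re
import unicodedata

MAX_SNIPPET_CHARS = 200  # same snippet length the scanned server uses


def sanitize_text(text: str, max_chars: int = MAX_SNIPPET_CHARS) -> str:
    """Normalize, drop invisible/control characters, collapse spaces, truncate."""
    text = unicodedata.normalize("NFKC", text)
    kept = []
    for ch in text:
        if ch in "\n\r\t":
            kept.append(" ")  # keep line structure as plain spaces
        elif unicodedata.category(ch) not in ("Cc", "Cf"):
            kept.append(ch)  # Cc = control, Cf = format (zero-width, bidi)
    cleaned = re.sub(r"\s+", " ", "".join(kept)).strip()
    if len(cleaned) > max_chars:
        cleaned = cleaned[:max_chars] + "..."
    return cleaned


def sanitize_result(result: dict) -> dict:
    """Keep only the expected fields, coerced to the expected types."""
    return {
        "title": sanitize_text(str(result.get("title", "")), 120),
        "url": str(result.get("url", "")),
        "site": str(result.get("site", "")),
        "snippet": sanitize_text(str(result.get("snippet", ""))),
        "score": int(result.get("score", 0)),
    }
```

Each result would pass through `sanitize_result` before `json.dumps`, so unexpected keys from a future code path never reach the model.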
high · C7 · Wildcard CORS · MCP07-insecure-config
Pattern "allow_origins\s*=\s*\[\s*['"]\*['"]\s*\]" matched in source_code: "allow_origins=["*"]" (at position 9713)
Replace wildcard CORS with an explicit allowlist of permitted origins. Wildcard CORS allows any website to make requests to the MCP server.
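One way to build such an allowlist is to read it from configuration and reject wildcards at startup. In this sketch the environment variable name `ALLOWED_ORIGINS` and the helper `parse_allowed_origins` are hypothetical; the remaining keyword arguments mirror the `CORSMiddleware` call quoted from the scanned source.

```python
import os
from typing import List, Optional
from urllib.parse import urlparse


def parse_allowed_origins(raw: Optional[str]) -> List[str]:
    """Parse a comma-separated origin list; reject wildcards and plain HTTP."""
    origins = []
    for item in (raw or "").split(","):
        origin = item.strip().rstrip("/")
        if not origin:
            continue
        if "*" in origin:
            raise ValueError("wildcard origins are not allowed")
        parsed = urlparse(origin)
        is_local_dev = parsed.scheme == "http" and parsed.hostname == "localhost"
        if not (parsed.scheme == "https" and parsed.hostname) and not is_local_dev:
            raise ValueError(f"origin must use https: {origin}")
        origins.append(origin)
    return origins


# ALLOWED_ORIGINS is an assumed variable name, e.g. "https://app.example.com".
allowed_origins = parse_allowed_origins(os.environ.get("ALLOWED_ORIGINS"))

# These keyword arguments would replace allow_origins=["*"] in the server's
# app.add_middleware(CORSMiddleware, ...) call.
cors_kwargs = {
    "allow_origins": allowed_origins,
    "allow_credentials": True,
    "allow_methods": ["GET", "POST", "OPTIONS"],
    "allow_headers": ["*"],
    "expose_headers": ["mcp-session-id", "mcp-protocol-version"],
    "max_age": 86400,
}
```

With an empty or unset variable the list is empty, so no cross-origin browser client is allowed until an operator opts one in.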
high · O8 · Timing-Based Covert Channel · MCP04-data-exfiltration · AML.T0057
Pattern "(?:delay|sleep|timeout|interval)\s*[:=]\s*(?:[^;]*(?:secret|token|password|credential|key|env))" matched in source_code: "timeout=10) as response: [...] results.sort(key" (at position 1836)
Remove all code that calculates sleep/delay durations from application data, secrets, or any variable-length content. Tool response times should be constant or determined only by legitimate processing time. If rate limiting is needed, use fixed intervals not derived from data values. Monitor for anomalous response time patterns that could indicate timing-based exfiltration.

Medium (1)

medium · K17 · Missing Timeout or Circuit Breaker · MCP07-insecure-config · AML.T0054
Pattern "(?:query|execute|find|select|aggregate)\s*\((?!.*(?:timeout|maxTimeMS|statement_timeout|deadline|cancel))" matched in source_code: "find(" (at position 2166)
Add timeouts to ALL external calls: HTTP requests (30s), database queries (10s), subprocess execution (60s), and MCP tool calls (30s). Implement circuit breakers that open after N consecutive failures (e.g., opossum, cockatiel). Use AbortSignal for cancellable operations. Required by EU AI Act Art. 15 and OWASP ASI08.
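The libraries named above (opossum, cockatiel) and `AbortSignal` are JavaScript tools. For a Python asyncio server like the one scanned, a comparable pattern can be sketched with `asyncio.wait_for`. The class below is a simplified illustration, not a production breaker; the default of 30 seconds follows the HTTP figure above, while the other defaults and all names are assumptions.

```python
import asyncio
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the breaker is open."""


class CircuitBreaker:
    def __init__(self, max_failures: int = 3, reset_after: float = 30.0,
                 call_timeout: float = 30.0):
        self.max_failures = max_failures  # consecutive failures before opening
        self.reset_after = reset_after    # seconds to stay open
        self.call_timeout = call_timeout  # per-call timeout in seconds
        self.failures = 0
        self.opened_at = None

    async def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                raise CircuitOpenError("circuit open; call rejected")
            # Cool-down elapsed: allow one trial call (half-open state).
            self.opened_at = None
            self.failures = self.max_failures - 1
        try:
            # wait_for cancels the awaited call when the timeout expires.
            result = await asyncio.wait_for(func(*args, **kwargs),
                                            timeout=self.call_timeout)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # any success closes the breaker fully
        return result
```

In the indexer, the page fetch could be wrapped as `await breaker.call(fetch_page, url)`, where `fetch_page` is a hypothetical coroutine around `session.get`. After repeated failures the crawl then stops quickly instead of waiting out a timeout on every remaining URL.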

Low (1)

low · F4 · MCP Spec Non-Compliance · MCP07-insecure-config
Server fails MCP spec compliance checks: required:server_name; required:server_version; required:protocol_version; recommended:tool_descriptions; recommended:parameter_descriptions
Follow the MCP specification for server metadata. Include server name, version, and protocol version. Provide descriptions for all tools and parameters.