Prisma Cloud Docs
Provides search access to Prisma Cloud documentation by crawling and indexing pages from both the main docs and the API documentation. Indexed pages are cached with TTL expiration, and searches use relevance scoring to return structured results with snippets and URLs.
Tools: 0
Findings: 10
Stars: 3
Last Scanned: Mar 22, 2026
2 critical · 6 high · 1 medium · 1 low findings detected
Security Category Deep Dive
Prompt Injection
Prompt & context manipulation attacks
Maturity: 69
Rules: 14
Sub-Categories: 5
Gaps: 1
Implemented: 64%
Tests: 56
Stories: 1
100% · 3 rules · Injection via tool descriptions and parameter fields
GAP-001 · Prompt Injection Coverage Gap · Missing detection coverage for emerging prompt injection attack variants not addressed by current rules
100% · 4 rules · Hidden instructions via external content and tool responses
100% · 2 rules · Context window saturation and prior-approval exploitation
100% · 3 rules · Payload hiding via invisible chars, base64, schema fields
100% · 2 rules · Injection via prompt templates and runtime tool output
Findings: 10
2 critical · 6 high · 1 medium · 1 low
Critical: 2
critical · Q13 · MCP Bridge Package Supply Chain Attack · MCP10-supply-chain · AML.T0054
Pattern "(?:mcp|fastmcp|langchain-mcp|llama-index-mcp)(?:>=|~=|==)?(?!\d)" matched in source_code: "mcp" (at position 90)
MCP bridge packages (mcp-remote, mcp-proxy, @modelcontextprotocol/sdk, fastmcp) are high-value supply chain targets — CVE-2025-6514 (CVSS 9.6) in mcp-remote affected 437,000+ installs. Always pin exact versions (no ^ or ~ ranges). Use lockfiles (package-lock.json, pnpm-lock.yaml, uv.lock). Never run `npx mcp-remote` without version pinning. Verify package integrity with `npm audit` or `pip-audit` before deployment. Reference: CVE-2025-6514, OWASP ASI04.
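The pinning advice above can be checked mechanically. The following is a minimal sketch, assuming a pip-style requirements file; the function name `unpinned_requirements` and the version numbers in the usage lines are illustrative only and are not taken from the scanned project.

```python
import re

# A line counts as pinned only if it has the form "name==version"
# (optionally "name[extra]==version"), with no wildcard in the version.
_PINNED = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*(\[[^\]]+\])?==[^=*\s]+$")


def unpinned_requirements(lines):
    """Return the requirement specs that are not pinned to one exact version."""
    offenders = []
    for raw in lines:
        line = raw.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line or line.startswith("-"):  # skip blanks and pip options
            continue
        # Ignore environment markers and per-line hash options for this check.
        spec = line.split(";", 1)[0].split(" --hash", 1)[0].strip()
        if not _PINNED.match(spec):
            offenders.append(spec)
    return offenders


# Hypothetical requirements content, for illustration only.
example = ["mcp", "aiohttp>=3.9", "beautifulsoup4==4.12.3", "mcp[cli]==1.2.0"]
print(unpinned_requirements(example))
```

A check like this could run in CI before deployment, alongside `pip-audit`; it does not replace a lockfile.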
critical · L7 · Transitive MCP Server Delegation · MCP06-excessive-permissions · AML.T0054
Pattern "(?:connect|initialize).*(?:mcp|modelcontextprotocol).*(?:server|endpoint|url)" matched in source_code: "Initialize MCP server" (at position 326)
MCP servers MUST NOT create client connections to other MCP servers without explicit user disclosure. If delegation is required, declare all downstream servers in the server's capabilities and tool descriptions. Never forward user credentials to sub-servers. Implement a trust boundary between the approved server and any delegated servers. Log all transitive delegations for audit.
High: 6
high · D1 · Known CVEs in Dependencies · MCP08-dependency-vuln
Dependency "mcp@null" has known CVEs:
Update dependencies to versions that patch known CVEs. Run 'npm audit fix' or 'pip-audit' to identify and resolve vulnerable dependencies.
high · K11 · Missing Server Integrity Verification · MCP10-supply-chain · AML.T0054
Pattern "(connect|load|register|add)[_\s-]?(mcp|server|tool)(?!.*(?:verify|validate|checksum|hash|sign|cert|fingerprint|pin))" matched in source_code: "Register tool" (at position 7503)
Implement cryptographic verification for MCP server connections: (1) Pin server TLS certificates or public keys, (2) Verify server tool definition checksums against a known-good manifest, (3) Use package manager integrity checks (npm integrity, pip --require-hashes). The MCP spec recommends but doesn't yet mandate server signing — implement it proactively. Required by ISO 27001 A.8.24 and CoSAI MCP-T6.
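Point (2) above, checking tool definitions against a known-good manifest, might look roughly like the sketch below. It assumes tool definitions are available as plain dictionaries and that the manifest maps tool names to SHA-256 hex digests; both shapes and the function names are assumptions for illustration, not part of the MCP specification or SDK.

```python
import hashlib
import hmac
import json


def tool_definition_digest(tool_definition: dict) -> str:
    """Hash a tool definition in a canonical JSON form (sorted keys, no spaces)."""
    canonical = json.dumps(tool_definition, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def verify_tools(tools: dict, manifest: dict) -> list:
    """Return names of tools that are absent from the manifest or whose digest differs."""
    failed = []
    for name, definition in tools.items():
        expected = manifest.get(name)
        actual = tool_definition_digest(definition)
        # compare_digest avoids leaking how many leading characters matched.
        if expected is None or not hmac.compare_digest(expected, actual):
            failed.append(name)
    return failed
```

A client would refuse to register any tool returned by `verify_tools`. The manifest itself still has to be distributed through a trusted channel, which this sketch does not cover.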
high · C3 · Server-Side Request Forgery (SSRF) · MCP04-data-exfiltration · AML.T0057
Pattern "session\.(?:get|post|put|delete|patch|request)\s*\([^)]*(?:req|request|input|param|params|args|url|uri|href|target|endpoint|host)" matched in source_code: "session.get(url" (at position 1819)
Validate ALL user-supplied URLs before making HTTP requests:
1. Parse the URL and check the hostname against an explicit allowlist of permitted domains.
2. Block requests to RFC 1918 private ranges: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16.
3. Block loopback (127.0.0.0/8), link-local (169.254.0.0/16), and IPv6 equivalents.
4. Block file:// and other non-http(s) protocols explicitly.
5. Disable automatic redirect following, or re-validate each redirect destination.
6. In cloud environments: block requests to IMDS endpoints (169.254.169.254, metadata.google.internal) at both the application AND network layer.
Example (Node.js): Use the `ssrf-req-filter` package or implement URL validation against an allowlist before calling fetch/axios/got.
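Since the scanned server is written in Python, the checks above can also be sketched with the standard library. This is a minimal illustration, not a complete SSRF defense: the allowlist reuses the two hosts that appear in the server's `base_urls`, and the helper names `is_forbidden_ip` and `is_url_allowed` are hypothetical.

```python
import ipaddress
import socket
from urllib.parse import urlparse

# Hosts taken from the base_urls in the scanned source; adjust as needed.
ALLOWED_HOSTS = {"docs.prismacloud.io", "pan.dev"}


def is_forbidden_ip(address: str) -> bool:
    """True for private, loopback, link-local and other non-public addresses."""
    ip = ipaddress.ip_address(address.split("%", 1)[0])  # drop any IPv6 zone id
    if ip.version == 6 and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped  # treat ::ffff:a.b.c.d like the embedded IPv4 address
    return (ip.is_private or ip.is_loopback or ip.is_link_local
            or ip.is_reserved or ip.is_multicast or ip.is_unspecified)


def is_url_allowed(url: str, resolve: bool = False) -> bool:
    """Check scheme and hostname; optionally also check the resolved addresses."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        return False  # blocks file:// and other protocols
    host = (parsed.hostname or "").lower()
    if host not in ALLOWED_HOSTS:
        return False
    if resolve:
        try:
            infos = socket.getaddrinfo(host, parsed.port or 443)
        except socket.gaierror:
            return False
        return all(not is_forbidden_ip(info[4][0]) for info in infos)
    return True
```

In the crawler, `is_url_allowed(full_url)` would be called before each `session.get`, and again on every redirect target if redirects stay enabled. Resolving at validation time does not prevent DNS rebinding by itself, so network-layer blocking of IMDS endpoints is still needed.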
high · K13 · Unsanitized Tool Output · MCP02-tool-poisoning · AML.T0054
Pattern "(?:query|execute|select|find).*(?:return|respond|result|rows|data)(?!.*(?:sanitize|escape|encode|map|filter|select|pick))" matched in source_code: "query -> (result" (at position 800)
Sanitize all external data before including in tool responses. Implement output encoding that neutralizes prompt injection patterns. Truncate excessively long content. Validate structure before passing database results. Apply the principle: treat all external data as untrusted, even in tool outputs. Required by CoSAI MCP-T4.
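One possible starting point for the sanitization step is sketched below. It only removes invisible control and format characters, collapses whitespace, and truncates long content; it does not by itself neutralize prompt injection written in ordinary visible text. The function name and the 2000-character cap are assumptions for illustration.

```python
import re
import unicodedata

MAX_CHARS = 2000  # assumed cap; the scanned server uses its own limits


def sanitize_external_text(text: str, max_chars: int = MAX_CHARS) -> str:
    """Strip invisible characters, collapse spaces, and truncate long content."""
    # Unicode categories Cc (control) and Cf (format) cover NUL bytes,
    # zero-width characters, and bidirectional overrides. Keep \n and \t.
    cleaned = "".join(
        ch for ch in text
        if ch in "\n\t" or unicodedata.category(ch) not in ("Cc", "Cf")
    )
    cleaned = re.sub(r"[ \t]+", " ", cleaned).strip()
    if len(cleaned) > max_chars:
        cleaned = cleaned[:max_chars] + "..."
    return cleaned
```

In a server like the one scanned here, this would be applied to page titles and snippets before they are serialized into a tool response.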
high · C7 · Wildcard CORS · MCP07-insecure-config
Pattern "allow_origins\s*=\s*\[\s*['"]\*['"]\s*\]" matched in source_code: "allow_origins=["*"]" (at position 9713)
Replace wildcard CORS with an explicit allowlist of permitted origins. Wildcard CORS allows any website to make requests to the MCP server.
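One way to build such an allowlist is to read it from configuration and reject the wildcard outright. The sketch below assumes a comma-separated environment variable named `ALLOWED_ORIGINS`; that variable name and the helper `load_allowed_origins` are hypothetical and not part of the scanned server.

```python
import os
from urllib.parse import urlparse


def load_allowed_origins(env_var: str = "ALLOWED_ORIGINS") -> list:
    """Parse a comma-separated list of origins, rejecting '*' and malformed entries."""
    raw = os.environ.get(env_var, "")
    origins = []
    for item in raw.split(","):
        origin = item.strip().rstrip("/")
        if not origin:
            continue
        parsed = urlparse(origin)
        # An origin is scheme + host (+ port) only: no wildcard and no path.
        if parsed.scheme not in ("http", "https") or not parsed.netloc or parsed.path:
            raise ValueError(f"Invalid CORS origin: {origin!r}")
        origins.append(origin)
    return origins
```

The result would replace the literal in the server's middleware call, for example `allow_origins=load_allowed_origins()`. An empty list then means no cross-origin browser access, which is a safer default than `["*"]` combined with `allow_credentials=True`.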
high · O8 · Timing-Based Covert Channel · MCP04-data-exfiltration · AML.T0057
Pattern "(?:delay|sleep|timeout|interval)\s*[:=]\s*(?:[^;]*(?:secret|token|password|credential|key|env))" matched in source_code: "timeout=10) as response:
if response.status == 200:
content = await response.text()
soup = BeautifulSoup(content, 'html.parser')
# Extract page content
title = soup.find('title')
title_text = title.text.strip() if title else url
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Get text content
text_content = soup.get_text()
lines = (line.strip() for line in text_content.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)
# Store in cache
self.cached_pages[url] = CachedPage(
title=title_text,
content=text[:5000], # Limit content length
url=url,
site=site_name,
timestamp=time.time()
)
pages_indexed += 1
# Find more links to index
if pages_indexed < max_pages:
links = soup.find_all('a', href=True)
for link in links:
href = link['href']
full_url = urljoin(url, href)
# Only index URLs from the same domain
if urlparse(full_url).netloc == urlparse(base_url).netloc:
if full_url not in visited_urls and full_url not in urls_to_visit_set:
urls_to_visit.append(full_url)
urls_to_visit_set.add(full_url)
except Exception as e:
print(f"Error indexing {url}: {e}")
continue
return pages_indexed
async def search_docs(self, query: str, site: str = None) -> List[Dict]:
"""Search indexed documentation"""
if not self.cached_pages:
return []
query_lower = query.lower()
results = []
for url, page in self.cached_pages.items():
# Filter by site if specified
if site and page.site != site:
continue
# Calculate relevance score
score = 0
title_lower = page.title.lower()
content_lower = page.content.lower()
# Higher score for title matches
if query_lower in title_lower:
score += 10
# Even higher for exact title matches
if query_lower == title_lower:
score += 20
# Score for content matches
content_matches = content_lower.count(query_lower)
score += content_matches * 2
# Score for partial word matches in title
query_words = query_lower.split()
for word in query_words:
if word in title_lower:
score += 5
if word in content_lower:
score += 1
if score > 0:
# Extract snippet around first match
snippet = self._extract_snippet(page.content, query, max_length=200)
results.append({
'title': page.title,
'url': page.url,
'site': page.site,
'snippet': snippet,
'score': score
})
# Sort by relevance score (highest first) and limit results
results.sort(key=lambda x: x['score'], reverse=True)
return results[:10]
def _extract_snippet(self, content: str, query: str, max_length: int = 200) -> str:
"""Extract a snippet of content around the query match"""
query_lower = query.lower()
content_lower = content.lower()
# Find the first occurrence of the query
match_index = content_lower.find(query_lower)
if match_index == -1:
# If no exact match, return beginning of content
return content[:max_length] + "..." if len(content) > max_length else content
# Calculate snippet boundaries
start = max(0, match_index - max_length // 2)
end = min(len(content), start + max_length)
# Adjust start if we're near the end
if end - start < max_length:
start = max(0, end - max_length)
snippet = content[start:end]
# Add ellipsis if we're not at the beginning/end
if start > 0:
snippet = "..." + snippet
if end < len(content):
snippet = snippet + "..."
return snippet
# Initialize indexer
indexer = DocumentationIndexer()
# Register tools
@mcp.tool()
async def search_prisma_docs(query: str) -> str:
"""Search Prisma Cloud documentation"""
results = await indexer.search_docs(query, site='prisma_cloud')
return json.dumps(results, indent=2)
@mcp.tool()
async def search_prisma_api_docs(query: str) -> str:
"""Search Prisma Cloud API documentation"""
results = await indexer.search_docs(query, site='prisma_api')
return json.dumps(results, indent=2)
@mcp.tool()
async def search_all_docs(query: str) -> str:
"""Search across all Prisma Cloud documentation sites."""
results = await indexer.search_docs(query)
return json.dumps(results, indent=2)
@mcp.tool()
async def index_prisma_docs(max_pages: int = 50) -> str:
"""Index Prisma Cloud documentation. Call this first before searching."""
pages_indexed = await indexer.index_site('prisma_cloud', max_pages)
return f"Indexed {pages_indexed} pages from Prisma Cloud documentation"
@mcp.tool()
async def index_prisma_api_docs(max_pages: int = 50) -> str:
"""Index Prisma Cloud API documentation. Call this first before searching."""
pages_indexed = await indexer.index_site('prisma_api', max_pages)
return f"Indexed {pages_indexed} pages from Prisma Cloud API documentation"
@mcp.tool()
async def get_index_status() -> str:
"""Check how many documents are currently cached."""
total_docs = len(indexer.cached_pages)
sites = {}
for page in indexer.cached_pages.values():
site = page.site
sites[site] = sites.get(site, 0) + 1
# Also show cache statistics
expired_count = sum(1 for page in indexer.cached_pages.values() if page.is_expired)
return json.dumps({
'total_cached_pages': total_docs,
'expired_pages': expired_count,
'search_cache_entries': len(indexer.search_cache),
'by_site': sites
}, indent=2)
def main():
# HTTP mode for Smithery deployment
print("MCP Server starting in HTTP mode...")
# Setup Starlette app with CORS for cross-origin requests
app = mcp.streamable_http_app()
# IMPORTANT: add CORS middleware for browser based clients
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["GET", "POST", "OPTIONS"],
allow_headers=["*"],
expose_headers=["mcp-session-id", "mcp-protocol-version"],
max_age=86400,
)
# Use Smithery-required PORT environment variable
port = int(os.environ.get("PORT", 8081))
print(f"Listening on port {port}")
uvicorn.run(app, host="0.0.0.0", port=port, log_level="debug")
if __name__ == "__main__":
main()
// ═══ FILE: server.py ═══
from typing import List, Dict
from mcp.server.fastmcp import FastMCP
import aiohttp
from bs4 import BeautifulSoup
import json
from urllib.parse import urljoin, urlparse
import time
from dataclasses import dataclass
# Initialize the MCP server
mcp = FastMCP("Prisma Cloud Docs MCP Server")
@dataclass
class CachedPage:
title: str
content: str
url: str
site: str
timestamp: float
ttl: float = 3600 # 1 hour default TTL
@property
def is_expired(self) -> bool:
return time.time() > self.timestamp + self.ttl
class DocumentationIndexer:
def __init__(self):
self.cached_pages = {} # URL -> CachedPage
self.search_cache = {} # query -> (results, timestamp)
self.base_urls = {
'prisma_cloud': 'https://docs.prismacloud.io/',
'prisma_api': 'https://pan.dev/prisma-cloud/api/',
}
self.search_cache_ttl = 300 # 5 minutes for search results
async def index_site(self, site_name: str, max_pages: int = 100):
"""Index documentation from a specific site"""
if site_name not in self.base_urls:
raise ValueError(f"Unknown site: {site_name}")
base_url = self.base_urls[site_name]
visited_urls = set()
urls_to_visit = [base_url]
pages_indexed = 0
async with aiohttp.ClientSession() as session:
while urls_to_visit and pages_indexed < max_pages:
url = urls_to_visit.pop(0)
if url in visited_urls:
continue
visited_urls.add(url)
try:
async with session.get(url, timeout=10) as response:
if response.status == 200:
content = await response.text()
soup = BeautifulSoup(content, 'html.parser')
# Extract page content
title = soup.find('title')
title_text = title.text.strip() if title else url
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Get text content
text_content = soup.get_text()
lines = (line.strip() for line in text_content.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)
# Store in cache
self.cached_pages[url] = CachedPage(
title=title_text,
content=text[:5000], # Limit content length
url=url,
site=site_name,
timestamp=time.time()
)
pages_indexed += 1
# Find more links to index
if pages_indexed < max_pages:
links = soup.find_all('a', href=True)
for link in links:
href = link['href']
full_url = urljoin(url, href)
# Only index URLs from the same domain
if urlparse(full_url).netloc == urlparse(base_url).netloc:
if full_url not in visited_urls and full_url not in urls_to_visit:
urls_to_visit.append(full_url)
except Exception as e:
print(f"Error indexing {url}: {e}")
continue
return pages_indexed
async def search_docs(self, query: str, site: str = None) -> List[Dict]:
"""Search indexed documentation"""
if not self.cached_pages:
return []
query_lower = query.lower()
results = []
for url, page in self.cached_pages.items():
# Filter by site if specified
if site and page.site != site:
continue
# Calculate relevance score
score = 0
title_lower = page.title.lower()
content_lower = page.content.lower()
# Higher score for title matches
if query_lower in title_lower:
score += 10
# Even higher for exact title matches
if query_lower == title_lower:
score += 20
# Score for content matches
content_matches = content_lower.count(query_lower)
score += content_matches * 2
# Score for partial word matches in title
query_words = query_lower.split()
for word in query_words:
if word in title_lower:
score += 5
if word in content_lower:
score += 1
if score > 0:
# Extract snippet around first match
snippet = self._extract_snippet(page.content, query, max_length=200)
results.append({
'title': page.title,
'url': page.url,
'site': page.site,
'snippet': snippet,
'score': score
})
# Sort by relevance score (highest first) and limit results
results.sort(key" (at position 1836)
Remove all code that calculates sleep/delay durations from application data, secrets, or any variable-length content. Tool response times should be constant or determined only by legitimate processing time. If rate limiting is needed, use fixed intervals not derived from data values. Monitor for anomalous response time patterns that could indicate timing-based exfiltration.
Medium: 1
medium · K17 · Missing Timeout or Circuit Breaker · MCP07-insecure-config · AML.T0054
Pattern "(?:query|execute|find|select|aggregate)\s*\((?!.*(?:timeout|maxTimeMS|statement_timeout|deadline|cancel))" matched in source_code: "find(" (at position 2166)
Add timeouts to ALL external calls: HTTP requests (30s), database queries (10s), subprocess execution (60s), and MCP tool calls (30s). Implement circuit breakers that open after N consecutive failures (e.g., opossum, cockatiel). Use AbortSignal for cancellable operations. Required by EU AI Act Art. 15 and OWASP ASI08.
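The libraries named above are for Node.js. For an asyncio-based Python server, the same idea can be sketched with `asyncio.wait_for` and a small hand-written breaker. The class below is a simplified illustration with assumed defaults, not a substitute for a maintained library.

```python
import asyncio
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is skipped because the circuit is open."""


class CircuitBreaker:
    def __init__(self, max_failures: int = 3, reset_after: float = 30.0,
                 call_timeout: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after    # seconds to stay open before a retry
        self.call_timeout = call_timeout  # per-call timeout in seconds
        self.failures = 0
        self.opened_at = None

    async def call(self, func, *args, **kwargs):
        """Await func(*args, **kwargs) with a timeout, tracking consecutive failures."""
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                raise CircuitOpenError("circuit open; call skipped")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = await asyncio.wait_for(func(*args, **kwargs),
                                            timeout=self.call_timeout)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # any success closes the circuit again
        return result
```

A caller would wrap each outbound request, for example `await breaker.call(fetch_page, url)`, where `fetch_page` is a hypothetical coroutine. After `max_failures` consecutive errors or timeouts, further calls fail fast until `reset_after` seconds have passed.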
Low: 1
low · F4 · MCP Spec Non-Compliance · MCP07-insecure-config
Server fails MCP spec compliance checks: required:server_name; required:server_version; required:protocol_version; recommended:tool_descriptions; recommended:parameter_descriptions
Follow the MCP specification for server metadata. Include server name, version, and protocol version. Provide descriptions for all tools and parameters.