DingTalk
Provides integration with the DingTalk API, enabling easy deployment and scaling of services like chatbots and data processing pipelines.
0Tools
12Findings
6Stars
Mar 22, 2026Last Scanned
4 critical · 5 high · 2 medium · 1 low findings detected
Security Category Deep Dive
Prompt Injection
Prompt & context manipulation attacks
69
Maturity
14
Rules
5
Sub-Categories
1
Gaps
64%
Implemented
56
Tests
1
Stories
100%3 rules
Injection via tool descriptions and parameter fields
GAP-001Prompt Injection Coverage GapMissing detection coverage for emerging prompt injection attack variants not addressed by current rules
100%4 rules
Hidden instructions via external content and tool responses
100%2 rules
Context window saturation and prior-approval exploitation
100%3 rules
Payload hiding via invisible chars, base64, schema fields
100%2 rules
Injection via prompt templates and runtime tool output
Findings12
4critical
5high
2medium
1low
Critical4
criticalK14Agent Credential Propagation via Shared StateMCP05-privilege-escalationAML.T0054
Pattern "(process\.env|os\.environ|setenv|putenv).*(?:token|credential|api[_\s-]?key|secret|password)" matched in source_code: "os.environ.get("DINGDING_APP_SECRET" (at position 2660)
Never write credentials to shared agent state. Use credential vaults (HashiCorp Vault, AWS Secrets Manager) with per-agent scoped access. Implement OAuth token exchange (RFC 8693) for cross-agent authorization. Redact credentials from all agent outputs before writing to shared memory. Required by OWASP ASI03/ASI07 and MAESTRO L7.
criticalQ13MCP Bridge Package Supply Chain AttackMCP10-supply-chainAML.T0054
Pattern "(?:mcp|fastmcp|langchain-mcp|llama-index-mcp)(?:>=|~=|==)?(?!\d)" matched in source_code: "mcp" (at position 149)
MCP bridge packages (mcp-remote, mcp-proxy, @modelcontextprotocol/sdk, fastmcp) are high-value supply chain targets — CVE-2025-6514 (CVSS 9.6) in mcp-remote affected 437,000+ installs. Always pin exact versions (no ^ or ~ ranges). Use lockfiles (package-lock.json, pnpm-lock.yaml, uv.lock). Never run `npx mcp-remote` without version pinning. Verify package integrity with `npm audit` or `pip-audit` before deployment. Reference: CVE-2025-6514, OWASP ASI04.
criticalN9MCP Logging Protocol InjectionMCP01-prompt-injectionAML.T0054
Pattern "(?:logger|logging)\s*\.\s*(?:info|warning|error|debug)\s*\(\s*(?:f"|f').*(?:result|output|response|tool_output|content)" matched in source_code: "logger.debug(f"Response" (at position 1669)
Sanitize all dynamic content before including in MCP log messages. MCP logging notifications (notifications/message) are displayed in client UIs — injection payloads in log data can influence AI behavior if the client processes logs as context. Strip control characters, ANSI escape codes, newlines, and prompt injection patterns from log content. Use structured logging with separate data fields.
criticalK8Cross-Boundary Credential SharingMCP05-privilege-escalationAML.T0054
Pattern "(return|respond|output|result).*(?:token|credential|api[_\s-]?key|secret|password|bearer)" matched in source_code: "return token" (at position 3386)
Never forward, share, or embed credentials across trust boundaries. Use OAuth token exchange (RFC 8693) to create scoped, delegated tokens instead of passing original credentials. Never include credentials in tool responses. Required by ISO 27001 A.5.17 and OWASP ASI03.
High5
highC3Server-Side Request Forgery (SSRF)MCP04-data-exfiltrationAML.T0057
Pattern "\.(?:get|post|put|delete|patch|request)\s*\([^)]*(?:url|uri|target|endpoint|host)[^)]*(?:req|input|param|args|f['"]|\+)" matched in source_code: ".get(url, params=param" (at position 1545)
Validate ALL user-supplied URLs before making HTTP requests:
1. Parse the URL and check the hostname against an explicit allowlist of permitted domains.
2. Block requests to RFC 1918 private ranges: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16.
3. Block loopback (127.0.0.0/8), link-local (169.254.0.0/16), and IPv6 equivalents.
4. Block file:// and other non-http(s) protocols explicitly.
5. Disable automatic redirect following, or re-validate each redirect destination.
6. In cloud environments: block requests to IMDS endpoints (169.254.169.254,
metadata.google.internal) at both the application AND network layer.
Example (Node.js): Use the `ssrf-req-filter` package or implement URL validation
against an allowlist before calling fetch/axios/got.
highD1Known CVEs in DependenciesMCP08-dependency-vuln
Dependency "mcp@0.1.0" has known CVEs:
Update dependencies to versions that patch known CVEs. Run 'npm audit fix' or 'pip-audit' to identify and resolve vulnerable dependencies.
highD1Known CVEs in DependenciesMCP08-dependency-vuln
Dependency "requests@2.31.0" has known CVEs:
Update dependencies to versions that patch known CVEs. Run 'npm audit fix' or 'pip-audit' to identify and resolve vulnerable dependencies.
highK16Unbounded Recursion / Missing Depth LimitsMCP07-insecure-configAML.T0054
Pattern "(invoke|call|execute)[_\s-]?(?:tool|agent|self)(?!.*(?:depth|level|limit|max[_\s-]?(?:depth|recursi|iter|call)|count))" matched in source_code: "call_tool" (at position 11156)
Add explicit depth/recursion limits to all recursive operations. Use iterative approaches where possible. Set maximum depth for directory walking (max_depth=10), tree traversal (max_level=20), and agent re-invocation (max_calls=5). Implement circuit breakers that halt after N iterations. Required by EU AI Act Art. 15 (robustness) and OWASP ASI08.
highO8Timing-Based Covert ChannelMCP04-data-exfiltrationAML.T0057
Pattern "(?:delay|sleep|timeout|interval)\s*[:=]\s*(?:[^;]*(?:secret|token|password|credential|key|env))" matched in source_code: "timeout=10)
response.raise_for_status()
data = response.json()
logger.debug(f"Response received: {data}")
if data.get("errcode", 0) != 0:
logger.error(f"DingTalk API error: {data}")
raise DingTalkAPIError(
"DingTalk API error",
data.get("errcode", -1),
data.get("errmsg", "Unknown error")
)
return data
except requests.RequestException as e:
logger.error(f"HTTP request failed: {str(e)}", exc_info=True)
raise DingTalkAPIError("HTTP request failed", -1, str(e))
except ValueError as e:
logger.error(f"Invalid JSON response: {str(e)}", exc_info=True)
raise DingTalkAPIError("Invalid JSON response", -1, str(e))
def get_access_token(self) -> str:
"""获取钉钉access token"""
logger.debug("Attempting to get access token...")
try:
appkey = os.environ.get("DINGDING_APP_KEY")
appsecret = os.environ.get("DINGDING_APP_SECRET")
logger.debug(f"Using APP_KEY: {appkey[:4]}*** and APP_SECRET: {appsecret[:4]}***")
if not all([appkey, appsecret]):
logger.error("Missing DingTalk API credentials in environment variables")
raise ValueError("Missing DingTalk API credentials in environment variables")
url = f"{self.base_url}/gettoken"
data = self._make_request(url, {
"appkey": appkey,
"appsecret": appsecret
})
token = data["access_token"]
logger.debug(f"Successfully obtained access token: {token[:4]}***")
return token
except Exception as e:
logger.error(f"Failed to get access token: {str(e)}", exc_info=True)
raise
def get_department_list(self, fetch_child: bool = True) -> str:
"""获取部门列表"""
try:
access_token = self.get_access_token()
url = f"{self.base_url}/v1/department/list"
data = self._make_request(url, {
"access_token": access_token,
"fetch_child": fetch_child
})
departments = data.get("department", [])
result = []
for dept in departments:
result.append(
f"Department ID: {dept['id']}\n"
f"Name: {dept['name']}\n"
f"Parent ID: {dept.get('parentid', 'N/A')}\n"
f"---"
)
return "\n".join(result) if result else "No departments found"
except Exception as e:
logger.error(f"Failed to get department list: {str(e)}")
return f"Error: {str(e)}"
def get_department_users(self, department_id: int) -> str:
"""获取部门用户列表"""
try:
access_token = self.get_access_token()
url = f"{self.base_url}/v1/user/simplelist"
data = self._make_request(url, {
"access_token": access_token,
"department_id": department_id
})
users = data.get("userlist", [])
result = []
for user in users:
result.append(
f"User ID: {user['userid']}\n"
f"Name: {user['name']}\n"
f"---"
)
return "\n".join(result) if result else "No users found in this department"
except Exception as e:
logger.error(f"Failed to get department users: {str(e)}")
return f"Error: {str(e)}"
def get_user_detail(self, userid: str) -> Dict[str, Any]:
"""获取用户详细信息"""
try:
access_token = self.get_access_token()
url = f"{self.base_url}/v1/user/get"
return self._make_request(url, {
"access_token": access_token,
"userid": userid
})
except Exception as e:
logger.error(f"Failed to get user details: {str(e)}")
raise
def search_user_by_name(self, name: str) -> str:
"""根据姓名搜索用户信息"""
try:
# 获取所有部门
dept_list_str = self.get_department_list()
if not dept_list_str or "Error" in dept_list_str:
return f"Failed to get department list: {dept_list_str}"
# 解析部门列表字符串
departments = []
current_dept = {}
for line in dept_list_str.split('\n'):
if line.startswith('Department ID:'):
if current_dept:
departments.append(current_dept)
current_dept = {'id': int(line.split(': ')[1])}
elif line.startswith('Name:'):
current_dept['name'] = line.split(': ')[1]
if current_dept:
departments.append(current_dept)
# 遍历所有部门查找用户
for dept in departments:
dept_id = dept['id']
users_str = self.get_department_users(dept_id)
if "Error" in users_str or "Failed" in users_str:
logger.warning(f"Failed to get users for department {dept_id}: {users_str}")
continue
# 解析用户列表字符串
users = []
current_user = {}
for line in users_str.split('\n'):
if line.startswith('User ID:'):
if current_user:
users.append(current_user)
current_user = {'userid': line.split(': ')[1]}
elif line.startswith('Name:'):
current_user['name'] = line.split(': ')[1]
if current_user:
users.append(current_user)
# 在部门中查找匹配姓名的用户
for user in users:
if user['name'] == name:
try:
# 获取用户详细信息
user_detail = self.get_user_detail(user['userid'])
return (f"Found user:\n"
f"User ID: {user_detail['userid']}\n"
f"Name: {user_detail['name']}\n"
f"Mobile: {user_detail.get('mobile', 'N/A')}\n"
f"Email: {user_detail.get('email', 'N/A')}\n"
f"Position: {user_detail.get('position', 'N/A')}\n"
f"Department: {dept['name']}")
except Exception as e:
logger.error(f"Failed to get details for user {user['userid']}: {str(e)}")
continue
return f"No user found with name: {name}"
except Exception as e:
logger.error(f"Error searching for user: {str(e)}")
return f"Error: {str(e)}"
def setup_tools(self):
logger.debug("Setting up MCP tools...")
tools = [
Tool(
name="get_access_token",
description="Retrieves an access token from the DingTalk API for authentication purposes.",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="get_department_list",
description="Retrieves a list of all departments in the organization.",
inputSchema={
"type": "object",
"properties": {
"fetch_child": {
"type": "boolean",
"description": "Whether to include child departments in the response. Default is true.",
"default": True
}
},
"required": []
}
),
Tool(
name="get_department_users",
description="Retrieves a list of users in a specific department.",
inputSchema={
"type": "object",
"properties": {
"department_id": {
"type": "integer",
"description": "The ID of the department to query."
}
},
"required": ["department_id"]
}
),
Tool(
name="search_user_by_name",
description="Searches for a user across all departments by their name.",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The exact name of the user to search for."
}
},
"required": ["name"]
}
)
]
async def list_tools() -> List[Tool]:
logger.debug("list_tools called")
return tools
async def call_tool(name: str, arguments: dict) -> List[TextContent]:
logger.debug(f"Tool called: {name} with arguments: {arguments}")
try:
result = None
if name == "get_access_token":
token = self.get_access_token()
result = [TextContent(type="text", text=f"Access Token: {token" (at position 1570)
Remove all code that calculates sleep/delay durations from application data, secrets, or any variable-length content. Tool response times should be constant or determined only by legitimate processing time. If rate limiting is needed, use fixed intervals not derived from data values. Monitor for anomalous response time patterns that could indicate timing-based exfiltration.
Medium2
mediumK17Missing Timeout or Circuit BreakerMCP07-insecure-configAML.T0054
Pattern "(?:fetch|axios|got|request|urllib|httpx|http\.get|http\.post)\s*\((?!.*(?:timeout|signal|AbortSignal|deadline|cancel))" matched in source_code: "request(" (at position 1304)
Add timeouts to ALL external calls: HTTP requests (30s), database queries (10s), subprocess execution (60s), and MCP tool calls (30s). Implement circuit breakers that open after N consecutive failures (e.g., opossum, cockatiel). Use AbortSignal for cancellable operations. Required by EU AI Act Art. 15 and OWASP ASI08.
mediumK20Insufficient Audit Context in LoggingMCP09-logging-monitoringAML.T0054
Pattern "logger\.(info|warn|error)\s*\(.*(?:tool|request|handle|invoke)(?!.*(?:requestId|correlationId|traceId|spanId|agent[_\s-]?id|user[_\s-]?id))" matched in source_code: "logger.error(f"HTTP request" (at position 2126)
Use structured logging that includes all five ISO 27001 A.8.15 fields: (1) WHO — agent/user identity, (2) WHAT — tool name and operation, (3) WHEN — ISO 8601 timestamp, (4) WHERE — server ID and correlation ID, (5) OUTCOME — success/failure and result summary. Replace console.log with structured loggers (pino, winston). Add correlation IDs for request tracing across multi-agent chains.
Low1
lowF4MCP Spec Non-ComplianceMCP07-insecure-config
Server fails MCP spec compliance checks: required:server_name; required:server_version; required:protocol_version; recommended:tool_descriptions; recommended:parameter_descriptions
Follow the MCP specification for server metadata. Include server name, version, and protocol version. Provide descriptions for all tools and parameters.