第二章:构建 Prompt Injection 防御系统代码_第1页
第二章:构建 Prompt Injection 防御系统代码_第2页
第二章:构建 Prompt Injection 防御系统代码_第3页
第二章:构建 Prompt Injection 防御系统代码_第4页
第二章:构建 Prompt Injection 防御系统代码_第5页
已阅读5页,还剩4页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第二章:构建PromptInjection防御系统代码2.1章节目标PromptInjection防御系统的目标是:识别用户输入中的注入意图防止用户影响系统提示词、策略、工具权限控制LLM对外部工具的调用范围检测并阻断模型输出中的敏感信息泄露为所有关键事件提供可审计记录适用于:聊天机器人Agent系统RAG检索问答工单助手企业内部Copilot2.2防御体系总览建议把防御系统设计成5层:输入防护层检测注入表达限制危险命令型文本标记高风险请求上下文隔离层系统提示与用户输入严格分离检索内容单独分区工具返回内容与用户请求分离工具调用控制层白名单工具参数校验调用次数限制高风险工具二次确认输出审查层敏感数据脱敏策略违规内容拦截泄露系统prompt检测审计与追踪层请求ID用户ID命中规则最终处置结果2.3关键设计原则2.3.1系统提示不可暴露系统提示词、策略、工具schema、内部规则,不应直接返回给用户。2.3.2工具调用必须受控模型不能自由调用任何工具,必须经过服务端验证。2.3.3外部内容不可信来自网页、文档、邮件、用户上传文件的内容,都应视为“不可信输入”。2.3.4输出前必须复检即使模型生成成功,也要在返回前做一次输出审查。2.4推荐系统架构UserInput

InputGuard

ContextBuilder

LLMOrchestrator

ToolGuard/ToolExecutor

OutputGuard

Response模块说明InputGuard:扫描输入中是否存在注入、索取密码、绕过策略等意图ContextBuilder:构造隔离上下文,避免系统提示混入用户输入LLMOrchestrator:统一管理模型调用ToolGuard:校验工具类型、参数范围、调用频率OutputGuard:检查回答中是否泄漏敏感数据AuditLog:记录每一步的结果2.5Python实现示例下面是一个完整的防御中间件雏形。2.5.1代码结构prompt-defense/

├──defense.py

├──tools.py

├──policies.py

└──app.py2.5.2policies.pyALLOWED_TOOLS={

"search_docs":{

"max_calls_per_request":3,

"allowed_params":["query","top_k"]

},

"summarize_text":{

"max_calls_per_request":2,

"allowed_params":["text"]

},

"fetch_ticket":{

"max_calls_per_request":5,

"allowed_params":["ticket_id"]

}

}

BLOCKED_INPUT_PATTERNS=[

r"(?i)ignore.*previous.*instructions",

r"(?i)bypass.*policy",

r"(?i)reveal.*systemprompt",

r"(?i)show.*hiddenprompt",

r"(?i)disable.*audit",

r"(?i)print.*memory",

]

BLOCKED_OUTPUT_PATTERNS=[

r"(?i)api[_-]?key",

r"(?i)secret[_-]?key",

r"(?i)password",

r"(?i)privatekey",

r"(?i)internalpolicy",

]2.5.3defense.pyimportre

fromtypingimportDict,Any,List

frompoliciesimportBLOCKED_INPUT_PATTERNS,BLOCKED_OUTPUT_PATTERNS,ALLOWED_TOOLS

classPromptDefense:

def__init__(self):

self.tool_call_counter={}

definspect_input(self,user_input:str)->Dict[str,Any]:

hits=[]

forpatterninBLOCKED_INPUT_PATTERNS:

ifre.search(pattern,user_input,re.DOTALL):

hits.append(pattern)

return{

"safe":len(hits)==0,

"hits":hits

}

definspect_output(self,output_text:str)->Dict[str,Any]:

hits=[]

forpatterninBLOCKED_OUTPUT_PATTERNS:

ifre.search(pattern,output_text,re.DOTALL):

hits.append(pattern)

return{

"safe":len(hits)==0,

"hits":hits

}

defsanitize_text(self,text:str)->str:

text=re.sub(r"[\x00-\x1f\x7f]","",text)

text=re.sub(r"\s+","",text).strip()

returntext

defvalidate_tool_call(self,tool_name:str,params:Dict[str,Any],request_id:str)->Dict[str,Any]:

iftool_namenotinALLOWED_TOOLS:

return{

"allowed":False,

"reason":"tool_not_allowed"

}

policy=ALLOWED_TOOLS[tool_name]

allowed_params=set(policy["allowed_params"])

received_params=set(params.keys())

ifnotreceived_params.issubset(allowed_params):

return{

"allowed":False,

"reason":"invalid_params",

"detail":list(received_params-allowed_params)

}

key=f"{request_id}:{tool_name}"

count=self.tool_call_counter.get(key,0)+1

self.tool_call_counter[key]=count

ifcount>policy["max_calls_per_request"]:

return{

"allowed":False,

"reason":"rate_limit_exceeded"

}

return{

"allowed":True

}

defbuild_safe_context(self,system_prompt:str,user_input:str,retrieved_docs:List[str]=None)->Dict[str,str]:

"""

将上下文分区,避免系统提示词与用户输入混杂。

"""

retrieved_docs=retrieved_docsor[]

safe_user_input=self.sanitize_text(user_input)

safe_docs=[self.sanitize_text(doc)fordocinretrieved_docs]

return{

"system_prompt":system_prompt,

"user_input":safe_user_input,

"retrieved_docs":"\n".join(safe_docs)

}2.5.4tools.pydefsearch_docs(query:str,top_k:int=3):

#这里应连接内部知识库,不应直接执行外部任意搜索

return[

f"Result1for{query}",

f"Result2for{query}",

][:top_k]

defsummarize_text(text:str):

#示例:实际应调用模型或摘要服务

returntext[:200]

deffetch_ticket(ticket_id:str):

#示例:仅允许访问内部工单系统

return{

"ticket_id":ticket_id,

"status":"open",

"summary":"Exampleticket"

}2.5.5app.pyimportjson

fromdefenseimportPromptDefense

fromtoolsimportsearch_docs,summarize_text,fetch_ticket

defrun_request(request_id:str,user_input:str):

defense=PromptDefense()

#1)输入检测

input_check=defense.inspect_input(user_input)

ifnotinput_check["safe"]:

return{

"request_id":request_id,

"status":"blocked",

"reason":"input_policy_violation",

"details":input_check["hits"]

}

#2)构建安全上下文

context=defense.build_safe_context(

system_prompt="Youareasecureenterpriseassistant.",

user_input=user_input,

retrieved_docs=[]

)

#3)示例:模拟模型决定调用工具

tool_name="search_docs"

tool_params={"query":context["user_input"],"top_k":2}

tool_check=defense.validate_tool_call(tool_name,tool_params,request_id)

ifnottool_check["allowed"]:

return{

"request_id":request_id,

"status":"blocked",

"reason":"tool_policy_violation",

"details":tool_check

}

#4)执行工具

iftool_name=="search_docs":

tool_result=search_docs(**tool_params)

eliftool_name=="summarize_text":

tool_result=summarize_text(**tool_params)

eliftool_name=="fetch_ticket":

tool_result=fetch_ticket(**tool_params)

else:

tool_result=None

#5)生成模型输出(此处用模拟内容)

model_output=f"Basedondocs:{tool_result}"

#6)输出检测

output_check=defense.inspect_output(model_output)

ifnotoutput_check["safe"]:

return{

"request_id":request_id,

"status":"blocked",

"reason":"output_policy_violation",

"details":output_check["hits"]

}

return{

"request_id":request_id,

"status":"ok",

"response":model_output

}

if__name__=="__main__":

result=run_request("req-00

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论