offline_data_model_pipline/data_generate/zcs/zaoshu/duihuazaoshu_piliang4.py

640 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import concurrent.futures
import json
import logging
import multiprocessing
import os
import random
import threading
from collections import defaultdict
from functools import partial
from typing import Dict, List, Tuple

import pandas as pd
import requests
from faker import Faker
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font
def read_categories_config(file_path):
    """Read a level-2 -> level-3 category mapping from an Excel file.

    The first column of the first sheet is taken as the level-2 category and
    the second column as the level-3 category. Rows where either cell is
    empty are skipped, and duplicate level-3 entries under the same level-2
    key are dropped (first-seen order is kept).

    :param file_path: path of the Excel file to read
    :return: dict ``{level2: [level3, ...]}``; ``{}`` if the file does not
        exist or cannot be processed (an error message is printed)
    """
    try:
        df = pd.read_excel(file_path)
        if len(df.columns) < 2:
            raise ValueError("Excel文件必须至少包含两列二级分类和三级分类")
        categories_config = defaultdict(list)
        for level2_raw, level3_raw in zip(df.iloc[:, 0], df.iloc[:, 1]):
            # Empty cells are read as NaN, and str(NaN) == "nan" is truthy,
            # so they must be detected before converting to str.
            if pd.isna(level2_raw) or pd.isna(level3_raw):
                continue
            level2 = str(level2_raw).strip()
            level3 = str(level3_raw).strip()
            if not level2 or not level3:
                continue
            if level3 not in categories_config[level2]:
                categories_config[level2].append(level3)
        return dict(categories_config)
    except FileNotFoundError:
        print(f"错误:文件 {file_path} 不存在", flush=True)
        return {}
    except Exception as e:
        print(f"处理文件时出错: {str(e)}", flush=True)
        return {}
def chat(content: str, models_url, timeout: float = 60):
    """Send a single-turn prompt to an OpenAI-compatible chat endpoint.

    :param content: text sent as the only user message
    :param models_url: full URL of the chat-completions endpoint
    :param timeout: seconds to wait for the HTTP response (new, optional)
    :return: the assistant reply text, or ``None`` if the request failed,
        returned a non-200 status, or had an unexpected response shape
    """
    logger = logging.getLogger(__name__)
    payload = json.dumps(
        {
            "model": "Qwen2.5-72B-Instruct",
            "stream": False,
            "temperature": 0.5,
            "top_p": 0.5,
            "repetition_penalty": 1.05,
            "messages": [{"role": "user", "content": f"{content}"}],
        }
    )
    headers = {
        "Content-Type": "application/json",
        "cache-control": "no-cache",
        "Postman-Token": "4c70efd4-6448-4318-b2a9-e404f0181b80",
    }
    try:
        response = requests.request(
            "POST", models_url, data=payload, headers=headers, timeout=timeout
        )
    except Exception as e:
        # Previously this path returned the caller's prompt as if it were the reply.
        logger.error("request_exception: %s", e, exc_info=True)
        return None
    if response.status_code != 200:
        # Do not call response.json() here: error bodies are often not JSON.
        logger.info("Request failed with status code: %s", response.status_code)
        logger.info("Response content: %s", response.content)
        return None
    try:
        return response.json()["choices"][0]["message"]["content"]
    except (ValueError, KeyError, IndexError, TypeError) as e:
        logger.error("unexpected response format: %s", e, exc_info=True)
        return None
class FullyDynamicGenerator:
    """Build synthetic government-hotline dialogs with an LLM.

    A dialog is assembled from five phases: opening, basic-information
    collection, verification, extended questions and final confirmation.
    Most utterances are requested from an OpenAI-compatible
    chat-completions endpoint through ``safe_llm_call``. Every such call
    has a hard-coded fallback that is used when the call fails or returns
    an empty string. ``random`` decides when to inject communication
    obstacles (unclear questions, refusals, broken connections, wrong
    information).
    """

    # Special situations that may be injected into the opening phase.
    SPECIAL_CASES = [
        "方言沟通", "老年人口齿不清", "情绪激动打断对话",
        "背景噪音干扰", "信号断续",
    ]

    # Citizen speech traits keyed by the special case they illustrate.
    # Keys must match SPECIAL_CASES entries; cases without an entry fall back
    # to "正常沟通".
    _CITIZEN_TRAITS = {
        "方言沟通": ["带浓重口音", "夹杂方言词汇", "语法不规范"],
        "老年人口齿不清": ["说话缓慢", "重复语句", "耳背听不清"],
        "情绪激动打断对话": ["不断打断", "提高音量", "带哭腔"],
    }

    def __init__(
        self,
        model_url: str = "http://100.105.61.165:8000/v1/chat/completions",
        model_name: str = "Qwen2.5-72B-Instruct",
    ):
        """Configure the LLM endpoint and per-instance generation state.

        :param model_url: chat-completions endpoint used by safe_llm_call
        :param model_name: model name sent with every request
        """
        self.model_url = model_url
        # NOTE(review): the credential is hard-coded in source; consider
        # reading it from the environment instead.
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": "7c3eafb5-2d6e-100d-ab0f-7b2c1cdafb3c",
        }
        self.model_name = model_name
        self.faker = Faker('zh_CN')
        # Most recent scene knowledge, keyed by "category_subcategory".
        self.dynamic_memory = {}
        self.special_cases = list(self.SPECIAL_CASES)
        # Serializes the read-modify-write of the shared Excel file across
        # worker threads.
        self._export_lock = threading.Lock()

    def generate_dialog(self, category: str, subcategory: str, export_path: str = None) -> List[Dict]:
        """Generate one complete dialog for a (category, subcategory) scene.

        :param category: level-2 category name
        :param subcategory: level-3 category name, used as the scene
        :param export_path: if given, the dialog is appended to this Excel file
        :return: the formatted dialog (see format_output)
        """
        scene_knowledge = self.generate_scene_knowledge(category, subcategory)
        self.dynamic_memory[f"{category}_{subcategory}"] = scene_knowledge
        dialog = []
        dialog.extend(self.generate_complex_opening(category, subcategory))
        dialog.extend(self.generate_obstacle_base_phase(scene_knowledge, subcategory))
        # Verification inspects the answers collected in the phases so far.
        dialog.extend(self.generate_verification_with_challenges(dialog))
        dialog.extend(self.generate_technical_extend_phase(scene_knowledge, subcategory))
        dialog.extend(self.generate_final_confirmation(scene_knowledge, subcategory))
        formatted_dialog = self.format_output(dialog)
        if export_path:
            with self._export_lock:
                self.export_to_excel(formatted_dialog, export_path, category, subcategory)
        return formatted_dialog

    def _generate_single_dialog(self, category, subcategory, export_path, num_per_subcategory, i, total_tasks, current_task_counter):
        """Worker task: bump the shared progress counter, then build one dialog.

        ``current_task_counter`` must provide ``get_lock()`` and ``.value``
        (e.g. ``multiprocessing.Value('i', 0)``).
        """
        with current_task_counter.get_lock():
            current_task = current_task_counter.value + 1
            current_task_counter.value = current_task
        print(f"\n进度: {current_task}/{total_tasks} "
              f"({(current_task/total_tasks)*100:.1f}%) - "
              f"分类: {category} - "
              f"子分类: {subcategory} - "
              f"{i+1}/{num_per_subcategory}", flush=True)
        dialog = self.generate_dialog(
            category=category,
            subcategory=subcategory,
            export_path=export_path,
        )
        return {
            "category": category,
            "subcategory": subcategory,
            "dialog": dialog,
        }

    def generate_dialogs_in_batch(self, categories: Dict[str, List[str]], num_per_subcategory: int, export_path: str, max_workers: int = 10):
        """Generate dialogs for every subcategory using a thread pool.

        :param categories: mapping ``{category: [subcategory, ...]}``
        :param num_per_subcategory: number of dialogs per subcategory
        :param export_path: Excel file every dialog is appended to
        :param max_workers: number of worker threads (default 10)
        :return: list of ``{"category", "subcategory", "dialog"}`` dicts in
            completion order; failed tasks are reported and omitted
        """
        all_dialogs = []
        total_subcategories = sum(len(subcats) for subcats in categories.values())
        total_tasks = total_subcategories * num_per_subcategory
        print(f"\n总共需要生成 {total_subcategories} 个子分类的数据,每个子分类 {num_per_subcategory} 条,共计 {total_tasks} 条对话记录", flush=True)
        # Shared progress counter; its built-in lock makes increments atomic.
        current_task_counter = multiprocessing.Value('i', 0)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(
                    self._generate_single_dialog,
                    category=category,
                    subcategory=subcategory,
                    export_path=export_path,
                    num_per_subcategory=num_per_subcategory,
                    i=i,
                    total_tasks=total_tasks,
                    current_task_counter=current_task_counter,
                )
                for category, subcategories in categories.items()
                for subcategory in subcategories
                for i in range(num_per_subcategory)
            ]
            for future in concurrent.futures.as_completed(futures):
                try:
                    all_dialogs.append(future.result())
                except Exception as e:
                    print(f"生成对话时出错: {str(e)}", flush=True)
        print(f"\n已完成所有生成任务,共生成{len(all_dialogs)}条对话记录", flush=True)
        return all_dialogs

    def export_to_excel(self, dialog: List[Dict], file_path: str, category: str, subcategory: str):
        """Append the whole dialog as one row of the Excel file at ``file_path``.

        The row holds the category, the subcategory, the number of turns and
        the turns joined as ``"<turn>. <speaker> <content>"`` lines. An
        existing file is read, extended by one row and rewritten. Errors are
        printed, not raised.
        """
        try:
            dialog_text = "\n".join(
                f"{turn['turn']}. {turn['speaker']} {turn['content']}"
                for turn in dialog
            )
            df = pd.DataFrame([{
                "分类": category,
                "子分类": subcategory,
                "对话轮数": len(dialog),
                "对话内容": dialog_text,
            }])
            if os.path.exists(file_path):
                existing_df = pd.read_excel(file_path)
                df = pd.concat([existing_df, df], ignore_index=True)
            else:
                directory = os.path.dirname(file_path)
                # os.makedirs("") raises, so only create a real parent directory.
                if directory:
                    os.makedirs(directory, exist_ok=True)
            df.to_excel(file_path, index=False)
            print(f"对话已成功保存到: {file_path}", flush=True)
        except Exception as e:
            print(f"保存Excel文件时出错: {str(e)}", flush=True)

    @staticmethod
    def _parse_json_object(text):
        """Parse an LLM reply as a JSON object; return None if it is not one.

        A surrounding Markdown code fence (```json ... ```) is stripped first.
        """
        if not isinstance(text, str) or not text.strip():
            return None
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
            cleaned = cleaned.rsplit("```", 1)[0]
        try:
            data = json.loads(cleaned)
        except (json.JSONDecodeError, TypeError):
            return None
        return data if isinstance(data, dict) else None

    def generate_complex_opening(self, category: str, subcategory: str) -> List[Tuple]:
        """Generate the opening turns, possibly with a special situation.

        :return: ``(speaker, turn_type, text)`` tuples: the agent greeting,
            the citizen's opening and, for hard-to-hear cases, a
            double-check exchange
        """
        phase = []
        special_case = random.choice(self.special_cases + [None] * 3)
        # The agent always speaks first.
        response_text = "您好,我是政府热线服务,很高兴为您服务"
        if special_case == "老年人口齿不清":
            response_text += "(放慢语速)请您慢慢说"
        phase.append(("客服", "greeting", response_text))

        traits = self._CITIZEN_TRAITS.get(special_case)
        trait = random.choice(traits) if traits else "正常沟通"
        # The prompt asks for the "text" key explicitly, because that is the
        # key read from the parsed reply below.
        opening_prompt = f"""生成市民反映{subcategory}问题的电话开场白,要求:
1. 必须包含"您好"等礼貌用语
2. 体现真实通话特征:{trait}
3. 包含具体问题细节
4. 仅返回JSON格式:{{"text": "开场白内容"}}"""
        opening = self.safe_llm_call(
            prompt=opening_prompt,
            system="你擅长模拟各类人群的真实对话",
            response_format={"type": "json_object"},
        )
        opening_data = self._parse_json_object(opening)
        if opening_data is None:
            opening_text = f"您好,我想投诉{subcategory}问题"
        else:
            opening_text = opening_data.get("text")
            if not isinstance(opening_text, str) or not opening_text.strip():
                opening_text = f"您好,我要反映{subcategory}问题"
            if special_case == "方言沟通":
                opening_text = self.add_dialect_features(opening_text)
        phase.append(("市民", "open_call", opening_text))

        # Hard-to-hear situations make the agent confirm the issue.
        if special_case in ["方言沟通", "老年人口齿不清", "信号断续"]:
            phase.append(("客服", "double_check", f"抱歉,刚才没有听清楚,您是说{subcategory}问题对吗?"))
            phase.append(("市民", "clarify", random.choice([
                "对,就是这个问题",
                f"不是,是{random.choice(['更严重','其他'])}的问题",
                "(声音断断续续)喂...听得到吗?",
            ])))
        return phase

    def generate_obstacle_base_phase(self, knowledge: Dict, scene: str) -> List[Tuple]:
        """Collect the basic fields, with occasional communication obstacles.

        For each field in a fixed list, one of two things happens:
        - the agent asks a standard question; or
        - with 10% probability, the agent asks an ambiguous question, the
          citizen is confused, and the agent retries.
        Vague answers trigger a clarification round, which may include a
        refusal followed by a calming reply.
        NOTE(review): ``knowledge`` is currently unused; the fields are hard-coded.
        """
        phase = []
        required_fields = ["时间", "地点", "事件描述", "联系方式", "姓氏"]
        for field in required_fields:
            if random.random() < 0.1:
                unclear_question = self.safe_llm_call(
                    prompt=f"仅返回生成有歧义的{field}的询问话术,仅返回询问话术,不返回额外内容",
                    system="故意制造1-2处不明确表述",
                ) or f"那个...关于{field}的情况能不能说下?"
                phase.append(("客服", "unclear_question", unclear_question))
                phase.append(("市民", "confused", "您问的是什么?我没听明白"))
                question = self.safe_llm_call(
                    prompt=f"仅返回重新生成清晰的{field}询问话术",
                    system="使用最简明的表达",
                ) or f"请提供{field}的具体信息"
                phase.append(("客服", "retry_question", question))
            else:
                question = self.safe_llm_call(
                    prompt=f"仅返回生成政务热线询问{field}的标准话术,场景:{scene},仅返回询问话术,不返回额外内容",
                    system="要求1.使用敬语 2.明确信息要求",
                ) or f"请问{scene}{field}是?"
                phase.append(("客服", "info_request", question))

            answer, needs_clarify = self.generate_complex_answer(scene, field, question)
            phase.append(("市民", "info_response", answer))
            if needs_clarify:
                clarify_question = self.safe_llm_call(
                    prompt=f"仅返回根据模糊回答'{answer}'生成澄清{field}的追问,仅返回追问内容,不返回额外内容",
                    system="要求1.在追问中指出不明确处 2.进行礼貌的追问",
                ) or f"您提供的{field}不够具体,请补充(例:{self.get_field_example(field)})"
                phase.append(("客服", "clarify_request", clarify_question))
                if random.random() < 0.1:
                    phase.append(("市民", "refuse", random.choice([
                        "这么麻烦不说了!",
                        "你们政府办事就是繁琐",
                        f"{field}有什么好问的!",
                    ])))
                    phase.append(("客服", "calm_down", random.choice([
                        "理解您的心情,但详细信息能帮助我们更快解决问题",
                        "抱歉给您带来不便,这是必要流程",
                    ])))
                phase.append(("市民", "clarified_response", f"哦,应该是{self.get_field_example(field)}"))
        return phase

    def _canned_answers(self, field: str) -> List[Tuple[str, bool]]:
        """Return templated ``(answer, needs_clarify)`` options for ``field``, or [] if none exist."""
        if field == "时间":
            return [
                (f"{random.choice(['今天','昨天'])}{random.randint(1,12)}点左右", False),
                (f"持续{random.randint(2,24)}小时了", False),
            ]
        if field == "地点":
            return [
                (f"{self.faker.building_number()}{random.choice(['东侧','南门'])}", False),
                (f"{self.faker.street_name()}附近", True),
            ]
        if field == "联系方式":
            return [
                (f"{self.faker.phone_number()[:3]}****", True),
                (f"固话:{self.faker.phone_number()[:4]}-{self.faker.phone_number()[-4:]}", False),
            ]
        if field == "姓氏":
            return [
                (f"免贵姓{self.faker.last_name()}", False),
                ("叫我老李就行", True),
            ]
        return []

    def generate_complex_answer(self, scene: str, field: str, question) -> Tuple[str, bool]:
        """Produce a citizen answer for ``field`` and whether it needs clarification.

        With 15% probability a canned "difficult" answer is returned.
        Otherwise a templated answer is used for known fields. Only fields
        without templates cost an LLM call; if that call returns nothing,
        the field example is used instead.
        """
        if random.random() < 0.15:
            special_answers = {
                "时间": [
                    ("就...就那个...前几天", True),
                    ("(背景嘈杂)喂?时间啊...上周?", True),
                    ("我不记得了!你们自己查!", False),
                ],
                "地点": [
                    ("俺们村东头那个...那个啥来着", True),
                    ("(信号不好)在...哗哗...超市附近", True),
                    ("这么简单的问题都处理不了?", False),
                ],
            }
            return random.choice(special_answers.get(field, [("这个我说不好", True)]))
        options = self._canned_answers(field)
        if options:
            return random.choice(options)
        common_answer = self.safe_llm_call(
            prompt=f"""仅返回模拟市民对'{question}'的真实回答要求1. 包含具体{field}的细节数据。 2. 反映真实诉求和情绪梯度。""",
            system="你是一个普通市民,回答要口语化并带生活细节",
        )
        return (common_answer or self.get_field_example(field), False)

    def generate_verification_with_challenges(self, previous_dialog: List[Tuple]) -> List[Tuple]:
        """Read collected info back to the citizen, sometimes with mistakes.

        NOTE(review): an answer is attributed to a field only when its text
        literally contains the field name (e.g. "时间"). Most templated
        answers do not, so this phase is often empty. Confirm whether that
        is intended.
        """
        phase = []
        collected_info = {}
        for turn in previous_dialog:
            if turn[1] in ["info_response", "clarified_response"] and isinstance(turn[2], str):
                for field in ["时间", "地点", "姓氏"]:
                    if field in turn[2]:
                        collected_info[field] = turn[2]
                        if random.random() < 0.1:
                            collected_info[field] = self.get_wrong_info(field)
        if not collected_info:
            return phase

        if random.random() < 0.05:
            wrong_field = random.choice(list(collected_info))
            collected_info[wrong_field] = self.get_wrong_info(wrong_field)
        verification_text = self.safe_llm_call(
            prompt="仅返回根据以下信息生成确认话术:" + json.dumps(collected_info, ensure_ascii=False),
            system="要求1.逐项确认 2.允许修正",
        ) or f"我确认下:时间:{collected_info.get('时间','')},地点:{collected_info.get('地点','')}..."
        phase.append(("客服", "info_verification", verification_text))
        if random.random() < 0.3:
            correction_field = random.choice(list(collected_info))
            phase.append(("市民", "correction",
                          f"{correction_field}不对!应该是{self.get_field_example(correction_field)}"))
            if random.random() < 0.1:
                phase.append(("市民", "angry", "你们连基本信息都记错!"))
                phase.append(("客服", "apology", "非常抱歉,这是我们的失误"))
            phase.append(("客服", "acknowledge_correction", f"已更正{correction_field}信息"))
            phase.append(("市民", "final_confirmation", "现在对了"))
        else:
            phase.append(("市民", "confirmation", "对,没错"))
        return phase

    def generate_technical_extend_phase(self, knowledge: Dict, scene: str) -> List[Tuple]:
        """Ask the scene's extended questions, with occasional technical/line obstacles.

        Entries of ``knowledge["extend_questions"]`` that are not dicts are
        skipped. They come from LLM output and may be malformed.
        """
        phase = []
        for question_config in knowledge.get("extend_questions") or []:
            if not isinstance(question_config, dict):
                continue
            if random.random() < 0.05:
                tech_question = self.safe_llm_call(
                    prompt=f"仅返回生成包含专业术语的{scene}问题",
                    system="使用3个以上专业词汇",
                ) or f"请问{scene}{random.choice(['频谱特征','声压级衰减曲线'])}是怎样的?"
                phase.append(("客服", "technical_question", tech_question))
                phase.append(("市民", "not_understand", "这些专业名词听不懂"))
                simplified = self.safe_llm_call(
                    prompt=f"仅将'{tech_question}'转化为的通俗问题",
                    system="用生活化比喻解释",
                ) or f"就是问{scene}的具体表现是怎样的"
                question = simplified
                phase.append(("客服", "simplified_question", simplified))
            else:
                generated_question = self.safe_llm_call(
                    prompt=f"仅返回基于{scene}场景生成的追问:{question_config.get('prompt','')}",
                    system="要求1.分步骤询问 2.适度专业",
                )
                question = generated_question or question_config.get('prompt', '')
                phase.append(("客服", "extend_question", question))

            if random.random() < 0.15:
                phase.append(("市民", "broken_response", "喂?...听得到吗?...我说到哪了?"))
                phase.append(("客服", "reassure", "电话不太稳定,请您继续"))
            answer = self.generate_realistic_answer(
                question, scene, question_config.get("theme", ""), "extend"
            )
            phase.append(("市民", "extend_answer", answer))
            if random.random() < 0.1:
                phase.append(("客服", "request_material", "需要您提供现场照片或录音证据"))
                phase.append(("市民", "material_response", random.choice([
                    "我手机里有,怎么发给你们?",
                    "现在拍不了,你们自己来看!",
                ])))
                phase.append(("客服", "guide", "可以通过微信公众号'市民服务'上传"))
        return phase

    def generate_final_confirmation(self, knowledge: Dict, scene: str) -> List[Tuple]:
        """Generate the closing confirmation, optionally with a citizen follow-up question."""
        phase = []
        confirmation = self.safe_llm_call(
            prompt=f"仅返回生成{scene}问题的最终确认话术",
            system="包含1.处理时限 2.反馈方式 3.应急联系人",
        ) or f"我们将在{random.choice(['24小时','3个工作日'])}内处理您的{scene}问题"
        phase.append(("客服", "final_confirmation", confirmation))
        if random.random() < 0.2:
            phase.append(("市民", "follow_up", random.choice([
                "如果超时没处理怎么办?",
                "我要找哪个部门跟进?",
            ])))
            phase.append(("客服", "replay", random.choice([
                "可拨打监督电话12345查询进度",
                "我们会主动给您回复",
            ])))
        return phase

    def generate_scene_knowledge(self, category: str, subcategory: str) -> Dict:
        """Ask the LLM for a JSON knowledge config of the scene.

        The result is augmented with a ``confirmation_template``. Falls back
        to get_fallback_knowledge when the reply is not a JSON object.
        """
        prompt = f"""作为政务热线专家,请为【{category}->{subcategory}】场景生成知识配置,包含:
1. 3-5个必问基础字段如时间、地点
2. 3个专业追问方向及追问话术模板
3. 该场景涉及的相关部门和处理时限参考
仅返回JSON格式结构示例
{{
"base_fields": [
{{"field": "时间", "prompt": "询问具体时间的标准话术"}},
{{"field": "地点", "prompt": "询问详细位置的专业话术"}}
],
"extend_questions": [
{{"theme": "历史记录", "prompt": "追问历史投诉情况的专业话术"}},
{{"theme": "紧急程度", "prompt": "评估问题紧急程度的询问方式"}}
],
"departments": ["城管局", "环保局"],
"time_ranges": ["24小时内", "3个工作日"]
}}"""
        response = self.safe_llm_call(
            prompt=prompt,
            system="你是有10年经验的政务热线系统架构师",
            response_format={"type": "json_object"},
        )
        knowledge = self._parse_json_object(response)
        if knowledge is None:
            return self.get_fallback_knowledge(category, subcategory)
        knowledge["confirmation_template"] = self.generate_confirmation_template(
            category, subcategory, knowledge.get("departments", []), knowledge.get("time_ranges", [])
        )
        return knowledge

    def generate_confirmation_template(self, category: str, subcategory: str,
                                       departments: List[str], time_ranges: List[str]) -> str:
        """Ask the LLM for a parameterizable confirmation template; fall back to a fixed sentence."""
        prompt = f"""为【{category}->{subcategory}】创建确认话术模板,要求包含:
1. 处理部门:{departments}
2. 预计时限:{time_ranges}
3. 至少2种后续跟进方式
模板示例:\"我们将协调{{department}}{{timeframe}}内处理,可通过{{phone}}{{wechat}}查询进展\"
"""
        return self.safe_llm_call(
            prompt=prompt,
            system="你需创建可参数化的文本模板,用{}标记变量位置",
        ) or f"我们将尽快处理您的{subcategory}问题"

    def generate_realistic_answer(self, question: str, scene: str,
                                  field: str, answer_type: str) -> str:
        """Ask the LLM for a realistic citizen answer to ``question``.

        :param answer_type: "base" or "extend" selects the persona; unknown
            values fall back to "base"
        :return: the reply, or get_field_example(field) if the call returned nothing
        """
        prompt = f"""仅返回模拟市民对【{scene}】问题中'{question}'的真实回答,要求:
1. 包含具体{field}的细节数据
2. 反映真实诉求和情绪梯度
3. 使用该场景典型市民的语言特征"""
        systems = {
            "base": "你是一个普通市民,回答要口语化并带生活细节",
            "extend": "你是有相关专业知识的市民,回答要包含技术参数和量化描述",
        }
        system = systems.get(answer_type, systems["base"])
        answer = self.safe_llm_call(prompt=prompt, system=system)
        return answer or self.get_field_example(field)

    def get_field_example(self, field: str) -> str:
        """Return a fixed example value for ``field``, or a generic phrase for unknown fields."""
        examples = {
            "时间": "2023年10月15日下午3点20分",
            "地点": "朝阳区建国路88号地下二层停车场",
            "联系方式": "13800138000或010-12345678",
            "姓氏": "张先生/李女士",
        }
        return examples.get(field, "具体情况是这样的...")

    def get_fallback_knowledge(self, category: str, subcategory: str) -> Dict:
        """Return a minimal hard-coded knowledge config used when the LLM reply is unusable."""
        return {
            "base_fields": [
                {"field": "时间", "prompt": f"请问{subcategory}发生的具体时间?"},
                {"field": "地点", "prompt": f"请说明{category}问题的详细位置?"},
            ],
            "extend_questions": [
                {"theme": "基本情况", "prompt": f"请描述{subcategory}的具体表现?"},
            ],
            "confirmation_template": f"我们将处理您的{category}问题",
            "departments": ["相关部门"],
            "time_ranges": ["尽快"],
        }

    def add_dialect_features(self, text: str) -> str:
        """Apply the first matching replacement of a random dialect, else append a dialect tag."""
        # NOTE(review): the ("", "") pairs look like characters lost when the
        # file was copied; they are skipped below because an empty pattern
        # "matches" every string and would return the text unchanged.
        dialects = {
            "北方方言": [("", ""), ("", ""), ("这个", "这玩意儿")],
            "南方方言": [("是不是", "系唔系"), ("不知道", "母鸡"), ("", "")],
        }
        _, replacements = random.choice(list(dialects.items()))
        for orig, rep in replacements:
            if orig and orig in text:
                return text.replace(orig, rep)
        return text + random.choice(["晓得伐?", "中不中?", "得啵?"])

    def get_wrong_info(self, field) -> str:
        """Return a deliberately vague/wrong value for ``field`` ("信息有误" for unknown fields)."""
        # NOTE(review): the "姓氏" choices are empty strings in this copy of
        # the file; the original values appear to have been lost.
        wrong_examples = {
            "时间": random.choice(["昨天", "上周", "记不清了"]),
            "地点": random.choice(["东边", "路口", "大概位置"]),
            "姓氏": random.choice(["", "", ""]),
        }
        return wrong_examples.get(field, "信息有误")

    def safe_llm_call(self, prompt: str, system: str = None, **kwargs) -> str:
        """Call the chat-completions endpoint and never raise.

        :param prompt: user message
        :param system: optional system message placed before the prompt
        :param kwargs: ``response_format`` is forwarded verbatim.
            ``temperature`` (default 0.7) and ``max_tokens`` (default 400)
            may be overridden.
        :return: the reply text, or "" on any HTTP, network or parsing failure
        """
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})
        data = {
            "model": self.model_name,
            "messages": messages,
            "temperature": kwargs.get("temperature", 0.7),
            "max_tokens": kwargs.get("max_tokens", 400),
        }
        if "response_format" in kwargs:
            data["response_format"] = kwargs["response_format"]
        try:
            response = requests.post(
                self.model_url,
                headers=self.headers,
                json=data,
                timeout=60,
            )
            if response.status_code != 200:
                print(f"API调用失败: {response.status_code}, {response.text}", flush=True)
                return ""
            return response.json()["choices"][0]["message"]["content"] or ""
        except Exception as e:
            print(f"API异常: {str(e)}", flush=True)
            return ""

    def format_output(self, dialog: List[Tuple]) -> List[Dict]:
        """Convert ``(speaker, type, content)`` tuples to numbered dicts, dropping the type tag."""
        return [
            {"turn": idx, "speaker": f"[{speaker}]", "content": content}
            for idx, (speaker, _dtype, content) in enumerate(dialog, start=1)
        ]
if __name__ == "__main__":
    generator = FullyDynamicGenerator()
    # Input spreadsheet: first column = level-2 category, second = level-3.
    excel_path = "/data/zhaochsh01/buquan/12345/zaoshu/count_3level.xlsx"
    categories_config = read_categories_config(excel_path)
    print("生成的categories_config:", flush=True)
    for level2, level3_list in categories_config.items():
        print(f"{level2}: {level3_list}", flush=True)
    num_per_subcategory = 2  # dialogs generated per level-3 category
    output_file = "./output/政务热线对话记录更新.xlsx"
    generator.generate_dialogs_in_batch(
        categories=categories_config,
        num_per_subcategory=num_per_subcategory,
        export_path=output_file,
    )
    # Show the last 5 records, but only if anything was actually written
    # (e.g. an empty or unreadable category file produces no output).
    if os.path.exists(output_file):
        sample_df = pd.read_excel(output_file)
        print("\n=== 最后5条记录示例 ===", flush=True)
        print(sample_df.tail(), flush=True)
    else:
        print(f"未生成输出文件: {output_file}", flush=True)