From a116fa2bbeadeb5dacd95925bc3b359c597efea0 Mon Sep 17 00:00:00 2001 From: "@zhaochsh01" Date: Tue, 13 May 2025 14:32:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=AF=B9=E8=AF=9D=E5=92=8C?= =?UTF-8?q?=E6=8A=A5=E5=91=8A=E5=86=85=E5=AE=B9=E6=8F=90=E5=8F=96=E7=9A=84?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E7=94=9F=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../zcs/baogao_content_extract_zaoshu.py | 164 +++++ .../zw12345/zcs/duihuazaoshu_piliang4.py | 640 ++++++++++++++++++ 2 files changed, 804 insertions(+) create mode 100644 data_generate/zw12345/zcs/baogao_content_extract_zaoshu.py create mode 100644 data_generate/zw12345/zcs/duihuazaoshu_piliang4.py diff --git a/data_generate/zw12345/zcs/baogao_content_extract_zaoshu.py b/data_generate/zw12345/zcs/baogao_content_extract_zaoshu.py new file mode 100644 index 0000000..7955d6f --- /dev/null +++ b/data_generate/zw12345/zcs/baogao_content_extract_zaoshu.py @@ -0,0 +1,164 @@ +""" +政务12345全国数据生成系统 +功能: +1. 支持全国范围地理位置生成 +2. 多层级分类扩展 +3. 数据保存至Excel +4. 真实业务场景模拟 +""" + +import pandas as pd +import random +import time +import re +import json +import requests +from typing import List, Dict, Tuple + +class NationalDataGenerator: + def __init__(self, excel_path: str, category_column: str): + self.base_categories = self._load_excel_categories(excel_path, category_column) + self.location_pool = self._generate_national_locations() + self.expanded_categories = self._expand_categories_with_gpt() + self.used_records = set() + + + + def _chat(self, content: str) -> str: + """调用Qwen模型的统一接口""" + payload = json.dumps({ + "model": "Qwen2.5-72B-Instruct", + "stream": False, + "temperature": 0.01, + "top_p": 0.1, + "repetition_penalty": 1.05, + "messages": [{"role": "user", "content": content}], + }) + headers = { + "Content-Type": "application/json", + "cache-control": "no-cache" + } + + try: + response = requests.post("http://100.105.214.176:8000/v1/chat/completions", headers=headers, data=payload) + response.raise_for_status() + return response.json()["choices"][0]["message"]["content"] + except Exception as e: + print(f"API调用失败: {str(e)}") + return "" + + def _load_excel_categories(self, path: str, column: str) -> List[str]: + """从Excel读取基础分类""" + df = pd.read_excel(path) + return df[column].dropna().unique().tolist() + + def _generate_national_locations(self, num=200) -> List[str]: + """生成全国真实地理位置库""" + prompt = f"生成{num}个中国各城市真实存在的地理位置,按省市区三级格式,示例:\n- 广东省广州市天河区珠江新城\n- 浙江省杭州市余杭区未来科技城" + response = self._chat(prompt) + print("生成的地理位置库为") + print(response) + print(type(response)) + locations = [ + parts[1] # 取第二部分(地址) + for line in response.strip().split("\n") + if line and (parts := line.split(maxsplit=1)) and len(parts) >= 2 + ] + print(locations) + return locations + + def _expand_categories_with_gpt(self) -> Dict[str, List[str]]: + """Qwen扩展分类体系""" + category_map = {} + for base_cat in self.base_categories: + prompt = f"生成与【{base_cat}】相关但具有政务场景区分度的5个细分类型,示例:\n- 类型1:施工许可违规\n- 类型2:夜间施工超时" + response = self._chat(prompt) + print("扩展类型为") + print(response) + print(type(response)) + sub_cats = [ + re.sub(r"^.*类型\d+:|\s*$", "", line) # 移除 "类型X:" 和首尾空格 + for line in response.strip().split("\n") + if "类型" in line and ":" in line # 只处理包含 "类型" 和 ":" 的行 + ] + category_map[base_cat] = sub_cats + time.sleep(1) + return category_map + + def generate_dataset(self, num_records: int) -> pd.DataFrame: + """生成核心数据集""" + data = [] + while len(data) < num_records: + base_cat = random.choice(self.base_categories) + sub_cat = random.choice(self.expanded_categories[base_cat]) + location = random.choice(self.location_pool) + + content, keywords = self._generate_content(base_cat, sub_cat, location) + if content and self._validate_record(content, keywords, base_cat): + data.append({ + "ID": len(data)+1, + "内容": content, + "关键词": " ".join(keywords), + "参考答案": base_cat, + "细分类型": sub_cat, + "地理位置": location + }) + time.sleep(1.2) + + return pd.DataFrame(data) + + def _generate_content(self, base_cat: str, sub_cat: str, location: str) -> Tuple[str, List[str]]: + """生成政务工单内容""" + prompt = f"""生成真实可信的12345政务工单,要求: +1. 主分类:【{base_cat}】 +2. 细分类型:【{sub_cat}】 +3. 发生地点:【{location}】 +4. 包含要素:时间、具体问题、影响范围、市民诉求 +5. 生成5个关键词(必须包含{base_cat}) +6. 内容长度80-150字 + +示例格式: +市民反映{location}某建筑工地违规夜间施工至凌晨,噪音严重干扰周边居民。已向环保部门投诉3次未解决,要求立即停工整顿。 +关键词:夜间施工 噪音污染 环保投诉 施工许可 居民维权""" + + try: + response = self._chat(prompt) + raw_text = response.strip() + return self._parse_generated_text(raw_text) + except Exception as e: + print(f"生成失败:{str(e)}") + return None, [] + + def _parse_generated_text(self, text: str) -> Tuple[str, List[str]]: + """解析生成文本""" + content = re.sub(r"关键词:.*", "", text).strip() + keywords = re.findall(r"关键词:(.+)", text)[0].split()[:5] + return content, keywords + + def _validate_record(self, content: str, keywords: List[str], category: str) -> bool: + """五重数据校验""" + return ( + len(content) >= 80 and + len(keywords) == 5 and + category in keywords and + content not in self.used_records and + any(c.isdigit() for c in content) # 包含数字要素 + ) + + +if __name__ == "__main__": + # 初始化生成器 + generator = NationalDataGenerator( + excel_path="/data/zhaochsh01/buquan/12345/zaoshu/12345政务服务大模型测试集.xlsx", + category_column="answer" + ) + + # 生成100条数据 + df = generator.generate_dataset(100) + + # 保存到Excel + with pd.ExcelWriter("./output/government_12345_data.xlsx") as writer: + df.to_excel(writer, index=False) + + print("生成数据示例:") + print(df[["ID", "内容", "关键词", "参考答案"]].head(3).to_string(index=False)) \ No newline at end of file diff --git a/data_generate/zw12345/zcs/duihuazaoshu_piliang4.py b/data_generate/zw12345/zcs/duihuazaoshu_piliang4.py new file mode 100644 index 0000000..a94a3fe --- /dev/null +++ b/data_generate/zw12345/zcs/duihuazaoshu_piliang4.py @@ -0,0 +1,640 @@ +import requests +from openpyxl import Workbook +from openpyxl.styles import Font, Alignment +import os +from faker import Faker +import json +import random +from typing import List, Dict, Tuple +import pandas as pd +from collections import defaultdict +import concurrent.futures +from functools import partial + +def read_categories_config(file_path): + try: + # 读取Excel文件(假设前两列是二级和三级分类) + df = pd.read_excel(file_path) + + # 检查至少有两列数据 + if len(df.columns) < 2: + raise ValueError("Excel文件必须至少包含两列:二级分类和三级分类") + + categories_config = defaultdict(list) + + # 遍历每一行数据 + for _, row in df.iterrows(): + level2 = str(row.iloc[0]).strip() # 二级分类(第一列) + level3 = str(row.iloc[1]).strip() # 三级分类(第二列) + + # 跳过空行 + if not level2 or not level3: + continue + + # 确保三级分类不重复 + if level3 not in categories_config[level2]: + categories_config[level2].append(level3) + + return dict(categories_config) + + except FileNotFoundError: + print(f"错误:文件 {file_path} 不存在", flush=True) + return {} + except Exception as e: + print(f"处理文件时出错: {str(e)}", flush=True) + return {} + +def chat(content: str, models_url): + + payload = json.dumps( + { + "model": "Qwen2.5-72B-Instruct", + "stream": False, + "temperature": 0.5, + "top_p": 0.5, + "repetition_penalty": 1.05, + "messages": [{"role": "user", "content": f"{content}"}], + } + ) + headers = { + "Content-Type": "application/json", + "cache-control": "no-cache", + "Postman-Token": "4c70efd4-6448-4318-b2a9-e404f0181b80", + } + + try: + response = requests.request("POST", models_url, data=payload, headers=headers) + if response.status_code == 200: + response_data = response.json() + content = response_data["choices"][0]["message"]["content"] + else: + logger.info(f"response is: {response.json()}") + logger.info(f"Request failed with status code: {response.status_code}") + logger.info(f"Response content: {response.content}") + content = None + except Exception as e: + logger.error(f"resquest_exception: {e}", exc_info=True) + return content + +class FullyDynamicGenerator: + def __init__(self): + self.model_url = "http://100.105.61.165:8000/v1/chat/completions" + self.headers = { + "Content-Type": "application/json", + "Authorization": "7c3eafb5-2d6e-100d-ab0f-7b2c1cdafb3c" + } + self.model_name = "Qwen2.5-72B-Instruct" + self.faker = Faker('zh_CN') + self.dynamic_memory = {} + self.special_cases = [ + "方言沟通", "老年人口齿不清", "情绪激动打断对话", + "背景噪音干扰", "信号断续" + ] + # 添加锁用于线程安全的Excel写入 + self._export_lock = threading.Lock() + + def generate_dialog(self, category: str, subcategory: str, export_path: str = None) -> List[Dict]: + """全动态对话生成入口""" + scene_knowledge = self.generate_scene_knowledge(category, subcategory) + self.dynamic_memory[f"{category}_{subcategory}"] = scene_knowledge + dialog = [] + dialog.extend(self.generate_complex_opening(category, subcategory)) + dialog.extend(self.generate_obstacle_base_phase(scene_knowledge, subcategory)) + dialog.extend(self.generate_verification_with_challenges(dialog)) + dialog.extend(self.generate_technical_extend_phase(scene_knowledge, subcategory)) + dialog.extend(self.generate_final_confirmation(scene_knowledge, subcategory)) + + formatted_dialog = self.format_output(dialog) + + if export_path: + with self._export_lock: # 使用锁保证线程安全 + self.export_to_excel(formatted_dialog, export_path, category, subcategory) + + return formatted_dialog + + def _generate_single_dialog(self, category, subcategory, export_path, num_per_subcategory, i, total_tasks, current_task_counter): + """生成单个对话的辅助函数,用于并发执行""" + with current_task_counter.get_lock(): + current_task = current_task_counter.value + 1 + current_task_counter.value = current_task + + print(f"\n进度: {current_task}/{total_tasks} " + f"({(current_task/total_tasks)*100:.1f}%) - " + f"分类: {category} - " + f"子分类: {subcategory} - " + f"第 {i+1}/{num_per_subcategory} 条", flush=True) + + dialog = self.generate_dialog( + category=category, + subcategory=subcategory, + export_path=export_path + ) + return { + "category": category, + "subcategory": subcategory, + "dialog": dialog + } + + def generate_dialogs_in_batch(self, categories: Dict[str, List[str]], num_per_subcategory: int, export_path: str): + """ + 批量生成对话数据 + :param categories: 字典格式 {分类: [子分类1, 子分类2,...]} + :param num_per_subcategory: 每个子分类生成的数量 + :param export_path: 输出文件路径 + """ + all_dialogs = [] + + # 计算总任务量 + total_subcategories = sum(len(subcats) for subcats in categories.values()) + total_tasks = total_subcategories * num_per_subcategory + print(f"\n总共需要生成 {total_subcategories} 个子分类的数据,每个子分类 {num_per_subcategory} 条,共计 {total_tasks} 条对话记录", flush=True) + + # 使用ThreadPoolExecutor创建10个worker + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + # 创建共享计数器 + current_task_counter = multiprocessing.Value('i', 0) + + # 准备任务列表 + futures = [] + for category, subcategories in categories.items(): + for subcategory in subcategories: + for i in range(num_per_subcategory): + futures.append( + executor.submit( + self._generate_single_dialog, + category=category, + subcategory=subcategory, + export_path=export_path, + num_per_subcategory=num_per_subcategory, + i=i, + total_tasks=total_tasks, + current_task_counter=current_task_counter + ) + ) + + # 获取结果 + for future in concurrent.futures.as_completed(futures): + try: + result = future.result() + all_dialogs.append(result) + except Exception as e: + print(f"生成对话时出错: {str(e)}", flush=True) + + print(f"\n已完成所有生成任务,共生成{len(all_dialogs)}条对话记录", flush=True) + return all_dialogs + + def export_to_excel(self, dialog: List[Dict], file_path: str, category: str, subcategory: str): + """将整个对话作为一条记录保存到Excel文件(追加模式)""" + try: + # 合并对话内容,格式为:1. [客服]内容 + dialog_text = "\n".join( + [f"{turn['turn']}. {turn['speaker']} {turn['content']}" + for turn in dialog] + ) + + # 创建包含元数据的DataFrame + record = { + "分类": category, + "子分类": subcategory, + "对话轮数": len(dialog), + "对话内容": dialog_text, + } + + df = pd.DataFrame([record]) + + # 如果文件存在则追加,否则创建新文件 + if os.path.exists(file_path): + with pd.ExcelWriter(file_path, mode='a', engine='openpyxl', if_sheet_exists='overlay') as writer: + # 读取现有数据 + existing_df = pd.read_excel(file_path) + # 合并新旧数据 + combined_df = pd.concat([existing_df, df], ignore_index=True) + # 写入合并后的数据 + combined_df.to_excel(writer, index=False) + else: + # 确保目录存在 + os.makedirs(os.path.dirname(file_path), exist_ok=True) + df.to_excel(file_path, index=False) + + print(f"对话已成功保存到: {file_path}", flush=True) + + except Exception as e: + print(f"保存Excel文件时出错: {str(e)}", flush=True) + + def generate_complex_opening(self, category: str, subcategory: str) -> List[Tuple]: + """生成带复杂情形的开场对话""" + phase = [] + special_case = random.choice(self.special_cases + [None]*3) + + # 首先让客服说话 + response_text = "您好,我是政府热线服务,很高兴为您服务" + if special_case == "老年人口齿不清": + response_text += "(放慢语速)请您慢慢说" + phase.append(("客服", "greeting", response_text)) + + # 然后市民反馈问题 + citizen_traits = { + "方言": random.choice(["带浓重口音", "夹杂方言词汇", "语法不规范"]), + "老年人": random.choice(["说话缓慢", "重复语句", "耳背听不清"]), + "情绪化": random.choice(["不断打断", "提高音量", "带哭腔"]) + } + opening_prompt = f"""生成市民反映{subcategory}问题的电话开场白,要求: + 1. 必须包含"您好"等礼貌用语 + 2. 体现真实通话特征:{citizen_traits.get(special_case, "正常沟通")} + 3. 包含具体问题细节""" + opening = self.safe_llm_call( + prompt=opening_prompt, + system="你擅长模拟各类人群的真实对话", + response_format={"type": "json_object"} + ) + try: + opening_data = json.loads(opening) + opening_text = opening_data.get("text", f"您好,我要反映{subcategory}问题") + if special_case == "方言沟通": + opening_text = self.add_dialect_features(opening_text) + except: + opening_text = f"您好,我想投诉{subcategory}问题" + phase.append(("市民", "open_call", opening_text)) + + # 如果需要确认问题 + if special_case in ["方言沟通", "老年人口齿不清", "信号断续"]: + phase.append(("客服", "double_check", f"抱歉,刚才没有听清楚,您是说{subcategory}问题对吗?")) + phase.append(("市民", "clarify", random.choice([ + "对,就是这个问题", + f"不是,是{random.choice(['更严重','其他'])}的问题", + "(声音断断续续)喂...听得到吗?" + ]))) + return phase + + def generate_obstacle_base_phase(self, knowledge: Dict, scene: str) -> List[Tuple]: + """生成带沟通障碍的基础信息采集""" + phase = [] + required_fields = ["时间", "地点", "事件描述", "联系方式", "姓氏"] + for field in required_fields: + if random.random() < 0.1: + unclear_question = self.safe_llm_call( + prompt=f"仅返回生成有歧义的{field}的询问话术,仅返回询问话术,不返回额外内容", + system="故意制造1-2处不明确表述" + ) or f"那个...关于{field}的情况能不能说下?" + phase.append(("客服", "unclear_question", unclear_question)) + phase.append(("市民", "confused", "您问的是什么?我没听明白")) + question = self.safe_llm_call( + prompt=f"仅返回重新生成清晰的{field}询问话术", + system="使用最简明的表达" + ) or f"请提供{field}的具体信息" + phase.append(("客服", "retry_question", question)) + else: + question = self.safe_llm_call( + prompt=f"仅返回生成政务热线询问{field}的标准话术,场景:{scene},仅返回询问话术,不返回额外内容", + system="要求:1.使用敬语 2.明确信息要求" + ) or f"请问{scene}的{field}是?" + phase.append(("客服", "info_request", question)) + answer, needs_clarify = self.generate_complex_answer(scene, field, question) + phase.append(("市民", "info_response", answer)) + if needs_clarify: + clarify_question = self.safe_llm_call( + prompt=f"仅返回根据模糊回答'{answer}'生成澄清{field}的追问,仅返回追问内容,不返回额外内容", + system="要求:1.在追问中指出不明确处 2.进行礼貌的追问" + ) or f"您提供的{field}不够具体,请补充(例:{self.get_field_example(field)})" + phase.append(("客服", "clarify_request", clarify_question)) + if random.random() < 0.1: + phase.append(("市民", "refuse", random.choice([ + "这么麻烦不说了!", + "你们政府办事就是繁琐", + f"{field}有什么好问的!" + ]))) + phase.append(("客服", "calm_down", random.choice([ + "理解您的心情,但详细信息能帮助我们更快解决问题", + "抱歉给您带来不便,这是必要流程" + ]))) + phase.append(("市民", "clarified_response", f"哦,应该是{self.get_field_example(field)}")) + return phase + + def generate_complex_answer(self, scene: str, field: str, question) -> Tuple[str, bool]: + """生成带复杂特征的市民回答""" + if random.random() < 0.15: + special_answers = { + "时间": [ + ("就...就那个...前几天", True), + ("(背景嘈杂)喂?时间啊...上周?", True), + ("我不记得了!你们自己查!", False) + ], + "地点": [ + ("俺们村东头那个...那个啥来着", True), + ("(信号不好)在...哗哗...超市附近", True), + ("这么简单的问题都处理不了?", False) + ] + } + return random.choice(special_answers.get(field, [("这个我说不好", True)])) + answers = { + "时间": [ + (f"{random.choice(['今天','昨天'])}{random.randint(1,12)}点左右", False), + (f"持续{random.randint(2,24)}小时了", False) + ], + "地点": [ + (f"{self.faker.building_number()}号{random.choice(['东侧','南门'])}", False), + (f"{self.faker.street_name()}附近", True) + ], + "联系方式": [ + (f"{self.faker.phone_number()[:3]}****", True), + (f"固话:{self.faker.phone_number()[:4]}-{self.faker.phone_number()[-4:]}", False) + ], + "姓氏": [ + (f"免贵姓{self.faker.last_name()}", False), + ("叫我老李就行", True) + ] + } + common_answer = self.safe_llm_call( + prompt = f"""仅返回模拟市民对'{question}'的真实回答,要求:1. 包含具体{field}的细节数据。 2. 反映真实诉求和情绪梯度。""", + system="你是一个普通市民,回答要口语化并带生活细节" + ) + + return random.choice(answers.get(field, [(common_answer, False)])) + + def generate_verification_with_challenges(self, previous_dialog: List[Tuple]) -> List[Tuple]: + """生成带挑战的信息确认环节""" + phase = [] + collected_info = {} + for turn in previous_dialog: + if turn[1] in ["info_response", "clarified_response"]: + for field in ["时间", "地点", "姓氏"]: + if field in turn[2]: + collected_info[field] = turn[2] + if random.random() < 0.1: + collected_info[field] = self.get_wrong_info(field) + if collected_info: + if random.random() < 0.05: + wrong_field = random.choice(list(collected_info.keys())) + correct_value = collected_info[wrong_field] + collected_info[wrong_field] = self.get_wrong_info(wrong_field) + verification_text = self.safe_llm_call( + prompt="仅返回根据以下信息生成确认话术:" + json.dumps(collected_info, ensure_ascii=False), + system="要求:1.逐项确认 2.允许修正" + ) or f"我确认下:时间:{collected_info.get('时间','')},地点:{collected_info.get('地点','')}..." + phase.append(("客服", "info_verification", verification_text)) + if random.random() < 0.3: + correction_field = random.choice(list(collected_info.keys())) + phase.append(("市民", "correction", + f"{correction_field}不对!应该是{self.get_field_example(correction_field)}")) + if random.random() < 0.1: + phase.append(("市民", "angry", "你们连基本信息都记错!")) + phase.append(("客服", "apology", "非常抱歉,这是我们的失误")) + phase.append(("客服", "acknowledge_correction", f"已更正{correction_field}信息")) + phase.append(("市民", "final_confirmation", "现在对了")) + else: + phase.append(("市民", "confirmation", "对,没错")) + return phase + + def generate_technical_extend_phase(self, knowledge: Dict, scene: str) -> List[Tuple]: + """生成带技术障碍的扩展追问""" + phase = [] + for question_config in knowledge.get("extend_questions", []): + # 确保question变量总是有值 + question = question_config.get('prompt','') # 默认值 + + if random.random() < 0.05: + tech_question = self.safe_llm_call( + prompt=f"仅返回生成包含专业术语的{scene}问题", + system="使用3个以上专业词汇" + ) or f"请问{scene}的{random.choice(['频谱特征','声压级衰减曲线'])}是怎样的?" + phase.append(("客服", "technical_question", tech_question)) + phase.append(("市民", "not_understand", "这些专业名词听不懂")) + simplified = self.safe_llm_call( + prompt=f"仅将'{tech_question}'转化为的通俗问题", + system="用生活化比喻解释" + ) or f"就是问{scene}的具体表现是怎样的" + question = simplified # 更新question变量 + phase.append(("客服", "simplified_question", simplified)) + else: + generated_question = self.safe_llm_call( + prompt=f"仅返回基于{scene}场景生成的追问:{question_config.get('prompt','')}", + system="要求:1.分步骤询问 2.适度专业" + ) + question = generated_question or question_config.get('prompt','') # 确保question有值 + phase.append(("客服", "extend_question", question)) + + # 现在question变量肯定有值 + if random.random() < 0.15: + phase.append(("市民", "broken_response", "喂?...听得到吗?...我说到哪了?")) + phase.append(("客服", "reassure", "电话不太稳定,请您继续")) + + answer = self.generate_realistic_answer( + question, scene, question_config.get("theme",""), "extend" + ) + phase.append(("市民", "extend_answer", answer)) + + if random.random() < 0.1: + phase.append(("客服", "request_material", "需要您提供现场照片或录音证据")) + phase.append(("市民", "material_response", random.choice([ + "我手机里有,怎么发给你们?", + "现在拍不了,你们自己来看!" + ]))) + phase.append(("客服", "guide", "可以通过微信公众号'市民服务'上传")) + return phase + + def generate_final_confirmation(self, knowledge: Dict, scene: str) -> List[Tuple]: + """生成最终确认""" + phase = [] + confirmation = self.safe_llm_call( + prompt=f"仅返回生成{scene}问题的最终确认话术", + system="包含:1.处理时限 2.反馈方式 3.应急联系人" + ) or f"我们将在{random.choice(['24小时','3个工作日'])}内处理您的{scene}问题" + phase.append(("客服", "final_confirmation", confirmation)) + if random.random() < 0.2: + phase.append(("市民", "follow_up", random.choice([ + "如果超时没处理怎么办?", + "我要找哪个部门跟进?" + ]))) + phase.append(("客服", "replay", random.choice([ + "可拨打监督电话12345查询进度", + "我们会主动给您回复" + ]))) + return phase + + def generate_scene_knowledge(self, category: str, subcategory: str) -> Dict: + """动态生成场景知识图谱""" + prompt = f"""作为政务热线专家,请为【{category}->{subcategory}】场景生成知识配置,包含: + 1. 3-5个必问基础字段(如时间、地点) + 2. 3个专业追问方向及追问话术模板 + 3. 该场景涉及的相关部门和处理时限参考 + 仅返回JSON格式,结构示例: + {{ + "base_fields": [ + {{"field": "时间", "prompt": "询问具体时间的标准话术"}}, + {{"field": "地点", "prompt": "询问详细位置的专业话术"}} + ], + "extend_questions": [ + {{"theme": "历史记录", "prompt": "追问历史投诉情况的专业话术"}}, + {{"theme": "紧急程度", "prompt": "评估问题紧急程度的询问方式"}} + ], + "departments": ["城管局", "环保局"], + "time_ranges": ["24小时内", "3个工作日"] + }}""" + response = self.safe_llm_call( + prompt=prompt, + system="你是有10年经验的政务热线系统架构师", + response_format={"type": "json_object"} + ) + try: + knowledge = json.loads(response) + knowledge["confirmation_template"] = self.generate_confirmation_template( + category, subcategory, knowledge.get("departments", []), knowledge.get("time_ranges", []) + ) + return knowledge + except: + return self.get_fallback_knowledge(category, subcategory) + + def generate_confirmation_template(self, category: str, subcategory: str, + departments: List[str], time_ranges: List[str]) -> str: + """生成确认话术模板""" + prompt = f"""为【{category}->{subcategory}】创建确认话术模板,要求包含: + 1. 处理部门:{departments} + 2. 预计时限:{time_ranges} + 3. 至少2种后续跟进方式 + 模板示例:\"我们将协调{{department}}在{{timeframe}}内处理,可通过{{phone}}或{{wechat}}查询进展\" + """ + return self.safe_llm_call( + prompt=prompt, + system="你需创建可参数化的文本模板,用{}标记变量位置" + ) or f"我们将尽快处理您的{subcategory}问题" + + def generate_realistic_answer(self, question: str, scene: str, + field: str, answer_type: str) -> str: + """生成高真实性回答""" + prompt = f"""仅返回模拟市民对【{scene}】问题中'{question}'的真实回答,要求: + 1. 包含具体{field}的细节数据 + 2. 反映真实诉求和情绪梯度 + 3. 使用该场景典型市民的语言特征""" + system = { + "base": "你是一个普通市民,回答要口语化并带生活细节", + "extend": "你是有相关专业知识的市民,回答要包含技术参数和量化描述" + }[answer_type] + answer = self.safe_llm_call(prompt=prompt, system=system) + return answer or self.get_field_example(field) + + def get_field_example(self, field: str) -> str: + """获取字段示例""" + examples = { + "时间": "2023年10月15日下午3点20分", + "地点": "朝阳区建国路88号地下二层停车场", + "联系方式": "13800138000或010-12345678", + "姓氏": "张先生/李女士" + } + return examples.get(field, "具体情况是这样的...") + + def get_fallback_knowledge(self, category: str, subcategory: str) -> Dict: + """应急知识库""" + return { + "base_fields": [ + {"field": "时间", "prompt": f"请问{subcategory}发生的具体时间?"}, + {"field": "地点", "prompt": f"请说明{category}问题的详细位置?"} + ], + "extend_questions": [ + {"theme": "基本情况", "prompt": f"请描述{subcategory}的具体表现?"} + ], + "confirmation_template": f"我们将处理您的{category}问题", + "departments": ["相关部门"], + "time_ranges": ["尽快"] + } + + def add_dialect_features(self, text: str) -> str: + """添加方言特征""" + dialects = { + "北方方言": [("我", "俺"), ("的", "滴"), ("这个", "这玩意儿")], + "南方方言": [("是不是", "系唔系"), ("不知道", "母鸡"), ("说", "讲")] + } + dialect_type, replacements = random.choice(list(dialects.items())) + for orig, rep in replacements: + if orig in text: + return text.replace(orig, rep) + return text + random.choice(["晓得伐?", "中不中?", "得啵?"]) + + def get_wrong_info(self, field) -> str: + """生成错误信息""" + wrong_examples = { + "时间": random.choice(["昨天", "上周", "记不清了"]), + "地点": random.choice(["东边", "路口", "大概位置"]), + "姓氏": random.choice(["王", "李", "张"]) + } + return wrong_examples.get(field, "信息有误") + + def safe_llm_call(self, prompt: str, system: str = None,**kwargs) -> str: + """带熔断机制的API调用""" + try: + messages = [] + if system: + messages.append({"role": "system", "content": system}) + messages.append({"role": "user", "content": prompt}) + + data = { + "model": self.model_name, + "messages": messages, + "temperature": 0.7, + "max_tokens": 400 + } + + # 处理response_format参数 + if "response_format" in kwargs: + data["response_format"] = kwargs["response_format"] + + response = requests.post( + self.model_url, + headers=self.headers, + json=data, + timeout=60 + ) + + if response.status_code == 200: + return response.json()["choices"][0]["message"]["content"] + else: + print(f"API调用失败: {response.status_code}, {response.text}", flush=True) + return "" + + except Exception as e: + print(f"API异常: {str(e)}", flush=True) + return "" + + def format_output(self, dialog: List[Tuple]) -> List[Dict]: + """格式化输出,移除[xxx]类型标签""" + formatted = [] + for idx, (speaker, dtype, content) in enumerate(dialog): + # 移除类型标签,只保留说话人 + formatted.append({ + "turn": idx+1, + "speaker": f"[{speaker}]", + "content": content + }) + return formatted + + +if __name__ == "__main__": + import multiprocessing + import threading + + generator = FullyDynamicGenerator() + + # 示例文件路径 + excel_path = "/data/zhaochsh01/buquan/12345/zaoshu/count_3level.xlsx" + + # 读取并生成categories_config + categories_config = read_categories_config(excel_path) + + # 打印结果 + print("生成的categories_config:", flush=True) + for level2, level3_list in categories_config.items(): + print(f"{level2}: {level3_list}", flush=True) + + num_per_subcategory = 2 # 每个子分类生成3条数据 + output_file = "./output/政务热线对话记录更新.xlsx" + + # 批量生成数据 + generator.generate_dialogs_in_batch( + categories=categories_config, + num_per_subcategory=num_per_subcategory, + export_path=output_file + ) + + # 示例:打印最后生成的5条记录 + sample_df = pd.read_excel(output_file) + print("\n=== 最后5条记录示例 ===", flush=True) + print(sample_df.tail(), flush=True) \ No newline at end of file