diff --git a/data_generate/zw12345/zcs/baogao_content_extract_zaoshu.py b/data_generate/zw12345/zcs/baogao_content_extract_zaoshu.py deleted file mode 100644 index 7955d6f..0000000 --- a/data_generate/zw12345/zcs/baogao_content_extract_zaoshu.py +++ /dev/null @@ -1,164 +0,0 @@ -""" -政务12345全国数据生成系统 -功能: -1. 支持全国范围地理位置生成 -2. 多层级分类扩展 -3. 数据保存至Excel -4. 真实业务场景模拟 -""" - -import pandas as pd -import random -import time -import re -import json -import requests -from typing import List, Dict, Tuple - -class NationalDataGenerator: - def __init__(self, excel_path: str, category_column: str): - self.base_categories = self._load_excel_categories(excel_path, category_column) - self.location_pool = self._generate_national_locations() - self.expanded_categories = self._expand_categories_with_gpt() - self.used_records = set() - - - - def _chat(self, content: str) -> str: - """调用Qwen模型的统一接口""" - payload = json.dumps({ - "model": "Qwen2.5-72B-Instruct", - "stream": False, - "temperature": 0.01, - "top_p": 0.1, - "repetition_penalty": 1.05, - "messages": [{"role": "user", "content": content}], - }) - headers = { - "Content-Type": "application/json", - "cache-control": "no-cache" - } - - try: - response = requests.post("http://100.105.214.176:8000/v1/chat/completions", headers=headers, data=payload) - response.raise_for_status() - return response.json()["choices"][0]["message"]["content"] - except Exception as e: - print(f"API调用失败: {str(e)}") - return "" - - def _load_excel_categories(self, path: str, column: str) -> List[str]: - """从Excel读取基础分类""" - df = pd.read_excel(path) - return df[column].dropna().unique().tolist() - - def _generate_national_locations(self, num=200) -> List[str]: - """生成全国真实地理位置库""" - prompt = f"生成{num}个中国各城市真实存在的地理位置,按省市区三级格式,示例:\n- 广东省广州市天河区珠江新城\n- 浙江省杭州市余杭区未来科技城" - response = self._chat(prompt) - print("生成的地理位置库为") - print(response) - print(type(response)) - locations = [ - parts[1] # 取第二部分(地址) - for line in response.strip().split("\n") - if line and (parts := line.split(maxsplit=1)) and len(parts) >= 2 - ] - print(locations) - return locations - - def _expand_categories_with_gpt(self) -> Dict[str, List[str]]: - """Qwen扩展分类体系""" - category_map = {} - for base_cat in self.base_categories: - prompt = f"生成与【{base_cat}】相关但具有政务场景区分度的5个细分类型,示例:\n- 类型1:施工许可违规\n- 类型2:夜间施工超时" - response = self._chat(prompt) - print("扩展类型为") - print(response) - print(type(response)) - sub_cats = [ - re.sub(r"^.*类型\d+:|\s*$", "", line) # 移除 "类型X:" 和首尾空格 - for line in response.strip().split("\n") - if "类型" in line and ":" in line # 只处理包含 "类型" 和 ":" 的行 - ] - category_map[base_cat] = sub_cats - time.sleep(1) - return category_map - - def generate_dataset(self, num_records: int) -> pd.DataFrame: - """生成核心数据集""" - data = [] - while len(data) < num_records: - base_cat = random.choice(self.base_categories) - sub_cat = random.choice(self.expanded_categories[base_cat]) - location = random.choice(self.location_pool) - - content, keywords = self._generate_content(base_cat, sub_cat, location) - if content and self._validate_record(content, keywords, base_cat): - data.append({ - "ID": len(data)+1, - "内容": content, - "关键词": " ".join(keywords), - "参考答案": base_cat, - "细分类型": sub_cat, - "地理位置": location - }) - time.sleep(1.2) - - return pd.DataFrame(data) - - def _generate_content(self, base_cat: str, sub_cat: str, location: str) -> Tuple[str, List[str]]: - """生成政务工单内容""" - prompt = f"""生成真实可信的12345政务工单,要求: -1. 主分类:【{base_cat}】 -2. 细分类型:【{sub_cat}】 -3. 发生地点:【{location}】 -4. 包含要素:时间、具体问题、影响范围、市民诉求 -5. 生成5个关键词(必须包含{base_cat}) -6. 内容长度80-150字 - -示例格式: -市民反映{location}某建筑工地违规夜间施工至凌晨,噪音严重干扰周边居民。已向环保部门投诉3次未解决,要求立即停工整顿。 -关键词:夜间施工 噪音污染 环保投诉 施工许可 居民维权""" - - try: - response = self._chat(prompt) - raw_text = response.strip() - return self._parse_generated_text(raw_text) - except Exception as e: - print(f"生成失败:{str(e)}") - return None, [] - - def _parse_generated_text(self, text: str) -> Tuple[str, List[str]]: - """解析生成文本""" - content = re.sub(r"关键词:.*", "", text).strip() - keywords = re.findall(r"关键词:(.+)", text)[0].split()[:5] - return content, keywords - - def _validate_record(self, content: str, keywords: List[str], category: str) -> bool: - """五重数据校验""" - return ( - len(content) >= 80 and - len(keywords) == 5 and - category in keywords and - content not in self.used_records and - any(c.isdigit() for c in content) # 包含数字要素 - ) - - -if __name__ == "__main__": - # 初始化生成器 - generator = NationalDataGenerator( - excel_path="/data/zhaochsh01/buquan/12345/zaoshu/12345政务服务大模型测试集.xlsx", - category_column="answer" - ) - - # 生成100条数据 - df = generator.generate_dataset(100) - - # 保存到Excel - with pd.ExcelWriter("./output/government_12345_data.xlsx") as writer: - df.to_excel(writer, index=False) - - print("生成数据示例:") - print(df[["ID", "内容", "关键词", "参考答案"]].head(3).to_string(index=False)) \ No newline at end of file diff --git a/data_generate/zw12345/zcs/duihuazaoshu_piliang4.py b/data_generate/zw12345/zcs/duihuazaoshu_piliang4.py deleted file mode 100644 index a94a3fe..0000000 --- a/data_generate/zw12345/zcs/duihuazaoshu_piliang4.py +++ /dev/null @@ -1,640 +0,0 @@ -import requests -from openpyxl import Workbook -from openpyxl.styles import Font, Alignment -import os -from faker import Faker -import json -import random -from typing import List, Dict, Tuple -import pandas as pd -from collections import defaultdict -import concurrent.futures -from functools import partial - -def read_categories_config(file_path): - try: - # 读取Excel文件(假设前两列是二级和三级分类) - df = pd.read_excel(file_path) - - # 检查至少有两列数据 - if len(df.columns) < 2: - raise ValueError("Excel文件必须至少包含两列:二级分类和三级分类") - - categories_config = defaultdict(list) - - # 遍历每一行数据 - for _, row in df.iterrows(): - level2 = str(row.iloc[0]).strip() # 二级分类(第一列) - level3 = str(row.iloc[1]).strip() # 三级分类(第二列) - - # 跳过空行 - if not level2 or not level3: - continue - - # 确保三级分类不重复 - if level3 not in categories_config[level2]: - categories_config[level2].append(level3) - - return dict(categories_config) - - except FileNotFoundError: - print(f"错误:文件 {file_path} 不存在", flush=True) - return {} - except Exception as e: - print(f"处理文件时出错: {str(e)}", flush=True) - return {} - -def chat(content: str, models_url): - - payload = json.dumps( - { - "model": "Qwen2.5-72B-Instruct", - "stream": False, - "temperature": 0.5, - "top_p": 0.5, - "repetition_penalty": 1.05, - "messages": [{"role": "user", "content": f"{content}"}], - } - ) - headers = { - "Content-Type": "application/json", - "cache-control": "no-cache", - "Postman-Token": "4c70efd4-6448-4318-b2a9-e404f0181b80", - } - - try: - response = requests.request("POST", models_url, data=payload, headers=headers) - if response.status_code == 200: - response_data = response.json() - content = response_data["choices"][0]["message"]["content"] - else: - logger.info(f"response is: {response.json()}") - logger.info(f"Request failed with status code: {response.status_code}") - logger.info(f"Response content: {response.content}") - content = None - except Exception as e: - logger.error(f"resquest_exception: {e}", exc_info=True) - return content - -class FullyDynamicGenerator: - def __init__(self): - self.model_url = "http://100.105.61.165:8000/v1/chat/completions" - self.headers = { - "Content-Type": "application/json", - "Authorization": "7c3eafb5-2d6e-100d-ab0f-7b2c1cdafb3c" - } - self.model_name = "Qwen2.5-72B-Instruct" - self.faker = Faker('zh_CN') - self.dynamic_memory = {} - self.special_cases = [ - "方言沟通", "老年人口齿不清", "情绪激动打断对话", - "背景噪音干扰", "信号断续" - ] - # 添加锁用于线程安全的Excel写入 - self._export_lock = threading.Lock() - - def generate_dialog(self, category: str, subcategory: str, export_path: str = None) -> List[Dict]: - """全动态对话生成入口""" - scene_knowledge = self.generate_scene_knowledge(category, subcategory) - self.dynamic_memory[f"{category}_{subcategory}"] = scene_knowledge - dialog = [] - dialog.extend(self.generate_complex_opening(category, subcategory)) - dialog.extend(self.generate_obstacle_base_phase(scene_knowledge, subcategory)) - dialog.extend(self.generate_verification_with_challenges(dialog)) - dialog.extend(self.generate_technical_extend_phase(scene_knowledge, subcategory)) - dialog.extend(self.generate_final_confirmation(scene_knowledge, subcategory)) - - formatted_dialog = self.format_output(dialog) - - if export_path: - with self._export_lock: # 使用锁保证线程安全 - self.export_to_excel(formatted_dialog, export_path, category, subcategory) - - return formatted_dialog - - def _generate_single_dialog(self, category, subcategory, export_path, num_per_subcategory, i, total_tasks, current_task_counter): - """生成单个对话的辅助函数,用于并发执行""" - with current_task_counter.get_lock(): - current_task = current_task_counter.value + 1 - current_task_counter.value = current_task - - print(f"\n进度: {current_task}/{total_tasks} " - f"({(current_task/total_tasks)*100:.1f}%) - " - f"分类: {category} - " - f"子分类: {subcategory} - " - f"第 {i+1}/{num_per_subcategory} 条", flush=True) - - dialog = self.generate_dialog( - category=category, - subcategory=subcategory, - export_path=export_path - ) - return { - "category": category, - "subcategory": subcategory, - "dialog": dialog - } - - def generate_dialogs_in_batch(self, categories: Dict[str, List[str]], num_per_subcategory: int, export_path: str): - """ - 批量生成对话数据 - :param categories: 字典格式 {分类: [子分类1, 子分类2,...]} - :param num_per_subcategory: 每个子分类生成的数量 - :param export_path: 输出文件路径 - """ - all_dialogs = [] - - # 计算总任务量 - total_subcategories = sum(len(subcats) for subcats in categories.values()) - total_tasks = total_subcategories * num_per_subcategory - print(f"\n总共需要生成 {total_subcategories} 个子分类的数据,每个子分类 {num_per_subcategory} 条,共计 {total_tasks} 条对话记录", flush=True) - - # 使用ThreadPoolExecutor创建10个worker - with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: - # 创建共享计数器 - current_task_counter = multiprocessing.Value('i', 0) - - # 准备任务列表 - futures = [] - for category, subcategories in categories.items(): - for subcategory in subcategories: - for i in range(num_per_subcategory): - futures.append( - executor.submit( - self._generate_single_dialog, - category=category, - subcategory=subcategory, - export_path=export_path, - num_per_subcategory=num_per_subcategory, - i=i, - total_tasks=total_tasks, - current_task_counter=current_task_counter - ) - ) - - # 获取结果 - for future in concurrent.futures.as_completed(futures): - try: - result = future.result() - all_dialogs.append(result) - except Exception as e: - print(f"生成对话时出错: {str(e)}", flush=True) - - print(f"\n已完成所有生成任务,共生成{len(all_dialogs)}条对话记录", flush=True) - return all_dialogs - - def export_to_excel(self, dialog: List[Dict], file_path: str, category: str, subcategory: str): - """将整个对话作为一条记录保存到Excel文件(追加模式)""" - try: - # 合并对话内容,格式为:1. [客服]内容 - dialog_text = "\n".join( - [f"{turn['turn']}. {turn['speaker']} {turn['content']}" - for turn in dialog] - ) - - # 创建包含元数据的DataFrame - record = { - "分类": category, - "子分类": subcategory, - "对话轮数": len(dialog), - "对话内容": dialog_text, - } - - df = pd.DataFrame([record]) - - # 如果文件存在则追加,否则创建新文件 - if os.path.exists(file_path): - with pd.ExcelWriter(file_path, mode='a', engine='openpyxl', if_sheet_exists='overlay') as writer: - # 读取现有数据 - existing_df = pd.read_excel(file_path) - # 合并新旧数据 - combined_df = pd.concat([existing_df, df], ignore_index=True) - # 写入合并后的数据 - combined_df.to_excel(writer, index=False) - else: - # 确保目录存在 - os.makedirs(os.path.dirname(file_path), exist_ok=True) - df.to_excel(file_path, index=False) - - print(f"对话已成功保存到: {file_path}", flush=True) - - except Exception as e: - print(f"保存Excel文件时出错: {str(e)}", flush=True) - - def generate_complex_opening(self, category: str, subcategory: str) -> List[Tuple]: - """生成带复杂情形的开场对话""" - phase = [] - special_case = random.choice(self.special_cases + [None]*3) - - # 首先让客服说话 - response_text = "您好,我是政府热线服务,很高兴为您服务" - if special_case == "老年人口齿不清": - response_text += "(放慢语速)请您慢慢说" - phase.append(("客服", "greeting", response_text)) - - # 然后市民反馈问题 - citizen_traits = { - "方言": random.choice(["带浓重口音", "夹杂方言词汇", "语法不规范"]), - "老年人": random.choice(["说话缓慢", "重复语句", "耳背听不清"]), - "情绪化": random.choice(["不断打断", "提高音量", "带哭腔"]) - } - opening_prompt = f"""生成市民反映{subcategory}问题的电话开场白,要求: - 1. 必须包含"您好"等礼貌用语 - 2. 体现真实通话特征:{citizen_traits.get(special_case, "正常沟通")} - 3. 包含具体问题细节""" - opening = self.safe_llm_call( - prompt=opening_prompt, - system="你擅长模拟各类人群的真实对话", - response_format={"type": "json_object"} - ) - try: - opening_data = json.loads(opening) - opening_text = opening_data.get("text", f"您好,我要反映{subcategory}问题") - if special_case == "方言沟通": - opening_text = self.add_dialect_features(opening_text) - except: - opening_text = f"您好,我想投诉{subcategory}问题" - phase.append(("市民", "open_call", opening_text)) - - # 如果需要确认问题 - if special_case in ["方言沟通", "老年人口齿不清", "信号断续"]: - phase.append(("客服", "double_check", f"抱歉,刚才没有听清楚,您是说{subcategory}问题对吗?")) - phase.append(("市民", "clarify", random.choice([ - "对,就是这个问题", - f"不是,是{random.choice(['更严重','其他'])}的问题", - "(声音断断续续)喂...听得到吗?" - ]))) - return phase - - def generate_obstacle_base_phase(self, knowledge: Dict, scene: str) -> List[Tuple]: - """生成带沟通障碍的基础信息采集""" - phase = [] - required_fields = ["时间", "地点", "事件描述", "联系方式", "姓氏"] - for field in required_fields: - if random.random() < 0.1: - unclear_question = self.safe_llm_call( - prompt=f"仅返回生成有歧义的{field}的询问话术,仅返回询问话术,不返回额外内容", - system="故意制造1-2处不明确表述" - ) or f"那个...关于{field}的情况能不能说下?" - phase.append(("客服", "unclear_question", unclear_question)) - phase.append(("市民", "confused", "您问的是什么?我没听明白")) - question = self.safe_llm_call( - prompt=f"仅返回重新生成清晰的{field}询问话术", - system="使用最简明的表达" - ) or f"请提供{field}的具体信息" - phase.append(("客服", "retry_question", question)) - else: - question = self.safe_llm_call( - prompt=f"仅返回生成政务热线询问{field}的标准话术,场景:{scene},仅返回询问话术,不返回额外内容", - system="要求:1.使用敬语 2.明确信息要求" - ) or f"请问{scene}的{field}是?" - phase.append(("客服", "info_request", question)) - answer, needs_clarify = self.generate_complex_answer(scene, field, question) - phase.append(("市民", "info_response", answer)) - if needs_clarify: - clarify_question = self.safe_llm_call( - prompt=f"仅返回根据模糊回答'{answer}'生成澄清{field}的追问,仅返回追问内容,不返回额外内容", - system="要求:1.在追问中指出不明确处 2.进行礼貌的追问" - ) or f"您提供的{field}不够具体,请补充(例:{self.get_field_example(field)})" - phase.append(("客服", "clarify_request", clarify_question)) - if random.random() < 0.1: - phase.append(("市民", "refuse", random.choice([ - "这么麻烦不说了!", - "你们政府办事就是繁琐", - f"{field}有什么好问的!" - ]))) - phase.append(("客服", "calm_down", random.choice([ - "理解您的心情,但详细信息能帮助我们更快解决问题", - "抱歉给您带来不便,这是必要流程" - ]))) - phase.append(("市民", "clarified_response", f"哦,应该是{self.get_field_example(field)}")) - return phase - - def generate_complex_answer(self, scene: str, field: str, question) -> Tuple[str, bool]: - """生成带复杂特征的市民回答""" - if random.random() < 0.15: - special_answers = { - "时间": [ - ("就...就那个...前几天", True), - ("(背景嘈杂)喂?时间啊...上周?", True), - ("我不记得了!你们自己查!", False) - ], - "地点": [ - ("俺们村东头那个...那个啥来着", True), - ("(信号不好)在...哗哗...超市附近", True), - ("这么简单的问题都处理不了?", False) - ] - } - return random.choice(special_answers.get(field, [("这个我说不好", True)])) - answers = { - "时间": [ - (f"{random.choice(['今天','昨天'])}{random.randint(1,12)}点左右", False), - (f"持续{random.randint(2,24)}小时了", False) - ], - "地点": [ - (f"{self.faker.building_number()}号{random.choice(['东侧','南门'])}", False), - (f"{self.faker.street_name()}附近", True) - ], - "联系方式": [ - (f"{self.faker.phone_number()[:3]}****", True), - (f"固话:{self.faker.phone_number()[:4]}-{self.faker.phone_number()[-4:]}", False) - ], - "姓氏": [ - (f"免贵姓{self.faker.last_name()}", False), - ("叫我老李就行", True) - ] - } - common_answer = self.safe_llm_call( - prompt = f"""仅返回模拟市民对'{question}'的真实回答,要求:1. 包含具体{field}的细节数据。 2. 反映真实诉求和情绪梯度。""", - system="你是一个普通市民,回答要口语化并带生活细节" - ) - - return random.choice(answers.get(field, [(common_answer, False)])) - - def generate_verification_with_challenges(self, previous_dialog: List[Tuple]) -> List[Tuple]: - """生成带挑战的信息确认环节""" - phase = [] - collected_info = {} - for turn in previous_dialog: - if turn[1] in ["info_response", "clarified_response"]: - for field in ["时间", "地点", "姓氏"]: - if field in turn[2]: - collected_info[field] = turn[2] - if random.random() < 0.1: - collected_info[field] = self.get_wrong_info(field) - if collected_info: - if random.random() < 0.05: - wrong_field = random.choice(list(collected_info.keys())) - correct_value = collected_info[wrong_field] - collected_info[wrong_field] = self.get_wrong_info(wrong_field) - verification_text = self.safe_llm_call( - prompt="仅返回根据以下信息生成确认话术:" + json.dumps(collected_info, ensure_ascii=False), - system="要求:1.逐项确认 2.允许修正" - ) or f"我确认下:时间:{collected_info.get('时间','')},地点:{collected_info.get('地点','')}..." - phase.append(("客服", "info_verification", verification_text)) - if random.random() < 0.3: - correction_field = random.choice(list(collected_info.keys())) - phase.append(("市民", "correction", - f"{correction_field}不对!应该是{self.get_field_example(correction_field)}")) - if random.random() < 0.1: - phase.append(("市民", "angry", "你们连基本信息都记错!")) - phase.append(("客服", "apology", "非常抱歉,这是我们的失误")) - phase.append(("客服", "acknowledge_correction", f"已更正{correction_field}信息")) - phase.append(("市民", "final_confirmation", "现在对了")) - else: - phase.append(("市民", "confirmation", "对,没错")) - return phase - - def generate_technical_extend_phase(self, knowledge: Dict, scene: str) -> List[Tuple]: - """生成带技术障碍的扩展追问""" - phase = [] - for question_config in knowledge.get("extend_questions", []): - # 确保question变量总是有值 - question = question_config.get('prompt','') # 默认值 - - if random.random() < 0.05: - tech_question = self.safe_llm_call( - prompt=f"仅返回生成包含专业术语的{scene}问题", - system="使用3个以上专业词汇" - ) or f"请问{scene}的{random.choice(['频谱特征','声压级衰减曲线'])}是怎样的?" - phase.append(("客服", "technical_question", tech_question)) - phase.append(("市民", "not_understand", "这些专业名词听不懂")) - simplified = self.safe_llm_call( - prompt=f"仅将'{tech_question}'转化为的通俗问题", - system="用生活化比喻解释" - ) or f"就是问{scene}的具体表现是怎样的" - question = simplified # 更新question变量 - phase.append(("客服", "simplified_question", simplified)) - else: - generated_question = self.safe_llm_call( - prompt=f"仅返回基于{scene}场景生成的追问:{question_config.get('prompt','')}", - system="要求:1.分步骤询问 2.适度专业" - ) - question = generated_question or question_config.get('prompt','') # 确保question有值 - phase.append(("客服", "extend_question", question)) - - # 现在question变量肯定有值 - if random.random() < 0.15: - phase.append(("市民", "broken_response", "喂?...听得到吗?...我说到哪了?")) - phase.append(("客服", "reassure", "电话不太稳定,请您继续")) - - answer = self.generate_realistic_answer( - question, scene, question_config.get("theme",""), "extend" - ) - phase.append(("市民", "extend_answer", answer)) - - if random.random() < 0.1: - phase.append(("客服", "request_material", "需要您提供现场照片或录音证据")) - phase.append(("市民", "material_response", random.choice([ - "我手机里有,怎么发给你们?", - "现在拍不了,你们自己来看!" - ]))) - phase.append(("客服", "guide", "可以通过微信公众号'市民服务'上传")) - return phase - - def generate_final_confirmation(self, knowledge: Dict, scene: str) -> List[Tuple]: - """生成最终确认""" - phase = [] - confirmation = self.safe_llm_call( - prompt=f"仅返回生成{scene}问题的最终确认话术", - system="包含:1.处理时限 2.反馈方式 3.应急联系人" - ) or f"我们将在{random.choice(['24小时','3个工作日'])}内处理您的{scene}问题" - phase.append(("客服", "final_confirmation", confirmation)) - if random.random() < 0.2: - phase.append(("市民", "follow_up", random.choice([ - "如果超时没处理怎么办?", - "我要找哪个部门跟进?" - ]))) - phase.append(("客服", "replay", random.choice([ - "可拨打监督电话12345查询进度", - "我们会主动给您回复" - ]))) - return phase - - def generate_scene_knowledge(self, category: str, subcategory: str) -> Dict: - """动态生成场景知识图谱""" - prompt = f"""作为政务热线专家,请为【{category}->{subcategory}】场景生成知识配置,包含: - 1. 3-5个必问基础字段(如时间、地点) - 2. 3个专业追问方向及追问话术模板 - 3. 该场景涉及的相关部门和处理时限参考 - 仅返回JSON格式,结构示例: - {{ - "base_fields": [ - {{"field": "时间", "prompt": "询问具体时间的标准话术"}}, - {{"field": "地点", "prompt": "询问详细位置的专业话术"}} - ], - "extend_questions": [ - {{"theme": "历史记录", "prompt": "追问历史投诉情况的专业话术"}}, - {{"theme": "紧急程度", "prompt": "评估问题紧急程度的询问方式"}} - ], - "departments": ["城管局", "环保局"], - "time_ranges": ["24小时内", "3个工作日"] - }}""" - response = self.safe_llm_call( - prompt=prompt, - system="你是有10年经验的政务热线系统架构师", - response_format={"type": "json_object"} - ) - try: - knowledge = json.loads(response) - knowledge["confirmation_template"] = self.generate_confirmation_template( - category, subcategory, knowledge.get("departments", []), knowledge.get("time_ranges", []) - ) - return knowledge - except: - return self.get_fallback_knowledge(category, subcategory) - - def generate_confirmation_template(self, category: str, subcategory: str, - departments: List[str], time_ranges: List[str]) -> str: - """生成确认话术模板""" - prompt = f"""为【{category}->{subcategory}】创建确认话术模板,要求包含: - 1. 处理部门:{departments} - 2. 预计时限:{time_ranges} - 3. 至少2种后续跟进方式 - 模板示例:\"我们将协调{{department}}在{{timeframe}}内处理,可通过{{phone}}或{{wechat}}查询进展\" - """ - return self.safe_llm_call( - prompt=prompt, - system="你需创建可参数化的文本模板,用{}标记变量位置" - ) or f"我们将尽快处理您的{subcategory}问题" - - def generate_realistic_answer(self, question: str, scene: str, - field: str, answer_type: str) -> str: - """生成高真实性回答""" - prompt = f"""仅返回模拟市民对【{scene}】问题中'{question}'的真实回答,要求: - 1. 包含具体{field}的细节数据 - 2. 反映真实诉求和情绪梯度 - 3. 使用该场景典型市民的语言特征""" - system = { - "base": "你是一个普通市民,回答要口语化并带生活细节", - "extend": "你是有相关专业知识的市民,回答要包含技术参数和量化描述" - }[answer_type] - answer = self.safe_llm_call(prompt=prompt, system=system) - return answer or self.get_field_example(field) - - def get_field_example(self, field: str) -> str: - """获取字段示例""" - examples = { - "时间": "2023年10月15日下午3点20分", - "地点": "朝阳区建国路88号地下二层停车场", - "联系方式": "13800138000或010-12345678", - "姓氏": "张先生/李女士" - } - return examples.get(field, "具体情况是这样的...") - - def get_fallback_knowledge(self, category: str, subcategory: str) -> Dict: - """应急知识库""" - return { - "base_fields": [ - {"field": "时间", "prompt": f"请问{subcategory}发生的具体时间?"}, - {"field": "地点", "prompt": f"请说明{category}问题的详细位置?"} - ], - "extend_questions": [ - {"theme": "基本情况", "prompt": f"请描述{subcategory}的具体表现?"} - ], - "confirmation_template": f"我们将处理您的{category}问题", - "departments": ["相关部门"], - "time_ranges": ["尽快"] - } - - def add_dialect_features(self, text: str) -> str: - """添加方言特征""" - dialects = { - "北方方言": [("我", "俺"), ("的", "滴"), ("这个", "这玩意儿")], - "南方方言": [("是不是", "系唔系"), ("不知道", "母鸡"), ("说", "讲")] - } - dialect_type, replacements = random.choice(list(dialects.items())) - for orig, rep in replacements: - if orig in text: - return text.replace(orig, rep) - return text + random.choice(["晓得伐?", "中不中?", "得啵?"]) - - def get_wrong_info(self, field) -> str: - """生成错误信息""" - wrong_examples = { - "时间": random.choice(["昨天", "上周", "记不清了"]), - "地点": random.choice(["东边", "路口", "大概位置"]), - "姓氏": random.choice(["王", "李", "张"]) - } - return wrong_examples.get(field, "信息有误") - - def safe_llm_call(self, prompt: str, system: str = None,**kwargs) -> str: - """带熔断机制的API调用""" - try: - messages = [] - if system: - messages.append({"role": "system", "content": system}) - messages.append({"role": "user", "content": prompt}) - - data = { - "model": self.model_name, - "messages": messages, - "temperature": 0.7, - "max_tokens": 400 - } - - # 处理response_format参数 - if "response_format" in kwargs: - data["response_format"] = kwargs["response_format"] - - response = requests.post( - self.model_url, - headers=self.headers, - json=data, - timeout=60 - ) - - if response.status_code == 200: - return response.json()["choices"][0]["message"]["content"] - else: - print(f"API调用失败: {response.status_code}, {response.text}", flush=True) - return "" - - except Exception as e: - print(f"API异常: {str(e)}", flush=True) - return "" - - def format_output(self, dialog: List[Tuple]) -> List[Dict]: - """格式化输出,移除[xxx]类型标签""" - formatted = [] - for idx, (speaker, dtype, content) in enumerate(dialog): - # 移除类型标签,只保留说话人 - formatted.append({ - "turn": idx+1, - "speaker": f"[{speaker}]", - "content": content - }) - return formatted - - -if __name__ == "__main__": - import multiprocessing - import threading - - generator = FullyDynamicGenerator() - - # 示例文件路径 - excel_path = "/data/zhaochsh01/buquan/12345/zaoshu/count_3level.xlsx" - - # 读取并生成categories_config - categories_config = read_categories_config(excel_path) - - # 打印结果 - print("生成的categories_config:", flush=True) - for level2, level3_list in categories_config.items(): - print(f"{level2}: {level3_list}", flush=True) - - num_per_subcategory = 2 # 每个子分类生成3条数据 - output_file = "./output/政务热线对话记录更新.xlsx" - - # 批量生成数据 - generator.generate_dialogs_in_batch( - categories=categories_config, - num_per_subcategory=num_per_subcategory, - export_path=output_file - ) - - # 示例:打印最后生成的5条记录 - sample_df = pd.read_excel(output_file) - print("\n=== 最后5条记录示例 ===", flush=True) - print(sample_df.tail(), flush=True) \ No newline at end of file