offline_data_model_pipline/data_generate/query_completion/prompt_label.py

166 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright @2024 AI. Inspur Inc.
#
# @author: suojiayi <suojiayi@inspur.com>
# @date: 2025/05/13
import json
import requests
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
def read_jsonl_lines_in_batches(file_path, batch_size=10000):
"""按批次读取 JSONL 文件"""
batch = []
with open(file_path, mode="r", encoding="utf-8") as f:
for line in f:
try:
batch.append(json.loads(line.strip()))
if len(batch) == batch_size:
yield batch
batch = []
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
if batch:
yield batch
def process_data_concurrently(data_list, api_url, headers, max_workers=10):
"""并发处理数据并调用 API"""
result_data = []
def process_single_data(data):
try:
query = data.get('data')
if query:
input_content = f'''你是一个多轮对话分析专家。请根据以下维度对给定的多轮对话2-3轮进行细粒度分类并按统一格式返回结果。
# 输入规范
1. 输入为完整的多轮对话2-3组问答对
2. 每组问答包含明确的用户提问和系统/客服响应
# 核心分析维度
## 1. 指代消解模式
- 显式指代直接使用完整名词订单号123
- 代词依赖:需解析"它/这个/那个"等代词
- 省略恢复:需补全省略成分("还有呢?""其他问题"
- 零指代:无主语句("还没处理"
## 2. 信息流结构
- 自足单元:当前轮次信息完整
- 跨轮依赖:需前文关键信息
- 隐式跳转:无过渡的话题切换
- 意图延续:延续前轮任务目标
- 推理关联:需逻辑推算建立联系
## 3. 上下文管理
- 短期记忆依赖1-2轮内信息
- 长期记忆引用3轮前历史记录
- 推理记忆:需计算/推理关联信息
## 4. 状态演化
- 静态查询:纯信息检索
- 动态演进:逐步完善任务参数
- 混合操作:查询+修改组合
- 信息修正:更正前轮错误数据
# 输出规范'''+'''```json
{
"labels": ["指代消解::显式指代", "信息流::跨轮依赖", "上下文::推理记忆", "状态演化::动态演进"],
"analysis": {
"指代消解": {
"类型": "显式指代",
"证据": "例句:'订单A123'直接使用完整标识符"
},
"信息流": {
"类型": "跨轮依赖",
"证据": "客服要求提供订单号是对用户首轮请求的响应"
},
"上下文": {
"类型": "推理记忆",
"证据": "需要根据首轮日期推算当前状态"
},
"状态演化": {
"类型": "动态演进",
"证据": "从查询请求逐步收集必要参数"
}
}
}```'''+f'''让我们一步一步思考,给出最后的返回结果,输入的多轮对话:{query}'''
response = requests.post(
api_url,
headers=headers,
json={
"model": "Qwen2.5-72B-Instruct",
"stream": False,
"temperature": 0.01,
"messages": [{"role": "user", "content": input_content}]
}
)
if response.status_code == 200:
try:
content = response.json()["choices"][0]["message"]["content"]
except (KeyError, IndexError, json.JSONDecodeError):
content = "无法解析返回内容"
else:
content = f"API请求失败状态码{response.status_code}"
return {
"uid": data.get('uid'),
"data": query,
"answer": content
}
except Exception as e:
print(e)
return None
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_single_data, data) for data in data_list]
for future in as_completed(futures):
result = future.result()
if result:
result_data.append(result)
return result_data
def save_to_excel_in_batches(data_list, output_file, batch_size=10000):
"""按批次保存数据到 Excel 文件"""
df = pd.DataFrame(data_list)
writer = pd.ExcelWriter(output_file, engine='openpyxl')
for i in range(0, len(df), batch_size):
batch_df = df.iloc[i:i + batch_size]
batch_df.to_excel(writer, index=False, startrow=i)
writer.close()
print(f"数据已成功保存到 {output_file}")
def save_to_jsonl_in_batches(data_list, output_file, batch_size=10000):
"""按批次保存数据到 JSONL 文件"""
with open(output_file, 'w', encoding='utf-8') as f:
for i in range(0, len(data_list), batch_size):
# 获取当前批次的数据
batch_data = data_list[i:i + batch_size]
# 将每个数据对象写入文件,每行一个 JSON 对象
for item in batch_data:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
print(f"数据已成功保存到 {output_file}")
if __name__ == "__main__":
#output_excel_file = 'result-taoli-5.xlsx'
# api_url = "http://100.105.149.39:8000/v1/chat/completions"
api_url = "http://100.105.230.95:8000/v1/chat/completions"
headers = {
"Content-Type": "application/json",
"Authorization": "7c3eafb5-2d6e-100d-ab0f-7b2c1cdafb3c"
}
#file_path = '/dataset-pvc/suojiayi/new/train_prepare/20250423_020157/tmp_data/instruct_data_BELLE_Multiturn_Chat_filtered_2504232014.jsonl'
file_path = '/data/suojiayi/buquan/split_dhbq/part_2.jsonl'
output_file = './dhbq/duihuabuquan_prompt_2.jsonl'
#file_path = '/dataset-pvc/suojiayi/new/train_prepare/20250423_020157/tmp_data/instruct_data_COIG_filtered_2504212014.jsonl'
all_results = []
for batch in read_jsonl_lines_in_batches(file_path, batch_size=10000):
processed_batch = process_data_concurrently(batch, api_url, headers, max_workers=20)
all_results.extend(processed_batch)
# save_to_excel_in_batches(all_results, output_excel_file, batch_size=23000)
save_to_jsonl_in_batches(all_results, output_file, batch_size=10000)