diff --git a/data_generate/query_completion/README.md b/data_generate/query_completion/README.md
new file mode 100644
index 0000000..a78ba8b
--- /dev/null
+++ b/data_generate/query_completion/README.md
@@ -0,0 +1,26 @@
+# Data processing pipeline
+
+## Data scoring
+- **Score**: score each sample and produce a dataset annotated with scores.
+
+## Data labeling
+- **Instag**: generate `dhbq_instag.jsonl`.
+- **Prompt Label**: label each dialogue with coreference / context-dependency labels.
+- **Export Embedding + Cluster KMeans**: cluster the exported embeddings with KMeans to produce `dhbq_cluster_kmeans_result.jsonl`.
+
+## Data merging
+- **Merge**: merge the scores and the three kinds of labels (`instag`, `prompt_label`, `cluster`) into `dhbq_merged_with_score.jsonl`.
+  - Extract the samples whose score equals 5 into `dhbq_merged_with_score_5.jsonl`.
+
+## High-frequency question handling
+- **Frequency**: count how often each question appears in the raw file and identify the questions whose score equals 5.
+- **Embedding Similarity**: filter duplicate questions by embedding similarity; questions that cannot be filtered this way are reviewed manually.
+
+## Category statistics
+- **Count Instag**: count the data per `instag` tag.
+- **Count Label**: count the data per `prompt_label` label.
+- **Count Cluster**: count the data per KMeans cluster.
+
+## Data selection
+- **Select Final Data**: sample the required data from the final dataset by category.
+- **Gen Train Data**: convert the selected data into the format used for model training.
\ No newline at end of file
diff --git a/data_generate/query_completion/cluster_kmeans.py b/data_generate/query_completion/cluster_kmeans.py
new file mode 100644
index 0000000..d219d0b
--- /dev/null
+++ b/data_generate/query_completion/cluster_kmeans.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+
+import json
+import sys
+import os
+import torch
+import numpy as np
+from sklearnex import patch_sklearn, unpatch_sklearn
+from matplotlib import pyplot as plt
+import time
+import pandas as pd
+
+
+def train_kmeans_model(train_data, n_clusters, data_index_list):
+    # Note: patch_sklearn() must be called before the sklearn modules are imported.
+    patch_sklearn()
+    # unpatch_sklearn()
+    from sklearn.cluster import KMeans
+
+    start_time = time.time()
+
+    # The training data is expected to be an np.ndarray.
+    data_set = train_data
+    print("train_data_len:" + str(len(data_set)))
+    print("n_clusters:" + str(n_clusters))
+
+    # init defaults to k-means++ anyway
+    model = KMeans(n_clusters=n_clusters, init='k-means++', max_iter=300, n_init="auto", random_state=0)
+
+    model.fit(data_set)
+    # The fitted KMeans model already holds the cluster centers.
+    center = model.cluster_centers_
+    print(center)
+
+    cluster_label = model.predict(data_set)
+    print("cluster_label_size:" + str(len(cluster_label)))
+    print("type:" + str(type(cluster_label)))
+
+    train_dfs = pd.DataFrame(data_set)
+
+    train_dfs["predict_label"] = cluster_label
+    train_dfs["data_index"] = data_index_list
+
+    print(train_dfs.columns.values)
+
+    end_time = time.time()
+    avg_time_cost = (end_time - start_time) * 1.0
+    print("train_kmeans_time:" + str(avg_time_cost) + " s")
+    return train_dfs
+
+
+# step1: load the saved embedding binary file
+embed_file = "./dhbq/dhbq_embedding.pth"
+embed_dict_list = torch.load(embed_file)
+print("len_embed_dict_list:", len(embed_dict_list))
+
+# step2: parse the data
+raw_embeding_list = []
+raw_index_list = []
+for line in enumerate(embed_dict_list):
+    # line[0] is the index, line[1] is the saved dict (loaded directly as a dict object)
+    cur_dict = line[1]
+    cur_embeding = cur_dict["embedding"]
+    cur_data_idx = cur_dict["data_index"]
+    raw_embeding_list.append(cur_embeding)
+    raw_index_list.append(cur_data_idx)
+
+train_array = np.array(raw_embeding_list)
+print("train_array_shape:", train_array.shape)
+
+# Original plan: 1000 clusters in total (50 coarse clusters with 20 sub-clusters each),
+# or 1000 flat clusters directly; currently set lower, revisit if the quality is poor.
+num_cluster = 300
+# pandas aligns the row index automatically here
+train_dfs = train_kmeans_model(train_array, num_cluster, raw_index_list)
+predict_label = train_dfs['predict_label'].tolist()
+
+print("len_predict_label:", len(predict_label))
+
+
+# Do not save a CSV here: pandas CSV export can fail in unexpected ways on special characters.
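+# Illustration (not part of the original script): each record written to
+# dhbq_cluster_kmeans_result.jsonl below is expected to look roughly like
+#   {"embedding": [0.012, -0.348, ...], "cluster_center": 42, "data_idx": "<uid>"}
+# where "cluster_center" is the KMeans label and "data_idx" is the uid carried over
+# from the embedding export step; the concrete numbers above are made up.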
+#data_to_save = {'embeding': raw_embeding_list, 'cluster_center': predict_label, "data_idx": raw_index_list} +data_to_save = [ + { + "embedding": raw_embeding_list[i], + "cluster_center": predict_label[i], + "data_idx": raw_index_list[i] + } + for i in range(len(raw_embeding_list)) +] + +output_file = "./dhbq/dhbq_cluster_kmeans_result.jsonl" +with open(output_file, 'w', encoding='utf-8') as f: + for record in data_to_save: + f.write(json.dumps(record, ensure_ascii=False) + '\n') + +print(f"Results saved to {output_file}") + +data_to_save_df = pd.DataFrame(data_to_save) +data_to_save_df.to_pickle("./dhbq/dhbq_cluster_kmeans_result.pkl") diff --git a/data_generate/query_completion/count_cluster.py b/data_generate/query_completion/count_cluster.py new file mode 100644 index 0000000..362b046 --- /dev/null +++ b/data_generate/query_completion/count_cluster.py @@ -0,0 +1,71 @@ +import json +from collections import defaultdict +import pandas as pd + +def classify_data_by_cluster_center(input_file, output_jsonl, output_excel): + """ + 根据 cluster_center 对数据进行分类。 + 每个 cluster_center 对应一个列表,存储去掉了 embedding 的数据。 + 输出 JSONL 文件按类型的数据量从多到少排序,并记录每种类型的数量。 + 同时将类型及其数量导出到 Excel 表格中。 + 每个 cluster 对应的 data_list 会根据 'score' 字段降序排序。 + """ + + # 初始化分类字典 + classified_data = defaultdict(list) + + # 读取输入文件 + with open(input_file, 'r', encoding='utf-8') as f: + for line in f: + record = json.loads(line.strip()) + + # 提取 cluster_center 部分 + cluster_center = record.get("cluster_center") + + # 如果 cluster_center 存在,则根据其值分类 + if cluster_center is not None: + record_without_embedding = {k: v for k, v in record.items() if k != "embedding"} + classified_data[cluster_center].append(record_without_embedding) + else: + # 如果没有 cluster_center,则归类到 "null" + record_without_embedding = {k: v for k, v in record.items() if k != "embedding"} + classified_data["null"].append(record_without_embedding) + + # 对每个 cluster_center 下的 data_list 按照 score 排序(默认为0) + for center in classified_data: + classified_data[center].sort(key=lambda x: x.get('score', 0), reverse=True) + + # 按类型的数据量从多到少排序 + sorted_classified_data = sorted(classified_data.items(), key=lambda x: len(x[1]), reverse=True) + + # 写入 JSONL 文件 + total_types = len(sorted_classified_data) + with open(output_jsonl, 'w', encoding='utf-8') as out_f: + for cluster_center, data_list in sorted_classified_data: + entry = { + str(cluster_center): data_list, + #"count": len(data_list) + } + out_f.write(json.dumps(entry, ensure_ascii=False) + '\n') + + # 准备 Excel 数据 + excel_data = [] + for cluster_center, data_list in sorted_classified_data: + excel_data.append({"Cluster Center": cluster_center, "Count": len(data_list)}) + + # 导出到 Excel 文件 + df = pd.DataFrame(excel_data) + df.to_excel(output_excel, index=False) + + print(f"Total types: {total_types}") + return total_types + + +# 示例用法 +if __name__ == "__main__": + input_file = './dhbq/dhbq_merged_with_score_0513.jsonl' + output_jsonl = './dhbq/dhbq_count_cluster_0513.jsonl' + output_excel = './dhbq/dhbq_count_cluster_0513.xlsx' + + total_types = classify_data_by_cluster_center(input_file, output_jsonl, output_excel) + print(f"Total types found: {total_types}") \ No newline at end of file diff --git a/data_generate/query_completion/count_instag.py b/data_generate/query_completion/count_instag.py new file mode 100644 index 0000000..828ad50 --- /dev/null +++ b/data_generate/query_completion/count_instag.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright @2024 AI. Inspur Inc. 
+# +# @author: suojiayi +# @date: 2025/05/13 + +import json +from collections import defaultdict +import pandas as pd + +def classify_data_by_ins_tag(input_file, output_jsonl, output_excel): + """ + 根据 ins_tag_label 中的标签对数据进行分类。 + 每个类型对应一个列表,存储去掉了 embedding 的数据。 + 输出 JSONL 文件按类型的数据量从多到少排序,并记录每种类型的数量。 + 同时将类型及其数量导出到 Excel 表格中。 + """ + + # 初始化分类字典 + classified_data = defaultdict(list) + + # 读取输入文件 + with open(input_file, 'r', encoding='utf-8') as f: + for line in f: + record = json.loads(line.strip()) + + # 提取 ins_tag_label 部分 + ins_tag_label = record.get("ins_tag_label", "[]") + try: + ins_tag_label = json.loads(ins_tag_label) + except json.JSONDecodeError: + #print(f"JSON decode error: {ins_tag_label}") + ins_tag_label = [] + + # 如果 ins_tag_label 存在,则根据类型分类 + if ins_tag_label: + for tag in ins_tag_label: + record_without_embedding = {k: v for k, v in record.items() if k != "embedding"} + classified_data[tag].append(record_without_embedding) + else: + # 如果没有 ins_tag_label,则归类到 "null" + record_without_embedding = {k: v for k, v in record.items() if k != "embedding"} + classified_data["null"].append(record_without_embedding) + + # 对每个 data_list 按照 score 降序排序(如果不存在 score,默认为 0) + for tag in classified_data: + classified_data[tag].sort(key=lambda x: x.get('score', 0), reverse=True) + + # 过滤掉长度小于83的类别,并排除 "null" 类别 + filtered_data = { + tag: records for tag, records in classified_data.items() + if len(records) >= 83 and tag != "null" + } + + # 按照记录数从多到少排序 + sorted_filtered_data = sorted(filtered_data.items(), key=lambda x: len(x[1]), reverse=True) + + # 写入 JSONL 文件 + total_types = len(sorted_filtered_data) + with open(output_jsonl, 'w', encoding='utf-8') as out_f: + for type_name, data_list in sorted_filtered_data: + entry = { + type_name: data_list, + #"count": len(data_list) + } + out_f.write(json.dumps(entry, ensure_ascii=False) + '\n') + + # 准备 Excel 数据 + excel_data = [] + for type_name, data_list in sorted_filtered_data: + excel_data.append({"Type": type_name, "Count": len(data_list)}) + + # 导出到 Excel 文件 + df = pd.DataFrame(excel_data) + df.to_excel(output_excel, index=False) + + print(f"Total types after filtering: {total_types}") + return total_types + + +# 示例用法 +input_file = './dhbq/dhbq_merged_with_score.jsonl' +output_jsonl = './dhbq/dhbq_count_instag.jsonl' +output_excel = './dhbq/dhbq_count_instag.xlsx' + +total_types = classify_data_by_ins_tag(input_file, output_jsonl, output_excel) +print(f"Final types found: {total_types}") \ No newline at end of file diff --git a/data_generate/query_completion/count_label.py b/data_generate/query_completion/count_label.py new file mode 100644 index 0000000..11efd4c --- /dev/null +++ b/data_generate/query_completion/count_label.py @@ -0,0 +1,97 @@ +import json +from collections import defaultdict +import pandas as pd + +def classify_data_by_labels(input_file, output_file, output_excel): + """ + 根据 prompt_label 中的 labels 提取类型(::后面的内容)。 + 每个类型对应一个列表,存储去掉了 embedding 的数据。 + 输出文件按类型的数据量从多到少排序,并记录每种类型的数量。 + 每个类型的 data_list 会根据 score 字段降序排序。 + 并且会过滤掉 data_list 长度小于 19 的类型和特定的 label 值。 + """ + # 初始化分类字典 + classified_data = defaultdict(list) + excluded_labels = {"null", "无", "无指代", "无指代消解", "无明显指代消解", + "无上下文依赖", "无明显指代消解需求", "无明确指代", + "无明显上下文依赖", "无依赖", "无上下文", "无明显指代", + } + + # 读取输入文件 + with open(input_file, 'r', encoding='utf-8') as f: + for line in f: + record = json.loads(line.strip()) + + # 提取 prompt_label 中的 labels 部分 + prompt_label = record.get("prompt_label", "{}") + try: + prompt_label = json.loads(prompt_label) + except json.JSONDecodeError: + 
prompt_label = {} + + if isinstance(prompt_label, list): + labels = prompt_label[0].get("labels", []) + else: + labels = prompt_label.get("labels", []) + + # 如果 labels 存在,则根据类型分类 + if labels: + for label in labels: + if "::" in label: + type_name = label.split("::")[-1] # 提取 :: 后面的内容 + # 排除特定的 label 值 + if any(excluded_label in label for excluded_label in excluded_labels): + continue + record_without_embedding = {k: v for k, v in record.items() if k != "embedding"} + classified_data[type_name].append(record_without_embedding) + # else: + # # 如果没有 labels,则归类到 "null" + # record_without_embedding = {k: v for k, v in record.items() if k != "embedding"} + # classified_data["null"].append(record_without_embedding) + + # 对每个类型的 data_list 按照 score 字段降序排序 + for type_name in classified_data.keys(): + classified_data[type_name].sort(key=lambda x: x.get('score', 0), reverse=True) + + # 过滤掉 data_list 长度小于 19 的类型 + filtered_classified_data = {k: v for k, v in classified_data.items() if len(v) >= 19} + + # 按类型的数据量从多到少排序 + sorted_classified_data = sorted(filtered_classified_data.items(), key=lambda x: len(x[1]), reverse=True) + + # 写入输出文件 + total_types = len(sorted_classified_data) + with open(output_file, 'w', encoding='utf-8') as out_f: + for type_name, data_list in sorted_classified_data: + entry = { + type_name: data_list, + #"count": len(data_list) + } + out_f.write(json.dumps(entry, ensure_ascii=False) + '\n') + + print(f"Total types after filtering: {total_types}") + + # 准备 Excel 数据 + excel_data = [] + for type_name, data_list in sorted_classified_data: + excel_data.append({"Type": type_name, "Count": len(data_list)}) + + # 导出到 Excel 文件 + df = pd.DataFrame(excel_data) + df.to_excel(output_excel, index=False) + + # 将类型为 null 的数据单独保存到一个 JSONL 文件中 + # null_data = classified_data.get("null", []) + # if len(null_data) >= 19: # 只有当 null 类型的数据长度大于等于19时才保存 + # with open('./dhbq/prompt_null.jsonl', 'w', encoding='utf-8') as null_f: + # for record in null_data: + # null_f.write(json.dumps(record, ensure_ascii=False) + '\n') + + return total_types + +# 示例用法 +input_file = './dhbq/dhbq_merged_with_score.jsonl' +output_file = './dhbq/dhbq_count_prompt_label.jsonl' +output_excel = './dhbq/dhbq_count_prompt_label.xlsx' +total_types = classify_data_by_labels(input_file, output_file, output_excel) +print(f"Total types found: {total_types}") \ No newline at end of file diff --git a/data_generate/query_completion/embedding_similarity.py b/data_generate/query_completion/embedding_similarity.py new file mode 100644 index 0000000..8191d88 --- /dev/null +++ b/data_generate/query_completion/embedding_similarity.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright @2024 AI. Inspur Inc. 
+# +# @author: suojiayi +# @date: 2025/05/13 + +import torch +import pandas as pd +from torch.nn.functional import normalize +import faiss +import numpy as np +from transformers import AutoTokenizer, AutoModel +from tqdm import tqdm +from loguru import logger + +# 设置随机种子 +def set_seed(seed=42): + import random + import numpy as np + import torch + random.seed(seed) + np.random.seed(seed) + torch.manual_seed(seed) + if torch.cuda.is_available(): + torch.cuda.manual_seed_all(seed) + +# 文本去重类 +class SemanticDeduplicator: + def __init__(self, model_name_or_path: str, device: str = "cuda"): + self.device = torch.device(device if torch.cuda.is_available() else "cpu") + self.tokenizer = AutoTokenizer.from_pretrained(model_name_or_path) + self.model = AutoModel.from_pretrained(model_name_or_path).to(self.device) + self.model.eval() + self.dimension = self.model.config.hidden_size + self.index = faiss.IndexFlatIP(self.dimension) # 内积即余弦相似度 + self.seen_embeddings = [] + + @torch.no_grad() + def get_embedding(self, texts): + inputs = self.tokenizer(texts, padding=True, truncation=True, max_length=512, return_tensors="pt").to(self.device) + outputs = self.model(**inputs) + embeddings = outputs.last_hidden_state[:, 0, :] # 取 [CLS] 向量作为句子表示 + embeddings = normalize(embeddings, p=2, dim=1).cpu().numpy() + return embeddings + + def deduplicate(self, texts, threshold=0.85): + result = [] + for text in tqdm(texts, desc="De-duplicating"): + if not text.strip(): + continue + emb = self.get_embedding([text]) + if len(self.seen_embeddings) > 0: + self.index.add(np.array(self.seen_embeddings)) + D, I = self.index.search(emb, k=1) + if '6+6' in texts: + print(D[0][0]) + if '4+4' in texts: + print(D[0][0]) + if D[0][0] < threshold: + result.append(text) + self.seen_embeddings.append(emb[0]) + self.index.reset() # 每次 search 后要清空 index 缓存 + else: + result.append(text) + self.seen_embeddings.append(emb[0]) + return result + +def main(): + # 配置参数 + input_path = './dhbq/frequency-score-5.xlsx' # 输入Excel文件路径 + output_path = './dhbq/frequency-score-5-deduplicated.xlsx' # 输出Excel文件路径 + model_name_or_path = "/model-pvc/suojiayi/bge-base-zh-v1.5/" # 使用的预训练模型名称 + col_index = 0 # 需要去重的列索引 + threshold = 0.65 # 相似度阈值 + + logger.info("加载数据...") + df = pd.read_excel(input_path).fillna("") + texts = df.iloc[:, col_index].astype(str).str.strip().tolist() + + logger.info(f"开始语义去重(模型: {model_name_or_path} | 相似度阈值: {threshold})") + deduplicator = SemanticDeduplicator(model_name_or_path) + unique_texts = deduplicator.deduplicate(texts=texts, threshold=threshold) + + logger.info(f"去重完成,共保留 {len(unique_texts)} 条文本") + + new_df = pd.DataFrame({df.columns[col_index]: unique_texts}) + + merged_df = pd.merge(new_df, df, on=df.columns[col_index], how='left') + + # 方法2: 使用 map(适合映射单列) + # new_df[df.columns[1]] = new_df[df.columns[col_index]].map(df.set_index(df.columns[col_index])[df.columns[1]]) + # new_df[df.columns[2]] = new_df[df.columns[col_index]].map(df.set_index(df.columns[col_index])[df.columns[2]]) + + # 如果你只想保留 new_df 中存在的列,可以这样: + final_df = merged_df[df.columns] # 按原始顺序保留所有列 + + logger.info(f"保存结果到 {output_path}") + final_df.to_excel(output_path, index=False) + logger.info("保存成功!") + +if __name__ == "__main__": + set_seed() + main() + + +# +# from transformers import AutoTokenizer, AutoModel +# import torch +# import numpy as np +# +# def get_embedding(model, tokenizer, text): +# inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt").to("cuda") +# with torch.no_grad(): +# outputs = model(**inputs) +# 
embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy() # [CLS] 向量 +# return embeddings +# +# def cos_sim(a, b): +# return np.dot(a, b.T) / (np.linalg.norm(a) * np.linalg.norm(b)) +# +# # 加载模型 +# model_path = "/model-pvc/suojiayi/bge-base-zh-v1.5/" +# tokenizer = AutoTokenizer.from_pretrained(model_path) +# model = AutoModel.from_pretrained(model_path).to("cuda") +# +# # 测试文本 +# text1 = "今天天气" +# text2 = "今天天气怎么样" +# +# emb1 = get_embedding(model, tokenizer, text1) +# emb2 = get_embedding(model, tokenizer, text2) +# +# similarity = cos_sim(emb1, emb2) +# print(f"相似度:{similarity[0][0]:.4f}") \ No newline at end of file diff --git a/data_generate/query_completion/export_embedding.py b/data_generate/query_completion/export_embedding.py new file mode 100644 index 0000000..e41aa54 --- /dev/null +++ b/data_generate/query_completion/export_embedding.py @@ -0,0 +1,192 @@ +from transformers import AutoTokenizer, AutoModel +import torch +import numpy as np +from torch.utils.data import DataLoader +import pandas as pd +from tqdm import tqdm +from loguru import logger +import random +import argparse +import json +import pandas +from typing import List + + +def set_seed(seed=128): + random.seed(seed) + torch.manual_seed(seed) + np.random.seed(seed) + torch.cuda.manual_seed_all(seed) + + +class MyTorchDataset(torch.utils.data.Dataset): + + def __init__(self, input_file, tokenizer, max_seq_length): + self.tokenizer = tokenizer + self.max_seq_length = max_seq_length + logger.info('Loading data: {}'.format(input_file)) + with open(input_file, 'r', encoding='utf8') as f: + data_list = f.readlines() + logger.info("There are {} data in dataset".format(len(data_list))) + + # 测试:这里仅取 top 100 条数据 + self.raw_data_list = data_list #[:100] + + # 提取对话内容 + self.query_list = [] + self.index_list = [] + for line in self.raw_data_list: + json_line = json.loads(line) + uid = json_line["uid"] + dialog_content = "" + + # 遍历每一轮对话 + for turn in json_line["data"]: + if turn['role'] == 'user': + content = turn["content"] + + # 如果 content 是字符串形式的 JSON,先解析 + if isinstance(content, str) and content.startswith("{"): + try: + content = json.loads(content) + except json.JSONDecodeError: + pass + + # 将 content 转换为字符串并拼接到对话中 + if isinstance(content, dict): + content_str = " ".join(f"{k}: {v}" for k, v in content.items()) + else: + content_str = str(content) + + dialog_content += content_str+"\n" + + # 去掉最后的换行符 + dialog_content = dialog_content.strip() + #print(dialog_content) + self.query_list.append(dialog_content) + self.index_list.append(uid) + + assert len(self.query_list) == len(self.index_list) + logger.info(f"final len_query_list:{len(self.query_list)}") + logger.info(f"final len_index_list:{len(self.index_list)}") + + # 批量 Tokenize 所有数据 + logger.info(f"开始批量 tokenize 所有数据 ...") + self.all_data_token = self.tokenizer( + self.query_list, + padding='max_length', + truncation=True, + max_length=self.max_seq_length, + return_tensors='pt' + ) + logger.info(f"批量 tokenize 所有数据 完成 !!!") + + def __getitem__(self, idx): + # item 包含了 input_ids,attention_mask 等参数,已经转化为 tensor + item = {key: torch.as_tensor(value[idx]) for key, value in self.all_data_token.items()} + item['data_index'] = self.index_list[idx] + return item + + def __len__(self): + return len(self.query_list) + + +class TextEmbedder: + def __init__(self, args): + self.args = args + self.device = torch.device("cuda:0") + self.tokenizer = AutoTokenizer.from_pretrained(self.args.model_name_or_path, + model_max_length=args.max_length, + # 这里是推理过程,指定 左侧padding,不然取推理结果有问题 + 
padding_side="left", + use_fast=False) + self.model = AutoModel.from_pretrained(self.args.model_name_or_path).to(self.device) + logger.info('tokenizer,model 加载完成') + self.all_dataset = MyTorchDataset(self.args.raw_data_path, self.tokenizer, self.args.max_length) + self.data_loader = DataLoader(dataset=self.all_dataset, batch_size=self.args.batch_size, + # pin_memory=True,num_workers=1, prefetch_factor=8 + ) + + def save_data(self, results: List) -> None: + logger.info(f'需要保存的数据长度: {len(results)}') + + df = pandas.DataFrame(results) + df.sort_values(by="data_index", inplace=True) + df.reset_index(drop=True, inplace=True) + + df.to_pickle(self.args.output_path) + logger.info(f"Saved pickle to {self.args.output_path}") + + ''' + # 这个地方 .csv 会保存失败 + csv_file_name = "./rl_demo_emb_240624.csv" + df.to_csv(csv_file_name, index=False) + logger.info(f"Saved csv to {csv_file_name}") + ''' + + # 指定保存的文件名 + torch_py_bin_name = './dhbq/dhbq_embedding.pth' + # 请注意,torch.save()和torch.load()函数不仅可以用于保存和加载张量,还可以用于保存和加载任何Python对象, + # 包括但不限于字典、集合、自定义类实例等。 + torch.save(results, torch_py_bin_name) + logger.info(f'List has been saved to {torch_py_bin_name}') + + def encode_samples(self): + total_samples = len(self.all_dataset) + logger.info(f"total_samples: {total_samples}") + total_batches = len(self.data_loader) + logger.info(f"total_batches: {total_batches}") + + all_embeddings_list = [] + + for b_idx, batch in enumerate(tqdm(self.data_loader, total=total_batches)): + self.model.eval() + batch_idx = batch["data_index"] + input_ids = batch['input_ids'].to(self.device) + attention_mask = batch['attention_mask'].to(self.device) + output = None + with torch.no_grad(): + output = self.model(input_ids=input_ids, attention_mask=attention_mask) + + cls_embedding = output.pooler_output.detach().cpu().numpy().tolist() + + # 记录 embedding 和 index + cur_sample_idx = batch_idx#.tolist() # 如果 batch_idx 是张量 + cur_sample_dict_list = [ + {"embedding": cls_emb, "data_index": s_id} + for cls_emb, s_id in zip(cls_embedding, cur_sample_idx) + ] + all_embeddings_list.extend(cur_sample_dict_list) + + return all_embeddings_list + + +if __name__ == '__main__': + torch.cuda.empty_cache() + # 参数设置 + set_seed() + # 若干参数设置 + parser = argparse.ArgumentParser() + # 不要改该参数,系统会自动分配 + parser.add_argument('--device', default='cuda', help='device id (i.e. 
0 or 0,1 or cpu)') + # 开启的进程数(注意不是线程),不用设置该参数,会根据nproc_per_node自动设置 + parser.add_argument('--world_size', default=1, type=int, help='number of distributed processes') + args = parser.parse_args() + + args.max_length = 512 + args.batch_size = 2048 + args.model_name_or_path = "/model-pvc/suojiayi/bge-base-zh-v1.5/" + #args.raw_data_path = "/dataset-pvc/suojiayi/new/train_prepare/20250423_020157/tmp_data/instruct_data_COIG_filtered_2504212014.jsonl" + args.raw_data_path = '/dataset-pvc/suojiayi/duihuabuquan/train_prepare/20250425_103516/tmp_data/instruct_data_dhbq_0425_filtered_2504251824.jsonl' + args.output_path = "./dhbq/dhbq_embedding.bin" + + logger.info(f"\nargs:{args}\n") + + text_embeder = TextEmbedder(args) + all_embeddings_list = text_embeder.encode_samples() + + logger.info(f"len_all_embeddings_list:{len(all_embeddings_list)}") + logger.info("Finished embedding") + + text_embeder.save_data(all_embeddings_list) + logger.info(f"Pipeline run complete.") diff --git a/data_generate/query_completion/frequency.py b/data_generate/query_completion/frequency.py new file mode 100644 index 0000000..1a7a4c3 --- /dev/null +++ b/data_generate/query_completion/frequency.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright @2024 AI. Inspur Inc. +# +# @author: suojiayi +# @date: 2025/05/13 + +import json +import pandas as pd + +import json +from collections import Counter +import pandas as pd + +def process_jsonl_and_save_to_excel(jsonl_file, output_excel): + """ + 读取 JSONL 文件,提取最后一个 content 的内容,统计频率并保存到 Excel 表格中。 + + 参数: + jsonl_file: 输入的 JSONL 文件路径 + output_excel: 输出的 Excel 文件路径 + """ + # 存储所有最后一行的 content + last_contents = [] + + # 读取 JSONL 文件 + with open(jsonl_file, 'r', encoding='utf-8') as f: + for line in f: + # 解析每一行为 JSON 对象 + data = json.loads(line) + # 提取 data 列表的最后一个 content + if 'data' in data and isinstance(data['data'], list) and len(data['data']) > 0: + last_content = data['data'][-2].get('content', '') + last_contents.append(last_content) + + # 统计相同内容出现的频率 + content_counter = Counter(last_contents) + + # 转换为 DataFrame + df = pd.DataFrame(content_counter.items(), columns=['Content', 'Frequency']) + + # 按频率降序排序 + df = df.sort_values(by='Frequency', ascending=False) + + # 保存到 Excel 文件 + df.to_excel(output_excel, index=False) + print(f"统计结果已保存到 {output_excel}") + +# 示例调用 +jsonl_file = '/dataset-pvc/suojiayi/duihuabuquan/train_prepare/20250425_103516/tmp_data/instruct_data_dhbq_0425_filtered_2504251824.jsonl' # 输入的 JSONL 文件路径 +output_excel = "./dhbq/frequency.xlsx" # 输出的 Excel 文件路径 + +process_jsonl_and_save_to_excel(jsonl_file, output_excel) + +def index_jsonl(jsonl_file_path): + """索引jsonl文件,返回一个字典,键是'data'字段数组中最后一个对象的'content'值""" + index_dict = {} + with open(jsonl_file_path, 'r', encoding='utf-8') as file: + for line in file: + record = json.loads(line.strip()) + data_list = record.get('data', []) + if data_list: # 确保'data'字段存在且非空 + content_value = data_list[-1].get('content') + if content_value is not None: + index_dict[content_value] = record + return index_dict + + +def update_excel_with_matched_data(input_excel_path, output_excel_path, jsonl_index): + """根据索引更新Excel文件""" + df = pd.read_excel(input_excel_path) + matched_data = [jsonl_index.get(value) for value in df.iloc[:, 0]] + df['matched_json'] = matched_data + df.to_excel(output_excel_path, index=False) + + +# 使用示例 +if __name__ == '__main__': + input_excel_path = './dhbq/frequency.xlsx' # 输入Excel文件路径,请替换为你的实际路径 + output_excel_path = './dhbq/frequency-score-5.xlsx' # 输出Excel文件路径,请替换为你的实际路径 + 
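+    # Illustration (assumed from the functions above, not part of the original script):
+    # frequency-score-5.xlsx ends up with three columns -- the question text ("Content"),
+    # its "Frequency", and "matched_json", the full score-5 record whose last turn matches
+    # the question; select_final_data.py later reads that third column.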
jsonl_file_path = './dhbq/dhbq_merged_with_score_5.jsonl' # jsonl文件路径,请替换为你的实际路径 + + print("开始索引jsonl文件...") + jsonl_index = index_jsonl(jsonl_file_path) + print(f"完成索引,共索引 {len(jsonl_index)} 条记录") + + update_excel_with_matched_data(input_excel_path, output_excel_path, jsonl_index) + print(f"处理完成,已将结果保存至{output_excel_path}") + + diff --git a/data_generate/query_completion/gen_train_data.py b/data_generate/query_completion/gen_train_data.py new file mode 100644 index 0000000..8494638 --- /dev/null +++ b/data_generate/query_completion/gen_train_data.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright @2024 AI. Inspur Inc. +# +# @author: suojiayi +# @date: 2025/05/13 + +import json +import re + +def is_empty(value): + """ + 判断一个值是否为空(包括 None、空字符串、空列表、空字典等) + :param value: 检查的值 + :return: 如果值为空则返回 True,否则返回 False + """ + if value is None: + return True + if isinstance(value, str) and not value.strip(): + return True + if isinstance(value, (list, dict)) and len(value) == 0: + return True + return False + + +def filter_empty_fields(data): + """ + 递归过滤 JSON 数据中的空字段数据项 + :param data: 当前处理的数据(可以是 dict、list) + :return: 过滤后的数据 + """ + if isinstance(data, list): + filtered_list = [filter_empty_fields(item) for item in data if not is_empty(item)] + return [item for item in filtered_list if not is_empty(item)] + elif isinstance(data, dict): + filtered_dict = {k: filter_empty_fields(v) for k, v in data.items() if not is_empty(v)} + final_dict = {k: v for k, v in filtered_dict.items() if not is_empty(v)} + if not final_dict: + return None + return final_dict + else: + return data + +def process_jsonl(input_file, output_file): + with open(input_file, 'r', encoding='utf-8') as fin, open(output_file, 'w', encoding='utf-8') as fout: + for line in fin: + data = json.loads(line.strip())['data'] + + last_user = None + last_assistant = None + history = [] + + for i in range(len(data)): + if data[i]['role'] == 'user': + if i + 1 < len(data) and data[i+1]['role'] == 'assistant': + history.append([data[i]['content'], data[i+1]['content']]) + else: + last_user = data[i]['content'] + elif data[i]['role'] == 'assistant' and data[i-1]['role'] != 'user': + pass + + input_content = last_user or (data[-2]['content'] if len(data) >= 2 and data[-1]['role'] == 'assistant' else "") + output_content = data[-1]['content'] if data[-1]['role'] == 'assistant' else "" + if output_content: + target_pattern = "问题:" + output_content = re.sub(target_pattern, "", output_content) + if output_content: + result = { + "input": input_content, + "output": output_content, + "history": history[:-1] if history and input_content in history[-1] else history + } + + result = filter_empty_fields(result) + + fout.write(json.dumps(result, ensure_ascii=False) + '\n') + + +# input_file = './dhbq/merged_result.jsonl' +# output_file = './dhbq/train_data_0506.jsonl' +# process_jsonl(input_file, output_file) + +input_file = './dhbq/test_data.jsonl' +output_file = './dhbq/test_data_0512.jsonl' +process_jsonl(input_file, output_file) \ No newline at end of file diff --git a/data_generate/query_completion/instag.py b/data_generate/query_completion/instag.py new file mode 100644 index 0000000..edd8043 --- /dev/null +++ b/data_generate/query_completion/instag.py @@ -0,0 +1,126 @@ +import json +from vllm import LLM, SamplingParams +from vllm.transformers_utils.tokenizer import get_tokenizer +from tqdm import tqdm, trange +from copy import deepcopy + +# need vllm==0.4.2 +# firefly_copy 使用 + +# step1 读入待跑 tag 的 jsonl 文件 
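+# Illustration (assumed from the fields accessed below, not part of the original script):
+# each input line is a JSON object shaped roughly like
+#   {"uid": "<id>", "data": [{"role": "user", "content": "..."},
+#                            {"role": "assistant", "content": "..."}]}
+# only the "user" turns are concatenated into the prompt sent to the InsTag model.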
+input_jsonl_file = "/dataset-pvc/suojiayi/duihuabuquan/train_prepare/20250425_103516/tmp_data/instruct_data_dhbq_0425_filtered_2504251824.jsonl" +with open(input_jsonl_file, 'r', encoding='utf8') as f: + raw_data_list = f.readlines() +print("len_raw_data_list:", len(raw_data_list)) + +# 这里测试,先跑前100个数据 +# raw_data_list = raw_data_list[:100] + +# step2 读取 instag 模型 配置 tokenizer +model_path = '/model-pvc/instagger_qwen1_8B/' +tokenizer = get_tokenizer(model_path, trust_remote_code=True) +tokenizer.chat_template = ( + "{% for message in messages %}" + "{{ '### ' + message['role'].capitalize() + ':\\n' }}" + "{{ message['content'] | string + '\\n' }}" + "{% endfor %}" + "{% if add_generation_prompt %}" + "{{ '### Assistant:\\n' }}" + "{% endif %}" + +) +# step3 这里内存够用,一次性读取所有文件进行拼接 prompt 处理 +prompt_text_list = [] +for index, line in enumerate(raw_data_list): + # print("process:", index) + json_obj = json.loads(line) + # 遍历每一轮对话 + query = "" + for turn in json_obj["data"]: + if turn['role'] == 'user': + content = turn["content"] + + # 如果 content 是字符串形式的 JSON,先解析 + if isinstance(content, str) and content.startswith("{"): + try: + content = json.loads(content) + except json.JSONDecodeError: + pass + + # 将 content 转换为字符串并拼接到对话中 + if isinstance(content, dict): + content_str = " ".join(f"{k}: {v}" for k, v in content.items()) + else: + content_str = str(content) + + query += content_str + "\n" + + prompt = query + + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt} + ] + text = tokenizer.apply_chat_template( + messages, + tokenize=False, + add_generation_prompt=True + ) + prompt_text_list.append(text) + +print("len_prompt_text_list:", len(prompt_text_list)) +assert len(raw_data_list) == len(prompt_text_list) + + +def extract_raw_qwen_res(response): + if "<|im_end|>" in response: + response = response.split("<|im_end|>")[0] + return response + + +# step4: vllm 推理参数设置,官方推荐这里要求 temperature=0 +sampling_params = SamplingParams(max_tokens=64, + temperature=0, + top_p=0.8, + repetition_penalty=1.05) + +# step5: 读入模型 这里读入模型往后放,数据有问题在load 模型之前暴露出来节省时间 +# set gpu_memory_utilization=0.95 for OOM error +vllm_model = LLM(model=model_path, + tensor_parallel_size=1, + gpu_memory_utilization=0.95, + trust_remote_code=True) +vllm_model.set_tokenizer(tokenizer) + +# step6: 分批量推理并且得到结果的过程 +tag_result_list = [] +batch_size = 1024 +for start in trange(0, len(prompt_text_list), batch_size): + print("index:", start) + batch_list = prompt_text_list[start: start + batch_size] + vllm_outputs = vllm_model.generate(batch_list, sampling_params) + for output in vllm_outputs: + prompt = output.prompt + cur_tag_result = extract_raw_qwen_res(output.outputs[0].text) + tag_result_list.append(cur_tag_result) + +print("len_tag_result_list:", len(tag_result_list)) + +assert len(tag_result_list) == len(prompt_text_list) + +# step7: 写入结果 +print("开始写 instag 结果 数据") +single_turn_instag_path = "./dhbq/dhbq_instag.jsonl" +print("single_turn_instag_path:", single_turn_instag_path) +single_f_sample_save = open(single_turn_instag_path, "w") +write_single_sample_count = 0 +for line, tag in zip(raw_data_list, tag_result_list): + json_line = json.loads(line) + json_line["ins_tag_label"] = tag + cur_final_line = json.dumps(deepcopy(json_line), ensure_ascii=False) + write_single_sample_count = write_single_sample_count + 1 + single_f_sample_save.write(cur_final_line + "\n") + if write_single_sample_count % 100000 == 0: + print("write_single_count:", write_single_sample_count) + 
+print("all finish success !") diff --git a/data_generate/query_completion/merge.py b/data_generate/query_completion/merge.py new file mode 100644 index 0000000..27290df --- /dev/null +++ b/data_generate/query_completion/merge.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright @2024 AI. Inspur Inc. +# +# @author: suojiayi +# @date: 2025/05/13 + +import json + +def clean_answer(answer): + """ + 提取 ```json 和 ``` 包裹的内容,并清理多余标记和空格。 + 如果包裹的内容是合法的 JSON,则返回紧凑格式的 JSON 字符串; + 否则返回原始内容。 + """ + # 查找是否包含 ```json 开头和 ``` 结尾 + if "```" in answer: + # 提取 ```json 和 ``` 之间的内容 + start = answer.find("```json") + end = answer.find("```", start+len('```json')) + if end != -1: + content = answer[start+len('```json'):end].strip() + # print(content) + else: + content = answer.strip() # 如果没有找到结束的 ```,保留原始内容 + else: + content = answer.strip() # 如果没有匹配到 ```json 标记,保留原始内容 + + # 尝试解析为 JSON 对象 + try: + cleaned_json = json.loads(content) # 解析为 JSON 对象 + return json.dumps(cleaned_json, ensure_ascii=False) # 返回紧凑格式的 JSON 字符串 + except json.JSONDecodeError: + return content # 如果不是合法 JSON,返回原始内容 + + +def merge_jsonl_files(file1, file2, output_file): + """ + 合并两个JSONL文件,并清理answer字段。 + """ + merged_data = [] + count1 = 0 + count2 = 0 + # 读取第一个文件 + with open(file1, 'r', encoding='utf-8') as f1: + for line in f1: + data = json.loads(line.strip()) + if 'answer' in data: + data['answer'] = clean_answer(data['answer']) + merged_data.append(data) + + # 读取第二个文件 + with open(file2, 'r', encoding='utf-8') as f2: + for line in f2: + data = json.loads(line.strip()) + if 'answer' in data: + data['answer'] = clean_answer(data['answer']) + merged_data.append(data) + + + # 写入合并后的文件 + with open(output_file, 'w', encoding='utf-8') as out_f: + for item in merged_data: + out_f.write(json.dumps(item, ensure_ascii=False) + '\n') + + +示例用法 +file1 = './dhbq/duihuabuquan_prompt_1.jsonl' +file2 = './dhbq/duihuabuquan_prompt_2.jsonl' +output_file = './dhbq/dhbq_prompt.jsonl' +file1 = './dhbq/duihuabuquan_score_1.jsonl' +file2 = './dhbq/duihuabuquan_score_2.jsonl' +output_file = './dhbq/dhbq_score.jsonl' + +merge_jsonl_files(file1, file2, output_file) + + +import json + +def merge_jsonl_files_by_uid(file1, file2, output_file): + """ + 根据UID合并两个JSONL文件,保留每个UID的一组'data',并整合'answer'和'ins_tag_label'。 + 'answer'字段重命名为'prompt_label',同时保留'uid'字段。 + """ + # 从第一个文件读取数据并存储在字典中,键为uid + data_dict = {} + with open(file1, 'r', encoding='utf-8') as f1: + for line in f1: + record = json.loads(line.strip()) + uid = record['uid'] + if uid not in data_dict: + data_dict[uid] = { + 'uid': uid, # 保留uid字段 + 'data': record['data'], + 'prompt_label': record.get('answer') + } + else: + print(f"Warning: Duplicate UID found in the first file: {uid}") + + # 处理第二个文件,根据UID合并数据 + with open(file2, 'r', encoding='utf-8') as f2: + for line in f2: + record = json.loads(line.strip()) + uid = record['uid'] + if uid in data_dict: + # 如果UID存在,则添加或更新ins_tag_label字段 + if 'ins_tag_label' in record: + data_dict[uid]['ins_tag_label'] = record['ins_tag_label'] + else: + # 如果UID不存在于第一个文件中,则直接添加到结果集中 + new_record = { + 'uid': uid, # 保留uid字段 + 'data': record['data'] + } + if 'ins_tag_label' in record: + new_record['ins_tag_label'] = record['ins_tag_label'] + data_dict[uid] = new_record + + # 将合并后的数据写入输出文件 + with open(output_file, 'w', encoding='utf-8') as out_f: + for uid in data_dict: + out_f.write(json.dumps(data_dict[uid], ensure_ascii=False) + '\n') + +# 示例用法 +file1 = './dhbq/dhbq_prompt.jsonl' +file2 = './dhbq/dhbq_instag.jsonl' +output_file = './dhbq/dhbq.jsonl' 
+ +merge_jsonl_files_by_uid(file1, file2, output_file) + +import json + +def merge_jsonl_files(file1, file2, output_file): + """ + 合并两个JSONL文件: + - 第一个文件包含'uid'和其他内容。 + - 第二个文件包含'index'、'cluster_center'和'embedding'。 + - 根据'uid'和'index'匹配,将第二个文件的'cluster_center'和'embedding'添加到第一个文件中。 + """ + # 读取第一个文件,构建以'uid'为键的字典 + data_dict = {} + with open(file1, 'r', encoding='utf-8') as f1: + for line in f1: + record = json.loads(line.strip()) + uid = record['uid'] + data_dict[uid] = record + + # 读取第二个文件,提取'index'、'cluster_center'和'embedding' + with open(file2, 'r', encoding='utf-8') as f2: + for line in f2: + record = json.loads(line.strip()) + index = record.get('data_idx') + cluster_center = record.get('cluster_center') + #embedding = record.get('embedding') + + # 如果'index'存在于第一个文件的'uid'中,则合并数据 + if index in data_dict: + data_dict[index]['cluster_center'] = cluster_center + #data_dict[index]['embedding'] = embedding + + # 将合并后的数据写入输出文件 + with open(output_file, 'w', encoding='utf-8') as out_f: + for uid in data_dict: + out_f.write(json.dumps(data_dict[uid], ensure_ascii=False) + '\n') + +# 示例用法 +file1 = './dhbq/dhbq.jsonl' +file2 = './dhbq/dhbq_cluster_kmeans_result.jsonl' +output_file = './dhbq/dhbq_merged.jsonl' + +merge_jsonl_files(file1, file2, output_file) + + +import json + +def merge_jsonl_files(file1, file2, output_file): + """ + 合并两个JSONL文件: + - 第一个文件包含'uid'和其他内容。 + - 第二个文件包含'index'、'cluster_center'和'embedding'。 + - 根据'uid'和'index'匹配,将第二个文件的'cluster_center'和'embedding'添加到第一个文件中。 + """ + # 读取第一个文件,构建以'uid'为键的字典 + data_dict = {} + with open(file1, 'r', encoding='utf-8') as f1: + for line in f1: + record = json.loads(line.strip()) + uid = record['uid'] + data_dict[uid] = record + + # 读取第二个文件,提取'index'、'cluster_center'和'embedding' + with open(file2, 'r', encoding='utf-8') as f2: + for line in f2: + record = json.loads(line.strip()) + index = record.get('uid') + score = record.get('answer') + embedding = record.get('embedding') + + # 如果'index'存在于第一个文件的'uid'中,则合并数据 + if index in data_dict: + data_dict[index]['score'] = score + data_dict[index]['embedding'] = embedding + + # 将合并后的数据写入输出文件 + with open(output_file, 'w', encoding='utf-8') as out_f: + for uid in data_dict: + out_f.write(json.dumps(data_dict[uid], ensure_ascii=False) + '\n') + +# 示例用法 +file1 = './dhbq/dhbq_merged.jsonl' +file2 = './dhbq/dhbq_score.jsonl' +output_file = './dhbq/dhbq_merged_with_score.jsonl' + +merge_jsonl_files(file1, file2, output_file) + + +import json + +def filter_records(input_file_path, output_file_path, target_field='score', target_value=5): + """ + 筛选出jsonl文件中指定字段等于特定值的记录,并保存为新的jsonl文件。 + + :param input_file_path: 输入jsonl文件路径 + :param output_file_path: 输出jsonl文件路径 + :param target_field: 目标字段,默认为'score' + :param target_value: 目标字段需要匹配的值,默认为5 + """ + with open(input_file_path, 'r', encoding='utf-8') as infile, \ + open(output_file_path, 'w', encoding='utf-8') as outfile: + for line in infile: + record = json.loads(line.strip()) + # 检查记录中目标字段的值是否等于目标值 + try: + if int(record.get(target_field)) == target_value: + # 将符合条件的记录写入输出文件 + outfile.write(json.dumps(record, ensure_ascii=False) + '\n') + except Exception as e: + continue + + +# 使用示例 +if __name__ == '__main__': + input_file_path = './dhbq/dhbq_merged_with_score.jsonl' # 输入jsonl文件路径,请替换为你的实际路径 + output_file_path = './dhbq/dhbq_merged_with_score_5.jsonl' # 输出jsonl文件路径,请替换为你的实际路径 + + filter_records(input_file_path, output_file_path) + print(f"筛选完成,已将符合条件的记录保存至{output_file_path}") diff --git a/data_generate/query_completion/prompt_label.py 
b/data_generate/query_completion/prompt_label.py new file mode 100644 index 0000000..0cbdef2 --- /dev/null +++ b/data_generate/query_completion/prompt_label.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright @2024 AI. Inspur Inc. +# +# @author: suojiayi +# @date: 2025/05/13 + +import json +import requests +import pandas as pd +from concurrent.futures import ThreadPoolExecutor, as_completed + +def read_jsonl_lines_in_batches(file_path, batch_size=10000): + """按批次读取 JSONL 文件""" + batch = [] + with open(file_path, mode="r", encoding="utf-8") as f: + for line in f: + try: + batch.append(json.loads(line.strip())) + if len(batch) == batch_size: + yield batch + batch = [] + except json.JSONDecodeError as e: + print(f"Error decoding JSON: {e}") + if batch: + yield batch + + +def process_data_concurrently(data_list, api_url, headers, max_workers=10): + """并发处理数据并调用 API""" + result_data = [] + + def process_single_data(data): + try: + query = data.get('data') + if query: + input_content = f'''你是一个多轮对话分析专家。请根据以下维度对给定的多轮对话(2-3轮)进行细粒度分类,并按统一格式返回结果。 +# 输入规范 +1. 输入为完整的多轮对话(2-3组问答对) +2. 每组问答包含明确的用户提问和系统/客服响应 + +# 核心分析维度 +## 1. 指代消解模式 +- 显式指代:直接使用完整名词(订单号123) +- 代词依赖:需解析"它/这个/那个"等代词 +- 省略恢复:需补全省略成分("还有呢?"→"其他问题") +- 零指代:无主语句("还没处理") + +## 2. 信息流结构 +- 自足单元:当前轮次信息完整 +- 跨轮依赖:需前文关键信息 +- 隐式跳转:无过渡的话题切换 +- 意图延续:延续前轮任务目标 +- 推理关联:需逻辑推算建立联系 + +## 3. 上下文管理 +- 短期记忆:依赖1-2轮内信息 +- 长期记忆:引用3轮前历史记录 +- 推理记忆:需计算/推理关联信息 + +## 4. 状态演化 +- 静态查询:纯信息检索 +- 动态演进:逐步完善任务参数 +- 混合操作:查询+修改组合 +- 信息修正:更正前轮错误数据 + +# 输出规范'''+'''```json +{ + "labels": ["指代消解::显式指代", "信息流::跨轮依赖", "上下文::推理记忆", "状态演化::动态演进"], + "analysis": { + "指代消解": { + "类型": "显式指代", + "证据": "例句:'订单A123'直接使用完整标识符" + }, + "信息流": { + "类型": "跨轮依赖", + "证据": "客服要求提供订单号是对用户首轮请求的响应" + }, + "上下文": { + "类型": "推理记忆", + "证据": "需要根据首轮日期推算当前状态" + }, + "状态演化": { + "类型": "动态演进", + "证据": "从查询请求逐步收集必要参数" + } + } +}```'''+f'''让我们一步一步思考,给出最后的返回结果,输入的多轮对话:{query}''' + response = requests.post( + api_url, + headers=headers, + json={ + "model": "Qwen2.5-72B-Instruct", + "stream": False, + "temperature": 0.01, + "messages": [{"role": "user", "content": input_content}] + } + ) + if response.status_code == 200: + try: + content = response.json()["choices"][0]["message"]["content"] + except (KeyError, IndexError, json.JSONDecodeError): + content = "无法解析返回内容" + else: + content = f"API请求失败,状态码:{response.status_code}" + return { + "uid": data.get('uid'), + "data": query, + "answer": content + } + except Exception as e: + print(e) + return None + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(process_single_data, data) for data in data_list] + for future in as_completed(futures): + result = future.result() + if result: + result_data.append(result) + + return result_data + + +def save_to_excel_in_batches(data_list, output_file, batch_size=10000): + """按批次保存数据到 Excel 文件""" + df = pd.DataFrame(data_list) + writer = pd.ExcelWriter(output_file, engine='openpyxl') + for i in range(0, len(df), batch_size): + batch_df = df.iloc[i:i + batch_size] + batch_df.to_excel(writer, index=False, startrow=i) + writer.close() + print(f"数据已成功保存到 {output_file}") + +def save_to_jsonl_in_batches(data_list, output_file, batch_size=10000): + """按批次保存数据到 JSONL 文件""" + with open(output_file, 'w', encoding='utf-8') as f: + for i in range(0, len(data_list), batch_size): + # 获取当前批次的数据 + batch_data = data_list[i:i + batch_size] + # 将每个数据对象写入文件,每行一个 JSON 对象 + for item in batch_data: + f.write(json.dumps(item, ensure_ascii=False) + '\n') + print(f"数据已成功保存到 
{output_file}") + +if __name__ == "__main__": + #output_excel_file = 'result-taoli-5.xlsx' + # api_url = "http://100.105.149.39:8000/v1/chat/completions" + api_url = "http://100.105.230.95:8000/v1/chat/completions" + headers = { + "Content-Type": "application/json", + "Authorization": "7c3eafb5-2d6e-100d-ab0f-7b2c1cdafb3c" + } + + #file_path = '/dataset-pvc/suojiayi/new/train_prepare/20250423_020157/tmp_data/instruct_data_BELLE_Multiturn_Chat_filtered_2504232014.jsonl' + file_path = '/data/suojiayi/buquan/split_dhbq/part_2.jsonl' + output_file = './dhbq/duihuabuquan_prompt_2.jsonl' + #file_path = '/dataset-pvc/suojiayi/new/train_prepare/20250423_020157/tmp_data/instruct_data_COIG_filtered_2504212014.jsonl' + all_results = [] + for batch in read_jsonl_lines_in_batches(file_path, batch_size=10000): + processed_batch = process_data_concurrently(batch, api_url, headers, max_workers=20) + all_results.extend(processed_batch) + # save_to_excel_in_batches(all_results, output_excel_file, batch_size=23000) + save_to_jsonl_in_batches(all_results, output_file, batch_size=10000) diff --git a/data_generate/query_completion/score.py b/data_generate/query_completion/score.py new file mode 100644 index 0000000..ae7db92 --- /dev/null +++ b/data_generate/query_completion/score.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright @2024 AI. Inspur Inc. +# +# @author: suojiayi +# @date: 2025/05/13 + +import json +import requests +import pandas as pd +from concurrent.futures import ThreadPoolExecutor, as_completed + +def read_jsonl_lines_in_batches(file_path, batch_size=10000): + """按批次读取 JSONL 文件""" + batch = [] + with open(file_path, mode="r", encoding="utf-8") as f: + for line in f: + try: + batch.append(json.loads(line.strip())) + if len(batch) == batch_size: + yield batch + batch = [] + except json.JSONDecodeError as e: + print(f"Error decoding JSON: {e}") + if batch: + yield batch + + +def process_data_concurrently(data_list, api_url, headers, max_workers=10): + """并发处理数据并调用 API""" + result_data = [] + + def process_single_data(data): + try: + query = data.get('data') + if query: + question = query[-1]['content'] + conversation = query[:len(query)-1] + input_content = f'''任务说明: +请对给定的历史多轮对话进行分析,结合上下文逻辑和多轮对话特性,识别最后一轮问题的缺失信息, 对改写后的问题进行评分(1-5分)。 + +改写要求: +忠实性:确保改写后的问题与原问题意图一致,无歧义、无信息偏差,且无冗余修改。 +完整性:若上下文可提供必要信息,应填补空白,使问题更完整,同时保持语义连贯。 +保守性:若上下文无法有效澄清原问题,则保持原问题不变(不强行补全)。 +语义一致性:不得改变原问题的语义或缩小其范围。 +中立性:不得针对模型回答进行优化(即避免引导性改写)。 +时间相关性:除非原问题涉及时间,否则不得引入无关时间信息。 + +评分标准(1-5分): +分数 标准描述 +5(优秀) 完全符合原意图,填补信息精准且必要,语义连贯,无冗余或无关修改。 +4(良好) 基本符合原意图,补充信息合理但可能稍显冗余或不够精准,语义基本连贯。 +3(中等) 大体保留原意图,但补充信息可能不精准或必要性不足,语义稍显生硬。 +2(较差) 部分偏离原意图,补充信息冗余或关联性弱,语义不连贯或存在歧义。 +1(差) 严重偏离原意图,补充信息错误或无关,语义混乱,或强行补全导致问题失真。 +特殊情况处理: +上下文无关:若无法从上下文合理推断缺失信息,则保持原问题不变(5分)。 +强行补全:若补充信息导致语义改变或范围收窄,应扣分(视情况给2分或更低)。 + +原多轮对话及问题:{conversation} +改写后问题:{question} +评分:请基于上述标准给出1-5分的评分。 +输出格式: +仅返回1-5之间的整数分数,无需解释。''' + # print(input_content) + response = requests.post( + api_url, + headers=headers, + json={ + "model": "Qwen2.5-72B-Instruct", + "stream": False, + "temperature": 0.01, + "messages": [{"role": "user", "content": input_content}] + } + ) + if response.status_code == 200: + try: + content = response.json()["choices"][0]["message"]["content"] + except (KeyError, IndexError, json.JSONDecodeError): + content = "无法解析返回内容" + else: + content = f"API请求失败,状态码:{response.status_code}" + return { + "uid": data.get('uid'), + "data": query, + "answer": content + } + except Exception as e: + print(e) + return None + + with 
ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(process_single_data, data) for data in data_list] + for future in as_completed(futures): + result = future.result() + if result: + result_data.append(result) + + return result_data + + +def save_to_excel_in_batches(data_list, output_file, batch_size=10000): + """按批次保存数据到 Excel 文件""" + df = pd.DataFrame(data_list) + writer = pd.ExcelWriter(output_file, engine='openpyxl') + for i in range(0, len(df), batch_size): + batch_df = df.iloc[i:i + batch_size] + batch_df.to_excel(writer, index=False, startrow=i) + writer.close() + print(f"数据已成功保存到 {output_file}") + +def save_to_jsonl_in_batches(data_list, output_file, batch_size=10000): + """按批次保存数据到 JSONL 文件""" + with open(output_file, 'w', encoding='utf-8') as f: + for i in range(0, len(data_list), batch_size): + # 获取当前批次的数据 + batch_data = data_list[i:i + batch_size] + # 将每个数据对象写入文件,每行一个 JSON 对象 + for item in batch_data: + f.write(json.dumps(item, ensure_ascii=False) + '\n') + print(f"数据已成功保存到 {output_file}") + +if __name__ == "__main__": + #output_excel_file = 'result-taoli-5.xlsx' + api_url = "http://100.105.48.35:8000/v1/chat/completions" + # api_url = "http://100.105.107.150:8000/v1/chat/completions" + headers = { + "Content-Type": "application/json", + "Authorization": "7c3eafb5-2d6e-100d-ab0f-7b2c1cdafb3c" + } + + #file_path = '/dataset-pvc/suojiayi/new/train_prepare/20250423_020157/tmp_data/instruct_data_BELLE_Multiturn_Chat_filtered_2504232014.jsonl' + file_path = '/data/suojiayi/buquan/split_dhbq/part_1.jsonl' + output_file = './dhbq/duihuabuquan_score_1.jsonl' + #file_path = '/dataset-pvc/suojiayi/new/train_prepare/20250423_020157/tmp_data/instruct_data_COIG_filtered_2504212014.jsonl' + all_results = [] + for batch in read_jsonl_lines_in_batches(file_path, batch_size=10000): + processed_batch = process_data_concurrently(batch, api_url, headers, max_workers=20) + all_results.extend(processed_batch) + # save_to_excel_in_batches(all_results, output_excel_file, batch_size=23000) + save_to_jsonl_in_batches(all_results, output_file, batch_size=10000) diff --git a/data_generate/query_completion/select_final_data.py b/data_generate/query_completion/select_final_data.py new file mode 100644 index 0000000..77f4381 --- /dev/null +++ b/data_generate/query_completion/select_final_data.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright @2024 AI. Inspur Inc. 
+# +# @author: suojiayi +# @date: 2025/05/13 + +import json +import pandas as pd + + +def load_excel_column(file_path, column_index=2): + """读取 Excel 文件的第三列,并将每一行解析为 dict""" + df = pd.read_excel(file_path) + + result = [] + seen_uids = set() + + for _, row in df.iterrows(): + try: + data = json.loads(row.iloc[column_index]) + if isinstance(data, dict) and 'uid' in data: + if data['uid'] not in seen_uids: + seen_uids.add(data['uid']) + result.append(data) + except (json.JSONDecodeError, IndexError): + continue + return result + + +def process_jsonl_file(file_path, top_n_per_group, result, seen_uids): + """ + 读取 jsonl 文件,对每一行 group 数据,尽力从中选出 top_n_per_group 条不重复数据 + 如果无法选够指定数量的数据,则跳过该 group 并打印警告信息。 + """ + with open(file_path, 'r', encoding='utf-8') as f: + for line_idx, line in enumerate(f): + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + + # 获取 group key 和对应的数据列表 + key = next(k for k in entry if k != "count") + records = entry[key] + + selected = [] + added_count = 0 + + # 遍历 records,直到选够 top_n_per_group 个不重复的数据 + for item in records: + uid = item.get('uid') + + if added_count >= top_n_per_group: + break # 已经选够了,退出 + + if uid and uid not in seen_uids: + seen_uids.add(uid) + selected.append(item) + result.append(item) + added_count += 1 + + # 如果最终无法选满 top_n_per_group 条,可以给出警告或处理 + if added_count < top_n_per_group: + print(f"[Group {key}] Only found {added_count}/{top_n_per_group} unique items. Skipping this group.") + # 如果需要的话,可以从结果中移除已添加的记录 + for item in selected: + result.remove(item) + seen_uids.discard(item.get('uid')) + else: + print(f"[Group {key}] Successfully added {added_count} items.") + + except Exception as e: + print(f"Error processing line {line_idx + 1}: {e}") + continue + +def is_similar_to_existing(embedding, existing_embeddings, threshold=0.85): + """ + 判断当前 embedding 是否与已有 embeddings 的最大余弦相似度超过阈值 + """ + if not existing_embeddings: + return False + + current_emb = torch.tensor(embedding, dtype=torch.float32) + existing_emb = torch.tensor(existing_embeddings, dtype=torch.float32) + + similarity = F.cosine_similarity(current_emb.unsqueeze(0), existing_emb) + return (similarity > threshold).any().item() + +def process_cluster_jsonl(file_path, top_n_per_group, result, seen_uids, similarity_threshold=0.5): + """ + 处理 dhbq_count_cluster.jsonl,每组最多取 top_n_per_group 条, + 并根据 embedding 字段进行余弦相似度过滤。 + """ + with open(file_path, 'r', encoding='utf-8') as f: + for line_idx, line in enumerate(f): + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + key = next(k for k in entry if k != "count") + records = entry[key] + + selected = [] + added_count = 0 + existing_embeddings = [item['embedding'] for item in result if 'embedding' in item] # 已有所有 embedding + + for item in records: + uid = item.get('uid') + emb = item.get('embedding') + + if added_count >= top_n_per_group: + break + + if not emb or not isinstance(emb, list): + print(f"Skipping item {uid}: missing or invalid embedding.") + continue + + if uid and uid not in seen_uids: + if is_similar_to_existing(emb, existing_embeddings, similarity_threshold): + print(f"Skipping item {uid} due to high similarity.") + continue + + seen_uids.add(uid) + selected.append(item) + result.append(item) + existing_embeddings.append(emb) + added_count += 1 + + print(f"[Group {key}] Successfully added {added_count} items.") + + except Exception as e: + print(f"Error processing line {line_idx + 1}: {e}") + continue + +def main(): + result = [] + seen_uids = set() + + # Step 1: 读取 Excel 
第三列 + print("Step 1: Reading Excel file...") + excel_data = load_excel_column("./dhbq/frequency-score-5.xlsx", column_index=2) + for item in excel_data: + uid = item.get('uid') + if uid and uid not in seen_uids: + seen_uids.add(uid) + result.append(item) + + # Step 2: 处理 dhbq_count_prompt_label.jsonl,每组前3条 + print("Step 2: Processing dhbq_count_prompt_label.jsonl...") + process_jsonl_file("./dhbq/dhbq_count_prompt_label.jsonl", 170, result, seen_uids) + + # Step 3: 处理 dhbq_count_instag.jsonl,每组前130条 + print("Step 3: Processing dhbq_count_instag.jsonl...") + process_jsonl_file("./dhbq/dhbq_count_instag.jsonl", 3, result, seen_uids) + + # # Step 4: 处理 dhbq_count_cluster.jsonl,每组前4条 + # print("Step 4: Processing dhbq_count_cluster.jsonl...") + # process_jsonl_file("./dhbq/dhbq_count_cluster_0513.jsonl", 5, result, seen_uids) + print("Step 4: Processing dhbq_count_cluster.jsonl...") + process_cluster_jsonl("./dhbq/dhbq_count_cluster_0513.jsonl", 5, result, seen_uids, similarity_threshold=0.5) + + # 最终写入输出文件 + output_file = "./dhbq/merged_result_for_more_test_0513.jsonl" + with open(output_file, 'w', encoding='utf-8') as f: + for item in result: + f.write(json.dumps(item, ensure_ascii=False) + '\n') + + print(f"Total merged items: {len(result)}") + print(f"Saved to: {output_file}") + + +if __name__ == "__main__": + main() + +
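A minimal, self-contained sketch of the cosine-similarity gate that process_cluster_jsonl relies on, for reference. It assumes torch is available; the imports visible at the top of select_final_data.py are only json and pandas, so torch and torch.nn.functional would also need to be imported for is_similar_to_existing to run. The helper name and the toy vectors below are illustrative only.

import torch
import torch.nn.functional as F

def is_similar_to_existing_sketch(embedding, existing_embeddings, threshold=0.5):
    # Return True if `embedding` has cosine similarity above `threshold`
    # with any embedding that has already been kept.
    if not existing_embeddings:
        return False
    current = torch.tensor(embedding, dtype=torch.float32)             # shape (d,)
    existing = torch.tensor(existing_embeddings, dtype=torch.float32)  # shape (n, d)
    sims = F.cosine_similarity(current.unsqueeze(0), existing)         # shape (n,)
    return bool((sims > threshold).any())

# Toy usage: the first query is close to [1, 0] and is rejected as a near-duplicate,
# the second points the opposite way and is kept.
kept = [[1.0, 0.0], [0.0, 1.0]]
print(is_similar_to_existing_sketch([0.9, 0.1], kept, threshold=0.5))   # True
print(is_similar_to_existing_sketch([-1.0, 0.0], kept, threshold=0.5))  # False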