offline_data_model_pipline/data_generate/query_completion/embedding_similarity.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright @2024 AI. Inspur Inc.
#
# @author: suojiayi <suojiayi@inspur.com>
# @date: 2025/05/13
import torch
import pandas as pd
from torch.nn.functional import normalize
import faiss
import numpy as np
from transformers import AutoTokenizer, AutoModel
from tqdm import tqdm
from loguru import logger


# Set all relevant random seeds for reproducibility.
def set_seed(seed=42):
    import random  # local import; random is not used elsewhere in this module

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
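
# A minimal sketch (assumption, not part of the original pipeline): if
# bit-exact GPU reruns ever matter, cuDNN can be pinned down as well.
# Embedding inference itself is not seed-sensitive, so this stays optional.
#
# def set_seed_strict(seed=42):
#     set_seed(seed)
#     torch.backends.cudnn.deterministic = True
#     torch.backends.cudnn.benchmark = False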


# Semantic text de-duplication via embedding similarity search.
class SemanticDeduplicator:
    def __init__(self, model_name_or_path: str, device: str = "cuda"):
        self.device = torch.device(device if torch.cuda.is_available() else "cpu")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
        self.model = AutoModel.from_pretrained(model_name_or_path).to(self.device)
        self.model.eval()
        self.dimension = self.model.config.hidden_size
        # On L2-normalized embeddings, inner product equals cosine similarity.
        self.index = faiss.IndexFlatIP(self.dimension)
        self.seen_embeddings = []

    @torch.no_grad()
    def get_embedding(self, texts):
        inputs = self.tokenizer(
            texts, padding=True, truncation=True, max_length=512, return_tensors="pt"
        ).to(self.device)
        outputs = self.model(**inputs)
        # Take the [CLS] vector as the sentence representation.
        embeddings = outputs.last_hidden_state[:, 0, :]
        embeddings = normalize(embeddings, p=2, dim=1).cpu().numpy()
        return embeddings
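
    # A hedged alternative (assumption, not used by this pipeline): bge-* models
    # are trained for [CLS] pooling, but a model that expects mean pooling would
    # need a mask-aware variant, roughly:
    #
    # @torch.no_grad()
    # def get_embedding_mean_pool(self, texts):
    #     inputs = self.tokenizer(texts, padding=True, truncation=True,
    #                             max_length=512, return_tensors="pt").to(self.device)
    #     outputs = self.model(**inputs)
    #     mask = inputs["attention_mask"].unsqueeze(-1).float()
    #     summed = (outputs.last_hidden_state * mask).sum(dim=1)
    #     embeddings = summed / mask.sum(dim=1).clamp(min=1e-9)
    #     return normalize(embeddings, p=2, dim=1).cpu().numpy()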

    def deduplicate(self, texts, threshold=0.85):
        result = []
        for text in tqdm(texts, desc="De-duplicating"):
            if not text.strip():
                continue
            emb = self.get_embedding([text])
            if len(self.seen_embeddings) > 0:
                # Rebuild the index from all kept embeddings, then query.
                self.index.add(np.array(self.seen_embeddings))
                D, I = self.index.search(emb, k=1)
                if D[0][0] < threshold:
                    result.append(text)
                    self.seen_embeddings.append(emb[0])
                # The index is repopulated on every iteration, so clear it
                # after each search.
                self.index.reset()
            else:
                result.append(text)
                self.seen_embeddings.append(emb[0])
        return result
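
# A minimal sketch (assumption, not the author's code): deduplicate() rebuilds
# the flat index from self.seen_embeddings on every iteration, which is
# quadratic in the number of kept texts. Because only kept texts ever enter the
# index, an equivalent incremental variant can add each kept embedding once and
# skip the reset entirely:
#
# def deduplicate_incremental(dedup: SemanticDeduplicator, texts, threshold=0.85):
#     result = []
#     for text in tqdm(texts, desc="De-duplicating"):
#         if not text.strip():
#             continue
#         emb = dedup.get_embedding([text])
#         if dedup.index.ntotal > 0:
#             D, _ = dedup.index.search(emb, k=1)
#             if D[0][0] >= threshold:
#                 continue  # near-duplicate of an already kept text
#         result.append(text)
#         dedup.index.add(emb)  # inner product == cosine on unit vectors
#     return result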


def main():
    # Configuration
    input_path = './dhbq/frequency-score-5.xlsx'                  # input Excel file
    output_path = './dhbq/frequency-score-5-deduplicated.xlsx'    # output Excel file
    model_name_or_path = "/model-pvc/suojiayi/bge-base-zh-v1.5/"  # pretrained embedding model
    col_index = 0     # index of the column to de-duplicate
    threshold = 0.65  # similarity threshold

    logger.info("Loading data...")
    df = pd.read_excel(input_path).fillna("")
    texts = df.iloc[:, col_index].astype(str).str.strip().tolist()

    logger.info(f"Starting semantic de-duplication (model: {model_name_or_path} | similarity threshold: {threshold})")
    deduplicator = SemanticDeduplicator(model_name_or_path)
    unique_texts = deduplicator.deduplicate(texts=texts, threshold=threshold)
    logger.info(f"De-duplication finished; {len(unique_texts)} texts kept")

    # Re-attach the remaining columns by joining the kept texts back onto the
    # original frame. Note: exact-duplicate keys in df will multiply rows here.
    new_df = pd.DataFrame({df.columns[col_index]: unique_texts})
    merged_df = pd.merge(new_df, df, on=df.columns[col_index], how='left')
    # Alternative: map each extra column individually, e.g.
    # new_df[df.columns[1]] = new_df[df.columns[col_index]].map(df.set_index(df.columns[col_index])[df.columns[1]])
    # new_df[df.columns[2]] = new_df[df.columns[col_index]].map(df.set_index(df.columns[col_index])[df.columns[2]])
    final_df = merged_df[df.columns]  # keep all original columns, in order

    logger.info(f"Saving results to {output_path}")
    final_df.to_excel(output_path, index=False)
    logger.info("Saved successfully!")


if __name__ == "__main__":
    set_seed()
    main()

# Standalone sanity check for the embedding similarity (kept for reference):
#
# from transformers import AutoTokenizer, AutoModel
# import torch
# import numpy as np
#
# def get_embedding(model, tokenizer, text):
#     inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt").to("cuda")
#     with torch.no_grad():
#         outputs = model(**inputs)
#     embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()  # [CLS] vector
#     return embeddings
#
# def cos_sim(a, b):
#     return np.dot(a, b.T) / (np.linalg.norm(a) * np.linalg.norm(b))
#
# # Load the model
# model_path = "/model-pvc/suojiayi/bge-base-zh-v1.5/"
# tokenizer = AutoTokenizer.from_pretrained(model_path)
# model = AutoModel.from_pretrained(model_path).to("cuda")
#
# # Test texts
# text1 = "今天天气"
# text2 = "今天天气怎么样"
#
# emb1 = get_embedding(model, tokenizer, text1)
# emb2 = get_embedding(model, tokenizer, text2)
#
# similarity = cos_sim(emb1, emb2)
# print(f"similarity: {similarity[0][0]:.4f}")
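#
# A cross-check sketch (assumption, not in the original): the class's FAISS
# inner-product score should agree with cos_sim() above, since both operate on
# L2-normalized [CLS] embeddings (explicitly in get_embedding, via the norm
# division in cos_sim).
#
# dedup = SemanticDeduplicator("/model-pvc/suojiayi/bge-base-zh-v1.5/")
# e1 = dedup.get_embedding(["今天天气"])
# e2 = dedup.get_embedding(["今天天气怎么样"])
# print((e1 @ e2.T)[0][0])  # should be close to cos_sim(emb1, emb2)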