Source code for FlagEmbedding.evaluation.mteb.runner
import logging
import os
import json
from typing import Tuple, Union

import mteb
import pandas as pd

from FlagEmbedding.abc.evaluation import AbsEvalRunner, AbsEvalModelArgs

from .arguments import MTEBEvalArgs
from .searcher import MTEBEvalDenseRetriever, MTEBEvalReranker
from .prompts import get_task_def_by_task_name_and_type

logger = logging.getLogger(__name__)


def ensure_dir(file_path):
    """Create the parent directory of ``file_path`` if it does not already exist."""
    directory = os.path.dirname(file_path)
    if directory and not os.path.exists(directory):
        os.makedirs(directory)


class MTEBEvalRunner(AbsEvalRunner):
    """
    Evaluation runner of MTEB.
    """

    def __init__(
        self,
        eval_args: MTEBEvalArgs,
        model_args: AbsEvalModelArgs,
    ):
        self.eval_args = eval_args
        self.model_args = model_args
        self.retriever, self.reranker = self.load_retriever_and_reranker()

    def load_retriever_and_reranker(self) -> Tuple[MTEBEvalDenseRetriever, Union[MTEBEvalReranker, None]]:
        """Load the retriever and reranker.

        Returns:
            Tuple[MTEBEvalDenseRetriever, Union[MTEBEvalReranker, None]]: The retriever and reranker instances.
        """
        embedder, reranker = self.get_models(self.model_args)
        retriever = MTEBEvalDenseRetriever(
            embedder,
            search_top_k=self.eval_args.search_top_k,
            overwrite=self.eval_args.overwrite
        )
        if reranker is not None:
            reranker = MTEBEvalReranker(reranker, rerank_top_k=self.eval_args.rerank_top_k)
        return retriever, reranker

    def read_results(self, output_folder, tasks):
        """Read the evaluation results from directory.

        Args:
            output_folder (str): Path to the directory with results.
            tasks (list): List of MTEB tasks.

        Returns:
            dict: The results of all the tasks, grouped by task type.
        """
        tasks_results = {}
        task_types = list(set([t.metadata.type for t in tasks]))
        for t_type in task_types:
            tasks_results[t_type] = {}
            for t in tasks:
                if t.metadata.type != t_type:
                    continue
                task_name = t.metadata.name
                metric = t.metadata.main_score

                split = t.metadata.eval_splits[0]
                result_path = os.path.join(output_folder, task_name + '.json')
                if os.path.exists(result_path):
                    with open(result_path) as f:
                        data = json.load(f)
                    # Prefer the test split, then dev, then validation.
                    for s in ['test', 'dev', 'validation']:
                        if s in data['scores']:
                            split = s
                            break
                        split = None
                    if split is None:
                        logger.error(f"No test/dev/validation scores found for {task_name}")
                        break

                    # Each split holds one entry per subset; prefer the 'default' subset.
                    temp_datas = data['scores'][split]
                    temp_data = None
                    for td in temp_datas:
                        if td['hf_subset'] == 'default':
                            temp_data = td
                    if temp_data is None:
                        temp_data = temp_datas[0]
                    tasks_results[t_type][task_name] = round(temp_data['main_score'] * 100, 2)
        logger.info(f"tasks_results: {tasks_results}")
        return tasks_results

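    # Shape of the per-task result file that read_results() expects, inferred from
    # the fields accessed above (an illustrative sketch, not the full MTEB schema):
    #
    #     {
    #         "scores": {
    #             "test": [
    #                 {"hf_subset": "default", "main_score": 0.734, ...}
    #             ]
    #         }
    #     }
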
    def output_json(self, tasks_results, save_file):
        """Save the tasks results into a json file.

        Args:
            tasks_results (dict): The task results.
            save_file (str): Path to a file to save the results.
        """
        all_results = 0
        all_results_num = 0
        new_results = {}
        for task_type in tasks_results.keys():
            new_results[task_type] = {}
            tmp_results = 0
            tmp_results_num = 0
            # CQADupstack sub-tasks are accumulated separately and reported as a
            # single averaged score for this task type.
            cqa_results = 0
            cqa_results_num = 0
            for task_name in tasks_results[task_type].keys():
                if "CQADupstack" in task_name:
                    cqa_results += tasks_results[task_type][task_name]
                    cqa_results_num += 1
                else:
                    new_results[task_type][task_name] = float(round(tasks_results[task_type][task_name], 2))
                    all_results_num += 1
                    all_results += tasks_results[task_type][task_name]
                    tmp_results_num += 1
                    tmp_results += tasks_results[task_type][task_name]
            if cqa_results_num > 0:
                cqa_results = cqa_results / cqa_results_num
                new_results[task_type]["CQADupstack"] = float(round(cqa_results, 2))
                all_results += cqa_results
                all_results_num += 1
                tmp_results += cqa_results
                tmp_results_num += 1
            new_results[task_type]['Avg'] = float(round(tmp_results / tmp_results_num, 2))
        new_results['Avg'] = float(round(all_results / all_results_num, 2))
        with open(save_file, 'w') as f:
            json.dump(new_results, f)

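    # Example of the summary written by output_json() (scores are illustrative):
    #
    #     {
    #         "Retrieval": {"FiQA2018": 41.20, "CQADupstack": 39.10, "Avg": 40.15},
    #         "STS": {"STS12": 78.40, "Avg": 78.40},
    #         "Avg": 52.90
    #     }
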
    def run(self):
        """
        Run the evaluation.
        """
        task_types = self.eval_args.task_types
        tasks = self.eval_args.tasks
        languages = self.eval_args.languages
        tasks = mteb.get_tasks(
            languages=languages,
            tasks=tasks,
            task_types=task_types
        )
        output_folder = self.eval_args.output_dir
        for task in tasks:
            task_name = task.metadata.name
            task_type = task.metadata.type
            self.retriever.stop_pool()
            if self.eval_args.use_special_instructions:
                try:
                    instruction = get_task_def_by_task_name_and_type(task_name, task_type)
                    self.retriever.set_instruction(instruction)
                except Exception:
                    logger.info(f"No instruction found for {task_name}")
            if self.eval_args.examples_path is not None:
                try:
                    with open(os.path.join(self.eval_args.examples_path, task_name + '.json')) as f:
                        eg_pairs = json.load(f)
                    # Pass the loaded in-context examples to the retriever; set_examples
                    # is assumed to be provided by MTEBEvalDenseRetriever.
                    self.retriever.set_examples(eg_pairs)
                except Exception:
                    logger.info(f"No examples found for {task_name}")
            if task_type == 'Classification':
                self.retriever.set_normalize_embeddings(False)
            else:
                self.retriever.set_normalize_embeddings(True)
            evaluation = mteb.MTEB(tasks=[task])
            results = evaluation.run(self.retriever, output_folder=f"{output_folder}/{str(self.retriever)}")

        ensure_dir(self.eval_args.eval_output_path)
        logger.info("Start computing metrics. Only save results as json.")
        tasks_results = self.read_results(f"{output_folder}/{str(self.retriever)}/no_model_name_available/no_revision_available", tasks)
        self.output_json(tasks_results, self.eval_args.eval_output_path)
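

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module): how the runner is
# typically driven. The argument values below, and the assumption that
# MTEBEvalArgs / AbsEvalModelArgs accept these fields as constructor keywords,
# are examples only.
#
#     from FlagEmbedding.abc.evaluation import AbsEvalModelArgs
#     from FlagEmbedding.evaluation.mteb.arguments import MTEBEvalArgs
#     from FlagEmbedding.evaluation.mteb.runner import MTEBEvalRunner
#
#     eval_args = MTEBEvalArgs(
#         task_types=["Retrieval"],
#         languages=["eng"],
#         output_dir="./mteb_results",
#         eval_output_path="./mteb_results/summary.json",
#     )
#     model_args = AbsEvalModelArgs(embedder_name_or_path="BAAI/bge-base-en-v1.5")
#
#     runner = MTEBEvalRunner(eval_args=eval_args, model_args=model_args)
#     runner.run()
# ---------------------------------------------------------------------------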