'''
    Generate prompts for the LLM Needle Haystack.
    Source code from:
        https://github.com/gkamradt/LLMTest_NeedleInAHaystack/tree/main
        https://github.com/THUDM/LongAlign/tree/main/Needle_test
'''
from dotenv import load_dotenv
import os
import tiktoken
import glob
import json
import yaml
import argparse
from anthropic import Anthropic
import numpy as np
import asyncio
from asyncio import Semaphore
from transformers import AutoTokenizer

load_dotenv()


class Prompter:
    """
    Build needle-in-a-haystack prompts and save them to disk as JSON.

    For every combination of context length and needle depth, the haystack
    text is trimmed to the target number of tokens, the needle sentence is
    inserted at the requested depth, and the resulting chat-style prompt is
    written to ``save_dir``.
    """

    def __init__(self,
                 needle="\nThe best thing to do in San Francisco is eat a sandwich and sit in Dolores Park on a sunny day.\n",
                 haystack_dir="PaulGrahamEssays",
                 retrieval_question="What is the best thing to do in San Francisco?",
                 context_lengths_min = 1000,
                 context_lengths_max = 200000,
                 context_lengths_num_intervals = 35,
                 context_lengths = None,
                 document_depth_percent_min = 0,
                 document_depth_percent_max = 100,
                 document_depth_percent_intervals = 35,
                 document_depth_percents = None,
                 document_depth_percent_interval_type = "linear",
                 tokenizer_type = "OpenAI",
                 model_name = "gpt-4-1106-preview",
                 num_concurrent_requests = 1,
                 final_context_length_buffer = 200,
                 save_dir = "prompts",
                 print_ongoing_status = True,
                 period_token_ids = None):
        """
        :param needle: The needle to be found in the haystack.
        :param haystack_dir: The directory of ``*.txt`` files to use as background context (the haystack) in which the needle is placed. Default is "PaulGrahamEssays".
        :param retrieval_question: The question with which to prompt the model to do the retrieval.
        :param context_lengths_min: The minimum length of the context. Default is 1000.
        :param context_lengths_max: The maximum length of the context. Default is 200000.
        :param context_lengths_num_intervals: The number of intervals for the context length. Default is 35.
        :param context_lengths: Explicit context lengths. When given, the min/max/num_intervals values are ignored. Default is None.
        :param document_depth_percent_min: The minimum depth percent of the document. Default is 0.
        :param document_depth_percent_max: The maximum depth percent of the document. Default is 100.
        :param document_depth_percent_intervals: The number of intervals for the document depth percent. Default is 35.
        :param document_depth_percents: Explicit depth percentages. When given, the min/max/intervals values are ignored. Default is None.
        :param document_depth_percent_interval_type: How generated depths are spaced. Must be 'linear' or 'sigmoid' when document_depth_percents is not supplied. Default is 'linear'.
        :param tokenizer_type: Which tokenizer to use. Must be 'OpenAI', 'Anthropic' or 'Huggingface'. Default is 'OpenAI'.
        :param model_name: The model whose tokenizer is loaded. Required for 'OpenAI' and 'Huggingface'. Default is 'gpt-4-1106-preview'.
        :param num_concurrent_requests: Maximum number of prompt-generation tasks allowed to run at once. Default is 1.
        :param final_context_length_buffer: The amount of cushion you'd like to leave off the input context to allow for the system message, question and output. Default is 200 tokens.
        :param save_dir: Directory the generated prompt files are written to. Default is 'prompts'.
        :param print_ongoing_status: Whether or not to print a summary before generation starts. Default is True.
        :param period_token_ids: Optional token ids that count as a sentence end when placing the needle. When None they are derived from the active tokenizer. Pass ``[30930]`` to reproduce the previously hard-coded value. Default is None.
        :raises ValueError: If a required argument is missing or an option has an unsupported value.
        """
        if not needle or not haystack_dir or not retrieval_question:
            raise ValueError("Needle, haystack, and retrieval_question must be provided.")

        # Validate before the value is used below, so a bad value can never
        # leave the depth list unset.
        if document_depth_percent_interval_type not in [None, "linear", "sigmoid"]:
            raise ValueError(
                "document_depth_percent_interval_type must be either None, 'linear' or 'sigmoid'. "
                "If you'd like your own distribution give a list of ints in via document_depth_percents"
            )

        self.needle = needle
        self.haystack_dir = haystack_dir
        self.retrieval_question = retrieval_question
        self.num_concurrent_requests = num_concurrent_requests
        self.final_context_length_buffer = final_context_length_buffer
        self.print_ongoing_status = print_ongoing_status
        self.tokenizer_type = tokenizer_type
        self.model_name = model_name
        self.period_token_ids = period_token_ids
        self.testing_results = []

        if context_lengths is None:
            if context_lengths_min is None or context_lengths_max is None or context_lengths_num_intervals is None:
                raise ValueError(
                    "Either context_lengths_min, context_lengths_max, context_lengths_num_intervals "
                    "need to be filled out OR context_lengths needs to be supplied."
                )
            self.context_lengths = np.round(
                np.linspace(context_lengths_min, context_lengths_max,
                            num=context_lengths_num_intervals, endpoint=True)
            ).astype(int)
        else:
            self.context_lengths = context_lengths

        if document_depth_percents is None:
            if document_depth_percent_min is None or document_depth_percent_max is None or document_depth_percent_intervals is None:
                raise ValueError(
                    "Either document_depth_percent_min, document_depth_percent_max, document_depth_percent_intervals "
                    "need to be filled out OR the document_depth_percents needs to be supplied."
                )
            if document_depth_percent_interval_type == 'linear':
                self.document_depth_percents = np.round(
                    np.linspace(document_depth_percent_min, document_depth_percent_max,
                                num=document_depth_percent_intervals, endpoint=True)
                ).astype(int)
            elif document_depth_percent_interval_type == 'sigmoid':
                self.document_depth_percents = [
                    self.logistic(x)
                    for x in np.linspace(document_depth_percent_min, document_depth_percent_max,
                                         document_depth_percent_intervals)
                ]
            else:
                # interval type is None: there is no rule to generate depths from.
                raise ValueError(
                    "document_depth_percent_interval_type must be 'linear' or 'sigmoid' "
                    "when document_depth_percents is not supplied."
                )
        else:
            self.document_depth_percents = document_depth_percents

        if self.tokenizer_type == "OpenAI":
            if self.model_name is None:
                raise ValueError("If you're using OpenAI, you must provide a model name.")
            self.enc = tiktoken.encoding_for_model(self.model_name)
        elif self.tokenizer_type == "Anthropic":
            self.enc = Anthropic().get_tokenizer()
        elif self.tokenizer_type == "Huggingface":
            if self.model_name is None:
                raise ValueError("If you're using Huggingface, you must provide a model name.")
            # NOTE(review): trust_remote_code=True allows the model repository
            # to run its own Python code; only point this at trusted models.
            self.enc = AutoTokenizer.from_pretrained(self.model_name, trust_remote_code=True, use_fast=False)
        else:
            raise ValueError("tokenizer_type is not supported. Must be either 'OpenAI', 'Anthropic', or 'Huggingface'")

        self.save_dir = save_dir

    def logistic(self, x, L=100, x0=50, k=.1):
        """Map ``x`` onto a logistic curve, rounded to 3 decimals.

        ``x == 0`` and ``x == 100`` are returned unchanged so the end points
        of a 0-100 depth range stay exact.
        """
        if x == 0:
            return 0
        if x == 100:
            return 100
        return np.round(L / (1 + np.exp(-k * (x - x0))), 3)

    async def bound_evaluate_and_log(self, sem, *args):
        """Run ``evaluate_and_log(*args)`` while holding the semaphore ``sem``."""
        async with sem:
            await self.evaluate_and_log(*args)

    async def run_test(self):
        """Generate one prompt for every (context length, depth percent) pair."""
        sem = Semaphore(self.num_concurrent_requests)

        # One task per combination; the semaphore caps how many run at once.
        tasks = [
            self.bound_evaluate_and_log(sem, context_length, depth_percent)
            for context_length in self.context_lengths
            for depth_percent in self.document_depth_percents
        ]

        # Wait for all tasks to complete
        await asyncio.gather(*tasks)

    def generate_prompt(self, context):
        """Return the chat messages (system + user) asking the retrieval question about ``context``."""
        return [
            {
                "role": "system",
                "content": "You are a helpful AI bot that answers questions for a user. Keep your response short and direct"
            },
            {
                "role": "user",
                "content": context + '\n\n' + f"Read the document and answer: {self.retrieval_question}"
            },
        ]

    async def evaluate_and_log(self, context_length, depth_percent):
        """Build the prompt for one (context length, depth) pair and write it to ``save_dir`` as JSON."""
        # Generate the required length context and place the needle in it
        context = await self.generate_context(context_length, depth_percent)
        print('Generate for context length:', context_length, 'depth percent:', depth_percent)

        # Prepare the messages that will later be sent to the model under test
        prompt = self.generate_prompt(context)

        # The depth in the file name is the depth percent multiplied by 100
        # (e.g. depth 50 -> "depth_5000"). Kept as is so existing file names
        # stay stable.
        context_file_location = f'{self.tokenizer_type}_len_{context_length}_depth_{int(depth_percent*100)}'

        # Save the prompts to file for retesting
        os.makedirs(self.save_dir, exist_ok=True)
        with open(f'{self.save_dir}/{context_file_location}_prompts.json', 'w', encoding='utf-8') as f:
            json.dump(prompt, f)

    async def generate_context(self, context_length, depth_percent):
        """Return haystack text trimmed to ``context_length`` tokens with the needle inserted at ``depth_percent``."""
        # Load the haystack files into a single string
        context = self.read_context_files()

        # Truncate the haystack to the desired context length
        context = self.encode_and_trim(context, context_length)

        # Insert the needle according to the depth percent
        context = self.insert_needle(context, depth_percent, context_length)

        return context

    def _tokenize(self, text, add_special_tokens=True):
        """Encode ``text`` into a list of token ids with the configured tokenizer.

        ``add_special_tokens`` is only forwarded to Huggingface tokenizers;
        the OpenAI and Anthropic encoders are called without it.

        :raises ValueError: If ``tokenizer_type`` is not supported.
        """
        if self.tokenizer_type == "OpenAI":
            return self.enc.encode(text)
        if self.tokenizer_type == "Anthropic":
            return self.enc.encode(text).ids
        if self.tokenizer_type == "Huggingface":
            encoded = self.enc(text, truncation=False, return_tensors="pt",
                               add_special_tokens=add_special_tokens)
            return encoded.input_ids.view(-1).tolist()
        raise ValueError("tokenizer_type must be either 'OpenAI', 'Anthropic', or 'Huggingface'")

    def encode_text_to_tokens(self, text):
        """Encode ``text`` to token ids without special tokens (Huggingface only honours this)."""
        return self._tokenize(text, add_special_tokens=False)

    def _period_token_ids(self):
        """Return the set of token ids treated as a sentence end.

        Uses ``period_token_ids`` from the constructor when given. Otherwise
        '.' is encoded with the active tokenizer and only the final id is
        kept, because some tokenizers may emit a leading word-boundary token
        in front of the period.
        """
        if self.period_token_ids is not None:
            return set(self.period_token_ids)
        return set(self.encode_text_to_tokens('.')[-1:])

    def insert_needle(self, context, depth_percent, context_length):
        """Insert the needle into ``context`` at ``depth_percent`` and return the new text.

        The haystack is first cut so that haystack + needle fits into
        ``context_length - final_context_length_buffer`` tokens. For any depth
        other than 100 the insertion point is moved backwards to just after
        the closest preceding period token, so the needle lands on a sentence
        break; if no period is found it is placed at the very start.
        """
        tokens_needle = self.encode_text_to_tokens(self.needle)
        tokens_context = self.encode_text_to_tokens(context)

        # Reserve the buffer for the system message, the user question, and the response.
        context_length -= self.final_context_length_buffer

        # If context + needle are longer than the budget, cut tokens from the
        # context. The clamp stops a budget smaller than the needle from
        # turning into a negative slice bound (which would cut from the end).
        if len(tokens_context) + len(tokens_needle) > context_length:
            tokens_context = tokens_context[:max(0, context_length - len(tokens_needle))]

        if depth_percent == 100:
            # Depth 100 means the needle is the last thing in the document
            tokens_new_context = tokens_context + tokens_needle
        else:
            # Position (in tokens) at which to insert the needle, kept inside the list
            insertion_point = int(len(tokens_context) * (depth_percent / 100))
            insertion_point = min(max(insertion_point, 0), len(tokens_context))

            # Walk backwards until the token just before the insertion point is a period
            period_tokens = self._period_token_ids()
            while insertion_point > 0 and tokens_context[insertion_point - 1] not in period_tokens:
                insertion_point -= 1

            # Needle goes in at the sentence break, the rest of the context follows it
            tokens_new_context = (
                tokens_context[:insertion_point] + tokens_needle + tokens_context[insertion_point:]
            )

        # Convert back to a string and return it
        return self.decode_tokens(tokens_new_context)

    def get_context_length_in_tokens(self, context):
        """Return the number of tokens ``context`` encodes to."""
        return len(self._tokenize(context))

    def read_context_files(self):
        """Concatenate the haystack ``*.txt`` files, repeating them until the
        text is at least as long (in tokens) as the largest context length.

        :raises FileNotFoundError: If ``haystack_dir`` contains no ``*.txt`` files.
        :raises ValueError: If the haystack files contain no text.
        """
        files = glob.glob(f"{self.haystack_dir}/*.txt")
        if not files:
            # Without this check the loop below would never terminate.
            raise FileNotFoundError(f"No .txt files found in haystack_dir '{self.haystack_dir}'.")

        context = ""
        max_context_length = max(self.context_lengths)
        while self.get_context_length_in_tokens(context) < max_context_length:
            for file in files:
                with open(file, 'r', encoding='utf-8') as f:
                    context += f.read()
            if not context:
                # Only empty files: repeating them can never reach the target length.
                raise ValueError(f"The .txt files in haystack_dir '{self.haystack_dir}' are empty.")
        return context

    def get_tokens_from_context(self, context):
        """Encode ``context`` to token ids using the tokenizer's default special-token handling."""
        return self._tokenize(context)

    def decode_tokens(self, tokens, context_length=None):
        """Decode the first ``context_length`` tokens (all of them when None) back to text.

        :raises ValueError: If ``tokenizer_type`` is not supported.
        """
        trimmed = tokens[:context_length]
        if self.tokenizer_type in ("OpenAI", "Anthropic"):
            return self.enc.decode(trimmed)
        if self.tokenizer_type == "Huggingface":
            return self.enc.decode(trimmed, skip_special_tokens=True)
        raise ValueError("tokenizer_type must be either 'OpenAI', 'Anthropic', or 'Huggingface'")

    def encode_and_trim(self, context, context_length):
        """Return ``context`` cut down to at most ``context_length`` tokens."""
        tokens = self.get_tokens_from_context(context)
        if len(tokens) > context_length:
            context = self.decode_tokens(tokens, context_length)
        return context

    def get_results(self):
        """Return the ``testing_results`` list."""
        return self.testing_results

    def print_start_test_summary(self):
        """Print the tokenizer, context lengths, depths and needle for this run."""
        print("\n")
        print("Starting Prompt Generation ...")
        print(f"- Tokenizer: {self.tokenizer_type}")
        print(f"- Context Lengths: {len(self.context_lengths)}, Min: {min(self.context_lengths)}, Max: {max(self.context_lengths)}")
        print(f"- Document Depths: {len(self.document_depth_percents)}, Min: {min(self.document_depth_percents)}%, Max: {max(self.document_depth_percents)}%")
        print(f"- Needle: {self.needle.strip()}")
        print("\n\n")

    def start_test(self):
        """Optionally print the run summary, then generate all prompts."""
        if self.print_ongoing_status:
            self.print_start_test_summary()
        asyncio.run(self.run_test())


if __name__ == '__main__':
    # NOTE(review): yaml.load with FullLoader is kept as is because the config
    # is a local project file; switch to yaml.safe_load if this path can ever
    # come from an untrusted source.
    with open('utils/needle_test/config-prompt.yaml', 'r') as file:
        config = yaml.load(file, Loader=yaml.FullLoader)

    parser = argparse.ArgumentParser()
    # A real None (not the string 'None') lets Prompter report a missing model name.
    parser.add_argument('--model_name', type=str, default=None)
    args = parser.parse_args()

    ht = Prompter(
        needle=config['prompt']['needle'],
        haystack_dir=config['prompt']['haystack_dir'],
        retrieval_question=config['prompt']['retrieval_question'],

        context_lengths_min=config['context']['min_len'],
        context_lengths_max=config['context']['max_len'],
        context_lengths_num_intervals=config['context']['interval'],
        context_lengths=config['context']['manually_select_list'],

        document_depth_percent_min=config['document_depth']['min_percent'],
        document_depth_percent_max=config['document_depth']['max_percent'],
        document_depth_percent_intervals=config['document_depth']['interval'],
        document_depth_percents=config['document_depth']['manually_select_list'],
        document_depth_percent_interval_type=config['document_depth']['interval_type'],

        tokenizer_type=config['tokenizer']['tokenizer_type'],
        model_name=args.model_name,
        save_dir=config['save_dir'],
    )

    ht.start_test()