
working draft for vllm using llama3-70B

Kai Wu 1 year ago
parent
commit
b07cbad1d7

BIN
recipes/use_cases/end2end-recipes/chatbot/data_pipelines/._faq-data


+ 6 - 1
recipes/use_cases/end2end-recipes/chatbot/data_pipelines/config.py

@@ -9,5 +9,10 @@ def load_config(config_path: str = "./config.yaml"):
     with open(config_path, "r") as file:
         config = yaml.safe_load(file)
     # Set the API key from the environment variable
-    config["api_key"] = os.environ["OCTOAI_API_TOKEN"]
+    try:
+        config["api_key"] = os.environ["OCTOAI_API_TOKEN"]
+    except KeyError:
+        print("API token did not found, please set the OCTOAI_API_TOKEN environment variable if using OctoAI, otherwise set api_key to default EMPTY")
+        # local Vllm endpoint did not need API key, so set the API key to "EMPTY" if not found
+        config["api_key"] = "EMPTY"
     return config

+ 8 - 8
recipes/use_cases/end2end-recipes/chatbot/data_pipelines/config.yaml

@@ -2,24 +2,24 @@ question_prompt_template: >
   You are a language model skilled in creating quiz questions.
   You will be provided with a document,
   read it and generate question and answer pairs
-  that are most likely be asked by a use of llama that just want to start, 
+  that are most likely to be asked by a user of Llama who just wants to get started,
   please make sure you follow those rules,
   1. Generate only {total_questions} question answer pairs.
   2. Generate in {language}.
-  3. The questions can be answered based *solely* on the given passage. 
+  3. The questions can be answered based *solely* on the given passage.
   4. Avoid asking questions with similar meaning.
   5. Make the answer as concise as possible, it should be at most 60 words.
   6. Provide relevant links from the document to support the answer.
   7. Never use any abbreviation.
-  8. Return the result in json format with the template: 
+  8. Return the result in json format with the template:
     [
       {{
-        "question": "your question A.",
-        "answer": "your answer to question A."
+        "Question": "your question A.",
+        "Answer": "your answer to question A."
       }},
       {{
-        "question": "your question B.",
-        "answer": "your answer to question B."
+        "Question": "your question B.",
+        "Answer": "your answer to question B."
       }}
     ]
 
@@ -27,4 +27,4 @@ data_dir: "./data"
 
 language: "English"
 
-total_questions: 2
+total_questions: 1000

+ 6 - 6
recipes/use_cases/end2end-recipes/chatbot/data_pipelines/doc_processor.py

@@ -6,13 +6,13 @@ AVERAGE_TOKENS_PER_RESULT = 100
 
 def get_token_limit_for_model(model: str) -> int:
     """Returns the token limit for a given model."""
-    if model == "llama-2-70b-chat-fp16" or model == "llama-2-13b-chat-turbo":
+    if model == "llama-2-13b-chat" or model == "llama-2-70b-chat":
         return 4096
-    
+    else:
+        return 8192
 
 def calculate_num_tokens_for_message(encoded_text) -> int:
     """Calculates the number of tokens used by a message."""
-    
     # Added 3 to account for priming with assistant's reply, as per original comment
     return len(encoded_text) + 3
 
@@ -29,7 +29,7 @@ def split_text_into_chunks(context: dict, text: str, tokenizer) -> list[str]:
     estimated_total_question_tokens = estimated_tokens_per_question * context["total_questions"]
     # Ensure there's a reasonable minimum chunk size
     max_tokens_for_text = max(model_token_limit - tokens_for_questions - estimated_total_question_tokens, model_token_limit // 10)
-    
+
     chunks, current_chunk = [], []
     print(f"Splitting text into chunks of {max_tokens_for_text} tokens, encoded_text {len(encoded_text)}", flush=True)
     for token in encoded_text:
@@ -43,5 +43,5 @@ def split_text_into_chunks(context: dict, text: str, tokenizer) -> list[str]:
         chunks.append(tokenizer.decode(current_chunk).strip())
 
     print(f"Number of chunks in the processed text: {len(chunks)}", flush=True)
-   
-    return chunks
+
+    return chunks

+ 15 - 12
recipes/use_cases/end2end-recipes/chatbot/data_pipelines/generate_question_answers.py

@@ -5,12 +5,12 @@ import argparse
 import asyncio
 import json
 from config import load_config
-from generator_utils import generate_question_batches, parse_qa_to_json
+from generator_utils import generate_question_batches, parse_qa_to_json, get_model_name
 from itertools import chain
 import logging
 import aiofiles  # Ensure aiofiles is installed for async file operations
 from abc import ABC, abstractmethod
-from octoai.client import Client
+from octoai.client import OctoAI
 from functools import partial
 from openai import OpenAI
 
@@ -35,7 +35,7 @@ class OctoAIChatService(ChatService):
         async with request_limiter:
             try:
                 event_loop = asyncio.get_running_loop()
-                client = Client(api_context['api_key'])
+                client = OctoAI(api_context['api_key'])
                 api_chat_call = partial(
                     client.chat.completions.create,
                     model=api_context['model'],
@@ -48,7 +48,7 @@ class OctoAIChatService(ChatService):
 
                 return assistant_response_json
             except Exception as error:
-                print(f"Error during chat request execution: {error}")
+                logging.error(f"Error during chat request execution: {error}",exc_info=True)
                 return ""
 # Use the local vllm openai compatible server for generating question/answer pairs to make API call syntax consistent
 # please read for more detail:https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html.
@@ -57,25 +57,25 @@ class VllmChatService(ChatService):
         async with request_limiter:
             try:
                 event_loop = asyncio.get_running_loop()
-                client = OpenAI(api_key="EMPTY", base_url="http://localhost:"+ api_context['end_point']+"/v1")
+                model_name = get_model_name(api_context['model'])
+                client = OpenAI(api_key=api_context['api_key'], base_url="http://localhost:"+ str(api_context['endpoint'])+"/v1")
                 api_chat_call = partial(
                     client.chat.completions.create,
-                    model=api_context['model'],
+                    model=model_name,
                     messages=chat_request,
                     temperature=0.0
                 )
                 response = await event_loop.run_in_executor(None, api_chat_call)
                 assistant_response = next((choice.message.content for choice in response.choices if choice.message.role == 'assistant'), "")
                 assistant_response_json = parse_qa_to_json(assistant_response)
-
+                assert(len(assistant_response_json)!=0)
                 return assistant_response_json
             except Exception as error:
-                print(f"Error during chat request execution: {error}")
+                logging.error(f"Error during chat request execution: {error}",exc_info=True)
                 return ""
 
 async def main(context):
     if context["endpoint"]:
-        logging.info(f" Use local vllm service at port '{context["endpoint"]}'.")
         chat_service = VllmChatService()
     else:
         chat_service = OctoAIChatService()
@@ -93,7 +93,7 @@ async def main(context):
         logging.info("Data successfully written to 'data.json'. Process completed.")
 
     except Exception as e:
-        logging.error(f"An unexpected error occurred during the process: {e}")
+        logging.error(f"An unexpected error occurred during the process: {e}",exc_info=True)
 
 def parse_arguments():
     # Define command line arguments for the script
@@ -108,7 +108,7 @@ def parse_arguments():
     )
     parser.add_argument(
         "-m", "--model",
-        choices=["meta-llama-3-70b-instruct","meta-llama-3-8b-instruct","llama-2-70b-chat-fp16", "llama-2-13b-chat-fp16"],
+        choices=["meta-llama-3-70b-instruct","meta-llama-3-8b-instruct","llama-2-13b-chat", "llama-2-70b-chat"],
         default="meta-llama-3-70b-instruct",
         help="Select the model to use for generation."
     )
@@ -120,8 +120,9 @@ def parse_arguments():
     parser.add_argument(
         "-v", "--vllm_endpoint",
         default=None,
+        type=int,
         help="If a port is specified, then use local vllm endpoint for generating question/answer pairs."
-
+    )
     return parser.parse_args()
 
 if __name__ == "__main__":
@@ -133,4 +134,6 @@ if __name__ == "__main__":
     context["model"] = args.model
     context["endpoint"] = args.vllm_endpoint
     logging.info(f"Configuration loaded. Generating {args.total_questions} question/answer pairs using model '{args.model}'.")
+    if context["endpoint"]:
+        logging.info(f"Use local vllm service at port: '{args.vllm_endpoint}'.")
     asyncio.run(main(context))

+ 61 - 19
recipes/use_cases/end2end-recipes/chatbot/data_pipelines/generator_utils.py

@@ -4,21 +4,33 @@
 import os
 import re
 from transformers import  AutoTokenizer
-from octoai.client import Client
 import asyncio
 import magic
 from PyPDF2 import PdfReader
 import json
 from doc_processor import split_text_into_chunks
 import logging
+import json
 # Initialize logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
-
+# Since OctoAI has different naming for Llama models, get the official Hugging Face model name from the OctoAI name.
+def get_model_name(model):
+    if model == "meta-llama-3-70b-instruct":
+        return "meta-llama/Meta-Llama-3-70B-Instruct"
+    elif model == "meta-llama-3-8b-instruct":
+        return "meta-llama/Meta-Llama-3-8B-Instruct"
+    elif model == "llama-2-7b-chat":
+        return "meta-llama/Llama-2-7b-chat-hf"
+    else:
+        return "meta-llama/Llama-2-70b-chat-hf"
 def read_text_file(file_path):
     try:
         with open(file_path, 'r') as f:
-            return f.read().strip() + ' '
+            text = f.read().strip() + ' '
+            if len(text) == 0:
+                print("File is empty ",file_path)
+            return text
     except Exception as e:
         logging.error(f"Error reading text file {file_path}: {e}")
     return ''
@@ -29,6 +41,9 @@ def read_pdf_file(file_path):
             pdf_reader = PdfReader(f)
             num_pages = len(pdf_reader.pages)
             file_text = [pdf_reader.pages[page_num].extract_text().strip() + ' ' for page_num in range(num_pages)]
+            text = ''.join(file_text)
+            if len(text) == 0:
+                print("File is empty ",file_path)
             return ''.join(file_text)
     except Exception as e:
         logging.error(f"Error reading PDF file {file_path}: {e}")
@@ -41,6 +56,8 @@ def read_json_file(file_path):
             # Assuming each item in the list has a 'question' and 'answer' key
             # Concatenating question and answer pairs with a space in between and accumulating them into a single string
             file_text = ' '.join([item['question'].strip() + ' ' + item['answer'].strip() + ' ' for item in data])
+            if len(file_text) == 0:
+                print("File is empty ",file_path)
             return file_text
     except Exception as e:
         logging.error(f"Error reading JSON file {file_path}: {e}")
@@ -48,6 +65,7 @@ def read_json_file(file_path):
 
 
 def process_file(file_path):
+    print("starting to process file: ", file_path)
     file_type = magic.from_file(file_path, mime=True)
     if file_type in ['text/plain', 'text/markdown', 'JSON']:
         return read_text_file(file_path)
@@ -66,36 +84,56 @@ def read_file_content(context):
             file_text = process_file(file_path)
             if file_text:
                 file_strings.append(file_text)
-
+    text = ' '.join(file_strings)
+    if len(text) == 0:
+        logging.error(f"Error reading files, text is empty")
     return ' '.join(file_strings)
 
 
 
 def parse_qa_to_json(response_string):
-    # Adjusted regex to capture question-answer pairs more flexibly
-    # This pattern accounts for optional numbering and different question/answer lead-ins
-    pattern = re.compile(
-        r"\d*\.\s*Question:\s*(.*?)\nAnswer:\s*(.*?)(?=\n\d*\.\s*Question:|\Z)",
-        re.DOTALL
-    )
-
-    # Find all matches in the response string
-    matches = pattern.findall(response_string)
-
-    # Convert matches to a structured format
-    qa_list = [{"question": match[0].strip(), "answer": match[1].strip()} for match in matches]
-
-    # Convert the list to a JSON string
+    split_lines = response_string.split("\n")
+    start,end = None,None
+    # must use set to avoid duplicate question/answer pairs due to async function calls
+    qa_set = set()
+    for i in range(len(split_lines)):
+        line = split_lines[i]
+        # starting to find "Question"
+        if not start:
+            # Once found, set start to this line number
+            if '"Question":' in line:
+                start = i
+        else:
+            # "Question" has been found, find "Answer", once found, set end to this line number
+            if '"Answer":' in line:
+                end = i
+            # found Question means we have reached the end of the question, so add it to qa_list
+            elif '"Question":' in line:
+                question = " ".join(" ".join(split_lines[start:end]).split('"Question":')[1].split('"')[1:-1])
+                answer = " ".join(" ".join(split_lines[end:i]).split('"Answer":')[1].split('"')[1:-1])
+                start,end = i,None
+                qa_set.add((question, answer))
+    # after the loop, capture the last pending question/answer pair
+    if start and end:
+        question = " ".join(" ".join(split_lines[start:end]).split('"Question":')[1].split('"')[1:-1])
+        answer = " ".join(" ".join(split_lines[end:i]).split('"Answer":')[1].split('"')[1:-1])
+        qa_set.add((question, answer))
+    qa_list = [{"question": q, "answer":a} for q,a in qa_set]
     return json.dumps(qa_list, indent=4)
 
 
 async def prepare_and_send_request(chat_service, api_context: dict, document_content: str, total_questions: int) -> dict:
     prompt_for_system = api_context['question_prompt_template'].format(total_questions=total_questions, language=api_context["language"])
     chat_request_payload = [{'role': 'system', 'content': prompt_for_system}, {'role': 'user', 'content': document_content}]
+    result = await chat_service.execute_chat_request_async(api_context, chat_request_payload)
+    if not result:
+        return {}
-    return json.loads(await chat_service.execute_chat_request_async(api_context, chat_request_payload))
+    return json.loads(result)
 
 async def generate_question_batches(chat_service, api_context: dict):
     document_text = read_file_content(api_context)
+    if len(document_text)== 0:
+        logging.error(f"Error reading files, document_text is empty")
     if api_context["model"] in ["meta-llama-3-70b-instruct","meta-llama-3-8b-instruct"]:
         tokenizer = AutoTokenizer.from_pretrained("meta-llama/Meta-Llama-3-8B", pad_token="</s>", padding_side="right")
     else:
@@ -114,7 +152,11 @@ async def generate_question_batches(chat_service, api_context: dict):
         #Distribute extra questions across the first few batches
         questions_in_current_batch = base_questions_per_batch + (1 if batch_index < extra_questions else 0)
         print(f"Batch {batch_index + 1} - {questions_in_current_batch} questions ********")
-        generation_tasks.append(prepare_and_send_request(chat_service, api_context, batch_content, questions_in_current_batch))
+        try:
+            result = prepare_and_send_request(chat_service, api_context, batch_content, questions_in_current_batch)
+            generation_tasks.append(result)
+        except Exception as e:
+            print(f"Error during chat request execution: {e}")
 
     question_generation_results = await asyncio.gather(*generation_tasks)