
adding data pipeline

Hamid Shojanazeri 1 year ago
parent
commit
ebc826d629

+ 12 - 0
tutorials/chatbot/data_pipelines/config.py

@@ -0,0 +1,12 @@
+import yaml
+import os
+
+
+def load_config():
+    # Read the YAML configuration file
+    file_path = "./config.yaml"
+    with open(file_path, "r") as file:
+        config = yaml.safe_load(file)
+
+    config["api_key"] = os.getenv('OPENAI_API_KEY')
+    return config
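
A minimal usage sketch (the "./config.yaml" path is relative, so this assumes you run from tutorials/chatbot/data_pipelines; the key below is a hypothetical placeholder, normally exported in the shell):

    import os
    os.environ["OPENAI_API_KEY"] = "sk-placeholder"  # hypothetical value, for illustration only
    config = load_config()
    print(config["language"], config["num_data_default"])  # -> English 2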

+ 29 - 0
tutorials/chatbot/data_pipelines/config.yaml

@@ -0,0 +1,29 @@
+question_generator: >
+  You are a quiz expert. You will be provided with a document;
+  read it and generate question and answer pairs
+  that are most likely to be asked by a user of Llama who is just getting started.
+  Please make sure you follow these rules:
+  1. Generate only {num_data} question and answer pairs.
+  2. Generate in {language}.
+  3. The questions must be answerable based *solely* on the given passage.
+  4. Avoid asking questions with similar meanings.
+  5. Make the answer as concise as possible; it should be at most 60 words.
+  6. Provide relevant links from the document to support the answer.
+  7. Never use any abbreviations.
+  8. Return the result in JSON format with this template:
+    [
+      {{
+        "question": "your question A.",
+        "answer": "your answer to question A."
+      }},
+      {{
+        "question": "your question B.",
+        "answer": "your answer to question B."
+      }}
+    ]
+
+data_dir: "./data"
+
+language: "English"
+
+num_data_default: 2
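
Note the doubled braces in the JSON template: str.format substitutes {num_data} and {language} but collapses {{ and }} to literal braces, so the JSON example reaches the model intact. A quick sketch of how model_handler.py renders this prompt:

    config = load_config()
    prompt = config["question_generator"].format(num_data=2, language="English")
    # "{{" / "}}" become literal "{" / "}", so the JSON template survives formatting
    assert '"question"' in prompt and "{num_data}" not in prompt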

+ 49 - 0
tutorials/chatbot/data_pipelines/file_handler.py

@@ -0,0 +1,49 @@
+import os
+import magic
+from PyPDF2 import PdfReader
+import logging
+
+# Initialize logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+def read_text_file(file_path):
+    try:
+        with open(file_path, 'r') as f:
+            return f.read().strip() + ' '
+    except Exception as e:
+        logging.error(f"Error reading text file {file_path}: {e}")
+    return ''
+
+def read_pdf_file(file_path):
+    try:
+        with open(file_path, 'rb') as f:
+            pdf_reader = PdfReader(f)
+            # extract_text() can return None for pages without extractable text,
+            # so guard with `or ''` to avoid losing the whole file to one bad page
+            page_texts = [(page.extract_text() or '').strip() + ' ' for page in pdf_reader.pages]
+            return ''.join(page_texts)
+    except Exception as e:
+        logging.error(f"Error reading PDF file {file_path}: {e}")
+    return ''
+
+def process_file(file_path):
+    file_type = magic.from_file(file_path, mime=True)
+    if file_type in ['text/plain', 'text/markdown']:
+        return read_text_file(file_path)
+    elif file_type == 'application/pdf':
+        return read_pdf_file(file_path)
+    else:
+        logging.warning(f"Unsupported file type {file_type} for file {file_path}")
+        return ''
+
+def get_file_string(context):
+    file_strings = []
+
+    for root, _, files in os.walk(context['data_dir']):
+        for file in files:
+            file_path = os.path.join(root, file)
+            file_text = process_file(file_path)
+            if file_text:
+                file_strings.append(file_text)
+
+    return ' '.join(file_strings)
+

+ 61 - 0
tutorials/chatbot/data_pipelines/generate_question_answers.py

@@ -0,0 +1,61 @@
+import argparse
+import asyncio
+import json
+from config import load_config
+from model_handler import generate_questions
+from itertools import chain
+import logging
+import aiofiles  # Ensure aiofiles is installed for async file operations
+
+# Configure logging to include the timestamp, log level, and message
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+async def main(context):
+    try:
+        logging.info("Starting to generate question/answer pairs.")
+        data = await generate_questions(context)
+        if not data:
+            logging.warning("No data generated. Please check the input context or model configuration.")
+            return
+
+        flattened_list = list(chain.from_iterable(data))
+        logging.info(f"Successfully generated {len(flattened_list)} question/answer pairs.")
+
+        # Use asynchronous file operation for writing to the file
+        async with aiofiles.open("data.json", "w") as output_file:
+            await output_file.write(json.dumps(flattened_list, indent=4))
+        
+        logging.info("Data successfully written to 'data.json'. Process completed.")
+
+    except Exception as e:
+        logging.error(f"An unexpected error occurred during the process: {e}")
+
+def parse_arguments(context):
+    # Define command line arguments for the script
+    parser = argparse.ArgumentParser(
+        description="Generate question/answer pairs from documentation."
+    )
+    parser.add_argument(
+        "-n", "--num_data",
+        type=int,
+        default=context["num_data_default"],
+        help="Specify the number of question/answer pairs to generate."
+    )
+    parser.add_argument(
+        "-m", "--model",
+        choices=["gpt-3.5-turbo-16k", "gpt-3.5-turbo-0125"],
+        default="gpt-3.5-turbo-16k",
+        help="Select the model to use for generation."
+    )
+    return parser.parse_args()
+
+if __name__ == "__main__":
+    logging.info("Initializing the process and loading configuration...")
+    context = load_config()
+    args = parse_arguments(context)
+
+    context["num_data"] = args.num_data
+    context["model"] = args.model
+
+    logging.info(f"Configuration loaded. Generating {args.num_data} question/answer pairs using model '{args.model}'.")
+    asyncio.run(main(context))
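
An invocation sketch (run from tutorials/chatbot/data_pipelines so config.py's relative "./config.yaml" path resolves; the key is a hypothetical placeholder):

    export OPENAI_API_KEY="sk-placeholder"   # hypothetical key
    python generate_question_answers.py --num_data 10 --model gpt-3.5-turbo-16k
    # on success, the flattened question/answer list is written to ./data.json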

+ 52 - 0
tutorials/chatbot/data_pipelines/model_handler.py

@@ -0,0 +1,52 @@
+import openai
+import asyncio
+from functools import partial
+import json
+import logging
+from token_processor import split_string_by_token_length
+from file_handler import get_file_string
+
+
+# Throttling to manage rate limits: assume roughly 100 in-flight requests are allowed
+# and cap concurrency at 75% of that to leave headroom
+model_rate_limits = 100
+max_concurrent_requests = int(model_rate_limits * 0.75)
+throttler = asyncio.Semaphore(max_concurrent_requests)
+
+async def send_chat_async(context: dict, request):
+    async with throttler:
+        try:
+            loop = asyncio.get_running_loop()
+            # Wrap the synchronous OpenAI API call with partial to pass arguments
+            func = partial(
+                openai.ChatCompletion.create,
+                model=context['model'],
+                messages=request,
+                temperature=0.0
+            )
+            # Run the synchronous function in a separate thread
+            resp = await loop.run_in_executor(None, func)
+            # Process the response as before
+            return next((msg['message']['content'] for msg in resp.choices if msg['message']['role'] == 'assistant'), "")
+        except Exception as e:
+            logging.error(f"Error in send_chat_async: {e}")
+            return ""
+
+async def request_question(context: dict, input_str: str, num_data: int) -> list:
+    # Returns the parsed JSON list of {"question": ..., "answer": ...} dicts
+    system_prompt = context['question_generator'].format(num_data=num_data, language=context["language"])
+    request = [{'role': 'system', 'content': system_prompt}, {'role': 'user', 'content': input_str}]
+    return json.loads(await send_chat_async(context, request))
+
+async def generate_questions(context: dict):
+    doc_string = get_file_string(context)
+    batches = split_string_by_token_length(context, doc_string)
+    num_questions_per_batch = context["num_data"] // len(batches)
+    # Integer division can leave a shortfall; assign the full remainder to the last batch
+    remainder = context["num_data"] - num_questions_per_batch * len(batches)
+
+    tasks = []
+    for idx, batch in enumerate(batches):
+        num_questions = num_questions_per_batch + remainder if idx == len(batches) - 1 else num_questions_per_batch
+        tasks.append(request_question(context, batch, num_questions))
+
+    results = await asyncio.gather(*tasks)
+    return results
+
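model_handler.py imports split_string_by_token_length from token_processor, which is not part of this commit. A hypothetical sketch of what it might do, assuming tiktoken for tokenization (the actual module may differ):

    import tiktoken

    def split_string_by_token_length(context, text, max_tokens=3000):
        # Hypothetical sketch: token_processor.py is not included in this commit.
        # Encode with the model's tokenizer, then slice into chunks of at most max_tokens.
        encoding = tiktoken.encoding_for_model(context["model"])
        tokens = encoding.encode(text)
        return [encoding.decode(tokens[i:i + max_tokens])
                for i in range(0, len(tokens), max_tokens)]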