Hamid Shojanazeri 1 year ago
parent
commit
9fe26e7b9a

+ 108 - 0
tutorials/chatbot/data_pipelines/generator_utils.py

@@ -0,0 +1,108 @@
+import os
+import openai
+import asyncio
+import magic
+from PyPDF2 import PdfReader
+from functools import partial
+import json
+from token_processor import split_text_into_tokenized_chunks
+# from file_handler import read_file_content
+import logging
+
+# Initialize logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+# Manage rate limits with throttling
+rate_limit_threshold = 100
+allowed_concurrent_requests = int(rate_limit_threshold * 0.75)
+request_limiter = asyncio.Semaphore(allowed_concurrent_requests)
+
+def read_text_file(file_path):
+    try:
+        with open(file_path, 'r') as f:
+            return f.read().strip() + ' '
+    except Exception as e:
+        logging.error(f"Error reading text file {file_path}: {e}")
+    return ''
+
+def read_pdf_file(file_path):
+    try:
+        with open(file_path, 'rb') as f:
+            pdf_reader = PdfReader(f)
+            # extract_text() can return None for pages with no text layer
+            file_text = [(page.extract_text() or '').strip() + ' ' for page in pdf_reader.pages]
+            return ''.join(file_text)
+    except Exception as e:
+        logging.error(f"Error reading PDF file {file_path}: {e}")
+    return ''
+
+def process_file(file_path):
+    file_type = magic.from_file(file_path, mime=True)
+    if file_type in ['text/plain', 'text/markdown']:
+        return read_text_file(file_path)
+    elif file_type == 'application/pdf':
+        return read_pdf_file(file_path)
+    else:
+        logging.warning(f"Unsupported file type {file_type} for file {file_path}")
+        return ''
+
+def read_file_content(context):
+    file_strings = []
+
+    for root, _, files in os.walk(context['data_dir']):
+        for file in files:
+            file_path = os.path.join(root, file)
+            file_text = process_file(file_path)
+            if file_text:
+                file_strings.append(file_text)
+
+    return ' '.join(file_strings)
+
+
+async def execute_chat_request_async(api_context: dict, chat_request):
+    async with request_limiter:
+        try:
+            event_loop = asyncio.get_running_loop()
+            # Prepare the OpenAI API call
+            openai_chat_call = partial(
+                openai.ChatCompletion.create,
+                model=api_context['model'],
+                messages=chat_request,
+                temperature=0.0
+            )
+            # Execute the API call in a separate thread
+            response = await event_loop.run_in_executor(None, openai_chat_call)
+            # Extract and return the assistant's response
+            return next((message['message']['content'] for message in response.choices if message['message']['role'] == 'assistant'), "")
+        except Exception as error:
+            print(f"Error during chat request execution: {error}")
+            return ""
+
+async def prepare_and_send_request(api_context: dict, document_content: str, total_questions: int) -> dict:
+    prompt_for_system = api_context['question_prompt_template'].format(total_questions=total_questions, language=api_context["language"])
+    chat_request_payload = [{'role': 'system', 'content': prompt_for_system}, {'role': 'user', 'content': document_content}]
+    response_text = await execute_chat_request_async(api_context, chat_request_payload)
+    # Guard against the empty string returned on a failed request before parsing.
+    return json.loads(response_text) if response_text else {}
+
+async def generate_question_batches(api_context: dict):
+    document_text = read_file_content(api_context)
+    logging.info("Step 1 complete: document text loaded")
+    document_batches = split_text_into_tokenized_chunks(api_context, document_text)
+    logging.info("Step 2 complete: %d tokenized chunks created", len(document_batches))
+
+    questions_per_batch = api_context["total_questions"] // len(document_batches)
+    logging.info("Step 3 complete: %d questions per batch", questions_per_batch)
+
+    generation_tasks = []
+    # The floor division above can leave a remainder; assign it to the last
+    # batch so the totals still add up to api_context["total_questions"].
+    remaining_questions = api_context["total_questions"] % len(document_batches)
+    for batch_index, batch_content in enumerate(document_batches):
+        questions_in_current_batch = questions_per_batch + remaining_questions if batch_index == len(document_batches) - 1 else questions_per_batch
+        generation_tasks.append(prepare_and_send_request(api_context, batch_content, questions_in_current_batch))
+    logging.info("Step 4 complete: %d generation tasks prepared", len(generation_tasks))
+
+    question_generation_results = await asyncio.gather(*generation_tasks)
+    logging.info("Step 5 complete: %d question batches generated", len(question_generation_results))
+
+    return question_generation_results
+
+
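For reference, a minimal sketch of how the new `generate_question_batches` entry point might be driven. The `api_context` keys are inferred from the code above; the model name, data directory, question count, and prompt template are illustrative placeholders, not values from this commit:

```python
import asyncio
from generator_utils import generate_question_batches

# Hypothetical context dict; the key names come from generator_utils.py above,
# but every value here is a placeholder.
api_context = {
    "model": "gpt-3.5-turbo",
    "data_dir": "./data",
    "language": "English",
    "total_questions": 100,
    "question_prompt_template": (
        "Generate {total_questions} question/answer pairs in {language} "
        "from the user's text. Respond in JSON."
    ),
}

if __name__ == "__main__":
    results = asyncio.run(generate_question_batches(api_context))
    print(f"Received {len(results)} batches of generated questions")
```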

+ 0 - 52
tutorials/chatbot/data_pipelines/model_handler.py

@@ -1,52 +0,0 @@
-import os
-import openai
-import asyncio
-from functools import partial
-import json
-from token_processor import split_string_by_token_length
-from file_handler import get_file_string
-
-
-# Throttling to manage rate limits
-model_rate_limits = 100
-max_concurrent_requests = int(model_rate_limits * 0.75)
-throttler = asyncio.Semaphore(max_concurrent_requests)
-
-async def send_chat_async(context: dict, request):
-    async with throttler:
-        try:
-            loop = asyncio.get_running_loop()
-            # Wrap the synchronous OpenAI API call with partial to pass arguments
-            func = partial(
-                openai.ChatCompletion.create,
-                model=context['model'],
-                messages=request,
-                temperature=0.0
-            )
-            # Run the synchronous function in a separate thread
-            resp = await loop.run_in_executor(None, func)
-            # Process the response as before
-            return next((msg['message']['content'] for msg in resp.choices if msg['message']['role'] == 'assistant'), "")
-        except Exception as e:
-            print(f"Error in send_chat_async: {e}")
-            return ""
-
-async def request_question(context: dict, input_str: str, num_data: int) -> dict:
-    system_prompt = context['question_generator'].format(num_data=num_data, language=context["language"])
-    request = [{'role': 'system', 'content': system_prompt}, {'role': 'user', 'content': input_str}]
-    return json.loads(await send_chat_async(context, request))
-
-async def generate_questions(context: dict):
-    
-    doc_string = get_file_string(context)
-    batches = split_string_by_token_length(context, doc_string)
-    num_questions_per_batch = context["num_data"] // len(batches)
-
-    tasks = []
-    for idx, batch in enumerate(batches):
-        num_questions = num_questions_per_batch + 1 if idx == len(batches) - 1 and len(batches) * num_questions_per_batch < context["num_data"] else num_questions_per_batch
-        tasks.append(request_question(context, batch, num_questions))
-
-    results = await asyncio.gather(*tasks)
-    return results
-
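Both the removed module and its replacement rely on the same throttling pattern: an `asyncio.Semaphore` caps the number of in-flight requests while `run_in_executor` moves the blocking `openai.ChatCompletion.create` call off the event loop. A self-contained sketch of that pattern, with `fake_api_call` standing in for the blocking SDK call:

```python
import asyncio
import time
from functools import partial

# Mirrors int(rate_limit_threshold * 0.75) from generator_utils.py above.
limiter = asyncio.Semaphore(75)

def fake_api_call(prompt: str) -> str:
    # Stand-in for a blocking SDK call such as openai.ChatCompletion.create.
    time.sleep(0.1)
    return f"response to {prompt!r}"

async def throttled_call(prompt: str) -> str:
    async with limiter:
        loop = asyncio.get_running_loop()
        # Run the blocking call on the default thread pool so the event
        # loop stays free to schedule the other pending requests.
        return await loop.run_in_executor(None, partial(fake_api_call, prompt))

async def main():
    results = await asyncio.gather(*(throttled_call(f"q{i}") for i in range(200)))
    print(f"{len(results)} calls completed")

asyncio.run(main())
```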