# file_handler.py

import logging
import os

import magic
from PyPDF2 import PdfReader
  5. # Initialize logging
  6. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  7. def read_text_file(file_path):
  8. try:
  9. with open(file_path, 'r') as f:
  10. return f.read().strip() + ' '
  11. except Exception as e:
  12. logging.error(f"Error reading text file {file_path}: {e}")
  13. return ''
  14. def read_pdf_file(file_path):
  15. try:
  16. with open(file_path, 'rb') as f:
  17. pdf_reader = PdfReader(f)
  18. num_pages = len(pdf_reader.pages)
  19. file_text = [pdf_reader.pages[page_num].extract_text().strip() + ' ' for page_num in range(num_pages)]
  20. return ''.join(file_text)
  21. except Exception as e:
  22. logging.error(f"Error reading PDF file {file_path}: {e}")
  23. return ''
  24. def process_file(file_path):
  25. file_type = magic.from_file(file_path, mime=True)
  26. if file_type in ['text/plain', 'text/markdown']:
  27. return read_text_file(file_path)
  28. elif file_type == 'application/pdf':
  29. return read_pdf_file(file_path)
  30. else:
  31. logging.warning(f"Unsupported file type {file_type} for file {file_path}")
  32. return ''
  33. def get_file_string(context):
  34. file_strings = []
  35. for root, _, files in os.walk(context['data_dir']):
  36. for file in files:
  37. file_path = os.path.join(root, file)
  38. file_text = process_file(file_path)
  39. if file_text:
  40. file_strings.append(file_text)
  41. return ' '.join(file_strings)