123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- import argparse
- import json
- import os
- from typing import Dict, List
- import yaml
- from datasets import load_dataset, load_from_disk
- from tqdm import tqdm
- from vllm import LLM, SamplingParams
def load_system_prompt(yaml_path: str) -> str:
    """Load the system prompt from a YAML config file.

    Args:
        yaml_path: Path to a YAML file whose top-level mapping contains a
            ``system_prompt`` key.

    Returns:
        The value stored under the ``system_prompt`` key.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If the file is empty, its top level is not a mapping, or
            the ``system_prompt`` key is missing.
    """
    # Decode explicitly as UTF-8 so prompts containing non-ASCII text are read
    # the same way regardless of the platform's default locale encoding.
    with open(yaml_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # safe_load returns None for an empty file; report that (and a missing
    # key) with one clear message instead of an opaque subscript error.
    if not isinstance(config, dict) or "system_prompt" not in config:
        raise KeyError(f"'system_prompt' key not found in {yaml_path}")
    return config["system_prompt"]
def setup_llm(
    model_name: str,
    tensor_parallel_size: int = 1,
    gpu_memory_utilization: float = 0.9,
    max_model_len: int = 128000,
    gpu_ids: List[int] = None,
) -> LLM:
    """Construct a vLLM ``LLM`` engine, optionally pinned to specific GPUs.

    Args:
        model_name: Name or path of the model, forwarded as ``model``.
        tensor_parallel_size: Forwarded to ``LLM`` unchanged.
        gpu_memory_utilization: Forwarded to ``LLM`` unchanged.
        max_model_len: Forwarded to ``LLM`` unchanged.
        gpu_ids: When not ``None``, the IDs are joined with commas and written
            to the ``CUDA_VISIBLE_DEVICES`` environment variable before the
            engine is created.

    Returns:
        The newly created ``LLM`` instance.
    """
    if gpu_ids is not None:
        # Must be set before the engine is built so it is in place when the
        # engine starts up.
        visible_devices = ",".join(str(gpu_id) for gpu_id in gpu_ids)
        os.environ["CUDA_VISIBLE_DEVICES"] = visible_devices

    engine_kwargs = {
        "model": model_name,
        "tensor_parallel_size": tensor_parallel_size,
        "gpu_memory_utilization": gpu_memory_utilization,
        "max_model_len": max_model_len,
    }
    return LLM(**engine_kwargs)
def create_messages(system_prompt: str, conversation: str) -> List[Dict[str, str]]:
    """Build a two-entry chat message list: the system prompt, then the user turn.

    Args:
        system_prompt: Text placed in the ``system`` message.
        conversation: Text placed in the ``user`` message.

    Returns:
        ``[system_message, user_message]``, each a dict with ``role`` and
        ``content`` keys.
    """
    system_message = {"role": "system", "content": system_prompt}
    user_message = {"role": "user", "content": conversation}
    return [system_message, user_message]
def format_prompt(system_prompt: str, conversation: str) -> str:
    """Render a system prompt and one user turn as a Llama-3-style chat prompt.

    The returned string contains a complete system turn, a complete user turn
    (terminated by ``<|eot_id|>``), and an opened assistant header so that
    generation starts at the beginning of the assistant's reply rather than
    continuing the user's text.

    Args:
        system_prompt: Content of the system turn.
        conversation: Content of the user turn.

    Returns:
        The fully formatted prompt string.
    """
    # NOTE(review): the template is hard-coded for Llama-3-family special
    # tokens; confirm it matches the model passed via --model. Some tokenizers
    # also prepend a BOS token on their own -- verify it is not duplicated.
    return (
        "<|begin_of_text|>"
        f"<|start_header_id|>system<|end_header_id|>\n\n{system_prompt}<|eot_id|>"
        f"<|start_header_id|>user<|end_header_id|>\n\n{conversation}<|eot_id|>"
        "<|start_header_id|>assistant<|end_header_id|>\n\n"
    )
def process_dataset(
    dataset,
    llm: LLM,
    system_prompt: str,
    output_file: str,
    start_index: int = 0,
    end_index: int = None,
    max_new_tokens: int = 128000,
) -> None:
    """Generate a response for each selected row and write the results as JSONL.

    Rows ``[start_index, end_index)`` of ``dataset`` are processed one at a
    time. For each row, the ``conversations`` field is formatted together with
    ``system_prompt``, passed to ``llm.generate``, and the generated text is
    printed and written to ``output_file`` as one JSON object per line with
    the keys ``id`` and ``conversations``.

    Args:
        dataset: Dataset supporting ``len()``, ``select()`` and iteration over
            rows that have ``id`` and ``conversations`` fields.
        llm: The vLLM engine used for generation.
        system_prompt: System prompt prepended to every row.
        output_file: Path of the JSONL file to write (overwritten if present).
        start_index: First row to process (inclusive). Negative values are
            clamped to 0.
        end_index: Row to stop at (exclusive). ``None`` means the end of the
            dataset; larger values are clamped to the dataset size.
        max_new_tokens: Passed to ``SamplingParams`` as ``max_tokens``.

    Raises:
        ValueError: If ``start_index`` is not smaller than the dataset size or
            than the (clamped) ``end_index``.
    """
    sampling_params = SamplingParams(
        max_tokens=max_new_tokens,
        temperature=0.7,
        top_p=0.95,
    )

    # Normalise the requested range against the real dataset size.
    total_rows = len(dataset)
    end_index = total_rows if end_index is None else min(end_index, total_rows)
    start_index = max(start_index, 0)

    if start_index >= total_rows:
        raise ValueError(
            f"Start index {start_index} is larger than dataset size {total_rows}"
        )
    if start_index >= end_index:
        raise ValueError(
            f"Start index {start_index} must be less than end index {end_index}"
        )

    dataset_slice = dataset.select(range(start_index, end_index))

    with open(output_file, "w", encoding="utf-8") as f:
        for item in tqdm(
            dataset_slice, desc=f"Processing rows {start_index} to {end_index}"
        ):
            prompt = format_prompt(system_prompt, item["conversations"])

            # generate() returns one result per prompt; a single prompt is
            # submitted, so only the first result is used.
            output = llm.generate(prompt, sampling_params)[0]
            generated_text = output.outputs[0].text
            print(generated_text)

            result = {
                "id": item["id"],
                "conversations": generated_text,
            }
            f.write(json.dumps(result) + "\n")
            # Flush after every record so finished generations are on disk
            # even if the job is killed before the file is closed.
            f.flush()
def main():
    """Parse command-line arguments and run generation over the dataset.

    Loads the system prompt from the YAML config, loads the dataset from
    disk, limits it to at most ``--max-rows`` rows, builds the vLLM engine and
    writes the generated outputs for the requested index range to the output
    file.
    """
    parser = argparse.ArgumentParser(
        description="Process dataset using vLLM with multi-GPU support"
    )
    parser.add_argument(
        "--model", type=str, required=True, help="Name or path of the model to use"
    )
    parser.add_argument(
        "--config",
        type=str,
        required=True,
        help="Path to YAML config file containing system prompt",
    )
    parser.add_argument(
        "--output-file",
        type=str,
        default="processed_outputs.jsonl",
        help="Output file path",
    )
    parser.add_argument(
        "--dataset-path", type=str, required=True, help="Path to the dataset"
    )
    parser.add_argument(
        "--gpu-ids",
        type=str,
        help="Comma-separated list of GPU IDs to use (e.g., '0,1,2,3')",
    )
    parser.add_argument(
        "--tensor-parallel-size",
        type=int,
        default=1,
        help="Number of GPUs to use for tensor parallelism",
    )
    parser.add_argument(
        "--gpu-memory-utilization",
        type=float,
        default=0.9,
        help="Target GPU memory utilization (0.0 to 1.0)",
    )
    parser.add_argument(
        "--start-index",
        type=int,
        default=0,
        help="Starting index in the dataset (inclusive)",
    )
    parser.add_argument(
        "--end-index",
        type=int,
        help="Ending index in the dataset (exclusive). If not specified, processes until the end.",
    )
    parser.add_argument(
        "--max-rows",
        type=int,
        default=2000,
        help="Keep at most this many leading rows of the dataset before indexing",
    )
    args = parser.parse_args()

    # Parse GPU IDs if provided, ignoring empty entries such as a trailing comma.
    gpu_ids = None
    if args.gpu_ids:
        gpu_ids = [
            int(gpu_id) for gpu_id in args.gpu_ids.split(",") if gpu_id.strip()
        ]

    system_prompt = load_system_prompt(args.config)

    dataset = load_from_disk(args.dataset_path)
    # Clamp the cap to the dataset size so select() never receives indices
    # past the end when the dataset has fewer rows than the cap.
    row_limit = min(max(args.max_rows, 0), len(dataset))
    dataset = dataset.select(range(row_limit))

    llm = setup_llm(
        model_name=args.model,
        tensor_parallel_size=args.tensor_parallel_size,
        gpu_memory_utilization=args.gpu_memory_utilization,
        gpu_ids=gpu_ids,
    )

    process_dataset(
        dataset=dataset,
        llm=llm,
        system_prompt=system_prompt,
        output_file=args.output_file,
        start_index=args.start_index,
        end_index=args.end_index,
    )
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|