#!/usr/bin/env -S uv --quiet run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "bs4",
#     "httpx",
#     "pydantic",
#     "python-frontmatter",
#     "python-slugify",
#     "rich",
#     "typer",
#     "markdown-it-py",
#     "sqlmodel",
# ]
# ///
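"""Manage projects from the Awesome Django README.

Parses the README into per-project markdown files with frontmatter,
enriches them with GitHub metrics, and mirrors everything into a local
SQLite database for querying.

Example invocations (filename illustrative; any script name works):

    ./projects.py parse
    ./projects.py update-metrics
    ./projects.py sync-db
    ./projects.py query --min-stars 100 --sort name
    ./projects.py top --limit 10
    ./projects.py search "rest"
    ./projects.py stale --days 730
    ./projects.py categories
    ./projects.py stats

Set GITHUB_TOKEN in the environment to raise the GitHub API rate limit.
"""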

import json
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any
from typing import Optional
from urllib.parse import urlparse

import frontmatter
import httpx
import typer
from bs4 import BeautifulSoup
from bs4 import Tag
from markdown_it import MarkdownIt
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from rich import print
from rich.console import Console
from rich.progress import track
from rich.table import Table
from slugify import slugify
from sqlmodel import Field as SQLField
from sqlmodel import Session
from sqlmodel import SQLModel
from sqlmodel import create_engine
from sqlmodel import select

app = typer.Typer(
    add_help_option=False,
    no_args_is_help=True,
    rich_markup_mode="rich",
)


class Project(BaseModel):
    """Model representing a Django project from the awesome list."""

    model_config = ConfigDict(extra="allow")

    name: str
    description: str
    url: str
    category: str
    slug: str = Field(default="")
    tags: list[str] = Field(default_factory=list)
    github_stars: int | None = None
    github_forks: int | None = None
    github_last_update: str | None = None
    github_last_commit: str | None = None
    previous_urls: list[str] = Field(default_factory=list)

    def __init__(self, **data):
        super().__init__(**data)
        if not self.slug:
            self.slug = slugify(self.name)
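

# Example (illustrative): Project(name="Django REST framework", ...) derives
# the slug "django-rest-framework" via python-slugify when none is supplied.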


# SQLModel database model
class ProjectDB(SQLModel, table=True):
    """SQLModel for storing projects in SQLite database."""

    __tablename__ = "projects"

    id: Optional[int] = SQLField(default=None, primary_key=True)
    name: str = SQLField(index=True)
    description: str
    url: str = SQLField(unique=True)
    category: str = SQLField(index=True)
    slug: str = SQLField(unique=True, index=True)
    tags: str = SQLField(default="[]")  # JSON string
    github_stars: Optional[int] = SQLField(default=None, index=True)
    github_forks: Optional[int] = SQLField(default=None)
    github_last_update: Optional[str] = SQLField(default=None)
    github_last_commit: Optional[str] = SQLField(default=None, index=True)
    previous_urls: str = SQLField(default="[]")  # JSON string

    @classmethod
    def from_project(cls, project: Project) -> "ProjectDB":
        """Convert a Project to ProjectDB."""
        return cls(
            name=project.name,
            description=project.description,
            url=project.url,
            category=project.category,
            slug=project.slug,
            tags=json.dumps(project.tags),
            github_stars=project.github_stars,
            github_forks=project.github_forks,
            github_last_update=project.github_last_update,
            github_last_commit=project.github_last_commit,
            previous_urls=json.dumps(project.previous_urls),
        )

    def to_project(self) -> Project:
        """Convert ProjectDB back to Project."""
        return Project(
            name=self.name,
            description=self.description,
            url=self.url,
            category=self.category,
            slug=self.slug,
            tags=json.loads(self.tags),
            github_stars=self.github_stars,
            github_forks=self.github_forks,
            github_last_update=self.github_last_update,
            github_last_commit=self.github_last_commit,
            previous_urls=json.loads(self.previous_urls),
        )


# Database configuration
DATABASE_PATH = Path("projects.db")
DATABASE_URL = f"sqlite:///{DATABASE_PATH}"

console = Console()


def get_engine():
    """Get SQLModel engine."""
    return create_engine(DATABASE_URL, echo=False)


def init_db():
    """Initialize the database and create tables."""
    engine = get_engine()
    SQLModel.metadata.create_all(engine)
    return engine


def parse_project_line(line: Tag, category: str) -> Project | None:
    """Parse a project line from the markdown and return a Project object."""
    try:
        # Find the project link
        link = line.find("a")
        if not link:
            return None

        name = link.text.strip()
        url = link.get("href", "").strip()

        # Get description (text after the link); strip the leading " - " separator
        description = line.text.replace(name, "").strip()
        description = re.sub(r"^\s*-\s*", "", description)

        if not all([name, url, description]):
            return None

        return Project(name=name, description=description, url=url, category=category)
    except Exception as e:
        print(f"[red]Error parsing project line: {e}[/red]")
        return None
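

# Example: given a rendered list item such as
#   <li><a href="https://github.com/org/repo">Name</a> - Short description.</li>
# parse_project_line returns Project(name="Name", description="Short description.",
# url="https://github.com/org/repo", category=<current section heading>).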


def read_readme(file_path: Path) -> str:
    """Read README content from local file and convert to HTML."""
    markdown_content = file_path.read_text()
    md = MarkdownIt()
    html_content = md.render(markdown_content)
    return html_content


def parse_readme(content: str) -> list[Project]:
    """Parse README content and extract projects."""
    soup = BeautifulSoup(content, "html.parser")
    projects = []
    current_category = ""

    for element in soup.find_all(["h2", "h3", "li"]):
        if element.name in ["h2", "h3"]:
            current_category = element.text.strip()
        elif element.name == "li" and current_category:
            if current_category == "Contents":
                continue
            project = parse_project_line(element, current_category)
            if project:
                projects.append(project)

    return projects


def merge_project_data(existing: dict[str, Any], new: dict[str, Any]) -> dict[str, Any]:
    """
    Merge existing project data with new data, preserving existing values
    while updating with new information where appropriate.
    """
    # Start with the existing data
    merged = existing.copy()

    # Always update core fields from the README
    core_fields = {"name", "url", "category"}
    for field in core_fields:
        if field in new:
            # If the URL is changing, record the old URL in previous_urls
            if field == "url" and new["url"] != existing.get("url"):
                previous_urls = merged.get("previous_urls", [])
                old_url = existing.get("url")
                if old_url and old_url not in previous_urls:
                    previous_urls.append(old_url)
                merged["previous_urls"] = previous_urls
            merged[field] = new[field]

    # Update the description whenever the README version differs
    if "description" in new and new["description"] != existing.get("description", ""):
        merged["description"] = new["description"]

    # Update GitHub metrics if they exist in new data
    github_fields = {"github_stars", "github_forks", "github_last_update", "github_last_commit"}
    for field in github_fields:
        if field in new and new[field] is not None:
            merged[field] = new[field]

    return merged
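

# Illustrative merge when a project's URL changes between runs:
#   merge_project_data(
#       {"url": "https://old.example", "previous_urls": []},
#       {"url": "https://new.example"},
#   )
#   -> {"url": "https://new.example", "previous_urls": ["https://old.example"]}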


def save_project(project: Project, output_dir: Path):
    """Save project as a markdown file with frontmatter, preserving and merging existing content."""
    output_file = output_dir / f"{project.slug}.md"
    project_data = project.model_dump(exclude_none=True)

    if output_file.exists():
        try:
            # Load existing file
            existing_post = frontmatter.load(output_file)
            existing_data = dict(existing_post.metadata)

            # Merge data, favoring preservation of existing content
            merged_data = merge_project_data(existing_data, project_data)

            # Create new post with merged data but keep existing content
            post = frontmatter.Post(existing_post.content, **merged_data)
        except Exception as e:
            print(
                f"[yellow]Warning: Could not load existing file {output_file}, creating new: {e}[/yellow]"
            )
            post = frontmatter.Post(project.description, **project_data)
    else:
        # Create new file
        post = frontmatter.Post(project.description, **project_data)

    output_file.write_text(frontmatter.dumps(post))
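

# A saved file is YAML frontmatter plus the description as body, roughly
# (values illustrative, key order may vary):
#   ---
#   category: Some Category
#   description: Short description.
#   name: Name
#   previous_urls: []
#   slug: name
#   tags: []
#   url: https://github.com/org/repo
#   ---
#   Short description.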


def extract_github_info(url: str) -> dict[str, str] | None:
    """Extract owner and repo from a GitHub URL."""
    parsed = urlparse(url)
    if parsed.netloc != "github.com":
        return None
    parts = parsed.path.strip("/").split("/")
    if len(parts) >= 2:
        return {"owner": parts[0], "repo": parts[1]}
    return None
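

# Example: extract_github_info("https://github.com/django/django")
#   -> {"owner": "django", "repo": "django"}
# Non-GitHub URLs (and https://github.com/ itself) return None.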


def get_github_metrics(
    owner: str, repo: str, client: httpx.Client
) -> tuple[dict, str | None]:
    """
    Fetch GitHub metrics for a repository.

    Returns a tuple of (metrics_dict, new_url) where new_url is set if the repo has moved.
    """
    headers = {}
    if github_token := os.environ.get("GITHUB_TOKEN"):
        headers["Authorization"] = f"token {github_token}"

    api_url = f"https://api.github.com/repos/{owner}/{repo}"
    try:
        response = client.get(
            api_url,
            headers=headers,
            timeout=10.0,
            follow_redirects=True,
        )

        # Check if we followed a redirect
        new_url = None
        if len(response.history) > 0:
            for r in response.history:
                if r.status_code == 301:
                    # Get the new location from the API response
                    data = response.json()
                    new_url = data.get("html_url")
                    if new_url:
                        print(
                            f"[yellow]Repository moved: {owner}/{repo} -> {new_url}[/yellow]"
                        )
                    break

        response.raise_for_status()
        data = response.json()
        metrics = {
            "github_stars": data["stargazers_count"],
            "github_forks": data["forks_count"],
            "github_last_update": data["updated_at"],
        }

        # Fetch last commit date
        commits_url = f"https://api.github.com/repos/{owner}/{repo}/commits"
        try:
            commits_response = client.get(
                commits_url,
                headers=headers,
                params={"per_page": 1},
                timeout=10.0,
                follow_redirects=True,
            )
            commits_response.raise_for_status()
            commits_data = commits_response.json()
            if commits_data and len(commits_data) > 0:
                metrics["github_last_commit"] = commits_data[0]["commit"]["committer"]["date"]
        except httpx.HTTPError as e:
            print(f"[yellow]Warning: Could not fetch commits for {owner}/{repo}: {str(e)}[/yellow]")

        return metrics, new_url
    except httpx.HTTPError as e:
        print(f"[red]Error fetching GitHub metrics for {owner}/{repo}: {str(e)}[/red]")
        return {}, None
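

# On success, get_github_metrics returns something like (values illustrative):
#   ({"github_stars": 1234, "github_forks": 56,
#     "github_last_update": "2024-01-01T00:00:00Z",
#     "github_last_commit": "2024-01-01T00:00:00Z"}, None)
# with the second element set to the new html_url when the repo has moved.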


def load_project(file_path: Path) -> Project | None:
    """Load a project from a markdown file."""
    try:
        post = frontmatter.load(file_path)
        return Project(**post.metadata)
    except Exception as e:
        print(f"[red]Error loading project from {file_path}: {str(e)}[/red]")
        return None


@app.command()
def parse(readme_path: Path = Path("README.md"), output_dir: str = "_projects"):
    """
    Parse local Awesome Django README and create individual project files with frontmatter.

    Preserves existing file content and metadata while updating with new information from README.
    """
    if not readme_path.exists():
        print(f"[red]Error: README file not found at {readme_path}[/red]")
        raise typer.Exit(1)

    print(f"[bold blue]Reading README from {readme_path}...[/bold blue]")

    # Create output directory
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Read and parse README
    content = read_readme(readme_path)
    projects = parse_readme(content)
    print(f"[green]Found {len(projects)} projects[/green]")

    # Save individual project files
    for project in projects:
        save_project(project, output_path)
        print(f"[green]Updated {project.name} in {project.slug}.md[/green]")


@app.command()
def update_metrics(projects_dir: Path = Path("_projects"), batch_size: int = 50):
    """
    Update GitHub metrics (stars, forks, last update) for all projects.
    """
    if not projects_dir.exists():
        print(f"[red]Error: Projects directory not found at {projects_dir}[/red]")
        raise typer.Exit(1)

    print(
        f"[bold blue]Updating GitHub metrics for projects in {projects_dir}...[/bold blue]"
    )

    # Load all projects
    project_files = list(projects_dir.glob("*.md"))
    projects = []
    for file in project_files:
        if project := load_project(file):
            projects.append((file, project))

    print(f"[green]Found {len(projects)} projects to update[/green]")

    # Update metrics in batches to avoid rate limiting
    with httpx.Client() as client:
        for i in track(
            range(0, len(projects), batch_size), description="Updating projects"
        ):
            batch = projects[i : i + batch_size]
            for file_path, project in batch:
                if github_info := extract_github_info(project.url):
                    metrics, new_url = get_github_metrics(
                        github_info["owner"], github_info["repo"], client
                    )
                    if metrics:
                        # Update project with new metrics
                        for key, value in metrics.items():
                            setattr(project, key, value)

                        # Update URL if the repository has moved, recording
                        # the old URL in previous_urls first
                        if new_url and new_url != project.url:
                            if project.url not in project.previous_urls:
                                project.previous_urls.append(project.url)
                            project.url = new_url
                            print(
                                f"[yellow]Updated URL for {project.name}: {project.url}[/yellow]"
                            )

                        save_project(project, projects_dir)
                        print(f"[green]Updated metrics for {project.name}[/green]")

    print("[bold blue]Finished updating GitHub metrics![/bold blue]")


@app.command()
def sync_db(projects_dir: Path = Path("_projects")):
    """
    Sync projects from markdown files to SQLite database.
    """
    if not projects_dir.exists():
        print(f"[red]Error: Projects directory not found at {projects_dir}[/red]")
        raise typer.Exit(1)

    print(f"[bold blue]Syncing projects to {DATABASE_PATH}...[/bold blue]")
    engine = init_db()

    # Load all projects from markdown files
    project_files = list(projects_dir.glob("*.md"))
    projects_loaded = 0

    with Session(engine) as session:
        # Clear existing data
        for existing in session.exec(select(ProjectDB)).all():
            session.delete(existing)
        session.commit()

        # Load new data
        for file in track(project_files, description="Loading projects"):
            if project := load_project(file):
                db_project = ProjectDB.from_project(project)
                session.add(db_project)
                projects_loaded += 1

        session.commit()

    print(f"[green]Synced {projects_loaded} projects to {DATABASE_PATH}[/green]")


@app.command()
def query(
    category: Optional[str] = typer.Option(None, "--category", "-c", help="Filter by category"),
    min_stars: Optional[int] = typer.Option(None, "--min-stars", "-s", help="Minimum GitHub stars"),
    limit: int = typer.Option(20, "--limit", "-l", help="Maximum results to show"),
    sort_by: str = typer.Option("stars", "--sort", help="Sort by: stars, name, commits"),
):
    """
    Query projects from the database with filters.
    """
    if not DATABASE_PATH.exists():
        print("[red]Database not found. Run 'sync-db' first.[/red]")
        raise typer.Exit(1)

    engine = get_engine()
    with Session(engine) as session:
        statement = select(ProjectDB)
        if category:
            statement = statement.where(ProjectDB.category == category)
        if min_stars is not None:
            statement = statement.where(ProjectDB.github_stars >= min_stars)

        # Sorting
        if sort_by == "stars":
            statement = statement.order_by(ProjectDB.github_stars.desc())
        elif sort_by == "name":
            statement = statement.order_by(ProjectDB.name)
        elif sort_by == "commits":
            statement = statement.order_by(ProjectDB.github_last_commit.desc())

        statement = statement.limit(limit)
        results = session.exec(statement).all()

        if not results:
            print("[yellow]No projects found matching criteria.[/yellow]")
            return

        table = Table(title=f"Projects ({len(results)} results)")
        table.add_column("Name", style="cyan", no_wrap=True)
        table.add_column("Category", style="green")
        table.add_column("Stars", justify="right", style="yellow")
        table.add_column("Last Commit", style="magenta")

        for p in results:
            stars = str(p.github_stars) if p.github_stars is not None else "-"
            last_commit = p.github_last_commit[:10] if p.github_last_commit else "-"
            table.add_row(p.name, p.category, stars, last_commit)

        console.print(table)


@app.command()
def top(
    limit: int = typer.Option(20, "--limit", "-l", help="Number of projects to show"),
    category: Optional[str] = typer.Option(None, "--category", "-c", help="Filter by category"),
):
    """
    Show top projects by GitHub stars.
    """
    if not DATABASE_PATH.exists():
        print("[red]Database not found. Run 'sync-db' first.[/red]")
        raise typer.Exit(1)

    engine = get_engine()
    with Session(engine) as session:
        statement = select(ProjectDB).where(ProjectDB.github_stars.isnot(None))
        if category:
            statement = statement.where(ProjectDB.category == category)
        statement = statement.order_by(ProjectDB.github_stars.desc()).limit(limit)
        results = session.exec(statement).all()

        table = Table(title=f"Top {len(results)} Projects by Stars")
        table.add_column("#", justify="right", style="dim")
        table.add_column("Name", style="cyan", no_wrap=True)
        table.add_column("Category", style="green")
        table.add_column("Stars", justify="right", style="yellow")
        table.add_column("Forks", justify="right", style="blue")
        table.add_column("URL", style="dim")

        for i, p in enumerate(results, 1):
            table.add_row(
                str(i),
                p.name,
                p.category,
                f"{p.github_stars:,}",
                str(p.github_forks or "-"),
                p.url[:50] + "..." if len(p.url) > 50 else p.url,
            )

        console.print(table)


@app.command()
def categories():
    """
    List all categories with project counts.
    """
    if not DATABASE_PATH.exists():
        print("[red]Database not found. Run 'sync-db' first.[/red]")
        raise typer.Exit(1)

    engine = get_engine()
    with Session(engine) as session:
        results = session.exec(select(ProjectDB)).all()

        # Count projects and total stars by category
        category_counts: dict[str, int] = {}
        category_stars: dict[str, int] = {}
        for p in results:
            category_counts[p.category] = category_counts.get(p.category, 0) + 1
            category_stars[p.category] = category_stars.get(p.category, 0) + (p.github_stars or 0)

        # Sort by count
        sorted_categories = sorted(category_counts.items(), key=lambda x: x[1], reverse=True)

        table = Table(title="Categories")
        table.add_column("Category", style="cyan")
        table.add_column("Projects", justify="right", style="green")
        table.add_column("Total Stars", justify="right", style="yellow")

        for cat, count in sorted_categories:
            table.add_row(cat, str(count), f"{category_stars[cat]:,}")

        console.print(table)
        print(f"\n[bold]Total: {len(sorted_categories)} categories, {len(results)} projects[/bold]")


@app.command()
def search(
    query: str = typer.Argument(..., help="Search term"),
    limit: int = typer.Option(20, "--limit", "-l", help="Maximum results"),
):
    """
    Search projects by name or description.
    """
    if not DATABASE_PATH.exists():
        print("[red]Database not found. Run 'sync-db' first.[/red]")
        raise typer.Exit(1)

    engine = get_engine()
    query_lower = query.lower()
    with Session(engine) as session:
        results = session.exec(select(ProjectDB)).all()

        # Filter by search term
        matches = [
            p for p in results
            if query_lower in p.name.lower() or query_lower in p.description.lower()
        ]

        # Sort by stars
        matches.sort(key=lambda x: x.github_stars or 0, reverse=True)
        matches = matches[:limit]

        if not matches:
            print(f"[yellow]No projects found matching '{query}'[/yellow]")
            return

        table = Table(title=f"Search results for '{query}' ({len(matches)} matches)")
        table.add_column("Name", style="cyan", no_wrap=True)
        table.add_column("Category", style="green")
        table.add_column("Stars", justify="right", style="yellow")
        table.add_column("Description", style="dim", max_width=50)

        for p in matches:
            stars = str(p.github_stars) if p.github_stars is not None else "-"
            desc = p.description[:50] + "..." if len(p.description) > 50 else p.description
            table.add_row(p.name, p.category, stars, desc)

        console.print(table)


@app.command()
def stale(
    days: int = typer.Option(365, "--days", "-d", help="Days since last commit to consider stale"),
    limit: int = typer.Option(30, "--limit", "-l", help="Maximum results"),
):
    """
    Find stale/unmaintained projects (no commits in X days).
    """
    if not DATABASE_PATH.exists():
        print("[red]Database not found. Run 'sync-db' first.[/red]")
        raise typer.Exit(1)

    engine = get_engine()
    cutoff = datetime.now()
    with Session(engine) as session:
        results = session.exec(
            select(ProjectDB).where(ProjectDB.github_last_commit.isnot(None))
        ).all()

        # Filter stale projects, comparing commit timestamps as naive datetimes
        stale_projects = []
        for p in results:
            try:
                last_commit = datetime.fromisoformat(p.github_last_commit.replace("Z", "+00:00"))
                last_commit = last_commit.replace(tzinfo=None)
                days_since = (cutoff - last_commit).days
                if days_since >= days:
                    stale_projects.append((p, days_since))
            except (ValueError, AttributeError):
                continue

        # Sort by oldest first
        stale_projects.sort(key=lambda x: x[1], reverse=True)
        stale_projects = stale_projects[:limit]

        if not stale_projects:
            print(f"[green]No stale projects found (>{days} days without commits)[/green]")
            return

        table = Table(title=f"Stale Projects (no commits in {days}+ days)")
        table.add_column("Name", style="cyan", no_wrap=True)
        table.add_column("Category", style="green")
        table.add_column("Stars", justify="right", style="yellow")
        table.add_column("Last Commit", style="red")
        table.add_column("Days Ago", justify="right", style="red")

        for p, days_ago in stale_projects:
            stars = str(p.github_stars) if p.github_stars is not None else "-"
            last_commit = p.github_last_commit[:10] if p.github_last_commit else "-"
            table.add_row(p.name, p.category, stars, last_commit, str(days_ago))

        console.print(table)
        print(f"\n[bold red]Found {len(stale_projects)} stale projects[/bold red]")


@app.command()
def stats():
    """
    Show database statistics.
    """
    if not DATABASE_PATH.exists():
        print("[red]Database not found. Run 'sync-db' first.[/red]")
        raise typer.Exit(1)

    engine = get_engine()
    with Session(engine) as session:
        all_projects = session.exec(select(ProjectDB)).all()

        github_projects = [p for p in all_projects if p.github_stars is not None]
        total_stars = sum(p.github_stars or 0 for p in all_projects)
        categories = set(p.category for p in all_projects)

        print("\n[bold blue]Database Statistics[/bold blue]")
        print(f"  Total projects: [green]{len(all_projects)}[/green]")
        print(f"  GitHub projects: [green]{len(github_projects)}[/green]")
        print(f"  Categories: [green]{len(categories)}[/green]")
        print(f"  Total stars: [yellow]{total_stars:,}[/yellow]")

        if github_projects:
            avg_stars = total_stars / len(github_projects)
            max_stars_project = max(github_projects, key=lambda x: x.github_stars or 0)
            print(f"  Average stars: [yellow]{avg_stars:.0f}[/yellow]")
            print(f"  Most starred: [cyan]{max_stars_project.name}[/cyan] ({max_stars_project.github_stars:,} stars)")


if __name__ == "__main__":
    app()