Files
Anton_wireframe/app/routers/companies.py
T

282 lines
8.9 KiB
Python

from typing import Optional
from db.db import get_db
from db.models import CompanyTable, InvestorTable
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from schemas.router_schemas import CompanyData, PaginatedResponse
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Company Routes"])
# Request schemas for creating/updating
class CompanyCreate(BaseModel):
name: str
industry: str
location: str
description: Optional[str] = None
founded_year: Optional[int] = None
website: Optional[str] = None
class CompanyUpdate(BaseModel):
name: Optional[str] = None
industry: Optional[str] = None
location: Optional[str] = None
description: Optional[str] = None
founded_year: Optional[int] = None
website: Optional[str] = None
@router.get("/companies", response_model=PaginatedResponse[CompanyData])
def read_companies(
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Get all companies with their investor relationships (paginated)"""
# Calculate offset
offset = (page - 1) * page_size
# Get total count
total_count = (
db.query(CompanyTable)
.filter(CompanyTable.name.isnot(None), CompanyTable.description.isnot(None))
.count()
)
# Get paginated results
companies = (
db.query(CompanyTable)
.filter(CompanyTable.name.isnot(None), CompanyTable.description.isnot(None))
.options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.offset(offset)
.limit(page_size)
.all()
)
# Transform CompanyTable objects to CompanyData format
company_data_list = []
for company in companies:
# Sort sectors alphabetically
sorted_sectors = sorted(company.sectors, key=lambda s: s.name) if company.sectors else []
company_data = CompanyData(
company=company,
investors=company.investors,
members=company.members,
sectors=sorted_sectors,
)
company_data_list.append(company_data)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=company_data_list,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
@router.get("/companies/filter", response_model=PaginatedResponse[CompanyData])
def filter_companies(
industry: Optional[str] = Query(
None, description="Filter by industry (partial match)"
),
location: Optional[str] = Query(
None, description="Filter by location (partial match)"
),
founded_after: Optional[int] = Query(None, description="Founded after year"),
founded_before: Optional[int] = Query(None, description="Founded before year"),
has_website: Optional[bool] = Query(
None, description="Filter companies with/without website"
),
investor_name: Optional[str] = Query(
None, description="Filter by investor name (partial match)"
),
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Filter companies based on various criteria (paginated)"""
# Start with base query
query = db.query(CompanyTable).options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
# Apply filters
if industry:
query = query.filter(CompanyTable.industry.ilike(f"%{industry}%"))
if location:
query = query.filter(CompanyTable.location.ilike(f"%{location}%"))
if founded_after is not None:
query = query.filter(CompanyTable.founded_year >= founded_after)
if founded_before is not None:
query = query.filter(CompanyTable.founded_year <= founded_before)
if has_website is not None:
if has_website:
query = query.filter(CompanyTable.website.isnot(None))
else:
query = query.filter(CompanyTable.website.is_(None))
# Filter by investor if provided
if investor_name:
query = query.join(CompanyTable.investors).filter(
InvestorTable.name.ilike(f"%{investor_name}%")
)
# Get total count before pagination
total_count = query.count()
# Calculate offset and apply pagination
offset = (page - 1) * page_size
companies = query.offset(offset).limit(page_size).all()
# Transform to CompanyData format
company_data_list = []
for company in companies:
# Sort sectors alphabetically
sorted_sectors = sorted(company.sectors, key=lambda s: s.name) if company.sectors else []
company_data = CompanyData(
company=company,
investors=company.investors,
members=company.members,
sectors=sorted_sectors,
)
company_data_list.append(company_data)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=company_data_list,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
@router.get("/companies/{company_id}", response_model=CompanyData)
def read_company(company_id: int, db: Session = Depends(get_db)):
"""Get a specific company by ID with its investors"""
company = (
db.query(CompanyTable)
.options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.filter(CompanyTable.id == company_id)
.first()
)
if not company:
raise HTTPException(status_code=404, detail="Company not found")
# Sort sectors alphabetically
sorted_sectors = sorted(company.sectors, key=lambda s: s.name) if company.sectors else []
# Transform to CompanyData format
return CompanyData(
company=company,
investors=company.investors,
members=company.members,
sectors=sorted_sectors,
)
@router.post("/companies", response_model=CompanyData)
def create_company(company: CompanyCreate, db: Session = Depends(get_db)):
"""Create a new company"""
db_company = CompanyTable(**company.dict())
db.add(db_company)
db.commit()
db.refresh(db_company)
# Reload with relationships
company_with_relations = (
db.query(CompanyTable)
.options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.filter(CompanyTable.id == db_company.id)
.first()
)
# Transform to CompanyData format
return CompanyData(
company=company_with_relations,
investors=company_with_relations.investors,
members=company_with_relations.members,
sectors=company_with_relations.sectors,
)
@router.put("/companies/{company_id}", response_model=CompanyData)
def update_company(
company_id: int, company: CompanyUpdate, db: Session = Depends(get_db)
):
"""Update an existing company"""
db_company = db.query(CompanyTable).filter(CompanyTable.id == company_id).first()
if not db_company:
raise HTTPException(status_code=404, detail="Company not found")
update_data = company.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_company, field, value)
db.commit()
db.refresh(db_company)
# Reload with relationships
company_with_relations = (
db.query(CompanyTable)
.options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.filter(CompanyTable.id == company_id)
.first()
)
# Sort sectors alphabetically
sorted_sectors = sorted(company_with_relations.sectors, key=lambda s: s.name) if company_with_relations.sectors else []
# Transform to CompanyData format
return CompanyData(
company=company_with_relations,
investors=company_with_relations.investors,
members=company_with_relations.members,
sectors=sorted_sectors,
)
@router.delete("/companies/{company_id}")
def delete_company(company_id: int, db: Session = Depends(get_db)):
"""Delete a company"""
db_company = db.query(CompanyTable).filter(CompanyTable.id == company_id).first()
if not db_company:
raise HTTPException(status_code=404, detail="Company not found")
db.delete(db_company)
db.commit()
return {"message": "Company deleted successfully"}