refactor(python-sdk): Improve request handling, pagination, and error handling #1165

Open · wants to merge 2 commits into base: main
300 changes: 154 additions & 146 deletions apps/python-sdk/firecrawl/firecrawl.py
@@ -12,15 +12,26 @@
import logging
import os
import time
from typing import Any, Dict, Optional, List, Union
from typing import Any, Dict, Optional, List, Union, Callable, TypeVar, Generic
import json
from dataclasses import dataclass
from datetime import datetime

import requests
import pydantic
import websockets

logger: logging.Logger = logging.getLogger("firecrawl")

T = TypeVar('T')

@dataclass
class ResponseData(Generic[T]):
    """Generic container for API response data"""
    # dataclass so the annotated names become real instance attributes;
    # the optional fields default to None.
    success: bool
    data: Optional[T] = None
    error: Optional[str] = None
    warning: Optional[str] = None
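Assuming the dataclass decorator above, a quick illustrative sketch of the container in use (values invented):

ok = ResponseData[Dict[str, Any]](success=True, data={'url': 'https://example.com'})
failed = ResponseData[Any](success=False, error='Rate limit exceeded')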

class SearchParams(pydantic.BaseModel):
    query: str
    limit: Optional[int] = 5
@@ -33,6 +44,18 @@ class SearchParams(pydantic.BaseModel):
    timeout: Optional[int] = 60000
    scrapeOptions: Optional[Dict[str, Any]] = None

@dataclass
class CrawlStatus:
    """Status information for a crawl job"""
    status: str
    total: int
    completed: int
    credits_used: int
    expires_at: datetime
    data: List[Dict[str, Any]]
    error: Optional[str] = None
    next_url: Optional[str] = None
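For illustration, a hedged sketch of how CrawlStatus could be filled from a raw v1 status payload; the helper name is hypothetical, and the key names ('creditsUsed', 'expiresAt', 'next') are the ones this diff reads elsewhere:

def crawl_status_from_json(payload: Dict[str, Any]) -> CrawlStatus:
    # Hypothetical convenience constructor, not part of this PR.
    return CrawlStatus(
        status=payload['status'],
        total=payload.get('total', 0),
        completed=payload.get('completed', 0),
        credits_used=payload.get('creditsUsed', 0),
        # Assumes an ISO-8601 timestamp such as '2025-01-01T00:00:00Z'.
        expires_at=datetime.fromisoformat(payload['expiresAt'].replace('Z', '+00:00')),
        data=payload.get('data', []),
        error=payload.get('error'),
        next_url=payload.get('next'),
    )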

class GenerateLLMsTextParams(pydantic.BaseModel):
    """
    Parameters for the LLMs.txt generation operation.
@@ -266,75 +289,32 @@ def async_crawl_url(self, url: str, params: Optional[Dict[str, Any]] = None, ide
        else:
            self._handle_error(response, 'start crawl job')

    def check_crawl_status(self, id: str) -> Any:
    def check_crawl_status(self, id: str) -> Dict[str, Any]:
        """
        Check the status of a crawl job using the Firecrawl API.

        Check the status of a crawl job.
        Args:
            id (str): The ID of the crawl job.

            id (str): The crawl job ID
        Returns:
            Any: The status of the crawl job.

        Raises:
            Exception: If the status check request fails.
            Dict[str, Any]: Current status of the crawl job
        """
        endpoint = f'/v1/crawl/{id}'

        headers = self._prepare_headers()
        response = self._get_request(f'{self.api_url}{endpoint}', headers)
        if response.status_code == 200:
            try:
                status_data = response.json()
            except:
                raise Exception(f'Failed to parse Firecrawl response as JSON.')
            if status_data['status'] == 'completed':
                if 'data' in status_data:
                    data = status_data['data']
                    while 'next' in status_data:
                        if len(status_data['data']) == 0:
                            break
                        next_url = status_data.get('next')
                        if not next_url:
                            logger.warning("Expected 'next' URL is missing.")
                            break
                        try:
                            status_response = self._get_request(next_url, headers)
                            if status_response.status_code != 200:
                                logger.error(f"Failed to fetch next page: {status_response.status_code}")
                                break
                            try:
                                next_data = status_response.json()
                            except:
                                raise Exception(f'Failed to parse Firecrawl response as JSON.')
                            data.extend(next_data.get('data', []))
                            status_data = next_data
                        except Exception as e:
                            logger.error(f"Error during pagination request: {e}")
                            break
                    status_data['data'] = data

            response = {
                'status': status_data.get('status'),
                'total': status_data.get('total'),
                'completed': status_data.get('completed'),
                'creditsUsed': status_data.get('creditsUsed'),
                'expiresAt': status_data.get('expiresAt'),
                'data': status_data.get('data')
            }

            if 'error' in status_data:
                response['error'] = status_data['error']

            if 'next' in status_data:
                response['next'] = status_data['next']

            return {
                'success': False if 'error' in status_data else True,
                **response
            }
        else:
            self._handle_error(response, 'check crawl status')
        response = self._get_request(f'{self.api_url}/v1/crawl/{id}', headers)
        status_data = self._handle_response(response, 'check crawl status')

        if status_data['status'] == 'completed' and 'data' in status_data:
            status_data['data'] = self._handle_pagination(status_data, headers)

        return {
            'success': 'error' not in status_data,
            'status': status_data.get('status'),
            'total': status_data.get('total'),
            'completed': status_data.get('completed'),
            'creditsUsed': status_data.get('creditsUsed'),
            'expiresAt': status_data.get('expiresAt'),
            'data': status_data.get('data')
        }
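Caller-side, the reworked method behaves as before; a minimal usage sketch (placeholder key and job id):

app = FirecrawlApp(api_key='fc-YOUR-KEY')   # placeholder credentials
status = app.check_crawl_status('job-id')   # placeholder job id
if status['success'] and status['status'] == 'completed':
    print(f"{status['completed']}/{status['total']} pages, "
          f"{len(status['data'])} documents after pagination")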

    def check_crawl_errors(self, id: str) -> Dict[str, Any]:
        """
@@ -903,88 +883,14 @@ def _prepare_headers(self, idempotency_key: Optional[str] = None) -> Dict[str, s
            'Authorization': f'Bearer {self.api_key}',
        }

    def _post_request(self, url: str,
                      data: Dict[str, Any],
                      headers: Dict[str, str],
                      retries: int = 3,
                      backoff_factor: float = 0.5) -> requests.Response:
        """
        Make a POST request with retries.

        Args:
            url (str): The URL to send the POST request to.
            data (Dict[str, Any]): The JSON data to include in the POST request.
            headers (Dict[str, str]): The headers to include in the POST request.
            retries (int): Number of retries for the request.
            backoff_factor (float): Backoff factor for retries.

        Returns:
            requests.Response: The response from the POST request.

        Raises:
            requests.RequestException: If the request fails after the specified retries.
        """
        for attempt in range(retries):
            response = requests.post(url, headers=headers, json=data)
            if response.status_code == 502:
                time.sleep(backoff_factor * (2 ** attempt))
            else:
                return response
        return response

    def _get_request(self, url: str,
                     headers: Dict[str, str],
                     retries: int = 3,
                     backoff_factor: float = 0.5) -> requests.Response:
        """
        Make a GET request with retries.
    def _post_request(self, url: str, data: Dict[str, Any], headers: Dict[str, str]) -> requests.Response:
        return self._make_request_with_retry('POST', url, headers, json_data=data)

        Args:
            url (str): The URL to send the GET request to.
            headers (Dict[str, str]): The headers to include in the GET request.
            retries (int): Number of retries for the request.
            backoff_factor (float): Backoff factor for retries.
    def _get_request(self, url: str, headers: Dict[str, str]) -> requests.Response:
        return self._make_request_with_retry('GET', url, headers)

        Returns:
            requests.Response: The response from the GET request.

        Raises:
            requests.RequestException: If the request fails after the specified retries.
        """
        for attempt in range(retries):
            response = requests.get(url, headers=headers)
            if response.status_code == 502:
                time.sleep(backoff_factor * (2 ** attempt))
            else:
                return response
        return response

    def _delete_request(self, url: str,
                        headers: Dict[str, str],
                        retries: int = 3,
                        backoff_factor: float = 0.5) -> requests.Response:
        """
        Make a DELETE request with retries.

        Args:
            url (str): The URL to send the DELETE request to.
            headers (Dict[str, str]): The headers to include in the DELETE request.
            retries (int): Number of retries for the request.
            backoff_factor (float): Backoff factor for retries.

        Returns:
            requests.Response: The response from the DELETE request.

        Raises:
            requests.RequestException: If the request fails after the specified retries.
        """
        for attempt in range(retries):
            response = requests.delete(url, headers=headers)
            if response.status_code == 502:
                time.sleep(backoff_factor * (2 ** attempt))
            else:
                return response
        return response
    def _delete_request(self, url: str, headers: Dict[str, str]) -> requests.Response:
        return self._make_request_with_retry('DELETE', url, headers)
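The three helpers are now one-line wrappers over a single retry routine, so the retry policy lives in one place. For comparison only, roughly the same behavior is available from requests' own machinery; a sketch using urllib3's Retry (an alternative design, not what this PR does):

from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(total=3, backoff_factor=0.5,
              status_forcelist=[502, 503, 504],
              allowed_methods=['GET', 'POST', 'DELETE'])  # urllib3 >= 1.26
session.mount('https://', HTTPAdapter(max_retries=retry))
# session.get(url, headers=headers) now retries 502/503/504 transparently.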

    def _monitor_job_status(self, id: str, headers: Dict[str, str], poll_interval: int) -> Any:
        """
Expand Down Expand Up @@ -1065,6 +971,108 @@ def _handle_error(self, response: requests.Response, action: str) -> None:
        # Raise an HTTPError with the custom message and attach the response
        raise requests.exceptions.HTTPError(message, response=response)

    def _handle_response(self, response: requests.Response, error_context: str) -> Dict[str, Any]:
        """
        Handle API response and return parsed JSON data.

        Args:
            response (requests.Response): Response from API request
            error_context (str): Context for error messages

        Returns:
            Dict[str, Any]: Parsed JSON response data

        Raises:
            Exception: If response status is not 200 or JSON parsing fails
        """
        if response.status_code == 200:
            try:
                return response.json()
            except ValueError:  # a bare except would also swallow SystemExit/KeyboardInterrupt
                raise Exception('Failed to parse Firecrawl response as JSON.')
        else:
            self._handle_error(response, error_context)
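Since _handle_error raises requests.exceptions.HTTPError with the response attached (see the raise above), callers can branch on the two failure modes; a short sketch ('app' and the job id are placeholders as before):

try:
    status = app.check_crawl_status('job-id')   # placeholder id
except requests.exceptions.HTTPError as e:
    # HTTP-level failure; the original response is attached by _handle_error.
    logger.error(f"API error {e.response.status_code}: {e}")
except Exception as e:
    # A JSON parse failure surfaces as a plain Exception.
    logger.error(f"Bad response body: {e}")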

    def _handle_pagination(self, initial_data: Dict[str, Any], headers: Dict[str, str]) -> List[Dict[str, Any]]:
        """
        Handle paginated responses by collecting all pages of data.

        Args:
            initial_data (Dict[str, Any]): Initial response data containing first page
            headers (Dict[str, str]): Request headers

        Returns:
            List[Dict[str, Any]]: Combined data from all pages
        """
        data = initial_data.get('data', [])
        current_data = initial_data

        while 'next' in current_data and current_data.get('data'):
            next_url = current_data.get('next')
            if not next_url:
                logger.warning("Expected 'next' URL is missing.")
                break

            try:
                response = self._get_request(next_url, headers)
                current_data = self._handle_response(response, 'fetch next page')
                data.extend(current_data.get('data', []))
            except Exception as e:
                logger.error(f"Error during pagination request: {e}")
                break

        return data
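To make the loop concrete, the page shape it consumes looks like this (field names from this diff; the values and URL are invented):

# Initial payload: has 'next' and non-empty 'data', so the loop runs.
#   {'status': 'completed', 'data': [doc1, doc2], 'next': 'https://api.example.com/v1/crawl/<id>?skip=2'}
# Second page: no 'next' key, so the loop stops.
#   {'status': 'completed', 'data': [doc3]}
# _handle_pagination returns [doc1, doc2, doc3].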

    def _make_request_with_retry(self, method: str, url: str,
                                 headers: Dict[str, str],
                                 json_data: Optional[Dict[str, Any]] = None,
                                 retries: int = 3,
                                 backoff_factor: float = 0.5) -> requests.Response:
        """
        Make HTTP request with exponential backoff retry logic.

        Args:
            method (str): HTTP method ('GET', 'POST', 'DELETE')
            url (str): Request URL
            headers (Dict[str, str]): Request headers
            json_data (Optional[Dict[str, Any]]): JSON data for POST requests
            retries (int): Number of retry attempts
            backoff_factor (float): Backoff multiplier

        Returns:
            requests.Response: Response from successful request

        Raises:
            requests.RequestException: If all retries fail
        """
        retry_codes = {502, 503, 504}  # Add more status codes as needed

        for attempt in range(retries):
            try:
                if method == 'GET':
                    response = requests.get(url, headers=headers)
                elif method == 'POST':
                    response = requests.post(url, headers=headers, json=json_data)
                elif method == 'DELETE':
                    response = requests.delete(url, headers=headers)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")

                if response.status_code not in retry_codes:
                    return response

                wait_time = backoff_factor * (2 ** attempt)
                logger.debug(f"Request failed with status {response.status_code}. "
                             f"Retrying in {wait_time:.1f} seconds...")
                time.sleep(wait_time)

            except requests.RequestException as e:
                if attempt == retries - 1:
                    raise
                logger.warning(f"Request failed: {str(e)}. Retrying...")

        return response  # Return last response if all retries failed
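With the defaults (retries=3, backoff_factor=0.5), a request that keeps hitting a retryable status is attempted three times, waiting backoff_factor * 2**attempt seconds after each failure: 0.5 s, then 1.0 s, then 2.0 s. As written, the routine also sleeps once after the final attempt before returning the last response, a minor inefficiency left intact here.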

class CrawlWatcher:
    def __init__(self, id: str, app: FirecrawlApp):
        self.id = id
@@ -1110,4 +1118,4 @@ async def _handle_message(self, msg: Dict[str, Any]):
                self.dispatch_event('document', {'data': doc, 'id': self.id})
        elif msg['type'] == 'document':
            self.data.append(msg['data'])
            self.dispatch_event('document', {'data': msg['data'], 'id': self.id})
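A minimal sketch of driving _handle_message by hand with a fabricated message (real messages arrive over the websocket connection; this assumes dispatch_event no-ops when no listener is registered):

import asyncio

watcher = CrawlWatcher('job-id', app)   # placeholder id; 'app' as in the sketches above
msg = {'type': 'document', 'data': {'url': 'https://example.com', 'markdown': '# Hi'}}
asyncio.run(watcher._handle_message(msg))
assert len(watcher.data) == 1           # the document was appended and dispatched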