
Overview

The docstrange Python SDK provides a type-safe, ergonomic interface for the Nanonets Document Extraction API. It includes full type annotations, async support, streaming, and automatic pagination.

Key Features

Type Safety

Full type annotations and Pydantic models for all requests and responses.

Async Support

Native async/await support with AsyncDocstrange client.

Streaming

Consume SSE streams with typed event objects.

File Upload

Upload files directly from file paths, bytes, or file objects.

Error Handling

Typed exceptions with status codes, messages, and request IDs.

Pagination

Automatic pagination helpers for list endpoints.

Installation

```shell
pip install docstrange
```

Requires Python 3.9+.

Authentication

The SDK reads your API key from the DOCSTRANGE_API_KEY environment variable by default.
```shell
export DOCSTRANGE_API_KEY="your-api-key"
```
You can also pass it explicitly when creating the client:
```python
from docstrange import Docstrange

# Uses DOCSTRANGE_API_KEY env var
client = Docstrange()

# Or pass explicitly
client = Docstrange(api_key="your-api-key")
```
Get your API key from the top right menu on docstrange.nanonets.com.

Core Methods

Synchronous Extraction

Extract content from a document and get results immediately. Best for files with 5 pages or less.
```python
from docstrange import Docstrange

client = Docstrange()

result = client.extract.sync(
    file=open("invoice.pdf", "rb"),
    output_format="markdown",
    custom_instructions="Format all dates as YYYY-MM-DD",
    include_metadata="bounding_boxes,confidence_score",
)

print(result.result.markdown.content)
```
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `file` | FileTypes | * | File to upload (PDF, Word, Excel, PowerPoint, images). Provide exactly one of `file`, `file_url`, or `file_base64`. |
| `file_url` | str | * | URL to download the file from. |
| `file_base64` | str | * | Base64-encoded file content. |
| `output_format` | str | Yes | Output format(s): `markdown`, `html`, `json`, `csv`. Comma-separate for multiple (e.g., `"markdown,json"`). |
| `custom_instructions` | str | No | Custom extraction instructions (max 8,000 chars). E.g., "Format dates as YYYY-MM-DD". |
| `prompt_mode` | str | No | `"append"` (default) adds to the base prompt; `"replace"` uses only your custom instructions. |
| `json_options` | str | No | JSON extraction mode. Values: `"hierarchy_output"`, `"table-of-contents"`, a field list `'["field1", "field2"]'`, or a JSON schema `'{...}'`. |
| `csv_options` | str | No | CSV extraction options. E.g., `"table"`. |
| `include_metadata` | str | No | Comma-separated metadata types: `bounding_boxes`, `bounding_boxes_word`, `confidence_score`. |
Provide exactly one file input: file, file_url, or file_base64. The file parameter accepts file objects, bytes, or tuples of (filename, content, content_type).
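The three input styles are interchangeable. As a sketch, a local file can be prepared for the `file_base64` style with the standard library (the `encode_file` helper name is illustrative, not part of the SDK):

```python
import base64

def encode_file(path: str) -> str:
    """Read a local file and return its contents as a base64 string."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")

# Exactly one of these per request:
# client.extract.sync(file=open("invoice.pdf", "rb"), output_format="markdown")
# client.extract.sync(file_url="https://example.com/invoice.pdf", output_format="markdown")
# client.extract.sync(file_base64=encode_file("invoice.pdf"), output_format="markdown")
```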

Asynchronous Extraction

Queue a document for background processing. Returns a record_id to poll results. Recommended for large documents (>5 pages).
```python
response = client.extract.async_(
    file=open("large-report.pdf", "rb"),
    output_format="json",
    json_options='["invoice_number", "date", "total_amount"]',
)

print(f"Queued with record_id: {response.record_id}")

# Poll for results
result = client.extract.results.retrieve(record_id=response.record_id)
print(result.status)
```
Parameters: Same as Synchronous Extraction.
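Rather than a single retrieve call, a small polling loop can wait for completion. This is a sketch, not an SDK helper: it treats any status other than `"processing"` as terminal, since this guide only documents the `completed` and `processing` statuses.

```python
import time

def wait_for_result(client, record_id, poll_interval=5.0, timeout=300.0):
    """Poll an async extraction until it leaves the 'processing' state."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = client.extract.results.retrieve(record_id=record_id)
        if result.status != "processing":
            return result
        time.sleep(poll_interval)
    raise TimeoutError(f"Extraction {record_id} still processing after {timeout}s")
```

Usage: `result = wait_for_result(client, response.record_id)`.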

Streaming Extraction

Stream extraction results in real-time via Server-Sent Events.
```python
stream = client.extract.stream(
    file=open("document.pdf", "rb"),
    output_format="markdown",
    enable_streaming=True,
)

for event in stream:
    if event.type == "content":
        print(event.data, end="", flush=True)
    elif event.type == "done":
        print(f"\nCompleted in {event.processing_time}s")
    elif event.type == "async_queued":
        print(f"Large file queued: {event.record_id}")
```
Parameters: All Synchronous Extraction parameters, plus:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `enable_streaming` | bool | No | `True` (default) for real-time incremental chunks. `False` for batch mode (complete content in a single SSE event). |
SSE Event Types:
| Event Type | Description |
| --- | --- |
| `content` | Incremental content chunk (streaming mode) |
| `complete` | Full content at once (batch mode, when `enable_streaming=False`) |
| `done` | Final event with `record_id` and `processing_time` |
| `error` | Error information |
| `async_queued` | Large files automatically queued for async processing |

Batch Extraction

Process multiple documents in a single request (max 50 files). All files share the same extraction options.
```python
response = client.extract.batch(
    files=[
        open("invoice1.pdf", "rb"),
        open("invoice2.pdf", "rb"),
        open("invoice3.pdf", "rb"),
    ],
    output_format="json",
    json_options='["invoice_number", "date", "total_amount"]',
    custom_instructions="Extract amounts without currency symbols",
)

print(f"Batch {response.batch_id}: {response.accepted_files} files queued")

for record in response.records:
    print(f"  {record.filename}: {record.record_id}")
```
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `files` | list[FileTypes] | Yes | List of files to process (max 50). |
| `output_format` | str | Yes | Output format(s) applied to all files. |
| `custom_instructions` | str | No | Custom extraction instructions (max 8,000 chars). |
| `prompt_mode` | str | No | `"append"` (default) or `"replace"`. |
| `json_options` | str | No | JSON extraction mode. |
| `csv_options` | str | No | CSV extraction options. |
| `include_metadata` | str | No | Comma-separated metadata types. |

Document Classification

Classify a document into predefined categories. Each page is classified individually with a category, confidence score (0-100), and reasoning.
```python
result = client.classify.sync(
    file=open("document.pdf", "rb"),
    categories='[{"name": "Invoice", "description": "Bills and invoices"}, {"name": "Contract", "description": "Legal agreements"}, {"name": "Receipt"}]',
)

for page in result.result.pages:
    print(f"Page {page.page_number}: {page.category} ({page.confidence}%) - {page.reasoning}")
```
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `file` | FileTypes | Yes | File to classify (PDF, PNG, JPG, JPEG, TIFF, BMP, WebP). |
| `categories` | str | Yes | JSON array of category objects: `[{"name": "Category", "description": "Optional description"}]`. |
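Because `categories` is a JSON string, it is less error-prone to build it with `json.dumps` than to hand-write the quoting. A minimal sketch with example category names:

```python
import json

categories = json.dumps([
    {"name": "Invoice", "description": "Bills and invoices"},
    {"name": "Contract", "description": "Legal agreements"},
    {"name": "Receipt"},  # description is optional
])
```

Pass the resulting string as the `categories` argument to `client.classify.sync(...)`.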

Batch Classification

Classify multiple documents at once (max 50 files).
```python
response = client.classify.batch(
    files=[
        open("doc1.pdf", "rb"),
        open("doc2.pdf", "rb"),
    ],
    categories='[{"name": "Invoice"}, {"name": "Receipt"}, {"name": "Contract"}]',
)

print(f"Batch {response.batch_id}: {response.successful_files} classified")

for result in response.results:
    print(f"  {result.filename}: {result.pages[0].category}")
```
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `files` | list[FileTypes] | Yes | Files to classify (max 50). |
| `categories` | str | Yes | JSON array of category objects (max 50 categories). |

Retrieve Results

Get the status and result of a previous extraction by record ID.
```python
result = client.extract.results.retrieve(
    record_id="12345",
    include_content=True,
)

if result.status == "completed":
    print(result.result.markdown.content)
elif result.status == "processing":
    print("Still processing...")
```
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `record_id` | str | Yes | Extraction job ID (numeric string returned by the API). |
| `include_content` | bool | No | Include full extracted content (default: `True`). Set to `False` to retrieve only status and metadata. |

List Results

List all extraction results for the authenticated user (paginated).
```python
page = client.extract.results.list(
    page=1,
    page_size=10,
    sort_by="created_at",
    sort_order="desc",
)

for record in page.results:
    print(f"{record.record_id}: {record.status} ({record.filename})")

print(f"Page {page.pagination.page} of {page.pagination.total_pages}")
print(f"Total records: {page.pagination.total_count}")
```
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `page` | int | No | Page number (default: 1, minimum: 1). |
| `page_size` | int | No | Results per page (default: 20, range: 1-100). |
| `sort_by` | str | No | Sort field. One of: `created_at` (default), `updated_at`, `original_filename`, `file_size`, `processing_status`. |
| `sort_order` | str | No | Sort direction: `"desc"` (default) or `"asc"`. |
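The SDK advertises automatic pagination helpers, but their exact interface is not shown in this overview. As a sketch, all records can be walked manually using only the documented `list` parameters and `pagination` fields (`iter_all_results` is an illustrative name, not an SDK function):

```python
def iter_all_results(list_fn, page_size=20, **kwargs):
    """Yield every record by walking the paginated list endpoint page by page."""
    page_num = 1
    while True:
        page = list_fn(page=page_num, page_size=page_size, **kwargs)
        yield from page.results
        if page_num >= page.pagination.total_pages:
            break
        page_num += 1
```

Usage: `for record in iter_all_results(client.extract.results.list, sort_by="created_at"): ...`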

Async Client

For async/await support, use the AsyncDocstrange client. All methods have the same signatures.
```python
import asyncio
from docstrange import AsyncDocstrange

async def main():
    client = AsyncDocstrange()

    result = await client.extract.sync(
        file=open("invoice.pdf", "rb"),
        output_format="markdown",
    )
    print(result.result.markdown.content)

    # Streaming also works with async
    stream = await client.extract.stream(
        file=open("document.pdf", "rb"),
        output_format="markdown",
        enable_streaming=True,
    )

    async for event in stream:
        if event.type == "content":
            print(event.data, end="", flush=True)

asyncio.run(main())
```
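One advantage of the async client is running several extractions concurrently. A sketch under the assumptions above (the `extract_many` helper is illustrative, not part of the SDK; it relies only on the documented `extract.sync` signature):

```python
import asyncio

async def extract_many(client, paths, output_format="markdown"):
    """Extract several documents concurrently with asyncio.gather."""
    async def one(path):
        with open(path, "rb") as f:
            return await client.extract.sync(file=f, output_format=output_format)
    return await asyncio.gather(*(one(p) for p in paths))
```

Usage: `results = asyncio.run(extract_many(AsyncDocstrange(), ["a.pdf", "b.pdf"]))`.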

Error Handling

The SDK raises typed exceptions for API errors.
```python
from docstrange import Docstrange, APIStatusError, APIConnectionError, APITimeoutError

client = Docstrange()

try:
    result = client.extract.sync(
        file=open("document.pdf", "rb"),
        output_format="markdown",
    )
except APIConnectionError:
    print("Failed to connect to the API")
except APITimeoutError:
    print("Request timed out")
except APIStatusError as e:
    print(f"API error {e.status_code}: {e.message}")
    print(f"Request ID: {e.request_id}")
```
| Exception | Description |
| --- | --- |
| `APIConnectionError` | Network connectivity issues |
| `APITimeoutError` | Request exceeded timeout |
| `APIStatusError` | API returned an error status code |
| `AuthenticationError` | Invalid or missing API key (401) |
| `PermissionDeniedError` | Insufficient permissions (403) |
| `NotFoundError` | Resource not found (404) |
| `RateLimitError` | Too many requests (429) |
| `InternalServerError` | Server error (500+) |
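The SDK retries transient failures itself (see Retries under Configuration), but application-level handling of `RateLimitError` can still be useful once the built-in retries are exhausted. A generic backoff sketch (`call_with_retries` is illustrative, not an SDK helper):

```python
import time

def call_with_retries(fn, retry_on, max_attempts=3, base_delay=1.0):
    """Call fn(), retrying with exponential backoff on the given exception type."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

Usage: `call_with_retries(lambda: client.extract.sync(file=open("doc.pdf", "rb"), output_format="markdown"), RateLimitError)`.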

Configuration

Custom Base URL

For on-premise deployments, point the client to your own instance:
```python
client = Docstrange(
    api_key="your-api-key",
    base_url="https://your-instance.example.com",
)
```

Timeouts

Configure request timeouts:
```python
from docstrange import Docstrange, Timeout

client = Docstrange(
    timeout=Timeout(
        connect=5.0,
        read=60.0,
        write=10.0,
    ),
)
```

Retries

The SDK automatically retries failed requests with exponential backoff. Configure the maximum number of retries:
```python
client = Docstrange(max_retries=3)  # default is 2
```

Next Steps

API Reference

Explore the complete API documentation with interactive examples.

Examples

See full code examples for every endpoint and output format.