
Integrate Captain with Your Data Lake

Connect Captain to your cloud storage (AWS S3 or Google Cloud Storage) to index and query your entire data lake. The integration turns your existing file storage infrastructure into persistent, searchable databases.


Overview

Captain's Data Lake Integration consists of three key components working together:

  1. Tagger - Scans your cloud storage, processes files, and extracts structured data
  2. DeepQuery - Executes natural language queries against indexed data
  3. Cloud Storage - Your AWS S3 or GCS buckets containing documents
┌─────────────────┐
│   Your Cloud    │
│  Storage (S3/   │
│      GCS)       │
└────────┬────────┘
         │ Tagger scans & indexes
┌─────────────────┐
│    Captain      │
│   Database      │
└────────┬────────┘
         │ DeepQuery executes
┌─────────────────┐
│  Natural Lang.  │
│     Queries     │
└─────────────────┘

Getting Started

Prerequisites

Before you begin, you need a Captain API key and Organization ID, plus cloud credentials with read access to the AWS S3 or Google Cloud Storage bucket you want to index (see the IAM permission sections below).

Authentication Setup

All Captain API endpoints use header-based authentication. Set up your credentials:

import requests

# Step 1: Set your credentials
API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

# Step 2: Create headers dictionary (reuse for all requests)
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

# Step 3: Use headers in every request
response = requests.post(
    "https://api.runcaptain.com/v1/...",
    headers=headers,
    data={...}
)

Required headers for all requests:

  - Authorization: Bearer {your_api_key}
  - X-Organization-ID: {your_org_id}
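
If you prefer not to pass the headers dictionary to every call, a requests.Session can carry them automatically. A minimal sketch (the list-databases call is just an example endpoint from later in this guide):

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

# Attach the required headers once; every request made through
# this session will include them automatically.
session = requests.Session()
session.headers.update({
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
})

response = session.post("https://api.runcaptain.com/v1/list-databases")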

Quick Start (4 Steps)

import requests

# Set your credentials
API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
BASE_URL = "https://api.runcaptain.com"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

# 1. Create a database
requests.post(
    f"{BASE_URL}/v1/create-database",
    headers=headers,
    data={
        'database_name': 'my_documents'
    }
)

# 2. Index your S3 bucket (Tagger will scan all files)
index_response = requests.post(
    f"{BASE_URL}/v1/index-s3",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'bucket_name': 'my-s3-bucket',
        'aws_access_key_id': 'AKIA...',
        'aws_secret_access_key': '...',
        'bucket_region': 'us-east-1'
    }
)

job_id = index_response.json()['job_id']

# 3. Monitor indexing progress
status = requests.get(
    f"{BASE_URL}/v1/indexing-status/{job_id}",
    headers=headers
)

# 4. Query your data (DeepQuery executes the search)
response = requests.post(
    f"{BASE_URL}/v1/query",
    headers=headers,
    data={
        'query': 'What are the main themes in Q4 reports?',
        'database_name': 'my_documents'
    }
)

Tagger: Intelligent File Indexing

The Tagger service automatically scans your cloud storage, extracts content from files, and creates searchable indexes.

Supported File Types

Documents:

  - PDF (.pdf) - Up to 512MB with automatic page chunking
  - Microsoft Word (.docx)
  - Text files (.txt)
  - Markdown (.md)
  - Rich Text Format (.rtf)
  - OpenDocument Text (.odt)

Spreadsheets & Data:

  - Microsoft Excel (.xlsx, .xls) - Row-based chunking
  - CSV (.csv) - Header preservation
  - JSON (.json)

Presentations:

  - Microsoft PowerPoint (.pptx, .ppt)

Images (with OCR & Computer Vision):

  - JPEG (.jpg, .jpeg)
  - PNG (.png)
  - BMP (.bmp) - Experimental
  - GIF (.gif) - Experimental
  - TIFF (.tiff) - Experimental

Code Files:

  - Python (.py)
  - TypeScript (.ts)
  - JavaScript (.js)
  - HTML (.html)
  - CSS (.css)
  - PHP (.php)
  - Java (.java)

Web Content:

  - XML (.xml)

How Tagger Works

  1. Discovery: Scans your bucket and identifies all supported files
  2. Processing: Extracts text, images, and metadata from each file
  3. Chunking: Intelligently splits large files into searchable chunks
  4. Tagging: Generates AI-powered tags and summaries for each chunk
  5. Indexing: Stores processed data in your Captain database

Processing Time:

  - Average: 2-5 seconds per file
  - Large PDFs (100+ pages): 10-30 seconds
  - Entire bucket (1000 files): 45-120 seconds


AWS S3 Integration

Index Entire S3 Bucket

Index all files in an S3 bucket:

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/index-s3",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'bucket_name': 'my-company-docs',
        'aws_access_key_id': 'AKIAIOSFODNN7EXAMPLE',
        'aws_secret_access_key': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
        'bucket_region': 'us-east-1'
    }
)

print(response.json())

Response:

{
  "status": "success",
  "message": "Indexing job started",
  "job_id": "job_1729876543_a1b2c3d4",
  "estimated_duration_seconds": 90,
  "files_found": 1247
}

Index Single S3 File

Index a specific file from S3:

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/index-s3-file",
    headers=headers,
    data={
        'file_uri': 's3://my-bucket/documents/report_q4_2024.pdf',
        'database_name': 'my_documents',
        'aws_access_key_id': 'AKIAIOSFODNN7EXAMPLE',
        'aws_secret_access_key': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
        'bucket_region': 'us-east-1'
    }
)

AWS IAM Permissions

Your AWS credentials need these permissions:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetObject"
      ],
      "Resource": [
        "arn:aws:s3:::your-bucket-name",
        "arn:aws:s3:::your-bucket-name/*"
      ]
    }
  ]
}

See: Cloud Credentials Guide for detailed AWS setup instructions.
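
Before handing credentials to Captain, you can sanity-check that they can actually list and read the bucket. A minimal sketch using boto3 (an assumption: boto3 is installed locally; this is not part of the Captain API, and the bucket, key, and credentials below are the placeholders from the example above):

import boto3

# Placeholder credentials and bucket; substitute your own
s3 = boto3.client(
    "s3",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region_name="us-east-1"
)

# s3:ListBucket - fetch at most one key to confirm listing works
s3.list_objects_v2(Bucket="my-company-docs", MaxKeys=1)

# s3:GetObject - read the metadata of a known object
s3.head_object(Bucket="my-company-docs", Key="documents/report_q4_2024.pdf")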


Google Cloud Storage Integration

Index Entire GCS Bucket

Index all files in a GCS bucket:

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

# Load service account JSON
with open('path/to/service-account-key.json', 'r') as f:
    service_account_json = f.read()

response = requests.post(
    "https://api.runcaptain.com/v1/index-gcs",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'bucket_name': 'my-gcs-bucket',
        'service_account_json': service_account_json
    }
)

print(response.json())

Response:

{
  "status": "success",
  "message": "Indexing job started",
  "job_id": "job_1729876543_x9y8z7w6",
  "estimated_duration_seconds": 85,
  "files_found": 892
}

Index Single GCS File

Index a specific file from GCS:

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

with open('service-account-key.json', 'r') as f:
    service_account_json = f.read()

response = requests.post(
    "https://api.runcaptain.com/v1/index-gcs-file",
    headers=headers,
    data={
        'file_uri': 'gs://my-bucket/documents/report_q4_2024.pdf',
        'database_name': 'my_documents',
        'service_account_json': service_account_json
    }
)

GCS IAM Permissions

Your service account needs this role:

roles/storage.objectViewer

Or custom permissions:

storage.buckets.get
storage.objects.get
storage.objects.list

See: Cloud Credentials Guide for detailed GCS setup instructions.
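
As with S3, you can verify the service account can read the bucket before indexing. A minimal sketch using the google-cloud-storage client library (an assumption: the library is installed locally; this is not part of the Captain API):

from google.cloud import storage

# Build a client from the same service account key file
client = storage.Client.from_service_account_json("service-account-key.json")

bucket = client.bucket("my-gcs-bucket")

# storage.objects.list - fetch at most one object name to confirm access
blobs = list(client.list_blobs(bucket, max_results=1))
print(f"Bucket reachable, sample object: {blobs[0].name if blobs else 'empty bucket'}")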


Monitoring Indexing Jobs

Get Indexing Status

Check the progress of your indexing job:

import requests

response = requests.get(
    f"https://api.runcaptain.com/v1/indexing-status/{job_id}",
    headers={
        'Authorization': 'Bearer cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'X-Organization-ID': 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
    }
)

status = response.json()
print(f"Completed: {status['completed']}")
print(f"Status: {status['status']}")
print(f"Active workers: {status['active_file_processing_workers']}")

Response:

{
  "completed": false,
  "status": "RUNNING",
  "active_file_processing_workers": 12,
  "files_processed": 458,
  "files_total": 1247,
  "progress_percentage": 36.7
}
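
For long-running jobs you will typically poll this endpoint until completed is true. A minimal polling sketch using the fields from the response above (the 10-second interval is an arbitrary choice):

import time
import requests

headers = {
    "Authorization": "Bearer cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "X-Organization-ID": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}

job_id = "job_1729876543_a1b2c3d4"

while True:
    status = requests.get(
        f"https://api.runcaptain.com/v1/indexing-status/{job_id}",
        headers=headers
    ).json()

    print(f"{status['files_processed']}/{status['files_total']} files "
          f"({status['progress_percentage']}%)")

    if status['completed']:
        print(f"Finished with status: {status['status']}")
        break

    time.sleep(10)  # wait before polling again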

Get Detailed Step Function Status

Get detailed execution status with ETA:

import requests

response = requests.get(
    f"https://api.runcaptain.com/v1/index-status/{job_id}",
    headers={
        'Authorization': 'Bearer cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'X-Organization-ID': 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
    }
)

status = response.json()
print(f"Status: {status['status']}")
print(f"Progress: {status['progress']['percentage']}%")
print(f"ETA: {status['estimated_completion_time']}")

Response:

{
  "status": "RUNNING",
  "execution_arn": "arn:aws:states:us-east-1:123456789012:execution:captain-tagger:job_1729876543",
  "start_date": "2024-10-25T10:15:30Z",
  "progress": {
    "files_processed": 458,
    "files_total": 1247,
    "percentage": 36.7
  },
  "estimated_completion_time": "2024-10-25T10:17:45Z"
}

Cancel Indexing Job

Stop a running indexing job:

import requests

response = requests.post(
    f"https://api.runcaptain.com/v1/index-stop/{job_id}",
    headers={
        'Authorization': 'Bearer cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'X-Organization-ID': 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
    }
)

print(response.json())

Response:

{
  "status": "success",
  "message": "Indexing job cancelled",
  "tasks_revoked": 789
}


DeepQuery: Natural Language Queries

Once your data is indexed, use DeepQuery to ask questions in natural language.

Execute Query

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/query",
    headers=headers,
    data={
        'query': 'What are the revenue projections for Q4 2024?',
        'database_name': 'my_documents',
        'include_files': 'true'  # Optional: Include file metadata
    }
)

result = response.json()
print(result['response'])

Response:

{
  "status": "success",
  "response": "Based on the indexed documents, Q4 2024 revenue projections are $15.2M, representing a 23% increase over Q3...",
  "query": "What are the revenue projections for Q4 2024?",
  "database_name": "my_documents",
  "processing_metrics": {
    "total_files_processed": 42,
    "total_tokens": 85000,
    "execution_time_ms": 2850
  },
  "relevant_files": [
    {
      "file_name": "Q4_Financial_Forecast.xlsx",
      "relevancy_score": 0.95,
      "file_type": "xlsx",
      "file_id": "file_abc123"
    },
    {
      "file_name": "2024_Revenue_Analysis.pdf",
      "relevancy_score": 0.87,
      "file_type": "pdf",
      "file_id": "file_def456"
    }
  ]
}
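
When include_files is set, relevant_files tells you which documents the answer drew from. Continuing the Python example above, a small sketch that prints the sources in relevancy order (fields as in the response shown):

# List source files, highest relevancy first
for f in sorted(result['relevant_files'],
                key=lambda f: f['relevancy_score'], reverse=True):
    print(f"  {f['relevancy_score']:.2f}  {f['file_name']} ({f['file_type']})")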

Query with Streaming

Enable real-time streaming for immediate results:

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/query",
    headers=headers,
    data={
        'query': 'Summarize all security vulnerabilities found in code reviews',
        'database_name': 'my_documents',
        'stream': 'true'
    },
    stream=True
)

# Process streamed chunks
for line in response.iter_lines():
    if line:
        line_text = line.decode('utf-8')
        if line_text.startswith('data: '):
            print(line_text[6:], end='', flush=True)

Idempotency

Prevent duplicate processing with idempotency keys:

import requests
import uuid

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

idempotency_key = str(uuid.uuid4())

response = requests.post(
    "https://api.runcaptain.com/v1/query",
    headers={
        'Authorization': f'Bearer {API_KEY}',
        'X-Organization-ID': ORG_ID,
        'Idempotency-Key': idempotency_key
    },
    data={
        'query': 'What are the main findings?',
        'database_name': 'my_documents'
    }
)

# Subsequent requests with same key return cached response
# No additional processing or token usage
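
Idempotency keys are most useful when retrying after network failures: if the first attempt did reach the server, the retry returns the cached response instead of re-running the query. A minimal retry sketch continuing the example above (the retry count, backoff, and timeout are arbitrary choices, not Captain requirements):

import time

# Reuse the SAME idempotency_key for every attempt of this logical request
for attempt in range(3):
    try:
        response = requests.post(
            "https://api.runcaptain.com/v1/query",
            headers={
                'Authorization': f'Bearer {API_KEY}',
                'X-Organization-ID': ORG_ID,
                'Idempotency-Key': idempotency_key
            },
            data={
                'query': 'What are the main findings?',
                'database_name': 'my_documents'
            },
            timeout=120
        )
        break
    except requests.exceptions.RequestException:
        time.sleep(2 ** attempt)  # simple exponential backoff before retrying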

Database Management

Create Database

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/create-database",
    headers=headers,
    data={
        'database_name': 'contracts_db'
    }
)

List Databases

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/list-databases",
    headers=headers
)

databases = response.json()['databases']
for db in databases:
    print(f"{db['database_name']} - {db['database_id']}")

List Files in Database

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/list-files",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'limit': 50,
        'offset': 0
    }
)

files = response.json()['files']
for file in files:
    print(f"{file['file_name']} - {file['file_type']}")
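
The limit and offset parameters page through large databases. A minimal sketch that walks every page, assuming the endpoint keeps accepting increasing offsets until it returns an empty files list:

import requests

headers = {
    "Authorization": "Bearer cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "X-Organization-ID": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}

all_files = []
offset = 0
limit = 50

while True:
    page = requests.post(
        "https://api.runcaptain.com/v1/list-files",
        headers=headers,
        data={'database_name': 'my_documents', 'limit': limit, 'offset': offset}
    ).json()['files']

    if not page:
        break  # no more files to fetch

    all_files.extend(page)
    offset += limit

print(f"Total files: {len(all_files)}")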

Delete Single File

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/delete-file",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'file_id': 'file_abc123'
    }
)

Wipe Database

Clear all files while keeping the database structure:

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/wipe-database",
    headers=headers,
    data={
        'database_name': 'my_documents'
    }
)

print(f"Wiped {response.json()['files_deleted']} files")

Delete Database

Permanently delete a database and all its files:

import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/delete-database",
    headers=headers,
    data={
        'database_name': 'my_documents'
    }
)

Architecture

How It All Works Together

┌────────────────────────────────────────────────┐
│          Your Cloud Storage (S3/GCS)           │
│  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐          │
│  │ PDF  │ │ DOCX │ │ XLSX │ │ IMG  │          │
│  └──────┘ └──────┘ └──────┘ └──────┘          │
└────────────────┬───────────────────────────────┘
                 │ API: /index-s3 or /index-gcs
┌────────────────────────────────────────────────┐
│                Tagger Service                  │
│  ┌──────────────────────────────────────┐     │
│  │  1. Scan bucket (list all files)     │     │
│  │  2. Download & extract content       │     │
│  │  3. Intelligent chunking             │     │
│  │  4. AI tagging & summarization       │     │
│  │  5. Store in database                │     │
│  └──────────────────────────────────────┘     │
└────────────────┬───────────────────────────────┘
                 │ Indexed data stored
┌────────────────────────────────────────────────┐
│            Captain Database (RDS)              │
│  ┌──────────────────────────────────────┐     │
│  │ • File metadata                      │     │
│  │ • Extracted content chunks           │     │
│  │ • AI-generated tags & summaries      │     │
│  │ • Vector embeddings                  │     │
│  └──────────────────────────────────────┘     │
└────────────────┬───────────────────────────────┘
                 │ API: /query
┌────────────────────────────────────────────────┐
│              DeepQuery Service                 │
│  ┌──────────────────────────────────────┐     │
│  │  1. Parse natural language query     │     │
│  │  2. Search indexed data              │     │
│  │  3. Rank relevant chunks             │     │
│  │  4. Generate natural language answer │     │
│  │  5. Return results + metadata        │     │
│  └──────────────────────────────────────┘     │
└────────────────┬───────────────────────────────┘
                 │ Results returned
┌────────────────────────────────────────────────┐
│              Your Application                  │
│  • Natural language answers                    │
│  • Relevant file references                    │
│  • Processing metrics                          │
└────────────────────────────────────────────────┘

Component Details

Tagger:

  - AWS Lambda-based serverless architecture
  - Parallel processing for high throughput
  - Automatic retry logic for failed files
  - Redis job tracking
  - Average: 200-500 files/minute

DeepQuery:

  - AWS Step Functions orchestration
  - Multi-stage pipeline:
      1. Query parsing
      2. Semantic search
      3. Relevance ranking
      4. Context building
      5. LLM generation
  - Average query time: 2-5 seconds
  - Supports streaming responses

Storage:

  - RDS Aurora PostgreSQL for structured data
  - S3 for large file storage
  - Vector embeddings for semantic search
  - Automatic backups and replication
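
The flow in the diagram maps onto three API calls: /index-s3 (or /index-gcs) starts Tagger, /indexing-status reports its progress, and /query runs DeepQuery. A condensed end-to-end sketch using the endpoints shown earlier, assuming the database was already created with /create-database:

import time
import requests

BASE_URL = "https://api.runcaptain.com"
headers = {
    "Authorization": "Bearer cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "X-Organization-ID": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}

# 1. Tagger: start indexing an S3 bucket
job_id = requests.post(
    f"{BASE_URL}/v1/index-s3",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'bucket_name': 'my-company-docs',
        'aws_access_key_id': 'AKIAIOSFODNN7EXAMPLE',
        'aws_secret_access_key': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
        'bucket_region': 'us-east-1'
    }
).json()['job_id']

# 2. Poll until indexing completes
while not requests.get(
        f"{BASE_URL}/v1/indexing-status/{job_id}", headers=headers
).json()['completed']:
    time.sleep(10)

# 3. DeepQuery: ask a question against the indexed data
answer = requests.post(
    f"{BASE_URL}/v1/query",
    headers=headers,
    data={'query': 'What are the main themes in Q4 reports?',
          'database_name': 'my_documents'}
).json()['response']

print(answer)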


Best Practices

Organizing Databases

By Project:

# Good: Separate databases per project
databases = [
    'project_alpha_docs',
    'project_beta_contracts',
    'project_gamma_reports'
]

By Department:

# Good: Separate databases per department
databases = [
    'engineering_docs',
    'legal_contracts',
    'finance_reports'
]

Avoid:

# Bad: Single database for everything
database = 'all_company_documents'  # Hard to manage, slower queries

Re-indexing Strategy

Files are automatically re-indexed when you run the indexing endpoint again:

# First index
requests.post('.../index-s3', data={'database_name': 'docs', ...})

# Update some files in S3...

# Re-index (existing files will be updated)
requests.post('.../index-s3', data={'database_name': 'docs', ...})

Behavior:

  - Existing files with same name are soft-deleted
  - New versions are indexed
  - Maintains query history

Query Optimization

Specific > General:

# Good
"What are the security vulnerabilities in the authentication module?"

# Less effective
"Tell me about security"

Include Context:

# Good
"What were the revenue projections mentioned in Q4 2024 reports?"

# Less effective
"What are the numbers?"

Use File Filtering:

# Include file metadata to understand sources
response = requests.post('.../query', data={
    'query': '...',
    'include_files': 'true',  # Returns which files were used
    ...
})

Environment Isolation

Use different API keys for dev/staging/prod:

# Development
API_KEY_DEV = 'cap_dev_...'
DATABASE_NAME_DEV = 'docs_dev'

# Production
API_KEY_PROD = 'cap_prod_...'
DATABASE_NAME_PROD = 'docs_prod'

This ensures:

  - Development testing doesn't affect production
  - Complete data isolation
  - Separate token usage tracking
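
One way to enforce this in code is to read the key, org ID, and database name from the environment rather than hard-coding them. A minimal sketch (the variable names CAPTAIN_API_KEY, CAPTAIN_ORG_ID, and CAPTAIN_DATABASE are our own assumptions, not Captain conventions):

import os
import requests

# Each environment (dev/staging/prod) sets its own values
API_KEY = os.environ["CAPTAIN_API_KEY"]         # e.g. cap_dev_... or cap_prod_...
ORG_ID = os.environ["CAPTAIN_ORG_ID"]
DATABASE_NAME = os.environ["CAPTAIN_DATABASE"]  # e.g. docs_dev or docs_prod

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/query",
    headers=headers,
    data={'query': 'What are the main findings?', 'database_name': DATABASE_NAME}
)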


Use Cases

Enterprise Knowledge Base

Index company wikis, documentation, and internal resources for instant employee access.

Legal Document Analysis

Index contracts, case files, and legal research for rapid clause extraction and risk analysis.

Customer Support Automation

Index support tickets, documentation, and FAQs to power AI-driven support responses.

Compliance & Audit

Index financial reports, audit logs, and compliance documents for regulatory analysis.

Research & Development

Index scientific papers, patents, and research notes for literature review and prior art searches.

Code Intelligence

Index entire codebases for semantic code search, documentation generation, and technical debt analysis.


Error Handling

Common Errors

Invalid Credentials:

{
  "status": "error",
  "message": "Invalid AWS credentials",
  "error_code": "INVALID_CREDENTIALS"
}

Bucket Not Found:

{
  "status": "error",
  "message": "S3 bucket 'my-bucket' does not exist or is not accessible",
  "error_code": "BUCKET_NOT_FOUND"
}

Database Not Found:

{
  "status": "error",
  "message": "Database 'my_documents' not found",
  "error_code": "DATABASE_NOT_FOUND"
}

File Type Not Supported:

{
  "status": "warning",
  "message": "Skipped 5 unsupported files (.exe, .dll)",
  "files_indexed": 1242
}
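
A small helper can centralize handling of these responses: inspect the status field and raise on error codes, while surfacing warnings. A minimal sketch based on the error shapes shown above (the exception class name is an arbitrary choice):

class CaptainAPIError(Exception):
    """Raised when the Captain API reports an error."""

def check_response(payload: dict) -> dict:
    # Hard errors carry status == "error" plus an error_code
    if payload.get("status") == "error":
        raise CaptainAPIError(
            f"{payload.get('error_code', 'UNKNOWN')}: {payload.get('message')}"
        )
    # Warnings (e.g. skipped unsupported files) are reported but not fatal
    if payload.get("status") == "warning":
        print(f"Warning: {payload.get('message')}")
    return payload

# Usage:
# result = check_response(response.json())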


Rate Limits & Quotas

Tier       Indexing Jobs/Hour   Queries/Minute   Max Files/Database
Standard   10                   10               50,000
Premium    Unlimited            60               Unlimited

Contact support@runcaptain.com to upgrade to Premium tier.
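
If you are on the Standard tier and approach the queries-per-minute limit, one option is to pace requests client-side. A minimal throttling sketch (pacing rather than reacting to a specific rate-limit response is our own assumption; headers and endpoint as defined earlier):

import time
import requests

headers = {
    "Authorization": "Bearer cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "X-Organization-ID": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}

MIN_INTERVAL = 60 / 10  # Standard tier: 10 queries per minute
last_call = 0.0

def throttled_query(query, database_name):
    """Send a query, pacing calls to stay under the per-minute limit."""
    global last_call
    wait = MIN_INTERVAL - (time.time() - last_call)
    if wait > 0:
        time.sleep(wait)
    last_call = time.time()
    return requests.post(
        "https://api.runcaptain.com/v1/query",
        headers=headers,
        data={'query': query, 'database_name': database_name}
    ).json()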