Integrate Captain with Your Data Lake
Connect Captain to your cloud storage (AWS S3 or Google Cloud Storage) to index and query your entire data lake. The integration turns your existing file storage into persistent, searchable databases.
Overview
Captain's Data Lake Integration consists of three key components working together:
- Tagger - Scans your cloud storage, processes files, and extracts structured data
- DeepQuery - Executes natural language queries against indexed data
- Cloud Storage - Your AWS S3 or GCS buckets containing documents
┌─────────────────┐
│ Your Cloud │
│ Storage (S3/ │
│ GCS) │
└────────┬────────┘
│
│ Tagger scans & indexes
▼
┌─────────────────┐
│ Captain │
│ Database │
└────────┬────────┘
│
│ DeepQuery executes
▼
┌─────────────────┐
│ Natural Lang. │
│ Queries │
└─────────────────┘
Getting Started
Prerequisites
- Captain API key (get one at app.runcaptain.com)
- AWS S3 bucket or Google Cloud Storage bucket
- Cloud storage credentials (see Cloud Credentials Guide)
Authentication Setup
All Captain API endpoints use header-based authentication. Set up your credentials:
import requests

# Step 1: Set your credentials
API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

# Step 2: Create headers dictionary (reuse for all requests)
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

# Step 3: Use headers in every request
response = requests.post(
    "https://api.runcaptain.com/v1/...",
    headers=headers,
    data={...}
)
Required headers for all requests:
- Authorization: Bearer {your_api_key}
- X-Organization-ID: {your_org_id}
Quick Start (4 Steps)
import requests

# Set your credentials
API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
BASE_URL = "https://api.runcaptain.com"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

# 1. Create a database
requests.post(
    f"{BASE_URL}/v1/create-database",
    headers=headers,
    data={
        'database_name': 'my_documents'
    }
)

# 2. Index your S3 bucket (Tagger will scan all files)
index_response = requests.post(
    f"{BASE_URL}/v1/index-s3",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'bucket_name': 'my-s3-bucket',
        'aws_access_key_id': 'AKIA...',
        'aws_secret_access_key': '...',
        'bucket_region': 'us-east-1'
    }
)
job_id = index_response.json()['job_id']

# 3. Monitor indexing progress
status = requests.get(
    f"{BASE_URL}/v1/indexing-status/{job_id}",
    headers=headers
)

# 4. Query your data (DeepQuery executes the search)
response = requests.post(
    f"{BASE_URL}/v1/query",
    headers=headers,
    data={
        'query': 'What are the main themes in Q4 reports?',
        'database_name': 'my_documents'
    }
)
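Step 3 returns the job status at a single point in time; in practice you will usually poll it until the job finishes before running queries. A minimal sketch that continues from the Quick Start above (the completed, files_processed, and files_total fields come from the indexing-status response documented below; the 5-second interval is just a reasonable default):
import time

# Poll the indexing job until Tagger reports completion
while True:
    status = requests.get(
        f"{BASE_URL}/v1/indexing-status/{job_id}",
        headers=headers
    ).json()
    print(f"{status['files_processed']}/{status['files_total']} files processed")
    if status['completed']:
        break
    time.sleep(5)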
Tagger: Intelligent File Indexing
The Tagger service automatically scans your cloud storage, extracts content from files, and creates searchable indexes.
Supported File Types
Documents:
- PDF (.pdf) - up to 512MB with automatic page chunking
- Microsoft Word (.docx)
- Text files (.txt)
- Markdown (.md)
- Rich Text Format (.rtf)
- OpenDocument Text (.odt)
Spreadsheets & Data:
- Microsoft Excel (.xlsx, .xls) - row-based chunking
- CSV (.csv) - header preservation
- JSON (.json)
Presentations:
- Microsoft PowerPoint (.pptx, .ppt)
Images (with OCR & Computer Vision):
- JPEG (.jpg, .jpeg)
- PNG (.png)
- BMP (.bmp) - experimental
- GIF (.gif) - experimental
- TIFF (.tiff) - experimental
Code Files:
- Python (.py)
- TypeScript (.ts)
- JavaScript (.js)
- HTML (.html)
- CSS (.css)
- PHP (.php)
- Java (.java)
Web Content:
- XML (.xml)
How Tagger Works
- Discovery: Scans your bucket and identifies all supported files
- Processing: Extracts text, images, and metadata from each file
- Chunking: Intelligently splits large files into searchable chunks
- Tagging: Generates AI-powered tags and summaries for each chunk
- Indexing: Stores processed data in your Captain database
Processing Time:
- Average: 2-5 seconds per file
- Large PDFs (100+ pages): 10-30 seconds
- Entire bucket (1,000 files): 45-120 seconds
AWS S3 Integration
Index Entire S3 Bucket
Index all files in an S3 bucket:
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/index-s3",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'bucket_name': 'my-company-docs',
        'aws_access_key_id': 'AKIAIOSFODNN7EXAMPLE',
        'aws_secret_access_key': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
        'bucket_region': 'us-east-1'
    }
)
print(response.json())
Response:
{
"status": "success",
"message": "Indexing job started",
"job_id": "job_1729876543_a1b2c3d4",
"estimated_duration_seconds": 90,
"files_found": 1247
}
Index Single S3 File
Index a specific file from S3:
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/index-s3-file",
    headers=headers,
    data={
        'file_uri': 's3://my-bucket/documents/report_q4_2024.pdf',
        'database_name': 'my_documents',
        'aws_access_key_id': 'AKIAIOSFODNN7EXAMPLE',
        'aws_secret_access_key': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
        'bucket_region': 'us-east-1'
    }
)
AWS IAM Permissions
Your AWS credentials need these permissions:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetObject"
      ],
      "Resource": [
        "arn:aws:s3:::your-bucket-name",
        "arn:aws:s3:::your-bucket-name/*"
      ]
    }
  ]
}
See: Cloud Credentials Guide for detailed AWS setup instructions.
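Before kicking off a large indexing job, it can be worth confirming that the credentials you are about to send actually carry these permissions. A minimal pre-flight sketch using boto3 (boto3 is not required by Captain; it is used here only to exercise s3:ListBucket and s3:GetObject with the example credentials from above):
import boto3

# Pre-flight check: can these credentials list the bucket and read an object?
s3 = boto3.client(
    's3',
    aws_access_key_id='AKIAIOSFODNN7EXAMPLE',
    aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
    region_name='us-east-1'
)
listing = s3.list_objects_v2(Bucket='my-company-docs', MaxKeys=1)  # s3:ListBucket
if listing.get('Contents'):
    s3.get_object(Bucket='my-company-docs', Key=listing['Contents'][0]['Key'])  # s3:GetObject
print('Credentials can list and read the bucket')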
Google Cloud Storage Integration
Index Entire GCS Bucket
Index all files in a GCS bucket:
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

# Load service account JSON
with open('path/to/service-account-key.json', 'r') as f:
    service_account_json = f.read()

response = requests.post(
    "https://api.runcaptain.com/v1/index-gcs",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'bucket_name': 'my-gcs-bucket',
        'service_account_json': service_account_json
    }
)
print(response.json())
Response:
{
"status": "success",
"message": "Indexing job started",
"job_id": "job_1729876543_x9y8z7w6",
"estimated_duration_seconds": 85,
"files_found": 892
}
Index Single GCS File
Index a specific file from GCS:
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

with open('service-account-key.json', 'r') as f:
    service_account_json = f.read()

response = requests.post(
    "https://api.runcaptain.com/v1/index-gcs-file",
    headers=headers,
    data={
        'file_uri': 'gs://my-bucket/documents/report_q4_2024.pdf',
        'database_name': 'my_documents',
        'service_account_json': service_account_json
    }
)
GCS IAM Permissions
Your service account needs read access to the bucket - typically the Storage Object Viewer role (roles/storage.objectViewer), or a custom role with the storage.objects.get and storage.objects.list permissions.
See: Cloud Credentials Guide for detailed GCS setup instructions.
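As with S3, you can sanity-check the service account before indexing. A minimal pre-flight sketch using the google-cloud-storage client (not required by Captain; used here only to confirm list and read access with the key file from the examples above):
from google.cloud import storage

# Pre-flight check: can this service account list the bucket and read an object?
client = storage.Client.from_service_account_json('service-account-key.json')
blobs = list(client.list_blobs('my-gcs-bucket', max_results=1))  # storage.objects.list
if blobs:
    blobs[0].download_as_bytes()                                 # storage.objects.get
print('Service account can list and read the bucket')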
Monitoring Indexing Jobs
Get Indexing Status
Check the progress of your indexing job:
import requests

response = requests.get(
    f"https://api.runcaptain.com/v1/indexing-status/{job_id}",
    headers={
        'Authorization': 'Bearer cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    }
)
status = response.json()
print(f"Completed: {status['completed']}")
print(f"Status: {status['status']}")
print(f"Active workers: {status['active_file_processing_workers']}")
Response:
{
"completed": false,
"status": "RUNNING",
"active_file_processing_workers": 12,
"files_processed": 458,
"files_total": 1247,
"progress_percentage": 36.7
}
Get Detailed Step Function Status
Get detailed execution status with ETA:
import requests

response = requests.get(
    f"https://api.runcaptain.com/v1/index-status/{job_id}",
    headers={
        'Authorization': 'Bearer cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    }
)
status = response.json()
print(f"Status: {status['status']}")
print(f"Progress: {status['progress']['percentage']}%")
print(f"ETA: {status['estimated_completion_time']}")
Response:
{
"status": "RUNNING",
"execution_arn": "arn:aws:states:us-east-1:123456789012:execution:captain-tagger:job_1729876543",
"start_date": "2024-10-25T10:15:30Z",
"progress": {
"files_processed": 458,
"files_total": 1247,
"percentage": 36.7
},
"estimated_completion_time": "2024-10-25T10:17:45Z"
}
Cancel Indexing Job
Stop a running indexing job:
import requests

response = requests.post(
    f"https://api.runcaptain.com/v1/index-stop/{job_id}",
    headers={
        'Authorization': 'Bearer cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    }
)
print(response.json())
Response:
DeepQuery: Natural Language Queries
Once your data is indexed, use DeepQuery to ask questions in natural language.
Execute Query
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/query",
    headers=headers,
    data={
        'query': 'What are the revenue projections for Q4 2024?',
        'database_name': 'my_documents',
        'include_files': 'true'  # Optional: Include file metadata
    }
)
result = response.json()
print(result['response'])
Response:
{
  "status": "success",
  "response": "Based on the indexed documents, Q4 2024 revenue projections are $15.2M, representing a 23% increase over Q3...",
  "query": "What are the revenue projections for Q4 2024?",
  "database_name": "my_documents",
  "processing_metrics": {
    "total_files_processed": 42,
    "total_tokens": 85000,
    "execution_time_ms": 2850
  },
  "relevant_files": [
    {
      "file_name": "Q4_Financial_Forecast.xlsx",
      "relevancy_score": 0.95,
      "file_type": "xlsx",
      "file_id": "file_abc123"
    },
    {
      "file_name": "2024_Revenue_Analysis.pdf",
      "relevancy_score": 0.87,
      "file_type": "pdf",
      "file_id": "file_def456"
    }
  ]
}
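When include_files is set to 'true' as in the example above, the relevant_files list makes it easy to show sources alongside the answer:
# Print the answer followed by its sources, most relevant first
result = response.json()
print(result['response'])
for file in sorted(result['relevant_files'], key=lambda f: f['relevancy_score'], reverse=True):
    print(f"  {file['file_name']} ({file['file_type']}) - relevance {file['relevancy_score']:.2f}")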
Query with Streaming
Enable streaming to receive the response incrementally as it is generated:
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/query",
    headers=headers,
    data={
        'query': 'Summarize all security vulnerabilities found in code reviews',
        'database_name': 'my_documents',
        'stream': 'true'
    },
    stream=True
)

# Process streamed chunks
for line in response.iter_lines():
    if line:
        line_text = line.decode('utf-8')
        if line_text.startswith('data: '):
            print(line_text[6:], end='', flush=True)
Idempotency
Prevent duplicate processing with idempotency keys:
import requests
import uuid

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

idempotency_key = str(uuid.uuid4())

response = requests.post(
    "https://api.runcaptain.com/v1/query",
    headers={
        'Authorization': f'Bearer {API_KEY}',
        'X-Organization-ID': ORG_ID,
        'Idempotency-Key': idempotency_key
    },
    data={
        'query': 'What are the main findings?',
        'database_name': 'my_documents'
    }
)

# Subsequent requests with the same key return the cached response
# No additional processing or token usage
Database Management
Create Database
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/create-database",
    headers=headers,
    data={
        'database_name': 'contracts_db'
    }
)
List Databases
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/list-databases",
    headers=headers
)

databases = response.json()['databases']
for db in databases:
    print(f"{db['database_name']} - {db['database_id']}")
List Files in Database
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/list-files",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'limit': 50,
        'offset': 0
    }
)

files = response.json()['files']
for file in files:
    print(f"{file['file_name']} - {file['file_type']}")
Delete Single File
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/delete-file",
    headers=headers,
    data={
        'database_name': 'my_documents',
        'file_id': 'file_abc123'
    }
)
Wipe Database
Clear all files while keeping the database structure:
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/wipe-database",
    headers=headers,
    data={
        'database_name': 'my_documents'
    }
)
print(f"Wiped {response.json()['files_deleted']} files")
Delete Database
Permanently delete a database and all its files:
import requests

API_KEY = "cap_prod_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ORG_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "X-Organization-ID": ORG_ID
}

response = requests.post(
    "https://api.runcaptain.com/v1/delete-database",
    headers=headers,
    data={
        'database_name': 'my_documents'
    }
)
Architecture
How It All Works Together
┌────────────────────────────────────────────────┐
│ Your Cloud Storage (S3/GCS) │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ PDF │ │ DOCX │ │ XLSX │ │ IMG │ │
│ └──────┘ └──────┘ └──────┘ └──────┘ │
└────────────────┬───────────────────────────────┘
│
│ API: /index-s3 or /index-gcs
▼
┌────────────────────────────────────────────────┐
│ Tagger Service │
│ ┌──────────────────────────────────────┐ │
│ │ 1. Scan bucket (list all files) │ │
│ │ 2. Download & extract content │ │
│ │ 3. Intelligent chunking │ │
│ │ 4. AI tagging & summarization │ │
│ │ 5. Store in database │ │
│ └──────────────────────────────────────┘ │
└────────────────┬───────────────────────────────┘
│
│ Indexed data stored
▼
┌────────────────────────────────────────────────┐
│ Captain Database (RDS) │
│ ┌──────────────────────────────────────┐ │
│ │ • File metadata │ │
│ │ • Extracted content chunks │ │
│ │ • AI-generated tags & summaries │ │
│ │ • Vector embeddings │ │
│ └──────────────────────────────────────┘ │
└────────────────┬───────────────────────────────┘
│
│ API: /query
▼
┌────────────────────────────────────────────────┐
│ DeepQuery Service │
│ ┌──────────────────────────────────────┐ │
│ │ 1. Parse natural language query │ │
│ │ 2. Search indexed data │ │
│ │ 3. Rank relevant chunks │ │
│ │ 4. Generate natural language answer │ │
│ │ 5. Return results + metadata │ │
│ └──────────────────────────────────────┘ │
└────────────────┬───────────────────────────────┘
│
│ Results returned
▼
┌────────────────────────────────────────────────┐
│ Your Application │
│ • Natural language answers │
│ • Relevant file references │
│ • Processing metrics │
└────────────────────────────────────────────────┘
Component Details
Tagger:
- AWS Lambda-based serverless architecture
- Parallel processing for high throughput
- Automatic retry logic for failed files
- Redis job tracking
- Average: 200-500 files/minute
DeepQuery:
- AWS Step Functions orchestration
- Multi-stage pipeline: query parsing, semantic search, relevance ranking, context building, LLM generation
- Average query time: 2-5 seconds
- Supports streaming responses
Storage:
- RDS Aurora PostgreSQL for structured data
- S3 for large file storage
- Vector embeddings for semantic search
- Automatic backups and replication
Best Practices
Organizing Databases
By Project:
# Good: Separate databases per project
databases = [
'project_alpha_docs',
'project_beta_contracts',
'project_gamma_reports'
]
By Department:
# Good: Separate databases per department
databases = [
'engineering_docs',
'legal_contracts',
'finance_reports'
]
Avoid:
# Bad: Single database for everything
database = 'all_company_documents' # Hard to manage, slower queries
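Per-project or per-department databases can be created up front with the create-database endpoint (this sketch reuses the BASE_URL and headers from the Quick Start):
# Create one database per department instead of a single catch-all database
for name in ['engineering_docs', 'legal_contracts', 'finance_reports']:
    requests.post(
        f"{BASE_URL}/v1/create-database",
        headers=headers,
        data={'database_name': name}
    )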
Re-indexing Strategy
Files are automatically re-indexed when you run an indexing job again:
# First index
requests.post('.../index-s3', data={'database_name': 'docs', ...})
# Update some files in S3...
# Re-index (existing files will be updated)
requests.post('.../index-s3', data={'database_name': 'docs', ...})
Behavior:
- Existing files with the same name are soft-deleted
- New versions are indexed
- Query history is maintained
Query Optimization
Specific > General:
# Good
"What are the security vulnerabilities in the authentication module?"
# Less effective
"Tell me about security"
Include Context:
# Good
"What were the revenue projections mentioned in Q4 2024 reports?"
# Less effective
"What are the numbers?"
Use File Filtering:
# Include file metadata to understand sources
response = requests.post('.../query', data={
'query': '...',
'include_files': 'true', # Returns which files were used
...
})
Environment Isolation
Use different API keys for dev/staging/prod:
# Development
API_KEY_DEV = 'cap_dev_...'
DATABASE_NAME_DEV = 'docs_dev'
# Production
API_KEY_PROD = 'cap_prod_...'
DATABASE_NAME_PROD = 'docs_prod'
This ensures:
- Development testing doesn't affect production
- Complete data isolation
- Separate token usage tracking
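One way to keep the two from mixing is to pick credentials from the environment at startup; a minimal sketch (the CAPTAIN_ENV variable name is just an illustration, not something Captain defines):
import os

# Select credentials and database by deployment environment
env = os.environ.get('CAPTAIN_ENV', 'dev')  # hypothetical variable name
if env == 'prod':
    API_KEY, DATABASE_NAME = API_KEY_PROD, DATABASE_NAME_PROD
else:
    API_KEY, DATABASE_NAME = API_KEY_DEV, DATABASE_NAME_DEV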
Use Cases
Enterprise Knowledge Base
Index company wikis, documentation, and internal resources for instant employee access.
Legal Document Management
Index contracts, case files, and legal research for rapid clause extraction and risk analysis.
Customer Support Automation
Index support tickets, documentation, and FAQs to power AI-driven support responses.
Compliance & Audit
Index financial reports, audit logs, and compliance documents for regulatory analysis.
Research & Development
Index scientific papers, patents, and research notes for literature review and prior art searches.
Code Repository Search
Index entire codebases for semantic code search, documentation generation, and technical debt analysis.
Error Handling
Common Errors
Invalid Credentials:
Bucket Not Found:
{
"status": "error",
"message": "S3 bucket 'my-bucket' does not exist or is not accessible",
"error_code": "BUCKET_NOT_FOUND"
}
Database Not Found:
{
"status": "error",
"message": "Database 'my_documents' not found",
"error_code": "DATABASE_NOT_FOUND"
}
File Type Not Supported:
{
"status": "warning",
"message": "Skipped 5 unsupported files (.exe, .dll)",
"files_indexed": 1242
}
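A simple client-side pattern is to branch on the status and error_code fields shown above; a sketch based on those response shapes:
result = response.json()

if result.get('status') == 'error':
    code = result.get('error_code')
    if code == 'DATABASE_NOT_FOUND':
        print('Create the database first, then retry the request')
    elif code == 'BUCKET_NOT_FOUND':
        raise RuntimeError(result['message'])
    else:
        raise RuntimeError(f"Captain API error: {result}")
elif result.get('status') == 'warning':
    print(result['message'])  # e.g. unsupported files were skipped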
Rate Limits & Quotas
| Tier | Indexing Jobs/Hour | Queries/Minute | Max Files/Database |
|---|---|---|---|
| Standard | 10 | 10 | 50,000 |
| Premium | Unlimited | 60 | Unlimited |
Contact support@runcaptain.com to upgrade to Premium tier.
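If a batch workload brushes up against these limits, a client-side retry with backoff keeps it from failing outright. A minimal sketch, assuming throttled requests come back with HTTP 429 (check the API Reference for the exact behavior):
import time
import requests

def post_with_retry(url, headers, data, max_retries=5):
    # Retry with exponential backoff when the API reports throttling (assumed HTTP 429)
    for attempt in range(max_retries):
        response = requests.post(url, headers=headers, data=data)
        if response.status_code != 429:
            return response
        time.sleep(2 ** attempt)
    return response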
Related Documentation
- Infinite-Responses API - Process massive text inputs without indexing
- Cloud Credentials Guide - Detailed AWS & GCS setup
- API Reference - Complete endpoint documentation
- Getting Started - Quick start guide