Skip to main content

cURL

Simple Text-to-Video

curl -X POST https://platform.runblob.io/v1/kling/o1-video/generate -H "Authorization: Bearer YOUR_API_KEY" -H "Content-Type: application/json" -d '{
  "prompt": "A majestic eagle flying over mountains at sunset",
  "duration": "10",
  "aspect_ratio": "16:9"
}'

Image-to-Video with URLs

curl -X POST https://platform.runblob.io/v1/kling/o1-video/generate -H "Authorization: Bearer YOUR_API_KEY" -H "Content-Type: application/json" -d '{
  "prompt": "Animate these photos into a smooth video sequence",
  "images_urls": [
    "https://example.com/photo1.jpg",
    "https://example.com/photo2.jpg",
    "https://example.com/photo3.jpg"
  ],
  "duration": "8",
  "aspect_ratio": "16:9"
}'

With Webhook

curl -X POST https://platform.runblob.io/v1/kling/o1-video/generate -H "Authorization: Bearer YOUR_API_KEY" -H "Content-Type: application/json" -d '{
  "prompt": "Epic cinematic scene",
  "duration": "10",
  "aspect_ratio": "16:9",
  "callback_url": "https://your-app.com/webhook/o1-video"
}'

Check Status

curl -X GET https://platform.runblob.io/v1/kling/o1-video/generations/550e8400-e29b-41d4-a716-446655440000 -H "Authorization: Bearer YOUR_API_KEY"

Python

Simple Client

import requests
import time

class KlingO1VideoClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://platform.runblob.io"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def generate(self, prompt, **kwargs):
        response = requests.post(
            f"{self.base_url}/v1/kling/o1-video/generate",
            headers=self.headers,
            json={"prompt": prompt, **kwargs}
        )
        return response.json()
    
    def generate_from_images(self, prompt, images_urls, **kwargs):
        response = requests.post(
            f"{self.base_url}/v1/kling/o1-video/generate",
            headers=self.headers,
            json={
                "prompt": prompt,
                "images_urls": images_urls,
                **kwargs
            }
        )
        return response.json()
    
    def status(self, generation_id):
        response = requests.get(
            f"{self.base_url}/v1/kling/o1-video/generations/{generation_id}",
            headers=self.headers
        )
        return response.json()
    
    def wait_for_completion(self, generation_id, poll_interval=10):
        while True:
            result = self.status(generation_id)
            if result["status"] == "completed":
                return result["video_url"]
            elif result["status"] == "failed":
                raise Exception(f"Generation failed: {generation_id}")
            time.sleep(poll_interval)

# Text-to-Video
client = KlingO1VideoClient("YOUR_API_KEY")
result = client.generate(
    "A cinematic shot of a dragon flying",
    duration="10",
    aspect_ratio="16:9"
)
video_url = client.wait_for_completion(result["generation_id"])
print(f"Video ready: {video_url}")

# Image-to-Video
result = client.generate_from_images(
    "Animate these moments into a story",
    images_urls=[
        "https://example.com/photo1.jpg",
        "https://example.com/photo2.jpg"
    ],
    duration="8",
    aspect_ratio="16:9"
)
video_url = client.wait_for_completion(result["generation_id"])

Advanced Usage with Error Handling

import requests
import time
from typing import Optional, List

class KlingO1VideoAPI:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://platform.runblob.io/v1/kling/o1-video"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def generate(self, prompt: str, duration: str, 
                 aspect_ratio: str = "16:9",
                 images_urls: Optional[List[str]] = None,
                 callback_url: Optional[str] = None) -> dict:
        """Create a new video generation."""
        payload = {
            "prompt": prompt,
            "duration": duration,
            "aspect_ratio": aspect_ratio
        }
        
        if images_urls:
            if len(images_urls) > 5:
                raise ValueError("Maximum 5 images allowed")
            payload["images_urls"] = images_urls
        
        if callback_url:
            payload["callback_url"] = callback_url
        
        try:
            response = requests.post(
                f"{self.base_url}/generate",
                headers=self.headers,
                json=payload
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 402:
                raise Exception("Insufficient credits")
            elif e.response.status_code == 401:
                raise Exception("Invalid API key")
            elif e.response.status_code == 400:
                error_msg = e.response.json().get("detail", "Bad request")
                raise Exception(f"Invalid request: {error_msg}")
            else:
                raise Exception(f"API Error: {e.response.text}")
    
    def get_status(self, generation_id: str) -> dict:
        """Get generation status."""
        try:
            response = requests.get(
                f"{self.base_url}/generations/{generation_id}",
                headers=self.headers
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                raise Exception("Generation not found")
            raise Exception(f"API Error: {e.response.text}")
    
    def wait_for_completion(self, generation_id: str, 
                          timeout: int = 900) -> Optional[str]:
        """Wait for generation to complete with timeout (default 15 min)."""
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            result = self.get_status(generation_id)
            
            if result["status"] == "completed":
                return result["video_url"]
            elif result["status"] == "failed":
                raise Exception(f"Generation failed")
            
            time.sleep(10)
        
        raise Exception("Generation timeout")

# Usage
try:
    client = KlingO1VideoAPI("YOUR_API_KEY")
    
    # Text-to-Video
    result = client.generate(
        prompt="Epic cinematic mountain landscape",
        duration="10",
        aspect_ratio="16:9"
    )
    
    print(f"Generation started: {result['generation_id']}")
    print(f"Price: ${result['price']}")
    
    # Wait for completion
    video_url = client.wait_for_completion(result['generation_id'])
    print(f"Video ready: {video_url}")
    
except Exception as e:
    print(f"Error: {str(e)}")

Upload Images as Base64

import base64
import requests

def image_to_base64(file_path):
    """Convert image file to base64 string."""
    with open(file_path, "rb") as f:
        image_data = base64.b64encode(f.read()).decode("utf-8")
        # Detect file extension
        ext = file_path.split('.')[-1].lower()
        mime_type = f"image/{ext if ext != 'jpg' else 'jpeg'}"
        return f"data:{mime_type};base64,{image_data}"

# Convert multiple images
images = [
    image_to_base64("photo1.jpg"),
    image_to_base64("photo2.jpg"),
    image_to_base64("photo3.jpg")
]

# Generate video
response = requests.post(
    "https://platform.runblob.io/v1/kling/o1-video/generate",
    headers={
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json"
    },
    json={
        "prompt": "Bring these photos to life with smooth animation",
        "images_base64": images,
        "duration": "8",
        "aspect_ratio": "16:9"
    }
)

result = response.json()
print(f"Generation ID: {result['generation_id']}")

JavaScript / Node.js

Basic Client

const fetch = require('node-fetch');

class KlingO1VideoAPI {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.baseUrl = 'https://platform.runblob.io/v1/kling/o1-video';
  }

  async generate(params) {
    const response = await fetch(`${this.baseUrl}/generate`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        duration: '5',
        aspect_ratio: '16:9',
        ...params
      })
    });

    if (!response.ok) {
      throw new Error(`API Error: ${response.statusText}`);
    }

    return response.json();
  }

  async getStatus(generationId) {
    const response = await fetch(
      `${this.baseUrl}/generations/${generationId}`,
      {
        headers: {
          'Authorization': `Bearer ${this.apiKey}`
        }
      }
    );

    if (!response.ok) {
      throw new Error(`API Error: ${response.statusText}`);
    }

    return response.json();
  }

  async waitForCompletion(generationId, pollInterval = 10000) {
    while (true) {
      const result = await this.getStatus(generationId);
      
      if (result.status === 'completed') {
        return result.video_url;
      } else if (result.status === 'failed') {
        throw new Error('Generation failed');
      }
      
      await new Promise(resolve => setTimeout(resolve, pollInterval));
    }
  }
}

// Usage
const client = new KlingO1VideoAPI('YOUR_API_KEY');

// Text-to-Video
client.generate({
  prompt: 'Epic cinematic scene of a dragon',
  duration: '10'
})
  .then(result => {
    console.log('Generation ID:', result.generation_id);
    return client.waitForCompletion(result.generation_id);
  })
  .then(videoUrl => {
    console.log('Video ready:', videoUrl);
  })
  .catch(error => {
    console.error('Error:', error.message);
  });

// Image-to-Video
client.generate({
  prompt: 'Animate these photos',
  images_urls: [
    'https://example.com/photo1.jpg',
    'https://example.com/photo2.jpg'
  ],
  duration: '8',
  aspect_ratio: '16:9'
})
  .then(result => client.waitForCompletion(result.generation_id))
  .then(videoUrl => console.log('Video ready:', videoUrl));
// Frontend - Start generation
const response = await fetch('https://platform.runblob.io/v1/kling/o1-video/generate', {
  method: 'POST',
  headers: {
    'Authorization': 'Bearer YOUR_API_KEY',
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({
    prompt: 'Epic cinematic scene',
    duration: '10',
    aspect_ratio: '16:9',
    callback_url: 'https://your-backend.com/webhooks/kling-video'
  })
});

const { generation_id } = await response.json();
// Save generation_id, wait for webhook

// Backend - Handle webhook
app.post('/webhooks/kling-video', async (req, res) => {
  const { generation_id, status, video_url, message } = req.body;

  if (status === 'completed') {
    // Update database
    await db.updateGeneration(generation_id, {
      status: 'completed',
      videoUrl: video_url
    });

    // Notify user
    io.emit(`generation:${generation_id}`, {
      status: 'completed',
      videoUrl: video_url
    });
  } else if (status === 'failed') {
    await db.updateGeneration(generation_id, {
      status: 'failed',
      error: message
    });

    io.emit(`generation:${generation_id}`, {
      status: 'failed',
      error: message
    });
  }

  res.sendStatus(200);
});

Upload Images from Browser

// Convert File to Base64
function fileToBase64(file) {
  return new Promise((resolve, reject) => {
    const reader = new FileReader();
    reader.onload = () => resolve(reader.result);
    reader.onerror = reject;
    reader.readAsDataURL(file);
  });
}

// Usage in React/Vue
const handleFileUpload = async (files) => {
  const base64Images = await Promise.all(
    Array.from(files).map(fileToBase64)
  );

  const response = await fetch('https://platform.runblob.io/v1/kling/o1-video/generate', {
    method: 'POST',
    headers: {
      'Authorization': 'Bearer YOUR_API_KEY',
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      prompt: 'Bring these photos to life',
      images_base64: base64Images,
      duration: '5',
      aspect_ratio: '1:1'
    })
  });

  const { generation_id } = await response.json();
  return generation_id;
};

Best Practices

✅ Use Webhooks for Production

// Good: Use webhooks
{
  "prompt": "...",
  "callback_url": "https://your-backend.com/webhooks"
}

✅ Validate Input Before Sending

function validateRequest(data) {
  // Check prompt length
  if (data.prompt.length < 1 || data.prompt.length > 2500) {
    return 'Prompt must be 1-2500 characters';
  }

  // Check duration for text-to-video
  if (!data.images_urls && !data.images_base64) {
    if (!['5', '10'].includes(data.duration)) {
      return 'Text-to-video only supports 5s or 10s';
    }
  }

  // Check image count
  const imageCount = (data.images_urls?.length || 0) + 
                     (data.images_base64?.length || 0);
  if (imageCount > 5) {
    return 'Maximum 5 images allowed';
  }

  return null; // Valid
}

✅ Handle Errors Gracefully

async function safeGenerate(request) {
  try {
    const response = await fetch('/v1/kling/o1-video/generate', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify(request)
    });

    if (!response.ok) {
      const error = await response.json();
      
      if (response.status === 402) {
        throw new Error('INSUFFICIENT_CREDITS: Please top up your balance');
      }
      
      if (response.status === 400) {
        throw new Error(`Invalid request: ${error.detail}`);
      }
      
      throw new Error(`HTTP ${response.status}: ${error.detail}`);
    }

    return await response.json();

  } catch (error) {
    console.error('Generation failed:', error);
    throw error;
  }
}