Streaming Large Datasets

Memory-efficient data processing with streaming.

Why Stream?

Loading a large dataset into memory all at once can exhaust available resources. Streaming processes the data incrementally:

  • Memory Efficient: Process millions of records without loading all at once
  • Faster Start: Begin processing immediately without waiting for complete download
  • Scalable: Handle datasets of any size
  • Interruptible: Stop early if you find what you need

Basic Streaming

from py_gdelt import GDELTClient
from py_gdelt.filters import DateRange, EventFilter
from datetime import date

async with GDELTClient() as client:
    event_filter = EventFilter(
        date_range=DateRange(
            start=date(2024, 1, 1),
            end=date(2024, 1, 7),
        ),
    )

    # Stream instead of query()
    async for event in client.events.stream(event_filter):
        process(event)  # Process one at a time
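
The snippets in this guide run inside a coroutine; at the top level of a script you need an event loop. A minimal, self-contained version of the example above (the print call is a stand-in for your own per-event processing) might look like this:

import asyncio
from datetime import date

from py_gdelt import GDELTClient
from py_gdelt.filters import DateRange, EventFilter


async def main() -> None:
    async with GDELTClient() as client:
        event_filter = EventFilter(
            date_range=DateRange(start=date(2024, 1, 1), end=date(2024, 1, 7)),
        )
        async for event in client.events.stream(event_filter):
            print(event.event_code)  # stand-in for real per-event processing


if __name__ == "__main__":
    asyncio.run(main())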

Filtering While Streaming

Apply additional filters during streaming:

us_protest_count = 0

async for event in client.events.stream(event_filter):
    # Filter in-stream: count US protest events (CAMEO root code 14)
    if event.event_code != "14":
        continue
    actor = getattr(event, "actor1", None)
    if actor is not None and getattr(actor, "country_code", None) == "US":
        us_protest_count += 1
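
In-stream filters can also be packaged as reusable async generators. The sketch below (filter_events and is_us_protest are illustrative names, not part of py_gdelt) relies only on the event attributes shown above:

from typing import AsyncIterator, Callable


async def filter_events(events, predicate: Callable) -> AsyncIterator:
    """Yield only the events for which predicate(event) is true."""
    async for event in events:
        if predicate(event):
            yield event


def is_us_protest(event) -> bool:
    """True for protest events (CAMEO root code 14) with a US-coded Actor1."""
    actor = getattr(event, "actor1", None)
    return event.event_code == "14" and getattr(actor, "country_code", None) == "US"


us_protest_count = 0
async for event in filter_events(client.events.stream(event_filter), is_us_protest):
    us_protest_count += 1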

Early Exit

Stop processing when you have enough data:

count = 0
async for event in client.events.stream(event_filter):
    count += 1
    if count >= 1000:
        break  # Stop after 1000 events
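
The same pattern can be wrapped in a small helper that yields at most n items from any async iterator and then stops. This is a generic sketch (take is an illustrative name, not a py_gdelt API):

from typing import AsyncIterator


async def take(source, n: int) -> AsyncIterator:
    """Yield at most n items from an async iterator, then stop."""
    if n <= 0:
        return
    count = 0
    async for item in source:
        yield item
        count += 1
        if count >= n:
            break


first_1000 = [event async for event in take(client.events.stream(event_filter), 1000)]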

Batching

Process in batches for efficiency:

batch_size = 100
batch = []

async for event in client.events.stream(event_filter):
    batch.append(event)

    if len(batch) >= batch_size:
        process_batch(batch)
        batch = []

# Process remaining
if batch:
    process_batch(batch)
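
The batching logic can likewise be factored into a reusable async generator (batched here is an illustrative helper, not part of py_gdelt):

from typing import AsyncIterator, List


async def batched(source, size: int) -> AsyncIterator[List]:
    """Group items from an async iterator into lists of up to `size` items."""
    batch: List = []
    async for item in source:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:  # yield the final partial batch
        yield batch


async for batch in batched(client.events.stream(event_filter), 100):
    process_batch(batch)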

GKG Streaming

Stream GKG records similarly:

from py_gdelt.filters import GKGFilter

gkg_filter = GKGFilter(
    date_range=DateRange(start=date(2024, 1, 1)),
    themes=["ENV_CLIMATECHANGE"],
)

async for record in client.gkg.stream(gkg_filter):
    # Process GKG record
    for theme in record.themes:
        print(theme.name)
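
A common streaming pattern is to aggregate as you go instead of storing records. For example, counting theme occurrences with collections.Counter, using only the record.themes and theme.name attributes shown above:

from collections import Counter

theme_counts: Counter = Counter()

async for record in client.gkg.stream(gkg_filter):
    theme_counts.update(theme.name for theme in record.themes)

print(theme_counts.most_common(10))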

NGrams Streaming

Stream word/phrase occurrences:

from py_gdelt.filters import NGramsFilter

ngrams_filter = NGramsFilter(
    date_range=DateRange(start=date(2024, 1, 1)),
    ngram="climate",
    language="en",
)

async for ngram in client.ngrams.stream(ngrams_filter):
    print(f"{ngram.context}")

Memory Monitoring

Track memory usage during streaming:

import psutil
import os

process = psutil.Process(os.getpid())

count = 0
async for event in client.events.stream(event_filter):
    count += 1

    if count % 10000 == 0:
        mem_mb = process.memory_info().rss / 1024 / 1024
        print(f"Processed {count} events, Memory: {mem_mb:.2f} MB")

Error Handling

Handle errors gracefully during streaming:

import logging

from py_gdelt.exceptions import DataError

logger = logging.getLogger(__name__)

try:
    async for event in client.events.stream(event_filter):
        try:
            process(event)
        except Exception as e:
            # Log the bad record and keep streaming
            logger.error(f"Error processing event: {e}")
            continue
except DataError as e:
    logger.error(f"Data stream error: {e}")

Comparison: Query vs Stream

# query() - Loads all into memory
result = await client.events.query(event_filter)
for event in result:
    process(event)
# Memory: ~500MB for 100k events

# stream() - Process incrementally
async for event in client.events.stream(event_filter):
    process(event)
# Memory: ~50MB constant

Best Practices

  • Use stream() for large result sets (roughly >1,000 records)
  • Use query() for smaller result sets (<1,000 records); it is simpler
  • Process records in batches for throughput
  • Handle errors per record, not per stream
  • Monitor memory usage in production
  • Exit early once you have enough data
  • Apply filters as early as possible to reduce data volume; several of these practices are combined in the sketch below
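
The sketch below combines streaming, batching, and early exit in one helper; stream_in_batches and process_batch are illustrative names, not part of py_gdelt:

async def stream_in_batches(client, event_filter, batch_size=100, max_events=10_000):
    """Stream events, process them in batches, and stop after max_events."""
    batch = []
    seen = 0

    async for event in client.events.stream(event_filter):
        batch.append(event)
        seen += 1

        if len(batch) >= batch_size:
            process_batch(batch)
            batch = []

        if seen >= max_events:
            break  # early exit: enough data collected

    if batch:
        process_batch(batch)  # final partial batch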