Writing Data to Kafka Using Python

Jul 2, 2024

Introduction

  • Aim: Show how to write data to Kafka using Python step by step from scratch.
  • Goal: Create a working application that writes data to Kafka.

Setting Up Environment

  1. Create a new directory
    • Start from an empty project directory.
  2. Set up Python environment
    • Create a virtual environment with venv: python3 -m venv env
    • Activate environment: source env/bin/activate
  3. Install necessary libraries
    • pip install requests quixstreams
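Before writing any Kafka code, a quick sanity check can confirm that the virtual environment is active and both libraries can be imported. This is a minimal standard-library sketch. It assumes the Kafka client is Quix Streams, whose import name is `quixstreams`. The helper names are hypothetical.

```python
import importlib.util
import sys


def in_virtualenv() -> bool:
    # Inside a venv, sys.prefix points at the environment while
    # sys.base_prefix still points at the base interpreter.
    return sys.prefix != sys.base_prefix


def check_modules(names):
    # find_spec returns None when a top-level module cannot be found.
    return {name: importlib.util.find_spec(name) is not None for name in names}


if __name__ == '__main__':
    print('virtualenv active:', in_virtualenv())
    for name, ok in check_modules(['requests', 'quixstreams']).items():
        print(f'{name}: {"installed" if ok else "missing"}')
```

Run it from the activated environment. Both modules should report "installed" after the pip step.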

Step 1: Get Data

  • Import the requests library:
    import requests
    
  • Fetch the current weather from the API:
    response = requests.get('https://api.open-meteo.com/v1/forecast', params={
        'latitude': 51.5,
        'longitude': -0.11,
        'current_weather': 'true'
    })
    print(response.json())
    
    • API used: the Open-Meteo weather forecast API.
    • Parameters: latitude=51.5 and longitude=-0.11 (London); current_weather=true requests the current conditions, including temperature.
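For reference, here is the same request sketched with only the standard library. It shows the query string that `requests` builds from `params`. It also adds a timeout so that a stalled connection cannot hang the script. The function names are hypothetical, and the endpoint and parameters are the ones used above.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

FORECAST_URL = 'https://api.open-meteo.com/v1/forecast'


def build_forecast_url(latitude: float, longitude: float) -> str:
    # Equivalent to the query string requests builds from params=.
    query = urlencode({
        'latitude': latitude,
        'longitude': longitude,
        'current_weather': 'true',
    })
    return f'{FORECAST_URL}?{query}'


def fetch_weather(latitude: float, longitude: float, timeout: float = 10.0) -> dict:
    # urlopen raises urllib.error.HTTPError for error status codes,
    # so a failed request never reaches json.load.
    with urlopen(build_forecast_url(latitude, longitude), timeout=timeout) as resp:
        return json.load(resp)


if __name__ == '__main__':
    print(build_forecast_url(51.5, -0.11))
```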

Step 2: Write to Kafka

  1. Import json (used to serialize the payload) and Quix Streams:
    import json
    from quixstreams import Application
    
  2. Set up the application object and create a producer:
    app = Application(broker_address='localhost:1992', loglevel='DEBUG')
    producer = app.get_producer()
    
  3. Produce a message to the Kafka topic, then flush so that it is delivered before the script exits:
    producer.produce('weather_data_demo', key='London', value=json.dumps(response.json()))
    producer.flush()
    
  4. Alternatively, use a with statement. Leaving the block flushes the producer, so no explicit flush() is needed:
    with app.get_producer() as producer:
        producer.produce('weather_data_demo', key='London', value=json.dumps(response.json()))
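Why the with statement replaces the explicit flush() can be shown with a toy stand-in. `RecordingProducer` below is hypothetical and only records calls. It assumes that the producer returned by get_producer() behaves the same way: produce() buffers the message, and exiting the with block calls flush().

```python
import json


class RecordingProducer:
    """Hypothetical stand-in for a Kafka producer; it only records calls."""

    def __init__(self):
        self.pending = []    # messages accepted but not yet delivered
        self.delivered = []  # messages moved out by flush()

    def produce(self, topic, key=None, value=None):
        # Like a real client, produce() only buffers the message.
        self.pending.append((topic, key, value))

    def flush(self):
        # Deliver everything that is still buffered.
        self.delivered.extend(self.pending)
        self.pending.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Runs whenever the with block ends, including on an exception.
        self.flush()
        return False  # let any exception propagate


producer = RecordingProducer()
with producer:
    producer.produce('weather_data_demo', key='London',
                     value=json.dumps({'temperature': 16.2}))  # made-up payload
    print(len(producer.pending), len(producer.delivered))  # prints: 1 0
print(len(producer.pending), len(producer.delivered))  # prints: 0 1
```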
    

Example Script

  • Combined script:
    import requests
    import json
    from quickstreams import Application
    
    def get_weather():
        response = requests.get('https://api.open-meteo.com/v1/forecast', params={
            'latitude': 51.5,
            'longitude': -0.11,
            'current_weather': 'true'
        })
        return response.json()
    
    def main():
        app = Application(broker_address='localhost:1992', loglevel='DEBUG')
        with app.get_producer() as producer:
            weather = get_weather()
            producer.produce('weather_data_demo', key='London', value=json.dumps(weather))
    
    if __name__ == '__main__':
        main()
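The script produces every message with key='London'. Kafka uses the key to pick a partition, so all messages with the same key land in the same partition and keep their order. The sketch below illustrates that idea. It uses zlib.crc32 as a stand-in hash; the real partitioner depends on the client library and its configuration, and `choose_partition` is a hypothetical name.

```python
import zlib


def choose_partition(key: str, num_partitions: int) -> int:
    # Stand-in for key-based partitioning: hash the key bytes, then map
    # the hash onto one of the topic's partitions.
    return zlib.crc32(key.encode('utf-8')) % num_partitions


for city in ['London', 'Paris', 'London']:
    # 'London' maps to the same partition both times.
    print(city, choose_partition(city, num_partitions=3))
```

The sketch uses zlib.crc32 rather than Python's built-in hash() because str hashing is randomized per process, so hash('London') can change between runs. The mapping stays stable only while the partition count stays the same.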
    

Refactoring and Making It a Real Application

  1. Refactor into functions:
    • get_weather() - fetches the current weather from the API.
    • main() - creates the Kafka application and producer, then publishes the data.
  2. Add logging:
    import logging
    logging.basicConfig(level=logging.DEBUG)
    logging.debug('Got weather data')
    logging.info('Produced message and sleeping...')
    
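  3. Run in a loop to fetch and publish data continuously (the loop lives inside the with block so producer is defined):
    import time
    
    def main():
        app = Application(broker_address='localhost:1992', loglevel='DEBUG')
        with app.get_producer() as producer:
            while True:
                weather = get_weather()
                logging.debug('Got weather data')
                producer.produce('weather_data_demo', key='London', value=json.dumps(weather))
                logging.info('Produced message and sleeping for 10 seconds...')
                time.sleep(10)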
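A long-running loop should survive a single failed API call. The sketch below is one way to structure this. It assumes fetch, produce and sleep are passed in as callables, which makes the loop testable without a broker or network access. `run_loop` and its parameters are hypothetical. The broad except is only for brevity; in practice it would be narrowed to `requests.RequestException`.

```python
import json
import logging
import time
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def run_loop(
    fetch: Callable[[], dict],
    produce: Callable[[str, str, str], None],
    interval: float = 10.0,
    max_iterations: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Fetch and publish repeatedly; returns how many messages were produced."""
    produced = 0
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        iteration += 1
        try:
            weather = fetch()
        except Exception:
            # Log the failure and try again on the next tick instead of exiting.
            logger.exception('Failed to fetch weather data')
        else:
            produce('weather_data_demo', 'London', json.dumps(weather))
            produced += 1
            logger.info('Produced message and sleeping for %s seconds...', interval)
        sleep(interval)
    return produced
```

Inside the with block from main(), it could be called as `run_loop(get_weather, lambda topic, key, value: producer.produce(topic, key=key, value=value))`.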
    

Verifying the Data in Kafka

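  • Use the kafkacat command-line tool (renamed kcat in newer releases) to consume the topic:
    kafkacat -b localhost:1992 -t weather_data_demo -C -o end
    
    • -C runs kafkacat as a consumer, and -o end starts at the end of the topic, so only new messages are shown.
    • While the producer loop runs, a new weather message should appear roughly every 10 seconds.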
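To check the payloads rather than just eyeball them, the consumed messages can be parsed in Python. The sketch below makes two assumptions. First, each line of kafkacat's default output is one JSON message value. Second, the value follows Open-Meteo's shape {"current_weather": {"temperature": ...}}. The script name check_temps.py and the helper `temperatures` are hypothetical.

```python
import json
import sys
from typing import Iterable, Iterator


def temperatures(lines: Iterable[str]) -> Iterator[float]:
    # Each non-empty line is assumed to hold one JSON message value.
    for line in lines:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        # Assumes the Open-Meteo response shape used in Step 1.
        yield message['current_weather']['temperature']


if __name__ == '__main__' and len(sys.argv) > 1:
    # Usage:
    #   kafkacat -b localhost:1992 -t weather_data_demo -C -o beginning -e > messages.txt
    #   python check_temps.py messages.txt
    with open(sys.argv[1], encoding='utf-8') as f:
        for temp in temperatures(f):
            print('temperature:', temp)
```

The -e flag makes kafkacat exit once it reaches the end of the topic, so the redirected file is complete when the command returns.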

Conclusion

  • We built a Python application that fetches weather data and produces it to a Kafka topic.
  • Next steps: reading the data back from Kafka and processing it with Python.

Call to Action

  • Like and subscribe for more tutorials.