Jul 2, 2024
virtualenv
: python3 -m venv env
source env/bin/activate
pip install requests quickstreams
import requests
response = requests.get('https://api.open-meteo.com/v1/forecast', params={
'latitude': 51.5,
'longitude': -0.11,
'current_weather': 'true'
})
print(response.json())
from quickstreams import Application
app = Application(bootstrap_servers='localhost:1992', log_level='DEBUG')
producer = app.get_producer()
producer.produce('weather_data_demo', key='London', value=json.dumps(response.json()))
producer.flush()
with app.get_producer() as producer:
producer.produce('weather_data_demo', key='London', value=json.dumps(response.json()))
import requests
import json
from quickstreams import Application
def get_weather():
response = requests.get('https://api.open-meteo.com/v1/forecast', params={
'latitude': 51.5,
'longitude': -0.11,
'current_weather': 'true'
})
return response.json()
def main():
app = Application(bootstrap_servers='localhost:1992', log_level='DEBUG')
with app.get_producer() as producer:
weather = get_weather()
producer.produce('weather_data_demo', key='London', value=json.dumps(weather))
if __name__ == '__main__':
main()
get_weather()
- Fetch weather data from API.main()
- Main function to handle Kafka producer.import logging
logging.basicConfig(level=logging.DEBUG)
logging.debug('Got weather data')
logging.info('Produced message and sleeping...')
import time
while True:
weather = get_weather()
producer.produce('weather_data_demo', key='London', value=json.dumps(weather))
logging.info('Produced message and sleeping for 10 seconds...')
time.sleep(10)
kafkacat
to inspect the topic:
kafkacat -b localhost:1992 -t weather_data_demo -C -o end