Skip to content

Instantly share code, notes, and snippets.

@uranusjr
Last active October 14, 2016 10:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save uranusjr/453f112e219275a5cf3af05c851ed4f3 to your computer and use it in GitHub Desktop.
Save uranusjr/453f112e219275a5cf3af05c851ed4f3 to your computer and use it in GitHub Desktop.
Simple demo for different async styles using asyncio.
import requests
TOKEN = '__token__'
def main():
# Send request for repo data.
response = session.get('https://api.github.com/users/uranusjr/repos')
# Read repo data.
if response.status_code != 200:
print('API error {}'.format(response.status_code))
return
data = response.json()
# Send requests for each forked repo.
responses = []
for d in data:
if d.get('fork'):
responses.append(session.get(d['url']))
# Read data of fork sources.
datasets = [r.json() for r in responses if r.status_code == 200]
# Read source names.
print('Forked sources:')
for data in datasets:
print('{} from {}'.format(data['name'], data['source']['full_name']))
session = requests.Session()
session.headers['Authorization'] = 'token {}'.format(TOKEN)
main()
import asyncio
import aiohttp
TOKEN = '__token__'
def cleanup():
session.close()
loop.stop()
def print_source_names(future):
datasets = future.result()
print('Forked sources:')
for data in datasets:
print('{} from {}'.format(data['name'], data['source']['full_name']))
cleanup()
def read_forked_repo_joined(future):
responses = future.result()
futures = (r.json() for r in responses if r.status == 200)
task = asyncio.gather(*futures, loop=loop)
task.add_done_callback(print_source_names)
def send_forked_repo_requests(future):
data = future.result()
futures = (session.get(d['url']) for d in data if d.get('fork'))
task = asyncio.gather(*futures, loop=loop)
task.add_done_callback(read_forked_repo_joined)
def read_repos(future):
response = future.result()
if response.status != 200:
print('API error {}'.format(response.status))
if response.connection:
response.connection.close()
response.close()
cleanup()
return
future = response.json()
task = loop.create_task(future)
task.add_done_callback(send_forked_repo_requests)
def send_repos_request():
future = session.get('https://api.github.com/users/uranusjr/repos')
task = loop.create_task(future)
task.add_done_callback(read_repos)
loop = asyncio.get_event_loop()
session = aiohttp.ClientSession(
loop=loop,
headers={'Authorization': 'token {}'.format(TOKEN)},
)
loop.call_soon(send_repos_request)
loop.run_forever()
import asyncio
import aiohttp
TOKEN = '__token__'
class Application:
def __init__(self):
self.loop = asyncio.get_event_loop()
self.session = aiohttp.ClientSession(
loop=self.loop,
headers={'Authorization': 'token {}'.format(TOKEN)}
)
def run(self):
self.loop.call_soon(self.send_repos_request)
self.loop.run_forever()
def finalize(self):
self.session.close()
self.loop.stop()
def send_repos_request(self):
future = self.session.get(
'https://api.github.com/users/uranusjr/repos',
)
task = self.loop.create_task(future)
task.add_done_callback(self.read_repos)
def read_repos(self, future):
response = future.result()
if response.status != 200:
print('API error {}'.format(response.status))
if response.connection:
response.connection.close()
response.close()
self.finalize()
return
future = response.json()
task = self.loop.create_task(future)
task.add_done_callback(self.send_forked_repo_requests)
def send_forked_repo_requests(self, future):
data = future.result()
futures = (self.session.get(d['url']) for d in data if d.get('fork'))
task = asyncio.gather(*futures, loop=self.loop)
task.add_done_callback(self.read_forked_repos)
def read_forked_repos(self, future):
responses = future.result()
futures = (r.json() for r in responses if r.status == 200)
task = asyncio.gather(*futures, loop=self.loop)
task.add_done_callback(self.print_source_names)
def print_source_names(self, future):
datasets = future.result()
print('Forked sources:')
for data in datasets:
print('{} from {}'.format(
data['name'],
data['source']['full_name'],
))
self.finalize()
app = Application()
app.run()
import asyncio
import aiohttp
from promise import Promise as BasePromise, promisify
TOKEN = '__token__'
class Promise(BasePromise):
"""Use asyncio's event loop to fulfill/reject the promise.
"""
@classmethod
def fulfilled(cls, x):
p = cls()
loop.call_soon(p.fulfill, x)
return p
@classmethod
def rejected(cls, x):
p = cls()
loop.call_soon(p.reject, x)
return p
def send_repos_request():
promise = promisify(session.get(
'https://api.github.com/users/uranusjr/repos',
))
return promise
class APIError(Exception):
pass
def read_repos(response):
if response.status != 200:
print('API error {}'.format(response.status))
if response.connection:
response.connection.close()
response.close()
return Promise.rejected(APIError())
promise = promisify(response.json())
return promise
def send_forked_repo_requests(data):
futures = (session.get(d['url']) for d in data if d.get('fork'))
promise = promisify(asyncio.gather(*futures, loop=loop))
return promise
def read_forked_repo_requests(responses):
futures = (r.json() for r in responses if r.status == 200)
task = asyncio.gather(*futures, loop=loop)
promise = promisify(task)
return promise
def print_source_names(datasets):
print('Forked sources:')
for data in datasets:
print('{} from {}'.format(
data['name'],
data['source']['full_name'],
))
return Promise.resolve(None)
def cleanup(_):
session.close()
loop.stop()
def main():
(send_repos_request()
.then(read_repos)
.then(send_forked_repo_requests, cleanup)
.then(read_forked_repo_requests)
.then(print_source_names)
.done(cleanup))
loop = asyncio.get_event_loop()
session = aiohttp.ClientSession(
loop=loop,
headers={'Authorization': 'token {}'.format(TOKEN)},
)
loop.call_soon(main)
loop.run_forever()
import asyncio
import aiohttp
TOKEN = '__token__'
@asyncio.coroutine
def send_repos_request():
url = 'https://api.github.com/users/uranusjr/repos'
response = yield from session.get(url)
return response
class APIError(Exception):
pass
@asyncio.coroutine
def read_repos(response):
if response.status != 200:
print('API error {}'.format(response.status))
raise APIError
data = yield from response.json()
return data
@asyncio.coroutine
def send_forked_repo_requests(data):
futures = (session.get(d['url']) for d in data if d.get('fork'))
task = asyncio.gather(*futures, loop=loop)
responses = yield from task
return responses
@asyncio.coroutine
def read_forked_repo_requests(responses):
futures = (r.json() for r in responses if r.status == 200)
task = asyncio.gather(*futures, loop=loop)
datasets = yield from task
return datasets
def print_source_names(datasets):
print('Forked sources:')
for data in datasets:
print('{} from {}'.format(
data['name'],
data['source']['full_name'],
))
def cleanup():
session.close()
loop.stop()
@asyncio.coroutine
def main():
response = yield from send_repos_request()
try:
data = yield from read_repos(response)
except APIError:
if response.connection:
response.connection.close()
response.close()
cleanup()
return
responses = yield from send_forked_repo_requests(data)
datasets = yield from read_forked_repo_requests(responses)
print_source_names(datasets)
cleanup()
loop = asyncio.get_event_loop()
session = aiohttp.ClientSession(
loop=loop,
headers={'Authorization': 'token {}'.format(TOKEN)},
)
loop.create_task(main())
loop.run_forever()
import asyncio
import aiohttp
TOKEN = '__token__'
async def send_repos_request(session):
response = await session.get('https://api.github.com/users/uranusjr/repos')
return response
class APIError(Exception):
pass
async def read_repos(response):
if response.status != 200:
print('API error {}'.format(response.status))
raise APIError
data = await response.json()
return data
async def send_forked_repo_requests(session, data):
futures = (session.get(d['url']) for d in data if d.get('fork'))
task = asyncio.gather(*futures, loop=loop)
responses = await task
return responses
async def read_forked_repo_requests(responses):
futures = (r.json() for r in responses if r.status == 200)
task = asyncio.gather(*futures, loop=loop)
datasets = await task
return datasets
def print_source_names(datasets):
print('Forked sources:')
for data in datasets:
print('{} from {}'.format(
data['name'],
data['source']['full_name'],
))
async def run(session):
response = await send_repos_request(session)
try:
data = await read_repos(response)
except APIError:
return
responses = await send_forked_repo_requests(session, data)
datasets = await read_forked_repo_requests(responses)
print_source_names(datasets)
async def main():
headers = {'Authorization': 'token {}'.format(TOKEN)}
async with aiohttp.ClientSession(loop=loop, headers=headers) as session:
await run(session)
loop.stop()
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
import asyncio
import aiohttp
TOKEN = '__token__'
async def run(session):
response = await session.get('https://api.github.com/users/uranusjr/repos')
if response.status != 200:
print('API error {}'.format(response.status))
return
data = await response.json()
futures = [session.get(d['url']) for d in data if d.get('fork')]
responses = await asyncio.gather(*futures, loop=loop)
futures = [r.json() for r in responses if r.status == 200]
datasets = await asyncio.gather(*futures, loop=loop)
print('Forked sources:')
for data in datasets:
print('{} from {}'.format(
data['name'],
data['source']['full_name'],
))
async def main():
headers = {'Authorization': 'token {}'.format(TOKEN)}
async with aiohttp.ClientSession(loop=loop, headers=headers) as session:
await run(session)
loop.stop()
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
aiohttp
promise
requests
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment