How to Properly Use Serverless Technologies and AWS Lambda in Our Opinion

Updated on October 5, 2021
Read — 10 minutes

Recently I read, and shared with the team, an article about using serverless technologies for a web API.

In short: the author researched whether to move his API from his old setup to serverless technologies. He found that Lambda is slightly slower than an EC2 instance, but that was not the main problem. The main problem was pricing, and specifically the price of API Gateway rather than of Lambda itself.

I liked the author's research and his approach. It also reminded us of one of our own successful uses of Lambda.

A Lambda function is a great fit for small independent functions (it's all in the name). It is especially good when you need to call different APIs or scrape web pages. In that case, you can invoke as many Lambdas as you need in parallel, from one instance to a million (within your account's concurrency quota), without changing the infrastructure code. It just works.

Each Lambda runs in its own isolated execution environment, and its outbound requests typically come from different AWS IP addresses, which makes your scraper hard to block.
The only way to block it would be to ban the whole pool of AWS IP addresses, which is probably a bad decision.

Example

Words are cool, but what about a code example? Sure! Here we go.

In our example, we will work with cryptocurrency exchange markets to retrieve the latest exchange rate between two cryptocurrencies.

We'll use the CCXT library for that. It supports 127 different cryptocurrency exchange markets and has a Python implementation.

First of all, we need to create a simple lambda function. Its responsibility is to retrieve data about specified tickers from the specified market.

import json
import ccxt


def lambda_handler(event, context):
    # 'exchanges' holds a single ccxt exchange id, e.g. 'binance'
    exchange_id = event.get('exchanges')

    if exchange_id not in ccxt.exchanges:
        return {
            'statusCode': 400,
            'body': json.dumps(f'Exchange {exchange_id} not found')
        }

    # ccxt exposes each exchange as a class named after its id
    exchange = getattr(ccxt, exchange_id)()

    output = []
    # Default to an empty list so a missing 'symbols' key does not raise
    for symbol in event.get('symbols', []):
        ticker = exchange.fetch_ticker(symbol.upper())
        output.append({'exchange': exchange_id, 'symbol': symbol, 'ticker': ticker})

    return {'statusCode': 200, 'body': json.dumps(output)}
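
To see the event and response shapes without deploying anything or touching the network, the same control flow can be run against a stand-in exchange. This is only a sketch: `FakeExchange`, `EXCHANGES`, and `handler` are hypothetical names made up for illustration, and the canned ticker is not real market data. The real function uses ccxt as shown above.

```python
import json


class FakeExchange:
    """Hypothetical stand-in for a ccxt exchange; returns a canned ticker."""

    def fetch_ticker(self, symbol):
        return {'symbol': symbol, 'last': 1.0}


# Plays the role of ccxt.exchanges plus getattr(ccxt, exchange_id)
EXCHANGES = {'fake': FakeExchange}


def handler(event, context):
    exchange_id = event.get('exchanges')
    if exchange_id not in EXCHANGES:
        return {
            'statusCode': 400,
            'body': json.dumps(f'Exchange {exchange_id} not found'),
        }

    exchange = EXCHANGES[exchange_id]()
    output = [
        {
            'exchange': exchange_id,
            'symbol': symbol,
            'ticker': exchange.fetch_ticker(symbol.upper()),
        }
        for symbol in event.get('symbols', [])
    ]
    return {'statusCode': 200, 'body': json.dumps(output)}


# The event mirrors the payloads sent to the Lambda: one exchange id, many symbols
ok = handler({'exchanges': 'fake', 'symbols': ['btc/usdt']}, None)
missing = handler({'exchanges': 'nope', 'symbols': []}, None)
print(ok['statusCode'], missing['statusCode'])
```

Note that `body` is a JSON string rather than a nested object, so callers have to decode it separately.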

Once the Lambda is ready, the next step is to run it. We will retrieve data from many sources, and we need to do it as fast as possible because rates can change very quickly. For that purpose, we will make our code asynchronous and invoke Lambdas in parallel.

To invoke AWS Lambda we need Boto, the AWS SDK for Python. In our case, we use an asynchronous wrapper around it, aioboto3.

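session = aioboto3.Session(
    region_name='[YOUR_LAMBDA_REGION]',
    aws_access_key_id='[YOUR_AWS_KEY_ID]',
    aws_secret_access_key='[YOUR_AWS_KEY]'
)
# Recent aioboto3 releases create clients from a Session and use them
# as async context managers (this part must run inside a coroutine)
async with session.client('lambda') as client:
    response = await client.invoke(
        FunctionName='[YOUR_LAMBDA_FUNCTION_NAME_OR_ARN]',
        InvocationType='RequestResponse',
        LogType='Tail',
        Payload=json.dumps([PAYLOAD]),
    )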

This is a simple example of how to invoke a Lambda; you can always find more info in the Boto3 docs.
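
Because the handler returns a dict whose `body` is already a JSON string, the bytes read from `response['Payload']` need two rounds of JSON decoding. The sketch below shows one way to do that. `decode_lambda_payload` is a hypothetical helper name, and the sample payload is simulated, not real exchange data.

```python
import json


def decode_lambda_payload(raw):
    """Return (status_code, body) from the bytes read off response['Payload']."""
    envelope = json.loads(raw.decode('utf-8'))
    body = envelope.get('body')
    # 'body' is itself a JSON string because the handler calls json.dumps on it
    decoded_body = json.loads(body) if body is not None else None
    return envelope.get('statusCode'), decoded_body


# Simulated payload shaped like the handler's return value
inner = [{'exchange': 'binance', 'symbol': 'BTC/USDT', 'ticker': {'last': None}}]
raw = json.dumps({'statusCode': 200, 'body': json.dumps(inner)}).encode('utf-8')

status, body = decode_lambda_payload(raw)
print(status, body[0]['symbol'])
```

Using `json.loads` for both steps also copes with JSON literals such as `null`, which tickers may contain for fields an exchange does not report.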

And here is the code of the runner I ended up with:

import asyncio
import json

import aioboto3


class LambdaInvoker:
    def __init__(self):
        # Recent aioboto3 releases create clients from a Session
        self.session = aioboto3.Session(
            region_name='[YOUR_LAMBDA_REGION]',
            aws_access_key_id='[YOUR_AWS_KEY_ID]',
            aws_secret_access_key='[YOUR_AWS_KEY]'
        )

    async def invoke(self, payloads):
        # The client is an async context manager, so it is closed automatically
        async with self.session.client('lambda') as client:
            return await asyncio.gather(
                *(self.__invoke_lambda(client, payload) for payload in payloads)
            )

    async def __invoke_lambda(self, client, payload):
        print(f'Invoking {payload["exchanges"]}')
        response = await client.invoke(
            FunctionName='[YOUR_LAMBDA_FUNCTION_NAME_OR_ARN]',
            InvocationType='RequestResponse',
            LogType='Tail',
            Payload=json.dumps(payload),
        )
        # The payload is a stream; read it while the client is still open
        result = await response['Payload'].read()
        print(f'Finished {payload["exchanges"]}')

        return result

    @staticmethod
    def print_result(result):
        # The Lambda response is JSON, so parse it with json rather than ast
        data_dict = json.loads(result.decode('utf-8'))
        body_json = data_dict.get("body")
        if body_json:
            body = json.loads(body_json)
            print(json.dumps(body, indent=4))
        else:
            print(json.dumps(data_dict, indent=4))


if __name__ == '__main__':
    payloads = [
        {'exchanges': 'binance', 'symbols': ['BTC/USDT', 'ETH/BTC']},
        {'exchanges': 'cex', 'symbols': ['BTC/USD', 'ETH/BTC']},
        {'exchanges': 'bitfinex', 'symbols': ['BTC/USDT', 'ETH/BTC']},
        {'exchanges': 'kraken', 'symbols': ['BTC/USD', 'ETH/BTC']},
        {'exchanges': 'poloniex', 'symbols': ['BTC/USDT', 'ETH/BTC']}
    ]

    lambda_invoker = LambdaInvoker()

    # asyncio.run creates the event loop, runs the coroutine, and closes the loop
    results = asyncio.run(lambda_invoker.invoke(payloads))

    for result in results:
        LambdaInvoker.print_result(result)

The main logic lives in the methods __invoke_lambda and invoke.

__invoke_lambda uses the same code as above to run the Lambda and read its response. It is an asynchronous method.

invoke maps the __invoke_lambda coroutine over the payloads, and asyncio.gather schedules the resulting coroutines to run concurrently.
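
The effect of asyncio.gather can be seen without AWS at all. In the minimal sketch below, `fake_invoke` is a hypothetical stand-in for the awaited Lambda call, and the delays are arbitrary numbers that simulate network latency.

```python
import asyncio
import time


async def fake_invoke(name, delay):
    # Stand-in for an awaited Lambda call; sleeping simulates network latency
    await asyncio.sleep(delay)
    return name


async def main():
    jobs = [('binance', 0.2), ('kraken', 0.1), ('cex', 0.3)]
    started = time.perf_counter()
    # All three coroutines wait at the same time instead of one after another
    results = await asyncio.gather(*(fake_invoke(n, d) for n, d in jobs))
    elapsed = time.perf_counter() - started
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results, f'{elapsed:.2f}s')
```

The total time is close to the slowest call rather than the sum of all calls, and the results come back in the order of the inputs, not in the order of completion. That ordering is what lets a runner pair each result with its payload. If one failing call should not cancel the whole batch, `asyncio.gather` also accepts `return_exceptions=True`.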

Bonus

If you don't want to write or use asynchronous code, here is the same thing using the Python multiprocessing module:

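import json
import multiprocessing as mp

import boto3


def run(payloads):
    # One worker process per payload; the context manager cleans up the pool.
    # Call run() from under an `if __name__ == '__main__':` guard.
    with mp.Pool(len(payloads)) as pool:
        return pool.map(invoke_lambda, payloads)


def invoke_lambda(payload):
    # Create the client inside the worker instead of sharing it across processes
    client = boto3.client('lambda', region_name='[YOUR_LAMBDA_REGION]')

    print(f'Invoking {payload["exchanges"]}')
    response = client.invoke(
        FunctionName='[YOUR_LAMBDA_FUNCTION_NAME_OR_ARN]',
        InvocationType='RequestResponse',
        LogType='Tail',
        Payload=json.dumps(payload),
    )
    result = response['Payload'].read()
    print(f'Finished {payload["exchanges"]}')

    return result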
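
Since invoking a Lambda is mostly waiting on the network, a thread pool is another option worth considering. This is a swapped-in technique, not what we used above: the sketch relies on the standard library's `concurrent.futures.ThreadPoolExecutor`, and `fake_invoke` and `run_threaded` are hypothetical names, with a sleep standing in for the real `client.invoke(...)` call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_invoke(payload):
    # Hypothetical stand-in for client.invoke(...); sleep simulates waiting on I/O
    time.sleep(0.1)
    return f"done {payload['exchanges']}"


def run_threaded(payloads, max_workers=8):
    # Threads are enough here because the work is waiting, not computing
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the order of the inputs
        return list(pool.map(fake_invoke, payloads))


payloads = [{'exchanges': 'binance'}, {'exchanges': 'kraken'}, {'exchanges': 'cex'}]
results = run_threaded(payloads)
print(results)
```

Threads start faster than processes and do not require the payloads or results to be pickled. Whether one boto3 client can be shared between the threads is worth checking against the boto3 documentation for your version.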

Thanks for reading! Hope you liked it.
