PYTHON FLASK ELASTICSEARCH – Search service and Query builder


Hi, and welcome to the 7th and last article in the series “How to work with ElasticSearch, Python and Flask”. The previous article (Part 6: Python, Flask, ElasticSearch – indexer command) is located here. As a reminder, here is our architecture scheme:

Search microservice architecture

We have finally reached the end of the road and are ready to investigate the search service. Let’s return to our controller (below is only the main method; the whole code can be found in the 3rd article of this tutorial). Here we have our search service, which takes a query builder as an argument to its constructor.

def get(self):
    parser = self.prepareParser()
    request_data = parser.parse_args()

    builder = HotelSearchCriteriaUrlBuilder(request_data)
    director = HotelSearchCriteriaDirector(builder)
    director.build_criteria()
    criteria = director.get_criteria()

    search_service = SearchService(QueryBuilder())
    search_results = search_service.search(criteria)

    hotel_items_collection = []

    for hotel in search_results:
        hotel_items_collection.append(
            HotelSearchSimpleItemSchema.create_item_from_es_response(hotel)
        )

    result_response = HotelSearchResponseSchema.create_result_response(
        hotel_items_collection
    )

    return result_response if request_data else 0, 200

Let’s open the SearchService class.

from src.model.criteria.hotel_search_criteria import HotelSearchCriteria
from src.dependencies.hotel_search.abstract_query_builder import AbstractQueryBuilder


class SearchService():

    def __init__(self, builder: AbstractQueryBuilder):
        self.builder = builder

    def search(self, criteria: HotelSearchCriteria):
        self.builder.create_query(criteria)
        search = self.builder.get_search()

        return search.execute()

As you can see, it has a rather simple structure in our case: it only executes the search query prepared by the query builder. Now please have a look at the dependencies -> hotel_search folder, where we have our query builder class and filters.

query_builder and filters

Our query builder class extends an abstract class, and this is where most of the magic happens.

from src.model.criteria.hotel_search_criteria import HotelSearchCriteria
from src.dependencies.hotel_search.abstract_query_builder import AbstractQueryBuilder
from src.elasticsearch.connection import Connection
from src.elasticsearch.documents.hotels import HOTELS_INDEX
from elasticsearch_dsl import Search, Q
from src.dependencies.hotel_search.filters.city_name_filter import CityNameFilter
from src.dependencies.hotel_search.filters.hotel_name_filter import HotelNameFilter
from src.dependencies.hotel_search.filters.hotel_range_age_filter import HotelRangeAgeFilter
from src.dependencies.hotel_search.filters.geo_distance_filter import GeoDistanceFilter


class QueryBuilder(AbstractQueryBuilder):

    def __init__(self):
        self.search = Search(using=Connection.create_connection(), index=HOTELS_INDEX)

    def get_search(self) -> Search:
        return self.search

    def create_query(self, criteria: HotelSearchCriteria):
        self.set_page_offset(criteria)
        self.set_fields(criteria)
        self.set_filters(criteria)
        self.set_sorting(criteria)
        self.set_aggregations(criteria)

    def set_filters(self, criteria: HotelSearchCriteria):
        must_conditions = []
        should_conditions = []
        filter_conditions = []

        if criteria.city_name:
            must_conditions.append(CityNameFilter.create_filter(criteria))

        if criteria.hotel_name:
            must_conditions.append(HotelNameFilter.create_filter(criteria))

        if criteria.hotel_age:
            should_conditions.append(HotelRangeAgeFilter.create_filter(criteria))

        if criteria.geo_coordinates:
            filter_conditions.append(GeoDistanceFilter.create_filter(criteria))

        q_res = Q(
            'bool', 
            must=must_conditions, 
            should=should_conditions, 
            filter=filter_conditions
        )

        self.search = self.search.query(q_res)

    def set_page_offset(self, criteria: HotelSearchCriteria):
        # elasticsearch_dsl slices like a list: [from:to], so size = to - from
        start_from = max((criteria.page - 1) * criteria.size, 0)
        self.search = self.search[start_from:start_from + criteria.size]

    def set_fields(self, criteria: HotelSearchCriteria):
        # choose fields you want to get from ElasticSearch
        pass

    def set_aggregations(self, criteria: HotelSearchCriteria):
        # add aggregations
        pass

    def set_sorting(self, criteria: HotelSearchCriteria):
        # add sorting
        pass
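
The offset arithmetic in set_page_offset can be checked on its own, without a cluster. The sketch below assumes 1-based page numbers and relies on elasticsearch_dsl treating search[a:b] like a Python slice (from = a, size = b - a), which means the end of the slice has to be the start plus the page size. The helper names page_to_slice and slice_to_from_size are made up for this illustration.

```python
def page_to_slice(page: int, size: int) -> slice:
    """Translate a 1-based page number into a [start:stop] slice."""
    start = max((page - 1) * size, 0)  # never go below the first hit
    return slice(start, start + size)


def slice_to_from_size(s: slice) -> dict:
    """Mirror how a slice maps onto the `from` / `size` request parameters."""
    return {"from": s.start, "size": s.stop - s.start}


for page in (1, 2, 3):
    print(page, slice_to_from_size(page_to_slice(page, 10)))
```

If the slice ended at `size` instead of `start + size`, page 2 with 10 hits per page would become [10:10], that is, an empty page.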

The main method here is create_query, where we set pagination, the fields we want to get back, search filters, sorting and aggregations. My main goal is to show you the Filter design pattern, so the set_fields, set_aggregations and set_sorting methods are not implemented; that is a small homework task for you 🙂 Let’s concentrate on the set_filters method now, where we add all the necessary filters one by one using the Filter design pattern:

Filter design pattern

The builder interface is implemented by the AbstractFilter class:

import abc
from src.model.criteria.hotel_search_criteria import HotelSearchCriteria
from elasticsearch_dsl import Q


class AbstractFilter(abc.ABC):

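    # abc.abstractstaticmethod is deprecated; stack the two decorators instead.
    # A static method takes no `self`, matching the concrete filters.
    @staticmethod
    @abc.abstractmethod
    def create_filter(criteria: HotelSearchCriteria) -> Q:
        pass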
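
One caveat of abstract static methods is worth knowing. The sketch below is a simplified stand-in for the classes in this article: criteria is a plain dict, the filters return plain dicts instead of Q objects, and MinAgeFilter and BrokenFilter are hypothetical names. It shows that Python checks abstract methods only when a class is instantiated, so a filter that is always called through the class is never checked.

```python
import abc


class AbstractFilter(abc.ABC):
    @staticmethod
    @abc.abstractmethod
    def create_filter(criteria: dict) -> dict:
        """Return one query clause for the given criteria."""


class MinAgeFilter(AbstractFilter):
    @staticmethod
    def create_filter(criteria: dict) -> dict:
        return {"range": {"age": {"gte": criteria["hotel_age"]}}}


class BrokenFilter(AbstractFilter):
    pass  # create_filter was forgotten here


print(MinAgeFilter.create_filter({"hotel_age": 5}))

try:
    BrokenFilter()  # instantiation is where the ABC check happens
except TypeError as exc:
    print("rejected:", exc)

# Calling through the class skips the check and falls back to the
# empty base implementation, which returns None.
print(BrokenFilter.create_filter({"hotel_age": 5}))
```

So with this style of filter the abstract class mostly documents the contract; a unit test per filter is what actually catches a missing implementation.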

Then we have a list of filters, each of which is added only if the corresponding property is present in the criteria DTO object. Below are 2 implementations, which should be enough to understand the general idea (if you want all of the implementations, please refer to my course, where you get access to the whole project code). Every filter extends AbstractFilter:

from src.dependencies.hotel_search.filters.abstract_filter import AbstractFilter
from src.model.criteria.hotel_search_criteria import HotelSearchCriteria
from elasticsearch_dsl import Q


class CityNameFilter(AbstractFilter):
    FUZZINESS = 2

    @staticmethod
    def create_filter(criteria: HotelSearchCriteria) -> Q:
        q1 = Q(
            "match", 
            city_name_en={
                "query": criteria.city_name, 
                "fuzziness": CityNameFilter.FUZZINESS
            }
        )
        
        q2 = Q("match", city_name_en={"query": "London"})
        q_res = Q('bool', should=[q1, q2])

        return q_res
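
The fuzziness value in this filter is a maximum edit distance between the typed term and an indexed term. As a rough illustration only, here is a pure-Python Levenshtein distance; the function name edit_distance and the sample words are my own. ElasticSearch computes this internally, and by default it also counts a swap of two adjacent characters as a single edit, which this simplified version does not.

```python
def edit_distance(a: str, b: str) -> int:
    """Classic Levenshtein distance: inserts, deletes, substitutions."""
    previous = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        current = [i]
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            current.append(min(
                previous[j] + 1,         # delete ca
                current[j - 1] + 1,      # insert cb
                previous[j - 1] + cost,  # substitute, or keep if equal
            ))
        previous = current
    return previous[-1]


FUZZINESS = 2  # same value as CityNameFilter.FUZZINESS

for typed in ("warsaw", "warsav", "varsav", "wrsv"):
    distance = edit_distance(typed, "warsaw")
    verdict = "match" if distance <= FUZZINESS else "no match"
    print(typed, distance, verdict)
```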

Please pay attention to the fuzziness property above. It is a rather interesting feature: it sets the maximum edit distance allowed for a match. In practice it means that we allow up to 2 typos in a word, and ElasticSearch will still find the right results. Cool, isn’t it? One more filter example:

from src.dependencies.hotel_search.filters.abstract_filter import AbstractFilter
from src.model.criteria.hotel_search_criteria import HotelSearchCriteria
from elasticsearch_dsl import Q


class HotelRangeAgeFilter(AbstractFilter):
    @staticmethod
    def create_filter(criteria: HotelSearchCriteria) -> Q:
        q_res = Q("range", age={"gte": criteria.hotel_age})

        return q_res
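
When the number of filters grows, the chain of if statements in set_filters can also be driven by a lookup table. This is an alternative I am sketching here, not the code of the project: criteria is a plain dict, the clauses are plain dicts instead of Q objects, and FilterRule, RULES and build_bool are hypothetical names.

```python
from typing import Callable, NamedTuple


class FilterRule(NamedTuple):
    attribute: str                 # criteria key that switches the rule on
    clause: str                    # "must", "should" or "filter"
    build: Callable[[dict], dict]  # returns one query clause


RULES = [
    FilterRule(
        "city_name", "must",
        lambda c: {"match": {"city_name_en": {"query": c["city_name"],
                                              "fuzziness": 2}}},
    ),
    FilterRule(
        "hotel_age", "should",
        lambda c: {"range": {"age": {"gte": c["hotel_age"]}}},
    ),
]


def build_bool(criteria: dict) -> dict:
    """Collect the clauses of every rule whose attribute is present."""
    clauses = {"must": [], "should": [], "filter": []}
    for rule in RULES:
        if criteria.get(rule.attribute):
            clauses[rule.clause].append(rule.build(criteria))
    # Leave out clause types that ended up empty.
    return {"bool": {name: items for name, items in clauses.items() if items}}


print(build_bool({"city_name": "warsaw", "hotel_age": 5}))
print(build_bool({"hotel_age": 5}))
```

Adding a filter then means adding one entry to the table, and the table itself becomes a readable overview of which criteria end up in which clause.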

So in the set_filters method we sort our filters into must, should and filter conditions. The Filter design pattern is very useful in production, where you can have even hundreds of different filters. In that case it is not easy to organize your code properly and keep it readable, and this pattern helps a lot.

But it still will not resolve all your problems :). Imagine that you have 30-50 such filters. It is rather easy to make a mistake when composing complicated structures, so you will need to debug your final JSON query constantly. How can we do it?

The first option is to dump intermediate results in the controller itself. That is not the most convenient way, for several reasons. First, uWSGI application servers cache our code, so after any change you have to refresh the cache. Second, our annotation defines an exact response structure, so you would have to change it constantly. So I suggest rejecting that approach.

The second option is to configure a debugger in your code editor. That is a rather good solution, but there are as many IDEs and configurations as there are people, and it would be difficult for me to show how to configure debugging even for the most popular code editors.

Fortunately, a rather universal solution exists: tests. Here I prepared one test that covers almost all of the controller functionality. When running the test with the -s option you can also dump any intermediate results. I have commented out one such print statement, which I think you will like a lot. So let’s uncomment the print statement and run our test:

from src.dependencies.hotel_search_criteria.hotel_search_criteria_url_builder import HotelSearchCriteriaUrlBuilder
from src.dependencies.hotel_search_criteria.hotel_search_criteria_director import HotelSearchCriteriaDirector
from src.dependencies.hotel_search.search_service import SearchService
from src.dependencies.hotel_search.query_builder import QueryBuilder
from src.model.response.hotel_search_simple_item_shema import HotelSearchSimpleItemSchema
import json


class TestClass:
    def test_one(self):
        request_data = {"c": "warsaw", "n": "golden", "age": 5, "lat": 52.21, "lng": 21.01}

        builder = HotelSearchCriteriaUrlBuilder(request_data)
        director = HotelSearchCriteriaDirector(builder)
        director.build_criteria()
        criteria = director.get_criteria()

        query_builder = QueryBuilder()
        query_builder.create_query(criteria)
        search_query = query_builder.get_search()
        # print(json.dumps(search_query.to_dict(), indent=4, sort_keys=True))

        search_service = SearchService(QueryBuilder())
        search_results = search_service.search(criteria)

        for hotel in search_results:
            item = HotelSearchSimpleItemSchema.create_item_from_es_response(hotel)
            assert item['star'] == 5

After running the test you should see the raw JSON ElasticSearch query in the console, something similar to the screen below:

ElasticSearch json raw query
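
For orientation, a query built from criteria like the ones in the test has roughly the shape below. This is a hand-written approximation, not output captured from the project: the hotel_name and location field names, the 10km radius and the from/size values are assumptions, and clause_summary is a small hypothetical helper for a quick sanity check.

```python
import json

# Hand-written approximation of the request body, not a real dump.
raw_query = {
    "from": 0,
    "size": 10,
    "query": {
        "bool": {
            "must": [
                {"bool": {"should": [
                    {"match": {"city_name_en": {"query": "warsaw",
                                                "fuzziness": 2}}},
                    {"match": {"city_name_en": {"query": "London"}}},
                ]}},
                {"match": {"hotel_name": {"query": "golden"}}},  # assumed field
            ],
            "should": [
                {"range": {"age": {"gte": 5}}},
            ],
            "filter": [
                {"geo_distance": {  # assumed field name and radius
                    "distance": "10km",
                    "location": {"lat": 52.21, "lon": 21.01},
                }},
            ],
        }
    },
}


def clause_summary(body: dict) -> dict:
    """Count the top-level clauses of the bool query, per clause type."""
    bool_query = body["query"]["bool"]
    return {name: len(clauses) for name, clauses in bool_query.items()}


print(json.dumps(raw_query, indent=4, sort_keys=True))
print(clause_summary(raw_query))
```

When reading such a dump, keep in mind that must clauses are required and contribute to the score, filter clauses are required but do not affect the score, and should clauses are optional once a must or filter clause is present: they only raise the score of hotels that match them.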

To be fully aware of what is going on here, you have to understand the basics: how ElasticSearch works and how to read low-level JSON queries. If you know the basics, you can easily read that query at the lower level and understand what went wrong at the upper level in case of errors or unexpected results. Any programming language client is only a high-level wrapper; the basics always stay the same. When you understand the fundamentals, you can implement any logic in any programming language.

If you would like to get deep knowledge of ElasticSearch fundamentals, then please visit my course at Udemy. Below is the link to the course. As a reader of this blog you also get the possibility to use a coupon for the best possible low price. Thank you for your attention. I hope that you liked this part, and that you got a lot of interesting and useful information out of the whole tutorial. Thank you for being with me all this time, and welcome to my course if you want to know more.

