Source code for app.sql_service

import app
import model.models as model
import datetime
from parser import Parser


[docs]class ReceiverSQLService(object): """ Service that gets the xml input from the html request, generates SQL output and stores it into the configured relational database. """ def __init__(self, content): self.parser = Parser(content) self.time = datetime.datetime.now()
[docs] def store_data(self, user_ip): """ Stores the data in the XML into the relational database. :param user_ip: The IP of the user that calls the service. :raises: The exception that made the data storing fail. If an exception occurs, nothing will be stored into the database. """ session = app.db.session try: self._store_data(user_ip, session) session.commit() except: session.rollback() raise finally: session.close()
def _store_data(self, user_ip, session): organization = self._store_organization(session) self._store_user(session, organization, user_ip) datasource = self._store_datasource(session, organization) dataset = self._store_dataset(session, datasource) # Store indicators self._store_simple_indicators(dataset, session) self._store_compound_indicators(dataset, session) self._store_indicator_relationships(session) self._store_indicator_groups(session) # Store slices self._store_slices(dataset, session) # Store observations self._store_observations(dataset, session) def _store_simple_indicators(self, dataset, session): def enrich_indicator(ind): ind.starred = DBHelper.check_indicator_starred(session, ind.id) ind.last_update = self.time return ind for item in self.parser.get_simple_indicators(): indicator = session.merge(enrich_indicator(item)) dataset.add_indicator(indicator) session.flush() def _store_compound_indicators(self, dataset, session): def enrich_indicator(ind): ind.starred = DBHelper.check_indicator_starred(session, ind.id) ind.last_update = self.time for rel in DBHelper.find_indicators(session, ind.related_id): rel.compound_indicator_id = ind.id return ind for item in self.parser.get_compound_indicators(): indicator = session.merge(enrich_indicator(item)) dataset.add_indicator(indicator) session.flush() def _store_indicator_groups(self, session): for group in self.parser.get_indicator_groups(): indicator_ref = DBHelper.find_compound_indicator(session, group.indicator_ref) indicator_ref.indicator_ref_group_id = group.id session.merge(group) session.flush() def _store_indicator_relationships(self, session): indicators = self.parser.get_simple_indicators() for ind in indicators: # Each indicator may be related with others # The related_id field was created in the parser and WILL NOT be # persisted, it is only used to create the relationship objects relationships = [] for rel_id in ind.related_id: rel = model.IndicatorRelationship() rel.source_id = ind.id rel.target_id = rel_id relationships.append(rel) session.add_all(relationships) session.flush() def _store_dataset(self, session, datasource): dataset = self.parser.get_dataset() dataset.datasource_id = datasource.id session.add(dataset) session.flush() return dataset def _store_datasource(self, session, organization): xml_datasource = self.parser.get_datasource() db_datasource = DBHelper.check_datasource(session, xml_datasource.id) # The datasource may exist in the database. if db_datasource is not None: return db_datasource else: xml_datasource.organization_id = organization.id session.add(xml_datasource) session.flush() return xml_datasource def _store_organization(self, session): xml_organization = self.parser.get_organization() db_organization = DBHelper.check_organization(session, xml_organization.url) if db_organization is not None: return db_organization else: session.add(xml_organization) session.flush() return xml_organization def _store_user(self, session, organization, user_ip): xml_user = self.parser.get_user() db_user = DBHelper.check_user(session, xml_user.id) if db_user is not None: return db_user else: xml_user.organization_id = organization.id xml_user.ip = user_ip session.add(xml_user) session.flush() return xml_user def _store_observations(self, dataset, session): def enrich_observation(obs): obs.dataset_id = dataset.id if obs.region_code is not None: obs.region_id = DBHelper.find_region_id(session, obs.region_code) elif obs.country_code is not None: obs.region_id = DBHelper.find_country_id(session, obs.country_code) return obs #We need to make the flush after a certain number of elements in order to control #that not too many inserts are made at the same time, which results in a DataBase #crash. number_inserted = 0 for observation in self.parser.get_observations(): session.add(enrich_observation(observation)) number_inserted += 1 if number_inserted == 10000: session.flush() session.expunge_all() number_inserted = 0 session.flush() session.expunge_all() def _store_slices(self, dataset, session): def enrich_slice(sli): sli.dataset_id = dataset.id if sli.region_code is not None: sli.dimension_id = DBHelper.find_region_id(session, sli.region_code) elif sli.country_code is not None: sli.dimension_id = DBHelper.find_country_id(session, sli.country_code) return sli number_inserted = 0 for slice in self.parser.get_slices(): session.add(enrich_slice(slice)) number_inserted += 1 if number_inserted == 5000: number_inserted = 0 session.flush() session.expunge_all() session.flush() session.expunge_all()
[docs]class DBHelper(object): """ Helper for querying the relational database. """ @staticmethod
[docs] def check_datasource(session, datasource_id): """Find a DataSource in the DB. Returns None if not found.""" datasource = session.query(model.DataSource)\ .filter(model.DataSource.id == datasource_id)\ .first() return datasource
@staticmethod
[docs] def check_organization(session, organization_url): organization = session.query(model.Organization)\ .filter(model.Organization.url == organization_url)\ .first() return organization
@staticmethod
[docs] def check_user(session, user_id): """Check if a user exists in the database using its ID""" user = session.query(model.User).filter(model.User.id == user_id)\ .first() return user
@staticmethod
[docs] def check_indicator_starred(session, indicator_id): """Check if an Indicator is starred""" indicator = session.query(model.Indicator)\ .filter(model.Indicator.id == indicator_id)\ .first() return indicator.starred if indicator is not None else False
@staticmethod
[docs] def find_region_id(session, reg_code): """Get the Region ID using its UN_CODE""" region = session.query(model.Region)\ .filter(model.Region.un_code == reg_code)\ .first() if region is None: raise Exception("The region with UN_CODE = {} does not exist in " "the database".format(reg_code)) else: return region.id
@staticmethod
[docs] def find_country_id(session, country_code): """Get the Country ID using its ISO3""" country = session.query(model.Country)\ .filter(model.Country.iso3 == country_code)\ .first() if country is None: raise Exception("The country with ISO3 = {} does not exist in " "the database".format(country_code)) else: return country.id
@staticmethod
[docs] def find_observations(session, ids): """Get all observations in a list of IDs""" return session.query(model.Observation)\ .filter(model.Observation.id.in_(ids))\ .all()
@staticmethod
[docs] def find_indicators(session, ids): """Get all indicators in a list of IDs""" return session.query(model.Indicator)\ .filter(model.Indicator.id.in_(ids))\ .all()
@staticmethod
[docs] def find_compound_indicator(session, ind_id): """Get a compound indicator by its ID""" return session.query(model.CompoundIndicator)\ .filter(model.CompoundIndicator.id == ind_id)\ .first()